diff --git a/servers/fastapi/api/v1/ppt/endpoints/chat.py b/servers/fastapi/api/v1/ppt/endpoints/chat.py index 76b744a5..49d25de1 100644 --- a/servers/fastapi/api/v1/ppt/endpoints/chat.py +++ b/servers/fastapi/api/v1/ppt/endpoints/chat.py @@ -1,8 +1,18 @@ -from fastapi import APIRouter, Depends +import json + +from fastapi import APIRouter, Depends, HTTPException +from fastapi.responses import StreamingResponse from sqlalchemy.ext.asyncio import AsyncSession from models.chat import ChatMessageRequest, ChatMessageResponse -from services.chat import PresentationChatService +from models.sse_response import ( + SSECompleteResponse, + SSEErrorResponse, + SSEStatusResponse, + SSETraceResponse, + SSEResponse, +) +from services.chat import ChatTurnResult, PresentationChatService from services.database import get_async_session CHAT_ROUTER = APIRouter(prefix="/chat", tags=["Chat"]) @@ -24,3 +34,43 @@ async def chat_message( response=result.response_text, tool_calls=result.tool_calls, ) + + +@CHAT_ROUTER.post("/message/stream") +async def chat_message_stream( + payload: ChatMessageRequest, + sql_session: AsyncSession = Depends(get_async_session), +): + service = PresentationChatService( + sql_session=sql_session, + presentation_id=payload.presentation_id, + conversation_id=payload.conversation_id, + ) + + async def inner(): + try: + async for event_type, value in service.stream_reply(payload.message): + if event_type == "chunk" and isinstance(value, str): + yield SSEResponse( + event="response", + data=json.dumps({"type": "chunk", "chunk": value}), + ).to_string() + elif event_type == "status" and isinstance(value, str): + yield SSEStatusResponse(status=value).to_string() + elif event_type == "trace" and isinstance(value, dict): + yield SSETraceResponse(trace=value).to_string() + elif event_type == "complete" and isinstance(value, ChatTurnResult): + result = value + complete_payload = ChatMessageResponse( + conversation_id=result.conversation_id, + response=result.response_text, + tool_calls=result.tool_calls, + ) + yield SSECompleteResponse( + key="chat", + value=complete_payload.model_dump(mode="json"), + ).to_string() + except HTTPException as exc: + yield SSEErrorResponse(detail=exc.detail).to_string() + + return StreamingResponse(inner(), media_type="text/event-stream") diff --git a/servers/fastapi/models/sse_response.py b/servers/fastapi/models/sse_response.py index 18dc9a47..64213f06 100644 --- a/servers/fastapi/models/sse_response.py +++ b/servers/fastapi/models/sse_response.py @@ -20,6 +20,15 @@ class SSEStatusResponse(BaseModel): ).to_string() +class SSETraceResponse(BaseModel): + trace: object + + def to_string(self): + return SSEResponse( + event="response", data=json.dumps({"type": "trace", "trace": self.trace}) + ).to_string() + + class SSEErrorResponse(BaseModel): detail: str diff --git a/servers/fastapi/services/chat/service.py b/servers/fastapi/services/chat/service.py index 545189b0..c754691c 100644 --- a/servers/fastapi/services/chat/service.py +++ b/servers/fastapi/services/chat/service.py @@ -2,8 +2,9 @@ import asyncio import json import logging import uuid +from collections.abc import AsyncGenerator from dataclasses import dataclass -from typing import Any +from typing import Any, Literal from fastapi import HTTPException from llmai import get_client # type: ignore[import-not-found] @@ -24,7 +25,11 @@ from services.chat.tools import ChatTools from utils.llm_client_error_handler import handle_llm_client_exceptions from utils.llm_config import get_llm_config from utils.llm_provider import get_model -from utils.llm_utils import extract_text, get_generate_kwargs +from utils.llm_utils import ( + extract_text, + get_generate_kwargs, + stream_generate_events, +) LOGGER = logging.getLogger(__name__) MAX_TOOL_ROUNDS = 16 @@ -37,6 +42,10 @@ class ChatTurnResult: tool_calls: list[str] +ChatStreamEventType = Literal["chunk", "complete", "status", "trace"] +ChatStreamEventValue = str | ChatTurnResult | dict[str, Any] + + class PresentationChatService: def __init__( self, @@ -53,6 +62,146 @@ class PresentationChatService: self._tools = ChatTools(self._memory) async def generate_reply(self, user_message: str) -> ChatTurnResult: + conversation_id, messages = await self._prepare_turn_context(user_message) + response_text, tool_calls = await self._run_llm_with_tools(messages) + return await self._persist_turn( + conversation_id=conversation_id, + user_message=user_message, + response_text=response_text, + tool_calls=tool_calls, + ) + + async def stream_reply( + self, user_message: str + ) -> AsyncGenerator[tuple[ChatStreamEventType, ChatStreamEventValue], None]: + yield "status", "Preparing context" + conversation_id, messages = await self._prepare_turn_context(user_message) + yield "status", "Thinking" + + client = get_client(config=get_llm_config()) + model = get_model() + tools = self._tools.get_tool_definitions() + + called_tools: list[str] = [] + last_tool_results: list[dict[str, Any]] = [] + response_text: str | None = None + + for round_index in range(MAX_TOOL_ROUNDS): + completion_chunk: Any | None = None + round_content_chunks: list[str] = [] + + try: + async for event in stream_generate_events( + client, + **get_generate_kwargs( + model=model, + messages=messages, + tools=tools, + stream=True, + ), + ): + if getattr(event, "type", None) == "content": + chunk = getattr(event, "chunk", None) + if chunk: + round_content_chunks.append(chunk) + yield "chunk", chunk + elif getattr(event, "type", None) == "completion": + completion_chunk = event + except Exception as exc: + raise handle_llm_client_exceptions(exc) + + completion_tool_calls = list( + getattr(completion_chunk, "tool_calls", []) or [] + ) + if completion_tool_calls: + tool_names = [tool_call.name for tool_call in completion_tool_calls] + called_tools.extend(tool_names) + yield "trace", { + "kind": "tool_plan", + "round": round_index + 1, + "tools": tool_names, + "message": f"Using tools: {', '.join(tool_names)}", + } + messages = ( + list(getattr(completion_chunk, "messages", []) or []) + if getattr(completion_chunk, "messages", None) + else list(messages) + ) + + last_tool_results = [] + for tool_call in completion_tool_calls: + yield "trace", { + "kind": "tool_call", + "round": round_index + 1, + "tool": tool_call.name, + "status": "start", + "message": f"Running {tool_call.name}", + } + tool_result = await self._tools.execute_tool_call(tool_call) + last_tool_results.append(tool_result) + yield "trace", { + "kind": "tool_call", + "round": round_index + 1, + "tool": tool_call.name, + "status": "success" if tool_result.get("ok") else "error", + "message": self._summarize_tool_result( + tool_call.name, tool_result + ), + } + tool_response_content = json.dumps(tool_result, ensure_ascii=False) + messages.append( + ToolResponseMessage( + id=tool_call.id, + content=[tool_response_content], + ) + ) + yield "status", "Thinking" + continue + + response_text = "".join(round_content_chunks) + if not response_text and completion_chunk: + response_text = extract_text(getattr(completion_chunk, "content", None)) + if not response_text: + response_text = "I could not generate a response for that request." + + if not round_content_chunks: + yield "chunk", response_text + break + else: + LOGGER.warning("Max tool rounds reached in chat stream flow") + yield "trace", { + "kind": "limit", + "message": ( + "Reached tool-call limit before final answer; " + "attempting best-effort summary." + ), + } + yield "status", "Finalizing response" + response_text = await self._try_final_response_without_tools( + client=client, + model=model, + messages=messages, + ) + if not response_text: + response_text = self._build_tool_limit_fallback(last_tool_results) + yield "chunk", response_text + + final_response_text = response_text or "I could not generate a response for that request." + if response_text is None: + yield "chunk", final_response_text + + yield "status", "Saving conversation" + result = await self._persist_turn( + conversation_id=conversation_id, + user_message=user_message, + response_text=final_response_text, + tool_calls=called_tools, + ) + yield "complete", result + + async def _prepare_turn_context( + self, user_message: str + ) -> tuple[uuid.UUID, list[Message]]: if not (user_message or "").strip(): raise HTTPException(status_code=400, detail="Message is required") @@ -75,8 +224,16 @@ class PresentationChatService: *history_messages, UserMessage(content=user_message), ] + return conversation_id, messages - response_text, tool_calls = await self._run_llm_with_tools(messages) + async def _persist_turn( + self, + *, + conversation_id: uuid.UUID, + user_message: str, + response_text: str, + tool_calls: list[str], + ) -> ChatTurnResult: await self._conversation_store.append_turn( presentation_id=self._presentation_id, conversation_id=conversation_id, @@ -190,6 +347,38 @@ class PresentationChatService: "within the tool limit. Please ask a follow-up and I will continue." ) + @staticmethod + def _summarize_tool_result(tool_name: str, tool_result: dict[str, Any]) -> str: + if not tool_result.get("ok"): + error = tool_result.get("error") + if isinstance(error, str) and error.strip(): + return f"{tool_name} failed: {error.strip()}" + return f"{tool_name} failed." + + result = tool_result.get("result") + if isinstance(result, dict): + message = result.get("message") + if isinstance(message, str) and message.strip(): + return message.strip() + + note = result.get("note") + if isinstance(note, str) and note.strip(): + return note.strip() + + count = result.get("count") + if isinstance(count, int): + return f"{tool_name} returned {count} result(s)." + + found = result.get("found") + if isinstance(found, bool): + return ( + f"{tool_name} found requested data." + if found + else f"{tool_name} did not find matching data." + ) + + return f"{tool_name} completed." + @staticmethod def _convert_history_to_messages(history: list[dict[str, str]]) -> list[Message]: messages: list[Message] = [] diff --git a/servers/nextjs/app/(presentation-generator)/presentation/components/Chat.tsx b/servers/nextjs/app/(presentation-generator)/presentation/components/Chat.tsx index 2d98b040..6e5699e1 100644 --- a/servers/nextjs/app/(presentation-generator)/presentation/components/Chat.tsx +++ b/servers/nextjs/app/(presentation-generator)/presentation/components/Chat.tsx @@ -1,13 +1,18 @@ "use client"; import { + Activity, + CheckCircle2, + ChevronDown, ChevronRight, + ChevronUp, + CircleDot, Loader2, - MessageCircle, MessageCircleMore, Plus, RefreshCw, Send, + XCircle, } from "lucide-react"; import React, { FormEvent, @@ -19,6 +24,7 @@ import React, { } from "react"; import { toast } from "sonner"; import { PresentationChatApi } from "../../services/api/chat"; +import type { ChatStreamTrace } from "../../services/api/chat"; const suggestions: { id: string; icon: ReactNode; suggestion: string }[] = [ { @@ -223,6 +229,7 @@ type ChatMessage = { role: "user" | "assistant" | "error"; content: string; toolCalls?: string[]; + activity?: AssistantActivity[]; }; type ChatProps = { @@ -231,6 +238,12 @@ type ChatProps = { onPresentationChanged?: () => Promise | void; }; +type AssistantActivity = { + id: string; + label: string; + state: "running" | "success" | "error" | "info"; +}; + const createMessageId = () => { if (typeof crypto !== "undefined" && "randomUUID" in crypto) { return crypto.randomUUID(); @@ -246,6 +259,73 @@ const AssistantMarker = () => ( ); +const inferStatusState = (status: string): AssistantActivity["state"] => { + const normalized = status.trim().toLowerCase(); + if ( + normalized.includes("preparing") || + normalized.includes("thinking") || + normalized.includes("finalizing") || + normalized.includes("saving") + ) { + return "running"; + } + + return "info"; +}; + +const formatTraceActivity = ( + trace: ChatStreamTrace +): Omit | null => { + if (typeof trace.message === "string" && trace.message.trim().length > 0) { + return { + label: trace.message.trim(), + state: + trace.status === "error" + ? "error" + : trace.status === "success" + ? "success" + : "running", + }; + } + + if (trace.tool && trace.status === "start") { + return { label: `Running ${trace.tool}`, state: "running" }; + } + + if (trace.tool && trace.status === "success") { + return { label: `${trace.tool} completed`, state: "success" }; + } + + if (trace.tool && trace.status === "error") { + return { label: `${trace.tool} failed`, state: "error" }; + } + + if (trace.kind === "tool_plan" && Array.isArray(trace.tools) && trace.tools.length) { + return { + label: `Using tools: ${trace.tools.join(", ")}`, + state: "info", + }; + } + + return null; +}; + +const ActivityIcon = ({ state }: { state: AssistantActivity["state"] }) => { + if (state === "running") { + return ; + } + + if (state === "success") { + return ; + } + + if (state === "error") { + return ; + } + + return ; +}; + const Chat = ({ presentationId, currentSlide, @@ -253,8 +333,12 @@ const Chat = ({ }: ChatProps) => { const [input, setInput] = useState(""); const [messages, setMessages] = useState([]); + const [conversationId, setConversationId] = useState(null); const [isSending, setIsSending] = useState(false); const [errorMessage, setErrorMessage] = useState(null); + const [expandedActivityByMessage, setExpandedActivityByMessage] = useState< + Record + >({}); const inputRef = useRef(null); const messagesEndRef = useRef(null); @@ -262,7 +346,9 @@ const Chat = ({ useEffect(() => { setMessages([]); setInput(""); + setConversationId(null); setErrorMessage(null); + setExpandedActivityByMessage({}); }, [presentationId]); useEffect(() => { @@ -283,7 +369,9 @@ const Chat = ({ const resetChat = () => { setMessages([]); setInput(""); + setConversationId(null); setErrorMessage(null); + setExpandedActivityByMessage({}); inputRef.current?.focus(); }; @@ -301,6 +389,107 @@ const Chat = ({ } }; + const appendAssistantActivity = ( + assistantMessageId: string, + activity: Omit + ) => { + const normalizedLabel = activity.label.trim(); + if (!normalizedLabel) { + return; + } + + setMessages((previous) => + previous.map((message) => { + if (message.id !== assistantMessageId) { + return message; + } + + const currentActivity = message.activity ?? []; + const lastActivity = currentActivity[currentActivity.length - 1]; + if ( + lastActivity && + lastActivity.label === normalizedLabel && + lastActivity.state === activity.state + ) { + return message; + } + + const settledActivity: AssistantActivity[] = + lastActivity && lastActivity.state === "running" + ? [ + ...currentActivity.slice(0, -1), + { + ...lastActivity, + state: + activity.state === "error" + ? "error" + : ("success" as AssistantActivity["state"]), + }, + ] + : currentActivity; + + const lastSettledActivity = settledActivity[settledActivity.length - 1]; + if ( + lastSettledActivity && + lastSettledActivity.label === normalizedLabel && + lastSettledActivity.state !== activity.state + ) { + return { + ...message, + activity: [ + ...settledActivity.slice(0, -1), + { + ...lastSettledActivity, + state: activity.state, + }, + ], + }; + } + + return { + ...message, + activity: [ + ...settledActivity, + { + id: createMessageId(), + label: normalizedLabel, + state: activity.state, + }, + ], + }; + }) + ); + }; + + const settleAssistantActivities = ( + assistantMessageId: string, + finalState: "success" | "error" + ) => { + setMessages((previous) => + previous.map((message) => { + if (message.id !== assistantMessageId || !message.activity?.length) { + return message; + } + + return { + ...message, + activity: message.activity.map((activityItem) => + activityItem.state === "running" + ? { ...activityItem, state: finalState } + : activityItem + ), + }; + }) + ); + }; + + const toggleActivityExpanded = (messageId: string) => { + setExpandedActivityByMessage((previous) => ({ + ...previous, + [messageId]: !previous[messageId], + })); + }; + const submitMessage = async (rawMessage: string) => { const trimmedMessage = rawMessage.trim(); @@ -319,28 +508,81 @@ const Chat = ({ content: trimmedMessage, }; - setMessages((previous) => [...previous, userMessage]); + const assistantMessageId = createMessageId(); + setMessages((previous) => [ + ...previous, + userMessage, + { + id: assistantMessageId, + role: "assistant", + content: "", + toolCalls: [], + activity: [], + }, + ]); + setExpandedActivityByMessage((previous) => ({ + ...previous, + [assistantMessageId]: true, + })); setInput(""); setErrorMessage(null); setIsSending(true); try { - const response = await PresentationChatApi.sendMessage({ - presentation_id: presentationId, - message: buildBackendMessage(trimmedMessage), - }); - - setMessages((previous) => [ - ...previous, + const response = await PresentationChatApi.streamMessage( { - id: createMessageId(), - role: "assistant", - content: response.response, - toolCalls: Array.isArray(response.tool_calls) - ? response.tool_calls - : [], + presentation_id: presentationId, + message: buildBackendMessage(trimmedMessage), + conversation_id: conversationId ?? undefined, }, - ]); + { + onChunk: (chunk) => { + setMessages((previous) => + previous.map((message) => + message.id === assistantMessageId + ? { + ...message, + content: `${message.content}${chunk}`, + } + : message + ) + ); + }, + onStatus: (status) => { + appendAssistantActivity(assistantMessageId, { + label: status, + state: inferStatusState(status), + }); + }, + onTrace: (trace) => { + const traceActivity = formatTraceActivity(trace); + if (!traceActivity) { + return; + } + appendAssistantActivity(assistantMessageId, traceActivity); + }, + } + ); + + setMessages((previous) => + previous.map((message) => + message.id === assistantMessageId + ? { + ...message, + content: response.response, + toolCalls: Array.isArray(response.tool_calls) + ? response.tool_calls + : [], + } + : message + ) + ); + settleAssistantActivities(assistantMessageId, "success"); + setConversationId((previous) => + typeof response.conversation_id === "string" + ? response.conversation_id + : previous + ); await refreshPresentationIfNeeded( Array.isArray(response.tool_calls) ? response.tool_calls : [] @@ -349,6 +591,11 @@ const Chat = ({ const message = error instanceof Error ? error.message : "Failed to send chat message"; + settleAssistantActivities(assistantMessageId, "error"); + appendAssistantActivity(assistantMessageId, { + label: message, + state: "error", + }); setErrorMessage(message); setMessages((previous) => [ ...previous, @@ -485,8 +732,58 @@ const Chat = ({ : "text-[#535862]" }`} > - {message.content} + {message.content || + (isSending && message.role === "assistant" + ? message.activity?.[message.activity.length - 1]?.label || + "Working on it..." + : "")} + {message.activity && message.activity.length > 0 && ( +
+ + {!expandedActivityByMessage[message.id] && ( +
+ {message.activity[message.activity.length - 1]?.label} +
+ )} + {expandedActivityByMessage[message.id] && ( +
+
+ {message.activity.map((activityItem) => ( +
+ + + + {activityItem.label} +
+ ))} +
+
+ )} +
+ )} {message.toolCalls && message.toolCalls.length > 0 && (
{message.toolCalls.map((toolCall) => ( @@ -505,16 +802,6 @@ const Chat = ({
)} - {isSending && ( -
- -
- Got it. Let me analyze your slide - -
-
- )} -
diff --git a/servers/nextjs/app/(presentation-generator)/services/api/chat.ts b/servers/nextjs/app/(presentation-generator)/services/api/chat.ts index d636ff56..1aad7c7a 100644 --- a/servers/nextjs/app/(presentation-generator)/services/api/chat.ts +++ b/servers/nextjs/app/(presentation-generator)/services/api/chat.ts @@ -5,6 +5,7 @@ import { getHeader } from "./header"; export interface ChatMessageRequest { presentation_id: string; message: string; + conversation_id?: string; } export interface ChatMessageResponse { @@ -13,6 +14,55 @@ export interface ChatMessageResponse { tool_calls?: string[]; } +export interface ChatStreamTrace { + kind?: string; + round?: number; + tool?: string; + status?: string; + message?: string; + tools?: string[]; +} + +export interface ChatStreamHandlers { + onChunk?: (chunk: string) => void; + onStatus?: (status: string) => void; + onTrace?: (trace: ChatStreamTrace) => void; + onComplete?: (response: ChatMessageResponse) => void; +} + +interface ChatStreamDataChunk { + type: "chunk"; + chunk?: unknown; +} + +interface ChatStreamDataComplete { + type: "complete"; + chat?: unknown; +} + +interface ChatStreamDataError { + type: "error"; + detail?: unknown; +} + +interface ChatStreamDataStatus { + type: "status"; + status?: unknown; +} + +interface ChatStreamDataTrace { + type: "trace"; + trace?: unknown; +} + +type ChatStreamData = + | ChatStreamDataChunk + | ChatStreamDataComplete + | ChatStreamDataError + | ChatStreamDataStatus + | ChatStreamDataTrace + | Record; + export class PresentationChatApi { static async sendMessage( payload: ChatMessageRequest @@ -29,4 +79,170 @@ export class PresentationChatApi { "Failed to send chat message" ); } + + static async streamMessage( + payload: ChatMessageRequest, + handlers: ChatStreamHandlers = {}, + options?: { signal?: AbortSignal } + ): Promise { + const response = await fetch(getApiUrl("/api/v1/ppt/chat/message/stream"), { + method: "POST", + headers: getHeader(), + body: JSON.stringify(payload), + cache: "no-cache", + signal: options?.signal, + }); + + if (!response.ok) { + await ApiResponseHandler.handleResponse( + response, + "Failed to stream chat message" + ); + throw new Error("Failed to stream chat message"); + } + + if (!response.body) { + throw new Error("No response body received from chat stream"); + } + + const reader = response.body.getReader(); + const decoder = new TextDecoder("utf-8"); + let buffer = ""; + let finalResponse: ChatMessageResponse | null = null; + + const processSseFrame = (frame: string) => { + const normalized = frame.replaceAll("\r", ""); + const lines = normalized.split("\n"); + let eventName = ""; + const dataLines: string[] = []; + + for (const line of lines) { + if (line.startsWith("event:")) { + eventName = line.slice(6).trim(); + continue; + } + if (line.startsWith("data:")) { + dataLines.push(line.slice(5).trimStart()); + } + } + + if (eventName && eventName !== "response") { + return; + } + if (!dataLines.length) { + return; + } + + let parsedData: ChatStreamData; + try { + parsedData = JSON.parse(dataLines.join("\n")) as ChatStreamData; + } catch { + return; + } + + const payloadType = parsedData.type; + if (payloadType === "chunk") { + const chunk = parsedData.chunk; + if (typeof chunk === "string" && chunk.length > 0) { + handlers.onChunk?.(chunk); + } + return; + } + + if (payloadType === "complete") { + const chatPayload = (parsedData as ChatStreamDataComplete).chat; + if ( + chatPayload && + typeof chatPayload === "object" && + typeof (chatPayload as { response?: unknown }).response === "string" + ) { + const typedResponse: ChatMessageResponse = { + conversation_id: + typeof (chatPayload as { conversation_id?: unknown }) + .conversation_id === "string" + ? (chatPayload as { conversation_id?: string }).conversation_id + : undefined, + response: (chatPayload as { response: string }).response, + tool_calls: Array.isArray( + (chatPayload as { tool_calls?: unknown }).tool_calls + ) + ? ( + (chatPayload as { tool_calls?: unknown[] }).tool_calls ?? [] + ).filter((item): item is string => typeof item === "string") + : [], + }; + finalResponse = typedResponse; + handlers.onComplete?.(typedResponse); + } + return; + } + + if (payloadType === "error") { + const detail = (parsedData as ChatStreamDataError).detail; + const message = + typeof detail === "string" && detail.trim().length > 0 + ? detail + : "Chat stream failed"; + throw new Error(message); + } + + if (payloadType === "status") { + const status = (parsedData as ChatStreamDataStatus).status; + if (typeof status === "string" && status.trim().length > 0) { + handlers.onStatus?.(status); + } + return; + } + + if (payloadType === "trace") { + const trace = (parsedData as ChatStreamDataTrace).trace; + if (trace && typeof trace === "object") { + const typedTrace = trace as Record; + handlers.onTrace?.({ + kind: + typeof typedTrace.kind === "string" ? typedTrace.kind : undefined, + round: + typeof typedTrace.round === "number" ? typedTrace.round : undefined, + tool: + typeof typedTrace.tool === "string" ? typedTrace.tool : undefined, + status: + typeof typedTrace.status === "string" ? typedTrace.status : undefined, + message: + typeof typedTrace.message === "string" ? typedTrace.message : undefined, + tools: Array.isArray(typedTrace.tools) + ? typedTrace.tools.filter( + (value): value is string => typeof value === "string" + ) + : undefined, + }); + } + } + }; + + while (true) { + const { done, value } = await reader.read(); + if (done) { + break; + } + buffer += decoder.decode(value, { stream: true }); + + let delimiterIndex = buffer.indexOf("\n\n"); + while (delimiterIndex >= 0) { + const frame = buffer.slice(0, delimiterIndex); + buffer = buffer.slice(delimiterIndex + 2); + processSseFrame(frame); + delimiterIndex = buffer.indexOf("\n\n"); + } + } + + if (buffer.trim().length > 0) { + processSseFrame(buffer); + } + + if (finalResponse) { + return finalResponse; + } + + throw new Error("Chat stream ended before completion"); + } }