diff --git a/servers/fastapi/services/llm_client.py b/servers/fastapi/services/llm_client.py index 1c4d8e5d..b47760dd 100644 --- a/servers/fastapi/services/llm_client.py +++ b/servers/fastapi/services/llm_client.py @@ -1,10 +1,9 @@ import asyncio import dirtyjson -import httpx import json from typing import AsyncGenerator, List, Optional from fastapi import HTTPException -from openai import AsyncOpenAI +from openai import APIStatusError, AsyncOpenAI, OpenAIError from openai.types.chat.chat_completion_chunk import ( ChatCompletionChunk as OpenAIChatCompletionChunk, ) @@ -133,7 +132,7 @@ class LLMClient: case LLMProvider.CUSTOM: return self._get_custom_client() case LLMProvider.CODEX: - return None # Codex uses direct httpx calls, not self._client + return None # Codex uses _get_codex_client() with AsyncOpenAI, not self._client case _: raise HTTPException( status_code=400, @@ -230,6 +229,25 @@ class LLMClient: "accept": "text/event-stream", } + def _get_codex_client(self) -> AsyncOpenAI: + """Return an AsyncOpenAI client configured for the Codex Responses API. + Client is built per call so headers/token are fresh after refresh. + Only Codex-specific headers are passed; content-type and accept are left + to the SDK so the server does not reject the request. + """ + headers = self._get_codex_headers() + access_token = (headers.get("Authorization") or "").replace("Bearer ", "").strip() + skip = {"authorization", "content-type", "accept"} + default_headers = { + k: v for k, v in headers.items() if k.lower() not in skip + } + return AsyncOpenAI( + base_url="https://chatgpt.com/backend-api/codex", + api_key=access_token or "codex", + default_headers=default_headers, + timeout=120.0, + ) + # ------------------------------------------------------------------------- # Codex (Responses API) helpers # ------------------------------------------------------------------------- @@ -290,6 +308,19 @@ class LLMClient: return body + def _codex_event_to_dict(self, event) -> dict: + """Convert SDK ResponseStreamEvent to dict for existing consumers.""" + if hasattr(event, "model_dump"): + return event.model_dump() + return { + "type": getattr(event, "type", None), + "delta": getattr(event, "delta", None), + "item": getattr(event, "item", None), + "message": getattr(event, "message", None), + "arguments": getattr(event, "arguments", None), + "name": getattr(event, "name", None), + } + async def _stream_codex_raw( self, model: str, @@ -297,32 +328,42 @@ class LLMClient: tools: Optional[List[dict]] = None, ): """Async generator of raw SSE event dicts from the Codex Responses API.""" - headers = self._get_codex_headers() + print( + f"Codex raw stream: starting model={model} messages={len(messages)} tools_count={len(tools) if tools else 0}" + ) + client = self._get_codex_client() body = self._build_codex_body(model, messages, tools) + create_kwargs = {k: v for k, v in body.items() if k != "stream"} - async with httpx.AsyncClient(timeout=120.0) as http_client: - async with http_client.stream( - "POST", - "https://chatgpt.com/backend-api/codex/responses", - json=body, - headers=headers, - ) as resp: - if resp.status_code != 200: - error_text = await resp.aread() - raise HTTPException( - status_code=resp.status_code, - detail=f"Codex API error {resp.status_code}: {error_text.decode(errors='replace')[:400]}", - ) - async for line in resp.aiter_lines(): - if not line.startswith("data: "): - continue - data = line[6:].strip() - if not data or data == "[DONE]": - continue - try: - yield json.loads(data) - except json.JSONDecodeError: - continue + try: + stream = await client.responses.create( + stream=True, + **create_kwargs, + ) + print("Codex raw stream: create() returned, iterating events") + except (APIStatusError, OpenAIError) as e: + status = getattr(e, "status_code", 502) + detail = getattr(e, "message", str(e)) or str(e) + print(f"Codex raw stream: API error status={status} detail={detail[:200]}") + raise HTTPException( + status_code=status, + detail=f"Codex API error: {detail}"[:400], + ) from e + + event_count = 0 + event_types: dict[str, int] = {} + async for event in stream: + event_count += 1 + d = self._codex_event_to_dict(event) + et = d.get("type") or "unknown" + event_types[et] = event_types.get(et, 0) + 1 + if event_count <= 3 or et not in ( + "response.output_text.delta", + "response.function_call_arguments.delta", + ): + print(f"Codex raw stream: event #{event_count} type={et}") + yield d + print(f"Codex raw stream: finished event_count={event_count} types={event_types}") async def _stream_codex( self, @@ -479,10 +520,14 @@ class LLMClient: all_tools: list[dict] = [response_schema_tool] if tools: all_tools.extend(_to_responses_tools(tools)) + print( + f"Codex structured stream: start depth={depth} strict={strict} all_tools_count={len(all_tools)}" + ) has_response_schema_call = False tool_calls_by_id: dict[str, dict] = {} current_call_id: Optional[str] = None + chunks_yielded = 0 async for event in self._stream_codex_raw(model, messages, all_tools): event_type = event.get("type", "") @@ -492,28 +537,62 @@ class LLMClient: if item.get("type") == "function_call" and item.get("name") == "ResponseSchema": current_call_id = item.get("call_id", item.get("id")) has_response_schema_call = True + print(f"Codex structured stream: ResponseSchema output_item.added call_id={current_call_id}") elif event_type == "response.function_call_arguments.delta": if current_call_id is not None or has_response_schema_call: delta = event.get("delta", "") if delta: + chunks_yielded += 1 + if chunks_yielded <= 2: + print(f"Codex structured stream: yielding delta #{chunks_yielded} len={len(delta)}") yield delta + elif event_type == "response.function_call_arguments.done": + # Codex may send full arguments in one .done event instead of deltas + if event.get("name") == "ResponseSchema": + arguments = event.get("arguments", "") + if arguments: + chunks_yielded += 1 + print(f"Codex structured stream: ResponseSchema function_call_arguments.done len={len(arguments)}") + yield arguments + elif event_type == "response.output_item.done": item = event.get("item") or {} if item.get("type") == "function_call": tool_calls_by_id[item.get("call_id", item.get("id", ""))] = item + # Codex may send full arguments only on item done (no deltas) + if item.get("name") == "ResponseSchema": + arguments = item.get("arguments", "") + if arguments: + chunks_yielded += 1 + print(f"Codex structured stream: ResponseSchema output_item.done len={len(arguments)}") + yield arguments + + elif event_type == "response.output_text.delta": + # Fallback: model returned plain text (e.g. JSON) instead of ResponseSchema tool call + delta = event.get("delta", "") + if delta: + chunks_yielded += 1 + if chunks_yielded <= 2: + print(f"Codex structured stream: yielding output_text delta #{chunks_yielded} len={len(delta)}") + yield delta elif event_type in ("response.failed", "error"): msg_text = event.get("message") or str(event) + print(f"Codex structured stream: error event type={event_type} message={msg_text[:200]}") raise HTTPException(status_code=502, detail=f"Codex structured error: {msg_text}") + print( + f"Codex structured stream: end depth={depth} has_response_schema_call={has_response_schema_call} chunks_yielded={chunks_yielded} tool_calls_count={len(tool_calls_by_id)}" + ) # Handle non-ResponseSchema tool calls recursively other_tool_calls = { k: v for k, v in tool_calls_by_id.items() if v.get("name") != "ResponseSchema" } if other_tool_calls and tools and depth < 5: + print(f"Codex structured stream: recursing for other_tool_calls count={len(other_tool_calls)}") openai_calls = [ OpenAIToolCall( id=item.get("call_id", item.get("id", "")), @@ -558,7 +637,11 @@ class LLMClient: tools: Optional[List[dict]] = None, depth: int = 0, ) -> dict | None: - """Non-streaming structured output from Codex via ResponseSchema tool.""" + """Non-streaming structured output from Codex via ResponseSchema tool. + Codex API requires stream=True; we use the same client and stream, then + accumulate chunks (from deltas and .done events) and parse the result. + """ + print(f"Codex generate_structured: start depth={depth} strict={strict}") accumulated: list[str] = [] async for chunk in self._stream_codex_structured( model=model, @@ -572,10 +655,22 @@ class LLMClient: accumulated.append(chunk) raw = "".join(accumulated) + print( + f"Codex generate_structured: accumulated chunks={len(accumulated)} raw_len={len(raw)} depth={depth}" + ) if not raw: + print("Codex generate_structured: no content (raw empty), returning None") return None if depth == 0: - return dict(dirtyjson.loads(raw)) + try: + parsed = dict(dirtyjson.loads(raw)) + print(f"Codex generate_structured: parsed OK keys={list(parsed.keys())[:10]}") + return parsed + except Exception as parse_err: + print( + f"Codex generate_structured: parse failed raw_preview={raw[:200] if raw else ''} err={parse_err}" + ) + raise return raw # ? Prompts @@ -1234,6 +1329,9 @@ class LLMClient: max_tokens=max_tokens, ) case LLMProvider.CODEX: + print( + f"generate_structured (Codex): model={model} messages={len(messages)} strict={strict}" + ) content = await self._generate_codex_structured( model=model, messages=messages, @@ -1242,6 +1340,9 @@ class LLMClient: tools=parsed_tools, max_tokens=max_tokens, ) + print( + f"generate_structured (Codex): done content_is_none={content is None} content_keys={list(content.keys())[:8] if isinstance(content, dict) else None}" + ) case LLMProvider.GOOGLE: content = await self._generate_google_structured( model=model, diff --git a/servers/fastapi/utils/llm_calls/generate_slide_content.py b/servers/fastapi/utils/llm_calls/generate_slide_content.py index fcdb9f10..1c80d2a9 100644 --- a/servers/fastapi/utils/llm_calls/generate_slide_content.py +++ b/servers/fastapi/utils/llm_calls/generate_slide_content.py @@ -125,20 +125,28 @@ async def get_slide_content_from_type_and_outline( True, ) + messages = get_messages( + outline.content, + language, + tone, + verbosity, + instructions, + ) + print( + f"get_slide_content_from_type_and_outline: model={model} outline_len={len(outline.content or '')} language={language}" + ) try: response = await client.generate_structured( model=model, - messages=get_messages( - outline.content, - language, - tone, - verbosity, - instructions, - ), + messages=messages, response_format=response_schema, strict=False, ) + print( + f"get_slide_content_from_type_and_outline: response is None={response is None} keys={list(response.keys())[:6] if isinstance(response, dict) else None}" + ) return response except Exception as e: + print(f"get_slide_content_from_type_and_outline: exception={e}") raise handle_llm_client_exceptions(e)