feat: enhance LLMClient with Codex integration and error handling

- Implemented a new method to create an AsyncOpenAI client specifically for Codex.
- Added functionality to convert Codex event responses into a dictionary format for easier consumption.
- Updated the streaming methods to utilize the new Codex client and handle API errors gracefully.
- Improved logging for debugging purposes during Codex interactions.
- Refactored message handling in the slide content generation function for clarity and efficiency.
This commit is contained in:
sudipnext 2026-02-25 17:56:39 +05:45
parent d2e85a8ffa
commit 5f5482cfd5
2 changed files with 145 additions and 36 deletions

View file

@ -1,10 +1,9 @@
import asyncio
import dirtyjson
import httpx
import json
from typing import AsyncGenerator, List, Optional
from fastapi import HTTPException
from openai import AsyncOpenAI
from openai import APIStatusError, AsyncOpenAI, OpenAIError
from openai.types.chat.chat_completion_chunk import (
ChatCompletionChunk as OpenAIChatCompletionChunk,
)
@ -133,7 +132,7 @@ class LLMClient:
case LLMProvider.CUSTOM:
return self._get_custom_client()
case LLMProvider.CODEX:
return None # Codex uses direct httpx calls, not self._client
return None # Codex uses _get_codex_client() with AsyncOpenAI, not self._client
case _:
raise HTTPException(
status_code=400,
@ -230,6 +229,25 @@ class LLMClient:
"accept": "text/event-stream",
}
def _get_codex_client(self) -> AsyncOpenAI:
"""Return an AsyncOpenAI client configured for the Codex Responses API.
Client is built per call so headers/token are fresh after refresh.
Only Codex-specific headers are passed; content-type and accept are left
to the SDK so the server does not reject the request.
"""
headers = self._get_codex_headers()
access_token = (headers.get("Authorization") or "").replace("Bearer ", "").strip()
skip = {"authorization", "content-type", "accept"}
default_headers = {
k: v for k, v in headers.items() if k.lower() not in skip
}
return AsyncOpenAI(
base_url="https://chatgpt.com/backend-api/codex",
api_key=access_token or "codex",
default_headers=default_headers,
timeout=120.0,
)
# -------------------------------------------------------------------------
# Codex (Responses API) helpers
# -------------------------------------------------------------------------
@ -290,6 +308,19 @@ class LLMClient:
return body
def _codex_event_to_dict(self, event) -> dict:
"""Convert SDK ResponseStreamEvent to dict for existing consumers."""
if hasattr(event, "model_dump"):
return event.model_dump()
return {
"type": getattr(event, "type", None),
"delta": getattr(event, "delta", None),
"item": getattr(event, "item", None),
"message": getattr(event, "message", None),
"arguments": getattr(event, "arguments", None),
"name": getattr(event, "name", None),
}
async def _stream_codex_raw(
self,
model: str,
@ -297,32 +328,42 @@ class LLMClient:
tools: Optional[List[dict]] = None,
):
"""Async generator of raw SSE event dicts from the Codex Responses API."""
headers = self._get_codex_headers()
print(
f"Codex raw stream: starting model={model} messages={len(messages)} tools_count={len(tools) if tools else 0}"
)
client = self._get_codex_client()
body = self._build_codex_body(model, messages, tools)
create_kwargs = {k: v for k, v in body.items() if k != "stream"}
async with httpx.AsyncClient(timeout=120.0) as http_client:
async with http_client.stream(
"POST",
"https://chatgpt.com/backend-api/codex/responses",
json=body,
headers=headers,
) as resp:
if resp.status_code != 200:
error_text = await resp.aread()
raise HTTPException(
status_code=resp.status_code,
detail=f"Codex API error {resp.status_code}: {error_text.decode(errors='replace')[:400]}",
)
async for line in resp.aiter_lines():
if not line.startswith("data: "):
continue
data = line[6:].strip()
if not data or data == "[DONE]":
continue
try:
yield json.loads(data)
except json.JSONDecodeError:
continue
try:
stream = await client.responses.create(
stream=True,
**create_kwargs,
)
print("Codex raw stream: create() returned, iterating events")
except (APIStatusError, OpenAIError) as e:
status = getattr(e, "status_code", 502)
detail = getattr(e, "message", str(e)) or str(e)
print(f"Codex raw stream: API error status={status} detail={detail[:200]}")
raise HTTPException(
status_code=status,
detail=f"Codex API error: {detail}"[:400],
) from e
event_count = 0
event_types: dict[str, int] = {}
async for event in stream:
event_count += 1
d = self._codex_event_to_dict(event)
et = d.get("type") or "unknown"
event_types[et] = event_types.get(et, 0) + 1
if event_count <= 3 or et not in (
"response.output_text.delta",
"response.function_call_arguments.delta",
):
print(f"Codex raw stream: event #{event_count} type={et}")
yield d
print(f"Codex raw stream: finished event_count={event_count} types={event_types}")
async def _stream_codex(
self,
@ -479,10 +520,14 @@ class LLMClient:
all_tools: list[dict] = [response_schema_tool]
if tools:
all_tools.extend(_to_responses_tools(tools))
print(
f"Codex structured stream: start depth={depth} strict={strict} all_tools_count={len(all_tools)}"
)
has_response_schema_call = False
tool_calls_by_id: dict[str, dict] = {}
current_call_id: Optional[str] = None
chunks_yielded = 0
async for event in self._stream_codex_raw(model, messages, all_tools):
event_type = event.get("type", "")
@ -492,28 +537,62 @@ class LLMClient:
if item.get("type") == "function_call" and item.get("name") == "ResponseSchema":
current_call_id = item.get("call_id", item.get("id"))
has_response_schema_call = True
print(f"Codex structured stream: ResponseSchema output_item.added call_id={current_call_id}")
elif event_type == "response.function_call_arguments.delta":
if current_call_id is not None or has_response_schema_call:
delta = event.get("delta", "")
if delta:
chunks_yielded += 1
if chunks_yielded <= 2:
print(f"Codex structured stream: yielding delta #{chunks_yielded} len={len(delta)}")
yield delta
elif event_type == "response.function_call_arguments.done":
# Codex may send full arguments in one .done event instead of deltas
if event.get("name") == "ResponseSchema":
arguments = event.get("arguments", "")
if arguments:
chunks_yielded += 1
print(f"Codex structured stream: ResponseSchema function_call_arguments.done len={len(arguments)}")
yield arguments
elif event_type == "response.output_item.done":
item = event.get("item") or {}
if item.get("type") == "function_call":
tool_calls_by_id[item.get("call_id", item.get("id", ""))] = item
# Codex may send full arguments only on item done (no deltas)
if item.get("name") == "ResponseSchema":
arguments = item.get("arguments", "")
if arguments:
chunks_yielded += 1
print(f"Codex structured stream: ResponseSchema output_item.done len={len(arguments)}")
yield arguments
elif event_type == "response.output_text.delta":
# Fallback: model returned plain text (e.g. JSON) instead of ResponseSchema tool call
delta = event.get("delta", "")
if delta:
chunks_yielded += 1
if chunks_yielded <= 2:
print(f"Codex structured stream: yielding output_text delta #{chunks_yielded} len={len(delta)}")
yield delta
elif event_type in ("response.failed", "error"):
msg_text = event.get("message") or str(event)
print(f"Codex structured stream: error event type={event_type} message={msg_text[:200]}")
raise HTTPException(status_code=502, detail=f"Codex structured error: {msg_text}")
print(
f"Codex structured stream: end depth={depth} has_response_schema_call={has_response_schema_call} chunks_yielded={chunks_yielded} tool_calls_count={len(tool_calls_by_id)}"
)
# Handle non-ResponseSchema tool calls recursively
other_tool_calls = {
k: v for k, v in tool_calls_by_id.items()
if v.get("name") != "ResponseSchema"
}
if other_tool_calls and tools and depth < 5:
print(f"Codex structured stream: recursing for other_tool_calls count={len(other_tool_calls)}")
openai_calls = [
OpenAIToolCall(
id=item.get("call_id", item.get("id", "")),
@ -558,7 +637,11 @@ class LLMClient:
tools: Optional[List[dict]] = None,
depth: int = 0,
) -> dict | None:
"""Non-streaming structured output from Codex via ResponseSchema tool."""
"""Non-streaming structured output from Codex via ResponseSchema tool.
Codex API requires stream=True; we use the same client and stream, then
accumulate chunks (from deltas and .done events) and parse the result.
"""
print(f"Codex generate_structured: start depth={depth} strict={strict}")
accumulated: list[str] = []
async for chunk in self._stream_codex_structured(
model=model,
@ -572,10 +655,22 @@ class LLMClient:
accumulated.append(chunk)
raw = "".join(accumulated)
print(
f"Codex generate_structured: accumulated chunks={len(accumulated)} raw_len={len(raw)} depth={depth}"
)
if not raw:
print("Codex generate_structured: no content (raw empty), returning None")
return None
if depth == 0:
return dict(dirtyjson.loads(raw))
try:
parsed = dict(dirtyjson.loads(raw))
print(f"Codex generate_structured: parsed OK keys={list(parsed.keys())[:10]}")
return parsed
except Exception as parse_err:
print(
f"Codex generate_structured: parse failed raw_preview={raw[:200] if raw else ''} err={parse_err}"
)
raise
return raw
# ? Prompts
@ -1234,6 +1329,9 @@ class LLMClient:
max_tokens=max_tokens,
)
case LLMProvider.CODEX:
print(
f"generate_structured (Codex): model={model} messages={len(messages)} strict={strict}"
)
content = await self._generate_codex_structured(
model=model,
messages=messages,
@ -1242,6 +1340,9 @@ class LLMClient:
tools=parsed_tools,
max_tokens=max_tokens,
)
print(
f"generate_structured (Codex): done content_is_none={content is None} content_keys={list(content.keys())[:8] if isinstance(content, dict) else None}"
)
case LLMProvider.GOOGLE:
content = await self._generate_google_structured(
model=model,

View file

@ -125,20 +125,28 @@ async def get_slide_content_from_type_and_outline(
True,
)
messages = get_messages(
outline.content,
language,
tone,
verbosity,
instructions,
)
print(
f"get_slide_content_from_type_and_outline: model={model} outline_len={len(outline.content or '')} language={language}"
)
try:
response = await client.generate_structured(
model=model,
messages=get_messages(
outline.content,
language,
tone,
verbosity,
instructions,
),
messages=messages,
response_format=response_schema,
strict=False,
)
print(
f"get_slide_content_from_type_and_outline: response is None={response is None} keys={list(response.keys())[:6] if isinstance(response, dict) else None}"
)
return response
except Exception as e:
print(f"get_slide_content_from_type_and_outline: exception={e}")
raise handle_llm_client_exceptions(e)