feat: add CodexLLMAdapter for structured LLM calls and integrate with LLMClient

- Introduced CodexLLMAdapter to handle structured and unstructured LLM calls via the Codex Responses API.
- Implemented methods for converting tool formats and building request bodies for Codex.
- Refactored LLMClient to utilize CodexLLMAdapter, enhancing tool call handling and response processing.
- Updated LLMToolCallsHandler to support Codex as a valid LLM provider.
- Enhanced schema utilities for better JSON schema handling.
This commit is contained in:
sudipnext 2026-02-26 12:38:58 +05:45
parent 5f5482cfd5
commit d2e3ab9d15
4 changed files with 514 additions and 476 deletions

View file

@ -0,0 +1,431 @@
"""Codex (Responses API) adapter for structured and unstructured LLM calls.
Stateless adapter: receives AsyncOpenAI client and tool_calls_handler at call time.
Auth and client creation stay in LLMClient. Structure matches other providers:
generate = call API, collect content + tool_calls, recurse on tool_calls; stream = same but yield deltas.
Uses LLMToolCallsHandler directly: tools are parsed via parse_tools() in llm_client (handler supports
Codex and returns OpenAI-style dicts); this module flattens them for the Responses API. Tool execution
uses tool_calls_handler.handle_tool_calls_openai().
"""
import dirtyjson
from typing import Any, AsyncGenerator, List, Optional, Union
from fastapi import HTTPException
from openai import APIStatusError, AsyncOpenAI, OpenAIError
from models.llm_message import (
LLMMessage,
OpenAIAssistantMessage,
LLMSystemMessage,
LLMUserMessage,
)
from models.llm_tool_call import OpenAIToolCall, OpenAIToolCallFunction
from utils.schema_utils import ensure_strict_json_schema
# Responses API requires flat tool format: {"type":"function","name":...,"description":...,"parameters":...}
RESPONSE_SCHEMA_NAME = "ResponseSchema"
# Required tool choice for structured: force ResponseSchema (no plain-text fallback).
STRUCTURED_TOOL_CHOICE = {"type": "function", "name": RESPONSE_SCHEMA_NAME}
MAX_RECURSION_DEPTH = 5
def _to_responses_tools(chat_tools: List[dict]) -> List[dict]:
"""Convert Chat Completions tool format to flat Responses API format."""
result = []
for tool in chat_tools:
if tool.get("type") != "function":
result.append(tool)
continue
fn = tool.get("function") or tool
result.append({
"type": "function",
"name": fn.get("name", ""),
"description": fn.get("description", ""),
"parameters": fn.get("parameters", {}),
})
return result
def _items_to_openai_calls(items_by_id: dict[str, dict]) -> List[OpenAIToolCall]:
"""Build OpenAIToolCall list from Responses API output_item map."""
return [
OpenAIToolCall(
id=item.get("call_id", item.get("id", "")),
type="function",
function=OpenAIToolCallFunction(
name=item.get("name", ""),
arguments=item.get("arguments", "{}"),
),
)
for item in items_by_id.values()
]
async def _messages_after_tool_turn(
messages: List[LLMMessage],
items_by_id: dict[str, dict],
tool_calls_handler: Any,
) -> List[LLMMessage]:
"""Handle tool calls and return messages extended with assistant turn + tool results."""
openai_calls = _items_to_openai_calls(items_by_id)
tool_call_messages = await tool_calls_handler.handle_tool_calls_openai(openai_calls)
return [
*messages,
OpenAIAssistantMessage(
role="assistant",
content=None,
tool_calls=[tc.model_dump() for tc in openai_calls],
),
*tool_call_messages,
]
def _build_body(
model: str,
messages: List[LLMMessage],
tools: Optional[List[dict]] = None,
tool_choice: Optional[Union[str, dict]] = None,
) -> dict:
"""Build Responses API request body."""
instructions = None
input_messages = []
for msg in messages:
if isinstance(msg, LLMSystemMessage):
instructions = msg.content
elif isinstance(msg, LLMUserMessage):
input_messages.append({
"role": "user",
"content": [{"type": "input_text", "text": msg.content}],
})
elif isinstance(msg, OpenAIAssistantMessage):
text = msg.content or ""
if text:
input_messages.append({
"role": "assistant",
"content": [{"type": "output_text", "text": text}],
})
else:
text = getattr(msg, "content", "") or ""
if text:
input_messages.append({
"role": "user",
"content": [{"type": "input_text", "text": text}],
})
body: dict = {
"model": model,
"store": False,
"stream": True,
"text": {"verbosity": "medium"},
"include": ["reasoning.encrypted_content"],
"tool_choice": tool_choice if tool_choice is not None else "auto",
"parallel_tool_calls": True,
}
if instructions:
body["instructions"] = instructions
if input_messages:
body["input"] = input_messages
if tools:
body["tools"] = tools
return body
def _event_to_dict(event: Any) -> dict:
"""Convert SDK event to dict."""
if hasattr(event, "model_dump"):
return event.model_dump()
return {
"type": getattr(event, "type", None),
"delta": getattr(event, "delta", None),
"item": getattr(event, "item", None),
"message": getattr(event, "message", None),
"arguments": getattr(event, "arguments", None),
"name": getattr(event, "name", None),
}
async def _stream_raw(
client: AsyncOpenAI,
model: str,
messages: List[LLMMessage],
tools: Optional[List[dict]] = None,
tool_choice: Optional[Union[str, dict]] = None,
) -> AsyncGenerator[dict, None]:
"""Yield raw SSE event dicts from Codex Responses API."""
body = _build_body(model, messages, tools, tool_choice=tool_choice)
create_kwargs = {k: v for k, v in body.items() if k != "stream"}
try:
stream = await client.responses.create(stream=True, **create_kwargs)
except (APIStatusError, OpenAIError) as e:
status = getattr(e, "status_code", 502)
detail = getattr(e, "message", str(e)) or str(e)
raise HTTPException(
status_code=status,
detail=f"Codex API error: {detail}"[:400],
) from e
async for event in stream:
yield _event_to_dict(event)
class CodexLLMAdapter:
"""Stateless adapter for Codex Responses API. Matches other providers: generate/stream + tool recursion."""
@staticmethod
async def generate_codex(
client: AsyncOpenAI,
model: str,
messages: List[LLMMessage],
tool_calls_handler: Any,
max_tokens: Optional[int] = None,
tools: Optional[List[dict]] = None,
depth: int = 0,
) -> Optional[str]:
"""Generate text; on tool_calls handle and recurse (like _generate_openai / _generate_anthropic)."""
print(
f"Codex generate: model={model} depth={depth} tools_count={len(tools) if tools else 0}"
)
responses_tools = _to_responses_tools(tools) if tools else None
text_parts: List[str] = []
tool_calls_by_id: dict[str, dict] = {}
async for event in _stream_raw(client, model, messages, responses_tools, tool_choice=None):
event_type = event.get("type", "")
if event_type == "response.output_text.delta":
delta = event.get("delta", "")
if delta:
text_parts.append(delta)
elif event_type == "response.output_item.done":
item = event.get("item") or {}
if item.get("type") == "function_call":
tool_calls_by_id[item.get("call_id", item.get("id", ""))] = item
elif event_type in ("response.failed", "error"):
msg_text = event.get("message") or str(event)
raise HTTPException(status_code=502, detail=f"Codex error: {msg_text}")
if tool_calls_by_id and tools and depth < MAX_RECURSION_DEPTH:
print(
f"Codex generate: tool calls detected depth={depth} count={len(tool_calls_by_id)}"
)
new_messages = await _messages_after_tool_turn(
messages, tool_calls_by_id, tool_calls_handler
)
return await CodexLLMAdapter.generate_codex(
client, model, new_messages, tool_calls_handler,
max_tokens=max_tokens, tools=tools, depth=depth + 1,
)
return "".join(text_parts) or None
@staticmethod
async def stream_codex(
client: AsyncOpenAI,
model: str,
messages: List[LLMMessage],
tool_calls_handler: Any,
max_tokens: Optional[int] = None,
tools: Optional[List[dict]] = None,
depth: int = 0,
) -> AsyncGenerator[str, None]:
"""Stream text deltas; on tool_calls handle and recurse (like _stream_openai)."""
print(
f"Codex stream: model={model} depth={depth} tools_count={len(tools) if tools else 0}"
)
responses_tools = _to_responses_tools(tools) if tools else None
tool_calls_by_id: dict[str, dict] = {}
async for event in _stream_raw(client, model, messages, responses_tools, tool_choice=None):
event_type = event.get("type", "")
if event_type == "response.output_text.delta":
delta = event.get("delta", "")
if delta:
yield delta
elif event_type == "response.output_item.done":
item = event.get("item") or {}
if item.get("type") == "function_call":
tool_calls_by_id[item.get("call_id", item.get("id", ""))] = item
elif event_type in ("response.failed", "error"):
msg_text = event.get("message") or str(event)
raise HTTPException(status_code=502, detail=f"Codex stream error: {msg_text}")
if tool_calls_by_id and tools and depth < MAX_RECURSION_DEPTH:
print(
f"Codex stream: tool calls detected depth={depth} count={len(tool_calls_by_id)}"
)
new_messages = await _messages_after_tool_turn(
messages, tool_calls_by_id, tool_calls_handler
)
async for chunk in CodexLLMAdapter.stream_codex(
client, model, new_messages, tool_calls_handler,
max_tokens=max_tokens, tools=tools, depth=depth + 1,
):
yield chunk
@staticmethod
async def stream_codex_structured(
client: AsyncOpenAI,
model: str,
messages: List[LLMMessage],
response_format: dict,
tool_calls_handler: Any,
strict: bool = False,
max_tokens: Optional[int] = None,
tools: Optional[List[dict]] = None,
depth: int = 0,
) -> AsyncGenerator[str, None]:
"""Stream JSON chunks from ResponseSchema tool; recurse for other tool_calls.
Structured output is achieved by always adding an internal ResponseSchema "tool"
(with response_format as its parameters) and tool_choice=ResponseSchema. So
user_tools=0 only means no extra tools like web search; we still use the
ResponseSchema tool to receive the model's JSON.
"""
user_tools_count = len(tools) if tools else 0
print(
f"Codex stream_structured: model={model} depth={depth} strict={strict} "
f"user_tools={user_tools_count} (always adding ResponseSchema tool for structured JSON)"
)
schema = ensure_strict_json_schema(response_format, path=(), root=response_format) if strict and depth == 0 else response_format
response_schema_tool = {
"type": "function",
"name": RESPONSE_SCHEMA_NAME,
"description": "Provide response to the user",
"parameters": schema,
}
all_tools: List[dict] = [response_schema_tool]
if tools:
all_tools.extend(_to_responses_tools(tools))
tool_calls_by_id: dict[str, dict] = {}
current_call_id: Optional[str] = None
async for event in _stream_raw(
client, model, messages, all_tools, tool_choice=STRUCTURED_TOOL_CHOICE
):
event_type = event.get("type", "")
if event_type == "response.output_item.added":
item = event.get("item") or {}
if item.get("type") == "function_call" and item.get("name") == RESPONSE_SCHEMA_NAME:
current_call_id = item.get("call_id", item.get("id"))
print(
f"Codex stream_structured: ResponseSchema call started call_id={current_call_id}"
)
elif event_type == "response.function_call_arguments.delta":
if current_call_id is not None:
delta = event.get("delta", "")
if delta:
# Log only first few chunks to avoid log spam
print(
f"Codex stream_structured: ResponseSchema delta chunk len={len(delta)}"
)
yield delta
elif event_type == "response.function_call_arguments.done":
if event.get("name") == RESPONSE_SCHEMA_NAME:
arguments = event.get("arguments", "")
if arguments:
print(
f"Codex stream_structured: ResponseSchema arguments.done len={len(arguments)}"
)
yield arguments
elif event_type == "response.output_item.done":
item = event.get("item") or {}
if item.get("type") == "function_call":
tool_calls_by_id[item.get("call_id", item.get("id", ""))] = item
if item.get("name") == RESPONSE_SCHEMA_NAME:
arguments = item.get("arguments", "")
if arguments:
print(
f"Codex stream_structured: ResponseSchema output_item.done len={len(arguments)}"
)
yield arguments
elif event_type in ("response.failed", "error"):
msg_text = event.get("message") or str(event)
raise HTTPException(status_code=502, detail=f"Codex structured error: {msg_text}")
other_tool_calls = {
k: v for k, v in tool_calls_by_id.items()
if v.get("name") != RESPONSE_SCHEMA_NAME
}
if other_tool_calls and tools and depth < MAX_RECURSION_DEPTH:
print(
f"Codex stream_structured: recursing for non-ResponseSchema tool calls "
f"depth={depth} count={len(other_tool_calls)}"
)
new_messages = await _messages_after_tool_turn(
messages, other_tool_calls, tool_calls_handler
)
async for chunk in CodexLLMAdapter.stream_codex_structured(
client, model, new_messages, response_format, tool_calls_handler,
strict=strict, max_tokens=max_tokens, tools=tools, depth=depth + 1,
):
yield chunk
@staticmethod
async def generate_codex_structured(
client: AsyncOpenAI,
model: str,
messages: List[LLMMessage],
response_format: dict,
tool_calls_handler: Any,
strict: bool = False,
max_tokens: Optional[int] = None,
tools: Optional[List[dict]] = None,
depth: int = 0,
) -> Optional[dict]:
"""Collect stream and parse JSON (like _generate_openai_structured)."""
user_tools_count = len(tools) if tools else 0
print(
f"Codex generate_structured: model={model} depth={depth} strict={strict} "
f"user_tools={user_tools_count} (using ResponseSchema tool for structured JSON)"
)
accumulated: List[str] = []
async for chunk in CodexLLMAdapter.stream_codex_structured(
client, model, messages, response_format, tool_calls_handler,
strict=strict, max_tokens=max_tokens, tools=tools, depth=depth,
):
accumulated.append(chunk)
raw = "".join(accumulated)
if not raw:
return None
if depth == 0:
try:
parsed = dict(dirtyjson.loads(raw))
print(
f"Codex generate_structured: parsed JSON keys={list(parsed.keys())[:8]}"
)
return parsed
except Exception:
start = raw.find("{")
if start >= 0:
try:
parsed = dict(dirtyjson.loads(raw[start:]))
print(
"Codex generate_structured: parsed JSON from offset "
f"{start} keys={list(parsed.keys())[:8]}"
)
return parsed
except Exception:
pass
raise HTTPException(
status_code=502,
detail=(
"Model did not return valid structured output (expected JSON from ResponseSchema). "
"Please retry."
),
)
return None

View file

@ -1,7 +1,7 @@
import asyncio
import dirtyjson
import json
from typing import AsyncGenerator, List, Optional
from typing import Any, AsyncGenerator, List, Optional, Union
from fastapi import HTTPException
from openai import APIStatusError, AsyncOpenAI, OpenAIError
from openai.types.chat.chat_completion_chunk import (
@ -39,6 +39,7 @@ from models.llm_tool_call import (
OpenAIToolCallFunction,
)
from models.llm_tools import LLMDynamicTool, LLMTool
from services.codex_llm import CodexLLMAdapter
from services.llm_tool_calls_handler import LLMToolCallsHandler
from utils.async_iterator import iterator_to_async
from utils.dummy_functions import do_nothing_async
@ -72,27 +73,6 @@ from utils.schema_utils import (
)
def _to_responses_tools(chat_tools: List[dict]) -> List[dict]:
"""Convert Chat Completions tool format to flat Responses API format.
Chat Completions: {"type": "function", "function": {"name": ..., "description": ..., "parameters": ...}}
Responses API: {"type": "function", "name": ..., "description": ..., "parameters": ...}
"""
result = []
for tool in chat_tools:
if tool.get("type") != "function":
result.append(tool)
continue
fn = tool.get("function") or tool
result.append({
"type": "function",
"name": fn.get("name", ""),
"description": fn.get("description", ""),
"parameters": fn.get("parameters", {}),
})
return result
class LLMClient:
def __init__(self):
self.llm_provider = get_llm_provider()
@ -248,431 +228,6 @@ class LLMClient:
timeout=120.0,
)
# -------------------------------------------------------------------------
# Codex (Responses API) helpers
# -------------------------------------------------------------------------
def _build_codex_body(
self,
model: str,
messages: List[LLMMessage],
tools: Optional[List[dict]] = None,
) -> dict:
"""Convert LLMMessages to the Responses API request body for Codex."""
instructions = None
input_messages = []
for msg in messages:
if isinstance(msg, LLMSystemMessage):
instructions = msg.content
elif isinstance(msg, LLMUserMessage):
input_messages.append({
"role": "user",
"content": [{"type": "input_text", "text": msg.content}],
})
elif isinstance(msg, OpenAIAssistantMessage):
# Assistant turn — may carry text or tool-call data
text = msg.content or ""
if text:
input_messages.append({
"role": "assistant",
"content": [{"type": "output_text", "text": text}],
})
# Tool calls from a previous assistant turn are already resolved
# via the tool result messages that follow; skip raw tool call data.
else:
# Generic fallback: treat as user message
text = getattr(msg, "content", "") or ""
if text:
input_messages.append({
"role": "user",
"content": [{"type": "input_text", "text": text}],
})
body: dict = {
"model": model,
"store": False,
"stream": True,
"text": {"verbosity": "medium"},
"include": ["reasoning.encrypted_content"],
"tool_choice": "auto",
"parallel_tool_calls": True,
}
if instructions:
body["instructions"] = instructions
if input_messages:
body["input"] = input_messages
if tools:
# Responses API uses flat format: {"type":"function","name":...}
body["tools"] = tools
return body
def _codex_event_to_dict(self, event) -> dict:
"""Convert SDK ResponseStreamEvent to dict for existing consumers."""
if hasattr(event, "model_dump"):
return event.model_dump()
return {
"type": getattr(event, "type", None),
"delta": getattr(event, "delta", None),
"item": getattr(event, "item", None),
"message": getattr(event, "message", None),
"arguments": getattr(event, "arguments", None),
"name": getattr(event, "name", None),
}
async def _stream_codex_raw(
self,
model: str,
messages: List[LLMMessage],
tools: Optional[List[dict]] = None,
):
"""Async generator of raw SSE event dicts from the Codex Responses API."""
print(
f"Codex raw stream: starting model={model} messages={len(messages)} tools_count={len(tools) if tools else 0}"
)
client = self._get_codex_client()
body = self._build_codex_body(model, messages, tools)
create_kwargs = {k: v for k, v in body.items() if k != "stream"}
try:
stream = await client.responses.create(
stream=True,
**create_kwargs,
)
print("Codex raw stream: create() returned, iterating events")
except (APIStatusError, OpenAIError) as e:
status = getattr(e, "status_code", 502)
detail = getattr(e, "message", str(e)) or str(e)
print(f"Codex raw stream: API error status={status} detail={detail[:200]}")
raise HTTPException(
status_code=status,
detail=f"Codex API error: {detail}"[:400],
) from e
event_count = 0
event_types: dict[str, int] = {}
async for event in stream:
event_count += 1
d = self._codex_event_to_dict(event)
et = d.get("type") or "unknown"
event_types[et] = event_types.get(et, 0) + 1
if event_count <= 3 or et not in (
"response.output_text.delta",
"response.function_call_arguments.delta",
):
print(f"Codex raw stream: event #{event_count} type={et}")
yield d
print(f"Codex raw stream: finished event_count={event_count} types={event_types}")
async def _stream_codex(
self,
model: str,
messages: List[LLMMessage],
max_tokens: Optional[int] = None,
tools: Optional[List[dict]] = None,
depth: int = 0,
) -> AsyncGenerator[str, None]:
"""Stream text from the Codex Responses API, handling tool calls."""
# Convert Chat-Completions tool format to flat Responses API format
responses_tools = _to_responses_tools(tools) if tools else None
tool_calls_by_id: dict[str, dict] = {}
async for event in self._stream_codex_raw(model, messages, responses_tools):
event_type = event.get("type", "")
if event_type == "response.output_text.delta":
delta = event.get("delta", "")
if delta:
yield delta
elif event_type == "response.output_item.done":
item = event.get("item") or {}
if item.get("type") == "function_call":
tool_calls_by_id[item.get("call_id", item.get("id", ""))] = item
elif event_type in ("response.failed", "error"):
msg_text = event.get("message") or str(event)
raise HTTPException(status_code=502, detail=f"Codex stream error: {msg_text}")
if tool_calls_by_id and tools and depth < 5:
openai_calls = [
OpenAIToolCall(
id=item.get("call_id", item.get("id", "")),
type="function",
function=OpenAIToolCallFunction(
name=item.get("name", ""),
arguments=item.get("arguments", "{}"),
),
)
for item in tool_calls_by_id.values()
]
tool_call_messages = await self.tool_calls_handler.handle_tool_calls_openai(
openai_calls
)
new_messages = [
*messages,
OpenAIAssistantMessage(
role="assistant",
content=None,
tool_calls=[tc.model_dump() for tc in openai_calls],
),
*tool_call_messages,
]
async for chunk in self._stream_codex(
model=model,
messages=new_messages,
max_tokens=max_tokens,
tools=tools,
depth=depth + 1,
):
yield chunk
async def _generate_codex(
self,
model: str,
messages: List[LLMMessage],
max_tokens: Optional[int] = None,
tools: Optional[List[dict]] = None,
depth: int = 0,
) -> str | None:
"""Non-streaming text generation via Codex Responses API."""
responses_tools = _to_responses_tools(tools) if tools else None
text_parts: list[str] = []
tool_calls_by_id: dict[str, dict] = {}
async for event in self._stream_codex_raw(model, messages, responses_tools):
event_type = event.get("type", "")
if event_type == "response.output_text.delta":
delta = event.get("delta", "")
if delta:
text_parts.append(delta)
elif event_type == "response.output_item.done":
item = event.get("item") or {}
if item.get("type") == "function_call":
tool_calls_by_id[item.get("call_id", item.get("id", ""))] = item
elif event_type in ("response.failed", "error"):
msg_text = event.get("message") or str(event)
raise HTTPException(status_code=502, detail=f"Codex error: {msg_text}")
if tool_calls_by_id and tools and depth < 5:
openai_calls = [
OpenAIToolCall(
id=item.get("call_id", item.get("id", "")),
type="function",
function=OpenAIToolCallFunction(
name=item.get("name", ""),
arguments=item.get("arguments", "{}"),
),
)
for item in tool_calls_by_id.values()
]
tool_call_messages = await self.tool_calls_handler.handle_tool_calls_openai(
openai_calls
)
new_messages = [
*messages,
OpenAIAssistantMessage(
role="assistant",
content=None,
tool_calls=[tc.model_dump() for tc in openai_calls],
),
*tool_call_messages,
]
return await self._generate_codex(
model=model,
messages=new_messages,
max_tokens=max_tokens,
tools=tools,
depth=depth + 1,
)
return "".join(text_parts) or None
async def _stream_codex_structured(
self,
model: str,
messages: List[LLMMessage],
response_format: dict,
strict: bool = False,
max_tokens: Optional[int] = None,
tools: Optional[List[dict]] = None,
depth: int = 0,
) -> AsyncGenerator[str, None]:
"""Stream structured output from Codex via a ResponseSchema tool call."""
# Build the ResponseSchema tool in flat Responses API format
schema = response_format
if strict and depth == 0:
schema = ensure_strict_json_schema(schema, path=(), root=schema)
response_schema_tool = {
"type": "function",
"name": "ResponseSchema",
"description": "Provide response to the user",
"parameters": schema,
}
all_tools: list[dict] = [response_schema_tool]
if tools:
all_tools.extend(_to_responses_tools(tools))
print(
f"Codex structured stream: start depth={depth} strict={strict} all_tools_count={len(all_tools)}"
)
has_response_schema_call = False
tool_calls_by_id: dict[str, dict] = {}
current_call_id: Optional[str] = None
chunks_yielded = 0
async for event in self._stream_codex_raw(model, messages, all_tools):
event_type = event.get("type", "")
if event_type == "response.output_item.added":
item = event.get("item") or {}
if item.get("type") == "function_call" and item.get("name") == "ResponseSchema":
current_call_id = item.get("call_id", item.get("id"))
has_response_schema_call = True
print(f"Codex structured stream: ResponseSchema output_item.added call_id={current_call_id}")
elif event_type == "response.function_call_arguments.delta":
if current_call_id is not None or has_response_schema_call:
delta = event.get("delta", "")
if delta:
chunks_yielded += 1
if chunks_yielded <= 2:
print(f"Codex structured stream: yielding delta #{chunks_yielded} len={len(delta)}")
yield delta
elif event_type == "response.function_call_arguments.done":
# Codex may send full arguments in one .done event instead of deltas
if event.get("name") == "ResponseSchema":
arguments = event.get("arguments", "")
if arguments:
chunks_yielded += 1
print(f"Codex structured stream: ResponseSchema function_call_arguments.done len={len(arguments)}")
yield arguments
elif event_type == "response.output_item.done":
item = event.get("item") or {}
if item.get("type") == "function_call":
tool_calls_by_id[item.get("call_id", item.get("id", ""))] = item
# Codex may send full arguments only on item done (no deltas)
if item.get("name") == "ResponseSchema":
arguments = item.get("arguments", "")
if arguments:
chunks_yielded += 1
print(f"Codex structured stream: ResponseSchema output_item.done len={len(arguments)}")
yield arguments
elif event_type == "response.output_text.delta":
# Fallback: model returned plain text (e.g. JSON) instead of ResponseSchema tool call
delta = event.get("delta", "")
if delta:
chunks_yielded += 1
if chunks_yielded <= 2:
print(f"Codex structured stream: yielding output_text delta #{chunks_yielded} len={len(delta)}")
yield delta
elif event_type in ("response.failed", "error"):
msg_text = event.get("message") or str(event)
print(f"Codex structured stream: error event type={event_type} message={msg_text[:200]}")
raise HTTPException(status_code=502, detail=f"Codex structured error: {msg_text}")
print(
f"Codex structured stream: end depth={depth} has_response_schema_call={has_response_schema_call} chunks_yielded={chunks_yielded} tool_calls_count={len(tool_calls_by_id)}"
)
# Handle non-ResponseSchema tool calls recursively
other_tool_calls = {
k: v for k, v in tool_calls_by_id.items()
if v.get("name") != "ResponseSchema"
}
if other_tool_calls and tools and depth < 5:
print(f"Codex structured stream: recursing for other_tool_calls count={len(other_tool_calls)}")
openai_calls = [
OpenAIToolCall(
id=item.get("call_id", item.get("id", "")),
type="function",
function=OpenAIToolCallFunction(
name=item.get("name", ""),
arguments=item.get("arguments", "{}"),
),
)
for item in other_tool_calls.values()
]
tool_call_messages = await self.tool_calls_handler.handle_tool_calls_openai(
openai_calls
)
new_messages = [
*messages,
OpenAIAssistantMessage(
role="assistant",
content=None,
tool_calls=[tc.model_dump() for tc in openai_calls],
),
*tool_call_messages,
]
async for chunk in self._stream_codex_structured(
model=model,
messages=new_messages,
response_format=response_format,
strict=strict,
max_tokens=max_tokens,
tools=tools,
depth=depth + 1,
):
yield chunk
async def _generate_codex_structured(
self,
model: str,
messages: List[LLMMessage],
response_format: dict,
strict: bool = False,
max_tokens: Optional[int] = None,
tools: Optional[List[dict]] = None,
depth: int = 0,
) -> dict | None:
"""Non-streaming structured output from Codex via ResponseSchema tool.
Codex API requires stream=True; we use the same client and stream, then
accumulate chunks (from deltas and .done events) and parse the result.
"""
print(f"Codex generate_structured: start depth={depth} strict={strict}")
accumulated: list[str] = []
async for chunk in self._stream_codex_structured(
model=model,
messages=messages,
response_format=response_format,
strict=strict,
max_tokens=max_tokens,
tools=tools,
depth=depth,
):
accumulated.append(chunk)
raw = "".join(accumulated)
print(
f"Codex generate_structured: accumulated chunks={len(accumulated)} raw_len={len(raw)} depth={depth}"
)
if not raw:
print("Codex generate_structured: no content (raw empty), returning None")
return None
if depth == 0:
try:
parsed = dict(dirtyjson.loads(raw))
print(f"Codex generate_structured: parsed OK keys={list(parsed.keys())[:10]}")
return parsed
except Exception as parse_err:
print(
f"Codex generate_structured: parse failed raw_preview={raw[:200] if raw else ''} err={parse_err}"
)
raise
return raw
# ? Prompts
def _get_system_prompt(self, messages: List[LLMMessage]) -> str:
for message in messages:
@ -946,11 +501,14 @@ class LLMClient:
tools=parsed_tools,
)
case LLMProvider.CODEX:
content = await self._generate_codex(
model=model,
messages=messages,
max_tokens=max_tokens,
tools=parsed_tools,
print(
f"LLMClient.generate Codex: model={model} messages={len(messages)} "
f"user_tools={len(parsed_tools) if parsed_tools else 0}"
)
client = self._get_codex_client()
content = await CodexLLMAdapter.generate_codex(
client, model, messages, self.tool_calls_handler,
max_tokens=max_tokens, tools=parsed_tools,
)
case LLMProvider.GOOGLE:
content = await self._generate_google(
@ -1330,18 +888,18 @@ class LLMClient:
)
case LLMProvider.CODEX:
print(
f"generate_structured (Codex): model={model} messages={len(messages)} strict={strict}"
f"LLMClient.generate_structured Codex: model={model} messages={len(messages)} "
f"strict={strict} user_tools={len(parsed_tools) if parsed_tools else 0}"
)
content = await self._generate_codex_structured(
model=model,
messages=messages,
response_format=response_format,
strict=strict,
tools=parsed_tools,
max_tokens=max_tokens,
client = self._get_codex_client()
content = await CodexLLMAdapter.generate_codex_structured(
client, model, messages, response_format, self.tool_calls_handler,
strict=strict, max_tokens=max_tokens, tools=parsed_tools,
)
print(
f"generate_structured (Codex): done content_is_none={content is None} content_keys={list(content.keys())[:8] if isinstance(content, dict) else None}"
"LLMClient.generate_structured Codex: done "
f"content_is_none={content is None} "
f"content_keys={list(content.keys())[:8] if isinstance(content, dict) else None}"
)
case LLMProvider.GOOGLE:
content = await self._generate_google_structured(
@ -1661,11 +1219,14 @@ class LLMClient:
tools=parsed_tools,
)
case LLMProvider.CODEX:
return self._stream_codex(
model=model,
messages=messages,
max_tokens=max_tokens,
tools=parsed_tools,
print(
f"LLMClient.stream Codex: model={model} messages={len(messages)} "
f"user_tools={len(parsed_tools) if parsed_tools else 0}"
)
client = self._get_codex_client()
return CodexLLMAdapter.stream_codex(
client, model, messages, self.tool_calls_handler,
max_tokens=max_tokens, tools=parsed_tools,
)
case LLMProvider.GOOGLE:
return self._stream_google(
@ -2094,13 +1655,14 @@ class LLMClient:
max_tokens=max_tokens,
)
case LLMProvider.CODEX:
return self._stream_codex_structured(
model=model,
messages=messages,
response_format=response_format,
strict=strict,
tools=parsed_tools,
max_tokens=max_tokens,
print(
f"LLMClient.stream_structured Codex: model={model} messages={len(messages)} "
f"strict={strict} user_tools={len(parsed_tools) if parsed_tools else 0}"
)
client = self._get_codex_client()
return CodexLLMAdapter.stream_codex_structured(
client, model, messages, response_format, self.tool_calls_handler,
strict=strict, tools=parsed_tools, max_tokens=max_tokens,
)
case LLMProvider.GOOGLE:
return self._stream_google_structured(

View file

@ -55,7 +55,7 @@ class LLMToolCallsHandler:
self.dynamic_tools.append(tool)
match self.client.llm_provider:
case LLMProvider.OPENAI | LLMProvider.OLLAMA | LLMProvider.CUSTOM:
case LLMProvider.OPENAI | LLMProvider.OLLAMA | LLMProvider.CUSTOM | LLMProvider.CODEX:
return self.parse_tool_openai(tool, strict)
case LLMProvider.ANTHROPIC:
return self.parse_tool_anthropic(tool)
@ -63,7 +63,7 @@ class LLMToolCallsHandler:
return self.parse_tool_google(tool)
case _:
raise ValueError(
f"LLM provider must be either openai, anthropic, or google"
f"LLM provider must be one of: openai, anthropic, google, codex"
)
def parse_tool_openai(

View file

@ -1,5 +1,5 @@
from copy import deepcopy
from typing import Any, List
from typing import Any, List, Mapping, Union
from openai import NOT_GIVEN
@ -22,6 +22,51 @@ supported_string_formats = [
]
def _is_json_object(value: object) -> bool:
"""True if value is a dict-like object but not a list."""
return isinstance(value, Mapping) and not isinstance(value, list)
def _convert_pydantic_schema(schema: object) -> dict | None:
"""Return JSON schema dict from a Pydantic model or class, else None."""
if BaseModel is None:
return None
if isinstance(schema, BaseModel):
return schema.model_json_schema()
if isinstance(schema, type) and issubclass(schema, BaseModel):
return schema.model_json_schema()
if hasattr(schema, "model_json_schema"):
try:
return getattr(schema, "model_json_schema")()
except TypeError:
return None
return None
def normalize_output_schema(
schema: Union[dict, type, object] | None,
) -> dict[str, Any] | None:
"""
Normalize output schema to a plain JSON schema dict (SDK-style).
Accepts a JSON schema dict, a Pydantic model class, or a Pydantic instance.
Returns None if schema is None; otherwise returns a dict suitable for
ResponseSchema / structured output.
"""
if schema is None:
return None
converted = _convert_pydantic_schema(schema)
if converted is not None:
return converted
if not _is_json_object(schema):
raise ValueError(
"output_schema must be a plain JSON object (dict) or a Pydantic model"
)
return dict(schema)
def remove_fields_from_schema(schema: dict, fields_to_remove: List[str]):
schema = deepcopy(schema)
properties_paths = get_dict_paths_with_key(schema, "properties")