From d2e3ab9d15d923ed53348279f5e24bbeb21d6822 Mon Sep 17 00:00:00 2001 From: sudipnext Date: Thu, 26 Feb 2026 12:38:58 +0545 Subject: [PATCH] feat: add CodexLLMAdapter for structured LLM calls and integrate with LLMClient - Introduced CodexLLMAdapter to handle structured and unstructured LLM calls via the Codex Responses API. - Implemented methods for converting tool formats and building request bodies for Codex. - Refactored LLMClient to utilize CodexLLMAdapter, enhancing tool call handling and response processing. - Updated LLMToolCallsHandler to support Codex as a valid LLM provider. - Enhanced schema utilities for better JSON schema handling. --- servers/fastapi/services/codex_llm.py | 431 +++++++++++++++ servers/fastapi/services/llm_client.py | 508 ++---------------- .../services/llm_tool_calls_handler.py | 4 +- servers/fastapi/utils/schema_utils.py | 47 +- 4 files changed, 514 insertions(+), 476 deletions(-) create mode 100644 servers/fastapi/services/codex_llm.py diff --git a/servers/fastapi/services/codex_llm.py b/servers/fastapi/services/codex_llm.py new file mode 100644 index 00000000..a94313f9 --- /dev/null +++ b/servers/fastapi/services/codex_llm.py @@ -0,0 +1,431 @@ +"""Codex (Responses API) adapter for structured and unstructured LLM calls. + +Stateless adapter: receives AsyncOpenAI client and tool_calls_handler at call time. +Auth and client creation stay in LLMClient. Structure matches other providers: +generate = call API, collect content + tool_calls, recurse on tool_calls; stream = same but yield deltas. + +Uses LLMToolCallsHandler directly: tools are parsed via parse_tools() in llm_client (handler supports +Codex and returns OpenAI-style dicts); this module flattens them for the Responses API. Tool execution +uses tool_calls_handler.handle_tool_calls_openai(). +""" + +import dirtyjson +from typing import Any, AsyncGenerator, List, Optional, Union + +from fastapi import HTTPException +from openai import APIStatusError, AsyncOpenAI, OpenAIError + +from models.llm_message import ( + LLMMessage, + OpenAIAssistantMessage, + LLMSystemMessage, + LLMUserMessage, +) +from models.llm_tool_call import OpenAIToolCall, OpenAIToolCallFunction +from utils.schema_utils import ensure_strict_json_schema + +# Responses API requires flat tool format: {"type":"function","name":...,"description":...,"parameters":...} +RESPONSE_SCHEMA_NAME = "ResponseSchema" +# Required tool choice for structured: force ResponseSchema (no plain-text fallback). +STRUCTURED_TOOL_CHOICE = {"type": "function", "name": RESPONSE_SCHEMA_NAME} +MAX_RECURSION_DEPTH = 5 + + +def _to_responses_tools(chat_tools: List[dict]) -> List[dict]: + """Convert Chat Completions tool format to flat Responses API format.""" + result = [] + for tool in chat_tools: + if tool.get("type") != "function": + result.append(tool) + continue + fn = tool.get("function") or tool + result.append({ + "type": "function", + "name": fn.get("name", ""), + "description": fn.get("description", ""), + "parameters": fn.get("parameters", {}), + }) + return result + + +def _items_to_openai_calls(items_by_id: dict[str, dict]) -> List[OpenAIToolCall]: + """Build OpenAIToolCall list from Responses API output_item map.""" + return [ + OpenAIToolCall( + id=item.get("call_id", item.get("id", "")), + type="function", + function=OpenAIToolCallFunction( + name=item.get("name", ""), + arguments=item.get("arguments", "{}"), + ), + ) + for item in items_by_id.values() + ] + + +async def _messages_after_tool_turn( + messages: List[LLMMessage], + items_by_id: dict[str, dict], + tool_calls_handler: Any, +) -> List[LLMMessage]: + """Handle tool calls and return messages extended with assistant turn + tool results.""" + openai_calls = _items_to_openai_calls(items_by_id) + tool_call_messages = await tool_calls_handler.handle_tool_calls_openai(openai_calls) + return [ + *messages, + OpenAIAssistantMessage( + role="assistant", + content=None, + tool_calls=[tc.model_dump() for tc in openai_calls], + ), + *tool_call_messages, + ] + + +def _build_body( + model: str, + messages: List[LLMMessage], + tools: Optional[List[dict]] = None, + tool_choice: Optional[Union[str, dict]] = None, +) -> dict: + """Build Responses API request body.""" + instructions = None + input_messages = [] + + for msg in messages: + if isinstance(msg, LLMSystemMessage): + instructions = msg.content + elif isinstance(msg, LLMUserMessage): + input_messages.append({ + "role": "user", + "content": [{"type": "input_text", "text": msg.content}], + }) + elif isinstance(msg, OpenAIAssistantMessage): + text = msg.content or "" + if text: + input_messages.append({ + "role": "assistant", + "content": [{"type": "output_text", "text": text}], + }) + else: + text = getattr(msg, "content", "") or "" + if text: + input_messages.append({ + "role": "user", + "content": [{"type": "input_text", "text": text}], + }) + + body: dict = { + "model": model, + "store": False, + "stream": True, + "text": {"verbosity": "medium"}, + "include": ["reasoning.encrypted_content"], + "tool_choice": tool_choice if tool_choice is not None else "auto", + "parallel_tool_calls": True, + } + if instructions: + body["instructions"] = instructions + if input_messages: + body["input"] = input_messages + if tools: + body["tools"] = tools + + return body + + +def _event_to_dict(event: Any) -> dict: + """Convert SDK event to dict.""" + if hasattr(event, "model_dump"): + return event.model_dump() + return { + "type": getattr(event, "type", None), + "delta": getattr(event, "delta", None), + "item": getattr(event, "item", None), + "message": getattr(event, "message", None), + "arguments": getattr(event, "arguments", None), + "name": getattr(event, "name", None), + } + + +async def _stream_raw( + client: AsyncOpenAI, + model: str, + messages: List[LLMMessage], + tools: Optional[List[dict]] = None, + tool_choice: Optional[Union[str, dict]] = None, +) -> AsyncGenerator[dict, None]: + """Yield raw SSE event dicts from Codex Responses API.""" + body = _build_body(model, messages, tools, tool_choice=tool_choice) + create_kwargs = {k: v for k, v in body.items() if k != "stream"} + + try: + stream = await client.responses.create(stream=True, **create_kwargs) + except (APIStatusError, OpenAIError) as e: + status = getattr(e, "status_code", 502) + detail = getattr(e, "message", str(e)) or str(e) + raise HTTPException( + status_code=status, + detail=f"Codex API error: {detail}"[:400], + ) from e + + async for event in stream: + yield _event_to_dict(event) + + +class CodexLLMAdapter: + """Stateless adapter for Codex Responses API. Matches other providers: generate/stream + tool recursion.""" + + @staticmethod + async def generate_codex( + client: AsyncOpenAI, + model: str, + messages: List[LLMMessage], + tool_calls_handler: Any, + max_tokens: Optional[int] = None, + tools: Optional[List[dict]] = None, + depth: int = 0, + ) -> Optional[str]: + """Generate text; on tool_calls handle and recurse (like _generate_openai / _generate_anthropic).""" + print( + f"Codex generate: model={model} depth={depth} tools_count={len(tools) if tools else 0}" + ) + responses_tools = _to_responses_tools(tools) if tools else None + text_parts: List[str] = [] + tool_calls_by_id: dict[str, dict] = {} + + async for event in _stream_raw(client, model, messages, responses_tools, tool_choice=None): + event_type = event.get("type", "") + + if event_type == "response.output_text.delta": + delta = event.get("delta", "") + if delta: + text_parts.append(delta) + elif event_type == "response.output_item.done": + item = event.get("item") or {} + if item.get("type") == "function_call": + tool_calls_by_id[item.get("call_id", item.get("id", ""))] = item + elif event_type in ("response.failed", "error"): + msg_text = event.get("message") or str(event) + raise HTTPException(status_code=502, detail=f"Codex error: {msg_text}") + + if tool_calls_by_id and tools and depth < MAX_RECURSION_DEPTH: + print( + f"Codex generate: tool calls detected depth={depth} count={len(tool_calls_by_id)}" + ) + new_messages = await _messages_after_tool_turn( + messages, tool_calls_by_id, tool_calls_handler + ) + return await CodexLLMAdapter.generate_codex( + client, model, new_messages, tool_calls_handler, + max_tokens=max_tokens, tools=tools, depth=depth + 1, + ) + + return "".join(text_parts) or None + + @staticmethod + async def stream_codex( + client: AsyncOpenAI, + model: str, + messages: List[LLMMessage], + tool_calls_handler: Any, + max_tokens: Optional[int] = None, + tools: Optional[List[dict]] = None, + depth: int = 0, + ) -> AsyncGenerator[str, None]: + """Stream text deltas; on tool_calls handle and recurse (like _stream_openai).""" + print( + f"Codex stream: model={model} depth={depth} tools_count={len(tools) if tools else 0}" + ) + responses_tools = _to_responses_tools(tools) if tools else None + tool_calls_by_id: dict[str, dict] = {} + + async for event in _stream_raw(client, model, messages, responses_tools, tool_choice=None): + event_type = event.get("type", "") + + if event_type == "response.output_text.delta": + delta = event.get("delta", "") + if delta: + yield delta + elif event_type == "response.output_item.done": + item = event.get("item") or {} + if item.get("type") == "function_call": + tool_calls_by_id[item.get("call_id", item.get("id", ""))] = item + elif event_type in ("response.failed", "error"): + msg_text = event.get("message") or str(event) + raise HTTPException(status_code=502, detail=f"Codex stream error: {msg_text}") + + if tool_calls_by_id and tools and depth < MAX_RECURSION_DEPTH: + print( + f"Codex stream: tool calls detected depth={depth} count={len(tool_calls_by_id)}" + ) + new_messages = await _messages_after_tool_turn( + messages, tool_calls_by_id, tool_calls_handler + ) + async for chunk in CodexLLMAdapter.stream_codex( + client, model, new_messages, tool_calls_handler, + max_tokens=max_tokens, tools=tools, depth=depth + 1, + ): + yield chunk + + @staticmethod + async def stream_codex_structured( + client: AsyncOpenAI, + model: str, + messages: List[LLMMessage], + response_format: dict, + tool_calls_handler: Any, + strict: bool = False, + max_tokens: Optional[int] = None, + tools: Optional[List[dict]] = None, + depth: int = 0, + ) -> AsyncGenerator[str, None]: + """Stream JSON chunks from ResponseSchema tool; recurse for other tool_calls. + + Structured output is achieved by always adding an internal ResponseSchema "tool" + (with response_format as its parameters) and tool_choice=ResponseSchema. So + user_tools=0 only means no extra tools like web search; we still use the + ResponseSchema tool to receive the model's JSON. + """ + user_tools_count = len(tools) if tools else 0 + print( + f"Codex stream_structured: model={model} depth={depth} strict={strict} " + f"user_tools={user_tools_count} (always adding ResponseSchema tool for structured JSON)" + ) + schema = ensure_strict_json_schema(response_format, path=(), root=response_format) if strict and depth == 0 else response_format + response_schema_tool = { + "type": "function", + "name": RESPONSE_SCHEMA_NAME, + "description": "Provide response to the user", + "parameters": schema, + } + all_tools: List[dict] = [response_schema_tool] + if tools: + all_tools.extend(_to_responses_tools(tools)) + + tool_calls_by_id: dict[str, dict] = {} + current_call_id: Optional[str] = None + + async for event in _stream_raw( + client, model, messages, all_tools, tool_choice=STRUCTURED_TOOL_CHOICE + ): + event_type = event.get("type", "") + + if event_type == "response.output_item.added": + item = event.get("item") or {} + if item.get("type") == "function_call" and item.get("name") == RESPONSE_SCHEMA_NAME: + current_call_id = item.get("call_id", item.get("id")) + print( + f"Codex stream_structured: ResponseSchema call started call_id={current_call_id}" + ) + + elif event_type == "response.function_call_arguments.delta": + if current_call_id is not None: + delta = event.get("delta", "") + if delta: + # Log only first few chunks to avoid log spam + print( + f"Codex stream_structured: ResponseSchema delta chunk len={len(delta)}" + ) + yield delta + + elif event_type == "response.function_call_arguments.done": + if event.get("name") == RESPONSE_SCHEMA_NAME: + arguments = event.get("arguments", "") + if arguments: + print( + f"Codex stream_structured: ResponseSchema arguments.done len={len(arguments)}" + ) + yield arguments + + elif event_type == "response.output_item.done": + item = event.get("item") or {} + if item.get("type") == "function_call": + tool_calls_by_id[item.get("call_id", item.get("id", ""))] = item + if item.get("name") == RESPONSE_SCHEMA_NAME: + arguments = item.get("arguments", "") + if arguments: + print( + f"Codex stream_structured: ResponseSchema output_item.done len={len(arguments)}" + ) + yield arguments + + elif event_type in ("response.failed", "error"): + msg_text = event.get("message") or str(event) + raise HTTPException(status_code=502, detail=f"Codex structured error: {msg_text}") + + other_tool_calls = { + k: v for k, v in tool_calls_by_id.items() + if v.get("name") != RESPONSE_SCHEMA_NAME + } + if other_tool_calls and tools and depth < MAX_RECURSION_DEPTH: + print( + f"Codex stream_structured: recursing for non-ResponseSchema tool calls " + f"depth={depth} count={len(other_tool_calls)}" + ) + new_messages = await _messages_after_tool_turn( + messages, other_tool_calls, tool_calls_handler + ) + async for chunk in CodexLLMAdapter.stream_codex_structured( + client, model, new_messages, response_format, tool_calls_handler, + strict=strict, max_tokens=max_tokens, tools=tools, depth=depth + 1, + ): + yield chunk + + @staticmethod + async def generate_codex_structured( + client: AsyncOpenAI, + model: str, + messages: List[LLMMessage], + response_format: dict, + tool_calls_handler: Any, + strict: bool = False, + max_tokens: Optional[int] = None, + tools: Optional[List[dict]] = None, + depth: int = 0, + ) -> Optional[dict]: + """Collect stream and parse JSON (like _generate_openai_structured).""" + user_tools_count = len(tools) if tools else 0 + print( + f"Codex generate_structured: model={model} depth={depth} strict={strict} " + f"user_tools={user_tools_count} (using ResponseSchema tool for structured JSON)" + ) + accumulated: List[str] = [] + async for chunk in CodexLLMAdapter.stream_codex_structured( + client, model, messages, response_format, tool_calls_handler, + strict=strict, max_tokens=max_tokens, tools=tools, depth=depth, + ): + accumulated.append(chunk) + + raw = "".join(accumulated) + if not raw: + return None + + if depth == 0: + try: + parsed = dict(dirtyjson.loads(raw)) + print( + f"Codex generate_structured: parsed JSON keys={list(parsed.keys())[:8]}" + ) + return parsed + except Exception: + start = raw.find("{") + if start >= 0: + try: + parsed = dict(dirtyjson.loads(raw[start:])) + print( + "Codex generate_structured: parsed JSON from offset " + f"{start} keys={list(parsed.keys())[:8]}" + ) + return parsed + except Exception: + pass + raise HTTPException( + status_code=502, + detail=( + "Model did not return valid structured output (expected JSON from ResponseSchema). " + "Please retry." + ), + ) + + return None diff --git a/servers/fastapi/services/llm_client.py b/servers/fastapi/services/llm_client.py index b47760dd..3353eca0 100644 --- a/servers/fastapi/services/llm_client.py +++ b/servers/fastapi/services/llm_client.py @@ -1,7 +1,7 @@ import asyncio import dirtyjson import json -from typing import AsyncGenerator, List, Optional +from typing import Any, AsyncGenerator, List, Optional, Union from fastapi import HTTPException from openai import APIStatusError, AsyncOpenAI, OpenAIError from openai.types.chat.chat_completion_chunk import ( @@ -39,6 +39,7 @@ from models.llm_tool_call import ( OpenAIToolCallFunction, ) from models.llm_tools import LLMDynamicTool, LLMTool +from services.codex_llm import CodexLLMAdapter from services.llm_tool_calls_handler import LLMToolCallsHandler from utils.async_iterator import iterator_to_async from utils.dummy_functions import do_nothing_async @@ -72,27 +73,6 @@ from utils.schema_utils import ( ) -def _to_responses_tools(chat_tools: List[dict]) -> List[dict]: - """Convert Chat Completions tool format to flat Responses API format. - - Chat Completions: {"type": "function", "function": {"name": ..., "description": ..., "parameters": ...}} - Responses API: {"type": "function", "name": ..., "description": ..., "parameters": ...} - """ - result = [] - for tool in chat_tools: - if tool.get("type") != "function": - result.append(tool) - continue - fn = tool.get("function") or tool - result.append({ - "type": "function", - "name": fn.get("name", ""), - "description": fn.get("description", ""), - "parameters": fn.get("parameters", {}), - }) - return result - - class LLMClient: def __init__(self): self.llm_provider = get_llm_provider() @@ -248,431 +228,6 @@ class LLMClient: timeout=120.0, ) - # ------------------------------------------------------------------------- - # Codex (Responses API) helpers - # ------------------------------------------------------------------------- - - def _build_codex_body( - self, - model: str, - messages: List[LLMMessage], - tools: Optional[List[dict]] = None, - ) -> dict: - """Convert LLMMessages to the Responses API request body for Codex.""" - instructions = None - input_messages = [] - - for msg in messages: - if isinstance(msg, LLMSystemMessage): - instructions = msg.content - elif isinstance(msg, LLMUserMessage): - input_messages.append({ - "role": "user", - "content": [{"type": "input_text", "text": msg.content}], - }) - elif isinstance(msg, OpenAIAssistantMessage): - # Assistant turn — may carry text or tool-call data - text = msg.content or "" - if text: - input_messages.append({ - "role": "assistant", - "content": [{"type": "output_text", "text": text}], - }) - # Tool calls from a previous assistant turn are already resolved - # via the tool result messages that follow; skip raw tool call data. - else: - # Generic fallback: treat as user message - text = getattr(msg, "content", "") or "" - if text: - input_messages.append({ - "role": "user", - "content": [{"type": "input_text", "text": text}], - }) - - body: dict = { - "model": model, - "store": False, - "stream": True, - "text": {"verbosity": "medium"}, - "include": ["reasoning.encrypted_content"], - "tool_choice": "auto", - "parallel_tool_calls": True, - } - if instructions: - body["instructions"] = instructions - if input_messages: - body["input"] = input_messages - if tools: - # Responses API uses flat format: {"type":"function","name":...} - body["tools"] = tools - - return body - - def _codex_event_to_dict(self, event) -> dict: - """Convert SDK ResponseStreamEvent to dict for existing consumers.""" - if hasattr(event, "model_dump"): - return event.model_dump() - return { - "type": getattr(event, "type", None), - "delta": getattr(event, "delta", None), - "item": getattr(event, "item", None), - "message": getattr(event, "message", None), - "arguments": getattr(event, "arguments", None), - "name": getattr(event, "name", None), - } - - async def _stream_codex_raw( - self, - model: str, - messages: List[LLMMessage], - tools: Optional[List[dict]] = None, - ): - """Async generator of raw SSE event dicts from the Codex Responses API.""" - print( - f"Codex raw stream: starting model={model} messages={len(messages)} tools_count={len(tools) if tools else 0}" - ) - client = self._get_codex_client() - body = self._build_codex_body(model, messages, tools) - create_kwargs = {k: v for k, v in body.items() if k != "stream"} - - try: - stream = await client.responses.create( - stream=True, - **create_kwargs, - ) - print("Codex raw stream: create() returned, iterating events") - except (APIStatusError, OpenAIError) as e: - status = getattr(e, "status_code", 502) - detail = getattr(e, "message", str(e)) or str(e) - print(f"Codex raw stream: API error status={status} detail={detail[:200]}") - raise HTTPException( - status_code=status, - detail=f"Codex API error: {detail}"[:400], - ) from e - - event_count = 0 - event_types: dict[str, int] = {} - async for event in stream: - event_count += 1 - d = self._codex_event_to_dict(event) - et = d.get("type") or "unknown" - event_types[et] = event_types.get(et, 0) + 1 - if event_count <= 3 or et not in ( - "response.output_text.delta", - "response.function_call_arguments.delta", - ): - print(f"Codex raw stream: event #{event_count} type={et}") - yield d - print(f"Codex raw stream: finished event_count={event_count} types={event_types}") - - async def _stream_codex( - self, - model: str, - messages: List[LLMMessage], - max_tokens: Optional[int] = None, - tools: Optional[List[dict]] = None, - depth: int = 0, - ) -> AsyncGenerator[str, None]: - """Stream text from the Codex Responses API, handling tool calls.""" - # Convert Chat-Completions tool format to flat Responses API format - responses_tools = _to_responses_tools(tools) if tools else None - - tool_calls_by_id: dict[str, dict] = {} - - async for event in self._stream_codex_raw(model, messages, responses_tools): - event_type = event.get("type", "") - - if event_type == "response.output_text.delta": - delta = event.get("delta", "") - if delta: - yield delta - - elif event_type == "response.output_item.done": - item = event.get("item") or {} - if item.get("type") == "function_call": - tool_calls_by_id[item.get("call_id", item.get("id", ""))] = item - - elif event_type in ("response.failed", "error"): - msg_text = event.get("message") or str(event) - raise HTTPException(status_code=502, detail=f"Codex stream error: {msg_text}") - - if tool_calls_by_id and tools and depth < 5: - openai_calls = [ - OpenAIToolCall( - id=item.get("call_id", item.get("id", "")), - type="function", - function=OpenAIToolCallFunction( - name=item.get("name", ""), - arguments=item.get("arguments", "{}"), - ), - ) - for item in tool_calls_by_id.values() - ] - tool_call_messages = await self.tool_calls_handler.handle_tool_calls_openai( - openai_calls - ) - new_messages = [ - *messages, - OpenAIAssistantMessage( - role="assistant", - content=None, - tool_calls=[tc.model_dump() for tc in openai_calls], - ), - *tool_call_messages, - ] - async for chunk in self._stream_codex( - model=model, - messages=new_messages, - max_tokens=max_tokens, - tools=tools, - depth=depth + 1, - ): - yield chunk - - async def _generate_codex( - self, - model: str, - messages: List[LLMMessage], - max_tokens: Optional[int] = None, - tools: Optional[List[dict]] = None, - depth: int = 0, - ) -> str | None: - """Non-streaming text generation via Codex Responses API.""" - responses_tools = _to_responses_tools(tools) if tools else None - - text_parts: list[str] = [] - tool_calls_by_id: dict[str, dict] = {} - - async for event in self._stream_codex_raw(model, messages, responses_tools): - event_type = event.get("type", "") - - if event_type == "response.output_text.delta": - delta = event.get("delta", "") - if delta: - text_parts.append(delta) - - elif event_type == "response.output_item.done": - item = event.get("item") or {} - if item.get("type") == "function_call": - tool_calls_by_id[item.get("call_id", item.get("id", ""))] = item - - elif event_type in ("response.failed", "error"): - msg_text = event.get("message") or str(event) - raise HTTPException(status_code=502, detail=f"Codex error: {msg_text}") - - if tool_calls_by_id and tools and depth < 5: - openai_calls = [ - OpenAIToolCall( - id=item.get("call_id", item.get("id", "")), - type="function", - function=OpenAIToolCallFunction( - name=item.get("name", ""), - arguments=item.get("arguments", "{}"), - ), - ) - for item in tool_calls_by_id.values() - ] - tool_call_messages = await self.tool_calls_handler.handle_tool_calls_openai( - openai_calls - ) - new_messages = [ - *messages, - OpenAIAssistantMessage( - role="assistant", - content=None, - tool_calls=[tc.model_dump() for tc in openai_calls], - ), - *tool_call_messages, - ] - return await self._generate_codex( - model=model, - messages=new_messages, - max_tokens=max_tokens, - tools=tools, - depth=depth + 1, - ) - - return "".join(text_parts) or None - - async def _stream_codex_structured( - self, - model: str, - messages: List[LLMMessage], - response_format: dict, - strict: bool = False, - max_tokens: Optional[int] = None, - tools: Optional[List[dict]] = None, - depth: int = 0, - ) -> AsyncGenerator[str, None]: - """Stream structured output from Codex via a ResponseSchema tool call.""" - # Build the ResponseSchema tool in flat Responses API format - schema = response_format - if strict and depth == 0: - schema = ensure_strict_json_schema(schema, path=(), root=schema) - - response_schema_tool = { - "type": "function", - "name": "ResponseSchema", - "description": "Provide response to the user", - "parameters": schema, - } - - all_tools: list[dict] = [response_schema_tool] - if tools: - all_tools.extend(_to_responses_tools(tools)) - print( - f"Codex structured stream: start depth={depth} strict={strict} all_tools_count={len(all_tools)}" - ) - - has_response_schema_call = False - tool_calls_by_id: dict[str, dict] = {} - current_call_id: Optional[str] = None - chunks_yielded = 0 - - async for event in self._stream_codex_raw(model, messages, all_tools): - event_type = event.get("type", "") - - if event_type == "response.output_item.added": - item = event.get("item") or {} - if item.get("type") == "function_call" and item.get("name") == "ResponseSchema": - current_call_id = item.get("call_id", item.get("id")) - has_response_schema_call = True - print(f"Codex structured stream: ResponseSchema output_item.added call_id={current_call_id}") - - elif event_type == "response.function_call_arguments.delta": - if current_call_id is not None or has_response_schema_call: - delta = event.get("delta", "") - if delta: - chunks_yielded += 1 - if chunks_yielded <= 2: - print(f"Codex structured stream: yielding delta #{chunks_yielded} len={len(delta)}") - yield delta - - elif event_type == "response.function_call_arguments.done": - # Codex may send full arguments in one .done event instead of deltas - if event.get("name") == "ResponseSchema": - arguments = event.get("arguments", "") - if arguments: - chunks_yielded += 1 - print(f"Codex structured stream: ResponseSchema function_call_arguments.done len={len(arguments)}") - yield arguments - - elif event_type == "response.output_item.done": - item = event.get("item") or {} - if item.get("type") == "function_call": - tool_calls_by_id[item.get("call_id", item.get("id", ""))] = item - # Codex may send full arguments only on item done (no deltas) - if item.get("name") == "ResponseSchema": - arguments = item.get("arguments", "") - if arguments: - chunks_yielded += 1 - print(f"Codex structured stream: ResponseSchema output_item.done len={len(arguments)}") - yield arguments - - elif event_type == "response.output_text.delta": - # Fallback: model returned plain text (e.g. JSON) instead of ResponseSchema tool call - delta = event.get("delta", "") - if delta: - chunks_yielded += 1 - if chunks_yielded <= 2: - print(f"Codex structured stream: yielding output_text delta #{chunks_yielded} len={len(delta)}") - yield delta - - elif event_type in ("response.failed", "error"): - msg_text = event.get("message") or str(event) - print(f"Codex structured stream: error event type={event_type} message={msg_text[:200]}") - raise HTTPException(status_code=502, detail=f"Codex structured error: {msg_text}") - - print( - f"Codex structured stream: end depth={depth} has_response_schema_call={has_response_schema_call} chunks_yielded={chunks_yielded} tool_calls_count={len(tool_calls_by_id)}" - ) - # Handle non-ResponseSchema tool calls recursively - other_tool_calls = { - k: v for k, v in tool_calls_by_id.items() - if v.get("name") != "ResponseSchema" - } - if other_tool_calls and tools and depth < 5: - print(f"Codex structured stream: recursing for other_tool_calls count={len(other_tool_calls)}") - openai_calls = [ - OpenAIToolCall( - id=item.get("call_id", item.get("id", "")), - type="function", - function=OpenAIToolCallFunction( - name=item.get("name", ""), - arguments=item.get("arguments", "{}"), - ), - ) - for item in other_tool_calls.values() - ] - tool_call_messages = await self.tool_calls_handler.handle_tool_calls_openai( - openai_calls - ) - new_messages = [ - *messages, - OpenAIAssistantMessage( - role="assistant", - content=None, - tool_calls=[tc.model_dump() for tc in openai_calls], - ), - *tool_call_messages, - ] - async for chunk in self._stream_codex_structured( - model=model, - messages=new_messages, - response_format=response_format, - strict=strict, - max_tokens=max_tokens, - tools=tools, - depth=depth + 1, - ): - yield chunk - - async def _generate_codex_structured( - self, - model: str, - messages: List[LLMMessage], - response_format: dict, - strict: bool = False, - max_tokens: Optional[int] = None, - tools: Optional[List[dict]] = None, - depth: int = 0, - ) -> dict | None: - """Non-streaming structured output from Codex via ResponseSchema tool. - Codex API requires stream=True; we use the same client and stream, then - accumulate chunks (from deltas and .done events) and parse the result. - """ - print(f"Codex generate_structured: start depth={depth} strict={strict}") - accumulated: list[str] = [] - async for chunk in self._stream_codex_structured( - model=model, - messages=messages, - response_format=response_format, - strict=strict, - max_tokens=max_tokens, - tools=tools, - depth=depth, - ): - accumulated.append(chunk) - - raw = "".join(accumulated) - print( - f"Codex generate_structured: accumulated chunks={len(accumulated)} raw_len={len(raw)} depth={depth}" - ) - if not raw: - print("Codex generate_structured: no content (raw empty), returning None") - return None - if depth == 0: - try: - parsed = dict(dirtyjson.loads(raw)) - print(f"Codex generate_structured: parsed OK keys={list(parsed.keys())[:10]}") - return parsed - except Exception as parse_err: - print( - f"Codex generate_structured: parse failed raw_preview={raw[:200] if raw else ''} err={parse_err}" - ) - raise - return raw - # ? Prompts def _get_system_prompt(self, messages: List[LLMMessage]) -> str: for message in messages: @@ -946,11 +501,14 @@ class LLMClient: tools=parsed_tools, ) case LLMProvider.CODEX: - content = await self._generate_codex( - model=model, - messages=messages, - max_tokens=max_tokens, - tools=parsed_tools, + print( + f"LLMClient.generate Codex: model={model} messages={len(messages)} " + f"user_tools={len(parsed_tools) if parsed_tools else 0}" + ) + client = self._get_codex_client() + content = await CodexLLMAdapter.generate_codex( + client, model, messages, self.tool_calls_handler, + max_tokens=max_tokens, tools=parsed_tools, ) case LLMProvider.GOOGLE: content = await self._generate_google( @@ -1330,18 +888,18 @@ class LLMClient: ) case LLMProvider.CODEX: print( - f"generate_structured (Codex): model={model} messages={len(messages)} strict={strict}" + f"LLMClient.generate_structured Codex: model={model} messages={len(messages)} " + f"strict={strict} user_tools={len(parsed_tools) if parsed_tools else 0}" ) - content = await self._generate_codex_structured( - model=model, - messages=messages, - response_format=response_format, - strict=strict, - tools=parsed_tools, - max_tokens=max_tokens, + client = self._get_codex_client() + content = await CodexLLMAdapter.generate_codex_structured( + client, model, messages, response_format, self.tool_calls_handler, + strict=strict, max_tokens=max_tokens, tools=parsed_tools, ) print( - f"generate_structured (Codex): done content_is_none={content is None} content_keys={list(content.keys())[:8] if isinstance(content, dict) else None}" + "LLMClient.generate_structured Codex: done " + f"content_is_none={content is None} " + f"content_keys={list(content.keys())[:8] if isinstance(content, dict) else None}" ) case LLMProvider.GOOGLE: content = await self._generate_google_structured( @@ -1661,11 +1219,14 @@ class LLMClient: tools=parsed_tools, ) case LLMProvider.CODEX: - return self._stream_codex( - model=model, - messages=messages, - max_tokens=max_tokens, - tools=parsed_tools, + print( + f"LLMClient.stream Codex: model={model} messages={len(messages)} " + f"user_tools={len(parsed_tools) if parsed_tools else 0}" + ) + client = self._get_codex_client() + return CodexLLMAdapter.stream_codex( + client, model, messages, self.tool_calls_handler, + max_tokens=max_tokens, tools=parsed_tools, ) case LLMProvider.GOOGLE: return self._stream_google( @@ -2094,13 +1655,14 @@ class LLMClient: max_tokens=max_tokens, ) case LLMProvider.CODEX: - return self._stream_codex_structured( - model=model, - messages=messages, - response_format=response_format, - strict=strict, - tools=parsed_tools, - max_tokens=max_tokens, + print( + f"LLMClient.stream_structured Codex: model={model} messages={len(messages)} " + f"strict={strict} user_tools={len(parsed_tools) if parsed_tools else 0}" + ) + client = self._get_codex_client() + return CodexLLMAdapter.stream_codex_structured( + client, model, messages, response_format, self.tool_calls_handler, + strict=strict, tools=parsed_tools, max_tokens=max_tokens, ) case LLMProvider.GOOGLE: return self._stream_google_structured( diff --git a/servers/fastapi/services/llm_tool_calls_handler.py b/servers/fastapi/services/llm_tool_calls_handler.py index 63476028..44c2003b 100644 --- a/servers/fastapi/services/llm_tool_calls_handler.py +++ b/servers/fastapi/services/llm_tool_calls_handler.py @@ -55,7 +55,7 @@ class LLMToolCallsHandler: self.dynamic_tools.append(tool) match self.client.llm_provider: - case LLMProvider.OPENAI | LLMProvider.OLLAMA | LLMProvider.CUSTOM: + case LLMProvider.OPENAI | LLMProvider.OLLAMA | LLMProvider.CUSTOM | LLMProvider.CODEX: return self.parse_tool_openai(tool, strict) case LLMProvider.ANTHROPIC: return self.parse_tool_anthropic(tool) @@ -63,7 +63,7 @@ class LLMToolCallsHandler: return self.parse_tool_google(tool) case _: raise ValueError( - f"LLM provider must be either openai, anthropic, or google" + f"LLM provider must be one of: openai, anthropic, google, codex" ) def parse_tool_openai( diff --git a/servers/fastapi/utils/schema_utils.py b/servers/fastapi/utils/schema_utils.py index 92aafd97..9efed7c5 100644 --- a/servers/fastapi/utils/schema_utils.py +++ b/servers/fastapi/utils/schema_utils.py @@ -1,5 +1,5 @@ from copy import deepcopy -from typing import Any, List +from typing import Any, List, Mapping, Union from openai import NOT_GIVEN @@ -22,6 +22,51 @@ supported_string_formats = [ ] +def _is_json_object(value: object) -> bool: + """True if value is a dict-like object but not a list.""" + return isinstance(value, Mapping) and not isinstance(value, list) + + +def _convert_pydantic_schema(schema: object) -> dict | None: + """Return JSON schema dict from a Pydantic model or class, else None.""" + if BaseModel is None: + return None + if isinstance(schema, BaseModel): + return schema.model_json_schema() + if isinstance(schema, type) and issubclass(schema, BaseModel): + return schema.model_json_schema() + if hasattr(schema, "model_json_schema"): + try: + return getattr(schema, "model_json_schema")() + except TypeError: + return None + return None + + +def normalize_output_schema( + schema: Union[dict, type, object] | None, +) -> dict[str, Any] | None: + """ + Normalize output schema to a plain JSON schema dict (SDK-style). + Accepts a JSON schema dict, a Pydantic model class, or a Pydantic instance. + Returns None if schema is None; otherwise returns a dict suitable for + ResponseSchema / structured output. + """ + if schema is None: + return None + + converted = _convert_pydantic_schema(schema) + if converted is not None: + return converted + + if not _is_json_object(schema): + raise ValueError( + "output_schema must be a plain JSON object (dict) or a Pydantic model" + ) + + return dict(schema) + + def remove_fields_from_schema(schema: dict, fields_to_remove: List[str]): schema = deepcopy(schema) properties_paths = get_dict_paths_with_key(schema, "properties")