""" LLM Service for Synthetic Society This service provides a centralized interface for interacting with language models through the Google Generative AI API. It supports various prompting functions for different application features. """ import os import json import asyncio import logging import base64 import traceback import time from google import genai from google.genai import errors as genai_errors from openai import AsyncOpenAI import httpx from typing import Dict, Any, Optional, Union, List from PIL import Image import io # Set up API keys — must be set in environment, no hardcoded fallbacks def _require_env(key: str) -> str: value = os.environ.get(key) if not value: raise RuntimeError(f"Required environment variable '{key}' is not set. Set it in backend/.env before starting the server.") return value GEMINI_API_KEY = _require_env('GEMINI_API_KEY') OPENAI_API_KEY = _require_env('OPENAI_API_KEY') def get_gemini_client(): """Create a fresh Gemini client for each call. Creating a new client per call avoids event loop mismatch issues that occur when caching clients in ASGI environments where requests may come on different event loops. The overhead is minimal compared to the LLM API call. Force httpx transport to avoid aiohttp AssertionError (connector is None) that occurs when aiohttp is installed in the environment via other packages. """ from google.genai import types as genai_types return genai.Client( api_key=GEMINI_API_KEY, http_options=genai_types.HttpOptions( httpx_async_client=httpx.AsyncClient(timeout=600.0) ) ) def get_openai_client(): """Create a fresh OpenAI client for each call. Creating a new client per call avoids event loop mismatch issues that occur when caching clients in ASGI environments where requests may come on different event loops. The overhead is minimal compared to the LLM API call. """ return AsyncOpenAI(api_key=OPENAI_API_KEY, timeout=600.0) # The default model we're using DEFAULT_MODEL = "gemini-3.1-pro-preview" # Supported models SUPPORTED_MODELS = { 'gemini-3.1-pro-preview': 'gemini', 'gpt-5.4-2026-03-05': 'openai', } # Aliases for renamed/legacy model IDs stored in the database MODEL_ALIASES = { 'gpt-5': 'gpt-5.4-2026-03-05', 'gpt-5.2': 'gpt-5.4-2026-03-05', 'gemini-3-pro-preview': 'gemini-3.1-pro-preview', 'gpt-4.1': 'gemini-3.1-pro-preview', } class LLMServiceError(Exception): """Exception raised for errors in LLM operations.""" pass class LLMService: """Centralized service for LLM operations.""" @staticmethod def _extract_responses_api_content(response) -> str: """ Extract text content from OpenAI Responses API response. Args: response: The response object from OpenAI Responses API Returns: The extracted text content """ result = "" # Try to extract from output structure if hasattr(response, 'output') and response.output: for item in response.output: if hasattr(item, 'content') and item.content is not None: for content in item.content: if hasattr(content, 'text'): result += content.text # Fallback to output_text if available if not result and hasattr(response, 'output_text'): result = response.output_text # Additional fallback - try direct text access if not result and hasattr(response, 'text'): result = response.text return result.strip() @staticmethod def _resolve_model(model_name: Optional[str] = None) -> str: """ Resolve a model name, applying aliases for legacy/renamed models. Args: model_name: Optional model name to use. Defaults to the default model. Returns: The resolved model name """ actual_model = model_name or DEFAULT_MODEL return MODEL_ALIASES.get(actual_model, actual_model) @staticmethod def _get_model_provider(model_name: Optional[str] = None) -> str: """ Get the provider for a given model name. Args: model_name: Optional model name to use. Defaults to the default model. Returns: The provider name ('gemini' or 'openai') """ actual_model = LLMService._resolve_model(model_name) return SUPPORTED_MODELS.get(actual_model, 'gemini') @staticmethod def _extract_text_from_new_genai_response(response) -> str: """ Extract text from a new Google GenAI SDK response. Args: response: The response object from the new Google GenAI SDK Returns: The extracted text content Raises: LLMServiceError: If no text content can be extracted """ try: # New SDK has a simpler text attribute if hasattr(response, 'text') and response.text: return response.text.strip() # If that doesn't work, check for candidates structure if hasattr(response, 'candidates') and response.candidates: for candidate in response.candidates: if hasattr(candidate, 'content') and candidate.content: if hasattr(candidate.content, 'parts') and candidate.content.parts: text_parts = [] for part in candidate.content.parts: if hasattr(part, 'text') and part.text: text_parts.append(part.text) if text_parts: return ''.join(text_parts).strip() # If no text found, check if the response object has direct text content if hasattr(response, 'content') and response.content: return str(response.content).strip() raise LLMServiceError("Unable to extract text from new GenAI SDK response") except Exception as e: if isinstance(e, LLMServiceError): raise raise LLMServiceError(f"Error extracting text from new GenAI SDK response: {str(e)}") @staticmethod def _extract_usage_metadata(response, provider: str) -> dict: """Extract token counts from a provider response. All fields default to 0.""" _log = logging.getLogger(__name__) if provider == 'gemini': um = getattr(response, 'usage_metadata', None) if um is None: _log.warning("Gemini response missing usage_metadata — token counts will be 0, cost recorded as $0") return {'prompt': 0, 'completion': 0, 'cached': 0, 'reasoning': 0} # thoughts_token_count (thinking models) is already included in candidates_token_count. # Capture it separately so the stored event can show the split. thoughts = getattr(um, 'thoughts_token_count', 0) or 0 return { 'prompt': getattr(um, 'prompt_token_count', 0) or 0, 'completion': getattr(um, 'candidates_token_count', 0) or 0, 'cached': getattr(um, 'cached_content_token_count', 0) or 0, 'reasoning': thoughts, } elif provider == 'openai': usage = getattr(response, 'usage', None) if usage is None: _log.warning("OpenAI response missing usage — token counts will be 0, cost recorded as $0") return {'prompt': 0, 'completion': 0, 'cached': 0, 'reasoning': 0} # Responses API (gpt-5.4-2026-03-05) if hasattr(usage, 'input_tokens'): input_details = getattr(usage, 'input_tokens_details', None) output_details = getattr(usage, 'output_tokens_details', None) return { 'prompt': getattr(usage, 'input_tokens', 0) or 0, 'completion': getattr(usage, 'output_tokens', 0) or 0, 'cached': getattr(input_details, 'cached_tokens', 0) or 0 if input_details else 0, 'reasoning': getattr(output_details, 'reasoning_tokens', 0) or 0 if output_details else 0, } # Chat Completions API prompt_details = getattr(usage, 'prompt_tokens_details', None) return { 'prompt': getattr(usage, 'prompt_tokens', 0) or 0, 'completion': getattr(usage, 'completion_tokens', 0) or 0, 'cached': getattr(prompt_details, 'cached_tokens', 0) or 0 if prompt_details else 0, 'reasoning': 0, } return {'prompt': 0, 'completion': 0, 'cached': 0, 'reasoning': 0} @staticmethod async def _record_usage(response, provider: str, model: str, start_time: float, retry_count: int) -> None: """Record a usage event after a successful LLM call. Never raises.""" try: from app.services.llm_usage_context import current_context from app.models.usage_event import UsageEvent from app.models.model_pricing import ModelPricing ctx = current_context() tokens = LLMService._extract_usage_metadata(response, provider) pricing = await ModelPricing.current_for(model) cost = ModelPricing.compute_cost( pricing, prompt_tokens=tokens['prompt'], completion_tokens=tokens['completion'], cached_tokens=tokens['cached'], ) price_id = pricing.get('_id') if pricing else None await UsageEvent.record( provider=provider, model=model, prompt_tokens=tokens['prompt'], completion_tokens=tokens['completion'], cached_tokens=tokens['cached'], reasoning_tokens=tokens['reasoning'], cost_usd=cost, price_snapshot_id=price_id, duration_ms=int((time.monotonic() - start_time) * 1000), retry_count=retry_count, status="success", user_id=ctx.user_id, focus_group_id=ctx.focus_group_id, persona_id=ctx.persona_id, feature=ctx.feature, task_id=ctx.task_id, ) # Notify focus group room of cost delta (non-fatal) try: if ctx.focus_group_id: from app.models.focus_group import emit_websocket_event asyncio.create_task(emit_websocket_event( 'usage_update', ctx.focus_group_id, { 'cost_delta': cost.get('total', 0), 'tokens_delta': tokens['prompt'] + tokens['completion'], 'feature': ctx.feature, } )) except Exception: pass except Exception: logging.getLogger(__name__).warning("_record_usage failed (non-fatal)", exc_info=True) @staticmethod async def generate_content( prompt: str, temperature: float = 0.7, max_tokens: Optional[int] = None, model_name: Optional[str] = None, system_prompt: Optional[str] = None, reasoning_effort: Optional[str] = None, verbosity: Optional[str] = None ) -> str: """ Generate content using the LLM with retry mechanism for transient errors. Args: prompt: The prompt to send to the model temperature: Controls randomness (0.0 = deterministic, 1.0 = creative) max_tokens: Maximum number of tokens to generate model_name: Optional model name to use system_prompt: Optional system prompt to define the role of the AI reasoning_effort: GPT-5.2 only - Controls thinking time (minimal/low/medium/high) verbosity: GPT-5.2 only - Controls response length (low/medium/high) Returns: The generated text response Raises: LLMServiceError: If there's an issue with the generation """ logger = logging.getLogger(__name__) max_retries = 3 last_error = None # Quota pre-flight — raises QuotaExceededError if over limit try: from app.models.quota import check_quota, QuotaExceededError as _QuotaExceededError from app.services.llm_usage_context import current_context as _ctx _c = _ctx() await check_quota(_c.user_id, _c.focus_group_id) except Exception as _qe: from app.models.quota import QuotaExceededError as _QEE if isinstance(_qe, _QEE): raise pass # Non-fatal: DB failures must not block LLM calls actual_model = LLMService._resolve_model(model_name) provider = LLMService._get_model_provider(model_name) _start_time = time.monotonic() for attempt in range(max_retries): attempt_num = attempt + 1 logger.debug(f"LLM content generation attempt {attempt_num}/{max_retries} using {provider} provider") try: if provider == 'openai': if actual_model == 'gpt-5.4-2026-03-05': # Use OpenAI Responses API for gpt-5.4-2026-03-05 input_content = prompt if system_prompt: input_content = f"System: {system_prompt}\n\nUser: {prompt}" kwargs = { "model": actual_model, "input": input_content, } # Add reasoning configuration reasoning_config = {} if reasoning_effort: reasoning_config["effort"] = reasoning_effort else: reasoning_config["effort"] = "low" # Default kwargs["reasoning"] = reasoning_config # Add text configuration with verbosity text_config = { "format": {"type": "text"} } if verbosity: text_config["verbosity"] = verbosity else: text_config["verbosity"] = "medium" # Default kwargs["text"] = text_config # Note: GPT-5 Responses API does not support max_tokens parameter response = await get_openai_client().responses.create(**kwargs) result = LLMService._extract_responses_api_content(response) else: # Use Chat Completions API for non-GPT-5 models messages = [] if system_prompt: messages.append({"role": "system", "content": system_prompt}) messages.append({"role": "user", "content": prompt}) kwargs = { "model": actual_model, "messages": messages, "temperature": temperature, } if max_tokens: kwargs["max_tokens"] = max_tokens response = await get_openai_client().chat.completions.create(**kwargs) result = response.choices[0].message.content.strip() else: # New Google GenAI SDK - async call config = genai.types.GenerateContentConfig( temperature=temperature, ) if max_tokens: config.max_output_tokens = max_tokens # Prepare the prompt - combine system prompt with user prompt if needed if system_prompt: combined_prompt = f"System: {system_prompt}\n\nUser: {prompt}" else: combined_prompt = prompt # Make async call to new GenAI SDK response = await get_gemini_client().aio.models.generate_content( model=actual_model, contents=combined_prompt, config=config ) # Extract text from new SDK response result = LLMService._extract_text_from_new_genai_response(response) if attempt > 0: logger.info(f"LLM content generation succeeded on attempt {attempt_num}/{max_retries}") await LLMService._record_usage(response, provider, actual_model, _start_time, attempt) return result except genai_errors.APIError as e: # Google GenAI SDK specific error handling last_error = e error_code = getattr(e, 'code', 'unknown') error_message = getattr(e, 'message', str(e)) or str(e) or repr(e) logger.warning(f"LLM attempt {attempt_num}/{max_retries} failed: [Google API {error_code}] {error_message}") # Retryable: 429 rate limit, 500+ server errors is_retryable = ( error_code == 429 or (isinstance(error_code, int) and error_code >= 500) ) if is_retryable: if attempt < max_retries - 1: wait_time = 2 ** attempt logger.info(f"Retryable Google API error. Waiting {wait_time}s before retry {attempt_num + 1}/{max_retries}") await asyncio.sleep(wait_time) continue else: logger.error(f"Retryable Google API error [{error_code}] but max retries ({max_retries}) reached") else: # 400, 403, 404, etc. - non-retryable logger.error(f"Non-retryable Google API error [{error_code}]: {error_message}") break except Exception as e: # Fallback for OpenAI and other non-Google errors last_error = e # Debug: capture full exception details exc_type = type(e).__name__ exc_module = type(e).__module__ exc_str = str(e) exc_repr = repr(e) exc_args = getattr(e, 'args', ()) exc_dict = getattr(e, '__dict__', {}) exc_tb = traceback.format_exc() logger.warning(f"LLM attempt {attempt_num}/{max_retries} failed - Type: {exc_module}.{exc_type}, str: '{exc_str}', repr: {exc_repr}, args: {exc_args}, dict: {exc_dict}") logger.warning(f"Full traceback:\n{exc_tb}") error_message = exc_str.lower() if exc_str else exc_repr.lower() # Check if this is a retryable error (API internal errors, rate limiting, etc.) if ("500" in error_message or "internal error" in error_message or "internal server error" in error_message or "service unavailable" in error_message or "timeout" in error_message or "rate" in error_message): if attempt < max_retries - 1: # Wait before retrying (exponential backoff) wait_time = 2 ** attempt # 1s, 2s, 4s logger.info(f"Retryable error detected. Waiting {wait_time} seconds before retry {attempt_num + 1}/{max_retries}") await asyncio.sleep(wait_time) continue else: logger.error(f"Retryable error detected but max retries ({max_retries}) reached") else: logger.error(f"Non-retryable error detected: {str(e)}") break # If we've exhausted all retries or hit a non-retryable error, raise the last error error_detail = "" if isinstance(last_error, genai_errors.APIError): error_code = getattr(last_error, 'code', 'unknown') error_msg = getattr(last_error, 'message', str(last_error)) or str(last_error) or repr(last_error) error_detail = f"[Google API {error_code}] {error_msg}" else: # Use repr if str is empty error_detail = str(last_error) or repr(last_error) or f"{type(last_error).__module__}.{type(last_error).__name__}: {getattr(last_error, 'args', ())}" logger.error(f"LLM content generation failed after {max_retries} attempts. Final error: {error_detail}") raise LLMServiceError(f"Error generating content: {error_detail}") @staticmethod def parse_json_response(response_text: str) -> Union[Dict[str, Any], List[Any]]: """ Parse a JSON response from the LLM. Args: response_text: The text response from the LLM Returns: A dictionary or list parsed from the JSON response Raises: LLMServiceError: If there's an issue parsing the JSON """ try: # Handle common formatting issues in the response clean_response = response_text # Remove markdown code blocks if present if clean_response.startswith("```json"): clean_response = clean_response.strip("```json").strip("```").strip() elif clean_response.startswith("```"): clean_response = clean_response.strip("```").strip() # Parse the JSON return json.loads(clean_response) except json.JSONDecodeError as e: error_msg = f"Failed to parse JSON response: {str(e)}. Raw response: {response_text[:200]}..." logging.getLogger(__name__).error(error_msg) raise LLMServiceError(error_msg) @staticmethod async def generate_structured_response( prompt: str, temperature: float = 0.7, max_tokens: Optional[int] = None, model_name: Optional[str] = None, system_prompt: Optional[str] = None, reasoning_effort: Optional[str] = None, verbosity: Optional[str] = None ) -> Dict[str, Any]: """ Generate a structured JSON response using the LLM. Args: prompt: The prompt to send to the model temperature: Controls randomness in generation max_tokens: Maximum tokens to generate model_name: Optional model name to use system_prompt: Optional system prompt to define the role of the AI reasoning_effort: GPT-5.2 only - Controls thinking time (minimal/low/medium/high) verbosity: GPT-5.2 only - Controls response length (low/medium/high) Returns: A dictionary parsed from the JSON response Raises: LLMServiceError: If there's an issue with generation or parsing """ response_text = await LLMService.generate_content( prompt=prompt, temperature=temperature, max_tokens=max_tokens, model_name=model_name, system_prompt=system_prompt, reasoning_effort=reasoning_effort, verbosity=verbosity ) return LLMService.parse_json_response(response_text) @staticmethod async def generate_structured_array( prompt: str, temperature: float = 0.7, max_tokens: Optional[int] = None, model_name: Optional[str] = None, system_prompt: Optional[str] = None, reasoning_effort: Optional[str] = None, verbosity: Optional[str] = None ) -> List[Dict[str, Any]]: """ Generate a structured JSON array response using the LLM. Args: prompt: The prompt to send to the model temperature: Controls randomness in generation max_tokens: Maximum tokens to generate model_name: Optional model name to use system_prompt: Optional system prompt to define the role of the AI reasoning_effort: GPT-5.2 only - Controls thinking time (minimal/low/medium/high) verbosity: GPT-5.2 only - Controls response length (low/medium/high) Returns: A list of dictionaries parsed from the JSON array response Raises: LLMServiceError: If there's an issue with generation or parsing """ response_text = await LLMService.generate_content( prompt=prompt, temperature=temperature, max_tokens=max_tokens, model_name=model_name, system_prompt=system_prompt, reasoning_effort=reasoning_effort, verbosity=verbosity ) result = LLMService.parse_json_response(response_text) # Ensure the result is a list if not isinstance(result, list): raise LLMServiceError(f"Expected a JSON array but received {type(result)}") return result @staticmethod async def generate_multimodal_content( prompt: str, image_paths: List[str], temperature: float = 0.7, max_tokens: Optional[int] = None, model_name: Optional[str] = None ) -> str: """ Generate content using both text and image inputs. Args: prompt: The text prompt to send to the model image_paths: List of paths to image files to include temperature: Controls randomness in generation max_tokens: Maximum tokens to generate model_name: Optional model name to use Returns: The generated text response Raises: LLMServiceError: If there's an issue with generation or image processing """ logger = logging.getLogger(__name__) max_retries = 3 last_error = None # Quota pre-flight — raises QuotaExceededError if over limit try: from app.models.quota import check_quota, QuotaExceededError as _QuotaExceededError from app.services.llm_usage_context import current_context as _ctx _c = _ctx() await check_quota(_c.user_id, _c.focus_group_id) except Exception as _qe: from app.models.quota import QuotaExceededError as _QEE if isinstance(_qe, _QEE): raise pass # Non-fatal: DB failures must not block LLM calls actual_model = LLMService._resolve_model(model_name) provider = LLMService._get_model_provider(model_name) logger.info(f"Generating multimodal content with {len(image_paths)} image(s) using {provider} provider") _start_time = time.monotonic() for attempt in range(max_retries): attempt_num = attempt + 1 logger.debug(f"Multimodal content generation attempt {attempt_num}/{max_retries}") try: if provider == 'openai': # OpenAI multimodal API call import base64 # Prepare image content for OpenAI API image_content = [] for image_path in image_paths: if not os.path.exists(image_path): raise LLMServiceError(f"Image file not found: {image_path}") # Encode image to base64 with open(image_path, "rb") as image_file: base64_image = base64.b64encode(image_file.read()).decode('utf-8') # Get image format image_format = image_path.lower().split('.')[-1] if image_format == 'jpg': image_format = 'jpeg' image_content.append({ "type": "image_url", "image_url": { "url": f"data:image/{image_format};base64,{base64_image}" } }) logger.debug(f"Successfully loaded image for OpenAI: {image_path}") if actual_model == 'gpt-5.4-2026-03-05': # Use Responses API for gpt-5.4-2026-03-05 multimodal # Note: GPT-5 Responses API supports multimodal input input_content = [{"role": "user", "content": [{"type": "input_text", "text": prompt}]}] # Add images to the content array for img_content in image_content: input_content[0]["content"].append({ "type": "input_image", "image_url": img_content["image_url"]["url"] }) kwargs = { "model": actual_model, "input": input_content, "reasoning": {"effort": "low"}, # Default reasoning for multimodal "text": { "verbosity": "medium", # Default verbosity for multimodal "format": {"type": "text"} } } # Note: GPT-5 Responses API does not support max_tokens parameter response = await get_openai_client().responses.create(**kwargs) result = LLMService._extract_responses_api_content(response) else: # Use Chat Completions API for non-GPT-5 models content = [{"type": "text", "text": prompt}] content.extend(image_content) kwargs = { "model": actual_model, "messages": [{"role": "user", "content": content}], "temperature": temperature, } if max_tokens: kwargs["max_tokens"] = max_tokens response = await get_openai_client().chat.completions.create(**kwargs) result = response.choices[0].message.content.strip() else: # New Google GenAI SDK - multimodal async call config = genai.types.GenerateContentConfig( temperature=temperature, ) if max_tokens: config.max_output_tokens = max_tokens # Prepare multimodal content for new SDK content_parts = [] # Add text prompt content_parts.append(genai.types.Part.from_text(prompt)) # Add images for image_path in image_paths: try: if not os.path.exists(image_path): raise LLMServiceError(f"Image file not found: {image_path}") # Read image data for new SDK with open(image_path, 'rb') as img_file: image_data = img_file.read() # Determine MIME type from file extension ext = os.path.splitext(image_path)[1].lower() mime_type = { '.jpg': 'image/jpeg', '.jpeg': 'image/jpeg', '.png': 'image/png', '.gif': 'image/gif', '.webp': 'image/webp' }.get(ext, 'image/jpeg') # Default to JPEG content_parts.append(genai.types.Part.from_bytes(image_data, mime_type=mime_type)) logger.debug(f"Successfully loaded image for new GenAI SDK: {image_path}") except Exception as e: raise LLMServiceError(f"Failed to load image {image_path}: {str(e)}") # Make async call to new GenAI SDK with multimodal content response = await get_gemini_client().aio.models.generate_content( model=actual_model, contents=content_parts, config=config ) # Extract text from new SDK response result = LLMService._extract_text_from_new_genai_response(response) if attempt > 0: logger.info(f"Multimodal content generation succeeded on attempt {attempt_num}/{max_retries}") await LLMService._record_usage(response, provider, actual_model, _start_time, attempt) return result except Exception as e: last_error = e error_message = str(e).lower() logger.warning(f"Multimodal attempt {attempt_num}/{max_retries} failed: {str(e)}") # Check if this is a retryable error if ("500" in error_message or "internal error" in error_message or "internal server error" in error_message or "service unavailable" in error_message or "timeout" in error_message or "rate" in error_message): if attempt < max_retries - 1: # Wait before retrying (exponential backoff) wait_time = 2 ** attempt # 1s, 2s, 4s logger.info(f"Retryable error detected. Waiting {wait_time} seconds before retry {attempt_num + 1}/{max_retries}") await asyncio.sleep(wait_time) continue else: logger.error(f"Retryable error detected but max retries ({max_retries}) reached") else: logger.error(f"Non-retryable error detected: {str(e)}") break # If we've exhausted all retries or hit a non-retryable error, raise the last error logger.error(f"Multimodal content generation failed after {max_retries} attempts. Final error: {str(last_error)}") raise LLMServiceError(f"Error generating multimodal content: {str(last_error)}") @staticmethod async def generate_contextual_response( prompt: str, conversation_context: List[Dict[str, Any]], temperature: float = 0.7, max_tokens: Optional[int] = None, model_name: Optional[str] = None, reasoning_effort: Optional[str] = None, verbosity: Optional[str] = None ) -> str: """ Generate content using conversation context that may include both text and images in sequence. Args: prompt: The main prompt for the LLM conversation_context: List of context items (text and image) in chronological order temperature: Controls randomness in generation max_tokens: Maximum tokens to generate model_name: Optional model name to use reasoning_effort: GPT-5.2 only - Controls thinking time (minimal/low/medium/high) verbosity: GPT-5.2 only - Controls response length (low/medium/high) Returns: The generated text response Raises: LLMServiceError: If there's an issue with generation """ logger = logging.getLogger(__name__) # Quota pre-flight — raises QuotaExceededError if over limit try: from app.models.quota import check_quota, QuotaExceededError as _QuotaExceededError from app.services.llm_usage_context import current_context as _ctx _c = _ctx() await check_quota(_c.user_id, _c.focus_group_id) except Exception as _qe: from app.models.quota import QuotaExceededError as _QEE if isinstance(_qe, _QEE): raise pass # Non-fatal: DB failures must not block LLM calls # Separate text and image content from the conversation context text_context_parts = [] image_parts = [] print(f"🎯 Processing {len(conversation_context)} context items for LLM") for item in conversation_context: if item["type"] == "text": text_context_parts.append(item["content"]) elif item["type"] == "image": try: image_path = item["path"] if os.path.exists(image_path): # Load image using PIL with Image.open(image_path) as img: # Convert to RGB if necessary if img.mode != 'RGB': img = img.convert('RGB') image_parts.append(img.copy()) print(f"🖼️ Loaded image for context: {item['filename']}") else: print(f"⚠️ Image not found for context: {image_path}") except Exception as e: print(f"❌ Failed to load image for context: {item['path']}: {e}") # Build the full context prompt context_prompt = "" if text_context_parts: context_prompt = "CONVERSATION CONTEXT:\n" + "\n".join(text_context_parts) + "\n\n" full_prompt = context_prompt + prompt print(f"📝 Context prompt length: {len(context_prompt)} characters") print(f"🖼️ Total images in context: {len(image_parts)}") # If we have images, use multimodal generation if image_parts: print(f"🎨 Using multimodal generation with {len(image_parts)} images") actual_model = LLMService._resolve_model(model_name) provider = LLMService._get_model_provider(model_name) max_retries = 3 last_error = None _start_time = time.monotonic() for attempt in range(max_retries): attempt_num = attempt + 1 logger.debug(f"Contextual multimodal generation attempt {attempt_num}/{max_retries}") try: if provider == 'openai': # OpenAI contextual multimodal API call # Convert PIL images to base64 for OpenAI API image_content = [] for i, img in enumerate(image_parts): # Convert PIL image to base64 buffer = io.BytesIO() img.save(buffer, format='PNG') base64_image = base64.b64encode(buffer.getvalue()).decode('utf-8') image_content.append({ "type": "image_url", "image_url": { "url": f"data:image/png;base64,{base64_image}" } }) if actual_model == 'gpt-5.4-2026-03-05': # Use Responses API for gpt-5.4-2026-03-05 contextual multimodal input_content = [{"role": "user", "content": [{"type": "input_text", "text": full_prompt}]}] # Add images to the content array for img_content in image_content: input_content[0]["content"].append({ "type": "input_image", "image_url": img_content["image_url"]["url"] }) kwargs = { "model": actual_model, "input": input_content, "reasoning": {"effort": reasoning_effort or "low"}, "text": { "verbosity": verbosity or "medium", "format": {"type": "text"} } } # Note: GPT-5 Responses API does not support max_tokens parameter response = await get_openai_client().responses.create(**kwargs) result = LLMService._extract_responses_api_content(response) else: # Use Chat Completions API for non-GPT-5 models content = [{"type": "text", "text": full_prompt}] content.extend(image_content) kwargs = { "model": actual_model, "messages": [{"role": "user", "content": content}], "temperature": temperature, } if max_tokens: kwargs["max_tokens"] = max_tokens response = await get_openai_client().chat.completions.create(**kwargs) result = response.choices[0].message.content.strip() else: # New Google GenAI SDK - contextual multimodal async call config = genai.types.GenerateContentConfig( temperature=temperature, ) if max_tokens: config.max_output_tokens = max_tokens # Prepare content parts for new SDK new_content_parts = [] # Add text prompt new_content_parts.append(genai.types.Part.from_text(full_prompt)) # Convert PIL image parts to new SDK format for img in image_parts: # Convert PIL image to bytes buffer = io.BytesIO() img.save(buffer, format='PNG') image_data = buffer.getvalue() # Add as image part in new SDK format new_content_parts.append(genai.types.Part.from_bytes(image_data, mime_type='image/png')) # Make async call to new GenAI SDK response = await get_gemini_client().aio.models.generate_content( model=actual_model, contents=new_content_parts, config=config ) result = LLMService._extract_text_from_new_genai_response(response) if attempt > 0: logger.info(f"Contextual multimodal generation succeeded on attempt {attempt_num}/{max_retries}") print(f"✅ Generated contextual response with visual context using {provider}") print(f"🔍 LLM RESULT DEBUG:") print(f" - Result type: {type(result)}") print(f" - Result length: {len(result) if result else 0} characters") print(f" - Result preview: '{result[:200] if result else 'EMPTY'}...'") print(f" - Result repr: {repr(result[:50]) if result else 'NONE'}") await LLMService._record_usage(response, provider, actual_model, _start_time, attempt) return result except genai_errors.APIError as e: # Google GenAI SDK specific error handling last_error = e error_code = getattr(e, 'code', 'unknown') error_message = getattr(e, 'message', str(e)) or str(e) or repr(e) logger.warning(f"Contextual multimodal attempt {attempt_num}/{max_retries} failed: [Google API {error_code}] {error_message}") # Retryable: 429 rate limit, 500+ server errors is_retryable = ( error_code == 429 or (isinstance(error_code, int) and error_code >= 500) ) if is_retryable: if attempt < max_retries - 1: wait_time = 2 ** attempt logger.info(f"Retryable Google API error. Waiting {wait_time}s before retry {attempt_num + 1}/{max_retries}") await asyncio.sleep(wait_time) continue else: logger.error(f"Retryable Google API error [{error_code}] but max retries ({max_retries}) reached") else: logger.error(f"Non-retryable Google API error [{error_code}]: {error_message}") break except Exception as e: # Fallback for non-Google errors last_error = e error_message = str(e).lower() logger.warning(f"Contextual multimodal attempt {attempt_num}/{max_retries} failed: {str(e)}") # Check if this is a retryable error if ("500" in error_message or "internal error" in error_message or "internal server error" in error_message or "service unavailable" in error_message or "timeout" in error_message or "rate" in error_message): if attempt < max_retries - 1: wait_time = 2 ** attempt logger.info(f"Retryable error detected. Waiting {wait_time} seconds before retry {attempt_num + 1}/{max_retries}") await asyncio.sleep(wait_time) continue else: logger.error(f"Retryable error detected but max retries ({max_retries}) reached") else: logger.error(f"Non-retryable error detected: {str(e)}") break # If multimodal failed, raise the error error_detail = "" if isinstance(last_error, genai_errors.APIError): error_code = getattr(last_error, 'code', 'unknown') error_msg = getattr(last_error, 'message', str(last_error)) or str(last_error) or repr(last_error) error_detail = f"[Google API {error_code}] {error_msg}" else: error_detail = str(last_error) logger.error(f"Contextual multimodal generation failed after {max_retries} attempts. Final error: {error_detail}") raise LLMServiceError(f"Error generating contextual multimodal content: {error_detail}") else: # No images, use standard text generation print(f"📝 Using text-only generation (no visual context)") return await LLMService.generate_content( prompt=full_prompt, temperature=temperature, max_tokens=max_tokens, model_name=model_name, reasoning_effort=reasoning_effort, verbosity=verbosity )