semblance-dev/backend/app/services/llm_service.py
Vadym Samoilenko ad619d45fc Improve live token extraction: warn on missing usage_metadata, capture thinking tokens
- Add WARNING log when usage_metadata/usage is None so zero-cost events
  are visible in logs instead of silently disappearing
- Capture thoughts_token_count from Gemini thinking models into reasoning field
  (already included in candidates_token_count for billing, now also tracked separately)
- Add same warning for OpenAI missing usage object

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-24 19:13:39 +01:00

1097 lines
No EOL
48 KiB
Python
Executable file

"""
LLM Service for Synthetic Society
This service provides a centralized interface for interacting with language models
through the Google Generative AI (Gemini) and OpenAI APIs. It supports various
prompting functions for different application features.
"""
import os
import json
import asyncio
import logging
import base64
import traceback
import time
from google import genai
from google.genai import errors as genai_errors
from openai import AsyncOpenAI
import httpx
from typing import Dict, Any, Optional, Union, List
from PIL import Image
import io
# Set up API keys — must be set in environment, no hardcoded fallbacks
def _require_env(key: str) -> str:
    """Return the value of environment variable ``key``.

    Raises:
        RuntimeError: if the variable is unset or set to an empty string.
    """
    found = os.environ.get(key)
    if found:
        return found
    raise RuntimeError(f"Required environment variable '{key}' is not set. Set it in backend/.env before starting the server.")
# Both provider keys are read once at import time; _require_env raises if either is missing/empty.
GEMINI_API_KEY: str = _require_env("GEMINI_API_KEY")
OPENAI_API_KEY: str = _require_env("OPENAI_API_KEY")
def get_gemini_client():
    """Build a new ``genai.Client`` for a single call.

    A fresh client is created each time instead of caching one: in ASGI
    deployments requests may arrive on different event loops, and a cached
    client can end up bound to the wrong one. The cost is negligible next to
    the LLM call itself.

    The async transport is pinned to an explicit ``httpx.AsyncClient`` (600 s
    timeout) so the SDK does not use aiohttp, which raises an AssertionError
    (connector is None) when another package installs it into the environment.

    NOTE(review): the httpx.AsyncClient created here is not closed in this
    function; confirm whether the SDK closes a caller-supplied client, otherwise
    each call leaves a connection pool open until garbage collection.
    """
    from google.genai import types as genai_types

    transport = httpx.AsyncClient(timeout=600.0)
    options = genai_types.HttpOptions(httpx_async_client=transport)
    return genai.Client(api_key=GEMINI_API_KEY, http_options=options)
def get_openai_client():
    """Build a new ``AsyncOpenAI`` client (600 s timeout) for a single call.

    Clients are deliberately not cached: in ASGI deployments requests may arrive
    on different event loops, and a reused client can be bound to the wrong one.
    The construction cost is negligible next to the LLM call itself.
    """
    client = AsyncOpenAI(api_key=OPENAI_API_KEY, timeout=600.0)
    return client
# Model used whenever a caller does not request one explicitly.
DEFAULT_MODEL: str = "gemini-3.1-pro-preview"

# Model ID -> provider that serves it ('gemini' or 'openai').
SUPPORTED_MODELS: Dict[str, str] = {
    'gemini-3.1-pro-preview': 'gemini',
    'gpt-5.4-2026-03-05': 'openai',
}

# Legacy or renamed model IDs (e.g. values still stored in the database) mapped
# to the model that now replaces them. A replacement may belong to another provider.
MODEL_ALIASES: Dict[str, str] = {
    'gpt-5': 'gpt-5.4-2026-03-05',
    'gpt-5.2': 'gpt-5.4-2026-03-05',
    'gemini-3-pro-preview': 'gemini-3.1-pro-preview',
    # NOTE(review): an OpenAI ID redirected to a Gemini model — presumably
    # intentional (model retired); confirm.
    'gpt-4.1': 'gemini-3.1-pro-preview',
}
class LLMServiceError(Exception):
    """Raised when an LLM call, or the parsing of its output, fails."""
class LLMService:
"""Centralized service for LLM operations."""
@staticmethod
def _extract_responses_api_content(response) -> str:
    """
    Extract the text content from an OpenAI Responses API response.

    Text is collected, in order, from every ``response.output[*].content[*].text``
    that is a string. Items without content (e.g. reasoning items) and parts
    whose text is missing or None are skipped. If that yields nothing,
    ``response.output_text`` and then ``response.text`` are tried, but only
    when they are strings.

    Args:
        response: The response object from the OpenAI Responses API.
    Returns:
        The extracted text with surrounding whitespace stripped, or an empty
        string when no text is available.
    """
    pieces = []
    for item in getattr(response, 'output', None) or []:
        for content in getattr(item, 'content', None) or []:
            text = getattr(content, 'text', None)
            # Parts such as refusals may carry no text; skip them instead of
            # failing on ``str += None``.
            if isinstance(text, str):
                pieces.append(text)
    result = ''.join(pieces)
    if not result:
        # In the openai SDK, ``Response.text`` holds the text *configuration*
        # (format/verbosity), not output text, so non-string values are ignored
        # rather than returned (they would break ``.strip()``).
        for attr in ('output_text', 'text'):
            candidate = getattr(response, attr, None)
            if isinstance(candidate, str) and candidate:
                result = candidate
                break
    return result.strip()
@staticmethod
def _resolve_model(model_name: Optional[str] = None) -> str:
    """
    Map a requested model name to the model ID that should actually be used.

    Args:
        model_name: Requested model; any falsy value selects DEFAULT_MODEL.
    Returns:
        The MODEL_ALIASES replacement for the name when one exists, otherwise
        the name itself. Aliases are applied once, not chained.
    """
    requested = model_name if model_name else DEFAULT_MODEL
    if requested in MODEL_ALIASES:
        return MODEL_ALIASES[requested]
    return requested
@staticmethod
def _get_model_provider(model_name: Optional[str] = None) -> str:
    """
    Return which provider serves the given model.

    Args:
        model_name: Requested model; aliases and the default are resolved first
            via ``LLMService._resolve_model``.
    Returns:
        The provider listed in SUPPORTED_MODELS ('gemini' or 'openai'). A model
        that is not listed is treated as 'gemini'.
    """
    resolved = LLMService._resolve_model(model_name)
    try:
        return SUPPORTED_MODELS[resolved]
    except KeyError:
        return 'gemini'
@staticmethod
def _extract_text_from_new_genai_response(response) -> str:
    """
    Extract the answer text from a Google GenAI SDK (``google.genai``) response.

    Lookup order:
      1. ``response.text`` when it is non-empty.
      2. The text parts of the first candidate that has any, joined in order.
         Parts flagged ``thought`` (thinking summaries) are skipped so they are
         never returned as the answer.
      3. ``str(response.content)`` when such an attribute exists and is truthy.

    Args:
        response: The response object from the Google GenAI SDK.
    Returns:
        The extracted text, whitespace-stripped.
    Raises:
        LLMServiceError: If no text can be extracted, or extraction itself fails
            (the original exception is chained as ``__cause__``).
    """
    try:
        direct = getattr(response, 'text', None)
        if direct:
            return direct.strip()
        for candidate in getattr(response, 'candidates', None) or []:
            content = getattr(candidate, 'content', None)
            parts = getattr(content, 'parts', None) if content else None
            if not parts:
                continue
            text_parts = [
                part.text
                for part in parts
                if getattr(part, 'text', None) and not getattr(part, 'thought', None)
            ]
            if text_parts:
                return ''.join(text_parts).strip()
        fallback = getattr(response, 'content', None)
        if fallback:
            return str(fallback).strip()
    except Exception as e:
        raise LLMServiceError(f"Error extracting text from new GenAI SDK response: {str(e)}") from e
    raise LLMServiceError("Unable to extract text from new GenAI SDK response")
@staticmethod
def _extract_usage_metadata(response, provider: str) -> dict:
    """
    Extract token counts from a provider response.

    Returns a dict with int values for 'prompt', 'completion', 'cached' and
    'reasoning' (each defaulting to 0). The same convention is used for every
    provider:
      * 'completion' counts all billable output tokens, including reasoning or
        thinking tokens;
      * 'reasoning' is that reasoning subset;
      * 'cached' is the cached subset of 'prompt'.

    Args:
        response: Raw response from the Gemini (google.genai) or OpenAI SDK.
        provider: 'gemini' or 'openai'; any other value yields all zeros.
    """
    log = logging.getLogger(__name__)

    def _count(obj, field: str) -> int:
        # A missing object, missing field, or None value all count as 0.
        if not obj:
            return 0
        return getattr(obj, field, 0) or 0

    empty = {'prompt': 0, 'completion': 0, 'cached': 0, 'reasoning': 0}

    if provider == 'gemini':
        um = getattr(response, 'usage_metadata', None)
        if um is None:
            log.warning("Gemini response missing usage_metadata — token counts will be 0, cost recorded as $0")
            return empty
        # Gemini reports thinking tokens in thoughts_token_count *separately*
        # from candidates_token_count, but bills them at the output rate. Fold
        # them into 'completion' so cost matches the bill (and matches OpenAI,
        # whose output token counts already include reasoning tokens).
        thoughts = _count(um, 'thoughts_token_count')
        return {
            'prompt': _count(um, 'prompt_token_count'),
            'completion': _count(um, 'candidates_token_count') + thoughts,
            'cached': _count(um, 'cached_content_token_count'),
            'reasoning': thoughts,
        }

    if provider == 'openai':
        usage = getattr(response, 'usage', None)
        if usage is None:
            log.warning("OpenAI response missing usage — token counts will be 0, cost recorded as $0")
            return empty
        if hasattr(usage, 'input_tokens'):
            # Responses API usage shape (gpt-5.4-2026-03-05).
            return {
                'prompt': _count(usage, 'input_tokens'),
                'completion': _count(usage, 'output_tokens'),
                'cached': _count(getattr(usage, 'input_tokens_details', None), 'cached_tokens'),
                'reasoning': _count(getattr(usage, 'output_tokens_details', None), 'reasoning_tokens'),
            }
        # Chat Completions API usage shape.
        return {
            'prompt': _count(usage, 'prompt_tokens'),
            'completion': _count(usage, 'completion_tokens'),
            'cached': _count(getattr(usage, 'prompt_tokens_details', None), 'cached_tokens'),
            'reasoning': _count(getattr(usage, 'completion_tokens_details', None), 'reasoning_tokens'),
        }

    return empty
# Strong references to in-flight fire-and-forget tasks spawned by _record_usage.
# asyncio keeps only weak references to tasks, so an otherwise unreferenced task
# can be garbage-collected before it finishes.
_background_tasks: set = set()

@staticmethod
async def _record_usage(response, provider: str, model: str, start_time: float, retry_count: int) -> None:
    """
    Record a usage event for a successful LLM call. Never raises.

    Token counts come from ``LLMService._extract_usage_metadata``. Cost is
    computed from ``ModelPricing.current_for(model)``. The event is tagged with
    the current usage context (user, focus group, persona, feature, task). When
    the context has a focus group, a 'usage_update' websocket event carrying
    the cost/token delta is emitted as a background task.

    Args:
        response: Raw provider response, used only for token extraction.
        provider: 'gemini' or 'openai'.
        model: Resolved model ID, used for pricing and reporting.
        start_time: ``time.monotonic()`` value taken before the first attempt.
        retry_count: Number of failed attempts before the successful one.
    """
    log = logging.getLogger(__name__)
    try:
        from app.services.llm_usage_context import current_context
        from app.models.usage_event import UsageEvent
        from app.models.model_pricing import ModelPricing
        ctx = current_context()
        tokens = LLMService._extract_usage_metadata(response, provider)
        pricing = await ModelPricing.current_for(model)
        cost = ModelPricing.compute_cost(
            pricing,
            prompt_tokens=tokens['prompt'],
            completion_tokens=tokens['completion'],
            cached_tokens=tokens['cached'],
        )
        await UsageEvent.record(
            provider=provider,
            model=model,
            prompt_tokens=tokens['prompt'],
            completion_tokens=tokens['completion'],
            cached_tokens=tokens['cached'],
            reasoning_tokens=tokens['reasoning'],
            cost_usd=cost,
            price_snapshot_id=pricing.get('_id') if pricing else None,
            duration_ms=int((time.monotonic() - start_time) * 1000),
            retry_count=retry_count,
            status="success",
            user_id=ctx.user_id,
            focus_group_id=ctx.focus_group_id,
            persona_id=ctx.persona_id,
            feature=ctx.feature,
            task_id=ctx.task_id,
        )
        if ctx.focus_group_id:
            # Notify the focus group room of the cost delta (best effort, non-blocking).
            try:
                from app.models.focus_group import emit_websocket_event
                task = asyncio.create_task(emit_websocket_event(
                    'usage_update',
                    ctx.focus_group_id,
                    {
                        'cost_delta': cost.get('total', 0),
                        'tokens_delta': tokens['prompt'] + tokens['completion'],
                        'feature': ctx.feature,
                    },
                ))
                # Keep the task alive until it completes, then drop the reference.
                LLMService._background_tasks.add(task)
                task.add_done_callback(LLMService._background_tasks.discard)
            except Exception:
                log.debug("usage_update notification failed (non-fatal)", exc_info=True)
    except Exception:
        log.warning("_record_usage failed (non-fatal)", exc_info=True)
@staticmethod
async def generate_content(
    prompt: str,
    temperature: float = 0.7,
    max_tokens: Optional[int] = None,
    model_name: Optional[str] = None,
    system_prompt: Optional[str] = None,
    reasoning_effort: Optional[str] = None,
    verbosity: Optional[str] = None
) -> str:
    """
    Generate text with the configured LLM, retrying transient errors.

    A quota pre-flight runs first. QuotaExceededError propagates to the caller;
    any other pre-flight failure (quota module unavailable, DB error) is logged
    and ignored so it never blocks generation. Up to three attempts are made,
    sleeping 1 s and then 2 s between them. Usage is recorded only for the
    successful attempt.

    Retry policy:
      * Google ``APIError``: retried on HTTP 429 and 5xx codes.
      * Any other exception: retried when its lower-cased message contains a
        transient marker (500/internal error, service unavailable, timeout /
        timed out, connection error, rate limit / too many requests).

    Args:
        prompt: The prompt to send to the model.
        temperature: Controls randomness (0.0 = deterministic, 1.0 = creative).
            Not sent on the gpt-5.4-2026-03-05 (Responses API) path.
        max_tokens: Maximum number of tokens to generate. Not sent on the
            Responses API path.
        model_name: Optional model name; aliases and the default are resolved.
        system_prompt: Optional system prompt. Chat Completions models receive
            it as a system message. Gemini and the Responses API receive it
            inlined ahead of the prompt as "System: ..." / "User: ..." text.
        reasoning_effort: gpt-5.4-2026-03-05 only - thinking time
            (minimal/low/medium/high); defaults to "low".
        verbosity: gpt-5.4-2026-03-05 only - response length (low/medium/high);
            defaults to "medium".
    Returns:
        The generated text response.
    Raises:
        QuotaExceededError: If the quota pre-flight rejects the call.
        LLMServiceError: If every attempt fails or a non-retryable error occurs;
            the last underlying exception is chained as ``__cause__``.
    """
    logger = logging.getLogger(__name__)
    max_retries = 3
    last_error = None
    # Lower-cased substrings that mark a non-Google exception as transient.
    # Rate limiting is matched by explicit phrases: a bare "rate" also matches
    # words such as "generate" and caused pointless retries. "timed out" covers
    # the OpenAI SDK's timeout message ("Request timed out.").
    retryable_markers = (
        "500", "internal error", "internal server error", "service unavailable",
        "timeout", "timed out", "connection error",
        "rate limit", "rate_limit", "ratelimit", "too many requests",
    )

    # Quota pre-flight — raises QuotaExceededError if over limit
    try:
        from app.models.quota import check_quota, QuotaExceededError
    except Exception:
        # Non-fatal: an unavailable quota module must not block LLM calls.
        logger.warning("Quota pre-flight skipped: quota module unavailable", exc_info=True)
    else:
        try:
            from app.services.llm_usage_context import current_context
            quota_ctx = current_context()
            await check_quota(quota_ctx.user_id, quota_ctx.focus_group_id)
        except QuotaExceededError:
            raise
        except Exception:
            # Non-fatal: DB failures must not block LLM calls
            logger.warning("Quota pre-flight failed (non-fatal)", exc_info=True)

    actual_model = LLMService._resolve_model(model_name)
    provider = LLMService._get_model_provider(model_name)
    _start_time = time.monotonic()

    for attempt in range(max_retries):
        attempt_num = attempt + 1
        logger.debug(f"LLM content generation attempt {attempt_num}/{max_retries} using {provider} provider")
        try:
            if provider == 'openai':
                if actual_model == 'gpt-5.4-2026-03-05':
                    # Responses API: the system prompt is inlined into the input;
                    # temperature and max_tokens are not sent on this path.
                    input_content = prompt
                    if system_prompt:
                        input_content = f"System: {system_prompt}\n\nUser: {prompt}"
                    kwargs = {
                        "model": actual_model,
                        "input": input_content,
                        "reasoning": {"effort": reasoning_effort or "low"},
                        "text": {
                            "format": {"type": "text"},
                            "verbosity": verbosity or "medium",
                        },
                    }
                    response = await get_openai_client().responses.create(**kwargs)
                    result = LLMService._extract_responses_api_content(response)
                else:
                    # Chat Completions API for the other OpenAI models.
                    messages = []
                    if system_prompt:
                        messages.append({"role": "system", "content": system_prompt})
                    messages.append({"role": "user", "content": prompt})
                    kwargs = {
                        "model": actual_model,
                        "messages": messages,
                        "temperature": temperature,
                    }
                    if max_tokens:
                        kwargs["max_tokens"] = max_tokens
                    response = await get_openai_client().chat.completions.create(**kwargs)
                    result = response.choices[0].message.content.strip()
            else:
                # Google GenAI SDK - async call
                config = genai.types.GenerateContentConfig(
                    temperature=temperature,
                )
                if max_tokens:
                    config.max_output_tokens = max_tokens
                # The system prompt is inlined into the user text.
                if system_prompt:
                    combined_prompt = f"System: {system_prompt}\n\nUser: {prompt}"
                else:
                    combined_prompt = prompt
                response = await get_gemini_client().aio.models.generate_content(
                    model=actual_model,
                    contents=combined_prompt,
                    config=config
                )
                result = LLMService._extract_text_from_new_genai_response(response)

            if attempt > 0:
                logger.info(f"LLM content generation succeeded on attempt {attempt_num}/{max_retries}")
            await LLMService._record_usage(response, provider, actual_model, _start_time, attempt)
            return result

        except genai_errors.APIError as e:
            # Google GenAI SDK specific error handling
            last_error = e
            error_code = getattr(e, 'code', 'unknown')
            error_message = getattr(e, 'message', str(e)) or str(e) or repr(e)
            logger.warning(f"LLM attempt {attempt_num}/{max_retries} failed: [Google API {error_code}] {error_message}")
            # Retryable: 429 rate limit, 500+ server errors
            is_retryable = (
                error_code == 429 or
                (isinstance(error_code, int) and error_code >= 500)
            )
            if is_retryable:
                if attempt < max_retries - 1:
                    wait_time = 2 ** attempt
                    logger.info(f"Retryable Google API error. Waiting {wait_time}s before retry {attempt_num + 1}/{max_retries}")
                    await asyncio.sleep(wait_time)
                    continue
                logger.error(f"Retryable Google API error [{error_code}] but max retries ({max_retries}) reached")
            else:
                # 400, 403, 404, etc. - non-retryable
                logger.error(f"Non-retryable Google API error [{error_code}]: {error_message}")
                break

        except Exception as e:
            # Fallback for OpenAI and other non-Google errors
            last_error = e
            # Capture full exception details for debugging
            exc_type = type(e).__name__
            exc_module = type(e).__module__
            exc_str = str(e)
            exc_repr = repr(e)
            exc_args = getattr(e, 'args', ())
            exc_dict = getattr(e, '__dict__', {})
            exc_tb = traceback.format_exc()
            logger.warning(f"LLM attempt {attempt_num}/{max_retries} failed - Type: {exc_module}.{exc_type}, str: '{exc_str}', repr: {exc_repr}, args: {exc_args}, dict: {exc_dict}")
            logger.warning(f"Full traceback:\n{exc_tb}")
            error_message = exc_str.lower() if exc_str else exc_repr.lower()
            if any(marker in error_message for marker in retryable_markers):
                if attempt < max_retries - 1:
                    # Exponential backoff: 1s, 2s
                    wait_time = 2 ** attempt
                    logger.info(f"Retryable error detected. Waiting {wait_time} seconds before retry {attempt_num + 1}/{max_retries}")
                    await asyncio.sleep(wait_time)
                    continue
                logger.error(f"Retryable error detected but max retries ({max_retries}) reached")
            else:
                logger.error(f"Non-retryable error detected: {str(e)}")
                break

    # Retries exhausted or a non-retryable error: surface the last error.
    if isinstance(last_error, genai_errors.APIError):
        error_code = getattr(last_error, 'code', 'unknown')
        error_msg = getattr(last_error, 'message', str(last_error)) or str(last_error) or repr(last_error)
        error_detail = f"[Google API {error_code}] {error_msg}"
    else:
        # Use repr if str is empty
        error_detail = str(last_error) or repr(last_error)
    logger.error(f"LLM content generation failed after {max_retries} attempts. Final error: {error_detail}")
    raise LLMServiceError(f"Error generating content: {error_detail}") from last_error
@staticmethod
def parse_json_response(response_text: str) -> Union[Dict[str, Any], List[Any]]:
    """
    Parse JSON from an LLM reply, tolerating a Markdown code fence around it.

    If the whitespace-stripped reply starts with ```:
      * the opening fence and an optional ``json`` language tag (any case) are
        removed;
      * everything from the last closing ``` onward is dropped, so prose after
        the fence is ignored.
    Otherwise the text is parsed as-is.

    Args:
        response_text: The text response from the LLM.
    Returns:
        The parsed JSON value (normally a dict or a list).
    Raises:
        LLMServiceError: If the text is not valid JSON; the JSONDecodeError is
            chained as ``__cause__``.
    """
    text = response_text.strip()
    if text.startswith("```"):
        body = text[3:]
        # Slice the tag off: str.strip("```json") strips a *set of characters*
        # from both ends, not a prefix.
        if body[:4].lower() == "json":
            body = body[4:]
        # Use the last fence so fences inside JSON string values survive.
        closing = body.rfind("```")
        if closing != -1:
            body = body[:closing]
        text = body.strip()
    try:
        return json.loads(text)
    except json.JSONDecodeError as e:
        error_msg = f"Failed to parse JSON response: {str(e)}. Raw response: {response_text[:200]}..."
        logging.getLogger(__name__).error(error_msg)
        raise LLMServiceError(error_msg) from e
@staticmethod
async def generate_structured_response(
    prompt: str,
    temperature: float = 0.7,
    max_tokens: Optional[int] = None,
    model_name: Optional[str] = None,
    system_prompt: Optional[str] = None,
    reasoning_effort: Optional[str] = None,
    verbosity: Optional[str] = None
) -> Dict[str, Any]:
    """
    Ask the LLM for JSON and return the decoded value.

    Every argument is forwarded unchanged to ``LLMService.generate_content``.
    The reply is then decoded with ``LLMService.parse_json_response``.

    Args:
        prompt: The prompt to send to the model.
        temperature: Controls randomness in generation.
        max_tokens: Maximum tokens to generate.
        model_name: Optional model name to use.
        system_prompt: Optional system prompt defining the AI's role.
        reasoning_effort: gpt-5.4-2026-03-05 only - thinking time (minimal/low/medium/high).
        verbosity: gpt-5.4-2026-03-05 only - response length (low/medium/high).
    Returns:
        The decoded JSON. The top-level type is not checked here, so an array
        is returned as-is despite the Dict annotation.
    Raises:
        LLMServiceError: If generation or JSON parsing fails.
    """
    forwarded = dict(
        prompt=prompt,
        temperature=temperature,
        max_tokens=max_tokens,
        model_name=model_name,
        system_prompt=system_prompt,
        reasoning_effort=reasoning_effort,
        verbosity=verbosity,
    )
    raw_text = await LLMService.generate_content(**forwarded)
    decoded = LLMService.parse_json_response(raw_text)
    return decoded
@staticmethod
async def generate_structured_array(
    prompt: str,
    temperature: float = 0.7,
    max_tokens: Optional[int] = None,
    model_name: Optional[str] = None,
    system_prompt: Optional[str] = None,
    reasoning_effort: Optional[str] = None,
    verbosity: Optional[str] = None
) -> List[Dict[str, Any]]:
    """
    Ask the LLM for a JSON array and return it as a list.

    Every argument is forwarded unchanged to ``LLMService.generate_content``.
    The reply is decoded by ``LLMService.parse_json_response``. Only the
    top-level type is validated; list elements are returned as decoded.

    Args:
        prompt: The prompt to send to the model.
        temperature: Controls randomness in generation.
        max_tokens: Maximum tokens to generate.
        model_name: Optional model name to use.
        system_prompt: Optional system prompt defining the AI's role.
        reasoning_effort: gpt-5.4-2026-03-05 only - thinking time (minimal/low/medium/high).
        verbosity: gpt-5.4-2026-03-05 only - response length (low/medium/high).
    Returns:
        The decoded JSON array.
    Raises:
        LLMServiceError: If generation or parsing fails, or the decoded JSON is
            not an array.
    """
    forwarded = {
        "prompt": prompt,
        "temperature": temperature,
        "max_tokens": max_tokens,
        "model_name": model_name,
        "system_prompt": system_prompt,
        "reasoning_effort": reasoning_effort,
        "verbosity": verbosity,
    }
    decoded = LLMService.parse_json_response(
        await LLMService.generate_content(**forwarded)
    )
    if isinstance(decoded, list):
        return decoded
    raise LLMServiceError(f"Expected a JSON array but received {type(decoded)}")
@staticmethod
async def generate_multimodal_content(
    prompt: str,
    image_paths: List[str],
    temperature: float = 0.7,
    max_tokens: Optional[int] = None,
    model_name: Optional[str] = None
) -> str:
    """
    Generate content from a text prompt plus local image files.

    A quota pre-flight runs first. QuotaExceededError propagates; other
    pre-flight failures are logged and ignored. Up to three attempts are made,
    sleeping 1 s and then 2 s between them:
      * Google ``APIError`` is retried on 429 and 5xx codes;
      * other exceptions are retried when their lower-cased message contains a
        transient marker (500/internal error, service unavailable, timeout /
        timed out, connection error, rate limit / too many requests).
    Images are re-read from disk on every attempt.

    Args:
        prompt: The text prompt to send to the model.
        image_paths: Paths of image files to attach, in order.
        temperature: Controls randomness in generation. Not sent to
            gpt-5.4-2026-03-05, which uses "low" reasoning effort and "medium"
            verbosity here.
        max_tokens: Maximum tokens to generate. Not sent to gpt-5.4-2026-03-05.
        model_name: Optional model name; aliases and the default are resolved.
    Returns:
        The generated text response.
    Raises:
        QuotaExceededError: If the quota pre-flight rejects the call.
        LLMServiceError: If an image is missing or unreadable, or generation
            fails; the last underlying exception is chained as ``__cause__``.
    """
    logger = logging.getLogger(__name__)
    max_retries = 3
    last_error = None
    # Lower-cased substrings marking a non-Google exception as transient
    # (explicit rate-limit phrases; a bare "rate" also matches "generate").
    retryable_markers = (
        "500", "internal error", "internal server error", "service unavailable",
        "timeout", "timed out", "connection error",
        "rate limit", "rate_limit", "ratelimit", "too many requests",
    )

    # Quota pre-flight — raises QuotaExceededError if over limit
    try:
        from app.models.quota import check_quota, QuotaExceededError
    except Exception:
        # Non-fatal: an unavailable quota module must not block LLM calls.
        logger.warning("Quota pre-flight skipped: quota module unavailable", exc_info=True)
    else:
        try:
            from app.services.llm_usage_context import current_context
            quota_ctx = current_context()
            await check_quota(quota_ctx.user_id, quota_ctx.focus_group_id)
        except QuotaExceededError:
            raise
        except Exception:
            # Non-fatal: DB failures must not block LLM calls
            logger.warning("Quota pre-flight failed (non-fatal)", exc_info=True)

    actual_model = LLMService._resolve_model(model_name)
    provider = LLMService._get_model_provider(model_name)
    logger.info(f"Generating multimodal content with {len(image_paths)} image(s) using {provider} provider")
    _start_time = time.monotonic()

    for attempt in range(max_retries):
        attempt_num = attempt + 1
        logger.debug(f"Multimodal content generation attempt {attempt_num}/{max_retries}")
        try:
            if provider == 'openai':
                # Encode each image as a base64 data URL.
                image_content = []
                for image_path in image_paths:
                    if not os.path.exists(image_path):
                        raise LLMServiceError(f"Image file not found: {image_path}")
                    with open(image_path, "rb") as image_file:
                        base64_image = base64.b64encode(image_file.read()).decode('utf-8')
                    # MIME subtype from the extension ("jpg" must be sent as "jpeg").
                    image_format = image_path.lower().split('.')[-1]
                    if image_format == 'jpg':
                        image_format = 'jpeg'
                    image_content.append({
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/{image_format};base64,{base64_image}"
                        }
                    })
                    logger.debug(f"Successfully loaded image for OpenAI: {image_path}")

                if actual_model == 'gpt-5.4-2026-03-05':
                    # Responses API: one user message with text + input_image parts.
                    input_content = [{"role": "user", "content": [{"type": "input_text", "text": prompt}]}]
                    for img_content in image_content:
                        input_content[0]["content"].append({
                            "type": "input_image",
                            "image_url": img_content["image_url"]["url"]
                        })
                    kwargs = {
                        "model": actual_model,
                        "input": input_content,
                        "reasoning": {"effort": "low"},
                        "text": {
                            "verbosity": "medium",
                            "format": {"type": "text"}
                        }
                    }
                    response = await get_openai_client().responses.create(**kwargs)
                    result = LLMService._extract_responses_api_content(response)
                else:
                    # Chat Completions API for the other OpenAI models.
                    content = [{"type": "text", "text": prompt}]
                    content.extend(image_content)
                    kwargs = {
                        "model": actual_model,
                        "messages": [{"role": "user", "content": content}],
                        "temperature": temperature,
                    }
                    if max_tokens:
                        kwargs["max_tokens"] = max_tokens
                    response = await get_openai_client().chat.completions.create(**kwargs)
                    result = response.choices[0].message.content.strip()
            else:
                # Google GenAI SDK - multimodal async call
                config = genai.types.GenerateContentConfig(
                    temperature=temperature,
                )
                if max_tokens:
                    config.max_output_tokens = max_tokens
                # Part.from_text / Part.from_bytes are keyword-only in google-genai 1.x
                # (positional calls raise TypeError); keywords also work on older releases.
                content_parts = [genai.types.Part.from_text(text=prompt)]
                mime_by_ext = {
                    '.jpg': 'image/jpeg',
                    '.jpeg': 'image/jpeg',
                    '.png': 'image/png',
                    '.gif': 'image/gif',
                    '.webp': 'image/webp'
                }
                for image_path in image_paths:
                    try:
                        if not os.path.exists(image_path):
                            raise LLMServiceError(f"Image file not found: {image_path}")
                        with open(image_path, 'rb') as img_file:
                            image_data = img_file.read()
                        ext = os.path.splitext(image_path)[1].lower()
                        mime_type = mime_by_ext.get(ext, 'image/jpeg')  # Default to JPEG
                        content_parts.append(genai.types.Part.from_bytes(data=image_data, mime_type=mime_type))
                        logger.debug(f"Successfully loaded image for new GenAI SDK: {image_path}")
                    except Exception as e:
                        raise LLMServiceError(f"Failed to load image {image_path}: {str(e)}") from e
                response = await get_gemini_client().aio.models.generate_content(
                    model=actual_model,
                    contents=content_parts,
                    config=config
                )
                result = LLMService._extract_text_from_new_genai_response(response)

            if attempt > 0:
                logger.info(f"Multimodal content generation succeeded on attempt {attempt_num}/{max_retries}")
            await LLMService._record_usage(response, provider, actual_model, _start_time, attempt)
            return result

        except genai_errors.APIError as e:
            # Google GenAI SDK errors carry an HTTP status code.
            last_error = e
            error_code = getattr(e, 'code', 'unknown')
            error_message = getattr(e, 'message', str(e)) or str(e) or repr(e)
            logger.warning(f"Multimodal attempt {attempt_num}/{max_retries} failed: [Google API {error_code}] {error_message}")
            # Retryable: 429 rate limit, 500+ server errors
            is_retryable = (
                error_code == 429 or
                (isinstance(error_code, int) and error_code >= 500)
            )
            if is_retryable:
                if attempt < max_retries - 1:
                    wait_time = 2 ** attempt
                    logger.info(f"Retryable Google API error. Waiting {wait_time}s before retry {attempt_num + 1}/{max_retries}")
                    await asyncio.sleep(wait_time)
                    continue
                logger.error(f"Retryable Google API error [{error_code}] but max retries ({max_retries}) reached")
            else:
                logger.error(f"Non-retryable Google API error [{error_code}]: {error_message}")
                break

        except Exception as e:
            last_error = e
            error_message = (str(e) or repr(e)).lower()
            logger.warning(f"Multimodal attempt {attempt_num}/{max_retries} failed: {str(e)}")
            if any(marker in error_message for marker in retryable_markers):
                if attempt < max_retries - 1:
                    # Exponential backoff: 1s, 2s
                    wait_time = 2 ** attempt
                    logger.info(f"Retryable error detected. Waiting {wait_time} seconds before retry {attempt_num + 1}/{max_retries}")
                    await asyncio.sleep(wait_time)
                    continue
                logger.error(f"Retryable error detected but max retries ({max_retries}) reached")
            else:
                logger.error(f"Non-retryable error detected: {str(e)}")
                break

    # Retries exhausted or a non-retryable error: surface the last error.
    if isinstance(last_error, genai_errors.APIError):
        error_code = getattr(last_error, 'code', 'unknown')
        error_msg = getattr(last_error, 'message', str(last_error)) or str(last_error) or repr(last_error)
        error_detail = f"[Google API {error_code}] {error_msg}"
    else:
        error_detail = str(last_error) or repr(last_error)
    logger.error(f"Multimodal content generation failed after {max_retries} attempts. Final error: {error_detail}")
    raise LLMServiceError(f"Error generating multimodal content: {error_detail}") from last_error
@staticmethod
async def generate_contextual_response(
    prompt: str,
    conversation_context: List[Dict[str, Any]],
    temperature: float = 0.7,
    max_tokens: Optional[int] = None,
    model_name: Optional[str] = None,
    reasoning_effort: Optional[str] = None,
    verbosity: Optional[str] = None
) -> str:
    """
    Generate a reply using a conversation context of text and image items.

    Each context item is a dict whose "type" selects how it is used:
      * "text": its "content" is appended, in order, to a
        "CONVERSATION CONTEXT:" preamble placed before ``prompt``.
      * "image": the file at "path" is loaded with PIL and converted to RGB
        ("filename" is used only for logging). Missing or unreadable images
        are logged and skipped.
    Images are attached after the full text prompt; they are not interleaved
    with the text items.

    When no image could be loaded, the call is delegated to
    ``LLMService.generate_content``, which runs its own quota pre-flight.
    Otherwise a quota pre-flight runs here (QuotaExceededError propagates;
    other failures are logged and ignored), followed by up to three attempts
    with 1 s / 2 s backoff:
      * Google ``APIError`` is retried on 429 and 5xx codes;
      * other exceptions are retried when their message contains a transient
        marker (500/internal error, service unavailable, timeout / timed out,
        connection error, rate limit / too many requests).

    Args:
        prompt: The main prompt for the LLM.
        conversation_context: Context items (text and image) in chronological order.
        temperature: Controls randomness in generation.
        max_tokens: Maximum tokens to generate.
        model_name: Optional model name to use.
        reasoning_effort: gpt-5.4-2026-03-05 only - thinking time
            (minimal/low/medium/high); defaults to "low".
        verbosity: gpt-5.4-2026-03-05 only - response length (low/medium/high);
            defaults to "medium".
    Returns:
        The generated text response.
    Raises:
        QuotaExceededError: If the quota pre-flight rejects the call.
        LLMServiceError: If generation fails after retries or hits a
            non-retryable error.
    """
    logger = logging.getLogger(__name__)

    # Split the context into text (joined into a preamble) and loaded images.
    text_context_parts = []
    image_parts = []
    logger.debug(f"Processing {len(conversation_context)} context items for LLM")
    for item in conversation_context:
        if item["type"] == "text":
            text_context_parts.append(item["content"])
        elif item["type"] == "image":
            # .get(): a missing key must not raise from inside the error handler below.
            image_path = item.get("path")
            try:
                if image_path and os.path.exists(image_path):
                    with Image.open(image_path) as img:
                        if img.mode != 'RGB':
                            img = img.convert('RGB')
                        image_parts.append(img.copy())
                    logger.debug(f"Loaded image for context: {item.get('filename', image_path)}")
                else:
                    logger.warning(f"Image not found for context: {image_path}")
            except Exception as e:
                logger.warning(f"Failed to load image for context: {image_path}: {e}")

    context_prompt = ""
    if text_context_parts:
        context_prompt = "CONVERSATION CONTEXT:\n" + "\n".join(text_context_parts) + "\n\n"
    full_prompt = context_prompt + prompt
    logger.debug(f"Context prompt length: {len(context_prompt)} characters; images in context: {len(image_parts)}")

    if not image_parts:
        # Text-only: generate_content performs the quota pre-flight itself.
        logger.debug("Using text-only generation (no visual context)")
        return await LLMService.generate_content(
            prompt=full_prompt,
            temperature=temperature,
            max_tokens=max_tokens,
            model_name=model_name,
            reasoning_effort=reasoning_effort,
            verbosity=verbosity
        )

    logger.debug(f"Using multimodal generation with {len(image_parts)} images")

    # Quota pre-flight — raises QuotaExceededError if over limit
    try:
        from app.models.quota import check_quota, QuotaExceededError
    except Exception:
        # Non-fatal: an unavailable quota module must not block LLM calls.
        logger.warning("Quota pre-flight skipped: quota module unavailable", exc_info=True)
    else:
        try:
            from app.services.llm_usage_context import current_context
            quota_ctx = current_context()
            await check_quota(quota_ctx.user_id, quota_ctx.focus_group_id)
        except QuotaExceededError:
            raise
        except Exception:
            # Non-fatal: DB failures must not block LLM calls
            logger.warning("Quota pre-flight failed (non-fatal)", exc_info=True)

    actual_model = LLMService._resolve_model(model_name)
    provider = LLMService._get_model_provider(model_name)
    max_retries = 3
    last_error = None
    # Lower-cased substrings marking a non-Google exception as transient
    # (explicit rate-limit phrases; a bare "rate" also matches "generate").
    retryable_markers = (
        "500", "internal error", "internal server error", "service unavailable",
        "timeout", "timed out", "connection error",
        "rate limit", "rate_limit", "ratelimit", "too many requests",
    )
    _start_time = time.monotonic()

    for attempt in range(max_retries):
        attempt_num = attempt + 1
        logger.debug(f"Contextual multimodal generation attempt {attempt_num}/{max_retries}")
        try:
            if provider == 'openai':
                # Re-encode each PIL image as a PNG base64 data URL.
                image_content = []
                for img in image_parts:
                    buffer = io.BytesIO()
                    img.save(buffer, format='PNG')
                    base64_image = base64.b64encode(buffer.getvalue()).decode('utf-8')
                    image_content.append({
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/png;base64,{base64_image}"
                        }
                    })
                if actual_model == 'gpt-5.4-2026-03-05':
                    # Responses API: one user message with text + input_image parts.
                    input_content = [{"role": "user", "content": [{"type": "input_text", "text": full_prompt}]}]
                    for img_content in image_content:
                        input_content[0]["content"].append({
                            "type": "input_image",
                            "image_url": img_content["image_url"]["url"]
                        })
                    kwargs = {
                        "model": actual_model,
                        "input": input_content,
                        "reasoning": {"effort": reasoning_effort or "low"},
                        "text": {
                            "verbosity": verbosity or "medium",
                            "format": {"type": "text"}
                        }
                    }
                    response = await get_openai_client().responses.create(**kwargs)
                    result = LLMService._extract_responses_api_content(response)
                else:
                    # Chat Completions API for the other OpenAI models.
                    content = [{"type": "text", "text": full_prompt}]
                    content.extend(image_content)
                    kwargs = {
                        "model": actual_model,
                        "messages": [{"role": "user", "content": content}],
                        "temperature": temperature,
                    }
                    if max_tokens:
                        kwargs["max_tokens"] = max_tokens
                    response = await get_openai_client().chat.completions.create(**kwargs)
                    result = response.choices[0].message.content.strip()
            else:
                # Google GenAI SDK - contextual multimodal async call
                config = genai.types.GenerateContentConfig(
                    temperature=temperature,
                )
                if max_tokens:
                    config.max_output_tokens = max_tokens
                # Part.from_text / Part.from_bytes are keyword-only in google-genai 1.x
                # (positional calls raise TypeError); keywords also work on older releases.
                new_content_parts = [genai.types.Part.from_text(text=full_prompt)]
                for img in image_parts:
                    buffer = io.BytesIO()
                    img.save(buffer, format='PNG')
                    new_content_parts.append(genai.types.Part.from_bytes(data=buffer.getvalue(), mime_type='image/png'))
                response = await get_gemini_client().aio.models.generate_content(
                    model=actual_model,
                    contents=new_content_parts,
                    config=config
                )
                result = LLMService._extract_text_from_new_genai_response(response)

            if attempt > 0:
                logger.info(f"Contextual multimodal generation succeeded on attempt {attempt_num}/{max_retries}")
            preview = result[:200] if result else 'EMPTY'
            logger.debug(
                f"Generated contextual response with visual context using {provider}: "
                f"type={type(result).__name__}, length={len(result) if result else 0}, preview={preview!r}"
            )
            await LLMService._record_usage(response, provider, actual_model, _start_time, attempt)
            return result

        except genai_errors.APIError as e:
            # Google GenAI SDK errors carry an HTTP status code.
            last_error = e
            error_code = getattr(e, 'code', 'unknown')
            error_message = getattr(e, 'message', str(e)) or str(e) or repr(e)
            logger.warning(f"Contextual multimodal attempt {attempt_num}/{max_retries} failed: [Google API {error_code}] {error_message}")
            # Retryable: 429 rate limit, 500+ server errors
            is_retryable = (
                error_code == 429 or
                (isinstance(error_code, int) and error_code >= 500)
            )
            if is_retryable:
                if attempt < max_retries - 1:
                    wait_time = 2 ** attempt
                    logger.info(f"Retryable Google API error. Waiting {wait_time}s before retry {attempt_num + 1}/{max_retries}")
                    await asyncio.sleep(wait_time)
                    continue
                logger.error(f"Retryable Google API error [{error_code}] but max retries ({max_retries}) reached")
            else:
                logger.error(f"Non-retryable Google API error [{error_code}]: {error_message}")
                break

        except Exception as e:
            # Fallback for non-Google errors
            last_error = e
            error_message = (str(e) or repr(e)).lower()
            logger.warning(f"Contextual multimodal attempt {attempt_num}/{max_retries} failed: {str(e)}")
            if any(marker in error_message for marker in retryable_markers):
                if attempt < max_retries - 1:
                    wait_time = 2 ** attempt
                    logger.info(f"Retryable error detected. Waiting {wait_time} seconds before retry {attempt_num + 1}/{max_retries}")
                    await asyncio.sleep(wait_time)
                    continue
                logger.error(f"Retryable error detected but max retries ({max_retries}) reached")
            else:
                logger.error(f"Non-retryable error detected: {str(e)}")
                break

    # Retries exhausted or a non-retryable error: surface the last error.
    if isinstance(last_error, genai_errors.APIError):
        error_code = getattr(last_error, 'code', 'unknown')
        error_msg = getattr(last_error, 'message', str(last_error)) or str(last_error) or repr(last_error)
        error_detail = f"[Google API {error_code}] {error_msg}"
    else:
        error_detail = str(last_error) or repr(last_error)
    logger.error(f"Contextual multimodal generation failed after {max_retries} attempts. Final error: {error_detail}")
    raise LLMServiceError(f"Error generating contextual multimodal content: {error_detail}") from last_error