- Add a WARNING log when usage_metadata/usage is None, so zero-cost events are visible in the logs instead of silently disappearing.
- Capture thoughts_token_count from Gemini thinking models into the reasoning field (already included in candidates_token_count for billing; now also tracked separately).
- Add the same warning for a missing OpenAI usage object.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1097 lines
No EOL
48 KiB
Python
Executable file
1097 lines
No EOL
48 KiB
Python
Executable file
"""
|
|
LLM Service for Synthetic Society
|
|
This service provides a centralized interface for interacting with language models
|
|
through the Google Generative AI API. It supports various prompting functions for
|
|
different application features.
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import asyncio
|
|
import logging
|
|
import base64
|
|
import traceback
|
|
import time
|
|
from google import genai
|
|
from google.genai import errors as genai_errors
|
|
from openai import AsyncOpenAI
|
|
import httpx
|
|
from typing import Dict, Any, Optional, Union, List
|
|
from PIL import Image
|
|
import io
|
|
|
|
# Set up API keys — must be set in environment, no hardcoded fallbacks
|
|
def _require_env(key: str) -> str:
    """Return the value of the environment variable *key*.

    Args:
        key: Name of the environment variable to read.

    Returns:
        The variable's non-empty value.

    Raises:
        RuntimeError: If the variable is unset or set to an empty string.
    """
    configured = os.getenv(key)
    if configured:
        return configured
    # Unset and empty are treated alike: neither is a usable credential.
    raise RuntimeError(f"Required environment variable '{key}' is not set. Set it in backend/.env before starting the server.")
|
|
|
|
# Both keys are resolved at import time, so importing this module raises
# RuntimeError when either variable is missing or empty.
GEMINI_API_KEY = _require_env("GEMINI_API_KEY")
OPENAI_API_KEY = _require_env("OPENAI_API_KEY")
|
|
|
|
|
|
def get_gemini_client():
    """Build a new Gemini client for a single call.

    A client is deliberately not cached: per the original author's notes,
    cached clients hit event-loop mismatch errors in ASGI environments where
    requests may arrive on different event loops, and the construction cost
    is small next to the LLM call itself.

    The client is given an explicit httpx async transport. This avoids the
    aiohttp AssertionError (connector is None) seen when aiohttp is present
    in the environment via other packages.

    Returns:
        A ``genai.Client`` configured with ``GEMINI_API_KEY`` and a 600-second
        httpx timeout.
    """
    from google.genai import types as genai_types

    # NOTE(review): nothing in this function closes the AsyncClient it creates;
    # confirm the SDK or garbage collection releases its connections.
    transport = httpx.AsyncClient(timeout=600.0)
    options = genai_types.HttpOptions(httpx_async_client=transport)
    return genai.Client(api_key=GEMINI_API_KEY, http_options=options)
|
|
|
|
|
|
def get_openai_client():
    """Build a new OpenAI async client for a single call.

    A client is deliberately not cached: per the original author's notes,
    cached clients hit event-loop mismatch errors in ASGI environments where
    requests may arrive on different event loops, and the construction cost
    is small next to the LLM call itself.

    Returns:
        An ``AsyncOpenAI`` client using ``OPENAI_API_KEY`` and a 600-second
        timeout.
    """
    client_options = {"api_key": OPENAI_API_KEY, "timeout": 600.0}
    return AsyncOpenAI(**client_options)
|
|
|
|
# Model used whenever a caller does not name one.
DEFAULT_MODEL = "gemini-3.1-pro-preview"

# Canonical model ID -> provider key ("gemini" or "openai").
SUPPORTED_MODELS = {
    "gemini-3.1-pro-preview": "gemini",
    "gpt-5.4-2026-03-05": "openai",
}

# Renamed/legacy model IDs (as stored in the database) -> canonical model ID.
# NOTE(review): "gpt-4.1" resolves to the Gemini default rather than to an
# OpenAI model - presumably a deliberate migration; confirm.
MODEL_ALIASES = {
    "gpt-5": "gpt-5.4-2026-03-05",
    "gpt-5.2": "gpt-5.4-2026-03-05",
    "gemini-3-pro-preview": "gemini-3.1-pro-preview",
    "gpt-4.1": "gemini-3.1-pro-preview",
}
|
|
|
|
class LLMServiceError(Exception):
    """Raised when an LLM operation (generation, extraction or parsing) fails."""
|
|
|
|
class LLMService:
|
|
"""Centralized service for LLM operations."""
|
|
|
|
@staticmethod
|
|
def _extract_responses_api_content(response) -> str:
    """
    Extract text content from an OpenAI Responses API response.

    Text is collected from ``response.output[*].content[*].text``. If that
    yields nothing, ``response.output_text`` and then ``response.text`` are
    tried. Only string values are accepted at every step, so a ``None`` text
    part or a non-string ``response.text`` is skipped instead of raising.

    Args:
        response: The response object from the OpenAI Responses API

    Returns:
        The extracted text, stripped of surrounding whitespace; an empty
        string when no text could be found.
    """
    chunks = []
    for item in getattr(response, 'output', None) or []:
        # Items whose content is missing or None contribute no text.
        for part in getattr(item, 'content', None) or []:
            text = getattr(part, 'text', None)
            if isinstance(text, str):
                chunks.append(text)
    result = "".join(chunks)

    if not result:
        # Fallbacks, in the original order of preference. Non-string values
        # are ignored so the final .strip() cannot fail.
        for attr in ('output_text', 'text'):
            candidate = getattr(response, attr, None)
            if isinstance(candidate, str) and candidate:
                result = candidate
                break

    return result.strip()
|
|
|
|
@staticmethod
|
|
def _resolve_model(model_name: Optional[str] = None) -> str:
    """
    Map a requested model name to the model ID that should actually be used.

    Args:
        model_name: Requested model name. ``None`` or an empty string selects
            ``DEFAULT_MODEL``.

    Returns:
        The ``MODEL_ALIASES`` target when the name is a known alias,
        otherwise the name unchanged.
    """
    requested = model_name if model_name else DEFAULT_MODEL
    if requested in MODEL_ALIASES:
        return MODEL_ALIASES[requested]
    return requested
|
|
|
|
@staticmethod
|
|
def _get_model_provider(model_name: Optional[str] = None) -> str:
    """
    Look up which provider serves a model.

    Args:
        model_name: Requested model name; resolved through
            ``LLMService._resolve_model`` first, so aliases and the default
            model are handled.

    Returns:
        The provider recorded in ``SUPPORTED_MODELS`` ('gemini' or 'openai').
        Models not listed there fall back to 'gemini'.
    """
    resolved = LLMService._resolve_model(model_name)
    if resolved in SUPPORTED_MODELS:
        return SUPPORTED_MODELS[resolved]
    return 'gemini'
|
|
|
|
|
|
@staticmethod
|
|
def _extract_text_from_new_genai_response(response) -> str:
    """
    Extract text from a Google GenAI SDK response.

    Lookup order:
      1. ``response.text`` (read exactly once);
      2. the first candidate whose ``content.parts`` contain any text, with
         that candidate's text parts concatenated;
      3. ``str(response.content)``.

    Args:
        response: The response object from the Google GenAI SDK

    Returns:
        The extracted text, stripped of surrounding whitespace

    Raises:
        LLMServiceError: If no text can be extracted, or if reading the
            response raises; the underlying exception is attached as
            ``__cause__``.
    """
    try:
        # Read the accessor once so any side effect it has is not repeated.
        direct_text = getattr(response, 'text', None)
        if direct_text:
            return direct_text.strip()

        # Otherwise walk the candidates structure.
        for candidate in getattr(response, 'candidates', None) or []:
            content = getattr(candidate, 'content', None)
            if not content:
                continue
            text_parts = []
            for part in getattr(content, 'parts', None) or []:
                part_text = getattr(part, 'text', None)
                if part_text:
                    text_parts.append(part_text)
            if text_parts:
                return ''.join(text_parts).strip()

        # Last resort: a direct content attribute on the response object.
        fallback = getattr(response, 'content', None)
        if fallback:
            return str(fallback).strip()

        raise LLMServiceError("Unable to extract text from new GenAI SDK response")

    except Exception as e:
        if isinstance(e, LLMServiceError):
            raise
        raise LLMServiceError(f"Error extracting text from new GenAI SDK response: {str(e)}") from e
|
|
|
|
@staticmethod
|
|
def _extract_usage_metadata(response, provider: str) -> dict:
    """Extract token counts from a provider response. All fields default to 0.

    Args:
        response: Provider response object. Gemini responses are read via
            ``usage_metadata``; OpenAI responses via ``usage`` (Responses API
            when ``usage.input_tokens`` exists, otherwise Chat Completions).
        provider: 'gemini' or 'openai'. Any other value yields all zeros.

    Returns:
        A dict with integer keys 'prompt', 'completion', 'cached' and
        'reasoning'. A missing usage object is logged at WARNING level and
        reported as all zeros.
    """
    _log = logging.getLogger(__name__)

    def _count(obj, attr: str) -> int:
        """Read one token field; a missing object, attribute or None counts as 0."""
        if obj is None:
            return 0
        return getattr(obj, attr, 0) or 0

    if provider == 'gemini':
        um = getattr(response, 'usage_metadata', None)
        if um is None:
            _log.warning("Gemini response missing usage_metadata — token counts will be 0, cost recorded as $0")
            return {'prompt': 0, 'completion': 0, 'cached': 0, 'reasoning': 0}
        # The original author states that thoughts_token_count (thinking models)
        # is already included in candidates_token_count, so it is only reported
        # separately here and not added to 'completion'.
        # NOTE(review): verify against usage_metadata.total_token_count - if
        # total == prompt + candidates + thoughts, thought tokens are NOT part
        # of candidates_token_count and 'completion' is under-counted.
        return {
            'prompt': _count(um, 'prompt_token_count'),
            'completion': _count(um, 'candidates_token_count'),
            'cached': _count(um, 'cached_content_token_count'),
            'reasoning': _count(um, 'thoughts_token_count'),
        }
    elif provider == 'openai':
        usage = getattr(response, 'usage', None)
        if usage is None:
            _log.warning("OpenAI response missing usage — token counts will be 0, cost recorded as $0")
            return {'prompt': 0, 'completion': 0, 'cached': 0, 'reasoning': 0}
        # Responses API (gpt-5.4-2026-03-05)
        if hasattr(usage, 'input_tokens'):
            return {
                'prompt': _count(usage, 'input_tokens'),
                'completion': _count(usage, 'output_tokens'),
                'cached': _count(getattr(usage, 'input_tokens_details', None), 'cached_tokens'),
                'reasoning': _count(getattr(usage, 'output_tokens_details', None), 'reasoning_tokens'),
            }
        # Chat Completions API: reasoning tokens are read from
        # completion_tokens_details when present, mirroring the Responses branch.
        return {
            'prompt': _count(usage, 'prompt_tokens'),
            'completion': _count(usage, 'completion_tokens'),
            'cached': _count(getattr(usage, 'prompt_tokens_details', None), 'cached_tokens'),
            'reasoning': _count(getattr(usage, 'completion_tokens_details', None), 'reasoning_tokens'),
        }
    return {'prompt': 0, 'completion': 0, 'cached': 0, 'reasoning': 0}
|
|
|
|
@staticmethod
|
|
async def _record_usage(response, provider: str, model: str, start_time: float, retry_count: int) -> None:
    """Record a usage event after a successful LLM call. Never raises.

    Token counts come from ``LLMService._extract_usage_metadata``. Cost is
    computed from prompt, completion and cached tokens using the current
    pricing for *model*; reasoning tokens are stored on the event but are not
    passed to the cost computation. When the usage context has a focus group,
    a 'usage_update' websocket event is scheduled in the background.

    Args:
        response: Provider response the token counts are read from.
        provider: Provider key ('gemini' or 'openai').
        model: Resolved model ID used for the call.
        start_time: ``time.monotonic()`` value taken before the first attempt;
            used to compute ``duration_ms``.
        retry_count: Number of failed attempts before the successful one.

    Any failure (including a failing project import) is logged and swallowed
    so usage accounting can never break the LLM call that triggered it.
    """
    logger = logging.getLogger(__name__)
    try:
        from app.services.llm_usage_context import current_context
        from app.models.usage_event import UsageEvent
        from app.models.model_pricing import ModelPricing

        ctx = current_context()
        tokens = LLMService._extract_usage_metadata(response, provider)
        pricing = await ModelPricing.current_for(model)
        cost = ModelPricing.compute_cost(
            pricing,
            prompt_tokens=tokens['prompt'],
            completion_tokens=tokens['completion'],
            cached_tokens=tokens['cached'],
        )
        price_id = pricing.get('_id') if pricing else None

        await UsageEvent.record(
            provider=provider,
            model=model,
            prompt_tokens=tokens['prompt'],
            completion_tokens=tokens['completion'],
            cached_tokens=tokens['cached'],
            reasoning_tokens=tokens['reasoning'],
            cost_usd=cost,
            price_snapshot_id=price_id,
            duration_ms=int((time.monotonic() - start_time) * 1000),
            retry_count=retry_count,
            status="success",
            user_id=ctx.user_id,
            focus_group_id=ctx.focus_group_id,
            persona_id=ctx.persona_id,
            feature=ctx.feature,
            task_id=ctx.task_id,
        )

        # Notify focus group room of cost delta (non-fatal)
        try:
            if ctx.focus_group_id:
                from app.models.focus_group import emit_websocket_event
                # NOTE(review): the event loop keeps only weak references to
                # tasks; this fire-and-forget task is not retained anywhere, so
                # it may be garbage-collected before it finishes. Consider
                # keeping a reference until it completes.
                asyncio.create_task(emit_websocket_event(
                    'usage_update',
                    ctx.focus_group_id,
                    {
                        'cost_delta': cost.get('total', 0),
                        'tokens_delta': tokens['prompt'] + tokens['completion'],
                        'feature': ctx.feature,
                    }
                ))
        except Exception:
            # Still best-effort, but no longer silent: the event is already
            # recorded, so only the live notification is lost.
            logger.debug("usage_update websocket notification failed (non-fatal)", exc_info=True)
    except Exception:
        logger.warning("_record_usage failed (non-fatal)", exc_info=True)
|
|
|
|
@staticmethod
|
|
async def generate_content(
    prompt: str,
    temperature: float = 0.7,
    max_tokens: Optional[int] = None,
    model_name: Optional[str] = None,
    system_prompt: Optional[str] = None,
    reasoning_effort: Optional[str] = None,
    verbosity: Optional[str] = None
) -> str:
    """
    Generate content using the LLM with retry mechanism for transient errors.

    Up to three attempts are made. Retryable failures (Google API 429/5xx, or
    other errors whose message mentions 500, internal error, service
    unavailable, timeout or rate) wait 1s and then 2s between attempts.
    Usage is recorded after the successful attempt.

    Args:
        prompt: The prompt to send to the model
        temperature: Controls randomness (0.0 = deterministic, 1.0 = creative).
            Not sent on the OpenAI Responses API path.
        max_tokens: Maximum number of tokens to generate. Not sent on the
            OpenAI Responses API path.
        model_name: Optional model name to use (aliases are resolved)
        system_prompt: Optional system prompt to define the role of the AI
        reasoning_effort: OpenAI Responses API path only (gpt-5.4-2026-03-05) -
            thinking effort (minimal/low/medium/high); defaults to "low"
        verbosity: OpenAI Responses API path only (gpt-5.4-2026-03-05) -
            response length (low/medium/high); defaults to "medium"

    Returns:
        The generated text response

    Raises:
        LLMServiceError: If there's an issue with the generation
        QuotaExceededError: If the quota pre-flight check reports the caller
            is over limit (propagated unchanged)
    """
    logger = logging.getLogger(__name__)
    max_retries = 3
    last_error = None

    # Quota pre-flight — raises QuotaExceededError if over limit.
    # The imports are guarded separately from the check itself: previously the
    # except-handler re-imported app.models.quota, so a failing import escaped
    # the handler and blocked the LLM call it was meant to protect.
    try:
        from app.models.quota import check_quota, QuotaExceededError as _QuotaExceededError
        from app.services.llm_usage_context import current_context as _ctx
    except Exception:
        logger.warning("Quota modules unavailable; skipping quota pre-flight (non-fatal)", exc_info=True)
    else:
        try:
            _c = _ctx()
            await check_quota(_c.user_id, _c.focus_group_id)
        except _QuotaExceededError:
            raise
        except Exception:
            # Non-fatal: DB failures must not block LLM calls
            logger.warning("Quota pre-flight check failed (non-fatal)", exc_info=True)

    actual_model = LLMService._resolve_model(model_name)
    provider = LLMService._get_model_provider(model_name)
    _start_time = time.monotonic()

    for attempt in range(max_retries):
        attempt_num = attempt + 1
        logger.debug(f"LLM content generation attempt {attempt_num}/{max_retries} using {provider} provider")

        try:
            if provider == 'openai':
                if actual_model == 'gpt-5.4-2026-03-05':
                    # Use OpenAI Responses API for gpt-5.4-2026-03-05
                    input_content = prompt
                    if system_prompt:
                        input_content = f"System: {system_prompt}\n\nUser: {prompt}"

                    kwargs = {
                        "model": actual_model,
                        "input": input_content,
                    }

                    # Add reasoning configuration
                    reasoning_config = {}
                    if reasoning_effort:
                        reasoning_config["effort"] = reasoning_effort
                    else:
                        reasoning_config["effort"] = "low"  # Default
                    kwargs["reasoning"] = reasoning_config

                    # Add text configuration with verbosity
                    text_config = {
                        "format": {"type": "text"}
                    }
                    if verbosity:
                        text_config["verbosity"] = verbosity
                    else:
                        text_config["verbosity"] = "medium"  # Default
                    kwargs["text"] = text_config

                    # Note: GPT-5 Responses API does not support max_tokens parameter

                    response = await get_openai_client().responses.create(**kwargs)
                    result = LLMService._extract_responses_api_content(response)

                else:
                    # Use Chat Completions API for non-GPT-5 models
                    messages = []
                    if system_prompt:
                        messages.append({"role": "system", "content": system_prompt})
                    messages.append({"role": "user", "content": prompt})

                    kwargs = {
                        "model": actual_model,
                        "messages": messages,
                        "temperature": temperature,
                    }

                    if max_tokens:
                        kwargs["max_tokens"] = max_tokens

                    response = await get_openai_client().chat.completions.create(**kwargs)
                    # message.content may be None; treat that as empty text
                    # rather than failing on .strip().
                    result = (response.choices[0].message.content or "").strip()

            else:
                # New Google GenAI SDK - async call
                config = genai.types.GenerateContentConfig(
                    temperature=temperature,
                )

                if max_tokens:
                    config.max_output_tokens = max_tokens

                # Prepare the prompt - combine system prompt with user prompt if needed
                if system_prompt:
                    combined_prompt = f"System: {system_prompt}\n\nUser: {prompt}"
                else:
                    combined_prompt = prompt

                # Make async call to new GenAI SDK
                response = await get_gemini_client().aio.models.generate_content(
                    model=actual_model,
                    contents=combined_prompt,
                    config=config
                )

                # Extract text from new SDK response
                result = LLMService._extract_text_from_new_genai_response(response)

            if attempt > 0:
                logger.info(f"LLM content generation succeeded on attempt {attempt_num}/{max_retries}")
            await LLMService._record_usage(response, provider, actual_model, _start_time, attempt)
            return result

        except genai_errors.APIError as e:
            # Google GenAI SDK specific error handling
            last_error = e
            error_code = getattr(e, 'code', 'unknown')
            error_message = getattr(e, 'message', str(e)) or str(e) or repr(e)

            logger.warning(f"LLM attempt {attempt_num}/{max_retries} failed: [Google API {error_code}] {error_message}")

            # Retryable: 429 rate limit, 500+ server errors
            is_retryable = (
                error_code == 429 or
                (isinstance(error_code, int) and error_code >= 500)
            )

            if is_retryable:
                if attempt < max_retries - 1:
                    wait_time = 2 ** attempt
                    logger.info(f"Retryable Google API error. Waiting {wait_time}s before retry {attempt_num + 1}/{max_retries}")
                    await asyncio.sleep(wait_time)
                    continue
                else:
                    logger.error(f"Retryable Google API error [{error_code}] but max retries ({max_retries}) reached")
            else:
                # 400, 403, 404, etc. - non-retryable
                logger.error(f"Non-retryable Google API error [{error_code}]: {error_message}")
            break

        except Exception as e:
            # Fallback for OpenAI and other non-Google errors
            last_error = e
            # Debug: capture full exception details
            exc_type = type(e).__name__
            exc_module = type(e).__module__
            exc_str = str(e)
            exc_repr = repr(e)
            exc_args = getattr(e, 'args', ())
            exc_dict = getattr(e, '__dict__', {})
            exc_tb = traceback.format_exc()

            logger.warning(f"LLM attempt {attempt_num}/{max_retries} failed - Type: {exc_module}.{exc_type}, str: '{exc_str}', repr: {exc_repr}, args: {exc_args}, dict: {exc_dict}")
            logger.warning(f"Full traceback:\n{exc_tb}")

            error_message = exc_str.lower() if exc_str else exc_repr.lower()

            # Check if this is a retryable error (API internal errors, rate limiting, etc.)
            # These are plain substring checks on the lower-cased message.
            if ("500" in error_message or
                    "internal error" in error_message or
                    "internal server error" in error_message or
                    "service unavailable" in error_message or
                    "timeout" in error_message or
                    "rate" in error_message):

                if attempt < max_retries - 1:
                    # Wait before retrying (exponential backoff): 1s, then 2s
                    wait_time = 2 ** attempt
                    logger.info(f"Retryable error detected. Waiting {wait_time} seconds before retry {attempt_num + 1}/{max_retries}")
                    await asyncio.sleep(wait_time)
                    continue
                else:
                    logger.error(f"Retryable error detected but max retries ({max_retries}) reached")
            else:
                logger.error(f"Non-retryable error detected: {str(e)}")
            break

    # If we've exhausted all retries or hit a non-retryable error, raise the last error
    error_detail = ""
    if isinstance(last_error, genai_errors.APIError):
        error_code = getattr(last_error, 'code', 'unknown')
        error_msg = getattr(last_error, 'message', str(last_error)) or str(last_error) or repr(last_error)
        error_detail = f"[Google API {error_code}] {error_msg}"
    else:
        # Use repr if str is empty
        error_detail = str(last_error) or repr(last_error) or f"{type(last_error).__module__}.{type(last_error).__name__}: {getattr(last_error, 'args', ())}"
    logger.error(f"LLM content generation failed after {max_retries} attempts. Final error: {error_detail}")
    raise LLMServiceError(f"Error generating content: {error_detail}") from last_error
|
|
|
|
@staticmethod
|
|
def parse_json_response(response_text: str) -> Union[Dict[str, Any], List[Any]]:
    """
    Parse a JSON response from the LLM.

    Surrounding whitespace is ignored. A markdown code fence around the
    payload is removed, with or without a ``json`` language tag in any
    letter case.

    Args:
        response_text: The text response from the LLM

    Returns:
        The value parsed from the JSON response (normally a dictionary or list)

    Raises:
        LLMServiceError: If the input is not a string or is not valid JSON
    """
    if not isinstance(response_text, str):
        error_msg = f"Failed to parse JSON response: expected text but received {type(response_text).__name__}"
        logging.getLogger(__name__).error(error_msg)
        raise LLMServiceError(error_msg)

    try:
        clean_response = response_text.strip()

        # Remove markdown code fences if present. The fence and tag are removed
        # as prefixes; str.strip("```json") would instead strip any of the
        # characters `, j, s, o, n from both ends of the payload.
        if clean_response.startswith("```"):
            body = clean_response.lstrip("`")
            if body[:4].lower() == "json":
                body = body[4:]
            # Valid JSON never ends with a backtick, so trailing ones are fence.
            clean_response = body.rstrip().rstrip("`").strip()

        # Parse the JSON
        return json.loads(clean_response)

    except json.JSONDecodeError as e:
        error_msg = f"Failed to parse JSON response: {str(e)}. Raw response: {response_text[:200]}..."
        logging.getLogger(__name__).error(error_msg)
        raise LLMServiceError(error_msg) from e
|
|
|
|
@staticmethod
|
|
async def generate_structured_response(
    prompt: str,
    temperature: float = 0.7,
    max_tokens: Optional[int] = None,
    model_name: Optional[str] = None,
    system_prompt: Optional[str] = None,
    reasoning_effort: Optional[str] = None,
    verbosity: Optional[str] = None
) -> Dict[str, Any]:
    """
    Generate text with the LLM and parse it as JSON.

    All arguments are forwarded unchanged to ``LLMService.generate_content``;
    the returned text is handed to ``LLMService.parse_json_response``.

    Args:
        prompt: The prompt to send to the model
        temperature: Controls randomness in generation
        max_tokens: Maximum tokens to generate
        model_name: Optional model name to use
        system_prompt: Optional system prompt to define the role of the AI
        reasoning_effort: Forwarded to generate_content (minimal/low/medium/high)
        verbosity: Forwarded to generate_content (low/medium/high)

    Returns:
        The parsed JSON value. It is annotated as a dictionary, but no type
        check is made here, so a JSON array comes back as a list.

    Raises:
        LLMServiceError: If there's an issue with generation or parsing
    """
    generation_args = dict(
        prompt=prompt,
        temperature=temperature,
        max_tokens=max_tokens,
        model_name=model_name,
        system_prompt=system_prompt,
        reasoning_effort=reasoning_effort,
        verbosity=verbosity,
    )
    raw_text = await LLMService.generate_content(**generation_args)
    parsed = LLMService.parse_json_response(raw_text)
    return parsed
|
|
|
|
@staticmethod
|
|
async def generate_structured_array(
    prompt: str,
    temperature: float = 0.7,
    max_tokens: Optional[int] = None,
    model_name: Optional[str] = None,
    system_prompt: Optional[str] = None,
    reasoning_effort: Optional[str] = None,
    verbosity: Optional[str] = None
) -> List[Dict[str, Any]]:
    """
    Generate text with the LLM and parse it as a JSON array.

    All arguments are forwarded unchanged to ``LLMService.generate_content``;
    the returned text is handed to ``LLMService.parse_json_response`` and the
    result must be a list.

    Args:
        prompt: The prompt to send to the model
        temperature: Controls randomness in generation
        max_tokens: Maximum tokens to generate
        model_name: Optional model name to use
        system_prompt: Optional system prompt to define the role of the AI
        reasoning_effort: Forwarded to generate_content (minimal/low/medium/high)
        verbosity: Forwarded to generate_content (low/medium/high)

    Returns:
        The parsed list. Only the top-level type is checked; elements are
        not validated.

    Raises:
        LLMServiceError: If there's an issue with generation or parsing, or
            if the parsed JSON is not an array
    """
    generation_args = dict(
        prompt=prompt,
        temperature=temperature,
        max_tokens=max_tokens,
        model_name=model_name,
        system_prompt=system_prompt,
        reasoning_effort=reasoning_effort,
        verbosity=verbosity,
    )
    raw_text = await LLMService.generate_content(**generation_args)
    parsed = LLMService.parse_json_response(raw_text)

    # Guard clause: anything other than a top-level array is rejected.
    if isinstance(parsed, list):
        return parsed
    raise LLMServiceError(f"Expected a JSON array but received {type(parsed)}")
|
|
|
|
@staticmethod
|
|
async def generate_multimodal_content(
    prompt: str,
    image_paths: List[str],
    temperature: float = 0.7,
    max_tokens: Optional[int] = None,
    model_name: Optional[str] = None
) -> str:
    """
    Generate content using both text and image inputs.

    Up to three attempts are made. Errors whose message mentions 500,
    internal error, service unavailable, timeout or rate are retried after
    1s and then 2s; any other error stops immediately. Unlike
    ``generate_content``, Google API errors get no dedicated handling here
    and go through the same message-based check. Image files are re-read on
    every attempt.

    Args:
        prompt: The text prompt to send to the model
        image_paths: List of paths to image files to include
        temperature: Controls randomness in generation. Not sent on the
            OpenAI Responses API path.
        max_tokens: Maximum tokens to generate. Not sent on the OpenAI
            Responses API path.
        model_name: Optional model name to use (aliases are resolved)

    Returns:
        The generated text response

    Raises:
        LLMServiceError: If there's an issue with generation or image processing
        QuotaExceededError: If the quota pre-flight check reports the caller
            is over limit (propagated unchanged)
    """
    logger = logging.getLogger(__name__)
    max_retries = 3
    last_error = None

    # Quota pre-flight — raises QuotaExceededError if over limit.
    # The imports are guarded separately from the check itself: previously the
    # except-handler re-imported app.models.quota, so a failing import escaped
    # the handler and blocked the LLM call it was meant to protect.
    try:
        from app.models.quota import check_quota, QuotaExceededError as _QuotaExceededError
        from app.services.llm_usage_context import current_context as _ctx
    except Exception:
        logger.warning("Quota modules unavailable; skipping quota pre-flight (non-fatal)", exc_info=True)
    else:
        try:
            _c = _ctx()
            await check_quota(_c.user_id, _c.focus_group_id)
        except _QuotaExceededError:
            raise
        except Exception:
            # Non-fatal: DB failures must not block LLM calls
            logger.warning("Quota pre-flight check failed (non-fatal)", exc_info=True)

    actual_model = LLMService._resolve_model(model_name)
    provider = LLMService._get_model_provider(model_name)

    logger.info(f"Generating multimodal content with {len(image_paths)} image(s) using {provider} provider")
    _start_time = time.monotonic()

    for attempt in range(max_retries):
        attempt_num = attempt + 1
        logger.debug(f"Multimodal content generation attempt {attempt_num}/{max_retries}")

        try:
            if provider == 'openai':
                # OpenAI multimodal API call

                # Prepare image content for OpenAI API
                image_content = []
                for image_path in image_paths:
                    if not os.path.exists(image_path):
                        raise LLMServiceError(f"Image file not found: {image_path}")

                    # Encode image to base64
                    with open(image_path, "rb") as image_file:
                        base64_image = base64.b64encode(image_file.read()).decode('utf-8')

                    # Get image format from the file extension
                    image_format = image_path.lower().split('.')[-1]
                    if image_format == 'jpg':
                        image_format = 'jpeg'

                    image_content.append({
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/{image_format};base64,{base64_image}"
                        }
                    })
                    logger.debug(f"Successfully loaded image for OpenAI: {image_path}")

                if actual_model == 'gpt-5.4-2026-03-05':
                    # Use Responses API for gpt-5.4-2026-03-05 multimodal
                    # Note: GPT-5 Responses API supports multimodal input
                    input_content = [{"role": "user", "content": [{"type": "input_text", "text": prompt}]}]
                    # Add images to the content array
                    for img_content in image_content:
                        input_content[0]["content"].append({
                            "type": "input_image",
                            "image_url": img_content["image_url"]["url"]
                        })

                    kwargs = {
                        "model": actual_model,
                        "input": input_content,
                        "reasoning": {"effort": "low"},  # Default reasoning for multimodal
                        "text": {
                            "verbosity": "medium",  # Default verbosity for multimodal
                            "format": {"type": "text"}
                        }
                    }

                    # Note: GPT-5 Responses API does not support max_tokens parameter

                    response = await get_openai_client().responses.create(**kwargs)
                    result = LLMService._extract_responses_api_content(response)

                else:
                    # Use Chat Completions API for non-GPT-5 models
                    content = [{"type": "text", "text": prompt}]
                    content.extend(image_content)

                    kwargs = {
                        "model": actual_model,
                        "messages": [{"role": "user", "content": content}],
                        "temperature": temperature,
                    }

                    if max_tokens:
                        kwargs["max_tokens"] = max_tokens

                    response = await get_openai_client().chat.completions.create(**kwargs)
                    # message.content may be None; treat that as empty text
                    # rather than failing on .strip().
                    result = (response.choices[0].message.content or "").strip()

            else:
                # New Google GenAI SDK - multimodal async call
                config = genai.types.GenerateContentConfig(
                    temperature=temperature,
                )

                if max_tokens:
                    config.max_output_tokens = max_tokens

                # Prepare multimodal content for new SDK
                content_parts = []

                # Add text prompt. The Part factories are called with keyword
                # arguments: they are keyword-only in current google-genai
                # releases, so positional calls raise TypeError.
                content_parts.append(genai.types.Part.from_text(text=prompt))

                # Add images
                for image_path in image_paths:
                    try:
                        if not os.path.exists(image_path):
                            raise LLMServiceError(f"Image file not found: {image_path}")

                        # Read image data for new SDK
                        with open(image_path, 'rb') as img_file:
                            image_data = img_file.read()

                        # Determine MIME type from file extension
                        ext = os.path.splitext(image_path)[1].lower()
                        mime_type = {
                            '.jpg': 'image/jpeg',
                            '.jpeg': 'image/jpeg',
                            '.png': 'image/png',
                            '.gif': 'image/gif',
                            '.webp': 'image/webp'
                        }.get(ext, 'image/jpeg')  # Default to JPEG

                        content_parts.append(genai.types.Part.from_bytes(data=image_data, mime_type=mime_type))
                        logger.debug(f"Successfully loaded image for new GenAI SDK: {image_path}")

                    except Exception as e:
                        raise LLMServiceError(f"Failed to load image {image_path}: {str(e)}") from e

                # Make async call to new GenAI SDK with multimodal content
                response = await get_gemini_client().aio.models.generate_content(
                    model=actual_model,
                    contents=content_parts,
                    config=config
                )

                # Extract text from new SDK response
                result = LLMService._extract_text_from_new_genai_response(response)

            if attempt > 0:
                logger.info(f"Multimodal content generation succeeded on attempt {attempt_num}/{max_retries}")
            await LLMService._record_usage(response, provider, actual_model, _start_time, attempt)
            return result

        except Exception as e:
            last_error = e
            error_message = str(e).lower()

            logger.warning(f"Multimodal attempt {attempt_num}/{max_retries} failed: {str(e)}")

            # Check if this is a retryable error (plain substring checks on
            # the lower-cased message)
            if ("500" in error_message or
                    "internal error" in error_message or
                    "internal server error" in error_message or
                    "service unavailable" in error_message or
                    "timeout" in error_message or
                    "rate" in error_message):

                if attempt < max_retries - 1:
                    # Wait before retrying (exponential backoff): 1s, then 2s
                    wait_time = 2 ** attempt
                    logger.info(f"Retryable error detected. Waiting {wait_time} seconds before retry {attempt_num + 1}/{max_retries}")
                    await asyncio.sleep(wait_time)
                    continue
                else:
                    logger.error(f"Retryable error detected but max retries ({max_retries}) reached")
            else:
                logger.error(f"Non-retryable error detected: {str(e)}")
            break

    # If we've exhausted all retries or hit a non-retryable error, raise the last error
    logger.error(f"Multimodal content generation failed after {max_retries} attempts. Final error: {str(last_error)}")
    raise LLMServiceError(f"Error generating multimodal content: {str(last_error)}") from last_error
|
|
|
|
@staticmethod
async def generate_contextual_response(
    prompt: str,
    conversation_context: List[Dict[str, Any]],
    temperature: float = 0.7,
    max_tokens: Optional[int] = None,
    model_name: Optional[str] = None,
    reasoning_effort: Optional[str] = None,
    verbosity: Optional[str] = None
) -> str:
    """
    Generate content using conversation context that may include both text and images in sequence.

    Text items are joined into a "CONVERSATION CONTEXT" preamble that is prepended
    to ``prompt``. Image items are loaded from disk; if at least one image loads,
    a multimodal request is sent (with up to 3 attempts), otherwise the call is
    delegated to ``LLMService.generate_content``.

    Args:
        prompt: The main prompt for the LLM
        conversation_context: List of context items in chronological order. Each item
            is a dict with a "type" key: "text" items carry "content"; "image" items
            carry "path" (and optionally "filename", used only for logging).
        temperature: Controls randomness in generation (not sent on the OpenAI
            Responses API path)
        max_tokens: Maximum tokens to generate (not sent on the OpenAI Responses API path)
        model_name: Optional model name to use
        reasoning_effort: Used on the OpenAI Responses API path (model
            'gpt-5.4-2026-03-05'), defaulting to "low"; forwarded unchanged to
            ``generate_content`` on the text-only path
        verbosity: Used on the OpenAI Responses API path, defaulting to "medium";
            forwarded unchanged to ``generate_content`` on the text-only path

    Returns:
        The generated text response

    Raises:
        LLMServiceError: If there's an issue with generation
        QuotaExceededError: If the quota pre-flight check reports the limit is exceeded
    """
    logger = logging.getLogger(__name__)

    # Quota pre-flight — raises QuotaExceededError if over limit.
    # Imports are resolved separately so that an import failure cannot escape
    # from inside the error handler; every other failure is non-fatal.
    try:
        from app.models.quota import check_quota, QuotaExceededError as _QuotaExceededError
        from app.services.llm_usage_context import current_context as _ctx
    except Exception as import_err:
        logger.warning(f"Quota pre-flight skipped (quota modules unavailable): {import_err}")
    else:
        try:
            _c = _ctx()
            await check_quota(_c.user_id, _c.focus_group_id)
        except _QuotaExceededError:
            raise
        except Exception as quota_err:
            # Non-fatal: DB failures must not block LLM calls
            logger.warning(f"Quota pre-flight check failed (non-fatal): {quota_err}")

    # Separate text and image content from the conversation context
    text_context_parts = []
    image_parts = []

    logger.info(f"🎯 Processing {len(conversation_context)} context items for LLM")

    for item in conversation_context:
        if item["type"] == "text":
            text_context_parts.append(item["content"])
        elif item["type"] == "image":
            # Read the path up front so the error handler never has to index the item again
            image_path = item.get("path")
            try:
                if image_path and os.path.exists(image_path):
                    # Load image using PIL
                    with Image.open(image_path) as img:
                        # Convert to RGB if necessary
                        if img.mode != 'RGB':
                            img = img.convert('RGB')
                        # Copy so the pixel data outlives the file handle
                        image_parts.append(img.copy())
                    logger.info(f"🖼️ Loaded image for context: {item.get('filename', image_path)}")
                else:
                    logger.warning(f"⚠️ Image not found for context: {image_path}")
            except Exception as e:
                logger.warning(f"❌ Failed to load image for context: {image_path}: {e}")

    # Build the full context prompt
    context_prompt = ""
    if text_context_parts:
        context_prompt = "CONVERSATION CONTEXT:\n" + "\n".join(text_context_parts) + "\n\n"

    full_prompt = context_prompt + prompt

    logger.info(f"📝 Context prompt length: {len(context_prompt)} characters")
    logger.info(f"🖼️ Total images in context: {len(image_parts)}")

    # No images, use standard text generation
    if not image_parts:
        logger.info("📝 Using text-only generation (no visual context)")
        return await LLMService.generate_content(
            prompt=full_prompt,
            temperature=temperature,
            max_tokens=max_tokens,
            model_name=model_name,
            reasoning_effort=reasoning_effort,
            verbosity=verbosity
        )

    # We have images: use multimodal generation
    logger.info(f"🎨 Using multimodal generation with {len(image_parts)} images")

    actual_model = LLMService._resolve_model(model_name)
    provider = LLMService._get_model_provider(model_name)

    max_retries = 3
    last_error = None
    _start_time = time.monotonic()

    # Encode every image to PNG exactly once; the bytes are identical on each retry.
    try:
        png_images = []
        for img in image_parts:
            buffer = io.BytesIO()
            img.save(buffer, format='PNG')
            png_images.append(buffer.getvalue())
    except Exception as e:
        logger.error(f"Contextual multimodal generation failed while encoding images: {str(e)}")
        raise LLMServiceError(f"Error generating contextual multimodal content: {str(e)}") from e

    # OpenAI expects images as base64 data URLs
    data_urls = []
    if provider == 'openai':
        data_urls = [
            f"data:image/png;base64,{base64.b64encode(png).decode('utf-8')}"
            for png in png_images
        ]

    for attempt in range(max_retries):
        attempt_num = attempt + 1
        logger.debug(f"Contextual multimodal generation attempt {attempt_num}/{max_retries}")

        try:
            if provider == 'openai':
                if actual_model == 'gpt-5.4-2026-03-05':
                    # Use Responses API for gpt-5.4-2026-03-05 contextual multimodal
                    user_content = [{"type": "input_text", "text": full_prompt}]
                    user_content.extend(
                        {"type": "input_image", "image_url": url} for url in data_urls
                    )

                    # Note: GPT-5 Responses API does not support max_tokens parameter
                    kwargs = {
                        "model": actual_model,
                        "input": [{"role": "user", "content": user_content}],
                        "reasoning": {"effort": reasoning_effort or "low"},
                        "text": {
                            "verbosity": verbosity or "medium",
                            "format": {"type": "text"}
                        }
                    }

                    response = await get_openai_client().responses.create(**kwargs)
                    result = LLMService._extract_responses_api_content(response)
                else:
                    # Use Chat Completions API for non-GPT-5 models
                    content = [{"type": "text", "text": full_prompt}]
                    content.extend(
                        {"type": "image_url", "image_url": {"url": url}} for url in data_urls
                    )

                    kwargs = {
                        "model": actual_model,
                        "messages": [{"role": "user", "content": content}],
                        "temperature": temperature,
                    }
                    if max_tokens:
                        kwargs["max_tokens"] = max_tokens

                    response = await get_openai_client().chat.completions.create(**kwargs)
                    # message.content may be None (e.g. refusal); treat it as an empty reply
                    result = (response.choices[0].message.content or "").strip()
            else:
                # New Google GenAI SDK - contextual multimodal async call
                config = genai.types.GenerateContentConfig(
                    temperature=temperature,
                )
                if max_tokens:
                    config.max_output_tokens = max_tokens

                # Part factories take keyword arguments (keyword-only in google-genai 1.x)
                new_content_parts = [genai.types.Part.from_text(text=full_prompt)]
                new_content_parts.extend(
                    genai.types.Part.from_bytes(data=png, mime_type='image/png')
                    for png in png_images
                )

                response = await get_gemini_client().aio.models.generate_content(
                    model=actual_model,
                    contents=new_content_parts,
                    config=config
                )
                result = LLMService._extract_text_from_new_genai_response(response)

        except genai_errors.APIError as e:
            # Google GenAI SDK specific error handling
            last_error = e
            error_code = getattr(e, 'code', 'unknown')
            error_message = getattr(e, 'message', str(e)) or str(e) or repr(e)

            logger.warning(f"Contextual multimodal attempt {attempt_num}/{max_retries} failed: [Google API {error_code}] {error_message}")

            # Retryable: 429 rate limit, 500+ server errors
            is_retryable = (
                error_code == 429 or
                (isinstance(error_code, int) and error_code >= 500)
            )

            if not is_retryable:
                logger.error(f"Non-retryable Google API error [{error_code}]: {error_message}")
                break
            if attempt >= max_retries - 1:
                logger.error(f"Retryable Google API error [{error_code}] but max retries ({max_retries}) reached")
                break

            wait_time = 2 ** attempt  # exponential backoff: 1s, 2s, ...
            logger.info(f"Retryable Google API error. Waiting {wait_time}s before retry {attempt_num + 1}/{max_retries}")
            await asyncio.sleep(wait_time)
            continue

        except Exception as e:
            # Fallback for non-Google errors
            last_error = e
            error_message = str(e).lower()

            logger.warning(f"Contextual multimodal attempt {attempt_num}/{max_retries} failed: {str(e)}")

            # Check if this is a retryable error (substring heuristics on the message)
            is_retryable = (
                "500" in error_message or
                "internal error" in error_message or
                "internal server error" in error_message or
                "service unavailable" in error_message or
                "timeout" in error_message or
                "rate" in error_message
            )

            if not is_retryable:
                logger.error(f"Non-retryable error detected: {str(e)}")
                break
            if attempt >= max_retries - 1:
                logger.error(f"Retryable error detected but max retries ({max_retries}) reached")
                break

            wait_time = 2 ** attempt  # exponential backoff: 1s, 2s, ...
            logger.info(f"Retryable error detected. Waiting {wait_time} seconds before retry {attempt_num + 1}/{max_retries}")
            await asyncio.sleep(wait_time)
            continue

        # Success path: the provider call and text extraction both completed.
        if attempt > 0:
            logger.info(f"Contextual multimodal generation succeeded on attempt {attempt_num}/{max_retries}")

        logger.info(f"✅ Generated contextual response with visual context using {provider}")
        logger.debug(
            "🔍 LLM RESULT DEBUG: type=%s, length=%d characters, preview=%r",
            type(result),
            len(result) if result else 0,
            result[:200] if result else 'EMPTY',
        )

        # Usage bookkeeping runs outside the retry try-block: a failure here must
        # neither discard the generated response nor trigger another billed call.
        try:
            await LLMService._record_usage(response, provider, actual_model, _start_time, attempt)
        except Exception as usage_err:
            logger.warning(f"Failed to record LLM usage (response still returned): {usage_err}")

        return result

    # If multimodal failed, raise the error
    if isinstance(last_error, genai_errors.APIError):
        error_code = getattr(last_error, 'code', 'unknown')
        error_msg = getattr(last_error, 'message', str(last_error)) or str(last_error) or repr(last_error)
        error_detail = f"[Google API {error_code}] {error_msg}"
    else:
        error_detail = str(last_error)

    logger.error(f"Contextual multimodal generation failed after {max_retries} attempts. Final error: {error_detail}")
    raise LLMServiceError(f"Error generating contextual multimodal content: {error_detail}") from last_error