semblance-dev/backend/app/services/llm_service.py

838 lines
No EOL
37 KiB
Python

"""
LLM Service for Synthetic Society

This service provides a centralized interface for interacting with language models
through the Google Generative AI (Gemini) and OpenAI APIs. It supports text,
structured JSON, and multimodal prompting functions for different application
features.
"""
import os
import json
import time
import logging
import google.generativeai as genai
from openai import OpenAI
from typing import Dict, Any, Optional, Union, List
from PIL import Image
import io
# Set up the Gemini API key
# The key is read from the environment only.  A real key must never be
# hard-coded as a fallback: anything committed to source control has to be
# treated as leaked and rotated.
GEMINI_API_KEY = os.environ.get('GEMINI_API_KEY')
if not GEMINI_API_KEY:
    logging.getLogger(__name__).warning(
        "GEMINI_API_KEY is not set; Gemini requests may fail until it is configured"
    )
genai.configure(api_key=GEMINI_API_KEY)

# Set up OpenAI API key
# NOTE(review): the fallback below is a placeholder, not a credential; it
# presumably exists so the client can still be constructed when the variable
# is unset - confirm before removing it.
OPENAI_API_KEY = os.environ.get('OPENAI_API_KEY', 'REDACTED_OPENAI_KEY')
openai_client = OpenAI(api_key=OPENAI_API_KEY)

# The default model we're using (applied whenever a caller passes no model name)
DEFAULT_MODEL = "gemini-2.5-pro"

# Supported models, mapped to the provider that serves each of them
SUPPORTED_MODELS = {
    'gemini-2.5-pro': 'gemini',
    'gpt-4.1': 'openai',
    'gpt-5': 'openai',
}
class LLMServiceError(Exception):
    """Error type raised by this module when an LLM operation fails."""
class LLMService:
"""Centralized service for LLM operations."""
@staticmethod
def _extract_responses_api_content(response) -> str:
    """
    Extract text content from an OpenAI Responses API response.

    Text is gathered from every ``content`` entry of every ``output`` item.
    Items whose ``content`` is missing or ``None`` and entries whose ``text``
    is not a string are skipped instead of raising, so a response that mixes
    text with non-text items still yields its text.  When the output
    structure yields nothing, ``response.output_text`` and then
    ``response.text`` are tried as fallbacks; a fallback is only used when
    it is a non-empty string.

    Args:
        response: The response object from OpenAI Responses API
    Returns:
        The extracted text with surrounding whitespace removed, or an empty
        string when no text could be found
    """
    pieces = []
    # ``or []`` covers both a missing attribute and an explicit None value.
    for item in getattr(response, 'output', None) or []:
        for entry in getattr(item, 'content', None) or []:
            text = getattr(entry, 'text', None)
            if isinstance(text, str):
                pieces.append(text)
    result = "".join(pieces)

    if not result:
        # Fallbacks, in the same order as before: output_text, then text.
        for attr_name in ('output_text', 'text'):
            fallback = getattr(response, attr_name, None)
            # Guard against non-string values so .strip() below cannot fail.
            if isinstance(fallback, str) and fallback:
                result = fallback
                break

    return result.strip()
@staticmethod
def _get_model_provider(model_name: Optional[str] = None) -> str:
    """
    Resolve which provider serves a model.

    Args:
        model_name: Model to look up; a falsy value selects DEFAULT_MODEL.
    Returns:
        The provider registered for the model in SUPPORTED_MODELS, or
        'gemini' when the model is not listed there
    """
    resolved_name = model_name if model_name else DEFAULT_MODEL
    if resolved_name in SUPPORTED_MODELS:
        return SUPPORTED_MODELS[resolved_name]
    # Models that are not registered are treated as Gemini models.
    return 'gemini'
@staticmethod
def get_model(model_name: Optional[str] = None) -> genai.GenerativeModel:
    """
    Build a Gemini generative model object.

    Args:
        model_name: Name of the model to instantiate; a falsy value selects
            DEFAULT_MODEL.
    Returns:
        A genai.GenerativeModel created for the chosen model name
    """
    chosen_name = model_name if model_name else DEFAULT_MODEL
    gemini_model = genai.GenerativeModel(chosen_name)
    return gemini_model
@staticmethod
def _extract_text_from_response(response) -> str:
    """
    Extract text from a Gemini API response, handling both simple and multi-part responses.

    The quick ``response.text`` accessor is tried first.  If it raises, text
    is collected from ``response.parts`` and, when that yields nothing, from
    the parts of every entry in ``response.candidates``.  While walking the
    candidates, a ``finish_reason`` of 3, 4 or 2 aborts extraction with a
    message naming safety blocking, recitation blocking or the length limit
    respectively.

    Args:
        response: The response object from the Gemini API
    Returns:
        The extracted text content with surrounding whitespace removed
    Raises:
        LLMServiceError: If no text content can be extracted.  Errors raised
            here on purpose keep their own message; any other failure during
            extraction is wrapped with an "Error extracting text" prefix.
    """
    try:
        # Try the simple text accessor first
        return response.text.strip()
    except Exception:
        # The accessor is not usable for this response; fall through to the
        # part-by-part extraction below.
        pass

    # finish_reason codes that mean there is no usable text, checked in the
    # same order as before, with the message reported for each.
    blocking_reasons = (
        (3, "Response was blocked for safety reasons"),
        (4, "Response was blocked for recitation reasons"),
        (2, "Response was cut off due to length limit - try reducing max_tokens or removing the limit"),
    )

    try:
        text_parts = []

        # Direct parts attribute on the response
        if hasattr(response, 'parts') and response.parts:
            for part in response.parts:
                if hasattr(part, 'text'):
                    text_parts.append(part.text)

        # If that didn't work, try the candidates approach
        if not text_parts and hasattr(response, 'candidates') and response.candidates:
            for candidate in response.candidates:
                reason = candidate.finish_reason
                for code, message in blocking_reasons:
                    if reason == code:
                        raise LLMServiceError(message)
                if hasattr(candidate, 'content') and hasattr(candidate.content, 'parts'):
                    for part in candidate.content.parts:
                        if hasattr(part, 'text'):
                            text_parts.append(part.text)

        if text_parts:
            return ''.join(text_parts).strip()

        # Every candidate was already checked for a blocking finish_reason
        # above, so reaching this point means there simply was no text.
        raise LLMServiceError("Unable to extract text from response parts")
    except LLMServiceError:
        # Already a precise, intentional error: do not bury it under a
        # second "Error extracting text" prefix.
        raise
    except Exception as e:
        raise LLMServiceError(f"Error extracting text from multi-part response: {str(e)}") from e
@staticmethod
def generate_content(
    prompt: str,
    temperature: float = 0.7,
    max_tokens: Optional[int] = None,
    model_name: Optional[str] = None,
    system_prompt: Optional[str] = None,
    reasoning_effort: Optional[str] = None,
    verbosity: Optional[str] = None
) -> str:
    """
    Generate content using the LLM with retry mechanism for transient errors.

    The provider is chosen from the model name.  'gpt-5' goes through the
    OpenAI Responses API, other OpenAI models through Chat Completions, and
    everything else through Gemini.  Up to three attempts are made; an
    attempt is repeated (after waiting 1s, then 2s) only when the error text
    looks transient.

    Args:
        prompt: The prompt to send to the model
        temperature: Controls randomness (0.0 = deterministic, 1.0 = creative).
            Not sent on the GPT-5 path.
        max_tokens: Maximum number of tokens to generate.  Not sent on the
            GPT-5 path.
        model_name: Optional model name to use
        system_prompt: Optional system prompt to define the role of the AI.
            Chat Completions receives it as a system message; the GPT-5 and
            Gemini paths prepend it to the prompt text.
        reasoning_effort: GPT-5 only - Controls thinking time (minimal/low/medium/high)
        verbosity: GPT-5 only - Controls response length (low/medium/high)
    Returns:
        The generated text response
    Raises:
        LLMServiceError: If there's an issue with the generation
    """
    import re  # local import: only needed to classify errors for retry

    logger = logging.getLogger(__name__)
    max_retries = 3
    last_error = None
    attempts_made = 0
    actual_model = model_name or DEFAULT_MODEL
    provider = LLMService._get_model_provider(model_name)

    # Markers of transient failures, matched against the lower-cased error
    # text.  "500" and "rate" must stand alone: a plain substring test would
    # also fire on unrelated text such as "generateContent" or "15000" and
    # make permanent errors (e.g. an unknown model) wait and retry.
    retryable_pattern = re.compile(
        r"(?<!\d)500(?!\d)"
        r"|internal error"
        r"|internal server error"
        r"|service unavailable"
        r"|timeout"
        r"|(?<![a-z])rate(?:limit|(?![a-z]))"
    )

    for attempt in range(max_retries):
        attempt_num = attempt + 1
        attempts_made = attempt_num
        logger.debug(f"LLM content generation attempt {attempt_num}/{max_retries} using {provider} provider")
        try:
            if provider == 'openai':
                if actual_model == 'gpt-5':
                    # Use OpenAI Responses API for GPT-5
                    input_content = prompt
                    if system_prompt:
                        input_content = f"System: {system_prompt}\n\nUser: {prompt}"
                    # Note: temperature and max_tokens are not sent on this path
                    kwargs = {
                        "model": actual_model,
                        "input": input_content,
                        "reasoning": {"effort": reasoning_effort or "medium"},
                        "text": {
                            "format": {"type": "text"},
                            "verbosity": verbosity or "medium",
                        },
                    }
                    response = openai_client.responses.create(**kwargs)
                    result = LLMService._extract_responses_api_content(response)
                else:
                    # Use Chat Completions API for non-GPT-5 models
                    messages = []
                    if system_prompt:
                        messages.append({"role": "system", "content": system_prompt})
                    messages.append({"role": "user", "content": prompt})
                    kwargs = {
                        "model": actual_model,
                        "messages": messages,
                        "temperature": temperature,
                    }
                    if max_tokens:
                        kwargs["max_tokens"] = max_tokens
                    response = openai_client.chat.completions.create(**kwargs)
                    result = response.choices[0].message.content.strip()
            else:
                # Gemini API call
                model = LLMService.get_model(model_name)
                generation_config = {"temperature": temperature}
                if max_tokens:
                    generation_config["max_output_tokens"] = max_tokens
                if system_prompt:
                    # The system prompt is folded into a single user turn
                    # rather than sent under a separate role.
                    request = [
                        {"role": "user", "parts": [f"System: {system_prompt}\n\nUser: {prompt}"]}
                    ]
                else:
                    request = prompt
                response = model.generate_content(
                    request,
                    generation_config=genai.types.GenerationConfig(**generation_config)
                )
                result = LLMService._extract_text_from_response(response)

            if attempt > 0:
                logger.info(f"LLM content generation succeeded on attempt {attempt_num}/{max_retries}")
            return result
        except Exception as e:
            last_error = e
            logger.warning(f"LLM attempt {attempt_num}/{max_retries} failed: {str(e)}")
            if not retryable_pattern.search(str(e).lower()):
                logger.error(f"Non-retryable error detected: {str(e)}")
                break
            if attempt_num >= max_retries:
                logger.error(f"Retryable error detected but max retries ({max_retries}) reached")
                break
            # Exponential backoff: 1s after the first failure, 2s after the second
            wait_time = 2 ** attempt
            logger.info(f"Retryable error detected. Waiting {wait_time} seconds before retry {attempt_num + 1}/{max_retries}")
            time.sleep(wait_time)

    # Either the retries were exhausted or a non-retryable error stopped the
    # loop early; report the number of attempts that were actually made.
    logger.error(f"LLM content generation failed after {attempts_made} attempt(s). Final error: {str(last_error)}")
    raise LLMServiceError(f"Error generating content: {str(last_error)}") from last_error
@staticmethod
def parse_json_response(response_text: str) -> Union[Dict[str, Any], List[Any]]:
    """
    Parse a JSON response from the LLM.

    Surrounding whitespace is ignored.  If the text is wrapped in a markdown
    code fence, the backticks and an optional ``json`` language tag (any
    letter case) are removed before parsing.

    Args:
        response_text: The text response from the LLM
    Returns:
        The value decoded from the JSON text (typically a dictionary or list)
    Raises:
        LLMServiceError: If the input is not a string or is not valid JSON
    """
    # Resolved locally so the error path never depends on a module-level name.
    logger = logging.getLogger(__name__)

    if not isinstance(response_text, str):
        error_msg = (
            "Failed to parse JSON response: expected text but received "
            f"{type(response_text).__name__}"
        )
        logger.error(error_msg)
        raise LLMServiceError(error_msg)

    # Trim first so a fence is recognised even with leading whitespace or a
    # trailing newline after the closing fence.
    clean_response = response_text.strip()
    if clean_response.startswith("```"):
        # Remove the fence markers (any run of backticks at either end) ...
        clean_response = clean_response.strip("`")
        # ... and the optional language tag that follows the opening fence.
        if clean_response[:4].lower() == "json":
            clean_response = clean_response[4:]
        clean_response = clean_response.strip()

    try:
        return json.loads(clean_response)
    except json.JSONDecodeError as e:
        error_msg = f"Failed to parse JSON response: {str(e)}. Raw response: {response_text[:200]}..."
        logger.error(error_msg)
        raise LLMServiceError(error_msg) from e
@staticmethod
def generate_structured_response(
    prompt: str,
    temperature: float = 0.7,
    max_tokens: Optional[int] = None,
    model_name: Optional[str] = None,
    system_prompt: Optional[str] = None,
    reasoning_effort: Optional[str] = None,
    verbosity: Optional[str] = None
) -> Dict[str, Any]:
    """
    Ask the LLM for a reply and decode that reply as JSON.

    All arguments are forwarded unchanged to LLMService.generate_content;
    the returned text is then handed to LLMService.parse_json_response.
    The decoded value is returned as-is, without checking its type.

    Args:
        prompt: The prompt to send to the model
        temperature: Controls randomness in generation
        max_tokens: Maximum tokens to generate
        model_name: Optional model name to use
        system_prompt: Optional system prompt to define the role of the AI
        reasoning_effort: GPT-5 only - Controls thinking time (minimal/low/medium/high)
        verbosity: GPT-5 only - Controls response length (low/medium/high)
    Returns:
        The value parsed from the JSON response
    Raises:
        LLMServiceError: If generation fails or the reply is not valid JSON
    """
    generation_args = {
        "prompt": prompt,
        "temperature": temperature,
        "max_tokens": max_tokens,
        "model_name": model_name,
        "system_prompt": system_prompt,
        "reasoning_effort": reasoning_effort,
        "verbosity": verbosity,
    }
    raw_text = LLMService.generate_content(**generation_args)
    parsed = LLMService.parse_json_response(raw_text)
    return parsed
@staticmethod
def generate_structured_array(
    prompt: str,
    temperature: float = 0.7,
    max_tokens: Optional[int] = None,
    model_name: Optional[str] = None,
    system_prompt: Optional[str] = None,
    reasoning_effort: Optional[str] = None,
    verbosity: Optional[str] = None
) -> List[Dict[str, Any]]:
    """
    Ask the LLM for a reply and decode that reply as a JSON array.

    All arguments are forwarded unchanged to LLMService.generate_content;
    the returned text is decoded with LLMService.parse_json_response and
    must decode to a list.

    Args:
        prompt: The prompt to send to the model
        temperature: Controls randomness in generation
        max_tokens: Maximum tokens to generate
        model_name: Optional model name to use
        system_prompt: Optional system prompt to define the role of the AI
        reasoning_effort: GPT-5 only - Controls thinking time (minimal/low/medium/high)
        verbosity: GPT-5 only - Controls response length (low/medium/high)
    Returns:
        The list parsed from the JSON array response
    Raises:
        LLMServiceError: If generation or parsing fails, or the decoded
            value is not a list
    """
    generation_args = {
        "prompt": prompt,
        "temperature": temperature,
        "max_tokens": max_tokens,
        "model_name": model_name,
        "system_prompt": system_prompt,
        "reasoning_effort": reasoning_effort,
        "verbosity": verbosity,
    }
    raw_text = LLMService.generate_content(**generation_args)
    parsed = LLMService.parse_json_response(raw_text)
    # Only a top-level JSON array is acceptable here
    if isinstance(parsed, list):
        return parsed
    raise LLMServiceError(f"Expected a JSON array but received {type(parsed)}")
@staticmethod
def generate_multimodal_content(
    prompt: str,
    image_paths: List[str],
    temperature: float = 0.7,
    max_tokens: Optional[int] = None,
    model_name: Optional[str] = None
) -> str:
    """
    Generate content using both text and image inputs.

    Images are loaded once, before any request is made: for OpenAI models
    each file is base64-encoded into a data URL whose image type is taken
    from the file extension; for Gemini each file is opened with PIL and
    converted to RGB when needed.  A file that is missing or cannot be
    loaded fails immediately and is never retried.  The request itself is
    attempted up to three times, repeating (after waiting 1s, then 2s) only
    when the error text looks transient.

    Args:
        prompt: The text prompt to send to the model
        image_paths: List of paths to image files to include
        temperature: Controls randomness in generation.  Not sent on the
            GPT-5 path.
        max_tokens: Maximum tokens to generate.  Not sent on the GPT-5 path.
        model_name: Optional model name to use
    Returns:
        The generated text response
    Raises:
        LLMServiceError: If there's an issue with generation or image processing
    """
    import base64
    import re  # local import: only needed to classify errors for retry

    logger = logging.getLogger(__name__)
    max_retries = 3
    last_error = None
    attempts_made = 0
    actual_model = model_name or DEFAULT_MODEL
    provider = LLMService._get_model_provider(model_name)

    # Markers of transient failures, matched against the lower-cased error
    # text.  "500" and "rate" must stand alone: a plain substring test would
    # also fire on unrelated text such as "generateContent" or "15000".
    retryable_pattern = re.compile(
        r"(?<!\d)500(?!\d)"
        r"|internal error"
        r"|internal server error"
        r"|service unavailable"
        r"|timeout"
        r"|(?<![a-z])rate(?:limit|(?![a-z]))"
    )

    logger.info(f"Generating multimodal content with {len(image_paths)} image(s) using {provider} provider")

    # Load the images up front.  File problems are deterministic, so they
    # are reported at once instead of going through the retry loop, and the
    # files are not re-read on every attempt.
    image_content = []  # OpenAI: one image_url entry per file
    images = []  # Gemini: one PIL image per file
    try:
        for image_path in image_paths:
            if provider == 'openai':
                if not os.path.exists(image_path):
                    raise LLMServiceError(f"Image file not found: {image_path}")
                # Encode image to base64
                with open(image_path, "rb") as image_file:
                    base64_image = base64.b64encode(image_file.read()).decode('utf-8')
                # The image type in the data URL comes from the file extension
                image_format = image_path.lower().split('.')[-1]
                if image_format == 'jpg':
                    image_format = 'jpeg'
                image_content.append({
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/{image_format};base64,{base64_image}"
                    }
                })
                logger.debug(f"Successfully loaded image for OpenAI: {image_path}")
            else:
                try:
                    if not os.path.exists(image_path):
                        raise LLMServiceError(f"Image file not found: {image_path}")
                    with Image.open(image_path) as img:
                        # Convert to RGB if necessary
                        if img.mode != 'RGB':
                            img = img.convert('RGB')
                        # Copy so the image stays usable after the file is closed
                        images.append(img.copy())
                    logger.debug(f"Successfully loaded image for Gemini: {image_path}")
                except Exception as e:
                    raise LLMServiceError(f"Failed to load image {image_path}: {str(e)}")
    except Exception as e:
        logger.error(f"Non-retryable error detected: {str(e)}")
        raise LLMServiceError(f"Error generating multimodal content: {str(e)}") from e

    for attempt in range(max_retries):
        attempt_num = attempt + 1
        attempts_made = attempt_num
        logger.debug(f"Multimodal content generation attempt {attempt_num}/{max_retries}")
        try:
            if provider == 'openai':
                if actual_model == 'gpt-5':
                    # Use Responses API for GPT-5 multimodal: one user turn
                    # holding the prompt text followed by every image.
                    user_content = [{"type": "input_text", "text": prompt}]
                    for img_content in image_content:
                        user_content.append({
                            "type": "input_image",
                            "image_url": img_content["image_url"]["url"]
                        })
                    # Note: temperature and max_tokens are not sent on this
                    # path; reasoning effort and verbosity are fixed.
                    kwargs = {
                        "model": actual_model,
                        "input": [{"role": "user", "content": user_content}],
                        "reasoning": {"effort": "medium"},
                        "text": {
                            "verbosity": "medium",
                            "format": {"type": "text"}
                        }
                    }
                    response = openai_client.responses.create(**kwargs)
                    result = LLMService._extract_responses_api_content(response)
                else:
                    # Use Chat Completions API for non-GPT-5 models
                    content = [{"type": "text", "text": prompt}]
                    content.extend(image_content)
                    kwargs = {
                        "model": actual_model,
                        "messages": [{"role": "user", "content": content}],
                        "temperature": temperature,
                    }
                    if max_tokens:
                        kwargs["max_tokens"] = max_tokens
                    response = openai_client.chat.completions.create(**kwargs)
                    result = response.choices[0].message.content.strip()
            else:
                # Gemini multimodal API call
                model = LLMService.get_model(model_name)
                generation_config = {"temperature": temperature}
                if max_tokens:
                    generation_config["max_output_tokens"] = max_tokens
                # Create multimodal input - combine text prompt with images
                content_parts = [prompt]
                content_parts.extend(images)
                response = model.generate_content(
                    content_parts,
                    generation_config=genai.types.GenerationConfig(**generation_config)
                )
                result = LLMService._extract_text_from_response(response)

            if attempt > 0:
                logger.info(f"Multimodal content generation succeeded on attempt {attempt_num}/{max_retries}")
            return result
        except Exception as e:
            last_error = e
            logger.warning(f"Multimodal attempt {attempt_num}/{max_retries} failed: {str(e)}")
            if not retryable_pattern.search(str(e).lower()):
                logger.error(f"Non-retryable error detected: {str(e)}")
                break
            if attempt_num >= max_retries:
                logger.error(f"Retryable error detected but max retries ({max_retries}) reached")
                break
            # Exponential backoff: 1s after the first failure, 2s after the second
            wait_time = 2 ** attempt
            logger.info(f"Retryable error detected. Waiting {wait_time} seconds before retry {attempt_num + 1}/{max_retries}")
            time.sleep(wait_time)

    # Either the retries were exhausted or a non-retryable error stopped the
    # loop early; report the number of attempts that were actually made.
    logger.error(f"Multimodal content generation failed after {attempts_made} attempt(s). Final error: {str(last_error)}")
    raise LLMServiceError(f"Error generating multimodal content: {str(last_error)}") from last_error
@staticmethod
def generate_contextual_response(
    prompt: str,
    conversation_context: List[Dict[str, Any]],
    temperature: float = 0.7,
    max_tokens: Optional[int] = None,
    model_name: Optional[str] = None,
    reasoning_effort: Optional[str] = None,
    verbosity: Optional[str] = None
) -> str:
    """
    Generate content using conversation context that may include both text and images in sequence.

    Each context item is a dict with a "type" key.  "text" items contribute
    their "content" to a "CONVERSATION CONTEXT" preamble placed before the
    prompt.  "image" items are loaded from their "path" on a best-effort
    basis: an image that is missing or cannot be opened is reported on
    stdout and skipped ("filename" is used only for that progress output).
    Items of any other type are ignored.

    With at least one loaded image the request is sent as a multimodal call
    to the selected provider, attempted up to three times and repeated
    (after waiting 1s, then 2s) only when the error text looks transient.
    Without images the call is delegated to LLMService.generate_content.

    Args:
        prompt: The main prompt for the LLM
        conversation_context: List of context items (text and image) in chronological order
        temperature: Controls randomness in generation.  Not sent on the
            GPT-5 multimodal path.
        max_tokens: Maximum tokens to generate.  Not sent on the GPT-5
            multimodal path.
        model_name: Optional model name to use
        reasoning_effort: GPT-5 only - Controls thinking time (minimal/low/medium/high)
        verbosity: GPT-5 only - Controls response length (low/medium/high)
    Returns:
        The generated text response
    Raises:
        LLMServiceError: If there's an issue with generation
    """
    import base64
    import re  # local import: only needed to classify errors for retry

    logger = logging.getLogger(__name__)

    # Separate text and image content from the conversation context
    text_context_parts = []
    image_parts = []
    print(f"🎯 Processing {len(conversation_context)} context items for LLM")
    for item in conversation_context:
        if item["type"] == "text":
            text_context_parts.append(item["content"])
        elif item["type"] == "image":
            # .get() keeps a malformed item from raising out of this
            # best-effort loader (a missing "path" is reported as not found).
            image_path = item.get("path")
            try:
                if image_path and os.path.exists(image_path):
                    with Image.open(image_path) as img:
                        # Convert to RGB if necessary
                        if img.mode != 'RGB':
                            img = img.convert('RGB')
                        # Copy so the image stays usable after the file is closed
                        image_parts.append(img.copy())
                    # A missing "filename" must not turn a successful load
                    # into a reported failure; fall back to the path.
                    print(f"🖼️ Loaded image for context: {item.get('filename', image_path)}")
                else:
                    print(f"⚠️ Image not found for context: {image_path}")
            except Exception as e:
                print(f"❌ Failed to load image for context: {image_path}: {e}")

    # Build the full context prompt
    context_prompt = ""
    if text_context_parts:
        context_prompt = "CONVERSATION CONTEXT:\n" + "\n".join(text_context_parts) + "\n\n"
    full_prompt = context_prompt + prompt
    print(f"📝 Context prompt length: {len(context_prompt)} characters")
    print(f"🖼️ Total images in context: {len(image_parts)}")

    if not image_parts:
        # No images, use standard text generation
        print(f"📝 Using text-only generation (no visual context)")
        return LLMService.generate_content(
            prompt=full_prompt,
            temperature=temperature,
            max_tokens=max_tokens,
            model_name=model_name,
            reasoning_effort=reasoning_effort,
            verbosity=verbosity
        )

    # We have images, use multimodal generation
    print(f"🎨 Using multimodal generation with {len(image_parts)} images")
    actual_model = model_name or DEFAULT_MODEL
    provider = LLMService._get_model_provider(model_name)
    max_retries = 3
    last_error = None
    attempts_made = 0

    # Markers of transient failures, matched against the lower-cased error
    # text.  "500" and "rate" must stand alone: a plain substring test would
    # also fire on unrelated text such as "generateContent" or "15000".
    retryable_pattern = re.compile(
        r"(?<!\d)500(?!\d)"
        r"|internal error"
        r"|internal server error"
        r"|service unavailable"
        r"|timeout"
        r"|(?<![a-z])rate(?:limit|(?![a-z]))"
    )

    # For OpenAI the images are sent as PNG data URLs.  Encoding does not
    # change between attempts, so it is done once, outside the retry loop.
    image_content = []
    if provider == 'openai':
        try:
            for img in image_parts:
                buffer = io.BytesIO()
                img.save(buffer, format='PNG')
                base64_image = base64.b64encode(buffer.getvalue()).decode('utf-8')
                image_content.append({
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/png;base64,{base64_image}"
                    }
                })
        except Exception as e:
            logger.error(f"Non-retryable error detected: {str(e)}")
            raise LLMServiceError(f"Error generating contextual multimodal content: {str(e)}") from e

    for attempt in range(max_retries):
        attempt_num = attempt + 1
        attempts_made = attempt_num
        logger.debug(f"Contextual multimodal generation attempt {attempt_num}/{max_retries}")
        try:
            if provider == 'openai':
                if actual_model == 'gpt-5':
                    # Use Responses API for GPT-5 contextual multimodal: one
                    # user turn holding the prompt text followed by every image.
                    user_content = [{"type": "input_text", "text": full_prompt}]
                    for img_content in image_content:
                        user_content.append({
                            "type": "input_image",
                            "image_url": img_content["image_url"]["url"]
                        })
                    # Note: temperature and max_tokens are not sent on this path
                    kwargs = {
                        "model": actual_model,
                        "input": [{"role": "user", "content": user_content}],
                        "reasoning": {"effort": reasoning_effort or "medium"},
                        "text": {
                            "verbosity": verbosity or "medium",
                            "format": {"type": "text"}
                        }
                    }
                    response = openai_client.responses.create(**kwargs)
                    result = LLMService._extract_responses_api_content(response)
                else:
                    # Use Chat Completions API for non-GPT-5 models
                    content = [{"type": "text", "text": full_prompt}]
                    content.extend(image_content)
                    kwargs = {
                        "model": actual_model,
                        "messages": [{"role": "user", "content": content}],
                        "temperature": temperature,
                    }
                    if max_tokens:
                        kwargs["max_tokens"] = max_tokens
                    response = openai_client.chat.completions.create(**kwargs)
                    result = response.choices[0].message.content.strip()
            else:
                # Gemini contextual multimodal API call:
                # the prompt text followed by the PIL images
                content_parts = [full_prompt]
                content_parts.extend(image_parts)
                model = LLMService.get_model(model_name)
                generation_config = {"temperature": temperature}
                if max_tokens:
                    generation_config["max_output_tokens"] = max_tokens
                response = model.generate_content(
                    content_parts,
                    generation_config=genai.types.GenerationConfig(**generation_config)
                )
                result = LLMService._extract_text_from_response(response)

            if attempt > 0:
                logger.info(f"Contextual multimodal generation succeeded on attempt {attempt_num}/{max_retries}")
            print(f"✅ Generated contextual response with visual context using {provider}")
            print(f"🔍 LLM RESULT DEBUG:")
            print(f" - Result type: {type(result)}")
            print(f" - Result length: {len(result) if result else 0} characters")
            print(f" - Result preview: '{result[:200] if result else 'EMPTY'}...'")
            print(f" - Result repr: {repr(result[:50]) if result else 'NONE'}")
            return result
        except Exception as e:
            last_error = e
            logger.warning(f"Contextual multimodal attempt {attempt_num}/{max_retries} failed: {str(e)}")
            if not retryable_pattern.search(str(e).lower()):
                logger.error(f"Non-retryable error detected: {str(e)}")
                break
            if attempt_num >= max_retries:
                logger.error(f"Retryable error detected but max retries ({max_retries}) reached")
                break
            # Exponential backoff: 1s after the first failure, 2s after the second
            wait_time = 2 ** attempt
            logger.info(f"Retryable error detected. Waiting {wait_time} seconds before retry {attempt_num + 1}/{max_retries}")
            time.sleep(wait_time)

    # Either the retries were exhausted or a non-retryable error stopped the
    # loop early; report the number of attempts that were actually made.
    logger.error(f"Contextual multimodal generation failed after {attempts_made} attempt(s). Final error: {str(last_error)}")
    raise LLMServiceError(f"Error generating contextual multimodal content: {str(last_error)}") from last_error