modcomms/backend/app/services/gemini_service.py

385 lines
17 KiB
Python
Executable file

import asyncio
import json
import logging
from typing import Awaitable, Callable, List, Optional, Tuple
from google import genai
from google.genai import types
from app.models.schemas import SubReview, RagStatus
# Configure logging
logger = logging.getLogger(__name__)
# Timeout for each Gemini API call.
# google-genai http_options 'timeout' is in MILLISECONDS.
_PRIMARY_TIMEOUT_MS = 45_000 # 45 seconds
_FALLBACK_TIMEOUT_MS = 150_000 # 150 seconds
class GeminiService:
"""Service wrapper for Google Gemini API calls."""
def __init__(self, api_key: str):
"""
Initialize the Gemini service.
Args:
api_key: Google Gemini API key
"""
# Two separate clients with different HTTP-level timeouts so the
# network connection is torn down when the deadline is reached.
# Note: google-genai http_options 'timeout' is in milliseconds.
self.primary_client = genai.Client(
api_key=api_key,
http_options={"timeout": _PRIMARY_TIMEOUT_MS},
)
self.fallback_client = genai.Client(
api_key=api_key,
http_options={"timeout": _FALLBACK_TIMEOUT_MS},
)
self.model = "gemini-3.1-pro-preview"
self.fallback_model = "gemini-3.1-flash-lite"
async def _generate_content(
self,
contents,
config,
on_fallback: Optional[Callable[[], Awaitable[None]]] = None,
) -> any:
"""Call generate_content, falling back to fallback_model if the primary fails or times out."""
try:
return await self.primary_client.aio.models.generate_content(
model=self.model,
contents=contents,
config=config,
)
except Exception as e:
logger.warning(
f"[GEMINI API] Primary model {self.model} failed: {e}. "
f"Retrying with fallback {self.fallback_model}"
)
if on_fallback:
await on_fallback()
return await self.fallback_client.aio.models.generate_content(
model=self.fallback_model,
contents=contents,
config=config,
)
async def analyze_with_image(
self,
prompt: str,
file_data: bytes,
file_type: str,
include_revision_fields: bool = False,
on_fallback: Optional[Callable[[], Awaitable[None]]] = None,
) -> SubReview:
"""
Analyze an image/file with Gemini and return a structured SubReview.
Args:
prompt: The analysis prompt including reference doc context
file_data: Raw bytes of the file
file_type: MIME type of the file (e.g., "image/png")
include_revision_fields: If True, require revision fields in response schema
Returns:
SubReview with ragStatus, feedback, and issues.
When include_revision_fields is True, also includes resolvedIssues,
outstandingIssues, and newIssues.
"""
try:
logger.info(f"[GEMINI API] Starting image analysis - file_type: {file_type}, file_size: {len(file_data)} bytes, revision_fields: {include_revision_fields}")
# Create inline data part for the file
file_part = types.Part.from_bytes(
data=file_data,
mime_type=file_type
)
# Define the response schema for structured output
response_schema = {
"type": "object",
"properties": {
"analysisStatus": {
"type": "string",
"enum": ["success", "low_confidence"],
"description": "Set to 'low_confidence' if the proof is nonsensical, completely irrelevant to marketing, or otherwise impossible to analyze. Otherwise, set to 'success'."
},
"ragStatus": {
"type": "string",
"enum": ["Red", "Amber", "Green"],
"description": "A RAG status. Red: Issues that must be resolved. Amber: Issues that should be addressed. Green: No issues found."
},
"feedback": {
"type": "string",
"description": "Brief bullet-point feedback. Each bullet has two parts: '**Issue:** [problem]' on one line, then '**Recommendation:** [fix]' on the next line. Use • for bullets. Max 5 bullets."
},
"issues": {
"type": "array",
"items": {"type": "string"},
"description": "A list of specific, actionable issues found. If no issues, return an empty array."
}
},
"required": ["analysisStatus", "ragStatus", "feedback", "issues"]
}
# Add revision fields if requested
if include_revision_fields:
response_schema["properties"]["resolvedIssues"] = {
"type": "array",
"items": {"type": "string"},
"description": "Issues from the previous version that have been resolved in this revision."
}
response_schema["properties"]["outstandingIssues"] = {
"type": "array",
"items": {"type": "string"},
"description": "Issues from the previous version that remain unresolved in this revision."
}
response_schema["properties"]["newIssues"] = {
"type": "array",
"items": {"type": "string"},
"description": "New issues introduced in this revision that were not present in the previous version."
}
response_schema["required"].extend(["resolvedIssues", "outstandingIssues", "newIssues"])
# Make the API call
logger.info(f"[GEMINI API] Calling Gemini model: {self.model}")
response = await self._generate_content(
contents=[file_part, prompt],
config=types.GenerateContentConfig(
response_mime_type="application/json",
response_schema=response_schema,
),
on_fallback=on_fallback,
)
logger.info(f"[GEMINI API] Response received from Gemini")
# Parse the JSON response
json_text = response.text.strip()
parsed_result = json.loads(json_text)
logger.info(f"[GEMINI API] Parsed result - ragStatus: {parsed_result.get('ragStatus')}, analysisStatus: {parsed_result.get('analysisStatus')}")
# Handle low confidence analysis
if parsed_result.get("analysisStatus") == "low_confidence":
return SubReview(
ragStatus=RagStatus.ERROR,
feedback="The agent could not analyze this proof with high confidence. This may be because the content is irrelevant, nonsensical, or too far outside of expected marketing materials.",
issues=[]
)
# Build SubReview with optional revision fields
review_kwargs = {
"ragStatus": RagStatus(parsed_result["ragStatus"]),
"feedback": parsed_result["feedback"],
"issues": parsed_result["issues"],
}
if include_revision_fields:
review_kwargs["resolvedIssues"] = parsed_result.get("resolvedIssues", [])
review_kwargs["outstandingIssues"] = parsed_result.get("outstandingIssues", [])
review_kwargs["newIssues"] = parsed_result.get("newIssues", [])
return SubReview(**review_kwargs)
except json.JSONDecodeError as e:
logger.error(f"[GEMINI API] JSON parse error: {str(e)}")
return SubReview(
ragStatus=RagStatus.ERROR,
feedback=f"Failed to parse AI response as JSON: {str(e)}",
issues=[]
)
except Exception as e:
logger.error(f"[GEMINI API] Error during analysis: {str(e)}")
return SubReview(
ragStatus=RagStatus.ERROR,
feedback=f"An error occurred during analysis: {str(e)}",
issues=[]
)
async def analyze_with_images(
self,
prompt: str,
images: List[Tuple[bytes, str]],
include_revision_fields: bool = False,
on_fallback: Optional[Callable[[], Awaitable[None]]] = None,
) -> SubReview:
"""
Analyze multiple images with Gemini and return a structured SubReview.
This is used for multi-page PDFs where all pages need to be analyzed together.
Args:
prompt: The analysis prompt including reference doc context
images: List of (file_data, mime_type) tuples for each image
include_revision_fields: If True, require revision fields in response schema
Returns:
SubReview with ragStatus, feedback, and issues.
When include_revision_fields is True, also includes resolvedIssues,
outstandingIssues, and newIssues.
"""
try:
logger.info(f"[GEMINI API] Starting multi-image analysis - {len(images)} images, revision_fields: {include_revision_fields}")
# Create inline data parts for all images
file_parts = []
for i, (file_data, file_type) in enumerate(images):
part = types.Part.from_bytes(data=file_data, mime_type=file_type)
file_parts.append(part)
logger.info(f"[GEMINI API] Added image {i + 1}/{len(images)} - type: {file_type}, size: {len(file_data)} bytes")
# Define the response schema for structured output
response_schema = {
"type": "object",
"properties": {
"analysisStatus": {
"type": "string",
"enum": ["success", "low_confidence"],
"description": "Set to 'low_confidence' if the proof is nonsensical, completely irrelevant to marketing, or otherwise impossible to analyze. Otherwise, set to 'success'."
},
"ragStatus": {
"type": "string",
"enum": ["Red", "Amber", "Green"],
"description": "A RAG status. Red: Issues that must be resolved. Amber: Issues that should be addressed. Green: No issues found."
},
"feedback": {
"type": "string",
"description": "Brief bullet-point feedback. Each bullet has two parts: '**Issue:** [problem]' on one line, then '**Recommendation:** [fix]' on the next line. Use • for bullets. Max 5 bullets."
},
"issues": {
"type": "array",
"items": {"type": "string"},
"description": "A list of specific, actionable issues found. If no issues, return an empty array."
}
},
"required": ["analysisStatus", "ragStatus", "feedback", "issues"]
}
# Add revision fields if requested
if include_revision_fields:
response_schema["properties"]["resolvedIssues"] = {
"type": "array",
"items": {"type": "string"},
"description": "Issues from the previous version that have been resolved in this revision."
}
response_schema["properties"]["outstandingIssues"] = {
"type": "array",
"items": {"type": "string"},
"description": "Issues from the previous version that remain unresolved in this revision."
}
response_schema["properties"]["newIssues"] = {
"type": "array",
"items": {"type": "string"},
"description": "New issues introduced in this revision that were not present in the previous version."
}
response_schema["required"].extend(["resolvedIssues", "outstandingIssues", "newIssues"])
# Combine file parts with prompt
contents = file_parts + [prompt]
# Make the API call
logger.info(f"[GEMINI API] Calling Gemini model: {self.model} with {len(images)} images")
response = await self._generate_content(
contents=contents,
config=types.GenerateContentConfig(
response_mime_type="application/json",
response_schema=response_schema,
),
on_fallback=on_fallback,
)
logger.info(f"[GEMINI API] Response received from Gemini (multi-image)")
# Parse the JSON response
json_text = response.text.strip()
parsed_result = json.loads(json_text)
logger.info(f"[GEMINI API] Parsed result - ragStatus: {parsed_result.get('ragStatus')}, analysisStatus: {parsed_result.get('analysisStatus')}")
# Handle low confidence analysis
if parsed_result.get("analysisStatus") == "low_confidence":
return SubReview(
ragStatus=RagStatus.ERROR,
feedback="The agent could not analyze this proof with high confidence. This may be because the content is irrelevant, nonsensical, or too far outside of expected marketing materials.",
issues=[]
)
# Build SubReview with optional revision fields
review_kwargs = {
"ragStatus": RagStatus(parsed_result["ragStatus"]),
"feedback": parsed_result["feedback"],
"issues": parsed_result["issues"],
}
if include_revision_fields:
review_kwargs["resolvedIssues"] = parsed_result.get("resolvedIssues", [])
review_kwargs["outstandingIssues"] = parsed_result.get("outstandingIssues", [])
review_kwargs["newIssues"] = parsed_result.get("newIssues", [])
return SubReview(**review_kwargs)
except json.JSONDecodeError as e:
logger.error(f"[GEMINI API] JSON parse error: {str(e)}")
return SubReview(
ragStatus=RagStatus.ERROR,
feedback=f"Failed to parse AI response as JSON: {str(e)}",
issues=[]
)
except Exception as e:
logger.error(f"[GEMINI API] Error during multi-image analysis: {str(e)}")
return SubReview(
ragStatus=RagStatus.ERROR,
feedback=f"An error occurred during analysis: {str(e)}",
issues=[]
)
async def generate_summary(
self,
prompt: str,
on_fallback: Optional[Callable[[], Awaitable[None]]] = None,
) -> dict:
"""
Generate a text summary (for lead agent).
Args:
prompt: The prompt for generating the summary
Returns:
Parsed JSON response with summary and overall_status
"""
try:
logger.info("[GEMINI API] Generating lead agent summary")
response_schema = {
"type": "object",
"properties": {
"overallStatus": {
"type": "string",
"enum": ["Passed", "Failed", "Analysis Error", "Requires Manual Legal Review"],
},
"summary": {
"type": "string",
"description": "Brief summary: one-line verdict, then 3-5 bullet points listing key issues/recommendations."
}
},
"required": ["overallStatus", "summary"]
}
response = await self._generate_content(
contents=prompt,
config=types.GenerateContentConfig(
response_mime_type="application/json",
response_schema=response_schema,
),
on_fallback=on_fallback,
)
result = json.loads(response.text.strip())
logger.info(f"[GEMINI API] Summary generated - overallStatus: {result.get('overallStatus')}")
return result
except Exception as e:
logger.error(f"[GEMINI API] Error generating summary: {str(e)}")
return {
"overallStatus": "Analysis Error",
"summary": f"An error occurred while generating the summary: {str(e)}"
}