385 lines
17 KiB
Python
Executable file
385 lines
17 KiB
Python
Executable file
import asyncio
|
|
import json
|
|
import logging
|
|
from typing import Awaitable, Callable, List, Optional, Tuple
|
|
|
|
from google import genai
|
|
from google.genai import types
|
|
|
|
from app.models.schemas import SubReview, RagStatus
|
|
|
|
# Module-level logger. This module only obtains the logger; it does not
# configure handlers or levels.
logger = logging.getLogger(__name__)

# Per-request HTTP timeouts for the two Gemini clients created in
# GeminiService.__init__. google-genai's http_options "timeout" is expressed
# in MILLISECONDS, not seconds. When the deadline is hit the request raises,
# which (for the primary client) triggers the fallback model.
_PRIMARY_TIMEOUT_MS = 45_000  # 45 seconds, primary model
_FALLBACK_TIMEOUT_MS = 150_000  # 150 seconds, fallback model
|
|
|
|
|
|
class GeminiService:
    """Service wrapper for Google Gemini API calls.

    Every request is sent to ``self.model`` first. If that call raises for any
    reason (including hitting the primary client's HTTP timeout), the same
    request is retried once against ``self.fallback_model``.
    """

    # Feedback returned when the model reports analysisStatus == "low_confidence".
    _LOW_CONFIDENCE_FEEDBACK = "The agent could not analyze this proof with high confidence. This may be because the content is irrelevant, nonsensical, or too far outside of expected marketing materials."

    # Extra schema properties requested when include_revision_fields=True.
    # Insertion order matters: it fixes the order of the schema's "required" list.
    _REVISION_FIELD_DESCRIPTIONS = {
        "resolvedIssues": "Issues from the previous version that have been resolved in this revision.",
        "outstandingIssues": "Issues from the previous version that remain unresolved in this revision.",
        "newIssues": "New issues introduced in this revision that were not present in the previous version.",
    }

    def __init__(self, api_key: str):
        """
        Initialize the Gemini service.

        Args:
            api_key: Google Gemini API key
        """
        # Two separate clients with different HTTP-level timeouts so the
        # network connection is torn down when the deadline is reached.
        # Note: google-genai http_options 'timeout' is in milliseconds.
        self.primary_client = genai.Client(
            api_key=api_key,
            http_options={"timeout": _PRIMARY_TIMEOUT_MS},
        )
        self.fallback_client = genai.Client(
            api_key=api_key,
            http_options={"timeout": _FALLBACK_TIMEOUT_MS},
        )
        self.model = "gemini-3.1-pro-preview"
        self.fallback_model = "gemini-3.1-flash-lite"

    async def _generate_content(
        self,
        contents,
        config,
        on_fallback: Optional[Callable[[], Awaitable[None]]] = None,
    ) -> "types.GenerateContentResponse":
        """Call generate_content, falling back to fallback_model if the primary fails or times out.

        Args:
            contents: Request contents, forwarded unchanged to both models.
            config: Generation config, forwarded unchanged to both models.
            on_fallback: Optional async callback awaited once, just before the
                fallback request. It is a notification hook only: exceptions
                it raises are logged and the fallback request still runs.

        Returns:
            The response from whichever model answered.

        Raises:
            Whatever the fallback request raises if it also fails. The
            primary error remains attached as the implicit ``__context__``.
        """
        try:
            return await self.primary_client.aio.models.generate_content(
                model=self.model,
                contents=contents,
                config=config,
            )
        except Exception as e:
            logger.warning(
                "[GEMINI API] Primary model %s failed: %s. Retrying with fallback %s",
                self.model,
                e,
                self.fallback_model,
            )
            if on_fallback is not None:
                try:
                    await on_fallback()
                except Exception:
                    # A broken notification hook must not cost us the fallback attempt.
                    logger.exception(
                        "[GEMINI API] on_fallback callback failed; continuing with fallback model"
                    )
            return await self.fallback_client.aio.models.generate_content(
                model=self.fallback_model,
                contents=contents,
                config=config,
            )

    @classmethod
    def _build_response_schema(cls, include_revision_fields: bool) -> dict:
        """Return a fresh JSON schema for a SubReview-shaped model response.

        A new dict is built on every call, so callers may mutate the result.

        Args:
            include_revision_fields: If True, also require resolvedIssues,
                outstandingIssues and newIssues (arrays of strings).
        """
        schema = {
            "type": "object",
            "properties": {
                "analysisStatus": {
                    "type": "string",
                    "enum": ["success", "low_confidence"],
                    "description": "Set to 'low_confidence' if the proof is nonsensical, completely irrelevant to marketing, or otherwise impossible to analyze. Otherwise, set to 'success'."
                },
                "ragStatus": {
                    "type": "string",
                    "enum": ["Red", "Amber", "Green"],
                    "description": "A RAG status. Red: Issues that must be resolved. Amber: Issues that should be addressed. Green: No issues found."
                },
                "feedback": {
                    "type": "string",
                    "description": "Brief bullet-point feedback. Each bullet has two parts: '**Issue:** [problem]' on one line, then '**Recommendation:** [fix]' on the next line. Use • for bullets. Max 5 bullets."
                },
                "issues": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "A list of specific, actionable issues found. If no issues, return an empty array."
                },
            },
            "required": ["analysisStatus", "ragStatus", "feedback", "issues"],
        }

        if include_revision_fields:
            for field_name, description in cls._REVISION_FIELD_DESCRIPTIONS.items():
                schema["properties"][field_name] = {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": description,
                }
            schema["required"].extend(cls._REVISION_FIELD_DESCRIPTIONS)

        return schema

    @classmethod
    def _review_config(cls, include_revision_fields: bool):
        """Build the JSON-mode generation config used by the review methods."""
        return types.GenerateContentConfig(
            response_mime_type="application/json",
            response_schema=cls._build_response_schema(include_revision_fields),
        )

    @staticmethod
    def _response_json(response):
        """Decode the JSON text body of a generate_content response.

        Raises:
            ValueError: if the response carries no text at all. ``response.text``
                can be None, presumably when no candidate/text part came back
                (e.g. a blocked prompt).
            json.JSONDecodeError: if the text is not valid JSON.
        """
        text = (response.text or "").strip()
        if not text:
            raise ValueError("Gemini returned an empty response")
        return json.loads(text)

    @staticmethod
    def _error_review(feedback: str) -> SubReview:
        """Return an ERROR-status SubReview carrying *feedback* and no issues."""
        return SubReview(ragStatus=RagStatus.ERROR, feedback=feedback, issues=[])

    @classmethod
    def _to_sub_review(cls, parsed_result, include_revision_fields: bool) -> SubReview:
        """Convert decoded review JSON into a SubReview.

        A "low_confidence" analysisStatus maps to an ERROR review with a fixed
        explanation. Otherwise ragStatus/feedback/issues are copied over. When
        include_revision_fields is True, the revision lists are also copied,
        defaulting to [] when absent.

        Raises:
            ValueError: if the decoded JSON is not an object.
            KeyError: if ragStatus, feedback or issues is missing.
            Errors from ``RagStatus(...)`` / ``SubReview(...)`` on unexpected
            values propagate unchanged.
        """
        if not isinstance(parsed_result, dict):
            raise ValueError(
                f"Expected a JSON object from Gemini, got {type(parsed_result).__name__}"
            )

        logger.info(
            "[GEMINI API] Parsed result - ragStatus: %s, analysisStatus: %s",
            parsed_result.get("ragStatus"),
            parsed_result.get("analysisStatus"),
        )

        if parsed_result.get("analysisStatus") == "low_confidence":
            return cls._error_review(cls._LOW_CONFIDENCE_FEEDBACK)

        review_kwargs = {
            "ragStatus": RagStatus(parsed_result["ragStatus"]),
            "feedback": parsed_result["feedback"],
            "issues": parsed_result["issues"],
        }
        if include_revision_fields:
            for field_name in cls._REVISION_FIELD_DESCRIPTIONS:
                review_kwargs[field_name] = parsed_result.get(field_name, [])

        return SubReview(**review_kwargs)

    async def analyze_with_image(
        self,
        prompt: str,
        file_data: bytes,
        file_type: str,
        include_revision_fields: bool = False,
        on_fallback: Optional[Callable[[], Awaitable[None]]] = None,
    ) -> SubReview:
        """
        Analyze an image/file with Gemini and return a structured SubReview.

        Args:
            prompt: The analysis prompt including reference doc context
            file_data: Raw bytes of the file
            file_type: MIME type of the file (e.g., "image/png")
            include_revision_fields: If True, require revision fields in response schema
            on_fallback: Optional async callback awaited once if the primary
                model fails and the fallback model is about to be tried.

        Returns:
            SubReview with ragStatus, feedback, and issues.
            When include_revision_fields is True, also includes resolvedIssues,
            outstandingIssues, and newIssues.
            Never raises for API/parse failures: those are reported as an
            ERROR-status SubReview whose feedback describes the problem.
        """
        try:
            logger.info(
                "[GEMINI API] Starting image analysis - file_type: %s, file_size: %d bytes, revision_fields: %s",
                file_type,
                len(file_data),
                include_revision_fields,
            )

            file_part = types.Part.from_bytes(data=file_data, mime_type=file_type)

            logger.info("[GEMINI API] Calling Gemini model: %s", self.model)
            response = await self._generate_content(
                contents=[file_part, prompt],
                config=self._review_config(include_revision_fields),
                on_fallback=on_fallback,
            )
            logger.info("[GEMINI API] Response received from Gemini")

            return self._to_sub_review(self._response_json(response), include_revision_fields)

        except json.JSONDecodeError as e:
            logger.error("[GEMINI API] JSON parse error: %s", e)
            return self._error_review(f"Failed to parse AI response as JSON: {e}")
        except Exception as e:
            logger.exception("[GEMINI API] Error during analysis: %s", e)
            return self._error_review(f"An error occurred during analysis: {e}")

    async def analyze_with_images(
        self,
        prompt: str,
        images: List[Tuple[bytes, str]],
        include_revision_fields: bool = False,
        on_fallback: Optional[Callable[[], Awaitable[None]]] = None,
    ) -> SubReview:
        """
        Analyze multiple images with Gemini and return a structured SubReview.

        This is used for multi-page PDFs where all pages need to be analyzed together.

        Args:
            prompt: The analysis prompt including reference doc context
            images: List of (file_data, mime_type) tuples for each image.
                An empty list yields an ERROR review without calling the API.
            include_revision_fields: If True, require revision fields in response schema
            on_fallback: Optional async callback awaited once if the primary
                model fails and the fallback model is about to be tried.

        Returns:
            SubReview with ragStatus, feedback, and issues.
            When include_revision_fields is True, also includes resolvedIssues,
            outstandingIssues, and newIssues.
            Never raises for API/parse failures: those are reported as an
            ERROR-status SubReview whose feedback describes the problem.
        """
        try:
            logger.info(
                "[GEMINI API] Starting multi-image analysis - %d images, revision_fields: %s",
                len(images),
                include_revision_fields,
            )

            if not images:
                # Without any pages the model would only see the prompt and
                # produce a review of nothing; report it instead of spending a call.
                logger.error("[GEMINI API] Multi-image analysis called with no images")
                return self._error_review(
                    "An error occurred during analysis: no images were provided."
                )

            # One inline-data part per page, followed by the prompt.
            file_parts = []
            for page_number, (file_data, file_type) in enumerate(images, start=1):
                file_parts.append(types.Part.from_bytes(data=file_data, mime_type=file_type))
                logger.info(
                    "[GEMINI API] Added image %d/%d - type: %s, size: %d bytes",
                    page_number,
                    len(images),
                    file_type,
                    len(file_data),
                )
            contents = file_parts + [prompt]

            logger.info(
                "[GEMINI API] Calling Gemini model: %s with %d images",
                self.model,
                len(images),
            )
            response = await self._generate_content(
                contents=contents,
                config=self._review_config(include_revision_fields),
                on_fallback=on_fallback,
            )
            logger.info("[GEMINI API] Response received from Gemini (multi-image)")

            return self._to_sub_review(self._response_json(response), include_revision_fields)

        except json.JSONDecodeError as e:
            logger.error("[GEMINI API] JSON parse error: %s", e)
            return self._error_review(f"Failed to parse AI response as JSON: {e}")
        except Exception as e:
            logger.exception("[GEMINI API] Error during multi-image analysis: %s", e)
            return self._error_review(f"An error occurred during analysis: {e}")

    async def generate_summary(
        self,
        prompt: str,
        on_fallback: Optional[Callable[[], Awaitable[None]]] = None,
    ) -> dict:
        """
        Generate a text summary (for lead agent).

        Args:
            prompt: The prompt for generating the summary
            on_fallback: Optional async callback awaited once if the primary
                model fails and the fallback model is about to be tried.

        Returns:
            Parsed JSON object with "overallStatus" and "summary" keys (as
            requested by the response schema). On any failure, returns
            {"overallStatus": "Analysis Error", "summary": <error text>}
            instead of raising.
        """
        try:
            logger.info("[GEMINI API] Generating lead agent summary")

            response_schema = {
                "type": "object",
                "properties": {
                    "overallStatus": {
                        "type": "string",
                        "enum": ["Passed", "Failed", "Analysis Error", "Requires Manual Legal Review"],
                    },
                    "summary": {
                        "type": "string",
                        "description": "Brief summary: one-line verdict, then 3-5 bullet points listing key issues/recommendations."
                    },
                },
                "required": ["overallStatus", "summary"],
            }

            response = await self._generate_content(
                contents=prompt,
                config=types.GenerateContentConfig(
                    response_mime_type="application/json",
                    response_schema=response_schema,
                ),
                on_fallback=on_fallback,
            )

            result = self._response_json(response)
            if not isinstance(result, dict):
                raise ValueError(
                    f"Expected a JSON object from Gemini, got {type(result).__name__}"
                )
            logger.info(
                "[GEMINI API] Summary generated - overallStatus: %s",
                result.get("overallStatus"),
            )
            return result

        except Exception as e:
            logger.exception("[GEMINI API] Error generating summary: %s", e)
            return {
                "overallStatus": "Analysis Error",
                "summary": f"An error occurred while generating the summary: {e}"
            }
|