modcomms/backend/app/services/gemini_service.py
michael 3a5c3bcde3 Implement revision-aware proof analysis pipeline
When a subsequent revision of a proof is uploaded, the analysis now takes
place in context of the previous version's results. The system identifies:
- Resolved issues: fixed in the new revision
- Outstanding issues: still present from previous version
- New issues: introduced in the new revision

Key changes:
- Add resolvedIssues, outstandingIssues, newIssues fields to SubReview
- Add PreviousReviewContext model for passing previous review data
- Update all specialist agents to accept previous_review context
- Extend GeminiService with include_revision_fields parameter
- Add get_latest_version_review() repository method
- Update LeadAgent to synthesize cross-version context in summary
- Fetch previous analysis in WebSocket handler for revisions

First version analysis continues to work exactly as before with revision
fields set to null.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-25 10:04:16 -06:00

339 lines
15 KiB
Python
Executable file

import json
import logging
from typing import List, Tuple
from google import genai
from google.genai import types
from app.models.schemas import SubReview, RagStatus
# Configure logging
logger = logging.getLogger(__name__)
class GeminiService:
"""Service wrapper for Google Gemini API calls."""
def __init__(self, api_key: str):
"""
Initialize the Gemini service.
Args:
api_key: Google Gemini API key
"""
self.client = genai.Client(api_key=api_key)
self.model = "gemini-3-pro-preview"
async def analyze_with_image(
self,
prompt: str,
file_data: bytes,
file_type: str,
include_revision_fields: bool = False,
) -> SubReview:
"""
Analyze an image/file with Gemini and return a structured SubReview.
Args:
prompt: The analysis prompt including reference doc context
file_data: Raw bytes of the file
file_type: MIME type of the file (e.g., "image/png")
include_revision_fields: If True, require revision fields in response schema
Returns:
SubReview with ragStatus, feedback, and issues.
When include_revision_fields is True, also includes resolvedIssues,
outstandingIssues, and newIssues.
"""
try:
logger.info(f"[GEMINI API] Starting image analysis - file_type: {file_type}, file_size: {len(file_data)} bytes, revision_fields: {include_revision_fields}")
# Create inline data part for the file
file_part = types.Part.from_bytes(
data=file_data,
mime_type=file_type
)
# Define the response schema for structured output
response_schema = {
"type": "object",
"properties": {
"analysisStatus": {
"type": "string",
"enum": ["success", "low_confidence"],
"description": "Set to 'low_confidence' if the proof is nonsensical, completely irrelevant to marketing, or otherwise impossible to analyze. Otherwise, set to 'success'."
},
"ragStatus": {
"type": "string",
"enum": ["Red", "Amber", "Green"],
"description": "A RAG status. Red: Issues that must be resolved. Amber: Issues that should be addressed. Green: No issues found."
},
"feedback": {
"type": "string",
"description": "Brief bullet-point feedback. Each bullet: one actionable sentence. Format: '• Issue: recommendation'. Max 5 bullets."
},
"issues": {
"type": "array",
"items": {"type": "string"},
"description": "A list of specific, actionable issues found. If no issues, return an empty array."
}
},
"required": ["analysisStatus", "ragStatus", "feedback", "issues"]
}
# Add revision fields if requested
if include_revision_fields:
response_schema["properties"]["resolvedIssues"] = {
"type": "array",
"items": {"type": "string"},
"description": "Issues from the previous version that have been resolved in this revision."
}
response_schema["properties"]["outstandingIssues"] = {
"type": "array",
"items": {"type": "string"},
"description": "Issues from the previous version that remain unresolved in this revision."
}
response_schema["properties"]["newIssues"] = {
"type": "array",
"items": {"type": "string"},
"description": "New issues introduced in this revision that were not present in the previous version."
}
response_schema["required"].extend(["resolvedIssues", "outstandingIssues", "newIssues"])
# Make the API call
logger.info(f"[GEMINI API] Calling Gemini model: {self.model}")
response = await self.client.aio.models.generate_content(
model=self.model,
contents=[file_part, prompt],
config=types.GenerateContentConfig(
response_mime_type="application/json",
response_schema=response_schema
)
)
logger.info(f"[GEMINI API] Response received from Gemini")
# Parse the JSON response
json_text = response.text.strip()
parsed_result = json.loads(json_text)
logger.info(f"[GEMINI API] Parsed result - ragStatus: {parsed_result.get('ragStatus')}, analysisStatus: {parsed_result.get('analysisStatus')}")
# Handle low confidence analysis
if parsed_result.get("analysisStatus") == "low_confidence":
return SubReview(
ragStatus=RagStatus.ERROR,
feedback="The agent could not analyze this proof with high confidence. This may be because the content is irrelevant, nonsensical, or too far outside of expected marketing materials.",
issues=[]
)
# Build SubReview with optional revision fields
review_kwargs = {
"ragStatus": RagStatus(parsed_result["ragStatus"]),
"feedback": parsed_result["feedback"],
"issues": parsed_result["issues"],
}
if include_revision_fields:
review_kwargs["resolvedIssues"] = parsed_result.get("resolvedIssues", [])
review_kwargs["outstandingIssues"] = parsed_result.get("outstandingIssues", [])
review_kwargs["newIssues"] = parsed_result.get("newIssues", [])
return SubReview(**review_kwargs)
except json.JSONDecodeError as e:
logger.error(f"[GEMINI API] JSON parse error: {str(e)}")
return SubReview(
ragStatus=RagStatus.ERROR,
feedback=f"Failed to parse AI response as JSON: {str(e)}",
issues=[]
)
except Exception as e:
logger.error(f"[GEMINI API] Error during analysis: {str(e)}")
return SubReview(
ragStatus=RagStatus.ERROR,
feedback=f"An error occurred during analysis: {str(e)}",
issues=[]
)
async def analyze_with_images(
self,
prompt: str,
images: List[Tuple[bytes, str]],
include_revision_fields: bool = False,
) -> SubReview:
"""
Analyze multiple images with Gemini and return a structured SubReview.
This is used for multi-page PDFs where all pages need to be analyzed together.
Args:
prompt: The analysis prompt including reference doc context
images: List of (file_data, mime_type) tuples for each image
include_revision_fields: If True, require revision fields in response schema
Returns:
SubReview with ragStatus, feedback, and issues.
When include_revision_fields is True, also includes resolvedIssues,
outstandingIssues, and newIssues.
"""
try:
logger.info(f"[GEMINI API] Starting multi-image analysis - {len(images)} images, revision_fields: {include_revision_fields}")
# Create inline data parts for all images
file_parts = []
for i, (file_data, file_type) in enumerate(images):
part = types.Part.from_bytes(data=file_data, mime_type=file_type)
file_parts.append(part)
logger.info(f"[GEMINI API] Added image {i + 1}/{len(images)} - type: {file_type}, size: {len(file_data)} bytes")
# Define the response schema for structured output
response_schema = {
"type": "object",
"properties": {
"analysisStatus": {
"type": "string",
"enum": ["success", "low_confidence"],
"description": "Set to 'low_confidence' if the proof is nonsensical, completely irrelevant to marketing, or otherwise impossible to analyze. Otherwise, set to 'success'."
},
"ragStatus": {
"type": "string",
"enum": ["Red", "Amber", "Green"],
"description": "A RAG status. Red: Issues that must be resolved. Amber: Issues that should be addressed. Green: No issues found."
},
"feedback": {
"type": "string",
"description": "Brief bullet-point feedback. Each bullet: one actionable sentence. Format: '• Issue: recommendation'. Max 5 bullets."
},
"issues": {
"type": "array",
"items": {"type": "string"},
"description": "A list of specific, actionable issues found. If no issues, return an empty array."
}
},
"required": ["analysisStatus", "ragStatus", "feedback", "issues"]
}
# Add revision fields if requested
if include_revision_fields:
response_schema["properties"]["resolvedIssues"] = {
"type": "array",
"items": {"type": "string"},
"description": "Issues from the previous version that have been resolved in this revision."
}
response_schema["properties"]["outstandingIssues"] = {
"type": "array",
"items": {"type": "string"},
"description": "Issues from the previous version that remain unresolved in this revision."
}
response_schema["properties"]["newIssues"] = {
"type": "array",
"items": {"type": "string"},
"description": "New issues introduced in this revision that were not present in the previous version."
}
response_schema["required"].extend(["resolvedIssues", "outstandingIssues", "newIssues"])
# Combine file parts with prompt
contents = file_parts + [prompt]
# Make the API call
logger.info(f"[GEMINI API] Calling Gemini model: {self.model} with {len(images)} images")
response = await self.client.aio.models.generate_content(
model=self.model,
contents=contents,
config=types.GenerateContentConfig(
response_mime_type="application/json",
response_schema=response_schema
)
)
logger.info(f"[GEMINI API] Response received from Gemini (multi-image)")
# Parse the JSON response
json_text = response.text.strip()
parsed_result = json.loads(json_text)
logger.info(f"[GEMINI API] Parsed result - ragStatus: {parsed_result.get('ragStatus')}, analysisStatus: {parsed_result.get('analysisStatus')}")
# Handle low confidence analysis
if parsed_result.get("analysisStatus") == "low_confidence":
return SubReview(
ragStatus=RagStatus.ERROR,
feedback="The agent could not analyze this proof with high confidence. This may be because the content is irrelevant, nonsensical, or too far outside of expected marketing materials.",
issues=[]
)
# Build SubReview with optional revision fields
review_kwargs = {
"ragStatus": RagStatus(parsed_result["ragStatus"]),
"feedback": parsed_result["feedback"],
"issues": parsed_result["issues"],
}
if include_revision_fields:
review_kwargs["resolvedIssues"] = parsed_result.get("resolvedIssues", [])
review_kwargs["outstandingIssues"] = parsed_result.get("outstandingIssues", [])
review_kwargs["newIssues"] = parsed_result.get("newIssues", [])
return SubReview(**review_kwargs)
except json.JSONDecodeError as e:
logger.error(f"[GEMINI API] JSON parse error: {str(e)}")
return SubReview(
ragStatus=RagStatus.ERROR,
feedback=f"Failed to parse AI response as JSON: {str(e)}",
issues=[]
)
except Exception as e:
logger.error(f"[GEMINI API] Error during multi-image analysis: {str(e)}")
return SubReview(
ragStatus=RagStatus.ERROR,
feedback=f"An error occurred during analysis: {str(e)}",
issues=[]
)
async def generate_summary(
self,
prompt: str,
) -> dict:
"""
Generate a text summary (for lead agent).
Args:
prompt: The prompt for generating the summary
Returns:
Parsed JSON response with summary and overall_status
"""
try:
logger.info("[GEMINI API] Generating lead agent summary")
response_schema = {
"type": "object",
"properties": {
"overallStatus": {
"type": "string",
"enum": ["Passed", "Failed", "Analysis Error", "Requires Manual Legal Review"],
},
"summary": {
"type": "string",
"description": "Brief summary: one-line verdict, then 3-5 bullet points listing key issues/recommendations."
}
},
"required": ["overallStatus", "summary"]
}
response = await self.client.aio.models.generate_content(
model=self.model,
contents=prompt,
config=types.GenerateContentConfig(
response_mime_type="application/json",
response_schema=response_schema
)
)
result = json.loads(response.text.strip())
logger.info(f"[GEMINI API] Summary generated - overallStatus: {result.get('overallStatus')}")
return result
except Exception as e:
logger.error(f"[GEMINI API] Error generating summary: {str(e)}")
return {
"overallStatus": "Analysis Error",
"summary": f"An error occurred while generating the summary: {str(e)}"
}