modcomms/backend/app/services/analysis_service.py
michael f13fa2f7e8 Parallelize specialist agent analysis with asyncio.gather
Run all 4 specialist agents (Legal, Brand, Channel Best Practices,
Channel Tech Specs) concurrently instead of sequentially. This reduces
total analysis time to roughly the duration of the slowest agent rather
than the sum of all agent times.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-25 09:13:15 -06:00

167 lines
6.7 KiB
Python
Executable file

import asyncio
import logging
from typing import Callable, Awaitable, List, Tuple, Optional
from app.models.schemas import SubReview, AgentReview, OverallStatus
# Logger for this module, keyed by the module's import name.
logger = logging.getLogger(name=__name__)
from app.agents.brand_agent import BrandAgent
from app.agents.channel_best_practices_agent import ChannelBestPracticesAgent
from app.agents.channel_tech_specs_agent import ChannelTechSpecsAgent
from app.agents.legal_agent import LegalAgent
from app.agents.lead_agent import LeadAgent
from app.services.gemini_service import GeminiService
from app.services.reference_docs import ReferenceDocsService
from app.services.pdf_service import pdf_service
# Signature of the optional progress callback accepted by
# AnalysisService.analyze_proof. It is awaited with (name, None) when a step
# starts, and with (agent_name, review) when a specialist agent finishes.
AgentCallback = Callable[
    [str, SubReview | None],
    Awaitable[None],
]
class AnalysisService:
    """
    Orchestrates the multi-agent proof analysis.

    The four specialist agents (Legal, Brand, Channel Best Practices,
    Channel Tech Specs) run concurrently. Their reviews are then handed to the
    lead agent for synthesis. An optional callback receives real-time
    notifications as each step starts and as each specialist agent finishes.
    """

    # Specialist agents, in the order they are launched and in which their
    # results are collected. They run concurrently, so this is NOT the order
    # in which they finish (or in which completion callbacks fire).
    AGENT_ORDER = ["Legal Agent", "Brand Agent", "Channel Best Practices Agent", "Channel Tech Specs Agent"]

    # Upper bound on the number of PDF pages rasterized and sent to the agents.
    MAX_PDF_PAGES = 10

    def __init__(
        self,
        gemini_service: GeminiService,
        reference_docs: ReferenceDocsService,
    ):
        """
        Initialize the analysis service with all required agents.

        Args:
            gemini_service: Service for Gemini API calls (shared by every agent)
            reference_docs: Service for loading reference documents (used by
                the specialist agents, not by the lead agent)
        """
        self.gemini_service = gemini_service
        self.reference_docs = reference_docs
        # Specialist agents keyed by the names used in AGENT_ORDER.
        self.agents = {
            "Legal Agent": LegalAgent(gemini_service, reference_docs),
            "Brand Agent": BrandAgent(gemini_service, reference_docs),
            "Channel Best Practices Agent": ChannelBestPracticesAgent(gemini_service, reference_docs),
            "Channel Tech Specs Agent": ChannelTechSpecsAgent(gemini_service, reference_docs),
        }
        self.lead_agent = LeadAgent(gemini_service)

    async def _run_agent(
        self,
        agent_name: str,
        images: List[Tuple[bytes, str]],
        brand: str,
        on_agent_update: AgentCallback | None,
    ) -> Tuple[str, SubReview]:
        """
        Run a single specialist agent with start/finish callback notifications.

        Args:
            agent_name: Key into self.agents (one of AGENT_ORDER)
            images: (data, mime_type) pairs to analyze
            brand: Brand name; only forwarded to the Brand Agent
            on_agent_update: Optional callback, awaited with (agent_name, None)
                before the agent runs and (agent_name, review) after it completes

        Returns:
            (agent_name, review) so concurrent results can be re-keyed by name.
        """
        agent = self.agents[agent_name]
        logger.info(f"[ANALYSIS] Starting agent: {agent_name}")
        if on_agent_update:
            await on_agent_update(agent_name, None)

        # Only the Brand Agent's analyze() takes the brand argument.
        if agent_name == "Brand Agent":
            review = await agent.analyze(images, brand=brand)
        else:
            review = await agent.analyze(images)

        logger.info(f"[ANALYSIS] Agent completed: {agent_name} - ragStatus: {review.ragStatus}")
        if on_agent_update:
            await on_agent_update(agent_name, review)
        return (agent_name, review)

    async def _run_all_agents(
        self,
        images: List[Tuple[bytes, str]],
        brand: str,
        on_agent_update: AgentCallback | None,
    ) -> dict[str, SubReview]:
        """
        Run every specialist agent concurrently and collect reviews by name.

        If any agent (or its callback) raises, the remaining agents are
        cancelled before the exception is re-raised. Otherwise they would keep
        running in the background, calling the model and firing callbacks
        after the analysis has already failed.
        """
        tasks = [
            asyncio.create_task(self._run_agent(agent_name, images, brand, on_agent_update))
            for agent_name in self.AGENT_ORDER
        ]
        try:
            results = await asyncio.gather(*tasks)
        except BaseException:
            # asyncio.gather does not cancel its other children when one of
            # them raises. Cancelling an already-finished task is a no-op.
            for task in tasks:
                task.cancel()
            raise
        return dict(results)

    @staticmethod
    def _pdf_error_review(error: Exception) -> AgentReview:
        """
        Build the AgentReview returned when a PDF cannot be rasterized.

        Every specialist slot receives the same "Error" SubReview carrying the
        rasterization error message. No agent is run.
        """
        error_review = SubReview(
            ragStatus="Error",
            feedback=f"Failed to process PDF: {error}",
            issues=[],
        )
        return AgentReview(
            legalAgentReview=error_review,
            brandAgentReview=error_review,
            channelBestPracticesAgentReview=error_review,
            channelTechSpecsAgentReview=error_review,
            leadAgentSummary=f"Analysis could not proceed due to PDF processing error: {error}",
            overallStatus="Analysis Error",
            financialPromotionReason=None,
        )

    async def analyze_proof(
        self,
        file_data: bytes,
        file_type: str,
        on_agent_update: AgentCallback | None = None,
        is_wip: bool = False,
        brand: str = "Barclaycard",
    ) -> Tuple[AgentReview, Optional[List[Tuple[bytes, int, int]]]]:
        """
        Analyze a proof using all specialist agents in parallel, then synthesize.

        Args:
            file_data: Raw bytes of the file to analyze
            file_type: MIME type of the file. "application/pdf" is rasterized
                to PNG pages (at most MAX_PDF_PAGES). Anything else is passed
                to the agents as a single item.
            on_agent_update: Optional callback for real-time updates. It is
                awaited with (agent_name, None) when an agent starts and with
                (agent_name, review) when it completes, then once with
                ("Summary", None) before lead-agent synthesis.
            is_wip: Whether this is a work-in-progress analysis (currently only
                logged in this method)
            brand: Brand to use for brand guidelines analysis ('Barclays' or 'Barclaycard')

        Returns:
            Tuple of:
            - Complete AgentReview with all agent results and overall verdict
            - List of rasterized PDF pages if input was PDF, else None.
              Each page is (png_bytes, width, height).
            If PDF rasterization fails with ValueError, an error AgentReview
            (overallStatus "Analysis Error") and None are returned instead.

        Raises:
            Any exception raised by a specialist agent, the callback, or the
            lead agent propagates. Still-running agents are cancelled first.
        """
        logger.info(f"[ANALYSIS] Starting proof analysis - file_type: {file_type}, file_size: {len(file_data)} bytes, is_wip: {is_wip}, brand: {brand}")

        # Prepare images for analysis.
        pdf_pages: Optional[List[Tuple[bytes, int, int]]] = None
        if file_type == "application/pdf":
            logger.info("[ANALYSIS] Detected PDF, rasterizing pages...")
            try:
                pdf_pages = pdf_service.rasterize(file_data, max_pages=self.MAX_PDF_PAGES)
            except ValueError as e:
                logger.error(f"[ANALYSIS] PDF rasterization failed: {e}")
                return self._pdf_error_review(e), None
            images: List[Tuple[bytes, str]] = [(png_data, "image/png") for png_data, _, _ in pdf_pages]
            logger.info(f"[ANALYSIS] Rasterized {len(images)} PDF pages")
        else:
            # Single image/video: wrap in a list.
            images = [(file_data, file_type)]

        reviews = await self._run_all_agents(images, brand, on_agent_update)

        # Lead agent synthesizes the specialist reviews into an overall verdict.
        logger.info("[ANALYSIS] Starting lead agent synthesis")
        if on_agent_update:
            await on_agent_update("Summary", None)
        overall_status, summary, financial_promotion_reason = await self.lead_agent.synthesize(reviews)
        logger.info(f"[ANALYSIS] Analysis complete - overallStatus: {overall_status}")

        return AgentReview(
            legalAgentReview=reviews["Legal Agent"],
            brandAgentReview=reviews["Brand Agent"],
            channelBestPracticesAgentReview=reviews["Channel Best Practices Agent"],
            channelTechSpecsAgentReview=reviews["Channel Tech Specs Agent"],
            leadAgentSummary=summary,
            overallStatus=overall_status,
            financialPromotionReason=financial_promotion_reason,
        ), pdf_pages