import asyncio
import logging
from typing import Callable, Awaitable, List, Tuple, Optional

from app.models.schemas import PreviousReviewContext, RagStatus, SubReview, AgentReview, OverallStatus
from app.agents.brand_agent import BrandAgent
from app.agents.channel_best_practices_agent import ChannelBestPracticesAgent
from app.agents.channel_tech_specs_agent import ChannelTechSpecsAgent
from app.agents.legal_agent import LegalAgent
from app.agents.lead_agent import LeadAgent
from app.services.gemini_service import GeminiService
from app.services.reference_docs import ReferenceDocsService
from app.services.pdf_service import pdf_service

logger = logging.getLogger(__name__)

# Type alias for the per-agent progress callback. It is awaited with
# (agent_name, None) when an agent starts and (agent_name, review) when it completes.
AgentCallback = Callable[[str, SubReview | None], Awaitable[None]]


class AnalysisService:
    """
    Orchestrates the multi-agent proof analysis.

    Runs the specialist agents concurrently, reports progress through an
    optional callback, then asks the lead agent to synthesize an overall verdict.
    """

    # Agent execution order; also the order in which agent tasks are scheduled.
    AGENT_ORDER = ["Risk & Control Agent", "Brand Agent", "Channel Best Practices Agent", "Channel Tech Specs Agent"]

    # Mapping from agent name to the key in AgentReview/previous_analysis dict
    AGENT_REVIEW_KEY_MAP = {
        "Risk & Control Agent": "legalAgentReview",
        "Brand Agent": "brandAgentReview",
        "Channel Best Practices Agent": "channelBestPracticesAgentReview",
        "Channel Tech Specs Agent": "channelTechSpecsAgentReview",
    }

    def __init__(
        self,
        gemini_service: GeminiService,
        reference_docs: ReferenceDocsService,
    ):
        """
        Initialize the analysis service with all required agents.

        Args:
            gemini_service: Service for Gemini API calls
            reference_docs: Service for loading reference documents
        """
        self.gemini_service = gemini_service
        self.reference_docs = reference_docs

        # One instance per specialist agent, keyed by the names used in AGENT_ORDER.
        self.agents = {
            "Risk & Control Agent": LegalAgent(gemini_service, reference_docs),
            "Brand Agent": BrandAgent(gemini_service, reference_docs),
            "Channel Best Practices Agent": ChannelBestPracticesAgent(gemini_service, reference_docs),
            "Channel Tech Specs Agent": ChannelTechSpecsAgent(gemini_service, reference_docs),
        }
        # The lead agent only synthesizes; it does not receive reference_docs.
        self.lead_agent = LeadAgent(gemini_service)

    def _extract_previous_review_context(
        self,
        agent_name: str,
        previous_analysis: Optional[dict],
    ) -> Optional[PreviousReviewContext]:
        """
        Extract the previous review context for a specific agent.

        Args:
            agent_name: Name of the agent (a key of AGENT_REVIEW_KEY_MAP)
            previous_analysis: Full previous analysis dict (or None). Read keys are
                "version" and the per-agent review key from AGENT_REVIEW_KEY_MAP.

        Returns:
            PreviousReviewContext for the agent, or None when there is no previous
            analysis, the agent is unknown, the agent has no stored review, or the
            stored version is missing/0.

        Note:
            ``RagStatus(...)`` is constructed from the stored value; presumably this
            raises if the stored ragStatus is not a valid RagStatus — confirm.
        """
        if not previous_analysis:
            return None

        review_key = self.AGENT_REVIEW_KEY_MAP.get(agent_name)
        if not review_key:
            return None

        agent_review = previous_analysis.get(review_key)
        if not agent_review:
            return None

        version = previous_analysis.get("version", 0)
        if version == 0:
            return None

        return PreviousReviewContext(
            version=version,
            ragStatus=RagStatus(agent_review.get("ragStatus", "Error")),
            feedback=agent_review.get("feedback", ""),
            issues=agent_review.get("issues", []),
        )

    async def _run_agent(
        self,
        agent_name: str,
        images: List[Tuple[bytes, str]],
        brand: str,
        on_agent_update: AgentCallback | None,
        previous_review: Optional[PreviousReviewContext] = None,
        channel: Optional[str] = None,
        sub_channel: Optional[str] = None,
        proof_type: Optional[str] = None,
        on_fallback=None,
    ) -> Tuple[str, SubReview]:
        """
        Run a single agent, notifying ``on_agent_update`` before and after.

        Only the Brand Agent receives ``brand``. Every agent receives the previous
        review, channel, sub-channel, proof type and ``on_fallback``.

        Returns:
            ``(agent_name, review)`` so gathered results can be turned back into a
            name-keyed dict.
        """
        agent = self.agents[agent_name]
        logger.info(f"[ANALYSIS] Starting agent: {agent_name}, has_previous_review: {previous_review is not None}")

        if on_agent_update:
            await on_agent_update(agent_name, None)

        if agent_name == "Brand Agent":
            review = await agent.analyze(
                images,
                previous_review=previous_review,
                brand=brand,
                channel=channel,
                sub_channel=sub_channel,
                proof_type=proof_type,
                on_fallback=on_fallback,
            )
        else:
            review = await agent.analyze(
                images,
                previous_review=previous_review,
                channel=channel,
                sub_channel=sub_channel,
                proof_type=proof_type,
                on_fallback=on_fallback,
            )

        logger.info(f"[ANALYSIS] Agent completed: {agent_name} - ragStatus: {review.ragStatus}")

        if on_agent_update:
            await on_agent_update(agent_name, review)

        return (agent_name, review)

    @staticmethod
    def _pdf_error_review(error: Exception) -> AgentReview:
        """Build the AgentReview returned when a PDF cannot be rasterized."""
        message = str(error)

        def _error_sub_review() -> SubReview:
            # A fresh instance per field, so mutating one review cannot affect the others.
            return SubReview(
                ragStatus="Error",
                feedback=f"Failed to process PDF: {message}",
                issues=[],
            )

        return AgentReview(
            legalAgentReview=_error_sub_review(),
            brandAgentReview=_error_sub_review(),
            channelBestPracticesAgentReview=_error_sub_review(),
            channelTechSpecsAgentReview=_error_sub_review(),
            leadAgentSummary=f"Analysis could not proceed due to PDF processing error: {message}",
            overallStatus="Analysis Error",
            financialPromotionReason=None,
        )

    async def analyze_proof(
        self,
        file_data: bytes,
        file_type: str,
        on_agent_update: AgentCallback | None = None,
        is_wip: bool = False,
        brand: str = "Barclaycard",
        previous_analysis: Optional[dict] = None,
        channel: Optional[str] = None,
        sub_channel: Optional[str] = None,
        proof_type: Optional[str] = None,
        on_fallback=None,
    ) -> Tuple[AgentReview, Optional[List[Tuple[bytes, int, int]]]]:
        """
        Analyze a proof using all agents in parallel.

        Args:
            file_data: Raw bytes of the file to analyze
            file_type: MIME type of the file. "application/pdf" is rasterized to PNG
                pages; any other type is passed to the agents as a single item.
            on_agent_update: Optional callback for real-time agent updates.
                Called with (agent_name, None) when an agent starts and
                (agent_name, review) when it completes, and with ("Summary", None)
                before the lead agent synthesis starts.
            is_wip: Whether this is a work-in-progress analysis (currently only logged here)
            brand: Brand to use for brand guidelines analysis ('Barclays' or 'Barclaycard');
                forwarded to the Brand Agent only.
            previous_analysis: Optional dict containing the previous version's analysis results.
                When provided, enables revision-aware analysis that identifies
                resolved/outstanding/new issues.
            channel: Optional channel, forwarded to every agent and to the lead agent.
            sub_channel: Optional sub-channel, forwarded to every agent and to the lead agent.
            proof_type: Optional proof type, forwarded to every agent and to the lead agent.
            on_fallback: Optional callback forwarded unchanged to every agent and to the
                lead agent (its semantics are defined by those agents).

        Returns:
            Tuple of:
            - Complete AgentReview with all agent results and overall verdict
            - List of rasterized PDF pages if input was PDF, else None
              Each page is (png_bytes, width, height)

        Raises:
            Any exception raised by an agent, by ``on_agent_update`` or by the lead
            agent is propagated. When one agent fails, the remaining agent tasks are
            cancelled first. A ValueError from PDF rasterization is NOT raised; it is
            reported through an error AgentReview instead.
        """
        previous_version = previous_analysis.get("version") if previous_analysis else None
        logger.info(f"[ANALYSIS] Starting proof analysis - file_type: {file_type}, file_size: {len(file_data)} bytes, is_wip: {is_wip}, brand: {brand}, previous_version: {previous_version}")

        # Prepare images for analysis
        pdf_pages: Optional[List[Tuple[bytes, int, int]]] = None
        if file_type == "application/pdf":
            logger.info("[ANALYSIS] Detected PDF, rasterizing pages...")
            try:
                # Rasterization is synchronous, so it runs in a worker thread.
                # NOTE(review): the literal 10 is passed straight to pdf_service.rasterize;
                # presumably a page limit — confirm against pdf_service.
                pdf_pages = await asyncio.to_thread(pdf_service.rasterize, file_data, 10)
            except ValueError as e:
                logger.error(f"[ANALYSIS] PDF rasterization failed: {e}")
                return self._pdf_error_review(e), None
            images: List[Tuple[bytes, str]] = [(png_data, "image/png") for png_data, _, _ in pdf_pages]
            logger.info(f"[ANALYSIS] Rasterized {len(images)} PDF pages")
        else:
            # Single image/video - wrap in list
            images = [(file_data, file_type)]

        # Resolve every agent's previous-review context before scheduling anything,
        # so a failure here cannot leave already-started agent tasks running.
        previous_reviews = {
            agent_name: self._extract_previous_review_context(agent_name, previous_analysis)
            for agent_name in self.AGENT_ORDER
        }

        # Run all agents in parallel, passing previous review context to each
        tasks = [
            asyncio.create_task(
                self._run_agent(
                    agent_name,
                    images,
                    brand,
                    on_agent_update,
                    previous_review=previous_reviews[agent_name],
                    channel=channel,
                    sub_channel=sub_channel,
                    proof_type=proof_type,
                    on_fallback=on_fallback,
                )
            )
            for agent_name in self.AGENT_ORDER
        ]
        try:
            results = await asyncio.gather(*tasks)
        except BaseException:
            # gather() does not cancel the other awaitables when one raises. Without
            # this, the surviving agents would keep running (and keep calling
            # on_agent_update) after analyze_proof has already failed.
            for task in tasks:
                task.cancel()
            raise
        reviews: dict[str, SubReview] = dict(results)

        # Get lead agent synthesis
        logger.info("[ANALYSIS] Starting lead agent synthesis")
        if on_agent_update:
            await on_agent_update("Summary", None)

        overall_status, summary, financial_promotion_reason = await self.lead_agent.synthesize(
            reviews,
            previous_analysis=previous_analysis,
            channel=channel,
            sub_channel=sub_channel,
            proof_type=proof_type,
            on_fallback=on_fallback,
        )

        logger.info(f"[ANALYSIS] Analysis complete - overallStatus: {overall_status}")

        # Build the complete AgentReview
        return AgentReview(
            legalAgentReview=reviews["Risk & Control Agent"],
            brandAgentReview=reviews["Brand Agent"],
            channelBestPracticesAgentReview=reviews["Channel Best Practices Agent"],
            channelTechSpecsAgentReview=reviews["Channel Tech Specs Agent"],
            leadAgentSummary=summary,
            overallStatus=overall_status,
            financialPromotionReason=financial_promotion_reason,
        ), pdf_pages