import logging from typing import Callable, Awaitable, List, Tuple, Optional from app.models.schemas import SubReview, AgentReview, OverallStatus logger = logging.getLogger(__name__) from app.agents.brand_agent import BrandAgent from app.agents.channel_best_practices_agent import ChannelBestPracticesAgent from app.agents.channel_tech_specs_agent import ChannelTechSpecsAgent from app.agents.legal_agent import LegalAgent from app.agents.lead_agent import LeadAgent from app.services.gemini_service import GeminiService from app.services.reference_docs import ReferenceDocsService from app.services.pdf_service import pdf_service # Type alias for the callback function AgentCallback = Callable[[str, SubReview | None], Awaitable[None]] class AnalysisService: """ Orchestrates the multi-agent proof analysis. Runs agents sequentially and provides callbacks for real-time updates. """ # Agent execution order AGENT_ORDER = ["Legal Agent", "Brand Agent", "Channel Best Practices Agent", "Channel Tech Specs Agent"] def __init__( self, gemini_service: GeminiService, reference_docs: ReferenceDocsService, ): """ Initialize the analysis service with all required agents. Args: gemini_service: Service for Gemini API calls reference_docs: Service for loading reference documents """ self.gemini_service = gemini_service self.reference_docs = reference_docs # Initialize agents self.agents = { "Legal Agent": LegalAgent(gemini_service, reference_docs), "Brand Agent": BrandAgent(gemini_service, reference_docs), "Channel Best Practices Agent": ChannelBestPracticesAgent(gemini_service, reference_docs), "Channel Tech Specs Agent": ChannelTechSpecsAgent(gemini_service, reference_docs), } self.lead_agent = LeadAgent(gemini_service) async def analyze_proof( self, file_data: bytes, file_type: str, on_agent_update: AgentCallback | None = None, is_wip: bool = False, brand: str = "Barclaycard", ) -> Tuple[AgentReview, Optional[List[Tuple[bytes, int, int]]]]: """ Analyze a proof using all agents sequentially. Args: file_data: Raw bytes of the file to analyze file_type: MIME type of the file on_agent_update: Optional callback for real-time agent updates. Called with (agent_name, None) when agent starts, and (agent_name, review) when agent completes. is_wip: Whether this is a work-in-progress analysis brand: Brand to use for brand guidelines analysis ('Barclays' or 'Barclaycard') Returns: Tuple of: - Complete AgentReview with all agent results and overall verdict - List of rasterized PDF pages if input was PDF, else None Each page is (png_bytes, width, height) """ logger.info(f"[ANALYSIS] Starting proof analysis - file_type: {file_type}, file_size: {len(file_data)} bytes, is_wip: {is_wip}, brand: {brand}") reviews: dict[str, SubReview] = {} # Prepare images for analysis pdf_pages: Optional[List[Tuple[bytes, int, int]]] = None images: List[Tuple[bytes, str]] = [] if file_type == "application/pdf": # Rasterize PDF to PNG images logger.info("[ANALYSIS] Detected PDF, rasterizing pages...") try: pdf_pages = pdf_service.rasterize(file_data, max_pages=10) images = [(png_data, "image/png") for png_data, _, _ in pdf_pages] logger.info(f"[ANALYSIS] Rasterized {len(images)} PDF pages") except ValueError as e: logger.error(f"[ANALYSIS] PDF rasterization failed: {str(e)}") # Return error review if PDF cannot be processed error_review = SubReview( ragStatus="Error", feedback=f"Failed to process PDF: {str(e)}", issues=[] ) return AgentReview( legalAgentReview=error_review, brandAgentReview=error_review, channelBestPracticesAgentReview=error_review, channelTechSpecsAgentReview=error_review, leadAgentSummary=f"Analysis could not proceed due to PDF processing error: {str(e)}", overallStatus="Analysis Error", financialPromotionReason=None, ), None else: # Single image/video - wrap in list images = [(file_data, file_type)] # Run each agent sequentially for agent_name in self.AGENT_ORDER: agent = self.agents[agent_name] logger.info(f"[ANALYSIS] Starting agent: {agent_name}") # Notify that agent is starting if on_agent_update: await on_agent_update(agent_name, None) # Run the agent with images list # Pass brand to Brand Agent for selecting appropriate guidelines if agent_name == "Brand Agent": review = await agent.analyze(images, brand=brand) else: review = await agent.analyze(images) reviews[agent_name] = review logger.info(f"[ANALYSIS] Agent completed: {agent_name} - ragStatus: {review.ragStatus}") # Notify that agent completed if on_agent_update: await on_agent_update(agent_name, review) # Get lead agent synthesis logger.info("[ANALYSIS] Starting lead agent synthesis") if on_agent_update: await on_agent_update("Summary", None) overall_status, summary, financial_promotion_reason = await self.lead_agent.synthesize(reviews) logger.info(f"[ANALYSIS] Analysis complete - overallStatus: {overall_status}") # Build the complete AgentReview return AgentReview( legalAgentReview=reviews["Legal Agent"], brandAgentReview=reviews["Brand Agent"], channelBestPracticesAgentReview=reviews["Channel Best Practices Agent"], channelTechSpecsAgentReview=reviews["Channel Tech Specs Agent"], leadAgentSummary=summary, overallStatus=overall_status, financialPromotionReason=financial_promotion_reason, ), pdf_pages