diff --git a/backend/app/services/analysis_service.py b/backend/app/services/analysis_service.py index fd8416c..955f982 100755 --- a/backend/app/services/analysis_service.py +++ b/backend/app/services/analysis_service.py @@ -1,3 +1,4 @@ +import asyncio import logging from typing import Callable, Awaitable, List, Tuple, Optional @@ -22,7 +23,7 @@ class AnalysisService: """ Orchestrates the multi-agent proof analysis. - Runs agents sequentially and provides callbacks for real-time updates. + Runs agents in parallel and provides callbacks for real-time updates. """ # Agent execution order @@ -52,6 +53,31 @@ class AnalysisService: } self.lead_agent = LeadAgent(gemini_service) + async def _run_agent( + self, + agent_name: str, + images: List[Tuple[bytes, str]], + brand: str, + on_agent_update: AgentCallback | None, + ) -> Tuple[str, SubReview]: + """Run a single agent with callback notifications.""" + agent = self.agents[agent_name] + + logger.info(f"[ANALYSIS] Starting agent: {agent_name}") + if on_agent_update: + await on_agent_update(agent_name, None) + + if agent_name == "Brand Agent": + review = await agent.analyze(images, brand=brand) + else: + review = await agent.analyze(images) + + logger.info(f"[ANALYSIS] Agent completed: {agent_name} - ragStatus: {review.ragStatus}") + if on_agent_update: + await on_agent_update(agent_name, review) + + return (agent_name, review) + async def analyze_proof( self, file_data: bytes, @@ -61,7 +87,7 @@ class AnalysisService: brand: str = "Barclaycard", ) -> Tuple[AgentReview, Optional[List[Tuple[bytes, int, int]]]]: """ - Analyze a proof using all agents sequentially. + Analyze a proof using all agents in parallel. Args: file_data: Raw bytes of the file to analyze @@ -113,29 +139,13 @@ class AnalysisService: # Single image/video - wrap in list images = [(file_data, file_type)] - # Run each agent sequentially - for agent_name in self.AGENT_ORDER: - agent = self.agents[agent_name] - - logger.info(f"[ANALYSIS] Starting agent: {agent_name}") - - # Notify that agent is starting - if on_agent_update: - await on_agent_update(agent_name, None) - - # Run the agent with images list - # Pass brand to Brand Agent for selecting appropriate guidelines - if agent_name == "Brand Agent": - review = await agent.analyze(images, brand=brand) - else: - review = await agent.analyze(images) - reviews[agent_name] = review - - logger.info(f"[ANALYSIS] Agent completed: {agent_name} - ragStatus: {review.ragStatus}") - - # Notify that agent completed - if on_agent_update: - await on_agent_update(agent_name, review) + # Run all agents in parallel + tasks = [ + self._run_agent(agent_name, images, brand, on_agent_update) + for agent_name in self.AGENT_ORDER + ] + results = await asyncio.gather(*tasks) + reviews = {agent_name: review for agent_name, review in results} # Get lead agent synthesis logger.info("[ANALYSIS] Starting lead agent synthesis")