From f13fa2f7e84f03709af27ebd84c41d4ed3fc60a7 Mon Sep 17 00:00:00 2001 From: michael Date: Sun, 25 Jan 2026 09:13:15 -0600 Subject: [PATCH] Parallelize specialist agent analysis with asyncio.gather Run all 4 specialist agents (Legal, Brand, Channel Best Practices, Channel Tech Specs) concurrently instead of sequentially. This reduces total analysis time to roughly the duration of the slowest agent rather than the sum of all agent times. Co-Authored-By: Claude Opus 4.5 --- backend/app/services/analysis_service.py | 60 ++++++++++++++---------- 1 file changed, 35 insertions(+), 25 deletions(-) diff --git a/backend/app/services/analysis_service.py b/backend/app/services/analysis_service.py index fd8416c..955f982 100755 --- a/backend/app/services/analysis_service.py +++ b/backend/app/services/analysis_service.py @@ -1,3 +1,4 @@ +import asyncio import logging from typing import Callable, Awaitable, List, Tuple, Optional @@ -22,7 +23,7 @@ class AnalysisService: """ Orchestrates the multi-agent proof analysis. - Runs agents sequentially and provides callbacks for real-time updates. + Runs agents in parallel and provides callbacks for real-time updates. """ # Agent execution order @@ -52,6 +53,31 @@ class AnalysisService: } self.lead_agent = LeadAgent(gemini_service) + async def _run_agent( + self, + agent_name: str, + images: List[Tuple[bytes, str]], + brand: str, + on_agent_update: AgentCallback | None, + ) -> Tuple[str, SubReview]: + """Run a single agent with callback notifications.""" + agent = self.agents[agent_name] + + logger.info(f"[ANALYSIS] Starting agent: {agent_name}") + if on_agent_update: + await on_agent_update(agent_name, None) + + if agent_name == "Brand Agent": + review = await agent.analyze(images, brand=brand) + else: + review = await agent.analyze(images) + + logger.info(f"[ANALYSIS] Agent completed: {agent_name} - ragStatus: {review.ragStatus}") + if on_agent_update: + await on_agent_update(agent_name, review) + + return (agent_name, review) + async def analyze_proof( self, file_data: bytes, @@ -61,7 +87,7 @@ class AnalysisService: brand: str = "Barclaycard", ) -> Tuple[AgentReview, Optional[List[Tuple[bytes, int, int]]]]: """ - Analyze a proof using all agents sequentially. + Analyze a proof using all agents in parallel. Args: file_data: Raw bytes of the file to analyze @@ -113,29 +139,13 @@ class AnalysisService: # Single image/video - wrap in list images = [(file_data, file_type)] - # Run each agent sequentially - for agent_name in self.AGENT_ORDER: - agent = self.agents[agent_name] - - logger.info(f"[ANALYSIS] Starting agent: {agent_name}") - - # Notify that agent is starting - if on_agent_update: - await on_agent_update(agent_name, None) - - # Run the agent with images list - # Pass brand to Brand Agent for selecting appropriate guidelines - if agent_name == "Brand Agent": - review = await agent.analyze(images, brand=brand) - else: - review = await agent.analyze(images) - reviews[agent_name] = review - - logger.info(f"[ANALYSIS] Agent completed: {agent_name} - ragStatus: {review.ragStatus}") - - # Notify that agent completed - if on_agent_update: - await on_agent_update(agent_name, review) + # Run all agents in parallel + tasks = [ + self._run_agent(agent_name, images, brand, on_agent_update) + for agent_name in self.AGENT_ORDER + ] + results = await asyncio.gather(*tasks) + reviews = {agent_name: review for agent_name, review in results} # Get lead agent synthesis logger.info("[ANALYSIS] Starting lead agent synthesis")