modcomms/backend/app/services/analysis_service.py
Vadym Samoilenko aeab7d3b18 Rename Legal Agent to Risk & Control Agent across frontend and backend
Updates all display labels (PDF report, campaign page, Knowledge Base card, analytics, status dashboard, checks overview) and aligns internal agent name in backend. Adds migration 010 to update the knowledge base display_name in production DB.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-14 15:10:32 +01:00

240 lines
9.8 KiB
Python
Executable file

import asyncio
import logging
from typing import Callable, Awaitable, List, Tuple, Optional
from app.models.schemas import PreviousReviewContext, RagStatus, SubReview, AgentReview, OverallStatus
logger = logging.getLogger(__name__)
from app.agents.brand_agent import BrandAgent
from app.agents.channel_best_practices_agent import ChannelBestPracticesAgent
from app.agents.channel_tech_specs_agent import ChannelTechSpecsAgent
from app.agents.legal_agent import LegalAgent
from app.agents.lead_agent import LeadAgent
from app.services.gemini_service import GeminiService
from app.services.reference_docs import ReferenceDocsService
from app.services.pdf_service import pdf_service
# Type alias for the callback function
AgentCallback = Callable[[str, SubReview | None], Awaitable[None]]
class AnalysisService:
"""
Orchestrates the multi-agent proof analysis.
Runs agents in parallel and provides callbacks for real-time updates.
"""
# Agent execution order
AGENT_ORDER = ["Risk & Control Agent", "Brand Agent", "Channel Best Practices Agent", "Channel Tech Specs Agent"]
# Mapping from agent name to the key in AgentReview/previous_analysis dict
AGENT_REVIEW_KEY_MAP = {
"Risk & Control Agent": "legalAgentReview",
"Brand Agent": "brandAgentReview",
"Channel Best Practices Agent": "channelBestPracticesAgentReview",
"Channel Tech Specs Agent": "channelTechSpecsAgentReview",
}
def __init__(
self,
gemini_service: GeminiService,
reference_docs: ReferenceDocsService,
):
"""
Initialize the analysis service with all required agents.
Args:
gemini_service: Service for Gemini API calls
reference_docs: Service for loading reference documents
"""
self.gemini_service = gemini_service
self.reference_docs = reference_docs
# Initialize agents
self.agents = {
"Risk & Control Agent": LegalAgent(gemini_service, reference_docs),
"Brand Agent": BrandAgent(gemini_service, reference_docs),
"Channel Best Practices Agent": ChannelBestPracticesAgent(gemini_service, reference_docs),
"Channel Tech Specs Agent": ChannelTechSpecsAgent(gemini_service, reference_docs),
}
self.lead_agent = LeadAgent(gemini_service)
def _extract_previous_review_context(
self,
agent_name: str,
previous_analysis: Optional[dict],
) -> Optional[PreviousReviewContext]:
"""
Extract the previous review context for a specific agent.
Args:
agent_name: Name of the agent
previous_analysis: Full previous analysis dict (or None)
Returns:
PreviousReviewContext for the agent, or None if not available
"""
if not previous_analysis:
return None
review_key = self.AGENT_REVIEW_KEY_MAP.get(agent_name)
if not review_key:
return None
agent_review = previous_analysis.get(review_key)
if not agent_review:
return None
version = previous_analysis.get("version", 0)
if version == 0:
return None
return PreviousReviewContext(
version=version,
ragStatus=RagStatus(agent_review.get("ragStatus", "Error")),
feedback=agent_review.get("feedback", ""),
issues=agent_review.get("issues", []),
)
async def _run_agent(
self,
agent_name: str,
images: List[Tuple[bytes, str]],
brand: str,
on_agent_update: AgentCallback | None,
previous_review: Optional[PreviousReviewContext] = None,
channel: Optional[str] = None,
sub_channel: Optional[str] = None,
proof_type: Optional[str] = None,
on_fallback=None,
) -> Tuple[str, SubReview]:
"""Run a single agent with callback notifications."""
agent = self.agents[agent_name]
logger.info(f"[ANALYSIS] Starting agent: {agent_name}, has_previous_review: {previous_review is not None}")
if on_agent_update:
await on_agent_update(agent_name, None)
if agent_name == "Brand Agent":
review = await agent.analyze(images, previous_review=previous_review, brand=brand, channel=channel, sub_channel=sub_channel, proof_type=proof_type, on_fallback=on_fallback)
else:
review = await agent.analyze(images, previous_review=previous_review, channel=channel, sub_channel=sub_channel, proof_type=proof_type, on_fallback=on_fallback)
logger.info(f"[ANALYSIS] Agent completed: {agent_name} - ragStatus: {review.ragStatus}")
if on_agent_update:
await on_agent_update(agent_name, review)
return (agent_name, review)
async def analyze_proof(
self,
file_data: bytes,
file_type: str,
on_agent_update: AgentCallback | None = None,
is_wip: bool = False,
brand: str = "Barclaycard",
previous_analysis: Optional[dict] = None,
channel: Optional[str] = None,
sub_channel: Optional[str] = None,
proof_type: Optional[str] = None,
on_fallback=None,
) -> Tuple[AgentReview, Optional[List[Tuple[bytes, int, int]]]]:
"""
Analyze a proof using all agents in parallel.
Args:
file_data: Raw bytes of the file to analyze
file_type: MIME type of the file
on_agent_update: Optional callback for real-time agent updates.
Called with (agent_name, None) when agent starts,
and (agent_name, review) when agent completes.
is_wip: Whether this is a work-in-progress analysis
brand: Brand to use for brand guidelines analysis ('Barclays' or 'Barclaycard')
previous_analysis: Optional dict containing the previous version's analysis
results. When provided, enables revision-aware analysis
that identifies resolved/outstanding/new issues.
Returns:
Tuple of:
- Complete AgentReview with all agent results and overall verdict
- List of rasterized PDF pages if input was PDF, else None
Each page is (png_bytes, width, height)
"""
previous_version = previous_analysis.get("version") if previous_analysis else None
logger.info(f"[ANALYSIS] Starting proof analysis - file_type: {file_type}, file_size: {len(file_data)} bytes, is_wip: {is_wip}, brand: {brand}, previous_version: {previous_version}")
reviews: dict[str, SubReview] = {}
# Prepare images for analysis
pdf_pages: Optional[List[Tuple[bytes, int, int]]] = None
images: List[Tuple[bytes, str]] = []
if file_type == "application/pdf":
# Rasterize PDF to PNG images
logger.info("[ANALYSIS] Detected PDF, rasterizing pages...")
try:
pdf_pages = await asyncio.to_thread(pdf_service.rasterize, file_data, 10)
images = [(png_data, "image/png") for png_data, _, _ in pdf_pages]
logger.info(f"[ANALYSIS] Rasterized {len(images)} PDF pages")
except ValueError as e:
logger.error(f"[ANALYSIS] PDF rasterization failed: {str(e)}")
# Return error review if PDF cannot be processed
error_review = SubReview(
ragStatus="Error",
feedback=f"Failed to process PDF: {str(e)}",
issues=[]
)
return AgentReview(
legalAgentReview=error_review,
brandAgentReview=error_review,
channelBestPracticesAgentReview=error_review,
channelTechSpecsAgentReview=error_review,
leadAgentSummary=f"Analysis could not proceed due to PDF processing error: {str(e)}",
overallStatus="Analysis Error",
financialPromotionReason=None,
), None
else:
# Single image/video - wrap in list
images = [(file_data, file_type)]
# Run all agents in parallel, passing previous review context to each
tasks = [
self._run_agent(
agent_name,
images,
brand,
on_agent_update,
previous_review=self._extract_previous_review_context(agent_name, previous_analysis),
channel=channel,
sub_channel=sub_channel,
proof_type=proof_type,
on_fallback=on_fallback,
)
for agent_name in self.AGENT_ORDER
]
results = await asyncio.gather(*tasks)
reviews = {agent_name: review for agent_name, review in results}
# Get lead agent synthesis
logger.info("[ANALYSIS] Starting lead agent synthesis")
if on_agent_update:
await on_agent_update("Summary", None)
overall_status, summary, financial_promotion_reason = await self.lead_agent.synthesize(
reviews, previous_analysis=previous_analysis,
channel=channel, sub_channel=sub_channel, proof_type=proof_type,
on_fallback=on_fallback,
)
logger.info(f"[ANALYSIS] Analysis complete - overallStatus: {overall_status}")
# Build the complete AgentReview
return AgentReview(
legalAgentReview=reviews["Risk & Control Agent"],
brandAgentReview=reviews["Brand Agent"],
channelBestPracticesAgentReview=reviews["Channel Best Practices Agent"],
channelTechSpecsAgentReview=reviews["Channel Tech Specs Agent"],
leadAgentSummary=summary,
overallStatus=overall_status,
financialPromotionReason=financial_promotion_reason,
), pdf_pages