Updates all display labels (PDF report, campaign page, Knowledge Base card, analytics, status dashboard, checks overview) and aligns internal agent name in backend. Adds migration 010 to update the knowledge base display_name in production DB. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
240 lines
9.8 KiB
Python
Executable file
240 lines
9.8 KiB
Python
Executable file
import asyncio
|
|
import logging
|
|
from typing import Callable, Awaitable, List, Tuple, Optional
|
|
|
|
from app.models.schemas import PreviousReviewContext, RagStatus, SubReview, AgentReview, OverallStatus
|
|
|
|
logger = logging.getLogger(__name__)
|
|
from app.agents.brand_agent import BrandAgent
|
|
from app.agents.channel_best_practices_agent import ChannelBestPracticesAgent
|
|
from app.agents.channel_tech_specs_agent import ChannelTechSpecsAgent
|
|
from app.agents.legal_agent import LegalAgent
|
|
from app.agents.lead_agent import LeadAgent
|
|
from app.services.gemini_service import GeminiService
|
|
from app.services.reference_docs import ReferenceDocsService
|
|
from app.services.pdf_service import pdf_service
|
|
|
|
|
|
# Type alias for the callback function
|
|
AgentCallback = Callable[[str, SubReview | None], Awaitable[None]]
|
|
|
|
|
|
class AnalysisService:
|
|
"""
|
|
Orchestrates the multi-agent proof analysis.
|
|
|
|
Runs agents in parallel and provides callbacks for real-time updates.
|
|
"""
|
|
|
|
# Agent execution order
|
|
AGENT_ORDER = ["Risk & Control Agent", "Brand Agent", "Channel Best Practices Agent", "Channel Tech Specs Agent"]
|
|
|
|
# Mapping from agent name to the key in AgentReview/previous_analysis dict
|
|
AGENT_REVIEW_KEY_MAP = {
|
|
"Risk & Control Agent": "legalAgentReview",
|
|
"Brand Agent": "brandAgentReview",
|
|
"Channel Best Practices Agent": "channelBestPracticesAgentReview",
|
|
"Channel Tech Specs Agent": "channelTechSpecsAgentReview",
|
|
}
|
|
|
|
def __init__(
|
|
self,
|
|
gemini_service: GeminiService,
|
|
reference_docs: ReferenceDocsService,
|
|
):
|
|
"""
|
|
Initialize the analysis service with all required agents.
|
|
|
|
Args:
|
|
gemini_service: Service for Gemini API calls
|
|
reference_docs: Service for loading reference documents
|
|
"""
|
|
self.gemini_service = gemini_service
|
|
self.reference_docs = reference_docs
|
|
|
|
# Initialize agents
|
|
self.agents = {
|
|
"Risk & Control Agent": LegalAgent(gemini_service, reference_docs),
|
|
"Brand Agent": BrandAgent(gemini_service, reference_docs),
|
|
"Channel Best Practices Agent": ChannelBestPracticesAgent(gemini_service, reference_docs),
|
|
"Channel Tech Specs Agent": ChannelTechSpecsAgent(gemini_service, reference_docs),
|
|
}
|
|
self.lead_agent = LeadAgent(gemini_service)
|
|
|
|
def _extract_previous_review_context(
|
|
self,
|
|
agent_name: str,
|
|
previous_analysis: Optional[dict],
|
|
) -> Optional[PreviousReviewContext]:
|
|
"""
|
|
Extract the previous review context for a specific agent.
|
|
|
|
Args:
|
|
agent_name: Name of the agent
|
|
previous_analysis: Full previous analysis dict (or None)
|
|
|
|
Returns:
|
|
PreviousReviewContext for the agent, or None if not available
|
|
"""
|
|
if not previous_analysis:
|
|
return None
|
|
|
|
review_key = self.AGENT_REVIEW_KEY_MAP.get(agent_name)
|
|
if not review_key:
|
|
return None
|
|
|
|
agent_review = previous_analysis.get(review_key)
|
|
if not agent_review:
|
|
return None
|
|
|
|
version = previous_analysis.get("version", 0)
|
|
if version == 0:
|
|
return None
|
|
|
|
return PreviousReviewContext(
|
|
version=version,
|
|
ragStatus=RagStatus(agent_review.get("ragStatus", "Error")),
|
|
feedback=agent_review.get("feedback", ""),
|
|
issues=agent_review.get("issues", []),
|
|
)
|
|
|
|
async def _run_agent(
|
|
self,
|
|
agent_name: str,
|
|
images: List[Tuple[bytes, str]],
|
|
brand: str,
|
|
on_agent_update: AgentCallback | None,
|
|
previous_review: Optional[PreviousReviewContext] = None,
|
|
channel: Optional[str] = None,
|
|
sub_channel: Optional[str] = None,
|
|
proof_type: Optional[str] = None,
|
|
on_fallback=None,
|
|
) -> Tuple[str, SubReview]:
|
|
"""Run a single agent with callback notifications."""
|
|
agent = self.agents[agent_name]
|
|
|
|
logger.info(f"[ANALYSIS] Starting agent: {agent_name}, has_previous_review: {previous_review is not None}")
|
|
if on_agent_update:
|
|
await on_agent_update(agent_name, None)
|
|
|
|
if agent_name == "Brand Agent":
|
|
review = await agent.analyze(images, previous_review=previous_review, brand=brand, channel=channel, sub_channel=sub_channel, proof_type=proof_type, on_fallback=on_fallback)
|
|
else:
|
|
review = await agent.analyze(images, previous_review=previous_review, channel=channel, sub_channel=sub_channel, proof_type=proof_type, on_fallback=on_fallback)
|
|
|
|
logger.info(f"[ANALYSIS] Agent completed: {agent_name} - ragStatus: {review.ragStatus}")
|
|
if on_agent_update:
|
|
await on_agent_update(agent_name, review)
|
|
|
|
return (agent_name, review)
|
|
|
|
async def analyze_proof(
|
|
self,
|
|
file_data: bytes,
|
|
file_type: str,
|
|
on_agent_update: AgentCallback | None = None,
|
|
is_wip: bool = False,
|
|
brand: str = "Barclaycard",
|
|
previous_analysis: Optional[dict] = None,
|
|
channel: Optional[str] = None,
|
|
sub_channel: Optional[str] = None,
|
|
proof_type: Optional[str] = None,
|
|
on_fallback=None,
|
|
) -> Tuple[AgentReview, Optional[List[Tuple[bytes, int, int]]]]:
|
|
"""
|
|
Analyze a proof using all agents in parallel.
|
|
|
|
Args:
|
|
file_data: Raw bytes of the file to analyze
|
|
file_type: MIME type of the file
|
|
on_agent_update: Optional callback for real-time agent updates.
|
|
Called with (agent_name, None) when agent starts,
|
|
and (agent_name, review) when agent completes.
|
|
is_wip: Whether this is a work-in-progress analysis
|
|
brand: Brand to use for brand guidelines analysis ('Barclays' or 'Barclaycard')
|
|
previous_analysis: Optional dict containing the previous version's analysis
|
|
results. When provided, enables revision-aware analysis
|
|
that identifies resolved/outstanding/new issues.
|
|
|
|
Returns:
|
|
Tuple of:
|
|
- Complete AgentReview with all agent results and overall verdict
|
|
- List of rasterized PDF pages if input was PDF, else None
|
|
Each page is (png_bytes, width, height)
|
|
"""
|
|
previous_version = previous_analysis.get("version") if previous_analysis else None
|
|
logger.info(f"[ANALYSIS] Starting proof analysis - file_type: {file_type}, file_size: {len(file_data)} bytes, is_wip: {is_wip}, brand: {brand}, previous_version: {previous_version}")
|
|
reviews: dict[str, SubReview] = {}
|
|
|
|
# Prepare images for analysis
|
|
pdf_pages: Optional[List[Tuple[bytes, int, int]]] = None
|
|
images: List[Tuple[bytes, str]] = []
|
|
|
|
if file_type == "application/pdf":
|
|
# Rasterize PDF to PNG images
|
|
logger.info("[ANALYSIS] Detected PDF, rasterizing pages...")
|
|
try:
|
|
pdf_pages = await asyncio.to_thread(pdf_service.rasterize, file_data, 10)
|
|
images = [(png_data, "image/png") for png_data, _, _ in pdf_pages]
|
|
logger.info(f"[ANALYSIS] Rasterized {len(images)} PDF pages")
|
|
except ValueError as e:
|
|
logger.error(f"[ANALYSIS] PDF rasterization failed: {str(e)}")
|
|
# Return error review if PDF cannot be processed
|
|
error_review = SubReview(
|
|
ragStatus="Error",
|
|
feedback=f"Failed to process PDF: {str(e)}",
|
|
issues=[]
|
|
)
|
|
return AgentReview(
|
|
legalAgentReview=error_review,
|
|
brandAgentReview=error_review,
|
|
channelBestPracticesAgentReview=error_review,
|
|
channelTechSpecsAgentReview=error_review,
|
|
leadAgentSummary=f"Analysis could not proceed due to PDF processing error: {str(e)}",
|
|
overallStatus="Analysis Error",
|
|
financialPromotionReason=None,
|
|
), None
|
|
else:
|
|
# Single image/video - wrap in list
|
|
images = [(file_data, file_type)]
|
|
|
|
# Run all agents in parallel, passing previous review context to each
|
|
tasks = [
|
|
self._run_agent(
|
|
agent_name,
|
|
images,
|
|
brand,
|
|
on_agent_update,
|
|
previous_review=self._extract_previous_review_context(agent_name, previous_analysis),
|
|
channel=channel,
|
|
sub_channel=sub_channel,
|
|
proof_type=proof_type,
|
|
on_fallback=on_fallback,
|
|
)
|
|
for agent_name in self.AGENT_ORDER
|
|
]
|
|
results = await asyncio.gather(*tasks)
|
|
reviews = {agent_name: review for agent_name, review in results}
|
|
|
|
# Get lead agent synthesis
|
|
logger.info("[ANALYSIS] Starting lead agent synthesis")
|
|
if on_agent_update:
|
|
await on_agent_update("Summary", None)
|
|
|
|
overall_status, summary, financial_promotion_reason = await self.lead_agent.synthesize(
|
|
reviews, previous_analysis=previous_analysis,
|
|
channel=channel, sub_channel=sub_channel, proof_type=proof_type,
|
|
on_fallback=on_fallback,
|
|
)
|
|
logger.info(f"[ANALYSIS] Analysis complete - overallStatus: {overall_status}")
|
|
|
|
# Build the complete AgentReview
|
|
return AgentReview(
|
|
legalAgentReview=reviews["Risk & Control Agent"],
|
|
brandAgentReview=reviews["Brand Agent"],
|
|
channelBestPracticesAgentReview=reviews["Channel Best Practices Agent"],
|
|
channelTechSpecsAgentReview=reviews["Channel Tech Specs Agent"],
|
|
leadAgentSummary=summary,
|
|
overallStatus=overall_status,
|
|
financialPromotionReason=financial_promotion_reason,
|
|
), pdf_pages
|