modcomms/backend/app/services/analysis_service.py
2025-12-18 16:51:27 +00:00

115 lines
4 KiB
Python
Executable file

import logging
from typing import Callable, Awaitable
from app.models.schemas import SubReview, AgentReview, OverallStatus
logger = logging.getLogger(__name__)
from app.agents.brand_agent import BrandAgent
from app.agents.channel_agent import ChannelAgent
from app.agents.legal_agent import LegalAgent
from app.agents.tone_agent import ToneAgent
from app.agents.lead_agent import LeadAgent
from app.services.gemini_service import GeminiService
from app.services.reference_docs import ReferenceDocsService
# Type alias for the callback function
AgentCallback = Callable[[str, SubReview | None], Awaitable[None]]
class AnalysisService:
"""
Orchestrates the multi-agent proof analysis.
Runs agents sequentially and provides callbacks for real-time updates.
"""
# Agent execution order
AGENT_ORDER = ["Legal Agent", "Brand Agent", "Tone Agent", "Channel Agent"]
def __init__(
self,
gemini_service: GeminiService,
reference_docs: ReferenceDocsService,
):
"""
Initialize the analysis service with all required agents.
Args:
gemini_service: Service for Gemini API calls
reference_docs: Service for loading reference documents
"""
self.gemini_service = gemini_service
self.reference_docs = reference_docs
# Initialize agents
self.agents = {
"Legal Agent": LegalAgent(),
"Brand Agent": BrandAgent(gemini_service, reference_docs),
"Tone Agent": ToneAgent(),
"Channel Agent": ChannelAgent(gemini_service, reference_docs),
}
self.lead_agent = LeadAgent(gemini_service)
async def analyze_proof(
self,
file_data: bytes,
file_type: str,
on_agent_update: AgentCallback | None = None,
is_wip: bool = False,
) -> AgentReview:
"""
Analyze a proof using all agents sequentially.
Args:
file_data: Raw bytes of the file to analyze
file_type: MIME type of the file
on_agent_update: Optional callback for real-time agent updates.
Called with (agent_name, None) when agent starts,
and (agent_name, review) when agent completes.
is_wip: Whether this is a work-in-progress analysis
Returns:
Complete AgentReview with all agent results and overall verdict
"""
logger.info(f"[ANALYSIS] Starting proof analysis - file_type: {file_type}, file_size: {len(file_data)} bytes, is_wip: {is_wip}")
reviews: dict[str, SubReview] = {}
# Run each agent sequentially
for agent_name in self.AGENT_ORDER:
agent = self.agents[agent_name]
logger.info(f"[ANALYSIS] Starting agent: {agent_name}")
# Notify that agent is starting
if on_agent_update:
await on_agent_update(agent_name, None)
# Run the agent
review = await agent.analyze(file_data, file_type)
reviews[agent_name] = review
logger.info(f"[ANALYSIS] Agent completed: {agent_name} - ragStatus: {review.ragStatus}")
# Notify that agent completed
if on_agent_update:
await on_agent_update(agent_name, review)
# Get lead agent synthesis
logger.info("[ANALYSIS] Starting lead agent synthesis")
if on_agent_update:
await on_agent_update("Summary", None)
overall_status, summary, financial_promotion_reason = await self.lead_agent.synthesize(reviews)
logger.info(f"[ANALYSIS] Analysis complete - overallStatus: {overall_status}")
# Build the complete AgentReview
return AgentReview(
legalAgentReview=reviews["Legal Agent"],
brandAgentReview=reviews["Brand Agent"],
toneAgentReview=reviews["Tone Agent"],
channelAgentReview=reviews["Channel Agent"],
leadAgentSummary=summary,
overallStatus=overall_status,
financialPromotionReason=financial_promotion_reason,
)