modcomms/backend/app/websocket/handlers.py
michael a957cf0276 Pass proof metadata (channel, sub-channel, proof type) to AI agents during analysis
Previously, proof metadata collected during upload was only used for database
persistence. Now it flows through the entire analysis pipeline so agents can
tailor their feedback to the specific channel and format being reviewed.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-12 11:30:38 -06:00

281 lines
13 KiB
Python
Executable file

import base64
import logging
import uuid
from typing import Optional
from fastapi import WebSocket
from app.websocket.manager import ConnectionManager
from app.services.analysis_service import AnalysisService
from app.models.schemas import SubReview
from app.models.database import async_session_factory
from app.repositories import ProofRepository, CampaignRepository, UserRepository
from app.services.storage_service import storage_service
# Module-level logger for the WebSocket analysis handlers in this file.
logger = logging.getLogger(__name__)
# Uploads of this many bytes or more get no thumbnail. Thumbnails are stored as
# data URLs, which are ~33% larger than the binary they encode.
_THUMBNAIL_MAX_BYTES = 10_000_000


def _serialize_review(review: SubReview, *, include_financial: bool = True) -> dict:
    """Convert an agent's SubReview into the JSON-friendly dict sent to clients.

    Args:
        review: The agent review to serialize.
        include_financial: When True, include ``isFinancialPromotion`` and
            ``financialPromotionReason``. In the final result payload only the
            legal agent's review carries these fields; live
            ``agent_completed`` updates include them for every agent.

    Returns:
        A dict with ``ragStatus``, ``feedback``, ``issues``, the optional
        financial-promotion fields, then ``resolvedIssues``,
        ``outstandingIssues`` and ``newIssues`` (in that key order).
    """
    serialized = {
        "ragStatus": review.ragStatus,
        "feedback": review.feedback,
        "issues": review.issues,
    }
    if include_financial:
        serialized["isFinancialPromotion"] = review.isFinancialPromotion
        serialized["financialPromotionReason"] = review.financialPromotionReason
    serialized["resolvedIssues"] = review.resolvedIssues
    serialized["outstandingIssues"] = review.outstandingIssues
    serialized["newIssues"] = review.newIssues
    return serialized


def _pdf_pages_payload(pdf_pages) -> list[dict]:
    """Encode rasterized PDF pages as PNG data URLs for the client.

    Args:
        pdf_pages: Sequence of ``(png_bytes, width, height)`` tuples, as
            returned by ``AnalysisService.analyze_proof``.

    Returns:
        One dict per page with a 1-based ``page`` number, ``data_url``,
        ``width`` and ``height``.
    """
    return [
        {
            "page": page_number,
            "data_url": f"data:image/png;base64,{base64.b64encode(png_data).decode('utf-8')}",
            "width": width,
            "height": height,
        }
        for page_number, (png_data, width, height) in enumerate(pdf_pages, start=1)
    ]


def _parse_campaign_id(raw_campaign_id) -> Optional[uuid.UUID]:
    """Parse the client-supplied campaign id; return None if absent or malformed.

    Parsing once up front means a malformed id produces one clear warning,
    instead of failing separately inside both the revision lookup and the
    persistence step.
    """
    if not raw_campaign_id:
        return None
    try:
        return uuid.UUID(str(raw_campaign_id))
    except ValueError:
        logger.warning(f"[WEBSOCKET] Ignoring malformed campaign_id: {raw_campaign_id!r}")
        return None


async def _fetch_previous_analysis(
    campaign_uuid: uuid.UUID, proof_name: str, file_hash: str
) -> tuple[Optional[dict], bool]:
    """Look up the latest stored review for this proof, if one exists.

    Best-effort: any failure is logged and treated as "no previous analysis",
    so the current analysis still runs.

    Returns:
        ``(previous_analysis, is_identical_file)``. ``previous_analysis`` is
        whatever ``ProofRepository.get_latest_version_review`` returns
        (presumably a dict, since ``.get('version')`` is read), or None.
        ``is_identical_file`` is True when the latest stored version's file
        hash equals ``file_hash``.
    """
    previous_analysis = None
    is_identical_file = False
    try:
        logger.info(f"[WEBSOCKET] Checking for previous analysis - campaign: {campaign_uuid}, proof: {proof_name}")
        async with async_session_factory() as session:
            proof_repo = ProofRepository(session)
            existing_proof = await proof_repo.get_by_campaign_and_name(campaign_uuid, proof_name)
            if not existing_proof:
                logger.info("[WEBSOCKET] No existing proof found (new proof)")
                return previous_analysis, is_identical_file

            previous_analysis = await proof_repo.get_latest_version_review(existing_proof.id)
            previous_file_hash = await proof_repo.get_latest_version_hash(existing_proof.id)
            if previous_analysis:
                logger.info(f"[WEBSOCKET] Found previous analysis version {previous_analysis.get('version')}")
            else:
                logger.info("[WEBSOCKET] No previous analysis found (new proof)")

            if previous_file_hash and previous_file_hash == file_hash:
                is_identical_file = True
                logger.info(f"[WEBSOCKET] Identical file detected - hash matches previous version: {file_hash}")
    except Exception as e:
        logger.warning(f"[WEBSOCKET] Failed to fetch previous analysis: {str(e)}", exc_info=True)
    return previous_analysis, is_identical_file


async def _generate_thumbnail(file_data: bytes, file_type: str, pdf_pages) -> Optional[str]:
    """Build a thumbnail data URL for the uploaded proof, or return None.

    Images are thumbnailed directly. PDFs use their first rasterized page.
    Other file types, and files of ``_THUMBNAIL_MAX_BYTES`` or more, get no
    thumbnail. Failures are logged and swallowed, because the thumbnail is
    cosmetic and must not prevent the analysis from being persisted.
    """
    if len(file_data) >= _THUMBNAIL_MAX_BYTES:
        return None
    try:
        if file_type.startswith("image/"):
            return await storage_service.generate_thumbnail_data_url(file_data, file_type)
        if file_type == "application/pdf" and pdf_pages:
            first_page_data, _, _ = pdf_pages[0]
            return await storage_service.generate_thumbnail_data_url(first_page_data, "image/png")
    except Exception:
        logger.warning("[WEBSOCKET] Failed to generate thumbnail", exc_info=True)
    return None


async def _persist_result(
    *,
    campaign_uuid: uuid.UUID,
    proof_name: str,
    file_data: bytes,
    file_type: str,
    file_hash: str,
    pdf_pages,
    channel,
    sub_channel,
    proof_type,
    overall_status,
    result_dict: dict,
    is_identical_file: bool,
) -> tuple[Optional[str], Optional[str]]:
    """Store the uploaded file and save the analysis as a new proof version.

    Returns:
        ``(proof_id, version_id)`` as strings on success. Returns
        ``(None, None)`` if persistence fails; the failure is logged, and the
        caller still sends the result to the client.
    """
    try:
        logger.info(f"[WEBSOCKET] Persisting result for campaign {campaign_uuid}, proof {proof_name}")
        async with async_session_factory() as session:
            proof_repo = ProofRepository(session)
            # NOTE(review): the file is always stored with version=1, and the
            # storage key is computed here, before add_version_with_review
            # assigns the real version number. If store_file derives the key
            # from `version`, later revisions may overwrite earlier files.
            # Verify against storage_service.
            file_storage_key = await storage_service.store_file(
                file_data=file_data,
                campaign_id=campaign_uuid,
                proof_name=proof_name,
                version=1,
                file_type=file_type,
            )
            thumbnail_url = await _generate_thumbnail(file_data, file_type, pdf_pages)
            proof, version = await proof_repo.add_version_with_review(
                campaign_id=campaign_uuid,
                proof_name=proof_name,
                channel=channel,
                sub_channel=sub_channel,
                proof_type=proof_type,
                file_storage_key=file_storage_key,
                thumbnail_url=thumbnail_url,
                agent_review=result_dict,
                overall_status=overall_status,
                file_hash=file_hash,
                is_identical_file=is_identical_file,
            )
            await session.commit()
            logger.info(f"[WEBSOCKET] Persisted proof {proof.id} version {version.version}")
            return str(proof.id), str(version.id)
    except Exception as e:
        logger.exception(f"[WEBSOCKET] Failed to persist result: {str(e)}")
        return None, None


async def handle_analyze_message(
    websocket: WebSocket,
    client_id: str,
    data: dict,
    manager: ConnectionManager,
    analysis_service: AnalysisService,
) -> None:
    """
    Handle an 'analyze' message from the client.

    Runs the proof analysis and sends real-time updates through ``manager``.
    It sends ``agent_started`` / ``agent_completed`` as each agent runs, then
    a single ``complete`` message carrying the full result. If the analysis
    fails, it sends an ``error`` message instead.

    When both a valid ``campaign_id`` and a ``proof_name`` are supplied, two
    extra steps happen. The latest stored review of that proof is passed to
    the agents as ``previous_analysis``. After the analysis, the new result
    is persisted as a new version.

    Args:
        websocket: The WebSocket connection (not used directly; messages are
            sent via ``manager``).
        client_id: Unique client identifier
        data: The message payload. Keys read: ``file_data`` (base64 string),
            ``file_type`` (default ``"image/png"``), ``is_wip`` (default
            False), ``brand`` (default ``"Barclaycard"``), and the optional
            ``campaign_id``, ``proof_name``, ``channel``, ``sub_channel``,
            ``proof_type``.
        manager: Connection manager for sending messages
        analysis_service: Service to run the analysis
    """
    try:
        logger.info(f"[WEBSOCKET] Received analyze request from client: {client_id}")

        file_data_b64 = data.get("file_data", "")
        file_type = data.get("file_type", "image/png")
        is_wip = data.get("is_wip", False)

        # Reject missing/empty payloads before spending an AI analysis on zero bytes.
        if not file_data_b64:
            logger.error(f"[WEBSOCKET] No file data provided by client: {client_id}")
            await manager.send_message(client_id, {
                "type": "error",
                "message": "No file data provided",
            })
            return

        logger.info(f"[WEBSOCKET] File type: {file_type}, is_wip: {is_wip}, base64 length: {len(file_data_b64)}")

        # binascii.Error (bad padding/characters) is a ValueError subclass;
        # TypeError covers non-string, non-bytes payloads.
        try:
            file_data = base64.b64decode(file_data_b64)
        except (ValueError, TypeError) as e:
            logger.error(f"[WEBSOCKET] Failed to decode file data: {str(e)}")
            await manager.send_message(client_id, {
                "type": "error",
                "message": f"Failed to decode file data: {str(e)}"
            })
            return
        logger.info(f"[WEBSOCKET] Decoded file size: {len(file_data)} bytes")

        # The hash is compared with the previous version to detect re-uploads of the same file.
        file_hash = storage_service.get_checksum(file_data)
        logger.info(f"[WEBSOCKET] Computed file hash: {file_hash}")

        async def on_agent_update(agent_name: str, review: SubReview | None) -> None:
            """Forward agent progress to the client; ``review is None`` means the agent just started."""
            if not manager.is_connected(client_id):
                logger.warning(f"[WEBSOCKET] Client {client_id} disconnected, skipping update")
                return
            if review is None:
                logger.info(f"[WEBSOCKET] Sending agent_started: {agent_name}")
                await manager.send_message(client_id, {
                    "type": "agent_started",
                    "agent_name": agent_name
                })
                return
            logger.info(f"[WEBSOCKET] Sending agent_completed: {agent_name} - ragStatus: {review.ragStatus}")
            await manager.send_message(client_id, {
                "type": "agent_completed",
                "agent_name": agent_name,
                "review": _serialize_review(review),
            })

        # Brand selection for the brand agent.
        brand = data.get("brand", "Barclaycard")
        logger.info(f"[WEBSOCKET] Brand selection: {brand}")

        # Revision tracking (previous-analysis lookup + persistence) needs both identifiers.
        proof_name = data.get("proof_name")
        campaign_uuid = _parse_campaign_id(data.get("campaign_id"))
        track_revisions = campaign_uuid is not None and bool(proof_name)

        previous_analysis = None
        is_identical_file = False
        if track_revisions:
            previous_analysis, is_identical_file = await _fetch_previous_analysis(
                campaign_uuid, proof_name, file_hash
            )

        # Proof metadata: passed to the agents for context and persisted with the version.
        channel = data.get("channel")
        sub_channel = data.get("sub_channel")
        proof_type = data.get("proof_type")

        logger.info("[WEBSOCKET] Starting analysis...")
        result, pdf_pages = await analysis_service.analyze_proof(
            file_data=file_data,
            file_type=file_type,
            on_agent_update=on_agent_update,
            is_wip=is_wip,
            brand=brand,
            previous_analysis=previous_analysis,
            channel=channel,
            sub_channel=sub_channel,
            proof_type=proof_type,
        )

        result_dict = {
            "legalAgentReview": _serialize_review(result.legalAgentReview),
            "brandAgentReview": _serialize_review(result.brandAgentReview, include_financial=False),
            "channelBestPracticesAgentReview": _serialize_review(
                result.channelBestPracticesAgentReview, include_financial=False
            ),
            "channelTechSpecsAgentReview": _serialize_review(
                result.channelTechSpecsAgentReview, include_financial=False
            ),
            "leadAgentSummary": result.leadAgentSummary,
            "overallStatus": result.overallStatus,
            "financialPromotionReason": result.financialPromotionReason,
        }

        proof_id: Optional[str] = None
        version_id: Optional[str] = None
        if track_revisions:
            proof_id, version_id = await _persist_result(
                campaign_uuid=campaign_uuid,
                proof_name=proof_name,
                file_data=file_data,
                file_type=file_type,
                file_hash=file_hash,
                pdf_pages=pdf_pages,
                channel=channel,
                sub_channel=sub_channel,
                proof_type=proof_type,
                overall_status=result.overallStatus,
                result_dict=result_dict,
                is_identical_file=is_identical_file,
            )

        logger.info(f"[WEBSOCKET] Analysis complete, sending result - overallStatus: {result.overallStatus}")
        if manager.is_connected(client_id):
            response = {
                "type": "complete",
                "result": result_dict,
                "is_identical_file": is_identical_file,
            }
            # Proof/version IDs are present only if persistence succeeded.
            if proof_id:
                response["proof_id"] = proof_id
            if version_id:
                response["version_id"] = version_id
            if pdf_pages:
                response["pdf_pages"] = _pdf_pages_payload(pdf_pages)
                logger.info(f"[WEBSOCKET] Including {len(pdf_pages)} rasterized PDF pages in response")
            await manager.send_message(client_id, response)
            logger.info(f"[WEBSOCKET] Result sent to client: {client_id}")
    except Exception as e:
        # Top-level boundary: log with traceback and report the failure to the client.
        logger.exception(f"[WEBSOCKET] Analysis failed for client {client_id}: {str(e)}")
        if manager.is_connected(client_id):
            await manager.send_message(client_id, {
                "type": "error",
                "message": f"Analysis failed: {str(e)}"
            })