modcomms/backend/app/api/analysis_routes.py
Vadym Samoilenko a6fc149788 Replace WebSocket with REST polling to fix GCP LB 30s timeout
POST /api/analyze submits an analysis job and returns job_id instantly.
GET /api/analyze/{job_id} returns progress + result; frontend polls every 2s.

Analysis runs as asyncio.create_task in the background — each HTTP request
completes in milliseconds, well within the 30s GCP Load Balancer limit.

- Add backend/app/services/job_store.py: in-memory AnalysisJob store with
  30-min TTL cleanup
- Add backend/app/api/analysis_routes.py: POST + GET /api/analyze endpoints
  with full analysis pipeline (hash check, DB persistence, PDF pages, etc.)
- Remove backend/app/websocket/: handlers.py, manager.py, __init__.py
- Update backend/app/main.py: wire analysis_router, store analysis_service
  in app.state, drop all WebSocket imports and endpoint
- Update frontend/services/geminiService.ts: replace WS with fetch+poll;
  function signatures unchanged so App.tsx / WIPReviewer.tsx need no edits
- Remove VITE_BACKEND_WS_URL from vite.config.ts, deploy.sh, .env.deploy.example
- Update cloudrun.yaml: remove WebSocket-specific session affinity annotation

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-18 15:26:01 +00:00

297 lines
13 KiB
Python

import asyncio
import base64
import logging
import uuid
from typing import Optional, Any
from fastapi import APIRouter, Depends, HTTPException, Request
from pydantic import BaseModel
from app.dependencies.auth import get_current_user
from app.models.schemas import SubReview
from app.models.database import async_session_factory
from app.repositories import ProofRepository, CampaignRepository, UserRepository, AuditRepository
from app.services.storage_service import storage_service
from app.services.job_store import create_job, get_job, AnalysisJob
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Router on which the /analyze endpoints defined in this module are registered.
analysis_router = APIRouter()
class AnalyzeRequest(BaseModel):
    """Request body accepted by ``POST /analyze``.

    Only ``file_data`` is required; every other field has a default.
    """

    # The uploaded file itself.
    file_data: str  # base64-encoded
    file_type: str = "image/png"
    is_wip: bool = False

    # Database persistence only happens when both campaign_id and proof_name
    # are supplied.
    campaign_id: str | None = None
    proof_name: str | None = None

    # Optional context forwarded to the analysis service.
    channel: str | None = None
    sub_channel: str | None = None
    proof_type: str | None = None
    brand: str = "Barclaycard"
@analysis_router.post("/analyze")
async def submit_analysis(
    body: AnalyzeRequest,
    request: Request,
    current_user: dict = Depends(get_current_user),
):
    """Submit a proof for analysis and return its job id immediately.

    The analysis itself runs as a background asyncio task; callers poll
    ``GET /analyze/{job_id}`` for progress and the final result.

    Args:
        body: The upload and its optional campaign/proof context.
        request: Incoming request; used to reach ``app.state``.
        current_user: Claims of the authenticated user.

    Returns:
        ``{"job_id": <id>}`` for the newly created job.

    Raises:
        HTTPException: 503 when ``app.state.analysis_service`` is not set.
    """
    analysis_service = getattr(request.app.state, "analysis_service", None)
    if analysis_service is None:
        raise HTTPException(status_code=503, detail="Backend not ready. Please wait for initialization.")

    # Resolve current_user_id from DB. Best effort: on any failure the job is
    # still submitted, just without a resolved user id.
    current_user_id: Optional[uuid.UUID] = None
    try:
        async with async_session_factory() as session:
            user_repo = UserRepository(session)
            azure_oid = current_user.get("oid") or current_user.get("sub")
            db_user = await user_repo.get_by_azure_oid(azure_oid) if azure_oid else None
            current_user_id = db_user.id if db_user else None
    except Exception as e:
        logger.warning(f"[ANALYZE] Failed to resolve user_id: {e}")

    job = create_job()
    task = asyncio.create_task(
        _run_analysis(job, body, analysis_service, current_user_id)
    )

    # The event loop only keeps weak references to tasks, so a task nobody
    # else references can be garbage-collected before it finishes. Hold a
    # strong reference on app.state until the task is done.
    background_tasks = getattr(request.app.state, "_analysis_background_tasks", None)
    if background_tasks is None:
        background_tasks = set()
        request.app.state._analysis_background_tasks = background_tasks
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)

    logger.info(f"[ANALYZE] Submitted job {job.job_id} for user {current_user.get('name', 'unknown')}")
    return {"job_id": job.job_id}
@analysis_router.get("/analyze/{job_id}")
async def poll_analysis(job_id: str, current_user: dict = Depends(get_current_user)):
    """Report progress, and the outcome once finished, for one analysis job.

    Progress fields are always present. Result fields are added when the job
    status is ``"complete"``; ``error_message`` is added when it is ``"error"``.

    Raises:
        HTTPException: 404 when the job store has no job for ``job_id``.
    """
    # NOTE(review): the job is looked up by id only; current_user is not
    # compared against the job's submitter here - confirm that is intended.
    job = get_job(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found or expired")

    payload: dict[str, Any] = dict(
        status=job.status,
        agents_started=job.agents_started,
        agents_completed=job.agents_completed,
        model_fallback=job.model_fallback,
    )

    if job.status == "error":
        payload["error_message"] = job.error_message
    elif job.status == "complete":
        payload.update(
            result=job.result,
            proof_id=job.proof_id,
            version_id=job.version_id,
            pdf_pages=job.pdf_pages,
            is_identical_file=job.is_identical_file,
        )

    return payload
def _review_to_dict(review: Any, *, include_financial_promotion: bool = True) -> dict[str, Any]:
    """Flatten one agent review into the camelCase dict exposed to clients.

    Key order matches the hand-written dicts this helper replaces. The two
    financial-promotion keys are emitted only when requested.
    """
    payload: dict[str, Any] = {
        "ragStatus": review.ragStatus,
        "feedback": review.feedback,
        "issues": review.issues,
    }
    if include_financial_promotion:
        payload["isFinancialPromotion"] = review.isFinancialPromotion
        payload["financialPromotionReason"] = review.financialPromotionReason
    payload["resolvedIssues"] = review.resolvedIssues
    payload["outstandingIssues"] = review.outstandingIssues
    payload["newIssues"] = review.newIssues
    return payload


def _build_result_dict(result: Any) -> dict[str, Any]:
    """Build the full result payload from the analysis service's result object."""
    return {
        # Only the legal review carries the financial-promotion fields.
        "legalAgentReview": _review_to_dict(result.legalAgentReview),
        "brandAgentReview": _review_to_dict(
            result.brandAgentReview, include_financial_promotion=False
        ),
        "channelBestPracticesAgentReview": _review_to_dict(
            result.channelBestPracticesAgentReview, include_financial_promotion=False
        ),
        "channelTechSpecsAgentReview": _review_to_dict(
            result.channelTechSpecsAgentReview, include_financial_promotion=False
        ),
        "leadAgentSummary": result.leadAgentSummary,
        "overallStatus": result.overallStatus,
        "financialPromotionReason": result.financialPromotionReason,
    }


async def _run_analysis(
    job: AnalysisJob,
    body: AnalyzeRequest,
    analysis_service: Any,
    current_user_id: Optional[uuid.UUID],
) -> None:
    """Background task: run the full analysis pipeline and update job state.

    Steps: decode the upload, look up the previous version of the proof (when
    campaign_id and proof_name are given), run the analysis with a 5-minute
    timeout, persist the outcome, serialize any PDF pages and mark the job
    complete.

    Failures (``Exception`` subclasses, including the timeout) are not
    propagated: they are recorded as ``job.status = "error"`` together with
    ``job.error_message``. A persistence failure alone does not fail the job;
    it completes without ``proof_id`` / ``version_id``.

    Args:
        job: Job record mutated in place as the analysis progresses.
        body: The original request payload.
        analysis_service: Object providing ``analyze_proof(...)``.
        current_user_id: DB id of the submitting user, or None if unresolved.
    """
    job.status = "running"
    logger.info(f"[ANALYZE] Starting analysis for job {job.job_id}")
    try:
        # Decode base64 file data
        try:
            file_data = base64.b64decode(body.file_data)
        except Exception as e:
            # Chain the cause so the original decoding error stays visible.
            raise ValueError(f"Failed to decode file data: {e}") from e

        # Compute file hash for duplicate detection
        file_hash = storage_service.get_checksum(file_data)

        # Fetch previous analysis if this is a revision (best effort)
        previous_analysis = None
        previous_file_hash = None
        is_identical_file = False
        if body.campaign_id and body.proof_name:
            try:
                async with async_session_factory() as session:
                    proof_repo = ProofRepository(session)
                    existing_proof = await proof_repo.get_by_campaign_and_name(
                        uuid.UUID(body.campaign_id), body.proof_name
                    )
                    if existing_proof:
                        previous_analysis = await proof_repo.get_latest_version_review(existing_proof.id)
                        previous_file_hash = await proof_repo.get_latest_version_hash(existing_proof.id)
                    if previous_file_hash and previous_file_hash == file_hash:
                        is_identical_file = True
                        logger.info(f"[ANALYZE] Identical file detected for job {job.job_id}")
            except Exception as e:
                logger.warning(f"[ANALYZE] Failed to fetch previous analysis: {e}")
        job.is_identical_file = is_identical_file

        # Agent update callback - updates job store in place.
        # review is None when an agent starts, and set when it completes.
        async def on_agent_update(agent_name: str, review: SubReview | None) -> None:
            if review is None:
                job.agents_started.append(agent_name)
                logger.info(f"[ANALYZE] Agent started: {agent_name} (job {job.job_id})")
            else:
                job.agents_completed[agent_name] = _review_to_dict(review)
                logger.info(f"[ANALYZE] Agent completed: {agent_name} (job {job.job_id})")

        # Model fallback callback - flips the flag (and logs) only once.
        async def on_model_fallback() -> None:
            if not job.model_fallback:
                job.model_fallback = True
                logger.info(f"[ANALYZE] Model fallback triggered for job {job.job_id}")

        # Run the analysis (5-minute hard timeout)
        result, pdf_pages = await asyncio.wait_for(
            analysis_service.analyze_proof(
                file_data=file_data,
                file_type=body.file_type,
                on_agent_update=on_agent_update,
                is_wip=body.is_wip,
                brand=body.brand,
                previous_analysis=previous_analysis,
                channel=body.channel,
                sub_channel=body.sub_channel,
                proof_type=body.proof_type,
                on_fallback=on_model_fallback,
            ),
            timeout=300.0,
        )

        result_dict = _build_result_dict(result)

        # Persist to database (best effort: a failure here is logged and the
        # job still completes, just without proof_id / version_id).
        proof_id: Optional[str] = None
        version_id: Optional[str] = None
        serialized_pdf_pages: list[dict] | None = None
        if body.campaign_id and body.proof_name:
            try:
                campaign_uuid = uuid.UUID(body.campaign_id)
                async with async_session_factory() as session:
                    proof_repo = ProofRepository(session)
                    # NOTE(review): version is hard-coded to 1 even when a
                    # previous version exists - confirm store_file does not
                    # overwrite earlier uploads.
                    file_storage_key = await storage_service.store_file(
                        file_data=file_data,
                        campaign_id=campaign_uuid,
                        proof_name=body.proof_name,
                        version=1,
                        file_type=body.file_type,
                    )

                    # Thumbnails are only generated for uploads under 10 MB.
                    thumbnail_url = None
                    if len(file_data) < 10_000_000:
                        if body.file_type.startswith("image/"):
                            thumbnail_url = await storage_service.generate_thumbnail_data_url(file_data, body.file_type)
                        elif body.file_type == "application/pdf" and pdf_pages:
                            first_page_data, _, _ = pdf_pages[0]
                            thumbnail_url = await storage_service.generate_thumbnail_data_url(first_page_data, "image/png")

                    proof, version = await proof_repo.add_version_with_review(
                        campaign_id=campaign_uuid,
                        proof_name=body.proof_name,
                        channel=body.channel,
                        sub_channel=body.sub_channel,
                        proof_type=body.proof_type,
                        file_storage_key=file_storage_key,
                        thumbnail_url=thumbnail_url,
                        agent_review=result_dict,
                        overall_status=result.overallStatus,
                        file_hash=file_hash,
                        is_identical_file=is_identical_file,
                        created_by=current_user_id,
                    )
                    if result.overallStatus == "Analysis Error":
                        audit_repo = AuditRepository(session)
                        await audit_repo.create_error_item(
                            proof_version_id=version.id,
                            error_summary=result.leadAgentSummary,
                        )
                    await session.commit()
                proof_id = str(proof.id)
                version_id = str(version.id)
                logger.info(f"[ANALYZE] Persisted proof {proof_id} version {version.version} for job {job.job_id}")
            except Exception as e:
                # logger.exception keeps the traceback; the message alone is
                # rarely enough to diagnose a storage/DB failure.
                logger.exception(f"[ANALYZE] Failed to persist result for job {job.job_id}: {e}")

        # Serialize PDF pages if present (1-based page numbers, PNG data URLs)
        if pdf_pages:
            serialized_pdf_pages = [
                {
                    "page": i + 1,
                    "data_url": f"data:image/png;base64,{base64.b64encode(png_data).decode('utf-8')}",
                    "width": width,
                    "height": height,
                }
                for i, (png_data, width, height) in enumerate(pdf_pages)
            ]

        # Mark job complete. status is set last so a poller never observes
        # "complete" before the result fields are populated.
        job.result = result_dict
        job.proof_id = proof_id
        job.version_id = version_id
        job.pdf_pages = serialized_pdf_pages
        job.status = "complete"
        logger.info(f"[ANALYZE] Job {job.job_id} complete — overallStatus: {result.overallStatus}")
    except asyncio.TimeoutError:
        job.status = "error"
        job.error_message = "Analysis timed out after 5 minutes. Please try again with a smaller file."
        logger.error(f"[ANALYZE] Job {job.job_id} timed out")
    except Exception as e:
        job.status = "error"
        job.error_message = f"Analysis failed: {e}"
        # Include the traceback: this is the only place the failure surfaces.
        logger.exception(f"[ANALYZE] Job {job.job_id} failed: {e}")