POST /api/analyze submits an analysis job and returns job_id instantly.
GET /api/analyze/{job_id} returns progress + result; frontend polls every 2s.
Analysis runs in a background asyncio task (asyncio.create_task), so each HTTP request
completes in milliseconds — well within the 30 s GCP Load Balancer request timeout.
- Add backend/app/services/job_store.py: in-memory (per-process) AnalysisJob store with
30-min TTL cleanup
- Add backend/app/api/analysis_routes.py: POST + GET /api/analyze endpoints
with full analysis pipeline (hash check, DB persistence, PDF pages, etc.)
- Remove backend/app/websocket/: handlers.py, manager.py, __init__.py
- Update backend/app/main.py: wire analysis_router, store analysis_service
in app.state, drop all WebSocket imports and endpoint
- Update frontend/services/geminiService.ts: replace WS with fetch+poll;
function signatures unchanged so App.tsx / WIPReviewer.tsx need no edits
- Remove VITE_BACKEND_WS_URL from vite.config.ts, deploy.sh, .env.deploy.example
- Update cloudrun.yaml: remove WebSocket-specific session affinity annotation
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
297 lines
13 KiB
Python
297 lines
13 KiB
Python
import asyncio
|
|
import base64
|
|
import logging
|
|
import uuid
|
|
from typing import Optional, Any
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Request
|
|
from pydantic import BaseModel
|
|
|
|
from app.dependencies.auth import get_current_user
|
|
from app.models.schemas import SubReview
|
|
from app.models.database import async_session_factory
|
|
from app.repositories import ProofRepository, CampaignRepository, UserRepository, AuditRepository
|
|
from app.services.storage_service import storage_service
|
|
from app.services.job_store import create_job, get_job, AnalysisJob
|
|
|
|
# Module logger; every message emitted from this file carries an "[ANALYZE]" prefix.
logger: logging.Logger = logging.getLogger(name=__name__)

# Router that owns the POST/GET /analyze endpoints defined below; the application
# must include it for the endpoints to be served.
analysis_router: APIRouter = APIRouter()
|
|
|
|
|
|
class AnalyzeRequest(BaseModel):
    """JSON body accepted by ``POST /analyze``.

    ``file_data`` is the only required field. When both ``campaign_id`` and
    ``proof_name`` are set, the latest stored version of that proof is looked
    up for revision comparison and the new result is persisted; otherwise the
    analysis runs without being saved.
    """

    file_data: str # base64-encoded
    # MIME type of the decoded file. When the result is persisted, "image/*" files and
    # "application/pdf" (via its first rendered page) get a thumbnail.
    file_type: str = "image/png"
    # Forwarded to analysis_service.analyze_proof as ``is_wip``.
    is_wip: bool = False
    # Parsed with uuid.UUID() when used; an unparsable value is caught and logged, and the
    # previous-version lookup and persistence are then skipped.
    campaign_id: Optional[str] = None
    # Together with campaign_id, identifies the proof record (get_by_campaign_and_name).
    proof_name: Optional[str] = None
    # Optional classification metadata: forwarded to analyze_proof and passed to
    # add_version_with_review when the result is persisted.
    channel: Optional[str] = None
    sub_channel: Optional[str] = None
    proof_type: Optional[str] = None
    # Forwarded to analysis_service.analyze_proof as ``brand``.
    brand: str = "Barclaycard"
|
|
|
|
|
|
# Strong references to in-flight analysis tasks. The asyncio event loop keeps only
# weak references to tasks, so a task whose handle is dropped can be garbage-collected
# before it finishes. Each task removes itself from this set once it is done.
_background_tasks: set[asyncio.Task] = set()


@analysis_router.post("/analyze")
async def submit_analysis(
    body: AnalyzeRequest,
    request: Request,
    current_user: dict = Depends(get_current_user),
):
    """Submit a proof for analysis. Returns job_id immediately.

    The analysis itself runs in a background task (``_run_analysis``). Clients
    poll ``GET /analyze/{job_id}`` for progress and the final result.

    Args:
        body: The base64-encoded proof plus optional campaign/proof metadata.
        request: Used to read ``app.state.analysis_service``.
        current_user: Authenticated user claims from ``get_current_user``.

    Returns:
        ``{"job_id": job.job_id}`` for the newly created job.

    Raises:
        HTTPException: 503 while ``app.state.analysis_service`` is not yet set.
    """
    analysis_service = getattr(request.app.state, "analysis_service", None)
    if analysis_service is None:
        raise HTTPException(status_code=503, detail="Backend not ready. Please wait for initialization.")

    # Best effort: map the token's Azure object id ("oid", falling back to "sub") to
    # the DB user, so the saved version can record its creator. A failure here only
    # loses attribution; the analysis still runs.
    current_user_id: Optional[uuid.UUID] = None
    try:
        async with async_session_factory() as session:
            user_repo = UserRepository(session)
            azure_oid = current_user.get("oid") or current_user.get("sub")
            db_user = await user_repo.get_by_azure_oid(azure_oid) if azure_oid else None
            current_user_id = db_user.id if db_user else None
    except Exception as e:
        logger.warning(f"[ANALYZE] Failed to resolve user_id: {e}")

    job = create_job()

    # Keep a reference to the task until it completes (see _background_tasks).
    task = asyncio.create_task(_run_analysis(job, body, analysis_service, current_user_id))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)

    logger.info(f"[ANALYZE] Submitted job {job.job_id} for user {current_user.get('name', 'unknown')}")
    return {"job_id": job.job_id}
|
|
|
|
|
|
@analysis_router.get("/analyze/{job_id}")
|
|
async def poll_analysis(job_id: str, current_user: dict = Depends(get_current_user)):
|
|
"""Poll the status of an analysis job."""
|
|
job = get_job(job_id)
|
|
if job is None:
|
|
raise HTTPException(status_code=404, detail="Job not found or expired")
|
|
|
|
response: dict[str, Any] = {
|
|
"status": job.status,
|
|
"agents_started": job.agents_started,
|
|
"agents_completed": job.agents_completed,
|
|
"model_fallback": job.model_fallback,
|
|
}
|
|
|
|
if job.status == "complete":
|
|
response["result"] = job.result
|
|
response["proof_id"] = job.proof_id
|
|
response["version_id"] = job.version_id
|
|
response["pdf_pages"] = job.pdf_pages
|
|
response["is_identical_file"] = job.is_identical_file
|
|
elif job.status == "error":
|
|
response["error_message"] = job.error_message
|
|
|
|
return response
|
|
|
|
|
|
# Hard ceiling on one analysis run. Keep in sync with the "5 minutes" wording in
# the timeout error message in _run_analysis.
_ANALYSIS_TIMEOUT_SECONDS = 300.0

# Thumbnails are only generated for uploads smaller than this many bytes.
_THUMBNAIL_MAX_BYTES = 10_000_000


def _serialize_review(review: Any, *, include_financial_promotion: bool = True) -> dict[str, Any]:
    """Flatten one agent review into the camelCase dict sent to the frontend.

    Args:
        review: An agent review object (e.g. ``SubReview``) exposing the attributes read below.
        include_financial_promotion: Whether to include ``isFinancialPromotion`` and
            ``financialPromotionReason``. Live per-agent updates include them for every
            agent; the final result only includes them for the legal review.

    Returns:
        A new dict whose key order matches the existing payload.
    """
    serialized: dict[str, Any] = {
        "ragStatus": review.ragStatus,
        "feedback": review.feedback,
        "issues": review.issues,
    }
    if include_financial_promotion:
        serialized["isFinancialPromotion"] = review.isFinancialPromotion
        serialized["financialPromotionReason"] = review.financialPromotionReason
    serialized["resolvedIssues"] = review.resolvedIssues
    serialized["outstandingIssues"] = review.outstandingIssues
    serialized["newIssues"] = review.newIssues
    return serialized


def _build_result_dict(result: Any) -> dict[str, Any]:
    """Convert the analysis service's result into the payload stored on the job and in the DB."""
    return {
        "legalAgentReview": _serialize_review(result.legalAgentReview),
        "brandAgentReview": _serialize_review(
            result.brandAgentReview, include_financial_promotion=False
        ),
        "channelBestPracticesAgentReview": _serialize_review(
            result.channelBestPracticesAgentReview, include_financial_promotion=False
        ),
        "channelTechSpecsAgentReview": _serialize_review(
            result.channelTechSpecsAgentReview, include_financial_promotion=False
        ),
        "leadAgentSummary": result.leadAgentSummary,
        "overallStatus": result.overallStatus,
        "financialPromotionReason": result.financialPromotionReason,
    }


def _serialize_pdf_pages(pdf_pages: Any) -> list[dict] | None:
    """Encode rendered PDF pages as PNG data URLs; return None when there are no pages.

    Each entry of ``pdf_pages`` is unpacked as ``(png_bytes, width, height)``.
    Page numbers in the output are 1-based.
    """
    if not pdf_pages:
        return None
    return [
        {
            "page": page_number,
            "data_url": f"data:image/png;base64,{base64.b64encode(png_data).decode('utf-8')}",
            "width": width,
            "height": height,
        }
        for page_number, (png_data, width, height) in enumerate(pdf_pages, start=1)
    ]


async def _fetch_previous_analysis(
    campaign_id: str, proof_name: str, file_hash: Any, job_id: Any
) -> tuple[Any, bool]:
    """Best-effort lookup of the latest stored review for an existing proof.

    Returns:
        ``(previous_analysis, is_identical_file)``. ``is_identical_file`` is True
        when the latest stored version's hash equals ``file_hash``. Any failure is
        logged as a warning, and whatever was fetched before the failure is
        returned; by default that is ``(None, False)``.
    """
    previous_analysis = None
    is_identical_file = False
    try:
        async with async_session_factory() as session:
            proof_repo = ProofRepository(session)
            existing_proof = await proof_repo.get_by_campaign_and_name(
                uuid.UUID(campaign_id), proof_name
            )
            if existing_proof:
                previous_analysis = await proof_repo.get_latest_version_review(existing_proof.id)
                previous_file_hash = await proof_repo.get_latest_version_hash(existing_proof.id)
                if previous_file_hash and previous_file_hash == file_hash:
                    is_identical_file = True
                    logger.info(f"[ANALYZE] Identical file detected for job {job_id}")
    except Exception as e:
        logger.warning(f"[ANALYZE] Failed to fetch previous analysis: {e}")
    return previous_analysis, is_identical_file


async def _persist_result(
    *,
    job_id: Any,
    body: AnalyzeRequest,
    file_data: bytes,
    file_hash: Any,
    pdf_pages: Any,
    result: Any,
    result_dict: dict[str, Any],
    is_identical_file: bool,
    current_user_id: Optional[uuid.UUID],
) -> tuple[Optional[str], Optional[str]]:
    """Store the file, thumbnail and review as a new proof version (best effort).

    Returns:
        ``(proof_id, version_id)`` as strings once the transaction has committed,
        or ``(None, None)`` if it did not. Failures are logged with a traceback
        instead of failing the job, so the user still receives the analysis result.
    """
    proof_id: Optional[str] = None
    version_id: Optional[str] = None
    try:
        async with async_session_factory() as session:
            proof_repo = ProofRepository(session)

            # NOTE(review): version is hard-coded to 1 for every upload, including
            # revisions of an existing proof. If store_file derives the storage key from
            # (campaign, proof_name, version), each revision overwrites version 1's file.
            # Confirm against storage_service.store_file.
            file_storage_key = await storage_service.store_file(
                file_data=file_data,
                campaign_id=uuid.UUID(body.campaign_id),
                proof_name=body.proof_name,
                version=1,
                file_type=body.file_type,
            )

            thumbnail_url = None
            if len(file_data) < _THUMBNAIL_MAX_BYTES:
                if body.file_type.startswith("image/"):
                    thumbnail_url = await storage_service.generate_thumbnail_data_url(file_data, body.file_type)
                elif body.file_type == "application/pdf" and pdf_pages:
                    # A PDF's thumbnail is built from its first rendered page (PNG bytes).
                    first_page_data, _, _ = pdf_pages[0]
                    thumbnail_url = await storage_service.generate_thumbnail_data_url(first_page_data, "image/png")

            proof, version = await proof_repo.add_version_with_review(
                campaign_id=uuid.UUID(body.campaign_id),
                proof_name=body.proof_name,
                channel=body.channel,
                sub_channel=body.sub_channel,
                proof_type=body.proof_type,
                file_storage_key=file_storage_key,
                thumbnail_url=thumbnail_url,
                agent_review=result_dict,
                overall_status=result.overallStatus,
                file_hash=file_hash,
                is_identical_file=is_identical_file,
                created_by=current_user_id,
            )

            # Failed analyses are also recorded as audit error items.
            if result.overallStatus == "Analysis Error":
                audit_repo = AuditRepository(session)
                await audit_repo.create_error_item(
                    proof_version_id=version.id,
                    error_summary=result.leadAgentSummary,
                )

            await session.commit()
            proof_id = str(proof.id)
            version_id = str(version.id)
            logger.info(f"[ANALYZE] Persisted proof {proof_id} version {version.version} for job {job_id}")
    except Exception as e:
        logger.exception(f"[ANALYZE] Failed to persist result for job {job_id}: {e}")
    return proof_id, version_id


async def _run_analysis(
    job: AnalysisJob,
    body: AnalyzeRequest,
    analysis_service: Any,
    current_user_id: Optional[uuid.UUID],
) -> None:
    """Background task: run the full analysis pipeline and update job state.

    The pipeline runs in this order:
    1. Decode and hash the upload.
    2. Look up the previous version of the proof, if any.
    3. Run ``analysis_service.analyze_proof`` under a hard timeout, recording
       per-agent progress on ``job`` as it goes.
    4. Persist the result.
    5. Mark the job ``"complete"``.

    Steps 2 and 4 run only when both ``campaign_id`` and ``proof_name`` are set.

    Any failure marks the job ``"error"`` with a user-facing ``error_message``.
    Nothing propagates to the caller except ``asyncio.CancelledError``; it is
    re-raised after the job is marked failed, so pollers never see a job stuck
    in ``"running"``.
    """
    job.status = "running"
    logger.info(f"[ANALYZE] Starting analysis for job {job.job_id}")

    try:
        try:
            file_data = base64.b64decode(body.file_data)
        except Exception as e:
            raise ValueError(f"Failed to decode file data: {e}") from e

        # Used both to detect re-uploads of an identical file and to store on the version.
        file_hash = storage_service.get_checksum(file_data)

        # Revision lookup and persistence both need a campaign and a proof name.
        has_proof_identity = bool(body.campaign_id and body.proof_name)

        previous_analysis = None
        is_identical_file = False
        if has_proof_identity:
            previous_analysis, is_identical_file = await _fetch_previous_analysis(
                body.campaign_id, body.proof_name, file_hash, job.job_id
            )
        job.is_identical_file = is_identical_file

        async def on_agent_update(agent_name: str, review: SubReview | None) -> None:
            """Progress hook: ``review is None`` means the agent started; otherwise it finished."""
            if review is None:
                job.agents_started.append(agent_name)
                logger.info(f"[ANALYZE] Agent started: {agent_name} (job {job.job_id})")
            else:
                job.agents_completed[agent_name] = _serialize_review(review)
                logger.info(f"[ANALYZE] Agent completed: {agent_name} (job {job.job_id})")

        async def on_model_fallback() -> None:
            """Record, once per job, that the analysis service reported a model fallback."""
            if not job.model_fallback:
                job.model_fallback = True
                logger.info(f"[ANALYZE] Model fallback triggered for job {job.job_id}")

        result, pdf_pages = await asyncio.wait_for(
            analysis_service.analyze_proof(
                file_data=file_data,
                file_type=body.file_type,
                on_agent_update=on_agent_update,
                is_wip=body.is_wip,
                brand=body.brand,
                previous_analysis=previous_analysis,
                channel=body.channel,
                sub_channel=body.sub_channel,
                proof_type=body.proof_type,
                on_fallback=on_model_fallback,
            ),
            timeout=_ANALYSIS_TIMEOUT_SECONDS,
        )

        result_dict = _build_result_dict(result)

        proof_id: Optional[str] = None
        version_id: Optional[str] = None
        if has_proof_identity:
            proof_id, version_id = await _persist_result(
                job_id=job.job_id,
                body=body,
                file_data=file_data,
                file_hash=file_hash,
                pdf_pages=pdf_pages,
                result=result,
                result_dict=result_dict,
                is_identical_file=is_identical_file,
                current_user_id=current_user_id,
            )

        serialized_pdf_pages = _serialize_pdf_pages(pdf_pages)

        # Publish everything before flipping the status, so a poll that sees
        # "complete" also sees the full result.
        job.result = result_dict
        job.proof_id = proof_id
        job.version_id = version_id
        job.pdf_pages = serialized_pdf_pages
        job.status = "complete"
        logger.info(f"[ANALYZE] Job {job.job_id} complete — overallStatus: {result.overallStatus}")

    except asyncio.TimeoutError:
        job.status = "error"
        job.error_message = "Analysis timed out after 5 minutes. Please try again with a smaller file."
        logger.error(f"[ANALYZE] Job {job.job_id} timed out")
    except asyncio.CancelledError:
        # Mark the job failed so pollers are not left waiting on "running", then let
        # the cancellation propagate as asyncio requires.
        job.status = "error"
        job.error_message = "Analysis was cancelled. Please try again."
        logger.warning(f"[ANALYZE] Job {job.job_id} cancelled")
        raise
    except Exception as e:
        job.status = "error"
        job.error_message = f"Analysis failed: {e}"
        logger.exception(f"[ANALYZE] Job {job.job_id} failed: {e}")
|