"""Analysis API routes: submit a proof for background analysis and poll the job's status."""

import asyncio
import base64
import logging
import uuid
from typing import Optional, Any

from fastapi import APIRouter, Depends, HTTPException, Request
from pydantic import BaseModel

from app.dependencies.auth import get_current_user
from app.models.schemas import SubReview
from app.models.database import async_session_factory
from app.repositories import ProofRepository, CampaignRepository, UserRepository, AuditRepository
from app.services.storage_service import storage_service
from app.services.job_store import create_job, get_job, AnalysisJob

logger = logging.getLogger(__name__)

analysis_router = APIRouter()

# Hard upper bound on one analysis run. The user-facing timeout message in
# _run_analysis says "5 minutes"; keep the two in sync.
_ANALYSIS_TIMEOUT_SECONDS = 300.0

# Thumbnails are only generated for uploads strictly smaller than this many bytes.
_THUMBNAIL_MAX_BYTES = 10_000_000

# Strong references to in-flight background analysis tasks. The event loop keeps
# only weak references to tasks, so an unreferenced task may be garbage-collected
# before it finishes, leaving its job stuck in "running".
_background_tasks: set[asyncio.Task[None]] = set()


class AnalyzeRequest(BaseModel):
    """Request body for ``POST /analyze``."""

    file_data: str  # base64-encoded
    file_type: str = "image/png"  # MIME type of the decoded file
    is_wip: bool = False
    # UUID string. Together with proof_name it enables revision lookup and DB persistence.
    campaign_id: Optional[str] = None
    proof_name: Optional[str] = None
    channel: Optional[str] = None
    sub_channel: Optional[str] = None
    proof_type: Optional[str] = None
    brand: str = "Barclaycard"


@analysis_router.post("/analyze")
async def submit_analysis(
    body: AnalyzeRequest,
    request: Request,
    current_user: dict = Depends(get_current_user),
):
    """Submit a proof for analysis. Returns job_id immediately.

    The analysis runs in a background task; clients poll
    ``GET /analyze/{job_id}`` for progress and the final result.

    Raises:
        HTTPException: 503 if ``app.state.analysis_service`` is not set yet.
    """
    analysis_service = getattr(request.app.state, "analysis_service", None)
    if analysis_service is None:
        raise HTTPException(status_code=503, detail="Backend not ready. Please wait for initialization.")

    current_user_id = await _resolve_user_id(current_user)

    job = create_job()
    task = asyncio.create_task(
        _run_analysis(job, body, analysis_service, current_user_id)
    )
    # Hold a strong reference until the task finishes, then release it.
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)

    logger.info(f"[ANALYZE] Submitted job {job.job_id} for user {current_user.get('name', 'unknown')}")
    return {"job_id": job.job_id}


async def _resolve_user_id(current_user: dict) -> Optional[uuid.UUID]:
    """Map the authenticated principal to its DB user id.

    Uses the ``oid`` claim, falling back to ``sub``. This is best-effort: it
    returns None when there is no such claim, no matching user, or the lookup fails.
    """
    try:
        async with async_session_factory() as session:
            user_repo = UserRepository(session)
            azure_oid = current_user.get("oid") or current_user.get("sub")
            db_user = await user_repo.get_by_azure_oid(azure_oid) if azure_oid else None
            return db_user.id if db_user else None
    except Exception as e:
        logger.warning(f"[ANALYZE] Failed to resolve user_id: {e}")
        return None


@analysis_router.get("/analyze/{job_id}")
async def poll_analysis(job_id: str, current_user: dict = Depends(get_current_user)):
    """Poll the status of an analysis job.

    Always returns the status, live agent progress, and the model-fallback flag.
    When the job is ``"complete"``, the result payload is added. When it is
    ``"error"``, ``error_message`` is added.

    Raises:
        HTTPException: 404 if the job id is not found (or has expired) in the job store.
    """
    # NOTE(review): any authenticated user can poll any job id. There is no ownership
    # check here; confirm that job ids are unguessable or add submitter tracking.
    job = get_job(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found or expired")

    response: dict[str, Any] = {
        "status": job.status,
        "agents_started": job.agents_started,
        "agents_completed": job.agents_completed,
        "model_fallback": job.model_fallback,
    }

    if job.status == "complete":
        response["result"] = job.result
        response["proof_id"] = job.proof_id
        response["version_id"] = job.version_id
        response["pdf_pages"] = job.pdf_pages
        response["is_identical_file"] = job.is_identical_file
    elif job.status == "error":
        response["error_message"] = job.error_message

    return response


def _review_to_dict(review: Any, *, include_financial_promotion: bool = True) -> dict[str, Any]:
    """Serialize one agent review (e.g. a ``SubReview``) into the camelCase dict sent to clients.

    Args:
        review: Object exposing ``ragStatus``, ``feedback``, ``issues``,
            ``resolvedIssues``, ``outstandingIssues`` and ``newIssues``. It must also
            expose ``isFinancialPromotion`` and ``financialPromotionReason`` when
            *include_financial_promotion* is true.
        include_financial_promotion: Whether to emit the two financial-promotion keys.
    """
    data: dict[str, Any] = {
        "ragStatus": review.ragStatus,
        "feedback": review.feedback,
        "issues": review.issues,
    }
    if include_financial_promotion:
        data["isFinancialPromotion"] = review.isFinancialPromotion
        data["financialPromotionReason"] = review.financialPromotionReason
    data["resolvedIssues"] = review.resolvedIssues
    data["outstandingIssues"] = review.outstandingIssues
    data["newIssues"] = review.newIssues
    return data


def _build_result_dict(result: Any) -> dict[str, Any]:
    """Build the client-facing result payload from the analysis service's result object.

    Only the legal review carries the financial-promotion fields.
    """
    return {
        "legalAgentReview": _review_to_dict(result.legalAgentReview),
        "brandAgentReview": _review_to_dict(
            result.brandAgentReview, include_financial_promotion=False
        ),
        "channelBestPracticesAgentReview": _review_to_dict(
            result.channelBestPracticesAgentReview, include_financial_promotion=False
        ),
        "channelTechSpecsAgentReview": _review_to_dict(
            result.channelTechSpecsAgentReview, include_financial_promotion=False
        ),
        "leadAgentSummary": result.leadAgentSummary,
        "overallStatus": result.overallStatus,
        "financialPromotionReason": result.financialPromotionReason,
    }


def _serialize_pdf_pages(pdf_pages: Any) -> list[dict[str, Any]]:
    """Encode rendered pages as PNG data URLs with 1-based page numbers.

    Each item of *pdf_pages* is unpacked as a ``(png_bytes, width, height)`` tuple.
    """
    return [
        {
            "page": page_number,
            "data_url": f"data:image/png;base64,{base64.b64encode(png_data).decode('utf-8')}",
            "width": width,
            "height": height,
        }
        for page_number, (png_data, width, height) in enumerate(pdf_pages, start=1)
    ]


async def _fetch_previous_analysis(
    job: AnalysisJob,
    body: AnalyzeRequest,
    file_hash: Any,
) -> tuple[Any, bool]:
    """Look up the latest review of an existing proof with the same campaign and name.

    This is best-effort: failures are logged and whatever was gathered so far is
    returned.

    Returns:
        A ``(previous_analysis, is_identical_file)`` tuple. ``previous_analysis`` is
        None when there is no prior proof. ``is_identical_file`` is True when the
        latest stored version's hash equals *file_hash*.
    """
    previous_analysis = None
    is_identical_file = False
    if not (body.campaign_id and body.proof_name):
        return previous_analysis, is_identical_file

    try:
        async with async_session_factory() as session:
            proof_repo = ProofRepository(session)
            existing_proof = await proof_repo.get_by_campaign_and_name(
                uuid.UUID(body.campaign_id), body.proof_name
            )
            if existing_proof:
                previous_analysis = await proof_repo.get_latest_version_review(existing_proof.id)
                previous_file_hash = await proof_repo.get_latest_version_hash(existing_proof.id)
                if previous_file_hash and previous_file_hash == file_hash:
                    is_identical_file = True
                    logger.info(f"[ANALYZE] Identical file detected for job {job.job_id}")
    except Exception as e:
        logger.warning(f"[ANALYZE] Failed to fetch previous analysis: {e}")

    return previous_analysis, is_identical_file


async def _persist_result(
    job: AnalysisJob,
    body: AnalyzeRequest,
    *,
    file_data: bytes,
    file_hash: Any,
    is_identical_file: bool,
    pdf_pages: Any,
    result: Any,
    result_dict: dict[str, Any],
    current_user_id: Optional[uuid.UUID],
) -> tuple[Optional[str], Optional[str]]:
    """Store the file, optional thumbnail, and review as a new proof version.

    This only happens when both ``campaign_id`` and ``proof_name`` are set. It is
    best-effort: failures are logged with a traceback and do not fail the job.
    An ``"Analysis Error"`` overall status also creates an audit error item.

    Returns:
        ``(proof_id, version_id)`` as strings, or ``(None, None)`` if nothing was persisted.
    """
    proof_id: Optional[str] = None
    version_id: Optional[str] = None
    if not (body.campaign_id and body.proof_name):
        return proof_id, version_id

    try:
        campaign_id = uuid.UUID(body.campaign_id)
        async with async_session_factory() as session:
            proof_repo = ProofRepository(session)
            # NOTE(review): version is hard-coded to 1 even when this upload is a
            # revision of an existing proof. Confirm store_file keys cannot collide
            # or overwrite earlier versions' files.
            file_storage_key = await storage_service.store_file(
                file_data=file_data,
                campaign_id=campaign_id,
                proof_name=body.proof_name,
                version=1,
                file_type=body.file_type,
            )

            thumbnail_url = None
            if len(file_data) < _THUMBNAIL_MAX_BYTES:
                if body.file_type.startswith("image/"):
                    thumbnail_url = await storage_service.generate_thumbnail_data_url(
                        file_data, body.file_type
                    )
                elif body.file_type == "application/pdf" and pdf_pages:
                    # For PDFs, thumbnail the first rendered page.
                    first_page_data, _, _ = pdf_pages[0]
                    thumbnail_url = await storage_service.generate_thumbnail_data_url(
                        first_page_data, "image/png"
                    )

            proof, version = await proof_repo.add_version_with_review(
                campaign_id=campaign_id,
                proof_name=body.proof_name,
                channel=body.channel,
                sub_channel=body.sub_channel,
                proof_type=body.proof_type,
                file_storage_key=file_storage_key,
                thumbnail_url=thumbnail_url,
                agent_review=result_dict,
                overall_status=result.overallStatus,
                file_hash=file_hash,
                is_identical_file=is_identical_file,
                created_by=current_user_id,
            )

            if result.overallStatus == "Analysis Error":
                audit_repo = AuditRepository(session)
                await audit_repo.create_error_item(
                    proof_version_id=version.id,
                    error_summary=result.leadAgentSummary,
                )

            await session.commit()
            proof_id = str(proof.id)
            version_id = str(version.id)
            logger.info(f"[ANALYZE] Persisted proof {proof_id} version {version.version} for job {job.job_id}")
    except Exception as e:
        logger.exception(f"[ANALYZE] Failed to persist result for job {job.job_id}: {e}")

    return proof_id, version_id


async def _run_analysis(
    job: AnalysisJob,
    body: AnalyzeRequest,
    analysis_service: Any,
    current_user_id: Optional[uuid.UUID],
) -> None:
    """Background task: run the full analysis pipeline and update job state.

    Mutates *job* in place. ``status`` goes to ``"running"``, then ``"complete"`` or
    ``"error"``. Agent progress and model fallback are recorded through callbacks.
    Ordinary exceptions and timeouts are recorded in ``job.error_message`` instead
    of being raised. Cancellation is recorded on the job and then re-raised.
    """
    job.status = "running"
    logger.info(f"[ANALYZE] Starting analysis for job {job.job_id}")

    try:
        try:
            file_data = base64.b64decode(body.file_data)
        except Exception as e:
            raise ValueError(f"Failed to decode file data: {e}") from e

        # File hash is used for duplicate (identical re-upload) detection.
        file_hash = storage_service.get_checksum(file_data)

        previous_analysis, is_identical_file = await _fetch_previous_analysis(job, body, file_hash)
        job.is_identical_file = is_identical_file

        # Agent update callback: review=None signals "started", otherwise "completed".
        async def on_agent_update(agent_name: str, review: SubReview | None) -> None:
            if review is None:
                job.agents_started.append(agent_name)
                logger.info(f"[ANALYZE] Agent started: {agent_name} (job {job.job_id})")
            else:
                job.agents_completed[agent_name] = _review_to_dict(review)
                logger.info(f"[ANALYZE] Agent completed: {agent_name} (job {job.job_id})")

        # Model fallback callback: set the flag (and log) only once per job.
        async def on_model_fallback() -> None:
            if not job.model_fallback:
                job.model_fallback = True
                logger.info(f"[ANALYZE] Model fallback triggered for job {job.job_id}")

        result, pdf_pages = await asyncio.wait_for(
            analysis_service.analyze_proof(
                file_data=file_data,
                file_type=body.file_type,
                on_agent_update=on_agent_update,
                is_wip=body.is_wip,
                brand=body.brand,
                previous_analysis=previous_analysis,
                channel=body.channel,
                sub_channel=body.sub_channel,
                proof_type=body.proof_type,
                on_fallback=on_model_fallback,
            ),
            timeout=_ANALYSIS_TIMEOUT_SECONDS,
        )

        result_dict = _build_result_dict(result)

        proof_id, version_id = await _persist_result(
            job,
            body,
            file_data=file_data,
            file_hash=file_hash,
            is_identical_file=is_identical_file,
            pdf_pages=pdf_pages,
            result=result,
            result_dict=result_dict,
            current_user_id=current_user_id,
        )

        serialized_pdf_pages = _serialize_pdf_pages(pdf_pages) if pdf_pages else None

        job.result = result_dict
        job.proof_id = proof_id
        job.version_id = version_id
        job.pdf_pages = serialized_pdf_pages
        job.status = "complete"
        logger.info(f"[ANALYZE] Job {job.job_id} complete — overallStatus: {result.overallStatus}")

    except asyncio.TimeoutError:
        job.status = "error"
        job.error_message = "Analysis timed out after 5 minutes. Please try again with a smaller file."
        logger.error(f"[ANALYZE] Job {job.job_id} timed out")
    except asyncio.CancelledError:
        # CancelledError is a BaseException, so `except Exception` below would miss it
        # and leave the job stuck in "running". Record it, then propagate cancellation.
        job.status = "error"
        job.error_message = "Analysis was cancelled. Please try again."
        logger.warning(f"[ANALYZE] Job {job.job_id} cancelled")
        raise
    except Exception as e:
        job.status = "error"
        job.error_message = f"Analysis failed: {e}"
        logger.exception(f"[ANALYZE] Job {job.job_id} failed: {e}")