Replace WebSocket with REST polling to fix GCP LB 30s timeout
POST /api/analyze submits an analysis job and returns job_id instantly.
GET /api/analyze/{job_id} returns progress + result; frontend polls every 2s.
The analysis runs in the background as an asyncio task (asyncio.create_task), so each
HTTP request completes in milliseconds, well within the 30s GCP Load Balancer limit.
- Add backend/app/services/job_store.py: in-memory AnalysisJob store with
30-min TTL cleanup
- Add backend/app/api/analysis_routes.py: POST + GET /api/analyze endpoints
with full analysis pipeline (hash check, DB persistence, PDF pages, etc.)
- Remove backend/app/websocket/: handlers.py, manager.py, __init__.py
- Update backend/app/main.py: wire analysis_router, store analysis_service
in app.state, drop all WebSocket imports and endpoint
- Update frontend/services/geminiService.ts: replace WS with fetch+poll;
function signatures unchanged so App.tsx / WIPReviewer.tsx need no edits
- Remove VITE_BACKEND_WS_URL from vite.config.ts, deploy.sh, .env.deploy.example
- Update cloudrun.yaml: remove WebSocket-specific session affinity annotation
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
1de572fcb0
commit
a6fc149788
12 changed files with 526 additions and 756 deletions
|
|
@ -42,9 +42,6 @@ VITE_BASE_PATH=/modcomms/
|
|||
# HTTP base URL of the backend (no trailing slash)
|
||||
VITE_BACKEND_URL=https://baic.oliver.solutions/back
|
||||
|
||||
# WebSocket URL for the analysis endpoint
|
||||
VITE_BACKEND_WS_URL=wss://baic.oliver.solutions/back/ws/analyze
|
||||
|
||||
# CORS origins allowed by the backend (must match VITE_BACKEND_URL origin)
|
||||
CORS_ORIGINS=https://baic.oliver.solutions
|
||||
|
||||
|
|
@ -85,7 +82,6 @@ SUPPORT_EMAIL=BAICsupport@oliver.agency
|
|||
# Apache proxy config (for reference — configure in Apache vhost, not here)
|
||||
# -----------------------------------------------------------------------
|
||||
#
|
||||
# ProxyPass /back/ws/analyze ws://localhost:8000/ws/analyze
|
||||
# ProxyPass /back/ http://localhost:8000/
|
||||
# ProxyPassReverse /back/ http://localhost:8000/
|
||||
#
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
from app.api.routes import router
|
||||
from app.api.knowledge_base_routes import kb_router
|
||||
from app.api.analysis_routes import analysis_router
|
||||
|
||||
__all__ = ["router", "kb_router"]
|
||||
__all__ = ["router", "kb_router", "analysis_router"]
|
||||
|
|
|
|||
297
backend/app/api/analysis_routes.py
Normal file
297
backend/app/api/analysis_routes.py
Normal file
|
|
@ -0,0 +1,297 @@
|
|||
import asyncio
|
||||
import base64
|
||||
import logging
|
||||
import uuid
|
||||
from typing import Optional, Any
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException, Request
|
||||
from pydantic import BaseModel
|
||||
|
||||
from app.dependencies.auth import get_current_user
|
||||
from app.models.schemas import SubReview
|
||||
from app.models.database import async_session_factory
|
||||
from app.repositories import ProofRepository, CampaignRepository, UserRepository, AuditRepository
|
||||
from app.services.storage_service import storage_service
|
||||
from app.services.job_store import create_job, get_job, AnalysisJob
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
analysis_router = APIRouter()
|
||||
|
||||
|
||||
class AnalyzeRequest(BaseModel):
    """JSON body accepted by ``POST /analyze``: the proof file plus optional metadata."""

    # Proof file contents, base64-encoded; decoded by the background analysis task.
    file_data: str
    # MIME type of the decoded file (the pipeline branches on image/* vs application/pdf).
    file_type: str = "image/png"
    is_wip: bool = False
    # The result is persisted only when both campaign_id and proof_name are provided.
    campaign_id: str | None = None
    proof_name: str | None = None
    # Proof metadata forwarded to the analysis service and stored with the version.
    channel: str | None = None
    sub_channel: str | None = None
    proof_type: str | None = None
    brand: str = "Barclaycard"
|
||||
|
||||
|
||||
@analysis_router.post("/analyze")
async def submit_analysis(
    body: AnalyzeRequest,
    request: Request,
    current_user: dict = Depends(get_current_user),
):
    """Submit a proof for analysis and return its ``job_id`` immediately.

    The analysis itself runs as a background asyncio task; the job's progress
    and result are written to the job store and read back by the polling
    endpoint.

    Args:
        body: The proof file (base64) and its metadata.
        request: Used to reach ``app.state`` (analysis service, task registry).
        current_user: Claims of the authenticated caller.

    Returns:
        ``{"job_id": <str>}``.

    Raises:
        HTTPException: 503 when ``app.state.analysis_service`` is not set yet.
    """
    analysis_service = getattr(request.app.state, "analysis_service", None)
    if analysis_service is None:
        raise HTTPException(status_code=503, detail="Backend not ready. Please wait for initialization.")

    # Resolve current_user_id from DB. Best effort: a lookup failure is logged
    # and the analysis proceeds without a user id.
    current_user_id: Optional[uuid.UUID] = None
    try:
        async with async_session_factory() as session:
            user_repo = UserRepository(session)
            azure_oid = current_user.get("oid") or current_user.get("sub")
            db_user = await user_repo.get_by_azure_oid(azure_oid) if azure_oid else None
            current_user_id = db_user.id if db_user else None
    except Exception as e:
        logger.warning(f"[ANALYZE] Failed to resolve user_id: {e}")

    job = create_job()

    # The event loop only keeps weak references to tasks, so a task whose
    # handle is dropped can be garbage-collected before it finishes. Hold a
    # strong reference on app.state until the task is done.
    background_tasks = getattr(request.app.state, "analysis_tasks", None)
    if background_tasks is None:
        background_tasks = set()
        request.app.state.analysis_tasks = background_tasks

    task = asyncio.create_task(
        _run_analysis(job, body, analysis_service, current_user_id)
    )
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)

    logger.info(f"[ANALYZE] Submitted job {job.job_id} for user {current_user.get('name', 'unknown')}")
    return {"job_id": job.job_id}
|
||||
|
||||
|
||||
@analysis_router.get("/analyze/{job_id}")
async def poll_analysis(job_id: str, current_user: dict = Depends(get_current_user)):
    """Report the current state of an analysis job.

    Progress fields are always returned. The result fields are added once the
    job is complete, and ``error_message`` once it has failed.

    Raises:
        HTTPException: 404 when the job store has no job with this id.
    """
    job = get_job(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found or expired")

    # Progress snapshot, present for every status.
    payload: dict[str, Any] = dict(
        status=job.status,
        agents_started=job.agents_started,
        agents_completed=job.agents_completed,
        model_fallback=job.model_fallback,
    )

    if job.status == "complete":
        payload.update(
            result=job.result,
            proof_id=job.proof_id,
            version_id=job.version_id,
            pdf_pages=job.pdf_pages,
            is_identical_file=job.is_identical_file,
        )
    elif job.status == "error":
        payload["error_message"] = job.error_message

    return payload
|
||||
|
||||
|
||||
def _serialize_review(review: Any, *, include_financial_promotion: bool = False) -> dict[str, Any]:
    """Convert one agent review into the JSON-ready dict sent to the client."""
    payload: dict[str, Any] = {
        "ragStatus": review.ragStatus,
        "feedback": review.feedback,
        "issues": review.issues,
    }
    if include_financial_promotion:
        payload["isFinancialPromotion"] = review.isFinancialPromotion
        payload["financialPromotionReason"] = review.financialPromotionReason
    payload["resolvedIssues"] = review.resolvedIssues
    payload["outstandingIssues"] = review.outstandingIssues
    payload["newIssues"] = review.newIssues
    return payload


def _build_result_dict(result: Any) -> dict[str, Any]:
    """Assemble the final result payload from the four agent reviews and the lead summary."""
    return {
        # Only the legal review carries the financial-promotion fields.
        "legalAgentReview": _serialize_review(result.legalAgentReview, include_financial_promotion=True),
        "brandAgentReview": _serialize_review(result.brandAgentReview),
        "channelBestPracticesAgentReview": _serialize_review(result.channelBestPracticesAgentReview),
        "channelTechSpecsAgentReview": _serialize_review(result.channelTechSpecsAgentReview),
        "leadAgentSummary": result.leadAgentSummary,
        "overallStatus": result.overallStatus,
        "financialPromotionReason": result.financialPromotionReason,
    }


def _serialize_pdf_pages(pdf_pages: Any) -> list[dict] | None:
    """Encode ``(png_data, width, height)`` page tuples as data URLs; None when there are no pages."""
    if not pdf_pages:
        return None
    return [
        {
            "page": index + 1,
            "data_url": f"data:image/png;base64,{base64.b64encode(png_data).decode('utf-8')}",
            "width": width,
            "height": height,
        }
        for index, (png_data, width, height) in enumerate(pdf_pages)
    ]


async def _run_analysis(
    job: AnalysisJob,
    body: AnalyzeRequest,
    analysis_service: Any,
    current_user_id: Optional[uuid.UUID],
) -> None:
    """Background task: run the full analysis pipeline and update job state.

    Steps: decode the upload, look up the previous version of the proof (when
    campaign_id and proof_name are given), run the analysis with a 5-minute
    timeout, persist the result, and publish everything on ``job``.

    The function never raises for ordinary failures: any exception is
    recorded on the job as ``status="error"`` plus ``error_message``.
    The previous-version lookup and the persistence step are best effort;
    their failures are logged and the job still completes.

    Args:
        job: Job record mutated in place; the polling endpoint reads it.
        body: The original request (file data and proof metadata).
        analysis_service: Object exposing ``analyze_proof(...)``.
        current_user_id: DB id of the submitting user, or None if unresolved.
    """
    job.status = "running"
    logger.info(f"[ANALYZE] Starting analysis for job {job.job_id}")

    try:
        # Decode base64 file data
        try:
            file_data = base64.b64decode(body.file_data)
        except Exception as e:
            # Chain the cause so the traceback logged below shows the decode error.
            raise ValueError(f"Failed to decode file data: {e}") from e

        # Compute file hash for duplicate detection
        file_hash = storage_service.get_checksum(file_data)

        # Fetch previous analysis if this is a revision
        previous_analysis = None
        is_identical_file = False

        if body.campaign_id and body.proof_name:
            try:
                async with async_session_factory() as session:
                    proof_repo = ProofRepository(session)
                    existing_proof = await proof_repo.get_by_campaign_and_name(
                        uuid.UUID(body.campaign_id), body.proof_name
                    )
                    if existing_proof:
                        previous_analysis = await proof_repo.get_latest_version_review(existing_proof.id)
                        previous_file_hash = await proof_repo.get_latest_version_hash(existing_proof.id)
                        if previous_file_hash and previous_file_hash == file_hash:
                            is_identical_file = True
                            logger.info(f"[ANALYZE] Identical file detected for job {job.job_id}")
            except Exception as e:
                # Best effort: analyse without revision context.
                logger.warning(f"[ANALYZE] Failed to fetch previous analysis: {e}")

        job.is_identical_file = is_identical_file

        # Agent update callback — updates job store in place
        async def on_agent_update(agent_name: str, review: SubReview | None) -> None:
            # review is None when the agent starts, and set when it completes.
            if review is None:
                job.agents_started.append(agent_name)
                logger.info(f"[ANALYZE] Agent started: {agent_name} (job {job.job_id})")
            else:
                job.agents_completed[agent_name] = _serialize_review(
                    review, include_financial_promotion=True
                )
                logger.info(f"[ANALYZE] Agent completed: {agent_name} (job {job.job_id})")

        # Model fallback callback (logs only the first time it fires)
        async def on_model_fallback() -> None:
            if not job.model_fallback:
                job.model_fallback = True
                logger.info(f"[ANALYZE] Model fallback triggered for job {job.job_id}")

        # Run the analysis (5-minute hard timeout)
        result, pdf_pages = await asyncio.wait_for(
            analysis_service.analyze_proof(
                file_data=file_data,
                file_type=body.file_type,
                on_agent_update=on_agent_update,
                is_wip=body.is_wip,
                brand=body.brand,
                previous_analysis=previous_analysis,
                channel=body.channel,
                sub_channel=body.sub_channel,
                proof_type=body.proof_type,
                on_fallback=on_model_fallback,
            ),
            timeout=300.0,
        )

        result_dict = _build_result_dict(result)

        # Persist to database
        proof_id: Optional[str] = None
        version_id: Optional[str] = None

        if body.campaign_id and body.proof_name:
            try:
                async with async_session_factory() as session:
                    proof_repo = ProofRepository(session)

                    # NOTE(review): version is always 1 here, even for revisions;
                    # confirm that store_file does not key on it.
                    file_storage_key = await storage_service.store_file(
                        file_data=file_data,
                        campaign_id=uuid.UUID(body.campaign_id),
                        proof_name=body.proof_name,
                        version=1,
                        file_type=body.file_type,
                    )

                    # Thumbnails are generated only for files under 10,000,000 bytes.
                    thumbnail_url = None
                    if len(file_data) < 10_000_000:
                        if body.file_type.startswith("image/"):
                            thumbnail_url = await storage_service.generate_thumbnail_data_url(file_data, body.file_type)
                        elif body.file_type == "application/pdf" and pdf_pages:
                            first_page_data, _, _ = pdf_pages[0]
                            thumbnail_url = await storage_service.generate_thumbnail_data_url(first_page_data, "image/png")

                    proof, version = await proof_repo.add_version_with_review(
                        campaign_id=uuid.UUID(body.campaign_id),
                        proof_name=body.proof_name,
                        channel=body.channel,
                        sub_channel=body.sub_channel,
                        proof_type=body.proof_type,
                        file_storage_key=file_storage_key,
                        thumbnail_url=thumbnail_url,
                        agent_review=result_dict,
                        overall_status=result.overallStatus,
                        file_hash=file_hash,
                        is_identical_file=is_identical_file,
                        created_by=current_user_id,
                    )

                    if result.overallStatus == "Analysis Error":
                        audit_repo = AuditRepository(session)
                        await audit_repo.create_error_item(
                            proof_version_id=version.id,
                            error_summary=result.leadAgentSummary,
                        )

                    await session.commit()
                    proof_id = str(proof.id)
                    version_id = str(version.id)
                    logger.info(f"[ANALYZE] Persisted proof {proof_id} version {version.version} for job {job.job_id}")
            except Exception as e:
                # Best effort: the client still receives the result, without ids.
                logger.exception(f"[ANALYZE] Failed to persist result for job {job.job_id}: {e}")

        # Mark job complete. Status is set last so a poller never sees
        # "complete" before the result fields are populated.
        job.result = result_dict
        job.proof_id = proof_id
        job.version_id = version_id
        job.pdf_pages = _serialize_pdf_pages(pdf_pages)
        job.status = "complete"
        logger.info(f"[ANALYZE] Job {job.job_id} complete — overallStatus: {result.overallStatus}")

    except asyncio.TimeoutError:
        job.status = "error"
        job.error_message = "Analysis timed out after 5 minutes. Please try again with a smaller file."
        logger.error(f"[ANALYZE] Job {job.job_id} timed out")
    except Exception as e:
        job.status = "error"
        job.error_message = f"Analysis failed: {e}"
        # logger.exception records the traceback, which logger.error dropped.
        logger.exception(f"[ANALYZE] Job {job.job_id} failed: {e}")
|
||||
|
|
@ -1,17 +1,17 @@
|
|||
import asyncio
|
||||
import logging
|
||||
import uuid
|
||||
from contextlib import asynccontextmanager
|
||||
|
||||
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends
|
||||
from fastapi import FastAPI, Depends
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
|
||||
from app.config import settings
|
||||
from app.services.auth_service import verify_access_token
|
||||
from app.dependencies.auth import get_current_user
|
||||
from app.models.database import init_db, close_db, async_session_factory as _session_factory
|
||||
from app.repositories.user_repository import UserRepository
|
||||
from app.api import router as api_router, kb_router
|
||||
from app.models.database import init_db, close_db
|
||||
from app.api import router as api_router, kb_router, analysis_router
|
||||
from app.services.gemini_service import GeminiService
|
||||
from app.services.reference_docs import ReferenceDocsService
|
||||
from app.services.analysis_service import AnalysisService
|
||||
from app.services.knowledge_base_service import KnowledgeBaseService
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(
|
||||
|
|
@ -25,37 +25,17 @@ logger = logging.getLogger(__name__)
|
|||
class HealthCheckFilter(logging.Filter):
    """Filter out health check endpoint logs from uvicorn access log."""

    def filter(self, record: logging.LogRecord) -> bool:
        """Return False (drop the record) when its message contains ``GET /health``."""
        return "GET /health" not in record.getMessage()
|
||||
|
||||
|
||||
# Filter out health check logs from uvicorn access log
|
||||
logging.getLogger("uvicorn.access").addFilter(HealthCheckFilter())
|
||||
from app.websocket.manager import ConnectionManager
|
||||
from app.websocket.handlers import handle_analyze_message
|
||||
from app.services.gemini_service import GeminiService
|
||||
from app.services.reference_docs import ReferenceDocsService
|
||||
from app.services.analysis_service import AnalysisService
|
||||
from app.services.knowledge_base_service import KnowledgeBaseService
|
||||
|
||||
|
||||
# Global services - initialized at startup
|
||||
manager = ConnectionManager()
|
||||
analysis_service: AnalysisService | None = None
|
||||
knowledge_base_service: KnowledgeBaseService | None = None
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
"""
|
||||
Initialize services on startup and cleanup on shutdown.
|
||||
|
||||
Loads reference documents and initializes the analysis service.
|
||||
"""
|
||||
global analysis_service, knowledge_base_service
|
||||
|
||||
"""Initialize services on startup and cleanup on shutdown."""
|
||||
# Validate settings
|
||||
settings.validate()
|
||||
|
||||
|
|
@ -68,13 +48,12 @@ async def lifespan(app: FastAPI):
|
|||
db_available = True
|
||||
except Exception as e:
|
||||
logger.warning(f"Database initialization failed (may not be available): {e}")
|
||||
print(f"Warning: Database not available - running in stateless mode")
|
||||
print("Warning: Database not available - running in stateless mode")
|
||||
|
||||
# Initialize services
|
||||
print("Loading reference documents...")
|
||||
reference_docs = ReferenceDocsService(settings.REFERENCE_DOCS_PATH)
|
||||
|
||||
# Load specs from DB if database is available
|
||||
if db_available:
|
||||
try:
|
||||
from app.models.database import async_session_factory
|
||||
|
|
@ -84,7 +63,6 @@ async def lifespan(app: FastAPI):
|
|||
except Exception as e:
|
||||
logger.warning(f"Failed to load specs from DB (falling back to files): {e}")
|
||||
|
||||
# Log document info
|
||||
doc_summary = reference_docs.get_context_summary()
|
||||
print(f" Brand documents: {len(doc_summary['brand_files'])} files ({doc_summary['brand_context_length']} chars)")
|
||||
print(f" Channel documents: {len(doc_summary['channel_files'])} files ({doc_summary['channel_context_length']} chars)")
|
||||
|
|
@ -94,13 +72,16 @@ async def lifespan(app: FastAPI):
|
|||
|
||||
print("Initializing analysis service...")
|
||||
analysis_service = AnalysisService(gemini_service, reference_docs)
|
||||
app.state.analysis_service = analysis_service
|
||||
|
||||
# Initialize Knowledge Base service (requires LlamaParse API key)
|
||||
knowledge_base_service = None
|
||||
if settings.LLAMA_CLOUD_API_KEY:
|
||||
from app.services.llamaparse_service import LlamaParseService
|
||||
print("Initializing LlamaParse service...")
|
||||
llamaparse_service = LlamaParseService(settings.LLAMA_CLOUD_API_KEY, settings.LLAMA_CLOUD_BASE_URL)
|
||||
knowledge_base_service = KnowledgeBaseService(llamaparse_service, gemini_service, reference_docs)
|
||||
app.state.knowledge_base_service = knowledge_base_service
|
||||
print("Knowledge Base pipeline ready!")
|
||||
else:
|
||||
print("LLAMA_CLOUD_API_KEY not set - Knowledge Base processing pipeline disabled")
|
||||
|
|
@ -122,7 +103,7 @@ app = FastAPI(
|
|||
lifespan=lifespan,
|
||||
)
|
||||
|
||||
# CORS middleware - allow frontend to connect
|
||||
# CORS middleware
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=settings.CORS_ORIGINS.split(","),
|
||||
|
|
@ -134,6 +115,7 @@ app.add_middleware(
|
|||
# Include API routes
|
||||
app.include_router(api_router, prefix="/api")
|
||||
app.include_router(kb_router, prefix="/api")
|
||||
app.include_router(analysis_router, prefix="/api")
|
||||
|
||||
|
||||
@app.get("/health")
|
||||
|
|
@ -145,6 +127,7 @@ async def health_check():
|
|||
@app.get("/info")
|
||||
async def info(user: dict = Depends(get_current_user)):
|
||||
"""Get backend information. Requires authentication."""
|
||||
analysis_service = getattr(app.state, "analysis_service", None)
|
||||
if analysis_service:
|
||||
ref_docs = analysis_service.reference_docs
|
||||
doc_summary = ref_docs.get_context_summary()
|
||||
|
|
@ -155,118 +138,3 @@ async def info(user: dict = Depends(get_current_user)):
|
|||
"reference_docs": doc_summary,
|
||||
}
|
||||
return {"status": "initializing", "user": user.get("name", "Unknown")}
|
||||
|
||||
|
||||
@app.websocket("/ws/analyze")
async def websocket_analyze(websocket: WebSocket):
    """
    WebSocket endpoint for proof analysis with real-time updates.

    Protocol:
    - Client sends: {"type": "analyze", "file_data": "<base64>", "file_type": "image/png", "is_wip": false, "access_token": "<jwt>"}
    - Server verifies token before processing
    - Server sends: {"type": "agent_started", "agent_name": "..."}
    - Server sends: {"type": "agent_completed", "agent_name": "...", "review": {...}}
    - Server sends: {"type": "complete", "result": {...}}
    - Server sends: {"type": "heartbeat"} every 10 seconds while an analysis runs
    - Client may send {"type": "ping"}; server answers {"type": "pong"}
    - On error: {"type": "error", "message": "..."}
    """
    client_id = str(uuid.uuid4())
    logger.info(f"[MAIN] WebSocket connection established - client_id: {client_id}")
    await manager.connect(websocket, client_id)

    async def _heartbeat(ws: WebSocket) -> None:
        """Send a heartbeat frame every 10 seconds until cancelled or the send fails."""
        try:
            while True:
                await asyncio.sleep(10)
                await ws.send_json({"type": "heartbeat"})
                logger.debug(f"[MAIN] Heartbeat sent to client {client_id}")
        except asyncio.CancelledError:
            pass
        except Exception as hb_err:
            logger.debug(f"[MAIN] Heartbeat stopped for client {client_id}: {hb_err}")

    try:
        while True:
            # Wait for a message from the client
            data = await websocket.receive_json()
            message_type = data.get("type")
            logger.info(f"[MAIN] Received message from client {client_id} - type: {message_type}")

            if message_type == "analyze":
                # Verify access token from message
                access_token = data.get("access_token")
                user_claims = await verify_access_token(access_token)

                if not user_claims:
                    logger.warning(f"[MAIN] Authentication failed for client {client_id}")
                    await manager.send_message(client_id, {
                        "type": "error",
                        "message": "Authentication failed. Please sign in again."
                    })
                    continue

                logger.info(f"[MAIN] Authenticated user: {user_claims.get('name', 'unknown')}")

                # Resolve the caller's DB user id (best effort; no role is
                # enforced here despite the wording of the log message below).
                current_user_id: uuid.UUID | None = None
                try:
                    async with _session_factory() as ws_session:
                        ws_user_repo = UserRepository(ws_session)
                        azure_oid = user_claims.get("oid") or user_claims.get("sub")
                        ws_user = await ws_user_repo.get_by_azure_oid(azure_oid) if azure_oid else None
                        current_user_id = ws_user.id if ws_user else None
                except Exception as role_err:
                    logger.warning(f"[MAIN] Role check failed for client {client_id}: {role_err}")

                if analysis_service is None:
                    logger.error("[MAIN] Analysis service not ready")
                    await manager.send_message(client_id, {
                        "type": "error",
                        "message": "Backend not ready. Please wait for initialization."
                    })
                    continue

                # Keepalive heartbeat so intermediaries do not close the
                # connection as idle while the analysis is running.
                heartbeat_task = asyncio.create_task(_heartbeat(websocket))
                try:
                    # Handle the analysis request
                    await handle_analyze_message(
                        websocket=websocket,
                        client_id=client_id,
                        data=data,
                        manager=manager,
                        analysis_service=analysis_service,
                        current_user_id=current_user_id,
                    )
                finally:
                    heartbeat_task.cancel()
            elif message_type == "ping":
                # Client keepalive ping — respond with pong
                await manager.send_message(client_id, {"type": "pong"})
            else:
                logger.warning(f"[MAIN] Unknown message type: {message_type}")
                await manager.send_message(client_id, {
                    "type": "error",
                    "message": f"Unknown message type: {message_type}"
                })

    except WebSocketDisconnect:
        logger.info(f"[MAIN] Client {client_id} disconnected")
    except RuntimeError as e:
        # Client disconnected mid-analysis (e.g. navigated away before result arrived)
        if "not connected" in str(e).lower() or "websocket" in str(e).lower():
            logger.info(f"[MAIN] Client {client_id} disconnected before result was sent")
        else:
            logger.error(f"[MAIN] RuntimeError for client {client_id}: {str(e)}")
    except Exception as e:
        logger.error(f"[MAIN] Error for client {client_id}: {str(e)}")
        try:
            await manager.send_message(client_id, {
                "type": "error",
                "message": str(e)
            })
        except Exception:
            # The socket is most likely already closed; nothing more to report.
            pass
    finally:
        # Single cleanup point: also runs when the handler is cancelled, which
        # the per-handler disconnect calls did not cover.
        manager.disconnect(client_id)
|
||||
|
|
|
|||
50
backend/app/services/job_store.py
Normal file
50
backend/app/services/job_store.py
Normal file
|
|
@ -0,0 +1,50 @@
|
|||
import time
|
||||
import uuid
|
||||
import logging
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# In-memory job store — safe for single Cloud Run instance
|
||||
_jobs: dict[str, "AnalysisJob"] = {}
|
||||
|
||||
JOB_TTL_SECONDS = 1800 # 30 minutes
|
||||
|
||||
|
||||
@dataclass
|
||||
class AnalysisJob:
|
||||
job_id: str
|
||||
status: str # "pending" | "running" | "complete" | "error"
|
||||
created_at: float
|
||||
agents_started: list[str] = field(default_factory=list)
|
||||
agents_completed: dict[str, dict] = field(default_factory=dict)
|
||||
model_fallback: bool = False
|
||||
result: dict | None = None
|
||||
proof_id: str | None = None
|
||||
version_id: str | None = None
|
||||
pdf_pages: list[dict] | None = None
|
||||
is_identical_file: bool = False
|
||||
error_message: str | None = None
|
||||
|
||||
|
||||
def _cleanup_old_jobs() -> None:
|
||||
"""Remove jobs older than JOB_TTL_SECONDS."""
|
||||
cutoff = time.time() - JOB_TTL_SECONDS
|
||||
stale = [jid for jid, job in _jobs.items() if job.created_at < cutoff]
|
||||
for jid in stale:
|
||||
del _jobs[jid]
|
||||
if stale:
|
||||
logger.info(f"[JOB_STORE] Cleaned up {len(stale)} stale jobs")
|
||||
|
||||
|
||||
def create_job() -> AnalysisJob:
|
||||
_cleanup_old_jobs()
|
||||
job_id = str(uuid.uuid4())
|
||||
job = AnalysisJob(job_id=job_id, status="pending", created_at=time.time())
|
||||
_jobs[job_id] = job
|
||||
logger.info(f"[JOB_STORE] Created job {job_id}")
|
||||
return job
|
||||
|
||||
|
||||
def get_job(job_id: str) -> AnalysisJob | None:
|
||||
return _jobs.get(job_id)
|
||||
|
|
@ -1,2 +0,0 @@
|
|||
from .manager import ConnectionManager
|
||||
from .handlers import handle_analyze_message
|
||||
|
|
@ -1,320 +0,0 @@
|
|||
import asyncio
|
||||
import base64
|
||||
import logging
|
||||
import uuid
|
||||
from typing import Optional
|
||||
|
||||
from fastapi import WebSocket
|
||||
|
||||
from app.websocket.manager import ConnectionManager
|
||||
from app.services.analysis_service import AnalysisService
|
||||
from app.models.schemas import SubReview
|
||||
from app.models.database import async_session_factory
|
||||
from app.repositories import ProofRepository, CampaignRepository, UserRepository, AuditRepository
|
||||
from app.services.storage_service import storage_service
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def handle_analyze_message(
|
||||
websocket: WebSocket,
|
||||
client_id: str,
|
||||
data: dict,
|
||||
manager: ConnectionManager,
|
||||
analysis_service: AnalysisService,
|
||||
current_user_id: Optional[uuid.UUID] = None,
|
||||
) -> None:
|
||||
"""
|
||||
Handle an 'analyze' message from the client.
|
||||
|
||||
Runs the proof analysis and sends real-time updates via WebSocket.
|
||||
|
||||
Args:
|
||||
websocket: The WebSocket connection
|
||||
client_id: Unique client identifier
|
||||
data: The message data containing file_data, file_type, is_wip
|
||||
manager: Connection manager for sending messages
|
||||
analysis_service: Service to run the analysis
|
||||
"""
|
||||
try:
|
||||
logger.info(f"[WEBSOCKET] Received analyze request from client: {client_id}")
|
||||
|
||||
# Extract and decode the file data
|
||||
file_data_b64 = data.get("file_data", "")
|
||||
file_type = data.get("file_type", "image/png")
|
||||
is_wip = data.get("is_wip", False)
|
||||
|
||||
logger.info(f"[WEBSOCKET] File type: {file_type}, is_wip: {is_wip}, base64 length: {len(file_data_b64)}")
|
||||
|
||||
# Decode base64 file data
|
||||
try:
|
||||
file_data = base64.b64decode(file_data_b64)
|
||||
logger.info(f"[WEBSOCKET] Decoded file size: {len(file_data)} bytes")
|
||||
except Exception as e:
|
||||
logger.error(f"[WEBSOCKET] Failed to decode file data: {str(e)}")
|
||||
await manager.send_message(client_id, {
|
||||
"type": "error",
|
||||
"message": f"Failed to decode file data: {str(e)}"
|
||||
})
|
||||
return
|
||||
|
||||
# Compute file hash for duplicate detection
|
||||
file_hash = storage_service.get_checksum(file_data)
|
||||
logger.info(f"[WEBSOCKET] Computed file hash: {file_hash}")
|
||||
|
||||
# Create callback for real-time updates
|
||||
async def on_agent_update(agent_name: str, review: SubReview | None) -> None:
|
||||
if not manager.is_connected(client_id):
|
||||
logger.warning(f"[WEBSOCKET] Client {client_id} disconnected, skipping update")
|
||||
return
|
||||
|
||||
if review is None:
|
||||
# Agent is starting
|
||||
logger.info(f"[WEBSOCKET] Sending agent_started: {agent_name}")
|
||||
await manager.send_message(client_id, {
|
||||
"type": "agent_started",
|
||||
"agent_name": agent_name
|
||||
})
|
||||
else:
|
||||
# Agent completed
|
||||
logger.info(f"[WEBSOCKET] Sending agent_completed: {agent_name} - ragStatus: {review.ragStatus}")
|
||||
await manager.send_message(client_id, {
|
||||
"type": "agent_completed",
|
||||
"agent_name": agent_name,
|
||||
"review": {
|
||||
"ragStatus": review.ragStatus,
|
||||
"feedback": review.feedback,
|
||||
"issues": review.issues,
|
||||
"isFinancialPromotion": review.isFinancialPromotion,
|
||||
"financialPromotionReason": review.financialPromotionReason,
|
||||
"resolvedIssues": review.resolvedIssues,
|
||||
"outstandingIssues": review.outstandingIssues,
|
||||
"newIssues": review.newIssues,
|
||||
}
|
||||
})
|
||||
|
||||
# Extract brand selection for brand agent
|
||||
brand = data.get("brand", "Barclaycard") # Default to Barclaycard if not specified
|
||||
logger.info(f"[WEBSOCKET] Brand selection: {brand}")
|
||||
|
||||
# Fetch previous analysis if this is a revision
|
||||
previous_analysis = None
|
||||
previous_file_hash = None
|
||||
is_identical_file = False
|
||||
campaign_id = data.get("campaign_id")
|
||||
proof_name = data.get("proof_name")
|
||||
|
||||
if campaign_id and proof_name:
|
||||
try:
|
||||
logger.info(f"[WEBSOCKET] Checking for previous analysis - campaign: {campaign_id}, proof: {proof_name}")
|
||||
async with async_session_factory() as session:
|
||||
proof_repo = ProofRepository(session)
|
||||
existing_proof = await proof_repo.get_by_campaign_and_name(
|
||||
uuid.UUID(campaign_id), proof_name
|
||||
)
|
||||
if existing_proof:
|
||||
previous_analysis = await proof_repo.get_latest_version_review(existing_proof.id)
|
||||
previous_file_hash = await proof_repo.get_latest_version_hash(existing_proof.id)
|
||||
if previous_analysis:
|
||||
logger.info(f"[WEBSOCKET] Found previous analysis version {previous_analysis.get('version')}")
|
||||
else:
|
||||
logger.info("[WEBSOCKET] No previous analysis found (new proof)")
|
||||
# Check if file is identical to previous version
|
||||
if previous_file_hash and previous_file_hash == file_hash:
|
||||
is_identical_file = True
|
||||
logger.info(f"[WEBSOCKET] Identical file detected - hash matches previous version: {file_hash}")
|
||||
else:
|
||||
logger.info("[WEBSOCKET] No existing proof found (new proof)")
|
||||
except Exception as e:
|
||||
logger.warning(f"[WEBSOCKET] Failed to fetch previous analysis: {str(e)}")
|
||||
# Continue without previous analysis - still run the current analysis
|
||||
|
||||
# Extract proof metadata for agent context
|
||||
channel = data.get("channel")
|
||||
sub_channel = data.get("sub_channel")
|
||||
proof_type = data.get("proof_type")
|
||||
|
||||
# Build a once-only callback that notifies the client when the primary
|
||||
# Gemini model is unavailable and the fallback model is used instead.
|
||||
fallback_notified = False
|
||||
|
||||
async def on_model_fallback() -> None:
|
||||
nonlocal fallback_notified
|
||||
if fallback_notified:
|
||||
return
|
||||
fallback_notified = True
|
||||
logger.info(f"[WEBSOCKET] Sending model_fallback notification to client: {client_id}")
|
||||
if manager.is_connected(client_id):
|
||||
await manager.send_message(client_id, {"type": "model_fallback"})
|
||||
else:
|
||||
logger.warning(f"[WEBSOCKET] Cannot send model_fallback - client {client_id} not connected")
|
||||
|
||||
# Run the analysis (5-minute hard timeout)
|
||||
logger.info("[WEBSOCKET] Starting analysis...")
|
||||
try:
|
||||
result, pdf_pages = await asyncio.wait_for(
|
||||
analysis_service.analyze_proof(
|
||||
file_data=file_data,
|
||||
file_type=file_type,
|
||||
on_agent_update=on_agent_update,
|
||||
is_wip=is_wip,
|
||||
brand=brand,
|
||||
previous_analysis=previous_analysis,
|
||||
channel=channel,
|
||||
sub_channel=sub_channel,
|
||||
proof_type=proof_type,
|
||||
on_fallback=on_model_fallback,
|
||||
),
|
||||
timeout=300.0,
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
logger.error(f"[WEBSOCKET] Analysis timed out for client {client_id}")
|
||||
await manager.send_message(client_id, {
|
||||
"type": "error",
|
||||
"message": "Analysis timed out after 5 minutes. Please try again with a smaller file."
|
||||
})
|
||||
return
|
||||
|
||||
# Build the result dict
|
||||
result_dict = {
|
||||
"legalAgentReview": {
|
||||
"ragStatus": result.legalAgentReview.ragStatus,
|
||||
"feedback": result.legalAgentReview.feedback,
|
||||
"issues": result.legalAgentReview.issues,
|
||||
"isFinancialPromotion": result.legalAgentReview.isFinancialPromotion,
|
||||
"financialPromotionReason": result.legalAgentReview.financialPromotionReason,
|
||||
"resolvedIssues": result.legalAgentReview.resolvedIssues,
|
||||
"outstandingIssues": result.legalAgentReview.outstandingIssues,
|
||||
"newIssues": result.legalAgentReview.newIssues,
|
||||
},
|
||||
"brandAgentReview": {
|
||||
"ragStatus": result.brandAgentReview.ragStatus,
|
||||
"feedback": result.brandAgentReview.feedback,
|
||||
"issues": result.brandAgentReview.issues,
|
||||
"resolvedIssues": result.brandAgentReview.resolvedIssues,
|
||||
"outstandingIssues": result.brandAgentReview.outstandingIssues,
|
||||
"newIssues": result.brandAgentReview.newIssues,
|
||||
},
|
||||
"channelBestPracticesAgentReview": {
|
||||
"ragStatus": result.channelBestPracticesAgentReview.ragStatus,
|
||||
"feedback": result.channelBestPracticesAgentReview.feedback,
|
||||
"issues": result.channelBestPracticesAgentReview.issues,
|
||||
"resolvedIssues": result.channelBestPracticesAgentReview.resolvedIssues,
|
||||
"outstandingIssues": result.channelBestPracticesAgentReview.outstandingIssues,
|
||||
"newIssues": result.channelBestPracticesAgentReview.newIssues,
|
||||
},
|
||||
"channelTechSpecsAgentReview": {
|
||||
"ragStatus": result.channelTechSpecsAgentReview.ragStatus,
|
||||
"feedback": result.channelTechSpecsAgentReview.feedback,
|
||||
"issues": result.channelTechSpecsAgentReview.issues,
|
||||
"resolvedIssues": result.channelTechSpecsAgentReview.resolvedIssues,
|
||||
"outstandingIssues": result.channelTechSpecsAgentReview.outstandingIssues,
|
||||
"newIssues": result.channelTechSpecsAgentReview.newIssues,
|
||||
},
|
||||
"leadAgentSummary": result.leadAgentSummary,
|
||||
"overallStatus": result.overallStatus,
|
||||
"financialPromotionReason": result.financialPromotionReason,
|
||||
}
|
||||
|
||||
# Persist to database if campaign info provided
|
||||
proof_id: Optional[str] = None
|
||||
version_id: Optional[str] = None
|
||||
|
||||
if campaign_id and proof_name:
|
||||
try:
|
||||
logger.info(f"[WEBSOCKET] Persisting result for campaign {campaign_id}, proof {proof_name}")
|
||||
async with async_session_factory() as session:
|
||||
proof_repo = ProofRepository(session)
|
||||
|
||||
# Store the file
|
||||
file_storage_key = await storage_service.store_file(
|
||||
file_data=file_data,
|
||||
campaign_id=uuid.UUID(campaign_id),
|
||||
proof_name=proof_name,
|
||||
version=1, # Will be updated by add_version_with_review
|
||||
file_type=file_type,
|
||||
)
|
||||
|
||||
# Generate thumbnail for images up to 10MB (data URLs are ~33% larger than binary)
|
||||
thumbnail_url = None
|
||||
if len(file_data) < 10000000:
|
||||
if file_type.startswith('image/'):
|
||||
thumbnail_url = await storage_service.generate_thumbnail_data_url(file_data, file_type)
|
||||
elif file_type == 'application/pdf' and pdf_pages:
|
||||
# Use first rasterized page as thumbnail
|
||||
first_page_data, _, _ = pdf_pages[0]
|
||||
thumbnail_url = await storage_service.generate_thumbnail_data_url(first_page_data, 'image/png')
|
||||
|
||||
# Save proof and version
|
||||
proof, version = await proof_repo.add_version_with_review(
|
||||
campaign_id=uuid.UUID(campaign_id),
|
||||
proof_name=proof_name,
|
||||
channel=data.get("channel"),
|
||||
sub_channel=data.get("sub_channel"),
|
||||
proof_type=data.get("proof_type"),
|
||||
file_storage_key=file_storage_key,
|
||||
thumbnail_url=thumbnail_url,
|
||||
agent_review=result_dict,
|
||||
overall_status=result.overallStatus,
|
||||
file_hash=file_hash,
|
||||
is_identical_file=is_identical_file,
|
||||
created_by=current_user_id,
|
||||
)
|
||||
|
||||
# Auto-create ErrorItem when analysis results in an error
|
||||
if result.overallStatus == "Analysis Error":
|
||||
audit_repo = AuditRepository(session)
|
||||
await audit_repo.create_error_item(
|
||||
proof_version_id=version.id,
|
||||
error_summary=result.leadAgentSummary,
|
||||
)
|
||||
logger.info(f"[WEBSOCKET] Created ErrorItem for analysis error on version {version.id}")
|
||||
|
||||
await session.commit()
|
||||
proof_id = str(proof.id)
|
||||
version_id = str(version.id)
|
||||
logger.info(f"[WEBSOCKET] Persisted proof {proof_id} version {version.version}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[WEBSOCKET] Failed to persist result: {str(e)}")
|
||||
# Continue - still send result to client even if persistence fails
|
||||
|
||||
# Send the complete result
|
||||
logger.info(f"[WEBSOCKET] Analysis complete, sending result - overallStatus: {result.overallStatus}")
|
||||
if manager.is_connected(client_id):
|
||||
response = {
|
||||
"type": "complete",
|
||||
"result": result_dict,
|
||||
"is_identical_file": is_identical_file,
|
||||
}
|
||||
# Include proof/version IDs if persisted
|
||||
if proof_id:
|
||||
response["proof_id"] = proof_id
|
||||
if version_id:
|
||||
response["version_id"] = version_id
|
||||
|
||||
# Include rasterized PDF pages if present
|
||||
if pdf_pages:
|
||||
import base64 as b64_module
|
||||
response["pdf_pages"] = [
|
||||
{
|
||||
"page": i + 1,
|
||||
"data_url": f"data:image/png;base64,{b64_module.b64encode(png_data).decode('utf-8')}",
|
||||
"width": width,
|
||||
"height": height,
|
||||
}
|
||||
for i, (png_data, width, height) in enumerate(pdf_pages)
|
||||
]
|
||||
logger.info(f"[WEBSOCKET] Including {len(pdf_pages)} rasterized PDF pages in response")
|
||||
|
||||
await manager.send_message(client_id, response)
|
||||
logger.info(f"[WEBSOCKET] Result sent to client: {client_id}")
|
||||
|
||||
except Exception as e:
|
||||
# Send error message
|
||||
logger.error(f"[WEBSOCKET] Analysis failed for client {client_id}: {str(e)}")
|
||||
if manager.is_connected(client_id):
|
||||
await manager.send_message(client_id, {
|
||||
"type": "error",
|
||||
"message": f"Analysis failed: {str(e)}"
|
||||
})
|
||||
|
|
@ -1,53 +0,0 @@
|
|||
import logging
|
||||
|
||||
from fastapi import WebSocket
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ConnectionManager:
    """Registry of live WebSocket connections, keyed by client id.

    Holds one WebSocket per ``client_id`` and offers helpers to register,
    forget, query and send JSON messages to a specific client.
    """

    def __init__(self):
        """Initialize the connection manager with no registered clients."""
        # Maps client_id -> the WebSocket accepted for that client.
        self.active_connections: dict[str, WebSocket] = {}

    async def connect(self, websocket: WebSocket, client_id: str) -> None:
        """
        Accept a new WebSocket connection and register it.

        Args:
            websocket: The WebSocket connection
            client_id: Unique identifier for this client; an existing entry
                under the same id is replaced
        """
        await websocket.accept()
        self.active_connections[client_id] = websocket

    def disconnect(self, client_id: str) -> None:
        """
        Remove a client connection from the registry.

        Unknown ids are ignored. The socket itself is not closed here.

        Args:
            client_id: The client to disconnect
        """
        self.active_connections.pop(client_id, None)

    async def send_message(self, client_id: str, message: dict) -> None:
        """
        Send a JSON message to a specific client.

        Does nothing when the client is not registered. A failed send is
        logged as a warning (never raised) and the failing connection is
        removed from the registry.

        Args:
            client_id: The target client
            message: Dictionary to send as JSON
        """
        websocket = self.active_connections.get(client_id)
        if websocket is None:
            return
        try:
            await websocket.send_json(message)
        except Exception as e:
            logger.warning(f"[MANAGER] Failed to send message to client {client_id}: {e}")
            # Evict only the socket that actually failed: the client may have
            # re-registered under the same id while the send was awaiting, and
            # that newer connection must not be dropped.
            if self.active_connections.get(client_id) is websocket:
                del self.active_connections[client_id]

    def is_connected(self, client_id: str) -> bool:
        """Check if a client is still registered as connected."""
        return client_id in self.active_connections
|
||||
|
|
@ -9,15 +9,12 @@ spec:
|
|||
template:
|
||||
metadata:
|
||||
annotations:
|
||||
# Keep 1 instance warm to prevent cold-start WebSocket failures
|
||||
# Keep 1 instance warm to prevent cold-start latency
|
||||
autoscaling.knative.dev/minScale: "1"
|
||||
autoscaling.knative.dev/maxScale: "10"
|
||||
# Each instance handles up to 10 concurrent analyses (one per WebSocket)
|
||||
# Each instance handles up to 10 concurrent analyses
|
||||
autoscaling.knative.dev/target: "10"
|
||||
# Required for WebSocket: disable HTTP/2 multiplexing
|
||||
run.googleapis.com/execution-environment: gen2
|
||||
# Required for WebSocket: route all frames from a client to the same instance
|
||||
run.googleapis.com/sessionAffinity: "true"
|
||||
spec:
|
||||
# 10-minute timeout — analysis (4 agents + lead agent) can take 2-3 minutes
|
||||
# for large multi-page PDFs; 600s gives headroom without being excessive
|
||||
|
|
|
|||
|
|
@ -56,7 +56,6 @@ require_var GEMINI_API_KEY
|
|||
require_var AZURE_TENANT_ID
|
||||
require_var AZURE_CLIENT_ID
|
||||
require_var VITE_BACKEND_URL
|
||||
require_var VITE_BACKEND_WS_URL
|
||||
require_var VITE_AZURE_REDIRECT_URI
|
||||
|
||||
# Set defaults
|
||||
|
|
@ -114,7 +113,6 @@ cat > frontend/.env.local << EOF
|
|||
# Edit .env.deploy instead and re-run deploy.sh
|
||||
VITE_BASE_PATH=${VITE_BASE_PATH}
|
||||
VITE_BACKEND_URL=${VITE_BACKEND_URL}
|
||||
VITE_BACKEND_WS_URL=${VITE_BACKEND_WS_URL}
|
||||
VITE_AZURE_CLIENT_ID=${AZURE_CLIENT_ID}
|
||||
VITE_AZURE_TENANT_ID=${AZURE_TENANT_ID}
|
||||
VITE_AZURE_REDIRECT_URI=${VITE_AZURE_REDIRECT_URI}
|
||||
|
|
|
|||
|
|
@ -2,10 +2,10 @@ import type { AgentReview, SubReview, AgentName, PDFPage } from '../types';
|
|||
import { IPublicClientApplication } from '@azure/msal-browser';
|
||||
import { getAccessToken } from './authService';
|
||||
|
||||
// WebSocket URL for backend communication
|
||||
const WS_URL = import.meta.env.VITE_BACKEND_WS_URL || 'ws://localhost:8000/ws/analyze';
|
||||
const HTTP_URL = import.meta.env.VITE_BACKEND_URL || 'http://localhost:8000';
|
||||
|
||||
const POLL_INTERVAL_MS = 2000;
|
||||
|
||||
/**
|
||||
* Options for proof analysis with optional database persistence.
|
||||
*/
|
||||
|
|
@ -30,11 +30,38 @@ export interface AnalyzeProofResult {
|
|||
isIdenticalFile?: boolean;
|
||||
}
|
||||
|
||||
/**
 * Read a File and resolve with its contents as a base64 string
 * (the data URL with everything up to and including the first comma removed).
 *
 * Rejects with `Error('Failed to read file')` when the read errors, is
 * aborted, or does not produce a string result.
 */
async function fileToBase64(file: File): Promise<string> {
  return new Promise((resolve, reject) => {
    const reader = new FileReader();
    const fail = () => reject(new Error('Failed to read file'));

    // `load` fires only on success. `loadend` also fires after an error or
    // abort, when `result` is null, so resolving from it would throw.
    reader.onload = () => {
      const dataUrl = reader.result;
      if (typeof dataUrl !== 'string') {
        fail();
        return;
      }
      // A data URL without a comma has no payload; resolve with an empty
      // string rather than `undefined` to honour the declared return type.
      resolve(dataUrl.split(',')[1] ?? '');
    };
    reader.onerror = fail;
    // Without this an aborted read would leave the promise pending forever.
    reader.onabort = fail;
    reader.readAsDataURL(file);
  });
}
|
||||
|
||||
/** Return a promise that resolves (with no value) after `ms` milliseconds. */
const sleep = (ms: number): Promise<void> =>
  new Promise<void>((done) => {
    setTimeout(done, ms);
  });
|
||||
|
||||
/**
 * Authenticated fetch helper.
 *
 * Calls `fetch(url, init)` with two default headers added:
 * `Content-Type: application/json` and `Authorization: Bearer <accessToken>`.
 * Headers supplied by the caller in `init.headers` take precedence over
 * these defaults.
 *
 * @param url - Request URL.
 * @param init - Standard fetch options; `headers` may be a plain object,
 *   a `Headers` instance, or an array of `[name, value]` pairs.
 * @param accessToken - Bearer token placed in the Authorization header.
 * @returns The raw `Response`; HTTP status is not checked here.
 */
async function authFetch(
  url: string,
  init: RequestInit,
  accessToken: string,
): Promise<Response> {
  // Object-spreading `init.headers` only works for plain objects: a Headers
  // instance spreads to nothing and a tuple array spreads to index keys.
  // The Headers constructor normalises all three forms and treats header
  // names case-insensitively.
  const headers = new Headers(init.headers);
  if (!headers.has('Content-Type')) {
    headers.set('Content-Type', 'application/json');
  }
  if (!headers.has('Authorization')) {
    headers.set('Authorization', `Bearer ${accessToken}`);
  }
  return fetch(url, { ...init, headers });
}
|
||||
|
||||
/**
|
||||
* Analyze a proof using the backend WebSocket API.
|
||||
* Provides real-time updates as each agent completes.
|
||||
* Now requires MSAL instance to acquire access token.
|
||||
* Optionally pass campaign info to persist results to database.
|
||||
* Analyze a proof using the backend REST API.
|
||||
* Provides real-time updates as each agent completes via polling.
|
||||
*/
|
||||
export const analyzeProof = async (
|
||||
file: File,
|
||||
|
|
@ -43,149 +70,87 @@ export const analyzeProof = async (
|
|||
options?: AnalyzeProofOptions,
|
||||
onNotification?: (message: string) => void,
|
||||
): Promise<AnalyzeProofResult> => {
|
||||
// Acquire token before connecting
|
||||
const accessToken = await getAccessToken(msalInstance);
|
||||
if (!accessToken) {
|
||||
throw new Error('Failed to acquire access token. Please sign in again.');
|
||||
}
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
const ws = new WebSocket(WS_URL);
|
||||
let resolved = false;
|
||||
const fileData = await fileToBase64(file);
|
||||
|
||||
// Send client→server pings every 15s to keep proxy idle timers alive
|
||||
let pingInterval: ReturnType<typeof setInterval> | null = null;
|
||||
const startPing = () => {
|
||||
pingInterval = setInterval(() => {
|
||||
if (ws.readyState === WebSocket.OPEN) {
|
||||
ws.send(JSON.stringify({ type: 'ping' }));
|
||||
}
|
||||
}, 15000);
|
||||
};
|
||||
const stopPing = () => {
|
||||
if (pingInterval !== null) {
|
||||
clearInterval(pingInterval);
|
||||
pingInterval = null;
|
||||
// Submit the analysis job
|
||||
const submitRes = await authFetch(
|
||||
`${HTTP_URL}/api/analyze`,
|
||||
{
|
||||
method: 'POST',
|
||||
body: JSON.stringify({
|
||||
file_data: fileData,
|
||||
file_type: file.type,
|
||||
is_wip: false,
|
||||
campaign_id: options?.campaignId,
|
||||
proof_name: options?.proofName,
|
||||
channel: options?.channel,
|
||||
sub_channel: options?.subChannel,
|
||||
proof_type: options?.proofType,
|
||||
brand: options?.brand ?? 'Barclaycard',
|
||||
}),
|
||||
},
|
||||
accessToken,
|
||||
);
|
||||
|
||||
if (!submitRes.ok) {
|
||||
const err = await submitRes.text();
|
||||
throw new Error(`Failed to submit analysis: ${submitRes.status} ${err}`);
|
||||
}
|
||||
|
||||
const { job_id } = await submitRes.json();
|
||||
|
||||
// Poll until complete
|
||||
const seenAgents = new Set<string>();
|
||||
let notifiedFallback = false;
|
||||
|
||||
while (true) {
|
||||
await sleep(POLL_INTERVAL_MS);
|
||||
|
||||
const pollRes = await authFetch(
|
||||
`${HTTP_URL}/api/analyze/${job_id}`,
|
||||
{ method: 'GET' },
|
||||
accessToken,
|
||||
);
|
||||
|
||||
if (!pollRes.ok) {
|
||||
throw new Error(`Polling failed: ${pollRes.status}`);
|
||||
}
|
||||
|
||||
const job = await pollRes.json();
|
||||
|
||||
// Notify about model fallback once
|
||||
if (job.model_fallback && !notifiedFallback) {
|
||||
notifiedFallback = true;
|
||||
onNotification?.('The primary AI model is currently unavailable. Analysis is continuing with the backup model and may take longer than usual.');
|
||||
}
|
||||
|
||||
// Fire callbacks for newly completed agents
|
||||
for (const agentName of Object.keys(job.agents_completed ?? {})) {
|
||||
if (!seenAgents.has(agentName)) {
|
||||
seenAgents.add(agentName);
|
||||
onAgentUpdate(agentName as AgentName, job.agents_completed[agentName] as SubReview);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
ws.onopen = () => {
|
||||
startPing();
|
||||
// Convert file to base64 and send
|
||||
const reader = new FileReader();
|
||||
reader.onloadend = () => {
|
||||
const base64Data = (reader.result as string).split(',')[1];
|
||||
const message: Record<string, any> = {
|
||||
type: 'analyze',
|
||||
file_data: base64Data,
|
||||
file_type: file.type,
|
||||
is_wip: false,
|
||||
access_token: accessToken
|
||||
};
|
||||
|
||||
// Include campaign info for database persistence if provided
|
||||
if (options?.campaignId) {
|
||||
message.campaign_id = options.campaignId;
|
||||
}
|
||||
if (options?.proofName) {
|
||||
message.proof_name = options.proofName;
|
||||
}
|
||||
if (options?.channel) {
|
||||
message.channel = options.channel;
|
||||
}
|
||||
if (options?.subChannel) {
|
||||
message.sub_channel = options.subChannel;
|
||||
}
|
||||
if (options?.proofType) {
|
||||
message.proof_type = options.proofType;
|
||||
}
|
||||
if (options?.brand) {
|
||||
message.brand = options.brand;
|
||||
}
|
||||
|
||||
ws.send(JSON.stringify(message));
|
||||
if (job.status === 'complete') {
|
||||
return {
|
||||
review: job.result as AgentReview,
|
||||
proofId: job.proof_id ?? undefined,
|
||||
versionId: job.version_id ?? undefined,
|
||||
pdfPages: job.pdf_pages as PDFPage[] | undefined,
|
||||
isIdenticalFile: job.is_identical_file ?? undefined,
|
||||
};
|
||||
reader.onerror = () => {
|
||||
ws.close();
|
||||
reject(new Error('Failed to read file'));
|
||||
};
|
||||
reader.readAsDataURL(file);
|
||||
};
|
||||
}
|
||||
|
||||
ws.onmessage = (event) => {
|
||||
try {
|
||||
const message = JSON.parse(event.data);
|
||||
|
||||
switch (message.type) {
|
||||
case 'agent_started':
|
||||
// Agent is starting - can optionally show loading state
|
||||
break;
|
||||
|
||||
case 'agent_completed':
|
||||
// Agent completed - update UI
|
||||
onAgentUpdate(message.agent_name as AgentName, message.review);
|
||||
break;
|
||||
|
||||
case 'summary':
|
||||
// Summary ready
|
||||
onAgentUpdate('Summary');
|
||||
break;
|
||||
|
||||
case 'pong':
|
||||
// Response to our ping — ignore
|
||||
break;
|
||||
|
||||
case 'complete':
|
||||
// Analysis complete - resolve with full result
|
||||
resolved = true;
|
||||
stopPing();
|
||||
ws.close();
|
||||
resolve({
|
||||
review: message.result as AgentReview,
|
||||
proofId: message.proof_id,
|
||||
versionId: message.version_id,
|
||||
pdfPages: message.pdf_pages as PDFPage[] | undefined,
|
||||
isIdenticalFile: message.is_identical_file as boolean | undefined,
|
||||
});
|
||||
break;
|
||||
|
||||
case 'heartbeat':
|
||||
// Server keepalive — ignore silently
|
||||
break;
|
||||
|
||||
case 'model_fallback':
|
||||
onNotification?.('The primary AI model is currently unavailable. Analysis is continuing with the backup model and may take longer than usual.');
|
||||
break;
|
||||
|
||||
case 'error':
|
||||
// Error occurred
|
||||
resolved = true;
|
||||
stopPing();
|
||||
ws.close();
|
||||
reject(new Error(message.message || 'Analysis failed'));
|
||||
break;
|
||||
}
|
||||
} catch (e) {
|
||||
console.error('Failed to parse WebSocket message:', e);
|
||||
}
|
||||
};
|
||||
|
||||
ws.onerror = () => {
|
||||
stopPing();
|
||||
if (!resolved) {
|
||||
resolved = true;
|
||||
reject(new Error('WebSocket connection error. Is the backend running?'));
|
||||
}
|
||||
};
|
||||
|
||||
ws.onclose = (event) => {
|
||||
stopPing();
|
||||
if (!resolved && !event.wasClean) {
|
||||
resolved = true;
|
||||
reject(new Error('WebSocket connection closed unexpectedly'));
|
||||
}
|
||||
};
|
||||
});
|
||||
if (job.status === 'error') {
|
||||
throw new Error(job.error_message || 'Analysis failed');
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
|
|
@ -197,98 +162,73 @@ export const analyzeWIPProof = async (
|
|||
onAgentUpdate: (name: AgentName | 'Summary', review?: SubReview) => void,
|
||||
msalInstance: IPublicClientApplication
|
||||
): Promise<string> => {
|
||||
// Acquire token before connecting
|
||||
const accessToken = await getAccessToken(msalInstance);
|
||||
if (!accessToken) {
|
||||
throw new Error('Failed to acquire access token. Please sign in again.');
|
||||
}
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
const ws = new WebSocket(WS_URL);
|
||||
let resolved = false;
|
||||
const fileData = await fileToBase64(file);
|
||||
|
||||
let wipPingInterval: ReturnType<typeof setInterval> | null = null;
|
||||
const startWipPing = () => {
|
||||
wipPingInterval = setInterval(() => {
|
||||
if (ws.readyState === WebSocket.OPEN) {
|
||||
ws.send(JSON.stringify({ type: 'ping' }));
|
||||
}
|
||||
}, 15000);
|
||||
};
|
||||
const stopWipPing = () => {
|
||||
if (wipPingInterval !== null) {
|
||||
clearInterval(wipPingInterval);
|
||||
wipPingInterval = null;
|
||||
// Submit the analysis job
|
||||
const submitRes = await authFetch(
|
||||
`${HTTP_URL}/api/analyze`,
|
||||
{
|
||||
method: 'POST',
|
||||
body: JSON.stringify({
|
||||
file_data: fileData,
|
||||
file_type: file.type,
|
||||
is_wip: true,
|
||||
}),
|
||||
},
|
||||
accessToken,
|
||||
);
|
||||
|
||||
if (!submitRes.ok) {
|
||||
const err = await submitRes.text();
|
||||
throw new Error(`Failed to submit WIP analysis: ${submitRes.status} ${err}`);
|
||||
}
|
||||
|
||||
const { job_id } = await submitRes.json();
|
||||
|
||||
// Poll until complete
|
||||
const seenAgents = new Set<string>();
|
||||
|
||||
while (true) {
|
||||
await sleep(POLL_INTERVAL_MS);
|
||||
|
||||
const pollRes = await authFetch(
|
||||
`${HTTP_URL}/api/analyze/${job_id}`,
|
||||
{ method: 'GET' },
|
||||
accessToken,
|
||||
);
|
||||
|
||||
if (!pollRes.ok) {
|
||||
throw new Error(`Polling failed: ${pollRes.status}`);
|
||||
}
|
||||
|
||||
const job = await pollRes.json();
|
||||
|
||||
// Fire callbacks for newly completed agents
|
||||
for (const agentName of Object.keys(job.agents_completed ?? {})) {
|
||||
if (!seenAgents.has(agentName)) {
|
||||
seenAgents.add(agentName);
|
||||
onAgentUpdate(agentName as AgentName, job.agents_completed[agentName] as SubReview);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
ws.onopen = () => {
|
||||
startWipPing();
|
||||
const reader = new FileReader();
|
||||
reader.onloadend = () => {
|
||||
const base64Data = (reader.result as string).split(',')[1];
|
||||
ws.send(JSON.stringify({
|
||||
type: 'analyze',
|
||||
file_data: base64Data,
|
||||
file_type: file.type,
|
||||
is_wip: true,
|
||||
access_token: accessToken
|
||||
}));
|
||||
};
|
||||
reader.onerror = () => {
|
||||
ws.close();
|
||||
reject(new Error('Failed to read file'));
|
||||
};
|
||||
reader.readAsDataURL(file);
|
||||
};
|
||||
if (job.status === 'complete') {
|
||||
return job.result?.leadAgentSummary || 'Analysis complete.';
|
||||
}
|
||||
|
||||
ws.onmessage = (event) => {
|
||||
try {
|
||||
const message = JSON.parse(event.data);
|
||||
|
||||
if (message.type === 'heartbeat' || message.type === 'pong') {
|
||||
// Keepalive — ignore silently
|
||||
} else if (message.type === 'agent_completed') {
|
||||
onAgentUpdate(message.agent_name as AgentName, message.review);
|
||||
} else if (message.type === 'summary') {
|
||||
onAgentUpdate('Summary');
|
||||
} else if (message.type === 'complete') {
|
||||
resolved = true;
|
||||
stopWipPing();
|
||||
ws.close();
|
||||
resolve(message.result?.leadAgentSummary || 'Analysis complete.');
|
||||
} else if (message.type === 'error') {
|
||||
resolved = true;
|
||||
stopWipPing();
|
||||
ws.close();
|
||||
reject(new Error(message.message || 'Analysis failed'));
|
||||
}
|
||||
} catch (e) {
|
||||
console.error('Failed to parse WebSocket message:', e);
|
||||
}
|
||||
};
|
||||
|
||||
ws.onerror = () => {
|
||||
stopWipPing();
|
||||
if (!resolved) {
|
||||
resolved = true;
|
||||
reject(new Error('WebSocket connection error'));
|
||||
}
|
||||
};
|
||||
|
||||
ws.onclose = (event) => {
|
||||
stopWipPing();
|
||||
if (!resolved && !event.wasClean) {
|
||||
resolved = true;
|
||||
reject(new Error('Connection closed unexpectedly'));
|
||||
}
|
||||
};
|
||||
});
|
||||
if (job.status === 'error') {
|
||||
throw new Error(job.error_message || 'Analysis failed');
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Get a chat response from the WIP Lead Agent.
|
||||
* Uses HTTP REST endpoint (not WebSocket).
|
||||
* Uses HTTP REST endpoint.
|
||||
*/
|
||||
export const getWIPChatResponse = async (prompt: string): Promise<string> => {
|
||||
try {
|
||||
|
|
@ -306,7 +246,6 @@ export const getWIPChatResponse = async (prompt: string): Promise<string> => {
|
|||
return data.response || data.message || 'No response from Lead Agent.';
|
||||
} catch (error) {
|
||||
console.error('Error getting WIP chat response:', error);
|
||||
// Fallback message when backend chat endpoint is not available
|
||||
return "I'm sorry, the chat feature requires the backend chat endpoint to be implemented. For now, please use the proof analysis feature.";
|
||||
}
|
||||
};
|
||||
|
|
|
|||
|
|
@ -14,8 +14,7 @@ export default defineConfig(({ mode }) => {
|
|||
// Legacy - keep for backward compatibility during transition
|
||||
'process.env.API_KEY': JSON.stringify(env.GEMINI_API_KEY),
|
||||
'process.env.GEMINI_API_KEY': JSON.stringify(env.GEMINI_API_KEY),
|
||||
// Backend URLs for WebSocket and HTTP communication
|
||||
'process.env.VITE_BACKEND_WS_URL': JSON.stringify(env.VITE_BACKEND_WS_URL || 'ws://localhost:8000/ws/analyze'),
|
||||
// Backend HTTP URL
|
||||
'process.env.VITE_BACKEND_URL': JSON.stringify(env.VITE_BACKEND_URL || 'http://localhost:8000'),
|
||||
},
|
||||
base: env.VITE_BASE_PATH || '/',
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue