diff --git a/backend/app/main.py b/backend/app/main.py index d1485fe..12103e0 100755 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -1,3 +1,4 @@ +import asyncio import logging import uuid from contextlib import asynccontextmanager @@ -213,15 +214,32 @@ async def websocket_analyze(websocket: WebSocket): }) continue - # Handle the analysis request - await handle_analyze_message( - websocket=websocket, - client_id=client_id, - data=data, - manager=manager, - analysis_service=analysis_service, - current_user_id=current_user_id, - ) + # Start keepalive heartbeat to prevent proxy idle-timeout + # (Apache/nginx/Cloud Run default is 60s; we ping every 25s) + async def _heartbeat(ws: WebSocket) -> None: + try: + while True: + await asyncio.sleep(25) + await ws.send_json({"type": "heartbeat"}) + logger.debug(f"[MAIN] Heartbeat sent to client {client_id}") + except asyncio.CancelledError: + pass + except Exception as hb_err: + logger.debug(f"[MAIN] Heartbeat stopped for client {client_id}: {hb_err}") + + heartbeat_task = asyncio.create_task(_heartbeat(websocket)) + try: + # Handle the analysis request + await handle_analyze_message( + websocket=websocket, + client_id=client_id, + data=data, + manager=manager, + analysis_service=analysis_service, + current_user_id=current_user_id, + ) + finally: + heartbeat_task.cancel() else: logger.warning(f"[MAIN] Unknown message type: {data.get('type')}") await manager.send_message(client_id, { diff --git a/backend/app/services/analysis_service.py b/backend/app/services/analysis_service.py index a922711..8134872 100755 --- a/backend/app/services/analysis_service.py +++ b/backend/app/services/analysis_service.py @@ -174,7 +174,7 @@ class AnalysisService: # Rasterize PDF to PNG images logger.info("[ANALYSIS] Detected PDF, rasterizing pages...") try: - pdf_pages = pdf_service.rasterize(file_data, max_pages=10) + pdf_pages = await asyncio.to_thread(pdf_service.rasterize, file_data, 10) images = [(png_data, "image/png") for png_data, _, _ in pdf_pages] logger.info(f"[ANALYSIS] Rasterized {len(images)} PDF pages") except ValueError as e: diff --git a/backend/app/websocket/handlers.py b/backend/app/websocket/handlers.py index 842a841..b2b016a 100755 --- a/backend/app/websocket/handlers.py +++ b/backend/app/websocket/handlers.py @@ -1,3 +1,4 @@ +import asyncio import base64 import logging import uuid @@ -148,20 +149,31 @@ async def handle_analyze_message( else: logger.warning(f"[WEBSOCKET] Cannot send model_fallback - client {client_id} not connected") - # Run the analysis + # Run the analysis (5-minute hard timeout) logger.info("[WEBSOCKET] Starting analysis...") - result, pdf_pages = await analysis_service.analyze_proof( - file_data=file_data, - file_type=file_type, - on_agent_update=on_agent_update, - is_wip=is_wip, - brand=brand, - previous_analysis=previous_analysis, - channel=channel, - sub_channel=sub_channel, - proof_type=proof_type, - on_fallback=on_model_fallback, - ) + try: + result, pdf_pages = await asyncio.wait_for( + analysis_service.analyze_proof( + file_data=file_data, + file_type=file_type, + on_agent_update=on_agent_update, + is_wip=is_wip, + brand=brand, + previous_analysis=previous_analysis, + channel=channel, + sub_channel=sub_channel, + proof_type=proof_type, + on_fallback=on_model_fallback, + ), + timeout=300.0, + ) + except asyncio.TimeoutError: + logger.error(f"[WEBSOCKET] Analysis timed out for client {client_id}") + await manager.send_message(client_id, { + "type": "error", + "message": "Analysis timed out after 5 minutes. Please try again with a smaller file." + }) + return # Build the result dict result_dict = { diff --git a/backend/app/websocket/manager.py b/backend/app/websocket/manager.py index 18190b5..e44e41a 100755 --- a/backend/app/websocket/manager.py +++ b/backend/app/websocket/manager.py @@ -1,5 +1,9 @@ +import logging + from fastapi import WebSocket +logger = logging.getLogger(__name__) + class ConnectionManager: """Manages WebSocket connections for real-time updates.""" @@ -40,8 +44,8 @@ class ConnectionManager: if client_id in self.active_connections: try: await self.active_connections[client_id].send_json(message) - except Exception: - # Client may have disconnected + except Exception as e: + logger.warning(f"[MANAGER] Failed to send message to client {client_id}: {e}") self.disconnect(client_id) def is_connected(self, client_id: str) -> bool: diff --git a/cloudrun.yaml b/cloudrun.yaml index e45303f..74c329e 100644 --- a/cloudrun.yaml +++ b/cloudrun.yaml @@ -12,19 +12,29 @@ spec: # Keep 1 instance warm to prevent cold-start WebSocket failures autoscaling.knative.dev/minScale: "1" autoscaling.knative.dev/maxScale: "10" - # Each instance handles up to 4 concurrent analyses (one per WebSocket) - autoscaling.knative.dev/target: "4" + # Each instance handles up to 10 concurrent analyses (one per WebSocket) + autoscaling.knative.dev/target: "10" # Required for WebSocket: disable HTTP/2 multiplexing run.googleapis.com/execution-environment: gen2 + # Required for WebSocket: route all frames from a client to the same instance + run.googleapis.com/sessionAffinity: "true" spec: # 10-minute timeout — analysis (4 agents + lead agent) can take 2-3 minutes # for large multi-page PDFs; 600s gives headroom without being excessive timeoutSeconds: 600 - containerConcurrency: 4 + # Gemini API calls are I/O-bound; 10 concurrent slots prevents queuing at low traffic + containerConcurrency: 10 containers: - image: gcr.io/YOUR_PROJECT_ID/modcomms-backend:latest ports: - containerPort: 8000 + startupProbe: + httpGet: + path: /health + port: 8000 + initialDelaySeconds: 5 + periodSeconds: 5 + failureThreshold: 10 resources: limits: # 2 vCPU + 4Gi RAM: handles PDF rasterisation and parallel agent calls @@ -61,3 +71,7 @@ spec: value: "0.0.0.0" - name: PORT value: "8000" + # ── Dev/staging only ────────────────────────────────────────────── + # Uncomment to disable Azure AD auth (e.g. staging environment): + # - name: DISABLE_AUTH + # value: "true" diff --git a/frontend/services/geminiService.ts b/frontend/services/geminiService.ts index 77d35f5..5b65cb8 100755 --- a/frontend/services/geminiService.ts +++ b/frontend/services/geminiService.ts @@ -127,6 +127,10 @@ export const analyzeProof = async ( }); break; + case 'heartbeat': + // Server keepalive — ignore silently + break; + case 'model_fallback': onNotification?.('The primary AI model is currently unavailable. Analysis is continuing with the backup model and may take longer than usual.'); break; @@ -201,7 +205,9 @@ export const analyzeWIPProof = async ( try { const message = JSON.parse(event.data); - if (message.type === 'agent_completed') { + if (message.type === 'heartbeat') { + // Server keepalive — ignore silently + } else if (message.type === 'agent_completed') { onAgentUpdate(message.agent_name as AgentName, message.review); } else if (message.type === 'summary') { onAgentUpdate('Summary');