Fix WebSocket connection dropped during long proof analysis
- Add 25s heartbeat ping from backend to prevent Apache/proxy idle-timeout killing the connection during 1-3 min analysis runs - Handle heartbeat silently in both analyzeProof and analyzeWIPProof frontend handlers - Run PDF rasterization via asyncio.to_thread so heartbeats aren't blocked - Wrap analyze_proof with asyncio.wait_for(timeout=300) for a hard 5-min cap - Log dropped send_message calls in ConnectionManager instead of swallowing silently - cloudrun.yaml: add sessionAffinity, startup probe, raise containerConcurrency 4→10, document DISABLE_AUTH option Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
ef1e4adabd
commit
5c338c31fb
6 changed files with 83 additions and 29 deletions
|
|
@ -1,3 +1,4 @@
|
|||
import asyncio
|
||||
import logging
|
||||
import uuid
|
||||
from contextlib import asynccontextmanager
|
||||
|
|
@ -213,15 +214,32 @@ async def websocket_analyze(websocket: WebSocket):
|
|||
})
|
||||
continue
|
||||
|
||||
# Handle the analysis request
|
||||
await handle_analyze_message(
|
||||
websocket=websocket,
|
||||
client_id=client_id,
|
||||
data=data,
|
||||
manager=manager,
|
||||
analysis_service=analysis_service,
|
||||
current_user_id=current_user_id,
|
||||
)
|
||||
# Start keepalive heartbeat to prevent proxy idle-timeout
|
||||
# (Apache/nginx/Cloud Run default is 60s; we ping every 25s)
|
||||
async def _heartbeat(ws: WebSocket) -> None:
|
||||
try:
|
||||
while True:
|
||||
await asyncio.sleep(25)
|
||||
await ws.send_json({"type": "heartbeat"})
|
||||
logger.debug(f"[MAIN] Heartbeat sent to client {client_id}")
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
except Exception as hb_err:
|
||||
logger.debug(f"[MAIN] Heartbeat stopped for client {client_id}: {hb_err}")
|
||||
|
||||
heartbeat_task = asyncio.create_task(_heartbeat(websocket))
|
||||
try:
|
||||
# Handle the analysis request
|
||||
await handle_analyze_message(
|
||||
websocket=websocket,
|
||||
client_id=client_id,
|
||||
data=data,
|
||||
manager=manager,
|
||||
analysis_service=analysis_service,
|
||||
current_user_id=current_user_id,
|
||||
)
|
||||
finally:
|
||||
heartbeat_task.cancel()
|
||||
else:
|
||||
logger.warning(f"[MAIN] Unknown message type: {data.get('type')}")
|
||||
await manager.send_message(client_id, {
|
||||
|
|
|
|||
|
|
@ -174,7 +174,7 @@ class AnalysisService:
|
|||
# Rasterize PDF to PNG images
|
||||
logger.info("[ANALYSIS] Detected PDF, rasterizing pages...")
|
||||
try:
|
||||
pdf_pages = pdf_service.rasterize(file_data, max_pages=10)
|
||||
pdf_pages = await asyncio.to_thread(pdf_service.rasterize, file_data, 10)
|
||||
images = [(png_data, "image/png") for png_data, _, _ in pdf_pages]
|
||||
logger.info(f"[ANALYSIS] Rasterized {len(images)} PDF pages")
|
||||
except ValueError as e:
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
import asyncio
|
||||
import base64
|
||||
import logging
|
||||
import uuid
|
||||
|
|
@ -148,20 +149,31 @@ async def handle_analyze_message(
|
|||
else:
|
||||
logger.warning(f"[WEBSOCKET] Cannot send model_fallback - client {client_id} not connected")
|
||||
|
||||
# Run the analysis
|
||||
# Run the analysis (5-minute hard timeout)
|
||||
logger.info("[WEBSOCKET] Starting analysis...")
|
||||
result, pdf_pages = await analysis_service.analyze_proof(
|
||||
file_data=file_data,
|
||||
file_type=file_type,
|
||||
on_agent_update=on_agent_update,
|
||||
is_wip=is_wip,
|
||||
brand=brand,
|
||||
previous_analysis=previous_analysis,
|
||||
channel=channel,
|
||||
sub_channel=sub_channel,
|
||||
proof_type=proof_type,
|
||||
on_fallback=on_model_fallback,
|
||||
)
|
||||
try:
|
||||
result, pdf_pages = await asyncio.wait_for(
|
||||
analysis_service.analyze_proof(
|
||||
file_data=file_data,
|
||||
file_type=file_type,
|
||||
on_agent_update=on_agent_update,
|
||||
is_wip=is_wip,
|
||||
brand=brand,
|
||||
previous_analysis=previous_analysis,
|
||||
channel=channel,
|
||||
sub_channel=sub_channel,
|
||||
proof_type=proof_type,
|
||||
on_fallback=on_model_fallback,
|
||||
),
|
||||
timeout=300.0,
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
logger.error(f"[WEBSOCKET] Analysis timed out for client {client_id}")
|
||||
await manager.send_message(client_id, {
|
||||
"type": "error",
|
||||
"message": "Analysis timed out after 5 minutes. Please try again with a smaller file."
|
||||
})
|
||||
return
|
||||
|
||||
# Build the result dict
|
||||
result_dict = {
|
||||
|
|
|
|||
|
|
@ -1,5 +1,9 @@
|
|||
import logging
|
||||
|
||||
from fastapi import WebSocket
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ConnectionManager:
|
||||
"""Manages WebSocket connections for real-time updates."""
|
||||
|
|
@ -40,8 +44,8 @@ class ConnectionManager:
|
|||
if client_id in self.active_connections:
|
||||
try:
|
||||
await self.active_connections[client_id].send_json(message)
|
||||
except Exception:
|
||||
# Client may have disconnected
|
||||
except Exception as e:
|
||||
logger.warning(f"[MANAGER] Failed to send message to client {client_id}: {e}")
|
||||
self.disconnect(client_id)
|
||||
|
||||
def is_connected(self, client_id: str) -> bool:
|
||||
|
|
|
|||
|
|
@ -12,19 +12,29 @@ spec:
|
|||
# Keep 1 instance warm to prevent cold-start WebSocket failures
|
||||
autoscaling.knative.dev/minScale: "1"
|
||||
autoscaling.knative.dev/maxScale: "10"
|
||||
# Each instance handles up to 4 concurrent analyses (one per WebSocket)
|
||||
autoscaling.knative.dev/target: "4"
|
||||
# Each instance handles up to 10 concurrent analyses (one per WebSocket)
|
||||
autoscaling.knative.dev/target: "10"
|
||||
# Required for WebSocket: disable HTTP/2 multiplexing
|
||||
run.googleapis.com/execution-environment: gen2
|
||||
# Required for WebSocket: route all frames from a client to the same instance
|
||||
run.googleapis.com/sessionAffinity: "true"
|
||||
spec:
|
||||
# 10-minute timeout — analysis (4 agents + lead agent) can take 2-3 minutes
|
||||
# for large multi-page PDFs; 600s gives headroom without being excessive
|
||||
timeoutSeconds: 600
|
||||
containerConcurrency: 4
|
||||
# Gemini API calls are I/O-bound; 10 concurrent slots prevents queuing at low traffic
|
||||
containerConcurrency: 10
|
||||
containers:
|
||||
- image: gcr.io/YOUR_PROJECT_ID/modcomms-backend:latest
|
||||
ports:
|
||||
- containerPort: 8000
|
||||
startupProbe:
|
||||
httpGet:
|
||||
path: /health
|
||||
port: 8000
|
||||
initialDelaySeconds: 5
|
||||
periodSeconds: 5
|
||||
failureThreshold: 10
|
||||
resources:
|
||||
limits:
|
||||
# 2 vCPU + 4Gi RAM: handles PDF rasterisation and parallel agent calls
|
||||
|
|
@ -61,3 +71,7 @@ spec:
|
|||
value: "0.0.0.0"
|
||||
- name: PORT
|
||||
value: "8000"
|
||||
# ── Dev/staging only ──────────────────────────────────────────────
|
||||
# Uncomment to disable Azure AD auth (e.g. staging environment):
|
||||
# - name: DISABLE_AUTH
|
||||
# value: "true"
|
||||
|
|
|
|||
|
|
@ -127,6 +127,10 @@ export const analyzeProof = async (
|
|||
});
|
||||
break;
|
||||
|
||||
case 'heartbeat':
|
||||
// Server keepalive — ignore silently
|
||||
break;
|
||||
|
||||
case 'model_fallback':
|
||||
onNotification?.('The primary AI model is currently unavailable. Analysis is continuing with the backup model and may take longer than usual.');
|
||||
break;
|
||||
|
|
@ -201,7 +205,9 @@ export const analyzeWIPProof = async (
|
|||
try {
|
||||
const message = JSON.parse(event.data);
|
||||
|
||||
if (message.type === 'agent_completed') {
|
||||
if (message.type === 'heartbeat') {
|
||||
// Server keepalive — ignore silently
|
||||
} else if (message.type === 'agent_completed') {
|
||||
onAgentUpdate(message.agent_name as AgentName, message.review);
|
||||
} else if (message.type === 'summary') {
|
||||
onAgentUpdate('Summary');
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue