modcomms/backend/app/services/job_store.py
Vadym Samoilenko a6fc149788 Replace WebSocket with REST polling to fix GCP LB 30s timeout
POST /api/analyze submits an analysis job and returns job_id instantly.
GET /api/analyze/{job_id} returns progress + result; frontend polls every 2s.

Analysis runs in the background as an asyncio.create_task task, so each
HTTP request completes in milliseconds, well within the 30s GCP Load
Balancer limit.
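
The submit-then-poll flow described above can be sketched with asyncio alone. This is a minimal illustration, not the code in analysis_routes.py: the names jobs, background_tasks, run_analysis, submit and poll are hypothetical, the sleep stands in for the real pipeline, and the poll interval is shortened from 2s so the sketch finishes quickly.

```python
import asyncio
import uuid

# Hypothetical in-memory store; the real one lives in job_store.py.
jobs: dict[str, dict] = {}
# Keep references so background tasks are not garbage-collected mid-run.
background_tasks: set[asyncio.Task] = set()


async def run_analysis(job_id: str) -> None:
    """Stand-in for the analysis pipeline; updates the job in place."""
    job = jobs[job_id]
    job["status"] = "running"
    try:
        await asyncio.sleep(0.05)  # placeholder for the real work
        job.update(status="complete", result={"ok": True})
    except Exception as exc:
        # Record the failure so pollers stop waiting.
        job.update(status="error", error_message=str(exc))


async def submit() -> str:
    """What a POST handler might do: schedule the work and return at once."""
    job_id = str(uuid.uuid4())
    jobs[job_id] = {"status": "pending", "result": None}
    task = asyncio.create_task(run_analysis(job_id))
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
    return job_id


async def poll(job_id: str, interval: float = 0.01) -> dict:
    """What the client might do: re-read the job until it finishes."""
    while jobs[job_id]["status"] not in ("complete", "error"):
        await asyncio.sleep(interval)
    return jobs[job_id]


async def main() -> dict:
    job_id = await submit()
    return await poll(job_id)


final_job = asyncio.run(main())
```

Holding a reference to each task matters because the event loop keeps only weak references to tasks, so an unreferenced background task may be collected before it completes.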

- Add backend/app/services/job_store.py: in-memory AnalysisJob store with
  30-min TTL cleanup
- Add backend/app/api/analysis_routes.py: POST + GET /api/analyze endpoints
  with full analysis pipeline (hash check, DB persistence, PDF pages, etc.)
- Remove backend/app/websocket/: handlers.py, manager.py, __init__.py
- Update backend/app/main.py: wire analysis_router, store analysis_service
  in app.state, drop all WebSocket imports and endpoint
- Update frontend/services/geminiService.ts: replace WS with fetch+poll;
  function signatures unchanged so App.tsx / WIPReviewer.tsx need no edits
- Remove VITE_BACKEND_WS_URL from vite.config.ts, deploy.sh, .env.deploy.example
- Update cloudrun.yaml: remove WebSocket-specific session affinity annotation

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-18 15:26:01 +00:00


import time
import uuid
import logging
from dataclasses import dataclass, field

logger = logging.getLogger(__name__)

# In-memory job store; safe for a single Cloud Run instance
_jobs: dict[str, "AnalysisJob"] = {}

JOB_TTL_SECONDS = 1800  # 30 minutes


@dataclass
class AnalysisJob:
    job_id: str
    status: str  # "pending" | "running" | "complete" | "error"
    created_at: float
    agents_started: list[str] = field(default_factory=list)
    agents_completed: dict[str, dict] = field(default_factory=dict)
    model_fallback: bool = False
    result: dict | None = None
    proof_id: str | None = None
    version_id: str | None = None
    pdf_pages: list[dict] | None = None
    is_identical_file: bool = False
    error_message: str | None = None


def _cleanup_old_jobs() -> None:
    """Remove jobs older than JOB_TTL_SECONDS."""
    cutoff = time.time() - JOB_TTL_SECONDS
    stale = [jid for jid, job in _jobs.items() if job.created_at < cutoff]
    for jid in stale:
        del _jobs[jid]
    if stale:
        logger.info(f"[JOB_STORE] Cleaned up {len(stale)} stale jobs")


def create_job() -> AnalysisJob:
    """Purge expired jobs, then register and return a new pending job."""
    _cleanup_old_jobs()
    job_id = str(uuid.uuid4())
    job = AnalysisJob(job_id=job_id, status="pending", created_at=time.time())
    _jobs[job_id] = job
    logger.info(f"[JOB_STORE] Created job {job_id}")
    return job


def get_job(job_id: str) -> AnalysisJob | None:
    """Return the job with this ID, or None if it is unknown or was purged."""
    return _jobs.get(job_id)