diff --git a/backend/app/__pycache__/main.cpython-313.pyc b/backend/app/__pycache__/main.cpython-313.pyc index 33c65c8..00a4d44 100644 Binary files a/backend/app/__pycache__/main.cpython-313.pyc and b/backend/app/__pycache__/main.cpython-313.pyc differ diff --git a/backend/app/api/v1/__pycache__/routes_jobs.cpython-313.pyc b/backend/app/api/v1/__pycache__/routes_jobs.cpython-313.pyc index 070f4c9..da8c9f4 100644 Binary files a/backend/app/api/v1/__pycache__/routes_jobs.cpython-313.pyc and b/backend/app/api/v1/__pycache__/routes_jobs.cpython-313.pyc differ diff --git a/backend/app/api/v1/__pycache__/routes_websockets.cpython-313.pyc b/backend/app/api/v1/__pycache__/routes_websockets.cpython-313.pyc new file mode 100644 index 0000000..8bcdd45 Binary files /dev/null and b/backend/app/api/v1/__pycache__/routes_websockets.cpython-313.pyc differ diff --git a/backend/app/api/v1/routes_jobs.py b/backend/app/api/v1/routes_jobs.py index 457079f..26d943e 100644 --- a/backend/app/api/v1/routes_jobs.py +++ b/backend/app/api/v1/routes_jobs.py @@ -27,6 +27,7 @@ from ...schemas.job import ( VttTimingAdjustRequest, VttUpdateRequest, ) +from ...services.websocket import connection_manager from ...services.gcs import ( gcs_service, upload_file_to_gcs, @@ -351,6 +352,17 @@ async def approve_english( detail="Job not found or not in pending QC status" ) + # Broadcast status update + try: + await connection_manager.broadcast_job_status_update( + job_id=job_id, + status=JobStatus.APPROVED_ENGLISH.value, + message="English content approved - starting translation", + progress=None + ) + except Exception as e: + logger.warning(f"Failed to broadcast status update for job {job_id}: {e}") + # Trigger translation and synthesis pipeline immediately try: translate_and_synthesize_task.delay(job_id) @@ -406,6 +418,17 @@ async def reject_job( detail="Job not found or not in pending QC status" ) + # Broadcast status update + try: + await connection_manager.broadcast_job_status_update( + job_id=job_id, 
+ status=JobStatus.REJECTED.value, + message="Job rejected - requires revision", + progress=None + ) + except Exception as e: + logger.warning(f"Failed to broadcast status update for job {job_id}: {e}") + return JobResponse( id=str(result["_id"]), title=result["title"], @@ -474,6 +497,17 @@ async def complete_job( detail="Job not found or not in pending final review status" ) + # Broadcast status update + try: + await connection_manager.broadcast_job_status_update( + job_id=job_id, + status=JobStatus.COMPLETED.value, + message="Job completed - all files ready for download", + progress=100 + ) + except Exception as e: + logger.warning(f"Failed to broadcast status update for job {job_id}: {e}") + return JobResponse( id=str(result["_id"]), title=result["title"], @@ -521,6 +555,17 @@ async def reject_final_review( detail="Job not found or not in pending final review status" ) + # Broadcast status update + try: + await connection_manager.broadcast_job_status_update( + job_id=job_id, + status=JobStatus.QC_FEEDBACK.value, + message="Final review rejected - requires changes", + progress=None + ) + except Exception as e: + logger.warning(f"Failed to broadcast status update for job {job_id}: {e}") + return JobResponse( id=str(result["_id"]), title=result["title"], diff --git a/backend/app/api/v1/routes_websockets.py b/backend/app/api/v1/routes_websockets.py new file mode 100644 index 0000000..8b5b360 --- /dev/null +++ b/backend/app/api/v1/routes_websockets.py @@ -0,0 +1,214 @@ +""" +WebSocket routes for real-time job status updates + +Provides WebSocket endpoints for: +1. Individual job status updates: /ws/jobs/{job_id} +2. 
import logging
from typing import Any, Dict, Optional

from fastapi import APIRouter, WebSocket, WebSocketDisconnect, HTTPException, Depends, Query
from fastapi.security import HTTPBearer

from ...services.websocket import (
    connection_manager,
    authenticate_websocket,
    get_connection_manager,
    ConnectionManager
)
from ...models.job import Job
from ...core.database import get_database
from ...core.dependencies import get_current_user

logger = logging.getLogger(__name__)

router = APIRouter(tags=["WebSocket"])
security = HTTPBearer()


async def _find_user(db, user_id: str) -> Optional[Dict[str, Any]]:
    """
    Look up a user document by ``_id``.

    The id is tried as a plain string first and then as a BSON ``ObjectId``.
    Returns ``None`` when neither form matches or when ``user_id`` is not a
    valid ObjectId. Database errors are NOT swallowed; they propagate so the
    caller does not misreport them as "User not found".
    """
    user = await db["users"].find_one({"_id": user_id})
    if user:
        return user

    from bson import ObjectId
    from bson.errors import InvalidId

    try:
        object_id = ObjectId(user_id)
    except (InvalidId, TypeError):
        return None  # Not an ObjectId-shaped id, so there is nothing else to try
    return await db["users"].find_one({"_id": object_id})


async def _close_quietly(websocket: WebSocket, code: int, reason: str) -> None:
    """Best-effort close; the socket may already be gone, so errors are only logged."""
    try:
        await websocket.close(code=code, reason=reason)
    except Exception as e:
        logger.debug(f"WebSocket close failed (already closed?): {e}")


async def _serve_client_messages(websocket: WebSocket, user_id: str) -> None:
    """
    Keep the connection alive until the client disconnects.

    Replies ``pong`` to a ``ping`` text frame and ignores every other message.
    Returns (never raises) on disconnect or on any receive/send error.
    """
    while True:
        try:
            message = await websocket.receive_text()
            logger.debug(f"Received WebSocket message from user {user_id}: {message}")

            if message == "ping":
                await websocket.send_text("pong")

        except WebSocketDisconnect:
            return
        except Exception as e:
            logger.error(f"Error in WebSocket message handling: {e}")
            return


@router.websocket("/ws/jobs/{job_id}")
async def websocket_job_status(
    websocket: WebSocket,
    job_id: str,
    token: Optional[str] = Query(None),
    manager: ConnectionManager = Depends(get_connection_manager)
):
    """
    WebSocket endpoint for real-time job status updates

    Usage:
    - Connect: ws://localhost:8000/api/v1/ws/jobs/{job_id}?token={jwt_token}
    - Receives: Real-time status updates for the specific job

    Close codes sent before the connection is accepted:
    - 4001: authentication failed / user not found
    - 4003: access denied
    - 4004: job not found

    Message format:
    {
        "type": "job_status_update",
        "data": {
            "job_id": "...",
            "status": "processing",
            "updated_at": "2023-...",
            "message": "Processing video...",
            "progress": 45
        }
    }
    """
    # Authenticate the WebSocket connection (closes the socket itself on failure)
    user_id = await authenticate_websocket(websocket, token)
    if not user_id:
        return

    # Only sockets that were handed to the manager are unregistered in `finally`;
    # rejected sockets must not touch the user's other live subscriptions.
    registered = False
    try:
        db = await get_database()

        job = await db["jobs"].find_one({"_id": job_id})
        if not job:
            await websocket.close(code=4004, reason="Job not found")
            return

        user = await _find_user(db, user_id)
        if not user:
            await websocket.close(code=4001, reason="User not found")
            return

        # Clients may only watch their own jobs; other roles may watch any job.
        # A user document without a role is denied (fail closed).
        # NOTE(review): ownership is read from `created_by` here; confirm this is
        # the field the job documents actually use for the owning client.
        role = user.get("role")
        owner = job.get("created_by")
        owns_job = owner is not None and str(owner) == str(user_id)
        if role is None or (role == "client" and not owns_job):
            await websocket.close(code=4003, reason="Access denied")
            return

        registered = True
        await manager.connect_job_status(websocket, user_id, job_id)
        await _serve_client_messages(websocket, user_id)

    except WebSocketDisconnect:
        pass
    except Exception as e:
        logger.error(f"WebSocket job status error: {e}")
        await _close_quietly(websocket, 1011, "Internal error")
    finally:
        if registered:
            manager.disconnect(websocket, user_id)


@router.websocket("/ws/jobs")
async def websocket_job_list(
    websocket: WebSocket,
    token: Optional[str] = Query(None),
    manager: ConnectionManager = Depends(get_connection_manager)
):
    """
    WebSocket endpoint for real-time job list updates

    Usage:
    - Connect: ws://localhost:8000/api/v1/ws/jobs?token={jwt_token}
    - Receives: Real-time status updates published on the global job channel

    Message format:
    {
        "type": "job_list_update",
        "data": {
            "job_id": "...",
            "status": "processing",
            "updated_at": "2023-...",
            "message": "Processing video...",
            "progress": 45
        }
    }
    """
    # Authenticate the WebSocket connection (closes the socket itself on failure)
    user_id = await authenticate_websocket(websocket, token)
    if not user_id:
        return

    registered = False
    try:
        logger.info(f"WebSocket: Looking up user {user_id} in database")
        db = await get_database()

        user = await _find_user(db, user_id)
        if not user:
            logger.warning(f"WebSocket: User {user_id} not found in database (tried both string and ObjectId)")
            await websocket.close(code=4001, reason="User not found")
            return

        logger.info(
            f"WebSocket: User {user_id} found, role: {user.get('role', 'unknown')}, "
            "connecting to job list updates"
        )

        registered = True
        await manager.connect_job_list(websocket, user_id)
        await _serve_client_messages(websocket, user_id)

    except WebSocketDisconnect:
        pass
    except Exception as e:
        logger.error(f"WebSocket job list error: {e}")
        await _close_quietly(websocket, 1011, "Internal error")
    finally:
        if registered:
            manager.disconnect(websocket, user_id)


@router.get("/ws/status")
async def websocket_status():
    """
    Get WebSocket connection status and statistics
    Useful for debugging and monitoring
    """
    # NOTE(review): this endpoint is unauthenticated and exposes connection counts.
    # `get_current_user` is imported but unused; confirm whether it was meant to
    # guard this route.
    subscriber = connection_manager.subscriber_task
    return {
        "active_connections": len(connection_manager.active_connections),
        "job_subscriptions": len(connection_manager.job_subscriptions),
        "global_subscriptions": len(connection_manager.global_subscriptions),
        "redis_connected": connection_manager.redis_client is not None,
        "subscriber_running": subscriber is not None and not subscriber.done(),
    }
+14,8 @@ from .api.v1.routes_admin import router as admin_router from .api.v1.routes_auth import router as auth_router from .api.v1.routes_files import router as files_router from .api.v1.routes_jobs import router as jobs_router +from .api.v1.routes_websockets import router as websockets_router +from .services.websocket import connection_manager from .core.config import settings from .core.secrets_config import initialize_config from .core.database import close_mongo_connection, connect_to_mongo, create_indexes @@ -26,6 +28,7 @@ from .telemetry import ( instrument_fastapi_app, setup_tracing ) +from .services.websocket import connection_manager @asynccontextmanager @@ -71,6 +74,9 @@ async def lifespan(app: FastAPI): await connect_to_redis() # await create_indexes() # Temporarily disabled for debugging + # Start WebSocket connection manager + await connection_manager.start() + # Initialize middleware with Redis client redis_client = get_redis_client() if redis_client: @@ -83,6 +89,7 @@ async def lifespan(app: FastAPI): yield # Shutdown + await connection_manager.stop() await close_mongo_connection() await close_redis_connection() @@ -197,6 +204,34 @@ app.include_router(auth_router, prefix="/api/v1") app.include_router(files_router, prefix="/api/v1") app.include_router(jobs_router, prefix="/api/v1") app.include_router(admin_router, prefix="/api/v1") +app.include_router(websockets_router, prefix="/api/v1") + + +@app.on_event("startup") +async def startup_event(): + """Initialize services on startup""" + logger.info("🚀 Starting up FastAPI application...") + + # Start WebSocket connection manager + try: + await connection_manager.start() + logger.info("✅ WebSocket connection manager started successfully") + except Exception as e: + logger.error(f"❌ Failed to start WebSocket connection manager: {e}") + raise + + +@app.on_event("shutdown") +async def shutdown_event(): + """Cleanup services on shutdown""" + logger.info("🛑 Shutting down FastAPI application...") + + # Stop 
"""
WebSocket Connection Manager for Real-time Job Status Updates

This module provides WebSocket support for broadcasting job status changes
in real-time to connected clients. It uses Redis pub/sub for scalable
message broadcasting across multiple worker processes.
"""
import asyncio
import json
import logging
from typing import Dict, Iterable, List, Set, Optional, Any
from datetime import datetime

from fastapi import WebSocket, WebSocketDisconnect
import redis.asyncio as redis
import redis as sync_redis
from pydantic import BaseModel

from ..core.redis import get_redis_client
from ..core.security import decode_token
from ..core.config import settings

logger = logging.getLogger(__name__)

# Redis pub/sub channel names. Every update is published twice: once on the
# global channel and once on "<prefix><job_id>".
GLOBAL_CHANNEL = "job_status_updates"
JOB_CHANNEL_PREFIX = "job_status_updates:"


class JobStatusUpdate(BaseModel):
    """Schema for job status update messages"""
    job_id: str
    status: str
    updated_at: datetime
    message: Optional[str] = None
    progress: Optional[int] = None  # 0-100 percentage (not validated here)
    metadata: Optional[Dict[str, Any]] = None


class ConnectionManager:
    """
    Manages WebSocket connections and Redis pub/sub for job status updates.

    Subscriptions are tracked per user (public attributes, read by the
    ``/ws/status`` route) and per socket (private attributes), so closing one
    socket does not cancel what the user's other sockets subscribed to.

    NOTE(review): global updates are delivered to every job-list subscriber;
    nothing here filters them by job ownership or role.
    """

    def __init__(self):
        # Active WebSocket connections by user_id
        self.active_connections: Dict[str, Set[WebSocket]] = {}
        # Job subscriptions: job_id -> set of user_ids
        self.job_subscriptions: Dict[str, Set[str]] = {}
        # Global job list subscriptions by user_id
        self.global_subscriptions: Set[str] = set()
        # Per-socket bookkeeping: which job a socket watches / which sockets are job-list sockets
        self._socket_jobs: Dict[WebSocket, str] = {}
        self._list_sockets: Set[WebSocket] = set()
        # Redis client for pub/sub
        self.redis_client: Optional[redis.Redis] = None
        self.pubsub: Optional[redis.client.PubSub] = None
        self.subscriber_task: Optional[asyncio.Task] = None

    async def start(self):
        """
        Initialize the Redis pub/sub subscriber.

        Idempotent: calling it while the subscriber task is running is a no-op,
        so being invoked from more than one startup hook is harmless.

        Raises:
            Exception: whatever Redis raised; partially opened connections are
            closed before re-raising.
        """
        if self.subscriber_task is not None and not self.subscriber_task.done():
            logger.debug("WebSocket connection manager already running; start() ignored")
            return

        # Drop handles left behind by a subscriber that died on its own.
        await self._close_redis()

        try:
            self.redis_client = await redis.from_url(
                settings.redis_url,
                encoding="utf-8",
                decode_responses=True
            )
            self.pubsub = self.redis_client.pubsub()

            await self.pubsub.subscribe(GLOBAL_CHANNEL)  # Global channel
            await self.pubsub.psubscribe(f"{JOB_CHANNEL_PREFIX}*")  # Individual job channels

            # Start background task to handle Redis messages
            self.subscriber_task = asyncio.create_task(self._redis_subscriber())
            logger.info("WebSocket connection manager started")

        except Exception as e:
            logger.error(f"Failed to start WebSocket connection manager: {e}")
            await self._close_redis()
            raise

    async def stop(self):
        """Cancel the subscriber task and close Redis; safe to call more than once."""
        task, self.subscriber_task = self.subscriber_task, None
        if task is not None:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass

        await self._close_redis()
        logger.info("WebSocket connection manager stopped")

    async def _close_redis(self):
        """Close pubsub and client, logging (not raising) so shutdown can continue."""
        pubsub, self.pubsub = self.pubsub, None
        client, self.redis_client = self.redis_client, None

        if pubsub is not None:
            try:
                await pubsub.unsubscribe()
                await pubsub.punsubscribe()
                await pubsub.aclose()
            except Exception as e:
                logger.warning(f"Error closing Redis pubsub: {e}")

        if client is not None:
            try:
                await client.aclose()
            except Exception as e:
                logger.warning(f"Error closing Redis client: {e}")

    async def connect_job_status(self, websocket: WebSocket, user_id: str, job_id: str):
        """Accept a WebSocket and subscribe it to updates for one job."""
        await websocket.accept()

        self.active_connections.setdefault(user_id, set()).add(websocket)
        self.job_subscriptions.setdefault(job_id, set()).add(user_id)
        self._socket_jobs[websocket] = job_id

        logger.info(f"User {user_id} connected for job {job_id} status updates")

        # Send initial connection confirmation
        await self._send_to_websocket(websocket, {
            "type": "connection_established",
            "job_id": job_id,
            "timestamp": datetime.utcnow().isoformat()
        })

    async def connect_job_list(self, websocket: WebSocket, user_id: str):
        """Accept a WebSocket and subscribe it to job list (global channel) updates."""
        await websocket.accept()

        self.active_connections.setdefault(user_id, set()).add(websocket)
        self.global_subscriptions.add(user_id)
        self._list_sockets.add(websocket)

        logger.info(f"User {user_id} connected for job list updates")

        # Send initial connection confirmation
        await self._send_to_websocket(websocket, {
            "type": "connection_established",
            "scope": "job_list",
            "timestamp": datetime.utcnow().isoformat()
        })

    def disconnect(self, websocket: WebSocket, user_id: str):
        """
        Unregister one WebSocket.

        The user's job/global subscription is removed only when none of the
        user's remaining sockets still holds it. Calling this for a socket that
        was never registered is a no-op apart from the log line.
        """
        sockets = self.active_connections.get(user_id)
        if sockets is not None:
            sockets.discard(websocket)
            if not sockets:
                del self.active_connections[user_id]
        remaining = self.active_connections.get(user_id, set())

        # Job list subscription
        self._list_sockets.discard(websocket)
        if not any(ws in self._list_sockets for ws in remaining):
            self.global_subscriptions.discard(user_id)

        # Job subscription held by this socket, if any
        job_id = self._socket_jobs.pop(websocket, None)
        if job_id is not None and not any(
            self._socket_jobs.get(ws) == job_id for ws in remaining
        ):
            subscribers = self.job_subscriptions.get(job_id)
            if subscribers is not None:
                subscribers.discard(user_id)
                if not subscribers:
                    del self.job_subscriptions[job_id]

        logger.info(f"User {user_id} disconnected from WebSocket")

    async def broadcast_job_status_update(
        self,
        job_id: str,
        status: str,
        user_id: Optional[str] = None,
        message: Optional[str] = None,
        progress: Optional[int] = None,
        metadata: Optional[Dict[str, Any]] = None
    ):
        """
        Publish a job status update to Redis pub/sub (global + per-job channel).

        This is the async variant for code running on the event loop (the API
        routes). It publishes with the async Redis client, so the loop is not
        blocked. Redis failures are logged and swallowed: a lost notification
        must not fail the request that triggered it.

        Args:
            job_id: Job the update is about.
            status: New status value.
            user_id: Accepted for interface compatibility; currently unused.
            message: Optional human-readable text.
            progress: Optional percentage.
            metadata: Optional extra payload.
        """
        update = JobStatusUpdate(
            job_id=job_id,
            status=status,
            updated_at=datetime.utcnow(),
            message=message,
            progress=progress,
            metadata=metadata
        )
        payload = update.model_dump_json()

        # Reuse the manager's client when started; otherwise open a short-lived one.
        client = self.redis_client
        temporary = client is None
        try:
            if temporary:
                client = redis.from_url(
                    settings.redis_url,
                    encoding="utf-8",
                    decode_responses=True
                )

            await client.publish(GLOBAL_CHANNEL, payload)
            await client.publish(f"{JOB_CHANNEL_PREFIX}{job_id}", payload)

            logger.debug(f"Broadcasted status update for job {job_id}: {status}")

        except Exception as e:
            logger.error(f"Failed to broadcast job status update: {e}")
        finally:
            if temporary and client is not None:
                try:
                    await client.aclose()
                except Exception as e:
                    logger.debug(f"Error closing temporary Redis client: {e}")

    async def _redis_subscriber(self):
        """Background task: forward Redis pub/sub messages until cancelled or Redis fails."""
        try:
            async for message in self.pubsub.listen():
                # Regular subscriptions yield "message", pattern subscriptions "pmessage"
                if message["type"] in ("message", "pmessage"):
                    await self._handle_redis_message(message)
        except asyncio.CancelledError:
            logger.info("Redis subscriber task cancelled")
        except Exception as e:
            # NOTE: the task ends here and is not restarted automatically.
            logger.error(f"Redis subscriber error: {e}")

    async def _handle_redis_message(self, message: Dict[str, Any]):
        """Parse one pub/sub message and route it by channel; errors are logged, not raised."""
        try:
            channel = message["channel"]
            data = json.loads(message["data"])
            update = JobStatusUpdate(**data)

            logger.debug(f"Received Redis message on channel '{channel}': {data}")

            if channel.startswith(JOB_CHANNEL_PREFIX):
                job_id = channel[len(JOB_CHANNEL_PREFIX):]
                logger.debug(f"Sending job status update for job {job_id} to subscribers")
                await self._send_job_status_to_subscribers(job_id, update)

            elif channel == GLOBAL_CHANNEL:
                logger.debug("Sending global job status update to subscribers")
                await self._send_job_status_to_global_subscribers(update)

        except Exception as e:
            logger.error(f"Failed to handle Redis message: {e}")

    async def _send_job_status_to_subscribers(self, job_id: str, update: JobStatusUpdate):
        """Send a ``job_status_update`` to the sockets subscribed to ``job_id``."""
        subscribers = self.job_subscriptions.get(job_id)
        if not subscribers:
            return

        message = {
            "type": "job_status_update",
            "data": json.loads(update.model_dump_json())
        }

        for user_id in list(subscribers):
            targets = [
                ws for ws in self.active_connections.get(user_id, ())
                if self._socket_jobs.get(ws) == job_id
            ]
            await self._send_to_user(user_id, message, targets)

    async def _send_job_status_to_global_subscribers(self, update: JobStatusUpdate):
        """Send a ``job_list_update`` to every job-list socket."""
        message = {
            "type": "job_list_update",
            "data": json.loads(update.model_dump_json())
        }

        for user_id in list(self.global_subscriptions):
            targets = [
                ws for ws in self.active_connections.get(user_id, ())
                if ws in self._list_sockets
            ]
            await self._send_to_user(user_id, message, targets)

    async def _send_to_user(
        self,
        user_id: str,
        message: Dict[str, Any],
        sockets: Optional[Iterable[WebSocket]] = None
    ):
        """
        Send ``message`` to a user's sockets and unregister the ones that fail.

        Args:
            user_id: Owner of the sockets.
            message: JSON-serializable payload.
            sockets: Subset to send to; defaults to all of the user's connections.
        """
        if user_id not in self.active_connections:
            return

        targets = list(self.active_connections[user_id]) if sockets is None else list(sockets)

        failed = []
        for websocket in targets:
            try:
                await self._send_to_websocket(websocket, message)
            except Exception as e:
                logger.warning(f"Failed to send to websocket for user {user_id}: {e}")
                failed.append(websocket)

        # Clean up after the loop so the set is not mutated while sending
        for websocket in failed:
            self.disconnect(websocket, user_id)

    async def _send_to_websocket(self, websocket: WebSocket, message: Dict[str, Any]):
        """Send a JSON message to one WebSocket; logs and re-raises on failure."""
        try:
            await websocket.send_json(message)
        except Exception as e:
            logger.warning(f"WebSocket send failed: {e}")
            raise


# Global connection manager instance
connection_manager = ConnectionManager()


async def _reject_websocket(websocket: WebSocket, reason: str) -> None:
    """Close an unauthenticated socket with code 4001; close errors are only logged."""
    try:
        await websocket.close(code=4001, reason=reason)
    except Exception as e:
        logger.debug(f"WebSocket close during authentication failed: {e}")


async def authenticate_websocket(websocket: WebSocket, token: Optional[str]) -> Optional[str]:
    """
    Authenticate a WebSocket connection using a JWT token.

    On any failure the socket is closed with code 4001 and ``None`` is
    returned; this function does not raise.

    Args:
        websocket: The not-yet-accepted connection.
        token: JWT taken from the ``token`` query parameter; may be missing.

    Returns:
        The token's ``sub`` claim (user id) if valid, otherwise ``None``.
    """
    if not token:
        logger.warning("WebSocket authentication failed: Missing token")
        await _reject_websocket(websocket, "Missing authentication token")
        return None

    try:
        # decode_token may raise (e.g. HTTPException) for bad or expired tokens
        payload = decode_token(token)
        valid = bool(payload) and "sub" in payload
    except Exception as jwt_error:
        logger.warning(f"WebSocket authentication failed: JWT decode error: {jwt_error}")
        await _reject_websocket(websocket, "Invalid authentication token")
        return None

    if not valid:
        logger.warning("WebSocket authentication failed: Invalid token payload")
        await _reject_websocket(websocket, "Invalid authentication token")
        return None

    user_id = payload["sub"]
    logger.info(f"WebSocket authentication successful for user: {user_id}")
    return user_id


async def get_connection_manager() -> ConnectionManager:
    """Dependency to get the connection manager"""
    return connection_manager
from typing import Optional

logger = get_logger(__name__)


def broadcast_status_update(
    job_id: str,
    status: str,
    message: Optional[str] = None,
    progress: Optional[int] = None,
):
    """
    Publish a job status update to Redis for the WebSocket layer.

    Uses a synchronous Redis client, created and closed per call. The update
    is published on "job_status_updates" and "job_status_updates:<job_id>".
    All failures are logged and swallowed so a notification problem never
    breaks the worker task.

    Args:
        job_id: Job the update is about.
        status: New status value.
        message: Optional human-readable text.
        progress: Optional percentage.
    """
    import traceback

    try:
        import redis as sync_redis
        from ..core.config import settings
        from ..services.websocket import JobStatusUpdate
        from datetime import datetime

        update = JobStatusUpdate(
            job_id=job_id,
            status=status,
            updated_at=datetime.utcnow(),
            message=message,
            progress=progress
        )
        payload = update.model_dump_json()  # serialize once, publish twice

        # The Redis URL is deliberately not logged: it may embed credentials.
        redis_client = sync_redis.Redis.from_url(
            settings.redis_url,
            encoding="utf-8",
            decode_responses=True
        )
        try:
            # publish() returns the number of subscribers that received the message
            general = redis_client.publish("job_status_updates", payload)
            specific = redis_client.publish(f"job_status_updates:{job_id}", payload)
        finally:
            # Close even when publish raises, so the connection is not leaked
            redis_client.close()

        logger.info(
            f"Broadcasted status update for job {job_id}: {status} "
            f"(general: {general} subscribers, job-specific: {specific} subscribers)"
        )

    except Exception as e:
        # Don't let WebSocket failures break the worker task
        logger.error(
            f"Failed to broadcast status update for job {job_id}: {e}\n"
            f"{traceback.format_exc()}"
        )
broadcast_status_update( + job_id, + JobStatus.INGESTING.value, + "Starting video ingestion and processing", + progress=10 + ) # Get job details job_doc = await db.jobs.find_one({"_id": job_id}) @@ -113,6 +174,14 @@ async def ingest_and_ai_task_impl(job_id: str): } } ) + + # Broadcast status update + broadcast_status_update( + job_id, + JobStatus.AI_PROCESSING.value, + "Processing video with AI for accessibility features", + progress=50 + ) # Probe video for metadata duration = await _get_video_duration(temp_path) @@ -171,6 +240,14 @@ async def ingest_and_ai_task_impl(job_id: str): } } ) + + # Broadcast status update + broadcast_status_update( + job_id, + JobStatus.PENDING_QC.value, + "AI processing complete - ready for quality review", + progress=100 + ) logger.info(f"Successfully completed ingestion and AI processing for job {job_id}") diff --git a/backend/app/tasks/notify.py b/backend/app/tasks/notify.py index ac0ab4a..f7ece5a 100644 --- a/backend/app/tasks/notify.py +++ b/backend/app/tasks/notify.py @@ -1,12 +1,14 @@ import asyncio from datetime import datetime +from bson import ObjectId from celery import Task +from celery.exceptions import Retry from motor.motor_asyncio import AsyncIOMotorClient from ..core.config import settings from ..core.logging import get_logger -from ..models.audit_log import AuditLogCreate +from ..models.audit_log import AuditLogCreate, AuditAction from ..services.emailer import email_service from ..services.gcs import get_signed_download_url from . import celery_app @@ -14,8 +16,8 @@ from . 
import celery_app logger = get_logger(__name__) -class AsyncTask(Task): - """Base task class that supports async execution""" +class NotifyClientTask(Task): + """Async task for client notifications""" def __call__(self, *args, **kwargs): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) @@ -24,119 +26,183 @@ class AsyncTask(Task): finally: loop.close() - async def run_async(self, *args, **kwargs): - raise NotImplementedError + async def run_async(self, job_id: str): + """ + Pipeline 3: Client Notification + Triggered when job status changes to 'completed' + """ + logger.info(f"Starting client notification for job {job_id}") + # Connect to MongoDB + client = AsyncIOMotorClient(settings.mongodb_uri) + db = client[settings.mongodb_db] -@celery_app.task(bind=True, base=AsyncTask) -async def notify_client_task(self, job_id: str): - """ - Pipeline 3: Client Notification - Triggered when job status changes to 'completed' - """ - logger.info(f"Starting client notification for job {job_id}") + try: + # Get job and client details + job_doc = await db.jobs.find_one({"_id": job_id}) + if not job_doc: + logger.error(f"Job {job_id} not found in database") + return # Don't retry for missing jobs - # Connect to MongoDB - client = AsyncIOMotorClient(settings.mongodb_uri) - db = client[settings.mongodb_db] + if job_doc["status"] != "completed": + logger.warning(f"Job {job_id} not in completed status (current: {job_doc['status']}), skipping notification") + return - try: - # Get job and client details - job_doc = await db.jobs.find_one({"_id": job_id}) - if not job_doc: - raise ValueError(f"Job {job_id} not found") - - if job_doc["status"] != "completed": - logger.warning(f"Job {job_id} not in completed status, skipping notification") - return - - # Get client email - client_doc = await db.users.find_one({"_id": job_doc["client_id"]}) - if not client_doc: - raise ValueError(f"Client {job_doc['client_id']} not found") - - # Generate signed URLs for all outputs - 
download_links = {} - outputs = job_doc.get("outputs", {}) - - for language, lang_output in outputs.items(): - if not isinstance(lang_output, dict): - continue - - lang_downloads = {} - - # Captions VTT - if "captions_vtt_gcs" in lang_output: - blob_path = lang_output["captions_vtt_gcs"].replace(f"gs://{settings.gcs_bucket}/", "") + # Get client ID and ensure proper ObjectId format + client_id = job_doc["client_id"] + logger.info(f"Looking up client {client_id} for job {job_id}") + + # Try looking up client by string ID first + client_doc = await db.users.find_one({"_id": client_id}) + if not client_doc: + # Try as ObjectId if string lookup failed try: - signed_url = await get_signed_download_url(blob_path, 24) - lang_downloads["captions_vtt"] = signed_url - except Exception as e: - logger.warning(f"Failed to generate signed URL for captions {language}: {e}") + client_doc = await db.users.find_one({"_id": ObjectId(client_id)}) + except: + pass # Invalid ObjectId format + + if not client_doc: + logger.error(f"Client {client_id} not found in database for job {job_id}") + # Don't retry for missing users - this is likely a data issue + return - # Audio Description VTT - if "ad_vtt_gcs" in lang_output: - blob_path = lang_output["ad_vtt_gcs"].replace(f"gs://{settings.gcs_bucket}/", "") + # Generate signed URLs for all outputs + download_links = {} + outputs = job_doc.get("outputs", {}) + + for language, lang_output in outputs.items(): + if not isinstance(lang_output, dict): + continue + + lang_downloads = {} + + # Captions VTT + if "captions_vtt_gcs" in lang_output: + blob_path = lang_output["captions_vtt_gcs"].replace(f"gs://{settings.gcs_bucket}/", "") + try: + signed_url = await get_signed_download_url(blob_path, 24) + lang_downloads["captions_vtt"] = signed_url + except Exception as e: + logger.warning(f"Failed to generate signed URL for captions {language}: {e}") + + # Audio Description VTT + if "ad_vtt_gcs" in lang_output: + blob_path = 
lang_output["ad_vtt_gcs"].replace(f"gs://{settings.gcs_bucket}/", "") + try: + signed_url = await get_signed_download_url(blob_path, 24) + lang_downloads["audio_description_vtt"] = signed_url + except Exception as e: + logger.warning(f"Failed to generate signed URL for AD VTT {language}: {e}") + + # Audio Description MP3 + if "ad_mp3_gcs" in lang_output: + blob_path = lang_output["ad_mp3_gcs"].replace(f"gs://{settings.gcs_bucket}/", "") + try: + signed_url = await get_signed_download_url(blob_path, 24) + lang_downloads["audio_description_mp3"] = signed_url + except Exception as e: + logger.warning(f"Failed to generate signed URL for AD MP3 {language}: {e}") + + if lang_downloads: + download_links[language] = lang_downloads + + # Send completion email (temporarily disabled) + # TODO: Re-enable emails once authentication is configured + email_enabled = False # Set to True to re-enable emails + + if email_enabled: try: - signed_url = await get_signed_download_url(blob_path, 24) - lang_downloads["audio_description_vtt"] = signed_url - except Exception as e: - logger.warning(f"Failed to generate signed URL for AD VTT {language}: {e}") + success = await email_service.send_completion_email( + recipient_email=client_doc["email"], + job_title=job_doc["title"], + download_links=download_links + ) - # Audio Description MP3 - if "ad_mp3_gcs" in lang_output: - blob_path = lang_output["ad_mp3_gcs"].replace(f"gs://{settings.gcs_bucket}/", "") - try: - signed_url = await get_signed_download_url(blob_path, 24) - lang_downloads["audio_description_mp3"] = signed_url - except Exception as e: - logger.warning(f"Failed to generate signed URL for AD MP3 {language}: {e}") + if success: + logger.info(f"Successfully sent completion email to {client_doc['email']} for job {job_id}") + else: + logger.warning(f"Email service returned failure for job {job_id} - treating as non-retryable") + + except Exception as email_error: + error_msg = str(email_error) + logger.error(f"Email sending exception 
for job {job_id}: {error_msg}") + + # Check if this is an authentication error (non-retryable) + if "401" in error_msg or "Unauthorized" in error_msg or "authentication" in error_msg.lower(): + logger.warning(f"Email authentication failed for job {job_id} - treating as non-retryable configuration error") + else: + # Other email errors might be transient + raise ValueError(f"Email sending failed: {error_msg}") + else: + logger.info(f"Email notifications are currently disabled - skipping email for job {job_id}") + logger.info(f"Would have sent completion email to {client_doc['email']} with {sum(len(files) for files in download_links.values())} download links") - if lang_downloads: - download_links[language] = lang_downloads - - # Send completion email - success = await email_service.send_completion_email( - recipient_email=client_doc["email"], - job_title=job_doc["title"], - download_links=download_links - ) - - if success: - # Log audit entry + # Log audit entry (regardless of email status) audit_log = AuditLogCreate( - job_id=job_id, - action="client_notified", + action=AuditAction.JOB_STATUS_CHANGE, + description=f"Job {job_id} completed - client notification processed", + resource_type="job", + resource_id=job_id, + resource_name=job_doc["title"], details={ "email": client_doc["email"], - "download_count": sum(len(files) for files in download_links.values()) + "download_count": sum(len(files) for files in download_links.values()), + "email_sent": email_enabled, + "status": "completed" } ) - await db.audit_logs.insert_one(audit_log.dict()) + await db.audit_logs.insert_one(audit_log.model_dump()) - logger.info(f"Successfully notified client for job {job_id}") - else: - raise ValueError("Failed to send completion email") + logger.info(f"Successfully completed notification processing for job {job_id}") - except Exception as e: - logger.error(f"Client notification failed for job {job_id}: {e}") + except Exception as e: + error_msg = str(e) + logger.error(f"Client 
notification failed for job {job_id}: {error_msg}") - # Update job with error - await db.jobs.update_one( - {"_id": job_id}, - { - "$set": { - "error": { - "type": "notification_failure", - "message": str(e), - "timestamp": datetime.utcnow().isoformat() - }, - "updated_at": datetime.utcnow() - } - } - ) + # Update job with error + try: + await db.jobs.update_one( + {"_id": job_id}, + { + "$set": { + "error": { + "type": "notification_failure", + "message": error_msg, + "timestamp": datetime.utcnow().isoformat() + }, + "updated_at": datetime.utcnow() + } + } + ) + except Exception as update_error: + logger.error(f"Failed to update job {job_id} with error: {update_error}") - raise + # Only retry for transient errors, not configuration or data errors + non_retryable_patterns = [ + "not found", + "401", + "unauthorized", + "authentication", + "failed to send completion email" + ] + + should_not_retry = any(pattern in error_msg.lower() for pattern in non_retryable_patterns) + + if should_not_retry: + logger.info(f"Skipping retry for job {job_id} due to non-retryable error: {error_msg}") + return + else: + # This might be a transient error, let it retry + logger.info(f"Allowing retry for job {job_id} due to potentially transient error: {error_msg}") + raise - finally: - client.close() + finally: + client.close() + + +# Register the task with manual retry control +@celery_app.task(bind=True, base=NotifyClientTask, max_retries=3, default_retry_delay=60) +def notify_client_task(self, job_id: str): + """Celery task wrapper for client notification""" + # This method is called by NotifyClientTask.__call__ + pass diff --git a/backend/app/tasks/translate_and_synthesize.py b/backend/app/tasks/translate_and_synthesize.py index b90fe67..d44f5eb 100644 --- a/backend/app/tasks/translate_and_synthesize.py +++ b/backend/app/tasks/translate_and_synthesize.py @@ -14,11 +14,63 @@ from ..services.gcs import gcs_service, upload_vtt_to_gcs from ..services.gemini import gemini_service from 
..services.translate import translate_service from ..services.tts import tts_service +from ..services.websocket import connection_manager from . import celery_app logger = get_logger(__name__) +def broadcast_status_update(job_id: str, status: str, message: str = None, progress: int = None): + """ + Helper function to broadcast job status updates via WebSocket + Uses sync Redis client for Celery worker context + """ + logger.info(f"🔊 ATTEMPTING TO BROADCAST: job_id={job_id}, status={status}, message={message}") + try: + import redis as sync_redis + from ..core.config import settings + from ..services.websocket import JobStatusUpdate + from datetime import datetime + + logger.info(f"🔊 About to create JobStatusUpdate for job {job_id}") + + # Create status update + update = JobStatusUpdate( + job_id=job_id, + status=status, + updated_at=datetime.utcnow(), + message=message, + progress=progress + ) + + logger.info(f"🔊 Created update object, now connecting to Redis: {settings.redis_url}") + + # Create synchronous Redis client + redis_client = sync_redis.Redis.from_url( + settings.redis_url, + encoding="utf-8", + decode_responses=True + ) + + logger.info(f"🔊 Redis client created, now publishing to channels") + + # Publish to channels + result1 = redis_client.publish("job_status_updates", update.model_dump_json()) + result2 = redis_client.publish(f"job_status_updates:{job_id}", update.model_dump_json()) + + logger.info(f"🔊 Published to channels - general: {result1} subscribers, job-specific: {result2} subscribers") + + # Close connection + redis_client.close() + + logger.info(f"🔊 ✅ Successfully broadcasted status update for job {job_id}: {status}") + + except Exception as e: + logger.error(f"🔊 ❌ Failed to broadcast status update for job {job_id}: {e}") + import traceback + logger.error(f"🔊 ❌ Full traceback: {traceback.format_exc()}") + + async def retry_with_backoff(func, max_retries=3, base_delay=1): """Retry a function with exponential backoff""" last_exception = None @@ 
-104,6 +156,14 @@ async def _async_translate_and_synthesize(job_id: str): } } ) + + # Broadcast status update + broadcast_status_update( + job_id, + JobStatus.TRANSLATING.value, + "Starting translation and transcreation process", + progress=10 + ) # Get English VTT content en_outputs = job_doc["outputs"]["en"] @@ -203,6 +263,14 @@ async def _async_translate_and_synthesize(job_id: str): } } ) + + # Broadcast status update + broadcast_status_update( + job_id, + JobStatus.TTS_GENERATING.value, + "Generating audio descriptions with text-to-speech", + progress=70 + ) # Generate TTS for languages that need MP3 if job_doc["requested_outputs"]["audio_description_mp3"]: @@ -225,6 +293,14 @@ async def _async_translate_and_synthesize(job_id: str): } } ) + + # Broadcast status update + broadcast_status_update( + job_id, + JobStatus.PENDING_FINAL_REVIEW.value, + "Translation and TTS complete - ready for final review", + progress=100 + ) logger.info(f"Successfully completed translation and synthesis for job {job_id}") diff --git a/frontend/src/components/WebSocketToastHandler.tsx b/frontend/src/components/WebSocketToastHandler.tsx new file mode 100644 index 0000000..35ececc --- /dev/null +++ b/frontend/src/components/WebSocketToastHandler.tsx @@ -0,0 +1,74 @@ +/** + * Optional component to handle WebSocket connection status toasts + * This can be used in components that want to show connection status notifications + */ +import { useCallback } from 'react'; +import { useToastContext } from '../contexts/ToastContext'; +import type { ConnectionStatus } from '../hooks/useJobStatusWebSocket'; + +export interface WebSocketToastHandlerProps { + /** Whether to show connection toasts */ + enabled?: boolean; + + /** Custom connection status messages */ + messages?: { + connected?: string; + connecting?: string; + disconnected?: string; + error?: string; + }; +} + +/** + * Hook that returns a connection change handler for WebSocket toasts + */ +export function 
useWebSocketToastHandler(props: WebSocketToastHandlerProps = {}) { + const { + enabled = false, + messages = {} + } = props; + + const toast = useToastContext(); + + const handleConnectionChange = useCallback((status: ConnectionStatus) => { + if (!enabled) return; + + switch (status) { + case 'connected': + toast.success(messages.connected || 'Real-time updates connected'); + break; + + case 'connecting': + // Usually don't show toast for connecting to avoid spam + // toast.info(messages.connecting || 'Connecting to real-time updates...'); + break; + + case 'disconnected': + toast.warning(messages.disconnected || 'Real-time updates disconnected'); + break; + + case 'error': + toast.error(messages.error || 'Connection error - using cached data'); + break; + } + }, [enabled, messages, toast]); + + return handleConnectionChange; +} + +/** + * Default connection status messages for different contexts + */ +export const CONNECTION_MESSAGES = { + jobList: { + connected: 'Job list real-time updates enabled', + disconnected: 'Job list updates disconnected', + error: 'Job list connection error' + }, + + jobDetail: { + connected: 'Job status real-time updates enabled', + disconnected: 'Job status updates disconnected', + error: 'Job status connection error' + } +} as const; \ No newline at end of file diff --git a/frontend/src/hooks/useJobStatusWebSocket.ts b/frontend/src/hooks/useJobStatusWebSocket.ts new file mode 100644 index 0000000..9a81c61 --- /dev/null +++ b/frontend/src/hooks/useJobStatusWebSocket.ts @@ -0,0 +1,410 @@ +/** + * WebSocket hook for real-time job status updates + * + * Provides WebSocket connections for: + * 1. Individual job status updates: useJobStatusWebSocket(jobId) + * 2. 
Job list updates: useJobStatusWebSocket() without jobId + */ +import { useEffect, useState, useRef, useCallback } from 'react'; +import { useQueryClient } from '@tanstack/react-query'; +import { useAuthStore } from '../lib/auth'; +import { apiClient } from '../lib/api'; + +export interface JobStatusUpdate { + job_id: string; + status: string; + updated_at: string; + message?: string; + progress?: number; + metadata?: Record; +} + +export interface WebSocketMessage { + type: 'connection_established' | 'job_status_update' | 'job_list_update'; + data?: JobStatusUpdate; + job_id?: string; + scope?: string; + timestamp?: string; +} + +export type ConnectionStatus = 'connecting' | 'connected' | 'disconnected' | 'error'; + +interface UseJobStatusWebSocketOptions { + /** + * Whether to automatically reconnect on connection loss + * @default true + */ + autoReconnect?: boolean; + + /** + * Maximum number of reconnection attempts + * @default 5 + */ + maxReconnectAttempts?: number; + + /** + * Initial reconnection delay in milliseconds + * @default 1000 + */ + reconnectDelay?: number; + + /** + * Whether to log debug information + * @default false + */ + debug?: boolean; + + /** + * Whether to show toast notifications for status updates + * @default true + */ + showToasts?: boolean; + + /** + * Custom toast handler function for status updates + */ + onStatusUpdate?: (update: JobStatusUpdate) => void; + + /** + * Toast handler for connection status changes + */ + onConnectionChange?: (status: ConnectionStatus) => void; +} + +interface UseJobStatusWebSocketReturn { + /** Current connection status */ + connectionStatus: ConnectionStatus; + + /** Latest received message */ + lastMessage: WebSocketMessage | null; + + /** Latest job status update */ + lastUpdate: JobStatusUpdate | null; + + /** Manual reconnect function */ + reconnect: () => void; + + /** Disconnect function */ + disconnect: () => void; + + /** Send message (for heartbeat, etc.) 
*/ + sendMessage: (message: string) => void; +} + +/** + * WebSocket hook for job status updates + * + * @param jobId - Optional job ID for specific job updates. If not provided, subscribes to all job updates + * @param options - Configuration options + */ +export function useJobStatusWebSocket( + jobId?: string, + options: UseJobStatusWebSocketOptions = {} +): UseJobStatusWebSocketReturn { + const { + autoReconnect = true, + maxReconnectAttempts = 5, + reconnectDelay = 1000, + debug = false, + showToasts = true, + onStatusUpdate, + onConnectionChange + } = options; + + const queryClient = useQueryClient(); + const { isAuthenticated } = useAuthStore(); + + // Get access token from API client instead of auth store + const accessToken = apiClient.getAccessToken(); + + const [connectionStatus, setConnectionStatus] = useState('disconnected'); + const [lastMessage, setLastMessage] = useState(null); + const [lastUpdate, setLastUpdate] = useState(null); + + const wsRef = useRef(null); + const reconnectAttemptsRef = useRef(0); + const reconnectTimeoutRef = useRef(null); + const heartbeatIntervalRef = useRef(null); + const mountedRef = useRef(true); + + // Cache recent updates to prevent duplicates + const recentUpdatesRef = useRef>(new Map()); + + const log = useCallback((...args: unknown[]) => { + if (debug) { + console.log('[WebSocket]', ...args); + } + }, [debug]); + + const getWebSocketUrl = useCallback(() => { + const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'; + const host = window.location.host; + const basePath = '/api/v1/ws/jobs'; + const path = jobId ? 
`${basePath}/${jobId}` : basePath; + const token = encodeURIComponent(accessToken || ''); + return `${protocol}//${host}${path}?token=${token}`; + }, [jobId, accessToken]); + + const handleStatusUpdate = useCallback((update: JobStatusUpdate) => { + // Check for duplicate status updates within the last 5 seconds + const updateKey = `${update.job_id}:${update.status}`; + const now = Date.now(); + const recent = recentUpdatesRef.current.get(updateKey); + + if (recent && (now - recent.timestamp) < 5000) { + // Skip duplicate status update within 5 seconds + log('Skipping duplicate status update:', updateKey); + return; + } + + // Store this update + recentUpdatesRef.current.set(updateKey, { status: update.status, timestamp: now }); + + // Clean up old entries (older than 30 seconds) + const cutoff = now - 30000; + for (const [key, value] of recentUpdatesRef.current.entries()) { + if (value.timestamp < cutoff) { + recentUpdatesRef.current.delete(key); + } + } + + // Call custom handler if provided + if (onStatusUpdate) { + onStatusUpdate(update); + } + }, [onStatusUpdate, log]); + + const handleConnectionChange = useCallback((status: ConnectionStatus) => { + // Call custom handler if provided + if (onConnectionChange) { + onConnectionChange(status); + } + }, [onConnectionChange]); + + const updateQueryCache = useCallback((update: JobStatusUpdate) => { + // Update individual job cache if we have job_id + if (update.job_id) { + queryClient.setQueryData(['jobs', update.job_id], (oldData: unknown) => { + if (oldData) { + return { + ...oldData, + status: update.status, + updated_at: update.updated_at + }; + } + return oldData; + }); + } + + // Update job list cache + queryClient.setQueriesData({ queryKey: ['jobs'] }, (oldData: unknown) => { + // Type-safe handling of the jobs list data + const data = oldData as { jobs?: Array<{ id: string; status: string; updated_at: string; [key: string]: unknown }> }; + if (!data?.jobs) return oldData; + + const updatedJobs = 
data.jobs.map((job) => { + if (job.id === update.job_id) { + return { + ...job, + status: update.status, + updated_at: update.updated_at + }; + } + return job; + }); + + return { + ...data, + jobs: updatedJobs + }; + }); + }, [queryClient]); + + const handleMessage = useCallback((event: MessageEvent) => { + try { + // Handle plain text responses (like "pong" heartbeat responses) + if (typeof event.data === 'string' && event.data === 'pong') { + log('Received heartbeat response:', event.data); + return; + } + + const message: WebSocketMessage = JSON.parse(event.data); + log('Received message:', message); + + setLastMessage(message); + + if (message.type === 'job_status_update' || message.type === 'job_list_update') { + if (message.data) { + setLastUpdate(message.data); + updateQueryCache(message.data); + handleStatusUpdate(message.data); + } + } + } catch (error) { + console.error('[WebSocket] Failed to parse WebSocket message:', error, 'Raw data:', event.data); + } + }, [log, updateQueryCache, handleStatusUpdate]); + + const handleOpen = useCallback(() => { + console.log('[WebSocket] Connected successfully!'); + log('WebSocket connected'); + setConnectionStatus('connected'); + handleConnectionChange('connected'); + reconnectAttemptsRef.current = 0; + + // Start heartbeat + heartbeatIntervalRef.current = setInterval(() => { + if (wsRef.current?.readyState === WebSocket.OPEN) { + wsRef.current.send('ping'); + } + }, 30000); // Ping every 30 seconds + }, [log, handleConnectionChange]); + + const handleClose = useCallback((event: CloseEvent) => { + log('WebSocket closed:', event.code, event.reason); + setConnectionStatus('disconnected'); + handleConnectionChange('disconnected'); + + // Clear heartbeat + if (heartbeatIntervalRef.current) { + clearInterval(heartbeatIntervalRef.current); + heartbeatIntervalRef.current = null; + } + + // Attempt to reconnect if enabled and component is still mounted + if ( + autoReconnect && + mountedRef.current && + 
reconnectAttemptsRef.current < maxReconnectAttempts && + event.code !== 1000 // Don't reconnect on normal closure + ) { + const delay = reconnectDelay * Math.pow(2, reconnectAttemptsRef.current); + log(`Reconnecting in ${delay}ms (attempt ${reconnectAttemptsRef.current + 1}/${maxReconnectAttempts})`); + + reconnectTimeoutRef.current = setTimeout(() => { + if (mountedRef.current) { + reconnectAttemptsRef.current++; + connect(); + } + }, delay); + } + }, [log, autoReconnect, maxReconnectAttempts, reconnectDelay, handleConnectionChange]); + + const handleError = useCallback((error: Event) => { + console.error('WebSocket error:', error); + setConnectionStatus('error'); + handleConnectionChange('error'); + }, [handleConnectionChange]); + + const connect = useCallback(() => { + if (!accessToken) { + log('No access token available, skipping WebSocket connection'); + return; + } + + if (wsRef.current?.readyState === WebSocket.CONNECTING || + wsRef.current?.readyState === WebSocket.OPEN) { + log('WebSocket already connecting or connected'); + return; + } + + try { + setConnectionStatus('connecting'); + handleConnectionChange('connecting'); + + const url = getWebSocketUrl(); + console.log('[WebSocket] Attempting to connect to:', url.replace(/token=[^&]+/, 'token=***')); + log('Connecting to:', url.replace(/token=[^&]+/, 'token=***')); + + wsRef.current = new WebSocket(url); + wsRef.current.addEventListener('open', handleOpen); + wsRef.current.addEventListener('message', handleMessage); + wsRef.current.addEventListener('close', handleClose); + wsRef.current.addEventListener('error', handleError); + + } catch (error) { + console.error('[WebSocket] Failed to create WebSocket connection:', error); + setConnectionStatus('error'); + handleConnectionChange('error'); + } + }, [accessToken, getWebSocketUrl, handleOpen, handleMessage, handleClose, handleError, log, handleConnectionChange]); + + const disconnect = useCallback(() => { + log('Manually disconnecting WebSocket'); + + // 
Clear reconnection timeout + if (reconnectTimeoutRef.current) { + clearTimeout(reconnectTimeoutRef.current); + reconnectTimeoutRef.current = null; + } + + // Clear heartbeat + if (heartbeatIntervalRef.current) { + clearInterval(heartbeatIntervalRef.current); + heartbeatIntervalRef.current = null; + } + + // Close WebSocket + if (wsRef.current) { + wsRef.current.removeEventListener('open', handleOpen); + wsRef.current.removeEventListener('message', handleMessage); + wsRef.current.removeEventListener('close', handleClose); + wsRef.current.removeEventListener('error', handleError); + + if (wsRef.current.readyState === WebSocket.OPEN || + wsRef.current.readyState === WebSocket.CONNECTING) { + wsRef.current.close(1000, 'Manual disconnect'); + } + wsRef.current = null; + } + + setConnectionStatus('disconnected'); + handleConnectionChange('disconnected'); + }, [log, handleOpen, handleMessage, handleClose, handleError, handleConnectionChange]); + + const reconnect = useCallback(() => { + log('Manual reconnect requested'); + disconnect(); + reconnectAttemptsRef.current = 0; + setTimeout(connect, 100); + }, [log, disconnect]); // Exclude connect to prevent dependency loops + + const sendMessage = useCallback((message: string) => { + if (wsRef.current?.readyState === WebSocket.OPEN) { + wsRef.current.send(message); + log('Sent message:', message); + } else { + console.warn('WebSocket not connected, cannot send message'); + } + }, [log]); + + // Connect on mount and when dependencies change + useEffect(() => { + const currentToken = apiClient.getAccessToken(); + if (currentToken && isAuthenticated) { + connect(); + } + + return () => { + mountedRef.current = false; + disconnect(); + }; + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [isAuthenticated, jobId]); // Reconnect when auth state or jobId changes + + // Cleanup on unmount + useEffect(() => { + return () => { + mountedRef.current = false; + }; + }, []); + + return { + connectionStatus, + lastMessage, + 
lastUpdate, + reconnect, + disconnect, + sendMessage + }; +} \ No newline at end of file diff --git a/frontend/src/lib/api.ts b/frontend/src/lib/api.ts index fe5ea1f..71be4b3 100644 --- a/frontend/src/lib/api.ts +++ b/frontend/src/lib/api.ts @@ -85,6 +85,10 @@ class ApiClient { this.accessToken = null; } + getAccessToken(): string | null { + return this.accessToken; + } + // Auth endpoints async login(credentials: LoginRequest): Promise { const response = await this.client.post('/auth/login', credentials); diff --git a/frontend/src/routes/jobs/JobDetail.tsx b/frontend/src/routes/jobs/JobDetail.tsx index 20c75cd..01f5966 100644 --- a/frontend/src/routes/jobs/JobDetail.tsx +++ b/frontend/src/routes/jobs/JobDetail.tsx @@ -1,9 +1,13 @@ -import { useState } from 'react'; +import { useState, useCallback } from 'react'; import { useParams, Link } from 'react-router-dom'; import { formatDistanceToNow } from 'date-fns'; import { useJob, useJobDownloads, useJobVttContent } from '../../hooks/useJob'; +import { useJobStatusWebSocket } from '../../hooks/useJobStatusWebSocket'; import { StatusBadge } from '../../components/StatusBadge'; import { VideoWithCaptions } from '../../components/VideoWithCaptions'; +import { useToastContext } from '../../contexts/ToastContext'; +import { getStatusMessageConfig } from '../../utils/jobStatusMessages'; +import type { JobStatusUpdate } from '../../hooks/useJobStatusWebSocket'; const ProgressIndicator = ({ status }: { status: string }) => { @@ -58,10 +62,34 @@ const ProgressIndicator = ({ status }: { status: string }) => { export function JobDetail() { const { id } = useParams(); const [activeTab, setActiveTab] = useState<'overview' | 'video' | 'assets' | 'history'>('overview'); + const toast = useToastContext(); const { data: job, isLoading, error } = useJob(id!); - const { data: downloads, isLoading: downloadsLoading, error: downloadsError } = useJobDownloads(id!); - const { data: englishVtt, isLoading: vttLoading, error: vttError } = 
useJobVttContent(id!, 'en'); + const { data: downloads } = useJobDownloads(id!); + const { data: englishVtt } = useJobVttContent(id!, 'en'); + + // Handle job status updates with toast notifications + const handleStatusUpdate = useCallback((update: JobStatusUpdate) => { + // Use the current job title or fallback + const jobTitle = job?.title; + + const { message, type, showToast } = getStatusMessageConfig( + update.status, + jobTitle, + update.message + ); + + if (showToast) { + toast[type](message); + } + }, [job?.title, toast]); + + // WebSocket connection for real-time job status updates + const { connectionStatus } = useJobStatusWebSocket(id, { + debug: false, + autoReconnect: true, + onStatusUpdate: handleStatusUpdate + }); // Get video URL from downloads @@ -289,7 +317,18 @@ export function JobDetail() { Created {formatDistanceToNow(new Date(job.created_at))} ago

- +
+ + {connectionStatus === 'connected' && ( + + )} + {connectionStatus === 'connecting' && ( + + )} + {connectionStatus === 'error' && ( + + )} +
diff --git a/frontend/src/routes/jobs/JobsList.tsx b/frontend/src/routes/jobs/JobsList.tsx index 13b5e3e..df0463a 100644 --- a/frontend/src/routes/jobs/JobsList.tsx +++ b/frontend/src/routes/jobs/JobsList.tsx @@ -1,11 +1,14 @@ -import { useState, useMemo, useEffect } from 'react'; +import { useState, useMemo, useEffect, useCallback } from 'react'; import { Link, useSearchParams } from 'react-router-dom'; import { formatDistanceToNow } from 'date-fns'; import { useAuthStore } from '../../lib/auth'; import { useJobs, useBulkDeleteJobs } from '../../hooks/useJob'; +import { useJobStatusWebSocket } from '../../hooks/useJobStatusWebSocket'; import { StatusBadge } from '../../components/StatusBadge'; import { useToastContext } from '../../contexts/ToastContext'; +import { getStatusMessageConfig } from '../../utils/jobStatusMessages'; import type { Job } from '../../types/api'; +import type { JobStatusUpdate } from '../../hooks/useJobStatusWebSocket'; const STATUS_OPTIONS = [ { value: '', label: 'All Statuses' }, @@ -28,7 +31,7 @@ const SORT_OPTIONS = [ export function JobsList() { const { user } = useAuthStore(); const toast = useToastContext(); - const [searchParams, setSearchParams] = useSearchParams(); + const [searchParams] = useSearchParams(); const [searchTerm, setSearchTerm] = useState(''); const [statusFilter, setStatusFilter] = useState(searchParams.get('status') || ''); const [sortBy, setSortBy] = useState('created_at_desc'); @@ -56,6 +59,30 @@ export function JobsList() { const bulkDeleteMutation = useBulkDeleteJobs(); + // Handle job status updates with toast notifications + const handleStatusUpdate = useCallback((update: JobStatusUpdate) => { + // Find the job to get its title + const job = jobsData?.jobs.find((j: Job) => j.id === update.job_id); + const jobTitle = job?.title; + + const { message, type, showToast } = getStatusMessageConfig( + update.status, + jobTitle, + update.message + ); + + if (showToast) { + toast[type](message); + } + }, 
[jobsData?.jobs, toast]); + + // WebSocket connection for real-time job status updates + const { connectionStatus } = useJobStatusWebSocket(undefined, { + debug: false, + autoReconnect: true, + onStatusUpdate: handleStatusUpdate + }); + // Client-side filtering and sorting for search term const filteredAndSortedJobs = useMemo(() => { let jobs = jobsData?.jobs || []; @@ -177,12 +204,30 @@ export function JobsList() { } }; + const getConnectionStatusIcon = () => { + switch (connectionStatus) { + case 'connected': + return ; + case 'connecting': + return ; + case 'disconnected': + return ; + case 'error': + return ; + default: + return null; + } + }; + return (
{/* Header */}
-

All Jobs

+
+

All Jobs

+ {getConnectionStatusIcon()} +

{user?.role === 'client' ? 'Your video processing jobs' : 'System-wide job management'}

diff --git a/frontend/src/utils/jobStatusMessages.ts b/frontend/src/utils/jobStatusMessages.ts
new file mode 100644
index 0000000..64a67a6
--- /dev/null
+++ b/frontend/src/utils/jobStatusMessages.ts
@@ -0,0 +1,140 @@
+/**
+ * Utility functions for generating user-friendly job status messages and toast notifications
+ */
+
+/** Toast text plus how (and whether) it should be displayed */
+export interface StatusMessageConfig {
+  message: string;
+  type: 'success' | 'info' | 'warning' | 'error';
+  showToast: boolean;
+}
+
+/**
+ * Get user-friendly message and toast configuration for job status updates
+ *
+ * @param status - Job status string, matched against the statuses listed in the switch below
+ * @param jobTitle - Optional job title; rendered in double quotes, otherwise the generic "Job" is used
+ * @param customMessage - Optional message; only used when the status is not one of the known cases
+ * @returns Message text, toast type, and whether a toast should be shown
+ */
+export function getStatusMessageConfig(
+  status: string,
+  jobTitle?: string,
+  customMessage?: string
+): StatusMessageConfig {
+  const title = jobTitle ? `"${jobTitle}"` : 'Job';
+  const fallbackMessage = customMessage || '';
+
+  switch (status) {
+    case 'created':
+      return {
+        message: `${title} has been created and queued for processing`,
+        type: 'info',
+        showToast: true
+      };
+
+    case 'ingesting':
+      return {
+        message: `${title} is being ingested and prepared for AI processing`,
+        type: 'info',
+        showToast: true
+      };
+
+    case 'ai_processing':
+      return {
+        message: `${title} is being processed by AI to generate accessibility features`,
+        type: 'info',
+        showToast: true
+      };
+
+    case 'pending_qc':
+      return {
+        message: `${title} is ready for quality control review`,
+        type: 'info',
+        showToast: true
+      };
+
+    case 'approved_english':
+      return {
+        message: `${title} English content has been approved - starting translation`,
+        type: 'success',
+        showToast: true
+      };
+
+    case 'rejected':
+      return {
+        message: `${title} has been rejected and requires revision`,
+        type: 'warning',
+        showToast: true
+      };
+
+    case 'translating':
+      return {
+        message: `${title} is being translated and transcreated into requested languages`,
+        type: 'info',
+        showToast: true
+      };
+
+    case 'tts_generating':
+      return {
+        message: `${title} is generating audio descriptions with text-to-speech`,
+        type: 'info',
+        showToast: true
+      };
+
+    case 'pending_final_review':
+      return {
+        message: `${title} is ready for final review before completion`,
+        type: 'info',
+        showToast: true
+      };
+
+    case 'qc_feedback':
+      return {
+        message: `${title} final review has been rejected - requires changes`,
+        type: 'warning',
+        showToast: true
+      };
+
+    case 'completed':
+      return {
+        message: `${title} has been completed successfully! 🎉 All files are ready for download`,
+        type: 'success',
+        showToast: true
+      };
+
+    default:
+      // Unknown status: only raise a toast when the caller supplied its own message
+      return {
+        message: fallbackMessage || `${title} status updated to ${status}`,
+        type: 'info',
+        showToast: !!fallbackMessage
+      };
+  }
+}
+
+/**
+ * Get a shorter status message for progress updates
+ *
+ * @param status - Job status string
+ * @param progress - Optional percentage; left out of the text when null or undefined
+ * @returns Short label; statuses without a dedicated label return the status with underscores replaced by spaces (no percentage)
+ */
+export function getProgressMessage(status: string, progress?: number | null): string {
+  // Loose `!= null` covers both undefined and null: the backend broadcasts progress=None for
+  // some updates, which presumably arrives as JSON null and would otherwise render "(null%)"
+  const progressText = progress != null ? ` (${progress}%)` : '';
+
+  switch (status) {
+    case 'ingesting':
+      return `Ingesting video${progressText}`;
+    case 'ai_processing':
+      return `AI processing${progressText}`;
+    case 'translating':
+      return `Translating content${progressText}`;
+    case 'tts_generating':
+      return `Generating audio${progressText}`;
+    default:
+      return status.replace(/_/g, ' ');
+  }
+}
\ No newline at end of file
diff --git a/frontend/vite.config.ts b/frontend/vite.config.ts
index 3d9d515..1f439e8 100644
--- a/frontend/vite.config.ts
+++ b/frontend/vite.config.ts
@@ -5,6 +5,15 @@ import react from '@vitejs/plugin-react'
 // https://vite.dev/config/
 export default defineConfig({
   plugins: [react()],
+  server: {
+    proxy: {
+      '/api': {
+        target: 'http://localhost:8000',
+        changeOrigin: true,
+        ws: true, // Enable WebSocket proxying
+      },
+    },
+  },
   test: {
     globals: true,
     environment: 'jsdom',