added websockets for live job status updates with toast notifications on job list page

This commit is contained in:
michael 2025-08-24 19:41:23 -05:00
parent af2562096a
commit 0c54dd4f29
22 changed files with 1689 additions and 107 deletions

View file

@ -27,6 +27,7 @@ from ...schemas.job import (
VttTimingAdjustRequest,
VttUpdateRequest,
)
from ...services.websocket import connection_manager
from ...services.gcs import (
gcs_service,
upload_file_to_gcs,
@ -351,6 +352,17 @@ async def approve_english(
detail="Job not found or not in pending QC status"
)
# Broadcast status update
try:
await connection_manager.broadcast_job_status_update(
job_id=job_id,
status=JobStatus.APPROVED_ENGLISH.value,
message="English content approved - starting translation",
progress=None
)
except Exception as e:
logger.warning(f"Failed to broadcast status update for job {job_id}: {e}")
# Trigger translation and synthesis pipeline immediately
try:
translate_and_synthesize_task.delay(job_id)
@ -406,6 +418,17 @@ async def reject_job(
detail="Job not found or not in pending QC status"
)
# Broadcast status update
try:
await connection_manager.broadcast_job_status_update(
job_id=job_id,
status=JobStatus.REJECTED.value,
message="Job rejected - requires revision",
progress=None
)
except Exception as e:
logger.warning(f"Failed to broadcast status update for job {job_id}: {e}")
return JobResponse(
id=str(result["_id"]),
title=result["title"],
@ -474,6 +497,17 @@ async def complete_job(
detail="Job not found or not in pending final review status"
)
# Broadcast status update
try:
await connection_manager.broadcast_job_status_update(
job_id=job_id,
status=JobStatus.COMPLETED.value,
message="Job completed - all files ready for download",
progress=100
)
except Exception as e:
logger.warning(f"Failed to broadcast status update for job {job_id}: {e}")
return JobResponse(
id=str(result["_id"]),
title=result["title"],
@ -521,6 +555,17 @@ async def reject_final_review(
detail="Job not found or not in pending final review status"
)
# Broadcast status update
try:
await connection_manager.broadcast_job_status_update(
job_id=job_id,
status=JobStatus.QC_FEEDBACK.value,
message="Final review rejected - requires changes",
progress=None
)
except Exception as e:
logger.warning(f"Failed to broadcast status update for job {job_id}: {e}")
return JobResponse(
id=str(result["_id"]),
title=result["title"],

View file

@ -0,0 +1,214 @@
"""
WebSocket routes for real-time job status updates
Provides WebSocket endpoints for:
1. Individual job status updates: /ws/jobs/{job_id}
2. Job list updates: /ws/jobs (all jobs for authenticated user)
"""
import logging
from typing import Optional
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, HTTPException, Depends, Query
from fastapi.security import HTTPBearer
from ...services.websocket import (
connection_manager,
authenticate_websocket,
get_connection_manager,
ConnectionManager
)
from ...models.job import Job
from ...core.database import get_database
from ...core.dependencies import get_current_user
logger = logging.getLogger(__name__)
router = APIRouter(tags=["WebSocket"])
security = HTTPBearer()
@router.websocket("/ws/jobs/{job_id}")
async def websocket_job_status(
websocket: WebSocket,
job_id: str,
token: Optional[str] = Query(None),
manager: ConnectionManager = Depends(get_connection_manager)
):
"""
WebSocket endpoint for real-time job status updates
Usage:
- Connect: ws://localhost:8000/api/v1/ws/jobs/{job_id}?token={jwt_token}
- Receives: Real-time status updates for the specific job
Message format:
{
"type": "job_status_update",
"data": {
"job_id": "...",
"status": "processing",
"updated_at": "2023-...",
"message": "Processing video...",
"progress": 45
}
}
"""
# Authenticate the WebSocket connection
user_id = await authenticate_websocket(websocket, token)
if not user_id:
return
try:
# Verify user has access to this job
db = await get_database()
jobs_collection = db["jobs"]
job = await jobs_collection.find_one({"_id": job_id})
if not job:
await websocket.close(code=4004, reason="Job not found")
return
# Check permissions - users can only access their own jobs unless they're admin/reviewer
user = await db["users"].find_one({"_id": user_id})
if not user:
try:
from bson import ObjectId
user = await db["users"].find_one({"_id": ObjectId(user_id)})
except Exception:
pass # Invalid ObjectId format
if not user:
await websocket.close(code=4001, reason="User not found")
return
# Check access permissions
if user["role"] == "client" and job.get("created_by") != user_id:
await websocket.close(code=4003, reason="Access denied")
return
# Connect to job status updates
await manager.connect_job_status(websocket, user_id, job_id)
# Keep connection alive and handle incoming messages
while True:
try:
# Wait for incoming WebSocket messages (for heartbeat, etc.)
message = await websocket.receive_text()
logger.debug(f"Received WebSocket message from user {user_id}: {message}")
# Handle heartbeat or other client messages if needed
if message == "ping":
await websocket.send_text("pong")
except WebSocketDisconnect:
break
except Exception as e:
logger.error(f"Error in WebSocket message handling: {e}")
break
except WebSocketDisconnect:
pass
except Exception as e:
logger.error(f"WebSocket job status error: {e}")
finally:
manager.disconnect(websocket, user_id)
@router.websocket("/ws/jobs")
async def websocket_job_list(
websocket: WebSocket,
token: Optional[str] = Query(None),
manager: ConnectionManager = Depends(get_connection_manager)
):
"""
WebSocket endpoint for real-time job list updates
Usage:
- Connect: ws://localhost:8000/api/v1/ws/jobs?token={jwt_token}
- Receives: Real-time status updates for all jobs the user can access
Message format:
{
"type": "job_list_update",
"data": {
"job_id": "...",
"status": "processing",
"updated_at": "2023-...",
"message": "Processing video...",
"progress": 45
}
}
"""
# Authenticate the WebSocket connection
user_id = await authenticate_websocket(websocket, token)
if not user_id:
return
try:
# Verify user exists
logger.info(f"WebSocket: Looking up user {user_id} in database")
db = await get_database()
# Try looking up user by string ID first, then by ObjectId
user = await db["users"].find_one({"_id": user_id})
if not user:
try:
from bson import ObjectId
user = await db["users"].find_one({"_id": ObjectId(user_id)})
except Exception:
pass # Invalid ObjectId format
if not user:
logger.warning(f"WebSocket: User {user_id} not found in database (tried both string and ObjectId)")
await websocket.close(code=4001, reason="User not found")
return
logger.info(f"WebSocket: User {user_id} found, role: {user.get('role', 'unknown')}")
logger.info(f"WebSocket: User {user_id} found, connecting to job list updates")
# Connect to job list updates
await manager.connect_job_list(websocket, user_id)
# Keep connection alive and handle incoming messages
while True:
try:
# Wait for incoming WebSocket messages
message = await websocket.receive_text()
logger.debug(f"Received WebSocket message from user {user_id}: {message}")
# Handle heartbeat or other client messages if needed
if message == "ping":
await websocket.send_text("pong")
except WebSocketDisconnect:
break
except Exception as e:
logger.error(f"Error in WebSocket message handling: {e}")
break
except WebSocketDisconnect:
pass
except Exception as e:
logger.error(f"WebSocket job list error: {e}")
finally:
manager.disconnect(websocket, user_id)
@router.get("/ws/status")
async def websocket_status():
"""
Get WebSocket connection status and statistics
Useful for debugging and monitoring
"""
stats = {
"active_connections": len(connection_manager.active_connections),
"job_subscriptions": len(connection_manager.job_subscriptions),
"global_subscriptions": len(connection_manager.global_subscriptions),
"redis_connected": connection_manager.redis_client is not None,
"subscriber_running": (
connection_manager.subscriber_task is not None and
not connection_manager.subscriber_task.done()
)
}
return stats

View file

@ -14,6 +14,8 @@ from .api.v1.routes_admin import router as admin_router
from .api.v1.routes_auth import router as auth_router
from .api.v1.routes_files import router as files_router
from .api.v1.routes_jobs import router as jobs_router
from .api.v1.routes_websockets import router as websockets_router
from .services.websocket import connection_manager
from .core.config import settings
from .core.secrets_config import initialize_config
from .core.database import close_mongo_connection, connect_to_mongo, create_indexes
@ -26,6 +28,7 @@ from .telemetry import (
instrument_fastapi_app,
setup_tracing
)
from .services.websocket import connection_manager
@asynccontextmanager
@ -71,6 +74,9 @@ async def lifespan(app: FastAPI):
await connect_to_redis()
# await create_indexes() # Temporarily disabled for debugging
# Start WebSocket connection manager
await connection_manager.start()
# Initialize middleware with Redis client
redis_client = get_redis_client()
if redis_client:
@ -83,6 +89,7 @@ async def lifespan(app: FastAPI):
yield
# Shutdown
await connection_manager.stop()
await close_mongo_connection()
await close_redis_connection()
@ -197,6 +204,34 @@ app.include_router(auth_router, prefix="/api/v1")
app.include_router(files_router, prefix="/api/v1")
app.include_router(jobs_router, prefix="/api/v1")
app.include_router(admin_router, prefix="/api/v1")
app.include_router(websockets_router, prefix="/api/v1")
@app.on_event("startup")
async def startup_event():
"""Initialize services on startup"""
logger.info("🚀 Starting up FastAPI application...")
# Start WebSocket connection manager
try:
await connection_manager.start()
logger.info("✅ WebSocket connection manager started successfully")
except Exception as e:
logger.error(f"❌ Failed to start WebSocket connection manager: {e}")
raise
@app.on_event("shutdown")
async def shutdown_event():
"""Cleanup services on shutdown"""
logger.info("🛑 Shutting down FastAPI application...")
# Stop WebSocket connection manager
try:
await connection_manager.stop()
logger.info("✅ WebSocket connection manager stopped successfully")
except Exception as e:
logger.error(f"❌ Error stopping WebSocket connection manager: {e}")
@app.get("/health")

View file

@ -0,0 +1,361 @@
"""
WebSocket Connection Manager for Real-time Job Status Updates
This module provides WebSocket support for broadcasting job status changes
in real-time to connected clients. It uses Redis pub/sub for scalable
message broadcasting across multiple worker processes.
"""
import asyncio
import json
import logging
from typing import Dict, List, Set, Optional, Any
from datetime import datetime
from fastapi import WebSocket, WebSocketDisconnect
import redis.asyncio as redis
import redis as sync_redis
from pydantic import BaseModel
from ..core.redis import get_redis_client
from ..core.security import decode_token
from ..core.config import settings
logger = logging.getLogger(__name__)
class JobStatusUpdate(BaseModel):
"""Schema for job status update messages"""
job_id: str
status: str
updated_at: datetime
message: Optional[str] = None
progress: Optional[int] = None # 0-100 percentage
metadata: Optional[Dict[str, Any]] = None
class ConnectionManager:
"""Manages WebSocket connections and Redis pub/sub for job status updates"""
def __init__(self):
# Active WebSocket connections by user_id
self.active_connections: Dict[str, Set[WebSocket]] = {}
# Job subscriptions: job_id -> set of user_ids
self.job_subscriptions: Dict[str, Set[str]] = {}
# Global job list subscriptions by user_id
self.global_subscriptions: Set[str] = set()
# Redis client for pub/sub
self.redis_client: Optional[redis.Redis] = None
self.pubsub: Optional[redis.client.PubSub] = None
self.subscriber_task: Optional[asyncio.Task] = None
async def start(self):
"""Initialize Redis pub/sub subscriber"""
try:
self.redis_client = await redis.from_url(
settings.redis_url,
encoding="utf-8",
decode_responses=True
)
self.pubsub = self.redis_client.pubsub()
# Subscribe to job status channels
await self.pubsub.subscribe("job_status_updates") # Global channel
await self.pubsub.psubscribe("job_status_updates:*") # Pattern for individual job channels
# Start background task to handle Redis messages
self.subscriber_task = asyncio.create_task(self._redis_subscriber())
logger.info("WebSocket connection manager started")
except Exception as e:
logger.error(f"Failed to start WebSocket connection manager: {e}")
raise
async def stop(self):
"""Cleanup Redis connections"""
if self.subscriber_task:
self.subscriber_task.cancel()
try:
await self.subscriber_task
except asyncio.CancelledError:
pass
if self.pubsub:
await self.pubsub.unsubscribe()
await self.pubsub.punsubscribe()
await self.pubsub.aclose()
if self.redis_client:
await self.redis_client.aclose()
logger.info("WebSocket connection manager stopped")
async def connect_job_status(self, websocket: WebSocket, user_id: str, job_id: str):
"""Connect a WebSocket for specific job status updates"""
await websocket.accept()
# Add connection to active connections
if user_id not in self.active_connections:
self.active_connections[user_id] = set()
self.active_connections[user_id].add(websocket)
# Add job subscription
if job_id not in self.job_subscriptions:
self.job_subscriptions[job_id] = set()
self.job_subscriptions[job_id].add(user_id)
logger.info(f"User {user_id} connected for job {job_id} status updates")
# Send initial connection confirmation
await self._send_to_websocket(websocket, {
"type": "connection_established",
"job_id": job_id,
"timestamp": datetime.utcnow().isoformat()
})
async def connect_job_list(self, websocket: WebSocket, user_id: str):
"""Connect a WebSocket for job list updates (all jobs for a user)"""
await websocket.accept()
# Add connection to active connections
if user_id not in self.active_connections:
self.active_connections[user_id] = set()
self.active_connections[user_id].add(websocket)
# Add to global subscriptions
self.global_subscriptions.add(user_id)
logger.info(f"User {user_id} connected for job list updates")
# Send initial connection confirmation
await self._send_to_websocket(websocket, {
"type": "connection_established",
"scope": "job_list",
"timestamp": datetime.utcnow().isoformat()
})
def disconnect(self, websocket: WebSocket, user_id: str):
"""Disconnect a WebSocket and clean up subscriptions"""
# Remove from active connections
if user_id in self.active_connections:
self.active_connections[user_id].discard(websocket)
if not self.active_connections[user_id]:
del self.active_connections[user_id]
# Remove from global subscriptions if no connections left
if user_id not in self.active_connections:
self.global_subscriptions.discard(user_id)
# Remove from job subscriptions
for job_id in list(self.job_subscriptions.keys()):
self.job_subscriptions[job_id].discard(user_id)
if not self.job_subscriptions[job_id]:
del self.job_subscriptions[job_id]
logger.info(f"User {user_id} disconnected from WebSocket")
async def broadcast_job_status_update(
self,
job_id: str,
status: str,
user_id: Optional[str] = None,
message: Optional[str] = None,
progress: Optional[int] = None,
metadata: Optional[Dict[str, Any]] = None
):
"""
Broadcast job status update to Redis pub/sub
This will be called from Celery workers
"""
update = JobStatusUpdate(
job_id=job_id,
status=status,
updated_at=datetime.utcnow(),
message=message,
progress=progress,
metadata=metadata
)
try:
# Create a synchronous Redis client for Celery workers
redis_client = sync_redis.Redis.from_url(
settings.redis_url,
encoding="utf-8",
decode_responses=True
)
# Publish to global channel
redis_client.publish(
"job_status_updates",
update.model_dump_json()
)
# Publish to specific job channel
redis_client.publish(
f"job_status_updates:{job_id}",
update.model_dump_json()
)
# Close the connection
redis_client.close()
logger.debug(f"Broadcasted status update for job {job_id}: {status}")
except Exception as e:
logger.error(f"Failed to broadcast job status update: {e}")
async def _redis_subscriber(self):
"""Background task to handle Redis pub/sub messages"""
try:
async for message in self.pubsub.listen():
# Handle both regular messages and pattern messages
if message["type"] in ("message", "pmessage"):
await self._handle_redis_message(message)
except asyncio.CancelledError:
logger.info("Redis subscriber task cancelled")
except Exception as e:
logger.error(f"Redis subscriber error: {e}")
async def _handle_redis_message(self, message: Dict[str, Any]):
"""Handle incoming Redis pub/sub message"""
try:
# For pattern messages, the channel is in the "channel" field
# For regular messages, it's also in the "channel" field
channel = message["channel"]
data = json.loads(message["data"])
update = JobStatusUpdate(**data)
logger.debug(f"Received Redis message on channel '{channel}': {data}")
# Send to specific job subscribers
if channel.startswith("job_status_updates:"):
job_id = channel.split(":", 1)[1]
logger.debug(f"Sending job status update for job {job_id} to subscribers")
await self._send_job_status_to_subscribers(job_id, update)
# Send to global subscribers (job list updates)
elif channel == "job_status_updates":
logger.debug(f"Sending global job status update to subscribers")
await self._send_job_status_to_global_subscribers(update)
except Exception as e:
logger.error(f"Failed to handle Redis message: {e}")
async def _send_job_status_to_subscribers(self, job_id: str, update: JobStatusUpdate):
"""Send job status update to specific job subscribers"""
if job_id not in self.job_subscriptions:
return
# Convert to JSON-serializable dict
message = {
"type": "job_status_update",
"data": json.loads(update.model_dump_json())
}
for user_id in list(self.job_subscriptions[job_id]):
await self._send_to_user(user_id, message)
async def _send_job_status_to_global_subscribers(self, update: JobStatusUpdate):
"""Send job status update to global (job list) subscribers"""
# Convert to JSON-serializable dict
message = {
"type": "job_list_update",
"data": json.loads(update.model_dump_json())
}
for user_id in list(self.global_subscriptions):
await self._send_to_user(user_id, message)
async def _send_to_user(self, user_id: str, message: Dict[str, Any]):
"""Send message to all WebSocket connections for a user"""
if user_id not in self.active_connections:
return
# Send to all connections for this user
disconnected_connections = set()
for websocket in list(self.active_connections[user_id]):
try:
await self._send_to_websocket(websocket, message)
except Exception as e:
logger.warning(f"Failed to send to websocket for user {user_id}: {e}")
disconnected_connections.add(websocket)
# Clean up disconnected connections
for websocket in disconnected_connections:
self.disconnect(websocket, user_id)
async def _send_to_websocket(self, websocket: WebSocket, message: Dict[str, Any]):
"""Send message to a specific WebSocket connection"""
try:
await websocket.send_json(message)
except Exception as e:
logger.warning(f"WebSocket send failed: {e}")
raise
# Global connection manager instance
connection_manager = ConnectionManager()
async def authenticate_websocket(websocket: WebSocket, token: str) -> Optional[str]:
"""
Authenticate WebSocket connection using JWT token
Returns user_id if valid, None if invalid
"""
try:
if not token:
await websocket.close(code=4001, reason="Missing authentication token")
return None
# Decode JWT token
payload = decode_token(token)
if not payload or "sub" not in payload:
await websocket.close(code=4001, reason="Invalid authentication token")
return None
return payload["sub"] # user_id
except Exception as e:
logger.warning(f"WebSocket authentication failed: {e}")
await websocket.close(code=4001, reason="Authentication failed")
return None
async def authenticate_websocket(websocket: WebSocket, token: Optional[str]) -> Optional[str]:
"""
Authenticate a WebSocket connection using a JWT token
Returns user_id if valid, None if invalid
"""
try:
if not token:
logger.warning("WebSocket authentication failed: Missing token")
await websocket.close(code=4001, reason="Missing authentication token")
return None
# Import JWT decode function
from ..core.security import decode_token
# Decode JWT token - this may raise HTTPException
try:
payload = decode_token(token)
if not payload or "sub" not in payload:
logger.warning("WebSocket authentication failed: Invalid token payload")
await websocket.close(code=4001, reason="Invalid authentication token")
return None
user_id = payload["sub"]
logger.info(f"WebSocket authentication successful for user: {user_id}")
return user_id
except Exception as jwt_error:
logger.warning(f"WebSocket authentication failed: JWT decode error: {jwt_error}")
await websocket.close(code=4001, reason="Invalid authentication token")
return None
except Exception as e:
logger.error(f"WebSocket authentication failed with unexpected error: {e}")
await websocket.close(code=4001, reason="Authentication failed")
return None
async def get_connection_manager() -> ConnectionManager:
"""Dependency to get the connection manager"""
return connection_manager

View file

@ -12,11 +12,64 @@ from ..core.logging import get_logger
from ..models.job import JobStatus
from ..services.gcs import gcs_service, upload_vtt_to_gcs
from ..services.gemini import gemini_service
from ..services.websocket import connection_manager
from . import celery_app
logger = get_logger(__name__)
def broadcast_status_update(job_id: str, status: str, message: str = None, progress: int = None):
"""
Helper function to broadcast job status updates via WebSocket
Uses sync Redis client for Celery worker context
"""
logger.info(f"🔊 ATTEMPTING TO BROADCAST: job_id={job_id}, status={status}, message={message}")
try:
import redis as sync_redis
from ..core.config import settings
from ..services.websocket import JobStatusUpdate
from datetime import datetime
logger.info(f"🔊 About to create JobStatusUpdate for job {job_id}")
# Create status update
update = JobStatusUpdate(
job_id=job_id,
status=status,
updated_at=datetime.utcnow(),
message=message,
progress=progress
)
logger.info(f"🔊 Created update object, now connecting to Redis: {settings.redis_url}")
# Create synchronous Redis client
redis_client = sync_redis.Redis.from_url(
settings.redis_url,
encoding="utf-8",
decode_responses=True
)
logger.info(f"🔊 Redis client created, now publishing to channels")
# Publish to channels
result1 = redis_client.publish("job_status_updates", update.model_dump_json())
result2 = redis_client.publish(f"job_status_updates:{job_id}", update.model_dump_json())
logger.info(f"🔊 Published to channels - general: {result1} subscribers, job-specific: {result2} subscribers")
# Close connection
redis_client.close()
logger.info(f"🔊 ✅ Successfully broadcasted status update for job {job_id}: {status}")
except Exception as e:
logger.error(f"🔊 ❌ Failed to broadcast status update for job {job_id}: {e}")
import traceback
logger.error(f"🔊 ❌ Full traceback: {traceback.format_exc()}")
# Don't let WebSocket failures break the worker task
class AsyncTask(Task):
"""Base task class that supports async execution"""
def __call__(self, *args, **kwargs):
@ -79,6 +132,14 @@ async def ingest_and_ai_task_impl(job_id: str):
}
}
)
# Broadcast status update
broadcast_status_update(
job_id,
JobStatus.INGESTING.value,
"Starting video ingestion and processing",
progress=10
)
# Get job details
job_doc = await db.jobs.find_one({"_id": job_id})
@ -113,6 +174,14 @@ async def ingest_and_ai_task_impl(job_id: str):
}
}
)
# Broadcast status update
broadcast_status_update(
job_id,
JobStatus.AI_PROCESSING.value,
"Processing video with AI for accessibility features",
progress=50
)
# Probe video for metadata
duration = await _get_video_duration(temp_path)
@ -171,6 +240,14 @@ async def ingest_and_ai_task_impl(job_id: str):
}
}
)
# Broadcast status update
broadcast_status_update(
job_id,
JobStatus.PENDING_QC.value,
"AI processing complete - ready for quality review",
progress=100
)
logger.info(f"Successfully completed ingestion and AI processing for job {job_id}")

View file

@ -1,12 +1,14 @@
import asyncio
from datetime import datetime
from bson import ObjectId
from celery import Task
from celery.exceptions import Retry
from motor.motor_asyncio import AsyncIOMotorClient
from ..core.config import settings
from ..core.logging import get_logger
from ..models.audit_log import AuditLogCreate
from ..models.audit_log import AuditLogCreate, AuditAction
from ..services.emailer import email_service
from ..services.gcs import get_signed_download_url
from . import celery_app
@ -14,8 +16,8 @@ from . import celery_app
logger = get_logger(__name__)
class AsyncTask(Task):
"""Base task class that supports async execution"""
class NotifyClientTask(Task):
"""Async task for client notifications"""
def __call__(self, *args, **kwargs):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
@ -24,119 +26,183 @@ class AsyncTask(Task):
finally:
loop.close()
async def run_async(self, *args, **kwargs):
raise NotImplementedError
async def run_async(self, job_id: str):
"""
Pipeline 3: Client Notification
Triggered when job status changes to 'completed'
"""
logger.info(f"Starting client notification for job {job_id}")
# Connect to MongoDB
client = AsyncIOMotorClient(settings.mongodb_uri)
db = client[settings.mongodb_db]
@celery_app.task(bind=True, base=AsyncTask)
async def notify_client_task(self, job_id: str):
"""
Pipeline 3: Client Notification
Triggered when job status changes to 'completed'
"""
logger.info(f"Starting client notification for job {job_id}")
try:
# Get job and client details
job_doc = await db.jobs.find_one({"_id": job_id})
if not job_doc:
logger.error(f"Job {job_id} not found in database")
return # Don't retry for missing jobs
# Connect to MongoDB
client = AsyncIOMotorClient(settings.mongodb_uri)
db = client[settings.mongodb_db]
if job_doc["status"] != "completed":
logger.warning(f"Job {job_id} not in completed status (current: {job_doc['status']}), skipping notification")
return
try:
# Get job and client details
job_doc = await db.jobs.find_one({"_id": job_id})
if not job_doc:
raise ValueError(f"Job {job_id} not found")
if job_doc["status"] != "completed":
logger.warning(f"Job {job_id} not in completed status, skipping notification")
return
# Get client email
client_doc = await db.users.find_one({"_id": job_doc["client_id"]})
if not client_doc:
raise ValueError(f"Client {job_doc['client_id']} not found")
# Generate signed URLs for all outputs
download_links = {}
outputs = job_doc.get("outputs", {})
for language, lang_output in outputs.items():
if not isinstance(lang_output, dict):
continue
lang_downloads = {}
# Captions VTT
if "captions_vtt_gcs" in lang_output:
blob_path = lang_output["captions_vtt_gcs"].replace(f"gs://{settings.gcs_bucket}/", "")
# Get client ID and ensure proper ObjectId format
client_id = job_doc["client_id"]
logger.info(f"Looking up client {client_id} for job {job_id}")
# Try looking up client by string ID first
client_doc = await db.users.find_one({"_id": client_id})
if not client_doc:
# Try as ObjectId if string lookup failed
try:
signed_url = await get_signed_download_url(blob_path, 24)
lang_downloads["captions_vtt"] = signed_url
except Exception as e:
logger.warning(f"Failed to generate signed URL for captions {language}: {e}")
client_doc = await db.users.find_one({"_id": ObjectId(client_id)})
except:
pass # Invalid ObjectId format
if not client_doc:
logger.error(f"Client {client_id} not found in database for job {job_id}")
# Don't retry for missing users - this is likely a data issue
return
# Audio Description VTT
if "ad_vtt_gcs" in lang_output:
blob_path = lang_output["ad_vtt_gcs"].replace(f"gs://{settings.gcs_bucket}/", "")
# Generate signed URLs for all outputs
download_links = {}
outputs = job_doc.get("outputs", {})
for language, lang_output in outputs.items():
if not isinstance(lang_output, dict):
continue
lang_downloads = {}
# Captions VTT
if "captions_vtt_gcs" in lang_output:
blob_path = lang_output["captions_vtt_gcs"].replace(f"gs://{settings.gcs_bucket}/", "")
try:
signed_url = await get_signed_download_url(blob_path, 24)
lang_downloads["captions_vtt"] = signed_url
except Exception as e:
logger.warning(f"Failed to generate signed URL for captions {language}: {e}")
# Audio Description VTT
if "ad_vtt_gcs" in lang_output:
blob_path = lang_output["ad_vtt_gcs"].replace(f"gs://{settings.gcs_bucket}/", "")
try:
signed_url = await get_signed_download_url(blob_path, 24)
lang_downloads["audio_description_vtt"] = signed_url
except Exception as e:
logger.warning(f"Failed to generate signed URL for AD VTT {language}: {e}")
# Audio Description MP3
if "ad_mp3_gcs" in lang_output:
blob_path = lang_output["ad_mp3_gcs"].replace(f"gs://{settings.gcs_bucket}/", "")
try:
signed_url = await get_signed_download_url(blob_path, 24)
lang_downloads["audio_description_mp3"] = signed_url
except Exception as e:
logger.warning(f"Failed to generate signed URL for AD MP3 {language}: {e}")
if lang_downloads:
download_links[language] = lang_downloads
# Send completion email (temporarily disabled)
# TODO: Re-enable emails once authentication is configured
email_enabled = False # Set to True to re-enable emails
if email_enabled:
try:
signed_url = await get_signed_download_url(blob_path, 24)
lang_downloads["audio_description_vtt"] = signed_url
except Exception as e:
logger.warning(f"Failed to generate signed URL for AD VTT {language}: {e}")
success = await email_service.send_completion_email(
recipient_email=client_doc["email"],
job_title=job_doc["title"],
download_links=download_links
)
# Audio Description MP3
if "ad_mp3_gcs" in lang_output:
blob_path = lang_output["ad_mp3_gcs"].replace(f"gs://{settings.gcs_bucket}/", "")
try:
signed_url = await get_signed_download_url(blob_path, 24)
lang_downloads["audio_description_mp3"] = signed_url
except Exception as e:
logger.warning(f"Failed to generate signed URL for AD MP3 {language}: {e}")
if success:
logger.info(f"Successfully sent completion email to {client_doc['email']} for job {job_id}")
else:
logger.warning(f"Email service returned failure for job {job_id} - treating as non-retryable")
except Exception as email_error:
error_msg = str(email_error)
logger.error(f"Email sending exception for job {job_id}: {error_msg}")
# Check if this is an authentication error (non-retryable)
if "401" in error_msg or "Unauthorized" in error_msg or "authentication" in error_msg.lower():
logger.warning(f"Email authentication failed for job {job_id} - treating as non-retryable configuration error")
else:
# Other email errors might be transient
raise ValueError(f"Email sending failed: {error_msg}")
else:
logger.info(f"Email notifications are currently disabled - skipping email for job {job_id}")
logger.info(f"Would have sent completion email to {client_doc['email']} with {sum(len(files) for files in download_links.values())} download links")
if lang_downloads:
download_links[language] = lang_downloads
# Send completion email
success = await email_service.send_completion_email(
recipient_email=client_doc["email"],
job_title=job_doc["title"],
download_links=download_links
)
if success:
# Log audit entry
# Log audit entry (regardless of email status)
audit_log = AuditLogCreate(
job_id=job_id,
action="client_notified",
action=AuditAction.JOB_STATUS_CHANGE,
description=f"Job {job_id} completed - client notification processed",
resource_type="job",
resource_id=job_id,
resource_name=job_doc["title"],
details={
"email": client_doc["email"],
"download_count": sum(len(files) for files in download_links.values())
"download_count": sum(len(files) for files in download_links.values()),
"email_sent": email_enabled,
"status": "completed"
}
)
await db.audit_logs.insert_one(audit_log.dict())
await db.audit_logs.insert_one(audit_log.model_dump())
logger.info(f"Successfully notified client for job {job_id}")
else:
raise ValueError("Failed to send completion email")
logger.info(f"Successfully completed notification processing for job {job_id}")
except Exception as e:
logger.error(f"Client notification failed for job {job_id}: {e}")
except Exception as e:
error_msg = str(e)
logger.error(f"Client notification failed for job {job_id}: {error_msg}")
# Update job with error
await db.jobs.update_one(
{"_id": job_id},
{
"$set": {
"error": {
"type": "notification_failure",
"message": str(e),
"timestamp": datetime.utcnow().isoformat()
},
"updated_at": datetime.utcnow()
}
}
)
# Update job with error
try:
await db.jobs.update_one(
{"_id": job_id},
{
"$set": {
"error": {
"type": "notification_failure",
"message": error_msg,
"timestamp": datetime.utcnow().isoformat()
},
"updated_at": datetime.utcnow()
}
}
)
except Exception as update_error:
logger.error(f"Failed to update job {job_id} with error: {update_error}")
raise
# Only retry for transient errors, not configuration or data errors
non_retryable_patterns = [
"not found",
"401",
"unauthorized",
"authentication",
"failed to send completion email"
]
should_not_retry = any(pattern in error_msg.lower() for pattern in non_retryable_patterns)
if should_not_retry:
logger.info(f"Skipping retry for job {job_id} due to non-retryable error: {error_msg}")
return
else:
# This might be a transient error, let it retry
logger.info(f"Allowing retry for job {job_id} due to potentially transient error: {error_msg}")
raise
finally:
client.close()
finally:
client.close()
# Register the task with manual retry control
@celery_app.task(bind=True, base=NotifyClientTask, max_retries=3, default_retry_delay=60)
def notify_client_task(self, job_id: str):
"""Celery task wrapper for client notification"""
# This method is called by NotifyClientTask.__call__
pass

View file

@ -14,11 +14,63 @@ from ..services.gcs import gcs_service, upload_vtt_to_gcs
from ..services.gemini import gemini_service
from ..services.translate import translate_service
from ..services.tts import tts_service
from ..services.websocket import connection_manager
from . import celery_app
logger = get_logger(__name__)
def broadcast_status_update(job_id: str, status: str, message: str = None, progress: int = None):
"""
Helper function to broadcast job status updates via WebSocket
Uses sync Redis client for Celery worker context
"""
logger.info(f"🔊 ATTEMPTING TO BROADCAST: job_id={job_id}, status={status}, message={message}")
try:
import redis as sync_redis
from ..core.config import settings
from ..services.websocket import JobStatusUpdate
from datetime import datetime
logger.info(f"🔊 About to create JobStatusUpdate for job {job_id}")
# Create status update
update = JobStatusUpdate(
job_id=job_id,
status=status,
updated_at=datetime.utcnow(),
message=message,
progress=progress
)
logger.info(f"🔊 Created update object, now connecting to Redis: {settings.redis_url}")
# Create synchronous Redis client
redis_client = sync_redis.Redis.from_url(
settings.redis_url,
encoding="utf-8",
decode_responses=True
)
logger.info(f"🔊 Redis client created, now publishing to channels")
# Publish to channels
result1 = redis_client.publish("job_status_updates", update.model_dump_json())
result2 = redis_client.publish(f"job_status_updates:{job_id}", update.model_dump_json())
logger.info(f"🔊 Published to channels - general: {result1} subscribers, job-specific: {result2} subscribers")
# Close connection
redis_client.close()
logger.info(f"🔊 ✅ Successfully broadcasted status update for job {job_id}: {status}")
except Exception as e:
logger.error(f"🔊 ❌ Failed to broadcast status update for job {job_id}: {e}")
import traceback
logger.error(f"🔊 ❌ Full traceback: {traceback.format_exc()}")
async def retry_with_backoff(func, max_retries=3, base_delay=1):
"""Retry a function with exponential backoff"""
last_exception = None
@ -104,6 +156,14 @@ async def _async_translate_and_synthesize(job_id: str):
}
}
)
# Broadcast status update
broadcast_status_update(
job_id,
JobStatus.TRANSLATING.value,
"Starting translation and transcreation process",
progress=10
)
# Get English VTT content
en_outputs = job_doc["outputs"]["en"]
@ -203,6 +263,14 @@ async def _async_translate_and_synthesize(job_id: str):
}
}
)
# Broadcast status update
broadcast_status_update(
job_id,
JobStatus.TTS_GENERATING.value,
"Generating audio descriptions with text-to-speech",
progress=70
)
# Generate TTS for languages that need MP3
if job_doc["requested_outputs"]["audio_description_mp3"]:
@ -225,6 +293,14 @@ async def _async_translate_and_synthesize(job_id: str):
}
}
)
# Broadcast status update
broadcast_status_update(
job_id,
JobStatus.PENDING_FINAL_REVIEW.value,
"Translation and TTS complete - ready for final review",
progress=100
)
logger.info(f"Successfully completed translation and synthesis for job {job_id}")

View file

@ -0,0 +1,74 @@
/**
* Optional component to handle WebSocket connection status toasts
* This can be used in components that want to show connection status notifications
*/
import { useCallback } from 'react';
import { useToastContext } from '../contexts/ToastContext';
import type { ConnectionStatus } from '../hooks/useJobStatusWebSocket';
export interface WebSocketToastHandlerProps {
/** Whether to show connection toasts */
enabled?: boolean;
/** Custom connection status messages */
messages?: {
connected?: string;
connecting?: string;
disconnected?: string;
error?: string;
};
}
/**
* Hook that returns a connection change handler for WebSocket toasts
*/
export function useWebSocketToastHandler(props: WebSocketToastHandlerProps = {}) {
const {
enabled = false,
messages = {}
} = props;
const toast = useToastContext();
const handleConnectionChange = useCallback((status: ConnectionStatus) => {
if (!enabled) return;
switch (status) {
case 'connected':
toast.success(messages.connected || 'Real-time updates connected');
break;
case 'connecting':
// Usually don't show toast for connecting to avoid spam
// toast.info(messages.connecting || 'Connecting to real-time updates...');
break;
case 'disconnected':
toast.warning(messages.disconnected || 'Real-time updates disconnected');
break;
case 'error':
toast.error(messages.error || 'Connection error - using cached data');
break;
}
}, [enabled, messages, toast]);
return handleConnectionChange;
}
/**
* Default connection status messages for different contexts
*/
export const CONNECTION_MESSAGES = {
jobList: {
connected: 'Job list real-time updates enabled',
disconnected: 'Job list updates disconnected',
error: 'Job list connection error'
},
jobDetail: {
connected: 'Job status real-time updates enabled',
disconnected: 'Job status updates disconnected',
error: 'Job status connection error'
}
} as const;

View file

@ -0,0 +1,410 @@
/**
* WebSocket hook for real-time job status updates
*
* Provides WebSocket connections for:
* 1. Individual job status updates: useJobStatusWebSocket(jobId)
* 2. Job list updates: useJobStatusWebSocket() without jobId
*/
import { useEffect, useState, useRef, useCallback } from 'react';
import { useQueryClient } from '@tanstack/react-query';
import { useAuthStore } from '../lib/auth';
import { apiClient } from '../lib/api';
export interface JobStatusUpdate {
job_id: string;
status: string;
updated_at: string;
message?: string;
progress?: number;
metadata?: Record<string, unknown>;
}
export interface WebSocketMessage {
type: 'connection_established' | 'job_status_update' | 'job_list_update';
data?: JobStatusUpdate;
job_id?: string;
scope?: string;
timestamp?: string;
}
export type ConnectionStatus = 'connecting' | 'connected' | 'disconnected' | 'error';
interface UseJobStatusWebSocketOptions {
/**
* Whether to automatically reconnect on connection loss
* @default true
*/
autoReconnect?: boolean;
/**
* Maximum number of reconnection attempts
* @default 5
*/
maxReconnectAttempts?: number;
/**
* Initial reconnection delay in milliseconds
* @default 1000
*/
reconnectDelay?: number;
/**
* Whether to log debug information
* @default false
*/
debug?: boolean;
/**
* Whether to show toast notifications for status updates
* @default true
*/
showToasts?: boolean;
/**
* Custom toast handler function for status updates
*/
onStatusUpdate?: (update: JobStatusUpdate) => void;
/**
* Toast handler for connection status changes
*/
onConnectionChange?: (status: ConnectionStatus) => void;
}
interface UseJobStatusWebSocketReturn {
/** Current connection status */
connectionStatus: ConnectionStatus;
/** Latest received message */
lastMessage: WebSocketMessage | null;
/** Latest job status update */
lastUpdate: JobStatusUpdate | null;
/** Manual reconnect function */
reconnect: () => void;
/** Disconnect function */
disconnect: () => void;
/** Send message (for heartbeat, etc.) */
sendMessage: (message: string) => void;
}
/**
* WebSocket hook for job status updates
*
* @param jobId - Optional job ID for specific job updates. If not provided, subscribes to all job updates
* @param options - Configuration options
*/
export function useJobStatusWebSocket(
jobId?: string,
options: UseJobStatusWebSocketOptions = {}
): UseJobStatusWebSocketReturn {
const {
autoReconnect = true,
maxReconnectAttempts = 5,
reconnectDelay = 1000,
debug = false,
showToasts = true,
onStatusUpdate,
onConnectionChange
} = options;
const queryClient = useQueryClient();
const { isAuthenticated } = useAuthStore();
// Get access token from API client instead of auth store
const accessToken = apiClient.getAccessToken();
const [connectionStatus, setConnectionStatus] = useState<ConnectionStatus>('disconnected');
const [lastMessage, setLastMessage] = useState<WebSocketMessage | null>(null);
const [lastUpdate, setLastUpdate] = useState<JobStatusUpdate | null>(null);
const wsRef = useRef<WebSocket | null>(null);
const reconnectAttemptsRef = useRef(0);
const reconnectTimeoutRef = useRef<NodeJS.Timeout | null>(null);
const heartbeatIntervalRef = useRef<NodeJS.Timeout | null>(null);
const mountedRef = useRef(true);
// Cache recent updates to prevent duplicates
const recentUpdatesRef = useRef<Map<string, { status: string; timestamp: number }>>(new Map());
const log = useCallback((...args: unknown[]) => {
if (debug) {
console.log('[WebSocket]', ...args);
}
}, [debug]);
const getWebSocketUrl = useCallback(() => {
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
const host = window.location.host;
const basePath = '/api/v1/ws/jobs';
const path = jobId ? `${basePath}/${jobId}` : basePath;
const token = encodeURIComponent(accessToken || '');
return `${protocol}//${host}${path}?token=${token}`;
}, [jobId, accessToken]);
const handleStatusUpdate = useCallback((update: JobStatusUpdate) => {
// Check for duplicate status updates within the last 5 seconds
const updateKey = `${update.job_id}:${update.status}`;
const now = Date.now();
const recent = recentUpdatesRef.current.get(updateKey);
if (recent && (now - recent.timestamp) < 5000) {
// Skip duplicate status update within 5 seconds
log('Skipping duplicate status update:', updateKey);
return;
}
// Store this update
recentUpdatesRef.current.set(updateKey, { status: update.status, timestamp: now });
// Clean up old entries (older than 30 seconds)
const cutoff = now - 30000;
for (const [key, value] of recentUpdatesRef.current.entries()) {
if (value.timestamp < cutoff) {
recentUpdatesRef.current.delete(key);
}
}
// Call custom handler if provided
if (onStatusUpdate) {
onStatusUpdate(update);
}
}, [onStatusUpdate, log]);
const handleConnectionChange = useCallback((status: ConnectionStatus) => {
// Call custom handler if provided
if (onConnectionChange) {
onConnectionChange(status);
}
}, [onConnectionChange]);
const updateQueryCache = useCallback((update: JobStatusUpdate) => {
// Update individual job cache if we have job_id
if (update.job_id) {
queryClient.setQueryData(['jobs', update.job_id], (oldData: unknown) => {
if (oldData) {
return {
...oldData,
status: update.status,
updated_at: update.updated_at
};
}
return oldData;
});
}
// Update job list cache
queryClient.setQueriesData({ queryKey: ['jobs'] }, (oldData: unknown) => {
// Type-safe handling of the jobs list data
const data = oldData as { jobs?: Array<{ id: string; status: string; updated_at: string; [key: string]: unknown }> };
if (!data?.jobs) return oldData;
const updatedJobs = data.jobs.map((job) => {
if (job.id === update.job_id) {
return {
...job,
status: update.status,
updated_at: update.updated_at
};
}
return job;
});
return {
...data,
jobs: updatedJobs
};
});
}, [queryClient]);
const handleMessage = useCallback((event: MessageEvent) => {
try {
// Handle plain text responses (like "pong" heartbeat responses)
if (typeof event.data === 'string' && event.data === 'pong') {
log('Received heartbeat response:', event.data);
return;
}
const message: WebSocketMessage = JSON.parse(event.data);
log('Received message:', message);
setLastMessage(message);
if (message.type === 'job_status_update' || message.type === 'job_list_update') {
if (message.data) {
setLastUpdate(message.data);
updateQueryCache(message.data);
handleStatusUpdate(message.data);
}
}
} catch (error) {
console.error('[WebSocket] Failed to parse WebSocket message:', error, 'Raw data:', event.data);
}
}, [log, updateQueryCache, handleStatusUpdate]);
const handleOpen = useCallback(() => {
console.log('[WebSocket] Connected successfully!');
log('WebSocket connected');
setConnectionStatus('connected');
handleConnectionChange('connected');
reconnectAttemptsRef.current = 0;
// Start heartbeat
heartbeatIntervalRef.current = setInterval(() => {
if (wsRef.current?.readyState === WebSocket.OPEN) {
wsRef.current.send('ping');
}
}, 30000); // Ping every 30 seconds
}, [log, handleConnectionChange]);
const handleClose = useCallback((event: CloseEvent) => {
log('WebSocket closed:', event.code, event.reason);
setConnectionStatus('disconnected');
handleConnectionChange('disconnected');
// Clear heartbeat
if (heartbeatIntervalRef.current) {
clearInterval(heartbeatIntervalRef.current);
heartbeatIntervalRef.current = null;
}
// Attempt to reconnect if enabled and component is still mounted
if (
autoReconnect &&
mountedRef.current &&
reconnectAttemptsRef.current < maxReconnectAttempts &&
event.code !== 1000 // Don't reconnect on normal closure
) {
const delay = reconnectDelay * Math.pow(2, reconnectAttemptsRef.current);
log(`Reconnecting in ${delay}ms (attempt ${reconnectAttemptsRef.current + 1}/${maxReconnectAttempts})`);
reconnectTimeoutRef.current = setTimeout(() => {
if (mountedRef.current) {
reconnectAttemptsRef.current++;
connect();
}
}, delay);
}
}, [log, autoReconnect, maxReconnectAttempts, reconnectDelay, handleConnectionChange]);
const handleError = useCallback((error: Event) => {
console.error('WebSocket error:', error);
setConnectionStatus('error');
handleConnectionChange('error');
}, [handleConnectionChange]);
const connect = useCallback(() => {
if (!accessToken) {
log('No access token available, skipping WebSocket connection');
return;
}
if (wsRef.current?.readyState === WebSocket.CONNECTING ||
wsRef.current?.readyState === WebSocket.OPEN) {
log('WebSocket already connecting or connected');
return;
}
try {
setConnectionStatus('connecting');
handleConnectionChange('connecting');
const url = getWebSocketUrl();
console.log('[WebSocket] Attempting to connect to:', url.replace(/token=[^&]+/, 'token=***'));
log('Connecting to:', url.replace(/token=[^&]+/, 'token=***'));
wsRef.current = new WebSocket(url);
wsRef.current.addEventListener('open', handleOpen);
wsRef.current.addEventListener('message', handleMessage);
wsRef.current.addEventListener('close', handleClose);
wsRef.current.addEventListener('error', handleError);
} catch (error) {
console.error('[WebSocket] Failed to create WebSocket connection:', error);
setConnectionStatus('error');
handleConnectionChange('error');
}
}, [accessToken, getWebSocketUrl, handleOpen, handleMessage, handleClose, handleError, log, handleConnectionChange]);
const disconnect = useCallback(() => {
log('Manually disconnecting WebSocket');
// Clear reconnection timeout
if (reconnectTimeoutRef.current) {
clearTimeout(reconnectTimeoutRef.current);
reconnectTimeoutRef.current = null;
}
// Clear heartbeat
if (heartbeatIntervalRef.current) {
clearInterval(heartbeatIntervalRef.current);
heartbeatIntervalRef.current = null;
}
// Close WebSocket
if (wsRef.current) {
wsRef.current.removeEventListener('open', handleOpen);
wsRef.current.removeEventListener('message', handleMessage);
wsRef.current.removeEventListener('close', handleClose);
wsRef.current.removeEventListener('error', handleError);
if (wsRef.current.readyState === WebSocket.OPEN ||
wsRef.current.readyState === WebSocket.CONNECTING) {
wsRef.current.close(1000, 'Manual disconnect');
}
wsRef.current = null;
}
setConnectionStatus('disconnected');
handleConnectionChange('disconnected');
}, [log, handleOpen, handleMessage, handleClose, handleError, handleConnectionChange]);
const reconnect = useCallback(() => {
log('Manual reconnect requested');
disconnect();
reconnectAttemptsRef.current = 0;
setTimeout(connect, 100);
}, [log, disconnect]); // Exclude connect to prevent dependency loops
const sendMessage = useCallback((message: string) => {
if (wsRef.current?.readyState === WebSocket.OPEN) {
wsRef.current.send(message);
log('Sent message:', message);
} else {
console.warn('WebSocket not connected, cannot send message');
}
}, [log]);
// Connect on mount and when dependencies change
useEffect(() => {
const currentToken = apiClient.getAccessToken();
if (currentToken && isAuthenticated) {
connect();
}
return () => {
mountedRef.current = false;
disconnect();
};
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [isAuthenticated, jobId]); // Reconnect when auth state or jobId changes
// Cleanup on unmount
useEffect(() => {
return () => {
mountedRef.current = false;
};
}, []);
return {
connectionStatus,
lastMessage,
lastUpdate,
reconnect,
disconnect,
sendMessage
};
}

View file

@ -85,6 +85,10 @@ class ApiClient {
this.accessToken = null;
}
getAccessToken(): string | null {
return this.accessToken;
}
// Auth endpoints
async login(credentials: LoginRequest): Promise<LoginResponse> {
const response = await this.client.post('/auth/login', credentials);

View file

@ -1,9 +1,13 @@
import { useState } from 'react';
import { useState, useCallback } from 'react';
import { useParams, Link } from 'react-router-dom';
import { formatDistanceToNow } from 'date-fns';
import { useJob, useJobDownloads, useJobVttContent } from '../../hooks/useJob';
import { useJobStatusWebSocket } from '../../hooks/useJobStatusWebSocket';
import { StatusBadge } from '../../components/StatusBadge';
import { VideoWithCaptions } from '../../components/VideoWithCaptions';
import { useToastContext } from '../../contexts/ToastContext';
import { getStatusMessageConfig } from '../../utils/jobStatusMessages';
import type { JobStatusUpdate } from '../../hooks/useJobStatusWebSocket';
const ProgressIndicator = ({ status }: { status: string }) => {
@ -58,10 +62,34 @@ const ProgressIndicator = ({ status }: { status: string }) => {
export function JobDetail() {
const { id } = useParams();
const [activeTab, setActiveTab] = useState<'overview' | 'video' | 'assets' | 'history'>('overview');
const toast = useToastContext();
const { data: job, isLoading, error } = useJob(id!);
const { data: downloads, isLoading: downloadsLoading, error: downloadsError } = useJobDownloads(id!);
const { data: englishVtt, isLoading: vttLoading, error: vttError } = useJobVttContent(id!, 'en');
const { data: downloads } = useJobDownloads(id!);
const { data: englishVtt } = useJobVttContent(id!, 'en');
// Handle job status updates with toast notifications
const handleStatusUpdate = useCallback((update: JobStatusUpdate) => {
// Use the current job title or fallback
const jobTitle = job?.title;
const { message, type, showToast } = getStatusMessageConfig(
update.status,
jobTitle,
update.message
);
if (showToast) {
toast[type](message);
}
}, [job?.title, toast]);
// WebSocket connection for real-time job status updates
const { connectionStatus } = useJobStatusWebSocket(id, {
debug: false,
autoReconnect: true,
onStatusUpdate: handleStatusUpdate
});
// Get video URL from downloads
@ -289,7 +317,18 @@ export function JobDetail() {
Created {formatDistanceToNow(new Date(job.created_at))} ago
</p>
</div>
<StatusBadge status={job.status} />
<div className="flex items-center gap-2">
<StatusBadge status={job.status} />
{connectionStatus === 'connected' && (
<span className="inline-block w-2 h-2 bg-green-400 rounded-full" title="Connected - receiving real-time updates" />
)}
{connectionStatus === 'connecting' && (
<span className="inline-block w-2 h-2 bg-yellow-400 rounded-full animate-pulse" title="Connecting..." />
)}
{connectionStatus === 'error' && (
<span className="inline-block w-2 h-2 bg-red-400 rounded-full" title="Connection error" />
)}
</div>
</div>
</div>

View file

@ -1,11 +1,14 @@
import { useState, useMemo, useEffect } from 'react';
import { useState, useMemo, useEffect, useCallback } from 'react';
import { Link, useSearchParams } from 'react-router-dom';
import { formatDistanceToNow } from 'date-fns';
import { useAuthStore } from '../../lib/auth';
import { useJobs, useBulkDeleteJobs } from '../../hooks/useJob';
import { useJobStatusWebSocket } from '../../hooks/useJobStatusWebSocket';
import { StatusBadge } from '../../components/StatusBadge';
import { useToastContext } from '../../contexts/ToastContext';
import { getStatusMessageConfig } from '../../utils/jobStatusMessages';
import type { Job } from '../../types/api';
import type { JobStatusUpdate } from '../../hooks/useJobStatusWebSocket';
const STATUS_OPTIONS = [
{ value: '', label: 'All Statuses' },
@ -28,7 +31,7 @@ const SORT_OPTIONS = [
export function JobsList() {
const { user } = useAuthStore();
const toast = useToastContext();
const [searchParams, setSearchParams] = useSearchParams();
const [searchParams] = useSearchParams();
const [searchTerm, setSearchTerm] = useState('');
const [statusFilter, setStatusFilter] = useState(searchParams.get('status') || '');
const [sortBy, setSortBy] = useState('created_at_desc');
@ -56,6 +59,30 @@ export function JobsList() {
const bulkDeleteMutation = useBulkDeleteJobs();
// Handle job status updates with toast notifications
const handleStatusUpdate = useCallback((update: JobStatusUpdate) => {
// Find the job to get its title
const job = jobsData?.jobs.find((j: Job) => j.id === update.job_id);
const jobTitle = job?.title;
const { message, type, showToast } = getStatusMessageConfig(
update.status,
jobTitle,
update.message
);
if (showToast) {
toast[type](message);
}
}, [jobsData?.jobs, toast]);
// WebSocket connection for real-time job status updates
const { connectionStatus } = useJobStatusWebSocket(undefined, {
debug: false,
autoReconnect: true,
onStatusUpdate: handleStatusUpdate
});
// Client-side filtering and sorting for search term
const filteredAndSortedJobs = useMemo(() => {
let jobs = jobsData?.jobs || [];
@ -177,12 +204,30 @@ export function JobsList() {
}
};
const getConnectionStatusIcon = () => {
switch (connectionStatus) {
case 'connected':
return <span className="inline-block w-2 h-2 bg-green-400 rounded-full" title="Connected - real-time updates enabled" />;
case 'connecting':
return <span className="inline-block w-2 h-2 bg-yellow-400 rounded-full animate-pulse" title="Connecting..." />;
case 'disconnected':
return <span className="inline-block w-2 h-2 bg-gray-400 rounded-full" title="Disconnected - using cached data" />;
case 'error':
return <span className="inline-block w-2 h-2 bg-red-400 rounded-full" title="Connection error" />;
default:
return null;
}
};
return (
<div className="container mx-auto px-4 py-8">
{/* Header */}
<div className="flex justify-between items-center mb-8">
<div>
<h1 className="text-3xl font-bold text-gray-900">All Jobs</h1>
<div className="flex items-center gap-3">
<h1 className="text-3xl font-bold text-gray-900">All Jobs</h1>
{getConnectionStatusIcon()}
</div>
<p className="text-gray-600 mt-1">
{user?.role === 'client' ? 'Your video processing jobs' : 'System-wide job management'}
</p>

View file

@ -0,0 +1,127 @@
/**
* Utility functions for generating user-friendly job status messages and toast notifications
*/
export interface StatusMessageConfig {
message: string;
type: 'success' | 'info' | 'warning' | 'error';
showToast: boolean;
}
/**
* Get user-friendly message and toast configuration for job status updates
*/
export function getStatusMessageConfig(
status: string,
jobTitle?: string,
customMessage?: string
): StatusMessageConfig {
const title = jobTitle ? `"${jobTitle}"` : 'Job';
const fallbackMessage = customMessage || '';
switch (status) {
case 'created':
return {
message: `${title} has been created and queued for processing`,
type: 'info',
showToast: true
};
case 'ingesting':
return {
message: `${title} is being ingested and prepared for AI processing`,
type: 'info',
showToast: true
};
case 'ai_processing':
return {
message: `${title} is being processed by AI to generate accessibility features`,
type: 'info',
showToast: true
};
case 'pending_qc':
return {
message: `${title} is ready for quality control review`,
type: 'info',
showToast: true
};
case 'approved_english':
return {
message: `${title} English content has been approved - starting translation`,
type: 'success',
showToast: true
};
case 'rejected':
return {
message: `${title} has been rejected and requires revision`,
type: 'warning',
showToast: true
};
case 'translating':
return {
message: `${title} is being translated and transcreated into requested languages`,
type: 'info',
showToast: true
};
case 'tts_generating':
return {
message: `${title} is generating audio descriptions with text-to-speech`,
type: 'info',
showToast: true
};
case 'pending_final_review':
return {
message: `${title} is ready for final review before completion`,
type: 'info',
showToast: true
};
case 'qc_feedback':
return {
message: `${title} final review has been rejected - requires changes`,
type: 'warning',
showToast: true
};
case 'completed':
return {
message: `${title} has been completed successfully! 🎉 All files are ready for download`,
type: 'success',
showToast: true
};
default:
return {
message: fallbackMessage || `${title} status updated to ${status}`,
type: 'info',
showToast: !!fallbackMessage
};
}
}
/**
* Get a shorter status message for progress updates
*/
export function getProgressMessage(status: string, progress?: number): string {
const progressText = progress !== undefined ? ` (${progress}%)` : '';
switch (status) {
case 'ingesting':
return `Ingesting video${progressText}`;
case 'ai_processing':
return `AI processing${progressText}`;
case 'translating':
return `Translating content${progressText}`;
case 'tts_generating':
return `Generating audio${progressText}`;
default:
return status.replace(/_/g, ' ');
}
}

View file

@ -5,6 +5,15 @@ import react from '@vitejs/plugin-react'
// https://vite.dev/config/
export default defineConfig({
plugins: [react()],
server: {
proxy: {
'/api': {
target: 'http://localhost:8000',
changeOrigin: true,
ws: true, // Enable WebSocket proxying
},
},
},
test: {
globals: true,
environment: 'jsdom',