video-accessibility/backend/app/api/v1/routes_websockets.py

214 lines
No EOL
7 KiB
Python

"""
WebSocket routes for real-time job status updates
Provides WebSocket endpoints for:
1. Individual job status updates: /ws/jobs/{job_id}
2. Job list updates: /ws/jobs (all jobs for authenticated user)
"""
import logging
from typing import Optional
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, HTTPException, Depends, Query
from fastapi.security import HTTPBearer
from ...services.websocket import (
connection_manager,
authenticate_websocket,
get_connection_manager,
ConnectionManager
)
from ...models.job import Job
from ...core.database import get_database
from ...core.dependencies import get_current_user
logger = logging.getLogger(__name__)
router = APIRouter(tags=["WebSocket"])
security = HTTPBearer()
@router.websocket("/ws/jobs/{job_id}")
async def websocket_job_status(
websocket: WebSocket,
job_id: str,
token: Optional[str] = Query(None),
manager: ConnectionManager = Depends(get_connection_manager)
):
"""
WebSocket endpoint for real-time job status updates
Usage:
- Connect: ws://localhost:8000/api/v1/ws/jobs/{job_id}?token={jwt_token}
- Receives: Real-time status updates for the specific job
Message format:
{
"type": "job_status_update",
"data": {
"job_id": "...",
"status": "processing",
"updated_at": "2023-...",
"message": "Processing video...",
"progress": 45
}
}
"""
# Authenticate the WebSocket connection
user_id = await authenticate_websocket(websocket, token)
if not user_id:
return
try:
# Verify user has access to this job
db = await get_database()
jobs_collection = db["jobs"]
job = await jobs_collection.find_one({"_id": job_id})
if not job:
await websocket.close(code=4004, reason="Job not found")
return
# Check permissions - users can only access their own jobs unless they're admin/reviewer
user = await db["users"].find_one({"_id": user_id})
if not user:
try:
from bson import ObjectId
user = await db["users"].find_one({"_id": ObjectId(user_id)})
except Exception:
pass # Invalid ObjectId format
if not user:
await websocket.close(code=4001, reason="User not found")
return
# Check access permissions
if user["role"] == "client" and job.get("created_by") != user_id:
await websocket.close(code=4003, reason="Access denied")
return
# Connect to job status updates
await manager.connect_job_status(websocket, user_id, job_id)
# Keep connection alive and handle incoming messages
while True:
try:
# Wait for incoming WebSocket messages (for heartbeat, etc.)
message = await websocket.receive_text()
logger.debug(f"Received WebSocket message from user {user_id}: {message}")
# Handle heartbeat or other client messages if needed
if message == "ping":
await websocket.send_text("pong")
except WebSocketDisconnect:
break
except Exception as e:
logger.error(f"Error in WebSocket message handling: {e}")
break
except WebSocketDisconnect:
pass
except Exception as e:
logger.error(f"WebSocket job status error: {e}")
finally:
await manager.disconnect(websocket, user_id)
@router.websocket("/ws/jobs")
async def websocket_job_list(
websocket: WebSocket,
token: Optional[str] = Query(None),
manager: ConnectionManager = Depends(get_connection_manager)
):
"""
WebSocket endpoint for real-time job list updates
Usage:
- Connect: ws://localhost:8000/api/v1/ws/jobs?token={jwt_token}
- Receives: Real-time status updates for all jobs the user can access
Message format:
{
"type": "job_list_update",
"data": {
"job_id": "...",
"status": "processing",
"updated_at": "2023-...",
"message": "Processing video...",
"progress": 45
}
}
"""
# Authenticate the WebSocket connection
user_id = await authenticate_websocket(websocket, token)
if not user_id:
return
try:
# Verify user exists
logger.info(f"WebSocket: Looking up user {user_id} in database")
db = await get_database()
# Try looking up user by string ID first, then by ObjectId
user = await db["users"].find_one({"_id": user_id})
if not user:
try:
from bson import ObjectId
user = await db["users"].find_one({"_id": ObjectId(user_id)})
except Exception:
pass # Invalid ObjectId format
if not user:
logger.warning(f"WebSocket: User {user_id} not found in database (tried both string and ObjectId)")
await websocket.close(code=4001, reason="User not found")
return
logger.info(f"WebSocket: User {user_id} found, role: {user.get('role', 'unknown')}")
logger.info(f"WebSocket: User {user_id} found, connecting to job list updates")
# Connect to job list updates
await manager.connect_job_list(websocket, user_id)
# Keep connection alive and handle incoming messages
while True:
try:
# Wait for incoming WebSocket messages
message = await websocket.receive_text()
logger.debug(f"Received WebSocket message from user {user_id}: {message}")
# Handle heartbeat or other client messages if needed
if message == "ping":
await websocket.send_text("pong")
except WebSocketDisconnect:
break
except Exception as e:
logger.error(f"Error in WebSocket message handling: {e}")
break
except WebSocketDisconnect:
pass
except Exception as e:
logger.error(f"WebSocket job list error: {e}")
finally:
await manager.disconnect(websocket, user_id)
@router.get("/ws/status")
async def websocket_status():
"""
Get WebSocket connection status and statistics
Useful for debugging and monitoring
"""
stats = {
"active_connections": len(connection_manager.active_connections),
"job_subscriptions": len(connection_manager.job_subscriptions),
"global_subscriptions": len(connection_manager.global_subscriptions),
"redis_connected": connection_manager.redis_client is not None,
"subscriber_running": (
connection_manager.subscriber_task is not None and
not connection_manager.subscriber_task.done()
)
}
return stats