# File listing metadata: 214 lines, 7 KiB, Python, no trailing newline.
"""
|
|
WebSocket routes for real-time job status updates
|
|
|
|
Provides WebSocket endpoints for:
|
|
1. Individual job status updates: /ws/jobs/{job_id}
|
|
2. Job list updates: /ws/jobs (all jobs for authenticated user)
|
|
"""
|
|
import logging
|
|
from typing import Optional
|
|
|
|
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, HTTPException, Depends, Query
|
|
from fastapi.security import HTTPBearer
|
|
|
|
from ...services.websocket import (
|
|
connection_manager,
|
|
authenticate_websocket,
|
|
get_connection_manager,
|
|
ConnectionManager
|
|
)
|
|
from ...models.job import Job
|
|
from ...core.database import get_database
|
|
from ...core.dependencies import get_current_user
|
|
|
|
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Router that the WebSocket endpoints and the status endpoint below attach to.
router = APIRouter(tags=["WebSocket"])

# Bearer-token security scheme instance kept at module level.
security = HTTPBearer()
|
|
|
|
|
|
@router.websocket("/ws/jobs/{job_id}")
async def websocket_job_status(
    websocket: WebSocket,
    job_id: str,
    token: Optional[str] = Query(None),
    manager: ConnectionManager = Depends(get_connection_manager)
):
    """
    WebSocket endpoint for real-time job status updates

    Usage:
    - Connect: ws://localhost:8000/api/v1/ws/jobs/{job_id}?token={jwt_token}
    - Receives: Real-time status updates for the specific job

    Message format:
    {
        "type": "job_status_update",
        "data": {
            "job_id": "...",
            "status": "processing",
            "updated_at": "2023-...",
            "message": "Processing video...",
            "progress": 45
        }
    }

    Args:
        websocket: The incoming WebSocket connection.
        job_id: Job identifier taken from the URL path.
        token: Optional token from the query string, handed to
            authenticate_websocket.
        manager: Connection manager used to register and unregister the socket.

    Close codes sent by this handler:
    - 4004: no job document matches job_id
    - 4001: no user document matches the authenticated user id
    - 4003: the user has the "client" role and is not the job's creator

    Client messages: the text "ping" is answered with "pong"; any other text
    is only logged at debug level.
    """
    # A falsy result means authentication failed; this handler does nothing
    # further with the socket in that case.
    user_id = await authenticate_websocket(websocket, token)
    if not user_id:
        return

    try:
        db = await get_database()

        # Verify the job exists before doing any permission work.
        job = await db["jobs"].find_one({"_id": job_id})
        if not job:
            await websocket.close(code=4004, reason="Job not found")
            return

        # Look the user up by the raw id first, then retry as an ObjectId.
        users_collection = db["users"]
        user = await users_collection.find_one({"_id": user_id})
        if not user:
            from bson import ObjectId
            from bson.errors import InvalidId

            # Only the conversion is guarded: an id that is not a valid
            # ObjectId simply skips the retry, while a database failure during
            # the retry propagates to the outer handler instead of being
            # reported to the client as "User not found".
            try:
                object_id = ObjectId(user_id)
            except (InvalidId, TypeError):
                object_id = None
            if object_id is not None:
                user = await users_collection.find_one({"_id": object_id})

        if not user:
            await websocket.close(code=4001, reason="User not found")
            return

        # Clients may only watch jobs they created; other roles are not
        # restricted here. user["role"] is indexed directly on purpose: a user
        # document without a role raises and is rejected by the outer handler
        # rather than being let through.
        if user["role"] == "client":
            owner = job.get("created_by")
            # Compare on string form so an owner id stored as an ObjectId still
            # matches the string user id. A missing owner never matches.
            # NOTE(review): assumes created_by holds the creator's user id —
            # confirm against the code that writes job documents.
            if owner is None or str(owner) != str(user_id):
                await websocket.close(code=4003, reason="Access denied")
                return

        # Register this socket for updates about the job.
        await manager.connect_job_status(websocket, user_id, job_id)

        # Keep the connection open and service client messages until the
        # client disconnects or message handling fails.
        while True:
            try:
                message = await websocket.receive_text()
                logger.debug(f"Received WebSocket message from user {user_id}: {message}")

                # Heartbeat support.
                if message == "ping":
                    await websocket.send_text("pong")

            except WebSocketDisconnect:
                break
            except Exception as e:
                # logger.exception keeps the traceback alongside the message.
                logger.exception(f"Error in WebSocket message handling: {e}")
                break

    except WebSocketDisconnect:
        pass
    except Exception as e:
        logger.exception(f"WebSocket job status error: {e}")
    finally:
        # Runs on every exit path after authentication, including the early
        # returns above where the socket was never registered.
        await manager.disconnect(websocket, user_id)
|
|
|
|
|
|
@router.websocket("/ws/jobs")
async def websocket_job_list(
    websocket: WebSocket,
    token: Optional[str] = Query(None),
    manager: ConnectionManager = Depends(get_connection_manager)
):
    """
    WebSocket endpoint for real-time job list updates

    Usage:
    - Connect: ws://localhost:8000/api/v1/ws/jobs?token={jwt_token}
    - Receives: Real-time status updates for all jobs the user can access

    Message format:
    {
        "type": "job_list_update",
        "data": {
            "job_id": "...",
            "status": "processing",
            "updated_at": "2023-...",
            "message": "Processing video...",
            "progress": 45
        }
    }

    Args:
        websocket: The incoming WebSocket connection.
        token: Optional token from the query string, handed to
            authenticate_websocket.
        manager: Connection manager used to register and unregister the socket.

    Close codes sent by this handler:
    - 4001: no user document matches the authenticated user id

    Client messages: the text "ping" is answered with "pong"; any other text
    is only logged at debug level.
    """
    # A falsy result means authentication failed; this handler does nothing
    # further with the socket in that case.
    user_id = await authenticate_websocket(websocket, token)
    if not user_id:
        return

    try:
        logger.info(f"WebSocket: Looking up user {user_id} in database")
        db = await get_database()

        # Look the user up by the raw id first, then retry as an ObjectId.
        users_collection = db["users"]
        user = await users_collection.find_one({"_id": user_id})
        if not user:
            from bson import ObjectId
            from bson.errors import InvalidId

            # Only the conversion is guarded: an id that is not a valid
            # ObjectId simply skips the retry, while a database failure during
            # the retry propagates to the outer handler instead of being
            # reported to the client as "User not found".
            try:
                object_id = ObjectId(user_id)
            except (InvalidId, TypeError):
                object_id = None
            if object_id is not None:
                user = await users_collection.find_one({"_id": object_id})

        if not user:
            logger.warning(f"WebSocket: User {user_id} not found in database (tried both string and ObjectId)")
            await websocket.close(code=4001, reason="User not found")
            return

        logger.info(f"WebSocket: User {user_id} found, role: {user.get('role', 'unknown')}")

        logger.info(f"WebSocket: User {user_id} found, connecting to job list updates")
        # Register this socket for job list updates.
        await manager.connect_job_list(websocket, user_id)

        # Keep the connection open and service client messages until the
        # client disconnects or message handling fails.
        while True:
            try:
                message = await websocket.receive_text()
                logger.debug(f"Received WebSocket message from user {user_id}: {message}")

                # Heartbeat support.
                if message == "ping":
                    await websocket.send_text("pong")

            except WebSocketDisconnect:
                break
            except Exception as e:
                # logger.exception keeps the traceback alongside the message.
                logger.exception(f"Error in WebSocket message handling: {e}")
                break

    except WebSocketDisconnect:
        pass
    except Exception as e:
        logger.exception(f"WebSocket job list error: {e}")
    finally:
        # Runs on every exit path after authentication, including the early
        # return above where the socket was never registered.
        await manager.disconnect(websocket, user_id)
|
|
|
|
|
|
@router.get("/ws/status")
async def websocket_status():
    """
    Report the current state of the shared WebSocket connection manager.

    Intended for debugging and monitoring.

    Returns:
        A dict with these keys:
        - "active_connections": size of connection_manager.active_connections
        - "job_subscriptions": size of connection_manager.job_subscriptions
        - "global_subscriptions": size of connection_manager.global_subscriptions
        - "redis_connected": True when connection_manager.redis_client is set
        - "subscriber_running": True when a subscriber task exists and has
          not finished
    """
    active_count = len(connection_manager.active_connections)
    job_subscription_count = len(connection_manager.job_subscriptions)
    global_subscription_count = len(connection_manager.global_subscriptions)
    has_redis = connection_manager.redis_client is not None

    # The subscriber counts as running only if the task exists and is not done.
    task = connection_manager.subscriber_task
    subscriber_alive = not (task is None or task.done())

    return {
        "active_connections": active_count,
        "job_subscriptions": job_subscription_count,
        "global_subscriptions": global_subscription_count,
        "redis_connected": has_redis,
        "subscriber_running": subscriber_alive,
    }