video-accessibility/frontend/src/hooks/useJobStatusWebSocket.ts
2025-10-10 12:22:26 -05:00

413 lines
No EOL
13 KiB
TypeScript

/**
* WebSocket hook for real-time job status updates
*
* Provides WebSocket connections for:
* 1. Individual job status updates: useJobStatusWebSocket(jobId)
* 2. Job list updates: useJobStatusWebSocket() without jobId
*/
import { useEffect, useState, useRef, useCallback } from 'react';
import { useQueryClient } from '@tanstack/react-query';
import { useAuthStore } from '../lib/auth';
import { apiClient } from '../lib/api';
/**
 * Payload describing a status change for a single job, as carried in the
 * `data` field of a WebSocket message.
 */
export interface JobStatusUpdate {
  /** Identifier of the job the update refers to. */
  job_id: string;
  /** New status value reported for the job. */
  status: string;
  /** Timestamp string for the update (exact format is not established in this file). */
  updated_at: string;
  /** Optional human-readable job title. */
  job_title?: string;
  /** Optional free-form message accompanying the update. */
  message?: string;
  /** Optional progress value (scale/units are not established in this file). */
  progress?: number;
  /** Optional bag of additional, untyped data. */
  metadata?: Record<string, unknown>;
}
/**
 * Envelope for JSON messages received over the job-status WebSocket.
 */
export interface WebSocketMessage {
  /** Discriminator naming the kind of message. */
  type: 'connection_established' | 'job_status_update' | 'job_list_update';
  /** Job status payload; the hook only processes it for the two update message types. */
  data?: JobStatusUpdate;
  /** Optional job identifier attached to the message itself. */
  job_id?: string;
  /** Optional scope string (semantics are not established in this file). */
  scope?: string;
  /** Optional timestamp string for the message. */
  timestamp?: string;
}
/** Lifecycle states reported for the WebSocket connection. */
export type ConnectionStatus =
  | 'connecting'
  | 'connected'
  | 'disconnected'
  | 'error';
/** Configuration accepted by `useJobStatusWebSocket`. Every field is optional. */
interface UseJobStatusWebSocketOptions {
  /**
   * Reconnect automatically when the connection closes abnormally.
   * @default true
   */
  autoReconnect?: boolean;

  /**
   * Upper bound on automatic reconnection attempts.
   * @default 5
   */
  maxReconnectAttempts?: number;

  /**
   * Delay before the first reconnection attempt, in milliseconds.
   * The delay is doubled for each subsequent attempt.
   * @default 1000
   */
  reconnectDelay?: number;

  /**
   * Emit debug output to the console.
   * @default false
   */
  debug?: boolean;

  /**
   * Callback invoked with a job status update (e.g. to show a toast).
   * Repeats of the same job/status pair within 5 seconds are not forwarded.
   */
  onStatusUpdate?: (update: JobStatusUpdate) => void;

  /**
   * Callback invoked whenever the connection status changes (e.g. to show a toast).
   */
  onConnectionChange?: (status: ConnectionStatus) => void;
}
/** Values and controls returned by `useJobStatusWebSocket`. */
interface UseJobStatusWebSocketReturn {
  /** Connection state as last recorded by the hook. */
  connectionStatus: ConnectionStatus;

  /** Most recent parsed JSON message, or `null` if none has arrived yet. */
  lastMessage: WebSocketMessage | null;

  /** Most recent job status payload, or `null` if none has arrived yet. */
  lastUpdate: JobStatusUpdate | null;

  /** Drops the current connection and opens a new one. */
  reconnect: () => void;

  /** Closes the connection and cancels pending reconnects and heartbeats. */
  disconnect: () => void;

  /** Sends a raw string over the socket when it is open (heartbeat, etc.). */
  sendMessage: (message: string) => void;
}
/**
 * Event listeners currently attached to the active socket. They are remembered
 * so that they can later be removed by identity, even if the hook has since
 * re-rendered and produced new callback instances.
 */
interface JobSocketListeners {
  open: () => void;
  message: (event: MessageEvent) => void;
  close: (event: CloseEvent) => void;
  error: (event: Event) => void;
}

/**
 * WebSocket hook for job status updates.
 *
 * Opens a WebSocket to `<API base>/api/v1/ws/jobs[/<jobId>]?token=<access token>`,
 * keeps it alive with a 30-second `ping` heartbeat, mirrors incoming status
 * updates into the React Query cache under the `['jobs', ...]` keys, and
 * (optionally) reconnects with exponential backoff after abnormal closures.
 *
 * The connection is (re)established whenever `isAuthenticated` or `jobId`
 * changes and is torn down on unmount.
 *
 * @param jobId - Optional job ID for specific job updates. If not provided, subscribes to all job updates
 * @param options - Configuration options
 * @returns Connection status, the latest message/update, and manual
 *          `reconnect` / `disconnect` / `sendMessage` controls.
 */
export function useJobStatusWebSocket(
  jobId?: string,
  options: UseJobStatusWebSocketOptions = {}
): UseJobStatusWebSocketReturn {
  const {
    autoReconnect = true,
    maxReconnectAttempts = 5,
    reconnectDelay = 1000,
    debug = false,
    onStatusUpdate,
    onConnectionChange
  } = options;

  const queryClient = useQueryClient();
  const { isAuthenticated } = useAuthStore();

  const [connectionStatus, setConnectionStatus] = useState<ConnectionStatus>('disconnected');
  const [lastMessage, setLastMessage] = useState<WebSocketMessage | null>(null);
  const [lastUpdate, setLastUpdate] = useState<JobStatusUpdate | null>(null);

  const wsRef = useRef<WebSocket | null>(null);
  // Listeners attached to wsRef.current, so removal never depends on the
  // callback identities of whichever render happens to call disconnect().
  const listenersRef = useRef<JobSocketListeners | null>(null);
  const reconnectAttemptsRef = useRef(0);
  const reconnectTimeoutRef = useRef<ReturnType<typeof setTimeout> | null>(null);
  const heartbeatIntervalRef = useRef<ReturnType<typeof setInterval> | null>(null);
  const mountedRef = useRef(true);
  // Always points at the most recent `connect`, so delayed callbacks
  // (auto-reconnect, manual reconnect) never run a stale closure.
  const connectRef = useRef<(() => void) | null>(null);
  // Cache recent updates to prevent duplicates
  const recentUpdatesRef = useRef<Map<string, { status: string; timestamp: number }>>(new Map());

  /** Console logger that is silent unless the `debug` option is enabled. */
  const log = useCallback((...args: unknown[]) => {
    if (debug) {
      console.log('[WebSocket]', ...args);
    }
  }, [debug]);

  /** Builds the WebSocket URL for the current `jobId` using the given token. */
  const getWebSocketUrl = useCallback((token: string) => {
    // Get API base URL from environment
    const apiBaseUrl = import.meta.env.VITE_API_BASE_URL || 'http://localhost:8000';
    const apiUrl = new URL(apiBaseUrl);

    // Use wss:// for https, ws:// for http
    const protocol = apiUrl.protocol === 'https:' ? 'wss:' : 'ws:';
    const host = apiUrl.host; // Use backend host, not window.location.host

    // Include pathname from API base URL (e.g., /video-accessibility-back)
    const apiPathname = apiUrl.pathname.replace(/\/$/, ''); // Remove trailing slash
    const wsPath = `/api/v1/ws/jobs`;
    const fullPath = jobId ? `${apiPathname}${wsPath}/${jobId}` : `${apiPathname}${wsPath}`;

    return `${protocol}//${host}${fullPath}?token=${encodeURIComponent(token)}`;
  }, [jobId]);

  /** Cancels a pending (automatic or manual) reconnect timer, if any. */
  const clearReconnectTimer = useCallback(() => {
    if (reconnectTimeoutRef.current) {
      clearTimeout(reconnectTimeoutRef.current);
      reconnectTimeoutRef.current = null;
    }
  }, []);

  /** Stops the heartbeat interval, if one is running. */
  const stopHeartbeat = useCallback(() => {
    if (heartbeatIntervalRef.current) {
      clearInterval(heartbeatIntervalRef.current);
      heartbeatIntervalRef.current = null;
    }
  }, []);

  /** Removes the remembered listeners from `socket` and forgets them. */
  const detachListeners = useCallback((socket: WebSocket) => {
    const attached = listenersRef.current;
    if (!attached) {
      return;
    }
    socket.removeEventListener('open', attached.open);
    socket.removeEventListener('message', attached.message);
    socket.removeEventListener('close', attached.close);
    socket.removeEventListener('error', attached.error);
    listenersRef.current = null;
  }, []);

  /**
   * Forwards an update to `onStatusUpdate`, suppressing repeats of the same
   * job/status pair that arrive within 5 seconds of each other.
   */
  const handleStatusUpdate = useCallback((update: JobStatusUpdate) => {
    // Check for duplicate status updates within the last 5 seconds
    const updateKey = `${update.job_id}:${update.status}`;
    const now = Date.now();
    const recent = recentUpdatesRef.current.get(updateKey);

    if (recent && (now - recent.timestamp) < 5000) {
      // Skip duplicate status update within 5 seconds
      log('Skipping duplicate status update:', updateKey);
      return;
    }

    // Store this update
    recentUpdatesRef.current.set(updateKey, { status: update.status, timestamp: now });

    // Clean up old entries (older than 30 seconds)
    const cutoff = now - 30000;
    for (const [key, value] of recentUpdatesRef.current.entries()) {
      if (value.timestamp < cutoff) {
        recentUpdatesRef.current.delete(key);
      }
    }

    // Call custom handler if provided
    if (onStatusUpdate) {
      onStatusUpdate(update);
    }
  }, [onStatusUpdate, log]);

  /** Forwards a connection status change to `onConnectionChange`, if provided. */
  const handleConnectionChange = useCallback((status: ConnectionStatus) => {
    if (onConnectionChange) {
      onConnectionChange(status);
    }
  }, [onConnectionChange]);

  /**
   * Writes the new status/updated_at into cached query data: the individual
   * `['jobs', job_id]` entry and any cached `['jobs', ...]` data that has a
   * `jobs` array containing the job.
   */
  const updateQueryCache = useCallback((update: JobStatusUpdate) => {
    // Update individual job cache if we have job_id
    if (update.job_id) {
      queryClient.setQueryData(['jobs', update.job_id], (oldData: unknown) => {
        if (oldData) {
          return {
            ...oldData,
            status: update.status,
            updated_at: update.updated_at
          };
        }
        return oldData;
      });
    }

    // Update job list cache
    queryClient.setQueriesData({ queryKey: ['jobs'] }, (oldData: unknown) => {
      // Type-safe handling of the jobs list data
      const data = oldData as { jobs?: Array<{ id: string; status: string; updated_at: string; [key: string]: unknown }> };
      if (!data?.jobs) return oldData;

      const updatedJobs = data.jobs.map((job) => {
        if (job.id === update.job_id) {
          return {
            ...job,
            status: update.status,
            updated_at: update.updated_at
          };
        }
        return job;
      });

      return {
        ...data,
        jobs: updatedJobs
      };
    });
  }, [queryClient]);

  /**
   * Handles an incoming frame: a plain-text `pong` is treated as a heartbeat
   * reply; anything else is parsed as JSON and, for update messages carrying
   * `data`, applied to state, the query cache and the status callback.
   */
  const handleMessage = useCallback((event: MessageEvent) => {
    try {
      // Handle plain text responses (like "pong" heartbeat responses)
      if (typeof event.data === 'string' && event.data === 'pong') {
        log('Received heartbeat response:', event.data);
        return;
      }

      const message: WebSocketMessage = JSON.parse(event.data);
      log('Received message:', message);
      setLastMessage(message);

      if (message.type === 'job_status_update' || message.type === 'job_list_update') {
        if (message.data) {
          setLastUpdate(message.data);
          updateQueryCache(message.data);
          handleStatusUpdate(message.data);
        }
      }
    } catch (error) {
      console.error('[WebSocket] Failed to parse WebSocket message:', error, 'Raw data:', event.data);
    }
  }, [log, updateQueryCache, handleStatusUpdate]);

  /** Marks the connection as established, resets backoff and starts the heartbeat. */
  const handleOpen = useCallback(() => {
    log('WebSocket connected');
    setConnectionStatus('connected');
    handleConnectionChange('connected');
    reconnectAttemptsRef.current = 0;

    // Start heartbeat; clear any previous interval first so two can never run at once.
    stopHeartbeat();
    heartbeatIntervalRef.current = setInterval(() => {
      if (wsRef.current?.readyState === WebSocket.OPEN) {
        wsRef.current.send('ping');
      }
    }, 30000); // Ping every 30 seconds
  }, [log, handleConnectionChange, stopHeartbeat]);

  /**
   * Marks the connection as closed and, for abnormal closures, schedules a
   * reconnect with exponential backoff (`reconnectDelay * 2^attempt`).
   */
  const handleClose = useCallback((event: CloseEvent) => {
    log('WebSocket closed:', event.code, event.reason);
    setConnectionStatus('disconnected');
    handleConnectionChange('disconnected');
    stopHeartbeat();

    // Attempt to reconnect if enabled and component is still mounted
    if (
      autoReconnect &&
      mountedRef.current &&
      reconnectAttemptsRef.current < maxReconnectAttempts &&
      event.code !== 1000 // Don't reconnect on normal closure
    ) {
      const delay = reconnectDelay * Math.pow(2, reconnectAttemptsRef.current);
      log(`Reconnecting in ${delay}ms (attempt ${reconnectAttemptsRef.current + 1}/${maxReconnectAttempts})`);

      clearReconnectTimer();
      reconnectTimeoutRef.current = setTimeout(() => {
        reconnectTimeoutRef.current = null;
        if (mountedRef.current) {
          reconnectAttemptsRef.current++;
          // Use the latest connect so the reconnect sees current handlers and jobId.
          connectRef.current?.();
        }
      }, delay);
    }
  }, [log, autoReconnect, maxReconnectAttempts, reconnectDelay, handleConnectionChange, stopHeartbeat, clearReconnectTimer]);

  /** Records the error state; a `close` event normally follows and drives reconnection. */
  const handleError = useCallback((error: Event) => {
    console.error('WebSocket error:', error);
    setConnectionStatus('error');
    handleConnectionChange('error');
  }, [handleConnectionChange]);

  /**
   * Opens a new socket unless one is already open/connecting or no access
   * token is available. The token is read at call time so reconnects pick up
   * a refreshed token.
   */
  const connect = useCallback(() => {
    const token = apiClient.getAccessToken();
    if (!token) {
      log('No access token available, skipping WebSocket connection');
      return;
    }

    const existing = wsRef.current;
    if (existing?.readyState === WebSocket.CONNECTING ||
        existing?.readyState === WebSocket.OPEN) {
      log('WebSocket already connecting or connected');
      return;
    }

    try {
      setConnectionStatus('connecting');
      handleConnectionChange('connecting');

      // A leftover closing/closed socket must not deliver late events into the new session.
      if (existing) {
        detachListeners(existing);
      }

      const url = getWebSocketUrl(token);
      // Never log the raw token.
      log('Connecting to:', url.replace(/token=[^&]+/, 'token=***'));

      const socket = new WebSocket(url);
      const listeners: JobSocketListeners = {
        open: handleOpen,
        message: handleMessage,
        close: handleClose,
        error: handleError
      };
      socket.addEventListener('open', listeners.open);
      socket.addEventListener('message', listeners.message);
      socket.addEventListener('close', listeners.close);
      socket.addEventListener('error', listeners.error);

      wsRef.current = socket;
      listenersRef.current = listeners;
    } catch (error) {
      console.error('[WebSocket] Failed to create WebSocket connection:', error);
      setConnectionStatus('error');
      handleConnectionChange('error');
    }
  }, [getWebSocketUrl, handleOpen, handleMessage, handleClose, handleError, log, handleConnectionChange, detachListeners]);

  // Keep the ref in sync so timers scheduled earlier call the current connect.
  useEffect(() => {
    connectRef.current = connect;
  }, [connect]);

  /** Closes the socket with code 1000 and cancels pending reconnects and the heartbeat. */
  const disconnect = useCallback(() => {
    log('Manually disconnecting WebSocket');

    clearReconnectTimer();
    stopHeartbeat();

    // Close WebSocket
    const socket = wsRef.current;
    if (socket) {
      // Detach first so this deliberate close does not run handleClose.
      detachListeners(socket);

      if (socket.readyState === WebSocket.OPEN ||
          socket.readyState === WebSocket.CONNECTING) {
        socket.close(1000, 'Manual disconnect');
      }

      wsRef.current = null;
    }

    setConnectionStatus('disconnected');
    handleConnectionChange('disconnected');
  }, [log, clearReconnectTimer, stopHeartbeat, detachListeners, handleConnectionChange]);

  /** Drops the current connection, resets the backoff counter and reconnects shortly after. */
  const reconnect = useCallback(() => {
    log('Manual reconnect requested');
    disconnect();
    reconnectAttemptsRef.current = 0;

    // Track the timer so a later disconnect()/unmount cancels it instead of
    // letting it open a socket nobody owns.
    reconnectTimeoutRef.current = setTimeout(() => {
      reconnectTimeoutRef.current = null;
      if (mountedRef.current) {
        connectRef.current?.();
      }
    }, 100);
  }, [log, disconnect]);

  /** Sends a raw string if the socket is open; otherwise warns and drops the message. */
  const sendMessage = useCallback((message: string) => {
    if (wsRef.current?.readyState === WebSocket.OPEN) {
      wsRef.current.send(message);
      log('Sent message:', message);
    } else {
      console.warn('WebSocket not connected, cannot send message');
    }
  }, [log]);

  // Connect on mount and when dependencies change
  useEffect(() => {
    // The cleanup below also runs when jobId/auth changes (and on StrictMode's
    // simulated remount), so the flag must be re-armed on every run or
    // auto-reconnect would stay disabled for the rest of the component's life.
    mountedRef.current = true;

    const currentToken = apiClient.getAccessToken();
    if (currentToken && isAuthenticated) {
      connect();
    }

    return () => {
      mountedRef.current = false;
      disconnect();
    };
    // eslint-disable-next-line react-hooks/exhaustive-deps
  }, [isAuthenticated, jobId]); // Reconnect when auth state or jobId changes

  return {
    connectionStatus,
    lastMessage,
    lastUpdate,
    reconnect,
    disconnect,
    sendMessage
  };
}