ac-tool/backend/server/ws/manager.py
Vadym Samoilenko 72c50b2c92 Initial commit — AC Tool unified application
Merges ac-helper (PHP Activation Calendar) and brief-extractor (Python AI)
into a single Docker app with React/TypeScript frontend.

Features:
- Brief upload → AI extraction → review → Activation Calendar import
- Handsontable v17 spreadsheet with dependent dropdowns (148 categories)
- AI natural language commands via Gemini (YOLO mode, voice input)
- Azure AD MSAL SPA PKCE authentication, user roles (user/admin)
- CSV Activation Calendar export
- Real-time WebSocket job progress
- Admin: user management, dropdown Excel upload
- Multi-stage Dockerfile, docker-compose, nginx proxy instructions

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-23 13:24:46 +00:00

300 lines
No EOL
9.6 KiB
Python
Executable file

"""
WebSocket connection and message management
"""
import asyncio
import json
import logging
from datetime import datetime
from typing import Dict, Set, Any, Optional
import uuid
from weakref import WeakSet
from quart import websocket
from ..config_runtime import server_config
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class WebSocketClient:
    """One connected WebSocket peer, with its identity and liveness timestamps."""

    def __init__(self, client_id: str, user_id: Optional[str] = None):
        """
        Capture the connection that is current at construction time.

        Args:
            client_id: Identifier assigned to this connection
            user_id: Owning user; any falsy value is stored as 'anonymous'
        """
        self.client_id = client_id
        self.user_id = user_id if user_id else 'anonymous'
        # Naive UTC timestamps; last_ping starts at (roughly) the connect time.
        self.connected_at = datetime.utcnow()
        self.last_ping = datetime.utcnow()
        # NOTE(review): presumably this unwraps Quart's context-local proxy so the
        # connection stays usable after construction - confirm against Quart docs.
        self.websocket = websocket._get_current_object()

    async def send(self, message: Dict[str, Any]):
        """
        Serialize ``message`` to JSON and write it to the socket.

        Any failure (serialization or transport) is logged and re-raised so the
        caller can decide what to do with the connection.
        """
        try:
            encoded = json.dumps(message)
            await self.websocket.send(encoded)
        except Exception as exc:
            logger.warning(f"Failed to send message to client {self.client_id}: {exc}")
            raise

    async def ping(self):
        """
        Send a ``ping`` frame and, on success, refresh ``last_ping``.

        A failed send is logged and re-raised; ``last_ping`` is left unchanged.
        """
        try:
            stamp = datetime.utcnow().isoformat()
            await self.send({'type': 'ping', 'timestamp': stamp})
            self.last_ping = datetime.utcnow()
        except Exception as exc:
            logger.warning(f"Failed to ping client {self.client_id}: {exc}")
            raise
class WebSocketManager:
    """
    Manages WebSocket connections and broadcasts.

    Singleton for coordinating real-time updates: ``__new__`` always returns the
    same instance and ``__init__`` only initializes it once. The ``clients``
    registry is guarded by an ``asyncio.Lock``.
    """

    _instance: Optional['WebSocketManager'] = None

    def __new__(cls):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        """Initialize the registry once; later constructions are no-ops."""
        # __init__ runs on every WebSocketManager() call even though __new__
        # hands back the same object, so guard against re-initialization.
        if hasattr(self, '_initialized'):
            return
        self._initialized = True
        self.clients: Dict[str, 'WebSocketClient'] = {}
        self._lock = asyncio.Lock()
        # Background tasks are created by start_background_tasks()
        self.ping_task = None
        self.cleanup_task = None
        logger.info("WebSocketManager initialized")

    async def start_background_tasks(self):
        """
        Start background maintenance tasks.

        A task is (re)created when it has never been started, was stopped, or
        has already finished; a task that is still running is left alone.
        """
        if self.ping_task is None or self.ping_task.done():
            self.ping_task = asyncio.create_task(self._ping_clients_loop())
        if self.cleanup_task is None or self.cleanup_task.done():
            self.cleanup_task = asyncio.create_task(self._cleanup_disconnected_loop())

    async def stop_background_tasks(self):
        """
        Stop background maintenance tasks.

        Each task is cancelled and awaited in turn. The task handles are reset
        to None so that start_background_tasks() can start fresh tasks later.
        """
        for attr in ('ping_task', 'cleanup_task'):
            task = getattr(self, attr)
            if task is None:
                continue
            # Clear the handle first: a cancelled Task object is still truthy
            # and would otherwise block any later restart.
            setattr(self, attr, None)
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass

    async def register_client(self, user_id: Optional[str] = None) -> 'WebSocketClient':
        """
        Register a new WebSocket client

        Args:
            user_id: User identifier (optional for dev mode)

        Returns:
            WebSocketClient instance

        Raises:
            Exception: whatever the acknowledgement send raised; in that case
                the client is removed from the registry again before re-raising.
        """
        client_id = str(uuid.uuid4())
        client = WebSocketClient(client_id, user_id)

        async with self._lock:
            self.clients[client_id] = client

        logger.info(f"Registered WebSocket client {client_id} for user {user_id}")

        # Send initial connection acknowledgment
        try:
            await client.send({
                'type': 'connection.established',
                'clientId': client_id,
                'userId': user_id,
                'connectedAt': client.connected_at.isoformat()
            })
        except Exception:
            # The caller never receives the client object on failure, so it
            # cannot unregister it; roll the registration back here.
            async with self._lock:
                self.clients.pop(client_id, None)
            raise

        return client

    async def unregister_client(self, client_id: str):
        """
        Unregister a WebSocket client

        Unknown client ids are ignored.

        Args:
            client_id: Client identifier
        """
        async with self._lock:
            client = self.clients.pop(client_id, None)
            if client is not None:
                logger.info(f"Unregistered WebSocket client {client_id} for user {client.user_id}")

    async def _send_and_prune(self, payload: Dict[str, Any], should_send=None) -> int:
        """
        Send ``payload`` to registered clients and drop those whose send fails.

        Args:
            payload: Message passed unchanged to each client's ``send``
            should_send: Optional predicate taking a client; when given, only
                clients for which it returns true are sent to

        Returns:
            Number of clients the payload was sent to without an exception
        """
        delivered = 0
        async with self._lock:
            failed_ids = []
            # Iterate over a snapshot so the registry is never resized mid-iteration.
            for client_id, client in list(self.clients.items()):
                if should_send is not None and not should_send(client):
                    continue
                try:
                    await client.send(payload)
                except Exception as e:
                    logger.warning(f"Failed to send to client {client_id}: {e}")
                    failed_ids.append(client_id)
                else:
                    delivered += 1

            # Remove failed clients
            for client_id in failed_ids:
                self.clients.pop(client_id, None)
        return delivered

    async def broadcast_to_all(self, message: Dict[str, Any]):
        """
        Broadcast message to all connected clients

        A ``timestamp`` (naive UTC, ISO format) is added to a copy of the
        message; the caller's dict is not modified.

        Args:
            message: Message to broadcast
        """
        if not self.clients:
            return

        # Add timestamp to a copy of the message
        payload = {**message, 'timestamp': datetime.utcnow().isoformat()}
        await self._send_and_prune(payload)

    async def broadcast_to_user(self, user_id: str, message: Dict[str, Any]):
        """
        Broadcast message to all connections for a specific user

        A ``timestamp`` (naive UTC, ISO format) is added to a copy of the
        message; the caller's dict is not modified.

        Args:
            user_id: User identifier
            message: Message to broadcast
        """
        if not self.clients:
            return

        # Add timestamp to a copy of the message
        payload = {**message, 'timestamp': datetime.utcnow().isoformat()}
        sent_count = await self._send_and_prune(
            payload, lambda client: client.user_id == user_id
        )

        if sent_count > 0:
            logger.debug(f"Broadcast message to {sent_count} clients for user {user_id}")

    async def broadcast_job_update(self, job_id: str, message: Dict[str, Any]):
        """
        Broadcast job-specific update

        The job id is attached as ``jobId`` on a copy of the message; the
        caller's dict is not modified.

        Args:
            job_id: Job identifier
            message: Message to broadcast
        """
        # For now, broadcast to all clients
        # In the future, we could implement job-specific subscriptions
        await self.broadcast_to_all({**message, 'jobId': job_id})

    async def send_queue_snapshot(self, client: 'WebSocketClient', jobs_data: list):
        """
        Send initial queue snapshot to a client

        Args:
            client: WebSocket client
            jobs_data: Serialized jobs data

        Raises:
            Exception: whatever ``client.send`` raised, after logging it
        """
        try:
            await client.send({
                'type': 'queue.snapshot',
                'jobs': jobs_data
            })
            logger.debug(f"Sent queue snapshot to client {client.client_id}")
        except Exception as e:
            logger.error(f"Failed to send queue snapshot to {client.client_id}: {e}")
            raise

    async def get_connection_stats(self) -> Dict[str, Any]:
        """
        Get WebSocket connection statistics

        Returns:
            Statistics dictionary with ``total_connections``, ``unique_users``,
            ``connections_per_user`` and ``uptime_seconds``. ``uptime_seconds``
            is the age of the oldest currently registered connection, or 0.0
            when no client is registered.
        """
        async with self._lock:
            user_counts = {}
            for client in self.clients.values():
                user_counts[client.user_id] = user_counts.get(client.user_id, 0) + 1

            # Read the clock once so the empty-registry case is exactly zero
            # instead of the difference between two separate clock reads.
            now = datetime.utcnow()
            oldest = min((c.connected_at for c in self.clients.values()), default=now)

            return {
                'total_connections': len(self.clients),
                'unique_users': len(user_counts),
                'connections_per_user': user_counts,
                'uptime_seconds': (now - oldest).total_seconds()
            }

    async def _ping_clients_loop(self):
        """
        Background task to ping clients periodically

        Sleeps for ``server_config.WS_PING_INTERVAL_SECONDS`` between rounds and
        removes every client whose ping raises. Exits when cancelled; other
        errors are logged and the loop continues.
        """
        while True:
            try:
                await asyncio.sleep(server_config.WS_PING_INTERVAL_SECONDS)

                async with self._lock:
                    clients_to_remove = []
                    for client_id, client in list(self.clients.items()):
                        try:
                            await client.ping()
                        except Exception:
                            clients_to_remove.append(client_id)

                    # Remove failed clients
                    for client_id in clients_to_remove:
                        self.clients.pop(client_id, None)
                        logger.debug(f"Removed unresponsive client {client_id}")

            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in ping loop: {e}")

    async def _cleanup_disconnected_loop(self):
        """
        Background task to clean up disconnected clients

        Once a minute, removes clients whose ``last_ping`` is older than three
        ping intervals. Exits when cancelled; other errors are logged and the
        loop continues.
        """
        while True:
            try:
                await asyncio.sleep(60)  # Check every minute

                async with self._lock:
                    # Clean up clients that haven't been pinged recently.
                    # Subtract the naive UTC datetimes directly: .timestamp() on
                    # a naive value would interpret it as local time.
                    max_age_seconds = server_config.WS_PING_INTERVAL_SECONDS * 3
                    now = datetime.utcnow()
                    clients_to_remove = [
                        client_id
                        for client_id, client in self.clients.items()
                        if (now - client.last_ping).total_seconds() > max_age_seconds
                    ]

                    for client_id in clients_to_remove:
                        self.clients.pop(client_id, None)
                        logger.debug(f"Cleaned up stale client {client_id}")

            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in cleanup loop: {e}")
# Global instance, created when this module is imported. WebSocketManager.__new__
# returns this same object for any later WebSocketManager() call.
ws_manager = WebSocketManager()