""" WebSocket Manager for Synthetic Society Handles WebSocket connections, room management, and real-time event broadcasting. GPT-5 Fix: Implements queue-based emitting system to resolve greenlet/threading issues during AI mode that prevented real-time message delivery. """ import logging from typing import Dict, Set, Any, Optional from datetime import datetime, timezone from .extensions import socketio_server as socketio # Import singleton SocketIO instance from app.auth.quart_jwt import decode_token from queue import Queue # Set up logging logger = logging.getLogger(__name__) # GPT-5 Fix: Queue-based emitter system to prevent cross-greenlet/thread issues _emit_queue = Queue() _emitter_started = False def _start_emitter_if_needed(): """Start the background emitter task if it hasn't been started yet.""" global _emitter_started if _emitter_started: return _emitter_started = True def _drain(): """Background task that drains the emit queue and sends events in eventlet greenlet.""" while True: try: event, data, room = _emit_queue.get() # Single place to emit - runs in correct eventlet greenlet socketio.emit(event, data, to=room, namespace="/") # Yield to let engine/transport flush immediately socketio.sleep(0) except Exception as e: if current_app: current_app.logger.exception("Emitter error: %s", e) else: logger.exception("Emitter error: %s", e) socketio.sleep(0) socketio.start_background_task(_drain) logger.info("Started queue-based WebSocket emitter background task") def emit_websocket_event(event: str, data: dict, room: str | None = None) -> None: """ Safe to call from ANY context (asyncio task, worker thread, request thread). GPT-5 Fix: This replaces all direct socketio.emit() calls to prevent "Cannot switch to a different thread" errors during AI mode. """ _start_emitter_if_needed() _emit_queue.put((event, data, room)) class WebSocketManager: """Manages WebSocket connections and rooms for focus group sessions.""" def __init__(self): # Use singleton SocketIO instance directly (GPT-5 fix) self.socketio = socketio self.focus_group_rooms: Dict[str, Set[str]] = {} # focus_group_id -> set of session_ids self.user_sessions: Dict[str, Dict[str, Any]] = {} # session_id -> user info # Register SocketIO event handlers self._register_handlers() # No longer need thread-safe manager - using singleton SocketIO pattern def _register_handlers(self): """Register all WebSocket event handlers.""" @self.socketio.on('connect') def handle_connect(sid, environ, auth=None): """Handle WebSocket connection.""" logger.info(f"WebSocket connection attempt from {sid}") # Validate JWT token from auth data if not auth or 'token' not in auth: logger.warning(f"Connection rejected - no auth token provided") disconnect() return False try: # Decode and validate JWT token token = auth['token'] decoded_token = decode_token(token) user_id = decoded_token['sub'] # Store user session info self.user_sessions[sid] = { 'user_id': user_id, 'connected_at': datetime.now(timezone.utc), 'focus_groups': set() } logger.info(f"WebSocket connected - Session: {sid}, User: {user_id}") # Emit connection success socketio.emit('connected', {'status': 'success', 'session_id': sid}, to=sid) except Exception as e: logger.error(f"Connection authentication failed: {e}") socketio.disconnect(sid) return False @self.socketio.on('disconnect') def handle_disconnect(sid): """Handle WebSocket disconnection.""" if sid in self.user_sessions: user_info = self.user_sessions[sid] user_id = user_info['user_id'] # Leave all focus group rooms for focus_group_id in user_info['focus_groups'].copy(): self._leave_focus_group_room(sid, focus_group_id) # Clean up session del self.user_sessions[sid] logger.info(f"WebSocket disconnected - Session: {sid}, User: {user_id}") @self.socketio.on('join_focus_group') def handle_join_focus_group(sid, data): """Handle joining a focus group room.""" if sid not in self.user_sessions: socketio.emit('error', {'message': 'Session not authenticated'}, to=sid) return focus_group_id = data.get('focus_group_id') if not focus_group_id: socketio.emit('error', {'message': 'Focus group ID required'}, to=sid) return # Join the room success = self._join_focus_group_room(sid, focus_group_id) if success: socketio.emit('joined_focus_group', { 'focus_group_id': focus_group_id, 'status': 'success' }, to=sid) logger.info(f"User joined focus group room - Session: {sid}, Group: {focus_group_id}") else: socketio.emit('error', {'message': 'Failed to join focus group'}, to=sid) @self.socketio.on('leave_focus_group') def handle_leave_focus_group(sid, data): """Handle leaving a focus group room.""" if sid not in self.user_sessions: socketio.emit('error', {'message': 'Session not authenticated'}, to=sid) return focus_group_id = data.get('focus_group_id') if not focus_group_id: socketio.emit('error', {'message': 'Focus group ID required'}, to=sid) return # Leave the room success = self._leave_focus_group_room(sid, focus_group_id) if success: socketio.emit('left_focus_group', { 'focus_group_id': focus_group_id, 'status': 'success' }, to=sid) logger.info(f"User left focus group room - Session: {sid}, Group: {focus_group_id}") def _join_focus_group_room(self, session_id: str, focus_group_id: str) -> bool: """Join a user session to a focus group room.""" try: # Add to SocketIO room socketio.enter_room(session_id, focus_group_id) # Track in our data structures if focus_group_id not in self.focus_group_rooms: self.focus_group_rooms[focus_group_id] = set() self.focus_group_rooms[focus_group_id].add(session_id) self.user_sessions[session_id]['focus_groups'].add(focus_group_id) return True except Exception as e: logger.error(f"Failed to join focus group room: {e}") return False def _leave_focus_group_room(self, session_id: str, focus_group_id: str) -> bool: """Remove a user session from a focus group room.""" try: # Leave SocketIO room socketio.leave_room(session_id, focus_group_id) # Clean up tracking if focus_group_id in self.focus_group_rooms: self.focus_group_rooms[focus_group_id].discard(session_id) # Remove room if empty if not self.focus_group_rooms[focus_group_id]: del self.focus_group_rooms[focus_group_id] if session_id in self.user_sessions: self.user_sessions[session_id]['focus_groups'].discard(focus_group_id) return True except Exception as e: logger.error(f"Failed to leave focus group room: {e}") return False def emit_to_focus_group(self, focus_group_id: str, event: str, data: Any, include_sender: bool = True, sender_session_id: Optional[str] = None): """Emit an event to all users in a focus group room.""" try: if focus_group_id not in self.focus_group_rooms: logger.debug(f"No active sessions for focus group {focus_group_id}") return room_name = focus_group_id room_sessions = self.focus_group_rooms[focus_group_id].copy() # Clean up stale sessions active_sessions = [] stale_sessions = [] for session_id in room_sessions: if session_id in self.user_sessions: active_sessions.append(session_id) else: stale_sessions.append(session_id) self.focus_group_rooms[focus_group_id].discard(session_id) if stale_sessions: logger.debug(f"Cleaned up {len(stale_sessions)} stale sessions") if not active_sessions: logger.debug(f"No active sessions remaining for focus group {focus_group_id} after cleanup") return # Prepare the event data event_data = { 'focus_group_id': focus_group_id, 'timestamp': datetime.now(timezone.utc).isoformat(), **data } if include_sender or not sender_session_id: emit_websocket_event(event, event_data, room_name) logger.debug(f"Emitted '{event}' to focus group {focus_group_id} ({len(active_sessions)} active users)") else: # Send to all users except the sender for session_id in self.focus_group_rooms[focus_group_id]: if session_id != sender_session_id: emit_websocket_event(event, event_data, session_id) logger.debug(f"Emitted '{event}' to focus group {focus_group_id} (excluding sender)") except Exception as e: logger.error(f"Failed to emit to focus group {focus_group_id}: {e}") # _register_with_thread_safe_manager method removed - no longer needed with singleton pattern def emit_message_update(self, focus_group_id: str, message_data: Dict[str, Any], sender_session_id: Optional[str] = None): """Emit a new message to focus group participants.""" self.emit_to_focus_group( focus_group_id, 'message_update', {'message': message_data}, include_sender=True, sender_session_id=sender_session_id ) def emit_ai_status_update(self, focus_group_id: str, status_data: Dict[str, Any]): """Emit AI status change to focus group participants.""" self.emit_to_focus_group( focus_group_id, 'ai_status_update', {'status': status_data} ) def emit_moderator_status_update(self, focus_group_id: str, moderator_status: Dict[str, Any]): """Emit moderator status change to focus group participants.""" self.emit_to_focus_group( focus_group_id, 'moderator_status_update', {'moderator_status': moderator_status} ) def emit_theme_update(self, focus_group_id: str, theme_data: Dict[str, Any], action: str = 'added'): """Emit theme update to focus group participants.""" self.emit_to_focus_group( focus_group_id, 'theme_update', {'theme': theme_data, 'action': action} ) def emit_analytics_update(self, focus_group_id: str, analytics_data: Dict[str, Any]): """Emit analytics update to focus group participants.""" self.emit_to_focus_group( focus_group_id, 'analytics_update', {'analytics': analytics_data} ) def emit_conversation_state_update(self, focus_group_id: str, state_data: Dict[str, Any]): """Emit conversation state update to focus group participants.""" self.emit_to_focus_group( focus_group_id, 'conversation_state_update', {'state': state_data} ) def get_room_info(self, focus_group_id: str) -> Dict[str, Any]: """Get information about a focus group room.""" if focus_group_id not in self.focus_group_rooms: return {'active_sessions': 0, 'users': []} sessions = self.focus_group_rooms[focus_group_id] users = [] for session_id in sessions: if session_id in self.user_sessions: user_info = self.user_sessions[session_id] users.append({ 'session_id': session_id, 'user_id': user_info['user_id'], 'connected_at': user_info['connected_at'].isoformat() }) return { 'active_sessions': len(sessions), 'users': users } def get_connection_stats(self) -> Dict[str, Any]: """Get overall connection statistics.""" return { 'total_sessions': len(self.user_sessions), 'total_focus_groups': len(self.focus_group_rooms), 'focus_group_details': { fg_id: len(sessions) for fg_id, sessions in self.focus_group_rooms.items() } } # Global WebSocket manager instance websocket_manager: Optional[WebSocketManager] = None def init_websocket_manager() -> WebSocketManager: """Initialize the global WebSocket manager using singleton SocketIO.""" global websocket_manager websocket_manager = WebSocketManager() # No parameter needed - uses singleton return websocket_manager def get_websocket_manager() -> Optional[WebSocketManager]: """Get the global WebSocket manager instance.""" return websocket_manager