semblance-dev/backend/app/websocket_manager.py
Vadym Samoilenko 3e1865edbd Apply Jintech security audit remediation (sprint 3) — 87/92 findings fixed
- Fix missing await on FocusGroup.get_messages() (N-L1)
- Replace time.sleep with asyncio.sleep in key_theme_service and focus_group_service (N-P10)
- Replace flask import with quart in focus_groups.py (N-S3)
- Add logger.error before all 500 returns in focus_groups.py (N-P6)
- Add logging to silent except blocks across routes (N-M10, N-M11)
- Add @rate_limit to 6 remaining AI endpoints (N-H4)
- Add --confirm flag to populate scripts before delete_many (S-H2)
- Remove hardcoded Azure ID fallbacks from msal_service.py and msalConfig.ts (A-M2, F-H4)
- Centralize make_serializable() in utils.py, remove duplicates from 3 route files (N-P7)
- Replace all datetime.utcnow() with datetime.now(timezone.utc) across entire backend (M-L2)
- AuthContext.tsx: only mark token validated on 200 success, not on non-401 errors (F-H2)
- Rename authType → auth_type in auth.py (N-S4)
- Add security_report.md and security_report.pdf with full 92-finding status

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-20 12:51:18 +00:00

353 lines
No EOL
14 KiB
Python
Executable file

"""
WebSocket Manager for Synthetic Society
Handles WebSocket connections, room management, and real-time event broadcasting.
GPT-5 Fix: Implements queue-based emitting system to resolve greenlet/threading issues
during AI mode that prevented real-time message delivery.
"""
import logging
from typing import Dict, Set, Any, Optional
from datetime import datetime, timezone
from .extensions import socketio_server as socketio # Import singleton SocketIO instance
from app.auth.quart_jwt import decode_token
from queue import Queue
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Queue-based emitter state. Producers (emit_websocket_event) put
# (event, data, room) tuples on this queue; the single background task started
# by _start_emitter_if_needed() takes them off and performs the actual
# socketio.emit(), so emits always happen from one place.
_emit_queue: Queue = Queue()
# Set to True by _start_emitter_if_needed() once it has been asked to spawn the
# drain task, so that the task is only started once.
_emitter_started: bool = False
def _start_emitter_if_needed():
    """Start the background emitter task if it hasn't been started yet.

    The task is spawned through ``socketio.start_background_task`` and runs
    ``_drain`` forever, forwarding every ``(event, data, room)`` tuple found on
    ``_emit_queue`` to ``socketio.emit``. Calling this function again after a
    successful start is a no-op.

    Raises:
        Exception: whatever ``socketio.start_background_task`` raises. In that
            case the started flag is reset so a later call can retry instead
            of silently queueing events that nothing will ever drain.
    """
    global _emitter_started
    if _emitter_started:
        return
    # Flip the flag before spawning so a nested call made while the task is
    # being started cannot spawn a second drain loop.
    _emitter_started = True

    def _drain():
        """Drain the emit queue forever, emitting each queued event."""
        while True:
            try:
                # NOTE(review): queue.Queue.get() blocks the caller until an
                # item arrives - confirm the configured async mode makes this
                # wait cooperative for the other tasks.
                event, data, room = _emit_queue.get()
                # Single place where events are actually emitted.
                socketio.emit(event, data, to=room, namespace="/")
                # Yield so the transport can flush the packet immediately.
                socketio.sleep(0)
            except Exception as e:
                # Log with the module logger only: this module imports no
                # application object, and a failure inside this handler would
                # terminate the drain loop for good.
                logger.exception("Emitter error: %s", e)
                socketio.sleep(0)

    try:
        socketio.start_background_task(_drain)
    except Exception:
        # Nothing is draining the queue; allow the next caller to try again.
        _emitter_started = False
        raise
    logger.info("Started queue-based WebSocket emitter background task")
def emit_websocket_event(event: str, data: dict, room: str | None = None) -> None:
    """Queue a WebSocket event for delivery by the background emitter.

    Intended to be callable from any context (asyncio task, worker thread,
    request thread): the caller never touches ``socketio.emit`` directly, it
    only enqueues the event for the single drain task to send.

    Args:
        event: Name of the event to emit.
        data: Payload to send with the event.
        room: Target room or session id; ``None`` is passed through to the
            emitter unchanged.
    """
    # Make sure something is consuming the queue before adding to it.
    _start_emitter_if_needed()
    queued_item = (event, data, room)
    _emit_queue.put(queued_item)
class WebSocketManager:
    """Manages WebSocket connections and rooms for focus group sessions.

    State kept by the manager:
        socketio: the shared SocketIO server instance used for every
            server-side call (handler registration, rooms, direct emits).
        focus_group_rooms: focus_group_id -> set of session ids in that room.
        user_sessions: session id -> dict with ``user_id``, ``connected_at``
            (timezone-aware UTC datetime) and ``focus_groups`` (set of ids).
    """

    def __init__(self):
        """Bind to the singleton SocketIO instance and register handlers."""
        self.socketio = socketio
        self.focus_group_rooms: Dict[str, Set[str]] = {}  # focus_group_id -> session ids
        self.user_sessions: Dict[str, Dict[str, Any]] = {}  # session id -> user info
        self._register_handlers()

    def _emit_error(self, sid: str, message: str) -> None:
        """Send an ``error`` event carrying ``message`` to a single session."""
        self.socketio.emit('error', {'message': message}, to=sid)

    def _register_handlers(self):
        """Register the connect/disconnect/join/leave event handlers."""

        @self.socketio.on('connect')
        def handle_connect(sid, environ, auth=None):
            """Authenticate a new connection; return False to reject it."""
            logger.info(f"WebSocket connection attempt from {sid}")

            # The auth payload is client-controlled: require a dict with a token.
            if not isinstance(auth, dict) or 'token' not in auth:
                logger.warning("Connection rejected - no auth token provided")
                # Returning False from the connect handler rejects the
                # connection; no explicit disconnect call is needed here.
                return False

            try:
                decoded_token = decode_token(auth['token'])
                user_id = decoded_token['sub']

                self.user_sessions[sid] = {
                    'user_id': user_id,
                    'connected_at': datetime.now(timezone.utc),
                    'focus_groups': set()
                }
                logger.info(f"WebSocket connected - Session: {sid}, User: {user_id}")

                self.socketio.emit('connected', {'status': 'success', 'session_id': sid}, to=sid)
            except Exception as e:
                logger.error(f"Connection authentication failed: {e}")
                # The session may already have been stored if the failure
                # happened after token decoding; never keep a rejected session.
                self.user_sessions.pop(sid, None)
                self.socketio.disconnect(sid)
                return False

        @self.socketio.on('disconnect')
        def handle_disconnect(sid):
            """Drop the session and remove it from every room it had joined."""
            user_info = self.user_sessions.get(sid)
            if user_info is None:
                return

            user_id = user_info['user_id']
            # Iterate over a snapshot: leaving a room mutates the same set.
            for focus_group_id in list(user_info['focus_groups']):
                self._leave_focus_group_room(sid, focus_group_id)

            self.user_sessions.pop(sid, None)
            logger.info(f"WebSocket disconnected - Session: {sid}, User: {user_id}")

        @self.socketio.on('join_focus_group')
        def handle_join_focus_group(sid, data):
            """Add an authenticated session to the requested focus group room."""
            if sid not in self.user_sessions:
                self._emit_error(sid, 'Session not authenticated')
                return

            # The payload is client-controlled and may not be a dict at all.
            focus_group_id = data.get('focus_group_id') if isinstance(data, dict) else None
            if not focus_group_id:
                self._emit_error(sid, 'Focus group ID required')
                return

            # NOTE(review): no check is made here that this user is allowed to
            # access the focus group - confirm authorization happens elsewhere.
            if self._join_focus_group_room(sid, focus_group_id):
                self.socketio.emit('joined_focus_group', {
                    'focus_group_id': focus_group_id,
                    'status': 'success'
                }, to=sid)
                logger.info(f"User joined focus group room - Session: {sid}, Group: {focus_group_id}")
            else:
                self._emit_error(sid, 'Failed to join focus group')

        @self.socketio.on('leave_focus_group')
        def handle_leave_focus_group(sid, data):
            """Remove an authenticated session from a focus group room."""
            if sid not in self.user_sessions:
                self._emit_error(sid, 'Session not authenticated')
                return

            focus_group_id = data.get('focus_group_id') if isinstance(data, dict) else None
            if not focus_group_id:
                self._emit_error(sid, 'Focus group ID required')
                return

            if self._leave_focus_group_room(sid, focus_group_id):
                self.socketio.emit('left_focus_group', {
                    'focus_group_id': focus_group_id,
                    'status': 'success'
                }, to=sid)
                logger.info(f"User left focus group room - Session: {sid}, Group: {focus_group_id}")
            else:
                # Mirror the join handler so the client is told about failures.
                self._emit_error(sid, 'Failed to leave focus group')

    def _join_focus_group_room(self, session_id: str, focus_group_id: str) -> bool:
        """Join a user session to a focus group room.

        Returns:
            True when the session was added to the SocketIO room and to the
            manager's tracking structures; False when the session is unknown
            or any step failed (the failure is logged, never raised).
        """
        try:
            # Validate first so a missing session leaves no partial state
            # behind in the SocketIO room or in focus_group_rooms.
            session = self.user_sessions.get(session_id)
            if session is None:
                logger.error(f"Failed to join focus group room: unknown session {session_id}")
                return False

            self.socketio.enter_room(session_id, focus_group_id)
            self.focus_group_rooms.setdefault(focus_group_id, set()).add(session_id)
            session['focus_groups'].add(focus_group_id)
            return True
        except Exception as e:
            logger.error(f"Failed to join focus group room: {e}")
            return False

    def _leave_focus_group_room(self, session_id: str, focus_group_id: str) -> bool:
        """Remove a user session from a focus group room.

        Returns:
            True when the session was removed (or was not tracked to begin
            with); False when any step failed (the failure is logged).
        """
        try:
            self.socketio.leave_room(session_id, focus_group_id)

            members = self.focus_group_rooms.get(focus_group_id)
            if members is not None:
                members.discard(session_id)
                # Drop the room entry entirely once nobody is left in it.
                if not members:
                    del self.focus_group_rooms[focus_group_id]

            session = self.user_sessions.get(session_id)
            if session is not None:
                session['focus_groups'].discard(focus_group_id)
            return True
        except Exception as e:
            logger.error(f"Failed to leave focus group room: {e}")
            return False

    def emit_to_focus_group(self, focus_group_id: str, event: str, data: Any, include_sender: bool = True, sender_session_id: Optional[str] = None):
        """Emit an event to all users in a focus group room.

        Sessions tracked for the room that no longer exist in
        ``user_sessions`` are treated as stale and pruned first. The payload
        sent is ``data`` merged over ``focus_group_id`` and a UTC ISO-8601
        ``timestamp``.

        Args:
            focus_group_id: Room to address.
            event: Event name.
            data: Mapping merged into the payload (``None`` is treated as empty).
            include_sender: When False and ``sender_session_id`` is given, the
                event is sent to each other session individually instead of
                to the room as a whole.
            sender_session_id: Session to skip when ``include_sender`` is False.

        Errors are logged and swallowed; nothing is raised to the caller.
        """
        try:
            members = self.focus_group_rooms.get(focus_group_id)
            if members is None:
                logger.debug(f"No active sessions for focus group {focus_group_id}")
                return

            # Split tracked sessions into live ones and stale leftovers.
            active_sessions = [sid for sid in members if sid in self.user_sessions]
            stale_count = len(members) - len(active_sessions)
            if stale_count:
                members.intersection_update(active_sessions)
                logger.debug(f"Cleaned up {stale_count} stale sessions")

            if not active_sessions:
                # Same rule as _leave_focus_group_room: empty rooms are removed.
                self.focus_group_rooms.pop(focus_group_id, None)
                logger.debug(f"No active sessions remaining for focus group {focus_group_id} after cleanup")
                return

            event_data = {
                'focus_group_id': focus_group_id,
                'timestamp': datetime.now(timezone.utc).isoformat(),
                **(data or {})
            }

            if include_sender or not sender_session_id:
                # The room name is the focus group id itself.
                emit_websocket_event(event, event_data, focus_group_id)
                logger.debug(f"Emitted '{event}' to focus group {focus_group_id} ({len(active_sessions)} active users)")
            else:
                # Address every live session except the sender directly.
                for session_id in active_sessions:
                    if session_id != sender_session_id:
                        emit_websocket_event(event, event_data, session_id)
                logger.debug(f"Emitted '{event}' to focus group {focus_group_id} (excluding sender)")
        except Exception as e:
            logger.error(f"Failed to emit to focus group {focus_group_id}: {e}")

    def emit_message_update(self, focus_group_id: str, message_data: Dict[str, Any], sender_session_id: Optional[str] = None):
        """Emit a new message to focus group participants.

        The event is always sent with ``include_sender=True``, so it goes to
        the whole room and ``sender_session_id`` currently has no effect.
        """
        self.emit_to_focus_group(
            focus_group_id,
            'message_update',
            {'message': message_data},
            include_sender=True,
            sender_session_id=sender_session_id
        )

    def emit_ai_status_update(self, focus_group_id: str, status_data: Dict[str, Any]):
        """Emit AI status change to focus group participants."""
        self.emit_to_focus_group(
            focus_group_id,
            'ai_status_update',
            {'status': status_data}
        )

    def emit_moderator_status_update(self, focus_group_id: str, moderator_status: Dict[str, Any]):
        """Emit moderator status change to focus group participants."""
        self.emit_to_focus_group(
            focus_group_id,
            'moderator_status_update',
            {'moderator_status': moderator_status}
        )

    def emit_theme_update(self, focus_group_id: str, theme_data: Dict[str, Any], action: str = 'added'):
        """Emit theme update (with the action performed) to participants."""
        self.emit_to_focus_group(
            focus_group_id,
            'theme_update',
            {'theme': theme_data, 'action': action}
        )

    def emit_analytics_update(self, focus_group_id: str, analytics_data: Dict[str, Any]):
        """Emit analytics update to focus group participants."""
        self.emit_to_focus_group(
            focus_group_id,
            'analytics_update',
            {'analytics': analytics_data}
        )

    def emit_conversation_state_update(self, focus_group_id: str, state_data: Dict[str, Any]):
        """Emit conversation state update to focus group participants."""
        self.emit_to_focus_group(
            focus_group_id,
            'conversation_state_update',
            {'state': state_data}
        )

    def get_room_info(self, focus_group_id: str) -> Dict[str, Any]:
        """Get information about a focus group room.

        Returns:
            ``{'active_sessions': int, 'users': [...]}`` where each user entry
            has ``session_id``, ``user_id`` and ISO-formatted ``connected_at``.
            Only sessions still present in ``user_sessions`` are listed, and
            ``active_sessions`` is the length of that list so the two always
            agree. An unknown room yields zero sessions and an empty list.
        """
        users = []
        for session_id in self.focus_group_rooms.get(focus_group_id, ()):
            user_info = self.user_sessions.get(session_id)
            if user_info is None:
                # Tracked in the room but no longer connected: not active.
                continue
            users.append({
                'session_id': session_id,
                'user_id': user_info['user_id'],
                'connected_at': user_info['connected_at'].isoformat()
            })
        return {
            'active_sessions': len(users),
            'users': users
        }

    def get_connection_stats(self) -> Dict[str, Any]:
        """Get overall connection statistics.

        Returns:
            Total number of sessions, total number of tracked focus group
            rooms, and the tracked session count per focus group id.
        """
        return {
            'total_sessions': len(self.user_sessions),
            'total_focus_groups': len(self.focus_group_rooms),
            'focus_group_details': {
                fg_id: len(sessions) for fg_id, sessions in self.focus_group_rooms.items()
            }
        }
# Global WebSocket manager instance. Stays None until init_websocket_manager()
# assigns a WebSocketManager to it; read it through get_websocket_manager().
websocket_manager: Optional[WebSocketManager] = None
def init_websocket_manager() -> WebSocketManager:
    """Create a WebSocketManager, publish it as the module global, and return it.

    The manager takes no arguments because it binds to the singleton SocketIO
    instance itself. Every call builds a fresh manager and replaces the
    previously stored one.
    """
    global websocket_manager
    manager = WebSocketManager()
    websocket_manager = manager
    return manager
def get_websocket_manager() -> Optional[WebSocketManager]:
    """Get the global WebSocket manager instance.

    Returns:
        The module-level ``websocket_manager``: the instance stored by
        ``init_websocket_manager()``, or ``None`` if it has not been set.
    """
    return websocket_manager