diff --git a/backend/app/__init__.py b/backend/app/__init__.py index e776e5ff..e03ff813 100755 --- a/backend/app/__init__.py +++ b/backend/app/__init__.py @@ -160,7 +160,12 @@ def create_app(): async def start_task_sweeper(): import asyncio from app.services.task_manager import get_task_manager + from app.websocket_manager_async import get_async_websocket_manager asyncio.create_task(get_task_manager().start_sweeper()) + # Register the ASGI event loop so cross-thread WebSocket emits (from AI Runner) work + ws_mgr = get_async_websocket_manager() + if ws_mgr: + ws_mgr.set_main_loop(asyncio.get_running_loop()) # Health check endpoint @app.route('/api/health', methods=['GET']) diff --git a/backend/app/websocket_manager_async.py b/backend/app/websocket_manager_async.py index 32525712..cd31f4c5 100755 --- a/backend/app/websocket_manager_async.py +++ b/backend/app/websocket_manager_async.py @@ -18,15 +18,23 @@ logger = logging.getLogger(__name__) class AsyncWebSocketManager: """Manages WebSocket connections and rooms for focus group sessions using AsyncServer.""" - + def __init__(self): # Use singleton SocketIO AsyncServer instance self.sio = sio self.focus_group_rooms: Dict[str, Set[str]] = {} # focus_group_id -> set of session_ids self.user_sessions: Dict[str, Dict[str, Any]] = {} # session_id -> user info - + + # Main ASGI event loop reference (set via set_main_loop during app startup) + self._main_loop: Optional[asyncio.AbstractEventLoop] = None + # Register SocketIO event handlers self._register_handlers() + + def set_main_loop(self, loop: asyncio.AbstractEventLoop) -> None: + """Store the main ASGI event loop for cross-thread emission.""" + self._main_loop = loop + logger.info("AsyncWebSocketManager: main event loop registered") def _register_handlers(self): """Register all WebSocket event handlers.""" @@ -342,18 +350,42 @@ class AsyncWebSocketManager: **data } + # Detect if we're running on a different event loop than the ASGI server + # (happens when called from the AI Runner's background thread) + cross_loop = False + if self._main_loop is not None: + try: + current_loop = asyncio.get_running_loop() + cross_loop = (current_loop is not self._main_loop) + except RuntimeError: + cross_loop = True # No running loop on this thread + if include_sender or not sender_session_id: # Send to all users in the room using AsyncServer - print(f"🔔 ASYNC Emitting '{event}' to room {room_name} with data keys: {list(event_data.keys())}") - await self.sio.emit(event, event_data, room=room_name) - + print(f"🔔 ASYNC Emitting '{event}' to room {room_name} (cross_loop={cross_loop})") + if cross_loop: + future = asyncio.run_coroutine_threadsafe( + self.sio.emit(event, event_data, room=room_name), + self._main_loop + ) + future.result(timeout=5) + else: + await self.sio.emit(event, event_data, room=room_name) + print(f"🔔 ASYNC Successfully emitted '{event}' to focus group {focus_group_id} ({len(active_sessions)} active users)") logger.debug(f"Emitted '{event}' to focus group {focus_group_id} ({len(active_sessions)} active users)") else: # Send to all users except the sender for session_id in active_sessions: if session_id != sender_session_id: - await self.sio.emit(event, event_data, to=session_id) + if cross_loop: + future = asyncio.run_coroutine_threadsafe( + self.sio.emit(event, event_data, to=session_id), + self._main_loop + ) + future.result(timeout=5) + else: + await self.sio.emit(event, event_data, to=session_id) logger.debug(f"Emitted '{event}' to focus group {focus_group_id} (excluding sender)") except Exception as e: diff --git a/src/pages/FocusGroupSession.tsx b/src/pages/FocusGroupSession.tsx index a55f9587..ee048917 100755 --- a/src/pages/FocusGroupSession.tsx +++ b/src/pages/FocusGroupSession.tsx @@ -340,6 +340,17 @@ const FocusGroupSession = () => { }, [wsConnected, wsConnecting, wsError, useWebSocketEnabled, id]); + // Poll for new messages during AI mode (reliability fallback for cross-loop WebSocket emit issues) + useEffect(() => { + if (!isAiModeActive || !id) return; + + const interval = window.setInterval(() => { + fetchMessages(); + }, 3000); + + return () => window.clearInterval(interval); + }, [isAiModeActive, id]); + // Notification for polling fallback when WebSocket is disabled useEffect(() => { if (!useWebSocketEnabled && id && focusGroup) {