Fix AI autonomous mode: cross-loop WebSocket emit + polling fallback
The AI Runner runs on a dedicated background thread with its own asyncio event loop. When it emitted WebSocket events via sio.emit(), the call happened on the wrong loop (AI Runner's vs ASGI/Quart's), causing silent failures — messages were saved to MongoDB but never reached the frontend. Additionally, the frontend HTTP polling fallback was never enabled when WebSocket appeared connected, leaving no way to discover missed messages. - websocket_manager_async.py: store ASGI main loop reference; detect cross-loop calls in emit_to_focus_group and use run_coroutine_threadsafe to schedule emits on the correct loop - __init__.py: register the ASGI event loop with the WebSocket manager in before_serving hook - FocusGroupSession.tsx: always poll fetchMessages every 3s during AI mode as a reliability fallback regardless of WebSocket status Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
33272cc677
commit
b4978989a5
3 changed files with 54 additions and 6 deletions
|
|
@ -160,7 +160,12 @@ def create_app():
|
|||
async def start_task_sweeper():
|
||||
import asyncio
|
||||
from app.services.task_manager import get_task_manager
|
||||
from app.websocket_manager_async import get_async_websocket_manager
|
||||
asyncio.create_task(get_task_manager().start_sweeper())
|
||||
# Register the ASGI event loop so cross-thread WebSocket emits (from AI Runner) work
|
||||
ws_mgr = get_async_websocket_manager()
|
||||
if ws_mgr:
|
||||
ws_mgr.set_main_loop(asyncio.get_running_loop())
|
||||
|
||||
# Health check endpoint
|
||||
@app.route('/api/health', methods=['GET'])
|
||||
|
|
|
|||
|
|
@ -18,15 +18,23 @@ logger = logging.getLogger(__name__)
|
|||
|
||||
class AsyncWebSocketManager:
|
||||
"""Manages WebSocket connections and rooms for focus group sessions using AsyncServer."""
|
||||
|
||||
|
||||
def __init__(self):
|
||||
# Use singleton SocketIO AsyncServer instance
|
||||
self.sio = sio
|
||||
self.focus_group_rooms: Dict[str, Set[str]] = {} # focus_group_id -> set of session_ids
|
||||
self.user_sessions: Dict[str, Dict[str, Any]] = {} # session_id -> user info
|
||||
|
||||
|
||||
# Main ASGI event loop reference (set via set_main_loop during app startup)
|
||||
self._main_loop: Optional[asyncio.AbstractEventLoop] = None
|
||||
|
||||
# Register SocketIO event handlers
|
||||
self._register_handlers()
|
||||
|
||||
def set_main_loop(self, loop: asyncio.AbstractEventLoop) -> None:
|
||||
"""Store the main ASGI event loop for cross-thread emission."""
|
||||
self._main_loop = loop
|
||||
logger.info("AsyncWebSocketManager: main event loop registered")
|
||||
|
||||
def _register_handlers(self):
|
||||
"""Register all WebSocket event handlers."""
|
||||
|
|
@ -342,18 +350,42 @@ class AsyncWebSocketManager:
|
|||
**data
|
||||
}
|
||||
|
||||
# Detect if we're running on a different event loop than the ASGI server
|
||||
# (happens when called from the AI Runner's background thread)
|
||||
cross_loop = False
|
||||
if self._main_loop is not None:
|
||||
try:
|
||||
current_loop = asyncio.get_running_loop()
|
||||
cross_loop = (current_loop is not self._main_loop)
|
||||
except RuntimeError:
|
||||
cross_loop = True # No running loop on this thread
|
||||
|
||||
if include_sender or not sender_session_id:
|
||||
# Send to all users in the room using AsyncServer
|
||||
print(f"🔔 ASYNC Emitting '{event}' to room {room_name} with data keys: {list(event_data.keys())}")
|
||||
await self.sio.emit(event, event_data, room=room_name)
|
||||
|
||||
print(f"🔔 ASYNC Emitting '{event}' to room {room_name} (cross_loop={cross_loop})")
|
||||
if cross_loop:
|
||||
future = asyncio.run_coroutine_threadsafe(
|
||||
self.sio.emit(event, event_data, room=room_name),
|
||||
self._main_loop
|
||||
)
|
||||
future.result(timeout=5)
|
||||
else:
|
||||
await self.sio.emit(event, event_data, room=room_name)
|
||||
|
||||
print(f"🔔 ASYNC Successfully emitted '{event}' to focus group {focus_group_id} ({len(active_sessions)} active users)")
|
||||
logger.debug(f"Emitted '{event}' to focus group {focus_group_id} ({len(active_sessions)} active users)")
|
||||
else:
|
||||
# Send to all users except the sender
|
||||
for session_id in active_sessions:
|
||||
if session_id != sender_session_id:
|
||||
await self.sio.emit(event, event_data, to=session_id)
|
||||
if cross_loop:
|
||||
future = asyncio.run_coroutine_threadsafe(
|
||||
self.sio.emit(event, event_data, to=session_id),
|
||||
self._main_loop
|
||||
)
|
||||
future.result(timeout=5)
|
||||
else:
|
||||
await self.sio.emit(event, event_data, to=session_id)
|
||||
logger.debug(f"Emitted '{event}' to focus group {focus_group_id} (excluding sender)")
|
||||
|
||||
except Exception as e:
|
||||
|
|
|
|||
|
|
@ -340,6 +340,17 @@ const FocusGroupSession = () => {
|
|||
}, [wsConnected, wsConnecting, wsError, useWebSocketEnabled, id]);
|
||||
|
||||
|
||||
// Poll for new messages during AI mode (reliability fallback for cross-loop WebSocket emit issues)
|
||||
useEffect(() => {
|
||||
if (!isAiModeActive || !id) return;
|
||||
|
||||
const interval = window.setInterval(() => {
|
||||
fetchMessages();
|
||||
}, 3000);
|
||||
|
||||
return () => window.clearInterval(interval);
|
||||
}, [isAiModeActive, id]);
|
||||
|
||||
// Notification for polling fallback when WebSocket is disabled
|
||||
useEffect(() => {
|
||||
if (!useWebSocketEnabled && id && focusGroup) {
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue