bug fixes related to websockets implementation
This commit is contained in:
parent
d1e8e8e9a9
commit
36465862fc
22 changed files with 75 additions and 1560 deletions
BIN
.DS_Store
vendored
BIN
.DS_Store
vendored
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
|
@ -13,8 +13,8 @@ socketio = SocketIO(
|
|||
async_mode="eventlet",
|
||||
ping_timeout=120, # 2 minutes timeout for ping response
|
||||
ping_interval=45, # Send ping every 45 seconds
|
||||
logger=True, # Enable debugging while fixing the issue
|
||||
engineio_logger=True # Enable debugging while fixing the issue
|
||||
logger=False, # Disable verbose socketio logging (reduces log noise)
|
||||
engineio_logger=False # Disable verbose engineio logging (reduces PING/PONG spam)
|
||||
)
|
||||
|
||||
# Note: The app will be bound to this instance using socketio.init_app(app) in create_app()
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
|
@ -149,17 +149,17 @@ def microsoft_login():
|
|||
try:
|
||||
data = request.get_json()
|
||||
|
||||
if not data or not data.get('access_token'):
|
||||
return jsonify({"message": "Missing Microsoft access token"}), 400
|
||||
if not data or not data.get('id_token'):
|
||||
return jsonify({"message": "Missing Microsoft ID token"}), 400
|
||||
|
||||
access_token = data.get('access_token')
|
||||
id_token = data.get('id_token')
|
||||
|
||||
# Initialize MSAL service and validate the token
|
||||
msal_service = MSALService()
|
||||
microsoft_user_info = msal_service.validate_token(access_token)
|
||||
microsoft_user_info = msal_service.validate_token(id_token)
|
||||
|
||||
if not microsoft_user_info:
|
||||
return jsonify({"message": "Invalid Microsoft access token"}), 401
|
||||
return jsonify({"message": "Invalid Microsoft ID token"}), 401
|
||||
|
||||
microsoft_id = microsoft_user_info.get('microsoft_id')
|
||||
email = microsoft_user_info.get('email')
|
||||
|
|
|
|||
|
|
@ -801,55 +801,22 @@ def stop_autonomous_conversation(focus_group_id):
|
|||
# Create autonomous conversation controller
|
||||
controller = AutonomousConversationController(focus_group_id, current_app.logger)
|
||||
|
||||
# Stop the conversation using threading to avoid event loop conflicts
|
||||
import asyncio
|
||||
import threading
|
||||
from flask import copy_current_request_context
|
||||
# Signal the running conversation loop to stop gracefully
|
||||
# No need for asyncio.run() or background task - just set flags
|
||||
from datetime import datetime
|
||||
|
||||
stop_result = {"error": "Unknown error"}
|
||||
stop_exception = None
|
||||
controller.is_running = False
|
||||
controller.conversation_state = "completed"
|
||||
|
||||
@copy_current_request_context
|
||||
def stop_conversation_thread():
|
||||
nonlocal stop_result, stop_exception
|
||||
try:
|
||||
with current_app.app_context():
|
||||
# Create a new event loop for this thread
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
|
||||
# Run the async stop operation
|
||||
stop_result = loop.run_until_complete(controller.stop_conversation(reason))
|
||||
current_app.logger.info(f"Stop conversation result: {stop_result}")
|
||||
|
||||
# Close the loop
|
||||
loop.close()
|
||||
except Exception as e:
|
||||
stop_exception = e
|
||||
try:
|
||||
current_app.logger.error(f"Background stop conversation error: {e}")
|
||||
except:
|
||||
print(f"Background stop conversation error: {e}") # Fallback if logger fails
|
||||
# Update focus group status in database
|
||||
status = 'completed' if reason in ['completed', 'discussion_guide_completed', 'natural_completion'] else 'active'
|
||||
FocusGroup.update(focus_group_id, {
|
||||
'status': status,
|
||||
'autonomous_ended_at': datetime.utcnow(),
|
||||
'completion_reason': reason
|
||||
})
|
||||
|
||||
# Start the stop operation in a separate thread
|
||||
thread = threading.Thread(target=stop_conversation_thread)
|
||||
thread.start()
|
||||
|
||||
# Wait for the thread to complete (with timeout)
|
||||
thread.join(timeout=10) # 10 second timeout
|
||||
|
||||
if thread.is_alive():
|
||||
current_app.logger.error("Stop operation timed out")
|
||||
return jsonify({
|
||||
"error": "Stop operation timed out",
|
||||
"message": "The stop request is being processed in the background"
|
||||
}), 202 # Accepted but still processing
|
||||
|
||||
if stop_exception:
|
||||
raise stop_exception
|
||||
|
||||
if "error" in stop_result:
|
||||
return jsonify(stop_result), 400
|
||||
current_app.logger.info(f"Signaled autonomous conversation to stop for focus group {focus_group_id}: {reason}")
|
||||
|
||||
# Log the manual mode start event (AI mode stopped)
|
||||
try:
|
||||
|
|
@ -859,7 +826,16 @@ def stop_autonomous_conversation(focus_group_id):
|
|||
except Exception as e:
|
||||
current_app.logger.warning(f"Failed to log manual mode start event: {e}")
|
||||
|
||||
return jsonify(stop_result), 200
|
||||
# Return immediately with a success response like start_autonomous_conversation
|
||||
result = {
|
||||
"message": "Autonomous conversation stopping",
|
||||
"focus_group_id": focus_group_id,
|
||||
"state": "stopping",
|
||||
"background": True
|
||||
}
|
||||
current_app.logger.info(f"Returning success response for stop request: {result}")
|
||||
|
||||
return jsonify(result), 200
|
||||
|
||||
except Exception as e:
|
||||
current_app.logger.error(f"Error stopping autonomous conversation: {str(e)}")
|
||||
|
|
|
|||
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
|
@ -6,7 +6,7 @@ including sequential navigation through structured discussion guides.
|
|||
|
||||
from typing import Dict, List, Any, Optional, Tuple
|
||||
from flask import current_app
|
||||
from app.models.focus_group import FocusGroup
|
||||
from app.models.focus_group import FocusGroup, emit_websocket_event
|
||||
from app.services.llm_service import LLMService, LLMServiceError
|
||||
from app.utils.prompt_loader import load_prompt, PromptLoaderError
|
||||
from app.utils.discussion_guide_schema import DiscussionGuideValidator, StructuredDiscussionGuide
|
||||
|
|
@ -304,6 +304,17 @@ class AIModeratorService:
|
|||
|
||||
if update_success:
|
||||
print(f"✅ Successfully updated moderator position in database")
|
||||
|
||||
# Emit WebSocket event for moderator position change (same pattern as FocusGroup.add_message)
|
||||
try:
|
||||
moderator_status = AIModeratorService.get_moderator_status(focus_group_id)
|
||||
if "error" not in moderator_status:
|
||||
emit_websocket_event('moderator_status_update', focus_group_id, {
|
||||
'moderator_status': moderator_status
|
||||
})
|
||||
current_app.logger.debug(f"🔔 Emitted moderator_status_update websocket event for focus group {focus_group_id}")
|
||||
except Exception as e:
|
||||
current_app.logger.warning(f"Failed to emit moderator position websocket event: {str(e)}")
|
||||
else:
|
||||
print(f"❌ Failed to update moderator position in database")
|
||||
|
||||
|
|
@ -441,6 +452,17 @@ class AIModeratorService:
|
|||
'moderator_position': new_position
|
||||
})
|
||||
|
||||
# Emit WebSocket event for moderator position change (same pattern as FocusGroup.add_message)
|
||||
try:
|
||||
moderator_status = AIModeratorService.get_moderator_status(focus_group_id)
|
||||
if "error" not in moderator_status:
|
||||
emit_websocket_event('moderator_status_update', focus_group_id, {
|
||||
'moderator_status': moderator_status
|
||||
})
|
||||
current_app.logger.debug(f"🔔 Emitted moderator_status_update websocket event for focus group {focus_group_id}")
|
||||
except Exception as e:
|
||||
current_app.logger.warning(f"Failed to emit moderator position websocket event: {str(e)}")
|
||||
|
||||
return {
|
||||
"message": "Moderator position updated successfully",
|
||||
"position": new_position,
|
||||
|
|
|
|||
|
|
@ -1,4 +1,3 @@
|
|||
import requests
|
||||
import jwt
|
||||
from jwt import PyJWKClient
|
||||
import logging
|
||||
|
|
@ -14,89 +13,45 @@ class MSALService:
|
|||
|
||||
# Microsoft endpoints
|
||||
self.jwks_url = f'https://login.microsoftonline.com/{self.tenant_id}/discovery/v2.0/keys'
|
||||
self.graph_me_url = 'https://graph.microsoft.com/v1.0/me'
|
||||
|
||||
# Initialize JWK client for token verification
|
||||
self.jwks_client = PyJWKClient(self.jwks_url)
|
||||
|
||||
def validate_token(self, access_token: str) -> Optional[Dict[str, Any]]:
|
||||
def validate_token(self, id_token: str) -> Optional[Dict[str, Any]]:
|
||||
"""
|
||||
Validate a Microsoft access token and return user information.
|
||||
Validate a Microsoft ID token and return user information.
|
||||
|
||||
Args:
|
||||
access_token: The Microsoft access token to validate
|
||||
id_token: The Microsoft ID token (JWT) to validate
|
||||
|
||||
Returns:
|
||||
Dictionary containing user information if valid, None if invalid
|
||||
"""
|
||||
try:
|
||||
# First, try to get user info from Microsoft Graph API
|
||||
user_info = self._get_user_info_from_graph(access_token)
|
||||
if user_info:
|
||||
return user_info
|
||||
|
||||
# If Graph API fails, try to decode the JWT token directly
|
||||
return self._decode_jwt_token(access_token)
|
||||
# Decode and validate the ID token as a JWT
|
||||
return self._decode_jwt_token(id_token)
|
||||
|
||||
except Exception as e:
|
||||
current_app.logger.error(f"Token validation failed: {str(e)}")
|
||||
current_app.logger.error(f"ID token validation failed: {str(e)}")
|
||||
return None
|
||||
|
||||
def _get_user_info_from_graph(self, access_token: str) -> Optional[Dict[str, Any]]:
|
||||
def _decode_jwt_token(self, id_token: str) -> Optional[Dict[str, Any]]:
|
||||
"""
|
||||
Get user information from Microsoft Graph API.
|
||||
Decode and validate ID token as JWT.
|
||||
|
||||
Args:
|
||||
access_token: The Microsoft access token
|
||||
|
||||
Returns:
|
||||
Dictionary containing user information if successful, None if failed
|
||||
"""
|
||||
try:
|
||||
headers = {
|
||||
'Authorization': f'Bearer {access_token}',
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
|
||||
response = requests.get(self.graph_me_url, headers=headers, timeout=10)
|
||||
|
||||
if response.status_code == 200:
|
||||
user_data = response.json()
|
||||
|
||||
return {
|
||||
'microsoft_id': user_data.get('id'),
|
||||
'username': user_data.get('userPrincipalName', '').split('@')[0],
|
||||
'email': user_data.get('mail') or user_data.get('userPrincipalName'),
|
||||
'display_name': user_data.get('displayName', ''),
|
||||
'given_name': user_data.get('givenName', ''),
|
||||
'surname': user_data.get('surname', ''),
|
||||
'auth_type': 'microsoft'
|
||||
}
|
||||
else:
|
||||
current_app.logger.warning(f"Graph API request failed with status {response.status_code}: {response.text}")
|
||||
return None
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
current_app.logger.error(f"Graph API request failed: {str(e)}")
|
||||
return None
|
||||
|
||||
def _decode_jwt_token(self, access_token: str) -> Optional[Dict[str, Any]]:
|
||||
"""
|
||||
Decode and validate JWT token directly.
|
||||
|
||||
Args:
|
||||
access_token: The Microsoft access token (JWT)
|
||||
id_token: The Microsoft ID token (JWT) to validate
|
||||
|
||||
Returns:
|
||||
Dictionary containing user information if valid, None if invalid
|
||||
"""
|
||||
try:
|
||||
# Get the signing key
|
||||
signing_key = self.jwks_client.get_signing_key_from_jwt(access_token)
|
||||
signing_key = self.jwks_client.get_signing_key_from_jwt(id_token)
|
||||
|
||||
# Decode and validate the token
|
||||
# Decode and validate the ID token
|
||||
decoded_token = jwt.decode(
|
||||
access_token,
|
||||
id_token,
|
||||
signing_key.key,
|
||||
algorithms=['RS256'],
|
||||
audience=self.client_id,
|
||||
|
|
|
|||
|
|
@ -1,168 +0,0 @@
|
|||
"""
|
||||
Thread-Safe WebSocket Manager
|
||||
Allows WebSocket events to be emitted from background threads (AI mode) to frontend clients.
|
||||
Solves the cross-thread issue where AI processing runs in daemon threads but WebSocket
|
||||
connections exist in the main Flask thread.
|
||||
"""
|
||||
|
||||
import threading
|
||||
import queue
|
||||
import time
|
||||
from typing import Dict, Any, Optional
|
||||
from datetime import datetime
|
||||
|
||||
class ThreadSafeWebSocketManager:
|
||||
"""
|
||||
Manages WebSocket events across thread boundaries.
|
||||
|
||||
Uses a thread-safe queue to pass WebSocket events from background threads
|
||||
(like AI mode processing) to the main Flask thread where WebSocket connections exist.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
# Thread-safe queue for WebSocket events
|
||||
self.event_queue = queue.Queue()
|
||||
# Main thread WebSocket manager reference
|
||||
self.main_websocket_manager = None
|
||||
# Background processing thread
|
||||
self.processing_thread = None
|
||||
self.should_stop = threading.Event()
|
||||
self.is_running = False
|
||||
|
||||
def set_main_websocket_manager(self, websocket_manager):
|
||||
"""Set the main thread WebSocket manager reference."""
|
||||
self.main_websocket_manager = websocket_manager
|
||||
|
||||
# Start background processing if not already running
|
||||
if not self.is_running:
|
||||
self.start_background_processing()
|
||||
|
||||
def start_background_processing(self):
|
||||
"""Start the background thread that processes WebSocket events."""
|
||||
if self.is_running:
|
||||
return
|
||||
|
||||
self.should_stop.clear()
|
||||
self.is_running = True
|
||||
|
||||
def process_events():
|
||||
"""Background thread function that processes queued WebSocket events."""
|
||||
print(f"🔄 ThreadSafeWebSocketManager: Background processing started in thread {threading.get_ident()}")
|
||||
|
||||
while not self.should_stop.is_set():
|
||||
try:
|
||||
# Get event from queue (blocking with timeout)
|
||||
try:
|
||||
event_data = self.event_queue.get(timeout=1.0)
|
||||
except queue.Empty:
|
||||
continue
|
||||
|
||||
# Process the event
|
||||
if self.main_websocket_manager and event_data:
|
||||
self._process_websocket_event(event_data)
|
||||
|
||||
# Mark task as done
|
||||
self.event_queue.task_done()
|
||||
|
||||
except Exception as e:
|
||||
print(f"🔄 ThreadSafeWebSocketManager: Error processing event: {e}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
|
||||
print(f"🔄 ThreadSafeWebSocketManager: Background processing stopped")
|
||||
self.is_running = False
|
||||
|
||||
self.processing_thread = threading.Thread(target=process_events, daemon=True)
|
||||
self.processing_thread.start()
|
||||
|
||||
def stop_background_processing(self):
|
||||
"""Stop the background processing thread."""
|
||||
if self.is_running:
|
||||
self.should_stop.set()
|
||||
if self.processing_thread:
|
||||
self.processing_thread.join(timeout=5.0)
|
||||
self.is_running = False
|
||||
|
||||
def _process_websocket_event(self, event_data: Dict[str, Any]):
|
||||
"""Process a WebSocket event in the main thread context."""
|
||||
try:
|
||||
event_type = event_data.get('event_type')
|
||||
focus_group_id = event_data.get('focus_group_id')
|
||||
data = event_data.get('data', {})
|
||||
|
||||
current_thread = threading.get_ident()
|
||||
print(f"🔄 ThreadSafeWebSocketManager: Processing {event_type} in thread {current_thread}")
|
||||
|
||||
# CRITICAL: Check if we're in the same thread as Flask-SocketIO
|
||||
main_thread = threading.main_thread()
|
||||
is_main_thread = threading.current_thread() is main_thread
|
||||
print(f"🔄 ThreadSafeWebSocketManager: Current thread is main thread: {is_main_thread}")
|
||||
print(f"🔄 ThreadSafeWebSocketManager: Main thread ID: {main_thread.ident}, Current: {current_thread}")
|
||||
|
||||
# Route to appropriate emission method
|
||||
if event_type == 'message_update':
|
||||
self.main_websocket_manager.emit_message_update(focus_group_id, data)
|
||||
elif event_type == 'ai_status_update':
|
||||
self.main_websocket_manager.emit_ai_status_update(focus_group_id, data)
|
||||
elif event_type == 'theme_update':
|
||||
theme_data = data.get('theme', {})
|
||||
action = data.get('action', 'added')
|
||||
self.main_websocket_manager.emit_theme_update(focus_group_id, theme_data, action)
|
||||
elif event_type == 'moderator_status_update':
|
||||
self.main_websocket_manager.emit_moderator_status_update(focus_group_id, data)
|
||||
elif event_type == 'analytics_update':
|
||||
self.main_websocket_manager.emit_analytics_update(focus_group_id, data)
|
||||
elif event_type == 'conversation_state_update':
|
||||
self.main_websocket_manager.emit_conversation_state_update(focus_group_id, data)
|
||||
else:
|
||||
# Generic emission
|
||||
self.main_websocket_manager.emit_to_focus_group(focus_group_id, event_type, data)
|
||||
|
||||
print(f"✅ ThreadSafeWebSocketManager: Successfully processed {event_type} for focus group {focus_group_id}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ ThreadSafeWebSocketManager: Error processing event: {e}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
|
||||
def emit_from_background_thread(self, event_type: str, focus_group_id: str, data: Dict[str, Any]):
|
||||
"""
|
||||
Emit a WebSocket event from a background thread.
|
||||
|
||||
This method can be called from any thread (including AI processing daemon threads).
|
||||
The event will be queued and processed by the main thread.
|
||||
"""
|
||||
current_thread = threading.get_ident()
|
||||
print(f"🔄 ThreadSafeWebSocketManager: Queueing {event_type} from thread {current_thread}")
|
||||
|
||||
event_data = {
|
||||
'event_type': event_type,
|
||||
'focus_group_id': focus_group_id,
|
||||
'data': data,
|
||||
'timestamp': datetime.utcnow().isoformat(),
|
||||
'source_thread': current_thread
|
||||
}
|
||||
|
||||
try:
|
||||
self.event_queue.put(event_data, timeout=5.0) # 5 second timeout
|
||||
print(f"✅ ThreadSafeWebSocketManager: Queued {event_type} for focus group {focus_group_id}")
|
||||
except queue.Full:
|
||||
print(f"❌ ThreadSafeWebSocketManager: Event queue is full, dropping {event_type}")
|
||||
except Exception as e:
|
||||
print(f"❌ ThreadSafeWebSocketManager: Error queueing event: {e}")
|
||||
|
||||
def get_stats(self) -> Dict[str, Any]:
|
||||
"""Get statistics about the thread-safe WebSocket manager."""
|
||||
return {
|
||||
'is_running': self.is_running,
|
||||
'queue_size': self.event_queue.qsize(),
|
||||
'has_main_manager': self.main_websocket_manager is not None,
|
||||
'processing_thread_alive': self.processing_thread.is_alive() if self.processing_thread else False
|
||||
}
|
||||
|
||||
# Global instance
|
||||
_thread_safe_manager = ThreadSafeWebSocketManager()
|
||||
|
||||
def get_thread_safe_websocket_manager() -> ThreadSafeWebSocketManager:
|
||||
"""Get the global thread-safe WebSocket manager instance."""
|
||||
return _thread_safe_manager
|
||||
|
|
@ -85,6 +85,8 @@ def setup_logging(log_level: str = 'INFO') -> None:
|
|||
logging.getLogger('werkzeug').setLevel(logging.WARNING) # Reduce Flask dev server noise
|
||||
logging.getLogger('hypercorn').setLevel(logging.WARNING) # Reduce Hypercorn noise
|
||||
logging.getLogger('hypercorn.access').setLevel(logging.WARNING) # Reduce access log noise
|
||||
logging.getLogger('engineio.server').setLevel(logging.WARNING) # Reduce WebSocket PING/PONG spam
|
||||
logging.getLogger('socketio.server').setLevel(logging.WARNING) # Reduce WebSocket noise
|
||||
|
||||
# Keep application loggers at INFO level
|
||||
logging.getLogger('app').setLevel(numeric_level)
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load diff
File diff suppressed because one or more lines are too long
2
dist/index.html
vendored
2
dist/index.html
vendored
|
|
@ -7,7 +7,7 @@
|
|||
<meta name="description" content="Lovable Generated Project" />
|
||||
<meta name="author" content="Lovable" />
|
||||
<meta property="og:image" content="/og-image.png" />
|
||||
<script type="module" crossorigin src="/semblance/assets/index-_LzirAYA.js"></script>
|
||||
<script type="module" crossorigin src="/semblance/assets/index-C2iKqYFP.js"></script>
|
||||
<link rel="stylesheet" crossorigin href="/semblance/assets/index-BttT7ZR2.css">
|
||||
</head>
|
||||
|
||||
|
|
|
|||
|
|
@ -173,11 +173,11 @@ export function AuthProvider({ children }: { children: ReactNode }) {
|
|||
console.log('Starting Microsoft authentication...');
|
||||
const response = await instance.loginPopup(loginRequest);
|
||||
|
||||
if (response && response.account && response.accessToken) {
|
||||
if (response && response.account && response.idToken) {
|
||||
console.log('Microsoft authentication successful', response.account);
|
||||
|
||||
// Send the Microsoft access token to our backend
|
||||
const backendResponse = await authApi.loginWithMicrosoft(response.accessToken);
|
||||
// Send the Microsoft ID token to our backend for validation
|
||||
const backendResponse = await authApi.loginWithMicrosoft(response.idToken);
|
||||
|
||||
if (backendResponse.data.access_token) {
|
||||
// Save our backend JWT token and user data
|
||||
|
|
|
|||
|
|
@ -114,8 +114,8 @@ export const authApi = {
|
|||
login: (username: string, password: string) =>
|
||||
api.post('/auth/login', { username, password }),
|
||||
|
||||
loginWithMicrosoft: (accessToken: string) =>
|
||||
api.post('/auth/microsoft', { access_token: accessToken }),
|
||||
loginWithMicrosoft: (idToken: string) =>
|
||||
api.post('/auth/microsoft', { id_token: idToken }),
|
||||
|
||||
register: (username: string, email: string, password: string) =>
|
||||
api.post('/auth/register', { username, email, password }),
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue