Full-stack GraphRAG chatbot for HP marketing materials with: - Python/Flask backend with custom ReAct agent (LlamaIndex) - Neo4j knowledge graph + vector search hybrid retrieval - LlamaParse multimodal document processing (text + images) - React/Vite frontend with conversation management - MongoDB conversation persistence - MSAL authentication support Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
162 lines
No EOL
7.4 KiB
Python
162 lines
No EOL
7.4 KiB
Python
# hp_chatbot/session_manager.py
|
|
|
|
from typing import Dict, Any, Optional
|
|
import uuid
|
|
import time
|
|
from datetime import datetime
|
|
|
|
# Import MongoDB utilities from the separate file
|
|
from mongodb_utils import (
|
|
get_db, create_or_update_user, get_user_by_username,
|
|
create_conversation, get_conversation, get_conversation_by_id, get_user_conversations,
|
|
get_conversation_messages, add_message, update_conversation_title,
|
|
generate_conversation_title, delete_conversation,
|
|
get_session_state as db_get_session_state, # Rename to avoid conflict
|
|
create_session_state as db_create_session_state,
|
|
update_session_state as db_update_session_state
|
|
)
|
|
|
|
# Import necessary components from ai_core
|
|
# Use forward reference for ReActAgent2 if needed, or import normally if load order allows
|
|
from ai_core import global_workflow_agent, ReActAgent2
|
|
|
|
# Import logging
|
|
from utils import log_structured
|
|
|
|
# --- In-memory Session Cache ---
|
|
# Stores session-specific data like associated conversation_id and user_id
|
|
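# Key: session_id (string), Value: dict {'initialized': bool, 'conversation_id': ObjectId, 'user_id': ObjectId}
# (the two IDs are generated placeholder strings instead when the MongoDB helpers fail).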
|
|
# Note: The actual agent *instance* is now global (global_workflow_agent),
|
|
# but its *memory* provides conversation context. Resetting the agent's memory
|
|
# effectively resets the conversation for that agent instance.
|
|
# We still need to map a *frontend session ID* to a *persistent conversation ID* in the DB.
|
|
# Process-local cache: starts empty at import time and is read, filled and
# cleared by the session helper functions defined in this module.
chat_state: Dict[str, Dict[str, Any]] = {}
|
|
|
|
def get_or_create_session_state(session_id: str, username: Optional[str] = None) -> Dict[str, Any]:
    """
    Resolve the user/conversation pair bound to a client session, creating it on demand.

    Lookup order: the in-memory ``chat_state`` cache, then the record returned
    by ``db_get_session_state``, and finally creation of a new user,
    conversation and session link. Each MongoDB helper call is wrapped in a
    ``try``/``except Exception``; a failure is logged and a generated
    placeholder ID is used so the session can continue in memory.

    Args:
        session_id: The unique identifier from the frontend/client.
        username: Optional username to link the session to. When falsy, the
            name ``anonymous_<first 8 characters of session_id>`` is used.

    Returns:
        The dict stored in ``chat_state`` for this session, with the keys
        'initialized' (True when ``global_workflow_agent`` is not None),
        'conversation_id' and 'user_id'. The IDs are whatever the MongoDB
        helpers returned, or ``temp_*`` / ``fallback_*`` strings when those
        helpers raised or returned a falsy value.
    """
    # NOTE(review): `global_workflow_agent` is only read here, so no `global`
    # declaration is needed. It is bound through `from ai_core import ...`;
    # if ai_core rebinds the name after import, this module would keep seeing
    # the old value - confirm that cannot happen.

    # --- Step 1: in-memory cache -------------------------------------------
    if session_id in chat_state:
        entry = chat_state[session_id]
        is_complete = all(key in entry for key in ('conversation_id', 'user_id'))
        if not is_complete:
            log_structured('warning', f'Cached state for {session_id} is incomplete. Re-fetching.', {'cached_state': entry})
            # Drop the broken entry and fall through to the database lookup.
            del chat_state[session_id]
        else:
            log_structured('debug', f'Session cache hit for {session_id}', {'cached_state': entry})
            # Refresh the flag on every hit; the agent may have been set up since.
            entry['initialized'] = global_workflow_agent is not None
            return entry

    # --- Step 2: persistent storage (MongoDB) ------------------------------
    try:
        mongo_session_data = db_get_session_state(session_id)
    except Exception as db_err:
        mongo_session_data = None
        log_structured('error', f'Error accessing MongoDB for session {session_id}: {db_err!s}')
        # Best effort: continue as if no record existed.

    user_id = None
    conversation_id = None

    if mongo_session_data:
        log_structured('info', f'Loaded existing session state from MongoDB for {session_id}', {
            'db_data': mongo_session_data  # Be careful logging sensitive data
        })
        try:
            user_id = mongo_session_data.get('user_id')
            conversation_id = mongo_session_data.get('conversation_id')
        except Exception as parse_err:
            log_structured('error', f'Error parsing MongoDB session data: {parse_err!s}')
            mongo_session_data = None

        # Both IDs are required; otherwise treat the record as missing.
        if not (user_id and conversation_id):
            log_structured('error', f'Incomplete session data found in DB for {session_id}. Recreating.', {'db_data': mongo_session_data})
            mongo_session_data = None

    # --- Step 3: nothing usable found -> build a fresh state ---------------
    if not mongo_session_data:
        log_structured('info', f'No valid session state found for {session_id}. Creating new.', {'username': username})

        effective_username = username or f"anonymous_{session_id[:8]}"

        # Resolve the user, substituting a placeholder ID on any failure.
        try:
            user_id = create_or_update_user(effective_username)
        except Exception as user_err:
            log_structured('error', f'Failed to create user in MongoDB: {user_err!s}')
            user_id = f"temp_user_{uuid.uuid4().hex}"
            log_structured('info', f'Using temporary user ID: {user_id}')

        if not user_id:
            log_structured('error', f"Failed to create or update user: {effective_username}")
            user_id = f"fallback_user_{uuid.uuid4().hex}"
            log_structured('info', f'Using fallback user ID: {user_id}')

        # Open a conversation for that user under a default, timestamped title.
        created_at = datetime.now().strftime('%Y-%m-%d %H:%M')
        new_conv_title = f"New Chat ({created_at})"

        try:
            conversation_id = create_conversation(session_id, user_id, title=new_conv_title)
        except Exception as conv_err:
            log_structured('error', f'Failed to create conversation in MongoDB: {conv_err!s}')
            conversation_id = f"temp_conv_{uuid.uuid4().hex}"
            log_structured('info', f'Using temporary conversation ID: {conversation_id}')

        if not conversation_id:
            log_structured('error', f"Failed to create conversation for session {session_id}, user {user_id}")
            conversation_id = f"fallback_conv_{uuid.uuid4().hex}"
            log_structured('info', f'Using fallback conversation ID: {conversation_id}')

        # Persist the session -> (user, conversation) link; a failure here
        # only costs persistence, the in-memory state below is still returned.
        try:
            db_create_session_state(session_id, user_id, conversation_id)
        except Exception as session_err:
            log_structured('warning', f"Failed to persist new session state link in DB: {session_err!s}")

        log_structured('info', 'Created new conversation and session state link', {
            'session_id': session_id, 'user_id': user_id, 'conversation_id': conversation_id
        })

    # --- Step 4: cache the IDs (never the agent object) and return ---------
    agent_ready = global_workflow_agent is not None
    current_state = {
        'initialized': agent_ready,
        'conversation_id': conversation_id,
        'user_id': user_id,
    }
    chat_state[session_id] = current_state
    return current_state
|
|
|
|
def clear_chat_state_cache(session_id: Optional[str] = None) -> None:
    """
    Clear the in-memory chat state cache.

    Only the module-level ``chat_state`` dict is touched; no database helper
    is called here.

    Args:
        session_id: When not None, only that session's entry is removed, and
            an info log is written if an entry existed. When None (the
            default), every cached session is removed.
    """
    if session_id is None:
        # Empty the dict in place instead of rebinding the global, so any
        # other holder of a reference to `chat_state` sees the cleared cache.
        chat_state.clear()
        log_structured('info', 'Cleared all in-memory session cache')
        return

    # An explicit `is None` test above means an empty-string session id is
    # treated as a (missing) key here rather than wiping every session.
    if session_id in chat_state:
        del chat_state[session_id]
        log_structured('info', f'Cleared in-memory cache for session {session_id}')