hp_chatbot/session_manager.py
michael 594f749d4c Initial commit: HP Marketing Materials GraphRAG Chatbot
Full-stack GraphRAG chatbot for HP marketing materials with:
- Python/Flask backend with custom ReAct agent (LlamaIndex)
- Neo4j knowledge graph + vector search hybrid retrieval
- LlamaParse multimodal document processing (text + images)
- React/Vite frontend with conversation management
- MongoDB conversation persistence
- MSAL authentication support

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 08:37:58 -06:00

162 lines
No EOL
7.4 KiB
Python

# hp_chatbot/session_manager.py
from typing import Dict, Any, Optional
import uuid
import time
from datetime import datetime
# Import MongoDB utilities from the separate file
from mongodb_utils import (
get_db, create_or_update_user, get_user_by_username,
create_conversation, get_conversation, get_conversation_by_id, get_user_conversations,
get_conversation_messages, add_message, update_conversation_title,
generate_conversation_title, delete_conversation,
get_session_state as db_get_session_state, # Rename to avoid conflict
create_session_state as db_create_session_state,
update_session_state as db_update_session_state
)
# Import necessary components from ai_core
# Use forward reference for ReActAgent2 if needed, or import normally if load order allows
from ai_core import global_workflow_agent, ReActAgent2
# Import logging
from utils import log_structured
# --- In-memory Session Cache ---
# Maps a frontend session ID to the persistent user/conversation it is linked to.
# Key: session_id (string)
# Value: dict {'conversation_id': ..., 'user_id': ..., 'initialized': bool}
#   - The IDs are whatever the mongodb_utils helpers return (presumably
#     ObjectIds — confirm), or placeholder strings prefixed "temp_" or
#     "fallback_" when MongoDB was unavailable during session creation.
#   - 'initialized' records whether the shared agent was non-None when the
#     entry was built or last served from this cache.
# Note: The actual agent *instance* is global (global_workflow_agent), and its
# *memory* provides conversation context; resetting the agent's memory
# effectively resets the conversation for that agent instance. This cache only
# maps a *frontend session ID* to a *persistent conversation ID* in the DB.
# NOTE(review): plain module-level dict with no locking in this module — confirm
# whether concurrent requests for the same session can race on it.
chat_state: Dict[str, Dict[str, Any]] = {}
# Prefixes of the placeholder IDs minted below when MongoDB is unavailable.
# Such IDs exist only in the in-memory cache and must never be written to the
# persistent session-state store, or a later reload would resurrect a dangling
# reference to a user/conversation that was never created.
_PLACEHOLDER_ID_PREFIXES = ("temp_user_", "fallback_user_", "temp_conv_", "fallback_conv_")


def _is_placeholder_id(value: Any) -> bool:
    """Return True if *value* is an in-memory placeholder rather than a DB-issued ID."""
    return isinstance(value, str) and value.startswith(_PLACEHOLDER_ID_PREFIXES)


def _agent_initialized() -> bool:
    """Report whether the shared workflow agent is currently available."""
    # NOTE(review): `global_workflow_agent` is bound by `from ai_core import ...`
    # at import time. If ai_core reassigns it later (e.g. in an init function),
    # this module still sees the import-time value. Confirm ai_core sets it
    # before this module is imported.
    return global_workflow_agent is not None


def _get_cached_state(session_id: str) -> Optional[Dict[str, Any]]:
    """Return the cached state for *session_id*, evicting it if it is incomplete."""
    if session_id not in chat_state:
        return None
    cached_state = chat_state[session_id]
    if 'conversation_id' in cached_state and 'user_id' in cached_state:
        log_structured('debug', f'Session cache hit for {session_id}', {'cached_state': cached_state})
        # Refresh the flag: the agent may have become available since caching.
        cached_state['initialized'] = _agent_initialized()
        return cached_state
    log_structured('warning', f'Cached state for {session_id} is incomplete. Re-fetching.', {'cached_state': cached_state})
    # Remove incomplete entry so the caller falls through to the DB check.
    del chat_state[session_id]
    return None


def _load_ids_from_db(session_id: str) -> Optional[tuple]:
    """Return ``(user_id, conversation_id)`` persisted for *session_id*, or None.

    Returns None when the lookup fails, nothing is stored, or the stored record
    lacks either ID; all such failures are logged, not raised.
    """
    try:
        mongo_session_data = db_get_session_state(session_id)
    except Exception as db_err:
        log_structured('error', f'Error accessing MongoDB for session {session_id}: {str(db_err)}')
        return None
    if not mongo_session_data:
        return None

    log_structured('info', f'Loaded existing session state from MongoDB for {session_id}', {
        'db_data': mongo_session_data  # Be careful logging sensitive data
    })
    user_id = None
    conversation_id = None
    try:
        user_id = mongo_session_data.get('user_id')
        conversation_id = mongo_session_data.get('conversation_id')
    except Exception as parse_err:
        log_structured('error', f'Error parsing MongoDB session data: {str(parse_err)}')
        mongo_session_data = None

    if not user_id or not conversation_id:
        log_structured('error', f'Incomplete session data found in DB for {session_id}. Recreating.', {'db_data': mongo_session_data})
        return None
    return user_id, conversation_id


def _create_new_ids(session_id: str, username: Optional[str]) -> tuple:
    """Create a user and conversation for *session_id* and persist the link.

    Any MongoDB failure is logged and replaced by a placeholder ID so the chat
    can continue in memory. The session link is persisted only when both IDs
    came from the database.

    Returns:
        ``(user_id, conversation_id)``.
    """
    log_structured('info', f'No valid session state found for {session_id}. Creating new.', {'username': username})

    # Determine User ID
    effective_username = username if username else f"anonymous_{session_id[:8]}"
    try:
        user_id = create_or_update_user(effective_username)
    except Exception as user_err:
        log_structured('error', f'Failed to create user in MongoDB: {str(user_err)}')
        user_id = f"temp_user_{uuid.uuid4().hex}"
        log_structured('info', f'Using temporary user ID: {user_id}')
    if not user_id:
        log_structured('error', f"Failed to create or update user: {effective_username}")
        user_id = f"fallback_user_{uuid.uuid4().hex}"
        log_structured('info', f'Using fallback user ID: {user_id}')

    # Default title; it is expected to be replaced after the first interaction.
    new_conv_title = f"New Chat ({datetime.now().strftime('%Y-%m-%d %H:%M')})"
    try:
        conversation_id = create_conversation(session_id, user_id, title=new_conv_title)
    except Exception as conv_err:
        log_structured('error', f'Failed to create conversation in MongoDB: {str(conv_err)}')
        conversation_id = f"temp_conv_{uuid.uuid4().hex}"
        log_structured('info', f'Using temporary conversation ID: {conversation_id}')
    if not conversation_id:
        log_structured('error', f"Failed to create conversation for session {session_id}, user {user_id}")
        conversation_id = f"fallback_conv_{uuid.uuid4().hex}"
        log_structured('info', f'Using fallback conversation ID: {conversation_id}')

    if _is_placeholder_id(user_id) or _is_placeholder_id(conversation_id):
        # Persisting a placeholder would make a later cache miss reload IDs
        # that do not exist in the database; keep the link in memory only.
        log_structured('warning', 'Not persisting session state link: placeholder IDs in use', {
            'session_id': session_id, 'user_id': user_id, 'conversation_id': conversation_id
        })
    else:
        try:
            db_create_session_state(session_id, user_id, conversation_id)
        except Exception as session_err:
            log_structured('warning', f"Failed to persist new session state link in DB: {str(session_err)}")
            # Continue with in-memory operation

    log_structured('info', f'Created new conversation and session state link', {
        'session_id': session_id, 'user_id': user_id, 'conversation_id': conversation_id
    })
    return user_id, conversation_id


def get_or_create_session_state(session_id: str, username: Optional[str] = None) -> Dict[str, Any]:
    """Get or create the state linking *session_id* to a user and conversation.

    Lookup order:
      1. the in-memory ``chat_state`` cache;
      2. the persisted session state in MongoDB;
      3. otherwise a new user/conversation is created (see ``_create_new_ids``).
    The result is cached in ``chat_state`` before being returned. The workflow
    agent itself is global and is not stored per session.

    MongoDB errors are logged and replaced by placeholder IDs rather than raised.

    Args:
        session_id: The unique identifier from the frontend/client.
        username: Optional username to link the session to; when absent an
            ``anonymous_<first 8 chars of session_id>`` user is used.

    Returns:
        The cached dict ``{'initialized': bool, 'conversation_id': ..., 'user_id': ...}``.
        The same dict object is returned on later cache hits.
    """
    cached_state = _get_cached_state(session_id)
    if cached_state is not None:
        return cached_state

    ids = _load_ids_from_db(session_id)
    if ids is None:
        ids = _create_new_ids(session_id, username)
    user_id, conversation_id = ids

    # Cache the DB IDs, not the agent object itself.
    current_state = {
        'initialized': _agent_initialized(),
        'conversation_id': conversation_id,
        'user_id': user_id,
    }
    chat_state[session_id] = current_state
    return current_state
def clear_chat_state_cache(session_id: Optional[str] = None):
    """Clear the in-memory session cache.

    Args:
        session_id: If given, evict only this session's entry (a no-op when it
            is not cached). If None, empty the entire cache. An empty string is
            treated as a (non-existent) session ID, so a missing ID coming from
            a request cannot accidentally wipe every session.
    """
    if session_id is not None:
        if session_id in chat_state:
            del chat_state[session_id]
            log_structured('info', f'Cleared in-memory cache for session {session_id}')
    else:
        # Clear in place instead of rebinding the module global, so any module
        # that imported `chat_state` by name sees the same (now empty) dict.
        chat_state.clear()
        log_structured('info', 'Cleared all in-memory session cache')