netflix/routes.py
michael fe0d881341 Speed up GraphRAG startup with triple caching and background init
Server now starts serving vector-only queries in ~1-2 minutes instead of
30-60 minutes. GraphRAG initializes in a background task and its tool is
dynamically added to the agent when ready.

- Cache extracted triples to disk (neo4j_triples.pickle) so Neo4j can be
  repopulated without expensive LLM re-extraction
- Split initialize_global_index() into initialize_vector_index() (fast) and
  initialize_graphrag_components() (background)
- Add graphrag_ready/graphrag_initializing status flags to shared_state
- Launch GraphRAG init as asyncio background task in main.py
- Report GraphRAG status in /status endpoint for frontend awareness
- Add comprehensive migration guide for applying to other projects

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 17:33:19 -06:00

1037 lines
No EOL
57 KiB
Python

# netflix_chatbot/routes.py
import os
import asyncio
import traceback
import uuid
import shutil
import urllib.parse
from datetime import datetime
from pathlib import Path
from flask import (
Flask, request, jsonify, make_response, send_from_directory, send_file,
Blueprint # Consider using Blueprint for larger apps
)
from werkzeug.utils import secure_filename # Keep if file uploads are reintroduced
# Import from our modules
from utils import logger, log_structured, allowed_file # Keep allowed_file if uploads return
from json_utils import CustomJSONEncoder # Used indirectly by make_response/jsonify with custom provider
from session_manager import get_or_create_session_state, clear_chat_state_cache
from ai_core import initialize_global_index, initialize_vector_index # Import initialization functions
from shared_state import global_workflow_agent, global_index, is_agent_available, get_graphrag_status # Import shared state variables
from document_generator import create_brief_docx
from config import IMAGES_DIRECTORY, NETFLIX_DOCS_FOLDER, BASE_DIR, SUPPORTING_FILES_DIR # Import necessary configs
from llama_index.core.tools import ToolOutput # Import ToolOutput for type checking
from llama_index.core.llms import ChatMessage # Import ChatMessage for memory management
# Import MongoDB utilities
from mongodb_utils import (
add_message, get_conversation_messages, get_user_conversations,
get_conversation_by_id, create_conversation, create_or_update_user,
delete_conversation as db_delete_conversation, # Alias to avoid conflict
generate_conversation_title, update_session_state
)
# Create a Blueprint or define routes on the app passed from main.py
# Using direct app registration for simplicity here.
# If using Blueprint:
# api_bp = Blueprint('api', __name__)
# @api_bp.route('/chat', methods=['POST', 'OPTIONS'])
# async def chat(): ...
# In main.py: app.register_blueprint(api_bp)
def register_routes(app: Flask):
def _resolve_live_agent():
    """Return the agent object currently stored in ``shared_state`` (may be None).

    The attribute is read at call time. The ``global_workflow_agent`` name this
    module imported at load time is only a snapshot and does not see agents
    created later (on-demand or background initialization).
    """
    import shared_state
    return shared_state.global_workflow_agent


async def _ensure_agent_initialized():
    """Build the vector index and agent on demand if none is available yet.

    Best effort: failures are logged, never raised, so callers must re-check
    ``is_agent_available()`` afterwards. Only the vector-only initializer is
    used so a chat request is not blocked on the slower GraphRAG setup.
    """
    if is_agent_available():
        return
    log_structured('error', 'Global workflow agent is not initialized')
    try:
        log_structured('info', 'Attempting to initialize vector index and agent on-demand')
        index_success = await initialize_vector_index()
        if not index_success:
            log_structured('error', 'Index initialization reported failure')
            return
        log_structured('info', 'Index initialization reported success')
        # No importlib.reload(ai_core) here: reloading re-executes the module
        # body (resetting its globals / recreating its objects) and is not needed
        # to observe the state the initializer published to shared_state.
        if is_agent_available():
            log_structured('info', 'Successfully initialized global agent on-demand')
        else:
            log_structured('error', 'Agent still not available after successful index initialization')
    except Exception as init_err:
        log_structured('error', f'Error initializing agent on-demand: {str(init_err)}')


def _load_history_into_memory(agent, conversation_id, current_message):
    """Replace the shared agent's chat memory with this conversation's history.

    The agent is one process-wide instance, so its memory is rebuilt on every
    request. Previously history was only loaded when memory was (nearly)
    empty, which let one conversation's turns bleed into the next. The
    just-stored current user turn is excluded because ``agent.run`` adds it.
    Errors are logged and the request continues without history.

    NOTE(review): concurrent requests still share this single memory object,
    so interleaved requests can observe each other's history; a fully correct
    fix needs per-conversation memory/agents.
    """
    memory = getattr(agent, 'memory', None)
    if memory is None:
        return
    try:
        conv_messages = list(get_conversation_messages(conversation_id) or [])
        # Assumes chronological order (oldest first): the final entry is the
        # user turn that was stored just before this call.
        if conv_messages:
            last = conv_messages[-1]
            if last.get('role') == 'user' and last.get('content') == current_message:
                conv_messages.pop()
        log_structured('info', f'Loading conversation history: {len(conv_messages)} messages',
                       {'conversation_id': conversation_id})
        memory.reset()
        for msg in conv_messages:
            role = msg.get('role')
            content = msg.get('content', '')
            if role and content:
                memory.put(ChatMessage(role=role, content=content))
    except Exception as hist_err:
        log_structured('error', f'Error loading conversation history: {str(hist_err)}',
                       {'conversation_id': conversation_id})


def _summarize_graphrag_output(raw_output, source_info, session_id):
    """Log GraphRAG hybrid-retrieval stats and copy them onto ``source_info``.

    ``vector_context`` / ``graphrag_context`` are coerced to ``str`` (missing
    or None -> '') so the length and preview computations cannot raise.
    """
    def _preview(text):
        return text[:200] + '...' if len(text) > 200 else text

    vector_context = str(raw_output.get('vector_context') or '')
    graphrag_context = str(raw_output.get('graphrag_context') or '')
    community_ids = raw_output.get('community_ids', [])
    log_structured('info', 'GraphRAG Retrieval Results', {
        'session_id': session_id,
        'vector_retrieval_size': len(vector_context),
        'graphrag_retrieval_size': len(graphrag_context),
        'vector_retrieval_preview': _preview(vector_context),
        'graphrag_retrieval_preview': _preview(graphrag_context),
        'community_ids': community_ids,
    })
    source_info['retrieval_method'] = 'graphrag_hybrid'
    source_info['vector_context_length'] = len(vector_context)
    source_info['graphrag_context_length'] = len(graphrag_context)
    source_info['used_communities'] = community_ids


def _collect_node_images(node, node_meta, image_dir, images, seen_images, session_id):
    """Append image-info dicts for the files listed in ``node_meta['image_paths']``.

    Only non-empty string entries not already in ``seen_images`` and present
    as files under ``image_dir`` are added (``images`` and ``seen_images`` are
    updated in place); missing files and a non-list value are logged.
    """
    node_id = getattr(node, 'id_', 'unknown_node')
    node_image_paths = node_meta.get('image_paths', [])
    if not isinstance(node_image_paths, list):
        if node_image_paths:
            log_structured('warning', "'image_paths' in node metadata is not a list.",
                           {'session_id': session_id, 'node_id': node_id, 'metadata_value': str(node_image_paths)})
        return
    for img_filename in node_image_paths:
        if not img_filename or not isinstance(img_filename, str) or img_filename in seen_images:
            continue
        image_full_path = image_dir / img_filename
        if not image_full_path.is_file():
            log_structured('warning', f'Image file from metadata not found: {img_filename}',
                           {'session_id': session_id, 'path_checked': str(image_full_path)})
            continue
        doc_name = node_meta.get('filename', 'Unknown Document')
        images.append({
            'filename': img_filename,
            'url_encoded_filename': urllib.parse.quote(img_filename),  # safe for frontend URLs
            'document': doc_name,
            'page': node_meta.get('source_page', 'Unknown Page'),
            'source': doc_name,
        })
        seen_images.add(img_filename)
        log_structured('info', f'Added image from node metadata: {img_filename}',
                       {'session_id': session_id, 'node_id': node_id})


def _build_source_info(tool_output_source, images, seen_images, session_id):
    """Convert one agent source into a JSON-friendly dict, collecting images.

    Args:
        tool_output_source: One entry of the agent's ``sources`` list; normally a
            ``ToolOutput``, anything else is stringified as a fallback.
        images: Output list that receives image-info dicts found in node metadata.
        seen_images: Filenames already reported (updated in place) for de-duplication.
        session_id: Used only for log context.

    Returns:
        dict with ``content`` and ``metadata``; ``ToolOutput`` sources also get
        ``tool_name``, ``raw_output_preview`` and (usually) ``retrieval_method``.
    """
    source_info = {'content': "Source content unavailable", 'metadata': {}}
    if not isinstance(tool_output_source, ToolOutput):
        log_structured('warning', 'Source element is not a ToolOutput object',
                       {'type': type(tool_output_source).__name__})
        source_info['content'] = str(tool_output_source)
        return source_info

    content = tool_output_source.content
    source_info['content'] = str(content) if content else "No content"
    tool_name = getattr(tool_output_source, 'tool_name', 'unknown_tool')
    source_info['tool_name'] = tool_name
    raw_output = getattr(tool_output_source, 'raw_output', None)
    source_info['raw_output_preview'] = str(raw_output)[:100] if raw_output else "N/A"

    if tool_name == 'GraphRAG':
        if isinstance(raw_output, dict):
            _summarize_graphrag_output(raw_output, source_info, session_id)
    else:
        source_info['retrieval_method'] = 'vector_only'

    # A fresh list per source: the previous code reused a variable that survived
    # from the prior loop iteration, attaching one source's nodes to the next.
    nodes_metadata = []
    source_nodes = getattr(raw_output, 'source_nodes', None) if raw_output else None
    image_dir = Path(IMAGES_DIRECTORY)
    for node_with_score in source_nodes or []:
        node = getattr(node_with_score, 'node', None)
        if not node or not hasattr(node, 'metadata'):
            continue
        node_meta = node.metadata or {}
        nodes_metadata.append(node_meta)
        _collect_node_images(node, node_meta, image_dir, images, seen_images, session_id)

    source_info['metadata'] = (
        {'nodes': nodes_metadata} if nodes_metadata
        else getattr(tool_output_source, 'metadata', {})
    )
    return source_info


@app.route('/chat', methods=['POST', 'OPTIONS'])
async def chat():
    """Handle one chat turn: validate input, run the shared agent, return its answer.

    Request JSON: ``{"message": str, "sessionId": str}``. The username comes from
    the ``X-MS-USERNAME`` header; when PRODUCTION is unset or "false", a missing
    header falls back to ``dev_user@local``.

    The user message and the assistant reply (with sources, reasoning and image
    references) are stored via ``add_message``; a conversation title is
    generated once the conversation holds exactly two messages.

    Returns:
        200 ``{"status": "success", "data": {...}}`` on success; 400 for bad
        input; 500 for incomplete session state or internal errors; 503 when
        the agent cannot be initialized; 504 on ``asyncio.TimeoutError``.
    """
    # reset_chat/debug_status in this module read the module-level
    # ``global_workflow_agent`` name, so it is refreshed below for them.
    global global_workflow_agent
    if request.method == 'OPTIONS':
        # CORS pre-flight; CORS headers are assumed to be added by middleware.
        return make_response(jsonify({'status': 'ok'}))
    session_id = None  # kept for error logging in the except blocks
    try:
        # silent=True: malformed/non-JSON bodies become a 400, not a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'message' not in data:
            log_structured('warning', 'Chat request missing message payload')
            return jsonify({'error': 'No message provided'}), 400
        user_message = data['message']
        if not isinstance(user_message, str) or not user_message.strip():
            log_structured('warning', 'Chat request missing message payload')
            return jsonify({'error': 'No message provided'}), 400
        session_id = data.get('sessionId')
        if not session_id:
            log_structured('warning', 'Chat request missing sessionId')
            return jsonify({'error': 'Session ID is required'}), 400

        username = request.headers.get('X-MS-USERNAME')
        if not username and os.environ.get("PRODUCTION", "false").lower() == "false":
            username = "dev_user@local"
            log_structured('info', f'Development mode: Using default username: {username}')
        elif not username:
            # Anonymous access is allowed and tied to the session id.
            log_structured('info', f'Chat request from anonymous user (session: {session_id})')
        log_structured('info', 'Chat request received', {'session_id': session_id, 'username': username or 'anonymous'})

        # Maps session_id -> user_id / conversation_id.
        session_state = get_or_create_session_state(session_id, username)
        if not session_state.get('initialized'):
            log_structured('info', 'Session not yet initialized. Initializing now.', {'session_id': session_id})
            session_state['initialized'] = True
            update_session_state(session_id, {'initialized': True})
        conversation_id = session_state.get('conversation_id')
        user_id = session_state.get('user_id')
        if not conversation_id or not user_id:
            log_structured('error', 'Session state missing conversation_id or user_id', {'session_id': session_id, 'state': session_state})
            return jsonify({'error': 'Session setup incomplete. Please refresh or start a new chat.'}), 500

        # Resolve the agent BEFORE storing the user turn so a 503 does not
        # leave an unanswered message in the conversation.
        await _ensure_agent_initialized()
        if not is_agent_available():
            log_structured('critical', 'Failed to get or initialize global workflow agent')
            return jsonify({
                'error': 'Chat agent is not available. Please try again later.',
                'details': 'The AI agent failed to initialize properly. This might be due to API key issues or other backend problems.',
                'code': 'AGENT_UNAVAILABLE'
            }), 503
        current_agent = _resolve_live_agent()
        if current_agent is None:
            log_structured('critical', 'Global agent is None after direct import from shared_state')
            return jsonify({'error': 'Chat agent became unavailable. Please try again.', 'code': 'AGENT_DISAPPEARED'}), 500
        if not callable(getattr(current_agent, 'run', None)):
            log_structured('critical', 'Agent disappeared at runtime: null check before .run() call')
            return jsonify({'error': 'Chat agent became unavailable just before processing. Please try again.', 'code': 'AGENT_GONE'}), 500
        global_workflow_agent = current_agent

        add_message(conversation_id, 'user', user_message)
        log_structured('debug', 'Starting agent workflow run', {
            'session_id': session_id,
            'conversation_id': conversation_id,
            'user_id': user_id,
            'message_preview': user_message[:100]
        })

        log_structured('info', 'Running agent to process query')
        _load_history_into_memory(current_agent, conversation_id, user_message)
        raw_response = await current_agent.run(user_message)
        log_structured('debug', 'Raw response received from workflow agent', {
            'session_id': session_id, 'conversation_id': conversation_id,
            'response_type': type(raw_response).__name__,
            'response_keys': list(raw_response.keys()) if isinstance(raw_response, dict) else None,
        })

        # --- Process response, sources and images ---
        final_response_text = "Sorry, I encountered an issue generating a response."
        final_sources = []
        final_reasoning = []
        images = []
        seen_images = set()  # filenames already added, for de-duplication
        if isinstance(raw_response, dict):
            final_response_text = str(raw_response.get('response', ''))
            # `or []` guards against explicit None values in the agent's dict.
            response_sources = raw_response.get('sources') or []
            final_reasoning = raw_response.get('reasoning') or []
            log_structured('debug', f'Processing {len(response_sources)} sources for metadata and images', {'session_id': session_id})
            if isinstance(response_sources, list):
                for tool_output_source in response_sources:
                    if tool_output_source is None:
                        continue
                    final_sources.append(_build_source_info(tool_output_source, images, seen_images, session_id))
        else:
            final_response_text = str(raw_response)
            log_structured('warning', 'Agent response was not a dict, sources/reasoning unavailable.', {'response_type': type(raw_response).__name__})
        log_structured('info', f'Final image count for response: {len(images)}', {'session_id': session_id, 'images': images})

        processed_response = {
            'response': final_response_text,
            'sources': final_sources,
            'reasoning': final_reasoning,
            'images': images
        }
        # Note: final_sources and final_reasoning must be serializable by CustomJSONEncoder.
        add_message(
            conversation_id,
            'assistant',
            final_response_text,
            sources=final_sources,
            reasoning=final_reasoning,
            images=images
        )

        # Title the conversation after its first user/assistant pair.
        conv_messages = get_conversation_messages(conversation_id)
        if conv_messages and len(conv_messages) == 2:
            try:
                new_title = generate_conversation_title(conversation_id, conv_messages)
                log_structured('info', f'Generated initial conversation title: "{new_title}"', {'conversation_id': conversation_id})
            except Exception as title_err:
                # Non-fatal: the reply is still returned.
                log_structured('error', f'Error generating conversation title: {title_err}', {'conversation_id': conversation_id})

        response = make_response(jsonify({
            'status': 'success',
            'data': processed_response
        }))
        response.headers['Content-Type'] = 'application/json'
        return response
    except asyncio.TimeoutError:
        log_structured('error', 'Chat request timed out', {'session_id': session_id})
        return jsonify({'status': 'error', 'error': 'Request timed out', 'detail': 'The request took too long to process.'}), 504
    except Exception as e:
        log_structured('error', 'Unhandled error in /chat endpoint', {
            'session_id': session_id, 'error': str(e), 'traceback': traceback.format_exc()
        })
        response = make_response(jsonify({
            'status': 'error', 'error': 'An internal server error occurred.', 'detail': str(e)
        }), 500)
        response.headers['Content-Type'] = 'application/json'
        return response
@app.route('/images/<path:filename>', methods=['GET'])
def serve_image(filename):
    """Serve an image by name, searching several known directories in order.

    Lookup order: IMAGES_DIRECTORY, the frontend's ``chat-interface/public/images``
    folder, SUPPORTING_FILES_DIR, and finally IMAGES_DIRECTORY again with the
    URL-decoded name (for clients that double-encode). Names are not passed
    through ``secure_filename`` because stored names may contain spaces and
    special characters. Instead, traversal patterns are rejected, both for the
    raw name and for its decoded form, before any filesystem probe.

    Returns:
        The file response, 400 for a rejected name, 404 when not found, or 500
        on an unexpected error.
    """
    log_structured('debug', f'Image request received for: {filename}')

    def _looks_unsafe(name):
        # Parent-directory segments and absolute paths (POSIX or Windows style).
        return '..' in name or name.startswith(('/', '\\')) or os.path.isabs(name)

    if _looks_unsafe(filename):
        log_structured('warning', 'Possible path traversal attempt', {'requested': filename})
        return jsonify({'error': 'Invalid filename'}), 400
    try:
        search_dirs = (
            (IMAGES_DIRECTORY, 'Serving image from IMAGES_DIRECTORY: {}'),
            (BASE_DIR / "chat-interface" / "public" / "images", 'Found image in public directory: {}'),
            (SUPPORTING_FILES_DIR, 'Found image in supporting files: {}'),
        )
        for directory, found_message in search_dirs:
            if os.path.isfile(os.path.join(directory, filename)):
                log_structured('debug', found_message.format(filename))
                return send_from_directory(directory, filename, as_attachment=False)

        decoded_filename = urllib.parse.unquote(filename)
        if decoded_filename != filename:
            # Decoding can surface "../" (e.g. from "%2e%2e%2f"); re-validate
            # before probing the filesystem with it.
            if _looks_unsafe(decoded_filename):
                log_structured('warning', 'Possible path traversal attempt', {'requested': decoded_filename})
                return jsonify({'error': 'Invalid filename'}), 400
            if os.path.isfile(os.path.join(IMAGES_DIRECTORY, decoded_filename)):
                log_structured('debug', f'Serving image with decoded filename: {decoded_filename}')
                return send_from_directory(IMAGES_DIRECTORY, decoded_filename, as_attachment=False)

        log_structured('error', f'Image not found in any directory: {filename}')
        return jsonify({'error': 'Image not found'}), 404
    except Exception as e:
        log_structured('error', f'Error serving image {filename}', {'error': str(e)})
        return jsonify({'error': 'Server error serving image'}), 500
@app.route('/list-images', methods=['GET'])
def list_images():
    """Return metadata for every image file directly inside IMAGES_DIRECTORY.

    Each entry has the filename, a URL-quoted ``/images/...`` link, the size in
    bytes and an ISO-8601 timestamp taken from ``st_ctime``. Entries are ordered
    newest first. Responds 404 when the directory is missing and 500 on any
    unexpected failure; a file whose stats cannot be read is logged and skipped.
    """
    try:
        if not os.path.exists(IMAGES_DIRECTORY):
            log_structured('error', 'Images directory does not exist', {'path': str(IMAGES_DIRECTORY)})
            return jsonify({'status': 'error', 'message': 'Images directory not found'}), 404

        image_suffixes = ('.png', '.jpg', '.jpeg', '.gif', '.webp')
        catalog = []
        # os.listdir copes well with names containing special characters.
        for name in os.listdir(IMAGES_DIRECTORY):
            full_path = os.path.join(IMAGES_DIRECTORY, name)
            if not os.path.isfile(full_path) or os.path.splitext(name)[1].lower() not in image_suffixes:
                continue
            try:
                st = os.stat(full_path)
                entry = {
                    'filename': name,
                    'url': f'/images/{urllib.parse.quote(name)}',
                    'size': st.st_size,
                    'created': datetime.fromtimestamp(st.st_ctime).isoformat()
                }
            except Exception as stat_err:
                log_structured('error', f'Could not get stats for image file: {name}', {'error': str(stat_err)})
                continue
            catalog.append(entry)

        newest_first = sorted(catalog, key=lambda item: item['created'], reverse=True)
        return jsonify({
            'status': 'success',
            'count': len(newest_first),
            'images': newest_first
        })
    except Exception as e:
        log_structured('error', 'Error listing images', {'error': str(e), 'traceback': traceback.format_exc()})
        return jsonify({'status': 'error', 'message': 'Failed to list images'}), 500
# Example route for manual testing (consider removing in production)
@app.route('/capture-screenshot', methods=['POST'])
async def capture_screenshot():
    """Manually run LlamaParse page-image capture on one document (testing aid).

    Expects JSON ``{"file_path": "<path>"}``. The path must resolve to a file
    inside NETFLIX_DOCS_FOLDER. Page images are produced in a temporary
    directory, copied into IMAGES_DIRECTORY under unique names, and reported
    per page in ``image_results``.

    WARNING: this endpoint is unauthenticated and triggers parsing work on
    demand; protect or remove it in production.

    Returns:
        200 with per-page results; 400 missing path; 403 path outside the docs
        folder; 404 file not found; 500 on parsing or internal errors.
    """
    try:
        # silent=True and the dict check turn malformed bodies into a 400.
        data = request.get_json(silent=True)
        file_path_str = data.get('file_path') if isinstance(data, dict) else None
        if not file_path_str or not isinstance(file_path_str, str):
            return jsonify({'status': 'error', 'message': 'Missing file_path'}), 400

        target_path = Path(file_path_str).resolve()
        allowed_dir = NETFLIX_DOCS_FOLDER.resolve()
        # Containment by path components, not string prefix: a prefix test would
        # accept "<docs>_other/x.pdf" for the allowed directory "<docs>".
        if target_path != allowed_dir and allowed_dir not in target_path.parents:
            log_structured('warning', 'Attempt to access file outside allowed directory', {'requested': file_path_str, 'allowed': str(allowed_dir)})
            return jsonify({'status': 'error', 'message': 'File path is not allowed'}), 403
        if not target_path.is_file():
            return jsonify({'status': 'error', 'message': f'File not found: {file_path_str}'}), 404

        parser_images = None
        custom_client_images = None
        image_dicts = []
        image_results = []
        temp_dir = None
        try:
            from config import LLAMA_PARSE_VENDOR_MODEL, LLAMA_PARSE_MAX_TIMEOUT
            import httpx
            from llama_parse import LlamaParse  # local: only this test endpoint needs it
            custom_client_images = httpx.AsyncClient(timeout=LLAMA_PARSE_MAX_TIMEOUT)
            parser_images = LlamaParse(
                result_type="markdown", add_page_breaks=False,
                system_prompt="Generate page images.",
                use_vendor_multimodal_model=True,
                vendor_multimodal_model_name=LLAMA_PARSE_VENDOR_MODEL,
                premium_mode=False, max_timeout=LLAMA_PARSE_MAX_TIMEOUT,
                custom_client=custom_client_images, verbose=True
            )
            log_structured('info', f'Attempting to capture images from file: {file_path_str} (Manual Trigger)')
            temp_dir = IMAGES_DIRECTORY / f"temp_manual_{uuid.uuid4().hex[:8]}"
            os.makedirs(temp_dir, exist_ok=True)

            # The LlamaParse calls used here are synchronous; run them in worker
            # threads so the event loop is not blocked.
            loop = asyncio.get_running_loop()
            from concurrent.futures import ThreadPoolExecutor
            with ThreadPoolExecutor() as pool:
                md_json_objs = await loop.run_in_executor(pool, parser_images.get_json_result, str(target_path))
                image_dicts = await loop.run_in_executor(pool, parser_images.get_images, md_json_objs, str(temp_dir))
            image_dicts = image_dicts or []
            log_structured('info', f'Manual capture: LlamaParse reported {len(image_dicts)} images for {target_path.name}')

            if isinstance(image_dicts, list):
                for idx, img_info in enumerate(image_dicts):
                    if not isinstance(img_info, dict):
                        log_structured('warning', f'Image info is not a dict: {img_info}')
                        continue
                    page_index_0_based = img_info.get('page', idx)
                    if not isinstance(page_index_0_based, int):
                        # A missing/None/non-numeric page must not abort the whole capture.
                        page_index_0_based = idx
                    page_num_1_based = page_index_0_based + 1
                    source_img_path_str = img_info.get('path')
                    if not source_img_path_str or not os.path.exists(source_img_path_str):
                        image_results.append({'success': False, 'page_number': page_num_1_based, 'error': 'No image path found or path invalid'})
                        continue
                    try:
                        image_filename = f"{target_path.stem}_manual_page{page_num_1_based}_{uuid.uuid4().hex[:6]}.png"
                        dest_path = IMAGES_DIRECTORY / image_filename
                        shutil.copy2(Path(source_img_path_str), dest_path)
                        image_results.append({
                            'success': True, 'page_number': page_num_1_based,
                            'filename': image_filename, 'url': f'/images/{image_filename}'
                        })
                        log_structured('info', f'Saved manual test image for page {page_num_1_based}', {'path': str(dest_path)})
                    except Exception as img_err:
                        log_structured('error', f'Failed to save manual test image page {page_num_1_based}', {'error': str(img_err)})
                        image_results.append({'success': False, 'page_number': page_num_1_based, 'error': str(img_err)})
        except Exception as parse_err:
            log_structured('error', f'Error during manual image capture for {file_path_str}', {'error': str(parse_err), 'traceback': traceback.format_exc()})
            return jsonify({'status': 'error', 'message': f'Error processing file: {parse_err}'}), 500
        finally:
            if temp_dir and os.path.exists(temp_dir):
                try:
                    shutil.rmtree(temp_dir)
                except Exception as clean_err:
                    log_structured('error', 'Error cleaning up manual capture temp dir', {'error': str(clean_err)})
            if parser_images is not None and hasattr(parser_images, 'aclose'):
                try:
                    await parser_images.aclose()
                except Exception as close_err:
                    log_structured('warning', 'Error closing LlamaParse parser', {'error': str(close_err)})
            # The httpx client was created here, so it is closed here; it used to leak.
            if custom_client_images is not None:
                try:
                    await custom_client_images.aclose()
                except Exception as close_err:
                    log_structured('warning', 'Error closing LlamaParse HTTP client', {'error': str(close_err)})

        return jsonify({
            'status': 'success',
            'message': f'Processed file {target_path.name}, attempted to capture {len(image_dicts or [])} images.',
            'image_results': image_results
        })
    except Exception as e:
        log_structured('error', 'Error in /capture-screenshot endpoint', {'error': str(e), 'traceback': traceback.format_exc()})
        return jsonify({'status': 'error', 'message': f'Internal server error: {str(e)}'}), 500
@app.route('/status', methods=['GET'])
def get_status():
    """Report system readiness plus GraphRAG background-initialization progress.

    The global status is deliberately hard-wired to "initialized"; the real
    readiness signal for the GraphRAG tool is in the ``graphrag_*`` fields.
    When ``?sessionId=`` is given, the session is looked up. If it has a
    conversation and is not yet marked initialized, it is marked initialized
    in the database, and ``session_initialized`` is reported as True.

    NOTE(review): ``get_or_create_session_state`` may create a session when none
    exists, despite the "check only" intent here; confirm in session_manager.
    """
    requested_session = request.args.get('sessionId')
    log_structured('info', 'Status check: ALWAYS returning initialized=true')
    graphrag = get_graphrag_status()
    payload = {
        'global_status': 'initialized',
        'initialized': True,
        'is_initialized': True,
        'timestamp': datetime.now().isoformat(),
        'override': 'FORCE_INITIALIZED',
        'graphrag_ready': graphrag['ready'],
        'graphrag_initializing': graphrag['initializing'],
        'graphrag_error': graphrag['error'],
    }

    if not requested_session:
        payload['session_status'] = 'not_checked'
        return jsonify(payload)

    try:
        state = get_or_create_session_state(requested_session, username=None)
        has_conversation = state and state.get('conversation_id')
        payload['session_status'] = 'found_or_created' if has_conversation else 'not_found'
        payload['session_id_checked'] = requested_session
        if has_conversation:
            if not state.get('initialized', False):
                # Best effort: a DB failure here must not fail the status call.
                try:
                    update_session_state(requested_session, {'initialized': True})
                    log_structured('info', f'Forcing session initialization for {requested_session}')
                except Exception as update_err:
                    log_structured('error', f'Error updating session state: {str(update_err)}')
            payload['session_initialized'] = True
    except Exception as sess_err:
        payload['session_status'] = 'error_checking'
        payload['session_error'] = str(sess_err)
    return jsonify(payload)
@app.route('/debug-status', methods=['GET'])
def debug_status():
    """Development-only snapshot of agent/index state (403 when PRODUCTION=true).

    Compares views of the same state:
      * ``shared_state_globals`` — attributes read live from ``shared_state``;
      * ``reimport_result`` — a second live read (legacy key name, see below);
      * ``local_values`` — the names this module bound at import time, which
        can be stale snapshots.
    Environment variables are listed with credential-looking values redacted.
    """
    if os.environ.get("PRODUCTION", "false").lower() == "true":
        return jsonify({'error': 'Debug endpoints not available in production'}), 403
    import sys

    modules = {
        'shared_state': {'loaded': 'shared_state' in sys.modules},
        'ai_core': {'loaded': 'ai_core' in sys.modules}
    }

    shared_state_globals = {}
    try:
        import shared_state
        agent = shared_state.global_workflow_agent
        shared_state_globals = {
            'agent_none': agent is None,
            'index_none': shared_state.global_index is None,
            'graph_store_none': shared_state.global_graph_store is None,
            'agent_available': shared_state.is_agent_available(),
            'agent_type': type(agent).__name__ if agent else None,
            'agent_has_run': hasattr(agent, 'run') if agent else False
        }
    except Exception as e:
        shared_state_globals = {'error': str(e)}

    # This used to call importlib.reload(shared_state). Reloading re-executes
    # the module body, which (presumably) reinitializes its globals and would
    # throw away the live agent/index as a side effect of a GET request. The
    # "after_reload" fields are kept for compatibility but come from a re-read.
    reimport_result = {}
    try:
        import shared_state
        reimport_result['shared_state_reloaded'] = False
        reimport_result['agent_none_after_reload'] = shared_state.global_workflow_agent is None
        reimport_result['agent_available_after_reload'] = shared_state.is_agent_available()
    except Exception as e:
        reimport_result = {'error': str(e)}

    # Import-time bindings in this module (may be stale relative to shared_state).
    local_values = {
        'global_workflow_agent_none': global_workflow_agent is None,
        'global_index_none': global_index is None,
        'is_agent_available': is_agent_available()
    }

    # OPENAI*/SECRET* vars are omitted entirely (as before); other names that
    # look like credentials keep their key but have the value redacted.
    sensitive_markers = ('KEY', 'SECRET', 'TOKEN', 'PASSWORD', 'PASSWD', 'CREDENTIAL', 'URI', 'URL', 'AUTH')
    os_environ = {}
    for name, value in os.environ.items():
        if name.startswith('OPENAI') or name.startswith('SECRET'):
            continue
        upper_name = name.upper()
        os_environ[name] = '***redacted***' if any(m in upper_name for m in sensitive_markers) else value

    return jsonify({
        'modules': modules,
        'shared_state_globals': shared_state_globals,
        'reimport_result': reimport_result,
        'local_values': local_values,
        'os_environ': os_environ
    })
@app.route('/reinitialize', methods=['POST'])
async def reinitialize_agent():
    """Development-only: rerun full index and agent initialization.

    Awaits ``initialize_global_index()`` (vector index + GraphRAG) and reports
    the resulting state read live from ``shared_state``. Returns 403 when
    PRODUCTION=true and 500 if initialization raises.

    NOTE(review): full GraphRAG initialization can take tens of minutes; this
    request blocks until it finishes, so client/proxy timeouts are likely.
    """
    if os.environ.get("PRODUCTION", "false").lower() == "true":
        return jsonify({'error': 'Debug endpoints not available in production'}), 403
    try:
        import shared_state
        # Log the live state; this module's import-time bindings may be stale.
        log_structured('info', 'Reinitializing global agent', {
            'agent_available_before': is_agent_available(),
            'agent_none_before': shared_state.global_workflow_agent is None,
            'index_none_before': shared_state.global_index is None
        })
        success = await initialize_global_index()
        # Read the new state straight from the module. Do NOT importlib.reload()
        # it: reloading re-executes shared_state and would discard what the
        # initializer just stored.
        return jsonify({
            'success': success,
            'agent_available': shared_state.is_agent_available(),
            'agent_none': shared_state.global_workflow_agent is None,
            'index_none': shared_state.global_index is None,
            'message': 'Agent reinitialization attempted'
        })
    except Exception as e:
        log_structured('error', 'Error during agent reinitialization', {
            'error': str(e),
            'traceback': traceback.format_exc()
        })
        return jsonify({
            'success': False,
            'error': str(e),
            'message': 'Error during reinitialization'
        }), 500
@app.route('/reset', methods=['POST'])
async def reset_chat():
    """Reset the chat memory of the *current global agent* instance.

    NOTE: This resets the memory of the SINGLE global agent instance. If multiple
    users hit this simultaneously, they reset the same memory. True multi-user
    isolation requires separate agent instances or more sophisticated memory
    management; this endpoint is mainly suitable for single-user testing or
    demos. For persistent multi-conversation use, switch ``conversation_id``
    via the frontend instead.

    Request JSON:
        sessionId: required; used for logging context and to clear that
            session's cached chat state.

    Returns:
        200 JSON on success; 400 if ``sessionId`` is missing or the body is not a
        JSON object; 500 if the agent is unavailable or the reset fails.
    """
    # Read the agent through the module object: the name imported at file top
    # via `from shared_state import global_workflow_agent` is bound at import
    # time and stays stale if the agent is (re)created afterwards, while
    # is_agent_available() checks the live value.
    import shared_state

    session_id = None
    try:
        # silent=True: a missing/invalid JSON body becomes a 400 below, not a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        session_id = data.get('sessionId')  # Get session ID for logging context
        if not session_id:
            return jsonify({'error': 'Session ID is required for context'}), 400

        log_structured('warning', 'Received request to reset GLOBAL agent memory', {'triggering_session_id': session_id})

        agent = shared_state.global_workflow_agent
        if not shared_state.is_agent_available() or agent is None:
            return jsonify({'error': 'Chat agent not initialized'}), 500

        try:
            # Prefer the agent's own reset, then its memory's reset; otherwise
            # install a fresh memory buffer.
            if hasattr(agent, 'reset'):
                agent.reset()
            elif hasattr(agent, 'memory') and hasattr(agent.memory, 'reset'):
                agent.memory.reset()
            else:
                from llama_index.core.memory import ChatMemoryBuffer
                agent.memory = ChatMemoryBuffer.from_defaults(llm=agent.llm)

            # Clear the in-memory cache for this specific session, forcing a DB reload on next request
            clear_chat_state_cache(session_id)

            log_structured('info', f'Global agent memory reset triggered by session {session_id}')
            return jsonify({'status': 'success', 'message': 'Global agent chat memory has been reset.'})
        except Exception as e:
            log_structured('error', 'Error resetting global agent memory', {'session_id': session_id, 'error': str(e)})
            return jsonify({'error': f'Failed to reset agent memory: {str(e)}'}), 500
    except Exception as e:
        log_structured('error', 'Error in /reset endpoint', {'session_id': session_id, 'error': str(e), 'traceback': traceback.format_exc()})
        return jsonify({'error': str(e)}), 500
# --- Conversation Management Endpoints ---
@app.route('/conversations', methods=['GET'])
def get_conversations():
    """Return the authenticated user's conversations, newest activity first.

    The user is taken from the ``X-MS-USERNAME`` header. When the header is
    absent and the ``PRODUCTION`` env var is unset or "false", a fixed local
    development user is substituted; otherwise the request is rejected with 401.

    Returns:
        200 JSON ``{'status': 'success', 'conversations': [...]}`` where each
        entry has ``id``, ``title``, ``created_at``, ``last_updated`` (ISO
        strings) and ``session_id``; 404 if the user cannot be resolved;
        500 on any other failure.
    """
    username = request.headers.get('X-MS-USERNAME')
    if not username:
        # Only fall back to a dev identity when explicitly (or by default) non-production.
        if os.environ.get("PRODUCTION", "false").lower() != "false":
            return jsonify({'error': 'Authentication required'}), 401
        username = "dev_user@local"
        log_structured('info', f'Development mode: Using default username: {username} for conversations')

    try:
        user_id = create_or_update_user(username)  # upserts the user record
        if not user_id:
            return jsonify({'error': 'User not found or could not be created'}), 404

        summaries = [
            {
                'id': str(record['_id']),
                'title': record.get('title', 'Untitled Conversation'),
                'created_at': record.get('created_at', datetime.min).isoformat(),
                'last_updated': record.get('last_updated', datetime.min).isoformat(),
                'session_id': record.get('session_id', None),
            }
            for record in get_user_conversations(user_id)
        ]

        # ISO-8601 strings of the same format sort chronologically.
        newest_first = sorted(summaries, key=lambda item: item['last_updated'], reverse=True)
        return jsonify({'status': 'success', 'conversations': newest_first})
    except Exception as e:
        log_structured('error', 'Error getting user conversations', {'username': username, 'error': str(e), 'traceback': traceback.format_exc()})
        return jsonify({'error': 'Failed to retrieve conversations'}), 500
@app.route('/conversations/<conversation_id_str>/messages', methods=['GET'])
def get_conversation_msgs(conversation_id_str):
    """Get all messages for a specific conversation owned by the caller.

    The user is taken from the ``X-MS-USERNAME`` header. When the header is
    absent and the ``PRODUCTION`` env var is unset or "false", a fixed local
    development user is substituted; otherwise 401.

    Returns:
        200 JSON with ``conversation_title`` and ``messages`` (each with ``id``,
        ``role``, ``content``, ISO ``timestamp``, ``sources``, ``reasoning``,
        ``images``); 404 if the conversation or user cannot be found;
        403 if the conversation belongs to another user; 500 on other errors.
    """
    username = request.headers.get('X-MS-USERNAME')
    # Check if we're in development mode
    if not username and os.environ.get("PRODUCTION", "false").lower() == "false":
        # For local development, create a default username
        username = "dev_user@local"
        log_structured('info', f'Development mode: Using default username: {username} for conversation messages')
    elif not username:
        return jsonify({'error': 'Authentication required'}), 401

    try:
        # TODO: consider validating conversation_id_str format (e.g. ObjectId) and returning 400.
        conversation = get_conversation_by_id(conversation_id_str)
        if not conversation:
            log_structured('warning', 'Conversation not found', {'conversation_id': conversation_id_str, 'username': username})
            return jsonify({'error': 'Conversation not found'}), 404

        # Verify user owns this conversation.
        user_id = create_or_update_user(username)
        # Without a resolved user_id, a conversation that lacks 'user_id' would
        # compare None == None and pass the ownership check below.
        if not user_id:
            return jsonify({'error': 'User not found or could not be created'}), 404
        if conversation.get('user_id') != user_id:
            log_structured('warning', 'User attempted to access unauthorized conversation', {'conversation_id': conversation_id_str, 'username': username})
            return jsonify({'error': 'Forbidden'}), 403

        messages = get_conversation_messages(conversation_id_str)
        formatted_messages = [
            {
                'id': str(msg['_id']),
                'role': msg.get('role'),
                'content': msg.get('content'),
                # `or` also covers a stored None, which would otherwise crash
                # .isoformat() and fail the whole listing.
                'timestamp': (msg.get('timestamp') or datetime.min).isoformat(),
                'sources': msg.get('sources', []),  # Ensure these are serializable
                'reasoning': msg.get('reasoning', []),  # Ensure these are serializable
                'images': msg.get('images', [])
            }
            for msg in messages
        ]

        return jsonify({
            'status': 'success',
            'conversation_title': conversation.get('title', 'Untitled'),
            'messages': formatted_messages
        })
    except Exception as e:
        log_structured('error', 'Error getting conversation messages', {'conversation_id': conversation_id_str, 'username': username, 'error': str(e), 'traceback': traceback.format_exc()})
        return jsonify({'error': 'Failed to retrieve messages'}), 500
@app.route('/conversations/new', methods=['POST'])
def create_new_conversation_endpoint():
    """Create an empty conversation for the caller with a timestamped default title.

    The user is taken from the ``X-MS-USERNAME`` header. When the header is
    absent and the ``PRODUCTION`` env var is unset or "false", a fixed local
    development user is substituted; otherwise 401.

    Returns:
        200 JSON with ``conversation_id`` (string) and ``title``; 404 if the user
        cannot be resolved; 500 if creation fails or raises.
    """
    username = request.headers.get('X-MS-USERNAME')
    if not username:
        if os.environ.get("PRODUCTION", "false").lower() != "false":
            return jsonify({'error': 'Authentication required'}), 401
        username = "dev_user@local"
        log_structured('info', f'Development mode: Using default username: {username} for new conversation')

    try:
        user_id = create_or_update_user(username)
        if not user_id:
            return jsonify({'error': 'User not found or could not be created'}), 404

        # Synthetic session id so the DB record has a session link even though
        # this conversation was not started from a chat session.
        placeholder_session = f"manual_new_conv_{uuid.uuid4().hex[:8]}"
        stamp = datetime.now().strftime('%Y-%m-%d %H:%M')
        title = f"New Chat ({stamp})"

        new_id = create_conversation(placeholder_session, user_id, title=title)
        if not new_id:
            return jsonify({'error': 'Failed to create conversation'}), 500

        log_structured('info', 'New conversation created manually', {'username': username, 'conversation_id': new_id})
        payload = {'status': 'success', 'conversation_id': str(new_id), 'title': title}
        return jsonify(payload)
    except Exception as e:
        log_structured('error', 'Error creating new conversation', {'username': username, 'error': str(e), 'traceback': traceback.format_exc()})
        return jsonify({'error': 'Failed to create conversation'}), 500
@app.route('/conversations/<conversation_id_str>', methods=['DELETE'])
def delete_conversation_route(conversation_id_str):
    """Delete a conversation owned by the caller (soft delete by default).

    Query params:
        hard_delete: "true" (case-insensitive) requests a hard delete; anything
            else performs a soft delete.

    The user is taken from the ``X-MS-USERNAME`` header. When the header is
    absent and the ``PRODUCTION`` env var is unset or "false", a fixed local
    development user is substituted; otherwise 401.

    Returns:
        200 JSON on success; 404 if the conversation or user cannot be found;
        403 if the conversation belongs to another user; 500 on failure.
    """
    username = request.headers.get('X-MS-USERNAME')
    # Check if we're in development mode
    if not username and os.environ.get("PRODUCTION", "false").lower() == "false":
        # For local development, create a default username
        username = "dev_user@local"
        log_structured('info', f'Development mode: Using default username: {username} for deleting conversation')
    elif not username:
        return jsonify({'error': 'Authentication required'}), 401

    hard_delete = request.args.get('hard_delete', 'false').lower() == 'true'

    try:
        conversation = get_conversation_by_id(conversation_id_str)
        if not conversation:
            return jsonify({'error': 'Conversation not found'}), 404

        # Verify user owns this conversation.
        user_id = create_or_update_user(username)
        # Without a resolved user_id, a conversation that lacks 'user_id' would
        # compare None == None and pass the ownership check below.
        if not user_id:
            return jsonify({'error': 'User not found or could not be created'}), 404
        if conversation.get('user_id') != user_id:
            return jsonify({'error': 'Forbidden'}), 403

        success = db_delete_conversation(conversation_id_str, hard_delete=hard_delete)
        if not success:
            log_structured('error', 'Failed to delete conversation in DB', {'conversation_id': conversation_id_str})
            return jsonify({'error': 'Failed to delete conversation'}), 500

        log_structured('info', f'Conversation {"hard" if hard_delete else "soft"} deleted', {
            'conversation_id': conversation_id_str, 'username': username
        })
        # Clear any cached state related to this conversation's session_id if known
        session_id_to_clear = conversation.get('session_id')
        if session_id_to_clear:
            clear_chat_state_cache(session_id_to_clear)
        return jsonify({'status': 'success'})
    except Exception as e:
        log_structured('error', 'Error deleting conversation', {'conversation_id': conversation_id_str, 'username': username, 'error': str(e), 'traceback': traceback.format_exc()})
        return jsonify({'error': 'Failed to delete conversation'}), 500
# --- Brief Download Endpoint ---
@app.route('/download-brief', methods=['POST', 'OPTIONS'])
async def download_brief():
    """Generate a marketing brief as a Word (.docx) document and send it as an attachment.

    Request JSON:
        brief_content: required; markdown text passed to ``create_brief_docx``.
        sessionId: optional; used for logging and embedded (sanitized) in the
            download filename. Defaults to "unknown_session".

    Returns:
        The .docx file on success; 200 ``{'status': 'ok'}`` for CORS pre-flight
        (OPTIONS); 400 if the body is not a JSON object or lacks
        ``brief_content``; 500 JSON with ``detail`` if generation fails.
    """
    if request.method == 'OPTIONS':
        return jsonify({'status': 'ok'})  # Handle pre-flight

    session_id = None
    try:
        # silent=True: a missing/invalid JSON body becomes a 400, not a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'brief_content' not in data:
            return jsonify({'error': 'No brief content provided'}), 400

        session_id = data.get('sessionId', 'unknown_session')  # Get session for context
        brief_content_markdown = data['brief_content']

        log_structured('info', 'Generating brief document', {'session_id': session_id})

        # Create the Word document in memory using the generator function
        doc_buffer = create_brief_docx(brief_content_markdown)

        # sessionId is client-supplied: strip path separators and unsafe
        # characters before placing it in the Content-Disposition filename.
        safe_session = secure_filename(str(session_id)) or 'unknown_session'
        download_name = f'marketing_brief_{safe_session}_{datetime.now().strftime("%Y%m%d")}.docx'
        return send_file(
            doc_buffer,
            mimetype='application/vnd.openxmlformats-officedocument.wordprocessingml.document',
            as_attachment=True,
            download_name=download_name
        )
    except Exception as e:
        log_structured('error', 'Error generating or sending brief document', {
            'session_id': session_id, 'error': str(e), 'traceback': traceback.format_exc()
        })
        return jsonify({
            'status': 'error',
            'error': 'Failed to generate brief document',
            'detail': str(e)
        }), 500