hp_chatbot/main.py
michael 5e41ae4326 Fix background GraphRAG init blocking Hypercorn event loop
Replace asyncio.ensure_future() with a daemon thread for GraphRAG
initialization. The Neo4j driver and NetworkX calls are synchronous
and were starving Hypercorn of CPU time on the shared event loop.
A separate thread with its own event loop isolates the blocking work
so the server accepts connections immediately after Phase 1 completes.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-02-23 17:56:26 -06:00

187 lines
No EOL
7.1 KiB
Python

# hp_chatbot/main.py
import asyncio
import os
import sys
import threading
from flask import Flask
from flask_cors import CORS
# Ensure the project directory is in the Python path
current_dir = os.path.dirname(os.path.abspath(__file__))
if current_dir not in sys.path:
sys.path.insert(0, current_dir)
# Import necessary components from our modules
from config import (
APPLICATION_ROOT, MAX_CONTENT_LENGTH,
CORS_ALLOWED_ORIGINS, CORS_SUPPORTS_CREDENTIALS,
SERVER_HOST, SERVER_PORT, USE_RELOADER, LOG_LEVEL,
KEEP_ALIVE_TIMEOUT, READ_TIMEOUT, WRITE_TIMEOUT
)
from utils import logger, log_structured
from json_utils import CustomJSONProvider
from ai_core import initialize_global_index, initialize_vector_index, initialize_graphrag_components
from shared_state import global_workflow_agent, is_agent_available, get_graphrag_status
from routes import register_routes
from init_mongodb import init_mongodb # Your MongoDB initialization script
# --- Flask App Initialization ---
app = Flask(__name__)
# Apply custom JSON provider for handling special types (LlamaIndex objects, etc.)
app.json_provider_class = CustomJSONProvider
app.json = CustomJSONProvider(app)
# Configuration
app.config['MAX_CONTENT_LENGTH'] = MAX_CONTENT_LENGTH
if APPLICATION_ROOT:
app.config['APPLICATION_ROOT'] = APPLICATION_ROOT
# If using APPLICATION_ROOT, you might need to adjust route prefixes
# or use a Blueprint with url_prefix=APPLICATION_ROOT
log_structured('info', f"Flask Application Root set to: {APPLICATION_ROOT}")
# CORS Configuration
CORS(app,
resources={r"/*": {"origins": CORS_ALLOWED_ORIGINS}},
supports_credentials=CORS_SUPPORTS_CREDENTIALS,
# Expose custom headers if needed by the frontend
# expose_headers=["Content-Disposition"] # Example for downloads
)
log_structured('info', f"CORS configured for origins: {CORS_ALLOWED_ORIGINS}")
# --- Register Routes ---
# Pass the app object to the function in routes.py
register_routes(app)
log_structured('info', "Flask routes registered.")
def _launch_graphrag_background_thread():
"""Launch GraphRAG initialization in a daemon thread with its own event loop.
Neo4j operations (connect, get_triplets, upsert_nodes, build_communities)
are synchronous blocking calls. Running them in an asyncio task would block
the main event loop and prevent Hypercorn from serving requests. A separate
thread with its own event loop avoids this entirely.
"""
def _run():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
success = loop.run_until_complete(initialize_graphrag_components())
if success:
log_structured('info', "Background GraphRAG initialization completed successfully")
else:
log_structured('warning', "Background GraphRAG initialization failed — vector search still works")
except Exception as e:
log_structured('error', f"Background GraphRAG initialization error: {e}")
finally:
loop.close()
t = threading.Thread(target=_run, name="graphrag-init", daemon=True)
t.start()
# --- Startup Function ---
async def startup_event() -> bool:
"""Phase 1 (blocking): MongoDB + vector index. Phase 2 (background): GraphRAG.
Returns:
bool: True if Phase 1 completed successfully, False otherwise
"""
log_structured('info', "Application startup sequence initiated.")
all_success = True
# 1. Initialize MongoDB Connection & Schema
log_structured('info', "Initializing MongoDB connection...")
try:
if init_mongodb():
log_structured('info', "MongoDB initialized successfully.")
else:
log_structured('warning', "MongoDB initialization script finished, but reported issues.")
all_success = False
except Exception as db_err:
log_structured('critical', "FATAL: MongoDB initialization failed.", {'error': str(db_err)})
all_success = False
# 2. Phase 1: Vector index + agent (fast, ~1-2 min)
log_structured('info', "Phase 1: Initializing vector index and agent...")
vector_success = await initialize_vector_index()
if not is_agent_available() or not vector_success:
log_structured('critical', "Vector initialization failed")
all_success = False
else:
log_structured('info', "Phase 1 complete: server is ready for vector queries")
# 3. Phase 2: GraphRAG in background thread (non-blocking)
if vector_success:
log_structured('info', "Phase 2: Launching GraphRAG initialization in background thread...")
_launch_graphrag_background_thread()
log_structured('info', f"Startup complete. Server ready: {all_success}")
return all_success
# --- Shutdown Function (Optional) ---
async def shutdown_event():
"""Tasks to run when the application stops."""
log_structured('info', "Application shutdown sequence initiated.")
# Add any cleanup tasks here (e.g., closing connections if not handled elsewhere)
# Note: Hypercorn might not always guarantee graceful shutdown execution.
log_structured('info', "Application shutdown sequence complete.")
# --- Main Execution Block ---
if __name__ == '__main__':
from hypercorn.config import Config as HypercornConfig
from hypercorn.asyncio import serve as hypercorn_serve
# Create Hypercorn config object
config = HypercornConfig()
# Basic settings
config.bind = [f"{SERVER_HOST}:{SERVER_PORT}"]
config.use_reloader = USE_RELOADER
config.accesslog = '-'
config.errorlog = '-'
config.loglevel = LOG_LEVEL.upper()
config.worker_class = 'asyncio'
# Timeouts
config.keep_alive_timeout = float(KEEP_ALIVE_TIMEOUT)
config.read_timeout = float(READ_TIMEOUT)
config.write_timeout = float(WRITE_TIMEOUT)
# Shutdown hooks (startup is handled manually inside run_server_with_startup)
config.shutdown_hooks = [shutdown_event]
log_structured('info', f"Starting Hypercorn server on {SERVER_HOST}:{SERVER_PORT}")
log_structured('info', f"Reload mode: {'Enabled' if USE_RELOADER else 'Disabled'}")
async def run_server_with_startup():
"""Run startup (with background GraphRAG init) then serve.
Phase 1 runs in the main event loop. Phase 2 (GraphRAG) runs in a
separate daemon thread so blocking Neo4j calls cannot stall Hypercorn.
"""
await startup_event()
# Double-check agent
if not is_agent_available():
log_structured('critical', "Agent unavailable after startup. Forcing vector re-init...")
await initialize_vector_index()
if is_agent_available():
_launch_graphrag_background_thread()
# Start serving — background GraphRAG init continues in separate thread
await hypercorn_serve(app, config)
try:
asyncio.run(run_server_with_startup())
except KeyboardInterrupt:
log_structured('info', "Server stopped manually (KeyboardInterrupt).")
except Exception as run_err:
log_structured('critical', "Hypercorn server failed to run.", {'error': str(run_err)})
sys.exit(1)