Neo4j operations (connect, get_triplets, upsert_nodes, build_communities) are synchronous blocking calls. When run as an asyncio.ensure_future() task, they block the event loop and prevent Hypercorn from binding to the port or accepting connections. Fix: launch GraphRAG init in a daemon thread with its own event loop so the main event loop stays free to serve HTTP requests immediately after Phase 1 completes. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
210 lines
No EOL
8.8 KiB
Python
210 lines
No EOL
8.8 KiB
Python
# netflix_chatbot/main.py
|
|
|
|
import asyncio
|
|
import os
|
|
import sys
|
|
import threading
|
|
from flask import Flask
|
|
from flask_cors import CORS
|
|
|
|
# Ensure the project directory is in the Python path
|
|
current_dir = os.path.dirname(os.path.abspath(__file__))
|
|
if current_dir not in sys.path:
|
|
sys.path.insert(0, current_dir)
|
|
|
|
# Import necessary components from our modules
|
|
from config import (
|
|
APPLICATION_ROOT, MAX_CONTENT_LENGTH,
|
|
CORS_ALLOWED_ORIGINS, CORS_SUPPORTS_CREDENTIALS,
|
|
SERVER_HOST, SERVER_PORT, USE_RELOADER, LOG_LEVEL,
|
|
KEEP_ALIVE_TIMEOUT, READ_TIMEOUT, WRITE_TIMEOUT
|
|
)
|
|
from utils import logger, log_structured
|
|
from json_utils import CustomJSONProvider
|
|
from ai_core import initialize_global_index, initialize_vector_index, initialize_graphrag_components # Import initialization functions
|
|
from shared_state import global_workflow_agent, is_agent_available, get_graphrag_status # Import shared state
|
|
from routes import register_routes
|
|
from init_mongodb import init_mongodb # Your MongoDB initialization script
|
|
|
|
# --- Flask App Initialization ---
|
|
app = Flask(__name__)
|
|
|
|
# Apply custom JSON provider for handling special types (LlamaIndex objects, etc.)
|
|
app.json_provider_class = CustomJSONProvider
|
|
app.json = CustomJSONProvider(app)
|
|
|
|
# Configuration
|
|
app.config['MAX_CONTENT_LENGTH'] = MAX_CONTENT_LENGTH
|
|
if APPLICATION_ROOT:
|
|
app.config['APPLICATION_ROOT'] = APPLICATION_ROOT
|
|
# If using APPLICATION_ROOT, you might need to adjust route prefixes
|
|
# or use a Blueprint with url_prefix=APPLICATION_ROOT
|
|
log_structured('info', f"Flask Application Root set to: {APPLICATION_ROOT}")
|
|
|
|
# CORS Configuration
|
|
CORS(app,
|
|
resources={r"/*": {"origins": CORS_ALLOWED_ORIGINS}},
|
|
supports_credentials=CORS_SUPPORTS_CREDENTIALS,
|
|
# Expose custom headers if needed by the frontend
|
|
# expose_headers=["Content-Disposition"] # Example for downloads
|
|
)
|
|
log_structured('info', f"CORS configured for origins: {CORS_ALLOWED_ORIGINS}")
|
|
|
|
|
|
# --- Register Routes ---
|
|
# Pass the app object to the function in routes.py
|
|
register_routes(app)
|
|
log_structured('info', "Flask routes registered.")
|
|
|
|
|
|
def _launch_graphrag_background_thread():
|
|
"""Launch GraphRAG initialization in a daemon thread with its own event loop.
|
|
|
|
Neo4j operations (connect, get_triplets, upsert_nodes, build_communities)
|
|
are synchronous blocking calls. Running them in an asyncio task would block
|
|
the main event loop and prevent Hypercorn from serving requests. A separate
|
|
thread with its own event loop avoids this entirely.
|
|
"""
|
|
def _run():
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
try:
|
|
success = loop.run_until_complete(initialize_graphrag_components())
|
|
if success:
|
|
log_structured('info', "Background GraphRAG initialization completed successfully")
|
|
else:
|
|
log_structured('warning', "Background GraphRAG initialization failed — vector search still works")
|
|
except Exception as e:
|
|
log_structured('error', f"Background GraphRAG initialization error: {e}")
|
|
finally:
|
|
loop.close()
|
|
|
|
t = threading.Thread(target=_run, name="graphrag-init", daemon=True)
|
|
t.start()
|
|
|
|
|
|
# --- Startup Function ---
|
|
async def startup_event() -> bool:
|
|
"""Tasks to run when the application starts.
|
|
|
|
Phase 1 (synchronous): MongoDB + vector index + agent (~1-2 min).
|
|
Phase 2 (background): GraphRAG components (Neo4j, triples, communities).
|
|
|
|
Returns:
|
|
bool: True if Phase 1 completed successfully (server is usable).
|
|
"""
|
|
log_structured('info', "Application startup sequence initiated.")
|
|
all_success = True
|
|
|
|
# 1. Initialize MongoDB Connection & Schema
|
|
log_structured('info', "Initializing MongoDB connection...")
|
|
try:
|
|
if init_mongodb():
|
|
log_structured('info', "MongoDB initialized successfully.")
|
|
else:
|
|
log_structured('warning', "MongoDB initialization script finished, but reported issues.")
|
|
all_success = False
|
|
except Exception as db_err:
|
|
log_structured('critical', "FATAL: MongoDB initialization failed.", {'error': str(db_err)})
|
|
all_success = False
|
|
|
|
# 2. Phase 1: Initialize vector index and agent (fast path)
|
|
log_structured('info', "Phase 1: Initializing vector index and agent...")
|
|
vector_success = await initialize_vector_index()
|
|
|
|
if not is_agent_available():
|
|
log_structured('critical', "After initialize_vector_index, agent is still unavailable")
|
|
all_success = False
|
|
elif not vector_success:
|
|
log_structured('warning', "Vector initialization reported failure")
|
|
all_success = False
|
|
else:
|
|
log_structured('info', "Phase 1 complete: vector index and agent are available")
|
|
|
|
# 3. Phase 2: Launch GraphRAG initialization in a background THREAD.
|
|
# This MUST run in a thread, not an asyncio task, because the Neo4j
|
|
# operations (connect, get_triplets, build_communities, etc.) are
|
|
# synchronous blocking calls. If run as an asyncio task, they block
|
|
# the event loop and prevent Hypercorn from accepting connections.
|
|
if vector_success:
|
|
log_structured('info', "Phase 2: Launching GraphRAG initialization in background thread...")
|
|
_launch_graphrag_background_thread()
|
|
else:
|
|
log_structured('warning', "Skipping GraphRAG background init because vector init failed")
|
|
|
|
log_structured('info', f"Application startup sequence complete. Server ready: {all_success}")
|
|
return all_success
|
|
|
|
|
|
# --- Shutdown Function (Optional) ---
|
|
async def shutdown_event():
|
|
"""Tasks to run when the application stops."""
|
|
log_structured('info', "Application shutdown sequence initiated.")
|
|
# Add any cleanup tasks here (e.g., closing connections if not handled elsewhere)
|
|
# Note: Hypercorn might not always guarantee graceful shutdown execution.
|
|
log_structured('info', "Application shutdown sequence complete.")
|
|
|
|
|
|
# --- Main Execution Block ---
|
|
if __name__ == '__main__':
|
|
from hypercorn.config import Config as HypercornConfig
|
|
from hypercorn.asyncio import serve as hypercorn_serve
|
|
|
|
# Create Hypercorn config object
|
|
config = HypercornConfig()
|
|
|
|
# Basic settings
|
|
config.bind = [f"{SERVER_HOST}:{SERVER_PORT}"]
|
|
config.use_reloader = USE_RELOADER
|
|
config.accesslog = '-' # Log to stdout/stderr
|
|
config.errorlog = '-' # Log to stdout/stderr
|
|
config.loglevel = LOG_LEVEL.upper()
|
|
config.worker_class = 'asyncio'
|
|
|
|
# Timeouts (ensure these are floats or ints)
|
|
config.keep_alive_timeout = float(KEEP_ALIVE_TIMEOUT)
|
|
config.read_timeout = float(READ_TIMEOUT)
|
|
config.write_timeout = float(WRITE_TIMEOUT)
|
|
|
|
# Request size limits (check Hypercorn docs for exact names, might vary slightly)
|
|
# These might apply to HTTP/1.1 or HTTP/2 differently.
|
|
# config.h11_max_incomplete_size = MAX_CONTENT_LENGTH # Example for HTTP/1.1
|
|
# config.h2_max_concurrent_streams = 100 # Example for HTTP/2
|
|
# config.max_app_buffer_size = MAX_CONTENT_LENGTH # Another potential setting
|
|
# It's safer to configure these via a reverse proxy (like Nginx) in production.
|
|
# Hypercorn's defaults are usually reasonable. Let's comment these out for now.
|
|
|
|
# Shutdown handler only — startup is handled manually in run_server_with_startup()
|
|
# so we can launch GraphRAG as a background task in the same event loop.
|
|
# Do NOT register startup_event as a hook here (it would run twice).
|
|
config.shutdown_hooks = [shutdown_event]
|
|
|
|
log_structured('info', f"Starting Hypercorn server on {SERVER_HOST}:{SERVER_PORT}")
|
|
log_structured('info', f"Reload mode: {'Enabled' if USE_RELOADER else 'Disabled'}")
|
|
|
|
async def run_server_with_startup():
|
|
"""Run startup (with background GraphRAG init) then serve."""
|
|
# Phase 1 runs synchronously, Phase 2 launches as background task
|
|
startup_success = await startup_event()
|
|
|
|
# Double-check agent is initialized
|
|
if not is_agent_available():
|
|
log_structured('critical', "After startup, agent is still unavailable. Forcing re-initialization...")
|
|
vector_success = await initialize_vector_index()
|
|
if not vector_success or not is_agent_available():
|
|
log_structured('critical', "Emergency initialization failed. Server will run but chat will be impaired.")
|
|
else:
|
|
log_structured('info', "Emergency initialization succeeded.")
|
|
# Also try GraphRAG in background thread
|
|
_launch_graphrag_background_thread()
|
|
|
|
# Start serving — background GraphRAG init continues in same event loop
|
|
await hypercorn_serve(app, config)
|
|
|
|
try:
|
|
asyncio.run(run_server_with_startup())
|
|
except KeyboardInterrupt:
|
|
log_structured('info', "Server stopped manually (KeyboardInterrupt).")
|
|
except Exception as run_err:
|
|
log_structured('critical', "Hypercorn server failed to run.", {'error': str(run_err)})
|
|
sys.exit(1) |