# netflix_chatbot/main.py import asyncio import os import sys import threading from flask import Flask from flask_cors import CORS # Ensure the project directory is in the Python path current_dir = os.path.dirname(os.path.abspath(__file__)) if current_dir not in sys.path: sys.path.insert(0, current_dir) # Import necessary components from our modules from config import ( APPLICATION_ROOT, MAX_CONTENT_LENGTH, CORS_ALLOWED_ORIGINS, CORS_SUPPORTS_CREDENTIALS, SERVER_HOST, SERVER_PORT, USE_RELOADER, LOG_LEVEL, KEEP_ALIVE_TIMEOUT, READ_TIMEOUT, WRITE_TIMEOUT ) from utils import logger, log_structured from json_utils import CustomJSONProvider from ai_core import initialize_global_index, initialize_vector_index, initialize_graphrag_components # Import initialization functions from shared_state import global_workflow_agent, is_agent_available, get_graphrag_status # Import shared state from routes import register_routes from init_mongodb import init_mongodb # Your MongoDB initialization script # --- Flask App Initialization --- app = Flask(__name__) # Apply custom JSON provider for handling special types (LlamaIndex objects, etc.) 
# Flask creates ``app.json`` from ``json_provider_class`` during construction,
# so after ``Flask(__name__)`` the provider instance must be replaced too.
app.json_provider_class = CustomJSONProvider
app.json = CustomJSONProvider(app)

# Configuration
app.config['MAX_CONTENT_LENGTH'] = MAX_CONTENT_LENGTH
if APPLICATION_ROOT:
    app.config['APPLICATION_ROOT'] = APPLICATION_ROOT
    # If using APPLICATION_ROOT, you might need to adjust route prefixes
    # or use a Blueprint with url_prefix=APPLICATION_ROOT
    log_structured('info', f"Flask Application Root set to: {APPLICATION_ROOT}")

# CORS Configuration
CORS(
    app,
    resources={r"/*": {"origins": CORS_ALLOWED_ORIGINS}},
    supports_credentials=CORS_SUPPORTS_CREDENTIALS,
    # Expose custom headers if needed by the frontend
    # expose_headers=["Content-Disposition"]  # Example for downloads
)
log_structured('info', f"CORS configured for origins: {CORS_ALLOWED_ORIGINS}")

# --- Register Routes ---
# Pass the app object to the function in routes.py
register_routes(app)
log_structured('info', "Flask routes registered.")


# --- GraphRAG background initialization ---
# Both startup_event() and the emergency path in run_server_with_startup() can
# request a GraphRAG launch. This state keeps the launch idempotent so two
# threads never build the GraphRAG/Neo4j components concurrently.
_graphrag_launch_lock = threading.Lock()
_graphrag_thread = None  # most recently started init thread, if any
_graphrag_init_succeeded = False  # set by the worker when init reports success


def _launch_graphrag_background_thread():
    """Launch GraphRAG initialization in a daemon thread with its own event loop.

    Neo4j operations (connect, get_triplets, upsert_nodes, build_communities) are
    synchronous blocking calls. Running them in an asyncio task would block the
    main event loop and prevent Hypercorn from serving requests. A separate
    thread running its own event loop (via ``asyncio.run``) avoids this.

    The launch is idempotent: if an initialization thread is still running, or a
    previous run already succeeded, no new thread is started.

    Returns:
        threading.Thread | None: the running or newly started thread, or None
        when GraphRAG has already been initialized successfully.
    """
    import traceback  # local: only needed to log background-thread failures

    global _graphrag_thread

    def _run():
        global _graphrag_init_succeeded
        try:
            # asyncio.run creates a fresh loop for this thread, cancels any
            # leftover tasks, shuts down async generators and closes the loop.
            success = asyncio.run(initialize_graphrag_components())
            if success:
                _graphrag_init_succeeded = True
                log_structured('info', "Background GraphRAG initialization completed successfully")
            else:
                log_structured('warning', "Background GraphRAG initialization failed — vector search still works")
        except Exception as e:
            # Thread boundary: nothing above us will see this exception, so log
            # it with the traceback instead of letting the thread die silently.
            log_structured(
                'error',
                f"Background GraphRAG initialization error: {e}",
                {'error': str(e), 'traceback': traceback.format_exc()},
            )

    with _graphrag_launch_lock:
        if _graphrag_init_succeeded:
            log_structured('info', "GraphRAG already initialized; skipping background launch")
            return None
        if _graphrag_thread is not None and _graphrag_thread.is_alive():
            log_structured('info', "GraphRAG initialization already in progress; not launching another thread")
            return _graphrag_thread
        _graphrag_thread = threading.Thread(target=_run, name="graphrag-init", daemon=True)
        _graphrag_thread.start()
        return _graphrag_thread


# --- Startup Function ---
async def startup_event() -> bool:
    """Tasks to run when the application starts.

    Phase 1 (awaited): MongoDB + vector index + agent (~1-2 min).
    Phase 2 (background thread): GraphRAG components (Neo4j, triples, communities).

    Failures are logged rather than raised, so the server can still start in a
    degraded state.

    Returns:
        bool: True if Phase 1 completed successfully (server is usable).
    """
    log_structured('info', "Application startup sequence initiated.")
    all_success = True

    # 1. Initialize MongoDB Connection & Schema
    log_structured('info', "Initializing MongoDB connection...")
    try:
        if init_mongodb():
            log_structured('info', "MongoDB initialized successfully.")
        else:
            log_structured('warning', "MongoDB initialization script finished, but reported issues.")
            all_success = False
    except Exception as db_err:
        log_structured('critical', "FATAL: MongoDB initialization failed.", {'error': str(db_err)})
        all_success = False

    # 2. Phase 1: Initialize vector index and agent (fast path)
    log_structured('info', "Phase 1: Initializing vector index and agent...")
    try:
        vector_success = await initialize_vector_index()
    except Exception as vec_err:
        # Degrade instead of aborting the whole server; the caller can retry.
        log_structured('critical', "Vector index initialization raised an exception.", {'error': str(vec_err)})
        vector_success = False

    if not is_agent_available():
        log_structured('critical', "After initialize_vector_index, agent is still unavailable")
        all_success = False
    elif not vector_success:
        log_structured('warning', "Vector initialization reported failure")
        all_success = False
    else:
        log_structured('info', "Phase 1 complete: vector index and agent are available")

    # 3. Phase 2: Launch GraphRAG initialization in a background THREAD.
    # This MUST run in a thread, not an asyncio task, because the Neo4j
    # operations (connect, get_triplets, build_communities, etc.) are
    # synchronous blocking calls. If run as an asyncio task, they block
    # the event loop and prevent Hypercorn from accepting connections.
    if vector_success:
        log_structured('info', "Phase 2: Launching GraphRAG initialization in background thread...")
        _launch_graphrag_background_thread()
    else:
        log_structured('warning', "Skipping GraphRAG background init because vector init failed")

    log_structured('info', f"Application startup sequence complete. Server ready: {all_success}")
    return all_success


# --- Shutdown Function ---
async def shutdown_event():
    """Tasks to run when the application stops.

    Awaited by run_server_with_startup() after Hypercorn's serve() returns.
    """
    log_structured('info', "Application shutdown sequence initiated.")
    # Add any cleanup tasks here (e.g., closing connections if not handled elsewhere)
    log_structured('info', "Application shutdown sequence complete.")


# --- Main Execution Block ---
if __name__ == '__main__':
    from hypercorn.config import Config as HypercornConfig
    from hypercorn.asyncio import serve as hypercorn_serve

    # Create Hypercorn config object
    config = HypercornConfig()

    # Basic settings
    config.bind = [f"{SERVER_HOST}:{SERVER_PORT}"]
    # NOTE(review): hypercorn.asyncio.serve() is the programmatic API; reloading
    # appears to be implemented by Hypercorn's CLI runner, so this flag may have
    # no effect here — confirm.
    config.use_reloader = USE_RELOADER
    config.accesslog = '-'  # Log to stdout/stderr
    config.errorlog = '-'  # Log to stdout/stderr
    config.loglevel = LOG_LEVEL.upper()
    config.worker_class = 'asyncio'

    # Timeouts (ensure these are floats or ints)
    config.keep_alive_timeout = float(KEEP_ALIVE_TIMEOUT)
    config.read_timeout = float(READ_TIMEOUT)
    # NOTE(review): confirm the installed Hypercorn version reads write_timeout;
    # attributes Config does not know about are simply never consulted.
    config.write_timeout = float(WRITE_TIMEOUT)

    # Request-size limits (h11_max_incomplete_size, h2_max_concurrent_streams,
    # ...) are left at Hypercorn's defaults; in production it is safer to
    # enforce them at a reverse proxy such as Nginx. Flask still applies
    # app.config['MAX_CONTENT_LENGTH'] set above.

    # Startup and shutdown are driven manually from run_server_with_startup():
    # Hypercorn's Config has no startup/shutdown hook settings, so the previous
    # `config.shutdown_hooks = [...]` assignment never caused shutdown_event to run.

    log_structured('info', f"Starting Hypercorn server on {SERVER_HOST}:{SERVER_PORT}")
    log_structured('info', f"Reload mode: {'Enabled' if USE_RELOADER else 'Disabled'}")

    async def run_server_with_startup():
        """Run startup, serve until Hypercorn stops, then run shutdown tasks.

        Phase 1 is awaited here. Phase 2 (GraphRAG) runs in a separate daemon
        thread with its own event loop, so it never blocks request serving.
        """
        await startup_event()

        # Double-check agent is initialized
        if not is_agent_available():
            log_structured('critical', "After startup, agent is still unavailable. Forcing re-initialization...")
            try:
                vector_success = await initialize_vector_index()
            except Exception as init_err:
                log_structured('critical', "Emergency vector index initialization raised an exception.", {'error': str(init_err)})
                vector_success = False

            if not vector_success or not is_agent_available():
                log_structured('critical', "Emergency initialization failed. Server will run but chat will be impaired.")
            else:
                log_structured('info', "Emergency initialization succeeded.")
                # Also try GraphRAG; the launcher is a no-op if a thread from
                # startup_event() is still running or already succeeded.
                _launch_graphrag_background_thread()

        try:
            await hypercorn_serve(app, config)
        finally:
            # Runs on normal return and when the main task is cancelled.
            await shutdown_event()

    try:
        asyncio.run(run_server_with_startup())
    except KeyboardInterrupt:
        log_structured('info', "Server stopped manually (KeyboardInterrupt).")
    except Exception as run_err:
        log_structured('critical', "Hypercorn server failed to run.", {'error': str(run_err)})
        sys.exit(1)