Replace asyncio.ensure_future() with a daemon thread for GraphRAG initialization. The Neo4j driver and NetworkX calls are synchronous and were starving Hypercorn of CPU time on the shared event loop. A separate thread with its own event loop isolates the blocking work so the server accepts connections immediately after Phase 1 completes. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
270 lines
10 KiB
Markdown
270 lines
10 KiB
Markdown
# Post-Deploy Fixes: Background GraphRAG Init
|
|
|
|
## Context
|
|
|
|
After deploying the initial startup optimization (see `graphrag-startup-optimization-guide.md`), two bugs were discovered during production testing that prevented the server from accepting connections promptly. This document describes both bugs and their fixes.
|
|
|
|
These fixes apply to the **main module** (`main.py`) only. No other files are affected.
|
|
|
|
---
|
|
|
|
## Bug 1: Double Startup Execution
|
|
|
|
### Symptom
|
|
|
|
The server took much longer than expected to start accepting connections. Phase 1 (vector init) appeared to run twice in the logs.
|
|
|
|
### Root Cause
|
|
|
|
The original `main.py` had `startup_event` registered as a Hypercorn startup hook:
|
|
|
|
```python
|
|
config.startup_hooks = [startup_event]
|
|
```
|
|
|
|
After the refactor, `startup_event()` was ALSO called manually inside `run_server_with_startup()`:
|
|
|
|
```python
|
|
async def run_server_with_startup():
|
|
startup_success = await startup_event() # <-- manual call
|
|
await hypercorn_serve(app, config) # <-- Hypercorn calls it AGAIN via hooks
|
|
```
|
|
|
|
This caused `startup_event()` to execute **twice**: once manually (where Phase 2 background task was launched), and once by Hypercorn as a startup hook (which blocks the server from accepting connections until it completes). The second execution re-ran `initialize_vector_index()`, delaying server startup.
|
|
|
|
### Fix
|
|
|
|
**Remove `startup_event` from Hypercorn's startup hooks.** Since we call it manually in `run_server_with_startup()`, registering it as a hook causes double execution.
|
|
|
|
Change:
|
|
```python
|
|
config.startup_hooks = [startup_event]
|
|
config.shutdown_hooks = [shutdown_event]
|
|
```
|
|
|
|
To:
|
|
```python
|
|
# Shutdown handler only — startup is handled manually in run_server_with_startup()
|
|
# Do NOT register startup_event as a hook here (it would run twice).
|
|
config.shutdown_hooks = [shutdown_event]
|
|
```
|
|
|
|
---
|
|
|
|
## Bug 2: Blocking Event Loop (the critical one)
|
|
|
|
### Symptom
|
|
|
|
After fixing Bug 1, the logs showed Phase 1 completing in ~7 seconds:
|
|
```
|
|
Phase 1 complete: vector index and agent are available
|
|
Phase 2: Launching GraphRAG initialization in background...
|
|
Application startup sequence complete. Server ready: True
|
|
Starting GraphRAG background initialization
|
|
```
|
|
|
|
But the **chat interface was still unavailable**. The server was not accepting HTTP connections despite logging "Server ready: True". `htop` showed Neo4j Java processes at 100% CPU.
|
|
|
|
### Root Cause
|
|
|
|
The background GraphRAG init was launched with `asyncio.ensure_future()`:
|
|
|
|
```python
|
|
asyncio.ensure_future(_background_graphrag_init())
|
|
```
|
|
|
|
This schedules the coroutine on the **same event loop** as Hypercorn. The problem is that `initialize_graphrag_components()` and the functions it calls contain **synchronous blocking operations**:
|
|
|
|
- `Neo4jPropertyGraphStore()` constructor — synchronous TCP connection to Neo4j
|
|
- `graph_store.get_triplets()` — synchronous Cypher query
|
|
- `property_graph_store.upsert_nodes()` / `upsert_relations()` — synchronous writes
|
|
- `graph_store.build_communities()` — synchronous NetworkX + synchronous LLM calls
|
|
|
|
In Python's asyncio, a coroutine that calls synchronous blocking code **blocks the entire event loop** until the blocking call returns. Even though the coroutine is "in the background", once the event loop gives it a time slice, the blocking Neo4j calls prevent the loop from doing anything else — including running Hypercorn's connection handler.
|
|
|
|
This means: `asyncio.ensure_future()` with synchronous-blocking code is NOT truly non-blocking. The server cannot accept connections until each individual blocking call within the background task completes and yields back.
|
|
|
|
### Fix
|
|
|
|
**Run the GraphRAG init in a separate daemon thread** with its own event loop. This completely isolates the blocking Neo4j work from Hypercorn's event loop.
|
|
|
|
#### Step 1: Add `import threading` to the top of `main.py`
|
|
|
|
```python
|
|
import asyncio
|
|
import os
|
|
import sys
|
|
import threading # <-- ADD THIS
|
|
from flask import Flask
|
|
from flask_cors import CORS
|
|
```
|
|
|
|
#### Step 2: Add a helper function to launch the background thread
|
|
|
|
Place this function **before** `startup_event()` (e.g., after the route registration):
|
|
|
|
```python
|
|
def _launch_graphrag_background_thread():
|
|
"""Launch GraphRAG initialization in a daemon thread with its own event loop.
|
|
|
|
Neo4j operations (connect, get_triplets, upsert_nodes, build_communities)
|
|
are synchronous blocking calls. Running them in an asyncio task would block
|
|
the main event loop and prevent Hypercorn from serving requests. A separate
|
|
thread with its own event loop avoids this entirely.
|
|
"""
|
|
def _run():
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
try:
|
|
success = loop.run_until_complete(initialize_graphrag_components())
|
|
if success:
|
|
log_structured('info', "Background GraphRAG initialization completed successfully")
|
|
else:
|
|
log_structured('warning', "Background GraphRAG initialization failed — vector search still works")
|
|
except Exception as e:
|
|
log_structured('error', f"Background GraphRAG initialization error: {e}")
|
|
finally:
|
|
loop.close()
|
|
|
|
t = threading.Thread(target=_run, name="graphrag-init", daemon=True)
|
|
t.start()
|
|
```
|
|
|
|
Key details:
|
|
- **`daemon=True`**: Thread dies automatically if the main process exits. No cleanup needed.
|
|
- **`asyncio.new_event_loop()`**: Each thread needs its own event loop. Cannot share the main thread's loop.
|
|
- **`loop.run_until_complete()`**: Runs the async `initialize_graphrag_components()` to completion within the thread's own loop.
|
|
- **Thread safety**: This is safe because:
|
|
- Python's GIL protects attribute assignments (`set_graphrag_status()`, `set_graphrag_components()`)
|
|
- `list.append()` is atomic under the GIL (`live_agent.tools.append(graphrag_tool)`)
|
|
- The shared state module uses simple attribute assignment, not complex data structures
|
|
|
|
#### Step 3: Replace `asyncio.ensure_future()` calls with the thread launcher
|
|
|
|
In `startup_event()`, change:
|
|
|
|
```python
|
|
# OLD — blocks event loop:
|
|
asyncio.ensure_future(_background_graphrag_init())
|
|
```
|
|
|
|
To:
|
|
|
|
```python
|
|
# NEW — runs in separate thread, cannot block event loop:
|
|
_launch_graphrag_background_thread()
|
|
```
|
|
|
|
Remove the `_background_graphrag_init()` async helper function entirely — it's no longer needed.
|
|
|
|
If you have an emergency fallback path (e.g., in `run_server_with_startup()`) that also launches GraphRAG in the background, change that too:
|
|
|
|
```python
|
|
# OLD:
|
|
asyncio.ensure_future(initialize_graphrag_components())
|
|
|
|
# NEW:
|
|
_launch_graphrag_background_thread()
|
|
```
|
|
|
|
### Result
|
|
|
|
After this fix:
|
|
- Phase 1 (vector init) completes in ~7-10 seconds
|
|
- Hypercorn binds to the port and starts accepting connections **immediately** after Phase 1
|
|
- GraphRAG init runs in a completely separate thread — Neo4j can peg the CPU at 100% without affecting HTTP request handling
|
|
- The `/status` endpoint shows `graphrag_initializing: true` while the thread runs, then `graphrag_ready: true` when it finishes
|
|
- Chat requests work immediately using vector-only search, then get GraphRAG context once the background thread completes
|
|
|
|
---
|
|
|
|
## Summary of All Changes to `main.py`
|
|
|
|
Here is the complete final state of the key sections (adapt to your codebase):
|
|
|
|
### Imports
|
|
```python
|
|
import asyncio
|
|
import os
|
|
import sys
|
|
import threading # ADDED
|
|
from flask import Flask
|
|
from flask_cors import CORS
|
|
```
|
|
|
|
### AI Core imports
|
|
```python
|
|
from ai_core import initialize_global_index, initialize_vector_index, initialize_graphrag_components
|
|
from shared_state import global_workflow_agent, is_agent_available, get_graphrag_status
|
|
```
|
|
|
|
### Background thread launcher (NEW function)
|
|
```python
|
|
def _launch_graphrag_background_thread():
|
|
def _run():
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
try:
|
|
success = loop.run_until_complete(initialize_graphrag_components())
|
|
if success:
|
|
log_structured('info', "Background GraphRAG initialization completed successfully")
|
|
else:
|
|
log_structured('warning', "Background GraphRAG initialization failed — vector search still works")
|
|
except Exception as e:
|
|
log_structured('error', f"Background GraphRAG initialization error: {e}")
|
|
finally:
|
|
loop.close()
|
|
|
|
t = threading.Thread(target=_run, name="graphrag-init", daemon=True)
|
|
t.start()
|
|
```
|
|
|
|
### startup_event() — Phase 2 section
|
|
```python
|
|
if vector_success:
|
|
log_structured('info', "Phase 2: Launching GraphRAG initialization in background thread...")
|
|
_launch_graphrag_background_thread() # Thread, not asyncio task
|
|
else:
|
|
log_structured('warning', "Skipping GraphRAG background init because vector init failed")
|
|
```
|
|
|
|
### Hypercorn config — NO startup hook
|
|
```python
|
|
# Do NOT register startup_event as a hook — it's called manually
|
|
config.shutdown_hooks = [shutdown_event]
|
|
```
|
|
|
|
### run_server_with_startup() — single asyncio.run() wrapping everything
|
|
```python
|
|
async def run_server_with_startup():
|
|
startup_success = await startup_event()
|
|
|
|
if not is_agent_available():
|
|
log_structured('critical', "Agent unavailable. Forcing re-init...")
|
|
vector_success = await initialize_vector_index()
|
|
if not vector_success or not is_agent_available():
|
|
log_structured('critical', "Emergency init failed.")
|
|
else:
|
|
log_structured('info', "Emergency init succeeded.")
|
|
_launch_graphrag_background_thread() # Thread, not asyncio task
|
|
|
|
await hypercorn_serve(app, config)
|
|
|
|
try:
|
|
asyncio.run(run_server_with_startup())
|
|
except KeyboardInterrupt:
|
|
log_structured('info', "Server stopped manually.")
|
|
```
|
|
|
|
---
|
|
|
|
## Key Lesson: asyncio.ensure_future() vs threading.Thread
|
|
|
|
| | `asyncio.ensure_future()` | `threading.Thread(daemon=True)` |
|
|
|---|---|---|
|
|
| **Where it runs** | Same event loop as the server | Separate OS thread |
|
|
| **Blocking sync calls** | **Block the entire event loop** — server can't handle requests | Isolated — main event loop stays free |
|
|
| **When to use** | Truly async I/O (aiohttp, async DB drivers) | Synchronous/blocking libraries (neo4j, networkx, synchronous LLM calls) |
|
|
| **Thread safety** | N/A (single-threaded) | Safe under Python's GIL for simple attribute assignments and list.append() |
|
|
|
|
**Rule of thumb**: If your "background task" calls any synchronous library (Neo4j driver, NetworkX, synchronous HTTP clients), use a thread. `asyncio.ensure_future()` only helps if the entire call chain is truly async.
|