# GraphRAG Startup Optimization Guide

## Purpose

This document describes a set of changes to drastically reduce startup time for an app built on the LlamaIndex + Neo4j GraphRAG + Flask/Hypercorn architecture. The changes achieve two things:

1. **Triple caching**: Extracted entities/relationships are cached to disk so Neo4j can be repopulated without expensive LLM re-extraction.
2. **Background initialization**: The server starts serving requests with vector-only search in ~1-2 minutes, while GraphRAG components initialize in the background.

---

## Problem

On a cold start (or when Neo4j loses its data), the app spends:

| Phase | Duration | Already Cached? |
|-------|----------|-----------------|
| Document parsing (LlamaParse) | 20-30 min | Yes (vector index on disk) |
| Vector embeddings | 10-15 min | Yes (vector index on disk) |
| **Triple extraction** | **10-20 min** | **No (only lives in Neo4j)** |
| **Community summarization** | **10-30 min** | Yes (pickle cache) |
| Community detection | 5-10 min | Yes (pickle cache) |

The two bolded phases are the bottleneck. The app can't serve any requests until all of this completes.

---

## Architecture Assumptions

This guide assumes your app has:

- A **graph store wrapper class** (e.g., `GraphRAGStore`) that wraps `Neo4jPropertyGraphStore` and handles community detection/summarization with existing pickle caching for community data.
- A **`create_graph_components()` function** that connects to Neo4j, checks for existing data, and either reuses data or runs LLM extraction via `PropertyGraphIndex`.
- A **monolithic initialization function** (e.g., `initialize_global_index()`) that does everything: LLM setup, vector index loading, GraphRAG setup, agent creation.
- A **shared state module** that stores global references to the agent, index, and GraphRAG components.
- A **main.py** that runs startup synchronously before the server starts.
- A **routes module** with a `/status` endpoint and a `/chat` endpoint that checks agent availability.

Adapt file names, class names, and function signatures to match your codebase.

---

## Change 1: Cache Extracted Triples to Disk

### What

Add two methods to your `GraphRAGStore` class:
- `save_triples_to_cache()`: After successful LLM extraction, save entities and relations to a pickle file.
- `load_triples_from_cache()`: On startup, if Neo4j is empty but the cache exists, restore triples to Neo4j directly.

Then modify `create_graph_components()` to use these methods.

### File: Graph RAG Integration Module (e.g., `graph_rag_integration.py`)

#### 1a. Add a triples cache file path to the store class

In the class-level constants of your `GraphRAGStore` (next to the existing community cache paths), add:

```python
TRIPLES_CACHE_FILE = CACHE_DIR / "neo4j_triples.pickle"
```

#### 1b. Add `save_triples_to_cache()` method

Add this method to `GraphRAGStore`, after `get_triplets()`:

```python
def save_triples_to_cache(self):
    """Save extracted triples (entities + relationships) from Neo4j to a disk cache.

    This allows restoring triples to Neo4j without expensive LLM re-extraction
    if Neo4j data is lost (e.g., container recreated without volume persistence).
    """
    try:
        triplets = self.get_triplets()
        if not triplets:
            log_structured('warning', 'No triplets to cache — Neo4j appears empty')
            return False

        # Collect unique entities and relations from the triplets
        entities = {}
        relations = []
        for entity1, relation, entity2 in triplets:
            entities[entity1.name] = entity1
            entities[entity2.name] = entity2
            relations.append(relation)

        cache_data = {
            'entities': list(entities.values()),
            'relations': relations,
            'triplet_count': len(triplets),
        }

        with open(self.TRIPLES_CACHE_FILE, 'wb') as f:
            pickle.dump(cache_data, f)

        log_structured('info', 'Successfully cached Neo4j triples to disk', {
            'entity_count': len(entities),
            'relation_count': len(relations),
            'triplet_count': len(triplets),
            'cache_file': str(self.TRIPLES_CACHE_FILE)
        })
        return True
    except Exception as e:
        log_structured('error', f'Error saving triples cache: {e}')
        return False
```

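The save method above writes straight to the final path, so a crash during `pickle.dump` could leave a truncated file that the next restore would fail to load. A minimal sketch of one way to guard against that, assuming a hypothetical helper named `atomic_pickle_dump` (it is not part of the original change set): write to a temporary file in the same directory, then swap it into place with `os.replace`.

```python
import os
import pickle
import tempfile
from pathlib import Path


def atomic_pickle_dump(obj, target) -> None:
    """Hypothetical helper: write `obj` to `target` without exposing a partial file."""
    target = Path(target)
    target.parent.mkdir(parents=True, exist_ok=True)
    # Create the temp file next to the target so the final rename stays on one filesystem.
    fd, tmp_name = tempfile.mkstemp(dir=target.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as f:
            pickle.dump(obj, f)
        # os.replace overwrites the destination; on POSIX the rename is atomic.
        os.replace(tmp_name, target)
    except BaseException:
        # Remove the temp file if anything failed before the swap, then re-raise.
        if os.path.exists(tmp_name):
            os.remove(tmp_name)
        raise
```

If you adopt something like this, the `with open(...)` / `pickle.dump(...)` pair in `save_triples_to_cache()` could be replaced by a single call such as `atomic_pickle_dump(cache_data, self.TRIPLES_CACHE_FILE)`.
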
#### 1c. Add `load_triples_from_cache()` method

Add this method right after `save_triples_to_cache()`:

```python
def load_triples_from_cache(self):
    """Load triples from disk cache and restore them to Neo4j.

    Returns True if triples were successfully restored, False otherwise.
    """
    if not self.TRIPLES_CACHE_FILE.exists():
        log_structured('info', 'No triples cache file found')
        return False

    try:
        with open(self.TRIPLES_CACHE_FILE, 'rb') as f:
            cache_data = pickle.load(f)

        entities = cache_data.get('entities', [])
        relations = cache_data.get('relations', [])

        if not entities:
            log_structured('warning', 'Triples cache file exists but contains no entities')
            return False

        log_structured('info', 'Restoring triples from disk cache to Neo4j', {
            'entity_count': len(entities),
            'relation_count': len(relations),
            'cached_triplet_count': cache_data.get('triplet_count', 'unknown')
        })

        # Restore entities (nodes) to Neo4j
        self.property_graph_store.upsert_nodes(entities)
        log_structured('info', f'Restored {len(entities)} entity nodes to Neo4j')

        # Restore relations to Neo4j
        if relations:
            self.property_graph_store.upsert_relations(relations)
            log_structured('info', f'Restored {len(relations)} relations to Neo4j')

        # Verify restoration
        restored_triplets = self.get_triplets()
        log_structured('info', f'Neo4j now contains {len(restored_triplets)} triplets after cache restore')

        return len(restored_triplets) > 0
    except Exception as e:
        log_structured('error', f'Error restoring triples from cache: {e}')
        return False
```

#### 1d. Modify `create_graph_components()` to use the cache

The existing function has two branches:
1. Neo4j has data and `force_reindex=False` → skip indexing
2. Else → run LLM extraction

Change this to **three** branches:

```
1. Neo4j has data and !force_reindex → skip indexing (also save triples to cache if not already cached)
2. Neo4j is empty and !force_reindex and triples cache exists → restore from cache
3. Else → full LLM extraction (also save triples to cache after)
```

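The three-way decision can be isolated as a small pure function, which makes it easy to check every combination of inputs. This is only an illustrative sketch: `choose_branch` and the returned labels are hypothetical names, not part of the original code.

```python
def choose_branch(has_existing_content: bool, force_reindex: bool, cache_exists: bool) -> str:
    """Hypothetical helper mirroring the three branches listed above."""
    if has_existing_content and not force_reindex:
        # Branch 1: Neo4j already holds triples, reuse them.
        return "reuse_neo4j"
    if not has_existing_content and not force_reindex and cache_exists:
        # Branch 2: Neo4j is empty but the pickle cache can repopulate it.
        return "restore_from_cache"
    # Branch 3: forced reindex, or nothing to reuse or restore.
    return "full_extraction"
```
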
Here is the full replacement logic for the branching section inside `create_graph_components()`:

```python
# Check if Neo4j already has content
triplets = graph_store.get_triplets()
has_existing_content = len(triplets) > 0

log_structured('info', f'Neo4j check: Found {len(triplets)} triplets')

if has_existing_content and not force_reindex:
    # BRANCH 1: Neo4j has data, so use it, but also ensure the disk cache exists
    log_structured('info', f'Neo4j already contains {len(triplets)} triplets. Skipping indexing.')

    # Ensure triples are also cached to disk for future recovery
    if not graph_store.TRIPLES_CACHE_FILE.exists():
        log_structured('info', 'Neo4j has data but no triples cache on disk — creating cache now')
        graph_store.save_triples_to_cache()

    property_graph_index = PropertyGraphIndex(
        nodes=[],
        property_graph_store=property_graph_store,
    )

    if not graph_store.communities_built:
        log_structured('info', 'Building graph communities from existing Neo4j data')
        try:
            graph_store.build_communities()
        except Exception as e:
            log_structured('error', f'Error building communities: {e}')

elif not has_existing_content and not force_reindex and graph_store.TRIPLES_CACHE_FILE.exists():
    # BRANCH 2: Neo4j is empty but the triples cache exists, so restore from cache
    log_structured('info', 'Neo4j is empty but triples cache exists. Restoring from disk cache.')

    restored = graph_store.load_triples_from_cache()
    if restored:
        log_structured('info', 'Successfully restored triples from cache.')

        property_graph_index = PropertyGraphIndex(
            nodes=[],
            property_graph_store=property_graph_store,
        )

        if not graph_store.communities_built:
            try:
                graph_store.build_communities()
            except Exception as e:
                log_structured('error', f'Error building communities from restored data: {e}')
    else:
        # Cache restore failed: fall back to LLM extraction
        log_structured('warning', 'Triples cache restore failed. Falling back to LLM extraction.')
        if not nodes:
            raise ValueError("Nodes must be provided when Neo4j is empty and cache restore fails")

        kg_extractor = GraphRAGExtractor(
            llm=llm,
            extract_prompt=KG_TRIPLET_EXTRACT_TMPL,
            max_paths_per_chunk=max_paths_per_chunk,
            parse_fn=custom_parse_fn,
        )

        property_graph_index = PropertyGraphIndex(
            nodes=nodes,
            kg_extractors=[kg_extractor],
            property_graph_store=property_graph_store,
            show_progress=True,
        )

        graph_store.save_triples_to_cache()

        try:
            graph_store.build_communities()
        except Exception as e:
            log_structured('error', f'Error building communities: {e}')

else:
    # BRANCH 3: Full LLM extraction (force_reindex or no cache)
    if not nodes:
        raise ValueError("Nodes must be provided for indexing when Neo4j is empty or force_reindex=True")

    kg_extractor = GraphRAGExtractor(
        llm=llm,
        extract_prompt=KG_TRIPLET_EXTRACT_TMPL,
        max_paths_per_chunk=max_paths_per_chunk,
        parse_fn=custom_parse_fn,
    )

    if has_existing_content and force_reindex:
        # Clear Neo4j before re-extraction
        try:
            from neo4j import GraphDatabase
            driver = GraphDatabase.driver(NEO4J_URL, auth=(NEO4J_USERNAME, NEO4J_PASSWORD))
            with driver.session() as session:
                session.run("MATCH (n) DETACH DELETE n")
            driver.close()
        except Exception as e:
            log_structured('warning', f'Error clearing Neo4j database: {e}')

    property_graph_index = PropertyGraphIndex(
        nodes=nodes,
        kg_extractors=[kg_extractor],
        property_graph_store=property_graph_store,
        show_progress=True,
    )

    # Cache the newly extracted triples
    graph_store.save_triples_to_cache()

    try:
        graph_store.build_communities()
    except Exception as e:
        log_structured('error', f'Error building communities: {e}')

return graph_store, property_graph_index
```

---

## Change 2: Add GraphRAG Status Flags to Shared State

### File: Shared State Module (e.g., `shared_state.py`)

#### 2a. Add new module-level variables

After the existing GraphRAG component variables, add:

```python
# GraphRAG initialization status
graphrag_ready = False
graphrag_initializing = False
graphrag_error = None
```

#### 2b. Add setter/getter functions

Add these functions before `is_agent_available()`:

```python
# Sentinel meaning "leave the stored error unchanged", so that callers
# can pass error=None explicitly to clear an error from a previous attempt.
_UNSET = object()


def set_graphrag_status(ready=None, initializing=None, error=_UNSET):
    """Update GraphRAG initialization status flags.

    Omitted arguments are left unchanged. Pass error=None to clear a stored error.
    """
    global graphrag_ready, graphrag_initializing, graphrag_error
    from utils import log_structured

    if ready is not None:
        graphrag_ready = ready
    if initializing is not None:
        graphrag_initializing = initializing
    if error is not _UNSET:
        graphrag_error = error

    log_structured('info', 'GraphRAG status updated', {
        'ready': graphrag_ready,
        'initializing': graphrag_initializing,
        'error': str(graphrag_error) if graphrag_error else None
    })


def get_graphrag_status():
    """Get current GraphRAG initialization status."""
    return {
        'ready': graphrag_ready,
        'initializing': graphrag_initializing,
        'error': str(graphrag_error) if graphrag_error else None
    }
```

---

## Change 3: Split Initialization into Two Phases

### File: AI Core Module (e.g., `ai_core.py`)

This is the largest change. You'll split your monolithic initialization function into:

1. **`initialize_vector_index()`**: Fast path (~1-2 min). Loads/creates vector index, creates vector query engine, creates agent with vector-only tools, attaches the `run` method. The server becomes usable after this completes.
2. **`initialize_graphrag_components()`**: Background path. Connects to Neo4j, restores/extracts triples, builds communities, creates GraphRAG query engine, and **dynamically adds the GraphRAG tool** to the existing agent's tools list.
3. **`initialize_global_index()`**: Kept as backward-compatible wrapper that calls both in sequence.

#### 3a. Update imports from shared_state

Add `set_graphrag_status` to your shared_state imports:

```python
from shared_state import (
    ...,
    set_graphrag_status
)
```

#### 3b. Extract your GraphRAG Tool class to module level

If your GraphRAG tool class (the `BaseTool` subclass that wraps `GraphRAGQueryEngine`) is defined inside the initialization function, move it to module level. This makes it reusable from both the init function and the background init function.

The tool class should remain functionally identical; just move it out of the function body to module scope. Example:

```python
class GraphRAGTool(BaseTool):
    """Tool that queries using both vector and graph-based retrieval."""

    def __init__(self, query_engine):
        self.query_engine = query_engine
        self._metadata = ToolMetadata(
            name="answerquestionswith_graphrag",
            description="Your tool description here"
        )

    @property
    def metadata(self):
        return self._metadata

    def __call__(self, query_str: str) -> ToolOutput:
        # ... existing implementation unchanged ...
        pass

    async def acall(self, input: str) -> ToolOutput:
        return self.__call__(input)
```

#### 3c. Create `initialize_vector_index()`

This function contains everything from your original `initialize_global_index()` **except** the GraphRAG-related code:

- LLM and embedding model setup
- Global Settings configuration
- Vector index loading or creation (including document processing for cold starts)
- Vector query engine creation
- Agent creation with **vector-only** tools
- `simple_run` method attachment and agent testing

**Key difference from the original**: No GraphRAG components, no Neo4j connection, no community building. The agent is created with only the vector query tool.

The `simple_run` function works without modification because it dynamically looks up the GraphRAG tool:
```python
graphrag_tool = next((t for t in global_workflow_agent.tools if 'graphrag' in t.metadata.name.lower()), None)
if graphrag_tool:
    ...  # use it
# else: falls through to vector-only path
```

When GraphRAG isn't initialized yet, `graphrag_tool` is `None` and the code gracefully falls back to the direct response path.

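To see why the lookup picks up a tool that is appended later, here is a self-contained sketch using stand-in classes. `FakeAgent`, `FakeTool`, `FakeMetadata`, `find_graphrag_tool`, and the tool name `vector_query` are all hypothetical; only the `next(...)` expression and the `answerquestionswith_graphrag` name come from this guide.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class FakeMetadata:
    name: str


@dataclass
class FakeTool:
    metadata: FakeMetadata


@dataclass
class FakeAgent:
    tools: List[FakeTool] = field(default_factory=list)


def find_graphrag_tool(agent: FakeAgent) -> Optional[FakeTool]:
    """Same lookup as in simple_run, evaluated against the live tools list."""
    return next((t for t in agent.tools if "graphrag" in t.metadata.name.lower()), None)


# Phase 1: the agent starts with a vector-only tool.
agent = FakeAgent(tools=[FakeTool(FakeMetadata("vector_query"))])
before = find_graphrag_tool(agent)

# Phase 2: background init appends the GraphRAG tool to the same list object.
agent.tools.append(FakeTool(FakeMetadata("answerquestionswith_graphrag")))
after = find_graphrag_tool(agent)
```

Here `before` is `None` and `after` is the appended tool, because the lookup reads the list at call time instead of capturing a snapshot when the agent is created.
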
#### 3d. Create `initialize_graphrag_components()`

This function handles everything GraphRAG-related:

```python
async def initialize_graphrag_components() -> bool:
    """Initialize GraphRAG components in the background."""
    set_graphrag_status(initializing=True, ready=False, error=None)

    try:
        # Get current index and agent from shared state
        from shared_state import global_workflow_agent as current_agent
        import shared_state
        index = shared_state.global_index

        if index is None:
            raise RuntimeError("Vector index must be initialized first")
        if current_agent is None:
            raise RuntimeError("Agent must be initialized first")

        llm = Settings.llm

        # Connect to Neo4j
        property_graph_store = Neo4jPropertyGraphStore(
            username=NEO4J_USERNAME,
            password=NEO4J_PASSWORD,
            url=NEO4J_URL
        )

        # Check Neo4j state and create/restore components
        temp_graph_store = GraphRAGStore(property_graph_store)
        triplets = temp_graph_store.get_triplets()
        neo4j_has_data = len(triplets) > 0

        if neo4j_has_data:
            graph_store, property_graph_index = create_graph_components(
                llm=llm, force_reindex=False
            )
        else:
            # Get nodes from vector index for potential extraction
            vector_nodes = []
            if hasattr(index, 'docstore') and index.docstore:
                vector_nodes = list(index.docstore.docs.values())

            if not vector_nodes:
                raise ValueError("No nodes available for GraphRAG indexing")

            # create_graph_components will try cache restore first, then LLM extraction
            graph_store, property_graph_index = create_graph_components(
                llm=llm,
                nodes=vector_nodes,
                max_paths_per_chunk=10,
                force_reindex=False  # Allow cache restore path
            )

        # Ensure communities are built
        if not hasattr(graph_store, 'communities_built') or not graph_store.communities_built:
            graph_store.build_communities()

        # Create GraphRAG query engine
        vector_retriever = VectorIndexRetriever(
            index=index, similarity_top_k=SIMILARITY_TOP_K
        )

        graphrag_query_engine = create_graphrag_query_engine(
            vector_retriever=vector_retriever,
            graph_store=graph_store,
            llm=llm,
            similarity_top_k=SIMILARITY_TOP_K
        )

        # Store in shared state
        set_graphrag_components(
            graph_store=graph_store,
            property_graph_index=property_graph_index,
            graphrag_query_engine=graphrag_query_engine
        )

        # *** KEY STEP: Add GraphRAG tool to the EXISTING agent ***
        graphrag_tool = GraphRAGTool(graphrag_query_engine)

        from shared_state import global_workflow_agent as live_agent
        if live_agent is not None:
            live_agent.tools.append(graphrag_tool)
            log_structured('info', 'Added GraphRAG tool to existing agent', {
                'total_tools': len(live_agent.tools),
                'tool_names': [t.metadata.name for t in live_agent.tools]
            })

        set_graphrag_status(ready=True, initializing=False)
        return True

    except Exception as e:
        log_structured('error', f'GraphRAG background initialization failed: {e}')
        set_graphrag_status(ready=False, initializing=False, error=e)
        return False
```

**Critical detail**: The GraphRAG tool is added to the agent's tools list dynamically via `live_agent.tools.append(graphrag_tool)`. Because `simple_run` does a dynamic lookup on `global_workflow_agent.tools` every time it runs, the GraphRAG tool becomes available to all subsequent requests without restarting the agent.

#### 3e. Keep `initialize_global_index()` as a backward-compatible wrapper

```python
async def initialize_global_index() -> bool:
    """Backward-compatible wrapper: initializes everything synchronously."""
    vector_success = await initialize_vector_index()
    if not vector_success:
        return False

    graphrag_success = await initialize_graphrag_components()
    if not graphrag_success:
        log_structured('warning', 'GraphRAG init failed, but vector search is available')

    return True
```

---

## Change 4: Update Main Startup for Background Init

### File: Main Module (e.g., `main.py`)

#### 4a. Update imports

```python
from ai_core import initialize_global_index, initialize_vector_index, initialize_graphrag_components
from shared_state import global_workflow_agent, is_agent_available, get_graphrag_status
```

#### 4b. Modify `startup_event()` for two-phase startup

```python
async def startup_event() -> bool:
    """Phase 1 (sync): MongoDB + vector index. Phase 2 (background): GraphRAG."""
    log_structured('info', "Application startup sequence initiated.")
    all_success = True

    # 1. Initialize MongoDB
    try:
        if init_mongodb():
            log_structured('info', "MongoDB initialized successfully.")
        else:
            all_success = False
    except Exception as db_err:
        log_structured('critical', f"MongoDB init failed: {db_err}")
        all_success = False

    # 2. Phase 1: Vector index + agent (fast)
    log_structured('info', "Phase 1: Initializing vector index and agent...")
    vector_success = await initialize_vector_index()

    if not is_agent_available() or not vector_success:
        log_structured('critical', "Vector initialization failed")
        all_success = False
    else:
        log_structured('info', "Phase 1 complete: server is ready for vector queries")

    # 3. Phase 2: GraphRAG in background
    if vector_success:
        log_structured('info', "Phase 2: Launching GraphRAG initialization in background...")

        async def _background_graphrag_init():
            try:
                success = await initialize_graphrag_components()
                if success:
                    log_structured('info', "GraphRAG background init completed")
                else:
                    log_structured('warning', "GraphRAG background init failed — vector search still works")
            except Exception as e:
                log_structured('error', f"GraphRAG background init error: {e}")

        # Schedule as background task; this does NOT block server startup
        asyncio.ensure_future(_background_graphrag_init())

    log_structured('info', f"Startup complete. Server ready: {all_success}")
    return all_success
```

#### 4c. Restructure the main execution block

The old pattern of `asyncio.run(startup_event())` followed by `asyncio.run(serve(app, config))` won't work because background tasks launched by `asyncio.ensure_future()` need to run in the same event loop as the server. Restructure:

```python
if __name__ == '__main__':
    from hypercorn.config import Config as HypercornConfig
    from hypercorn.asyncio import serve as hypercorn_serve

    config = HypercornConfig()
    # ... config setup ...

    async def run_server_with_startup():
        """Run startup (with background GraphRAG init) then serve."""
        await startup_event()

        # Double-check agent
        if not is_agent_available():
            log_structured('critical', "Agent unavailable. Forcing re-init...")
            await initialize_vector_index()
            if is_agent_available():
                asyncio.ensure_future(initialize_graphrag_components())

        # Start serving; background GraphRAG init continues in the same event loop
        await hypercorn_serve(app, config)

    try:
        asyncio.run(run_server_with_startup())
    except KeyboardInterrupt:
        log_structured('info', "Server stopped manually.")
```

**Why this matters**: `asyncio.ensure_future()` schedules a coroutine on the *current* event loop. If you use two separate `asyncio.run()` calls, the background task from the first call is lost when the first event loop closes. By wrapping everything in a single `async def` and calling `asyncio.run()` once, the background GraphRAG init shares the server's event loop and runs concurrently with request handling.

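The single-loop pattern can be tried in isolation with stand-ins for the server and the GraphRAG work. The sketch below rests on two assumptions that go beyond the original text. First, it assumes the heavy GraphRAG work is blocking (synchronous Neo4j or LLM calls), in which case an `async def` wrapper alone would stall the loop, so the work is pushed to a worker thread with `asyncio.to_thread` (Python 3.9+). Second, it keeps a reference to the scheduled task, since the event loop only holds weak references to tasks. `blocking_graphrag_init`, `fake_serve`, `events`, and `background_tasks` are hypothetical names.

```python
import asyncio
import time

events = []
background_tasks = set()  # strong references so pending tasks are not garbage-collected


def blocking_graphrag_init() -> bool:
    """Stand-in for slow, blocking GraphRAG work (hypothetical)."""
    time.sleep(0.25)
    return True


async def background_init() -> None:
    # Run the blocking work in a worker thread so the event loop stays responsive.
    ok = await asyncio.to_thread(blocking_graphrag_init)
    events.append(f"graphrag_ready={ok}")


async def fake_serve() -> None:
    """Stand-in for the server: handles five 'requests', one every 0.1 s."""
    for i in range(5):
        await asyncio.sleep(0.1)
        events.append(f"request {i} served")


async def run_server_with_startup() -> None:
    # Schedule the background work on the same loop that will run the server.
    task = asyncio.ensure_future(background_init())
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
    await fake_serve()


# One asyncio.run() call: the background task and the server share one loop.
asyncio.run(run_server_with_startup())
```
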
---

## Change 5: Update Routes for Graceful Degradation

### File: Routes Module (e.g., `routes.py`)

#### 5a. Update imports

```python
from ai_core import initialize_global_index, initialize_vector_index
from shared_state import ..., get_graphrag_status
```

#### 5b. Update `/status` endpoint

Add GraphRAG status to the response:

```python
graphrag_status = get_graphrag_status()

status_data = {
    # ... existing fields ...
    'graphrag_ready': graphrag_status['ready'],
    'graphrag_initializing': graphrag_status['initializing'],
    'graphrag_error': graphrag_status['error'],
}
```

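With these fields exposed, a deployment script or test can wait for GraphRAG readiness by polling. The following is a sketch only: `wait_for_graphrag` is a hypothetical helper, and `get_status` is assumed to be any callable that returns the parsed `/status` payload as a dict containing the three keys above.

```python
import time
from typing import Any, Callable, Dict


def wait_for_graphrag(
    get_status: Callable[[], Dict[str, Any]],
    timeout_s: float = 600.0,
    interval_s: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Poll until graphrag_ready is true; return False on timeout, raise on reported error."""
    deadline = time.monotonic() + timeout_s
    while True:
        status = get_status()
        if status.get("graphrag_error"):
            raise RuntimeError(f"GraphRAG init failed: {status['graphrag_error']}")
        if status.get("graphrag_ready"):
            return True
        if time.monotonic() >= deadline:
            return False
        sleep(interval_s)
```

In practice `get_status` would wrap an HTTP GET against your `/status` route. The `sleep` parameter exists so tests can inject a no-op instead of waiting.
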
#### 5c. Update on-demand initialization in `/chat`

If the `/chat` endpoint has a fallback that tries to initialize on-demand when the agent is unavailable, change it to use `initialize_vector_index()` instead of `initialize_global_index()` for faster recovery:

```python
# Old:
index_success = await initialize_global_index()

# New:
index_success = await initialize_vector_index()
```

#### 5d. No changes needed for chat logic

The `/chat` endpoint itself needs no changes for graceful degradation. The agent's `simple_run` method already dynamically checks for GraphRAG tool availability:

```python
graphrag_tool = next((t for t in global_workflow_agent.tools if 'graphrag' in t.metadata.name.lower()), None)
if graphrag_tool:
    ...  # Use GraphRAG; only happens after background init completes
else:
    ...  # Fall through to vector-only response
```

---

## Change 6: Docker Compose Documentation

### File: Neo4j Docker Compose (e.g., `docker-compose-neo4j.yml`)

Add a comment at the top explaining volume persistence:

```yaml
# IMPORTANT: Neo4j data persists in bind-mounted directories (./neo4j/data, etc.).
# These survive `docker-compose down` and `docker-compose up -d` restarts.
# However, if you delete the data directory manually, all graph data
# (extracted triples, entities, relationships) will be lost and must be
# re-extracted via LLM calls (10-20+ minutes) or restored from the disk cache
# at index_storage/graphrag_cache/neo4j_triples.pickle.
```

---

## Verification Checklist

### Test 1: Warm start (Neo4j has data, vector index cached)
- [ ] Server starts and accepts chat requests within ~1-2 minutes
- [ ] `/status` shows `graphrag_initializing: true` briefly, then `graphrag_ready: true`
- [ ] Chat requests work with vector search immediately
- [ ] GraphRAG tool becomes available after background init completes

### Test 2: Cold Neo4j with caches (Neo4j empty, `index_storage/` intact)
- [ ] Server starts with vector queries in ~1-2 minutes
- [ ] Background init restores triples from `neo4j_triples.pickle` (check logs for "Restoring triples from disk cache")
- [ ] No LLM extraction calls (check logs; you should NOT see "Building PropertyGraphIndex via LLM extraction")
- [ ] GraphRAG becomes available after cache restore + community build

### Test 3: Full cold start (no `index_storage/`, no Neo4j data)
- [ ] Full rebuild runs (document parsing → vector index → LLM extraction → community building)
- [ ] `neo4j_triples.pickle` is created after LLM extraction
- [ ] Community cache files are created after community building
- [ ] Everything works on subsequent restarts using caches

### Test 4: Degraded mode
- [ ] Send a chat request before GraphRAG finishes initializing
- [ ] Response comes back using vector-only search
- [ ] No errors in response, just missing GraphRAG context
- [ ] After GraphRAG init completes, subsequent requests use both vector and GraphRAG

### Test 5: Cache integrity
- [ ] After a successful startup with Neo4j data, verify `neo4j_triples.pickle` exists in `index_storage/graphrag_cache/`
- [ ] Delete Neo4j data, restart, and verify cache restore works
- [ ] Delete the pickle file, restart with Neo4j data, and verify it gets recreated

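For the cache integrity checks, a short script can report what the pickle file contains. This is a sketch under assumptions: `summarize_triples_cache` is a hypothetical helper, the keys match the `cache_data` dict written by `save_triples_to_cache()`, and unpickling a real cache requires the same entity and relation classes to be importable in the environment where the script runs. Only unpickle files your own app produced.

```python
import pickle
from pathlib import Path


def summarize_triples_cache(path) -> dict:
    """Hypothetical helper: report counts stored in the triples cache file."""
    path = Path(path)
    if not path.exists():
        return {"exists": False}
    with open(path, "rb") as f:
        data = pickle.load(f)  # trusted, locally produced file only
    return {
        "exists": True,
        "entity_count": len(data.get("entities", [])),
        "relation_count": len(data.get("relations", [])),
        "triplet_count": data.get("triplet_count"),
    }
```
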
---

## File Change Summary

| File | Changes |
|------|---------|
| Graph RAG integration module | `save_triples_to_cache()`, `load_triples_from_cache()`, 3-branch logic in `create_graph_components()` |
| Shared state module | `graphrag_ready`/`graphrag_initializing`/`graphrag_error` flags, `set_graphrag_status()`, `get_graphrag_status()` |
| AI core module | Extract GraphRAG tool class to module level, split into `initialize_vector_index()` + `initialize_graphrag_components()`, keep `initialize_global_index()` as wrapper |
| Main module | Two-phase startup in `startup_event()`, single `asyncio.run()` wrapping both startup and serve |
| Routes module | Import new functions, add GraphRAG status to `/status`, use `initialize_vector_index()` for on-demand init |
| Docker compose | Documentation comment about volume persistence |