- Cache extracted triples to disk (neo4j_triples.pickle) so Neo4j can be repopulated without expensive LLM re-extraction on cold starts - Split initialization into two phases: fast vector-only (~1-2 min) and background GraphRAG, so the server serves requests while GraphRAG loads - Add GraphRAG status flags to shared_state for monitoring readiness - Update /status endpoint to expose graphrag_ready/initializing/error - Restructure main.py to use single event loop for background task support Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
28 KiB
GraphRAG Startup Optimization Guide
Purpose
This document describes a set of changes to drastically reduce startup time for an app built on the LlamaIndex + Neo4j GraphRAG + Flask/Hypercorn architecture. The changes achieve two things:
- Triple caching: Extracted entities/relationships are cached to disk so Neo4j can be repopulated without expensive LLM re-extraction.
- Background initialization: The server starts serving requests with vector-only search in ~1-2 minutes, while GraphRAG components initialize in the background.
Problem
On a cold start (or when Neo4j loses its data), the app spends:
| Phase | Duration | Already Cached? |
|---|---|---|
| Document parsing (LlamaParse) | 20-30 min | Yes (vector index on disk) |
| Vector embeddings | 10-15 min | Yes (vector index on disk) |
| Triple extraction | 10-20 min | No — only lives in Neo4j |
| Community summarization | 10-30 min | Yes (pickle cache) |
| Community detection | 5-10 min | Yes (pickle cache) |
The two bolded phases are the bottleneck. The app can't serve any requests until all of this completes.
Architecture Assumptions
This guide assumes your app has:
- A graph store wrapper class (e.g.,
GraphRAGStore) that wrapsNeo4jPropertyGraphStoreand handles community detection/summarization with existing pickle caching for community data. - A
create_graph_components()function that connects to Neo4j, checks for existing data, and either reuses data or runs LLM extraction viaPropertyGraphIndex. - A monolithic initialization function (e.g.,
initialize_global_index()) that does everything: LLM setup, vector index loading, GraphRAG setup, agent creation. - A shared state module that stores global references to the agent, index, and GraphRAG components.
- A main.py that runs startup synchronously before the server starts.
- A routes module with a
/statusendpoint and a/chatendpoint that checks agent availability.
Adapt file names, class names, and function signatures to match your codebase.
Change 1: Cache Extracted Triples to Disk
What
Add two methods to your GraphRAGStore class:
save_triples_to_cache()— After successful LLM extraction, save entities and relations to a pickle file.load_triples_from_cache()— On startup, if Neo4j is empty but the cache exists, restore triples to Neo4j directly.
Then modify create_graph_components() to use these methods.
File: Graph RAG Integration Module (e.g., graph_rag_integration.py)
1a. Add a triples cache file path to the store class
In the class-level constants of your GraphRAGStore (next to the existing community cache paths), add:
TRIPLES_CACHE_FILE = CACHE_DIR / "neo4j_triples.pickle"
1b. Add save_triples_to_cache() method
Add this method to GraphRAGStore, after get_triplets():
def save_triples_to_cache(self):
"""Save extracted triples (entities + relationships) from Neo4j to a disk cache.
This allows restoring triples to Neo4j without expensive LLM re-extraction
if Neo4j data is lost (e.g., container recreated without volume persistence).
"""
try:
triplets = self.get_triplets()
if not triplets:
log_structured('warning', 'No triplets to cache — Neo4j appears empty')
return False
# Collect unique entities and relations from the triplets
entities = {}
relations = []
for entity1, relation, entity2 in triplets:
entities[entity1.name] = entity1
entities[entity2.name] = entity2
relations.append(relation)
cache_data = {
'entities': list(entities.values()),
'relations': relations,
'triplet_count': len(triplets),
}
with open(self.TRIPLES_CACHE_FILE, 'wb') as f:
pickle.dump(cache_data, f)
log_structured('info', 'Successfully cached Neo4j triples to disk', {
'entity_count': len(entities),
'relation_count': len(relations),
'triplet_count': len(triplets),
'cache_file': str(self.TRIPLES_CACHE_FILE)
})
return True
except Exception as e:
log_structured('error', f'Error saving triples cache: {e}')
return False
1c. Add load_triples_from_cache() method
Add this method right after save_triples_to_cache():
def load_triples_from_cache(self):
"""Load triples from disk cache and restore them to Neo4j.
Returns True if triples were successfully restored, False otherwise.
"""
if not self.TRIPLES_CACHE_FILE.exists():
log_structured('info', 'No triples cache file found')
return False
try:
with open(self.TRIPLES_CACHE_FILE, 'rb') as f:
cache_data = pickle.load(f)
entities = cache_data.get('entities', [])
relations = cache_data.get('relations', [])
if not entities:
log_structured('warning', 'Triples cache file exists but contains no entities')
return False
log_structured('info', 'Restoring triples from disk cache to Neo4j', {
'entity_count': len(entities),
'relation_count': len(relations),
'cached_triplet_count': cache_data.get('triplet_count', 'unknown')
})
# Restore entities (nodes) to Neo4j
self.property_graph_store.upsert_nodes(entities)
log_structured('info', f'Restored {len(entities)} entity nodes to Neo4j')
# Restore relations to Neo4j
if relations:
self.property_graph_store.upsert_relations(relations)
log_structured('info', f'Restored {len(relations)} relations to Neo4j')
# Verify restoration
restored_triplets = self.get_triplets()
log_structured('info', f'Neo4j now contains {len(restored_triplets)} triplets after cache restore')
return len(restored_triplets) > 0
except Exception as e:
log_structured('error', f'Error restoring triples from cache: {e}')
return False
1d. Modify create_graph_components() to use the cache
The existing function has two branches:
- Neo4j has data and
force_reindex=False→ skip indexing - Else → run LLM extraction
Change this to three branches:
1. Neo4j has data and !force_reindex → skip indexing (also save triples to cache if not already cached)
2. Neo4j is empty and !force_reindex and triples cache exists → restore from cache
3. Else → full LLM extraction (also save triples to cache after)
Here is the full replacement logic for the branching section inside create_graph_components():
# Check if Neo4j already has content
triplets = graph_store.get_triplets()
has_existing_content = len(triplets) > 0
log_structured('info', f'Neo4j check: Found {len(triplets)} triplets')
if has_existing_content and not force_reindex:
# BRANCH 1: Neo4j has data — use it, but also ensure disk cache exists
log_structured('info', f'Neo4j already contains {len(triplets)} triplets. Skipping indexing.')
# Ensure triples are also cached to disk for future recovery
if not graph_store.TRIPLES_CACHE_FILE.exists():
log_structured('info', 'Neo4j has data but no triples cache on disk — creating cache now')
graph_store.save_triples_to_cache()
property_graph_index = PropertyGraphIndex(
nodes=[],
property_graph_store=property_graph_store,
)
if not graph_store.communities_built:
log_structured('info', 'Building graph communities from existing Neo4j data')
try:
graph_store.build_communities()
except Exception as e:
log_structured('error', f'Error building communities: {e}')
elif not has_existing_content and not force_reindex and graph_store.TRIPLES_CACHE_FILE.exists():
# BRANCH 2: Neo4j is empty but triples cache exists — restore from cache
log_structured('info', 'Neo4j is empty but triples cache exists. Restoring from disk cache.')
restored = graph_store.load_triples_from_cache()
if restored:
log_structured('info', 'Successfully restored triples from cache.')
property_graph_index = PropertyGraphIndex(
nodes=[],
property_graph_store=property_graph_store,
)
if not graph_store.communities_built:
try:
graph_store.build_communities()
except Exception as e:
log_structured('error', f'Error building communities from restored data: {e}')
else:
# Cache restore failed — fall through to LLM extraction
log_structured('warning', 'Triples cache restore failed. Falling back to LLM extraction.')
if not nodes:
raise ValueError("Nodes must be provided when Neo4j is empty and cache restore fails")
kg_extractor = GraphRAGExtractor(
llm=llm,
extract_prompt=KG_TRIPLET_EXTRACT_TMPL,
max_paths_per_chunk=max_paths_per_chunk,
parse_fn=custom_parse_fn,
)
property_graph_index = PropertyGraphIndex(
nodes=nodes,
kg_extractors=[kg_extractor],
property_graph_store=property_graph_store,
show_progress=True,
)
graph_store.save_triples_to_cache()
try:
graph_store.build_communities()
except Exception as e:
log_structured('error', f'Error building communities: {e}')
else:
# BRANCH 3: Full LLM extraction (force_reindex or no cache)
if not nodes:
raise ValueError("Nodes must be provided for indexing when Neo4j is empty or force_reindex=True")
kg_extractor = GraphRAGExtractor(
llm=llm,
extract_prompt=KG_TRIPLET_EXTRACT_TMPL,
max_paths_per_chunk=max_paths_per_chunk,
parse_fn=custom_parse_fn,
)
if has_existing_content and force_reindex:
# Clear Neo4j before re-extraction
try:
from neo4j import GraphDatabase
driver = GraphDatabase.driver(NEO4J_URL, auth=(NEO4J_USERNAME, NEO4J_PASSWORD))
with driver.session() as session:
session.run("MATCH (n) DETACH DELETE n")
driver.close()
except Exception as e:
log_structured('warning', f'Error clearing Neo4j database: {e}')
property_graph_index = PropertyGraphIndex(
nodes=nodes,
kg_extractors=[kg_extractor],
property_graph_store=property_graph_store,
show_progress=True,
)
# Cache the newly extracted triples
graph_store.save_triples_to_cache()
try:
graph_store.build_communities()
except Exception as e:
log_structured('error', f'Error building communities: {e}')
return graph_store, property_graph_index
Change 2: Add GraphRAG Status Flags to Shared State
File: Shared State Module (e.g., shared_state.py)
2a. Add new module-level variables
After the existing GraphRAG component variables, add:
# GraphRAG initialization status
graphrag_ready = False
graphrag_initializing = False
graphrag_error = None
2b. Add setter/getter functions
Add these functions before is_agent_available():
def set_graphrag_status(ready=None, initializing=None, error=None):
"""Update GraphRAG initialization status flags."""
global graphrag_ready, graphrag_initializing, graphrag_error
from utils import log_structured
if ready is not None:
graphrag_ready = ready
if initializing is not None:
graphrag_initializing = initializing
if error is not None:
graphrag_error = error
log_structured('info', 'GraphRAG status updated', {
'ready': graphrag_ready,
'initializing': graphrag_initializing,
'error': str(graphrag_error) if graphrag_error else None
})
def get_graphrag_status():
"""Get current GraphRAG initialization status."""
return {
'ready': graphrag_ready,
'initializing': graphrag_initializing,
'error': str(graphrag_error) if graphrag_error else None
}
Change 3: Split Initialization into Two Phases
File: AI Core Module (e.g., ai_core.py)
This is the largest change. You'll split your monolithic initialization function into:
initialize_vector_index()— Fast path (~1-2 min). Loads/creates vector index, creates vector query engine, creates agent with vector-only tools, attaches therunmethod. The server becomes usable after this completes.initialize_graphrag_components()— Background path. Connects to Neo4j, restores/extracts triples, builds communities, creates GraphRAG query engine, and dynamically adds the GraphRAG tool to the existing agent's tools list.initialize_global_index()— Kept as backward-compatible wrapper that calls both in sequence.
3a. Update imports from shared_state
Add set_graphrag_status to your shared_state imports:
from shared_state import (
...,
set_graphrag_status
)
3b. Extract your GraphRAG Tool class to module level
If your GraphRAG tool class (the BaseTool subclass that wraps GraphRAGQueryEngine) is defined inside the initialization function, move it to module level. This makes it reusable from both the init function and the background init function.
The tool class should remain functionally identical — just move it out of the function body to module scope. Example:
class GraphRAGTool(BaseTool):
"""Tool that queries using both vector and graph-based retrieval."""
def __init__(self, query_engine):
self.query_engine = query_engine
self._metadata = ToolMetadata(
name="answerquestionswith_graphrag",
description="Your tool description here"
)
@property
def metadata(self):
return self._metadata
def __call__(self, query_str: str) -> ToolOutput:
# ... existing implementation unchanged ...
pass
async def acall(self, input: str) -> ToolOutput:
return self.__call__(input)
3c. Create initialize_vector_index()
This function contains everything from your original initialize_global_index() except the GraphRAG-related code:
- LLM and embedding model setup
- Global Settings configuration
- Vector index loading or creation (including document processing for cold starts)
- Vector query engine creation
- Agent creation with vector-only tools
simple_runmethod attachment and agent testing
Key difference from the original: No GraphRAG components, no Neo4j connection, no community building. The agent is created with only the vector query tool.
The simple_run function works without modification because it dynamically looks up the GraphRAG tool:
graphrag_tool = next((t for t in global_workflow_agent.tools if 'graphrag' in t.metadata.name.lower()), None)
if graphrag_tool:
# use it
# else: falls through to vector-only path
When GraphRAG isn't initialized yet, graphrag_tool is None and the code gracefully falls back to the direct response path.
3d. Create initialize_graphrag_components()
This function handles everything GraphRAG-related:
async def initialize_graphrag_components() -> bool:
"""Initialize GraphRAG components in the background."""
set_graphrag_status(initializing=True, ready=False, error=None)
try:
# Get current index and agent from shared state
from shared_state import global_workflow_agent as current_agent
import shared_state
index = shared_state.global_index
if index is None:
raise RuntimeError("Vector index must be initialized first")
if current_agent is None:
raise RuntimeError("Agent must be initialized first")
llm = Settings.llm
# Connect to Neo4j
property_graph_store = Neo4jPropertyGraphStore(
username=NEO4J_USERNAME,
password=NEO4J_PASSWORD,
url=NEO4J_URL
)
# Check Neo4j state and create/restore components
temp_graph_store = GraphRAGStore(property_graph_store)
triplets = temp_graph_store.get_triplets()
neo4j_has_data = len(triplets) > 0
if neo4j_has_data:
graph_store, property_graph_index = create_graph_components(
llm=llm, force_reindex=False
)
else:
# Get nodes from vector index for potential extraction
vector_nodes = []
if hasattr(index, 'docstore') and index.docstore:
vector_nodes = list(index.docstore.docs.values())
if not vector_nodes:
raise ValueError("No nodes available for GraphRAG indexing")
# create_graph_components will try cache restore first, then LLM extraction
graph_store, property_graph_index = create_graph_components(
llm=llm,
nodes=vector_nodes,
max_paths_per_chunk=10,
force_reindex=False # Allow cache restore path
)
# Ensure communities are built
if not hasattr(graph_store, 'communities_built') or not graph_store.communities_built:
graph_store.build_communities()
# Create GraphRAG query engine
vector_retriever = VectorIndexRetriever(
index=index, similarity_top_k=SIMILARITY_TOP_K
)
graphrag_query_engine = create_graphrag_query_engine(
vector_retriever=vector_retriever,
graph_store=graph_store,
llm=llm,
similarity_top_k=SIMILARITY_TOP_K
)
# Store in shared state
set_graphrag_components(
graph_store=graph_store,
property_graph_index=property_graph_index,
graphrag_query_engine=graphrag_query_engine
)
# *** KEY STEP: Add GraphRAG tool to the EXISTING agent ***
graphrag_tool = GraphRAGTool(graphrag_query_engine)
from shared_state import global_workflow_agent as live_agent
if live_agent is not None:
live_agent.tools.append(graphrag_tool)
log_structured('info', 'Added GraphRAG tool to existing agent', {
'total_tools': len(live_agent.tools),
'tool_names': [t.metadata.name for t in live_agent.tools]
})
set_graphrag_status(ready=True, initializing=False)
return True
except Exception as e:
log_structured('error', f'GraphRAG background initialization failed: {e}')
set_graphrag_status(ready=False, initializing=False, error=e)
return False
Critical detail: The GraphRAG tool is added to the agent's tools list dynamically via live_agent.tools.append(graphrag_tool). Because simple_run does a dynamic lookup on global_workflow_agent.tools every time it runs, the GraphRAG tool becomes available to all subsequent requests without restarting the agent.
3e. Keep initialize_global_index() as a backward-compatible wrapper
async def initialize_global_index() -> bool:
"""Backward-compatible wrapper: initializes everything synchronously."""
vector_success = await initialize_vector_index()
if not vector_success:
return False
graphrag_success = await initialize_graphrag_components()
if not graphrag_success:
log_structured('warning', 'GraphRAG init failed, but vector search is available')
return True
Change 4: Update Main Startup for Background Init
File: Main Module (e.g., main.py)
4a. Update imports
from ai_core import initialize_global_index, initialize_vector_index, initialize_graphrag_components
from shared_state import global_workflow_agent, is_agent_available, get_graphrag_status
4b. Modify startup_event() for two-phase startup
async def startup_event() -> bool:
"""Phase 1 (sync): MongoDB + vector index. Phase 2 (background): GraphRAG."""
log_structured('info', "Application startup sequence initiated.")
all_success = True
# 1. Initialize MongoDB
try:
if init_mongodb():
log_structured('info', "MongoDB initialized successfully.")
else:
all_success = False
except Exception as db_err:
log_structured('critical', f"MongoDB init failed: {db_err}")
all_success = False
# 2. Phase 1: Vector index + agent (fast)
log_structured('info', "Phase 1: Initializing vector index and agent...")
vector_success = await initialize_vector_index()
if not is_agent_available() or not vector_success:
log_structured('critical', "Vector initialization failed")
all_success = False
else:
log_structured('info', "Phase 1 complete: server is ready for vector queries")
# 3. Phase 2: GraphRAG in background
if vector_success:
log_structured('info', "Phase 2: Launching GraphRAG initialization in background...")
async def _background_graphrag_init():
try:
success = await initialize_graphrag_components()
if success:
log_structured('info', "GraphRAG background init completed")
else:
log_structured('warning', "GraphRAG background init failed — vector search still works")
except Exception as e:
log_structured('error', f"GraphRAG background init error: {e}")
# Schedule as background task — does NOT block server startup
asyncio.ensure_future(_background_graphrag_init())
log_structured('info', f"Startup complete. Server ready: {all_success}")
return all_success
4c. Restructure the main execution block
The old pattern of asyncio.run(startup_event()) followed by asyncio.run(serve(app, config)) won't work because background tasks launched by asyncio.ensure_future() need to run in the same event loop as the server. Restructure:
if __name__ == '__main__':
from hypercorn.config import Config as HypercornConfig
from hypercorn.asyncio import serve as hypercorn_serve
config = HypercornConfig()
# ... config setup ...
async def run_server_with_startup():
"""Run startup (with background GraphRAG init) then serve."""
await startup_event()
# Double-check agent
if not is_agent_available():
log_structured('critical', "Agent unavailable. Forcing re-init...")
await initialize_vector_index()
if is_agent_available():
asyncio.ensure_future(initialize_graphrag_components())
# Start serving — background GraphRAG init continues in same event loop
await hypercorn_serve(app, config)
try:
asyncio.run(run_server_with_startup())
except KeyboardInterrupt:
log_structured('info', "Server stopped manually.")
Why this matters: asyncio.ensure_future() schedules a coroutine on the current event loop. If you use two separate asyncio.run() calls, the background task from the first call is lost when the first event loop closes. By wrapping everything in a single async def and calling asyncio.run() once, the background GraphRAG init shares the server's event loop and runs concurrently with request handling.
Change 5: Update Routes for Graceful Degradation
File: Routes Module (e.g., routes.py)
5a. Update imports
from ai_core import initialize_global_index, initialize_vector_index
from shared_state import ..., get_graphrag_status
5b. Update /status endpoint
Add GraphRAG status to the response:
graphrag_status = get_graphrag_status()
status_data = {
# ... existing fields ...
'graphrag_ready': graphrag_status['ready'],
'graphrag_initializing': graphrag_status['initializing'],
'graphrag_error': graphrag_status['error'],
}
5c. Update on-demand initialization in /chat
If the /chat endpoint has a fallback that tries to initialize on-demand when the agent is unavailable, change it to use initialize_vector_index() instead of initialize_global_index() for faster recovery:
# Old:
index_success = await initialize_global_index()
# New:
index_success = await initialize_vector_index()
5d. No changes needed for chat logic
The /chat endpoint itself needs no changes for graceful degradation. The agent's simple_run method already dynamically checks for GraphRAG tool availability:
graphrag_tool = next((t for t in global_workflow_agent.tools if 'graphrag' in t.metadata.name.lower()), None)
if graphrag_tool:
# Use GraphRAG — only happens after background init completes
else:
# Fall through to vector-only response
Change 6: Docker Compose Documentation
File: Neo4j Docker Compose (e.g., docker-compose-neo4j.yml)
Add a comment at the top explaining volume persistence:
# IMPORTANT: Neo4j data persists in bind-mounted directories (./neo4j/data, etc.).
# These survive `docker-compose down` and `docker-compose up -d` restarts.
# However, if you delete the data directory manually, all graph data
# (extracted triples, entities, relationships) will be lost and must be
# re-extracted via LLM calls (10-20+ minutes) or restored from the disk cache
# at index_storage/graphrag_cache/neo4j_triples.pickle.
Verification Checklist
Test 1: Warm start (Neo4j has data, vector index cached)
- Server starts and accepts chat requests within ~1-2 minutes
/statusshowsgraphrag_initializing: truebriefly, thengraphrag_ready: true- Chat requests work with vector search immediately
- GraphRAG tool becomes available after background init completes
Test 2: Cold Neo4j with caches (Neo4j empty, index_storage/ intact)
- Server starts with vector queries in ~1-2 minutes
- Background init restores triples from
neo4j_triples.pickle(check logs for "Restoring triples from disk cache") - No LLM extraction calls (check logs — should NOT see "Building PropertyGraphIndex via LLM extraction")
- GraphRAG becomes available after cache restore + community build
Test 3: Full cold start (no index_storage/, no Neo4j data)
- Full rebuild runs (document parsing → vector index → LLM extraction → community building)
neo4j_triples.pickleis created after LLM extraction- Community cache files are created after community building
- Everything works on subsequent restarts using caches
Test 4: Degraded mode
- Send a chat request before GraphRAG finishes initializing
- Response comes back using vector-only search
- No errors in response — just missing GraphRAG context
- After GraphRAG init completes, subsequent requests use both vector and GraphRAG
Test 5: Cache integrity
- After a successful startup with Neo4j data, verify
neo4j_triples.pickleexists inindex_storage/graphrag_cache/ - Delete Neo4j data, restart — verify cache restore works
- Delete the pickle file, restart with Neo4j data — verify it gets recreated
File Change Summary
| File | Changes |
|---|---|
| Graph RAG integration module | save_triples_to_cache(), load_triples_from_cache(), 3-branch logic in create_graph_components() |
| Shared state module | graphrag_ready/graphrag_initializing/graphrag_error flags, set_graphrag_status(), get_graphrag_status() |
| AI core module | Extract GraphRAG tool class to module level, split into initialize_vector_index() + initialize_graphrag_components(), keep initialize_global_index() as wrapper |
| Main module | Two-phase startup in startup_event(), single asyncio.run() wrapping both startup and serve |
| Routes module | Import new functions, add GraphRAG status to /status, use initialize_vector_index() for on-demand init |
| Docker compose | Documentation comment about volume persistence |