# GraphRAG Startup Optimization Guide

## Purpose

This document describes a set of changes that drastically reduce startup time for an app built on the LlamaIndex + Neo4j GraphRAG + Flask/Hypercorn architecture.

The changes achieve two things:

1. **Triple caching**: Extracted entities/relationships are cached to disk so Neo4j can be repopulated without expensive LLM re-extraction.
2. **Background initialization**: The server starts serving requests with vector-only search in ~1-2 minutes, while GraphRAG components initialize in the background.

---

## Problem

On a cold start (or when Neo4j loses its data), the app spends:

| Phase | Duration | Already Cached? |
|-------|----------|-----------------|
| Document parsing (LlamaParse) | 20-30 min | Yes (vector index on disk) |
| Vector embeddings | 10-15 min | Yes (vector index on disk) |
| **Triple extraction** | **10-20 min** | **No (only lives in Neo4j)** |
| **Community summarization** | **10-30 min** | Yes (pickle cache) |
| Community detection | 5-10 min | Yes (pickle cache) |

The two bolded phases are the bottleneck. Of the two, only triple extraction has no disk cache, so losing Neo4j data forces a full LLM re-extraction. The app can't serve any requests until all of this completes.

---

## Architecture Assumptions

This guide assumes your app has:

- A **graph store wrapper class** (e.g., `GraphRAGStore`) that wraps `Neo4jPropertyGraphStore` and handles community detection/summarization with existing pickle caching for community data.
- A **`create_graph_components()` function** that connects to Neo4j, checks for existing data, and either reuses data or runs LLM extraction via `PropertyGraphIndex`.
- A **monolithic initialization function** (e.g., `initialize_global_index()`) that does everything: LLM setup, vector index loading, GraphRAG setup, agent creation.
- A **shared state module** that stores global references to the agent, index, and GraphRAG components.
- A **main.py** that runs startup synchronously before the server starts.
- A **routes module** with a `/status` endpoint and a `/chat` endpoint that checks agent availability.

Adapt file names, class names, and function signatures to match your codebase.

---

## Change 1: Cache Extracted Triples to Disk

### What

Add two methods to your `GraphRAGStore` class:

- `save_triples_to_cache()`: After successful LLM extraction, save entities and relations to a pickle file.
- `load_triples_from_cache()`: On startup, if Neo4j is empty but the cache exists, restore triples to Neo4j directly.

Then modify `create_graph_components()` to use these methods.

### File: Graph RAG Integration Module (e.g., `graph_rag_integration.py`)

#### 1a. Add a triples cache file path to the store class

In the class-level constants of your `GraphRAGStore` (next to the existing community cache paths), add:

```python
    TRIPLES_CACHE_FILE = CACHE_DIR / "neo4j_triples.pickle"
```

#### 1b. Add `save_triples_to_cache()` method

Add this method to `GraphRAGStore`, after `get_triplets()`:

```python
    def save_triples_to_cache(self):
        """Save extracted triples (entities + relationships) from Neo4j to a disk cache.

        This allows restoring triples to Neo4j without expensive LLM re-extraction
        if Neo4j data is lost (e.g., container recreated without volume persistence).
        """
        try:
            triplets = self.get_triplets()
            if not triplets:
                log_structured('warning', 'No triplets to cache: Neo4j appears empty')
                return False

            # Collect unique entities (keyed by name) and all relations from the triplets
            entities = {}
            relations = []
            for entity1, relation, entity2 in triplets:
                entities[entity1.name] = entity1
                entities[entity2.name] = entity2
                relations.append(relation)

            cache_data = {
                'entities': list(entities.values()),
                'relations': relations,
                'triplet_count': len(triplets),
            }

            with open(self.TRIPLES_CACHE_FILE, 'wb') as f:
                pickle.dump(cache_data, f)

            log_structured('info', 'Successfully cached Neo4j triples to disk', {
                'entity_count': len(entities),
                'relation_count': len(relations),
                'triplet_count': len(triplets),
                'cache_file': str(self.TRIPLES_CACHE_FILE)
            })
            return True
        except Exception as e:
            log_structured('error', f'Error saving triples cache: {e}')
            return False
```

#### 1c. Add `load_triples_from_cache()` method

Add this method right after `save_triples_to_cache()`:

```python
    def load_triples_from_cache(self):
        """Load triples from disk cache and restore them to Neo4j.

        Returns True if triples were successfully restored, False otherwise.
        """
        if not self.TRIPLES_CACHE_FILE.exists():
            log_structured('info', 'No triples cache file found')
            return False

        try:
            with open(self.TRIPLES_CACHE_FILE, 'rb') as f:
                cache_data = pickle.load(f)

            entities = cache_data.get('entities', [])
            relations = cache_data.get('relations', [])

            if not entities:
                log_structured('warning', 'Triples cache file exists but contains no entities')
                return False

            log_structured('info', 'Restoring triples from disk cache to Neo4j', {
                'entity_count': len(entities),
                'relation_count': len(relations),
                'cached_triplet_count': cache_data.get('triplet_count', 'unknown')
            })

            # Restore entities (nodes) to Neo4j
            self.property_graph_store.upsert_nodes(entities)
            log_structured('info', f'Restored {len(entities)} entity nodes to Neo4j')

            # Restore relations to Neo4j
            if relations:
                self.property_graph_store.upsert_relations(relations)
                log_structured('info', f'Restored {len(relations)} relations to Neo4j')

            # Verify restoration
            restored_triplets = self.get_triplets()
            log_structured('info', f'Neo4j now contains {len(restored_triplets)} triplets after cache restore')
            return len(restored_triplets) > 0
        except Exception as e:
            log_structured('error', f'Error restoring triples from cache: {e}')
            return False
```

#### 1d. Modify `create_graph_components()` to use the cache

The existing function has two branches:

1. Neo4j has data and `force_reindex=False` → skip indexing
2. Else → run LLM extraction

Change this to **three** branches:

```
1. Neo4j has data and !force_reindex → skip indexing (also save triples to cache if not already cached)
2. Neo4j is empty and !force_reindex and triples cache exists → restore from cache
3. Else → full LLM extraction (also save triples to cache after)
```

Here is the full replacement logic for the branching section inside `create_graph_components()`:

```python
    # Check if Neo4j already has content
    triplets = graph_store.get_triplets()
    has_existing_content = len(triplets) > 0
    log_structured('info', f'Neo4j check: Found {len(triplets)} triplets')

    if has_existing_content and not force_reindex:
        # BRANCH 1: Neo4j has data. Use it, but also ensure the disk cache exists.
        log_structured('info', f'Neo4j already contains {len(triplets)} triplets. Skipping indexing.')

        # Ensure triples are also cached to disk for future recovery
        if not graph_store.TRIPLES_CACHE_FILE.exists():
            log_structured('info', 'Neo4j has data but no triples cache on disk; creating cache now')
            graph_store.save_triples_to_cache()

        property_graph_index = PropertyGraphIndex(
            nodes=[],
            property_graph_store=property_graph_store,
        )

        if not graph_store.communities_built:
            log_structured('info', 'Building graph communities from existing Neo4j data')
            try:
                graph_store.build_communities()
            except Exception as e:
                log_structured('error', f'Error building communities: {e}')

    elif not has_existing_content and not force_reindex and graph_store.TRIPLES_CACHE_FILE.exists():
        # BRANCH 2: Neo4j is empty but the triples cache exists. Restore from cache.
        log_structured('info', 'Neo4j is empty but triples cache exists. Restoring from disk cache.')
        restored = graph_store.load_triples_from_cache()

        if restored:
            log_structured('info', 'Successfully restored triples from cache.')
            property_graph_index = PropertyGraphIndex(
                nodes=[],
                property_graph_store=property_graph_store,
            )
            if not graph_store.communities_built:
                try:
                    graph_store.build_communities()
                except Exception as e:
                    log_structured('error', f'Error building communities from restored data: {e}')
        else:
            # Cache restore failed: fall back to LLM extraction
            log_structured('warning', 'Triples cache restore failed. Falling back to LLM extraction.')
            if not nodes:
                raise ValueError("Nodes must be provided when Neo4j is empty and cache restore fails")

            kg_extractor = GraphRAGExtractor(
                llm=llm,
                extract_prompt=KG_TRIPLET_EXTRACT_TMPL,
                max_paths_per_chunk=max_paths_per_chunk,
                parse_fn=custom_parse_fn,
            )
            property_graph_index = PropertyGraphIndex(
                nodes=nodes,
                kg_extractors=[kg_extractor],
                property_graph_store=property_graph_store,
                show_progress=True,
            )
            graph_store.save_triples_to_cache()
            try:
                graph_store.build_communities()
            except Exception as e:
                log_structured('error', f'Error building communities: {e}')

    else:
        # BRANCH 3: Full LLM extraction (force_reindex or no cache)
        if not nodes:
            raise ValueError("Nodes must be provided for indexing when Neo4j is empty or force_reindex=True")

        kg_extractor = GraphRAGExtractor(
            llm=llm,
            extract_prompt=KG_TRIPLET_EXTRACT_TMPL,
            max_paths_per_chunk=max_paths_per_chunk,
            parse_fn=custom_parse_fn,
        )

        if has_existing_content and force_reindex:
            # Clear Neo4j before re-extraction
            try:
                from neo4j import GraphDatabase
                driver = GraphDatabase.driver(NEO4J_URL, auth=(NEO4J_USERNAME, NEO4J_PASSWORD))
                with driver.session() as session:
                    session.run("MATCH (n) DETACH DELETE n")
                driver.close()
            except Exception as e:
                log_structured('warning', f'Error clearing Neo4j database: {e}')

        property_graph_index = PropertyGraphIndex(
            nodes=nodes,
            kg_extractors=[kg_extractor],
            property_graph_store=property_graph_store,
            show_progress=True,
        )

        # Cache the newly extracted triples
        graph_store.save_triples_to_cache()

        try:
            graph_store.build_communities()
        except Exception as e:
            log_structured('error', f'Error building communities: {e}')

    return graph_store, property_graph_index
```

---

## Change 2: Add GraphRAG Status Flags to Shared State

### File: Shared State Module (e.g., `shared_state.py`)

#### 2a. Add new module-level variables

After the existing GraphRAG component variables, add:

```python
# GraphRAG initialization status
graphrag_ready = False
graphrag_initializing = False
graphrag_error = None
```

#### 2b. Add setter/getter functions

Add these functions before `is_agent_available()`. The setter uses a sentinel default for `error` so that passing `error=None` explicitly clears a previously recorded error:

```python
_UNSET = object()  # sentinel: distinguishes "not passed" from an explicit error=None


def set_graphrag_status(ready=None, initializing=None, error=_UNSET):
    """Update GraphRAG initialization status flags."""
    global graphrag_ready, graphrag_initializing, graphrag_error
    from utils import log_structured

    if ready is not None:
        graphrag_ready = ready
    if initializing is not None:
        graphrag_initializing = initializing
    if error is not _UNSET:
        # error=None clears a stale error from an earlier failed attempt
        graphrag_error = error

    log_structured('info', 'GraphRAG status updated', {
        'ready': graphrag_ready,
        'initializing': graphrag_initializing,
        'error': str(graphrag_error) if graphrag_error else None
    })


def get_graphrag_status():
    """Get current GraphRAG initialization status."""
    return {
        'ready': graphrag_ready,
        'initializing': graphrag_initializing,
        'error': str(graphrag_error) if graphrag_error else None
    }
```

---

## Change 3: Split Initialization into Two Phases

### File: AI Core Module (e.g., `ai_core.py`)

This is the largest change. You'll split your monolithic initialization function into:

1. **`initialize_vector_index()`**: Fast path (~1-2 min). Loads/creates the vector index, creates the vector query engine, creates the agent with vector-only tools, and attaches the `run` method. The server becomes usable after this completes.
2. **`initialize_graphrag_components()`**: Background path. Connects to Neo4j, restores/extracts triples, builds communities, creates the GraphRAG query engine, and **dynamically adds the GraphRAG tool** to the existing agent's tools list.
3. **`initialize_global_index()`**: Kept as a backward-compatible wrapper that calls both in sequence.

#### 3a. Update imports from shared_state

Add `set_graphrag_status` to your shared_state imports:

```python
from shared_state import (
    ...,
    set_graphrag_status
)
```

#### 3b. Extract your GraphRAG Tool class to module level

If your GraphRAG tool class (the `BaseTool` subclass that wraps `GraphRAGQueryEngine`) is defined inside the initialization function, move it to module level. This makes it reusable from both the init function and the background init function.

The tool class should remain functionally identical; just move it out of the function body to module scope. Example:

```python
class GraphRAGTool(BaseTool):
    """Tool that queries using both vector and graph-based retrieval."""

    def __init__(self, query_engine):
        self.query_engine = query_engine
        self._metadata = ToolMetadata(
            name="answerquestionswith_graphrag",
            description="Your tool description here"
        )

    @property
    def metadata(self):
        return self._metadata

    def __call__(self, query_str: str) -> ToolOutput:
        # ... existing implementation unchanged ...
        pass

    async def acall(self, input: str) -> ToolOutput:
        return self.__call__(input)
```

#### 3c. Create `initialize_vector_index()`

This function contains everything from your original `initialize_global_index()` **except** the GraphRAG-related code:

- LLM and embedding model setup
- Global Settings configuration
- Vector index loading or creation (including document processing for cold starts)
- Vector query engine creation
- Agent creation with **vector-only** tools
- `simple_run` method attachment and agent testing

**Key difference from the original**: No GraphRAG components, no Neo4j connection, no community building. The agent is created with only the vector query tool.

The `simple_run` function works without modification because it dynamically looks up the GraphRAG tool:

```python
graphrag_tool = next((t for t in global_workflow_agent.tools
                      if 'graphrag' in t.metadata.name.lower()), None)
if graphrag_tool:
    ...  # use it
# else: falls through to vector-only path
```

When GraphRAG isn't initialized yet, `graphrag_tool` is `None` and the code gracefully falls back to the direct response path.

#### 3d. Create `initialize_graphrag_components()`

This function handles everything GraphRAG-related:

```python
async def initialize_graphrag_components() -> bool:
    """Initialize GraphRAG components in the background."""
    set_graphrag_status(initializing=True, ready=False, error=None)

    try:
        # Read the current index and agent from shared state at call time
        import shared_state
        index = shared_state.global_index
        current_agent = shared_state.global_workflow_agent

        if index is None:
            raise RuntimeError("Vector index must be initialized first")
        if current_agent is None:
            raise RuntimeError("Agent must be initialized first")

        llm = Settings.llm

        # Connect to Neo4j
        property_graph_store = Neo4jPropertyGraphStore(
            username=NEO4J_USERNAME,
            password=NEO4J_PASSWORD,
            url=NEO4J_URL
        )

        # Check Neo4j state and create/restore components
        temp_graph_store = GraphRAGStore(property_graph_store)
        triplets = temp_graph_store.get_triplets()
        neo4j_has_data = len(triplets) > 0

        if neo4j_has_data:
            graph_store, property_graph_index = create_graph_components(
                llm=llm,
                force_reindex=False
            )
        else:
            # Get nodes from the vector index for potential extraction
            vector_nodes = []
            if hasattr(index, 'docstore') and index.docstore:
                vector_nodes = list(index.docstore.docs.values())

            if not vector_nodes:
                raise ValueError("No nodes available for GraphRAG indexing")

            # create_graph_components will try cache restore first, then LLM extraction
            graph_store, property_graph_index = create_graph_components(
                llm=llm,
                nodes=vector_nodes,
                max_paths_per_chunk=10,
                force_reindex=False  # Allow cache restore path
            )

        # Ensure communities are built
        if not hasattr(graph_store, 'communities_built') or not graph_store.communities_built:
            graph_store.build_communities()

        # Create GraphRAG query engine
        vector_retriever = VectorIndexRetriever(
            index=index,
            similarity_top_k=SIMILARITY_TOP_K
        )
        graphrag_query_engine = create_graphrag_query_engine(
            vector_retriever=vector_retriever,
            graph_store=graph_store,
            llm=llm,
            similarity_top_k=SIMILARITY_TOP_K
        )

        # Store in shared state
        set_graphrag_components(
            graph_store=graph_store,
            property_graph_index=property_graph_index,
            graphrag_query_engine=graphrag_query_engine
        )

        # *** KEY STEP: Add GraphRAG tool to the EXISTING agent ***
        graphrag_tool = GraphRAGTool(graphrag_query_engine)
        live_agent = shared_state.global_workflow_agent
        if live_agent is not None:
            live_agent.tools.append(graphrag_tool)
            log_structured('info', 'Added GraphRAG tool to existing agent', {
                'total_tools': len(live_agent.tools),
                'tool_names': [t.metadata.name for t in live_agent.tools]
            })

        set_graphrag_status(ready=True, initializing=False)
        return True

    except Exception as e:
        log_structured('error', f'GraphRAG background initialization failed: {e}')
        set_graphrag_status(ready=False, initializing=False, error=e)
        return False
```

**Critical detail**: The GraphRAG tool is added to the agent's tools list dynamically via `live_agent.tools.append(graphrag_tool)`. Because `simple_run` does a dynamic lookup on `global_workflow_agent.tools` every time it runs, the GraphRAG tool becomes available to all subsequent requests without restarting the agent.

#### 3e. Keep `initialize_global_index()` as a backward-compatible wrapper

```python
async def initialize_global_index() -> bool:
    """Backward-compatible wrapper: runs both phases in sequence."""
    vector_success = await initialize_vector_index()
    if not vector_success:
        return False

    graphrag_success = await initialize_graphrag_components()
    if not graphrag_success:
        log_structured('warning', 'GraphRAG init failed, but vector search is available')

    return True
```

---

## Change 4: Update Main Startup for Background Init

### File: Main Module (e.g., `main.py`)

#### 4a. Update imports

```python
from ai_core import initialize_global_index, initialize_vector_index, initialize_graphrag_components
from shared_state import global_workflow_agent, is_agent_available, get_graphrag_status
```

#### 4b. Modify `startup_event()` for two-phase startup

```python
async def startup_event() -> bool:
    """Phase 1 (sync): MongoDB + vector index. Phase 2 (background): GraphRAG."""
    log_structured('info', "Application startup sequence initiated.")
    all_success = True

    # 1. Initialize MongoDB
    try:
        if init_mongodb():
            log_structured('info', "MongoDB initialized successfully.")
        else:
            all_success = False
    except Exception as db_err:
        log_structured('critical', f"MongoDB init failed: {db_err}")
        all_success = False

    # 2. Phase 1: Vector index + agent (fast)
    log_structured('info', "Phase 1: Initializing vector index and agent...")
    vector_success = await initialize_vector_index()

    if not is_agent_available() or not vector_success:
        log_structured('critical', "Vector initialization failed")
        all_success = False
    else:
        log_structured('info', "Phase 1 complete: server is ready for vector queries")

    # 3. Phase 2: GraphRAG in background
    if vector_success:
        log_structured('info', "Phase 2: Launching GraphRAG initialization in background...")

        async def _background_graphrag_init():
            try:
                success = await initialize_graphrag_components()
                if success:
                    log_structured('info', "GraphRAG background init completed")
                else:
                    log_structured('warning', "GraphRAG background init failed; vector search still works")
            except Exception as e:
                log_structured('error', f"GraphRAG background init error: {e}")

        # Schedule as a background task; this does NOT block server startup
        asyncio.ensure_future(_background_graphrag_init())

    log_structured('info', f"Startup complete. Server ready: {all_success}")
    return all_success
```

#### 4c. Restructure the main execution block

The old pattern of `asyncio.run(startup_event())` followed by `asyncio.run(serve(app, config))` won't work because background tasks launched by `asyncio.ensure_future()` need to run in the same event loop as the server.
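
The failure mode can be reproduced without Hypercorn. The following is a minimal sketch using only the standard library; `background_job`, `startup`, and `serve` are hypothetical stand-ins for the real GraphRAG init, `startup_event()`, and the server, not code from the app:

```python
import asyncio

completed = []  # labels of background jobs that ran to completion


async def background_job(label: str) -> None:
    """Hypothetical stand-in for a slow background initialization."""
    await asyncio.sleep(0.05)
    completed.append(label)


async def startup(label: str) -> None:
    """Schedule the background job and return immediately."""
    asyncio.ensure_future(background_job(label))


async def serve() -> None:
    """Hypothetical stand-in for the server: keeps the event loop alive."""
    await asyncio.sleep(0.2)


async def startup_then_serve(label: str) -> None:
    await startup(label)
    await serve()


# Old pattern: two event loops. asyncio.run() cancels tasks that are still
# pending when its coroutine returns, so this job never finishes.
asyncio.run(startup("two-loops"))
asyncio.run(serve())

# New pattern: startup and serving share a single event loop.
asyncio.run(startup_then_serve("one-loop"))

print(completed)  # ['one-loop']
```

Only the job scheduled inside the shared loop completes, which is the behavior the restructured main block relies on.
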
Restructure the block so that startup and serving share a single `asyncio.run()` call:

```python
if __name__ == '__main__':
    from hypercorn.config import Config as HypercornConfig
    from hypercorn.asyncio import serve as hypercorn_serve

    config = HypercornConfig()
    # ... config setup ...

    async def run_server_with_startup():
        """Run startup (with background GraphRAG init) then serve."""
        await startup_event()

        # Double-check agent
        if not is_agent_available():
            log_structured('critical', "Agent unavailable. Forcing re-init...")
            await initialize_vector_index()
            if is_agent_available():
                asyncio.ensure_future(initialize_graphrag_components())

        # Start serving; background GraphRAG init continues in the same event loop
        await hypercorn_serve(app, config)

    try:
        asyncio.run(run_server_with_startup())
    except KeyboardInterrupt:
        log_structured('info', "Server stopped manually.")
```

**Why this matters**: `asyncio.ensure_future()` schedules a coroutine on the *current* event loop. If you use two separate `asyncio.run()` calls, the background task from the first call is cancelled when the first event loop closes. By wrapping everything in a single `async def` and calling `asyncio.run()` once, the background GraphRAG init shares the server's event loop and runs concurrently with request handling.

One caveat: a background task only yields to the server at `await` points. If `initialize_graphrag_components()` spends its time in blocking calls (Neo4j queries, LLM extraction, community building), offload them with `asyncio.to_thread()` or an executor so they do not stall request handling.

---

## Change 5: Update Routes for Graceful Degradation

### File: Routes Module (e.g., `routes.py`)

#### 5a. Update imports

```python
from ai_core import initialize_global_index, initialize_vector_index
from shared_state import ..., get_graphrag_status
```

#### 5b. Update `/status` endpoint

Add GraphRAG status to the response:

```python
graphrag_status = get_graphrag_status()
status_data = {
    # ... existing fields ...
    'graphrag_ready': graphrag_status['ready'],
    'graphrag_initializing': graphrag_status['initializing'],
    'graphrag_error': graphrag_status['error'],
}
```

#### 5c. Update on-demand initialization in `/chat`

If the `/chat` endpoint has a fallback that tries to initialize on-demand when the agent is unavailable, change it to use `initialize_vector_index()` instead of `initialize_global_index()` for faster recovery:

```python
# Old:
index_success = await initialize_global_index()

# New:
index_success = await initialize_vector_index()
```

#### 5d. No changes needed for chat logic

The `/chat` endpoint itself needs no changes for graceful degradation. The agent's `simple_run` method already dynamically checks for GraphRAG tool availability:

```python
graphrag_tool = next((t for t in global_workflow_agent.tools
                      if 'graphrag' in t.metadata.name.lower()), None)
if graphrag_tool:
    ...  # Use GraphRAG; only happens after background init completes
else:
    ...  # Fall through to vector-only response
```

---

## Change 6: Docker Compose Documentation

### File: Neo4j Docker Compose (e.g., `docker-compose-neo4j.yml`)

Add a comment at the top explaining volume persistence:

```yaml
# IMPORTANT: Neo4j data persists in bind-mounted directories (./neo4j/data, etc.).
# These survive `docker-compose down` and `docker-compose up -d` restarts.
# However, if you delete the data directory manually, all graph data
# (extracted triples, entities, relationships) will be lost and must be
# re-extracted via LLM calls (10-20+ minutes) or restored from the disk cache
# at index_storage/graphrag_cache/neo4j_triples.pickle.
```

---

## Verification Checklist

### Test 1: Warm start (Neo4j has data, vector index cached)

- [ ] Server starts and accepts chat requests within ~1-2 minutes
- [ ] `/status` shows `graphrag_initializing: true` briefly, then `graphrag_ready: true`
- [ ] Chat requests work with vector search immediately
- [ ] GraphRAG tool becomes available after background init completes

### Test 2: Cold Neo4j with caches (Neo4j empty, `index_storage/` intact)

- [ ] Server starts with vector queries in ~1-2 minutes
- [ ] Background init restores triples from `neo4j_triples.pickle` (check logs for "Restoring triples from disk cache")
- [ ] No LLM extraction calls (check the logs: you should NOT see "Building PropertyGraphIndex via LLM extraction")
- [ ] GraphRAG becomes available after cache restore + community build

### Test 3: Full cold start (no `index_storage/`, no Neo4j data)

- [ ] Full rebuild runs (document parsing → vector index → LLM extraction → community building)
- [ ] `neo4j_triples.pickle` is created after LLM extraction
- [ ] Community cache files are created after community building
- [ ] Everything works on subsequent restarts using caches

### Test 4: Degraded mode

- [ ] Send a chat request before GraphRAG finishes initializing
- [ ] Response comes back using vector-only search
- [ ] No errors in response, just missing GraphRAG context
- [ ] After GraphRAG init completes, subsequent requests use both vector and GraphRAG

### Test 5: Cache integrity

- [ ] After a successful startup with Neo4j data, verify `neo4j_triples.pickle` exists in `index_storage/graphrag_cache/`
- [ ] Delete Neo4j data, restart, and verify that cache restore works
- [ ] Delete the pickle file, restart with Neo4j data, and verify that it gets recreated

---

## File Change Summary

| File | Changes |
|------|---------|
| Graph RAG integration module | `save_triples_to_cache()`, `load_triples_from_cache()`, 3-branch logic in `create_graph_components()` |
| Shared state module | `graphrag_ready`/`graphrag_initializing`/`graphrag_error` flags, `set_graphrag_status()`, `get_graphrag_status()` |
| AI core module | Extract GraphRAG tool class to module level, split into `initialize_vector_index()` + `initialize_graphrag_components()`, keep `initialize_global_index()` as wrapper |
| Main module | Two-phase startup in `startup_event()`, single `asyncio.run()` wrapping both startup and serve |
| Routes module | Import new functions, add GraphRAG status to `/status`, use `initialize_vector_index()` for on-demand init |
| Docker compose | Documentation comment about volume persistence |
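
To make the graceful-degradation mechanism from Changes 3 and 5 concrete, here is a minimal self-contained sketch of the per-request tool lookup. `ToolMeta`, `StubTool`, and `StubAgent` are hypothetical stand-ins for the real tool and agent classes, and this `simple_run` is a simplified illustration rather than the app's actual method:

```python
from dataclasses import dataclass, field


@dataclass
class ToolMeta:
    name: str


@dataclass
class StubTool:
    """Hypothetical stand-in for a BaseTool: metadata plus a canned answer."""
    metadata: ToolMeta
    answer: str

    def __call__(self, query: str) -> str:
        return f"{self.answer}: {query}"


@dataclass
class StubAgent:
    """Hypothetical stand-in for the agent: just a mutable tools list."""
    tools: list = field(default_factory=list)


def simple_run(agent: StubAgent, query: str) -> str:
    """Look up the GraphRAG tool on every call; fall back to the vector tool."""
    graphrag_tool = next(
        (t for t in agent.tools if "graphrag" in t.metadata.name.lower()), None
    )
    if graphrag_tool:
        return graphrag_tool(query)
    return agent.tools[0](query)  # vector-only path


agent = StubAgent(tools=[StubTool(ToolMeta("vector_search"), "vector")])
before = simple_run(agent, "q1")

# Background init finishes and appends the tool to the live agent.
agent.tools.append(StubTool(ToolMeta("answerquestionswith_graphrag"), "graphrag"))
after = simple_run(agent, "q2")

print(before)  # vector: q1
print(after)   # graphrag: q2
```

Because the lookup runs on each request, appending to the same list object is enough for later requests to pick up the new tool; no agent restart is involved.
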