hp_chatbot/documentation/graphrag-startup-optimization-guide.md
michael 5554aa043f Add GraphRAG startup optimization: triple caching and background init
- Cache extracted triples to disk (neo4j_triples.pickle) so Neo4j can be
  repopulated without expensive LLM re-extraction on cold starts
- Split initialization into two phases: fast vector-only (~1-2 min) and
  background GraphRAG, so the server serves requests while GraphRAG loads
- Add GraphRAG status flags to shared_state for monitoring readiness
- Update /status endpoint to expose graphrag_ready/initializing/error
- Restructure main.py to use single event loop for background task support

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-02-23 17:45:05 -06:00

GraphRAG Startup Optimization Guide

Purpose

This document describes a set of changes to drastically reduce startup time for an app built on the LlamaIndex + Neo4j GraphRAG + Flask/Hypercorn architecture. The changes achieve two things:

  1. Triple caching: Extracted entities/relationships are cached to disk so Neo4j can be repopulated without expensive LLM re-extraction.
  2. Background initialization: The server starts serving requests with vector-only search in ~1-2 minutes, while GraphRAG components initialize in the background.

Problem

On a cold start (or when Neo4j loses its data), the app spends:

Phase                          Duration   Already cached?
Document parsing (LlamaParse)  20-30 min  Yes (vector index on disk)
Vector embeddings              10-15 min  Yes (vector index on disk)
Triple extraction              10-20 min  No (only lives in Neo4j)
Community summarization        10-30 min  Yes (pickle cache)
Community detection            5-10 min   Yes (pickle cache)

Triple extraction is the one phase with no disk cache: if Neo4j loses its data, the LLM extraction has to run again. The app can't serve any requests until every phase completes.


Architecture Assumptions

This guide assumes your app has:

  • A graph store wrapper class (e.g., GraphRAGStore) that wraps Neo4jPropertyGraphStore and handles community detection/summarization with existing pickle caching for community data.
  • A create_graph_components() function that connects to Neo4j, checks for existing data, and either reuses data or runs LLM extraction via PropertyGraphIndex.
  • A monolithic initialization function (e.g., initialize_global_index()) that does everything: LLM setup, vector index loading, GraphRAG setup, agent creation.
  • A shared state module that stores global references to the agent, index, and GraphRAG components.
  • A main.py that runs startup synchronously before the server starts.
  • A routes module with a /status endpoint and a /chat endpoint that checks agent availability.

Adapt file names, class names, and function signatures to match your codebase.


Change 1: Cache Extracted Triples to Disk

What

Add two methods to your GraphRAGStore class:

  • save_triples_to_cache() — After successful LLM extraction, save entities and relations to a pickle file.
  • load_triples_from_cache() — On startup, if Neo4j is empty but the cache exists, restore triples to Neo4j directly.

Then modify create_graph_components() to use these methods.

File: Graph RAG Integration Module (e.g., graph_rag_integration.py)

1a. Add a triples cache file path to the store class

In the class-level constants of your GraphRAGStore (next to the existing community cache paths), add:

TRIPLES_CACHE_FILE = CACHE_DIR / "neo4j_triples.pickle"

1b. Add save_triples_to_cache() method

Add this method to GraphRAGStore, after get_triplets():

def save_triples_to_cache(self):
    """Save extracted triples (entities + relationships) from Neo4j to a disk cache.

    This allows restoring triples to Neo4j without expensive LLM re-extraction
    if Neo4j data is lost (e.g., container recreated without volume persistence).
    """
    try:
        triplets = self.get_triplets()
        if not triplets:
            log_structured('warning', 'No triplets to cache — Neo4j appears empty')
            return False

        # Collect unique entities and relations from the triplets
        entities = {}
        relations = []
        for entity1, relation, entity2 in triplets:
            entities[entity1.name] = entity1
            entities[entity2.name] = entity2
            relations.append(relation)

        cache_data = {
            'entities': list(entities.values()),
            'relations': relations,
            'triplet_count': len(triplets),
        }

        with open(self.TRIPLES_CACHE_FILE, 'wb') as f:
            pickle.dump(cache_data, f)

        log_structured('info', 'Successfully cached Neo4j triples to disk', {
            'entity_count': len(entities),
            'relation_count': len(relations),
            'triplet_count': len(triplets),
            'cache_file': str(self.TRIPLES_CACHE_FILE)
        })
        return True
    except Exception as e:
        log_structured('error', f'Error saving triples cache: {e}')
        return False
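
One weakness of writing the pickle in place is that a crash or container kill mid-write can leave a truncated file, which load_triples_from_cache() would then fail to unpickle. A minimal sketch of an atomic write follows. It assumes a hypothetical helper named atomic_pickle_dump, which is not part of the original code. The data goes to a temporary file in the same directory, and os.replace() swaps it in only after the write has succeeded.

```python
import os
import pickle
import tempfile
from pathlib import Path


def atomic_pickle_dump(data, target) -> None:
    """Write `data` to `target` so readers never see a half-written file.

    Hypothetical helper for illustration; not part of the original codebase.
    """
    target = Path(target)
    target.parent.mkdir(parents=True, exist_ok=True)

    # The temp file lives in the same directory so the final rename stays on
    # one filesystem, where os.replace() is atomic on POSIX systems.
    fd, tmp_name = tempfile.mkstemp(dir=target.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as f:
            pickle.dump(data, f)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_name, target)
    except BaseException:
        # Remove the temp file; any previous cache file is left untouched.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
```

Inside save_triples_to_cache(), the open()/pickle.dump() pair could then become a single call such as atomic_pickle_dump(cache_data, self.TRIPLES_CACHE_FILE).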

1c. Add load_triples_from_cache() method

Add this method right after save_triples_to_cache():

def load_triples_from_cache(self):
    """Load triples from disk cache and restore them to Neo4j.

    Returns True if triples were successfully restored, False otherwise.
    """
    if not self.TRIPLES_CACHE_FILE.exists():
        log_structured('info', 'No triples cache file found')
        return False

    try:
        with open(self.TRIPLES_CACHE_FILE, 'rb') as f:
            cache_data = pickle.load(f)

        entities = cache_data.get('entities', [])
        relations = cache_data.get('relations', [])

        if not entities:
            log_structured('warning', 'Triples cache file exists but contains no entities')
            return False

        log_structured('info', 'Restoring triples from disk cache to Neo4j', {
            'entity_count': len(entities),
            'relation_count': len(relations),
            'cached_triplet_count': cache_data.get('triplet_count', 'unknown')
        })

        # Restore entities (nodes) to Neo4j
        self.property_graph_store.upsert_nodes(entities)
        log_structured('info', f'Restored {len(entities)} entity nodes to Neo4j')

        # Restore relations to Neo4j
        if relations:
            self.property_graph_store.upsert_relations(relations)
            log_structured('info', f'Restored {len(relations)} relations to Neo4j')

        # Verify restoration
        restored_triplets = self.get_triplets()
        log_structured('info', f'Neo4j now contains {len(restored_triplets)} triplets after cache restore')

        return len(restored_triplets) > 0
    except Exception as e:
        log_structured('error', f'Error restoring triples from cache: {e}')
        return False

1d. Modify create_graph_components() to use the cache

The existing function has two branches:

  1. Neo4j has data and force_reindex=False → skip indexing
  2. Else → run LLM extraction

Change this to three branches:

  1. Neo4j has data and force_reindex=False → skip indexing (and save triples to the cache if it does not exist yet)
  2. Neo4j is empty, force_reindex=False, and the triples cache exists → restore from cache (falling back to LLM extraction if the restore fails)
  3. Otherwise → full LLM extraction (and save triples to the cache afterwards)
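
The three-way decision can be checked in isolation before wiring it into create_graph_components(). The sketch below restates the branch conditions as a pure function. Branch and choose_branch are hypothetical names used only for illustration, and the fallback from a failed cache restore to extraction is not modelled here.

```python
from enum import Enum


class Branch(Enum):
    REUSE_NEO4J = 1         # Neo4j has data; skip indexing
    RESTORE_FROM_CACHE = 2  # Neo4j is empty; replay the pickle cache
    LLM_EXTRACTION = 3      # force_reindex, or nothing to restore from


def choose_branch(has_existing_content: bool,
                  force_reindex: bool,
                  cache_exists: bool) -> Branch:
    """Return the branch the startup logic should take."""
    if force_reindex:
        return Branch.LLM_EXTRACTION
    if has_existing_content:
        return Branch.REUSE_NEO4J
    if cache_exists:
        return Branch.RESTORE_FROM_CACHE
    return Branch.LLM_EXTRACTION
```

Note that force_reindex=True always selects extraction, regardless of what Neo4j or the cache contain.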

Here is the full replacement logic for the branching section inside create_graph_components():

# Check if Neo4j already has content
triplets = graph_store.get_triplets()
has_existing_content = len(triplets) > 0

log_structured('info', f'Neo4j check: Found {len(triplets)} triplets')

if has_existing_content and not force_reindex:
    # BRANCH 1: Neo4j has data — use it, but also ensure disk cache exists
    log_structured('info', f'Neo4j already contains {len(triplets)} triplets. Skipping indexing.')

    # Ensure triples are also cached to disk for future recovery
    if not graph_store.TRIPLES_CACHE_FILE.exists():
        log_structured('info', 'Neo4j has data but no triples cache on disk — creating cache now')
        graph_store.save_triples_to_cache()

    property_graph_index = PropertyGraphIndex(
        nodes=[],
        property_graph_store=property_graph_store,
    )

    if not graph_store.communities_built:
        log_structured('info', 'Building graph communities from existing Neo4j data')
        try:
            graph_store.build_communities()
        except Exception as e:
            log_structured('error', f'Error building communities: {e}')

elif not has_existing_content and not force_reindex and graph_store.TRIPLES_CACHE_FILE.exists():
    # BRANCH 2: Neo4j is empty but triples cache exists — restore from cache
    log_structured('info', 'Neo4j is empty but triples cache exists. Restoring from disk cache.')

    restored = graph_store.load_triples_from_cache()
    if restored:
        log_structured('info', 'Successfully restored triples from cache.')

        property_graph_index = PropertyGraphIndex(
            nodes=[],
            property_graph_store=property_graph_store,
        )

        if not graph_store.communities_built:
            try:
                graph_store.build_communities()
            except Exception as e:
                log_structured('error', f'Error building communities from restored data: {e}')
    else:
        # Cache restore failed — fall through to LLM extraction
        log_structured('warning', 'Triples cache restore failed. Falling back to LLM extraction.')
        if not nodes:
            raise ValueError("Nodes must be provided when Neo4j is empty and cache restore fails")

        kg_extractor = GraphRAGExtractor(
            llm=llm,
            extract_prompt=KG_TRIPLET_EXTRACT_TMPL,
            max_paths_per_chunk=max_paths_per_chunk,
            parse_fn=custom_parse_fn,
        )

        property_graph_index = PropertyGraphIndex(
            nodes=nodes,
            kg_extractors=[kg_extractor],
            property_graph_store=property_graph_store,
            show_progress=True,
        )

        graph_store.save_triples_to_cache()

        try:
            graph_store.build_communities()
        except Exception as e:
            log_structured('error', f'Error building communities: {e}')

else:
    # BRANCH 3: Full LLM extraction (force_reindex or no cache)
    if not nodes:
        raise ValueError("Nodes must be provided for indexing when Neo4j is empty or force_reindex=True")

    kg_extractor = GraphRAGExtractor(
        llm=llm,
        extract_prompt=KG_TRIPLET_EXTRACT_TMPL,
        max_paths_per_chunk=max_paths_per_chunk,
        parse_fn=custom_parse_fn,
    )

    if has_existing_content and force_reindex:
        # Clear Neo4j before re-extraction
        try:
            from neo4j import GraphDatabase
            driver = GraphDatabase.driver(NEO4J_URL, auth=(NEO4J_USERNAME, NEO4J_PASSWORD))
            with driver.session() as session:
                session.run("MATCH (n) DETACH DELETE n")
            driver.close()
        except Exception as e:
            log_structured('warning', f'Error clearing Neo4j database: {e}')

    property_graph_index = PropertyGraphIndex(
        nodes=nodes,
        kg_extractors=[kg_extractor],
        property_graph_store=property_graph_store,
        show_progress=True,
    )

    # Cache the newly extracted triples
    graph_store.save_triples_to_cache()

    try:
        graph_store.build_communities()
    except Exception as e:
        log_structured('error', f'Error building communities: {e}')

return graph_store, property_graph_index

Change 2: Add GraphRAG Status Flags to Shared State

File: Shared State Module (e.g., shared_state.py)

2a. Add new module-level variables

After the existing GraphRAG component variables, add:

# GraphRAG initialization status
graphrag_ready = False
graphrag_initializing = False
graphrag_error = None

2b. Add setter/getter functions

Add these functions before is_agent_available():

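_UNSET = object()  # sentinel so callers can pass error=None to clear a stale error

def set_graphrag_status(ready=None, initializing=None, error=_UNSET):
    """Update GraphRAG initialization status flags.

    Arguments left at their defaults are not changed. Pass error=None to
    clear a previously recorded error (e.g., at the start of a retry).
    """
    global graphrag_ready, graphrag_initializing, graphrag_error
    from utils import log_structured

    if ready is not None:
        graphrag_ready = ready
    if initializing is not None:
        graphrag_initializing = initializing
    if error is not _UNSET:
        graphrag_error = error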

    log_structured('info', 'GraphRAG status updated', {
        'ready': graphrag_ready,
        'initializing': graphrag_initializing,
        'error': str(graphrag_error) if graphrag_error else None
    })

def get_graphrag_status():
    """Get current GraphRAG initialization status."""
    return {
        'ready': graphrag_ready,
        'initializing': graphrag_initializing,
        'error': str(graphrag_error) if graphrag_error else None
    }

Change 3: Split Initialization into Two Phases

File: AI Core Module (e.g., ai_core.py)

This is the largest change. You'll split your monolithic initialization function into:

  1. initialize_vector_index() — Fast path (~1-2 min). Loads/creates vector index, creates vector query engine, creates agent with vector-only tools, attaches the run method. The server becomes usable after this completes.
  2. initialize_graphrag_components() — Background path. Connects to Neo4j, restores/extracts triples, builds communities, creates GraphRAG query engine, and dynamically adds the GraphRAG tool to the existing agent's tools list.
  3. initialize_global_index() — Kept as backward-compatible wrapper that calls both in sequence.

3a. Update imports from shared_state

Add set_graphrag_status to your shared_state imports:

from shared_state import (
    ...,
    set_graphrag_status
)

3b. Extract your GraphRAG Tool class to module level

If your GraphRAG tool class (the BaseTool subclass that wraps GraphRAGQueryEngine) is defined inside the initialization function, move it to module level. This makes it reusable from both the init function and the background init function.

The tool class should remain functionally identical — just move it out of the function body to module scope. Example:

class GraphRAGTool(BaseTool):
    """Tool that queries using both vector and graph-based retrieval."""

    def __init__(self, query_engine):
        self.query_engine = query_engine
        self._metadata = ToolMetadata(
            name="answerquestionswith_graphrag",
            description="Your tool description here"
        )

    @property
    def metadata(self):
        return self._metadata

    def __call__(self, query_str: str) -> ToolOutput:
        # ... existing implementation unchanged ...
        pass

    async def acall(self, input: str) -> ToolOutput:
        return self.__call__(input)

3c. Create initialize_vector_index()

This function contains everything from your original initialize_global_index() except the GraphRAG-related code:

  • LLM and embedding model setup
  • Global Settings configuration
  • Vector index loading or creation (including document processing for cold starts)
  • Vector query engine creation
  • Agent creation with vector-only tools
  • simple_run method attachment and agent testing

Key difference from the original: No GraphRAG components, no Neo4j connection, no community building. The agent is created with only the vector query tool.

The simple_run function works without modification because it dynamically looks up the GraphRAG tool:

graphrag_tool = next((t for t in global_workflow_agent.tools if 'graphrag' in t.metadata.name.lower()), None)
if graphrag_tool:
    ...  # use it
# else: falls through to vector-only path

When GraphRAG isn't initialized yet, graphrag_tool is None and the code gracefully falls back to the direct response path.

3d. Create initialize_graphrag_components()

This function handles everything GraphRAG-related:

async def initialize_graphrag_components() -> bool:
    """Initialize GraphRAG components in the background."""
    set_graphrag_status(initializing=True, ready=False, error=None)

    try:
        # Get current index and agent from shared state
        from shared_state import global_workflow_agent as current_agent
        import shared_state
        index = shared_state.global_index

        if index is None:
            raise RuntimeError("Vector index must be initialized first")
        if current_agent is None:
            raise RuntimeError("Agent must be initialized first")

        llm = Settings.llm

        # Connect to Neo4j
        property_graph_store = Neo4jPropertyGraphStore(
            username=NEO4J_USERNAME,
            password=NEO4J_PASSWORD,
            url=NEO4J_URL
        )

        # Check Neo4j state and create/restore components
        temp_graph_store = GraphRAGStore(property_graph_store)
        triplets = temp_graph_store.get_triplets()
        neo4j_has_data = len(triplets) > 0

        if neo4j_has_data:
            graph_store, property_graph_index = create_graph_components(
                llm=llm, force_reindex=False
            )
        else:
            # Get nodes from vector index for potential extraction
            vector_nodes = []
            if hasattr(index, 'docstore') and index.docstore:
                vector_nodes = list(index.docstore.docs.values())

            if not vector_nodes:
                raise ValueError("No nodes available for GraphRAG indexing")

            # create_graph_components will try cache restore first, then LLM extraction
            graph_store, property_graph_index = create_graph_components(
                llm=llm,
                nodes=vector_nodes,
                max_paths_per_chunk=10,
                force_reindex=False  # Allow cache restore path
            )

        # Ensure communities are built
        if not hasattr(graph_store, 'communities_built') or not graph_store.communities_built:
            graph_store.build_communities()

        # Create GraphRAG query engine
        vector_retriever = VectorIndexRetriever(
            index=index, similarity_top_k=SIMILARITY_TOP_K
        )

        graphrag_query_engine = create_graphrag_query_engine(
            vector_retriever=vector_retriever,
            graph_store=graph_store,
            llm=llm,
            similarity_top_k=SIMILARITY_TOP_K
        )

        # Store in shared state
        set_graphrag_components(
            graph_store=graph_store,
            property_graph_index=property_graph_index,
            graphrag_query_engine=graphrag_query_engine
        )

        # *** KEY STEP: Add GraphRAG tool to the EXISTING agent ***
        graphrag_tool = GraphRAGTool(graphrag_query_engine)

        from shared_state import global_workflow_agent as live_agent
        if live_agent is not None:
            live_agent.tools.append(graphrag_tool)
            log_structured('info', 'Added GraphRAG tool to existing agent', {
                'total_tools': len(live_agent.tools),
                'tool_names': [t.metadata.name for t in live_agent.tools]
            })

        set_graphrag_status(ready=True, initializing=False)
        return True

    except Exception as e:
        log_structured('error', f'GraphRAG background initialization failed: {e}')
        set_graphrag_status(ready=False, initializing=False, error=e)
        return False

Critical detail: The GraphRAG tool is added to the agent's tools list dynamically via live_agent.tools.append(graphrag_tool). Because simple_run does a dynamic lookup on global_workflow_agent.tools every time it runs, the GraphRAG tool becomes available to all subsequent requests without restarting the agent.
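
The mechanism can be seen in a few lines without LlamaIndex. The sketch below uses SimpleNamespace objects as stand-ins for the agent and its tools, and the tool names are made up. It assumes the agent exposes the same mutable tools list it was constructed with. If your agent class copies the list at construction time, appending to the original has no effect and the agent would need to be rebuilt instead.

```python
from types import SimpleNamespace


def make_tool(name):
    """Stand-in for a BaseTool: only `metadata.name` matters for the lookup."""
    return SimpleNamespace(metadata=SimpleNamespace(name=name))


def find_graphrag_tool(agent):
    """The same lookup expression simple_run uses, evaluated on every call."""
    return next(
        (t for t in agent.tools if 'graphrag' in t.metadata.name.lower()),
        None,
    )


# Phase 1: the agent starts with the vector tool only.
agent = SimpleNamespace(tools=[make_tool("vector_search")])
before = find_graphrag_tool(agent)   # no GraphRAG tool yet

# Phase 2 (background): append to the SAME list object the agent holds.
agent.tools.append(make_tool("answer_with_graphrag"))
after = find_graphrag_tool(agent)    # found on the next lookup
```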

3e. Keep initialize_global_index() as a backward-compatible wrapper

async def initialize_global_index() -> bool:
    """Backward-compatible wrapper: runs both phases in sequence before returning."""
    vector_success = await initialize_vector_index()
    if not vector_success:
        return False

    graphrag_success = await initialize_graphrag_components()
    if not graphrag_success:
        log_structured('warning', 'GraphRAG init failed, but vector search is available')

    return True

Change 4: Update Main Startup for Background Init

File: Main Module (e.g., main.py)

4a. Update imports

from ai_core import initialize_global_index, initialize_vector_index, initialize_graphrag_components
from shared_state import global_workflow_agent, is_agent_available, get_graphrag_status

4b. Modify startup_event() for two-phase startup

async def startup_event() -> bool:
    """Phase 1 (sync): MongoDB + vector index. Phase 2 (background): GraphRAG."""
    log_structured('info', "Application startup sequence initiated.")
    all_success = True

    # 1. Initialize MongoDB
    try:
        if init_mongodb():
            log_structured('info', "MongoDB initialized successfully.")
        else:
            all_success = False
    except Exception as db_err:
        log_structured('critical', f"MongoDB init failed: {db_err}")
        all_success = False

    # 2. Phase 1: Vector index + agent (fast)
    log_structured('info', "Phase 1: Initializing vector index and agent...")
    vector_success = await initialize_vector_index()

    if not is_agent_available() or not vector_success:
        log_structured('critical', "Vector initialization failed")
        all_success = False
    else:
        log_structured('info', "Phase 1 complete: server is ready for vector queries")

    # 3. Phase 2: GraphRAG in background
    if vector_success:
        log_structured('info', "Phase 2: Launching GraphRAG initialization in background...")

        async def _background_graphrag_init():
            try:
                success = await initialize_graphrag_components()
                if success:
                    log_structured('info', "GraphRAG background init completed")
                else:
                    log_structured('warning', "GraphRAG background init failed — vector search still works")
            except Exception as e:
                log_structured('error', f"GraphRAG background init error: {e}")

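        # Schedule as a background task; this does NOT block server startup.
        # Keep a reference on the function object: the event loop holds only
        # weak references to tasks, so an unreferenced task can be
        # garbage-collected before it finishes.
        startup_event.graphrag_task = asyncio.ensure_future(_background_graphrag_init())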

    log_structured('info', f"Startup complete. Server ready: {all_success}")
    return all_success

4c. Restructure the main execution block

The old pattern of asyncio.run(startup_event()) followed by asyncio.run(serve(app, config)) won't work because background tasks launched by asyncio.ensure_future() need to run in the same event loop as the server. Restructure:

if __name__ == '__main__':
    from hypercorn.config import Config as HypercornConfig
    from hypercorn.asyncio import serve as hypercorn_serve

    config = HypercornConfig()
    # ... config setup ...

    async def run_server_with_startup():
        """Run startup (with background GraphRAG init) then serve."""
        await startup_event()

        # Double-check agent
        if not is_agent_available():
            log_structured('critical', "Agent unavailable. Forcing re-init...")
            await initialize_vector_index()
            if is_agent_available():
                # Hold a reference so the task is not garbage-collected mid-run
                graphrag_retry_task = asyncio.ensure_future(initialize_graphrag_components())

        # Start serving — background GraphRAG init continues in same event loop
        await hypercorn_serve(app, config)

    try:
        asyncio.run(run_server_with_startup())
    except KeyboardInterrupt:
        log_structured('info', "Server stopped manually.")

Why this matters: asyncio.ensure_future() schedules a coroutine on the current event loop. With two separate asyncio.run() calls, the first call cancels any tasks still pending when startup_event() returns and then closes its loop, so the background GraphRAG init never completes. Wrapping startup and serving in a single async def and calling asyncio.run() once lets the background GraphRAG init share the server's event loop, where it can make progress whenever request handling yields.


Change 5: Update Routes for Graceful Degradation

File: Routes Module (e.g., routes.py)

5a. Update imports

from ai_core import initialize_global_index, initialize_vector_index
from shared_state import ..., get_graphrag_status

5b. Update /status endpoint

Add GraphRAG status to the response:

graphrag_status = get_graphrag_status()

status_data = {
    # ... existing fields ...
    'graphrag_ready': graphrag_status['ready'],
    'graphrag_initializing': graphrag_status['initializing'],
    'graphrag_error': graphrag_status['error'],
}
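
A deployment or smoke-test script can consume these fields by polling. The sketch below uses only the standard library. It assumes /status returns a JSON object with the three keys at the top level, and the URL in the usage note is a placeholder. wait_for_graphrag takes the fetch function as a parameter so it can be exercised without a running server.

```python
import json
import time
import urllib.request


def fetch_status(url: str) -> dict:
    """GET the /status endpoint and decode its JSON body."""
    with urllib.request.urlopen(url, timeout=5) as resp:
        return json.load(resp)


def wait_for_graphrag(fetch, timeout_s: float = 1800.0,
                      interval_s: float = 5.0) -> bool:
    """Poll until graphrag_ready is true.

    Returns False on timeout and raises RuntimeError if the server reports
    graphrag_error. `fetch` is any zero-argument callable that returns the
    /status payload as a dict.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        status = fetch()
        if status.get('graphrag_error'):
            raise RuntimeError(f"GraphRAG init failed: {status['graphrag_error']}")
        if status.get('graphrag_ready'):
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_s)
```

Usage, with a placeholder host and port: wait_for_graphrag(lambda: fetch_status("http://localhost:8000/status")).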

5c. Update on-demand initialization in /chat

If the /chat endpoint has a fallback that tries to initialize on-demand when the agent is unavailable, change it to use initialize_vector_index() instead of initialize_global_index() for faster recovery:

# Old:
index_success = await initialize_global_index()

# New:
index_success = await initialize_vector_index()
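
If several requests hit /chat while the agent is unavailable, each of them may start its own initialization. One way to avoid that, sketched here with hypothetical names, is to guard the on-demand path with an asyncio.Lock and re-check availability after acquiring it. fake_initialize_vector_index and agent_ready stand in for initialize_vector_index() and is_agent_available().

```python
import asyncio

_init_lock = None
init_calls = 0
agent_ready = False


async def fake_initialize_vector_index() -> bool:
    """Hypothetical stand-in for initialize_vector_index()."""
    global init_calls, agent_ready
    init_calls += 1
    await asyncio.sleep(0.05)
    agent_ready = True
    return True


async def ensure_agent() -> bool:
    """Initialize at most once, even if many requests arrive together."""
    global _init_lock
    if agent_ready:
        return True
    if _init_lock is None:
        _init_lock = asyncio.Lock()  # created lazily, inside the running loop
    async with _init_lock:
        if agent_ready:  # another request finished init while we waited
            return True
        return await fake_initialize_vector_index()


async def demo():
    # Five "requests" arrive at the same time.
    return await asyncio.gather(*(ensure_agent() for _ in range(5)))


outcomes = asyncio.run(demo())
```

All five callers get a successful result, but the initialization function runs only once.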

5d. No changes needed for chat logic

The /chat endpoint itself needs no changes for graceful degradation. The agent's simple_run method already dynamically checks for GraphRAG tool availability:

graphrag_tool = next((t for t in global_workflow_agent.tools if 'graphrag' in t.metadata.name.lower()), None)
if graphrag_tool:
    ...  # Use GraphRAG; only happens after background init completes
else:
    ...  # Fall through to vector-only response

Change 6: Docker Compose Documentation

File: Neo4j Docker Compose (e.g., docker-compose-neo4j.yml)

Add a comment at the top explaining volume persistence:

# IMPORTANT: Neo4j data persists in bind-mounted directories (./neo4j/data, etc.).
# These survive `docker-compose down` and `docker-compose up -d` restarts.
# However, if you delete the data directory manually, all graph data
# (extracted triples, entities, relationships) will be lost and must be
# re-extracted via LLM calls (10-20+ minutes) or restored from the disk cache
# at index_storage/graphrag_cache/neo4j_triples.pickle.

Verification Checklist

Test 1: Warm start (Neo4j has data, vector index cached)

  • Server starts and accepts chat requests within ~1-2 minutes
  • /status shows graphrag_initializing: true briefly, then graphrag_ready: true
  • Chat requests work with vector search immediately
  • GraphRAG tool becomes available after background init completes

Test 2: Cold Neo4j with caches (Neo4j empty, index_storage/ intact)

  • Server starts with vector queries in ~1-2 minutes
  • Background init restores triples from neo4j_triples.pickle (check logs for "Restoring triples from disk cache")
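  • No LLM extraction calls (check logs: there should be no "Triples cache restore failed" warning and no extraction activity, such as a "Building PropertyGraphIndex via LLM extraction" message if your extraction branch logs one)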
  • GraphRAG becomes available after cache restore + community build

Test 3: Full cold start (no index_storage/, no Neo4j data)

  • Full rebuild runs (document parsing → vector index → LLM extraction → community building)
  • neo4j_triples.pickle is created after LLM extraction
  • Community cache files are created after community building
  • Everything works on subsequent restarts using caches

Test 4: Degraded mode

  • Send a chat request before GraphRAG finishes initializing
  • Response comes back using vector-only search
  • No errors in response — just missing GraphRAG context
  • After GraphRAG init completes, subsequent requests use both vector and GraphRAG

Test 5: Cache integrity

  • After a successful startup with Neo4j data, verify neo4j_triples.pickle exists in index_storage/graphrag_cache/
  • Delete Neo4j data, restart — verify cache restore works
  • Delete the pickle file, restart with Neo4j data — verify it gets recreated
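
For the first check, a small script can report what the cache contains without starting the app. This sketch relies on the dictionary layout written by save_triples_to_cache() (entities, relations, triplet_count). summarize_triples_cache is a hypothetical helper name. Unpickling a real cache requires the same LlamaIndex classes to be importable in the environment where the script runs.

```python
import pickle
from pathlib import Path


def summarize_triples_cache(path) -> dict:
    """Return counts from a triples cache written by save_triples_to_cache()."""
    path = Path(path)
    if not path.exists():
        return {'exists': False}
    # Only unpickle files your own app wrote: pickle.load can execute code.
    with open(path, 'rb') as f:
        data = pickle.load(f)
    return {
        'exists': True,
        'entity_count': len(data.get('entities', [])),
        'relation_count': len(data.get('relations', [])),
        'triplet_count': data.get('triplet_count'),
        'size_bytes': path.stat().st_size,
    }
```

Usage: print(summarize_triples_cache("index_storage/graphrag_cache/neo4j_triples.pickle")).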

File Change Summary

File                          Changes
Graph RAG integration module  save_triples_to_cache(), load_triples_from_cache(), 3-branch logic in create_graph_components()
Shared state module           graphrag_ready/graphrag_initializing/graphrag_error flags, set_graphrag_status(), get_graphrag_status()
AI core module                Extract GraphRAG tool class to module level, split into initialize_vector_index() + initialize_graphrag_components(), keep initialize_global_index() as wrapper
Main module                   Two-phase startup in startup_event(), single asyncio.run() wrapping both startup and serve
Routes module                 Import new functions, add GraphRAG status to /status, use initialize_vector_index() for on-demand init
Docker compose                Documentation comment about volume persistence