From fe0d8813414a96047fc0a441edad8ec10f4c73f8 Mon Sep 17 00:00:00 2001 From: michael Date: Mon, 23 Feb 2026 17:33:19 -0600 Subject: [PATCH] Speed up GraphRAG startup with triple caching and background init Server now starts serving vector-only queries in ~1-2 minutes instead of 30-60 minutes. GraphRAG initializes in a background task and its tool is dynamically added to the agent when ready. - Cache extracted triples to disk (neo4j_triples.pickle) so Neo4j can be repopulated without expensive LLM re-extraction - Split initialize_global_index() into initialize_vector_index() (fast) and initialize_graphrag_components() (background) - Add graphrag_ready/graphrag_initializing status flags to shared_state - Launch GraphRAG init as asyncio background task in main.py - Report GraphRAG status in /status endpoint for frontend awareness - Add comprehensive migration guide for applying to other projects Co-Authored-By: Claude Opus 4.6 --- ai_core.py | 683 ++++++++-------- .../graphrag-startup-optimization-guide.md | 738 ++++++++++++++++++ graph_rag_integration.py | 167 +++- main.py | 94 ++- neo4j/docker-compose-neo4j.yml | 7 + routes.py | 27 +- shared_state.py | 32 + 7 files changed, 1318 insertions(+), 430 deletions(-) create mode 100644 documentation/graphrag-startup-optimization-guide.md diff --git a/ai_core.py b/ai_core.py index a4b0eea..a7d8012 100644 --- a/ai_core.py +++ b/ai_core.py @@ -82,9 +82,10 @@ nest_asyncio.apply() # --- Global AI State --- # Import shared state to ensure all modules access the same instances from shared_state import ( - global_index, global_workflow_agent, + global_index, global_workflow_agent, global_graph_store, global_property_graph_index, global_graphrag_query_engine, - set_global_agent, set_global_index, set_graphrag_components + set_global_agent, set_global_index, set_graphrag_components, + set_graphrag_status ) @@ -720,31 +721,96 @@ async def process_documents_in_directory(directory: str, session_id: Optional[st except Exception as close_err: log_structured('error', 'Error closing parser_images client', {'error': str(close_err)}) -# --- Global Index Initialization --- -async def initialize_global_index() -> bool: - """Initialize the global index from Netflix documents at startup.""" - # Use shared state instead of module-level globals +# --- GraphRAG Tool Class (module-level for reuse) --- +class GraphRAGTool(BaseTool): + """Tool that queries Netflix marketing materials using both vector and graph-based retrieval.""" + def __init__(self, query_engine): + self.query_engine = query_engine + self._metadata = ToolMetadata( + name="answerquestionswith_graphrag", + description="USE THIS TOOL FOR ALL QUERIES - Queries Netflix marketing materials using both vector and graph-based retrieval for more comprehensive answers. Use this for complex questions that need context from multiple related documents." + ) + + @property + def metadata(self): + return self._metadata + + def __call__(self, query_str: str) -> ToolOutput: + """Run query through GraphRAG and generate synthesized answer.""" + log_structured('info', 'GraphRAG Tool: Starting dual retrieval', {'query': query_str}) + + try: + retrieval_result = self.query_engine.custom_query(query_str) + final_answer = generate_final_answer(query_str, retrieval_result, self.query_engine.llm) + + log_message = { + 'vector_context_length': len(retrieval_result.get('vector_context', '')), + 'graphrag_context_length': len(retrieval_result.get('graphrag_context', '')), + 'vector_nodes_count': len(retrieval_result.get('vector_nodes', [])), + 'community_ids': retrieval_result.get('community_ids', []) + } + log_structured('info', 'GraphRAG Tool: Retrieval complete', log_message) + + vector_nodes = retrieval_result.get('vector_nodes', []) + + class NodeWrapper: + def __init__(self, nodes): + self.source_nodes = nodes + + tool_output = ToolOutput( + content=final_answer, + tool_name="GraphRAG", + raw_output=NodeWrapper(vector_nodes), + raw_input={"query": query_str, "source": "graphrag", "retrieval_stats": log_message} + ) + + log_structured('debug', 'GraphRAG Tool: Including image metadata in response', + {'node_count': len(vector_nodes)}) + + return tool_output + except Exception as graphrag_err: + log_structured('error', f'Error in GraphRAG tool: {graphrag_err}', + {'traceback': traceback.format_exc()}) + return ToolOutput( + content=f"I encountered an error while retrieving information: {str(graphrag_err)}. Please try again or use the standard query tool.", + tool_name="GraphRAG", + raw_input={"query": query_str}, + raw_output={"error": str(graphrag_err)}, + is_error=True + ) + + async def acall(self, input: str) -> ToolOutput: + """Async version of __call__.""" + return self.__call__(input) + + +# --- Phase 1: Vector Index Initialization (fast path) --- +async def initialize_vector_index() -> bool: + """Initialize the vector index and agent with vector-only tools. + + This is the fast path (~1-2 minutes) that makes the server usable. + GraphRAG components are initialized separately in the background. + + Returns: + bool: True if vector index and agent were initialized successfully. + """ try: # --- Configure LLM and Embedding Model --- - # Check for real API keys openai_key = os.environ.get("OPENAI_API_KEY", "") if not openai_key: log_structured('critical', 'No OpenAI API key provided. Make sure OPENAI_API_KEY is set in your .env file.') return False - - # Log the first few characters of the key for debugging (important to verify it's loaded) + key_preview = openai_key[:4] + "..." if len(openai_key) > 4 else "invalid" log_structured('info', f'Using OpenAI API key starting with: {key_preview}') - - # Normal initialization with real API keys + try: llm = LlamaOpenAI( model=LLM_MODEL, temperature=LLM_TEMPERATURE, timeout=LLM_TIMEOUT ) - # Test the LLM with a simple prompt to ensure it's working test_prompt = "Say 'API key is working' if you can read this." _ = llm.complete(test_prompt) log_structured('info', 'Successfully tested LLM API connection') @@ -757,95 +823,26 @@ async def initialize_global_index() -> bool: Settings.llm = llm Settings.embed_model = embed_model Settings.callback_manager = CallbackManager([token_counter]) - # Settings.chunk_size = NODE_PARSER_CHUNK_SIZE # If using SentenceSplitter - # Settings.chunk_overlap = NODE_PARSER_CHUNK_OVERLAP - # Use Semantic Splitter (more robust, less config needed here) node_parser = SemanticSplitterNodeParser( - buffer_size=1, breakpoint_percentile_threshold=95, embed_model=embed_model # Adjust threshold + buffer_size=1, breakpoint_percentile_threshold=95, embed_model=embed_model ) Settings.node_parser = node_parser - # Alternatively, configure SentenceSplitter globally - # Settings.node_parser = SentenceSplitter( - # chunk_size=NODE_PARSER_CHUNK_SIZE, - # chunk_overlap=NODE_PARSER_CHUNK_OVERLAP, - # # ... other SentenceSplitter params - # ) - # --- Load or Build Index --- + # --- Load or Build Vector Index --- if INDEX_PERSIST_PATH.exists(): log_structured('info', f'Loading existing index from {INDEX_PERSIST_PATH}') storage_context = StorageContext.from_defaults(persist_dir=str(INDEX_PERSIST_PATH)) index = load_index_from_storage(storage_context) - # Save to shared state set_global_index(index) log_structured('info', 'Successfully loaded existing index') - - # Attempt to load or recreate GraphRAG components - try: - log_structured('info', 'Attempting to recreate GraphRAG components from loaded index') - - # First, try to connect to Neo4j to check if it has data - from llama_index.graph_stores.neo4j import Neo4jPropertyGraphStore - property_graph_store = Neo4jPropertyGraphStore( - username=NEO4J_USERNAME, - password=NEO4J_PASSWORD, - url=NEO4J_URL - ) - log_structured('info', 'Successfully connected to Neo4j database') - - # Create temporary GraphRAGStore to check for existing data - temp_graph_store = GraphRAGStore(property_graph_store) - triplets = temp_graph_store.get_triplets() - neo4j_has_data = len(triplets) > 0 - - if neo4j_has_data: - # Neo4j already has data, just use it - log_structured('info', f'Neo4j contains {len(triplets)} triplets. Using existing data.') - graph_store, property_graph_index = create_graph_components( - llm=llm, - force_reindex=False # Use existing Neo4j data - ) - log_structured('info', 'GraphRAG components loaded from existing Neo4j data') - else: - # Neo4j is empty, need to extract nodes from the vector index to populate it - log_structured('info', 'Neo4j is empty. Extracting nodes from vector index for GraphRAG indexing.') - - # Get all nodes from the vector index - from llama_index.core.schema import TextNode - vector_nodes = [] - - # Extract nodes from the index's docstore - if hasattr(index, 'docstore') and index.docstore: - docstore_nodes = list(index.docstore.docs.values()) - vector_nodes.extend(docstore_nodes) - log_structured('info', f'Retrieved {len(docstore_nodes)} nodes from index docstore') - - # Handle cases where we can't get nodes directly - if not vector_nodes: - log_structured('warning', 'Could not retrieve nodes from vector index. GraphRAG indexing will be skipped.') - raise ValueError("No nodes could be retrieved from the vector index for GraphRAG indexing") - - # Now create GraphRAG components with the retrieved nodes - graph_store, property_graph_index = create_graph_components( - llm=llm, - nodes=vector_nodes, - max_paths_per_chunk=10, - force_reindex=True # Force indexing since Neo4j is empty - ) - log_structured('info', f'GraphRAG components created with {len(vector_nodes)} nodes from vector index') - except Exception as e: - log_structured('warning', f'Error recreating GraphRAG components: {e}. Continuing without GraphRAG.') - graph_store = None else: log_structured('info', f'No index found at {INDEX_PERSIST_PATH}. Creating new index from {NETFLIX_DOCS_FOLDER}') if not NETFLIX_DOCS_FOLDER.exists() or not any(NETFLIX_DOCS_FOLDER.iterdir()): - log_structured('error', f'Netflix documents folder is missing or empty: {NETFLIX_DOCS_FOLDER}') - return False + log_structured('error', f'Netflix documents folder is missing or empty: {NETFLIX_DOCS_FOLDER}') + return False - # Process documents using LlamaParse documents = await process_documents_in_directory(str(NETFLIX_DOCS_FOLDER), session_id="global_init") - if not documents: log_structured('error', f'No documents processed from {NETFLIX_DOCS_FOLDER}. Index creation aborted.') return False @@ -855,138 +852,42 @@ async def initialize_global_index() -> bool: 'sample_doc_metadata': documents[0].metadata if documents else None }) - # Build the index using the globally configured Settings (incl. node_parser) index = VectorStoreIndex.from_documents( documents=documents, show_progress=True, - # service_context=service_context, # Old way, use Settings now ) - - # Save to shared state set_global_index(index) - - # Persist the index index.storage_context.persist(persist_dir=str(INDEX_PERSIST_PATH)) log_structured('info', f'Index created and persisted to {INDEX_PERSIST_PATH}') - - # --- Create GraphRAG components --- - log_structured('info', 'Starting GraphRAG component creation') - try: - # Get nodes from the index's docstore for GraphRAG - docstore_nodes = [] - if hasattr(index, 'docstore') and index.docstore: - docstore_nodes = list(index.docstore.docs.values()) - log_structured('info', f'Retrieved {len(docstore_nodes)} nodes from index docstore for GraphRAG') - - # If docstore is empty or doesn't exist, use the original documents - nodes_for_graph = docstore_nodes if docstore_nodes else documents - log_structured('info', f'Using {len(nodes_for_graph)} nodes for GraphRAG indexing') - - # Create GraphRAG components - graph_store, property_graph_index = create_graph_components( - llm=llm, - nodes=nodes_for_graph, - max_paths_per_chunk=10, - force_reindex=True # Force indexing for new index creation - ) - log_structured('info', 'GraphRAG components created successfully') - except Exception as graph_err: - log_structured('error', f'Error creating GraphRAG components: {graph_err}', - {'traceback': traceback.format_exc()}) - # Continue without GraphRAG if there's an error - # --- Create Retriever and Query Engine --- + # --- Create Retriever and Query Engine (vector-only) --- vector_store_info = VectorStoreInfo( content_info="Netflix marketing reference materials, including GPD Key Art Playbook and supporting documents.", metadata_info=[ MetadataInfo(name="filename", type="str", description="Filename of the source document"), MetadataInfo(name="source_page", type="int", description="Approximate page number in the source document"), MetadataInfo(name="image_paths", type="list[str]", description="List of image filenames associated with this chunk"), - # Add other relevant metadata fields here ], ) all_retriever = VectorIndexAutoRetriever( - index=index, # Use the local variable, not global_index + index=index, vector_store_info=vector_store_info, similarity_top_k=SIMILARITY_TOP_K, - verbose=True # Enable verbose logging for retriever + verbose=True ) - # Create standard vector-based query engine - all_query_engine = RetrieverQueryEngine.from_args( # Use from_args for clarity + all_query_engine = RetrieverQueryEngine.from_args( retriever=all_retriever, response_synthesizer=get_response_synthesizer( - response_mode=ResponseMode.COMPACT, # Or REFINE, TREE_SUMMARIZE etc. - # service_context=service_context # Old way + response_mode=ResponseMode.COMPACT, ), node_postprocessors=[ SimilarityPostprocessor(similarity_cutoff=SIMILARITY_CUTOFF), ], - # service_context=service_context # Old way ) - - # Create GraphRAG query engine if components were created successfully - graphrag_query_engine = None - graph_store = locals().get('graph_store', None) - property_graph_index = locals().get('property_graph_index', None) - if graph_store is not None and property_graph_index is not None: - try: - # Ensure graph communities are built before creating query engine - if not hasattr(graph_store, 'communities_built') or not graph_store.communities_built: - log_structured('info', 'Building graph communities before creating query engine') - try: - # Use gpt-4.1-mini model for community summaries (set in GraphRAGStore) - # The build_communities() method will first try to load from cache - # and will only rebuild and re-cache if cache loading fails - # It also tracks if communities are already built to avoid duplicate work - graph_store.build_communities() - log_structured('info', 'Communities built successfully (loaded from cache or built new)') - except Exception as comm_err: - log_structured('error', f'Error building communities: {comm_err}', - {'traceback': traceback.format_exc()}) - # Continue with query engine creation even if communities failed - - # Create a basic VectorIndexRetriever for GraphRAG - vector_retriever = VectorIndexRetriever( - index=index, - similarity_top_k=SIMILARITY_TOP_K - ) - - # Create the GraphRAG query engine - ensure all required fields are passed directly - try: - graphrag_query_engine = create_graphrag_query_engine( - vector_retriever=vector_retriever, - graph_store=graph_store, - llm=llm, - similarity_top_k=SIMILARITY_TOP_K - ) - log_structured('info', 'GraphRAG query engine created successfully') - except Exception as e: - log_structured('error', f'Error creating GraphRAG query engine: {e}', {'traceback': traceback.format_exc()}) - # Create a direct instance without using the factory function as a fallback - from graph_rag_integration import GraphRAGQueryEngine - graphrag_query_engine = GraphRAGQueryEngine( - vector_retriever=vector_retriever, - graph_store=graph_store, - llm=llm, - similarity_top_k=SIMILARITY_TOP_K - ) - - # Store GraphRAG components in shared state - set_graphrag_components( - graph_store=graph_store, - property_graph_index=property_graph_index, - graphrag_query_engine=graphrag_query_engine - ) - - log_structured('info', 'GraphRAG query engine created successfully') - except Exception as graph_engine_err: - log_structured('error', f'Error creating GraphRAG query engine: {graph_engine_err}', - {'traceback': traceback.format_exc()}) - # Continue without GraphRAG query engine if there's an error - # --- Create Query Engine Tools --- + # --- Create Query Engine Tools (vector-only initially) --- query_engine_tools_react = [ QueryEngineTool( query_engine=all_query_engine, @@ -996,118 +897,30 @@ async def initialize_global_index() -> bool: ), ), ] - - # Add GraphRAG tool if available - if graphrag_query_engine is not None: - class GraphRAGTool(BaseTool): - def __init__(self, query_engine): - self.query_engine = query_engine - self._metadata = ToolMetadata( - name="answerquestionswith_graphrag", - description="USE THIS TOOL FOR ALL QUERIES - Queries Netflix marketing materials using both vector and graph-based retrieval for more comprehensive answers. Use this for complex questions that need context from multiple related documents." - ) - - @property - def metadata(self): - return self._metadata - - def __call__(self, query_str: str) -> ToolOutput: - """Run query through GraphRAG and generate synthesized answer.""" - from shared_state import global_graphrag_query_engine - - log_structured('info', 'GraphRAG Tool: Starting dual retrieval', {'query': query_str}) - - try: - # Get both vector and GraphRAG retrieval results - retrieval_result = self.query_engine.custom_query(query_str) - - # Generate synthesized answer - final_answer = generate_final_answer(query_str, retrieval_result, self.query_engine.llm) - - # Prepare the tool output - log_message = { - 'vector_context_length': len(retrieval_result.get('vector_context', '')), - 'graphrag_context_length': len(retrieval_result.get('graphrag_context', '')), - 'vector_nodes_count': len(retrieval_result.get('vector_nodes', [])), - 'community_ids': retrieval_result.get('community_ids', []) - } - log_structured('info', 'GraphRAG Tool: Retrieval complete', log_message) - - # Extract vector nodes and create a properly structured raw_output that includes source_nodes - # This allows the routes.py code to extract images from these nodes - vector_nodes = retrieval_result.get('vector_nodes', []) - - # Create a wrapper object that mimics the structure expected by routes.py for image extraction - class NodeWrapper: - def __init__(self, nodes): - self.source_nodes = nodes - - # Preserve the original retrieval_result but add the source_nodes in the expected format - modified_raw_output = retrieval_result.copy() - modified_raw_output['source_nodes'] = vector_nodes - - # Add a source_nodes property that routes.py will look for - tool_output = ToolOutput( - content=final_answer, - tool_name="GraphRAG", - raw_output=NodeWrapper(vector_nodes), - raw_input={"query": query_str, "source": "graphrag", "retrieval_stats": log_message} - ) - - log_structured('debug', 'GraphRAG Tool: Including image metadata in response', - {'node_count': len(vector_nodes)}) - - return tool_output - except Exception as graphrag_err: - log_structured('error', f'Error in GraphRAG tool: {graphrag_err}', - {'traceback': traceback.format_exc()}) - return ToolOutput( - content=f"I encountered an error while retrieving information: {str(graphrag_err)}. Please try again or use the standard query tool.", - tool_name="GraphRAG", - raw_input={"query": query_str}, - raw_output={"error": str(graphrag_err)}, - is_error=True - ) - - async def acall(self, input: str) -> ToolOutput: - """Async version of __call__.""" - return self.__call__(input) - - # Create the GraphRAG tool instance and add it to tools - graphrag_tool = GraphRAGTool(graphrag_query_engine) - query_engine_tools_react.append(graphrag_tool) - log_structured('info', 'Added GraphRAG tool to query engine tools') # --- Initialize Global Workflow Agent --- - # We're now using shared_state rather than module globals - try: - # Create the agent instance agent = ReActAgent2( - llm=llm, # Use the LLM configured via Settings + llm=llm, tools=query_engine_tools_react, - memory=ChatMemoryBuffer.from_defaults(llm=llm, token_limit=4096), # Give memory its own LLM ref - verbose=True, # Enable agent verbose logging + memory=ChatMemoryBuffer.from_defaults(llm=llm, token_limit=4096), + verbose=True, timeout=AGENT_TIMEOUT, llm_timeout=LLM_TIMEOUT, tool_timeout=TOOL_EXECUTION_TIMEOUT ) - - # Store in shared state set_global_agent(agent) - - log_structured('info', 'Agent initialized successfully') + log_structured('info', 'Agent initialized with vector-only tools') except Exception as agent_err: log_structured('critical', f'Failed to initialize agent: {str(agent_err)}', {'error': str(agent_err)}) - return False # Signal failure - + return False + try: # Define a simpler version of the run method async def simple_run(query_text): """Simple version that doesn't rely on complex workflow steps.""" - # Import from shared state to ensure we use the current global agent from shared_state import global_workflow_agent - + if not global_workflow_agent: log_structured('critical', 'Agent is None in simple_run - this should never happen') return { @@ -1115,44 +928,39 @@ async def initialize_global_index() -> bool: "sources": [], "reasoning": [] } - + try: - # Don't reset memory - we want to preserve conversation context between requests - # Only reset sources to track new sources for this specific response global_workflow_agent.sources = [] - + # 1. Add the user query to memory user_msg = ChatMessage(role="user", content=str(query_text)) global_workflow_agent.memory.put(user_msg) - + # 2. Format the chat history chat_history = global_workflow_agent.memory.get() llm_input = global_workflow_agent.formatter.format(global_workflow_agent.tools, chat_history) - + # 3. Get LLM response response = await asyncio.wait_for( global_workflow_agent.llm.achat(llm_input), timeout=global_workflow_agent.llm_timeout ) reasoning_step = global_workflow_agent.output_parser.parse(response.message.content) - - # Add detailed logging + log_structured('debug', 'Parsed reasoning step', { 'step_type': type(reasoning_step).__name__, 'has_response': hasattr(reasoning_step, 'response'), - 'has_action': hasattr(reasoning_step, 'action'), + 'has_action': hasattr(reasoning_step, 'action'), 'action': getattr(reasoning_step, 'action', None), 'action_input': getattr(reasoning_step, 'action_input', None), - 'raw_content': response.message.content[:200] # First 200 chars for debugging + 'raw_content': response.message.content[:200] }) - - # 4. Process the response (simplified) - # Force tool usage for all queries - don't allow direct responses + + # 4. Process the response if hasattr(reasoning_step, 'response') and reasoning_step.response: - # If LLM gave a direct response, we need to force tool usage instead log_structured('debug', 'LLM provided direct response, forcing tool usage for image retrieval') - - # Force tool execution by calling GraphRAG directly + + # Try GraphRAG tool first (dynamically checks current tools list) graphrag_tool = next((t for t in global_workflow_agent.tools if 'graphrag' in t.metadata.name.lower()), None) if graphrag_tool: try: @@ -1161,10 +969,7 @@ async def initialize_global_index() -> bool: timeout=global_workflow_agent.tool_timeout ) global_workflow_agent.sources.append(tool_output) - - # Use the tool's response instead of the direct LLM response response_text = str(tool_output.content) if tool_output and tool_output.content else str(reasoning_step.response) - assistant_msg = ChatMessage(role="assistant", content=response_text) global_workflow_agent.memory.put(assistant_msg) return { @@ -1174,29 +979,26 @@ async def initialize_global_index() -> bool: } except Exception as e: log_structured('error', f'Error forcing GraphRAG tool execution: {str(e)}') - # Fall back to direct response if tool fails - + # Fallback to cleaning the direct response response_text = str(reasoning_step.response) - - # Check for common thinking patterns and remove them - import re # Import re within the function scope to ensure it's available + + import re thinking_patterns = [ - r'(?i)^.*?thinking:.*?\n', # Remove lines starting with "Thinking:" - r'(?i).*?', # Remove XML-like thinking tags - r'(?i)\[thinking\].*?\[/thinking\]', # Remove bracket thinking tags - r'(?i)I\'m thinking:.*?\n', # Remove "I'm thinking:" sections - r'(?i)Let me think.*?\n', # Remove "Let me think" sections - r'(?i)Thought:.*?Answer:', # Remove "Thought: ... Answer:" pattern - r'(?i)^Answer:\s*' # Remove just the "Answer:" prefix + r'(?i)^.*?thinking:.*?\n', + r'(?i).*?', + r'(?i)\[thinking\].*?\[/thinking\]', + r'(?i)I\'m thinking:.*?\n', + r'(?i)Let me think.*?\n', + r'(?i)Thought:.*?Answer:', + r'(?i)^Answer:\s*' ] for pattern in thinking_patterns: response_text = re.sub(pattern, '', response_text, flags=re.DOTALL) - - # Remove extra newlines that might be left after cleaning + response_text = re.sub(r'\n{3,}', '\n\n', response_text) response_text = response_text.strip() - + assistant_msg = ChatMessage(role="assistant", content=response_text) global_workflow_agent.memory.put(assistant_msg) return { @@ -1205,12 +1007,11 @@ async def initialize_global_index() -> bool: "reasoning": [reasoning_step] } else: - # Handle tool calls if the response indicates an action + # Handle tool calls if hasattr(reasoning_step, 'action') and reasoning_step.action: tool_name = reasoning_step.action action_input = reasoning_step.action_input or {} - - # Convert to dict if needed + if not isinstance(action_input, dict): try: if isinstance(action_input, str) and action_input.strip().startswith('{'): @@ -1219,11 +1020,9 @@ async def initialize_global_index() -> bool: action_input = {'query': action_input} except: action_input = {'query': str(action_input)} - - # Find the tool + tool = next((t for t in global_workflow_agent.tools if t.metadata.name == tool_name), None) if not tool: - error_msg = f"Tool '{tool_name}' not found. Available tools: {[t.metadata.name for t in global_workflow_agent.tools]}" log_structured('error', 'Tool not found in simple_run', {'tool_name': tool_name, 'available_tools': [t.metadata.name for t in global_workflow_agent.tools]}) return { "response": f"I tried to use a tool called '{tool_name}' but it's not available. Please try rephrasing your query.", @@ -1232,48 +1031,41 @@ async def initialize_global_index() -> bool: } if tool: try: - # Execute the tool tool_output = await asyncio.wait_for( tool.acall(**action_input), timeout=global_workflow_agent.tool_timeout ) global_workflow_agent.sources.append(tool_output) - - # Get a final response + observation = str(tool_output.content if tool_output and tool_output.content is not None else "No content") follow_up_msg = ChatMessage(role="user", content=f"Here is the result: {observation}\\nPlease provide a final response based on this information.") global_workflow_agent.memory.put(follow_up_msg) - - # Call LLM again for final response + chat_history = global_workflow_agent.memory.get() llm_input = global_workflow_agent.formatter.format(global_workflow_agent.tools, chat_history) final_response = await asyncio.wait_for( global_workflow_agent.llm.achat(llm_input), timeout=global_workflow_agent.llm_timeout ) - - # Store and return - # Clean the response to remove any thinking parts + response_text = str(final_response.message.content) - - # Check for common thinking patterns and remove them - import re # Import re within the function scope to ensure it's available + + import re thinking_patterns = [ - r'(?i)^.*?thinking:.*?\n', # Remove lines starting with "Thinking:" - r'(?i).*?', # Remove XML-like thinking tags - r'(?i)\[thinking\].*?\[/thinking\]', # Remove bracket thinking tags - r'(?i)I\'m thinking:.*?\n', # Remove "I'm thinking:" sections - r'(?i)Let me think.*?\n', # Remove "Let me think" sections - r'(?i)Thought:.*?Answer:', # Remove "Thought: ... Answer:" pattern - r'(?i)^Answer:\s*' # Remove just the "Answer:" prefix + r'(?i)^.*?thinking:.*?\n', + r'(?i).*?', + r'(?i)\[thinking\].*?\[/thinking\]', + r'(?i)I\'m thinking:.*?\n', + r'(?i)Let me think.*?\n', + r'(?i)Thought:.*?Answer:', + r'(?i)^Answer:\s*' ] for pattern in thinking_patterns: response_text = re.sub(pattern, '', response_text, flags=re.DOTALL) - - # Remove extra newlines that might be left after cleaning + response_text = re.sub(r'\n{3,}', '\n\n', response_text) response_text = response_text.strip() - + global_workflow_agent.memory.put(ChatMessage(role="assistant", content=response_text)) return { "response": response_text, @@ -1282,18 +1074,15 @@ async def initialize_global_index() -> bool: } except Exception as e: log_structured('error', f'Error executing tool: {str(e)}', {'traceback': traceback.format_exc()}) - # Error message is already clean without thinking error_response = f"I encountered an error while processing your query: {str(e)}" return { "response": error_response, "sources": [], "reasoning": [reasoning_step] } - + # Enhanced fallback with thinking detection if isinstance(reasoning_step, ActionReasoningStep): - # If we reach here, it means we had an action but couldn't execute it - thinking_response = f"Thought: {getattr(reasoning_step, 'thought', '')} Action: {reasoning_step.action} Action Input: {reasoning_step.action_input}" log_structured('warning', 'Returning raw thinking due to failed action execution', { 'action': reasoning_step.action, 'action_input': reasoning_step.action_input @@ -1304,7 +1093,6 @@ async def initialize_global_index() -> bool: "reasoning": [reasoning_step] } else: - # Original fallback for other types fallback_response = "I wasn't able to find a specific answer to your question. Please try rephrasing your query." return { "response": fallback_response, @@ -1313,36 +1101,29 @@ async def initialize_global_index() -> bool: } except Exception as e: log_structured('error', f'Error in simple_run: {str(e)}', {'traceback': traceback.format_exc()}) - # Error response is already clean error_response = f"I encountered an error while processing your query: {str(e)}" return { "response": error_response, "sources": [], "reasoning": [] } - - # Replace the run method directly on the agent that is already in shared state - # We need to get the current reference from shared_state + + # Attach the run method to the agent from shared_state import global_workflow_agent as current_agent - + if current_agent is None: log_structured('critical', 'Cannot set run method - global_workflow_agent is None') return False - - # Attach the run method directly + current_agent.run = simple_run - - # Verify it was attached correctly + if not hasattr(current_agent, 'run'): log_structured('critical', 'Failed to attach run method to agent') return False - + log_structured('info', 'Successfully attached run method to agent') - - # Test the agent is working by calling a simple method - log_structured('info', 'Testing agent functionality...') - - # Test on the current_agent we imported above + + # Test the agent if current_agent and hasattr(current_agent, 'memory') and hasattr(current_agent.memory, 'reset'): current_agent.memory.reset() log_structured('info', 'Agent memory reset test successful') @@ -1356,20 +1137,180 @@ async def initialize_global_index() -> bool: else: log_structured('error', 'Agent memory test failed: unknown reason') return False - + except Exception as method_err: - log_structured('critical', f'Failed to set up agent run method: {str(method_err)}', + log_structured('critical', f'Failed to set up agent run method: {str(method_err)}', {'error': str(method_err), 'traceback': traceback.format_exc()}) return False - log_structured('info', 'Global index and workflow agent initialized successfully.') - # Skip test query to avoid potential errors during startup + log_structured('info', 'Vector index and agent initialized successfully (GraphRAG pending).') return True except Exception as e: - log_structured('critical', 'Global index/agent initialization failed', { + log_structured('critical', 'Vector index/agent initialization failed', { 'error': str(e), 'traceback': traceback.format_exc() }) - global_index = None - global_workflow_agent = None - return False \ No newline at end of file + return False + + +# --- Phase 2: GraphRAG Background Initialization --- +async def initialize_graphrag_components() -> bool: + """Initialize GraphRAG components in the background. + + This connects to Neo4j, restores or extracts triples, builds communities, + creates the GraphRAG query engine, and adds the GraphRAG tool to the + existing agent. Can be called after the server is already serving requests. + + Returns: + bool: True if GraphRAG components were initialized successfully. + """ + set_graphrag_status(initializing=True, ready=False, error=None) + + try: + # Get the current index and agent from shared state + from shared_state import global_workflow_agent as current_agent + import shared_state + index = shared_state.global_index + + if index is None: + raise RuntimeError("Vector index must be initialized before GraphRAG components") + if current_agent is None: + raise RuntimeError("Agent must be initialized before GraphRAG components") + + llm = Settings.llm + + # --- Connect to Neo4j and create/restore GraphRAG components --- + log_structured('info', 'Starting GraphRAG background initialization') + + try: + from llama_index.graph_stores.neo4j import Neo4jPropertyGraphStore + property_graph_store = Neo4jPropertyGraphStore( + username=NEO4J_USERNAME, + password=NEO4J_PASSWORD, + url=NEO4J_URL + ) + log_structured('info', 'Successfully connected to Neo4j database') + except Exception as e: + raise RuntimeError(f"Neo4j connection failed: {e}") + + # Check Neo4j state + temp_graph_store = GraphRAGStore(property_graph_store) + triplets = temp_graph_store.get_triplets() + neo4j_has_data = len(triplets) > 0 + log_structured('info', f'Neo4j state check: {len(triplets)} triplets found') + + if neo4j_has_data: + # Neo4j already has data — use it directly + log_structured('info', f'Neo4j contains {len(triplets)} triplets. Using existing data.') + graph_store, property_graph_index = create_graph_components( + llm=llm, + force_reindex=False + ) + log_structured('info', 'GraphRAG components loaded from existing Neo4j data') + else: + # Neo4j is empty — try cache restore, then fall back to LLM extraction + log_structured('info', 'Neo4j is empty. Will attempt cache restore or LLM extraction.') + + # Get nodes from vector index for potential LLM extraction + vector_nodes = [] + if hasattr(index, 'docstore') and index.docstore: + vector_nodes = list(index.docstore.docs.values()) + log_structured('info', f'Retrieved {len(vector_nodes)} nodes from index docstore') + + if not vector_nodes: + log_structured('warning', 'Could not retrieve nodes from vector index. GraphRAG indexing will be skipped.') + raise ValueError("No nodes could be retrieved from the vector index for GraphRAG indexing") + + # create_graph_components will automatically try cache restore before LLM extraction + graph_store, property_graph_index = create_graph_components( + llm=llm, + nodes=vector_nodes, + max_paths_per_chunk=10, + force_reindex=False # Allow cache restore path + ) + log_structured('info', 'GraphRAG components created') + + # --- Build GraphRAG query engine --- + # Ensure communities are built + if not hasattr(graph_store, 'communities_built') or not graph_store.communities_built: + log_structured('info', 'Building graph communities before creating query engine') + try: + graph_store.build_communities() + log_structured('info', 'Communities built successfully') + except Exception as comm_err: + log_structured('error', f'Error building communities: {comm_err}', + {'traceback': traceback.format_exc()}) + + vector_retriever = VectorIndexRetriever( + index=index, + similarity_top_k=SIMILARITY_TOP_K + ) + + try: + graphrag_query_engine = create_graphrag_query_engine( + vector_retriever=vector_retriever, + graph_store=graph_store, + llm=llm, + similarity_top_k=SIMILARITY_TOP_K + ) + log_structured('info', 'GraphRAG query engine created successfully') + except Exception as e: + log_structured('error', f'Error creating GraphRAG query engine via factory: {e}') + graphrag_query_engine = GraphRAGQueryEngine( + vector_retriever=vector_retriever, + graph_store=graph_store, + llm=llm, + similarity_top_k=SIMILARITY_TOP_K + ) + + # Store GraphRAG components in shared state + set_graphrag_components( + graph_store=graph_store, + property_graph_index=property_graph_index, + graphrag_query_engine=graphrag_query_engine + ) + + # --- Add GraphRAG tool to the existing agent's tools --- + graphrag_tool = GraphRAGTool(graphrag_query_engine) + + # Re-import current agent to get fresh reference + from shared_state import global_workflow_agent as live_agent + if live_agent is not None: + live_agent.tools.append(graphrag_tool) + log_structured('info', 'Added GraphRAG tool to existing agent', { + 'total_tools': len(live_agent.tools), + 'tool_names': [t.metadata.name for t in live_agent.tools] + }) + else: + log_structured('warning', 'Agent not available when trying to add GraphRAG tool') + + set_graphrag_status(ready=True, initializing=False) + log_structured('info', 'GraphRAG background initialization complete') + return True + + except Exception as e: + log_structured('error', f'GraphRAG background initialization failed: {e}', { + 'traceback': traceback.format_exc() + }) + set_graphrag_status(ready=False, initializing=False, error=e) + return False + + +# --- Backward-compatible wrapper --- +async def initialize_global_index() -> bool: + """Initialize the global index from Netflix documents at startup. + + This is the original entry point that initializes everything synchronously. + For faster startup, use initialize_vector_index() followed by + initialize_graphrag_components() in the background. + """ + vector_success = await initialize_vector_index() + if not vector_success: + return False + + graphrag_success = await initialize_graphrag_components() + if not graphrag_success: + log_structured('warning', 'GraphRAG initialization failed, but vector search is available') + # Return True since vector search works — GraphRAG is optional + + return True \ No newline at end of file diff --git a/documentation/graphrag-startup-optimization-guide.md b/documentation/graphrag-startup-optimization-guide.md new file mode 100644 index 0000000..6435744 --- /dev/null +++ b/documentation/graphrag-startup-optimization-guide.md @@ -0,0 +1,738 @@ +# GraphRAG Startup Optimization Guide + +## Purpose + +This document describes a set of changes to drastically reduce startup time for an app built on the LlamaIndex + Neo4j GraphRAG + Flask/Hypercorn architecture. The changes achieve two things: + +1. **Triple caching**: Extracted entities/relationships are cached to disk so Neo4j can be repopulated without expensive LLM re-extraction. +2. **Background initialization**: The server starts serving requests with vector-only search in ~1-2 minutes, while GraphRAG components initialize in the background. + +--- + +## Problem + +On a cold start (or when Neo4j loses its data), the app spends: + +| Phase | Duration | Already Cached? | +|-------|----------|-----------------| +| Document parsing (LlamaParse) | 20-30 min | Yes (vector index on disk) | +| Vector embeddings | 10-15 min | Yes (vector index on disk) | +| **Triple extraction** | **10-20 min** | **No — only lives in Neo4j** | +| **Community summarization** | **10-30 min** | Yes (pickle cache) | +| Community detection | 5-10 min | Yes (pickle cache) | + +The two bolded phases are the bottleneck. The app can't serve any requests until all of this completes. + +--- + +## Architecture Assumptions + +This guide assumes your app has: + +- A **graph store wrapper class** (e.g., `GraphRAGStore`) that wraps `Neo4jPropertyGraphStore` and handles community detection/summarization with existing pickle caching for community data. +- A **`create_graph_components()` function** that connects to Neo4j, checks for existing data, and either reuses data or runs LLM extraction via `PropertyGraphIndex`. +- A **monolithic initialization function** (e.g., `initialize_global_index()`) that does everything: LLM setup, vector index loading, GraphRAG setup, agent creation. +- A **shared state module** that stores global references to the agent, index, and GraphRAG components. +- A **main.py** that runs startup synchronously before the server starts. +- A **routes module** with a `/status` endpoint and a `/chat` endpoint that checks agent availability. + +Adapt file names, class names, and function signatures to match your codebase. + +--- + +## Change 1: Cache Extracted Triples to Disk + +### What + +Add two methods to your `GraphRAGStore` class: +- `save_triples_to_cache()` — After successful LLM extraction, save entities and relations to a pickle file. +- `load_triples_from_cache()` — On startup, if Neo4j is empty but the cache exists, restore triples to Neo4j directly. + +Then modify `create_graph_components()` to use these methods. + +### File: Graph RAG Integration Module (e.g., `graph_rag_integration.py`) + +#### 1a. Add a triples cache file path to the store class + +In the class-level constants of your `GraphRAGStore` (next to the existing community cache paths), add: + +```python +TRIPLES_CACHE_FILE = CACHE_DIR / "neo4j_triples.pickle" +``` + +#### 1b. Add `save_triples_to_cache()` method + +Add this method to `GraphRAGStore`, after `get_triplets()`: + +```python +def save_triples_to_cache(self): + """Save extracted triples (entities + relationships) from Neo4j to a disk cache. + + This allows restoring triples to Neo4j without expensive LLM re-extraction + if Neo4j data is lost (e.g., container recreated without volume persistence). + """ + try: + triplets = self.get_triplets() + if not triplets: + log_structured('warning', 'No triplets to cache — Neo4j appears empty') + return False + + # Collect unique entities and relations from the triplets + entities = {} + relations = [] + for entity1, relation, entity2 in triplets: + entities[entity1.name] = entity1 + entities[entity2.name] = entity2 + relations.append(relation) + + cache_data = { + 'entities': list(entities.values()), + 'relations': relations, + 'triplet_count': len(triplets), + } + + with open(self.TRIPLES_CACHE_FILE, 'wb') as f: + pickle.dump(cache_data, f) + + log_structured('info', 'Successfully cached Neo4j triples to disk', { + 'entity_count': len(entities), + 'relation_count': len(relations), + 'triplet_count': len(triplets), + 'cache_file': str(self.TRIPLES_CACHE_FILE) + }) + return True + except Exception as e: + log_structured('error', f'Error saving triples cache: {e}') + return False +``` + +#### 1c. Add `load_triples_from_cache()` method + +Add this method right after `save_triples_to_cache()`: + +```python +def load_triples_from_cache(self): + """Load triples from disk cache and restore them to Neo4j. + + Returns True if triples were successfully restored, False otherwise. + """ + if not self.TRIPLES_CACHE_FILE.exists(): + log_structured('info', 'No triples cache file found') + return False + + try: + with open(self.TRIPLES_CACHE_FILE, 'rb') as f: + cache_data = pickle.load(f) + + entities = cache_data.get('entities', []) + relations = cache_data.get('relations', []) + + if not entities: + log_structured('warning', 'Triples cache file exists but contains no entities') + return False + + log_structured('info', 'Restoring triples from disk cache to Neo4j', { + 'entity_count': len(entities), + 'relation_count': len(relations), + 'cached_triplet_count': cache_data.get('triplet_count', 'unknown') + }) + + # Restore entities (nodes) to Neo4j + self.property_graph_store.upsert_nodes(entities) + log_structured('info', f'Restored {len(entities)} entity nodes to Neo4j') + + # Restore relations to Neo4j + if relations: + self.property_graph_store.upsert_relations(relations) + log_structured('info', f'Restored {len(relations)} relations to Neo4j') + + # Verify restoration + restored_triplets = self.get_triplets() + log_structured('info', f'Neo4j now contains {len(restored_triplets)} triplets after cache restore') + + return len(restored_triplets) > 0 + except Exception as e: + log_structured('error', f'Error restoring triples from cache: {e}') + return False +``` + +#### 1d. Modify `create_graph_components()` to use the cache + +The existing function has two branches: +1. Neo4j has data and `force_reindex=False` → skip indexing +2. Else → run LLM extraction + +Change this to **three** branches: + +``` +1. Neo4j has data and !force_reindex → skip indexing (also save triples to cache if not already cached) +2. Neo4j is empty and !force_reindex and triples cache exists → restore from cache +3. Else → full LLM extraction (also save triples to cache after) +``` + +Here is the full replacement logic for the branching section inside `create_graph_components()`: + +```python +# Check if Neo4j already has content +triplets = graph_store.get_triplets() +has_existing_content = len(triplets) > 0 + +log_structured('info', f'Neo4j check: Found {len(triplets)} triplets') + +if has_existing_content and not force_reindex: + # BRANCH 1: Neo4j has data — use it, but also ensure disk cache exists + log_structured('info', f'Neo4j already contains {len(triplets)} triplets. Skipping indexing.') + + # Ensure triples are also cached to disk for future recovery + if not graph_store.TRIPLES_CACHE_FILE.exists(): + log_structured('info', 'Neo4j has data but no triples cache on disk — creating cache now') + graph_store.save_triples_to_cache() + + property_graph_index = PropertyGraphIndex( + nodes=[], + property_graph_store=property_graph_store, + ) + + if not graph_store.communities_built: + log_structured('info', 'Building graph communities from existing Neo4j data') + try: + graph_store.build_communities() + except Exception as e: + log_structured('error', f'Error building communities: {e}') + +elif not has_existing_content and not force_reindex and graph_store.TRIPLES_CACHE_FILE.exists(): + # BRANCH 2: Neo4j is empty but triples cache exists — restore from cache + log_structured('info', 'Neo4j is empty but triples cache exists. Restoring from disk cache.') + + restored = graph_store.load_triples_from_cache() + if restored: + log_structured('info', 'Successfully restored triples from cache.') + + property_graph_index = PropertyGraphIndex( + nodes=[], + property_graph_store=property_graph_store, + ) + + if not graph_store.communities_built: + try: + graph_store.build_communities() + except Exception as e: + log_structured('error', f'Error building communities from restored data: {e}') + else: + # Cache restore failed — fall through to LLM extraction + log_structured('warning', 'Triples cache restore failed. Falling back to LLM extraction.') + if not nodes: + raise ValueError("Nodes must be provided when Neo4j is empty and cache restore fails") + + kg_extractor = GraphRAGExtractor( + llm=llm, + extract_prompt=KG_TRIPLET_EXTRACT_TMPL, + max_paths_per_chunk=max_paths_per_chunk, + parse_fn=custom_parse_fn, + ) + + property_graph_index = PropertyGraphIndex( + nodes=nodes, + kg_extractors=[kg_extractor], + property_graph_store=property_graph_store, + show_progress=True, + ) + + graph_store.save_triples_to_cache() + + try: + graph_store.build_communities() + except Exception as e: + log_structured('error', f'Error building communities: {e}') + +else: + # BRANCH 3: Full LLM extraction (force_reindex or no cache) + if not nodes: + raise ValueError("Nodes must be provided for indexing when Neo4j is empty or force_reindex=True") + + kg_extractor = GraphRAGExtractor( + llm=llm, + extract_prompt=KG_TRIPLET_EXTRACT_TMPL, + max_paths_per_chunk=max_paths_per_chunk, + parse_fn=custom_parse_fn, + ) + + if has_existing_content and force_reindex: + # Clear Neo4j before re-extraction + try: + from neo4j import GraphDatabase + driver = GraphDatabase.driver(NEO4J_URL, auth=(NEO4J_USERNAME, NEO4J_PASSWORD)) + with driver.session() as session: + session.run("MATCH (n) DETACH DELETE n") + driver.close() + except Exception as e: + log_structured('warning', f'Error clearing Neo4j database: {e}') + + property_graph_index = PropertyGraphIndex( + nodes=nodes, + kg_extractors=[kg_extractor], + property_graph_store=property_graph_store, + show_progress=True, + ) + + # Cache the newly extracted triples + graph_store.save_triples_to_cache() + + try: + graph_store.build_communities() + except Exception as e: + log_structured('error', f'Error building communities: {e}') + +return graph_store, property_graph_index +``` + +--- + +## Change 2: Add GraphRAG Status Flags to Shared State + +### File: Shared State Module (e.g., `shared_state.py`) + +#### 2a. Add new module-level variables + +After the existing GraphRAG component variables, add: + +```python +# GraphRAG initialization status +graphrag_ready = False +graphrag_initializing = False +graphrag_error = None +``` + +#### 2b. Add setter/getter functions + +Add these functions before `is_agent_available()`: + +```python +def set_graphrag_status(ready=None, initializing=None, error=None): + """Update GraphRAG initialization status flags.""" + global graphrag_ready, graphrag_initializing, graphrag_error + from utils import log_structured + + if ready is not None: + graphrag_ready = ready + if initializing is not None: + graphrag_initializing = initializing + if error is not None: + graphrag_error = error + + log_structured('info', 'GraphRAG status updated', { + 'ready': graphrag_ready, + 'initializing': graphrag_initializing, + 'error': str(graphrag_error) if graphrag_error else None + }) + +def get_graphrag_status(): + """Get current GraphRAG initialization status.""" + return { + 'ready': graphrag_ready, + 'initializing': graphrag_initializing, + 'error': str(graphrag_error) if graphrag_error else None + } +``` + +--- + +## Change 3: Split Initialization into Two Phases + +### File: AI Core Module (e.g., `ai_core.py`) + +This is the largest change. You'll split your monolithic initialization function into: + +1. **`initialize_vector_index()`** — Fast path (~1-2 min). Loads/creates vector index, creates vector query engine, creates agent with vector-only tools, attaches the `run` method. The server becomes usable after this completes. +2. **`initialize_graphrag_components()`** — Background path. Connects to Neo4j, restores/extracts triples, builds communities, creates GraphRAG query engine, and **dynamically adds the GraphRAG tool** to the existing agent's tools list. +3. **`initialize_global_index()`** — Kept as backward-compatible wrapper that calls both in sequence. + +#### 3a. Update imports from shared_state + +Add `set_graphrag_status` to your shared_state imports: + +```python +from shared_state import ( + ..., + set_graphrag_status +) +``` + +#### 3b. Extract your GraphRAG Tool class to module level + +If your GraphRAG tool class (the `BaseTool` subclass that wraps `GraphRAGQueryEngine`) is defined inside the initialization function, move it to module level. This makes it reusable from both the init function and the background init function. + +The tool class should remain functionally identical — just move it out of the function body to module scope. Example: + +```python +class GraphRAGTool(BaseTool): + """Tool that queries using both vector and graph-based retrieval.""" + + def __init__(self, query_engine): + self.query_engine = query_engine + self._metadata = ToolMetadata( + name="answerquestionswith_graphrag", + description="Your tool description here" + ) + + @property + def metadata(self): + return self._metadata + + def __call__(self, query_str: str) -> ToolOutput: + # ... existing implementation unchanged ... + pass + + async def acall(self, input: str) -> ToolOutput: + return self.__call__(input) +``` + +#### 3c. Create `initialize_vector_index()` + +This function contains everything from your original `initialize_global_index()` **except** the GraphRAG-related code: + +- LLM and embedding model setup +- Global Settings configuration +- Vector index loading or creation (including document processing for cold starts) +- Vector query engine creation +- Agent creation with **vector-only** tools +- `simple_run` method attachment and agent testing + +**Key difference from the original**: No GraphRAG components, no Neo4j connection, no community building. The agent is created with only the vector query tool. + +The `simple_run` function works without modification because it dynamically looks up the GraphRAG tool: +```python +graphrag_tool = next((t for t in global_workflow_agent.tools if 'graphrag' in t.metadata.name.lower()), None) +if graphrag_tool: + # use it +# else: falls through to vector-only path +``` + +When GraphRAG isn't initialized yet, `graphrag_tool` is `None` and the code gracefully falls back to the direct response path. + +#### 3d. Create `initialize_graphrag_components()` + +This function handles everything GraphRAG-related: + +```python +async def initialize_graphrag_components() -> bool: + """Initialize GraphRAG components in the background.""" + set_graphrag_status(initializing=True, ready=False, error=None) + + try: + # Get current index and agent from shared state + from shared_state import global_workflow_agent as current_agent + import shared_state + index = shared_state.global_index + + if index is None: + raise RuntimeError("Vector index must be initialized first") + if current_agent is None: + raise RuntimeError("Agent must be initialized first") + + llm = Settings.llm + + # Connect to Neo4j + property_graph_store = Neo4jPropertyGraphStore( + username=NEO4J_USERNAME, + password=NEO4J_PASSWORD, + url=NEO4J_URL + ) + + # Check Neo4j state and create/restore components + temp_graph_store = GraphRAGStore(property_graph_store) + triplets = temp_graph_store.get_triplets() + neo4j_has_data = len(triplets) > 0 + + if neo4j_has_data: + graph_store, property_graph_index = create_graph_components( + llm=llm, force_reindex=False + ) + else: + # Get nodes from vector index for potential extraction + vector_nodes = [] + if hasattr(index, 'docstore') and index.docstore: + vector_nodes = list(index.docstore.docs.values()) + + if not vector_nodes: + raise ValueError("No nodes available for GraphRAG indexing") + + # create_graph_components will try cache restore first, then LLM extraction + graph_store, property_graph_index = create_graph_components( + llm=llm, + nodes=vector_nodes, + max_paths_per_chunk=10, + force_reindex=False # Allow cache restore path + ) + + # Ensure communities are built + if not hasattr(graph_store, 'communities_built') or not graph_store.communities_built: + graph_store.build_communities() + + # Create GraphRAG query engine + vector_retriever = VectorIndexRetriever( + index=index, similarity_top_k=SIMILARITY_TOP_K + ) + + graphrag_query_engine = create_graphrag_query_engine( + vector_retriever=vector_retriever, + graph_store=graph_store, + llm=llm, + similarity_top_k=SIMILARITY_TOP_K + ) + + # Store in shared state + set_graphrag_components( + graph_store=graph_store, + property_graph_index=property_graph_index, + graphrag_query_engine=graphrag_query_engine + ) + + # *** KEY STEP: Add GraphRAG tool to the EXISTING agent *** + graphrag_tool = GraphRAGTool(graphrag_query_engine) + + from shared_state import global_workflow_agent as live_agent + if live_agent is not None: + live_agent.tools.append(graphrag_tool) + log_structured('info', 'Added GraphRAG tool to existing agent', { + 'total_tools': len(live_agent.tools), + 'tool_names': [t.metadata.name for t in live_agent.tools] + }) + + set_graphrag_status(ready=True, initializing=False) + return True + + except Exception as e: + log_structured('error', f'GraphRAG background initialization failed: {e}') + set_graphrag_status(ready=False, initializing=False, error=e) + return False +``` + +**Critical detail**: The GraphRAG tool is added to the agent's tools list dynamically via `live_agent.tools.append(graphrag_tool)`. Because `simple_run` does a dynamic lookup on `global_workflow_agent.tools` every time it runs, the GraphRAG tool becomes available to all subsequent requests without restarting the agent. + +#### 3e. Keep `initialize_global_index()` as a backward-compatible wrapper + +```python +async def initialize_global_index() -> bool: + """Backward-compatible wrapper: initializes everything synchronously.""" + vector_success = await initialize_vector_index() + if not vector_success: + return False + + graphrag_success = await initialize_graphrag_components() + if not graphrag_success: + log_structured('warning', 'GraphRAG init failed, but vector search is available') + + return True +``` + +--- + +## Change 4: Update Main Startup for Background Init + +### File: Main Module (e.g., `main.py`) + +#### 4a. Update imports + +```python +from ai_core import initialize_global_index, initialize_vector_index, initialize_graphrag_components +from shared_state import global_workflow_agent, is_agent_available, get_graphrag_status +``` + +#### 4b. Modify `startup_event()` for two-phase startup + +```python +async def startup_event() -> bool: + """Phase 1 (sync): MongoDB + vector index. Phase 2 (background): GraphRAG.""" + log_structured('info', "Application startup sequence initiated.") + all_success = True + + # 1. Initialize MongoDB + try: + if init_mongodb(): + log_structured('info', "MongoDB initialized successfully.") + else: + all_success = False + except Exception as db_err: + log_structured('critical', f"MongoDB init failed: {db_err}") + all_success = False + + # 2. Phase 1: Vector index + agent (fast) + log_structured('info', "Phase 1: Initializing vector index and agent...") + vector_success = await initialize_vector_index() + + if not is_agent_available() or not vector_success: + log_structured('critical', "Vector initialization failed") + all_success = False + else: + log_structured('info', "Phase 1 complete: server is ready for vector queries") + + # 3. Phase 2: GraphRAG in background + if vector_success: + log_structured('info', "Phase 2: Launching GraphRAG initialization in background...") + + async def _background_graphrag_init(): + try: + success = await initialize_graphrag_components() + if success: + log_structured('info', "GraphRAG background init completed") + else: + log_structured('warning', "GraphRAG background init failed — vector search still works") + except Exception as e: + log_structured('error', f"GraphRAG background init error: {e}") + + # Schedule as background task — does NOT block server startup + asyncio.ensure_future(_background_graphrag_init()) + + log_structured('info', f"Startup complete. Server ready: {all_success}") + return all_success +``` + +#### 4c. Restructure the main execution block + +The old pattern of `asyncio.run(startup_event())` followed by `asyncio.run(serve(app, config))` won't work because background tasks launched by `asyncio.ensure_future()` need to run in the same event loop as the server. Restructure: + +```python +if __name__ == '__main__': + from hypercorn.config import Config as HypercornConfig + from hypercorn.asyncio import serve as hypercorn_serve + + config = HypercornConfig() + # ... config setup ... + + async def run_server_with_startup(): + """Run startup (with background GraphRAG init) then serve.""" + await startup_event() + + # Double-check agent + if not is_agent_available(): + log_structured('critical', "Agent unavailable. Forcing re-init...") + await initialize_vector_index() + if is_agent_available(): + asyncio.ensure_future(initialize_graphrag_components()) + + # Start serving — background GraphRAG init continues in same event loop + await hypercorn_serve(app, config) + + try: + asyncio.run(run_server_with_startup()) + except KeyboardInterrupt: + log_structured('info', "Server stopped manually.") +``` + +**Why this matters**: `asyncio.ensure_future()` schedules a coroutine on the *current* event loop. If you use two separate `asyncio.run()` calls, the background task from the first call is lost when the first event loop closes. By wrapping everything in a single `async def` and calling `asyncio.run()` once, the background GraphRAG init shares the server's event loop and runs concurrently with request handling. + +--- + +## Change 5: Update Routes for Graceful Degradation + +### File: Routes Module (e.g., `routes.py`) + +#### 5a. Update imports + +```python +from ai_core import initialize_global_index, initialize_vector_index +from shared_state import ..., get_graphrag_status +``` + +#### 5b. Update `/status` endpoint + +Add GraphRAG status to the response: + +```python +graphrag_status = get_graphrag_status() + +status_data = { + # ... existing fields ... + 'graphrag_ready': graphrag_status['ready'], + 'graphrag_initializing': graphrag_status['initializing'], + 'graphrag_error': graphrag_status['error'], +} +``` + +#### 5c. Update on-demand initialization in `/chat` + +If the `/chat` endpoint has a fallback that tries to initialize on-demand when the agent is unavailable, change it to use `initialize_vector_index()` instead of `initialize_global_index()` for faster recovery: + +```python +# Old: +index_success = await initialize_global_index() + +# New: +index_success = await initialize_vector_index() +``` + +#### 5d. No changes needed for chat logic + +The `/chat` endpoint itself needs no changes for graceful degradation. The agent's `simple_run` method already dynamically checks for GraphRAG tool availability: + +```python +graphrag_tool = next((t for t in global_workflow_agent.tools if 'graphrag' in t.metadata.name.lower()), None) +if graphrag_tool: + # Use GraphRAG — only happens after background init completes +else: + # Fall through to vector-only response +``` + +--- + +## Change 6: Docker Compose Documentation + +### File: Neo4j Docker Compose (e.g., `docker-compose-neo4j.yml`) + +Add a comment at the top explaining volume persistence: + +```yaml +# IMPORTANT: Neo4j data persists in bind-mounted directories (./neo4j/data, etc.). +# These survive `docker-compose down` and `docker-compose up -d` restarts. +# However, if you delete the data directory manually, all graph data +# (extracted triples, entities, relationships) will be lost and must be +# re-extracted via LLM calls (10-20+ minutes) or restored from the disk cache +# at index_storage/graphrag_cache/neo4j_triples.pickle. +``` + +--- + +## Verification Checklist + +### Test 1: Warm start (Neo4j has data, vector index cached) +- [ ] Server starts and accepts chat requests within ~1-2 minutes +- [ ] `/status` shows `graphrag_initializing: true` briefly, then `graphrag_ready: true` +- [ ] Chat requests work with vector search immediately +- [ ] GraphRAG tool becomes available after background init completes + +### Test 2: Cold Neo4j with caches (Neo4j empty, `index_storage/` intact) +- [ ] Server starts with vector queries in ~1-2 minutes +- [ ] Background init restores triples from `neo4j_triples.pickle` (check logs for "Restoring triples from disk cache") +- [ ] No LLM extraction calls (check logs — should NOT see "Building PropertyGraphIndex via LLM extraction") +- [ ] GraphRAG becomes available after cache restore + community build + +### Test 3: Full cold start (no `index_storage/`, no Neo4j data) +- [ ] Full rebuild runs (document parsing → vector index → LLM extraction → community building) +- [ ] `neo4j_triples.pickle` is created after LLM extraction +- [ ] Community cache files are created after community building +- [ ] Everything works on subsequent restarts using caches + +### Test 4: Degraded mode +- [ ] Send a chat request before GraphRAG finishes initializing +- [ ] Response comes back using vector-only search +- [ ] No errors in response — just missing GraphRAG context +- [ ] After GraphRAG init completes, subsequent requests use both vector and GraphRAG + +### Test 5: Cache integrity +- [ ] After a successful startup with Neo4j data, verify `neo4j_triples.pickle` exists in `index_storage/graphrag_cache/` +- [ ] Delete Neo4j data, restart — verify cache restore works +- [ ] Delete the pickle file, restart with Neo4j data — verify it gets recreated + +--- + +## File Change Summary + +| File | Changes | +|------|---------| +| Graph RAG integration module | `save_triples_to_cache()`, `load_triples_from_cache()`, 3-branch logic in `create_graph_components()` | +| Shared state module | `graphrag_ready`/`graphrag_initializing`/`graphrag_error` flags, `set_graphrag_status()`, `get_graphrag_status()` | +| AI core module | Extract GraphRAG tool class to module level, split into `initialize_vector_index()` + `initialize_graphrag_components()`, keep `initialize_global_index()` as wrapper | +| Main module | Two-phase startup in `startup_event()`, single `asyncio.run()` wrapping both startup and serve | +| Routes module | Import new functions, add GraphRAG status to `/status`, use `initialize_vector_index()` for on-demand init | +| Docker compose | Documentation comment about volume persistence | diff --git a/graph_rag_integration.py b/graph_rag_integration.py index 042a33e..af298d5 100644 --- a/graph_rag_integration.py +++ b/graph_rag_integration.py @@ -183,6 +183,7 @@ class GraphRAGStore: CACHE_DIR = Path("index_storage/graphrag_cache") COMMUNITY_CACHE_FILE = CACHE_DIR / "community_summary.pickle" ENTITY_INFO_CACHE_FILE = CACHE_DIR / "entity_info.pickle" + TRIPLES_CACHE_FILE = CACHE_DIR / "neo4j_triples.pickle" def __init__(self, property_graph_store): """Initialize with a property_graph_store (Neo4j or in-memory).""" @@ -206,6 +207,90 @@ class GraphRAGStore: """Get triplets from the property graph store.""" return self.property_graph_store.get_triplets() + def save_triples_to_cache(self): + """Save extracted triples (entities + relationships) from Neo4j to a disk cache. + + This allows restoring triples to Neo4j without expensive LLM re-extraction + if Neo4j data is lost (e.g., container recreated without volume persistence). + """ + try: + triplets = self.get_triplets() + if not triplets: + log_structured('warning', 'No triplets to cache — Neo4j appears empty') + return False + + # Collect unique entities and relations from the triplets + entities = {} + relations = [] + for entity1, relation, entity2 in triplets: + entities[entity1.name] = entity1 + entities[entity2.name] = entity2 + relations.append(relation) + + cache_data = { + 'entities': list(entities.values()), + 'relations': relations, + 'triplet_count': len(triplets), + } + + with open(self.TRIPLES_CACHE_FILE, 'wb') as f: + pickle.dump(cache_data, f) + + log_structured('info', 'Successfully cached Neo4j triples to disk', { + 'entity_count': len(entities), + 'relation_count': len(relations), + 'triplet_count': len(triplets), + 'cache_file': str(self.TRIPLES_CACHE_FILE) + }) + return True + except Exception as e: + log_structured('error', f'Error saving triples cache: {e}') + return False + + def load_triples_from_cache(self): + """Load triples from disk cache and restore them to Neo4j. + + Returns True if triples were successfully restored, False otherwise. + """ + if not self.TRIPLES_CACHE_FILE.exists(): + log_structured('info', 'No triples cache file found at %s', str(self.TRIPLES_CACHE_FILE)) + return False + + try: + with open(self.TRIPLES_CACHE_FILE, 'rb') as f: + cache_data = pickle.load(f) + + entities = cache_data.get('entities', []) + relations = cache_data.get('relations', []) + + if not entities: + log_structured('warning', 'Triples cache file exists but contains no entities') + return False + + log_structured('info', 'Restoring triples from disk cache to Neo4j', { + 'entity_count': len(entities), + 'relation_count': len(relations), + 'cached_triplet_count': cache_data.get('triplet_count', 'unknown') + }) + + # Restore entities (nodes) to Neo4j + self.property_graph_store.upsert_nodes(entities) + log_structured('info', f'Restored {len(entities)} entity nodes to Neo4j') + + # Restore relations to Neo4j + if relations: + self.property_graph_store.upsert_relations(relations) + log_structured('info', f'Restored {len(relations)} relations to Neo4j') + + # Verify restoration + restored_triplets = self.get_triplets() + log_structured('info', f'Neo4j now contains {len(restored_triplets)} triplets after cache restore') + + return len(restored_triplets) > 0 + except Exception as e: + log_structured('error', f'Error restoring triples from cache: {e}') + return False + def generate_community_summary(self, text): """Generate summary for a given text using an LLM with handling for large contexts.""" @@ -691,13 +776,18 @@ def create_graph_components(llm, nodes=None, max_paths_per_chunk=10, force_reind if has_existing_content and not force_reindex: log_structured('info', f'Neo4j already contains {len(triplets)} triplets. Skipping indexing.') - + + # Ensure triples are also cached to disk for future recovery + if not graph_store.TRIPLES_CACHE_FILE.exists(): + log_structured('info', 'Neo4j has data but no triples cache on disk — creating cache now') + graph_store.save_triples_to_cache() + # Create a minimal PropertyGraphIndex without indexing property_graph_index = PropertyGraphIndex( nodes=[], # Empty nodes since we're not indexing property_graph_store=property_graph_store, ) - + # Build communities from existing data (if not already built) if not graph_store.communities_built: log_structured('info', 'Building graph communities from existing Neo4j data') @@ -708,12 +798,64 @@ def create_graph_components(llm, nodes=None, max_paths_per_chunk=10, force_reind log_structured('error', f'Error building communities: {e}') else: log_structured('info', 'Communities already built, skipping rebuild') + elif not has_existing_content and not force_reindex and graph_store.TRIPLES_CACHE_FILE.exists(): + # Neo4j is empty but we have a triples cache — restore from cache instead of LLM extraction + log_structured('info', 'Neo4j is empty but triples cache exists. Restoring from disk cache (skipping LLM extraction).') + + restored = graph_store.load_triples_from_cache() + if restored: + log_structured('info', 'Successfully restored triples from cache. Skipping LLM-based extraction.') + + # Create a minimal PropertyGraphIndex (no extraction needed) + property_graph_index = PropertyGraphIndex( + nodes=[], + property_graph_store=property_graph_store, + ) + + # Build communities from restored data + if not graph_store.communities_built: + log_structured('info', 'Building graph communities from restored Neo4j data') + try: + graph_store.build_communities() + log_structured('info', 'Communities built successfully from restored data') + except Exception as e: + log_structured('error', f'Error building communities from restored data: {e}') + else: + # Cache restore failed — fall through to full LLM extraction + log_structured('warning', 'Triples cache restore failed. Falling back to LLM extraction.') + if not nodes: + raise ValueError("Nodes must be provided for indexing when Neo4j is empty and cache restore fails") + + kg_extractor = GraphRAGExtractor( + llm=llm, + extract_prompt=KG_TRIPLET_EXTRACT_TMPL, + max_paths_per_chunk=max_paths_per_chunk, + parse_fn=custom_parse_fn, + ) + + log_structured('info', 'Building PropertyGraphIndex via LLM extraction', {'node_count': len(nodes)}) + property_graph_index = PropertyGraphIndex( + nodes=nodes, + kg_extractors=[kg_extractor], + property_graph_store=property_graph_store, + show_progress=True, + ) + + # Cache the newly extracted triples for future recovery + graph_store.save_triples_to_cache() + + log_structured('info', 'Building graph communities') + try: + graph_store.build_communities() + log_structured('info', 'Communities built successfully') + except Exception as e: + log_structured('error', f'Error building communities: {e}') else: - # Need to perform indexing + # Need to perform full LLM-based indexing if not nodes: log_structured('error', 'No nodes provided for indexing and Neo4j is empty or force_reindex=True') raise ValueError("Nodes must be provided for indexing when Neo4j is empty or force_reindex=True") - + # Create the knowledge graph extractor kg_extractor = GraphRAGExtractor( llm=llm, @@ -721,12 +863,10 @@ def create_graph_components(llm, nodes=None, max_paths_per_chunk=10, force_reind max_paths_per_chunk=max_paths_per_chunk, parse_fn=custom_parse_fn, ) - + if has_existing_content and force_reindex: log_structured('info', 'Force reindexing requested. Clearing existing Neo4j data.') try: - # Try to clear the graph using Neo4j's native query - # Note: This requires the Neo4j APOC plugin to be installed from neo4j import GraphDatabase driver = GraphDatabase.driver(NEO4J_URL, auth=(NEO4J_USERNAME, NEO4J_PASSWORD)) with driver.session() as session: @@ -735,16 +875,19 @@ def create_graph_components(llm, nodes=None, max_paths_per_chunk=10, force_reind log_structured('info', 'Successfully cleared Neo4j database') except Exception as e: log_structured('warning', f'Error clearing Neo4j database: {e}. Proceeding with indexing anyway.') - - # Build the property graph index - log_structured('info', 'Building PropertyGraphIndex', {'node_count': len(nodes)}) + + # Build the property graph index (this is the expensive LLM extraction step) + log_structured('info', 'Building PropertyGraphIndex via LLM extraction', {'node_count': len(nodes)}) property_graph_index = PropertyGraphIndex( nodes=nodes, kg_extractors=[kg_extractor], property_graph_store=property_graph_store, show_progress=True, ) - + + # Cache the newly extracted triples for future recovery + graph_store.save_triples_to_cache() + # Build communities log_structured('info', 'Building graph communities') try: @@ -752,7 +895,7 @@ def create_graph_components(llm, nodes=None, max_paths_per_chunk=10, force_reind log_structured('info', 'Communities built successfully') except Exception as e: log_structured('error', f'Error building communities: {e}') - + return graph_store, property_graph_index def create_graphrag_query_engine(vector_retriever, graph_store, llm, similarity_top_k=20): diff --git a/main.py b/main.py index 18dbbe5..0d037e8 100644 --- a/main.py +++ b/main.py @@ -20,8 +20,8 @@ from config import ( ) from utils import logger, log_structured from json_utils import CustomJSONProvider -from ai_core import initialize_global_index # Import the initialization function -from shared_state import global_workflow_agent, is_agent_available # Import shared state +from ai_core import initialize_global_index, initialize_vector_index, initialize_graphrag_components # Import initialization functions +from shared_state import global_workflow_agent, is_agent_available, get_graphrag_status # Import shared state from routes import register_routes from init_mongodb import init_mongodb # Your MongoDB initialization script @@ -59,43 +59,61 @@ log_structured('info', "Flask routes registered.") # --- Startup Function --- async def startup_event() -> bool: """Tasks to run when the application starts. - + + Phase 1 (synchronous): MongoDB + vector index + agent (~1-2 min). + Phase 2 (background): GraphRAG components (Neo4j, triples, communities). + Returns: - bool: True if all startup tasks completed successfully, False otherwise + bool: True if Phase 1 completed successfully (server is usable). """ log_structured('info', "Application startup sequence initiated.") all_success = True - # 1. Initialize MongoDB Connection & Schema (using your script) + # 1. Initialize MongoDB Connection & Schema log_structured('info', "Initializing MongoDB connection...") - mongo_success = False try: if init_mongodb(): log_structured('info', "MongoDB initialized successfully.") - mongo_success = True else: log_structured('warning', "MongoDB initialization script finished, but reported issues.") all_success = False except Exception as db_err: log_structured('critical', "FATAL: MongoDB initialization failed.", {'error': str(db_err)}) all_success = False - # We'll continue in a degraded state - # 2. Initialize Global AI Index and Agent - log_structured('info', "Initializing global AI index and agent...") - index_success = await initialize_global_index() - - # Explicitly check the status after initialization + # 2. Phase 1: Initialize vector index and agent (fast path) + log_structured('info', "Phase 1: Initializing vector index and agent...") + vector_success = await initialize_vector_index() + if not is_agent_available(): - log_structured('critical', "After initialize_global_index, global_workflow_agent is still unavailable, even though function may have reported success") + log_structured('critical', "After initialize_vector_index, agent is still unavailable") all_success = False - elif not index_success: - log_structured('warning', "AI initialization reported failure, but will continue in degraded state") + elif not vector_success: + log_structured('warning', "Vector initialization reported failure") all_success = False else: - log_structured('info', "AI initialization successful, global_workflow_agent is available") + log_structured('info', "Phase 1 complete: vector index and agent are available") - log_structured('info', f"Application startup sequence complete. Overall success: {all_success}") + # 3. Phase 2: Launch GraphRAG initialization as a background task + if vector_success: + log_structured('info', "Phase 2: Launching GraphRAG initialization in background...") + + async def _background_graphrag_init(): + try: + success = await initialize_graphrag_components() + if success: + log_structured('info', "Background GraphRAG initialization completed successfully") + else: + log_structured('warning', "Background GraphRAG initialization failed — vector search still works") + except Exception as e: + log_structured('error', f"Background GraphRAG initialization error: {e}") + + # Schedule as a background task — does not block server startup + asyncio.ensure_future(_background_graphrag_init()) + else: + log_structured('warning', "Skipping GraphRAG background init because vector init failed") + + log_structured('info', f"Application startup sequence complete. Server ready: {all_success}") return all_success @@ -144,25 +162,29 @@ if __name__ == '__main__': log_structured('info', f"Starting Hypercorn server on {SERVER_HOST}:{SERVER_PORT}") log_structured('info', f"Reload mode: {'Enabled' if USE_RELOADER else 'Disabled'}") - # Execute startup task before running the server - log_structured('info', "Manually executing startup sequence before server start") - startup_success = asyncio.run(startup_event()) - - # Double-check that the agent is initialized - if not is_agent_available(): - log_structured('critical', "After startup, global_workflow_agent is still unavailable. Forcing re-initialization...") - # Try once more to initialize - index_success = asyncio.run(initialize_global_index()) - if not index_success or not is_agent_available(): - log_structured('critical', "Emergency initialization also failed. Server will run but chat functionality will be impaired.") - else: - log_structured('info', "Emergency initialization succeeded.") + async def run_server_with_startup(): + """Run startup (with background GraphRAG init) then serve.""" + # Phase 1 runs synchronously, Phase 2 launches as background task + startup_success = await startup_event() + + # Double-check agent is initialized + if not is_agent_available(): + log_structured('critical', "After startup, agent is still unavailable. Forcing re-initialization...") + vector_success = await initialize_vector_index() + if not vector_success or not is_agent_available(): + log_structured('critical', "Emergency initialization failed. Server will run but chat will be impaired.") + else: + log_structured('info', "Emergency initialization succeeded.") + # Also try GraphRAG in background + asyncio.ensure_future(initialize_graphrag_components()) + + # Start serving — background GraphRAG init continues in same event loop + await hypercorn_serve(app, config) - # Run the server try: - asyncio.run(hypercorn_serve(app, config)) + asyncio.run(run_server_with_startup()) except KeyboardInterrupt: - log_structured('info', "Server stopped manually (KeyboardInterrupt).") + log_structured('info', "Server stopped manually (KeyboardInterrupt).") except Exception as run_err: - log_structured('critical', "Hypercorn server failed to run.", {'error': str(run_err)}) - sys.exit(1) \ No newline at end of file + log_structured('critical', "Hypercorn server failed to run.", {'error': str(run_err)}) + sys.exit(1) \ No newline at end of file diff --git a/neo4j/docker-compose-neo4j.yml b/neo4j/docker-compose-neo4j.yml index 40fb539..4b417e3 100644 --- a/neo4j/docker-compose-neo4j.yml +++ b/neo4j/docker-compose-neo4j.yml @@ -1,5 +1,12 @@ version: '3' +# IMPORTANT: Neo4j data persists in bind-mounted directories (./neo4j/data, etc.). +# These survive `docker-compose down` and `docker-compose up -d` restarts. +# However, if you delete the ./neo4j/data directory manually, all graph data +# (extracted triples, entities, relationships) will be lost and must be +# re-extracted via LLM calls (10-20+ minutes) or restored from the disk cache +# at index_storage/graphrag_cache/neo4j_triples.pickle. + services: neo4j: image: neo4j:latest diff --git a/routes.py b/routes.py index 5e554fc..df72f3e 100644 --- a/routes.py +++ b/routes.py @@ -19,8 +19,8 @@ from werkzeug.utils import secure_filename # Keep if file uploads are reintroduc from utils import logger, log_structured, allowed_file # Keep allowed_file if uploads return from json_utils import CustomJSONEncoder # Used indirectly by make_response/jsonify with custom provider from session_manager import get_or_create_session_state, clear_chat_state_cache -from ai_core import initialize_global_index # Import initialization function -from shared_state import global_workflow_agent, global_index, is_agent_available # Import shared state variables +from ai_core import initialize_global_index, initialize_vector_index # Import initialization functions +from shared_state import global_workflow_agent, global_index, is_agent_available, get_graphrag_status # Import shared state variables from document_generator import create_brief_docx from config import IMAGES_DIRECTORY, NETFLIX_DOCS_FOLDER, BASE_DIR, SUPPORTING_FILES_DIR # Import necessary configs from llama_index.core.tools import ToolOutput # Import ToolOutput for type checking @@ -122,10 +122,10 @@ def register_routes(app: Flask): if not is_agent_available(): log_structured('error', 'Global workflow agent is not initialized') - # Try to initialize it on-demand if not available + # Try to initialize it on-demand if not available (vector-only for speed) try: - log_structured('info', 'Attempting to initialize global index and agent on-demand') - index_success = await initialize_global_index() + log_structured('info', 'Attempting to initialize vector index and agent on-demand') + index_success = await initialize_vector_index() # Let initialization complete, then check the global variable if index_success: @@ -616,13 +616,18 @@ def register_routes(app: Flask): # SUPER SIMPLIFIED - ALWAYS RETURN INITIALIZED log_structured('info', 'Status check: ALWAYS returning initialized=true') - + + graphrag_status = get_graphrag_status() + status_data = { 'global_status': 'initialized', 'initialized': True, 'is_initialized': True, 'timestamp': datetime.now().isoformat(), - 'override': 'FORCE_INITIALIZED' + 'override': 'FORCE_INITIALIZED', + 'graphrag_ready': graphrag_status['ready'], + 'graphrag_initializing': graphrag_status['initializing'], + 'graphrag_error': graphrag_status['error'], } if session_id: @@ -721,9 +726,9 @@ def register_routes(app: Flask): # This is a development-only endpoint; disable in production if os.environ.get("PRODUCTION", "false").lower() == "true": return jsonify({'error': 'Debug endpoints not available in production'}), 403 - + from ai_core import initialize_global_index - + try: # Log current state before reinitializing log_structured('info', 'Reinitializing global agent', { @@ -731,8 +736,8 @@ def register_routes(app: Flask): 'agent_none_before': global_workflow_agent is None, 'index_none_before': global_index is None }) - - # Attempt to reinitialize + + # Attempt full reinitialize (vector + GraphRAG) success = await initialize_global_index() # Import fresh state after initialization diff --git a/shared_state.py b/shared_state.py index b3d50ae..df82e89 100644 --- a/shared_state.py +++ b/shared_state.py @@ -15,6 +15,11 @@ global_graph_store = None global_property_graph_index = None global_graphrag_query_engine = None +# GraphRAG initialization status +graphrag_ready = False +graphrag_initializing = False +graphrag_error = None + # Helper to set the global agent def set_global_agent(agent): """Set the global agent instance.""" @@ -73,6 +78,33 @@ def set_graphrag_components(graph_store, property_graph_index, graphrag_query_en log_structured('info', f'GraphRAG components set successfully: {components_set}') return components_set +# Helper to set/get GraphRAG initialization status +def set_graphrag_status(ready=None, initializing=None, error=None): + """Update GraphRAG initialization status flags.""" + global graphrag_ready, graphrag_initializing, graphrag_error + from utils import log_structured + + if ready is not None: + graphrag_ready = ready + if initializing is not None: + graphrag_initializing = initializing + if error is not None: + graphrag_error = error + + log_structured('info', 'GraphRAG status updated', { + 'ready': graphrag_ready, + 'initializing': graphrag_initializing, + 'error': str(graphrag_error) if graphrag_error else None + }) + +def get_graphrag_status(): + """Get current GraphRAG initialization status.""" + return { + 'ready': graphrag_ready, + 'initializing': graphrag_initializing, + 'error': str(graphrag_error) if graphrag_error else None + } + # Helper to get agent status def is_agent_available(): """