diff --git a/ai_core.py b/ai_core.py
index a4b0eea..a7d8012 100644
--- a/ai_core.py
+++ b/ai_core.py
@@ -82,9 +82,10 @@ nest_asyncio.apply()
# --- Global AI State ---
# Import shared state to ensure all modules access the same instances
from shared_state import (
- global_index, global_workflow_agent,
+ global_index, global_workflow_agent,
global_graph_store, global_property_graph_index, global_graphrag_query_engine,
- set_global_agent, set_global_index, set_graphrag_components
+ set_global_agent, set_global_index, set_graphrag_components,
+ set_graphrag_status
)
@@ -720,31 +721,96 @@ async def process_documents_in_directory(directory: str, session_id: Optional[st
except Exception as close_err: log_structured('error', 'Error closing parser_images client', {'error': str(close_err)})
-# --- Global Index Initialization ---
-async def initialize_global_index() -> bool:
- """Initialize the global index from Netflix documents at startup."""
- # Use shared state instead of module-level globals
+# --- GraphRAG Tool Class (module-level for reuse) ---
+class GraphRAGTool(BaseTool):
+ """Tool that queries Netflix marketing materials using both vector and graph-based retrieval."""
+ def __init__(self, query_engine):
+ self.query_engine = query_engine
+ self._metadata = ToolMetadata(
+ name="answerquestionswith_graphrag",
+ description="USE THIS TOOL FOR ALL QUERIES - Queries Netflix marketing materials using both vector and graph-based retrieval for more comprehensive answers. Use this for complex questions that need context from multiple related documents."
+ )
+
+ @property
+ def metadata(self):
+ return self._metadata
+
+ def __call__(self, query_str: str) -> ToolOutput:
+ """Run query through GraphRAG and generate synthesized answer."""
+ log_structured('info', 'GraphRAG Tool: Starting dual retrieval', {'query': query_str})
+
+ try:
+ retrieval_result = self.query_engine.custom_query(query_str)
+ final_answer = generate_final_answer(query_str, retrieval_result, self.query_engine.llm)
+
+ log_message = {
+ 'vector_context_length': len(retrieval_result.get('vector_context', '')),
+ 'graphrag_context_length': len(retrieval_result.get('graphrag_context', '')),
+ 'vector_nodes_count': len(retrieval_result.get('vector_nodes', [])),
+ 'community_ids': retrieval_result.get('community_ids', [])
+ }
+ log_structured('info', 'GraphRAG Tool: Retrieval complete', log_message)
+
+ vector_nodes = retrieval_result.get('vector_nodes', [])
+
+ class NodeWrapper:
+ def __init__(self, nodes):
+ self.source_nodes = nodes
+
+ tool_output = ToolOutput(
+ content=final_answer,
+ tool_name="GraphRAG",
+ raw_output=NodeWrapper(vector_nodes),
+ raw_input={"query": query_str, "source": "graphrag", "retrieval_stats": log_message}
+ )
+
+ log_structured('debug', 'GraphRAG Tool: Including image metadata in response',
+ {'node_count': len(vector_nodes)})
+
+ return tool_output
+ except Exception as graphrag_err:
+ log_structured('error', f'Error in GraphRAG tool: {graphrag_err}',
+ {'traceback': traceback.format_exc()})
+ return ToolOutput(
+ content=f"I encountered an error while retrieving information: {str(graphrag_err)}. Please try again or use the standard query tool.",
+ tool_name="GraphRAG",
+ raw_input={"query": query_str},
+ raw_output={"error": str(graphrag_err)},
+ is_error=True
+ )
+
+ async def acall(self, input: str) -> ToolOutput:
+ """Async version of __call__."""
+ return self.__call__(input)
+
+
+# --- Phase 1: Vector Index Initialization (fast path) ---
+async def initialize_vector_index() -> bool:
+ """Initialize the vector index and agent with vector-only tools.
+
+ This is the fast path (~1-2 minutes) that makes the server usable.
+ GraphRAG components are initialized separately in the background.
+
+ Returns:
+ bool: True if vector index and agent were initialized successfully.
+ """
try:
# --- Configure LLM and Embedding Model ---
- # Check for real API keys
openai_key = os.environ.get("OPENAI_API_KEY", "")
if not openai_key:
log_structured('critical', 'No OpenAI API key provided. Make sure OPENAI_API_KEY is set in your .env file.')
return False
-
- # Log the first few characters of the key for debugging (important to verify it's loaded)
+
key_preview = openai_key[:4] + "..." if len(openai_key) > 4 else "invalid"
log_structured('info', f'Using OpenAI API key starting with: {key_preview}')
-
- # Normal initialization with real API keys
+
try:
llm = LlamaOpenAI(
model=LLM_MODEL,
temperature=LLM_TEMPERATURE,
timeout=LLM_TIMEOUT
)
- # Test the LLM with a simple prompt to ensure it's working
test_prompt = "Say 'API key is working' if you can read this."
_ = llm.complete(test_prompt)
log_structured('info', 'Successfully tested LLM API connection')
@@ -757,95 +823,26 @@ async def initialize_global_index() -> bool:
Settings.llm = llm
Settings.embed_model = embed_model
Settings.callback_manager = CallbackManager([token_counter])
- # Settings.chunk_size = NODE_PARSER_CHUNK_SIZE # If using SentenceSplitter
- # Settings.chunk_overlap = NODE_PARSER_CHUNK_OVERLAP
- # Use Semantic Splitter (more robust, less config needed here)
node_parser = SemanticSplitterNodeParser(
- buffer_size=1, breakpoint_percentile_threshold=95, embed_model=embed_model # Adjust threshold
+ buffer_size=1, breakpoint_percentile_threshold=95, embed_model=embed_model
)
Settings.node_parser = node_parser
- # Alternatively, configure SentenceSplitter globally
- # Settings.node_parser = SentenceSplitter(
- # chunk_size=NODE_PARSER_CHUNK_SIZE,
- # chunk_overlap=NODE_PARSER_CHUNK_OVERLAP,
- # # ... other SentenceSplitter params
- # )
- # --- Load or Build Index ---
+ # --- Load or Build Vector Index ---
if INDEX_PERSIST_PATH.exists():
log_structured('info', f'Loading existing index from {INDEX_PERSIST_PATH}')
storage_context = StorageContext.from_defaults(persist_dir=str(INDEX_PERSIST_PATH))
index = load_index_from_storage(storage_context)
- # Save to shared state
set_global_index(index)
log_structured('info', 'Successfully loaded existing index')
-
- # Attempt to load or recreate GraphRAG components
- try:
- log_structured('info', 'Attempting to recreate GraphRAG components from loaded index')
-
- # First, try to connect to Neo4j to check if it has data
- from llama_index.graph_stores.neo4j import Neo4jPropertyGraphStore
- property_graph_store = Neo4jPropertyGraphStore(
- username=NEO4J_USERNAME,
- password=NEO4J_PASSWORD,
- url=NEO4J_URL
- )
- log_structured('info', 'Successfully connected to Neo4j database')
-
- # Create temporary GraphRAGStore to check for existing data
- temp_graph_store = GraphRAGStore(property_graph_store)
- triplets = temp_graph_store.get_triplets()
- neo4j_has_data = len(triplets) > 0
-
- if neo4j_has_data:
- # Neo4j already has data, just use it
- log_structured('info', f'Neo4j contains {len(triplets)} triplets. Using existing data.')
- graph_store, property_graph_index = create_graph_components(
- llm=llm,
- force_reindex=False # Use existing Neo4j data
- )
- log_structured('info', 'GraphRAG components loaded from existing Neo4j data')
- else:
- # Neo4j is empty, need to extract nodes from the vector index to populate it
- log_structured('info', 'Neo4j is empty. Extracting nodes from vector index for GraphRAG indexing.')
-
- # Get all nodes from the vector index
- from llama_index.core.schema import TextNode
- vector_nodes = []
-
- # Extract nodes from the index's docstore
- if hasattr(index, 'docstore') and index.docstore:
- docstore_nodes = list(index.docstore.docs.values())
- vector_nodes.extend(docstore_nodes)
- log_structured('info', f'Retrieved {len(docstore_nodes)} nodes from index docstore')
-
- # Handle cases where we can't get nodes directly
- if not vector_nodes:
- log_structured('warning', 'Could not retrieve nodes from vector index. GraphRAG indexing will be skipped.')
- raise ValueError("No nodes could be retrieved from the vector index for GraphRAG indexing")
-
- # Now create GraphRAG components with the retrieved nodes
- graph_store, property_graph_index = create_graph_components(
- llm=llm,
- nodes=vector_nodes,
- max_paths_per_chunk=10,
- force_reindex=True # Force indexing since Neo4j is empty
- )
- log_structured('info', f'GraphRAG components created with {len(vector_nodes)} nodes from vector index')
- except Exception as e:
- log_structured('warning', f'Error recreating GraphRAG components: {e}. Continuing without GraphRAG.')
- graph_store = None
else:
log_structured('info', f'No index found at {INDEX_PERSIST_PATH}. Creating new index from {NETFLIX_DOCS_FOLDER}')
if not NETFLIX_DOCS_FOLDER.exists() or not any(NETFLIX_DOCS_FOLDER.iterdir()):
- log_structured('error', f'Netflix documents folder is missing or empty: {NETFLIX_DOCS_FOLDER}')
- return False
+ log_structured('error', f'Netflix documents folder is missing or empty: {NETFLIX_DOCS_FOLDER}')
+ return False
- # Process documents using LlamaParse
documents = await process_documents_in_directory(str(NETFLIX_DOCS_FOLDER), session_id="global_init")
-
if not documents:
log_structured('error', f'No documents processed from {NETFLIX_DOCS_FOLDER}. Index creation aborted.')
return False
@@ -855,138 +852,42 @@ async def initialize_global_index() -> bool:
'sample_doc_metadata': documents[0].metadata if documents else None
})
- # Build the index using the globally configured Settings (incl. node_parser)
index = VectorStoreIndex.from_documents(
documents=documents,
show_progress=True,
- # service_context=service_context, # Old way, use Settings now
)
-
- # Save to shared state
set_global_index(index)
-
- # Persist the index
index.storage_context.persist(persist_dir=str(INDEX_PERSIST_PATH))
log_structured('info', f'Index created and persisted to {INDEX_PERSIST_PATH}')
-
- # --- Create GraphRAG components ---
- log_structured('info', 'Starting GraphRAG component creation')
- try:
- # Get nodes from the index's docstore for GraphRAG
- docstore_nodes = []
- if hasattr(index, 'docstore') and index.docstore:
- docstore_nodes = list(index.docstore.docs.values())
- log_structured('info', f'Retrieved {len(docstore_nodes)} nodes from index docstore for GraphRAG')
-
- # If docstore is empty or doesn't exist, use the original documents
- nodes_for_graph = docstore_nodes if docstore_nodes else documents
- log_structured('info', f'Using {len(nodes_for_graph)} nodes for GraphRAG indexing')
-
- # Create GraphRAG components
- graph_store, property_graph_index = create_graph_components(
- llm=llm,
- nodes=nodes_for_graph,
- max_paths_per_chunk=10,
- force_reindex=True # Force indexing for new index creation
- )
- log_structured('info', 'GraphRAG components created successfully')
- except Exception as graph_err:
- log_structured('error', f'Error creating GraphRAG components: {graph_err}',
- {'traceback': traceback.format_exc()})
- # Continue without GraphRAG if there's an error
- # --- Create Retriever and Query Engine ---
+ # --- Create Retriever and Query Engine (vector-only) ---
vector_store_info = VectorStoreInfo(
content_info="Netflix marketing reference materials, including GPD Key Art Playbook and supporting documents.",
metadata_info=[
MetadataInfo(name="filename", type="str", description="Filename of the source document"),
MetadataInfo(name="source_page", type="int", description="Approximate page number in the source document"),
MetadataInfo(name="image_paths", type="list[str]", description="List of image filenames associated with this chunk"),
- # Add other relevant metadata fields here
],
)
all_retriever = VectorIndexAutoRetriever(
- index=index, # Use the local variable, not global_index
+ index=index,
vector_store_info=vector_store_info,
similarity_top_k=SIMILARITY_TOP_K,
- verbose=True # Enable verbose logging for retriever
+ verbose=True
)
- # Create standard vector-based query engine
- all_query_engine = RetrieverQueryEngine.from_args( # Use from_args for clarity
+ all_query_engine = RetrieverQueryEngine.from_args(
retriever=all_retriever,
response_synthesizer=get_response_synthesizer(
- response_mode=ResponseMode.COMPACT, # Or REFINE, TREE_SUMMARIZE etc.
- # service_context=service_context # Old way
+ response_mode=ResponseMode.COMPACT,
),
node_postprocessors=[
SimilarityPostprocessor(similarity_cutoff=SIMILARITY_CUTOFF),
],
- # service_context=service_context # Old way
)
-
- # Create GraphRAG query engine if components were created successfully
- graphrag_query_engine = None
- graph_store = locals().get('graph_store', None)
- property_graph_index = locals().get('property_graph_index', None)
- if graph_store is not None and property_graph_index is not None:
- try:
- # Ensure graph communities are built before creating query engine
- if not hasattr(graph_store, 'communities_built') or not graph_store.communities_built:
- log_structured('info', 'Building graph communities before creating query engine')
- try:
- # Use gpt-4.1-mini model for community summaries (set in GraphRAGStore)
- # The build_communities() method will first try to load from cache
- # and will only rebuild and re-cache if cache loading fails
- # It also tracks if communities are already built to avoid duplicate work
- graph_store.build_communities()
- log_structured('info', 'Communities built successfully (loaded from cache or built new)')
- except Exception as comm_err:
- log_structured('error', f'Error building communities: {comm_err}',
- {'traceback': traceback.format_exc()})
- # Continue with query engine creation even if communities failed
-
- # Create a basic VectorIndexRetriever for GraphRAG
- vector_retriever = VectorIndexRetriever(
- index=index,
- similarity_top_k=SIMILARITY_TOP_K
- )
-
- # Create the GraphRAG query engine - ensure all required fields are passed directly
- try:
- graphrag_query_engine = create_graphrag_query_engine(
- vector_retriever=vector_retriever,
- graph_store=graph_store,
- llm=llm,
- similarity_top_k=SIMILARITY_TOP_K
- )
- log_structured('info', 'GraphRAG query engine created successfully')
- except Exception as e:
- log_structured('error', f'Error creating GraphRAG query engine: {e}', {'traceback': traceback.format_exc()})
- # Create a direct instance without using the factory function as a fallback
- from graph_rag_integration import GraphRAGQueryEngine
- graphrag_query_engine = GraphRAGQueryEngine(
- vector_retriever=vector_retriever,
- graph_store=graph_store,
- llm=llm,
- similarity_top_k=SIMILARITY_TOP_K
- )
-
- # Store GraphRAG components in shared state
- set_graphrag_components(
- graph_store=graph_store,
- property_graph_index=property_graph_index,
- graphrag_query_engine=graphrag_query_engine
- )
-
- log_structured('info', 'GraphRAG query engine created successfully')
- except Exception as graph_engine_err:
- log_structured('error', f'Error creating GraphRAG query engine: {graph_engine_err}',
- {'traceback': traceback.format_exc()})
- # Continue without GraphRAG query engine if there's an error
- # --- Create Query Engine Tools ---
+ # --- Create Query Engine Tools (vector-only initially) ---
query_engine_tools_react = [
QueryEngineTool(
query_engine=all_query_engine,
@@ -996,118 +897,30 @@ async def initialize_global_index() -> bool:
),
),
]
-
- # Add GraphRAG tool if available
- if graphrag_query_engine is not None:
- class GraphRAGTool(BaseTool):
- def __init__(self, query_engine):
- self.query_engine = query_engine
- self._metadata = ToolMetadata(
- name="answerquestionswith_graphrag",
- description="USE THIS TOOL FOR ALL QUERIES - Queries Netflix marketing materials using both vector and graph-based retrieval for more comprehensive answers. Use this for complex questions that need context from multiple related documents."
- )
-
- @property
- def metadata(self):
- return self._metadata
-
- def __call__(self, query_str: str) -> ToolOutput:
- """Run query through GraphRAG and generate synthesized answer."""
- from shared_state import global_graphrag_query_engine
-
- log_structured('info', 'GraphRAG Tool: Starting dual retrieval', {'query': query_str})
-
- try:
- # Get both vector and GraphRAG retrieval results
- retrieval_result = self.query_engine.custom_query(query_str)
-
- # Generate synthesized answer
- final_answer = generate_final_answer(query_str, retrieval_result, self.query_engine.llm)
-
- # Prepare the tool output
- log_message = {
- 'vector_context_length': len(retrieval_result.get('vector_context', '')),
- 'graphrag_context_length': len(retrieval_result.get('graphrag_context', '')),
- 'vector_nodes_count': len(retrieval_result.get('vector_nodes', [])),
- 'community_ids': retrieval_result.get('community_ids', [])
- }
- log_structured('info', 'GraphRAG Tool: Retrieval complete', log_message)
-
- # Extract vector nodes and create a properly structured raw_output that includes source_nodes
- # This allows the routes.py code to extract images from these nodes
- vector_nodes = retrieval_result.get('vector_nodes', [])
-
- # Create a wrapper object that mimics the structure expected by routes.py for image extraction
- class NodeWrapper:
- def __init__(self, nodes):
- self.source_nodes = nodes
-
- # Preserve the original retrieval_result but add the source_nodes in the expected format
- modified_raw_output = retrieval_result.copy()
- modified_raw_output['source_nodes'] = vector_nodes
-
- # Add a source_nodes property that routes.py will look for
- tool_output = ToolOutput(
- content=final_answer,
- tool_name="GraphRAG",
- raw_output=NodeWrapper(vector_nodes),
- raw_input={"query": query_str, "source": "graphrag", "retrieval_stats": log_message}
- )
-
- log_structured('debug', 'GraphRAG Tool: Including image metadata in response',
- {'node_count': len(vector_nodes)})
-
- return tool_output
- except Exception as graphrag_err:
- log_structured('error', f'Error in GraphRAG tool: {graphrag_err}',
- {'traceback': traceback.format_exc()})
- return ToolOutput(
- content=f"I encountered an error while retrieving information: {str(graphrag_err)}. Please try again or use the standard query tool.",
- tool_name="GraphRAG",
- raw_input={"query": query_str},
- raw_output={"error": str(graphrag_err)},
- is_error=True
- )
-
- async def acall(self, input: str) -> ToolOutput:
- """Async version of __call__."""
- return self.__call__(input)
-
- # Create the GraphRAG tool instance and add it to tools
- graphrag_tool = GraphRAGTool(graphrag_query_engine)
- query_engine_tools_react.append(graphrag_tool)
- log_structured('info', 'Added GraphRAG tool to query engine tools')
# --- Initialize Global Workflow Agent ---
- # We're now using shared_state rather than module globals
-
try:
- # Create the agent instance
agent = ReActAgent2(
- llm=llm, # Use the LLM configured via Settings
+ llm=llm,
tools=query_engine_tools_react,
- memory=ChatMemoryBuffer.from_defaults(llm=llm, token_limit=4096), # Give memory its own LLM ref
- verbose=True, # Enable agent verbose logging
+ memory=ChatMemoryBuffer.from_defaults(llm=llm, token_limit=4096),
+ verbose=True,
timeout=AGENT_TIMEOUT,
llm_timeout=LLM_TIMEOUT,
tool_timeout=TOOL_EXECUTION_TIMEOUT
)
-
- # Store in shared state
set_global_agent(agent)
-
- log_structured('info', 'Agent initialized successfully')
+ log_structured('info', 'Agent initialized with vector-only tools')
except Exception as agent_err:
log_structured('critical', f'Failed to initialize agent: {str(agent_err)}', {'error': str(agent_err)})
- return False # Signal failure
-
+ return False
+
try:
# Define a simpler version of the run method
async def simple_run(query_text):
"""Simple version that doesn't rely on complex workflow steps."""
- # Import from shared state to ensure we use the current global agent
from shared_state import global_workflow_agent
-
+
if not global_workflow_agent:
log_structured('critical', 'Agent is None in simple_run - this should never happen')
return {
@@ -1115,44 +928,39 @@ async def initialize_global_index() -> bool:
"sources": [],
"reasoning": []
}
-
+
try:
- # Don't reset memory - we want to preserve conversation context between requests
- # Only reset sources to track new sources for this specific response
global_workflow_agent.sources = []
-
+
# 1. Add the user query to memory
user_msg = ChatMessage(role="user", content=str(query_text))
global_workflow_agent.memory.put(user_msg)
-
+
# 2. Format the chat history
chat_history = global_workflow_agent.memory.get()
llm_input = global_workflow_agent.formatter.format(global_workflow_agent.tools, chat_history)
-
+
# 3. Get LLM response
response = await asyncio.wait_for(
global_workflow_agent.llm.achat(llm_input),
timeout=global_workflow_agent.llm_timeout
)
reasoning_step = global_workflow_agent.output_parser.parse(response.message.content)
-
- # Add detailed logging
+
log_structured('debug', 'Parsed reasoning step', {
'step_type': type(reasoning_step).__name__,
'has_response': hasattr(reasoning_step, 'response'),
- 'has_action': hasattr(reasoning_step, 'action'),
+ 'has_action': hasattr(reasoning_step, 'action'),
'action': getattr(reasoning_step, 'action', None),
'action_input': getattr(reasoning_step, 'action_input', None),
- 'raw_content': response.message.content[:200] # First 200 chars for debugging
+ 'raw_content': response.message.content[:200]
})
-
- # 4. Process the response (simplified)
- # Force tool usage for all queries - don't allow direct responses
+
+ # 4. Process the response
if hasattr(reasoning_step, 'response') and reasoning_step.response:
- # If LLM gave a direct response, we need to force tool usage instead
log_structured('debug', 'LLM provided direct response, forcing tool usage for image retrieval')
-
- # Force tool execution by calling GraphRAG directly
+
+ # Try GraphRAG tool first (dynamically checks current tools list)
graphrag_tool = next((t for t in global_workflow_agent.tools if 'graphrag' in t.metadata.name.lower()), None)
if graphrag_tool:
try:
@@ -1161,10 +969,7 @@ async def initialize_global_index() -> bool:
timeout=global_workflow_agent.tool_timeout
)
global_workflow_agent.sources.append(tool_output)
-
- # Use the tool's response instead of the direct LLM response
response_text = str(tool_output.content) if tool_output and tool_output.content else str(reasoning_step.response)
-
assistant_msg = ChatMessage(role="assistant", content=response_text)
global_workflow_agent.memory.put(assistant_msg)
return {
@@ -1174,29 +979,26 @@ async def initialize_global_index() -> bool:
}
except Exception as e:
log_structured('error', f'Error forcing GraphRAG tool execution: {str(e)}')
- # Fall back to direct response if tool fails
-
+
# Fallback to cleaning the direct response
response_text = str(reasoning_step.response)
-
- # Check for common thinking patterns and remove them
- import re # Import re within the function scope to ensure it's available
+
+ import re
thinking_patterns = [
- r'(?i)^.*?thinking:.*?\n', # Remove lines starting with "Thinking:"
- r'(?i).*?', # Remove XML-like thinking tags
- r'(?i)\[thinking\].*?\[/thinking\]', # Remove bracket thinking tags
- r'(?i)I\'m thinking:.*?\n', # Remove "I'm thinking:" sections
- r'(?i)Let me think.*?\n', # Remove "Let me think" sections
- r'(?i)Thought:.*?Answer:', # Remove "Thought: ... Answer:" pattern
- r'(?i)^Answer:\s*' # Remove just the "Answer:" prefix
+ r'(?i)^.*?thinking:.*?\n',
+ r'(?i).*?',
+ r'(?i)\[thinking\].*?\[/thinking\]',
+ r'(?i)I\'m thinking:.*?\n',
+ r'(?i)Let me think.*?\n',
+ r'(?i)Thought:.*?Answer:',
+ r'(?i)^Answer:\s*'
]
for pattern in thinking_patterns:
response_text = re.sub(pattern, '', response_text, flags=re.DOTALL)
-
- # Remove extra newlines that might be left after cleaning
+
response_text = re.sub(r'\n{3,}', '\n\n', response_text)
response_text = response_text.strip()
-
+
assistant_msg = ChatMessage(role="assistant", content=response_text)
global_workflow_agent.memory.put(assistant_msg)
return {
@@ -1205,12 +1007,11 @@ async def initialize_global_index() -> bool:
"reasoning": [reasoning_step]
}
else:
- # Handle tool calls if the response indicates an action
+ # Handle tool calls
if hasattr(reasoning_step, 'action') and reasoning_step.action:
tool_name = reasoning_step.action
action_input = reasoning_step.action_input or {}
-
- # Convert to dict if needed
+
if not isinstance(action_input, dict):
try:
if isinstance(action_input, str) and action_input.strip().startswith('{'):
@@ -1219,11 +1020,9 @@ async def initialize_global_index() -> bool:
action_input = {'query': action_input}
except:
action_input = {'query': str(action_input)}
-
- # Find the tool
+
tool = next((t for t in global_workflow_agent.tools if t.metadata.name == tool_name), None)
if not tool:
- error_msg = f"Tool '{tool_name}' not found. Available tools: {[t.metadata.name for t in global_workflow_agent.tools]}"
log_structured('error', 'Tool not found in simple_run', {'tool_name': tool_name, 'available_tools': [t.metadata.name for t in global_workflow_agent.tools]})
return {
"response": f"I tried to use a tool called '{tool_name}' but it's not available. Please try rephrasing your query.",
@@ -1232,48 +1031,41 @@ async def initialize_global_index() -> bool:
}
if tool:
try:
- # Execute the tool
tool_output = await asyncio.wait_for(
tool.acall(**action_input),
timeout=global_workflow_agent.tool_timeout
)
global_workflow_agent.sources.append(tool_output)
-
- # Get a final response
+
observation = str(tool_output.content if tool_output and tool_output.content is not None else "No content")
follow_up_msg = ChatMessage(role="user", content=f"Here is the result: {observation}\\nPlease provide a final response based on this information.")
global_workflow_agent.memory.put(follow_up_msg)
-
- # Call LLM again for final response
+
chat_history = global_workflow_agent.memory.get()
llm_input = global_workflow_agent.formatter.format(global_workflow_agent.tools, chat_history)
final_response = await asyncio.wait_for(
global_workflow_agent.llm.achat(llm_input),
timeout=global_workflow_agent.llm_timeout
)
-
- # Store and return
- # Clean the response to remove any thinking parts
+
response_text = str(final_response.message.content)
-
- # Check for common thinking patterns and remove them
- import re # Import re within the function scope to ensure it's available
+
+ import re
thinking_patterns = [
- r'(?i)^.*?thinking:.*?\n', # Remove lines starting with "Thinking:"
- r'(?i).*?', # Remove XML-like thinking tags
- r'(?i)\[thinking\].*?\[/thinking\]', # Remove bracket thinking tags
- r'(?i)I\'m thinking:.*?\n', # Remove "I'm thinking:" sections
- r'(?i)Let me think.*?\n', # Remove "Let me think" sections
- r'(?i)Thought:.*?Answer:', # Remove "Thought: ... Answer:" pattern
- r'(?i)^Answer:\s*' # Remove just the "Answer:" prefix
+ r'(?i)^.*?thinking:.*?\n',
+ r'(?i).*?',
+ r'(?i)\[thinking\].*?\[/thinking\]',
+ r'(?i)I\'m thinking:.*?\n',
+ r'(?i)Let me think.*?\n',
+ r'(?i)Thought:.*?Answer:',
+ r'(?i)^Answer:\s*'
]
for pattern in thinking_patterns:
response_text = re.sub(pattern, '', response_text, flags=re.DOTALL)
-
- # Remove extra newlines that might be left after cleaning
+
response_text = re.sub(r'\n{3,}', '\n\n', response_text)
response_text = response_text.strip()
-
+
global_workflow_agent.memory.put(ChatMessage(role="assistant", content=response_text))
return {
"response": response_text,
@@ -1282,18 +1074,15 @@ async def initialize_global_index() -> bool:
}
except Exception as e:
log_structured('error', f'Error executing tool: {str(e)}', {'traceback': traceback.format_exc()})
- # Error message is already clean without thinking
error_response = f"I encountered an error while processing your query: {str(e)}"
return {
"response": error_response,
"sources": [],
"reasoning": [reasoning_step]
}
-
+
# Enhanced fallback with thinking detection
if isinstance(reasoning_step, ActionReasoningStep):
- # If we reach here, it means we had an action but couldn't execute it
- thinking_response = f"Thought: {getattr(reasoning_step, 'thought', '')} Action: {reasoning_step.action} Action Input: {reasoning_step.action_input}"
log_structured('warning', 'Returning raw thinking due to failed action execution', {
'action': reasoning_step.action,
'action_input': reasoning_step.action_input
@@ -1304,7 +1093,6 @@ async def initialize_global_index() -> bool:
"reasoning": [reasoning_step]
}
else:
- # Original fallback for other types
fallback_response = "I wasn't able to find a specific answer to your question. Please try rephrasing your query."
return {
"response": fallback_response,
@@ -1313,36 +1101,29 @@ async def initialize_global_index() -> bool:
}
except Exception as e:
log_structured('error', f'Error in simple_run: {str(e)}', {'traceback': traceback.format_exc()})
- # Error response is already clean
error_response = f"I encountered an error while processing your query: {str(e)}"
return {
"response": error_response,
"sources": [],
"reasoning": []
}
-
- # Replace the run method directly on the agent that is already in shared state
- # We need to get the current reference from shared_state
+
+ # Attach the run method to the agent
from shared_state import global_workflow_agent as current_agent
-
+
if current_agent is None:
log_structured('critical', 'Cannot set run method - global_workflow_agent is None')
return False
-
- # Attach the run method directly
+
current_agent.run = simple_run
-
- # Verify it was attached correctly
+
if not hasattr(current_agent, 'run'):
log_structured('critical', 'Failed to attach run method to agent')
return False
-
+
log_structured('info', 'Successfully attached run method to agent')
-
- # Test the agent is working by calling a simple method
- log_structured('info', 'Testing agent functionality...')
-
- # Test on the current_agent we imported above
+
+ # Test the agent
if current_agent and hasattr(current_agent, 'memory') and hasattr(current_agent.memory, 'reset'):
current_agent.memory.reset()
log_structured('info', 'Agent memory reset test successful')
@@ -1356,20 +1137,180 @@ async def initialize_global_index() -> bool:
else:
log_structured('error', 'Agent memory test failed: unknown reason')
return False
-
+
except Exception as method_err:
- log_structured('critical', f'Failed to set up agent run method: {str(method_err)}',
+ log_structured('critical', f'Failed to set up agent run method: {str(method_err)}',
{'error': str(method_err), 'traceback': traceback.format_exc()})
return False
- log_structured('info', 'Global index and workflow agent initialized successfully.')
- # Skip test query to avoid potential errors during startup
+ log_structured('info', 'Vector index and agent initialized successfully (GraphRAG pending).')
return True
except Exception as e:
- log_structured('critical', 'Global index/agent initialization failed', {
+ log_structured('critical', 'Vector index/agent initialization failed', {
'error': str(e), 'traceback': traceback.format_exc()
})
- global_index = None
- global_workflow_agent = None
- return False
\ No newline at end of file
+ return False
+
+
+# --- Phase 2: GraphRAG Background Initialization ---
+async def initialize_graphrag_components() -> bool:
+ """Initialize GraphRAG components in the background.
+
+ This connects to Neo4j, restores or extracts triples, builds communities,
+ creates the GraphRAG query engine, and adds the GraphRAG tool to the
+ existing agent. Can be called after the server is already serving requests.
+
+ Returns:
+ bool: True if GraphRAG components were initialized successfully.
+ """
+ set_graphrag_status(initializing=True, ready=False, error=None)
+
+ try:
+ # Get the current index and agent from shared state
+ from shared_state import global_workflow_agent as current_agent
+ import shared_state
+ index = shared_state.global_index
+
+ if index is None:
+ raise RuntimeError("Vector index must be initialized before GraphRAG components")
+ if current_agent is None:
+ raise RuntimeError("Agent must be initialized before GraphRAG components")
+
+ llm = Settings.llm
+
+ # --- Connect to Neo4j and create/restore GraphRAG components ---
+ log_structured('info', 'Starting GraphRAG background initialization')
+
+ try:
+ from llama_index.graph_stores.neo4j import Neo4jPropertyGraphStore
+ property_graph_store = Neo4jPropertyGraphStore(
+ username=NEO4J_USERNAME,
+ password=NEO4J_PASSWORD,
+ url=NEO4J_URL
+ )
+ log_structured('info', 'Successfully connected to Neo4j database')
+ except Exception as e:
+ raise RuntimeError(f"Neo4j connection failed: {e}")
+
+ # Check Neo4j state
+ temp_graph_store = GraphRAGStore(property_graph_store)
+ triplets = temp_graph_store.get_triplets()
+ neo4j_has_data = len(triplets) > 0
+ log_structured('info', f'Neo4j state check: {len(triplets)} triplets found')
+
+ if neo4j_has_data:
+ # Neo4j already has data — use it directly
+ log_structured('info', f'Neo4j contains {len(triplets)} triplets. Using existing data.')
+ graph_store, property_graph_index = create_graph_components(
+ llm=llm,
+ force_reindex=False
+ )
+ log_structured('info', 'GraphRAG components loaded from existing Neo4j data')
+ else:
+ # Neo4j is empty — try cache restore, then fall back to LLM extraction
+ log_structured('info', 'Neo4j is empty. Will attempt cache restore or LLM extraction.')
+
+ # Get nodes from vector index for potential LLM extraction
+ vector_nodes = []
+ if hasattr(index, 'docstore') and index.docstore:
+ vector_nodes = list(index.docstore.docs.values())
+ log_structured('info', f'Retrieved {len(vector_nodes)} nodes from index docstore')
+
+ if not vector_nodes:
+ log_structured('warning', 'Could not retrieve nodes from vector index. GraphRAG indexing will be skipped.')
+ raise ValueError("No nodes could be retrieved from the vector index for GraphRAG indexing")
+
+ # create_graph_components will automatically try cache restore before LLM extraction
+ graph_store, property_graph_index = create_graph_components(
+ llm=llm,
+ nodes=vector_nodes,
+ max_paths_per_chunk=10,
+ force_reindex=False # Allow cache restore path
+ )
+ log_structured('info', 'GraphRAG components created')
+
+ # --- Build GraphRAG query engine ---
+ # Ensure communities are built
+ if not hasattr(graph_store, 'communities_built') or not graph_store.communities_built:
+ log_structured('info', 'Building graph communities before creating query engine')
+ try:
+ graph_store.build_communities()
+ log_structured('info', 'Communities built successfully')
+ except Exception as comm_err:
+ log_structured('error', f'Error building communities: {comm_err}',
+ {'traceback': traceback.format_exc()})
+
+ vector_retriever = VectorIndexRetriever(
+ index=index,
+ similarity_top_k=SIMILARITY_TOP_K
+ )
+
+ try:
+ graphrag_query_engine = create_graphrag_query_engine(
+ vector_retriever=vector_retriever,
+ graph_store=graph_store,
+ llm=llm,
+ similarity_top_k=SIMILARITY_TOP_K
+ )
+ log_structured('info', 'GraphRAG query engine created successfully')
+ except Exception as e:
+ log_structured('error', f'Error creating GraphRAG query engine via factory: {e}')
+ graphrag_query_engine = GraphRAGQueryEngine(
+ vector_retriever=vector_retriever,
+ graph_store=graph_store,
+ llm=llm,
+ similarity_top_k=SIMILARITY_TOP_K
+ )
+
+ # Store GraphRAG components in shared state
+ set_graphrag_components(
+ graph_store=graph_store,
+ property_graph_index=property_graph_index,
+ graphrag_query_engine=graphrag_query_engine
+ )
+
+ # --- Add GraphRAG tool to the existing agent's tools ---
+ graphrag_tool = GraphRAGTool(graphrag_query_engine)
+
+ # Re-import current agent to get fresh reference
+ from shared_state import global_workflow_agent as live_agent
+ if live_agent is not None:
+ live_agent.tools.append(graphrag_tool)
+ log_structured('info', 'Added GraphRAG tool to existing agent', {
+ 'total_tools': len(live_agent.tools),
+ 'tool_names': [t.metadata.name for t in live_agent.tools]
+ })
+ else:
+ log_structured('warning', 'Agent not available when trying to add GraphRAG tool')
+
+ set_graphrag_status(ready=True, initializing=False)
+ log_structured('info', 'GraphRAG background initialization complete')
+ return True
+
+ except Exception as e:
+ log_structured('error', f'GraphRAG background initialization failed: {e}', {
+ 'traceback': traceback.format_exc()
+ })
+ set_graphrag_status(ready=False, initializing=False, error=e)
+ return False
+
+
+# --- Backward-compatible wrapper ---
+async def initialize_global_index() -> bool:
+ """Initialize the global index from Netflix documents at startup.
+
+ This is the original entry point that initializes everything synchronously.
+ For faster startup, use initialize_vector_index() followed by
+ initialize_graphrag_components() in the background.
+ """
+ vector_success = await initialize_vector_index()
+ if not vector_success:
+ return False
+
+ graphrag_success = await initialize_graphrag_components()
+ if not graphrag_success:
+ log_structured('warning', 'GraphRAG initialization failed, but vector search is available')
+ # Return True since vector search works — GraphRAG is optional
+
+ return True
\ No newline at end of file
diff --git a/documentation/graphrag-startup-optimization-guide.md b/documentation/graphrag-startup-optimization-guide.md
new file mode 100644
index 0000000..6435744
--- /dev/null
+++ b/documentation/graphrag-startup-optimization-guide.md
@@ -0,0 +1,738 @@
+# GraphRAG Startup Optimization Guide
+
+## Purpose
+
+This document describes a set of changes to drastically reduce startup time for an app built on the LlamaIndex + Neo4j GraphRAG + Flask/Hypercorn architecture. The changes achieve two things:
+
+1. **Triple caching**: Extracted entities/relationships are cached to disk so Neo4j can be repopulated without expensive LLM re-extraction.
+2. **Background initialization**: The server starts serving requests with vector-only search in ~1-2 minutes, while GraphRAG components initialize in the background.
+
+---
+
+## Problem
+
+On a cold start (or when Neo4j loses its data), the app spends:
+
+| Phase | Duration | Already Cached? |
+|-------|----------|-----------------|
+| Document parsing (LlamaParse) | 20-30 min | Yes (vector index on disk) |
+| Vector embeddings | 10-15 min | Yes (vector index on disk) |
+| **Triple extraction** | **10-20 min** | **No — only lives in Neo4j** |
+| **Community summarization** | **10-30 min** | Yes (pickle cache) |
+| Community detection | 5-10 min | Yes (pickle cache) |
+
+The two bolded phases are the bottleneck. The app can't serve any requests until all of this completes.
+
+---
+
+## Architecture Assumptions
+
+This guide assumes your app has:
+
+- A **graph store wrapper class** (e.g., `GraphRAGStore`) that wraps `Neo4jPropertyGraphStore` and handles community detection/summarization with existing pickle caching for community data.
+- A **`create_graph_components()` function** that connects to Neo4j, checks for existing data, and either reuses data or runs LLM extraction via `PropertyGraphIndex`.
+- A **monolithic initialization function** (e.g., `initialize_global_index()`) that does everything: LLM setup, vector index loading, GraphRAG setup, agent creation.
+- A **shared state module** that stores global references to the agent, index, and GraphRAG components.
+- A **main.py** that runs startup synchronously before the server starts.
+- A **routes module** with a `/status` endpoint and a `/chat` endpoint that checks agent availability.
+
+Adapt file names, class names, and function signatures to match your codebase.
+
+---
+
+## Change 1: Cache Extracted Triples to Disk
+
+### What
+
+Add two methods to your `GraphRAGStore` class:
+- `save_triples_to_cache()` — After successful LLM extraction, save entities and relations to a pickle file.
+- `load_triples_from_cache()` — On startup, if Neo4j is empty but the cache exists, restore triples to Neo4j directly.
+
+Then modify `create_graph_components()` to use these methods.
+
+### File: Graph RAG Integration Module (e.g., `graph_rag_integration.py`)
+
+#### 1a. Add a triples cache file path to the store class
+
+In the class-level constants of your `GraphRAGStore` (next to the existing community cache paths), add:
+
+```python
+TRIPLES_CACHE_FILE = CACHE_DIR / "neo4j_triples.pickle"
+```
+
+#### 1b. Add `save_triples_to_cache()` method
+
+Add this method to `GraphRAGStore`, after `get_triplets()`:
+
+```python
+def save_triples_to_cache(self):
+ """Save extracted triples (entities + relationships) from Neo4j to a disk cache.
+
+ This allows restoring triples to Neo4j without expensive LLM re-extraction
+ if Neo4j data is lost (e.g., container recreated without volume persistence).
+ """
+ try:
+ triplets = self.get_triplets()
+ if not triplets:
+ log_structured('warning', 'No triplets to cache — Neo4j appears empty')
+ return False
+
+ # Collect unique entities and relations from the triplets
+ entities = {}
+ relations = []
+ for entity1, relation, entity2 in triplets:
+ entities[entity1.name] = entity1
+ entities[entity2.name] = entity2
+ relations.append(relation)
+
+ cache_data = {
+ 'entities': list(entities.values()),
+ 'relations': relations,
+ 'triplet_count': len(triplets),
+ }
+
+ with open(self.TRIPLES_CACHE_FILE, 'wb') as f:
+ pickle.dump(cache_data, f)
+
+ log_structured('info', 'Successfully cached Neo4j triples to disk', {
+ 'entity_count': len(entities),
+ 'relation_count': len(relations),
+ 'triplet_count': len(triplets),
+ 'cache_file': str(self.TRIPLES_CACHE_FILE)
+ })
+ return True
+ except Exception as e:
+ log_structured('error', f'Error saving triples cache: {e}')
+ return False
+```
+
+#### 1c. Add `load_triples_from_cache()` method
+
+Add this method right after `save_triples_to_cache()`:
+
+```python
+def load_triples_from_cache(self):
+ """Load triples from disk cache and restore them to Neo4j.
+
+ Returns True if triples were successfully restored, False otherwise.
+ """
+ if not self.TRIPLES_CACHE_FILE.exists():
+ log_structured('info', 'No triples cache file found')
+ return False
+
+ try:
+ with open(self.TRIPLES_CACHE_FILE, 'rb') as f:
+ cache_data = pickle.load(f)
+
+ entities = cache_data.get('entities', [])
+ relations = cache_data.get('relations', [])
+
+ if not entities:
+ log_structured('warning', 'Triples cache file exists but contains no entities')
+ return False
+
+ log_structured('info', 'Restoring triples from disk cache to Neo4j', {
+ 'entity_count': len(entities),
+ 'relation_count': len(relations),
+ 'cached_triplet_count': cache_data.get('triplet_count', 'unknown')
+ })
+
+ # Restore entities (nodes) to Neo4j
+ self.property_graph_store.upsert_nodes(entities)
+ log_structured('info', f'Restored {len(entities)} entity nodes to Neo4j')
+
+ # Restore relations to Neo4j
+ if relations:
+ self.property_graph_store.upsert_relations(relations)
+ log_structured('info', f'Restored {len(relations)} relations to Neo4j')
+
+ # Verify restoration
+ restored_triplets = self.get_triplets()
+ log_structured('info', f'Neo4j now contains {len(restored_triplets)} triplets after cache restore')
+
+ return len(restored_triplets) > 0
+ except Exception as e:
+ log_structured('error', f'Error restoring triples from cache: {e}')
+ return False
+```
+
+#### 1d. Modify `create_graph_components()` to use the cache
+
+The existing function has two branches:
+1. Neo4j has data and `force_reindex=False` → skip indexing
+2. Else → run LLM extraction
+
+Change this to **three** branches:
+
+```
+1. Neo4j has data and !force_reindex → skip indexing (also save triples to cache if not already cached)
+2. Neo4j is empty and !force_reindex and triples cache exists → restore from cache
+3. Else → full LLM extraction (also save triples to cache after)
+```
+
+Here is the full replacement logic for the branching section inside `create_graph_components()`:
+
+```python
+# Check if Neo4j already has content
+triplets = graph_store.get_triplets()
+has_existing_content = len(triplets) > 0
+
+log_structured('info', f'Neo4j check: Found {len(triplets)} triplets')
+
+if has_existing_content and not force_reindex:
+ # BRANCH 1: Neo4j has data — use it, but also ensure disk cache exists
+ log_structured('info', f'Neo4j already contains {len(triplets)} triplets. Skipping indexing.')
+
+ # Ensure triples are also cached to disk for future recovery
+ if not graph_store.TRIPLES_CACHE_FILE.exists():
+ log_structured('info', 'Neo4j has data but no triples cache on disk — creating cache now')
+ graph_store.save_triples_to_cache()
+
+ property_graph_index = PropertyGraphIndex(
+ nodes=[],
+ property_graph_store=property_graph_store,
+ )
+
+ if not graph_store.communities_built:
+ log_structured('info', 'Building graph communities from existing Neo4j data')
+ try:
+ graph_store.build_communities()
+ except Exception as e:
+ log_structured('error', f'Error building communities: {e}')
+
+elif not has_existing_content and not force_reindex and graph_store.TRIPLES_CACHE_FILE.exists():
+ # BRANCH 2: Neo4j is empty but triples cache exists — restore from cache
+ log_structured('info', 'Neo4j is empty but triples cache exists. Restoring from disk cache.')
+
+ restored = graph_store.load_triples_from_cache()
+ if restored:
+ log_structured('info', 'Successfully restored triples from cache.')
+
+ property_graph_index = PropertyGraphIndex(
+ nodes=[],
+ property_graph_store=property_graph_store,
+ )
+
+ if not graph_store.communities_built:
+ try:
+ graph_store.build_communities()
+ except Exception as e:
+ log_structured('error', f'Error building communities from restored data: {e}')
+ else:
+ # Cache restore failed — fall through to LLM extraction
+ log_structured('warning', 'Triples cache restore failed. Falling back to LLM extraction.')
+ if not nodes:
+ raise ValueError("Nodes must be provided when Neo4j is empty and cache restore fails")
+
+ kg_extractor = GraphRAGExtractor(
+ llm=llm,
+ extract_prompt=KG_TRIPLET_EXTRACT_TMPL,
+ max_paths_per_chunk=max_paths_per_chunk,
+ parse_fn=custom_parse_fn,
+ )
+
+ property_graph_index = PropertyGraphIndex(
+ nodes=nodes,
+ kg_extractors=[kg_extractor],
+ property_graph_store=property_graph_store,
+ show_progress=True,
+ )
+
+ graph_store.save_triples_to_cache()
+
+ try:
+ graph_store.build_communities()
+ except Exception as e:
+ log_structured('error', f'Error building communities: {e}')
+
+else:
+ # BRANCH 3: Full LLM extraction (force_reindex or no cache)
+ if not nodes:
+ raise ValueError("Nodes must be provided for indexing when Neo4j is empty or force_reindex=True")
+
+ kg_extractor = GraphRAGExtractor(
+ llm=llm,
+ extract_prompt=KG_TRIPLET_EXTRACT_TMPL,
+ max_paths_per_chunk=max_paths_per_chunk,
+ parse_fn=custom_parse_fn,
+ )
+
+ if has_existing_content and force_reindex:
+ # Clear Neo4j before re-extraction
+ try:
+ from neo4j import GraphDatabase
+ driver = GraphDatabase.driver(NEO4J_URL, auth=(NEO4J_USERNAME, NEO4J_PASSWORD))
+ with driver.session() as session:
+ session.run("MATCH (n) DETACH DELETE n")
+ driver.close()
+ except Exception as e:
+ log_structured('warning', f'Error clearing Neo4j database: {e}')
+
+ property_graph_index = PropertyGraphIndex(
+ nodes=nodes,
+ kg_extractors=[kg_extractor],
+ property_graph_store=property_graph_store,
+ show_progress=True,
+ )
+
+ # Cache the newly extracted triples
+ graph_store.save_triples_to_cache()
+
+ try:
+ graph_store.build_communities()
+ except Exception as e:
+ log_structured('error', f'Error building communities: {e}')
+
+return graph_store, property_graph_index
+```
+
+---
+
+## Change 2: Add GraphRAG Status Flags to Shared State
+
+### File: Shared State Module (e.g., `shared_state.py`)
+
+#### 2a. Add new module-level variables
+
+After the existing GraphRAG component variables, add:
+
+```python
+# GraphRAG initialization status
+graphrag_ready = False
+graphrag_initializing = False
+graphrag_error = None
+```
+
+#### 2b. Add setter/getter functions
+
+Add these functions before `is_agent_available()`:
+
+```python
+def set_graphrag_status(ready=None, initializing=None, error=None):
+ """Update GraphRAG initialization status flags."""
+ global graphrag_ready, graphrag_initializing, graphrag_error
+ from utils import log_structured
+
+ if ready is not None:
+ graphrag_ready = ready
+ if initializing is not None:
+ graphrag_initializing = initializing
+ if error is not None:
+ graphrag_error = error
+
+ log_structured('info', 'GraphRAG status updated', {
+ 'ready': graphrag_ready,
+ 'initializing': graphrag_initializing,
+ 'error': str(graphrag_error) if graphrag_error else None
+ })
+
+def get_graphrag_status():
+ """Get current GraphRAG initialization status."""
+ return {
+ 'ready': graphrag_ready,
+ 'initializing': graphrag_initializing,
+ 'error': str(graphrag_error) if graphrag_error else None
+ }
+```
+
+---
+
+## Change 3: Split Initialization into Two Phases
+
+### File: AI Core Module (e.g., `ai_core.py`)
+
+This is the largest change. You'll split your monolithic initialization function into:
+
+1. **`initialize_vector_index()`** — Fast path (~1-2 min). Loads/creates vector index, creates vector query engine, creates agent with vector-only tools, attaches the `run` method. The server becomes usable after this completes.
+2. **`initialize_graphrag_components()`** — Background path. Connects to Neo4j, restores/extracts triples, builds communities, creates GraphRAG query engine, and **dynamically adds the GraphRAG tool** to the existing agent's tools list.
+3. **`initialize_global_index()`** — Kept as backward-compatible wrapper that calls both in sequence.
+
+#### 3a. Update imports from shared_state
+
+Add `set_graphrag_status` to your shared_state imports:
+
+```python
+from shared_state import (
+ ...,
+ set_graphrag_status
+)
+```
+
+#### 3b. Extract your GraphRAG Tool class to module level
+
+If your GraphRAG tool class (the `BaseTool` subclass that wraps `GraphRAGQueryEngine`) is defined inside the initialization function, move it to module level. This makes it reusable from both the init function and the background init function.
+
+The tool class should remain functionally identical — just move it out of the function body to module scope. Example:
+
+```python
+class GraphRAGTool(BaseTool):
+ """Tool that queries using both vector and graph-based retrieval."""
+
+ def __init__(self, query_engine):
+ self.query_engine = query_engine
+ self._metadata = ToolMetadata(
+ name="answerquestionswith_graphrag",
+ description="Your tool description here"
+ )
+
+ @property
+ def metadata(self):
+ return self._metadata
+
+ def __call__(self, query_str: str) -> ToolOutput:
+ # ... existing implementation unchanged ...
+ pass
+
+ async def acall(self, input: str) -> ToolOutput:
+ return self.__call__(input)
+```
+
+#### 3c. Create `initialize_vector_index()`
+
+This function contains everything from your original `initialize_global_index()` **except** the GraphRAG-related code:
+
+- LLM and embedding model setup
+- Global Settings configuration
+- Vector index loading or creation (including document processing for cold starts)
+- Vector query engine creation
+- Agent creation with **vector-only** tools
+- `simple_run` method attachment and agent testing
+
+**Key difference from the original**: No GraphRAG components, no Neo4j connection, no community building. The agent is created with only the vector query tool.
+
+The `simple_run` function works without modification because it dynamically looks up the GraphRAG tool:
+```python
+graphrag_tool = next((t for t in global_workflow_agent.tools if 'graphrag' in t.metadata.name.lower()), None)
+if graphrag_tool:
+ # use it
+# else: falls through to vector-only path
+```
+
+When GraphRAG isn't initialized yet, `graphrag_tool` is `None` and the code gracefully falls back to the direct response path.
+
+#### 3d. Create `initialize_graphrag_components()`
+
+This function handles everything GraphRAG-related:
+
+```python
+async def initialize_graphrag_components() -> bool:
+ """Initialize GraphRAG components in the background."""
+ set_graphrag_status(initializing=True, ready=False, error=None)
+
+ try:
+ # Get current index and agent from shared state
+ from shared_state import global_workflow_agent as current_agent
+ import shared_state
+ index = shared_state.global_index
+
+ if index is None:
+ raise RuntimeError("Vector index must be initialized first")
+ if current_agent is None:
+ raise RuntimeError("Agent must be initialized first")
+
+ llm = Settings.llm
+
+ # Connect to Neo4j
+ property_graph_store = Neo4jPropertyGraphStore(
+ username=NEO4J_USERNAME,
+ password=NEO4J_PASSWORD,
+ url=NEO4J_URL
+ )
+
+ # Check Neo4j state and create/restore components
+ temp_graph_store = GraphRAGStore(property_graph_store)
+ triplets = temp_graph_store.get_triplets()
+ neo4j_has_data = len(triplets) > 0
+
+ if neo4j_has_data:
+ graph_store, property_graph_index = create_graph_components(
+ llm=llm, force_reindex=False
+ )
+ else:
+ # Get nodes from vector index for potential extraction
+ vector_nodes = []
+ if hasattr(index, 'docstore') and index.docstore:
+ vector_nodes = list(index.docstore.docs.values())
+
+ if not vector_nodes:
+ raise ValueError("No nodes available for GraphRAG indexing")
+
+ # create_graph_components will try cache restore first, then LLM extraction
+ graph_store, property_graph_index = create_graph_components(
+ llm=llm,
+ nodes=vector_nodes,
+ max_paths_per_chunk=10,
+ force_reindex=False # Allow cache restore path
+ )
+
+ # Ensure communities are built
+ if not hasattr(graph_store, 'communities_built') or not graph_store.communities_built:
+ graph_store.build_communities()
+
+ # Create GraphRAG query engine
+ vector_retriever = VectorIndexRetriever(
+ index=index, similarity_top_k=SIMILARITY_TOP_K
+ )
+
+ graphrag_query_engine = create_graphrag_query_engine(
+ vector_retriever=vector_retriever,
+ graph_store=graph_store,
+ llm=llm,
+ similarity_top_k=SIMILARITY_TOP_K
+ )
+
+ # Store in shared state
+ set_graphrag_components(
+ graph_store=graph_store,
+ property_graph_index=property_graph_index,
+ graphrag_query_engine=graphrag_query_engine
+ )
+
+ # *** KEY STEP: Add GraphRAG tool to the EXISTING agent ***
+ graphrag_tool = GraphRAGTool(graphrag_query_engine)
+
+ from shared_state import global_workflow_agent as live_agent
+ if live_agent is not None:
+ live_agent.tools.append(graphrag_tool)
+ log_structured('info', 'Added GraphRAG tool to existing agent', {
+ 'total_tools': len(live_agent.tools),
+ 'tool_names': [t.metadata.name for t in live_agent.tools]
+ })
+
+ set_graphrag_status(ready=True, initializing=False)
+ return True
+
+ except Exception as e:
+ log_structured('error', f'GraphRAG background initialization failed: {e}')
+ set_graphrag_status(ready=False, initializing=False, error=e)
+ return False
+```
+
+**Critical detail**: The GraphRAG tool is added to the agent's tools list dynamically via `live_agent.tools.append(graphrag_tool)`. Because `simple_run` does a dynamic lookup on `global_workflow_agent.tools` every time it runs, the GraphRAG tool becomes available to all subsequent requests without restarting the agent.
+
+#### 3e. Keep `initialize_global_index()` as a backward-compatible wrapper
+
+```python
+async def initialize_global_index() -> bool:
+ """Backward-compatible wrapper: initializes everything synchronously."""
+ vector_success = await initialize_vector_index()
+ if not vector_success:
+ return False
+
+ graphrag_success = await initialize_graphrag_components()
+ if not graphrag_success:
+ log_structured('warning', 'GraphRAG init failed, but vector search is available')
+
+ return True
+```
+
+---
+
+## Change 4: Update Main Startup for Background Init
+
+### File: Main Module (e.g., `main.py`)
+
+#### 4a. Update imports
+
+```python
+from ai_core import initialize_global_index, initialize_vector_index, initialize_graphrag_components
+from shared_state import global_workflow_agent, is_agent_available, get_graphrag_status
+```
+
+#### 4b. Modify `startup_event()` for two-phase startup
+
+```python
+async def startup_event() -> bool:
+ """Phase 1 (sync): MongoDB + vector index. Phase 2 (background): GraphRAG."""
+ log_structured('info', "Application startup sequence initiated.")
+ all_success = True
+
+ # 1. Initialize MongoDB
+ try:
+ if init_mongodb():
+ log_structured('info', "MongoDB initialized successfully.")
+ else:
+ all_success = False
+ except Exception as db_err:
+ log_structured('critical', f"MongoDB init failed: {db_err}")
+ all_success = False
+
+ # 2. Phase 1: Vector index + agent (fast)
+ log_structured('info', "Phase 1: Initializing vector index and agent...")
+ vector_success = await initialize_vector_index()
+
+ if not is_agent_available() or not vector_success:
+ log_structured('critical', "Vector initialization failed")
+ all_success = False
+ else:
+ log_structured('info', "Phase 1 complete: server is ready for vector queries")
+
+ # 3. Phase 2: GraphRAG in background
+ if vector_success:
+ log_structured('info', "Phase 2: Launching GraphRAG initialization in background...")
+
+ async def _background_graphrag_init():
+ try:
+ success = await initialize_graphrag_components()
+ if success:
+ log_structured('info', "GraphRAG background init completed")
+ else:
+ log_structured('warning', "GraphRAG background init failed — vector search still works")
+ except Exception as e:
+ log_structured('error', f"GraphRAG background init error: {e}")
+
+ # Schedule as background task — does NOT block server startup
+ asyncio.ensure_future(_background_graphrag_init())
+
+ log_structured('info', f"Startup complete. Server ready: {all_success}")
+ return all_success
+```
+
+#### 4c. Restructure the main execution block
+
+The old pattern of `asyncio.run(startup_event())` followed by `asyncio.run(serve(app, config))` won't work because background tasks launched by `asyncio.ensure_future()` need to run in the same event loop as the server. Restructure:
+
+```python
+if __name__ == '__main__':
+ from hypercorn.config import Config as HypercornConfig
+ from hypercorn.asyncio import serve as hypercorn_serve
+
+ config = HypercornConfig()
+ # ... config setup ...
+
+ async def run_server_with_startup():
+ """Run startup (with background GraphRAG init) then serve."""
+ await startup_event()
+
+ # Double-check agent
+ if not is_agent_available():
+ log_structured('critical', "Agent unavailable. Forcing re-init...")
+ await initialize_vector_index()
+ if is_agent_available():
+ asyncio.ensure_future(initialize_graphrag_components())
+
+ # Start serving — background GraphRAG init continues in same event loop
+ await hypercorn_serve(app, config)
+
+ try:
+ asyncio.run(run_server_with_startup())
+ except KeyboardInterrupt:
+ log_structured('info', "Server stopped manually.")
+```
+
+**Why this matters**: `asyncio.ensure_future()` schedules a coroutine on the *current* event loop. If you use two separate `asyncio.run()` calls, the background task from the first call is lost when the first event loop closes. By wrapping everything in a single `async def` and calling `asyncio.run()` once, the background GraphRAG init shares the server's event loop and runs concurrently with request handling.
+
+---
+
+## Change 5: Update Routes for Graceful Degradation
+
+### File: Routes Module (e.g., `routes.py`)
+
+#### 5a. Update imports
+
+```python
+from ai_core import initialize_global_index, initialize_vector_index
+from shared_state import ..., get_graphrag_status
+```
+
+#### 5b. Update `/status` endpoint
+
+Add GraphRAG status to the response:
+
+```python
+graphrag_status = get_graphrag_status()
+
+status_data = {
+ # ... existing fields ...
+ 'graphrag_ready': graphrag_status['ready'],
+ 'graphrag_initializing': graphrag_status['initializing'],
+ 'graphrag_error': graphrag_status['error'],
+}
+```
+
+#### 5c. Update on-demand initialization in `/chat`
+
+If the `/chat` endpoint has a fallback that tries to initialize on-demand when the agent is unavailable, change it to use `initialize_vector_index()` instead of `initialize_global_index()` for faster recovery:
+
+```python
+# Old:
+index_success = await initialize_global_index()
+
+# New:
+index_success = await initialize_vector_index()
+```
+
+#### 5d. No changes needed for chat logic
+
+The `/chat` endpoint itself needs no changes for graceful degradation. The agent's `simple_run` method already dynamically checks for GraphRAG tool availability:
+
+```python
+graphrag_tool = next((t for t in global_workflow_agent.tools if 'graphrag' in t.metadata.name.lower()), None)
+if graphrag_tool:
+ # Use GraphRAG — only happens after background init completes
+else:
+ # Fall through to vector-only response
+```
+
+---
+
+## Change 6: Docker Compose Documentation
+
+### File: Neo4j Docker Compose (e.g., `docker-compose-neo4j.yml`)
+
+Add a comment at the top explaining volume persistence:
+
+```yaml
+# IMPORTANT: Neo4j data persists in bind-mounted directories (./neo4j/data, etc.).
+# These survive `docker-compose down` and `docker-compose up -d` restarts.
+# However, if you delete the data directory manually, all graph data
+# (extracted triples, entities, relationships) will be lost and must be
+# re-extracted via LLM calls (10-20+ minutes) or restored from the disk cache
+# at index_storage/graphrag_cache/neo4j_triples.pickle.
+```
+
+---
+
+## Verification Checklist
+
+### Test 1: Warm start (Neo4j has data, vector index cached)
+- [ ] Server starts and accepts chat requests within ~1-2 minutes
+- [ ] `/status` shows `graphrag_initializing: true` briefly, then `graphrag_ready: true`
+- [ ] Chat requests work with vector search immediately
+- [ ] GraphRAG tool becomes available after background init completes
+
+### Test 2: Cold Neo4j with caches (Neo4j empty, `index_storage/` intact)
+- [ ] Server starts with vector queries in ~1-2 minutes
+- [ ] Background init restores triples from `neo4j_triples.pickle` (check logs for "Restoring triples from disk cache")
+- [ ] No LLM extraction calls (check logs — should NOT see "Building PropertyGraphIndex via LLM extraction")
+- [ ] GraphRAG becomes available after cache restore + community build
+
+### Test 3: Full cold start (no `index_storage/`, no Neo4j data)
+- [ ] Full rebuild runs (document parsing → vector index → LLM extraction → community building)
+- [ ] `neo4j_triples.pickle` is created after LLM extraction
+- [ ] Community cache files are created after community building
+- [ ] Everything works on subsequent restarts using caches
+
+### Test 4: Degraded mode
+- [ ] Send a chat request before GraphRAG finishes initializing
+- [ ] Response comes back using vector-only search
+- [ ] No errors in response — just missing GraphRAG context
+- [ ] After GraphRAG init completes, subsequent requests use both vector and GraphRAG
+
+### Test 5: Cache integrity
+- [ ] After a successful startup with Neo4j data, verify `neo4j_triples.pickle` exists in `index_storage/graphrag_cache/`
+- [ ] Delete Neo4j data, restart — verify cache restore works
+- [ ] Delete the pickle file, restart with Neo4j data — verify it gets recreated
+
+---
+
+## File Change Summary
+
+| File | Changes |
+|------|---------|
+| Graph RAG integration module | `save_triples_to_cache()`, `load_triples_from_cache()`, 3-branch logic in `create_graph_components()` |
+| Shared state module | `graphrag_ready`/`graphrag_initializing`/`graphrag_error` flags, `set_graphrag_status()`, `get_graphrag_status()` |
+| AI core module | Extract GraphRAG tool class to module level, split into `initialize_vector_index()` + `initialize_graphrag_components()`, keep `initialize_global_index()` as wrapper |
+| Main module | Two-phase startup in `startup_event()`, single `asyncio.run()` wrapping both startup and serve |
+| Routes module | Import new functions, add GraphRAG status to `/status`, use `initialize_vector_index()` for on-demand init |
+| Docker compose | Documentation comment about volume persistence |
diff --git a/graph_rag_integration.py b/graph_rag_integration.py
index 042a33e..af298d5 100644
--- a/graph_rag_integration.py
+++ b/graph_rag_integration.py
@@ -183,6 +183,7 @@ class GraphRAGStore:
CACHE_DIR = Path("index_storage/graphrag_cache")
COMMUNITY_CACHE_FILE = CACHE_DIR / "community_summary.pickle"
ENTITY_INFO_CACHE_FILE = CACHE_DIR / "entity_info.pickle"
+ TRIPLES_CACHE_FILE = CACHE_DIR / "neo4j_triples.pickle"
def __init__(self, property_graph_store):
"""Initialize with a property_graph_store (Neo4j or in-memory)."""
@@ -206,6 +207,90 @@ class GraphRAGStore:
"""Get triplets from the property graph store."""
return self.property_graph_store.get_triplets()
+ def save_triples_to_cache(self):
+ """Save extracted triples (entities + relationships) from Neo4j to a disk cache.
+
+ This allows restoring triples to Neo4j without expensive LLM re-extraction
+ if Neo4j data is lost (e.g., container recreated without volume persistence).
+ """
+ try:
+ triplets = self.get_triplets()
+ if not triplets:
+ log_structured('warning', 'No triplets to cache — Neo4j appears empty')
+ return False
+
+ # Collect unique entities and relations from the triplets
+ entities = {}
+ relations = []
+ for entity1, relation, entity2 in triplets:
+ entities[entity1.name] = entity1
+ entities[entity2.name] = entity2
+ relations.append(relation)
+
+ cache_data = {
+ 'entities': list(entities.values()),
+ 'relations': relations,
+ 'triplet_count': len(triplets),
+ }
+
+ with open(self.TRIPLES_CACHE_FILE, 'wb') as f:
+ pickle.dump(cache_data, f)
+
+ log_structured('info', 'Successfully cached Neo4j triples to disk', {
+ 'entity_count': len(entities),
+ 'relation_count': len(relations),
+ 'triplet_count': len(triplets),
+ 'cache_file': str(self.TRIPLES_CACHE_FILE)
+ })
+ return True
+ except Exception as e:
+ log_structured('error', f'Error saving triples cache: {e}')
+ return False
+
+ def load_triples_from_cache(self):
+ """Load triples from disk cache and restore them to Neo4j.
+
+ Returns True if triples were successfully restored, False otherwise.
+ """
+ if not self.TRIPLES_CACHE_FILE.exists():
+ log_structured('info', 'No triples cache file found at %s', str(self.TRIPLES_CACHE_FILE))
+ return False
+
+ try:
+ with open(self.TRIPLES_CACHE_FILE, 'rb') as f:
+ cache_data = pickle.load(f)
+
+ entities = cache_data.get('entities', [])
+ relations = cache_data.get('relations', [])
+
+ if not entities:
+ log_structured('warning', 'Triples cache file exists but contains no entities')
+ return False
+
+ log_structured('info', 'Restoring triples from disk cache to Neo4j', {
+ 'entity_count': len(entities),
+ 'relation_count': len(relations),
+ 'cached_triplet_count': cache_data.get('triplet_count', 'unknown')
+ })
+
+ # Restore entities (nodes) to Neo4j
+ self.property_graph_store.upsert_nodes(entities)
+ log_structured('info', f'Restored {len(entities)} entity nodes to Neo4j')
+
+ # Restore relations to Neo4j
+ if relations:
+ self.property_graph_store.upsert_relations(relations)
+ log_structured('info', f'Restored {len(relations)} relations to Neo4j')
+
+ # Verify restoration
+ restored_triplets = self.get_triplets()
+ log_structured('info', f'Neo4j now contains {len(restored_triplets)} triplets after cache restore')
+
+ return len(restored_triplets) > 0
+ except Exception as e:
+ log_structured('error', f'Error restoring triples from cache: {e}')
+ return False
+
def generate_community_summary(self, text):
"""Generate summary for a given text using an LLM with handling for large contexts."""
@@ -691,13 +776,18 @@ def create_graph_components(llm, nodes=None, max_paths_per_chunk=10, force_reind
if has_existing_content and not force_reindex:
log_structured('info', f'Neo4j already contains {len(triplets)} triplets. Skipping indexing.')
-
+
+ # Ensure triples are also cached to disk for future recovery
+ if not graph_store.TRIPLES_CACHE_FILE.exists():
+ log_structured('info', 'Neo4j has data but no triples cache on disk — creating cache now')
+ graph_store.save_triples_to_cache()
+
# Create a minimal PropertyGraphIndex without indexing
property_graph_index = PropertyGraphIndex(
nodes=[], # Empty nodes since we're not indexing
property_graph_store=property_graph_store,
)
-
+
# Build communities from existing data (if not already built)
if not graph_store.communities_built:
log_structured('info', 'Building graph communities from existing Neo4j data')
@@ -708,12 +798,64 @@ def create_graph_components(llm, nodes=None, max_paths_per_chunk=10, force_reind
log_structured('error', f'Error building communities: {e}')
else:
log_structured('info', 'Communities already built, skipping rebuild')
+ elif not has_existing_content and not force_reindex and graph_store.TRIPLES_CACHE_FILE.exists():
+ # Neo4j is empty but we have a triples cache — restore from cache instead of LLM extraction
+ log_structured('info', 'Neo4j is empty but triples cache exists. Restoring from disk cache (skipping LLM extraction).')
+
+ restored = graph_store.load_triples_from_cache()
+ if restored:
+ log_structured('info', 'Successfully restored triples from cache. Skipping LLM-based extraction.')
+
+ # Create a minimal PropertyGraphIndex (no extraction needed)
+ property_graph_index = PropertyGraphIndex(
+ nodes=[],
+ property_graph_store=property_graph_store,
+ )
+
+ # Build communities from restored data
+ if not graph_store.communities_built:
+ log_structured('info', 'Building graph communities from restored Neo4j data')
+ try:
+ graph_store.build_communities()
+ log_structured('info', 'Communities built successfully from restored data')
+ except Exception as e:
+ log_structured('error', f'Error building communities from restored data: {e}')
+ else:
+ # Cache restore failed — fall through to full LLM extraction
+ log_structured('warning', 'Triples cache restore failed. Falling back to LLM extraction.')
+ if not nodes:
+ raise ValueError("Nodes must be provided for indexing when Neo4j is empty and cache restore fails")
+
+ kg_extractor = GraphRAGExtractor(
+ llm=llm,
+ extract_prompt=KG_TRIPLET_EXTRACT_TMPL,
+ max_paths_per_chunk=max_paths_per_chunk,
+ parse_fn=custom_parse_fn,
+ )
+
+ log_structured('info', 'Building PropertyGraphIndex via LLM extraction', {'node_count': len(nodes)})
+ property_graph_index = PropertyGraphIndex(
+ nodes=nodes,
+ kg_extractors=[kg_extractor],
+ property_graph_store=property_graph_store,
+ show_progress=True,
+ )
+
+ # Cache the newly extracted triples for future recovery
+ graph_store.save_triples_to_cache()
+
+ log_structured('info', 'Building graph communities')
+ try:
+ graph_store.build_communities()
+ log_structured('info', 'Communities built successfully')
+ except Exception as e:
+ log_structured('error', f'Error building communities: {e}')
else:
- # Need to perform indexing
+ # Need to perform full LLM-based indexing
if not nodes:
log_structured('error', 'No nodes provided for indexing and Neo4j is empty or force_reindex=True')
raise ValueError("Nodes must be provided for indexing when Neo4j is empty or force_reindex=True")
-
+
# Create the knowledge graph extractor
kg_extractor = GraphRAGExtractor(
llm=llm,
@@ -721,12 +863,10 @@ def create_graph_components(llm, nodes=None, max_paths_per_chunk=10, force_reind
max_paths_per_chunk=max_paths_per_chunk,
parse_fn=custom_parse_fn,
)
-
+
if has_existing_content and force_reindex:
log_structured('info', 'Force reindexing requested. Clearing existing Neo4j data.')
try:
- # Try to clear the graph using Neo4j's native query
- # Note: This requires the Neo4j APOC plugin to be installed
from neo4j import GraphDatabase
driver = GraphDatabase.driver(NEO4J_URL, auth=(NEO4J_USERNAME, NEO4J_PASSWORD))
with driver.session() as session:
@@ -735,16 +875,19 @@ def create_graph_components(llm, nodes=None, max_paths_per_chunk=10, force_reind
log_structured('info', 'Successfully cleared Neo4j database')
except Exception as e:
log_structured('warning', f'Error clearing Neo4j database: {e}. Proceeding with indexing anyway.')
-
- # Build the property graph index
- log_structured('info', 'Building PropertyGraphIndex', {'node_count': len(nodes)})
+
+ # Build the property graph index (this is the expensive LLM extraction step)
+ log_structured('info', 'Building PropertyGraphIndex via LLM extraction', {'node_count': len(nodes)})
property_graph_index = PropertyGraphIndex(
nodes=nodes,
kg_extractors=[kg_extractor],
property_graph_store=property_graph_store,
show_progress=True,
)
-
+
+ # Cache the newly extracted triples for future recovery
+ graph_store.save_triples_to_cache()
+
# Build communities
log_structured('info', 'Building graph communities')
try:
@@ -752,7 +895,7 @@ def create_graph_components(llm, nodes=None, max_paths_per_chunk=10, force_reind
log_structured('info', 'Communities built successfully')
except Exception as e:
log_structured('error', f'Error building communities: {e}')
-
+
return graph_store, property_graph_index
def create_graphrag_query_engine(vector_retriever, graph_store, llm, similarity_top_k=20):
diff --git a/main.py b/main.py
index 18dbbe5..0d037e8 100644
--- a/main.py
+++ b/main.py
@@ -20,8 +20,8 @@ from config import (
)
from utils import logger, log_structured
from json_utils import CustomJSONProvider
-from ai_core import initialize_global_index # Import the initialization function
-from shared_state import global_workflow_agent, is_agent_available # Import shared state
+from ai_core import initialize_global_index, initialize_vector_index, initialize_graphrag_components # Import initialization functions
+from shared_state import global_workflow_agent, is_agent_available, get_graphrag_status # Import shared state
from routes import register_routes
from init_mongodb import init_mongodb # Your MongoDB initialization script
@@ -59,43 +59,61 @@ log_structured('info', "Flask routes registered.")
# --- Startup Function ---
async def startup_event() -> bool:
"""Tasks to run when the application starts.
-
+
+ Phase 1 (synchronous): MongoDB + vector index + agent (~1-2 min).
+ Phase 2 (background): GraphRAG components (Neo4j, triples, communities).
+
Returns:
- bool: True if all startup tasks completed successfully, False otherwise
+ bool: True if Phase 1 completed successfully (server is usable).
"""
log_structured('info', "Application startup sequence initiated.")
all_success = True
- # 1. Initialize MongoDB Connection & Schema (using your script)
+ # 1. Initialize MongoDB Connection & Schema
log_structured('info', "Initializing MongoDB connection...")
- mongo_success = False
try:
if init_mongodb():
log_structured('info', "MongoDB initialized successfully.")
- mongo_success = True
else:
log_structured('warning', "MongoDB initialization script finished, but reported issues.")
all_success = False
except Exception as db_err:
log_structured('critical', "FATAL: MongoDB initialization failed.", {'error': str(db_err)})
all_success = False
- # We'll continue in a degraded state
- # 2. Initialize Global AI Index and Agent
- log_structured('info', "Initializing global AI index and agent...")
- index_success = await initialize_global_index()
-
- # Explicitly check the status after initialization
+ # 2. Phase 1: Initialize vector index and agent (fast path)
+ log_structured('info', "Phase 1: Initializing vector index and agent...")
+ vector_success = await initialize_vector_index()
+
if not is_agent_available():
- log_structured('critical', "After initialize_global_index, global_workflow_agent is still unavailable, even though function may have reported success")
+ log_structured('critical', "After initialize_vector_index, agent is still unavailable")
all_success = False
- elif not index_success:
- log_structured('warning', "AI initialization reported failure, but will continue in degraded state")
+ elif not vector_success:
+ log_structured('warning', "Vector initialization reported failure")
all_success = False
else:
- log_structured('info', "AI initialization successful, global_workflow_agent is available")
+ log_structured('info', "Phase 1 complete: vector index and agent are available")
- log_structured('info', f"Application startup sequence complete. Overall success: {all_success}")
+ # 3. Phase 2: Launch GraphRAG initialization as a background task
+ if vector_success:
+ log_structured('info', "Phase 2: Launching GraphRAG initialization in background...")
+
+ async def _background_graphrag_init():
+ try:
+ success = await initialize_graphrag_components()
+ if success:
+ log_structured('info', "Background GraphRAG initialization completed successfully")
+ else:
+ log_structured('warning', "Background GraphRAG initialization failed — vector search still works")
+ except Exception as e:
+ log_structured('error', f"Background GraphRAG initialization error: {e}")
+
+ # Schedule as a background task — does not block server startup
+ asyncio.ensure_future(_background_graphrag_init())
+ else:
+ log_structured('warning', "Skipping GraphRAG background init because vector init failed")
+
+ log_structured('info', f"Application startup sequence complete. Server ready: {all_success}")
return all_success
@@ -144,25 +162,29 @@ if __name__ == '__main__':
log_structured('info', f"Starting Hypercorn server on {SERVER_HOST}:{SERVER_PORT}")
log_structured('info', f"Reload mode: {'Enabled' if USE_RELOADER else 'Disabled'}")
- # Execute startup task before running the server
- log_structured('info', "Manually executing startup sequence before server start")
- startup_success = asyncio.run(startup_event())
-
- # Double-check that the agent is initialized
- if not is_agent_available():
- log_structured('critical', "After startup, global_workflow_agent is still unavailable. Forcing re-initialization...")
- # Try once more to initialize
- index_success = asyncio.run(initialize_global_index())
- if not index_success or not is_agent_available():
- log_structured('critical', "Emergency initialization also failed. Server will run but chat functionality will be impaired.")
- else:
- log_structured('info', "Emergency initialization succeeded.")
+ async def run_server_with_startup():
+ """Run startup (with background GraphRAG init) then serve."""
+ # Phase 1 runs synchronously, Phase 2 launches as background task
+ startup_success = await startup_event()
+
+ # Double-check agent is initialized
+ if not is_agent_available():
+ log_structured('critical', "After startup, agent is still unavailable. Forcing re-initialization...")
+ vector_success = await initialize_vector_index()
+ if not vector_success or not is_agent_available():
+ log_structured('critical', "Emergency initialization failed. Server will run but chat will be impaired.")
+ else:
+ log_structured('info', "Emergency initialization succeeded.")
+ # Also try GraphRAG in background
+ asyncio.ensure_future(initialize_graphrag_components())
+
+ # Start serving — background GraphRAG init continues in same event loop
+ await hypercorn_serve(app, config)
- # Run the server
try:
- asyncio.run(hypercorn_serve(app, config))
+ asyncio.run(run_server_with_startup())
except KeyboardInterrupt:
- log_structured('info', "Server stopped manually (KeyboardInterrupt).")
+ log_structured('info', "Server stopped manually (KeyboardInterrupt).")
except Exception as run_err:
- log_structured('critical', "Hypercorn server failed to run.", {'error': str(run_err)})
- sys.exit(1)
\ No newline at end of file
+ log_structured('critical', "Hypercorn server failed to run.", {'error': str(run_err)})
+ sys.exit(1)
\ No newline at end of file
diff --git a/neo4j/docker-compose-neo4j.yml b/neo4j/docker-compose-neo4j.yml
index 40fb539..4b417e3 100644
--- a/neo4j/docker-compose-neo4j.yml
+++ b/neo4j/docker-compose-neo4j.yml
@@ -1,5 +1,12 @@
version: '3'
+# IMPORTANT: Neo4j data persists in bind-mounted directories (./neo4j/data, etc.).
+# These survive `docker-compose down` and `docker-compose up -d` restarts.
+# However, if you delete the ./neo4j/data directory manually, all graph data
+# (extracted triples, entities, relationships) will be lost and must be
+# re-extracted via LLM calls (10-20+ minutes) or restored from the disk cache
+# at index_storage/graphrag_cache/neo4j_triples.pickle.
+
services:
neo4j:
image: neo4j:latest
diff --git a/routes.py b/routes.py
index 5e554fc..df72f3e 100644
--- a/routes.py
+++ b/routes.py
@@ -19,8 +19,8 @@ from werkzeug.utils import secure_filename # Keep if file uploads are reintroduc
from utils import logger, log_structured, allowed_file # Keep allowed_file if uploads return
from json_utils import CustomJSONEncoder # Used indirectly by make_response/jsonify with custom provider
from session_manager import get_or_create_session_state, clear_chat_state_cache
-from ai_core import initialize_global_index # Import initialization function
-from shared_state import global_workflow_agent, global_index, is_agent_available # Import shared state variables
+from ai_core import initialize_global_index, initialize_vector_index # Import initialization functions
+from shared_state import global_workflow_agent, global_index, is_agent_available, get_graphrag_status # Import shared state variables
from document_generator import create_brief_docx
from config import IMAGES_DIRECTORY, NETFLIX_DOCS_FOLDER, BASE_DIR, SUPPORTING_FILES_DIR # Import necessary configs
from llama_index.core.tools import ToolOutput # Import ToolOutput for type checking
@@ -122,10 +122,10 @@ def register_routes(app: Flask):
if not is_agent_available():
log_structured('error', 'Global workflow agent is not initialized')
- # Try to initialize it on-demand if not available
+ # Try to initialize it on-demand if not available (vector-only for speed)
try:
- log_structured('info', 'Attempting to initialize global index and agent on-demand')
- index_success = await initialize_global_index()
+ log_structured('info', 'Attempting to initialize vector index and agent on-demand')
+ index_success = await initialize_vector_index()
# Let initialization complete, then check the global variable
if index_success:
@@ -616,13 +616,18 @@ def register_routes(app: Flask):
# SUPER SIMPLIFIED - ALWAYS RETURN INITIALIZED
log_structured('info', 'Status check: ALWAYS returning initialized=true')
-
+
+ graphrag_status = get_graphrag_status()
+
status_data = {
'global_status': 'initialized',
'initialized': True,
'is_initialized': True,
'timestamp': datetime.now().isoformat(),
- 'override': 'FORCE_INITIALIZED'
+ 'override': 'FORCE_INITIALIZED',
+ 'graphrag_ready': graphrag_status['ready'],
+ 'graphrag_initializing': graphrag_status['initializing'],
+ 'graphrag_error': graphrag_status['error'],
}
if session_id:
@@ -721,9 +726,9 @@ def register_routes(app: Flask):
# This is a development-only endpoint; disable in production
if os.environ.get("PRODUCTION", "false").lower() == "true":
return jsonify({'error': 'Debug endpoints not available in production'}), 403
-
+
from ai_core import initialize_global_index
-
+
try:
# Log current state before reinitializing
log_structured('info', 'Reinitializing global agent', {
@@ -731,8 +736,8 @@ def register_routes(app: Flask):
'agent_none_before': global_workflow_agent is None,
'index_none_before': global_index is None
})
-
- # Attempt to reinitialize
+
+ # Attempt full reinitialize (vector + GraphRAG)
success = await initialize_global_index()
# Import fresh state after initialization
diff --git a/shared_state.py b/shared_state.py
index b3d50ae..df82e89 100644
--- a/shared_state.py
+++ b/shared_state.py
@@ -15,6 +15,11 @@ global_graph_store = None
global_property_graph_index = None
global_graphrag_query_engine = None
+# GraphRAG initialization status
+graphrag_ready = False
+graphrag_initializing = False
+graphrag_error = None
+
# Helper to set the global agent
def set_global_agent(agent):
"""Set the global agent instance."""
@@ -73,6 +78,33 @@ def set_graphrag_components(graph_store, property_graph_index, graphrag_query_en
log_structured('info', f'GraphRAG components set successfully: {components_set}')
return components_set
+# Helper to set/get GraphRAG initialization status
+def set_graphrag_status(ready=None, initializing=None, error=None):
+ """Update GraphRAG initialization status flags."""
+ global graphrag_ready, graphrag_initializing, graphrag_error
+ from utils import log_structured
+
+ if ready is not None:
+ graphrag_ready = ready
+ if initializing is not None:
+ graphrag_initializing = initializing
+ if error is not None:
+ graphrag_error = error
+
+ log_structured('info', 'GraphRAG status updated', {
+ 'ready': graphrag_ready,
+ 'initializing': graphrag_initializing,
+ 'error': str(graphrag_error) if graphrag_error else None
+ })
+
+def get_graphrag_status():
+ """Get current GraphRAG initialization status."""
+ return {
+ 'ready': graphrag_ready,
+ 'initializing': graphrag_initializing,
+ 'error': str(graphrag_error) if graphrag_error else None
+ }
+
# Helper to get agent status
def is_agent_available():
"""