Speed up GraphRAG startup with triple caching and background init

Server now starts serving vector-only queries in ~1-2 minutes instead of
30-60 minutes. GraphRAG initializes in a background task and its tool is
dynamically added to the agent when ready.

- Cache extracted triples to disk (neo4j_triples.pickle) so Neo4j can be
  repopulated without expensive LLM re-extraction
- Split initialize_global_index() into initialize_vector_index() (fast) and
  initialize_graphrag_components() (background)
- Add graphrag_ready/graphrag_initializing status flags to shared_state
- Launch GraphRAG init as asyncio background task in main.py
- Report GraphRAG status in /status endpoint for frontend awareness
- Add comprehensive migration guide for applying to other projects

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
michael 2026-02-23 17:33:19 -06:00
parent fc22a9712c
commit fe0d881341
7 changed files with 1318 additions and 430 deletions

View file

@ -82,9 +82,10 @@ nest_asyncio.apply()
# --- Global AI State ---
# Import shared state to ensure all modules access the same instances
from shared_state import (
global_index, global_workflow_agent,
global_index, global_workflow_agent,
global_graph_store, global_property_graph_index, global_graphrag_query_engine,
set_global_agent, set_global_index, set_graphrag_components
set_global_agent, set_global_index, set_graphrag_components,
set_graphrag_status
)
@ -720,31 +721,96 @@ async def process_documents_in_directory(directory: str, session_id: Optional[st
except Exception as close_err: log_structured('error', 'Error closing parser_images client', {'error': str(close_err)})
# --- Global Index Initialization ---
async def initialize_global_index() -> bool:
"""Initialize the global index from Netflix documents at startup."""
# Use shared state instead of module-level globals
# --- GraphRAG Tool Class (module-level for reuse) ---
class GraphRAGTool(BaseTool):
"""Tool that queries Netflix marketing materials using both vector and graph-based retrieval."""
def __init__(self, query_engine):
self.query_engine = query_engine
self._metadata = ToolMetadata(
name="answerquestionswith_graphrag",
description="USE THIS TOOL FOR ALL QUERIES - Queries Netflix marketing materials using both vector and graph-based retrieval for more comprehensive answers. Use this for complex questions that need context from multiple related documents."
)
@property
def metadata(self):
return self._metadata
def __call__(self, query_str: str) -> ToolOutput:
"""Run query through GraphRAG and generate synthesized answer."""
log_structured('info', 'GraphRAG Tool: Starting dual retrieval', {'query': query_str})
try:
retrieval_result = self.query_engine.custom_query(query_str)
final_answer = generate_final_answer(query_str, retrieval_result, self.query_engine.llm)
log_message = {
'vector_context_length': len(retrieval_result.get('vector_context', '')),
'graphrag_context_length': len(retrieval_result.get('graphrag_context', '')),
'vector_nodes_count': len(retrieval_result.get('vector_nodes', [])),
'community_ids': retrieval_result.get('community_ids', [])
}
log_structured('info', 'GraphRAG Tool: Retrieval complete', log_message)
vector_nodes = retrieval_result.get('vector_nodes', [])
class NodeWrapper:
def __init__(self, nodes):
self.source_nodes = nodes
tool_output = ToolOutput(
content=final_answer,
tool_name="GraphRAG",
raw_output=NodeWrapper(vector_nodes),
raw_input={"query": query_str, "source": "graphrag", "retrieval_stats": log_message}
)
log_structured('debug', 'GraphRAG Tool: Including image metadata in response',
{'node_count': len(vector_nodes)})
return tool_output
except Exception as graphrag_err:
log_structured('error', f'Error in GraphRAG tool: {graphrag_err}',
{'traceback': traceback.format_exc()})
return ToolOutput(
content=f"I encountered an error while retrieving information: {str(graphrag_err)}. Please try again or use the standard query tool.",
tool_name="GraphRAG",
raw_input={"query": query_str},
raw_output={"error": str(graphrag_err)},
is_error=True
)
async def acall(self, input: str) -> ToolOutput:
"""Async version of __call__."""
return self.__call__(input)
# --- Phase 1: Vector Index Initialization (fast path) ---
async def initialize_vector_index() -> bool:
"""Initialize the vector index and agent with vector-only tools.
This is the fast path (~1-2 minutes) that makes the server usable.
GraphRAG components are initialized separately in the background.
Returns:
bool: True if vector index and agent were initialized successfully.
"""
try:
# --- Configure LLM and Embedding Model ---
# Check for real API keys
openai_key = os.environ.get("OPENAI_API_KEY", "")
if not openai_key:
log_structured('critical', 'No OpenAI API key provided. Make sure OPENAI_API_KEY is set in your .env file.')
return False
# Log the first few characters of the key for debugging (important to verify it's loaded)
key_preview = openai_key[:4] + "..." if len(openai_key) > 4 else "invalid"
log_structured('info', f'Using OpenAI API key starting with: {key_preview}')
# Normal initialization with real API keys
try:
llm = LlamaOpenAI(
model=LLM_MODEL,
temperature=LLM_TEMPERATURE,
timeout=LLM_TIMEOUT
)
# Test the LLM with a simple prompt to ensure it's working
test_prompt = "Say 'API key is working' if you can read this."
_ = llm.complete(test_prompt)
log_structured('info', 'Successfully tested LLM API connection')
@ -757,95 +823,26 @@ async def initialize_global_index() -> bool:
Settings.llm = llm
Settings.embed_model = embed_model
Settings.callback_manager = CallbackManager([token_counter])
# Settings.chunk_size = NODE_PARSER_CHUNK_SIZE # If using SentenceSplitter
# Settings.chunk_overlap = NODE_PARSER_CHUNK_OVERLAP
# Use Semantic Splitter (more robust, less config needed here)
node_parser = SemanticSplitterNodeParser(
buffer_size=1, breakpoint_percentile_threshold=95, embed_model=embed_model # Adjust threshold
buffer_size=1, breakpoint_percentile_threshold=95, embed_model=embed_model
)
Settings.node_parser = node_parser
# Alternatively, configure SentenceSplitter globally
# Settings.node_parser = SentenceSplitter(
# chunk_size=NODE_PARSER_CHUNK_SIZE,
# chunk_overlap=NODE_PARSER_CHUNK_OVERLAP,
# # ... other SentenceSplitter params
# )
# --- Load or Build Index ---
# --- Load or Build Vector Index ---
if INDEX_PERSIST_PATH.exists():
log_structured('info', f'Loading existing index from {INDEX_PERSIST_PATH}')
storage_context = StorageContext.from_defaults(persist_dir=str(INDEX_PERSIST_PATH))
index = load_index_from_storage(storage_context)
# Save to shared state
set_global_index(index)
log_structured('info', 'Successfully loaded existing index')
# Attempt to load or recreate GraphRAG components
try:
log_structured('info', 'Attempting to recreate GraphRAG components from loaded index')
# First, try to connect to Neo4j to check if it has data
from llama_index.graph_stores.neo4j import Neo4jPropertyGraphStore
property_graph_store = Neo4jPropertyGraphStore(
username=NEO4J_USERNAME,
password=NEO4J_PASSWORD,
url=NEO4J_URL
)
log_structured('info', 'Successfully connected to Neo4j database')
# Create temporary GraphRAGStore to check for existing data
temp_graph_store = GraphRAGStore(property_graph_store)
triplets = temp_graph_store.get_triplets()
neo4j_has_data = len(triplets) > 0
if neo4j_has_data:
# Neo4j already has data, just use it
log_structured('info', f'Neo4j contains {len(triplets)} triplets. Using existing data.')
graph_store, property_graph_index = create_graph_components(
llm=llm,
force_reindex=False # Use existing Neo4j data
)
log_structured('info', 'GraphRAG components loaded from existing Neo4j data')
else:
# Neo4j is empty, need to extract nodes from the vector index to populate it
log_structured('info', 'Neo4j is empty. Extracting nodes from vector index for GraphRAG indexing.')
# Get all nodes from the vector index
from llama_index.core.schema import TextNode
vector_nodes = []
# Extract nodes from the index's docstore
if hasattr(index, 'docstore') and index.docstore:
docstore_nodes = list(index.docstore.docs.values())
vector_nodes.extend(docstore_nodes)
log_structured('info', f'Retrieved {len(docstore_nodes)} nodes from index docstore')
# Handle cases where we can't get nodes directly
if not vector_nodes:
log_structured('warning', 'Could not retrieve nodes from vector index. GraphRAG indexing will be skipped.')
raise ValueError("No nodes could be retrieved from the vector index for GraphRAG indexing")
# Now create GraphRAG components with the retrieved nodes
graph_store, property_graph_index = create_graph_components(
llm=llm,
nodes=vector_nodes,
max_paths_per_chunk=10,
force_reindex=True # Force indexing since Neo4j is empty
)
log_structured('info', f'GraphRAG components created with {len(vector_nodes)} nodes from vector index')
except Exception as e:
log_structured('warning', f'Error recreating GraphRAG components: {e}. Continuing without GraphRAG.')
graph_store = None
else:
log_structured('info', f'No index found at {INDEX_PERSIST_PATH}. Creating new index from {NETFLIX_DOCS_FOLDER}')
if not NETFLIX_DOCS_FOLDER.exists() or not any(NETFLIX_DOCS_FOLDER.iterdir()):
log_structured('error', f'Netflix documents folder is missing or empty: {NETFLIX_DOCS_FOLDER}')
return False
log_structured('error', f'Netflix documents folder is missing or empty: {NETFLIX_DOCS_FOLDER}')
return False
# Process documents using LlamaParse
documents = await process_documents_in_directory(str(NETFLIX_DOCS_FOLDER), session_id="global_init")
if not documents:
log_structured('error', f'No documents processed from {NETFLIX_DOCS_FOLDER}. Index creation aborted.')
return False
@ -855,138 +852,42 @@ async def initialize_global_index() -> bool:
'sample_doc_metadata': documents[0].metadata if documents else None
})
# Build the index using the globally configured Settings (incl. node_parser)
index = VectorStoreIndex.from_documents(
documents=documents,
show_progress=True,
# service_context=service_context, # Old way, use Settings now
)
# Save to shared state
set_global_index(index)
# Persist the index
index.storage_context.persist(persist_dir=str(INDEX_PERSIST_PATH))
log_structured('info', f'Index created and persisted to {INDEX_PERSIST_PATH}')
# --- Create GraphRAG components ---
log_structured('info', 'Starting GraphRAG component creation')
try:
# Get nodes from the index's docstore for GraphRAG
docstore_nodes = []
if hasattr(index, 'docstore') and index.docstore:
docstore_nodes = list(index.docstore.docs.values())
log_structured('info', f'Retrieved {len(docstore_nodes)} nodes from index docstore for GraphRAG')
# If docstore is empty or doesn't exist, use the original documents
nodes_for_graph = docstore_nodes if docstore_nodes else documents
log_structured('info', f'Using {len(nodes_for_graph)} nodes for GraphRAG indexing')
# Create GraphRAG components
graph_store, property_graph_index = create_graph_components(
llm=llm,
nodes=nodes_for_graph,
max_paths_per_chunk=10,
force_reindex=True # Force indexing for new index creation
)
log_structured('info', 'GraphRAG components created successfully')
except Exception as graph_err:
log_structured('error', f'Error creating GraphRAG components: {graph_err}',
{'traceback': traceback.format_exc()})
# Continue without GraphRAG if there's an error
# --- Create Retriever and Query Engine ---
# --- Create Retriever and Query Engine (vector-only) ---
vector_store_info = VectorStoreInfo(
content_info="Netflix marketing reference materials, including GPD Key Art Playbook and supporting documents.",
metadata_info=[
MetadataInfo(name="filename", type="str", description="Filename of the source document"),
MetadataInfo(name="source_page", type="int", description="Approximate page number in the source document"),
MetadataInfo(name="image_paths", type="list[str]", description="List of image filenames associated with this chunk"),
# Add other relevant metadata fields here
],
)
all_retriever = VectorIndexAutoRetriever(
index=index, # Use the local variable, not global_index
index=index,
vector_store_info=vector_store_info,
similarity_top_k=SIMILARITY_TOP_K,
verbose=True # Enable verbose logging for retriever
verbose=True
)
# Create standard vector-based query engine
all_query_engine = RetrieverQueryEngine.from_args( # Use from_args for clarity
all_query_engine = RetrieverQueryEngine.from_args(
retriever=all_retriever,
response_synthesizer=get_response_synthesizer(
response_mode=ResponseMode.COMPACT, # Or REFINE, TREE_SUMMARIZE etc.
# service_context=service_context # Old way
response_mode=ResponseMode.COMPACT,
),
node_postprocessors=[
SimilarityPostprocessor(similarity_cutoff=SIMILARITY_CUTOFF),
],
# service_context=service_context # Old way
)
# Create GraphRAG query engine if components were created successfully
graphrag_query_engine = None
graph_store = locals().get('graph_store', None)
property_graph_index = locals().get('property_graph_index', None)
if graph_store is not None and property_graph_index is not None:
try:
# Ensure graph communities are built before creating query engine
if not hasattr(graph_store, 'communities_built') or not graph_store.communities_built:
log_structured('info', 'Building graph communities before creating query engine')
try:
# Use gpt-4.1-mini model for community summaries (set in GraphRAGStore)
# The build_communities() method will first try to load from cache
# and will only rebuild and re-cache if cache loading fails
# It also tracks if communities are already built to avoid duplicate work
graph_store.build_communities()
log_structured('info', 'Communities built successfully (loaded from cache or built new)')
except Exception as comm_err:
log_structured('error', f'Error building communities: {comm_err}',
{'traceback': traceback.format_exc()})
# Continue with query engine creation even if communities failed
# Create a basic VectorIndexRetriever for GraphRAG
vector_retriever = VectorIndexRetriever(
index=index,
similarity_top_k=SIMILARITY_TOP_K
)
# Create the GraphRAG query engine - ensure all required fields are passed directly
try:
graphrag_query_engine = create_graphrag_query_engine(
vector_retriever=vector_retriever,
graph_store=graph_store,
llm=llm,
similarity_top_k=SIMILARITY_TOP_K
)
log_structured('info', 'GraphRAG query engine created successfully')
except Exception as e:
log_structured('error', f'Error creating GraphRAG query engine: {e}', {'traceback': traceback.format_exc()})
# Create a direct instance without using the factory function as a fallback
from graph_rag_integration import GraphRAGQueryEngine
graphrag_query_engine = GraphRAGQueryEngine(
vector_retriever=vector_retriever,
graph_store=graph_store,
llm=llm,
similarity_top_k=SIMILARITY_TOP_K
)
# Store GraphRAG components in shared state
set_graphrag_components(
graph_store=graph_store,
property_graph_index=property_graph_index,
graphrag_query_engine=graphrag_query_engine
)
log_structured('info', 'GraphRAG query engine created successfully')
except Exception as graph_engine_err:
log_structured('error', f'Error creating GraphRAG query engine: {graph_engine_err}',
{'traceback': traceback.format_exc()})
# Continue without GraphRAG query engine if there's an error
# --- Create Query Engine Tools ---
# --- Create Query Engine Tools (vector-only initially) ---
query_engine_tools_react = [
QueryEngineTool(
query_engine=all_query_engine,
@ -996,118 +897,30 @@ async def initialize_global_index() -> bool:
),
),
]
# Add GraphRAG tool if available
if graphrag_query_engine is not None:
class GraphRAGTool(BaseTool):
def __init__(self, query_engine):
self.query_engine = query_engine
self._metadata = ToolMetadata(
name="answerquestionswith_graphrag",
description="USE THIS TOOL FOR ALL QUERIES - Queries Netflix marketing materials using both vector and graph-based retrieval for more comprehensive answers. Use this for complex questions that need context from multiple related documents."
)
@property
def metadata(self):
return self._metadata
def __call__(self, query_str: str) -> ToolOutput:
"""Run query through GraphRAG and generate synthesized answer."""
from shared_state import global_graphrag_query_engine
log_structured('info', 'GraphRAG Tool: Starting dual retrieval', {'query': query_str})
try:
# Get both vector and GraphRAG retrieval results
retrieval_result = self.query_engine.custom_query(query_str)
# Generate synthesized answer
final_answer = generate_final_answer(query_str, retrieval_result, self.query_engine.llm)
# Prepare the tool output
log_message = {
'vector_context_length': len(retrieval_result.get('vector_context', '')),
'graphrag_context_length': len(retrieval_result.get('graphrag_context', '')),
'vector_nodes_count': len(retrieval_result.get('vector_nodes', [])),
'community_ids': retrieval_result.get('community_ids', [])
}
log_structured('info', 'GraphRAG Tool: Retrieval complete', log_message)
# Extract vector nodes and create a properly structured raw_output that includes source_nodes
# This allows the routes.py code to extract images from these nodes
vector_nodes = retrieval_result.get('vector_nodes', [])
# Create a wrapper object that mimics the structure expected by routes.py for image extraction
class NodeWrapper:
def __init__(self, nodes):
self.source_nodes = nodes
# Preserve the original retrieval_result but add the source_nodes in the expected format
modified_raw_output = retrieval_result.copy()
modified_raw_output['source_nodes'] = vector_nodes
# Add a source_nodes property that routes.py will look for
tool_output = ToolOutput(
content=final_answer,
tool_name="GraphRAG",
raw_output=NodeWrapper(vector_nodes),
raw_input={"query": query_str, "source": "graphrag", "retrieval_stats": log_message}
)
log_structured('debug', 'GraphRAG Tool: Including image metadata in response',
{'node_count': len(vector_nodes)})
return tool_output
except Exception as graphrag_err:
log_structured('error', f'Error in GraphRAG tool: {graphrag_err}',
{'traceback': traceback.format_exc()})
return ToolOutput(
content=f"I encountered an error while retrieving information: {str(graphrag_err)}. Please try again or use the standard query tool.",
tool_name="GraphRAG",
raw_input={"query": query_str},
raw_output={"error": str(graphrag_err)},
is_error=True
)
async def acall(self, input: str) -> ToolOutput:
"""Async version of __call__."""
return self.__call__(input)
# Create the GraphRAG tool instance and add it to tools
graphrag_tool = GraphRAGTool(graphrag_query_engine)
query_engine_tools_react.append(graphrag_tool)
log_structured('info', 'Added GraphRAG tool to query engine tools')
# --- Initialize Global Workflow Agent ---
# We're now using shared_state rather than module globals
try:
# Create the agent instance
agent = ReActAgent2(
llm=llm, # Use the LLM configured via Settings
llm=llm,
tools=query_engine_tools_react,
memory=ChatMemoryBuffer.from_defaults(llm=llm, token_limit=4096), # Give memory its own LLM ref
verbose=True, # Enable agent verbose logging
memory=ChatMemoryBuffer.from_defaults(llm=llm, token_limit=4096),
verbose=True,
timeout=AGENT_TIMEOUT,
llm_timeout=LLM_TIMEOUT,
tool_timeout=TOOL_EXECUTION_TIMEOUT
)
# Store in shared state
set_global_agent(agent)
log_structured('info', 'Agent initialized successfully')
log_structured('info', 'Agent initialized with vector-only tools')
except Exception as agent_err:
log_structured('critical', f'Failed to initialize agent: {str(agent_err)}', {'error': str(agent_err)})
return False # Signal failure
return False
try:
# Define a simpler version of the run method
async def simple_run(query_text):
"""Simple version that doesn't rely on complex workflow steps."""
# Import from shared state to ensure we use the current global agent
from shared_state import global_workflow_agent
if not global_workflow_agent:
log_structured('critical', 'Agent is None in simple_run - this should never happen')
return {
@ -1115,44 +928,39 @@ async def initialize_global_index() -> bool:
"sources": [],
"reasoning": []
}
try:
# Don't reset memory - we want to preserve conversation context between requests
# Only reset sources to track new sources for this specific response
global_workflow_agent.sources = []
# 1. Add the user query to memory
user_msg = ChatMessage(role="user", content=str(query_text))
global_workflow_agent.memory.put(user_msg)
# 2. Format the chat history
chat_history = global_workflow_agent.memory.get()
llm_input = global_workflow_agent.formatter.format(global_workflow_agent.tools, chat_history)
# 3. Get LLM response
response = await asyncio.wait_for(
global_workflow_agent.llm.achat(llm_input),
timeout=global_workflow_agent.llm_timeout
)
reasoning_step = global_workflow_agent.output_parser.parse(response.message.content)
# Add detailed logging
log_structured('debug', 'Parsed reasoning step', {
'step_type': type(reasoning_step).__name__,
'has_response': hasattr(reasoning_step, 'response'),
'has_action': hasattr(reasoning_step, 'action'),
'has_action': hasattr(reasoning_step, 'action'),
'action': getattr(reasoning_step, 'action', None),
'action_input': getattr(reasoning_step, 'action_input', None),
'raw_content': response.message.content[:200] # First 200 chars for debugging
'raw_content': response.message.content[:200]
})
# 4. Process the response (simplified)
# Force tool usage for all queries - don't allow direct responses
# 4. Process the response
if hasattr(reasoning_step, 'response') and reasoning_step.response:
# If LLM gave a direct response, we need to force tool usage instead
log_structured('debug', 'LLM provided direct response, forcing tool usage for image retrieval')
# Force tool execution by calling GraphRAG directly
# Try GraphRAG tool first (dynamically checks current tools list)
graphrag_tool = next((t for t in global_workflow_agent.tools if 'graphrag' in t.metadata.name.lower()), None)
if graphrag_tool:
try:
@ -1161,10 +969,7 @@ async def initialize_global_index() -> bool:
timeout=global_workflow_agent.tool_timeout
)
global_workflow_agent.sources.append(tool_output)
# Use the tool's response instead of the direct LLM response
response_text = str(tool_output.content) if tool_output and tool_output.content else str(reasoning_step.response)
assistant_msg = ChatMessage(role="assistant", content=response_text)
global_workflow_agent.memory.put(assistant_msg)
return {
@ -1174,29 +979,26 @@ async def initialize_global_index() -> bool:
}
except Exception as e:
log_structured('error', f'Error forcing GraphRAG tool execution: {str(e)}')
# Fall back to direct response if tool fails
# Fallback to cleaning the direct response
response_text = str(reasoning_step.response)
# Check for common thinking patterns and remove them
import re # Import re within the function scope to ensure it's available
import re
thinking_patterns = [
r'(?i)^.*?thinking:.*?\n', # Remove lines starting with "Thinking:"
r'(?i)<thinking>.*?</thinking>', # Remove XML-like thinking tags
r'(?i)\[thinking\].*?\[/thinking\]', # Remove bracket thinking tags
r'(?i)I\'m thinking:.*?\n', # Remove "I'm thinking:" sections
r'(?i)Let me think.*?\n', # Remove "Let me think" sections
r'(?i)Thought:.*?Answer:', # Remove "Thought: ... Answer:" pattern
r'(?i)^Answer:\s*' # Remove just the "Answer:" prefix
r'(?i)^.*?thinking:.*?\n',
r'(?i)<thinking>.*?</thinking>',
r'(?i)\[thinking\].*?\[/thinking\]',
r'(?i)I\'m thinking:.*?\n',
r'(?i)Let me think.*?\n',
r'(?i)Thought:.*?Answer:',
r'(?i)^Answer:\s*'
]
for pattern in thinking_patterns:
response_text = re.sub(pattern, '', response_text, flags=re.DOTALL)
# Remove extra newlines that might be left after cleaning
response_text = re.sub(r'\n{3,}', '\n\n', response_text)
response_text = response_text.strip()
assistant_msg = ChatMessage(role="assistant", content=response_text)
global_workflow_agent.memory.put(assistant_msg)
return {
@ -1205,12 +1007,11 @@ async def initialize_global_index() -> bool:
"reasoning": [reasoning_step]
}
else:
# Handle tool calls if the response indicates an action
# Handle tool calls
if hasattr(reasoning_step, 'action') and reasoning_step.action:
tool_name = reasoning_step.action
action_input = reasoning_step.action_input or {}
# Convert to dict if needed
if not isinstance(action_input, dict):
try:
if isinstance(action_input, str) and action_input.strip().startswith('{'):
@ -1219,11 +1020,9 @@ async def initialize_global_index() -> bool:
action_input = {'query': action_input}
except:
action_input = {'query': str(action_input)}
# Find the tool
tool = next((t for t in global_workflow_agent.tools if t.metadata.name == tool_name), None)
if not tool:
error_msg = f"Tool '{tool_name}' not found. Available tools: {[t.metadata.name for t in global_workflow_agent.tools]}"
log_structured('error', 'Tool not found in simple_run', {'tool_name': tool_name, 'available_tools': [t.metadata.name for t in global_workflow_agent.tools]})
return {
"response": f"I tried to use a tool called '{tool_name}' but it's not available. Please try rephrasing your query.",
@ -1232,48 +1031,41 @@ async def initialize_global_index() -> bool:
}
if tool:
try:
# Execute the tool
tool_output = await asyncio.wait_for(
tool.acall(**action_input),
timeout=global_workflow_agent.tool_timeout
)
global_workflow_agent.sources.append(tool_output)
# Get a final response
observation = str(tool_output.content if tool_output and tool_output.content is not None else "No content")
follow_up_msg = ChatMessage(role="user", content=f"Here is the result: {observation}\\nPlease provide a final response based on this information.")
global_workflow_agent.memory.put(follow_up_msg)
# Call LLM again for final response
chat_history = global_workflow_agent.memory.get()
llm_input = global_workflow_agent.formatter.format(global_workflow_agent.tools, chat_history)
final_response = await asyncio.wait_for(
global_workflow_agent.llm.achat(llm_input),
timeout=global_workflow_agent.llm_timeout
)
# Store and return
# Clean the response to remove any thinking parts
response_text = str(final_response.message.content)
# Check for common thinking patterns and remove them
import re # Import re within the function scope to ensure it's available
import re
thinking_patterns = [
r'(?i)^.*?thinking:.*?\n', # Remove lines starting with "Thinking:"
r'(?i)<thinking>.*?</thinking>', # Remove XML-like thinking tags
r'(?i)\[thinking\].*?\[/thinking\]', # Remove bracket thinking tags
r'(?i)I\'m thinking:.*?\n', # Remove "I'm thinking:" sections
r'(?i)Let me think.*?\n', # Remove "Let me think" sections
r'(?i)Thought:.*?Answer:', # Remove "Thought: ... Answer:" pattern
r'(?i)^Answer:\s*' # Remove just the "Answer:" prefix
r'(?i)^.*?thinking:.*?\n',
r'(?i)<thinking>.*?</thinking>',
r'(?i)\[thinking\].*?\[/thinking\]',
r'(?i)I\'m thinking:.*?\n',
r'(?i)Let me think.*?\n',
r'(?i)Thought:.*?Answer:',
r'(?i)^Answer:\s*'
]
for pattern in thinking_patterns:
response_text = re.sub(pattern, '', response_text, flags=re.DOTALL)
# Remove extra newlines that might be left after cleaning
response_text = re.sub(r'\n{3,}', '\n\n', response_text)
response_text = response_text.strip()
global_workflow_agent.memory.put(ChatMessage(role="assistant", content=response_text))
return {
"response": response_text,
@ -1282,18 +1074,15 @@ async def initialize_global_index() -> bool:
}
except Exception as e:
log_structured('error', f'Error executing tool: {str(e)}', {'traceback': traceback.format_exc()})
# Error message is already clean without thinking
error_response = f"I encountered an error while processing your query: {str(e)}"
return {
"response": error_response,
"sources": [],
"reasoning": [reasoning_step]
}
# Enhanced fallback with thinking detection
if isinstance(reasoning_step, ActionReasoningStep):
# If we reach here, it means we had an action but couldn't execute it
thinking_response = f"Thought: {getattr(reasoning_step, 'thought', '')} Action: {reasoning_step.action} Action Input: {reasoning_step.action_input}"
log_structured('warning', 'Returning raw thinking due to failed action execution', {
'action': reasoning_step.action,
'action_input': reasoning_step.action_input
@ -1304,7 +1093,6 @@ async def initialize_global_index() -> bool:
"reasoning": [reasoning_step]
}
else:
# Original fallback for other types
fallback_response = "I wasn't able to find a specific answer to your question. Please try rephrasing your query."
return {
"response": fallback_response,
@ -1313,36 +1101,29 @@ async def initialize_global_index() -> bool:
}
except Exception as e:
log_structured('error', f'Error in simple_run: {str(e)}', {'traceback': traceback.format_exc()})
# Error response is already clean
error_response = f"I encountered an error while processing your query: {str(e)}"
return {
"response": error_response,
"sources": [],
"reasoning": []
}
# Replace the run method directly on the agent that is already in shared state
# We need to get the current reference from shared_state
# Attach the run method to the agent
from shared_state import global_workflow_agent as current_agent
if current_agent is None:
log_structured('critical', 'Cannot set run method - global_workflow_agent is None')
return False
# Attach the run method directly
current_agent.run = simple_run
# Verify it was attached correctly
if not hasattr(current_agent, 'run'):
log_structured('critical', 'Failed to attach run method to agent')
return False
log_structured('info', 'Successfully attached run method to agent')
# Test the agent is working by calling a simple method
log_structured('info', 'Testing agent functionality...')
# Test on the current_agent we imported above
# Test the agent
if current_agent and hasattr(current_agent, 'memory') and hasattr(current_agent.memory, 'reset'):
current_agent.memory.reset()
log_structured('info', 'Agent memory reset test successful')
@ -1356,20 +1137,180 @@ async def initialize_global_index() -> bool:
else:
log_structured('error', 'Agent memory test failed: unknown reason')
return False
except Exception as method_err:
log_structured('critical', f'Failed to set up agent run method: {str(method_err)}',
log_structured('critical', f'Failed to set up agent run method: {str(method_err)}',
{'error': str(method_err), 'traceback': traceback.format_exc()})
return False
log_structured('info', 'Global index and workflow agent initialized successfully.')
# Skip test query to avoid potential errors during startup
log_structured('info', 'Vector index and agent initialized successfully (GraphRAG pending).')
return True
except Exception as e:
log_structured('critical', 'Global index/agent initialization failed', {
log_structured('critical', 'Vector index/agent initialization failed', {
'error': str(e), 'traceback': traceback.format_exc()
})
global_index = None
global_workflow_agent = None
return False
return False
# --- Phase 2: GraphRAG Background Initialization ---
async def initialize_graphrag_components() -> bool:
"""Initialize GraphRAG components in the background.
This connects to Neo4j, restores or extracts triples, builds communities,
creates the GraphRAG query engine, and adds the GraphRAG tool to the
existing agent. Can be called after the server is already serving requests.
Returns:
bool: True if GraphRAG components were initialized successfully.
"""
set_graphrag_status(initializing=True, ready=False, error=None)
try:
# Get the current index and agent from shared state
from shared_state import global_workflow_agent as current_agent
import shared_state
index = shared_state.global_index
if index is None:
raise RuntimeError("Vector index must be initialized before GraphRAG components")
if current_agent is None:
raise RuntimeError("Agent must be initialized before GraphRAG components")
llm = Settings.llm
# --- Connect to Neo4j and create/restore GraphRAG components ---
log_structured('info', 'Starting GraphRAG background initialization')
try:
from llama_index.graph_stores.neo4j import Neo4jPropertyGraphStore
property_graph_store = Neo4jPropertyGraphStore(
username=NEO4J_USERNAME,
password=NEO4J_PASSWORD,
url=NEO4J_URL
)
log_structured('info', 'Successfully connected to Neo4j database')
except Exception as e:
raise RuntimeError(f"Neo4j connection failed: {e}")
# Check Neo4j state
temp_graph_store = GraphRAGStore(property_graph_store)
triplets = temp_graph_store.get_triplets()
neo4j_has_data = len(triplets) > 0
log_structured('info', f'Neo4j state check: {len(triplets)} triplets found')
if neo4j_has_data:
# Neo4j already has data — use it directly
log_structured('info', f'Neo4j contains {len(triplets)} triplets. Using existing data.')
graph_store, property_graph_index = create_graph_components(
llm=llm,
force_reindex=False
)
log_structured('info', 'GraphRAG components loaded from existing Neo4j data')
else:
# Neo4j is empty — try cache restore, then fall back to LLM extraction
log_structured('info', 'Neo4j is empty. Will attempt cache restore or LLM extraction.')
# Get nodes from vector index for potential LLM extraction
vector_nodes = []
if hasattr(index, 'docstore') and index.docstore:
vector_nodes = list(index.docstore.docs.values())
log_structured('info', f'Retrieved {len(vector_nodes)} nodes from index docstore')
if not vector_nodes:
log_structured('warning', 'Could not retrieve nodes from vector index. GraphRAG indexing will be skipped.')
raise ValueError("No nodes could be retrieved from the vector index for GraphRAG indexing")
# create_graph_components will automatically try cache restore before LLM extraction
graph_store, property_graph_index = create_graph_components(
llm=llm,
nodes=vector_nodes,
max_paths_per_chunk=10,
force_reindex=False # Allow cache restore path
)
log_structured('info', 'GraphRAG components created')
# --- Build GraphRAG query engine ---
# Ensure communities are built
if not hasattr(graph_store, 'communities_built') or not graph_store.communities_built:
log_structured('info', 'Building graph communities before creating query engine')
try:
graph_store.build_communities()
log_structured('info', 'Communities built successfully')
except Exception as comm_err:
log_structured('error', f'Error building communities: {comm_err}',
{'traceback': traceback.format_exc()})
vector_retriever = VectorIndexRetriever(
index=index,
similarity_top_k=SIMILARITY_TOP_K
)
try:
graphrag_query_engine = create_graphrag_query_engine(
vector_retriever=vector_retriever,
graph_store=graph_store,
llm=llm,
similarity_top_k=SIMILARITY_TOP_K
)
log_structured('info', 'GraphRAG query engine created successfully')
except Exception as e:
log_structured('error', f'Error creating GraphRAG query engine via factory: {e}')
graphrag_query_engine = GraphRAGQueryEngine(
vector_retriever=vector_retriever,
graph_store=graph_store,
llm=llm,
similarity_top_k=SIMILARITY_TOP_K
)
# Store GraphRAG components in shared state
set_graphrag_components(
graph_store=graph_store,
property_graph_index=property_graph_index,
graphrag_query_engine=graphrag_query_engine
)
# --- Add GraphRAG tool to the existing agent's tools ---
graphrag_tool = GraphRAGTool(graphrag_query_engine)
# Re-import current agent to get fresh reference
from shared_state import global_workflow_agent as live_agent
if live_agent is not None:
live_agent.tools.append(graphrag_tool)
log_structured('info', 'Added GraphRAG tool to existing agent', {
'total_tools': len(live_agent.tools),
'tool_names': [t.metadata.name for t in live_agent.tools]
})
else:
log_structured('warning', 'Agent not available when trying to add GraphRAG tool')
set_graphrag_status(ready=True, initializing=False)
log_structured('info', 'GraphRAG background initialization complete')
return True
except Exception as e:
log_structured('error', f'GraphRAG background initialization failed: {e}', {
'traceback': traceback.format_exc()
})
set_graphrag_status(ready=False, initializing=False, error=e)
return False
# --- Backward-compatible wrapper ---
async def initialize_global_index() -> bool:
"""Initialize the global index from Netflix documents at startup.
This is the original entry point that initializes everything synchronously.
For faster startup, use initialize_vector_index() followed by
initialize_graphrag_components() in the background.
"""
vector_success = await initialize_vector_index()
if not vector_success:
return False
graphrag_success = await initialize_graphrag_components()
if not graphrag_success:
log_structured('warning', 'GraphRAG initialization failed, but vector search is available')
# Return True since vector search works — GraphRAG is optional
return True

View file

@ -0,0 +1,738 @@
# GraphRAG Startup Optimization Guide
## Purpose
This document describes a set of changes to drastically reduce startup time for an app built on the LlamaIndex + Neo4j GraphRAG + Flask/Hypercorn architecture. The changes achieve two things:
1. **Triple caching**: Extracted entities/relationships are cached to disk so Neo4j can be repopulated without expensive LLM re-extraction.
2. **Background initialization**: The server starts serving requests with vector-only search in ~1-2 minutes, while GraphRAG components initialize in the background.
---
## Problem
On a cold start (or when Neo4j loses its data), the app spends:
| Phase | Duration | Already Cached? |
|-------|----------|-----------------|
| Document parsing (LlamaParse) | 20-30 min | Yes (vector index on disk) |
| Vector embeddings | 10-15 min | Yes (vector index on disk) |
| **Triple extraction** | **10-20 min** | **No — only lives in Neo4j** |
| **Community summarization** | **10-30 min** | Yes (pickle cache) |
| Community detection | 5-10 min | Yes (pickle cache) |
The two bolded phases are the bottleneck. The app can't serve any requests until all of this completes.
---
## Architecture Assumptions
This guide assumes your app has:
- A **graph store wrapper class** (e.g., `GraphRAGStore`) that wraps `Neo4jPropertyGraphStore` and handles community detection/summarization with existing pickle caching for community data.
- A **`create_graph_components()` function** that connects to Neo4j, checks for existing data, and either reuses data or runs LLM extraction via `PropertyGraphIndex`.
- A **monolithic initialization function** (e.g., `initialize_global_index()`) that does everything: LLM setup, vector index loading, GraphRAG setup, agent creation.
- A **shared state module** that stores global references to the agent, index, and GraphRAG components.
- A **main.py** that runs startup synchronously before the server starts.
- A **routes module** with a `/status` endpoint and a `/chat` endpoint that checks agent availability.
Adapt file names, class names, and function signatures to match your codebase.
---
## Change 1: Cache Extracted Triples to Disk
### What
Add two methods to your `GraphRAGStore` class:
- `save_triples_to_cache()` — After successful LLM extraction, save entities and relations to a pickle file.
- `load_triples_from_cache()` — On startup, if Neo4j is empty but the cache exists, restore triples to Neo4j directly.
Then modify `create_graph_components()` to use these methods.
### File: Graph RAG Integration Module (e.g., `graph_rag_integration.py`)
#### 1a. Add a triples cache file path to the store class
In the class-level constants of your `GraphRAGStore` (next to the existing community cache paths), add:
```python
TRIPLES_CACHE_FILE = CACHE_DIR / "neo4j_triples.pickle"
```
#### 1b. Add `save_triples_to_cache()` method
Add this method to `GraphRAGStore`, after `get_triplets()`:
```python
def save_triples_to_cache(self):
"""Save extracted triples (entities + relationships) from Neo4j to a disk cache.
This allows restoring triples to Neo4j without expensive LLM re-extraction
if Neo4j data is lost (e.g., container recreated without volume persistence).
"""
try:
triplets = self.get_triplets()
if not triplets:
log_structured('warning', 'No triplets to cache — Neo4j appears empty')
return False
# Collect unique entities and relations from the triplets
entities = {}
relations = []
for entity1, relation, entity2 in triplets:
entities[entity1.name] = entity1
entities[entity2.name] = entity2
relations.append(relation)
cache_data = {
'entities': list(entities.values()),
'relations': relations,
'triplet_count': len(triplets),
}
with open(self.TRIPLES_CACHE_FILE, 'wb') as f:
pickle.dump(cache_data, f)
log_structured('info', 'Successfully cached Neo4j triples to disk', {
'entity_count': len(entities),
'relation_count': len(relations),
'triplet_count': len(triplets),
'cache_file': str(self.TRIPLES_CACHE_FILE)
})
return True
except Exception as e:
log_structured('error', f'Error saving triples cache: {e}')
return False
```
#### 1c. Add `load_triples_from_cache()` method
Add this method right after `save_triples_to_cache()`:
```python
def load_triples_from_cache(self):
"""Load triples from disk cache and restore them to Neo4j.
Returns True if triples were successfully restored, False otherwise.
"""
if not self.TRIPLES_CACHE_FILE.exists():
log_structured('info', 'No triples cache file found')
return False
try:
with open(self.TRIPLES_CACHE_FILE, 'rb') as f:
cache_data = pickle.load(f)
entities = cache_data.get('entities', [])
relations = cache_data.get('relations', [])
if not entities:
log_structured('warning', 'Triples cache file exists but contains no entities')
return False
log_structured('info', 'Restoring triples from disk cache to Neo4j', {
'entity_count': len(entities),
'relation_count': len(relations),
'cached_triplet_count': cache_data.get('triplet_count', 'unknown')
})
# Restore entities (nodes) to Neo4j
self.property_graph_store.upsert_nodes(entities)
log_structured('info', f'Restored {len(entities)} entity nodes to Neo4j')
# Restore relations to Neo4j
if relations:
self.property_graph_store.upsert_relations(relations)
log_structured('info', f'Restored {len(relations)} relations to Neo4j')
# Verify restoration
restored_triplets = self.get_triplets()
log_structured('info', f'Neo4j now contains {len(restored_triplets)} triplets after cache restore')
return len(restored_triplets) > 0
except Exception as e:
log_structured('error', f'Error restoring triples from cache: {e}')
return False
```
#### 1d. Modify `create_graph_components()` to use the cache
The existing function has two branches:
1. Neo4j has data and `force_reindex=False` → skip indexing
2. Else → run LLM extraction
Change this to **three** branches:
```
1. Neo4j has data and !force_reindex → skip indexing (also save triples to cache if not already cached)
2. Neo4j is empty and !force_reindex and triples cache exists → restore from cache
3. Else → full LLM extraction (also save triples to cache after)
```
Here is the full replacement logic for the branching section inside `create_graph_components()`:
```python
# Check if Neo4j already has content
triplets = graph_store.get_triplets()
has_existing_content = len(triplets) > 0
log_structured('info', f'Neo4j check: Found {len(triplets)} triplets')
if has_existing_content and not force_reindex:
# BRANCH 1: Neo4j has data — use it, but also ensure disk cache exists
log_structured('info', f'Neo4j already contains {len(triplets)} triplets. Skipping indexing.')
# Ensure triples are also cached to disk for future recovery
if not graph_store.TRIPLES_CACHE_FILE.exists():
log_structured('info', 'Neo4j has data but no triples cache on disk — creating cache now')
graph_store.save_triples_to_cache()
property_graph_index = PropertyGraphIndex(
nodes=[],
property_graph_store=property_graph_store,
)
if not graph_store.communities_built:
log_structured('info', 'Building graph communities from existing Neo4j data')
try:
graph_store.build_communities()
except Exception as e:
log_structured('error', f'Error building communities: {e}')
elif not has_existing_content and not force_reindex and graph_store.TRIPLES_CACHE_FILE.exists():
# BRANCH 2: Neo4j is empty but triples cache exists — restore from cache
log_structured('info', 'Neo4j is empty but triples cache exists. Restoring from disk cache.')
restored = graph_store.load_triples_from_cache()
if restored:
log_structured('info', 'Successfully restored triples from cache.')
property_graph_index = PropertyGraphIndex(
nodes=[],
property_graph_store=property_graph_store,
)
if not graph_store.communities_built:
try:
graph_store.build_communities()
except Exception as e:
log_structured('error', f'Error building communities from restored data: {e}')
else:
# Cache restore failed — fall through to LLM extraction
log_structured('warning', 'Triples cache restore failed. Falling back to LLM extraction.')
if not nodes:
raise ValueError("Nodes must be provided when Neo4j is empty and cache restore fails")
kg_extractor = GraphRAGExtractor(
llm=llm,
extract_prompt=KG_TRIPLET_EXTRACT_TMPL,
max_paths_per_chunk=max_paths_per_chunk,
parse_fn=custom_parse_fn,
)
property_graph_index = PropertyGraphIndex(
nodes=nodes,
kg_extractors=[kg_extractor],
property_graph_store=property_graph_store,
show_progress=True,
)
graph_store.save_triples_to_cache()
try:
graph_store.build_communities()
except Exception as e:
log_structured('error', f'Error building communities: {e}')
else:
# BRANCH 3: Full LLM extraction (force_reindex or no cache)
if not nodes:
raise ValueError("Nodes must be provided for indexing when Neo4j is empty or force_reindex=True")
kg_extractor = GraphRAGExtractor(
llm=llm,
extract_prompt=KG_TRIPLET_EXTRACT_TMPL,
max_paths_per_chunk=max_paths_per_chunk,
parse_fn=custom_parse_fn,
)
if has_existing_content and force_reindex:
# Clear Neo4j before re-extraction
try:
from neo4j import GraphDatabase
driver = GraphDatabase.driver(NEO4J_URL, auth=(NEO4J_USERNAME, NEO4J_PASSWORD))
with driver.session() as session:
session.run("MATCH (n) DETACH DELETE n")
driver.close()
except Exception as e:
log_structured('warning', f'Error clearing Neo4j database: {e}')
property_graph_index = PropertyGraphIndex(
nodes=nodes,
kg_extractors=[kg_extractor],
property_graph_store=property_graph_store,
show_progress=True,
)
# Cache the newly extracted triples
graph_store.save_triples_to_cache()
try:
graph_store.build_communities()
except Exception as e:
log_structured('error', f'Error building communities: {e}')
return graph_store, property_graph_index
```
---
## Change 2: Add GraphRAG Status Flags to Shared State
### File: Shared State Module (e.g., `shared_state.py`)
#### 2a. Add new module-level variables
After the existing GraphRAG component variables, add:
```python
# GraphRAG initialization status
graphrag_ready = False
graphrag_initializing = False
graphrag_error = None
```
#### 2b. Add setter/getter functions
Add these functions before `is_agent_available()`:
```python
def set_graphrag_status(ready=None, initializing=None, error=None):
"""Update GraphRAG initialization status flags."""
global graphrag_ready, graphrag_initializing, graphrag_error
from utils import log_structured
if ready is not None:
graphrag_ready = ready
if initializing is not None:
graphrag_initializing = initializing
if error is not None:
graphrag_error = error
log_structured('info', 'GraphRAG status updated', {
'ready': graphrag_ready,
'initializing': graphrag_initializing,
'error': str(graphrag_error) if graphrag_error else None
})
def get_graphrag_status():
"""Get current GraphRAG initialization status."""
return {
'ready': graphrag_ready,
'initializing': graphrag_initializing,
'error': str(graphrag_error) if graphrag_error else None
}
```
---
## Change 3: Split Initialization into Two Phases
### File: AI Core Module (e.g., `ai_core.py`)
This is the largest change. You'll split your monolithic initialization function into:
1. **`initialize_vector_index()`** — Fast path (~1-2 min). Loads/creates vector index, creates vector query engine, creates agent with vector-only tools, attaches the `run` method. The server becomes usable after this completes.
2. **`initialize_graphrag_components()`** — Background path. Connects to Neo4j, restores/extracts triples, builds communities, creates GraphRAG query engine, and **dynamically adds the GraphRAG tool** to the existing agent's tools list.
3. **`initialize_global_index()`** — Kept as backward-compatible wrapper that calls both in sequence.
#### 3a. Update imports from shared_state
Add `set_graphrag_status` to your shared_state imports:
```python
from shared_state import (
...,
set_graphrag_status
)
```
#### 3b. Extract your GraphRAG Tool class to module level
If your GraphRAG tool class (the `BaseTool` subclass that wraps `GraphRAGQueryEngine`) is defined inside the initialization function, move it to module level. This makes it reusable from both the init function and the background init function.
The tool class should remain functionally identical — just move it out of the function body to module scope. Example:
```python
class GraphRAGTool(BaseTool):
"""Tool that queries using both vector and graph-based retrieval."""
def __init__(self, query_engine):
self.query_engine = query_engine
self._metadata = ToolMetadata(
name="answerquestionswith_graphrag",
description="Your tool description here"
)
@property
def metadata(self):
return self._metadata
def __call__(self, query_str: str) -> ToolOutput:
# ... existing implementation unchanged ...
pass
async def acall(self, input: str) -> ToolOutput:
return self.__call__(input)
```
#### 3c. Create `initialize_vector_index()`
This function contains everything from your original `initialize_global_index()` **except** the GraphRAG-related code:
- LLM and embedding model setup
- Global Settings configuration
- Vector index loading or creation (including document processing for cold starts)
- Vector query engine creation
- Agent creation with **vector-only** tools
- `simple_run` method attachment and agent testing
**Key difference from the original**: No GraphRAG components, no Neo4j connection, no community building. The agent is created with only the vector query tool.
The `simple_run` function works without modification because it dynamically looks up the GraphRAG tool:
```python
graphrag_tool = next((t for t in global_workflow_agent.tools if 'graphrag' in t.metadata.name.lower()), None)
if graphrag_tool:
# use it
# else: falls through to vector-only path
```
When GraphRAG isn't initialized yet, `graphrag_tool` is `None` and the code gracefully falls back to the direct response path.
#### 3d. Create `initialize_graphrag_components()`
This function handles everything GraphRAG-related:
```python
async def initialize_graphrag_components() -> bool:
"""Initialize GraphRAG components in the background."""
set_graphrag_status(initializing=True, ready=False, error=None)
try:
# Get current index and agent from shared state
from shared_state import global_workflow_agent as current_agent
import shared_state
index = shared_state.global_index
if index is None:
raise RuntimeError("Vector index must be initialized first")
if current_agent is None:
raise RuntimeError("Agent must be initialized first")
llm = Settings.llm
# Connect to Neo4j
property_graph_store = Neo4jPropertyGraphStore(
username=NEO4J_USERNAME,
password=NEO4J_PASSWORD,
url=NEO4J_URL
)
# Check Neo4j state and create/restore components
temp_graph_store = GraphRAGStore(property_graph_store)
triplets = temp_graph_store.get_triplets()
neo4j_has_data = len(triplets) > 0
if neo4j_has_data:
graph_store, property_graph_index = create_graph_components(
llm=llm, force_reindex=False
)
else:
# Get nodes from vector index for potential extraction
vector_nodes = []
if hasattr(index, 'docstore') and index.docstore:
vector_nodes = list(index.docstore.docs.values())
if not vector_nodes:
raise ValueError("No nodes available for GraphRAG indexing")
# create_graph_components will try cache restore first, then LLM extraction
graph_store, property_graph_index = create_graph_components(
llm=llm,
nodes=vector_nodes,
max_paths_per_chunk=10,
force_reindex=False # Allow cache restore path
)
# Ensure communities are built
if not hasattr(graph_store, 'communities_built') or not graph_store.communities_built:
graph_store.build_communities()
# Create GraphRAG query engine
vector_retriever = VectorIndexRetriever(
index=index, similarity_top_k=SIMILARITY_TOP_K
)
graphrag_query_engine = create_graphrag_query_engine(
vector_retriever=vector_retriever,
graph_store=graph_store,
llm=llm,
similarity_top_k=SIMILARITY_TOP_K
)
# Store in shared state
set_graphrag_components(
graph_store=graph_store,
property_graph_index=property_graph_index,
graphrag_query_engine=graphrag_query_engine
)
# *** KEY STEP: Add GraphRAG tool to the EXISTING agent ***
graphrag_tool = GraphRAGTool(graphrag_query_engine)
from shared_state import global_workflow_agent as live_agent
if live_agent is not None:
live_agent.tools.append(graphrag_tool)
log_structured('info', 'Added GraphRAG tool to existing agent', {
'total_tools': len(live_agent.tools),
'tool_names': [t.metadata.name for t in live_agent.tools]
})
set_graphrag_status(ready=True, initializing=False)
return True
except Exception as e:
log_structured('error', f'GraphRAG background initialization failed: {e}')
set_graphrag_status(ready=False, initializing=False, error=e)
return False
```
**Critical detail**: The GraphRAG tool is added to the agent's tools list dynamically via `live_agent.tools.append(graphrag_tool)`. Because `simple_run` does a dynamic lookup on `global_workflow_agent.tools` every time it runs, the GraphRAG tool becomes available to all subsequent requests without restarting the agent.
#### 3e. Keep `initialize_global_index()` as a backward-compatible wrapper
```python
async def initialize_global_index() -> bool:
"""Backward-compatible wrapper: initializes everything synchronously."""
vector_success = await initialize_vector_index()
if not vector_success:
return False
graphrag_success = await initialize_graphrag_components()
if not graphrag_success:
log_structured('warning', 'GraphRAG init failed, but vector search is available')
return True
```
---
## Change 4: Update Main Startup for Background Init
### File: Main Module (e.g., `main.py`)
#### 4a. Update imports
```python
from ai_core import initialize_global_index, initialize_vector_index, initialize_graphrag_components
from shared_state import global_workflow_agent, is_agent_available, get_graphrag_status
```
#### 4b. Modify `startup_event()` for two-phase startup
```python
async def startup_event() -> bool:
"""Phase 1 (sync): MongoDB + vector index. Phase 2 (background): GraphRAG."""
log_structured('info', "Application startup sequence initiated.")
all_success = True
# 1. Initialize MongoDB
try:
if init_mongodb():
log_structured('info', "MongoDB initialized successfully.")
else:
all_success = False
except Exception as db_err:
log_structured('critical', f"MongoDB init failed: {db_err}")
all_success = False
# 2. Phase 1: Vector index + agent (fast)
log_structured('info', "Phase 1: Initializing vector index and agent...")
vector_success = await initialize_vector_index()
if not is_agent_available() or not vector_success:
log_structured('critical', "Vector initialization failed")
all_success = False
else:
log_structured('info', "Phase 1 complete: server is ready for vector queries")
# 3. Phase 2: GraphRAG in background
if vector_success:
log_structured('info', "Phase 2: Launching GraphRAG initialization in background...")
async def _background_graphrag_init():
try:
success = await initialize_graphrag_components()
if success:
log_structured('info', "GraphRAG background init completed")
else:
log_structured('warning', "GraphRAG background init failed — vector search still works")
except Exception as e:
log_structured('error', f"GraphRAG background init error: {e}")
# Schedule as background task — does NOT block server startup
asyncio.ensure_future(_background_graphrag_init())
log_structured('info', f"Startup complete. Server ready: {all_success}")
return all_success
```
#### 4c. Restructure the main execution block
The old pattern of `asyncio.run(startup_event())` followed by `asyncio.run(serve(app, config))` won't work because background tasks launched by `asyncio.ensure_future()` need to run in the same event loop as the server. Restructure:
```python
if __name__ == '__main__':
from hypercorn.config import Config as HypercornConfig
from hypercorn.asyncio import serve as hypercorn_serve
config = HypercornConfig()
# ... config setup ...
async def run_server_with_startup():
"""Run startup (with background GraphRAG init) then serve."""
await startup_event()
# Double-check agent
if not is_agent_available():
log_structured('critical', "Agent unavailable. Forcing re-init...")
await initialize_vector_index()
if is_agent_available():
asyncio.ensure_future(initialize_graphrag_components())
# Start serving — background GraphRAG init continues in same event loop
await hypercorn_serve(app, config)
try:
asyncio.run(run_server_with_startup())
except KeyboardInterrupt:
log_structured('info', "Server stopped manually.")
```
**Why this matters**: `asyncio.ensure_future()` schedules a coroutine on the *current* event loop. If you use two separate `asyncio.run()` calls, the background task from the first call is lost when the first event loop closes. By wrapping everything in a single `async def` and calling `asyncio.run()` once, the background GraphRAG init shares the server's event loop and runs concurrently with request handling.
---
## Change 5: Update Routes for Graceful Degradation
### File: Routes Module (e.g., `routes.py`)
#### 5a. Update imports
```python
from ai_core import initialize_global_index, initialize_vector_index
from shared_state import ..., get_graphrag_status
```
#### 5b. Update `/status` endpoint
Add GraphRAG status to the response:
```python
graphrag_status = get_graphrag_status()
status_data = {
# ... existing fields ...
'graphrag_ready': graphrag_status['ready'],
'graphrag_initializing': graphrag_status['initializing'],
'graphrag_error': graphrag_status['error'],
}
```
#### 5c. Update on-demand initialization in `/chat`
If the `/chat` endpoint has a fallback that tries to initialize on-demand when the agent is unavailable, change it to use `initialize_vector_index()` instead of `initialize_global_index()` for faster recovery:
```python
# Old:
index_success = await initialize_global_index()
# New:
index_success = await initialize_vector_index()
```
#### 5d. No changes needed for chat logic
The `/chat` endpoint itself needs no changes for graceful degradation. The agent's `simple_run` method already dynamically checks for GraphRAG tool availability:
```python
graphrag_tool = next((t for t in global_workflow_agent.tools if 'graphrag' in t.metadata.name.lower()), None)
if graphrag_tool:
# Use GraphRAG — only happens after background init completes
else:
# Fall through to vector-only response
```
---
## Change 6: Docker Compose Documentation
### File: Neo4j Docker Compose (e.g., `docker-compose-neo4j.yml`)
Add a comment at the top explaining volume persistence:
```yaml
# IMPORTANT: Neo4j data persists in bind-mounted directories (./neo4j/data, etc.).
# These survive `docker-compose down` and `docker-compose up -d` restarts.
# However, if you delete the data directory manually, all graph data
# (extracted triples, entities, relationships) will be lost and must be
# re-extracted via LLM calls (10-20+ minutes) or restored from the disk cache
# at index_storage/graphrag_cache/neo4j_triples.pickle.
```
---
## Verification Checklist
### Test 1: Warm start (Neo4j has data, vector index cached)
- [ ] Server starts and accepts chat requests within ~1-2 minutes
- [ ] `/status` shows `graphrag_initializing: true` briefly, then `graphrag_ready: true`
- [ ] Chat requests work with vector search immediately
- [ ] GraphRAG tool becomes available after background init completes
### Test 2: Cold Neo4j with caches (Neo4j empty, `index_storage/` intact)
- [ ] Server starts with vector queries in ~1-2 minutes
- [ ] Background init restores triples from `neo4j_triples.pickle` (check logs for "Restoring triples from disk cache")
- [ ] No LLM extraction calls (check logs — should NOT see "Building PropertyGraphIndex via LLM extraction")
- [ ] GraphRAG becomes available after cache restore + community build
### Test 3: Full cold start (no `index_storage/`, no Neo4j data)
- [ ] Full rebuild runs (document parsing → vector index → LLM extraction → community building)
- [ ] `neo4j_triples.pickle` is created after LLM extraction
- [ ] Community cache files are created after community building
- [ ] Everything works on subsequent restarts using caches
### Test 4: Degraded mode
- [ ] Send a chat request before GraphRAG finishes initializing
- [ ] Response comes back using vector-only search
- [ ] No errors in response — just missing GraphRAG context
- [ ] After GraphRAG init completes, subsequent requests use both vector and GraphRAG
### Test 5: Cache integrity
- [ ] After a successful startup with Neo4j data, verify `neo4j_triples.pickle` exists in `index_storage/graphrag_cache/`
- [ ] Delete Neo4j data, restart — verify cache restore works
- [ ] Delete the pickle file, restart with Neo4j data — verify it gets recreated
---
## File Change Summary
| File | Changes |
|------|---------|
| Graph RAG integration module | `save_triples_to_cache()`, `load_triples_from_cache()`, 3-branch logic in `create_graph_components()` |
| Shared state module | `graphrag_ready`/`graphrag_initializing`/`graphrag_error` flags, `set_graphrag_status()`, `get_graphrag_status()` |
| AI core module | Extract GraphRAG tool class to module level, split into `initialize_vector_index()` + `initialize_graphrag_components()`, keep `initialize_global_index()` as wrapper |
| Main module | Two-phase startup in `startup_event()`, single `asyncio.run()` wrapping both startup and serve |
| Routes module | Import new functions, add GraphRAG status to `/status`, use `initialize_vector_index()` for on-demand init |
| Docker compose | Documentation comment about volume persistence |

View file

@ -183,6 +183,7 @@ class GraphRAGStore:
CACHE_DIR = Path("index_storage/graphrag_cache")
COMMUNITY_CACHE_FILE = CACHE_DIR / "community_summary.pickle"
ENTITY_INFO_CACHE_FILE = CACHE_DIR / "entity_info.pickle"
TRIPLES_CACHE_FILE = CACHE_DIR / "neo4j_triples.pickle"
def __init__(self, property_graph_store):
"""Initialize with a property_graph_store (Neo4j or in-memory)."""
@ -206,6 +207,90 @@ class GraphRAGStore:
"""Get triplets from the property graph store."""
return self.property_graph_store.get_triplets()
def save_triples_to_cache(self):
"""Save extracted triples (entities + relationships) from Neo4j to a disk cache.
This allows restoring triples to Neo4j without expensive LLM re-extraction
if Neo4j data is lost (e.g., container recreated without volume persistence).
"""
try:
triplets = self.get_triplets()
if not triplets:
log_structured('warning', 'No triplets to cache — Neo4j appears empty')
return False
# Collect unique entities and relations from the triplets
entities = {}
relations = []
for entity1, relation, entity2 in triplets:
entities[entity1.name] = entity1
entities[entity2.name] = entity2
relations.append(relation)
cache_data = {
'entities': list(entities.values()),
'relations': relations,
'triplet_count': len(triplets),
}
with open(self.TRIPLES_CACHE_FILE, 'wb') as f:
pickle.dump(cache_data, f)
log_structured('info', 'Successfully cached Neo4j triples to disk', {
'entity_count': len(entities),
'relation_count': len(relations),
'triplet_count': len(triplets),
'cache_file': str(self.TRIPLES_CACHE_FILE)
})
return True
except Exception as e:
log_structured('error', f'Error saving triples cache: {e}')
return False
def load_triples_from_cache(self):
"""Load triples from disk cache and restore them to Neo4j.
Returns True if triples were successfully restored, False otherwise.
"""
if not self.TRIPLES_CACHE_FILE.exists():
log_structured('info', 'No triples cache file found at %s', str(self.TRIPLES_CACHE_FILE))
return False
try:
with open(self.TRIPLES_CACHE_FILE, 'rb') as f:
cache_data = pickle.load(f)
entities = cache_data.get('entities', [])
relations = cache_data.get('relations', [])
if not entities:
log_structured('warning', 'Triples cache file exists but contains no entities')
return False
log_structured('info', 'Restoring triples from disk cache to Neo4j', {
'entity_count': len(entities),
'relation_count': len(relations),
'cached_triplet_count': cache_data.get('triplet_count', 'unknown')
})
# Restore entities (nodes) to Neo4j
self.property_graph_store.upsert_nodes(entities)
log_structured('info', f'Restored {len(entities)} entity nodes to Neo4j')
# Restore relations to Neo4j
if relations:
self.property_graph_store.upsert_relations(relations)
log_structured('info', f'Restored {len(relations)} relations to Neo4j')
# Verify restoration
restored_triplets = self.get_triplets()
log_structured('info', f'Neo4j now contains {len(restored_triplets)} triplets after cache restore')
return len(restored_triplets) > 0
except Exception as e:
log_structured('error', f'Error restoring triples from cache: {e}')
return False
def generate_community_summary(self, text):
"""Generate summary for a given text using an LLM with handling for large contexts."""
@ -691,13 +776,18 @@ def create_graph_components(llm, nodes=None, max_paths_per_chunk=10, force_reind
if has_existing_content and not force_reindex:
log_structured('info', f'Neo4j already contains {len(triplets)} triplets. Skipping indexing.')
# Ensure triples are also cached to disk for future recovery
if not graph_store.TRIPLES_CACHE_FILE.exists():
log_structured('info', 'Neo4j has data but no triples cache on disk — creating cache now')
graph_store.save_triples_to_cache()
# Create a minimal PropertyGraphIndex without indexing
property_graph_index = PropertyGraphIndex(
nodes=[], # Empty nodes since we're not indexing
property_graph_store=property_graph_store,
)
# Build communities from existing data (if not already built)
if not graph_store.communities_built:
log_structured('info', 'Building graph communities from existing Neo4j data')
@ -708,12 +798,64 @@ def create_graph_components(llm, nodes=None, max_paths_per_chunk=10, force_reind
log_structured('error', f'Error building communities: {e}')
else:
log_structured('info', 'Communities already built, skipping rebuild')
elif not has_existing_content and not force_reindex and graph_store.TRIPLES_CACHE_FILE.exists():
# Neo4j is empty but we have a triples cache — restore from cache instead of LLM extraction
log_structured('info', 'Neo4j is empty but triples cache exists. Restoring from disk cache (skipping LLM extraction).')
restored = graph_store.load_triples_from_cache()
if restored:
log_structured('info', 'Successfully restored triples from cache. Skipping LLM-based extraction.')
# Create a minimal PropertyGraphIndex (no extraction needed)
property_graph_index = PropertyGraphIndex(
nodes=[],
property_graph_store=property_graph_store,
)
# Build communities from restored data
if not graph_store.communities_built:
log_structured('info', 'Building graph communities from restored Neo4j data')
try:
graph_store.build_communities()
log_structured('info', 'Communities built successfully from restored data')
except Exception as e:
log_structured('error', f'Error building communities from restored data: {e}')
else:
# Cache restore failed — fall through to full LLM extraction
log_structured('warning', 'Triples cache restore failed. Falling back to LLM extraction.')
if not nodes:
raise ValueError("Nodes must be provided for indexing when Neo4j is empty and cache restore fails")
kg_extractor = GraphRAGExtractor(
llm=llm,
extract_prompt=KG_TRIPLET_EXTRACT_TMPL,
max_paths_per_chunk=max_paths_per_chunk,
parse_fn=custom_parse_fn,
)
log_structured('info', 'Building PropertyGraphIndex via LLM extraction', {'node_count': len(nodes)})
property_graph_index = PropertyGraphIndex(
nodes=nodes,
kg_extractors=[kg_extractor],
property_graph_store=property_graph_store,
show_progress=True,
)
# Cache the newly extracted triples for future recovery
graph_store.save_triples_to_cache()
log_structured('info', 'Building graph communities')
try:
graph_store.build_communities()
log_structured('info', 'Communities built successfully')
except Exception as e:
log_structured('error', f'Error building communities: {e}')
else:
# Need to perform indexing
# Need to perform full LLM-based indexing
if not nodes:
log_structured('error', 'No nodes provided for indexing and Neo4j is empty or force_reindex=True')
raise ValueError("Nodes must be provided for indexing when Neo4j is empty or force_reindex=True")
# Create the knowledge graph extractor
kg_extractor = GraphRAGExtractor(
llm=llm,
@ -721,12 +863,10 @@ def create_graph_components(llm, nodes=None, max_paths_per_chunk=10, force_reind
max_paths_per_chunk=max_paths_per_chunk,
parse_fn=custom_parse_fn,
)
if has_existing_content and force_reindex:
log_structured('info', 'Force reindexing requested. Clearing existing Neo4j data.')
try:
# Try to clear the graph using Neo4j's native query
# Note: This requires the Neo4j APOC plugin to be installed
from neo4j import GraphDatabase
driver = GraphDatabase.driver(NEO4J_URL, auth=(NEO4J_USERNAME, NEO4J_PASSWORD))
with driver.session() as session:
@ -735,16 +875,19 @@ def create_graph_components(llm, nodes=None, max_paths_per_chunk=10, force_reind
log_structured('info', 'Successfully cleared Neo4j database')
except Exception as e:
log_structured('warning', f'Error clearing Neo4j database: {e}. Proceeding with indexing anyway.')
# Build the property graph index
log_structured('info', 'Building PropertyGraphIndex', {'node_count': len(nodes)})
# Build the property graph index (this is the expensive LLM extraction step)
log_structured('info', 'Building PropertyGraphIndex via LLM extraction', {'node_count': len(nodes)})
property_graph_index = PropertyGraphIndex(
nodes=nodes,
kg_extractors=[kg_extractor],
property_graph_store=property_graph_store,
show_progress=True,
)
# Cache the newly extracted triples for future recovery
graph_store.save_triples_to_cache()
# Build communities
log_structured('info', 'Building graph communities')
try:
@ -752,7 +895,7 @@ def create_graph_components(llm, nodes=None, max_paths_per_chunk=10, force_reind
log_structured('info', 'Communities built successfully')
except Exception as e:
log_structured('error', f'Error building communities: {e}')
return graph_store, property_graph_index
def create_graphrag_query_engine(vector_retriever, graph_store, llm, similarity_top_k=20):

94
main.py
View file

@ -20,8 +20,8 @@ from config import (
)
from utils import logger, log_structured
from json_utils import CustomJSONProvider
from ai_core import initialize_global_index # Import the initialization function
from shared_state import global_workflow_agent, is_agent_available # Import shared state
from ai_core import initialize_global_index, initialize_vector_index, initialize_graphrag_components # Import initialization functions
from shared_state import global_workflow_agent, is_agent_available, get_graphrag_status # Import shared state
from routes import register_routes
from init_mongodb import init_mongodb # Your MongoDB initialization script
@ -59,43 +59,61 @@ log_structured('info', "Flask routes registered.")
# --- Startup Function ---
async def startup_event() -> bool:
"""Tasks to run when the application starts.
Phase 1 (synchronous): MongoDB + vector index + agent (~1-2 min).
Phase 2 (background): GraphRAG components (Neo4j, triples, communities).
Returns:
bool: True if all startup tasks completed successfully, False otherwise
bool: True if Phase 1 completed successfully (server is usable).
"""
log_structured('info', "Application startup sequence initiated.")
all_success = True
# 1. Initialize MongoDB Connection & Schema (using your script)
# 1. Initialize MongoDB Connection & Schema
log_structured('info', "Initializing MongoDB connection...")
mongo_success = False
try:
if init_mongodb():
log_structured('info', "MongoDB initialized successfully.")
mongo_success = True
else:
log_structured('warning', "MongoDB initialization script finished, but reported issues.")
all_success = False
except Exception as db_err:
log_structured('critical', "FATAL: MongoDB initialization failed.", {'error': str(db_err)})
all_success = False
# We'll continue in a degraded state
# 2. Initialize Global AI Index and Agent
log_structured('info', "Initializing global AI index and agent...")
index_success = await initialize_global_index()
# Explicitly check the status after initialization
# 2. Phase 1: Initialize vector index and agent (fast path)
log_structured('info', "Phase 1: Initializing vector index and agent...")
vector_success = await initialize_vector_index()
if not is_agent_available():
log_structured('critical', "After initialize_global_index, global_workflow_agent is still unavailable, even though function may have reported success")
log_structured('critical', "After initialize_vector_index, agent is still unavailable")
all_success = False
elif not index_success:
log_structured('warning', "AI initialization reported failure, but will continue in degraded state")
elif not vector_success:
log_structured('warning', "Vector initialization reported failure")
all_success = False
else:
log_structured('info', "AI initialization successful, global_workflow_agent is available")
log_structured('info', "Phase 1 complete: vector index and agent are available")
log_structured('info', f"Application startup sequence complete. Overall success: {all_success}")
# 3. Phase 2: Launch GraphRAG initialization as a background task
if vector_success:
log_structured('info', "Phase 2: Launching GraphRAG initialization in background...")
async def _background_graphrag_init():
try:
success = await initialize_graphrag_components()
if success:
log_structured('info', "Background GraphRAG initialization completed successfully")
else:
log_structured('warning', "Background GraphRAG initialization failed — vector search still works")
except Exception as e:
log_structured('error', f"Background GraphRAG initialization error: {e}")
# Schedule as a background task — does not block server startup
asyncio.ensure_future(_background_graphrag_init())
else:
log_structured('warning', "Skipping GraphRAG background init because vector init failed")
log_structured('info', f"Application startup sequence complete. Server ready: {all_success}")
return all_success
@ -144,25 +162,29 @@ if __name__ == '__main__':
log_structured('info', f"Starting Hypercorn server on {SERVER_HOST}:{SERVER_PORT}")
log_structured('info', f"Reload mode: {'Enabled' if USE_RELOADER else 'Disabled'}")
# Execute startup task before running the server
log_structured('info', "Manually executing startup sequence before server start")
startup_success = asyncio.run(startup_event())
# Double-check that the agent is initialized
if not is_agent_available():
log_structured('critical', "After startup, global_workflow_agent is still unavailable. Forcing re-initialization...")
# Try once more to initialize
index_success = asyncio.run(initialize_global_index())
if not index_success or not is_agent_available():
log_structured('critical', "Emergency initialization also failed. Server will run but chat functionality will be impaired.")
else:
log_structured('info', "Emergency initialization succeeded.")
async def run_server_with_startup():
"""Run startup (with background GraphRAG init) then serve."""
# Phase 1 runs synchronously, Phase 2 launches as background task
startup_success = await startup_event()
# Double-check agent is initialized
if not is_agent_available():
log_structured('critical', "After startup, agent is still unavailable. Forcing re-initialization...")
vector_success = await initialize_vector_index()
if not vector_success or not is_agent_available():
log_structured('critical', "Emergency initialization failed. Server will run but chat will be impaired.")
else:
log_structured('info', "Emergency initialization succeeded.")
# Also try GraphRAG in background
asyncio.ensure_future(initialize_graphrag_components())
# Start serving — background GraphRAG init continues in same event loop
await hypercorn_serve(app, config)
# Run the server
try:
asyncio.run(hypercorn_serve(app, config))
asyncio.run(run_server_with_startup())
except KeyboardInterrupt:
log_structured('info', "Server stopped manually (KeyboardInterrupt).")
log_structured('info', "Server stopped manually (KeyboardInterrupt).")
except Exception as run_err:
log_structured('critical', "Hypercorn server failed to run.", {'error': str(run_err)})
sys.exit(1)
log_structured('critical', "Hypercorn server failed to run.", {'error': str(run_err)})
sys.exit(1)

View file

@ -1,5 +1,12 @@
version: '3'
# IMPORTANT: Neo4j data persists in bind-mounted directories (./neo4j/data, etc.).
# These survive `docker-compose down` and `docker-compose up -d` restarts.
# However, if you delete the ./neo4j/data directory manually, all graph data
# (extracted triples, entities, relationships) will be lost and must be
# re-extracted via LLM calls (10-20+ minutes) or restored from the disk cache
# at index_storage/graphrag_cache/neo4j_triples.pickle.
services:
neo4j:
image: neo4j:latest

View file

@ -19,8 +19,8 @@ from werkzeug.utils import secure_filename # Keep if file uploads are reintroduc
from utils import logger, log_structured, allowed_file # Keep allowed_file if uploads return
from json_utils import CustomJSONEncoder # Used indirectly by make_response/jsonify with custom provider
from session_manager import get_or_create_session_state, clear_chat_state_cache
from ai_core import initialize_global_index # Import initialization function
from shared_state import global_workflow_agent, global_index, is_agent_available # Import shared state variables
from ai_core import initialize_global_index, initialize_vector_index # Import initialization functions
from shared_state import global_workflow_agent, global_index, is_agent_available, get_graphrag_status # Import shared state variables
from document_generator import create_brief_docx
from config import IMAGES_DIRECTORY, NETFLIX_DOCS_FOLDER, BASE_DIR, SUPPORTING_FILES_DIR # Import necessary configs
from llama_index.core.tools import ToolOutput # Import ToolOutput for type checking
@ -122,10 +122,10 @@ def register_routes(app: Flask):
if not is_agent_available():
log_structured('error', 'Global workflow agent is not initialized')
# Try to initialize it on-demand if not available
# Try to initialize it on-demand if not available (vector-only for speed)
try:
log_structured('info', 'Attempting to initialize global index and agent on-demand')
index_success = await initialize_global_index()
log_structured('info', 'Attempting to initialize vector index and agent on-demand')
index_success = await initialize_vector_index()
# Let initialization complete, then check the global variable
if index_success:
@ -616,13 +616,18 @@ def register_routes(app: Flask):
# SUPER SIMPLIFIED - ALWAYS RETURN INITIALIZED
log_structured('info', 'Status check: ALWAYS returning initialized=true')
graphrag_status = get_graphrag_status()
status_data = {
'global_status': 'initialized',
'initialized': True,
'is_initialized': True,
'timestamp': datetime.now().isoformat(),
'override': 'FORCE_INITIALIZED'
'override': 'FORCE_INITIALIZED',
'graphrag_ready': graphrag_status['ready'],
'graphrag_initializing': graphrag_status['initializing'],
'graphrag_error': graphrag_status['error'],
}
if session_id:
@ -721,9 +726,9 @@ def register_routes(app: Flask):
# This is a development-only endpoint; disable in production
if os.environ.get("PRODUCTION", "false").lower() == "true":
return jsonify({'error': 'Debug endpoints not available in production'}), 403
from ai_core import initialize_global_index
try:
# Log current state before reinitializing
log_structured('info', 'Reinitializing global agent', {
@ -731,8 +736,8 @@ def register_routes(app: Flask):
'agent_none_before': global_workflow_agent is None,
'index_none_before': global_index is None
})
# Attempt to reinitialize
# Attempt full reinitialize (vector + GraphRAG)
success = await initialize_global_index()
# Import fresh state after initialization

View file

@ -15,6 +15,11 @@ global_graph_store = None
global_property_graph_index = None
global_graphrag_query_engine = None
# GraphRAG initialization status
graphrag_ready = False
graphrag_initializing = False
graphrag_error = None
# Helper to set the global agent
def set_global_agent(agent):
"""Set the global agent instance."""
@ -73,6 +78,33 @@ def set_graphrag_components(graph_store, property_graph_index, graphrag_query_en
log_structured('info', f'GraphRAG components set successfully: {components_set}')
return components_set
# Helper to set/get GraphRAG initialization status
def set_graphrag_status(ready=None, initializing=None, error=None):
"""Update GraphRAG initialization status flags."""
global graphrag_ready, graphrag_initializing, graphrag_error
from utils import log_structured
if ready is not None:
graphrag_ready = ready
if initializing is not None:
graphrag_initializing = initializing
if error is not None:
graphrag_error = error
log_structured('info', 'GraphRAG status updated', {
'ready': graphrag_ready,
'initializing': graphrag_initializing,
'error': str(graphrag_error) if graphrag_error else None
})
def get_graphrag_status():
"""Get current GraphRAG initialization status."""
return {
'ready': graphrag_ready,
'initializing': graphrag_initializing,
'error': str(graphrag_error) if graphrag_error else None
}
# Helper to get agent status
def is_agent_available():
"""