netflix/shared_state.py
michael fe0d881341 Speed up GraphRAG startup with triple caching and background init
Server now starts serving vector-only queries in ~1-2 minutes instead of
30-60 minutes. GraphRAG initializes in a background task and its tool is
dynamically added to the agent when ready.

- Cache extracted triples to disk (neo4j_triples.pickle) so Neo4j can be
  repopulated without expensive LLM re-extraction
- Split initialize_global_index() into initialize_vector_index() (fast) and
  initialize_graphrag_components() (background)
- Add graphrag_ready/graphrag_initializing status flags to shared_state
- Launch GraphRAG init as asyncio background task in main.py
- Report GraphRAG status in /status endpoint for frontend awareness
- Add comprehensive migration guide for applying to other projects

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 17:33:19 -06:00

134 lines
No EOL
4.6 KiB
Python

# netflix_chatbot/shared_state.py
"""
Process-wide shared state.

Holds the module-level objects (workflow agent, index, GraphRAG components
and GraphRAG initialization flags) that several modules need to read and
update, so that all of them refer to the same values.
"""
# Workflow agent shared across modules; None until one has been stored
global_workflow_agent = None
# Shared index; None until one has been stored
global_index = None
# GraphRAG components; all None until they have been stored
global_graph_store = global_property_graph_index = None
global_graphrag_query_engine = None
# GraphRAG initialization status: not ready and not initializing at import time
graphrag_ready = graphrag_initializing = False
# Most recently recorded GraphRAG initialization error; None when none recorded
graphrag_error = None
# Helper to store the shared workflow agent
def set_global_agent(agent):
    """Store ``agent`` as the module-level workflow agent.

    The agent is rejected (and an error is logged) when it is ``None`` or
    when it has no ``run`` attribute; in both cases the previously stored
    agent is left untouched.

    Returns:
        True when the agent was stored, False when it was rejected or when
        an exception was raised while validating, storing or logging.
    """
    global global_workflow_agent
    # Imported at call time rather than at module import time.
    from utils import log_structured

    if agent is None:
        log_structured('error', 'Attempted to set global_workflow_agent to None')
        return False

    try:
        # Anything without a ``run`` attribute is not accepted as an agent.
        if not hasattr(agent, 'run'):
            log_structured('error', 'Agent being set does not have a run method')
            return False

        global_workflow_agent = agent

        # Re-check the stored reference so the log reflects what is now held.
        run_present = hasattr(global_workflow_agent, 'run')
        stored_ok = run_present and global_workflow_agent is not None
        details = {
            'has_run_method': run_present,
            'agent_type': type(agent).__name__
        }
        log_structured('info', f'Global agent set successfully: {stored_ok}', details)
        return stored_ok
    except Exception as exc:
        log_structured('error', f'Error setting global agent: {exc!s}')
        return False
# Helper to store the shared index
def set_global_index(index):
    """Store ``index`` as the module-level index.

    The value is stored unconditionally, including ``None``.

    Returns:
        True when the stored value is not ``None``, False otherwise.
    """
    global global_index
    global_index = index
    if global_index is None:
        return False
    return True
# Helper to store the shared GraphRAG components
def set_graphrag_components(graph_store, property_graph_index, graphrag_query_engine):
    """Store the three GraphRAG components at module level.

    All three values are stored as given, including ``None``. The outcome is
    logged at info level.

    Returns:
        True when none of the stored components is ``None``, False otherwise.
    """
    global global_graph_store, global_property_graph_index, global_graphrag_query_engine
    # Imported at call time rather than at module import time.
    from utils import log_structured

    global_graph_store, global_property_graph_index, global_graphrag_query_engine = (
        graph_store,
        property_graph_index,
        graphrag_query_engine,
    )

    stored = (
        global_graph_store,
        global_property_graph_index,
        global_graphrag_query_engine,
    )
    components_set = all(component is not None for component in stored)
    log_structured('info', f'GraphRAG components set successfully: {components_set}')
    return components_set
# Helper to update GraphRAG initialization status
def set_graphrag_status(ready=None, initializing=None, error=None):
    """Update the module-level GraphRAG status flags and log the result.

    Each argument left as ``None`` keeps the corresponding flag unchanged.
    Because ``None`` means "no change", an error that has been recorded
    cannot be reset to ``None`` through this function.

    Args:
        ready: New value for ``graphrag_ready``, or None to keep it.
        initializing: New value for ``graphrag_initializing``, or None to keep it.
        error: New value for ``graphrag_error``, or None to keep it.
    """
    global graphrag_ready, graphrag_initializing, graphrag_error
    # Imported at call time rather than at module import time.
    from utils import log_structured

    if ready is not None:
        graphrag_ready = ready
    if initializing is not None:
        graphrag_initializing = initializing
    if error is not None:
        graphrag_error = error

    # The error is logged as text; a falsy error is logged as None.
    error_text = None
    if graphrag_error:
        error_text = str(graphrag_error)

    snapshot = {
        'ready': graphrag_ready,
        'initializing': graphrag_initializing,
        'error': error_text
    }
    log_structured('info', 'GraphRAG status updated', snapshot)
def get_graphrag_status():
    """Return the current GraphRAG status as a dict.

    Returns:
        A dict with the keys ``'ready'``, ``'initializing'`` and ``'error'``.
        ``'error'`` is the recorded error converted to ``str``, or None when
        the recorded error is falsy.
    """
    error_text = None
    if graphrag_error:
        error_text = str(graphrag_error)
    return dict(
        ready=graphrag_ready,
        initializing=graphrag_initializing,
        error=error_text,
    )
# Helper to report whether a usable agent is stored
def is_agent_available():
    """Report whether a workflow agent is stored and has a ``run`` attribute.

    The module-level ``global_workflow_agent`` is read at call time, so the
    result reflects whatever is stored in this module when the function runs.
    A warning is logged naming the reason when the check fails, and a debug
    message is logged when it passes.

    Returns:
        True when an agent is stored and has a ``run`` attribute, False otherwise.
    """
    # Imported at call time rather than at module import time.
    from utils import log_structured

    # Reading a module-level name needs no ``global`` declaration.
    agent = global_workflow_agent

    if agent is None:
        log_structured('warning', 'Agent availability check failed: global_workflow_agent is None')
        return False

    if not hasattr(agent, 'run'):
        log_structured('warning', 'Agent availability check failed: global_workflow_agent has no run method')
        return False

    log_structured('debug', 'Agent availability check passed: agent exists and has run method')
    return True