- Cache extracted triples to disk (neo4j_triples.pickle) so Neo4j can be repopulated without expensive LLM re-extraction on cold starts - Split initialization into two phases: fast vector-only (~1-2 min) and background GraphRAG, so the server serves requests while GraphRAG loads - Add GraphRAG status flags to shared_state for monitoring readiness - Update /status endpoint to expose graphrag_ready/initializing/error - Restructure main.py to use single event loop for background task support Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
133 lines
No EOL
4.6 KiB
Python
133 lines
No EOL
4.6 KiB
Python
# hp_chatbot/shared_state.py
"""
Shared state module that stores the global objects which need to be
accessible across different modules.

Other modules are expected to read and update these values through this
module (and its helper functions) so that they all observe the same objects.

NOTE(review): nothing in this module takes a lock; if these values are
written from more than one thread, synchronization has to happen elsewhere.
"""

# Store the AI agent here so it's properly shared across modules.
# None until set_global_agent() installs an agent exposing a `run` attribute.
global_workflow_agent = None

# Store the global index here (None until set_global_index() is called).
global_index = None

# Store GraphRAG components; all three are assigned together by
# set_graphrag_components().
global_graph_store = None
global_property_graph_index = None
global_graphrag_query_engine = None

# GraphRAG initialization status, updated through set_graphrag_status() and
# reported by get_graphrag_status().
graphrag_ready = False          # True once GraphRAG has been marked ready
graphrag_initializing = False   # True while initialization is flagged as running
graphrag_error = None           # Last recorded initialization error, if any
|
|
|
|
# Helper to set the global agent
|
|
def set_global_agent(agent):
    """Install *agent* as the module-wide workflow agent.

    The agent is accepted only when it is not None and exposes a ``run``
    attribute. Every outcome is reported through ``log_structured``.

    Args:
        agent: Candidate agent object.

    Returns:
        True when the agent was stored and passes the post-assignment check,
        False when it was rejected or an exception occurred while storing it.
    """
    # Imported lazily, inside the function, exactly as the rest of this
    # module does.
    from utils import log_structured

    global global_workflow_agent

    if agent is None:
        log_structured('error', 'Attempted to set global_workflow_agent to None')
        return False

    try:
        if hasattr(agent, 'run'):
            global_workflow_agent = agent

            # Re-inspect the stored object so the log reflects module state
            # rather than the argument.
            run_present = hasattr(global_workflow_agent, 'run')
            installed = global_workflow_agent is not None and run_present

            details = {
                'has_run_method': run_present,
                'agent_type': type(agent).__name__
            }
            log_structured('info', f'Global agent set successfully: {installed}', details)
            return installed

        # Reached only when the candidate lacks a ``run`` attribute.
        log_structured('error', 'Agent being set does not have a run method')
        return False
    except Exception as exc:
        log_structured('error', f'Error setting global agent: {exc!s}')
        return False
|
|
|
|
# Helper to set the global index
|
|
def set_global_index(index):
    """Store *index* as the module-wide index.

    Args:
        index: The index object to share; None clears the stored index.

    Returns:
        True when a non-None index is now stored, otherwise False.
    """
    global global_index

    global_index = index
    is_set = index is not None
    return is_set
|
|
|
|
# Helper to set the GraphRAG components
|
|
def set_graphrag_components(graph_store, property_graph_index, graphrag_query_engine):
    """Store the three GraphRAG components in module-level state.

    The values are stored unconditionally, even when some are None.

    Args:
        graph_store: Graph store object.
        property_graph_index: Property graph index object.
        graphrag_query_engine: GraphRAG query engine object.

    Returns:
        True only when all three stored components are not None.
    """
    from utils import log_structured

    global global_graph_store, global_property_graph_index, global_graphrag_query_engine

    (global_graph_store,
     global_property_graph_index,
     global_graphrag_query_engine) = (graph_store,
                                      property_graph_index,
                                      graphrag_query_engine)

    stored = (
        global_graph_store,
        global_property_graph_index,
        global_graphrag_query_engine,
    )
    components_set = all(component is not None for component in stored)

    log_structured('info', f'GraphRAG components set successfully: {components_set}')
    return components_set
|
|
|
|
def set_graphrag_status(ready=None, initializing=None, error=None):
    """Update GraphRAG initialization status flags.

    Each argument left as None keeps the corresponding flag unchanged, with
    one exception: marking GraphRAG as ready without supplying a new error
    clears any previously recorded error. Without that, an error from an
    earlier failed attempt could never be removed and the status would keep
    reporting it next to ``ready: True``.

    Args:
        ready: New value for the ready flag, or None to leave it unchanged.
        initializing: New value for the initializing flag, or None to leave
            it unchanged.
        error: Error to record (exception or message), or None to record no
            new error.

    Returns:
        None. The resulting status is logged through ``log_structured``.
    """
    global graphrag_ready, graphrag_initializing, graphrag_error
    from utils import log_structured

    if ready is not None:
        graphrag_ready = ready
    if initializing is not None:
        graphrag_initializing = initializing

    if error is not None:
        graphrag_error = error
    elif ready:
        # A successful transition supersedes the failure of an earlier
        # attempt; keeping the stale error would make the status
        # self-contradictory.
        graphrag_error = None

    log_structured('info', 'GraphRAG status updated', {
        'ready': graphrag_ready,
        'initializing': graphrag_initializing,
        'error': str(graphrag_error) if graphrag_error else None
    })
|
|
|
|
def get_graphrag_status():
    """Return a snapshot of the GraphRAG initialization flags.

    Returns:
        A dict with the keys ``ready``, ``initializing`` and ``error``. The
        error is rendered with ``str()``; a missing or falsy error is
        reported as None.
    """
    error_text = str(graphrag_error) if graphrag_error else None
    snapshot = {
        'ready': graphrag_ready,
        'initializing': graphrag_initializing,
        'error': error_text,
    }
    return snapshot
|
|
|
|
# Helper to get agent status
|
|
def is_agent_available():
    """Report whether a usable workflow agent is currently installed.

    The agent is read from this module's own ``global_workflow_agent`` at
    call time, so the answer reflects the current module state. Reading a
    module-level name needs no ``global`` declaration.

    Returns:
        True when the stored agent is not None and has a ``run`` attribute,
        otherwise False. The outcome is logged in both cases.
    """
    from utils import log_structured

    agent = global_workflow_agent

    # Each failure gets its own warning so the log shows which check failed.
    if agent is None:
        log_structured('warning', 'Agent availability check failed: global_workflow_agent is None')
        return False

    if not hasattr(agent, 'run'):
        log_structured('warning', 'Agent availability check failed: global_workflow_agent has no run method')
        return False

    log_structured('debug', 'Agent availability check passed: agent exists and has run method')
    return True