hp_chatbot/ai_core.py
michael 5554aa043f Add GraphRAG startup optimization: triple caching and background init
- Cache extracted triples to disk (neo4j_triples.pickle) so Neo4j can be
  repopulated without expensive LLM re-extraction on cold starts
- Split initialization into two phases: fast vector-only (~1-2 min) and
  background GraphRAG, so the server serves requests while GraphRAG loads
- Add GraphRAG status flags to shared_state for monitoring readiness
- Update /status endpoint to expose graphrag_ready/initializing/error
- Restructure main.py to use single event loop for background task support

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-02-23 17:45:05 -06:00

1272 lines
No EOL
66 KiB
Python

# hp_chatbot/ai_core.py
import os
import asyncio
import traceback
import uuid
import shutil
import re
import inspect
import json
from pathlib import Path
from typing import List, Dict, Any, Optional, Union
import httpx
import tiktoken
import nest_asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import partial
# Import GraphRAG integration
from graph_rag_integration import (
create_graph_components,
create_graphrag_query_engine,
generate_final_answer,
GraphRAGExtractor,
GraphRAGStore,
GraphRAGQueryEngine
)
from llama_index.core import (
VectorStoreIndex,
Document as LlamaIndexDocument,
Settings,
get_response_synthesizer,
load_index_from_storage,
StorageContext,
SimpleDirectoryReader # Keep if needed for fallback/other uses
)
from llama_index.core.retrievers import VectorIndexAutoRetriever, VectorIndexRetriever
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.postprocessor import SimilarityPostprocessor
from llama_index.core.response_synthesizers import ResponseMode
from llama_index.llms.openai import OpenAI as LlamaOpenAI
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.callbacks import CallbackManager, TokenCountingHandler
from llama_index.core.tools import QueryEngineTool, ToolMetadata, BaseTool, ToolSelection, ToolOutput
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.llms import ChatMessage, LLM
from llama_index.core.vector_stores.types import MetadataInfo, VectorStoreInfo
from llama_index.core.agent.react import ReActChatFormatter, ReActOutputParser
from llama_index.core.agent.react.types import (
ActionReasoningStep,
ObservationReasoningStep,
ResponseReasoningStep,
BaseReasoningStep
)
from llama_index.core.workflow import (
Context,
Workflow,
StartEvent,
StopEvent,
step,
Event,
)
from llama_index.core.node_parser import (
SentenceSplitter,
SemanticSplitterNodeParser,
)
from llama_parse import LlamaParse
# Import from our modules
from utils import logger, log_structured
from config import (
HP_DOCS_FOLDER, INDEX_PERSIST_PATH, IMAGES_DIRECTORY,
LLM_MODEL, EMBEDDING_MODEL, LLM_TEMPERATURE, LLM_TIMEOUT, AGENT_TIMEOUT,
TOOL_EXECUTION_TIMEOUT, SIMILARITY_TOP_K, SIMILARITY_CUTOFF,
LLAMA_PARSE_VENDOR_MODEL, LLAMA_PARSE_MAX_TIMEOUT,
NEO4J_URL, NEO4J_USERNAME, NEO4J_PASSWORD
)
nest_asyncio.apply()
# --- Global AI State ---
# Import shared state to ensure all modules access the same instances
from shared_state import (
global_index, global_workflow_agent,
global_graph_store, global_property_graph_index, global_graphrag_query_engine,
set_global_agent, set_global_index, set_graphrag_components,
set_graphrag_status
)
# --- Token Counter ---
# Build the shared token-counting callback handler.
# Preference order for the tokenizer:
#   1. the tiktoken encoding registered for LLM_MODEL,
#   2. tiktoken's generic "cl100k_base" encoding (an approximation when the model
#      name is unknown to tiktoken),
#   3. a UTF-8 byte tokenizer (last resort; counts bytes rather than tokens).
try:
    token_counter = TokenCountingHandler(
        tokenizer=tiktoken.encoding_for_model(LLM_MODEL).encode
    )
except Exception as e:
    log_structured('warning', f'Could not initialize tiktoken for {LLM_MODEL}. Token counting may be inaccurate.', {'error': str(e)})
    try:
        # A real BPE encoding gives far closer counts than raw byte length.
        token_counter = TokenCountingHandler(
            tokenizer=tiktoken.get_encoding("cl100k_base").encode
        )
    except Exception as fallback_err:
        log_structured('warning', 'Could not load fallback tiktoken encoding; counting UTF-8 bytes instead.', {'error': str(fallback_err)})
        # Fallback tokenizer if needed, or disable token counting
        token_counter = TokenCountingHandler(tokenizer=lambda text: list(text.encode("utf-8")))
# --- Custom ReAct Agent Workflow ---
class PrepEvent(Event):
    """Payload-free event asking the agent to rebuild its prompt and query the LLM."""
class InputEvent(Event):
    """Carries the formatted chat messages that are about to be sent to the LLM."""

    input: list[ChatMessage]
class ToolCallEvent(Event):
    """Carries the tool selections the agent should execute next."""

    tool_calls: list[ToolSelection]
class FunctionOutputEvent(Event):
    """Wraps a single ``ToolOutput``."""

    output: ToolOutput
class CustomStartEvent(StartEvent):
    """Start event with an explicit ``input_value`` field (empty string by default)."""

    input_value: str = ""
# --- Private helpers for ReActAgent2 ---------------------------------------

# Reply used when the start event carries no ``input_value``.
_REACT_DEFAULT_USER_INPUT = "How can I help you with HP marketing guidelines?"
# Reply used when cleaning leaves nothing presentable for the user.
_REACT_FALLBACK_RESPONSE = "I found information about your query in the HP marketing materials. Please let me know if you need more specific details."

# Patterns that may legitimately span several lines.
_REACT_SPAN_FLAGS = re.IGNORECASE | re.DOTALL
# Patterns that describe a single line; "^"/"$" must match at every line and "."
# must NOT cross newlines, otherwise they would delete unrelated answer text.
_REACT_LINE_FLAGS = re.IGNORECASE | re.MULTILINE

# Applied in order. Order matters: the "Thought ... Answer:" pattern runs before
# the patterns that discard a trace to the end of the string, so that an answer
# following a leaked trace is kept instead of being thrown away with it.
_REACT_CLEANUP_PATTERNS = (
    re.compile(r'<thinking>.*?</thinking>', _REACT_SPAN_FLAGS),       # XML-like thinking tags
    re.compile(r'\[thinking\].*?\[/thinking\]', _REACT_SPAN_FLAGS),   # Bracket thinking tags
    re.compile(r'Thought:.*?Answer:', _REACT_SPAN_FLAGS),             # "Thought: ... Answer:" prefix
    re.compile(r'Thought:.*?Action:.*?Action Input:.*', _REACT_SPAN_FLAGS),  # Unfinished tool-call trace
    re.compile(r'Action:.*?Action Input:.*', _REACT_SPAN_FLAGS),      # Action/Action Input remainder
    re.compile(r'^[ \t]*thinking:.*$\n?', _REACT_LINE_FLAGS),         # Lines starting with "Thinking:"
    re.compile(r'I\'m thinking:.*?\n', re.IGNORECASE),                # "I'm thinking:" up to end of line
    re.compile(r'Let me think.*?\n', re.IGNORECASE),                  # "Let me think" up to end of line
    re.compile(r'^Thought:.*$\n?', _REACT_LINE_FLAGS),                # Any line starting with "Thought:"
    re.compile(r'^Answer:\s*', re.IGNORECASE),                        # Leading "Answer:" prefix
)


def _strip_reasoning_artifacts(text: str) -> str:
    """Remove leaked ReAct/thinking markup from *text* and tidy the whitespace."""
    for pattern in _REACT_CLEANUP_PATTERNS:
        text = pattern.sub('', text)
    # Collapse the blank-line runs that removals may leave behind.
    text = re.sub(r'\n{3,}', '\n\n', text)
    return text.strip()


def _coerce_action_input(raw_input: Any) -> Dict[str, Any]:
    """Return the LLM-provided action input as a kwargs dict.

    Dicts pass through unchanged, a JSON-object string is decoded, and anything
    else is wrapped as ``{'query': value}``.
    """
    action_input = raw_input or {}
    if isinstance(action_input, dict):
        return action_input
    log_structured('warning', 'ReActAgent: action_input is not a dict, attempting to parse', {'raw_input': action_input})
    try:
        # Attempt basic parsing if it looks like a JSON string
        if isinstance(action_input, str) and action_input.strip().startswith('{'):
            return json.loads(action_input)
        # Fallback: treat as a single 'query' arg if not dict/json
        return {'query': action_input}
    except Exception as parse_err:
        log_structured('error', 'ReActAgent: Failed to parse action_input', {'raw_input': action_input, 'error': str(parse_err)})
        return {'query': str(action_input)}  # Safest fallback


class ReActAgent2(Workflow):
    """
    Custom ReAct Agent implementation using LlamaIndex Workflows.

    Step flow: ``new_user_msg`` -> ``prepare_chat_history`` ->
    ``handle_llm_input`` -> either a ``StopEvent`` with the final answer or
    ``handle_tool_calls`` followed by another ``prepare_chat_history`` round.
    Each LLM call and each tool execution is bounded by its own timeout.

    The ``StopEvent`` result is a dict with the keys ``response`` (cleaned
    answer text), ``sources`` (``ToolOutput`` objects collected during the run)
    and ``reasoning`` (the accumulated reasoning steps).
    """

    def __init__(
        self,
        llm: LLM,
        tools: list[BaseTool],
        memory: ChatMemoryBuffer,
        timeout: float = AGENT_TIMEOUT,  # Overall workflow timeout
        llm_timeout: float = LLM_TIMEOUT,  # Timeout for individual LLM calls
        tool_timeout: float = TOOL_EXECUTION_TIMEOUT,  # Timeout for individual tool calls
        verbose: bool = True,
        extra_context: str | None = None,
        **kwargs: Any,
    ) -> None:
        """Store collaborators and timeouts; ``extra_context`` is handed to the ReAct prompt formatter."""
        super().__init__(timeout=timeout, verbose=verbose, **kwargs)  # Pass verbose to parent
        self.llm = llm
        self.tools = tools or []
        self.memory = memory
        self.formatter = ReActChatFormatter(context=extra_context or "")
        self.output_parser = ReActOutputParser()
        self.sources: List[ToolOutput] = []  # Store ToolOutput objects directly
        self.llm_timeout = llm_timeout
        self.tool_timeout = tool_timeout
        self.verbose = verbose  # Store verbose flag

    @step
    async def new_user_msg(self, ctx: Context, ev: StartEvent) -> PrepEvent:
        """Record the user's message in memory and reset per-run state."""
        self.sources = []  # Clear sources for new message
        # Keep the whole start event available to later steps.
        await ctx.set("user_input", ev)
        # Resolve the real input here rather than storing a placeholder that a
        # later step has to find and patch: patching by position breaks as soon
        # as the memory already holds earlier turns.
        if hasattr(ev, "input_value"):
            real_user_input = str(ev.input_value)
        else:
            log_structured('info', 'ReActAgent: Extracting input from workflow input', {})
            real_user_input = _REACT_DEFAULT_USER_INPUT
        self.memory.put(ChatMessage(role="user", content=real_user_input))
        await ctx.set("current_reasoning", [])
        if self.verbose:
            log_structured('debug', 'ReActAgent: Receiving new user message', {'input': real_user_input})
        return PrepEvent()

    @step
    async def prepare_chat_history(self, ctx: Context, ev: PrepEvent) -> InputEvent:
        """Format memory plus the reasoning so far into the LLM prompt."""
        chat_history = self.memory.get()
        current_reasoning = await ctx.get("current_reasoning", default=[])
        llm_input = self.formatter.format(
            self.tools, chat_history, current_reasoning=current_reasoning
        )
        if self.verbose:
            log_structured('debug', 'ReActAgent: Prepared chat history for LLM', {'history_len': len(llm_input)})
        return InputEvent(input=llm_input)

    @step  # The timeout is managed inside the method
    async def handle_llm_input(self, ctx: Context, ev: InputEvent) -> Union[ToolCallEvent, StopEvent, PrepEvent]:
        """Query the LLM and route on the parsed reasoning step.

        Returns a ``StopEvent`` for a final answer, a ``ToolCallEvent`` for an
        action step, and a ``PrepEvent`` otherwise. Timeouts and errors are
        recorded as observations and also yield a ``PrepEvent``, so the loop
        continues until the workflow-level timeout ends the run.
        """
        chat_history = ev.input
        current_reasoning = await ctx.get("current_reasoning", default=[])
        try:
            if self.verbose:
                log_structured('debug', 'ReActAgent: Sending request to LLM', {'history_len': len(chat_history)})
            response = await asyncio.wait_for(
                self.llm.achat(chat_history),
                timeout=self.llm_timeout
            )
            reasoning_step = self.output_parser.parse(response.message.content)
            current_reasoning.append(reasoning_step)
            await ctx.set("current_reasoning", current_reasoning)  # Update context state
            if self.verbose:
                log_structured('debug', 'ReActAgent: Received LLM response', {
                    'step_type': type(reasoning_step).__name__,
                    'is_done': getattr(reasoning_step, 'is_done', False),
                    'has_response': hasattr(reasoning_step, 'response'),
                    'has_action': hasattr(reasoning_step, 'action'),
                    'action': getattr(reasoning_step, 'action', None),
                    'raw_content_preview': response.message.content[:300]
                })

            if reasoning_step.is_done:
                if self.verbose:
                    log_structured('debug', 'ReActAgent: Processing final response', {
                        'raw_response_content': response.message.content,
                        'reasoning_step_response': str(getattr(reasoning_step, 'response', 'NO_RESPONSE_ATTR'))
                    })
                # Clean the response to remove any thinking parts
                response_text = _strip_reasoning_artifacts(str(reasoning_step.response))
                # Final safety check - if response still looks like thinking, provide fallback
                if re.search(r'(?i)^(Thought|Action|Observation):', response_text) or not response_text:
                    log_structured('warning', 'ReActAgent: Final response still contains thinking patterns, using fallback', {
                        'problematic_response': response_text[:200]
                    })
                    response_text = _REACT_FALLBACK_RESPONSE
                self.memory.put(ChatMessage(role="assistant", content=response_text))
                if self.verbose:
                    log_structured('info', 'ReActAgent: Final response generated', {
                        'response_preview': response_text[:100],
                        'sources_count': len(self.sources)
                    })
                # Structure the final output
                final_result = {
                    "response": response_text,
                    "sources": self.sources,  # Pass the collected ToolOutput objects
                    "reasoning": current_reasoning
                }
                return StopEvent(result=final_result)

            if isinstance(reasoning_step, ActionReasoningStep):
                if self.verbose:
                    log_structured('debug', 'ReActAgent: Action step identified', {
                        'action': reasoning_step.action,
                        'input': reasoning_step.action_input
                    })
                return ToolCallEvent(tool_calls=[
                    ToolSelection(
                        tool_id="tool_" + reasoning_step.action.replace(" ", "_"),  # Simple ID generation
                        tool_name=reasoning_step.action,
                        tool_kwargs=_coerce_action_input(reasoning_step.action_input)
                    )
                ])

            # Neither final nor an action: loop again with the step recorded.
            if self.verbose:
                log_structured('debug', 'ReActAgent: Non-action, non-done step encountered', {'step_type': type(reasoning_step).__name__})
            return PrepEvent()  # Continue the loop
        except asyncio.TimeoutError:
            error_msg = f"LLM call timed out after {self.llm_timeout} seconds."
            log_structured('error', 'ReActAgent: LLM Timeout', {'timeout': self.llm_timeout})
            current_reasoning.append(ObservationReasoningStep(observation=error_msg))
            await ctx.set("current_reasoning", current_reasoning)
            return PrepEvent()  # Retry; the workflow timeout bounds the number of attempts.
        except Exception as e:
            error_msg = f"Error during LLM interaction or parsing: {str(e)}"
            log_structured('error', 'ReActAgent: Error in handle_llm_input', {
                'error': str(e), 'traceback': traceback.format_exc()
            })
            current_reasoning.append(ObservationReasoningStep(observation=error_msg))
            await ctx.set("current_reasoning", current_reasoning)
            # Returning PrepEvent allows the agent to potentially describe the error or retry
            return PrepEvent()

    @step  # The timeout is managed inside the method
    async def handle_tool_calls(self, ctx: Context, ev: ToolCallEvent) -> PrepEvent:
        """Execute the requested tools and append one observation per call.

        Unknown tools, timeouts and tool errors are reported to the LLM as
        observations instead of aborting the run. Successful outputs are also
        collected in ``self.sources``.
        """
        tool_calls = ev.tool_calls
        tools_by_name = {tool.metadata.name: tool for tool in self.tools}
        current_reasoning = await ctx.get("current_reasoning", default=[])
        if self.verbose:
            log_structured('debug', 'ReActAgent: Handling tool calls', {
                'call_count': len(tool_calls),
                'tool_names': [tc.tool_name for tc in tool_calls]
            })
        for tool_call in tool_calls:
            tool = tools_by_name.get(tool_call.tool_name)
            if not tool:
                error_msg = f"Tool '{tool_call.tool_name}' not found."
                log_structured('error', 'ReActAgent: Tool not found', {'tool_name': tool_call.tool_name})
                current_reasoning.append(ObservationReasoningStep(observation=error_msg))
                continue  # Skip to next tool call
            try:
                tool_kwargs = tool_call.tool_kwargs or {}  # Ensure kwargs is a dict
                if self.verbose:
                    log_structured('debug', 'ReActAgent: Executing tool', {
                        'tool_name': tool_call.tool_name,
                        'kwargs': tool_kwargs
                    })
                tool_output: ToolOutput
                # Not every tool exposes ``call`` (only ``__call__`` is
                # guaranteed here), so look it up defensively.
                if inspect.iscoroutinefunction(getattr(tool, 'call', None)):
                    tool_output = await asyncio.wait_for(
                        tool.acall(**tool_kwargs),  # Use acall for async tools
                        timeout=self.tool_timeout
                    )
                else:
                    # Run the sync tool in a worker thread. wait_for raises
                    # asyncio.TimeoutError without blocking the event loop; the
                    # worker thread itself cannot be interrupted and finishes
                    # in the background.
                    tool_output = await asyncio.wait_for(
                        asyncio.to_thread(tool, **tool_kwargs),
                        timeout=self.tool_timeout
                    )
                if self.verbose:
                    log_structured('debug', 'ReActAgent: Tool execution successful', {
                        'tool_name': tool_call.tool_name,
                        'output_type': type(tool_output).__name__,
                        'output_content_preview': str(tool_output.content)[:100] if tool_output else "N/A"
                    })
                # Store the raw ToolOutput object which contains content, raw_output, metadata etc.
                self.sources.append(tool_output)
                # Add observation step with the tool's string content
                if tool_output and tool_output.content is not None:
                    observation_content = str(tool_output.content)
                else:
                    observation_content = "Tool executed successfully but returned no content."
                current_reasoning.append(ObservationReasoningStep(observation=observation_content))
                if self.verbose:
                    log_structured('debug', 'ReActAgent: Added observation to reasoning', {
                        'tool_name': tool_call.tool_name,
                        'observation_preview': observation_content[:200]
                    })
                # --- Detailed logging for image metadata ---
                if hasattr(tool_output, 'raw_output') and hasattr(tool_output.raw_output, 'source_nodes'):
                    for node_with_score in tool_output.raw_output.source_nodes:
                        node = getattr(node_with_score, 'node', None)
                        if node and hasattr(node, 'metadata'):
                            if 'image_paths' in node.metadata and node.metadata['image_paths']:
                                log_structured('debug', 'ReActAgent: Tool output node contains image paths', {
                                    'tool_name': tool_call.tool_name,
                                    'node_id': getattr(node, 'id_', 'N/A'),
                                    'image_paths': node.metadata['image_paths']
                                })
            except asyncio.TimeoutError:
                error_msg = f"Tool '{tool_call.tool_name}' timed out after {self.tool_timeout} seconds."
                log_structured('error', 'ReActAgent: Tool Timeout', {'tool_name': tool_call.tool_name, 'timeout': self.tool_timeout})
                current_reasoning.append(ObservationReasoningStep(observation=error_msg))
            except Exception as e:
                error_msg = f"Error calling tool '{tool_call.tool_name}': {str(e)}"
                log_structured('error', 'ReActAgent: Tool execution error', {
                    'tool_name': tool_call.tool_name, 'error': str(e), 'traceback': traceback.format_exc()
                })
                current_reasoning.append(ObservationReasoningStep(observation=error_msg))
        # Update reasoning steps in context before returning
        await ctx.set("current_reasoning", current_reasoning)
        return PrepEvent()  # Always return PrepEvent to continue the loop
# --- Document Processing ---
async def _aclose_quietly(resource: Any, label: str) -> None:
    """Await ``resource.aclose()`` when available; log instead of raising on failure."""
    if resource is None or not hasattr(resource, 'aclose'):
        return
    try:
        await resource.aclose()
    except Exception as close_err:
        log_structured('error', f'Error closing {label} client', {'error': str(close_err)})


async def _extract_page_images(parser_images: Any, file_path: Path, session_id: Optional[str]) -> Dict[int, str]:
    """Extract images for one file and copy them into IMAGES_DIRECTORY.

    Returns a mapping of 1-based page number to the saved image filename. Any
    failure in the pipeline is logged and yields an empty mapping. The
    temporary download directory is removed on every path.
    """
    filename = file_path.name
    file_path_str = str(file_path)
    images_root = Path(IMAGES_DIRECTORY)
    temp_image_dir = images_root / f"temp_img_{filename}_{uuid.uuid4().hex[:8]}"
    page_images: Dict[int, str] = {}
    try:
        log_structured('debug', f'Attempting image extraction for: {filename}', {'session_id': session_id})
        os.makedirs(temp_image_dir, exist_ok=True)
        # The parser's JSON/image methods are called as blocking functions, so
        # run them in worker threads to keep the event loop responsive.
        md_json_objs = await asyncio.to_thread(parser_images.get_json_result, file_path_str)
        log_structured('debug', f'Got JSON result for images from {filename}', {'session_id': session_id})
        image_dicts = await asyncio.to_thread(parser_images.get_images, md_json_objs, temp_image_dir)
        log_structured('info', f'LlamaParse reported {len(image_dicts)} images potentially extracted for {filename}', {'session_id': session_id})
        saved_image_count = 0
        for idx, img_info in enumerate(image_dicts):
            # NOTE(review): assumes a 0-based 'page' key and falls back to the
            # list index when it is missing - confirm against the keys the
            # installed llama_parse version actually returns.
            page_index_0_based = img_info.get('page', idx)
            page_num_1_based = page_index_0_based + 1  # Convert to 1-based for consistency
            source_img_path = img_info.get('path')
            if not source_img_path or not os.path.exists(source_img_path):
                log_structured('warning', f'No valid image path found or file missing for image index {idx} (reported page {page_index_0_based}) from {filename}. Path: {img_info.get("path", "N/A")}', {'session_id': session_id})
                continue
            image_filename = f"{os.path.splitext(filename)[0]}_page{page_num_1_based}_{uuid.uuid4().hex[:6]}.png"
            dest_path = images_root / image_filename
            try:
                shutil.copy2(source_img_path, dest_path)  # copy2 preserves metadata
                # A later image on the same page replaces an earlier one.
                page_images[page_num_1_based] = image_filename
                saved_image_count += 1
                log_structured('debug', f'Saved image for {filename} page {page_num_1_based} (1-based) to {dest_path}', {'session_id': session_id})
            except Exception as copy_err:
                log_structured('error', f'Error copying image for {filename} page {page_num_1_based}', {'session_id': session_id, 'error': str(copy_err)})
        log_structured('info', f'Successfully saved {saved_image_count} images for {filename}', {'session_id': session_id})
        return page_images
    except Exception as img_err:
        log_structured('error', f'Error during image extraction pipeline for {filename}', {
            'session_id': session_id, 'error': str(img_err), 'traceback': traceback.format_exc()
        })
        return {}  # Ensure empty dict on error
    finally:
        # Clean up the temporary directory on success AND on failure.
        if os.path.isdir(temp_image_dir):
            try:
                shutil.rmtree(temp_image_dir)
            except Exception as cleanup_err:
                log_structured('error', f'Error cleaning up temp image dir {temp_image_dir}', {'session_id': session_id, 'error': str(cleanup_err)})


def _annotate_chunk(
    chunk: Any,
    chunk_index: int,
    file_path: Path,
    doc_type: str,
    page_images: Dict[int, str],
    session_id: Optional[str],
) -> None:
    """Attach file, page and image metadata to one parsed chunk (in place)."""
    filename = file_path.name
    if not hasattr(chunk, 'metadata') or chunk.metadata is None:
        chunk.metadata = {}
    chunk.metadata['filename'] = filename
    chunk.metadata['file_path'] = str(file_path)
    chunk.metadata['source'] = "document"
    chunk.metadata['type'] = doc_type  # e.g., 'brief', 'supporting'
    chunk.metadata['chunk_index'] = chunk_index

    # --- Determine 1-based page number ---
    # Priority: page keys already present in the parser's metadata.
    page_num_1_based: Optional[int] = None
    for key in ('page_label', 'page_number', 'page'):
        if key not in chunk.metadata:
            continue
        try:
            candidate = int(chunk.metadata[key])  # Assumed to be 1-based
        except (ValueError, TypeError):
            continue  # Ignore if not integer
        if candidate > 0:  # Ignore non-positive page numbers
            page_num_1_based = candidate
            log_structured('debug', f"Found page {page_num_1_based} from metadata key '{key}'", {'filename': filename, 'chunk': chunk_index, 'session_id': session_id})
            break
    if page_num_1_based is None:
        # Very rough fallback: treat the chunk position as the page number.
        page_num_1_based = chunk_index + 1
        log_structured('debug', f"Using position fallback for page: {page_num_1_based}", {'filename': filename, 'chunk': chunk_index, 'session_id': session_id})
    chunk.metadata["source_page"] = page_num_1_based  # Store consistently

    # --- Precise page-based image assignment ---
    # Only an exact page match attaches an image; there is no nearest-page guess.
    chunk.metadata['image_paths'] = []  # Initialize/reset
    specific_image_filename = page_images.get(page_num_1_based)
    if specific_image_filename is not None:
        chunk.metadata['image_match_type'] = 'exact'
        log_structured('debug', f'Exact page match for image page {page_num_1_based}', {'filename': filename, 'chunk': chunk_index, 'image': specific_image_filename})
    if specific_image_filename:
        chunk.metadata['image_paths'] = [specific_image_filename]  # Store as list


async def process_documents_in_directory(directory: str, session_id: Optional[str] = None) -> List[LlamaIndexDocument]:
    """
    Process all documents in a directory using LlamaParse for text and images,
    preserving chunk-level metadata including specific image paths per chunk.

    Args:
        directory: The path to the directory containing documents.
        session_id: Optional session identifier, used for log context only.

    Returns:
        A list of LlamaIndexDocument objects representing the processed chunks,
        ready for indexing. Returns an empty list on failure or if no documents
        are processed. Exceptions are logged, never raised.
    """
    parser_text = None
    parser_images = None
    # Every httpx client created here is tracked so that it is always closed.
    http_clients: List[httpx.AsyncClient] = []
    try:
        # Ensure images directory exists
        os.makedirs(IMAGES_DIRECTORY, exist_ok=True)
        # Evaluate once, before listing: listing a non-directory would raise.
        directory_is_valid = os.path.isdir(directory)
        log_structured('info', f'Starting document processing for directory: {directory}', {
            'session_id': session_id,
            'directory_name': os.path.basename(directory),
            'directory_exists': os.path.exists(directory),
            'files_present': os.listdir(directory) if directory_is_valid else 'N/A'
        })
        if not directory_is_valid:
            log_structured('error', f'Directory not found or is not a directory: {directory}', {'session_id': session_id})
            return []
        if not os.listdir(directory):
            log_structured('warning', f'Directory is empty, skipping: {directory}', {'session_id': session_id})
            return []

        # --- LlamaParse Initialization ---
        try:
            custom_client_text = httpx.AsyncClient(timeout=LLAMA_PARSE_MAX_TIMEOUT)
            http_clients.append(custom_client_text)
            custom_client_images = httpx.AsyncClient(timeout=LLAMA_PARSE_MAX_TIMEOUT)
            http_clients.append(custom_client_images)
            parser_text = LlamaParse(
                result_type="markdown",
                add_page_breaks=False,
                system_prompt="Extract all content including text, tables, and formatting. Preserve structure.",
                premium_mode=False,  # Set based on your LlamaCloud plan
                max_timeout=LLAMA_PARSE_MAX_TIMEOUT,
                custom_client=custom_client_text,
                verbose=True  # Enable verbose logging for LlamaParse
            )
            parser_images = LlamaParse(
                result_type="markdown",  # Less critical here, parsing for images primarily
                add_page_breaks=False,
                system_prompt="Generate page images of the document.",
                use_vendor_multimodal_model=True,
                vendor_multimodal_model_name=LLAMA_PARSE_VENDOR_MODEL,
                premium_mode=False,  # Set based on your LlamaCloud plan
                max_timeout=LLAMA_PARSE_MAX_TIMEOUT,
                custom_client=custom_client_images,
                verbose=True  # Enable verbose logging for LlamaParse
            )
            log_structured('info', 'Initialized LlamaParse (text & image capability)', {'session_id': session_id})
        except Exception as parser_err:
            log_structured('error', f'Error initializing dual LlamaParse capability: {parser_err}. Falling back to text-only.', {
                'session_id': session_id, 'traceback': traceback.format_exc()
            })
            # Release everything from the failed attempt before retrying.
            await _aclose_quietly(parser_text, 'parser_text')
            await _aclose_quietly(parser_images, 'parser_images')
            for stale_client in http_clients:
                await _aclose_quietly(stale_client, 'httpx')
            http_clients.clear()
            parser_text = None
            parser_images = None  # Disable image parsing
            try:
                # Ensure client is fresh for fallback
                custom_client_text = httpx.AsyncClient(timeout=LLAMA_PARSE_MAX_TIMEOUT)
                http_clients.append(custom_client_text)
                parser_text = LlamaParse(
                    result_type="markdown",
                    language="en",
                    add_page_breaks=False,
                    merge_consecutive=True,
                    max_timeout=LLAMA_PARSE_MAX_TIMEOUT,
                    custom_client=custom_client_text,
                    verbose=True
                )
                log_structured('info', 'Initialized LlamaParse with fallback text-only parser.', {'session_id': session_id})
            except Exception as fallback_err:
                log_structured('critical', f'FATAL: Error initializing fallback LlamaParse: {fallback_err}', {
                    'session_id': session_id, 'traceback': traceback.format_exc()
                })
                return []  # The finally block below releases the client.
        if not parser_text:
            log_structured('critical', 'FATAL: Text parser (parser_text) could not be initialized.')
            return []

        file_extractor = {
            ".pdf": parser_text, ".docx": parser_text, ".doc": parser_text,
            ".pptx": parser_text, ".ppt": parser_text, ".txt": parser_text,
        }
        supported_extensions = list(file_extractor.keys())

        # --- Discover supported files ---
        log_structured('debug', f'Searching for supported files in: {directory}', {'extensions': supported_extensions, 'session_id': session_id})
        # Compare lower-cased suffixes so that mixed-case extensions such as
        # ".Pdf" are found too, not only all-lower and all-upper variants.
        all_files_paths = sorted(
            entry for entry in Path(directory).iterdir()
            if entry.is_file() and entry.suffix.lower() in file_extractor
        )
        log_structured('info', f'Found {len(all_files_paths)} supported files in {directory}', {'session_id': session_id, 'files': [f.name for f in all_files_paths]})

        final_documents: List[LlamaIndexDocument] = []
        doc_type = os.path.basename(directory)
        if not all_files_paths:
            log_structured('warning', f'No supported files found in directory: {directory}', {'session_id': session_id})
        else:
            for file_path in all_files_paths:
                file_path_str = str(file_path)
                filename = file_path.name
                log_structured('info', f'Processing file: {filename}', {'session_id': session_id, 'path': file_path_str})
                current_text_parser = file_extractor[file_path.suffix.lower()]

                # 1. Extract text chunks
                try:
                    log_structured('debug', f'Calling parser_text.aload_data for {filename}', {'session_id': session_id})
                    text_chunks = await current_text_parser.aload_data(file_path_str)
                    log_structured('info', f'Extracted {len(text_chunks)} text chunks from {filename}', {'session_id': session_id})
                    if not text_chunks:
                        # Image extraction is still attempted for this file.
                        log_structured('warning', f'No text chunks extracted from {filename}. File might be empty or parser issue.', {'session_id': session_id})
                except Exception as text_err:
                    log_structured('error', f'Error extracting text from {filename}', {
                        'session_id': session_id, 'error': str(text_err), 'traceback': traceback.format_exc()
                    })
                    continue  # Skip to next file if text extraction fails critically

                # 2. Extract images (if parser_images is available)
                if parser_images:
                    file_page_images = await _extract_page_images(parser_images, file_path, session_id)
                else:
                    log_structured('info', f'Image parsing skipped for {filename} (parser_images is None).', {'session_id': session_id})
                    file_page_images = {}

                # 3. Add metadata to each text chunk
                log_structured('debug', f'Assigning metadata and images to {len(text_chunks)} chunks for {filename}', {'session_id': session_id})
                for chunk_index, chunk in enumerate(text_chunks):
                    _annotate_chunk(chunk, chunk_index, file_path, doc_type, file_page_images, session_id)
                    final_documents.append(chunk)
            log_structured('info', f'Finished processing files. Total chunks: {len(final_documents)}', {'session_id': session_id})

        log_structured('info', 'Directory processing complete.', {
            'session_id': session_id, 'directory': directory, 'total_chunks_generated': len(final_documents)
        })
        return final_documents
    except Exception as e:
        log_structured('critical', 'FATAL Error in process_documents_in_directory', {
            'session_id': session_id, 'directory': directory, 'error': str(e), 'traceback': traceback.format_exc()
        })
        return []  # Return empty list on major failure
    finally:
        # Ensure async clients are closed
        log_structured('debug', 'Closing LlamaParse clients.', {'session_id': session_id})
        await _aclose_quietly(parser_text, 'parser_text')
        await _aclose_quietly(parser_images, 'parser_images')
        # The parsers are not assumed to own their custom clients; close the
        # httpx clients created above explicitly so connections are released.
        for http_client in http_clients:
            await _aclose_quietly(http_client, 'httpx')
# --- GraphRAG Tool (module-level for reuse across init phases) ---
class GraphRAGTool(BaseTool):
    """Tool that queries using both vector and graph-based retrieval.

    The wrapped ``query_engine`` must provide ``custom_query(query)`` returning
    a dict-like retrieval result and an ``llm`` attribute; both are passed to
    ``generate_final_answer`` to produce the tool's text content.
    """

    class _NodeWrapper:
        """Minimal ``raw_output`` object exposing ``source_nodes`` to consumers."""

        def __init__(self, nodes):
            self.source_nodes = nodes

    def __init__(self, query_engine):
        self.query_engine = query_engine
        self._metadata = ToolMetadata(
            name="answerquestionswith_graphrag",
            description="USE THIS TOOL FOR ALL QUERIES - Queries HP marketing materials using both vector and graph-based retrieval for more comprehensive answers. Use this for complex questions that need context from multiple related documents."
        )

    @property
    def metadata(self):
        return self._metadata

    def __call__(self, query_str: Optional[str] = None, **kwargs: Any) -> ToolOutput:
        """Run query through GraphRAG and generate synthesized answer.

        Args:
            query_str: The user query. Callers that invoke tools with keyword
                arguments may pass it as ``input`` or ``query`` instead.

        Returns:
            A ToolOutput whose content is the synthesized answer and whose
            ``raw_output.source_nodes`` holds the retrieved vector nodes. On a
            retrieval/synthesis error the ToolOutput carries an error message
            instead of raising.

        Raises:
            TypeError: If no query was supplied under any accepted name.
        """
        if query_str is None:
            # Accept the argument names used by keyword-based callers.
            for alias in ("input", "query"):
                if kwargs.get(alias) is not None:
                    query_str = kwargs[alias]
                    break
        if query_str is None:
            raise TypeError("GraphRAGTool requires a query ('query_str', 'input' or 'query').")
        query_str = str(query_str)

        log_structured('info', 'GraphRAG Tool: Starting dual retrieval', {'query': query_str})
        try:
            # Get both vector and GraphRAG retrieval results
            retrieval_result = self.query_engine.custom_query(query_str)
            # Generate synthesized answer
            final_answer = generate_final_answer(query_str, retrieval_result, self.query_engine.llm)
            log_message = {
                'vector_context_length': len(retrieval_result.get('vector_context', '')),
                'graphrag_context_length': len(retrieval_result.get('graphrag_context', '')),
                'vector_nodes_count': len(retrieval_result.get('vector_nodes', [])),
                'community_ids': retrieval_result.get('community_ids', [])
            }
            log_structured('info', 'GraphRAG Tool: Retrieval complete', log_message)
            # Expose the vector nodes as raw_output.source_nodes so that
            # consumers can read per-node metadata (e.g. image paths).
            vector_nodes = retrieval_result.get('vector_nodes', [])
            tool_output = ToolOutput(
                content=final_answer,
                tool_name="GraphRAG",
                raw_output=self._NodeWrapper(vector_nodes),
                raw_input={"query": query_str}
            )
            log_structured('debug', 'GraphRAG Tool: Including image metadata in response',
                           {'node_count': len(vector_nodes)})
            return tool_output
        except Exception as graphrag_err:
            log_structured('error', f'Error in GraphRAG tool: {graphrag_err}',
                           {'traceback': traceback.format_exc()})
            return ToolOutput(
                content=f"I encountered an error while retrieving information: {str(graphrag_err)}. Please try again or use the standard query tool.",
                tool_name="GraphRAG",
                raw_input={"query": query_str},
                raw_output={"error": str(graphrag_err)}
            )

    async def acall(self, input: str) -> ToolOutput:
        """Async version of __call__.

        NOTE: this delegates to the synchronous implementation, so the
        retrieval runs on the calling event loop until it completes.
        """
        return self.__call__(input)
# --- Phase 1: Vector Index Initialization (fast path) ---
# Patterns that strip leaked "thinking" text from an LLM reply. They are
# applied in this order, each with re.DOTALL.
_THINKING_PATTERNS = tuple(
    re.compile(pattern, re.DOTALL)
    for pattern in (
        r'(?i)^.*?thinking:.*?\n',
        r'(?i)<thinking>.*?</thinking>',
        r'(?i)\[thinking\].*?\[/thinking\]',
        r'(?i)I\'m thinking:.*?\n',
        r'(?i)Let me think.*?\n',
        r'(?i)Thought:.*?Answer:',
        r'(?i)^Answer:\s*',
    )
)
_EXCESS_NEWLINES = re.compile(r'\n{3,}')


def _strip_thinking_artifacts(text: str) -> str:
    """Remove thinking markers from *text*, collapse 3+ newlines, and trim."""
    for pattern in _THINKING_PATTERNS:
        text = pattern.sub('', text)
    return _EXCESS_NEWLINES.sub('\n\n', text).strip()


def _coerce_action_input(action_input: Any) -> Dict[str, Any]:
    """Turn a parsed action input into keyword arguments for a tool call."""
    action_input = action_input or {}
    if isinstance(action_input, dict):
        return action_input
    if isinstance(action_input, str) and action_input.strip().startswith('{'):
        try:
            return json.loads(action_input)
        except ValueError:  # includes json.JSONDecodeError
            return {'query': str(action_input)}
    return {'query': action_input}


def _run_result(response: str, sources: List[Any], reasoning: List[Any]) -> Dict[str, Any]:
    """Build the dict shape returned by the agent's ``run`` method."""
    return {"response": response, "sources": sources, "reasoning": reasoning}


async def _answer_direct_response(agent, query_text, reasoning_step):
    """Handle a step that already carries a response.

    If a tool whose name contains "graphrag" is registered, it is invoked
    with the original query and its content (when non-empty) becomes the
    reply. If there is no such tool, or the call fails, the LLM's own
    response is returned after thinking markers are stripped.
    """
    log_structured('debug', 'LLM provided direct response, forcing tool usage for image retrieval')
    graphrag_tool = next((t for t in agent.tools if 'graphrag' in t.metadata.name.lower()), None)
    if graphrag_tool is not None:
        try:
            tool_output = await asyncio.wait_for(
                graphrag_tool.acall(input=str(query_text)),
                timeout=agent.tool_timeout
            )
            agent.sources.append(tool_output)
            if tool_output and tool_output.content:
                response_text = str(tool_output.content)
            else:
                response_text = str(reasoning_step.response)
            agent.memory.put(ChatMessage(role="assistant", content=response_text))
            return _run_result(response_text, agent.sources, [reasoning_step])
        except Exception as e:
            # Best effort: fall through to the LLM's direct response below.
            log_structured('error', f'Error forcing GraphRAG tool execution: {str(e)}')

    response_text = _strip_thinking_artifacts(str(reasoning_step.response))
    agent.memory.put(ChatMessage(role="assistant", content=response_text))
    return _run_result(response_text, agent.sources, [reasoning_step])


async def _answer_via_tool(agent, reasoning_step):
    """Execute the tool named by *reasoning_step* and ask the LLM for a final reply."""
    tool_name = reasoning_step.action
    action_input = _coerce_action_input(reasoning_step.action_input)
    tool = next((t for t in agent.tools if t.metadata.name == tool_name), None)
    if tool is None:
        log_structured('error', 'Tool not found in simple_run', {
            'tool_name': tool_name,
            'available_tools': [t.metadata.name for t in agent.tools]
        })
        return _run_result(
            f"I tried to use a tool called '{tool_name}' but it's not available. Please try rephrasing your query.",
            [],
            [reasoning_step],
        )
    try:
        tool_output = await asyncio.wait_for(
            tool.acall(**action_input),
            timeout=agent.tool_timeout
        )
        agent.sources.append(tool_output)
        observation = str(tool_output.content if tool_output and tool_output.content is not None else "No content")
        # A real newline separates the observation from the instruction; the
        # previous "\\n" sent a literal backslash-n to the model.
        follow_up_msg = ChatMessage(
            role="user",
            content=f"Here is the result: {observation}\nPlease provide a final response based on this information."
        )
        agent.memory.put(follow_up_msg)
        llm_input = agent.formatter.format(agent.tools, agent.memory.get())
        final_response = await asyncio.wait_for(
            agent.llm.achat(llm_input),
            timeout=agent.llm_timeout
        )
        response_text = _strip_thinking_artifacts(str(final_response.message.content))
        agent.memory.put(ChatMessage(role="assistant", content=response_text))
        return _run_result(response_text, agent.sources, [reasoning_step])
    except Exception as e:
        log_structured('error', f'Error executing tool: {str(e)}', {'traceback': traceback.format_exc()})
        return _run_result(
            f"I encountered an error while processing your query: {str(e)}",
            [],
            [reasoning_step],
        )


async def _simple_agent_run(query_text):
    """Simple version that doesn't rely on complex workflow steps.

    Performs one LLM call, parses the reply into a reasoning step, then
    either answers directly, executes the single requested tool followed by
    one more LLM call, or returns a fallback message.

    Returns:
        A dict with ``response`` (str), ``sources`` (tool outputs) and
        ``reasoning`` (list of parsed steps). Errors are reported in
        ``response`` rather than raised.
    """
    # Imported at call time so the binding reflects the agent currently
    # stored in shared_state.
    from shared_state import global_workflow_agent as agent

    if not agent:
        log_structured('critical', 'Agent is None in simple_run - this should never happen')
        return _run_result("The AI system is currently unavailable. Please try again later.", [], [])

    try:
        agent.sources = []
        agent.memory.put(ChatMessage(role="user", content=str(query_text)))
        llm_input = agent.formatter.format(agent.tools, agent.memory.get())
        response = await asyncio.wait_for(
            agent.llm.achat(llm_input),
            timeout=agent.llm_timeout
        )
        reasoning_step = agent.output_parser.parse(response.message.content)
        log_structured('debug', 'Parsed reasoning step', {
            'step_type': type(reasoning_step).__name__,
            'has_response': hasattr(reasoning_step, 'response'),
            'has_action': hasattr(reasoning_step, 'action'),
            'action': getattr(reasoning_step, 'action', None),
            'action_input': getattr(reasoning_step, 'action_input', None),
            'raw_content': response.message.content[:200]
        })

        if hasattr(reasoning_step, 'response') and reasoning_step.response:
            return await _answer_direct_response(agent, query_text, reasoning_step)

        if hasattr(reasoning_step, 'action') and reasoning_step.action:
            return await _answer_via_tool(agent, reasoning_step)

        if isinstance(reasoning_step, ActionReasoningStep):
            log_structured('warning', 'Returning raw thinking due to failed action execution', {
                'action': reasoning_step.action,
                'action_input': reasoning_step.action_input
            })
            return _run_result(
                "I apologize, but I encountered an issue while processing your query. Please try asking your question in a different way.",
                [],
                [reasoning_step],
            )

        fallback_response = "I wasn't able to find a specific answer to your question. Please try rephrasing your query."
        return _run_result(
            fallback_response,
            agent.sources,
            [reasoning_step] if reasoning_step else [],
        )
    except Exception as e:
        log_structured('error', f'Error in simple_run: {str(e)}', {'traceback': traceback.format_exc()})
        return _run_result(f"I encountered an error while processing your query: {str(e)}", [], [])


async def initialize_vector_index() -> bool:
    """Initialize vector index and agent with vector-only tools.

    This is the fast path (~1-2 min) that makes the server usable for
    vector-only queries. GraphRAG components are added later by
    initialize_graphrag_components().

    Steps: verify OPENAI_API_KEY and make one test completion, configure
    the global ``Settings``, load the persisted index from
    INDEX_PERSIST_PATH or build and persist it from HP_DOCS_FOLDER, create
    the auto-retriever query engine, build the agent, and attach its
    ``run`` method.

    Returns:
        True when the index and agent are ready; False on any failure
        (failures are logged, never raised).
    """
    try:
        # --- Configure LLM and Embedding Model ---
        openai_key = os.environ.get("OPENAI_API_KEY", "")
        if not openai_key:
            log_structured('critical', 'No OpenAI API key provided. Make sure OPENAI_API_KEY is set in your .env file.')
            return False
        key_preview = openai_key[:4] + "..." if len(openai_key) > 4 else "invalid"
        log_structured('info', f'Using OpenAI API key starting with: {key_preview}')
        try:
            llm = LlamaOpenAI(
                model=LLM_MODEL,
                temperature=LLM_TEMPERATURE,
                timeout=LLM_TIMEOUT
            )
            # One throwaway completion to fail fast on a bad key or network.
            test_prompt = "Say 'API key is working' if you can read this."
            _ = llm.complete(test_prompt)
            log_structured('info', 'Successfully tested LLM API connection')
        except Exception as llm_err:
            log_structured('critical', f'Failed to initialize LLM with provided API key: {str(llm_err)}')
            return False
        embed_model = OpenAIEmbedding(model=EMBEDDING_MODEL)

        # --- Configure Global Settings ---
        Settings.llm = llm
        Settings.embed_model = embed_model
        Settings.callback_manager = CallbackManager([token_counter])
        node_parser = SemanticSplitterNodeParser(
            buffer_size=1, breakpoint_percentile_threshold=95, embed_model=embed_model
        )
        Settings.node_parser = node_parser

        # --- Load or Build Vector Index ---
        if INDEX_PERSIST_PATH.exists():
            log_structured('info', f'Loading existing index from {INDEX_PERSIST_PATH}')
            storage_context = StorageContext.from_defaults(persist_dir=str(INDEX_PERSIST_PATH))
            index = load_index_from_storage(storage_context)
            set_global_index(index)
            log_structured('info', 'Successfully loaded existing index')
        else:
            log_structured('info', f'No index found at {INDEX_PERSIST_PATH}. Creating new index from {HP_DOCS_FOLDER}')
            if not HP_DOCS_FOLDER.exists() or not any(HP_DOCS_FOLDER.iterdir()):
                log_structured('error', f'HP documents folder is missing or empty: {HP_DOCS_FOLDER}')
                return False
            documents = await process_documents_in_directory(str(HP_DOCS_FOLDER), session_id="global_init")
            if not documents:
                log_structured('error', f'No documents processed from {HP_DOCS_FOLDER}. Index creation aborted.')
                return False
            log_structured('info', 'Creating vector store index...', {
                'document_count': len(documents),
                'sample_doc_metadata': documents[0].metadata if documents else None
            })
            index = VectorStoreIndex.from_documents(
                documents=documents,
                show_progress=True,
            )
            set_global_index(index)
            index.storage_context.persist(persist_dir=str(INDEX_PERSIST_PATH))
            log_structured('info', f'Index created and persisted to {INDEX_PERSIST_PATH}')

        # --- Create Retriever and Query Engine (vector only) ---
        vector_store_info = VectorStoreInfo(
            content_info="HP marketing reference materials, including brand guidelines and supporting documents.",
            metadata_info=[
                MetadataInfo(name="filename", type="str", description="Filename of the source document"),
                MetadataInfo(name="source_page", type="int", description="Approximate page number in the source document"),
                MetadataInfo(name="image_paths", type="list[str]", description="List of image filenames associated with this chunk"),
            ],
        )
        all_retriever = VectorIndexAutoRetriever(
            index=index,
            vector_store_info=vector_store_info,
            similarity_top_k=SIMILARITY_TOP_K,
            verbose=True
        )
        all_query_engine = RetrieverQueryEngine.from_args(
            retriever=all_retriever,
            response_synthesizer=get_response_synthesizer(
                response_mode=ResponseMode.COMPACT,
            ),
            node_postprocessors=[
                SimilarityPostprocessor(similarity_cutoff=SIMILARITY_CUTOFF),
            ],
        )

        # --- Create Agent with vector-only tools ---
        query_engine_tools_react = [
            QueryEngineTool(
                query_engine=all_query_engine,
                metadata=ToolMetadata(
                    name="answer_questions_from_hp_marketing_materials",
                    description="USE THIS TOOL FOR ALL QUERIES - Queries HP marketing materials (brand guidelines, etc.) to answer questions about guidelines, workflows, and processes. Use this for specific lookups in the knowledge base."
                ),
            ),
        ]
        try:
            agent = ReActAgent2(
                llm=llm,
                tools=query_engine_tools_react,
                memory=ChatMemoryBuffer.from_defaults(llm=llm, token_limit=4096),
                verbose=True,
                timeout=AGENT_TIMEOUT,
                llm_timeout=LLM_TIMEOUT,
                tool_timeout=TOOL_EXECUTION_TIMEOUT
            )
            set_global_agent(agent)
            log_structured('info', 'Agent initialized successfully with vector-only tools')
        except Exception as agent_err:
            log_structured('critical', f'Failed to initialize agent: {str(agent_err)}', {'error': str(agent_err)})
            return False

        # --- Attach simple_run method ---
        try:
            # Re-read the agent from shared_state so the method is attached
            # to whatever set_global_agent() actually stored.
            from shared_state import global_workflow_agent as current_agent
            if current_agent is None:
                log_structured('critical', 'Cannot set run method - global_workflow_agent is None')
                return False
            current_agent.run = _simple_agent_run
            if not hasattr(current_agent, 'run'):
                log_structured('critical', 'Failed to attach run method to agent')
                return False
            log_structured('info', 'Successfully attached run method to agent')

            log_structured('info', 'Testing agent functionality...')
            if current_agent and hasattr(current_agent, 'memory') and hasattr(current_agent.memory, 'reset'):
                current_agent.memory.reset()
                log_structured('info', 'Agent memory reset test successful')
            else:
                if not current_agent:
                    log_structured('error', 'Agent memory test failed: agent is None')
                elif not hasattr(current_agent, 'memory'):
                    log_structured('error', 'Agent memory test failed: agent has no memory attribute')
                elif not hasattr(current_agent.memory, 'reset'):
                    log_structured('error', 'Agent memory test failed: agent.memory has no reset method')
                else:
                    log_structured('error', 'Agent memory test failed: unknown reason')
                return False
        except Exception as method_err:
            log_structured('critical', f'Failed to set up agent run method: {str(method_err)}',
                           {'error': str(method_err), 'traceback': traceback.format_exc()})
            return False

        log_structured('info', 'Vector index and agent initialized successfully (vector-only mode).')
        return True
    except Exception as e:
        log_structured('critical', 'Vector index/agent initialization failed', {
            'error': str(e), 'traceback': traceback.format_exc()
        })
        return False
# --- Phase 2: GraphRAG Background Initialization ---
async def initialize_graphrag_components() -> bool:
    """Initialize GraphRAG components in the background.

    Connects to Neo4j, restores/extracts triples, builds communities,
    creates GraphRAG query engine, and dynamically adds the GraphRAG tool
    to the existing agent's tools list.

    Requires that the vector index and agent are already stored in
    ``shared_state``. Progress is published through ``set_graphrag_status``.
    Calling this more than once replaces the previously registered GraphRAG
    tool instead of adding a duplicate.

    Returns:
        True on success; False on any failure (logged and recorded in the
        GraphRAG status, never raised).

    NOTE(review): every step below is a plain synchronous call inside this
    coroutine, so it appears to occupy the event loop while it runs —
    confirm whether the callees offload their work.
    """
    set_graphrag_status(initializing=True, ready=False, error=None)
    try:
        import shared_state

        index = shared_state.global_index
        if index is None:
            raise RuntimeError("Vector index must be initialized first")
        if shared_state.global_workflow_agent is None:
            raise RuntimeError("Agent must be initialized first")
        llm = Settings.llm

        # Connect to Neo4j
        from llama_index.graph_stores.neo4j import Neo4jPropertyGraphStore
        log_structured('info', f'Connecting to Neo4j at {NEO4J_URL}')
        property_graph_store = Neo4jPropertyGraphStore(
            username=NEO4J_USERNAME,
            password=NEO4J_PASSWORD,
            url=NEO4J_URL
        )
        log_structured('info', 'Successfully connected to Neo4j database')

        # Check Neo4j state: any existing triplets mean the graph is reused.
        temp_graph_store = GraphRAGStore(property_graph_store)
        triplets = temp_graph_store.get_triplets()
        if triplets:
            log_structured('info', f'Neo4j contains {len(triplets)} triplets. Using existing data.')
            graph_store, property_graph_index = create_graph_components(
                llm=llm,
                force_reindex=False
            )
        else:
            # Neo4j is empty — get nodes from vector index for potential extraction
            log_structured('info', 'Neo4j is empty. Getting nodes from vector index for GraphRAG.')
            vector_nodes = []
            if hasattr(index, 'docstore') and index.docstore:
                vector_nodes = list(index.docstore.docs.values())
                log_structured('info', f'Retrieved {len(vector_nodes)} nodes from index docstore')
            if not vector_nodes:
                raise ValueError("No nodes available for GraphRAG indexing")
            # create_graph_components will try cache restore first, then LLM extraction
            graph_store, property_graph_index = create_graph_components(
                llm=llm,
                nodes=vector_nodes,
                max_paths_per_chunk=10,
                force_reindex=False  # Allow cache restore path
            )

        # Ensure communities are built
        if not getattr(graph_store, 'communities_built', False):
            graph_store.build_communities()

        # Create GraphRAG query engine
        vector_retriever = VectorIndexRetriever(
            index=index,
            similarity_top_k=SIMILARITY_TOP_K
        )
        graphrag_query_engine = create_graphrag_query_engine(
            vector_retriever=vector_retriever,
            graph_store=graph_store,
            llm=llm,
            similarity_top_k=SIMILARITY_TOP_K
        )

        # Store in shared state
        set_graphrag_components(
            graph_store=graph_store,
            property_graph_index=property_graph_index,
            graphrag_query_engine=graphrag_query_engine
        )

        # Dynamically add GraphRAG tool to the existing agent. The agent is
        # re-read here because the steps above may take a long time.
        graphrag_tool = GraphRAGTool(graphrag_query_engine)
        live_agent = shared_state.global_workflow_agent
        if live_agent is None:
            log_structured('warning', 'Agent is no longer available; GraphRAG tool was not attached')
        else:
            new_name = graphrag_tool.metadata.name
            for position, existing in enumerate(live_agent.tools):
                if existing.metadata.name == new_name:
                    # Re-initialization: swap in the tool bound to the new engine.
                    live_agent.tools[position] = graphrag_tool
                    break
            else:
                live_agent.tools.append(graphrag_tool)
            log_structured('info', 'Added GraphRAG tool to existing agent', {
                'total_tools': len(live_agent.tools),
                'tool_names': [t.metadata.name for t in live_agent.tools]
            })

        set_graphrag_status(ready=True, initializing=False)
        log_structured('info', 'GraphRAG components initialized successfully')
        return True
    except Exception as e:
        log_structured('error', f'GraphRAG background initialization failed: {e}',
                       {'traceback': traceback.format_exc()})
        # NOTE(review): the exception object itself is passed as `error`;
        # confirm the status consumer can serialize it.
        set_graphrag_status(ready=False, initializing=False, error=e)
        return False
# --- Global Index Initialization (backward-compatible wrapper) ---
async def initialize_global_index() -> bool:
    """Backward-compatible wrapper: runs both init phases one after the other.

    Returns:
        False when the vector phase fails (the GraphRAG phase is then not
        attempted); True otherwise. A GraphRAG failure is only logged as a
        warning and does not change the result.
    """
    if not await initialize_vector_index():
        return False

    if not await initialize_graphrag_components():
        log_structured('warning', 'GraphRAG init failed, but vector search is available')

    return True