- Cache extracted triples to disk (neo4j_triples.pickle) so Neo4j can be repopulated without expensive LLM re-extraction on cold starts
- Split initialization into two phases: a fast vector-only phase (~1-2 min) and a background GraphRAG phase, so the server can serve requests while GraphRAG loads
- Add GraphRAG status flags to shared_state for monitoring readiness
- Update the /status endpoint to expose graphrag_ready/initializing/error
- Restructure main.py to use a single event loop for background task support

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1272 lines
No EOL
66 KiB
Python
1272 lines
No EOL
66 KiB
Python
# hp_chatbot/ai_core.py
|
|
import os
|
|
import asyncio
|
|
import traceback
|
|
import uuid
|
|
import shutil
|
|
import re
|
|
import inspect
|
|
import json
|
|
from pathlib import Path
|
|
from typing import List, Dict, Any, Optional, Union
|
|
|
|
import httpx
|
|
import tiktoken
|
|
import nest_asyncio
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from functools import partial
|
|
|
|
# Import GraphRAG integration
|
|
from graph_rag_integration import (
|
|
create_graph_components,
|
|
create_graphrag_query_engine,
|
|
generate_final_answer,
|
|
GraphRAGExtractor,
|
|
GraphRAGStore,
|
|
GraphRAGQueryEngine
|
|
)
|
|
|
|
from llama_index.core import (
|
|
VectorStoreIndex,
|
|
Document as LlamaIndexDocument,
|
|
Settings,
|
|
get_response_synthesizer,
|
|
load_index_from_storage,
|
|
StorageContext,
|
|
SimpleDirectoryReader # Keep if needed for fallback/other uses
|
|
)
|
|
from llama_index.core.retrievers import VectorIndexAutoRetriever, VectorIndexRetriever
|
|
from llama_index.core.query_engine import RetrieverQueryEngine
|
|
from llama_index.core.postprocessor import SimilarityPostprocessor
|
|
from llama_index.core.response_synthesizers import ResponseMode
|
|
from llama_index.llms.openai import OpenAI as LlamaOpenAI
|
|
from llama_index.embeddings.openai import OpenAIEmbedding
|
|
from llama_index.core.callbacks import CallbackManager, TokenCountingHandler
|
|
from llama_index.core.tools import QueryEngineTool, ToolMetadata, BaseTool, ToolSelection, ToolOutput
|
|
from llama_index.core.memory import ChatMemoryBuffer
|
|
from llama_index.core.llms import ChatMessage, LLM
|
|
from llama_index.core.vector_stores.types import MetadataInfo, VectorStoreInfo
|
|
from llama_index.core.agent.react import ReActChatFormatter, ReActOutputParser
|
|
from llama_index.core.agent.react.types import (
|
|
ActionReasoningStep,
|
|
ObservationReasoningStep,
|
|
ResponseReasoningStep,
|
|
BaseReasoningStep
|
|
)
|
|
from llama_index.core.workflow import (
|
|
Context,
|
|
Workflow,
|
|
StartEvent,
|
|
StopEvent,
|
|
step,
|
|
Event,
|
|
)
|
|
from llama_index.core.node_parser import (
|
|
SentenceSplitter,
|
|
SemanticSplitterNodeParser,
|
|
)
|
|
from llama_parse import LlamaParse
|
|
|
|
# Import from our modules
|
|
from utils import logger, log_structured
|
|
from config import (
|
|
HP_DOCS_FOLDER, INDEX_PERSIST_PATH, IMAGES_DIRECTORY,
|
|
LLM_MODEL, EMBEDDING_MODEL, LLM_TEMPERATURE, LLM_TIMEOUT, AGENT_TIMEOUT,
|
|
TOOL_EXECUTION_TIMEOUT, SIMILARITY_TOP_K, SIMILARITY_CUTOFF,
|
|
LLAMA_PARSE_VENDOR_MODEL, LLAMA_PARSE_MAX_TIMEOUT,
|
|
NEO4J_URL, NEO4J_USERNAME, NEO4J_PASSWORD
|
|
)
|
|
|
|
nest_asyncio.apply()
|
|
|
|
# --- Global AI State ---
|
|
# Import shared state to ensure all modules access the same instances
|
|
from shared_state import (
|
|
global_index, global_workflow_agent,
|
|
global_graph_store, global_property_graph_index, global_graphrag_query_engine,
|
|
set_global_agent, set_global_index, set_graphrag_components,
|
|
set_graphrag_status
|
|
)
|
|
|
|
|
|
# --- Token Counter ---
|
|
def _build_token_counter() -> TokenCountingHandler:
    """Create the module-wide token counting handler.

    Uses the tiktoken encoding registered for ``LLM_MODEL``. If that lookup
    (or handler construction) fails for any reason, a warning is logged and a
    handler that counts UTF-8 bytes is returned instead.
    """
    try:
        model_tokenizer = tiktoken.encoding_for_model(LLM_MODEL).encode
        return TokenCountingHandler(tokenizer=model_tokenizer)
    except Exception as e:
        log_structured('warning', f'Could not initialize tiktoken for {LLM_MODEL}. Token counting may be inaccurate.', {'error': str(e)})

    def _utf8_bytes(text):
        # Fallback tokenizer: one "token" per UTF-8 byte.
        return list(text.encode("utf-8"))

    return TokenCountingHandler(tokenizer=_utf8_bytes)


token_counter = _build_token_counter()
|
|
|
|
|
|
# --- Custom ReAct Agent Workflow ---
|
|
class PrepEvent(Event):
    """Payload-free event asking the workflow to prepare the next LLM input."""


class InputEvent(Event):
    """Carries the formatted chat messages that will be sent to the LLM."""

    input: list[ChatMessage]


class ToolCallEvent(Event):
    """Carries the tool selections produced from an LLM action step."""

    tool_calls: list[ToolSelection]


class FunctionOutputEvent(Event):
    """Carries the output of a single tool invocation."""

    output: ToolOutput


class CustomStartEvent(StartEvent):
    """Start event with an explicit ``input_value`` field (empty by default)."""

    input_value: str = ""
|
|
|
|
|
|
class ReActAgent2(Workflow):
    """
    Custom ReAct Agent implementation using LlamaIndex Workflows.

    Includes timeout handling for LLM calls and tool execution.

    Step flow: ``new_user_msg`` -> ``prepare_chat_history`` ->
    ``handle_llm_input`` -> (``handle_tool_calls`` -> ``prepare_chat_history``
    -> ...) until the LLM produces a final answer, which is returned in a
    ``StopEvent`` as ``{"response", "sources", "reasoning"}``.
    """

    # Used when the start event carries no ``input_value`` attribute.
    _DEFAULT_USER_INPUT = "How can I help you with HP marketing guidelines?"

    # Returned when cleaning leaves nothing usable in the final answer.
    _FALLBACK_RESPONSE = "I found information about your query in the HP marketing materials. Please let me know if you need more specific details."

    # Patterns stripped (in order) from the final answer. They are compiled
    # once here instead of on every final response.
    # NOTE(review): every pattern is applied with re.DOTALL and without
    # re.MULTILINE, so '^' anchors only at the start of the whole text and
    # '.*' can span lines (e.g. '^Thought:.*' removes the entire response).
    # This is the pre-existing behaviour, kept unchanged - confirm it is
    # intended before tightening.
    _THINKING_PATTERNS = tuple(
        re.compile(pattern, flags=re.DOTALL)
        for pattern in (
            r'(?i)^.*?thinking:.*?\n',  # Remove lines starting with "Thinking:"
            r'(?i)<thinking>.*?</thinking>',  # Remove XML-like thinking tags
            r'(?i)\[thinking\].*?\[/thinking\]',  # Remove bracket thinking tags
            r'(?i)I\'m thinking:.*?\n',  # Remove "I'm thinking:" sections
            r'(?i)Let me think.*?\n',  # Remove "Let me think" sections
            r'(?i)Thought:.*?Action:.*?Action Input:.*',  # Remove the specific pattern user reported
            r'(?i)^Thought:.*',  # Remove any line starting with "Thought:"
            r'(?i)Action:.*?Action Input:.*',  # Remove Action/Action Input patterns
            r'(?i)Thought:.*?Answer:',  # Remove "Thought: ... Answer:" pattern
            r'(?i)^Answer:\s*',  # Remove just the "Answer:" prefix
        )
    )

    # Detects a cleaned answer that still starts like a reasoning step.
    _LEFTOVER_REASONING = re.compile(r'(?i)^(Thought|Action|Observation):')

    def __init__(
        self,
        llm: LLM,
        tools: list[BaseTool],
        memory: ChatMemoryBuffer,
        timeout: float = AGENT_TIMEOUT,  # Overall workflow timeout
        llm_timeout: float = LLM_TIMEOUT,  # Timeout for individual LLM calls
        tool_timeout: float = TOOL_EXECUTION_TIMEOUT,  # Timeout for individual tool calls
        verbose: bool = True,  # Add verbose flag
        extra_context: str | None = None,
        **kwargs: Any,
    ) -> None:
        """
        Args:
            llm: LLM used for every reasoning step (called via ``achat``).
            tools: Tools the agent may invoke; ``None`` is treated as no tools.
            memory: Chat memory that receives user and assistant messages.
            timeout: Overall workflow timeout, forwarded to ``Workflow``.
            llm_timeout: Timeout in seconds for each individual LLM call.
            tool_timeout: Timeout in seconds for each individual tool call.
            verbose: When true, emit debug/info logs for each step.
            extra_context: Optional context passed to ``ReActChatFormatter``.
            **kwargs: Forwarded to ``Workflow.__init__``.
        """
        super().__init__(timeout=timeout, verbose=verbose, **kwargs)  # Pass verbose to parent
        self.llm = llm
        self.tools = tools or []
        self.memory = memory
        self.formatter = ReActChatFormatter(context=extra_context or "")
        self.output_parser = ReActOutputParser()
        self.sources: List[ToolOutput] = []  # Store ToolOutput objects directly
        self.llm_timeout = llm_timeout
        self.tool_timeout = tool_timeout
        self.verbose = verbose  # Store verbose flag

    @classmethod
    def _clean_final_response(cls, response_text: str) -> str:
        """Strip reasoning artifacts from a final answer, falling back to a
        generic message when nothing usable remains."""
        for pattern in cls._THINKING_PATTERNS:
            response_text = pattern.sub('', response_text)

        # Remove extra newlines that might be left after cleaning
        response_text = re.sub(r'\n{3,}', '\n\n', response_text).strip()

        # Final safety check - if response still looks like thinking, provide fallback
        if not response_text or cls._LEFTOVER_REASONING.search(response_text):
            log_structured('warning', 'ReActAgent: Final response still contains thinking patterns, using fallback', {
                'problematic_response': response_text[:200]
            })
            return cls._FALLBACK_RESPONSE
        return response_text

    @staticmethod
    def _coerce_action_input(action_input: Any) -> Dict[str, Any]:
        """Return tool kwargs as a dict, parsing JSON-looking strings and
        wrapping anything else as ``{'query': value}``."""
        action_input = action_input or {}
        if isinstance(action_input, dict):
            return action_input

        log_structured('warning', 'ReActAgent: action_input is not a dict, attempting to parse', {'raw_input': action_input})
        # Attempt basic parsing if it looks like JSON string
        if isinstance(action_input, str) and action_input.strip().startswith('{'):
            try:
                return json.loads(action_input)
            except json.JSONDecodeError as parse_err:
                log_structured('error', 'ReActAgent: Failed to parse action_input', {'raw_input': action_input, 'error': str(parse_err)})
                return {'query': str(action_input)}  # Safest fallback
        # Fallback: treat as a single 'query' arg if not dict/json
        return {'query': action_input}

    @step
    async def new_user_msg(self, ctx: Context, ev: StartEvent) -> PrepEvent:
        """Reset per-run state and record the user's message in memory.

        The real input is resolved here and stored directly. Previously a
        "Placeholder" message was appended and later patched by inspecting
        ``messages[0]``, which is the oldest message, so on any conversation
        with existing history the placeholder text was never replaced.
        """
        self.sources = []  # Clear sources for new message
        await ctx.set("user_input", ev)  # Save the whole event

        if hasattr(ev, "input_value"):
            real_user_input = ev.input_value
        else:
            log_structured('info', 'ReActAgent: Extracting input from workflow input', {})
            # Default to reasonable fallback if we can't extract it
            real_user_input = self._DEFAULT_USER_INPUT

        self.memory.put(ChatMessage(role="user", content=str(real_user_input)))
        await ctx.set("current_reasoning", [])
        if self.verbose:
            log_structured('debug', 'ReActAgent: Receiving new user message', {})
        return PrepEvent()

    @step
    async def prepare_chat_history(self, ctx: Context, ev: PrepEvent) -> InputEvent:
        """Format memory plus the reasoning collected so far into LLM input."""
        chat_history = self.memory.get()
        current_reasoning = await ctx.get("current_reasoning", default=[])
        llm_input = self.formatter.format(
            self.tools, chat_history, current_reasoning=current_reasoning
        )
        if self.verbose:
            log_structured('debug', 'ReActAgent: Prepared chat history for LLM', {'history_len': len(llm_input)})
        return InputEvent(input=llm_input)

    @step  # The timeout is managed inside the method
    async def handle_llm_input(self, ctx: Context, ev: InputEvent) -> Union[ToolCallEvent, StopEvent, PrepEvent]:
        """Call the LLM, parse its reasoning step and route the workflow.

        Returns:
            ``StopEvent`` with the final result when the step is done,
            ``ToolCallEvent`` for an action step, otherwise ``PrepEvent`` to
            continue the loop. LLM timeouts and other errors are appended to
            the reasoning as observations and also yield ``PrepEvent``, so
            retries are bounded only by the overall workflow timeout.
        """
        chat_history = ev.input
        current_reasoning = await ctx.get("current_reasoning", default=[])

        try:
            if self.verbose:
                log_structured('debug', 'ReActAgent: Sending request to LLM', {'history_len': len(chat_history)})

            response = await asyncio.wait_for(
                self.llm.achat(chat_history),
                timeout=self.llm_timeout
            )
            reasoning_step = self.output_parser.parse(response.message.content)
            current_reasoning.append(reasoning_step)
            await ctx.set("current_reasoning", current_reasoning)  # Update context state

            if self.verbose:
                log_structured('debug', 'ReActAgent: Received LLM response', {
                    'step_type': type(reasoning_step).__name__,
                    'is_done': getattr(reasoning_step, 'is_done', False),
                    'has_response': hasattr(reasoning_step, 'response'),
                    'has_action': hasattr(reasoning_step, 'action'),
                    'action': getattr(reasoning_step, 'action', None),
                    'raw_content_preview': response.message.content[:300]
                })

            if reasoning_step.is_done:
                # Log what we're about to return as final response
                if self.verbose:
                    log_structured('debug', 'ReActAgent: Processing final response', {
                        'raw_response_content': response.message.content,
                        'reasoning_step_response': str(getattr(reasoning_step, 'response', 'NO_RESPONSE_ATTR'))
                    })

                # Clean the response to remove any thinking parts
                response_text = self._clean_final_response(str(reasoning_step.response))

                self.memory.put(ChatMessage(role="assistant", content=response_text))
                if self.verbose:
                    log_structured('info', 'ReActAgent: Final response generated', {
                        'response_preview': response_text[:100],
                        'sources_count': len(self.sources)
                    })

                # Structure the final output
                final_result = {
                    "response": response_text,
                    "sources": self.sources,  # Pass the collected ToolOutput objects
                    "reasoning": current_reasoning
                }
                return StopEvent(result=final_result)

            if isinstance(reasoning_step, ActionReasoningStep):
                if self.verbose:
                    log_structured('debug', 'ReActAgent: Action step identified', {
                        'action': reasoning_step.action,
                        'input': reasoning_step.action_input
                    })
                return ToolCallEvent(tool_calls=[
                    ToolSelection(
                        tool_id="tool_" + reasoning_step.action.replace(" ", "_"),  # Simple ID generation
                        tool_name=reasoning_step.action,
                        tool_kwargs=self._coerce_action_input(reasoning_step.action_input)
                    )
                ])

            # Handle other reasoning step types if necessary, or just proceed
            if self.verbose:
                log_structured('debug', 'ReActAgent: Non-action, non-done step encountered', {'step_type': type(reasoning_step).__name__})
            return PrepEvent()  # Continue the loop

        except asyncio.TimeoutError:
            error_msg = f"LLM call timed out after {self.llm_timeout} seconds."
            log_structured('error', 'ReActAgent: LLM Timeout', {'timeout': self.llm_timeout})
            current_reasoning.append(ObservationReasoningStep(observation=error_msg))
            await ctx.set("current_reasoning", current_reasoning)
            return PrepEvent()  # Returning PrepEvent allows loop to continue.
        except Exception as e:
            error_msg = f"Error during LLM interaction or parsing: {str(e)}"
            log_structured('error', 'ReActAgent: Error in handle_llm_input', {
                'error': str(e), 'traceback': traceback.format_exc()
            })
            current_reasoning.append(ObservationReasoningStep(observation=error_msg))
            await ctx.set("current_reasoning", current_reasoning)
            # Returning PrepEvent allows the agent to potentially describe the error or retry
            return PrepEvent()

    @step  # The timeout is managed inside the method
    async def handle_tool_calls(self, ctx: Context, ev: ToolCallEvent) -> PrepEvent:
        """Execute each requested tool and record its output as an observation.

        Unknown tools, timeouts and tool errors are recorded as observations
        rather than raised, so the agent can react to them on the next turn.
        """
        tool_calls = ev.tool_calls
        tools_by_name = {tool.metadata.name: tool for tool in self.tools}  # Use .name property
        current_reasoning = await ctx.get("current_reasoning", default=[])

        if self.verbose:
            log_structured('debug', 'ReActAgent: Handling tool calls', {
                'call_count': len(tool_calls),
                'tool_names': [tc.tool_name for tc in tool_calls]
            })

        for tool_call in tool_calls:
            tool = tools_by_name.get(tool_call.tool_name)
            if not tool:
                error_msg = f"Tool '{tool_call.tool_name}' not found."
                log_structured('error', 'ReActAgent: Tool not found', {'tool_name': tool_call.tool_name})
                current_reasoning.append(ObservationReasoningStep(observation=error_msg))
                continue  # Skip to next tool call

            try:
                tool_kwargs = tool_call.tool_kwargs or {}  # Ensure kwargs is a dict
                if self.verbose:
                    log_structured('debug', 'ReActAgent: Executing tool', {
                        'tool_name': tool_call.tool_name,
                        'kwargs': tool_kwargs
                    })

                tool_output: ToolOutput
                # Tools that only implement __call__ have no `call` attribute,
                # so look it up defensively instead of raising AttributeError.
                call_method = getattr(tool, 'call', None)
                if call_method is not None and inspect.iscoroutinefunction(call_method):
                    tool_output = await asyncio.wait_for(
                        tool.acall(**tool_kwargs),  # Use acall for async tools
                        timeout=self.tool_timeout
                    )
                else:
                    # Run the sync tool in a worker thread. wait_for raises
                    # asyncio.TimeoutError on expiry without blocking the event
                    # loop; the worker thread itself cannot be interrupted and
                    # may keep running in the background after a timeout.
                    tool_output = await asyncio.wait_for(
                        asyncio.to_thread(tool, **tool_kwargs),
                        timeout=self.tool_timeout
                    )

                if self.verbose:
                    log_structured('debug', 'ReActAgent: Tool execution successful', {
                        'tool_name': tool_call.tool_name,
                        'output_type': type(tool_output).__name__,
                        'output_content_preview': str(tool_output.content)[:100] if tool_output else "N/A"
                    })

                # Store the raw ToolOutput object which contains content, raw_output, metadata etc.
                self.sources.append(tool_output)

                # Add observation step with the tool's string content
                if tool_output and tool_output.content is not None:
                    observation_content = str(tool_output.content)
                else:
                    observation_content = "Tool executed successfully but returned no content."
                current_reasoning.append(ObservationReasoningStep(observation=observation_content))

                if self.verbose:
                    log_structured('debug', 'ReActAgent: Added observation to reasoning', {
                        'tool_name': tool_call.tool_name,
                        'observation_preview': observation_content[:200]
                    })

                # --- Detailed logging for image metadata ---
                raw_output = getattr(tool_output, 'raw_output', None)
                for node_with_score in getattr(raw_output, 'source_nodes', None) or []:
                    node = getattr(node_with_score, 'node', None)
                    if node and hasattr(node, 'metadata'):
                        if 'image_paths' in node.metadata and node.metadata['image_paths']:
                            log_structured('debug', 'ReActAgent: Tool output node contains image paths', {
                                'tool_name': tool_call.tool_name,
                                'node_id': getattr(node, 'id_', 'N/A'),
                                'image_paths': node.metadata['image_paths']
                            })

            except asyncio.TimeoutError:
                error_msg = f"Tool '{tool_call.tool_name}' timed out after {self.tool_timeout} seconds."
                log_structured('error', 'ReActAgent: Tool Timeout', {'tool_name': tool_call.tool_name, 'timeout': self.tool_timeout})
                current_reasoning.append(ObservationReasoningStep(observation=error_msg))
            except Exception as e:
                error_msg = f"Error calling tool '{tool_call.tool_name}': {str(e)}"
                log_structured('error', 'ReActAgent: Tool execution error', {
                    'tool_name': tool_call.tool_name, 'error': str(e), 'traceback': traceback.format_exc()
                })
                current_reasoning.append(ObservationReasoningStep(observation=error_msg))

        # Update reasoning steps in context before returning
        await ctx.set("current_reasoning", current_reasoning)
        return PrepEvent()  # Always return PrepEvent to continue the loop
|
|
|
|
|
|
# --- Document Processing ---
|
|
async def process_documents_in_directory(directory: str, session_id: Optional[str] = None) -> List[LlamaIndexDocument]:
    """
    Process all documents in a directory using LlamaParse for text and images,
    preserving chunk-level metadata including specific image paths per chunk.
    Includes detailed logging for image metadata assignment.

    Args:
        directory: The path to the directory containing documents.
        session_id: Optional session identifier for context.

    Returns:
        A list of LlamaIndexDocument objects representing the processed chunks,
        ready for indexing. Returns an empty list on failure or if no documents
        are processed.
    """
    parser_text = None
    parser_images = None
    # Every httpx client created for LlamaParse is tracked here so that the
    # `finally` block can close it; the parsers themselves are only closed if
    # they happen to expose `aclose`.
    http_clients: List[httpx.AsyncClient] = []
    try:
        # Ensure images directory exists
        os.makedirs(IMAGES_DIRECTORY, exist_ok=True)

        # Only list the path once it is known to be a directory; listing a
        # regular file would raise before the validation below could run.
        is_directory = os.path.isdir(directory)
        dir_entries = os.listdir(directory) if is_directory else []
        log_structured('info', f'Starting document processing for directory: {directory}', {
            'session_id': session_id,
            'directory_name': os.path.basename(directory),
            'directory_exists': os.path.exists(directory),
            'files_present': dir_entries if is_directory else 'N/A'
        })

        if not is_directory:
            log_structured('error', f'Directory not found or is not a directory: {directory}', {'session_id': session_id})
            return []
        if not dir_entries:
            log_structured('warning', f'Directory is empty, skipping: {directory}', {'session_id': session_id})
            return []

        parser_text, parser_images = await _init_llama_parsers(session_id, http_clients)
        if parser_text is None:
            return []  # The helper already logged the critical failure

        file_extractor = {
            ".pdf": parser_text, ".docx": parser_text, ".doc": parser_text,
            ".pptx": parser_text, ".ppt": parser_text, ".txt": parser_text,
            # Add others if your LlamaParse supports them
        }
        supported_extensions = list(file_extractor.keys())

        log_structured('debug', f'Searching for supported files in: {directory}', {'extensions': supported_extensions, 'session_id': session_id})
        # Match on the lower-cased suffix so any casing (".PDF", ".Pdf") is found.
        all_files_paths = sorted(
            entry for entry in Path(directory).iterdir()
            if entry.is_file() and entry.suffix.lower() in file_extractor
        )
        log_structured('info', f'Found {len(all_files_paths)} supported files in {directory}', {'session_id': session_id, 'files': [f.name for f in all_files_paths]})

        final_documents: List[LlamaIndexDocument] = []
        if not all_files_paths:
            log_structured('warning', f'No supported files found in directory: {directory}', {'session_id': session_id})

        doc_type = os.path.basename(directory)  # e.g., 'brief', 'supporting'
        for file_path in all_files_paths:
            file_path_str = str(file_path)
            filename = file_path.name
            log_structured('info', f'Processing file: {filename}', {'session_id': session_id, 'path': file_path_str})

            # 1. Extract Text Chunks
            current_text_parser = file_extractor[file_path.suffix.lower()]
            try:
                log_structured('debug', f'Calling parser_text.aload_data for {filename}', {'session_id': session_id})
                text_chunks = await current_text_parser.aload_data(file_path_str)
                log_structured('info', f'Extracted {len(text_chunks)} text chunks from {filename}', {'session_id': session_id})
                if not text_chunks:
                    log_structured('warning', f'No text chunks extracted from {filename}. File might be empty or parser issue.', {'session_id': session_id})
            except Exception as text_err:
                log_structured('error', f'Error extracting text from {filename}', {
                    'session_id': session_id, 'error': str(text_err), 'traceback': traceback.format_exc()
                })
                continue  # Skip to next file if text extraction fails critically

            # 2. Extract Images (if parser_images is available)
            if parser_images:
                file_page_images = await _extract_page_images(parser_images, file_path_str, filename, session_id)
            else:
                log_structured('info', f'Image parsing skipped for {filename} (parser_images is None).', {'session_id': session_id})
                file_page_images = {}

            # 3. Add Metadata to Each Text Chunk
            log_structured('debug', f'Assigning metadata and images to {len(text_chunks)} chunks for {filename}', {'session_id': session_id})
            for chunk_index, chunk in enumerate(text_chunks):
                _annotate_chunk(chunk, chunk_index, filename, file_path_str, doc_type, file_page_images, session_id)
                final_documents.append(chunk)

        if all_files_paths:
            log_structured('info', f'Finished processing files. Total chunks: {len(final_documents)}', {'session_id': session_id})

        log_structured('info', 'Directory processing complete.', {
            'session_id': session_id, 'directory': directory, 'total_chunks_generated': len(final_documents)
        })
        return final_documents

    except Exception as e:
        log_structured('critical', 'FATAL Error in process_documents_in_directory', {
            'session_id': session_id, 'directory': directory, 'error': str(e), 'traceback': traceback.format_exc()
        })
        return []  # Return empty list on major failure
    finally:
        # Ensure async clients are closed
        log_structured('debug', 'Closing LlamaParse clients.', {'session_id': session_id})
        await _close_quietly(parser_text, 'parser_text')
        await _close_quietly(parser_images, 'parser_images')
        for http_client in http_clients:
            await _close_quietly(http_client, 'httpx')


async def _close_quietly(resource: Any, label: str) -> None:
    """Await ``resource.aclose()`` when that method exists; log, never raise."""
    closer = getattr(resource, 'aclose', None)
    if closer is None:
        return
    try:
        await closer()
    except Exception as close_err:
        log_structured('error', f'Error closing {label} client', {'error': str(close_err)})


async def _init_llama_parsers(session_id: Optional[str], http_clients: List[httpx.AsyncClient]):
    """Build the text and image LlamaParse parsers.

    Every httpx client created here is appended to ``http_clients`` so the
    caller can close it. Returns ``(parser_text, parser_images)``;
    ``parser_images`` is ``None`` when only the text-only fallback could be
    built, and both are ``None`` when the fallback failed as well.
    """
    parser_text = None
    parser_images = None
    try:
        # Separate clients recommended if making concurrent calls, but can share if sequential
        custom_client_text = httpx.AsyncClient(timeout=LLAMA_PARSE_MAX_TIMEOUT)
        http_clients.append(custom_client_text)
        custom_client_images = httpx.AsyncClient(timeout=LLAMA_PARSE_MAX_TIMEOUT)
        http_clients.append(custom_client_images)

        parser_text = LlamaParse(
            result_type="markdown",
            add_page_breaks=False,
            system_prompt="Extract all content including text, tables, and formatting. Preserve structure.",
            premium_mode=False,  # Set based on your LlamaCloud plan
            max_timeout=LLAMA_PARSE_MAX_TIMEOUT,
            custom_client=custom_client_text,
            verbose=True  # Enable verbose logging for LlamaParse
        )
        parser_images = LlamaParse(
            result_type="markdown",  # Less critical here, parsing for images primarily
            add_page_breaks=False,
            system_prompt="Generate page images of the document.",
            use_vendor_multimodal_model=True,  # Assuming you want LlamaParse's image gen
            vendor_multimodal_model_name=LLAMA_PARSE_VENDOR_MODEL,
            premium_mode=False,  # Set based on your LlamaCloud plan
            max_timeout=LLAMA_PARSE_MAX_TIMEOUT,
            custom_client=custom_client_images,
            verbose=True  # Enable verbose logging for LlamaParse
        )
        log_structured('info', 'Initialized LlamaParse (text & image capability)', {'session_id': session_id})
        return parser_text, parser_images
    except Exception as parser_err:
        log_structured('error', f'Error initializing dual LlamaParse capability: {parser_err}. Falling back to text-only.', {
            'session_id': session_id, 'traceback': traceback.format_exc()
        })
        await _close_quietly(parser_text, 'parser_text')
        await _close_quietly(parser_images, 'parser_images')

    try:
        # Ensure client is fresh for fallback
        fallback_client = httpx.AsyncClient(timeout=LLAMA_PARSE_MAX_TIMEOUT)
        http_clients.append(fallback_client)
        parser_text = LlamaParse(
            result_type="markdown",
            language="en",
            add_page_breaks=False,
            merge_consecutive=True,
            max_timeout=LLAMA_PARSE_MAX_TIMEOUT,
            custom_client=fallback_client,
            verbose=True
        )
        log_structured('info', 'Initialized LlamaParse with fallback text-only parser.', {'session_id': session_id})
        return parser_text, None
    except Exception as fallback_err:
        log_structured('critical', f'FATAL: Error initializing fallback LlamaParse: {fallback_err}', {
            'session_id': session_id, 'traceback': traceback.format_exc()
        })
        return None, None


async def _extract_page_images(parser_images: Any, file_path_str: str, filename: str, session_id: Optional[str]) -> Dict[int, str]:
    """Save the images LlamaParse reports for one file into IMAGES_DIRECTORY.

    Returns a mapping of 1-based page number to saved image filename, or an
    empty dict when the extraction pipeline fails. The temporary download
    directory is removed whether or not extraction succeeds.
    """
    images_root = Path(IMAGES_DIRECTORY)
    temp_image_dir = images_root / f"temp_img_{filename}_{uuid.uuid4().hex[:8]}"
    file_page_images: Dict[int, str] = {}  # { page_num_1_based: image_filename }
    try:
        log_structured('debug', f'Attempting image extraction for: {filename}', {'session_id': session_id})
        os.makedirs(temp_image_dir, exist_ok=True)

        # The LlamaParse calls used here are synchronous; run them in worker
        # threads to avoid blocking the event loop.
        md_json_objs = await asyncio.to_thread(parser_images.get_json_result, file_path_str)
        log_structured('debug', f'Got JSON result for images from {filename}', {'session_id': session_id})
        image_dicts = await asyncio.to_thread(parser_images.get_images, md_json_objs, temp_image_dir)

        log_structured('info', f'LlamaParse reported {len(image_dicts)} images potentially extracted for {filename}', {'session_id': session_id})

        saved_image_count = 0
        for idx, img_info in enumerate(image_dicts):
            # NOTE(review): assumes 'page' is a 0-based index; when the key is
            # missing the list position is used - verify against the image
            # dicts LlamaParse actually returns.
            page_index_0_based = img_info.get('page', idx)
            page_num_1_based = page_index_0_based + 1  # Convert to 1-based for consistency

            source_img_path = img_info.get('path')
            if not source_img_path or not os.path.exists(source_img_path):
                log_structured('warning', f'No valid image path found or file missing for image index {idx} (reported page {page_index_0_based}) from {filename}. Path: {img_info.get("path", "N/A")}', {'session_id': session_id})
                continue

            image_filename = f"{os.path.splitext(filename)[0]}_page{page_num_1_based}_{uuid.uuid4().hex[:6]}.png"
            dest_path = images_root / image_filename
            try:
                shutil.copy2(source_img_path, dest_path)  # copy2 preserves metadata
            except OSError as copy_err:
                log_structured('error', f'Error copying image for {filename} page {page_num_1_based}', {'session_id': session_id, 'error': str(copy_err)})
                continue
            file_page_images[page_num_1_based] = image_filename  # Map 1-based page num
            saved_image_count += 1
            log_structured('debug', f'Saved image for {filename} page {page_num_1_based} (1-based) to {dest_path}', {'session_id': session_id})

        log_structured('info', f'Successfully saved {saved_image_count} images for {filename}', {'session_id': session_id})
        return file_page_images
    except Exception as img_err:
        log_structured('error', f'Error during image extraction pipeline for {filename}', {
            'session_id': session_id, 'error': str(img_err), 'traceback': traceback.format_exc()
        })
        return {}  # Ensure empty dict on error
    finally:
        # Clean up temporary directory, including after a failed extraction
        if os.path.isdir(temp_image_dir):
            try:
                shutil.rmtree(temp_image_dir)
            except Exception as cleanup_err:
                log_structured('error', f'Error cleaning up temp image dir {temp_image_dir}', {'session_id': session_id, 'error': str(cleanup_err)})


def _resolve_chunk_page(chunk: Any, chunk_index: int, filename: str, session_id: Optional[str]) -> int:
    """Determine a 1-based page number for ``chunk`` and store it under
    ``metadata['source_page']``.

    The first positive integer found under the metadata keys 'page_label',
    'page_number' or 'page' wins; otherwise ``chunk_index + 1`` is used as a
    rough positional approximation.
    """
    for key in ('page_label', 'page_number', 'page'):
        if key not in chunk.metadata:
            continue
        try:
            candidate = int(chunk.metadata[key])  # Assume it's 1-based from LlamaParse
        except (ValueError, TypeError):
            continue  # Ignore if not integer
        if candidate > 0:  # Ignore non-positive page numbers
            chunk.metadata["source_page"] = candidate  # Store consistently
            log_structured('debug', f"Found page {candidate} from metadata key '{key}'", {'filename': filename, 'chunk': chunk_index, 'session_id': session_id})
            return candidate

    # Fallback: use chunk position as a proxy (very rough)
    fallback_page = chunk_index + 1
    chunk.metadata["source_page"] = fallback_page
    log_structured('debug', f"Using position fallback for page: {fallback_page}", {'filename': filename, 'chunk': chunk_index, 'session_id': session_id})
    return fallback_page


def _annotate_chunk(chunk: Any, chunk_index: int, filename: str, file_path_str: str, doc_type: str,
                    file_page_images: Dict[int, str], session_id: Optional[str]) -> None:
    """Attach file, page and image metadata to one parsed chunk in place."""
    if getattr(chunk, 'metadata', None) is None:
        chunk.metadata = {}

    chunk.metadata['filename'] = filename
    chunk.metadata['file_path'] = file_path_str
    chunk.metadata['source'] = "document"  # Or more specific if needed
    chunk.metadata['type'] = doc_type
    chunk.metadata['chunk_index'] = chunk_index

    page_num_1_based = _resolve_chunk_page(chunk, chunk_index, filename, session_id)

    # --- Precise Page-Based Image Assignment ---
    # Only an exact page match assigns an image; no nearest-page matching.
    chunk.metadata['image_paths'] = []  # Initialize/reset
    if page_num_1_based in file_page_images:
        specific_image_filename = file_page_images[page_num_1_based]
        chunk.metadata['image_match_type'] = 'exact'
        log_structured('debug', f'Exact page match for image page {page_num_1_based}', {'filename': filename, 'chunk': chunk_index, 'image': specific_image_filename})
        if specific_image_filename:
            chunk.metadata['image_paths'] = [specific_image_filename]  # Store as list
|
|
|
|
|
|
# --- GraphRAG Tool (module-level for reuse across init phases) ---
|
|
class GraphRAGTool(BaseTool):
    """Tool that queries using both vector and graph-based retrieval.

    Wraps a GraphRAG query engine: ``custom_query`` is called to retrieve
    context, ``generate_final_answer`` turns that context into the answer
    text, and the retrieved vector nodes are exposed to callers through
    ``raw_output.source_nodes``.
    """

    def __init__(self, query_engine):
        """Store the query engine and build the tool metadata.

        Args:
            query_engine: Object providing ``custom_query(query_str)`` and an
                ``llm`` attribute; both are used by ``__call__``.
        """
        self.query_engine = query_engine
        # NOTE: simple_run() locates this tool by the substring 'graphrag'
        # in the metadata name, so the name must keep containing it.
        self._metadata = ToolMetadata(
            name="answerquestionswith_graphrag",
            description="USE THIS TOOL FOR ALL QUERIES - Queries HP marketing materials using both vector and graph-based retrieval for more comprehensive answers. Use this for complex questions that need context from multiple related documents."
        )

    @property
    def metadata(self):
        """Return the ToolMetadata describing this tool."""
        return self._metadata

    def __call__(self, query_str: str) -> ToolOutput:
        """Run query through GraphRAG and generate synthesized answer.

        Args:
            query_str: The user question to answer.

        Returns:
            A ToolOutput whose ``content`` is the synthesized answer and whose
            ``raw_output`` exposes the vector nodes as ``source_nodes``. If
            anything raises, a ToolOutput carrying an apology message and
            ``raw_output={"error": ...}`` is returned instead; exceptions are
            never propagated to the caller.
        """
        log_structured('info', 'GraphRAG Tool: Starting dual retrieval', {'query': query_str})

        try:
            # Get both vector and GraphRAG retrieval results
            retrieval_result = self.query_engine.custom_query(query_str)

            # Generate synthesized answer
            final_answer = generate_final_answer(query_str, retrieval_result, self.query_engine.llm)

            # Extract vector nodes; a key that is present but None is treated
            # the same as a missing key.
            vector_nodes = retrieval_result.get('vector_nodes') or []

            # Logging metrics are None-tolerant so that a missing context can
            # never turn an already generated answer into the error response.
            log_message = {
                'vector_context_length': len(retrieval_result.get('vector_context') or ''),
                'graphrag_context_length': len(retrieval_result.get('graphrag_context') or ''),
                'vector_nodes_count': len(vector_nodes),
                'community_ids': retrieval_result.get('community_ids', [])
            }
            log_structured('info', 'GraphRAG Tool: Retrieval complete', log_message)

            # Create a properly structured raw_output that includes source_nodes
            class NodeWrapper:
                def __init__(self, nodes):
                    self.source_nodes = nodes

            tool_output = ToolOutput(
                content=final_answer,
                tool_name="GraphRAG",
                raw_output=NodeWrapper(vector_nodes),
                raw_input={"query": query_str}
            )

            log_structured('debug', 'GraphRAG Tool: Including image metadata in response',
                           {'node_count': len(vector_nodes)})

            return tool_output
        except Exception as graphrag_err:
            # Best-effort boundary: report the failure as tool output rather
            # than raising into the agent loop.
            log_structured('error', f'Error in GraphRAG tool: {graphrag_err}',
                           {'traceback': traceback.format_exc()})
            return ToolOutput(
                content=f"I encountered an error while retrieving information: {str(graphrag_err)}. Please try again or use the standard query tool.",
                tool_name="GraphRAG",
                raw_input={"query": query_str},
                raw_output={"error": str(graphrag_err)}
            )

    async def acall(self, input: str) -> ToolOutput:
        """Async version of __call__.

        NOTE(review): this runs the synchronous pipeline directly on the
        event loop, so a surrounding ``asyncio.wait_for`` timeout cannot fire
        while it is executing. Offloading to a worker thread would fix that,
        but confirm first that the query engine does not share loop-bound
        async clients.
        """
        return self(input)
|
|
|
|
|
|
# --- Phase 1: Vector Index Initialization (fast path) ---
|
|
# Regexes that remove leaked reasoning markers from LLM text. They are applied
# in this order; DOTALL lets '.' span newlines.
_THINKING_PATTERNS = tuple(
    re.compile(pattern, flags=re.DOTALL)
    for pattern in (
        r'(?i)^.*?thinking:.*?\n',
        r'(?i)<thinking>.*?</thinking>',
        r'(?i)\[thinking\].*?\[/thinking\]',
        r'(?i)I\'m thinking:.*?\n',
        r'(?i)Let me think.*?\n',
        r'(?i)Thought:.*?Answer:',
        r'(?i)^Answer:\s*',
    )
)


def _strip_thinking_artifacts(text: str) -> str:
    """Remove reasoning markers, collapse 3+ newlines to 2, and strip *text*."""
    for pattern in _THINKING_PATTERNS:
        text = pattern.sub('', text)
    text = re.sub(r'\n{3,}', '\n\n', text)
    return text.strip()


async def initialize_vector_index() -> bool:
    """Initialize vector index and agent with vector-only tools.

    This is the fast path (~1-2 min) that makes the server usable for
    vector-only queries. GraphRAG components are added later by
    initialize_graphrag_components().

    Steps, in order:
      1. Read OPENAI_API_KEY, build the LLM and verify it with a test prompt.
      2. Configure the global ``Settings`` (LLM, embeddings, callbacks,
         node parser).
      3. Load the persisted index from INDEX_PERSIST_PATH, or build it from
         HP_DOCS_FOLDER and persist it.
      4. Build the auto-retriever query engine and the agent with a single
         vector tool, and publish both through the shared-state setters.
      5. Attach ``simple_run`` as the agent's ``run`` method and check that
         the agent memory can be reset.

    Returns:
        True when every step succeeded; False otherwise. Failures are logged
        and never raised to the caller.
    """
    try:
        # --- Configure LLM and Embedding Model ---
        openai_key = os.environ.get("OPENAI_API_KEY", "")
        if not openai_key:
            log_structured('critical', 'No OpenAI API key provided. Make sure OPENAI_API_KEY is set in your .env file.')
            return False

        key_preview = openai_key[:4] + "..." if len(openai_key) > 4 else "invalid"
        log_structured('info', f'Using OpenAI API key starting with: {key_preview}')

        try:
            llm = LlamaOpenAI(
                model=LLM_MODEL,
                temperature=LLM_TEMPERATURE,
                timeout=LLM_TIMEOUT
            )
            # Fail fast on a bad key/model before any expensive indexing.
            test_prompt = "Say 'API key is working' if you can read this."
            _ = llm.complete(test_prompt)
            log_structured('info', 'Successfully tested LLM API connection')
        except Exception as llm_err:
            log_structured('critical', f'Failed to initialize LLM with provided API key: {str(llm_err)}')
            return False
        embed_model = OpenAIEmbedding(model=EMBEDDING_MODEL)

        # --- Configure Global Settings ---
        Settings.llm = llm
        Settings.embed_model = embed_model
        Settings.callback_manager = CallbackManager([token_counter])

        node_parser = SemanticSplitterNodeParser(
            buffer_size=1, breakpoint_percentile_threshold=95, embed_model=embed_model
        )
        Settings.node_parser = node_parser

        # --- Load or Build Vector Index ---
        if INDEX_PERSIST_PATH.exists():
            log_structured('info', f'Loading existing index from {INDEX_PERSIST_PATH}')
            storage_context = StorageContext.from_defaults(persist_dir=str(INDEX_PERSIST_PATH))
            index = load_index_from_storage(storage_context)
            set_global_index(index)
            log_structured('info', 'Successfully loaded existing index')
        else:
            log_structured('info', f'No index found at {INDEX_PERSIST_PATH}. Creating new index from {HP_DOCS_FOLDER}')
            if not HP_DOCS_FOLDER.exists() or not any(HP_DOCS_FOLDER.iterdir()):
                log_structured('error', f'HP documents folder is missing or empty: {HP_DOCS_FOLDER}')
                return False

            documents = await process_documents_in_directory(str(HP_DOCS_FOLDER), session_id="global_init")
            if not documents:
                log_structured('error', f'No documents processed from {HP_DOCS_FOLDER}. Index creation aborted.')
                return False

            log_structured('info', 'Creating vector store index...', {
                'document_count': len(documents),
                'sample_doc_metadata': documents[0].metadata if documents else None
            })

            index = VectorStoreIndex.from_documents(
                documents=documents,
                show_progress=True,
            )
            set_global_index(index)
            index.storage_context.persist(persist_dir=str(INDEX_PERSIST_PATH))
            log_structured('info', f'Index created and persisted to {INDEX_PERSIST_PATH}')

        # --- Create Retriever and Query Engine (vector only) ---
        vector_store_info = VectorStoreInfo(
            content_info="HP marketing reference materials, including brand guidelines and supporting documents.",
            metadata_info=[
                MetadataInfo(name="filename", type="str", description="Filename of the source document"),
                MetadataInfo(name="source_page", type="int", description="Approximate page number in the source document"),
                MetadataInfo(name="image_paths", type="list[str]", description="List of image filenames associated with this chunk"),
            ],
        )

        all_retriever = VectorIndexAutoRetriever(
            index=index,
            vector_store_info=vector_store_info,
            similarity_top_k=SIMILARITY_TOP_K,
            verbose=True
        )

        all_query_engine = RetrieverQueryEngine.from_args(
            retriever=all_retriever,
            response_synthesizer=get_response_synthesizer(
                response_mode=ResponseMode.COMPACT,
            ),
            node_postprocessors=[
                SimilarityPostprocessor(similarity_cutoff=SIMILARITY_CUTOFF),
            ],
        )

        # --- Create Agent with vector-only tools ---
        query_engine_tools_react = [
            QueryEngineTool(
                query_engine=all_query_engine,
                metadata=ToolMetadata(
                    name="answer_questions_from_hp_marketing_materials",
                    description="USE THIS TOOL FOR ALL QUERIES - Queries HP marketing materials (brand guidelines, etc.) to answer questions about guidelines, workflows, and processes. Use this for specific lookups in the knowledge base."
                ),
            ),
        ]

        try:
            agent = ReActAgent2(
                llm=llm,
                tools=query_engine_tools_react,
                memory=ChatMemoryBuffer.from_defaults(llm=llm, token_limit=4096),
                verbose=True,
                timeout=AGENT_TIMEOUT,
                llm_timeout=LLM_TIMEOUT,
                tool_timeout=TOOL_EXECUTION_TIMEOUT
            )
            set_global_agent(agent)
            log_structured('info', 'Agent initialized successfully with vector-only tools')
        except Exception as agent_err:
            log_structured('critical', f'Failed to initialize agent: {str(agent_err)}', {'error': str(agent_err)})
            return False

        # --- Attach simple_run method ---
        try:
            async def simple_run(query_text):
                """Simple version that doesn't rely on complex workflow steps.

                Performs one LLM reasoning call, then either (a) forces the
                GraphRAG tool when the LLM answered directly, or (b) executes
                the single tool the LLM asked for and makes one follow-up LLM
                call. Always returns a dict with the keys ``response``,
                ``sources`` and ``reasoning``; errors are reported in
                ``response`` instead of being raised.
                """
                # Imported on every call so the current agent is picked up.
                from shared_state import global_workflow_agent

                if not global_workflow_agent:
                    log_structured('critical', 'Agent is None in simple_run - this should never happen')
                    return {
                        "response": "The AI system is currently unavailable. Please try again later.",
                        "sources": [],
                        "reasoning": []
                    }

                try:
                    global_workflow_agent.sources = []

                    user_msg = ChatMessage(role="user", content=str(query_text))
                    global_workflow_agent.memory.put(user_msg)

                    chat_history = global_workflow_agent.memory.get()
                    llm_input = global_workflow_agent.formatter.format(global_workflow_agent.tools, chat_history)

                    response = await asyncio.wait_for(
                        global_workflow_agent.llm.achat(llm_input),
                        timeout=global_workflow_agent.llm_timeout
                    )
                    reasoning_step = global_workflow_agent.output_parser.parse(response.message.content)

                    log_structured('debug', 'Parsed reasoning step', {
                        'step_type': type(reasoning_step).__name__,
                        'has_response': hasattr(reasoning_step, 'response'),
                        'has_action': hasattr(reasoning_step, 'action'),
                        'action': getattr(reasoning_step, 'action', None),
                        'action_input': getattr(reasoning_step, 'action_input', None),
                        'raw_content': response.message.content[:200]
                    })

                    if hasattr(reasoning_step, 'response') and reasoning_step.response:
                        log_structured('debug', 'LLM provided direct response, forcing tool usage for image retrieval')

                        # The GraphRAG tool only exists once phase 2 has added it.
                        graphrag_tool = next((t for t in global_workflow_agent.tools if 'graphrag' in t.metadata.name.lower()), None)
                        if graphrag_tool:
                            try:
                                tool_output = await asyncio.wait_for(
                                    graphrag_tool.acall(input=str(query_text)),
                                    timeout=global_workflow_agent.tool_timeout
                                )
                                global_workflow_agent.sources.append(tool_output)

                                response_text = str(tool_output.content) if tool_output and tool_output.content else str(reasoning_step.response)

                                assistant_msg = ChatMessage(role="assistant", content=response_text)
                                global_workflow_agent.memory.put(assistant_msg)
                                return {
                                    "response": response_text,
                                    "sources": global_workflow_agent.sources,
                                    "reasoning": [reasoning_step]
                                }
                            except Exception as e:
                                # Fall through to the LLM's direct answer below.
                                log_structured('error', f'Error forcing GraphRAG tool execution: {str(e)}')

                        response_text = _strip_thinking_artifacts(str(reasoning_step.response))

                        assistant_msg = ChatMessage(role="assistant", content=response_text)
                        global_workflow_agent.memory.put(assistant_msg)
                        return {
                            "response": response_text,
                            "sources": global_workflow_agent.sources,
                            "reasoning": [reasoning_step]
                        }

                    if hasattr(reasoning_step, 'action') and reasoning_step.action:
                        tool_name = reasoning_step.action
                        action_input = reasoning_step.action_input or {}

                        if not isinstance(action_input, dict):
                            # Tools are invoked as acall(**action_input).
                            # GraphRAGTool.acall names its parameter 'input',
                            # so bare values are wrapped under that key.
                            try:
                                if isinstance(action_input, str) and action_input.strip().startswith('{'):
                                    action_input = json.loads(action_input)
                                else:
                                    action_input = {'input': action_input}
                            except ValueError:
                                # json.JSONDecodeError is a ValueError subclass.
                                action_input = {'input': str(action_input)}

                        tool = next((t for t in global_workflow_agent.tools if t.metadata.name == tool_name), None)
                        if not tool:
                            log_structured('error', 'Tool not found in simple_run', {'tool_name': tool_name, 'available_tools': [t.metadata.name for t in global_workflow_agent.tools]})
                            return {
                                "response": f"I tried to use a tool called '{tool_name}' but it's not available. Please try rephrasing your query.",
                                "sources": [],
                                "reasoning": [reasoning_step]
                            }

                        try:
                            tool_output = await asyncio.wait_for(
                                tool.acall(**action_input),
                                timeout=global_workflow_agent.tool_timeout
                            )
                            global_workflow_agent.sources.append(tool_output)

                            observation = str(tool_output.content if tool_output and tool_output.content is not None else "No content")
                            follow_up_msg = ChatMessage(role="user", content=f"Here is the result: {observation}\nPlease provide a final response based on this information.")
                            global_workflow_agent.memory.put(follow_up_msg)

                            chat_history = global_workflow_agent.memory.get()
                            llm_input = global_workflow_agent.formatter.format(global_workflow_agent.tools, chat_history)
                            final_response = await asyncio.wait_for(
                                global_workflow_agent.llm.achat(llm_input),
                                timeout=global_workflow_agent.llm_timeout
                            )

                            response_text = _strip_thinking_artifacts(str(final_response.message.content))

                            global_workflow_agent.memory.put(ChatMessage(role="assistant", content=response_text))
                            return {
                                "response": response_text,
                                "sources": global_workflow_agent.sources,
                                "reasoning": [reasoning_step]
                            }
                        except Exception as e:
                            log_structured('error', f'Error executing tool: {str(e)}', {'traceback': traceback.format_exc()})
                            error_response = f"I encountered an error while processing your query: {str(e)}"
                            return {
                                "response": error_response,
                                "sources": [],
                                "reasoning": [reasoning_step]
                            }

                    # Reached only when the step carried neither a usable
                    # response nor a usable action.
                    if isinstance(reasoning_step, ActionReasoningStep):
                        log_structured('warning', 'Returning raw thinking due to failed action execution', {
                            'action': reasoning_step.action,
                            'action_input': reasoning_step.action_input
                        })
                        return {
                            "response": "I apologize, but I encountered an issue while processing your query. Please try asking your question in a different way.",
                            "sources": [],
                            "reasoning": [reasoning_step]
                        }
                    else:
                        fallback_response = "I wasn't able to find a specific answer to your question. Please try rephrasing your query."
                        return {
                            "response": fallback_response,
                            "sources": global_workflow_agent.sources,
                            "reasoning": [reasoning_step] if reasoning_step else []
                        }
                except Exception as e:
                    log_structured('error', f'Error in simple_run: {str(e)}', {'traceback': traceback.format_exc()})
                    error_response = f"I encountered an error while processing your query: {str(e)}"
                    return {
                        "response": error_response,
                        "sources": [],
                        "reasoning": []
                    }

            from shared_state import global_workflow_agent as current_agent

            if current_agent is None:
                log_structured('critical', 'Cannot set run method - global_workflow_agent is None')
                return False

            current_agent.run = simple_run

            if not hasattr(current_agent, 'run'):
                log_structured('critical', 'Failed to attach run method to agent')
                return False

            log_structured('info', 'Successfully attached run method to agent')

            log_structured('info', 'Testing agent functionality...')
            if current_agent and hasattr(current_agent, 'memory') and hasattr(current_agent.memory, 'reset'):
                current_agent.memory.reset()
                log_structured('info', 'Agent memory reset test successful')
            else:
                if not current_agent:
                    log_structured('error', 'Agent memory test failed: agent is None')
                elif not hasattr(current_agent, 'memory'):
                    log_structured('error', 'Agent memory test failed: agent has no memory attribute')
                elif not hasattr(current_agent.memory, 'reset'):
                    log_structured('error', 'Agent memory test failed: agent.memory has no reset method')
                else:
                    log_structured('error', 'Agent memory test failed: unknown reason')
                return False

        except Exception as method_err:
            log_structured('critical', f'Failed to set up agent run method: {str(method_err)}',
                           {'error': str(method_err), 'traceback': traceback.format_exc()})
            return False

        log_structured('info', 'Vector index and agent initialized successfully (vector-only mode).')
        return True

    except Exception as e:
        log_structured('critical', 'Vector index/agent initialization failed', {
            'error': str(e), 'traceback': traceback.format_exc()
        })
        return False
|
|
|
|
|
|
# --- Phase 2: GraphRAG Background Initialization ---
|
|
async def initialize_graphrag_components() -> bool:
    """Initialize GraphRAG components in the background.

    Connects to Neo4j, restores/extracts triples, builds communities,
    creates GraphRAG query engine, and dynamically adds the GraphRAG tool
    to the existing agent's tools list.

    Progress is reported through ``set_graphrag_status``: ``initializing``
    is set on entry, then either ``ready`` on success or ``error`` on
    failure. The function is safe to run more than once: a GraphRAG tool
    registered by an earlier run is replaced rather than duplicated.

    Returns:
        True on success, False on any failure (which is logged, recorded in
        the status, and never raised).
    """
    set_graphrag_status(initializing=True, ready=False, error=None)

    try:
        # Read the attributes off the module so the current values are seen.
        import shared_state
        current_agent = shared_state.global_workflow_agent
        index = shared_state.global_index

        if index is None:
            raise RuntimeError("Vector index must be initialized first")
        if current_agent is None:
            raise RuntimeError("Agent must be initialized first")

        llm = Settings.llm

        # Connect to Neo4j
        from llama_index.graph_stores.neo4j import Neo4jPropertyGraphStore
        log_structured('info', f'Connecting to Neo4j at {NEO4J_URL}')
        # NOTE(review): this store is only used to probe for existing data and
        # is never explicitly closed here - confirm whether it should be.
        property_graph_store = Neo4jPropertyGraphStore(
            username=NEO4J_USERNAME,
            password=NEO4J_PASSWORD,
            url=NEO4J_URL
        )
        log_structured('info', 'Successfully connected to Neo4j database')

        # Check Neo4j state
        temp_graph_store = GraphRAGStore(property_graph_store)
        triplets = temp_graph_store.get_triplets()
        neo4j_has_data = len(triplets) > 0

        if neo4j_has_data:
            log_structured('info', f'Neo4j contains {len(triplets)} triplets. Using existing data.')
            graph_store, property_graph_index = create_graph_components(
                llm=llm,
                force_reindex=False
            )
        else:
            # Neo4j is empty - get nodes from vector index for potential extraction
            log_structured('info', 'Neo4j is empty. Getting nodes from vector index for GraphRAG.')
            vector_nodes = []
            if hasattr(index, 'docstore') and index.docstore:
                vector_nodes = list(index.docstore.docs.values())
                log_structured('info', f'Retrieved {len(vector_nodes)} nodes from index docstore')

            if not vector_nodes:
                raise ValueError("No nodes available for GraphRAG indexing")

            # create_graph_components will try cache restore first, then LLM extraction
            graph_store, property_graph_index = create_graph_components(
                llm=llm,
                nodes=vector_nodes,
                max_paths_per_chunk=10,
                force_reindex=False  # Allow cache restore path
            )

        # Ensure communities are built
        if not getattr(graph_store, 'communities_built', False):
            graph_store.build_communities()

        # Create GraphRAG query engine
        vector_retriever = VectorIndexRetriever(
            index=index,
            similarity_top_k=SIMILARITY_TOP_K
        )

        graphrag_query_engine = create_graphrag_query_engine(
            vector_retriever=vector_retriever,
            graph_store=graph_store,
            llm=llm,
            similarity_top_k=SIMILARITY_TOP_K
        )

        # Store in shared state
        set_graphrag_components(
            graph_store=graph_store,
            property_graph_index=property_graph_index,
            graphrag_query_engine=graphrag_query_engine
        )

        # Dynamically add GraphRAG tool to the existing agent
        graphrag_tool = GraphRAGTool(graphrag_query_engine)

        # Re-read the agent: it may have been replaced while this ran.
        live_agent = shared_state.global_workflow_agent
        if live_agent is not None:
            # Drop any tool registered under the same name by an earlier run,
            # otherwise name lookups keep resolving to the stale instance.
            # Slice assignment keeps the identity of the agent's tools list.
            graphrag_tool_name = graphrag_tool.metadata.name
            live_agent.tools[:] = [
                t for t in live_agent.tools if t.metadata.name != graphrag_tool_name
            ]
            live_agent.tools.append(graphrag_tool)
            log_structured('info', 'Added GraphRAG tool to existing agent', {
                'total_tools': len(live_agent.tools),
                'tool_names': [t.metadata.name for t in live_agent.tools]
            })

        set_graphrag_status(ready=True, initializing=False)
        log_structured('info', 'GraphRAG components initialized successfully')
        return True

    except Exception as e:
        log_structured('error', f'GraphRAG background initialization failed: {e}',
                       {'traceback': traceback.format_exc()})
        # NOTE(review): the exception object itself is stored; if the status
        # is serialized (e.g. by a status endpoint), confirm that
        # set_graphrag_status converts it to a string.
        set_graphrag_status(ready=False, initializing=False, error=e)
        return False
|
|
|
|
|
|
# --- Global Index Initialization (backward-compatible wrapper) ---
|
|
async def initialize_global_index() -> bool:
    """Backward-compatible wrapper: initializes everything sequentially.

    Runs the vector phase first and the GraphRAG phase second.

    Returns:
        False only when the vector phase fails. A GraphRAG failure is logged
        as a warning and still yields True.
    """
    if not await initialize_vector_index():
        return False

    # GraphRAG is optional: its failure degrades features but not startup.
    if not await initialize_graphrag_components():
        log_structured('warning', 'GraphRAG init failed, but vector search is available')

    return True