netflix/ai_core.py
michael 49b0ba9c74 Update OpenAI models to gpt-4.1 and gpt-4.1-mini
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 15:13:41 -06:00

1380 lines
No EOL
77 KiB
Python

# netflix_chatbot/ai_core.py
import os
import asyncio
import traceback
import uuid
import shutil
import re
import inspect
import json
from pathlib import Path
from typing import List, Dict, Any, Optional, Union
import httpx
import tiktoken
import nest_asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import partial
# Import GraphRAG integration
from graph_rag_integration import (
create_graph_components,
create_graphrag_query_engine,
generate_final_answer,
GraphRAGExtractor,
GraphRAGStore,
GraphRAGQueryEngine
)
from llama_index.core import (
VectorStoreIndex,
Document as LlamaIndexDocument,
Settings,
get_response_synthesizer,
load_index_from_storage,
StorageContext,
SimpleDirectoryReader # Keep if needed for fallback/other uses
)
from llama_index.core.retrievers import VectorIndexAutoRetriever, VectorIndexRetriever
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.postprocessor import SimilarityPostprocessor
from llama_index.core.response_synthesizers import ResponseMode
from llama_index.llms.openai import OpenAI as LlamaOpenAI
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.callbacks import CallbackManager, TokenCountingHandler
from llama_index.core.tools import QueryEngineTool, ToolMetadata, BaseTool, ToolSelection, ToolOutput
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.llms import ChatMessage, LLM
from llama_index.core.vector_stores.types import MetadataInfo, VectorStoreInfo
from llama_index.core.agent.react import ReActChatFormatter, ReActOutputParser
from llama_index.core.agent.react.types import (
ActionReasoningStep,
ObservationReasoningStep,
ResponseReasoningStep,
BaseReasoningStep
)
from llama_index.core.workflow import (
Context,
Workflow,
StartEvent,
StopEvent,
step,
Event,
)
from llama_index.core.node_parser import (
SentenceSplitter,
SemanticSplitterNodeParser,
)
from llama_parse import LlamaParse
# Import from our modules
from utils import logger, log_structured
from config import (
NETFLIX_DOCS_FOLDER, INDEX_PERSIST_PATH, IMAGES_DIRECTORY,
LLM_MODEL, EMBEDDING_MODEL, LLM_TEMPERATURE, LLM_TIMEOUT, AGENT_TIMEOUT,
TOOL_EXECUTION_TIMEOUT, SIMILARITY_TOP_K, SIMILARITY_CUTOFF,
LLAMA_PARSE_VENDOR_MODEL, LLAMA_PARSE_MAX_TIMEOUT,
NEO4J_URL, NEO4J_USERNAME, NEO4J_PASSWORD
)
nest_asyncio.apply()
# --- Global AI State ---
# Import shared state to ensure all modules access the same instances
from shared_state import (
global_index, global_workflow_agent,
global_graph_store, global_property_graph_index, global_graphrag_query_engine,
set_global_agent, set_global_index, set_graphrag_components
)
# --- Token Counter ---
try:
token_counter = TokenCountingHandler(
tokenizer=tiktoken.encoding_for_model(LLM_MODEL).encode
)
except Exception as e:
log_structured('warning', f'Could not initialize tiktoken for {LLM_MODEL}. Token counting may be inaccurate.', {'error': str(e)})
# Fallback tokenizer if needed, or disable token counting
token_counter = TokenCountingHandler(tokenizer=lambda text: list(text.encode("utf-8")))
# --- Custom ReAct Agent Workflow ---
class PrepEvent(Event): pass
class InputEvent(Event): input: list[ChatMessage]
class ToolCallEvent(Event): tool_calls: list[ToolSelection]
class FunctionOutputEvent(Event): output: ToolOutput
class CustomStartEvent(StartEvent): input_value: str = ""
class ReActAgent2(Workflow):
"""
Custom ReAct Agent implementation using LlamaIndex Workflows.
Includes timeout handling for LLM calls and tool execution.
"""
def __init__(
self,
llm: LLM,
tools: list[BaseTool],
memory: ChatMemoryBuffer,
timeout: float = AGENT_TIMEOUT, # Overall workflow timeout
llm_timeout: float = LLM_TIMEOUT, # Timeout for individual LLM calls
tool_timeout: float = TOOL_EXECUTION_TIMEOUT, # Timeout for individual tool calls
verbose: bool = True, # Add verbose flag
extra_context: str | None = None,
**kwargs: Any,
) -> None:
super().__init__(timeout=timeout, verbose=verbose, **kwargs) # Pass verbose to parent
self.llm = llm
self.tools = tools or []
self.memory = memory
self.formatter = ReActChatFormatter(context=extra_context or "")
self.output_parser = ReActOutputParser()
self.sources: List[ToolOutput] = [] # Store ToolOutput objects directly
self.llm_timeout = llm_timeout
self.tool_timeout = tool_timeout
self.verbose = verbose # Store verbose flag
@step
async def new_user_msg(self, ctx: Context, ev: StartEvent) -> PrepEvent:
self.sources = [] # Clear sources for new message
# Store the initial input in the context
await ctx.set("user_input", ev) # Save the whole event
# We'll set a placeholder message for now and extract the actual input in the next step
user_msg = ChatMessage(role="user", content="Placeholder")
self.memory.put(user_msg)
await ctx.set("current_reasoning", [])
if self.verbose:
log_structured('debug', 'ReActAgent: Receiving new user message', {})
return PrepEvent()
@step
async def prepare_chat_history(self, ctx: Context, ev: PrepEvent) -> InputEvent:
# Get the real user input from the first step and update the memory
start_event = await ctx.get("user_input")
if hasattr(start_event, "input_value"):
# Try to access input_value directly if it exists
real_user_input = start_event.input_value
else:
# Otherwise, assume the actual input is the input value provided to run()
log_structured('info', 'ReActAgent: Extracting input from workflow input', {})
# Default to reasonable fallback if we can't extract it
real_user_input = "How can I help you with Netflix marketing guidelines?"
# Update the placeholder message with the real input
messages = self.memory.get()
if messages and messages[0].role == "user" and messages[0].content == "Placeholder":
# Replace the placeholder with real input
messages[0].content = str(real_user_input)
if self.verbose:
log_structured('debug', 'ReActAgent: Updated user message with real input', {'input': real_user_input})
# Format the chat history for the LLM
chat_history = self.memory.get()
current_reasoning = await ctx.get("current_reasoning", default=[])
llm_input = self.formatter.format(
self.tools, chat_history, current_reasoning=current_reasoning
)
if self.verbose:
log_structured('debug', 'ReActAgent: Prepared chat history for LLM', {'history_len': len(llm_input)})
return InputEvent(input=llm_input)
@step # The timeout is managed inside the method
async def handle_llm_input(self, ctx: Context, ev: InputEvent) -> Union[ToolCallEvent, StopEvent, PrepEvent]:
chat_history = ev.input
current_reasoning = await ctx.get("current_reasoning", default=[])
try:
if self.verbose:
log_structured('debug', 'ReActAgent: Sending request to LLM', {'history_len': len(chat_history)})
response = await asyncio.wait_for(
self.llm.achat(chat_history),
timeout=self.llm_timeout
)
reasoning_step = self.output_parser.parse(response.message.content)
current_reasoning.append(reasoning_step)
await ctx.set("current_reasoning", current_reasoning) # Update context state
if self.verbose:
log_structured('debug', 'ReActAgent: Received LLM response', {
'step_type': type(reasoning_step).__name__,
'is_done': getattr(reasoning_step, 'is_done', False),
'has_response': hasattr(reasoning_step, 'response'),
'has_action': hasattr(reasoning_step, 'action'),
'action': getattr(reasoning_step, 'action', None),
'raw_content_preview': response.message.content[:300]
})
if reasoning_step.is_done:
# Log what we're about to return as final response
if self.verbose:
log_structured('debug', 'ReActAgent: Processing final response', {
'raw_response_content': response.message.content,
'reasoning_step_response': str(getattr(reasoning_step, 'response', 'NO_RESPONSE_ATTR'))
})
# Clean the response to remove any thinking parts
response_text = str(reasoning_step.response)
# Check for common thinking patterns and remove them
import re # Import re within the function scope to ensure it's available
thinking_patterns = [
r'(?i)^.*?thinking:.*?\n', # Remove lines starting with "Thinking:"
r'(?i)<thinking>.*?</thinking>', # Remove XML-like thinking tags
r'(?i)\[thinking\].*?\[/thinking\]', # Remove bracket thinking tags
r'(?i)I\'m thinking:.*?\n', # Remove "I'm thinking:" sections
r'(?i)Let me think.*?\n', # Remove "Let me think" sections
r'(?i)Thought:.*?Action:.*?Action Input:.*', # Remove the specific pattern user reported
r'(?i)^Thought:.*', # Remove any line starting with "Thought:"
r'(?i)Action:.*?Action Input:.*', # Remove Action/Action Input patterns
r'(?i)Thought:.*?Answer:', # Remove "Thought: ... Answer:" pattern
r'(?i)^Answer:\s*' # Remove just the "Answer:" prefix
]
for pattern in thinking_patterns:
response_text = re.sub(pattern, '', response_text, flags=re.DOTALL)
# Remove extra newlines that might be left after cleaning
response_text = re.sub(r'\n{3,}', '\n\n', response_text)
response_text = response_text.strip()
# Final safety check - if response still looks like thinking, provide fallback
if re.search(r'(?i)^(Thought|Action|Observation):', response_text) or not response_text:
log_structured('warning', 'ReActAgent: Final response still contains thinking patterns, using fallback', {
'problematic_response': response_text[:200]
})
response_text = "I found information about your query in the Netflix marketing materials. Please let me know if you need more specific details."
self.memory.put(ChatMessage(role="assistant", content=response_text))
if self.verbose:
log_structured('info', 'ReActAgent: Final response generated', {
'response_preview': response_text[:100],
'sources_count': len(self.sources)
})
# Structure the final output
final_result = {
"response": response_text,
"sources": self.sources, # Pass the collected ToolOutput objects
"reasoning": current_reasoning
}
return StopEvent(result=final_result)
elif isinstance(reasoning_step, ActionReasoningStep):
if self.verbose:
log_structured('debug', 'ReActAgent: Action step identified', {
'action': reasoning_step.action,
'input': reasoning_step.action_input
})
# Ensure action_input is a dict
action_input_dict = reasoning_step.action_input or {}
if not isinstance(action_input_dict, dict):
log_structured('warning', 'ReActAgent: action_input is not a dict, attempting to parse', {'raw_input': action_input_dict})
try:
# Attempt basic parsing if it looks like JSON string
if isinstance(action_input_dict, str) and action_input_dict.strip().startswith('{'):
action_input_dict = json.loads(action_input_dict)
else: # Fallback: treat as a single 'query' arg if not dict/json
action_input_dict = {'query': action_input_dict}
except Exception as parse_err:
log_structured('error', 'ReActAgent: Failed to parse action_input', {'raw_input': action_input_dict, 'error': str(parse_err)})
action_input_dict = {'query': str(action_input_dict)} # Safest fallback
return ToolCallEvent(tool_calls=[
ToolSelection(
tool_id="tool_" + reasoning_step.action.replace(" ", "_"), # Simple ID generation
tool_name=reasoning_step.action,
tool_kwargs=action_input_dict
)
])
else:
# Handle other reasoning step types if necessary, or just proceed
if self.verbose:
log_structured('debug', 'ReActAgent: Non-action, non-done step encountered', {'step_type': type(reasoning_step).__name__})
return PrepEvent() # Continue the loop
except asyncio.TimeoutError:
error_msg = f"LLM call timed out after {self.llm_timeout} seconds."
log_structured('error', 'ReActAgent: LLM Timeout', {'timeout': self.llm_timeout})
current_reasoning.append(ObservationReasoningStep(observation=error_msg))
await ctx.set("current_reasoning", current_reasoning)
return PrepEvent() # Try again or maybe stop? Returning PrepEvent allows loop to continue.
except Exception as e:
error_msg = f"Error during LLM interaction or parsing: {str(e)}"
log_structured('error', 'ReActAgent: Error in handle_llm_input', {
'error': str(e), 'traceback': traceback.format_exc()
})
current_reasoning.append(ObservationReasoningStep(observation=error_msg))
await ctx.set("current_reasoning", current_reasoning)
# Decide whether to stop or continue after error
# Returning PrepEvent allows the agent to potentially describe the error or retry
return PrepEvent()
@step # The timeout is managed inside the method
async def handle_tool_calls(self, ctx: Context, ev: ToolCallEvent) -> PrepEvent:
tool_calls = ev.tool_calls
tools_by_name = {tool.metadata.name: tool for tool in self.tools} # Use .name property
current_reasoning = await ctx.get("current_reasoning", default=[])
if self.verbose:
log_structured('debug', 'ReActAgent: Handling tool calls', {
'call_count': len(tool_calls),
'tool_names': [tc.tool_name for tc in tool_calls]
})
for tool_call in tool_calls:
tool = tools_by_name.get(tool_call.tool_name)
if not tool:
error_msg = f"Tool '{tool_call.tool_name}' not found."
log_structured('error', 'ReActAgent: Tool not found', {'tool_name': tool_call.tool_name})
current_reasoning.append(ObservationReasoningStep(observation=error_msg))
continue # Skip to next tool call
try:
tool_kwargs = tool_call.tool_kwargs or {} # Ensure kwargs is a dict
if self.verbose:
log_structured('debug', 'ReActAgent: Executing tool', {
'tool_name': tool_call.tool_name,
'kwargs': tool_kwargs
})
# Execute tool potentially in thread pool with timeout
tool_func = partial(tool, **tool_kwargs)
tool_output: ToolOutput
if inspect.iscoroutinefunction(tool.call): # Check if tool's call method is async
tool_output = await asyncio.wait_for(
tool.acall(**tool_kwargs), # Use acall for async tools
timeout=self.tool_timeout
)
else:
# Run sync tool in thread pool
with ThreadPoolExecutor() as executor:
future = executor.submit(tool_func) # tool_func already has kwargs via partial
tool_output = await asyncio.to_thread(future.result, timeout=self.tool_timeout)
if self.verbose:
log_structured('debug', 'ReActAgent: Tool execution successful', {
'tool_name': tool_call.tool_name,
'output_type': type(tool_output).__name__,
'output_content_preview': str(tool_output.content)[:100] if tool_output else "N/A"
})
# Store the raw ToolOutput object which contains content, raw_output, metadata etc.
self.sources.append(tool_output)
# Add observation step with the tool's string content
observation_content = str(tool_output.content) if tool_output and tool_output.content is not None else "Tool executed successfully but returned no content."
current_reasoning.append(ObservationReasoningStep(observation=observation_content))
if self.verbose:
log_structured('debug', 'ReActAgent: Added observation to reasoning', {
'tool_name': tool_call.tool_name,
'observation_preview': observation_content[:200]
})
# --- Detailed logging for image metadata ---
if hasattr(tool_output, 'raw_output') and hasattr(tool_output.raw_output, 'source_nodes'):
for node_with_score in tool_output.raw_output.source_nodes:
node = getattr(node_with_score, 'node', None)
if node and hasattr(node, 'metadata'):
if 'image_paths' in node.metadata and node.metadata['image_paths']:
log_structured('debug', 'ReActAgent: Tool output node contains image paths', {
'tool_name': tool_call.tool_name,
'node_id': getattr(node, 'id_', 'N/A'),
'image_paths': node.metadata['image_paths']
})
except asyncio.TimeoutError:
error_msg = f"Tool '{tool_call.tool_name}' timed out after {self.tool_timeout} seconds."
log_structured('error', 'ReActAgent: Tool Timeout', {'tool_name': tool_call.tool_name, 'timeout': self.tool_timeout})
current_reasoning.append(ObservationReasoningStep(observation=error_msg))
except Exception as e:
error_msg = f"Error calling tool '{tool_call.tool_name}': {str(e)}"
log_structured('error', 'ReActAgent: Tool execution error', {
'tool_name': tool_call.tool_name, 'error': str(e), 'traceback': traceback.format_exc()
})
current_reasoning.append(ObservationReasoningStep(observation=error_msg))
# Update reasoning steps in context before returning
await ctx.set("current_reasoning", current_reasoning)
return PrepEvent() # Always return PrepEvent to continue the loop
# --- Document Processing ---
async def process_documents_in_directory(directory: str, session_id: Optional[str] = None) -> List[LlamaIndexDocument]:
"""
Process all documents in a directory using LlamaParse for text and images,
preserving chunk-level metadata including specific image paths per chunk.
Includes detailed logging for image metadata assignment.
Args:
directory: The path to the directory containing documents.
session_id: Optional session identifier for context.
Returns:
A list of LlamaIndexDocument objects representing the processed chunks,
ready for indexing. Returns an empty list on failure or if no documents
are processed.
"""
parser_text = None
parser_images = None
try:
# Ensure images directory exists
os.makedirs(IMAGES_DIRECTORY, exist_ok=True)
log_structured('info', f'Starting document processing for directory: {directory}', {
'session_id': session_id,
'directory_name': os.path.basename(directory),
'directory_exists': os.path.exists(directory),
'files_present': os.listdir(directory) if os.path.exists(directory) else 'N/A'
})
if not os.path.exists(directory) or not os.path.isdir(directory):
log_structured('error', f'Directory not found or is not a directory: {directory}', {'session_id': session_id})
return []
if not os.listdir(directory):
log_structured('warning', f'Directory is empty, skipping: {directory}', {'session_id': session_id})
return []
# --- LlamaParse Initialization ---
try:
# Separate clients recommended if making concurrent calls, but can share if sequential
custom_client_text = httpx.AsyncClient(timeout=LLAMA_PARSE_MAX_TIMEOUT)
custom_client_images = httpx.AsyncClient(timeout=LLAMA_PARSE_MAX_TIMEOUT)
parser_text = LlamaParse(
result_type="markdown",
add_page_breaks=False,
system_prompt="Extract all content including text, tables, and formatting. Preserve structure.",
premium_mode=False, # Set based on your LlamaCloud plan
max_timeout=LLAMA_PARSE_MAX_TIMEOUT,
custom_client=custom_client_text,
verbose=True # Enable verbose logging for LlamaParse
)
parser_images = LlamaParse(
result_type="markdown", # Less critical here, parsing for images primarily
add_page_breaks=False,
system_prompt="Generate page images of the document.",
use_vendor_multimodal_model=True, # Assuming you want LlamaParse's image gen
vendor_multimodal_model_name=LLAMA_PARSE_VENDOR_MODEL,
premium_mode=False, # Set based on your LlamaCloud plan
max_timeout=LLAMA_PARSE_MAX_TIMEOUT,
custom_client=custom_client_images,
verbose=True # Enable verbose logging for LlamaParse
)
log_structured('info', 'Initialized LlamaParse (text & image capability)', {'session_id': session_id})
except Exception as parser_err:
log_structured('error', f'Error initializing dual LlamaParse capability: {parser_err}. Falling back to text-only.', {
'session_id': session_id, 'traceback': traceback.format_exc()
})
if parser_text and hasattr(parser_text, 'aclose'): await parser_text.aclose()
if parser_images and hasattr(parser_images, 'aclose'): await parser_images.aclose()
parser_images = None # Disable image parsing
try:
# Ensure client is fresh for fallback
custom_client_text = httpx.AsyncClient(timeout=LLAMA_PARSE_MAX_TIMEOUT)
parser_text = LlamaParse(
result_type="markdown",
language="en",
add_page_breaks=False,
merge_consecutive=True,
max_timeout=LLAMA_PARSE_MAX_TIMEOUT,
custom_client=custom_client_text,
verbose=True
)
log_structured('info', 'Initialized LlamaParse with fallback text-only parser.', {'session_id': session_id})
except Exception as fallback_err:
log_structured('critical', f'FATAL: Error initializing fallback LlamaParse: {fallback_err}', {
'session_id': session_id, 'traceback': traceback.format_exc()
})
if parser_text and hasattr(parser_text, 'aclose'): await parser_text.aclose() # Cleanup
return []
if not parser_text:
log_structured('critical', 'FATAL: Text parser (parser_text) could not be initialized.')
return []
file_extractor = {
".pdf": parser_text, ".docx": parser_text, ".doc": parser_text,
".pptx": parser_text, ".ppt": parser_text, ".txt": parser_text,
# Add others if your LlamaParse supports them
# ".html": parser_text, ".md": parser_text,
}
supported_extensions = list(file_extractor.keys())
# --- Custom File Reader Async Function ---
async def custom_file_reader():
processed_chunks: List[LlamaIndexDocument] = []
images_by_file_and_page: Dict[str, Dict[int, str]] = {} # filename -> { page_num_1_based -> image_filename }
all_files_paths = []
log_structured('debug', f'Searching for supported files in: {directory}', {'extensions': supported_extensions, 'session_id': session_id})
for ext in supported_extensions:
all_files_paths.extend(list(Path(directory).glob(f"*{ext}")))
all_files_paths.extend(list(Path(directory).glob(f"*{ext.upper()}"))) # Case-insensitive glob
all_files_paths = sorted(list(set(all_files_paths))) # Remove duplicates and sort
log_structured('info', f'Found {len(all_files_paths)} supported files in {directory}', {'session_id': session_id, 'files': [f.name for f in all_files_paths]})
if not all_files_paths:
log_structured('warning', f'No supported files found in directory: {directory}', {'session_id': session_id})
return []
for file_path in all_files_paths:
file_path_str = str(file_path)
filename = file_path.name
log_structured('info', f'Processing file: {filename}', {'session_id': session_id, 'path': file_path_str})
ext = file_path.suffix.lower()
current_text_parser = file_extractor.get(ext)
if not current_text_parser:
log_structured('warning', f'Skipping file {filename}: No parser defined for extension {ext}.', {'session_id': session_id})
continue
# 1. Extract Text Chunks
text_chunks: List[LlamaIndexDocument] = []
try:
log_structured('debug', f'Calling parser_text.aload_data for {filename}', {'session_id': session_id})
# LlamaParse aload_data returns List[Document]
text_chunks = await current_text_parser.aload_data(file_path_str)
log_structured('info', f'Extracted {len(text_chunks)} text chunks from {filename}', {'session_id': session_id})
if not text_chunks:
log_structured('warning', f'No text chunks extracted from {filename}. File might be empty or parser issue.', {'session_id': session_id})
# Continue to attempt image extraction for this file
except Exception as text_err:
log_structured('error', f'Error extracting text from {filename}', {
'session_id': session_id, 'error': str(text_err), 'traceback': traceback.format_exc()
})
continue # Skip to next file if text extraction fails critically
# 2. Extract Images (if parser_images is available)
file_page_images: Dict[int, str] = {} # { page_num_1_based: image_filename }
if parser_images:
try:
log_structured('debug', f'Attempting image extraction for: {filename}', {'session_id': session_id})
temp_image_dir = IMAGES_DIRECTORY / f"temp_img_{filename}_{uuid.uuid4().hex[:8]}"
os.makedirs(temp_image_dir, exist_ok=True)
# Use synchronous methods for simplicity here, or manage async complexity
# NOTE: Check LlamaParse documentation for the best way to get images.
# This assumes get_json_result and get_images are available and work this way.
# If these are async, you'll need `await parser_images.aget_json_result(...)` etc.
# Running sync methods in executor to avoid blocking event loop
loop = asyncio.get_running_loop()
with ThreadPoolExecutor() as pool:
md_json_objs = await loop.run_in_executor(pool, parser_images.get_json_result, file_path_str)
log_structured('debug', f'Got JSON result for images from {filename}', {'session_id': session_id})
image_dicts = await loop.run_in_executor(pool, parser_images.get_images, md_json_objs, temp_image_dir)
log_structured('info', f'LlamaParse reported {len(image_dicts)} images potentially extracted for {filename}', {'session_id': session_id})
saved_image_count = 0
for idx, img_info in enumerate(image_dicts):
# LlamaParse might return 0-based 'page' index
page_index_0_based = img_info.get('page', idx) # Default to list index if 'page' missing
page_num_1_based = page_index_0_based + 1 # Convert to 1-based for consistency
# log_structured('debug', f'Processing image info for {filename}, index {idx}', {'img_info': img_info, 'page_1_based': page_num_1_based})
if 'path' in img_info and img_info['path'] and os.path.exists(img_info['path']):
source_img_path = img_info['path']
image_filename = f"{os.path.splitext(filename)[0]}_page{page_num_1_based}_{uuid.uuid4().hex[:6]}.png"
dest_path = IMAGES_DIRECTORY / image_filename
try:
shutil.copy2(source_img_path, dest_path) # copy2 preserves metadata
file_page_images[page_num_1_based] = image_filename # Map 1-based page num
saved_image_count += 1
log_structured('debug', f'Saved image for {filename} page {page_num_1_based} (1-based) to {dest_path}', {'session_id': session_id})
except Exception as copy_err:
log_structured('error', f'Error copying image for {filename} page {page_num_1_based}', {'session_id': session_id, 'error': str(copy_err)})
else:
log_structured('warning', f'No valid image path found or file missing for image index {idx} (reported page {page_index_0_based}) from {filename}. Path: {img_info.get("path", "N/A")}', {'session_id': session_id})
log_structured('info', f'Successfully saved {saved_image_count} images for {filename}', {'session_id': session_id})
images_by_file_and_page[filename] = file_page_images
# Clean up temporary directory
try:
shutil.rmtree(temp_image_dir)
except Exception as cleanup_err:
log_structured('error', f'Error cleaning up temp image dir {temp_image_dir}', {'session_id': session_id, 'error': str(cleanup_err)})
except Exception as img_err:
log_structured('error', f'Error during image extraction pipeline for {filename}', {
'session_id': session_id, 'error': str(img_err), 'traceback': traceback.format_exc()
})
images_by_file_and_page[filename] = {} # Ensure empty dict on error
else:
log_structured('info', f'Image parsing skipped for {filename} (parser_images is None).', {'session_id': session_id})
images_by_file_and_page[filename] = {}
# 3. Add Metadata to Each Text Chunk
log_structured('debug', f'Assigning metadata and images to {len(text_chunks)} chunks for {filename}', {'session_id': session_id})
for chunk_index, chunk in enumerate(text_chunks):
if not hasattr(chunk, 'metadata') or chunk.metadata is None: chunk.metadata = {}
chunk.metadata['filename'] = filename
chunk.metadata['file_path'] = file_path_str
chunk.metadata['source'] = "document" # Or more specific if needed
chunk.metadata['type'] = os.path.basename(directory) # e.g., 'brief', 'supporting'
chunk.metadata['chunk_index'] = chunk_index
# --- Determine 1-based Page Number ---
page_num_1_based: Optional[int] = None
page_source_key: Optional[str] = None
# Priority: LlamaParse metadata keys
page_key_options = ['page_label', 'page_number', 'page'] # Check common keys
for key in page_key_options:
if key in chunk.metadata:
try:
page_num_1_based = int(chunk.metadata[key]) # Assume it's 1-based from LlamaParse
if page_num_1_based > 0: # Basic sanity check
page_source_key = key
chunk.metadata["source_page"] = page_num_1_based # Store consistently
log_structured('debug', f"Found page {page_num_1_based} from metadata key '{key}'", {'filename': filename, 'chunk': chunk_index, 'session_id': session_id})
break
else:
page_num_1_based = None # Ignore non-positive page numbers
except (ValueError, TypeError): pass # Ignore if not integer
# Fallback: If no page found in metadata, try approximation (less reliable)
# This part is optional and might be inaccurate. Consider removing if metadata is reliable.
if page_num_1_based is None:
# Simple fallback: use chunk index as a proxy (very rough)
page_num_1_based = chunk_index + 1 # Treat first chunk as page 1 approx.
chunk.metadata["source_page"] = page_num_1_based
page_source_key = "position_fallback"
log_structured('debug', f"Using position fallback for page: {page_num_1_based}", {'filename': filename, 'chunk': chunk_index, 'session_id': session_id})
# --- Precise Page-Based Image Assignment ---
chunk.metadata['image_paths'] = [] # Initialize/reset
specific_image_filename = None
if page_num_1_based is not None:
file_images_dict = images_by_file_and_page.get(filename, {})
if file_images_dict:
# Try exact match first
if page_num_1_based in file_images_dict:
specific_image_filename = file_images_dict[page_num_1_based]
chunk.metadata['image_match_type'] = 'exact'
log_structured('debug', f'Exact page match for image page {page_num_1_based}', {'filename': filename, 'chunk': chunk_index, 'image': specific_image_filename})
else:
# Optional: Find nearest page if exact match fails (can be noisy)
# nearest_page = min(file_images_dict.keys(), key=lambda x: abs(x - page_num_1_based))
# if abs(nearest_page - page_num_1_based) <= 1: # Only if very close? Threshold needed.
# specific_image_filename = file_images_dict[nearest_page]
# chunk.metadata['image_match_type'] = 'nearest'
# chunk.metadata["image_nearest_page"] = nearest_page
# log_structured('debug', f'Nearest page match: requested {page_num_1_based}, using {nearest_page}', {'filename': filename, 'chunk': chunk_index, 'image': specific_image_filename})
pass # Default: No image if no exact match
if specific_image_filename:
chunk.metadata['image_paths'] = [specific_image_filename] # Store as list
# Final metadata log before adding chunk
# log_structured('debug', "Final chunk metadata check", {
# 'session_id': session_id, 'filename': filename, 'chunk': chunk_index,
# 'page': chunk.metadata.get('source_page'), 'page_src': page_source_key,
# 'image': chunk.metadata.get('image_paths'), 'match_type': chunk.metadata.get('image_match_type'),
# 'keys': list(chunk.metadata.keys())
# })
processed_chunks.append(chunk)
# --- End of file loop ---
log_structured('info', f'Finished processing files. Total chunks: {len(processed_chunks)}', {'session_id': session_id})
return processed_chunks
# --- Execute the async file reader ---
final_documents: List[LlamaIndexDocument] = await custom_file_reader()
log_structured('info', 'Directory processing complete.', {
'session_id': session_id, 'directory': directory, 'total_chunks_generated': len(final_documents)
})
# Log sample metadata for verification
# for i, doc in enumerate(final_documents[:2]):
# log_structured('debug', f'Final Doc Sample #{i} Metadata', {'metadata': doc.metadata})
return final_documents
except Exception as e:
log_structured('critical', 'FATAL Error in process_documents_in_directory', {
'session_id': session_id, 'directory': directory, 'error': str(e), 'traceback': traceback.format_exc()
})
return [] # Return empty list on major failure
finally:
# Ensure async clients are closed
log_structured('debug', 'Closing LlamaParse clients.', {'session_id': session_id})
if parser_text and hasattr(parser_text, 'aclose'):
try: await parser_text.aclose()
except Exception as close_err: log_structured('error', 'Error closing parser_text client', {'error': str(close_err)})
if parser_images and hasattr(parser_images, 'aclose'):
try: await parser_images.aclose()
except Exception as close_err: log_structured('error', 'Error closing parser_images client', {'error': str(close_err)})
# --- Global Index Initialization ---
async def initialize_global_index() -> bool:
"""Initialize the global index from Netflix documents at startup."""
# Use shared state instead of module-level globals
try:
# --- Configure LLM and Embedding Model ---
# Check for real API keys
openai_key = os.environ.get("OPENAI_API_KEY", "")
if not openai_key:
log_structured('critical', 'No OpenAI API key provided. Make sure OPENAI_API_KEY is set in your .env file.')
return False
# Log the first few characters of the key for debugging (important to verify it's loaded)
key_preview = openai_key[:4] + "..." if len(openai_key) > 4 else "invalid"
log_structured('info', f'Using OpenAI API key starting with: {key_preview}')
# Normal initialization with real API keys
try:
llm = LlamaOpenAI(
model=LLM_MODEL,
temperature=LLM_TEMPERATURE,
timeout=LLM_TIMEOUT
)
# Test the LLM with a simple prompt to ensure it's working
test_prompt = "Say 'API key is working' if you can read this."
_ = llm.complete(test_prompt)
log_structured('info', 'Successfully tested LLM API connection')
except Exception as llm_err:
log_structured('critical', f'Failed to initialize LLM with provided API key: {str(llm_err)}')
return False
embed_model = OpenAIEmbedding(model=EMBEDDING_MODEL)
# --- Configure Global Settings ---
Settings.llm = llm
Settings.embed_model = embed_model
Settings.callback_manager = CallbackManager([token_counter])
# Settings.chunk_size = NODE_PARSER_CHUNK_SIZE # If using SentenceSplitter
# Settings.chunk_overlap = NODE_PARSER_CHUNK_OVERLAP
# Use Semantic Splitter (more robust, less config needed here)
node_parser = SemanticSplitterNodeParser(
buffer_size=1, breakpoint_percentile_threshold=95, embed_model=embed_model # Adjust threshold
)
Settings.node_parser = node_parser
# Alternatively, configure SentenceSplitter globally
# Settings.node_parser = SentenceSplitter(
# chunk_size=NODE_PARSER_CHUNK_SIZE,
# chunk_overlap=NODE_PARSER_CHUNK_OVERLAP,
# # ... other SentenceSplitter params
# )
# --- Load or Build Index ---
if INDEX_PERSIST_PATH.exists():
log_structured('info', f'Loading existing index from {INDEX_PERSIST_PATH}')
storage_context = StorageContext.from_defaults(persist_dir=str(INDEX_PERSIST_PATH))
index = load_index_from_storage(storage_context)
# Save to shared state
set_global_index(index)
log_structured('info', 'Successfully loaded existing index')
# Attempt to load or recreate GraphRAG components
try:
log_structured('info', 'Attempting to recreate GraphRAG components from loaded index')
# First, try to connect to Neo4j to check if it has data
from llama_index.graph_stores.neo4j import Neo4jPropertyGraphStore
property_graph_store = Neo4jPropertyGraphStore(
username=NEO4J_USERNAME,
password=NEO4J_PASSWORD,
url=NEO4J_URL
)
log_structured('info', 'Successfully connected to Neo4j database')
# Create temporary GraphRAGStore to check for existing data
temp_graph_store = GraphRAGStore(property_graph_store)
triplets = temp_graph_store.get_triplets()
neo4j_has_data = len(triplets) > 0
if neo4j_has_data:
# Neo4j already has data, just use it
log_structured('info', f'Neo4j contains {len(triplets)} triplets. Using existing data.')
graph_store, property_graph_index = create_graph_components(
llm=llm,
force_reindex=False # Use existing Neo4j data
)
log_structured('info', 'GraphRAG components loaded from existing Neo4j data')
else:
# Neo4j is empty, need to extract nodes from the vector index to populate it
log_structured('info', 'Neo4j is empty. Extracting nodes from vector index for GraphRAG indexing.')
# Get all nodes from the vector index
from llama_index.core.schema import TextNode
vector_nodes = []
# Extract nodes from the index's docstore
if hasattr(index, 'docstore') and index.docstore:
docstore_nodes = list(index.docstore.docs.values())
vector_nodes.extend(docstore_nodes)
log_structured('info', f'Retrieved {len(docstore_nodes)} nodes from index docstore')
# Handle cases where we can't get nodes directly
if not vector_nodes:
log_structured('warning', 'Could not retrieve nodes from vector index. GraphRAG indexing will be skipped.')
raise ValueError("No nodes could be retrieved from the vector index for GraphRAG indexing")
# Now create GraphRAG components with the retrieved nodes
graph_store, property_graph_index = create_graph_components(
llm=llm,
nodes=vector_nodes,
max_paths_per_chunk=10,
force_reindex=True # Force indexing since Neo4j is empty
)
log_structured('info', f'GraphRAG components created with {len(vector_nodes)} nodes from vector index')
except Exception as e:
log_structured('warning', f'Error recreating GraphRAG components: {e}. Continuing without GraphRAG.')
graph_store = None
else:
log_structured('info', f'No index found at {INDEX_PERSIST_PATH}. Creating new index from {NETFLIX_DOCS_FOLDER}')
if not NETFLIX_DOCS_FOLDER.exists() or not any(NETFLIX_DOCS_FOLDER.iterdir()):
log_structured('error', f'Netflix documents folder is missing or empty: {NETFLIX_DOCS_FOLDER}')
return False
# Process documents using LlamaParse
documents = await process_documents_in_directory(str(NETFLIX_DOCS_FOLDER), session_id="global_init")
if not documents:
log_structured('error', f'No documents processed from {NETFLIX_DOCS_FOLDER}. Index creation aborted.')
return False
log_structured('info', 'Creating vector store index...', {
'document_count': len(documents),
'sample_doc_metadata': documents[0].metadata if documents else None
})
# Build the index using the globally configured Settings (incl. node_parser)
index = VectorStoreIndex.from_documents(
documents=documents,
show_progress=True,
# service_context=service_context, # Old way, use Settings now
)
# Save to shared state
set_global_index(index)
# Persist the index
index.storage_context.persist(persist_dir=str(INDEX_PERSIST_PATH))
log_structured('info', f'Index created and persisted to {INDEX_PERSIST_PATH}')
# --- Create GraphRAG components ---
log_structured('info', 'Starting GraphRAG component creation')
try:
# Get nodes from the index's docstore for GraphRAG
docstore_nodes = []
if hasattr(index, 'docstore') and index.docstore:
docstore_nodes = list(index.docstore.docs.values())
log_structured('info', f'Retrieved {len(docstore_nodes)} nodes from index docstore for GraphRAG')
# If docstore is empty or doesn't exist, use the original documents
nodes_for_graph = docstore_nodes if docstore_nodes else documents
log_structured('info', f'Using {len(nodes_for_graph)} nodes for GraphRAG indexing')
# Create GraphRAG components
graph_store, property_graph_index = create_graph_components(
llm=llm,
nodes=nodes_for_graph,
max_paths_per_chunk=10,
force_reindex=True # Force indexing for new index creation
)
log_structured('info', 'GraphRAG components created successfully')
except Exception as graph_err:
log_structured('error', f'Error creating GraphRAG components: {graph_err}',
{'traceback': traceback.format_exc()})
# Continue without GraphRAG if there's an error
# --- Create Retriever and Query Engine ---
vector_store_info = VectorStoreInfo(
content_info="Netflix marketing reference materials, including GPD Key Art Playbook and supporting documents.",
metadata_info=[
MetadataInfo(name="filename", type="str", description="Filename of the source document"),
MetadataInfo(name="source_page", type="int", description="Approximate page number in the source document"),
MetadataInfo(name="image_paths", type="list[str]", description="List of image filenames associated with this chunk"),
# Add other relevant metadata fields here
],
)
all_retriever = VectorIndexAutoRetriever(
index=index, # Use the local variable, not global_index
vector_store_info=vector_store_info,
similarity_top_k=SIMILARITY_TOP_K,
verbose=True # Enable verbose logging for retriever
)
# Create standard vector-based query engine
all_query_engine = RetrieverQueryEngine.from_args( # Use from_args for clarity
retriever=all_retriever,
response_synthesizer=get_response_synthesizer(
response_mode=ResponseMode.COMPACT, # Or REFINE, TREE_SUMMARIZE etc.
# service_context=service_context # Old way
),
node_postprocessors=[
SimilarityPostprocessor(similarity_cutoff=SIMILARITY_CUTOFF),
],
# service_context=service_context # Old way
)
# Create GraphRAG query engine if components were created successfully
graphrag_query_engine = None
graph_store = locals().get('graph_store', None)
property_graph_index = locals().get('property_graph_index', None)
if graph_store is not None and property_graph_index is not None:
try:
# Ensure graph communities are built before creating query engine
if not hasattr(graph_store, 'communities_built') or not graph_store.communities_built:
log_structured('info', 'Building graph communities before creating query engine')
try:
# Use gpt-4.1-mini model for community summaries (set in GraphRAGStore)
# The build_communities() method will first try to load from cache
# and will only rebuild and re-cache if cache loading fails
# It also tracks if communities are already built to avoid duplicate work
graph_store.build_communities()
log_structured('info', 'Communities built successfully (loaded from cache or built new)')
except Exception as comm_err:
log_structured('error', f'Error building communities: {comm_err}',
{'traceback': traceback.format_exc()})
# Continue with query engine creation even if communities failed
# Create a basic VectorIndexRetriever for GraphRAG
vector_retriever = VectorIndexRetriever(
index=index,
similarity_top_k=SIMILARITY_TOP_K
)
# Create the GraphRAG query engine - ensure all required fields are passed directly
try:
graphrag_query_engine = create_graphrag_query_engine(
vector_retriever=vector_retriever,
graph_store=graph_store,
llm=llm,
similarity_top_k=SIMILARITY_TOP_K
)
log_structured('info', 'GraphRAG query engine created successfully')
except Exception as e:
log_structured('error', f'Error creating GraphRAG query engine: {e}', {'traceback': traceback.format_exc()})
# Create a direct instance without using the factory function as a fallback
from graph_rag_integration import GraphRAGQueryEngine
graphrag_query_engine = GraphRAGQueryEngine(
vector_retriever=vector_retriever,
graph_store=graph_store,
llm=llm,
similarity_top_k=SIMILARITY_TOP_K
)
# Store GraphRAG components in shared state
set_graphrag_components(
graph_store=graph_store,
property_graph_index=property_graph_index,
graphrag_query_engine=graphrag_query_engine
)
log_structured('info', 'GraphRAG query engine created successfully')
except Exception as graph_engine_err:
log_structured('error', f'Error creating GraphRAG query engine: {graph_engine_err}',
{'traceback': traceback.format_exc()})
# Continue without GraphRAG query engine if there's an error
# --- Create Query Engine Tools ---
query_engine_tools_react = [
QueryEngineTool(
query_engine=all_query_engine,
metadata=ToolMetadata(
name="answer_questions_from_netflix_marketing_materials",
description="USE THIS TOOL FOR ALL QUERIES - Queries Netflix marketing materials (GPD Key Art Playbook, etc.) to answer questions about guidelines, workflows, and processes. Use this for specific lookups in the knowledge base."
),
),
]
# Add GraphRAG tool if available
if graphrag_query_engine is not None:
class GraphRAGTool(BaseTool):
def __init__(self, query_engine):
self.query_engine = query_engine
self._metadata = ToolMetadata(
name="answerquestionswith_graphrag",
description="USE THIS TOOL FOR ALL QUERIES - Queries Netflix marketing materials using both vector and graph-based retrieval for more comprehensive answers. Use this for complex questions that need context from multiple related documents."
)
@property
def metadata(self):
return self._metadata
def __call__(self, query_str: str) -> ToolOutput:
"""Run query through GraphRAG and generate synthesized answer."""
from shared_state import global_graphrag_query_engine
log_structured('info', 'GraphRAG Tool: Starting dual retrieval', {'query': query_str})
try:
# Get both vector and GraphRAG retrieval results
retrieval_result = self.query_engine.custom_query(query_str)
# Generate synthesized answer
final_answer = generate_final_answer(query_str, retrieval_result, self.query_engine.llm)
# Prepare the tool output
log_message = {
'vector_context_length': len(retrieval_result.get('vector_context', '')),
'graphrag_context_length': len(retrieval_result.get('graphrag_context', '')),
'vector_nodes_count': len(retrieval_result.get('vector_nodes', [])),
'community_ids': retrieval_result.get('community_ids', [])
}
log_structured('info', 'GraphRAG Tool: Retrieval complete', log_message)
# Extract vector nodes and create a properly structured raw_output that includes source_nodes
# This allows the routes.py code to extract images from these nodes
vector_nodes = retrieval_result.get('vector_nodes', [])
# Create a wrapper object that mimics the structure expected by routes.py for image extraction
class NodeWrapper:
def __init__(self, nodes):
self.source_nodes = nodes
# Preserve the original retrieval_result but add the source_nodes in the expected format
modified_raw_output = retrieval_result.copy()
modified_raw_output['source_nodes'] = vector_nodes
# Add a source_nodes property that routes.py will look for
tool_output = ToolOutput(
content=final_answer,
tool_name="GraphRAG",
raw_output=NodeWrapper(vector_nodes),
raw_input={"query": query_str},
metadata={
'source': 'graphrag',
'retrieval_stats': log_message,
'original_result': modified_raw_output # Store the original result too
}
)
log_structured('debug', 'GraphRAG Tool: Including image metadata in response',
{'node_count': len(vector_nodes)})
return tool_output
except Exception as graphrag_err:
log_structured('error', f'Error in GraphRAG tool: {graphrag_err}',
{'traceback': traceback.format_exc()})
return ToolOutput(
content=f"I encountered an error while retrieving information: {str(graphrag_err)}. Please try again or use the standard query tool.",
tool_name="GraphRAG",
raw_input={"query": query_str},
raw_output={"error": str(graphrag_err)},
metadata={'error': str(graphrag_err)}
)
async def acall(self, input: str) -> ToolOutput:
"""Async version of __call__."""
return self.__call__(input)
# Create the GraphRAG tool instance and add it to tools
graphrag_tool = GraphRAGTool(graphrag_query_engine)
query_engine_tools_react.append(graphrag_tool)
log_structured('info', 'Added GraphRAG tool to query engine tools')
# --- Initialize Global Workflow Agent ---
# We're now using shared_state rather than module globals
try:
# Create the agent instance
agent = ReActAgent2(
llm=llm, # Use the LLM configured via Settings
tools=query_engine_tools_react,
memory=ChatMemoryBuffer.from_defaults(llm=llm, token_limit=4096), # Give memory its own LLM ref
verbose=True, # Enable agent verbose logging
timeout=AGENT_TIMEOUT,
llm_timeout=LLM_TIMEOUT,
tool_timeout=TOOL_EXECUTION_TIMEOUT
)
# Store in shared state
set_global_agent(agent)
log_structured('info', 'Agent initialized successfully')
except Exception as agent_err:
log_structured('critical', f'Failed to initialize agent: {str(agent_err)}', {'error': str(agent_err)})
return False # Signal failure
try:
# Define a simpler version of the run method
async def simple_run(query_text):
"""Simple version that doesn't rely on complex workflow steps."""
# Import from shared state to ensure we use the current global agent
from shared_state import global_workflow_agent
if not global_workflow_agent:
log_structured('critical', 'Agent is None in simple_run - this should never happen')
return {
"response": "The AI system is currently unavailable. Please try again later.",
"sources": [],
"reasoning": []
}
try:
# Don't reset memory - we want to preserve conversation context between requests
# Only reset sources to track new sources for this specific response
global_workflow_agent.sources = []
# 1. Add the user query to memory
user_msg = ChatMessage(role="user", content=str(query_text))
global_workflow_agent.memory.put(user_msg)
# 2. Format the chat history
chat_history = global_workflow_agent.memory.get()
llm_input = global_workflow_agent.formatter.format(global_workflow_agent.tools, chat_history)
# 3. Get LLM response
response = await asyncio.wait_for(
global_workflow_agent.llm.achat(llm_input),
timeout=global_workflow_agent.llm_timeout
)
reasoning_step = global_workflow_agent.output_parser.parse(response.message.content)
# Add detailed logging
log_structured('debug', 'Parsed reasoning step', {
'step_type': type(reasoning_step).__name__,
'has_response': hasattr(reasoning_step, 'response'),
'has_action': hasattr(reasoning_step, 'action'),
'action': getattr(reasoning_step, 'action', None),
'action_input': getattr(reasoning_step, 'action_input', None),
'raw_content': response.message.content[:200] # First 200 chars for debugging
})
# 4. Process the response (simplified)
# Force tool usage for all queries - don't allow direct responses
if hasattr(reasoning_step, 'response') and reasoning_step.response:
# If LLM gave a direct response, we need to force tool usage instead
log_structured('debug', 'LLM provided direct response, forcing tool usage for image retrieval')
# Force tool execution by calling GraphRAG directly
graphrag_tool = next((t for t in global_workflow_agent.tools if 'graphrag' in t.metadata.name.lower()), None)
if graphrag_tool:
try:
tool_output = await asyncio.wait_for(
graphrag_tool.acall(input=str(query_text)),
timeout=global_workflow_agent.tool_timeout
)
global_workflow_agent.sources.append(tool_output)
# Use the tool's response instead of the direct LLM response
response_text = str(tool_output.content) if tool_output and tool_output.content else str(reasoning_step.response)
assistant_msg = ChatMessage(role="assistant", content=response_text)
global_workflow_agent.memory.put(assistant_msg)
return {
"response": response_text,
"sources": global_workflow_agent.sources,
"reasoning": [reasoning_step]
}
except Exception as e:
log_structured('error', f'Error forcing GraphRAG tool execution: {str(e)}')
# Fall back to direct response if tool fails
# Fallback to cleaning the direct response
response_text = str(reasoning_step.response)
# Check for common thinking patterns and remove them
import re # Import re within the function scope to ensure it's available
thinking_patterns = [
r'(?i)^.*?thinking:.*?\n', # Remove lines starting with "Thinking:"
r'(?i)<thinking>.*?</thinking>', # Remove XML-like thinking tags
r'(?i)\[thinking\].*?\[/thinking\]', # Remove bracket thinking tags
r'(?i)I\'m thinking:.*?\n', # Remove "I'm thinking:" sections
r'(?i)Let me think.*?\n', # Remove "Let me think" sections
r'(?i)Thought:.*?Answer:', # Remove "Thought: ... Answer:" pattern
r'(?i)^Answer:\s*' # Remove just the "Answer:" prefix
]
for pattern in thinking_patterns:
response_text = re.sub(pattern, '', response_text, flags=re.DOTALL)
# Remove extra newlines that might be left after cleaning
response_text = re.sub(r'\n{3,}', '\n\n', response_text)
response_text = response_text.strip()
assistant_msg = ChatMessage(role="assistant", content=response_text)
global_workflow_agent.memory.put(assistant_msg)
return {
"response": response_text,
"sources": global_workflow_agent.sources,
"reasoning": [reasoning_step]
}
else:
# Handle tool calls if the response indicates an action
if hasattr(reasoning_step, 'action') and reasoning_step.action:
tool_name = reasoning_step.action
action_input = reasoning_step.action_input or {}
# Convert to dict if needed
if not isinstance(action_input, dict):
try:
if isinstance(action_input, str) and action_input.strip().startswith('{'):
action_input = json.loads(action_input)
else:
action_input = {'query': action_input}
except:
action_input = {'query': str(action_input)}
# Find the tool
tool = next((t for t in global_workflow_agent.tools if t.metadata.name == tool_name), None)
if not tool:
error_msg = f"Tool '{tool_name}' not found. Available tools: {[t.metadata.name for t in global_workflow_agent.tools]}"
log_structured('error', 'Tool not found in simple_run', {'tool_name': tool_name, 'available_tools': [t.metadata.name for t in global_workflow_agent.tools]})
return {
"response": f"I tried to use a tool called '{tool_name}' but it's not available. Please try rephrasing your query.",
"sources": [],
"reasoning": [reasoning_step]
}
if tool:
try:
# Execute the tool
tool_output = await asyncio.wait_for(
tool.acall(**action_input),
timeout=global_workflow_agent.tool_timeout
)
global_workflow_agent.sources.append(tool_output)
# Get a final response
observation = str(tool_output.content if tool_output and tool_output.content is not None else "No content")
follow_up_msg = ChatMessage(role="user", content=f"Here is the result: {observation}\\nPlease provide a final response based on this information.")
global_workflow_agent.memory.put(follow_up_msg)
# Call LLM again for final response
chat_history = global_workflow_agent.memory.get()
llm_input = global_workflow_agent.formatter.format(global_workflow_agent.tools, chat_history)
final_response = await asyncio.wait_for(
global_workflow_agent.llm.achat(llm_input),
timeout=global_workflow_agent.llm_timeout
)
# Store and return
# Clean the response to remove any thinking parts
response_text = str(final_response.message.content)
# Check for common thinking patterns and remove them
import re # Import re within the function scope to ensure it's available
thinking_patterns = [
r'(?i)^.*?thinking:.*?\n', # Remove lines starting with "Thinking:"
r'(?i)<thinking>.*?</thinking>', # Remove XML-like thinking tags
r'(?i)\[thinking\].*?\[/thinking\]', # Remove bracket thinking tags
r'(?i)I\'m thinking:.*?\n', # Remove "I'm thinking:" sections
r'(?i)Let me think.*?\n', # Remove "Let me think" sections
r'(?i)Thought:.*?Answer:', # Remove "Thought: ... Answer:" pattern
r'(?i)^Answer:\s*' # Remove just the "Answer:" prefix
]
for pattern in thinking_patterns:
response_text = re.sub(pattern, '', response_text, flags=re.DOTALL)
# Remove extra newlines that might be left after cleaning
response_text = re.sub(r'\n{3,}', '\n\n', response_text)
response_text = response_text.strip()
global_workflow_agent.memory.put(ChatMessage(role="assistant", content=response_text))
return {
"response": response_text,
"sources": global_workflow_agent.sources,
"reasoning": [reasoning_step]
}
except Exception as e:
log_structured('error', f'Error executing tool: {str(e)}', {'traceback': traceback.format_exc()})
# Error message is already clean without thinking
error_response = f"I encountered an error while processing your query: {str(e)}"
return {
"response": error_response,
"sources": [],
"reasoning": [reasoning_step]
}
# Enhanced fallback with thinking detection
if isinstance(reasoning_step, ActionReasoningStep):
# If we reach here, it means we had an action but couldn't execute it
thinking_response = f"Thought: {getattr(reasoning_step, 'thought', '')} Action: {reasoning_step.action} Action Input: {reasoning_step.action_input}"
log_structured('warning', 'Returning raw thinking due to failed action execution', {
'action': reasoning_step.action,
'action_input': reasoning_step.action_input
})
return {
"response": "I apologize, but I encountered an issue while processing your query. Please try asking your question in a different way.",
"sources": [],
"reasoning": [reasoning_step]
}
else:
# Original fallback for other types
fallback_response = "I wasn't able to find a specific answer to your question. Please try rephrasing your query."
return {
"response": fallback_response,
"sources": global_workflow_agent.sources,
"reasoning": [reasoning_step] if reasoning_step else []
}
except Exception as e:
log_structured('error', f'Error in simple_run: {str(e)}', {'traceback': traceback.format_exc()})
# Error response is already clean
error_response = f"I encountered an error while processing your query: {str(e)}"
return {
"response": error_response,
"sources": [],
"reasoning": []
}
# Replace the run method directly on the agent that is already in shared state
# We need to get the current reference from shared_state
from shared_state import global_workflow_agent as current_agent
if current_agent is None:
log_structured('critical', 'Cannot set run method - global_workflow_agent is None')
return False
# Attach the run method directly
current_agent.run = simple_run
# Verify it was attached correctly
if not hasattr(current_agent, 'run'):
log_structured('critical', 'Failed to attach run method to agent')
return False
log_structured('info', 'Successfully attached run method to agent')
# Test the agent is working by calling a simple method
log_structured('info', 'Testing agent functionality...')
# Test on the current_agent we imported above
if current_agent and hasattr(current_agent, 'memory') and hasattr(current_agent.memory, 'reset'):
current_agent.memory.reset()
log_structured('info', 'Agent memory reset test successful')
else:
if not current_agent:
log_structured('error', 'Agent memory test failed: agent is None')
elif not hasattr(current_agent, 'memory'):
log_structured('error', 'Agent memory test failed: agent has no memory attribute')
elif not hasattr(current_agent.memory, 'reset'):
log_structured('error', 'Agent memory test failed: agent.memory has no reset method')
else:
log_structured('error', 'Agent memory test failed: unknown reason')
return False
except Exception as method_err:
log_structured('critical', f'Failed to set up agent run method: {str(method_err)}',
{'error': str(method_err), 'traceback': traceback.format_exc()})
return False
log_structured('info', 'Global index and workflow agent initialized successfully.')
# Skip test query to avoid potential errors during startup
return True
except Exception as e:
log_structured('critical', 'Global index/agent initialization failed', {
'error': str(e), 'traceback': traceback.format_exc()
})
global_index = None
global_workflow_agent = None
return False