netflix/ai_core.py
michael fe0d881341 Speed up GraphRAG startup with triple caching and background init
Server now starts serving vector-only queries in ~1-2 minutes instead of
30-60 minutes. GraphRAG initializes in a background task and its tool is
dynamically added to the agent when ready.

- Cache extracted triples to disk (neo4j_triples.pickle) so Neo4j can be
  repopulated without expensive LLM re-extraction
- Split initialize_global_index() into initialize_vector_index() (fast) and
  initialize_graphrag_components() (background)
- Add graphrag_ready/graphrag_initializing status flags to shared_state
- Launch GraphRAG init as asyncio background task in main.py
- Report GraphRAG status in /status endpoint for frontend awareness
- Add comprehensive migration guide for applying to other projects

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 17:33:19 -06:00

1316 lines
No EOL
68 KiB
Python

# netflix_chatbot/ai_core.py
import os
import asyncio
import traceback
import uuid
import shutil
import re
import inspect
import json
from pathlib import Path
from typing import List, Dict, Any, Optional, Union
import httpx
import tiktoken
import nest_asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import partial
# Import GraphRAG integration
from graph_rag_integration import (
create_graph_components,
create_graphrag_query_engine,
generate_final_answer,
GraphRAGExtractor,
GraphRAGStore,
GraphRAGQueryEngine
)
from llama_index.core import (
VectorStoreIndex,
Document as LlamaIndexDocument,
Settings,
get_response_synthesizer,
load_index_from_storage,
StorageContext,
SimpleDirectoryReader # Keep if needed for fallback/other uses
)
from llama_index.core.retrievers import VectorIndexAutoRetriever, VectorIndexRetriever
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.postprocessor import SimilarityPostprocessor
from llama_index.core.response_synthesizers import ResponseMode
from llama_index.llms.openai import OpenAI as LlamaOpenAI
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.callbacks import CallbackManager, TokenCountingHandler
from llama_index.core.tools import QueryEngineTool, ToolMetadata, BaseTool, ToolSelection, ToolOutput
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.llms import ChatMessage, LLM
from llama_index.core.vector_stores.types import MetadataInfo, VectorStoreInfo
from llama_index.core.agent.react import ReActChatFormatter, ReActOutputParser
from llama_index.core.agent.react.types import (
ActionReasoningStep,
ObservationReasoningStep,
ResponseReasoningStep,
BaseReasoningStep
)
from llama_index.core.workflow import (
Context,
Workflow,
StartEvent,
StopEvent,
step,
Event,
)
from llama_index.core.node_parser import (
SentenceSplitter,
SemanticSplitterNodeParser,
)
from llama_parse import LlamaParse
# Import from our modules
from utils import logger, log_structured
from config import (
NETFLIX_DOCS_FOLDER, INDEX_PERSIST_PATH, IMAGES_DIRECTORY,
LLM_MODEL, EMBEDDING_MODEL, LLM_TEMPERATURE, LLM_TIMEOUT, AGENT_TIMEOUT,
TOOL_EXECUTION_TIMEOUT, SIMILARITY_TOP_K, SIMILARITY_CUTOFF,
LLAMA_PARSE_VENDOR_MODEL, LLAMA_PARSE_MAX_TIMEOUT,
NEO4J_URL, NEO4J_USERNAME, NEO4J_PASSWORD
)
nest_asyncio.apply()
# --- Global AI State ---
# Import shared state to ensure all modules access the same instances
from shared_state import (
global_index, global_workflow_agent,
global_graph_store, global_property_graph_index, global_graphrag_query_engine,
set_global_agent, set_global_index, set_graphrag_components,
set_graphrag_status
)
# --- Token Counter ---
try:
    # Preferred path: use the tokenizer tiktoken associates with the configured model.
    token_counter = TokenCountingHandler(
        tokenizer=tiktoken.encoding_for_model(LLM_MODEL).encode
    )
except Exception as e:
    log_structured('warning', f'Could not initialize tiktoken for {LLM_MODEL}. Token counting may be inaccurate.', {'error': str(e)})
    try:
        # The model name is unknown to tiktoken; a generic BPE encoding still counts
        # tokens (approximately), which is far closer than counting raw bytes.
        token_counter = TokenCountingHandler(
            tokenizer=tiktoken.get_encoding("cl100k_base").encode
        )
    except Exception as fallback_err:
        log_structured('warning', 'Could not load fallback tiktoken encoding; counting UTF-8 bytes instead of tokens.', {'error': str(fallback_err)})
        # Last resort: one "token" per UTF-8 byte. This overestimates real token
        # counts but keeps token accounting from crashing the module import.
        token_counter = TokenCountingHandler(tokenizer=lambda text: list(text.encode("utf-8")))
# --- Custom ReAct Agent Workflow ---
class PrepEvent(Event):
    """Payload-free event asking the agent to (re)build the LLM prompt.

    Emitted by every step that wants the ReAct loop to continue; consumed by
    ``ReActAgent2.prepare_chat_history``.
    """
class InputEvent(Event):
    """Carries the formatted chat messages that are about to be sent to the LLM."""

    input: list[ChatMessage]
class ToolCallEvent(Event):
    """Carries the tool selections the agent must execute before the next LLM turn."""

    tool_calls: list[ToolSelection]
class FunctionOutputEvent(Event):
    """Carries a single tool's ``ToolOutput``."""

    output: ToolOutput
class CustomStartEvent(StartEvent):
    """Start event with an explicit ``input_value`` field (empty string by default)."""

    input_value: str = ""
class ReActAgent2(Workflow):
    """
    Custom ReAct Agent implementation using LlamaIndex Workflows.
    Includes timeout handling for LLM calls and tool execution.

    Step flow: ``new_user_msg`` -> ``prepare_chat_history`` ->
    ``handle_llm_input`` -> (``handle_tool_calls`` -> ``prepare_chat_history``
    -> ...) until the LLM produces a final answer. The workflow result is a
    dict with the keys ``response`` (cleaned answer text), ``sources`` (the
    ``ToolOutput`` objects collected during the run) and ``reasoning`` (the
    list of reasoning steps).
    """

    # User message used when the start event carries no ``input_value``.
    _DEFAULT_USER_INPUT = "How can I help you with Netflix marketing guidelines?"

    # Returned when the cleaned answer is empty or still looks like ReAct scaffolding.
    _FALLBACK_RESPONSE = (
        "I found information about your query in the Netflix marketing materials. "
        "Please let me know if you need more specific details."
    )

    # Patterns stripped from the final answer, applied in order.
    # NOTE: every pattern is applied with re.DOTALL (and without re.MULTILINE),
    # so "." spans newlines and "^" anchors only at the start of the text.
    _THINKING_PATTERNS = (
        r'(?i)^.*?thinking:.*?\n',  # Leading text up to and including a "thinking:" line
        r'(?i)<thinking>.*?</thinking>',  # XML-like thinking tags
        r'(?i)\[thinking\].*?\[/thinking\]',  # Bracket thinking tags
        r'(?i)I\'m thinking:.*?\n',  # "I'm thinking:" sections
        r'(?i)Let me think.*?\n',  # "Let me think" sections
        r'(?i)Thought:.*?Action:.*?Action Input:.*',  # Full Thought/Action/Action Input block
        r'(?i)^Thought:.*',  # Text that starts with "Thought:"
        r'(?i)Action:.*?Action Input:.*',  # Action/Action Input block
        r'(?i)Thought:.*?Answer:',  # "Thought: ... Answer:" prefix
        r'(?i)^Answer:\s*',  # Bare "Answer:" prefix
    )

    def __init__(
        self,
        llm: LLM,
        tools: list[BaseTool],
        memory: ChatMemoryBuffer,
        timeout: float = AGENT_TIMEOUT,  # Overall workflow timeout
        llm_timeout: float = LLM_TIMEOUT,  # Timeout for individual LLM calls
        tool_timeout: float = TOOL_EXECUTION_TIMEOUT,  # Timeout for individual tool calls
        verbose: bool = True,  # Emit structured debug logs for each step
        extra_context: str | None = None,
        **kwargs: Any,
    ) -> None:
        """Create the agent.

        Args:
            llm: Chat model used for every reasoning turn.
            tools: Tools the agent may call; ``None`` is treated as no tools.
            memory: Chat memory that receives user and assistant messages.
            timeout: Overall workflow timeout passed to ``Workflow``.
            llm_timeout: Per-call timeout (seconds) for ``llm.achat``.
            tool_timeout: Per-call timeout (seconds) for tool execution.
            verbose: When true, log step-level debug information.
            extra_context: Optional text handed to ``ReActChatFormatter`` as context.
            **kwargs: Forwarded unchanged to ``Workflow.__init__``.
        """
        super().__init__(timeout=timeout, verbose=verbose, **kwargs)
        self.llm = llm
        self.tools = tools or []
        self.memory = memory
        self.formatter = ReActChatFormatter(context=extra_context or "")
        self.output_parser = ReActOutputParser()
        self.sources: List[ToolOutput] = []  # ToolOutput objects gathered during the current run
        self.llm_timeout = llm_timeout
        self.tool_timeout = tool_timeout
        self.verbose = verbose

    def _extract_user_input(self, start_event: Any) -> Any:
        """Return the user's input from the start event, or the default prompt if absent."""
        if hasattr(start_event, "input_value"):
            return start_event.input_value
        log_structured('info', 'ReActAgent: Extracting input from workflow input', {})
        return self._DEFAULT_USER_INPUT

    def _clean_final_response(self, response_text: str) -> str:
        """Strip leaked reasoning scaffolding from the answer; fall back to a canned reply."""
        for pattern in self._THINKING_PATTERNS:
            response_text = re.sub(pattern, '', response_text, flags=re.DOTALL)
        # Collapse the blank-line runs that removals can leave behind.
        response_text = re.sub(r'\n{3,}', '\n\n', response_text)
        response_text = response_text.strip()
        # Final safety check - if the answer is empty or still looks like scaffolding, use the fallback.
        if re.search(r'(?i)^(Thought|Action|Observation):', response_text) or not response_text:
            log_structured('warning', 'ReActAgent: Final response still contains thinking patterns, using fallback', {
                'problematic_response': response_text[:200]
            })
            response_text = self._FALLBACK_RESPONSE
        return response_text

    @staticmethod
    def _coerce_action_input(action_input: Any) -> dict:
        """Turn the parsed ``action_input`` into a kwargs dict for the tool call."""
        action_input_dict = action_input or {}
        if isinstance(action_input_dict, dict):
            return action_input_dict
        log_structured('warning', 'ReActAgent: action_input is not a dict, attempting to parse', {'raw_input': action_input_dict})
        try:
            # Attempt basic parsing if it looks like a JSON object string
            if isinstance(action_input_dict, str) and action_input_dict.strip().startswith('{'):
                return json.loads(action_input_dict)
            # Fallback: treat as a single 'query' arg if not dict/json
            return {'query': action_input_dict}
        except Exception as parse_err:
            log_structured('error', 'ReActAgent: Failed to parse action_input', {'raw_input': action_input_dict, 'error': str(parse_err)})
            return {'query': str(action_input_dict)}  # Safest fallback

    @step
    async def new_user_msg(self, ctx: Context, ev: StartEvent) -> PrepEvent:
        """Start a run: reset per-run state and record the user's message in memory."""
        self.sources = []  # Clear sources for the new message
        await ctx.set("user_input", ev)  # Keep the whole start event available to later steps
        # Store the real input immediately. Writing a placeholder and patching
        # messages[0] later only worked on the first turn of a conversation,
        # because on later turns messages[0] is an older message.
        real_user_input = self._extract_user_input(ev)
        self.memory.put(ChatMessage(role="user", content=str(real_user_input)))
        await ctx.set("current_reasoning", [])
        if self.verbose:
            log_structured('debug', 'ReActAgent: Receiving new user message', {})
        return PrepEvent()

    @step
    async def prepare_chat_history(self, ctx: Context, ev: PrepEvent) -> InputEvent:
        """Format memory, tools and the reasoning so far into the next LLM prompt."""
        chat_history = self.memory.get()
        current_reasoning = await ctx.get("current_reasoning", default=[])
        llm_input = self.formatter.format(
            self.tools, chat_history, current_reasoning=current_reasoning
        )
        if self.verbose:
            log_structured('debug', 'ReActAgent: Prepared chat history for LLM', {'history_len': len(llm_input)})
        return InputEvent(input=llm_input)

    @step  # The timeout is managed inside the method
    async def handle_llm_input(self, ctx: Context, ev: InputEvent) -> Union[ToolCallEvent, StopEvent, PrepEvent]:
        """Call the LLM, parse its ReAct output and decide the next event.

        Returns a ``StopEvent`` with the final result when the step is done, a
        ``ToolCallEvent`` for an action step, and a ``PrepEvent`` otherwise.
        Timeouts and errors are recorded as observations and also yield a
        ``PrepEvent`` so the loop continues.
        """
        chat_history = ev.input
        current_reasoning = await ctx.get("current_reasoning", default=[])
        try:
            if self.verbose:
                log_structured('debug', 'ReActAgent: Sending request to LLM', {'history_len': len(chat_history)})
            response = await asyncio.wait_for(
                self.llm.achat(chat_history),
                timeout=self.llm_timeout
            )
            reasoning_step = self.output_parser.parse(response.message.content)
            current_reasoning.append(reasoning_step)
            await ctx.set("current_reasoning", current_reasoning)  # Update context state
            if self.verbose:
                log_structured('debug', 'ReActAgent: Received LLM response', {
                    'step_type': type(reasoning_step).__name__,
                    'is_done': getattr(reasoning_step, 'is_done', False),
                    'has_response': hasattr(reasoning_step, 'response'),
                    'has_action': hasattr(reasoning_step, 'action'),
                    'action': getattr(reasoning_step, 'action', None),
                    'raw_content_preview': response.message.content[:300]
                })

            if reasoning_step.is_done:
                if self.verbose:
                    log_structured('debug', 'ReActAgent: Processing final response', {
                        'raw_response_content': response.message.content,
                        'reasoning_step_response': str(getattr(reasoning_step, 'response', 'NO_RESPONSE_ATTR'))
                    })
                response_text = self._clean_final_response(str(reasoning_step.response))
                self.memory.put(ChatMessage(role="assistant", content=response_text))
                if self.verbose:
                    log_structured('info', 'ReActAgent: Final response generated', {
                        'response_preview': response_text[:100],
                        'sources_count': len(self.sources)
                    })
                final_result = {
                    "response": response_text,
                    "sources": self.sources,  # Pass the collected ToolOutput objects
                    "reasoning": current_reasoning
                }
                return StopEvent(result=final_result)

            if isinstance(reasoning_step, ActionReasoningStep):
                if self.verbose:
                    log_structured('debug', 'ReActAgent: Action step identified', {
                        'action': reasoning_step.action,
                        'input': reasoning_step.action_input
                    })
                action_input_dict = self._coerce_action_input(reasoning_step.action_input)
                return ToolCallEvent(tool_calls=[
                    ToolSelection(
                        tool_id="tool_" + reasoning_step.action.replace(" ", "_"),  # Simple ID generation
                        tool_name=reasoning_step.action,
                        tool_kwargs=action_input_dict
                    )
                ])

            # Neither final nor an action: loop back and ask the LLM again.
            if self.verbose:
                log_structured('debug', 'ReActAgent: Non-action, non-done step encountered', {'step_type': type(reasoning_step).__name__})
            return PrepEvent()
        except asyncio.TimeoutError:
            error_msg = f"LLM call timed out after {self.llm_timeout} seconds."
            log_structured('error', 'ReActAgent: LLM Timeout', {'timeout': self.llm_timeout})
            current_reasoning.append(ObservationReasoningStep(observation=error_msg))
            await ctx.set("current_reasoning", current_reasoning)
            # Continue the loop so the LLM gets another attempt.
            return PrepEvent()
        except Exception as e:
            error_msg = f"Error during LLM interaction or parsing: {str(e)}"
            log_structured('error', 'ReActAgent: Error in handle_llm_input', {
                'error': str(e), 'traceback': traceback.format_exc()
            })
            current_reasoning.append(ObservationReasoningStep(observation=error_msg))
            await ctx.set("current_reasoning", current_reasoning)
            # Returning PrepEvent lets the agent describe the error or retry.
            return PrepEvent()

    @step  # The timeout is managed inside the method
    async def handle_tool_calls(self, ctx: Context, ev: ToolCallEvent) -> PrepEvent:
        """Execute each requested tool and record its output as an observation.

        Unknown tools, timeouts and tool errors are turned into observation
        steps instead of being raised. Always returns ``PrepEvent``.
        """
        tool_calls = ev.tool_calls
        # Rebuilt on every call so changes to self.tools are picked up.
        tools_by_name = {tool.metadata.name: tool for tool in self.tools}
        current_reasoning = await ctx.get("current_reasoning", default=[])
        if self.verbose:
            log_structured('debug', 'ReActAgent: Handling tool calls', {
                'call_count': len(tool_calls),
                'tool_names': [tc.tool_name for tc in tool_calls]
            })

        for tool_call in tool_calls:
            tool = tools_by_name.get(tool_call.tool_name)
            if not tool:
                error_msg = f"Tool '{tool_call.tool_name}' not found."
                log_structured('error', 'ReActAgent: Tool not found', {'tool_name': tool_call.tool_name})
                current_reasoning.append(ObservationReasoningStep(observation=error_msg))
                continue  # Skip to next tool call
            try:
                tool_kwargs = tool_call.tool_kwargs or {}  # Ensure kwargs is a dict
                if self.verbose:
                    log_structured('debug', 'ReActAgent: Executing tool', {
                        'tool_name': tool_call.tool_name,
                        'kwargs': tool_kwargs
                    })
                tool_output: ToolOutput
                # Not every BaseTool subclass defines ``call``; a missing
                # attribute simply selects the synchronous path.
                call_method = getattr(tool, "call", None)
                if inspect.iscoroutinefunction(call_method):
                    tool_output = await asyncio.wait_for(
                        tool.acall(**tool_kwargs),  # Use acall for async tools
                        timeout=self.tool_timeout
                    )
                else:
                    # Run the sync tool in a worker thread. wait_for stops
                    # waiting at the deadline without blocking the event loop
                    # (the worker thread itself cannot be interrupted).
                    tool_output = await asyncio.wait_for(
                        asyncio.to_thread(partial(tool, **tool_kwargs)),
                        timeout=self.tool_timeout
                    )
                if self.verbose:
                    log_structured('debug', 'ReActAgent: Tool execution successful', {
                        'tool_name': tool_call.tool_name,
                        'output_type': type(tool_output).__name__,
                        'output_content_preview': str(tool_output.content)[:100] if tool_output else "N/A"
                    })
                # Store the raw ToolOutput object which contains content, raw_output, metadata etc.
                self.sources.append(tool_output)
                # Add observation step with the tool's string content
                observation_content = str(tool_output.content) if tool_output and tool_output.content is not None else "Tool executed successfully but returned no content."
                current_reasoning.append(ObservationReasoningStep(observation=observation_content))
                if self.verbose:
                    log_structured('debug', 'ReActAgent: Added observation to reasoning', {
                        'tool_name': tool_call.tool_name,
                        'observation_preview': observation_content[:200]
                    })
                # --- Detailed logging for image metadata ---
                if hasattr(tool_output, 'raw_output') and hasattr(tool_output.raw_output, 'source_nodes'):
                    for node_with_score in tool_output.raw_output.source_nodes:
                        node = getattr(node_with_score, 'node', None)
                        if node and hasattr(node, 'metadata'):
                            if 'image_paths' in node.metadata and node.metadata['image_paths']:
                                log_structured('debug', 'ReActAgent: Tool output node contains image paths', {
                                    'tool_name': tool_call.tool_name,
                                    'node_id': getattr(node, 'id_', 'N/A'),
                                    'image_paths': node.metadata['image_paths']
                                })
            except asyncio.TimeoutError:
                error_msg = f"Tool '{tool_call.tool_name}' timed out after {self.tool_timeout} seconds."
                log_structured('error', 'ReActAgent: Tool Timeout', {'tool_name': tool_call.tool_name, 'timeout': self.tool_timeout})
                current_reasoning.append(ObservationReasoningStep(observation=error_msg))
            except Exception as e:
                error_msg = f"Error calling tool '{tool_call.tool_name}': {str(e)}"
                log_structured('error', 'ReActAgent: Tool execution error', {
                    'tool_name': tool_call.tool_name, 'error': str(e), 'traceback': traceback.format_exc()
                })
                current_reasoning.append(ObservationReasoningStep(observation=error_msg))

        # Update reasoning steps in context before returning
        await ctx.set("current_reasoning", current_reasoning)
        return PrepEvent()  # Always return PrepEvent to continue the loop
# --- Document Processing ---
async def process_documents_in_directory(directory: str, session_id: Optional[str] = None) -> List[LlamaIndexDocument]:
    """
    Process all documents in a directory using LlamaParse for text and images,
    preserving chunk-level metadata including specific image paths per chunk.

    For every supported file the text parser produces chunks; when the image
    parser is available, page images are copied into ``IMAGES_DIRECTORY`` and
    each chunk whose page number exactly matches an image page gets that image
    filename in ``metadata['image_paths']``.

    Args:
        directory: The path to the directory containing documents.
        session_id: Optional session identifier, only used for log context.

    Returns:
        A list of LlamaIndexDocument objects representing the processed chunks,
        ready for indexing. Returns an empty list on failure or if no documents
        are processed; exceptions are logged rather than raised.
    """
    parser_text = None
    parser_images = None
    # Every httpx client created here is tracked so it is closed even when the
    # parser object exposes no ``aclose`` of its own.
    http_clients: List[Any] = []
    try:
        # Coerce so that "/" path joins work whether the config value is a str or a Path.
        images_root = Path(IMAGES_DIRECTORY)
        os.makedirs(images_root, exist_ok=True)
        log_structured('info', f'Starting document processing for directory: {directory}', {
            'session_id': session_id,
            'directory_name': os.path.basename(directory),
            'directory_exists': os.path.exists(directory),
            # isdir (not exists) so a plain file path cannot make listdir raise here.
            'files_present': os.listdir(directory) if os.path.isdir(directory) else 'N/A'
        })
        if not os.path.exists(directory) or not os.path.isdir(directory):
            log_structured('error', f'Directory not found or is not a directory: {directory}', {'session_id': session_id})
            return []
        if not os.listdir(directory):
            log_structured('warning', f'Directory is empty, skipping: {directory}', {'session_id': session_id})
            return []

        # --- LlamaParse Initialization ---
        try:
            custom_client_text = httpx.AsyncClient(timeout=LLAMA_PARSE_MAX_TIMEOUT)
            http_clients.append(custom_client_text)
            custom_client_images = httpx.AsyncClient(timeout=LLAMA_PARSE_MAX_TIMEOUT)
            http_clients.append(custom_client_images)
            parser_text = LlamaParse(
                result_type="markdown",
                add_page_breaks=False,
                system_prompt="Extract all content including text, tables, and formatting. Preserve structure.",
                premium_mode=False,  # Set based on your LlamaCloud plan
                max_timeout=LLAMA_PARSE_MAX_TIMEOUT,
                custom_client=custom_client_text,
                verbose=True  # Enable verbose logging for LlamaParse
            )
            parser_images = LlamaParse(
                result_type="markdown",  # Less critical here, parsing for images primarily
                add_page_breaks=False,
                system_prompt="Generate page images of the document.",
                use_vendor_multimodal_model=True,
                vendor_multimodal_model_name=LLAMA_PARSE_VENDOR_MODEL,
                premium_mode=False,  # Set based on your LlamaCloud plan
                max_timeout=LLAMA_PARSE_MAX_TIMEOUT,
                custom_client=custom_client_images,
                verbose=True  # Enable verbose logging for LlamaParse
            )
            log_structured('info', 'Initialized LlamaParse (text & image capability)', {'session_id': session_id})
        except Exception as parser_err:
            log_structured('error', f'Error initializing dual LlamaParse capability: {parser_err}. Falling back to text-only.', {
                'session_id': session_id, 'traceback': traceback.format_exc()
            })
            await _close_quietly(parser_text, 'parser_text')
            await _close_quietly(parser_images, 'parser_images')
            parser_text = None
            parser_images = None  # Disable image parsing
            try:
                # Fresh client for the fallback parser
                custom_client_text = httpx.AsyncClient(timeout=LLAMA_PARSE_MAX_TIMEOUT)
                http_clients.append(custom_client_text)
                parser_text = LlamaParse(
                    result_type="markdown",
                    language="en",
                    add_page_breaks=False,
                    merge_consecutive=True,
                    max_timeout=LLAMA_PARSE_MAX_TIMEOUT,
                    custom_client=custom_client_text,
                    verbose=True
                )
                log_structured('info', 'Initialized LlamaParse with fallback text-only parser.', {'session_id': session_id})
            except Exception as fallback_err:
                log_structured('critical', f'FATAL: Error initializing fallback LlamaParse: {fallback_err}', {
                    'session_id': session_id, 'traceback': traceback.format_exc()
                })
                return []  # The finally block below releases parsers and clients

        if not parser_text:
            log_structured('critical', 'FATAL: Text parser (parser_text) could not be initialized.', {'session_id': session_id})
            return []

        file_extractor = {
            ".pdf": parser_text, ".docx": parser_text, ".doc": parser_text,
            ".pptx": parser_text, ".ppt": parser_text, ".txt": parser_text,
        }
        supported_extensions = list(file_extractor.keys())

        # --- File discovery ---
        log_structured('debug', f'Searching for supported files in: {directory}', {'extensions': supported_extensions, 'session_id': session_id})
        all_files_paths = _find_supported_files(directory, supported_extensions)
        log_structured('info', f'Found {len(all_files_paths)} supported files in {directory}', {'session_id': session_id, 'files': [f.name for f in all_files_paths]})
        if not all_files_paths:
            log_structured('warning', f'No supported files found in directory: {directory}', {'session_id': session_id})

        processed_chunks: List[LlamaIndexDocument] = []
        for file_path in all_files_paths:
            file_path_str = str(file_path)
            filename = file_path.name
            log_structured('info', f'Processing file: {filename}', {'session_id': session_id, 'path': file_path_str})
            ext = file_path.suffix.lower()
            current_text_parser = file_extractor.get(ext)
            if not current_text_parser:
                log_structured('warning', f'Skipping file {filename}: No parser defined for extension {ext}.', {'session_id': session_id})
                continue

            # 1. Extract Text Chunks
            try:
                log_structured('debug', f'Calling parser_text.aload_data for {filename}', {'session_id': session_id})
                text_chunks = await current_text_parser.aload_data(file_path_str)
                log_structured('info', f'Extracted {len(text_chunks)} text chunks from {filename}', {'session_id': session_id})
                if not text_chunks:
                    # Image extraction is still attempted for this file.
                    log_structured('warning', f'No text chunks extracted from {filename}. File might be empty or parser issue.', {'session_id': session_id})
            except Exception as text_err:
                log_structured('error', f'Error extracting text from {filename}', {
                    'session_id': session_id, 'error': str(text_err), 'traceback': traceback.format_exc()
                })
                continue  # Skip to next file if text extraction fails

            # 2. Extract Images (if parser_images is available)
            if parser_images:
                file_page_images = await _extract_page_images(parser_images, file_path, images_root, session_id)
            else:
                log_structured('info', f'Image parsing skipped for {filename} (parser_images is None).', {'session_id': session_id})
                file_page_images = {}

            # 3. Add Metadata to Each Text Chunk
            log_structured('debug', f'Assigning metadata and images to {len(text_chunks)} chunks for {filename}', {'session_id': session_id})
            for chunk_index, chunk in enumerate(text_chunks):
                if not hasattr(chunk, 'metadata') or chunk.metadata is None:
                    chunk.metadata = {}
                chunk.metadata['filename'] = filename
                chunk.metadata['file_path'] = file_path_str
                chunk.metadata['source'] = "document"
                chunk.metadata['type'] = os.path.basename(directory)  # e.g., 'brief', 'supporting'
                chunk.metadata['chunk_index'] = chunk_index

                page_num_1_based = _resolve_chunk_page(chunk, chunk_index, filename, session_id)

                # Exact page match only; a chunk on a page without an image gets none.
                chunk.metadata['image_paths'] = []
                if page_num_1_based in file_page_images:
                    specific_image_filename = file_page_images[page_num_1_based]
                    chunk.metadata['image_match_type'] = 'exact'
                    log_structured('debug', f'Exact page match for image page {page_num_1_based}', {'filename': filename, 'chunk': chunk_index, 'image': specific_image_filename})
                    if specific_image_filename:
                        chunk.metadata['image_paths'] = [specific_image_filename]  # Store as list
                processed_chunks.append(chunk)

        if all_files_paths:
            log_structured('info', f'Finished processing files. Total chunks: {len(processed_chunks)}', {'session_id': session_id})
        log_structured('info', 'Directory processing complete.', {
            'session_id': session_id, 'directory': directory, 'total_chunks_generated': len(processed_chunks)
        })
        return processed_chunks
    except Exception as e:
        log_structured('critical', 'FATAL Error in process_documents_in_directory', {
            'session_id': session_id, 'directory': directory, 'error': str(e), 'traceback': traceback.format_exc()
        })
        return []  # Return empty list on major failure
    finally:
        # Ensure async clients are closed
        log_structured('debug', 'Closing LlamaParse clients.', {'session_id': session_id})
        await _close_quietly(parser_text, 'parser_text')
        await _close_quietly(parser_images, 'parser_images')
        for http_client in http_clients:
            await _close_quietly(http_client, 'httpx')


async def _close_quietly(resource: Any, label: str) -> None:
    """Await ``resource.aclose()`` if it exists, logging (not raising) any failure."""
    if resource is None or not hasattr(resource, 'aclose'):
        return
    try:
        await resource.aclose()
    except Exception as close_err:
        log_structured('error', f'Error closing {label} client', {'error': str(close_err)})


def _find_supported_files(directory: str, supported_extensions: List[str]) -> List[Path]:
    """Return the sorted files in *directory* whose suffix matches, ignoring case."""
    wanted = {ext.lower() for ext in supported_extensions}
    return sorted(
        entry for entry in Path(directory).iterdir()
        if entry.is_file() and entry.suffix.lower() in wanted
    )


async def _extract_page_images(parser_images: Any, file_path: Path, images_root: Path, session_id: Optional[str]) -> Dict[int, str]:
    """Save page images for one file and return ``{1-based page number: image filename}``.

    Any failure is logged and yields an empty mapping. The temporary download
    directory is removed whether or not extraction succeeds.
    """
    filename = file_path.name
    file_path_str = str(file_path)
    file_page_images: Dict[int, str] = {}
    temp_image_dir = images_root / f"temp_img_{filename}_{uuid.uuid4().hex[:8]}"
    try:
        log_structured('debug', f'Attempting image extraction for: {filename}', {'session_id': session_id})
        os.makedirs(temp_image_dir, exist_ok=True)
        # The parser's image methods are called synchronously here, so run
        # them in an executor to avoid blocking the event loop.
        loop = asyncio.get_running_loop()
        with ThreadPoolExecutor() as pool:
            md_json_objs = await loop.run_in_executor(pool, parser_images.get_json_result, file_path_str)
            log_structured('debug', f'Got JSON result for images from {filename}', {'session_id': session_id})
            image_dicts = await loop.run_in_executor(pool, parser_images.get_images, md_json_objs, str(temp_image_dir))
        log_structured('info', f'LlamaParse reported {len(image_dicts)} images potentially extracted for {filename}', {'session_id': session_id})

        saved_image_count = 0
        for idx, img_info in enumerate(image_dicts):
            # 'page' is treated as a 0-based index; fall back to the list position if missing.
            page_index_0_based = img_info.get('page', idx)
            page_num_1_based = page_index_0_based + 1
            if 'path' in img_info and img_info['path'] and os.path.exists(img_info['path']):
                source_img_path = img_info['path']
                image_filename = f"{os.path.splitext(filename)[0]}_page{page_num_1_based}_{uuid.uuid4().hex[:6]}.png"
                dest_path = images_root / image_filename
                try:
                    shutil.copy2(source_img_path, dest_path)  # copy2 preserves metadata
                    file_page_images[page_num_1_based] = image_filename
                    saved_image_count += 1
                    log_structured('debug', f'Saved image for {filename} page {page_num_1_based} (1-based) to {dest_path}', {'session_id': session_id})
                except Exception as copy_err:
                    log_structured('error', f'Error copying image for {filename} page {page_num_1_based}', {'session_id': session_id, 'error': str(copy_err)})
            else:
                log_structured('warning', f'No valid image path found or file missing for image index {idx} (reported page {page_index_0_based}) from {filename}. Path: {img_info.get("path", "N/A")}', {'session_id': session_id})
        log_structured('info', f'Successfully saved {saved_image_count} images for {filename}', {'session_id': session_id})
        return file_page_images
    except Exception as img_err:
        log_structured('error', f'Error during image extraction pipeline for {filename}', {
            'session_id': session_id, 'error': str(img_err), 'traceback': traceback.format_exc()
        })
        return {}  # Ensure empty dict on error
    finally:
        # Clean up the temporary directory on success and on failure alike.
        if os.path.isdir(temp_image_dir):
            try:
                shutil.rmtree(temp_image_dir)
            except Exception as cleanup_err:
                log_structured('error', f'Error cleaning up temp image dir {temp_image_dir}', {'session_id': session_id, 'error': str(cleanup_err)})


def _resolve_chunk_page(chunk: Any, chunk_index: int, filename: str, session_id: Optional[str]) -> int:
    """Determine the chunk's 1-based page and store it in ``metadata['source_page']``.

    The first positive integer found under the metadata keys ``page_label``,
    ``page_number`` or ``page`` wins (assumed to be 1-based - TODO confirm
    against the parser's output). Otherwise the chunk position is used as a
    rough approximation.
    """
    for key in ('page_label', 'page_number', 'page'):
        if key not in chunk.metadata:
            continue
        try:
            candidate = int(chunk.metadata[key])
        except (ValueError, TypeError):
            continue  # Ignore values that are not integers
        if candidate > 0:  # Ignore non-positive page numbers
            chunk.metadata["source_page"] = candidate
            log_structured('debug', f"Found page {candidate} from metadata key '{key}'", {'filename': filename, 'chunk': chunk_index, 'session_id': session_id})
            return candidate
    # Fallback: treat the first chunk as page 1, the second as page 2, and so on.
    fallback_page = chunk_index + 1
    chunk.metadata["source_page"] = fallback_page
    log_structured('debug', f"Using position fallback for page: {fallback_page}", {'filename': filename, 'chunk': chunk_index, 'session_id': session_id})
    return fallback_page
# --- GraphRAG Tool Class (module-level for reuse) ---
class GraphRAGTool(BaseTool):
    """Tool that queries Netflix marketing materials using both vector and graph-based retrieval.

    Calling the tool runs ``query_engine.custom_query`` and then
    ``generate_final_answer`` to synthesize the reply. Retrieval failures are
    returned as an error ``ToolOutput`` rather than raised.
    """

    class _NodeWrapper:
        """Minimal object exposing ``source_nodes`` for consumers of ``raw_output``."""

        def __init__(self, nodes):
            self.source_nodes = nodes

    def __init__(self, query_engine):
        """Store the GraphRAG query engine and build the tool metadata."""
        self.query_engine = query_engine
        self._metadata = ToolMetadata(
            name="answerquestionswith_graphrag",
            description="USE THIS TOOL FOR ALL QUERIES - Queries Netflix marketing materials using both vector and graph-based retrieval for more comprehensive answers. Use this for complex questions that need context from multiple related documents."
        )

    @property
    def metadata(self):
        return self._metadata

    def __call__(self, query_str: Optional[str] = None, **kwargs: Any) -> ToolOutput:
        """Run query through GraphRAG and generate synthesized answer.

        Args:
            query_str: The question to answer.
            **kwargs: The query may instead arrive as ``input`` or ``query``;
                the agent in this module invokes tools with keyword arguments
                taken from the LLM's action input. Other keys are ignored.

        Returns:
            A ``ToolOutput`` with the synthesized answer, or one flagged
            ``is_error=True`` when retrieval or synthesis fails.

        Raises:
            TypeError: If no query was supplied under any accepted name.
        """
        if query_str is None:
            for alias in ("input", "query"):
                if kwargs.get(alias) is not None:
                    query_str = kwargs[alias]
                    break
        if query_str is None:
            raise TypeError("GraphRAGTool requires a query (query_str, input, or query).")

        log_structured('info', 'GraphRAG Tool: Starting dual retrieval', {'query': query_str})
        try:
            retrieval_result = self.query_engine.custom_query(query_str)
            final_answer = generate_final_answer(query_str, retrieval_result, self.query_engine.llm)
            log_message = {
                'vector_context_length': len(retrieval_result.get('vector_context', '')),
                'graphrag_context_length': len(retrieval_result.get('graphrag_context', '')),
                'vector_nodes_count': len(retrieval_result.get('vector_nodes', [])),
                'community_ids': retrieval_result.get('community_ids', [])
            }
            log_structured('info', 'GraphRAG Tool: Retrieval complete', log_message)
            vector_nodes = retrieval_result.get('vector_nodes', [])
            tool_output = ToolOutput(
                content=final_answer,
                tool_name="GraphRAG",
                # Wrapped so raw_output exposes ``source_nodes`` like a query response would.
                raw_output=self._NodeWrapper(vector_nodes),
                raw_input={"query": query_str, "source": "graphrag", "retrieval_stats": log_message}
            )
            log_structured('debug', 'GraphRAG Tool: Including image metadata in response',
                           {'node_count': len(vector_nodes)})
            return tool_output
        except Exception as graphrag_err:
            log_structured('error', f'Error in GraphRAG tool: {graphrag_err}',
                           {'traceback': traceback.format_exc()})
            return ToolOutput(
                content=f"I encountered an error while retrieving information: {str(graphrag_err)}. Please try again or use the standard query tool.",
                tool_name="GraphRAG",
                raw_input={"query": query_str},
                raw_output={"error": str(graphrag_err)},
                is_error=True
            )

    async def acall(self, input: str) -> ToolOutput:
        """Async version of __call__ (delegates to the synchronous implementation)."""
        return self.__call__(input)
# --- Phase 1: Vector Index Initialization (fast path) ---
# Patterns that strip "thinking" artifacts from LLM output before it is shown
# to the user. Compiled once; DOTALL lets '.' span newlines.
_THINKING_PATTERNS = tuple(
    re.compile(pattern, flags=re.DOTALL)
    for pattern in (
        r'(?i)^.*?thinking:.*?\n',
        r'(?i)<thinking>.*?</thinking>',
        r'(?i)\[thinking\].*?\[/thinking\]',
        r'(?i)I\'m thinking:.*?\n',
        r'(?i)Let me think.*?\n',
        r'(?i)Thought:.*?Answer:',
        r'(?i)^Answer:\s*',
    )
)
_EXCESS_BLANK_LINES = re.compile(r'\n{3,}')


def _strip_thinking_artifacts(text: str) -> str:
    """Remove thinking/scratchpad markers from *text* and tidy whitespace."""
    for pattern in _THINKING_PATTERNS:
        text = pattern.sub('', text)
    return _EXCESS_BLANK_LINES.sub('\n\n', text).strip()


def _normalize_action_input(action_input: Any) -> Dict[str, Any]:
    """Coerce a parsed ReAct action input into keyword arguments for a tool.

    Dicts pass through unchanged; a string that looks like a JSON object is
    decoded; anything else is wrapped as ``{'query': value}``.
    """
    if isinstance(action_input, dict):
        return action_input
    if isinstance(action_input, str) and action_input.strip().startswith('{'):
        try:
            return json.loads(action_input)
        except (TypeError, ValueError):
            # Malformed JSON: treat the raw text as the query itself.
            return {'query': str(action_input)}
    return {'query': action_input}


async def _simple_agent_run(query_text):
    """Simple version that doesn't rely on complex workflow steps.

    Performs one LLM reasoning step against the global agent and then either
    (a) forces the GraphRAG tool, if registered, when the LLM answered
    directly, (b) executes the single tool the LLM asked for and requests a
    final answer, or (c) returns a canned fallback.

    Args:
        query_text: The user query; converted with ``str()`` before use.

    Returns:
        dict with keys ``"response"`` (str), ``"sources"`` (list of tool
        outputs collected during this call) and ``"reasoning"`` (list of
        parsed reasoning steps). Errors are reported in ``"response"``;
        this coroutine does not raise for ordinary failures.
    """
    # Imported at call time so the binding reflects the agent currently
    # stored in shared_state.
    from shared_state import global_workflow_agent as agent

    if not agent:
        log_structured('critical', 'Agent is None in simple_run - this should never happen')
        return {
            "response": "The AI system is currently unavailable. Please try again later.",
            "sources": [],
            "reasoning": []
        }

    try:
        agent.sources = []

        # 1. Add the user query to memory
        agent.memory.put(ChatMessage(role="user", content=str(query_text)))

        # 2. Format the chat history
        llm_input = agent.formatter.format(agent.tools, agent.memory.get())

        # 3. Get LLM response
        response = await asyncio.wait_for(
            agent.llm.achat(llm_input),
            timeout=agent.llm_timeout
        )
        reasoning_step = agent.output_parser.parse(response.message.content)
        log_structured('debug', 'Parsed reasoning step', {
            'step_type': type(reasoning_step).__name__,
            'has_response': hasattr(reasoning_step, 'response'),
            'has_action': hasattr(reasoning_step, 'action'),
            'action': getattr(reasoning_step, 'action', None),
            'action_input': getattr(reasoning_step, 'action_input', None),
            'raw_content': response.message.content[:200]
        })

        # 4. Process the response
        if hasattr(reasoning_step, 'response') and reasoning_step.response:
            log_structured('debug', 'LLM provided direct response, forcing tool usage for image retrieval')
            # Try GraphRAG tool first (dynamically checks current tools list)
            graphrag_tool = next((t for t in agent.tools if 'graphrag' in t.metadata.name.lower()), None)
            if graphrag_tool:
                try:
                    tool_output = await asyncio.wait_for(
                        graphrag_tool.acall(input=str(query_text)),
                        timeout=agent.tool_timeout
                    )
                    agent.sources.append(tool_output)
                    response_text = str(tool_output.content) if tool_output and tool_output.content else str(reasoning_step.response)
                    agent.memory.put(ChatMessage(role="assistant", content=response_text))
                    return {
                        "response": response_text,
                        "sources": agent.sources,
                        "reasoning": [reasoning_step]
                    }
                except Exception as e:
                    log_structured('error', f'Error forcing GraphRAG tool execution: {str(e)}')

            # Fallback to cleaning the direct response
            response_text = _strip_thinking_artifacts(str(reasoning_step.response))
            agent.memory.put(ChatMessage(role="assistant", content=response_text))
            return {
                "response": response_text,
                "sources": agent.sources,
                "reasoning": [reasoning_step]
            }

        # Handle tool calls
        if hasattr(reasoning_step, 'action') and reasoning_step.action:
            tool_name = reasoning_step.action
            action_input = _normalize_action_input(reasoning_step.action_input or {})

            tool = next((t for t in agent.tools if t.metadata.name == tool_name), None)
            if not tool:
                log_structured('error', 'Tool not found in simple_run', {'tool_name': tool_name, 'available_tools': [t.metadata.name for t in agent.tools]})
                return {
                    "response": f"I tried to use a tool called '{tool_name}' but it's not available. Please try rephrasing your query.",
                    "sources": [],
                    "reasoning": [reasoning_step]
                }

            try:
                tool_output = await asyncio.wait_for(
                    tool.acall(**action_input),
                    timeout=agent.tool_timeout
                )
                agent.sources.append(tool_output)
                observation = str(tool_output.content if tool_output and tool_output.content is not None else "No content")

                # Feed the observation back and ask for the final answer. A real
                # newline separates the observation from the instruction.
                follow_up_msg = ChatMessage(role="user", content=f"Here is the result: {observation}\nPlease provide a final response based on this information.")
                agent.memory.put(follow_up_msg)
                llm_input = agent.formatter.format(agent.tools, agent.memory.get())
                final_response = await asyncio.wait_for(
                    agent.llm.achat(llm_input),
                    timeout=agent.llm_timeout
                )

                response_text = _strip_thinking_artifacts(str(final_response.message.content))
                agent.memory.put(ChatMessage(role="assistant", content=response_text))
                return {
                    "response": response_text,
                    "sources": agent.sources,
                    "reasoning": [reasoning_step]
                }
            except Exception as e:
                log_structured('error', f'Error executing tool: {str(e)}', {'traceback': traceback.format_exc()})
                error_response = f"I encountered an error while processing your query: {str(e)}"
                return {
                    "response": error_response,
                    "sources": [],
                    "reasoning": [reasoning_step]
                }

        # Enhanced fallback with thinking detection
        if isinstance(reasoning_step, ActionReasoningStep):
            log_structured('warning', 'Returning raw thinking due to failed action execution', {
                'action': reasoning_step.action,
                'action_input': reasoning_step.action_input
            })
            return {
                "response": "I apologize, but I encountered an issue while processing your query. Please try asking your question in a different way.",
                "sources": [],
                "reasoning": [reasoning_step]
            }

        fallback_response = "I wasn't able to find a specific answer to your question. Please try rephrasing your query."
        return {
            "response": fallback_response,
            "sources": agent.sources,
            "reasoning": [reasoning_step] if reasoning_step else []
        }
    except Exception as e:
        log_structured('error', f'Error in simple_run: {str(e)}', {'traceback': traceback.format_exc()})
        error_response = f"I encountered an error while processing your query: {str(e)}"
        return {
            "response": error_response,
            "sources": [],
            "reasoning": []
        }


async def initialize_vector_index() -> bool:
    """Initialize the vector index and agent with vector-only tools.

    This is the fast path (~1-2 minutes) that makes the server usable.
    GraphRAG components are initialized separately in the background.

    Steps: validate the OpenAI key with a test completion, configure the
    global LlamaIndex ``Settings``, load the persisted index (or build and
    persist it from ``NETFLIX_DOCS_FOLDER``), create the vector query tool,
    create the agent, and attach its ``run`` coroutine.

    Returns:
        bool: True if vector index and agent were initialized successfully.
        Every failure is logged and reported as False; nothing is raised.
    """
    try:
        # --- Configure LLM and Embedding Model ---
        openai_key = os.environ.get("OPENAI_API_KEY", "")
        if not openai_key:
            log_structured('critical', 'No OpenAI API key provided. Make sure OPENAI_API_KEY is set in your .env file.')
            return False

        # Only a short prefix of the key is ever logged.
        key_preview = openai_key[:4] + "..." if len(openai_key) > 4 else "invalid"
        log_structured('info', f'Using OpenAI API key starting with: {key_preview}')

        try:
            llm = LlamaOpenAI(
                model=LLM_MODEL,
                temperature=LLM_TEMPERATURE,
                timeout=LLM_TIMEOUT
            )
            # Cheap round-trip so a bad key fails here, not on the first user query.
            test_prompt = "Say 'API key is working' if you can read this."
            _ = llm.complete(test_prompt)
            log_structured('info', 'Successfully tested LLM API connection')
        except Exception as llm_err:
            log_structured('critical', f'Failed to initialize LLM with provided API key: {str(llm_err)}')
            return False

        embed_model = OpenAIEmbedding(model=EMBEDDING_MODEL)

        # --- Configure Global Settings ---
        Settings.llm = llm
        Settings.embed_model = embed_model
        Settings.callback_manager = CallbackManager([token_counter])
        node_parser = SemanticSplitterNodeParser(
            buffer_size=1, breakpoint_percentile_threshold=95, embed_model=embed_model
        )
        Settings.node_parser = node_parser

        # --- Load or Build Vector Index ---
        if INDEX_PERSIST_PATH.exists():
            log_structured('info', f'Loading existing index from {INDEX_PERSIST_PATH}')
            storage_context = StorageContext.from_defaults(persist_dir=str(INDEX_PERSIST_PATH))
            index = load_index_from_storage(storage_context)
            set_global_index(index)
            log_structured('info', 'Successfully loaded existing index')
        else:
            log_structured('info', f'No index found at {INDEX_PERSIST_PATH}. Creating new index from {NETFLIX_DOCS_FOLDER}')
            if not NETFLIX_DOCS_FOLDER.exists() or not any(NETFLIX_DOCS_FOLDER.iterdir()):
                log_structured('error', f'Netflix documents folder is missing or empty: {NETFLIX_DOCS_FOLDER}')
                return False

            documents = await process_documents_in_directory(str(NETFLIX_DOCS_FOLDER), session_id="global_init")
            if not documents:
                log_structured('error', f'No documents processed from {NETFLIX_DOCS_FOLDER}. Index creation aborted.')
                return False

            log_structured('info', 'Creating vector store index...', {
                'document_count': len(documents),
                'sample_doc_metadata': documents[0].metadata
            })
            index = VectorStoreIndex.from_documents(
                documents=documents,
                show_progress=True,
            )
            set_global_index(index)
            index.storage_context.persist(persist_dir=str(INDEX_PERSIST_PATH))
            log_structured('info', f'Index created and persisted to {INDEX_PERSIST_PATH}')

        # --- Create Retriever and Query Engine (vector-only) ---
        vector_store_info = VectorStoreInfo(
            content_info="Netflix marketing reference materials, including GPD Key Art Playbook and supporting documents.",
            metadata_info=[
                MetadataInfo(name="filename", type="str", description="Filename of the source document"),
                MetadataInfo(name="source_page", type="int", description="Approximate page number in the source document"),
                MetadataInfo(name="image_paths", type="list[str]", description="List of image filenames associated with this chunk"),
            ],
        )
        all_retriever = VectorIndexAutoRetriever(
            index=index,
            vector_store_info=vector_store_info,
            similarity_top_k=SIMILARITY_TOP_K,
            verbose=True
        )
        all_query_engine = RetrieverQueryEngine.from_args(
            retriever=all_retriever,
            response_synthesizer=get_response_synthesizer(
                response_mode=ResponseMode.COMPACT,
            ),
            node_postprocessors=[
                SimilarityPostprocessor(similarity_cutoff=SIMILARITY_CUTOFF),
            ],
        )

        # --- Create Query Engine Tools (vector-only initially) ---
        query_engine_tools_react = [
            QueryEngineTool(
                query_engine=all_query_engine,
                metadata=ToolMetadata(
                    name="answer_questions_from_netflix_marketing_materials",
                    description="USE THIS TOOL FOR ALL QUERIES - Queries Netflix marketing materials (GPD Key Art Playbook, etc.) to answer questions about guidelines, workflows, and processes. Use this for specific lookups in the knowledge base."
                ),
            ),
        ]

        # --- Initialize Global Workflow Agent ---
        try:
            agent = ReActAgent2(
                llm=llm,
                tools=query_engine_tools_react,
                memory=ChatMemoryBuffer.from_defaults(llm=llm, token_limit=4096),
                verbose=True,
                timeout=AGENT_TIMEOUT,
                llm_timeout=LLM_TIMEOUT,
                tool_timeout=TOOL_EXECUTION_TIMEOUT
            )
            set_global_agent(agent)
            log_structured('info', 'Agent initialized with vector-only tools')
        except Exception as agent_err:
            log_structured('critical', f'Failed to initialize agent: {str(agent_err)}', {'error': str(agent_err)})
            return False

        try:
            # Attach the run method to the agent
            from shared_state import global_workflow_agent as current_agent
            if current_agent is None:
                log_structured('critical', 'Cannot set run method - global_workflow_agent is None')
                return False

            current_agent.run = _simple_agent_run
            if not hasattr(current_agent, 'run'):
                log_structured('critical', 'Failed to attach run method to agent')
                return False
            log_structured('info', 'Successfully attached run method to agent')

            # Test the agent
            if current_agent and hasattr(current_agent, 'memory') and hasattr(current_agent.memory, 'reset'):
                current_agent.memory.reset()
                log_structured('info', 'Agent memory reset test successful')
            else:
                if not current_agent:
                    log_structured('error', 'Agent memory test failed: agent is None')
                elif not hasattr(current_agent, 'memory'):
                    log_structured('error', 'Agent memory test failed: agent has no memory attribute')
                elif not hasattr(current_agent.memory, 'reset'):
                    log_structured('error', 'Agent memory test failed: agent.memory has no reset method')
                else:
                    log_structured('error', 'Agent memory test failed: unknown reason')
                return False
        except Exception as method_err:
            log_structured('critical', f'Failed to set up agent run method: {str(method_err)}',
                           {'error': str(method_err), 'traceback': traceback.format_exc()})
            return False

        log_structured('info', 'Vector index and agent initialized successfully (GraphRAG pending).')
        return True
    except Exception as e:
        log_structured('critical', 'Vector index/agent initialization failed', {
            'error': str(e), 'traceback': traceback.format_exc()
        })
        return False
# --- Phase 2: GraphRAG Background Initialization ---
async def initialize_graphrag_components() -> bool:
    """Initialize GraphRAG components in the background.

    This connects to Neo4j, restores or extracts triples, builds communities,
    creates the GraphRAG query engine, and adds the GraphRAG tool to the
    existing agent. Can be called after the server is already serving requests.

    Progress is published through ``set_graphrag_status``: ``initializing`` is
    set on entry, then either ``ready=True`` on success or ``error`` on failure.
    Calling this more than once replaces any previously registered GraphRAG
    tool instead of adding a duplicate.

    NOTE(review): the body contains no ``await`` points, so when scheduled as
    an asyncio task it runs to completion without yielding to other tasks
    unless the called helpers drive the loop themselves — confirm that this is
    acceptable for the background-startup use case.

    Returns:
        bool: True if GraphRAG components were initialized successfully.
        Failures are logged and reported as False; nothing is raised.
    """
    set_graphrag_status(initializing=True, ready=False, error=None)
    try:
        # Read through the module so we always see the current globals.
        import shared_state

        index = shared_state.global_index
        if index is None:
            raise RuntimeError("Vector index must be initialized before GraphRAG components")
        if shared_state.global_workflow_agent is None:
            raise RuntimeError("Agent must be initialized before GraphRAG components")
        llm = Settings.llm

        # --- Connect to Neo4j and create/restore GraphRAG components ---
        log_structured('info', 'Starting GraphRAG background initialization')
        try:
            from llama_index.graph_stores.neo4j import Neo4jPropertyGraphStore
            property_graph_store = Neo4jPropertyGraphStore(
                username=NEO4J_USERNAME,
                password=NEO4J_PASSWORD,
                url=NEO4J_URL
            )
            log_structured('info', 'Successfully connected to Neo4j database')
        except Exception as e:
            # Chain the cause so the logged traceback shows the driver error.
            raise RuntimeError(f"Neo4j connection failed: {e}") from e

        # Check Neo4j state
        temp_graph_store = GraphRAGStore(property_graph_store)
        triplets = temp_graph_store.get_triplets()
        neo4j_has_data = len(triplets) > 0
        log_structured('info', f'Neo4j state check: {len(triplets)} triplets found')

        if neo4j_has_data:
            # Neo4j already has data — use it directly
            log_structured('info', f'Neo4j contains {len(triplets)} triplets. Using existing data.')
            graph_store, property_graph_index = create_graph_components(
                llm=llm,
                force_reindex=False
            )
            log_structured('info', 'GraphRAG components loaded from existing Neo4j data')
        else:
            # Neo4j is empty — try cache restore, then fall back to LLM extraction
            log_structured('info', 'Neo4j is empty. Will attempt cache restore or LLM extraction.')

            # Get nodes from vector index for potential LLM extraction
            vector_nodes = []
            if hasattr(index, 'docstore') and index.docstore:
                vector_nodes = list(index.docstore.docs.values())
                log_structured('info', f'Retrieved {len(vector_nodes)} nodes from index docstore')
            if not vector_nodes:
                log_structured('warning', 'Could not retrieve nodes from vector index. GraphRAG indexing will be skipped.')
                raise ValueError("No nodes could be retrieved from the vector index for GraphRAG indexing")

            # create_graph_components will automatically try cache restore before LLM extraction
            graph_store, property_graph_index = create_graph_components(
                llm=llm,
                nodes=vector_nodes,
                max_paths_per_chunk=10,
                force_reindex=False  # Allow cache restore path
            )
            log_structured('info', 'GraphRAG components created')

        # --- Build GraphRAG query engine ---
        # Ensure communities are built
        if not hasattr(graph_store, 'communities_built') or not graph_store.communities_built:
            log_structured('info', 'Building graph communities before creating query engine')
            try:
                graph_store.build_communities()
                log_structured('info', 'Communities built successfully')
            except Exception as comm_err:
                # Non-fatal: log and continue to query-engine creation.
                log_structured('error', f'Error building communities: {comm_err}',
                               {'traceback': traceback.format_exc()})

        vector_retriever = VectorIndexRetriever(
            index=index,
            similarity_top_k=SIMILARITY_TOP_K
        )
        try:
            graphrag_query_engine = create_graphrag_query_engine(
                vector_retriever=vector_retriever,
                graph_store=graph_store,
                llm=llm,
                similarity_top_k=SIMILARITY_TOP_K
            )
            log_structured('info', 'GraphRAG query engine created successfully')
        except Exception as e:
            # Factory failed: fall back to constructing the engine directly.
            log_structured('error', f'Error creating GraphRAG query engine via factory: {e}')
            graphrag_query_engine = GraphRAGQueryEngine(
                vector_retriever=vector_retriever,
                graph_store=graph_store,
                llm=llm,
                similarity_top_k=SIMILARITY_TOP_K
            )

        # Store GraphRAG components in shared state
        set_graphrag_components(
            graph_store=graph_store,
            property_graph_index=property_graph_index,
            graphrag_query_engine=graphrag_query_engine
        )

        # --- Add GraphRAG tool to the existing agent's tools ---
        graphrag_tool = GraphRAGTool(graphrag_query_engine)
        # Re-read the agent to get a fresh reference
        live_agent = shared_state.global_workflow_agent
        if live_agent is not None:
            # Drop any tool registered under the same name by an earlier run so
            # name-based lookups resolve to the engine built just now. Mutate in
            # place so existing references to the list see the change.
            graphrag_name = graphrag_tool.metadata.name
            live_agent.tools[:] = [
                t for t in live_agent.tools if t.metadata.name != graphrag_name
            ]
            live_agent.tools.append(graphrag_tool)
            log_structured('info', 'Added GraphRAG tool to existing agent', {
                'total_tools': len(live_agent.tools),
                'tool_names': [t.metadata.name for t in live_agent.tools]
            })
        else:
            log_structured('warning', 'Agent not available when trying to add GraphRAG tool')

        set_graphrag_status(ready=True, initializing=False)
        log_structured('info', 'GraphRAG background initialization complete')
        return True
    except Exception as e:
        log_structured('error', f'GraphRAG background initialization failed: {e}', {
            'traceback': traceback.format_exc()
        })
        set_graphrag_status(ready=False, initializing=False, error=e)
        return False
# --- Backward-compatible wrapper ---
async def initialize_global_index() -> bool:
    """Run the full startup sequence: vector index first, then GraphRAG.

    Kept as the original single entry point; both phases are awaited one
    after the other. For faster startup, call initialize_vector_index()
    directly and run initialize_graphrag_components() in the background.

    Returns:
        bool: False only when the vector phase fails. A GraphRAG failure is
        logged as a warning and still yields True.
    """
    if not await initialize_vector_index():
        return False

    # GraphRAG is optional: vector search already works at this point.
    if not await initialize_graphrag_components():
        log_structured('warning', 'GraphRAG initialization failed, but vector search is available')
    return True