# hp_chatbot/ai_core.py
import asyncio
import inspect
import json
import os
import re
import shutil
import traceback
import uuid
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from pathlib import Path
from typing import List, Dict, Any, Optional, Union

import httpx
import nest_asyncio
import tiktoken

# Import GraphRAG integration
from graph_rag_integration import (
    create_graph_components,
    create_graphrag_query_engine,
    generate_final_answer,
    GraphRAGExtractor,
    GraphRAGStore,
    GraphRAGQueryEngine,
)
from llama_index.core import (
    VectorStoreIndex,
    Document as LlamaIndexDocument,
    Settings,
    get_response_synthesizer,
    load_index_from_storage,
    StorageContext,
    SimpleDirectoryReader,  # Keep if needed for fallback/other uses
)
from llama_index.core.retrievers import VectorIndexAutoRetriever, VectorIndexRetriever
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.postprocessor import SimilarityPostprocessor
from llama_index.core.response_synthesizers import ResponseMode
from llama_index.llms.openai import OpenAI as LlamaOpenAI
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.callbacks import CallbackManager, TokenCountingHandler
from llama_index.core.tools import QueryEngineTool, ToolMetadata, BaseTool, ToolSelection, ToolOutput
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.llms import ChatMessage, LLM
from llama_index.core.vector_stores.types import MetadataInfo, VectorStoreInfo
from llama_index.core.agent.react import ReActChatFormatter, ReActOutputParser
from llama_index.core.agent.react.types import (
    ActionReasoningStep,
    ObservationReasoningStep,
    ResponseReasoningStep,
    BaseReasoningStep,
)
from llama_index.core.workflow import (
    Context,
    Workflow,
    StartEvent,
    StopEvent,
    step,
    Event,
)
from llama_index.core.node_parser import (
    SentenceSplitter,
    SemanticSplitterNodeParser,
)
from llama_parse import LlamaParse

# Import from our modules
from utils import logger, log_structured
from config import (
    HP_DOCS_FOLDER, INDEX_PERSIST_PATH, IMAGES_DIRECTORY, LLM_MODEL, EMBEDDING_MODEL,
    LLM_TEMPERATURE, LLM_TIMEOUT, AGENT_TIMEOUT, TOOL_EXECUTION_TIMEOUT,
    SIMILARITY_TOP_K, SIMILARITY_CUTOFF, LLAMA_PARSE_VENDOR_MODEL, LLAMA_PARSE_MAX_TIMEOUT,
    NEO4J_URL, NEO4J_USERNAME, NEO4J_PASSWORD,
)

nest_asyncio.apply()

# --- Global AI State ---
# Import shared state to ensure all modules access the same instances
from shared_state import (
    global_index, global_workflow_agent, global_graph_store, global_property_graph_index,
    global_graphrag_query_engine, set_global_agent, set_global_index,
    set_graphrag_components, set_graphrag_status,
)


# --- Token Counter ---
def _make_tokenizer():
    """Return the best available tokenizer callable for ``LLM_MODEL``.

    Order of preference: the model-specific tiktoken encoding, then a generic
    tiktoken encoding, then one "token" per UTF-8 byte as a last resort.
    """
    try:
        return tiktoken.encoding_for_model(LLM_MODEL).encode
    except Exception as e:
        log_structured('warning', f'Could not initialize tiktoken for {LLM_MODEL}. Token counting may be inaccurate.', {'error': str(e)})
    try:
        # encoding_for_model fails for model names tiktoken does not know; a generic
        # OpenAI BPE is a far closer estimate than counting raw bytes (~4x overcount).
        return tiktoken.get_encoding("cl100k_base").encode
    except Exception:
        # Last resort (e.g. encoding files unavailable offline).
        return lambda text: list(text.encode("utf-8"))


token_counter = TokenCountingHandler(tokenizer=_make_tokenizer())


# --- Custom ReAct Agent Workflow ---
class PrepEvent(Event):
    pass


class InputEvent(Event):
    input: list[ChatMessage]


class ToolCallEvent(Event):
    tool_calls: list[ToolSelection]


class FunctionOutputEvent(Event):
    output: ToolOutput


class CustomStartEvent(StartEvent):
    input_value: str = ""


# Returned instead of the model's final answer when nothing usable survives cleaning.
_FALLBACK_FINAL_RESPONSE = (
    "I found information about your query in the HP marketing materials. "
    "Please let me know if you need more specific details."
)

# Applied in order to the final answer to strip leaked reasoning text.
# The "line" patterns are MULTILINE without DOTALL so they only remove the matching
# line (under DOTALL they used to delete everything up to / after the match).
_THINKING_PATTERNS = [
    re.compile(r'(?im)^[ \t]*thinking:.*(?:\n|$)'),                      # Lines starting with "Thinking:"
    re.compile(r'(?i)<thinking>.*?</thinking>', re.DOTALL),               # XML-like thinking tags
    re.compile(r'(?i)\[thinking\].*?\[/thinking\]', re.DOTALL),           # Bracket thinking tags
    re.compile(r"(?i)I'm thinking:.*?\n", re.DOTALL),                     # "I'm thinking:" sections
    re.compile(r'(?i)Let me think.*?\n', re.DOTALL),                      # "Let me think" sections
    re.compile(r'(?i)Thought:.*?Action:.*?Action Input:.*', re.DOTALL),   # Leaked tool-call scaffolding (to end)
    re.compile(r'(?im)^Thought:.*(?:\n|$)'),                              # Lines starting with "Thought:"
    re.compile(r'(?i)Action:.*?Action Input:.*', re.DOTALL),              # Action/Action Input (to end)
    re.compile(r'(?i)Thought:.*?Answer:', re.DOTALL),                     # "Thought: ... Answer:" prefix
    re.compile(r'(?i)^Answer:\s*', re.DOTALL),                            # Bare "Answer:" prefix
]
_LEFTOVER_REASONING = re.compile(r'(?i)^(Thought|Action|Observation):')


def _clean_final_response(response_text: str) -> str:
    """Strip leaked reasoning scaffolding from a final answer.

    Returns the cleaned text, or ``_FALLBACK_FINAL_RESPONSE`` when the result is empty
    or still begins with a ReAct keyword (Thought/Action/Observation).
    """
    for pattern in _THINKING_PATTERNS:
        response_text = pattern.sub('', response_text)
    # Collapse blank-line runs left behind by the removals above.
    response_text = re.sub(r'\n{3,}', '\n\n', response_text).strip()
    if not response_text or _LEFTOVER_REASONING.search(response_text):
        log_structured('warning', 'ReActAgent: Final response still contains thinking patterns, using fallback', {
            'problematic_response': response_text[:200]
        })
        return _FALLBACK_FINAL_RESPONSE
    return response_text


class ReActAgent2(Workflow):
    """
    Custom ReAct Agent implementation using LlamaIndex Workflows.
    Includes timeout handling for LLM calls and tool execution.

    Step flow: ``new_user_msg`` -> ``prepare_chat_history`` -> ``handle_llm_input`` ->
    (``handle_tool_calls`` -> ``prepare_chat_history`` -> ...) until the LLM produces a
    final answer, returned as ``StopEvent(result={"response", "sources", "reasoning"})``.

    The user's message is read from the ``input_value`` (or ``input``) keyword given to
    ``run()``. NOTE(review): ``self.sources`` is instance state reset per run, so a single
    instance presumably should not serve concurrent runs.
    """

    def __init__(
        self,
        llm: LLM,
        tools: list[BaseTool],
        memory: ChatMemoryBuffer,
        timeout: float = AGENT_TIMEOUT,  # Overall workflow timeout
        llm_timeout: float = LLM_TIMEOUT,  # Timeout for individual LLM calls
        tool_timeout: float = TOOL_EXECUTION_TIMEOUT,  # Timeout for individual tool calls
        verbose: bool = True,
        extra_context: str | None = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(timeout=timeout, verbose=verbose, **kwargs)
        self.llm = llm
        self.tools = tools or []
        self.memory = memory
        self.formatter = ReActChatFormatter(context=extra_context or "")
        self.output_parser = ReActOutputParser()
        self.sources: List[ToolOutput] = []  # ToolOutput objects collected during the current run
        self.llm_timeout = llm_timeout
        self.tool_timeout = tool_timeout
        self.verbose = verbose

    @staticmethod
    def _extract_user_input(start_event: StartEvent) -> str:
        """Return the user's message from the start event, or a generic fallback prompt."""
        for key in ("input_value", "input"):
            if hasattr(start_event, key):
                return str(getattr(start_event, key))
        log_structured('info', 'ReActAgent: Extracting input from workflow input', {})
        return "How can I help you with HP marketing guidelines?"

    @step
    async def new_user_msg(self, ctx: Context, ev: StartEvent) -> PrepEvent:
        """Store the incoming user message in memory and reset per-run state."""
        self.sources = []  # Clear sources for new message
        await ctx.set("user_input", ev)  # Keep the whole start event available in the context
        user_input = self._extract_user_input(ev)
        # Store the real text right away: a placeholder patched later is only found when
        # it is the first message in memory, so follow-up turns would send "Placeholder".
        self.memory.put(ChatMessage(role="user", content=user_input))
        await ctx.set("current_reasoning", [])
        if self.verbose:
            log_structured('debug', 'ReActAgent: Receiving new user message', {})
            log_structured('debug', 'ReActAgent: Stored user message', {'input': user_input})
        return PrepEvent()

    @step
    async def prepare_chat_history(self, ctx: Context, ev: PrepEvent) -> InputEvent:
        """Format memory plus the reasoning so far into the ReAct prompt for the LLM."""
        chat_history = self.memory.get()
        current_reasoning = await ctx.get("current_reasoning", default=[])
        llm_input = self.formatter.format(
            self.tools, chat_history, current_reasoning=current_reasoning
        )
        if self.verbose:
            log_structured('debug', 'ReActAgent: Prepared chat history for LLM', {'history_len': len(llm_input)})
        return InputEvent(input=llm_input)

    @staticmethod
    def _coerce_action_input(action_input: Any) -> Dict[str, Any]:
        """Return the action input as a kwargs dict.

        Dicts pass through; JSON-object strings are parsed; anything else becomes
        ``{'query': value}``.
        """
        action_input_dict = action_input or {}
        if isinstance(action_input_dict, dict):
            return action_input_dict
        log_structured('warning', 'ReActAgent: action_input is not a dict, attempting to parse', {'raw_input': action_input_dict})
        try:
            if isinstance(action_input_dict, str) and action_input_dict.strip().startswith('{'):
                return json.loads(action_input_dict)
            return {'query': action_input_dict}
        except ValueError as parse_err:  # json.JSONDecodeError is a ValueError
            log_structured('error', 'ReActAgent: Failed to parse action_input', {'raw_input': action_input_dict, 'error': str(parse_err)})
            return {'query': str(action_input_dict)}  # Safest fallback

    @step  # The timeout is managed inside the method
    async def handle_llm_input(self, ctx: Context, ev: InputEvent) -> Union[ToolCallEvent, StopEvent, PrepEvent]:
        """Ask the LLM for the next ReAct step and route it.

        Returns a StopEvent for a final answer, a ToolCallEvent for an action, and a
        PrepEvent otherwise. Timeouts and errors are recorded as observations and the
        loop continues (bounded only by the overall workflow timeout).
        """
        chat_history = ev.input
        current_reasoning = await ctx.get("current_reasoning", default=[])
        try:
            if self.verbose:
                log_structured('debug', 'ReActAgent: Sending request to LLM', {'history_len': len(chat_history)})
            response = await asyncio.wait_for(
                self.llm.achat(chat_history),
                timeout=self.llm_timeout
            )
            raw_content = response.message.content
            reasoning_step = self.output_parser.parse(raw_content)
            current_reasoning.append(reasoning_step)
            await ctx.set("current_reasoning", current_reasoning)

            if self.verbose:
                log_structured('debug', 'ReActAgent: Received LLM response', {
                    'step_type': type(reasoning_step).__name__,
                    'is_done': getattr(reasoning_step, 'is_done', False),
                    'has_response': hasattr(reasoning_step, 'response'),
                    'has_action': hasattr(reasoning_step, 'action'),
                    'action': getattr(reasoning_step, 'action', None),
                    'raw_content_preview': raw_content[:300]
                })

            if reasoning_step.is_done:
                if self.verbose:
                    log_structured('debug', 'ReActAgent: Processing final response', {
                        'raw_response_content': raw_content,
                        'reasoning_step_response': str(getattr(reasoning_step, 'response', 'NO_RESPONSE_ATTR'))
                    })
                response_text = _clean_final_response(str(reasoning_step.response))
                self.memory.put(ChatMessage(role="assistant", content=response_text))
                if self.verbose:
                    log_structured('info', 'ReActAgent: Final response generated', {
                        'response_preview': response_text[:100],
                        'sources_count': len(self.sources)
                    })
                return StopEvent(result={
                    "response": response_text,
                    "sources": self.sources,  # The collected ToolOutput objects
                    "reasoning": current_reasoning,
                })

            if isinstance(reasoning_step, ActionReasoningStep):
                if self.verbose:
                    log_structured('debug', 'ReActAgent: Action step identified', {
                        'action': reasoning_step.action,
                        'input': reasoning_step.action_input
                    })
                return ToolCallEvent(tool_calls=[
                    ToolSelection(
                        tool_id="tool_" + reasoning_step.action.replace(" ", "_"),  # Simple ID generation
                        tool_name=reasoning_step.action,
                        tool_kwargs=self._coerce_action_input(reasoning_step.action_input),
                    )
                ])

            if self.verbose:
                log_structured('debug', 'ReActAgent: Non-action, non-done step encountered', {'step_type': type(reasoning_step).__name__})
            return PrepEvent()  # Continue the loop

        except asyncio.TimeoutError:
            error_msg = f"LLM call timed out after {self.llm_timeout} seconds."
            log_structured('error', 'ReActAgent: LLM Timeout', {'timeout': self.llm_timeout})
            current_reasoning.append(ObservationReasoningStep(observation=error_msg))
            await ctx.set("current_reasoning", current_reasoning)
            return PrepEvent()
        except Exception as e:
            error_msg = f"Error during LLM interaction or parsing: {str(e)}"
            log_structured('error', 'ReActAgent: Error in handle_llm_input', {
                'error': str(e),
                'traceback': traceback.format_exc()
            })
            current_reasoning.append(ObservationReasoningStep(observation=error_msg))
            await ctx.set("current_reasoning", current_reasoning)
            # Returning PrepEvent lets the agent describe the error or retry.
            return PrepEvent()

    async def _run_tool(self, tool: BaseTool, tool_kwargs: Dict[str, Any]) -> ToolOutput:
        """Execute one tool under ``self.tool_timeout`` without blocking the event loop.

        Uses ``acall`` when the tool's ``call`` is a coroutine function; otherwise runs the
        sync ``tool(**kwargs)`` in a worker thread. Tools that only derive from BaseTool
        have no ``call`` attribute and take the thread path. On timeout the awaiting side is
        cancelled; a sync tool's thread is not interrupted and finishes in the background.
        Raises asyncio.TimeoutError on timeout.
        """
        call_fn = getattr(tool, "call", None)
        if call_fn is not None and inspect.iscoroutinefunction(call_fn):
            pending = tool.acall(**tool_kwargs)
        else:
            pending = asyncio.to_thread(tool, **tool_kwargs)
        return await asyncio.wait_for(pending, timeout=self.tool_timeout)

    @staticmethod
    def _log_image_metadata(tool_name: str, tool_output: Any) -> None:
        """Debug-log image paths found on source nodes in the tool's raw output, if any."""
        raw_output = getattr(tool_output, 'raw_output', None)
        for node_with_score in getattr(raw_output, 'source_nodes', None) or []:
            node = getattr(node_with_score, 'node', None)
            metadata = getattr(node, 'metadata', None) if node else None
            if metadata and metadata.get('image_paths'):
                log_structured('debug', 'ReActAgent: Tool output node contains image paths', {
                    'tool_name': tool_name,
                    'node_id': getattr(node, 'id_', 'N/A'),
                    'image_paths': metadata['image_paths']
                })

    @step  # The timeout is managed inside the method
    async def handle_tool_calls(self, ctx: Context, ev: ToolCallEvent) -> PrepEvent:
        """Run each requested tool and record its output (or error) as an observation."""
        tool_calls = ev.tool_calls
        tools_by_name = {tool.metadata.name: tool for tool in self.tools}
        current_reasoning = await ctx.get("current_reasoning", default=[])

        if self.verbose:
            log_structured('debug', 'ReActAgent: Handling tool calls', {
                'call_count': len(tool_calls),
                'tool_names': [tc.tool_name for tc in tool_calls]
            })

        for tool_call in tool_calls:
            tool = tools_by_name.get(tool_call.tool_name)
            if tool is None:
                error_msg = f"Tool '{tool_call.tool_name}' not found."
                log_structured('error', 'ReActAgent: Tool not found', {'tool_name': tool_call.tool_name})
                current_reasoning.append(ObservationReasoningStep(observation=error_msg))
                continue

            try:
                tool_kwargs = tool_call.tool_kwargs or {}
                if self.verbose:
                    log_structured('debug', 'ReActAgent: Executing tool', {
                        'tool_name': tool_call.tool_name,
                        'kwargs': tool_kwargs
                    })

                tool_output = await self._run_tool(tool, tool_kwargs)

                if self.verbose:
                    log_structured('debug', 'ReActAgent: Tool execution successful', {
                        'tool_name': tool_call.tool_name,
                        'output_type': type(tool_output).__name__,
                        'output_content_preview': str(tool_output.content)[:100] if tool_output else "N/A"
                    })

                # Keep the raw ToolOutput (content, raw_output, metadata) for the caller.
                self.sources.append(tool_output)

                if tool_output and tool_output.content is not None:
                    observation_content = str(tool_output.content)
                else:
                    observation_content = "Tool executed successfully but returned no content."
                current_reasoning.append(ObservationReasoningStep(observation=observation_content))
                if self.verbose:
                    log_structured('debug', 'ReActAgent: Added observation to reasoning', {
                        'tool_name': tool_call.tool_name,
                        'observation_preview': observation_content[:200]
                    })

                self._log_image_metadata(tool_call.tool_name, tool_output)

            except asyncio.TimeoutError:
                error_msg = f"Tool '{tool_call.tool_name}' timed out after {self.tool_timeout} seconds."
                log_structured('error', 'ReActAgent: Tool Timeout', {'tool_name': tool_call.tool_name, 'timeout': self.tool_timeout})
                current_reasoning.append(ObservationReasoningStep(observation=error_msg))
            except Exception as e:
                error_msg = f"Error calling tool '{tool_call.tool_name}': {str(e)}"
                log_structured('error', 'ReActAgent: Tool execution error', {
                    'tool_name': tool_call.tool_name,
                    'error': str(e),
                    'traceback': traceback.format_exc()
                })
                current_reasoning.append(ObservationReasoningStep(observation=error_msg))

        await ctx.set("current_reasoning", current_reasoning)
        return PrepEvent()  # Always continue the loop


# --- Document Processing ---
_PAGE_METADATA_KEYS = ('page_label', 'page_number', 'page')  # Checked in this order


def _find_supported_files(directory: str, extensions: List[str]) -> List[Path]:
    """Return regular files directly inside *directory* whose suffix is in *extensions*.

    Suffix matching is case-insensitive (``.PDF``/``.Pdf``/``.pdf``) and the result is
    sorted so files are processed in a deterministic order.
    """
    wanted = {ext.lower() for ext in extensions}
    return sorted(
        path for path in Path(directory).iterdir()
        if path.is_file() and path.suffix.lower() in wanted
    )


async def _aclose_parser(parser: Any, label: str) -> None:
    """Await ``parser.aclose()`` when the parser exposes one; log instead of raising."""
    if parser is not None and hasattr(parser, 'aclose'):
        try:
            await parser.aclose()
        except Exception as close_err:
            log_structured('error', f'Error closing {label} client', {'error': str(close_err)})


async def _extract_page_images(
    parser_images: LlamaParse, file_path_str: str, filename: str, session_id: Optional[str]
) -> Dict[int, str]:
    """Render page images for one file and copy them into IMAGES_DIRECTORY.

    Returns ``{1-based page number: saved image filename}``. Any failure in the pipeline
    is logged and yields ``{}``. The per-file temp directory is always removed.
    """
    images_dir = Path(IMAGES_DIRECTORY)
    temp_image_dir = images_dir / f"temp_img_{filename}_{uuid.uuid4().hex[:8]}"
    file_page_images: Dict[int, str] = {}
    try:
        log_structured('debug', f'Attempting image extraction for: {filename}', {'session_id': session_id})
        os.makedirs(temp_image_dir, exist_ok=True)

        # get_json_result/get_images are called synchronously, so run them off the event loop.
        # NOTE(review): if LlamaParse offers async variants (e.g. aget_json_result), they
        # would avoid the executor — confirm against the LlamaParse docs.
        loop = asyncio.get_running_loop()
        with ThreadPoolExecutor() as pool:
            md_json_objs = await loop.run_in_executor(pool, parser_images.get_json_result, file_path_str)
            log_structured('debug', f'Got JSON result for images from {filename}', {'session_id': session_id})
            image_dicts = await loop.run_in_executor(pool, parser_images.get_images, md_json_objs, str(temp_image_dir))
        log_structured('info', f'LlamaParse reported {len(image_dicts)} images potentially extracted for {filename}', {'session_id': session_id})

        saved_image_count = 0
        for idx, img_info in enumerate(image_dicts):
            # 'page' is treated as 0-based; fall back to the list position when absent/invalid.
            page_index_0_based = img_info.get('page', idx)
            if not isinstance(page_index_0_based, int):
                page_index_0_based = idx
            page_num_1_based = page_index_0_based + 1

            source_img_path = img_info.get('path')
            if not (source_img_path and os.path.exists(source_img_path)):
                log_structured('warning', f'No valid image path found or file missing for image index {idx} (reported page {page_index_0_based}) from {filename}. Path: {img_info.get("path", "N/A")}', {'session_id': session_id})
                continue

            image_filename = f"{os.path.splitext(filename)[0]}_page{page_num_1_based}_{uuid.uuid4().hex[:6]}.png"
            dest_path = images_dir / image_filename
            try:
                shutil.copy2(source_img_path, dest_path)  # copy2 preserves file metadata
            except Exception as copy_err:
                log_structured('error', f'Error copying image for {filename} page {page_num_1_based}', {'session_id': session_id, 'error': str(copy_err)})
                continue
            file_page_images[page_num_1_based] = image_filename
            saved_image_count += 1
            log_structured('debug', f'Saved image for {filename} page {page_num_1_based} (1-based) to {dest_path}', {'session_id': session_id})

        log_structured('info', f'Successfully saved {saved_image_count} images for {filename}', {'session_id': session_id})
    except Exception as img_err:
        log_structured('error', f'Error during image extraction pipeline for {filename}', {
            'session_id': session_id,
            'error': str(img_err),
            'traceback': traceback.format_exc()
        })
        return {}
    finally:
        # Remove the scratch directory on success and failure alike.
        if temp_image_dir.exists():
            try:
                shutil.rmtree(temp_image_dir)
            except Exception as cleanup_err:
                log_structured('error', f'Error cleaning up temp image dir {temp_image_dir}', {'session_id': session_id, 'error': str(cleanup_err)})
    return file_page_images


def _annotate_chunk(
    chunk: LlamaIndexDocument,
    chunk_index: int,
    filename: str,
    file_path_str: str,
    doc_type: str,
    file_page_images: Dict[int, str],
    session_id: Optional[str],
) -> None:
    """Attach file, page and page-image metadata to one parsed chunk (in place).

    The page comes from the first positive integer under ``_PAGE_METADATA_KEYS``. When no
    such key is present it falls back to ``chunk_index + 1``, which is only a rough
    approximation. An image is attached only on an exact page match.
    """
    if getattr(chunk, 'metadata', None) is None:
        chunk.metadata = {}
    chunk.metadata['filename'] = filename
    chunk.metadata['file_path'] = file_path_str
    chunk.metadata['source'] = "document"
    chunk.metadata['type'] = doc_type  # e.g. 'brief', 'supporting' (the directory name)
    chunk.metadata['chunk_index'] = chunk_index

    # --- Determine 1-based Page Number ---
    page_num_1_based: Optional[int] = None
    for key in _PAGE_METADATA_KEYS:
        if key not in chunk.metadata:
            continue
        try:
            candidate = int(chunk.metadata[key])  # Assumed 1-based from LlamaParse
        except (ValueError, TypeError):
            continue
        if candidate > 0:  # Ignore non-positive page numbers
            page_num_1_based = candidate
            log_structured('debug', f"Found page {page_num_1_based} from metadata key '{key}'", {'filename': filename, 'chunk': chunk_index, 'session_id': session_id})
            break

    if page_num_1_based is None:
        page_num_1_based = chunk_index + 1  # Treat the first chunk as page 1 (approximate)
        log_structured('debug', f"Using position fallback for page: {page_num_1_based}", {'filename': filename, 'chunk': chunk_index, 'session_id': session_id})
    chunk.metadata["source_page"] = page_num_1_based

    # --- Precise Page-Based Image Assignment ---
    chunk.metadata['image_paths'] = []
    if page_num_1_based in file_page_images:
        specific_image_filename = file_page_images[page_num_1_based]
        chunk.metadata['image_match_type'] = 'exact'
        log_structured('debug', f'Exact page match for image page {page_num_1_based}', {'filename': filename, 'chunk': chunk_index, 'image': specific_image_filename})
        if specific_image_filename:
            chunk.metadata['image_paths'] = [specific_image_filename]


async def process_documents_in_directory(directory: str, session_id: Optional[str] = None) -> List[LlamaIndexDocument]:
    """
    Process all documents in a directory using LlamaParse for text and images,
    preserving chunk-level metadata including specific image paths per chunk.
    Includes detailed logging for image metadata assignment.

    Args:
        directory: The path to the directory containing documents (not searched recursively).
        session_id: Optional session identifier for context.

    Returns:
        A list of LlamaIndexDocument objects representing the processed chunks, ready
        for indexing. Returns an empty list on failure or if no documents are processed.
    """
    parser_text = None
    parser_images = None
    # Every httpx client handed to LlamaParse is tracked and closed in ``finally``.
    # NOTE(review): LlamaParse is assumed not to close a custom_client it is given.
    http_clients: List[httpx.AsyncClient] = []

    def _new_http_client() -> httpx.AsyncClient:
        client = httpx.AsyncClient(timeout=LLAMA_PARSE_MAX_TIMEOUT)
        http_clients.append(client)
        return client

    try:
        os.makedirs(IMAGES_DIRECTORY, exist_ok=True)

        is_directory = os.path.isdir(directory)
        log_structured('info', f'Starting document processing for directory: {directory}', {
            'session_id': session_id,
            'directory_name': os.path.basename(directory),
            'directory_exists': os.path.exists(directory),
            'files_present': os.listdir(directory) if is_directory else 'N/A'
        })
        if not is_directory:
            log_structured('error', f'Directory not found or is not a directory: {directory}', {'session_id': session_id})
            return []
        if not os.listdir(directory):
            log_structured('warning', f'Directory is empty, skipping: {directory}', {'session_id': session_id})
            return []

        # --- LlamaParse Initialization ---
        try:
            parser_text = LlamaParse(
                result_type="markdown",
                add_page_breaks=False,
                system_prompt="Extract all content including text, tables, and formatting. Preserve structure.",
                premium_mode=False,  # Set based on your LlamaCloud plan
                max_timeout=LLAMA_PARSE_MAX_TIMEOUT,
                custom_client=_new_http_client(),
                verbose=True
            )
            parser_images = LlamaParse(
                result_type="markdown",  # Less critical here; used primarily for page images
                add_page_breaks=False,
                system_prompt="Generate page images of the document.",
                use_vendor_multimodal_model=True,
                vendor_multimodal_model_name=LLAMA_PARSE_VENDOR_MODEL,
                premium_mode=False,  # Set based on your LlamaCloud plan
                max_timeout=LLAMA_PARSE_MAX_TIMEOUT,
                custom_client=_new_http_client(),
                verbose=True
            )
            log_structured('info', 'Initialized LlamaParse (text & image capability)', {'session_id': session_id})
        except Exception as parser_err:
            log_structured('error', f'Error initializing dual LlamaParse capability: {parser_err}. Falling back to text-only.', {
                'session_id': session_id,
                'traceback': traceback.format_exc()
            })
            await _aclose_parser(parser_text, 'parser_text')
            await _aclose_parser(parser_images, 'parser_images')
            parser_text = None
            parser_images = None  # Disable image parsing
            try:
                parser_text = LlamaParse(
                    result_type="markdown",
                    language="en",
                    add_page_breaks=False,
                    merge_consecutive=True,
                    max_timeout=LLAMA_PARSE_MAX_TIMEOUT,
                    custom_client=_new_http_client(),
                    verbose=True
                )
                log_structured('info', 'Initialized LlamaParse with fallback text-only parser.', {'session_id': session_id})
            except Exception as fallback_err:
                log_structured('critical', f'FATAL: Error initializing fallback LlamaParse: {fallback_err}', {
                    'session_id': session_id,
                    'traceback': traceback.format_exc()
                })
                return []

        if not parser_text:
            log_structured('critical', 'FATAL: Text parser (parser_text) could not be initialized.')
            return []

        file_extractor = {
            ".pdf": parser_text, ".docx": parser_text, ".doc": parser_text,
            ".pptx": parser_text, ".ppt": parser_text, ".txt": parser_text,
            # Add others if your LlamaParse supports them
            # ".html": parser_text, ".md": parser_text,
        }
        supported_extensions = list(file_extractor.keys())

        log_structured('debug', f'Searching for supported files in: {directory}', {'extensions': supported_extensions, 'session_id': session_id})
        all_files_paths = _find_supported_files(directory, supported_extensions)
        log_structured('info', f'Found {len(all_files_paths)} supported files in {directory}', {'session_id': session_id, 'files': [f.name for f in all_files_paths]})
        if not all_files_paths:
            log_structured('warning', f'No supported files found in directory: {directory}', {'session_id': session_id})
            return []

        doc_type = os.path.basename(directory)
        processed_chunks: List[LlamaIndexDocument] = []
        for file_path in all_files_paths:
            file_path_str = str(file_path)
            filename = file_path.name
            log_structured('info', f'Processing file: {filename}', {'session_id': session_id, 'path': file_path_str})

            ext = file_path.suffix.lower()
            current_text_parser = file_extractor.get(ext)
            if not current_text_parser:
                log_structured('warning', f'Skipping file {filename}: No parser defined for extension {ext}.', {'session_id': session_id})
                continue

            # 1. Extract Text Chunks
            try:
                log_structured('debug', f'Calling parser_text.aload_data for {filename}', {'session_id': session_id})
                text_chunks: List[LlamaIndexDocument] = await current_text_parser.aload_data(file_path_str)
                log_structured('info', f'Extracted {len(text_chunks)} text chunks from {filename}', {'session_id': session_id})
                if not text_chunks:
                    log_structured('warning', f'No text chunks extracted from {filename}. File might be empty or parser issue.', {'session_id': session_id})
            except Exception as text_err:
                log_structured('error', f'Error extracting text from {filename}', {
                    'session_id': session_id,
                    'error': str(text_err),
                    'traceback': traceback.format_exc()
                })
                continue  # Skip this file if text extraction fails

            # 2. Extract Images (if parser_images is available)
            if parser_images:
                file_page_images = await _extract_page_images(parser_images, file_path_str, filename, session_id)
            else:
                log_structured('info', f'Image parsing skipped for {filename} (parser_images is None).', {'session_id': session_id})
                file_page_images = {}

            # 3. Add Metadata to Each Text Chunk
            log_structured('debug', f'Assigning metadata and images to {len(text_chunks)} chunks for {filename}', {'session_id': session_id})
            for chunk_index, chunk in enumerate(text_chunks):
                _annotate_chunk(chunk, chunk_index, filename, file_path_str, doc_type, file_page_images, session_id)
                processed_chunks.append(chunk)

        log_structured('info', f'Finished processing files. Total chunks: {len(processed_chunks)}', {'session_id': session_id})
        log_structured('info', 'Directory processing complete.', {
            'session_id': session_id,
            'directory': directory,
            'total_chunks_generated': len(processed_chunks)
        })
        return processed_chunks

    except Exception as e:
        log_structured('critical', 'FATAL Error in process_documents_in_directory', {
            'session_id': session_id,
            'directory': directory,
            'error': str(e),
            'traceback': traceback.format_exc()
        })
        return []  # Return empty list on major failure
    finally:
        log_structured('debug', 'Closing LlamaParse clients.', {'session_id': session_id})
        await _aclose_parser(parser_text, 'parser_text')
        await _aclose_parser(parser_images, 'parser_images')
        for client in http_clients:
            if client.is_closed:
                continue
            try:
                await client.aclose()
            except Exception as close_err:
                log_structured('error', 'Error closing LlamaParse HTTP client', {'error': str(close_err)})


# --- GraphRAG Tool (module-level for reuse across init phases) ---
class _SourceNodesWrapper:
    """Expose retrieved nodes as ``source_nodes`` on a ToolOutput's ``raw_output``.

    ReActAgent2 reads ``raw_output.source_nodes`` to find image paths, so GraphRAG
    results carry their vector nodes this way.
    """

    def __init__(self, nodes):
        self.source_nodes = nodes


class GraphRAGTool(BaseTool):
    """Tool that queries using both vector and graph-based retrieval.

    ``query_engine`` must provide ``custom_query(query_str) -> dict`` and an ``llm``
    attribute; both are used by ``__call__``.
    """

    def __init__(self, query_engine):
        self.query_engine = query_engine
        self._metadata = ToolMetadata(
            name="answerquestionswith_graphrag",
            description="USE THIS TOOL FOR ALL QUERIES - Queries HP marketing materials using both vector and graph-based retrieval for more comprehensive answers. Use this for complex questions that need context from multiple related documents."
        )

    @property
    def metadata(self):
        return self._metadata

    def __call__(self, query_str: Optional[str] = None, **kwargs: Any) -> ToolOutput:
        """Run query through GraphRAG and generate synthesized answer.

        The query may be passed positionally, as ``query_str``, as ``input`` (presumably
        what the LLM emits for LlamaIndex's default tool schema — confirm), as ``query``
        (ReActAgent2's fallback key), or as the single keyword argument given.

        Raises:
            TypeError: if no query string can be found in the arguments.
        """
        if query_str is None:
            query_str = kwargs.get('input', kwargs.get('query'))
        if query_str is None and len(kwargs) == 1:
            query_str = next(iter(kwargs.values()))
        if query_str is None:
            raise TypeError("GraphRAGTool requires a query string (positional, 'query_str', 'input' or 'query').")
        query_str = str(query_str)

        log_structured('info', 'GraphRAG Tool: Starting dual retrieval', {'query': query_str})
        try:
            # Get both vector and GraphRAG retrieval results
            retrieval_result = self.query_engine.custom_query(query_str)

            # Generate synthesized answer
            final_answer = generate_final_answer(query_str, retrieval_result, self.query_engine.llm)

            vector_nodes = retrieval_result.get('vector_nodes', [])
            log_structured('info', 'GraphRAG Tool: Retrieval complete', {
                'vector_context_length': len(retrieval_result.get('vector_context', '')),
                'graphrag_context_length': len(retrieval_result.get('graphrag_context', '')),
                'vector_nodes_count': len(vector_nodes),
                'community_ids': retrieval_result.get('community_ids', [])
            })

            tool_output = ToolOutput(
                content=final_answer,
                tool_name="GraphRAG",
                raw_output=_SourceNodesWrapper(vector_nodes),
                raw_input={"query": query_str}
            )
            log_structured('debug', 'GraphRAG Tool: Including image metadata in response', {'node_count': len(vector_nodes)})
            return tool_output

        except Exception as graphrag_err:
            log_structured('error', f'Error in GraphRAG tool: {graphrag_err}', {'traceback': traceback.format_exc()})
            return ToolOutput(
                content=f"I encountered an error while retrieving information: {str(graphrag_err)}. Please try again or use the standard query tool.",
                tool_name="GraphRAG",
                raw_input={"query": query_str},
                raw_output={"error": str(graphrag_err)}
            )

    async def acall(self, input: Optional[str] = None, **kwargs: Any) -> ToolOutput:
        """Async version of __call__ (executes synchronously on the calling event loop)."""
        return self.__call__(input, **kwargs)


# NOTE(review): initialize_vector_index continues beyond this chunk and is intentionally left exactly as found (collapsed); reformat it together with its remainder.
# --- Phase 1: Vector Index Initialization (fast path) --- async def initialize_vector_index() -> bool: """Initialize vector index and agent with vector-only tools. This is the fast path (~1-2 min) that makes the server usable for vector-only queries. GraphRAG components are added later by initialize_graphrag_components(). """ try: # --- Configure LLM and Embedding Model --- openai_key = os.environ.get("OPENAI_API_KEY", "") if not openai_key: log_structured('critical', 'No OpenAI API key provided. Make sure OPENAI_API_KEY is set in your .env file.') return False key_preview = openai_key[:4] + "..." if len(openai_key) > 4 else "invalid" log_structured('info', f'Using OpenAI API key starting with: {key_preview}') try: llm = LlamaOpenAI( model=LLM_MODEL, temperature=LLM_TEMPERATURE, timeout=LLM_TIMEOUT ) test_prompt = "Say 'API key is working' if you can read this."
_ = llm.complete(test_prompt) log_structured('info', 'Successfully tested LLM API connection') except Exception as llm_err: log_structured('critical', f'Failed to initialize LLM with provided API key: {str(llm_err)}') return False embed_model = OpenAIEmbedding(model=EMBEDDING_MODEL) # --- Configure Global Settings --- Settings.llm = llm Settings.embed_model = embed_model Settings.callback_manager = CallbackManager([token_counter]) node_parser = SemanticSplitterNodeParser( buffer_size=1, breakpoint_percentile_threshold=95, embed_model=embed_model ) Settings.node_parser = node_parser # --- Load or Build Vector Index --- if INDEX_PERSIST_PATH.exists(): log_structured('info', f'Loading existing index from {INDEX_PERSIST_PATH}') storage_context = StorageContext.from_defaults(persist_dir=str(INDEX_PERSIST_PATH)) index = load_index_from_storage(storage_context) set_global_index(index) log_structured('info', 'Successfully loaded existing index') else: log_structured('info', f'No index found at {INDEX_PERSIST_PATH}. Creating new index from {HP_DOCS_FOLDER}') if not HP_DOCS_FOLDER.exists() or not any(HP_DOCS_FOLDER.iterdir()): log_structured('error', f'HP documents folder is missing or empty: {HP_DOCS_FOLDER}') return False documents = await process_documents_in_directory(str(HP_DOCS_FOLDER), session_id="global_init") if not documents: log_structured('error', f'No documents processed from {HP_DOCS_FOLDER}. 
Index creation aborted.') return False log_structured('info', 'Creating vector store index...', { 'document_count': len(documents), 'sample_doc_metadata': documents[0].metadata if documents else None }) index = VectorStoreIndex.from_documents( documents=documents, show_progress=True, ) set_global_index(index) index.storage_context.persist(persist_dir=str(INDEX_PERSIST_PATH)) log_structured('info', f'Index created and persisted to {INDEX_PERSIST_PATH}') # --- Create Retriever and Query Engine (vector only) --- vector_store_info = VectorStoreInfo( content_info="HP marketing reference materials, including brand guidelines and supporting documents.", metadata_info=[ MetadataInfo(name="filename", type="str", description="Filename of the source document"), MetadataInfo(name="source_page", type="int", description="Approximate page number in the source document"), MetadataInfo(name="image_paths", type="list[str]", description="List of image filenames associated with this chunk"), ], ) all_retriever = VectorIndexAutoRetriever( index=index, vector_store_info=vector_store_info, similarity_top_k=SIMILARITY_TOP_K, verbose=True ) all_query_engine = RetrieverQueryEngine.from_args( retriever=all_retriever, response_synthesizer=get_response_synthesizer( response_mode=ResponseMode.COMPACT, ), node_postprocessors=[ SimilarityPostprocessor(similarity_cutoff=SIMILARITY_CUTOFF), ], ) # --- Create Agent with vector-only tools --- query_engine_tools_react = [ QueryEngineTool( query_engine=all_query_engine, metadata=ToolMetadata( name="answer_questions_from_hp_marketing_materials", description="USE THIS TOOL FOR ALL QUERIES - Queries HP marketing materials (brand guidelines, etc.) to answer questions about guidelines, workflows, and processes. Use this for specific lookups in the knowledge base." 
), ), ] try: agent = ReActAgent2( llm=llm, tools=query_engine_tools_react, memory=ChatMemoryBuffer.from_defaults(llm=llm, token_limit=4096), verbose=True, timeout=AGENT_TIMEOUT, llm_timeout=LLM_TIMEOUT, tool_timeout=TOOL_EXECUTION_TIMEOUT ) set_global_agent(agent) log_structured('info', 'Agent initialized successfully with vector-only tools') except Exception as agent_err: log_structured('critical', f'Failed to initialize agent: {str(agent_err)}', {'error': str(agent_err)}) return False # --- Attach simple_run method --- try: async def simple_run(query_text): """Simple version that doesn't rely on complex workflow steps.""" from shared_state import global_workflow_agent if not global_workflow_agent: log_structured('critical', 'Agent is None in simple_run - this should never happen') return { "response": "The AI system is currently unavailable. Please try again later.", "sources": [], "reasoning": [] } try: global_workflow_agent.sources = [] user_msg = ChatMessage(role="user", content=str(query_text)) global_workflow_agent.memory.put(user_msg) chat_history = global_workflow_agent.memory.get() llm_input = global_workflow_agent.formatter.format(global_workflow_agent.tools, chat_history) response = await asyncio.wait_for( global_workflow_agent.llm.achat(llm_input), timeout=global_workflow_agent.llm_timeout ) reasoning_step = global_workflow_agent.output_parser.parse(response.message.content) log_structured('debug', 'Parsed reasoning step', { 'step_type': type(reasoning_step).__name__, 'has_response': hasattr(reasoning_step, 'response'), 'has_action': hasattr(reasoning_step, 'action'), 'action': getattr(reasoning_step, 'action', None), 'action_input': getattr(reasoning_step, 'action_input', None), 'raw_content': response.message.content[:200] }) if hasattr(reasoning_step, 'response') and reasoning_step.response: log_structured('debug', 'LLM provided direct response, forcing tool usage for image retrieval') graphrag_tool = next((t for t in global_workflow_agent.tools if 
'graphrag' in t.metadata.name.lower()), None) if graphrag_tool: try: tool_output = await asyncio.wait_for( graphrag_tool.acall(input=str(query_text)), timeout=global_workflow_agent.tool_timeout ) global_workflow_agent.sources.append(tool_output) response_text = str(tool_output.content) if tool_output and tool_output.content else str(reasoning_step.response) assistant_msg = ChatMessage(role="assistant", content=response_text) global_workflow_agent.memory.put(assistant_msg) return { "response": response_text, "sources": global_workflow_agent.sources, "reasoning": [reasoning_step] } except Exception as e: log_structured('error', f'Error forcing GraphRAG tool execution: {str(e)}') response_text = str(reasoning_step.response) import re thinking_patterns = [ r'(?i)^.*?thinking:.*?\n', r'(?i).*?', r'(?i)\[thinking\].*?\[/thinking\]', r'(?i)I\'m thinking:.*?\n', r'(?i)Let me think.*?\n', r'(?i)Thought:.*?Answer:', r'(?i)^Answer:\s*' ] for pattern in thinking_patterns: response_text = re.sub(pattern, '', response_text, flags=re.DOTALL) response_text = re.sub(r'\n{3,}', '\n\n', response_text) response_text = response_text.strip() assistant_msg = ChatMessage(role="assistant", content=response_text) global_workflow_agent.memory.put(assistant_msg) return { "response": response_text, "sources": global_workflow_agent.sources, "reasoning": [reasoning_step] } else: if hasattr(reasoning_step, 'action') and reasoning_step.action: tool_name = reasoning_step.action action_input = reasoning_step.action_input or {} if not isinstance(action_input, dict): try: if isinstance(action_input, str) and action_input.strip().startswith('{'): action_input = json.loads(action_input) else: action_input = {'query': action_input} except: action_input = {'query': str(action_input)} tool = next((t for t in global_workflow_agent.tools if t.metadata.name == tool_name), None) if not tool: error_msg = f"Tool '{tool_name}' not found. 
Available tools: {[t.metadata.name for t in global_workflow_agent.tools]}" log_structured('error', 'Tool not found in simple_run', {'tool_name': tool_name, 'available_tools': [t.metadata.name for t in global_workflow_agent.tools]}) return { "response": f"I tried to use a tool called '{tool_name}' but it's not available. Please try rephrasing your query.", "sources": [], "reasoning": [reasoning_step] } if tool: try: tool_output = await asyncio.wait_for( tool.acall(**action_input), timeout=global_workflow_agent.tool_timeout ) global_workflow_agent.sources.append(tool_output) observation = str(tool_output.content if tool_output and tool_output.content is not None else "No content") follow_up_msg = ChatMessage(role="user", content=f"Here is the result: {observation}\\nPlease provide a final response based on this information.") global_workflow_agent.memory.put(follow_up_msg) chat_history = global_workflow_agent.memory.get() llm_input = global_workflow_agent.formatter.format(global_workflow_agent.tools, chat_history) final_response = await asyncio.wait_for( global_workflow_agent.llm.achat(llm_input), timeout=global_workflow_agent.llm_timeout ) response_text = str(final_response.message.content) import re thinking_patterns = [ r'(?i)^.*?thinking:.*?\n', r'(?i).*?', r'(?i)\[thinking\].*?\[/thinking\]', r'(?i)I\'m thinking:.*?\n', r'(?i)Let me think.*?\n', r'(?i)Thought:.*?Answer:', r'(?i)^Answer:\s*' ] for pattern in thinking_patterns: response_text = re.sub(pattern, '', response_text, flags=re.DOTALL) response_text = re.sub(r'\n{3,}', '\n\n', response_text) response_text = response_text.strip() global_workflow_agent.memory.put(ChatMessage(role="assistant", content=response_text)) return { "response": response_text, "sources": global_workflow_agent.sources, "reasoning": [reasoning_step] } except Exception as e: log_structured('error', f'Error executing tool: {str(e)}', {'traceback': traceback.format_exc()}) error_response = f"I encountered an error while processing your 
query: {str(e)}" return { "response": error_response, "sources": [], "reasoning": [reasoning_step] } if isinstance(reasoning_step, ActionReasoningStep): log_structured('warning', 'Returning raw thinking due to failed action execution', { 'action': reasoning_step.action, 'action_input': reasoning_step.action_input }) return { "response": "I apologize, but I encountered an issue while processing your query. Please try asking your question in a different way.", "sources": [], "reasoning": [reasoning_step] } else: fallback_response = "I wasn't able to find a specific answer to your question. Please try rephrasing your query." return { "response": fallback_response, "sources": global_workflow_agent.sources, "reasoning": [reasoning_step] if reasoning_step else [] } except Exception as e: log_structured('error', f'Error in simple_run: {str(e)}', {'traceback': traceback.format_exc()}) error_response = f"I encountered an error while processing your query: {str(e)}" return { "response": error_response, "sources": [], "reasoning": [] } from shared_state import global_workflow_agent as current_agent if current_agent is None: log_structured('critical', 'Cannot set run method - global_workflow_agent is None') return False current_agent.run = simple_run if not hasattr(current_agent, 'run'): log_structured('critical', 'Failed to attach run method to agent') return False log_structured('info', 'Successfully attached run method to agent') log_structured('info', 'Testing agent functionality...') if current_agent and hasattr(current_agent, 'memory') and hasattr(current_agent.memory, 'reset'): current_agent.memory.reset() log_structured('info', 'Agent memory reset test successful') else: if not current_agent: log_structured('error', 'Agent memory test failed: agent is None') elif not hasattr(current_agent, 'memory'): log_structured('error', 'Agent memory test failed: agent has no memory attribute') elif not hasattr(current_agent.memory, 'reset'): log_structured('error', 'Agent memory 
test failed: agent.memory has no reset method') else: log_structured('error', 'Agent memory test failed: unknown reason') return False except Exception as method_err: log_structured('critical', f'Failed to set up agent run method: {str(method_err)}', {'error': str(method_err), 'traceback': traceback.format_exc()}) return False log_structured('info', 'Vector index and agent initialized successfully (vector-only mode).') return True except Exception as e: log_structured('critical', 'Vector index/agent initialization failed', { 'error': str(e), 'traceback': traceback.format_exc() }) return False # --- Phase 2: GraphRAG Background Initialization --- async def initialize_graphrag_components() -> bool: """Initialize GraphRAG components in the background. Connects to Neo4j, restores/extracts triples, builds communities, creates GraphRAG query engine, and dynamically adds the GraphRAG tool to the existing agent's tools list. """ set_graphrag_status(initializing=True, ready=False, error=None) try: import shared_state current_agent = shared_state.global_workflow_agent index = shared_state.global_index if index is None: raise RuntimeError("Vector index must be initialized first") if current_agent is None: raise RuntimeError("Agent must be initialized first") llm = Settings.llm # Connect to Neo4j from llama_index.graph_stores.neo4j import Neo4jPropertyGraphStore log_structured('info', f'Connecting to Neo4j at {NEO4J_URL}') property_graph_store = Neo4jPropertyGraphStore( username=NEO4J_USERNAME, password=NEO4J_PASSWORD, url=NEO4J_URL ) log_structured('info', 'Successfully connected to Neo4j database') # Check Neo4j state temp_graph_store = GraphRAGStore(property_graph_store) triplets = temp_graph_store.get_triplets() neo4j_has_data = len(triplets) > 0 if neo4j_has_data: log_structured('info', f'Neo4j contains {len(triplets)} triplets. 
Using existing data.') graph_store, property_graph_index = create_graph_components( llm=llm, force_reindex=False ) else: # Neo4j is empty — get nodes from vector index for potential extraction log_structured('info', 'Neo4j is empty. Getting nodes from vector index for GraphRAG.') vector_nodes = [] if hasattr(index, 'docstore') and index.docstore: vector_nodes = list(index.docstore.docs.values()) log_structured('info', f'Retrieved {len(vector_nodes)} nodes from index docstore') if not vector_nodes: raise ValueError("No nodes available for GraphRAG indexing") # create_graph_components will try cache restore first, then LLM extraction graph_store, property_graph_index = create_graph_components( llm=llm, nodes=vector_nodes, max_paths_per_chunk=10, force_reindex=False # Allow cache restore path ) # Ensure communities are built if not hasattr(graph_store, 'communities_built') or not graph_store.communities_built: graph_store.build_communities() # Create GraphRAG query engine vector_retriever = VectorIndexRetriever( index=index, similarity_top_k=SIMILARITY_TOP_K ) graphrag_query_engine = create_graphrag_query_engine( vector_retriever=vector_retriever, graph_store=graph_store, llm=llm, similarity_top_k=SIMILARITY_TOP_K ) # Store in shared state set_graphrag_components( graph_store=graph_store, property_graph_index=property_graph_index, graphrag_query_engine=graphrag_query_engine ) # Dynamically add GraphRAG tool to the existing agent graphrag_tool = GraphRAGTool(graphrag_query_engine) live_agent = shared_state.global_workflow_agent if live_agent is not None: live_agent.tools.append(graphrag_tool) log_structured('info', 'Added GraphRAG tool to existing agent', { 'total_tools': len(live_agent.tools), 'tool_names': [t.metadata.name for t in live_agent.tools] }) set_graphrag_status(ready=True, initializing=False) log_structured('info', 'GraphRAG components initialized successfully') return True except Exception as e: log_structured('error', f'GraphRAG background initialization 
failed: {e}', {'traceback': traceback.format_exc()}) set_graphrag_status(ready=False, initializing=False, error=e) return False # --- Global Index Initialization (backward-compatible wrapper) --- async def initialize_global_index() -> bool: """Backward-compatible wrapper: initializes everything sequentially.""" vector_success = await initialize_vector_index() if not vector_success: return False graphrag_success = await initialize_graphrag_components() if not graphrag_success: log_structured('warning', 'GraphRAG init failed, but vector search is available') return True