Full-stack GraphRAG chatbot for HP marketing materials with: - Python/Flask backend with custom ReAct agent (LlamaIndex) - Neo4j knowledge graph + vector search hybrid retrieval - LlamaParse multimodal document processing (text + images) - React/Vite frontend with conversation management - MongoDB conversation persistence - MSAL authentication support Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
178 lines
No EOL
7.7 KiB
Python
178 lines
No EOL
7.7 KiB
Python
# hp_chatbot/utils.py
|
|
import logging
|
|
import json
|
|
import os
|
|
import shutil
|
|
from datetime import datetime, timezone
|
|
from typing import Optional, Dict, Any
|
|
from config import LOG_FILE_PATH, CHUNK_FOLDER, UPLOAD_METADATA_FOLDER, ALLOWED_EXTENSIONS
|
|
from llama_index.core.tools import ToolOutput # Import ToolOutput for serialization check
|
|
|
|
# Configure logging: INFO-level records, formatted with timestamp, logger name
# and level, sent both to the console (StreamHandler) and to LOG_FILE_PATH.
# NOTE: per the logging docs, basicConfig() does nothing if the root logger
# already has handlers (unless force=True is passed).
logging.basicConfig(
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler(LOG_FILE_PATH),
    ],
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,  # Set level from config later if needed
)
logger = logging.getLogger(__name__)
|
|
|
|
# Import after logger is defined to avoid circular dependency if CustomJSONEncoder uses logger
|
|
from json_utils import CustomJSONEncoder
|
|
|
|
# --- Logging Helper ---
|
|
# Logger methods log_structured() may dispatch to. The allowlist keeps a bad
# ``level`` from resolving to an unrelated Logger attribute (e.g. ``addHandler``)
# and calling it with the log line.
_LOG_LEVEL_METHODS = frozenset({'debug', 'info', 'warning', 'error', 'exception', 'critical', 'fatal'})


def _encode_via_custom_encoder(obj):
    """Convert *obj* with CustomJSONEncoder.default, or return a type placeholder string."""
    try:
        # NOTE(review): assumes CustomJSONEncoder() takes no constructor args and that
        # its default() raises for types it does not handle (like json.JSONEncoder).
        return CustomJSONEncoder().default(obj)
    except Exception:
        return f"<Object type {type(obj).__name__}>"


def _safe_serialize(obj):
    """``default=`` hook for json.dumps that turns common non-JSON types into loggable values.

    Returned containers may still hold non-JSON values; json.dumps calls this
    hook again for each of them, so no manual recursion is needed (and a
    self-referencing object is reported by json as a circular reference
    instead of recursing until RecursionError).
    """
    if isinstance(obj, ToolOutput):
        # ToolOutput formatting lives in CustomJSONEncoder. Returning the object
        # itself from a default= hook makes json.dumps raise "Circular reference
        # detected", which would discard the whole payload.
        return _encode_via_custom_encoder(obj)
    if isinstance(obj, datetime):
        return obj.isoformat()
    if isinstance(obj, bytes):
        return "<bytes>"
    if hasattr(obj, '__dict__'):
        try:
            # Shallow view of public, non-callable attributes only.
            return {k: v for k, v in vars(obj).items() if not k.startswith('_') and not callable(v)}
        except Exception:
            return f"<Object type {type(obj).__name__}>"
    # Anything else (e.g. ObjectId): let CustomJSONEncoder try, else a placeholder.
    return _encode_via_custom_encoder(obj)


def _serialize_log_payload(event_message: str, data: Any) -> Dict[str, Any]:
    """Return the entries to merge into a log record describing *data*.

    On success this is ``{'data': <JSON-safe copy of data>}``. On failure the
    problem is logged and error-description entries are returned instead.
    """
    try:
        # Round-trip through JSON so the record only holds plain JSON values.
        return {'data': json.loads(json.dumps(data, default=_safe_serialize))}
    except (TypeError, OverflowError, ValueError) as json_err:
        logger.error(f"Serialization error in log_structured for event '{event_message}': {json_err}. Logging basic info.")
        entries: Dict[str, Any] = {'data_serialization_error': str(json_err)}
        # Attempt to log basic data structure if possible
        try:
            entries['data_structure'] = {k: str(type(v)) for k, v in data.items()}
        except Exception:
            entries['data_structure'] = "<Could not represent data structure>"
        return entries
    except Exception as e:
        logger.error(f"Unexpected error during safe_serialize or json.dumps in log_structured: {e}")
        return {'logging_error': str(e)}


def log_structured(level: str, event_message: str, data: Optional[Dict[str, Any]] = None) -> None:
    """
    Structured logging helper: emit one JSON line on the module logger.

    The record contains 'timestamp' (UTC, ISO-8601), 'event' and, when *data*
    is given, a JSON-safe copy of it under 'data' (or error details if it
    could not be serialized). Serialization problems and unknown levels are
    reported through the logger instead of being raised.

    Args:
        level: Logger method name, case-insensitive: 'debug', 'info',
            'warning' (or 'warn'), 'error', 'exception', 'critical', 'fatal'.
            Anything else is reported and the record is logged at error level.
        event_message: The main message describing the event
        data: Optional dictionary of additional data to log
    """
    log_data: Dict[str, Any] = {
        'timestamp': datetime.now(timezone.utc).isoformat(),
        'event': event_message,
    }
    if data is not None:
        log_data.update(_serialize_log_payload(event_message, data))

    # Encode first, separately from level dispatch, so an encoder error is never
    # mistaken for a bad level (and retried outside any handler).
    try:
        # Use the custom encoder for final JSON dump
        log_string = json.dumps(log_data, cls=CustomJSONEncoder)
    except Exception as e:
        # Fallback logging if JSON fails completely
        logger.error(f"FATAL: Error serializing log message with CustomJSONEncoder: {e}")
        fallback_msg = f"{event_message}"
        if data and hasattr(data, 'keys'):
            fallback_msg += f" | Data keys: {list(data.keys())}"
        logger.error(fallback_msg)
        return

    level_name = level.lower() if isinstance(level, str) else None
    if level_name == 'warn':
        level_name = 'warning'  # Logger.warn is a deprecated alias of warning
    if level_name not in _LOG_LEVEL_METHODS:
        logger.error(f"Invalid log level: {level}. Defaulting to error.")
        level_name = 'error'
    getattr(logger, level_name)(log_string)
|
|
|
|
|
|
# --- Response Validation ---
|
|
def validate_response(response: dict) -> bool:
    """Return True when *response* is a dict that holds every required key.

    The required keys are 'response', 'sources' and 'reasoning'; their values
    are not inspected and extra keys are allowed.
    """
    if not isinstance(response, dict):
        return False
    missing = [key for key in ('response', 'sources', 'reasoning') if key not in response]
    return not missing
|
|
|
|
# --- File Handling Utilities (Keep if chunking/upload is potentially needed later) ---
|
|
def get_upload_metadata(upload_id):
    """Read and return the JSON metadata stored for *upload_id*.

    Returns None when no metadata file exists, or when the file cannot be
    opened or parsed; in the latter case the failure is reported through
    log_structured.
    """
    path = os.path.join(UPLOAD_METADATA_FOLDER, f"{upload_id}.json")
    if os.path.exists(path):
        try:
            with open(path, 'r') as handle:
                return json.load(handle)
        except Exception as exc:
            log_structured('error', f'Failed to load upload metadata for {upload_id}', {'error': str(exc)})
    return None
|
|
|
|
def save_upload_metadata(upload_id, metadata):
    """Write *metadata* as JSON to the metadata file for *upload_id*.

    Errors while opening the file or serializing *metadata* are reported via
    log_structured instead of being raised. Because json.dump writes
    incrementally, a serialization error can leave a partial file behind.
    """
    target_file = os.path.join(UPLOAD_METADATA_FOLDER, f"{upload_id}.json")
    try:
        with open(target_file, 'w') as handle:
            json.dump(metadata, handle)
    except Exception as exc:
        log_structured('error', f'Failed to save upload metadata for {upload_id}', {'error': str(exc)})
    return None
|
|
|
|
def get_chunk_path(upload_id, chunk_index):
    """Return the file path of chunk number *chunk_index* for *upload_id*.

    Side effect: creates the per-upload directory under CHUNK_FOLDER if it
    does not exist yet.
    """
    chunk_dir = os.path.join(CHUNK_FOLDER, upload_id)
    os.makedirs(chunk_dir, exist_ok=True)
    chunk_name = f"chunk_{chunk_index}"
    return os.path.join(chunk_dir, chunk_name)
|
|
|
|
def _discard_partial_file(path):
    """Best-effort removal of a partially written output file; failures are logged, not raised."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass
    except OSError as e:
        log_structured('error', f'Failed to remove partial file {path}', {'error': str(e)})


def combine_chunks(upload_id, destination_path):
    """Concatenate an upload's chunk files, in index order, into *destination_path*.

    The chunk count comes from the upload metadata key 'totalChunks'; chunk i
    is read from CHUNK_FOLDER/<upload_id>/chunk_<i>.

    Returns:
        True if every chunk was written. False if the metadata is missing or
        lacks 'totalChunks', a chunk file is missing, or an error occurs; in
        the latter two cases the partially written destination is removed.
    """
    metadata = get_upload_metadata(upload_id)
    if not metadata or 'totalChunks' not in metadata:
        log_structured('error', f'Metadata missing or invalid for combining chunks: {upload_id}')
        return False

    upload_chunk_dir = os.path.join(CHUNK_FOLDER, upload_id)
    missing_chunk = None
    try:
        with open(destination_path, 'wb') as outfile:
            for i in range(metadata['totalChunks']):
                chunk_path = os.path.join(upload_chunk_dir, f"chunk_{i}")
                if not os.path.exists(chunk_path):
                    missing_chunk = i
                    break
                with open(chunk_path, 'rb') as infile:
                    # Stream in bounded blocks instead of reading the whole chunk into memory.
                    shutil.copyfileobj(infile, outfile)
    except Exception as e:
        log_structured('error', f'Error combining chunks for {upload_id}', {'error': str(e)})
        # Clean up partially created file
        _discard_partial_file(destination_path)
        return False

    if missing_chunk is not None:
        # Handled after the `with` block so the destination is closed before it
        # is removed (removing an open file fails on Windows).
        log_structured('error', f'Chunk {missing_chunk} missing for upload {upload_id}')
        _discard_partial_file(destination_path)
        return False
    return True
|
|
|
|
def _is_strictly_inside(base_dir, path):
    """Return True if *path* resolves (symlinks and '..' included) to a location strictly below *base_dir*."""
    base = os.path.realpath(base_dir)
    resolved = os.path.realpath(path)
    if resolved == base:
        return False
    try:
        return os.path.commonpath([base, resolved]) == base
    except ValueError:
        # commonpath raises e.g. for paths on different Windows drives.
        return False


def clear_upload_chunks(upload_id):
    """Remove all chunks and metadata for an upload.

    Deletes the directory CHUNK_FOLDER/<upload_id> and the file
    UPLOAD_METADATA_FOLDER/<upload_id>.json if they exist. This is best-effort:
    failures are logged via log_structured and not raised.

    Paths that do not resolve strictly inside their base folder are refused.
    Without this check, an empty upload_id would make rmtree delete the whole
    CHUNK_FOLDER (every upload's chunks), and an absolute or '..'-containing
    id would delete data outside it.
    """
    upload_chunk_dir = os.path.join(CHUNK_FOLDER, upload_id)
    if not _is_strictly_inside(CHUNK_FOLDER, upload_chunk_dir):
        log_structured('error', f'Refusing to remove chunk directory outside chunk folder: {upload_chunk_dir}')
    elif os.path.exists(upload_chunk_dir):
        try:
            shutil.rmtree(upload_chunk_dir)
            log_structured('info', f'Cleared chunk directory: {upload_chunk_dir}')
        except Exception as e:
            log_structured('error', f'Failed to remove chunk directory {upload_chunk_dir}', {'error': str(e)})

    metadata_path = os.path.join(UPLOAD_METADATA_FOLDER, f"{upload_id}.json")
    if not _is_strictly_inside(UPLOAD_METADATA_FOLDER, metadata_path):
        log_structured('error', f'Refusing to remove metadata file outside metadata folder: {metadata_path}')
    elif os.path.exists(metadata_path):
        try:
            os.remove(metadata_path)
            log_structured('info', f'Cleared metadata file: {metadata_path}')
        except Exception as e:
            log_structured('error', f'Failed to remove metadata file {metadata_path}', {'error': str(e)})
|
|
|
|
|
|
def allowed_file(filename, *, allowed_extensions=None):
    """Check if a filename has an allowed extension.

    The extension is the text after the last '.', lower-cased before lookup;
    entries of the allowed set are compared as-is. A filename without any '.'
    is rejected.

    Args:
        filename: Name (or path) of the file to check.
        allowed_extensions: Optional collection of extensions to accept;
            defaults to ALLOWED_EXTENSIONS from config.

    Returns:
        True if the extension is in the allowed set, otherwise False.
    """
    extensions = ALLOWED_EXTENSIONS if allowed_extensions is None else allowed_extensions
    _, dot, ext = filename.rpartition('.')
    return bool(dot) and ext.lower() in extensions