hp_chatbot/utils.py
michael 594f749d4c Initial commit: HP Marketing Materials GraphRAG Chatbot
Full-stack GraphRAG chatbot for HP marketing materials with:
- Python/Flask backend with custom ReAct agent (LlamaIndex)
- Neo4j knowledge graph + vector search hybrid retrieval
- LlamaParse multimodal document processing (text + images)
- React/Vite frontend with conversation management
- MongoDB conversation persistence
- MSAL authentication support

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 08:37:58 -06:00

178 lines
No EOL
7.7 KiB
Python

# hp_chatbot/utils.py
import logging
import json
import os
import shutil
from datetime import datetime, timezone
from typing import Optional, Dict, Any
from config import LOG_FILE_PATH, CHUNK_FOLDER, UPLOAD_METADATA_FOLDER, ALLOWED_EXTENSIONS
from llama_index.core.tools import ToolOutput # Import ToolOutput for serialization check
# Configure logging
logging.basicConfig(
level=logging.INFO, # Set level from config later if needed
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler(LOG_FILE_PATH)
]
)
logger = logging.getLogger(__name__)
# Import after logger is defined to avoid circular dependency if CustomJSONEncoder uses logger
from json_utils import CustomJSONEncoder
# --- Logging Helper ---
def log_structured(level: str, event_message: str, data: Optional[Dict[str, Any]] = None):
"""
Structured logging helper
Args:
level: The logging level ('info', 'error', etc.)
event_message: The main message describing the event
data: Optional dictionary of additional data to log
"""
# Basic serializer to handle common non-serializable types safely
def safe_serialize(obj):
if isinstance(obj, ToolOutput):
# Let CustomJSONEncoder handle ToolOutput specifics if passed directly
return obj # Pass it through for the encoder
if isinstance(obj, (datetime)):
return obj.isoformat()
if isinstance(obj, bytes):
return "<bytes>"
if hasattr(obj, '__dict__') and not isinstance(obj, (ToolOutput)): # Avoid double processing ToolOutput
# Very basic dict representation, avoid complex objects
try:
# Limit recursion depth or complexity if needed
return {k: safe_serialize(v) for k, v in obj.__dict__.items() if not k.startswith('_') and not callable(v)}
except Exception:
return f"<Object type {type(obj).__name__}>" # Fallback for complex objects
# Add more types as needed (e.g., ObjectId, specific LlamaIndex objects if problematic)
return obj # Let JSON encoder handle the rest or fail
log_data = {
'timestamp': datetime.now(timezone.utc).isoformat(),
'event': event_message
}
if data is not None:
try:
# Apply safe serialization recursively to the data dictionary
serialized_data = json.loads(json.dumps(data, default=safe_serialize))
log_data['data'] = serialized_data
except (TypeError, OverflowError, ValueError) as json_err:
logger.error(f"Serialization error in log_structured for event '{event_message}': {json_err}. Logging basic info.")
log_data['data_serialization_error'] = str(json_err)
# Attempt to log basic data structure if possible
try:
basic_data = {k: str(type(v)) for k, v in data.items()}
log_data['data_structure'] = basic_data
except Exception:
log_data['data_structure'] = "<Could not represent data structure>"
except Exception as e:
logger.error(f"Unexpected error during safe_serialize or json.dumps in log_structured: {e}")
log_data['logging_error'] = str(e)
try:
# Use the custom encoder for final JSON dump
log_string = json.dumps(log_data, cls=CustomJSONEncoder)
getattr(logger, level.lower())(log_string)
except AttributeError:
logger.error(f"Invalid log level: {level}. Defaulting to error.")
logger.error(json.dumps(log_data, cls=CustomJSONEncoder))
except Exception as e:
# Fallback logging if JSON fails completely
logger.error(f"FATAL: Error serializing log message with CustomJSONEncoder: {e}")
fallback_msg = f"{event_message}"
if data:
fallback_msg += f" | Data keys: {list(data.keys())}"
logger.error(fallback_msg)
# --- Response Validation ---
def validate_response(response: dict) -> bool:
"""Validate the structure of a response"""
# Simple validation, adjust as needed
required_fields = {'response', 'sources', 'reasoning'}
return isinstance(response, dict) and all(field in response for field in required_fields)
# --- File Handling Utilities (Keep if chunking/upload is potentially needed later) ---
def get_upload_metadata(upload_id):
"""Load metadata for an upload"""
metadata_path = os.path.join(UPLOAD_METADATA_FOLDER, f"{upload_id}.json")
if not os.path.exists(metadata_path):
return None
try:
with open(metadata_path, 'r') as f:
return json.load(f)
except Exception as e:
log_structured('error', f'Failed to load upload metadata for {upload_id}', {'error': str(e)})
return None
def save_upload_metadata(upload_id, metadata):
"""Save metadata for an upload"""
metadata_path = os.path.join(UPLOAD_METADATA_FOLDER, f"{upload_id}.json")
try:
with open(metadata_path, 'w') as f:
json.dump(metadata, f)
except Exception as e:
log_structured('error', f'Failed to save upload metadata for {upload_id}', {'error': str(e)})
def get_chunk_path(upload_id, chunk_index):
"""Get path for a specific chunk"""
upload_chunk_dir = os.path.join(CHUNK_FOLDER, upload_id)
os.makedirs(upload_chunk_dir, exist_ok=True)
return os.path.join(upload_chunk_dir, f"chunk_{chunk_index}")
def combine_chunks(upload_id, destination_path):
"""Combine all chunks into a single file"""
metadata = get_upload_metadata(upload_id)
if not metadata or 'totalChunks' not in metadata:
log_structured('error', f'Metadata missing or invalid for combining chunks: {upload_id}')
return False
upload_chunk_dir = os.path.join(CHUNK_FOLDER, upload_id)
try:
with open(destination_path, 'wb') as outfile:
for i in range(metadata['totalChunks']):
chunk_path = os.path.join(upload_chunk_dir, f"chunk_{i}")
if os.path.exists(chunk_path):
with open(chunk_path, 'rb') as infile:
outfile.write(infile.read())
else:
log_structured('error', f'Chunk {i} missing for upload {upload_id}')
# Clean up partially created file
if os.path.exists(destination_path):
os.remove(destination_path)
return False
return True
except Exception as e:
log_structured('error', f'Error combining chunks for {upload_id}', {'error': str(e)})
# Clean up partially created file
if os.path.exists(destination_path):
os.remove(destination_path)
return False
def clear_upload_chunks(upload_id):
"""Remove all chunks and metadata for an upload"""
upload_chunk_dir = os.path.join(CHUNK_FOLDER, upload_id)
if os.path.exists(upload_chunk_dir):
try:
shutil.rmtree(upload_chunk_dir)
log_structured('info', f'Cleared chunk directory: {upload_chunk_dir}')
except Exception as e:
log_structured('error', f'Failed to remove chunk directory {upload_chunk_dir}', {'error': str(e)})
metadata_path = os.path.join(UPLOAD_METADATA_FOLDER, f"{upload_id}.json")
if os.path.exists(metadata_path):
try:
os.remove(metadata_path)
log_structured('info', f'Cleared metadata file: {metadata_path}')
except Exception as e:
log_structured('error', f'Failed to remove metadata file {metadata_path}', {'error': str(e)})
def allowed_file(filename):
"""Check if a filename has an allowed extension"""
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS