# hp_chatbot/utils.py import logging import json import os import shutil from datetime import datetime, timezone from typing import Optional, Dict, Any from config import LOG_FILE_PATH, CHUNK_FOLDER, UPLOAD_METADATA_FOLDER, ALLOWED_EXTENSIONS from llama_index.core.tools import ToolOutput # Import ToolOutput for serialization check # Configure logging logging.basicConfig( level=logging.INFO, # Set level from config later if needed format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(), logging.FileHandler(LOG_FILE_PATH) ] ) logger = logging.getLogger(__name__) # Import after logger is defined to avoid circular dependency if CustomJSONEncoder uses logger from json_utils import CustomJSONEncoder # --- Logging Helper --- def log_structured(level: str, event_message: str, data: Optional[Dict[str, Any]] = None): """ Structured logging helper Args: level: The logging level ('info', 'error', etc.) event_message: The main message describing the event data: Optional dictionary of additional data to log """ # Basic serializer to handle common non-serializable types safely def safe_serialize(obj): if isinstance(obj, ToolOutput): # Let CustomJSONEncoder handle ToolOutput specifics if passed directly return obj # Pass it through for the encoder if isinstance(obj, (datetime)): return obj.isoformat() if isinstance(obj, bytes): return "" if hasattr(obj, '__dict__') and not isinstance(obj, (ToolOutput)): # Avoid double processing ToolOutput # Very basic dict representation, avoid complex objects try: # Limit recursion depth or complexity if needed return {k: safe_serialize(v) for k, v in obj.__dict__.items() if not k.startswith('_') and not callable(v)} except Exception: return f"" # Fallback for complex objects # Add more types as needed (e.g., ObjectId, specific LlamaIndex objects if problematic) return obj # Let JSON encoder handle the rest or fail log_data = { 'timestamp': datetime.now(timezone.utc).isoformat(), 'event': event_message } if data is not None: try: # Apply safe serialization recursively to the data dictionary serialized_data = json.loads(json.dumps(data, default=safe_serialize)) log_data['data'] = serialized_data except (TypeError, OverflowError, ValueError) as json_err: logger.error(f"Serialization error in log_structured for event '{event_message}': {json_err}. Logging basic info.") log_data['data_serialization_error'] = str(json_err) # Attempt to log basic data structure if possible try: basic_data = {k: str(type(v)) for k, v in data.items()} log_data['data_structure'] = basic_data except Exception: log_data['data_structure'] = "" except Exception as e: logger.error(f"Unexpected error during safe_serialize or json.dumps in log_structured: {e}") log_data['logging_error'] = str(e) try: # Use the custom encoder for final JSON dump log_string = json.dumps(log_data, cls=CustomJSONEncoder) getattr(logger, level.lower())(log_string) except AttributeError: logger.error(f"Invalid log level: {level}. Defaulting to error.") logger.error(json.dumps(log_data, cls=CustomJSONEncoder)) except Exception as e: # Fallback logging if JSON fails completely logger.error(f"FATAL: Error serializing log message with CustomJSONEncoder: {e}") fallback_msg = f"{event_message}" if data: fallback_msg += f" | Data keys: {list(data.keys())}" logger.error(fallback_msg) # --- Response Validation --- def validate_response(response: dict) -> bool: """Validate the structure of a response""" # Simple validation, adjust as needed required_fields = {'response', 'sources', 'reasoning'} return isinstance(response, dict) and all(field in response for field in required_fields) # --- File Handling Utilities (Keep if chunking/upload is potentially needed later) --- def get_upload_metadata(upload_id): """Load metadata for an upload""" metadata_path = os.path.join(UPLOAD_METADATA_FOLDER, f"{upload_id}.json") if not os.path.exists(metadata_path): return None try: with open(metadata_path, 'r') as f: return json.load(f) except Exception as e: log_structured('error', f'Failed to load upload metadata for {upload_id}', {'error': str(e)}) return None def save_upload_metadata(upload_id, metadata): """Save metadata for an upload""" metadata_path = os.path.join(UPLOAD_METADATA_FOLDER, f"{upload_id}.json") try: with open(metadata_path, 'w') as f: json.dump(metadata, f) except Exception as e: log_structured('error', f'Failed to save upload metadata for {upload_id}', {'error': str(e)}) def get_chunk_path(upload_id, chunk_index): """Get path for a specific chunk""" upload_chunk_dir = os.path.join(CHUNK_FOLDER, upload_id) os.makedirs(upload_chunk_dir, exist_ok=True) return os.path.join(upload_chunk_dir, f"chunk_{chunk_index}") def combine_chunks(upload_id, destination_path): """Combine all chunks into a single file""" metadata = get_upload_metadata(upload_id) if not metadata or 'totalChunks' not in metadata: log_structured('error', f'Metadata missing or invalid for combining chunks: {upload_id}') return False upload_chunk_dir = os.path.join(CHUNK_FOLDER, upload_id) try: with open(destination_path, 'wb') as outfile: for i in range(metadata['totalChunks']): chunk_path = os.path.join(upload_chunk_dir, f"chunk_{i}") if os.path.exists(chunk_path): with open(chunk_path, 'rb') as infile: outfile.write(infile.read()) else: log_structured('error', f'Chunk {i} missing for upload {upload_id}') # Clean up partially created file if os.path.exists(destination_path): os.remove(destination_path) return False return True except Exception as e: log_structured('error', f'Error combining chunks for {upload_id}', {'error': str(e)}) # Clean up partially created file if os.path.exists(destination_path): os.remove(destination_path) return False def clear_upload_chunks(upload_id): """Remove all chunks and metadata for an upload""" upload_chunk_dir = os.path.join(CHUNK_FOLDER, upload_id) if os.path.exists(upload_chunk_dir): try: shutil.rmtree(upload_chunk_dir) log_structured('info', f'Cleared chunk directory: {upload_chunk_dir}') except Exception as e: log_structured('error', f'Failed to remove chunk directory {upload_chunk_dir}', {'error': str(e)}) metadata_path = os.path.join(UPLOAD_METADATA_FOLDER, f"{upload_id}.json") if os.path.exists(metadata_path): try: os.remove(metadata_path) log_structured('info', f'Cleared metadata file: {metadata_path}') except Exception as e: log_structured('error', f'Failed to remove metadata file {metadata_path}', {'error': str(e)}) def allowed_file(filename): """Check if a filename has an allowed extension""" return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS