""" Comprehensive error reporting and tracking system. Provides structured error reporting with: - Auto-categorization of errors - User-friendly messages - Technical debug information - Suggested fixes for common issues - Unique error IDs for tracking Author: Video Query Application """ import sys import uuid import logging import traceback import platform import os from enum import Enum from datetime import datetime from typing import Optional, Dict, Any, Tuple from dataclasses import dataclass, asdict, field import json logger = logging.getLogger('video_query') class ErrorCategory(Enum): """Error categories for classification.""" SYSTEM_ERROR = "system" # OS, dependencies, paths API_ERROR = "api" # Gemini API errors VIDEO_ERROR = "video" # Video file/encoding issues USER_ERROR = "user" # Invalid input/config NETWORK_ERROR = "network" # Connection issues UPLOAD_ERROR = "upload" # File upload issues UNKNOWN_ERROR = "unknown" # Unexpected errors @dataclass class ErrorReport: """Structured error report with full context.""" error_id: str category: ErrorCategory message: str # User-friendly message technical_details: str # Stack trace, etc. context: Dict[str, Any] # Additional context suggested_fix: str # Actionable solution timestamp: str system_info: Dict[str, str] severity: str = "error" # error, warning, critical def to_dict(self) -> Dict: """Convert to dictionary.""" data = asdict(self) data['category'] = self.category.value return data def to_json(self) -> str: """Convert to JSON string.""" return json.dumps(self.to_dict(), indent=2, default=str) def format_user_message(self) -> str: """ Format user-friendly error message. Returns: String suitable for displaying to end users """ msg = f"❌ {self.message}\n\n" if self.suggested_fix: msg += f"💡 Suggested Fix:\n{self.suggested_fix}\n\n" msg += f"📋 Error ID: {self.error_id}\n" msg += f" (Reference this ID when reporting issues)\n" return msg def format_technical(self) -> str: """ Format technical debug message. Returns: Detailed technical information for logs """ separator = "="*80 msg = f"\n{separator}\n" msg += f"ERROR REPORT: {self.error_id}\n" msg += f"{separator}\n" msg += f"Category: {self.category.value.upper()}\n" msg += f"Severity: {self.severity.upper()}\n" msg += f"Timestamp: {self.timestamp}\n" msg += f"\nUser Message:\n{self.message}\n" if self.context: msg += f"\nContext:\n" for key, value in self.context.items(): msg += f" {key}: {value}\n" msg += f"\nSystem Information:\n" for key, value in self.system_info.items(): msg += f" {key}: {value}\n" if self.suggested_fix: msg += f"\nSuggested Fix:\n{self.suggested_fix}\n" msg += f"\nTechnical Details:\n{self.technical_details}\n" msg += f"{separator}\n" return msg def format_short(self) -> str: """ Format short one-line summary. Returns: Brief error summary """ return f"[{self.error_id}] {self.category.value}: {self.message[:100]}" class ErrorReporter: """Error reporting and tracking system.""" # Store recent errors (last 100) _recent_errors: list = [] _max_recent = 100 @staticmethod def capture_error(exception: Exception, category: Optional[ErrorCategory] = None, context: Optional[Dict] = None, severity: str = "error") -> ErrorReport: """ Capture exception and create structured error report. Args: exception: The exception that occurred category: Error category (auto-detected if None) context: Additional context information (file paths, request data, etc.) severity: Error severity (error, warning, critical) Returns: ErrorReport instance with full details """ # Generate unique error ID error_id = str(uuid.uuid4())[:8].upper() # Auto-categorize if not provided if category is None: category = ErrorReporter._categorize_error(exception) # Extract exception details exc_type, exc_value, exc_traceback = sys.exc_info() if exc_traceback: stack_trace = ''.join(traceback.format_exception(exc_type, exc_value, exc_traceback)) else: # If no traceback available, create basic info stack_trace = f"{type(exception).__name__}: {str(exception)}" # Generate user-friendly message user_message = ErrorReporter._generate_user_message(exception, category) # Generate suggested fix suggested_fix = ErrorReporter._suggest_fix(exception, category, context or {}) # Gather system info system_info = ErrorReporter._gather_system_info() # Create error report report = ErrorReport( error_id=error_id, category=category, message=user_message, technical_details=stack_trace, context=context or {}, suggested_fix=suggested_fix, timestamp=datetime.now().isoformat(), system_info=system_info, severity=severity ) # Log the error if severity == "critical": logger.critical(report.format_technical()) elif severity == "warning": logger.warning(report.format_technical()) else: logger.error(report.format_technical()) # Store in recent errors ErrorReporter._recent_errors.append(report) if len(ErrorReporter._recent_errors) > ErrorReporter._max_recent: ErrorReporter._recent_errors.pop(0) return report @staticmethod def _categorize_error(exception: Exception) -> ErrorCategory: """ Auto-categorize exception based on error message and type. Args: exception: The exception to categorize Returns: Appropriate ErrorCategory """ error_str = str(exception).lower() exc_type = type(exception).__name__.lower() # System errors (missing dependencies, file not found, etc.) if any(x in error_str for x in ['not found', 'no such file', 'permission denied', 'access denied']): return ErrorCategory.SYSTEM_ERROR if any(x in exc_type for x in ['filenotfound', 'oserror', 'ioerror']): return ErrorCategory.SYSTEM_ERROR if any(x in error_str for x in ['ffprobe', 'ffmpeg', 'wkhtmltopdf']): return ErrorCategory.SYSTEM_ERROR # API errors (Gemini API issues) if any(x in error_str for x in ['503', '500', '502', 'unavailable', 'overload', 'service']): return ErrorCategory.API_ERROR if any(x in error_str for x in ['429', 'rate limit', 'quota', 'resource_exhausted']): return ErrorCategory.API_ERROR if 'invalid_argument' in error_str and '400' in error_str: return ErrorCategory.API_ERROR # Video errors (file format, encoding, corruption) if any(x in error_str for x in ['moov atom', 'invalid data', 'codec', 'corrupted', 'duration']): return ErrorCategory.VIDEO_ERROR if any(x in error_str for x in ['video file', 'format', 'encoding']): return ErrorCategory.VIDEO_ERROR # Network errors if any(x in error_str for x in ['connection', 'timeout', 'network', 'dns', 'resolve']): return ErrorCategory.NETWORK_ERROR if any(x in exc_type for x in ['connectionerror', 'timeout']): return ErrorCategory.NETWORK_ERROR # Upload errors if any(x in error_str for x in ['upload', 'chunk', 'multipart']): return ErrorCategory.UPLOAD_ERROR if any(x in error_str for x in ['file size', 'too large', 'entity too large']): return ErrorCategory.UPLOAD_ERROR # User errors (invalid input, configuration) if any(x in error_str for x in ['invalid', 'missing', 'required', 'must be']): return ErrorCategory.USER_ERROR # Default to unknown return ErrorCategory.UNKNOWN_ERROR @staticmethod def _generate_user_message(exception: Exception, category: ErrorCategory) -> str: """ Generate user-friendly error message based on category. Args: exception: The exception category: Error category Returns: User-friendly error message """ error_str = str(exception) if category == ErrorCategory.SYSTEM_ERROR: if 'ffprobe' in error_str or 'ffmpeg' in error_str: return "System dependency missing: FFmpeg/FFprobe is not installed or not accessible" elif 'wkhtmltopdf' in error_str: return "System dependency missing: wkhtmltopdf is not installed or not accessible" else: return f"System configuration issue: {error_str}" elif category == ErrorCategory.API_ERROR: if '503' in error_str or 'unavailable' in error_str.lower(): return "Gemini API is temporarily overloaded or unavailable" elif '429' in error_str or 'rate limit' in error_str.lower(): return "API rate limit exceeded - too many requests sent too quickly" elif '500' in error_str: return "Gemini API internal server error" else: return f"API service error: {error_str}" elif category == ErrorCategory.VIDEO_ERROR: if 'moov atom' in error_str.lower(): return "Video file is incomplete or corrupted (missing header data)" elif 'duration' in error_str.lower(): return "Cannot determine video duration - file may be corrupted or unsupported format" elif 'codec' in error_str.lower(): return "Video codec not supported or corrupted" else: return f"Video file processing error: {error_str}" elif category == ErrorCategory.NETWORK_ERROR: return f"Network connectivity issue: {error_str}" elif category == ErrorCategory.UPLOAD_ERROR: if 'too large' in error_str.lower(): return "File is too large for upload (maximum 5GB)" else: return f"File upload error: {error_str}" elif category == ErrorCategory.USER_ERROR: return f"Invalid input or configuration: {error_str}" else: return f"Unexpected error: {error_str}" @staticmethod def _suggest_fix(exception: Exception, category: ErrorCategory, context: Dict) -> str: """ Generate suggested fix based on error type. Args: exception: The exception category: Error category context: Additional context Returns: Suggested fix or troubleshooting steps """ error_str = str(exception).lower() # System errors - installation instructions if 'ffprobe' in error_str or 'ffmpeg' in error_str: system = platform.system().lower() if 'darwin' in system: return ( "Install FFmpeg using Homebrew:\n" " brew install ffmpeg\n\n" "Then restart the application." ) else: return ( "Install FFmpeg:\n" " Ubuntu/Debian: sudo apt-get install ffmpeg\n" " CentOS/RHEL: sudo yum install ffmpeg\n\n" "Then restart the application." ) if 'wkhtmltopdf' in error_str: system = platform.system().lower() if 'darwin' in system: return "Install wkhtmltopdf: brew install wkhtmltopdf" else: return "Install wkhtmltopdf: sudo apt-get install wkhtmltopdf" # API errors - retry and configuration if '503' in error_str or 'overload' in error_str: return ( "The API is temporarily overloaded. The system will automatically retry.\n" "If this persists:\n" " 1. Wait a few minutes and try again\n" " 2. Reduce parallel processing: set MAX_PARALLEL_CHUNKS=1 in .env\n" " 3. Set GEMINI_API_TIER=free in .env for conservative rate limiting" ) if '429' in error_str or 'rate limit' in error_str: return ( "Rate limit exceeded. To fix:\n" " 1. Set GEMINI_API_TIER=free in backend/.env\n" " 2. Set MAX_PARALLEL_CHUNKS=1 in backend/.env\n" " 3. Wait a few minutes before trying again\n" " 4. Consider upgrading to paid API tier for higher limits" ) if '400' in error_str and 'invalid_argument' in error_str: return ( "Invalid request to Gemini API. Possible causes:\n" " 1. Video file may be corrupted or in unsupported format\n" " 2. Video duration may be too short (<1 second)\n" " 3. Video file size may exceed limits\n" "Check the logs for more details about what was rejected." ) # Video errors - file issues if 'moov atom' in error_str: return ( "Video file is incomplete or corrupted:\n" " 1. Try re-uploading the file\n" " 2. If the issue persists, re-encode the video:\n" " ffmpeg -i input.mp4 -c copy output.mp4\n" " 3. Ensure the video file fully uploaded before processing" ) if 'duration' in error_str and context.get('video_path'): return ( "Cannot determine video duration:\n" " 1. Check that the video file is not corrupted\n" " 2. Try playing the video in a media player to verify\n" " 3. Re-encode the video if necessary:\n" " ffmpeg -i input.mp4 -c:v libx264 -c:a aac output.mp4" ) # Network errors if 'connection' in error_str or 'timeout' in error_str: return ( "Network connectivity issue:\n" " 1. Check your internet connection\n" " 2. Verify firewall isn't blocking the application\n" " 3. Try again in a few moments\n" " 4. Check if Gemini API is accessible from your network" ) # Upload errors if 'too large' in error_str: return ( "File exceeds maximum size (5GB):\n" " 1. Compress the video to reduce file size\n" " 2. Use a lower resolution or bitrate\n" " 3. Split into smaller segments" ) # Generic fallback if category == ErrorCategory.UNKNOWN_ERROR: return ( "Unexpected error occurred:\n" " 1. Check the application logs for more details\n" " 2. Try restarting the application\n" " 3. Report this error with the Error ID if it persists" ) return "Check the logs for more details or contact support with the Error ID." @staticmethod def _gather_system_info() -> Dict[str, str]: """ Gather system information for error context. Returns: Dictionary with system details """ return { 'platform': platform.system(), 'platform_release': platform.release(), 'platform_version': platform.version(), 'architecture': platform.machine(), 'python_version': platform.python_version(), 'hostname': platform.node(), 'processor': platform.processor() or 'unknown' } @staticmethod def get_recent_errors(limit: int = 10) -> list: """ Get recent error reports. Args: limit: Maximum number of errors to return Returns: List of recent ErrorReport objects """ return ErrorReporter._recent_errors[-limit:] @staticmethod def find_error_by_id(error_id: str) -> Optional[ErrorReport]: """ Find error report by ID. Args: error_id: Error ID to search for Returns: ErrorReport if found, None otherwise """ for error in ErrorReporter._recent_errors: if error.error_id == error_id: return error return None @staticmethod def export_errors_to_file(filepath: str, limit: Optional[int] = None): """ Export error reports to JSON file. Args: filepath: Path to output file limit: Number of recent errors to export (None = all) """ errors_to_export = ErrorReporter._recent_errors[-limit:] if limit else ErrorReporter._recent_errors errors_data = [error.to_dict() for error in errors_to_export] with open(filepath, 'w') as f: json.dump(errors_data, f, indent=2, default=str) logger.info(f"Exported {len(errors_data)} error reports to {filepath}") @staticmethod def clear_errors(): """Clear all stored error reports.""" ErrorReporter._recent_errors.clear() logger.info("Cleared all error reports") # Module-level convenience functions def capture_error(exception: Exception, **kwargs) -> ErrorReport: """ Convenience function for capturing errors. Args: exception: The exception to capture **kwargs: Additional arguments for ErrorReporter.capture_error() Returns: ErrorReport instance """ return ErrorReporter.capture_error(exception, **kwargs) def get_recent_errors(limit: int = 10) -> list: """Get recent errors (convenience function).""" return ErrorReporter.get_recent_errors(limit) def find_error_by_id(error_id: str) -> Optional[ErrorReport]: """Find error by ID (convenience function).""" return ErrorReporter.find_error_by_id(error_id) if __name__ == "__main__": """Test the error reporter.""" print("="*80) print("Error Reporter Test") print("="*80) # Test different error types test_errors = [ (FileNotFoundError("ffprobe not found at /usr/bin/ffprobe"), "System Error"), (Exception("503 UNAVAILABLE: Model overloaded"), "API Error"), (Exception("moov atom not found"), "Video Error"), (ConnectionError("Connection timeout"), "Network Error"), ] for exception, description in test_errors: print(f"\n--- Testing: {description} ---") try: raise exception except Exception as e: report = capture_error(e, context={'test': description}) print(report.format_user_message()) print("\n" + "="*80) print(f"Total errors captured: {len(get_recent_errors())}") print("="*80)