"""
Comprehensive error reporting and tracking system.

Provides structured error reporting with:
- Auto-categorization of errors
- User-friendly messages
- Technical debug information
- Suggested fixes for common issues
- Unique error IDs for tracking

Author: Video Query Application
"""
|
|
|
|
import sys
|
|
import uuid
|
|
import logging
|
|
import traceback
|
|
import platform
|
|
import os
|
|
from enum import Enum
|
|
from datetime import datetime
|
|
from typing import Optional, Dict, Any, Tuple
|
|
from dataclasses import dataclass, asdict, field
|
|
import json
|
|
|
|
# Shared logger for this module; captured error reports and the export/clear
# notices below are all written through it under the 'video_query' name.
logger = logging.getLogger('video_query')
|
|
|
|
|
|
class ErrorCategory(Enum):
    """Buckets an error can be classified into.

    Each member's value is the short lowercase tag that appears when a
    report is serialized or formatted.
    """

    # OS, dependencies, paths
    SYSTEM_ERROR = "system"

    # Gemini API errors
    API_ERROR = "api"

    # Video file/encoding issues
    VIDEO_ERROR = "video"

    # Invalid input/config
    USER_ERROR = "user"

    # Connection issues
    NETWORK_ERROR = "network"

    # File upload issues
    UPLOAD_ERROR = "upload"

    # Unexpected errors
    UNKNOWN_ERROR = "unknown"
|
|
|
|
|
|
@dataclass
class ErrorReport:
    """One captured error plus everything needed to display or log it."""

    error_id: str                  # Identifier quoted in user messages and logs
    category: ErrorCategory        # Classification bucket for the error
    message: str                   # User-friendly message
    technical_details: str         # Stack trace, etc.
    context: Dict[str, Any]        # Additional context
    suggested_fix: str             # Actionable solution
    timestamp: str
    system_info: Dict[str, str]
    severity: str = "error"        # error, warning, critical

    def to_dict(self) -> Dict:
        """Return the report as a plain dict with ``category`` as its string value."""
        payload = asdict(self)
        # asdict() leaves the enum member in place; swap in its string tag.
        payload['category'] = self.category.value
        return payload

    def to_json(self) -> str:
        """Serialize the report to indented JSON.

        Values the JSON encoder cannot handle are rendered with ``str``.
        """
        as_mapping = self.to_dict()
        return json.dumps(as_mapping, indent=2, default=str)

    def format_user_message(self) -> str:
        """
        Build the message shown to end users.

        Returns:
            The friendly message, the suggested fix (when there is one) and
            the error ID to quote when reporting the problem
        """
        parts = [f"❌ {self.message}\n\n"]
        if self.suggested_fix:
            parts.append(f"💡 Suggested Fix:\n{self.suggested_fix}\n\n")
        parts.append(f"📋 Error ID: {self.error_id}\n")
        parts.append(" (Reference this ID when reporting issues)\n")
        return "".join(parts)

    def format_technical(self) -> str:
        """
        Build the detailed, multi-line report intended for logs.

        Returns:
            A block framed by 80-character rules containing category,
            severity, timestamp, user message, context (if any), system
            information, suggested fix (if any) and the technical details
        """
        rule = "=" * 80
        parts = [
            f"\n{rule}\n",
            f"ERROR REPORT: {self.error_id}\n",
            f"{rule}\n",
            f"Category: {self.category.value.upper()}\n",
            f"Severity: {self.severity.upper()}\n",
            f"Timestamp: {self.timestamp}\n",
            f"\nUser Message:\n{self.message}\n",
        ]

        # The context section is omitted entirely when no context was given.
        if self.context:
            parts.append("\nContext:\n")
            parts.extend(f" {name}: {val}\n" for name, val in self.context.items())

        parts.append("\nSystem Information:\n")
        parts.extend(f" {name}: {val}\n" for name, val in self.system_info.items())

        if self.suggested_fix:
            parts.append(f"\nSuggested Fix:\n{self.suggested_fix}\n")

        parts.append(f"\nTechnical Details:\n{self.technical_details}\n")
        parts.append(f"{rule}\n")
        return "".join(parts)

    def format_short(self) -> str:
        """
        Build a one-line summary.

        Returns:
            ``[id] category: message`` with the message cut to 100 characters
        """
        headline = self.message[:100]
        return f"[{self.error_id}] {self.category.value}: {headline}"
|
|
|
|
|
|
class ErrorReporter:
    """Error reporting and tracking system.

    Every member is static. Reports are built by ``capture_error`` and the
    most recent ones (at most ``_max_recent``) are kept in a class-level list.
    """

    # Store recent errors (last 100)
    _recent_errors: list = []
    _max_recent = 100

    @staticmethod
    def capture_error(exception: Exception,
                      category: Optional[ErrorCategory] = None,
                      context: Optional[Dict] = None,
                      severity: str = "error") -> ErrorReport:
        """
        Capture exception and create structured error report.

        The report is logged at the level matching ``severity`` and appended
        to the recent-error history before being returned.

        Args:
            exception: The exception that occurred
            category: Error category (auto-detected if None)
            context: Additional context information (file paths, request data, etc.)
            severity: Error severity (error, warning, critical); any other
                value is logged at error level

        Returns:
            ErrorReport instance with full details
        """
        # Generate unique error ID (first 8 hex digits of a random UUID)
        error_id = uuid.uuid4().hex[:8].upper()

        # Auto-categorize if not provided
        if category is None:
            category = ErrorReporter._categorize_error(exception)

        # Snapshot the context so later changes to the caller's dict do not
        # alter a report that has already been stored.
        report_context = dict(context) if context else {}

        # Use the traceback attached to *this* exception. sys.exc_info()
        # describes whatever exception is currently being handled, which may
        # be a different one, or none at all when called outside ``except``.
        tb = exception.__traceback__
        if tb is not None:
            stack_trace = ''.join(
                traceback.format_exception(type(exception), exception, tb)
            )
        else:
            # Never raised: only the type and message are available
            stack_trace = f"{type(exception).__name__}: {exception}"

        report = ErrorReport(
            error_id=error_id,
            category=category,
            message=ErrorReporter._generate_user_message(exception, category),
            technical_details=stack_trace,
            context=report_context,
            suggested_fix=ErrorReporter._suggest_fix(exception, category, report_context),
            timestamp=datetime.now().isoformat(),
            system_info=ErrorReporter._gather_system_info(),
            severity=severity
        )

        # Log the error at the level matching its severity
        if severity == "critical":
            log = logger.critical
        elif severity == "warning":
            log = logger.warning
        else:
            log = logger.error
        log(report.format_technical())

        # Store in recent errors, discarding the oldest beyond the cap
        recent = ErrorReporter._recent_errors
        recent.append(report)
        overflow = len(recent) - ErrorReporter._max_recent
        if overflow > 0:
            del recent[:overflow]

        return report

    @staticmethod
    def _categorize_error(exception: Exception) -> ErrorCategory:
        """
        Auto-categorize exception based on error message and type.

        Matching is by case-insensitive substring; rules are tried in order
        and the first hit wins.

        Args:
            exception: The exception to categorize

        Returns:
            Appropriate ErrorCategory
        """
        error_str = str(exception).lower()
        exc_type = type(exception).__name__.lower()

        # "moov atom not found" is a damaged-video message; it has to be
        # tested before the generic 'not found' rule below claims it.
        if 'moov atom' in error_str:
            return ErrorCategory.VIDEO_ERROR

        # System errors (missing dependencies, file not found, etc.)
        if any(x in error_str for x in ['not found', 'no such file', 'permission denied', 'access denied']):
            return ErrorCategory.SYSTEM_ERROR

        if any(x in exc_type for x in ['filenotfound', 'oserror', 'ioerror']):
            return ErrorCategory.SYSTEM_ERROR

        if any(x in error_str for x in ['ffprobe', 'ffmpeg', 'wkhtmltopdf']):
            return ErrorCategory.SYSTEM_ERROR

        # API errors (Gemini API issues)
        if any(x in error_str for x in ['503', '500', '502', 'unavailable', 'overload', 'service']):
            return ErrorCategory.API_ERROR

        if any(x in error_str for x in ['429', 'rate limit', 'quota', 'resource_exhausted']):
            return ErrorCategory.API_ERROR

        if 'invalid_argument' in error_str and '400' in error_str:
            return ErrorCategory.API_ERROR

        # Video errors (file format, encoding, corruption)
        if any(x in error_str for x in ['invalid data', 'codec', 'corrupted', 'duration']):
            return ErrorCategory.VIDEO_ERROR

        if any(x in error_str for x in ['video file', 'format', 'encoding']):
            return ErrorCategory.VIDEO_ERROR

        # Network errors
        if any(x in error_str for x in ['connection', 'timeout', 'network', 'dns', 'resolve']):
            return ErrorCategory.NETWORK_ERROR

        if any(x in exc_type for x in ['connectionerror', 'timeout']):
            return ErrorCategory.NETWORK_ERROR

        # Upload errors
        if any(x in error_str for x in ['upload', 'chunk', 'multipart']):
            return ErrorCategory.UPLOAD_ERROR

        if any(x in error_str for x in ['file size', 'too large', 'entity too large']):
            return ErrorCategory.UPLOAD_ERROR

        # User errors (invalid input, configuration)
        if any(x in error_str for x in ['invalid', 'missing', 'required', 'must be']):
            return ErrorCategory.USER_ERROR

        # Default to unknown
        return ErrorCategory.UNKNOWN_ERROR

    @staticmethod
    def _generate_user_message(exception: Exception,
                               category: ErrorCategory) -> str:
        """
        Generate user-friendly error message based on category.

        Keywords are matched case-insensitively, the same way
        ``_categorize_error`` matches them; the original text of the
        exception is what gets embedded in the generic messages.

        Args:
            exception: The exception
            category: Error category

        Returns:
            User-friendly error message
        """
        error_str = str(exception)
        lowered = error_str.lower()

        if category == ErrorCategory.SYSTEM_ERROR:
            if 'ffprobe' in lowered or 'ffmpeg' in lowered:
                return "System dependency missing: FFmpeg/FFprobe is not installed or not accessible"
            if 'wkhtmltopdf' in lowered:
                return "System dependency missing: wkhtmltopdf is not installed or not accessible"
            return f"System configuration issue: {error_str}"

        if category == ErrorCategory.API_ERROR:
            if '503' in lowered or 'unavailable' in lowered:
                return "Gemini API is temporarily overloaded or unavailable"
            if '429' in lowered or 'rate limit' in lowered:
                return "API rate limit exceeded - too many requests sent too quickly"
            if '500' in lowered:
                return "Gemini API internal server error"
            return f"API service error: {error_str}"

        if category == ErrorCategory.VIDEO_ERROR:
            if 'moov atom' in lowered:
                return "Video file is incomplete or corrupted (missing header data)"
            if 'duration' in lowered:
                return "Cannot determine video duration - file may be corrupted or unsupported format"
            if 'codec' in lowered:
                return "Video codec not supported or corrupted"
            return f"Video file processing error: {error_str}"

        if category == ErrorCategory.NETWORK_ERROR:
            return f"Network connectivity issue: {error_str}"

        if category == ErrorCategory.UPLOAD_ERROR:
            if 'too large' in lowered:
                return "File is too large for upload (maximum 5GB)"
            return f"File upload error: {error_str}"

        if category == ErrorCategory.USER_ERROR:
            return f"Invalid input or configuration: {error_str}"

        return f"Unexpected error: {error_str}"

    @staticmethod
    def _suggest_fix(exception: Exception, category: ErrorCategory,
                     context: Dict) -> str:
        """
        Generate suggested fix based on error type.

        Rules are keyed on substrings of the lower-cased exception text and
        tried in order; ``category`` only selects the final fallback.

        Args:
            exception: The exception
            category: Error category
            context: Additional context (``video_path`` is consulted for
                duration errors); None is treated as empty

        Returns:
            Suggested fix or troubleshooting steps
        """
        error_str = str(exception).lower()
        context = context or {}
        system = platform.system().lower()

        # A damaged container is a file problem even when the message also
        # names ffprobe/ffmpeg, so it is checked before the install hints.
        if 'moov atom' in error_str:
            return (
                "Video file is incomplete or corrupted:\n"
                " 1. Try re-uploading the file\n"
                " 2. If the issue persists, re-encode the video:\n"
                " ffmpeg -i input.mp4 -c copy output.mp4\n"
                " 3. Ensure the video file fully uploaded before processing"
            )

        # System errors - installation instructions
        if 'ffprobe' in error_str or 'ffmpeg' in error_str:
            if 'darwin' in system:
                return (
                    "Install FFmpeg using Homebrew:\n"
                    " brew install ffmpeg\n\n"
                    "Then restart the application."
                )
            if 'windows' in system:
                return (
                    "Install FFmpeg:\n"
                    " Download a build from https://ffmpeg.org/download.html\n"
                    " and add its bin folder to your PATH\n\n"
                    "Then restart the application."
                )
            return (
                "Install FFmpeg:\n"
                " Ubuntu/Debian: sudo apt-get install ffmpeg\n"
                " CentOS/RHEL: sudo yum install ffmpeg\n\n"
                "Then restart the application."
            )

        if 'wkhtmltopdf' in error_str:
            if 'darwin' in system:
                return "Install wkhtmltopdf: brew install wkhtmltopdf"
            if 'windows' in system:
                return (
                    "Install wkhtmltopdf: download the installer from "
                    "https://wkhtmltopdf.org/downloads.html"
                )
            return "Install wkhtmltopdf: sudo apt-get install wkhtmltopdf"

        # API errors - retry and configuration
        if '503' in error_str or 'overload' in error_str:
            return (
                "The API is temporarily overloaded. The system will automatically retry.\n"
                "If this persists:\n"
                " 1. Wait a few minutes and try again\n"
                " 2. Reduce parallel processing: set MAX_PARALLEL_CHUNKS=1 in .env\n"
                " 3. Set GEMINI_API_TIER=free in .env for conservative rate limiting"
            )

        if '429' in error_str or 'rate limit' in error_str:
            return (
                "Rate limit exceeded. To fix:\n"
                " 1. Set GEMINI_API_TIER=free in backend/.env\n"
                " 2. Set MAX_PARALLEL_CHUNKS=1 in backend/.env\n"
                " 3. Wait a few minutes before trying again\n"
                " 4. Consider upgrading to paid API tier for higher limits"
            )

        if '400' in error_str and 'invalid_argument' in error_str:
            return (
                "Invalid request to Gemini API. Possible causes:\n"
                " 1. Video file may be corrupted or in unsupported format\n"
                " 2. Video duration may be too short (<1 second)\n"
                " 3. Video file size may exceed limits\n"
                "Check the logs for more details about what was rejected."
            )

        # Video errors - file issues
        if 'duration' in error_str and context.get('video_path'):
            return (
                "Cannot determine video duration:\n"
                " 1. Check that the video file is not corrupted\n"
                " 2. Try playing the video in a media player to verify\n"
                " 3. Re-encode the video if necessary:\n"
                " ffmpeg -i input.mp4 -c:v libx264 -c:a aac output.mp4"
            )

        # Network errors
        if 'connection' in error_str or 'timeout' in error_str:
            return (
                "Network connectivity issue:\n"
                " 1. Check your internet connection\n"
                " 2. Verify firewall isn't blocking the application\n"
                " 3. Try again in a few moments\n"
                " 4. Check if Gemini API is accessible from your network"
            )

        # Upload errors
        if 'too large' in error_str:
            return (
                "File exceeds maximum size (5GB):\n"
                " 1. Compress the video to reduce file size\n"
                " 2. Use a lower resolution or bitrate\n"
                " 3. Split into smaller segments"
            )

        # Generic fallback
        if category == ErrorCategory.UNKNOWN_ERROR:
            return (
                "Unexpected error occurred:\n"
                " 1. Check the application logs for more details\n"
                " 2. Try restarting the application\n"
                " 3. Report this error with the Error ID if it persists"
            )

        return "Check the logs for more details or contact support with the Error ID."

    @staticmethod
    def _gather_system_info() -> Dict[str, str]:
        """
        Gather system information for error context.

        Returns:
            Dictionary with system details as reported by the ``platform``
            module; ``processor`` falls back to 'unknown' when empty
        """
        return {
            'platform': platform.system(),
            'platform_release': platform.release(),
            'platform_version': platform.version(),
            'architecture': platform.machine(),
            'python_version': platform.python_version(),
            'hostname': platform.node(),
            'processor': platform.processor() or 'unknown'
        }

    @staticmethod
    def get_recent_errors(limit: int = 10) -> list:
        """
        Get recent error reports.

        Args:
            limit: Maximum number of errors to return; zero or a negative
                number yields an empty list

        Returns:
            List of the most recent ErrorReport objects, oldest first
        """
        # A slice of [-0:] would be the whole list, so handle it explicitly.
        if limit <= 0:
            return []
        return ErrorReporter._recent_errors[-limit:]

    @staticmethod
    def find_error_by_id(error_id: str) -> Optional[ErrorReport]:
        """
        Find error report by ID.

        Args:
            error_id: Error ID to search for

        Returns:
            ErrorReport if found, None otherwise
        """
        return next(
            (report for report in ErrorReporter._recent_errors
             if report.error_id == error_id),
            None,
        )

    @staticmethod
    def export_errors_to_file(filepath: str, limit: Optional[int] = None):
        """
        Export error reports to JSON file.

        Args:
            filepath: Path to output file (written as UTF-8, overwriting any
                existing file)
            limit: Number of recent errors to export (None = all)
        """
        # Only None means "everything"; limit=0 exports an empty list.
        if limit is None:
            errors_to_export = list(ErrorReporter._recent_errors)
        else:
            errors_to_export = ErrorReporter.get_recent_errors(limit)
        errors_data = [error.to_dict() for error in errors_to_export]

        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(errors_data, f, indent=2, default=str)

        logger.info("Exported %d error reports to %s", len(errors_data), filepath)

    @staticmethod
    def clear_errors():
        """Clear all stored error reports."""
        ErrorReporter._recent_errors.clear()
        logger.info("Cleared all error reports")
|
|
|
|
|
|
# Module-level convenience functions
|
|
def capture_error(exception: Exception, **kwargs) -> ErrorReport:
    """
    Module-level shortcut for ``ErrorReporter.capture_error``.

    Args:
        exception: The exception to capture
        **kwargs: Forwarded unchanged to ErrorReporter.capture_error()
            (category, context, severity)

    Returns:
        The ErrorReport produced by ErrorReporter.capture_error()
    """
    report = ErrorReporter.capture_error(exception, **kwargs)
    return report
|
|
|
|
|
|
def get_recent_errors(limit: int = 10) -> list:
    """Module-level shortcut for ``ErrorReporter.get_recent_errors``."""
    recent = ErrorReporter.get_recent_errors(limit)
    return recent
|
|
|
|
|
|
def find_error_by_id(error_id: str) -> Optional[ErrorReport]:
    """Module-level shortcut for ``ErrorReporter.find_error_by_id``."""
    match = ErrorReporter.find_error_by_id(error_id)
    return match
|
|
|
|
|
|
if __name__ == "__main__":
    # Manual smoke test: raise a few representative exceptions, capture each
    # one and print the user-facing message it produces.
    banner = "="*80
    print(banner)
    print("Error Reporter Test")
    print(banner)

    # (exception to raise, label printed for it)
    samples = [
        (FileNotFoundError("ffprobe not found at /usr/bin/ffprobe"), "System Error"),
        (Exception("503 UNAVAILABLE: Model overloaded"), "API Error"),
        (Exception("moov atom not found"), "Video Error"),
        (ConnectionError("Connection timeout"), "Network Error"),
    ]

    for sample, label in samples:
        print(f"\n--- Testing: {label} ---")
        try:
            # Raise for real so the exception carries a traceback
            raise sample
        except Exception as caught:
            captured = capture_error(caught, context={'test': label})
            print(captured.format_user_message())

    print("\n" + banner)
    print(f"Total errors captured: {len(get_recent_errors())}")
    print(banner)
|