"""Video utility functions"""

import json
import logging
import os
import subprocess
from typing import Any, Callable, Dict, Optional

logger = logging.getLogger(__name__)


def _parse_number(value: Any, cast: Callable[[Any], Any]) -> Optional[Any]:
    """Return ``cast(value)``, or None when the value is missing or malformed."""
    try:
        return cast(value)
    except (TypeError, ValueError):
        return None


def _parse_frame_rate(value: Any) -> Optional[float]:
    """Convert a rational string such as ``"30000/1001"`` to frames per second.

    Returns None when the value is missing, is not of the form ``num/den``
    with integer parts, or has a non-positive denominator (e.g. ``"0/0"``).
    """
    try:
        num, den = map(int, value.split('/'))
    except (AttributeError, TypeError, ValueError):
        return None
    if den <= 0:
        return None
    return num / den


def extract_video_metadata(file_path: str) -> Dict[str, Any]:
    """Extract video metadata using ffprobe

    Args:
        file_path: Path to the video file

    Returns:
        Dictionary containing:
        - duration_seconds: Video duration in seconds
        - width: Video width in pixels
        - height: Video height in pixels
        - fps: Frames per second
        - codec: Video codec name
        - bitrate: Video bitrate

        Any individual field that is absent or cannot be parsed is None.
        Returns empty dict if extraction fails.
    """
    try:
        result = subprocess.run([
            'ffprobe',
            '-v', 'quiet',
            '-print_format', 'json',
            '-show_format',
            '-show_streams',
            file_path
        ], capture_output=True, text=True, timeout=30)

        if result.returncode != 0:
            print(f"ffprobe failed with return code {result.returncode}")
            return {}

        data = json.loads(result.stdout)

        # Container-level values live under "format". A malformed value
        # (e.g. "N/A") leaves just that field as None instead of discarding
        # everything else that was probed successfully.
        format_info = data.get('format') or {}
        metadata = {
            'duration_seconds': _parse_number(format_info.get('duration'), float),
            'width': None,
            'height': None,
            'fps': None,
            'codec': None,
            'bitrate': _parse_number(format_info.get('bit_rate'), int),
        }

        # Get video stream info
        for stream in data.get('streams', []):
            if stream.get('codec_type') != 'video':
                continue

            metadata['width'] = stream.get('width')
            metadata['height'] = stream.get('height')
            metadata['codec'] = stream.get('codec_name')

            # Prefer r_frame_rate; fall back to avg_frame_rate when the
            # former is missing, unparseable, or zero.
            fps = _parse_frame_rate(stream.get('r_frame_rate'))
            if not fps:
                fallback = _parse_frame_rate(stream.get('avg_frame_rate'))
                if fallback is not None:
                    fps = fallback
            metadata['fps'] = fps

            break  # Use first video stream

        return metadata

    except subprocess.TimeoutExpired:
        print(f"ffprobe timed out for file: {file_path}")
        return {}
    except FileNotFoundError:
        print("ffprobe not found. Please ensure ffmpeg is installed.")
        return {}
    except json.JSONDecodeError:
        print(f"Failed to parse ffprobe output for file: {file_path}")
        return {}
    except Exception as e:
        # Best-effort contract: callers get {} rather than an exception.
        print(f"Failed to extract video metadata: {e}")
        return {}


def format_duration(seconds: float) -> str:
    """Format duration in seconds to HH:MM:SS

    Fractional seconds are truncated. Durations shorter than one hour are
    rendered as MM:SS. Negative durations are rendered as the formatted
    magnitude with a leading "-".

    Args:
        seconds: Duration in seconds

    Returns:
        Formatted duration string
    """
    # Format the magnitude: floor division and modulo on a negative value
    # would wrap around (e.g. -5 would otherwise render as "59:55").
    total = int(abs(seconds))
    sign = "-" if seconds < 0 and total > 0 else ""

    hours, remainder = divmod(total, 3600)
    minutes, secs = divmod(remainder, 60)

    if hours > 0:
        return f"{sign}{hours:02d}:{minutes:02d}:{secs:02d}"
    return f"{sign}{minutes:02d}:{secs:02d}"


def generate_video_thumbnail(video_path: str, output_path: str, timestamp: float = 1.0) -> bool:
    """Generate a thumbnail from a video file at specified timestamp

    Args:
        video_path: Path to input video
        output_path: Path to save thumbnail (should end in .jpg or .png)
        timestamp: Time in seconds to extract frame from (default: 1.0)

    Returns:
        True if successful, False otherwise
    """
    try:
        # Ensure output directory exists. dirname is '' for a bare filename,
        # and os.makedirs('') raises, so only create it when there is one.
        output_dir = os.path.dirname(output_path)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)

        cmd = [
            'ffmpeg',
            '-y',  # Overwrite output file
            '-ss', str(timestamp),  # Seek to timestamp
            '-i', video_path,
            '-vframes', '1',  # Extract 1 frame
            '-vf', 'scale=320:-1',  # Scale to 320px width, maintain aspect ratio
            '-q:v', '2',  # High quality
            output_path
        ]

        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=30
        )

        # A zero exit status alone is not trusted; the file must exist too.
        if result.returncode == 0 and os.path.exists(output_path):
            logger.info("Generated thumbnail: %s", output_path)
            return True

        logger.error("FFmpeg thumbnail generation failed: %s", result.stderr)
        return False

    except Exception as e:
        # Covers a missing ffmpeg binary, timeouts and filesystem errors.
        logger.error("Failed to generate video thumbnail: %s", e)
        return False