""" Video processing module using FFmpeg Handles conversion based on platform specifications """ import ffmpeg import os import subprocess from platform_specs import PLATFORM_SPECS class VideoProcessor: def __init__(self, input_path): self.input_path = input_path self.probe_data = None def probe_video(self): """Get video metadata using ffprobe""" try: probe = ffmpeg.probe(self.input_path) self.probe_data = probe video_stream = next((stream for stream in probe['streams'] if stream['codec_type'] == 'video'), None) audio_stream = next((stream for stream in probe['streams'] if stream['codec_type'] == 'audio'), None) return { 'duration': float(probe['format']['duration']), 'size': int(probe['format']['size']), 'bitrate': int(probe['format']['bit_rate']) // 1000, # Convert to kbps 'width': int(video_stream['width']) if video_stream else None, 'height': int(video_stream['height']) if video_stream else None, 'codec': video_stream['codec_name'] if video_stream else None, 'has_audio': audio_stream is not None } except Exception as e: raise Exception(f"Error probing video: {str(e)}") def convert_video(self, platform, aspect_ratio, output_path, custom_bitrate=None): """ Convert video based on platform specifications Args: platform: Platform key (e.g., 'tiktok', 'meta') aspect_ratio: Aspect ratio (e.g., '1:1', '16:9') output_path: Path for output file custom_bitrate: Optional custom bitrate override Returns: dict: Conversion results including output path and stats """ if platform not in PLATFORM_SPECS: raise ValueError(f"Unknown platform: {platform}") platform_info = PLATFORM_SPECS[platform] # Find matching format format_spec = None for fmt in platform_info['formats']: if fmt['ratio'] == aspect_ratio: format_spec = fmt break if not format_spec: raise ValueError(f"Aspect ratio {aspect_ratio} not supported for {platform}") # Get conversion parameters codec = platform_info['codec'] size = format_spec['size'] bitrate = custom_bitrate if custom_bitrate else format_spec['bitrate'] audio_bitrate = format_spec['audio'] audio_codec = format_spec.get('audio_codec', 'aac') try: # Build FFmpeg command input_stream = ffmpeg.input(self.input_path) # Video encoding parameters video_params = { 'vcodec': codec, 'b:v': bitrate, 's': size, } # Add codec-specific parameters if codec == 'libx264': video_params.update({ 'preset': 'medium', 'crf': 23, 'profile:v': 'main', 'pix_fmt': 'yuv420p' }) elif codec == 'libx265': video_params.update({ 'preset': 'medium', 'crf': 28, 'pix_fmt': 'yuv420p', 'x265-params': 'log-level=error' }) elif codec == 'libvpx-vp9': video_params.update({ 'deadline': 'good', 'cpu-used': 2, 'row-mt': 1 }) # Audio encoding parameters audio_params = { 'acodec': audio_codec, 'b:a': audio_bitrate } # Build and execute FFmpeg command output_stream = ffmpeg.output( input_stream, output_path, **video_params, **audio_params ) # Overwrite output file if exists output_stream = ffmpeg.overwrite_output(output_stream) # Run the conversion ffmpeg.run(output_stream, capture_stdout=True, capture_stderr=True) # Get output file stats output_size = os.path.getsize(output_path) # Probe output file for verification output_probe = ffmpeg.probe(output_path) output_duration = float(output_probe['format']['duration']) return { 'success': True, 'output_path': output_path, 'output_size': output_size, 'duration': output_duration, 'platform': platform, 'aspect_ratio': aspect_ratio, 'resolution': size, 'codec': codec, 'bitrate': bitrate } except ffmpeg.Error as e: error_message = e.stderr.decode() if e.stderr else str(e) raise Exception(f"FFmpeg conversion error: {error_message}") def get_video_info(self): """Get formatted video information""" if not self.probe_data: self.probe_video() info = self.probe_video() aspect_ratio = self._calculate_aspect_ratio(info['width'], info['height']) return { **info, 'aspect_ratio': aspect_ratio } def _calculate_aspect_ratio(self, width, height): """Calculate aspect ratio from width and height""" from math import gcd divisor = gcd(width, height) ratio_width = width // divisor ratio_height = height // divisor # Map to common aspect ratios ratio_map = { (1, 1): '1:1', (16, 9): '16:9', (9, 16): '9:16', (4, 5): '4:5', (5, 4): '5:4', (2, 3): '2:3', (3, 2): '3:2' } return ratio_map.get((ratio_width, ratio_height), f"{ratio_width}:{ratio_height}") @staticmethod def check_ffmpeg_installed(): """Check if FFmpeg is installed and accessible""" try: result = subprocess.run( ['ffmpeg', '-version'], capture_output=True, text=True, timeout=5 ) return result.returncode == 0 except (subprocess.TimeoutExpired, FileNotFoundError): return False