"""Celery tasks for FFmpeg operations - runs on dedicated ffmpeg queue with concurrency=1. This module provides Celery tasks that execute FFmpeg/FFprobe commands on a dedicated queue with limited concurrency, preventing server overload when multiple render tasks run in parallel. """ import subprocess from typing import Any from ..core.logging import get_logger from . import celery_app logger = get_logger(__name__) @celery_app.task(bind=True, queue='ffmpeg', time_limit=3600, soft_time_limit=3500) def run_ffmpeg_command(self, cmd: list[str], timeout: int = 3600) -> dict[str, Any]: """ Execute an FFmpeg command on the dedicated ffmpeg queue. This task runs with concurrency=1, ensuring only one FFmpeg operation runs at a time across the entire system. Args: cmd: Command list (e.g., ['ffmpeg', '-y', '-i', 'input.mp4', ...]) timeout: Command timeout in seconds (default 3600 = 1 hour) Returns: dict with 'success', 'stdout', 'stderr', 'returncode' """ cmd_preview = ' '.join(cmd[:8]) + ('...' if len(cmd) > 8 else '') logger.info(f"[FFmpeg Queue] Executing: {cmd_preview}") try: result = subprocess.run( cmd, capture_output=True, text=True, timeout=timeout ) if result.returncode != 0: logger.error(f"[FFmpeg Queue] Command failed (code {result.returncode}): {result.stderr[:500]}") return { 'success': False, 'stdout': result.stdout, 'stderr': result.stderr, 'returncode': result.returncode } logger.info(f"[FFmpeg Queue] Command completed successfully") return { 'success': True, 'stdout': result.stdout, 'stderr': result.stderr, 'returncode': 0 } except subprocess.TimeoutExpired: logger.error(f"[FFmpeg Queue] Command timed out after {timeout}s") return { 'success': False, 'stdout': '', 'stderr': f'Command timed out after {timeout}s', 'returncode': -1 } except Exception as e: logger.error(f"[FFmpeg Queue] Execution error: {e}") return { 'success': False, 'stdout': '', 'stderr': str(e), 'returncode': -1 } @celery_app.task(bind=True, queue='ffmpeg', time_limit=120, soft_time_limit=110) def run_ffprobe_command(self, cmd: list[str]) -> dict[str, Any]: """ Execute an FFprobe command on the dedicated ffmpeg queue. Shorter timeout since ffprobe operations are typically fast (probing metadata). Args: cmd: Command list (e.g., ['ffprobe', '-v', 'quiet', ...]) Returns: dict with 'success', 'stdout', 'stderr', 'returncode' """ cmd_preview = ' '.join(cmd[:6]) + ('...' if len(cmd) > 6 else '') logger.debug(f"[FFmpeg Queue] Executing ffprobe: {cmd_preview}") try: result = subprocess.run( cmd, capture_output=True, text=True, timeout=60 ) if result.returncode != 0: logger.error(f"[FFmpeg Queue] FFprobe failed (code {result.returncode}): {result.stderr[:200]}") return { 'success': result.returncode == 0, 'stdout': result.stdout, 'stderr': result.stderr, 'returncode': result.returncode } except subprocess.TimeoutExpired: logger.error("[FFmpeg Queue] FFprobe timed out after 60s") return { 'success': False, 'stdout': '', 'stderr': 'FFprobe timed out after 60s', 'returncode': -1 } except Exception as e: logger.error(f"[FFmpeg Queue] FFprobe error: {e}") return { 'success': False, 'stdout': '', 'stderr': str(e), 'returncode': -1 }