video-accessibility/backend/app/tasks/ffmpeg_operations.py
michael bf1c321088 feat: add dedicated ffmpeg queue to prevent server overload
Add a dedicated Celery queue (ffmpeg) with concurrency=1 to serialize
all FFmpeg operations. This prevents CPU spikes when multiple render
tasks run in parallel, for example when rendering several languages at once.

Changes:
- Add ffmpeg_operations.py with run_ffmpeg_command and run_ffprobe_command tasks
- Update VideoRendererService to dispatch ffmpeg commands via the queue
- Add ffmpeg-worker service to docker-compose with --concurrency=1
- Configure main worker to exclude the ffmpeg queue

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-26 17:56:23 -06:00

127 lines
3.9 KiB
Python

"""Celery tasks for FFmpeg operations, routed to the dedicated 'ffmpeg' queue.

This module provides Celery tasks that execute FFmpeg/FFprobe commands on a
dedicated queue. The worker consuming that queue is expected to run with
limited concurrency (--concurrency=1 per the deployment), which serializes
FFmpeg work and prevents server overload when multiple render tasks run in
parallel.
"""
import subprocess
from typing import Any
from ..core.logging import get_logger
from . import celery_app
# Module-level logger used by the FFmpeg queue tasks below.
logger = get_logger(__name__)
@celery_app.task(bind=True, queue='ffmpeg', time_limit=3600, soft_time_limit=3500)
def run_ffmpeg_command(self, cmd: list[str], timeout: int = 3600) -> dict[str, Any]:
    """
    Execute an FFmpeg command on the dedicated ffmpeg queue.

    The task is routed to the 'ffmpeg' queue; when the worker consuming that
    queue runs with --concurrency=1, only one FFmpeg operation executes at a
    time. The command is passed to subprocess as an argument list (no shell).

    Args:
        self: Bound Celery task instance (unused; required by bind=True).
        cmd: Command list (e.g., ['ffmpeg', '-y', '-i', 'input.mp4', ...]).
        timeout: Subprocess timeout in seconds (default 3600 = 1 hour). Note
            that the task's soft_time_limit is 3500 s, so Celery presumably
            interrupts the task before a timeout above that is reached.

    Returns:
        dict with 'success' (bool), 'stdout' (str), 'stderr' (str) and
        'returncode' (int; -1 when the command timed out or could not be run).
    """
    cmd_preview = ' '.join(cmd[:8]) + ('...' if len(cmd) > 8 else '')
    logger.info(f"[FFmpeg Queue] Executing: {cmd_preview}")
    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            # Decode explicitly and never fail on bad bytes: ffmpeg may echo
            # file names/metadata in arbitrary encodings, and a strict decode
            # error would report an already-finished render as a failure.
            encoding='utf-8',
            errors='replace',
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        logger.error(f"[FFmpeg Queue] Command timed out after {timeout}s")
        return {
            'success': False,
            'stdout': '',
            'stderr': f'Command timed out after {timeout}s',
            'returncode': -1
        }
    except Exception as e:
        # Best-effort boundary: report the failure to the caller instead of
        # raising, but keep the traceback in the logs.
        logger.exception(f"[FFmpeg Queue] Execution error: {e}")
        return {
            'success': False,
            'stdout': '',
            'stderr': str(e),
            'returncode': -1
        }

    if result.returncode != 0:
        # ffmpeg prints its version/configuration banner first; the actual
        # error message is at the end of stderr, so log the tail.
        logger.error(
            f"[FFmpeg Queue] Command failed (code {result.returncode}): {result.stderr[-500:]}"
        )
        return {
            'success': False,
            'stdout': result.stdout,
            'stderr': result.stderr,
            'returncode': result.returncode
        }

    logger.info("[FFmpeg Queue] Command completed successfully")
    return {
        'success': True,
        'stdout': result.stdout,
        'stderr': result.stderr,
        'returncode': 0
    }
@celery_app.task(bind=True, queue='ffmpeg', time_limit=120, soft_time_limit=110)
def run_ffprobe_command(self, cmd: list[str], timeout: int = 60) -> dict[str, Any]:
    """
    Execute an FFprobe command on the dedicated ffmpeg queue.

    Uses a shorter timeout than FFmpeg rendering since ffprobe operations are
    typically fast (probing metadata).

    Args:
        self: Bound Celery task instance (unused; required by bind=True).
        cmd: Command list (e.g., ['ffprobe', '-v', 'quiet', ...]).
        timeout: Subprocess timeout in seconds (default 60). Keep it below the
            task's soft_time_limit (110 s) so the timeout branch below is what
            handles a hung probe.

    Returns:
        dict with 'success' (bool), 'stdout' (str), 'stderr' (str) and
        'returncode' (int; -1 when the command timed out or could not be run).
    """
    cmd_preview = ' '.join(cmd[:6]) + ('...' if len(cmd) > 6 else '')
    logger.debug(f"[FFmpeg Queue] Executing ffprobe: {cmd_preview}")
    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            # Decode explicitly and replace undecodable bytes (e.g. in media
            # tags) rather than failing the whole probe on a decode error.
            encoding='utf-8',
            errors='replace',
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        logger.error(f"[FFmpeg Queue] FFprobe timed out after {timeout}s")
        return {
            'success': False,
            'stdout': '',
            'stderr': f'FFprobe timed out after {timeout}s',
            'returncode': -1
        }
    except Exception as e:
        # Best-effort boundary: report the failure instead of raising, but
        # keep the traceback in the logs.
        logger.exception(f"[FFmpeg Queue] FFprobe error: {e}")
        return {
            'success': False,
            'stdout': '',
            'stderr': str(e),
            'returncode': -1
        }

    if result.returncode != 0:
        # Any error text is at the end of stderr (after the banner, if not
        # suppressed), so log the tail.
        logger.error(
            f"[FFmpeg Queue] FFprobe failed (code {result.returncode}): {result.stderr[-200:]}"
        )
    return {
        'success': result.returncode == 0,
        'stdout': result.stdout,
        'stderr': result.stderr,
        'returncode': result.returncode
    }