Add a dedicated Celery queue (ffmpeg) with concurrency=1 to serialize all FFmpeg operations. This prevents CPU spikes when multiple render tasks run in parallel with multiple languages. Changes: - Add ffmpeg_operations.py with run_ffmpeg_command and run_ffprobe_command tasks - Update VideoRendererService to dispatch ffmpeg commands via the queue - Add ffmpeg-worker service to docker-compose with --concurrency=1 - Configure main worker to exclude the ffmpeg queue 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
127 lines
3.9 KiB
Python
127 lines
3.9 KiB
Python
"""Celery tasks for FFmpeg operations - runs on dedicated ffmpeg queue with concurrency=1.
|
|
|
|
This module provides Celery tasks that execute FFmpeg/FFprobe commands on a dedicated
|
|
queue with limited concurrency, preventing server overload when multiple render tasks
|
|
run in parallel.
|
|
"""
|
|
|
|
import subprocess
|
|
from typing import Any
|
|
|
|
from ..core.logging import get_logger
|
|
from . import celery_app
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
|
|
@celery_app.task(bind=True, queue='ffmpeg', time_limit=3600, soft_time_limit=3500)
|
|
def run_ffmpeg_command(self, cmd: list[str], timeout: int = 3600) -> dict[str, Any]:
|
|
"""
|
|
Execute an FFmpeg command on the dedicated ffmpeg queue.
|
|
|
|
This task runs with concurrency=1, ensuring only one FFmpeg operation
|
|
runs at a time across the entire system.
|
|
|
|
Args:
|
|
cmd: Command list (e.g., ['ffmpeg', '-y', '-i', 'input.mp4', ...])
|
|
timeout: Command timeout in seconds (default 3600 = 1 hour)
|
|
|
|
Returns:
|
|
dict with 'success', 'stdout', 'stderr', 'returncode'
|
|
"""
|
|
cmd_preview = ' '.join(cmd[:8]) + ('...' if len(cmd) > 8 else '')
|
|
logger.info(f"[FFmpeg Queue] Executing: {cmd_preview}")
|
|
|
|
try:
|
|
result = subprocess.run(
|
|
cmd,
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=timeout
|
|
)
|
|
|
|
if result.returncode != 0:
|
|
logger.error(f"[FFmpeg Queue] Command failed (code {result.returncode}): {result.stderr[:500]}")
|
|
return {
|
|
'success': False,
|
|
'stdout': result.stdout,
|
|
'stderr': result.stderr,
|
|
'returncode': result.returncode
|
|
}
|
|
|
|
logger.info(f"[FFmpeg Queue] Command completed successfully")
|
|
return {
|
|
'success': True,
|
|
'stdout': result.stdout,
|
|
'stderr': result.stderr,
|
|
'returncode': 0
|
|
}
|
|
|
|
except subprocess.TimeoutExpired:
|
|
logger.error(f"[FFmpeg Queue] Command timed out after {timeout}s")
|
|
return {
|
|
'success': False,
|
|
'stdout': '',
|
|
'stderr': f'Command timed out after {timeout}s',
|
|
'returncode': -1
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"[FFmpeg Queue] Execution error: {e}")
|
|
return {
|
|
'success': False,
|
|
'stdout': '',
|
|
'stderr': str(e),
|
|
'returncode': -1
|
|
}
|
|
|
|
|
|
@celery_app.task(bind=True, queue='ffmpeg', time_limit=120, soft_time_limit=110)
|
|
def run_ffprobe_command(self, cmd: list[str]) -> dict[str, Any]:
|
|
"""
|
|
Execute an FFprobe command on the dedicated ffmpeg queue.
|
|
|
|
Shorter timeout since ffprobe operations are typically fast (probing metadata).
|
|
|
|
Args:
|
|
cmd: Command list (e.g., ['ffprobe', '-v', 'quiet', ...])
|
|
|
|
Returns:
|
|
dict with 'success', 'stdout', 'stderr', 'returncode'
|
|
"""
|
|
cmd_preview = ' '.join(cmd[:6]) + ('...' if len(cmd) > 6 else '')
|
|
logger.debug(f"[FFmpeg Queue] Executing ffprobe: {cmd_preview}")
|
|
|
|
try:
|
|
result = subprocess.run(
|
|
cmd,
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=60
|
|
)
|
|
|
|
if result.returncode != 0:
|
|
logger.error(f"[FFmpeg Queue] FFprobe failed (code {result.returncode}): {result.stderr[:200]}")
|
|
|
|
return {
|
|
'success': result.returncode == 0,
|
|
'stdout': result.stdout,
|
|
'stderr': result.stderr,
|
|
'returncode': result.returncode
|
|
}
|
|
|
|
except subprocess.TimeoutExpired:
|
|
logger.error("[FFmpeg Queue] FFprobe timed out after 60s")
|
|
return {
|
|
'success': False,
|
|
'stdout': '',
|
|
'stderr': 'FFprobe timed out after 60s',
|
|
'returncode': -1
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"[FFmpeg Queue] FFprobe error: {e}")
|
|
return {
|
|
'success': False,
|
|
'stdout': '',
|
|
'stderr': str(e),
|
|
'returncode': -1
|
|
}
|