feat: add dedicated ffmpeg queue to prevent server overload

Add a dedicated Celery queue (ffmpeg) with concurrency=1 to serialize
all FFmpeg operations. This prevents CPU spikes when multiple render
tasks run in parallel with multiple languages.

Changes:
- Add ffmpeg_operations.py with run_ffmpeg_command and run_ffprobe_command tasks
- Update VideoRendererService to dispatch ffmpeg commands via the queue
- Add ffmpeg-worker service to docker-compose with --concurrency=1
- Configure main worker to exclude the ffmpeg queue

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
michael 2025-12-26 17:56:23 -06:00
parent eaabf9cef6
commit bf1c321088
5 changed files with 303 additions and 46 deletions

View file

@ -1,8 +1,12 @@
"""Service for rendering accessible video with embedded audio descriptions using ffmpeg."""
"""Service for rendering accessible video with embedded audio descriptions using ffmpeg.
FFmpeg operations are dispatched to a dedicated Celery queue (ffmpeg) with concurrency=1
to prevent server overload when multiple render tasks run in parallel.
"""
import asyncio
import json
import os
import subprocess
import tempfile
from pathlib import Path
from typing import Any
@ -14,6 +18,11 @@ from ..schemas.accessible_video import AccessibleVideoMethod, GeminiAccessibleVi
logger = get_logger(__name__)
class FFmpegExecutionError(RuntimeError):
    """Raised when an FFmpeg/FFprobe command fails.

    Derives from ``RuntimeError`` (and therefore still from ``Exception``)
    because the ffmpeg helper in this service used to raise plain
    ``RuntimeError`` on failure; callers that catch ``RuntimeError`` keep
    working while new code can catch this more specific type.
    """
class VideoRendererService:
"""Service for rendering accessible video with embedded audio descriptions."""
@ -24,6 +33,80 @@ class VideoRendererService:
self.duck_level = getattr(settings, 'accessible_video_duck_level', 0.3)
self.duck_fade_ms = getattr(settings, 'accessible_video_duck_fade_ms', 200)
async def _dispatch_ffmpeg(self, cmd: list[str], timeout: int = 3600) -> dict[str, Any]:
    """
    Dispatch an FFmpeg command to the dedicated ffmpeg queue and wait for the result.

    The command is sent with ``apply_async`` and the result is polled with an
    async sleep between checks, so the event loop is not blocked while the
    Celery task is queued or running.

    Args:
        cmd: FFmpeg command list
        timeout: Command timeout in seconds, forwarded to the Celery task

    Returns:
        dict with 'success', 'stdout', 'stderr', 'returncode'

    Raises:
        FFmpegExecutionError: If the command reports failure, or if the Celery
            task itself ends without returning a result dict (for example it
            raised or was revoked).
    """
    from ..tasks.ffmpeg_operations import run_ffmpeg_command

    # Dispatch to ffmpeg queue
    task_result = run_ffmpeg_command.apply_async(
        args=[cmd, timeout],
        queue='ffmpeg',
    )

    # Poll for result with async sleep to avoid blocking
    while not task_result.ready():
        await asyncio.sleep(0.5)

    # The task is already finished, so get() only fetches the stored outcome.
    # - propagate=False returns a task-level exception instead of re-raising
    #   it, so it can be reported as FFmpegExecutionError below.
    # - disable_sync_subtasks=False is needed because Celery otherwise refuses
    #   get() when it is called from inside another task; waiting cannot
    #   happen here since ready() is already true.
    result = task_result.get(
        timeout=30,
        propagate=False,
        disable_sync_subtasks=False,
    )

    if not isinstance(result, dict):
        cause = result if isinstance(result, BaseException) else None
        raise FFmpegExecutionError(
            f"FFmpeg task {task_result.id} ended in state {task_result.state}: {result!r}"
        ) from cause

    if not result['success']:
        # Keep the END of stderr: the failure reason is printed last, after
        # any banner/progress output.
        raise FFmpegExecutionError(
            f"FFmpeg failed with code {result['returncode']}: {result['stderr'][-500:]}"
        )

    return result
async def _dispatch_ffprobe(self, cmd: list[str]) -> dict[str, Any]:
    """
    Dispatch an FFprobe command to the dedicated ffmpeg queue and wait for the result.

    The result is polled with an async sleep between checks so the event loop
    is not blocked while the Celery task is queued or running.

    Args:
        cmd: FFprobe command list

    Returns:
        dict with 'success', 'stdout', 'stderr', 'returncode'

    Raises:
        FFmpegExecutionError: If the command reports failure, or if the Celery
            task itself ends without returning a result dict (for example it
            raised or was revoked).
    """
    from ..tasks.ffmpeg_operations import run_ffprobe_command

    # Dispatch to ffmpeg queue
    task_result = run_ffprobe_command.apply_async(
        args=[cmd],
        queue='ffmpeg',
    )

    # Poll for result with async sleep (shorter interval for fast probe)
    while not task_result.ready():
        await asyncio.sleep(0.2)

    # The task is already finished, so get() only fetches the stored outcome.
    # - propagate=False returns a task-level exception instead of re-raising
    #   it, so it can be reported as FFmpegExecutionError below.
    # - disable_sync_subtasks=False is needed because Celery otherwise refuses
    #   get() when it is called from inside another task; waiting cannot
    #   happen here since ready() is already true.
    result = task_result.get(
        timeout=30,
        propagate=False,
        disable_sync_subtasks=False,
    )

    if not isinstance(result, dict):
        cause = result if isinstance(result, BaseException) else None
        raise FFmpegExecutionError(
            f"FFprobe task {task_result.id} ended in state {task_result.state}: {result!r}"
        ) from cause

    if not result['success']:
        # Keep the END of stderr, where the failure reason is printed.
        raise FFmpegExecutionError(
            f"FFprobe failed with code {result['returncode']}: {result['stderr'][-200:]}"
        )

    return result
async def render_accessible_video(
self,
source_video_path: str,
@ -276,7 +359,7 @@ class VideoRendererService:
return output_path
async def _get_video_duration(self, video_path: str) -> float:
"""Get video duration in seconds using ffprobe."""
"""Get video duration in seconds using ffprobe via the ffmpeg queue."""
cmd = [
self.ffprobe_path,
"-v", "quiet",
@ -284,17 +367,11 @@ class VideoRendererService:
"-of", "default=noprint_wrappers=1:nokey=1",
video_path
]
result = await asyncio.to_thread(
subprocess.run,
cmd,
capture_output=True,
text=True,
check=True
)
return float(result.stdout.strip())
result = await self._dispatch_ffprobe(cmd)
return float(result['stdout'].strip())
async def _get_video_properties(self, video_path: str) -> dict[str, Any]:
"""Get detailed video and audio properties to ensure matching during concatenation."""
"""Get detailed video and audio properties via the ffmpeg queue."""
cmd = [
self.ffprobe_path,
"-v", "quiet",
@ -302,16 +379,8 @@ class VideoRendererService:
"-of", "json",
video_path
]
result = await asyncio.to_thread(
subprocess.run,
cmd,
capture_output=True,
text=True,
check=True
)
import json
data = json.loads(result.stdout)
result = await self._dispatch_ffprobe(cmd)
data = json.loads(result['stdout'])
# Defaults (44100 is common for MP3, but we detect from source)
props = {
@ -525,29 +594,9 @@ class VideoRendererService:
await self._run_ffmpeg(cmd)
async def _run_ffmpeg(self, cmd: list[str], timeout: int = 3600):
"""Run ffmpeg command with proper error handling."""
"""Run ffmpeg command via the dedicated ffmpeg queue."""
logger.debug(f"Running command: {' '.join(cmd)}")
try:
result = await asyncio.wait_for(
asyncio.to_thread(
subprocess.run,
cmd,
capture_output=True,
text=True
),
timeout=timeout
)
if result.returncode != 0:
logger.error(f"ffmpeg error: {result.stderr}")
raise RuntimeError(f"ffmpeg failed with code {result.returncode}: {result.stderr}")
return result
except asyncio.TimeoutError:
logger.error(f"ffmpeg command timed out after {timeout}s")
raise RuntimeError(f"ffmpeg command timed out after {timeout}s")
await self._dispatch_ffmpeg(cmd, timeout)
# Global service instance

View file

@ -29,6 +29,7 @@ celery_app.conf.update(
"app.tasks.translate_and_synthesize.*": {"queue": "default"},
"app.tasks.render_accessible_video.*": {"queue": "render"},
"app.tasks.notify.*": {"queue": "notify"},
"app.tasks.ffmpeg_operations.*": {"queue": "ffmpeg"},
},
task_default_queue="default",
task_create_missing_queues=True,
@ -120,6 +121,7 @@ def import_task_modules():
from . import translate_and_synthesize # noqa: E402, F401
from . import render_accessible_video # noqa: E402, F401
from . import notify # noqa: E402, F401
from . import ffmpeg_operations # noqa: E402, F401
logger.info("Successfully imported all task modules")
except Exception as e:
logger.error(f"Error importing task modules: {e}")

View file

@ -0,0 +1,127 @@
"""Celery tasks for FFmpeg operations - runs on dedicated ffmpeg queue with concurrency=1.
This module provides Celery tasks that execute FFmpeg/FFprobe commands on a dedicated
queue with limited concurrency, preventing server overload when multiple render tasks
run in parallel.
"""
import subprocess
from typing import Any
from ..core.logging import get_logger
from . import celery_app
logger = get_logger(__name__)
@celery_app.task(bind=True, queue='ffmpeg', time_limit=3600, soft_time_limit=3500)
def run_ffmpeg_command(self, cmd: list[str], timeout: int = 3600) -> dict[str, Any]:
    """
    Execute an FFmpeg command on the dedicated ffmpeg queue.

    The task is routed to the 'ffmpeg' queue, which is meant to be consumed by
    a worker started with concurrency=1 so that FFmpeg operations are
    serialized. Failures are reported through the returned dict rather than
    by raising, so the caller always receives the same result shape.

    Args:
        cmd: Command list (e.g., ['ffmpeg', '-y', '-i', 'input.mp4', ...])
        timeout: Command timeout in seconds (default 3600 = 1 hour)

    Returns:
        dict with 'success', 'stdout', 'stderr', 'returncode'. 'returncode'
        is -1 when the command timed out or could not be executed.
    """
    cmd_preview = ' '.join(cmd[:8]) + ('...' if len(cmd) > 8 else '')
    logger.info(f"[FFmpeg Queue] Executing: {cmd_preview}")

    try:
        completed = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            # FFmpeg output can contain bytes that are not valid in the
            # locale encoding (e.g. file metadata); replace them instead of
            # letting a decode error turn a finished run into a failure.
            errors='replace',
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        logger.error(f"[FFmpeg Queue] Command timed out after {timeout}s")
        return {
            'success': False,
            'stdout': '',
            'stderr': f'Command timed out after {timeout}s',
            'returncode': -1,
        }
    except Exception as e:
        logger.error(f"[FFmpeg Queue] Execution error: {e}")
        return {
            'success': False,
            'stdout': '',
            # Some exceptions carry no message; fall back to the class name
            # so the failure reason is never empty.
            'stderr': str(e) or type(e).__name__,
            'returncode': -1,
        }

    succeeded = completed.returncode == 0
    if succeeded:
        logger.info("[FFmpeg Queue] Command completed successfully")
    else:
        # Log the tail of stderr: the failure reason is printed last.
        logger.error(
            f"[FFmpeg Queue] Command failed (code {completed.returncode}): {completed.stderr[-500:]}"
        )

    return {
        'success': succeeded,
        'stdout': completed.stdout,
        'stderr': completed.stderr,
        'returncode': completed.returncode,
    }
@celery_app.task(bind=True, queue='ffmpeg', time_limit=120, soft_time_limit=110)
def run_ffprobe_command(self, cmd: list[str], timeout: int = 60) -> dict[str, Any]:
    """
    Execute an FFprobe command on the dedicated ffmpeg queue.

    Uses a much shorter timeout than the FFmpeg task. Failures are reported
    through the returned dict rather than by raising, so the caller always
    receives the same result shape.

    Args:
        cmd: Command list (e.g., ['ffprobe', '-v', 'quiet', ...])
        timeout: Command timeout in seconds (default 60). Should stay below
            the soft_time_limit declared on this task (110s).

    Returns:
        dict with 'success', 'stdout', 'stderr', 'returncode'. 'returncode'
        is -1 when the command timed out or could not be executed.
    """
    cmd_preview = ' '.join(cmd[:6]) + ('...' if len(cmd) > 6 else '')
    logger.debug(f"[FFmpeg Queue] Executing ffprobe: {cmd_preview}")

    try:
        completed = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            # Probed metadata may contain bytes that are not valid in the
            # locale encoding; replace them instead of failing the probe
            # with a decode error.
            errors='replace',
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        logger.error(f"[FFmpeg Queue] FFprobe timed out after {timeout}s")
        return {
            'success': False,
            'stdout': '',
            'stderr': f'FFprobe timed out after {timeout}s',
            'returncode': -1,
        }
    except Exception as e:
        logger.error(f"[FFmpeg Queue] FFprobe error: {e}")
        return {
            'success': False,
            'stdout': '',
            # Some exceptions carry no message; fall back to the class name
            # so the failure reason is never empty.
            'stderr': str(e) or type(e).__name__,
            'returncode': -1,
        }

    if completed.returncode != 0:
        # Log the tail of stderr, where the failure reason is printed.
        logger.error(
            f"[FFmpeg Queue] FFprobe failed (code {completed.returncode}): {completed.stderr[-200:]}"
        )

    return {
        'success': completed.returncode == 0,
        'stdout': completed.stdout,
        'stderr': completed.stderr,
        'returncode': completed.returncode,
    }

View file

@ -25,12 +25,13 @@ logger.info("Starting Celery worker with structured logging")
from app.tasks import ingest_and_ai
from app.tasks import translate_and_synthesize
from app.tasks import render_accessible_video
from app.tasks import ffmpeg_operations
# Debug: Show registered tasks
logger.info(f"Celery app: {celery_app}")
logger.info(f"Registered tasks: {list(celery_app.tasks.keys())}")
logger.info(f"Task routes: {celery_app.conf.task_routes}")
logger.info(f"Worker listening to queues: default,ingest,notify,render")
logger.info(f"Worker listening to queues: default,ingest,notify,render,ffmpeg")
# Specifically check for our translation task
if 'app.tasks.translate_and_synthesize.translate_and_synthesize_task' in celery_app.tasks:

View file

@ -142,7 +142,7 @@ services:
max-file: "3"
# ---------------------------------------------------------------------------
# Celery Worker for Background Processing
# Celery Worker for Background Processing (excludes ffmpeg queue)
# ---------------------------------------------------------------------------
worker:
build:
@ -151,6 +151,7 @@ services:
target: worker
container_name: accessible-video-worker
restart: unless-stopped
command: ["celery", "-A", "celery_worker", "worker", "-Q", "default,ingest,notify,render", "--loglevel=info", "--concurrency=4"]
depends_on:
mongodb:
condition: service_healthy
@ -215,6 +216,81 @@ services:
max-size: "10m"
max-file: "3"
# ---------------------------------------------------------------------------
# FFmpeg Worker - Dedicated worker for video encoding (concurrency=1)
# ---------------------------------------------------------------------------
ffmpeg-worker:
build:
context: ./backend
dockerfile: Dockerfile
target: worker
container_name: accessible-video-ffmpeg-worker
restart: unless-stopped
command: ["celery", "-A", "celery_worker", "worker", "-Q", "ffmpeg", "--loglevel=info", "--concurrency=1"]
depends_on:
mongodb:
condition: service_healthy
redis:
condition: service_healthy
environment:
# App configuration
APP_ENV: ${APP_ENV:-dev}
# Auth (required by Settings class even though worker doesn't use it)
JWT_SECRET: ${JWT_SECRET}
JWT_ALG: ${JWT_ALG:-HS256}
JWT_ACCESS_TTL_MIN: ${JWT_ACCESS_TTL_MIN:-240}
JWT_REFRESH_TTL_DAYS: ${JWT_REFRESH_TTL_DAYS:-7}
COOKIE_DOMAIN: ${COOKIE_DOMAIN:-ai-sandbox.oliver.solutions}
COOKIE_SECURE: ${COOKIE_SECURE:-true}
COOKIE_SAMESITE: ${COOKIE_SAMESITE:-Lax}
# Database
MONGODB_URI: mongodb://mongodb:27017/${MONGODB_DB:-accessible_video}
MONGODB_DB: ${MONGODB_DB:-accessible_video}
# Redis
REDIS_URL: redis://redis:6379/0
CELERY_BROKER_URL: redis://redis:6379/0
CELERY_RESULT_BACKEND: redis://redis:6379/0
# GCP
GCP_PROJECT_ID: ${GCP_PROJECT_ID}
GCS_BUCKET: ${GCS_BUCKET:-accessible-video}
GOOGLE_APPLICATION_CREDENTIALS: /secrets/gcp-credentials.json
# AI Services
GEMINI_API_KEY: ${GEMINI_API_KEY}
TRANSLATE_API_KEY: ${TRANSLATE_API_KEY:-}
ELEVENLABS_API_KEY: ${ELEVENLABS_API_KEY:-}
GOOGLE_TTS_CREDENTIALS: /secrets/gcp-credentials.json
# Email
SENDGRID_API_KEY: ${SENDGRID_API_KEY:-}
EMAIL_FROM: ${EMAIL_FROM:-noreply@ai-sandbox.oliver.solutions}
CLIENT_BASE_URL: ${CLIENT_BASE_URL:-https://ai-sandbox.oliver.solutions/video-accessibility}
# Microsoft Authentication
AZURE_CLIENT_ID: ${AZURE_CLIENT_ID:-}
AZURE_AUTHORITY: ${AZURE_AUTHORITY:-}
AZURE_REDIRECT_URI: ${AZURE_REDIRECT_URI:-}
# CORS
CORS_ORIGINS: ${CORS_ORIGINS:-http://localhost:6001,http://localhost:5173,http://localhost:3000}
# Observability
SENTRY_DSN: ${SENTRY_DSN:-}
volumes:
- ./secrets:/secrets:ro
- ffmpeg-worker-logs:/app/logs
networks:
- accessible-video-network
logging:
driver: "json-file"
options:
max-size: "10m"
max-file: "3"
# =============================================================================
# Networks
# =============================================================================
@ -237,3 +313,5 @@ volumes:
name: accessible-video-api-logs
worker-logs:
name: accessible-video-worker-logs
ffmpeg-worker-logs:
name: accessible-video-ffmpeg-worker-logs