diff --git a/backend/app/services/video_renderer.py b/backend/app/services/video_renderer.py index e47ec99..51cdab6 100644 --- a/backend/app/services/video_renderer.py +++ b/backend/app/services/video_renderer.py @@ -1,8 +1,12 @@ -"""Service for rendering accessible video with embedded audio descriptions using ffmpeg.""" +"""Service for rendering accessible video with embedded audio descriptions using ffmpeg. + +FFmpeg operations are dispatched to a dedicated Celery queue (ffmpeg) with concurrency=1 +to prevent server overload when multiple render tasks run in parallel. +""" import asyncio +import json import os -import subprocess import tempfile from pathlib import Path from typing import Any @@ -14,6 +18,11 @@ from ..schemas.accessible_video import AccessibleVideoMethod, GeminiAccessibleVi logger = get_logger(__name__) +class FFmpegExecutionError(Exception): + """Raised when an FFmpeg/FFprobe command fails.""" + pass + + class VideoRendererService: """Service for rendering accessible video with embedded audio descriptions.""" @@ -24,6 +33,80 @@ class VideoRendererService: self.duck_level = getattr(settings, 'accessible_video_duck_level', 0.3) self.duck_fade_ms = getattr(settings, 'accessible_video_duck_fade_ms', 200) + async def _dispatch_ffmpeg(self, cmd: list[str], timeout: int = 3600) -> dict[str, Any]: + """ + Dispatch FFmpeg command to the dedicated ffmpeg queue and wait for result. + + This method bridges the async render task with the sync Celery task. + Uses apply_async and polls for completion to avoid blocking the event loop. + + Args: + cmd: FFmpeg command list + timeout: Command timeout in seconds + + Returns: + dict with 'success', 'stdout', 'stderr', 'returncode' + + Raises: + FFmpegExecutionError: If the command fails + """ + from ..tasks.ffmpeg_operations import run_ffmpeg_command + + # Dispatch to ffmpeg queue + task_result = run_ffmpeg_command.apply_async( + args=[cmd, timeout], + queue='ffmpeg' + ) + + # Poll for result with async sleep to avoid blocking + while not task_result.ready(): + await asyncio.sleep(0.5) + + # Get result (should be ready, short timeout for safety) + result = task_result.get(timeout=30) + + if not result['success']: + raise FFmpegExecutionError( + f"FFmpeg failed with code {result['returncode']}: {result['stderr'][:500]}" + ) + + return result + + async def _dispatch_ffprobe(self, cmd: list[str]) -> dict[str, Any]: + """ + Dispatch FFprobe command to the dedicated ffmpeg queue and wait for result. + + Args: + cmd: FFprobe command list + + Returns: + dict with 'success', 'stdout', 'stderr', 'returncode' + + Raises: + FFmpegExecutionError: If the command fails + """ + from ..tasks.ffmpeg_operations import run_ffprobe_command + + # Dispatch to ffmpeg queue + task_result = run_ffprobe_command.apply_async( + args=[cmd], + queue='ffmpeg' + ) + + # Poll for result with async sleep (shorter interval for fast probe) + while not task_result.ready(): + await asyncio.sleep(0.2) + + # Get result + result = task_result.get(timeout=30) + + if not result['success']: + raise FFmpegExecutionError( + f"FFprobe failed with code {result['returncode']}: {result['stderr'][:200]}" + ) + + return result + async def render_accessible_video( self, source_video_path: str, @@ -276,7 +359,7 @@ class VideoRendererService: return output_path async def _get_video_duration(self, video_path: str) -> float: - """Get video duration in seconds using ffprobe.""" + """Get video duration in seconds using ffprobe via the ffmpeg queue.""" cmd = [ self.ffprobe_path, "-v", "quiet", @@ -284,17 +367,11 @@ class VideoRendererService: "-of", "default=noprint_wrappers=1:nokey=1", video_path ] - result = await asyncio.to_thread( - subprocess.run, - cmd, - capture_output=True, - text=True, - check=True - ) - return float(result.stdout.strip()) + result = await self._dispatch_ffprobe(cmd) + return float(result['stdout'].strip()) async def _get_video_properties(self, video_path: str) -> dict[str, Any]: - """Get detailed video and audio properties to ensure matching during concatenation.""" + """Get detailed video and audio properties via the ffmpeg queue.""" cmd = [ self.ffprobe_path, "-v", "quiet", @@ -302,16 +379,8 @@ class VideoRendererService: "-of", "json", video_path ] - result = await asyncio.to_thread( - subprocess.run, - cmd, - capture_output=True, - text=True, - check=True - ) - - import json - data = json.loads(result.stdout) + result = await self._dispatch_ffprobe(cmd) + data = json.loads(result['stdout']) # Defaults (44100 is common for MP3, but we detect from source) props = { @@ -525,29 +594,9 @@ class VideoRendererService: await self._run_ffmpeg(cmd) async def _run_ffmpeg(self, cmd: list[str], timeout: int = 3600): - """Run ffmpeg command with proper error handling.""" + """Run ffmpeg command via the dedicated ffmpeg queue.""" logger.debug(f"Running command: {' '.join(cmd)}") - - try: - result = await asyncio.wait_for( - asyncio.to_thread( - subprocess.run, - cmd, - capture_output=True, - text=True - ), - timeout=timeout - ) - - if result.returncode != 0: - logger.error(f"ffmpeg error: {result.stderr}") - raise RuntimeError(f"ffmpeg failed with code {result.returncode}: {result.stderr}") - - return result - - except asyncio.TimeoutError: - logger.error(f"ffmpeg command timed out after {timeout}s") - raise RuntimeError(f"ffmpeg command timed out after {timeout}s") + await self._dispatch_ffmpeg(cmd, timeout) # Global service instance diff --git a/backend/app/tasks/__init__.py b/backend/app/tasks/__init__.py index a3f0495..e2788b4 100644 --- a/backend/app/tasks/__init__.py +++ b/backend/app/tasks/__init__.py @@ -29,6 +29,7 @@ celery_app.conf.update( "app.tasks.translate_and_synthesize.*": {"queue": "default"}, "app.tasks.render_accessible_video.*": {"queue": "render"}, "app.tasks.notify.*": {"queue": "notify"}, + "app.tasks.ffmpeg_operations.*": {"queue": "ffmpeg"}, }, task_default_queue="default", task_create_missing_queues=True, @@ -120,6 +121,7 @@ def import_task_modules(): from . import translate_and_synthesize # noqa: E402, F401 from . import render_accessible_video # noqa: E402, F401 from . import notify # noqa: E402, F401 + from . import ffmpeg_operations # noqa: E402, F401 logger.info("Successfully imported all task modules") except Exception as e: logger.error(f"Error importing task modules: {e}") diff --git a/backend/app/tasks/ffmpeg_operations.py b/backend/app/tasks/ffmpeg_operations.py new file mode 100644 index 0000000..f1c36b7 --- /dev/null +++ b/backend/app/tasks/ffmpeg_operations.py @@ -0,0 +1,127 @@ +"""Celery tasks for FFmpeg operations - runs on dedicated ffmpeg queue with concurrency=1. + +This module provides Celery tasks that execute FFmpeg/FFprobe commands on a dedicated +queue with limited concurrency, preventing server overload when multiple render tasks +run in parallel. +""" + +import subprocess +from typing import Any + +from ..core.logging import get_logger +from . import celery_app + +logger = get_logger(__name__) + + +@celery_app.task(bind=True, queue='ffmpeg', time_limit=3600, soft_time_limit=3500) +def run_ffmpeg_command(self, cmd: list[str], timeout: int = 3600) -> dict[str, Any]: + """ + Execute an FFmpeg command on the dedicated ffmpeg queue. + + This task runs with concurrency=1, ensuring only one FFmpeg operation + runs at a time across the entire system. + + Args: + cmd: Command list (e.g., ['ffmpeg', '-y', '-i', 'input.mp4', ...]) + timeout: Command timeout in seconds (default 3600 = 1 hour) + + Returns: + dict with 'success', 'stdout', 'stderr', 'returncode' + """ + cmd_preview = ' '.join(cmd[:8]) + ('...' if len(cmd) > 8 else '') + logger.info(f"[FFmpeg Queue] Executing: {cmd_preview}") + + try: + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=timeout + ) + + if result.returncode != 0: + logger.error(f"[FFmpeg Queue] Command failed (code {result.returncode}): {result.stderr[:500]}") + return { + 'success': False, + 'stdout': result.stdout, + 'stderr': result.stderr, + 'returncode': result.returncode + } + + logger.info(f"[FFmpeg Queue] Command completed successfully") + return { + 'success': True, + 'stdout': result.stdout, + 'stderr': result.stderr, + 'returncode': 0 + } + + except subprocess.TimeoutExpired: + logger.error(f"[FFmpeg Queue] Command timed out after {timeout}s") + return { + 'success': False, + 'stdout': '', + 'stderr': f'Command timed out after {timeout}s', + 'returncode': -1 + } + except Exception as e: + logger.error(f"[FFmpeg Queue] Execution error: {e}") + return { + 'success': False, + 'stdout': '', + 'stderr': str(e), + 'returncode': -1 + } + + +@celery_app.task(bind=True, queue='ffmpeg', time_limit=120, soft_time_limit=110) +def run_ffprobe_command(self, cmd: list[str]) -> dict[str, Any]: + """ + Execute an FFprobe command on the dedicated ffmpeg queue. + + Shorter timeout since ffprobe operations are typically fast (probing metadata). + + Args: + cmd: Command list (e.g., ['ffprobe', '-v', 'quiet', ...]) + + Returns: + dict with 'success', 'stdout', 'stderr', 'returncode' + """ + cmd_preview = ' '.join(cmd[:6]) + ('...' if len(cmd) > 6 else '') + logger.debug(f"[FFmpeg Queue] Executing ffprobe: {cmd_preview}") + + try: + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=60 + ) + + if result.returncode != 0: + logger.error(f"[FFmpeg Queue] FFprobe failed (code {result.returncode}): {result.stderr[:200]}") + + return { + 'success': result.returncode == 0, + 'stdout': result.stdout, + 'stderr': result.stderr, + 'returncode': result.returncode + } + + except subprocess.TimeoutExpired: + logger.error("[FFmpeg Queue] FFprobe timed out after 60s") + return { + 'success': False, + 'stdout': '', + 'stderr': 'FFprobe timed out after 60s', + 'returncode': -1 + } + except Exception as e: + logger.error(f"[FFmpeg Queue] FFprobe error: {e}") + return { + 'success': False, + 'stdout': '', + 'stderr': str(e), + 'returncode': -1 + } diff --git a/backend/celery_worker.py b/backend/celery_worker.py index b3c6a03..40e65b4 100644 --- a/backend/celery_worker.py +++ b/backend/celery_worker.py @@ -25,12 +25,13 @@ logger.info("Starting Celery worker with structured logging") from app.tasks import ingest_and_ai from app.tasks import translate_and_synthesize from app.tasks import render_accessible_video +from app.tasks import ffmpeg_operations # Debug: Show registered tasks logger.info(f"Celery app: {celery_app}") logger.info(f"Registered tasks: {list(celery_app.tasks.keys())}") logger.info(f"Task routes: {celery_app.conf.task_routes}") -logger.info(f"Worker listening to queues: default,ingest,notify,render") +logger.info(f"Worker listening to queues: default,ingest,notify,render,ffmpeg") # Specifically check for our translation task if 'app.tasks.translate_and_synthesize.translate_and_synthesize_task' in celery_app.tasks: diff --git a/docker-compose.yml b/docker-compose.yml index f5f39d3..69cd237 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -142,7 +142,7 @@ services: max-file: "3" # --------------------------------------------------------------------------- - # Celery Worker for Background Processing + # Celery Worker for Background Processing (excludes ffmpeg queue) # --------------------------------------------------------------------------- worker: build: @@ -151,6 +151,7 @@ services: target: worker container_name: accessible-video-worker restart: unless-stopped + command: ["celery", "-A", "celery_worker", "worker", "-Q", "default,ingest,notify,render", "--loglevel=info", "--concurrency=4"] depends_on: mongodb: condition: service_healthy @@ -215,6 +216,81 @@ services: max-size: "10m" max-file: "3" + # --------------------------------------------------------------------------- + # FFmpeg Worker - Dedicated worker for video encoding (concurrency=1) + # --------------------------------------------------------------------------- + ffmpeg-worker: + build: + context: ./backend + dockerfile: Dockerfile + target: worker + container_name: accessible-video-ffmpeg-worker + restart: unless-stopped + command: ["celery", "-A", "celery_worker", "worker", "-Q", "ffmpeg", "--loglevel=info", "--concurrency=1"] + depends_on: + mongodb: + condition: service_healthy + redis: + condition: service_healthy + environment: + # App configuration + APP_ENV: ${APP_ENV:-dev} + + # Auth (required by Settings class even though worker doesn't use it) + JWT_SECRET: ${JWT_SECRET} + JWT_ALG: ${JWT_ALG:-HS256} + JWT_ACCESS_TTL_MIN: ${JWT_ACCESS_TTL_MIN:-240} + JWT_REFRESH_TTL_DAYS: ${JWT_REFRESH_TTL_DAYS:-7} + COOKIE_DOMAIN: ${COOKIE_DOMAIN:-ai-sandbox.oliver.solutions} + COOKIE_SECURE: ${COOKIE_SECURE:-true} + COOKIE_SAMESITE: ${COOKIE_SAMESITE:-Lax} + + # Database + MONGODB_URI: mongodb://mongodb:27017/${MONGODB_DB:-accessible_video} + MONGODB_DB: ${MONGODB_DB:-accessible_video} + + # Redis + REDIS_URL: redis://redis:6379/0 + CELERY_BROKER_URL: redis://redis:6379/0 + CELERY_RESULT_BACKEND: redis://redis:6379/0 + + # GCP + GCP_PROJECT_ID: ${GCP_PROJECT_ID} + GCS_BUCKET: ${GCS_BUCKET:-accessible-video} + GOOGLE_APPLICATION_CREDENTIALS: /secrets/gcp-credentials.json + + # AI Services + GEMINI_API_KEY: ${GEMINI_API_KEY} + TRANSLATE_API_KEY: ${TRANSLATE_API_KEY:-} + ELEVENLABS_API_KEY: ${ELEVENLABS_API_KEY:-} + GOOGLE_TTS_CREDENTIALS: /secrets/gcp-credentials.json + + # Email + SENDGRID_API_KEY: ${SENDGRID_API_KEY:-} + EMAIL_FROM: ${EMAIL_FROM:-noreply@ai-sandbox.oliver.solutions} + CLIENT_BASE_URL: ${CLIENT_BASE_URL:-https://ai-sandbox.oliver.solutions/video-accessibility} + + # Microsoft Authentication + AZURE_CLIENT_ID: ${AZURE_CLIENT_ID:-} + AZURE_AUTHORITY: ${AZURE_AUTHORITY:-} + AZURE_REDIRECT_URI: ${AZURE_REDIRECT_URI:-} + + # CORS + CORS_ORIGINS: ${CORS_ORIGINS:-http://localhost:6001,http://localhost:5173,http://localhost:3000} + + # Observability + SENTRY_DSN: ${SENTRY_DSN:-} + volumes: + - ./secrets:/secrets:ro + - ffmpeg-worker-logs:/app/logs + networks: + - accessible-video-network + logging: + driver: "json-file" + options: + max-size: "10m" + max-file: "3" + # ============================================================================= # Networks # ============================================================================= @@ -237,3 +313,5 @@ volumes: name: accessible-video-api-logs worker-logs: name: accessible-video-worker-logs + ffmpeg-worker-logs: + name: accessible-video-ffmpeg-worker-logs