Add a dedicated Celery queue (ffmpeg) with concurrency=1 to serialize all FFmpeg operations. This prevents CPU spikes when multiple render tasks run in parallel with multiple languages. Changes: - Add ffmpeg_operations.py with run_ffmpeg_command and run_ffprobe_command tasks - Update VideoRendererService to dispatch ffmpeg commands via the queue - Add ffmpeg-worker service to docker-compose with --concurrency=1 - Configure main worker to exclude the ffmpeg queue 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
44 lines
1.6 KiB
Python
44 lines
1.6 KiB
Python
import sentry_sdk
|
|
from sentry_sdk.integrations.celery import CeleryIntegration
|
|
|
|
from app.core.config import settings
|
|
from app.core.logging import setup_logging, get_logger
|
|
from app.tasks import celery_app
|
|
|
|
# Set up logging first, so the logger exists before anything below reports.
setup_logging()
logger = get_logger(__name__)

# Initialize Sentry for the worker, but only when a DSN is configured and it
# carries an explicit http(s) scheme. Checking the full "scheme://" prefix
# (rather than the bare prefixes "http"/"https", where the second could never
# matter) keeps placeholder values that merely begin with "http" from being
# handed to sentry_sdk.init().
if settings.sentry_dsn and settings.sentry_dsn.startswith(
    ("http://", "https://")
):
    sentry_sdk.init(
        dsn=settings.sentry_dsn,
        integrations=[CeleryIntegration(monitor_beat_tasks=True)],
        environment=settings.app_env,
        # NOTE(review): the release is a hard-coded literal; confirm whether
        # it should follow the deployed application version instead.
        release="1.0.0",
        send_default_pii=False,
    )

logger.info("Starting Celery worker with structured logging")
|
|
|
|
# Import the task modules so that they are registered. The names are not
# used directly in this file; the imports exist for their side effect and are
# deliberately placed after logging/Sentry setup, hence the lint suppressions.
from app.tasks import (  # noqa: E402,F401
    ingest_and_ai,
    translate_and_synthesize,
    render_accessible_video,
    ffmpeg_operations,
)
|
|
|
|
# Debug: show the Celery app, every registered task name, and the routing
# table taken from the app configuration.
logger.info(f"Celery app: {celery_app}")
logger.info(f"Registered tasks: {list(celery_app.tasks)}")
logger.info(f"Task routes: {celery_app.conf.task_routes}")
# NOTE(review): this queue list is a literal in the message, not a value read
# from the running worker's configuration - confirm it matches how the worker
# is actually launched.
logger.info("Worker listening to queues: default,ingest,notify,render,ffmpeg")

# Specifically check that the translation task was registered; if it was
# not, also log the non-built-in task names to help diagnose the problem.
if 'app.tasks.translate_and_synthesize.translate_and_synthesize_task' in celery_app.tasks:
    logger.info("✅ translate_and_synthesize_task is registered")
else:
    logger.error("❌ translate_and_synthesize_task is NOT registered")
    logger.error(f"Available tasks: {[t for t in celery_app.tasks if not t.startswith('celery.')]}")
|
|
|
|
# When this module is executed directly (rather than imported), hand control
# to the Celery application's own start-up entry point.
if __name__ == "__main__":
    celery_app.start()
|