feat(infra): move heavy workers to Cloud Run Jobs

Heavy pipeline tasks (ingest, translate, render, rerender) now dispatch
to a Cloud Run Job (va-worker) instead of local Celery workers. optical-dev
runs only api + lightweight worker (notify/embed) within its 2-CPU budget.

- backend/app/tasks/runner.py — Cloud Run Job entrypoint
- backend/app/services/cloud_run_dispatch.py — replaces .delay() for heavy tasks
- backend/Dockerfile.cloudrun — Cloud Run worker image (ffmpeg included)
- docker-compose.optical-dev.yml — 2-CPU safe overrides, disables heavy workers
- cloudbuild.yaml — builds va-worker image and updates Cloud Run Job
- deploy-dev.sh — uses 3-file compose, builds only api+worker locally
- routes_jobs, routes_admin_production, ingest_and_ai, translate_and_synthesize
  — all dispatch sites updated to use cloud_run_dispatch.dispatch()

USE_CELERY_FALLBACK=true in .env.local to use Celery locally during dev.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Vadym Samoilenko 2026-04-29 21:47:10 +01:00
parent f723e3f0bc
commit b3ace22009
10 changed files with 412 additions and 30 deletions

View file

@ -0,0 +1,55 @@
# =============================================================================
# Cloud Run Job image — va-worker
#
# Reuses the multi-stage base from Dockerfile.
# Entrypoint: python -m app.tasks.runner --task <name> --job-id <id>
#
# Build:
# docker build -f backend/Dockerfile.cloudrun -t va-worker backend/
# =============================================================================
# ── Stage 1: Builder ─────────────────────────────────────────────────────────
FROM python:3.11-slim AS builder
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential curl \
&& rm -rf /var/lib/apt/lists/*
RUN pip install --no-cache-dir poetry==1.8.3
WORKDIR /app
COPY pyproject.toml poetry.lock ./
RUN poetry config virtualenvs.create false \
&& poetry install --no-interaction --no-ansi --only main
# ── Stage 2: Runtime ─────────────────────────────────────────────────────────
FROM python:3.11-slim AS runtime
# ffmpeg required for video rendering tasks
RUN apt-get update && apt-get install -y --no-install-recommends \
ffmpeg \
tini \
curl \
&& rm -rf /var/lib/apt/lists/*
# Copy installed packages from builder
COPY --from=builder /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages
COPY --from=builder /usr/local/bin /usr/local/bin
WORKDIR /app
COPY . .
# Non-root user for security
RUN groupadd -r worker && useradd -r -g worker worker \
&& chown -R worker:worker /app
USER worker
# Cloud Run Jobs: no persistent HTTP port needed.
# Cloud Run passes CLOUD_RUN_TASK_INDEX and CLOUD_RUN_TASK_COUNT env vars.
ENV PYTHONUNBUFFERED=1 \
PYTHONDONTWRITEBYTECODE=1 \
PYTHONPATH=/app
ENTRYPOINT ["tini", "--", "python", "-m", "app.tasks.runner"]
# Args are injected per-execution via Cloud Run Job overrides:
# --task ingest|translate|render|rerender --job-id <id> [--language <lang>] ...

View file

@ -1,6 +1,8 @@
"""Admin production endpoints: failure dashboard and bulk retry."""
from datetime import datetime
from ...services.cloud_run_dispatch import dispatch as _cr_dispatch
from fastapi import APIRouter, Depends, HTTPException, Query, status
from motor.motor_asyncio import AsyncIOMotorDatabase
from pydantic import BaseModel
@ -141,12 +143,12 @@ async def bulk_retry(
)
if step in ("ingestion", "ai_processing"):
ingest_and_ai_task.delay(job_id)
await _cr_dispatch("ingest", job_id)
elif step in ("translation", "tts"):
translate_and_synthesize_task.delay(job_id)
await _cr_dispatch("translate", job_id)
elif step == "render":
from ...tasks.rerender_accessible_video import rerender_accessible_video_task
rerender_accessible_video_task.delay(job_id)
lang = job.get("last_render_language", "en")
await _cr_dispatch("rerender", job_id, language=lang)
retried.append(job_id)
except Exception as e:

View file

@ -1,6 +1,8 @@
import hashlib
from datetime import datetime
from ...services.cloud_run_dispatch import dispatch as _cr_dispatch
from bson import ObjectId
from fastapi import (
APIRouter,
@ -193,10 +195,8 @@ async def create_job(
logger.info(f"Task routing config: {celery_app.conf.task_routes}")
try:
# Use apply_async for more control and debugging
task = ingest_and_ai_task.apply_async(args=[job_id], queue='ingest')
logger.info(f"Task dispatched successfully: {task.id} for job {job_id} to queue 'ingest'")
logger.info(f"Task state: {task.state}")
logger.info(f"Task backend: {task.backend}")
await _cr_dispatch("ingest", job_id)
logger.info(f"Task dispatched to Cloud Run for job {job_id}")
# Try to get the task result to see if it was actually queued
try:
@ -1915,7 +1915,7 @@ async def retry_tts(
# Re-trigger the translation/TTS pipeline
try:
translate_and_synthesize_task.delay(job_id)
await _cr_dispatch("translate", job_id)
logger.info(f"Triggered TTS retry for job {job_id} by {current_user.email}")
except Exception as e:
logger.error(f"Failed to trigger TTS retry task for job {job_id}: {e}")
@ -2049,12 +2049,11 @@ async def retry_job(
try:
if step in ("ingestion", "ai_processing"):
ingest_and_ai_task.delay(job_id)
await _cr_dispatch("ingest", job_id)
elif step in ("translation", "tts"):
translate_and_synthesize_task.delay(job_id)
await _cr_dispatch("translate", job_id)
elif step == "render":
from ...tasks.rerender_accessible_video import rerender_accessible_video_task
rerender_accessible_video_task.delay(job_id)
await _cr_dispatch("rerender", job_id, language=job_doc.get("last_render_language", "en"))
logger.info(f"Triggered retry for job {job_id} step='{step}' by {current_user.email}")
except Exception as e:
logger.error(f"Failed to dispatch retry task for job {job_id}: {e}")
@ -2491,12 +2490,11 @@ async def trigger_accessible_video_rerender(
)
# Trigger re-render task
from ...tasks.rerender_accessible_video import rerender_accessible_video_task
rerender_accessible_video_task.delay(
job_id=job_id,
await _cr_dispatch(
"rerender", job_id,
language=language,
regenerate_cue_indices=regenerate_cues,
whisper_refine=request.whisper_refine
regenerate_cues=regenerate_cues,
whisper_refine=request.whisper_refine,
)
logger.info(
@ -2629,11 +2627,10 @@ async def update_tts_preferences(
)
# Trigger re-render task for this language
rerender_accessible_video_task.delay(
job_id=job_id,
await _cr_dispatch(
"rerender", job_id,
language=lang,
regenerate_cue_indices=all_cue_indices,
whisper_refine=False
regenerate_cues=all_cue_indices,
)
logger.info(

View file

@ -0,0 +1,96 @@
"""
Cloud Run Jobs dispatcher replaces Celery .delay() for heavy pipeline tasks.
Heavy tasks (ingest, translate, render, rerender) are dispatched as Cloud Run Job
executions. Each execution runs `python -m app.tasks.runner --task <name> --job-id <id>`.
Light tasks (notify, embed_glossary) stay on the local Celery worker.
Env vars:
CLOUD_RUN_WORKER_JOB Cloud Run Job name (default: va-worker)
GCP_PROJECT_ID GCP project (from settings)
GCP_REGION Cloud Run region (default: europe-west1)
USE_CELERY_FALLBACK set to "true" to use local Celery instead (local dev)
"""
from __future__ import annotations
import os
from typing import TYPE_CHECKING
from ..core.logging import get_logger
if TYPE_CHECKING:
pass
logger = get_logger(__name__)
_JOB_NAME = os.environ.get("CLOUD_RUN_WORKER_JOB", "va-worker")
_REGION = os.environ.get("GCP_REGION", "europe-west1")
_USE_CELERY = os.environ.get("USE_CELERY_FALLBACK", "false").lower() == "true"
def _job_resource(project: str) -> str:
return f"projects/{project}/locations/{_REGION}/jobs/{_JOB_NAME}"
async def dispatch(task: str, job_id: str, **extra_args: str | list) -> str:
"""
Dispatch a heavy task to Cloud Run Jobs.
Returns the Cloud Run Operation name (useful for tracking).
Falls back to local Celery when USE_CELERY_FALLBACK=true (local dev).
"""
if _USE_CELERY:
return _celery_fallback(task, job_id, **extra_args)
from ..core.config import settings
from google.cloud import run_v2 # type: ignore[import]
args = ["--task", task, "--job-id", job_id]
for key, val in extra_args.items():
cli_key = f"--{key.replace('_', '-')}"
if isinstance(val, list):
args += [cli_key, ",".join(str(v) for v in val)]
elif val is not None:
args += [cli_key, str(val)]
client = run_v2.JobsAsyncClient()
request = run_v2.RunJobRequest(
name=_job_resource(settings.gcp_project_id),
overrides=run_v2.RunJobRequest.Overrides(
container_overrides=[
run_v2.RunJobRequest.Overrides.ContainerOverride(args=args)
]
),
)
logger.info("Dispatching Cloud Run Job: task=%s job_id=%s args=%s", task, job_id, args)
operation = await client.run_job(request=request)
op_name = operation.operation.name
logger.info("Cloud Run Job dispatched: %s", op_name)
return op_name
def _celery_fallback(task: str, job_id: str, **extra_args) -> str:
"""Use local Celery when Cloud Run is not available (dev/test)."""
logger.warning("USE_CELERY_FALLBACK=true — dispatching via local Celery: task=%s", task)
if task == "ingest":
from ..tasks.ingest_and_ai import ingest_and_ai_task
ingest_and_ai_task.delay(job_id)
elif task == "translate":
from ..tasks.translate_and_synthesize import translate_and_synthesize_task
translate_and_synthesize_task.delay(job_id)
elif task == "render":
from ..tasks.render_accessible_video import render_accessible_video_task
render_accessible_video_task.delay(job_id, extra_args.get("language", "en"))
elif task == "rerender":
from ..tasks.rerender_accessible_video import rerender_accessible_video_task
rerender_accessible_video_task.delay(
job_id,
extra_args.get("language", "en"),
extra_args.get("regenerate_cues", []),
extra_args.get("whisper_refine", False),
)
else:
raise ValueError(f"Unknown task: {task}")
return f"celery:{task}:{job_id}"

View file

@ -283,10 +283,9 @@ async def ingest_and_ai_task_impl(job_id: str):
logger.info(f"AI processing complete for job {job_id}, triggering translation pipeline")
# Trigger translation and synthesis pipeline
# This will process all translations, TTS, and accessible video BEFORE QC review
from .translate_and_synthesize import translate_and_synthesize_task
translate_and_synthesize_task.delay(job_id)
# Trigger translation and synthesis pipeline via Cloud Run
from ..services.cloud_run_dispatch import dispatch as _cr_dispatch
await _cr_dispatch("translate", job_id)
finally:
# Clean up temp file

View file

@ -0,0 +1,88 @@
"""
Cloud Run Job entrypoint.
Usage:
python -m app.tasks.runner --task ingest --job-id <id>
python -m app.tasks.runner --task translate --job-id <id>
python -m app.tasks.runner --task render --job-id <id> --language en
python -m app.tasks.runner --task rerender --job-id <id> --language en \
--regenerate-cues 0,1,2 --whisper-refine
"""
import argparse
import asyncio
import sys
def main() -> None:
parser = argparse.ArgumentParser(description="Cloud Run Job task runner")
parser.add_argument(
"--task",
required=True,
choices=["ingest", "translate", "render", "rerender"],
help="Which pipeline task to run",
)
parser.add_argument("--job-id", required=True, help="MongoDB job _id")
parser.add_argument("--language", default=None, help="Language code (render/rerender only)")
parser.add_argument(
"--regenerate-cues",
default="",
help="Comma-separated cue indices to regenerate TTS (rerender only)",
)
parser.add_argument(
"--whisper-refine",
action="store_true",
help="Run Whisper pause-point refinement (rerender only)",
)
args = parser.parse_args()
job_id = args.job_id
task = args.task
print(f"[runner] task={task} job_id={job_id}", flush=True)
try:
if task == "ingest":
from app.tasks.ingest_and_ai import ingest_and_ai_task_impl
asyncio.run(ingest_and_ai_task_impl(job_id))
elif task == "translate":
from app.tasks.translate_and_synthesize import _async_translate_and_synthesize
asyncio.run(_async_translate_and_synthesize(job_id))
elif task == "render":
if not args.language:
print("[runner] ERROR: --language is required for task=render", file=sys.stderr)
sys.exit(1)
from app.tasks.render_accessible_video import _async_render_accessible_video
asyncio.run(_async_render_accessible_video(job_id, args.language))
elif task == "rerender":
if not args.language:
print("[runner] ERROR: --language is required for task=rerender", file=sys.stderr)
sys.exit(1)
cue_indices = (
[int(x) for x in args.regenerate_cues.split(",") if x.strip()]
if args.regenerate_cues
else []
)
from app.tasks.rerender_accessible_video import _async_rerender_accessible_video
asyncio.run(
_async_rerender_accessible_video(
job_id,
args.language,
cue_indices,
args.whisper_refine,
)
)
except Exception as exc:
print(f"[runner] FAILED task={task} job_id={job_id}: {exc}", file=sys.stderr, flush=True)
import traceback
traceback.print_exc()
sys.exit(1)
print(f"[runner] DONE task={task} job_id={job_id}", flush=True)
if __name__ == "__main__":
main()

View file

@ -995,7 +995,8 @@ async def _generate_language_tts(job_id: str, language: str, lang_output: dict,
}
)
render_accessible_video_task.delay(job_id, language)
from ..services.cloud_run_dispatch import dispatch as _cr_dispatch
await _cr_dispatch("render", job_id, language=language)
logger.info(f"Triggered accessible video rendering for job {job_id}/{language}")
except TTSSynthesisError:

57
cloudbuild.yaml Normal file
View file

@ -0,0 +1,57 @@
# =============================================================================
# Cloud Build — build va-worker image and push to Artifact Registry
#
# Trigger: manual or on push to main
# Usage:
# gcloud builds submit --config cloudbuild.yaml .
# =============================================================================
substitutions:
_REGION: europe-west1
_REPO: nexus
_IMAGE: va-worker
_TAG: $COMMIT_SHA # replaced with actual SHA by Cloud Build; use "latest" for manual runs
steps:
# ── Build Cloud Run worker image ──────────────────────────────────────────
- name: gcr.io/cloud-builders/docker
id: build-va-worker
args:
- build
- -f
- backend/Dockerfile.cloudrun
- -t
- ${_REGION}-docker.pkg.dev/$PROJECT_ID/${_REPO}/${_IMAGE}:${_TAG}
- -t
- ${_REGION}-docker.pkg.dev/$PROJECT_ID/${_REPO}/${_IMAGE}:latest
- backend/
# ── Push both tags ────────────────────────────────────────────────────────
- name: gcr.io/cloud-builders/docker
id: push-va-worker
args:
- push
- --all-tags
- ${_REGION}-docker.pkg.dev/$PROJECT_ID/${_REPO}/${_IMAGE}
# ── Update Cloud Run Job to use new image ─────────────────────────────────
- name: gcr.io/google.com/cloudsdktool/cloud-sdk
id: update-cloud-run-job
entrypoint: gcloud
args:
- run
- jobs
- update
- va-worker
- --image
- ${_REGION}-docker.pkg.dev/$PROJECT_ID/${_REPO}/${_IMAGE}:${_TAG}
- --region
- ${_REGION}
images:
- ${_REGION}-docker.pkg.dev/$PROJECT_ID/${_REPO}/${_IMAGE}:${_TAG}
- ${_REGION}-docker.pkg.dev/$PROJECT_ID/${_REPO}/${_IMAGE}:latest
options:
logging: CLOUD_LOGGING_ONLY
machineType: E2_HIGHCPU_8 # faster builds

View file

@ -0,0 +1,87 @@
# =============================================================================
# optical-dev overrides — 2 CPU / ~8 GB RAM server
#
# Heavy pipeline workers (ingest, translate, render, rerender) run on
# Cloud Run Jobs. Only lightweight services run here.
#
# Usage:
# docker compose -f docker-compose.yml \
# -f docker-compose.prod.yml \
# -f docker-compose.optical-dev.yml \
# --env-file .env.production up -d
# =============================================================================
services:
# ── Keep on this server, resource limits fit in 2 CPU ──────────────────────
mongodb:
deploy:
resources:
limits:
memory: 1G
cpus: '0.5'
reservations:
memory: 512M
cpus: '0.25'
redis:
deploy:
resources:
limits:
memory: 512M
cpus: '0.25'
reservations:
memory: 256M
cpus: '0.1'
api:
deploy:
resources:
limits:
memory: 2G
cpus: '1.0'
reservations:
memory: 1G
cpus: '0.5'
environment:
APP_ENV: prod
# Cloud Run dispatch config
CLOUD_RUN_WORKER_JOB: va-worker
GCP_REGION: europe-west1
USE_CELERY_FALLBACK: "false"
# Lightweight worker: only notify + embed_glossary tasks
# Heavy tasks (ingest/translate/render) go to Cloud Run Jobs
worker:
deploy:
resources:
limits:
memory: 512M
cpus: '0.25'
reservations:
memory: 256M
cpus: '0.1'
environment:
APP_ENV: prod
# Only consume lightweight queues; heavy queues handled by Cloud Run
CELERY_QUEUES: "notify,embed"
command: >
celery -A app.tasks worker
--loglevel=info
--queues=notify,embed
--concurrency=2
--hostname=lite-worker@%h
# ── Disabled on optical-dev — run on Cloud Run Jobs instead ───────────────
ffmpeg-worker:
deploy:
replicas: 0
tts-worker:
deploy:
replicas: 0
whisper-worker:
deploy:
replicas: 0

View file

@ -18,12 +18,12 @@ PROJECT_DIR="/opt/video-accessibility"
WEBROOT="/var/www/html/video-accessibility"
APACHE_CONF_DIR="/etc/apache2/sites-available"
APACHE_VHOST="optical-dev.oliver.solutions.conf"
COMPOSE="docker compose -f docker-compose.yml -f docker-compose.prod.yml --env-file .env.production"
COMPOSE="docker compose -f docker-compose.yml -f docker-compose.prod.yml -f docker-compose.optical-dev.yml --env-file .env.production"
API_INTERNAL_PORT=8000 # host port the api container exposes
VITE_BASE="/video-accessibility"
# Services built sequentially — whisper last (downloads ~1.5 GB model on first build)
BUILD_SERVICES="api worker ffmpeg-worker tts-worker whisper-worker"
# Only build images that run on optical-dev; heavy workers run on Cloud Run Jobs
BUILD_SERVICES="api worker"
# ── Flags ─────────────────────────────────────────────────────────────────────
SKIP_BUILD=false