From b3ace220091f8b527bb28b5de709b808d6e5291e Mon Sep 17 00:00:00 2001 From: Vadym Samoilenko Date: Wed, 29 Apr 2026 21:47:10 +0100 Subject: [PATCH] feat(infra): move heavy workers to Cloud Run Jobs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Heavy pipeline tasks (ingest, translate, render, rerender) now dispatch to a Cloud Run Job (va-worker) instead of local Celery workers. optical-dev runs only api + lightweight worker (notify/embed) within its 2-CPU budget. - backend/app/tasks/runner.py — Cloud Run Job entrypoint - backend/app/services/cloud_run_dispatch.py — replaces .delay() for heavy tasks - backend/Dockerfile.cloudrun — Cloud Run worker image (ffmpeg included) - docker-compose.optical-dev.yml — 2-CPU safe overrides, disables heavy workers - cloudbuild.yaml — builds va-worker image and updates Cloud Run Job - deploy-dev.sh — uses 3-file compose, builds only api+worker locally - routes_jobs, routes_admin_production, ingest_and_ai, translate_and_synthesize — all dispatch sites updated to use cloud_run_dispatch.dispatch() USE_CELERY_FALLBACK=true in .env.local to use Celery locally during dev. 
Co-Authored-By: Claude Sonnet 4.6 --- backend/Dockerfile.cloudrun | 55 +++++++++++ backend/app/api/v1/routes_admin_production.py | 10 +- backend/app/api/v1/routes_jobs.py | 33 +++---- backend/app/services/cloud_run_dispatch.py | 96 +++++++++++++++++++ backend/app/tasks/ingest_and_ai.py | 7 +- backend/app/tasks/runner.py | 88 +++++++++++++++++ backend/app/tasks/translate_and_synthesize.py | 3 +- cloudbuild.yaml | 57 +++++++++++ docker-compose.optical-dev.yml | 87 +++++++++++++++++ scripts/deploy-dev.sh | 6 +- 10 files changed, 412 insertions(+), 30 deletions(-) create mode 100644 backend/Dockerfile.cloudrun create mode 100644 backend/app/services/cloud_run_dispatch.py create mode 100644 backend/app/tasks/runner.py create mode 100644 cloudbuild.yaml create mode 100644 docker-compose.optical-dev.yml diff --git a/backend/Dockerfile.cloudrun b/backend/Dockerfile.cloudrun new file mode 100644 index 0000000..98acd1d --- /dev/null +++ b/backend/Dockerfile.cloudrun @@ -0,0 +1,55 @@ +# ============================================================================= +# Cloud Run Job image — va-worker +# +# Reuses the multi-stage base from Dockerfile. 
+# Entrypoint: python -m app.tasks.runner --task TASK --job-id JOB_ID +# +# Build: +# docker build -f backend/Dockerfile.cloudrun -t va-worker backend/ +# ============================================================================= + +# ── Stage 1: Builder ───────────────────────────────────────────────────────── +FROM python:3.11-slim AS builder + +RUN apt-get update && apt-get install -y --no-install-recommends \ + build-essential curl \ + && rm -rf /var/lib/apt/lists/* + +RUN pip install --no-cache-dir poetry==1.8.3 + +WORKDIR /app +COPY pyproject.toml poetry.lock ./ +RUN poetry config virtualenvs.create false \ + && poetry install --no-interaction --no-ansi --only main + +# ── Stage 2: Runtime ───────────────────────────────────────────────────────── +FROM python:3.11-slim AS runtime + +# ffmpeg required for video rendering tasks +RUN apt-get update && apt-get install -y --no-install-recommends \ + ffmpeg \ + tini \ + curl \ + && rm -rf /var/lib/apt/lists/* + +# Copy installed packages from builder +COPY --from=builder /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages +COPY --from=builder /usr/local/bin /usr/local/bin + +WORKDIR /app +COPY . . + +# Non-root user for security +RUN groupadd -r worker && useradd -r -g worker worker \ + && chown -R worker:worker /app +USER worker + +# Cloud Run Jobs: no persistent HTTP port needed. +# Cloud Run passes CLOUD_RUN_TASK_INDEX and CLOUD_RUN_TASK_COUNT env vars. +ENV PYTHONUNBUFFERED=1 \ + PYTHONDONTWRITEBYTECODE=1 \ + PYTHONPATH=/app + +ENTRYPOINT ["tini", "--", "python", "-m", "app.tasks.runner"] +# Args are injected per-execution via Cloud Run Job overrides: +# --task ingest|translate|render|rerender --job-id JOB_ID [--language LANG] ... 
diff --git a/backend/app/api/v1/routes_admin_production.py b/backend/app/api/v1/routes_admin_production.py index a6ffd48..d18b262 100644 --- a/backend/app/api/v1/routes_admin_production.py +++ b/backend/app/api/v1/routes_admin_production.py @@ -1,6 +1,8 @@ """Admin production endpoints: failure dashboard and bulk retry.""" from datetime import datetime +from ...services.cloud_run_dispatch import dispatch as _cr_dispatch + from fastapi import APIRouter, Depends, HTTPException, Query, status from motor.motor_asyncio import AsyncIOMotorDatabase from pydantic import BaseModel @@ -141,12 +143,12 @@ async def bulk_retry( ) if step in ("ingestion", "ai_processing"): - ingest_and_ai_task.delay(job_id) + await _cr_dispatch("ingest", job_id) elif step in ("translation", "tts"): - translate_and_synthesize_task.delay(job_id) + await _cr_dispatch("translate", job_id) elif step == "render": - from ...tasks.rerender_accessible_video import rerender_accessible_video_task - rerender_accessible_video_task.delay(job_id) + lang = job.get("last_render_language", "en") + await _cr_dispatch("rerender", job_id, language=lang) retried.append(job_id) except Exception as e: diff --git a/backend/app/api/v1/routes_jobs.py b/backend/app/api/v1/routes_jobs.py index 5f838f4..f80d0ba 100644 --- a/backend/app/api/v1/routes_jobs.py +++ b/backend/app/api/v1/routes_jobs.py @@ -1,6 +1,8 @@ import hashlib from datetime import datetime +from ...services.cloud_run_dispatch import dispatch as _cr_dispatch + from bson import ObjectId from fastapi import ( APIRouter, @@ -193,10 +195,8 @@ async def create_job( logger.info(f"Task routing config: {celery_app.conf.task_routes}") try: # Use apply_async for more control and debugging - task = ingest_and_ai_task.apply_async(args=[job_id], queue='ingest') - logger.info(f"Task dispatched successfully: {task.id} for job {job_id} to queue 'ingest'") - logger.info(f"Task state: {task.state}") - logger.info(f"Task backend: {task.backend}") + await _cr_dispatch("ingest", 
job_id) + logger.info(f"Task dispatched to Cloud Run for job {job_id}") # Try to get the task result to see if it was actually queued try: @@ -1915,7 +1915,7 @@ async def retry_tts( # Re-trigger the translation/TTS pipeline try: - translate_and_synthesize_task.delay(job_id) + await _cr_dispatch("translate", job_id) logger.info(f"Triggered TTS retry for job {job_id} by {current_user.email}") except Exception as e: logger.error(f"Failed to trigger TTS retry task for job {job_id}: {e}") @@ -2049,12 +2049,11 @@ async def retry_job( try: if step in ("ingestion", "ai_processing"): - ingest_and_ai_task.delay(job_id) + await _cr_dispatch("ingest", job_id) elif step in ("translation", "tts"): - translate_and_synthesize_task.delay(job_id) + await _cr_dispatch("translate", job_id) elif step == "render": - from ...tasks.rerender_accessible_video import rerender_accessible_video_task - rerender_accessible_video_task.delay(job_id) + await _cr_dispatch("rerender", job_id, language=job_doc.get("last_render_language", "en")) logger.info(f"Triggered retry for job {job_id} step='{step}' by {current_user.email}") except Exception as e: logger.error(f"Failed to dispatch retry task for job {job_id}: {e}") @@ -2491,12 +2490,11 @@ async def trigger_accessible_video_rerender( ) # Trigger re-render task - from ...tasks.rerender_accessible_video import rerender_accessible_video_task - rerender_accessible_video_task.delay( - job_id=job_id, + await _cr_dispatch( + "rerender", job_id, language=language, - regenerate_cue_indices=regenerate_cues, - whisper_refine=request.whisper_refine + regenerate_cues=regenerate_cues, + whisper_refine=request.whisper_refine, ) logger.info( @@ -2629,11 +2627,10 @@ async def update_tts_preferences( ) # Trigger re-render task for this language - rerender_accessible_video_task.delay( - job_id=job_id, + await _cr_dispatch( + "rerender", job_id, language=lang, - regenerate_cue_indices=all_cue_indices, - whisper_refine=False + regenerate_cues=all_cue_indices, ) 
logger.info( diff --git a/backend/app/services/cloud_run_dispatch.py b/backend/app/services/cloud_run_dispatch.py new file mode 100644 index 0000000..b22f21d --- /dev/null +++ b/backend/app/services/cloud_run_dispatch.py @@ -0,0 +1,96 @@ +""" +Cloud Run Jobs dispatcher — replaces Celery .delay() for heavy pipeline tasks. + +Heavy tasks (ingest, translate, render, rerender) are dispatched as Cloud Run Job +executions. Each execution runs `python -m app.tasks.runner --task TASK --job-id JOB_ID`. + +Light tasks (notify, embed_glossary) stay on the local Celery worker. + +Env vars: + CLOUD_RUN_WORKER_JOB — Cloud Run Job name (default: va-worker) + GCP_PROJECT_ID — GCP project (from settings) + GCP_REGION — Cloud Run region (default: europe-west1) + USE_CELERY_FALLBACK — set to "true" to use local Celery instead (local dev) +""" +from __future__ import annotations + +import os +from typing import TYPE_CHECKING + +from ..core.logging import get_logger + +if TYPE_CHECKING: + pass + +logger = get_logger(__name__) + +_JOB_NAME = os.environ.get("CLOUD_RUN_WORKER_JOB", "va-worker") +_REGION = os.environ.get("GCP_REGION", "europe-west1") +_USE_CELERY = os.environ.get("USE_CELERY_FALLBACK", "false").lower() == "true" + + +def _job_resource(project: str) -> str: + return f"projects/{project}/locations/{_REGION}/jobs/{_JOB_NAME}" + + +async def dispatch(task: str, job_id: str, **extra_args: str | list) -> str: + """ + Dispatch a heavy task to Cloud Run Jobs. + + Returns the Cloud Run Operation name (useful for tracking). + Falls back to local Celery when USE_CELERY_FALLBACK=true (local dev). 
+ """ + if _USE_CELERY: + return _celery_fallback(task, job_id, **extra_args) + + from ..core.config import settings + from google.cloud import run_v2 # type: ignore[import] + + args = ["--task", task, "--job-id", job_id] + for key, val in extra_args.items(): + cli_key = f"--{key.replace('_', '-')}" + if isinstance(val, list): + args += [cli_key, ",".join(str(v) for v in val)] + elif val is not None: + args += [cli_key, str(val)] + + client = run_v2.JobsAsyncClient() + request = run_v2.RunJobRequest( + name=_job_resource(settings.gcp_project_id), + overrides=run_v2.RunJobRequest.Overrides( + container_overrides=[ + run_v2.RunJobRequest.Overrides.ContainerOverride(args=args) + ] + ), + ) + + logger.info("Dispatching Cloud Run Job: task=%s job_id=%s args=%s", task, job_id, args) + operation = await client.run_job(request=request) + op_name = operation.operation.name + logger.info("Cloud Run Job dispatched: %s", op_name) + return op_name + + +def _celery_fallback(task: str, job_id: str, **extra_args) -> str: + """Use local Celery when Cloud Run is not available (dev/test).""" + logger.warning("USE_CELERY_FALLBACK=true — dispatching via local Celery: task=%s", task) + if task == "ingest": + from ..tasks.ingest_and_ai import ingest_and_ai_task + ingest_and_ai_task.delay(job_id) + elif task == "translate": + from ..tasks.translate_and_synthesize import translate_and_synthesize_task + translate_and_synthesize_task.delay(job_id) + elif task == "render": + from ..tasks.render_accessible_video import render_accessible_video_task + render_accessible_video_task.delay(job_id, extra_args.get("language", "en")) + elif task == "rerender": + from ..tasks.rerender_accessible_video import rerender_accessible_video_task + rerender_accessible_video_task.delay( + job_id, + extra_args.get("language", "en"), + extra_args.get("regenerate_cues", []), + extra_args.get("whisper_refine", False), + ) + else: + raise ValueError(f"Unknown task: {task}") + return f"celery:{task}:{job_id}" diff 
--git a/backend/app/tasks/ingest_and_ai.py b/backend/app/tasks/ingest_and_ai.py index 3d10660..83f0136 100644 --- a/backend/app/tasks/ingest_and_ai.py +++ b/backend/app/tasks/ingest_and_ai.py @@ -283,10 +283,9 @@ async def ingest_and_ai_task_impl(job_id: str): logger.info(f"AI processing complete for job {job_id}, triggering translation pipeline") - # Trigger translation and synthesis pipeline - # This will process all translations, TTS, and accessible video BEFORE QC review - from .translate_and_synthesize import translate_and_synthesize_task - translate_and_synthesize_task.delay(job_id) + # Trigger translation and synthesis pipeline via Cloud Run + from ..services.cloud_run_dispatch import dispatch as _cr_dispatch + await _cr_dispatch("translate", job_id) finally: # Clean up temp file diff --git a/backend/app/tasks/runner.py b/backend/app/tasks/runner.py new file mode 100644 index 0000000..9271354 --- /dev/null +++ b/backend/app/tasks/runner.py @@ -0,0 +1,88 @@ +""" +Cloud Run Job entrypoint. 
+ +Usage: + python -m app.tasks.runner --task ingest --job-id JOB_ID + python -m app.tasks.runner --task translate --job-id JOB_ID + python -m app.tasks.runner --task render --job-id JOB_ID --language en + python -m app.tasks.runner --task rerender --job-id JOB_ID --language en \ + --regenerate-cues 0,1,2 --whisper-refine +""" +import argparse +import asyncio +import sys + + +def main() -> None: + parser = argparse.ArgumentParser(description="Cloud Run Job task runner") + parser.add_argument( + "--task", + required=True, + choices=["ingest", "translate", "render", "rerender"], + help="Which pipeline task to run", + ) + parser.add_argument("--job-id", required=True, help="MongoDB job _id") + parser.add_argument("--language", default=None, help="Language code (render/rerender only)") + parser.add_argument( + "--regenerate-cues", + default="", + help="Comma-separated cue indices to regenerate TTS (rerender only)", + ) + parser.add_argument( + "--whisper-refine", + action="store_true", + help="Run Whisper pause-point refinement (rerender only)", + ) + args = parser.parse_args() + + job_id = args.job_id + task = args.task + + print(f"[runner] task={task} job_id={job_id}", flush=True) + + try: + if task == "ingest": + from app.tasks.ingest_and_ai import ingest_and_ai_task_impl + asyncio.run(ingest_and_ai_task_impl(job_id)) + + elif task == "translate": + from app.tasks.translate_and_synthesize import _async_translate_and_synthesize + asyncio.run(_async_translate_and_synthesize(job_id)) + + elif task == "render": + if not args.language: + print("[runner] ERROR: --language is required for task=render", file=sys.stderr) + sys.exit(1) + from app.tasks.render_accessible_video import _async_render_accessible_video + asyncio.run(_async_render_accessible_video(job_id, args.language)) + + elif task == "rerender": + if not args.language: + print("[runner] ERROR: --language is required for task=rerender", file=sys.stderr) + sys.exit(1) + cue_indices = ( + [int(x) for x in args.regenerate_cues.split(",") if 
x.strip()] + if args.regenerate_cues + else [] + ) + from app.tasks.rerender_accessible_video import _async_rerender_accessible_video + asyncio.run( + _async_rerender_accessible_video( + job_id, + args.language, + cue_indices, + args.whisper_refine, + ) + ) + + except Exception as exc: + print(f"[runner] FAILED task={task} job_id={job_id}: {exc}", file=sys.stderr, flush=True) + import traceback + traceback.print_exc() + sys.exit(1) + + print(f"[runner] DONE task={task} job_id={job_id}", flush=True) + + +if __name__ == "__main__": + main() diff --git a/backend/app/tasks/translate_and_synthesize.py b/backend/app/tasks/translate_and_synthesize.py index 498e843..f6afe54 100644 --- a/backend/app/tasks/translate_and_synthesize.py +++ b/backend/app/tasks/translate_and_synthesize.py @@ -995,7 +995,8 @@ async def _generate_language_tts(job_id: str, language: str, lang_output: dict, } ) - render_accessible_video_task.delay(job_id, language) + from ..services.cloud_run_dispatch import dispatch as _cr_dispatch + await _cr_dispatch("render", job_id, language=language) logger.info(f"Triggered accessible video rendering for job {job_id}/{language}") except TTSSynthesisError: diff --git a/cloudbuild.yaml b/cloudbuild.yaml new file mode 100644 index 0000000..b1224bf --- /dev/null +++ b/cloudbuild.yaml @@ -0,0 +1,57 @@ +# ============================================================================= +# Cloud Build — build va-worker image and push to Artifact Registry +# +# Trigger: manual or on push to main +# Usage: +# gcloud builds submit --config cloudbuild.yaml . 
+# ============================================================================= + +substitutions: + _REGION: europe-west1 + _REPO: nexus + _IMAGE: va-worker + _TAG: $COMMIT_SHA # replaced with actual SHA by Cloud Build; use "latest" for manual runs + +steps: + # ── Build Cloud Run worker image ────────────────────────────────────────── + - name: gcr.io/cloud-builders/docker + id: build-va-worker + args: + - build + - -f + - backend/Dockerfile.cloudrun + - -t + - ${_REGION}-docker.pkg.dev/$PROJECT_ID/${_REPO}/${_IMAGE}:${_TAG} + - -t + - ${_REGION}-docker.pkg.dev/$PROJECT_ID/${_REPO}/${_IMAGE}:latest + - backend/ + + # ── Push both tags ──────────────────────────────────────────────────────── + - name: gcr.io/cloud-builders/docker + id: push-va-worker + args: + - push + - --all-tags + - ${_REGION}-docker.pkg.dev/$PROJECT_ID/${_REPO}/${_IMAGE} + + # ── Update Cloud Run Job to use new image ───────────────────────────────── + - name: gcr.io/google.com/cloudsdktool/cloud-sdk + id: update-cloud-run-job + entrypoint: gcloud + args: + - run + - jobs + - update + - va-worker + - --image + - ${_REGION}-docker.pkg.dev/$PROJECT_ID/${_REPO}/${_IMAGE}:${_TAG} + - --region + - ${_REGION} + +images: + - ${_REGION}-docker.pkg.dev/$PROJECT_ID/${_REPO}/${_IMAGE}:${_TAG} + - ${_REGION}-docker.pkg.dev/$PROJECT_ID/${_REPO}/${_IMAGE}:latest + +options: + logging: CLOUD_LOGGING_ONLY + machineType: E2_HIGHCPU_8 # faster builds diff --git a/docker-compose.optical-dev.yml b/docker-compose.optical-dev.yml new file mode 100644 index 0000000..c4a8bf0 --- /dev/null +++ b/docker-compose.optical-dev.yml @@ -0,0 +1,87 @@ +# ============================================================================= +# optical-dev overrides — 2 CPU / ~8 GB RAM server +# +# Heavy pipeline workers (ingest, translate, render, rerender) run on +# Cloud Run Jobs. Only lightweight services run here. 
+# +# Usage: +# docker compose -f docker-compose.yml \ +# -f docker-compose.prod.yml \ +# -f docker-compose.optical-dev.yml \ +# --env-file .env.production up -d +# ============================================================================= + +services: + # ── Keep on this server, resource limits fit in 2 CPU ────────────────────── + + mongodb: + deploy: + resources: + limits: + memory: 1G + cpus: '0.5' + reservations: + memory: 512M + cpus: '0.25' + + redis: + deploy: + resources: + limits: + memory: 512M + cpus: '0.25' + reservations: + memory: 256M + cpus: '0.1' + + api: + deploy: + resources: + limits: + memory: 2G + cpus: '1.0' + reservations: + memory: 1G + cpus: '0.5' + environment: + APP_ENV: prod + # Cloud Run dispatch config + CLOUD_RUN_WORKER_JOB: va-worker + GCP_REGION: europe-west1 + USE_CELERY_FALLBACK: "false" + + # Lightweight worker: only notify + embed_glossary tasks + # Heavy tasks (ingest/translate/render) go to Cloud Run Jobs + worker: + deploy: + resources: + limits: + memory: 512M + cpus: '0.25' + reservations: + memory: 256M + cpus: '0.1' + environment: + APP_ENV: prod + # Only consume lightweight queues; heavy queues handled by Cloud Run + CELERY_QUEUES: "notify,embed" + command: > + celery -A app.tasks worker + --loglevel=info + --queues=notify,embed + --concurrency=2 + --hostname=lite-worker@%h + + # ── Disabled on optical-dev — run on Cloud Run Jobs instead ─────────────── + + ffmpeg-worker: + deploy: + replicas: 0 + + tts-worker: + deploy: + replicas: 0 + + whisper-worker: + deploy: + replicas: 0 diff --git a/scripts/deploy-dev.sh b/scripts/deploy-dev.sh index 552023f..0ad574a 100755 --- a/scripts/deploy-dev.sh +++ b/scripts/deploy-dev.sh @@ -18,12 +18,12 @@ PROJECT_DIR="/opt/video-accessibility" WEBROOT="/var/www/html/video-accessibility" APACHE_CONF_DIR="/etc/apache2/sites-available" APACHE_VHOST="optical-dev.oliver.solutions.conf" -COMPOSE="docker compose -f docker-compose.yml -f docker-compose.prod.yml --env-file .env.production" 
+COMPOSE="docker compose -f docker-compose.yml -f docker-compose.prod.yml -f docker-compose.optical-dev.yml --env-file .env.production" API_INTERNAL_PORT=8000 # host port the api container exposes VITE_BASE="/video-accessibility" -# Services built sequentially — whisper last (downloads ~1.5 GB model on first build) -BUILD_SERVICES="api worker ffmpeg-worker tts-worker whisper-worker" +# Only build images that run on optical-dev; heavy workers run on Cloud Run Jobs +BUILD_SERVICES="api worker" # ── Flags ───────────────────────────────────────────────────────────────────── SKIP_BUILD=false