video-accessibility/backend/app/services/cloud_run_dispatch.py
Vadym Samoilenko fddf803b74 feat(translation): enforce EN-first pipeline with cue-preserving translations
All translations now derive strictly from the approved English master VTT,
eliminating the cue-count and timestamp drift reported by linguists
(e.g. PL AD = 11 cues vs EN AD = 17 cues).

Key changes:
- Remove video_native translation mode entirely; all languages go through
  translate_vtt() which guarantees 1:1 cue alignment with EN master
- Transcreation languages now use translate_vtt(style="transcreate") —
  same cue-preserving contract, culturally-adapted instructions
- Post-translation cue alignment validator added (VTTEditor.assert_cue_alignment)
- After ingestion, job moves to PENDING_QC (EN-only) instead of TRANSLATING;
  translation pipeline dispatches automatically when EN QC is approved
- New POST /jobs/{id}/retranslate-language endpoint for PM/admin to fix
  legacy video_native jobs on demand
- Frontend: origin badge (EN-aligned / transcreated / video-native warning),
  EN-first gate banner on target-language cards, Re-translate from EN button
  with confirm modal, removed translation mode selector from NewJob

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-06 12:11:35 +01:00

100 lines
3.7 KiB
Python

"""
Cloud Run Jobs dispatcher — replaces Celery .delay() for heavy pipeline tasks.
Heavy tasks (ingest, translate, render, rerender) are dispatched as Cloud Run Job
executions. Each execution runs `python -m app.tasks.runner --task <name> --job-id <id>`.
Light tasks (notify, embed_glossary) stay on the local Celery worker.
Env vars:
CLOUD_RUN_WORKER_JOB — Cloud Run Job name (default: va-worker)
GCP_PROJECT_ID — GCP project (from settings)
GCP_REGION — Cloud Run region (default: europe-west1)
USE_CELERY_FALLBACK — set to "true" to use local Celery instead (local dev)
"""
from __future__ import annotations
import os
from typing import TYPE_CHECKING
from ..core.logging import get_logger
if TYPE_CHECKING:
pass
logger = get_logger(__name__)

# Name of the Cloud Run Job whose executions run the heavy pipeline tasks.
_JOB_NAME = os.getenv("CLOUD_RUN_WORKER_JOB", "va-worker")
# Region in which that Cloud Run Job lives.
_REGION = os.getenv("GCP_REGION", "europe-west1")
# Opt-in switch read once at import time: only the value "true"
# (compared after .lower()) routes work to the local Celery worker.
_USE_CELERY = os.getenv("USE_CELERY_FALLBACK", "false").lower() == "true"
def _job_resource(project: str) -> str:
    """Return the fully-qualified Cloud Run Job resource path for *project*.

    Shape: ``projects/<project>/locations/<_REGION>/jobs/<_JOB_NAME>``, where
    region and job name come from the module-level environment settings.
    """
    template = "projects/{}/locations/{}/jobs/{}"
    return template.format(project, _REGION, _JOB_NAME)
async def dispatch(task: str, job_id: str, **extra_args: str | list | bool | None) -> str:
    """
    Dispatch a heavy task to Cloud Run Jobs.

    The execution's container args are overridden with
    ``--task <task> --job-id <job_id>`` followed by one ``--kebab-case``
    flag per keyword in *extra_args* (``regenerate_cues`` ->
    ``--regenerate-cues``). List values are joined with commas, ``None``
    values are omitted entirely, and any other value is passed via ``str()``.

    Falls back to local Celery when USE_CELERY_FALLBACK=true (local dev).

    Returns:
        The Cloud Run Operation name (useful for tracking), or the
        ``celery:<task>:<job_id>`` marker returned by the Celery fallback.
    """
    if _USE_CELERY:
        return _celery_fallback(task, job_id, **extra_args)

    # Imported lazily so Celery/local-dev mode does not require the GCP SDK.
    from google.cloud import run_v2  # type: ignore[import]

    from ..core.config import settings

    args = ["--task", task, "--job-id", job_id]
    for key, val in extra_args.items():
        if val is None:
            # Omit the flag entirely so the runner's own default applies.
            continue
        cli_key = f"--{key.replace('_', '-')}"
        if isinstance(val, list):
            args += [cli_key, ",".join(str(v) for v in val)]
        else:
            args += [cli_key, str(val)]

    request = run_v2.RunJobRequest(
        name=_job_resource(settings.gcp_project_id),
        overrides=run_v2.RunJobRequest.Overrides(
            container_overrides=[
                run_v2.RunJobRequest.Overrides.ContainerOverride(args=args)
            ]
        ),
    )
    logger.info("Dispatching Cloud Run Job: task=%s job_id=%s args=%s", task, job_id, args)
    # A client is created per call; using it as an async context manager
    # closes its transport afterwards instead of leaking one channel per dispatch.
    async with run_v2.JobsAsyncClient() as client:
        operation = await client.run_job(request=request)
        # Only the initial Operation proto is needed (no polling), so read
        # its name while the transport is still open.
        op_name = operation.operation.name
    logger.info("Cloud Run Job dispatched: %s", op_name)
    return op_name
def _celery_fallback(task: str, job_id: str, **extra_args) -> str:
    """
    Use local Celery when Cloud Run is not available (dev/test).

    Keyword arguments whose value is ``None`` are treated as absent, so the
    defaults below apply. This matches ``dispatch``, which omits None-valued
    flags from the Cloud Run args.

    Returns:
        A ``celery:<task>:<job_id>`` marker string.

    Raises:
        ValueError: if *task* is not ingest, translate, render or rerender.
    """
    logger.warning("USE_CELERY_FALLBACK=true — dispatching via local Celery: task=%s", task)
    # Drop explicit None values so e.g. language=None falls back to "en"
    # instead of being forwarded to the task as None.
    extra_args = {key: val for key, val in extra_args.items() if val is not None}
    if task == "ingest":
        from ..tasks.ingest_and_ai import ingest_and_ai_task
        ingest_and_ai_task.delay(job_id)
    elif task == "translate":
        from ..tasks.translate_and_synthesize import translate_and_synthesize_task
        _langs = extra_args.get("languages")
        if isinstance(_langs, str):
            # Accept the same comma-separated form dispatch() produces for lists.
            _langs = [lang for lang in _langs.split(",") if lang]
        # An empty selection means "no explicit languages" (None) for the task.
        translate_and_synthesize_task.delay(job_id, languages=_langs or None)
    elif task == "render":
        from ..tasks.render_accessible_video import render_accessible_video_task
        render_accessible_video_task.delay(job_id, extra_args.get("language", "en"))
    elif task == "rerender":
        from ..tasks.rerender_accessible_video import rerender_accessible_video_task
        rerender_accessible_video_task.delay(
            job_id,
            extra_args.get("language", "en"),
            extra_args.get("regenerate_cues", []),
            extra_args.get("whisper_refine", False),
        )
    else:
        raise ValueError(f"Unknown task: {task}")
    return f"celery:{task}:{job_id}"