"""
Cloud Run Jobs dispatcher — replaces Celery .delay() for heavy pipeline tasks.

Heavy tasks (ingest, translate, render, rerender) are dispatched as Cloud Run
Job executions. Each execution runs
`python -m app.tasks.runner --task <task> --job-id <job_id>`.

Light tasks (notify, embed_glossary) stay on the local Celery worker.

Env vars:
    CLOUD_RUN_WORKER_JOB — Cloud Run Job name (default: va-worker)
    GCP_PROJECT_ID       — GCP project (from settings)
    GCP_REGION           — Cloud Run region (default: europe-west1)
    USE_CELERY_FALLBACK  — set to "true" to use local Celery instead (local dev)
"""
from __future__ import annotations

import os
from typing import TYPE_CHECKING

from ..core.logging import get_logger

if TYPE_CHECKING:
    pass

logger = get_logger(__name__)

# Read once at import time; changing the environment afterwards has no effect.
_JOB_NAME = os.environ.get("CLOUD_RUN_WORKER_JOB", "va-worker")
_REGION = os.environ.get("GCP_REGION", "europe-west1")
_USE_CELERY = os.environ.get("USE_CELERY_FALLBACK", "false").lower() == "true"


def _job_resource(project: str) -> str:
    """Return the fully-qualified Cloud Run Job resource name for *project*."""
    return f"projects/{project}/locations/{_REGION}/jobs/{_JOB_NAME}"


def _build_cli_args(task: str, job_id: str, extra_args: dict) -> list[str]:
    """Translate a task, job id and extra keyword args into runner CLI args.

    Keys are converted from ``snake_case`` to ``--kebab-case`` flags. List
    values are joined with commas, ``None`` values are omitted entirely, and
    every other value is passed through ``str()``.
    """
    args = ["--task", task, "--job-id", job_id]
    for key, val in extra_args.items():
        cli_key = f"--{key.replace('_', '-')}"
        if isinstance(val, list):
            args += [cli_key, ",".join(str(v) for v in val)]
        elif val is not None:
            args += [cli_key, str(val)]
    return args


def _arg_or_default(extra_args: dict, key: str, default):
    """Return ``extra_args[key]``, or *default* when it is missing or ``None``.

    The Cloud Run path drops ``None`` values from the command line, so an
    explicit ``None`` must mean "not provided" on the Celery path as well.
    """
    value = extra_args.get(key)
    return default if value is None else value


async def dispatch(
    task: str, job_id: str, **extra_args: str | list | bool | None
) -> str:
    """
    Dispatch a heavy task to Cloud Run Jobs.

    Args:
        task: Task name forwarded to the runner as ``--task``.
        job_id: Job identifier forwarded to the runner as ``--job-id``.
        **extra_args: Additional runner options. Each key becomes a
            ``--kebab-case`` flag; list values are comma-joined and ``None``
            values are omitted.

    Returns:
        The Cloud Run Operation name (useful for tracking), or a
        ``celery:<task>:<job_id>`` marker when the Celery fallback is used.

    Raises:
        ValueError: In fallback mode, when *task* is not a known task.

    Falls back to local Celery when USE_CELERY_FALLBACK=true (local dev).
    """
    if _USE_CELERY:
        return _celery_fallback(task, job_id, **extra_args)

    # Imported lazily so local dev with the Celery fallback does not need the
    # Cloud Run client library on the import path.
    from google.cloud import run_v2  # type: ignore[import]

    from ..core.config import settings

    args = _build_cli_args(task, job_id, extra_args)

    request = run_v2.RunJobRequest(
        name=_job_resource(settings.gcp_project_id),
        overrides=run_v2.RunJobRequest.Overrides(
            container_overrides=[
                run_v2.RunJobRequest.Overrides.ContainerOverride(args=args)
            ]
        ),
    )

    logger.info(
        "Dispatching Cloud Run Job: task=%s job_id=%s args=%s", task, job_id, args
    )

    client = run_v2.JobsAsyncClient()
    # A client is created per dispatch, so close its transport once the
    # request has been accepted instead of leaking it. Only the operation
    # name is needed afterwards; the execution itself is not awaited.
    async with client:
        operation = await client.run_job(request=request)
        op_name = operation.operation.name

    logger.info("Cloud Run Job dispatched: %s", op_name)
    return op_name


def _celery_fallback(task: str, job_id: str, **extra_args) -> str:
    """Use local Celery when Cloud Run is not available (dev/test).

    Args:
        task: One of ``ingest``, ``translate``, ``render`` or ``rerender``.
        job_id: Job identifier passed to the Celery task.
        **extra_args: Task-specific options (``languages``, ``language``,
            ``regenerate_cues``, ``whisper_refine``). Missing or ``None``
            values fall back to the defaults used below.

    Returns:
        A ``celery:<task>:<job_id>`` marker string.

    Raises:
        ValueError: If *task* is not one of the known task names.
    """
    logger.warning(
        "USE_CELERY_FALLBACK=true — dispatching via local Celery: task=%s", task
    )

    # Task modules are imported per branch so only the selected task's
    # module is loaded.
    if task == "ingest":
        from ..tasks.ingest_and_ai import ingest_and_ai_task

        ingest_and_ai_task.delay(job_id)
    elif task == "translate":
        from ..tasks.translate_and_synthesize import translate_and_synthesize_task

        langs = extra_args.get("languages")
        if isinstance(langs, str):
            # Accept the same comma-separated form the CLI path produces.
            langs = [lang for lang in langs.split(",") if lang]
        translate_and_synthesize_task.delay(job_id, languages=langs or None)
    elif task == "render":
        from ..tasks.render_accessible_video import render_accessible_video_task

        render_accessible_video_task.delay(
            job_id, _arg_or_default(extra_args, "language", "en")
        )
    elif task == "rerender":
        from ..tasks.rerender_accessible_video import rerender_accessible_video_task

        rerender_accessible_video_task.delay(
            job_id,
            _arg_or_default(extra_args, "language", "en"),
            _arg_or_default(extra_args, "regenerate_cues", []),
            _arg_or_default(extra_args, "whisper_refine", False),
        )
    else:
        raise ValueError(f"Unknown task: {task}")

    return f"celery:{task}:{job_id}"