""" Cloud Run Job entrypoint. Usage: python -m app.tasks.runner --task ingest --job-id python -m app.tasks.runner --task translate --job-id python -m app.tasks.runner --task render --job-id --language en python -m app.tasks.runner --task rerender --job-id --language en \ --regenerate-cues 0,1,2 --whisper-refine """ import argparse import asyncio import sys def main() -> None: parser = argparse.ArgumentParser(description="Cloud Run Job task runner") parser.add_argument( "--task", required=True, choices=["ingest", "translate", "render", "rerender"], help="Which pipeline task to run", ) parser.add_argument("--job-id", required=True, help="MongoDB job _id") parser.add_argument("--language", default=None, help="Language code (render/rerender only)") parser.add_argument( "--regenerate-cues", default="", help="Comma-separated cue indices to regenerate TTS (rerender only)", ) parser.add_argument( "--whisper-refine", action="store_true", help="Run Whisper pause-point refinement (rerender only)", ) args = parser.parse_args() job_id = args.job_id task = args.task print(f"[runner] task={task} job_id={job_id}", flush=True) try: if task == "ingest": from app.tasks.ingest_and_ai import ingest_and_ai_task_impl asyncio.run(ingest_and_ai_task_impl(job_id)) elif task == "translate": from app.tasks.translate_and_synthesize import ( _async_translate_and_synthesize, ) asyncio.run(_async_translate_and_synthesize(job_id)) elif task == "render": if not args.language: print("[runner] ERROR: --language is required for task=render", file=sys.stderr) sys.exit(1) from app.tasks.render_accessible_video import _async_render_accessible_video asyncio.run(_async_render_accessible_video(job_id, args.language)) elif task == "rerender": if not args.language: print("[runner] ERROR: --language is required for task=rerender", file=sys.stderr) sys.exit(1) cue_indices = ( [int(x) for x in args.regenerate_cues.split(",") if x.strip()] if args.regenerate_cues else [] ) from app.tasks.rerender_accessible_video import ( _async_rerender_accessible_video, ) asyncio.run( _async_rerender_accessible_video( job_id, args.language, cue_indices, args.whisper_refine, ) ) except Exception as exc: print(f"[runner] FAILED task={task} job_id={job_id}: {exc}", file=sys.stderr, flush=True) import traceback traceback.print_exc() sys.exit(1) print(f"[runner] DONE task={task} job_id={job_id}", flush=True) if __name__ == "__main__": main()