From 77be93b5264779bb75f22cbf7fc42ec126fe97eb Mon Sep 17 00:00:00 2001 From: michael Date: Thu, 1 Jan 2026 09:21:07 -0600 Subject: [PATCH] perf: parallelize video-native translations with asyncio.gather MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Video-native translation mode now processes all target languages in parallel using asyncio.gather() with a semaphore (max 3 concurrent) for rate limiting. This significantly reduces total translation time when multiple languages are selected. - Add MAX_CONCURRENT_VIDEO_NATIVE constant for rate limiting - Refactor video-native path to use parallel coroutines - Keep traditional VTT translation mode sequential - Handle per-language errors without stopping other translations 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- backend/app/tasks/translate_and_synthesize.py | 199 +++++++++++------- 1 file changed, 127 insertions(+), 72 deletions(-) diff --git a/backend/app/tasks/translate_and_synthesize.py b/backend/app/tasks/translate_and_synthesize.py index 4f58023..70c31dc 100644 --- a/backend/app/tasks/translate_and_synthesize.py +++ b/backend/app/tasks/translate_and_synthesize.py @@ -20,6 +20,9 @@ from . import celery_app logger = get_logger(__name__) +# Maximum concurrent video-native translations (Gemini API rate limiting) +MAX_CONCURRENT_VIDEO_NATIVE = 3 + def broadcast_status_update(job_id: str, status: str, job_title: str = None, message: str = None, progress: int = None): """ @@ -206,94 +209,146 @@ async def _async_translate_and_synthesize(job_id: str): source_ad_vtt = ad_blob.download_as_text() try: - for language in requested_languages: - if language == source_language: - continue # Skip source language as it's already processed + # Get target languages (exclude source) + target_languages = [lang for lang in requested_languages if lang != source_language] - logger.info(f"Processing language: {language} (from source: {source_language}, mode: {translation_mode})") + if translation_mode == "video_native": + # VIDEO NATIVE MODE: Process all languages in parallel with rate limiting + # This generates VTTs from scratch with visual context for each language + # Note: Transcreation is NOT applicable - video_native replaces it - try: - if translation_mode == "video_native": - # VIDEO NATIVE MODE: Re-process video with Gemini for target language - # This generates VTTs from scratch with visual context - # Note: Transcreation is NOT applicable - video_native replaces it + semaphore = asyncio.Semaphore(MAX_CONCURRENT_VIDEO_NATIVE) - async def extract_targeted(): - return await gemini_service.extract_accessibility_targeted( - video_local_path, - language + async def translate_language_video_native(lang: str) -> tuple[str, str, str, str | None]: + """Process a single language with video-native translation. + Returns: (language, captions_gcs_uri, ad_gcs_uri, error_message or None) + """ + async with semaphore: + logger.info(f"Starting video-native translation for {lang} (from source: {source_language})") + try: + async def extract_targeted(): + return await gemini_service.extract_accessibility_targeted( + video_local_path, + lang + ) + + result = await retry_with_backoff(extract_targeted, max_retries=3) + translated_captions = result["captions_vtt"] + translated_ad = result["audio_description_vtt"] + + # Upload translated VTT files + captions_gcs_uri = await upload_vtt_to_gcs( + translated_captions, + f"{job_id}/{lang}/captions.vtt" ) - result = await retry_with_backoff(extract_targeted, max_retries=3) - translated_captions = result["captions_vtt"] - translated_ad = result["audio_description_vtt"] - origin = "video_native" - - elif language in transcreation_languages: - # TRADITIONAL MODE with transcreation: cultural adaptation - async def transcreate(): - return await gemini_service.transcreate_content( - source_captions_vtt, - source_ad_vtt, - language, - brief="Standard accessibility content" + ad_gcs_uri = await upload_vtt_to_gcs( + translated_ad, + f"{job_id}/{lang}/ad.vtt" ) - result = await retry_with_backoff(transcreate, max_retries=3) - translated_captions = result["captions_vtt"] - translated_ad = result["audio_description_vtt"] - origin = "transcreate" + logger.info(f"Completed video-native translation for {lang}") + return (lang, captions_gcs_uri, ad_gcs_uri, None) - else: - # TRADITIONAL MODE: Use Gemini translation (6-36x cheaper than Google Translate API) - async def translate_captions(): - return await gemini_service.translate_vtt( - source_captions_vtt, language, source_language=source_language - ) + except Exception as e: + logger.error(f"Video-native translation failed for {lang}: {e}") + return (lang, None, None, str(e)) - async def translate_ad(): - return await gemini_service.translate_vtt( - source_ad_vtt, language, source_language=source_language - ) + # Run all translations in parallel (limited by semaphore) + if target_languages: + logger.info(f"Starting parallel video-native translations for {len(target_languages)} languages: {target_languages}") + tasks = [translate_language_video_native(lang) for lang in target_languages] + results = await asyncio.gather(*tasks, return_exceptions=True) - translated_captions = await retry_with_backoff(translate_captions, max_retries=3) - translated_ad = await retry_with_backoff(translate_ad, max_retries=3) - origin = "gemini_translate" + # Process results + for i, result in enumerate(results): + lang = target_languages[i] + if isinstance(result, Exception): + # Unexpected exception from gather + logger.error(f"Unexpected error for {lang}: {result}") + updated_outputs[lang] = { + "origin": "video_native", + "qa_notes": f"Translation failed: {str(result)}" + } + else: + lang, captions_uri, ad_uri, error_msg = result + if error_msg: + updated_outputs[lang] = { + "origin": "video_native", + "qa_notes": f"Translation failed: {error_msg}" + } + else: + updated_outputs[lang] = { + "captions_vtt_gcs": captions_uri, + "ad_vtt_gcs": ad_uri, + "origin": "video_native" + } + logger.info(f"Successfully processed VTT files for language: {lang} (origin: video_native)") - # Upload translated VTT files - captions_gcs_uri = await upload_vtt_to_gcs( - translated_captions, - f"{job_id}/{language}/captions.vtt" - ) + else: + # TRADITIONAL MODE: Process languages sequentially + for language in target_languages: + logger.info(f"Processing language: {language} (from source: {source_language}, mode: {translation_mode})") - ad_gcs_uri = await upload_vtt_to_gcs( - translated_ad, - f"{job_id}/{language}/ad.vtt" - ) + try: + if language in transcreation_languages: + # TRADITIONAL MODE with transcreation: cultural adaptation + async def transcreate(): + return await gemini_service.transcreate_content( + source_captions_vtt, + source_ad_vtt, + language, + brief="Standard accessibility content" + ) - # Store language outputs - updated_outputs[language] = { - "captions_vtt_gcs": captions_gcs_uri, - "ad_vtt_gcs": ad_gcs_uri, - "origin": origin - } + result = await retry_with_backoff(transcreate, max_retries=3) + translated_captions = result["captions_vtt"] + translated_ad = result["audio_description_vtt"] + origin = "transcreate" - logger.info(f"Successfully processed VTT files for language: {language} (origin: {origin})") + else: + # TRADITIONAL MODE: Use Gemini translation (6-36x cheaper than Google Translate API) + async def translate_captions(): + return await gemini_service.translate_vtt( + source_captions_vtt, language, source_language=source_language + ) - except Exception as e: - logger.error(f"Failed to process language {language}: {e}") - # Determine origin based on mode - if translation_mode == "video_native": - fallback_origin = "video_native" - elif language in transcreation_languages: - fallback_origin = "transcreate" - else: - fallback_origin = "gemini_translate" + async def translate_ad(): + return await gemini_service.translate_vtt( + source_ad_vtt, language, source_language=source_language + ) - updated_outputs[language] = { - "origin": fallback_origin, - "qa_notes": f"Translation failed: {str(e)}" - } + translated_captions = await retry_with_backoff(translate_captions, max_retries=3) + translated_ad = await retry_with_backoff(translate_ad, max_retries=3) + origin = "gemini_translate" + + # Upload translated VTT files + captions_gcs_uri = await upload_vtt_to_gcs( + translated_captions, + f"{job_id}/{language}/captions.vtt" + ) + + ad_gcs_uri = await upload_vtt_to_gcs( + translated_ad, + f"{job_id}/{language}/ad.vtt" + ) + + # Store language outputs + updated_outputs[language] = { + "captions_vtt_gcs": captions_gcs_uri, + "ad_vtt_gcs": ad_gcs_uri, + "origin": origin + } + + logger.info(f"Successfully processed VTT files for language: {language} (origin: {origin})") + + except Exception as e: + logger.error(f"Failed to process language {language}: {e}") + fallback_origin = "transcreate" if language in transcreation_languages else "gemini_translate" + updated_outputs[language] = { + "origin": fallback_origin, + "qa_notes": f"Translation failed: {str(e)}" + } finally: # Cleanup temporary video file if created