perf: parallelize video-native translations with asyncio.gather

Video-native translation mode now processes all target languages in parallel
using asyncio.gather() with a semaphore (max 3 concurrent) for rate limiting.
This significantly reduces total translation time when multiple languages
are selected.

- Add MAX_CONCURRENT_VIDEO_NATIVE constant for rate limiting
- Refactor video-native path to use parallel coroutines
- Keep traditional VTT translation mode sequential
- Handle per-language errors without stopping other translations

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
michael 2026-01-01 09:21:07 -06:00
parent d08c20914e
commit 77be93b526

View file

@ -20,6 +20,9 @@ from . import celery_app
logger = get_logger(__name__)
# Maximum concurrent video-native translations (Gemini API rate limiting)
MAX_CONCURRENT_VIDEO_NATIVE = 3
def broadcast_status_update(job_id: str, status: str, job_title: str = None, message: str = None, progress: int = None):
"""
@ -206,94 +209,146 @@ async def _async_translate_and_synthesize(job_id: str):
source_ad_vtt = ad_blob.download_as_text()
try:
for language in requested_languages:
if language == source_language:
continue # Skip source language as it's already processed
# Get target languages (exclude source)
target_languages = [lang for lang in requested_languages if lang != source_language]
logger.info(f"Processing language: {language} (from source: {source_language}, mode: {translation_mode})")
if translation_mode == "video_native":
# VIDEO NATIVE MODE: Process all languages in parallel with rate limiting
# This generates VTTs from scratch with visual context for each language
# Note: Transcreation is NOT applicable - video_native replaces it
try:
if translation_mode == "video_native":
# VIDEO NATIVE MODE: Re-process video with Gemini for target language
# This generates VTTs from scratch with visual context
# Note: Transcreation is NOT applicable - video_native replaces it
semaphore = asyncio.Semaphore(MAX_CONCURRENT_VIDEO_NATIVE)
async def extract_targeted():
return await gemini_service.extract_accessibility_targeted(
video_local_path,
language
async def translate_language_video_native(lang: str) -> tuple[str, str, str, str | None]:
"""Process a single language with video-native translation.
Returns: (language, captions_gcs_uri, ad_gcs_uri, error_message or None)
"""
async with semaphore:
logger.info(f"Starting video-native translation for {lang} (from source: {source_language})")
try:
async def extract_targeted():
return await gemini_service.extract_accessibility_targeted(
video_local_path,
lang
)
result = await retry_with_backoff(extract_targeted, max_retries=3)
translated_captions = result["captions_vtt"]
translated_ad = result["audio_description_vtt"]
# Upload translated VTT files
captions_gcs_uri = await upload_vtt_to_gcs(
translated_captions,
f"{job_id}/{lang}/captions.vtt"
)
result = await retry_with_backoff(extract_targeted, max_retries=3)
translated_captions = result["captions_vtt"]
translated_ad = result["audio_description_vtt"]
origin = "video_native"
elif language in transcreation_languages:
# TRADITIONAL MODE with transcreation: cultural adaptation
async def transcreate():
return await gemini_service.transcreate_content(
source_captions_vtt,
source_ad_vtt,
language,
brief="Standard accessibility content"
ad_gcs_uri = await upload_vtt_to_gcs(
translated_ad,
f"{job_id}/{lang}/ad.vtt"
)
result = await retry_with_backoff(transcreate, max_retries=3)
translated_captions = result["captions_vtt"]
translated_ad = result["audio_description_vtt"]
origin = "transcreate"
logger.info(f"Completed video-native translation for {lang}")
return (lang, captions_gcs_uri, ad_gcs_uri, None)
else:
# TRADITIONAL MODE: Use Gemini translation (6-36x cheaper than Google Translate API)
async def translate_captions():
return await gemini_service.translate_vtt(
source_captions_vtt, language, source_language=source_language
)
except Exception as e:
logger.error(f"Video-native translation failed for {lang}: {e}")
return (lang, None, None, str(e))
async def translate_ad():
return await gemini_service.translate_vtt(
source_ad_vtt, language, source_language=source_language
)
# Run all translations in parallel (limited by semaphore)
if target_languages:
logger.info(f"Starting parallel video-native translations for {len(target_languages)} languages: {target_languages}")
tasks = [translate_language_video_native(lang) for lang in target_languages]
results = await asyncio.gather(*tasks, return_exceptions=True)
translated_captions = await retry_with_backoff(translate_captions, max_retries=3)
translated_ad = await retry_with_backoff(translate_ad, max_retries=3)
origin = "gemini_translate"
# Process results
for i, result in enumerate(results):
lang = target_languages[i]
if isinstance(result, Exception):
# Unexpected exception from gather
logger.error(f"Unexpected error for {lang}: {result}")
updated_outputs[lang] = {
"origin": "video_native",
"qa_notes": f"Translation failed: {str(result)}"
}
else:
lang, captions_uri, ad_uri, error_msg = result
if error_msg:
updated_outputs[lang] = {
"origin": "video_native",
"qa_notes": f"Translation failed: {error_msg}"
}
else:
updated_outputs[lang] = {
"captions_vtt_gcs": captions_uri,
"ad_vtt_gcs": ad_uri,
"origin": "video_native"
}
logger.info(f"Successfully processed VTT files for language: {lang} (origin: video_native)")
# Upload translated VTT files
captions_gcs_uri = await upload_vtt_to_gcs(
translated_captions,
f"{job_id}/{language}/captions.vtt"
)
else:
# TRADITIONAL MODE: Process languages sequentially
for language in target_languages:
logger.info(f"Processing language: {language} (from source: {source_language}, mode: {translation_mode})")
ad_gcs_uri = await upload_vtt_to_gcs(
translated_ad,
f"{job_id}/{language}/ad.vtt"
)
try:
if language in transcreation_languages:
# TRADITIONAL MODE with transcreation: cultural adaptation
async def transcreate():
return await gemini_service.transcreate_content(
source_captions_vtt,
source_ad_vtt,
language,
brief="Standard accessibility content"
)
# Store language outputs
updated_outputs[language] = {
"captions_vtt_gcs": captions_gcs_uri,
"ad_vtt_gcs": ad_gcs_uri,
"origin": origin
}
result = await retry_with_backoff(transcreate, max_retries=3)
translated_captions = result["captions_vtt"]
translated_ad = result["audio_description_vtt"]
origin = "transcreate"
logger.info(f"Successfully processed VTT files for language: {language} (origin: {origin})")
else:
# TRADITIONAL MODE: Use Gemini translation (6-36x cheaper than Google Translate API)
async def translate_captions():
return await gemini_service.translate_vtt(
source_captions_vtt, language, source_language=source_language
)
except Exception as e:
logger.error(f"Failed to process language {language}: {e}")
# Determine origin based on mode
if translation_mode == "video_native":
fallback_origin = "video_native"
elif language in transcreation_languages:
fallback_origin = "transcreate"
else:
fallback_origin = "gemini_translate"
async def translate_ad():
return await gemini_service.translate_vtt(
source_ad_vtt, language, source_language=source_language
)
updated_outputs[language] = {
"origin": fallback_origin,
"qa_notes": f"Translation failed: {str(e)}"
}
translated_captions = await retry_with_backoff(translate_captions, max_retries=3)
translated_ad = await retry_with_backoff(translate_ad, max_retries=3)
origin = "gemini_translate"
# Upload translated VTT files
captions_gcs_uri = await upload_vtt_to_gcs(
translated_captions,
f"{job_id}/{language}/captions.vtt"
)
ad_gcs_uri = await upload_vtt_to_gcs(
translated_ad,
f"{job_id}/{language}/ad.vtt"
)
# Store language outputs
updated_outputs[language] = {
"captions_vtt_gcs": captions_gcs_uri,
"ad_vtt_gcs": ad_gcs_uri,
"origin": origin
}
logger.info(f"Successfully processed VTT files for language: {language} (origin: {origin})")
except Exception as e:
logger.error(f"Failed to process language {language}: {e}")
fallback_origin = "transcreate" if language in transcreation_languages else "gemini_translate"
updated_outputs[language] = {
"origin": fallback_origin,
"qa_notes": f"Translation failed: {str(e)}"
}
finally:
# Cleanup temporary video file if created