From fddf803b747f122cefcc73245f8b130af71fb627 Mon Sep 17 00:00:00 2001 From: Vadym Samoilenko Date: Wed, 6 May 2026 12:11:35 +0100 Subject: [PATCH] feat(translation): enforce EN-first pipeline with cue-preserving translations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit All translations now derive strictly from the approved English master VTT, eliminating the cue-count and timestamp drift reported by linguists (e.g. PL AD = 11 cues vs EN AD = 17 cues). Key changes: - Remove video_native translation mode entirely; all languages go through translate_vtt() which guarantees 1:1 cue alignment with EN master - Transcreation languages now use translate_vtt(style="transcreate") — same cue-preserving contract, culturally-adapted instructions - Post-translation cue alignment validator added (VTTEditor.assert_cue_alignment) - After ingestion, job moves to PENDING_QC (EN-only) instead of TRANSLATING; translation pipeline dispatches automatically when EN QC is approved - New POST /jobs/{id}/retranslate-language endpoint for PM/admin to fix legacy video_native jobs on demand - Frontend: origin badge (EN-aligned / transcreated / video-native warning), EN-first gate banner on target-language cards, Re-translate from EN button with confirm modal, removed translation mode selector from NewJob Co-Authored-By: Claude Sonnet 4.6 --- backend/app/api/v1/routes_jobs.py | 77 ++++ backend/app/lib/vtt.py | 16 + backend/app/models/job.py | 2 +- backend/app/schemas/job.py | 5 + backend/app/services/cloud_run_dispatch.py | 5 +- backend/app/services/gemini.py | 14 +- backend/app/services/language_qc.py | 35 ++ backend/app/tasks/ingest_and_ai.py | 21 +- backend/app/tasks/translate_and_synthesize.py | 353 ++++-------------- frontend/src/lib/api.ts | 5 + frontend/src/routes/admin/QCDetail.tsx | 95 ++++- frontend/src/routes/jobs/NewJob.tsx | 57 +-- frontend/src/types/api.ts | 4 +- 13 files changed, 347 insertions(+), 342 deletions(-) diff --git a/backend/app/api/v1/routes_jobs.py b/backend/app/api/v1/routes_jobs.py index 96d3f54..55df8b5 100644 --- a/backend/app/api/v1/routes_jobs.py +++ b/backend/app/api/v1/routes_jobs.py @@ -58,6 +58,7 @@ from ...schemas.job import ( JobUpdateRequest, PromoteToQCRequest, RejectJobRequest, + RetranslateLanguageRequest, ReturnToQCRequest, UpdateTTSPreferencesRequest, UploadCompleteRequest, @@ -1869,6 +1870,82 @@ async def _trigger_retranslation(job_id: str, job_doc: dict, db, current_user) - translate_and_synthesize_task.delay(job_id, languages=target_languages, retranslate=True) +@router.post("/{job_id}/retranslate-language", response_model=JobResponse) +async def retranslate_language( + job_id: str, + request: RetranslateLanguageRequest, + http_request: Request = None, + current_user: User = Depends(require_roles(UserRole.PRODUCTION, UserRole.ADMIN)), + ctx: MembershipContext = Depends(get_membership_context), + db: AsyncIOMotorDatabase = Depends(get_database), +): + """Re-translate a single target language from the approved EN master VTT. + Used by PM/admin to fix drift on existing jobs (e.g. legacy video_native translations). + EN must be approved before retranslation can be triggered. + """ + job_doc = await get_job_or_403(job_id, ctx, db) + source_language = job_doc["source"].get("language", "en") + lang = request.language + + if lang == source_language: + raise HTTPException(status_code=400, detail="Cannot retranslate the source language") + + requested_langs = job_doc.get("requested_outputs", {}).get("languages", []) + if lang not in requested_langs: + raise HTTPException(status_code=400, detail=f"Language '{lang}' is not in the job's requested languages") + + # Enforce EN-first gate + source_qc = (job_doc.get("language_qc") or {}).get(source_language, {}) + if source_qc.get("status") != LanguageQCStatus.APPROVED.value: + raise HTTPException( + status_code=409, + detail=f"Source language '{source_language}' must be approved before retranslation" + ) + + now = datetime.utcnow() + await db.jobs.update_one( + {"_id": job_id}, + { + "$set": { + "status": JobStatus.TRANSLATING.value, + f"language_qc.{lang}.status": LanguageQCStatus.PENDING.value, + f"language_qc.{lang}.approved_by": None, + f"language_qc.{lang}.approved_at": None, + "updated_at": now, + }, + "$push": { + "review.history": { + "at": now, + "status": JobStatus.TRANSLATING.value, + "by": str(current_user.id), + "notes": f"Retranslate '{lang}' from EN master. Reason: {request.reason or 'not provided'}", + } + }, + }, + ) + + await log_job_action( + AuditAction.VTT_RETRANSLATE, job_id, current_user, http_request, + details={"language": lang, "reason": request.reason}, + ) + + from ...tasks.translate_and_synthesize import translate_and_synthesize_task + translate_and_synthesize_task.delay(job_id, languages=[lang], retranslate=True) + + result = await db.jobs.find_one({"_id": job_id}) + return JobResponse( + id=str(result["_id"]), + title=result["title"], + status=result["status"], + source=result["source"], + requested_outputs=RequestedOutputs(**result["requested_outputs"]), + review=result.get("review", {"notes": "", "history": []}), + outputs=result.get("outputs"), + created_at=result["created_at"].isoformat(), + updated_at=result["updated_at"].isoformat(), + ) + + @router.post("/{job_id}/vtt/adjust-timing", response_model=JobResponse) async def adjust_vtt_timing( job_id: str, diff --git a/backend/app/lib/vtt.py b/backend/app/lib/vtt.py index 89f58bd..a9c771e 100644 --- a/backend/app/lib/vtt.py +++ b/backend/app/lib/vtt.py @@ -147,6 +147,22 @@ class VTTEditor: return VTTParser.build(cues) + @staticmethod + def assert_cue_alignment(en_vtt: str, target_vtt: str, lang: str) -> None: + """Raise ValueError if target VTT cue count or timestamps diverge from EN master.""" + en_cues = VTTParser.parse(en_vtt) + tgt_cues = VTTParser.parse(target_vtt) + if len(tgt_cues) != len(en_cues): + raise ValueError( + f"Cue count mismatch for {lang}: EN has {len(en_cues)}, target has {len(tgt_cues)}" + ) + for i, (en, tgt) in enumerate(zip(en_cues, tgt_cues)): + if en.start != tgt.start or en.end != tgt.end: + raise ValueError( + f"Timestamp mismatch for {lang} cue {i}: " + f"EN {en.start}-->{en.end}, target {tgt.start}-->{tgt.end}" + ) + @staticmethod def update_cue_text(vtt_content: str, cue_index: int, new_text: str) -> str: """Update text for a specific cue by index""" diff --git a/backend/app/models/job.py b/backend/app/models/job.py index 2bbf167..692dc12 100644 --- a/backend/app/models/job.py +++ b/backend/app/models/job.py @@ -80,7 +80,7 @@ class RequestedOutputs(BaseModel): languages: list[str] = [] transcreation: list[str] = [] tts_preferences: TTSPreferences | None = None - translation_mode: Literal["traditional", "video_native"] = "video_native" + translation_mode: Literal["traditional", "video_native"] = "traditional" class PausePointData(BaseModel): diff --git a/backend/app/schemas/job.py b/backend/app/schemas/job.py index 7c13fc2..af65a8e 100644 --- a/backend/app/schemas/job.py +++ b/backend/app/schemas/job.py @@ -192,3 +192,8 @@ class UploadCompleteRequest(BaseModel): deadline: str | None = None initial_linguist_id: str | None = None initial_reviewer_id: str | None = None + + +class RetranslateLanguageRequest(BaseModel): + language: str + reason: str | None = None diff --git a/backend/app/services/cloud_run_dispatch.py b/backend/app/services/cloud_run_dispatch.py index a9f9bc2..a7a4436 100644 --- a/backend/app/services/cloud_run_dispatch.py +++ b/backend/app/services/cloud_run_dispatch.py @@ -80,7 +80,10 @@ def _celery_fallback(task: str, job_id: str, **extra_args) -> str: ingest_and_ai_task.delay(job_id) elif task == "translate": from ..tasks.translate_and_synthesize import translate_and_synthesize_task - translate_and_synthesize_task.delay(job_id) + _langs = extra_args.get("languages") + if isinstance(_langs, str): + _langs = [l for l in _langs.split(",") if l] + translate_and_synthesize_task.delay(job_id, languages=_langs or None) elif task == "render": from ..tasks.render_accessible_video import render_accessible_video_task render_accessible_video_task.delay(job_id, extra_args.get("language", "en")) diff --git a/backend/app/services/gemini.py b/backend/app/services/gemini.py index 20aaa10..a17f74b 100644 --- a/backend/app/services/gemini.py +++ b/backend/app/services/gemini.py @@ -830,6 +830,7 @@ JSON: target_language: str, source_language: str = "en", glossary_block: str | None = None, + style: str = "literal", _cost_ctx: dict | None = None, ) -> str: """ @@ -839,7 +840,8 @@ JSON: 1. Send only the text cues (no timestamps) to Gemini as a numbered list 2. Apply translated texts back onto the original VTT using translate_preserving_timing() - This avoids any possibility of Gemini drifting or altering timestamps. + style="literal" — direct translation preserving meaning exactly + style="transcreate" — culturally adapted but still returns EXACTLY N cues 1:1 """ from ..lib.vtt import VTTEditor, VTTParser @@ -850,6 +852,13 @@ JSON: cue_count = len(source_cues) + _style_instruction = ( + "- Culturally adapt the text for {tgt} audiences (brand voice, natural phrasing), " + "while keeping accessibility intent and line length (~32–40 chars)\n" + if style == "transcreate" + else "" + ) + async def _attempt_translation(extra_instruction: str = "") -> list[str]: numbered_texts = "\n".join( f"{i + 1}. {cue.text.replace(chr(10), ' ')}" @@ -859,13 +868,14 @@ JSON: _tgt_label = locale_lib.get_gemini_label(target_language) _glossary_section = self._build_glossary_block(glossary_block) _glossary_line = f"\n\n{_glossary_section}" if _glossary_section else "" + _adapt_line = _style_instruction.format(tgt=_tgt_label) if style == "transcreate" else "" prompt = f"""Translate the following {cue_count} numbered text segments from {_src_label} to {_tgt_label}. REQUIREMENTS: - Return EXACTLY {cue_count} numbered lines, one translation per line - Format: "1. translated text", "2. translated text", etc. - Preserve speaker labels like [Speaker 1]: unchanged -- Use natural, idiomatic {_tgt_label} +- {_adapt_line}Use natural, idiomatic {_tgt_label} - Do NOT add any explanation, preamble, or extra lines{extra_instruction}{_glossary_line} Segments to translate: diff --git a/backend/app/services/language_qc.py b/backend/app/services/language_qc.py index 6d0e2dd..602e059 100644 --- a/backend/app/services/language_qc.py +++ b/backend/app/services/language_qc.py @@ -692,6 +692,41 @@ async def approve_language( logger.exception("Failed to send approval emails") refreshed = await db[_JOBS].find_one({"_id": job_id}) + + # When the source language is approved, dispatch translation for any target + # languages that don't have VTTs yet (EN-first gate). + source_lang = (refreshed.get("source") or {}).get("language", "en") + if lang == source_lang: + target_langs = [lg for lg in _job_languages(refreshed) if lg != source_lang] + if target_langs: + outputs = refreshed.get("outputs") or {} + untranslated = [lg for lg in target_langs if not (outputs.get(lg) or {}).get("captions_vtt_gcs")] + if untranslated: + try: + from ..services.cloud_run_dispatch import dispatch as _cr_dispatch + await db[_JOBS].update_one( + {"_id": job_id}, + { + "$set": { + "status": JobStatus.TRANSLATING.value, + "updated_at": datetime.utcnow(), + }, + "$push": { + "review.history": { + "at": datetime.utcnow(), + "status": JobStatus.TRANSLATING.value, + "by": "system", + "notes": f"EN approved — dispatching translation for {untranslated}", + } + }, + }, + ) + await _cr_dispatch("translate", job_id, languages=untranslated) + logger.info(f"Job {job_id}: EN approved, dispatched translation for {untranslated}") + return LanguageQCState(**updated_state) + except Exception as exc: + logger.error(f"Job {job_id}: failed to dispatch translation after EN approval: {exc}") + await _maybe_advance_job(db, refreshed) return LanguageQCState(**updated_state) diff --git a/backend/app/tasks/ingest_and_ai.py b/backend/app/tasks/ingest_and_ai.py index 1a78715..b31c33b 100644 --- a/backend/app/tasks/ingest_and_ai.py +++ b/backend/app/tasks/ingest_and_ai.py @@ -249,14 +249,14 @@ async def ingest_and_ai_task_impl(job_id: str): if transcript_gcs_uri: source_lang_output["descriptive_transcript_gcs"] = transcript_gcs_uri - # Update job with AI results, detected language, and outputs - # Set status to TRANSLATING to trigger translation pipeline before QC + # Update job with AI results and move to QC for EN approval. + # Translation pipeline is triggered only after EN QC is approved. await db.jobs.update_one( {"_id": job_id}, { "$set": { - "status": JobStatus.TRANSLATING.value, - "source.language": source_language, # Update with detected language + "status": JobStatus.PENDING_QC.value, + "source.language": source_language, "source.detected_language": detected_language, "ai.ingestion_json": ai_result, "ai.confidence": ai_result["confidence"], @@ -266,26 +266,21 @@ async def ingest_and_ai_task_impl(job_id: str): "$push": { "review.history": { "at": datetime.utcnow(), - "status": JobStatus.TRANSLATING.value, + "status": JobStatus.PENDING_QC.value, "by": "system" } } } ) - # Broadcast status update broadcast_status_update( job_id, - JobStatus.TRANSLATING.value, + JobStatus.PENDING_QC.value, job_title=job_title, - message=f"{job_title} AI processing complete, starting translation pipeline" + message=f"{job_title} AI processing complete — awaiting EN approval before translation" ) - logger.info(f"AI processing complete for job {job_id}, triggering translation pipeline") - - # Trigger translation and synthesis pipeline via Cloud Run - from ..services.cloud_run_dispatch import dispatch as _cr_dispatch - await _cr_dispatch("translate", job_id) + logger.info(f"AI processing complete for job {job_id}, moved to PENDING_QC for EN review") finally: # Clean up temp file diff --git a/backend/app/tasks/translate_and_synthesize.py b/backend/app/tasks/translate_and_synthesize.py index ec0cbb5..abe6f80 100644 --- a/backend/app/tasks/translate_and_synthesize.py +++ b/backend/app/tasks/translate_and_synthesize.py @@ -1,7 +1,5 @@ import asyncio -import os import random -import tempfile from datetime import datetime from typing import Any @@ -20,9 +18,6 @@ from ._websocket_bridge import broadcast_status_update logger = get_logger(__name__) -# Maximum concurrent video-native translations (Gemini API rate limiting) -MAX_CONCURRENT_VIDEO_NATIVE = 3 - async def retry_with_backoff(func, max_retries=3, base_delay=1): """Retry a function with exponential backoff""" @@ -166,305 +161,121 @@ async def _async_translate_and_synthesize(job_id: str, languages: list[str] | No } ) - # Get translation mode (default to "traditional" for backwards compatibility) - translation_mode = job_doc["requested_outputs"].get("translation_mode", "traditional") - # Glossary: lazy-loaded per target language during the loop from ..services.glossary_service import get_glossary_block_for_job - logger.info(f"Translation mode for job {job_id}: {translation_mode}") sdh_requested = job_doc["requested_outputs"].get("sdh_vtt", False) - # Get source language VTT content (needed for traditional mode) + # Source language VTT is the EN master for all translations source_outputs = job_doc["outputs"].get(source_language) if not source_outputs: raise ValueError(f"No outputs found for source language {source_language}") - # Process each requested language (filtered to specific list when retranslating) + captions_blob_path = source_outputs["captions_vtt_gcs"].replace(f"gs://{settings.gcs_bucket}/", "") + ad_blob_path = source_outputs["ad_vtt_gcs"].replace(f"gs://{settings.gcs_bucket}/", "") + source_captions_vtt = gcs_service.bucket.blob(captions_blob_path).download_as_text() + source_ad_vtt = gcs_service.bucket.blob(ad_blob_path).download_as_text() + + source_sdh_vtt = None + if sdh_requested and source_outputs.get("sdh_captions_vtt_gcs"): + sdh_blob_path = source_outputs["sdh_captions_vtt_gcs"].replace(f"gs://{settings.gcs_bucket}/", "") + source_sdh_vtt = gcs_service.bucket.blob(sdh_blob_path).download_as_text() + + # Process each requested language (filtered when retranslating a subset) requested_languages = job_doc["requested_outputs"]["languages"] if languages is not None: - requested_languages = [l for l in requested_languages if l in languages] + requested_languages = [lg for lg in requested_languages if lg in languages] transcreation_languages = job_doc["requested_outputs"]["transcreation"] updated_outputs = job_doc.get("outputs", {}) - - # For video_native mode, download source video once before the loop - video_local_path = None - source_captions_vtt = None - source_ad_vtt = None - - if translation_mode == "video_native": - # Download source video from GCS for re-processing - source_gcs_uri = job_doc["source"]["gcs_uri"] - source_blob_path = source_gcs_uri.replace(f"gs://{settings.gcs_bucket}/", "") - source_blob = gcs_service.bucket.blob(source_blob_path) - - # Create temp file for video - with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp_file: - source_blob.download_to_filename(tmp_file.name) - video_local_path = tmp_file.name - logger.info(f"Downloaded source video for video_native processing: {video_local_path}") - else: - # Traditional mode: download source VTT files - captions_blob_path = source_outputs["captions_vtt_gcs"].replace(f"gs://{settings.gcs_bucket}/", "") - ad_blob_path = source_outputs["ad_vtt_gcs"].replace(f"gs://{settings.gcs_bucket}/", "") - - captions_blob = gcs_service.bucket.blob(captions_blob_path) - ad_blob = gcs_service.bucket.blob(ad_blob_path) - - source_captions_vtt = captions_blob.download_as_text() - source_ad_vtt = ad_blob.download_as_text() - - # Download source SDH VTT for traditional-mode translation - source_sdh_vtt = None - if sdh_requested and source_outputs.get("sdh_captions_vtt_gcs"): - sdh_blob_path = source_outputs["sdh_captions_vtt_gcs"].replace(f"gs://{settings.gcs_bucket}/", "") - source_sdh_vtt = gcs_service.bucket.blob(sdh_blob_path).download_as_text() + _source_text_for_glossary = " ".join(filter(None, [source_captions_vtt, source_ad_vtt])) try: - # Get target languages (exclude source) target_languages = [lang for lang in requested_languages if lang != source_language] - if translation_mode == "video_native": - # VIDEO NATIVE MODE: Process all languages in parallel with rate limiting - # This generates VTTs from scratch with visual context for each language - # Note: Transcreation is NOT applicable - video_native replaces it + for language in target_languages: + _style = "transcreate" if language in transcreation_languages else "literal" + logger.info(f"Processing language: {language} (source: {source_language}, style: {_style})") - semaphore = asyncio.Semaphore(MAX_CONCURRENT_VIDEO_NATIVE) + await cost_tracker.aio_preflight( + model=gemini_service.model_name, + user_external_id=_cost_ctx["user_id"], + project_id=_cost_ctx["project_id"], + ) - job_brand_context = job_doc.get("brand_context") + _job_for_glossary = {**job_doc, "_glossary_source_text": _source_text_for_glossary} + _glossary = await get_glossary_block_for_job(_job_for_glossary, language, db) - async def translate_language_video_native(lang: str) -> tuple[str, str, str, str | None]: - """Process a single language with video-native translation. - Returns: (language, captions_gcs_uri, ad_gcs_uri, error_message or None) - """ - async with semaphore: - logger.info(f"Starting video-native translation for {lang} (from source: {source_language})") - try: - await cost_tracker.aio_preflight( - model=gemini_service.model_name, - user_external_id=_cost_ctx["user_id"], - project_id=_cost_ctx["project_id"], - ) + try: + async def translate_captions(_lang=language, _gloss=_glossary, _s=_style): + return await gemini_service.translate_vtt( + source_captions_vtt, _lang, source_language=source_language, + glossary_block=_gloss, style=_s, _cost_ctx=_cost_ctx, + ) - # Build glossary block from source VTT for this language - _job_for_glossary = {**job_doc, "_glossary_source_text": ""} - _glossary = await get_glossary_block_for_job(_job_for_glossary, lang, db) + async def translate_ad(_lang=language, _gloss=_glossary, _s=_style): + return await gemini_service.translate_vtt( + source_ad_vtt, _lang, source_language=source_language, + glossary_block=_gloss, style=_s, _cost_ctx=_cost_ctx, + ) - async def extract_targeted(): - return await gemini_service.extract_accessibility_targeted( - video_local_path, - lang, - brand_context=job_brand_context, - sdh_requested=sdh_requested, - glossary_block=_glossary, - _cost_ctx=_cost_ctx, - ) + translated_captions = await retry_with_backoff(translate_captions, max_retries=3) + translated_ad = await retry_with_backoff(translate_ad, max_retries=3) - result = await retry_with_backoff(extract_targeted, max_retries=3) - translated_captions = result["captions_vtt"] - translated_ad = result["audio_description_vtt"] + # Validate cue alignment against EN master + from ..lib.vtt import VTTEditor + VTTEditor.assert_cue_alignment(source_captions_vtt, translated_captions, language) + VTTEditor.assert_cue_alignment(source_ad_vtt, translated_ad, language) - # Upload translated VTT files - captions_gcs_uri = await upload_vtt_to_gcs( - translated_captions, - gcs_path(job_doc, lang, "captions.vtt") - ) - - ad_gcs_uri = await upload_vtt_to_gcs( - translated_ad, - gcs_path(job_doc, lang, "ad.vtt") - ) - - # Upload SDH VTT if generated - sdh_gcs_uri = None - if sdh_requested and result.get("sdh_captions_vtt"): - sdh_gcs_uri = await upload_vtt_to_gcs( - result["sdh_captions_vtt"], - gcs_path(job_doc, lang, "sdh_captions.vtt") - ) - - # Generate descriptive transcript (WCAG 2.1 1.2.1) - transcript_gcs_uri = None - try: - from ..services.descriptive_transcript import ( - generate_descriptive_transcript, - ) - transcript_text = generate_descriptive_transcript(translated_captions, translated_ad) - if transcript_text: - transcript_gcs_uri = await upload_vtt_to_gcs( - transcript_text, - gcs_path(job_doc, lang, "descriptive_transcript.txt") - ) - except Exception as transcript_err: - logger.warning(f"Failed to generate descriptive transcript for {lang}: {transcript_err}") - - logger.info(f"Completed video-native translation for {lang}") - return (lang, captions_gcs_uri, ad_gcs_uri, sdh_gcs_uri, transcript_gcs_uri, None) - - except Exception as e: - logger.error(f"Video-native translation failed for {lang}: {e}") - return (lang, None, None, None, None, str(e)) - - # Run all translations in parallel (limited by semaphore) - if target_languages: - logger.info(f"Starting parallel video-native translations for {len(target_languages)} languages: {target_languages}") - tasks = [translate_language_video_native(lang) for lang in target_languages] - results = await asyncio.gather(*tasks, return_exceptions=True) - - # Process results - for i, result in enumerate(results): - lang = target_languages[i] - if isinstance(result, Exception): - # Unexpected exception from gather - logger.error(f"Unexpected error for {lang}: {result}") - updated_outputs[lang] = { - "origin": "video_native", - "qa_notes": f"Translation failed: {str(result)}" - } - else: - lang, captions_uri, ad_uri, sdh_uri, transcript_uri, error_msg = result - if error_msg: - updated_outputs[lang] = { - "origin": "video_native", - "qa_notes": f"Translation failed: {error_msg}" - } - else: - lang_out = { - "captions_vtt_gcs": captions_uri, - "ad_vtt_gcs": ad_uri, - "origin": "video_native" - } - if sdh_uri: - lang_out["sdh_captions_vtt_gcs"] = sdh_uri - if transcript_uri: - lang_out["descriptive_transcript_gcs"] = transcript_uri - updated_outputs[lang] = lang_out - logger.info(f"Successfully processed VTT files for language: {lang} (origin: video_native)") - - else: - # Combine source VTTs for glossary term matching - _source_text_for_glossary = " ".join(filter(None, [source_captions_vtt, source_ad_vtt])) - - # TRADITIONAL MODE: Process languages sequentially - for language in target_languages: - logger.info(f"Processing language: {language} (from source: {source_language}, mode: {translation_mode})") - - await cost_tracker.aio_preflight( - model=gemini_service.model_name, - user_external_id=_cost_ctx["user_id"], - project_id=_cost_ctx["project_id"], + captions_gcs_uri = await upload_vtt_to_gcs( + translated_captions, gcs_path(job_doc, language, "captions.vtt") + ) + ad_gcs_uri = await upload_vtt_to_gcs( + translated_ad, gcs_path(job_doc, language, "ad.vtt") ) - # Lookup glossary terms for this target language - _job_for_glossary = {**job_doc, "_glossary_source_text": _source_text_for_glossary} - _glossary = await get_glossary_block_for_job(_job_for_glossary, language, db) + origin = "transcreate" if _style == "transcreate" else "gemini_translate" + lang_out: dict = { + "captions_vtt_gcs": captions_gcs_uri, + "ad_vtt_gcs": ad_gcs_uri, + "origin": origin, + } + + if sdh_requested and source_sdh_vtt: + async def translate_sdh(_lang=language, _gloss=_glossary, _s=_style): + return await gemini_service.translate_vtt( + source_sdh_vtt, _lang, source_language=source_language, + glossary_block=_gloss, style=_s, _cost_ctx=_cost_ctx, + ) + translated_sdh = await retry_with_backoff(translate_sdh, max_retries=3) + sdh_gcs_uri = await upload_vtt_to_gcs( + translated_sdh, gcs_path(job_doc, language, "sdh_captions.vtt") + ) + lang_out["sdh_captions_vtt_gcs"] = sdh_gcs_uri try: - if language in transcreation_languages: - # TRADITIONAL MODE with transcreation: cultural adaptation - async def transcreate(_lang=language, _gloss=_glossary): - return await gemini_service.transcreate_content( - source_captions_vtt, - source_ad_vtt, - _lang, - brief="Standard accessibility content", - glossary_block=_gloss, - _cost_ctx=_cost_ctx, - ) - - result = await retry_with_backoff(transcreate, max_retries=3) - translated_captions = result["captions_vtt"] - translated_ad = result["audio_description_vtt"] - origin = "transcreate" - - else: - # TRADITIONAL MODE: Use Gemini translation (6-36x cheaper than Google Translate API) - async def translate_captions(_lang=language, _gloss=_glossary): - return await gemini_service.translate_vtt( - source_captions_vtt, _lang, source_language=source_language, - glossary_block=_gloss, - _cost_ctx=_cost_ctx, - ) - - async def translate_ad(_lang=language, _gloss=_glossary): - return await gemini_service.translate_vtt( - source_ad_vtt, _lang, source_language=source_language, - glossary_block=_gloss, - _cost_ctx=_cost_ctx, - ) - - translated_captions = await retry_with_backoff(translate_captions, max_retries=3) - translated_ad = await retry_with_backoff(translate_ad, max_retries=3) - origin = "gemini_translate" - - # Upload translated VTT files - captions_gcs_uri = await upload_vtt_to_gcs( - translated_captions, - gcs_path(job_doc, language, "captions.vtt") - ) - - ad_gcs_uri = await upload_vtt_to_gcs( - translated_ad, - gcs_path(job_doc, language, "ad.vtt") - ) - - # Translate and upload SDH VTT if requested - lang_out: dict = { - "captions_vtt_gcs": captions_gcs_uri, - "ad_vtt_gcs": ad_gcs_uri, - "origin": origin - } - if sdh_requested and source_sdh_vtt: - async def translate_sdh(_lang=language, _gloss=_glossary): - return await gemini_service.translate_vtt( - source_sdh_vtt, _lang, source_language=source_language, - glossary_block=_gloss, - _cost_ctx=_cost_ctx, - ) - translated_sdh = await retry_with_backoff(translate_sdh, max_retries=3) - sdh_gcs_uri = await upload_vtt_to_gcs( - translated_sdh, - gcs_path(job_doc, language, "sdh_captions.vtt") + from ..services.descriptive_transcript import generate_descriptive_transcript + transcript_text = generate_descriptive_transcript(translated_captions, translated_ad) + if transcript_text: + transcript_gcs_uri = await upload_vtt_to_gcs( + transcript_text, gcs_path(job_doc, language, "descriptive_transcript.txt") ) - lang_out["sdh_captions_vtt_gcs"] = sdh_gcs_uri + lang_out["descriptive_transcript_gcs"] = transcript_gcs_uri + except Exception as transcript_err: + logger.warning(f"Failed to generate descriptive transcript for {language}: {transcript_err}") - # Generate descriptive transcript (WCAG 2.1 1.2.1) - try: - from ..services.descriptive_transcript import ( - generate_descriptive_transcript, - ) - transcript_text = generate_descriptive_transcript(translated_captions, translated_ad) - if transcript_text: - transcript_gcs_uri = await upload_vtt_to_gcs( - transcript_text, - gcs_path(job_doc, language, "descriptive_transcript.txt") - ) - lang_out["descriptive_transcript_gcs"] = transcript_gcs_uri - except Exception as transcript_err: - logger.warning(f"Failed to generate descriptive transcript for {language}: {transcript_err}") + updated_outputs[language] = lang_out + logger.info(f"Processed language: {language} (origin: {origin})") - # Store language outputs - updated_outputs[language] = lang_out - - logger.info(f"Successfully processed VTT files for language: {language} (origin: {origin})") - - except Exception as e: - logger.error(f"Failed to process language {language}: {e}") - fallback_origin = "transcreate" if language in transcreation_languages else "gemini_translate" - updated_outputs[language] = { - "origin": fallback_origin, - "qa_notes": f"Translation failed: {str(e)}" - } + except Exception as e: + logger.error(f"Failed to process language {language}: {e}") + updated_outputs[language] = { + "origin": "transcreate" if _style == "transcreate" else "gemini_translate", + "qa_notes": f"Translation failed: {str(e)}", + } finally: - # Cleanup temporary video file if created - if video_local_path: - try: - os.unlink(video_local_path) - logger.info(f"Cleaned up temporary video file: {video_local_path}") - except Exception as e: - logger.warning(f"Failed to cleanup temp video file: {e}") + pass # Update status to TTS generating await db.jobs.update_one( @@ -615,7 +426,7 @@ async def _async_translate_and_synthesize(job_id: str, languages: list[str] | No # Broadcast failure status broadcast_status_update( job_id, - failure_status, + JobStatus.PROCESSING_FAILED.value, job_title=job_title, message=f"Processing failed: {str(e)[:100]}" ) diff --git a/frontend/src/lib/api.ts b/frontend/src/lib/api.ts index f8123d2..9aa5045 100644 --- a/frontend/src/lib/api.ts +++ b/frontend/src/lib/api.ts @@ -302,6 +302,11 @@ class ApiClient { return response.data; } + async retranslateLanguage(id: string, language: string, reason?: string): Promise { + const response = await this.client.post(`/jobs/${id}/retranslate-language`, { language, reason }); + return response.data; + } + async validateJobAssets(id: string): Promise { const response = await this.client.get(`/jobs/${id}/validate`); return response.data; diff --git a/frontend/src/routes/admin/QCDetail.tsx b/frontend/src/routes/admin/QCDetail.tsx index f09edff..e201f90 100644 --- a/frontend/src/routes/admin/QCDetail.tsx +++ b/frontend/src/routes/admin/QCDetail.tsx @@ -53,8 +53,11 @@ export function QCDetail() { // Get source language from job (default to 'en' for backwards compatibility) const sourceLanguage = job?.source?.language || 'en'; - // Get all available languages from outputs (after workflow change, translations happen before QC) - const availableLanguages = job?.outputs ? Object.keys(job.outputs) : [sourceLanguage]; + // All languages for QC: source always first, then requested targets + const availableLanguages = [ + sourceLanguage, + ...((job?.requested_outputs?.languages ?? []).filter(l => l !== sourceLanguage)), + ].filter((l, i, a) => a.indexOf(l) === i); // Language selection for QC review const [selectedLanguage, setSelectedLanguage] = useState(sourceLanguage); @@ -115,6 +118,7 @@ export function QCDetail() { refetchInterval: 15_000, }); const langQcMap = langQcData?.language_qc ?? {}; + const isSourceApproved = (langQcMap[sourceLanguage]?.status ?? 'pending') === 'approved'; // Glossary terms for inline highlighting const clientId = job?.client_id; @@ -155,6 +159,11 @@ export function QCDetail() { const [openCommentLang, setOpenCommentLang] = useState(null); const [commentDraft, setCommentDraft] = useState(''); + // Per-language retranslate modal + const [retranslateLang, setRetranslateLang] = useState(null); + const [retranslateReason, setRetranslateReason] = useState(''); + const [retranslateLoading, setRetranslateLoading] = useState(false); + const canAssign = authUser?.role === 'project_manager' || authUser?.role === 'production' || authUser?.role === 'admin'; const canApproveAll = authUser?.role === 'production' || authUser?.role === 'admin'; @@ -1080,13 +1089,24 @@ export function QCDetail() { onClick={() => setSelectedLanguage(lang)} className="w-full flex items-center justify-between px-4 py-3 text-left" > -
+
{LANG_QC_ICON[qcStatus]} {lang.toUpperCase()} {lang === sourceLanguage && (source)} {LANG_QC_LABEL[qcStatus]} + {lang !== sourceLanguage && (() => { + const origin = job?.outputs?.[lang]?.origin; + if (!origin) return null; + const cfg: Record = { + gemini_translate: { cls: 'bg-green-100 text-green-700', label: 'EN-aligned' }, + transcreate: { cls: 'bg-green-100 text-green-700', label: 'transcreated' }, + video_native: { cls: 'bg-red-100 text-red-700', label: '⚠ video-native' }, + }; + const c = cfg[origin] ?? { cls: 'bg-gray-100 text-gray-500', label: origin }; + return {c.label}; + })()}
{isActive ? '▲' : '▼'} @@ -1139,6 +1159,13 @@ export function QCDetail() {
+ {/* EN-first gate banner for target languages */} + {lang !== sourceLanguage && !isSourceApproved && ( +
+ ⏳ Waiting for {sourceLanguage.toUpperCase()} approval — translation will start automatically once the source language is approved. +
+ )} + {/* Workflow action buttons */}
{canStartWork && ( @@ -1209,6 +1236,16 @@ export function QCDetail() { Reopen )} + {canApproveAll && lang !== sourceLanguage && ( + + )}
{/* Comments toggle */} @@ -1485,6 +1522,58 @@ export function QCDetail() { )} + {/* Per-language retranslate confirmation modal */} + {retranslateLang && ( +
+
+

Re-translate {retranslateLang.toUpperCase()} from EN master?

+

+ This will discard the current {retranslateLang.toUpperCase()} translation, + reset linguist progress for this language, and regenerate it from the approved English master. +

+
+ +