diff --git a/backend/app/api/v1/routes_jobs.py b/backend/app/api/v1/routes_jobs.py index db01ba0..fc943bf 100644 --- a/backend/app/api/v1/routes_jobs.py +++ b/backend/app/api/v1/routes_jobs.py @@ -698,6 +698,27 @@ async def get_job_downloads( except Exception as e: logger.warning(f"Failed to generate signed URL for AD MP3 {language}: {e}") + # Accessible Video MP4 + if "accessible_video_gcs" in lang_output: + blob_path = lang_output["accessible_video_gcs"].replace(f"gs://{settings.gcs_bucket}/", "") + try: + signed_url = await get_signed_download_url(blob_path, 24) + lang_downloads["accessible_video_mp4"] = signed_url + # Include method info if available + if "accessible_video_method" in lang_output: + lang_downloads["accessible_video_method"] = lang_output["accessible_video_method"] + except Exception as e: + logger.warning(f"Failed to generate signed URL for accessible video {language}: {e}") + + # Re-timed Captions VTT (for pause-insert accessible videos) + if "retimed_captions_vtt_gcs" in lang_output: + blob_path = lang_output["retimed_captions_vtt_gcs"].replace(f"gs://{settings.gcs_bucket}/", "") + try: + signed_url = await get_signed_download_url(blob_path, 24) + lang_downloads["accessible_captions_vtt"] = signed_url + except Exception as e: + logger.warning(f"Failed to generate signed URL for retimed captions {language}: {e}") + if lang_downloads: downloads[language] = lang_downloads diff --git a/backend/app/models/job.py b/backend/app/models/job.py index 5020042..c6b6857 100644 --- a/backend/app/models/job.py +++ b/backend/app/models/job.py @@ -53,6 +53,7 @@ class RequestedOutputs(BaseModel): captions_vtt: bool = True audio_description_vtt: bool = True audio_description_mp3: bool = True + accessible_video_mp4: bool = False # Rendered video with embedded audio descriptions languages: list[str] = [] transcreation: list[str] = [] tts_preferences: Optional[TTSPreferences] = None @@ -62,6 +63,11 @@ class LangOutput(BaseModel): captions_vtt_gcs: Optional[str] = None ad_vtt_gcs: Optional[str] = None ad_mp3_gcs: Optional[str] = None + # Accessible video outputs + accessible_video_gcs: Optional[str] = None # Rendered accessible MP4 + accessible_video_method: Optional[Literal["overlay", "pause_insert"]] = None + retimed_captions_vtt_gcs: Optional[str] = None # Re-timed captions for pause-insert method + ad_cues_gcs_prefix: Optional[str] = None # GCS path prefix for per-cue MP3 segments origin: Optional[Literal["translate", "transcreate"]] = None qa_notes: Optional[str] = None @@ -84,6 +90,15 @@ class AISection(BaseModel): confidence: Optional[float] = None +class AccessibleVideoProgressItem(BaseModel): + """Progress tracking for accessible video rendering per language.""" + status: Literal["pending", "rendering", "completed", "failed"] = "pending" + method: Optional[Literal["overlay", "pause_insert"]] = None + error_message: Optional[str] = None + started_at: Optional[datetime] = None + completed_at: Optional[datetime] = None + + class Job(BaseModel): id: Optional[str] = Field(None, alias="_id") client_id: str @@ -93,6 +108,7 @@ class Job(BaseModel): status: JobStatus = JobStatus.CREATED review: Review = Review() outputs: Optional[dict[str, LangOutput]] = None + accessible_video_progress: Optional[dict[str, AccessibleVideoProgressItem]] = None ai: Optional[AISection] = None error: Optional[dict[str, Any]] = None created_at: Optional[datetime] = None diff --git a/backend/app/prompts/gemini_accessible_video.md b/backend/app/prompts/gemini_accessible_video.md new file mode 100644 index 0000000..86c57af --- /dev/null +++ b/backend/app/prompts/gemini_accessible_video.md @@ -0,0 +1,160 @@ +SYSTEM: +You are an expert accessible media engineer specializing in audio description integration for video content. Your task is to analyze a video and determine the optimal method for integrating pre-written audio description content. Produce STRICT JSON only. + +USER: +You are given: +1. A video file +2. Audio description (AD) VTT content with timed cues +3. The actual TTS audio duration for each AD cue (in seconds) + +Your task is to analyze the video and determine the optimal method and precise placement for integrating the audio descriptions. + +METHODS: +1. **OVERLAY** - Use when video has minimal spoken dialogue or natural gaps + - Audio descriptions play over the original audio with ducking (reduced volume) + - Original video duration is preserved + - Best for: music-only videos, documentaries with narration gaps, videos with ambient sound + +2. **PAUSE_INSERT** - Use when video has significant spoken dialogue + - Video pauses (freeze-frame) during AD playback + - Original content is not obscured by AD + - Best for: dialogue-heavy content, interviews, instructional videos with continuous speech + +ANALYSIS STEPS: +1. Detect dialogue presence and density throughout the video +2. For each AD cue, determine if it can fit in existing audio gaps (for overlay) or needs pause insertion +3. If >30% of AD cues would significantly overlap dialogue, use PAUSE_INSERT +4. For PAUSE_INSERT: identify natural break points (scene transitions, sentence endings, breaths - NOT mid-word) +5. For OVERLAY: calculate duck timing windows that start slightly before AD and end slightly after + +INPUT DATA: + +AD VTT Content: +{AD_VTT_CONTENT} + +AD Cue Durations (in seconds, matching VTT cue order): +{AD_CUE_DURATIONS} + +OUTPUT FORMAT: +Return a JSON object with the following structure: + +```json +{ + "method": "overlay" | "pause_insert", + "method_rationale": "Clear explanation of why this method was chosen based on video analysis", + "dialogue_density": 0.0-1.0, + "placements": [ + { + "ad_cue_index": 0, + "original_start_time": 5.0, + "original_end_time": 8.0, + "target_start_time": 5.0, + "ad_duration": 3.5, + "pause_point": null, + "duck_start": 4.5, + "duck_end": 9.0 + } + ], + "total_added_duration": 0.0, + "warnings": [] +} +``` + +FIELD DESCRIPTIONS: +- method: The chosen integration method ("overlay" or "pause_insert") +- method_rationale: 1-2 sentences explaining why this method is optimal for this video +- dialogue_density: Score from 0.0 (no dialogue) to 1.0 (continuous dialogue) +- placements: Array of placement instructions for each AD cue: + - ad_cue_index: Index of the AD cue (0-based, matching VTT order) + - original_start_time: Start time from the AD VTT (in seconds) + - original_end_time: End time from the AD VTT (in seconds) + - target_start_time: Where to place the AD in the final video (in seconds) + - For overlay: usually same as original_start_time + - For pause_insert: accounts for cumulative pause durations + - ad_duration: The TTS audio duration for this cue (provided in input) + - pause_point: (pause_insert only) Where to insert freeze-frame in source video (in seconds) + - duck_start: (overlay only) When to begin reducing original audio volume (seconds) + - duck_end: (overlay only) When to restore original audio volume (seconds) +- total_added_duration: Sum of all pause durations (0 for overlay method) +- warnings: List any potential issues (e.g., "Cue 3 may overlap with quick dialogue") + +CONSTRAINTS: +- Output MUST be valid JSON. Do not include markdown fences or any other text. +- All JSON strings must be properly escaped +- For pause_insert method: + - pause_point MUST be at a natural break (scene change, breath, sentence end) + - NEVER pause mid-word or mid-sentence when someone is speaking + - Ensure smooth visual flow - prefer pausing on held shots rather than during motion + - Calculate target_start_time accounting for all previous pauses +- For overlay method: + - duck_start should begin 0.2-0.5 seconds before AD starts + - duck_end should extend 0.2-0.3 seconds after AD ends for smooth transitions + - Avoid ducking during critical dialogue moments +- Validate that all timestamps are logically consistent +- If a cue cannot fit cleanly with overlay, the entire video should use pause_insert + +CRITICAL: Return ONLY valid JSON that can be parsed by JSON.parse(). No additional text. + +Example for OVERLAY method: +{ + "method": "overlay", + "method_rationale": "Video contains primarily ambient music and sound effects with minimal dialogue, allowing AD to be overlaid with audio ducking.", + "dialogue_density": 0.15, + "placements": [ + { + "ad_cue_index": 0, + "original_start_time": 2.0, + "original_end_time": 5.0, + "target_start_time": 2.0, + "ad_duration": 2.8, + "pause_point": null, + "duck_start": 1.7, + "duck_end": 5.3 + }, + { + "ad_cue_index": 1, + "original_start_time": 10.0, + "original_end_time": 14.0, + "target_start_time": 10.0, + "ad_duration": 3.5, + "pause_point": null, + "duck_start": 9.7, + "duck_end": 14.3 + } + ], + "total_added_duration": 0.0, + "warnings": [] +} + +Example for PAUSE_INSERT method: +{ + "method": "pause_insert", + "method_rationale": "Video contains continuous dialogue throughout, requiring pauses to insert AD without obscuring speech.", + "dialogue_density": 0.85, + "placements": [ + { + "ad_cue_index": 0, + "original_start_time": 5.0, + "original_end_time": 8.0, + "target_start_time": 5.0, + "ad_duration": 3.2, + "pause_point": 5.0, + "duck_start": null, + "duck_end": null + }, + { + "ad_cue_index": 1, + "original_start_time": 15.0, + "original_end_time": 18.0, + "target_start_time": 18.2, + "ad_duration": 2.8, + "pause_point": 15.0, + "duck_start": null, + "duck_end": null + } + ], + "total_added_duration": 6.0, + "warnings": ["Cue 1 pause point adjusted from 15.0s to natural breath at 15.0s"] +} + +Follow this exact structure and formatting. diff --git a/backend/app/schemas/accessible_video.py b/backend/app/schemas/accessible_video.py new file mode 100644 index 0000000..fd21bbb --- /dev/null +++ b/backend/app/schemas/accessible_video.py @@ -0,0 +1,95 @@ +"""Schemas for accessible video generation with embedded audio descriptions.""" + +from enum import Enum +from typing import Optional + +from pydantic import BaseModel, Field + + +class AccessibleVideoMethod(str, Enum): + """Method used for integrating audio descriptions into video.""" + OVERLAY = "overlay" + PAUSE_INSERT = "pause_insert" + + +class ADPlacementCue(BaseModel): + """Placement instruction for a single audio description cue from Gemini analysis.""" + ad_cue_index: int = Field(..., description="Index of the AD cue in the VTT (0-based)") + original_start_time: float = Field(..., description="Original VTT start time in seconds") + original_end_time: float = Field(..., description="Original VTT end time in seconds") + target_start_time: float = Field(..., description="Target time in output video (seconds)") + ad_duration: float = Field(..., description="Duration of the AD TTS audio in seconds") + # For pause-insert method + pause_point: Optional[float] = Field( + None, + description="Where to insert freeze-frame in source video (seconds). Used for pause-insert method." + ) + # For overlay method + duck_start: Optional[float] = Field( + None, + description="When to start ducking original audio (seconds). Used for overlay method." + ) + duck_end: Optional[float] = Field( + None, + description="When to end ducking original audio (seconds). Used for overlay method." + ) + + +class GeminiAccessibleVideoAnalysis(BaseModel): + """Response schema for Gemini accessible video analysis. + + This model captures the AI's determination of the optimal method + for integrating audio descriptions and the specific placement + instructions for each AD cue. + """ + method: AccessibleVideoMethod = Field( + ..., + description="Chosen method: overlay (duck audio) or pause_insert (freeze-frame)" + ) + method_rationale: str = Field( + ..., + description="Explanation of why this method was chosen based on video analysis" + ) + dialogue_density: float = Field( + ..., + ge=0, + le=1, + description="Score from 0-1 indicating how much dialogue/speech is in the video" + ) + placements: list[ADPlacementCue] = Field( + ..., + description="Placement instructions for each AD cue" + ) + total_added_duration: float = Field( + default=0, + description="Total pause time added to video (pause-insert method only, in seconds)" + ) + warnings: list[str] = Field( + default_factory=list, + description="Any potential issues or concerns detected during analysis" + ) + + +class ADCueSegment(BaseModel): + """Represents a single synthesized AD cue segment.""" + cue_index: int = Field(..., description="Index of the cue (0-based)") + start_time: float = Field(..., description="Original start time from VTT") + end_time: float = Field(..., description="Original end time from VTT") + duration: float = Field(..., description="Actual TTS audio duration in seconds") + gcs_uri: str = Field(..., description="GCS URI to the individual MP3 segment") + text: str = Field(..., description="The AD text that was synthesized") + + +class AccessibleVideoRenderRequest(BaseModel): + """Request to render an accessible video for a job/language.""" + job_id: str + language: str + + +class AccessibleVideoProgress(BaseModel): + """Progress status for accessible video rendering.""" + status: str = Field(..., description="pending | rendering | completed | failed") + method: Optional[AccessibleVideoMethod] = None + error_message: Optional[str] = None + started_at: Optional[str] = None + completed_at: Optional[str] = None diff --git a/backend/app/schemas/job.py b/backend/app/schemas/job.py index 0ed9ed3..92462b7 100644 --- a/backend/app/schemas/job.py +++ b/backend/app/schemas/job.py @@ -2,7 +2,14 @@ from typing import Any, Literal, Optional, Union from pydantic import BaseModel -from ..models.job import JobStatus, LangOutput, RequestedOutputs, Review, TTSPreferences +from ..models.job import ( + AccessibleVideoProgressItem, + JobStatus, + LangOutput, + RequestedOutputs, + Review, + TTSPreferences, +) class JobResponse(BaseModel): @@ -13,6 +20,7 @@ class JobResponse(BaseModel): requested_outputs: RequestedOutputs review: Review outputs: Optional[dict[str, LangOutput]] = None + accessible_video_progress: Optional[dict[str, AccessibleVideoProgressItem]] = None created_at: Optional[str] = None updated_at: Optional[str] = None diff --git a/backend/app/services/gemini.py b/backend/app/services/gemini.py index 9f271b4..783a847 100644 --- a/backend/app/services/gemini.py +++ b/backend/app/services/gemini.py @@ -294,6 +294,161 @@ Fix the JSON and return it: logger.debug(f"JSON fix attempt failed: {e}") return None + async def analyze_accessible_video_placement( + self, + video_file_path: str, + ad_vtt_content: str, + ad_cue_durations: list[float] + ) -> dict[str, Any]: + """ + Analyze video and determine optimal method for integrating audio descriptions. + Returns placement instructions for each AD cue. + + Args: + video_file_path: Path to the source video file + ad_vtt_content: The audio description VTT content + ad_cue_durations: List of actual TTS audio durations in seconds (matching VTT cue order) + + Returns: + Dictionary with method choice and placement instructions for each AD cue + """ + prompt_template = self._load_prompt("gemini_accessible_video.md") + + # Format prompt with AD VTT content and durations + prompt = prompt_template.replace( + "{AD_VTT_CONTENT}", ad_vtt_content + ).replace( + "{AD_CUE_DURATIONS}", json.dumps(ad_cue_durations) + ) + + uploaded_file = None + + try: + logger.info(f"Starting accessible video analysis for: {video_file_path}") + logger.info(f"AD cues to place: {len(ad_cue_durations)}") + + # Upload video file to Gemini + logger.info("Uploading video file to Gemini API for accessible video analysis...") + uploaded_file = await asyncio.to_thread( + client.files.upload, + file=video_file_path, + config={ + "display_name": f"accessible_video_analysis_{Path(video_file_path).name}", + "mime_type": "video/mp4" + } + ) + logger.info(f"Successfully uploaded file: {uploaded_file.name}") + + # Wait for file to become ACTIVE + logger.info("Waiting for file to become ACTIVE...") + file_ready = await self._wait_for_file_active(uploaded_file.name) + if not file_ready: + raise Exception("File failed to become ACTIVE within timeout") + + # Generate content with video and prompt + logger.info("Analyzing video with Gemini for accessible video placement...") + response = await asyncio.to_thread( + client.models.generate_content, + model=self.model_name, + contents=[ + genai.types.Part.from_text(text=prompt), + genai.types.Part.from_uri( + file_uri=uploaded_file.uri, + mime_type=uploaded_file.mime_type + ) + ] + ) + + # Parse JSON response + response_text = response.text.strip() + logger.info(f"Received accessible video analysis response (first 300 chars): {response_text[:300]}...") + + # Handle potential markdown formatting + if response_text.startswith("```json"): + response_text = response_text.replace("```json", "").replace("```", "").strip() + + try: + result = json.loads(response_text) + except json.JSONDecodeError as e: + logger.error(f"JSON parse error in accessible video analysis: {e}") + # Try self-healing for this response + result = await self._self_heal_accessible_video_response(response_text) + + # Validate required fields + required_fields = ["method", "method_rationale", "dialogue_density", "placements"] + for field in required_fields: + if field not in result: + raise ValueError(f"Missing required field in accessible video analysis: {field}") + + # Validate method value + if result["method"] not in ["overlay", "pause_insert"]: + raise ValueError(f"Invalid method value: {result['method']}") + + # Validate placements + if len(result["placements"]) != len(ad_cue_durations): + logger.warning( + f"Placement count mismatch: got {len(result['placements'])}, " + f"expected {len(ad_cue_durations)}" + ) + + logger.info( + f"Accessible video analysis complete: method={result['method']}, " + f"dialogue_density={result['dialogue_density']:.2f}, " + f"placements={len(result['placements'])}" + ) + + return result + + except Exception as e: + logger.error(f"Accessible video analysis failed: {type(e).__name__}: {str(e)}") + raise + finally: + # Cleanup uploaded file + if uploaded_file: + try: + await asyncio.to_thread(client.files.delete, name=uploaded_file.name) + logger.info(f"Cleaned up uploaded file: {uploaded_file.name}") + except Exception as e: + logger.warning(f"Failed to cleanup uploaded file {uploaded_file.name}: {e}") + + async def _self_heal_accessible_video_response(self, invalid_response: str) -> dict[str, Any]: + """Attempt to self-heal invalid JSON response from accessible video analysis""" + logger.info("Attempting to self-heal accessible video analysis response") + + self_heal_prompt = f""" +SYSTEM: You are a JSON repair service. Fix the malformed JSON below and return ONLY the corrected JSON. + +CRITICAL REQUIREMENTS: +- The JSON MUST contain: method, method_rationale, dialogue_density, placements, total_added_duration, warnings +- method must be either "overlay" or "pause_insert" +- dialogue_density must be a number between 0 and 1 +- placements must be an array of placement objects +- Fix any JSON syntax errors (trailing commas, unterminated strings, etc.) + +Fix the JSON and return it: + +{invalid_response} + """ + + try: + response = await asyncio.to_thread( + client.models.generate_content, + model=self.model_name, + contents=[genai.types.Part.from_text(text=self_heal_prompt)] + ) + + response_text = response.text.strip() + if response_text.startswith("```json"): + response_text = response_text.replace("```json", "").replace("```", "").strip() + + result = json.loads(response_text) + logger.info("Successfully self-healed accessible video analysis response") + return result + + except Exception as e: + logger.error(f"Self-heal attempt for accessible video analysis failed: {e}") + raise ValueError("Failed to get valid JSON from accessible video analysis after self-heal") + async def transcreate_content( self, captions_vtt: str, diff --git a/backend/app/services/tts.py b/backend/app/services/tts.py index ebdab23..fd49528 100644 --- a/backend/app/services/tts.py +++ b/backend/app/services/tts.py @@ -1,4 +1,5 @@ import io +from dataclasses import dataclass from typing import Optional import aiohttp @@ -12,6 +13,17 @@ from .gemini_tts import gemini_tts_service logger = get_logger(__name__) +@dataclass +class TTSCueSegment: + """Represents a synthesized audio segment for a single AD cue.""" + cue_index: int + start_time: float # Original VTT start time + end_time: float # Original VTT end time + duration: float # Actual TTS audio duration in seconds + text: str # The AD text that was synthesized + audio_bytes: bytes # The raw MP3 audio bytes + + class TTSService: def __init__(self): # Check Gemini TTS availability (uses same API key as other Gemini services) @@ -96,6 +108,109 @@ class TTSService: raise ValueError("No TTS service available") + async def synthesize_audio_description_with_segments( + self, + ad_vtt_content: str, + language_code: str = "en-US", + voice_name: Optional[str] = None, + provider: Optional[str] = None, + model: str = "flash", + speed: float = 1.0, + style_prompt: str = "" + ) -> tuple[bytes, list[TTSCueSegment]]: + """ + Generate MP3 audio from audio description VTT content AND return individual segments. + Used for accessible video generation where we need per-cue audio files. + + Returns: + Tuple of (combined_mp3_bytes, list_of_cue_segments) + """ + # Determine which provider to use + active_provider = provider or settings.tts_provider + + # Extract simple language code for Gemini (e.g., "en-US" -> "en") + simple_lang = language_code.split("-")[0] if "-" in language_code else language_code + + # Parse VTT cues first + cues = self._parse_ad_cues(ad_vtt_content) + if not cues: + raise ValueError("No audio description cues found") + + # Synthesize each cue individually + segments: list[TTSCueSegment] = [] + audio_segments_for_combine = [] + current_audio_position = 0.0 + + for i, cue in enumerate(cues): + target_start_time = cue["start_time"] + + # Add silence to reach the exact VTT start time + if target_start_time > current_audio_position: + silence_duration = target_start_time - current_audio_position + silence = AudioSegment.silent(duration=int(silence_duration * 1000)) + audio_segments_for_combine.append(silence) + current_audio_position = target_start_time + + text = cue["text"].strip() + if text: + # Ensure proper punctuation for natural TTS flow + if not text.endswith(('.', '!', '?')): + text += "." + + # Synthesize with the appropriate provider + try: + if active_provider == "gemini" and self.gemini_available: + audio_data = await gemini_tts_service.synthesize_text( + text, voice_name or gemini_tts_service.default_voice, + simple_lang, model=model, speed=speed, style_prompt=style_prompt + ) + elif self.google_client: + audio_data = await self._synthesize_text_google(text, language_code, voice_name) + elif self.elevenlabs_available: + voice_id = self._get_elevenlabs_voice(language_code, voice_name) + audio_data = await self._synthesize_text_elevenlabs(text, voice_id) + else: + raise ValueError("No TTS service available") + + # Get actual duration from audio + audio_segment = AudioSegment.from_file(io.BytesIO(audio_data), format="mp3") + actual_duration = len(audio_segment) / 1000.0 + + # Store segment info + segments.append(TTSCueSegment( + cue_index=i, + start_time=cue["start_time"], + end_time=cue["end_time"], + duration=actual_duration, + text=cue["text"], + audio_bytes=audio_data + )) + + # Add to combined audio + audio_segments_for_combine.append(audio_segment) + current_audio_position += actual_duration + + except Exception as e: + logger.warning(f"Failed to synthesize cue {i}: {e}") + # Add silence for failed cue + cue_duration = cue["end_time"] - cue["start_time"] + silence = AudioSegment.silent(duration=int(cue_duration * 1000)) + audio_segments_for_combine.append(silence) + current_audio_position += cue_duration + + # Combine all segments + if audio_segments_for_combine: + final_audio = sum(audio_segments_for_combine, AudioSegment.empty()) + else: + final_audio = AudioSegment.silent(duration=1000) + + # Export combined to MP3 + output_buffer = io.BytesIO() + final_audio.export(output_buffer, format="mp3", bitrate="128k") + + logger.info(f"Synthesized {len(segments)} AD cue segments") + return output_buffer.getvalue(), segments + async def _synthesize_with_google( self, ad_vtt_content: str, diff --git a/backend/app/services/video_renderer.py b/backend/app/services/video_renderer.py new file mode 100644 index 0000000..abf2ab2 --- /dev/null +++ b/backend/app/services/video_renderer.py @@ -0,0 +1,461 @@ +"""Service for rendering accessible video with embedded audio descriptions using ffmpeg.""" + +import asyncio +import os +import subprocess +import tempfile +from pathlib import Path +from typing import Any + +from ..core.config import settings +from ..core.logging import get_logger +from ..schemas.accessible_video import AccessibleVideoMethod, GeminiAccessibleVideoAnalysis + +logger = get_logger(__name__) + + +class VideoRendererService: + """Service for rendering accessible video with embedded audio descriptions.""" + + def __init__(self): + self.ffmpeg_path = "ffmpeg" + self.ffprobe_path = "ffprobe" + # Audio ducking settings + self.duck_level = getattr(settings, 'accessible_video_duck_level', 0.3) + self.duck_fade_ms = getattr(settings, 'accessible_video_duck_fade_ms', 200) + + async def render_accessible_video( + self, + source_video_path: str, + ad_segments: list[tuple[int, str]], # [(cue_index, mp3_path), ...] + analysis: dict[str, Any], + output_path: str, + ) -> str: + """ + Render accessible video based on Gemini analysis. + + Args: + source_video_path: Path to source MP4 + ad_segments: List of (cue_index, mp3_path) tuples for each AD segment + analysis: Gemini analysis dict with method and placements + output_path: Where to save the output MP4 + + Returns: + Path to rendered accessible video + """ + method = analysis.get("method", "pause_insert") + + if method == "overlay": + return await self._render_overlay_method( + source_video_path, ad_segments, analysis, output_path + ) + else: + return await self._render_pause_insert_method( + source_video_path, ad_segments, analysis, output_path + ) + + async def _render_overlay_method( + self, + source_video_path: str, + ad_segments: list[tuple[int, str]], + analysis: dict[str, Any], + output_path: str, + ) -> str: + """ + Render with overlay method: + 1. Create AD audio track with segments at target times + 2. Apply ducking to original audio during AD playback + 3. Mix tracks together + 4. Mux with original video (copy video stream) + """ + logger.info(f"Starting overlay render for {source_video_path}") + placements = analysis.get("placements", []) + + with tempfile.TemporaryDirectory() as temp_dir: + temp_dir_path = Path(temp_dir) + + # Get source video duration + duration = await self._get_video_duration(source_video_path) + logger.info(f"Source video duration: {duration}s") + + # Build ducking filter for original audio + duck_filters = [] + for placement in placements: + duck_start = placement.get("duck_start") + duck_end = placement.get("duck_end") + if duck_start is not None and duck_end is not None: + # Volume filter: reduce to duck_level during AD, with fade + duck_filters.append( + f"volume=enable='between(t,{duck_start},{duck_end})':" + f"volume={self.duck_level}" + ) + + # Build ffmpeg command + inputs = ["-i", source_video_path] + filter_parts = [] + + # Add each AD segment as input + for cue_index, mp3_path in ad_segments: + inputs.extend(["-i", mp3_path]) + + # Build complex filter + # First, apply ducking to original audio + if duck_filters: + ducked_filter = ",".join(duck_filters) + filter_parts.append(f"[0:a]{ducked_filter}[ducked]") + base_audio = "[ducked]" + else: + base_audio = "[0:a]" + + # Add delay to each AD segment and mix + ad_labels = [] + for i, (cue_index, mp3_path) in enumerate(ad_segments): + # Find the placement for this cue + placement = next( + (p for p in placements if p.get("ad_cue_index") == cue_index), + None + ) + if placement: + target_time = placement.get("target_start_time", 0) + delay_ms = int(target_time * 1000) + input_idx = i + 1 # 0 is source video + ad_label = f"ad{i}" + filter_parts.append( + f"[{input_idx}:a]adelay={delay_ms}|{delay_ms}[{ad_label}]" + ) + ad_labels.append(f"[{ad_label}]") + + # Mix all audio streams together + if ad_labels: + all_audio = base_audio + "".join(ad_labels) + num_inputs = 1 + len(ad_labels) + filter_parts.append( + f"{all_audio}amix=inputs={num_inputs}:duration=first:dropout_transition=0[mixed]" + ) + audio_output = "[mixed]" + else: + audio_output = base_audio.replace("[", "").replace("]", "") + + filter_complex = ";".join(filter_parts) + + # Build final command + cmd = [ + self.ffmpeg_path, + "-y", # Overwrite output + *inputs, + ] + + if filter_complex: + cmd.extend(["-filter_complex", filter_complex]) + cmd.extend([ + "-map", "0:v", + "-map", audio_output, + "-c:v", "copy", # Copy video stream (no re-encoding) + "-c:a", "aac", + "-b:a", "192k", + output_path + ]) + else: + cmd.extend([ + "-c:v", "copy", + "-c:a", "copy", + output_path + ]) + + logger.info(f"Running ffmpeg overlay command...") + await self._run_ffmpeg(cmd) + + logger.info(f"Overlay render complete: {output_path}") + return output_path + + async def _render_pause_insert_method( + self, + source_video_path: str, + ad_segments: list[tuple[int, str]], + analysis: dict[str, Any], + output_path: str, + ) -> str: + """ + Render with pause-insert method: + 1. Split video at each pause point + 2. Extract freeze frame at each pause point + 3. Create freeze-frame segment with AD audio + 4. Concatenate all segments + """ + logger.info(f"Starting pause-insert render for {source_video_path}") + placements = analysis.get("placements", []) + + # Sort placements by pause_point time + sorted_placements = sorted( + [p for p in placements if p.get("pause_point") is not None], + key=lambda p: p["pause_point"] + ) + + if not sorted_placements: + logger.warning("No pause points found, copying source video") + await self._copy_video(source_video_path, output_path) + return output_path + + with tempfile.TemporaryDirectory() as temp_dir: + temp_dir_path = Path(temp_dir) + + # Get video properties for re-encoding freeze frames + video_props = await self._get_video_properties(source_video_path) + logger.info(f"Video properties: {video_props}") + + segment_files = [] + current_time = 0.0 + + # Create a mapping of cue_index to mp3_path + cue_to_mp3 = {cue_index: mp3_path for cue_index, mp3_path in ad_segments} + + for i, placement in enumerate(sorted_placements): + pause_point = placement["pause_point"] + cue_index = placement["ad_cue_index"] + ad_duration = placement["ad_duration"] + + # Get the AD audio for this cue + ad_mp3_path = cue_to_mp3.get(cue_index) + if not ad_mp3_path: + logger.warning(f"No AD audio found for cue {cue_index}, skipping") + continue + + # 1. Extract video segment from current_time to pause_point + if pause_point > current_time: + segment_path = temp_dir_path / f"segment_{i}_video.mp4" + await self._extract_segment( + source_video_path, + current_time, + pause_point - current_time, + str(segment_path) + ) + segment_files.append(str(segment_path)) + + # 2. Extract freeze frame at pause point + freeze_frame_path = temp_dir_path / f"freeze_{i}.png" + await self._extract_frame( + source_video_path, + pause_point, + str(freeze_frame_path) + ) + + # 3. Create freeze segment with AD audio + freeze_segment_path = temp_dir_path / f"freeze_segment_{i}.mp4" + await self._create_freeze_segment( + str(freeze_frame_path), + ad_mp3_path, + ad_duration, + str(freeze_segment_path), + video_props + ) + segment_files.append(str(freeze_segment_path)) + + current_time = pause_point + + # 4. Add final segment from last pause point to end + source_duration = await self._get_video_duration(source_video_path) + if current_time < source_duration: + final_segment_path = temp_dir_path / "segment_final.mp4" + await self._extract_segment( + source_video_path, + current_time, + source_duration - current_time, + str(final_segment_path) + ) + segment_files.append(str(final_segment_path)) + + # 5. Concatenate all segments + if segment_files: + await self._concatenate_segments(segment_files, output_path, temp_dir_path) + else: + await self._copy_video(source_video_path, output_path) + + logger.info(f"Pause-insert render complete: {output_path}") + return output_path + + async def _get_video_duration(self, video_path: str) -> float: + """Get video duration in seconds using ffprobe.""" + cmd = [ + self.ffprobe_path, + "-v", "quiet", + "-show_entries", "format=duration", + "-of", "default=noprint_wrappers=1:nokey=1", + video_path + ] + result = await asyncio.to_thread( + subprocess.run, + cmd, + capture_output=True, + text=True, + check=True + ) + return float(result.stdout.strip()) + + async def _get_video_properties(self, video_path: str) -> dict[str, Any]: + """Get video properties (resolution, framerate, codec) using ffprobe.""" + cmd = [ + self.ffprobe_path, + "-v", "quiet", + "-select_streams", "v:0", + "-show_entries", "stream=width,height,r_frame_rate,codec_name", + "-of", "json", + video_path + ] + result = await asyncio.to_thread( + subprocess.run, + cmd, + capture_output=True, + text=True, + check=True + ) + + import json + data = json.loads(result.stdout) + stream = data.get("streams", [{}])[0] + + # Parse frame rate (e.g., "30000/1001" or "30/1") + fps_str = stream.get("r_frame_rate", "30/1") + if "/" in fps_str: + num, den = fps_str.split("/") + fps = float(num) / float(den) + else: + fps = float(fps_str) + + return { + "width": stream.get("width", 1920), + "height": stream.get("height", 1080), + "fps": fps, + "codec": stream.get("codec_name", "h264") + } + + async def _extract_segment( + self, + source_path: str, + start_time: float, + duration: float, + output_path: str + ): + """Extract a video segment using ffmpeg.""" + cmd = [ + self.ffmpeg_path, + "-y", + "-ss", str(start_time), + "-i", source_path, + "-t", str(duration), + "-c", "copy", + "-avoid_negative_ts", "make_zero", + output_path + ] + await self._run_ffmpeg(cmd) + + async def _extract_frame(self, video_path: str, time_point: float, output_path: str): + """Extract a single frame as PNG using ffmpeg.""" + cmd = [ + self.ffmpeg_path, + "-y", + "-ss", str(time_point), + "-i", video_path, + "-frames:v", "1", + "-q:v", "2", + output_path + ] + await self._run_ffmpeg(cmd) + + async def _create_freeze_segment( + self, + frame_path: str, + audio_path: str, + duration: float, + output_path: str, + video_props: dict[str, Any] + ): + """Create a freeze-frame video segment with audio overlay.""" + width = video_props.get("width", 1920) + height = video_props.get("height", 1080) + fps = video_props.get("fps", 30) + + cmd = [ + self.ffmpeg_path, + "-y", + "-loop", "1", + "-i", frame_path, + "-i", audio_path, + "-c:v", "libx264", + "-preset", "fast", + "-tune", "stillimage", + "-c:a", "aac", + "-b:a", "192k", + "-pix_fmt", "yuv420p", + "-vf", f"scale={width}:{height}:force_original_aspect_ratio=decrease,pad={width}:{height}:(ow-iw)/2:(oh-ih)/2", + "-r", str(fps), + "-t", str(duration), + "-shortest", + output_path + ] + await self._run_ffmpeg(cmd) + + async def _concatenate_segments( + self, + segment_paths: list[str], + output_path: str, + temp_dir: Path + ): + """Concatenate video segments using ffmpeg concat demuxer.""" + # Create concat file + concat_file = temp_dir / "concat.txt" + with open(concat_file, "w") as f: + for path in segment_paths: + # Escape single quotes in path + escaped_path = path.replace("'", "'\\''") + f.write(f"file '{escaped_path}'\n") + + cmd = [ + self.ffmpeg_path, + "-y", + "-f", "concat", + "-safe", "0", + "-i", str(concat_file), + "-c", "copy", + output_path + ] + await self._run_ffmpeg(cmd) + + async def _copy_video(self, source_path: str, output_path: str): + """Copy video without modification.""" + cmd = [ + self.ffmpeg_path, + "-y", + "-i", source_path, + "-c", "copy", + output_path + ] + await self._run_ffmpeg(cmd) + + async def _run_ffmpeg(self, cmd: list[str], timeout: int = 3600): + """Run ffmpeg command with proper error handling.""" + logger.debug(f"Running command: {' '.join(cmd)}") + + try: + result = await asyncio.wait_for( + asyncio.to_thread( + subprocess.run, + cmd, + capture_output=True, + text=True + ), + timeout=timeout + ) + + if result.returncode != 0: + logger.error(f"ffmpeg error: {result.stderr}") + raise RuntimeError(f"ffmpeg failed with code {result.returncode}: {result.stderr}") + + return result + + except asyncio.TimeoutError: + logger.error(f"ffmpeg command timed out after {timeout}s") + raise RuntimeError(f"ffmpeg command timed out after {timeout}s") + + +# Global service instance +video_renderer_service = VideoRendererService() diff --git a/backend/app/services/vtt_retimer.py b/backend/app/services/vtt_retimer.py new file mode 100644 index 0000000..c1cbc5f --- /dev/null +++ b/backend/app/services/vtt_retimer.py @@ -0,0 +1,208 @@ +"""Service for re-timing VTT files when pauses are inserted into video.""" + +from typing import Any + +from ..core.logging import get_logger + +logger = get_logger(__name__) + + +class VTTRetimerService: + """Service for re-timing VTT subtitle files after pause insertions.""" + + def retime_for_pause_insert( + self, + original_vtt: str, + analysis: dict[str, Any] + ) -> str: + """ + Generate new VTT with adjusted timings for pause-insert accessible video. + + For each pause insertion, all subsequent cues shift forward by the pause duration. + + Args: + original_vtt: Original VTT content + analysis: Gemini analysis with placements containing pause_point and ad_duration + + Returns: + Re-timed VTT content + """ + placements = analysis.get("placements", []) + + # Build list of (pause_point, pause_duration) sorted by time + pauses = [] + for placement in placements: + pause_point = placement.get("pause_point") + ad_duration = placement.get("ad_duration", 0) + if pause_point is not None and ad_duration > 0: + pauses.append((pause_point, ad_duration)) + + pauses.sort(key=lambda x: x[0]) + + if not pauses: + logger.info("No pauses to apply, returning original VTT") + return original_vtt + + logger.info(f"Re-timing VTT with {len(pauses)} pause insertions") + + # Parse and retime cues + cues = self._parse_vtt(original_vtt) + retimed_cues = [] + + for cue in cues: + # Calculate cumulative offset from all pauses that occur before this cue's start + cumulative_offset = sum( + duration for pause_point, duration in pauses + if pause_point <= cue["start_time"] + ) + + retimed_cues.append({ + "start_time": cue["start_time"] + cumulative_offset, + "end_time": cue["end_time"] + cumulative_offset, + "text": cue["text"] + }) + + return self._build_vtt(retimed_cues) + + def retime_ad_vtt_for_pause_insert( + self, + original_ad_vtt: str, + analysis: dict[str, Any] + ) -> str: + """ + Re-time the audio description VTT for pause-insert accessible video. + + For AD cues, we use the target_start_time from the analysis + since they are placed at specific points during pauses. + + Args: + original_ad_vtt: Original AD VTT content + analysis: Gemini analysis with placements + + Returns: + Re-timed AD VTT content for accessible video + """ + placements = analysis.get("placements", []) + + # Parse original AD VTT + cues = self._parse_vtt(original_ad_vtt) + + if len(cues) != len(placements): + logger.warning( + f"AD cue count ({len(cues)}) doesn't match placements ({len(placements)})" + ) + + retimed_cues = [] + for placement in placements: + cue_index = placement.get("ad_cue_index", 0) + target_start = placement.get("target_start_time", 0) + ad_duration = placement.get("ad_duration", 0) + + # Get original text from matching cue + if cue_index < len(cues): + text = cues[cue_index]["text"] + else: + text = f"[Audio description cue {cue_index}]" + + retimed_cues.append({ + "start_time": target_start, + "end_time": target_start + ad_duration, + "text": text + }) + + return self._build_vtt(retimed_cues) + + def _parse_vtt(self, vtt_content: str) -> list[dict]: + """Parse VTT content into a list of cue dictionaries.""" + lines = vtt_content.strip().split('\n') + cues = [] + + i = 0 + while i < len(lines): + line = lines[i].strip() + + # Skip header and empty lines + if line == "WEBVTT" or line == "" or line.startswith("NOTE"): + i += 1 + continue + + # Check for timing line + if " --> " in line: + timing_parts = line.split(" --> ") + start_time = self._parse_timestamp(timing_parts[0].strip()) + # Handle potential settings after end time + end_part = timing_parts[1].strip() + if " " in end_part: + end_part = end_part.split(" ")[0] + end_time = self._parse_timestamp(end_part) + + # Get text from next line(s) + i += 1 + text_lines = [] + while i < len(lines) and lines[i].strip() != "": + text_lines.append(lines[i].strip()) + i += 1 + + if text_lines: + cues.append({ + "start_time": start_time, + "end_time": end_time, + "text": "\n".join(text_lines) + }) + else: + i += 1 + + return cues + + def _parse_timestamp(self, timestamp: str) -> float: + """Convert VTT timestamp to seconds.""" + # Format: HH:MM:SS.mmm or MM:SS.mmm + parts = timestamp.split(":") + + if len(parts) == 3: # HH:MM:SS.mmm + hours, minutes, seconds = parts + elif len(parts) == 2: # MM:SS.mmm + hours, minutes, seconds = "0", parts[0], parts[1] + else: + raise ValueError(f"Invalid timestamp format: {timestamp}") + + # Parse seconds and milliseconds + sec_parts = seconds.split(".") + seconds_int = int(sec_parts[0]) + milliseconds = int(sec_parts[1]) if len(sec_parts) > 1 else 0 + + total_seconds = ( + int(hours) * 3600 + + int(minutes) * 60 + + seconds_int + + milliseconds / 1000.0 + ) + + return total_seconds + + def _format_timestamp(self, seconds: float) -> str: + """Convert seconds to VTT timestamp format (HH:MM:SS.mmm).""" + hours = int(seconds // 3600) + minutes = int((seconds % 3600) // 60) + secs = seconds % 60 + whole_secs = int(secs) + millis = int((secs - whole_secs) * 1000) + + return f"{hours:02d}:{minutes:02d}:{whole_secs:02d}.{millis:03d}" + + def _build_vtt(self, cues: list[dict]) -> str: + """Build VTT content from list of cue dictionaries.""" + lines = ["WEBVTT", ""] + + for cue in cues: + start_ts = self._format_timestamp(cue["start_time"]) + end_ts = self._format_timestamp(cue["end_time"]) + lines.append(f"{start_ts} --> {end_ts}") + lines.append(cue["text"]) + lines.append("") + + return "\n".join(lines) + + +# Global service instance +vtt_retimer_service = VTTRetimerService() diff --git a/backend/app/tasks/__init__.py b/backend/app/tasks/__init__.py index 032afd6..a3f0495 100644 --- a/backend/app/tasks/__init__.py +++ b/backend/app/tasks/__init__.py @@ -27,6 +27,7 @@ celery_app.conf.update( task_routes={ "app.tasks.ingest_and_ai.*": {"queue": "ingest"}, "app.tasks.translate_and_synthesize.*": {"queue": "default"}, + "app.tasks.render_accessible_video.*": {"queue": "render"}, "app.tasks.notify.*": {"queue": "notify"}, }, task_default_queue="default", @@ -117,6 +118,7 @@ def import_task_modules(): try: from . import ingest_and_ai # noqa: E402, F401 from . import translate_and_synthesize # noqa: E402, F401 + from . import render_accessible_video # noqa: E402, F401 from . import notify # noqa: E402, F401 logger.info("Successfully imported all task modules") except Exception as e: diff --git a/backend/app/tasks/render_accessible_video.py b/backend/app/tasks/render_accessible_video.py new file mode 100644 index 0000000..087b1d9 --- /dev/null +++ b/backend/app/tasks/render_accessible_video.py @@ -0,0 +1,315 @@ +"""Celery task for rendering accessible video with embedded audio descriptions.""" + +import asyncio +import os +import tempfile +from datetime import datetime +from typing import Any + +from motor.motor_asyncio import AsyncIOMotorClient + +from ..core.config import settings +from ..core.logging import get_logger +from ..models.job import JobStatus +from ..services.gcs import gcs_service +from ..services.gemini import gemini_service +from ..services.video_renderer import video_renderer_service +from ..services.vtt_retimer import vtt_retimer_service +from . import celery_app +from .translate_and_synthesize import broadcast_status_update, retry_with_backoff + +logger = get_logger(__name__) + + +@celery_app.task(bind=True, time_limit=7200, soft_time_limit=7000) # 2 hour limit for video rendering +def render_accessible_video_task(self, job_id: str, language: str): + """ + Pipeline 3: Accessible Video Rendering + Triggered after TTS generation completes for a language when accessible_video_mp4 is requested. + + Steps: + 1. Download source video and per-cue AD MP3s from GCS + 2. Get AD VTT content and calculate cue durations + 3. Call Gemini for placement analysis + 4. Render accessible video (overlay or pause-insert) + 5. If pause-insert: generate re-timed caption VTT + 6. Upload outputs to GCS + 7. Update job document + """ + logger.info(f"Starting accessible video render for job {job_id}, language {language}") + + try: + result = asyncio.run(_async_render_accessible_video(job_id, language)) + logger.info(f"Accessible video render completed for job {job_id}, language {language}") + return result + except Exception as e: + logger.error(f"Accessible video render failed for job {job_id}/{language}: {e}") + import traceback + logger.error(f"Full traceback: {traceback.format_exc()}") + raise + + +async def _async_render_accessible_video(job_id: str, language: str): + """Async implementation of accessible video rendering.""" + logger.info(f"Async render started for job {job_id}, language {language}") + + client = AsyncIOMotorClient(settings.mongodb_uri) + db = client[settings.mongodb_db] + + try: + # Get job details + job_doc = await db.jobs.find_one({"_id": job_id}) + if not job_doc: + raise ValueError(f"Job {job_id} not found") + + job_title = job_doc.get("title", "Untitled Job") + + # Verify accessible video is requested + if not job_doc["requested_outputs"].get("accessible_video_mp4"): + logger.info(f"Accessible video not requested for job {job_id}") + return + + # Update progress to rendering + await db.jobs.update_one( + {"_id": job_id}, + { + "$set": { + f"accessible_video_progress.{language}": { + "status": "rendering", + "started_at": datetime.utcnow() + }, + "updated_at": datetime.utcnow() + } + } + ) + + # Broadcast status update + broadcast_status_update( + job_id, + "rendering", + job_title=job_title, + message=f"Rendering accessible video for {language.upper()}" + ) + + with tempfile.TemporaryDirectory() as temp_dir: + # 1. Download source video from GCS + source_video_gcs = job_doc["source"]["gcs_uri"] + source_blob_path = source_video_gcs.replace(f"gs://{settings.gcs_bucket}/", "") + source_video_path = os.path.join(temp_dir, "source.mp4") + + logger.info(f"Downloading source video from {source_blob_path}") + source_blob = gcs_service.bucket.blob(source_blob_path) + source_blob.download_to_filename(source_video_path) + + # 2. Get language outputs + lang_output = job_doc["outputs"].get(language) + if not lang_output: + raise ValueError(f"No outputs found for language {language}") + + # 3. Download AD VTT content + ad_vtt_gcs = lang_output.get("ad_vtt_gcs") + if not ad_vtt_gcs: + raise ValueError(f"No AD VTT found for language {language}") + + ad_blob_path = ad_vtt_gcs.replace(f"gs://{settings.gcs_bucket}/", "") + ad_blob = gcs_service.bucket.blob(ad_blob_path) + ad_vtt_content = ad_blob.download_as_text() + + # 4. Download per-cue AD MP3 segments + ad_cues_prefix = lang_output.get("ad_cues_gcs_prefix") + if not ad_cues_prefix: + raise ValueError(f"No AD cue segments found for language {language}") + + # List and download all cue segments + ad_segments = [] + cue_durations = [] + + prefix_path = ad_cues_prefix.replace(f"gs://{settings.gcs_bucket}/", "") + blobs = list(gcs_service.bucket.list_blobs(prefix=prefix_path)) + + # Sort by cue index + cue_blobs = [(b, int(b.name.split("_")[-1].replace(".mp3", ""))) for b in blobs if b.name.endswith(".mp3")] + cue_blobs.sort(key=lambda x: x[1]) + + for blob, cue_index in cue_blobs: + local_path = os.path.join(temp_dir, f"cue_{cue_index}.mp3") + blob.download_to_filename(local_path) + ad_segments.append((cue_index, local_path)) + + # Get duration from audio file + from pydub import AudioSegment + audio = AudioSegment.from_mp3(local_path) + duration = len(audio) / 1000.0 # Convert ms to seconds + cue_durations.append(duration) + + logger.info(f"Downloaded {len(ad_segments)} AD cue segments") + + # 5. Call Gemini for placement analysis + logger.info("Analyzing video for AD placement with Gemini...") + + async def analyze(): + return await gemini_service.analyze_accessible_video_placement( + source_video_path, + ad_vtt_content, + cue_durations + ) + + analysis = await retry_with_backoff(analyze, max_retries=2) + + method = analysis.get("method", "pause_insert") + logger.info(f"Gemini analysis complete: method={method}") + + # 6. Render accessible video + output_video_path = os.path.join(temp_dir, "accessible_video.mp4") + + logger.info(f"Rendering accessible video using {method} method...") + await video_renderer_service.render_accessible_video( + source_video_path, + ad_segments, + analysis, + output_video_path + ) + + # 7. Upload rendered video to GCS + video_blob_path = f"{job_id}/{language}/accessible_video.mp4" + video_blob = gcs_service.bucket.blob(video_blob_path) + video_blob.content_type = "video/mp4" + video_blob.upload_from_filename(output_video_path) + + video_gcs_uri = f"gs://{settings.gcs_bucket}/{video_blob_path}" + logger.info(f"Uploaded accessible video to {video_gcs_uri}") + + # 8. If pause-insert, generate re-timed captions VTT + retimed_captions_gcs_uri = None + if method == "pause_insert": + # Download original captions VTT + captions_vtt_gcs = lang_output.get("captions_vtt_gcs") + if captions_vtt_gcs: + captions_blob_path = captions_vtt_gcs.replace(f"gs://{settings.gcs_bucket}/", "") + captions_blob = gcs_service.bucket.blob(captions_blob_path) + original_captions_vtt = captions_blob.download_as_text() + + # Re-time captions + retimed_captions = vtt_retimer_service.retime_for_pause_insert( + original_captions_vtt, + analysis + ) + + # Upload re-timed captions + retimed_blob_path = f"{job_id}/{language}/accessible_captions.vtt" + retimed_blob = gcs_service.bucket.blob(retimed_blob_path) + retimed_blob.content_type = "text/vtt" + retimed_blob.upload_from_string(retimed_captions, content_type="text/vtt") + + retimed_captions_gcs_uri = f"gs://{settings.gcs_bucket}/{retimed_blob_path}" + logger.info(f"Uploaded re-timed captions to {retimed_captions_gcs_uri}") + + # 9. Update job document with results + update_fields = { + f"outputs.{language}.accessible_video_gcs": video_gcs_uri, + f"outputs.{language}.accessible_video_method": method, + f"accessible_video_progress.{language}": { + "status": "completed", + "method": method, + "started_at": job_doc.get("accessible_video_progress", {}).get(language, {}).get("started_at"), + "completed_at": datetime.utcnow() + }, + "updated_at": datetime.utcnow() + } + + if retimed_captions_gcs_uri: + update_fields[f"outputs.{language}.retimed_captions_vtt_gcs"] = retimed_captions_gcs_uri + + await db.jobs.update_one( + {"_id": job_id}, + {"$set": update_fields} + ) + + # Broadcast completion + broadcast_status_update( + job_id, + "asset_ready", + job_title=job_title, + message=f"Accessible video ready for {language.upper()} ({method} method)" + ) + + # Check if all accessible videos are complete + await _check_accessible_video_completion(job_id, db) + + logger.info(f"Accessible video render complete for job {job_id}/{language}") + + except Exception as e: + logger.error(f"Accessible video render failed for job {job_id}/{language}: {e}") + + # Update progress to failed + await db.jobs.update_one( + {"_id": job_id}, + { + "$set": { + f"accessible_video_progress.{language}": { + "status": "failed", + "error_message": str(e), + "completed_at": datetime.utcnow() + }, + "updated_at": datetime.utcnow() + } + } + ) + + raise + + finally: + client.close() + + +async def _check_accessible_video_completion(job_id: str, db): + """Check if all accessible videos are complete and update job status accordingly.""" + job_doc = await db.jobs.find_one({"_id": job_id}) + if not job_doc: + return + + progress = job_doc.get("accessible_video_progress", {}) + requested_languages = job_doc["requested_outputs"]["languages"] + + # Check if all requested languages have completed accessible video + all_complete = True + any_failed = False + + for language in requested_languages: + lang_progress = progress.get(language, {}) + status = lang_progress.get("status", "pending") + + if status == "failed": + any_failed = True + elif status != "completed": + all_complete = False + + if all_complete: + logger.info(f"All accessible videos complete for job {job_id}") + + # If job is still in TTS_GENERATING, transition to PENDING_FINAL_REVIEW + if job_doc["status"] == JobStatus.TTS_GENERATING.value: + await db.jobs.update_one( + {"_id": job_id}, + { + "$set": { + "status": JobStatus.PENDING_FINAL_REVIEW.value, + "updated_at": datetime.utcnow() + }, + "$push": { + "review.history": { + "at": datetime.utcnow(), + "status": JobStatus.PENDING_FINAL_REVIEW.value, + "by": "system" + } + } + } + ) + + job_title = job_doc.get("title", "Untitled Job") + broadcast_status_update( + job_id, + JobStatus.PENDING_FINAL_REVIEW.value, + job_title=job_title, + message=f"{job_title} has all accessible videos complete - ready for Final Review" + ) diff --git a/backend/app/tasks/translate_and_synthesize.py b/backend/app/tasks/translate_and_synthesize.py index f2fa9c7..7d5c8c6 100644 --- a/backend/app/tasks/translate_and_synthesize.py +++ b/backend/app/tasks/translate_and_synthesize.py @@ -268,36 +268,50 @@ async def _async_translate_and_synthesize(job_id: str): ) # Generate TTS for languages that need MP3 + accessible_video_requested = job_doc["requested_outputs"].get("accessible_video_mp4", False) + if job_doc["requested_outputs"]["audio_description_mp3"]: # Get TTS preferences from job tts_preferences = job_doc["requested_outputs"].get("tts_preferences", {}) - await _generate_tts_for_languages(job_id, updated_outputs, db, source_language, tts_preferences) + await _generate_tts_for_languages( + job_id, updated_outputs, db, source_language, tts_preferences, accessible_video_requested + ) # Update final status - await db.jobs.update_one( - {"_id": job_id}, - { - "$set": { - "status": JobStatus.PENDING_FINAL_REVIEW.value, - "updated_at": datetime.utcnow() - }, - "$push": { - "review.history": { - "at": datetime.utcnow(), + # If accessible video is requested, the render task will handle the transition + # to PENDING_FINAL_REVIEW when all videos are complete + if not accessible_video_requested: + await db.jobs.update_one( + {"_id": job_id}, + { + "$set": { "status": JobStatus.PENDING_FINAL_REVIEW.value, - "by": "system" + "updated_at": datetime.utcnow() + }, + "$push": { + "review.history": { + "at": datetime.utcnow(), + "status": JobStatus.PENDING_FINAL_REVIEW.value, + "by": "system" + } } } - } - ) - - # Broadcast status update - broadcast_status_update( - job_id, - JobStatus.PENDING_FINAL_REVIEW.value, - job_title=job_title, - message=f"{job_title} has finished translation and audio generation - ready for Final Review" - ) + ) + + # Broadcast status update + broadcast_status_update( + job_id, + JobStatus.PENDING_FINAL_REVIEW.value, + job_title=job_title, + message=f"{job_title} has finished translation and audio generation - ready for Final Review" + ) + else: + # When accessible video is requested, stay in TTS_GENERATING + # The render_accessible_video task will transition to PENDING_FINAL_REVIEW + logger.info( + f"Accessible video rendering triggered for job {job_id}. " + f"Staying in TTS_GENERATING until all videos are complete." + ) logger.info(f"Successfully completed translation and synthesis for job {job_id}") @@ -330,7 +344,8 @@ async def _generate_tts_for_languages( outputs: dict[str, Any], db, source_language: str = "en", - tts_preferences: dict = None + tts_preferences: dict = None, + accessible_video_requested: bool = False ): """Generate TTS audio for each language's audio description""" if tts_preferences is None: @@ -338,15 +353,19 @@ async def _generate_tts_for_languages( # Always generate source language MP3 first if source_language in outputs and "ad_vtt_gcs" in outputs[source_language]: - await _generate_language_tts(job_id, source_language, outputs[source_language], db, tts_preferences) + await _generate_language_tts( + job_id, source_language, outputs[source_language], db, tts_preferences, accessible_video_requested + ) # Generate for other languages for language, lang_output in outputs.items(): if language != source_language and "ad_vtt_gcs" in lang_output: - await _generate_language_tts(job_id, language, lang_output, db, tts_preferences) + await _generate_language_tts( + job_id, language, lang_output, db, tts_preferences, accessible_video_requested + ) -async def _generate_language_tts(job_id: str, language: str, lang_output: dict, db, tts_preferences: dict = None): +async def _generate_language_tts(job_id: str, language: str, lang_output: dict, db, tts_preferences: dict = None, accessible_video_requested: bool = False): """Generate TTS for a specific language""" if tts_preferences is None: tts_preferences = {} @@ -379,23 +398,52 @@ async def _generate_language_tts(job_id: str, language: str, lang_output: dict, logger.info( f"Generating TTS for {language} with voice={voice_name}, provider={provider}, " - f"model={model}, speed={speed}x, style={style_preset}" + f"model={model}, speed={speed}x, style={style_preset}, accessible_video={accessible_video_requested}" ) - async def synthesize(): - return await tts_service.synthesize_audio_description( - ad_vtt_content, - language_code, - voice_name=voice_name, - provider=provider, - model=model, - speed=speed, - style_prompt=style_prompt - ) + # Use the segments method if accessible video is requested + if accessible_video_requested: + async def synthesize_with_segments(): + return await tts_service.synthesize_audio_description_with_segments( + ad_vtt_content, + language_code, + voice_name=voice_name, + provider=provider, + model=model, + speed=speed, + style_prompt=style_prompt + ) - mp3_data = await retry_with_backoff(synthesize, max_retries=3) + mp3_data, segments = await retry_with_backoff(synthesize_with_segments, max_retries=3) - # Upload MP3 to GCS + # Upload individual cue segments to GCS + ad_cues_prefix = f"{job_id}/{language}/ad_cues/" + for segment in segments: + cue_blob_path = f"{ad_cues_prefix}cue_{segment.cue_index}.mp3" + cue_blob = gcs_service.bucket.blob(cue_blob_path) + cue_blob.content_type = "audio/mpeg" + cue_blob.upload_from_string(segment.audio_bytes, content_type="audio/mpeg") + + logger.info(f"Uploaded {len(segments)} per-cue AD segments for {language}") + + # Store the prefix path + ad_cues_gcs_prefix = f"gs://{settings.gcs_bucket}/{ad_cues_prefix}" + else: + async def synthesize(): + return await tts_service.synthesize_audio_description( + ad_vtt_content, + language_code, + voice_name=voice_name, + provider=provider, + model=model, + speed=speed, + style_prompt=style_prompt + ) + + mp3_data = await retry_with_backoff(synthesize, max_retries=3) + ad_cues_gcs_prefix = None + + # Upload combined MP3 to GCS mp3_blob_path = f"{job_id}/{language}/ad.mp3" mp3_blob = gcs_service.bucket.blob(mp3_blob_path) mp3_blob.content_type = "audio/mpeg" @@ -404,14 +452,17 @@ async def _generate_language_tts(job_id: str, language: str, lang_output: dict, mp3_gcs_uri = f"gs://{settings.gcs_bucket}/{mp3_blob_path}" # Update job outputs + update_fields = { + f"outputs.{language}.ad_mp3_gcs": mp3_gcs_uri, + "updated_at": datetime.utcnow() + } + + if ad_cues_gcs_prefix: + update_fields[f"outputs.{language}.ad_cues_gcs_prefix"] = ad_cues_gcs_prefix + await db.jobs.update_one( {"_id": job_id}, - { - "$set": { - f"outputs.{language}.ad_mp3_gcs": mp3_gcs_uri, - "updated_at": datetime.utcnow() - } - } + {"$set": update_fields} ) logger.info(f"Successfully generated TTS for {language}") @@ -426,6 +477,25 @@ async def _generate_language_tts(job_id: str, language: str, lang_output: dict, message=f"Audio description MP3 ready for {language.upper()}" ) + # Trigger accessible video rendering if requested + if accessible_video_requested: + from .render_accessible_video import render_accessible_video_task + + # Initialize progress tracking for this language + await db.jobs.update_one( + {"_id": job_id}, + { + "$set": { + f"accessible_video_progress.{language}": { + "status": "pending" + } + } + } + ) + + render_accessible_video_task.delay(job_id, language) + logger.info(f"Triggered accessible video rendering for job {job_id}/{language}") + except Exception as e: logger.error(f"TTS generation failed for {language}: {e}") diff --git a/frontend/src/routes/jobs/NewJob.tsx b/frontend/src/routes/jobs/NewJob.tsx index 3ed06cc..56441a6 100644 --- a/frontend/src/routes/jobs/NewJob.tsx +++ b/frontend/src/routes/jobs/NewJob.tsx @@ -20,6 +20,7 @@ const jobSchema = z.object({ captions_vtt: z.boolean(), audio_description_vtt: z.boolean(), audio_description_mp3: z.boolean(), + accessible_video_mp4: z.boolean(), languages: z.array(z.string()), transcreation: z.array(z.string()), }); @@ -71,6 +72,7 @@ export function NewJob() { captions_vtt: true, audio_description_vtt: true, audio_description_mp3: true, + accessible_video_mp4: false, languages: [], transcreation: [], } @@ -125,6 +127,7 @@ export function NewJob() { captions_vtt: data.captions_vtt, audio_description_vtt: data.audio_description_vtt, audio_description_mp3: data.audio_description_mp3, + accessible_video_mp4: data.accessible_video_mp4, languages: data.languages, transcreation: data.transcreation, tts_preferences: data.audio_description_mp3 ? ttsPreferences : undefined, @@ -202,6 +205,7 @@ export function NewJob() { captions_vtt: data.captions_vtt, audio_description_vtt: data.audio_description_vtt, audio_description_mp3: data.audio_description_mp3, + accessible_video_mp4: data.accessible_video_mp4, languages: data.languages, transcreation: data.transcreation, tts_preferences: data.audio_description_mp3 ? ttsPreferences : undefined, @@ -246,6 +250,7 @@ export function NewJob() { captions_vtt: data.captions_vtt, audio_description_vtt: data.audio_description_vtt, audio_description_mp3: data.audio_description_mp3, + accessible_video_mp4: data.accessible_video_mp4, languages: data.languages, transcreation: data.transcreation, tts_preferences: data.audio_description_mp3 ? ttsPreferences : undefined, @@ -583,6 +588,14 @@ export function NewJob() { /> Audio Description Voiceover (MP3) + diff --git a/frontend/src/types/api.ts b/frontend/src/types/api.ts index dbf77e6..90fb450 100644 --- a/frontend/src/types/api.ts +++ b/frontend/src/types/api.ts @@ -55,6 +55,7 @@ export interface RequestedOutputs { captions_vtt: boolean; audio_description_vtt: boolean; audio_description_mp3: boolean; + accessible_video_mp4: boolean; // Rendered video with embedded audio descriptions languages: string[]; transcreation: string[]; tts_preferences?: TTSPreferences; @@ -88,10 +89,17 @@ export interface TTSOptionsResponse { speed_range: SpeedRange; } +export type AccessibleVideoMethod = "overlay" | "pause_insert"; + export interface LangOutput { captions_vtt_gcs?: string; ad_vtt_gcs?: string; ad_mp3_gcs?: string; + // Accessible video outputs + accessible_video_gcs?: string; + accessible_video_method?: AccessibleVideoMethod; + retimed_captions_vtt_gcs?: string; // Re-timed captions for pause-insert method + ad_cues_gcs_prefix?: string; // Path prefix for per-cue MP3 segments origin?: "translate" | "transcreate"; qa_notes?: string; } @@ -114,6 +122,16 @@ export interface AISection { confidence?: number; } +export type AccessibleVideoProgressStatus = "pending" | "rendering" | "completed" | "failed"; + +export interface AccessibleVideoProgressItem { + status: AccessibleVideoProgressStatus; + method?: AccessibleVideoMethod; + error_message?: string; + started_at?: string; + completed_at?: string; +} + export interface Job { id: string; client_id: string; @@ -123,6 +141,7 @@ export interface Job { status: JobStatus; review: Review; outputs?: Record; + accessible_video_progress?: Record; ai?: AISection; error?: Record; created_at: string;