feat: add accessible video (MP4 with embedded audio descriptions)

Add new deliverable type that renders video with audio descriptions embedded.
Supports two AI-determined methods:
- Direct Overlay: ducks original audio and overlays AD TTS (for minimal dialogue)
- Pause-Insert: freeze-frame video, insert AD, re-time subtitles (for significant dialogue)

Backend:
- Add Pydantic schemas for Gemini analysis response
- Add Gemini prompt and analyze_accessible_video_placement() method
- Add video_renderer.py service using FFmpeg for both rendering methods
- Add vtt_retimer.py service for pause-insert subtitle adjustment
- Add render_accessible_video.py Celery task
- Modify TTS service to return individual per-cue segments
- Update translate_and_synthesize.py to save segments and trigger rendering
- Update download endpoint to include accessible video outputs

Frontend:
- Add accessible_video_mp4 checkbox to NewJob form
- Update TypeScript types for new deliverable

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
michael 2025-12-26 11:06:41 -06:00
parent d1e51ebade
commit 80d3866d32
14 changed files with 1704 additions and 46 deletions

View file

@ -698,6 +698,27 @@ async def get_job_downloads(
except Exception as e:
logger.warning(f"Failed to generate signed URL for AD MP3 {language}: {e}")
# Accessible Video MP4
if "accessible_video_gcs" in lang_output:
blob_path = lang_output["accessible_video_gcs"].replace(f"gs://{settings.gcs_bucket}/", "")
try:
signed_url = await get_signed_download_url(blob_path, 24)
lang_downloads["accessible_video_mp4"] = signed_url
# Include method info if available
if "accessible_video_method" in lang_output:
lang_downloads["accessible_video_method"] = lang_output["accessible_video_method"]
except Exception as e:
logger.warning(f"Failed to generate signed URL for accessible video {language}: {e}")
# Re-timed Captions VTT (for pause-insert accessible videos)
if "retimed_captions_vtt_gcs" in lang_output:
blob_path = lang_output["retimed_captions_vtt_gcs"].replace(f"gs://{settings.gcs_bucket}/", "")
try:
signed_url = await get_signed_download_url(blob_path, 24)
lang_downloads["accessible_captions_vtt"] = signed_url
except Exception as e:
logger.warning(f"Failed to generate signed URL for retimed captions {language}: {e}")
if lang_downloads:
downloads[language] = lang_downloads

View file

@ -53,6 +53,7 @@ class RequestedOutputs(BaseModel):
captions_vtt: bool = True
audio_description_vtt: bool = True
audio_description_mp3: bool = True
accessible_video_mp4: bool = False # Rendered video with embedded audio descriptions
languages: list[str] = []
transcreation: list[str] = []
tts_preferences: Optional[TTSPreferences] = None
@ -62,6 +63,11 @@ class LangOutput(BaseModel):
captions_vtt_gcs: Optional[str] = None
ad_vtt_gcs: Optional[str] = None
ad_mp3_gcs: Optional[str] = None
# Accessible video outputs
accessible_video_gcs: Optional[str] = None # Rendered accessible MP4
accessible_video_method: Optional[Literal["overlay", "pause_insert"]] = None
retimed_captions_vtt_gcs: Optional[str] = None # Re-timed captions for pause-insert method
ad_cues_gcs_prefix: Optional[str] = None # GCS path prefix for per-cue MP3 segments
origin: Optional[Literal["translate", "transcreate"]] = None
qa_notes: Optional[str] = None
@ -84,6 +90,15 @@ class AISection(BaseModel):
confidence: Optional[float] = None
class AccessibleVideoProgressItem(BaseModel):
"""Progress tracking for accessible video rendering per language."""
status: Literal["pending", "rendering", "completed", "failed"] = "pending"
method: Optional[Literal["overlay", "pause_insert"]] = None
error_message: Optional[str] = None
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
class Job(BaseModel):
id: Optional[str] = Field(None, alias="_id")
client_id: str
@ -93,6 +108,7 @@ class Job(BaseModel):
status: JobStatus = JobStatus.CREATED
review: Review = Review()
outputs: Optional[dict[str, LangOutput]] = None
accessible_video_progress: Optional[dict[str, AccessibleVideoProgressItem]] = None
ai: Optional[AISection] = None
error: Optional[dict[str, Any]] = None
created_at: Optional[datetime] = None

View file

@ -0,0 +1,160 @@
SYSTEM:
You are an expert accessible media engineer specializing in audio description integration for video content. Your task is to analyze a video and determine the optimal method for integrating pre-written audio description content. Produce STRICT JSON only.
USER:
You are given:
1. A video file
2. Audio description (AD) VTT content with timed cues
3. The actual TTS audio duration for each AD cue (in seconds)
Your task is to analyze the video and determine the optimal method and precise placement for integrating the audio descriptions.
METHODS:
1. **OVERLAY** - Use when video has minimal spoken dialogue or natural gaps
- Audio descriptions play over the original audio with ducking (reduced volume)
- Original video duration is preserved
- Best for: music-only videos, documentaries with narration gaps, videos with ambient sound
2. **PAUSE_INSERT** - Use when video has significant spoken dialogue
- Video pauses (freeze-frame) during AD playback
- Original content is not obscured by AD
- Best for: dialogue-heavy content, interviews, instructional videos with continuous speech
ANALYSIS STEPS:
1. Detect dialogue presence and density throughout the video
2. For each AD cue, determine if it can fit in existing audio gaps (for overlay) or needs pause insertion
3. If >30% of AD cues would significantly overlap dialogue, use PAUSE_INSERT
4. For PAUSE_INSERT: identify natural break points (scene transitions, sentence endings, breaths - NOT mid-word)
5. For OVERLAY: calculate duck timing windows that start slightly before AD and end slightly after
INPUT DATA:
AD VTT Content:
{AD_VTT_CONTENT}
AD Cue Durations (in seconds, matching VTT cue order):
{AD_CUE_DURATIONS}
OUTPUT FORMAT:
Return a JSON object with the following structure:
```json
{
"method": "overlay" | "pause_insert",
"method_rationale": "Clear explanation of why this method was chosen based on video analysis",
"dialogue_density": 0.0-1.0,
"placements": [
{
"ad_cue_index": 0,
"original_start_time": 5.0,
"original_end_time": 8.0,
"target_start_time": 5.0,
"ad_duration": 3.5,
"pause_point": null,
"duck_start": 4.5,
"duck_end": 9.0
}
],
"total_added_duration": 0.0,
"warnings": []
}
```
FIELD DESCRIPTIONS:
- method: The chosen integration method ("overlay" or "pause_insert")
- method_rationale: 1-2 sentences explaining why this method is optimal for this video
- dialogue_density: Score from 0.0 (no dialogue) to 1.0 (continuous dialogue)
- placements: Array of placement instructions for each AD cue:
- ad_cue_index: Index of the AD cue (0-based, matching VTT order)
- original_start_time: Start time from the AD VTT (in seconds)
- original_end_time: End time from the AD VTT (in seconds)
- target_start_time: Where to place the AD in the final video (in seconds)
- For overlay: usually same as original_start_time
- For pause_insert: accounts for cumulative pause durations
- ad_duration: The TTS audio duration for this cue (provided in input)
- pause_point: (pause_insert only) Where to insert freeze-frame in source video (in seconds)
- duck_start: (overlay only) When to begin reducing original audio volume (seconds)
- duck_end: (overlay only) When to restore original audio volume (seconds)
- total_added_duration: Sum of all pause durations (0 for overlay method)
- warnings: List any potential issues (e.g., "Cue 3 may overlap with quick dialogue")
CONSTRAINTS:
- Output MUST be valid JSON. Do not include markdown fences or any other text.
- All JSON strings must be properly escaped
- For pause_insert method:
- pause_point MUST be at a natural break (scene change, breath, sentence end)
- NEVER pause mid-word or mid-sentence when someone is speaking
- Ensure smooth visual flow - prefer pausing on held shots rather than during motion
- Calculate target_start_time accounting for all previous pauses
- For overlay method:
- duck_start should begin 0.2-0.5 seconds before AD starts
- duck_end should extend 0.2-0.3 seconds after AD ends for smooth transitions
- Avoid ducking during critical dialogue moments
- Validate that all timestamps are logically consistent
- If a cue cannot fit cleanly with overlay, the entire video should use pause_insert
CRITICAL: Return ONLY valid JSON that can be parsed by JSON.parse(). No additional text.
Example for OVERLAY method:
{
"method": "overlay",
"method_rationale": "Video contains primarily ambient music and sound effects with minimal dialogue, allowing AD to be overlaid with audio ducking.",
"dialogue_density": 0.15,
"placements": [
{
"ad_cue_index": 0,
"original_start_time": 2.0,
"original_end_time": 5.0,
"target_start_time": 2.0,
"ad_duration": 2.8,
"pause_point": null,
"duck_start": 1.7,
"duck_end": 5.3
},
{
"ad_cue_index": 1,
"original_start_time": 10.0,
"original_end_time": 14.0,
"target_start_time": 10.0,
"ad_duration": 3.5,
"pause_point": null,
"duck_start": 9.7,
"duck_end": 14.3
}
],
"total_added_duration": 0.0,
"warnings": []
}
Example for PAUSE_INSERT method:
{
"method": "pause_insert",
"method_rationale": "Video contains continuous dialogue throughout, requiring pauses to insert AD without obscuring speech.",
"dialogue_density": 0.85,
"placements": [
{
"ad_cue_index": 0,
"original_start_time": 5.0,
"original_end_time": 8.0,
"target_start_time": 5.0,
"ad_duration": 3.2,
"pause_point": 5.0,
"duck_start": null,
"duck_end": null
},
{
"ad_cue_index": 1,
"original_start_time": 15.0,
"original_end_time": 18.0,
"target_start_time": 18.2,
"ad_duration": 2.8,
"pause_point": 15.0,
"duck_start": null,
"duck_end": null
}
],
"total_added_duration": 6.0,
"warnings": ["Cue 1 pause point adjusted from 15.0s to natural breath at 15.0s"]
}
Follow this exact structure and formatting.

View file

@ -0,0 +1,95 @@
"""Schemas for accessible video generation with embedded audio descriptions."""
from enum import Enum
from typing import Optional
from pydantic import BaseModel, Field
class AccessibleVideoMethod(str, Enum):
"""Method used for integrating audio descriptions into video."""
OVERLAY = "overlay"
PAUSE_INSERT = "pause_insert"
class ADPlacementCue(BaseModel):
"""Placement instruction for a single audio description cue from Gemini analysis."""
ad_cue_index: int = Field(..., description="Index of the AD cue in the VTT (0-based)")
original_start_time: float = Field(..., description="Original VTT start time in seconds")
original_end_time: float = Field(..., description="Original VTT end time in seconds")
target_start_time: float = Field(..., description="Target time in output video (seconds)")
ad_duration: float = Field(..., description="Duration of the AD TTS audio in seconds")
# For pause-insert method
pause_point: Optional[float] = Field(
None,
description="Where to insert freeze-frame in source video (seconds). Used for pause-insert method."
)
# For overlay method
duck_start: Optional[float] = Field(
None,
description="When to start ducking original audio (seconds). Used for overlay method."
)
duck_end: Optional[float] = Field(
None,
description="When to end ducking original audio (seconds). Used for overlay method."
)
class GeminiAccessibleVideoAnalysis(BaseModel):
"""Response schema for Gemini accessible video analysis.
This model captures the AI's determination of the optimal method
for integrating audio descriptions and the specific placement
instructions for each AD cue.
"""
method: AccessibleVideoMethod = Field(
...,
description="Chosen method: overlay (duck audio) or pause_insert (freeze-frame)"
)
method_rationale: str = Field(
...,
description="Explanation of why this method was chosen based on video analysis"
)
dialogue_density: float = Field(
...,
ge=0,
le=1,
description="Score from 0-1 indicating how much dialogue/speech is in the video"
)
placements: list[ADPlacementCue] = Field(
...,
description="Placement instructions for each AD cue"
)
total_added_duration: float = Field(
default=0,
description="Total pause time added to video (pause-insert method only, in seconds)"
)
warnings: list[str] = Field(
default_factory=list,
description="Any potential issues or concerns detected during analysis"
)
class ADCueSegment(BaseModel):
"""Represents a single synthesized AD cue segment."""
cue_index: int = Field(..., description="Index of the cue (0-based)")
start_time: float = Field(..., description="Original start time from VTT")
end_time: float = Field(..., description="Original end time from VTT")
duration: float = Field(..., description="Actual TTS audio duration in seconds")
gcs_uri: str = Field(..., description="GCS URI to the individual MP3 segment")
text: str = Field(..., description="The AD text that was synthesized")
class AccessibleVideoRenderRequest(BaseModel):
"""Request to render an accessible video for a job/language."""
job_id: str
language: str
class AccessibleVideoProgress(BaseModel):
"""Progress status for accessible video rendering."""
status: str = Field(..., description="pending | rendering | completed | failed")
method: Optional[AccessibleVideoMethod] = None
error_message: Optional[str] = None
started_at: Optional[str] = None
completed_at: Optional[str] = None

View file

@ -2,7 +2,14 @@ from typing import Any, Literal, Optional, Union
from pydantic import BaseModel
from ..models.job import JobStatus, LangOutput, RequestedOutputs, Review, TTSPreferences
from ..models.job import (
AccessibleVideoProgressItem,
JobStatus,
LangOutput,
RequestedOutputs,
Review,
TTSPreferences,
)
class JobResponse(BaseModel):
@ -13,6 +20,7 @@ class JobResponse(BaseModel):
requested_outputs: RequestedOutputs
review: Review
outputs: Optional[dict[str, LangOutput]] = None
accessible_video_progress: Optional[dict[str, AccessibleVideoProgressItem]] = None
created_at: Optional[str] = None
updated_at: Optional[str] = None

View file

@ -294,6 +294,161 @@ Fix the JSON and return it:
logger.debug(f"JSON fix attempt failed: {e}")
return None
async def analyze_accessible_video_placement(
self,
video_file_path: str,
ad_vtt_content: str,
ad_cue_durations: list[float]
) -> dict[str, Any]:
"""
Analyze video and determine optimal method for integrating audio descriptions.
Returns placement instructions for each AD cue.
Args:
video_file_path: Path to the source video file
ad_vtt_content: The audio description VTT content
ad_cue_durations: List of actual TTS audio durations in seconds (matching VTT cue order)
Returns:
Dictionary with method choice and placement instructions for each AD cue
"""
prompt_template = self._load_prompt("gemini_accessible_video.md")
# Format prompt with AD VTT content and durations
prompt = prompt_template.replace(
"{AD_VTT_CONTENT}", ad_vtt_content
).replace(
"{AD_CUE_DURATIONS}", json.dumps(ad_cue_durations)
)
uploaded_file = None
try:
logger.info(f"Starting accessible video analysis for: {video_file_path}")
logger.info(f"AD cues to place: {len(ad_cue_durations)}")
# Upload video file to Gemini
logger.info("Uploading video file to Gemini API for accessible video analysis...")
uploaded_file = await asyncio.to_thread(
client.files.upload,
file=video_file_path,
config={
"display_name": f"accessible_video_analysis_{Path(video_file_path).name}",
"mime_type": "video/mp4"
}
)
logger.info(f"Successfully uploaded file: {uploaded_file.name}")
# Wait for file to become ACTIVE
logger.info("Waiting for file to become ACTIVE...")
file_ready = await self._wait_for_file_active(uploaded_file.name)
if not file_ready:
raise Exception("File failed to become ACTIVE within timeout")
# Generate content with video and prompt
logger.info("Analyzing video with Gemini for accessible video placement...")
response = await asyncio.to_thread(
client.models.generate_content,
model=self.model_name,
contents=[
genai.types.Part.from_text(text=prompt),
genai.types.Part.from_uri(
file_uri=uploaded_file.uri,
mime_type=uploaded_file.mime_type
)
]
)
# Parse JSON response
response_text = response.text.strip()
logger.info(f"Received accessible video analysis response (first 300 chars): {response_text[:300]}...")
# Handle potential markdown formatting
if response_text.startswith("```json"):
response_text = response_text.replace("```json", "").replace("```", "").strip()
try:
result = json.loads(response_text)
except json.JSONDecodeError as e:
logger.error(f"JSON parse error in accessible video analysis: {e}")
# Try self-healing for this response
result = await self._self_heal_accessible_video_response(response_text)
# Validate required fields
required_fields = ["method", "method_rationale", "dialogue_density", "placements"]
for field in required_fields:
if field not in result:
raise ValueError(f"Missing required field in accessible video analysis: {field}")
# Validate method value
if result["method"] not in ["overlay", "pause_insert"]:
raise ValueError(f"Invalid method value: {result['method']}")
# Validate placements
if len(result["placements"]) != len(ad_cue_durations):
logger.warning(
f"Placement count mismatch: got {len(result['placements'])}, "
f"expected {len(ad_cue_durations)}"
)
logger.info(
f"Accessible video analysis complete: method={result['method']}, "
f"dialogue_density={result['dialogue_density']:.2f}, "
f"placements={len(result['placements'])}"
)
return result
except Exception as e:
logger.error(f"Accessible video analysis failed: {type(e).__name__}: {str(e)}")
raise
finally:
# Cleanup uploaded file
if uploaded_file:
try:
await asyncio.to_thread(client.files.delete, name=uploaded_file.name)
logger.info(f"Cleaned up uploaded file: {uploaded_file.name}")
except Exception as e:
logger.warning(f"Failed to cleanup uploaded file {uploaded_file.name}: {e}")
async def _self_heal_accessible_video_response(self, invalid_response: str) -> dict[str, Any]:
"""Attempt to self-heal invalid JSON response from accessible video analysis"""
logger.info("Attempting to self-heal accessible video analysis response")
self_heal_prompt = f"""
SYSTEM: You are a JSON repair service. Fix the malformed JSON below and return ONLY the corrected JSON.
CRITICAL REQUIREMENTS:
- The JSON MUST contain: method, method_rationale, dialogue_density, placements, total_added_duration, warnings
- method must be either "overlay" or "pause_insert"
- dialogue_density must be a number between 0 and 1
- placements must be an array of placement objects
- Fix any JSON syntax errors (trailing commas, unterminated strings, etc.)
Fix the JSON and return it:
{invalid_response}
"""
try:
response = await asyncio.to_thread(
client.models.generate_content,
model=self.model_name,
contents=[genai.types.Part.from_text(text=self_heal_prompt)]
)
response_text = response.text.strip()
if response_text.startswith("```json"):
response_text = response_text.replace("```json", "").replace("```", "").strip()
result = json.loads(response_text)
logger.info("Successfully self-healed accessible video analysis response")
return result
except Exception as e:
logger.error(f"Self-heal attempt for accessible video analysis failed: {e}")
raise ValueError("Failed to get valid JSON from accessible video analysis after self-heal")
async def transcreate_content(
self,
captions_vtt: str,

View file

@ -1,4 +1,5 @@
import io
from dataclasses import dataclass
from typing import Optional
import aiohttp
@ -12,6 +13,17 @@ from .gemini_tts import gemini_tts_service
logger = get_logger(__name__)
@dataclass
class TTSCueSegment:
"""Represents a synthesized audio segment for a single AD cue."""
cue_index: int
start_time: float # Original VTT start time
end_time: float # Original VTT end time
duration: float # Actual TTS audio duration in seconds
text: str # The AD text that was synthesized
audio_bytes: bytes # The raw MP3 audio bytes
class TTSService:
def __init__(self):
# Check Gemini TTS availability (uses same API key as other Gemini services)
@ -96,6 +108,109 @@ class TTSService:
raise ValueError("No TTS service available")
async def synthesize_audio_description_with_segments(
self,
ad_vtt_content: str,
language_code: str = "en-US",
voice_name: Optional[str] = None,
provider: Optional[str] = None,
model: str = "flash",
speed: float = 1.0,
style_prompt: str = ""
) -> tuple[bytes, list[TTSCueSegment]]:
"""
Generate MP3 audio from audio description VTT content AND return individual segments.
Used for accessible video generation where we need per-cue audio files.
Returns:
Tuple of (combined_mp3_bytes, list_of_cue_segments)
"""
# Determine which provider to use
active_provider = provider or settings.tts_provider
# Extract simple language code for Gemini (e.g., "en-US" -> "en")
simple_lang = language_code.split("-")[0] if "-" in language_code else language_code
# Parse VTT cues first
cues = self._parse_ad_cues(ad_vtt_content)
if not cues:
raise ValueError("No audio description cues found")
# Synthesize each cue individually
segments: list[TTSCueSegment] = []
audio_segments_for_combine = []
current_audio_position = 0.0
for i, cue in enumerate(cues):
target_start_time = cue["start_time"]
# Add silence to reach the exact VTT start time
if target_start_time > current_audio_position:
silence_duration = target_start_time - current_audio_position
silence = AudioSegment.silent(duration=int(silence_duration * 1000))
audio_segments_for_combine.append(silence)
current_audio_position = target_start_time
text = cue["text"].strip()
if text:
# Ensure proper punctuation for natural TTS flow
if not text.endswith(('.', '!', '?')):
text += "."
# Synthesize with the appropriate provider
try:
if active_provider == "gemini" and self.gemini_available:
audio_data = await gemini_tts_service.synthesize_text(
text, voice_name or gemini_tts_service.default_voice,
simple_lang, model=model, speed=speed, style_prompt=style_prompt
)
elif self.google_client:
audio_data = await self._synthesize_text_google(text, language_code, voice_name)
elif self.elevenlabs_available:
voice_id = self._get_elevenlabs_voice(language_code, voice_name)
audio_data = await self._synthesize_text_elevenlabs(text, voice_id)
else:
raise ValueError("No TTS service available")
# Get actual duration from audio
audio_segment = AudioSegment.from_file(io.BytesIO(audio_data), format="mp3")
actual_duration = len(audio_segment) / 1000.0
# Store segment info
segments.append(TTSCueSegment(
cue_index=i,
start_time=cue["start_time"],
end_time=cue["end_time"],
duration=actual_duration,
text=cue["text"],
audio_bytes=audio_data
))
# Add to combined audio
audio_segments_for_combine.append(audio_segment)
current_audio_position += actual_duration
except Exception as e:
logger.warning(f"Failed to synthesize cue {i}: {e}")
# Add silence for failed cue
cue_duration = cue["end_time"] - cue["start_time"]
silence = AudioSegment.silent(duration=int(cue_duration * 1000))
audio_segments_for_combine.append(silence)
current_audio_position += cue_duration
# Combine all segments
if audio_segments_for_combine:
final_audio = sum(audio_segments_for_combine, AudioSegment.empty())
else:
final_audio = AudioSegment.silent(duration=1000)
# Export combined to MP3
output_buffer = io.BytesIO()
final_audio.export(output_buffer, format="mp3", bitrate="128k")
logger.info(f"Synthesized {len(segments)} AD cue segments")
return output_buffer.getvalue(), segments
async def _synthesize_with_google(
self,
ad_vtt_content: str,

View file

@ -0,0 +1,461 @@
"""Service for rendering accessible video with embedded audio descriptions using ffmpeg."""
import asyncio
import os
import subprocess
import tempfile
from pathlib import Path
from typing import Any
from ..core.config import settings
from ..core.logging import get_logger
from ..schemas.accessible_video import AccessibleVideoMethod, GeminiAccessibleVideoAnalysis
logger = get_logger(__name__)
class VideoRendererService:
"""Service for rendering accessible video with embedded audio descriptions."""
def __init__(self):
self.ffmpeg_path = "ffmpeg"
self.ffprobe_path = "ffprobe"
# Audio ducking settings
self.duck_level = getattr(settings, 'accessible_video_duck_level', 0.3)
self.duck_fade_ms = getattr(settings, 'accessible_video_duck_fade_ms', 200)
async def render_accessible_video(
self,
source_video_path: str,
ad_segments: list[tuple[int, str]], # [(cue_index, mp3_path), ...]
analysis: dict[str, Any],
output_path: str,
) -> str:
"""
Render accessible video based on Gemini analysis.
Args:
source_video_path: Path to source MP4
ad_segments: List of (cue_index, mp3_path) tuples for each AD segment
analysis: Gemini analysis dict with method and placements
output_path: Where to save the output MP4
Returns:
Path to rendered accessible video
"""
method = analysis.get("method", "pause_insert")
if method == "overlay":
return await self._render_overlay_method(
source_video_path, ad_segments, analysis, output_path
)
else:
return await self._render_pause_insert_method(
source_video_path, ad_segments, analysis, output_path
)
async def _render_overlay_method(
self,
source_video_path: str,
ad_segments: list[tuple[int, str]],
analysis: dict[str, Any],
output_path: str,
) -> str:
"""
Render with overlay method:
1. Create AD audio track with segments at target times
2. Apply ducking to original audio during AD playback
3. Mix tracks together
4. Mux with original video (copy video stream)
"""
logger.info(f"Starting overlay render for {source_video_path}")
placements = analysis.get("placements", [])
with tempfile.TemporaryDirectory() as temp_dir:
temp_dir_path = Path(temp_dir)
# Get source video duration
duration = await self._get_video_duration(source_video_path)
logger.info(f"Source video duration: {duration}s")
# Build ducking filter for original audio
duck_filters = []
for placement in placements:
duck_start = placement.get("duck_start")
duck_end = placement.get("duck_end")
if duck_start is not None and duck_end is not None:
# Volume filter: reduce to duck_level during AD, with fade
duck_filters.append(
f"volume=enable='between(t,{duck_start},{duck_end})':"
f"volume={self.duck_level}"
)
# Build ffmpeg command
inputs = ["-i", source_video_path]
filter_parts = []
# Add each AD segment as input
for cue_index, mp3_path in ad_segments:
inputs.extend(["-i", mp3_path])
# Build complex filter
# First, apply ducking to original audio
if duck_filters:
ducked_filter = ",".join(duck_filters)
filter_parts.append(f"[0:a]{ducked_filter}[ducked]")
base_audio = "[ducked]"
else:
base_audio = "[0:a]"
# Add delay to each AD segment and mix
ad_labels = []
for i, (cue_index, mp3_path) in enumerate(ad_segments):
# Find the placement for this cue
placement = next(
(p for p in placements if p.get("ad_cue_index") == cue_index),
None
)
if placement:
target_time = placement.get("target_start_time", 0)
delay_ms = int(target_time * 1000)
input_idx = i + 1 # 0 is source video
ad_label = f"ad{i}"
filter_parts.append(
f"[{input_idx}:a]adelay={delay_ms}|{delay_ms}[{ad_label}]"
)
ad_labels.append(f"[{ad_label}]")
# Mix all audio streams together
if ad_labels:
all_audio = base_audio + "".join(ad_labels)
num_inputs = 1 + len(ad_labels)
filter_parts.append(
f"{all_audio}amix=inputs={num_inputs}:duration=first:dropout_transition=0[mixed]"
)
audio_output = "[mixed]"
else:
audio_output = base_audio.replace("[", "").replace("]", "")
filter_complex = ";".join(filter_parts)
# Build final command
cmd = [
self.ffmpeg_path,
"-y", # Overwrite output
*inputs,
]
if filter_complex:
cmd.extend(["-filter_complex", filter_complex])
cmd.extend([
"-map", "0:v",
"-map", audio_output,
"-c:v", "copy", # Copy video stream (no re-encoding)
"-c:a", "aac",
"-b:a", "192k",
output_path
])
else:
cmd.extend([
"-c:v", "copy",
"-c:a", "copy",
output_path
])
logger.info(f"Running ffmpeg overlay command...")
await self._run_ffmpeg(cmd)
logger.info(f"Overlay render complete: {output_path}")
return output_path
async def _render_pause_insert_method(
self,
source_video_path: str,
ad_segments: list[tuple[int, str]],
analysis: dict[str, Any],
output_path: str,
) -> str:
"""
Render with pause-insert method:
1. Split video at each pause point
2. Extract freeze frame at each pause point
3. Create freeze-frame segment with AD audio
4. Concatenate all segments
"""
logger.info(f"Starting pause-insert render for {source_video_path}")
placements = analysis.get("placements", [])
# Sort placements by pause_point time
sorted_placements = sorted(
[p for p in placements if p.get("pause_point") is not None],
key=lambda p: p["pause_point"]
)
if not sorted_placements:
logger.warning("No pause points found, copying source video")
await self._copy_video(source_video_path, output_path)
return output_path
with tempfile.TemporaryDirectory() as temp_dir:
temp_dir_path = Path(temp_dir)
# Get video properties for re-encoding freeze frames
video_props = await self._get_video_properties(source_video_path)
logger.info(f"Video properties: {video_props}")
segment_files = []
current_time = 0.0
# Create a mapping of cue_index to mp3_path
cue_to_mp3 = {cue_index: mp3_path for cue_index, mp3_path in ad_segments}
for i, placement in enumerate(sorted_placements):
pause_point = placement["pause_point"]
cue_index = placement["ad_cue_index"]
ad_duration = placement["ad_duration"]
# Get the AD audio for this cue
ad_mp3_path = cue_to_mp3.get(cue_index)
if not ad_mp3_path:
logger.warning(f"No AD audio found for cue {cue_index}, skipping")
continue
# 1. Extract video segment from current_time to pause_point
if pause_point > current_time:
segment_path = temp_dir_path / f"segment_{i}_video.mp4"
await self._extract_segment(
source_video_path,
current_time,
pause_point - current_time,
str(segment_path)
)
segment_files.append(str(segment_path))
# 2. Extract freeze frame at pause point
freeze_frame_path = temp_dir_path / f"freeze_{i}.png"
await self._extract_frame(
source_video_path,
pause_point,
str(freeze_frame_path)
)
# 3. Create freeze segment with AD audio
freeze_segment_path = temp_dir_path / f"freeze_segment_{i}.mp4"
await self._create_freeze_segment(
str(freeze_frame_path),
ad_mp3_path,
ad_duration,
str(freeze_segment_path),
video_props
)
segment_files.append(str(freeze_segment_path))
current_time = pause_point
# 4. Add final segment from last pause point to end
source_duration = await self._get_video_duration(source_video_path)
if current_time < source_duration:
final_segment_path = temp_dir_path / "segment_final.mp4"
await self._extract_segment(
source_video_path,
current_time,
source_duration - current_time,
str(final_segment_path)
)
segment_files.append(str(final_segment_path))
# 5. Concatenate all segments
if segment_files:
await self._concatenate_segments(segment_files, output_path, temp_dir_path)
else:
await self._copy_video(source_video_path, output_path)
logger.info(f"Pause-insert render complete: {output_path}")
return output_path
async def _get_video_duration(self, video_path: str) -> float:
"""Get video duration in seconds using ffprobe."""
cmd = [
self.ffprobe_path,
"-v", "quiet",
"-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1",
video_path
]
result = await asyncio.to_thread(
subprocess.run,
cmd,
capture_output=True,
text=True,
check=True
)
return float(result.stdout.strip())
async def _get_video_properties(self, video_path: str) -> dict[str, Any]:
"""Get video properties (resolution, framerate, codec) using ffprobe."""
cmd = [
self.ffprobe_path,
"-v", "quiet",
"-select_streams", "v:0",
"-show_entries", "stream=width,height,r_frame_rate,codec_name",
"-of", "json",
video_path
]
result = await asyncio.to_thread(
subprocess.run,
cmd,
capture_output=True,
text=True,
check=True
)
import json
data = json.loads(result.stdout)
stream = data.get("streams", [{}])[0]
# Parse frame rate (e.g., "30000/1001" or "30/1")
fps_str = stream.get("r_frame_rate", "30/1")
if "/" in fps_str:
num, den = fps_str.split("/")
fps = float(num) / float(den)
else:
fps = float(fps_str)
return {
"width": stream.get("width", 1920),
"height": stream.get("height", 1080),
"fps": fps,
"codec": stream.get("codec_name", "h264")
}
async def _extract_segment(
self,
source_path: str,
start_time: float,
duration: float,
output_path: str
):
"""Extract a video segment using ffmpeg."""
cmd = [
self.ffmpeg_path,
"-y",
"-ss", str(start_time),
"-i", source_path,
"-t", str(duration),
"-c", "copy",
"-avoid_negative_ts", "make_zero",
output_path
]
await self._run_ffmpeg(cmd)
async def _extract_frame(self, video_path: str, time_point: float, output_path: str):
"""Extract a single frame as PNG using ffmpeg."""
cmd = [
self.ffmpeg_path,
"-y",
"-ss", str(time_point),
"-i", video_path,
"-frames:v", "1",
"-q:v", "2",
output_path
]
await self._run_ffmpeg(cmd)
async def _create_freeze_segment(
self,
frame_path: str,
audio_path: str,
duration: float,
output_path: str,
video_props: dict[str, Any]
):
"""Create a freeze-frame video segment with audio overlay."""
width = video_props.get("width", 1920)
height = video_props.get("height", 1080)
fps = video_props.get("fps", 30)
cmd = [
self.ffmpeg_path,
"-y",
"-loop", "1",
"-i", frame_path,
"-i", audio_path,
"-c:v", "libx264",
"-preset", "fast",
"-tune", "stillimage",
"-c:a", "aac",
"-b:a", "192k",
"-pix_fmt", "yuv420p",
"-vf", f"scale={width}:{height}:force_original_aspect_ratio=decrease,pad={width}:{height}:(ow-iw)/2:(oh-ih)/2",
"-r", str(fps),
"-t", str(duration),
"-shortest",
output_path
]
await self._run_ffmpeg(cmd)
async def _concatenate_segments(
self,
segment_paths: list[str],
output_path: str,
temp_dir: Path
):
"""Concatenate video segments using ffmpeg concat demuxer."""
# Create concat file
concat_file = temp_dir / "concat.txt"
with open(concat_file, "w") as f:
for path in segment_paths:
# Escape single quotes in path
escaped_path = path.replace("'", "'\\''")
f.write(f"file '{escaped_path}'\n")
cmd = [
self.ffmpeg_path,
"-y",
"-f", "concat",
"-safe", "0",
"-i", str(concat_file),
"-c", "copy",
output_path
]
await self._run_ffmpeg(cmd)
async def _copy_video(self, source_path: str, output_path: str):
"""Copy video without modification."""
cmd = [
self.ffmpeg_path,
"-y",
"-i", source_path,
"-c", "copy",
output_path
]
await self._run_ffmpeg(cmd)
async def _run_ffmpeg(self, cmd: list[str], timeout: int = 3600):
"""Run ffmpeg command with proper error handling."""
logger.debug(f"Running command: {' '.join(cmd)}")
try:
result = await asyncio.wait_for(
asyncio.to_thread(
subprocess.run,
cmd,
capture_output=True,
text=True
),
timeout=timeout
)
if result.returncode != 0:
logger.error(f"ffmpeg error: {result.stderr}")
raise RuntimeError(f"ffmpeg failed with code {result.returncode}: {result.stderr}")
return result
except asyncio.TimeoutError:
logger.error(f"ffmpeg command timed out after {timeout}s")
raise RuntimeError(f"ffmpeg command timed out after {timeout}s")
# Global service instance
video_renderer_service = VideoRendererService()

View file

@ -0,0 +1,208 @@
"""Service for re-timing VTT files when pauses are inserted into video."""
from typing import Any
from ..core.logging import get_logger
logger = get_logger(__name__)
class VTTRetimerService:
"""Service for re-timing VTT subtitle files after pause insertions."""
def retime_for_pause_insert(
self,
original_vtt: str,
analysis: dict[str, Any]
) -> str:
"""
Generate new VTT with adjusted timings for pause-insert accessible video.
For each pause insertion, all subsequent cues shift forward by the pause duration.
Args:
original_vtt: Original VTT content
analysis: Gemini analysis with placements containing pause_point and ad_duration
Returns:
Re-timed VTT content
"""
placements = analysis.get("placements", [])
# Build list of (pause_point, pause_duration) sorted by time
pauses = []
for placement in placements:
pause_point = placement.get("pause_point")
ad_duration = placement.get("ad_duration", 0)
if pause_point is not None and ad_duration > 0:
pauses.append((pause_point, ad_duration))
pauses.sort(key=lambda x: x[0])
if not pauses:
logger.info("No pauses to apply, returning original VTT")
return original_vtt
logger.info(f"Re-timing VTT with {len(pauses)} pause insertions")
# Parse and retime cues
cues = self._parse_vtt(original_vtt)
retimed_cues = []
for cue in cues:
# Calculate cumulative offset from all pauses that occur before this cue's start
cumulative_offset = sum(
duration for pause_point, duration in pauses
if pause_point <= cue["start_time"]
)
retimed_cues.append({
"start_time": cue["start_time"] + cumulative_offset,
"end_time": cue["end_time"] + cumulative_offset,
"text": cue["text"]
})
return self._build_vtt(retimed_cues)
def retime_ad_vtt_for_pause_insert(
self,
original_ad_vtt: str,
analysis: dict[str, Any]
) -> str:
"""
Re-time the audio description VTT for pause-insert accessible video.
For AD cues, we use the target_start_time from the analysis
since they are placed at specific points during pauses.
Args:
original_ad_vtt: Original AD VTT content
analysis: Gemini analysis with placements
Returns:
Re-timed AD VTT content for accessible video
"""
placements = analysis.get("placements", [])
# Parse original AD VTT
cues = self._parse_vtt(original_ad_vtt)
if len(cues) != len(placements):
logger.warning(
f"AD cue count ({len(cues)}) doesn't match placements ({len(placements)})"
)
retimed_cues = []
for placement in placements:
cue_index = placement.get("ad_cue_index", 0)
target_start = placement.get("target_start_time", 0)
ad_duration = placement.get("ad_duration", 0)
# Get original text from matching cue
if cue_index < len(cues):
text = cues[cue_index]["text"]
else:
text = f"[Audio description cue {cue_index}]"
retimed_cues.append({
"start_time": target_start,
"end_time": target_start + ad_duration,
"text": text
})
return self._build_vtt(retimed_cues)
def _parse_vtt(self, vtt_content: str) -> list[dict]:
"""Parse VTT content into a list of cue dictionaries."""
lines = vtt_content.strip().split('\n')
cues = []
i = 0
while i < len(lines):
line = lines[i].strip()
# Skip header and empty lines
if line == "WEBVTT" or line == "" or line.startswith("NOTE"):
i += 1
continue
# Check for timing line
if " --> " in line:
timing_parts = line.split(" --> ")
start_time = self._parse_timestamp(timing_parts[0].strip())
# Handle potential settings after end time
end_part = timing_parts[1].strip()
if " " in end_part:
end_part = end_part.split(" ")[0]
end_time = self._parse_timestamp(end_part)
# Get text from next line(s)
i += 1
text_lines = []
while i < len(lines) and lines[i].strip() != "":
text_lines.append(lines[i].strip())
i += 1
if text_lines:
cues.append({
"start_time": start_time,
"end_time": end_time,
"text": "\n".join(text_lines)
})
else:
i += 1
return cues
def _parse_timestamp(self, timestamp: str) -> float:
"""Convert VTT timestamp to seconds."""
# Format: HH:MM:SS.mmm or MM:SS.mmm
parts = timestamp.split(":")
if len(parts) == 3: # HH:MM:SS.mmm
hours, minutes, seconds = parts
elif len(parts) == 2: # MM:SS.mmm
hours, minutes, seconds = "0", parts[0], parts[1]
else:
raise ValueError(f"Invalid timestamp format: {timestamp}")
# Parse seconds and milliseconds
sec_parts = seconds.split(".")
seconds_int = int(sec_parts[0])
milliseconds = int(sec_parts[1]) if len(sec_parts) > 1 else 0
total_seconds = (
int(hours) * 3600 +
int(minutes) * 60 +
seconds_int +
milliseconds / 1000.0
)
return total_seconds
def _format_timestamp(self, seconds: float) -> str:
"""Convert seconds to VTT timestamp format (HH:MM:SS.mmm)."""
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
secs = seconds % 60
whole_secs = int(secs)
millis = int((secs - whole_secs) * 1000)
return f"{hours:02d}:{minutes:02d}:{whole_secs:02d}.{millis:03d}"
def _build_vtt(self, cues: list[dict]) -> str:
"""Build VTT content from list of cue dictionaries."""
lines = ["WEBVTT", ""]
for cue in cues:
start_ts = self._format_timestamp(cue["start_time"])
end_ts = self._format_timestamp(cue["end_time"])
lines.append(f"{start_ts} --> {end_ts}")
lines.append(cue["text"])
lines.append("")
return "\n".join(lines)
# Global service instance
vtt_retimer_service = VTTRetimerService()

View file

@ -27,6 +27,7 @@ celery_app.conf.update(
task_routes={
"app.tasks.ingest_and_ai.*": {"queue": "ingest"},
"app.tasks.translate_and_synthesize.*": {"queue": "default"},
"app.tasks.render_accessible_video.*": {"queue": "render"},
"app.tasks.notify.*": {"queue": "notify"},
},
task_default_queue="default",
@ -117,6 +118,7 @@ def import_task_modules():
try:
from . import ingest_and_ai # noqa: E402, F401
from . import translate_and_synthesize # noqa: E402, F401
from . import render_accessible_video # noqa: E402, F401
from . import notify # noqa: E402, F401
logger.info("Successfully imported all task modules")
except Exception as e:

View file

@ -0,0 +1,315 @@
"""Celery task for rendering accessible video with embedded audio descriptions."""
import asyncio
import os
import tempfile
from datetime import datetime
from typing import Any
from motor.motor_asyncio import AsyncIOMotorClient
from ..core.config import settings
from ..core.logging import get_logger
from ..models.job import JobStatus
from ..services.gcs import gcs_service
from ..services.gemini import gemini_service
from ..services.video_renderer import video_renderer_service
from ..services.vtt_retimer import vtt_retimer_service
from . import celery_app
from .translate_and_synthesize import broadcast_status_update, retry_with_backoff
logger = get_logger(__name__)
@celery_app.task(bind=True, time_limit=7200, soft_time_limit=7000) # 2 hour limit for video rendering
def render_accessible_video_task(self, job_id: str, language: str):
"""
Pipeline 3: Accessible Video Rendering
Triggered after TTS generation completes for a language when accessible_video_mp4 is requested.
Steps:
1. Download source video and per-cue AD MP3s from GCS
2. Get AD VTT content and calculate cue durations
3. Call Gemini for placement analysis
4. Render accessible video (overlay or pause-insert)
5. If pause-insert: generate re-timed caption VTT
6. Upload outputs to GCS
7. Update job document
"""
logger.info(f"Starting accessible video render for job {job_id}, language {language}")
try:
result = asyncio.run(_async_render_accessible_video(job_id, language))
logger.info(f"Accessible video render completed for job {job_id}, language {language}")
return result
except Exception as e:
logger.error(f"Accessible video render failed for job {job_id}/{language}: {e}")
import traceback
logger.error(f"Full traceback: {traceback.format_exc()}")
raise
async def _async_render_accessible_video(job_id: str, language: str):
"""Async implementation of accessible video rendering."""
logger.info(f"Async render started for job {job_id}, language {language}")
client = AsyncIOMotorClient(settings.mongodb_uri)
db = client[settings.mongodb_db]
try:
# Get job details
job_doc = await db.jobs.find_one({"_id": job_id})
if not job_doc:
raise ValueError(f"Job {job_id} not found")
job_title = job_doc.get("title", "Untitled Job")
# Verify accessible video is requested
if not job_doc["requested_outputs"].get("accessible_video_mp4"):
logger.info(f"Accessible video not requested for job {job_id}")
return
# Update progress to rendering
await db.jobs.update_one(
{"_id": job_id},
{
"$set": {
f"accessible_video_progress.{language}": {
"status": "rendering",
"started_at": datetime.utcnow()
},
"updated_at": datetime.utcnow()
}
}
)
# Broadcast status update
broadcast_status_update(
job_id,
"rendering",
job_title=job_title,
message=f"Rendering accessible video for {language.upper()}"
)
with tempfile.TemporaryDirectory() as temp_dir:
# 1. Download source video from GCS
source_video_gcs = job_doc["source"]["gcs_uri"]
source_blob_path = source_video_gcs.replace(f"gs://{settings.gcs_bucket}/", "")
source_video_path = os.path.join(temp_dir, "source.mp4")
logger.info(f"Downloading source video from {source_blob_path}")
source_blob = gcs_service.bucket.blob(source_blob_path)
source_blob.download_to_filename(source_video_path)
# 2. Get language outputs
lang_output = job_doc["outputs"].get(language)
if not lang_output:
raise ValueError(f"No outputs found for language {language}")
# 3. Download AD VTT content
ad_vtt_gcs = lang_output.get("ad_vtt_gcs")
if not ad_vtt_gcs:
raise ValueError(f"No AD VTT found for language {language}")
ad_blob_path = ad_vtt_gcs.replace(f"gs://{settings.gcs_bucket}/", "")
ad_blob = gcs_service.bucket.blob(ad_blob_path)
ad_vtt_content = ad_blob.download_as_text()
# 4. Download per-cue AD MP3 segments
ad_cues_prefix = lang_output.get("ad_cues_gcs_prefix")
if not ad_cues_prefix:
raise ValueError(f"No AD cue segments found for language {language}")
# List and download all cue segments
ad_segments = []
cue_durations = []
prefix_path = ad_cues_prefix.replace(f"gs://{settings.gcs_bucket}/", "")
blobs = list(gcs_service.bucket.list_blobs(prefix=prefix_path))
# Sort by cue index
cue_blobs = [(b, int(b.name.split("_")[-1].replace(".mp3", ""))) for b in blobs if b.name.endswith(".mp3")]
cue_blobs.sort(key=lambda x: x[1])
for blob, cue_index in cue_blobs:
local_path = os.path.join(temp_dir, f"cue_{cue_index}.mp3")
blob.download_to_filename(local_path)
ad_segments.append((cue_index, local_path))
# Get duration from audio file
from pydub import AudioSegment
audio = AudioSegment.from_mp3(local_path)
duration = len(audio) / 1000.0 # Convert ms to seconds
cue_durations.append(duration)
logger.info(f"Downloaded {len(ad_segments)} AD cue segments")
# 5. Call Gemini for placement analysis
logger.info("Analyzing video for AD placement with Gemini...")
async def analyze():
return await gemini_service.analyze_accessible_video_placement(
source_video_path,
ad_vtt_content,
cue_durations
)
analysis = await retry_with_backoff(analyze, max_retries=2)
method = analysis.get("method", "pause_insert")
logger.info(f"Gemini analysis complete: method={method}")
# 6. Render accessible video
output_video_path = os.path.join(temp_dir, "accessible_video.mp4")
logger.info(f"Rendering accessible video using {method} method...")
await video_renderer_service.render_accessible_video(
source_video_path,
ad_segments,
analysis,
output_video_path
)
# 7. Upload rendered video to GCS
video_blob_path = f"{job_id}/{language}/accessible_video.mp4"
video_blob = gcs_service.bucket.blob(video_blob_path)
video_blob.content_type = "video/mp4"
video_blob.upload_from_filename(output_video_path)
video_gcs_uri = f"gs://{settings.gcs_bucket}/{video_blob_path}"
logger.info(f"Uploaded accessible video to {video_gcs_uri}")
# 8. If pause-insert, generate re-timed captions VTT
retimed_captions_gcs_uri = None
if method == "pause_insert":
# Download original captions VTT
captions_vtt_gcs = lang_output.get("captions_vtt_gcs")
if captions_vtt_gcs:
captions_blob_path = captions_vtt_gcs.replace(f"gs://{settings.gcs_bucket}/", "")
captions_blob = gcs_service.bucket.blob(captions_blob_path)
original_captions_vtt = captions_blob.download_as_text()
# Re-time captions
retimed_captions = vtt_retimer_service.retime_for_pause_insert(
original_captions_vtt,
analysis
)
# Upload re-timed captions
retimed_blob_path = f"{job_id}/{language}/accessible_captions.vtt"
retimed_blob = gcs_service.bucket.blob(retimed_blob_path)
retimed_blob.content_type = "text/vtt"
retimed_blob.upload_from_string(retimed_captions, content_type="text/vtt")
retimed_captions_gcs_uri = f"gs://{settings.gcs_bucket}/{retimed_blob_path}"
logger.info(f"Uploaded re-timed captions to {retimed_captions_gcs_uri}")
# 9. Update job document with results
update_fields = {
f"outputs.{language}.accessible_video_gcs": video_gcs_uri,
f"outputs.{language}.accessible_video_method": method,
f"accessible_video_progress.{language}": {
"status": "completed",
"method": method,
"started_at": job_doc.get("accessible_video_progress", {}).get(language, {}).get("started_at"),
"completed_at": datetime.utcnow()
},
"updated_at": datetime.utcnow()
}
if retimed_captions_gcs_uri:
update_fields[f"outputs.{language}.retimed_captions_vtt_gcs"] = retimed_captions_gcs_uri
await db.jobs.update_one(
{"_id": job_id},
{"$set": update_fields}
)
# Broadcast completion
broadcast_status_update(
job_id,
"asset_ready",
job_title=job_title,
message=f"Accessible video ready for {language.upper()} ({method} method)"
)
# Check if all accessible videos are complete
await _check_accessible_video_completion(job_id, db)
logger.info(f"Accessible video render complete for job {job_id}/{language}")
except Exception as e:
logger.error(f"Accessible video render failed for job {job_id}/{language}: {e}")
# Update progress to failed
await db.jobs.update_one(
{"_id": job_id},
{
"$set": {
f"accessible_video_progress.{language}": {
"status": "failed",
"error_message": str(e),
"completed_at": datetime.utcnow()
},
"updated_at": datetime.utcnow()
}
}
)
raise
finally:
client.close()
async def _check_accessible_video_completion(job_id: str, db):
"""Check if all accessible videos are complete and update job status accordingly."""
job_doc = await db.jobs.find_one({"_id": job_id})
if not job_doc:
return
progress = job_doc.get("accessible_video_progress", {})
requested_languages = job_doc["requested_outputs"]["languages"]
# Check if all requested languages have completed accessible video
all_complete = True
any_failed = False
for language in requested_languages:
lang_progress = progress.get(language, {})
status = lang_progress.get("status", "pending")
if status == "failed":
any_failed = True
elif status != "completed":
all_complete = False
if all_complete:
logger.info(f"All accessible videos complete for job {job_id}")
# If job is still in TTS_GENERATING, transition to PENDING_FINAL_REVIEW
if job_doc["status"] == JobStatus.TTS_GENERATING.value:
await db.jobs.update_one(
{"_id": job_id},
{
"$set": {
"status": JobStatus.PENDING_FINAL_REVIEW.value,
"updated_at": datetime.utcnow()
},
"$push": {
"review.history": {
"at": datetime.utcnow(),
"status": JobStatus.PENDING_FINAL_REVIEW.value,
"by": "system"
}
}
}
)
job_title = job_doc.get("title", "Untitled Job")
broadcast_status_update(
job_id,
JobStatus.PENDING_FINAL_REVIEW.value,
job_title=job_title,
message=f"{job_title} has all accessible videos complete - ready for Final Review"
)

View file

@ -268,36 +268,50 @@ async def _async_translate_and_synthesize(job_id: str):
)
# Generate TTS for languages that need MP3
accessible_video_requested = job_doc["requested_outputs"].get("accessible_video_mp4", False)
if job_doc["requested_outputs"]["audio_description_mp3"]:
# Get TTS preferences from job
tts_preferences = job_doc["requested_outputs"].get("tts_preferences", {})
await _generate_tts_for_languages(job_id, updated_outputs, db, source_language, tts_preferences)
await _generate_tts_for_languages(
job_id, updated_outputs, db, source_language, tts_preferences, accessible_video_requested
)
# Update final status
await db.jobs.update_one(
{"_id": job_id},
{
"$set": {
"status": JobStatus.PENDING_FINAL_REVIEW.value,
"updated_at": datetime.utcnow()
},
"$push": {
"review.history": {
"at": datetime.utcnow(),
# If accessible video is requested, the render task will handle the transition
# to PENDING_FINAL_REVIEW when all videos are complete
if not accessible_video_requested:
await db.jobs.update_one(
{"_id": job_id},
{
"$set": {
"status": JobStatus.PENDING_FINAL_REVIEW.value,
"by": "system"
"updated_at": datetime.utcnow()
},
"$push": {
"review.history": {
"at": datetime.utcnow(),
"status": JobStatus.PENDING_FINAL_REVIEW.value,
"by": "system"
}
}
}
}
)
# Broadcast status update
broadcast_status_update(
job_id,
JobStatus.PENDING_FINAL_REVIEW.value,
job_title=job_title,
message=f"{job_title} has finished translation and audio generation - ready for Final Review"
)
)
# Broadcast status update
broadcast_status_update(
job_id,
JobStatus.PENDING_FINAL_REVIEW.value,
job_title=job_title,
message=f"{job_title} has finished translation and audio generation - ready for Final Review"
)
else:
# When accessible video is requested, stay in TTS_GENERATING
# The render_accessible_video task will transition to PENDING_FINAL_REVIEW
logger.info(
f"Accessible video rendering triggered for job {job_id}. "
f"Staying in TTS_GENERATING until all videos are complete."
)
logger.info(f"Successfully completed translation and synthesis for job {job_id}")
@ -330,7 +344,8 @@ async def _generate_tts_for_languages(
outputs: dict[str, Any],
db,
source_language: str = "en",
tts_preferences: dict = None
tts_preferences: dict = None,
accessible_video_requested: bool = False
):
"""Generate TTS audio for each language's audio description"""
if tts_preferences is None:
@ -338,15 +353,19 @@ async def _generate_tts_for_languages(
# Always generate source language MP3 first
if source_language in outputs and "ad_vtt_gcs" in outputs[source_language]:
await _generate_language_tts(job_id, source_language, outputs[source_language], db, tts_preferences)
await _generate_language_tts(
job_id, source_language, outputs[source_language], db, tts_preferences, accessible_video_requested
)
# Generate for other languages
for language, lang_output in outputs.items():
if language != source_language and "ad_vtt_gcs" in lang_output:
await _generate_language_tts(job_id, language, lang_output, db, tts_preferences)
await _generate_language_tts(
job_id, language, lang_output, db, tts_preferences, accessible_video_requested
)
async def _generate_language_tts(job_id: str, language: str, lang_output: dict, db, tts_preferences: dict = None):
async def _generate_language_tts(job_id: str, language: str, lang_output: dict, db, tts_preferences: dict = None, accessible_video_requested: bool = False):
"""Generate TTS for a specific language"""
if tts_preferences is None:
tts_preferences = {}
@ -379,23 +398,52 @@ async def _generate_language_tts(job_id: str, language: str, lang_output: dict,
logger.info(
f"Generating TTS for {language} with voice={voice_name}, provider={provider}, "
f"model={model}, speed={speed}x, style={style_preset}"
f"model={model}, speed={speed}x, style={style_preset}, accessible_video={accessible_video_requested}"
)
async def synthesize():
return await tts_service.synthesize_audio_description(
ad_vtt_content,
language_code,
voice_name=voice_name,
provider=provider,
model=model,
speed=speed,
style_prompt=style_prompt
)
# Use the segments method if accessible video is requested
if accessible_video_requested:
async def synthesize_with_segments():
return await tts_service.synthesize_audio_description_with_segments(
ad_vtt_content,
language_code,
voice_name=voice_name,
provider=provider,
model=model,
speed=speed,
style_prompt=style_prompt
)
mp3_data = await retry_with_backoff(synthesize, max_retries=3)
mp3_data, segments = await retry_with_backoff(synthesize_with_segments, max_retries=3)
# Upload MP3 to GCS
# Upload individual cue segments to GCS
ad_cues_prefix = f"{job_id}/{language}/ad_cues/"
for segment in segments:
cue_blob_path = f"{ad_cues_prefix}cue_{segment.cue_index}.mp3"
cue_blob = gcs_service.bucket.blob(cue_blob_path)
cue_blob.content_type = "audio/mpeg"
cue_blob.upload_from_string(segment.audio_bytes, content_type="audio/mpeg")
logger.info(f"Uploaded {len(segments)} per-cue AD segments for {language}")
# Store the prefix path
ad_cues_gcs_prefix = f"gs://{settings.gcs_bucket}/{ad_cues_prefix}"
else:
async def synthesize():
return await tts_service.synthesize_audio_description(
ad_vtt_content,
language_code,
voice_name=voice_name,
provider=provider,
model=model,
speed=speed,
style_prompt=style_prompt
)
mp3_data = await retry_with_backoff(synthesize, max_retries=3)
ad_cues_gcs_prefix = None
# Upload combined MP3 to GCS
mp3_blob_path = f"{job_id}/{language}/ad.mp3"
mp3_blob = gcs_service.bucket.blob(mp3_blob_path)
mp3_blob.content_type = "audio/mpeg"
@ -404,14 +452,17 @@ async def _generate_language_tts(job_id: str, language: str, lang_output: dict,
mp3_gcs_uri = f"gs://{settings.gcs_bucket}/{mp3_blob_path}"
# Update job outputs
update_fields = {
f"outputs.{language}.ad_mp3_gcs": mp3_gcs_uri,
"updated_at": datetime.utcnow()
}
if ad_cues_gcs_prefix:
update_fields[f"outputs.{language}.ad_cues_gcs_prefix"] = ad_cues_gcs_prefix
await db.jobs.update_one(
{"_id": job_id},
{
"$set": {
f"outputs.{language}.ad_mp3_gcs": mp3_gcs_uri,
"updated_at": datetime.utcnow()
}
}
{"$set": update_fields}
)
logger.info(f"Successfully generated TTS for {language}")
@ -426,6 +477,25 @@ async def _generate_language_tts(job_id: str, language: str, lang_output: dict,
message=f"Audio description MP3 ready for {language.upper()}"
)
# Trigger accessible video rendering if requested
if accessible_video_requested:
from .render_accessible_video import render_accessible_video_task
# Initialize progress tracking for this language
await db.jobs.update_one(
{"_id": job_id},
{
"$set": {
f"accessible_video_progress.{language}": {
"status": "pending"
}
}
}
)
render_accessible_video_task.delay(job_id, language)
logger.info(f"Triggered accessible video rendering for job {job_id}/{language}")
except Exception as e:
logger.error(f"TTS generation failed for {language}: {e}")

View file

@ -20,6 +20,7 @@ const jobSchema = z.object({
captions_vtt: z.boolean(),
audio_description_vtt: z.boolean(),
audio_description_mp3: z.boolean(),
accessible_video_mp4: z.boolean(),
languages: z.array(z.string()),
transcreation: z.array(z.string()),
});
@ -71,6 +72,7 @@ export function NewJob() {
captions_vtt: true,
audio_description_vtt: true,
audio_description_mp3: true,
accessible_video_mp4: false,
languages: [],
transcreation: [],
}
@ -125,6 +127,7 @@ export function NewJob() {
captions_vtt: data.captions_vtt,
audio_description_vtt: data.audio_description_vtt,
audio_description_mp3: data.audio_description_mp3,
accessible_video_mp4: data.accessible_video_mp4,
languages: data.languages,
transcreation: data.transcreation,
tts_preferences: data.audio_description_mp3 ? ttsPreferences : undefined,
@ -202,6 +205,7 @@ export function NewJob() {
captions_vtt: data.captions_vtt,
audio_description_vtt: data.audio_description_vtt,
audio_description_mp3: data.audio_description_mp3,
accessible_video_mp4: data.accessible_video_mp4,
languages: data.languages,
transcreation: data.transcreation,
tts_preferences: data.audio_description_mp3 ? ttsPreferences : undefined,
@ -246,6 +250,7 @@ export function NewJob() {
captions_vtt: data.captions_vtt,
audio_description_vtt: data.audio_description_vtt,
audio_description_mp3: data.audio_description_mp3,
accessible_video_mp4: data.accessible_video_mp4,
languages: data.languages,
transcreation: data.transcreation,
tts_preferences: data.audio_description_mp3 ? ttsPreferences : undefined,
@ -583,6 +588,14 @@ export function NewJob() {
/>
<span>Audio Description Voiceover (MP3)</span>
</label>
<label className="flex items-center">
<input
type="checkbox"
{...register('accessible_video_mp4')}
className="mr-2"
/>
<span>Accessible Video (MP4 with embedded audio descriptions)</span>
</label>
</div>
</div>

View file

@ -55,6 +55,7 @@ export interface RequestedOutputs {
captions_vtt: boolean;
audio_description_vtt: boolean;
audio_description_mp3: boolean;
accessible_video_mp4: boolean; // Rendered video with embedded audio descriptions
languages: string[];
transcreation: string[];
tts_preferences?: TTSPreferences;
@ -88,10 +89,17 @@ export interface TTSOptionsResponse {
speed_range: SpeedRange;
}
export type AccessibleVideoMethod = "overlay" | "pause_insert";
export interface LangOutput {
captions_vtt_gcs?: string;
ad_vtt_gcs?: string;
ad_mp3_gcs?: string;
// Accessible video outputs
accessible_video_gcs?: string;
accessible_video_method?: AccessibleVideoMethod;
retimed_captions_vtt_gcs?: string; // Re-timed captions for pause-insert method
ad_cues_gcs_prefix?: string; // Path prefix for per-cue MP3 segments
origin?: "translate" | "transcreate";
qa_notes?: string;
}
@ -114,6 +122,16 @@ export interface AISection {
confidence?: number;
}
export type AccessibleVideoProgressStatus = "pending" | "rendering" | "completed" | "failed";
export interface AccessibleVideoProgressItem {
status: AccessibleVideoProgressStatus;
method?: AccessibleVideoMethod;
error_message?: string;
started_at?: string;
completed_at?: string;
}
export interface Job {
id: string;
client_id: string;
@ -123,6 +141,7 @@ export interface Job {
status: JobStatus;
review: Review;
outputs?: Record<string, LangOutput>;
accessible_video_progress?: Record<string, AccessibleVideoProgressItem>;
ai?: AISection;
error?: Record<string, unknown>;
created_at: string;