video-accessibility/backend/app/tasks/render_accessible_video.py
michael 80d3866d32 feat: add accessible video (MP4 with embedded audio descriptions)
Add new deliverable type that renders video with audio descriptions embedded.
Supports two AI-determined methods:
- Direct Overlay: ducks original audio and overlays AD TTS (for minimal dialogue)
- Pause-Insert: freeze-frame video, insert AD, re-time subtitles (for significant dialogue)

Backend:
- Add Pydantic schemas for Gemini analysis response
- Add Gemini prompt and analyze_accessible_video_placement() method
- Add video_renderer.py service using FFmpeg for both rendering methods
- Add vtt_retimer.py service for pause-insert subtitle adjustment
- Add render_accessible_video.py Celery task
- Modify TTS service to return individual per-cue segments
- Update translate_and_synthesize.py to save segments and trigger rendering
- Update download endpoint to include accessible video outputs

Frontend:
- Add accessible_video_mp4 checkbox to NewJob form
- Update TypeScript types for new deliverable

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-26 11:06:41 -06:00

315 lines
12 KiB
Python

"""Celery task for rendering accessible video with embedded audio descriptions."""
import asyncio
import os
import tempfile
from datetime import datetime
from typing import Any
from motor.motor_asyncio import AsyncIOMotorClient
from ..core.config import settings
from ..core.logging import get_logger
from ..models.job import JobStatus
from ..services.gcs import gcs_service
from ..services.gemini import gemini_service
from ..services.video_renderer import video_renderer_service
from ..services.vtt_retimer import vtt_retimer_service
from . import celery_app
from .translate_and_synthesize import broadcast_status_update, retry_with_backoff
logger = get_logger(__name__)
@celery_app.task(bind=True, time_limit=7200, soft_time_limit=7000) # 2 hour limit for video rendering
def render_accessible_video_task(self, job_id: str, language: str):
"""
Pipeline 3: Accessible Video Rendering
Triggered after TTS generation completes for a language when accessible_video_mp4 is requested.
Steps:
1. Download source video and per-cue AD MP3s from GCS
2. Get AD VTT content and calculate cue durations
3. Call Gemini for placement analysis
4. Render accessible video (overlay or pause-insert)
5. If pause-insert: generate re-timed caption VTT
6. Upload outputs to GCS
7. Update job document
"""
logger.info(f"Starting accessible video render for job {job_id}, language {language}")
try:
result = asyncio.run(_async_render_accessible_video(job_id, language))
logger.info(f"Accessible video render completed for job {job_id}, language {language}")
return result
except Exception as e:
logger.error(f"Accessible video render failed for job {job_id}/{language}: {e}")
import traceback
logger.error(f"Full traceback: {traceback.format_exc()}")
raise
async def _async_render_accessible_video(job_id: str, language: str):
"""Async implementation of accessible video rendering."""
logger.info(f"Async render started for job {job_id}, language {language}")
client = AsyncIOMotorClient(settings.mongodb_uri)
db = client[settings.mongodb_db]
try:
# Get job details
job_doc = await db.jobs.find_one({"_id": job_id})
if not job_doc:
raise ValueError(f"Job {job_id} not found")
job_title = job_doc.get("title", "Untitled Job")
# Verify accessible video is requested
if not job_doc["requested_outputs"].get("accessible_video_mp4"):
logger.info(f"Accessible video not requested for job {job_id}")
return
# Update progress to rendering
await db.jobs.update_one(
{"_id": job_id},
{
"$set": {
f"accessible_video_progress.{language}": {
"status": "rendering",
"started_at": datetime.utcnow()
},
"updated_at": datetime.utcnow()
}
}
)
# Broadcast status update
broadcast_status_update(
job_id,
"rendering",
job_title=job_title,
message=f"Rendering accessible video for {language.upper()}"
)
with tempfile.TemporaryDirectory() as temp_dir:
# 1. Download source video from GCS
source_video_gcs = job_doc["source"]["gcs_uri"]
source_blob_path = source_video_gcs.replace(f"gs://{settings.gcs_bucket}/", "")
source_video_path = os.path.join(temp_dir, "source.mp4")
logger.info(f"Downloading source video from {source_blob_path}")
source_blob = gcs_service.bucket.blob(source_blob_path)
source_blob.download_to_filename(source_video_path)
# 2. Get language outputs
lang_output = job_doc["outputs"].get(language)
if not lang_output:
raise ValueError(f"No outputs found for language {language}")
# 3. Download AD VTT content
ad_vtt_gcs = lang_output.get("ad_vtt_gcs")
if not ad_vtt_gcs:
raise ValueError(f"No AD VTT found for language {language}")
ad_blob_path = ad_vtt_gcs.replace(f"gs://{settings.gcs_bucket}/", "")
ad_blob = gcs_service.bucket.blob(ad_blob_path)
ad_vtt_content = ad_blob.download_as_text()
# 4. Download per-cue AD MP3 segments
ad_cues_prefix = lang_output.get("ad_cues_gcs_prefix")
if not ad_cues_prefix:
raise ValueError(f"No AD cue segments found for language {language}")
# List and download all cue segments
ad_segments = []
cue_durations = []
prefix_path = ad_cues_prefix.replace(f"gs://{settings.gcs_bucket}/", "")
blobs = list(gcs_service.bucket.list_blobs(prefix=prefix_path))
# Sort by cue index
cue_blobs = [(b, int(b.name.split("_")[-1].replace(".mp3", ""))) for b in blobs if b.name.endswith(".mp3")]
cue_blobs.sort(key=lambda x: x[1])
for blob, cue_index in cue_blobs:
local_path = os.path.join(temp_dir, f"cue_{cue_index}.mp3")
blob.download_to_filename(local_path)
ad_segments.append((cue_index, local_path))
# Get duration from audio file
from pydub import AudioSegment
audio = AudioSegment.from_mp3(local_path)
duration = len(audio) / 1000.0 # Convert ms to seconds
cue_durations.append(duration)
logger.info(f"Downloaded {len(ad_segments)} AD cue segments")
# 5. Call Gemini for placement analysis
logger.info("Analyzing video for AD placement with Gemini...")
async def analyze():
return await gemini_service.analyze_accessible_video_placement(
source_video_path,
ad_vtt_content,
cue_durations
)
analysis = await retry_with_backoff(analyze, max_retries=2)
method = analysis.get("method", "pause_insert")
logger.info(f"Gemini analysis complete: method={method}")
# 6. Render accessible video
output_video_path = os.path.join(temp_dir, "accessible_video.mp4")
logger.info(f"Rendering accessible video using {method} method...")
await video_renderer_service.render_accessible_video(
source_video_path,
ad_segments,
analysis,
output_video_path
)
# 7. Upload rendered video to GCS
video_blob_path = f"{job_id}/{language}/accessible_video.mp4"
video_blob = gcs_service.bucket.blob(video_blob_path)
video_blob.content_type = "video/mp4"
video_blob.upload_from_filename(output_video_path)
video_gcs_uri = f"gs://{settings.gcs_bucket}/{video_blob_path}"
logger.info(f"Uploaded accessible video to {video_gcs_uri}")
# 8. If pause-insert, generate re-timed captions VTT
retimed_captions_gcs_uri = None
if method == "pause_insert":
# Download original captions VTT
captions_vtt_gcs = lang_output.get("captions_vtt_gcs")
if captions_vtt_gcs:
captions_blob_path = captions_vtt_gcs.replace(f"gs://{settings.gcs_bucket}/", "")
captions_blob = gcs_service.bucket.blob(captions_blob_path)
original_captions_vtt = captions_blob.download_as_text()
# Re-time captions
retimed_captions = vtt_retimer_service.retime_for_pause_insert(
original_captions_vtt,
analysis
)
# Upload re-timed captions
retimed_blob_path = f"{job_id}/{language}/accessible_captions.vtt"
retimed_blob = gcs_service.bucket.blob(retimed_blob_path)
retimed_blob.content_type = "text/vtt"
retimed_blob.upload_from_string(retimed_captions, content_type="text/vtt")
retimed_captions_gcs_uri = f"gs://{settings.gcs_bucket}/{retimed_blob_path}"
logger.info(f"Uploaded re-timed captions to {retimed_captions_gcs_uri}")
# 9. Update job document with results
update_fields = {
f"outputs.{language}.accessible_video_gcs": video_gcs_uri,
f"outputs.{language}.accessible_video_method": method,
f"accessible_video_progress.{language}": {
"status": "completed",
"method": method,
"started_at": job_doc.get("accessible_video_progress", {}).get(language, {}).get("started_at"),
"completed_at": datetime.utcnow()
},
"updated_at": datetime.utcnow()
}
if retimed_captions_gcs_uri:
update_fields[f"outputs.{language}.retimed_captions_vtt_gcs"] = retimed_captions_gcs_uri
await db.jobs.update_one(
{"_id": job_id},
{"$set": update_fields}
)
# Broadcast completion
broadcast_status_update(
job_id,
"asset_ready",
job_title=job_title,
message=f"Accessible video ready for {language.upper()} ({method} method)"
)
# Check if all accessible videos are complete
await _check_accessible_video_completion(job_id, db)
logger.info(f"Accessible video render complete for job {job_id}/{language}")
except Exception as e:
logger.error(f"Accessible video render failed for job {job_id}/{language}: {e}")
# Update progress to failed
await db.jobs.update_one(
{"_id": job_id},
{
"$set": {
f"accessible_video_progress.{language}": {
"status": "failed",
"error_message": str(e),
"completed_at": datetime.utcnow()
},
"updated_at": datetime.utcnow()
}
}
)
raise
finally:
client.close()
async def _check_accessible_video_completion(job_id: str, db):
"""Check if all accessible videos are complete and update job status accordingly."""
job_doc = await db.jobs.find_one({"_id": job_id})
if not job_doc:
return
progress = job_doc.get("accessible_video_progress", {})
requested_languages = job_doc["requested_outputs"]["languages"]
# Check if all requested languages have completed accessible video
all_complete = True
any_failed = False
for language in requested_languages:
lang_progress = progress.get(language, {})
status = lang_progress.get("status", "pending")
if status == "failed":
any_failed = True
elif status != "completed":
all_complete = False
if all_complete:
logger.info(f"All accessible videos complete for job {job_id}")
# If job is still in TTS_GENERATING, transition to PENDING_FINAL_REVIEW
if job_doc["status"] == JobStatus.TTS_GENERATING.value:
await db.jobs.update_one(
{"_id": job_id},
{
"$set": {
"status": JobStatus.PENDING_FINAL_REVIEW.value,
"updated_at": datetime.utcnow()
},
"$push": {
"review.history": {
"at": datetime.utcnow(),
"status": JobStatus.PENDING_FINAL_REVIEW.value,
"by": "system"
}
}
}
)
job_title = job_doc.get("title", "Untitled Job")
broadcast_status_update(
job_id,
JobStatus.PENDING_FINAL_REVIEW.value,
job_title=job_title,
message=f"{job_title} has all accessible videos complete - ready for Final Review"
)