"""Celery task for rendering accessible video with embedded audio descriptions.""" import asyncio import os import subprocess import tempfile from datetime import datetime from motor.motor_asyncio import AsyncIOMotorClient from ..core.config import settings from ..core.logging import get_logger from ..lib.vtt import VTTParser from ..models.job import AccessibleVideoEditState, JobStatus, PausePointData, VideoSegmentMetadata from ..schemas.whisper import CachedWhisperTranscript, CachedWordTimestamp from ..services.gcs import gcs_service from ..services.video_renderer import video_renderer_service from ..services.vtt_retimer import vtt_retimer_service from ..services.whisper_service import WordTimestamp, whisper_service from . import celery_app from .translate_and_synthesize import broadcast_status_update from .tts_synthesis import parse_cue_index_from_blob_name from .whisper_transcribe import transcribe_video_audio_task logger = get_logger(__name__) @celery_app.task(bind=True, time_limit=7200, soft_time_limit=7000) # 2 hour limit for video rendering def render_accessible_video_task(self, job_id: str, language: str): """ Pipeline 3: Accessible Video Rendering Triggered after TTS generation completes for a language when accessible_video_mp4 is requested. Steps: 1. Download source video and per-cue AD MP3s from GCS 2. Get AD VTT content and calculate cue durations 3. Call Gemini for placement analysis 4. Render accessible video (overlay or pause-insert) 5. If pause-insert: generate re-timed caption VTT 6. Upload outputs to GCS 7. Update job document """ logger.info(f"Starting accessible video render for job {job_id}, language {language}") try: result = asyncio.run(_async_render_accessible_video(job_id, language)) logger.info(f"Accessible video render completed for job {job_id}, language {language}") return result except Exception as e: logger.error(f"Accessible video render failed for job {job_id}/{language}: {e}") import traceback logger.error(f"Full traceback: {traceback.format_exc()}") raise async def _async_render_accessible_video(job_id: str, language: str): """Async implementation of accessible video rendering.""" logger.info(f"Async render started for job {job_id}, language {language}") client = AsyncIOMotorClient(settings.mongodb_uri) db = client[settings.mongodb_db] try: # Get job details job_doc = await db.jobs.find_one({"_id": job_id}) if not job_doc: raise ValueError(f"Job {job_id} not found") job_title = job_doc.get("title", "Untitled Job") # Verify accessible video is requested if not job_doc["requested_outputs"].get("accessible_video_mp4"): logger.info(f"Accessible video not requested for job {job_id}") return # Update progress to rendering and transition job status if needed update_fields = { f"accessible_video_progress.{language}": { "status": "rendering", "started_at": datetime.utcnow() }, "updated_at": datetime.utcnow() } # Transition job status to RENDERING_VIDEO if currently in TTS_GENERATING if job_doc["status"] == JobStatus.TTS_GENERATING.value: update_fields["status"] = JobStatus.RENDERING_VIDEO.value await db.jobs.update_one( {"_id": job_id}, {"$set": update_fields} ) # If we transitioned to RENDERING_VIDEO, also add to history if job_doc["status"] == JobStatus.TTS_GENERATING.value: await db.jobs.update_one( {"_id": job_id}, { "$push": { "review.history": { "at": datetime.utcnow(), "status": JobStatus.RENDERING_VIDEO.value, "by": "system" } } } ) # Broadcast status update broadcast_status_update( job_id, JobStatus.RENDERING_VIDEO.value if job_doc["status"] == JobStatus.TTS_GENERATING.value else "rendering", job_title=job_title, message=f"Rendering accessible video for {language.upper()}" ) # Use TMPDIR env var if set (for shared volume between workers) temp_base = os.environ.get('TMPDIR', None) with tempfile.TemporaryDirectory(dir=temp_base) as temp_dir: # 1. Download source video from GCS source_video_gcs = job_doc["source"]["gcs_uri"] source_blob_path = source_video_gcs.replace(f"gs://{settings.gcs_bucket}/", "") source_video_path = os.path.join(temp_dir, "source.mp4") logger.info(f"Downloading source video from {source_blob_path}") source_blob = gcs_service.bucket.blob(source_blob_path) source_blob.download_to_filename(source_video_path) # 2. Get language outputs lang_output = job_doc["outputs"].get(language) if not lang_output: raise ValueError(f"No outputs found for language {language}") # 3. Download AD VTT content ad_vtt_gcs = lang_output.get("ad_vtt_gcs") if not ad_vtt_gcs: raise ValueError(f"No AD VTT found for language {language}") ad_blob_path = ad_vtt_gcs.replace(f"gs://{settings.gcs_bucket}/", "") ad_blob = gcs_service.bucket.blob(ad_blob_path) ad_vtt_content = ad_blob.download_as_text() # 4. Download per-cue AD MP3 segments ad_cues_prefix = lang_output.get("ad_cues_gcs_prefix") if not ad_cues_prefix: raise ValueError(f"No AD cue segments found for language {language}") # Download per-cue MP3s — prefer manifest (stable across cue edits), fall back to blob listing ad_segments = [] cue_durations = [] from pydub import AudioSegment ad_cue_manifest = lang_output.get("ad_cue_manifest") if ad_cue_manifest: logger.info(f"Using ad_cue_manifest ({len(ad_cue_manifest)} entries) for MP3 download") for entry in sorted(ad_cue_manifest, key=lambda e: e["cue_index"]): cue_index = entry["cue_index"] gcs_uri = entry["gcs_uri"] blob_path = gcs_uri.replace(f"gs://{settings.gcs_bucket}/", "") local_path = os.path.join(temp_dir, f"cue_{cue_index}.mp3") gcs_service.bucket.blob(blob_path).download_to_filename(local_path) ad_segments.append((cue_index, local_path)) audio = AudioSegment.from_mp3(local_path) cue_durations.append(len(audio) / 1000.0) else: logger.info("No ad_cue_manifest found, falling back to legacy index-based blob listing") prefix_path = ad_cues_prefix.replace(f"gs://{settings.gcs_bucket}/", "") blobs = list(gcs_service.bucket.list_blobs(prefix=prefix_path)) cue_blobs = [ (b, parse_cue_index_from_blob_name(b.name)) for b in blobs if b.name.endswith(".mp3") ] cue_blobs = [(b, idx) for b, idx in cue_blobs if idx is not None] cue_blobs.sort(key=lambda x: x[1]) for blob, cue_index in cue_blobs: local_path = os.path.join(temp_dir, f"cue_{cue_index}.mp3") blob.download_to_filename(local_path) ad_segments.append((cue_index, local_path)) audio = AudioSegment.from_mp3(local_path) cue_durations.append(len(audio) / 1000.0) logger.info(f"Downloaded {len(ad_segments)} AD cue segments") # 5. Get method from job's requested_outputs (user selected at QC approval) method = job_doc["requested_outputs"].get("accessible_video_method") or "pause_insert" logger.info(f"Using user-selected accessible video method: {method}") # 5a. Build placements from AD VTT cues (instead of Gemini video analysis) placements = _build_placements_from_ad_vtt(ad_vtt_content, cue_durations) logger.info(f"Built {len(placements)} placements from AD VTT cues") # Build analysis dict compatible with existing code analysis = { "method": method, "method_rationale": "User-selected at QC Review approval", "placements": placements, "total_added_duration": sum(cue_durations) if method == "pause_insert" else 0, "warnings": [] } # 5b. If pause-insert method, refine pause points using Whisper if method == "pause_insert": logger.info("Refining pause points with Whisper speech analysis...") analysis, whisper_warnings = await _refine_pause_points_with_whisper( job_id, source_video_path, analysis, db, temp_dir ) if whisper_warnings: # Add warnings to analysis for visibility existing_warnings = analysis.get("warnings", []) analysis["warnings"] = existing_warnings + whisper_warnings logger.info(f"Whisper refinement complete with {len(whisper_warnings)} warnings") # 6. Render accessible video with segment persistence for QC editing output_video_path = os.path.join(temp_dir, "accessible_video.mp4") gcs_segment_prefix = f"{job_id}/{language}/segments/" logger.info(f"Rendering accessible video using {method} method with segment persistence...") rendered_path, updated_placements, segment_metadata, pause_points = await video_renderer_service.render_accessible_video( source_video_path, ad_segments, analysis, output_video_path, persist_segments=True, gcs_segment_prefix=gcs_segment_prefix ) # Update analysis with actual freeze durations for VTT retiming if updated_placements: analysis["placements"] = updated_placements logger.info(f"Updated {len(updated_placements)} placements with actual freeze durations") # Build edit state for QC review if segment metadata was returned edit_state = None if segment_metadata and pause_points: edit_state = AccessibleVideoEditState( pause_points=pause_points, video_segments=segment_metadata, tts_regeneration_queue=[], last_render_at=datetime.utcnow(), whisper_refine_enabled=False ) logger.info(f"Built edit state with {len(segment_metadata)} segments and {len(pause_points)} pause points") # 7. Upload rendered video to GCS video_blob_path = f"{job_id}/{language}/accessible_video.mp4" video_blob = gcs_service.bucket.blob(video_blob_path) video_blob.content_type = "video/mp4" video_blob.upload_from_filename(output_video_path) video_gcs_uri = f"gs://{settings.gcs_bucket}/{video_blob_path}" logger.info(f"Uploaded accessible video to {video_gcs_uri}") # 8. If pause-insert, generate re-timed captions VTT retimed_captions_gcs_uri = None if method == "pause_insert": # Download original captions VTT captions_vtt_gcs = lang_output.get("captions_vtt_gcs") if captions_vtt_gcs: captions_blob_path = captions_vtt_gcs.replace(f"gs://{settings.gcs_bucket}/", "") captions_blob = gcs_service.bucket.blob(captions_blob_path) original_captions_vtt = captions_blob.download_as_text() # Re-time captions retimed_captions = vtt_retimer_service.retime_for_pause_insert( original_captions_vtt, analysis ) # Upload re-timed captions retimed_blob_path = f"{job_id}/{language}/accessible_captions.vtt" retimed_blob = gcs_service.bucket.blob(retimed_blob_path) retimed_blob.content_type = "text/vtt" retimed_blob.upload_from_string(retimed_captions, content_type="text/vtt") retimed_captions_gcs_uri = f"gs://{settings.gcs_bucket}/{retimed_blob_path}" logger.info(f"Uploaded re-timed captions to {retimed_captions_gcs_uri}") # 9. Update job document with results (including edit state for QC review) update_fields = { f"outputs.{language}.accessible_video_gcs": video_gcs_uri, f"outputs.{language}.accessible_video_method": method, f"outputs.{language}.video_segments_gcs_prefix": f"gs://{settings.gcs_bucket}/{gcs_segment_prefix}", f"accessible_video_progress.{language}": { "status": "completed", "method": method, "started_at": job_doc.get("accessible_video_progress", {}).get(language, {}).get("started_at"), "completed_at": datetime.utcnow() }, "updated_at": datetime.utcnow() } if retimed_captions_gcs_uri: update_fields[f"outputs.{language}.retimed_captions_vtt_gcs"] = retimed_captions_gcs_uri # Store edit state for QC review accessible video editing if edit_state: update_fields[f"outputs.{language}.accessible_video_edit_state"] = edit_state.model_dump() await db.jobs.update_one( {"_id": job_id}, {"$set": update_fields} ) # Broadcast completion broadcast_status_update( job_id, "asset_ready", job_title=job_title, message=f"Accessible video ready for {language.upper()} ({method} method)" ) # Check if all accessible videos are complete await _check_accessible_video_completion(job_id, db) logger.info(f"Accessible video render complete for job {job_id}/{language}") except Exception as e: logger.error(f"Accessible video render failed for job {job_id}/{language}: {e}") # Update progress to failed await db.jobs.update_one( {"_id": job_id}, { "$set": { f"accessible_video_progress.{language}": { "status": "failed", "error_message": str(e), "completed_at": datetime.utcnow() }, "updated_at": datetime.utcnow() } } ) # Check if all videos are now finished (completed or failed) to update job status # This ensures the job transitions to RENDER_FAILED if all languages have finished await _check_accessible_video_completion(job_id, db) raise finally: client.close() def _build_placements_from_ad_vtt(ad_vtt_content: str, cue_durations: list[float]) -> list[dict]: """ Build placement instructions from AD VTT cues and TTS durations. Uses AD VTT start_time as initial pause_point for each cue. This replaces the Gemini analyze_accessible_video_placement call. Args: ad_vtt_content: The AD VTT content string cue_durations: List of TTS audio durations in seconds (same order as VTT cues) Returns: List of placement dicts compatible with Whisper refinement """ cues = VTTParser.parse(ad_vtt_content) if len(cues) != len(cue_durations): logger.warning( f"Cue count mismatch: {len(cues)} VTT cues, {len(cue_durations)} durations. " f"Using minimum count." ) placements = [] for i, cue in enumerate(cues): if i >= len(cue_durations): break placements.append({ "ad_cue_index": i, "original_start_time": cue.start_time, "original_end_time": cue.end_time, "target_start_time": cue.start_time, "ad_duration": cue_durations[i], "pause_point": cue.start_time, # Use VTT start_time as initial pause point "resume_from": cue.start_time, # Will be refined by Whisper "pause_point_rationale": "Derived from AD VTT cue start time" }) return placements async def _check_accessible_video_completion(job_id: str, db): """Check if all accessible videos are complete and update job status accordingly.""" job_doc = await db.jobs.find_one({"_id": job_id}) if not job_doc: return progress = job_doc.get("accessible_video_progress", {}) requested_languages = job_doc["requested_outputs"]["languages"] # Check status of all requested languages all_finished = True # All languages have either completed or failed any_failed = False failed_languages = [] for language in requested_languages: lang_progress = progress.get(language, {}) status = lang_progress.get("status", "pending") if status == "failed": any_failed = True failed_languages.append({ "language": language, "error": lang_progress.get("error_message", "Unknown error") }) elif status not in ["completed", "failed"]: # Still pending or rendering all_finished = False job_title = job_doc.get("title", "Untitled Job") # Only update job status if all languages have finished processing if all_finished: if any_failed: # Some or all videos failed - transition to RENDER_FAILED logger.error(f"Accessible video rendering failed for job {job_id}: {len(failed_languages)} language(s) failed") # Build error summary error_summary = "; ".join([f"{f['language']}: {f['error']}" for f in failed_languages]) if job_doc["status"] in [JobStatus.TTS_GENERATING.value, JobStatus.RENDERING_VIDEO.value]: await db.jobs.update_one( {"_id": job_id}, { "$set": { "status": JobStatus.RENDER_FAILED.value, "error": { "type": "render_failure", "failed_languages": failed_languages, "message": f"Video rendering failed for {len(failed_languages)} language(s): {error_summary}", "timestamp": datetime.utcnow().isoformat() }, "updated_at": datetime.utcnow() }, "$push": { "review.history": { "at": datetime.utcnow(), "status": JobStatus.RENDER_FAILED.value, "by": "system", "notes": f"Rendering failed for: {', '.join([f['language'] for f in failed_languages])}" } } } ) broadcast_status_update( job_id, JobStatus.RENDER_FAILED.value, job_title=job_title, message=f"{job_title} - Video rendering failed for {len(failed_languages)} language(s). Manual reprocessing required." ) else: # All videos completed successfully # NEW WORKFLOW: Go to PENDING_QC for QC review (not PENDING_FINAL_REVIEW) logger.info(f"All accessible videos complete for job {job_id}") if job_doc["status"] in [JobStatus.TTS_GENERATING.value, JobStatus.RENDERING_VIDEO.value]: await db.jobs.update_one( {"_id": job_id}, { "$set": { "status": JobStatus.PENDING_QC.value, "updated_at": datetime.utcnow() }, "$push": { "review.history": { "at": datetime.utcnow(), "status": JobStatus.PENDING_QC.value, "by": "system" } } } ) broadcast_status_update( job_id, JobStatus.PENDING_QC.value, job_title=job_title, message=f"{job_title} has all accessible videos complete - ready for QC Review" ) async def _refine_pause_points_with_whisper( job_id: str, video_path: str, analysis: dict, db, temp_dir: str ) -> tuple[dict, list[str]]: """ Refine Gemini pause points using Whisper speech gap detection. This function: 1. Extracts audio and runs Whisper transcription via dedicated queue (always fresh, no caching) 2. Saves transcript to job document for debugging/auditing 3. Identifies speech gaps from word timestamps 4. Snaps all pause points to nearest valid gaps AFTER the original point Args: job_id: Job ID for caching video_path: Path to source video analysis: Gemini analysis dict with placements db: MongoDB database handle temp_dir: Temporary directory for extracted audio Returns: Tuple of (refined_analysis, warnings) """ logger.info(f"Refining pause points with Whisper for job {job_id}") # Always generate a fresh Whisper transcript (no caching) # This ensures we get accurate word timestamps for the current video file audio_path = os.path.join(temp_dir, "source_audio.mp3") await _extract_audio_for_whisper(video_path, audio_path) # Dispatch Whisper transcription to dedicated whisper queue # Uses same pattern as FFmpeg: dispatch -> poll -> allow_join_result -> get logger.info(f"Dispatching Whisper transcription to dedicated queue for job {job_id}") try: words = await _dispatch_whisper_transcription(job_id, audio_path) except Exception as e: logger.error(f"Whisper transcription failed for job {job_id}: {e}") return analysis, [f"Whisper transcription failed: {str(e)} - using original Gemini timestamps"] if words: # Save transcript to job document (for debugging/auditing, not caching) transcript_data = CachedWhisperTranscript( words=[CachedWordTimestamp(word=w.word, start=w.start, end=w.end) for w in words], model_name=settings.whisper_model, audio_duration=words[-1].end if words else 0, created_at=datetime.utcnow().isoformat() ) await db.jobs.update_one( {"_id": job_id}, {"$set": {"whisper_transcript": transcript_data.model_dump()}} ) logger.info(f"Saved Whisper transcript with {len(words)} words for job {job_id}") if not words: logger.warning(f"No speech detected in video for job {job_id}, using original Gemini timestamps") return analysis, ["No speech detected in video - using original Gemini timestamps"] # Identify speech gaps gaps = whisper_service.identify_speech_gaps(words) logger.info(f"Found {len(gaps)} speech gaps in video for job {job_id}") # Refine pause points (Phase 1: individual refinement, Phase 2: consolidation) refined_placements, warnings = whisper_service.refine_all_pause_points( analysis.get("placements", []), words, gaps ) # Update analysis with refined placements refined_analysis = analysis.copy() refined_analysis["placements"] = refined_placements refined_analysis["whisper_refined"] = True logger.info(f"Pause point refinement complete for job {job_id}: {len(warnings)} warnings") return refined_analysis, warnings async def _extract_audio_for_whisper(video_path: str, audio_path: str): """ Extract audio track from video for Whisper transcription. Uses FFmpeg to extract audio at 16kHz mono MP3 (optimal for Whisper). """ cmd = [ "ffmpeg", "-y", "-i", video_path, "-vn", # No video "-acodec", "libmp3lame", "-ar", "16000", # 16kHz is optimal for Whisper "-ac", "1", # Mono "-q:a", "5", # Medium quality audio_path ] logger.info(f"Extracting audio for Whisper: {video_path} -> {audio_path}") # Run FFmpeg synchronously in a thread pool def run_ffmpeg(): process = subprocess.run( cmd, capture_output=True, text=True ) if process.returncode != 0: raise RuntimeError(f"FFmpeg audio extraction failed: {process.stderr}") await asyncio.to_thread(run_ffmpeg) logger.info(f"Audio extraction complete: {audio_path}") async def _dispatch_whisper_transcription(job_id: str, audio_path: str) -> list[WordTimestamp]: """ Dispatch Whisper transcription to dedicated whisper queue and wait for result. Uses the same pattern as FFmpeg dispatch: 1. apply_async() to dispatch to the whisper queue 2. Poll with ready() using async sleep to avoid blocking 3. Use allow_join_result() context manager 4. Get result only after task is ready Args: job_id: Job ID for logging audio_path: Path to extracted audio file Returns: List of WordTimestamp objects from transcription Raises: Exception: If transcription fails """ from celery.result import allow_join_result # Dispatch to whisper queue task_result = transcribe_video_audio_task.apply_async( args=[job_id, audio_path], queue='whisper' ) logger.info(f"Whisper task dispatched for job {job_id}, waiting for completion...") # Poll for result with async sleep to avoid blocking # Use longer sleep interval since Whisper takes a while poll_count = 0 while not task_result.ready(): await asyncio.sleep(1.0) poll_count += 1 if poll_count % 30 == 0: # Log every 30 seconds logger.info(f"Still waiting for Whisper transcription for job {job_id}...") # Get result - use allow_join_result since we're calling from within a task # This is safe because we've already confirmed the task is complete via ready() with allow_join_result(): result = task_result.get(timeout=30) # Check for task failure if task_result.failed(): raise Exception(f"Whisper task failed: {task_result.result}") # Convert to WordTimestamp objects words = [ WordTimestamp(word=w["word"], start=w["start"], end=w["end"]) for w in result.get("words", []) ] logger.info( f"Whisper transcription complete for job {job_id}: " f"{len(words)} words detected" ) return words