"""Service for rendering accessible video with embedded audio descriptions using ffmpeg. FFmpeg operations are dispatched to a dedicated Celery queue (ffmpeg) with concurrency=1 to prevent server overload when multiple render tasks run in parallel. When FFMPEG_SERVICE_URL is configured, operations are offloaded to Cloud Run for autoscaling. """ import asyncio import json import os import tempfile from pathlib import Path from typing import Any from uuid import uuid4 import google.auth.transport.requests import httpx from google.auth import default from google.cloud import storage from google.oauth2 import id_token from ..core.config import settings from ..core.logging import get_logger from ..models.job import PausePointData, VideoSegmentMetadata from ..schemas.accessible_video import AccessibleVideoMethod, GeminiAccessibleVideoAnalysis logger = get_logger(__name__) def _get_cloud_run_id_token(audience: str) -> str: """ Get an ID token for authenticating to Cloud Run services. Uses the service account credentials to generate an ID token that Cloud Run will accept for authentication. """ credentials, _ = default() request = google.auth.transport.requests.Request() token = id_token.fetch_id_token(request, audience) return token class FFmpegExecutionError(Exception): """Raised when an FFmpeg/FFprobe command fails.""" pass class VideoRendererService: """Service for rendering accessible video with embedded audio descriptions.""" def __init__(self): self.ffmpeg_path = "ffmpeg" self.ffprobe_path = "ffprobe" # Audio ducking settings self.duck_level = getattr(settings, 'accessible_video_duck_level', 0.3) self.duck_fade_ms = getattr(settings, 'accessible_video_duck_fade_ms', 200) # Cloud Run support self._gcs_client: storage.Client | None = None # Source video caching for Cloud Run (uploaded once, reused across operations) self._cached_source_gcs_uri: str | None = None @property def _use_cloud_run(self) -> bool: """Check if Cloud Run FFmpeg service is configured.""" return bool(settings.ffmpeg_service_url) def _get_gcs_client(self) -> storage.Client: """Get or create GCS client for file transfers.""" if self._gcs_client is None: self._gcs_client = storage.Client() return self._gcs_client def _upload_to_gcs_temp(self, local_path: str, prefix: str) -> str: """ Upload local file to GCS temp location and return gs:// URI. Args: local_path: Path to local file prefix: Prefix for temp path (e.g., "source", "frame", "audio") Returns: GCS URI (gs://bucket/temp/ffmpeg/{prefix}/{uuid}/{filename}) """ client = self._get_gcs_client() bucket = client.bucket(settings.gcs_bucket) filename = os.path.basename(local_path) gcs_path = f"temp/ffmpeg/{prefix}/{uuid4().hex}/{filename}" blob = bucket.blob(gcs_path) blob.upload_from_filename(local_path) gcs_uri = f"gs://{settings.gcs_bucket}/{gcs_path}" logger.debug(f"Uploaded {local_path} to {gcs_uri}") return gcs_uri def _download_from_gcs_temp(self, gcs_uri: str, local_path: str): """ Download file from GCS to local path. Args: gcs_uri: GCS URI (gs://bucket/path) local_path: Local destination path """ client = self._get_gcs_client() # Parse gs:// URI if not gcs_uri.startswith("gs://"): raise ValueError(f"Invalid GCS URI: {gcs_uri}") path_parts = gcs_uri[5:].split("/", 1) bucket_name = path_parts[0] blob_path = path_parts[1] if len(path_parts) > 1 else "" bucket = client.bucket(bucket_name) blob = bucket.blob(blob_path) # Ensure parent directory exists os.makedirs(os.path.dirname(local_path), exist_ok=True) blob.download_to_filename(local_path) logger.debug(f"Downloaded {gcs_uri} to {local_path}") def _delete_gcs_temp(self, gcs_uri: str): """ Delete temporary GCS file. Args: gcs_uri: GCS URI to delete (gs://bucket/path) """ if not gcs_uri: return try: client = self._get_gcs_client() # Parse gs:// URI if not gcs_uri.startswith("gs://"): logger.warning(f"Invalid GCS URI for deletion: {gcs_uri}") return path_parts = gcs_uri[5:].split("/", 1) bucket_name = path_parts[0] blob_path = path_parts[1] if len(path_parts) > 1 else "" bucket = client.bucket(bucket_name) blob = bucket.blob(blob_path) blob.delete() logger.debug(f"Deleted {gcs_uri}") except Exception as e: # Log but don't fail on cleanup errors logger.warning(f"Failed to delete GCS temp file {gcs_uri}: {e}") def _upload_to_gcs_permanent(self, local_path: str, gcs_path: str) -> str: """ Upload local file to permanent GCS location (not temp). Args: local_path: Path to local file gcs_path: Full GCS path within the bucket (e.g., "job_id/en/segments/seg_0.mp4") Returns: GCS URI (gs://bucket/gcs_path) """ client = self._get_gcs_client() bucket = client.bucket(settings.gcs_bucket) blob = bucket.blob(gcs_path) blob.upload_from_filename(local_path) gcs_uri = f"gs://{settings.gcs_bucket}/{gcs_path}" logger.debug(f"Uploaded {local_path} to {gcs_uri} (permanent)") return gcs_uri async def _call_cloud_run_probe(self, gcs_uri: str) -> dict[str, Any]: """ Call Cloud Run FFmpeg service /probe endpoint. Args: gcs_uri: GCS URI of video to probe (gs://bucket/path) Returns: Probe result with duration and stream info """ service_url = settings.ffmpeg_service_url.rstrip("/") auth_token = _get_cloud_run_id_token(service_url) async with httpx.AsyncClient(timeout=600.0) as client: response = await client.post( f"{service_url}/probe", json={"gcs_uri": gcs_uri}, # Matches ProbeRequest model headers={"Authorization": f"Bearer {auth_token}"} ) response.raise_for_status() return response.json() async def _call_cloud_run_endpoint( self, endpoint: str, payload: dict[str, Any], output_gcs_uri: str | None = None ) -> dict[str, Any]: """ Call Cloud Run FFmpeg service endpoint. Args: endpoint: Endpoint path (e.g., "/encode-segment", "/extract-frame") payload: Request payload output_gcs_uri: Expected output GCS URI (for cleanup on error) Returns: Response JSON """ service_url = settings.ffmpeg_service_url.rstrip("/") auth_token = _get_cloud_run_id_token(service_url) async with httpx.AsyncClient(timeout=600.0) as client: try: response = await client.post( f"{service_url}{endpoint}", json=payload, headers={"Authorization": f"Bearer {auth_token}"} ) response.raise_for_status() return response.json() except httpx.HTTPStatusError as e: # Try to get error details from response try: error_detail = e.response.json().get("detail", str(e)) except Exception: error_detail = str(e) raise FFmpegExecutionError(f"Cloud Run {endpoint} failed: {error_detail}") async def _dispatch_ffmpeg(self, cmd: list[str], timeout: int = 3600) -> dict[str, Any]: """ Dispatch FFmpeg command to the dedicated ffmpeg queue and wait for result. This method bridges the async render task with the sync Celery task. Uses apply_async and polls for completion to avoid blocking the event loop. Args: cmd: FFmpeg command list timeout: Command timeout in seconds Returns: dict with 'success', 'stdout', 'stderr', 'returncode' Raises: FFmpegExecutionError: If the command fails """ from celery.result import allow_join_result from ..tasks.ffmpeg_operations import run_ffmpeg_command # Dispatch to ffmpeg queue task_result = run_ffmpeg_command.apply_async( args=[cmd, timeout], queue='ffmpeg' ) # Poll for result with async sleep to avoid blocking while not task_result.ready(): await asyncio.sleep(0.5) # Get result - use allow_join_result since we're calling from within a task # This is safe because we've already confirmed the task is complete via ready() with allow_join_result(): result = task_result.get(timeout=30) if not result['success']: # FFmpeg writes version/config to stderr first, actual error is at the end # Show last 1500 chars to capture the actual error message stderr = result['stderr'] if len(stderr) > 1500: stderr = f"...{stderr[-1500:]}" raise FFmpegExecutionError( f"FFmpeg failed with code {result['returncode']}: {stderr}" ) return result async def _dispatch_ffprobe(self, cmd: list[str]) -> dict[str, Any]: """ Dispatch FFprobe command to the dedicated ffmpeg queue and wait for result. Args: cmd: FFprobe command list Returns: dict with 'success', 'stdout', 'stderr', 'returncode' Raises: FFmpegExecutionError: If the command fails """ from celery.result import allow_join_result from ..tasks.ffmpeg_operations import run_ffprobe_command # Dispatch to ffmpeg queue task_result = run_ffprobe_command.apply_async( args=[cmd], queue='ffmpeg' ) # Poll for result with async sleep (shorter interval for fast probe) while not task_result.ready(): await asyncio.sleep(0.2) # Get result - use allow_join_result since we're calling from within a task # This is safe because we've already confirmed the task is complete via ready() with allow_join_result(): result = task_result.get(timeout=30) if not result['success']: raise FFmpegExecutionError( f"FFprobe failed with code {result['returncode']}: {result['stderr'][:200]}" ) return result async def render_accessible_video( self, source_video_path: str, ad_segments: list[tuple[int, str]], # [(cue_index, mp3_path), ...] analysis: dict[str, Any], output_path: str, persist_segments: bool = False, gcs_segment_prefix: str | None = None, ) -> tuple[str, list[dict] | None, list[VideoSegmentMetadata] | None, list[PausePointData] | None]: """ Render accessible video based on Gemini analysis. Args: source_video_path: Path to source MP4 ad_segments: List of (cue_index, mp3_path) tuples for each AD segment analysis: Gemini analysis dict with method and placements output_path: Where to save the output MP4 persist_segments: If True, upload video segments to GCS for QC editing gcs_segment_prefix: GCS path prefix for segments (e.g., "job_id/en/segments/") Returns: Tuple of (output_path, updated_placements, segment_metadata, pause_points) - output_path: Path to rendered accessible video - updated_placements: Placements with actual_freeze_duration added (pause-insert only) - segment_metadata: List of VideoSegmentMetadata if persist_segments=True, else None - pause_points: List of PausePointData if persist_segments=True, else None """ method = analysis.get("method", "pause_insert") # Cloud Run optimization: Upload source video once and cache for all operations if self._use_cloud_run: logger.info("Cloud Run mode: uploading source video to GCS for caching") self._cached_source_gcs_uri = self._upload_to_gcs_temp(source_video_path, "source") logger.info(f"Source video cached at: {self._cached_source_gcs_uri}") try: if method == "overlay": result_path = await self._render_overlay_method( source_video_path, ad_segments, analysis, output_path ) return (result_path, None, None, None) else: return await self._render_pause_insert_method( source_video_path, ad_segments, analysis, output_path, persist_segments=persist_segments, gcs_segment_prefix=gcs_segment_prefix ) finally: # Clean up cached source video from GCS if self._use_cloud_run and self._cached_source_gcs_uri: logger.info(f"Cleaning up cached source video: {self._cached_source_gcs_uri}") self._delete_gcs_temp(self._cached_source_gcs_uri) self._cached_source_gcs_uri = None async def _render_overlay_method( self, source_video_path: str, ad_segments: list[tuple[int, str]], analysis: dict[str, Any], output_path: str, ) -> str: """ Render with overlay method: 1. Create AD audio track with segments at target times 2. Apply ducking to original audio during AD playback 3. Mix tracks together 4. Mux with original video (copy video stream) """ logger.info(f"Starting overlay render for {source_video_path}") placements = analysis.get("placements", []) with tempfile.TemporaryDirectory() as temp_dir: temp_dir_path = Path(temp_dir) # Get source video duration duration = await self._get_video_duration(source_video_path) logger.info(f"Source video duration: {duration}s") # Build ducking filter for original audio duck_filters = [] for placement in placements: duck_start = placement.get("duck_start") duck_end = placement.get("duck_end") if duck_start is not None and duck_end is not None: # Volume filter: reduce to duck_level during AD, with fade duck_filters.append( f"volume=enable='between(t,{duck_start},{duck_end})':" f"volume={self.duck_level}" ) # Build ffmpeg command inputs = ["-i", source_video_path] filter_parts = [] # Add each AD segment as input for cue_index, mp3_path in ad_segments: inputs.extend(["-i", mp3_path]) # Build complex filter # First, apply ducking to original audio if duck_filters: ducked_filter = ",".join(duck_filters) filter_parts.append(f"[0:a]{ducked_filter}[ducked]") base_audio = "[ducked]" else: base_audio = "[0:a]" # Add delay to each AD segment and mix ad_labels = [] for i, (cue_index, mp3_path) in enumerate(ad_segments): # Find the placement for this cue placement = next( (p for p in placements if p.get("ad_cue_index") == cue_index), None ) if placement: target_time = placement.get("target_start_time", 0) delay_ms = int(target_time * 1000) input_idx = i + 1 # 0 is source video ad_label = f"ad{i}" filter_parts.append( f"[{input_idx}:a]adelay={delay_ms}|{delay_ms}[{ad_label}]" ) ad_labels.append(f"[{ad_label}]") # Mix all audio streams together if ad_labels: all_audio = base_audio + "".join(ad_labels) num_inputs = 1 + len(ad_labels) filter_parts.append( f"{all_audio}amix=inputs={num_inputs}:duration=first:dropout_transition=0:normalize=0[mixed]" ) audio_output = "[mixed]" else: audio_output = base_audio.replace("[", "").replace("]", "") filter_complex = ";".join(filter_parts) # Build final command cmd = [ self.ffmpeg_path, "-y", # Overwrite output *inputs, ] if filter_complex: cmd.extend(["-filter_complex", filter_complex]) cmd.extend([ "-map", "0:v", "-map", audio_output, "-c:v", "copy", # Copy video stream (no re-encoding) "-c:a", "aac", "-b:a", "192k", output_path ]) else: cmd.extend([ "-c:v", "copy", "-c:a", "copy", output_path ]) logger.info(f"Running ffmpeg overlay command...") await self._run_ffmpeg(cmd) logger.info(f"Overlay render complete: {output_path}") return output_path async def _render_pause_insert_method( self, source_video_path: str, ad_segments: list[tuple[int, str]], analysis: dict[str, Any], output_path: str, persist_segments: bool = False, gcs_segment_prefix: str | None = None, ) -> tuple[str, list[dict], list[VideoSegmentMetadata] | None, list[PausePointData] | None]: """ Render with pause-insert method: 1. Split video at each pause point 2. Extract freeze frame at each pause point 3. Create freeze-frame segment with AD audio 4. Concatenate all segments 5. Optionally persist segments to GCS for QC editing Args: persist_segments: If True, upload segments to GCS and return metadata gcs_segment_prefix: GCS path prefix (e.g., "job_id/en/segments/") Returns: Tuple of (output_path, updated_placements, segment_metadata, pause_points) """ logger.info(f"Starting pause-insert render for {source_video_path}") placements = analysis.get("placements", []) # Defensive: enforce pause_point monotonicity in cue_index order before temporal sort. # Whisper refinement or user adjustments can cause a later cue's pause_point to # precede an earlier cue's, which would reorder cues in the rendered timeline. cue_ordered = sorted( [p for p in placements if p.get("pause_point") is not None], key=lambda p: p.get("ad_cue_index", 0) ) for i in range(1, len(cue_ordered)): prev_pp = cue_ordered[i - 1]["pause_point"] curr_pp = cue_ordered[i]["pause_point"] if curr_pp < prev_pp: logger.warning( f"Renderer monotonicity fix: cue {cue_ordered[i].get('ad_cue_index')} " f"pause_point {curr_pp:.2f}s < cue {cue_ordered[i-1].get('ad_cue_index')} " f"pause_point {prev_pp:.2f}s, clamping to {prev_pp:.2f}s" ) cue_ordered[i]["pause_point"] = prev_pp cue_ordered[i]["resume_from"] = prev_pp # Sort by pause_point time, with ad_cue_index as secondary key # to ensure consolidated cues (sharing same pause_point) maintain VTT order sorted_placements = sorted( cue_ordered, key=lambda p: (p["pause_point"], p.get("ad_cue_index", 0)) ) # Debug logging for pause points (midpoint algorithm with silence buffers) logger.info(f"Pause-insert (midpoint + 500ms silence buffers): {len(sorted_placements)} placements") for i, p in enumerate(sorted_placements): logger.info( f" Placement {i}: cue_index={p.get('ad_cue_index')}, " f"pause_at={p.get('pause_point'):.2f}s, ad_duration={p.get('ad_duration'):.2f}s, " f"consolidated={p.get('consolidated_with_previous', False)}" ) if not sorted_placements: logger.warning("No pause points found, copying source video") await self._copy_video(source_video_path, output_path) return (output_path, [], None, None) with tempfile.TemporaryDirectory() as temp_dir: temp_dir_path = Path(temp_dir) # Get detailed video AND audio properties for uniform encoding video_props = await self._get_video_properties(source_video_path) source_duration = await self._get_video_duration(source_video_path) logger.info(f"Source Properties: {video_props}, Duration: {source_duration:.2f}s") # Create a mapping of cue_index to mp3_path cue_to_mp3 = {cue_index: mp3_path for cue_index, mp3_path in ad_segments} # Pre-process placements and validate valid_placements = [] current_time = 0.0 for i, placement in enumerate(sorted_placements): pause_point = placement["pause_point"] cue_index = placement["ad_cue_index"] ad_duration = placement["ad_duration"] # Validate pause_point is within video bounds if pause_point >= source_duration: logger.warning( f"Cue {cue_index}: pause_point {pause_point:.2f}s exceeds video duration " f"{source_duration:.2f}s, clamping to {source_duration - 0.1:.2f}s" ) pause_point = max(0, source_duration - 0.1) ad_mp3_path = cue_to_mp3.get(cue_index) if not ad_mp3_path: logger.warning(f"No AD audio found for cue {cue_index}, skipping") continue valid_placements.append({ "index": i, "pause_point": pause_point, "cue_index": cue_index, "ad_duration": ad_duration, "ad_mp3_path": ad_mp3_path, "segment_start": current_time, }) current_time = pause_point # Track final segment info final_segment_start = current_time final_segment_needed = final_segment_start < source_duration # ============================================================ # PARALLEL PHASE 1: Generate shared silence + extract all frames + all video segments # ============================================================ logger.info(f"Phase 1: Parallel extraction of {len(valid_placements)} frames and video segments") silence_duration = 0.5 # 500ms shared by all silence_path = temp_dir_path / "silence_shared.m4a" # Build tasks for phase 1 phase1_tasks = [] # Task: Generate silence (just once, shared by all) phase1_tasks.append(self._generate_silence(silence_duration, str(silence_path), video_props)) # Tasks: Extract all video segments video_segment_paths = {} for p in valid_placements: i = p["index"] if p["pause_point"] > p["segment_start"]: segment_path = temp_dir_path / f"segment_{i}_video.mp4" video_segment_paths[i] = str(segment_path) phase1_tasks.append(self._extract_segment_reencoded( source_video_path, p["segment_start"], p["pause_point"] - p["segment_start"], str(segment_path), video_props )) # Task: Extract final segment if needed final_segment_path = None if final_segment_needed: final_segment_path = temp_dir_path / "segment_final.mp4" phase1_tasks.append(self._extract_segment_reencoded( source_video_path, final_segment_start, source_duration - final_segment_start, str(final_segment_path), video_props )) # Tasks: Extract all freeze frames freeze_frame_paths = {} for p in valid_placements: i = p["index"] freeze_frame_path = temp_dir_path / f"freeze_{i}.png" freeze_frame_paths[i] = str(freeze_frame_path) phase1_tasks.append(self._extract_frame( source_video_path, p["pause_point"], str(freeze_frame_path) )) # Execute phase 1 in parallel await asyncio.gather(*phase1_tasks) logger.info(f"Phase 1 complete: extracted {len(freeze_frame_paths)} frames, {len(video_segment_paths)} video segments") # ============================================================ # PARALLEL PHASE 2: Concatenate all audio tracks # ============================================================ logger.info(f"Phase 2: Parallel audio concatenation for {len(valid_placements)} placements") phase2_tasks = [] combined_audio_paths = {} for p in valid_placements: i = p["index"] combined_audio_path = temp_dir_path / f"combined_audio_{i}.m4a" combined_audio_paths[i] = str(combined_audio_path) phase2_tasks.append(self._concatenate_audio( [str(silence_path), p["ad_mp3_path"], str(silence_path)], str(combined_audio_path), video_props )) await asyncio.gather(*phase2_tasks) logger.info(f"Phase 2 complete: concatenated {len(combined_audio_paths)} audio tracks") # ============================================================ # PARALLEL PHASE 3: Create all freeze segments # ============================================================ logger.info(f"Phase 3: Parallel freeze segment creation for {len(valid_placements)} placements") phase3_tasks = [] freeze_segment_paths = {} for p in valid_placements: i = p["index"] cue_index = p["cue_index"] ad_duration = p["ad_duration"] total_freeze_duration = ad_duration + (2 * silence_duration) logger.info( f"Cue {cue_index}: Freeze segment with silence buffers - " f"500ms + AD={ad_duration:.2f}s + 500ms = {total_freeze_duration:.2f}s" ) freeze_segment_path = temp_dir_path / f"freeze_segment_{i}.mp4" freeze_segment_paths[i] = str(freeze_segment_path) phase3_tasks.append(self._create_freeze_segment_matched( freeze_frame_paths[i], combined_audio_paths[i], total_freeze_duration, str(freeze_segment_path), video_props )) await asyncio.gather(*phase3_tasks) logger.info(f"Phase 3 complete: created {len(freeze_segment_paths)} freeze segments") # ============================================================ # PHASE 3.5: Measure actual freeze segment durations for VTT retiming # ============================================================ # NOTE: Use _get_video_duration_local directly since freeze segments are # local files. Using _get_video_duration would incorrectly use the cached # source video URI in Cloud Run mode instead of measuring the freeze segment. logger.info("Measuring actual freeze segment durations...") for p in valid_placements: i = p["index"] freeze_path = freeze_segment_paths[i] actual_duration = await self._get_video_duration_local(freeze_path) p["actual_freeze_duration"] = actual_duration # Log any discrepancy between expected and actual duration expected = p["ad_duration"] + (2 * silence_duration) discrepancy = actual_duration - expected if abs(discrepancy) > 0.01: # 10ms threshold logger.warning( f"Freeze segment duration mismatch for cue {p['cue_index']}: " f"expected={expected:.3f}s, actual={actual_duration:.3f}s, " f"discrepancy={discrepancy:+.3f}s" ) else: logger.debug( f"Freeze segment cue {p['cue_index']}: duration={actual_duration:.3f}s (expected={expected:.3f}s)" ) # ============================================================ # PHASE 4: Assemble segment list in correct order # ============================================================ segment_files = [] for p in valid_placements: i = p["index"] # Add video segment if it exists if i in video_segment_paths: segment_files.append(video_segment_paths[i]) # Add freeze segment segment_files.append(freeze_segment_paths[i]) # Add final segment logger.info( f"Final segment check: current_time={final_segment_start:.2f}s, " f"source_duration={source_duration:.2f}s, " f"remaining={source_duration - final_segment_start:.2f}s" ) if final_segment_path: segment_files.append(str(final_segment_path)) logger.info(f"Added final segment: {final_segment_start:.2f}s to {source_duration:.2f}s") else: logger.info("No final segment needed (current_time >= source_duration)") # Debug: Log all segments before concatenation logger.info(f"Total segments to concatenate: {len(segment_files)}") for idx, seg in enumerate(segment_files): logger.info(f" Segment {idx}: {seg}") # 5. Concatenate all segments if segment_files: await self._concatenate_segments(segment_files, output_path, temp_dir_path) else: await self._copy_video(source_video_path, output_path) logger.info(f"Pause-insert render complete: {output_path}") # Build updated placements with actual_freeze_duration # Map from cue_index to actual_freeze_duration actual_durations = { p["cue_index"]: p["actual_freeze_duration"] for p in valid_placements } # Update original placements with actual freeze durations updated_placements = [] for placement in sorted_placements: updated = placement.copy() cue_index = placement.get("ad_cue_index") if cue_index in actual_durations: updated["actual_freeze_duration"] = actual_durations[cue_index] updated_placements.append(updated) # ============================================================ # PHASE 5: Persist segments to GCS for QC editing (optional) # ============================================================ segment_metadata_list: list[VideoSegmentMetadata] | None = None pause_point_data_list: list[PausePointData] | None = None if persist_segments and gcs_segment_prefix: logger.info(f"Persisting {len(segment_files)} segments to GCS at {gcs_segment_prefix}") segment_metadata_list = [] segment_idx = 0 cumulative_time_ms = 0.0 for p in valid_placements: i = p["index"] # Upload video segment if it exists if i in video_segment_paths: local_path = video_segment_paths[i] gcs_path = f"{gcs_segment_prefix}seg_{segment_idx}.mp4" gcs_uri = self._upload_to_gcs_permanent(local_path, gcs_path) segment_duration_ms = (p["pause_point"] - p["segment_start"]) * 1000 segment_metadata_list.append(VideoSegmentMetadata( segment_index=segment_idx, start_ms=cumulative_time_ms, end_ms=cumulative_time_ms + segment_duration_ms, gcs_uri=gcs_uri, duration_ms=segment_duration_ms, is_freeze_frame=False, cue_index=None )) cumulative_time_ms += segment_duration_ms segment_idx += 1 # Upload freeze segment freeze_local_path = freeze_segment_paths[i] gcs_path = f"{gcs_segment_prefix}seg_{segment_idx}_freeze.mp4" gcs_uri = self._upload_to_gcs_permanent(freeze_local_path, gcs_path) freeze_duration_ms = p["actual_freeze_duration"] * 1000 segment_metadata_list.append(VideoSegmentMetadata( segment_index=segment_idx, start_ms=cumulative_time_ms, end_ms=cumulative_time_ms + freeze_duration_ms, gcs_uri=gcs_uri, duration_ms=freeze_duration_ms, is_freeze_frame=True, cue_index=p["cue_index"] )) cumulative_time_ms += freeze_duration_ms segment_idx += 1 # Upload final segment if exists if final_segment_path: gcs_path = f"{gcs_segment_prefix}seg_{segment_idx}.mp4" gcs_uri = self._upload_to_gcs_permanent(str(final_segment_path), gcs_path) final_duration_ms = (source_duration - final_segment_start) * 1000 segment_metadata_list.append(VideoSegmentMetadata( segment_index=segment_idx, start_ms=cumulative_time_ms, end_ms=cumulative_time_ms + final_duration_ms, gcs_uri=gcs_uri, duration_ms=final_duration_ms, is_freeze_frame=False, cue_index=None )) logger.info(f"Persisted {len(segment_metadata_list)} segments to GCS") # Build PausePointData list with bounds # Pause points should be in RENDERED video coordinates (matching segment timeline) pause_point_data_list = [] # Find the start position of each freeze frame segment in the rendered timeline freeze_frame_starts = {} for seg in segment_metadata_list: if seg.is_freeze_frame and seg.cue_index is not None: freeze_frame_starts[seg.cue_index] = seg.start_ms for idx, p in enumerate(valid_placements): cue_index = p["cue_index"] # Pause point is at the START of the freeze frame in the rendered timeline pause_ms = freeze_frame_starts.get(cue_index, p["pause_point"] * 1000) # Find the freeze segment for this cue to get its end position freeze_seg = next((s for s in segment_metadata_list if s.is_freeze_frame and s.cue_index == cue_index), None) # Compute min bound: end of previous AD segment (or 0 for first) if idx == 0: min_bound_ms = 0.0 else: prev_cue_index = valid_placements[idx - 1]["cue_index"] prev_freeze_seg = next((s for s in segment_metadata_list if s.is_freeze_frame and s.cue_index == prev_cue_index), None) if prev_freeze_seg: min_bound_ms = prev_freeze_seg.end_ms else: min_bound_ms = 0.0 # Compute max bound: start of next freeze frame (or total duration for last) if idx == len(valid_placements) - 1: # Max bound is the total rendered video duration max_bound_ms = segment_metadata_list[-1].end_ms if segment_metadata_list else source_duration * 1000 else: next_cue_index = valid_placements[idx + 1]["cue_index"] next_pause_ms = freeze_frame_starts.get(next_cue_index, valid_placements[idx + 1]["pause_point"] * 1000) max_bound_ms = next_pause_ms pause_point_data_list.append(PausePointData( cue_index=cue_index, original_ms=pause_ms, # Rendered timeline position source_ms=p["pause_point"] * 1000, # Source video cut point adjusted_ms=None, min_bound_ms=min_bound_ms, max_bound_ms=max_bound_ms )) logger.info(f"Built {len(pause_point_data_list)} pause point data entries") return (output_path, updated_placements, segment_metadata_list, pause_point_data_list) async def _get_video_duration(self, video_path: str) -> float: """Get video duration in seconds using ffprobe.""" if self._use_cloud_run: return await self._get_video_duration_cloud_run(video_path) return await self._get_video_duration_local(video_path) async def _get_video_duration_local(self, video_path: str) -> float: """Get video duration using local ffprobe via Celery queue.""" cmd = [ self.ffprobe_path, "-v", "quiet", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", video_path ] result = await self._dispatch_ffprobe(cmd) return float(result['stdout'].strip()) async def _get_video_duration_cloud_run(self, video_path: str) -> float: """Get video duration via Cloud Run FFmpeg service.""" # Use cached source if available, otherwise upload if self._cached_source_gcs_uri: gcs_uri = self._cached_source_gcs_uri else: gcs_uri = self._upload_to_gcs_temp(video_path, "probe") try: result = await self._call_cloud_run_probe(gcs_uri) return result["duration"] finally: # Clean up if we uploaded specifically for this call if not self._cached_source_gcs_uri: self._delete_gcs_temp(gcs_uri) async def _get_video_properties(self, video_path: str) -> dict[str, Any]: """Get detailed video and audio properties.""" if self._use_cloud_run: return await self._get_video_properties_cloud_run(video_path) return await self._get_video_properties_local(video_path) async def _get_video_properties_local(self, video_path: str) -> dict[str, Any]: """Get video properties using local ffprobe via Celery queue.""" cmd = [ self.ffprobe_path, "-v", "quiet", "-show_streams", "-of", "json", video_path ] result = await self._dispatch_ffprobe(cmd) data = json.loads(result['stdout']) # Defaults (44100 is common for MP3, but we detect from source) props = { "width": 1920, "height": 1080, "fps": 30.0, "sample_rate": "44100", "channels": "2", "pix_fmt": "yuv420p", "codec": "h264" } for stream in data.get("streams", []): if stream.get("codec_type") == "video": props["width"] = stream.get("width", props["width"]) props["height"] = stream.get("height", props["height"]) props["pix_fmt"] = stream.get("pix_fmt", props["pix_fmt"]) props["codec"] = stream.get("codec_name", props["codec"]) # Parse frame rate (e.g., "30000/1001" or "30/1") fps_str = stream.get("r_frame_rate", "30/1") if "/" in fps_str: num, den = fps_str.split("/") props["fps"] = float(num) / float(den) else: props["fps"] = float(fps_str) elif stream.get("codec_type") == "audio": props["sample_rate"] = stream.get("sample_rate", props["sample_rate"]) props["channels"] = str(stream.get("channels", props["channels"])) return props async def _get_video_properties_cloud_run(self, video_path: str) -> dict[str, Any]: """Get video properties via Cloud Run FFmpeg service.""" # Use cached source if available, otherwise upload if self._cached_source_gcs_uri: gcs_uri = self._cached_source_gcs_uri else: gcs_uri = self._upload_to_gcs_temp(video_path, "probe") try: result = await self._call_cloud_run_probe(gcs_uri) # Defaults props = { "width": 1920, "height": 1080, "fps": 30.0, "sample_rate": "44100", "channels": "2", "pix_fmt": "yuv420p", "codec": "h264" } # Extract from probe result for stream in result.get("streams", []): if stream.get("codec_type") == "video": props["width"] = stream.get("width", props["width"]) props["height"] = stream.get("height", props["height"]) props["pix_fmt"] = stream.get("pix_fmt", props["pix_fmt"]) props["codec"] = stream.get("codec_name", props["codec"]) fps_str = stream.get("r_frame_rate", "30/1") if "/" in fps_str: num, den = fps_str.split("/") props["fps"] = float(num) / float(den) else: props["fps"] = float(fps_str) elif stream.get("codec_type") == "audio": props["sample_rate"] = stream.get("sample_rate", props["sample_rate"]) props["channels"] = str(stream.get("channels", props["channels"])) return props finally: # Clean up if we uploaded specifically for this call if not self._cached_source_gcs_uri: self._delete_gcs_temp(gcs_uri) async def _extract_segment( self, source_path: str, start_time: float, duration: float, output_path: str ): """Extract a video segment using ffmpeg (stream copy - for overlay method).""" cmd = [ self.ffmpeg_path, "-y", "-ss", str(start_time), "-i", source_path, "-t", str(duration), "-c", "copy", "-avoid_negative_ts", "make_zero", output_path ] await self._run_ffmpeg(cmd) async def _extract_segment_reencoded( self, source_path: str, start_time: float, duration: float, output_path: str, props: dict[str, Any] ): """ Extract segment with RE-ENCODING for frame-accurate cuts. Crucial for pause-insert method to avoid: - Keyframe-only cuts causing audio dropouts - Timestamp desynchronization """ if self._use_cloud_run: await self._extract_segment_reencoded_cloud_run( source_path, start_time, duration, output_path, props ) else: await self._extract_segment_reencoded_local( source_path, start_time, duration, output_path, props ) async def _extract_segment_reencoded_local( self, source_path: str, start_time: float, duration: float, output_path: str, props: dict[str, Any] ): """Extract segment with re-encoding using local ffmpeg via Celery queue.""" cmd = [ self.ffmpeg_path, "-y", "-ss", str(start_time), "-i", source_path, "-t", str(duration), # Video Encoding "-c:v", "libx264", "-preset", "fast", "-pix_fmt", props["pix_fmt"], "-r", str(props["fps"]), # Audio Encoding (Force match source) "-c:a", "aac", "-ar", props["sample_rate"], "-ac", props["channels"], "-b:a", "192k", # Ensure timestamp continuity "-video_track_timescale", "90000", output_path ] await self._run_ffmpeg(cmd) async def _extract_segment_reencoded_cloud_run( self, source_path: str, start_time: float, duration: float, output_path: str, props: dict[str, Any] ): """Extract segment with re-encoding via Cloud Run FFmpeg service.""" # Use cached source GCS URI if not self._cached_source_gcs_uri: raise FFmpegExecutionError("Source video not cached for Cloud Run operation") output_gcs_uri = f"gs://{settings.gcs_bucket}/temp/ffmpeg/segment/{uuid4().hex}/segment.mp4" try: await self._call_cloud_run_endpoint( "/encode-segment", { "source_gcs_uri": self._cached_source_gcs_uri, "output_gcs_uri": output_gcs_uri, "start_time": start_time, "duration": duration, "width": props["width"], "height": props["height"], "fps": props["fps"], "pix_fmt": props["pix_fmt"], "sample_rate": props["sample_rate"], "channels": props["channels"], } ) # Download result to local path self._download_from_gcs_temp(output_gcs_uri, output_path) finally: self._delete_gcs_temp(output_gcs_uri) async def _extract_frame(self, video_path: str, time_point: float, output_path: str): """Extract a single frame as PNG using ffmpeg.""" if self._use_cloud_run: await self._extract_frame_cloud_run(video_path, time_point, output_path) else: await self._extract_frame_local(video_path, time_point, output_path) async def _extract_frame_local(self, video_path: str, time_point: float, output_path: str): """Extract frame using local ffmpeg via Celery queue.""" logger.debug(f"Extracting frame at {time_point:.2f}s from {video_path}") cmd = [ self.ffmpeg_path, "-y", "-ss", str(time_point), "-i", video_path, "-frames:v", "1", "-q:v", "2", output_path ] await self._run_ffmpeg(cmd) # Verify frame was actually created (FFmpeg can "succeed" but produce nothing # if time_point is beyond video duration) if not os.path.exists(output_path): raise FileNotFoundError( f"Frame extraction failed: no frame created at {time_point:.2f}s " f"(time may be beyond video duration)" ) async def _extract_frame_cloud_run(self, video_path: str, time_point: float, output_path: str): """Extract frame via Cloud Run FFmpeg service.""" if not self._cached_source_gcs_uri: raise FFmpegExecutionError("Source video not cached for Cloud Run operation") output_gcs_uri = f"gs://{settings.gcs_bucket}/temp/ffmpeg/frame/{uuid4().hex}/frame.png" try: await self._call_cloud_run_endpoint( "/extract-frame", { "source_gcs_uri": self._cached_source_gcs_uri, "output_gcs_uri": output_gcs_uri, "time_point": time_point, } ) # Download result to local path self._download_from_gcs_temp(output_gcs_uri, output_path) # Verify frame was actually created if not os.path.exists(output_path): raise FileNotFoundError( f"Frame extraction failed: no frame created at {time_point:.2f}s " f"(time may be beyond video duration)" ) finally: self._delete_gcs_temp(output_gcs_uri) async def _create_freeze_segment( self, frame_path: str, audio_path: str, duration: float, output_path: str, video_props: dict[str, Any] ): """Create a freeze-frame video segment with audio overlay (for overlay method).""" width = video_props.get("width", 1920) height = video_props.get("height", 1080) fps = video_props.get("fps", 30) cmd = [ self.ffmpeg_path, "-y", "-loop", "1", "-i", frame_path, "-i", audio_path, "-c:v", "libx264", "-preset", "fast", "-tune", "stillimage", "-c:a", "aac", "-b:a", "192k", "-pix_fmt", "yuv420p", "-vf", f"scale={width}:{height}:force_original_aspect_ratio=decrease,pad={width}:{height}:(ow-iw)/2:(oh-ih)/2", "-r", str(fps), "-t", str(duration), "-shortest", output_path ] await self._run_ffmpeg(cmd) async def _create_freeze_segment_matched( self, frame_path: str, audio_path: str, duration: float, output_path: str, props: dict[str, Any] ): """ Create freeze frame that rigidly matches the source video properties. This fixes the "silent pause" issue caused by sample rate mismatch when concatenating with extracted video segments. """ if self._use_cloud_run: await self._create_freeze_segment_matched_cloud_run( frame_path, audio_path, duration, output_path, props ) else: await self._create_freeze_segment_matched_local( frame_path, audio_path, duration, output_path, props ) async def _create_freeze_segment_matched_local( self, frame_path: str, audio_path: str, duration: float, output_path: str, props: dict[str, Any] ): """Create freeze segment using local ffmpeg via Celery queue.""" # Validate inputs if duration <= 0: raise ValueError(f"Invalid freeze segment duration: {duration}") if not os.path.exists(frame_path): raise FileNotFoundError(f"Frame file not found: {frame_path}") if not os.path.exists(audio_path): raise FileNotFoundError(f"Audio file not found: {audio_path}") logger.debug( f"Creating freeze segment: frame={frame_path}, audio={audio_path}, " f"duration={duration:.2f}s, props={props}" ) cmd = [ self.ffmpeg_path, "-y", "-loop", "1", "-i", frame_path, "-i", audio_path, "-c:v", "libx264", "-preset", "fast", "-tune", "stillimage", "-pix_fmt", props["pix_fmt"], "-r", str(props["fps"]), # Scale filter to ensure dimensions match exactly "-vf", f"scale={props['width']}:{props['height']}:force_original_aspect_ratio=decrease,pad={props['width']}:{props['height']}:(ow-iw)/2:(oh-ih)/2", # Audio Encoding (CRITICAL: Match source sample rate and channels) "-c:a", "aac", "-ar", props["sample_rate"], "-ac", props["channels"], "-b:a", "192k", "-t", str(duration), "-video_track_timescale", "90000", "-shortest", output_path ] await self._run_ffmpeg(cmd) async def _create_freeze_segment_matched_cloud_run( self, frame_path: str, audio_path: str, duration: float, output_path: str, props: dict[str, Any] ): """Create freeze segment via Cloud Run FFmpeg service.""" # Validate inputs if duration <= 0: raise ValueError(f"Invalid freeze segment duration: {duration}") if not os.path.exists(frame_path): raise FileNotFoundError(f"Frame file not found: {frame_path}") if not os.path.exists(audio_path): raise FileNotFoundError(f"Audio file not found: {audio_path}") # Upload frame and audio to GCS frame_gcs_uri = self._upload_to_gcs_temp(frame_path, "frame") audio_gcs_uri = self._upload_to_gcs_temp(audio_path, "audio") output_gcs_uri = f"gs://{settings.gcs_bucket}/temp/ffmpeg/freeze/{uuid4().hex}/freeze.mp4" try: await self._call_cloud_run_endpoint( "/create-freeze-segment", { "frame_gcs_uri": frame_gcs_uri, "audio_gcs_uri": audio_gcs_uri, "output_gcs_uri": output_gcs_uri, "duration": duration, "width": props["width"], "height": props["height"], "fps": props["fps"], "pix_fmt": props["pix_fmt"], "sample_rate": props["sample_rate"], "channels": props["channels"], } ) # Download result to local path self._download_from_gcs_temp(output_gcs_uri, output_path) finally: self._delete_gcs_temp(frame_gcs_uri) self._delete_gcs_temp(audio_gcs_uri) self._delete_gcs_temp(output_gcs_uri) async def _extract_audio_segment( self, source_path: str, start_time: float, duration: float, output_path: str, props: dict[str, Any] ): """ Extract audio segment from video for catch-up audio. Used to extract the source audio from [resume_from, pause_point] which plays during the freeze frame extension after AD audio. Args: source_path: Path to source video start_time: Start time in seconds duration: Duration in seconds output_path: Path to output audio file props: Video properties (for sample_rate, channels) """ if duration <= 0: raise ValueError(f"Invalid audio segment duration: {duration}") logger.debug( f"Extracting audio segment: start={start_time:.2f}s, " f"duration={duration:.2f}s, output={output_path}" ) cmd = [ self.ffmpeg_path, "-y", "-ss", str(start_time), "-i", source_path, "-t", str(duration), "-vn", # No video "-c:a", "aac", # Match freeze segment encoding "-ar", props["sample_rate"], "-ac", props["channels"], "-b:a", "192k", output_path ] await self._run_ffmpeg(cmd) async def _concatenate_audio( self, audio_paths: list[str], output_path: str, props: dict[str, Any] ): """ Concatenate multiple audio files for combined AD + catch-up audio. Uses FFmpeg concat filter for seamless joining with consistent encoding. """ if self._use_cloud_run: await self._concatenate_audio_cloud_run(audio_paths, output_path, props) else: await self._concatenate_audio_local(audio_paths, output_path, props) async def _concatenate_audio_local( self, audio_paths: list[str], output_path: str, props: dict[str, Any] ): """Concatenate audio files using local ffmpeg via Celery queue.""" if not audio_paths: raise ValueError("No audio files to concatenate") if len(audio_paths) == 1: # Just copy the single file with re-encoding to ensure format consistency cmd = [ self.ffmpeg_path, "-y", "-i", audio_paths[0], "-c:a", "aac", "-ar", props["sample_rate"], "-ac", props["channels"], "-b:a", "192k", output_path ] await self._run_ffmpeg(cmd) return # Build filter complex for concatenation inputs = [] filter_parts = [] for i, path in enumerate(audio_paths): inputs.extend(["-i", path]) filter_parts.append(f"[{i}:a]") filter_complex = "".join(filter_parts) + f"concat=n={len(audio_paths)}:v=0:a=1[out]" logger.debug( f"Concatenating {len(audio_paths)} audio files: {audio_paths}" ) cmd = [ self.ffmpeg_path, "-y", *inputs, "-filter_complex", filter_complex, "-map", "[out]", "-c:a", "aac", "-ar", props["sample_rate"], "-ac", props["channels"], "-b:a", "192k", output_path ] await self._run_ffmpeg(cmd) async def _concatenate_audio_cloud_run( self, audio_paths: list[str], output_path: str, props: dict[str, Any] ): """Concatenate audio files via Cloud Run FFmpeg service.""" if not audio_paths: raise ValueError("No audio files to concatenate") # Upload all audio files to GCS audio_gcs_uris = [] for path in audio_paths: gcs_uri = self._upload_to_gcs_temp(path, "audio") audio_gcs_uris.append(gcs_uri) output_gcs_uri = f"gs://{settings.gcs_bucket}/temp/ffmpeg/audio/{uuid4().hex}/combined.m4a" try: # Build ffmpeg command for Cloud Run # Use filter_complex for concatenation # Build command template with placeholders for each input filter_parts = [] for i in range(len(audio_gcs_uris)): filter_parts.append(f"[{i}:a]") filter_complex = "".join(filter_parts) + f"concat=n={len(audio_gcs_uris)}:v=0:a=1[out]" # Build command template with -i {input_N} for each input file cmd_template = ["ffmpeg", "-y"] for i in range(len(audio_gcs_uris)): cmd_template.extend(["-i", f"{{input_{i}}}"]) cmd_template.extend([ "-filter_complex", filter_complex, "-map", "[out]", "-c:a", "aac", "-ar", props["sample_rate"], "-ac", props["channels"], "-b:a", "192k", "{output}" ]) await self._call_cloud_run_endpoint( "/run-ffmpeg", { "input_gcs_uris": audio_gcs_uris, "output_gcs_uri": output_gcs_uri, "command_template": cmd_template } ) # Download result to local path self._download_from_gcs_temp(output_gcs_uri, output_path) finally: # Cleanup for gcs_uri in audio_gcs_uris: self._delete_gcs_temp(gcs_uri) self._delete_gcs_temp(output_gcs_uri) async def _generate_silence( self, duration: float, output_path: str, props: dict[str, Any] ): """ Generate a silent audio file of specified duration. Used to create 500ms silence buffers before/after AD audio. """ if self._use_cloud_run: await self._generate_silence_cloud_run(duration, output_path, props) else: await self._generate_silence_local(duration, output_path, props) async def _generate_silence_local( self, duration: float, output_path: str, props: dict[str, Any] ): """Generate silence using local ffmpeg via Celery queue.""" if duration <= 0: raise ValueError(f"Invalid silence duration: {duration}") logger.debug( f"Generating {duration:.2f}s silence: output={output_path}" ) cmd = [ self.ffmpeg_path, "-y", "-f", "lavfi", "-i", f"anullsrc=r={props['sample_rate']}:cl={'stereo' if props['channels'] == '2' else 'mono'}", "-t", str(duration), "-c:a", "aac", "-ar", props["sample_rate"], "-ac", props["channels"], "-b:a", "192k", output_path ] await self._run_ffmpeg(cmd) async def _generate_silence_cloud_run( self, duration: float, output_path: str, props: dict[str, Any] ): """Generate silence via Cloud Run FFmpeg service.""" if duration <= 0: raise ValueError(f"Invalid silence duration: {duration}") output_gcs_uri = f"gs://{settings.gcs_bucket}/temp/ffmpeg/silence/{uuid4().hex}/silence.m4a" try: channel_layout = "stereo" if props["channels"] == "2" else "mono" await self._call_cloud_run_endpoint( "/run-ffmpeg", { "input_gcs_uris": [], # No input files, using lavfi "output_gcs_uri": output_gcs_uri, "command_template": [ "ffmpeg", "-y", "-f", "lavfi", "-i", f"anullsrc=r={props['sample_rate']}:cl={channel_layout}", "-t", str(duration), "-c:a", "aac", "-ar", props["sample_rate"], "-ac", props["channels"], "-b:a", "192k", "{output}" ] } ) # Download result to local path self._download_from_gcs_temp(output_gcs_uri, output_path) finally: self._delete_gcs_temp(output_gcs_uri) async def _concatenate_segments( self, segment_paths: list[str], output_path: str, temp_dir: Path ): """Concatenate video segments using ffmpeg concat demuxer.""" if self._use_cloud_run: await self._concatenate_segments_cloud_run(segment_paths, output_path) else: await self._concatenate_segments_local(segment_paths, output_path, temp_dir) async def _concatenate_segments_local( self, segment_paths: list[str], output_path: str, temp_dir: Path ): """Concatenate video segments using local ffmpeg via Celery queue.""" # Create concat file concat_file = temp_dir / "concat.txt" with open(concat_file, "w") as f: for path in segment_paths: # Escape single quotes in path escaped_path = path.replace("'", "'\\''") f.write(f"file '{escaped_path}'\n") cmd = [ self.ffmpeg_path, "-y", "-f", "concat", "-safe", "0", "-i", str(concat_file), "-c", "copy", output_path ] await self._run_ffmpeg(cmd) async def _concatenate_segments_cloud_run( self, segment_paths: list[str], output_path: str ): """Concatenate video segments via Cloud Run FFmpeg service.""" # Upload all segment files to GCS segment_gcs_uris = [] for path in segment_paths: gcs_uri = self._upload_to_gcs_temp(path, "segment") segment_gcs_uris.append(gcs_uri) output_gcs_uri = f"gs://{settings.gcs_bucket}/temp/ffmpeg/concat/{uuid4().hex}/final.mp4" try: await self._call_cloud_run_endpoint( "/concatenate", { "segment_gcs_uris": segment_gcs_uris, "output_gcs_uri": output_gcs_uri, } ) # Download result to local path self._download_from_gcs_temp(output_gcs_uri, output_path) finally: # Cleanup uploaded segments for gcs_uri in segment_gcs_uris: self._delete_gcs_temp(gcs_uri) self._delete_gcs_temp(output_gcs_uri) async def _copy_video(self, source_path: str, output_path: str): """Copy video without modification.""" cmd = [ self.ffmpeg_path, "-y", "-i", source_path, "-c", "copy", output_path ] await self._run_ffmpeg(cmd) async def _run_ffmpeg(self, cmd: list[str], timeout: int = 3600): """Run ffmpeg command via the dedicated ffmpeg queue.""" logger.debug(f"Running command: {' '.join(cmd)}") await self._dispatch_ffmpeg(cmd, timeout) # Global service instance video_renderer_service = VideoRendererService()