perf: parallelize FFmpeg Cloud Run calls using asyncio.gather()
Refactored _render_pause_insert to execute FFmpeg operations in parallel phases instead of sequentially: Phase 1: Parallel extraction - Generate shared silence (once, reused by all) - Extract ALL video segments simultaneously - Extract ALL freeze frames simultaneously - Extract final segment Phase 2: Parallel audio concatenation - Concatenate ALL audio tracks (silence + AD + silence) simultaneously Phase 3: Parallel freeze segment creation - Create ALL freeze segments simultaneously Phase 4: Assemble segments in correct order for final concatenation This reduces render time from ~3 minutes (serial) to ~30 seconds (parallel) for an 8-cue video when using Cloud Run FFmpeg service. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
87a4b1ab77
commit
e2302d497d
1 changed files with 119 additions and 56 deletions
|
|
@ -504,12 +504,12 @@ class VideoRendererService:
|
|||
source_duration = await self._get_video_duration(source_video_path)
|
||||
logger.info(f"Source Properties: {video_props}, Duration: {source_duration:.2f}s")
|
||||
|
||||
segment_files = []
|
||||
current_time = 0.0
|
||||
|
||||
# Create a mapping of cue_index to mp3_path
|
||||
cue_to_mp3 = {cue_index: mp3_path for cue_index, mp3_path in ad_segments}
|
||||
|
||||
# Pre-process placements and validate
|
||||
valid_placements = []
|
||||
current_time = 0.0
|
||||
for i, placement in enumerate(sorted_placements):
|
||||
pause_point = placement["pause_point"]
|
||||
cue_index = placement["ad_cue_index"]
|
||||
|
|
@ -521,93 +521,156 @@ class VideoRendererService:
|
|||
f"Cue {cue_index}: pause_point {pause_point:.2f}s exceeds video duration "
|
||||
f"{source_duration:.2f}s, clamping to {source_duration - 0.1:.2f}s"
|
||||
)
|
||||
pause_point = max(0, source_duration - 0.1) # Clamp to 100ms before end
|
||||
pause_point = max(0, source_duration - 0.1)
|
||||
|
||||
# Get the AD audio for this cue
|
||||
ad_mp3_path = cue_to_mp3.get(cue_index)
|
||||
if not ad_mp3_path:
|
||||
logger.warning(f"No AD audio found for cue {cue_index}, skipping")
|
||||
continue
|
||||
|
||||
# 1. Extract video segment from current_time to pause_point (re-encoded for frame accuracy)
|
||||
if pause_point > current_time:
|
||||
valid_placements.append({
|
||||
"index": i,
|
||||
"pause_point": pause_point,
|
||||
"cue_index": cue_index,
|
||||
"ad_duration": ad_duration,
|
||||
"ad_mp3_path": ad_mp3_path,
|
||||
"segment_start": current_time,
|
||||
})
|
||||
current_time = pause_point
|
||||
|
||||
# Track final segment info
|
||||
final_segment_start = current_time
|
||||
final_segment_needed = final_segment_start < source_duration
|
||||
|
||||
# ============================================================
|
||||
# PARALLEL PHASE 1: Generate shared silence + extract all frames + all video segments
|
||||
# ============================================================
|
||||
logger.info(f"Phase 1: Parallel extraction of {len(valid_placements)} frames and video segments")
|
||||
|
||||
silence_duration = 0.5 # 500ms shared by all
|
||||
silence_path = temp_dir_path / "silence_shared.m4a"
|
||||
|
||||
# Build tasks for phase 1
|
||||
phase1_tasks = []
|
||||
|
||||
# Task: Generate silence (just once, shared by all)
|
||||
phase1_tasks.append(self._generate_silence(silence_duration, str(silence_path), video_props))
|
||||
|
||||
# Tasks: Extract all video segments
|
||||
video_segment_paths = {}
|
||||
for p in valid_placements:
|
||||
i = p["index"]
|
||||
if p["pause_point"] > p["segment_start"]:
|
||||
segment_path = temp_dir_path / f"segment_{i}_video.mp4"
|
||||
await self._extract_segment_reencoded(
|
||||
video_segment_paths[i] = str(segment_path)
|
||||
phase1_tasks.append(self._extract_segment_reencoded(
|
||||
source_video_path,
|
||||
current_time,
|
||||
pause_point - current_time,
|
||||
p["segment_start"],
|
||||
p["pause_point"] - p["segment_start"],
|
||||
str(segment_path),
|
||||
video_props
|
||||
)
|
||||
segment_files.append(str(segment_path))
|
||||
))
|
||||
|
||||
# 2. Extract freeze frame at pause point
|
||||
freeze_frame_path = temp_dir_path / f"freeze_{i}.png"
|
||||
await self._extract_frame(
|
||||
# Task: Extract final segment if needed
|
||||
final_segment_path = None
|
||||
if final_segment_needed:
|
||||
final_segment_path = temp_dir_path / "segment_final.mp4"
|
||||
phase1_tasks.append(self._extract_segment_reencoded(
|
||||
source_video_path,
|
||||
pause_point,
|
||||
str(freeze_frame_path)
|
||||
)
|
||||
|
||||
# 3. Prepare audio for freeze segment
|
||||
# Add 500ms silence before AND after the AD audio for smooth transitions
|
||||
silence_duration = 0.5 # 500ms
|
||||
silence_path = temp_dir_path / f"silence_{i}.m4a"
|
||||
await self._generate_silence(
|
||||
silence_duration,
|
||||
str(silence_path),
|
||||
final_segment_start,
|
||||
source_duration - final_segment_start,
|
||||
str(final_segment_path),
|
||||
video_props
|
||||
)
|
||||
))
|
||||
|
||||
# Concatenate: 500ms silence + AD audio + 500ms silence
|
||||
# Tasks: Extract all freeze frames
|
||||
freeze_frame_paths = {}
|
||||
for p in valid_placements:
|
||||
i = p["index"]
|
||||
freeze_frame_path = temp_dir_path / f"freeze_{i}.png"
|
||||
freeze_frame_paths[i] = str(freeze_frame_path)
|
||||
phase1_tasks.append(self._extract_frame(
|
||||
source_video_path,
|
||||
p["pause_point"],
|
||||
str(freeze_frame_path)
|
||||
))
|
||||
|
||||
# Execute phase 1 in parallel
|
||||
await asyncio.gather(*phase1_tasks)
|
||||
logger.info(f"Phase 1 complete: extracted {len(freeze_frame_paths)} frames, {len(video_segment_paths)} video segments")
|
||||
|
||||
# ============================================================
|
||||
# PARALLEL PHASE 2: Concatenate all audio tracks
|
||||
# ============================================================
|
||||
logger.info(f"Phase 2: Parallel audio concatenation for {len(valid_placements)} placements")
|
||||
|
||||
phase2_tasks = []
|
||||
combined_audio_paths = {}
|
||||
for p in valid_placements:
|
||||
i = p["index"]
|
||||
combined_audio_path = temp_dir_path / f"combined_audio_{i}.m4a"
|
||||
await self._concatenate_audio(
|
||||
[str(silence_path), ad_mp3_path, str(silence_path)],
|
||||
combined_audio_paths[i] = str(combined_audio_path)
|
||||
phase2_tasks.append(self._concatenate_audio(
|
||||
[str(silence_path), p["ad_mp3_path"], str(silence_path)],
|
||||
str(combined_audio_path),
|
||||
video_props
|
||||
)
|
||||
))
|
||||
|
||||
freeze_audio_path = str(combined_audio_path)
|
||||
total_freeze_duration = ad_duration + (2 * silence_duration) # AD + 1.0s total silence
|
||||
await asyncio.gather(*phase2_tasks)
|
||||
logger.info(f"Phase 2 complete: concatenated {len(combined_audio_paths)} audio tracks")
|
||||
|
||||
# ============================================================
|
||||
# PARALLEL PHASE 3: Create all freeze segments
|
||||
# ============================================================
|
||||
logger.info(f"Phase 3: Parallel freeze segment creation for {len(valid_placements)} placements")
|
||||
|
||||
phase3_tasks = []
|
||||
freeze_segment_paths = {}
|
||||
for p in valid_placements:
|
||||
i = p["index"]
|
||||
cue_index = p["cue_index"]
|
||||
ad_duration = p["ad_duration"]
|
||||
total_freeze_duration = ad_duration + (2 * silence_duration)
|
||||
|
||||
logger.info(
|
||||
f"Cue {cue_index}: Freeze segment with silence buffers - "
|
||||
f"500ms + AD={ad_duration:.2f}s + 500ms = {total_freeze_duration:.2f}s"
|
||||
)
|
||||
|
||||
# 4. Create freeze segment with prepared audio
|
||||
freeze_segment_path = temp_dir_path / f"freeze_segment_{i}.mp4"
|
||||
await self._create_freeze_segment_matched(
|
||||
str(freeze_frame_path),
|
||||
freeze_audio_path,
|
||||
freeze_segment_paths[i] = str(freeze_segment_path)
|
||||
phase3_tasks.append(self._create_freeze_segment_matched(
|
||||
freeze_frame_paths[i],
|
||||
combined_audio_paths[i],
|
||||
total_freeze_duration,
|
||||
str(freeze_segment_path),
|
||||
video_props
|
||||
)
|
||||
segment_files.append(str(freeze_segment_path))
|
||||
))
|
||||
|
||||
# 5. Resume video from pause_point (NOT resume_from)
|
||||
# This eliminates the visual jump-back artifact
|
||||
# The audio catch-up during freeze frame handles the overlap region
|
||||
current_time = pause_point
|
||||
await asyncio.gather(*phase3_tasks)
|
||||
logger.info(f"Phase 3 complete: created {len(freeze_segment_paths)} freeze segments")
|
||||
|
||||
# 4. Add final segment from last pause point to end (re-encoded for uniformity)
|
||||
# ============================================================
|
||||
# PHASE 4: Assemble segment list in correct order
|
||||
# ============================================================
|
||||
segment_files = []
|
||||
for p in valid_placements:
|
||||
i = p["index"]
|
||||
# Add video segment if it exists
|
||||
if i in video_segment_paths:
|
||||
segment_files.append(video_segment_paths[i])
|
||||
# Add freeze segment
|
||||
segment_files.append(freeze_segment_paths[i])
|
||||
|
||||
# Add final segment
|
||||
logger.info(
|
||||
f"Final segment check: current_time={current_time:.2f}s, "
|
||||
f"Final segment check: current_time={final_segment_start:.2f}s, "
|
||||
f"source_duration={source_duration:.2f}s, "
|
||||
f"remaining={source_duration - current_time:.2f}s"
|
||||
f"remaining={source_duration - final_segment_start:.2f}s"
|
||||
)
|
||||
if current_time < source_duration:
|
||||
final_segment_path = temp_dir_path / "segment_final.mp4"
|
||||
await self._extract_segment_reencoded(
|
||||
source_video_path,
|
||||
current_time,
|
||||
source_duration - current_time,
|
||||
str(final_segment_path),
|
||||
video_props
|
||||
)
|
||||
if final_segment_path:
|
||||
segment_files.append(str(final_segment_path))
|
||||
logger.info(f"Added final segment: {current_time:.2f}s to {source_duration:.2f}s")
|
||||
logger.info(f"Added final segment: {final_segment_start:.2f}s to {source_duration:.2f}s")
|
||||
else:
|
||||
logger.info("No final segment needed (current_time >= source_duration)")
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue