Subtitles were appearing progressively out of sync (~1.0s early per AD) because the VTT retimer calculated freeze durations theoretically rather than using the actual rendered segment durations.

Changes:
- video_renderer: Measure actual freeze segment duration after creation
- video_renderer: Return updated placements with actual_freeze_duration
- vtt_retimer: Prefer actual_freeze_duration over calculated values
- render_task: Pass actual durations to VTT retimer

This ensures subtitle timing matches the real video timeline regardless of any FFmpeg encoding variations.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
391 lines
13 KiB
Python
391 lines
13 KiB
Python
"""Service for re-timing VTT files when pauses are inserted into video."""
|
|
|
|
from typing import Any
|
|
|
|
from ..core.logging import get_logger
|
|
|
|
# Module-level logger for this service, obtained from the project's logging helper.
logger = get_logger(__name__)
|
|
|
|
|
|
class VTTRetimerService:
    """Service for re-timing VTT subtitle files after pause insertions.

    Inserting a freeze ("pause") into a video pushes everything after the
    pause point later on the timeline. This service parses a WebVTT
    document, shifts every cue by the cumulative freeze time that precedes
    it, splits cues that straddle a pause so that no caption is displayed
    while the freeze plays, and serialises the result back to WebVTT.

    Cues are handled internally as dicts with ``start_time`` / ``end_time``
    (seconds) and ``text``. Cue identifiers and cue settings found in the
    input are not carried over to the output.
    """

    # Minimum segment duration to keep after splitting (100ms)
    MIN_SEGMENT_DURATION = 0.1

    # Fallback padding added to ad_duration when no measured freeze duration
    # is available: 500ms of silence before the AD plus 500ms after it.
    SILENCE_BUFFER_TOTAL = 1.0

    def retime_for_pause_insert(
        self,
        original_vtt: str,
        analysis: dict[str, Any]
    ) -> str:
        """
        Generate new VTT with adjusted timings for pause-insert accessible video.

        Timeline mapping for pause-insert:
        - Source [0, pause_point) -> Accessible [0, pause_point)
        - Freeze: Accessible [pause_point, pause_point + freeze_duration)
        - Source [pause_point, end) -> Accessible [pause_point + freeze_duration, ...]

        Where freeze_duration is the placement's actual_freeze_duration when
        present, otherwise ad_duration + 1.0s (500ms silence buffers before
        and after).

        Cues that span pause points are split to ensure captions don't display
        during freeze periods (when AD audio is playing).

        Args:
            original_vtt: Original VTT content
            analysis: Analysis dict whose "placements" list holds dicts with
                pause_point, ad_duration and optionally actual_freeze_duration

        Returns:
            Re-timed VTT content, or original_vtt unchanged when no usable
            pause is found in the analysis.

        Raises:
            ValueError: If a cue timing line contains a malformed timestamp.
        """
        # "or []" also covers an explicit {"placements": None}.
        placements = analysis.get("placements") or []

        # Build sorted list of (pause_point, effective_offset)
        pauses = self._build_pause_list(placements)

        if not pauses:
            logger.info("No pauses to apply, returning original VTT")
            return original_vtt

        logger.info(f"Re-timing VTT with {len(pauses)} pause insertions")

        # Parse and retime cues; one source cue may yield several segments.
        retimed_cues = []
        for cue in self._parse_vtt(original_vtt):
            retimed_cues.extend(self._retime_cue(cue, pauses))

        # Filter out very short (or inverted) segments produced by splitting
        retimed_cues = self._filter_short_segments(retimed_cues)

        return self._build_vtt(retimed_cues)

    def _build_pause_list(
        self,
        placements: list[dict]
    ) -> list[tuple[float, float]]:
        """Build sorted list of (pause_point, effective_offset) tuples.

        Uses actual_freeze_duration when available (measured from rendered video),
        otherwise falls back to calculated value (ad_duration + 1.0s).

        Placements without a pause_point, or whose ad_duration is missing,
        None or not positive, are ignored.
        """
        pauses = []
        for placement in placements:
            pause_point = placement.get("pause_point")
            # A key that is present but None must not reach the comparison.
            ad_duration = placement.get("ad_duration") or 0

            if pause_point is None or not ad_duration > 0:
                continue

            # Prefer actual freeze duration if available (measured from rendered video)
            actual_freeze = placement.get("actual_freeze_duration")
            if actual_freeze is not None:
                effective_offset = actual_freeze
                logger.debug(
                    f"Pause at {pause_point:.2f}s: using actual_freeze_duration={effective_offset:.2f}s "
                    f"(ad_duration={ad_duration:.2f}s)"
                )
            else:
                effective_offset = ad_duration + self.SILENCE_BUFFER_TOTAL
                logger.debug(
                    f"Pause at {pause_point:.2f}s: using calculated freeze_duration={effective_offset:.2f}s "
                    f"(ad_duration={ad_duration:.2f}s + 1.0s buffer)"
                )

            pauses.append((pause_point, effective_offset))

        return sorted(pauses, key=lambda x: x[0])

    def _offset_at(
        self,
        timestamp: float,
        pauses: list[tuple[float, float]]
    ) -> float:
        """
        Calculate cumulative offset for timestamps AT or AFTER pause points.

        A pause at time T affects all timestamps >= T. This is used for cue
        segments that start after a pause (the segment starts after the freeze).
        """
        return sum(eo for pp, eo in pauses if pp <= timestamp)

    def _offset_before(
        self,
        timestamp: float,
        pauses: list[tuple[float, float]]
    ) -> float:
        """
        Calculate cumulative offset for timestamps STRICTLY BEFORE pause points.

        Used for cue segments ending at a pause point (the end is before freeze).
        """
        return sum(eo for pp, eo in pauses if pp < timestamp)

    def _retime_cue(
        self,
        cue: dict,
        pauses: list[tuple[float, float]]
    ) -> list[dict]:
        """
        Retime a single cue, potentially splitting it across pause points.

        If a pause point falls within the cue's timespan, the cue is split into
        multiple segments: one ending before the freeze, and one starting after.
        This ensures captions don't display during freeze periods.

        Every segment start is mapped with _offset_at and every segment end
        with _offset_before, so a segment never extends into a freeze that
        begins exactly where the segment ends.

        Returns:
            List of 1 or more cue segments.
        """
        cue_start = cue["start_time"]
        cue_end = cue["end_time"]
        cue_text = cue["text"]

        # Pauses that fall STRICTLY within this cue (between start and end),
        # in timeline order.
        split_points = sorted(pp for pp, _ in pauses if cue_start < pp < cue_end)

        if not split_points:
            # Simple case: no splitting needed
            return [self._retime_simple_cue(cue, pauses)]

        # Complex case: split at each pause point
        logger.debug(
            f"Splitting cue [{cue_start:.2f}s-{cue_end:.2f}s] at {len(split_points)} pause point(s)"
        )

        segments = []
        segment_start = cue_start

        for pause_point in split_points:
            new_start = segment_start + self._offset_at(segment_start, pauses)
            # The segment ends exactly at the pause point, BEFORE the freeze
            # starts, so the pause itself must not be counted.
            new_end = pause_point + self._offset_before(pause_point, pauses)

            segments.append({
                "start_time": new_start,
                "end_time": new_end,
                "text": cue_text
            })

            logger.debug(
                f"  Split segment before pause at {pause_point:.2f}s: "
                f"[{new_start:.2f}s-{new_end:.2f}s]"
            )

            # Next segment starts at the pause point (maps to after freeze)
            segment_start = pause_point

        # Final segment: from last pause point to cue end.  The end uses
        # _offset_before, matching _retime_simple_cue, so a pause located
        # exactly at the cue's end does not stretch the caption over it.
        new_start = segment_start + self._offset_at(segment_start, pauses)
        new_end = cue_end + self._offset_before(cue_end, pauses)

        segments.append({
            "start_time": new_start,
            "end_time": new_end,
            "text": cue_text
        })

        logger.debug(
            f"  Final segment after pauses: [{new_start:.2f}s-{new_end:.2f}s]"
        )

        return segments

    def _retime_simple_cue(
        self,
        cue: dict,
        pauses: list[tuple[float, float]]
    ) -> dict:
        """
        Retime a cue that doesn't span any pause points.

        Start time uses offset_at (includes pause if pause <= start).
        End time uses offset_before (excludes pause at exactly end time).

        This ensures that a cue ending exactly at a pause point ends
        right when the freeze starts, not after it.
        """
        cue_start = cue["start_time"]
        cue_end = cue["end_time"]

        return {
            "start_time": cue_start + self._offset_at(cue_start, pauses),
            "end_time": cue_end + self._offset_before(cue_end, pauses),
            "text": cue["text"]
        }

    def _filter_short_segments(
        self,
        cues: list[dict]
    ) -> list[dict]:
        """Filter out segments that are too short to display meaningfully.

        Segments shorter than MIN_SEGMENT_DURATION are dropped; this also
        removes zero-length or inverted segments.
        """
        filtered = []

        for cue in cues:
            duration = cue["end_time"] - cue["start_time"]

            if duration >= self.MIN_SEGMENT_DURATION:
                filtered.append(cue)
            else:
                logger.debug(
                    f"Filtered out short segment: [{cue['start_time']:.2f}s-"
                    f"{cue['end_time']:.2f}s] (duration={duration:.3f}s)"
                )

        return filtered

    def retime_ad_vtt_for_pause_insert(
        self,
        original_ad_vtt: str,
        analysis: dict[str, Any]
    ) -> str:
        """
        Re-time the audio description VTT for pause-insert accessible video.

        For AD cues, we use the target_start_time from the analysis
        since they are placed at specific points during pauses.  Each
        placement yields one cue running from target_start_time for
        ad_duration seconds, with the text of the original cue selected by
        ad_cue_index (a placeholder text is used when the index is invalid).

        Args:
            original_ad_vtt: Original AD VTT content
            analysis: Analysis dict with a "placements" list

        Returns:
            Re-timed AD VTT content for accessible video
        """
        placements = analysis.get("placements") or []

        # Parse original AD VTT
        cues = self._parse_vtt(original_ad_vtt)

        if len(cues) != len(placements):
            logger.warning(
                f"AD cue count ({len(cues)}) doesn't match placements ({len(placements)})"
            )

        retimed_cues = []
        for placement in placements:
            cue_index = placement.get("ad_cue_index", 0)
            target_start = placement.get("target_start_time") or 0
            ad_duration = placement.get("ad_duration") or 0

            # Only a non-negative in-range integer selects an original cue;
            # a negative index would otherwise silently wrap around.
            if isinstance(cue_index, int) and 0 <= cue_index < len(cues):
                text = cues[cue_index]["text"]
            else:
                text = f"[Audio description cue {cue_index}]"

            retimed_cues.append({
                "start_time": target_start,
                "end_time": target_start + ad_duration,
                "text": text
            })

        return self._build_vtt(retimed_cues)

    def _parse_vtt(self, vtt_content: str) -> list[dict]:
        """Parse VTT content into a list of cue dictionaries.

        Only cue timing lines and the text lines that follow them (up to the
        next blank line) are interpreted.  The header, NOTE lines, cue
        identifiers and cue settings are skipped, as are cues with no text.

        Raises:
            ValueError: If a timing line has no end timestamp or contains a
                malformed timestamp.
        """
        # Accept LF, CRLF and bare-CR line endings.
        normalized = vtt_content.strip().replace("\r\n", "\n").replace("\r", "\n")
        lines = normalized.split("\n")
        cues = []

        i = 0
        while i < len(lines):
            line = lines[i].strip()

            # Anything that is not a timing line (header, blank line, cue
            # identifier, ...) is skipped, and so are NOTE lines.
            if "-->" not in line or line.startswith("NOTE"):
                i += 1
                continue

            start_part, _, remainder = line.partition("-->")
            # The end timestamp may be followed by cue settings, separated
            # by spaces or tabs; only the first field is the timestamp.
            end_fields = remainder.split()
            if not end_fields:
                raise ValueError(f"Invalid cue timing line: {line}")

            start_time = self._parse_timestamp(start_part.strip())
            end_time = self._parse_timestamp(end_fields[0])

            # Collect the cue text: every following line up to a blank one.
            i += 1
            text_lines = []
            while i < len(lines) and lines[i].strip() != "":
                text_lines.append(lines[i].strip())
                i += 1

            if text_lines:
                cues.append({
                    "start_time": start_time,
                    "end_time": end_time,
                    "text": "\n".join(text_lines)
                })

        return cues

    def _parse_timestamp(self, timestamp: str) -> float:
        """Convert VTT timestamp to seconds.

        Accepts ``HH:MM:SS.mmm`` and ``MM:SS.mmm``.  The fractional part is
        interpreted as a decimal fraction of a second, so ``.5`` and ``.500``
        both mean half a second.

        Raises:
            ValueError: If the timestamp does not have two or three
                colon-separated fields or a field is not numeric.
        """
        parts = timestamp.strip().split(":")

        if len(parts) == 3:  # HH:MM:SS.mmm
            hours, minutes, seconds = parts
        elif len(parts) == 2:  # MM:SS.mmm
            hours, minutes, seconds = "0", parts[0], parts[1]
        else:
            raise ValueError(f"Invalid timestamp format: {timestamp}")

        whole_seconds, _, fraction = seconds.partition(".")
        # Scale by the number of digits actually present rather than
        # assuming exactly three.
        fractional_seconds = int(fraction) / 10 ** len(fraction) if fraction else 0.0

        return (
            int(hours) * 3600 +
            int(minutes) * 60 +
            int(whole_seconds) +
            fractional_seconds
        )

    def _format_timestamp(self, seconds: float) -> str:
        """Convert seconds to VTT timestamp format (HH:MM:SS.mmm).

        The value is rounded to the nearest millisecond (so float error such
        as 1.000999... does not lose a millisecond) and negative values are
        clamped to zero.
        """
        total_millis = max(0, round(seconds * 1000))
        hours, remainder = divmod(total_millis, 3_600_000)
        minutes, remainder = divmod(remainder, 60_000)
        whole_secs, millis = divmod(remainder, 1000)

        return f"{hours:02d}:{minutes:02d}:{whole_secs:02d}.{millis:03d}"

    def _build_vtt(self, cues: list[dict]) -> str:
        """Build VTT content from list of cue dictionaries.

        Emits the ``WEBVTT`` header followed by one block per cue (timing
        line, text, blank line), in the order given.
        """
        lines = ["WEBVTT", ""]

        for cue in cues:
            start_ts = self._format_timestamp(cue["start_time"])
            end_ts = self._format_timestamp(cue["end_time"])
            lines.append(f"{start_ts} --> {end_ts}")
            lines.append(cue["text"])
            lines.append("")

        return "\n".join(lines)
|
|
|
|
|
|
# Global service instance
|
|
# The class defines no __init__ and stores no instance attributes, so one shared instance suffices.
vtt_retimer_service = VTTRetimerService()
|