video-accessibility/backend/app/services/vtt_retimer.py
michael add958008a fix: use actual freeze segment durations for VTT subtitle retiming
Subtitles were appearing progressively out of sync (~1.0s early per AD)
because the VTT retimer calculated freeze durations theoretically
rather than using actual rendered segment durations.

Changes:
- video_renderer: Measure actual freeze segment duration after creation
- video_renderer: Return updated placements with actual_freeze_duration
- vtt_retimer: Prefer actual_freeze_duration over calculated values
- render_task: Pass actual durations to VTT retimer

This ensures subtitle timing matches the real video timeline regardless
of any FFmpeg encoding variations.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-05 15:52:57 -06:00

391 lines
13 KiB
Python

"""Service for re-timing VTT files when pauses are inserted into video."""
from typing import Any
from ..core.logging import get_logger
# Module-level logger, obtained from the project's logging helper and keyed by
# this module's import name.
logger = get_logger(__name__)
class VTTRetimerService:
    """Service for re-timing VTT subtitle files after pause insertions.

    Cues are handled internally as dicts with ``start_time`` and ``end_time``
    (float seconds) and ``text`` (str). Pauses are handled as
    ``(pause_point, effective_offset)`` tuples of float seconds.
    """

    # Minimum segment duration to keep after splitting (100ms)
    MIN_SEGMENT_DURATION = 0.1

    # Tolerance for the duration comparison. Subtracting two float timestamps
    # can yield e.g. 0.0999999999 for a cue that is exactly 100ms long; VTT
    # has millisecond resolution so a sub-millisecond tolerance is safe.
    _DURATION_EPSILON = 1e-6

    def retime_for_pause_insert(
        self,
        original_vtt: str,
        analysis: dict[str, Any]
    ) -> str:
        """
        Generate new VTT with adjusted timings for pause-insert accessible video.

        Timeline mapping for pause-insert:
        - Source [0, pause_point) -> Accessible [0, pause_point)
        - Freeze: Accessible [pause_point, pause_point + freeze_duration)
        - Source [pause_point, end) -> Accessible [pause_point + freeze_duration, ...]

        Where freeze_duration is the placement's actual_freeze_duration when
        present, otherwise ad_duration + 1.0s (500ms silence buffers before
        and after).

        Cues that span pause points are split to ensure captions don't display
        during freeze periods (when AD audio is playing).

        Args:
            original_vtt: Original VTT content
            analysis: Analysis dict whose "placements" list holds dicts with
                pause_point and ad_duration (and optionally
                actual_freeze_duration)

        Returns:
            Re-timed VTT content, or ``original_vtt`` unchanged when no
            placement yields a pause.
        """
        # `or []` also covers an explicit None stored under the key.
        placements = analysis.get("placements") or []

        # Build sorted list of (pause_point, effective_offset)
        pauses = self._build_pause_list(placements)
        if not pauses:
            logger.info("No pauses to apply, returning original VTT")
            return original_vtt

        logger.info(f"Re-timing VTT with {len(pauses)} pause insertions")

        # Parse and retime cues; each cue may expand into several segments.
        retimed_cues = [
            segment
            for cue in self._parse_vtt(original_vtt)
            for segment in self._retime_cue(cue, pauses)
        ]

        # Filter out very short segments
        retimed_cues = self._filter_short_segments(retimed_cues)
        return self._build_vtt(retimed_cues)

    def _build_pause_list(
        self,
        placements: list[dict]
    ) -> list[tuple[float, float]]:
        """Build sorted list of (pause_point, effective_offset) tuples.

        Uses actual_freeze_duration when available (measured from rendered video),
        otherwise falls back to calculated value (ad_duration + 1.0s).

        Placements without a pause_point, or whose ad_duration is missing,
        None or not positive, are skipped.
        """
        silence_buffer_total = 1.0  # 500ms + 500ms (fallback calculation)
        pauses = []
        for placement in placements:
            pause_point = placement.get("pause_point")
            # `or 0` guards against an explicit None, which would otherwise
            # raise TypeError in the comparison below.
            ad_duration = placement.get("ad_duration") or 0
            if pause_point is None or ad_duration <= 0:
                continue

            # Prefer actual freeze duration if available (measured from rendered video)
            actual_freeze = placement.get("actual_freeze_duration")
            if actual_freeze is not None:
                effective_offset = actual_freeze
                logger.debug(
                    f"Pause at {pause_point:.2f}s: using actual_freeze_duration={effective_offset:.2f}s "
                    f"(ad_duration={ad_duration:.2f}s)"
                )
            else:
                effective_offset = ad_duration + silence_buffer_total
                logger.debug(
                    f"Pause at {pause_point:.2f}s: using calculated freeze_duration={effective_offset:.2f}s "
                    f"(ad_duration={ad_duration:.2f}s + 1.0s buffer)"
                )
            pauses.append((pause_point, effective_offset))
        return sorted(pauses, key=lambda x: x[0])

    def _offset_at(
        self,
        timestamp: float,
        pauses: list[tuple[float, float]]
    ) -> float:
        """
        Calculate cumulative offset for timestamps AT or AFTER pause points.

        A pause at time T affects all timestamps >= T. This is used for cue
        segments that start after a pause (the segment starts after the freeze).
        """
        return sum(eo for pp, eo in pauses if pp <= timestamp)

    def _offset_before(
        self,
        timestamp: float,
        pauses: list[tuple[float, float]]
    ) -> float:
        """
        Calculate cumulative offset for timestamps STRICTLY BEFORE pause points.

        Used for cue segments ending at a pause point (the end is before freeze).
        """
        return sum(eo for pp, eo in pauses if pp < timestamp)

    def _retime_cue(
        self,
        cue: dict,
        pauses: list[tuple[float, float]]
    ) -> list[dict]:
        """
        Retime a single cue, potentially splitting it across pause points.

        If a pause point falls within the cue's timespan, the cue is split into
        multiple segments: one ending before the freeze, and one starting after.
        This ensures captions don't display during freeze periods.

        Args:
            cue: Cue dict with start_time, end_time and text.
            pauses: (pause_point, effective_offset) tuples; segments are cut
                in list order, so the list is expected in ascending
                pause_point order (as _build_pause_list returns it).

        Returns:
            List of 1 or more cue segments.
        """
        cue_start = cue["start_time"]
        cue_end = cue["end_time"]
        cue_text = cue["text"]

        # Find pauses that fall STRICTLY within this cue (between start and end)
        pauses_in_cue = [
            (pp, eo) for pp, eo in pauses
            if cue_start < pp < cue_end
        ]
        if not pauses_in_cue:
            # Simple case: no splitting needed
            return [self._retime_simple_cue(cue, pauses)]

        # Complex case: split at each pause point
        logger.debug(
            f"Splitting cue [{cue_start:.2f}s-{cue_end:.2f}s] at {len(pauses_in_cue)} pause point(s)"
        )
        segments = []
        segment_start = cue_start
        for pause_point, _ in pauses_in_cue:
            # Create segment BEFORE this pause
            segment_end = pause_point
            new_start = segment_start + self._offset_at(segment_start, pauses)
            # End time uses offset_before because the segment ends exactly at
            # the pause point, BEFORE the freeze starts
            new_end = segment_end + self._offset_before(segment_end, pauses)
            segments.append({
                "start_time": new_start,
                "end_time": new_end,
                "text": cue_text
            })
            logger.debug(
                f" Split segment before pause at {pause_point:.2f}s: "
                f"[{new_start:.2f}s-{new_end:.2f}s]"
            )
            # Next segment starts at the pause point (maps to after freeze)
            segment_start = pause_point

        # Final segment: from last pause point to cue end
        new_start = segment_start + self._offset_at(segment_start, pauses)
        new_end = cue_end + self._offset_at(cue_end, pauses)
        segments.append({
            "start_time": new_start,
            "end_time": new_end,
            "text": cue_text
        })
        logger.debug(
            f" Final segment after pauses: [{new_start:.2f}s-{new_end:.2f}s]"
        )
        return segments

    def _retime_simple_cue(
        self,
        cue: dict,
        pauses: list[tuple[float, float]]
    ) -> dict:
        """
        Retime a cue that doesn't span any pause points.

        Start time uses offset_at (includes pause if pause <= start).
        End time uses offset_before (excludes pause at exactly end time).
        This ensures that a cue ending exactly at a pause point ends
        right when the freeze starts, not after it.
        """
        cue_start = cue["start_time"]
        cue_end = cue["end_time"]
        # Start: if pause_point <= start, include the offset
        start_offset = self._offset_at(cue_start, pauses)
        # End: if pause_point < end, include the offset (exclude pause AT end)
        end_offset = self._offset_before(cue_end, pauses)
        return {
            "start_time": cue_start + start_offset,
            "end_time": cue_end + end_offset,
            "text": cue["text"]
        }

    def _filter_short_segments(
        self,
        cues: list[dict]
    ) -> list[dict]:
        """Filter out segments that are too short to display meaningfully.

        A segment is kept when its duration is at least MIN_SEGMENT_DURATION,
        within a sub-millisecond tolerance that absorbs float subtraction
        error (e.g. 3.3 - 3.2 evaluates to slightly less than 0.1).
        """
        threshold = self.MIN_SEGMENT_DURATION - self._DURATION_EPSILON
        filtered = []
        for cue in cues:
            duration = cue["end_time"] - cue["start_time"]
            if duration >= threshold:
                filtered.append(cue)
            else:
                logger.debug(
                    f"Filtered out short segment: [{cue['start_time']:.2f}s-"
                    f"{cue['end_time']:.2f}s] (duration={duration:.3f}s)"
                )
        return filtered

    def retime_ad_vtt_for_pause_insert(
        self,
        original_ad_vtt: str,
        analysis: dict[str, Any]
    ) -> str:
        """
        Re-time the audio description VTT for pause-insert accessible video.

        For AD cues, we use the target_start_time from the analysis
        since they are placed at specific points during pauses. One output cue
        is produced per placement, spanning
        [target_start_time, target_start_time + ad_duration].

        Args:
            original_ad_vtt: Original AD VTT content
            analysis: Analysis dict with a "placements" list

        Returns:
            Re-timed AD VTT content for accessible video
        """
        # `or []` also covers an explicit None stored under the key.
        placements = analysis.get("placements") or []

        # Parse original AD VTT
        cues = self._parse_vtt(original_ad_vtt)
        if len(cues) != len(placements):
            logger.warning(
                f"AD cue count ({len(cues)}) doesn't match placements ({len(placements)})"
            )

        retimed_cues = []
        for placement in placements:
            cue_index = placement.get("ad_cue_index", 0)
            target_start = placement.get("target_start_time") or 0
            ad_duration = placement.get("ad_duration") or 0

            # Get original text from matching cue. The lower bound stops a
            # negative index from silently wrapping to a cue at the end of
            # the list; anything out of range gets the placeholder text.
            if isinstance(cue_index, int) and 0 <= cue_index < len(cues):
                text = cues[cue_index]["text"]
            else:
                text = f"[Audio description cue {cue_index}]"

            retimed_cues.append({
                "start_time": target_start,
                "end_time": target_start + ad_duration,
                "text": text
            })
        return self._build_vtt(retimed_cues)

    def _parse_vtt(self, vtt_content: str) -> list[dict]:
        """Parse VTT content into a list of cue dictionaries.

        Only lines containing " --> " start a cue; every other line outside a
        cue body (header, blank lines, NOTE lines, cue identifiers) is
        skipped. Cue settings following the end timestamp are discarded, and
        cues with no text lines are dropped.

        Raises:
            ValueError: If a timing line holds a malformed timestamp.
        """
        lines = vtt_content.strip().split('\n')
        cues = []
        i = 0
        while i < len(lines):
            line = lines[i].strip()

            # Skip header and empty lines
            if line in ("WEBVTT", "") or line.startswith("NOTE"):
                i += 1
                continue

            # Anything that is not a timing line (e.g. a cue identifier)
            if " --> " not in line:
                i += 1
                continue

            timing_parts = line.split(" --> ")
            start_time = self._parse_timestamp(timing_parts[0].strip())

            # Handle potential settings after end time; split on any
            # whitespace so tab-separated settings are stripped as well.
            end_fields = timing_parts[1].split()
            end_time = self._parse_timestamp(end_fields[0] if end_fields else "")

            # Get text from next line(s), up to the next blank line
            i += 1
            text_lines = []
            while i < len(lines) and lines[i].strip() != "":
                text_lines.append(lines[i].strip())
                i += 1

            if text_lines:
                cues.append({
                    "start_time": start_time,
                    "end_time": end_time,
                    "text": "\n".join(text_lines)
                })
        return cues

    def _parse_timestamp(self, timestamp: str) -> float:
        """Convert VTT timestamp to seconds.

        Accepts HH:MM:SS.mmm or MM:SS.mmm. The fractional part is read as a
        decimal fraction of a second at millisecond precision, so "01.5"
        means 1.500s and digits beyond the third are ignored.

        Raises:
            ValueError: If the timestamp does not have 2 or 3 colon-separated
                fields, or a field is not numeric.
        """
        parts = timestamp.split(":")
        if len(parts) == 3:  # HH:MM:SS.mmm
            hours, minutes, seconds = parts
        elif len(parts) == 2:  # MM:SS.mmm
            hours, minutes, seconds = "0", parts[0], parts[1]
        else:
            raise ValueError(f"Invalid timestamp format: {timestamp}")

        # Parse seconds and milliseconds. Pad/truncate the fraction to three
        # digits so that ".5" is 500ms rather than 5ms.
        whole, _, fraction = seconds.partition(".")
        milliseconds = int(fraction.ljust(3, "0")[:3]) if fraction else 0

        return (
            int(hours) * 3600 +
            int(minutes) * 60 +
            int(whole) +
            milliseconds / 1000.0
        )

    def _format_timestamp(self, seconds: float) -> str:
        """Convert seconds to VTT timestamp format (HH:MM:SS.mmm).

        The value is rounded to the nearest millisecond before being split
        into fields, so binary float artefacts (1.001 stored as 1.000999...)
        do not lose a millisecond and a carry propagates correctly
        (59.9996 -> 00:01:00.000). Negative inputs are clamped to zero.
        """
        total_ms = max(0, round(seconds * 1000))
        hours, remainder = divmod(total_ms, 3_600_000)
        minutes, remainder = divmod(remainder, 60_000)
        whole_secs, millis = divmod(remainder, 1000)
        return f"{hours:02d}:{minutes:02d}:{whole_secs:02d}.{millis:03d}"

    def _build_vtt(self, cues: list[dict]) -> str:
        """Build VTT content from list of cue dictionaries.

        Emits the "WEBVTT" header followed by one timing line, the cue text
        and a blank line per cue, in the order given.
        """
        lines = ["WEBVTT", ""]
        for cue in cues:
            start_ts = self._format_timestamp(cue["start_time"])
            end_ts = self._format_timestamp(cue["end_time"])
            lines.append(f"{start_ts} --> {end_ts}")
            lines.append(cue["text"])
            lines.append("")
        return "\n".join(lines)
# Global service instance, created once at import time. The class defines no
# __init__ and sets no instance attributes in this file, so importers can
# share this single object.
vtt_retimer_service = VTTRetimerService()