"""Service for re-timing VTT files when pauses are inserted into video."""

from typing import Any

from ..core.logging import get_logger

logger = get_logger(__name__)


class VTTRetimerService:
    """Service for re-timing VTT subtitle files after pause insertions.

    Internally a cue is a dict with the keys ``start_time`` and ``end_time``
    (seconds, as floats) and ``text``. A pause is a
    ``(pause_point, effective_offset)`` tuple, both in seconds.
    """

    # Minimum segment duration to keep after splitting (100ms)
    MIN_SEGMENT_DURATION = 0.1

    def retime_for_pause_insert(
        self,
        original_vtt: str,
        analysis: dict[str, Any]
    ) -> str:
        """
        Generate new VTT with adjusted timings for pause-insert accessible video.

        Timeline mapping for pause-insert:
        - Source [0, pause_point) -> Accessible [0, pause_point)
        - Freeze: Accessible [pause_point, pause_point + freeze_duration)
        - Source [pause_point, end) -> Accessible [pause_point + freeze_duration, ...]

        Where freeze_duration is the placement's ``actual_freeze_duration``
        when present, otherwise ad_duration + 1.0s (500ms silence buffers
        before and after).

        Cues that span pause points are split to ensure captions don't display
        during freeze periods (when AD audio is playing).

        Args:
            original_vtt: Original VTT content
            analysis: Analysis dict; its "placements" list supplies
                pause_point and ad_duration for each pause

        Returns:
            Re-timed VTT content. The original string is returned unchanged
            when the analysis yields no usable pauses.
        """
        placements = analysis.get("placements", [])

        # Build sorted list of (pause_point, effective_offset)
        pauses = self._build_pause_list(placements)

        if not pauses:
            logger.info("No pauses to apply, returning original VTT")
            return original_vtt

        logger.info(f"Re-timing VTT with {len(pauses)} pause insertions")

        # Parse and retime cues; one source cue may yield several segments
        retimed_cues = []
        for cue in self._parse_vtt(original_vtt):
            retimed_cues.extend(self._retime_cue(cue, pauses))

        # Filter out very short segments
        retimed_cues = self._filter_short_segments(retimed_cues)

        return self._build_vtt(retimed_cues)

    def _build_pause_list(
        self,
        placements: list[dict]
    ) -> list[tuple[float, float]]:
        """Build sorted list of (pause_point, effective_offset) tuples.

        Uses actual_freeze_duration when available (measured from rendered video),
        otherwise falls back to calculated value (ad_duration + 1.0s).

        Placements without a pause_point, or whose ad_duration is missing,
        None or not positive, are skipped.
        """
        silence_buffer_total = 1.0  # 500ms + 500ms (fallback calculation)

        pauses = []
        for placement in placements:
            pause_point = placement.get("pause_point")
            # An explicit None is treated like a missing value instead of
            # raising TypeError on the comparison below.
            ad_duration = placement.get("ad_duration") or 0

            if pause_point is None or ad_duration <= 0:
                continue

            # Prefer actual freeze duration if available (measured from rendered video)
            actual_freeze = placement.get("actual_freeze_duration")
            if actual_freeze is not None:
                effective_offset = actual_freeze
                logger.debug(
                    f"Pause at {pause_point:.2f}s: using actual_freeze_duration={effective_offset:.2f}s "
                    f"(ad_duration={ad_duration:.2f}s)"
                )
            else:
                effective_offset = ad_duration + silence_buffer_total
                logger.debug(
                    f"Pause at {pause_point:.2f}s: using calculated freeze_duration={effective_offset:.2f}s "
                    f"(ad_duration={ad_duration:.2f}s + 1.0s buffer)"
                )
            pauses.append((pause_point, effective_offset))

        return sorted(pauses, key=lambda x: x[0])

    def _offset_at(
        self,
        timestamp: float,
        pauses: list[tuple[float, float]]
    ) -> float:
        """
        Calculate cumulative offset for timestamps AT or AFTER pause points.

        A pause at time T affects all timestamps >= T.
        This is used for cue segments that start after a pause (the segment
        starts after the freeze).
        """
        return sum(eo for pp, eo in pauses if pp <= timestamp)

    def _offset_before(
        self,
        timestamp: float,
        pauses: list[tuple[float, float]]
    ) -> float:
        """
        Calculate cumulative offset for timestamps STRICTLY BEFORE pause points.

        Used for cue segments ending at a pause point (the end is before freeze).
        """
        return sum(eo for pp, eo in pauses if pp < timestamp)

    def _retime_cue(
        self,
        cue: dict,
        pauses: list[tuple[float, float]]
    ) -> list[dict]:
        """
        Retime a single cue, potentially splitting it across pause points.

        If a pause point falls within the cue's timespan, the cue is split into
        multiple segments: one ending before the freeze, and one starting after.
        This ensures captions don't display during freeze periods.

        Args:
            cue: Cue dict with "start_time", "end_time" and "text"
            pauses: Sorted (pause_point, effective_offset) tuples

        Returns:
            List of 1 or more cue segments.
        """
        cue_start = cue["start_time"]
        cue_end = cue["end_time"]
        cue_text = cue["text"]

        # Find pauses that fall STRICTLY within this cue (between start and end)
        pauses_in_cue = [
            (pp, eo) for pp, eo in pauses
            if cue_start < pp < cue_end
        ]

        if not pauses_in_cue:
            # Simple case: no splitting needed
            return [self._retime_simple_cue(cue, pauses)]

        # Complex case: split at each pause point
        logger.debug(
            f"Splitting cue [{cue_start:.2f}s-{cue_end:.2f}s] at {len(pauses_in_cue)} pause point(s)"
        )

        segments = []
        segment_start = cue_start

        for pause_point, _ in pauses_in_cue:
            # Create segment BEFORE this pause
            segment_end = pause_point

            new_start = segment_start + self._offset_at(segment_start, pauses)
            # End time uses offset_before because the segment ends exactly at
            # the pause point, BEFORE the freeze starts
            new_end = segment_end + self._offset_before(segment_end, pauses)

            segments.append({
                "start_time": new_start,
                "end_time": new_end,
                "text": cue_text
            })

            logger.debug(
                f"  Split segment before pause at {pause_point:.2f}s: "
                f"[{new_start:.2f}s-{new_end:.2f}s]"
            )

            # Next segment starts at the pause point (maps to after freeze)
            segment_start = pause_point

        # Final segment: from last pause point to cue end.
        # The end uses offset_before, matching _retime_simple_cue: a pause
        # located exactly at cue_end must not stretch the caption across
        # that pause's freeze.
        new_start = segment_start + self._offset_at(segment_start, pauses)
        new_end = cue_end + self._offset_before(cue_end, pauses)

        segments.append({
            "start_time": new_start,
            "end_time": new_end,
            "text": cue_text
        })

        logger.debug(
            f"  Final segment after pauses: [{new_start:.2f}s-{new_end:.2f}s]"
        )

        return segments

    def _retime_simple_cue(
        self,
        cue: dict,
        pauses: list[tuple[float, float]]
    ) -> dict:
        """
        Retime a cue that doesn't span any pause points.

        Start time uses offset_at (includes pause if pause <= start).
        End time uses offset_before (excludes pause at exactly end time).

        This ensures that a cue ending exactly at a pause point ends
        right when the freeze starts, not after it.
        """
        cue_start = cue["start_time"]
        cue_end = cue["end_time"]

        # Start: if pause_point <= start, include the offset
        start_offset = self._offset_at(cue_start, pauses)

        # End: if pause_point < end, include the offset (exclude pause AT end)
        end_offset = self._offset_before(cue_end, pauses)

        return {
            "start_time": cue_start + start_offset,
            "end_time": cue_end + end_offset,
            "text": cue["text"]
        }

    def _filter_short_segments(
        self,
        cues: list[dict]
    ) -> list[dict]:
        """Filter out segments that are too short to display meaningfully.

        A segment is kept when its duration is at least MIN_SEGMENT_DURATION;
        each dropped segment is reported at debug level.
        """
        filtered = []
        for cue in cues:
            duration = cue["end_time"] - cue["start_time"]
            if duration >= self.MIN_SEGMENT_DURATION:
                filtered.append(cue)
            else:
                logger.debug(
                    f"Filtered out short segment: [{cue['start_time']:.2f}s-"
                    f"{cue['end_time']:.2f}s] (duration={duration:.3f}s)"
                )
        return filtered

    def retime_ad_vtt_for_pause_insert(
        self,
        original_ad_vtt: str,
        analysis: dict[str, Any]
    ) -> str:
        """
        Re-time the audio description VTT for pause-insert accessible video.

        For AD cues, we use the target_start_time from the analysis
        since they are placed at specific points during pauses.

        Args:
            original_ad_vtt: Original AD VTT content
            analysis: Analysis dict with a "placements" list

        Returns:
            Re-timed AD VTT content for accessible video, one cue per placement
        """
        placements = analysis.get("placements", [])

        # Parse original AD VTT
        cues = self._parse_vtt(original_ad_vtt)

        if len(cues) != len(placements):
            logger.warning(
                f"AD cue count ({len(cues)}) doesn't match placements ({len(placements)})"
            )

        retimed_cues = []
        for placement in placements:
            cue_index = placement.get("ad_cue_index", 0)
            target_start = placement.get("target_start_time", 0)
            ad_duration = placement.get("ad_duration", 0)

            # Get original text from matching cue; use a placeholder when the
            # index points past the parsed cues
            if cue_index < len(cues):
                text = cues[cue_index]["text"]
            else:
                text = f"[Audio description cue {cue_index}]"

            retimed_cues.append({
                "start_time": target_start,
                "end_time": target_start + ad_duration,
                "text": text
            })

        return self._build_vtt(retimed_cues)

    def _parse_vtt(self, vtt_content: str) -> list[dict]:
        """Parse VTT content into a list of cue dictionaries.

        Only timing lines (containing " --> ") start a cue; the header, blank
        lines, NOTE lines and any other non-timing line are skipped. Cue
        settings after the end timestamp are discarded, and cues without
        text are dropped.
        """
        lines = vtt_content.strip().split('\n')
        cues = []

        i = 0
        while i < len(lines):
            line = lines[i].strip()

            # Skip header and empty lines
            if line == "WEBVTT" or line == "" or line.startswith("NOTE"):
                i += 1
                continue

            # Check for timing line
            if " --> " in line:
                timing_parts = line.split(" --> ")
                start_time = self._parse_timestamp(timing_parts[0].strip())
                # Handle potential settings after end time
                end_part = timing_parts[1].strip()
                if " " in end_part:
                    end_part = end_part.split(" ")[0]
                end_time = self._parse_timestamp(end_part)

                # Get text from next line(s), up to the next blank line
                i += 1
                text_lines = []
                while i < len(lines) and lines[i].strip() != "":
                    text_lines.append(lines[i].strip())
                    i += 1

                if text_lines:
                    cues.append({
                        "start_time": start_time,
                        "end_time": end_time,
                        "text": "\n".join(text_lines)
                    })
            else:
                # Anything else (e.g. a cue identifier line) is ignored
                i += 1

        return cues

    def _parse_timestamp(self, timestamp: str) -> float:
        """Convert VTT timestamp to seconds.

        Args:
            timestamp: "HH:MM:SS.mmm" or "MM:SS.mmm"

        Returns:
            The timestamp in seconds.

        Raises:
            ValueError: If the timestamp does not have two or three
                colon-separated parts, or a part is not numeric.
        """
        # Format: HH:MM:SS.mmm or MM:SS.mmm
        parts = timestamp.split(":")

        if len(parts) == 3:
            # HH:MM:SS.mmm
            hours, minutes, seconds = parts
        elif len(parts) == 2:
            # MM:SS.mmm
            hours, minutes, seconds = "0", parts[0], parts[1]
        else:
            raise ValueError(f"Invalid timestamp format: {timestamp}")

        # Parse seconds and milliseconds. The fraction is normalised to
        # exactly three digits so ".5" reads as 500ms (not 5ms) and extra
        # digits beyond milliseconds cannot inflate the value.
        whole, _, fraction = seconds.partition(".")
        seconds_int = int(whole)
        milliseconds = int(fraction[:3].ljust(3, "0")) if fraction else 0

        total_seconds = (
            int(hours) * 3600 +
            int(minutes) * 60 +
            seconds_int +
            milliseconds / 1000.0
        )

        return total_seconds

    def _format_timestamp(self, seconds: float) -> str:
        """Convert seconds to VTT timestamp format (HH:MM:SS.mmm).

        The value is rounded to the nearest millisecond before being split
        into fields, so float representation error (e.g. 1.001 stored as
        1.000999...) cannot drop a millisecond. Negative values are clamped
        to zero.
        """
        total_millis = max(0, round(seconds * 1000))
        total_secs, millis = divmod(total_millis, 1000)
        total_minutes, whole_secs = divmod(total_secs, 60)
        hours, minutes = divmod(total_minutes, 60)

        return f"{hours:02d}:{minutes:02d}:{whole_secs:02d}.{millis:03d}"

    def _build_vtt(self, cues: list[dict]) -> str:
        """Build VTT content from list of cue dictionaries.

        Emits the "WEBVTT" header followed by one timing line, the cue text
        and a blank line per cue.
        """
        lines = ["WEBVTT", ""]

        for cue in cues:
            start_ts = self._format_timestamp(cue["start_time"])
            end_ts = self._format_timestamp(cue["end_time"])

            lines.append(f"{start_ts} --> {end_ts}")
            lines.append(cue["text"])
            lines.append("")

        return "\n".join(lines)


# Global service instance
vtt_retimer_service = VTTRetimerService()