video-accessibility/backend/app/lib/vtt.py
Vadym Samoilenko 76bee82119 fix(pipeline): fix 5 QA tickets — caption alignment, glossary, source_has_ad render, filler words, NL error surfacing
- caption_aligner: lower match ratio 0.5→0.35, widen search window 60→150, add time-based cursor fallback on miss
- gemini.py: explicit 'MUST use glossary terms' requirement in translate_vtt prompt; source_has_ad prompt now instructs not to include AD narration in captions
- ingest_and_ai: load glossary for source language and pass to extract_accessibility
- render_accessible_video: handle source_has_ad=True via caption-embed path (ffmpeg subtitle inject, no AD pipeline)
- translate_and_synthesize: track failed languages, write translation_errors to DB, add exc_info to error log
- vtt.py: expand _FILLER_PATTERNS to nl/pt/pl/uk/ru, widen EN/ES/FR/DE/IT lists
- gemini_ingestion.md: strengthen line:0% placement rule, expand disfluency examples per language

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-08 18:36:59 +01:00

318 lines
12 KiB
Python

import re
from dataclasses import dataclass
@dataclass
class VTTCue:
start_time: float # seconds
end_time: float # seconds
text: str
identifier: str | None = None
settings: str = ""
class VTTParser:
    """Parser and builder for WebVTT files.

    Only cue blocks are modelled. ``parse`` skips the ``WEBVTT`` header, blank
    lines, lines starting with ``NOTE`` and any other line that is not part of
    a cue; ``build`` always writes a bare ``WEBVTT`` header followed by the
    cues it is given.
    """

    # "<start> --> <end> [cue settings]". Both "." and "," are accepted inside
    # a timestamp; the optional third group keeps cue settings verbatim.
    _TIMING_RE = re.compile(r'([\d:.,]+)\s+-->\s+([\d:.,]+)\s*(.*)')

    @staticmethod
    def parse(vtt_content: str) -> list[VTTCue]:
        """Parse VTT content into a list of cues.

        Args:
            vtt_content: Full text of a WebVTT document.

        Returns:
            The cues in document order. Each cue's text lines are stripped and
            joined with "\\n".

        Raises:
            ValueError: If a timing line carries a timestamp that
                ``_parse_timestamp`` cannot interpret.
        """
        # A BOM is not whitespace for str.strip(), so remove it explicitly.
        lines = vtt_content.lstrip('\ufeff').strip().split('\n')
        total = len(lines)
        cues = []
        i = 0

        while i < total:
            line = lines[i].strip()

            # Skip WEBVTT header, empty lines, and NOTE lines
            if line == "WEBVTT" or line == "" or line.startswith("NOTE"):
                i += 1
                continue

            # A non-timing line directly above a timing line is the cue identifier.
            identifier = None
            if " --> " not in line and i + 1 < total and " --> " in lines[i + 1]:
                identifier = line
                i += 1
                line = lines[i].strip()

            timing_match = VTTParser._TIMING_RE.match(line) if " --> " in line else None

            # Always consume the current line. A line that contains " --> " but
            # is not a valid timing line must be skipped too, otherwise the
            # loop would never advance past it.
            i += 1
            if timing_match is None:
                continue

            start_time = VTTParser._parse_timestamp(timing_match.group(1))
            end_time = VTTParser._parse_timestamp(timing_match.group(2))
            settings = timing_match.group(3).strip()

            # Collect text lines until an empty line or the end of input.
            text_lines = []
            while i < total and lines[i].strip() != "":
                text_lines.append(lines[i].strip())
                i += 1

            cues.append(VTTCue(
                start_time=start_time,
                end_time=end_time,
                text="\n".join(text_lines),
                identifier=identifier,
                settings=settings,
            ))

        return cues

    @staticmethod
    def build(cues: list[VTTCue]) -> str:
        """Build VTT content from a list of cues.

        Args:
            cues: Cues to serialise, in output order.

        Returns:
            A document starting with ``WEBVTT`` and a blank line, then one
            block per cue (identifier if set, timing line, text, blank line).
        """
        lines = ["WEBVTT", ""]

        for cue in cues:
            # Add identifier if present
            if cue.identifier:
                lines.append(cue.identifier)

            # Add timing line (preserve cue settings like line:0%)
            start_timestamp = VTTParser._format_timestamp(cue.start_time)
            end_timestamp = VTTParser._format_timestamp(cue.end_time)
            timing_line = f"{start_timestamp} --> {end_timestamp}"
            if cue.settings:
                timing_line += f" {cue.settings}"
            lines.append(timing_line)

            # Add text (can be multi-line)
            lines.append(cue.text)
            lines.append("")  # Empty line between cues

        return "\n".join(lines) + "\n"

    @staticmethod
    def _parse_timestamp(timestamp: str) -> float:
        """Convert a VTT timestamp (HH:MM:SS.mmm or MM:SS.mmm) to seconds.

        The fractional part is interpreted by its number of digits, so "1.5"
        is 1.5 seconds and "1.500" is the same value. A "," is accepted in
        place of ".".

        Raises:
            ValueError: If the timestamp does not have two or three
                colon-separated fields, or a field is not numeric.
        """
        # Clean up timestamp (handle both . and , as decimal separator)
        timestamp = timestamp.replace(',', '.')

        parts = timestamp.split(':')
        if len(parts) == 3:  # HH:MM:SS.mmm
            hours, minutes, seconds = parts
        elif len(parts) == 2:  # MM:SS.mmm
            hours, minutes, seconds = "0", parts[0], parts[1]
        else:
            raise ValueError(f"Invalid timestamp format: {timestamp}")

        whole, _, fraction = seconds.partition('.')
        whole_seconds = int(whole)
        # Scale by the digit count; dividing by a fixed 1000 would read ".5"
        # as 5 ms instead of 500 ms.
        fractional_seconds = int(fraction) / 10 ** len(fraction) if fraction else 0.0

        return (
            int(hours) * 3600 +
            int(minutes) * 60 +
            whole_seconds +
            fractional_seconds
        )

    @staticmethod
    def _format_timestamp(seconds: float) -> str:
        """Convert seconds to VTT timestamp format (HH:MM:SS.mmm).

        The value is rounded to whole milliseconds before it is split into
        fields, so a carry propagates correctly (1.9996 -> "00:00:02.000").
        Negative values are clamped to zero.
        """
        total_ms = max(0, round(seconds * 1000))
        hours, remainder = divmod(total_ms, 3_600_000)
        minutes, remainder = divmod(remainder, 60_000)
        whole_secs, milliseconds = divmod(remainder, 1000)
        return f"{hours:02d}:{minutes:02d}:{whole_secs:02d}.{milliseconds:03d}"
class VTTEditor:
    """Utility class for editing VTT content while preserving timing.

    Every method works on VTT text: it parses with ``VTTParser.parse`` and,
    when it returns VTT, re-serialises with ``VTTParser.build``.
    """

    @staticmethod
    def translate_preserving_timing(
        vtt_content: str,
        translated_texts: list[str]
    ) -> str:
        """Replace text in VTT cues while preserving all timing information.

        Args:
            vtt_content: Source VTT document.
            translated_texts: One replacement text per cue, in cue order.

        Returns:
            The rebuilt VTT with the same cues and the new texts.

        Raises:
            ValueError: If the number of texts differs from the number of cues.
        """
        cues = VTTParser.parse(vtt_content)

        if len(translated_texts) != len(cues):
            raise ValueError(
                f"Text count mismatch: {len(translated_texts)} texts for {len(cues)} cues"
            )

        for cue, translated_text in zip(cues, translated_texts, strict=True):
            cue.text = translated_text

        return VTTParser.build(cues)

    @staticmethod
    def assert_cue_alignment(en_vtt: str, target_vtt: str, lang: str) -> None:
        """Raise ValueError if target VTT cue count or timestamps diverge from EN master.

        Args:
            en_vtt: The EN master VTT.
            target_vtt: The VTT to check against the master.
            lang: Language label used only in the error message.

        Raises:
            ValueError: On a cue-count mismatch, or on the first cue whose
                parsed start or end time is not exactly equal to the master's.
        """
        en_cues = VTTParser.parse(en_vtt)
        tgt_cues = VTTParser.parse(target_vtt)
        if len(tgt_cues) != len(en_cues):
            raise ValueError(
                f"Cue count mismatch for {lang}: EN has {len(en_cues)}, target has {len(tgt_cues)}"
            )
        for i, (en, tgt) in enumerate(zip(en_cues, tgt_cues, strict=True)):
            if en.start_time != tgt.start_time or en.end_time != tgt.end_time:
                raise ValueError(
                    f"Timestamp mismatch for {lang} cue {i}: "
                    f"EN {en.start_time}-->{en.end_time}, target {tgt.start_time}-->{tgt.end_time}"
                )

    @staticmethod
    def update_cue_text(vtt_content: str, cue_index: int, new_text: str) -> str:
        """Update text for a specific cue by index.

        Args:
            vtt_content: Source VTT document.
            cue_index: Zero-based index of the cue to change.
            new_text: Replacement text for that cue.

        Raises:
            ValueError: If ``cue_index`` is outside the range of parsed cues.
        """
        cues = VTTParser.parse(vtt_content)

        if not 0 <= cue_index < len(cues):
            raise ValueError(f"Invalid cue index: {cue_index}")

        cues[cue_index].text = new_text
        return VTTParser.build(cues)

    @staticmethod
    def validate_vtt(vtt_content: str) -> tuple[bool, list[str]]:
        """Validate VTT content and return errors if any.

        Checks the ``WEBVTT`` header and, per cue, that start precedes end,
        that the cue does not start before the previous cue ends, and that
        its text is not empty. Any exception raised while parsing is reported
        as a "Parse error" entry instead of propagating.

        Returns:
            ``(is_valid, errors)`` where ``is_valid`` is True only when
            ``errors`` is empty.
        """
        errors = []

        # Tolerate a leading BOM: str.strip() does not treat it as whitespace.
        if not vtt_content.lstrip('\ufeff').strip().startswith("WEBVTT"):
            errors.append("VTT must start with 'WEBVTT'")

        try:
            cues = VTTParser.parse(vtt_content)

            # Check timing consistency
            for i, cue in enumerate(cues):
                if cue.start_time >= cue.end_time:
                    errors.append(f"Cue {i + 1}: Start time must be before end time")

                if i > 0 and cue.start_time < cues[i - 1].end_time:
                    errors.append(f"Cue {i + 1}: Overlapping with previous cue")

                if not cue.text.strip():
                    errors.append(f"Cue {i + 1}: Empty text content")

        except Exception as e:
            errors.append(f"Parse error: {str(e)}")

        return len(errors) == 0, errors

    @staticmethod
    def fix_overlapping_cues(vtt_content: str) -> str:
        """Trim end_time of each cue so it does not overlap the next cue's start_time.

        Start times are never changed. When the next cue starts at or before
        the previous cue's own start (within 1 ms), the previous cue is given
        a 1 ms duration instead, so an overlap can remain in that case.
        """
        cues = VTTParser.parse(vtt_content)
        for previous, current in zip(cues, cues[1:]):
            if current.start_time < previous.end_time:
                # Clamp previous cue end to 1ms before next cue start
                new_end = current.start_time - 0.001
                # Never let end_time go at or below start_time
                if new_end <= previous.start_time:
                    new_end = previous.start_time + 0.001
                previous.end_time = new_end
        return VTTParser.build(cues)

    @staticmethod
    def get_cue_count(vtt_content: str) -> int:
        """Get the number of cues in VTT content (0 if parsing raises)."""
        try:
            return len(VTTParser.parse(vtt_content))
        except Exception:
            # Best-effort helper: unparseable content counts as no cues.
            return 0

    @staticmethod
    def get_total_duration(vtt_content: str) -> float:
        """Get total duration of VTT content in seconds.

        This is the largest cue end time; 0.0 is returned when there are no
        cues or when parsing raises.
        """
        try:
            cues = VTTParser.parse(vtt_content)
            if not cues:
                return 0.0
            return max(cue.end_time for cue in cues)
        except Exception:
            # Best-effort helper: unparseable content has no duration.
            return 0.0

    @staticmethod
    def validate_translation_timing(source_vtt: str, translated_vtt: str) -> tuple[bool, list[str]]:
        """Verify that translated VTT has identical timestamps to the source VTT.

        Start and end times are compared cue by cue with a 1 ms tolerance. A
        cue-count mismatch is reported on its own and stops the comparison.
        Any exception raised while parsing is reported as a "Validation
        error" entry.

        Returns:
            ``(is_valid, errors)`` where ``is_valid`` is True only when
            ``errors`` is empty.
        """
        errors = []

        try:
            source_cues = VTTParser.parse(source_vtt)
            translated_cues = VTTParser.parse(translated_vtt)

            if len(source_cues) != len(translated_cues):
                errors.append(
                    f"Cue count mismatch: source has {len(source_cues)}, "
                    f"translation has {len(translated_cues)}"
                )
                return False, errors

            for i, (src, tgt) in enumerate(zip(source_cues, translated_cues, strict=False)):
                if abs(src.start_time - tgt.start_time) > 0.001:
                    errors.append(
                        f"Cue {i + 1}: start time changed "
                        f"({src.start_time:.3f}s -> {tgt.start_time:.3f}s)"
                    )
                if abs(src.end_time - tgt.end_time) > 0.001:
                    errors.append(
                        f"Cue {i + 1}: end time changed "
                        f"({src.end_time:.3f}s -> {tgt.end_time:.3f}s)"
                    )

        except Exception as e:
            errors.append(f"Validation error: {str(e)}")

        return len(errors) == 0, errors

    @staticmethod
    def adjust_timing_offset(vtt_content: str, offset_seconds: float) -> str:
        """
        Adjust all VTT cue timings by a fixed offset

        Positive offset moves captions later, negative moves them earlier.
        A shifted start time is clamped at 0.0, and the end time is kept at
        least 0.5 s after the (shifted) start, so cues that would end up
        shorter than 0.5 s are lengthened.
        """
        cues = VTTParser.parse(vtt_content)

        for cue in cues:
            cue.start_time = max(0.0, cue.start_time + offset_seconds)
            cue.end_time = max(cue.start_time + 0.5, cue.end_time + offset_seconds)

        return VTTParser.build(cues)

    # DCMP §6.01 filler patterns per language (whole-word, case-insensitive).
    # Longer phrases must precede their own prefixes ("o sea que" before
    # "o sea"), otherwise the shorter alternative wins and leaves a remnant.
    # "right?" lives outside the \b(...)\b group: a trailing \b after "?" only
    # matches when a word character follows, which made that entry dead.
    _FILLER_PATTERNS: dict[str, str] = {
        "en": r'\b(um+|uh+|ah+|er+|hmm+|you know|i mean|sort of|kind of|basically|literally|honestly|actually|so yeah)\b|\bright\?(?!\w)',
        "es": r'\b(eh+|este|o sea que|o sea|pues|bueno|mmm+)\b',
        "fr": r'\b(euh+|beh|ben|donc|quoi|enfin|voilà|genre)\b',
        "de": r'\b(äh+|ähm+|halt|ne|also|naja|sozusagen|quasi)\b',
        "it": r'\b(ehm+|allora|cioè|tipo|praticamente|insomma|ecco)\b',
        "nl": r'\b(eh+|nou|zeg|eigenlijk|gewoon|toch|zo van|hè)\b',
        "pt": r'\b(ahn+|hã+|né|sabe|tipo|então|assim)\b',
        "pl": r'\b(no|że|bo|znaczy|właśnie|jakby|wiesz)\b',
        "uk": r'\b(ну+|ем+|типу|знаєш|значить|власне|от)\b',
        "ru": r'\b(ну+|эм+|типа|знаешь|значит|вот|собственно)\b',
    }

    @staticmethod
    def _strip_fillers(text: str, compiled: re.Pattern[str]) -> str:
        """Remove filler matches from cue text and tidy what is left behind.

        Works line by line so that a line consisting only of fillers is
        dropped instead of becoming a blank line, which would end the cue
        early when the VTT is parsed again.

        Returns:
            The cleaned text, or "" when nothing but fillers was present.
        """
        kept = []
        for raw_line in text.split("\n"):
            line, removed = compiled.subn("", raw_line)
            if removed:
                # Repair comma artifacts left by a removal: "a, , b" -> "a, b".
                line = re.sub(r'[ \t]+,', ',', line)
                line = re.sub(r',(?:[ \t]*,)+', ',', line)
                line = line.strip().strip(',')
            line = re.sub(r'[ \t]{2,}', ' ', line).strip()
            if line:
                kept.append(line)
        # Strip leading/trailing comma artifacts of the cue as a whole.
        return "\n".join(kept).strip().strip(',').strip()

    @staticmethod
    def clean_disfluencies(vtt_content: str, lang: str) -> str:
        """Remove filler words and hesitations per DCMP §6.01 for supported languages.

        Args:
            vtt_content: Source VTT document.
            lang: Language tag; only the primary subtag is used, so "en",
                "en-US" and "en_US" all select the English pattern.

        Returns:
            The input unchanged when the language has no filler pattern;
            otherwise the rebuilt VTT. A cue that would become empty keeps
            its original text.
        """
        primary = re.split(r'[-_]', lang.strip(), maxsplit=1)[0].lower()
        pattern = VTTEditor._FILLER_PATTERNS.get(primary)
        if not pattern:
            return vtt_content

        cues = VTTParser.parse(vtt_content)
        compiled = re.compile(pattern, re.IGNORECASE)

        for cue in cues:
            cleaned = VTTEditor._strip_fillers(cue.text, compiled)
            if cleaned:
                cue.text = cleaned

        return VTTParser.build(cues)