video-accessibility/backend/app/lib/vtt.py
Vadym Samoilenko 6f963ff7c4 feat: DCMP compliance, descriptive transcript, new languages, QA bug fixes
- Rewrote VTT translation to two-step (text-only → Gemini → apply to original timestamps) preventing caption timing desync
- Added polling fallback for all processing states and Safari visibilitychange WebSocket reconnect
- Added 11 new TTS languages (cs, da, fi, hu, no, sk, sv, es-419, pt-BR, fr-CA)
- Updated caption/AD prompts to DCMP Captioning Key & Description Key standards (line splitting, ♪ music notation, italic tags, caption positioning, ethics guidelines)
- Added descriptive transcript generation (WCAG 2.1 §1.2.1) combining captions + AD into plain text
- Fixed amix normalize=0 to prevent audio loss in rendered videos
- Fixed AD re-timing double-count when source_ms is None
- Fixed cue block numbering to be 1-based in VttEditor and Timeline Preview

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-27 11:50:43 +00:00

253 lines
8.4 KiB
Python

import re
from dataclasses import dataclass
@dataclass
class VTTCue:
start_time: float # seconds
end_time: float # seconds
text: str
identifier: str | None = None
class VTTParser:
    """Parser and builder for WebVTT files.

    ``parse`` turns VTT text into ``VTTCue`` objects and ``build`` serialises
    cues back to VTT text. Only the two timestamps of a timing line are
    captured, so anything that follows the end timestamp on that line (cue
    settings) is not carried through a parse/build round trip.
    """

    # Start and end timestamps of a cue timing line. Compiled once because
    # it is applied to every candidate timing line.
    _TIMING_RE = re.compile(r'([\d:.,]+)\s+-->\s+([\d:.,]+)')

    @staticmethod
    def parse(vtt_content: str) -> "list[VTTCue]":
        """Parse VTT content into a list of cues.

        Args:
            vtt_content: Raw WebVTT text.

        Returns:
            The cues in file order. A cue whose timing line is followed by
            no text lines is dropped.

        Raises:
            ValueError: If a timing line matches the timing pattern but one
                of its timestamps cannot be converted (see
                ``_parse_timestamp``).
        """
        lines = vtt_content.strip().split('\n')
        cues = []
        i = 0
        while i < len(lines):
            line = lines[i].strip()

            # Skip WEBVTT header, empty lines, and NOTE lines
            if line == "WEBVTT" or line == "" or line.startswith("NOTE"):
                i += 1
                continue

            # A line directly above a timing line is the optional identifier
            identifier = None
            if " --> " not in line and i + 1 < len(lines) and " --> " in lines[i + 1]:
                identifier = line
                i += 1
                line = lines[i].strip()

            timing_match = VTTParser._TIMING_RE.match(line) if " --> " in line else None

            # Always step past the current line. Without this, a line that
            # contains " --> " but is not a valid timing line would be
            # re-examined forever.
            i += 1
            if timing_match is None:
                continue

            start_time = VTTParser._parse_timestamp(timing_match.group(1))
            end_time = VTTParser._parse_timestamp(timing_match.group(2))

            # Collect text lines until an empty line or the end of input
            text_lines = []
            while i < len(lines) and lines[i].strip() != "":
                text_lines.append(lines[i].strip())
                i += 1

            if text_lines:
                cues.append(VTTCue(
                    start_time=start_time,
                    end_time=end_time,
                    text="\n".join(text_lines),
                    identifier=identifier
                ))

        return cues

    @staticmethod
    def build(cues: "list[VTTCue]") -> str:
        """Build VTT content from a list of cues.

        Args:
            cues: Cues to serialise, in output order.

        Returns:
            VTT text starting with the ``WEBVTT`` header line; every cue is
            followed by an empty line.
        """
        lines = ["WEBVTT", ""]
        for cue in cues:
            # Identifier line is optional; None and "" are both omitted
            if cue.identifier:
                lines.append(cue.identifier)

            start_timestamp = VTTParser._format_timestamp(cue.start_time)
            end_timestamp = VTTParser._format_timestamp(cue.end_time)
            lines.append(f"{start_timestamp} --> {end_timestamp}")

            # Text may itself contain newlines (multi-line cue)
            lines.append(cue.text)
            lines.append("")  # Empty line between cues
        return "\n".join(lines)

    @staticmethod
    def _parse_timestamp(timestamp: str) -> float:
        """Convert a VTT timestamp (HH:MM:SS.mmm or MM:SS.mmm) to seconds.

        Both ``.`` and ``,`` are accepted as the decimal separator. The
        fractional part is scaled by its own digit count, so ``00:01.5`` is
        1.5 seconds and ``00:01.500`` is the same value.

        Args:
            timestamp: Timestamp text with two or three colon-separated fields.

        Returns:
            The timestamp as a number of seconds.

        Raises:
            ValueError: If the timestamp does not have two or three
                colon-separated fields, or a field is not an integer.
        """
        # Handle both . and , as decimal separator
        timestamp = timestamp.replace(',', '.')

        parts = timestamp.split(':')
        if len(parts) == 3:  # HH:MM:SS.mmm
            hours, minutes, seconds = parts
        elif len(parts) == 2:  # MM:SS.mmm
            hours, minutes, seconds = "0", parts[0], parts[1]
        else:
            raise ValueError(f"Invalid timestamp format: {timestamp}")

        sec_parts = seconds.split('.')
        whole_seconds = int(sec_parts[0])
        if len(sec_parts) > 1:
            fraction_digits = sec_parts[1]
            # Scale by the number of digits actually present rather than
            # assuming exactly three (".5" is 500 ms, not 5 ms).
            fraction = int(fraction_digits) / 10 ** len(fraction_digits)
        else:
            fraction = 0.0

        return (
            int(hours) * 3600 +
            int(minutes) * 60 +
            whole_seconds +
            fraction
        )

    @staticmethod
    def _format_timestamp(seconds: float) -> str:
        """Convert seconds to VTT timestamp format (HH:MM:SS.mmm).

        The value is rounded to the nearest millisecond before it is split
        into fields. Truncating instead would turn a value such as 1.001,
        which is stored slightly below 1.001 in binary floating point, into
        ``00:00:01.000``.

        Args:
            seconds: Time offset in seconds.

        Returns:
            The timestamp string with zero-padded fields.
        """
        total_ms = round(seconds * 1000)
        total_secs, milliseconds = divmod(total_ms, 1000)
        total_minutes, whole_secs = divmod(total_secs, 60)
        hours, minutes = divmod(total_minutes, 60)
        return f"{hours:02d}:{minutes:02d}:{whole_secs:02d}.{milliseconds:03d}"
class VTTEditor:
    """Editing and inspection helpers that round-trip VTT text through VTTParser."""

    @staticmethod
    def translate_preserving_timing(
        vtt_content: str,
        translated_texts: list[str]
    ) -> str:
        """Swap in new cue texts while keeping the parsed cue timings.

        Args:
            vtt_content: Source VTT text.
            translated_texts: Replacement texts, one per parsed cue, in cue order.

        Returns:
            Rebuilt VTT text carrying the replacement texts.

        Raises:
            ValueError: If the number of texts differs from the number of cues.
        """
        parsed = VTTParser.parse(vtt_content)
        if len(translated_texts) != len(parsed):
            raise ValueError(
                f"Text count mismatch: {len(translated_texts)} texts for {len(parsed)} cues"
            )

        # Lengths are equal here, so pairing positionally covers every cue.
        for cue, replacement in zip(parsed, translated_texts):
            cue.text = replacement
        return VTTParser.build(parsed)

    @staticmethod
    def update_cue_text(vtt_content: str, cue_index: int, new_text: str) -> str:
        """Replace the text of the cue at a zero-based position.

        Raises:
            ValueError: If ``cue_index`` is negative or past the last cue.
        """
        parsed = VTTParser.parse(vtt_content)
        out_of_range = cue_index < 0 or cue_index >= len(parsed)
        if out_of_range:
            raise ValueError(f"Invalid cue index: {cue_index}")

        parsed[cue_index].text = new_text
        return VTTParser.build(parsed)

    @staticmethod
    def validate_vtt(vtt_content: str) -> tuple[bool, list[str]]:
        """Check VTT text and report the problems found.

        Returns:
            ``(ok, errors)`` where ``ok`` is True only when ``errors`` is
            empty. Cue numbers in the messages are 1-based.
        """
        problems = []

        if not vtt_content.strip().startswith("WEBVTT"):
            problems.append("VTT must start with 'WEBVTT'")

        try:
            previous = None
            for number, cue in enumerate(VTTParser.parse(vtt_content), start=1):
                if cue.start_time >= cue.end_time:
                    problems.append(f"Cue {number}: Start time must be before end time")

                # A cue may not begin before the preceding cue has ended.
                if previous is not None and cue.start_time < previous.end_time:
                    problems.append(f"Cue {number}: Overlapping with previous cue")

                if not cue.text.strip():
                    problems.append(f"Cue {number}: Empty text content")

                previous = cue
        except Exception as exc:
            # Any failure while parsing or checking is reported, not raised.
            problems.append(f"Parse error: {str(exc)}")

        return not problems, problems

    @staticmethod
    def get_cue_count(vtt_content: str) -> int:
        """Return how many cues the VTT text parses into, or 0 if parsing raises."""
        try:
            return len(VTTParser.parse(vtt_content))
        except Exception:
            return 0

    @staticmethod
    def get_total_duration(vtt_content: str) -> float:
        """Return the latest cue end time in seconds.

        Returns 0.0 when there are no cues or when parsing raises.
        """
        try:
            parsed = VTTParser.parse(vtt_content)
            return max((cue.end_time for cue in parsed), default=0.0)
        except Exception:
            return 0.0

    @staticmethod
    def validate_translation_timing(source_vtt: str, translated_vtt: str) -> tuple[bool, list[str]]:
        """Check that a translated VTT keeps the source cue timings.

        Start and end times are compared cue by cue; a difference larger
        than 0.001 seconds is reported. A cue-count mismatch is reported on
        its own and ends the check early.

        Returns:
            ``(ok, errors)`` where ``ok`` is True only when ``errors`` is empty.
        """
        problems = []
        try:
            originals = VTTParser.parse(source_vtt)
            translations = VTTParser.parse(translated_vtt)

            if len(originals) != len(translations):
                problems.append(
                    f"Cue count mismatch: source has {len(originals)}, "
                    f"translation has {len(translations)}"
                )
                return False, problems

            for number, (src, tgt) in enumerate(zip(originals, translations), start=1):
                if abs(src.start_time - tgt.start_time) > 0.001:
                    problems.append(
                        f"Cue {number}: start time changed "
                        f"({src.start_time:.3f}s -> {tgt.start_time:.3f}s)"
                    )
                if abs(src.end_time - tgt.end_time) > 0.001:
                    problems.append(
                        f"Cue {number}: end time changed "
                        f"({src.end_time:.3f}s -> {tgt.end_time:.3f}s)"
                    )
        except Exception as exc:
            problems.append(f"Validation error: {str(exc)}")

        return not problems, problems

    @staticmethod
    def adjust_timing_offset(vtt_content: str, offset_seconds: float) -> str:
        """Shift every cue by a fixed number of seconds.

        A positive offset moves captions later, a negative one earlier.
        Start times are clamped at 0.0, and each end time is kept at least
        0.5 seconds after its shifted start.
        """
        parsed = VTTParser.parse(vtt_content)
        for cue in parsed:
            shifted_start = max(0.0, cue.start_time + offset_seconds)
            shifted_end = max(shifted_start + 0.5, cue.end_time + offset_seconds)
            cue.start_time = shifted_start
            cue.end_time = shifted_end
        return VTTParser.build(parsed)