video-accessibility/backend/app/lib/vtt.py
Vadym Samoilenko fddf803b74 feat(translation): enforce EN-first pipeline with cue-preserving translations
All translations now derive strictly from the approved English master VTT,
eliminating the cue-count and timestamp drift reported by linguists
(e.g. PL AD = 11 cues vs EN AD = 17 cues).

Key changes:
- Remove video_native translation mode entirely; all languages go through
  translate_vtt() which guarantees 1:1 cue alignment with EN master
- Transcreation languages now use translate_vtt(style="transcreate") —
  same cue-preserving contract, culturally-adapted instructions
- Post-translation cue alignment validator added (VTTEditor.assert_cue_alignment)
- After ingestion, job moves to PENDING_QC (EN-only) instead of TRANSLATING;
  translation pipeline dispatches automatically when EN QC is approved
- New POST /jobs/{id}/retranslate-language endpoint for PM/admin to fix
  legacy video_native jobs on demand
- Frontend: origin badge (EN-aligned / transcreated / video-native warning),
  EN-first gate banner on target-language cards, Re-translate from EN button
  with confirm modal, removed translation mode selector from NewJob

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-06 12:11:35 +01:00

268 lines
9.2 KiB
Python

import re
from dataclasses import dataclass
@dataclass
class VTTCue:
start_time: float # seconds
end_time: float # seconds
text: str
identifier: str | None = None
class VTTParser:
    """Parser and builder for WebVTT files.

    Only the cue identifier, the two timestamps and the cue text are kept.
    Anything following the end timestamp on a timing line (e.g. cue
    settings) is not captured and therefore not re-emitted by ``build``.
    """

    # "<start> --> <end>"; accepts "." or "," as the decimal separator.
    _TIMING_RE = re.compile(r'([\d:.,]+)\s+-->\s+([\d:.,]+)')

    # Annotations that name VTTCue are quoted so they are not evaluated when
    # the class body is executed.
    @staticmethod
    def parse(vtt_content: str) -> "list[VTTCue]":
        """Parse VTT content into a list of cues.

        The ``WEBVTT`` header line, blank lines and lines starting with
        ``NOTE`` are skipped. Any other line that is not part of a cue
        (including a line containing `` --> `` that does not start with a
        timestamp) is skipped as well.

        Args:
            vtt_content: Raw WebVTT text.

        Returns:
            The cues in file order.

        Raises:
            ValueError: If a matched timestamp cannot be converted to
                seconds (see ``_parse_timestamp``).
        """
        lines = vtt_content.strip().split('\n')
        cues = []
        i = 0
        while i < len(lines):
            line = lines[i].strip()

            # Skip WEBVTT header, empty lines, and NOTE lines
            if line == "WEBVTT" or line == "" or line.startswith("NOTE"):
                i += 1
                continue

            # A line directly followed by a timing line is a cue identifier.
            identifier = None
            if " --> " not in line and i + 1 < len(lines) and " --> " in lines[i + 1]:
                identifier = line
                i += 1
                line = lines[i].strip()

            timing_match = (
                VTTParser._TIMING_RE.match(line) if " --> " in line else None
            )
            if timing_match is None:
                # Unrecognised or malformed line: always advance, otherwise
                # the loop would never terminate on this input.
                i += 1
                continue

            start_time = VTTParser._parse_timestamp(timing_match.group(1))
            end_time = VTTParser._parse_timestamp(timing_match.group(2))

            # Collect text lines until an empty line or the end of input.
            i += 1
            text_lines = []
            while i < len(lines) and lines[i].strip() != "":
                text_lines.append(lines[i].strip())
                i += 1

            cues.append(VTTCue(
                start_time=start_time,
                end_time=end_time,
                text="\n".join(text_lines),
                identifier=identifier
            ))
        return cues

    @staticmethod
    def build(cues: "list[VTTCue]") -> str:
        """Build VTT content from a list of cues.

        Args:
            cues: Cues to serialise, in output order.

        Returns:
            WebVTT text starting with the ``WEBVTT`` header, with one blank
            line after the header and after every cue.
        """
        lines = ["WEBVTT", ""]
        for cue in cues:
            # Add identifier if present
            if cue.identifier:
                lines.append(cue.identifier)
            # Add timing line
            start_timestamp = VTTParser._format_timestamp(cue.start_time)
            end_timestamp = VTTParser._format_timestamp(cue.end_time)
            lines.append(f"{start_timestamp} --> {end_timestamp}")
            # Add text (can be multi-line)
            lines.append(cue.text)
            lines.append("")  # Empty line between cues
        return "\n".join(lines) + "\n"

    @staticmethod
    def _parse_timestamp(timestamp: str) -> float:
        """Convert a VTT timestamp (HH:MM:SS.mmm or MM:SS.mmm) to seconds.

        Both ``.`` and ``,`` are accepted as the decimal separator, and the
        fractional part may have any number of digits.

        Raises:
            ValueError: If the timestamp does not have two or three
                colon-separated parts, or a part is not numeric.
        """
        # Clean up timestamp (handle both . and , as decimal separator)
        timestamp = timestamp.replace(',', '.')
        parts = timestamp.split(':')
        if len(parts) == 3:  # HH:MM:SS.mmm
            hours, minutes, seconds = parts
        elif len(parts) == 2:  # MM:SS.mmm
            hours, minutes, seconds = "0", parts[0], parts[1]
        else:
            raise ValueError(f"Invalid timestamp format: {timestamp}")

        sec_parts = seconds.split('.')
        whole_seconds = int(sec_parts[0])
        if len(sec_parts) > 1:
            # Scale by the actual digit count so ".5" is 0.5 s, not 0.005 s.
            fraction_digits = sec_parts[1]
            fraction = int(fraction_digits) / 10 ** len(fraction_digits)
        else:
            fraction = 0

        return (
            int(hours) * 3600 +
            int(minutes) * 60 +
            whole_seconds +
            fraction
        )

    @staticmethod
    def _format_timestamp(seconds: float) -> str:
        """Convert seconds to VTT timestamp format (HH:MM:SS.mmm).

        The value is rounded to whole milliseconds first, so a fraction that
        rounds up carries into the seconds/minutes/hours fields instead of
        producing a four-digit millisecond field. Negative values are
        clamped to zero.
        """
        total_ms = max(0, round(seconds * 1000))
        hours, remainder = divmod(total_ms, 3_600_000)
        minutes, remainder = divmod(remainder, 60_000)
        whole_secs, milliseconds = divmod(remainder, 1000)
        return f"{hours:02d}:{minutes:02d}:{whole_secs:02d}.{milliseconds:03d}"
class VTTEditor:
    """Utility class for editing VTT content while preserving timing."""

    @staticmethod
    def translate_preserving_timing(
        vtt_content: str,
        translated_texts: list[str]
    ) -> str:
        """Replace text in VTT cues while preserving all timing information.

        Args:
            vtt_content: Source WebVTT text.
            translated_texts: One replacement text per parsed cue, in order.

        Returns:
            Rebuilt WebVTT text with the original timings and the new texts.

        Raises:
            ValueError: If the number of texts differs from the number of
                cues parsed from ``vtt_content``.
        """
        cues = VTTParser.parse(vtt_content)
        if len(translated_texts) != len(cues):
            raise ValueError(
                f"Text count mismatch: {len(translated_texts)} texts for {len(cues)} cues"
            )
        # Lengths are equal at this point, so zip pairs every cue.
        for cue, translated_text in zip(cues, translated_texts):
            cue.text = translated_text
        return VTTParser.build(cues)

    @staticmethod
    def assert_cue_alignment(en_vtt: str, target_vtt: str, lang: str) -> None:
        """Raise ValueError if target VTT cue count or timestamps diverge from EN master.

        Timestamps are compared for exact equality of the parsed values.

        Args:
            en_vtt: English master WebVTT text.
            target_vtt: Target-language WebVTT text to check.
            lang: Language label used in the error messages.

        Raises:
            ValueError: On a cue-count mismatch, or on the first cue whose
                start or end time differs from the EN cue at the same index.
        """
        en_cues = VTTParser.parse(en_vtt)
        tgt_cues = VTTParser.parse(target_vtt)
        if len(tgt_cues) != len(en_cues):
            raise ValueError(
                f"Cue count mismatch for {lang}: EN has {len(en_cues)}, target has {len(tgt_cues)}"
            )
        for i, (en, tgt) in enumerate(zip(en_cues, tgt_cues)):
            # VTTCue exposes start_time / end_time (there is no .start/.end).
            if en.start_time != tgt.start_time or en.end_time != tgt.end_time:
                raise ValueError(
                    f"Timestamp mismatch for {lang} cue {i}: "
                    f"EN {en.start_time}-->{en.end_time}, "
                    f"target {tgt.start_time}-->{tgt.end_time}"
                )

    @staticmethod
    def update_cue_text(vtt_content: str, cue_index: int, new_text: str) -> str:
        """Update text for a specific cue by index.

        Args:
            vtt_content: Source WebVTT text.
            cue_index: Zero-based index of the cue to change.
            new_text: Replacement cue text.

        Returns:
            Rebuilt WebVTT text with the cue's text replaced.

        Raises:
            ValueError: If ``cue_index`` is negative or past the last cue.
        """
        cues = VTTParser.parse(vtt_content)
        if cue_index < 0 or cue_index >= len(cues):
            raise ValueError(f"Invalid cue index: {cue_index}")
        cues[cue_index].text = new_text
        return VTTParser.build(cues)

    @staticmethod
    def validate_vtt(vtt_content: str) -> tuple[bool, list[str]]:
        """Validate VTT content and return errors if any.

        Checks that the content starts with ``WEBVTT`` and, for each parsed
        cue, that start precedes end, that it does not start before the
        previous cue ends, and that its text is not blank. An exception
        raised while parsing is reported as a "Parse error" entry rather
        than propagated.

        Returns:
            ``(is_valid, errors)`` where ``is_valid`` is True only when
            ``errors`` is empty. Cue numbers in messages are 1-based.
        """
        errors = []
        if not vtt_content.strip().startswith("WEBVTT"):
            errors.append("VTT must start with 'WEBVTT'")
        try:
            cues = VTTParser.parse(vtt_content)
            # Check timing consistency
            for i, cue in enumerate(cues):
                if cue.start_time >= cue.end_time:
                    errors.append(f"Cue {i + 1}: Start time must be before end time")
                if i > 0 and cue.start_time < cues[i - 1].end_time:
                    errors.append(f"Cue {i + 1}: Overlapping with previous cue")
                if not cue.text.strip():
                    errors.append(f"Cue {i + 1}: Empty text content")
        except Exception as e:
            errors.append(f"Parse error: {str(e)}")
        return len(errors) == 0, errors

    @staticmethod
    def get_cue_count(vtt_content: str) -> int:
        """Get the number of cues in VTT content.

        Best-effort: returns 0 if parsing raises.
        """
        try:
            cues = VTTParser.parse(vtt_content)
            return len(cues)
        except Exception:
            return 0

    @staticmethod
    def get_total_duration(vtt_content: str) -> float:
        """Get total duration of VTT content in seconds.

        The duration is the largest cue end time. Best-effort: returns 0.0
        when there are no cues or when parsing raises.
        """
        try:
            cues = VTTParser.parse(vtt_content)
            if not cues:
                return 0.0
            return max(cue.end_time for cue in cues)
        except Exception:
            return 0.0

    @staticmethod
    def validate_translation_timing(source_vtt: str, translated_vtt: str) -> tuple[bool, list[str]]:
        """Verify that translated VTT has identical timestamps to the source VTT.

        Unlike ``assert_cue_alignment`` this collects every discrepancy
        instead of raising, and treats start/end times as changed only when
        they differ by more than 0.001 s. A cue-count mismatch is reported
        on its own and stops further comparison. An exception raised while
        parsing is reported as a "Validation error" entry.

        Returns:
            ``(is_valid, errors)`` where ``is_valid`` is True only when
            ``errors`` is empty. Cue numbers in messages are 1-based.
        """
        errors = []
        try:
            source_cues = VTTParser.parse(source_vtt)
            translated_cues = VTTParser.parse(translated_vtt)
            if len(source_cues) != len(translated_cues):
                errors.append(
                    f"Cue count mismatch: source has {len(source_cues)}, "
                    f"translation has {len(translated_cues)}"
                )
                return False, errors
            for i, (src, tgt) in enumerate(zip(source_cues, translated_cues)):
                if abs(src.start_time - tgt.start_time) > 0.001:
                    errors.append(
                        f"Cue {i + 1}: start time changed "
                        f"({src.start_time:.3f}s -> {tgt.start_time:.3f}s)"
                    )
                if abs(src.end_time - tgt.end_time) > 0.001:
                    errors.append(
                        f"Cue {i + 1}: end time changed "
                        f"({src.end_time:.3f}s -> {tgt.end_time:.3f}s)"
                    )
        except Exception as e:
            errors.append(f"Validation error: {str(e)}")
        return len(errors) == 0, errors

    @staticmethod
    def adjust_timing_offset(vtt_content: str, offset_seconds: float) -> str:
        """
        Adjust all VTT cue timings by a fixed offset

        Positive offset moves captions later, negative moves them earlier.
        Start times are clamped at 0.0, and every end time is kept at least
        0.5 s after its (shifted) start time.
        """
        cues = VTTParser.parse(vtt_content)
        for cue in cues:
            cue.start_time = max(0.0, cue.start_time + offset_seconds)
            # Uses the already-shifted start so the cue never collapses.
            cue.end_time = max(cue.start_time + 0.5, cue.end_time + offset_seconds)
        return VTTParser.build(cues)