- caption_aligner: lower match ratio 0.5→0.35, widen search window 60→150, add time-based cursor fallback on miss - gemini.py: explicit 'MUST use glossary terms' requirement in translate_vtt prompt; source_has_ad prompt now instructs not to include AD narration in captions - ingest_and_ai: load glossary for source language and pass to extract_accessibility - render_accessible_video: handle source_has_ad=True via caption-embed path (ffmpeg subtitle inject, no AD pipeline) - translate_and_synthesize: track failed languages, write translation_errors to DB, add exc_info to error log - vtt.py: expand _FILLER_PATTERNS to nl/pt/pl/uk/ru, widen EN/ES/FR/DE/IT lists - gemini_ingestion.md: strengthen line:0% placement rule, expand disfluency examples per language Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
318 lines
12 KiB
Python
318 lines
12 KiB
Python
import re
|
|
from dataclasses import dataclass
|
|
|
|
|
|
@dataclass
|
|
class VTTCue:
|
|
start_time: float # seconds
|
|
end_time: float # seconds
|
|
text: str
|
|
identifier: str | None = None
|
|
settings: str = ""
|
|
|
|
|
|
class VTTParser:
    """Parser and builder for WebVTT files.

    ``parse`` turns VTT text into ``VTTCue`` objects and ``build`` serialises
    them back. Timestamps are held as float seconds in between.
    """

    @staticmethod
    def parse(vtt_content: str) -> list[VTTCue]:
        """Parse VTT content into a list of cues.

        Lines that are not part of a cue (header, NOTE lines, stray text,
        malformed timing lines) are skipped.

        Args:
            vtt_content: Full text of a WebVTT document.

        Returns:
            The cues in document order.

        Raises:
            ValueError: If a timing line carries a timestamp that cannot be
                converted (see ``_parse_timestamp``).
        """
        # A UTF-8 BOM is not whitespace for str.strip(); drop it explicitly so
        # the header line still compares equal to "WEBVTT".
        lines = vtt_content.lstrip('\ufeff').strip().split('\n')
        cues = []

        i = 0
        while i < len(lines):
            line = lines[i].strip()

            # Skip WEBVTT header, empty lines, and NOTE lines
            if line == "WEBVTT" or line == "" or line.startswith("NOTE"):
                i += 1
                continue

            # A non-timing line directly above a timing line is the optional
            # cue identifier.
            identifier = None
            if " --> " not in line and i + 1 < len(lines) and " --> " in lines[i + 1]:
                identifier = line
                i += 1
                line = lines[i].strip()

            timing_match = None
            if " --> " in line:
                timing_match = re.match(r'([\d:.,]+)\s+-->\s+([\d:.,]+)\s*(.*)', line)

            # Consume the current line on every path. Without this, a line
            # that contains " --> " but does not match the timing pattern
            # (e.g. "foo --> bar") would never advance the cursor.
            i += 1
            if timing_match is None:
                continue

            start_time = VTTParser._parse_timestamp(timing_match.group(1))
            end_time = VTTParser._parse_timestamp(timing_match.group(2))
            settings = timing_match.group(3).strip()

            # Collect text lines until an empty line or end of input
            text_lines = []
            while i < len(lines) and lines[i].strip() != "":
                text_lines.append(lines[i].strip())
                i += 1

            cues.append(VTTCue(
                start_time=start_time,
                end_time=end_time,
                text="\n".join(text_lines),
                identifier=identifier,
                settings=settings,
            ))

        return cues

    @staticmethod
    def build(cues: list[VTTCue]) -> str:
        """Build VTT content from a list of cues.

        Args:
            cues: Cues to serialise, in output order.

        Returns:
            A WebVTT document: header, then one block per cue, each followed
            by a blank line.
        """
        lines = ["WEBVTT", ""]

        for cue in cues:
            # Add identifier if present
            if cue.identifier:
                lines.append(cue.identifier)

            # Add timing line (preserve cue settings like line:0%)
            start_timestamp = VTTParser._format_timestamp(cue.start_time)
            end_timestamp = VTTParser._format_timestamp(cue.end_time)
            timing_line = f"{start_timestamp} --> {end_timestamp}"
            if cue.settings:
                timing_line += f" {cue.settings}"
            lines.append(timing_line)

            # Add text (can be multi-line). A blank line terminates a cue, so
            # blank lines inside the text are dropped; otherwise everything
            # after them would be lost when the output is parsed again.
            payload = "\n".join(
                text_line for text_line in cue.text.split("\n") if text_line.strip()
            )
            lines.append(payload)
            lines.append("")  # Empty line between cues

        return "\n".join(lines) + "\n"

    @staticmethod
    def _parse_timestamp(timestamp: str) -> float:
        """Convert a VTT timestamp (HH:MM:SS.mmm or MM:SS.mmm) to seconds.

        Both ``.`` and ``,`` are accepted as the decimal separator. The
        fraction is scaled by its own digit count, so ``01.5`` is 1.5 s and
        ``01.123456`` is 1.123456 s.

        Raises:
            ValueError: If the timestamp does not have two or three
                colon-separated parts or a part is not numeric.
        """
        # Clean up timestamp (handle both . and , as decimal separator)
        timestamp = timestamp.replace(',', '.')

        parts = timestamp.split(':')
        if len(parts) == 3:  # HH:MM:SS.mmm
            hours, minutes, seconds = parts
        elif len(parts) == 2:  # MM:SS.mmm
            hours, minutes, seconds = "0", parts[0], parts[1]
        else:
            raise ValueError(f"Invalid timestamp format: {timestamp}")

        whole, _, fraction_digits = seconds.partition('.')
        try:
            # Dividing by 10**len keeps the canonical three-digit case
            # identical to "/ 1000" while handling other digit counts.
            fraction = (
                int(fraction_digits) / 10 ** len(fraction_digits)
                if fraction_digits
                else 0.0
            )
            return int(hours) * 3600 + int(minutes) * 60 + int(whole) + fraction
        except ValueError as err:
            raise ValueError(f"Invalid timestamp format: {timestamp}") from err

    @staticmethod
    def _format_timestamp(seconds: float) -> str:
        """Convert seconds to VTT timestamp format (HH:MM:SS.mmm).

        The value is rounded to whole milliseconds first and then split, so a
        fraction that rounds up carries into the seconds field instead of
        producing a four-digit millisecond part. Negative input is clamped to
        zero because WebVTT has no negative timestamps.
        """
        total_ms = max(0, round(seconds * 1000))
        hours, remainder = divmod(total_ms, 3_600_000)
        minutes, remainder = divmod(remainder, 60_000)
        whole_secs, milliseconds = divmod(remainder, 1000)

        return f"{hours:02d}:{minutes:02d}:{whole_secs:02d}.{milliseconds:03d}"
|
|
|
|
|
|
class VTTEditor:
    """Utility class for editing VTT content while preserving timing.

    Every method takes VTT text, round-trips it through ``VTTParser`` and
    returns either new VTT text or a derived value.
    """

    @staticmethod
    def translate_preserving_timing(
        vtt_content: str,
        translated_texts: list[str]
    ) -> str:
        """Replace text in VTT cues while preserving all timing information.

        Args:
            vtt_content: Source VTT document.
            translated_texts: One replacement text per cue, in cue order.

        Returns:
            The rebuilt VTT document.

        Raises:
            ValueError: If the number of texts differs from the number of cues.
        """
        cues = VTTParser.parse(vtt_content)

        if len(translated_texts) != len(cues):
            raise ValueError(
                f"Text count mismatch: {len(translated_texts)} texts for {len(cues)} cues"
            )

        # Lengths are equal here, so zip pairs every cue with its text.
        for cue, translated_text in zip(cues, translated_texts):
            cue.text = translated_text

        return VTTParser.build(cues)

    @staticmethod
    def assert_cue_alignment(en_vtt: str, target_vtt: str, lang: str) -> None:
        """Raise ValueError if target VTT cue count or timestamps diverge from EN master.

        Timestamps are compared for exact equality of the parsed values.
        """
        en_cues = VTTParser.parse(en_vtt)
        tgt_cues = VTTParser.parse(target_vtt)
        if len(tgt_cues) != len(en_cues):
            raise ValueError(
                f"Cue count mismatch for {lang}: EN has {len(en_cues)}, target has {len(tgt_cues)}"
            )
        for i, (en, tgt) in enumerate(zip(en_cues, tgt_cues, strict=True)):
            if en.start_time != tgt.start_time or en.end_time != tgt.end_time:
                raise ValueError(
                    f"Timestamp mismatch for {lang} cue {i}: "
                    f"EN {en.start_time}-->{en.end_time}, target {tgt.start_time}-->{tgt.end_time}"
                )

    @staticmethod
    def update_cue_text(vtt_content: str, cue_index: int, new_text: str) -> str:
        """Update text for a specific cue by zero-based index.

        Raises:
            ValueError: If ``cue_index`` is outside ``0 .. len(cues) - 1``.
        """
        cues = VTTParser.parse(vtt_content)

        if cue_index < 0 or cue_index >= len(cues):
            raise ValueError(f"Invalid cue index: {cue_index}")

        cues[cue_index].text = new_text
        return VTTParser.build(cues)

    @staticmethod
    def validate_vtt(vtt_content: str) -> tuple[bool, list[str]]:
        """Validate VTT content and return errors if any.

        Checks the header, that each cue starts before it ends, that cues do
        not overlap their predecessor, and that no cue is empty.

        Returns:
            ``(is_valid, errors)``; ``errors`` is empty when valid. Parse
            failures are reported as an error entry rather than raised.
        """
        errors = []

        # A leading byte-order mark is not whitespace for str.strip(), so
        # remove it first; otherwise BOM-prefixed files are rejected.
        if not vtt_content.lstrip("\ufeff").strip().startswith("WEBVTT"):
            errors.append("VTT must start with 'WEBVTT'")

        try:
            cues = VTTParser.parse(vtt_content)

            # Check timing consistency
            for i, cue in enumerate(cues):
                if cue.start_time >= cue.end_time:
                    errors.append(f"Cue {i + 1}: Start time must be before end time")

                if i > 0 and cue.start_time < cues[i - 1].end_time:
                    errors.append(f"Cue {i + 1}: Overlapping with previous cue")

                if not cue.text.strip():
                    errors.append(f"Cue {i + 1}: Empty text content")

        except Exception as e:
            errors.append(f"Parse error: {str(e)}")

        return len(errors) == 0, errors

    @staticmethod
    def fix_overlapping_cues(vtt_content: str) -> str:
        """Trim end_time of each cue so it does not overlap the next cue's start_time.

        Only adjacent cues are compared and only end times are changed. When
        the next cue starts at or before the previous cue's start, the
        previous cue is shrunk to 1 ms rather than inverted, so that pair can
        still overlap afterwards.
        """
        cues = VTTParser.parse(vtt_content)
        for previous, current in zip(cues, cues[1:]):
            if current.start_time < previous.end_time:
                # Clamp previous cue end to 1ms before next cue start
                new_end = current.start_time - 0.001
                # Never let end_time go at or below start_time
                if new_end <= previous.start_time:
                    new_end = previous.start_time + 0.001
                previous.end_time = new_end
        return VTTParser.build(cues)

    @staticmethod
    def get_cue_count(vtt_content: str) -> int:
        """Get the number of cues in VTT content; 0 if it cannot be parsed."""
        try:
            return len(VTTParser.parse(vtt_content))
        except Exception:
            # Best-effort helper: unparseable content counts as no cues.
            return 0

    @staticmethod
    def get_total_duration(vtt_content: str) -> float:
        """Get total duration of VTT content in seconds.

        This is the largest cue end time, or 0.0 when there are no cues or
        the content cannot be parsed.
        """
        try:
            cues = VTTParser.parse(vtt_content)
            return max((cue.end_time for cue in cues), default=0.0)
        except Exception:
            # Best-effort helper: unparseable content has no duration.
            return 0.0

    @staticmethod
    def validate_translation_timing(source_vtt: str, translated_vtt: str) -> tuple[bool, list[str]]:
        """Verify that translated VTT has identical timestamps to the source VTT.

        Start and end times are reported as changed when they differ by more
        than 0.001 s. A cue count mismatch is reported on its own, without
        per-cue comparison.

        Returns:
            ``(is_valid, errors)``; failures while parsing are reported as an
            error entry rather than raised.
        """
        errors = []
        try:
            source_cues = VTTParser.parse(source_vtt)
            translated_cues = VTTParser.parse(translated_vtt)

            if len(source_cues) != len(translated_cues):
                errors.append(
                    f"Cue count mismatch: source has {len(source_cues)}, "
                    f"translation has {len(translated_cues)}"
                )
                return False, errors

            for i, (src, tgt) in enumerate(zip(source_cues, translated_cues, strict=False)):
                if abs(src.start_time - tgt.start_time) > 0.001:
                    errors.append(
                        f"Cue {i + 1}: start time changed "
                        f"({src.start_time:.3f}s -> {tgt.start_time:.3f}s)"
                    )
                if abs(src.end_time - tgt.end_time) > 0.001:
                    errors.append(
                        f"Cue {i + 1}: end time changed "
                        f"({src.end_time:.3f}s -> {tgt.end_time:.3f}s)"
                    )
        except Exception as e:
            errors.append(f"Validation error: {str(e)}")

        return len(errors) == 0, errors

    @staticmethod
    def adjust_timing_offset(vtt_content: str, offset_seconds: float) -> str:
        """
        Adjust all VTT cue timings by a fixed offset
        Positive offset moves captions later, negative moves them earlier

        Start times are clamped at 0. Each end time is at least 0.5 s after
        the shifted start.
        """
        cues = VTTParser.parse(vtt_content)

        for cue in cues:
            cue.start_time = max(0.0, cue.start_time + offset_seconds)
            # NOTE(review): this floor also stretches cues that were already
            # shorter than 0.5 s, even with a zero offset - confirm intended.
            cue.end_time = max(cue.start_time + 0.5, cue.end_time + offset_seconds)

        return VTTParser.build(cues)

    # DCMP §6.01 filler patterns per language (whole-word, case-insensitive)
    # Alternation is ordered, so a longer phrase must precede its own prefix
    # ("o sea que" before "o sea"). "right\?" sits outside the trailing \b
    # because no word boundary exists between "?" and whitespace or end of
    # text.
    # NOTE(review): several entries double as ordinary content words (e.g.
    # "actually", "also", "este", "że", "bo"); removing them can change
    # meaning - confirm each list with a native speaker.
    _FILLER_PATTERNS: dict[str, str] = {
        "en": r'\b(um+|uh+|ah+|er+|hmm+|you know|i mean|sort of|kind of|basically|literally|honestly|actually|so yeah)\b|\bright\?',
        "es": r'\b(eh+|este|o sea que|o sea|pues|bueno|mmm+)\b',
        "fr": r'\b(euh+|beh|ben|donc|quoi|enfin|voilà|genre)\b',
        "de": r'\b(äh+|ähm+|halt|ne|also|naja|sozusagen|quasi)\b',
        "it": r'\b(ehm+|allora|cioè|tipo|praticamente|insomma|ecco)\b',
        "nl": r'\b(eh+|nou|zeg|eigenlijk|gewoon|toch|zo van|hè)\b',
        "pt": r'\b(ahn+|hã+|né|sabe|tipo|então|assim)\b',
        "pl": r'\b(no|że|bo|znaczy|właśnie|jakby|wiesz)\b',
        "uk": r'\b(ну+|ем+|типу|знаєш|значить|власне|от)\b',
        "ru": r'\b(ну+|эм+|типа|знаешь|значит|вот|собственно)\b',
    }

    @staticmethod
    def _strip_fillers(text: str, compiled: re.Pattern[str]) -> str:
        """Remove filler matches from one cue's text and tidy what is left.

        Works line by line so that a line consisting only of fillers is
        dropped instead of leaving a blank line, which would terminate the
        cue in the serialised VTT.
        """
        kept_lines = []
        for raw_line in text.split("\n"):
            line = compiled.sub("", raw_line)
            # "hello um, world" -> "hello , world" -> "hello, world"
            line = re.sub(r'[ \t]+,', ',', line)
            # "Well, um, I" -> "Well,, I" -> "Well, I"
            line = re.sub(r',(?:[ \t]*,)+', ',', line)
            line = re.sub(r'[ \t]{2,}', ' ', line).strip()
            # A line never legitimately starts with a comma; it is left over
            # from a removed leading filler ("Um, hello").
            line = line.lstrip(',').lstrip()
            if line:
                kept_lines.append(line)
        # Trailing comma at the very end of the cue is likewise an artifact.
        return "\n".join(kept_lines).strip(',').strip()

    @staticmethod
    def clean_disfluencies(vtt_content: str, lang: str) -> str:
        """Remove filler words and hesitations per DCMP §6.01 for supported languages.

        Args:
            vtt_content: VTT document to clean.
            lang: Language tag; only the primary subtag is used, so "pt-BR"
                and "pt_BR" both select "pt".

        Returns:
            The rebuilt VTT, or ``vtt_content`` unchanged when the language
            has no filler pattern. A cue that would become empty keeps its
            original text.
        """
        primary_subtag = re.split(r'[-_]', lang, maxsplit=1)[0].lower()
        pattern = VTTEditor._FILLER_PATTERNS.get(primary_subtag)
        if not pattern:
            return vtt_content
        cues = VTTParser.parse(vtt_content)
        compiled = re.compile(pattern, re.IGNORECASE)
        for cue in cues:
            cleaned = VTTEditor._strip_fillers(cue.text, compiled)
            if cleaned:
                cue.text = cleaned
        return VTTParser.build(cues)
|
|
|