video-accessibility/backend/app/services/whisper_service.py
michael 030f1b67ee fix: enforce AD cue pause_point monotonicity to preserve cue order
Whisper's snap_pause_point() finds the nearest sentence boundary
independently per cue, which can move a later cue's pause_point before
an earlier cue's. The renderer then sorts by pause_point, producing
non-sequential cue indices in the timeline.

Add a forward monotonicity pass (clamp each pause_point >= previous) at
three layers for defense-in-depth:
- whisper_service: Phase 3 after consolidation
- video_renderer: before temporal sort in _render_pause_insert_method
- rerender_accessible_video: in _build_placements_with_adjustments

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 08:15:06 -06:00

638 lines
25 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Service for Whisper-based speech analysis and pause point refinement."""
from __future__ import annotations
import logging
import os
import time
from dataclasses import dataclass
from faster_whisper import WhisperModel
# Use simple logging for Cloud Run compatibility (no dependency on app.core.logging)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Try to import settings, fall back to env vars for Cloud Run mode
try:
from ..core.config import settings
_HAS_SETTINGS = True
except Exception:
_HAS_SETTINGS = False
settings = None # type: ignore
def _get_setting(name: str, default):
    """Look up a configuration value by name.

    When the application ``settings`` object was imported successfully, the
    value is read from it as an attribute (falling back to ``default`` if the
    attribute is missing). Otherwise the upper-cased ``name`` is looked up in
    the process environment and coerced to the numeric type of ``default``.

    Args:
        name: Setting name; also the attribute name on ``settings``.
        default: Value returned when nothing is configured. Its type decides
            how an environment string is converted.

    Returns:
        The configured value, or ``default`` when none is found.
    """
    if _HAS_SETTINGS and settings:
        return getattr(settings, name, default)

    raw = os.environ.get(name.upper())
    if raw is None:
        return default

    # Coerce the environment string to the default's numeric type.
    # float is tested before int, exactly as the conversion order requires.
    for numeric_type in (float, int):
        if isinstance(default, numeric_type):
            return numeric_type(raw)
    return raw
@dataclass
class WordTimestamp:
    """One transcribed word together with when it is spoken."""

    word: str
    start: float  # seconds
    end: float  # seconds

    def to_dict(self) -> dict:
        """Return a plain, serializable dict (used for Celery task results)."""
        return dict(word=self.word, start=self.start, end=self.end)

    @classmethod
    def from_dict(cls, data: dict) -> WordTimestamp:
        """Rebuild an instance from the dict produced by ``to_dict``."""
        text = data["word"]
        begin = data["start"]
        finish = data["end"]
        return cls(word=text, start=begin, end=finish)
@dataclass
class SpeechGap:
    """A silence between two consecutive words; a candidate pause point."""

    start: float  # End of previous word
    end: float  # Start of next word
    duration: float  # Gap duration in seconds
    gap_type: str  # "sentence", "phrase", or "word"

    @property
    def priority(self) -> int:
        """Rank of this gap for snapping; a smaller number is preferred.

        Sentence gaps rank 1, phrase gaps 2, word gaps 3, anything else 4.
        """
        ranking = {"sentence": 1, "phrase": 2, "word": 3}
        try:
            return ranking[self.gap_type]
        except KeyError:
            return 4
@dataclass
class SentenceBoundary:
    """The start or end of a sentence, used as a snapping target for pauses.

    Two kinds of boundary exist:
    - "sentence_end": the end time of a word that closes a sentence
    - "sentence_start": the start time of the first word after such a word
    """

    # Timestamp of the boundary, in seconds
    time: float
    # Either "sentence_start" or "sentence_end"
    boundary_type: str
    # Position of the associated word in the words list
    word_index: int
    # Whether a sentence precedes this boundary
    has_previous_sentence: bool
    # Whether a sentence follows this boundary
    has_next_sentence: bool
    # The gap this boundary sits on, if one was matched
    gap: SpeechGap | None
class WhisperService:
"""Service for speech analysis using faster-whisper."""
def __init__(self):
    """Create the service without loading a model and read its tunables.

    Every tunable is resolved through ``_get_setting`` with the default
    shown below.
    """
    read = _get_setting

    # Stays None until the ``model`` property is first accessed
    self._model: WhisperModel | None = None
    self._model_name = read('whisper_model', 'base')

    # Duration cut-offs (seconds) used to classify gaps between words
    self.sentence_gap_threshold = read('whisper_sentence_gap_threshold', 0.5)
    self.phrase_gap_threshold = read('whisper_phrase_gap_threshold', 0.3)
    self.min_gap_threshold = read('whisper_min_gap_threshold', 0.15)

    # Snapping configuration
    self.max_search_window = read('whisper_max_search_window', 30.0)
@property
def model(self) -> WhisperModel:
"""Lazy-load Whisper model on first use."""
if self._model is None:
cpu_threads = os.cpu_count() or 4 # Fallback to 4 if cpu_count() returns None
logger.info(
f"Loading Whisper model '{self._model_name}' "
f"(device=cpu, compute_type=int8, cpu_threads={cpu_threads})..."
)
load_start = time.time()
self._model = WhisperModel(
self._model_name,
device="cpu",
compute_type="int8", # Quantized for faster CPU inference
cpu_threads=cpu_threads
)
load_time = time.time() - load_start
logger.info(f"Whisper model '{self._model_name}' loaded successfully in {load_time:.2f}s")
return self._model
def transcribe_audio(self, audio_path: str) -> list[WordTimestamp]:
    """
    Run Whisper on an audio file and collect per-word timings.

    Transcription is requested with word timestamps and VAD filtering.
    Segments that carry no words are skipped, and each word's text is
    stripped of surrounding whitespace.

    Args:
        audio_path: Path to audio file (MP3, WAV, etc.)

    Returns:
        List of WordTimestamp objects, in the order Whisper produced them
    """
    logger.info(f"Starting Whisper transcription using model '{self._model_name}': {audio_path}")
    transcribe_start = time.time()

    vad_options = {
        "min_silence_duration_ms": 200,
        "speech_pad_ms": 100
    }
    segments, _info = self.model.transcribe(
        audio_path,
        word_timestamps=True,
        vad_filter=True,  # Filter out non-speech
        vad_parameters=vad_options
    )

    # Consuming ``segments`` here is what the elapsed time below measures
    words = [
        WordTimestamp(word=item.word.strip(), start=item.start, end=item.end)
        for segment in segments
        if segment.words
        for item in segment.words
    ]

    transcribe_time = time.time() - transcribe_start
    logger.info(
        f"Whisper transcription complete using model '{self._model_name}': "
        f"{len(words)} words detected in {transcribe_time:.2f}s"
    )
    return words
def identify_speech_gaps(self, words: list[WordTimestamp]) -> list[SpeechGap]:
    """
    Identify gaps between consecutive words that could serve as pause points.

    A gap runs from the end of one word to the start of the next. Gaps
    shorter than ``self.min_gap_threshold`` are ignored. The remaining gaps
    are classified by duration against ``self.sentence_gap_threshold`` and
    ``self.phrase_gap_threshold``, then adjusted by the punctuation that
    ends the preceding word: sentence punctuation always yields "sentence",
    and phrase punctuation promotes a "word" gap to "phrase".

    Args:
        words: List of word timestamps from Whisper

    Returns:
        List of SpeechGap objects sorted by start time (empty when fewer
        than two words are given)
    """
    if len(words) < 2:
        return []

    # The punctuation tuples must never contain an empty string:
    # str.endswith('') is True for every word, which would classify every
    # gap as "sentence".
    # NOTE(review): the non-ASCII entries were lost from the reviewed copy of
    # this file; they are presumed to be the full-width counterparts of the
    # ASCII marks (U+3002, U+FF01, U+FF1F / U+FF0C, U+FF1B, U+FF1A) - confirm
    # against the repository. Escapes are used to avoid confusable glyphs.
    sentence_marks = ('.', '!', '?', '\u3002', '\uff01', '\uff1f')
    phrase_marks = (',', ';', ':', '\uff0c', '\uff1b', '\uff1a')

    gaps = []
    for current_word, next_word in zip(words, words[1:]):
        gap_start = current_word.end
        gap_end = next_word.start
        duration = gap_end - gap_start

        if duration < self.min_gap_threshold:
            continue

        # Classify gap type based on duration
        if duration >= self.sentence_gap_threshold:
            gap_type = "sentence"
        elif duration >= self.phrase_gap_threshold:
            gap_type = "phrase"
        else:
            gap_type = "word"

        # Punctuation on the preceding word can only raise the classification
        word_text = current_word.word.rstrip()
        if word_text.endswith(sentence_marks):
            gap_type = "sentence"
        elif gap_type == "word" and word_text.endswith(phrase_marks):
            gap_type = "phrase"

        gaps.append(SpeechGap(
            start=gap_start,
            end=gap_end,
            duration=duration,
            gap_type=gap_type
        ))

    counts = {"sentence": 0, "phrase": 0, "word": 0}
    for gap in gaps:
        counts[gap.gap_type] += 1
    logger.info(f"Identified {len(gaps)} speech gaps "
                f"(sentence: {counts['sentence']}, "
                f"phrase: {counts['phrase']}, "
                f"word: {counts['word']})")

    return sorted(gaps, key=lambda g: g.start)
def _is_during_speaking(
self,
pause_point: float,
words: list[WordTimestamp],
threshold: float = 2.0
) -> bool:
"""
Check if a pause point is "during speaking" (words nearby).
Args:
pause_point: The timestamp to check
words: List of word timestamps from Whisper
threshold: Max distance in seconds to consider "nearby" (default: 2.0s)
Returns:
True if any word is within ±threshold seconds of the pause point
"""
for word in words:
# Check if pause point is near word start or end
if abs(word.start - pause_point) <= threshold or abs(word.end - pause_point) <= threshold:
return True
return False
def _find_sentence_boundaries(
    self,
    words: list[WordTimestamp],
    gaps: list[SpeechGap]
) -> list[SentenceBoundary]:
    """
    Find all sentence boundaries (starts and ends) from the transcript.

    Boundaries are identified from:
    1. Words ending with sentence punctuation - these mark sentence ends
    2. Words following sentence-ending words (when a gap separates them) -
       these mark sentence starts
    3. Fallback: if no punctuation is found, the word preceding the longest
       gap is treated as a sentence end
    The first word always contributes a sentence start, and the last word
    contributes a sentence end when it is not already one.

    Args:
        words: List of word timestamps from Whisper
        gaps: List of speech gaps between words

    Returns:
        List of SentenceBoundary objects sorted by time (empty when there
        are no words)
    """
    if not words:
        return []

    # This tuple must never contain an empty string: str.endswith('') is True
    # for every word, which would mark every word as a sentence end and make
    # the longest-gap fallback unreachable.
    # NOTE(review): the non-ASCII entries were lost from the reviewed copy of
    # this file; they are presumed to be the full-width counterparts of
    # '.', '!' and '?' (U+3002, U+FF01, U+FF1F) - confirm against the
    # repository. Escapes are used to avoid confusable glyphs.
    sentence_end_punctuation = ('.', '!', '?', '\u3002', '\uff01', '\uff1f')

    # Indices of words that close a sentence
    sentence_ending_indices: set[int] = {
        i for i, word in enumerate(words)
        if word.word.rstrip().endswith(sentence_end_punctuation)
    }

    # If no sentence punctuation found, use the longest gap as a fallback
    if not sentence_ending_indices and gaps:
        longest_gap = max(gaps, key=lambda g: g.duration)
        # Find the word (never the last one) that ends where this gap starts
        for i, word in enumerate(words[:-1]):
            if abs(word.end - longest_gap.start) < 0.01:  # Match within 10ms
                sentence_ending_indices.add(i)
                logger.info(
                    f"No sentence punctuation found, using longest gap "
                    f"({longest_gap.duration:.2f}s) at {longest_gap.start:.2f}s as boundary"
                )
                break

    boundaries: list[SentenceBoundary] = []
    last_idx = len(words) - 1
    final_ending_index = max(sentence_ending_indices, default=-1)

    # Create boundaries from sentence-ending words
    for i in sorted(sentence_ending_indices):
        word = words[i]

        # The gap that begins where this word ends (within 10ms), if any
        associated_gap = next(
            (gap for gap in gaps if abs(gap.start - word.end) < 0.01),
            None
        )

        # Any earlier word counts as belonging to a previous sentence
        has_previous = i > 0
        has_next = i < last_idx

        # Add sentence END boundary
        boundaries.append(SentenceBoundary(
            time=word.end,
            boundary_type="sentence_end",
            word_index=i,
            has_previous_sentence=has_previous,
            has_next_sentence=has_next,
            gap=associated_gap
        ))

        # Add sentence START boundary (next word's start) only when a gap
        # separates the two words
        if has_next and associated_gap:
            boundaries.append(SentenceBoundary(
                time=words[i + 1].start,
                boundary_type="sentence_start",
                word_index=i + 1,
                has_previous_sentence=True,  # The sentence that just ended
                has_next_sentence=i < final_ending_index,
                gap=associated_gap
            ))

    # First word boundary. The sentence_start boundaries created above always
    # point at index i + 1 >= 1, so word 0 never has one at this point.
    boundaries.append(SentenceBoundary(
        time=words[0].start,
        boundary_type="sentence_start",
        word_index=0,
        has_previous_sentence=False,  # Nothing before first word
        has_next_sentence=len(sentence_ending_indices) > 0 or len(words) > 1,
        gap=None
    ))

    # Last word boundary (if it's a sentence end not already covered)
    if last_idx not in sentence_ending_indices:
        boundaries.append(SentenceBoundary(
            time=words[last_idx].end,
            boundary_type="sentence_end",
            word_index=last_idx,
            has_previous_sentence=len(sentence_ending_indices) > 0 or last_idx > 0,
            has_next_sentence=False,  # Nothing after last word
            gap=None
        ))

    return sorted(boundaries, key=lambda b: b.time)
def snap_pause_point(
self,
gemini_pause: float,
words: list[WordTimestamp],
gaps: list[SpeechGap],
boundaries: list[SentenceBoundary],
speaking_threshold: float = 2.0
) -> tuple[float, float, str | None]:
"""
Snap a Gemini pause point to the nearest sentence boundary.
Simplified algorithm:
1. Check if "during speaking" (words within ±threshold)
- If NO → Use Gemini's exact pause point
2. If during speaking, find nearest sentence gap and snap to MIDPOINT
3. Edge cases:
- Case A: First sentence in video → pause at video start (0.0)
- Case B: Last sentence in video → pause at video end
The video renderer adds 500ms silence buffers before/after AD audio,
so no overlap or catch-up logic is needed here.
Args:
gemini_pause: Original pause point from Gemini (seconds)
words: List of word timestamps from Whisper
gaps: List of speech gaps from identify_speech_gaps()
boundaries: List of sentence boundaries from _find_sentence_boundaries()
speaking_threshold: Max distance to consider "during speaking" (default: 2.0s)
Returns:
Tuple of (pause_point, resume_from, warning_message_or_none)
Note: resume_from always equals pause_point with the simplified algorithm
"""
# Step 1: Check if "during speaking" (words within ±threshold)
if not self._is_during_speaking(gemini_pause, words, speaking_threshold):
# Not during speaking - use Gemini's exact pause point
logger.info(
f"Pause point {gemini_pause:.2f}s is NOT during speaking "
f"(no words within ±{speaking_threshold}s), using Gemini's exact point"
)
return gemini_pause, gemini_pause, None
# Step 2: During speaking - find nearest sentence boundary
if not boundaries:
# No boundaries found at all - use Gemini's point with warning
logger.warning(f"No sentence boundaries found, using Gemini's exact point {gemini_pause:.2f}s")
return gemini_pause, gemini_pause, "No sentence boundaries found in transcript"
# Find the boundary closest to the Gemini pause point
closest_boundary = min(boundaries, key=lambda b: abs(b.time - gemini_pause))
logger.debug(
f"Nearest boundary to {gemini_pause:.2f}s: {closest_boundary.boundary_type} "
f"at {closest_boundary.time:.2f}s (distance: {abs(closest_boundary.time - gemini_pause):.2f}s)"
)
# Case A: First sentence in video (no previous sentence) → snap to video start
if closest_boundary.boundary_type == "sentence_start" and not closest_boundary.has_previous_sentence:
pause_point = 0.0
logger.info(
f"Case A (first sentence): pause_point={pause_point:.2f}s "
f"(snapped to video start)"
)
return pause_point, pause_point, None
# Case B: Last sentence in video (no next sentence) → snap to boundary time
if closest_boundary.boundary_type == "sentence_end" and not closest_boundary.has_next_sentence:
pause_point = closest_boundary.time
logger.info(
f"Case B (last sentence): pause_point={pause_point:.2f}s "
f"(snapped to video end at sentence boundary)"
)
return pause_point, pause_point, None
# Case C: Gap between two sentences → snap to MIDPOINT of the gap
if closest_boundary.gap:
gap = closest_boundary.gap
# Calculate midpoint between end of previous sentence and start of next
midpoint = (gap.start + gap.end) / 2.0
logger.info(
f"Case C (between sentences): gap={gap.start:.2f}s-{gap.end:.2f}s, "
f"midpoint={midpoint:.2f}s (resume from same point)"
)
return midpoint, midpoint, None
# Fallback: No gap associated with boundary - use the boundary time directly
# This shouldn't normally happen but handles edge cases
pause_point = closest_boundary.time
logger.info(
f"Fallback: Using boundary at {closest_boundary.time:.2f}s, "
f"pause_point={pause_point:.2f}s (no gap available)"
)
return pause_point, pause_point, None
def refine_all_pause_points(
self,
placements: list[dict],
words: list[WordTimestamp],
gaps: list[SpeechGap],
consolidation_threshold: float = 5.0
) -> tuple[list[dict], list[str]]:
"""
Refine all pause points in a Gemini analysis result.
Two-phase algorithm:
Phase 1: Refine each pause point individually using ordered logic:
1. Check if "during speaking" (words within ±2s)
2. If not during speaking → use Gemini's exact point
3. If during speaking → snap to nearest boundary with appropriate buffering
Phase 2: Consolidate cues that are within 5s of each other (after all refinements)
Args:
placements: List of placement dicts from Gemini analysis
words: Word timestamps from Whisper transcription
gaps: Speech gaps from Whisper analysis
consolidation_threshold: If consecutive cues have pause points within
this many seconds, combine them to play back-to-back (default: 5.0s)
Returns:
Tuple of (refined_placements, warnings)
"""
refined_placements = []
warnings = []
# Pre-compute sentence boundaries once for all placements
boundaries = self._find_sentence_boundaries(words, gaps)
logger.info(f"Found {len(boundaries)} sentence boundaries for pause point refinement")
# Phase 1: Refine each pause point individually
for placement in placements:
refined = placement.copy()
if placement.get("pause_point") is not None:
original = placement["pause_point"]
pause_point, resume_from, warning = self.snap_pause_point(
original, words, gaps, boundaries
)
refined["pause_point"] = pause_point
refined["resume_from"] = resume_from
refined["original_pause_point"] = original # Preserve for debugging
if warning:
warnings.append(f"Cue {placement['ad_cue_index']}: {warning}")
logger.warning(f"Pause point refinement warning for cue {placement['ad_cue_index']}: {warning}")
refined_placements.append(refined)
# Phase 2: Consolidate cues that are close together (AFTER all individual refinements)
refined_placements = self._consolidate_close_cues(
refined_placements, consolidation_threshold, warnings
)
# Phase 3: Enforce monotonicity - pause_points must be non-decreasing in cue_index order
# Whisper's snap_pause_point() finds the nearest boundary independently per cue,
# which can move a later cue's pause_point before an earlier cue's.
for i in range(1, len(refined_placements)):
prev_pp = refined_placements[i - 1].get("pause_point")
curr_pp = refined_placements[i].get("pause_point")
if curr_pp is not None and prev_pp is not None and curr_pp < prev_pp:
refined_placements[i]["pause_point"] = prev_pp
refined_placements[i]["resume_from"] = prev_pp
refined_placements[i]["monotonicity_clamped"] = True
cue_idx = refined_placements[i].get("ad_cue_index", i)
prev_cue_idx = refined_placements[i - 1].get("ad_cue_index", i - 1)
warning_msg = (
f"Cue {cue_idx}: Monotonicity violation - pause_point {curr_pp:.2f}s "
f"was before cue {prev_cue_idx}'s pause_point {prev_pp:.2f}s, "
f"clamped to {prev_pp:.2f}s"
)
warnings.append(warning_msg)
logger.warning(warning_msg)
return refined_placements, warnings
def _consolidate_close_cues(
self,
placements: list[dict],
threshold: float,
warnings: list[str]
) -> list[dict]:
"""
Consolidate AD cues whose pause points are within threshold seconds of each other.
Consolidated cues share the same pause_point and play back-to-back during
the freeze frame. With the simplified midpoint algorithm, resume_from always
equals pause_point, so no complex buffer logic is needed.
Args:
placements: List of refined placement dicts
threshold: Max seconds between pause points to trigger consolidation
warnings: List to append warning messages to
Returns:
Updated placements with consolidated pause points
"""
if len(placements) < 2:
return placements
consolidated = [placements[0].copy()]
for i in range(1, len(placements)):
current = placements[i].copy()
previous = consolidated[-1]
current_pause = current.get("pause_point")
previous_pause = previous.get("pause_point")
if current_pause is not None and previous_pause is not None:
gap = current_pause - previous_pause
if 0 < gap <= threshold:
# Consolidate: set current cue to use same pause point as previous
original_pause = current_pause
current["pause_point"] = previous_pause
current["resume_from"] = previous_pause # Always same as pause_point
current["consolidated_with_previous"] = True
current["original_pause_point_before_consolidation"] = original_pause
logger.info(
f"Consolidated cue {current['ad_cue_index']} with previous cue: "
f"pause_point {original_pause:.2f}s -> {previous_pause:.2f}s "
f"(gap was {gap:.2f}s, threshold {threshold:.2f}s)"
)
warnings.append(
f"Cue {current['ad_cue_index']}: Consolidated with previous cue "
f"(pause points were {gap:.2f}s apart, playing back-to-back)"
)
consolidated.append(current)
# Log the final consolidated groups
self._log_consolidated_groups(consolidated)
return consolidated
def _log_consolidated_groups(self, placements: list[dict]) -> None:
"""Log information about consolidated AD cue groups."""
groups = []
current_group = []
for p in placements:
pause_point = p.get("pause_point")
if not current_group or current_group[-1].get("pause_point") == pause_point:
current_group.append(p)
else:
if len(current_group) > 1:
groups.append(current_group)
current_group = [p]
if len(current_group) > 1:
groups.append(current_group)
for group in groups:
cue_indices = [p.get("ad_cue_index") for p in group]
pause_point = group[0].get("pause_point")
first_resume = group[0].get("resume_from")
last_resume = group[-1].get("resume_from")
logger.info(
f"Consolidated group: cues {cue_indices} at pause_point={pause_point:.2f}s, "
f"first_resume_from={first_resume:.2f}s (should equal pause_point), "
f"last_resume_from={last_resume:.2f}s (back buffer)"
)
def _find_gap_type(self, timestamp: float, gaps: list[SpeechGap]) -> str:
"""Find the gap type for a given timestamp."""
for gap in gaps:
# Check if timestamp falls within this gap (between end of prev word and start of next)
if gap.start <= timestamp <= gap.end:
return gap.gap_type
return "unknown"
# Module-level shared service instance. Constructing it only reads settings;
# the Whisper model itself is loaded lazily on first access of
# ``whisper_service.model``.
whisper_service = WhisperService()