Whisper's snap_pause_point() finds the nearest sentence boundary independently for each cue, which can move a later cue's pause_point before an earlier cue's. The renderer then sorts by pause_point, producing non-sequential cue indices in the timeline.

Add a forward monotonicity pass (clamp each pause_point to be >= the previous one) at three layers for defense-in-depth:
- whisper_service: Phase 3, after consolidation
- video_renderer: before the temporal sort in _render_pause_insert_method
- rerender_accessible_video: in _build_placements_with_adjustments

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
638 lines
25 KiB
Python
638 lines
25 KiB
Python
"""Service for Whisper-based speech analysis and pause point refinement."""
|
||
|
||
from __future__ import annotations
|
||
|
||
import logging
|
||
import os
|
||
import time
|
||
from dataclasses import dataclass
|
||
|
||
from faster_whisper import WhisperModel
|
||
|
||
# Use simple logging for Cloud Run compatibility (no dependency on app.core.logging)
|
||
logging.basicConfig(level=logging.INFO)
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# Try to import settings, fall back to env vars for Cloud Run mode
try:
    from ..core.config import settings
    _HAS_SETTINGS = True
except Exception:
    # Any failure while importing the config module (not only ImportError)
    # selects the environment-variable fallback below.
    _HAS_SETTINGS = False
    settings = None  # type: ignore


def _get_setting(name: str, default):
    """Get setting value from Settings object or environment variable.

    Lookup order:
      1. ``getattr(settings, name, default)`` when the settings object was
         imported successfully and is truthy.
      2. The environment variable ``name.upper()``, converted to the type of
         ``default`` (bool, float or int); any other default type yields the
         raw string.
      3. ``default`` when the environment variable is not set.

    Args:
        name: Setting attribute name; upper-cased for the environment lookup.
        default: Value returned when nothing is configured. Its type selects
            the conversion applied to an environment value.

    Returns:
        The configured value, or ``default``.

    Raises:
        ValueError: If the environment value cannot be parsed as the numeric
            type of ``default``.
    """
    if _HAS_SETTINGS and settings:
        return getattr(settings, name, default)

    # Fall back to environment variable
    env_val = os.environ.get(name.upper())
    if env_val is None:
        return default

    # bool must be handled before int: bool is a subclass of int, so a boolean
    # default would otherwise reach int(env_val) and raise on "true"/"false".
    if isinstance(default, bool):
        return env_val.strip().lower() in ("1", "true", "yes", "on")
    if isinstance(default, float):
        return float(env_val)
    if isinstance(default, int):
        return int(env_val)
    return env_val
|
||
|
||
|
||
@dataclass
class WordTimestamp:
    """One transcribed word together with the span in which it is spoken."""

    word: str
    start: float  # seconds
    end: float  # seconds

    def to_dict(self) -> dict:
        """Return a plain, serializable dict (used for Celery task results)."""
        return dict(word=self.word, start=self.start, end=self.end)

    @classmethod
    def from_dict(cls, data: dict) -> WordTimestamp:
        """Rebuild an instance from a dict with "word", "start" and "end" keys."""
        word, start, end = data["word"], data["start"], data["end"]
        return cls(word=word, start=start, end=end)
|
||
|
||
|
||
@dataclass
class SpeechGap:
    """Silence between two consecutive words; a candidate pause location."""

    start: float  # End of previous word
    end: float  # Start of next word
    duration: float  # Gap duration in seconds
    gap_type: str  # "sentence", "phrase", or "word"

    @property
    def priority(self) -> int:
        """Snapping rank of this gap; a smaller number means higher priority."""
        ranks = {"sentence": 1, "phrase": 2, "word": 3}
        # Any gap_type outside the three known kinds ranks last (4).
        return ranks.get(self.gap_type, 4)
|
||
|
||
|
||
@dataclass
class SentenceBoundary:
    """The start or end of a sentence, used as a target for pause point snapping.

    Two kinds of boundary exist:
    - sentence_end: the end time of a word that ends with .!?
    - sentence_start: the start time of the first word following such a word
    """

    # Timestamp of the boundary
    time: float
    # Either "sentence_start" or "sentence_end"
    boundary_type: str
    # Position of the associated word in the words list
    word_index: int
    # Whether a sentence exists before this boundary
    has_previous_sentence: bool
    # Whether a sentence exists after this boundary
    has_next_sentence: bool
    # The gap this boundary belongs to, if any (for double-buffer case)
    gap: SpeechGap | None
|
||
|
||
|
||
class WhisperService:
|
||
"""Service for speech analysis using faster-whisper."""
|
||
|
||
def __init__(self):
    """Read configuration via ``_get_setting``; the model itself is not loaded here."""
    # Populated on first access of the ``model`` property.
    self._model: WhisperModel | None = None
    self._model_name = _get_setting("whisper_model", "base")

    # Minimum gap durations (seconds) for each gap class, and the shortest
    # gap that is recorded at all.
    self.sentence_gap_threshold = _get_setting("whisper_sentence_gap_threshold", 0.5)
    self.phrase_gap_threshold = _get_setting("whisper_phrase_gap_threshold", 0.3)
    self.min_gap_threshold = _get_setting("whisper_min_gap_threshold", 0.15)

    # Snapping configuration
    self.max_search_window = _get_setting("whisper_max_search_window", 30.0)
|
||
|
||
@property
def model(self) -> WhisperModel:
    """Return the WhisperModel, constructing and caching it on first access."""
    if self._model is not None:
        return self._model

    # os.cpu_count() can return None; use 4 threads in that case.
    cpu_threads = os.cpu_count() or 4
    logger.info(
        f"Loading Whisper model '{self._model_name}' "
        f"(device=cpu, compute_type=int8, cpu_threads={cpu_threads})..."
    )

    started_at = time.time()
    self._model = WhisperModel(
        self._model_name,
        device="cpu",
        compute_type="int8",  # Quantized for faster CPU inference
        cpu_threads=cpu_threads,
    )
    elapsed = time.time() - started_at

    logger.info(f"Whisper model '{self._model_name}' loaded successfully in {elapsed:.2f}s")
    return self._model
|
||
|
||
def transcribe_audio(self, audio_path: str) -> list[WordTimestamp]:
    """
    Run Whisper on an audio file and collect per-word timings.

    Args:
        audio_path: Path to audio file (MP3, WAV, etc.)

    Returns:
        List of WordTimestamp objects, one per word of every segment that
        carries word data, in the order the segments are yielded
    """
    logger.info(f"Starting Whisper transcription using model '{self._model_name}': {audio_path}")
    started_at = time.time()

    vad_options = {
        "min_silence_duration_ms": 200,
        "speech_pad_ms": 100,
    }
    segments, _info = self.model.transcribe(
        audio_path,
        word_timestamps=True,
        vad_filter=True,  # Filter out non-speech
        vad_parameters=vad_options,
    )

    # Segments without word data are skipped; word text is stripped of
    # surrounding whitespace.
    words = [
        WordTimestamp(word=item.word.strip(), start=item.start, end=item.end)
        for segment in segments
        if segment.words
        for item in segment.words
    ]

    elapsed = time.time() - started_at
    logger.info(
        f"Whisper transcription complete using model '{self._model_name}': "
        f"{len(words)} words detected in {elapsed:.2f}s"
    )
    return words
|
||
|
||
def identify_speech_gaps(self, words: list[WordTimestamp]) -> list[SpeechGap]:
    """
    Identify gaps between words that could serve as pause points.

    A gap is the silence between the end of one word and the start of the
    next. Gaps shorter than ``self.min_gap_threshold`` are ignored. The
    remaining gaps are classified by duration ("sentence", "phrase" or
    "word") and then adjusted by the punctuation that ends the preceding
    word: sentence punctuation always yields "sentence", phrase punctuation
    upgrades "word" to "phrase".

    Args:
        words: List of word timestamps from Whisper

    Returns:
        List of SpeechGap objects sorted by start time (empty when fewer
        than two words are given)
    """
    if len(words) < 2:
        return []

    # ASCII marks plus their CJK counterparts. The full-width "!", "?" and
    # "," sit next to "。" / "、" so CJK transcripts are classified too.
    sentence_marks = ('.', '!', '?', '...', '。', '!', '?')
    phrase_marks = (',', ';', ':', '、', ',')

    gaps = []
    for current_word, next_word in zip(words, words[1:]):
        gap_start = current_word.end
        gap_end = next_word.start
        duration = gap_end - gap_start

        if duration < self.min_gap_threshold:
            continue

        # Classify gap type based on duration
        if duration >= self.sentence_gap_threshold:
            gap_type = "sentence"
        elif duration >= self.phrase_gap_threshold:
            gap_type = "phrase"
        else:
            gap_type = "word"

        # Punctuation on the preceding word can only raise the class.
        word_text = current_word.word.rstrip()
        if word_text.endswith(sentence_marks):
            gap_type = "sentence"
        elif gap_type == "word" and word_text.endswith(phrase_marks):
            gap_type = "phrase"

        gaps.append(SpeechGap(
            start=gap_start,
            end=gap_end,
            duration=duration,
            gap_type=gap_type,
        ))

    # Tally all three classes in a single pass for the summary log line.
    counts = {"sentence": 0, "phrase": 0, "word": 0}
    for gap in gaps:
        counts[gap.gap_type] += 1
    logger.info(f"Identified {len(gaps)} speech gaps "
                f"(sentence: {counts['sentence']}, "
                f"phrase: {counts['phrase']}, "
                f"word: {counts['word']})")

    return sorted(gaps, key=lambda g: g.start)
|
||
|
||
def _is_during_speaking(
    self,
    pause_point: float,
    words: list[WordTimestamp],
    threshold: float = 2.0
) -> bool:
    """
    Check if a pause point is "during speaking" (words nearby).

    A word counts as nearby when its start or its end lies within
    ``threshold`` seconds of the pause point, or when the pause point falls
    inside the word itself. The containment test matters for words longer
    than ``2 * threshold``, whose interior is farther than ``threshold``
    from both edges.

    Args:
        pause_point: The timestamp to check
        words: List of word timestamps from Whisper
        threshold: Max distance in seconds to consider "nearby" (default: 2.0s)

    Returns:
        True if any word is within ±threshold seconds of the pause point
        (or contains it); False otherwise, including for an empty word list
    """
    return any(
        abs(word.start - pause_point) <= threshold
        or abs(word.end - pause_point) <= threshold
        or word.start <= pause_point <= word.end
        for word in words
    )
|
||
|
||
def _find_sentence_boundaries(
    self,
    words: list[WordTimestamp],
    gaps: list[SpeechGap]
) -> list[SentenceBoundary]:
    """
    Find all sentence boundaries (starts and ends) from the transcript.

    Boundaries are identified from:
    1. Words ending with sentence punctuation - these mark sentence ends
    2. Words following sentence-ending words - these mark sentence starts
       (only emitted when a gap follows the sentence-ending word)
    3. Fallback: If no punctuation found, use the longest gap as a boundary

    In addition, a "sentence_start" is always emitted for the first word,
    and a "sentence_end" for the last word when it is not already a
    sentence-ending word.

    Args:
        words: List of word timestamps from Whisper
        gaps: List of speech gaps between words

    Returns:
        List of SentenceBoundary objects sorted by time (empty when there
        are no words)
    """
    if not words:
        return []

    # ASCII marks plus CJK counterparts; "!" and "?" are the full-width
    # forms that accompany "。".
    sentence_end_punctuation = ('.', '!', '?', '...', '。', '!', '?')

    # Indices of words that end a sentence
    sentence_ending_indices: set[int] = {
        i for i, word in enumerate(words)
        if word.word.rstrip().endswith(sentence_end_punctuation)
    }

    # If no sentence punctuation found, use the longest gap as a fallback
    if not sentence_ending_indices and gaps:
        longest_gap = max(gaps, key=lambda g: g.duration)
        # Find the word (never the last one) that ends where this gap starts
        for i, word in enumerate(words[:-1]):
            if abs(word.end - longest_gap.start) < 0.01:  # Match within 10ms
                sentence_ending_indices.add(i)
                logger.info(
                    f"No sentence punctuation found, using longest gap "
                    f"({longest_gap.duration:.2f}s) at {longest_gap.start:.2f}s as boundary"
                )
                break

    last_idx = len(words) - 1
    # Largest sentence-ending index; -1 when there is none, so the
    # "a later sentence end exists" test below is simply i < final_end_idx.
    final_end_idx = max(sentence_ending_indices, default=-1)

    boundaries: list[SentenceBoundary] = []
    for i in sorted(sentence_ending_indices):
        word = words[i]

        # First gap that starts where this word ends (within 10ms), if any
        associated_gap = next(
            (gap for gap in gaps if abs(gap.start - word.end) < 0.01),
            None,
        )
        has_next = i < last_idx

        # Sentence END boundary. Any word before this one counts as a
        # previous sentence.
        boundaries.append(SentenceBoundary(
            time=word.end,
            boundary_type="sentence_end",
            word_index=i,
            has_previous_sentence=i > 0,
            has_next_sentence=has_next,
            gap=associated_gap,
        ))

        # Sentence START boundary at the next word, only when a gap
        # separates the two sentences.
        if has_next and associated_gap:
            boundaries.append(SentenceBoundary(
                time=words[i + 1].start,
                boundary_type="sentence_start",
                word_index=i + 1,
                has_previous_sentence=True,  # The sentence that just ended
                has_next_sentence=i < final_end_idx,
                gap=associated_gap,
            ))

    # The loop above only creates sentence_start entries at index >= 1, so
    # the first word never has one yet: always add it.
    boundaries.append(SentenceBoundary(
        time=words[0].start,
        boundary_type="sentence_start",
        word_index=0,
        has_previous_sentence=False,  # Nothing before first word
        has_next_sentence=len(sentence_ending_indices) > 0 or len(words) > 1,
        gap=None,
    ))

    # Close the transcript with a sentence_end unless the last word already
    # produced one.
    if last_idx not in sentence_ending_indices:
        boundaries.append(SentenceBoundary(
            time=words[last_idx].end,
            boundary_type="sentence_end",
            word_index=last_idx,
            has_previous_sentence=len(sentence_ending_indices) > 0 or last_idx > 0,
            has_next_sentence=False,  # Nothing after last word
            gap=None,
        ))

    return sorted(boundaries, key=lambda b: b.time)
|
||
|
||
def snap_pause_point(
    self,
    gemini_pause: float,
    words: list[WordTimestamp],
    gaps: list[SpeechGap],
    boundaries: list[SentenceBoundary],
    speaking_threshold: float = 2.0
) -> tuple[float, float, str | None]:
    """
    Move a Gemini pause point onto the nearest sentence boundary.

    Decision order:
    1. No word within ±speaking_threshold of the pause → keep Gemini's point.
    2. No boundaries at all → keep Gemini's point and return a warning.
    3. Otherwise take the boundary closest in time and apply:
       - Case A: sentence_start with no previous sentence → 0.0 (video start)
       - Case B: sentence_end with no next sentence → the boundary time
       - Case C: boundary has a gap → midpoint of that gap
       - Fallback: the boundary time itself

    Args:
        gemini_pause: Original pause point from Gemini (seconds)
        words: List of word timestamps from Whisper
        gaps: List of speech gaps from identify_speech_gaps() (not read here)
        boundaries: List of sentence boundaries from _find_sentence_boundaries()
        speaking_threshold: Max distance to consider "during speaking" (default: 2.0s)

    Returns:
        Tuple of (pause_point, resume_from, warning_message_or_none);
        pause_point and resume_from are always equal
    """
    speaking = self._is_during_speaking(gemini_pause, words, speaking_threshold)
    if not speaking:
        logger.info(
            f"Pause point {gemini_pause:.2f}s is NOT during speaking "
            f"(no words within ±{speaking_threshold}s), using Gemini's exact point"
        )
        return gemini_pause, gemini_pause, None

    if not boundaries:
        logger.warning(f"No sentence boundaries found, using Gemini's exact point {gemini_pause:.2f}s")
        return gemini_pause, gemini_pause, "No sentence boundaries found in transcript"

    def _distance(boundary):
        # Absolute time difference between a boundary and the Gemini pause.
        return abs(boundary.time - gemini_pause)

    nearest = min(boundaries, key=_distance)
    logger.debug(
        f"Nearest boundary to {gemini_pause:.2f}s: {nearest.boundary_type} "
        f"at {nearest.time:.2f}s (distance: {_distance(nearest):.2f}s)"
    )

    kind = nearest.boundary_type

    # Case A: nothing precedes this sentence → pause at the very start.
    if kind == "sentence_start" and not nearest.has_previous_sentence:
        snapped = 0.0
        logger.info(
            f"Case A (first sentence): pause_point={snapped:.2f}s "
            f"(snapped to video start)"
        )
        return snapped, snapped, None

    # Case B: nothing follows this sentence → pause at the boundary time.
    if kind == "sentence_end" and not nearest.has_next_sentence:
        snapped = nearest.time
        logger.info(
            f"Case B (last sentence): pause_point={snapped:.2f}s "
            f"(snapped to video end at sentence boundary)"
        )
        return snapped, snapped, None

    # Case C: between two sentences → pause halfway through the gap.
    between = nearest.gap
    if between:
        midpoint = (between.start + between.end) / 2.0
        logger.info(
            f"Case C (between sentences): gap={between.start:.2f}s-{between.end:.2f}s, "
            f"midpoint={midpoint:.2f}s (resume from same point)"
        )
        return midpoint, midpoint, None

    # Fallback: boundary without a gap → use its time as-is.
    snapped = nearest.time
    logger.info(
        f"Fallback: Using boundary at {nearest.time:.2f}s, "
        f"pause_point={snapped:.2f}s (no gap available)"
    )
    return snapped, snapped, None
|
||
|
||
def refine_all_pause_points(
    self,
    placements: list[dict],
    words: list[WordTimestamp],
    gaps: list[SpeechGap],
    consolidation_threshold: float = 5.0
) -> tuple[list[dict], list[str]]:
    """
    Refine all pause points in a Gemini analysis result.

    Three-phase algorithm:
    Phase 1: Snap each non-None pause point individually via
        snap_pause_point(); the original value is preserved under
        "original_pause_point".
    Phase 2: Consolidate cues whose pause points are within
        ``consolidation_threshold`` seconds of each other.
    Phase 3: Enforce monotonicity - in list order, no pause point may be
        earlier than the latest pause point seen so far. Placements without
        a pause point are skipped and do not reset that floor.

    Args:
        placements: List of placement dicts from Gemini analysis
        words: Word timestamps from Whisper transcription
        gaps: Speech gaps from Whisper analysis
        consolidation_threshold: If consecutive cues have pause points within
            this many seconds, combine them to play back-to-back (default: 5.0s)

    Returns:
        Tuple of (refined_placements, warnings). The input dicts are copied,
        not modified.
    """
    refined_placements = []
    warnings = []

    # Pre-compute sentence boundaries once for all placements
    boundaries = self._find_sentence_boundaries(words, gaps)
    logger.info(f"Found {len(boundaries)} sentence boundaries for pause point refinement")

    # Phase 1: Refine each pause point individually
    for i, placement in enumerate(placements):
        refined = placement.copy()
        original = placement.get("pause_point")

        if original is not None:
            pause_point, resume_from, warning = self.snap_pause_point(
                original, words, gaps, boundaries
            )
            refined["pause_point"] = pause_point
            refined["resume_from"] = resume_from
            refined["original_pause_point"] = original  # Preserve for debugging

            if warning:
                # Fall back to the list position when the cue index is
                # absent, as Phase 3 does.
                cue_idx = placement.get("ad_cue_index", i)
                warnings.append(f"Cue {cue_idx}: {warning}")
                logger.warning(f"Pause point refinement warning for cue {cue_idx}: {warning}")

        refined_placements.append(refined)

    # Phase 2: Consolidate cues that are close together (AFTER all individual refinements)
    refined_placements = self._consolidate_close_cues(
        refined_placements, consolidation_threshold, warnings
    )

    # Phase 3: Enforce monotonicity - pause_points must be non-decreasing in cue order.
    # snap_pause_point() picks the nearest boundary independently per cue, which
    # can move a later cue's pause_point before an earlier cue's.
    # The floor is carried across placements whose pause_point is None, so an
    # intervening cue without a pause point cannot hide a violation.
    floor_pp = None
    floor_cue_idx = None
    for i, placement in enumerate(refined_placements):
        curr_pp = placement.get("pause_point")
        if curr_pp is None:
            continue

        cue_idx = placement.get("ad_cue_index", i)
        if floor_pp is not None and curr_pp < floor_pp:
            placement["pause_point"] = floor_pp
            placement["resume_from"] = floor_pp
            placement["monotonicity_clamped"] = True
            warning_msg = (
                f"Cue {cue_idx}: Monotonicity violation - pause_point {curr_pp:.2f}s "
                f"was before cue {floor_cue_idx}'s pause_point {floor_pp:.2f}s, "
                f"clamped to {floor_pp:.2f}s"
            )
            warnings.append(warning_msg)
            logger.warning(warning_msg)
        else:
            floor_pp = curr_pp
        # The most recent cue with a pause point is the one named in the
        # next violation message.
        floor_cue_idx = cue_idx

    return refined_placements, warnings
|
||
|
||
def _consolidate_close_cues(
    self,
    placements: list[dict],
    threshold: float,
    warnings: list[str]
) -> list[dict]:
    """
    Merge AD cues whose pause points lie within ``threshold`` seconds.

    Each cue is compared with the cue before it as it stands after any
    earlier merge. When the current pause point is later than the previous
    one by at most ``threshold`` seconds, the current cue takes over the
    previous pause point (and the same resume_from), so both play
    back-to-back during the same freeze frame. Cues without a pause point,
    or following one without a pause point, are passed through unchanged.

    Args:
        placements: List of refined placement dicts
        threshold: Max seconds between pause points to trigger consolidation
        warnings: List to append warning messages to

    Returns:
        New list of copied placements with consolidated pause points; the
        input list itself when it holds fewer than two placements
    """
    if len(placements) < 2:
        return placements

    merged = [placements[0].copy()]

    for source in placements[1:]:
        cue = source.copy()
        cue_pause = cue.get("pause_point")
        anchor_pause = merged[-1].get("pause_point")

        if cue_pause is not None and anchor_pause is not None:
            distance = cue_pause - anchor_pause

            # Only a strictly later cue within the threshold is merged.
            if 0 < distance <= threshold:
                cue["pause_point"] = anchor_pause
                cue["resume_from"] = anchor_pause  # Always same as pause_point
                cue["consolidated_with_previous"] = True
                cue["original_pause_point_before_consolidation"] = cue_pause

                logger.info(
                    f"Consolidated cue {cue['ad_cue_index']} with previous cue: "
                    f"pause_point {cue_pause:.2f}s -> {anchor_pause:.2f}s "
                    f"(gap was {distance:.2f}s, threshold {threshold:.2f}s)"
                )
                warnings.append(
                    f"Cue {cue['ad_cue_index']}: Consolidated with previous cue "
                    f"(pause points were {distance:.2f}s apart, playing back-to-back)"
                )

        merged.append(cue)

    # Log the final consolidated groups
    self._log_consolidated_groups(merged)

    return merged
|
||
|
||
def _log_consolidated_groups(self, placements: list[dict]) -> None:
    """Log information about consolidated AD cue groups.

    A group is a run of two or more consecutive placements sharing the same
    non-None pause_point. Placements without a pause point never form or
    join a group; this keeps the number formatting below from failing on
    None.

    Args:
        placements: Placement dicts in playback order
    """
    def _seconds(value) -> str:
        # Render a timestamp for the log line, tolerating a missing value.
        return "n/a" if value is None else f"{value:.2f}s"

    groups = []
    current_group = []

    for p in placements:
        pause_point = p.get("pause_point")
        if current_group and current_group[-1].get("pause_point") == pause_point:
            current_group.append(p)
            continue

        # Run ended: keep it only if it actually groups several cues.
        if len(current_group) > 1:
            groups.append(current_group)
        # A placement with no pause point cannot start a group.
        current_group = [] if pause_point is None else [p]

    if len(current_group) > 1:
        groups.append(current_group)

    for group in groups:
        cue_indices = [p.get("ad_cue_index") for p in group]
        pause_point = group[0].get("pause_point")
        first_resume = group[0].get("resume_from")
        last_resume = group[-1].get("resume_from")
        logger.info(
            f"Consolidated group: cues {cue_indices} at pause_point={_seconds(pause_point)}, "
            f"first_resume_from={_seconds(first_resume)} (should equal pause_point), "
            f"last_resume_from={_seconds(last_resume)} (back buffer)"
        )
|
||
|
||
def _find_gap_type(self, timestamp: float, gaps: list[SpeechGap]) -> str:
    """Return the gap_type of the first gap containing ``timestamp``, else "unknown"."""
    # A gap contains the timestamp when it lies between the end of the
    # previous word and the start of the next one, both ends inclusive.
    containing = (g.gap_type for g in gaps if g.start <= timestamp <= g.end)
    return next(containing, "unknown")
|
||
|
||
|
||
# Global service instance (lazy-loaded)
|
||
whisper_service = WhisperService()
|