video-accessibility/backend/app/services/tts.py
Vadym Samoilenko 1e177a6d5c feat: add ElevenLabs voice selection to frontend and backend
Add dynamic ElevenLabs voice catalog with provider toggle in the UI,
allowing users to browse ElevenLabs voices, configure stability and
similarity boost settings, and preview/synthesize with ElevenLabs TTS.

Backend:
- New elevenlabs_voices.py service with 1-hour cached API fetching
- TTS routes support ?provider= query param for voices and options
- Preview endpoint routes to ElevenLabs or Gemini based on provider
- stability/similarity_boost params flow through TTS synthesis pipeline
- TTSPreferences model extended with ElevenLabs-specific fields
- Deprecated hardcoded elevenlabs_voices config (now fetched dynamically)

Frontend:
- Provider toggle (Gemini/ElevenLabs) in VoiceSelector
- ElevenLabsSettingsPanel with stability and similarity boost sliders
- VoicePreviewButton supports provider-specific preview parameters
- API client passes provider param to voices, options, and preview endpoints
- New VoiceInfo, ProviderVoicesResponse, ProviderOptionsResponse types

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-03 13:58:56 +00:00

486 lines
18 KiB
Python

import asyncio
import io
from dataclasses import dataclass
from typing import Optional
from urllib.parse import quote

import aiohttp
from google.cloud import texttospeech
from pydub import AudioSegment

from ..core.config import settings
from ..core.logging import get_logger
from .gemini_tts import gemini_tts_service
# Module-level logger, obtained from the project's get_logger helper and keyed by this module's name.
logger = get_logger(__name__)
@dataclass
class TTSCueSegment:
    """Synthesized audio for one audio-description (AD) cue.

    Attributes:
        cue_index: Position of the cue in the parsed VTT cue list.
        start_time: Start time of the cue as written in the VTT, in seconds.
        end_time: End time of the cue as written in the VTT, in seconds.
        duration: Length of the synthesized audio in seconds (measured from
            the audio itself, not derived from the VTT timing).
        text: The AD text that was synthesized.
        audio_bytes: Raw MP3 bytes returned by the TTS provider.
    """

    cue_index: int
    start_time: float
    end_time: float
    duration: float
    text: str
    audio_bytes: bytes
class TTSService:
    """Turn audio-description (AD) WebVTT cues into MP3 audio.

    Three back ends are supported: Gemini TTS (delegated to
    ``gemini_tts_service``), Google Cloud Text-to-Speech and ElevenLabs.
    Every cue is synthesized on its own and placed on one timeline, padded
    with silence so that it starts no earlier than its VTT start time.
    """

    def __init__(self):
        """Record which providers are usable with the current settings."""
        # Gemini TTS uses the same API key as the other Gemini services.
        self.gemini_available = bool(settings.gemini_api_key)

        # Google Cloud TTS reads GOOGLE_APPLICATION_CREDENTIALS; a failure here
        # only disables this provider instead of breaking service start-up.
        try:
            self.google_client = texttospeech.TextToSpeechClient()
            logger.info("Google Cloud TTS client initialized successfully")
        except Exception as e:
            logger.warning(f"Google Cloud TTS credentials not configured: {e}")
            self.google_client = None

        self.elevenlabs_available = bool(settings.elevenlabs_api_key)

        logger.info(f"TTS provider configured: {settings.tts_provider}")

    async def synthesize_audio_description(
        self,
        ad_vtt_content: str,
        language_code: str = "en-US",
        voice_name: Optional[str] = None,
        provider: Optional[str] = None,
        model: str = "flash",
        speed: float = 1.0,
        style_prompt: str = "",
        stability: float = 0.5,
        similarity_boost: float = 0.5,
    ) -> bytes:
        """Generate one MP3 from audio description VTT content.

        Provider priority: ``provider`` argument > ``settings.tts_provider``.
        Fallback chain when the chosen provider is unavailable or fails:
        Gemini -> Google Cloud TTS -> ElevenLabs. An explicit "elevenlabs"
        request goes straight to ElevenLabs.

        Args:
            ad_vtt_content: VTT content with audio description cues.
            language_code: Language code (e.g., "en-US").
            voice_name: Voice name/ID for the provider.
            provider: TTS provider ("gemini", "google", "elevenlabs").
            model: Gemini model variant - "flash" or "pro" (Gemini only).
            speed: Speech rate multiplier 0.5-2.0 (Gemini only).
            style_prompt: Style instructions (Gemini only).
            stability: ElevenLabs ``voice_settings.stability`` (ElevenLabs only).
            similarity_boost: ElevenLabs ``voice_settings.similarity_boost``
                (ElevenLabs only).

        Returns:
            The MP3 bytes of the stitched audio description track.

        Raises:
            ValueError: If no provider could be used. Errors raised by the
                ElevenLabs path (the last link of the chain) propagate as-is.
        """
        active_provider = provider or settings.tts_provider
        # Gemini expects the bare language ("en-US" -> "en").
        simple_lang = language_code.split("-")[0]

        # NOTE(review): on fallback the same voice_name is handed to the next
        # provider, although it was presumably chosen from the first
        # provider's catalog - confirm whether it should be dropped instead.
        if active_provider == "gemini" and self.gemini_available:
            try:
                logger.info(
                    f"Using Gemini TTS for language: {simple_lang}, voice: {voice_name}, "
                    f"model: {model}, speed: {speed}x"
                )
                return await gemini_tts_service.synthesize_audio_description(
                    ad_vtt_content,
                    simple_lang,
                    voice_name,
                    model=model,
                    speed=speed,
                    style_prompt=style_prompt,
                )
            except Exception as e:
                # Deliberate best effort: fall through to Google / ElevenLabs.
                logger.warning(f"Gemini TTS failed, falling back: {e}")

        if active_provider in ("google", "gemini") and self.google_client:
            try:
                logger.info(f"Using Google Cloud TTS for language: {language_code}")
                return await self._synthesize_with_google(
                    ad_vtt_content, language_code, voice_name
                )
            except Exception as e:
                logger.warning(f"Google Cloud TTS failed: {e}")

        if self.elevenlabs_available:
            logger.info(f"Using ElevenLabs TTS for language: {language_code}")
            return await self._synthesize_with_elevenlabs(
                ad_vtt_content,
                language_code,
                voice_name,
                stability=stability,
                similarity_boost=similarity_boost,
            )

        raise ValueError("No TTS service available")

    async def synthesize_audio_description_with_segments(
        self,
        ad_vtt_content: str,
        language_code: str = "en-US",
        voice_name: Optional[str] = None,
        provider: Optional[str] = None,
        model: str = "flash",
        speed: float = 1.0,
        style_prompt: str = "",
        stability: float = 0.5,
        similarity_boost: float = 0.5,
    ) -> "tuple[bytes, list[TTSCueSegment]]":
        """Generate the combined MP3 AND the individual per-cue segments.

        Used for accessible video generation where per-cue audio is needed.
        A cue whose synthesis fails is logged, replaced in the combined track
        by silence of the cue's VTT duration, and left out of the segment list.

        Args:
            ad_vtt_content: VTT content with audio description cues.
            language_code: Language code (e.g., "en-US").
            voice_name: Voice name/ID for the provider.
            provider: TTS provider ("gemini", "google", "elevenlabs");
                defaults to ``settings.tts_provider``.
            model: Gemini model variant (Gemini only).
            speed: Speech rate multiplier (Gemini only).
            style_prompt: Style instructions (Gemini only).
            stability: ElevenLabs ``voice_settings.stability``.
            similarity_boost: ElevenLabs ``voice_settings.similarity_boost``.

        Returns:
            Tuple of (combined_mp3_bytes, list_of_cue_segments).

        Raises:
            ValueError: If the VTT content contains no cues.
        """
        active_provider = provider or settings.tts_provider
        simple_lang = language_code.split("-")[0]

        cues = self._parse_ad_cues(ad_vtt_content)
        if not cues:
            raise ValueError("No audio description cues found")

        use_gemini = active_provider == "gemini" and self.gemini_available
        # Honor an explicit ElevenLabs request even when a Google client is
        # configured (previously Google silently won and received ElevenLabs
        # voice IDs); otherwise ElevenLabs stays the last resort after Google.
        use_elevenlabs = self.elevenlabs_available and (
            active_provider == "elevenlabs" or not self.google_client
        )

        segments: list[TTSCueSegment] = []
        timeline = []
        position = 0.0  # Seconds of audio placed on the timeline so far

        for index, cue in enumerate(cues):
            # Pad with silence so the cue starts at its VTT start time.
            start = cue["start_time"]
            if start > position:
                timeline.append(self._silence(start - position))
                position = start

            text = cue["text"].strip()
            if not text:
                continue
            text = self._ensure_sentence_end(text)

            try:
                if use_gemini:
                    audio_data = await gemini_tts_service.synthesize_text(
                        text,
                        voice_name or gemini_tts_service.default_voice,
                        simple_lang,
                        model=model,
                        speed=speed,
                        style_prompt=style_prompt,
                    )
                elif use_elevenlabs:
                    voice_id = self._get_elevenlabs_voice(language_code, voice_name)
                    audio_data = await self._synthesize_text_elevenlabs(
                        text,
                        voice_id,
                        stability=stability,
                        similarity_boost=similarity_boost,
                    )
                elif self.google_client:
                    audio_data = await self._synthesize_text_google(
                        text, language_code, voice_name
                    )
                else:
                    raise ValueError("No TTS service available")

                # The real clip length drives the timeline, not the VTT end time.
                clip = AudioSegment.from_file(io.BytesIO(audio_data), format="mp3")
                duration = len(clip) / 1000.0

                segments.append(
                    TTSCueSegment(
                        cue_index=index,
                        start_time=cue["start_time"],
                        end_time=cue["end_time"],
                        duration=duration,
                        text=cue["text"],
                        audio_bytes=audio_data,
                    )
                )
                timeline.append(clip)
                position += duration
            except Exception as e:
                logger.warning(f"Failed to synthesize cue {index}: {e}")
                # Keep the timeline aligned with silence for the failed cue;
                # clamp so a malformed cue (end < start) cannot move the
                # cursor backwards.
                cue_duration = max(0.0, cue["end_time"] - cue["start_time"])
                timeline.append(self._silence(cue_duration))
                position += cue_duration

        combined = self._export_mp3(timeline)
        logger.info(f"Synthesized {len(segments)} AD cue segments")
        return combined, segments

    async def _synthesize_with_google(
        self,
        ad_vtt_content: str,
        language_code: str = "en-US",
        voice_name: Optional[str] = None,
    ) -> bytes:
        """Generate the MP3 with Google Cloud TTS, anchoring cues to VTT start times.

        Raises:
            ValueError: If the VTT content contains no cues.
        """
        cues = self._parse_ad_cues(ad_vtt_content)
        if not cues:
            raise ValueError("No audio description cues found")

        async def synthesize(text: str) -> bytes:
            # Terminal punctuation gives the Google voices a natural cadence.
            return await self._synthesize_text_google(
                self._ensure_sentence_end(text), language_code, voice_name
            )

        return await self._stitch_cues(cues, synthesize)

    async def _synthesize_with_elevenlabs(
        self,
        ad_vtt_content: str,
        language_code: str = "en-US",
        voice_name: Optional[str] = None,
        stability: float = 0.5,
        similarity_boost: float = 0.5,
    ) -> bytes:
        """Generate the MP3 with ElevenLabs, anchoring cues to VTT start times.

        Raises:
            ValueError: If the VTT content contains no cues, or if the
                ElevenLabs API answers with a non-200 status.
        """
        cues = self._parse_ad_cues(ad_vtt_content)
        if not cues:
            raise ValueError("No audio description cues found")

        voice_id = self._get_elevenlabs_voice(language_code, voice_name)

        async def synthesize(text: str) -> bytes:
            return await self._synthesize_text_elevenlabs(
                text,
                voice_id,
                stability=stability,
                similarity_boost=similarity_boost,
            )

        return await self._stitch_cues(cues, synthesize)

    async def _stitch_cues(self, cues: list[dict], synthesize) -> bytes:
        """Synthesize every cue with ``synthesize`` and stitch them into one MP3.

        Args:
            cues: Cue dicts as produced by ``_parse_ad_cues``.
            synthesize: Async callable taking the cue text and returning MP3 bytes.

        Returns:
            MP3 bytes of the combined timeline.
        """
        timeline = []
        position = 0.0  # Seconds of audio placed on the timeline so far

        for cue in cues:
            # Pad with silence so the cue starts at its VTT start time.
            start = cue["start_time"]
            if start > position:
                timeline.append(self._silence(start - position))
                position = start

            text = cue["text"].strip()
            if text:
                audio_data = await synthesize(text)
                clip = AudioSegment.from_file(io.BytesIO(audio_data), format="mp3")
                timeline.append(clip)
                # Advance by the real clip length (not the VTT end time).
                position += len(clip) / 1000.0

        return self._export_mp3(timeline)

    @staticmethod
    def _ensure_sentence_end(text: str) -> str:
        """Append a period unless the text already ends with '.', '!' or '?'."""
        return text if text.endswith((".", "!", "?")) else text + "."

    @staticmethod
    def _silence(seconds: float) -> "AudioSegment":
        """Return a silent pydub segment of the given length in seconds."""
        return AudioSegment.silent(duration=int(seconds * 1000))

    @staticmethod
    def _export_mp3(parts: list) -> bytes:
        """Concatenate pydub segments and encode them as 128 kbit/s MP3 bytes."""
        if parts:
            combined = sum(parts, AudioSegment.empty())
        else:
            # Nothing to concatenate: fall back to one second of silence.
            combined = AudioSegment.silent(duration=1000)
        buffer = io.BytesIO()
        combined.export(buffer, format="mp3", bitrate="128k")
        return buffer.getvalue()

    async def _synthesize_text_google(
        self,
        text: str,
        language_code: str,
        voice_name: Optional[str] = None,
    ) -> bytes:
        """Synthesize a single text string to MP3 bytes using Google TTS.

        Args:
            text: Text to speak.
            language_code: Language code passed to the voice selection.
            voice_name: Google voice name; when empty, the voice configured in
                ``settings.google_tts_voices`` for the language is used, with
                "en-US-Neural2-D" as the final default.
        """
        if not voice_name:
            voice_name = settings.google_tts_voices.get(language_code, "en-US-Neural2-D")

        voice = texttospeech.VoiceSelectionParams(
            language_code=language_code,
            name=voice_name,
        )
        audio_config = texttospeech.AudioConfig(
            audio_encoding=texttospeech.AudioEncoding.MP3,
            speaking_rate=1.2,  # Faster cadence for better flow
            pitch=0.0,
        )
        synthesis_input = texttospeech.SynthesisInput(text=text)

        # synthesize_speech is a blocking call; run it in a worker thread so
        # the event loop keeps serving other requests meanwhile.
        response = await asyncio.to_thread(
            self.google_client.synthesize_speech,
            input=synthesis_input,
            voice=voice,
            audio_config=audio_config,
        )
        return response.audio_content

    async def _synthesize_text_elevenlabs(
        self,
        text: str,
        voice_id: str,
        stability: float = 0.5,
        similarity_boost: float = 0.5,
    ) -> bytes:
        """Synthesize text to MP3 bytes using the ElevenLabs HTTP API.

        Args:
            text: Text to speak.
            voice_id: ElevenLabs voice ID.
            stability: Sent as ``voice_settings.stability``.
            similarity_boost: Sent as ``voice_settings.similarity_boost``.

        Raises:
            ValueError: If the API responds with a status other than 200.
        """
        # voice_id can originate from a request parameter; percent-encode it
        # so it cannot alter the request path.
        safe_voice_id = quote(voice_id, safe="")
        url = f"https://api.elevenlabs.io/v1/text-to-speech/{safe_voice_id}"
        headers = {
            "Accept": "audio/mpeg",
            "Content-Type": "application/json",
            "xi-api-key": settings.elevenlabs_api_key,
        }
        data = {
            "text": text,
            "model_id": "eleven_multilingual_v2",
            "voice_settings": {
                "stability": stability,
                "similarity_boost": similarity_boost,
                "style": 0.0,
                "use_speaker_boost": True,
            },
        }

        async with aiohttp.ClientSession() as session:
            async with session.post(url, json=data, headers=headers) as response:
                if response.status != 200:
                    error_text = await response.text()
                    raise ValueError(
                        f"ElevenLabs TTS failed: {response.status} - {error_text}"
                    )
                return await response.read()

    def _get_elevenlabs_voice(self, language_code: str, voice_name: Optional[str] = None) -> str:
        """Return the ElevenLabs voice ID to use.

        An explicit ``voice_name`` wins; otherwise the voice configured for
        the language in ``settings.elevenlabs_voices`` is used, falling back
        to a hard-coded default voice ID.
        """
        if voice_name:
            return voice_name
        return settings.elevenlabs_voices.get(language_code, "21m00Tcm4TlvDq8ikWAM")

    def _parse_ad_cues(self, vtt_content: str) -> list[dict]:
        """Parse audio description VTT and extract timing + text.

        Header lines, blank lines, cue identifiers and NOTE lines are skipped.
        Cue settings after the end timestamp (e.g. ``align:start``) are ignored.

        Returns:
            One dict per cue that has text, with "start_time" and "end_time"
            in seconds and "text" (payload lines joined by single spaces).

        Raises:
            ValueError: If a timing line contains a malformed timestamp.
        """
        lines = iter(vtt_content.strip().splitlines())
        cues = []

        for raw_line in lines:
            line = raw_line.strip()
            # Only timing lines open a cue; everything else between cues
            # (header, identifiers, NOTE lines, blanks) is skipped.
            if line.startswith("NOTE") or "-->" not in line:
                continue

            start_raw, _, remainder = line.partition("-->")
            # First token after the arrow is the end time; anything that
            # follows is cue settings.
            end_fields = remainder.split()
            start_time = self._parse_timestamp(start_raw.strip())
            end_time = self._parse_timestamp(end_fields[0] if end_fields else "")

            # The payload runs until the next blank line (or end of input).
            text_lines = []
            for text_raw in lines:
                text_line = text_raw.strip()
                if not text_line:
                    break
                text_lines.append(text_line)

            if text_lines:
                cues.append({
                    "start_time": start_time,
                    "end_time": end_time,
                    "text": " ".join(text_lines),
                })

        return cues

    def _parse_timestamp(self, timestamp: str) -> float:
        """Convert a VTT timestamp (HH:MM:SS.mmm or MM:SS.mmm) to seconds.

        The fractional part is scaled by its digit count, so "01.5" means
        1.5 s. A comma is accepted as decimal separator (SRT style).

        Raises:
            ValueError: If the timestamp does not have two or three
                colon-separated fields, or a field is not numeric.
        """
        parts = timestamp.strip().split(":")
        if len(parts) == 3:  # HH:MM:SS.mmm
            hours, minutes, seconds = parts
        elif len(parts) == 2:  # MM:SS.mmm
            hours = "0"
            minutes, seconds = parts
        else:
            raise ValueError(f"Invalid timestamp format: {timestamp}")

        whole, _, fraction = seconds.replace(",", ".").partition(".")
        fractional_seconds = int(fraction) / 10 ** len(fraction) if fraction else 0.0

        return int(hours) * 3600 + int(minutes) * 60 + int(whole) + fractional_seconds
# Global service instance, created once when this module is imported.
# Constructing it runs TTSService.__init__, which attempts to create the
# Google Cloud TTS client and logs the configured provider.
tts_service = TTSService()