Add dynamic ElevenLabs voice catalog with provider toggle in the UI, allowing users to browse ElevenLabs voices, configure stability and similarity boost settings, and preview/synthesize with ElevenLabs TTS.

Backend:
- New elevenlabs_voices.py service with 1-hour cached API fetching
- TTS routes support ?provider= query param for voices and options
- Preview endpoint routes to ElevenLabs or Gemini based on provider
- stability/similarity_boost params flow through TTS synthesis pipeline
- TTSPreferences model extended with ElevenLabs-specific fields
- Deprecated hardcoded elevenlabs_voices config (now fetched dynamically)

Frontend:
- Provider toggle (Gemini/ElevenLabs) in VoiceSelector
- ElevenLabsSettingsPanel with stability and similarity boost sliders
- VoicePreviewButton supports provider-specific preview parameters
- API client passes provider param to voices, options, and preview endpoints
- New VoiceInfo, ProviderVoicesResponse, ProviderOptionsResponse types

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
486 lines
18 KiB
Python
486 lines
18 KiB
Python
import io
|
|
from dataclasses import dataclass
|
|
from typing import Optional
|
|
|
|
import aiohttp
|
|
from google.cloud import texttospeech
|
|
from pydub import AudioSegment
|
|
|
|
from ..core.config import settings
|
|
from ..core.logging import get_logger
|
|
from .gemini_tts import gemini_tts_service
|
|
|
|
# Module-level logger, named after this module.
logger = get_logger(__name__)
|
|
|
|
|
|
@dataclass
class TTSCueSegment:
    """Synthesized audio for a single audio-description (AD) cue.

    Attributes:
        cue_index: Index of the cue this audio belongs to.
        start_time: Original VTT start time of the cue.
        end_time: Original VTT end time of the cue.
        duration: Actual length of the synthesized audio, in seconds.
        text: The AD text that was synthesized.
        audio_bytes: Raw MP3 bytes of the synthesized audio.
    """

    cue_index: int
    start_time: float
    end_time: float
    duration: float
    text: str
    audio_bytes: bytes
|
|
|
|
|
|
class TTSService:
    """Synthesize audio-description VTT cues to MP3 via Gemini, Google Cloud TTS or ElevenLabs.

    Each cue is synthesized on its own and the results are stitched together,
    padding with silence so a cue never starts before its VTT start time.
    """

    def __init__(self):
        # Gemini TTS uses the same API key as the other Gemini services.
        self.gemini_available = bool(settings.gemini_api_key)

        # Google Cloud TTS (uses GOOGLE_APPLICATION_CREDENTIALS env var).
        # Best effort: a missing/invalid credential setup must not prevent the
        # service from starting, it only disables this back end.
        try:
            self.google_client = texttospeech.TextToSpeechClient()
            logger.info("Google Cloud TTS client initialized successfully")
        except Exception as e:
            logger.warning(f"Google Cloud TTS credentials not configured: {e}")
            self.google_client = None

        self.elevenlabs_available = bool(settings.elevenlabs_api_key)

        logger.info(f"TTS provider configured: {settings.tts_provider}")

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _simple_language(language_code: str) -> str:
        """Return the primary language subtag, e.g. "en-US" -> "en"."""
        return language_code.split("-")[0]

    @staticmethod
    def _ensure_terminal_punctuation(text: str) -> str:
        """Append a period unless *text* already ends with '.', '!' or '?'."""
        return text if text.endswith(('.', '!', '?')) else text + "."

    @staticmethod
    def _pad_to_start(pieces: list, position: float, target_start: float) -> float:
        """Append silence to *pieces* so the timeline reaches *target_start*.

        Returns the new timeline position in seconds. Nothing is appended when
        the timeline is already at or past *target_start* (earlier audio ran
        long), in which case the cue simply starts late.
        """
        if target_start > position:
            gap_ms = int((target_start - position) * 1000)
            pieces.append(AudioSegment.silent(duration=gap_ms))
            return target_start
        return position

    @staticmethod
    def _export_mp3(pieces: list) -> bytes:
        """Concatenate pydub segments and encode them as a 128k MP3."""
        if pieces:
            final_audio = sum(pieces, AudioSegment.empty())
        else:
            # Nothing to stitch: fall back to one second of silence.
            final_audio = AudioSegment.silent(duration=1000)

        output_buffer = io.BytesIO()
        final_audio.export(output_buffer, format="mp3", bitrate="128k")
        return output_buffer.getvalue()

    def _select_segment_backend(self, active_provider: Optional[str]) -> str:
        """Pick the back end used for per-cue synthesis.

        An explicitly requested Gemini or ElevenLabs provider wins when it is
        available; otherwise Google Cloud TTS is preferred over ElevenLabs.

        Returns:
            One of "gemini", "google" or "elevenlabs".

        Raises:
            ValueError: If no back end is configured at all.
        """
        if active_provider == "gemini" and self.gemini_available:
            return "gemini"
        if active_provider == "elevenlabs" and self.elevenlabs_available:
            return "elevenlabs"
        if self.google_client:
            return "google"
        if self.elevenlabs_available:
            return "elevenlabs"
        raise ValueError("No TTS service available")

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def synthesize_audio_description(
        self,
        ad_vtt_content: str,
        language_code: str = "en-US",
        voice_name: Optional[str] = None,
        provider: Optional[str] = None,
        model: str = "flash",
        speed: float = 1.0,
        style_prompt: str = "",
        stability: float = 0.5,
        similarity_boost: float = 0.5,
    ) -> bytes:
        """
        Generate MP3 audio from audio description VTT content.
        Synthesizes each cue separately and stitches them together with timing.

        Provider priority: specified provider > settings.tts_provider > fallback chain
        Fallback chain: Gemini -> Google Cloud TTS -> ElevenLabs

        Args:
            ad_vtt_content: VTT content with audio description cues
            language_code: Language code (e.g., "en-US")
            voice_name: Voice name/ID for the provider
            provider: TTS provider ("gemini", "google", "elevenlabs")
            model: Gemini model variant - "flash" or "pro" (Gemini only)
            speed: Speech rate multiplier 0.5-2.0 (Gemini only)
            style_prompt: Style instructions (Gemini only)
            stability: ElevenLabs voice_settings "stability" (ElevenLabs only)
            similarity_boost: ElevenLabs voice_settings "similarity_boost" (ElevenLabs only)

        Returns:
            The combined MP3 as bytes.

        Raises:
            ValueError: If no TTS service is available (errors from the final
                ElevenLabs attempt also propagate to the caller).
        """
        active_provider = provider or settings.tts_provider
        simple_lang = self._simple_language(language_code)

        # Try the configured provider first, then fall back.
        if active_provider == "gemini" and self.gemini_available:
            try:
                logger.info(
                    f"Using Gemini TTS for language: {simple_lang}, voice: {voice_name}, "
                    f"model: {model}, speed: {speed}x"
                )
                return await gemini_tts_service.synthesize_audio_description(
                    ad_vtt_content,
                    simple_lang,
                    voice_name,
                    model=model,
                    speed=speed,
                    style_prompt=style_prompt,
                )
            except Exception as e:
                logger.warning(f"Gemini TTS failed, falling back: {e}")
                # Fall through to Google/ElevenLabs

        # Google is used when requested explicitly or as the Gemini fallback.
        # NOTE(review): voice_name is forwarded unchanged to fallback providers;
        # a voice from another provider's catalog will likely be rejected there.
        if active_provider in ("google", "gemini") and self.google_client:
            try:
                logger.info(f"Using Google Cloud TTS for language: {language_code}")
                return await self._synthesize_with_google(ad_vtt_content, language_code, voice_name)
            except Exception as e:
                logger.warning(f"Google Cloud TTS failed: {e}")

        if self.elevenlabs_available:
            logger.info(f"Using ElevenLabs TTS for language: {language_code}")
            return await self._synthesize_with_elevenlabs(
                ad_vtt_content, language_code, voice_name,
                stability=stability, similarity_boost=similarity_boost,
            )

        raise ValueError("No TTS service available")

    async def synthesize_audio_description_with_segments(
        self,
        ad_vtt_content: str,
        language_code: str = "en-US",
        voice_name: Optional[str] = None,
        provider: Optional[str] = None,
        model: str = "flash",
        speed: float = 1.0,
        style_prompt: str = "",
        stability: float = 0.5,
        similarity_boost: float = 0.5,
    ) -> "tuple[bytes, list[TTSCueSegment]]":
        """
        Generate MP3 audio from audio description VTT content AND return individual segments.
        Used for accessible video generation where we need per-cue audio files.

        The back end is chosen once for the whole request: an explicitly
        requested and available Gemini/ElevenLabs provider is honoured,
        otherwise Google Cloud TTS is preferred over ElevenLabs. A cue whose
        synthesis fails is replaced by silence of the cue's VTT duration and
        is omitted from the returned segment list.

        Args:
            ad_vtt_content: VTT content with audio description cues
            language_code: Language code (e.g., "en-US")
            voice_name: Voice name/ID for the provider
            provider: TTS provider ("gemini", "google", "elevenlabs")
            model: Gemini model variant (Gemini only)
            speed: Speech rate multiplier (Gemini only)
            style_prompt: Style instructions (Gemini only)
            stability: ElevenLabs voice_settings "stability" (ElevenLabs only)
            similarity_boost: ElevenLabs voice_settings "similarity_boost" (ElevenLabs only)

        Returns:
            Tuple of (combined_mp3_bytes, list_of_cue_segments)

        Raises:
            ValueError: If the VTT contains no cues or no TTS service is available.
        """
        active_provider = provider or settings.tts_provider
        simple_lang = self._simple_language(language_code)

        cues = self._parse_ad_cues(ad_vtt_content)
        if not cues:
            raise ValueError("No audio description cues found")

        # Resolve the back end up front so that "nothing is configured" is
        # reported to the caller instead of being swallowed per cue and
        # producing a silent track.
        backend = self._select_segment_backend(active_provider)
        elevenlabs_voice_id = (
            self._get_elevenlabs_voice(language_code, voice_name)
            if backend == "elevenlabs"
            else None
        )

        segments = []
        pieces = []
        current_audio_position = 0.0

        for i, cue in enumerate(cues):
            current_audio_position = self._pad_to_start(
                pieces, current_audio_position, cue["start_time"]
            )

            text = cue["text"].strip()
            if not text:
                continue

            # Ensure proper punctuation for natural TTS flow
            text = self._ensure_terminal_punctuation(text)

            try:
                if backend == "gemini":
                    audio_data = await gemini_tts_service.synthesize_text(
                        text, voice_name or gemini_tts_service.default_voice,
                        simple_lang, model=model, speed=speed, style_prompt=style_prompt
                    )
                elif backend == "google":
                    audio_data = await self._synthesize_text_google(text, language_code, voice_name)
                else:
                    audio_data = await self._synthesize_text_elevenlabs(
                        text, elevenlabs_voice_id,
                        stability=stability, similarity_boost=similarity_boost,
                    )

                # Measure the real duration; TTS output rarely matches the cue length.
                audio_segment = AudioSegment.from_file(io.BytesIO(audio_data), format="mp3")
                actual_duration = len(audio_segment) / 1000.0

                segments.append(TTSCueSegment(
                    cue_index=i,
                    start_time=cue["start_time"],
                    end_time=cue["end_time"],
                    duration=actual_duration,
                    text=cue["text"],
                    audio_bytes=audio_data,
                ))

                pieces.append(audio_segment)
                current_audio_position += actual_duration

            except Exception as e:
                # Best effort per cue: keep the timeline intact with silence.
                logger.warning(f"Failed to synthesize cue {i}: {e}")
                # Clamp so a cue with end < start cannot move the timeline backwards.
                cue_duration = max(cue["end_time"] - cue["start_time"], 0.0)
                pieces.append(AudioSegment.silent(duration=int(cue_duration * 1000)))
                current_audio_position += cue_duration

        combined = self._export_mp3(pieces)

        logger.info(f"Synthesized {len(segments)} AD cue segments")
        return combined, segments

    # ------------------------------------------------------------------
    # Provider-specific whole-track synthesis
    # ------------------------------------------------------------------

    async def _synthesize_with_google(
        self,
        ad_vtt_content: str,
        language_code: str = "en-US",
        voice_name: Optional[str] = None
    ) -> bytes:
        """Generate MP3 using Google TTS, anchoring each cue to its VTT start time.

        Raises:
            ValueError: If the VTT contains no cues.
        """
        cues = self._parse_ad_cues(ad_vtt_content)
        if not cues:
            raise ValueError("No audio description cues found")

        pieces = []
        current_audio_position = 0.0  # Track actual audio timeline position

        for cue in cues:
            current_audio_position = self._pad_to_start(
                pieces, current_audio_position, cue["start_time"]
            )

            text = cue["text"].strip()
            if not text:
                continue

            # Ensure proper punctuation for natural TTS flow
            text = self._ensure_terminal_punctuation(text)

            audio_data = await self._synthesize_text_google(text, language_code, voice_name)
            audio_segment = AudioSegment.from_file(io.BytesIO(audio_data), format="mp3")
            pieces.append(audio_segment)

            # Advance by the actual audio duration (not the VTT end time).
            current_audio_position += len(audio_segment) / 1000.0

        return self._export_mp3(pieces)

    async def _synthesize_with_elevenlabs(
        self,
        ad_vtt_content: str,
        language_code: str = "en-US",
        voice_name: Optional[str] = None,
        stability: float = 0.5,
        similarity_boost: float = 0.5,
    ) -> bytes:
        """Generate MP3 using ElevenLabs TTS, anchoring each cue to its VTT start time.

        Raises:
            ValueError: If the VTT contains no cues, or ElevenLabs rejects a request.
        """
        cues = self._parse_ad_cues(ad_vtt_content)
        if not cues:
            raise ValueError("No audio description cues found")

        voice_id = self._get_elevenlabs_voice(language_code, voice_name)

        pieces = []
        current_audio_position = 0.0  # Track actual audio timeline position

        for cue in cues:
            current_audio_position = self._pad_to_start(
                pieces, current_audio_position, cue["start_time"]
            )

            text = cue["text"].strip()
            if not text:
                continue

            audio_data = await self._synthesize_text_elevenlabs(
                text, voice_id,
                stability=stability, similarity_boost=similarity_boost,
            )
            audio_segment = AudioSegment.from_file(io.BytesIO(audio_data), format="mp3")
            pieces.append(audio_segment)

            # Advance by the actual audio duration (not the VTT end time).
            current_audio_position += len(audio_segment) / 1000.0

        return self._export_mp3(pieces)

    # ------------------------------------------------------------------
    # Provider-specific single-text synthesis
    # ------------------------------------------------------------------

    async def _synthesize_text_google(
        self,
        text: str,
        language_code: str,
        voice_name: Optional[str] = None
    ) -> bytes:
        """Synthesize a single text string to MP3 bytes using Google TTS.

        Raises:
            ValueError: If the Google Cloud TTS client could not be initialized.
        """
        # Function-scope import keeps this block self-contained.
        import asyncio

        if self.google_client is None:
            raise ValueError("Google Cloud TTS client is not configured")

        if not voice_name:
            voice_name = settings.google_tts_voices.get(language_code, "en-US-Neural2-D")

        voice = texttospeech.VoiceSelectionParams(
            language_code=language_code,
            name=voice_name
        )

        audio_config = texttospeech.AudioConfig(
            audio_encoding=texttospeech.AudioEncoding.MP3,
            speaking_rate=1.2,  # Faster cadence for better flow
            pitch=0.0
        )

        synthesis_input = texttospeech.SynthesisInput(text=text)

        # synthesize_speech is a blocking call on the synchronous client; run
        # it in a worker thread so the event loop is not stalled.
        response = await asyncio.to_thread(
            self.google_client.synthesize_speech,
            input=synthesis_input,
            voice=voice,
            audio_config=audio_config,
        )

        return response.audio_content

    async def _synthesize_text_elevenlabs(
        self,
        text: str,
        voice_id: str,
        stability: float = 0.5,
        similarity_boost: float = 0.5,
    ) -> bytes:
        """Synthesize text to MP3 bytes using the ElevenLabs HTTP API.

        Raises:
            ValueError: If the API responds with a status other than 200.
        """
        url = f"https://api.elevenlabs.io/v1/text-to-speech/{voice_id}"

        headers = {
            "Accept": "audio/mpeg",
            "Content-Type": "application/json",
            "xi-api-key": settings.elevenlabs_api_key
        }

        data = {
            "text": text,
            "model_id": "eleven_multilingual_v2",
            "voice_settings": {
                "stability": stability,
                "similarity_boost": similarity_boost,
                "style": 0.0,
                "use_speaker_boost": True
            }
        }

        async with aiohttp.ClientSession() as session:
            async with session.post(url, json=data, headers=headers) as response:
                if response.status == 200:
                    return await response.read()

                error_text = await response.text()
                raise ValueError(f"ElevenLabs TTS failed: {response.status} - {error_text}")

    def _get_elevenlabs_voice(self, language_code: str, voice_name: Optional[str] = None) -> str:
        """Return the ElevenLabs voice ID to use.

        An explicit *voice_name* is used as-is; otherwise the per-language
        mapping in settings is consulted, with a built-in default voice ID.
        """
        if voice_name:
            return voice_name

        # Tolerate the mapping being absent from settings.
        voices = getattr(settings, "elevenlabs_voices", None) or {}
        return voices.get(language_code, "21m00Tcm4TlvDq8ikWAM")

    # ------------------------------------------------------------------
    # VTT parsing
    # ------------------------------------------------------------------

    def _parse_ad_cues(self, vtt_content: str) -> list[dict]:
        """Parse audio description VTT and extract timing + text.

        Returns:
            A list of dicts with "start_time" and "end_time" (seconds) and
            "text" (the cue's lines joined with single spaces). Cues without
            any text line are dropped.

        Raises:
            ValueError: If a timing line contains a malformed timestamp.
        """
        lines = vtt_content.strip().split('\n')
        cues = []

        i = 0
        while i < len(lines):
            line = lines[i].strip()

            # Header, blank lines, NOTE lines and cue identifiers carry no timing.
            if line.startswith("NOTE") or " --> " not in line:
                i += 1
                continue

            start_raw, _, end_raw = line.partition(" --> ")
            start_time = self._parse_timestamp(start_raw.strip())
            # The end timestamp may be followed by cue settings
            # (e.g. "align:start position:0%"); only the first token is the time.
            end_fields = end_raw.split()
            end_time = self._parse_timestamp(end_fields[0] if end_fields else "")

            # Collect the payload: every line up to the next blank line.
            i += 1
            text_lines = []
            while i < len(lines) and lines[i].strip() != "":
                text_lines.append(lines[i].strip())
                i += 1

            if text_lines:
                cues.append({
                    "start_time": start_time,
                    "end_time": end_time,
                    "text": " ".join(text_lines)
                })

        return cues

    def _parse_timestamp(self, timestamp: str) -> float:
        """Convert a VTT timestamp (HH:MM:SS.mmm or MM:SS.mmm) to seconds.

        The fractional part is scaled by its number of digits, so "1.5" is
        1.5 seconds; a comma is accepted as the decimal separator.

        Raises:
            ValueError: If the timestamp does not have two or three
                colon-separated fields or a field is not numeric.
        """
        parts = timestamp.split(":")

        if len(parts) == 3:  # HH:MM:SS.mmm
            hours, minutes, seconds = parts
        elif len(parts) == 2:  # MM:SS.mmm
            hours, minutes, seconds = "0", parts[0], parts[1]
        else:
            raise ValueError(f"Invalid timestamp format: {timestamp}")

        whole, _, fraction = seconds.replace(",", ".").partition(".")

        try:
            whole_seconds = int(hours) * 3600 + int(minutes) * 60 + int(whole)
            fractional = int(fraction) / (10 ** len(fraction)) if fraction else 0
        except ValueError as err:
            raise ValueError(f"Invalid timestamp format: {timestamp}") from err

        return whole_seconds + fractional
|
|
|
|
|
|
# Global service instance, created at import time. Constructing it attempts to
# build the Google Cloud TTS client and logs the configured provider.
tts_service = TTSService()
|