video-accessibility/backend/app/services/tts.py
michael 865fcdc246 feat: add TTS settings panel with model, speed, and style options
- Add model selection (flash vs pro) for quality control
- Add speed slider (0.5x - 2.0x) for pacing adjustment
- Add style presets (neutral, calm, energetic, professional, warm, documentary)
- Add custom style prompt option for advanced customization
- New /tts/options endpoint returns available TTS options
- Voice preview now tests all settings so users hear exact output
- Backward compatible: all new fields have sensible defaults

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-22 15:22:14 -06:00

350 lines
13 KiB
Python

import asyncio
import io
from typing import Optional

import aiohttp
from google.cloud import texttospeech
from pydub import AudioSegment

from ..core.config import settings
from ..core.logging import get_logger
from .gemini_tts import gemini_tts_service
# Module-level logger, obtained from the project's logging helper using this
# module's dotted name.
logger = get_logger(__name__)
class TTSService:
    """Turn audio-description WebVTT into a single MP3 track.

    Three back ends are wrapped: Gemini TTS (delegated to
    ``gemini_tts_service``), Google Cloud TTS and ElevenLabs. One is chosen
    per request and the others serve as fallbacks.
    """

    def __init__(self):
        """Record which TTS back ends are usable with the current settings."""
        # Gemini TTS availability (uses same API key as other Gemini services)
        self.gemini_available = bool(settings.gemini_api_key)

        # Google Cloud TTS (uses GOOGLE_APPLICATION_CREDENTIALS env var)
        try:
            self.google_client = texttospeech.TextToSpeechClient()
            logger.info("Google Cloud TTS client initialized successfully")
        except Exception as e:
            # Deliberately best-effort: a failure here only disables the
            # Google back end, it must not prevent the service from loading.
            logger.warning(f"Google Cloud TTS credentials not configured: {e}")
            self.google_client = None

        # ElevenLabs availability
        self.elevenlabs_available = bool(settings.elevenlabs_api_key)

        # Log configured provider
        logger.info(f"TTS provider configured: {settings.tts_provider}")

    async def synthesize_audio_description(
        self,
        ad_vtt_content: str,
        language_code: str = "en-US",
        voice_name: Optional[str] = None,
        provider: Optional[str] = None,
        model: str = "flash",
        speed: float = 1.0,
        style_prompt: str = ""
    ) -> bytes:
        """
        Generate MP3 audio from audio description VTT content.

        Synthesizes each cue separately and stitches them together with timing.

        Provider priority: specified provider > settings.tts_provider > fallback chain
        Fallback chain: Gemini -> Google Cloud TTS -> ElevenLabs

        Args:
            ad_vtt_content: VTT content with audio description cues
            language_code: Language code (e.g., "en-US")
            voice_name: Voice name/ID for the provider
            provider: TTS provider ("gemini", "google", "elevenlabs")
            model: Gemini model variant - "flash" or "pro" (Gemini only)
            speed: Speech rate multiplier 0.5-2.0 (Gemini only)
            style_prompt: Style instructions (Gemini only)

        Returns:
            The encoded audio as bytes.

        Raises:
            ValueError: No back end was available for this request.
            Exception: Errors from the ElevenLabs path are not caught here
                because it is the last link in the chain.
        """
        active_provider = provider or settings.tts_provider

        # Gemini receives the bare language ("en-US" -> "en"). split() hands
        # back the whole string when there is no "-" to split on.
        simple_lang = language_code.split("-")[0]

        # Preferred path: Gemini. Any failure is logged and we fall through.
        if active_provider == "gemini" and self.gemini_available:
            try:
                logger.info(
                    f"Using Gemini TTS for language: {simple_lang}, voice: {voice_name}, "
                    f"model: {model}, speed: {speed}x"
                )
                return await gemini_tts_service.synthesize_audio_description(
                    ad_vtt_content,
                    simple_lang,
                    voice_name,
                    model=model,
                    speed=speed,
                    style_prompt=style_prompt
                )
            except Exception as e:
                logger.warning(f"Gemini TTS failed, falling back: {e}")

        # Google is used when requested directly or as the Gemini fallback,
        # and only if its client could be constructed.
        if active_provider in ("google", "gemini") and self.google_client:
            try:
                logger.info(f"Using Google Cloud TTS for language: {language_code}")
                return await self._synthesize_with_google(
                    ad_vtt_content, language_code, voice_name
                )
            except Exception as e:
                logger.warning(f"Google Cloud TTS failed: {e}")

        # Last resort, regardless of which provider was requested.
        if self.elevenlabs_available:
            logger.info(f"Using ElevenLabs TTS for language: {language_code}")
            return await self._synthesize_with_elevenlabs(
                ad_vtt_content, language_code, voice_name
            )

        raise ValueError("No TTS service available")

    async def _stitch_cues(self, cues: list[dict], synthesize) -> bytes:
        """Synthesize every cue and lay the clips out on one timeline.

        Args:
            cues: Cue dicts with "start_time" (seconds) and "text" keys, as
                produced by ``_parse_ad_cues``.
            synthesize: Async callable taking the cue text and returning MP3
                bytes for it.

        Returns:
            The combined track exported as 128 kbit/s MP3 bytes.
        """
        audio_segments = []
        # Timeline position in whole milliseconds (the unit pydub uses for
        # durations and len()), which avoids float drift across many cues.
        position_ms = 0

        for cue in cues:
            # Pad with silence so the clip starts at the cue's VTT start time.
            # If the previous clip already ran past it, the clip simply
            # follows immediately.
            target_ms = round(cue["start_time"] * 1000)
            if target_ms > position_ms:
                audio_segments.append(
                    AudioSegment.silent(duration=target_ms - position_ms)
                )
                position_ms = target_ms

            text = cue["text"].strip()
            if not text:
                continue

            audio_data = await synthesize(text)
            clip = AudioSegment.from_file(io.BytesIO(audio_data), format="mp3")
            audio_segments.append(clip)
            # Advance by the real clip length, not the VTT end time.
            position_ms += len(clip)

        if audio_segments:
            final_audio = sum(audio_segments, AudioSegment.empty())
        else:
            # Nothing was produced: return one second of silence instead of
            # trying to export an empty segment.
            final_audio = AudioSegment.silent(duration=1000)

        output_buffer = io.BytesIO()
        final_audio.export(output_buffer, format="mp3", bitrate="128k")
        return output_buffer.getvalue()

    async def _synthesize_with_google(
        self,
        ad_vtt_content: str,
        language_code: str = "en-US",
        voice_name: Optional[str] = None
    ) -> bytes:
        """Generate MP3 using Google TTS, each cue anchored to its VTT start time.

        Raises:
            ValueError: The VTT content contains no cues.
        """
        cues = self._parse_ad_cues(ad_vtt_content)
        if not cues:
            raise ValueError("No audio description cues found")

        async def synthesize(text: str) -> bytes:
            # Ensure proper punctuation for natural TTS flow
            if not text.endswith(('.', '!', '?')):
                text += "."
            return await self._synthesize_text_google(text, language_code, voice_name)

        return await self._stitch_cues(cues, synthesize)

    async def _synthesize_with_elevenlabs(
        self,
        ad_vtt_content: str,
        language_code: str = "en-US",
        voice_name: Optional[str] = None
    ) -> bytes:
        """Generate MP3 using ElevenLabs TTS, each cue anchored to its VTT start time.

        Raises:
            ValueError: The VTT content contains no cues, or the ElevenLabs
                API answered with a non-200 status.
        """
        cues = self._parse_ad_cues(ad_vtt_content)
        if not cues:
            raise ValueError("No audio description cues found")

        voice_id = self._get_elevenlabs_voice(language_code, voice_name)

        async def synthesize(text: str) -> bytes:
            return await self._synthesize_text_elevenlabs(text, voice_id)

        return await self._stitch_cues(cues, synthesize)

    async def _synthesize_text_google(
        self,
        text: str,
        language_code: str,
        voice_name: Optional[str] = None
    ) -> bytes:
        """Synthesize a single text string to MP3 bytes using Google TTS.

        When ``voice_name`` is not given, the voice configured for
        ``language_code`` in ``settings.google_tts_voices`` is used, with
        "en-US-Neural2-D" as the final default.
        """
        if not voice_name:
            voice_name = settings.google_tts_voices.get(language_code, "en-US-Neural2-D")

        voice = texttospeech.VoiceSelectionParams(
            language_code=language_code,
            name=voice_name
        )
        audio_config = texttospeech.AudioConfig(
            audio_encoding=texttospeech.AudioEncoding.MP3,
            speaking_rate=1.2,  # Faster cadence for better flow
            pitch=0.0
        )
        synthesis_input = texttospeech.SynthesisInput(text=text)

        # synthesize_speech on this client is a blocking call; run it in a
        # worker thread so the event loop keeps serving other tasks.
        response = await asyncio.to_thread(
            self.google_client.synthesize_speech,
            input=synthesis_input,
            voice=voice,
            audio_config=audio_config,
        )
        return response.audio_content

    async def _synthesize_text_elevenlabs(self, text: str, voice_id: str) -> bytes:
        """Synthesize text using the ElevenLabs HTTP API and return the audio bytes.

        Raises:
            ValueError: The API responded with a status other than 200; the
                message carries the status code and response body.
        """
        url = f"https://api.elevenlabs.io/v1/text-to-speech/{voice_id}"
        headers = {
            "Accept": "audio/mpeg",
            "Content-Type": "application/json",
            "xi-api-key": settings.elevenlabs_api_key
        }
        data = {
            "text": text,
            "model_id": "eleven_multilingual_v2",
            "voice_settings": {
                "stability": 0.5,
                "similarity_boost": 0.5,
                "style": 0.0,
                "use_speaker_boost": True
            }
        }

        async with aiohttp.ClientSession() as session:
            async with session.post(url, json=data, headers=headers) as response:
                if response.status != 200:
                    error_text = await response.text()
                    raise ValueError(
                        f"ElevenLabs TTS failed: {response.status} - {error_text}"
                    )
                return await response.read()

    def _get_elevenlabs_voice(self, language_code: str, voice_name: Optional[str] = None) -> str:
        """Get ElevenLabs voice ID for language.

        An explicit ``voice_name`` wins; otherwise the ID configured for
        ``language_code`` in ``settings.elevenlabs_voices`` is used, with a
        hard-coded default ID as the last resort.
        """
        if voice_name:
            return voice_name
        return settings.elevenlabs_voices.get(language_code, "21m00Tcm4TlvDq8ikWAM")

    def _parse_ad_cues(self, vtt_content: str) -> list[dict]:
        """Parse audio description VTT and extract timing + text.

        Returns:
            One dict per cue that has text, with "start_time" and "end_time"
            in seconds and "text" holding the cue lines joined by spaces.

        Raises:
            ValueError: A timing line carries a malformed timestamp.
        """
        lines = vtt_content.strip().split('\n')
        cues = []
        i = 0
        while i < len(lines):
            line = lines[i].strip()
            i += 1

            # Header, blank lines, NOTE lines and cue identifiers carry no
            # timing arrow (or are comments) and are skipped.
            if line.startswith("NOTE") or "-->" not in line:
                continue

            start_raw, _, rest = line.partition("-->")
            # Cue settings such as "align:start" may follow the end timestamp;
            # only the first token after the arrow is the timestamp itself.
            end_raw = (rest.split() or [""])[0]
            start_time = self._parse_timestamp(start_raw.strip())
            end_time = self._parse_timestamp(end_raw)

            # The cue text runs until the next blank line (or end of input).
            text_lines = []
            while i < len(lines) and lines[i].strip():
                text_lines.append(lines[i].strip())
                i += 1

            if text_lines:
                cues.append({
                    "start_time": start_time,
                    "end_time": end_time,
                    "text": " ".join(text_lines)
                })
        return cues

    def _parse_timestamp(self, timestamp: str) -> float:
        """Convert VTT timestamp to seconds.

        Accepts ``HH:MM:SS.mmm`` and ``MM:SS.mmm``; the fractional part is
        optional.

        Raises:
            ValueError: The timestamp does not have two or three
                colon-separated fields, or a field is not numeric.
        """
        parts = timestamp.strip().split(":")
        if len(parts) == 3:  # HH:MM:SS.mmm
            hours, minutes, seconds = parts
        elif len(parts) == 2:  # MM:SS.mmm
            hours, minutes, seconds = "0", parts[0], parts[1]
        else:
            raise ValueError(f"Invalid timestamp format: {timestamp}")

        whole_seconds, _, fraction = seconds.partition(".")
        # The fraction is a decimal fraction of a second, so ".5" means 500 ms
        # (not 5 ms): right-pad to three digits and ignore anything finer.
        milliseconds = int(fraction.ljust(3, "0")[:3]) if fraction else 0

        return (
            int(hours) * 3600 +
            int(minutes) * 60 +
            int(whole_seconds) +
            milliseconds / 1000.0
        )
# Global service instance, constructed once when this module is imported
# (this runs TTSService.__init__, including the Google Cloud client setup).
tts_service = TTSService()