# video-accessibility/backend/app/services/tts.py
# 2025-10-08 18:47:28 -05:00
#
# 303 lines
# 11 KiB
# Python

import asyncio
import io
from collections.abc import Awaitable, Callable
from typing import Optional

import aiohttp
from google.cloud import texttospeech
from pydub import AudioSegment

from ..core.config import settings
from ..core.logging import get_logger

logger = get_logger(__name__)
class TTSService:
    """Render audio-description WebVTT cues to a single MP3 track.

    Google Cloud TTS is the primary provider. ElevenLabs is used when the
    Google client could not be created, or as a fallback when a Google
    synthesis attempt raises and an ElevenLabs API key is configured.
    """

    # Fallback voices used when ``settings`` has no entry for the language.
    _DEFAULT_GOOGLE_VOICE = "en-US-Neural2-D"
    _DEFAULT_ELEVENLABS_VOICE = "21m00Tcm4TlvDq8ikWAM"

    # Slightly faster than normal speech so descriptions fit between cues.
    _GOOGLE_SPEAKING_RATE = 1.2

    # A cue that already ends with one of these is sent to Google unchanged.
    _SENTENCE_ENDINGS = (".", "!", "?")

    def __init__(self):
        """Create the Google client (if possible) and detect ElevenLabs."""
        # Initialize Google TTS (uses GOOGLE_APPLICATION_CREDENTIALS env var)
        # The same GCP credentials used for GCS also work for TTS
        try:
            self.google_client = texttospeech.TextToSpeechClient()
            logger.info("Google TTS client initialized successfully")
        except Exception as e:
            # Best effort: a missing credential must not prevent import of
            # this module, so the service just runs without Google.
            logger.warning(f"Google TTS credentials not configured: {e}")
            self.google_client = None

        # ElevenLabs is considered available whenever an API key is set.
        self.elevenlabs_available = bool(settings.elevenlabs_api_key)

    async def synthesize_audio_description(
        self,
        ad_vtt_content: str,
        language_code: str = "en-US",
        voice_name: Optional[str] = None
    ) -> bytes:
        """Generate MP3 audio from audio-description VTT content.

        Each cue is synthesized separately and the clips are stitched onto a
        timeline anchored to the cue start times.

        Args:
            ad_vtt_content: WebVTT document containing the description cues.
            language_code: Language tag used to pick the voice.
            voice_name: Explicit voice name / ID; overrides the configured one.

        Returns:
            The encoded MP3 bytes.

        Raises:
            ValueError: If no TTS provider is configured, or the VTT content
                has no usable cues.
            Exception: Whatever the last attempted provider raised.
        """
        if self.google_client:
            try:
                return await self._synthesize_with_google(
                    ad_vtt_content, language_code, voice_name
                )
            except Exception as e:
                # Only swallow the Google failure when there is a fallback.
                if not self.elevenlabs_available:
                    raise
                logger.warning(f"Google TTS failed, trying ElevenLabs: {e}")
                return await self._synthesize_with_elevenlabs(
                    ad_vtt_content, language_code, voice_name
                )

        if self.elevenlabs_available:
            return await self._synthesize_with_elevenlabs(
                ad_vtt_content, language_code, voice_name
            )

        raise ValueError("No TTS service configured")

    async def _synthesize_with_google(
        self,
        ad_vtt_content: str,
        language_code: str = "en-US",
        voice_name: Optional[str] = None
    ) -> bytes:
        """Generate MP3 using Google TTS, anchoring cues to their VTT start.

        Raises:
            ValueError: If the VTT content contains no cues.
        """
        cues = self._parse_ad_cues(ad_vtt_content)
        if not cues:
            raise ValueError("No audio description cues found")

        async def synthesize_cue(text: str) -> bytes:
            # Ensure terminal punctuation for natural TTS flow.
            if not text.endswith(self._SENTENCE_ENDINGS):
                text += "."
            return await self._synthesize_text_google(
                text, language_code, voice_name
            )

        return await self._render_cues(cues, synthesize_cue)

    async def _synthesize_with_elevenlabs(
        self,
        ad_vtt_content: str,
        language_code: str = "en-US",
        voice_name: Optional[str] = None
    ) -> bytes:
        """Generate MP3 using ElevenLabs, anchoring cues to their VTT start.

        Raises:
            ValueError: If the VTT content contains no cues, or an ElevenLabs
                request returns a non-200 status.
        """
        cues = self._parse_ad_cues(ad_vtt_content)
        if not cues:
            raise ValueError("No audio description cues found")

        voice_id = self._get_elevenlabs_voice(language_code, voice_name)

        async def synthesize_cue(text: str) -> bytes:
            return await self._synthesize_text_elevenlabs(text, voice_id)

        return await self._render_cues(cues, synthesize_cue)

    async def _render_cues(
        self,
        cues: list[dict],
        synthesize_cue: Callable[[str], Awaitable[bytes]],
    ) -> bytes:
        """Stitch per-cue speech onto one timeline and encode it as MP3.

        Args:
            cues: Parsed cues with ``start_time`` (seconds) and ``text``.
            synthesize_cue: Coroutine function returning MP3 bytes for a text.

        Returns:
            The encoded MP3 bytes (128k bitrate).
        """
        timeline = AudioSegment.empty()

        for cue in cues:
            # Pad with silence up to the cue's start. The gap is measured
            # against the real timeline length in whole milliseconds, so
            # rounding error cannot accumulate from cue to cue. When earlier
            # speech overran this start time there is no gap and the cue is
            # appended immediately (it starts late, never overlapping).
            gap_ms = round(cue["start_time"] * 1000) - len(timeline)
            if gap_ms > 0:
                timeline += AudioSegment.silent(duration=gap_ms)

            text = cue["text"].strip()
            if not text:
                continue

            audio_data = await synthesize_cue(text)
            timeline += AudioSegment.from_file(
                io.BytesIO(audio_data), format="mp3"
            )

        output_buffer = io.BytesIO()
        timeline.export(output_buffer, format="mp3", bitrate="128k")
        return output_buffer.getvalue()

    async def _synthesize_text_google(
        self,
        text: str,
        language_code: str,
        voice_name: Optional[str] = None
    ) -> bytes:
        """Synthesize a single text string to MP3 bytes using Google TTS."""
        if not voice_name:
            voice_name = settings.google_tts_voices.get(
                language_code, self._DEFAULT_GOOGLE_VOICE
            )

        voice = texttospeech.VoiceSelectionParams(
            language_code=language_code,
            name=voice_name
        )
        audio_config = texttospeech.AudioConfig(
            audio_encoding=texttospeech.AudioEncoding.MP3,
            speaking_rate=self._GOOGLE_SPEAKING_RATE,
            pitch=0.0
        )
        synthesis_input = texttospeech.SynthesisInput(text=text)

        # The Google client call is synchronous; run it in a worker thread so
        # the event loop is not blocked while the request is in flight.
        response = await asyncio.to_thread(
            self.google_client.synthesize_speech,
            input=synthesis_input,
            voice=voice,
            audio_config=audio_config,
        )
        return response.audio_content

    async def _synthesize_text_elevenlabs(self, text: str, voice_id: str) -> bytes:
        """Synthesize text to MP3 bytes using the ElevenLabs HTTP API.

        Raises:
            ValueError: If the API responds with a status other than 200.
        """
        url = f"https://api.elevenlabs.io/v1/text-to-speech/{voice_id}"
        headers = {
            "Accept": "audio/mpeg",
            "Content-Type": "application/json",
            "xi-api-key": settings.elevenlabs_api_key
        }
        data = {
            "text": text,
            "model_id": "eleven_multilingual_v2",
            "voice_settings": {
                "stability": 0.5,
                "similarity_boost": 0.5,
                "style": 0.0,
                "use_speaker_boost": True
            }
        }

        async with aiohttp.ClientSession() as session:
            async with session.post(url, json=data, headers=headers) as response:
                if response.status != 200:
                    error_text = await response.text()
                    raise ValueError(
                        f"ElevenLabs TTS failed: {response.status} - {error_text}"
                    )
                return await response.read()

    def _get_elevenlabs_voice(self, language_code: str, voice_name: Optional[str] = None) -> str:
        """Return the ElevenLabs voice ID: the explicit one, else by language."""
        if voice_name:
            return voice_name
        return settings.elevenlabs_voices.get(
            language_code, self._DEFAULT_ELEVENLABS_VOICE
        )

    def _parse_ad_cues(self, vtt_content: str) -> list[dict]:
        """Parse audio-description WebVTT and extract timing + text per cue.

        Header, blank, ``NOTE`` and cue-identifier lines are skipped. Cue
        settings after the end timestamp (e.g. ``align:start``) are ignored,
        and any whitespace around ``-->`` is accepted.

        Returns:
            A list of dicts with ``start_time`` / ``end_time`` in seconds and
            ``text`` (the cue's lines joined with single spaces). Cues without
            any text are omitted.

        Raises:
            ValueError: If a timing line holds a malformed timestamp.
        """
        # Accept LF, CRLF and bare CR line endings.
        normalized = vtt_content.replace("\r\n", "\n").replace("\r", "\n")
        lines = [raw.strip() for raw in normalized.strip().split("\n")]

        cues = []
        total = len(lines)
        i = 0
        while i < total:
            line = lines[i]
            i += 1

            # Only timing lines open a cue; everything else is skipped.
            if line.startswith("NOTE") or "-->" not in line:
                continue

            start_raw, _, end_raw = line.partition("-->")
            # Anything after the end timestamp is cue settings, not time.
            end_fields = end_raw.split()
            start_time = self._parse_timestamp(start_raw.strip())
            end_time = self._parse_timestamp(end_fields[0] if end_fields else "")

            # The cue payload runs until the next blank line.
            text_lines = []
            while i < total and lines[i]:
                text_lines.append(lines[i])
                i += 1

            if text_lines:
                cues.append({
                    "start_time": start_time,
                    "end_time": end_time,
                    "text": " ".join(text_lines)
                })

        return cues

    def _parse_timestamp(self, timestamp: str) -> float:
        """Convert a VTT timestamp (``HH:MM:SS.mmm`` or ``MM:SS.mmm``) to seconds.

        The fractional part is read as a decimal fraction, so ``"00:01.5"``
        is 1.5 seconds. A comma is accepted as the decimal separator.

        Raises:
            ValueError: If the timestamp does not match either format.
        """
        parts = timestamp.strip().replace(",", ".").split(":")
        if len(parts) == 3:  # HH:MM:SS.mmm
            hours, minutes, seconds = parts
        elif len(parts) == 2:  # MM:SS.mmm
            hours, minutes, seconds = "0", parts[0], parts[1]
        else:
            raise ValueError(f"Invalid timestamp format: {timestamp}")

        whole, _, fraction = seconds.partition(".")
        if fraction and not fraction.isdigit():
            raise ValueError(f"Invalid timestamp format: {timestamp}")

        try:
            total_seconds = int(hours) * 3600 + int(minutes) * 60 + int(whole)
            if fraction:
                # Scale by the digit count: "5" -> 0.5, "500" -> 0.5.
                total_seconds += int(fraction) / 10 ** len(fraction)
        except ValueError as exc:
            raise ValueError(f"Invalid timestamp format: {timestamp}") from exc

        return float(total_seconds)
# Global service instance, created once when this module is imported.
# Constructing it runs TTSService.__init__, which attempts to create the
# Google TTS client and reads the ElevenLabs API key from settings.
tts_service = TTSService()