import asyncio
import io
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional

import aiohttp
from google.cloud import texttospeech
from pydub import AudioSegment

from ..core.config import settings
from ..core.logging import get_logger
from .gemini_tts import gemini_tts_service

logger = get_logger(__name__)


@dataclass
class TTSCueSegment:
    """Represents a synthesized audio segment for a single AD cue."""

    cue_index: int
    start_time: float  # Original VTT start time
    end_time: float  # Original VTT end time
    duration: float  # Actual TTS audio duration in seconds
    text: str  # The AD text that was synthesized
    audio_bytes: bytes  # The raw MP3 audio bytes


class TTSService:
    """Turn audio-description WebVTT cues into MP3 audio.

    Three backends are supported: Gemini TTS (via ``gemini_tts_service``),
    Google Cloud Text-to-Speech and ElevenLabs. Each cue is synthesized on its
    own and the resulting clips are placed on a timeline anchored to the cue
    start times, with silence filling the gaps.
    """

    def __init__(self):
        # Check Gemini TTS availability (uses same API key as other Gemini services)
        self.gemini_available = bool(settings.gemini_api_key)

        # Initialize Google Cloud TTS (uses GOOGLE_APPLICATION_CREDENTIALS env var).
        # Construction failure is tolerated: the service simply runs without
        # the Google backend.
        try:
            self.google_client = texttospeech.TextToSpeechClient()
            logger.info("Google Cloud TTS client initialized successfully")
        except Exception as e:
            logger.warning(f"Google Cloud TTS credentials not configured: {e}")
            self.google_client = None

        # Check ElevenLabs availability
        self.elevenlabs_available = bool(settings.elevenlabs_api_key)

        # Log configured provider
        logger.info(f"TTS provider configured: {settings.tts_provider}")

    async def synthesize_audio_description(
        self,
        ad_vtt_content: str,
        language_code: str = "en-US",
        voice_name: Optional[str] = None,
        provider: Optional[str] = None,
        model: str = "flash",
        speed: float = 1.0,
        style_prompt: str = "",
        stability: float = 0.5,
        similarity_boost: float = 0.5,
    ) -> bytes:
        """
        Generate MP3 audio from audio description VTT content.

        Synthesizes each cue separately and stitches them together with timing.

        The active provider is ``provider`` if given, otherwise
        ``settings.tts_provider``. Fallback on failure/unavailability:

        * "gemini": Gemini -> Google Cloud TTS -> ElevenLabs
        * "google": Google Cloud TTS -> ElevenLabs
        * anything else: ElevenLabs only

        Args:
            ad_vtt_content: VTT content with audio description cues
            language_code: Language code (e.g., "en-US")
            voice_name: Voice name/ID for the provider
            provider: TTS provider ("gemini", "google", "elevenlabs")
            model: Gemini model variant - "flash" or "pro" (Gemini only)
            speed: Speech rate multiplier 0.5-2.0 (Gemini only)
            style_prompt: Style instructions (Gemini only)
            stability: ElevenLabs voice stability setting (ElevenLabs only)
            similarity_boost: ElevenLabs similarity boost setting (ElevenLabs only)

        Returns:
            The combined MP3 audio as bytes.

        Raises:
            ValueError: If no backend is available. Errors raised by the
                ElevenLabs path (the last resort) propagate unchanged.
        """
        # Determine which provider to use
        active_provider = provider or settings.tts_provider

        # Extract simple language code for Gemini (e.g., "en-US" -> "en")
        simple_lang = language_code.split("-")[0]

        # Try the configured provider first, then fallback
        if active_provider == "gemini" and self.gemini_available:
            try:
                logger.info(
                    f"Using Gemini TTS for language: {simple_lang}, voice: {voice_name}, "
                    f"model: {model}, speed: {speed}x"
                )
                return await gemini_tts_service.synthesize_audio_description(
                    ad_vtt_content,
                    simple_lang,
                    voice_name,
                    model=model,
                    speed=speed,
                    style_prompt=style_prompt,
                )
            except Exception as e:
                # Fall through to Google/ElevenLabs
                logger.warning(f"Gemini TTS failed, falling back: {e}")

        # Google is used when requested directly or as the Gemini fallback.
        if active_provider in ("google", "gemini") and self.google_client:
            try:
                logger.info(f"Using Google Cloud TTS for language: {language_code}")
                return await self._synthesize_with_google(
                    ad_vtt_content, language_code, voice_name
                )
            except Exception as e:
                logger.warning(f"Google Cloud TTS failed: {e}")

        if self.elevenlabs_available:
            logger.info(f"Using ElevenLabs TTS for language: {language_code}")
            return await self._synthesize_with_elevenlabs(
                ad_vtt_content,
                language_code,
                voice_name,
                stability=stability,
                similarity_boost=similarity_boost,
            )

        raise ValueError("No TTS service available")

    async def synthesize_audio_description_with_segments(
        self,
        ad_vtt_content: str,
        language_code: str = "en-US",
        voice_name: Optional[str] = None,
        provider: Optional[str] = None,
        model: str = "flash",
        speed: float = 1.0,
        style_prompt: str = "",
        stability: float = 0.5,
        similarity_boost: float = 0.5,
    ) -> tuple[bytes, list[TTSCueSegment]]:
        """
        Generate MP3 audio from audio description VTT content AND return individual segments.

        Used for accessible video generation where we need per-cue audio files.

        One backend is chosen up front for all cues (see
        ``_select_cue_backend``). A cue whose synthesis or decoding fails is
        logged, replaced by silence of the cue's VTT duration in the combined
        track, and omitted from the returned segment list.

        Args:
            ad_vtt_content: VTT content with audio description cues
            language_code: Language code (e.g., "en-US")
            voice_name: Voice name/ID for the provider
            provider: TTS provider ("gemini", "google", "elevenlabs")
            model: Gemini model variant (Gemini only)
            speed: Speech rate multiplier (Gemini only)
            style_prompt: Style instructions (Gemini only)
            stability: ElevenLabs voice stability setting (ElevenLabs only)
            similarity_boost: ElevenLabs similarity boost setting (ElevenLabs only)

        Returns:
            Tuple of (combined_mp3_bytes, list_of_cue_segments)

        Raises:
            ValueError: If the VTT content contains no cues or a timing line
                cannot be parsed.
        """
        # Determine which provider to use
        active_provider = provider or settings.tts_provider

        # Extract simple language code for Gemini (e.g., "en-US" -> "en")
        simple_lang = language_code.split("-")[0]

        # Parse VTT cues first
        cues = self._parse_ad_cues(ad_vtt_content)
        if not cues:
            raise ValueError("No audio description cues found")

        backend = self._select_cue_backend(active_provider)

        segments: list[TTSCueSegment] = []
        timeline: list[AudioSegment] = []
        current_audio_position = 0.0

        for i, cue in enumerate(cues):
            # Add silence to reach the exact VTT start time
            current_audio_position = self._pad_with_silence(
                timeline, current_audio_position, cue["start_time"]
            )

            text = cue["text"].strip()
            if not text:
                continue

            # Ensure proper punctuation for natural TTS flow
            text = self._ensure_terminal_punctuation(text)

            try:
                if backend == "gemini":
                    audio_data = await gemini_tts_service.synthesize_text(
                        text,
                        voice_name or gemini_tts_service.default_voice,
                        simple_lang,
                        model=model,
                        speed=speed,
                        style_prompt=style_prompt,
                    )
                elif backend == "google":
                    audio_data = await self._synthesize_text_google(
                        text, language_code, voice_name
                    )
                elif backend == "elevenlabs":
                    voice_id = self._get_elevenlabs_voice(language_code, voice_name)
                    audio_data = await self._synthesize_text_elevenlabs(
                        text,
                        voice_id,
                        stability=stability,
                        similarity_boost=similarity_boost,
                    )
                else:
                    # Handled by the except below like any other per-cue
                    # failure, so the caller still receives a (silent) track.
                    raise ValueError("No TTS service available")

                # Get actual duration from audio
                audio_segment = AudioSegment.from_file(io.BytesIO(audio_data), format="mp3")
                actual_duration = len(audio_segment) / 1000.0

                # Store segment info (text is the original cue text, without
                # the punctuation added for synthesis)
                segments.append(TTSCueSegment(
                    cue_index=i,
                    start_time=cue["start_time"],
                    end_time=cue["end_time"],
                    duration=actual_duration,
                    text=cue["text"],
                    audio_bytes=audio_data,
                ))

                # Add to combined audio
                timeline.append(audio_segment)
                current_audio_position += actual_duration

            except Exception as e:
                logger.warning(f"Failed to synthesize cue {i}: {e}")
                # Add silence for failed cue. Clamp at zero so a cue whose end
                # precedes its start cannot rewind the timeline position.
                cue_duration = max(0.0, cue["end_time"] - cue["start_time"])
                timeline.append(AudioSegment.silent(duration=int(cue_duration * 1000)))
                current_audio_position += cue_duration

        combined = self._export_mp3(timeline)
        logger.info(f"Synthesized {len(segments)} AD cue segments")
        return combined, segments

    def _select_cue_backend(self, active_provider: Optional[str]) -> Optional[str]:
        """Pick the backend used for per-cue synthesis.

        An explicitly requested and available "gemini" or "elevenlabs"
        provider wins; otherwise Google is preferred over ElevenLabs.

        Returns:
            "gemini", "google", "elevenlabs", or None when nothing is available.
        """
        if active_provider == "gemini" and self.gemini_available:
            return "gemini"
        # Honor an explicit ElevenLabs request even when a Google client
        # exists; otherwise an ElevenLabs voice ID would be sent to Google.
        if active_provider == "elevenlabs" and self.elevenlabs_available:
            return "elevenlabs"
        if self.google_client:
            return "google"
        if self.elevenlabs_available:
            return "elevenlabs"
        return None

    @staticmethod
    def _ensure_terminal_punctuation(text: str) -> str:
        """Append a period unless the text already ends with '.', '!' or '?'."""
        if text.endswith(('.', '!', '?')):
            return text
        return text + "."

    @staticmethod
    def _pad_with_silence(
        timeline: list, current_position: float, target_start_time: float
    ) -> float:
        """Append silence so the timeline reaches ``target_start_time``.

        Nothing is appended when the timeline is already at or past the target
        (earlier clips ran long). Returns the new timeline position in seconds.
        """
        if target_start_time > current_position:
            silence_duration = target_start_time - current_position
            timeline.append(AudioSegment.silent(duration=int(silence_duration * 1000)))
            return target_start_time
        return current_position

    @staticmethod
    def _export_mp3(timeline: list) -> bytes:
        """Concatenate the clips and encode them as 128k MP3 bytes.

        An empty timeline is exported as one second of silence.
        """
        if timeline:
            final_audio = sum(timeline, AudioSegment.empty())
        else:
            final_audio = AudioSegment.silent(duration=1000)

        output_buffer = io.BytesIO()
        final_audio.export(output_buffer, format="mp3", bitrate="128k")
        return output_buffer.getvalue()

    async def _stitch_cues(
        self,
        cues: list[dict],
        synthesize: Callable[[str], Awaitable[bytes]],
    ) -> bytes:
        """Synthesize every cue with ``synthesize`` and return the combined MP3.

        Each clip is anchored to its cue's VTT start time; the timeline
        position advances by the clip's actual decoded duration, not by the
        VTT end time. Errors from ``synthesize`` propagate to the caller.
        """
        timeline: list[AudioSegment] = []
        current_audio_position = 0.0  # Track actual audio timeline position

        for cue in cues:
            current_audio_position = self._pad_with_silence(
                timeline, current_audio_position, cue["start_time"]
            )

            text = cue["text"].strip()
            if not text:
                continue

            audio_data = await synthesize(text)
            audio_segment = AudioSegment.from_file(io.BytesIO(audio_data), format="mp3")
            timeline.append(audio_segment)
            current_audio_position += len(audio_segment) / 1000.0  # ms -> seconds

        return self._export_mp3(timeline)

    async def _synthesize_with_google(
        self,
        ad_vtt_content: str,
        language_code: str = "en-US",
        voice_name: Optional[str] = None
    ) -> bytes:
        """Generate MP3 using Google TTS, anchoring each cue to its VTT start time.

        Raises:
            ValueError: If the VTT content contains no cues.
        """
        cues = self._parse_ad_cues(ad_vtt_content)
        if not cues:
            raise ValueError("No audio description cues found")

        async def synthesize(text: str) -> bytes:
            # Ensure proper punctuation for natural TTS flow
            return await self._synthesize_text_google(
                self._ensure_terminal_punctuation(text), language_code, voice_name
            )

        return await self._stitch_cues(cues, synthesize)

    async def _synthesize_with_elevenlabs(
        self,
        ad_vtt_content: str,
        language_code: str = "en-US",
        voice_name: Optional[str] = None,
        stability: float = 0.5,
        similarity_boost: float = 0.5,
    ) -> bytes:
        """Generate MP3 using ElevenLabs TTS, anchoring each cue to its VTT start time.

        Raises:
            ValueError: If the VTT content contains no cues, or the ElevenLabs
                API returns a non-200 response.
        """
        cues = self._parse_ad_cues(ad_vtt_content)
        if not cues:
            raise ValueError("No audio description cues found")

        # Get voice ID for language
        voice_id = self._get_elevenlabs_voice(language_code, voice_name)

        async def synthesize(text: str) -> bytes:
            # Cue text is sent as-is on this path (no punctuation is added).
            return await self._synthesize_text_elevenlabs(
                text,
                voice_id,
                stability=stability,
                similarity_boost=similarity_boost,
            )

        return await self._stitch_cues(cues, synthesize)

    async def _synthesize_text_google(
        self,
        text: str,
        language_code: str,
        voice_name: Optional[str] = None
    ) -> bytes:
        """Synthesize a single text string to MP3 bytes using Google TTS.

        Raises:
            ValueError: If the Google Cloud TTS client was not initialized.
        """
        if self.google_client is None:
            raise ValueError("Google Cloud TTS client is not configured")

        # Configure voice
        if not voice_name:
            voice_name = settings.google_tts_voices.get(language_code, "en-US-Neural2-D")

        voice = texttospeech.VoiceSelectionParams(
            language_code=language_code,
            name=voice_name
        )

        # Configure audio
        audio_config = texttospeech.AudioConfig(
            audio_encoding=texttospeech.AudioEncoding.MP3,
            speaking_rate=1.2,  # Faster cadence for better flow
            pitch=0.0
        )

        synthesis_input = texttospeech.SynthesisInput(text=text)

        # synthesize_speech is a blocking call; run it in a worker thread so
        # the event loop stays responsive while waiting on the network.
        response = await asyncio.to_thread(
            self.google_client.synthesize_speech,
            input=synthesis_input,
            voice=voice,
            audio_config=audio_config,
        )

        return response.audio_content

    async def _synthesize_text_elevenlabs(
        self,
        text: str,
        voice_id: str,
        stability: float = 0.5,
        similarity_boost: float = 0.5,
    ) -> bytes:
        """Synthesize text to MP3 bytes using the ElevenLabs HTTP API.

        Raises:
            ValueError: If the API responds with a status other than 200; the
                message includes the status code and response body.
        """
        url = f"https://api.elevenlabs.io/v1/text-to-speech/{voice_id}"

        headers = {
            "Accept": "audio/mpeg",
            "Content-Type": "application/json",
            "xi-api-key": settings.elevenlabs_api_key
        }

        data = {
            "text": text,
            "model_id": "eleven_multilingual_v2",
            "voice_settings": {
                "stability": stability,
                "similarity_boost": similarity_boost,
                "style": 0.0,
                "use_speaker_boost": True
            }
        }

        async with aiohttp.ClientSession() as session:
            async with session.post(url, json=data, headers=headers) as response:
                if response.status != 200:
                    error_text = await response.text()
                    raise ValueError(
                        f"ElevenLabs TTS failed: {response.status} - {error_text}"
                    )
                return await response.read()

    def _get_elevenlabs_voice(self, language_code: str, voice_name: Optional[str] = None) -> str:
        """Return ``voice_name`` if given, else the configured voice ID for the language."""
        if voice_name:
            return voice_name

        return settings.elevenlabs_voices.get(language_code, "21m00Tcm4TlvDq8ikWAM")

    def _parse_ad_cues(self, vtt_content: str) -> list[dict]:
        """Parse audio description VTT and extract timing + text.

        Returns:
            One dict per cue with keys "start_time" and "end_time" (seconds)
            and "text" (payload lines joined with single spaces). Cues without
            any payload text are dropped.

        Raises:
            ValueError: If a timing line contains an unparsable timestamp.
        """
        lines = vtt_content.strip().split('\n')
        cues = []
        i = 0

        while i < len(lines):
            line = lines[i].strip()

            # Skip everything that is not a timing line: the header, blank
            # lines, NOTE lines, cue identifiers and stray text.
            if line.startswith("NOTE") or " --> " not in line:
                i += 1
                continue

            start_text, _, remainder = line.partition(" --> ")
            # Cue settings (e.g. "align:start position:0%") may follow the end
            # timestamp; only the first token is the timestamp itself.
            end_tokens = remainder.split()
            start_time = self._parse_timestamp(start_text.strip())
            end_time = self._parse_timestamp(end_tokens[0] if end_tokens else "")

            # Get text from next line(s), up to the next blank line
            i += 1
            text_lines = []
            while i < len(lines) and lines[i].strip() != "":
                text_lines.append(lines[i].strip())
                i += 1

            if text_lines:
                cues.append({
                    "start_time": start_time,
                    "end_time": end_time,
                    "text": " ".join(text_lines)
                })

        return cues

    def _parse_timestamp(self, timestamp: str) -> float:
        """Convert VTT timestamp to seconds.

        Accepts ``HH:MM:SS.mmm`` and ``MM:SS.mmm``. The fractional part is
        read as a decimal fraction of any length ("1.5" is 1.5 seconds), and
        a comma is accepted as the decimal separator.

        Raises:
            ValueError: If the timestamp does not have two or three
                colon-separated fields, or a field is not an integer.
        """
        parts = timestamp.split(":")

        if len(parts) == 3:
            # HH:MM:SS.mmm
            hours, minutes, seconds = parts
        elif len(parts) == 2:
            # MM:SS.mmm
            hours, minutes, seconds = "0", parts[0], parts[1]
        else:
            raise ValueError(f"Invalid timestamp format: {timestamp}")

        whole, _, fraction = seconds.replace(",", ".").partition(".")
        # Scale by the number of digits so "5" means 0.5 and "500" means 0.500.
        fractional_seconds = int(fraction) / (10 ** len(fraction)) if fraction else 0.0

        return int(hours) * 3600 + int(minutes) * 60 + int(whole) + fractional_seconds


# Global service instance
tts_service = TTSService()