vtbp-video-translate/vtbp/cli.py
2025-09-02 11:43:56 -05:00

1153 lines
No EOL
54 KiB
Python

#!/usr/bin/env python3
import os
import sys
import json
import tempfile
import shutil
from pathlib import Path
from typing import Optional, Dict, Any, List
import typer
from rich.console import Console
from rich.progress import Progress, TaskID
from rich.table import Table
from rich import print as rprint
# Import our modules
from .io.ffmpeg import FFmpegIO, FFmpegError
from .separate.demucs import DemucsSeperator, DemucsError
from .asr.whisper import WhisperASR, WhisperError
from .asr.gemini_asr import GeminiASR, GeminiASRError
from .mt.opus_mt import OpusMTTranslator, TranslationError
from .mt.gemini_translation import GeminiTranslator, GeminiTranslationError
from .tts.piper import PiperTTS, PiperError
from .tts.google_tts import GoogleTTS, GoogleTTSError
from .align.stretch import AudioStretcher, StretchError
from .mix.mix_ffmpeg import AudioMixer, MixError
from .utils.timestamps import SegmentProcessor, TimestampError
from .config.api_config import APIManager
from .vad.silero_vad import SileroVAD, SileroVADError
from .video.segment_extractor import VideoSegmentExtractor, VideoSegmentError
# Typer application object; CLI commands in this module register on it
# through the @app.command() decorator.
app = typer.Typer(
    name="vtbp",
    help="Voice Translate Bed Preserve - Translate video voice while preserving music/SFX",
    no_args_is_help=True,
)

# Module-wide Rich console used for all status and error output.
console = Console()
class VTBPError(Exception):
    """Application-level error raised when a pipeline stage fails."""
class VTBPPipeline:
    """Main pipeline for voice translation with bed preservation.

    The pipeline runs eight sequential stages (extract, separate, transcribe,
    translate, synthesize, align, mix, remux).  Every stage reads its inputs
    from ``self.pipeline_state`` and writes its outputs back into it, so the
    stages only work when executed in order.
    """

    # Gemini model name used for every Gemini-backed ASR / translation call.
    GEMINI_MODEL = "gemini-2.5-pro"

    # (config key, environment variable, parser, default) for the VAD settings.
    _VAD_ENV_SETTINGS = (
        ('threshold', 'VAD_THRESHOLD', float, '0.5'),
        ('min_speech_ms', 'VAD_MIN_SPEECH_MS', int, '350'),
        ('min_silence_ms', 'VAD_MIN_SILENCE_MS', int, '180'),
        ('pad_pre_ms', 'VAD_PAD_PRE_MS', int, '50'),
        ('pad_post_ms', 'VAD_PAD_POST_MS', int, '80'),
    )

    def __init__(self, work_dir: str = "work", keep_temp: bool = False):
        """Initialize pipeline.

        Args:
            work_dir: Directory that receives all intermediate files. It is
                created (including missing parents) if it does not exist.
            keep_temp: When False, ``work_dir`` is removed after a successful
                run of ``run_translation_pipeline``.
        """
        self.work_dir = Path(work_dir)
        self.keep_temp = keep_temp
        self.console = Console()

        # Components are created lazily by the stage that needs them; only
        # the FFmpeg wrapper is required from the very first stage.
        self.ffmpeg_io = FFmpegIO()
        self.separator = None
        self.asr = None
        self.translator = None
        self.tts = None
        self.stretcher = None
        self.mixer = None
        self.segment_processor = None

        # VAD and video segmentation
        self.vad = None
        self.video_extractor = None

        # API management
        self.api_manager = APIManager()

        # parents=True so that a nested --work-dir such as "tmp/run1/work"
        # does not fail with FileNotFoundError.
        self.work_dir.mkdir(parents=True, exist_ok=True)

        # Artifacts produced by each stage, filled in as the pipeline runs.
        self.pipeline_state = {
            'input_video': None,
            'extracted_audio': None,
            'voice_audio': None,
            'bed_audio': None,
            'transcript': None,
            'translated_segments': None,
            'synthesized_segments': None,
            'aligned_segments': None,
            'final_voice': None,
            'final_mixed': None,
            'output_video': None,
            # True when the ASR call already returned translated text.
            'translation_consolidated': False,
        }

    def run_translation_pipeline(self, input_path: str, output_path: str,
                                 src_lang: str = "auto", tgt_lang: str = "es",
                                 voice_path: Optional[str] = None,
                                 **kwargs) -> Dict[str, Any]:
        """
        Run the complete voice translation pipeline.

        Args:
            input_path: Input video file path
            output_path: Output video file path
            src_lang: Source language code
            tgt_lang: Target language code
            voice_path: Path to TTS voice model
            **kwargs: Additional pipeline parameters

        Returns:
            Dictionary with pipeline results and statistics

        Raises:
            VTBPError: If any stage fails; the original exception is chained
                as ``__cause__``.
        """
        # The transcription and synthesis stages read the target language
        # from kwargs['target_language'] (with different fallbacks). Seed it
        # from tgt_lang so every stage agrees unless the caller overrides it.
        kwargs.setdefault('target_language', tgt_lang)
        # A reused pipeline object must not inherit the flag from a previous
        # run that used a consolidated ASR+translation provider.
        self.pipeline_state['translation_consolidated'] = False

        try:
            with Progress() as progress:
                main_task = progress.add_task("Overall Progress", total=8)
                stages = (
                    ("Extracting audio...",
                     lambda: self._extract_audio(input_path, progress, main_task)),
                    ("Separating audio...",
                     lambda: self._separate_audio(kwargs, progress, main_task)),
                    ("Transcribing speech...",
                     lambda: self._transcribe_voice(src_lang, kwargs, progress, main_task)),
                    ("Translating text...",
                     lambda: self._translate_transcript(src_lang, tgt_lang, kwargs,
                                                        progress, main_task)),
                    ("Synthesizing speech...",
                     lambda: self._synthesize_speech(voice_path, kwargs, progress, main_task)),
                    ("Aligning timing...",
                     lambda: self._align_audio(kwargs, progress, main_task)),
                    ("Mixing audio...",
                     lambda: self._mix_audio(kwargs, progress, main_task)),
                    ("Creating final video...",
                     lambda: self._create_final_video(input_path, output_path, kwargs,
                                                      progress, main_task)),
                )
                for description, run_stage in stages:
                    progress.update(main_task, description=description)
                    run_stage()
                    progress.advance(main_task)

            results = self._generate_results_summary()

            # Intermediate files are only removed after a successful run, so
            # a failed run leaves them in place.
            if not self.keep_temp:
                self._cleanup_temp_files()
            return results
        except Exception as e:
            # markup=False: exception text (e.g. ffmpeg output) often contains
            # square brackets that Rich would otherwise parse as markup tags.
            console.print(f"Pipeline failed: {e}", style="red", markup=False)
            raise VTBPError(f"Translation pipeline failed: {e}") from e

    def _report_failure(self, headline: str, error: Exception) -> None:
        """Print ``headline``, the error and the active traceback to the console.

        Must be called from inside an ``except`` block so that
        ``traceback.format_exc()`` has an exception to format.
        """
        import traceback
        console.print(f"{headline}: {error}", style="red", markup=False)
        console.print("[yellow]Full traceback:[/yellow]")
        # Source lines such as ``buf[start:end]`` would be eaten by Rich's
        # markup parser, so print the traceback verbatim.
        console.print(traceback.format_exc(), markup=False)

    def _extract_audio(self, input_path: str, progress: Progress, task: TaskID):
        """Extract audio from input video.

        Writes ``input_audio.wav`` (48 kHz, 2 channels) into the work directory
        and records both the input video and the extracted file in the
        pipeline state.

        Raises:
            VTBPError: If FFmpeg probing or extraction fails.
        """
        try:
            self.pipeline_state['input_video'] = input_path

            audio_info = self.ffmpeg_io.get_audio_info(input_path)
            console.print(f"Input audio: {audio_info['sample_rate']}Hz, "
                          f"{audio_info['channels']} channels")

            audio_path = self.work_dir / "input_audio.wav"
            self.ffmpeg_io.extract_audio(input_path, str(audio_path),
                                         sample_rate=48000, channels=2)
            self.pipeline_state['extracted_audio'] = str(audio_path)
            console.print(f"Audio extracted: {audio_path}")
        except FFmpegError as e:
            raise VTBPError(f"Audio extraction failed: {e}") from e

    def _separate_audio(self, kwargs: Dict, progress: Progress, task: TaskID):
        """Separate audio into voice and bed.

        Reads ``sep_model`` and ``device`` from ``kwargs`` and stores the
        resulting voice / bed paths in the pipeline state.

        Raises:
            VTBPError: If the separation model fails.
        """
        try:
            self.separator = DemucsSeperator(
                model_name=kwargs.get('sep_model', 'htdemucs'),
                device=kwargs.get('device', None),
            )
            voice_path, bed_path = self.separator.separate_voice_and_bed(
                self.pipeline_state['extracted_audio'],
                str(self.work_dir)
            )
            self.pipeline_state['voice_audio'] = voice_path
            self.pipeline_state['bed_audio'] = bed_path
            console.print(f"Voice separated: {voice_path}")
            console.print(f"Bed separated: {bed_path}")
        except DemucsError as e:
            raise VTBPError(f"Audio separation failed: {e}") from e

    def _transcribe_voice(self, src_lang: str, kwargs: Dict, progress: Progress, task: TaskID):
        """Transcribe voice audio.

        Four paths are supported, selected from ``kwargs``:

        * ``vad_mode`` with Gemini ASR: Silero VAD finds speech passages, the
          matching video segments are cut and sent to Gemini in a batch
          (result is already translated).
        * Gemini ASR + Gemini translation: one consolidated call on the
          original video (result is already translated).
        * Gemini ASR only: transcription of the original video.
        * Otherwise: local Faster-Whisper on the separated voice track.

        The transcript is then regrouped into segments, stored in the pipeline
        state and saved as ``transcript.json`` in the work directory.

        Raises:
            VTBPError: If speech recognition, VAD, segment extraction or
                segment processing fails.
        """
        try:
            asr_provider = kwargs.get('asr_provider', 'whisper')
            translation_provider = kwargs.get('translation_provider', 'opus')
            target_language = kwargs.get('target_language', 'es')
            vad_mode = kwargs.get('vad_mode', False)
            # "auto" means: let the ASR backend detect the language.
            language = None if src_lang == "auto" else src_lang

            if asr_provider == 'gemini' and vad_mode:
                # VAD-BASED WORKFLOW: Use Silero VAD + video segments + batch Gemini
                console.print("🎤 VAD Mode: Using waveform-based speech detection")
                self.vad = SileroVAD()
                vad_config = self._load_vad_config()

                # Detect speech passages using voice-only audio
                speech_segments = self.vad.detect_speech_passages(
                    self.pipeline_state['voice_audio'],
                    threshold=vad_config['threshold'],
                    min_speech_ms=vad_config['min_speech_ms'],
                    min_silence_ms=vad_config['min_silence_ms'],
                    pad_pre_ms=vad_config['pad_pre_ms'],
                    pad_post_ms=vad_config['pad_post_ms']
                )

                self.video_extractor = VideoSegmentExtractor()
                segments_dir = self.work_dir / "video_segments"
                video_segments = self.video_extractor.extract_video_segments(
                    self.pipeline_state['input_video'],
                    speech_segments,
                    str(segments_dir)
                )

                self.asr = GeminiASR(model=self.GEMINI_MODEL)
                transcript = self.asr.transcribe_video_segments_batch(
                    video_segments,
                    target_language=target_language,
                    source_language=language
                )
                # Mark as already translated (VAD mode includes translation)
                self.pipeline_state['translation_consolidated'] = True
                self.pipeline_state['vad_segments'] = speech_segments
                self.pipeline_state['video_segments'] = video_segments
            elif asr_provider == 'gemini' and translation_provider == 'gemini':
                # CONSOLIDATED: transcription AND translation in one call on
                # the original video file.
                self.asr = GeminiASR(model=self.GEMINI_MODEL)
                transcript = self.asr.transcribe_and_translate_video(
                    self.pipeline_state['input_video'],
                    target_language=target_language,
                    source_language=language
                )
                self.pipeline_state['translation_consolidated'] = True
            elif asr_provider == 'gemini':
                # Gemini for ASR only, fed with the video file directly.
                self.asr = GeminiASR(model=self.GEMINI_MODEL)
                transcript = self.asr.transcribe_video_file(
                    self.pipeline_state['input_video'], language=language
                )
            else:
                if vad_mode:
                    # VAD mode is only implemented for the Gemini ASR path;
                    # say so instead of ignoring the flag silently.
                    console.print("[yellow]Warning: VAD mode requires the gemini ASR "
                                  "provider; continuing without VAD[/yellow]")
                # Use local Faster-Whisper with word timestamps
                self.asr = WhisperASR(
                    model_size=kwargs.get('asr_model', 'large-v2'),
                    device=kwargs.get('device', None),
                )
                transcript = self.asr.transcribe_with_timestamps(
                    self.pipeline_state['voice_audio'],
                    language=language,
                    word_timestamps=True
                )

            self.pipeline_state['transcript'] = transcript

            # Regroup words into segments suited for synthesis.
            self.segment_processor = SegmentProcessor(
                min_segment_duration=kwargs.get('min_segment_duration', 1.5),
                max_segment_duration=kwargs.get('max_segment_duration', 4.0)
            )
            segments = self.segment_processor.group_words_into_segments(transcript)
            segments = self.segment_processor.adjust_segment_timing(segments)
            segments = self.segment_processor.merge_short_segments(segments)
            self.pipeline_state['processed_segments'] = segments

            # .get: a missing 'language' key must not abort the run just
            # because of a log line.
            console.print(f"Transcribed: {len(segments)} segments, "
                          f"language: {transcript.get('language', 'unknown')}")

            transcript_path = self.work_dir / "transcript.json"
            with open(transcript_path, 'w', encoding='utf-8') as f:
                json.dump(transcript, f, ensure_ascii=False, indent=2)
        except (WhisperError, GeminiASRError, SileroVADError, VideoSegmentError) as e:
            self._report_failure("Speech recognition/VAD failed", e)
            raise VTBPError(f"Speech recognition failed: {e}") from e
        except TimestampError as e:
            self._report_failure("Segment processing failed", e)
            raise VTBPError(f"Segment processing failed: {e}") from e

    def _translate_transcript(self, src_lang: str, tgt_lang: str,
                              kwargs: Dict, progress: Progress, task: TaskID):
        """Translate transcript to target language.

        When the ASR stage already produced translated text
        (``translation_consolidated``), the processed segments are reused and
        only normalized; otherwise Gemini or local OPUS-MT translates them.
        The result is stored under ``translated_segments`` and saved as
        ``translation.json`` in the work directory.

        Raises:
            VTBPError: If the translation backend fails.
        """
        try:
            consolidated = self.pipeline_state.get('translation_consolidated', False)

            if consolidated:
                console.print("Translation already completed in consolidated ASR+Translation call")
                segments = self.pipeline_state['processed_segments']
                for segment in segments:
                    if not segment.get('translated_text'):
                        text = segment.get('text', '')
                        segment['translated_text'] = text
                        # setdefault: never overwrite a real source text that
                        # the ASR backend may already have provided.
                        segment.setdefault('original_text', text)
                self.pipeline_state['translated_segments'] = segments
                console.print(f"Using consolidated translation for {len(segments)} segments")
            else:
                translation_provider = kwargs.get('translation_provider', 'opus')
                # Determine source language from ASR if auto
                if src_lang == "auto":
                    src_lang = self.pipeline_state['transcript']['language']

                if translation_provider == 'gemini':
                    self.translator = GeminiTranslator(src_lang, tgt_lang,
                                                       model=self.GEMINI_MODEL)
                else:
                    # Use local OPUS-MT
                    self.translator = OpusMTTranslator(
                        src_lang, tgt_lang, device=kwargs.get('device', None)
                    )
                self.pipeline_state['translated_segments'] = \
                    self.translator.translate_segments(
                        self.pipeline_state['processed_segments']
                    )

            final_segments = self.pipeline_state['translated_segments']

            # Determine languages for logging
            if consolidated:
                transcript = self.pipeline_state['transcript']
                source_lang = transcript.get('source_language', src_lang)
                # In consolidated mode 'language' holds the target language.
                target_lang = transcript.get('language', tgt_lang)
            else:
                source_lang, target_lang = src_lang, tgt_lang
            console.print(f"Translation completed: {len(final_segments)} segments "
                          f"from {source_lang} to {target_lang}")

            translation_path = self.work_dir / "translation.json"
            if consolidated:
                # Save consolidated transcript+translation
                with open(translation_path, 'w', encoding='utf-8') as f:
                    json.dump(self.pipeline_state['transcript'], f,
                              ensure_ascii=False, indent=2)
            elif self.translator is not None:
                # hasattr() was always true because __init__ sets the
                # attribute to None; test for an actual translator instead.
                self.translator.save_translation(final_segments, str(translation_path))
        except (TranslationError, GeminiTranslationError) as e:
            raise VTBPError(f"Translation failed: {e}") from e

    def _synthesize_speech(self, voice_path: Optional[str], kwargs: Dict,
                           progress: Progress, task: TaskID):
        """Synthesize translated speech.

        Uses Google TTS when ``tts_provider`` is ``'google'`` (timing-aware
        synthesis if time stretching is enabled), otherwise local Piper.
        Audio files go to ``<work_dir>/tts`` and the segment list is stored
        under ``synthesized_segments``.

        Raises:
            VTBPError: If the TTS backend fails.
        """
        try:
            tts_provider = kwargs.get('tts_provider', 'piper')
            sample_rate = kwargs.get('sample_rate', 48000)

            tts_dir = self.work_dir / "tts"
            tts_dir.mkdir(exist_ok=True)
            segments = self.pipeline_state['translated_segments']

            if tts_provider == 'google':
                self.tts = GoogleTTS()
                tgt_lang = kwargs.get('target_language', 'en-US')
                # An explicit --voice wins; otherwise ask the backend for the
                # best voice of the base language ("es" from "es-MX").
                voice_name = voice_path or self.tts.get_best_voice(tgt_lang.split('-')[0])

                if kwargs.get('enable_time_stretch', True):
                    # Two-pass synthesis with automatic speed adjustment
                    synthesized_segments = self.tts.synthesize_segments_with_timing(
                        segments, str(tts_dir),
                        voice_name=voice_name,
                        language_code=tgt_lang,
                        sample_rate=sample_rate
                    )
                    # Flag the segments so the alignment stage skips librosa.
                    for seg in synthesized_segments:
                        seg['pre_timed'] = True
                        seg['tts_provider'] = 'google_timed'
                else:
                    # Use normal synthesis for no-stretch mode
                    synthesized_segments = self.tts.synthesize_segments(
                        segments, str(tts_dir),
                        voice_name=voice_name,
                        language_code=tgt_lang,
                        sample_rate=sample_rate
                    )
            else:
                # Use local Piper TTS
                self.tts = PiperTTS(voice_path=voice_path)
                synthesized_segments = self.tts.synthesize_segments(
                    segments, str(tts_dir), sample_rate=sample_rate
                )

            self.pipeline_state['synthesized_segments'] = synthesized_segments
            successful = sum(1 for seg in synthesized_segments if seg.get('synthesized', False))
            console.print(f"Synthesized {successful}/{len(synthesized_segments)} segments")
        except (PiperError, GoogleTTSError) as e:
            raise VTBPError(f"Speech synthesis failed: {e}") from e

    @staticmethod
    def _as_aligned(segments: List[Dict[str, Any]], **extra: Any) -> List[Dict[str, Any]]:
        """Return copies of ``segments`` marked as aligned without stretching.

        The synthesized ``audio_path`` is reused as ``aligned_audio_path``;
        ``extra`` key/value pairs are added to every copy.
        """
        return [
            {
                **segment,
                'aligned_audio_path': segment.get('audio_path'),
                'alignment_success': True,
                **extra,
            }
            for segment in segments
        ]

    def _align_audio(self, kwargs: Dict, progress: Progress, task: TaskID):
        """Time-align synthesized audio segments.

        Produces ``voice_translated.wav`` in the work directory and stores its
        path under ``final_voice``. Depending on the options the segments are
        placed with natural TTS timing, reused as pre-timed Google TTS output,
        or stretched through ``AudioStretcher``.

        Raises:
            VTBPError: If stretching or placement fails.
        """
        try:
            sample_rate = kwargs.get('sample_rate', 48000)
            segments = self.pipeline_state['synthesized_segments']
            final_voice_path = self.work_dir / "voice_translated.wav"

            if not kwargs.get('enable_time_stretch', True):
                console.print("Time stretching disabled - using natural TTS timing")
                self._concatenate_natural_timing(segments, str(final_voice_path), sample_rate)
                aligned_segments = self._as_aligned(segments, time_stretch_applied=False)
                self.pipeline_state['aligned_segments'] = aligned_segments
                console.print(f"Natural timing preserved for {len(aligned_segments)} segments")
            elif any(seg.get('pre_timed', False) for seg in segments):
                # Google TTS already adjusted the speaking rate.
                console.print("Using pre-timed Google TTS segments (no stretching required)")
                aligned_segments = self._as_aligned(
                    segments, time_stretch_method='google_tts_speed'
                )
                self.pipeline_state['aligned_segments'] = aligned_segments
                self._place_stretched_segments_at_timestamps(
                    aligned_segments, str(final_voice_path), sample_rate
                )
                console.print(f"Pre-timed segments positioned: {len(aligned_segments)} segments")
            else:
                # Traditional librosa stretching for non-Google TTS
                self.stretcher = AudioStretcher(sample_rate=sample_rate)
                align_dir = self.work_dir / "aligned"
                align_dir.mkdir(exist_ok=True)
                tts_dir = self.work_dir / "tts"
                aligned_segments = self.stretcher.align_segments(
                    segments, str(tts_dir), str(align_dir)
                )
                self.pipeline_state['aligned_segments'] = aligned_segments
                self._place_stretched_segments_at_timestamps(
                    aligned_segments, str(final_voice_path), sample_rate
                )
                successful = sum(1 for seg in aligned_segments
                                 if seg.get('alignment_success', False))
                console.print(f"Librosa-aligned and positioned "
                              f"{successful}/{len(aligned_segments)} segments")

            self.pipeline_state['final_voice'] = str(final_voice_path)
        except StretchError as e:
            raise VTBPError(f"Audio alignment failed: {e}") from e

    def _allocate_timeline(self, segments: List[Dict[str, Any]], sample_rate: int):
        """Create a silent mono buffer long enough for the video and all segments.

        Returns:
            Tuple ``(buffer, video_duration, max_segment_end, max_end_time)``
            where ``buffer`` is a float32 array covering ``max_end_time``
            seconds plus one extra second.
        """
        import numpy as np
        video_duration = self.ffmpeg_io.get_duration(self.pipeline_state['input_video'])
        # "or 0" tolerates segments whose 'end' is present but None.
        max_segment_end = max((seg.get('end') or 0) for seg in segments)
        # Use the longer of the two durations so no audio is cut off.
        max_end_time = max(video_duration, max_segment_end)
        total_samples = int(max_end_time * sample_rate) + sample_rate  # 1 second buffer
        return (np.zeros(total_samples, dtype=np.float32),
                video_duration, max_segment_end, max_end_time)

    @staticmethod
    def _load_mono_clip(audio_path: str, sample_rate: int):
        """Load an audio file as a mono array resampled to ``sample_rate``."""
        import numpy as np
        import soundfile as sf
        audio, source_rate = sf.read(audio_path)
        if audio.ndim > 1:
            # Down-mix multi-channel audio by averaging the channels.
            audio = np.mean(audio, axis=1)
        if source_rate != sample_rate:
            import scipy.signal
            audio = scipy.signal.resample(
                audio, int(len(audio) * sample_rate / source_rate)
            )
        return audio

    @staticmethod
    def _place_clip(timeline, clip, start_sample: int) -> Optional[int]:
        """Copy ``clip`` into ``timeline`` starting at ``start_sample``.

        A negative start is clamped to 0 and the clip is truncated at the end
        of the timeline. Existing samples in the target range are overwritten.

        Returns:
            The exclusive end index that was written, or None when the start
            lies at or beyond the end of the timeline (nothing written).
        """
        start_sample = max(start_sample, 0)
        if start_sample >= len(timeline):
            return None
        end_sample = min(start_sample + len(clip), len(timeline))
        timeline[start_sample:end_sample] = clip[:end_sample - start_sample]
        return end_sample

    @staticmethod
    def _write_stereo_track(mono_audio, output_path: str, sample_rate: int) -> None:
        """Write ``mono_audio`` as a two-channel 16-bit PCM file."""
        import numpy as np
        import soundfile as sf
        stereo = np.column_stack([mono_audio, mono_audio])
        sf.write(output_path, stereo, sample_rate, subtype='PCM_16')

    @staticmethod
    def _resolve_overlaps(segments: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Return segments sorted by start with overlapping starts pushed back.

        A segment that starts before the previous segment's ``end`` is copied
        and its ``start`` moved to 100 ms after that end. The input dicts are
        not modified. Segments whose ``start`` is None are passed through
        unchanged (the caller skips them).
        """
        ordered = sorted(segments, key=lambda seg: seg.get('start') or 0)
        fixed_segments = []
        prev_end = None
        for index, segment in enumerate(ordered):
            current_start = segment.get('start', 0)
            if current_start is None:
                fixed_segments.append(segment)
                continue
            if prev_end is not None and current_start < prev_end:
                console.print(f"⚠️ Overlap detected: Segment {index} starts at {current_start:.2f}s, "
                              f"previous ends at {prev_end:.2f}s")
                segment = segment.copy()
                segment['start'] = prev_end + 0.1  # 100ms gap
                console.print(f" → Adjusted to start at {segment['start']:.2f}s")
            fixed_segments.append(segment)
            prev_end = segment.get('end') or 0
        return fixed_segments

    def _concatenate_natural_timing(self, segments: List[Dict[str, Any]],
                                    output_path: str, sample_rate: int):
        """
        Place TTS segments at their correct timestamp positions (timing-aware placement).

        Despite the name, segments are not appended back to back: each one is
        written at its own ``start`` time into a silent track, after
        overlapping start times have been pushed back.

        Args:
            segments: List of synthesized segments with start/end timestamps
            output_path: Output audio file path
            sample_rate: Sample rate for output

        Raises:
            VTBPError: If no segments are given, none could be placed, or
                reading/writing audio fails.
        """
        try:
            if not segments:
                raise VTBPError("No segments provided for timing placement")

            final_audio, video_duration, max_segment_end, max_end_time = \
                self._allocate_timeline(segments, sample_rate)
            console.print("Creating timed audio track:")
            console.print(f" Original video duration: {video_duration:.2f}s")
            console.print(f" Max segment end time: {max_segment_end:.2f}s")
            console.print(f" Final track duration: {max_end_time:.2f}s")

            fixed_segments = self._resolve_overlaps(segments)
            console.print(f"Processing {len(fixed_segments)} segments (fixed overlaps)")

            placed_segments = 0
            for segment in fixed_segments:
                audio_path = segment.get('audio_path')
                start_time = segment.get('start', 0)
                end_time = segment.get('end') or 0
                if not audio_path or start_time is None or not os.path.exists(audio_path):
                    continue
                try:
                    clip = self._load_mono_clip(audio_path, sample_rate)
                    end_sample = self._place_clip(
                        final_audio, clip, int(start_time * sample_rate)
                    )
                    if end_sample is None:
                        console.print(f"Warning: Segment at {start_time:.2f}s starts "
                                      f"beyond the end of the track; skipped")
                        continue
                    placed_segments += 1

                    # Debug timing info
                    actual_end_time = end_sample / sample_rate
                    tts_duration = len(clip) / sample_rate
                    # Separate name: must not shadow the video duration.
                    slot_duration = end_time - start_time
                    console.print(f"✓ Segment {segment.get('id', '?')}: "
                                  f"{start_time:.2f}s-{actual_end_time:.2f}s")
                    console.print(f" TTS duration: {tts_duration:.2f}s, "
                                  f"Original: {slot_duration:.2f}s")
                    if abs(tts_duration - slot_duration) > 1.0:
                        console.print(f" ⚠️ Large timing difference: "
                                      f"{tts_duration-slot_duration:+.2f}s")
                except Exception as e:
                    # Best effort: one broken clip must not abort the track.
                    console.print(f"Warning: Failed to place segment at {start_time:.2f}s: {e}",
                                  markup=False)

            if placed_segments == 0:
                raise VTBPError("No audio segments could be placed")

            self._write_stereo_track(final_audio, output_path, sample_rate)
            duration = len(final_audio) / sample_rate
            console.print(f"Timing-aware audio created: {output_path}")
            console.print(f"Duration: {duration:.2f}s, "
                          f"Placed segments: {placed_segments}/{len(segments)}")
        except Exception as e:
            raise VTBPError(f"Timing-aware audio placement failed: {e}") from e

    def _place_stretched_segments_at_timestamps(self, aligned_segments: List[Dict[str, Any]],
                                                output_path: str, sample_rate: int):
        """
        Place time-stretched segments at their correct timestamp positions.

        Args:
            aligned_segments: List of time-aligned segments with stretched audio
            output_path: Output audio file path
            sample_rate: Sample rate for output

        Raises:
            VTBPError: If no segments are given, none could be placed, or
                reading/writing audio fails.
        """
        try:
            if not aligned_segments:
                raise VTBPError("No aligned segments provided for placement")

            final_audio, _video_duration, _max_segment_end, max_end_time = \
                self._allocate_timeline(aligned_segments, sample_rate)
            console.print(f"🎵 Placing {len(aligned_segments)} stretched segments "
                          f"at original timestamps:")
            console.print(f" Total timeline duration: {max_end_time:.2f}s")

            placed_segments = 0
            for segment in aligned_segments:
                # Use the aligned (stretched) audio path
                audio_path = segment.get('aligned_audio_path')
                start_time = segment.get('start', 0)
                end_time = segment.get('end') or 0
                segment_id = segment.get('id', '?')

                if not audio_path or start_time is None or not os.path.exists(audio_path):
                    console.print(f"⚠️ Segment {segment_id}: No aligned audio or invalid timing")
                    continue
                try:
                    clip = self._load_mono_clip(audio_path, sample_rate)
                    # Placement uses the ORIGINAL timestamps.
                    end_sample = self._place_clip(
                        final_audio, clip, int(start_time * sample_rate)
                    )
                    if end_sample is None:
                        console.print(f"⚠️ Segment {segment_id}: starts beyond the end "
                                      f"of the track; skipped")
                        continue
                    placed_segments += 1

                    # Debug timing info
                    actual_end_time = end_sample / sample_rate
                    stretched_duration = len(clip) / sample_rate
                    target_duration = end_time - start_time
                    console.print(f"✅ Segment {segment_id}: Placed at "
                                  f"{start_time:.2f}s-{actual_end_time:.2f}s")
                    console.print(f" Stretched: {stretched_duration:.2f}s, "
                                  f"Target: {target_duration:.2f}s")
                except Exception as e:
                    # Best effort: one broken clip must not abort the track.
                    console.print(f"❌ Failed to place segment {segment_id} "
                                  f"at {start_time:.2f}s: {e}", markup=False)

            if placed_segments == 0:
                raise VTBPError("No stretched segments could be placed")

            self._write_stereo_track(final_audio, output_path, sample_rate)
            duration = len(final_audio) / sample_rate
            console.print(f"🎯 Stretched segments placed at timestamps: {output_path}")
            console.print(f"Duration: {duration:.2f}s, "
                          f"Placed segments: {placed_segments}/{len(aligned_segments)}")
        except Exception as e:
            raise VTBPError(f"Stretched segment placement failed: {e}") from e

    def _load_vad_config(self) -> Dict[str, Any]:
        """Load VAD configuration from environment variables.

        Returns:
            Dict with ``threshold`` (float) and ``min_speech_ms``,
            ``min_silence_ms``, ``pad_pre_ms``, ``pad_post_ms`` (int), each
            read from its ``VAD_*`` variable or the built-in default.

        Raises:
            VTBPError: If a variable is set to a value that cannot be parsed;
                the message names the offending variable.
        """
        config: Dict[str, Any] = {}
        for key, env_name, parse, default in self._VAD_ENV_SETTINGS:
            raw_value = os.getenv(env_name, default)
            try:
                config[key] = parse(raw_value)
            except ValueError as e:
                raise VTBPError(f"Invalid value for {env_name}: {raw_value!r}") from e
        return config

    def _mix_audio(self, kwargs: Dict, progress: Progress, task: TaskID):
        """Mix translated voice with bed audio.

        Gains, ducking and loudness options are read from ``kwargs``. The mix
        is written to ``final_audio.wav`` in the work directory; its path and
        the mixer's info dict are stored in the pipeline state.

        Raises:
            VTBPError: If the mixer fails.
        """
        try:
            self.mixer = AudioMixer(sample_rate=kwargs.get('sample_rate', 48000))
            enable_loudness = kwargs.get('enable_loudness_norm', True)

            final_audio_path = self.work_dir / "final_audio.wav"
            mix_info = self.mixer.create_final_mix(
                self.pipeline_state['final_voice'],
                self.pipeline_state['bed_audio'],
                str(final_audio_path),
                voice_gain=kwargs.get('voice_gain', 0.0),
                bed_gain=kwargs.get('bed_gain', -3.0),
                enable_ducking=kwargs.get('enable_ducking', True),
                duck_threshold=kwargs.get('duck_threshold', 0.08),
                duck_ratio=kwargs.get('duck_ratio', 6.0),
                duck_attack=kwargs.get('duck_attack', 5.0),
                duck_release=kwargs.get('duck_release', 250.0),
                enable_loudness_norm=enable_loudness,
                lufs_target=kwargs.get('lufs_target', -16.0)
            )
            self.pipeline_state['final_mixed'] = str(final_audio_path)
            self.pipeline_state['mix_info'] = mix_info
            console.print(f"Final mix created: {final_audio_path}")

            if enable_loudness and 'loudness_measurements' in mix_info:
                measurements = mix_info['loudness_measurements']
                console.print(f"Loudness: {measurements.get('output_i', 'N/A')} LUFS")
        except MixError as e:
            raise VTBPError(f"Audio mixing failed: {e}") from e

    def _create_final_video(self, input_path: str, output_path: str, kwargs: Dict,
                            progress: Progress, task: TaskID):
        """Create final video with translated audio.

        Remuxes ``input_path`` with the final mix into ``output_path``;
        ``kwargs['copy_video']`` (default True) is forwarded to the remuxer.

        Raises:
            VTBPError: If FFmpeg fails.
        """
        try:
            self.ffmpeg_io.remux_video_with_audio(
                input_path,
                self.pipeline_state['final_mixed'],
                output_path,
                copy_video=kwargs.get('copy_video', True)
            )
            self.pipeline_state['output_video'] = output_path
            console.print(f"Final video created: {output_path}")
        except FFmpegError as e:
            raise VTBPError(f"Video creation failed: {e}") from e

    @staticmethod
    def _success_rate(items, is_success) -> float:
        """Return the fraction of ``items`` accepted by ``is_success`` (0 if empty)."""
        if not items:
            return 0
        return sum(1 for item in items if is_success(item)) / len(items)

    def _generate_results_summary(self) -> Dict[str, Any]:
        """Generate pipeline results summary.

        Safe to call at any point: state entries that are still None are
        treated as empty.

        Returns:
            Dict with input/output paths, detected language, segment count,
            per-stage success rates, mix settings and work-directory info.
        """
        state = self.pipeline_state
        # "or" instead of a .get() default: __init__ stores None under these
        # keys, so a .get() default would never be used.
        transcript = state.get('transcript') or {}
        translated_segments = state.get('translated_segments') or []
        aligned_segments = state.get('aligned_segments') or []
        # The 'synthesized' flag is set by the TTS stage, so measure it on
        # that stage's output; fall back to the aligned copies if absent.
        synthesized_segments = state.get('synthesized_segments') or aligned_segments
        mix_info = state.get('mix_info') or {}

        return {
            'input_video': state.get('input_video'),
            'output_video': state.get('output_video'),
            'transcript_language': transcript.get('language'),
            'transcript_confidence': transcript.get('language_probability'),
            'total_segments': len(translated_segments),
            'translation_success_rate': self._success_rate(
                translated_segments, lambda seg: not seg.get('translation_error')),
            'synthesis_success_rate': self._success_rate(
                synthesized_segments, lambda seg: seg.get('synthesized', False)),
            'alignment_success_rate': self._success_rate(
                aligned_segments, lambda seg: seg.get('alignment_success', False)),
            'mix_settings': mix_info.get('settings', {}),
            'loudness_measurements': mix_info.get('loudness_measurements', {}),
            'work_directory': str(self.work_dir),
            'temp_files_kept': self.keep_temp
        }

    def _cleanup_temp_files(self):
        """Clean up temporary files.

        Removes the whole work directory. Failures are reported as a warning
        and never raised, so cleanup cannot fail an otherwise successful run.
        """
        try:
            if self.work_dir.exists():
                shutil.rmtree(self.work_dir)
                console.print(f"Cleaned up temporary files: {self.work_dir}")
        except Exception as e:
            console.print(f"[yellow]Warning: Failed to clean up temp files: {e}[/yellow]")
@app.command()
def translate(
    input_path: str = typer.Argument(..., help="Input video file path"),
    output_path: str = typer.Argument(..., help="Output video file path"),
    src_lang: str = typer.Option("auto", "--src-lang", help="Source language code"),
    tgt_lang: str = typer.Option("es", "--tgt-lang", help="Target language code"),
    # Provider selection (NEW API OPTIONS)
    asr_provider: str = typer.Option("whisper", "--asr-provider", help="ASR provider (whisper/gemini)"),
    tts_provider: str = typer.Option("piper", "--tts-provider", help="TTS provider (piper/google)"),
    translation_provider: str = typer.Option("opus", "--translation-provider", help="Translation provider (opus/gemini)"),
    # VAD mode (NEW)
    vad_mode: bool = typer.Option(False, "--vad-mode", help="Use waveform-based VAD for precise speech detection"),
    # Voice and model options
    voice_path: Optional[str] = typer.Option(None, "--voice", help="TTS voice (Piper .onnx path or Google voice name)"),
    sep_model: str = typer.Option("htdemucs", "--sep", help="Audio separation model"),
    asr_model: str = typer.Option("large-v2", "--asr", help="ASR model size (for local whisper)"),
    # System options
    device: Optional[str] = typer.Option(None, "--device", help="Processing device (cpu/cuda/mps/auto)"),
    work_dir: str = typer.Option("work", "--work-dir", help="Working directory for temporary files"),
    keep_temp: bool = typer.Option(False, "--keep-temp", help="Keep temporary files"),
    # Audio processing options
    lufs_target: float = typer.Option(-16.0, "--lufs", help="Target LUFS for loudness normalization"),
    duck_threshold: float = typer.Option(0.08, "--duck-threshold", help="Ducking threshold (0.0-1.0)"),
    duck_ratio: float = typer.Option(6.0, "--duck-ratio", help="Ducking compression ratio"),
    duck_attack: float = typer.Option(5.0, "--duck-attack", help="Ducking attack time (ms)"),
    duck_release: float = typer.Option(250.0, "--duck-release", help="Ducking release time (ms)"),
    voice_gain: float = typer.Option(0.0, "--voice-gain", help="Voice gain adjustment (dB)"),
    bed_gain: float = typer.Option(-3.0, "--bed-gain", help="Bed gain adjustment (dB)"),
    disable_ducking: bool = typer.Option(False, "--no-duck", help="Disable sidechain ducking"),
    disable_loudness: bool = typer.Option(False, "--no-loudnorm", help="Disable loudness normalization"),
    disable_time_stretch: bool = typer.Option(False, "--no-stretch", help="Disable time stretching (use natural TTS timing)"),
    sample_rate: int = typer.Option(48000, "--sample-rate", help="Audio sample rate"),
    # API options
    estimate_cost: bool = typer.Option(False, "--estimate-cost", help="Show cost estimation for API providers"),
    validate_apis: bool = typer.Option(False, "--validate-apis", help="Validate API credentials before processing"),
):
    """
    Translate video voice while preserving background music and sound effects.

    Examples:

    # API mode (recommended for Mac)
    vtbp translate input.mp4 output.mp4 --asr-provider gemini --tts-provider google

    # Local mode
    vtbp translate input.mp4 output.mp4 --voice spanish_voice.onnx

    # Hybrid mode
    vtbp translate input.mp4 output.mp4 --asr-provider gemini --tts-provider piper
    """
    # NOTE: the docstring above is what Typer shows as the command's --help
    # text, so developer-facing notes live in comments instead.
    #
    # Flow: validate paths -> check API credentials -> optional cost estimate
    # with confirmation -> print configuration -> run VTBPPipeline -> print
    # the results summary.
    #
    # Exits: typer.Exit(1) on missing credentials, pipeline errors, Ctrl-C or
    # unexpected errors; typer.Exit(0) when the user declines the cost
    # confirmation; typer.BadParameter for a missing input file or voice model.
    try:
        # Validate inputs
        if not os.path.exists(input_path):
            raise typer.BadParameter(f"Input file does not exist: {input_path}")
        # A Piper voice is a local model file, so it must exist on disk;
        # for other providers --voice is passed through unchecked.
        if voice_path and tts_provider == 'piper' and not os.path.exists(voice_path):
            raise typer.BadParameter(f"Voice model does not exist: {voice_path}")

        # Create output directory if needed
        output_dir = os.path.dirname(output_path)
        if output_dir:  # Only create directory if path has a directory component
            os.makedirs(output_dir, exist_ok=True)

        # Initialize API manager
        api_manager = APIManager()
        providers = {
            'asr': asr_provider,
            'tts': tts_provider,
            'translation': translation_provider
        }

        # Validate API credentials when asked to, or when any API-backed
        # provider is selected.
        if validate_apis or any(p in ['gemini', 'google'] for p in providers.values()):
            # Return value is unused; the call is kept in case it has side
            # effects. get_missing_credentials() below drives the decision.
            api_manager.validate_api_keys(providers)
            missing_creds = api_manager.get_missing_credentials(providers)
            if missing_creds:
                console.print("[red]❌ Missing API credentials:[/red]")
                for provider_type, message in missing_creds.items():
                    console.print(f"{provider_type}: {message}")
                console.print(f"\n{api_manager.setup_environment_guide()}")
                raise typer.Exit(1)
            console.print("[green]✅ API credentials validated[/green]")

        # Cost estimation
        if estimate_cost:
            # Estimation itself is best-effort: a failure only warns and the
            # run continues without asking for confirmation.
            try:
                ffmpeg_io = FFmpegIO()
                duration = ffmpeg_io.get_duration(input_path)
                text_length = int(duration * 20)  # ~20 chars per second estimate
                cost_estimate = api_manager.estimate_api_costs(providers, duration, text_length)
                console.print("\n[bold yellow]💰 Cost Estimation:[/bold yellow]")
                total_cost = 0
                for provider_type, cost_info in cost_estimate.items():
                    if provider_type != 'total':
                        cost = cost_info.get('cost_usd', 0)
                        provider_name = cost_info.get('provider', 'N/A')
                        console.print(f"{provider_type.title()}: ${cost:.4f} ({provider_name})")
                        total_cost += cost
                console.print(f" • [bold]Total: ${total_cost:.4f}[/bold]")
                console.print(f" Note: {cost_estimate['total']['note']}")
            except Exception as e:
                console.print(f"[yellow]Warning: Could not estimate costs: {e}[/yellow]")
            else:
                # The confirmation must sit outside the try above: typer.Exit
                # and typer.Abort derive from RuntimeError, so `except
                # Exception` would swallow them and the translation would run
                # even though the user declined.
                if not typer.confirm("Continue with translation?"):
                    raise typer.Exit(0)

        # Show configuration
        console.print("\n[bold blue]VTBP - Voice Translation Pipeline[/bold blue]")
        config_table = Table(title="Configuration")
        config_table.add_column("Parameter", style="cyan")
        config_table.add_column("Value", style="white")
        config_table.add_row("Input Video", input_path)
        config_table.add_row("Output Video", output_path)
        config_table.add_row("Source Language", src_lang)
        config_table.add_row("Target Language", tgt_lang)
        config_table.add_row("", "")  # Separator
        config_table.add_row("ASR Provider", f"{asr_provider} ({'API' if asr_provider == 'gemini' else 'Local'})")
        config_table.add_row("Translation Provider", f"{translation_provider} ({'API' if translation_provider == 'gemini' else 'Local'})")
        config_table.add_row("TTS Provider", f"{tts_provider} ({'API' if tts_provider == 'google' else 'Local'})")
        config_table.add_row("VAD Mode", "Enabled (Silero)" if vad_mode else "Disabled")
        config_table.add_row("", "")  # Separator
        config_table.add_row("Voice/Model", voice_path or "Auto-selected")
        config_table.add_row("Separation Model", sep_model)
        config_table.add_row("Device", device or "Auto")
        config_table.add_row("LUFS Target", str(lufs_target))
        config_table.add_row("Ducking", "Disabled" if disable_ducking else "Enabled")
        config_table.add_row("Loudness Norm", "Disabled" if disable_loudness else "Enabled")
        config_table.add_row("Time Stretching", "Disabled" if disable_time_stretch else "Enabled")
        console.print(config_table)
        console.print()

        # Initialize pipeline
        pipeline = VTBPPipeline(work_dir=work_dir, keep_temp=keep_temp)

        # Run translation; the CLI's negative "disable_*" flags are inverted
        # into the positive "enable_*" options passed to the pipeline.
        kwargs = {
            # Provider options
            'asr_provider': asr_provider,
            'tts_provider': tts_provider,
            'translation_provider': translation_provider,
            'target_language': tgt_lang,
            'vad_mode': vad_mode,  # NEW VAD option
            # Model options
            'sep_model': sep_model,
            'asr_model': asr_model,
            'device': device,
            'sample_rate': sample_rate,
            # Audio processing
            'voice_gain': voice_gain,
            'bed_gain': bed_gain,
            'enable_ducking': not disable_ducking,
            'duck_threshold': duck_threshold,
            'duck_ratio': duck_ratio,
            'duck_attack': duck_attack,
            'duck_release': duck_release,
            'enable_loudness_norm': not disable_loudness,
            'enable_time_stretch': not disable_time_stretch,
            'lufs_target': lufs_target,
        }
        results = pipeline.run_translation_pipeline(
            input_path, output_path, src_lang, tgt_lang, voice_path, **kwargs
        )

        # Show results
        console.print("\n[bold green]Translation completed successfully![/bold green]")
        console.print(f"Output: {results['output_video']}")
        # Separate source and target codes; without a separator the two codes
        # ran together (e.g. "enes").
        console.print(f"Language: {results['transcript_language']} → {tgt_lang}")
        console.print(f"Segments: {results['total_segments']}")
        console.print(f"Success rates - Translation: {results['translation_success_rate']:.1%}, "
                      f"Synthesis: {results['synthesis_success_rate']:.1%}, "
                      f"Alignment: {results['alignment_success_rate']:.1%}")
        if results.get('loudness_measurements'):
            measurements = results['loudness_measurements']
            console.print(f"Final loudness: {measurements.get('output_i', 'N/A')} LUFS")
        if keep_temp:
            console.print(f"Temporary files kept in: {results['work_directory']}")
    except (typer.Exit, typer.Abort, typer.BadParameter):
        # Typer/Click control-flow exceptions are ordinary Exception
        # subclasses. Re-raise them untouched so the intended exit code and
        # usage message reach the user instead of being reported below as an
        # "Unexpected error" with a traceback.
        raise
    except VTBPError as e:
        console.print(f"[red]Error: {e}[/red]")
        raise typer.Exit(1)
    except KeyboardInterrupt:
        console.print("\n[yellow]Interrupted by user[/yellow]")
        raise typer.Exit(1)
    except Exception as e:
        import traceback
        console.print(f"[red]Unexpected error: {e}[/red]")
        console.print("[yellow]Full traceback:[/yellow]")
        console.print(traceback.format_exc())
        raise typer.Exit(1)
@app.command()
def info():
    """Show information about VTBP and available models."""
    # The docstring above is the command's help text and is left untouched.
    component_lines = (
        "• Audio Separation: Demucs (htdemucs, mdx models)",
        "• Speech Recognition: Faster-Whisper (tiny to large-v2)",
        "• Translation: OPUS-MT models (Helsinki-NLP)",
        "• Text-to-Speech: Piper TTS (.onnx voice models)",
        "• Time Stretching: Rubber Band (pyrubberband)",
        "• Audio Mixing: FFmpeg (sidechain, loudness normalization)",
    )
    language_codes = ("en", "es", "fr", "de", "it", "pt", "ru", "zh", "ja", "ko", "ar", "hi")

    # Banner
    console.print("\n[bold blue]VTBP - Voice Translate Bed Preserve[/bold blue]")
    console.print("Translate video voice while preserving background music and sound effects\n")

    # One console line per pipeline component
    console.print("[bold]Available Components:[/bold]")
    for component in component_lines:
        console.print(component)

    # Sample of language codes, comma separated
    console.print("\n[bold]Supported Languages (examples):[/bold]")
    console.print(f"{', '.join(language_codes)} (and many more)")

    console.print("\n[bold]Example Usage:[/bold]")
    console.print("vtbp translate input.mp4 output.mp4 --src-lang en --tgt-lang es --voice spanish_voice.onnx")
@app.command()
def version():
    """Show version information."""
    # The docstring above is the command's help text and is left untouched.
    version_banner = "VTBP (Voice Translate Bed Preserve) v1.0.0"
    console.print(version_banner)
def main():
    """Run the Typer application; this is the CLI entry point."""
    app()
# Launch the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()