1153 lines
No EOL
54 KiB
Python
1153 lines
No EOL
54 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import tempfile
|
|
import shutil
|
|
from pathlib import Path
|
|
from typing import Optional, Dict, Any, List
|
|
import typer
|
|
from rich.console import Console
|
|
from rich.progress import Progress, TaskID
|
|
from rich.table import Table
|
|
from rich import print as rprint
|
|
|
|
# Import our modules
|
|
from .io.ffmpeg import FFmpegIO, FFmpegError
|
|
from .separate.demucs import DemucsSeperator, DemucsError
|
|
from .asr.whisper import WhisperASR, WhisperError
|
|
from .asr.gemini_asr import GeminiASR, GeminiASRError
|
|
from .mt.opus_mt import OpusMTTranslator, TranslationError
|
|
from .mt.gemini_translation import GeminiTranslator, GeminiTranslationError
|
|
from .tts.piper import PiperTTS, PiperError
|
|
from .tts.google_tts import GoogleTTS, GoogleTTSError
|
|
from .align.stretch import AudioStretcher, StretchError
|
|
from .mix.mix_ffmpeg import AudioMixer, MixError
|
|
from .utils.timestamps import SegmentProcessor, TimestampError
|
|
from .config.api_config import APIManager
|
|
from .vad.silero_vad import SileroVAD, SileroVADError
|
|
from .video.segment_extractor import VideoSegmentExtractor, VideoSegmentError
|
|
|
|
|
|
# Typer command-line application object; commands are registered on it with
# @app.command(). Invoking the program without arguments shows the help text.
app = typer.Typer(
    name="vtbp",
    help="Voice Translate Bed Preserve - Translate video voice while preserving music/SFX",
    no_args_is_help=True,
)

# Module-wide Rich console used for status and error output.
console = Console()
|
|
|
|
|
|
class VTBPError(Exception):
    """Application-level error raised by the VTBP pipeline and its steps."""
|
|
|
|
|
|
class VTBPPipeline:
|
|
"""Main pipeline for voice translation with bed preservation."""
|
|
|
|
def __init__(self, work_dir: str = "work", keep_temp: bool = False):
    """Prepare the workspace and the pipeline's component slots.

    Args:
        work_dir: Directory that receives intermediate files. It is created
            when missing, together with any missing parent directories.
        keep_temp: Stored on the instance; ``run_translation_pipeline``
            removes the work directory after a run unless this is true.
    """
    self.work_dir = Path(work_dir)
    self.keep_temp = keep_temp
    self.console = Console()

    # Only the FFmpeg wrapper and the API manager are constructed here; the
    # remaining components start as None and are assigned by the pipeline
    # step that needs them.
    self.ffmpeg_io = FFmpegIO()
    self.separator = None
    self.asr = None
    self.translator = None
    self.tts = None
    self.stretcher = None
    self.mixer = None
    self.segment_processor = None

    # VAD and video segmentation
    self.vad = None
    self.video_extractor = None

    # API management
    self.api_manager = APIManager()

    # parents=True lets callers pass a nested path such as "out/run1/work";
    # without it mkdir raises FileNotFoundError when the parent is missing.
    self.work_dir.mkdir(parents=True, exist_ok=True)

    # Artifact registry filled in step by step; every entry starts as None.
    self.pipeline_state = dict.fromkeys((
        'input_video',
        'extracted_audio',
        'voice_audio',
        'bed_audio',
        'transcript',
        'translated_segments',
        'synthesized_segments',
        'aligned_segments',
        'final_voice',
        'final_mixed',
        'output_video',
    ))
|
|
|
|
def run_translation_pipeline(self, input_path: str, output_path: str,
                             src_lang: str = "auto", tgt_lang: str = "es",
                             voice_path: Optional[str] = None,
                             **kwargs) -> Dict[str, Any]:
    """
    Run the complete voice translation pipeline.

    The eight steps run in order (extract, separate, transcribe, translate,
    synthesize, align, mix, remux) while a Rich progress bar is advanced
    once per step.

    Args:
        input_path: Input video file path
        output_path: Output video file path
        src_lang: Source language code ("auto" leaves detection to the ASR step)
        tgt_lang: Target language code
        voice_path: Path to TTS voice model
        **kwargs: Additional pipeline parameters, forwarded to every step as
            one options dict

    Returns:
        Dictionary with pipeline results and statistics

    Raises:
        VTBPError: If any step fails; the original exception is attached as
            ``__cause__``.
    """
    # The transcription and synthesis steps read the target language from
    # kwargs['target_language'] (with their own hard-coded defaults) rather
    # than from tgt_lang. Seed it here so the explicit argument is honoured;
    # a caller-supplied 'target_language' still takes precedence.
    kwargs.setdefault('target_language', tgt_lang)

    try:
        with Progress() as progress:
            # (progress description, zero-argument callable running the step)
            steps = (
                ("Extracting audio...",
                 lambda: self._extract_audio(input_path, progress, main_task)),
                ("Separating audio...",
                 lambda: self._separate_audio(kwargs, progress, main_task)),
                ("Transcribing speech...",
                 lambda: self._transcribe_voice(src_lang, kwargs, progress, main_task)),
                ("Translating text...",
                 lambda: self._translate_transcript(src_lang, tgt_lang, kwargs,
                                                    progress, main_task)),
                ("Synthesizing speech...",
                 lambda: self._synthesize_speech(voice_path, kwargs, progress, main_task)),
                ("Aligning timing...",
                 lambda: self._align_audio(kwargs, progress, main_task)),
                ("Mixing audio...",
                 lambda: self._mix_audio(kwargs, progress, main_task)),
                ("Creating final video...",
                 lambda: self._create_final_video(input_path, output_path, kwargs,
                                                  progress, main_task)),
            )
            main_task = progress.add_task("Overall Progress", total=len(steps))

            for description, run_step in steps:
                progress.update(main_task, description=description)
                run_step()
                progress.advance(main_task)

        # Generate results summary
        results = self._generate_results_summary()

        # Cleanup if not keeping temp files
        if not self.keep_temp:
            self._cleanup_temp_files()

        return results

    except Exception as e:
        console.print(f"[red]Pipeline failed: {e}[/red]")
        raise VTBPError(f"Translation pipeline failed: {e}") from e
|
|
|
|
def _extract_audio(self, input_path: str, progress: Progress, task: TaskID):
    """Extract the input video's audio track into the work directory.

    The audio is written to ``<work_dir>/input_audio.wav`` with a 48000 Hz
    sample rate and 2 channels requested from the FFmpeg wrapper. The input
    path and the extracted file path are recorded in ``pipeline_state``.

    Args:
        input_path: Path of the source video.
        progress: Progress display (not used by this step).
        task: Progress task id (not used by this step).

    Raises:
        VTBPError: If the FFmpeg wrapper raises ``FFmpegError``.
    """
    try:
        self.pipeline_state['input_video'] = input_path

        # Report the source audio format before converting it.
        audio_info = self.ffmpeg_io.get_audio_info(input_path)
        console.print(f"Input audio: {audio_info['sample_rate']}Hz, {audio_info['channels']} channels")

        audio_path = self.work_dir / "input_audio.wav"
        self.ffmpeg_io.extract_audio(
            input_path,
            str(audio_path),
            sample_rate=48000,
            channels=2,
        )

        self.pipeline_state['extracted_audio'] = str(audio_path)
        console.print(f"Audio extracted: {audio_path}")

    except FFmpegError as e:
        # Chain explicitly so the FFmpeg failure is kept as the direct cause.
        raise VTBPError(f"Audio extraction failed: {e}") from e
|
|
|
|
def _separate_audio(self, kwargs: Dict, progress: Progress, task: TaskID):
    """Separate the extracted audio into a voice track and a bed track.

    Args:
        kwargs: Pipeline options. ``sep_model`` (default ``'htdemucs'``)
            and ``device`` (default ``None``) are passed to the separator.
        progress: Progress display (not used by this step).
        task: Progress task id (not used by this step).

    Raises:
        VTBPError: If the separator raises ``DemucsError``.
    """
    try:
        self.separator = DemucsSeperator(
            model_name=kwargs.get('sep_model', 'htdemucs'),
            device=kwargs.get('device', None),
        )

        # Both output paths are returned by the separator, which is given
        # the work directory as its output location.
        voice_path, bed_path = self.separator.separate_voice_and_bed(
            self.pipeline_state['extracted_audio'],
            str(self.work_dir),
        )

        self.pipeline_state['voice_audio'] = voice_path
        self.pipeline_state['bed_audio'] = bed_path

        console.print(f"Voice separated: {voice_path}")
        console.print(f"Bed separated: {bed_path}")

    except DemucsError as e:
        # Chain explicitly so the separator failure is kept as the cause.
        raise VTBPError(f"Audio separation failed: {e}") from e
|
|
|
|
def _transcribe_voice(self, src_lang: str, kwargs: Dict, progress: Progress, task: TaskID):
    """Transcribe the speech and group it into processed segments.

    One of four paths is chosen from ``kwargs``:

    * ``vad_mode`` with ``asr_provider == 'gemini'``: Silero VAD finds speech
      passages in the separated voice track, matching video segments are
      extracted, and Gemini processes them in a batch (marked as already
      translated).
    * ``asr_provider == 'gemini'`` and ``translation_provider == 'gemini'``:
      one Gemini call on the input video transcribes and translates.
    * ``asr_provider == 'gemini'`` only: Gemini transcribes the input video.
    * otherwise: local Whisper transcribes the separated voice track with
      word timestamps.

    The transcript is stored in ``pipeline_state['transcript']``, the grouped
    segments in ``pipeline_state['processed_segments']``, and the transcript
    is also written to ``<work_dir>/transcript.json``.

    Args:
        src_lang: Source language code; ``"auto"`` is passed on as ``None``.
        kwargs: Pipeline options (``asr_provider``, ``translation_provider``,
            ``target_language``, ``vad_mode``, ``asr_model``, ``device``,
            ``min_segment_duration``, ``max_segment_duration``).
        progress: Progress display (not used by this step).
        task: Progress task id (not used by this step).

    Raises:
        VTBPError: On ASR, VAD, video-segment or timestamp-processing errors.
    """
    import traceback

    gemini_model = "gemini-2.5-pro"
    language = None if src_lang == "auto" else src_lang

    # The flag is only ever set to True below. Clear it first so an instance
    # that is reused after a consolidated run does not skip translation.
    self.pipeline_state['translation_consolidated'] = False

    try:
        asr_provider = kwargs.get('asr_provider', 'whisper')
        translation_provider = kwargs.get('translation_provider', 'opus')
        target_language = kwargs.get('target_language', 'es')
        vad_mode = kwargs.get('vad_mode', False)

        if vad_mode and asr_provider == 'gemini':
            # VAD-BASED WORKFLOW: Use Silero VAD + video segments + batch Gemini
            console.print("🎤 VAD Mode: Using waveform-based speech detection")

            self.vad = SileroVAD()
            vad_config = self._load_vad_config()

            # Detect speech passages using voice-only audio
            speech_segments = self.vad.detect_speech_passages(
                self.pipeline_state['voice_audio'],
                threshold=vad_config['threshold'],
                min_speech_ms=vad_config['min_speech_ms'],
                min_silence_ms=vad_config['min_silence_ms'],
                pad_pre_ms=vad_config['pad_pre_ms'],
                pad_post_ms=vad_config['pad_post_ms'],
            )

            # Cut the input video into one clip per detected passage.
            self.video_extractor = VideoSegmentExtractor()
            segments_dir = self.work_dir / "video_segments"
            video_segments = self.video_extractor.extract_video_segments(
                self.pipeline_state['input_video'],
                speech_segments,
                str(segments_dir),
            )

            self.asr = GeminiASR(model=gemini_model)
            transcript = self.asr.transcribe_video_segments_batch(
                video_segments,
                target_language=target_language,
                source_language=language,
            )

            # Mark as already translated (VAD mode includes translation)
            self.pipeline_state['translation_consolidated'] = True
            self.pipeline_state['vad_segments'] = speech_segments
            self.pipeline_state['video_segments'] = video_segments

        elif asr_provider == 'gemini' and translation_provider == 'gemini':
            # CONSOLIDATED: transcription AND translation in one Gemini call
            # on the original video file.
            self.asr = GeminiASR(model=gemini_model)
            transcript = self.asr.transcribe_and_translate_video(
                self.pipeline_state['input_video'],
                target_language=target_language,
                source_language=language,
            )

            # Mark as already translated
            self.pipeline_state['translation_consolidated'] = True

        elif asr_provider == 'gemini':
            # Gemini for ASR only, fed the video file rather than the
            # separated voice track.
            self.asr = GeminiASR(model=gemini_model)
            transcript = self.asr.transcribe_video_file(
                self.pipeline_state['input_video'], language=language
            )

        else:
            # Local Whisper on the separated voice track.
            self.asr = WhisperASR(
                model_size=kwargs.get('asr_model', 'large-v2'),
                device=kwargs.get('device', None),
            )
            transcript = self.asr.transcribe_with_timestamps(
                self.pipeline_state['voice_audio'],
                language=language,
                word_timestamps=True,
            )

        self.pipeline_state['transcript'] = transcript

        self.segment_processor = SegmentProcessor(
            min_segment_duration=kwargs.get('min_segment_duration', 1.5),
            max_segment_duration=kwargs.get('max_segment_duration', 4.0),
        )

        # Group, re-time, then merge short segments, in that order.
        segments = self.segment_processor.group_words_into_segments(transcript)
        segments = self.segment_processor.adjust_segment_timing(segments)
        segments = self.segment_processor.merge_short_segments(segments)

        self.pipeline_state['processed_segments'] = segments

        console.print(f"Transcribed: {len(segments)} segments, "
                      f"language: {transcript['language']}")

        # Save transcript
        transcript_path = self.work_dir / "transcript.json"
        with open(transcript_path, 'w', encoding='utf-8') as f:
            json.dump(transcript, f, ensure_ascii=False, indent=2)

    except (WhisperError, GeminiASRError, SileroVADError, VideoSegmentError) as e:
        console.print(f"[red]Speech recognition/VAD failed: {e}[/red]")
        console.print("[yellow]Full traceback:[/yellow]")
        console.print(traceback.format_exc())
        raise VTBPError(f"Speech recognition failed: {e}") from e
    except TimestampError as e:
        console.print(f"[red]Segment processing failed: {e}[/red]")
        console.print("[yellow]Full traceback:[/yellow]")
        console.print(traceback.format_exc())
        raise VTBPError(f"Segment processing failed: {e}") from e
|
|
|
|
def _translate_transcript(self, src_lang: str, tgt_lang: str,
                          kwargs: Dict, progress: Progress, task: TaskID):
    """Produce translated segments and save them to ``translation.json``.

    When ``pipeline_state['translation_consolidated']`` is set, the processed
    segments are reused as the translation: a segment without
    ``translated_text`` gets its ``text`` copied into both ``translated_text``
    and ``original_text``. Otherwise the segments are translated with Gemini
    (``translation_provider == 'gemini'``) or with the local OPUS-MT
    translator.

    Args:
        src_lang: Source language code; ``"auto"`` is replaced by the
            language stored in the transcript.
        tgt_lang: Target language code.
        kwargs: Pipeline options (``translation_provider``, ``device``).
        progress: Progress display (not used by this step).
        task: Progress task id (not used by this step).

    Raises:
        VTBPError: If a translator raises a translation error.
    """
    try:
        consolidated = self.pipeline_state.get('translation_consolidated', False)

        if consolidated:
            console.print("Translation already completed in consolidated ASR+Translation call")

            # Use the processed segments directly as they're already translated
            final_segments = self.pipeline_state['processed_segments']

            # Ensure each segment has proper translation fields
            for segment in final_segments:
                if not segment.get('translated_text'):
                    text = segment.get('text', '')
                    segment['translated_text'] = text
                    segment['original_text'] = text  # For completeness

            self.pipeline_state['translated_segments'] = final_segments
            console.print(f"Using consolidated translation for {len(final_segments)} segments")

        else:
            # Determine source language from ASR if auto
            if src_lang == "auto":
                src_lang = self.pipeline_state['transcript']['language']

            if kwargs.get('translation_provider', 'opus') == 'gemini':
                self.translator = GeminiTranslator(src_lang, tgt_lang, model="gemini-2.5-pro")
            else:
                self.translator = OpusMTTranslator(
                    src_lang, tgt_lang, device=kwargs.get('device', None)
                )

            final_segments = self.translator.translate_segments(
                self.pipeline_state['processed_segments']
            )
            self.pipeline_state['translated_segments'] = final_segments

        # Determine languages for logging
        if consolidated:
            transcript = self.pipeline_state['transcript']
            source_lang = transcript.get('source_language', src_lang)
            # In a consolidated transcript 'language' is used as the target.
            target_lang = transcript.get('language', tgt_lang)
        else:
            source_lang = src_lang
            target_lang = tgt_lang

        console.print(f"Translation completed: {len(final_segments)} segments "
                      f"from {source_lang} to {target_lang}")

        # Save translation
        translation_path = self.work_dir / "translation.json"
        if consolidated:
            # Save consolidated transcript+translation
            with open(translation_path, 'w', encoding='utf-8') as f:
                json.dump(self.pipeline_state['transcript'], f, ensure_ascii=False, indent=2)
        elif getattr(self, 'translator', None) is not None:
            # The attribute is initialised to None, so test its value rather
            # than its presence (hasattr would always be true).
            self.translator.save_translation(final_segments, str(translation_path))

    except (TranslationError, GeminiTranslationError) as e:
        raise VTBPError(f"Translation failed: {e}") from e
|
|
|
|
def _synthesize_speech(self, voice_path: Optional[str], kwargs: Dict,
                       progress: Progress, task: TaskID):
    """Synthesize speech for the translated segments into ``<work_dir>/tts``.

    With ``tts_provider == 'google'`` Google TTS is used; when time
    stretching is enabled the timing-aware synthesis call is used and every
    returned segment is flagged ``pre_timed``. Any other provider value
    selects the local Piper TTS.

    Args:
        voice_path: Piper voice model path. For Google TTS a non-empty value
            is used as the voice name instead of the auto-selected voice.
        kwargs: Pipeline options (``tts_provider``, ``sample_rate``,
            ``target_language``, ``enable_time_stretch``).
        progress: Progress display (not used by this step).
        task: Progress task id (not used by this step).

    Raises:
        VTBPError: If the TTS backend raises ``PiperError`` or
            ``GoogleTTSError``.
    """
    try:
        tts_provider = kwargs.get('tts_provider', 'piper')
        sample_rate = kwargs.get('sample_rate', 48000)

        # Create TTS output directory
        tts_dir = self.work_dir / "tts"
        tts_dir.mkdir(exist_ok=True)

        segments = self.pipeline_state['translated_segments']

        if tts_provider == 'google':
            self.tts = GoogleTTS()

            tgt_lang = kwargs.get('target_language', 'en-US')
            # Voice lookup uses only the part before the first '-'
            # (e.g. 'en' from 'en-US').
            voice_name = voice_path or self.tts.get_best_voice(tgt_lang.split('-')[0])

            google_options = {
                'voice_name': voice_name,
                'language_code': tgt_lang,
                'sample_rate': sample_rate,
            }

            if kwargs.get('enable_time_stretch', True):
                # Two-pass synthesis with automatic speed adjustment
                synthesized_segments = self.tts.synthesize_segments_with_timing(
                    segments, str(tts_dir), **google_options
                )

                # Flag the segments so the alignment step skips stretching.
                for seg in synthesized_segments:
                    seg['pre_timed'] = True
                    seg['tts_provider'] = 'google_timed'
            else:
                # Use normal synthesis for no-stretch mode
                synthesized_segments = self.tts.synthesize_segments(
                    segments, str(tts_dir), **google_options
                )

        else:
            # Use local Piper TTS
            self.tts = PiperTTS(voice_path=voice_path)
            synthesized_segments = self.tts.synthesize_segments(
                segments, str(tts_dir), sample_rate=sample_rate
            )

        self.pipeline_state['synthesized_segments'] = synthesized_segments

        successful = sum(1 for seg in synthesized_segments if seg.get('synthesized', False))
        console.print(f"Synthesized {successful}/{len(synthesized_segments)} segments")

    except (PiperError, GoogleTTSError) as e:
        raise VTBPError(f"Speech synthesis failed: {e}") from e
|
|
|
|
def _align_audio(self, kwargs: Dict, progress: Progress, task: TaskID):
    """Build the translated voice track ``<work_dir>/voice_translated.wav``.

    Three paths, chosen from ``enable_time_stretch`` and the segments:

    * stretching disabled: segments are placed with their natural TTS
      duration and marked aligned with ``time_stretch_applied = False``;
    * stretching enabled and at least one segment flagged ``pre_timed``:
      all segments are used as synthesized (no stretching) and placed at
      their timestamps;
    * stretching enabled otherwise: segments are stretched by
      ``AudioStretcher`` and then placed at their timestamps.

    The aligned segment list and the final voice path are stored in
    ``pipeline_state``.

    Args:
        kwargs: Pipeline options (``sample_rate``, ``enable_time_stretch``).
        progress: Progress display (not used by this step).
        task: Progress task id (not used by this step).

    Raises:
        VTBPError: If the stretcher raises ``StretchError`` (placement
            helpers raise ``VTBPError`` themselves).
    """
    try:
        sample_rate = kwargs.get('sample_rate', 48000)
        enable_time_stretch = kwargs.get('enable_time_stretch', True)

        segments = self.pipeline_state['synthesized_segments']
        final_voice_path = self.work_dir / "voice_translated.wav"

        if not enable_time_stretch:
            # No time stretching - use natural TTS timing
            console.print("Time stretching disabled - using natural TTS timing")
            self._concatenate_natural_timing(segments, str(final_voice_path), sample_rate)

            # Mark all segments as "aligned" but without actual stretching
            aligned_segments = [
                {
                    **segment,
                    'aligned_audio_path': segment.get('audio_path'),
                    'alignment_success': True,
                    'time_stretch_applied': False,
                }
                for segment in segments
            ]
            self.pipeline_state['aligned_segments'] = aligned_segments
            console.print(f"Natural timing preserved for {len(aligned_segments)} segments")

        elif any(seg.get('pre_timed', False) for seg in segments):
            # Google TTS with speed adjustment - no librosa stretching needed!
            console.print("Using pre-timed Google TTS segments (no stretching required)")

            # Mark segments as aligned and ready for placement
            aligned_segments = [
                {
                    **segment,
                    'aligned_audio_path': segment.get('audio_path'),
                    'alignment_success': True,
                    'time_stretch_method': 'google_tts_speed',
                }
                for segment in segments
            ]
            self.pipeline_state['aligned_segments'] = aligned_segments

            self._place_stretched_segments_at_timestamps(
                aligned_segments, str(final_voice_path), sample_rate
            )
            console.print(f"Pre-timed segments positioned: {len(aligned_segments)} segments")

        else:
            # Traditional librosa stretching for non-Google TTS
            self.stretcher = AudioStretcher(sample_rate=sample_rate)

            align_dir = self.work_dir / "aligned"
            align_dir.mkdir(exist_ok=True)
            tts_dir = self.work_dir / "tts"

            aligned_segments = self.stretcher.align_segments(
                segments, str(tts_dir), str(align_dir)
            )
            self.pipeline_state['aligned_segments'] = aligned_segments

            self._place_stretched_segments_at_timestamps(
                aligned_segments, str(final_voice_path), sample_rate
            )

            successful = sum(1 for seg in aligned_segments if seg.get('alignment_success', False))
            console.print(f"Librosa-aligned and positioned {successful}/{len(aligned_segments)} segments")

        self.pipeline_state['final_voice'] = str(final_voice_path)

    except StretchError as e:
        raise VTBPError(f"Audio alignment failed: {e}") from e
|
|
|
|
def _concatenate_natural_timing(self, segments: List[Dict[str, Any]],
                                output_path: str, sample_rate: int):
    """
    Place un-stretched TTS segments on a silent timeline at their timestamps.

    Despite the name, segments are not joined back to back: each segment's
    audio is written into a zero-filled buffer at its ``start`` time. The
    buffer spans the longer of the input video duration and the latest
    segment ``end``, plus one second. A segment that starts before the
    previous segment's ``end`` is moved to 0.1 s after that end. Segments
    that cannot be loaded or placed are reported and skipped. The result is
    saved as a two-channel 16-bit PCM file.

    Args:
        segments: List of synthesized segments with start/end timestamps
        output_path: Output audio file path
        sample_rate: Sample rate for output

    Raises:
        VTBPError: If there are no segments, none could be placed, or any
            other error occurs while building or writing the track.
    """
    import soundfile as sf
    import numpy as np

    try:
        if not segments:
            raise VTBPError("No segments provided for timing placement")

        # Size the timeline from the longer of video duration and the last
        # segment end so no audio is cut off.
        video_duration = self.ffmpeg_io.get_duration(self.pipeline_state['input_video'])
        max_segment_end = max(seg.get('end', 0) for seg in segments)
        max_end_time = max(video_duration, max_segment_end)
        total_samples = int(max_end_time * sample_rate) + sample_rate  # Add 1 second buffer

        final_audio = np.zeros(total_samples, dtype=np.float32)

        console.print("Creating timed audio track:")
        console.print(f"  Original video duration: {video_duration:.2f}s")
        console.print(f"  Max segment end time: {max_segment_end:.2f}s")
        console.print(f"  Final track duration: {max_end_time:.2f}s")

        # Walk the segments in start order and push overlapping starts back.
        fixed_segments = []
        for i, segment in enumerate(sorted(segments, key=lambda s: s.get('start', 0))):
            current_start = segment.get('start', 0)

            if fixed_segments:
                prev_end = fixed_segments[-1].get('end', 0)
                if current_start < prev_end:
                    console.print(f"⚠️  Overlap detected: Segment {i} starts at {current_start:.2f}s, "
                                  f"previous ends at {prev_end:.2f}s")
                    # Work on a copy so the caller's segment is not modified.
                    segment = segment.copy()
                    segment['start'] = prev_end + 0.1  # 100ms gap
                    console.print(f"   → Adjusted to start at {segment['start']:.2f}s")

            fixed_segments.append(segment)

        console.print(f"Processing {len(fixed_segments)} segments (fixed overlaps)")

        placed_segments = 0
        for segment in fixed_segments:
            audio_path = segment.get('audio_path')
            start_time = segment.get('start', 0)
            end_time = segment.get('end', 0)

            if not (audio_path and os.path.exists(audio_path) and start_time is not None):
                continue

            try:
                audio, sr = sf.read(audio_path)

                # Convert to mono if needed
                audio_mono = np.mean(audio, axis=1) if len(audio.shape) > 1 else audio

                # Resample if needed
                if sr != sample_rate:
                    import scipy.signal
                    audio_mono = scipy.signal.resample(
                        audio_mono, int(len(audio_mono) * sample_rate / sr)
                    )

                # Clamp at 0: a negative start would become a negative slice
                # index, which addresses the buffer from its end.
                start_sample = max(0, int(start_time * sample_rate))
                segment_length = len(audio_mono)
                end_sample = start_sample + segment_length

                # Ensure we don't exceed buffer
                if end_sample > len(final_audio):
                    end_sample = len(final_audio)
                    audio_mono = audio_mono[:end_sample - start_sample]

                if start_sample < len(final_audio):
                    final_audio[start_sample:end_sample] = audio_mono
                    placed_segments += 1

                    # Debug timing info
                    actual_end_time = end_sample / sample_rate
                    tts_duration = segment_length / sample_rate
                    slot_duration = end_time - start_time

                    console.print(f"✓ Segment {segment.get('id', '?')}: {start_time:.2f}s-{actual_end_time:.2f}s")
                    console.print(f"  TTS duration: {tts_duration:.2f}s, Original: {slot_duration:.2f}s")

                    # Check for timing issues
                    if abs(tts_duration - slot_duration) > 1.0:
                        console.print(f"  ⚠️  Large timing difference: {tts_duration-slot_duration:+.2f}s")

            except Exception as e:
                # Best effort: one bad segment must not abort the whole track.
                console.print(f"Warning: Failed to place segment at {start_time:.2f}s: {e}")
                continue

        if placed_segments == 0:
            raise VTBPError("No audio segments could be placed")

        # Duplicate the mono track into two identical channels.
        final_stereo = np.column_stack([final_audio, final_audio])
        sf.write(output_path, final_stereo, sample_rate, subtype='PCM_16')

        duration = len(final_audio) / sample_rate
        console.print(f"Timing-aware audio created: {output_path}")
        console.print(f"Duration: {duration:.2f}s, Placed segments: {placed_segments}/{len(segments)}")

    except VTBPError:
        # Already a pipeline error with a specific message; do not re-wrap.
        raise
    except Exception as e:
        raise VTBPError(f"Timing-aware audio placement failed: {e}") from e
|
|
|
|
def _place_stretched_segments_at_timestamps(self, aligned_segments: List[Dict[str, Any]],
                                            output_path: str, sample_rate: int):
    """
    Place aligned segments on a silent timeline at their timestamps.

    Each segment's ``aligned_audio_path`` is loaded, mixed down to mono,
    resampled when its rate differs from ``sample_rate``, and written into a
    zero-filled buffer at the segment's ``start`` time. The buffer spans the
    longer of the input video duration and the latest segment ``end``, plus
    one second. Segments without usable audio, or that fail to load, are
    reported and skipped. The result is saved as a two-channel 16-bit PCM
    file.

    Args:
        aligned_segments: List of time-aligned segments with stretched audio
        output_path: Output audio file path
        sample_rate: Sample rate for output

    Raises:
        VTBPError: If there are no segments, none could be placed, or any
            other error occurs while building or writing the track.
    """
    import soundfile as sf
    import numpy as np

    try:
        if not aligned_segments:
            raise VTBPError("No aligned segments provided for placement")

        # Size the timeline from the longer of video duration and the last
        # segment end.
        video_duration = self.ffmpeg_io.get_duration(self.pipeline_state['input_video'])
        max_segment_end = max(seg.get('end', 0) for seg in aligned_segments)
        max_end_time = max(video_duration, max_segment_end)
        total_samples = int(max_end_time * sample_rate) + sample_rate  # Add 1 second buffer

        final_audio = np.zeros(total_samples, dtype=np.float32)

        console.print(f"🎵 Placing {len(aligned_segments)} stretched segments at original timestamps:")
        console.print(f"   Total timeline duration: {max_end_time:.2f}s")

        placed_segments = 0
        for segment in aligned_segments:
            # Use the aligned (stretched) audio path
            audio_path = segment.get('aligned_audio_path')
            start_time = segment.get('start', 0)
            end_time = segment.get('end', 0)
            segment_id = segment.get('id', '?')

            if not (audio_path and os.path.exists(audio_path) and start_time is not None):
                console.print(f"⚠️  Segment {segment_id}: No aligned audio or invalid timing")
                continue

            try:
                audio, sr = sf.read(audio_path)

                # Convert to mono if needed
                audio_mono = np.mean(audio, axis=1) if len(audio.shape) > 1 else audio

                # Resample if needed
                if sr != sample_rate:
                    import scipy.signal
                    audio_mono = scipy.signal.resample(
                        audio_mono, int(len(audio_mono) * sample_rate / sr)
                    )

                # Position comes from the ORIGINAL timestamps. Clamp at 0: a
                # negative start would become a negative slice index, which
                # addresses the buffer from its end.
                start_sample = max(0, int(start_time * sample_rate))
                segment_length = len(audio_mono)
                end_sample = start_sample + segment_length

                # Ensure we don't exceed buffer
                if end_sample > len(final_audio):
                    end_sample = len(final_audio)
                    audio_mono = audio_mono[:end_sample - start_sample]

                if start_sample < len(final_audio):
                    final_audio[start_sample:end_sample] = audio_mono
                    placed_segments += 1

                    # Debug timing info
                    actual_end_time = end_sample / sample_rate
                    stretched_duration = segment_length / sample_rate
                    target_duration = end_time - start_time

                    console.print(f"✅ Segment {segment_id}: Placed at {start_time:.2f}s-{actual_end_time:.2f}s")
                    console.print(f"   Stretched: {stretched_duration:.2f}s, Target: {target_duration:.2f}s")

            except Exception as e:
                # Best effort: one bad segment must not abort the whole track.
                console.print(f"❌ Failed to place segment {segment_id} at {start_time:.2f}s: {e}")
                continue

        if placed_segments == 0:
            raise VTBPError("No stretched segments could be placed")

        # Duplicate the mono track into two identical channels.
        final_stereo = np.column_stack([final_audio, final_audio])
        sf.write(output_path, final_stereo, sample_rate, subtype='PCM_16')

        duration = len(final_audio) / sample_rate
        console.print(f"🎯 Stretched segments placed at timestamps: {output_path}")
        console.print(f"Duration: {duration:.2f}s, Placed segments: {placed_segments}/{len(aligned_segments)}")

    except VTBPError:
        # Already a pipeline error with a specific message; do not re-wrap.
        raise
    except Exception as e:
        raise VTBPError(f"Stretched segment placement failed: {e}") from e
|
|
|
|
def _load_vad_config(self) -> Dict[str, Any]:
|
|
"""Load VAD configuration from environment variables."""
|
|
return {
|
|
'threshold': float(os.getenv('VAD_THRESHOLD', '0.5')),
|
|
'min_speech_ms': int(os.getenv('VAD_MIN_SPEECH_MS', '350')),
|
|
'min_silence_ms': int(os.getenv('VAD_MIN_SILENCE_MS', '180')),
|
|
'pad_pre_ms': int(os.getenv('VAD_PAD_PRE_MS', '50')),
|
|
'pad_post_ms': int(os.getenv('VAD_PAD_POST_MS', '80'))
|
|
}
|
|
|
|
def _mix_audio(self, kwargs: Dict, progress: Progress, task: TaskID):
    """Mix the translated voice track with the bed track.

    The mix is written to ``<work_dir>/final_audio.wav``; its path and the
    info dict returned by the mixer are stored in ``pipeline_state``.

    Args:
        kwargs: Pipeline options. Read with these defaults:
            ``sample_rate`` 48000, ``voice_gain`` 0.0, ``bed_gain`` -3.0,
            ``enable_ducking`` True, ``duck_threshold`` 0.08,
            ``duck_ratio`` 6.0, ``duck_attack`` 5.0, ``duck_release`` 250.0,
            ``enable_loudness_norm`` True, ``lufs_target`` -16.0.
        progress: Progress display (not used by this step).
        task: Progress task id (not used by this step).

    Raises:
        VTBPError: If the mixer raises ``MixError``.
    """
    try:
        self.mixer = AudioMixer(sample_rate=kwargs.get('sample_rate', 48000))

        # Keyword arguments for create_final_mix, each taken from the
        # pipeline option of the same name.
        mix_options = {
            'voice_gain': kwargs.get('voice_gain', 0.0),
            'bed_gain': kwargs.get('bed_gain', -3.0),
            'enable_ducking': kwargs.get('enable_ducking', True),
            'duck_threshold': kwargs.get('duck_threshold', 0.08),
            'duck_ratio': kwargs.get('duck_ratio', 6.0),
            'duck_attack': kwargs.get('duck_attack', 5.0),
            'duck_release': kwargs.get('duck_release', 250.0),
            'enable_loudness_norm': kwargs.get('enable_loudness_norm', True),
            'lufs_target': kwargs.get('lufs_target', -16.0),
        }

        final_audio_path = self.work_dir / "final_audio.wav"

        mix_info = self.mixer.create_final_mix(
            self.pipeline_state['final_voice'],
            self.pipeline_state['bed_audio'],
            str(final_audio_path),
            **mix_options,
        )

        self.pipeline_state['final_mixed'] = str(final_audio_path)
        self.pipeline_state['mix_info'] = mix_info

        console.print(f"Final mix created: {final_audio_path}")

        # Report the measured loudness only when normalization was requested
        # and the mixer returned measurements.
        if mix_options['enable_loudness_norm'] and 'loudness_measurements' in mix_info:
            measurements = mix_info['loudness_measurements']
            console.print(f"Loudness: {measurements.get('output_i', 'N/A')} LUFS")

    except MixError as e:
        raise VTBPError(f"Audio mixing failed: {e}") from e
|
|
|
|
def _create_final_video(self, input_path: str, output_path: str, kwargs: Dict,
                        progress: Progress, task: TaskID):
    """Remux the input video with the final mixed audio.

    Args:
        input_path: Path of the source video.
        output_path: Path of the video to create; recorded in
            ``pipeline_state['output_video']`` on success.
        kwargs: Pipeline options; ``copy_video`` (default ``True``) is
            passed to the FFmpeg wrapper.
        progress: Progress display (not used by this step).
        task: Progress task id (not used by this step).

    Raises:
        VTBPError: If the FFmpeg wrapper raises ``FFmpegError``.
    """
    try:
        self.ffmpeg_io.remux_video_with_audio(
            input_path,
            self.pipeline_state['final_mixed'],
            output_path,
            copy_video=kwargs.get('copy_video', True),
        )

        self.pipeline_state['output_video'] = output_path
        console.print(f"Final video created: {output_path}")

    except FFmpegError as e:
        # Chain explicitly so the FFmpeg failure is kept as the direct cause.
        raise VTBPError(f"Video creation failed: {e}") from e
|
|
|
|
def _generate_results_summary(self) -> Dict[str, Any]:
|
|
"""Generate pipeline results summary."""
|
|
transcript = self.pipeline_state.get('transcript', {})
|
|
translated_segments = self.pipeline_state.get('translated_segments', [])
|
|
aligned_segments = self.pipeline_state.get('aligned_segments', [])
|
|
mix_info = self.pipeline_state.get('mix_info', {})
|
|
|
|
return {
|
|
'input_video': self.pipeline_state.get('input_video'),
|
|
'output_video': self.pipeline_state.get('output_video'),
|
|
'transcript_language': transcript.get('language'),
|
|
'transcript_confidence': transcript.get('language_probability'),
|
|
'total_segments': len(translated_segments),
|
|
'translation_success_rate': sum(1 for seg in translated_segments
|
|
if not seg.get('translation_error')) / len(translated_segments) if translated_segments else 0,
|
|
'synthesis_success_rate': sum(1 for seg in aligned_segments
|
|
if seg.get('synthesized', False)) / len(aligned_segments) if aligned_segments else 0,
|
|
'alignment_success_rate': sum(1 for seg in aligned_segments
|
|
if seg.get('alignment_success', False)) / len(aligned_segments) if aligned_segments else 0,
|
|
'mix_settings': mix_info.get('settings', {}),
|
|
'loudness_measurements': mix_info.get('loudness_measurements', {}),
|
|
'work_directory': str(self.work_dir),
|
|
'temp_files_kept': self.keep_temp
|
|
}
|
|
|
|
def _cleanup_temp_files(self):
    """Delete the working directory tree if it exists.

    This is best-effort: any exception raised while checking for or
    removing the directory is reported as a console warning and is not
    propagated to the caller.
    """
    try:
        if not self.work_dir.exists():
            # Nothing on disk to remove.
            return
        shutil.rmtree(self.work_dir)
        console.print(f"Cleaned up temporary files: {self.work_dir}")
    except Exception as err:
        console.print(f"[yellow]Warning: Failed to clean up temp files: {err}[/yellow]")
|
|
|
|
|
|
# NOTE: Typer renders the function docstring as this command's --help text,
# so implementation notes live in comments rather than in the docstring.
#
# Flow: validate paths -> validate API credentials (when requested or when an
# API-backed provider is selected) -> optional cost estimate + confirmation ->
# print the configuration table -> run VTBPPipeline -> print a result summary.
#
# Exit behaviour: typer.Exit / typer.Abort / typer.BadParameter propagate to
# Typer unchanged; VTBPError, KeyboardInterrupt and any other exception are
# reported on the console and converted to exit code 1.
@app.command()
def translate(
    input_path: str = typer.Argument(..., help="Input video file path"),
    output_path: str = typer.Argument(..., help="Output video file path"),
    src_lang: str = typer.Option("auto", "--src-lang", help="Source language code"),
    tgt_lang: str = typer.Option("es", "--tgt-lang", help="Target language code"),

    # Provider selection (NEW API OPTIONS)
    asr_provider: str = typer.Option("whisper", "--asr-provider", help="ASR provider (whisper/gemini)"),
    tts_provider: str = typer.Option("piper", "--tts-provider", help="TTS provider (piper/google)"),
    translation_provider: str = typer.Option("opus", "--translation-provider", help="Translation provider (opus/gemini)"),

    # VAD mode (NEW)
    vad_mode: bool = typer.Option(False, "--vad-mode", help="Use waveform-based VAD for precise speech detection"),

    # Voice and model options
    voice_path: Optional[str] = typer.Option(None, "--voice", help="TTS voice (Piper .onnx path or Google voice name)"),
    sep_model: str = typer.Option("htdemucs", "--sep", help="Audio separation model"),
    asr_model: str = typer.Option("large-v2", "--asr", help="ASR model size (for local whisper)"),

    # System options
    device: Optional[str] = typer.Option(None, "--device", help="Processing device (cpu/cuda/mps/auto)"),
    work_dir: str = typer.Option("work", "--work-dir", help="Working directory for temporary files"),
    keep_temp: bool = typer.Option(False, "--keep-temp", help="Keep temporary files"),

    # Audio processing options
    lufs_target: float = typer.Option(-16.0, "--lufs", help="Target LUFS for loudness normalization"),
    duck_threshold: float = typer.Option(0.08, "--duck-threshold", help="Ducking threshold (0.0-1.0)"),
    duck_ratio: float = typer.Option(6.0, "--duck-ratio", help="Ducking compression ratio"),
    duck_attack: float = typer.Option(5.0, "--duck-attack", help="Ducking attack time (ms)"),
    duck_release: float = typer.Option(250.0, "--duck-release", help="Ducking release time (ms)"),
    voice_gain: float = typer.Option(0.0, "--voice-gain", help="Voice gain adjustment (dB)"),
    bed_gain: float = typer.Option(-3.0, "--bed-gain", help="Bed gain adjustment (dB)"),
    disable_ducking: bool = typer.Option(False, "--no-duck", help="Disable sidechain ducking"),
    disable_loudness: bool = typer.Option(False, "--no-loudnorm", help="Disable loudness normalization"),
    disable_time_stretch: bool = typer.Option(False, "--no-stretch", help="Disable time stretching (use natural TTS timing)"),
    sample_rate: int = typer.Option(48000, "--sample-rate", help="Audio sample rate"),

    # API options
    estimate_cost: bool = typer.Option(False, "--estimate-cost", help="Show cost estimation for API providers"),
    validate_apis: bool = typer.Option(False, "--validate-apis", help="Validate API credentials before processing"),
):
    """
    Translate video voice while preserving background music and sound effects.

    Examples:
        # API mode (recommended for Mac)
        vtbp translate input.mp4 output.mp4 --asr-provider gemini --tts-provider google

        # Local mode
        vtbp translate input.mp4 output.mp4 --voice spanish_voice.onnx

        # Hybrid mode
        vtbp translate input.mp4 output.mp4 --asr-provider gemini --tts-provider piper
    """
    try:
        # Validate inputs
        if not os.path.exists(input_path):
            raise typer.BadParameter(f"Input file does not exist: {input_path}")

        # Validate voice path for local providers (for Google TTS the value
        # is a voice name, not a file, so no existence check applies)
        if voice_path and tts_provider == 'piper' and not os.path.exists(voice_path):
            raise typer.BadParameter(f"Voice model does not exist: {voice_path}")

        # Create output directory if needed
        output_dir = os.path.dirname(output_path)
        if output_dir:  # Only create directory if path has a directory component
            os.makedirs(output_dir, exist_ok=True)

        # Initialize API manager
        api_manager = APIManager()

        # Providers selected for each stage, keyed by stage name
        providers = {
            'asr': asr_provider,
            'tts': tts_provider,
            'translation': translation_provider
        }

        # Validate API credentials if requested or if any API provider is used
        if validate_apis or any(p in ['gemini', 'google'] for p in providers.values()):
            # The return value is not used here; the call is kept as-is.
            # NOTE(review): confirm whether validate_api_keys has side
            # effects that get_missing_credentials relies on.
            api_manager.validate_api_keys(providers)
            missing_creds = api_manager.get_missing_credentials(providers)

            if missing_creds:
                console.print("[red]❌ Missing API credentials:[/red]")
                for provider_type, message in missing_creds.items():
                    console.print(f" • {provider_type}: {message}")
                console.print(f"\n{api_manager.setup_environment_guide()}")
                raise typer.Exit(1)
            else:
                console.print("[green]✅ API credentials validated[/green]")

        # Cost estimation
        if estimate_cost:
            # Rough estimation based on video duration. Failures while
            # computing or printing the estimate are non-fatal: warn and
            # continue without asking for confirmation.
            try:
                ffmpeg_io = FFmpegIO()
                duration = ffmpeg_io.get_duration(input_path)
                text_length = int(duration * 20)  # ~20 chars per second estimate

                cost_estimate = api_manager.estimate_api_costs(providers, duration, text_length)

                console.print("\n[bold yellow]💰 Cost Estimation:[/bold yellow]")
                total_cost = 0
                for provider_type, cost_info in cost_estimate.items():
                    if provider_type != 'total':
                        cost = cost_info.get('cost_usd', 0)
                        provider_name = cost_info.get('provider', 'N/A')
                        console.print(f" • {provider_type.title()}: ${cost:.4f} ({provider_name})")
                        total_cost += cost

                console.print(f" • [bold]Total: ${total_cost:.4f}[/bold]")
                console.print(f" Note: {cost_estimate['total']['note']}")
            except Exception as e:
                console.print(f"[yellow]Warning: Could not estimate costs: {e}[/yellow]")
            else:
                # The prompt sits outside the guarded block on purpose:
                # typer.Exit derives from RuntimeError, so raising it inside
                # the ``try`` above would be swallowed by ``except Exception``
                # and the translation would run despite the user declining.
                if not typer.confirm("Continue with translation?"):
                    raise typer.Exit(0)

        # Show configuration
        console.print("\n[bold blue]VTBP - Voice Translation Pipeline[/bold blue]")

        config_table = Table(title="Configuration")
        config_table.add_column("Parameter", style="cyan")
        config_table.add_column("Value", style="white")

        config_table.add_row("Input Video", input_path)
        config_table.add_row("Output Video", output_path)
        config_table.add_row("Source Language", src_lang)
        config_table.add_row("Target Language", tgt_lang)
        config_table.add_row("", "")  # Separator
        config_table.add_row("ASR Provider", f"{asr_provider} ({'API' if asr_provider == 'gemini' else 'Local'})")
        config_table.add_row("Translation Provider", f"{translation_provider} ({'API' if translation_provider == 'gemini' else 'Local'})")
        config_table.add_row("TTS Provider", f"{tts_provider} ({'API' if tts_provider == 'google' else 'Local'})")
        config_table.add_row("VAD Mode", "Enabled (Silero)" if vad_mode else "Disabled")
        config_table.add_row("", "")  # Separator
        config_table.add_row("Voice/Model", voice_path or "Auto-selected")
        config_table.add_row("Separation Model", sep_model)
        config_table.add_row("Device", device or "Auto")
        config_table.add_row("LUFS Target", str(lufs_target))
        config_table.add_row("Ducking", "Disabled" if disable_ducking else "Enabled")
        config_table.add_row("Loudness Norm", "Disabled" if disable_loudness else "Enabled")
        config_table.add_row("Time Stretching", "Disabled" if disable_time_stretch else "Enabled")

        console.print(config_table)
        console.print()

        # Initialize pipeline
        pipeline = VTBPPipeline(work_dir=work_dir, keep_temp=keep_temp)

        # Options forwarded to the pipeline; the CLI's negative "disable_*"
        # flags are inverted into the "enable_*" keys used here.
        kwargs = {
            # Provider options
            'asr_provider': asr_provider,
            'tts_provider': tts_provider,
            'translation_provider': translation_provider,
            'target_language': tgt_lang,
            'vad_mode': vad_mode,  # NEW VAD option

            # Model options
            'sep_model': sep_model,
            'asr_model': asr_model,
            'device': device,
            'sample_rate': sample_rate,

            # Audio processing
            'voice_gain': voice_gain,
            'bed_gain': bed_gain,
            'enable_ducking': not disable_ducking,
            'duck_threshold': duck_threshold,
            'duck_ratio': duck_ratio,
            'duck_attack': duck_attack,
            'duck_release': duck_release,
            'enable_loudness_norm': not disable_loudness,
            'enable_time_stretch': not disable_time_stretch,
            'lufs_target': lufs_target,
        }

        # Run translation
        results = pipeline.run_translation_pipeline(
            input_path, output_path, src_lang, tgt_lang, voice_path, **kwargs
        )

        # Show results
        console.print("\n[bold green]Translation completed successfully![/bold green]")
        console.print(f"Output: {results['output_video']}")
        console.print(f"Language: {results['transcript_language']} → {tgt_lang}")
        console.print(f"Segments: {results['total_segments']}")
        console.print(f"Success rates - Translation: {results['translation_success_rate']:.1%}, "
                      f"Synthesis: {results['synthesis_success_rate']:.1%}, "
                      f"Alignment: {results['alignment_success_rate']:.1%}")

        if results.get('loudness_measurements'):
            measurements = results['loudness_measurements']
            console.print(f"Final loudness: {measurements.get('output_i', 'N/A')} LUFS")

        if keep_temp:
            console.print(f"Temporary files kept in: {results['work_directory']}")

    except (typer.Exit, typer.Abort, typer.BadParameter):
        # These are Typer/Click control-flow and usage exceptions raised on
        # purpose above. They all derive from Exception, so without this
        # clause the generic handler below would print them as
        # "Unexpected error" with a traceback instead of letting Typer exit
        # with the requested code or show the usage error.
        raise
    except VTBPError as e:
        console.print(f"[red]Error: {e}[/red]")
        raise typer.Exit(1)
    except KeyboardInterrupt:
        console.print("\n[yellow]Interrupted by user[/yellow]")
        raise typer.Exit(1)
    except Exception as e:
        # Top-level boundary: report the full traceback, then exit non-zero.
        import traceback
        console.print(f"[red]Unexpected error: {e}[/red]")
        console.print("[yellow]Full traceback:[/yellow]")
        console.print(traceback.format_exc())
        raise typer.Exit(1)
|
|
|
|
|
|
@app.command()
def info():
    """Show information about VTBP and available models."""
    # One console line per pipeline component, printed in this order.
    component_lines = (
        "• Audio Separation: Demucs (htdemucs, mdx models)",
        "• Speech Recognition: Faster-Whisper (tiny to large-v2)",
        "• Translation: OPUS-MT models (Helsinki-NLP)",
        "• Text-to-Speech: Piper TTS (.onnx voice models)",
        "• Time Stretching: Rubber Band (pyrubberband)",
        "• Audio Mixing: FFmpeg (sidechain, loudness normalization)",
    )
    # Example language codes shown to the user (not an exhaustive list).
    example_langs = ("en", "es", "fr", "de", "it", "pt", "ru", "zh", "ja", "ko", "ar", "hi")

    # Banner
    console.print("\n[bold blue]VTBP - Voice Translate Bed Preserve[/bold blue]")
    console.print("Translate video voice while preserving background music and sound effects\n")

    # Components
    console.print("[bold]Available Components:[/bold]")
    for component in component_lines:
        console.print(component)

    # Languages
    console.print("\n[bold]Supported Languages (examples):[/bold]")
    console.print(f"• {', '.join(example_langs)} (and many more)")

    # Usage example
    console.print("\n[bold]Example Usage:[/bold]")
    console.print("vtbp translate input.mp4 output.mp4 --src-lang en --tgt-lang es --voice spanish_voice.onnx")
|
|
|
|
|
|
@app.command()
def version():
    """Show version information."""
    # Product name and version are emitted as a single console line.
    banner = "VTBP (Voice Translate Bed Preserve) v1.0.0"
    console.print(banner)
|
|
|
|
|
|
def main():
    """Entry point: invoke the Typer application object ``app``."""
    app()
|
|
|
|
|
|
# Start the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()