211 lines
7.6 KiB
Python
Executable file
211 lines
7.6 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
Voice to Text API using OpenAI Whisper with DeepL Translation
|
|
Transcribes audio files to text, VTT, or SRT format and optionally translates them
|
|
"""
|
|
|
|
from flask import Flask, request, jsonify, send_file
|
|
from flask_cors import CORS
|
|
import whisper
|
|
import deepl
|
|
import os
|
|
import tempfile
|
|
from datetime import timedelta
|
|
import logging
|
|
|
|
# Flask application; CORS is enabled through flask_cors with its default settings.
app = Flask(__name__)
CORS(app)

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load the Whisper model once at import time ("base" balances speed and accuracy).
# Options: tiny, base, small, medium, large
logger.info("Loading Whisper model...")
model = whisper.load_model("base")
logger.info("Whisper model loaded successfully")

# Initialize DeepL translator.
# SECURITY: credentials do not belong in source control. The key is read from
# the DEEPL_API_KEY environment variable; the embedded value below is kept only
# so existing deployments keep working and should be rotated and removed.
DEEPL_API_KEY = os.environ.get("DEEPL_API_KEY")
if not DEEPL_API_KEY:
    logger.warning(
        "DEEPL_API_KEY is not set; falling back to the key embedded in the source"
    )
    DEEPL_API_KEY = "28743b40-d23f-416d-8223-9b868c9531dc"
translator = deepl.Translator(DEEPL_API_KEY)
logger.info("DeepL translator initialized")

# Directory for output files (created next to this module if missing)
OUTPUT_DIR = os.path.join(os.path.dirname(__file__), 'outputs')
os.makedirs(OUTPUT_DIR, exist_ok=True)
|
|
|
|
|
|
def format_timestamp(seconds):
    """Convert seconds to SRT timestamp format (HH:MM:SS,mmm).

    Args:
        seconds: Offset from the start of the audio, in seconds (int or float).

    Returns:
        The offset formatted as ``HH:MM:SS,mmm``. The hours field grows past
        two digits instead of wrapping when the offset is 24 hours or more.
        Sub-millisecond precision is truncated, and negative offsets are
        clamped to ``00:00:00,000``.
    """
    # Work from the total number of whole milliseconds: ``timedelta.seconds``
    # excludes whole days, which made offsets >= 24h wrap around to 00:xx:xx.
    offset = max(timedelta(seconds=seconds), timedelta(0))
    total_millis = offset // timedelta(milliseconds=1)

    hours, remainder = divmod(total_millis, 3600 * 1000)
    minutes, remainder = divmod(remainder, 60 * 1000)
    secs, millis = divmod(remainder, 1000)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"
|
|
|
|
|
|
def format_timestamp_vtt(seconds):
    """Convert seconds to VTT timestamp format (HH:MM:SS.mmm).

    Args:
        seconds: Offset from the start of the audio, in seconds (int or float).

    Returns:
        The offset formatted as ``HH:MM:SS.mmm``. The hours field grows past
        two digits instead of wrapping when the offset is 24 hours or more.
        Sub-millisecond precision is truncated, and negative offsets are
        clamped to ``00:00:00.000``.
    """
    # Work from the total number of whole milliseconds: ``timedelta.seconds``
    # excludes whole days, which made offsets >= 24h wrap around to 00:xx:xx.
    offset = max(timedelta(seconds=seconds), timedelta(0))
    total_millis = offset // timedelta(milliseconds=1)

    hours, remainder = divmod(total_millis, 3600 * 1000)
    minutes, remainder = divmod(remainder, 60 * 1000)
    secs, millis = divmod(remainder, 1000)
    return f"{hours:02d}:{minutes:02d}:{secs:02d}.{millis:03d}"
|
|
|
|
|
|
def generate_srt(segments):
    """Render Whisper segments as an SRT document.

    Each segment becomes one cue: a 1-based index line, a
    ``start --> end`` line built with ``format_timestamp``, and the segment
    text with surrounding whitespace stripped. Cues are separated by a blank
    line. An empty segment list yields an empty string.
    """
    cues = [
        "{}\n{} --> {}\n{}\n".format(
            index,
            format_timestamp(cue['start']),
            format_timestamp(cue['end']),
            cue['text'].strip(),
        )
        for index, cue in enumerate(segments, start=1)
    ]
    return "\n".join(cues)
|
|
|
|
|
|
def generate_vtt(segments):
    """Render Whisper segments as a WebVTT document.

    The output starts with the ``WEBVTT`` header followed by a blank line.
    Each segment then contributes a ``start --> end`` line built with
    ``format_timestamp_vtt`` and its text with surrounding whitespace
    stripped; cues are separated by a blank line.
    """
    parts = ["WEBVTT\n"]
    parts.extend(
        "{} --> {}\n{}\n".format(
            format_timestamp_vtt(cue['start']),
            format_timestamp_vtt(cue['end']),
            cue['text'].strip(),
        )
        for cue in segments
    )
    return "\n".join(parts)
|
|
|
|
|
|
def translate_text(text, target_lang):
    """Translate text using the DeepL API.

    Args:
        text: The text to translate.
        target_lang: DeepL target language code (e.g. 'EN-US').

    Returns:
        The translated text. Empty or whitespace-only input is returned
        unchanged without calling the API.

    Raises:
        Exception: If DeepL reports an error; the original DeepLException is
            attached as the cause.
    """
    # Nothing to translate (e.g. silent audio). Skipping the request also
    # avoids the DeepL client rejecting empty input with a non-DeepL error.
    if not text or not text.strip():
        return text

    try:
        logger.info(f"Translating text to {target_lang}...")
        result = translator.translate_text(text, target_lang=target_lang)
        return result.text
    except deepl.exceptions.DeepLException as e:
        logger.error(f"DeepL translation error: {str(e)}")
        # Chain the cause so the original traceback is preserved for debugging.
        raise Exception(f"Translation failed: {str(e)}") from e
|
|
|
|
|
|
@app.route('/health', methods=['GET'])
def health_check():
    """Report service liveness together with the model label as JSON."""
    status_payload = {"status": "healthy", "model": "whisper-base"}
    return jsonify(status_payload)
|
|
|
|
|
|
def _safe_upload_name(filename):
    """Return the client-supplied *filename* with any directory parts removed."""
    # Treat both "/" and "\" as separators: the name is untrusted and may come
    # from a client on any platform.
    return os.path.basename(filename.replace("\\", "/"))


@app.route('/transcribe', methods=['POST'])
def transcribe():
    """
    Transcribe audio file to text, VTT, or SRT format with optional translation

    Expects: multipart/form-data with 'audio' file, 'format' (txt/vtt/srt),
    'translate' (0/1), and 'target_lang' (e.g., 'EN-US')

    Returns a JSON body with 'success', 'text' (txt format only), 'content',
    'filename' and 'format'; when translation is requested it also carries
    'translated_filename', 'translated_text' (txt format only) and
    'translated_content'. Responds 400 for a missing file, empty filename or
    unknown format, and 500 when transcription or translation fails.
    """
    try:
        # Check if audio file is present
        if 'audio' not in request.files:
            return jsonify({"error": "No audio file provided"}), 400

        audio_file = request.files['audio']
        output_format = request.form.get('format', 'txt').lower()
        enable_translation = request.form.get('translate', '0') == '1'
        target_lang = request.form.get('target_lang', 'EN-US')

        # Also covers a filename of None, which would otherwise crash below.
        if not audio_file.filename:
            return jsonify({"error": "Empty filename"}), 400

        # Validate format
        if output_format not in ('txt', 'vtt', 'srt'):
            return jsonify({"error": "Invalid format. Use txt, vtt, or srt"}), 400

        logger.info(f"Processing {audio_file.filename} - Format: {output_format}, Translation: {enable_translation}, Target: {target_lang}")

        # Strip directory components from the untrusted upload name so neither
        # the temp-file suffix nor the output paths can escape their directory.
        base_filename, audio_suffix = os.path.splitext(
            _safe_upload_name(audio_file.filename)
        )
        if not base_filename:
            base_filename = "audio"

        temp_audio_path = None
        try:
            # Reserve a temp path, close the handle, then save the upload to it
            # so the file is not held open twice while being written.
            with tempfile.NamedTemporaryFile(delete=False, suffix=audio_suffix) as temp_audio:
                temp_audio_path = temp_audio.name
            audio_file.save(temp_audio_path)

            # Transcribe with Whisper
            logger.info(f"Transcribing {audio_file.filename}...")
            result = model.transcribe(temp_audio_path, verbose=False)
            logger.info("Transcription complete")

            # Generate output based on format (already validated above)
            if output_format == 'vtt':
                content = generate_vtt(result['segments'])
            elif output_format == 'srt':
                content = generate_srt(result['segments'])
            else:
                content = result['text']
            extension = output_format

            # Save original output file
            output_filename = f"{base_filename}_original.{extension}"
            output_path = os.path.join(OUTPUT_DIR, output_filename)
            with open(output_path, 'w', encoding='utf-8') as f:
                f.write(content)

            response_data = {
                "success": True,
                "text": result['text'] if output_format == 'txt' else None,
                "content": content,  # Return content for all formats
                "filename": output_filename,
                "format": output_format
            }

            # Handle translation if requested
            if enable_translation:
                logger.info(f"Translating to {target_lang}...")
                # NOTE(review): for vtt/srt the whole document, cue timings
                # included, is sent to DeepL; confirm the timings survive.
                translated_content = translate_text(content, target_lang)

                # Save translated output file
                translated_filename = f"{base_filename}_translated.{extension}"
                translated_path = os.path.join(OUTPUT_DIR, translated_filename)
                with open(translated_path, 'w', encoding='utf-8') as f:
                    f.write(translated_content)

                response_data["translated_filename"] = translated_filename
                response_data["translated_text"] = translated_content if output_format == 'txt' else None
                response_data["translated_content"] = translated_content  # Return translated content for all formats
                logger.info("Translation complete")

            return jsonify(response_data)

        finally:
            # Clean up temporary audio file, even when saving the upload failed
            if temp_audio_path and os.path.exists(temp_audio_path):
                os.remove(temp_audio_path)

    except Exception as e:
        # Top-level boundary: log with traceback and report a JSON error.
        logger.exception(f"Error during transcription: {str(e)}")
        return jsonify({"error": f"Transcription failed: {str(e)}"}), 500
|
|
|
|
|
|
@app.route('/download/<filename>', methods=['GET'])
def download_file(filename):
    """Download a transcribed file from the output directory.

    Args:
        filename: Name of a file previously written to OUTPUT_DIR.

    Returns:
        The file as an attachment; a 404 JSON error when the name does not
        resolve to a regular file directly inside OUTPUT_DIR; a 500 JSON
        error on unexpected failures.
    """
    try:
        output_root = os.path.realpath(OUTPUT_DIR)
        file_path = os.path.realpath(os.path.join(output_root, filename))

        # Output files are written flat into OUTPUT_DIR, so anything that
        # resolves elsewhere (e.g. via ".." or a backslash path) is rejected,
        # as is anything that is not a regular file.
        if os.path.dirname(file_path) != output_root or not os.path.isfile(file_path):
            return jsonify({"error": "File not found"}), 404

        return send_file(file_path, as_attachment=True)
    except Exception as e:
        # Top-level boundary: log with traceback and report a JSON error.
        logger.exception(f"Error downloading file: {str(e)}")
        return jsonify({"error": str(e)}), 500
|
|
|
|
|
|
if __name__ == '__main__':
    # Listen on the port named by the PORT environment variable, or 5011 if unset
    listen_port = int(os.getenv('PORT', 5011))
    app.run(debug=False, port=listen_port, host='0.0.0.0')
|