852 lines
32 KiB
Python
852 lines
32 KiB
Python
"""
|
|
Subtitle Processor Service - Whisper + DeepL + FFmpeg
|
|
|
|
Full-featured subtitle processing with:
|
|
- Whisper transcription (multiple model sizes)
|
|
- DeepL translation (30+ languages)
|
|
- FFmpeg burning with full styling control
|
|
|
|
Styling Options:
|
|
- font: Font family (Arial, Helvetica, Times New Roman, etc.)
|
|
- font_size: Font size in points (default: 24)
|
|
- text_color: Primary text color (white, yellow, black, red, blue, green, orange, purple)
|
|
- outline_color: Outline/border color
|
|
- outline_width: Outline thickness (0-5, default: 2)
|
|
- background_color: Optional background box color
|
|
- background_opacity: Background box opacity (0-1)
|
|
- position: vertical position (bottom, top, center)
|
|
- alignment: horizontal alignment (left, center, right)
|
|
- margin_v: Vertical margin from edge (default: 30)
|
|
- margin_h: Horizontal margin (default: 20)
|
|
- shadow: Shadow depth (0-4)
|
|
- bold: Bold text (true/false)
|
|
- italic: Italic text (true/false)
|
|
|
|
Whisper Models:
|
|
- tiny: Fastest, lowest accuracy (~1GB VRAM)
|
|
- base: Fast, good accuracy (~1GB VRAM) - default
|
|
- small: Balanced (~2GB VRAM)
|
|
- medium: High accuracy (~5GB VRAM)
|
|
- large: Best accuracy (~10GB VRAM)
|
|
- large-v2: Second-generation large model
|
|
- large-v3: Newest model with best accuracy
|
|
"""
|
|
import os
|
|
import subprocess
|
|
from uuid import uuid4
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional
|
|
|
|
from app.database import SessionLocal
|
|
from app.models.job import Job
|
|
from app.models.asset import Asset
|
|
from app.config import settings
|
|
import structlog
|
|
|
|
logger = structlog.get_logger()
|
|
|
|
# DeepL target-language codes offered to callers, mapped to display names.
SUPPORTED_LANGUAGES = {
    "BG": "Bulgarian",
    "CS": "Czech",
    "DA": "Danish",
    "DE": "German",
    "EL": "Greek",
    "EN-GB": "English (British)",
    "EN-US": "English (American)",
    "ES": "Spanish",
    "ET": "Estonian",
    "FI": "Finnish",
    "FR": "French",
    "HU": "Hungarian",
    "ID": "Indonesian",
    "IT": "Italian",
    "JA": "Japanese",
    "KO": "Korean",
    "LT": "Lithuanian",
    "LV": "Latvian",
    "NB": "Norwegian (Bokmål)",
    "NL": "Dutch",
    "PL": "Polish",
    "PT-BR": "Portuguese (Brazilian)",
    "PT-PT": "Portuguese (European)",
    "RO": "Romanian",
    "RU": "Russian",
    "SK": "Slovak",
    "SL": "Slovenian",
    "SV": "Swedish",
    "TR": "Turkish",
    "UK": "Ukrainian",
    "ZH": "Chinese (simplified)",
    "ZH-HANS": "Chinese (simplified)",
}
|
|
|
|
# Named colours as ASS hex triplets. ASS stores colours as BBGGRR, i.e. the
# byte order is reversed compared with the usual RRGGBB notation.
COLOR_MAP = {
    "white": "FFFFFF",
    "yellow": "00FFFF",
    "black": "000000",
    "red": "0000FF",
    "blue": "FF0000",
    "green": "00FF00",
    "orange": "0080FF",
    "purple": "800080",
    "cyan": "FFFF00",
    "magenta": "FF00FF",
    "gray": "808080",
    "silver": "C0C0C0",
    "gold": "00D7FF",
    "lime": "00FF00",
    "navy": "800000",
    "teal": "808000",
    "maroon": "000080",
    "olive": "008080",
}
|
|
|
|
# Whisper model sizes that can be requested, with display metadata.
# Each row is (model key, display name, approximate VRAM, relative speed).
WHISPER_MODELS = {
    key: {"name": name, "vram": vram, "speed": speed}
    for key, name, vram, speed in (
        ("tiny", "Tiny", "~1GB", "fastest"),
        ("base", "Base", "~1GB", "fast"),
        ("small", "Small", "~2GB", "moderate"),
        ("medium", "Medium", "~5GB", "slow"),
        ("large", "Large", "~10GB", "slowest"),
        ("large-v2", "Large V2", "~10GB", "slowest"),
        ("large-v3", "Large V3", "~10GB", "slowest"),
    )
}
|
|
|
|
# Named style presets. A preset supplies defaults for font family, font size
# and outline width. Each row is (preset key, font family, size, outline).
FONT_PRESETS = {
    preset: {"font": family, "size": size, "outline": outline}
    for preset, family, size, outline in (
        ("default", "Arial", 24, 2),
        ("cinematic", "Helvetica", 28, 3),
        ("documentary", "Georgia", 22, 1),
        ("news", "Arial", 26, 2),
        ("social_media", "Arial Black", 32, 4),
        ("minimal", "Helvetica", 20, 1),
        ("bold", "Impact", 30, 3),
    )
}
|
|
|
|
|
|
def get_available_fonts():
    """Get list of available fonts on the system.

    Runs ``fc-list : family`` and returns the sorted, de-duplicated family
    names it prints (one line may carry several comma-separated names).

    Returns:
        A sorted list of font family names, or a static list of common font
        names when ``fc-list`` cannot be executed or exits with an error.
    """
    try:
        # No separate ``which fc-list`` probe: a missing binary already
        # surfaces as FileNotFoundError (an OSError) from this call, and the
        # probe itself failed on systems that lack ``which``.
        raw_output = subprocess.check_output(
            ['fc-list', ':', 'family'], stderr=subprocess.DEVNULL
        )
    except (subprocess.SubprocessError, OSError):
        return [
            'Arial', 'Helvetica', 'Times New Roman', 'Courier New', 'Verdana',
            'Georgia', 'Palatino', 'Garamond', 'Comic Sans MS', 'Trebuchet MS',
            'Arial Black', 'Impact', 'Tahoma', 'Roboto', 'Open Sans', 'DejaVu Sans'
        ]

    # errors='replace' keeps one badly encoded family name from aborting the
    # whole listing.
    listing = raw_output.decode('utf-8', errors='replace')
    families = {
        name.strip()
        for line in listing.splitlines()
        for name in line.split(',')
    }
    families.discard('')
    return sorted(families)
|
|
|
|
def _check_font_availability(font_name: str) -> str:
    """Resolve a requested font to one that is available.

    Tries, in order: an exact match against ``get_available_fonts()``, a
    case-insensitive match (returning the name as it is listed), the first
    listed font among a few common sans-serif families, and finally
    ``'Arial'``.
    """
    installed = get_available_fonts()

    # Exact hit: hand the requested name straight back.
    if font_name in installed:
        return font_name

    # Same family spelled with different capitalisation.
    wanted = font_name.lower()
    same_ignoring_case = next(
        (candidate for candidate in installed if candidate.lower() == wanted),
        None,
    )
    if same_ignoring_case is not None:
        return same_ignoring_case

    # Requested font is unknown; prefer a common sans-serif that is listed.
    for fallback in ('Arial', 'Helvetica', 'DejaVu Sans', 'FreeSans'):
        if fallback in installed:
            return fallback

    # Last resort (ffmpeg typically has a default sans serif)
    return 'Arial'
|
|
|
|
|
|
def get_subtitle_config():
    """Describe the subtitle options this module supports.

    Returns a dict with the Whisper models, translation languages, colour
    names, currently available fonts, font presets, and the accepted vertical
    positions and horizontal alignments.
    """
    config = {}
    config["whisper_models"] = WHISPER_MODELS
    config["supported_languages"] = SUPPORTED_LANGUAGES
    config["colors"] = list(COLOR_MAP.keys())
    config["fonts"] = get_available_fonts()
    config["font_presets"] = FONT_PRESETS
    config["positions"] = ["bottom", "top", "center"]
    config["alignments"] = ["left", "center", "right"]
    return config
|
|
|
|
|
|
async def process(job_id: str):
    """
    Process video for subtitles - transcribe, translate, optionally burn

    Loads the job identified by ``job_id`` and its first input asset, then
    either imports an existing subtitle asset or transcribes the input with
    Whisper (exactly once), optionally translates the segments with DeepL,
    stores the subtitle file(s) as new assets and, if requested, burns the
    subtitles into a new MP4 with FFmpeg. Progress, outputs and the final
    status are written to the job row. Any exception is logged and recorded
    on the job (``status = "failed"``) instead of being raised; if the job
    does not exist the function returns without doing anything.

    Input parameters (read from ``job.input_data``):
    - subtitle_asset_id: Asset id of an existing subtitle file to use instead
      of transcribing (parsed as SRT)
    - source_language: Source language code or "auto" for detection
    - target_language: Target language code for translation (optional)
    - burn_subtitles: Whether to burn subtitles into video
    - whisper_model: Whisper model size (tiny/base/small/medium/large)
    - font: Font family name
    - font_size: Font size in points
    - text_color: Primary text color
    - outline_color: Text outline color
    - outline_width: Outline thickness (0-5)
    - background_color: Background box color (optional)
    - background_opacity: Background opacity 0-1 (default 0)
    - position: Vertical position (bottom/top/center)
    - alignment: Horizontal alignment (left/center/right)
    - margin_v: Vertical margin from edge
    - margin_h: Horizontal margin
    - shadow: Shadow depth (0-4)
    - bold: Use bold text
    - italic: Use italic text
    - font_preset: Use a predefined style preset
    - word_timestamps: Include word-level timestamps in output
    - output_format: SRT, VTT, or ASS format
    """
    db = SessionLocal()
    # Bound before the try block so the error handler can tell whether the
    # job row was ever loaded.
    job = None
    try:
        job = db.query(Job).filter(Job.id == job_id).first()
        if not job:
            return

        input_data = job.input_data or {}
        input_asset_ids = job.input_asset_ids

        if not input_asset_ids:
            raise ValueError("No input asset provided")

        input_asset = db.query(Asset).filter(Asset.id == input_asset_ids[0]).first()
        if not input_asset:
            raise ValueError("Input asset not found")

        job.progress = 5
        job.api_provider = "whisper"
        db.commit()

        # Get all parameters with defaults
        source_language = input_data.get("source_language", "auto")
        target_language = input_data.get("target_language")
        burn_subtitles = input_data.get("burn_subtitles", False)
        whisper_model = input_data.get("whisper_model", "base")
        word_timestamps = input_data.get("word_timestamps", False)
        output_format = (input_data.get("output_format") or "srt").lower()
        subtitle_asset_id = input_data.get("subtitle_asset_id")

        # Any format other than vtt/ass is written as SRT.
        subtitle_ext = output_format if output_format in ("vtt", "ass") else "srt"

        # Styling parameters: an explicit value wins over the preset, which
        # wins over the built-in default.
        font_preset = input_data.get("font_preset")
        if font_preset and font_preset in FONT_PRESETS:
            preset = FONT_PRESETS[font_preset]
            font_req = input_data.get("font", preset['font'])
            font_size = input_data.get("font_size", preset['size'])
            outline_width = input_data.get("outline_width", preset['outline'])
        else:
            font_req = input_data.get("font", "Arial")
            font_size = input_data.get("font_size", 24)
            outline_width = input_data.get("outline_width", 2)

        # Validate font availability
        font = _check_font_availability(font_req)
        if font != font_req:
            logger.warning(f"Font '{font_req}' not found, falling back to '{font}'")

        text_color = input_data.get("text_color", "white")
        outline_color = input_data.get("outline_color", "black")
        background_color = input_data.get("background_color")
        background_opacity = input_data.get("background_opacity", 0)
        position = input_data.get("position", "bottom")
        alignment = input_data.get("alignment", "center")
        margin_v = input_data.get("margin_v", 30)
        margin_h = input_data.get("margin_h", 20)
        shadow = input_data.get("shadow", 0)
        bold = input_data.get("bold", False)
        italic = input_data.get("italic", False)

        def _render(segs: list) -> str:
            """Serialise segments in the requested output format."""
            if output_format == "vtt":
                return _generate_vtt(segs, word_timestamps)
            if output_format == "ass":
                return _generate_ass(segs, font, font_size, text_color,
                                     outline_color, outline_width, position, alignment,
                                     margin_v, margin_h, shadow, bold, italic,
                                     background_color, background_opacity)
            return _generate_srt(segs)

        segments = []
        detected_language = source_language
        subtitle_content = ""
        transcription_result = {}  # To store result for output_data

        if subtitle_asset_id:
            logger.info("Using provided subtitle asset", asset_id=subtitle_asset_id)
            sub_asset = db.query(Asset).filter(Asset.id == subtitle_asset_id).first()
            if not sub_asset or not os.path.exists(sub_asset.file_path):
                raise ValueError("Provided subtitle asset not found")

            with open(sub_asset.file_path, "r", encoding="utf-8") as f:
                subtitle_content = f.read()

            # Simple assumption: Input is SRT if we are parsing it
            # In future we might check extension
            # NOTE(review): the imported text is stored unchanged even when
            # output_format is vtt/ass, so the saved file's extension may not
            # match its content - confirm whether that is intended.
            segments = _parse_srt(subtitle_content)
            job.progress = 50  # Skip transcription
            transcription_result = {"text": "Imported from SRT", "language": source_language}

        else:
            # Transcribe with Whisper
            job.api_provider = f"whisper-{whisper_model}"
            job.api_model = f"whisper-{whisper_model}"

            import whisper

            # Extract audio to temp file
            temp_audio = os.path.join(settings.storage_path, "temp", f"temp_{uuid4()}.wav")
            os.makedirs(os.path.dirname(temp_audio), exist_ok=True)

            try:
                try:
                    subprocess.run([
                        "ffmpeg", "-i", input_asset.file_path,
                        "-vn",
                        "-ar", "16000",
                        "-ac", "1",
                        "-c:a", "pcm_s16le",
                        "-y", temp_audio
                    ], check=True, capture_output=True)
                except subprocess.CalledProcessError as e:
                    stderr_text = (e.stderr or b"").decode('utf-8', errors='replace')
                    logger.error("FFmpeg audio extraction failed", stderr=stderr_text)
                    raise RuntimeError(f"FFmpeg error: {stderr_text[-500:]}")

                job.progress = 20
                db.commit()

                logger.info(f"Loading Whisper model: {whisper_model}")
                model = whisper.load_model(whisper_model)

                # Transcribe
                transcribe_options = {
                    "language": None if source_language == "auto" else source_language,
                    "verbose": False,
                    "word_timestamps": word_timestamps
                }
                result = model.transcribe(temp_audio, **transcribe_options)

                segments = result.get("segments", [])
                detected_language = result.get("language", source_language)
                transcription_result = result

                # Generate initial subtitle content
                subtitle_content = _render(segments)

            finally:
                # The WAV is only needed for transcription; remove it whether
                # or not transcription succeeded.
                if os.path.exists(temp_audio):
                    os.remove(temp_audio)

        job.progress = 60
        db.commit()

        # Translate if needed
        translated_content = None
        if target_language:
            job.api_provider = "whisper+deepl"
            import deepl
            translator = deepl.Translator(settings.deepl_api_key)

            # Translate only the text content. Segments are sent as one
            # newline-joined string and mapped back by line index.
            # NOTE(review): a segment whose text itself contains a newline
            # (possible for imported SRT cues) would shift this mapping.
            text_for_translation = "\n".join([seg.get("text", "").strip() for seg in segments])
            translated_text = translator.translate_text(
                text_for_translation,
                target_lang=target_language
            ).text

            # Rebuild the subtitles with translated text
            translated_lines = translated_text.split("\n")
            translated_segments = []
            for i, seg in enumerate(segments):
                new_seg = seg.copy()
                if i < len(translated_lines):
                    new_seg["text"] = translated_lines[i]
                translated_segments.append(new_seg)

            translated_content = _render(translated_segments)

        job.progress = 70
        db.commit()

        output_assets = []

        # Save original subtitle file under a readable but unique name:
        # base-subtitles-lang-uuid_short.ext
        # basename() keeps a stored filename that contains path separators
        # from steering the output outside the storage directory.
        base_name = os.path.basename(os.path.splitext(input_asset.original_filename)[0])

        short_id = str(uuid4())[:8]
        subtitle_filename = f"{base_name}-subtitles-{detected_language}-{short_id}.{subtitle_ext}"
        subtitle_path = os.path.join(settings.storage_path, "documents", subtitle_filename)
        os.makedirs(os.path.dirname(subtitle_path), exist_ok=True)

        with open(subtitle_path, "w", encoding="utf-8") as f:
            f.write(subtitle_content)

        subtitle_asset = Asset(
            user_id=job.user_id,
            project_id=job.project_id,
            original_filename=subtitle_filename,
            stored_filename=subtitle_filename,
            file_path=subtitle_path,
            file_type="document",
            mime_type="text/plain",
            file_size_bytes=len(subtitle_content.encode()),
            source_module="subtitle_processor",
            source_job_id=job.id,
            parent_asset_id=input_asset.id,
            asset_metadata={
                "language": detected_language,
                "type": "original",
                "format": output_format,
                "whisper_model": whisper_model
            }
        )
        db.add(subtitle_asset)
        db.commit()
        db.refresh(subtitle_asset)
        output_assets.append(subtitle_asset.id)

        # Save translated subtitle if exists
        trans_path = None
        if translated_content:
            short_id = str(uuid4())[:8]
            trans_filename = f"{base_name}-subtitles-{target_language}-{short_id}.{subtitle_ext}"
            trans_path = os.path.join(settings.storage_path, "documents", trans_filename)

            with open(trans_path, "w", encoding="utf-8") as f:
                f.write(translated_content)

            trans_asset = Asset(
                user_id=job.user_id,
                project_id=job.project_id,
                original_filename=trans_filename,
                stored_filename=trans_filename,
                file_path=trans_path,
                file_type="document",
                mime_type="text/plain",
                file_size_bytes=len(translated_content.encode()),
                source_module="subtitle_processor",
                source_job_id=job.id,
                parent_asset_id=input_asset.id,
                asset_metadata={
                    "language": target_language,
                    "type": "translated",
                    "format": output_format
                }
            )
            db.add(trans_asset)
            db.commit()
            db.refresh(trans_asset)
            output_assets.append(trans_asset.id)

        job.progress = 80
        db.commit()

        # Burn subtitles if requested; the translated file is preferred when
        # one was produced.
        if burn_subtitles:
            burn_path = trans_path if translated_content else subtitle_path
            lang_code = target_language if translated_content else detected_language
            short_id = str(uuid4())[:8]
            output_filename = f"{base_name}-subtitled-{lang_code}-{short_id}.mp4"
            output_path = os.path.join(settings.storage_path, "videos", output_filename)
            os.makedirs(os.path.dirname(output_path), exist_ok=True)

            # Build the FFmpeg subtitle filter
            subtitle_filter = _build_subtitle_filter(
                burn_path, font, font_size, text_color, outline_color,
                outline_width, position, alignment, margin_v, margin_h,
                shadow, bold, italic, background_color, background_opacity
            )

            try:
                subprocess.run([
                    "ffmpeg", "-i", input_asset.file_path,
                    "-vf", subtitle_filter,
                    "-c:a", "copy",
                    "-y", output_path
                ], check=True, capture_output=True)
            except subprocess.CalledProcessError as e:
                stderr_text = (e.stderr or b"").decode('utf-8', errors='replace')
                logger.error("FFmpeg burning failed", stderr=stderr_text)
                raise RuntimeError(f"FFmpeg error: {stderr_text[-500:]}")

            video_size = os.path.getsize(output_path)

            video_asset = Asset(
                user_id=job.user_id,
                project_id=job.project_id,
                original_filename=output_filename,
                stored_filename=output_filename,
                file_path=output_path,
                file_type="video",
                mime_type="video/mp4",
                file_size_bytes=video_size,
                width=input_asset.width,
                height=input_asset.height,
                duration_seconds=input_asset.duration_seconds,
                source_module="subtitle_processor",
                source_job_id=job.id,
                parent_asset_id=input_asset.id,
                asset_metadata={
                    "burned_subtitles": True,
                    "subtitle_language": target_language or detected_language,
                    "styling": {
                        "font": font,
                        "font_size": font_size,
                        "text_color": text_color,
                        "position": position
                    }
                }
            )
            db.add(video_asset)
            db.commit()
            db.refresh(video_asset)
            output_assets.append(video_asset.id)

        job.output_asset_ids = output_assets
        job.output_data = {
            "transcript": transcription_result.get("text", ""),
            "language": detected_language,
            "segments_count": len(segments),
            "word_timestamps": word_timestamps,
            "output_format": output_format,
            "translated": bool(translated_content),
            "burned": burn_subtitles,
            "asset_ids": [str(a) for a in output_assets]
        }
        job.progress = 100
        job.status = "completed"
        job.completed_at = datetime.utcnow()
        db.commit()

    except Exception as e:
        logger.error(f"Subtitle processing error: {e}", exc_info=True)
        # The job row may never have been loaded (e.g. the lookup itself
        # failed); only record the failure when there is a row to update.
        if job is not None:
            job.status = "failed"
            job.error_message = str(e)
            db.commit()
    finally:
        db.close()
|
|
|
|
|
|
def _generate_srt(segments: list) -> str:
|
|
"""Generate SRT format from segments"""
|
|
srt_lines = []
|
|
for i, segment in enumerate(segments, 1):
|
|
start = _format_srt_timestamp(segment['start'])
|
|
end = _format_srt_timestamp(segment['end'])
|
|
text = segment['text'].strip()
|
|
srt_lines.append(f"{i}\n{start} --> {end}\n{text}\n")
|
|
return "\n".join(srt_lines)
|
|
|
|
|
|
def _generate_vtt(segments: list, word_timestamps: bool = False) -> str:
|
|
"""Generate WebVTT format from segments"""
|
|
vtt_lines = ["WEBVTT", ""]
|
|
for i, segment in enumerate(segments, 1):
|
|
start = _format_vtt_timestamp(segment['start'])
|
|
end = _format_vtt_timestamp(segment['end'])
|
|
text = segment['text'].strip()
|
|
|
|
# Add word-level timestamps if available
|
|
if word_timestamps and 'words' in segment:
|
|
words_with_timing = []
|
|
for word in segment['words']:
|
|
word_start = _format_vtt_timestamp(word['start'])
|
|
words_with_timing.append(f"<{word_start}>{word['word']}")
|
|
text = "".join(words_with_timing)
|
|
|
|
vtt_lines.append(f"{i}")
|
|
vtt_lines.append(f"{start} --> {end}")
|
|
vtt_lines.append(text)
|
|
vtt_lines.append("")
|
|
return "\n".join(vtt_lines)
|
|
|
|
|
|
def _generate_ass(segments: list, font: str, font_size: int, text_color: str,
                  outline_color: str, outline_width: float, position: str,
                  alignment: str, margin_v: int, margin_h: int, shadow: int,
                  bold: bool, italic: bool, background_color: Optional[str],
                  background_opacity: float) -> str:
    """Generate ASS (Advanced SubStation Alpha) format with full styling.

    Builds a script with a single ``Default`` style derived from the styling
    arguments, followed by one ``Dialogue`` event per segment.

    Args:
        segments: Dicts with ``start``/``end`` (seconds) and ``text``.
        font, font_size: Font family and size for the style.
        text_color, outline_color: Colour names looked up in ``COLOR_MAP``;
            unknown names fall back to white / black.
        outline_width, shadow: Outline and shadow values for the style.
        position, alignment: Vertical (bottom/center/top) and horizontal
            (left/center/right) placement; unknown pairs fall back to
            bottom-center.
        margin_v, margin_h: Vertical margin, and left/right margin.
        bold, italic: Font weight / slant flags.
        background_color: Optional colour name for the box behind the text.
        background_opacity: Box opacity, clamped to 0..1.

    Returns:
        The complete ASS document as a string.
    """
    # Convert colors to ASS format (&HBBGGRR)
    primary_hex = COLOR_MAP.get(text_color.lower(), 'FFFFFF')
    outline_hex = COLOR_MAP.get(outline_color.lower(), '000000')

    # ASS alignment uses numpad-style numbering:
    # 1=left-bottom, 2=center-bottom, 3=right-bottom
    # 4=left-middle, 5=center-middle, 6=right-middle
    # 7=left-top, 8=center-top, 9=right-top
    align_map = {
        ('left', 'bottom'): 1, ('center', 'bottom'): 2, ('right', 'bottom'): 3,
        ('left', 'center'): 4, ('center', 'center'): 5, ('right', 'center'): 6,
        ('left', 'top'): 7, ('center', 'top'): 8, ('right', 'top'): 9
    }
    ass_alignment = align_map.get((alignment, position), 2)

    # Background color with opacity. ASS alpha is inverted (00 = opaque,
    # FF = transparent); clamping keeps out-of-range input from producing an
    # invalid hex pair.
    opacity = min(max(float(background_opacity), 0.0), 1.0)
    back_alpha = f"{int((1 - opacity) * 255):02X}"
    back_hex = COLOR_MAP.get(background_color.lower(), '000000') if background_color else '000000'
    back_color = f"&H{back_alpha}{back_hex}"

    # Same rule as _build_subtitle_filter: a visible background switches the
    # style to the opaque-box border style, otherwise outline + shadow.
    border_style = 4 if background_color and opacity > 0 else 1

    # Font weight and style (-1 means "on" in ASS styles)
    bold_val = -1 if bold else 0
    italic_val = -1 if italic else 0

    header_lines = [
        "[Script Info]",
        "Title: Generated Subtitles",
        "ScriptType: v4.00+",
        "PlayResX: 1920",
        "PlayResY: 1080",
        "ScaledBorderAndShadow: yes",
        "",
        "[V4+ Styles]",
        "Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding",
        f"Style: Default,{font},{font_size},&H00{primary_hex},&H00{primary_hex},&H00{outline_hex},{back_color},{bold_val},{italic_val},0,0,100,100,0,0,{border_style},{outline_width},{shadow},{ass_alignment},{margin_h},{margin_h},{margin_v},1",
        "",
        "[Events]",
        "Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text",
        "",
    ]

    events = []
    for segment in segments:
        start = _format_ass_timestamp(segment['start'])
        end = _format_ass_timestamp(segment['end'])
        # ASS marks a hard line break inside a cue with the literal \N.
        text = segment['text'].strip().replace('\n', '\\N')
        events.append(f"Dialogue: 0,{start},{end},Default,,0,0,0,,{text}\n")

    return "\n".join(header_lines) + "".join(events)
|
|
|
|
|
|
def _parse_srt(content: str) -> list:
|
|
"""Parse SRT content into segments"""
|
|
segments = []
|
|
blocks = content.strip().split('\n\n')
|
|
|
|
for block in blocks:
|
|
lines = block.strip().split('\n')
|
|
if len(lines) >= 3:
|
|
# Parse timestamp line
|
|
times = lines[1].split(' --> ')
|
|
if len(times) != 2:
|
|
continue
|
|
|
|
start_str, end_str = times
|
|
|
|
def parse_time(t_str):
|
|
t_str = t_str.replace(',', '.')
|
|
parts = t_str.split(':')
|
|
return float(parts[0]) * 3600 + float(parts[1]) * 60 + float(parts[2])
|
|
|
|
start = parse_time(start_str)
|
|
end = parse_time(end_str)
|
|
text = "\n".join(lines[2:])
|
|
|
|
segments.append({
|
|
'start': start,
|
|
'end': end,
|
|
'text': text
|
|
})
|
|
|
|
return segments
|
|
|
|
|
|
def _format_srt_timestamp(seconds: float) -> str:
|
|
"""Convert seconds to SRT timestamp format (HH:MM:SS,mmm)"""
|
|
td = timedelta(seconds=seconds)
|
|
hours = td.seconds // 3600
|
|
minutes = (td.seconds % 3600) // 60
|
|
secs = td.seconds % 60
|
|
millis = td.microseconds // 1000
|
|
return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"
|
|
|
|
|
|
def _format_vtt_timestamp(seconds: float) -> str:
|
|
"""Convert seconds to WebVTT timestamp format (HH:MM:SS.mmm)"""
|
|
td = timedelta(seconds=seconds)
|
|
hours = td.seconds // 3600
|
|
minutes = (td.seconds % 3600) // 60
|
|
secs = td.seconds % 60
|
|
millis = td.microseconds // 1000
|
|
return f"{hours:02d}:{minutes:02d}:{secs:02d}.{millis:03d}"
|
|
|
|
|
|
def _format_ass_timestamp(seconds: float) -> str:
|
|
"""Convert seconds to ASS timestamp format (H:MM:SS.cc)"""
|
|
hours = int(seconds // 3600)
|
|
minutes = int((seconds % 3600) // 60)
|
|
secs = int(seconds % 60)
|
|
centisecs = int((seconds - int(seconds)) * 100)
|
|
return f"{hours}:{minutes:02d}:{secs:02d}.{centisecs:02d}"
|
|
|
|
|
|
def _build_subtitle_filter(subtitle_path: str, font: str, font_size: int,
|
|
text_color: str, outline_color: str, outline_width: float,
|
|
position: str, alignment: str, margin_v: int, margin_h: int,
|
|
shadow: int, bold: bool, italic: bool,
|
|
background_color: Optional[str], background_opacity: float) -> str:
|
|
"""Build FFmpeg subtitle filter with styling"""
|
|
|
|
# Determine if we're using ASS file (has its own styling)
|
|
if subtitle_path.endswith('.ass'):
|
|
return f"ass={subtitle_path}"
|
|
|
|
# Get hex colors
|
|
primary_hex = COLOR_MAP.get(text_color.lower(), 'FFFFFF')
|
|
outline_hex = COLOR_MAP.get(outline_color.lower(), '000000')
|
|
|
|
# Calculate alignment for subtitles filter
|
|
# SSA/ASS alignment: 1-3 bottom, 4-6 middle, 7-9 top
|
|
align_map = {
|
|
('left', 'bottom'): 1, ('center', 'bottom'): 2, ('right', 'bottom'): 3,
|
|
('left', 'center'): 4, ('center', 'center'): 5, ('right', 'center'): 6,
|
|
('left', 'top'): 7, ('center', 'top'): 8, ('right', 'top'): 9
|
|
}
|
|
ass_alignment = align_map.get((alignment, position), 2)
|
|
|
|
# Build force_style string
|
|
style_parts = [
|
|
f"Fontname={font}",
|
|
f"Fontsize={font_size}",
|
|
f"PrimaryColour=&H00{primary_hex}",
|
|
f"OutlineColour=&H00{outline_hex}",
|
|
f"BorderStyle=1",
|
|
f"Outline={outline_width:.1f}",
|
|
f"Shadow={shadow}",
|
|
f"Alignment={ass_alignment}",
|
|
f"MarginL={margin_h}",
|
|
f"MarginR={margin_h}",
|
|
f"MarginV={margin_v}"
|
|
]
|
|
|
|
if bold:
|
|
style_parts.append("Bold=1")
|
|
if italic:
|
|
style_parts.append("Italic=1")
|
|
|
|
# Add background if specified
|
|
if background_color and background_opacity > 0:
|
|
back_alpha = hex(int((1 - background_opacity) * 255))[2:].upper().zfill(2)
|
|
back_hex = COLOR_MAP.get(background_color.lower(), '000000')
|
|
style_parts.append(f"BackColour=&H{back_alpha}{back_hex}")
|
|
style_parts.append("BorderStyle=4") # Opaque box style
|
|
|
|
force_style = ",".join(style_parts)
|
|
|
|
# Escape the subtitle path for FFmpeg
|
|
escaped_path = subtitle_path.replace("'", "'\\''").replace(":", "\\:")
|
|
|
|
return f"subtitles='{escaped_path}':force_style='{force_style}'"
|