forge/backend/app/services/subtitle_processor.py

852 lines
32 KiB
Python

"""
Subtitle Processor Service - Whisper + DeepL + FFmpeg
Full-featured subtitle processing with:
- Whisper transcription (multiple model sizes)
- DeepL translation (30+ languages)
- FFmpeg burning with full styling control
Styling Options:
- font: Font family (Arial, Helvetica, Times New Roman, etc.)
- font_size: Font size in points (default: 24)
- text_color: Primary text color (white, yellow, black, red, blue, green, orange, purple)
- outline_color: Outline/border color
- outline_width: Outline thickness (0-5, default: 2)
- background_color: Optional background box color
- background_opacity: Background box opacity (0-1)
- position: vertical position (bottom, top, center)
- alignment: horizontal alignment (left, center, right)
- margin_v: Vertical margin from edge (default: 30)
- margin_h: Horizontal margin (default: 20)
- shadow: Shadow depth (0-4)
- bold: Bold text (true/false)
- italic: Italic text (true/false)
Whisper Models:
- tiny: Fastest, lowest accuracy (~1GB VRAM)
- base: Fast, good accuracy (~1GB VRAM) - default
- small: Balanced (~2GB VRAM)
- medium: High accuracy (~5GB VRAM)
- large: Best accuracy (~10GB VRAM)
- large-v2: Latest large model
- large-v3: Newest model with best accuracy
"""
import os
import subprocess
from uuid import uuid4
from datetime import datetime, timedelta
from typing import Optional
from app.database import SessionLocal
from app.models.job import Job
from app.models.asset import Asset
from app.config import settings
import structlog
logger = structlog.get_logger()
# Supported languages for DeepL translation
SUPPORTED_LANGUAGES = {
'BG': 'Bulgarian',
'CS': 'Czech',
'DA': 'Danish',
'DE': 'German',
'EL': 'Greek',
'EN-GB': 'English (British)',
'EN-US': 'English (American)',
'ES': 'Spanish',
'ET': 'Estonian',
'FI': 'Finnish',
'FR': 'French',
'HU': 'Hungarian',
'ID': 'Indonesian',
'IT': 'Italian',
'JA': 'Japanese',
'KO': 'Korean',
'LT': 'Lithuanian',
'LV': 'Latvian',
'NB': 'Norwegian (Bokmål)',
'NL': 'Dutch',
'PL': 'Polish',
'PT-BR': 'Portuguese (Brazilian)',
'PT-PT': 'Portuguese (European)',
'RO': 'Romanian',
'RU': 'Russian',
'SK': 'Slovak',
'SL': 'Slovenian',
'SV': 'Swedish',
'TR': 'Turkish',
'UK': 'Ukrainian',
'ZH': 'Chinese (simplified)',
'ZH-HANS': 'Chinese (simplified)'
}
# Color mapping for ASS format (BGR order)
COLOR_MAP = {
'white': 'FFFFFF',
'yellow': '00FFFF',
'black': '000000',
'red': '0000FF',
'blue': 'FF0000',
'green': '00FF00',
'orange': '0080FF',
'purple': '800080',
'cyan': 'FFFF00',
'magenta': 'FF00FF',
'gray': '808080',
'silver': 'C0C0C0',
'gold': '00D7FF',
'lime': '00FF00',
'navy': '800000',
'teal': '808000',
'maroon': '000080',
'olive': '008080'
}
# Whisper model options
WHISPER_MODELS = {
'tiny': {'name': 'Tiny', 'vram': '~1GB', 'speed': 'fastest'},
'base': {'name': 'Base', 'vram': '~1GB', 'speed': 'fast'},
'small': {'name': 'Small', 'vram': '~2GB', 'speed': 'moderate'},
'medium': {'name': 'Medium', 'vram': '~5GB', 'speed': 'slow'},
'large': {'name': 'Large', 'vram': '~10GB', 'speed': 'slowest'},
'large-v2': {'name': 'Large V2', 'vram': '~10GB', 'speed': 'slowest'},
'large-v3': {'name': 'Large V3', 'vram': '~10GB', 'speed': 'slowest'}
}
# Font presets
FONT_PRESETS = {
'default': {'font': 'Arial', 'size': 24, 'outline': 2},
'cinematic': {'font': 'Helvetica', 'size': 28, 'outline': 3},
'documentary': {'font': 'Georgia', 'size': 22, 'outline': 1},
'news': {'font': 'Arial', 'size': 26, 'outline': 2},
'social_media': {'font': 'Arial Black', 'size': 32, 'outline': 4},
'minimal': {'font': 'Helvetica', 'size': 20, 'outline': 1},
'bold': {'font': 'Impact', 'size': 30, 'outline': 3}
}
def get_available_fonts():
"""Get list of available fonts on the system"""
try:
# Check if fc-list exists
subprocess.check_call(['which', 'fc-list'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
output = subprocess.check_output(['fc-list', ':', 'family'], stderr=subprocess.DEVNULL).decode('utf-8')
fonts = set()
for line in output.splitlines():
for font in line.split(','):
font = font.strip()
if font:
fonts.add(font)
return sorted(list(fonts))
except (subprocess.SubprocessError, FileNotFoundError):
return [
'Arial', 'Helvetica', 'Times New Roman', 'Courier New', 'Verdana',
'Georgia', 'Palatino', 'Garamond', 'Comic Sans MS', 'Trebuchet MS',
'Arial Black', 'Impact', 'Tahoma', 'Roboto', 'Open Sans', 'DejaVu Sans'
]
def _check_font_availability(font_name: str) -> str:
"""Check if font exists, return validated font or fallback"""
available = get_available_fonts()
# Direct match
if font_name in available:
return font_name
# Case insensitive match
font_lower = font_name.lower()
for f in available:
if f.lower() == font_lower:
return f
# Fallback to defaults if specific font not found
defaults = ['Arial', 'Helvetica', 'DejaVu Sans', 'FreeSans']
for default in defaults:
if default in available:
return default
# Last resort (ffmpeg typically has a default sans serif)
return 'Arial'
def get_subtitle_config():
"""Return available configuration options for subtitles"""
return {
"whisper_models": WHISPER_MODELS,
"supported_languages": SUPPORTED_LANGUAGES,
"colors": list(COLOR_MAP.keys()),
"fonts": get_available_fonts(),
"font_presets": FONT_PRESETS,
"positions": ["bottom", "top", "center"],
"alignments": ["left", "center", "right"]
}
async def process(job_id: str):
"""
Process video for subtitles - transcribe, translate, optionally burn
Input parameters:
- source_language: Source language code or "auto" for detection
- target_language: Target language code for translation (optional)
- burn_subtitles: Whether to burn subtitles into video
- whisper_model: Whisper model size (tiny/base/small/medium/large)
- font: Font family name
- font_size: Font size in points
- text_color: Primary text color
- outline_color: Text outline color
- outline_width: Outline thickness (0-5)
- background_color: Background box color (optional)
- background_opacity: Background opacity 0-1 (default 0)
- position: Vertical position (bottom/top/center)
- alignment: Horizontal alignment (left/center/right)
- margin_v: Vertical margin from edge
- margin_h: Horizontal margin
- shadow: Shadow depth (0-4)
- bold: Use bold text
- italic: Use italic text
- font_preset: Use a predefined style preset
- word_timestamps: Include word-level timestamps in output
- output_format: SRT, VTT, or ASS format
"""
db = SessionLocal()
try:
job = db.query(Job).filter(Job.id == job_id).first()
if not job:
return
input_data = job.input_data
input_asset_ids = job.input_asset_ids
if not input_asset_ids:
raise ValueError("No input asset provided")
input_asset = db.query(Asset).filter(Asset.id == input_asset_ids[0]).first()
if not input_asset:
raise ValueError("Input asset not found")
job.progress = 5
job.api_provider = "whisper"
db.commit()
# Get all parameters with defaults
source_language = input_data.get("source_language", "auto")
target_language = input_data.get("target_language")
burn_subtitles = input_data.get("burn_subtitles", False)
whisper_model = input_data.get("whisper_model", "base")
word_timestamps = input_data.get("word_timestamps", False)
output_format = input_data.get("output_format", "srt").lower()
# Styling parameters
font_preset = input_data.get("font_preset")
if font_preset and font_preset in FONT_PRESETS:
preset = FONT_PRESETS[font_preset]
font_req = input_data.get("font", preset['font'])
font_size = input_data.get("font_size", preset['size'])
outline_width = input_data.get("outline_width", preset['outline'])
else:
font_req = input_data.get("font", "Arial")
font_size = input_data.get("font_size", 24)
outline_width = input_data.get("outline_width", 2)
# Validate font availability
font = _check_font_availability(font_req)
if font != font_req:
logger.warning(f"Font '{font_req}' not found, falling back to '{font}'")
text_color = input_data.get("text_color", "white")
outline_color = input_data.get("outline_color", "black")
background_color = input_data.get("background_color")
background_opacity = input_data.get("background_opacity", 0)
position = input_data.get("position", "bottom")
alignment = input_data.get("alignment", "center")
margin_v = input_data.get("margin_v", 30)
margin_h = input_data.get("margin_h", 20)
shadow = input_data.get("shadow", 0)
bold = input_data.get("bold", False)
italic = input_data.get("italic", False)
# Extract audio from video
audio_path = os.path.join(settings.storage_path, "temp", f"{uuid4()}.wav")
os.makedirs(os.path.dirname(audio_path), exist_ok=True)
subprocess.run([
"ffmpeg", "-i", input_asset.file_path,
"-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1",
"-y", audio_path
], check=True, capture_output=True)
job.progress = 20
db.commit()
# Transcribe with Whisper
import whisper
logger.info(f"Loading Whisper model: {whisper_model}")
model = whisper.load_model(whisper_model)
transcribe_options = {
"language": None if source_language == "auto" else source_language,
"verbose": False,
"word_timestamps": word_timestamps
}
result = model.transcribe(audio_path, **transcribe_options)
job.progress = 50
job.api_model = f"whisper-{whisper_model}"
db.commit()
# Generate subtitle content
segments = result.get("segments", [])
detected_language = result.get("language", source_language)
if output_format == "vtt":
subtitle_content = _generate_vtt(segments, word_timestamps)
subtitle_ext = "vtt"
elif output_format == "ass":
subtitle_content = _generate_ass(segments, font, font_size, text_color, outline_color,
outline_width, position, alignment, margin_v, margin_h,
shadow, bold, italic, background_color, background_opacity)
subtitle_ext = "ass"
else:
subtitle_content = _generate_srt(segments)
subtitle_ext = "srt"
# Helper validation for font
font = _check_font_availability(font_req)
# Check for provided subtitle file
subtitle_asset_id = input_data.get("subtitle_asset_id")
segments = []
detected_language = source_language
subtitle_content = ""
transcription_result = {} # To store result for output_data
if subtitle_asset_id:
logger.info("Using provided subtitle asset", asset_id=subtitle_asset_id)
sub_asset = db.query(Asset).filter(Asset.id == subtitle_asset_id).first()
if not sub_asset or not os.path.exists(sub_asset.file_path):
raise ValueError("Provided subtitle asset not found")
with open(sub_asset.file_path, "r", encoding="utf-8") as f:
subtitle_content = f.read()
# Simple assumption: Input is SRT if we are parsing it
# In future we might check extension
segments = _parse_srt(subtitle_content)
job.progress = 50 # Skip transcription
transcription_result = {"text": "Imported from SRT", "language": source_language}
else:
# Transcribe with Whisper
job.api_provider = f"whisper-{whisper_model}"
import whisper
model = whisper.load_model(whisper_model)
# Extract audio to temp file
temp_audio = os.path.join(settings.storage_path, "temp", f"temp_{uuid4()}.wav")
os.makedirs(os.path.dirname(temp_audio), exist_ok=True)
try:
subprocess.run([
"ffmpeg", "-i", input_asset.file_path,
"-ar", "16000",
"-ac", "1",
"-c:a", "pcm_s16le",
"-y", temp_audio
], check=True, capture_output=True)
job.progress = 20
db.commit()
# Transcribe
transcribe_options = {
"language": None if source_language == "auto" else source_language,
"verbose": False,
"word_timestamps": word_timestamps
}
result = model.transcribe(temp_audio, **transcribe_options)
segments = result["segments"]
detected_language = result["language"]
transcription_result = result
# Generate initial subtitle content
if output_format == "vtt":
subtitle_content = _generate_vtt(segments, word_timestamps)
elif output_format == "ass":
subtitle_content = _generate_ass(segments, font, font_size, text_color,
outline_color, outline_width, position, alignment,
margin_v, margin_h, shadow, bold, italic,
background_color, background_opacity)
else:
subtitle_content = _generate_srt(segments)
finally:
if os.path.exists(temp_audio):
os.remove(temp_audio)
job.progress = 60
db.commit()
# Update subtitle extension based on format
subtitle_ext = "srt"
if output_format == "vtt": subtitle_ext = "vtt"
elif output_format == "ass": subtitle_ext = "ass"
# Translate if needed
translated_content = None
if target_language:
job.api_provider = "whisper+deepl"
import deepl
translator = deepl.Translator(settings.deepl_api_key)
# Translate only the text content
text_for_translation = "\n".join([seg.get("text", "").strip() for seg in segments])
translated_text = translator.translate_text(
text_for_translation,
target_lang=target_language
).text
# Rebuild the subtitles with translated text
translated_lines = translated_text.split("\n")
translated_segments = []
for i, seg in enumerate(segments):
new_seg = seg.copy()
if i < len(translated_lines):
new_seg["text"] = translated_lines[i]
translated_segments.append(new_seg)
if output_format == "vtt":
translated_content = _generate_vtt(translated_segments, word_timestamps)
elif output_format == "ass":
translated_content = _generate_ass(translated_segments, font, font_size, text_color,
outline_color, outline_width, position, alignment,
margin_v, margin_h, shadow, bold, italic,
background_color, background_opacity)
else:
translated_content = _generate_srt(translated_segments)
job.progress = 70
db.commit()
output_assets = []
# Save original subtitle file
# Use simple naming convention
base_name = os.path.splitext(input_asset.original_filename)[0]
# Clean basename of special chars if needed, but for now just use it
subtitle_filename = f"{base_name}-subtitles-{detected_language}.{subtitle_ext}"
subtitle_path = os.path.join(settings.storage_path, "documents", subtitle_filename)
# Handle duplicate filenames by appending uuid if needed, or just overwrite since uuid approach was replaced
# Actually user wants to "see whats been done", so readable names are key.
# If we overwrite, that's fine as per previous discussion, but unique names prevent collision in shared storage?
# Let's stick to unique names but readable: base-subtitles-lang-uuid_short.ext
short_id = str(uuid4())[:8]
subtitle_filename = f"{base_name}-subtitles-{detected_language}-{short_id}.{subtitle_ext}"
subtitle_path = os.path.join(settings.storage_path, "documents", subtitle_filename)
os.makedirs(os.path.dirname(subtitle_path), exist_ok=True)
with open(subtitle_path, "w", encoding="utf-8") as f:
f.write(subtitle_content)
subtitle_asset = Asset(
user_id=job.user_id,
project_id=job.project_id,
original_filename=subtitle_filename,
stored_filename=subtitle_filename,
file_path=subtitle_path,
file_type="document",
mime_type="text/plain",
file_size_bytes=len(subtitle_content.encode()),
source_module="subtitle_processor",
source_job_id=job.id,
parent_asset_id=input_asset.id,
asset_metadata={
"language": detected_language,
"type": "original",
"format": output_format,
"whisper_model": whisper_model
}
)
db.add(subtitle_asset)
db.commit()
db.refresh(subtitle_asset)
output_assets.append(subtitle_asset.id)
# Save translated subtitle if exists
trans_path = None
if translated_content:
short_id = str(uuid4())[:8]
trans_filename = f"{base_name}-subtitles-{target_language}-{short_id}.{subtitle_ext}"
trans_path = os.path.join(settings.storage_path, "documents", trans_filename)
with open(trans_path, "w", encoding="utf-8") as f:
f.write(translated_content)
trans_asset = Asset(
user_id=job.user_id,
project_id=job.project_id,
original_filename=trans_filename,
stored_filename=trans_filename,
file_path=trans_path,
file_type="document",
mime_type="text/plain",
file_size_bytes=len(translated_content.encode()),
source_module="subtitle_processor",
source_job_id=job.id,
parent_asset_id=input_asset.id,
asset_metadata={
"language": target_language,
"type": "translated",
"format": output_format
}
)
db.add(trans_asset)
db.commit()
db.refresh(trans_asset)
output_assets.append(trans_asset.id)
job.progress = 80
db.commit()
# Burn subtitles if requested
if burn_subtitles:
burn_path = trans_path if translated_content else subtitle_path
# Burned video
lang_code = target_language if translated_content else detected_language
short_id = str(uuid4())[:8]
output_filename = f"{base_name}-subtitled-{lang_code}-{short_id}.mp4"
output_path = os.path.join(settings.storage_path, "videos", output_filename)
os.makedirs(os.path.dirname(output_path), exist_ok=True)
# Build the FFmpeg subtitle filter
subtitle_filter = _build_subtitle_filter(
burn_path, font, font_size, text_color, outline_color,
outline_width, position, alignment, margin_v, margin_h,
shadow, bold, italic, background_color, background_opacity
)
try:
result = subprocess.run([
"ffmpeg", "-i", input_asset.file_path,
"-vf", subtitle_filter,
"-c:a", "copy",
"-y", output_path
], check=True, capture_output=True)
except subprocess.CalledProcessError as e:
logger.error("FFmpeg burning failed", stderr=e.stderr.decode('utf-8'))
raise RuntimeError(f"FFmpeg error: {e.stderr.decode('utf-8')[-500:]}")
video_size = os.path.getsize(output_path)
video_asset = Asset(
user_id=job.user_id,
project_id=job.project_id,
original_filename=output_filename,
stored_filename=output_filename,
file_path=output_path,
file_type="video",
mime_type="video/mp4",
file_size_bytes=video_size,
width=input_asset.width,
height=input_asset.height,
duration_seconds=input_asset.duration_seconds,
source_module="subtitle_processor",
source_job_id=job.id,
parent_asset_id=input_asset.id,
asset_metadata={
"burned_subtitles": True,
"subtitle_language": target_language or detected_language,
"styling": {
"font": font,
"font_size": font_size,
"text_color": text_color,
"position": position
}
}
)
db.add(video_asset)
db.commit()
db.refresh(video_asset)
output_assets.append(video_asset.id)
# Cleanup temp audio
if 'audio_path' in locals() and audio_path and os.path.exists(audio_path):
os.remove(audio_path)
job.output_asset_ids = output_assets
job.output_data = {
"transcript": transcription_result.get("text", ""),
"language": detected_language,
"segments_count": len(segments),
"word_timestamps": word_timestamps,
"output_format": output_format,
"translated": bool(translated_content),
"burned": burn_subtitles,
"asset_ids": [str(a) for a in output_assets]
}
job.progress = 100
job.status = "completed"
job.completed_at = datetime.utcnow()
db.commit()
except Exception as e:
logger.error(f"Subtitle processing error: {e}", exc_info=True)
job.status = "failed"
job.error_message = str(e)
db.commit()
finally:
db.close()
def _generate_srt(segments: list) -> str:
"""Generate SRT format from segments"""
srt_lines = []
for i, segment in enumerate(segments, 1):
start = _format_srt_timestamp(segment['start'])
end = _format_srt_timestamp(segment['end'])
text = segment['text'].strip()
srt_lines.append(f"{i}\n{start} --> {end}\n{text}\n")
return "\n".join(srt_lines)
def _generate_vtt(segments: list, word_timestamps: bool = False) -> str:
"""Generate WebVTT format from segments"""
vtt_lines = ["WEBVTT", ""]
for i, segment in enumerate(segments, 1):
start = _format_vtt_timestamp(segment['start'])
end = _format_vtt_timestamp(segment['end'])
text = segment['text'].strip()
# Add word-level timestamps if available
if word_timestamps and 'words' in segment:
words_with_timing = []
for word in segment['words']:
word_start = _format_vtt_timestamp(word['start'])
words_with_timing.append(f"<{word_start}>{word['word']}")
text = "".join(words_with_timing)
vtt_lines.append(f"{i}")
vtt_lines.append(f"{start} --> {end}")
vtt_lines.append(text)
vtt_lines.append("")
return "\n".join(vtt_lines)
def _generate_ass(segments: list, font: str, font_size: int, text_color: str,
outline_color: str, outline_width: float, position: str,
alignment: str, margin_v: int, margin_h: int, shadow: int,
bold: bool, italic: bool, background_color: Optional[str],
background_opacity: float) -> str:
"""Generate ASS (Advanced SubStation Alpha) format with full styling"""
# Convert colors to ASS format (&HBBGGRR)
primary_hex = COLOR_MAP.get(text_color.lower(), 'FFFFFF')
outline_hex = COLOR_MAP.get(outline_color.lower(), '000000')
# Calculate alignment value (SSA uses different numbering)
# 1=left-bottom, 2=center-bottom, 3=right-bottom
# 4=left-middle, 5=center-middle, 6=right-middle
# 7=left-top, 8=center-top, 9=right-top
align_map = {
('left', 'bottom'): 1, ('center', 'bottom'): 2, ('right', 'bottom'): 3,
('left', 'center'): 4, ('center', 'center'): 5, ('right', 'center'): 6,
('left', 'top'): 7, ('center', 'top'): 8, ('right', 'top'): 9
}
ass_alignment = align_map.get((alignment, position), 2)
# Background color with opacity
back_alpha = hex(int((1 - background_opacity) * 255))[2:].upper().zfill(2)
if background_color:
back_hex = COLOR_MAP.get(background_color.lower(), '000000')
back_color = f"&H{back_alpha}{back_hex}"
else:
back_color = f"&H{back_alpha}000000"
# Font weight and style
bold_val = -1 if bold else 0
italic_val = -1 if italic else 0
ass_content = f"""[Script Info]
Title: Generated Subtitles
ScriptType: v4.00+
PlayResX: 1920
PlayResY: 1080
ScaledBorderAndShadow: yes
[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
Style: Default,{font},{font_size},&H00{primary_hex},&H00{primary_hex},&H00{outline_hex},{back_color},{bold_val},{italic_val},0,0,100,100,0,0,1,{outline_width},{shadow},{ass_alignment},{margin_h},{margin_h},{margin_v},1
[Events]
Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text
"""
def _parse_srt(content: str) -> list:
"""Parse SRT content into segments"""
segments = []
blocks = content.strip().split('\n\n')
for block in blocks:
lines = block.strip().split('\n')
if len(lines) >= 3:
# Parse timestamp line
times = lines[1].split(' --> ')
if len(times) != 2:
continue
start_str, end_str = times
def parse_time(t_str):
t_str = t_str.replace(',', '.')
parts = t_str.split(':')
return float(parts[0]) * 3600 + float(parts[1]) * 60 + float(parts[2])
start = parse_time(start_str)
end = parse_time(end_str)
text = "\n".join(lines[2:])
segments.append({
'start': start,
'end': end,
'text': text
})
return segments
for segment in segments:
start = _format_ass_timestamp(segment['start'])
end = _format_ass_timestamp(segment['end'])
text = segment['text'].strip().replace('\n', '\\N')
ass_content += f"Dialogue: 0,{start},{end},Default,,0,0,0,,{text}\n"
return ass_content
def _parse_srt(content: str) -> list:
"""Parse SRT content into segments"""
segments = []
blocks = content.strip().split('\n\n')
for block in blocks:
lines = block.strip().split('\n')
if len(lines) >= 3:
# Parse timestamp line
times = lines[1].split(' --> ')
if len(times) != 2:
continue
start_str, end_str = times
def parse_time(t_str):
t_str = t_str.replace(',', '.')
parts = t_str.split(':')
return float(parts[0]) * 3600 + float(parts[1]) * 60 + float(parts[2])
start = parse_time(start_str)
end = parse_time(end_str)
text = "\n".join(lines[2:])
segments.append({
'start': start,
'end': end,
'text': text
})
return segments
def _format_srt_timestamp(seconds: float) -> str:
"""Convert seconds to SRT timestamp format (HH:MM:SS,mmm)"""
td = timedelta(seconds=seconds)
hours = td.seconds // 3600
minutes = (td.seconds % 3600) // 60
secs = td.seconds % 60
millis = td.microseconds // 1000
return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"
def _format_vtt_timestamp(seconds: float) -> str:
"""Convert seconds to WebVTT timestamp format (HH:MM:SS.mmm)"""
td = timedelta(seconds=seconds)
hours = td.seconds // 3600
minutes = (td.seconds % 3600) // 60
secs = td.seconds % 60
millis = td.microseconds // 1000
return f"{hours:02d}:{minutes:02d}:{secs:02d}.{millis:03d}"
def _format_ass_timestamp(seconds: float) -> str:
"""Convert seconds to ASS timestamp format (H:MM:SS.cc)"""
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
secs = int(seconds % 60)
centisecs = int((seconds - int(seconds)) * 100)
return f"{hours}:{minutes:02d}:{secs:02d}.{centisecs:02d}"
def _build_subtitle_filter(subtitle_path: str, font: str, font_size: int,
text_color: str, outline_color: str, outline_width: float,
position: str, alignment: str, margin_v: int, margin_h: int,
shadow: int, bold: bool, italic: bool,
background_color: Optional[str], background_opacity: float) -> str:
"""Build FFmpeg subtitle filter with styling"""
# Determine if we're using ASS file (has its own styling)
if subtitle_path.endswith('.ass'):
return f"ass={subtitle_path}"
# Get hex colors
primary_hex = COLOR_MAP.get(text_color.lower(), 'FFFFFF')
outline_hex = COLOR_MAP.get(outline_color.lower(), '000000')
# Calculate alignment for subtitles filter
# SSA/ASS alignment: 1-3 bottom, 4-6 middle, 7-9 top
align_map = {
('left', 'bottom'): 1, ('center', 'bottom'): 2, ('right', 'bottom'): 3,
('left', 'center'): 4, ('center', 'center'): 5, ('right', 'center'): 6,
('left', 'top'): 7, ('center', 'top'): 8, ('right', 'top'): 9
}
ass_alignment = align_map.get((alignment, position), 2)
# Build force_style string
style_parts = [
f"Fontname={font}",
f"Fontsize={font_size}",
f"PrimaryColour=&H00{primary_hex}",
f"OutlineColour=&H00{outline_hex}",
f"BorderStyle=1",
f"Outline={outline_width:.1f}",
f"Shadow={shadow}",
f"Alignment={ass_alignment}",
f"MarginL={margin_h}",
f"MarginR={margin_h}",
f"MarginV={margin_v}"
]
if bold:
style_parts.append("Bold=1")
if italic:
style_parts.append("Italic=1")
# Add background if specified
if background_color and background_opacity > 0:
back_alpha = hex(int((1 - background_opacity) * 255))[2:].upper().zfill(2)
back_hex = COLOR_MAP.get(background_color.lower(), '000000')
style_parts.append(f"BackColour=&H{back_alpha}{back_hex}")
style_parts.append("BorderStyle=4") # Opaque box style
force_style = ",".join(style_parts)
# Escape the subtitle path for FFmpeg
escaped_path = subtitle_path.replace("'", "'\\''").replace(":", "\\:")
return f"subtitles='{escaped_path}':force_style='{force_style}'"