""" Subtitle Processor Service - Whisper + DeepL + FFmpeg Full-featured subtitle processing with: - Whisper transcription (multiple model sizes) - DeepL translation (30+ languages) - FFmpeg burning with full styling control Styling Options: - font: Font family (Arial, Helvetica, Times New Roman, etc.) - font_size: Font size in points (default: 24) - text_color: Primary text color (white, yellow, black, red, blue, green, orange, purple) - outline_color: Outline/border color - outline_width: Outline thickness (0-5, default: 2) - background_color: Optional background box color - background_opacity: Background box opacity (0-1) - position: vertical position (bottom, top, center) - alignment: horizontal alignment (left, center, right) - margin_v: Vertical margin from edge (default: 30) - margin_h: Horizontal margin (default: 20) - shadow: Shadow depth (0-4) - bold: Bold text (true/false) - italic: Italic text (true/false) Whisper Models: - tiny: Fastest, lowest accuracy (~1GB VRAM) - base: Fast, good accuracy (~1GB VRAM) - default - small: Balanced (~2GB VRAM) - medium: High accuracy (~5GB VRAM) - large: Best accuracy (~10GB VRAM) - large-v2: Latest large model - large-v3: Newest model with best accuracy """ import os import subprocess from uuid import uuid4 from datetime import datetime, timedelta from typing import Optional from app.database import SessionLocal from app.models.job import Job from app.models.asset import Asset from app.config import settings import structlog logger = structlog.get_logger() # Supported languages for DeepL translation SUPPORTED_LANGUAGES = { 'BG': 'Bulgarian', 'CS': 'Czech', 'DA': 'Danish', 'DE': 'German', 'EL': 'Greek', 'EN-GB': 'English (British)', 'EN-US': 'English (American)', 'ES': 'Spanish', 'ET': 'Estonian', 'FI': 'Finnish', 'FR': 'French', 'HU': 'Hungarian', 'ID': 'Indonesian', 'IT': 'Italian', 'JA': 'Japanese', 'KO': 'Korean', 'LT': 'Lithuanian', 'LV': 'Latvian', 'NB': 'Norwegian (Bokmål)', 'NL': 'Dutch', 'PL': 'Polish', 'PT-BR': 'Portuguese (Brazilian)', 'PT-PT': 'Portuguese (European)', 'RO': 'Romanian', 'RU': 'Russian', 'SK': 'Slovak', 'SL': 'Slovenian', 'SV': 'Swedish', 'TR': 'Turkish', 'UK': 'Ukrainian', 'ZH': 'Chinese (simplified)', 'ZH-HANS': 'Chinese (simplified)' } # Color mapping for ASS format (BGR order) COLOR_MAP = { 'white': 'FFFFFF', 'yellow': '00FFFF', 'black': '000000', 'red': '0000FF', 'blue': 'FF0000', 'green': '00FF00', 'orange': '0080FF', 'purple': '800080', 'cyan': 'FFFF00', 'magenta': 'FF00FF', 'gray': '808080', 'silver': 'C0C0C0', 'gold': '00D7FF', 'lime': '00FF00', 'navy': '800000', 'teal': '808000', 'maroon': '000080', 'olive': '008080' } # Whisper model options WHISPER_MODELS = { 'tiny': {'name': 'Tiny', 'vram': '~1GB', 'speed': 'fastest'}, 'base': {'name': 'Base', 'vram': '~1GB', 'speed': 'fast'}, 'small': {'name': 'Small', 'vram': '~2GB', 'speed': 'moderate'}, 'medium': {'name': 'Medium', 'vram': '~5GB', 'speed': 'slow'}, 'large': {'name': 'Large', 'vram': '~10GB', 'speed': 'slowest'}, 'large-v2': {'name': 'Large V2', 'vram': '~10GB', 'speed': 'slowest'}, 'large-v3': {'name': 'Large V3', 'vram': '~10GB', 'speed': 'slowest'} } # Font presets FONT_PRESETS = { 'default': {'font': 'Arial', 'size': 24, 'outline': 2}, 'cinematic': {'font': 'Helvetica', 'size': 28, 'outline': 3}, 'documentary': {'font': 'Georgia', 'size': 22, 'outline': 1}, 'news': {'font': 'Arial', 'size': 26, 'outline': 2}, 'social_media': {'font': 'Arial Black', 'size': 32, 'outline': 4}, 'minimal': {'font': 'Helvetica', 'size': 20, 'outline': 1}, 'bold': {'font': 'Impact', 'size': 30, 'outline': 3} } def get_available_fonts(): """Get list of available fonts on the system""" try: # Check if fc-list exists subprocess.check_call(['which', 'fc-list'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) output = subprocess.check_output(['fc-list', ':', 'family'], stderr=subprocess.DEVNULL).decode('utf-8') fonts = set() for line in output.splitlines(): for font in line.split(','): font = font.strip() if font: fonts.add(font) return sorted(list(fonts)) except (subprocess.SubprocessError, FileNotFoundError): return [ 'Arial', 'Helvetica', 'Times New Roman', 'Courier New', 'Verdana', 'Georgia', 'Palatino', 'Garamond', 'Comic Sans MS', 'Trebuchet MS', 'Arial Black', 'Impact', 'Tahoma', 'Roboto', 'Open Sans', 'DejaVu Sans' ] def _check_font_availability(font_name: str) -> str: """Check if font exists, return validated font or fallback""" available = get_available_fonts() # Direct match if font_name in available: return font_name # Case insensitive match font_lower = font_name.lower() for f in available: if f.lower() == font_lower: return f # Fallback to defaults if specific font not found defaults = ['Arial', 'Helvetica', 'DejaVu Sans', 'FreeSans'] for default in defaults: if default in available: return default # Last resort (ffmpeg typically has a default sans serif) return 'Arial' def get_subtitle_config(): """Return available configuration options for subtitles""" return { "whisper_models": WHISPER_MODELS, "supported_languages": SUPPORTED_LANGUAGES, "colors": list(COLOR_MAP.keys()), "fonts": get_available_fonts(), "font_presets": FONT_PRESETS, "positions": ["bottom", "top", "center"], "alignments": ["left", "center", "right"] } async def process(job_id: str): """ Process video for subtitles - transcribe, translate, optionally burn Input parameters: - source_language: Source language code or "auto" for detection - target_language: Target language code for translation (optional) - burn_subtitles: Whether to burn subtitles into video - whisper_model: Whisper model size (tiny/base/small/medium/large) - font: Font family name - font_size: Font size in points - text_color: Primary text color - outline_color: Text outline color - outline_width: Outline thickness (0-5) - background_color: Background box color (optional) - background_opacity: Background opacity 0-1 (default 0) - position: Vertical position (bottom/top/center) - alignment: Horizontal alignment (left/center/right) - margin_v: Vertical margin from edge - margin_h: Horizontal margin - shadow: Shadow depth (0-4) - bold: Use bold text - italic: Use italic text - font_preset: Use a predefined style preset - word_timestamps: Include word-level timestamps in output - output_format: SRT, VTT, or ASS format """ db = SessionLocal() try: job = db.query(Job).filter(Job.id == job_id).first() if not job: return input_data = job.input_data input_asset_ids = job.input_asset_ids if not input_asset_ids: raise ValueError("No input asset provided") input_asset = db.query(Asset).filter(Asset.id == input_asset_ids[0]).first() if not input_asset: raise ValueError("Input asset not found") job.progress = 5 job.api_provider = "whisper" db.commit() # Get all parameters with defaults source_language = input_data.get("source_language", "auto") target_language = input_data.get("target_language") burn_subtitles = input_data.get("burn_subtitles", False) whisper_model = input_data.get("whisper_model", "base") word_timestamps = input_data.get("word_timestamps", False) output_format = input_data.get("output_format", "srt").lower() # Styling parameters font_preset = input_data.get("font_preset") if font_preset and font_preset in FONT_PRESETS: preset = FONT_PRESETS[font_preset] font_req = input_data.get("font", preset['font']) font_size = input_data.get("font_size", preset['size']) outline_width = input_data.get("outline_width", preset['outline']) else: font_req = input_data.get("font", "Arial") font_size = input_data.get("font_size", 24) outline_width = input_data.get("outline_width", 2) # Validate font availability font = _check_font_availability(font_req) if font != font_req: logger.warning(f"Font '{font_req}' not found, falling back to '{font}'") text_color = input_data.get("text_color", "white") outline_color = input_data.get("outline_color", "black") background_color = input_data.get("background_color") background_opacity = input_data.get("background_opacity", 0) position = input_data.get("position", "bottom") alignment = input_data.get("alignment", "center") margin_v = input_data.get("margin_v", 30) margin_h = input_data.get("margin_h", 20) shadow = input_data.get("shadow", 0) bold = input_data.get("bold", False) italic = input_data.get("italic", False) # Extract audio from video audio_path = os.path.join(settings.storage_path, "temp", f"{uuid4()}.wav") os.makedirs(os.path.dirname(audio_path), exist_ok=True) subprocess.run([ "ffmpeg", "-i", input_asset.file_path, "-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1", "-y", audio_path ], check=True, capture_output=True) job.progress = 20 db.commit() # Transcribe with Whisper import whisper logger.info(f"Loading Whisper model: {whisper_model}") model = whisper.load_model(whisper_model) transcribe_options = { "language": None if source_language == "auto" else source_language, "verbose": False, "word_timestamps": word_timestamps } result = model.transcribe(audio_path, **transcribe_options) job.progress = 50 job.api_model = f"whisper-{whisper_model}" db.commit() # Generate subtitle content segments = result.get("segments", []) detected_language = result.get("language", source_language) if output_format == "vtt": subtitle_content = _generate_vtt(segments, word_timestamps) subtitle_ext = "vtt" elif output_format == "ass": subtitle_content = _generate_ass(segments, font, font_size, text_color, outline_color, outline_width, position, alignment, margin_v, margin_h, shadow, bold, italic, background_color, background_opacity) subtitle_ext = "ass" else: subtitle_content = _generate_srt(segments) subtitle_ext = "srt" # Helper validation for font font = _check_font_availability(font_req) # Check for provided subtitle file subtitle_asset_id = input_data.get("subtitle_asset_id") segments = [] detected_language = source_language subtitle_content = "" transcription_result = {} # To store result for output_data if subtitle_asset_id: logger.info("Using provided subtitle asset", asset_id=subtitle_asset_id) sub_asset = db.query(Asset).filter(Asset.id == subtitle_asset_id).first() if not sub_asset or not os.path.exists(sub_asset.file_path): raise ValueError("Provided subtitle asset not found") with open(sub_asset.file_path, "r", encoding="utf-8") as f: subtitle_content = f.read() # Simple assumption: Input is SRT if we are parsing it # In future we might check extension segments = _parse_srt(subtitle_content) job.progress = 50 # Skip transcription transcription_result = {"text": "Imported from SRT", "language": source_language} else: # Transcribe with Whisper job.api_provider = f"whisper-{whisper_model}" import whisper model = whisper.load_model(whisper_model) # Extract audio to temp file temp_audio = os.path.join(settings.storage_path, "temp", f"temp_{uuid4()}.wav") os.makedirs(os.path.dirname(temp_audio), exist_ok=True) try: subprocess.run([ "ffmpeg", "-i", input_asset.file_path, "-ar", "16000", "-ac", "1", "-c:a", "pcm_s16le", "-y", temp_audio ], check=True, capture_output=True) job.progress = 20 db.commit() # Transcribe transcribe_options = { "language": None if source_language == "auto" else source_language, "verbose": False, "word_timestamps": word_timestamps } result = model.transcribe(temp_audio, **transcribe_options) segments = result["segments"] detected_language = result["language"] transcription_result = result # Generate initial subtitle content if output_format == "vtt": subtitle_content = _generate_vtt(segments, word_timestamps) elif output_format == "ass": subtitle_content = _generate_ass(segments, font, font_size, text_color, outline_color, outline_width, position, alignment, margin_v, margin_h, shadow, bold, italic, background_color, background_opacity) else: subtitle_content = _generate_srt(segments) finally: if os.path.exists(temp_audio): os.remove(temp_audio) job.progress = 60 db.commit() # Update subtitle extension based on format subtitle_ext = "srt" if output_format == "vtt": subtitle_ext = "vtt" elif output_format == "ass": subtitle_ext = "ass" # Translate if needed translated_content = None if target_language: job.api_provider = "whisper+deepl" import deepl translator = deepl.Translator(settings.deepl_api_key) # Translate only the text content text_for_translation = "\n".join([seg.get("text", "").strip() for seg in segments]) translated_text = translator.translate_text( text_for_translation, target_lang=target_language ).text # Rebuild the subtitles with translated text translated_lines = translated_text.split("\n") translated_segments = [] for i, seg in enumerate(segments): new_seg = seg.copy() if i < len(translated_lines): new_seg["text"] = translated_lines[i] translated_segments.append(new_seg) if output_format == "vtt": translated_content = _generate_vtt(translated_segments, word_timestamps) elif output_format == "ass": translated_content = _generate_ass(translated_segments, font, font_size, text_color, outline_color, outline_width, position, alignment, margin_v, margin_h, shadow, bold, italic, background_color, background_opacity) else: translated_content = _generate_srt(translated_segments) job.progress = 70 db.commit() output_assets = [] # Save original subtitle file # Use simple naming convention base_name = os.path.splitext(input_asset.original_filename)[0] # Clean basename of special chars if needed, but for now just use it subtitle_filename = f"{base_name}-subtitles-{detected_language}.{subtitle_ext}" subtitle_path = os.path.join(settings.storage_path, "documents", subtitle_filename) # Handle duplicate filenames by appending uuid if needed, or just overwrite since uuid approach was replaced # Actually user wants to "see whats been done", so readable names are key. # If we overwrite, that's fine as per previous discussion, but unique names prevent collision in shared storage? # Let's stick to unique names but readable: base-subtitles-lang-uuid_short.ext short_id = str(uuid4())[:8] subtitle_filename = f"{base_name}-subtitles-{detected_language}-{short_id}.{subtitle_ext}" subtitle_path = os.path.join(settings.storage_path, "documents", subtitle_filename) os.makedirs(os.path.dirname(subtitle_path), exist_ok=True) with open(subtitle_path, "w", encoding="utf-8") as f: f.write(subtitle_content) subtitle_asset = Asset( user_id=job.user_id, project_id=job.project_id, original_filename=subtitle_filename, stored_filename=subtitle_filename, file_path=subtitle_path, file_type="document", mime_type="text/plain", file_size_bytes=len(subtitle_content.encode()), source_module="subtitle_processor", source_job_id=job.id, parent_asset_id=input_asset.id, asset_metadata={ "language": detected_language, "type": "original", "format": output_format, "whisper_model": whisper_model } ) db.add(subtitle_asset) db.commit() db.refresh(subtitle_asset) output_assets.append(subtitle_asset.id) # Save translated subtitle if exists trans_path = None if translated_content: short_id = str(uuid4())[:8] trans_filename = f"{base_name}-subtitles-{target_language}-{short_id}.{subtitle_ext}" trans_path = os.path.join(settings.storage_path, "documents", trans_filename) with open(trans_path, "w", encoding="utf-8") as f: f.write(translated_content) trans_asset = Asset( user_id=job.user_id, project_id=job.project_id, original_filename=trans_filename, stored_filename=trans_filename, file_path=trans_path, file_type="document", mime_type="text/plain", file_size_bytes=len(translated_content.encode()), source_module="subtitle_processor", source_job_id=job.id, parent_asset_id=input_asset.id, asset_metadata={ "language": target_language, "type": "translated", "format": output_format } ) db.add(trans_asset) db.commit() db.refresh(trans_asset) output_assets.append(trans_asset.id) job.progress = 80 db.commit() # Burn subtitles if requested if burn_subtitles: burn_path = trans_path if translated_content else subtitle_path # Burned video lang_code = target_language if translated_content else detected_language short_id = str(uuid4())[:8] output_filename = f"{base_name}-subtitled-{lang_code}-{short_id}.mp4" output_path = os.path.join(settings.storage_path, "videos", output_filename) os.makedirs(os.path.dirname(output_path), exist_ok=True) # Build the FFmpeg subtitle filter subtitle_filter = _build_subtitle_filter( burn_path, font, font_size, text_color, outline_color, outline_width, position, alignment, margin_v, margin_h, shadow, bold, italic, background_color, background_opacity ) try: result = subprocess.run([ "ffmpeg", "-i", input_asset.file_path, "-vf", subtitle_filter, "-c:a", "copy", "-y", output_path ], check=True, capture_output=True) except subprocess.CalledProcessError as e: logger.error("FFmpeg burning failed", stderr=e.stderr.decode('utf-8')) raise RuntimeError(f"FFmpeg error: {e.stderr.decode('utf-8')[-500:]}") video_size = os.path.getsize(output_path) video_asset = Asset( user_id=job.user_id, project_id=job.project_id, original_filename=output_filename, stored_filename=output_filename, file_path=output_path, file_type="video", mime_type="video/mp4", file_size_bytes=video_size, width=input_asset.width, height=input_asset.height, duration_seconds=input_asset.duration_seconds, source_module="subtitle_processor", source_job_id=job.id, parent_asset_id=input_asset.id, asset_metadata={ "burned_subtitles": True, "subtitle_language": target_language or detected_language, "styling": { "font": font, "font_size": font_size, "text_color": text_color, "position": position } } ) db.add(video_asset) db.commit() db.refresh(video_asset) output_assets.append(video_asset.id) # Cleanup temp audio if 'audio_path' in locals() and audio_path and os.path.exists(audio_path): os.remove(audio_path) job.output_asset_ids = output_assets job.output_data = { "transcript": transcription_result.get("text", ""), "language": detected_language, "segments_count": len(segments), "word_timestamps": word_timestamps, "output_format": output_format, "translated": bool(translated_content), "burned": burn_subtitles, "asset_ids": [str(a) for a in output_assets] } job.progress = 100 job.status = "completed" job.completed_at = datetime.utcnow() db.commit() except Exception as e: logger.error(f"Subtitle processing error: {e}", exc_info=True) job.status = "failed" job.error_message = str(e) db.commit() finally: db.close() def _generate_srt(segments: list) -> str: """Generate SRT format from segments""" srt_lines = [] for i, segment in enumerate(segments, 1): start = _format_srt_timestamp(segment['start']) end = _format_srt_timestamp(segment['end']) text = segment['text'].strip() srt_lines.append(f"{i}\n{start} --> {end}\n{text}\n") return "\n".join(srt_lines) def _generate_vtt(segments: list, word_timestamps: bool = False) -> str: """Generate WebVTT format from segments""" vtt_lines = ["WEBVTT", ""] for i, segment in enumerate(segments, 1): start = _format_vtt_timestamp(segment['start']) end = _format_vtt_timestamp(segment['end']) text = segment['text'].strip() # Add word-level timestamps if available if word_timestamps and 'words' in segment: words_with_timing = [] for word in segment['words']: word_start = _format_vtt_timestamp(word['start']) words_with_timing.append(f"<{word_start}>{word['word']}") text = "".join(words_with_timing) vtt_lines.append(f"{i}") vtt_lines.append(f"{start} --> {end}") vtt_lines.append(text) vtt_lines.append("") return "\n".join(vtt_lines) def _generate_ass(segments: list, font: str, font_size: int, text_color: str, outline_color: str, outline_width: float, position: str, alignment: str, margin_v: int, margin_h: int, shadow: int, bold: bool, italic: bool, background_color: Optional[str], background_opacity: float) -> str: """Generate ASS (Advanced SubStation Alpha) format with full styling""" # Convert colors to ASS format (&HBBGGRR) primary_hex = COLOR_MAP.get(text_color.lower(), 'FFFFFF') outline_hex = COLOR_MAP.get(outline_color.lower(), '000000') # Calculate alignment value (SSA uses different numbering) # 1=left-bottom, 2=center-bottom, 3=right-bottom # 4=left-middle, 5=center-middle, 6=right-middle # 7=left-top, 8=center-top, 9=right-top align_map = { ('left', 'bottom'): 1, ('center', 'bottom'): 2, ('right', 'bottom'): 3, ('left', 'center'): 4, ('center', 'center'): 5, ('right', 'center'): 6, ('left', 'top'): 7, ('center', 'top'): 8, ('right', 'top'): 9 } ass_alignment = align_map.get((alignment, position), 2) # Background color with opacity back_alpha = hex(int((1 - background_opacity) * 255))[2:].upper().zfill(2) if background_color: back_hex = COLOR_MAP.get(background_color.lower(), '000000') back_color = f"&H{back_alpha}{back_hex}" else: back_color = f"&H{back_alpha}000000" # Font weight and style bold_val = -1 if bold else 0 italic_val = -1 if italic else 0 ass_content = f"""[Script Info] Title: Generated Subtitles ScriptType: v4.00+ PlayResX: 1920 PlayResY: 1080 ScaledBorderAndShadow: yes [V4+ Styles] Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding Style: Default,{font},{font_size},&H00{primary_hex},&H00{primary_hex},&H00{outline_hex},{back_color},{bold_val},{italic_val},0,0,100,100,0,0,1,{outline_width},{shadow},{ass_alignment},{margin_h},{margin_h},{margin_v},1 [Events] Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text """ def _parse_srt(content: str) -> list: """Parse SRT content into segments""" segments = [] blocks = content.strip().split('\n\n') for block in blocks: lines = block.strip().split('\n') if len(lines) >= 3: # Parse timestamp line times = lines[1].split(' --> ') if len(times) != 2: continue start_str, end_str = times def parse_time(t_str): t_str = t_str.replace(',', '.') parts = t_str.split(':') return float(parts[0]) * 3600 + float(parts[1]) * 60 + float(parts[2]) start = parse_time(start_str) end = parse_time(end_str) text = "\n".join(lines[2:]) segments.append({ 'start': start, 'end': end, 'text': text }) return segments for segment in segments: start = _format_ass_timestamp(segment['start']) end = _format_ass_timestamp(segment['end']) text = segment['text'].strip().replace('\n', '\\N') ass_content += f"Dialogue: 0,{start},{end},Default,,0,0,0,,{text}\n" return ass_content def _parse_srt(content: str) -> list: """Parse SRT content into segments""" segments = [] blocks = content.strip().split('\n\n') for block in blocks: lines = block.strip().split('\n') if len(lines) >= 3: # Parse timestamp line times = lines[1].split(' --> ') if len(times) != 2: continue start_str, end_str = times def parse_time(t_str): t_str = t_str.replace(',', '.') parts = t_str.split(':') return float(parts[0]) * 3600 + float(parts[1]) * 60 + float(parts[2]) start = parse_time(start_str) end = parse_time(end_str) text = "\n".join(lines[2:]) segments.append({ 'start': start, 'end': end, 'text': text }) return segments def _format_srt_timestamp(seconds: float) -> str: """Convert seconds to SRT timestamp format (HH:MM:SS,mmm)""" td = timedelta(seconds=seconds) hours = td.seconds // 3600 minutes = (td.seconds % 3600) // 60 secs = td.seconds % 60 millis = td.microseconds // 1000 return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}" def _format_vtt_timestamp(seconds: float) -> str: """Convert seconds to WebVTT timestamp format (HH:MM:SS.mmm)""" td = timedelta(seconds=seconds) hours = td.seconds // 3600 minutes = (td.seconds % 3600) // 60 secs = td.seconds % 60 millis = td.microseconds // 1000 return f"{hours:02d}:{minutes:02d}:{secs:02d}.{millis:03d}" def _format_ass_timestamp(seconds: float) -> str: """Convert seconds to ASS timestamp format (H:MM:SS.cc)""" hours = int(seconds // 3600) minutes = int((seconds % 3600) // 60) secs = int(seconds % 60) centisecs = int((seconds - int(seconds)) * 100) return f"{hours}:{minutes:02d}:{secs:02d}.{centisecs:02d}" def _build_subtitle_filter(subtitle_path: str, font: str, font_size: int, text_color: str, outline_color: str, outline_width: float, position: str, alignment: str, margin_v: int, margin_h: int, shadow: int, bold: bool, italic: bool, background_color: Optional[str], background_opacity: float) -> str: """Build FFmpeg subtitle filter with styling""" # Determine if we're using ASS file (has its own styling) if subtitle_path.endswith('.ass'): return f"ass={subtitle_path}" # Get hex colors primary_hex = COLOR_MAP.get(text_color.lower(), 'FFFFFF') outline_hex = COLOR_MAP.get(outline_color.lower(), '000000') # Calculate alignment for subtitles filter # SSA/ASS alignment: 1-3 bottom, 4-6 middle, 7-9 top align_map = { ('left', 'bottom'): 1, ('center', 'bottom'): 2, ('right', 'bottom'): 3, ('left', 'center'): 4, ('center', 'center'): 5, ('right', 'center'): 6, ('left', 'top'): 7, ('center', 'top'): 8, ('right', 'top'): 9 } ass_alignment = align_map.get((alignment, position), 2) # Build force_style string style_parts = [ f"Fontname={font}", f"Fontsize={font_size}", f"PrimaryColour=&H00{primary_hex}", f"OutlineColour=&H00{outline_hex}", f"BorderStyle=1", f"Outline={outline_width:.1f}", f"Shadow={shadow}", f"Alignment={ass_alignment}", f"MarginL={margin_h}", f"MarginR={margin_h}", f"MarginV={margin_v}" ] if bold: style_parts.append("Bold=1") if italic: style_parts.append("Italic=1") # Add background if specified if background_color and background_opacity > 0: back_alpha = hex(int((1 - background_opacity) * 255))[2:].upper().zfill(2) back_hex = COLOR_MAP.get(background_color.lower(), '000000') style_parts.append(f"BackColour=&H{back_alpha}{back_hex}") style_parts.append("BorderStyle=4") # Opaque box style force_style = ",".join(style_parts) # Escape the subtitle path for FFmpeg escaped_path = subtitle_path.replace("'", "'\\''").replace(":", "\\:") return f"subtitles='{escaped_path}':force_style='{force_style}'"