"""Gemini-backed accessibility services.

Wraps the google-genai client to extract captions / audio descriptions from
video, translate and transcreate WebVTT content, and rewrite cues for TTS.
"""
import asyncio
import json
import re
import time
import warnings
from pathlib import Path
from typing import Any, Optional

import google.genai as genai

from ..core.config import settings
from ..core.logging import get_logger

logger = get_logger(__name__)

# Configure Gemini client
client = genai.Client(api_key=settings.gemini_api_key)

# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references to tasks, so an unreferenced task may be garbage-collected before
# it finishes.
_background_tasks: set[asyncio.Task] = set()

# Opening fence with an optional language tag, and the matching closing fence.
_FENCE_OPEN_RE = re.compile(r"^```[A-Za-z0-9_-]*[ \t]*\r?\n?")
_FENCE_CLOSE_RE = re.compile(r"\r?\n?[ \t]*```\s*$")


async def _record_gemini_usage(
    response,
    model: str,
    user_id: str,
    job_id: str,
    project_id: Optional[str],
    elapsed_ms: int,
) -> None:
    """Record token usage for one Gemini call; never raises.

    Reads ``response.usage_metadata`` and forwards the prompt / candidate token
    counts to ``cost_tracker.aio_record``. Any failure is logged as a warning
    and swallowed so that cost tracking cannot break the calling pipeline.
    """
    try:
        # Imported lazily, as in the original code.
        # NOTE(review): presumably to avoid an import cycle - confirm.
        from ..services import cost_tracker

        usage = getattr(response, "usage_metadata", None)
        if usage is None:
            return
        await cost_tracker.aio_record(
            model=model,
            provider="google",
            user_external_id=user_id,
            project_id=project_id,
            job_external_id=job_id,
            input_tokens=getattr(usage, "prompt_token_count", 0) or 0,
            output_tokens=getattr(usage, "candidates_token_count", 0) or 0,
            latency_ms=elapsed_ms,
        )
    except Exception as e:
        logger.warning(f"Cost tracker record failed (non-fatal): {e}")


def _schedule_usage_record(
    response,
    model: str,
    cost_ctx: Optional[dict],
    started_at: float,
) -> None:
    """Schedule a background usage record if a cost context was supplied.

    ``started_at`` is the ``time.monotonic()`` value taken just before the
    Gemini call; the elapsed time is computed here. Must be called from a
    running event loop.
    """
    if not cost_ctx:
        return
    elapsed_ms = int((time.monotonic() - started_at) * 1000)
    task = asyncio.create_task(
        _record_gemini_usage(
            response,
            model,
            cost_ctx.get("user_id", "system"),
            cost_ctx.get("job_id", ""),
            cost_ctx.get("project_id"),
            elapsed_ms,
        )
    )
    # Hold a reference until the task completes, then drop it.
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)


def _response_text(response) -> str:
    """Return the stripped text of a Gemini response.

    Raises:
        ValueError: if the response carries no text (``response.text`` is None).
    """
    text = getattr(response, "text", None)
    if text is None:
        raise ValueError("Gemini returned a response with no text content")
    return text.strip()


def _strip_markdown_fence(text: str) -> str:
    """Remove one surrounding markdown code fence, if present.

    Only the leading fence (optionally tagged, e.g. ```json) and the trailing
    fence are removed; backticks inside the payload are left untouched.
    """
    stripped = text.strip()
    if not stripped.startswith("```"):
        return stripped
    stripped = _FENCE_OPEN_RE.sub("", stripped, count=1)
    stripped = _FENCE_CLOSE_RE.sub("", stripped, count=1)
    return stripped.strip()


class GeminiService:
    """High-level operations built on the module-level Gemini ``client``."""

    # Fields every extraction result must contain.
    _REQUIRED_EXTRACTION_FIELDS = (
        "language",
        "confidence",
        "summary",
        "transcript_plaintext",
        "captions_vtt",
        "audio_description_vtt",
    )

    # Minimal AD track substituted when self-heal loses audio_description_vtt.
    _FALLBACK_AD_VTT = (
        "WEBVTT\n\n00:00:00.000 --> 00:00:05.000\n"
        "Video content with visual elements described."
    )

    def __init__(self):
        self.model_name = 'gemini-3-pro-preview'  # Gemini 3 Pro preview model
        self.prompts_dir = Path(__file__).parent.parent / "prompts"

    # ------------------------------------------------------------------
    # Prompt helpers
    # ------------------------------------------------------------------

    def _load_prompt(self, prompt_file: str) -> str:
        """Load a prompt template from the prompts directory.

        Raises:
            FileNotFoundError: if the template does not exist (logged first).
        """
        prompt_path = self.prompts_dir / prompt_file
        try:
            # Explicit UTF-8 so the result does not depend on the platform
            # default encoding.
            return prompt_path.read_text(encoding="utf-8")
        except FileNotFoundError:
            logger.error(f"Prompt file not found: {prompt_file}")
            raise

    def _build_sdh_field(self, sdh_requested: bool) -> str:
        """Return the output-field line describing sdh_captions_vtt, or ""."""
        if sdh_requested:
            return "- sdh_captions_vtt: a valid WebVTT file as a single string, containing SDH-format captions (same timing as captions_vtt, but enriched with speaker labels, sound effects, and music notation)"
        return ""

    def _build_sdh_guidelines(self, sdh_requested: bool) -> str:
        """Return the SDH guideline block for the prompt, or "" if not requested."""
        if not sdh_requested:
            return ""
        return """SDH (SUBTITLES FOR THE DEAF AND HARD OF HEARING) GUIDELINES:
Generate sdh_captions_vtt using the same cue timings as captions_vtt, enriched with:
- Speaker identification when multiple speakers are present: use "NAME:" prefix (e.g., "JOHN: Hello there") or "[NARRATOR]" for narration
- Non-speech sounds that are plot-relevant, in square brackets: [DOOR SLAMS], [PHONE RINGS], [CROWD CHEERING], [THUNDER]
- Music: use ♪ for background music cues (e.g., "♪ tense music ♪") or ♪ around sung lyrics
- Off-screen or voice-over speakers: indicate with "(off-screen)" or "[V.O.]" where relevant
- Non-speech vocalisations when relevant: [SIGHS], [LAUGHS], [SCREAMS]
- Maintain the same timestamp format as captions_vtt (HH:MM:SS.mmm --> HH:MM:SS.mmm)
- Only add sound effect cues where they add meaningful context; do not annotate every minor sound"""

    def _build_brand_context_block(self, brand_context: Optional[str]) -> str:
        """Build the brand context instruction block for injection into prompts."""
        if brand_context and brand_context.strip():
            brands = [b.strip() for b in brand_context.split(",") if b.strip()]
            if brands:
                brand_list = ", ".join(f'"{b}"' for b in brands)
                return (
                    f"The client has confirmed the following brand names appear in this video: {brand_list}. "
                    f"Use these exact brand names when you identify those products on screen."
                )
        return "No specific brand names have been provided for this video."

    # ------------------------------------------------------------------
    # Gemini file / generation helpers
    # ------------------------------------------------------------------

    async def _wait_for_file_active(self, file_name: str, max_wait_seconds: int = 300) -> bool:
        """Poll an uploaded file until it reports ACTIVE.

        Returns True when the file state is "ACTIVE", False when it is
        "FAILED" or when ``max_wait_seconds`` of accumulated waiting has
        elapsed. Errors while polling are logged and retried after 5s.
        """
        wait_time = 1  # Start with 1 second
        total_waited = 0

        while total_waited < max_wait_seconds:
            try:
                # Run the blocking SDK call in a thread to keep the loop free.
                file_info = await asyncio.to_thread(client.files.get, name=file_name)
                logger.info(f"File {file_name} status: {file_info.state} (waited {total_waited}s)")

                if file_info.state == "ACTIVE":
                    logger.info(f"File {file_name} is now ACTIVE!")
                    return True
                elif file_info.state == "FAILED":
                    logger.error(f"File {file_name} processing FAILED")
                    return False

                logger.info(f"File not ready, waiting {wait_time}s...")
                await asyncio.sleep(wait_time)
                total_waited += wait_time
                wait_time = min(wait_time * 1.5, 30)  # Exponential backoff, max 30s

            except Exception as e:
                logger.error(f"Error checking file status: {e}")
                await asyncio.sleep(5)  # Wait 5s on error
                total_waited += 5

        logger.error(f"File {file_name} did not become ACTIVE within {max_wait_seconds}s")
        return False

    async def _upload_video(self, video_file_path: str, display_name: str):
        """Upload a video to the Gemini Files API and return the file handle."""
        return await asyncio.to_thread(
            client.files.upload,
            file=video_file_path,
            config={
                "display_name": display_name,
                "mime_type": "video/mp4",
            },
        )

    async def _require_file_active(self, file_name: str) -> None:
        """Wait for the file to become ACTIVE, raising if it does not."""
        logger.info("Waiting for file to become ACTIVE...")
        if not await self._wait_for_file_active(file_name):
            raise RuntimeError("File failed to become ACTIVE within timeout")

    async def _delete_uploaded_file(self, uploaded_file, success_prefix: str = "Successfully cleaned up") -> None:
        """Best-effort deletion of an uploaded file; failures are only logged."""
        if not uploaded_file:
            return
        try:
            await asyncio.to_thread(client.files.delete, name=uploaded_file.name)
            logger.info(f"{success_prefix} uploaded file: {uploaded_file.name}")
        except Exception as e:
            logger.warning(f"Failed to cleanup uploaded file {uploaded_file.name}: {e}")

    async def _generate_with_video(self, prompt: str, uploaded_file, config=None):
        """Call generate_content with a text prompt plus the uploaded video."""
        kwargs: dict[str, Any] = {
            "model": self.model_name,
            "contents": [
                genai.types.Part.from_text(text=prompt),
                genai.types.Part.from_uri(
                    file_uri=uploaded_file.uri,
                    mime_type=uploaded_file.mime_type,
                ),
            ],
        }
        # Only pass config when one is supplied, so calls without it are
        # unchanged.
        if config is not None:
            kwargs["config"] = config
        return await asyncio.to_thread(client.models.generate_content, **kwargs)

    async def _generate_text(self, prompt: str):
        """Call generate_content with a text-only prompt."""
        return await asyncio.to_thread(
            client.models.generate_content,
            model=self.model_name,
            contents=[genai.types.Part.from_text(text=prompt)],
        )

    # ------------------------------------------------------------------
    # Extraction
    # ------------------------------------------------------------------

    def _validate_extraction_result(self, result: Any) -> None:
        """Validate the shape of an extraction result.

        Raises:
            ValueError: if the result is not a JSON object, lacks a required
                field, or a VTT field does not start with "WEBVTT".
        """
        if not isinstance(result, dict):
            raise ValueError("Gemini response is not a JSON object")

        for field in self._REQUIRED_EXTRACTION_FIELDS:
            if field not in result:
                raise ValueError(f"Missing required field: {field}")

        captions = result["captions_vtt"]
        if not (isinstance(captions, str) and captions.startswith("WEBVTT")):
            raise ValueError("Invalid captions VTT format")

        audio_description = result["audio_description_vtt"]
        if not (isinstance(audio_description, str) and audio_description.startswith("WEBVTT")):
            raise ValueError("Invalid audio description VTT format")

    async def _extract_from_video(
        self,
        *,
        video_file_path: str,
        prompt: str,
        display_name: str,
        heal,
        cost_ctx: Optional[dict],
        config=None,
        label: str = "",
    ) -> dict[str, Any]:
        """Upload, generate, parse and validate one extraction run.

        ``heal`` is an async callable taking the unparseable response text and
        returning a repaired result; it is invoked only when ``json.loads``
        fails. Healed results are returned as-is (the heal routine performs
        its own field checks). The uploaded file is always deleted on exit.
        """
        uploaded_file = None
        try:
            logger.info(f"Uploading video file to Gemini API{label}...")
            uploaded_file = await self._upload_video(video_file_path, display_name)
            logger.info(f"Successfully uploaded file: {uploaded_file.name} (URI: {uploaded_file.uri})")

            await self._require_file_active(uploaded_file.name)

            logger.info(f"Generating content with Gemini model{label}...")
            started_at = time.monotonic()
            response = await self._generate_with_video(prompt, uploaded_file, config)
            _schedule_usage_record(response, self.model_name, cost_ctx, started_at)

            raw_text = _response_text(response)
            logger.info(f"Received Gemini response{label} (first 200 chars): {raw_text[:200]}...")

            response_text = _strip_markdown_fence(raw_text)
            if response_text != raw_text:
                logger.info("Cleaned markdown formatting from response")

            logger.info("Parsing JSON response...")
            try:
                result = json.loads(response_text)
            except json.JSONDecodeError as e:
                logger.error(f"JSON parse error at position {e.pos}: {e.msg}")
                start = max(0, e.pos - 100)
                end = min(len(response_text), e.pos + 100)
                logger.error(f"Problematic JSON area: ...{response_text[start:end]}...")
                logger.error(f"Raw response that failed to parse: {response_text}")
                # Healing is scoped to this json.loads call, so a decode error
                # raised elsewhere cannot reach it with no response text.
                return await heal(response_text)

            self._validate_extraction_result(result)
            return result
        finally:
            # Guaranteed cleanup regardless of success/failure/cancellation.
            await self._delete_uploaded_file(uploaded_file)

    async def extract_accessibility(
        self,
        video_file_path: str,
        brand_context: Optional[str] = None,
        sdh_requested: bool = False,
        _cost_ctx: Optional[dict] = None,
    ) -> dict[str, Any]:
        """Extract captions and audio descriptions from a video using Gemini.

        Args:
            video_file_path: Path to the video file to upload.
            brand_context: Optional comma-separated brand names in the video.
            sdh_requested: When True, the prompt also asks for sdh_captions_vtt.
            _cost_ctx: Optional dict with "user_id", "job_id", "project_id"
                used for cost tracking.

        Returns:
            Parsed JSON with at least language, confidence, summary,
            transcript_plaintext, captions_vtt and audio_description_vtt.

        Raises:
            ValueError: if the response is invalid and cannot be self-healed.
            Exception: any upload / generation error is logged and re-raised.
        """
        prompt_template = self._load_prompt("gemini_ingestion.md")
        prompt = (
            prompt_template
            .replace("{BRAND_CONTEXT}", self._build_brand_context_block(brand_context))
            .replace("{SDH_FIELD}", self._build_sdh_field(sdh_requested))
            .replace("{SDH_GUIDELINES}", self._build_sdh_guidelines(sdh_requested))
        )

        async def heal(invalid_text: str) -> dict[str, Any]:
            return await self._self_heal_response(video_file_path, invalid_text)

        try:
            logger.info(f"Starting Gemini processing for video: {video_file_path}")
            result = await self._extract_from_video(
                video_file_path=video_file_path,
                prompt=prompt,
                display_name=f"video_processing_{Path(video_file_path).name}",
                heal=heal,
                cost_ctx=_cost_ctx,
                config=genai.types.GenerateContentConfig(
                    temperature=0.2,  # Lower temperature for consistent, deterministic AD output
                    top_p=0.8,
                    top_k=40,
                ),
            )
            logger.info(
                f"Successfully extracted accessibility content with confidence: {result.get('confidence')}"
            )
            return result
        except Exception as e:
            logger.error(f"Gemini extraction failed with exception: {type(e).__name__}: {str(e)}")
            logger.error(f"Video file path: {video_file_path}")
            # Print to stdout for immediate visibility
            print(f"🚨 GEMINI ERROR: {type(e).__name__}: {str(e)}")
            raise

    async def extract_accessibility_targeted(
        self,
        video_file_path: str,
        target_language: str,
        brand_context: Optional[str] = None,
        sdh_requested: bool = False,
        _cost_ctx: Optional[dict] = None,
    ) -> dict[str, Any]:
        """Extract accessibility content directly in a given target language.

        Unlike extract_accessibility(), which auto-detects language, this
        method takes an explicit target language and asks for all outputs in
        that language. It is used for "video_native" translation mode, which
        re-processes the video for each target language with full visual
        context.

        Args:
            video_file_path: Path to the video file.
            target_language: BCP-47 language code (e.g., "es", "fr", "de").
            brand_context: Optional comma-separated brand names in the video.
            sdh_requested: When True, the prompt also asks for sdh_captions_vtt.
            _cost_ctx: Optional cost-tracking context dict.

        Returns:
            Structured JSON with transcript, captions VTT, and audio
            description VTT, all in the target language.

        Raises:
            ValueError: if the response is invalid and cannot be self-healed.
            Exception: any upload / generation error is logged and re-raised.
        """
        prompt_template = self._load_prompt("gemini_ingestion_targeted.md")
        prompt = (
            prompt_template
            .replace("{TARGET_LANGUAGE}", target_language)
            .replace("{BRAND_CONTEXT}", self._build_brand_context_block(brand_context))
            .replace("{SDH_FIELD}", self._build_sdh_field(sdh_requested))
            .replace("{SDH_GUIDELINES}", self._build_sdh_guidelines(sdh_requested))
        )

        async def heal(invalid_text: str) -> dict[str, Any]:
            return await self._self_heal_targeted_response(target_language, invalid_text)

        try:
            logger.info(f"Starting Gemini targeted processing for video: {video_file_path}, target: {target_language}")
            result = await self._extract_from_video(
                video_file_path=video_file_path,
                prompt=prompt,
                display_name=f"video_processing_targeted_{target_language}_{Path(video_file_path).name}",
                heal=heal,
                cost_ctx=_cost_ctx,
                label=f" for {target_language}",
            )
            logger.info(
                f"Successfully extracted targeted accessibility content for {target_language} "
                f"with confidence: {result.get('confidence')}"
            )
            return result
        except Exception as e:
            logger.error(f"Gemini targeted extraction failed for {target_language}: {type(e).__name__}: {str(e)}")
            logger.error(f"Video file path: {video_file_path}")
            print(f"🚨 GEMINI TARGETED ERROR ({target_language}): {type(e).__name__}: {str(e)}")
            raise

    # ------------------------------------------------------------------
    # Self-healing of malformed extraction JSON
    # ------------------------------------------------------------------

    def _try_local_json_fix(self, invalid_response: str) -> dict[str, Any] | None:
        """Run _attempt_json_fix, logging instead of raising on failure."""
        try:
            fixed_response = self._attempt_json_fix(invalid_response)
            if fixed_response:
                logger.info("Successfully fixed JSON without re-processing")
                return fixed_response
        except Exception as e:
            logger.warning(f"JSON fix attempt failed: {e}")
        return None

    def _build_extraction_heal_prompt(self, invalid_response: str, target_language: Optional[str]) -> str:
        """Build the text-only JSON repair prompt for extraction responses."""
        language_suffix = f" in {target_language}" if target_language else ""
        requirements = [
            "- The JSON MUST contain these exact fields: " + ", ".join(self._REQUIRED_EXTRACTION_FIELDS),
        ]
        if target_language:
            requirements.append(f"- All content should be in {target_language}")
        requirements.extend([
            "- If audio_description_vtt is truncated or missing, reconstruct it as a valid WebVTT "
            f"with at least basic descriptions{language_suffix}",
            '- All VTT content must start with "WEBVTT" and have proper timestamp format '
            "(HH:MM:SS.mmm --> HH:MM:SS.mmm)",
            # Doubled backslash so the model sees a literal \" in the prompt.
            '- Properly escape all quotes within strings using \\"',
            "- Fix unterminated strings by adding closing quotes",
            "- Remove trailing commas",
            "- Ensure all JSON is properly closed with }",
        ])
        return (
            "\nSYSTEM: You are a JSON repair service. Fix the malformed JSON below "
            "and return ONLY the corrected JSON.\n\n"
            "CRITICAL REQUIREMENTS:\n"
            + "\n".join(requirements)
            + "\n\nFix the JSON and return it:\n"
            + invalid_response
            + "\n"
        )

    async def _heal_extraction_json(
        self,
        invalid_response: str,
        *,
        target_language: Optional[str],
        success_message: str,
        failure_log_prefix: str,
        failure_message: str,
    ) -> dict[str, Any]:
        """Shared self-heal: local fix first, then a text-only Gemini repair.

        Raises:
            ValueError: with ``failure_message`` if no valid result is obtained;
                the underlying error is chained as ``__cause__``.
        """
        fixed = self._try_local_json_fix(invalid_response)
        if fixed:
            return fixed

        self_heal_prompt = self._build_extraction_heal_prompt(invalid_response, target_language)

        try:
            response = await self._generate_text(self_heal_prompt)
            result = json.loads(_strip_markdown_fence(_response_text(response)))
            if not isinstance(result, dict):
                raise ValueError("Self-heal did not return a JSON object")

            missing_fields = [
                field for field in self._REQUIRED_EXTRACTION_FIELDS if field not in result
            ]
            if missing_fields:
                logger.error(f"Self-heal lost required fields: {missing_fields}")

                # A missing AD track gets a minimal fallback; any other
                # missing field is fatal.
                if "audio_description_vtt" in missing_fields:
                    logger.info("Creating fallback audio_description_vtt")
                    result["audio_description_vtt"] = self._FALLBACK_AD_VTT

                remaining_missing = [f for f in missing_fields if f != "audio_description_vtt"]
                if remaining_missing:
                    raise ValueError(f"Self-heal failed to preserve required fields: {remaining_missing}")

            logger.info(success_message)
            return result

        except Exception as e:
            logger.error(f"{failure_log_prefix}: {e}")
            raise ValueError(failure_message) from e

    async def _self_heal_response(self, video_file_path: str, invalid_response: str) -> dict[str, Any]:
        """Attempt to self-heal an invalid JSON response from Gemini.

        ``video_file_path`` is accepted for interface compatibility; the video
        is not re-uploaded.

        Raises:
            ValueError: if neither the local fix nor the repair prompt works.
        """
        logger.info("Attempting to self-heal JSON response without re-uploading video")
        return await self._heal_extraction_json(
            invalid_response,
            target_language=None,
            success_message="Successfully self-healed Gemini response with all required fields",
            failure_log_prefix="Self-heal attempt failed",
            failure_message="Failed to get valid JSON from Gemini after self-heal attempt",
        )

    async def _self_heal_targeted_response(
        self,
        target_language: str,
        invalid_response: str
    ) -> dict[str, Any]:
        """Attempt to self-heal an invalid JSON response from targeted extraction.

        Raises:
            ValueError: if neither the local fix nor the repair prompt works.
        """
        logger.info(f"Attempting to self-heal targeted response for {target_language}")
        return await self._heal_extraction_json(
            invalid_response,
            target_language=target_language,
            success_message=f"Successfully self-healed targeted response for {target_language}",
            failure_log_prefix=f"Self-heal attempt failed for {target_language}",
            failure_message=f"Failed to get valid JSON from Gemini targeted extraction for {target_language}",
        )

    def _attempt_json_fix(self, json_text: str) -> dict[str, Any] | None:
        """Attempt to fix common JSON syntax issues without calling Gemini.

        Returns the parsed object when the repaired text parses and contains
        all required extraction fields; otherwise None.
        """
        fixes_tried = []

        # Fix 1: Remove trailing commas
        fixed_text = re.sub(r',(\s*[}\]])', r'\1', json_text)
        fixes_tried.append("removed trailing commas")

        # Fix 2: An odd number of quotes suggests an unterminated string.
        if fixed_text.count('"') % 2 != 0:
            last_quote_pos = fixed_text.rfind('"')
            remainder = fixed_text[last_quote_pos + 1:].strip()

            # If there's no closing brace after the last quote, try to close it.
            if remainder and not remainder.endswith('}'):
                if 'audio_description_vtt' in fixed_text[max(0, last_quote_pos - 100):]:
                    # The cut-off appears to be inside audio_description_vtt.
                    fixed_text += '"\n}'
                    fixes_tried.append("closed unterminated audio_description_vtt string")
                else:
                    fixed_text += '"'
                    fixes_tried.append("closed unterminated string")

        # Fix 3: Ensure JSON ends with closing brace
        if not fixed_text.rstrip().endswith('}'):
            fixed_text = fixed_text.rstrip() + '\n}'
            fixes_tried.append("added closing brace")

        try:
            result = json.loads(fixed_text)
        except json.JSONDecodeError as e:
            logger.debug(f"JSON fix attempt failed: {e}")
            return None

        logger.info(f"JSON fixed with: {', '.join(fixes_tried)}")

        missing_fields = [
            field for field in self._REQUIRED_EXTRACTION_FIELDS if field not in result
        ]
        if missing_fields:
            logger.warning(f"Fixed JSON is missing required fields: {missing_fields}")
            return None  # Let the more advanced self-healing handle this

        return result

    # ------------------------------------------------------------------
    # Accessible video placement (deprecated)
    # ------------------------------------------------------------------

    async def analyze_accessible_video_placement(
        self,
        video_file_path: str,
        ad_vtt_content: str,
        ad_cue_durations: list[float]
    ) -> dict[str, Any]:
        """
        DEPRECATED: This function is no longer called in the render pipeline.

        Pause points are now derived from AD VTT cue start times and refined by Whisper.
        Method selection (overlay/pause_insert) is done by user at QC Review approval.

        This function is kept for potential future use or rollback scenarios.
        See render_accessible_video._build_placements_from_ad_vtt() for the replacement logic.

        ---
        Original description:
        Analyze video and determine optimal method for integrating audio descriptions.
        Returns placement instructions for each AD cue.

        Args:
            video_file_path: Path to the source video file
            ad_vtt_content: The audio description VTT content
            ad_cue_durations: List of actual TTS audio durations in seconds (matching VTT cue order)

        Returns:
            Dictionary with method choice and placement instructions for each AD cue

        Raises:
            ValueError: if the response is missing required fields, has an
                invalid method, or cannot be parsed even after self-heal.
        """
        warnings.warn(
            "analyze_accessible_video_placement is deprecated. "
            "Pause points are now derived from AD VTT cue start times and refined by Whisper.",
            DeprecationWarning,
            stacklevel=2
        )
        prompt_template = self._load_prompt("gemini_accessible_video.md")

        # Format prompt with AD VTT content and durations
        prompt = prompt_template.replace(
            "{AD_VTT_CONTENT}", ad_vtt_content
        ).replace(
            "{AD_CUE_DURATIONS}", json.dumps(ad_cue_durations)
        )

        uploaded_file = None
        try:
            logger.info(f"Starting accessible video analysis for: {video_file_path}")
            logger.info(f"AD cues to place: {len(ad_cue_durations)}")

            logger.info("Uploading video file to Gemini API for accessible video analysis...")
            uploaded_file = await self._upload_video(
                video_file_path,
                f"accessible_video_analysis_{Path(video_file_path).name}",
            )
            logger.info(f"Successfully uploaded file: {uploaded_file.name}")

            await self._require_file_active(uploaded_file.name)

            logger.info("Analyzing video with Gemini for accessible video placement...")
            response = await self._generate_with_video(prompt, uploaded_file)

            raw_text = _response_text(response)
            logger.info(f"Received accessible video analysis response (first 300 chars): {raw_text[:300]}...")
            response_text = _strip_markdown_fence(raw_text)

            try:
                result = json.loads(response_text)
            except json.JSONDecodeError as e:
                logger.error(f"JSON parse error in accessible video analysis: {e}")
                # Try self-healing for this response
                result = await self._self_heal_accessible_video_response(response_text)

            # Validate required fields
            required_fields = ["method", "method_rationale", "dialogue_density", "placements"]
            for field in required_fields:
                if field not in result:
                    raise ValueError(f"Missing required field in accessible video analysis: {field}")

            # Validate method value
            if result["method"] not in ["overlay", "pause_insert"]:
                raise ValueError(f"Invalid method value: {result['method']}")

            # A count mismatch is only warned about, not treated as an error.
            if len(result["placements"]) != len(ad_cue_durations):
                logger.warning(
                    f"Placement count mismatch: got {len(result['placements'])}, "
                    f"expected {len(ad_cue_durations)}"
                )

            logger.info(
                f"Accessible video analysis complete: method={result['method']}, "
                f"dialogue_density={result['dialogue_density']:.2f}, "
                f"placements={len(result['placements'])}"
            )

            return result

        except Exception as e:
            logger.error(f"Accessible video analysis failed: {type(e).__name__}: {str(e)}")
            raise
        finally:
            await self._delete_uploaded_file(uploaded_file, success_prefix="Cleaned up")

    async def _self_heal_accessible_video_response(self, invalid_response: str) -> dict[str, Any]:
        """Attempt to self-heal invalid JSON from accessible video analysis.

        Raises:
            ValueError: if the repair prompt does not yield parseable JSON.
        """
        logger.info("Attempting to self-heal accessible video analysis response")

        self_heal_prompt = f"""
SYSTEM: You are a JSON repair service. Fix the malformed JSON below and return ONLY the corrected JSON.

CRITICAL REQUIREMENTS:
- The JSON MUST contain: method, method_rationale, dialogue_density, placements, total_added_duration, warnings
- method must be either "overlay" or "pause_insert"
- dialogue_density must be a number between 0 and 1
- placements must be an array of placement objects
- Fix any JSON syntax errors (trailing commas, unterminated strings, etc.)

Fix the JSON and return it:
{invalid_response}
"""

        try:
            response = await self._generate_text(self_heal_prompt)
            result = json.loads(_strip_markdown_fence(_response_text(response)))
            logger.info("Successfully self-healed accessible video analysis response")
            return result

        except Exception as e:
            logger.error(f"Self-heal attempt for accessible video analysis failed: {e}")
            raise ValueError(
                "Failed to get valid JSON from accessible video analysis after self-heal"
            ) from e

    # ------------------------------------------------------------------
    # Transcreation / translation
    # ------------------------------------------------------------------

    async def transcreate_content(
        self,
        captions_vtt: str,
        ad_vtt: str,
        target_language: str,
        brief: Optional[str] = None,
        _cost_ctx: Optional[dict] = None,
    ) -> dict[str, str]:
        """Transcreate English VTT content to a target language.

        Args:
            captions_vtt: Source captions VTT.
            ad_vtt: Source audio description VTT.
            target_language: Language to transcreate into.
            brief: Optional brand guidelines; a default sentence is used if empty.
            _cost_ctx: Optional cost-tracking context dict.

        Returns:
            Parsed JSON containing at least captions_vtt and audio_description_vtt.

        Raises:
            ValueError: if the response is not valid JSON or lacks a VTT field.
        """
        prompt_template = self._load_prompt("gemini_transcreation.md")

        # NOTE(review): this template is filled with str.format(), unlike the
        # other prompts which use .replace(); any literal braces in the
        # template must therefore be doubled - confirm against the prompt file.
        prompt = prompt_template.format(
            TARGET_LANGUAGE=target_language
        )

        user_prompt = f"""
Input:
- captions_vtt_en: {captions_vtt}
- ad_vtt_en: {ad_vtt}
- brief: {brief or "No specific brand guidelines provided"}

Output:
JSON:
"""

        try:
            started_at = time.monotonic()
            response = await self._generate_text(prompt + "\n\n" + user_prompt)
            _schedule_usage_record(response, self.model_name, _cost_ctx, started_at)

            result = json.loads(_strip_markdown_fence(_response_text(response)))

            # Validate required fields
            if "captions_vtt" not in result or "audio_description_vtt" not in result:
                raise ValueError("Missing required VTT fields in transcreation response")

            logger.info(f"Successfully transcreated content to {target_language}")
            return result

        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse transcreation JSON response: {e}")
            raise ValueError("Invalid JSON response from transcreation") from e
        except Exception as e:
            logger.error(f"Transcreation failed: {e}")
            raise

    async def translate_vtt(
        self,
        vtt_content: str,
        target_language: str,
        source_language: str = "en",
        _cost_ctx: Optional[dict] = None,
    ) -> str:
        """Translate VTT content using Gemini, preserving timing programmatically.

        Uses a two-step approach so Gemini never sees the timestamps:
        1. Send only the cue texts to Gemini as a numbered list.
        2. Apply the translated texts back onto the original VTT via
           VTTEditor.translate_preserving_timing().

        If the returned line count is wrong, the request is retried once; if
        it is still wrong, the list is padded with source text or truncated.
        When the VTT has no cues, the input is returned unchanged.
        """
        from ..lib.vtt import VTTParser, VTTEditor

        source_cues = VTTParser.parse(vtt_content)
        if not source_cues:
            logger.warning(f"No cues found in VTT for {target_language} translation")
            return vtt_content

        cue_count = len(source_cues)
        # One numbered line per cue; newlines inside a cue are flattened so
        # each cue stays on a single line.
        flattened = (cue.text.replace("\n", " ") for cue in source_cues)
        numbered_texts = "\n".join(
            f"{number}. {text}" for number, text in enumerate(flattened, start=1)
        )

        async def _attempt_translation(extra_instruction: str = "") -> list[str]:
            prompt = f"""Translate the following {cue_count} numbered text segments from {source_language} to {target_language}.

REQUIREMENTS:
- Return EXACTLY {cue_count} numbered lines, one translation per line
- Format: "1. translated text", "2. translated text", etc.
- Preserve speaker labels like [Speaker 1]: unchanged
- Use natural, idiomatic {target_language}
- Do NOT add any explanation, preamble, or extra lines{extra_instruction}

Segments to translate:
{numbered_texts}"""

            started_at = time.monotonic()
            response = await self._generate_text(prompt)
            _schedule_usage_record(response, self.model_name, _cost_ctx, started_at)
            return self._parse_numbered_translation(_response_text(response), cue_count)

        try:
            translated_texts = await _attempt_translation()

            if len(translated_texts) != cue_count:
                logger.warning(
                    f"Translation cue count mismatch for {target_language}: "
                    f"expected {cue_count}, got {len(translated_texts)}. Retrying."
                )
                translated_texts = await _attempt_translation(
                    extra_instruction=f"\n- You MUST return exactly {cue_count} lines, no more, no less"
                )

            if len(translated_texts) != cue_count:
                # Pad or truncate as last resort to avoid breaking downstream
                logger.warning(
                    f"Retried translation still mismatched ({len(translated_texts)} vs {cue_count}). "
                    f"Padding/truncating to match."
                )
                if len(translated_texts) < cue_count:
                    translated_texts.extend(
                        cue.text for cue in source_cues[len(translated_texts):]
                    )
                else:
                    translated_texts = translated_texts[:cue_count]

            result = VTTEditor.translate_preserving_timing(vtt_content, translated_texts)
            logger.info(f"Successfully translated VTT to {target_language} ({cue_count} cues)")
            return result

        except Exception as e:
            logger.error(f"Gemini translation failed for {target_language}: {e}")
            raise

    @staticmethod
    def _parse_numbered_translation(response_text: str, expected_count: int) -> list[str]:
        """Parse a numbered list response from Gemini into translated texts.

        Lines of the form "N. text" or "N) text" start a new entry. Any other
        non-empty line is appended (space-joined) to the previous entry, or
        skipped if no entry exists yet. ``expected_count`` is currently not
        used by the parser; callers compare lengths themselves.
        """
        results: list[str] = []
        for raw_line in response_text.strip().split("\n"):
            line = raw_line.strip()
            if not line:
                continue

            match = re.match(r"^\d+[.)]\s+(.+)$", line)
            if match:
                results.append(match.group(1).strip())
            elif results:
                # Continuation of the previous entry.
                results[-1] += " " + line

        return results

    # ------------------------------------------------------------------
    # TTS rewrite
    # ------------------------------------------------------------------

    async def rewrite_tts_cue(
        self,
        original_text: str,
        language: str = "en",
        _cost_ctx: Optional[dict] = None,
    ) -> str:
        """Rewrite an audio description cue to be TTS-friendly.

        Called when TTS synthesis fails for a cue after retries. Uses Gemini
        to rephrase the text while preserving the visual information being
        described.

        Args:
            original_text: The cue text that failed TTS synthesis
            language: Language code for context (default: 'en')
            _cost_ctx: Optional cost-tracking context dict.

        Returns:
            Rewritten text with code fences and one layer of surrounding
            quotes removed.
        """
        prompt_template = self._load_prompt("gemini_tts_rewrite.md")
        prompt = prompt_template.replace(
            "{ORIGINAL_TEXT}", original_text
        ).replace(
            "{LANGUAGE}", language
        )

        try:
            logger.info(f"Rewriting TTS cue for safety: '{original_text[:50]}...'")

            started_at = time.monotonic()
            response = await self._generate_text(prompt)
            _schedule_usage_record(response, self.model_name, _cost_ctx, started_at)

            result = _response_text(response)

            # Drop any fence lines Gemini might have wrapped the text in.
            if result.startswith("```"):
                kept_lines = [
                    line for line in result.split("\n")
                    if not line.strip().startswith("```")
                ]
                result = "\n".join(kept_lines).strip()

            # Remove surrounding quotes if present
            if result.startswith('"') and result.endswith('"'):
                result = result[1:-1]
            if result.startswith("'") and result.endswith("'"):
                result = result[1:-1]

            logger.info(f"Rewrote TTS cue: '{original_text[:30]}...' -> '{result[:30]}...'")
            return result

        except Exception as e:
            logger.error(f"Failed to rewrite TTS cue: {e}")
            raise


# Global service instance
gemini_service = GeminiService()