video-accessibility/backend/app/services/gemini.py
michael e44210ea64 feat: auto-rewrite TTS cues that fail synthesis
When TTS synthesis fails after 3 retries, the system now:
- Sends problematic cue text to Gemini for TTS-safe rewriting
- Updates the VTT file in GCS with rewritten text
- Retries TTS synthesis with the new text
- Records successful rewrites in job.tts_rewrites field

UI changes:
- JobDetail shows amber caution box with original/rewritten text
- JobsList shows warning icon next to jobs with rewrites
- Error display clarifies text shown is "after rewrite attempt"

Files changed:
- backend/app/models/job.py: Add tts_rewrites field
- backend/app/prompts/gemini_tts_rewrite.md: New prompt template
- backend/app/services/gemini.py: Add rewrite_tts_cue method
- backend/app/tasks/tts_synthesis.py: Add VTT update utilities
- backend/app/tasks/translate_and_synthesize.py: Rewrite+retry logic
- frontend/src/types/api.ts: Add TTSRewriteItem type
- frontend/src/routes/jobs/JobDetail.tsx: Caution display
- frontend/src/routes/jobs/JobsList.tsx: Warning indicator

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-05 14:42:50 -06:00

849 lines
36 KiB
Python

import json
import asyncio
from pathlib import Path
from typing import Any, Optional
import google.genai as genai
from ..core.config import settings
from ..core.logging import get_logger
# Module-level logger, named after this module.
logger = get_logger(__name__)
# Shared Gemini API client, created once at import time using the API key
# from application settings; every GeminiService call goes through it.
client = genai.Client(api_key=settings.gemini_api_key)
class GeminiService:
def __init__(self):
self.model_name = 'gemini-3-pro-preview' # Gemini 3 Pro preview model
self.prompts_dir = Path(__file__).parent.parent / "prompts"
def _load_prompt(self, prompt_file: str) -> str:
"""Load prompt template from prompts directory"""
prompt_path = self.prompts_dir / prompt_file
try:
return prompt_path.read_text()
except FileNotFoundError:
logger.error(f"Prompt file not found: {prompt_file}")
raise
async def _wait_for_file_active(self, file_name: str, max_wait_seconds: int = 300) -> bool:
"""Wait for uploaded file to become ACTIVE state"""
wait_time = 1 # Start with 1 second
total_waited = 0
while total_waited < max_wait_seconds:
try:
# Get file status - use asyncio.to_thread to avoid blocking event loop
file_info = await asyncio.to_thread(client.files.get, name=file_name)
logger.info(f"File {file_name} status: {file_info.state} (waited {total_waited}s)")
if file_info.state == "ACTIVE":
logger.info(f"File {file_name} is now ACTIVE!")
return True
elif file_info.state == "FAILED":
logger.error(f"File {file_name} processing FAILED")
return False
# Wait with exponential backoff (max 30s)
logger.info(f"File not ready, waiting {wait_time}s...")
await asyncio.sleep(wait_time)
total_waited += wait_time
wait_time = min(wait_time * 1.5, 30) # Exponential backoff, max 30s
except Exception as e:
logger.error(f"Error checking file status: {e}")
await asyncio.sleep(5) # Wait 5s on error
total_waited += 5
logger.error(f"File {file_name} did not become ACTIVE within {max_wait_seconds}s")
return False
async def extract_accessibility(self, video_file_path: str) -> dict[str, Any]:
"""
Extract captions and audio descriptions from video using Gemini 2.0
Returns structured JSON with transcript, captions VTT, and audio description VTT
"""
prompt = self._load_prompt("gemini_ingestion.md")
uploaded_file = None
try:
logger.info(f"Starting Gemini processing for video: {video_file_path}")
# Upload video file to Gemini using new API - use asyncio.to_thread to avoid blocking
logger.info("Uploading video file to Gemini API...")
uploaded_file = await asyncio.to_thread(
client.files.upload,
file=video_file_path,
config={
"display_name": f"video_processing_{Path(video_file_path).name}",
"mime_type": "video/mp4"
}
)
logger.info(f"Successfully uploaded file: {uploaded_file.name} (URI: {uploaded_file.uri})")
# Wait for file to become ACTIVE before using it
logger.info("Waiting for file to become ACTIVE...")
file_ready = await self._wait_for_file_active(uploaded_file.name)
if not file_ready:
raise Exception("File failed to become ACTIVE within timeout")
# Generate content using new API - use asyncio.to_thread to avoid blocking
logger.info("Generating content with Gemini model...")
response = await asyncio.to_thread(
client.models.generate_content,
model=self.model_name,
contents=[
genai.types.Part.from_text(text=prompt),
genai.types.Part.from_uri(
file_uri=uploaded_file.uri,
mime_type=uploaded_file.mime_type
)
]
)
# Parse JSON response
response_text = response.text.strip()
logger.info(f"Received Gemini response (first 200 chars): {response_text[:200]}...")
# Handle potential markdown formatting
if response_text.startswith("```json"):
response_text = response_text.replace("```json", "").replace("```", "").strip()
logger.info("Cleaned markdown formatting from response")
# Additional cleanup for common JSON issues
response_text = response_text.strip()
logger.info("Parsing JSON response...")
try:
result = json.loads(response_text)
except json.JSONDecodeError as e:
logger.error(f"JSON parse error at position {e.pos}: {e.msg}")
# Log the problematic area
start = max(0, e.pos - 100)
end = min(len(response_text), e.pos + 100)
problematic_text = response_text[start:end]
logger.error(f"Problematic JSON area: ...{problematic_text}...")
raise
# Validate required fields
required_fields = [
"language", "confidence", "summary",
"transcript_plaintext", "captions_vtt", "audio_description_vtt"
]
for field in required_fields:
if field not in result:
raise ValueError(f"Missing required field: {field}")
# Validate VTT format
if not result["captions_vtt"].startswith("WEBVTT"):
raise ValueError("Invalid captions VTT format")
if not result["audio_description_vtt"].startswith("WEBVTT"):
raise ValueError("Invalid audio description VTT format")
logger.info(
f"Successfully extracted accessibility content with confidence: {result['confidence']}"
)
return result
except json.JSONDecodeError as e:
logger.error(f"Failed to parse Gemini JSON response: {e}")
logger.error(f"Raw response that failed to parse: {response_text}")
# Attempt self-healing
return await self._self_heal_response(video_file_path, response_text)
except Exception as e:
logger.error(f"Gemini extraction failed with exception: {type(e).__name__}: {str(e)}")
logger.error(f"Video file path: {video_file_path}")
# Print to stdout for immediate visibility
print(f"🚨 GEMINI ERROR: {type(e).__name__}: {str(e)}")
raise
finally:
# Guaranteed cleanup of uploaded file regardless of success/failure/cancellation
if uploaded_file:
try:
await asyncio.to_thread(client.files.delete, name=uploaded_file.name)
logger.info(f"Successfully cleaned up uploaded file: {uploaded_file.name}")
except Exception as e:
logger.warning(f"Failed to cleanup uploaded file {uploaded_file.name}: {e}")
async def _self_heal_response(self, video_file_path: str, invalid_response: str) -> dict[str, Any]:
"""Attempt to self-heal invalid JSON response from Gemini"""
logger.info("Attempting to self-heal JSON response without re-uploading video")
# Try to fix common JSON issues first
try:
fixed_response = self._attempt_json_fix(invalid_response)
if fixed_response:
logger.info("Successfully fixed JSON without re-processing")
return fixed_response
except Exception as e:
logger.warning(f"JSON fix attempt failed: {e}")
# If simple fixes don't work, try a text-only self-heal prompt with more context
self_heal_prompt = f"""
SYSTEM: You are a JSON repair service. Fix the malformed JSON below and return ONLY the corrected JSON.
CRITICAL REQUIREMENTS:
- The JSON MUST contain these exact fields: language, confidence, summary, transcript_plaintext, captions_vtt, audio_description_vtt
- If audio_description_vtt is truncated or missing, reconstruct it as a valid WebVTT with at least basic descriptions
- All VTT content must start with "WEBVTT" and have proper timestamp format (HH:MM:SS.mmm --> HH:MM:SS.mmm)
- Properly escape all quotes within strings using \"
- Fix unterminated strings by adding closing quotes
- Remove trailing commas
- Ensure all JSON is properly closed with }}
Fix the JSON and return it:
{invalid_response}
"""
try:
response = await asyncio.to_thread(
client.models.generate_content,
model=self.model_name,
contents=[genai.types.Part.from_text(text=self_heal_prompt)]
)
response_text = response.text.strip()
# Handle potential markdown formatting
if response_text.startswith("```json"):
response_text = response_text.replace("```json", "").replace("```", "").strip()
result = json.loads(response_text)
# Validate that all required fields are present after healing
required_fields = [
"language", "confidence", "summary",
"transcript_plaintext", "captions_vtt", "audio_description_vtt"
]
missing_fields = [field for field in required_fields if field not in result]
if missing_fields:
logger.error(f"Self-heal lost required fields: {missing_fields}")
# If audio_description_vtt is missing, create a basic one
if "audio_description_vtt" in missing_fields:
logger.info("Creating fallback audio_description_vtt")
result["audio_description_vtt"] = "WEBVTT\n\n00:00:00.000 --> 00:00:05.000\nVideo content with visual elements described."
# If other critical fields are missing, raise an error
remaining_missing = [f for f in missing_fields if f != "audio_description_vtt"]
if remaining_missing:
raise ValueError(f"Self-heal failed to preserve required fields: {remaining_missing}")
logger.info("Successfully self-healed Gemini response with all required fields")
return result
except Exception as e:
logger.error(f"Self-heal attempt failed: {e}")
raise ValueError("Failed to get valid JSON from Gemini after self-heal attempt")
async def extract_accessibility_targeted(
self,
video_file_path: str,
target_language: str
) -> dict[str, Any]:
"""
Extract captions and audio descriptions from video using Gemini,
generating content directly in the specified target language.
Unlike extract_accessibility() which auto-detects language, this method
takes an explicit target language and generates all outputs in that language.
This is used for "video_native" translation mode which re-processes the video
for each target language with full visual context.
Args:
video_file_path: Path to the video file
target_language: BCP-47 language code (e.g., "es", "fr", "de")
Returns:
Structured JSON with transcript, captions VTT, and audio description VTT
all in the target language
"""
prompt_template = self._load_prompt("gemini_ingestion_targeted.md")
prompt = prompt_template.replace("{TARGET_LANGUAGE}", target_language)
uploaded_file = None
try:
logger.info(f"Starting Gemini targeted processing for video: {video_file_path}, target: {target_language}")
# Upload video file to Gemini using new API
logger.info("Uploading video file to Gemini API for targeted extraction...")
uploaded_file = await asyncio.to_thread(
client.files.upload,
file=video_file_path,
config={
"display_name": f"video_processing_targeted_{target_language}_{Path(video_file_path).name}",
"mime_type": "video/mp4"
}
)
logger.info(f"Successfully uploaded file: {uploaded_file.name} (URI: {uploaded_file.uri})")
# Wait for file to become ACTIVE before using it
logger.info("Waiting for file to become ACTIVE...")
file_ready = await self._wait_for_file_active(uploaded_file.name)
if not file_ready:
raise Exception("File failed to become ACTIVE within timeout")
# Generate content using new API
logger.info(f"Generating content with Gemini model for {target_language}...")
response = await asyncio.to_thread(
client.models.generate_content,
model=self.model_name,
contents=[
genai.types.Part.from_text(text=prompt),
genai.types.Part.from_uri(
file_uri=uploaded_file.uri,
mime_type=uploaded_file.mime_type
)
]
)
# Parse JSON response
response_text = response.text.strip()
logger.info(f"Received Gemini targeted response for {target_language} (first 200 chars): {response_text[:200]}...")
# Handle potential markdown formatting
if response_text.startswith("```json"):
response_text = response_text.replace("```json", "").replace("```", "").strip()
logger.info("Cleaned markdown formatting from response")
response_text = response_text.strip()
logger.info("Parsing JSON response...")
try:
result = json.loads(response_text)
except json.JSONDecodeError as e:
logger.error(f"JSON parse error at position {e.pos}: {e.msg}")
start = max(0, e.pos - 100)
end = min(len(response_text), e.pos + 100)
problematic_text = response_text[start:end]
logger.error(f"Problematic JSON area: ...{problematic_text}...")
# Attempt self-healing
return await self._self_heal_targeted_response(target_language, response_text)
# Validate required fields
required_fields = [
"language", "confidence", "summary",
"transcript_plaintext", "captions_vtt", "audio_description_vtt"
]
for field in required_fields:
if field not in result:
raise ValueError(f"Missing required field: {field}")
# Validate VTT format
if not result["captions_vtt"].startswith("WEBVTT"):
raise ValueError("Invalid captions VTT format")
if not result["audio_description_vtt"].startswith("WEBVTT"):
raise ValueError("Invalid audio description VTT format")
logger.info(
f"Successfully extracted targeted accessibility content for {target_language} "
f"with confidence: {result['confidence']}"
)
return result
except json.JSONDecodeError as e:
logger.error(f"Failed to parse Gemini JSON response: {e}")
logger.error(f"Raw response that failed to parse: {response_text}")
return await self._self_heal_targeted_response(target_language, response_text)
except Exception as e:
logger.error(f"Gemini targeted extraction failed for {target_language}: {type(e).__name__}: {str(e)}")
logger.error(f"Video file path: {video_file_path}")
print(f"🚨 GEMINI TARGETED ERROR ({target_language}): {type(e).__name__}: {str(e)}")
raise
finally:
# Cleanup uploaded file
if uploaded_file:
try:
await asyncio.to_thread(client.files.delete, name=uploaded_file.name)
logger.info(f"Successfully cleaned up uploaded file: {uploaded_file.name}")
except Exception as e:
logger.warning(f"Failed to cleanup uploaded file {uploaded_file.name}: {e}")
async def _self_heal_targeted_response(
self,
target_language: str,
invalid_response: str
) -> dict[str, Any]:
"""Attempt to self-heal invalid JSON response from targeted extraction"""
logger.info(f"Attempting to self-heal targeted response for {target_language}")
# Try to fix common JSON issues first
try:
fixed_response = self._attempt_json_fix(invalid_response)
if fixed_response:
logger.info("Successfully fixed JSON without re-processing")
return fixed_response
except Exception as e:
logger.warning(f"JSON fix attempt failed: {e}")
self_heal_prompt = f"""
SYSTEM: You are a JSON repair service. Fix the malformed JSON below and return ONLY the corrected JSON.
CRITICAL REQUIREMENTS:
- The JSON MUST contain these exact fields: language, confidence, summary, transcript_plaintext, captions_vtt, audio_description_vtt
- All content should be in {target_language}
- If audio_description_vtt is truncated or missing, reconstruct it as a valid WebVTT with at least basic descriptions in {target_language}
- All VTT content must start with "WEBVTT" and have proper timestamp format (HH:MM:SS.mmm --> HH:MM:SS.mmm)
- Properly escape all quotes within strings using \"
- Fix unterminated strings by adding closing quotes
- Remove trailing commas
- Ensure all JSON is properly closed with }}
Fix the JSON and return it:
{invalid_response}
"""
try:
response = await asyncio.to_thread(
client.models.generate_content,
model=self.model_name,
contents=[genai.types.Part.from_text(text=self_heal_prompt)]
)
response_text = response.text.strip()
if response_text.startswith("```json"):
response_text = response_text.replace("```json", "").replace("```", "").strip()
result = json.loads(response_text)
required_fields = [
"language", "confidence", "summary",
"transcript_plaintext", "captions_vtt", "audio_description_vtt"
]
missing_fields = [field for field in required_fields if field not in result]
if missing_fields:
logger.error(f"Self-heal lost required fields: {missing_fields}")
if "audio_description_vtt" in missing_fields:
logger.info("Creating fallback audio_description_vtt")
result["audio_description_vtt"] = "WEBVTT\n\n00:00:00.000 --> 00:00:05.000\nVideo content with visual elements described."
remaining_missing = [f for f in missing_fields if f != "audio_description_vtt"]
if remaining_missing:
raise ValueError(f"Self-heal failed to preserve required fields: {remaining_missing}")
logger.info(f"Successfully self-healed targeted response for {target_language}")
return result
except Exception as e:
logger.error(f"Self-heal attempt failed for {target_language}: {e}")
raise ValueError(f"Failed to get valid JSON from Gemini targeted extraction for {target_language}")
def _attempt_json_fix(self, json_text: str) -> dict[str, Any] | None:
"""Attempt to fix common JSON syntax issues"""
# Try to identify and fix common issues
fixes_tried = []
fixed_text = json_text
import re
# Fix 1: Remove trailing commas
fixed_text = re.sub(r',(\s*[}\]])', r'\1', fixed_text)
fixes_tried.append("removed trailing commas")
# Fix 2: Try to fix unterminated strings by adding closing quote and brace
if fixed_text.count('"') % 2 != 0: # Odd number of quotes suggests unterminated string
# Find the last quote and see if we need to close the JSON
last_quote_pos = fixed_text.rfind('"')
remainder = fixed_text[last_quote_pos + 1:].strip()
# If there's no closing brace after the last quote, try to fix it
if remainder and not remainder.endswith('}'):
# Try to intelligently close the JSON
if 'audio_description_vtt' in fixed_text[max(0, last_quote_pos - 100):]:
# This appears to be in the audio_description_vtt field
fixed_text += '"\n}'
fixes_tried.append("closed unterminated audio_description_vtt string")
else:
fixed_text += '"'
fixes_tried.append("closed unterminated string")
# Fix 3: Ensure JSON ends with closing brace
if not fixed_text.rstrip().endswith('}'):
fixed_text = fixed_text.rstrip() + '\n}'
fixes_tried.append("added closing brace")
try:
result = json.loads(fixed_text)
logger.info(f"JSON fixed with: {', '.join(fixes_tried)}")
# Validate that we have the required fields
required_fields = [
"language", "confidence", "summary",
"transcript_plaintext", "captions_vtt", "audio_description_vtt"
]
missing_fields = [field for field in required_fields if field not in result]
if missing_fields:
logger.warning(f"Fixed JSON is missing required fields: {missing_fields}")
return None # Let the more advanced self-healing handle this
return result
except json.JSONDecodeError as e:
logger.debug(f"JSON fix attempt failed: {e}")
return None
async def analyze_accessible_video_placement(
self,
video_file_path: str,
ad_vtt_content: str,
ad_cue_durations: list[float]
) -> dict[str, Any]:
"""
DEPRECATED: This function is no longer called in the render pipeline.
Pause points are now derived from AD VTT cue start times and refined by Whisper.
Method selection (overlay/pause_insert) is done by user at QC Review approval.
This function is kept for potential future use or rollback scenarios.
See render_accessible_video._build_placements_from_ad_vtt() for the replacement logic.
---
Original description:
Analyze video and determine optimal method for integrating audio descriptions.
Returns placement instructions for each AD cue.
Args:
video_file_path: Path to the source video file
ad_vtt_content: The audio description VTT content
ad_cue_durations: List of actual TTS audio durations in seconds (matching VTT cue order)
Returns:
Dictionary with method choice and placement instructions for each AD cue
"""
import warnings
warnings.warn(
"analyze_accessible_video_placement is deprecated. "
"Pause points are now derived from AD VTT cue start times and refined by Whisper.",
DeprecationWarning,
stacklevel=2
)
prompt_template = self._load_prompt("gemini_accessible_video.md")
# Format prompt with AD VTT content and durations
prompt = prompt_template.replace(
"{AD_VTT_CONTENT}", ad_vtt_content
).replace(
"{AD_CUE_DURATIONS}", json.dumps(ad_cue_durations)
)
uploaded_file = None
try:
logger.info(f"Starting accessible video analysis for: {video_file_path}")
logger.info(f"AD cues to place: {len(ad_cue_durations)}")
# Upload video file to Gemini
logger.info("Uploading video file to Gemini API for accessible video analysis...")
uploaded_file = await asyncio.to_thread(
client.files.upload,
file=video_file_path,
config={
"display_name": f"accessible_video_analysis_{Path(video_file_path).name}",
"mime_type": "video/mp4"
}
)
logger.info(f"Successfully uploaded file: {uploaded_file.name}")
# Wait for file to become ACTIVE
logger.info("Waiting for file to become ACTIVE...")
file_ready = await self._wait_for_file_active(uploaded_file.name)
if not file_ready:
raise Exception("File failed to become ACTIVE within timeout")
# Generate content with video and prompt
logger.info("Analyzing video with Gemini for accessible video placement...")
response = await asyncio.to_thread(
client.models.generate_content,
model=self.model_name,
contents=[
genai.types.Part.from_text(text=prompt),
genai.types.Part.from_uri(
file_uri=uploaded_file.uri,
mime_type=uploaded_file.mime_type
)
]
)
# Parse JSON response
response_text = response.text.strip()
logger.info(f"Received accessible video analysis response (first 300 chars): {response_text[:300]}...")
# Handle potential markdown formatting
if response_text.startswith("```json"):
response_text = response_text.replace("```json", "").replace("```", "").strip()
try:
result = json.loads(response_text)
except json.JSONDecodeError as e:
logger.error(f"JSON parse error in accessible video analysis: {e}")
# Try self-healing for this response
result = await self._self_heal_accessible_video_response(response_text)
# Validate required fields
required_fields = ["method", "method_rationale", "dialogue_density", "placements"]
for field in required_fields:
if field not in result:
raise ValueError(f"Missing required field in accessible video analysis: {field}")
# Validate method value
if result["method"] not in ["overlay", "pause_insert"]:
raise ValueError(f"Invalid method value: {result['method']}")
# Validate placements
if len(result["placements"]) != len(ad_cue_durations):
logger.warning(
f"Placement count mismatch: got {len(result['placements'])}, "
f"expected {len(ad_cue_durations)}"
)
logger.info(
f"Accessible video analysis complete: method={result['method']}, "
f"dialogue_density={result['dialogue_density']:.2f}, "
f"placements={len(result['placements'])}"
)
return result
except Exception as e:
logger.error(f"Accessible video analysis failed: {type(e).__name__}: {str(e)}")
raise
finally:
# Cleanup uploaded file
if uploaded_file:
try:
await asyncio.to_thread(client.files.delete, name=uploaded_file.name)
logger.info(f"Cleaned up uploaded file: {uploaded_file.name}")
except Exception as e:
logger.warning(f"Failed to cleanup uploaded file {uploaded_file.name}: {e}")
async def _self_heal_accessible_video_response(self, invalid_response: str) -> dict[str, Any]:
"""Attempt to self-heal invalid JSON response from accessible video analysis"""
logger.info("Attempting to self-heal accessible video analysis response")
self_heal_prompt = f"""
SYSTEM: You are a JSON repair service. Fix the malformed JSON below and return ONLY the corrected JSON.
CRITICAL REQUIREMENTS:
- The JSON MUST contain: method, method_rationale, dialogue_density, placements, total_added_duration, warnings
- method must be either "overlay" or "pause_insert"
- dialogue_density must be a number between 0 and 1
- placements must be an array of placement objects
- Fix any JSON syntax errors (trailing commas, unterminated strings, etc.)
Fix the JSON and return it:
{invalid_response}
"""
try:
response = await asyncio.to_thread(
client.models.generate_content,
model=self.model_name,
contents=[genai.types.Part.from_text(text=self_heal_prompt)]
)
response_text = response.text.strip()
if response_text.startswith("```json"):
response_text = response_text.replace("```json", "").replace("```", "").strip()
result = json.loads(response_text)
logger.info("Successfully self-healed accessible video analysis response")
return result
except Exception as e:
logger.error(f"Self-heal attempt for accessible video analysis failed: {e}")
raise ValueError("Failed to get valid JSON from accessible video analysis after self-heal")
async def transcreate_content(
self,
captions_vtt: str,
ad_vtt: str,
target_language: str,
brief: Optional[str] = None
) -> dict[str, str]:
"""
Transcreate English VTT content to target language with cultural adaptation
"""
prompt_template = self._load_prompt("gemini_transcreation.md")
# Format prompt with actual content
prompt = prompt_template.format(
TARGET_LANGUAGE=target_language
)
user_prompt = f"""
Input:
- captions_vtt_en: {captions_vtt}
- ad_vtt_en: {ad_vtt}
- brief: {brief or "No specific brand guidelines provided"}
Output:
JSON:
"""
try:
response = await asyncio.to_thread(
client.models.generate_content,
model=self.model_name,
contents=[
genai.types.Part.from_text(text=prompt + "\n\n" + user_prompt)
]
)
response_text = response.text.strip()
# Handle potential markdown formatting
if response_text.startswith("```json"):
response_text = response_text.replace("```json", "").replace("```", "").strip()
result = json.loads(response_text)
# Validate required fields
if "captions_vtt" not in result or "audio_description_vtt" not in result:
raise ValueError("Missing required VTT fields in transcreation response")
logger.info(f"Successfully transcreated content to {target_language}")
return result
except json.JSONDecodeError as e:
logger.error(f"Failed to parse transcreation JSON response: {e}")
raise ValueError("Invalid JSON response from transcreation")
except Exception as e:
logger.error(f"Transcreation failed: {e}")
raise
async def translate_vtt(
self,
vtt_content: str,
target_language: str,
source_language: str = "en"
) -> str:
"""
Translate VTT content using Gemini, preserving timing and structure.
More cost-effective alternative to Google Translate API (6-36x cheaper).
Args:
vtt_content: The VTT file content to translate
target_language: The language code to translate to (e.g., 'es', 'fr')
source_language: The source language code (default: 'en')
Returns:
Translated VTT content with preserved timestamps
"""
prompt = f"""Translate this WebVTT subtitle file from {source_language} to {target_language}.
CRITICAL REQUIREMENTS:
- Preserve ALL timestamps exactly as-is (do not modify any timing)
- Keep the WEBVTT header line
- Translate ONLY the text content between timestamps
- Maintain readable line lengths (~32-40 characters per line)
- Handle idioms and slang naturally in {target_language}
- Preserve any speaker labels (e.g., "[Speaker 1]:")
- Do NOT add any explanation or markdown - return ONLY the translated VTT
VTT Content to translate:
{vtt_content}"""
try:
response = await asyncio.to_thread(
client.models.generate_content,
model=self.model_name,
contents=[genai.types.Part.from_text(text=prompt)]
)
result = response.text.strip()
# Handle potential markdown formatting
if result.startswith("```"):
# Remove markdown code blocks
lines = result.split("\n")
# Filter out lines that are just ``` or ```vtt or ```webvtt
filtered_lines = [
line for line in lines
if not line.strip().startswith("```")
]
result = "\n".join(filtered_lines).strip()
# Validate VTT format
if not result.startswith("WEBVTT"):
logger.warning("Gemini translation missing WEBVTT header, adding it")
result = "WEBVTT\n\n" + result
logger.info(f"Successfully translated VTT to {target_language} using Gemini")
return result
except Exception as e:
logger.error(f"Gemini translation failed for {target_language}: {e}")
raise
async def rewrite_tts_cue(
self,
original_text: str,
language: str = "en"
) -> str:
"""
Rewrite an audio description cue to be TTS-friendly.
Called when TTS synthesis fails for a cue after retries. Uses Gemini
to rephrase the text while preserving the visual information being described.
Args:
original_text: The cue text that failed TTS synthesis
language: Language code for context (default: 'en')
Returns:
Rewritten text optimized for TTS synthesis
"""
prompt_template = self._load_prompt("gemini_tts_rewrite.md")
prompt = prompt_template.replace(
"{ORIGINAL_TEXT}", original_text
).replace(
"{LANGUAGE}", language
)
try:
logger.info(f"Rewriting TTS cue for safety: '{original_text[:50]}...'")
response = await asyncio.to_thread(
client.models.generate_content,
model=self.model_name,
contents=[genai.types.Part.from_text(text=prompt)]
)
result = response.text.strip()
# Remove any markdown formatting or quotes that Gemini might add
if result.startswith("```"):
lines = result.split("\n")
filtered_lines = [
line for line in lines
if not line.strip().startswith("```")
]
result = "\n".join(filtered_lines).strip()
# Remove surrounding quotes if present
if result.startswith('"') and result.endswith('"'):
result = result[1:-1]
if result.startswith("'") and result.endswith("'"):
result = result[1:-1]
logger.info(f"Rewrote TTS cue: '{original_text[:30]}...' -> '{result[:30]}...'")
return result
except Exception as e:
logger.error(f"Failed to rewrite TTS cue: {e}")
raise
# Global service instance, constructed once at import time so that importers
# of this module can share a single GeminiService.
gemini_service = GeminiService()