Add a new "Video Native Mode" translation option that re-processes the video through Gemini for each target language, generating captions and audio descriptions directly from visual context. This produces more natural and culturally appropriate content compared to traditional VTT text translation.

Changes:
- Add translation_mode field to RequestedOutputs (video_native | traditional)
- Create gemini_ingestion_targeted.md prompt for target language generation
- Add extract_accessibility_targeted() method to Gemini service
- Modify translate_and_synthesize task to handle both translation modes
- Add Translation Mode UI selector in NewJob screen (video_native is default)
- Remove transcreation UI (replaced by video_native mode)
- Remove Google Translate service (replaced by Gemini translation)
- Add LanguageSelector component with searchable dropdown

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
774 lines
33 KiB
Python
774 lines
33 KiB
Python
import json
|
|
import asyncio
|
|
from pathlib import Path
|
|
from typing import Any, Optional
|
|
|
|
import google.genai as genai
|
|
|
|
from ..core.config import settings
|
|
from ..core.logging import get_logger
|
|
|
|
# Module-level logger, named after this module; all code in this file logs through it.
logger = get_logger(__name__)
|
|
|
|
# Configure Gemini client
|
|
# Created once at import time with the API key from application settings;
# the GeminiService methods below all call into this single shared instance.
client = genai.Client(api_key=settings.gemini_api_key)
|
|
|
|
class GeminiService:
    """Wrapper around the Gemini calls used by the accessibility pipeline.

    Workflows covered:

    * accessibility extraction from a video, either with auto-detected
      language or generated directly in an explicit target language;
    * audio-description placement analysis for an "accessible video";
    * transcreation and plain translation of existing VTT tracks.

    Every blocking SDK call is dispatched with ``asyncio.to_thread`` so the
    event loop is never blocked.  Model responses are expected to be JSON
    (except for ``translate_vtt``); malformed JSON goes through a local
    repair attempt first and a text-only "self-heal" model call second.
    """

    # Keys every accessibility-extraction payload must contain.
    _REQUIRED_FIELDS = (
        "language", "confidence", "summary",
        "transcript_plaintext", "captions_vtt", "audio_description_vtt",
    )

    # Placeholder track used when self-healing cannot recover the audio description.
    _FALLBACK_AD_VTT = (
        "WEBVTT\n\n00:00:00.000 --> 00:00:05.000\n"
        "Video content with visual elements described."
    )

    def __init__(self, model_name: str = 'gemini-3-pro-preview'):
        """Create a service bound to one Gemini model.

        Args:
            model_name: Model identifier passed to every ``generate_content``
                call.  Defaults to the Gemini 3 Pro preview model, which is
                what the service used before this became configurable.
        """
        self.model_name = model_name
        self.prompts_dir = Path(__file__).parent.parent / "prompts"

    # ------------------------------------------------------------------
    # Shared low-level helpers
    # ------------------------------------------------------------------

    def _load_prompt(self, prompt_file: str) -> str:
        """Load prompt template from prompts directory.

        The file is read as UTF-8 explicitly so the result does not depend on
        the platform's default encoding.

        Raises:
            FileNotFoundError: if ``prompt_file`` does not exist (logged first).
        """
        prompt_path = self.prompts_dir / prompt_file
        try:
            return prompt_path.read_text(encoding="utf-8")
        except FileNotFoundError:
            logger.error(f"Prompt file not found: {prompt_file}")
            raise

    @staticmethod
    def _response_text(response: Any) -> str:
        """Return the stripped text of a model response.

        Raises:
            ValueError: if the response carries no text at all.  Without this
                guard a missing ``text`` would surface as an opaque
                ``AttributeError`` on ``.strip()``.
        """
        text = (getattr(response, "text", None) or "").strip()
        if not text:
            raise ValueError("Gemini returned an empty response")
        return text

    @staticmethod
    def _strip_json_fence(text: str) -> str:
        """Remove a surrounding Markdown code fence from a JSON payload.

        Handles both ```` ```json ```` and bare ```` ``` ```` fences and only
        touches the opening/closing fence, so backticks that appear inside
        the payload itself are preserved.
        """
        stripped = text.strip()
        if not stripped.startswith("```"):
            return stripped
        stripped = stripped[3:]
        if stripped[:4].lower() == "json":
            stripped = stripped[4:]
        stripped = stripped.rstrip()
        if stripped.endswith("```"):
            stripped = stripped[:-3]
        return stripped.strip()

    @staticmethod
    def _scan_string_state(text: str) -> tuple[bool, bool]:
        """Report whether ``text`` ends inside a JSON string.

        Returns:
            ``(in_string, dangling_escape)`` where ``dangling_escape`` is True
            when the very last character is an unconsumed backslash inside a
            string.  Escaped quotes (``\\"``) are not counted as delimiters.
        """
        in_string = False
        escaped = False
        for ch in text:
            if escaped:
                escaped = False
            elif in_string and ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = not in_string
        return in_string, escaped

    @staticmethod
    def _log_json_error(error: json.JSONDecodeError, response_text: str) -> None:
        """Log the parse error position plus ~100 characters of context on each side."""
        logger.error(f"JSON parse error at position {error.pos}: {error.msg}")
        start = max(0, error.pos - 100)
        end = min(len(response_text), error.pos + 100)
        problematic_text = response_text[start:end]
        logger.error(f"Problematic JSON area: ...{problematic_text}...")

    def _validate_accessibility_result(self, result: Any) -> None:
        """Check that an extraction payload has every required field and WEBVTT tracks.

        Raises:
            ValueError: on a non-object payload, a missing field, or a track
                that is not a string starting with ``WEBVTT``.
        """
        if not isinstance(result, dict):
            raise ValueError("Gemini response is not a JSON object")

        for field in self._REQUIRED_FIELDS:
            if field not in result:
                raise ValueError(f"Missing required field: {field}")

        captions = result["captions_vtt"]
        if not isinstance(captions, str) or not captions.startswith("WEBVTT"):
            raise ValueError("Invalid captions VTT format")

        descriptions = result["audio_description_vtt"]
        if not isinstance(descriptions, str) or not descriptions.startswith("WEBVTT"):
            raise ValueError("Invalid audio description VTT format")

    async def _upload_video(self, video_file_path: str, display_name: str) -> Any:
        """Upload a video through the Files API (in a worker thread) and return the file handle.

        NOTE(review): the MIME type is always sent as ``video/mp4`` regardless
        of the actual container -- confirm callers only pass MP4 files.
        """
        return await asyncio.to_thread(
            client.files.upload,
            file=video_file_path,
            config={
                "display_name": display_name,
                "mime_type": "video/mp4"
            }
        )

    async def _generate(self, parts: list) -> Any:
        """Run ``generate_content`` for ``parts`` on the configured model in a worker thread."""
        return await asyncio.to_thread(
            client.models.generate_content,
            model=self.model_name,
            contents=parts
        )

    async def _generate_from_video(self, prompt: str, uploaded_file: Any) -> Any:
        """Send ``prompt`` together with an already-uploaded video to the model."""
        return await self._generate([
            genai.types.Part.from_text(text=prompt),
            genai.types.Part.from_uri(
                file_uri=uploaded_file.uri,
                mime_type=uploaded_file.mime_type
            )
        ])

    async def _delete_uploaded_file(
        self,
        uploaded_file: Any,
        success_prefix: str = "Successfully cleaned up uploaded file"
    ) -> None:
        """Best-effort removal of an uploaded file; failures are logged, never raised."""
        if not uploaded_file:
            return
        try:
            await asyncio.to_thread(client.files.delete, name=uploaded_file.name)
            logger.info(f"{success_prefix}: {uploaded_file.name}")
        except Exception as e:
            logger.warning(f"Failed to cleanup uploaded file {uploaded_file.name}: {e}")

    async def _wait_for_file_active(self, file_name: str, max_wait_seconds: int = 300) -> bool:
        """Wait for uploaded file to become ACTIVE state.

        Polls the file status with exponential backoff (x1.5, capped at 30s).
        A failed status check is logged and retried after 5s.  Every sleep is
        clipped to the time remaining, so the total wait never exceeds
        ``max_wait_seconds``.

        Returns:
            True once the file is ACTIVE; False if it reports FAILED or the
            time budget runs out.
        """
        wait_time = 1  # Start with 1 second
        total_waited = 0

        while total_waited < max_wait_seconds:
            delay = 5  # Retry interval used when the status check itself fails
            try:
                # Get file status - use asyncio.to_thread to avoid blocking event loop
                file_info = await asyncio.to_thread(client.files.get, name=file_name)
                logger.info(f"File {file_name} status: {file_info.state} (waited {total_waited}s)")

                # Accept either a plain string or an enum member as the state.
                state = getattr(file_info.state, "name", file_info.state)
                if state == "ACTIVE":
                    logger.info(f"File {file_name} is now ACTIVE!")
                    return True
                if state == "FAILED":
                    logger.error(f"File {file_name} processing FAILED")
                    return False

                delay = wait_time
                wait_time = min(wait_time * 1.5, 30)  # Exponential backoff, max 30s
            except Exception as e:
                logger.error(f"Error checking file status: {e}")

            # Never sleep past the overall deadline.
            delay = min(delay, max_wait_seconds - total_waited)
            logger.info(f"File not ready, waiting {delay}s...")
            await asyncio.sleep(delay)
            total_waited += delay

        logger.error(f"File {file_name} did not become ACTIVE within {max_wait_seconds}s")
        return False

    # ------------------------------------------------------------------
    # Accessibility extraction
    # ------------------------------------------------------------------

    async def extract_accessibility(self, video_file_path: str) -> dict[str, Any]:
        """
        Extract captions and audio descriptions from video using Gemini.

        Uploads the video, waits for it to become ACTIVE, asks the model for
        the ingestion JSON and validates it.  Malformed JSON is handed to the
        self-heal path.  The uploaded file is always deleted afterwards.

        Returns:
            Structured JSON with transcript, captions VTT, and audio description VTT.

        Raises:
            RuntimeError: the upload never became ACTIVE.
            ValueError: empty response, failed validation, or failed self-heal.
        """
        prompt = self._load_prompt("gemini_ingestion.md")
        uploaded_file = None

        try:
            logger.info(f"Starting Gemini processing for video: {video_file_path}")

            logger.info("Uploading video file to Gemini API...")
            uploaded_file = await self._upload_video(
                video_file_path,
                f"video_processing_{Path(video_file_path).name}"
            )
            logger.info(f"Successfully uploaded file: {uploaded_file.name} (URI: {uploaded_file.uri})")

            logger.info("Waiting for file to become ACTIVE...")
            if not await self._wait_for_file_active(uploaded_file.name):
                raise RuntimeError("File failed to become ACTIVE within timeout")

            logger.info("Generating content with Gemini model...")
            response = await self._generate_from_video(prompt, uploaded_file)

            response_text = self._response_text(response)
            logger.info(f"Received Gemini response (first 200 chars): {response_text[:200]}...")

            cleaned_text = self._strip_json_fence(response_text)
            if cleaned_text != response_text:
                logger.info("Cleaned markdown formatting from response")
            response_text = cleaned_text

            logger.info("Parsing JSON response...")
            try:
                result = json.loads(response_text)
            except json.JSONDecodeError as e:
                self._log_json_error(e, response_text)
                logger.error(f"Failed to parse Gemini JSON response: {e}")
                logger.error(f"Raw response that failed to parse: {response_text}")
                # Attempt self-healing (text-only; the video is not needed again)
                return await self._self_heal_response(video_file_path, response_text)

            self._validate_accessibility_result(result)

            logger.info(
                f"Successfully extracted accessibility content with confidence: {result['confidence']}"
            )
            return result

        except Exception as e:
            logger.error(f"Gemini extraction failed with exception: {type(e).__name__}: {str(e)}")
            logger.error(f"Video file path: {video_file_path}")
            raise
        finally:
            # Guaranteed cleanup of uploaded file regardless of success/failure/cancellation
            await self._delete_uploaded_file(uploaded_file)

    async def extract_accessibility_targeted(
        self,
        video_file_path: str,
        target_language: str
    ) -> dict[str, Any]:
        """
        Extract captions and audio descriptions from video using Gemini,
        generating content directly in the specified target language.

        Unlike extract_accessibility() which auto-detects language, this method
        takes an explicit target language and generates all outputs in that language.
        This is used for "video_native" translation mode which re-processes the video
        for each target language with full visual context.

        Args:
            video_file_path: Path to the video file
            target_language: BCP-47 language code (e.g., "es", "fr", "de")

        Returns:
            Structured JSON with transcript, captions VTT, and audio description VTT
            all in the target language

        Raises:
            RuntimeError: the upload never became ACTIVE.
            ValueError: empty response, failed validation, or failed self-heal.
        """
        prompt_template = self._load_prompt("gemini_ingestion_targeted.md")
        prompt = prompt_template.replace("{TARGET_LANGUAGE}", target_language)
        uploaded_file = None

        try:
            logger.info(f"Starting Gemini targeted processing for video: {video_file_path}, target: {target_language}")

            logger.info("Uploading video file to Gemini API for targeted extraction...")
            uploaded_file = await self._upload_video(
                video_file_path,
                f"video_processing_targeted_{target_language}_{Path(video_file_path).name}"
            )
            logger.info(f"Successfully uploaded file: {uploaded_file.name} (URI: {uploaded_file.uri})")

            logger.info("Waiting for file to become ACTIVE...")
            if not await self._wait_for_file_active(uploaded_file.name):
                raise RuntimeError("File failed to become ACTIVE within timeout")

            logger.info(f"Generating content with Gemini model for {target_language}...")
            response = await self._generate_from_video(prompt, uploaded_file)

            response_text = self._response_text(response)
            logger.info(f"Received Gemini targeted response for {target_language} (first 200 chars): {response_text[:200]}...")

            cleaned_text = self._strip_json_fence(response_text)
            if cleaned_text != response_text:
                logger.info("Cleaned markdown formatting from response")
            response_text = cleaned_text

            logger.info("Parsing JSON response...")
            try:
                result = json.loads(response_text)
            except json.JSONDecodeError as e:
                self._log_json_error(e, response_text)
                logger.error(f"Failed to parse Gemini JSON response: {e}")
                logger.error(f"Raw response that failed to parse: {response_text}")
                # Attempt self-healing
                return await self._self_heal_targeted_response(target_language, response_text)

            self._validate_accessibility_result(result)

            logger.info(
                f"Successfully extracted targeted accessibility content for {target_language} "
                f"with confidence: {result['confidence']}"
            )
            return result

        except Exception as e:
            logger.error(f"Gemini targeted extraction failed for {target_language}: {type(e).__name__}: {str(e)}")
            logger.error(f"Video file path: {video_file_path}")
            raise
        finally:
            # Cleanup uploaded file
            await self._delete_uploaded_file(uploaded_file)

    # ------------------------------------------------------------------
    # JSON self-healing
    # ------------------------------------------------------------------

    @staticmethod
    def _build_self_heal_prompt(invalid_response: str, target_language: Optional[str] = None) -> str:
        """Build the text-only JSON repair prompt.

        When ``target_language`` is given, the prompt additionally requires
        all content (and any reconstructed audio description) to be in that
        language.
        """
        language_rules = []
        ad_language_suffix = ""
        if target_language is not None:
            language_rules.append(f"- All content should be in {target_language}")
            ad_language_suffix = f" in {target_language}"

        lines = [
            "",
            "SYSTEM: You are a JSON repair service. Fix the malformed JSON below and return ONLY the corrected JSON.",
            "",
            "CRITICAL REQUIREMENTS:",
            "- The JSON MUST contain these exact fields: language, confidence, summary, transcript_plaintext, captions_vtt, audio_description_vtt",
            *language_rules,
            "- If audio_description_vtt is truncated or missing, reconstruct it as a valid WebVTT with at least basic descriptions" + ad_language_suffix,
            '- All VTT content must start with "WEBVTT" and have proper timestamp format (HH:MM:SS.mmm --> HH:MM:SS.mmm)',
            # The backslash must reach the model literally; an unescaped \" in a
            # normal string literal collapses to a bare quote.
            '- Properly escape all quotes within strings using \\"',
            "- Fix unterminated strings by adding closing quotes",
            "- Remove trailing commas",
            "- Ensure all JSON is properly closed with }",
            "",
            "Fix the JSON and return it:",
            "",
            invalid_response,
            "",
        ]
        return "\n".join(lines)

    def _try_local_json_fix(self, invalid_response: str) -> Optional[dict[str, Any]]:
        """Run the cheap syntactic repair; return the repaired payload or None."""
        try:
            fixed_response = self._attempt_json_fix(invalid_response)
        except Exception as e:
            logger.warning(f"JSON fix attempt failed: {e}")
            return None
        if fixed_response:
            logger.info("Successfully fixed JSON without re-processing")
            return fixed_response
        return None

    async def _repair_accessibility_json(self, self_heal_prompt: str) -> dict[str, Any]:
        """Ask the model to repair an extraction payload and check the result.

        A missing ``audio_description_vtt`` is replaced by a generic fallback
        track; any other missing required field is an error.

        Raises:
            ValueError / json.JSONDecodeError: when the repaired payload is
                still unusable.  Callers wrap these in their own ValueError.
        """
        response = await self._generate([genai.types.Part.from_text(text=self_heal_prompt)])
        result = json.loads(self._strip_json_fence(self._response_text(response)))
        if not isinstance(result, dict):
            raise ValueError("Self-heal response is not a JSON object")

        # Validate that all required fields are present after healing
        missing_fields = [field for field in self._REQUIRED_FIELDS if field not in result]
        if missing_fields:
            logger.error(f"Self-heal lost required fields: {missing_fields}")
            # If audio_description_vtt is missing, create a basic one
            if "audio_description_vtt" in missing_fields:
                logger.info("Creating fallback audio_description_vtt")
                result["audio_description_vtt"] = self._FALLBACK_AD_VTT

            # If other critical fields are missing, raise an error
            remaining_missing = [f for f in missing_fields if f != "audio_description_vtt"]
            if remaining_missing:
                raise ValueError(f"Self-heal failed to preserve required fields: {remaining_missing}")

        return result

    async def _self_heal_response(self, video_file_path: str, invalid_response: str) -> dict[str, Any]:
        """Attempt to self-heal invalid JSON response from Gemini.

        Tries a local syntactic fix first, then a text-only repair call.
        ``video_file_path`` is accepted for interface compatibility but is not
        used: the video is never re-uploaded.

        Raises:
            ValueError: if no valid payload could be obtained (original error chained).
        """
        logger.info("Attempting to self-heal JSON response without re-uploading video")

        fixed_response = self._try_local_json_fix(invalid_response)
        if fixed_response:
            return fixed_response

        # If simple fixes don't work, try a text-only self-heal prompt with more context
        try:
            result = await self._repair_accessibility_json(
                self._build_self_heal_prompt(invalid_response)
            )
        except Exception as e:
            logger.error(f"Self-heal attempt failed: {e}")
            raise ValueError("Failed to get valid JSON from Gemini after self-heal attempt") from e

        logger.info("Successfully self-healed Gemini response with all required fields")
        return result

    async def _self_heal_targeted_response(
        self,
        target_language: str,
        invalid_response: str
    ) -> dict[str, Any]:
        """Attempt to self-heal invalid JSON response from targeted extraction.

        Same strategy as ``_self_heal_response`` but the repair prompt also
        pins the output language.

        Raises:
            ValueError: if no valid payload could be obtained (original error chained).
        """
        logger.info(f"Attempting to self-heal targeted response for {target_language}")

        fixed_response = self._try_local_json_fix(invalid_response)
        if fixed_response:
            return fixed_response

        try:
            result = await self._repair_accessibility_json(
                self._build_self_heal_prompt(invalid_response, target_language)
            )
        except Exception as e:
            logger.error(f"Self-heal attempt failed for {target_language}: {e}")
            raise ValueError(
                f"Failed to get valid JSON from Gemini targeted extraction for {target_language}"
            ) from e

        logger.info(f"Successfully self-healed targeted response for {target_language}")
        return result

    def _attempt_json_fix(self, json_text: str) -> Optional[dict[str, Any]]:
        """Attempt to fix common JSON syntax issues.

        Repairs, in this order:

        1. an unterminated trailing string (escape-aware; a dangling backslash
           at the cut-off point is dropped before the closing quote is added);
        2. a missing final ``}``;
        3. trailing commas before ``}`` / ``]``.  This runs last so that a
           comma left dangling by truncation is removed once the brace exists.

        Returns:
            The parsed object when it parses and contains every required
            field, otherwise None so the model-based self-heal can take over.
        """
        import re

        fixes_tried = []
        fixed_text = json_text

        # Fix 1: close a string that was cut off mid-value
        in_string, dangling_escape = self._scan_string_state(fixed_text)
        if in_string:
            if dangling_escape:
                # A lone trailing backslash would escape the quote we are about to add.
                fixed_text = fixed_text[:-1]
            fixed_text += '"'
            fixes_tried.append("closed unterminated string")

        # Fix 2: Ensure JSON ends with closing brace
        if not fixed_text.rstrip().endswith('}'):
            fixed_text = fixed_text.rstrip() + '\n}'
            fixes_tried.append("added closing brace")

        # Fix 3: Remove trailing commas
        without_trailing_commas = re.sub(r',(\s*[}\]])', r'\1', fixed_text)
        if without_trailing_commas != fixed_text:
            fixed_text = without_trailing_commas
            fixes_tried.append("removed trailing commas")

        try:
            result = json.loads(fixed_text)
        except json.JSONDecodeError as e:
            logger.debug(f"JSON fix attempt failed: {e}")
            return None

        logger.info(f"JSON fixed with: {', '.join(fixes_tried) or 'no changes'}")

        if not isinstance(result, dict):
            logger.warning("Fixed JSON is not an object")
            return None

        # Validate that we have the required fields
        missing_fields = [field for field in self._REQUIRED_FIELDS if field not in result]
        if missing_fields:
            logger.warning(f"Fixed JSON is missing required fields: {missing_fields}")
            return None  # Let the more advanced self-healing handle this

        return result

    # ------------------------------------------------------------------
    # Accessible-video placement analysis
    # ------------------------------------------------------------------

    async def analyze_accessible_video_placement(
        self,
        video_file_path: str,
        ad_vtt_content: str,
        ad_cue_durations: list[float]
    ) -> dict[str, Any]:
        """
        Analyze video and determine optimal method for integrating audio descriptions.
        Returns placement instructions for each AD cue.

        Args:
            video_file_path: Path to the source video file
            ad_vtt_content: The audio description VTT content
            ad_cue_durations: List of actual TTS audio durations in seconds (matching VTT cue order)

        Returns:
            Dictionary with method choice and placement instructions for each AD cue

        Raises:
            RuntimeError: the upload never became ACTIVE.
            ValueError: empty response, unrecoverable JSON, missing field, or
                a ``method`` other than ``overlay`` / ``pause_insert``.
        """
        prompt_template = self._load_prompt("gemini_accessible_video.md")

        # Format prompt with AD VTT content and durations
        prompt = prompt_template.replace(
            "{AD_VTT_CONTENT}", ad_vtt_content
        ).replace(
            "{AD_CUE_DURATIONS}", json.dumps(ad_cue_durations)
        )

        uploaded_file = None

        try:
            logger.info(f"Starting accessible video analysis for: {video_file_path}")
            logger.info(f"AD cues to place: {len(ad_cue_durations)}")

            logger.info("Uploading video file to Gemini API for accessible video analysis...")
            uploaded_file = await self._upload_video(
                video_file_path,
                f"accessible_video_analysis_{Path(video_file_path).name}"
            )
            logger.info(f"Successfully uploaded file: {uploaded_file.name}")

            logger.info("Waiting for file to become ACTIVE...")
            if not await self._wait_for_file_active(uploaded_file.name):
                raise RuntimeError("File failed to become ACTIVE within timeout")

            logger.info("Analyzing video with Gemini for accessible video placement...")
            response = await self._generate_from_video(prompt, uploaded_file)

            response_text = self._response_text(response)
            logger.info(f"Received accessible video analysis response (first 300 chars): {response_text[:300]}...")

            response_text = self._strip_json_fence(response_text)

            try:
                result = json.loads(response_text)
            except json.JSONDecodeError as e:
                logger.error(f"JSON parse error in accessible video analysis: {e}")
                # Try self-healing for this response
                result = await self._self_heal_accessible_video_response(response_text)

            if not isinstance(result, dict):
                raise ValueError("Accessible video analysis response is not a JSON object")

            # Validate required fields
            for field in ("method", "method_rationale", "dialogue_density", "placements"):
                if field not in result:
                    raise ValueError(f"Missing required field in accessible video analysis: {field}")

            # Validate method value
            if result["method"] not in ("overlay", "pause_insert"):
                raise ValueError(f"Invalid method value: {result['method']}")

            # A count mismatch is only reported, not treated as fatal
            if len(result["placements"]) != len(ad_cue_durations):
                logger.warning(
                    f"Placement count mismatch: got {len(result['placements'])}, "
                    f"expected {len(ad_cue_durations)}"
                )

            # dialogue_density comes from the model; only apply a numeric
            # format when it really is a number so logging cannot fail the call.
            density = result["dialogue_density"]
            if isinstance(density, (int, float)) and not isinstance(density, bool):
                density_text = f"{density:.2f}"
            else:
                density_text = repr(density)

            logger.info(
                f"Accessible video analysis complete: method={result['method']}, "
                f"dialogue_density={density_text}, "
                f"placements={len(result['placements'])}"
            )

            return result

        except Exception as e:
            logger.error(f"Accessible video analysis failed: {type(e).__name__}: {str(e)}")
            raise
        finally:
            # Cleanup uploaded file
            await self._delete_uploaded_file(uploaded_file, "Cleaned up uploaded file")

    async def _self_heal_accessible_video_response(self, invalid_response: str) -> dict[str, Any]:
        """Attempt to self-heal invalid JSON response from accessible video analysis.

        Sends the malformed text to the model with a repair prompt and parses
        the answer.  Field validation is left to the caller.

        Raises:
            ValueError: if the repaired answer still cannot be parsed (cause chained).
        """
        logger.info("Attempting to self-heal accessible video analysis response")

        self_heal_prompt = "\n".join([
            "",
            "SYSTEM: You are a JSON repair service. Fix the malformed JSON below and return ONLY the corrected JSON.",
            "",
            "CRITICAL REQUIREMENTS:",
            "- The JSON MUST contain: method, method_rationale, dialogue_density, placements, total_added_duration, warnings",
            '- method must be either "overlay" or "pause_insert"',
            "- dialogue_density must be a number between 0 and 1",
            "- placements must be an array of placement objects",
            "- Fix any JSON syntax errors (trailing commas, unterminated strings, etc.)",
            "",
            "Fix the JSON and return it:",
            "",
            invalid_response,
            "",
        ])

        try:
            response = await self._generate([genai.types.Part.from_text(text=self_heal_prompt)])
            result = json.loads(self._strip_json_fence(self._response_text(response)))
        except Exception as e:
            logger.error(f"Self-heal attempt for accessible video analysis failed: {e}")
            raise ValueError("Failed to get valid JSON from accessible video analysis after self-heal") from e

        logger.info("Successfully self-healed accessible video analysis response")
        return result

    # ------------------------------------------------------------------
    # Text-only translation workflows
    # ------------------------------------------------------------------

    async def transcreate_content(
        self,
        captions_vtt: str,
        ad_vtt: str,
        target_language: str,
        brief: Optional[str] = None
    ) -> dict[str, str]:
        """
        Transcreate English VTT content to target language with cultural adaptation.

        Args:
            captions_vtt: Source captions track.
            ad_vtt: Source audio description track.
            target_language: Language to transcreate into.
            brief: Optional brand guidelines; a default sentence is sent when omitted.

        Returns:
            Parsed JSON containing at least ``captions_vtt`` and ``audio_description_vtt``.

        Raises:
            ValueError: on an empty/invalid JSON response or missing VTT fields.
        """
        prompt_template = self._load_prompt("gemini_transcreation.md")

        # NOTE(review): str.format treats every brace in the template as a
        # placeholder, so literal braces in gemini_transcreation.md must be
        # doubled -- presumably they are; verify against the template.
        prompt = prompt_template.format(
            TARGET_LANGUAGE=target_language
        )

        user_prompt = "\n".join([
            "",
            "Input:",
            f"- captions_vtt_en: {captions_vtt}",
            f"- ad_vtt_en: {ad_vtt}",
            f"- brief: {brief or 'No specific brand guidelines provided'}",
            "",
            "Output:",
            "JSON:",
            "",
        ])

        try:
            response = await self._generate([
                genai.types.Part.from_text(text=prompt + "\n\n" + user_prompt)
            ])

            result = json.loads(self._strip_json_fence(self._response_text(response)))

            # Validate required fields
            if "captions_vtt" not in result or "audio_description_vtt" not in result:
                raise ValueError("Missing required VTT fields in transcreation response")

            logger.info(f"Successfully transcreated content to {target_language}")
            return result

        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse transcreation JSON response: {e}")
            raise ValueError("Invalid JSON response from transcreation") from e
        except Exception as e:
            logger.error(f"Transcreation failed: {e}")
            raise

    async def translate_vtt(
        self,
        vtt_content: str,
        target_language: str,
        source_language: str = "en"
    ) -> str:
        """
        Translate VTT content using Gemini, preserving timing and structure.
        More cost-effective alternative to Google Translate API (6-36x cheaper).

        Args:
            vtt_content: The VTT file content to translate
            target_language: The language code to translate to (e.g., 'es', 'fr')
            source_language: The source language code (default: 'en')

        Returns:
            Translated VTT content with preserved timestamps.  A ``WEBVTT``
            header is prepended if the model omitted it.

        Raises:
            ValueError: if the model returns no text at all.
        """
        prompt = "\n".join([
            f"Translate this WebVTT subtitle file from {source_language} to {target_language}.",
            "",
            "CRITICAL REQUIREMENTS:",
            "- Preserve ALL timestamps exactly as-is (do not modify any timing)",
            "- Keep the WEBVTT header line",
            "- Translate ONLY the text content between timestamps",
            "- Maintain readable line lengths (~32-40 characters per line)",
            f"- Handle idioms and slang naturally in {target_language}",
            '- Preserve any speaker labels (e.g., "[Speaker 1]:")',
            "- Do NOT add any explanation or markdown - return ONLY the translated VTT",
            "",
            "VTT Content to translate:",
            f"{vtt_content}",
        ])

        try:
            response = await self._generate([genai.types.Part.from_text(text=prompt)])

            result = self._response_text(response)

            # Handle potential markdown formatting
            if result.startswith("```"):
                # Drop every fence line (```, ```vtt, ```webvtt, ...)
                result = "\n".join(
                    line for line in result.split("\n")
                    if not line.strip().startswith("```")
                ).strip()

            # Validate VTT format
            if not result.startswith("WEBVTT"):
                logger.warning("Gemini translation missing WEBVTT header, adding it")
                result = "WEBVTT\n\n" + result

            logger.info(f"Successfully translated VTT to {target_language} using Gemini")
            return result

        except Exception as e:
            logger.error(f"Gemini translation failed for {target_language}: {e}")
            raise
|
|
|
|
|
|
# Global service instance
|
|
# Built at import time with default constructor arguments;
# presumably imported elsewhere as the shared singleton -- verify against callers.
gemini_service = GeminiService()
|