video-accessibility/backend/app/services/gemini.py
Vadym Samoilenko ca312d48fa chore(lint): fix all ruff errors — 0 warnings remaining
- B904 (55): add `from err` / `from None` to raise-in-except across 13 files
- F821 (1): add missing HTTPException import in routes_language_qc.py
- F841 (7): remove unused variable assignments (current_user, job_title, tts_provider, etc.)
- W293 (13): strip trailing whitespace from blank lines
- C416 (4): rewrite unnecessary dict comprehensions as dict()
- C401 (1): rewrite unnecessary generator as set comprehension
- E701 (4): split multi-statement lines in cost_tracker.py
- E741 (1): rename ambiguous `l` to `lang` in cloud_run_dispatch.py
- B007 (4): prefix unused loop variables with _ in tts.py, video_renderer.py
- I001 (1): sort imports in tasks/__init__.py (move stdlib to top)
- E402 (3): move threading/time/signals imports to top of tasks/__init__.py
- UP042 (9): replace (str, Enum) with StrEnum in all model/schema enums

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-13 17:13:08 +01:00

1038 lines
46 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import json
import time
from pathlib import Path
from typing import Any
import google.genai as genai
from ..core.config import settings
from ..core.logging import get_logger
from ..lib import locales as locale_lib
logger = get_logger(__name__)

# Single google-genai client shared by every Gemini call in this module,
# created once at import time from the configured API key.
client = genai.Client(
    api_key=settings.gemini_api_key,
)
async def _record_gemini_usage(
    response,
    model: str,
    user_id: str,
    job_id: str,
    project_id: str | None,
    elapsed_ms: int,
) -> None:
    """Record the token usage of one Gemini call with the cost tracker (best effort).

    Reads ``response.usage_metadata`` and does nothing when it is absent. Any
    failure, including importing or calling ``cost_tracker``, is logged as a
    warning and swallowed, so cost accounting can never fail the caller.

    Output tokens are ``candidates_token_count`` plus ``thoughts_token_count``.
    For thinking models the SDK reports reasoning tokens separately from the
    visible candidates, and Gemini bills them at the output-token rate.

    Args:
        response: A google-genai ``generate_content`` response.
        model: Model name actually used for the call.
        user_id: External user id forwarded to the tracker.
        job_id: External job id forwarded to the tracker.
        project_id: Optional project id forwarded to the tracker.
        elapsed_ms: Wall-clock latency of the call in milliseconds.
    """
    try:
        from ..services import cost_tracker

        usage = getattr(response, "usage_metadata", None)
        if usage is None:
            return
        input_tokens = getattr(usage, "prompt_token_count", 0) or 0
        # Visible output and thinking output are reported separately but both
        # count as billed output tokens.
        output_tokens = (getattr(usage, "candidates_token_count", 0) or 0) + (
            getattr(usage, "thoughts_token_count", 0) or 0
        )
        await cost_tracker.aio_record(
            model=model,
            provider="google",
            user_external_id=user_id,
            project_id=project_id,
            job_external_id=job_id,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            latency_ms=elapsed_ms,
        )
    except Exception as e:
        logger.warning(f"Cost tracker record failed (non-fatal): {e}")
class GeminiService:
_fallback_models: list[str] = [
"gemini-3-flash-preview",
"gemini-2.5-pro",
]
def __init__(self):
self.model_name = 'gemini-3.1-pro-preview'
self.prompts_dir = Path(__file__).parent.parent / "prompts"
async def _generate(self, contents: Any, config: Any = None) -> tuple[Any, str]:
    """Call ``generate_content``, falling back to other models on transient failures.

    Models are tried in order: ``self.model_name``, then each entry of
    ``self._fallback_models``. A model is skipped when:

    * the call raises an exception whose message mentions 429,
      RESOURCE_EXHAUSTED, 503 or UNAVAILABLE, or
    * the response's ``text`` is None (e.g. safety block or overload).

    Any other exception propagates immediately.

    Args:
        contents: Passed through unchanged as ``contents``.
        config: Optional generation config; omitted from the call when None.

    Returns:
        ``(response, model_used)``.

    Raises:
        Exception: the last transient error (or a RuntimeError for an empty
            response) once every model has been tried.
    """
    last_exc: Exception | None = None
    for model in [self.model_name, *self._fallback_models]:
        kw: dict[str, Any] = {"model": model, "contents": contents}
        if config is not None:
            kw["config"] = config
        try:
            # The SDK call is blocking; keep it off the event loop.
            response = await asyncio.to_thread(client.models.generate_content, **kw)
        except Exception as exc:
            msg = str(exc)
            if "429" in msg or "RESOURCE_EXHAUSTED" in msg or "503" in msg or "UNAVAILABLE" in msg:
                logger.warning(f"Model {model!r} unavailable ({msg}), trying next fallback")
                last_exc = exc
                continue
            raise
        if response.text is None:
            logger.warning(f"Model {model!r} returned empty response (safety block or overload), trying next fallback")
            last_exc = RuntimeError(f"Model {model!r} returned empty response")
            continue
        if model != self.model_name:
            logger.warning(f"Used fallback model {model!r} (primary unavailable)")
        return response, model
    if last_exc is None:
        # Only reachable if the model list were empty.
        raise RuntimeError("No Gemini models available to try")
    raise last_exc
def _load_prompt(self, prompt_file: str) -> str:
"""Load prompt template from prompts directory"""
prompt_path = self.prompts_dir / prompt_file
try:
return prompt_path.read_text()
except FileNotFoundError:
logger.error(f"Prompt file not found: {prompt_file}")
raise
async def _wait_for_file_active(self, file_name: str, max_wait_seconds: int = 300) -> bool:
    """Poll the Gemini Files API until an uploaded file reaches the ACTIVE state.

    The poll interval starts at 1 s and grows by 1.5x per not-ready poll, up
    to 30 s. A failed status lookup is logged and retried after 5 s.

    The timeout is enforced against wall-clock time (``time.monotonic``).
    Time spent inside the blocking ``files.get`` calls therefore counts
    towards ``max_wait_seconds``, and no sleep runs past the deadline.

    Args:
        file_name: Resource name of the uploaded file.
        max_wait_seconds: Upper bound on the total time spent waiting.

    Returns:
        True once the file is ACTIVE. False if it reports FAILED or the
        deadline passes first.
    """
    poll_interval = 1.0
    started = time.monotonic()
    deadline = started + max_wait_seconds
    while (now := time.monotonic()) < deadline:
        waited = now - started
        try:
            # files.get is a blocking SDK call; run it in a worker thread.
            file_info = await asyncio.to_thread(client.files.get, name=file_name)
        except Exception as e:
            logger.error(f"Error checking file status: {e}")
            await asyncio.sleep(min(5.0, max(0.0, deadline - time.monotonic())))
            continue
        logger.info(f"File {file_name} status: {file_info.state} (waited {waited:.1f}s)")
        # NOTE(review): relies on state comparing equal to the plain string
        # (true for str-based enums) — confirm against the SDK's FileState.
        if file_info.state == "ACTIVE":
            logger.info(f"File {file_name} is now ACTIVE!")
            return True
        if file_info.state == "FAILED":
            logger.error(f"File {file_name} processing FAILED")
            return False
        sleep_for = min(poll_interval, max(0.0, deadline - time.monotonic()))
        logger.info(f"File not ready, waiting {sleep_for:.1f}s...")
        await asyncio.sleep(sleep_for)
        poll_interval = min(poll_interval * 1.5, 30)
    logger.error(f"File {file_name} did not become ACTIVE within {max_wait_seconds}s")
    return False
def _build_sdh_field(self, sdh_requested: bool) -> str:
if sdh_requested:
return "- sdh_captions_vtt: a valid WebVTT file as a single string, containing SDH-format captions (same timing as captions_vtt, but enriched with speaker labels, sound effects, and music notation)"
return ""
def _build_sdh_guidelines(self, sdh_requested: bool) -> str:
if not sdh_requested:
return ""
return """SDH (SUBTITLES FOR THE DEAF AND HARD OF HEARING) GUIDELINES:
Generate sdh_captions_vtt using the same cue timings as captions_vtt, enriched with:
- Speaker identification when multiple speakers are present: use "NAME:" prefix (e.g., "JOHN: Hello there") or "[NARRATOR]" for narration
- Non-speech sounds that are plot-relevant, in square brackets: [DOOR SLAMS], [PHONE RINGS], [CROWD CHEERING], [THUNDER]
- Music: use ♪ for background music cues (e.g., "♪ tense music ♪") or ♪ around sung lyrics
- Off-screen or voice-over speakers: indicate with "(off-screen)" or "[V.O.]" where relevant
- Non-speech vocalisations when relevant: [SIGHS], [LAUGHS], [SCREAMS]
- Maintain the same timestamp format as captions_vtt (HH:MM:SS.mmm --> HH:MM:SS.mmm)
- Only add sound effect cues where they add meaningful context; do not annotate every minor sound"""
def _build_glossary_block(self, glossary_block: str | None) -> str:
"""Return the pre-built glossary block (from glossary_service.build_glossary_prompt_block), or empty string."""
if glossary_block and glossary_block.strip():
return glossary_block.strip()
return ""
def _build_source_has_ad_block(self, source_has_ad: bool) -> str:
if source_has_ad:
return (
"SOURCE AUDIO DESCRIPTION NOTICE: This video already has professional audio descriptions "
"embedded in its audio track. "
"1) Return an empty audio_description_vtt containing only the WEBVTT header (\"WEBVTT\\n\") — do NOT generate new audio descriptions. "
"2) For captions_vtt: transcribe ONLY the original program dialogue and relevant sound effects. "
"Do NOT caption the audio description narration — AD narration is spoken during natural pauses "
"and describes visual scenes rather than being part of the original dialogue."
)
return ""
def _build_brand_context_block(self, brand_context: str | None) -> str:
"""Build the brand context instruction block for injection into prompts."""
if brand_context and brand_context.strip():
brands = [b.strip() for b in brand_context.split(",") if b.strip()]
if brands:
brand_list = ", ".join(f'"{b}"' for b in brands)
return (
f"The client has confirmed the following brand names appear in this video: {brand_list}. "
f"Use these exact brand names when you identify those products on screen."
)
return "No specific brand names have been provided for this video."
async def extract_accessibility(self, video_file_path: str, brand_context: str | None = None, sdh_requested: bool = False, glossary_block: str | None = None, source_has_ad: bool = False, _cost_ctx: dict | None = None) -> dict[str, Any]:
    """
    Extract a transcript, captions and audio descriptions from a video with Gemini.

    Steps:
    * Upload the video to the Gemini Files API (mime type video/mp4).
    * Wait for the file to become ACTIVE.
    * Run the ``gemini_ingestion.md`` prompt against it, with the brand,
      glossary, SDH and source-has-AD blocks substituted in.

    The uploaded file is always deleted in ``finally``.

    Args:
        video_file_path: Local path of the video to process.
        brand_context: Optional comma-separated brand names known to appear.
        sdh_requested: When true, the prompt also requests ``sdh_captions_vtt``.
        glossary_block: Optional pre-built glossary prompt block.
        source_has_ad: When true, the prompt says the source already has AD.
        _cost_ctx: Optional dict with ``user_id`` / ``job_id`` / ``project_id``.
            When given, token usage is recorded in a background task.

    Returns:
        The parsed JSON object. It contains language, confidence, summary,
        transcript_plaintext, captions_vtt and audio_description_vtt, with
        both VTT fields checked to start with "WEBVTT". If the model output
        is not valid JSON, the result of ``_self_heal_response`` is returned
        instead.

    Raises:
        ValueError: for a missing field or invalid VTT header, or when the
            self-heal fails.
        Exception: upload, activation and generation errors are logged and
            re-raised.
    """
    prompt_template = self._load_prompt("gemini_ingestion.md")
    prompt = (
        prompt_template
        .replace("{BRAND_CONTEXT}", self._build_brand_context_block(brand_context))
        .replace("{GLOSSARY}", self._build_glossary_block(glossary_block))
        .replace("{SDH_FIELD}", self._build_sdh_field(sdh_requested))
        .replace("{SDH_GUIDELINES}", self._build_sdh_guidelines(sdh_requested))
        .replace("{SOURCE_HAS_AD}", self._build_source_has_ad_block(source_has_ad))
    )
    uploaded_file = None
    try:
        logger.info(f"Starting Gemini processing for video: {video_file_path}")
        # Upload via the Files API; the SDK call blocks, so run it in a thread.
        logger.info("Uploading video file to Gemini API...")
        uploaded_file = await asyncio.to_thread(
            client.files.upload,
            file=video_file_path,
            config={
                "display_name": f"video_processing_{Path(video_file_path).name}",
                "mime_type": "video/mp4"
            }
        )
        logger.info(f"Successfully uploaded file: {uploaded_file.name} (URI: {uploaded_file.uri})")
        # The file must be ACTIVE before it can be referenced in a prompt.
        logger.info("Waiting for file to become ACTIVE...")
        file_ready = await self._wait_for_file_active(uploaded_file.name)
        if not file_ready:
            raise Exception("File failed to become ACTIVE within timeout")
        logger.info("Generating content with Gemini model...")
        _t0 = time.monotonic()
        response, _model_used = await self._generate(
            contents=[
                genai.types.Part.from_text(text=prompt),
                genai.types.Part.from_uri(
                    file_uri=uploaded_file.uri,
                    mime_type=uploaded_file.mime_type
                )
            ],
            config=genai.types.GenerateContentConfig(
                temperature=0.2,
                top_p=0.8,
                top_k=40,
            ),
        )
        if _cost_ctx:
            usage_task = asyncio.create_task(_record_gemini_usage(
                response,
                _model_used,
                _cost_ctx.get("user_id", "system"),
                _cost_ctx.get("job_id", ""),
                _cost_ctx.get("project_id"),
                int((time.monotonic() - _t0) * 1000),
            ))
            # asyncio keeps only weak references to tasks. Hold a strong one
            # (on the class, so it outlives this instance) until it finishes.
            pending = getattr(type(self), "_pending_usage_tasks", None)
            if pending is None:
                pending = type(self)._pending_usage_tasks = set()
            pending.add(usage_task)
            usage_task.add_done_callback(pending.discard)
        # Parse JSON response
        response_text = response.text.strip()
        logger.info(f"Received Gemini response (first 200 chars): {response_text[:200]}...")
        # Handle potential markdown formatting
        if response_text.startswith("```json"):
            response_text = response_text.replace("```json", "").replace("```", "").strip()
            logger.info("Cleaned markdown formatting from response")
        # Additional cleanup for common JSON issues
        response_text = response_text.strip()
        logger.info("Parsing JSON response...")
        try:
            result = json.loads(response_text)
        except json.JSONDecodeError as e:
            logger.error(f"JSON parse error at position {e.pos}: {e.msg}")
            # Log ~100 characters either side of the parse error.
            start = max(0, e.pos - 100)
            end = min(len(response_text), e.pos + 100)
            problematic_text = response_text[start:end]
            logger.error(f"Problematic JSON area: ...{problematic_text}...")
            raise
        # Validate required fields
        required_fields = [
            "language", "confidence", "summary",
            "transcript_plaintext", "captions_vtt", "audio_description_vtt"
        ]
        for field in required_fields:
            if field not in result:
                raise ValueError(f"Missing required field: {field}")
        # Validate VTT format
        if not result["captions_vtt"].startswith("WEBVTT"):
            raise ValueError("Invalid captions VTT format")
        if not result["audio_description_vtt"].startswith("WEBVTT"):
            raise ValueError("Invalid audio description VTT format")
        logger.info(
            f"Successfully extracted accessibility content with confidence: {result['confidence']}"
        )
        return result
    except json.JSONDecodeError as e:
        logger.error(f"Failed to parse Gemini JSON response: {e}")
        logger.error(f"Raw response that failed to parse: {response_text}")
        # Attempt self-healing (text-only; the uploaded video is not reused).
        return await self._self_heal_response(video_file_path, response_text)
    except Exception as e:
        logger.error(f"Gemini extraction failed with exception: {type(e).__name__}: {str(e)}")
        logger.error(f"Video file path: {video_file_path}")
        # Print to stdout for immediate visibility
        print(f"🚨 GEMINI ERROR: {type(e).__name__}: {str(e)}")
        raise
    finally:
        # Guaranteed cleanup of uploaded file regardless of success/failure/cancellation
        if uploaded_file:
            try:
                await asyncio.to_thread(client.files.delete, name=uploaded_file.name)
                logger.info(f"Successfully cleaned up uploaded file: {uploaded_file.name}")
            except Exception as e:
                logger.warning(f"Failed to cleanup uploaded file {uploaded_file.name}: {e}")
async def _self_heal_response(self, video_file_path: str, invalid_response: str) -> dict[str, Any]:
    """Repair a malformed ingestion JSON response without re-uploading the video.

    Two stages are tried in order:

    1. ``_attempt_json_fix``: local, regex-based repairs. A truthy result is
       returned as-is. Errors from this stage are only logged.
    2. A text-only Gemini call asking the model to repair the JSON. A missing
       ``audio_description_vtt`` is replaced by a one-cue placeholder VTT; any
       other missing required field is an error.

    ``video_file_path`` is accepted for interface compatibility but unused.

    Raises:
        ValueError: if the model-based repair fails for any reason. The
            underlying error is chained as ``__cause__``.
    """
    logger.info("Attempting to self-heal JSON response without re-uploading video")

    # Stage 1: cheap local repair.
    try:
        locally_repaired = self._attempt_json_fix(invalid_response)
        if locally_repaired:
            logger.info("Successfully fixed JSON without re-processing")
            return locally_repaired
    except Exception as fix_error:
        logger.warning(f"JSON fix attempt failed: {fix_error}")

    # Stage 2: ask the model to repair the JSON text.
    self_heal_prompt = f"""
SYSTEM: You are a JSON repair service. Fix the malformed JSON below and return ONLY the corrected JSON.
CRITICAL REQUIREMENTS:
- The JSON MUST contain these exact fields: language, confidence, summary, transcript_plaintext, captions_vtt, audio_description_vtt
- If audio_description_vtt is truncated or missing, reconstruct it as a valid WebVTT with at least basic descriptions
- All VTT content must start with "WEBVTT" and have proper timestamp format (HH:MM:SS.mmm --> HH:MM:SS.mmm)
- Properly escape all quotes within strings using \"
- Fix unterminated strings by adding closing quotes
- Remove trailing commas
- Ensure all JSON is properly closed with }}
Fix the JSON and return it:
{invalid_response}
"""
    expected_fields = (
        "language", "confidence", "summary",
        "transcript_plaintext", "captions_vtt", "audio_description_vtt",
    )
    try:
        heal_response, _unused_model = await self._generate(
            contents=[genai.types.Part.from_text(text=self_heal_prompt)]
        )
        healed_text = heal_response.text.strip()
        if healed_text.startswith("```json"):
            # Strip markdown code fences around the payload.
            healed_text = healed_text.replace("```json", "").replace("```", "").strip()
        result = json.loads(healed_text)

        absent = [name for name in expected_fields if name not in result]
        if absent:
            logger.error(f"Self-heal lost required fields: {absent}")
            if "audio_description_vtt" in absent:
                logger.info("Creating fallback audio_description_vtt")
                result["audio_description_vtt"] = "WEBVTT\n\n00:00:00.000 --> 00:00:05.000\nVideo content with visual elements described."
            unrecoverable = [name for name in absent if name != "audio_description_vtt"]
            if unrecoverable:
                raise ValueError(f"Self-heal failed to preserve required fields: {unrecoverable}")

        logger.info("Successfully self-healed Gemini response with all required fields")
        return result
    except Exception as e:
        logger.error(f"Self-heal attempt failed: {e}")
        raise ValueError("Failed to get valid JSON from Gemini after self-heal attempt") from e
async def extract_accessibility_targeted(
    self,
    video_file_path: str,
    target_language: str,
    brand_context: str | None = None,
    sdh_requested: bool = False,
    glossary_block: str | None = None,
    _cost_ctx: dict | None = None,
) -> dict[str, Any]:
    """
    Extract captions and audio descriptions from a video with Gemini,
    generating all content directly in ``target_language``.

    Unlike extract_accessibility(), which auto-detects the language, this
    method takes an explicit target language. It is used for the
    "video_native" translation mode, which re-processes the video for each
    target language with full visual context. The uploaded file is always
    deleted in ``finally``.

    Args:
        video_file_path: Path to the video file.
        target_language: BCP-47 language code (e.g. "es", "fr", "de").
        brand_context: Optional comma-separated brand names present in the video.
        sdh_requested: When true, the prompt also requests ``sdh_captions_vtt``.
        glossary_block: Optional pre-built glossary prompt block.
        _cost_ctx: Optional dict with ``user_id`` / ``job_id`` / ``project_id``.
            When given, token usage is recorded in a background task.

    Returns:
        Parsed JSON with transcript, captions VTT and audio description VTT,
        all in the target language. If the model output is not valid JSON,
        the result of ``_self_heal_targeted_response`` is returned instead.

    Raises:
        ValueError: for a missing field or invalid VTT header, or when the
            self-heal fails.
        Exception: upload, activation and generation errors are logged and
            re-raised.
    """
    prompt_template = self._load_prompt("gemini_ingestion_targeted.md")
    prompt = (
        prompt_template
        .replace("{TARGET_LANGUAGE}", locale_lib.get_gemini_label(target_language))
        .replace("{BRAND_CONTEXT}", self._build_brand_context_block(brand_context))
        .replace("{GLOSSARY}", self._build_glossary_block(glossary_block))
        .replace("{SDH_FIELD}", self._build_sdh_field(sdh_requested))
        .replace("{SDH_GUIDELINES}", self._build_sdh_guidelines(sdh_requested))
    )
    uploaded_file = None
    try:
        logger.info(f"Starting Gemini targeted processing for video: {video_file_path}, target: {target_language}")
        # Upload via the Files API; the SDK call blocks, so run it in a thread.
        logger.info("Uploading video file to Gemini API for targeted extraction...")
        uploaded_file = await asyncio.to_thread(
            client.files.upload,
            file=video_file_path,
            config={
                "display_name": f"video_processing_targeted_{target_language}_{Path(video_file_path).name}",
                "mime_type": "video/mp4"
            }
        )
        logger.info(f"Successfully uploaded file: {uploaded_file.name} (URI: {uploaded_file.uri})")
        # The file must be ACTIVE before it can be referenced in a prompt.
        logger.info("Waiting for file to become ACTIVE...")
        file_ready = await self._wait_for_file_active(uploaded_file.name)
        if not file_ready:
            raise Exception("File failed to become ACTIVE within timeout")
        logger.info(f"Generating content with Gemini model for {target_language}...")
        _t0 = time.monotonic()
        response, _model_used = await self._generate(
            contents=[
                genai.types.Part.from_text(text=prompt),
                genai.types.Part.from_uri(
                    file_uri=uploaded_file.uri,
                    mime_type=uploaded_file.mime_type
                )
            ]
        )
        if _cost_ctx:
            usage_task = asyncio.create_task(_record_gemini_usage(
                response,
                _model_used,
                _cost_ctx.get("user_id", "system"),
                _cost_ctx.get("job_id", ""),
                _cost_ctx.get("project_id"),
                int((time.monotonic() - _t0) * 1000),
            ))
            # asyncio keeps only weak references to tasks. Hold a strong one
            # (on the class, so it outlives this instance) until it finishes.
            pending = getattr(type(self), "_pending_usage_tasks", None)
            if pending is None:
                pending = type(self)._pending_usage_tasks = set()
            pending.add(usage_task)
            usage_task.add_done_callback(pending.discard)
        # Parse JSON response
        response_text = response.text.strip()
        logger.info(f"Received Gemini targeted response for {target_language} (first 200 chars): {response_text[:200]}...")
        # Handle potential markdown formatting
        if response_text.startswith("```json"):
            response_text = response_text.replace("```json", "").replace("```", "").strip()
            logger.info("Cleaned markdown formatting from response")
        response_text = response_text.strip()
        logger.info("Parsing JSON response...")
        try:
            result = json.loads(response_text)
        except json.JSONDecodeError as e:
            logger.error(f"JSON parse error at position {e.pos}: {e.msg}")
            # Log ~100 characters either side of the parse error.
            start = max(0, e.pos - 100)
            end = min(len(response_text), e.pos + 100)
            problematic_text = response_text[start:end]
            logger.error(f"Problematic JSON area: ...{problematic_text}...")
            # Attempt self-healing. This is the only json.loads in this method,
            # so no outer JSONDecodeError handler is needed.
            return await self._self_heal_targeted_response(target_language, response_text)
        # Validate required fields
        required_fields = [
            "language", "confidence", "summary",
            "transcript_plaintext", "captions_vtt", "audio_description_vtt"
        ]
        for field in required_fields:
            if field not in result:
                raise ValueError(f"Missing required field: {field}")
        # Validate VTT format
        if not result["captions_vtt"].startswith("WEBVTT"):
            raise ValueError("Invalid captions VTT format")
        if not result["audio_description_vtt"].startswith("WEBVTT"):
            raise ValueError("Invalid audio description VTT format")
        logger.info(
            f"Successfully extracted targeted accessibility content for {target_language} "
            f"with confidence: {result['confidence']}"
        )
        return result
    except Exception as e:
        logger.error(f"Gemini targeted extraction failed for {target_language}: {type(e).__name__}: {str(e)}")
        logger.error(f"Video file path: {video_file_path}")
        print(f"🚨 GEMINI TARGETED ERROR ({target_language}): {type(e).__name__}: {str(e)}")
        raise
    finally:
        # Cleanup uploaded file
        if uploaded_file:
            try:
                await asyncio.to_thread(client.files.delete, name=uploaded_file.name)
                logger.info(f"Successfully cleaned up uploaded file: {uploaded_file.name}")
            except Exception as e:
                logger.warning(f"Failed to cleanup uploaded file {uploaded_file.name}: {e}")
async def _self_heal_targeted_response(
    self,
    target_language: str,
    invalid_response: str
) -> dict[str, Any]:
    """Repair a malformed targeted-extraction JSON response.

    The stages mirror the untargeted self-heal:

    1. ``_attempt_json_fix``: local repairs. A truthy result is returned
       as-is. Errors from this stage are only logged.
    2. A text-only Gemini call asking for repaired JSON in ``target_language``.
       A missing ``audio_description_vtt`` is replaced by a one-cue placeholder.
       Any other missing required field is an error.

    NOTE(review): the placeholder AD cue text is English regardless of
    ``target_language``.

    Raises:
        ValueError: if the model-based repair fails for any reason. The cause
            is chained.
    """
    logger.info(f"Attempting to self-heal targeted response for {target_language}")

    # Stage 1: cheap local repair.
    try:
        locally_repaired = self._attempt_json_fix(invalid_response)
        if locally_repaired:
            logger.info("Successfully fixed JSON without re-processing")
            return locally_repaired
    except Exception as fix_error:
        logger.warning(f"JSON fix attempt failed: {fix_error}")

    # Stage 2: ask the model to repair the JSON text.
    self_heal_prompt = f"""
SYSTEM: You are a JSON repair service. Fix the malformed JSON below and return ONLY the corrected JSON.
CRITICAL REQUIREMENTS:
- The JSON MUST contain these exact fields: language, confidence, summary, transcript_plaintext, captions_vtt, audio_description_vtt
- All content should be in {target_language}
- If audio_description_vtt is truncated or missing, reconstruct it as a valid WebVTT with at least basic descriptions in {target_language}
- All VTT content must start with "WEBVTT" and have proper timestamp format (HH:MM:SS.mmm --> HH:MM:SS.mmm)
- Properly escape all quotes within strings using \"
- Fix unterminated strings by adding closing quotes
- Remove trailing commas
- Ensure all JSON is properly closed with }}
Fix the JSON and return it:
{invalid_response}
"""
    expected_fields = (
        "language", "confidence", "summary",
        "transcript_plaintext", "captions_vtt", "audio_description_vtt",
    )
    try:
        heal_response, _unused_model = await self._generate(
            contents=[genai.types.Part.from_text(text=self_heal_prompt)]
        )
        healed_text = heal_response.text.strip()
        if healed_text.startswith("```json"):
            # Strip markdown code fences around the payload.
            healed_text = healed_text.replace("```json", "").replace("```", "").strip()
        result = json.loads(healed_text)

        absent = [name for name in expected_fields if name not in result]
        if absent:
            logger.error(f"Self-heal lost required fields: {absent}")
            if "audio_description_vtt" in absent:
                logger.info("Creating fallback audio_description_vtt")
                result["audio_description_vtt"] = "WEBVTT\n\n00:00:00.000 --> 00:00:05.000\nVideo content with visual elements described."
            unrecoverable = [name for name in absent if name != "audio_description_vtt"]
            if unrecoverable:
                raise ValueError(f"Self-heal failed to preserve required fields: {unrecoverable}")

        logger.info(f"Successfully self-healed targeted response for {target_language}")
        return result
    except Exception as e:
        logger.error(f"Self-heal attempt failed for {target_language}: {e}")
        raise ValueError(f"Failed to get valid JSON from Gemini targeted extraction for {target_language}") from e
def _attempt_json_fix(self, json_text: str) -> dict[str, Any] | None:
"""Attempt to fix common JSON syntax issues"""
# Try to identify and fix common issues
fixes_tried = []
fixed_text = json_text
import re
# Fix 1: Remove trailing commas
fixed_text = re.sub(r',(\s*[}\]])', r'\1', fixed_text)
fixes_tried.append("removed trailing commas")
# Fix 2: Try to fix unterminated strings by adding closing quote and brace
if fixed_text.count('"') % 2 != 0: # Odd number of quotes suggests unterminated string
# Find the last quote and see if we need to close the JSON
last_quote_pos = fixed_text.rfind('"')
remainder = fixed_text[last_quote_pos + 1:].strip()
# If there's no closing brace after the last quote, try to fix it
if remainder and not remainder.endswith('}'):
# Try to intelligently close the JSON
if 'audio_description_vtt' in fixed_text[max(0, last_quote_pos - 100):]:
# This appears to be in the audio_description_vtt field
fixed_text += '"\n}'
fixes_tried.append("closed unterminated audio_description_vtt string")
else:
fixed_text += '"'
fixes_tried.append("closed unterminated string")
# Fix 3: Ensure JSON ends with closing brace
if not fixed_text.rstrip().endswith('}'):
fixed_text = fixed_text.rstrip() + '\n}'
fixes_tried.append("added closing brace")
try:
result = json.loads(fixed_text)
logger.info(f"JSON fixed with: {', '.join(fixes_tried)}")
# Validate that we have the required fields
required_fields = [
"language", "confidence", "summary",
"transcript_plaintext", "captions_vtt", "audio_description_vtt"
]
missing_fields = [field for field in required_fields if field not in result]
if missing_fields:
logger.warning(f"Fixed JSON is missing required fields: {missing_fields}")
return None # Let the more advanced self-healing handle this
return result
except json.JSONDecodeError as e:
logger.debug(f"JSON fix attempt failed: {e}")
return None
async def analyze_accessible_video_placement(
    self,
    video_file_path: str,
    ad_vtt_content: str,
    ad_cue_durations: list[float]
) -> dict[str, Any]:
    """
    DEPRECATED: This function is no longer called in the render pipeline.
    Pause points are now derived from AD VTT cue start times and refined by Whisper.
    Method selection (overlay/pause_insert) is done by user at QC Review approval.
    This function is kept for potential future use or rollback scenarios.
    See render_accessible_video._build_placements_from_ad_vtt() for the replacement logic.
    Every call emits a DeprecationWarning.
    ---
    Original description:
    Analyze video and determine optimal method for integrating audio descriptions.
    Returns placement instructions for each AD cue.
    Args:
        video_file_path: Path to the source video file
        ad_vtt_content: The audio description VTT content
        ad_cue_durations: List of actual TTS audio durations in seconds (matching VTT cue order)
    Returns:
        Dictionary with method choice and placement instructions for each AD cue.
        It is checked to contain method, method_rationale, dialogue_density and
        placements, with method being "overlay" or "pause_insert". A placement
        count that differs from len(ad_cue_durations) is only logged.
    Raises:
        ValueError: on a missing field or invalid method value, or when the
            self-heal fallback cannot produce valid JSON.
    """
    import warnings
    warnings.warn(
        "analyze_accessible_video_placement is deprecated. "
        "Pause points are now derived from AD VTT cue start times and refined by Whisper.",
        DeprecationWarning,
        stacklevel=2
    )
    prompt_template = self._load_prompt("gemini_accessible_video.md")
    # Format prompt with AD VTT content and durations
    prompt = prompt_template.replace(
        "{AD_VTT_CONTENT}", ad_vtt_content
    ).replace(
        "{AD_CUE_DURATIONS}", json.dumps(ad_cue_durations)
    )
    uploaded_file = None
    try:
        logger.info(f"Starting accessible video analysis for: {video_file_path}")
        logger.info(f"AD cues to place: {len(ad_cue_durations)}")
        # Upload video file to Gemini (blocking SDK call, run in a thread)
        logger.info("Uploading video file to Gemini API for accessible video analysis...")
        uploaded_file = await asyncio.to_thread(
            client.files.upload,
            file=video_file_path,
            config={
                "display_name": f"accessible_video_analysis_{Path(video_file_path).name}",
                "mime_type": "video/mp4"
            }
        )
        logger.info(f"Successfully uploaded file: {uploaded_file.name}")
        # Wait for file to become ACTIVE
        logger.info("Waiting for file to become ACTIVE...")
        file_ready = await self._wait_for_file_active(uploaded_file.name)
        if not file_ready:
            raise Exception("File failed to become ACTIVE within timeout")
        # Generate content with video and prompt
        logger.info("Analyzing video with Gemini for accessible video placement...")
        response, _ = await self._generate(
            contents=[
                genai.types.Part.from_text(text=prompt),
                genai.types.Part.from_uri(
                    file_uri=uploaded_file.uri,
                    mime_type=uploaded_file.mime_type
                )
            ]
        )
        # Parse JSON response
        response_text = response.text.strip()
        logger.info(f"Received accessible video analysis response (first 300 chars): {response_text[:300]}...")
        # Handle potential markdown formatting
        if response_text.startswith("```json"):
            response_text = response_text.replace("```json", "").replace("```", "").strip()
        try:
            result = json.loads(response_text)
        except json.JSONDecodeError as e:
            logger.error(f"JSON parse error in accessible video analysis: {e}")
            # Try self-healing for this response; its result goes through the
            # same validation below.
            result = await self._self_heal_accessible_video_response(response_text)
        # Validate required fields
        required_fields = ["method", "method_rationale", "dialogue_density", "placements"]
        for field in required_fields:
            if field not in result:
                raise ValueError(f"Missing required field in accessible video analysis: {field}")
        # Validate method value
        if result["method"] not in ["overlay", "pause_insert"]:
            raise ValueError(f"Invalid method value: {result['method']}")
        # Validate placements (a count mismatch is tolerated, only logged)
        if len(result["placements"]) != len(ad_cue_durations):
            logger.warning(
                f"Placement count mismatch: got {len(result['placements'])}, "
                f"expected {len(ad_cue_durations)}"
            )
        # NOTE(review): the :.2f format below raises if dialogue_density is not
        # numeric (e.g. a string or null), turning an otherwise valid analysis
        # into an exception from a log statement.
        logger.info(
            f"Accessible video analysis complete: method={result['method']}, "
            f"dialogue_density={result['dialogue_density']:.2f}, "
            f"placements={len(result['placements'])}"
        )
        return result
    except Exception as e:
        logger.error(f"Accessible video analysis failed: {type(e).__name__}: {str(e)}")
        raise
    finally:
        # Cleanup uploaded file
        if uploaded_file:
            try:
                await asyncio.to_thread(client.files.delete, name=uploaded_file.name)
                logger.info(f"Cleaned up uploaded file: {uploaded_file.name}")
            except Exception as e:
                logger.warning(f"Failed to cleanup uploaded file {uploaded_file.name}: {e}")
async def _self_heal_accessible_video_response(self, invalid_response: str) -> dict[str, Any]:
    """Ask Gemini to repair a malformed placement-analysis JSON response.

    There is no local repair stage, and the repaired object is not
    field-validated here.

    Raises:
        ValueError: if the model call or JSON parsing fails. The cause is chained.
    """
    logger.info("Attempting to self-heal accessible video analysis response")
    self_heal_prompt = f"""
SYSTEM: You are a JSON repair service. Fix the malformed JSON below and return ONLY the corrected JSON.
CRITICAL REQUIREMENTS:
- The JSON MUST contain: method, method_rationale, dialogue_density, placements, total_added_duration, warnings
- method must be either "overlay" or "pause_insert"
- dialogue_density must be a number between 0 and 1
- placements must be an array of placement objects
- Fix any JSON syntax errors (trailing commas, unterminated strings, etc.)
Fix the JSON and return it:
{invalid_response}
"""
    json_fence = "```json"
    try:
        repaired_response, _unused_model = await self._generate(
            contents=[genai.types.Part.from_text(text=self_heal_prompt)]
        )
        repaired_text = repaired_response.text.strip()
        if repaired_text.startswith(json_fence):
            repaired_text = repaired_text.replace(json_fence, "").replace("```", "").strip()
        parsed = json.loads(repaired_text)
        logger.info("Successfully self-healed accessible video analysis response")
        return parsed
    except Exception as e:
        logger.error(f"Self-heal attempt for accessible video analysis failed: {e}")
        raise ValueError("Failed to get valid JSON from accessible video analysis after self-heal") from e
async def transcreate_content(
    self,
    captions_vtt: str,
    ad_vtt: str,
    target_language: str,
    brief: str | None = None,
    glossary_block: str | None = None,
    _cost_ctx: dict | None = None,
) -> dict[str, str]:
    """
    Transcreate English caption and AD VTT content into ``target_language``
    with cultural adaptation.

    Args:
        captions_vtt: English captions VTT.
        ad_vtt: English audio-description VTT.
        target_language: Language code, rendered via locale_lib.get_gemini_label.
        brief: Optional brand guidelines; a default sentence is used when None.
        glossary_block: Optional pre-built glossary prompt block.
        _cost_ctx: Optional dict with ``user_id`` / ``job_id`` / ``project_id``.
            When given, token usage is recorded in a background task.

    Returns:
        The parsed JSON object. It is checked to contain ``captions_vtt`` and
        ``audio_description_vtt``.

    Raises:
        ValueError: if the response is not valid JSON or lacks a VTT field.
    """
    prompt_template = self._load_prompt("gemini_transcreation.md")
    # NOTE(review): str.format (unlike the str.replace used by the other prompt
    # builders in this class) requires any literal braces in the template to
    # be doubled ({{ }}).
    prompt = prompt_template.format(
        TARGET_LANGUAGE=locale_lib.get_gemini_label(target_language),
        GLOSSARY=self._build_glossary_block(glossary_block),
    )
    user_prompt = f"""
Input:
- captions_vtt_en: {captions_vtt}
- ad_vtt_en: {ad_vtt}
- brief: {brief or "No specific brand guidelines provided"}
Output:
JSON:
"""
    try:
        _t0 = time.monotonic()
        response, _model_used = await self._generate(
            contents=[genai.types.Part.from_text(text=prompt + "\n\n" + user_prompt)]
        )
        if _cost_ctx:
            usage_task = asyncio.create_task(_record_gemini_usage(
                response,
                _model_used,
                _cost_ctx.get("user_id", "system"),
                _cost_ctx.get("job_id", ""),
                _cost_ctx.get("project_id"),
                int((time.monotonic() - _t0) * 1000),
            ))
            # asyncio keeps only weak references to tasks. Hold a strong one
            # (on the class, so it outlives this instance) until it finishes.
            pending = getattr(type(self), "_pending_usage_tasks", None)
            if pending is None:
                pending = type(self)._pending_usage_tasks = set()
            pending.add(usage_task)
            usage_task.add_done_callback(pending.discard)
        response_text = response.text.strip()
        # Handle potential markdown formatting
        if response_text.startswith("```json"):
            response_text = response_text.replace("```json", "").replace("```", "").strip()
        result = json.loads(response_text)
        # Validate required fields
        if "captions_vtt" not in result or "audio_description_vtt" not in result:
            raise ValueError("Missing required VTT fields in transcreation response")
        logger.info(f"Successfully transcreated content to {target_language}")
        return result
    except json.JSONDecodeError as e:
        logger.error(f"Failed to parse transcreation JSON response: {e}")
        raise ValueError("Invalid JSON response from transcreation") from e
    except Exception as e:
        logger.error(f"Transcreation failed: {e}")
        raise
async def translate_vtt(
    self,
    vtt_content: str,
    target_language: str,
    source_language: str = "en",
    glossary_block: str | None = None,
    style: str = "literal",
    _cost_ctx: dict | None = None,
) -> str:
    """
    Translate VTT content using Gemini, preserving timing programmatically.

    Uses a two-step approach to guarantee timestamp integrity:
    1. Send only the cue texts (no timestamps) to Gemini as a numbered list.
    2. Apply the translated texts back onto the original VTT with
       ``VTTEditor.translate_preserving_timing()``.

    If the reply has the wrong number of lines, the request is retried once
    with a stricter instruction. If it is still wrong, missing trailing cues
    are filled with their source text and surplus lines are dropped.

    Args:
        vtt_content: Source VTT document.
        target_language: Locale code to translate into.
        source_language: Locale code of the source text (default ``"en"``).
        glossary_block: Optional glossary rendered via ``_build_glossary_block``.
            Its terms are required to be used verbatim.
        style: ``"literal"`` gives a direct translation preserving meaning
            exactly. ``"transcreate"`` gives a culturally adapted translation
            that still returns exactly one line per cue. Any other value
            behaves like ``"literal"``.
        _cost_ctx: Optional dict with ``user_id`` / ``job_id`` / ``project_id``.
            When given, usage of each Gemini call is recorded in a background task.

    Returns:
        The translated VTT, or ``vtt_content`` unchanged when it has no cues.

    Raises:
        Any error from prompt building, the Gemini call or timing re-application.
        The error is logged and re-raised.
    """
    from ..lib.vtt import VTTEditor, VTTParser

    source_cues = VTTParser.parse(vtt_content)
    if not source_cues:
        logger.warning(f"No cues found in VTT for {target_language} translation")
        return vtt_content
    cue_count = len(source_cues)

    try:
        # These prompt pieces are the same for the first attempt and the
        # retry, so build them once.
        numbered_texts = "\n".join(
            f"{i + 1}. {cue.text.replace(chr(10), ' ')}"
            for i, cue in enumerate(source_cues)
        )
        src_label = locale_lib.get_gemini_label(source_language)
        tgt_label = locale_lib.get_gemini_label(target_language)
        glossary_section = self._build_glossary_block(glossary_block)
        glossary_line = f"\n\n{glossary_section}" if glossary_section else ""
        glossary_req = (
            "\n- MUST use the exact approved terms from the glossary below — these override natural translation choices, even for English terms"
            if glossary_section else ""
        )
        # The adaptation instruction is a bullet of its own, placed before the
        # "Use natural, idiomatic ..." bullet, so neither bullet gets mangled.
        adapt_line = (
            f"- Culturally adapt the text for {tgt_label} audiences (brand voice, natural phrasing), "
            "while keeping accessibility intent and line length (~32-40 chars)\n"
            if style == "transcreate"
            else ""
        )

        async def _attempt_translation(extra_instruction: str = "") -> list[str]:
            """Send one translation request and return the parsed line texts."""
            prompt = f"""Translate the following {cue_count} numbered text segments from {src_label} to {tgt_label}.
REQUIREMENTS:
- Return EXACTLY {cue_count} numbered lines, one translation per line
- Format: "1. translated text", "2. translated text", etc.
- Preserve speaker labels like [Speaker 1]: unchanged
{adapt_line}- Use natural, idiomatic {tgt_label}
- Do NOT add any explanation, preamble, or extra lines{extra_instruction}{glossary_req}{glossary_line}
Segments to translate:
{numbered_texts}"""
            _t0 = time.monotonic()
            response, _model_used = await self._generate(
                contents=[genai.types.Part.from_text(text=prompt)]
            )
            if _cost_ctx:
                # NOTE(review): the task reference is not kept. The asyncio docs
                # advise holding one so the task cannot be garbage-collected mid-run.
                asyncio.create_task(
                    _record_gemini_usage(
                        response,
                        _model_used,
                        _cost_ctx.get("user_id", "system"),
                        _cost_ctx.get("job_id", ""),
                        _cost_ctx.get("project_id"),
                        int((time.monotonic() - _t0) * 1000),
                    )
                )
            return self._parse_numbered_translation(response.text.strip(), cue_count)

        translated_texts = await _attempt_translation()
        if len(translated_texts) != cue_count:
            logger.warning(
                f"Translation cue count mismatch for {target_language}: "
                f"expected {cue_count}, got {len(translated_texts)}. Retrying."
            )
            translated_texts = await _attempt_translation(
                extra_instruction=f"\n- You MUST return exactly {cue_count} lines, no more, no less"
            )
        if len(translated_texts) != cue_count:
            # Pad or truncate as last resort to avoid breaking downstream
            logger.warning(
                f"Retried translation still mismatched ({len(translated_texts)} vs {cue_count}). "
                "Padding/truncating to match."
            )
            if len(translated_texts) < cue_count:
                translated_texts.extend(
                    source_cues[i].text
                    for i in range(len(translated_texts), cue_count)
                )
            else:
                translated_texts = translated_texts[:cue_count]
        result = VTTEditor.translate_preserving_timing(vtt_content, translated_texts)
        logger.info(f"Successfully translated VTT to {target_language} ({cue_count} cues)")
        return result
    except Exception as e:
        logger.error(f"Gemini translation failed for {target_language}: {e}")
        raise
@staticmethod
def _parse_numbered_translation(response_text: str, expected_count: int) -> list[str]:
    """
    Parse a numbered-list reply from Gemini into a list of translated texts.

    Item lines look like ``"3. text"`` or ``"3) text"``. A bare ``"3."`` or
    ``"3)"`` yields an empty string, so every later item keeps its position.
    Any other non-blank line is treated as a wrapped continuation of the
    previous item. If no item has been seen yet, such a line (e.g. a model
    preamble) is ignored.

    Args:
        response_text: Raw model output.
        expected_count: Number of items requested. It is not used here; the
            caller compares ``len(result)`` against it.

    Returns:
        Item texts in order of appearance, whitespace-trimmed.
    """
    import re

    # Number, "." or ")", then optionally whitespace plus text. Requiring the
    # whitespace keeps a continuation such as "3.5 million" from being read
    # as item 3.
    item_pattern = re.compile(r"^\d+[.)](?:\s+(.*))?$")
    items: list[str] = []
    for raw_line in response_text.strip().split("\n"):
        line = raw_line.strip()
        if not line:
            continue
        match = item_pattern.match(line)
        if match:
            items.append((match.group(1) or "").strip())
        elif items:
            # Wrapped continuation of the previous item.
            items[-1] += " " + line
    return items
async def rewrite_tts_cue(
    self,
    original_text: str,
    language: str = "en",
    _cost_ctx: dict | None = None,
) -> str:
    """
    Rewrite an audio description cue to be TTS-friendly.

    Called when TTS synthesis fails for a cue after retries. Uses Gemini
    to rephrase the text while preserving the visual information being described.

    Args:
        original_text: The cue text that failed TTS synthesis.
        language: Language code for context (default: ``"en"``).
        _cost_ctx: Optional dict with ``user_id`` / ``job_id`` / ``project_id``.
            When given, usage is recorded via ``_record_gemini_usage`` in a
            background task.

    Returns:
        The rewritten text, with Markdown code fences and wrapping quotes
        removed and surrounding whitespace trimmed.

    Raises:
        ValueError: If Gemini returns no text, or nothing is left after cleanup.
    """
    prompt_template = self._load_prompt("gemini_tts_rewrite.md")
    # Substitute the language code first. Cue text that happens to contain
    # "{LANGUAGE}" is then inserted verbatim instead of being rewritten.
    prompt = prompt_template.replace("{LANGUAGE}", language).replace(
        "{ORIGINAL_TEXT}", original_text
    )
    try:
        logger.info(f"Rewriting TTS cue for safety: '{original_text[:50]}...'")
        _t0 = time.monotonic()
        response, _model_used = await self._generate(
            contents=[genai.types.Part.from_text(text=prompt)]
        )
        if _cost_ctx:
            # NOTE(review): the task reference is not kept. The asyncio docs
            # advise holding one so the task cannot be garbage-collected mid-run.
            asyncio.create_task(
                _record_gemini_usage(
                    response,
                    _model_used,
                    _cost_ctx.get("user_id", "system"),
                    _cost_ctx.get("job_id", ""),
                    _cost_ctx.get("project_id"),
                    int((time.monotonic() - _t0) * 1000),
                )
            )
        if response.text is None:
            raise ValueError("Gemini returned no text for TTS cue rewrite")
        result = response.text.strip()
        # Remove any markdown fence lines Gemini might add
        if result.startswith("```"):
            result = "\n".join(
                line for line in result.split("\n")
                if not line.strip().startswith("```")
            ).strip()
        # Peel wrapping quotes in order: double, single, then typographic pairs.
        for open_quote, close_quote in (
            ('"', '"'),
            ("'", "'"),
            ("\u201c", "\u201d"),
            ("\u2018", "\u2019"),
        ):
            if result.startswith(open_quote) and result.endswith(close_quote):
                result = result[1:-1]
        result = result.strip()
        if not result:
            # An empty rewrite cannot be synthesized. Fail loudly instead of
            # handing "" back to the TTS pipeline.
            raise ValueError("Gemini returned an empty TTS cue rewrite")
        logger.info(f"Rewrote TTS cue: '{original_text[:30]}...' -> '{result[:30]}...'")
        return result
    except Exception as e:
        logger.error(f"Failed to rewrite TTS cue: {e}")
        raise
# Global service instance: a module-level GeminiService, constructed when this
# module is imported.
gemini_service = GeminiService()