video-accessibility/backend/app/services/gemini.py
Vadym Samoilenko ea21cace96 feat: replace SDK with direct HTTP integration to centralized cost tracker
- New services/cost_tracker.py: sync httpx preflight()/record() + async wrappers;
  BudgetExceeded exception; no-op when COST_TRACKER_BASE_URL is empty
- Preflight budget check added before ingestion (Gemini), per-language translation
  (video-native + traditional), and per-language TTS dispatch
- _record_gemini_usage and _record_tts_cost now call cost_tracker directly;
  removes broken asyncio.get_event_loop() hack from sync Celery worker
- Fix: _cost_ctx now threaded into extract_accessibility_targeted (video-native path)
- Fix: user_id/cost_project_id now propagated through dispatch_language_tts →
  synthesize_cue_task.s() and the rerender_accessible_video.py re-render path
- Remove oliver-cost-tracker SDK dependency (was commented-out/never installed)
- Drop cost_tracker_outbox_path setting and get_cost_tracker() factory
- Update COST_TRACKER_BASE_URL default to optical-dev.oliver.solutions in
  .env.prod.example, docker-compose.yml, and all Cloud Run service yamls
- Cloud Run yamls use Secret Manager ref (cost-tracker-api-key) for the API key

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-27 13:36:15 +01:00

985 lines
43 KiB
Python

import json
import asyncio
import time
from pathlib import Path
from typing import Any, Optional
import google.genai as genai
from ..core.config import settings
from ..core.logging import get_logger
# Module-level logger, named after this module.
logger = get_logger(__name__)
# Configure Gemini client
# A single google-genai client is created at import time with the API key
# from settings and shared by every call in this module.
client = genai.Client(api_key=settings.gemini_api_key)
async def _record_gemini_usage(
    response,
    model: str,
    user_id: str,
    job_id: str,
    project_id: Optional[str],
    elapsed_ms: int,
) -> None:
    """Send the token usage of one Gemini call to the cost tracker.

    Best effort: any failure (import, missing attribute, tracker error) is
    logged as a warning and swallowed so the caller is never affected.

    Args:
        response: Gemini response; its ``usage_metadata`` attribute is read.
        model: Model name reported to the tracker.
        user_id: Forwarded as ``user_external_id``.
        job_id: Forwarded as ``job_external_id``.
        project_id: Forwarded as ``project_id`` (may be ``None``).
        elapsed_ms: Forwarded as ``latency_ms``.
    """
    try:
        # Imported lazily, inside the try, so an import failure is also non-fatal.
        from ..services import cost_tracker

        token_usage = getattr(response, "usage_metadata", None)
        if token_usage is not None:
            # Missing or None counters are reported as 0.
            prompt_tokens = getattr(token_usage, "prompt_token_count", 0) or 0
            completion_tokens = getattr(token_usage, "candidates_token_count", 0) or 0
            await cost_tracker.aio_record(
                model=model,
                provider="google",
                user_external_id=user_id,
                project_id=project_id,
                job_external_id=job_id,
                input_tokens=prompt_tokens,
                output_tokens=completion_tokens,
                latency_ms=elapsed_ms,
            )
    except Exception as e:
        logger.warning(f"Cost tracker record failed (non-fatal): {e}")
class GeminiService:
    """Async facade over the Gemini API for the video-accessibility pipeline.

    Provides accessibility extraction (captions / audio description), JSON
    self-healing, transcreation, VTT translation and TTS cue rewriting.
    Blocking SDK calls are executed in a worker thread via
    ``asyncio.to_thread`` so the event loop is not blocked.
    """

    # Fields every extraction payload must contain.
    _REQUIRED_FIELDS = (
        "language", "confidence", "summary",
        "transcript_plaintext", "captions_vtt", "audio_description_vtt",
    )
    # Placeholder AD track used when self-heal drops audio_description_vtt.
    _FALLBACK_AD_VTT = (
        "WEBVTT\n\n00:00:00.000 --> 00:00:05.000\n"
        "Video content with visual elements described."
    )
    # Strong references to in-flight cost-tracker tasks. The event loop only
    # keeps weak references to tasks, so a fire-and-forget task with no other
    # reference may be garbage-collected before it completes.
    _usage_tasks: set = set()

    def __init__(self):
        self.model_name = 'gemini-3-pro-preview'  # Gemini 3 Pro preview model
        self.prompts_dir = Path(__file__).parent.parent / "prompts"

    # ------------------------------------------------------------------
    # Shared private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _strip_code_fences(text: str) -> str:
        """Remove a Markdown code fence wrapping *text*, if present.

        Only the opening fence (with an optional ``json`` tag) and the closing
        fence are removed; backticks inside the payload are left untouched.
        """
        text = text.strip()
        if not text.startswith("```"):
            return text
        text = text[3:]
        if text[:4].lower() == "json":
            text = text[4:]
        text = text.strip()
        if text.endswith("```"):
            text = text[:-3]
        return text.strip()

    @staticmethod
    def _response_text(response) -> str:
        """Return the stripped text of a Gemini response.

        Raises:
            ValueError: if the response carries no text at all.
        """
        text = response.text
        if text is None:
            raise ValueError("Gemini returned no text content")
        return text.strip()

    @staticmethod
    def _log_json_parse_error(error: json.JSONDecodeError, text: str) -> None:
        """Log the position of a JSON parse error and ~100 chars either side of it."""
        logger.error(f"JSON parse error at position {error.pos}: {error.msg}")
        start = max(0, error.pos - 100)
        end = min(len(text), error.pos + 100)
        logger.error(f"Problematic JSON area: ...{text[start:end]}...")

    def _validate_extraction(self, result: Any) -> None:
        """Check an extraction payload for required fields and WEBVTT headers.

        Raises:
            ValueError: on a non-object payload, a missing field, or a VTT
                field that is not a string starting with ``WEBVTT``.
        """
        if not isinstance(result, dict):
            raise ValueError("Gemini response is not a JSON object")
        for field in self._REQUIRED_FIELDS:
            if field not in result:
                raise ValueError(f"Missing required field: {field}")
        for key, label in (
            ("captions_vtt", "captions"),
            ("audio_description_vtt", "audio description"),
        ):
            value = result[key]
            if not isinstance(value, str) or not value.startswith("WEBVTT"):
                raise ValueError(f"Invalid {label} VTT format")

    def _schedule_usage_record(self, response, _cost_ctx: Optional[dict], started_at: float) -> None:
        """Fire-and-forget cost tracking for one Gemini call.

        Does nothing when ``_cost_ctx`` is empty. The task is kept in
        ``_usage_tasks`` until it finishes so it cannot be collected early.
        """
        if not _cost_ctx:
            return
        task = asyncio.create_task(
            _record_gemini_usage(
                response,
                self.model_name,
                _cost_ctx.get("user_id", "system"),
                _cost_ctx.get("job_id", ""),
                _cost_ctx.get("project_id"),
                int((time.monotonic() - started_at) * 1000),
            )
        )
        self._usage_tasks.add(task)
        task.add_done_callback(self._usage_tasks.discard)

    async def _generate(self, contents: list, _cost_ctx: Optional[dict] = None, config: Any = None):
        """Call ``generate_content`` in a worker thread and schedule usage recording.

        Args:
            contents: Parts to send to the model.
            _cost_ctx: Optional dict with ``user_id`` / ``job_id`` / ``project_id``.
            config: Optional generation config; omitted from the call when None.

        Returns:
            The SDK response object.
        """
        call_kwargs: dict[str, Any] = {"model": self.model_name, "contents": contents}
        if config is not None:
            call_kwargs["config"] = config
        started_at = time.monotonic()
        response = await asyncio.to_thread(client.models.generate_content, **call_kwargs)
        self._schedule_usage_record(response, _cost_ctx, started_at)
        return response

    async def _repair_extraction_json(self, self_heal_prompt: str, _cost_ctx: Optional[dict] = None) -> dict[str, Any]:
        """Run a text-only repair prompt and return the repaired extraction payload.

        A missing ``audio_description_vtt`` is replaced by a placeholder track;
        any other missing required field raises ``ValueError``.
        """
        response = await self._generate(
            [genai.types.Part.from_text(text=self_heal_prompt)], _cost_ctx
        )
        result = json.loads(self._strip_code_fences(self._response_text(response)))
        missing_fields = [field for field in self._REQUIRED_FIELDS if field not in result]
        if missing_fields:
            logger.error(f"Self-heal lost required fields: {missing_fields}")
            # If audio_description_vtt is missing, create a basic one
            if "audio_description_vtt" in missing_fields:
                logger.info("Creating fallback audio_description_vtt")
                result["audio_description_vtt"] = self._FALLBACK_AD_VTT
            # If other critical fields are missing, raise an error
            remaining_missing = [f for f in missing_fields if f != "audio_description_vtt"]
            if remaining_missing:
                raise ValueError(f"Self-heal failed to preserve required fields: {remaining_missing}")
        return result

    # ------------------------------------------------------------------
    # Prompt building
    # ------------------------------------------------------------------

    def _load_prompt(self, prompt_file: str) -> str:
        """Load a prompt template from the prompts directory.

        Read explicitly as UTF-8 so the result does not depend on the locale.

        Raises:
            FileNotFoundError: if the template does not exist (logged first).
        """
        prompt_path = self.prompts_dir / prompt_file
        try:
            return prompt_path.read_text(encoding="utf-8")
        except FileNotFoundError:
            logger.error(f"Prompt file not found: {prompt_file}")
            raise

    async def _wait_for_file_active(self, file_name: str, max_wait_seconds: int = 300) -> bool:
        """Poll an uploaded file until it reaches the ACTIVE state.

        Polls with a growing delay (x1.5 per round, capped at 30s); errors
        while polling are logged and retried after 5s.

        Returns:
            True once the file is ACTIVE; False if it reports FAILED or the
            accumulated wait reaches ``max_wait_seconds``.
        """
        wait_time = 1  # Start with 1 second
        total_waited = 0
        while total_waited < max_wait_seconds:
            try:
                # Get file status - use asyncio.to_thread to avoid blocking event loop
                file_info = await asyncio.to_thread(client.files.get, name=file_name)
                logger.info(f"File {file_name} status: {file_info.state} (waited {total_waited}s)")
                if file_info.state == "ACTIVE":
                    logger.info(f"File {file_name} is now ACTIVE!")
                    return True
                elif file_info.state == "FAILED":
                    logger.error(f"File {file_name} processing FAILED")
                    return False
                # Wait with exponential backoff (max 30s)
                logger.info(f"File not ready, waiting {wait_time}s...")
                await asyncio.sleep(wait_time)
                total_waited += wait_time
                wait_time = min(wait_time * 1.5, 30)  # Exponential backoff, max 30s
            except Exception as e:
                logger.error(f"Error checking file status: {e}")
                await asyncio.sleep(5)  # Wait 5s on error
                total_waited += 5
        logger.error(f"File {file_name} did not become ACTIVE within {max_wait_seconds}s")
        return False

    def _build_sdh_field(self, sdh_requested: bool) -> str:
        """Return the output-field line for SDH captions, or "" when not requested."""
        if sdh_requested:
            return "- sdh_captions_vtt: a valid WebVTT file as a single string, containing SDH-format captions (same timing as captions_vtt, but enriched with speaker labels, sound effects, and music notation)"
        return ""

    def _build_sdh_guidelines(self, sdh_requested: bool) -> str:
        """Return the SDH guideline block for the prompt, or "" when not requested."""
        if not sdh_requested:
            return ""
        return """SDH (SUBTITLES FOR THE DEAF AND HARD OF HEARING) GUIDELINES:
Generate sdh_captions_vtt using the same cue timings as captions_vtt, enriched with:
- Speaker identification when multiple speakers are present: use "NAME:" prefix (e.g., "JOHN: Hello there") or "[NARRATOR]" for narration
- Non-speech sounds that are plot-relevant, in square brackets: [DOOR SLAMS], [PHONE RINGS], [CROWD CHEERING], [THUNDER]
- Music: use ♪ for background music cues (e.g., "♪ tense music ♪") or ♪ around sung lyrics
- Off-screen or voice-over speakers: indicate with "(off-screen)" or "[V.O.]" where relevant
- Non-speech vocalisations when relevant: [SIGHS], [LAUGHS], [SCREAMS]
- Maintain the same timestamp format as captions_vtt (HH:MM:SS.mmm --> HH:MM:SS.mmm)
- Only add sound effect cues where they add meaningful context; do not annotate every minor sound"""

    def _build_brand_context_block(self, brand_context: Optional[str]) -> str:
        """Build the brand context instruction block for injection into prompts.

        ``brand_context`` is a comma-separated list; blank entries are ignored.
        """
        if brand_context and brand_context.strip():
            brands = [b.strip() for b in brand_context.split(",") if b.strip()]
            if brands:
                brand_list = ", ".join(f'"{b}"' for b in brands)
                return (
                    f"The client has confirmed the following brand names appear in this video: {brand_list}. "
                    f"Use these exact brand names when you identify those products on screen."
                )
        return "No specific brand names have been provided for this video."

    # ------------------------------------------------------------------
    # Extraction
    # ------------------------------------------------------------------

    async def extract_accessibility(self, video_file_path: str, brand_context: Optional[str] = None, sdh_requested: bool = False, _cost_ctx: Optional[dict] = None) -> dict[str, Any]:
        """
        Extract captions and audio descriptions from a video using Gemini.

        Uploads the video, waits for it to become ACTIVE, runs the ingestion
        prompt and validates the JSON payload. Malformed JSON is passed to the
        self-heal path. The uploaded file is always deleted afterwards.

        Args:
            video_file_path: Path to the video file
            brand_context: Optional comma-separated brand names present in the video
            sdh_requested: Whether to request SDH captions as well
            _cost_ctx: Optional cost-tracking context (user_id / job_id / project_id)

        Returns:
            Structured JSON with transcript, captions VTT, and audio description VTT

        Raises:
            ValueError: on an invalid payload or failed self-heal.
            Exception: if the uploaded file never becomes ACTIVE, or on SDK errors.
        """
        prompt_template = self._load_prompt("gemini_ingestion.md")
        prompt = (
            prompt_template
            .replace("{BRAND_CONTEXT}", self._build_brand_context_block(brand_context))
            .replace("{SDH_FIELD}", self._build_sdh_field(sdh_requested))
            .replace("{SDH_GUIDELINES}", self._build_sdh_guidelines(sdh_requested))
        )
        uploaded_file = None
        # Bound up front so the JSON error handler can always read it.
        response_text = ""
        try:
            logger.info(f"Starting Gemini processing for video: {video_file_path}")
            # Upload video file to Gemini using new API - use asyncio.to_thread to avoid blocking
            logger.info("Uploading video file to Gemini API...")
            uploaded_file = await asyncio.to_thread(
                client.files.upload,
                file=video_file_path,
                config={
                    "display_name": f"video_processing_{Path(video_file_path).name}",
                    "mime_type": "video/mp4"
                }
            )
            logger.info(f"Successfully uploaded file: {uploaded_file.name} (URI: {uploaded_file.uri})")
            # Wait for file to become ACTIVE before using it
            logger.info("Waiting for file to become ACTIVE...")
            file_ready = await self._wait_for_file_active(uploaded_file.name)
            if not file_ready:
                raise Exception("File failed to become ACTIVE within timeout")
            logger.info("Generating content with Gemini model...")
            response = await self._generate(
                [
                    genai.types.Part.from_text(text=prompt),
                    genai.types.Part.from_uri(
                        file_uri=uploaded_file.uri,
                        mime_type=uploaded_file.mime_type
                    )
                ],
                _cost_ctx,
                config=genai.types.GenerateContentConfig(
                    temperature=0.2,  # Lower temperature for consistent, deterministic AD output
                    top_p=0.8,
                    top_k=40,
                ),
            )
            # Parse JSON response
            response_text = self._response_text(response)
            logger.info(f"Received Gemini response (first 200 chars): {response_text[:200]}...")
            # Handle potential markdown formatting
            cleaned_text = self._strip_code_fences(response_text)
            if cleaned_text != response_text:
                logger.info("Cleaned markdown formatting from response")
            response_text = cleaned_text
            logger.info("Parsing JSON response...")
            try:
                result = json.loads(response_text)
            except json.JSONDecodeError as e:
                self._log_json_parse_error(e, response_text)
                raise
            self._validate_extraction(result)
            logger.info(
                f"Successfully extracted accessibility content with confidence: {result['confidence']}"
            )
            return result
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse Gemini JSON response: {e}")
            if not response_text:
                # Nothing was received from the model, so there is nothing to
                # repair; healing an empty payload would only invent content.
                raise
            logger.error(f"Raw response that failed to parse: {response_text}")
            # Attempt self-healing
            return await self._self_heal_response(video_file_path, response_text, _cost_ctx)
        except Exception as e:
            logger.error(f"Gemini extraction failed with exception: {type(e).__name__}: {str(e)}")
            logger.error(f"Video file path: {video_file_path}")
            # Print to stdout for immediate visibility
            print(f"🚨 GEMINI ERROR: {type(e).__name__}: {str(e)}")
            raise
        finally:
            # Guaranteed cleanup of uploaded file regardless of success/failure/cancellation
            if uploaded_file:
                try:
                    await asyncio.to_thread(client.files.delete, name=uploaded_file.name)
                    logger.info(f"Successfully cleaned up uploaded file: {uploaded_file.name}")
                except Exception as e:
                    logger.warning(f"Failed to cleanup uploaded file {uploaded_file.name}: {e}")

    async def _self_heal_response(self, video_file_path: str, invalid_response: str, _cost_ctx: Optional[dict] = None) -> dict[str, Any]:
        """Attempt to self-heal an invalid JSON response from Gemini.

        Tries local syntax fixes first, then a text-only repair prompt (the
        video is not re-uploaded).

        Args:
            video_file_path: Unused here; kept for interface compatibility.
            invalid_response: The malformed JSON text.
            _cost_ctx: Optional cost-tracking context for the repair call.

        Raises:
            ValueError: if no valid payload could be produced.
        """
        logger.info("Attempting to self-heal JSON response without re-uploading video")
        # Try to fix common JSON issues first
        try:
            fixed_response = self._attempt_json_fix(invalid_response)
            if fixed_response:
                logger.info("Successfully fixed JSON without re-processing")
                return fixed_response
        except Exception as e:
            logger.warning(f"JSON fix attempt failed: {e}")
        # If simple fixes don't work, try a text-only self-heal prompt with more context.
        # NOTE: `\\"` renders as \" in the prompt; a single backslash would be
        # consumed by the string literal and the instruction would be lost.
        self_heal_prompt = f"""
SYSTEM: You are a JSON repair service. Fix the malformed JSON below and return ONLY the corrected JSON.
CRITICAL REQUIREMENTS:
- The JSON MUST contain these exact fields: language, confidence, summary, transcript_plaintext, captions_vtt, audio_description_vtt
- If audio_description_vtt is truncated or missing, reconstruct it as a valid WebVTT with at least basic descriptions
- All VTT content must start with "WEBVTT" and have proper timestamp format (HH:MM:SS.mmm --> HH:MM:SS.mmm)
- Properly escape all quotes within strings using \\"
- Fix unterminated strings by adding closing quotes
- Remove trailing commas
- Ensure all JSON is properly closed with }}
Fix the JSON and return it:
{invalid_response}
"""
        try:
            result = await self._repair_extraction_json(self_heal_prompt, _cost_ctx)
        except Exception as e:
            logger.error(f"Self-heal attempt failed: {e}")
            raise ValueError("Failed to get valid JSON from Gemini after self-heal attempt") from e
        logger.info("Successfully self-healed Gemini response with all required fields")
        return result

    async def extract_accessibility_targeted(
        self,
        video_file_path: str,
        target_language: str,
        brand_context: Optional[str] = None,
        sdh_requested: bool = False,
        _cost_ctx: Optional[dict] = None,
    ) -> dict[str, Any]:
        """
        Extract captions and audio descriptions from video using Gemini,
        generating content directly in the specified target language.

        Unlike extract_accessibility() which auto-detects language, this method
        takes an explicit target language and generates all outputs in that language.
        This is used for "video_native" translation mode which re-processes the video
        for each target language with full visual context.

        Args:
            video_file_path: Path to the video file
            target_language: BCP-47 language code (e.g., "es", "fr", "de")
            brand_context: Optional comma-separated brand names present in the video
            sdh_requested: Whether to request SDH captions as well
            _cost_ctx: Optional cost-tracking context (user_id / job_id / project_id)

        Returns:
            Structured JSON with transcript, captions VTT, and audio description VTT
            all in the target language

        Raises:
            ValueError: on an invalid payload or failed self-heal.
            Exception: if the uploaded file never becomes ACTIVE, or on SDK errors.
        """
        prompt_template = self._load_prompt("gemini_ingestion_targeted.md")
        prompt = (
            prompt_template
            .replace("{TARGET_LANGUAGE}", target_language)
            .replace("{BRAND_CONTEXT}", self._build_brand_context_block(brand_context))
            .replace("{SDH_FIELD}", self._build_sdh_field(sdh_requested))
            .replace("{SDH_GUIDELINES}", self._build_sdh_guidelines(sdh_requested))
        )
        uploaded_file = None
        # Bound up front so the JSON error handler can always read it.
        response_text = ""
        try:
            logger.info(f"Starting Gemini targeted processing for video: {video_file_path}, target: {target_language}")
            # Upload video file to Gemini using new API
            logger.info("Uploading video file to Gemini API for targeted extraction...")
            uploaded_file = await asyncio.to_thread(
                client.files.upload,
                file=video_file_path,
                config={
                    "display_name": f"video_processing_targeted_{target_language}_{Path(video_file_path).name}",
                    "mime_type": "video/mp4"
                }
            )
            logger.info(f"Successfully uploaded file: {uploaded_file.name} (URI: {uploaded_file.uri})")
            # Wait for file to become ACTIVE before using it
            logger.info("Waiting for file to become ACTIVE...")
            file_ready = await self._wait_for_file_active(uploaded_file.name)
            if not file_ready:
                raise Exception("File failed to become ACTIVE within timeout")
            logger.info(f"Generating content with Gemini model for {target_language}...")
            # NOTE(review): unlike extract_accessibility(), no generation config
            # (temperature/top_p/top_k) is passed here - confirm this is intended.
            response = await self._generate(
                [
                    genai.types.Part.from_text(text=prompt),
                    genai.types.Part.from_uri(
                        file_uri=uploaded_file.uri,
                        mime_type=uploaded_file.mime_type
                    )
                ],
                _cost_ctx,
            )
            # Parse JSON response
            response_text = self._response_text(response)
            logger.info(f"Received Gemini targeted response for {target_language} (first 200 chars): {response_text[:200]}...")
            # Handle potential markdown formatting
            cleaned_text = self._strip_code_fences(response_text)
            if cleaned_text != response_text:
                logger.info("Cleaned markdown formatting from response")
            response_text = cleaned_text
            logger.info("Parsing JSON response...")
            try:
                result = json.loads(response_text)
            except json.JSONDecodeError as e:
                self._log_json_parse_error(e, response_text)
                # Handled (self-healed) by the outer JSONDecodeError handler.
                raise
            self._validate_extraction(result)
            logger.info(
                f"Successfully extracted targeted accessibility content for {target_language} "
                f"with confidence: {result['confidence']}"
            )
            return result
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse Gemini JSON response: {e}")
            if not response_text:
                # Nothing was received from the model, so there is nothing to
                # repair; healing an empty payload would only invent content.
                raise
            logger.error(f"Raw response that failed to parse: {response_text}")
            return await self._self_heal_targeted_response(target_language, response_text, _cost_ctx)
        except Exception as e:
            logger.error(f"Gemini targeted extraction failed for {target_language}: {type(e).__name__}: {str(e)}")
            logger.error(f"Video file path: {video_file_path}")
            print(f"🚨 GEMINI TARGETED ERROR ({target_language}): {type(e).__name__}: {str(e)}")
            raise
        finally:
            # Cleanup uploaded file
            if uploaded_file:
                try:
                    await asyncio.to_thread(client.files.delete, name=uploaded_file.name)
                    logger.info(f"Successfully cleaned up uploaded file: {uploaded_file.name}")
                except Exception as e:
                    logger.warning(f"Failed to cleanup uploaded file {uploaded_file.name}: {e}")

    async def _self_heal_targeted_response(
        self,
        target_language: str,
        invalid_response: str,
        _cost_ctx: Optional[dict] = None,
    ) -> dict[str, Any]:
        """Attempt to self-heal an invalid JSON response from targeted extraction.

        Same strategy as ``_self_heal_response`` but the repair prompt asks for
        content in ``target_language``.

        Raises:
            ValueError: if no valid payload could be produced.
        """
        logger.info(f"Attempting to self-heal targeted response for {target_language}")
        # Try to fix common JSON issues first
        try:
            fixed_response = self._attempt_json_fix(invalid_response)
            if fixed_response:
                logger.info("Successfully fixed JSON without re-processing")
                return fixed_response
        except Exception as e:
            logger.warning(f"JSON fix attempt failed: {e}")
        # NOTE: `\\"` renders as \" in the prompt; a single backslash would be
        # consumed by the string literal and the instruction would be lost.
        self_heal_prompt = f"""
SYSTEM: You are a JSON repair service. Fix the malformed JSON below and return ONLY the corrected JSON.
CRITICAL REQUIREMENTS:
- The JSON MUST contain these exact fields: language, confidence, summary, transcript_plaintext, captions_vtt, audio_description_vtt
- All content should be in {target_language}
- If audio_description_vtt is truncated or missing, reconstruct it as a valid WebVTT with at least basic descriptions in {target_language}
- All VTT content must start with "WEBVTT" and have proper timestamp format (HH:MM:SS.mmm --> HH:MM:SS.mmm)
- Properly escape all quotes within strings using \\"
- Fix unterminated strings by adding closing quotes
- Remove trailing commas
- Ensure all JSON is properly closed with }}
Fix the JSON and return it:
{invalid_response}
"""
        try:
            result = await self._repair_extraction_json(self_heal_prompt, _cost_ctx)
        except Exception as e:
            logger.error(f"Self-heal attempt failed for {target_language}: {e}")
            raise ValueError(f"Failed to get valid JSON from Gemini targeted extraction for {target_language}") from e
        logger.info(f"Successfully self-healed targeted response for {target_language}")
        return result

    def _attempt_json_fix(self, json_text: str) -> Optional[dict[str, Any]]:
        """Attempt to fix common JSON syntax issues without calling the model.

        Heuristics, applied in order: drop trailing commas, close an
        unterminated string, append a missing closing brace.

        Returns:
            The parsed payload if it parses and has every required field,
            otherwise None (so the model-based self-heal can take over).
        """
        import re

        fixes_tried = []
        # Fix 1: Remove trailing commas
        fixed_text = re.sub(r',(\s*[}\]])', r'\1', json_text)
        fixes_tried.append("removed trailing commas")
        # Fix 2: Try to fix unterminated strings by adding closing quote and brace
        if fixed_text.count('"') % 2 != 0:  # Odd number of quotes suggests unterminated string
            # Find the last quote and see if we need to close the JSON
            last_quote_pos = fixed_text.rfind('"')
            remainder = fixed_text[last_quote_pos + 1:].strip()
            # If there's no closing brace after the last quote, try to fix it
            if remainder and not remainder.endswith('}'):
                if 'audio_description_vtt' in fixed_text[max(0, last_quote_pos - 100):]:
                    # This appears to be in the audio_description_vtt field
                    fixed_text += '"\n}'
                    fixes_tried.append("closed unterminated audio_description_vtt string")
                else:
                    fixed_text += '"'
                    fixes_tried.append("closed unterminated string")
        # Fix 3: Ensure JSON ends with closing brace
        if not fixed_text.rstrip().endswith('}'):
            fixed_text = fixed_text.rstrip() + '\n}'
            fixes_tried.append("added closing brace")
        try:
            result = json.loads(fixed_text)
        except json.JSONDecodeError as e:
            logger.debug(f"JSON fix attempt failed: {e}")
            return None
        logger.info(f"JSON fixed with: {', '.join(fixes_tried)}")
        if not isinstance(result, dict):
            return None
        # Validate that we have the required fields
        missing_fields = [field for field in self._REQUIRED_FIELDS if field not in result]
        if missing_fields:
            logger.warning(f"Fixed JSON is missing required fields: {missing_fields}")
            return None  # Let the more advanced self-healing handle this
        return result

    # ------------------------------------------------------------------
    # Accessible video placement (deprecated)
    # ------------------------------------------------------------------

    async def analyze_accessible_video_placement(
        self,
        video_file_path: str,
        ad_vtt_content: str,
        ad_cue_durations: list[float]
    ) -> dict[str, Any]:
        """
        DEPRECATED: This function is no longer called in the render pipeline.
        Pause points are now derived from AD VTT cue start times and refined by Whisper.
        Method selection (overlay/pause_insert) is done by user at QC Review approval.
        This function is kept for potential future use or rollback scenarios.
        See render_accessible_video._build_placements_from_ad_vtt() for the replacement logic.

        ---
        Original description:
        Analyze video and determine optimal method for integrating audio descriptions.
        Returns placement instructions for each AD cue.

        Args:
            video_file_path: Path to the source video file
            ad_vtt_content: The audio description VTT content
            ad_cue_durations: List of actual TTS audio durations in seconds (matching VTT cue order)

        Returns:
            Dictionary with method choice and placement instructions for each AD cue

        Raises:
            ValueError: on a missing field, an invalid ``method`` or failed self-heal.
        """
        import warnings
        warnings.warn(
            "analyze_accessible_video_placement is deprecated. "
            "Pause points are now derived from AD VTT cue start times and refined by Whisper.",
            DeprecationWarning,
            stacklevel=2
        )
        prompt_template = self._load_prompt("gemini_accessible_video.md")
        # Format prompt with AD VTT content and durations
        prompt = prompt_template.replace(
            "{AD_VTT_CONTENT}", ad_vtt_content
        ).replace(
            "{AD_CUE_DURATIONS}", json.dumps(ad_cue_durations)
        )
        uploaded_file = None
        try:
            logger.info(f"Starting accessible video analysis for: {video_file_path}")
            logger.info(f"AD cues to place: {len(ad_cue_durations)}")
            # Upload video file to Gemini
            logger.info("Uploading video file to Gemini API for accessible video analysis...")
            uploaded_file = await asyncio.to_thread(
                client.files.upload,
                file=video_file_path,
                config={
                    "display_name": f"accessible_video_analysis_{Path(video_file_path).name}",
                    "mime_type": "video/mp4"
                }
            )
            logger.info(f"Successfully uploaded file: {uploaded_file.name}")
            # Wait for file to become ACTIVE
            logger.info("Waiting for file to become ACTIVE...")
            file_ready = await self._wait_for_file_active(uploaded_file.name)
            if not file_ready:
                raise Exception("File failed to become ACTIVE within timeout")
            # Generate content with video and prompt (no cost context on this path)
            logger.info("Analyzing video with Gemini for accessible video placement...")
            response = await self._generate(
                [
                    genai.types.Part.from_text(text=prompt),
                    genai.types.Part.from_uri(
                        file_uri=uploaded_file.uri,
                        mime_type=uploaded_file.mime_type
                    )
                ]
            )
            # Parse JSON response
            response_text = self._response_text(response)
            logger.info(f"Received accessible video analysis response (first 300 chars): {response_text[:300]}...")
            # Handle potential markdown formatting
            response_text = self._strip_code_fences(response_text)
            try:
                result = json.loads(response_text)
            except json.JSONDecodeError as e:
                logger.error(f"JSON parse error in accessible video analysis: {e}")
                # Try self-healing for this response
                result = await self._self_heal_accessible_video_response(response_text)
            # Validate required fields
            required_fields = ["method", "method_rationale", "dialogue_density", "placements"]
            for field in required_fields:
                if field not in result:
                    raise ValueError(f"Missing required field in accessible video analysis: {field}")
            # Validate method value
            if result["method"] not in ["overlay", "pause_insert"]:
                raise ValueError(f"Invalid method value: {result['method']}")
            # Validate placements (a count mismatch is only logged, not rejected)
            if len(result["placements"]) != len(ad_cue_durations):
                logger.warning(
                    f"Placement count mismatch: got {len(result['placements'])}, "
                    f"expected {len(ad_cue_durations)}"
                )
            logger.info(
                f"Accessible video analysis complete: method={result['method']}, "
                f"dialogue_density={result['dialogue_density']:.2f}, "
                f"placements={len(result['placements'])}"
            )
            return result
        except Exception as e:
            logger.error(f"Accessible video analysis failed: {type(e).__name__}: {str(e)}")
            raise
        finally:
            # Cleanup uploaded file
            if uploaded_file:
                try:
                    await asyncio.to_thread(client.files.delete, name=uploaded_file.name)
                    logger.info(f"Cleaned up uploaded file: {uploaded_file.name}")
                except Exception as e:
                    logger.warning(f"Failed to cleanup uploaded file {uploaded_file.name}: {e}")

    async def _self_heal_accessible_video_response(self, invalid_response: str) -> dict[str, Any]:
        """Attempt to self-heal an invalid JSON response from accessible video analysis.

        Raises:
            ValueError: if the repaired text still is not valid JSON or the call fails.
        """
        logger.info("Attempting to self-heal accessible video analysis response")
        self_heal_prompt = f"""
SYSTEM: You are a JSON repair service. Fix the malformed JSON below and return ONLY the corrected JSON.
CRITICAL REQUIREMENTS:
- The JSON MUST contain: method, method_rationale, dialogue_density, placements, total_added_duration, warnings
- method must be either "overlay" or "pause_insert"
- dialogue_density must be a number between 0 and 1
- placements must be an array of placement objects
- Fix any JSON syntax errors (trailing commas, unterminated strings, etc.)
Fix the JSON and return it:
{invalid_response}
"""
        try:
            response = await self._generate(
                [genai.types.Part.from_text(text=self_heal_prompt)]
            )
            result = json.loads(self._strip_code_fences(self._response_text(response)))
        except Exception as e:
            logger.error(f"Self-heal attempt for accessible video analysis failed: {e}")
            raise ValueError("Failed to get valid JSON from accessible video analysis after self-heal") from e
        logger.info("Successfully self-healed accessible video analysis response")
        return result

    # ------------------------------------------------------------------
    # Transcreation / translation
    # ------------------------------------------------------------------

    async def transcreate_content(
        self,
        captions_vtt: str,
        ad_vtt: str,
        target_language: str,
        brief: Optional[str] = None,
        _cost_ctx: Optional[dict] = None,
    ) -> dict[str, str]:
        """
        Transcreate English VTT content to target language with cultural adaptation.

        Args:
            captions_vtt: Source captions VTT
            ad_vtt: Source audio description VTT
            target_language: Language to transcreate into
            brief: Optional brand guidelines included in the prompt
            _cost_ctx: Optional cost-tracking context

        Returns:
            Parsed JSON containing at least ``captions_vtt`` and ``audio_description_vtt``

        Raises:
            ValueError: on invalid JSON or missing VTT fields.
        """
        prompt_template = self._load_prompt("gemini_transcreation.md")
        # NOTE(review): str.format() treats every brace in the template as a
        # placeholder, while the other prompts use .replace(); presumably the
        # template escapes literal braces - confirm against the prompt file.
        prompt = prompt_template.format(
            TARGET_LANGUAGE=target_language
        )
        user_prompt = f"""
Input:
- captions_vtt_en: {captions_vtt}
- ad_vtt_en: {ad_vtt}
- brief: {brief or "No specific brand guidelines provided"}
Output:
JSON:
"""
        try:
            response = await self._generate(
                [genai.types.Part.from_text(text=prompt + "\n\n" + user_prompt)],
                _cost_ctx,
            )
            # Handle potential markdown formatting
            response_text = self._strip_code_fences(self._response_text(response))
            result = json.loads(response_text)
            # Validate required fields
            if "captions_vtt" not in result or "audio_description_vtt" not in result:
                raise ValueError("Missing required VTT fields in transcreation response")
            logger.info(f"Successfully transcreated content to {target_language}")
            return result
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse transcreation JSON response: {e}")
            raise ValueError("Invalid JSON response from transcreation") from e
        except Exception as e:
            logger.error(f"Transcreation failed: {e}")
            raise

    async def translate_vtt(
        self,
        vtt_content: str,
        target_language: str,
        source_language: str = "en",
        _cost_ctx: Optional[dict] = None,
    ) -> str:
        """
        Translate VTT content using Gemini, preserving timing programmatically.

        Uses a two-step approach to guarantee timestamp integrity:
        1. Send only the text cues (no timestamps) to Gemini as a numbered list
        2. Apply translated texts back onto the original VTT using translate_preserving_timing()
        This avoids any possibility of Gemini drifting or altering timestamps.

        If the number of returned lines does not match the cue count the call
        is retried once; a remaining mismatch is padded with the source text
        or truncated. Returns the input unchanged when it has no cues.
        """
        from ..lib.vtt import VTTParser, VTTEditor

        source_cues = VTTParser.parse(vtt_content)
        if not source_cues:
            logger.warning(f"No cues found in VTT for {target_language} translation")
            return vtt_content
        cue_count = len(source_cues)
        # One line per cue: embedded newlines are flattened to spaces so the
        # numbering stays line-aligned. Identical for every attempt.
        numbered_texts = "\n".join(
            f"{number}. {cue.text.replace(chr(10), ' ')}"
            for number, cue in enumerate(source_cues, start=1)
        )

        async def _attempt_translation(extra_instruction: str = "") -> list[str]:
            prompt = f"""Translate the following {cue_count} numbered text segments from {source_language} to {target_language}.
REQUIREMENTS:
- Return EXACTLY {cue_count} numbered lines, one translation per line
- Format: "1. translated text", "2. translated text", etc.
- Preserve speaker labels like [Speaker 1]: unchanged
- Use natural, idiomatic {target_language}
- Do NOT add any explanation, preamble, or extra lines{extra_instruction}
Segments to translate:
{numbered_texts}"""
            response = await self._generate(
                [genai.types.Part.from_text(text=prompt)], _cost_ctx
            )
            return self._parse_numbered_translation(self._response_text(response), cue_count)

        try:
            translated_texts = await _attempt_translation()
            if len(translated_texts) != cue_count:
                logger.warning(
                    f"Translation cue count mismatch for {target_language}: "
                    f"expected {cue_count}, got {len(translated_texts)}. Retrying."
                )
                translated_texts = await _attempt_translation(
                    extra_instruction=f"\n- You MUST return exactly {cue_count} lines, no more, no less"
                )
            if len(translated_texts) != cue_count:
                # Pad or truncate as last resort to avoid breaking downstream
                logger.warning(
                    f"Retried translation still mismatched ({len(translated_texts)} vs {cue_count}). "
                    f"Padding/truncating to match."
                )
                if len(translated_texts) < cue_count:
                    translated_texts.extend(
                        source_cues[i].text
                        for i in range(len(translated_texts), cue_count)
                    )
                else:
                    translated_texts = translated_texts[:cue_count]
            result = VTTEditor.translate_preserving_timing(vtt_content, translated_texts)
            logger.info(f"Successfully translated VTT to {target_language} ({cue_count} cues)")
            return result
        except Exception as e:
            logger.error(f"Gemini translation failed for {target_language}: {e}")
            raise

    @staticmethod
    def _parse_numbered_translation(response_text: str, expected_count: int) -> list[str]:
        """Parse a numbered list response from Gemini into a list of translated texts.

        Lines of the form ``N. text`` or ``N) text`` start a new entry; a bare
        ``N.`` is kept as an empty entry so later cues stay aligned. Other
        non-blank lines are appended to the previous entry, or skipped when no
        entry has been seen yet (preamble).

        Args:
            response_text: Raw model output.
            expected_count: Number of cues requested (not enforced here; the
                caller compares lengths).
        """
        import re

        numbered_line = re.compile(r"^\d+[.)](?:\s+(.*))?$")
        results: list[str] = []
        for raw_line in response_text.strip().split("\n"):
            line = raw_line.strip()
            if not line:
                continue
            match = numbered_line.match(line)
            if match:
                results.append((match.group(1) or "").strip())
            elif results:
                # Non-numbered continuation line - append to last result
                results[-1] = f"{results[-1]} {line}".strip()
        return results

    # ------------------------------------------------------------------
    # TTS rewriting
    # ------------------------------------------------------------------

    async def rewrite_tts_cue(
        self,
        original_text: str,
        language: str = "en",
        _cost_ctx: Optional[dict] = None,
    ) -> str:
        """
        Rewrite an audio description cue to be TTS-friendly.

        Called when TTS synthesis fails for a cue after retries. Uses Gemini
        to rephrase the text while preserving the visual information being described.

        Args:
            original_text: The cue text that failed TTS synthesis
            language: Language code for context (default: 'en')
            _cost_ctx: Optional cost-tracking context

        Returns:
            Rewritten text optimized for TTS synthesis
        """
        prompt_template = self._load_prompt("gemini_tts_rewrite.md")
        prompt = prompt_template.replace(
            "{ORIGINAL_TEXT}", original_text
        ).replace(
            "{LANGUAGE}", language
        )
        try:
            logger.info(f"Rewriting TTS cue for safety: '{original_text[:50]}...'")
            response = await self._generate(
                [genai.types.Part.from_text(text=prompt)], _cost_ctx
            )
            result = self._response_text(response)
            # Remove any markdown formatting or quotes that Gemini might add
            if result.startswith("```"):
                kept_lines = [
                    line for line in result.split("\n")
                    if not line.strip().startswith("```")
                ]
                result = "\n".join(kept_lines).strip()
            # Remove surrounding quotes if present
            if result.startswith('"') and result.endswith('"'):
                result = result[1:-1]
            if result.startswith("'") and result.endswith("'"):
                result = result[1:-1]
            logger.info(f"Rewrote TTS cue: '{original_text[:30]}...' -> '{result[:30]}...'")
            return result
        except Exception as e:
            logger.error(f"Failed to rewrite TTS cue: {e}")
            raise
# Global service instance
# Created once at import time; intended to be imported and shared by callers
# rather than constructing GeminiService repeatedly.
gemini_service = GeminiService()