- B904 (55): add `from err` / `from None` to raise-in-except across 13 files - F821 (1): add missing HTTPException import in routes_language_qc.py - F841 (7): remove unused variable assignments (current_user, job_title, tts_provider, etc.) - W293 (13): strip trailing whitespace from blank lines - C416 (4): rewrite unnecessary dict comprehensions as dict() - C401 (1): rewrite unnecessary generator as set comprehension - E701 (4): split multi-statement lines in cost_tracker.py - E741 (1): rename ambiguous `l` to `lang` in cloud_run_dispatch.py - B007 (4): prefix unused loop variables with _ in tts.py, video_renderer.py - I001 (1): sort imports in tasks/__init__.py (move stdlib to top) - E402 (3): move threading/time/signals imports to top of tasks/__init__.py - UP042 (9): replace (str, Enum) with StrEnum in all model/schema enums Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1038 lines
46 KiB
Python
1038 lines
46 KiB
Python
import asyncio
|
||
import json
|
||
import time
|
||
from pathlib import Path
|
||
from typing import Any
|
||
|
||
import google.genai as genai
|
||
|
||
from ..core.config import settings
|
||
from ..core.logging import get_logger
|
||
from ..lib import locales as locale_lib
|
||
|
||
# Module-level logger shared by the helpers and GeminiService below.
logger = get_logger(__name__)

# One google-genai client for the whole module; every Gemini call in this
# module (uploads, status polls, generation, deletes) goes through it.
client = genai.Client(api_key=settings.gemini_api_key)
|
||
|
||
|
||
async def _record_gemini_usage(
|
||
response,
|
||
model: str,
|
||
user_id: str,
|
||
job_id: str,
|
||
project_id: str | None,
|
||
elapsed_ms: int,
|
||
) -> None:
|
||
try:
|
||
from ..services import cost_tracker
|
||
usage = getattr(response, "usage_metadata", None)
|
||
if usage is None:
|
||
return
|
||
await cost_tracker.aio_record(
|
||
model=model,
|
||
provider="google",
|
||
user_external_id=user_id,
|
||
project_id=project_id,
|
||
job_external_id=job_id,
|
||
input_tokens=getattr(usage, "prompt_token_count", 0) or 0,
|
||
output_tokens=getattr(usage, "candidates_token_count", 0) or 0,
|
||
latency_ms=elapsed_ms,
|
||
)
|
||
except Exception as e:
|
||
logger.warning(f"Cost tracker record failed (non-fatal): {e}")
|
||
|
||
|
||
class GeminiService:
|
||
# Models tried, in this order, after the primary model when it is rate-limited,
# unavailable, or returns an empty response (see _generate).
_fallback_models: list[str] = ["gemini-3-flash-preview", "gemini-2.5-pro"]
|
||
|
||
def __init__(self):
    # Primary model; _generate falls back to _fallback_models when it fails transiently.
    self.model_name = "gemini-3.1-pro-preview"
    # Prompt templates live in a "prompts" directory next to this module's parent folder.
    module_path = Path(__file__)
    self.prompts_dir = module_path.parent.parent / "prompts"
|
||
|
||
async def _generate(self, contents: Any, config: Any = None) -> tuple[Any, str]:
    """Call ``generate_content``, falling back through models on transient failures.

    Models are tried in order: ``self.model_name``, then each entry of
    ``self._fallback_models``. A model is skipped in two cases:

    - it returns an empty response (``response.text is None``);
    - it raises an error whose message mentions 429 / RESOURCE_EXHAUSTED /
      503 / UNAVAILABLE.

    Any other error is re-raised immediately.

    Args:
        contents: Passed through as ``contents`` to ``generate_content``.
        config: Optional ``config`` for ``generate_content``; omitted when None.

    Returns:
        ``(response, model_used)``.

    Raises:
        The last transient error (or a RuntimeError describing the last empty
        response) once every model has been tried; non-transient errors as-is.
    """
    transient_markers = ("429", "RESOURCE_EXHAUSTED", "503", "UNAVAILABLE")
    # Bound before the loop so the final raise never depends on loop state.
    last_exc: Exception | None = None

    for model in [self.model_name, *self._fallback_models]:
        kw: dict[str, Any] = {"model": model, "contents": contents}
        if config is not None:
            kw["config"] = config
        try:
            response = await asyncio.to_thread(client.models.generate_content, **kw)
            # Read inside the try so an error raised while reading .text also
            # goes through the transient-error check.
            text = response.text
        except Exception as exc:
            msg = str(exc)
            if not any(marker in msg for marker in transient_markers):
                raise
            logger.warning(f"Model {model!r} unavailable ({msg[:200]}), trying next fallback")
            last_exc = exc
            continue

        if text is None:
            logger.warning(f"Model {model!r} returned empty response (safety block or overload), trying next fallback")
            last_exc = RuntimeError(f"Model {model!r} returned empty response")
            continue

        if model != self.model_name:
            logger.warning(f"Used fallback model {model!r} (primary unavailable)")
        return response, model

    if last_exc is None:
        # Only reachable if the model list were empty.
        raise RuntimeError("No Gemini models configured")
    raise last_exc
|
||
|
||
def _load_prompt(self, prompt_file: str) -> str:
    """Return the text of the prompt template *prompt_file* from ``self.prompts_dir``.

    The file is decoded as UTF-8 explicitly. ``Path.read_text()`` without an
    encoding uses the platform locale encoding, so any non-ASCII text in a
    template (this module itself injects characters such as '♪' and '—')
    could be mis-decoded on non-UTF-8 hosts.

    Raises:
        FileNotFoundError: the template does not exist (logged, then re-raised).
    """
    prompt_path = self.prompts_dir / prompt_file
    try:
        return prompt_path.read_text(encoding="utf-8")
    except FileNotFoundError:
        logger.error(f"Prompt file not found: {prompt_file}")
        raise
|
||
|
||
async def _wait_for_file_active(self, file_name: str, max_wait_seconds: float = 300) -> bool:
    """Poll the Gemini Files API until *file_name* reaches the ACTIVE state.

    Between "not ready" polls the delay starts at 1s and grows by 1.5x, up to
    30s. A failed status lookup is logged and retried after 5s. Every sleep is
    capped at the remaining budget, so the summed sleep time never exceeds
    *max_wait_seconds*.

    Args:
        file_name: Files API resource name (``uploaded_file.name``).
        max_wait_seconds: Total sleep budget in seconds.

    Returns:
        True once the file is ACTIVE. False if it reports FAILED or the budget
        is used up.
    """
    backoff: float = 1
    waited: float = 0

    while waited < max_wait_seconds:
        remaining = max_wait_seconds - waited
        try:
            # The SDK call is blocking; run it off the event loop.
            file_info = await asyncio.to_thread(client.files.get, name=file_name)
            logger.info(f"File {file_name} status: {file_info.state} (waited {waited}s)")

            # Accept either an enum member (compared by .name) or a plain string.
            state = getattr(file_info.state, "name", file_info.state)
            if state == "ACTIVE":
                logger.info(f"File {file_name} is now ACTIVE!")
                return True
            if state == "FAILED":
                logger.error(f"File {file_name} processing FAILED")
                return False

            delay = min(backoff, remaining)
            logger.info(f"File not ready, waiting {delay}s...")
            await asyncio.sleep(delay)
            waited += delay
            backoff = min(backoff * 1.5, 30)
        except Exception as e:
            logger.error(f"Error checking file status: {e}")
            delay = min(5, remaining)
            await asyncio.sleep(delay)
            waited += delay

    logger.error(f"File {file_name} did not become ACTIVE within {max_wait_seconds}s")
    return False
|
||
|
||
def _build_sdh_field(self, sdh_requested: bool) -> str:
    """Return the output-field line requesting an SDH caption track, or "" when SDH is off."""
    field_spec = (
        "- sdh_captions_vtt: a valid WebVTT file as a single string, containing SDH-format captions "
        "(same timing as captions_vtt, but enriched with speaker labels, sound effects, and music notation)"
    )
    return field_spec if sdh_requested else ""
|
||
|
||
def _build_sdh_guidelines(self, sdh_requested: bool) -> str:
    """Return the SDH captioning rules injected into the prompt, or "" when SDH is not requested."""
    if not sdh_requested:
        return ""
    rules = (
        "SDH (SUBTITLES FOR THE DEAF AND HARD OF HEARING) GUIDELINES:",
        "Generate sdh_captions_vtt using the same cue timings as captions_vtt, enriched with:",
        '- Speaker identification when multiple speakers are present: use "NAME:" prefix (e.g., "JOHN: Hello there") or "[NARRATOR]" for narration',
        "- Non-speech sounds that are plot-relevant, in square brackets: [DOOR SLAMS], [PHONE RINGS], [CROWD CHEERING], [THUNDER]",
        '- Music: use ♪ for background music cues (e.g., "♪ tense music ♪") or ♪ around sung lyrics',
        '- Off-screen or voice-over speakers: indicate with "(off-screen)" or "[V.O.]" where relevant',
        "- Non-speech vocalisations when relevant: [SIGHS], [LAUGHS], [SCREAMS]",
        "- Maintain the same timestamp format as captions_vtt (HH:MM:SS.mmm --> HH:MM:SS.mmm)",
        "- Only add sound effect cues where they add meaningful context; do not annotate every minor sound",
    )
    return "\n".join(rules)
|
||
|
||
def _build_glossary_block(self, glossary_block: str | None) -> str:
|
||
"""Return the pre-built glossary block (from glossary_service.build_glossary_prompt_block), or empty string."""
|
||
if glossary_block and glossary_block.strip():
|
||
return glossary_block.strip()
|
||
return ""
|
||
|
||
def _build_source_has_ad_block(self, source_has_ad: bool) -> str:
    """Return the prompt notice for sources that already carry audio description, else "".

    The notice tells the model two things:

    - return a header-only audio_description_vtt;
    - keep the existing AD narration out of captions_vtt.
    """
    if not source_has_ad:
        return ""
    notice_parts = (
        "SOURCE AUDIO DESCRIPTION NOTICE: This video already has professional audio descriptions ",
        "embedded in its audio track. ",
        "1) Return an empty audio_description_vtt containing only the WEBVTT header (\"WEBVTT\\n\") — do NOT generate new audio descriptions. ",
        "2) For captions_vtt: transcribe ONLY the original program dialogue and relevant sound effects. ",
        "Do NOT caption the audio description narration — AD narration is spoken during natural pauses ",
        "and describes visual scenes rather than being part of the original dialogue.",
    )
    return "".join(notice_parts)
|
||
|
||
def _build_brand_context_block(self, brand_context: str | None) -> str:
|
||
"""Build the brand context instruction block for injection into prompts."""
|
||
if brand_context and brand_context.strip():
|
||
brands = [b.strip() for b in brand_context.split(",") if b.strip()]
|
||
if brands:
|
||
brand_list = ", ".join(f'"{b}"' for b in brands)
|
||
return (
|
||
f"The client has confirmed the following brand names appear in this video: {brand_list}. "
|
||
f"Use these exact brand names when you identify those products on screen."
|
||
)
|
||
return "No specific brand names have been provided for this video."
|
||
|
||
async def extract_accessibility(self, video_file_path: str, brand_context: str | None = None, sdh_requested: bool = False, glossary_block: str | None = None, source_has_ad: bool = False, _cost_ctx: dict | None = None) -> dict[str, Any]:
|
||
"""
|
||
Extract captions and audio descriptions from video using Gemini 2.0
|
||
Returns structured JSON with transcript, captions VTT, and audio description VTT
|
||
"""
|
||
prompt_template = self._load_prompt("gemini_ingestion.md")
|
||
prompt = (
|
||
prompt_template
|
||
.replace("{BRAND_CONTEXT}", self._build_brand_context_block(brand_context))
|
||
.replace("{GLOSSARY}", self._build_glossary_block(glossary_block))
|
||
.replace("{SDH_FIELD}", self._build_sdh_field(sdh_requested))
|
||
.replace("{SDH_GUIDELINES}", self._build_sdh_guidelines(sdh_requested))
|
||
.replace("{SOURCE_HAS_AD}", self._build_source_has_ad_block(source_has_ad))
|
||
)
|
||
uploaded_file = None
|
||
|
||
try:
|
||
logger.info(f"Starting Gemini processing for video: {video_file_path}")
|
||
|
||
# Upload video file to Gemini using new API - use asyncio.to_thread to avoid blocking
|
||
logger.info("Uploading video file to Gemini API...")
|
||
uploaded_file = await asyncio.to_thread(
|
||
client.files.upload,
|
||
file=video_file_path,
|
||
config={
|
||
"display_name": f"video_processing_{Path(video_file_path).name}",
|
||
"mime_type": "video/mp4"
|
||
}
|
||
)
|
||
logger.info(f"Successfully uploaded file: {uploaded_file.name} (URI: {uploaded_file.uri})")
|
||
|
||
# Wait for file to become ACTIVE before using it
|
||
logger.info("Waiting for file to become ACTIVE...")
|
||
file_ready = await self._wait_for_file_active(uploaded_file.name)
|
||
if not file_ready:
|
||
raise Exception("File failed to become ACTIVE within timeout")
|
||
|
||
# Generate content using new API - use asyncio.to_thread to avoid blocking
|
||
logger.info("Generating content with Gemini model...")
|
||
_t0 = time.monotonic()
|
||
response, _model_used = await self._generate(
|
||
contents=[
|
||
genai.types.Part.from_text(text=prompt),
|
||
genai.types.Part.from_uri(
|
||
file_uri=uploaded_file.uri,
|
||
mime_type=uploaded_file.mime_type
|
||
)
|
||
],
|
||
config=genai.types.GenerateContentConfig(
|
||
temperature=0.2,
|
||
top_p=0.8,
|
||
top_k=40,
|
||
),
|
||
)
|
||
if _cost_ctx:
|
||
asyncio.create_task(_record_gemini_usage(response, _model_used, _cost_ctx.get("user_id", "system"), _cost_ctx.get("job_id", ""), _cost_ctx.get("project_id"), int((time.monotonic() - _t0) * 1000)))
|
||
|
||
# Parse JSON response
|
||
response_text = response.text.strip()
|
||
logger.info(f"Received Gemini response (first 200 chars): {response_text[:200]}...")
|
||
|
||
# Handle potential markdown formatting
|
||
if response_text.startswith("```json"):
|
||
response_text = response_text.replace("```json", "").replace("```", "").strip()
|
||
logger.info("Cleaned markdown formatting from response")
|
||
|
||
# Additional cleanup for common JSON issues
|
||
response_text = response_text.strip()
|
||
|
||
logger.info("Parsing JSON response...")
|
||
try:
|
||
result = json.loads(response_text)
|
||
except json.JSONDecodeError as e:
|
||
logger.error(f"JSON parse error at position {e.pos}: {e.msg}")
|
||
# Log the problematic area
|
||
start = max(0, e.pos - 100)
|
||
end = min(len(response_text), e.pos + 100)
|
||
problematic_text = response_text[start:end]
|
||
logger.error(f"Problematic JSON area: ...{problematic_text}...")
|
||
raise
|
||
|
||
# Validate required fields
|
||
required_fields = [
|
||
"language", "confidence", "summary",
|
||
"transcript_plaintext", "captions_vtt", "audio_description_vtt"
|
||
]
|
||
|
||
for field in required_fields:
|
||
if field not in result:
|
||
raise ValueError(f"Missing required field: {field}")
|
||
|
||
# Validate VTT format
|
||
if not result["captions_vtt"].startswith("WEBVTT"):
|
||
raise ValueError("Invalid captions VTT format")
|
||
|
||
if not result["audio_description_vtt"].startswith("WEBVTT"):
|
||
raise ValueError("Invalid audio description VTT format")
|
||
|
||
logger.info(
|
||
f"Successfully extracted accessibility content with confidence: {result['confidence']}"
|
||
)
|
||
|
||
return result
|
||
|
||
except json.JSONDecodeError as e:
|
||
logger.error(f"Failed to parse Gemini JSON response: {e}")
|
||
logger.error(f"Raw response that failed to parse: {response_text}")
|
||
# Attempt self-healing
|
||
return await self._self_heal_response(video_file_path, response_text)
|
||
except Exception as e:
|
||
logger.error(f"Gemini extraction failed with exception: {type(e).__name__}: {str(e)}")
|
||
logger.error(f"Video file path: {video_file_path}")
|
||
# Print to stdout for immediate visibility
|
||
print(f"🚨 GEMINI ERROR: {type(e).__name__}: {str(e)}")
|
||
raise
|
||
finally:
|
||
# Guaranteed cleanup of uploaded file regardless of success/failure/cancellation
|
||
if uploaded_file:
|
||
try:
|
||
await asyncio.to_thread(client.files.delete, name=uploaded_file.name)
|
||
logger.info(f"Successfully cleaned up uploaded file: {uploaded_file.name}")
|
||
except Exception as e:
|
||
logger.warning(f"Failed to cleanup uploaded file {uploaded_file.name}: {e}")
|
||
|
||
async def _self_heal_response(self, video_file_path: str, invalid_response: str) -> dict[str, Any]:
    """Recover a usable result from a Gemini reply that was not valid JSON.

    The local syntax repair in ``_attempt_json_fix`` is tried first. If it
    yields nothing, the model is asked (text only; the video is not
    re-uploaded) to repair the JSON. A repaired result that lacks
    ``audio_description_vtt`` gets a one-cue placeholder track. Any other
    missing required field is an error.

    *video_file_path* is part of the interface but is not used here.

    Raises:
        ValueError: the model-based repair failed or lost required fields.
    """
    logger.info("Attempting to self-heal JSON response without re-uploading video")

    try:
        locally_fixed = self._attempt_json_fix(invalid_response)
    except Exception as e:
        logger.warning(f"JSON fix attempt failed: {e}")
    else:
        if locally_fixed:
            logger.info("Successfully fixed JSON without re-processing")
            return locally_fixed

    # Text-only repair request to the model.
    repair_prompt = f"""
SYSTEM: You are a JSON repair service. Fix the malformed JSON below and return ONLY the corrected JSON.

CRITICAL REQUIREMENTS:
- The JSON MUST contain these exact fields: language, confidence, summary, transcript_plaintext, captions_vtt, audio_description_vtt
- If audio_description_vtt is truncated or missing, reconstruct it as a valid WebVTT with at least basic descriptions
- All VTT content must start with "WEBVTT" and have proper timestamp format (HH:MM:SS.mmm --> HH:MM:SS.mmm)
- Properly escape all quotes within strings using \"
- Fix unterminated strings by adding closing quotes
- Remove trailing commas
- Ensure all JSON is properly closed with }}

Fix the JSON and return it:

{invalid_response}
"""
    required = (
        "language", "confidence", "summary",
        "transcript_plaintext", "captions_vtt", "audio_description_vtt",
    )
    placeholder_ad = "WEBVTT\n\n00:00:00.000 --> 00:00:05.000\nVideo content with visual elements described."

    try:
        reply, _ = await self._generate(
            contents=[genai.types.Part.from_text(text=repair_prompt)]
        )
        healed_text = reply.text.strip()
        if healed_text.startswith("```json"):
            healed_text = healed_text.replace("```json", "").replace("```", "").strip()
        healed = json.loads(healed_text)

        absent = [name for name in required if name not in healed]
        if absent:
            logger.error(f"Self-heal lost required fields: {absent}")
            # A lost AD track is replaced by a placeholder; anything else is fatal.
            if "audio_description_vtt" in absent:
                logger.info("Creating fallback audio_description_vtt")
                healed["audio_description_vtt"] = placeholder_ad
            unrecoverable = [name for name in absent if name != "audio_description_vtt"]
            if unrecoverable:
                raise ValueError(f"Self-heal failed to preserve required fields: {unrecoverable}")

        logger.info("Successfully self-healed Gemini response with all required fields")
        return healed
    except Exception as e:
        logger.error(f"Self-heal attempt failed: {e}")
        raise ValueError("Failed to get valid JSON from Gemini after self-heal attempt") from e
|
||
|
||
async def extract_accessibility_targeted(
|
||
self,
|
||
video_file_path: str,
|
||
target_language: str,
|
||
brand_context: str | None = None,
|
||
sdh_requested: bool = False,
|
||
glossary_block: str | None = None,
|
||
_cost_ctx: dict | None = None,
|
||
) -> dict[str, Any]:
|
||
"""
|
||
Extract captions and audio descriptions from video using Gemini,
|
||
generating content directly in the specified target language.
|
||
|
||
Unlike extract_accessibility() which auto-detects language, this method
|
||
takes an explicit target language and generates all outputs in that language.
|
||
This is used for "video_native" translation mode which re-processes the video
|
||
for each target language with full visual context.
|
||
|
||
Args:
|
||
video_file_path: Path to the video file
|
||
target_language: BCP-47 language code (e.g., "es", "fr", "de")
|
||
brand_context: Optional comma-separated brand names present in the video
|
||
|
||
Returns:
|
||
Structured JSON with transcript, captions VTT, and audio description VTT
|
||
all in the target language
|
||
"""
|
||
prompt_template = self._load_prompt("gemini_ingestion_targeted.md")
|
||
prompt = (
|
||
prompt_template
|
||
.replace("{TARGET_LANGUAGE}", locale_lib.get_gemini_label(target_language))
|
||
.replace("{BRAND_CONTEXT}", self._build_brand_context_block(brand_context))
|
||
.replace("{GLOSSARY}", self._build_glossary_block(glossary_block))
|
||
.replace("{SDH_FIELD}", self._build_sdh_field(sdh_requested))
|
||
.replace("{SDH_GUIDELINES}", self._build_sdh_guidelines(sdh_requested))
|
||
)
|
||
uploaded_file = None
|
||
|
||
try:
|
||
logger.info(f"Starting Gemini targeted processing for video: {video_file_path}, target: {target_language}")
|
||
|
||
# Upload video file to Gemini using new API
|
||
logger.info("Uploading video file to Gemini API for targeted extraction...")
|
||
uploaded_file = await asyncio.to_thread(
|
||
client.files.upload,
|
||
file=video_file_path,
|
||
config={
|
||
"display_name": f"video_processing_targeted_{target_language}_{Path(video_file_path).name}",
|
||
"mime_type": "video/mp4"
|
||
}
|
||
)
|
||
logger.info(f"Successfully uploaded file: {uploaded_file.name} (URI: {uploaded_file.uri})")
|
||
|
||
# Wait for file to become ACTIVE before using it
|
||
logger.info("Waiting for file to become ACTIVE...")
|
||
file_ready = await self._wait_for_file_active(uploaded_file.name)
|
||
if not file_ready:
|
||
raise Exception("File failed to become ACTIVE within timeout")
|
||
|
||
# Generate content using new API
|
||
logger.info(f"Generating content with Gemini model for {target_language}...")
|
||
_t0 = time.monotonic()
|
||
response, _model_used = await self._generate(
|
||
contents=[
|
||
genai.types.Part.from_text(text=prompt),
|
||
genai.types.Part.from_uri(
|
||
file_uri=uploaded_file.uri,
|
||
mime_type=uploaded_file.mime_type
|
||
)
|
||
]
|
||
)
|
||
if _cost_ctx:
|
||
asyncio.create_task(_record_gemini_usage(response, _model_used, _cost_ctx.get("user_id", "system"), _cost_ctx.get("job_id", ""), _cost_ctx.get("project_id"), int((time.monotonic() - _t0) * 1000)))
|
||
|
||
# Parse JSON response
|
||
response_text = response.text.strip()
|
||
logger.info(f"Received Gemini targeted response for {target_language} (first 200 chars): {response_text[:200]}...")
|
||
|
||
# Handle potential markdown formatting
|
||
if response_text.startswith("```json"):
|
||
response_text = response_text.replace("```json", "").replace("```", "").strip()
|
||
logger.info("Cleaned markdown formatting from response")
|
||
|
||
response_text = response_text.strip()
|
||
|
||
logger.info("Parsing JSON response...")
|
||
try:
|
||
result = json.loads(response_text)
|
||
except json.JSONDecodeError as e:
|
||
logger.error(f"JSON parse error at position {e.pos}: {e.msg}")
|
||
start = max(0, e.pos - 100)
|
||
end = min(len(response_text), e.pos + 100)
|
||
problematic_text = response_text[start:end]
|
||
logger.error(f"Problematic JSON area: ...{problematic_text}...")
|
||
# Attempt self-healing
|
||
return await self._self_heal_targeted_response(target_language, response_text)
|
||
|
||
# Validate required fields
|
||
required_fields = [
|
||
"language", "confidence", "summary",
|
||
"transcript_plaintext", "captions_vtt", "audio_description_vtt"
|
||
]
|
||
|
||
for field in required_fields:
|
||
if field not in result:
|
||
raise ValueError(f"Missing required field: {field}")
|
||
|
||
# Validate VTT format
|
||
if not result["captions_vtt"].startswith("WEBVTT"):
|
||
raise ValueError("Invalid captions VTT format")
|
||
|
||
if not result["audio_description_vtt"].startswith("WEBVTT"):
|
||
raise ValueError("Invalid audio description VTT format")
|
||
|
||
logger.info(
|
||
f"Successfully extracted targeted accessibility content for {target_language} "
|
||
f"with confidence: {result['confidence']}"
|
||
)
|
||
|
||
return result
|
||
|
||
except json.JSONDecodeError as e:
|
||
logger.error(f"Failed to parse Gemini JSON response: {e}")
|
||
logger.error(f"Raw response that failed to parse: {response_text}")
|
||
return await self._self_heal_targeted_response(target_language, response_text)
|
||
except Exception as e:
|
||
logger.error(f"Gemini targeted extraction failed for {target_language}: {type(e).__name__}: {str(e)}")
|
||
logger.error(f"Video file path: {video_file_path}")
|
||
print(f"🚨 GEMINI TARGETED ERROR ({target_language}): {type(e).__name__}: {str(e)}")
|
||
raise
|
||
finally:
|
||
# Cleanup uploaded file
|
||
if uploaded_file:
|
||
try:
|
||
await asyncio.to_thread(client.files.delete, name=uploaded_file.name)
|
||
logger.info(f"Successfully cleaned up uploaded file: {uploaded_file.name}")
|
||
except Exception as e:
|
||
logger.warning(f"Failed to cleanup uploaded file {uploaded_file.name}: {e}")
|
||
|
||
async def _self_heal_targeted_response(
    self,
    target_language: str,
    invalid_response: str
) -> dict[str, Any]:
    """Repair a malformed JSON reply from targeted (per-language) extraction.

    ``_attempt_json_fix`` is tried first. Otherwise the model is asked, text
    only, to fix the JSON while keeping its content in *target_language*. A
    missing ``audio_description_vtt`` is replaced by a one-cue placeholder
    track. Any other missing required field fails the repair.

    Raises:
        ValueError: the model-based repair failed or lost required fields.
    """
    logger.info(f"Attempting to self-heal targeted response for {target_language}")

    try:
        quick_fix = self._attempt_json_fix(invalid_response)
    except Exception as e:
        logger.warning(f"JSON fix attempt failed: {e}")
    else:
        if quick_fix:
            logger.info("Successfully fixed JSON without re-processing")
            return quick_fix

    repair_prompt = f"""
SYSTEM: You are a JSON repair service. Fix the malformed JSON below and return ONLY the corrected JSON.

CRITICAL REQUIREMENTS:
- The JSON MUST contain these exact fields: language, confidence, summary, transcript_plaintext, captions_vtt, audio_description_vtt
- All content should be in {target_language}
- If audio_description_vtt is truncated or missing, reconstruct it as a valid WebVTT with at least basic descriptions in {target_language}
- All VTT content must start with "WEBVTT" and have proper timestamp format (HH:MM:SS.mmm --> HH:MM:SS.mmm)
- Properly escape all quotes within strings using \"
- Fix unterminated strings by adding closing quotes
- Remove trailing commas
- Ensure all JSON is properly closed with }}

Fix the JSON and return it:

{invalid_response}
"""
    required = (
        "language", "confidence", "summary",
        "transcript_plaintext", "captions_vtt", "audio_description_vtt",
    )
    placeholder_ad = "WEBVTT\n\n00:00:00.000 --> 00:00:05.000\nVideo content with visual elements described."

    try:
        reply, _ = await self._generate(
            contents=[genai.types.Part.from_text(text=repair_prompt)]
        )
        text = reply.text.strip()
        if text.startswith("```json"):
            text = text.replace("```json", "").replace("```", "").strip()
        repaired = json.loads(text)

        absent = [name for name in required if name not in repaired]
        if absent:
            logger.error(f"Self-heal lost required fields: {absent}")
            if "audio_description_vtt" in absent:
                logger.info("Creating fallback audio_description_vtt")
                repaired["audio_description_vtt"] = placeholder_ad
            unrecoverable = [name for name in absent if name != "audio_description_vtt"]
            if unrecoverable:
                raise ValueError(f"Self-heal failed to preserve required fields: {unrecoverable}")

        logger.info(f"Successfully self-healed targeted response for {target_language}")
        return repaired
    except Exception as e:
        logger.error(f"Self-heal attempt failed for {target_language}: {e}")
        raise ValueError(f"Failed to get valid JSON from Gemini targeted extraction for {target_language}") from e
|
||
|
||
def _attempt_json_fix(self, json_text: str) -> dict[str, Any] | None:
|
||
"""Attempt to fix common JSON syntax issues"""
|
||
# Try to identify and fix common issues
|
||
fixes_tried = []
|
||
fixed_text = json_text
|
||
import re
|
||
|
||
# Fix 1: Remove trailing commas
|
||
fixed_text = re.sub(r',(\s*[}\]])', r'\1', fixed_text)
|
||
fixes_tried.append("removed trailing commas")
|
||
|
||
# Fix 2: Try to fix unterminated strings by adding closing quote and brace
|
||
if fixed_text.count('"') % 2 != 0: # Odd number of quotes suggests unterminated string
|
||
# Find the last quote and see if we need to close the JSON
|
||
last_quote_pos = fixed_text.rfind('"')
|
||
remainder = fixed_text[last_quote_pos + 1:].strip()
|
||
|
||
# If there's no closing brace after the last quote, try to fix it
|
||
if remainder and not remainder.endswith('}'):
|
||
# Try to intelligently close the JSON
|
||
if 'audio_description_vtt' in fixed_text[max(0, last_quote_pos - 100):]:
|
||
# This appears to be in the audio_description_vtt field
|
||
fixed_text += '"\n}'
|
||
fixes_tried.append("closed unterminated audio_description_vtt string")
|
||
else:
|
||
fixed_text += '"'
|
||
fixes_tried.append("closed unterminated string")
|
||
|
||
# Fix 3: Ensure JSON ends with closing brace
|
||
if not fixed_text.rstrip().endswith('}'):
|
||
fixed_text = fixed_text.rstrip() + '\n}'
|
||
fixes_tried.append("added closing brace")
|
||
|
||
try:
|
||
result = json.loads(fixed_text)
|
||
logger.info(f"JSON fixed with: {', '.join(fixes_tried)}")
|
||
|
||
# Validate that we have the required fields
|
||
required_fields = [
|
||
"language", "confidence", "summary",
|
||
"transcript_plaintext", "captions_vtt", "audio_description_vtt"
|
||
]
|
||
|
||
missing_fields = [field for field in required_fields if field not in result]
|
||
if missing_fields:
|
||
logger.warning(f"Fixed JSON is missing required fields: {missing_fields}")
|
||
return None # Let the more advanced self-healing handle this
|
||
|
||
return result
|
||
except json.JSONDecodeError as e:
|
||
logger.debug(f"JSON fix attempt failed: {e}")
|
||
return None
|
||
|
||
async def analyze_accessible_video_placement(
    self,
    video_file_path: str,
    ad_vtt_content: str,
    ad_cue_durations: list[float],
) -> dict[str, Any]:
    """
    DEPRECATED: This function is no longer called in the render pipeline.

    Pause points are now derived from AD VTT cue start times and refined by Whisper.
    Method selection (overlay/pause_insert) is done by user at QC Review approval.

    This function is kept for potential future use or rollback scenarios.
    See render_accessible_video._build_placements_from_ad_vtt() for the replacement logic.

    ---
    Original description:
    Analyze video and determine optimal method for integrating audio descriptions.
    Returns placement instructions for each AD cue.

    Args:
        video_file_path: Path to the source video file
        ad_vtt_content: The audio description VTT content
        ad_cue_durations: List of actual TTS audio durations in seconds (matching VTT cue order)

    Returns:
        Dictionary with method choice and placement instructions for each AD cue.
        It must contain method ("overlay" or "pause_insert"), method_rationale,
        dialogue_density and a placements list. A placement-count mismatch with
        ad_cue_durations is only logged.

    Raises:
        ValueError: missing fields, an unknown method, or non-list placements.
        RuntimeError: the uploaded file never became ACTIVE.
    """
    import warnings

    warnings.warn(
        "analyze_accessible_video_placement is deprecated. "
        "Pause points are now derived from AD VTT cue start times and refined by Whisper.",
        DeprecationWarning,
        stacklevel=2,
    )

    # Format prompt with AD VTT content and durations
    prompt = (
        self._load_prompt("gemini_accessible_video.md")
        .replace("{AD_VTT_CONTENT}", ad_vtt_content)
        .replace("{AD_CUE_DURATIONS}", json.dumps(ad_cue_durations))
    )

    uploaded_file = None

    try:
        logger.info(f"Starting accessible video analysis for: {video_file_path}")
        logger.info(f"AD cues to place: {len(ad_cue_durations)}")

        logger.info("Uploading video file to Gemini API for accessible video analysis...")
        uploaded_file = await asyncio.to_thread(
            client.files.upload,
            file=video_file_path,
            config={
                "display_name": f"accessible_video_analysis_{Path(video_file_path).name}",
                "mime_type": "video/mp4",
            },
        )
        logger.info(f"Successfully uploaded file: {uploaded_file.name}")

        logger.info("Waiting for file to become ACTIVE...")
        if not await self._wait_for_file_active(uploaded_file.name):
            raise RuntimeError("File failed to become ACTIVE within timeout")

        logger.info("Analyzing video with Gemini for accessible video placement...")
        response, _ = await self._generate(
            contents=[
                genai.types.Part.from_text(text=prompt),
                genai.types.Part.from_uri(
                    file_uri=uploaded_file.uri,
                    mime_type=uploaded_file.mime_type,
                ),
            ]
        )

        response_text = response.text.strip()
        logger.info(f"Received accessible video analysis response (first 300 chars): {response_text[:300]}...")

        # Strip a ```json ... ``` markdown fence if the model added one.
        if response_text.startswith("```json"):
            response_text = response_text.replace("```json", "").replace("```", "").strip()

        try:
            result = json.loads(response_text)
        except json.JSONDecodeError as e:
            logger.error(f"JSON parse error in accessible video analysis: {e}")
            result = await self._self_heal_accessible_video_response(response_text)

        for field in ("method", "method_rationale", "dialogue_density", "placements"):
            if field not in result:
                raise ValueError(f"Missing required field in accessible video analysis: {field}")

        if result["method"] not in ("overlay", "pause_insert"):
            raise ValueError(f"Invalid method value: {result['method']}")

        placements = result["placements"]
        if not isinstance(placements, list):
            raise ValueError("placements must be a list in accessible video analysis")
        if len(placements) != len(ad_cue_durations):
            logger.warning(
                f"Placement count mismatch: got {len(placements)}, "
                f"expected {len(ad_cue_durations)}"
            )

        # Used only for logging. A non-numeric density (e.g. "0.4" as a string)
        # must not make the ":.2f" format spec raise and fail a valid analysis.
        density = result["dialogue_density"]
        density_text = f"{density:.2f}" if isinstance(density, (int, float)) else repr(density)
        logger.info(
            f"Accessible video analysis complete: method={result['method']}, "
            f"dialogue_density={density_text}, "
            f"placements={len(placements)}"
        )

        return result

    except Exception as e:
        logger.error(f"Accessible video analysis failed: {type(e).__name__}: {str(e)}")
        raise
    finally:
        # Cleanup uploaded file regardless of outcome.
        if uploaded_file:
            try:
                await asyncio.to_thread(client.files.delete, name=uploaded_file.name)
                logger.info(f"Cleaned up uploaded file: {uploaded_file.name}")
            except Exception as e:
                logger.warning(f"Failed to cleanup uploaded file {uploaded_file.name}: {e}")
|
||
|
||
async def _self_heal_accessible_video_response(self, invalid_response: str) -> dict[str, Any]:
    """Attempt to self-heal invalid JSON response from accessible video analysis.

    Sends the malformed text to Gemini inside a JSON-repair prompt that lists
    the fields the analysis must contain, then parses the model's reply.

    Args:
        invalid_response: Raw model output that could not be parsed as JSON.

    Returns:
        The repaired analysis as a dict.

    Raises:
        ValueError: If the repair call fails, the reply is still not valid
            JSON, or it parses to something other than a JSON object. The
            underlying error is chained as ``__cause__``.
    """
    logger.info("Attempting to self-heal accessible video analysis response")

    self_heal_prompt = f"""
SYSTEM: You are a JSON repair service. Fix the malformed JSON below and return ONLY the corrected JSON.

CRITICAL REQUIREMENTS:
- The JSON MUST contain: method, method_rationale, dialogue_density, placements, total_added_duration, warnings
- method must be either "overlay" or "pause_insert"
- dialogue_density must be a number between 0 and 1
- placements must be an array of placement objects
- Fix any JSON syntax errors (trailing commas, unterminated strings, etc.)

Fix the JSON and return it:

{invalid_response}
"""

    try:
        response, _ = await self._generate(
            contents=[genai.types.Part.from_text(text=self_heal_prompt)]
        )

        response_text = response.text.strip()
        # Strip a Markdown code fence whether it is tagged (```json) or bare
        # (```); a bare fence would otherwise make json.loads fail on valid JSON.
        if response_text.startswith("```"):
            response_text = response_text[3:]
            if response_text[:4].lower() == "json":
                response_text = response_text[4:]
            response_text = response_text.strip().removesuffix("```").strip()

        result = json.loads(response_text)
        # The analysis caller looks fields up by name, so a list/string/number
        # reply is treated as a failed repair rather than returned.
        if not isinstance(result, dict):
            raise ValueError(
                f"Self-heal returned a JSON {type(result).__name__}, expected an object"
            )

        logger.info("Successfully self-healed accessible video analysis response")
        return result

    except Exception as e:
        logger.error(f"Self-heal attempt for accessible video analysis failed: {e}")
        raise ValueError("Failed to get valid JSON from accessible video analysis after self-heal") from e
|
||
|
||
async def transcreate_content(
    self,
    captions_vtt: str,
    ad_vtt: str,
    target_language: str,
    brief: str | None = None,
    glossary_block: str | None = None,
    _cost_ctx: dict | None = None,
) -> dict[str, str]:
    """
    Transcreate English VTT content to target language with cultural adaptation.

    Args:
        captions_vtt: English captions VTT text, embedded verbatim in the prompt.
        ad_vtt: English audio-description VTT text, embedded verbatim in the prompt.
        target_language: Locale code, rendered via ``locale_lib.get_gemini_label``.
        brief: Optional brand guidance; a fixed placeholder sentence is sent when omitted.
        glossary_block: Optional glossary text, rendered via ``self._build_glossary_block``.
        _cost_ctx: Optional dict read for "user_id", "job_id" and "project_id";
            when truthy, token usage is recorded in a background task.

    Returns:
        The parsed JSON object from the model. It is guaranteed to contain
        "captions_vtt" and "audio_description_vtt".

    Raises:
        ValueError: If the reply is not valid JSON, is not a JSON object, or
            lacks one of the required fields.
        Exception: Errors from the Gemini call are logged and re-raised unchanged.
    """
    prompt_template = self._load_prompt("gemini_transcreation.md")

    # Format prompt with actual content
    prompt = prompt_template.format(
        TARGET_LANGUAGE=locale_lib.get_gemini_label(target_language),
        GLOSSARY=self._build_glossary_block(glossary_block),
    )

    user_prompt = f"""
Input:
- captions_vtt_en: {captions_vtt}
- ad_vtt_en: {ad_vtt}
- brief: {brief or "No specific brand guidelines provided"}

Output:
JSON:
"""

    try:
        _t0 = time.monotonic()
        response, _model_used = await self._generate(
            contents=[genai.types.Part.from_text(text=prompt + "\n\n" + user_prompt)]
        )
        if _cost_ctx:
            task = asyncio.create_task(
                _record_gemini_usage(
                    response,
                    _model_used,
                    _cost_ctx.get("user_id", "system"),
                    _cost_ctx.get("job_id", ""),
                    _cost_ctx.get("project_id"),
                    int((time.monotonic() - _t0) * 1000),
                )
            )
            # The event loop holds only weak references to tasks; keep a strong
            # one until the usage record finishes so it cannot be collected mid-run.
            pending = getattr(self, "_usage_tasks", None)
            if pending is None:
                pending = set()
                self._usage_tasks = pending
            pending.add(task)
            task.add_done_callback(pending.discard)

        response_text = response.text.strip()

        # Handle potential markdown formatting: tagged (```json) or bare (```) fences
        if response_text.startswith("```"):
            response_text = response_text[3:]
            if response_text[:4].lower() == "json":
                response_text = response_text[4:]
            response_text = response_text.strip().removesuffix("```").strip()

        result = json.loads(response_text)

        # Validate required fields (a non-object reply cannot carry them)
        if not isinstance(result, dict):
            raise ValueError("Transcreation response is not a JSON object")
        if "captions_vtt" not in result or "audio_description_vtt" not in result:
            raise ValueError("Missing required VTT fields in transcreation response")

        logger.info(f"Successfully transcreated content to {target_language}")
        return result

    except json.JSONDecodeError as e:
        logger.error(f"Failed to parse transcreation JSON response: {e}")
        raise ValueError("Invalid JSON response from transcreation") from e
    except Exception as e:
        logger.error(f"Transcreation failed: {e}")
        raise
|
||
|
||
async def translate_vtt(
    self,
    vtt_content: str,
    target_language: str,
    source_language: str = "en",
    glossary_block: str | None = None,
    style: str = "literal",
    _cost_ctx: dict | None = None,
) -> str:
    """
    Translate VTT content using Gemini, preserving timing programmatically.

    Uses a two-step approach to guarantee timestamp integrity:
    1. Send only the text cues (no timestamps) to Gemini as a numbered list
    2. Apply translated texts back onto the original VTT using translate_preserving_timing()

    style="literal" — direct translation preserving meaning exactly
    style="transcreate" — culturally adapted but still returns EXACTLY N cues 1:1
    (any other style value is treated like "literal")

    Args:
        vtt_content: Source WebVTT document.
        target_language: Target locale code, rendered via ``locale_lib.get_gemini_label``.
        source_language: Source locale code (default "en"), rendered the same way.
        glossary_block: Optional glossary text; when ``self._build_glossary_block``
            yields a non-empty section, the prompt requires its exact terms.
        style: "literal" or "transcreate".
        _cost_ctx: Optional dict read for "user_id", "job_id" and "project_id";
            when truthy, each Gemini call's usage is recorded in a background task.

    Returns:
        The translated VTT with the original timings, or ``vtt_content``
        unchanged when it contains no cues.

    Raises:
        Exception: Any error from prompt building, the Gemini call, parsing or
            VTT rewriting is logged and re-raised.
    """
    from ..lib.vtt import VTTEditor, VTTParser

    source_cues = VTTParser.parse(vtt_content)
    if not source_cues:
        logger.warning(f"No cues found in VTT for {target_language} translation")
        return vtt_content

    cue_count = len(source_cues)

    try:
        # Prompt pieces shared by the first attempt and the retry.
        src_label = locale_lib.get_gemini_label(source_language)
        tgt_label = locale_lib.get_gemini_label(target_language)
        glossary_section = self._build_glossary_block(glossary_block)
        glossary_line = f"\n\n{glossary_section}" if glossary_section else ""
        glossary_req = (
            "\n- MUST use the exact approved terms from the glossary below — these override natural translation choices, even for English terms"
            if glossary_section
            else ""
        )
        # The adaptation rule is a complete bullet line (with its own "- " and
        # trailing newline) placed above the "Use natural" bullet. Prefixing it
        # with that bullet's "- " produced "- - Culturally adapt …".
        adapt_line = (
            f"- Culturally adapt the text for {tgt_label} audiences (brand voice, natural phrasing), "
            "while keeping accessibility intent and line length (~32–40 chars)\n"
            if style == "transcreate"
            else ""
        )
        # Multi-line cue text is flattened so each numbered item is one line.
        numbered_texts = "\n".join(
            f"{index}. " + cue.text.replace("\n", " ")
            for index, cue in enumerate(source_cues, start=1)
        )

        async def _attempt_translation(extra_instruction: str = "") -> list[str]:
            """Run one Gemini translation pass and parse its numbered reply."""
            prompt = f"""Translate the following {cue_count} numbered text segments from {src_label} to {tgt_label}.

REQUIREMENTS:
- Return EXACTLY {cue_count} numbered lines, one translation per line
- Format: "1. translated text", "2. translated text", etc.
- Preserve speaker labels like [Speaker 1]: unchanged
{adapt_line}- Use natural, idiomatic {tgt_label}
- Do NOT add any explanation, preamble, or extra lines{extra_instruction}{glossary_req}{glossary_line}

Segments to translate:
{numbered_texts}"""

            started = time.monotonic()
            response, model_used = await self._generate(
                contents=[genai.types.Part.from_text(text=prompt)]
            )
            if _cost_ctx:
                task = asyncio.create_task(
                    _record_gemini_usage(
                        response,
                        model_used,
                        _cost_ctx.get("user_id", "system"),
                        _cost_ctx.get("job_id", ""),
                        _cost_ctx.get("project_id"),
                        int((time.monotonic() - started) * 1000),
                    )
                )
                # The event loop holds only weak references to tasks; keep a
                # strong one until the usage record finishes.
                pending = getattr(self, "_usage_tasks", None)
                if pending is None:
                    pending = set()
                    self._usage_tasks = pending
                pending.add(task)
                task.add_done_callback(pending.discard)
            return self._parse_numbered_translation(response.text.strip(), cue_count)

        translated_texts = await _attempt_translation()

        if len(translated_texts) != cue_count:
            logger.warning(
                f"Translation cue count mismatch for {target_language}: "
                f"expected {cue_count}, got {len(translated_texts)}. Retrying."
            )
            translated_texts = await _attempt_translation(
                extra_instruction=f"\n- You MUST return exactly {cue_count} lines, no more, no less"
            )

        if len(translated_texts) != cue_count:
            # Pad or truncate as last resort to avoid breaking downstream
            logger.warning(
                f"Retried translation still mismatched ({len(translated_texts)} vs {cue_count}). "
                "Padding/truncating to match."
            )
            if len(translated_texts) < cue_count:
                # Missing trailing cues keep their untranslated source text.
                translated_texts.extend(
                    source_cues[i].text for i in range(len(translated_texts), cue_count)
                )
            else:
                translated_texts = translated_texts[:cue_count]

        result = VTTEditor.translate_preserving_timing(vtt_content, translated_texts)
        logger.info(f"Successfully translated VTT to {target_language} ({cue_count} cues)")
        return result

    except Exception as e:
        logger.error(f"Gemini translation failed for {target_language}: {e}")
        raise
|
||
|
||
@staticmethod
def _parse_numbered_translation(response_text: str, expected_count: int) -> list[str]:
    """Parse a numbered list response from Gemini into a list of translated texts.

    A line ``N. text`` or ``N) text`` starts a new item. A bare ``N.`` / ``N)``
    starts an *empty* item, so an empty translation no longer merges into the
    previous one and shifts every later cue. Non-numbered lines after the first
    item are joined onto the previous item with a space. Blank lines, Markdown
    code-fence lines and any preamble before the first item are ignored.

    Args:
        response_text: Raw model reply.
        expected_count: Number of items requested. Not used while parsing;
            callers such as ``translate_vtt`` compare the result length to it.

    Returns:
        Item texts in reply order. Leading numbers are discarded, not
        validated, so the length may differ from ``expected_count``.
    """
    import re

    # Whitespace after the marker is required unless the line ends there, so a
    # wrapped continuation like "2.5 million" is not mistaken for item 2.
    item_pattern = re.compile(r"^\d+[.)](?:\s+(.*))?$")

    results: list[str] = []
    for raw_line in response_text.strip().split("\n"):
        line = raw_line.strip()
        if not line or line.startswith("```"):
            continue
        match = item_pattern.match(line)
        if match:
            results.append((match.group(1) or "").strip())
        elif results:
            # Continuation of the previous item (the model wrapped its line).
            results[-1] += " " + line
    return results
|
||
|
||
async def rewrite_tts_cue(
    self,
    original_text: str,
    language: str = "en",
    _cost_ctx: dict | None = None,
) -> str:
    """
    Rewrite an audio description cue to be TTS-friendly.

    Called when TTS synthesis fails for a cue after retries. Uses Gemini
    to rephrase the text while preserving the visual information being described.

    Args:
        original_text: The cue text that failed TTS synthesis
        language: Language code for context (default: 'en')
        _cost_ctx: Optional dict read for "user_id", "job_id" and "project_id";
            when truthy, token usage is recorded in a background task.

    Returns:
        Rewritten text optimized for TTS synthesis, with any Markdown code
        fence lines and one pair of surrounding quotes removed.

    Raises:
        ValueError: If nothing remains of the reply after cleanup.
        Exception: Errors from the Gemini call are logged and re-raised.
    """
    prompt_template = self._load_prompt("gemini_tts_rewrite.md")
    # Substitute the cue text last so placeholder-like text inside it
    # (e.g. a literal "{LANGUAGE}") is left untouched.
    prompt = prompt_template.replace("{LANGUAGE}", language).replace(
        "{ORIGINAL_TEXT}", original_text
    )

    try:
        logger.info(f"Rewriting TTS cue for safety: '{original_text[:50]}...'")

        started = time.monotonic()
        response, model_used = await self._generate(
            contents=[genai.types.Part.from_text(text=prompt)]
        )
        if _cost_ctx:
            task = asyncio.create_task(
                _record_gemini_usage(
                    response,
                    model_used,
                    _cost_ctx.get("user_id", "system"),
                    _cost_ctx.get("job_id", ""),
                    _cost_ctx.get("project_id"),
                    int((time.monotonic() - started) * 1000),
                )
            )
            # The event loop holds only weak references to tasks; keep a strong
            # one until the usage record finishes.
            pending = getattr(self, "_usage_tasks", None)
            if pending is None:
                pending = set()
                self._usage_tasks = pending
            pending.add(task)
            task.add_done_callback(pending.discard)

        result = response.text.strip()

        # Remove any markdown formatting or quotes that Gemini might add
        if result.startswith("```"):
            result = "\n".join(
                line for line in result.split("\n")
                if not line.strip().startswith("```")
            ).strip()

        # Remove surrounding quotes if present
        if result.startswith('"') and result.endswith('"'):
            result = result[1:-1]
        if result.startswith("'") and result.endswith("'"):
            result = result[1:-1]

        # An empty "rewrite" would just fail TTS again; report it as a failure.
        if not result.strip():
            raise ValueError("Gemini returned an empty TTS rewrite")

        logger.info(f"Rewrote TTS cue: '{original_text[:30]}...' -> '{result[:30]}...'")
        return result

    except Exception as e:
        logger.error(f"Failed to rewrite TTS cue: {e}")
        raise
|
||
|
||
|
||
# Global service instance: one GeminiService constructed when this module is
# first imported and shared by every importer of `gemini_service`.
gemini_service = GeminiService()
|