video-accessibility/backend/app/tasks/ingest_and_ai.py
Vadym Samoilenko ea21cace96 feat: replace SDK with direct HTTP integration to centralized cost tracker
- New services/cost_tracker.py: sync httpx preflight()/record() + async wrappers;
  BudgetExceeded exception; no-op when COST_TRACKER_BASE_URL is empty
- Preflight budget check added before ingestion (Gemini), per-language translation
  (video-native + traditional), and per-language TTS dispatch
- _record_gemini_usage and _record_tts_cost now call cost_tracker directly;
  removes broken asyncio.get_event_loop() hack from sync Celery worker
- Fix: _cost_ctx now threaded into extract_accessibility_targeted (video-native path)
- Fix: user_id/cost_project_id now propagated through dispatch_language_tts →
  synthesize_cue_task.s() and the rerender_accessible_video.py re-render path
- Remove oliver-cost-tracker SDK dependency (was commented-out/never installed)
- Drop cost_tracker_outbox_path setting and get_cost_tracker() factory
- Update COST_TRACKER_BASE_URL default to optical-dev.oliver.solutions in
  .env.prod.example, docker-compose.yml, and all Cloud Run service yamls
- Cloud Run yamls use Secret Manager ref (cost-tracker-api-key) for the API key

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-27 13:36:15 +01:00

379 lines
15 KiB
Python

import asyncio
import os
import tempfile
from datetime import datetime
import ffmpeg
from celery import Task
from motor.motor_asyncio import AsyncIOMotorClient
from ..core.config import settings
from ..core.logging import get_logger
from ..models.job import JobStatus
from ..services import cost_tracker
from ..services.cost_tracker import BudgetExceeded
from ..services.gcs import gcs_service, upload_vtt_to_gcs
from ..services.gemini import gemini_service
from ..services.websocket import connection_manager
from . import celery_app
logger = get_logger(__name__)
def broadcast_status_update(job_id: str, status: str, job_title: str = None, message: str = None, progress: int = None):
    """
    Helper function to broadcast job status updates via WebSocket
    Uses sync Redis client for Celery worker context

    Builds a ``JobStatusUpdate`` and publishes its JSON form to two Redis
    pub/sub channels: the shared ``job_status_updates`` channel and the
    per-job ``job_status_updates:{job_id}`` channel. Relaying those messages
    to WebSocket clients presumably happens in a subscriber elsewhere; that
    part is not visible from this function.

    Args:
        job_id: Identifier of the job the update refers to.
        status: New status value to broadcast.
        job_title: Optional job title included in the update.
        message: Optional human-readable message included in the update.
        progress: Optional progress value included in the update.

    Returns:
        None. This function is best-effort: every failure (import error,
        Redis connection error, serialization error) is logged and swallowed
        so that a broadcast problem never fails the calling worker task.
    """
    logger.info(f"🔊 ATTEMPTING TO BROADCAST: job_id={job_id}, status={status}, job_title={job_title}, message={message}")
    try:
        # Imported lazily so a missing/broken dependency is caught by the
        # surrounding best-effort handler instead of breaking module import.
        import redis as sync_redis
        from ..services.websocket import JobStatusUpdate

        logger.info(f"🔊 About to create JobStatusUpdate for job {job_id}")
        # Create status update
        update = JobStatusUpdate(
            job_id=job_id,
            status=status,
            updated_at=datetime.utcnow(),
            job_title=job_title,
            message=message,
            progress=progress
        )
        # NOTE(review): this logs the full Redis URL; if the URL embeds
        # credentials they end up in the logs. Consider masking it.
        logger.info(f"🔊 Created update object, now connecting to Redis: {settings.redis_url}")
        # Create synchronous Redis client
        redis_client = sync_redis.Redis.from_url(
            settings.redis_url,
            encoding="utf-8",
            decode_responses=True
        )
        try:
            logger.info("🔊 Redis client created, now publishing to channels")
            # Serialize once and reuse the payload for both channels
            payload = update.model_dump_json()
            # Publish to channels
            result1 = redis_client.publish("job_status_updates", payload)
            result2 = redis_client.publish(f"job_status_updates:{job_id}", payload)
            logger.info(f"🔊 Published to channels - general: {result1} subscribers, job-specific: {result2} subscribers")
        finally:
            # Close connection even when publishing fails, so a long-lived
            # worker does not accumulate open Redis connections.
            redis_client.close()
        logger.info(f"🔊 ✅ Successfully broadcasted status update for job {job_id}: {status}")
    except Exception as e:
        logger.error(f"🔊 ❌ Failed to broadcast status update for job {job_id}: {e}")
        import traceback
        logger.error(f"🔊 ❌ Full traceback: {traceback.format_exc()}")
        # Don't let WebSocket failures break the worker task
class AsyncTask(Task):
    """Base task class that supports async execution

    Subclasses implement ``run_async``; calling the task drives that
    coroutine to completion on a fresh event loop created for the call.
    """

    def __call__(self, *args, **kwargs):
        """Run ``run_async(*args, **kwargs)`` on a new event loop.

        Returns:
            Whatever ``run_async`` returns.

        Raises:
            Any exception raised by ``run_async`` propagates to the caller.
        """
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            return loop.run_until_complete(self.run_async(*args, **kwargs))
        finally:
            try:
                # Finalize any async generators still pending on this loop
                # before it is closed.
                loop.run_until_complete(loop.shutdown_asyncgens())
            finally:
                # Do not leave a closed loop installed as the current loop:
                # later code in the same worker thread would otherwise pick
                # it up and fail with "Event loop is closed".
                asyncio.set_event_loop(None)
                loop.close()

    async def run_async(self, *args, **kwargs):
        """Coroutine holding the task's real work; must be overridden."""
        raise NotImplementedError
class IngestAndAITask(AsyncTask):
    """Async task base that runs the ingestion and AI pipeline for one job."""

    async def run_async(self, job_id: str):
        """Delegate to ``ingest_and_ai_task_impl`` and hand back its result."""
        outcome = await ingest_and_ai_task_impl(job_id)
        return outcome
@celery_app.task(bind=True, base=IngestAndAITask)
def ingest_and_ai_task(self, job_id: str):
    """
    Pipeline 1: Ingestion & AI Processing
    Task wrapper that delegates to async implementation

    The body is a deliberate no-op: the task is registered with
    ``IngestAndAITask`` as its base, whose ``AsyncTask.__call__`` runs
    ``run_async`` (and thereby ``ingest_and_ai_task_impl``) when the task is
    invoked.
    """
    # Nothing to do here; the real work happens via AsyncTask.__call__
    return None
async def _record_status_transition(db, job_id: str, status: str, extra_fields: dict = None):
    """Set the job's status (plus optional extra fields) and append a system entry to review.history."""
    fields = {"status": status}
    if extra_fields:
        fields.update(extra_fields)
    fields["updated_at"] = datetime.utcnow()
    await db.jobs.update_one(
        {"_id": job_id},
        {
            "$set": fields,
            "$push": {
                "review.history": {
                    "at": datetime.utcnow(),
                    "status": status,
                    "by": "system"
                }
            }
        }
    )


async def ingest_and_ai_task_impl(job_id: str):
    """
    Pipeline 1: Ingestion & AI Processing
    1. Update status to 'ingesting'
    2. Download the source video and probe it for its duration
    3. Run a cost-tracker preflight, then process the video with Gemini
    4. Generate and upload VTT files (captions, audio description, optional
       SDH captions) and a descriptive transcript
    5. Update status to 'translating' and dispatch the translation and
       synthesis task

    Args:
        job_id: ``_id`` of the job document in the ``jobs`` collection.

    Returns:
        None. Returns early (after logging) when the job does not exist at
        the initial lookup.

    Raises:
        ValueError: If the job disappears between the two lookups, or the AI
            result contains no ``captions_vtt``.
        Exception: Any other failure (download, preflight, Gemini, upload,
            database) is logged, recorded on the job under ``error`` and
            re-raised. The preflight presumably raises ``BudgetExceeded``
            when the budget is exhausted - confirm against cost_tracker.
    """
    logger.info(f"Starting ingestion and AI processing for job {job_id}")
    # Connect to MongoDB
    client = AsyncIOMotorClient(settings.mongodb_uri)
    db = client[settings.mongodb_db]
    try:
        # Get job document to retrieve title for notifications
        job_doc = await db.jobs.find_one({"_id": job_id})
        if not job_doc:
            logger.error(f"Job {job_id} not found in database")
            return
        job_title = job_doc.get("title", "Untitled Job")
        logger.info(f"Processing job: {job_title}")

        # Update status to ingesting
        await _record_status_transition(db, job_id, JobStatus.INGESTING.value)
        # Broadcast ingesting status update
        broadcast_status_update(
            job_id,
            JobStatus.INGESTING.value,
            job_title=job_title,
            message=f"{job_title} is being ingested"
        )

        # Get job details (re-read after the status update)
        job_doc = await db.jobs.find_one({"_id": job_id})
        if not job_doc:
            raise ValueError(f"Job {job_id} not found")

        # Download video file temporarily for processing
        source_blob_path = job_doc["source"]["gcs_uri"].replace(f"gs://{settings.gcs_bucket}/", "")
        # delete=False: the file must outlive this ``with`` so it can be
        # filled by the download and read by ffprobe/Gemini; it is removed
        # in the ``finally`` below.
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as temp_file:
            temp_path = temp_file.name
        try:
            # Download from GCS. Done inside the try so the temp file is
            # removed even when the download itself fails.
            blob = gcs_service.bucket.blob(source_blob_path)
            blob.download_to_filename(temp_path)

            # Update status to AI processing
            await _record_status_transition(db, job_id, JobStatus.AI_PROCESSING.value)
            # Broadcast AI processing status update
            broadcast_status_update(
                job_id,
                JobStatus.AI_PROCESSING.value,
                job_title=job_title,
                message=f"{job_title} is being processed by AI"
            )

            # Probe video for metadata
            duration = await _get_video_duration(temp_path)
            # Update source with duration
            await db.jobs.update_one(
                {"_id": job_id},
                {"$set": {"source.duration_s": duration}}
            )

            # Process with Gemini
            brand_context = job_doc.get("brand_context")
            sdh_requested = job_doc.get("requested_outputs", {}).get("sdh_vtt", False)
            _cost_ctx = {
                "user_id": job_doc.get("client_id", "system"),
                "job_id": job_id,
                "project_id": job_doc.get("cost_tracker_project_id"),
            }
            # Budget check before spending on the model call
            await cost_tracker.aio_preflight(
                model=gemini_service.model_name,
                user_external_id=_cost_ctx["user_id"],
                project_id=_cost_ctx["project_id"],
            )
            ai_result = await gemini_service.extract_accessibility(
                temp_path,
                brand_context=brand_context,
                sdh_requested=sdh_requested,
                _cost_ctx=_cost_ctx,
            )

            # Final safety check for required fields
            required_fields = ["captions_vtt", "audio_description_vtt"]
            missing_fields = [field for field in required_fields if field not in ai_result]
            if missing_fields:
                logger.error(f"Missing required fields after AI processing: {missing_fields}")
                # Create fallback content for missing fields
                if "audio_description_vtt" in missing_fields:
                    ai_result["audio_description_vtt"] = "WEBVTT\n\n00:00:00.000 --> 00:00:05.000\nVideo content with visual elements."
                    logger.info("Created fallback audio_description_vtt")
                # There is no fallback for captions: fail with a clear
                # message instead of a bare KeyError further down.
                if "captions_vtt" in missing_fields:
                    raise ValueError(f"AI result for job {job_id} is missing required field 'captions_vtt'")

            # Get detected language from Gemini response. ``or`` also covers
            # a present-but-null/empty value, which would otherwise produce
            # invalid storage paths and an invalid ``outputs.<lang>`` key.
            detected_language = ai_result.get("language") or "en"
            language_hint = job_doc["source"].get("language_hint")
            initial_language = job_doc["source"].get("language", "en")
            # Log if there's a mismatch between hint/initial and detected language
            if language_hint and language_hint != detected_language:
                logger.warning(
                    f"Language mismatch for job {job_id}: "
                    f"hint={language_hint}, detected={detected_language}"
                )
            elif initial_language != "auto" and initial_language != detected_language:
                logger.info(
                    f"Language detection for job {job_id}: "
                    f"initial={initial_language}, detected={detected_language}"
                )
            # Use detected language for output storage
            source_language = detected_language
            logger.info(f"Using detected language '{source_language}' for job {job_id}")

            # Upload VTT files to GCS using detected language
            captions_gcs_uri = await upload_vtt_to_gcs(
                ai_result["captions_vtt"],
                f"{job_id}/{source_language}/captions.vtt"
            )
            ad_gcs_uri = await upload_vtt_to_gcs(
                ai_result["audio_description_vtt"],
                f"{job_id}/{source_language}/ad.vtt"
            )
            # Upload SDH VTT if generated
            sdh_gcs_uri = None
            if sdh_requested and ai_result.get("sdh_captions_vtt"):
                sdh_gcs_uri = await upload_vtt_to_gcs(
                    ai_result["sdh_captions_vtt"],
                    f"{job_id}/{source_language}/sdh_captions.vtt"
                )

            # Generate descriptive transcript (WCAG 2.1 1.2.1).
            # Best-effort: a failure here is logged and the job continues
            # without a transcript.
            transcript_gcs_uri = None
            try:
                from ..services.descriptive_transcript import generate_descriptive_transcript
                transcript_text = generate_descriptive_transcript(
                    ai_result["captions_vtt"],
                    ai_result["audio_description_vtt"]
                )
                if transcript_text:
                    transcript_gcs_uri = await upload_vtt_to_gcs(
                        transcript_text,
                        f"{job_id}/{source_language}/descriptive_transcript.txt"
                    )
                    logger.info(f"Generated descriptive transcript for job {job_id}, language {source_language}")
            except Exception as e:
                logger.warning(f"Failed to generate descriptive transcript for job {job_id}: {e}")

            source_lang_output = {
                "captions_vtt_gcs": captions_gcs_uri,
                "ad_vtt_gcs": ad_gcs_uri,
            }
            if sdh_gcs_uri:
                source_lang_output["sdh_captions_vtt_gcs"] = sdh_gcs_uri
            if transcript_gcs_uri:
                source_lang_output["descriptive_transcript_gcs"] = transcript_gcs_uri

            # Update job with AI results, detected language, and outputs
            # Set status to TRANSLATING to trigger translation pipeline before QC
            await _record_status_transition(
                db,
                job_id,
                JobStatus.TRANSLATING.value,
                extra_fields={
                    "source.language": source_language,  # Update with detected language
                    "source.detected_language": detected_language,
                    "ai.ingestion_json": ai_result,
                    "ai.confidence": ai_result["confidence"],
                    f"outputs.{source_language}": source_lang_output,
                },
            )
            # Broadcast status update
            broadcast_status_update(
                job_id,
                JobStatus.TRANSLATING.value,
                job_title=job_title,
                message=f"{job_title} AI processing complete, starting translation pipeline"
            )
            logger.info(f"AI processing complete for job {job_id}, triggering translation pipeline")

            # Trigger translation and synthesis pipeline
            # This will process all translations, TTS, and accessible video BEFORE QC review
            from .translate_and_synthesize import translate_and_synthesize_task
            translate_and_synthesize_task.delay(job_id)
        finally:
            # Clean up temp file. A cleanup failure must not fail a job whose
            # follow-up task may already have been dispatched.
            try:
                os.unlink(temp_path)
            except OSError as cleanup_error:
                logger.warning(f"Could not remove temp file {temp_path}: {cleanup_error}")
    except Exception as e:
        logger.error(f"Ingestion and AI processing failed for job {job_id}: {e}")
        # Update job with error.
        # NOTE(review): only ``error`` and ``updated_at`` are written here;
        # the job's ``status`` keeps its last value - confirm that is intended.
        try:
            await db.jobs.update_one(
                {"_id": job_id},
                {
                    "$set": {
                        "error": {
                            "type": "ingestion_failure",
                            "message": str(e),
                            "timestamp": datetime.utcnow().isoformat()
                        },
                        "updated_at": datetime.utcnow()
                    }
                }
            )
        except Exception as record_error:
            # Never let a failed error-write hide the original failure
            logger.error(f"Failed to record error for job {job_id}: {record_error}")
        raise
    finally:
        client.close()
async def _get_video_duration(video_path: str) -> float:
    """Get video duration using ffprobe

    Candidate values are tried in order: the ``duration`` of each stream
    (first stream first), then the container-level ``format.duration``. The
    first one that parses as a float wins, so a missing or non-numeric
    per-stream duration no longer forces a 0.0 result when ffprobe reported
    a duration elsewhere.

    Args:
        video_path: Path of the local video file to probe.

    Returns:
        Duration as a float (ffprobe reports seconds), or 0.0 when probing
        fails or no usable duration is found. Never raises.
    """
    try:
        probe = ffmpeg.probe(video_path)
        candidates = [stream.get("duration") for stream in probe.get("streams") or []]
        candidates.append((probe.get("format") or {}).get("duration"))
        for raw_duration in candidates:
            try:
                return float(raw_duration)
            except (TypeError, ValueError):
                # Missing (None) or non-numeric (e.g. "N/A"): try the next one
                continue
        raise ValueError("no usable duration in ffprobe output")
    except Exception as e:
        logger.warning(f"Could not determine video duration: {e}")
        return 0.0