video-accessibility/backend/app/tasks/notify.py
Vadym Samoilenko c7a6f13b10 feat(workflow): PR-2 workflow blockers — PM/Production dashboards, two-stage QC, role routing
Changes:
- Dashboard: add project_manager case (final review / QC counts / new job widgets)
  and production case (AI pipeline / failures widgets)
- Sidebar: add project_manager to Final Review and Audit Log nav items;
  live badge counts for QC Queue (pending_qc) and Final Review (pending_final_review)
- App.tsx: add project_manager to Final Review and Audit Log RoleGates (W-10, PM-18)
- Login: role-based redirect after login — linguist/reviewer → /qc/queue, others → /
- language_qc._assert_can_approve: enforce two-stage QC; remove linguist self-approve
  fallback; require reviewer assignment + submitted_for_review_at (W-6)
- routes_jobs.complete_job: allow project_manager to complete jobs (W-9)
- notify.py: re-enable email notifications (W-7)
- Fix 400 on cue save: treat empty-string audio_description_vtt/captions_vtt as absent
  both in backend (truthy check) and frontend (|| undefined) — root cause was adVtt
  initialising to '' when job has no AD track

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-29 18:18:24 +01:00

206 lines
8.7 KiB
Python

import asyncio
from datetime import datetime
from bson import ObjectId
from celery import Task
from celery.exceptions import Retry
from motor.motor_asyncio import AsyncIOMotorClient
from ..core.config import settings
from ..core.logging import get_logger
from ..models.audit_log import AuditLogCreate, AuditAction
from ..services.emailer import email_service
from ..services.gcs import get_signed_download_url
from . import celery_app
logger = get_logger(__name__)
class NotifyClientTask(Task):
"""Async task for client notifications"""
def __call__(self, *args, **kwargs):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(self.run_async(*args, **kwargs))
finally:
loop.close()
async def run_async(self, job_id: str):
"""
Pipeline 3: Client Notification
Triggered when job status changes to 'completed'
"""
logger.info(f"Starting client notification for job {job_id}")
# Connect to MongoDB
client = AsyncIOMotorClient(settings.mongodb_uri)
db = client[settings.mongodb_db]
try:
# Get job and client details
job_doc = await db.jobs.find_one({"_id": job_id})
if not job_doc:
logger.error(f"Job {job_id} not found in database")
return # Don't retry for missing jobs
if job_doc["status"] != "completed":
logger.warning(f"Job {job_id} not in completed status (current: {job_doc['status']}), skipping notification")
return
# Get client ID and ensure proper ObjectId format
client_id = job_doc["client_id"]
logger.info(f"Looking up client {client_id} for job {job_id}")
# Try looking up client by string ID first
client_doc = await db.users.find_one({"_id": client_id})
if not client_doc:
# Try as ObjectId if string lookup failed
try:
client_doc = await db.users.find_one({"_id": ObjectId(client_id)})
except:
pass # Invalid ObjectId format
if not client_doc:
logger.error(f"Client {client_id} not found in database for job {job_id}")
# Don't retry for missing users - this is likely a data issue
return
# Generate signed URLs for all outputs
download_links = {}
outputs = job_doc.get("outputs", {})
for language, lang_output in outputs.items():
if not isinstance(lang_output, dict):
continue
lang_downloads = {}
# Captions VTT
if "captions_vtt_gcs" in lang_output:
blob_path = lang_output["captions_vtt_gcs"].replace(f"gs://{settings.gcs_bucket}/", "")
try:
signed_url = await get_signed_download_url(blob_path, 24)
lang_downloads["captions_vtt"] = signed_url
except Exception as e:
logger.warning(f"Failed to generate signed URL for captions {language}: {e}")
# Audio Description VTT
if "ad_vtt_gcs" in lang_output:
blob_path = lang_output["ad_vtt_gcs"].replace(f"gs://{settings.gcs_bucket}/", "")
try:
signed_url = await get_signed_download_url(blob_path, 24)
lang_downloads["audio_description_vtt"] = signed_url
except Exception as e:
logger.warning(f"Failed to generate signed URL for AD VTT {language}: {e}")
# Audio Description MP3
if "ad_mp3_gcs" in lang_output:
blob_path = lang_output["ad_mp3_gcs"].replace(f"gs://{settings.gcs_bucket}/", "")
try:
signed_url = await get_signed_download_url(blob_path, 24)
lang_downloads["audio_description_mp3"] = signed_url
except Exception as e:
logger.warning(f"Failed to generate signed URL for AD MP3 {language}: {e}")
if lang_downloads:
download_links[language] = lang_downloads
email_enabled = True
if email_enabled:
try:
success = await email_service.send_completion_email(
recipient_email=client_doc["email"],
job_title=job_doc["title"],
download_links=download_links
)
if success:
logger.info(f"Successfully sent completion email to {client_doc['email']} for job {job_id}")
else:
logger.warning(f"Email service returned failure for job {job_id} - treating as non-retryable")
except Exception as email_error:
error_msg = str(email_error)
logger.error(f"Email sending exception for job {job_id}: {error_msg}")
# Check if this is an authentication error (non-retryable)
if "401" in error_msg or "Unauthorized" in error_msg or "authentication" in error_msg.lower():
logger.warning(f"Email authentication failed for job {job_id} - treating as non-retryable configuration error")
else:
# Other email errors might be transient
raise ValueError(f"Email sending failed: {error_msg}")
else:
logger.info(f"Email notifications are currently disabled - skipping email for job {job_id}")
logger.info(f"Would have sent completion email to {client_doc['email']} with {sum(len(files) for files in download_links.values())} download links")
# Log audit entry (regardless of email status)
audit_log = AuditLogCreate(
action=AuditAction.JOB_STATUS_CHANGE,
description=f"Job {job_id} completed - client notification processed",
resource_type="job",
resource_id=job_id,
resource_name=job_doc["title"],
details={
"email": client_doc["email"],
"download_count": sum(len(files) for files in download_links.values()),
"email_sent": email_enabled,
"status": "completed"
}
)
await db.audit_logs.insert_one(audit_log.model_dump())
logger.info(f"Successfully completed notification processing for job {job_id}")
except Exception as e:
error_msg = str(e)
logger.error(f"Client notification failed for job {job_id}: {error_msg}")
# Update job with error
try:
await db.jobs.update_one(
{"_id": job_id},
{
"$set": {
"error": {
"type": "notification_failure",
"message": error_msg,
"timestamp": datetime.utcnow().isoformat()
},
"updated_at": datetime.utcnow()
}
}
)
except Exception as update_error:
logger.error(f"Failed to update job {job_id} with error: {update_error}")
# Only retry for transient errors, not configuration or data errors
non_retryable_patterns = [
"not found",
"401",
"unauthorized",
"authentication",
"failed to send completion email"
]
should_not_retry = any(pattern in error_msg.lower() for pattern in non_retryable_patterns)
if should_not_retry:
logger.info(f"Skipping retry for job {job_id} due to non-retryable error: {error_msg}")
return
else:
# This might be a transient error, let it retry
logger.info(f"Allowing retry for job {job_id} due to potentially transient error: {error_msg}")
raise
finally:
client.close()
# Register the task with manual retry control
@celery_app.task(bind=True, base=NotifyClientTask, max_retries=3, default_retry_delay=60)
def notify_client_task(self, job_id: str):
"""Celery task wrapper for client notification"""
# This method is called by NotifyClientTask.__call__
pass