video-accessibility/backend/app/tasks/notify.py
Vadym Samoilenko 5d8d992e5a feat(briefs+notify+downloads): fix projects dropdown, add assignee, expand languages, fix PM email, add Download All
- NewBrief: use useAllProjects() (was useProjects('') which never fired)
- NewBrief: expand languages from 12 to 52 options with region variants
- NewBrief: add Assign To dropdown from org members
- Backend: add GET /clients/all-projects endpoint for cross-client project listing
- Backend: add assignee_id to JobBriefCreate/JobBriefResponse models + routes
- notify.py: send completion email to PMs (pm_client_ids) not client user — fixes email never arriving (was looking up users._id by client entity ID)
- Downloads: add Download All button that fetches all files sequentially

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-30 21:47:28 +01:00

198 lines
8.3 KiB
Python

import asyncio
from datetime import datetime
from celery import Task
from motor.motor_asyncio import AsyncIOMotorClient
from ..core.config import settings
from ..core.logging import get_logger
from ..models.audit_log import AuditAction, AuditLogCreate
from ..services.emailer import email_service
from ..services.gcs import get_signed_download_url
from . import celery_app
logger = get_logger(__name__)
class NotifyClientTask(Task):
    """Celery task base class that runs the async job-completion notification.

    ``__call__`` is the synchronous entry point; it drives ``run_async`` on a
    dedicated event loop so the MongoDB, signed-URL and email calls can be
    awaited.
    """

    # (key in a language's output dict, key used in the download-links dict,
    #  label used in the warning log when a signed URL cannot be generated)
    _OUTPUT_ARTIFACTS = (
        ("captions_vtt_gcs", "captions_vtt", "captions"),
        ("ad_vtt_gcs", "audio_description_vtt", "AD VTT"),
        ("ad_mp3_gcs", "audio_description_mp3", "AD MP3"),
    )

    def __call__(self, *args, **kwargs):
        """Run ``run_async`` to completion on a fresh event loop.

        A new loop is created for every invocation and is always closed
        afterwards, whether ``run_async`` returns or raises.
        """
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            return loop.run_until_complete(self.run_async(*args, **kwargs))
        finally:
            loop.close()

    async def run_async(self, job_id: str):
        """Pipeline 3: notify internal recipients that a job has completed.

        Triggered when the job status changes to 'completed'. Loads the job,
        resolves the recipients, builds signed download links for every
        language's outputs, emails each recipient and writes an audit log
        entry.

        Args:
            job_id: ``_id`` of the job document in the ``jobs`` collection.

        Returns:
            None. A missing job, a job that is not 'completed', or a job with
            no recipient email address is logged and skipped.

        Raises:
            Exception: any failure whose message does not match one of the
                non-retryable patterns is re-raised after the error has been
                recorded on the job document. Email failures surface as
                ``ValueError("Email sending failed: ...")``.
        """
        logger.info(f"Starting client notification for job {job_id}")

        # Connect to MongoDB
        client = AsyncIOMotorClient(settings.mongodb_uri)
        db = client[settings.mongodb_db]

        try:
            job_doc = await db.jobs.find_one({"_id": job_id})
            if not job_doc:
                logger.error(f"Job {job_id} not found in database")
                return  # Don't retry for missing jobs

            if job_doc["status"] != "completed":
                logger.warning(f"Job {job_id} not in completed status (current: {job_doc['status']}), skipping notification")
                return

            recipient_emails = await self._resolve_recipient_emails(
                db, job_id, job_doc["client_id"]
            )
            if not recipient_emails:
                # Nobody to notify; the reason has already been logged.
                return

            logger.info(f"Sending completion notification to {len(recipient_emails)} PMs: {recipient_emails}")

            # ``outputs`` may be absent or stored as null; treat both as empty.
            download_links = await self._build_download_links(
                job_doc.get("outputs") or {}
            )

            await self._send_completion_emails(
                job_id, job_doc["title"], recipient_emails, download_links
            )

            # Only reached when no transient email failure was raised above.
            audit_log = AuditLogCreate(
                action=AuditAction.JOB_STATUS_CHANGE,
                description=f"Job {job_id} completed - client notification processed",
                resource_type="job",
                resource_id=job_id,
                resource_name=job_doc["title"],
                details={
                    "recipients": recipient_emails,
                    "download_count": sum(len(files) for files in download_links.values()),
                    "status": "completed"
                }
            )
            await db.audit_logs.insert_one(audit_log.model_dump())
            logger.info(f"Successfully completed notification processing for job {job_id}")

        except Exception as e:
            error_msg = str(e)
            logger.error(f"Client notification failed for job {job_id}: {error_msg}")

            # Record the failure on the job. Best effort: a failure here must
            # not mask the original error.
            try:
                failed_at = datetime.utcnow()
                await db.jobs.update_one(
                    {"_id": job_id},
                    {
                        "$set": {
                            "error": {
                                "type": "notification_failure",
                                "message": error_msg,
                                "timestamp": failed_at.isoformat()
                            },
                            "updated_at": failed_at
                        }
                    }
                )
            except Exception as update_error:
                logger.error(f"Failed to update job {job_id} with error: {update_error}")

            # Only retry for transient errors, not configuration or data errors
            non_retryable_patterns = [
                "not found",
                "401",
                "unauthorized",
                "authentication",
                "failed to send completion email"
            ]
            lowered_msg = error_msg.lower()
            should_not_retry = any(pattern in lowered_msg for pattern in non_retryable_patterns)

            if should_not_retry:
                logger.info(f"Skipping retry for job {job_id} due to non-retryable error: {error_msg}")
                return

            # This might be a transient error, let it propagate.
            # NOTE(review): a bare re-raise marks the task as failed. Celery
            # retries only via self.retry() or an autoretry_for option, and
            # neither is visible here - confirm retries actually happen.
            # Enabling them would resend emails that already went out.
            logger.info(f"Allowing retry for job {job_id} due to potentially transient error: {error_msg}")
            raise

        finally:
            client.close()

    async def _resolve_recipient_emails(self, db, job_id, client_id):
        """Return the email addresses to notify for a job, or ``[]`` if none.

        Users whose ``pm_client_ids`` contain the client are preferred; when
        there are none, up to five admin/production users are used instead.
        """
        logger.info(f"Looking up PMs for client {client_id} for job {job_id}")
        projection = {"_id": 1, "email": 1, "full_name": 1}

        pm_docs = await db.users.find(
            {"pm_client_ids": client_id},
            projection,
        ).to_list(None)

        if not pm_docs:
            # Fallback: any PRODUCTION/ADMIN user
            pm_docs = await db.users.find(
                {"role": {"$in": ["admin", "production"]}},
                projection,
            ).limit(5).to_list(None)

        if not pm_docs:
            logger.error(f"No PM or admin found to notify for job {job_id} (client {client_id})")
            return []

        recipient_emails = [u["email"] for u in pm_docs if u.get("email")]
        if not recipient_emails:
            # Users matched but none has an address: report it rather than
            # "notifying" zero people and logging success.
            logger.error(f"No email address on file for the users matched for job {job_id} (client {client_id})")
        return recipient_emails

    async def _build_download_links(self, outputs):
        """Return ``{language: {link_key: signed_url}}`` for the job outputs.

        Artifacts that are missing, empty, or fail to sign are left out, and
        languages with no usable artifact are omitted entirely.
        """
        bucket_prefix = f"gs://{settings.gcs_bucket}/"
        download_links = {}

        for language, lang_output in outputs.items():
            if not isinstance(lang_output, dict):
                continue

            lang_downloads = {}
            for gcs_key, link_key, label in self._OUTPUT_ARTIFACTS:
                gcs_uri = lang_output.get(gcs_key)
                if not gcs_uri:
                    continue
                try:
                    # Strip only a leading bucket prefix; the rest of the
                    # object path is passed through untouched.
                    if gcs_uri.startswith(bucket_prefix):
                        blob_path = gcs_uri[len(bucket_prefix):]
                    else:
                        blob_path = gcs_uri
                    lang_downloads[link_key] = await get_signed_download_url(
                        blob_path, settings.upload_signed_url_ttl_hours
                    )
                except Exception as e:
                    logger.warning(f"Failed to generate signed URL for {label} {language}: {e}")

            if lang_downloads:
                download_links[language] = lang_downloads

        return download_links

    async def _send_completion_emails(self, job_id, job_title, recipient_emails, download_links):
        """Email every recipient, then raise if any send failed transiently.

        Authentication failures are logged and treated as non-retryable
        configuration errors. Any other exception is remembered and raised as
        ``ValueError`` once all recipients have been attempted, so one failing
        address does not prevent the others from being notified.
        """
        first_transient_error = None

        for recipient_email in recipient_emails:
            try:
                success = await email_service.send_completion_email(
                    recipient_email=recipient_email,
                    job_title=job_title,
                    download_links=download_links
                )
            except Exception as email_error:
                error_msg = str(email_error)
                logger.error(f"Email error for {recipient_email}, job {job_id}: {error_msg}")
                if "401" in error_msg or "Unauthorized" in error_msg or "authentication" in error_msg.lower():
                    logger.warning("Email auth failed — non-retryable config error")
                elif first_transient_error is None:
                    first_transient_error = email_error
                continue

            if success:
                logger.info(f"Sent completion email to {recipient_email} for job {job_id}")
            else:
                logger.warning(f"Email service returned failure for {recipient_email}, job {job_id}")

        if first_transient_error is not None:
            raise ValueError(f"Email sending failed: {first_transient_error}") from first_transient_error
# Register the task with Celery; retries are under manual control
# (at most three, 60 seconds apart by default).
@celery_app.task(
    bind=True,
    base=NotifyClientTask,
    max_retries=3,
    default_retry_delay=60,
)
def notify_client_task(self, job_id: str):
    """Celery entry point for the client-notification pipeline.

    The body is intentionally a no-op: ``NotifyClientTask.__call__`` forwards
    the call arguments to ``run_async``, which performs the actual work.
    """
    return None