- NewBrief: use useAllProjects() (was useProjects('') which never fired)
- NewBrief: expand languages from 12 to 52 options with region variants
- NewBrief: add Assign To dropdown from org members
- Backend: add GET /clients/all-projects endpoint for cross-client project listing
- Backend: add assignee_id to JobBriefCreate/JobBriefResponse models + routes
- notify.py: send the completion email to the client's PMs (users whose pm_client_ids contains the client ID) instead of the client user — fixes the email never arriving (the old code looked up users._id using the client entity ID)
- Downloads: add Download All button that fetches all files sequentially
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
198 lines
8.3 KiB
Python
198 lines
8.3 KiB
Python
import asyncio
|
|
from datetime import datetime
|
|
|
|
from celery import Task
|
|
from motor.motor_asyncio import AsyncIOMotorClient
|
|
|
|
from ..core.config import settings
|
|
from ..core.logging import get_logger
|
|
from ..models.audit_log import AuditAction, AuditLogCreate
|
|
from ..services.emailer import email_service
|
|
from ..services.gcs import get_signed_download_url
|
|
from . import celery_app
|
|
|
|
# Module-level logger, obtained from the project's logging helper using this
# module's import name.
logger = get_logger(__name__)
|
|
|
|
|
|
class NotifyClientTask(Task):
    """Celery task base that runs the async completion-notification pipeline.

    ``__call__`` is overridden to drive :meth:`run_async` on a private event
    loop, so the function registered with this base is never executed itself.
    """

    # One entry per downloadable artefact of a language output:
    # (GCS field in the job document, key in the e-mail payload, log label).
    _OUTPUT_FILES = (
        ("captions_vtt_gcs", "captions_vtt", "captions"),
        ("ad_vtt_gcs", "audio_description_vtt", "AD VTT"),
        ("ad_mp3_gcs", "audio_description_mp3", "AD MP3"),
    )

    # Lower-cased fragments of an error message that mark a failure as a
    # configuration/data problem which a retry cannot fix.
    _NON_RETRYABLE_PATTERNS = (
        "not found",
        "401",
        "unauthorized",
        "authentication",
        "failed to send completion email",
    )

    def __call__(self, *args, **kwargs):
        """Run :meth:`run_async` to completion on a fresh event loop.

        A new loop is created for every invocation and closed afterwards,
        whether the coroutine succeeds or raises.
        """
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            return loop.run_until_complete(self.run_async(*args, **kwargs))
        finally:
            loop.close()

    async def run_async(self, job_id: str):
        """
        Pipeline 3: Client Notification
        Triggered when job status changes to 'completed'

        Looks up the internal recipients for the job's client, builds signed
        download URLs for the job outputs, e-mails every recipient and writes
        an audit-log entry.

        Args:
            job_id: ``_id`` of the job document in the ``jobs`` collection.

        Returns:
            None. The method also returns early (without raising) when the
            job is missing, is not in ``completed`` status, has nobody to
            notify, or fails with a non-retryable error.

        Raises:
            celery.exceptions.Retry: for potentially transient failures while
                retries remain; once ``max_retries`` is exhausted Celery
                re-raises the original exception instead.
        """
        logger.info(f"Starting client notification for job {job_id}")

        # Connect to MongoDB
        client = AsyncIOMotorClient(settings.mongodb_uri)
        db = client[settings.mongodb_db]

        try:
            # Get job and client details
            job_doc = await db.jobs.find_one({"_id": job_id})
            if not job_doc:
                logger.error(f"Job {job_id} not found in database")
                return  # Don't retry for missing jobs

            if job_doc["status"] != "completed":
                logger.warning(f"Job {job_id} not in completed status (current: {job_doc['status']}), skipping notification")
                return

            recipient_emails = await self._find_recipient_emails(db, job_id, job_doc["client_id"])
            if not recipient_emails:
                return

            # ``or {}`` also covers an explicit null ``outputs`` field.
            download_links = await self._build_download_links(job_doc.get("outputs") or {})

            await self._send_completion_emails(job_id, job_doc["title"], recipient_emails, download_links)

            # Log audit entry (regardless of email status)
            audit_log = AuditLogCreate(
                action=AuditAction.JOB_STATUS_CHANGE,
                description=f"Job {job_id} completed - client notification processed",
                resource_type="job",
                resource_id=job_id,
                resource_name=job_doc["title"],
                details={
                    "recipients": recipient_emails,
                    "download_count": sum(len(files) for files in download_links.values()),
                    "status": "completed",
                },
            )
            await db.audit_logs.insert_one(audit_log.model_dump())

            logger.info(f"Successfully completed notification processing for job {job_id}")

        except Exception as e:
            error_msg = str(e)
            logger.error(f"Client notification failed for job {job_id}: {error_msg}")

            await self._record_failure(db, job_id, error_msg)

            # Only retry for transient errors, not configuration or data errors
            lowered = error_msg.lower()
            if any(pattern in lowered for pattern in self._NON_RETRYABLE_PATTERNS):
                logger.info(f"Skipping retry for job {job_id} due to non-retryable error: {error_msg}")
                return

            # This might be a transient error, let it retry
            logger.info(f"Allowing retry for job {job_id} due to potentially transient error: {error_msg}")
            # Celery reschedules a task only when retry() is called; merely
            # re-raising would mark the task failed and leave max_retries /
            # default_retry_delay unused.
            # NOTE: a retry re-runs the whole pipeline, so recipients that
            # were already e-mailed before the failure are e-mailed again.
            raise self.retry(exc=e)

        finally:
            client.close()

    async def _find_recipient_emails(self, db, job_id, client_id):
        """Return the e-mail addresses to notify for a job, or ``[]`` if none."""
        # Look up PMs assigned to this client — they are the internal recipients
        logger.info(f"Looking up PMs for client {client_id} for job {job_id}")
        projection = {"_id": 1, "email": 1, "full_name": 1}

        pm_docs = await db.users.find({"pm_client_ids": client_id}, projection).to_list(None)

        if not pm_docs:
            # Fallback: any PRODUCTION/ADMIN user
            pm_docs = await db.users.find(
                {"role": {"$in": ["admin", "production"]}},
                projection,
            ).limit(5).to_list(None)

        if not pm_docs:
            logger.error(f"No PM or admin found to notify for job {job_id} (client {client_id})")
            return []

        recipient_emails = [u["email"] for u in pm_docs if u.get("email")]
        if not recipient_emails:
            # Users matched but none has an address: nothing can be sent, so
            # report it instead of recording a notification that never happened.
            logger.error(f"No recipient with an email address found for job {job_id} (client {client_id})")
            return []

        logger.info(f"Sending completion notification to {len(recipient_emails)} PMs: {recipient_emails}")
        return recipient_emails

    async def _build_download_links(self, outputs):
        """Generate signed URLs for all outputs, grouped by language.

        Languages whose entry is not a dict, and files whose URL cannot be
        generated, are skipped; languages left without any link are omitted.
        """
        bucket_prefix = f"gs://{settings.gcs_bucket}/"
        download_links = {}

        for language, lang_output in outputs.items():
            if not isinstance(lang_output, dict):
                continue

            lang_downloads = {}
            for gcs_field, link_key, label in self._OUTPUT_FILES:
                gcs_uri = lang_output.get(gcs_field)
                if not gcs_uri:
                    # Field absent, null or empty: no file to link.
                    continue

                blob_path = gcs_uri.replace(bucket_prefix, "")
                try:
                    lang_downloads[link_key] = await get_signed_download_url(
                        blob_path, settings.upload_signed_url_ttl_hours
                    )
                except Exception as e:
                    # Best effort: one missing link must not block the e-mail.
                    logger.warning(f"Failed to generate signed URL for {label} {language}: {e}")

            if lang_downloads:
                download_links[language] = lang_downloads

        return download_links

    async def _send_completion_emails(self, job_id, job_title, recipient_emails, download_links):
        """Send the completion e-mail to each recipient in turn.

        Authentication-style errors are logged and skipped; any other
        exception is re-raised as ``ValueError`` and stops the loop.
        """
        for recipient_email in recipient_emails:
            try:
                success = await email_service.send_completion_email(
                    recipient_email=recipient_email,
                    job_title=job_title,
                    download_links=download_links,
                )
                if success:
                    logger.info(f"Sent completion email to {recipient_email} for job {job_id}")
                else:
                    logger.warning(f"Email service returned failure for {recipient_email}, job {job_id}")
            except Exception as email_error:
                error_msg = str(email_error)
                logger.error(f"Email error for {recipient_email}, job {job_id}: {error_msg}")
                if "401" in error_msg or "Unauthorized" in error_msg or "authentication" in error_msg.lower():
                    logger.warning("Email auth failed — non-retryable config error")
                else:
                    raise ValueError(f"Email sending failed: {error_msg}") from email_error

    async def _record_failure(self, db, job_id, error_msg):
        """Store the notification error on the job document (best effort)."""
        # Single clock reading so both stored timestamps agree.
        now = datetime.utcnow()
        try:
            await db.jobs.update_one(
                {"_id": job_id},
                {
                    "$set": {
                        "error": {
                            "type": "notification_failure",
                            "message": error_msg,
                            "timestamp": now.isoformat(),
                        },
                        "updated_at": now,
                    }
                },
            )
        except Exception as update_error:
            logger.error(f"Failed to update job {job_id} with error: {update_error}")
|
|
|
|
|
|
# Register the task with manual retry control
@celery_app.task(
    bind=True,
    base=NotifyClientTask,
    max_retries=3,
    default_retry_delay=60,
)
def notify_client_task(self, job_id: str):
    """Celery entry point for the client notification of one job.

    The body is intentionally empty: the ``NotifyClientTask`` base overrides
    ``__call__`` and delegates the work to its ``run_async`` coroutine.
    """
    return None
|