import asyncio
from datetime import datetime

from bson import ObjectId
from bson.errors import InvalidId
from celery import Task
from celery.exceptions import Retry
from motor.motor_asyncio import AsyncIOMotorClient

from ..core.config import settings
from ..core.logging import get_logger
from ..models.audit_log import AuditLogCreate, AuditAction
from ..services.emailer import email_service
from ..services.gcs import get_signed_download_url
from . import celery_app

logger = get_logger(__name__)

# Second argument passed to get_signed_download_url.
# NOTE(review): presumably an expiry in hours - confirm against services.gcs.
_SIGNED_URL_EXPIRY = 24

# (key in the per-language output dict, key in the download links, log label)
_OUTPUT_FILES = (
    ("captions_vtt_gcs", "captions_vtt", "captions"),
    ("ad_vtt_gcs", "audio_description_vtt", "AD VTT"),
    ("ad_mp3_gcs", "audio_description_mp3", "AD MP3"),
)

# Lower-case substrings of an error message that mark it as a configuration
# or data problem, for which retrying the task cannot help.
_NON_RETRYABLE_PATTERNS = (
    "not found",
    "401",
    "unauthorized",
    "authentication",
    "failed to send completion email",
)


async def _find_client(db, client_id):
    """Return the user document for *client_id*, or None if there is none.

    The id is first matched as stored on the job; if that finds nothing it
    is retried as an ObjectId.
    """
    client_doc = await db.users.find_one({"_id": client_id})
    if client_doc:
        return client_doc

    # Only the conversion is guarded: a database error during the second
    # lookup must propagate so the caller can decide whether to retry.
    try:
        object_id = ObjectId(client_id)
    except (InvalidId, TypeError):
        return None  # Not a valid ObjectId, so there is nothing more to try
    return await db.users.find_one({"_id": object_id})


async def _build_download_links(outputs):
    """Return ``{language: {link_key: signed_url}}`` for the job outputs.

    Files whose signed URL cannot be generated are logged and left out;
    languages that end up with no links are omitted entirely.
    """
    bucket_prefix = f"gs://{settings.gcs_bucket}/"
    download_links = {}

    for language, lang_output in outputs.items():
        if not isinstance(lang_output, dict):
            continue

        lang_downloads = {}
        for source_key, link_key, label in _OUTPUT_FILES:
            gcs_uri = lang_output.get(source_key)
            if not gcs_uri:
                # Missing or empty entry: nothing to link to
                continue
            blob_path = gcs_uri.replace(bucket_prefix, "")
            try:
                lang_downloads[link_key] = await get_signed_download_url(
                    blob_path, _SIGNED_URL_EXPIRY
                )
            except Exception as e:
                # Best effort: one failed link must not block the others
                logger.warning(f"Failed to generate signed URL for {label} {language}: {e}")

        if lang_downloads:
            download_links[language] = lang_downloads

    return download_links


class NotifyClientTask(Task):
    """Async task for client notifications"""

    def __call__(self, *args, **kwargs):
        """Run ``run_async`` to completion on a fresh event loop.

        ``asyncio.run`` creates the loop, closes it afterwards and clears the
        thread's current loop, so no closed loop is left installed.
        """
        return asyncio.run(self.run_async(*args, **kwargs))

    async def run_async(self, job_id: str):
        """
        Pipeline 3: Client Notification
        Triggered when job status changes to 'completed'

        Looks up the job and its client, builds signed download links for
        the job outputs, sends the completion email (currently disabled) and
        writes an audit log entry.

        Args:
            job_id: ``_id`` of the job document in the ``jobs`` collection.

        Returns:
            None. Missing jobs, jobs that are not completed, missing clients
            and non-retryable failures all return without raising.

        Raises:
            celery.exceptions.Retry: for a potentially transient failure
                while retries remain; once they are exhausted the original
                exception is raised instead.
        """
        logger.info(f"Starting client notification for job {job_id}")

        # Connect to MongoDB
        client = AsyncIOMotorClient(settings.mongodb_uri)
        db = client[settings.mongodb_db]

        try:
            # Get job and client details
            job_doc = await db.jobs.find_one({"_id": job_id})
            if not job_doc:
                logger.error(f"Job {job_id} not found in database")
                return  # Don't retry for missing jobs

            if job_doc["status"] != "completed":
                logger.warning(f"Job {job_id} not in completed status (current: {job_doc['status']}), skipping notification")
                return

            client_id = job_doc["client_id"]
            logger.info(f"Looking up client {client_id} for job {job_id}")

            client_doc = await _find_client(db, client_id)
            if not client_doc:
                logger.error(f"Client {client_id} not found in database for job {job_id}")
                # Don't retry for missing users - this is likely a data issue
                return

            # Generate signed URLs for all outputs; a null "outputs" field is
            # treated the same as an absent one.
            download_links = await _build_download_links(job_doc.get("outputs") or {})
            download_count = sum(len(files) for files in download_links.values())

            # Send completion email (temporarily disabled)
            # TODO: Re-enable emails once authentication is configured
            email_enabled = False  # Set to True to re-enable emails

            if email_enabled:
                try:
                    success = await email_service.send_completion_email(
                        recipient_email=client_doc["email"],
                        job_title=job_doc["title"],
                        download_links=download_links
                    )
                    if success:
                        logger.info(f"Successfully sent completion email to {client_doc['email']} for job {job_id}")
                    else:
                        logger.warning(f"Email service returned failure for job {job_id} - treating as non-retryable")
                except Exception as email_error:
                    error_msg = str(email_error)
                    logger.error(f"Email sending exception for job {job_id}: {error_msg}")

                    # Check if this is an authentication error (non-retryable)
                    if "401" in error_msg or "Unauthorized" in error_msg or "authentication" in error_msg.lower():
                        logger.warning(f"Email authentication failed for job {job_id} - treating as non-retryable configuration error")
                    else:
                        # Other email errors might be transient
                        raise ValueError(f"Email sending failed: {error_msg}") from email_error
            else:
                logger.info(f"Email notifications are currently disabled - skipping email for job {job_id}")
                logger.info(f"Would have sent completion email to {client_doc['email']} with {download_count} download links")

            # Log audit entry (regardless of email status)
            audit_log = AuditLogCreate(
                action=AuditAction.JOB_STATUS_CHANGE,
                description=f"Job {job_id} completed - client notification processed",
                resource_type="job",
                resource_id=job_id,
                resource_name=job_doc["title"],
                details={
                    "email": client_doc["email"],
                    "download_count": download_count,
                    "email_sent": email_enabled,
                    "status": "completed"
                }
            )
            await db.audit_logs.insert_one(audit_log.model_dump())

            logger.info(f"Successfully completed notification processing for job {job_id}")

        except Exception as e:
            error_msg = str(e)
            logger.error(f"Client notification failed for job {job_id}: {error_msg}")

            # Update job with error; one clock reading keeps both fields consistent
            failed_at = datetime.utcnow()
            try:
                await db.jobs.update_one(
                    {"_id": job_id},
                    {
                        "$set": {
                            "error": {
                                "type": "notification_failure",
                                "message": error_msg,
                                "timestamp": failed_at.isoformat()
                            },
                            "updated_at": failed_at
                        }
                    }
                )
            except Exception as update_error:
                logger.error(f"Failed to update job {job_id} with error: {update_error}")

            # Only retry for transient errors, not configuration or data errors
            lowered_msg = error_msg.lower()
            if any(pattern in lowered_msg for pattern in _NON_RETRYABLE_PATTERNS):
                logger.info(f"Skipping retry for job {job_id} due to non-retryable error: {error_msg}")
                return

            # This might be a transient error, let it retry
            logger.info(f"Allowing retry for job {job_id} due to potentially transient error: {error_msg}")
            # Re-raising alone only marks the task as failed. Celery schedules
            # another attempt only through retry(), which honours the task's
            # max_retries / default_retry_delay and re-raises the original
            # exception once the retries are used up.
            raise self.retry(exc=e)

        finally:
            client.close()


# Register the task with manual retry control
@celery_app.task(bind=True, base=NotifyClientTask, max_retries=3, default_retry_delay=60)
def notify_client_task(self, job_id: str):
    """Celery task wrapper for client notification"""
    # Registration stub: NotifyClientTask.__call__ dispatches to run_async,
    # so this body is never the code that does the work.
    pass