# video-accessibility/backend/app/tasks/__init__.py
#
# 126 lines
# 4.4 KiB
# Python

from celery import Celery
from celery.signals import task_failure, task_success, task_retry
from ..core.config import settings
from ..core.logging import get_logger
# Module-level logger used by the test task and every signal handler below.
logger = get_logger(__name__)

# The same settings.redis_url is used for both the broker and the result backend.
celery_app = Celery(
    "accessible-video-tasks",
    broker=settings.redis_url,
    backend=settings.redis_url,
)

# All worker/task settings are applied in a single update call.
celery_app.conf.update(
    {
        # --- Serialization: JSON in, JSON out ---
        "task_serializer": "json",
        "accept_content": ["json"],
        "result_serializer": "json",
        # --- Time handling ---
        "timezone": "UTC",
        "enable_utc": True,
        # --- Execution limits and worker behaviour ---
        "task_track_started": True,
        "task_time_limit": 30 * 60,  # 30 minutes default
        "task_soft_time_limit": 25 * 60,  # 25 minutes default
        "worker_prefetch_multiplier": 1,
        "task_acks_late": True,
        "worker_max_tasks_per_child": 1000,
        # --- Routing: each task module is mapped to a queue by name pattern ---
        "task_routes": {
            "app.tasks.ingest_and_ai.*": {"queue": "ingest"},
            "app.tasks.translate_and_synthesize.*": {"queue": "default"},
            "app.tasks.notify.*": {"queue": "notify"},
        },
        "task_default_queue": "default",
        "task_create_missing_queues": True,
        # Task-specific timeout overrides (none configured at present)
        "task_annotations": {},
    }
)
# Minimal task used to check that a worker is reachable and executing jobs.
@celery_app.task
def test_task(message="test"):
    """Log and print ``message``, then return a confirmation string.

    Args:
        message: Arbitrary text echoed in the log line, on stdout and in
            the return value.

    Returns:
        The string ``"Test task completed: <message>"``.
    """
    executed_line = f"🧪 TEST TASK EXECUTED: {message}"
    # Emit through both the logger and stdout so the execution is visible
    # regardless of how logging is configured.
    logger.info(executed_line)
    print(executed_line)
    return f"Test task completed: {message}"
# Add task received handler for debugging
from celery.signals import task_received, task_prerun, worker_ready
import threading
import time
@worker_ready.connect
def worker_ready_handler(sender=None, **kwargs):
    """Announce on the logger and stdout that the worker is ready.

    Args:
        sender: Object that emitted the ``worker_ready`` signal; only its
            string form is used.
        **kwargs: Remaining signal arguments (ignored).
    """
    ready_line = f"🟢 WORKER READY: {sender}"
    logger.info(ready_line)
    print(f"{ready_line} - Worker is online and listening!")

    # No change-stream monitoring is started here any more: workflows are
    # triggered directly by the API endpoints.
    logger.info("Workflow triggering handled directly by API endpoints - no change stream monitoring needed")
@task_received.connect
def task_received_handler(
    sender=None,
    task_id=None,
    task=None,
    args=None,
    kwargs=None,
    retries=None,
    eta=None,
    **kwds,
):
    """Log when a task is received by the worker.

    The ``task_received`` signal delivers the incoming task as a single
    ``request`` keyword argument rather than as ``task``/``task_id``/``args``,
    so those values are read from the request whenever they were not passed
    explicitly. Without this the log line would always show ``None``.

    Args:
        sender: Object that emitted the signal (unused).
        task_id: Explicit task id, if the caller supplied one.
        task: Explicit task name/object, if the caller supplied one.
        args: Explicit positional task arguments, if supplied.
        kwargs: Explicit keyword task arguments (unused).
        retries: Retry count (unused).
        eta: Scheduled execution time (unused).
        **kwds: Remaining signal arguments; ``request`` is read from here.
    """
    request = kwds.get("request")
    if request is not None:
        # Explicitly passed values win; the request only fills the gaps.
        if task is None:
            task = getattr(request, "name", None) or getattr(request, "task_name", None)
        if task_id is None:
            task_id = getattr(request, "id", None)
        if args is None:
            args = getattr(request, "args", None)

    logger.info(f"🎯 TASK RECEIVED: {task} [{task_id}] with args: {args}")
    print(f"🎯 TASK RECEIVED: {task} [{task_id}] - Worker is picking up the task!")
@task_prerun.connect
def task_prerun_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, **kwds):
    """Report, via the logger and stdout, that a task is about to run.

    Args:
        sender: Object that emitted the signal (unused).
        task_id: Id of the task about to execute.
        task: The task about to execute; only its string form is used.
        args: Positional task arguments (unused).
        kwargs: Keyword task arguments (unused).
        **kwds: Remaining signal arguments (ignored).
    """
    starting_line = f"🚀 TASK STARTING: {task} [{task_id}]"
    logger.info(starting_line)
    print(f"{starting_line} - About to execute!")
# Celery signal handlers for centralized logging
@task_failure.connect
def task_failure_handler(sender=None, task_id=None, exception=None, traceback=None, einfo=None, **kwargs):
    """Log task failures to centralized logging and to stdout.

    The ``traceback`` argument is usually a raw traceback object, whose string
    form is only ``<traceback object at 0x...>``. It is therefore rendered
    into readable stack text before being logged. A traceback that already
    arrives as a string is used unchanged.

    Args:
        sender: The task that failed; only its string form is used.
        task_id: Id of the failed task.
        exception: The exception instance that was raised, if any.
        traceback: Traceback object (or pre-formatted string) for the failure.
        einfo: Additional exception info object; logged via its string form.
        **kwargs: Remaining signal arguments (ignored).
    """
    # Imported under an alias because the ``traceback`` parameter shadows the
    # standard-library module name inside this function.
    import traceback as traceback_module

    # ``is not None`` rather than truthiness: an exception class may define
    # ``__bool__``/``__len__`` and still be a real failure.
    exception_type = type(exception).__name__ if exception is not None else "Unknown"
    exception_msg = str(exception) if exception is not None else "No details"

    if traceback is None or isinstance(traceback, str):
        traceback_text = str(traceback)
    else:
        try:
            traceback_text = "".join(traceback_module.format_tb(traceback))
        except Exception:
            # A logging hook must never raise; fall back to the plain repr.
            traceback_text = str(traceback)

    # Log comprehensive error details
    error_details = (
        "\n=== CELERY TASK FAILURE ===\n"
        f"Task: {sender}\n"
        f"Task ID: {task_id}\n"
        f"Exception Type: {exception_type}\n"
        f"Exception Message: {exception_msg}\n"
        "Full Traceback:\n"
        f"{traceback_text}\n"
        f"Additional Info: {einfo}\n"
        "=============================\n"
    )
    logger.error(error_details)

    # Also log to stdout for immediate visibility
    print(f"🚨 TASK FAILURE: {sender} [{task_id}] - {exception_type}: {exception_msg}")
    if traceback:
        print(f"Full traceback:\n{traceback_text}")
@task_success.connect
def task_success_handler(sender=None, result=None, **kwargs):
    """Log task success with a truncated view of the result.

    Only a ``None`` result is reported as "No result". Legitimate falsy
    return values such as ``0``, ``False``, ``""`` or ``[]`` are logged as-is.

    Args:
        sender: The task that succeeded; only its string form is used.
        result: The task's return value.
        **kwargs: Remaining signal arguments (ignored).
    """
    # Cap at 100 characters so large results do not flood the log.
    result_str = "No result" if result is None else str(result)[:100]
    logger.info(f"Celery task completed: {sender} - Result: {result_str}")
@task_retry.connect
def task_retry_handler(sender=None, task_id=None, reason=None, einfo=None, **kwargs):
    """Log task retries.

    The ``task_retry`` signal does not pass a ``task_id`` argument; it passes
    the current ``request`` instead. The id is therefore read from the request
    when it was not supplied explicitly, so the log line identifies the task.

    Args:
        sender: The task being retried; only its string form is used.
        task_id: Explicit task id, if the caller supplied one.
        reason: Why the task is being retried (exception or message).
        einfo: Additional exception info (unused).
        **kwargs: Remaining signal arguments; ``request`` is read from here.
    """
    request = kwargs.get("request")
    if task_id is None and request is not None:
        task_id = getattr(request, "id", None)

    reason_str = str(reason) if reason else "No reason provided"
    logger.warning(f"Celery task retry: {sender} [{task_id}] - Reason: {reason_str}")
def import_task_modules():
    """Import every task module so its tasks are registered with Celery.

    Any exception raised while importing is logged and swallowed, so this
    function does not raise. The modules are imported in order, so a failure
    in one stops the import of the ones after it.
    """
    try:
        from . import (  # noqa: E402, F401
            ingest_and_ai,
            translate_and_synthesize,
            notify,
        )

        logger.info("Successfully imported all task modules")
    except Exception as exc:
        logger.error(f"Error importing task modules: {exc}")
# Run at import time of this package so the task modules are loaded (and their
# tasks registered) as soon as the Celery app is imported. Import errors are
# logged inside import_task_modules() rather than raised here.
import_task_modules()