from celery import Celery from celery.signals import task_failure, task_success, task_retry from ..core.config import settings from ..core.logging import get_logger logger = get_logger(__name__) celery_app = Celery( "accessible-video-tasks", broker=settings.redis_url, backend=settings.redis_url, ) celery_app.conf.update( task_serializer="json", accept_content=["json"], result_serializer="json", timezone="UTC", enable_utc=True, task_track_started=True, task_time_limit=30 * 60, # 30 minutes default task_soft_time_limit=25 * 60, # 25 minutes default worker_prefetch_multiplier=1, task_acks_late=True, worker_max_tasks_per_child=1000, task_routes={ "app.tasks.ingest_and_ai.*": {"queue": "ingest"}, "app.tasks.translate_and_synthesize.*": {"queue": "default"}, "app.tasks.tts_synthesis.synthesize_cue_task": {"queue": "tts"}, "app.tasks.render_accessible_video.*": {"queue": "render"}, "app.tasks.rerender_accessible_video.*": {"queue": "render"}, "app.tasks.notify.*": {"queue": "notify"}, "app.tasks.ffmpeg_operations.*": {"queue": "ffmpeg"}, "app.tasks.whisper_transcribe.*": {"queue": "whisper"}, }, task_default_queue="default", task_create_missing_queues=True, # Task-specific timeout overrides task_annotations={}, ) # Add a simple test task for debugging @celery_app.task def test_task(message="test"): """Simple test task to verify worker connectivity""" logger.info(f"๐Ÿงช TEST TASK EXECUTED: {message}") print(f"๐Ÿงช TEST TASK EXECUTED: {message}") return f"Test task completed: {message}" # Add task received handler for debugging from celery.signals import task_received, task_prerun, worker_ready import threading import time @worker_ready.connect def worker_ready_handler(sender=None, **kwargs): """Log when worker is ready and start heartbeat""" logger.info(f"๐ŸŸข WORKER READY: {sender}") print(f"๐ŸŸข WORKER READY: {sender} - Worker is online and listening!") # Change stream monitoring has been removed - workflow triggering now handled directly by API endpoints logger.info("Workflow triggering handled directly by API endpoints - no change stream monitoring needed") @task_received.connect def task_received_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, retries=None, eta=None, **kwds): """Log when a task is received by the worker""" logger.info(f"๐ŸŽฏ TASK RECEIVED: {task} [{task_id}] with args: {args}") print(f"๐ŸŽฏ TASK RECEIVED: {task} [{task_id}] - Worker is picking up the task!") @task_prerun.connect def task_prerun_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, **kwds): """Log when a task starts executing""" logger.info(f"๐Ÿš€ TASK STARTING: {task} [{task_id}]") print(f"๐Ÿš€ TASK STARTING: {task} [{task_id}] - About to execute!") # Celery signal handlers for centralized logging @task_failure.connect def task_failure_handler(sender=None, task_id=None, exception=None, traceback=None, einfo=None, **kwargs): """Log task failures to centralized logging""" exception_type = exception.__class__.__name__ if exception else "Unknown" exception_msg = str(exception) if exception else "No details" # Log comprehensive error details error_details = f""" === CELERY TASK FAILURE === Task: {sender} Task ID: {task_id} Exception Type: {exception_type} Exception Message: {exception_msg} Full Traceback: {traceback} Additional Info: {einfo} ============================= """ logger.error(error_details) # Also log to stdout for immediate visibility print(f"๐Ÿšจ TASK FAILURE: {sender} [{task_id}] - {exception_type}: {exception_msg}") if traceback: print(f"Full traceback:\n{traceback}") @task_success.connect def task_success_handler(sender=None, result=None, **kwargs): """Log task success""" result_str = str(result)[:100] if result else "No result" logger.info(f"Celery task completed: {sender} - Result: {result_str}") @task_retry.connect def task_retry_handler(sender=None, task_id=None, reason=None, einfo=None, **kwargs): """Log task retries""" reason_str = str(reason) if reason else "No reason provided" logger.warning(f"Celery task retry: {sender} [{task_id}] - Reason: {reason_str}") def import_task_modules(): """Import all task modules to register them with Celery""" try: from . import ingest_and_ai # noqa: E402, F401 from . import translate_and_synthesize # noqa: E402, F401 from . import tts_synthesis # noqa: E402, F401 from . import render_accessible_video # noqa: E402, F401 from . import rerender_accessible_video # noqa: E402, F401 from . import notify # noqa: E402, F401 from . import ffmpeg_operations # noqa: E402, F401 from . import whisper_transcribe # noqa: E402, F401 logger.info("Successfully imported all task modules") except Exception as e: logger.error(f"Error importing task modules: {e}") # Import task modules at startup import_task_modules()