Use NullPool for Celery workers so connections are opened/closed per task instead of accumulating in per-process pools. Add worker_process_init signal to dispose inherited engines on fork. Keep QueuePool for the web service. Increase PostgreSQL max_connections to 200 as a safety net. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
61 lines
1.8 KiB
Python
61 lines
1.8 KiB
Python
"""Celery application configuration and Beat schedule."""
|
|
|
|
import os
|
|
import sys
|
|
|
|
from celery import Celery
|
|
from celery.schedules import crontab
|
|
|
|
# Add parent directory to path for imports
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
from app.config import settings
|
|
from celery.signals import worker_process_init
|
|
|
|
|
|
@worker_process_init.connect
|
|
def init_worker_process(**kwargs):
|
|
"""Dispose inherited engine in forked workers to prevent shared socket corruption."""
|
|
from app.database import engine
|
|
engine.dispose()
|
|
|
|
|
|
# Create Celery app
|
|
app = Celery("pah_workers")
|
|
|
|
# Configure Celery
|
|
app.config_from_object(
|
|
{
|
|
"broker_url": settings.CELERY_BROKER_URL,
|
|
"result_backend": settings.CELERY_RESULT_BACKEND,
|
|
"task_serializer": "json",
|
|
"result_serializer": "json",
|
|
"accept_content": ["json"],
|
|
"timezone": "UTC",
|
|
"task_track_started": True,
|
|
"task_acks_late": True,
|
|
"worker_prefetch_multiplier": 1,
|
|
# Queue routing: video generation runs on dedicated queue
|
|
"task_routes": {
|
|
"tasks.workers.create_video": {"queue": "video"},
|
|
},
|
|
"task_default_queue": "celery",
|
|
}
|
|
)
|
|
|
|
# Beat schedule for periodic tasks
|
|
# Note: Most tasks are now event-based. check_timeouts remains as a safety net
|
|
# for edge cases (e.g., scheduled timeout tasks that failed to run).
|
|
app.conf.beat_schedule = {
|
|
"check-stale-submissions": {
|
|
"task": "tasks.workers.check_timeouts",
|
|
"schedule": 1800.0, # Every 30 minutes (safety net only)
|
|
},
|
|
"cleanup-old-files": {
|
|
"task": "tasks.workers.cleanup_old_files",
|
|
"schedule": crontab(hour=3, minute=0), # Daily at 3 AM
|
|
},
|
|
}
|
|
|
|
# Auto-discover tasks (module is tasks.workers)
|
|
app.autodiscover_tasks(["tasks"], related_name="workers")
|