obsidian/wiki/tech-patterns/redis-celery-worker-queue.md
2026-05-10 21:21:13 +01:00

title: Redis + Celery Async Worker Queue
aliases: celery, task-queue, worker, redis-queue
tags: redis, celery, async, worker, queue, python
sources: 01 Projects/enterprise-ai-hub-nexus, 01 Projects/video-accessibility, 01 Projects/pdf-accessibility
created: 2026-04-15
updated: 2026-04-29

Redis + Celery Async Worker Queue

Pattern for offloading long-running AI/processing tasks to background workers. Used in the heaviest Oliver processing pipelines.

Key Takeaways

  • Redis is both the message broker AND result backend for Celery
  • Use Celery when tasks take >5s (AI inference, video processing, PDF analysis)
  • Celery beat for scheduled recurring tasks (e.g., SharePoint sync)
  • PDF Accessibility uses Redis queue directly (pdf:queue) without Celery — simpler worker.py daemon
  • Always poll for task status from the frontend; never block on long tasks
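
The Celery-free variant from the PDF Accessibility takeaway can be sketched as a blocking-pop loop over a Redis list. This is a minimal sketch, not that project's actual worker.py: the JSON payload, the `handle_job` callback, and the `max_jobs` stop condition are assumptions for illustration. `client` is anything that behaves like redis-py's `Redis.brpop(keys, timeout)`, which returns a `(key, value)` pair or `None` on timeout.

```python
import json
import logging
from typing import Any, Callable, Optional


def run_worker(
    client: Any,
    handle_job: Callable[[dict], None],
    queue: str = "pdf:queue",
    max_jobs: Optional[int] = None,
) -> int:
    """Pop JSON jobs from a Redis list and hand them to handle_job.

    Runs forever when max_jobs is None (daemon mode); otherwise stops after
    max_jobs jobs have been popped. Returns the number of jobs that succeeded.
    """
    seen = 0
    succeeded = 0
    while max_jobs is None or seen < max_jobs:
        item = client.brpop(queue, timeout=5)
        if item is None:
            continue  # timeout expired with nothing queued; wait again
        _key, raw = item
        seen += 1
        try:
            handle_job(json.loads(raw))  # json.loads accepts bytes as well as str
            succeeded += 1
        except Exception:
            # One bad job must not take the daemon down; log it and move on.
            logging.exception("job failed: %r", raw)
    return succeeded
```

With redis-py installed, a call such as `run_worker(redis.Redis(host="redis"), process_pdf)` would run the loop as a daemon (`process_pdf` is a hypothetical handler). Producers that enqueue with `LPUSH` get first-in, first-out ordering, because `BRPOP` takes from the opposite end of the list.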

When to Use

  • Video processing pipelines (multi-phase, minutes-long)
  • Scheduled sync jobs (Celery beat)
  • Any task that would timeout an HTTP request (>30s)
  • Parallel AI analysis tasks

Key Details

Standard Setup

# docker-compose.yml
services:
  redis:
    image: redis:7
    ports: ["6379:6379"]
  worker:
    build: ./backend
    command: celery -A app.celery worker --loglevel=info
    depends_on: [redis]
  beat:
    build: ./backend
    command: celery -A app.celery beat --loglevel=info
    depends_on: [redis]
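
The compose file assumes the Celery app already points at the redis service. A settings module along the following lines would do that; it is a sketch using Celery's un-prefixed setting names (`broker_url`, `result_backend`, `result_expires`, `beat_schedule`). The `CELERY_RESULT_BACKEND` variable, the task name `app.tasks.sync_sharepoint`, and the 15-minute interval are assumptions, not values taken from the projects.

```python
# celeryconfig.py: loadable with app.config_from_object("celeryconfig")
import os

# Redis serves as both broker and result backend. Separate database numbers
# keep queued messages and stored results apart.
broker_url = os.environ.get("CELERY_BROKER_URL", "redis://redis:6379/0")
result_backend = os.environ.get("CELERY_RESULT_BACKEND", "redis://redis:6379/1")

# Stored results expire after one day so Redis does not grow without bound.
result_expires = 24 * 60 * 60  # seconds

# Schedule read by the beat container.
beat_schedule = {
    "sharepoint-sync": {
        "task": "app.tasks.sync_sharepoint",  # hypothetical task name
        "schedule": 15 * 60,  # seconds between runs; assumed interval
    },
}
```

If the app loads its settings with `namespace="CELERY"`, the same keys would be written as `CELERY_BROKER_URL`, `CELERY_RESULT_BACKEND`, `CELERY_RESULT_EXPIRES`, and `CELERY_BEAT_SCHEDULE`.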

Task Definition

@celery.task
def process_video(video_id: str):
    # Long-running pipeline
    phase_1_ingest(video_id)
    phase_2_caption(video_id)   # Gemini 2.5 Pro
    phase_3_translate(video_id)
    phase_4_tts(video_id)
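
Because the frontend polls for status, it helps if the task records progress around every phase instead of only at the end. The sketch below shows one way to wrap the phases; `save_status`, the status strings, and the `(name, callable)` phase pairs are assumptions, and the function would be called from inside the Celery task body.

```python
from typing import Callable, Sequence, Tuple

Phase = Tuple[str, Callable[[str], None]]


def run_phases(
    video_id: str,
    phases: Sequence[Phase],
    save_status: Callable[[str, str], None],
) -> None:
    """Run phases in order, persisting a status before each one and at the end.

    save_status(video_id, status) is expected to write to the database so the
    status endpoint can report which phase a job is in.
    """
    for name, run in phases:
        save_status(video_id, f"running:{name}")
        try:
            run(video_id)
        except Exception:
            # Record where the pipeline stopped, then let Celery see the error.
            save_status(video_id, f"failed:{name}")
            raise
    save_status(video_id, "completed")
```

Recording the failing phase name means a job that dies in translation can be told apart from one that died during ingest without reading worker logs.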

Polling Pattern (Frontend)

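// Poll every 2 s and resolve with the first non-pending status
const poll = async (jobId, intervalMs = 2000) => {
  for (;;) {
    const { status } = await api.get(`/jobs/${jobId}/status`)
    if (status !== 'pending') return status
    await new Promise((resolve) => setTimeout(resolve, intervalMs))
  }
}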
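
The same polling loop is useful outside the browser, for example in a CLI script or an integration test. Here is a minimal Python sketch that adds an overall deadline; `fetch_status` is a hypothetical callable wrapping `GET /jobs/{id}/status`, and treating only `pending` as non-terminal is an assumption.

```python
import time
from typing import Callable, Sequence


def wait_for_job(
    fetch_status: Callable[[str], str],
    job_id: str,
    interval: float = 2.0,
    timeout: float = 600.0,
    pending_states: Sequence[str] = ("pending",),
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll fetch_status(job_id) until it leaves pending_states.

    Returns the first non-pending status. Raises TimeoutError when the next
    wait would pass the deadline, so a stuck job cannot hang the caller.
    """
    deadline = time.monotonic() + timeout
    while True:
        status = fetch_status(job_id)
        if status not in pending_states:
            return status
        if time.monotonic() + interval > deadline:
            raise TimeoutError(f"job {job_id} still {status!r} after {timeout}s")
        sleep(interval)
```

The `sleep` parameter exists so tests can pass a no-op and run instantly.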

Projects Using This Pattern

Pipeline Phases (Video Accessibility)

1. Upload → Ingestion worker
2. Gemini 2.5 Pro → VTT captions
3. Audio Description generation
4. QC review (approve/reject/edit VTT)
5. Translation → 50+ languages
6. TTS synthesis (GCP TTS + ElevenLabs)
7. Final delivery

AI Queue Separation Pattern

[!tip] Prevent lightweight tasks from starving behind long-running AI jobs
Use a dedicated ai-queue for ingest/translate/render tasks and keep the default queue for fast operations.

When AI-heavy tasks (video ingest, translation, TTS rendering) share a queue with lightweight tasks (status checks, metadata updates), a burst of AI jobs blocks the lightweight tasks from running. The fix is a second worker bound exclusively to a separate ai-queue.

# docker-compose.yml — two worker containers, separate queues
services:
  worker:
    build: ./backend
    command: celery -A app.celery worker --loglevel=info -Q celery  # default queue
    depends_on: [redis]
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0

  worker-ai:
    build: ./backend
    command: celery -A app.celery worker --loglevel=info -Q ai-queue  # AI tasks only
    depends_on: [redis]
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
    deploy:
      resources:
        limits:
          memory: 4G  # AI workers need more RAM

# Task routing: AI-heavy tasks go to ai-queue
@celery.task(queue="ai-queue")
def ingest_video(video_id: str):
    ...  # Long-running: minutes

@celery.task(queue="ai-queue")
def translate_captions(video_id: str, lang: str):
    ...  # AI inference

@celery.task(queue="ai-queue")
def render_audio_description(video_id: str):
    ...  # TTS synthesis

# Lightweight tasks stay on default queue
@celery.task  # no queue= → goes to default "celery" queue
def update_job_status(job_id: str, status: str):
    ...  # Fast DB update

@celery.task
def send_completion_email(job_id: str):
    ...  # Fast notification

Queue routing in Celery config:

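# celery_config.py
# The CELERY_-prefixed names take effect when the app loads this module with
# namespace="CELERY"; without a namespace the settings are task_queues and
# task_default_queue.
from kombu import Queue

CELERY_TASK_QUEUES = (
    Queue("celery"),     # default lightweight queue
    Queue("ai-queue"),   # AI-heavy tasks
)
CELERY_TASK_DEFAULT_QUEUE = "celery"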
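
Routing can also be declared in one place instead of repeating `queue=` on every decorator, using Celery's `task_routes` setting. The sketch below assumes the same `CELERY_` namespace as the config above and assumes the tasks live in a module named `app.tasks`, which the note does not state.

```python
# celery_config.py (continued): central routing table
CELERY_TASK_ROUTES = {
    # AI-heavy tasks; module path app.tasks is an assumption
    "app.tasks.ingest_video": {"queue": "ai-queue"},
    "app.tasks.translate_captions": {"queue": "ai-queue"},
    "app.tasks.render_audio_description": {"queue": "ai-queue"},
    # Anything not listed here falls through to the default "celery" queue.
}
```

Keys may also be glob patterns, so a dedicated module for AI tasks could be routed with a single entry such as `"app.ai_tasks.*"` (a hypothetical module name).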

Gotchas & Lessons

  • Celery beat needs its own container: it only schedules tasks and hands them to the workers. Run exactly one beat instance, otherwise scheduled tasks are enqueued more than once
  • Proactive token refresh required for long Celery jobs that need M365 access (Enterprise Nexus)
  • A plain worker.py daemon is a simpler alternative to Celery for single-queue use (PDF Accessibility pattern)
  • Always store job status in DB (not just Redis) so it survives Redis restart
  • video_accessibility_development_plan.txt is the authoritative spec — always read before touching that pipeline
  • AI queue separation is critical when mixing ingest/translate/render with lightweight tasks — without it, a 20-minute AI job blocks all status updates
  • Worker restart orphans in-flight tasks — tasks executing when a container restarts are killed silently; MongoDB job stays in ai_processing forever. Check stuck jobs before every deploy and reprocess manually after. See wiki/concepts/celery-worker-restart-inflight-task-loss
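
The pre-deploy check for stuck jobs can be scripted. The sketch below filters plain dicts so it runs without a database; the `updated_at` field name and the 30-minute threshold are assumptions, while `ai_processing` is the status named in the last gotcha. Against MongoDB the same condition would become a query on those two fields.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterable, List, Optional


def find_stuck_jobs(
    jobs: Iterable[dict],
    max_age: timedelta = timedelta(minutes=30),
    now: Optional[datetime] = None,
    status: str = "ai_processing",
) -> List[dict]:
    """Return jobs still in `status` whose last update is older than max_age.

    Each job dict needs a "status" string and a timezone-aware "updated_at".
    """
    now = now or datetime.now(timezone.utc)
    cutoff = now - max_age
    return [
        job
        for job in jobs
        if job["status"] == status and job["updated_at"] < cutoff
    ]
```

The threshold should sit above the longest expected job duration, otherwise healthy long-running jobs are reported as stuck.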