| title | aliases | tags | sources | created | updated |
|---|---|---|---|---|---|
| Redis + Celery Async Worker Queue | | | | 2026-04-15 | 2026-04-29 |
Redis + Celery Async Worker Queue
Pattern for offloading long-running AI/processing tasks to background workers. Used in the heaviest Oliver processing pipelines.
Key Takeaways
- Redis is both the message broker AND result backend for Celery
- Use Celery when tasks take >5s (AI inference, video processing, PDF analysis)
- Celery beat for scheduled recurring tasks (e.g., SharePoint sync)
- PDF Accessibility uses a Redis queue (`pdf:queue`) directly, without Celery: a simpler `worker.py` daemon
- Always poll for task status from the frontend; never block on long tasks
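The PDF Accessibility approach skips Celery entirely. Producers push job IDs onto the `pdf:queue` Redis list, and a long-running `worker.py` pops them one at a time. Below is a minimal sketch of that loop. It assumes the redis-py client (`BLPOP`) and one job ID per message. `process_pdf` and `set_status` are hypothetical stand-ins for the real analysis code and the PostgreSQL job-tracking write.

```python
"""Single-queue worker in the style of PDF Accessibility's worker.py (sketch)."""
from typing import Callable, Optional, Protocol

QUEUE_KEY = "pdf:queue"


class QueueClient(Protocol):
    # Subset of the redis-py client used here: blpop returns (key, value) or None on timeout.
    def blpop(self, keys, timeout: int = 0): ...


def run_once(
    client: QueueClient,
    process_pdf: Callable[[str], None],      # hypothetical: the actual PDF analysis
    set_status: Callable[[str, str], None],  # hypothetical: writes job status to PostgreSQL
    timeout: int = 5,
) -> Optional[str]:
    """Pop one job ID (blocking up to `timeout` seconds) and process it."""
    item = client.blpop([QUEUE_KEY], timeout=timeout)
    if item is None:
        return None  # queue stayed empty for `timeout` seconds
    _key, raw = item
    job_id = raw.decode() if isinstance(raw, bytes) else raw
    set_status(job_id, "processing")
    try:
        process_pdf(job_id)
    except Exception:
        set_status(job_id, "failed")  # record the failure but keep the daemon alive
    else:
        set_status(job_id, "done")
    return job_id


def main(
    process_pdf: Callable[[str], None],
    set_status: Callable[[str, str], None],
) -> None:
    """Daemon entry point; the BLPOP timeout lets the loop wake up periodically."""
    import redis  # redis-py, imported lazily so run_once can be tested without it

    client = redis.Redis(host="redis", port=6379)  # assumed compose service name
    while True:
        run_once(client, process_pdf, set_status)
```

Because status is written to the database on every transition, a Redis restart loses only queued messages, never the record of which jobs ran.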
When to Use
- Video processing pipelines (multi-phase, minutes-long)
- Scheduled sync jobs (Celery beat)
- Any task that would time out an HTTP request (>30s)
- Parallel AI analysis tasks
Key Details
Standard Setup
```yaml
# docker-compose.yml
services:
  redis:
    image: redis:7
    ports: ["6379:6379"]
  worker:
    build: ./backend
    command: celery -A app.celery worker --loglevel=info
    depends_on: [redis]
  beat:
    build: ./backend
    command: celery -A app.celery beat --loglevel=info
    depends_on: [redis]
```
Task Definition
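```python
from app.celery import celery  # assumed: the instance that `-A app.celery` loads
# Hypothetical module holding the phase helpers
from app.pipeline import phase_1_ingest, phase_2_caption, phase_3_translate, phase_4_tts


@celery.task
def process_video(video_id: str):
    # Long-running pipeline: runs on a worker, never inside the HTTP request
    phase_1_ingest(video_id)
    phase_2_caption(video_id)  # Gemini 2.5 Pro
    phase_3_translate(video_id)
    phase_4_tts(video_id)
```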
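The gotchas below call for keeping job status in the database, not only in Redis. It helps if the task records each phase as it starts and fails. Below is a stdlib-only sketch of that bookkeeping. Phases are passed in as `(name, callable)` pairs, and `save_status(video_id, status)` is a hypothetical callback that writes to the job store (MongoDB for Video Accessibility).

```python
from typing import Callable, Iterable, Tuple

Phase = Tuple[str, Callable[[str], None]]


def run_phases(
    video_id: str,
    phases: Iterable[Phase],
    save_status: Callable[[str, str], None],  # hypothetical DB writer
) -> str:
    """Run phases in order, persisting progress so pollers (and humans) can see where it stopped."""
    for name, fn in phases:
        save_status(video_id, f"running:{name}")
        try:
            fn(video_id)
        except Exception:
            save_status(video_id, f"failed:{name}")
            raise  # re-raise so Celery records the task as FAILURE
    save_status(video_id, "done")
    return "done"
```

Inside `process_video`, the four bare calls would become `run_phases(video_id, [("ingest", phase_1_ingest), ("caption", phase_2_caption), ("translate", phase_3_translate), ("tts", phase_4_tts)], save_status)`.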
Polling Pattern (Frontend)
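```javascript
// Poll until the job leaves 'pending'; resolves with the final status.
// Assumes api.get resolves to the parsed JSON body ({ status }), not a raw HTTP response.
const poll = async (jobId, intervalMs = 2000) => {
  for (;;) {
    const { status } = await api.get(`/jobs/${jobId}/status`)
    if (status !== 'pending') return status // any non-pending status ends the loop
    await new Promise((resolve) => setTimeout(resolve, intervalMs))
  }
}
```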
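The frontend loop keeps polling for as long as the status stays `pending`. For a job orphaned by a worker restart (see the gotchas), that is forever. Below is a Python sketch of the same pattern with a deadline and capped exponential backoff, e.g. for scripts or integration tests. `fetch_status` is a hypothetical callable wrapping `GET /jobs/{id}/status`. The timeout and interval values are illustrative.

```python
import time
from typing import Callable


def poll_job(
    job_id: str,
    fetch_status: Callable[[str], str],  # hypothetical: wraps GET /jobs/{id}/status
    timeout_s: float = 1800.0,           # give up after 30 minutes (illustrative)
    initial_s: float = 2.0,              # first delay, matching the frontend's 2 s
    max_s: float = 30.0,                 # cap so long jobs are still checked regularly
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Poll until the status is no longer 'pending'; raise TimeoutError past the deadline."""
    deadline = clock() + timeout_s
    delay = initial_s
    while True:
        status = fetch_status(job_id)
        if status != "pending":
            return status
        if clock() + delay > deadline:
            raise TimeoutError(f"job {job_id} still pending after {timeout_s}s")
        sleep(delay)
        delay = min(delay * 2, max_s)  # back off: 2, 4, 8, ... capped at max_s
```

Injecting `sleep` and `clock` keeps the function testable without real waiting.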
Projects Using This Pattern
- 01 Projects/enterprise-ai-hub-nexus/Enterprise AI Hub Nexus — Celery beat for SharePoint sync + scheduled tasks; Redis 7 + PostgreSQL
- 01 Projects/video-accessibility/Video Accessibility Platform — Celery workers for 7-phase video pipeline; Redis + MongoDB Atlas + GCS
- 01 Projects/pdf-accessibility/PDF Accessibility Checker — Custom `worker.py` daemon reading `pdf:queue` from Redis; PostgreSQL for job tracking
Pipeline Phases (Video Accessibility)
1. Upload → Ingestion worker
2. Gemini 2.5 Pro → VTT captions
3. Audio Description generation
4. QC review (approve/reject/edit VTT)
5. Translation → 50+ languages
6. TTS synthesis (GCP TTS + ElevenLabs)
7. Final delivery
AI Queue Separation Pattern
> [!tip] Prevent lightweight tasks from starving behind long-running AI jobs
> Dedicated `ai-queue` for ingest/translate/render tasks; default queue for fast operations.
When AI-heavy tasks (video ingest, translation, TTS rendering) share a queue with lightweight tasks (status checks, metadata updates), a burst of AI jobs blocks the lightweight tasks from running. The fix is a second worker bound exclusively to a separate ai-queue.
```yaml
# docker-compose.yml: two worker containers, separate queues
services:
  worker:
    build: ./backend
    command: celery -A app.celery worker --loglevel=info -Q celery  # default queue
    depends_on: [redis]
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
  worker-ai:
    build: ./backend
    command: celery -A app.celery worker --loglevel=info -Q ai-queue  # AI tasks only
    depends_on: [redis]
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
    deploy:
      resources:
        limits:
          memory: 4G  # AI workers need more RAM
```
```python
# Task routing: AI-heavy tasks go to ai-queue
from app.celery import celery  # assumed: the instance that `-A app.celery` loads


# AI-heavy tasks: consumed only by the worker-ai container
@celery.task(queue="ai-queue")
def ingest_video(video_id: str):
    ...  # Long-running: minutes


@celery.task(queue="ai-queue")
def translate_captions(video_id: str, lang: str):
    ...  # AI inference


@celery.task(queue="ai-queue")
def render_audio_description(video_id: str):
    ...  # TTS synthesis


# Lightweight tasks stay on the default queue
@celery.task  # no queue= → goes to the default "celery" queue
def update_job_status(job_id: str, status: str):
    ...  # Fast DB update


@celery.task
def send_completion_email(job_id: str):
    ...  # Fast notification
```
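Queue declarations in the Celery config:
```python
# celery_config.py
# These CELERY_-prefixed names are only read when the app loads this module with
# app.config_from_object("celery_config", namespace="CELERY"); without a namespace,
# use the lowercase task_queues / task_default_queue settings instead.
from kombu import Queue

CELERY_TASK_QUEUES = (
    Queue("celery"),    # default lightweight queue
    Queue("ai-queue"),  # AI-heavy tasks
)
CELERY_TASK_DEFAULT_QUEUE = "celery"
```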
Gotchas & Lessons
- Celery beat needs its own container — it manages schedules independently from workers
- Proactive token refresh required for long Celery jobs that need M365 access (Enterprise Nexus)
- `worker.py` is a simpler alternative to Celery for single-queue use (PDF Accessibility pattern)
- Always store job status in the DB (not just Redis) so it survives a Redis restart
- `video_accessibility_development_plan.txt` is the authoritative spec; always read it before touching that pipeline
- AI queue separation is critical when mixing ingest/translate/render with lightweight tasks; without it, a 20-minute AI job blocks all status updates
- Worker restart orphans in-flight tasks: tasks executing when a container restarts are killed silently, and the MongoDB job stays in `ai_processing` forever. Check for stuck jobs before every deploy and reprocess them manually afterward. See wiki/concepts/celery-worker-restart-inflight-task-loss
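The stuck-job check can be scripted. Below is a stdlib sketch that assumes each job document carries `status` and `updated_at` fields (hypothetical names). It also assumes 30 minutes exceeds the longest legitimate phase.
- Before a deploy, run it with `max_age=timedelta(0)` to list every job currently in `ai_processing`, i.e. the jobs a restart would orphan.
- The default threshold catches jobs that have already been stuck for a while.

With pymongo, the same filter would be `{"status": "ai_processing", "updated_at": {"$lt": cutoff}}` passed to `find()`.

```python
from datetime import datetime, timedelta
from typing import Iterable, List, Mapping


def find_stuck_jobs(
    jobs: Iterable[Mapping],
    now: datetime,
    max_age: timedelta = timedelta(minutes=30),  # assumption: longer than any legitimate phase
) -> List[str]:
    """IDs of jobs still in 'ai_processing' with no update for longer than max_age."""
    cutoff = now - max_age
    return [
        job["_id"]
        for job in jobs
        if job.get("status") == "ai_processing" and job["updated_at"] < cutoff
    ]
```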
Related
- wiki/tech-patterns/fastapi-python-docker — the API layer above
- wiki/tech-patterns/python-ai-agents — what the workers execute
- wiki/architecture/gcp-deployment-lb-timeout — why polling beats streaming