- Step 14: Brand enforcement service (font/color/logo replacement, WCAG contrast check, LLM prompt context) - Step 15: Enhanced outline & slide content generation with brand context, content summary, "no hallucination" instructions - Step 15b: LLM auto-fallback retry logic across providers (FALLBACK_LLM_PROVIDERS env) - Step 16: Redis/ARQ job queue — worker entry point, presentation & master deck workers, job status/SSE endpoints, graceful fallback to BackgroundTasks when Redis unavailable Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
58 lines
1.6 KiB
Python
58 lines
1.6 KiB
Python
"""Redis service: connection pool and job progress utilities."""
|
|
import json
|
|
import os
|
|
import uuid
|
|
from typing import Optional
|
|
|
|
from arq import create_pool
|
|
from arq.connections import ArqRedis, RedisSettings
|
|
|
|
|
|
# Shared ARQ Redis pool for this module. It starts as None, is assigned by
# get_arq_pool() on first use, and is reset to None by close_arq_pool().
_pool: Optional[ArqRedis] = None
|
|
|
|
|
|
def _get_redis_settings() -> RedisSettings:
    """Build ARQ ``RedisSettings`` from the ``REDIS_URL`` environment variable.

    When ``REDIS_URL`` is not set, ``redis://localhost:6379/0`` is used.
    """
    dsn = os.getenv("REDIS_URL", "redis://localhost:6379/0")
    return RedisSettings.from_dsn(dsn)
|
|
|
|
|
|
async def get_arq_pool() -> ArqRedis:
    """Return the module's shared ARQ Redis pool, creating it on first use.

    The pool is cached in the module-level ``_pool`` variable, so later calls
    return the same object until ``_pool`` is reset to ``None``.
    """
    global _pool
    # Fast path: a pool has already been created and cached.
    if _pool is not None:
        return _pool
    settings = _get_redis_settings()
    _pool = await create_pool(settings)
    return _pool
|
|
|
|
|
|
async def close_arq_pool() -> None:
    """Close the shared ARQ Redis pool (call on app shutdown).

    Does nothing when no pool is currently cached, so it is safe to call more
    than once.

    Raises:
        Any exception raised by the pool's ``aclose()``. The module-level
        ``_pool`` reference is already cleared when that happens.
    """
    global _pool
    pool = _pool
    if pool is None:
        return
    # Detach the shared reference *before* awaiting the close. If ``aclose()``
    # raises, or another coroutine runs while it is pending, ``get_arq_pool()``
    # must not keep returning a pool that is closed or half-closed.
    _pool = None
    await pool.aclose()
|
|
|
|
|
|
async def enqueue_job(function_name: str, **kwargs) -> Optional[str]:
    """Enqueue ``function_name`` on the shared ARQ pool.

    Args:
        function_name: Name passed to the pool's ``enqueue_job``.
        **kwargs: Forwarded unchanged to the pool's ``enqueue_job``.

    Returns:
        The ``job_id`` of the object returned by the pool, or ``None`` when
        the pool returns a falsy value instead of a job.
    """
    redis = await get_arq_pool()
    queued = await redis.enqueue_job(function_name, **kwargs)
    if not queued:
        return None
    return queued.job_id
|
|
|
|
|
|
async def publish_job_progress(
    job_id: uuid.UUID,
    progress: int,
    message: str,
    status: str = "processing",
) -> None:
    """Publish a progress event to Redis pub/sub for SSE consumers.

    The event is a UTF-8 encoded JSON object with the keys ``job_id``,
    ``progress``, ``message`` and ``status``, published on the channel
    ``job:<job_id>:progress``.

    Args:
        job_id: Job identifier; used in the channel name and, stringified,
            in the payload.
        progress: Progress value copied into the payload as-is.
        message: Text copied into the payload as-is.
        status: Status label copied into the payload; defaults to
            ``"processing"``.
    """
    redis = await get_arq_pool()
    topic = f"job:{job_id}:progress"
    event = {
        "job_id": str(job_id),
        "progress": progress,
        "message": message,
        "status": status,
    }
    body = json.dumps(event).encode()
    await redis.publish(topic, body)
|