ppt-tool/backend/services/redis_service.py
Vadym Samoilenko a0d73b3b63 Phase 4: Generation Pipeline — brand enforcement, enhanced LLM calls, ARQ job queue
- Step 14: Brand enforcement service (font/color/logo replacement, WCAG contrast check, LLM prompt context)
- Step 15: Enhanced outline & slide content generation with brand context, content summary, "no hallucination" instructions
- Step 15b: LLM auto-fallback retry logic across providers (FALLBACK_LLM_PROVIDERS env)
- Step 16: Redis/ARQ job queue — worker entry point, presentation & master deck workers, job status/SSE endpoints, graceful fallback to BackgroundTasks when Redis unavailable

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 16:15:25 +00:00

58 lines
1.6 KiB
Python

"""Redis service: connection pool and job progress utilities."""
import json
import os
import uuid
from typing import Optional
from arq import create_pool
from arq.connections import ArqRedis, RedisSettings
_pool: Optional[ArqRedis] = None
def _get_redis_settings() -> RedisSettings:
"""Parse REDIS_URL env var into ARQ RedisSettings."""
url = os.environ.get("REDIS_URL", "redis://localhost:6379/0")
return RedisSettings.from_dsn(url)
async def get_arq_pool() -> ArqRedis:
"""Get or create the shared ARQ Redis connection pool."""
global _pool
if _pool is None:
_pool = await create_pool(_get_redis_settings())
return _pool
async def close_arq_pool() -> None:
"""Close the shared ARQ Redis pool (call on app shutdown)."""
global _pool
if _pool is not None:
await _pool.aclose()
_pool = None
async def enqueue_job(function_name: str, **kwargs) -> Optional[str]:
"""Enqueue a job via ARQ. Returns the ARQ job ID."""
pool = await get_arq_pool()
job = await pool.enqueue_job(function_name, **kwargs)
return job.job_id if job else None
async def publish_job_progress(
job_id: uuid.UUID,
progress: int,
message: str,
status: str = "processing",
) -> None:
"""Publish a progress event to Redis pub/sub for SSE consumers."""
pool = await get_arq_pool()
channel = f"job:{job_id}:progress"
payload = json.dumps({
"job_id": str(job_id),
"progress": progress,
"message": message,
"status": status,
})
await pool.publish(channel, payload.encode())