""" Redis Queue Helper — Push/pop jobs, track status, rate limiting """ import json import time import os import redis # Default connection settings REDIS_HOST = os.getenv('REDIS_HOST', 'localhost') REDIS_PORT = int(os.getenv('REDIS_PORT', 6379)) QUEUE_NAME = 'pdf:queue' STATUS_PREFIX = 'pdf:status:' RATE_PREFIX = 'pdf:rate:' def get_redis(): """Get a Redis connection.""" return redis.Redis( host=REDIS_HOST, port=REDIS_PORT, decode_responses=True ) def push_job(job_id: str, pdf_path: str, options: dict = None): """Push a job to the processing queue.""" r = get_redis() payload = json.dumps({ 'job_id': job_id, 'pdf_path': pdf_path, 'options': options or {}, 'queued_at': time.time() }) r.lpush(QUEUE_NAME, payload) set_job_status(job_id, 'queued', 0, 'Waiting in queue') def pop_job(timeout: int = 0): """Pop a job from the queue (blocking).""" r = get_redis() result = r.brpop(QUEUE_NAME, timeout=timeout) if result: _, payload = result return json.loads(payload) return None def set_job_status(job_id: str, status: str, progress: int = 0, message: str = ''): """Set job status in Redis.""" r = get_redis() data = { 'status': status, 'progress': progress, 'message': message, 'updated_at': time.time() } r.set(STATUS_PREFIX + job_id, json.dumps(data), ex=86400) # 24h TTL def get_job_status(job_id: str) -> dict: """Get job status from Redis.""" r = get_redis() data = r.get(STATUS_PREFIX + job_id) if data: return json.loads(data) return None def check_rate_limit(ip: str, action: str, limit: int, window: int) -> bool: """ Check rate limit. Returns True if within limit, False if exceeded. Args: ip: Client IP address action: Action name (e.g., 'upload', 'check') limit: Max requests allowed window: Time window in seconds """ r = get_redis() key = f"{RATE_PREFIX}{ip}:{action}" current = r.incr(key) if current == 1: r.expire(key, window) return current <= limit def get_queue_length() -> int: """Get the number of jobs waiting in queue.""" r = get_redis() return r.llen(QUEUE_NAME)