92 lines
2.3 KiB
Python
92 lines
2.3 KiB
Python
"""
|
|
Redis Queue Helper — Push/pop jobs, track status, rate limiting
|
|
"""
|
|
|
|
import json
|
|
import time
|
|
import os
|
|
import redis
|
|
|
|
# Connection settings; each one can be overridden through the environment.
REDIS_HOST = os.environ.get('REDIS_HOST', 'localhost')
REDIS_PORT = int(os.environ.get('REDIS_PORT', 6379))

# Redis key layout used by the helpers in this module.
QUEUE_NAME = 'pdf:queue'       # list holding the serialized job payloads
STATUS_PREFIX = 'pdf:status:'  # followed by the job id
RATE_PREFIX = 'pdf:rate:'      # followed by "<ip>:<action>"
|
|
|
|
|
|
def get_redis():
    """Build a Redis client for the configured host and port.

    A new ``redis.Redis`` object is constructed on every call, using
    ``REDIS_HOST`` and ``REDIS_PORT``. ``decode_responses=True`` is passed
    so that replies are returned as ``str`` instead of ``bytes``.
    """
    connection_kwargs = {
        'host': REDIS_HOST,
        'port': REDIS_PORT,
        'decode_responses': True,
    }
    return redis.Redis(**connection_kwargs)
|
|
|
|
|
|
def push_job(job_id: str, pdf_path: str, options: dict = None):
    """Enqueue a PDF job and mark it as queued.

    Args:
        job_id: Identifier of the job; also used to build its status key.
        pdf_path: Path of the PDF, stored verbatim in the payload.
        options: Optional per-job settings; a falsy value is stored as ``{}``.

    The payload is a JSON object with the keys ``job_id``, ``pdf_path``,
    ``options`` and ``queued_at`` (``time.time()`` at enqueue).
    """
    r = get_redis()
    # Serialize first so an unserializable ``options`` raises before
    # anything is written to Redis.
    payload = json.dumps({
        'job_id': job_id,
        'pdf_path': pdf_path,
        'options': options or {},
        'queued_at': time.time(),
    })
    # Write the status BEFORE pushing: a consumer blocked in pop_job() can
    # receive the payload as soon as it lands on the list, and a 'queued'
    # status written afterwards would overwrite whatever that consumer has
    # already reported for the job.
    set_job_status(job_id, 'queued', 0, 'Waiting in queue')
    r.lpush(QUEUE_NAME, payload)
|
|
|
|
|
|
def pop_job(timeout: int = 0):
    """Take the next job off the queue, blocking while it is empty.

    Args:
        timeout: Forwarded unchanged to ``brpop`` (Redis treats 0 as
            "wait indefinitely").

    Returns:
        The JSON-decoded job payload, or ``None`` when ``brpop`` returned
        nothing.
    """
    popped = get_redis().brpop(QUEUE_NAME, timeout=timeout)
    if not popped:
        return None
    # brpop replies with a (list name, element) pair; only the element
    # is needed.
    _queue, raw_payload = popped
    return json.loads(raw_payload)
|
|
|
|
|
|
def set_job_status(job_id: str, status: str, progress: int = 0, message: str = ''):
    """Store a status record for a job.

    Args:
        job_id: Job identifier, appended to ``STATUS_PREFIX`` to form the key.
        status: Status label to record.
        progress: Progress value to record.
        message: Free-form message to record.

    The record is saved as JSON together with an ``updated_at`` timestamp,
    and the key is given a 24-hour expiry.
    """
    client = get_redis()
    record = dict(
        status=status,
        progress=progress,
        message=message,
        updated_at=time.time(),
    )
    status_key = STATUS_PREFIX + job_id
    client.set(status_key, json.dumps(record), ex=86400)  # 24h TTL
|
|
|
|
|
|
def get_job_status(job_id: str) -> dict:
    """Fetch the stored status record for a job.

    Args:
        job_id: Job identifier, appended to ``STATUS_PREFIX`` to form the key.

    Returns:
        The JSON-decoded status record, or ``None`` when Redis returns no
        value (or an empty one) for the key.
    """
    raw = get_redis().get(STATUS_PREFIX + job_id)
    return json.loads(raw) if raw else None
|
|
|
|
|
|
def check_rate_limit(ip: str, action: str, limit: int, window: int) -> bool:
    """
    Check rate limit. Returns True if within limit, False if exceeded.

    Every call counts as one request: the counter stored under
    ``RATE_PREFIX + "<ip>:<action>"`` is incremented before it is compared
    with ``limit``.

    Args:
        ip: Client IP address
        action: Action name (e.g., 'upload', 'check')
        limit: Max requests allowed
        window: Time window in seconds
    """
    r = get_redis()
    key = f"{RATE_PREFIX}{ip}:{action}"

    # Send INCR and TTL together so the counter and its remaining lifetime
    # come back in a single round trip.
    with r.pipeline() as pipe:
        pipe.incr(key)
        pipe.ttl(key)
        current, ttl = pipe.execute()

    # A negative TTL means the counter has no expiry. That is the case for a
    # freshly created key, and also for a key whose earlier EXPIRE was lost
    # (e.g. the process died between INCR and EXPIRE). Setting the expiry
    # only when the count is 1 would leave such a key alive forever and lock
    # the client out permanently once it passes the limit.
    if ttl < 0:
        r.expire(key, window)

    return current <= limit
|
|
|
|
|
|
def get_queue_length() -> int:
    """Return how many job payloads are currently on the queue list."""
    return get_redis().llen(QUEUE_NAME)
|