pdf-accessibility/redis_queue.py
Vadym Samoilenko 112719b2c5 Add Docker stack, frontend redesign, and visual page inspector fix
- Redesigned frontend with Outfit/Figtree typography, coral accent palette,
  noise texture, glassmorphism header, and staggered animations
- Split monolithic index.html into modular JS (app, api, upload, batch,
  results, page-viewer, utils) and extracted CSS
- Fixed worker.py to generate page images for Visual Page Inspector
- Added Docker Compose stack (web, worker, redis, postgres)
- Added batch upload, HTML report export, rate limiting, and Redis queue
- Extended test suite with checker, remediation, worker, and DB tests

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 18:12:44 +00:00

92 lines
2.3 KiB
Python

"""
Redis Queue Helper — Push/pop jobs, track status, rate limiting
"""
import json
import time
import os
import redis
# Default connection settings
REDIS_HOST = os.getenv('REDIS_HOST', 'localhost')
REDIS_PORT = int(os.getenv('REDIS_PORT', 6379))
QUEUE_NAME = 'pdf:queue'
STATUS_PREFIX = 'pdf:status:'
RATE_PREFIX = 'pdf:rate:'
def get_redis():
"""Get a Redis connection."""
return redis.Redis(
host=REDIS_HOST,
port=REDIS_PORT,
decode_responses=True
)
def push_job(job_id: str, pdf_path: str, options: dict = None):
"""Push a job to the processing queue."""
r = get_redis()
payload = json.dumps({
'job_id': job_id,
'pdf_path': pdf_path,
'options': options or {},
'queued_at': time.time()
})
r.lpush(QUEUE_NAME, payload)
set_job_status(job_id, 'queued', 0, 'Waiting in queue')
def pop_job(timeout: int = 0):
"""Pop a job from the queue (blocking)."""
r = get_redis()
result = r.brpop(QUEUE_NAME, timeout=timeout)
if result:
_, payload = result
return json.loads(payload)
return None
def set_job_status(job_id: str, status: str, progress: int = 0, message: str = ''):
"""Set job status in Redis."""
r = get_redis()
data = {
'status': status,
'progress': progress,
'message': message,
'updated_at': time.time()
}
r.set(STATUS_PREFIX + job_id, json.dumps(data), ex=86400) # 24h TTL
def get_job_status(job_id: str) -> dict:
"""Get job status from Redis."""
r = get_redis()
data = r.get(STATUS_PREFIX + job_id)
if data:
return json.loads(data)
return None
def check_rate_limit(ip: str, action: str, limit: int, window: int) -> bool:
"""
Check rate limit. Returns True if within limit, False if exceeded.
Args:
ip: Client IP address
action: Action name (e.g., 'upload', 'check')
limit: Max requests allowed
window: Time window in seconds
"""
r = get_redis()
key = f"{RATE_PREFIX}{ip}:{action}"
current = r.incr(key)
if current == 1:
r.expire(key, window)
return current <= limit
def get_queue_length() -> int:
"""Get the number of jobs waiting in queue."""
r = get_redis()
return r.llen(QUEUE_NAME)