- Redesigned frontend with Outfit/Figtree typography, coral accent palette, noise texture, glassmorphism header, and staggered animations - Split monolithic index.html into modular JS (app, api, upload, batch, results, page-viewer, utils) and extracted CSS - Fixed worker.py to generate page images for Visual Page Inspector - Added Docker Compose stack (web, worker, redis, postgres) - Added batch upload, HTML report export, rate limiting, and Redis queue - Extended test suite with checker, remediation, worker, and DB tests Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
204 lines
6.5 KiB
Python
204 lines
6.5 KiB
Python
"""
|
|
Tests for redis_queue.py — all Redis calls are mocked.
|
|
"""
|
|
|
|
import pytest
|
|
import json
|
|
import time
|
|
from unittest.mock import patch, MagicMock
|
|
|
|
|
|
class TestRedisQueuePushJob:
|
|
@patch("redis_queue.get_redis")
|
|
def test_push_job_basic(self, mock_get_redis):
|
|
mock_r = MagicMock()
|
|
mock_get_redis.return_value = mock_r
|
|
|
|
from redis_queue import push_job
|
|
push_job("pdf_abc123", "/uploads/test.pdf")
|
|
|
|
# Should LPUSH to queue
|
|
mock_r.lpush.assert_called_once()
|
|
call_args = mock_r.lpush.call_args
|
|
assert call_args[0][0] == "pdf:queue"
|
|
payload = json.loads(call_args[0][1])
|
|
assert payload["job_id"] == "pdf_abc123"
|
|
assert payload["pdf_path"] == "/uploads/test.pdf"
|
|
|
|
@patch("redis_queue.get_redis")
|
|
def test_push_job_with_options(self, mock_get_redis):
|
|
mock_r = MagicMock()
|
|
mock_get_redis.return_value = mock_r
|
|
|
|
from redis_queue import push_job
|
|
push_job("pdf_xyz", "/test.pdf", options={"quick_mode": True})
|
|
|
|
payload = json.loads(mock_r.lpush.call_args[0][1])
|
|
assert payload["options"]["quick_mode"] is True
|
|
|
|
@patch("redis_queue.get_redis")
|
|
def test_push_job_sets_status(self, mock_get_redis):
|
|
mock_r = MagicMock()
|
|
mock_get_redis.return_value = mock_r
|
|
|
|
from redis_queue import push_job
|
|
push_job("pdf_status1", "/test.pdf")
|
|
|
|
# Should also call set (for status) — at least 1 set call
|
|
assert mock_r.set.called
|
|
|
|
|
|
class TestRedisQueuePopJob:
|
|
@patch("redis_queue.get_redis")
|
|
def test_pop_job_with_data(self, mock_get_redis):
|
|
mock_r = MagicMock()
|
|
payload = json.dumps({"job_id": "pdf_abc", "pdf_path": "/test.pdf", "options": {}})
|
|
mock_r.brpop.return_value = ("pdf:queue", payload)
|
|
mock_get_redis.return_value = mock_r
|
|
|
|
from redis_queue import pop_job
|
|
result = pop_job(timeout=5)
|
|
|
|
assert result["job_id"] == "pdf_abc"
|
|
mock_r.brpop.assert_called_once_with("pdf:queue", timeout=5)
|
|
|
|
@patch("redis_queue.get_redis")
|
|
def test_pop_job_empty_queue(self, mock_get_redis):
|
|
mock_r = MagicMock()
|
|
mock_r.brpop.return_value = None
|
|
mock_get_redis.return_value = mock_r
|
|
|
|
from redis_queue import pop_job
|
|
result = pop_job(timeout=1)
|
|
|
|
assert result is None
|
|
|
|
|
|
class TestRedisQueueStatus:
|
|
@patch("redis_queue.get_redis")
|
|
def test_set_job_status(self, mock_get_redis):
|
|
mock_r = MagicMock()
|
|
mock_get_redis.return_value = mock_r
|
|
|
|
from redis_queue import set_job_status
|
|
set_job_status("pdf_test", "processing", 50, "Halfway done")
|
|
|
|
mock_r.set.assert_called_once()
|
|
call_args = mock_r.set.call_args
|
|
key = call_args[0][0]
|
|
assert key == "pdf:status:pdf_test"
|
|
data = json.loads(call_args[0][1])
|
|
assert data["status"] == "processing"
|
|
assert data["progress"] == 50
|
|
assert data["message"] == "Halfway done"
|
|
# Should have 24h TTL
|
|
assert call_args[1]["ex"] == 86400
|
|
|
|
@patch("redis_queue.get_redis")
|
|
def test_get_job_status_found(self, mock_get_redis):
|
|
mock_r = MagicMock()
|
|
status_data = json.dumps({"status": "completed", "progress": 100, "message": "Done"})
|
|
mock_r.get.return_value = status_data
|
|
mock_get_redis.return_value = mock_r
|
|
|
|
from redis_queue import get_job_status
|
|
result = get_job_status("pdf_xyz")
|
|
|
|
assert result["status"] == "completed"
|
|
assert result["progress"] == 100
|
|
|
|
@patch("redis_queue.get_redis")
|
|
def test_get_job_status_not_found(self, mock_get_redis):
|
|
mock_r = MagicMock()
|
|
mock_r.get.return_value = None
|
|
mock_get_redis.return_value = mock_r
|
|
|
|
from redis_queue import get_job_status
|
|
result = get_job_status("pdf_nonexistent")
|
|
|
|
assert result is None
|
|
|
|
|
|
class TestRedisQueueRateLimit:
|
|
@patch("redis_queue.get_redis")
|
|
def test_rate_limit_within_limit(self, mock_get_redis):
|
|
mock_r = MagicMock()
|
|
mock_r.incr.return_value = 1
|
|
mock_get_redis.return_value = mock_r
|
|
|
|
from redis_queue import check_rate_limit
|
|
result = check_rate_limit("192.168.1.1", "upload", limit=10, window=3600)
|
|
|
|
assert result is True
|
|
mock_r.expire.assert_called_once()
|
|
|
|
@patch("redis_queue.get_redis")
|
|
def test_rate_limit_exceeded(self, mock_get_redis):
|
|
mock_r = MagicMock()
|
|
mock_r.incr.return_value = 11
|
|
mock_get_redis.return_value = mock_r
|
|
|
|
from redis_queue import check_rate_limit
|
|
result = check_rate_limit("192.168.1.1", "upload", limit=10, window=3600)
|
|
|
|
assert result is False
|
|
|
|
@patch("redis_queue.get_redis")
|
|
def test_rate_limit_at_boundary(self, mock_get_redis):
|
|
mock_r = MagicMock()
|
|
mock_r.incr.return_value = 10
|
|
mock_get_redis.return_value = mock_r
|
|
|
|
from redis_queue import check_rate_limit
|
|
result = check_rate_limit("10.0.0.1", "check", limit=10, window=1800)
|
|
|
|
assert result is True
|
|
|
|
@patch("redis_queue.get_redis")
|
|
def test_rate_limit_expire_only_on_first(self, mock_get_redis):
|
|
mock_r = MagicMock()
|
|
mock_r.incr.return_value = 5 # Not the first call
|
|
mock_get_redis.return_value = mock_r
|
|
|
|
from redis_queue import check_rate_limit
|
|
check_rate_limit("10.0.0.1", "upload", limit=10, window=3600)
|
|
|
|
# Expire should NOT be called (current != 1)
|
|
mock_r.expire.assert_not_called()
|
|
|
|
|
|
class TestRedisQueueLength:
|
|
@patch("redis_queue.get_redis")
|
|
def test_get_queue_length(self, mock_get_redis):
|
|
mock_r = MagicMock()
|
|
mock_r.llen.return_value = 5
|
|
mock_get_redis.return_value = mock_r
|
|
|
|
from redis_queue import get_queue_length
|
|
assert get_queue_length() == 5
|
|
mock_r.llen.assert_called_once_with("pdf:queue")
|
|
|
|
@patch("redis_queue.get_redis")
|
|
def test_get_queue_length_empty(self, mock_get_redis):
|
|
mock_r = MagicMock()
|
|
mock_r.llen.return_value = 0
|
|
mock_get_redis.return_value = mock_r
|
|
|
|
from redis_queue import get_queue_length
|
|
assert get_queue_length() == 0
|
|
|
|
|
|
class TestGetRedis:
|
|
@patch("redis_queue.redis.Redis")
|
|
def test_get_redis_uses_configured_host(self, mock_redis_class):
|
|
from redis_queue import get_redis, REDIS_HOST, REDIS_PORT
|
|
get_redis()
|
|
mock_redis_class.assert_called_once_with(
|
|
host=REDIS_HOST,
|
|
port=REDIS_PORT,
|
|
decode_responses=True,
|
|
)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
pytest.main([__file__, "-v"])
|