""" Tests for redis_queue.py — all Redis calls are mocked. """ import pytest import json import time from unittest.mock import patch, MagicMock class TestRedisQueuePushJob: @patch("redis_queue.get_redis") def test_push_job_basic(self, mock_get_redis): mock_r = MagicMock() mock_get_redis.return_value = mock_r from redis_queue import push_job push_job("pdf_abc123", "/uploads/test.pdf") # Should LPUSH to queue mock_r.lpush.assert_called_once() call_args = mock_r.lpush.call_args assert call_args[0][0] == "pdf:queue" payload = json.loads(call_args[0][1]) assert payload["job_id"] == "pdf_abc123" assert payload["pdf_path"] == "/uploads/test.pdf" @patch("redis_queue.get_redis") def test_push_job_with_options(self, mock_get_redis): mock_r = MagicMock() mock_get_redis.return_value = mock_r from redis_queue import push_job push_job("pdf_xyz", "/test.pdf", options={"quick_mode": True}) payload = json.loads(mock_r.lpush.call_args[0][1]) assert payload["options"]["quick_mode"] is True @patch("redis_queue.get_redis") def test_push_job_sets_status(self, mock_get_redis): mock_r = MagicMock() mock_get_redis.return_value = mock_r from redis_queue import push_job push_job("pdf_status1", "/test.pdf") # Should also call set (for status) — at least 1 set call assert mock_r.set.called class TestRedisQueuePopJob: @patch("redis_queue.get_redis") def test_pop_job_with_data(self, mock_get_redis): mock_r = MagicMock() payload = json.dumps({"job_id": "pdf_abc", "pdf_path": "/test.pdf", "options": {}}) mock_r.brpop.return_value = ("pdf:queue", payload) mock_get_redis.return_value = mock_r from redis_queue import pop_job result = pop_job(timeout=5) assert result["job_id"] == "pdf_abc" mock_r.brpop.assert_called_once_with("pdf:queue", timeout=5) @patch("redis_queue.get_redis") def test_pop_job_empty_queue(self, mock_get_redis): mock_r = MagicMock() mock_r.brpop.return_value = None mock_get_redis.return_value = mock_r from redis_queue import pop_job result = pop_job(timeout=1) assert result is None class TestRedisQueueStatus: @patch("redis_queue.get_redis") def test_set_job_status(self, mock_get_redis): mock_r = MagicMock() mock_get_redis.return_value = mock_r from redis_queue import set_job_status set_job_status("pdf_test", "processing", 50, "Halfway done") mock_r.set.assert_called_once() call_args = mock_r.set.call_args key = call_args[0][0] assert key == "pdf:status:pdf_test" data = json.loads(call_args[0][1]) assert data["status"] == "processing" assert data["progress"] == 50 assert data["message"] == "Halfway done" # Should have 24h TTL assert call_args[1]["ex"] == 86400 @patch("redis_queue.get_redis") def test_get_job_status_found(self, mock_get_redis): mock_r = MagicMock() status_data = json.dumps({"status": "completed", "progress": 100, "message": "Done"}) mock_r.get.return_value = status_data mock_get_redis.return_value = mock_r from redis_queue import get_job_status result = get_job_status("pdf_xyz") assert result["status"] == "completed" assert result["progress"] == 100 @patch("redis_queue.get_redis") def test_get_job_status_not_found(self, mock_get_redis): mock_r = MagicMock() mock_r.get.return_value = None mock_get_redis.return_value = mock_r from redis_queue import get_job_status result = get_job_status("pdf_nonexistent") assert result is None class TestRedisQueueRateLimit: @patch("redis_queue.get_redis") def test_rate_limit_within_limit(self, mock_get_redis): mock_r = MagicMock() mock_r.incr.return_value = 1 mock_get_redis.return_value = mock_r from redis_queue import check_rate_limit result = check_rate_limit("192.168.1.1", "upload", limit=10, window=3600) assert result is True mock_r.expire.assert_called_once() @patch("redis_queue.get_redis") def test_rate_limit_exceeded(self, mock_get_redis): mock_r = MagicMock() mock_r.incr.return_value = 11 mock_get_redis.return_value = mock_r from redis_queue import check_rate_limit result = check_rate_limit("192.168.1.1", "upload", limit=10, window=3600) assert result is False @patch("redis_queue.get_redis") def test_rate_limit_at_boundary(self, mock_get_redis): mock_r = MagicMock() mock_r.incr.return_value = 10 mock_get_redis.return_value = mock_r from redis_queue import check_rate_limit result = check_rate_limit("10.0.0.1", "check", limit=10, window=1800) assert result is True @patch("redis_queue.get_redis") def test_rate_limit_expire_only_on_first(self, mock_get_redis): mock_r = MagicMock() mock_r.incr.return_value = 5 # Not the first call mock_get_redis.return_value = mock_r from redis_queue import check_rate_limit check_rate_limit("10.0.0.1", "upload", limit=10, window=3600) # Expire should NOT be called (current != 1) mock_r.expire.assert_not_called() class TestRedisQueueLength: @patch("redis_queue.get_redis") def test_get_queue_length(self, mock_get_redis): mock_r = MagicMock() mock_r.llen.return_value = 5 mock_get_redis.return_value = mock_r from redis_queue import get_queue_length assert get_queue_length() == 5 mock_r.llen.assert_called_once_with("pdf:queue") @patch("redis_queue.get_redis") def test_get_queue_length_empty(self, mock_get_redis): mock_r = MagicMock() mock_r.llen.return_value = 0 mock_get_redis.return_value = mock_r from redis_queue import get_queue_length assert get_queue_length() == 0 class TestGetRedis: @patch("redis_queue.redis.Redis") def test_get_redis_uses_configured_host(self, mock_redis_class): from redis_queue import get_redis, REDIS_HOST, REDIS_PORT get_redis() mock_redis_class.assert_called_once_with( host=REDIS_HOST, port=REDIS_PORT, decode_responses=True, ) if __name__ == "__main__": pytest.main([__file__, "-v"])