semblance-dev/backend/app/utils/rate_limiter.py
Vadym Samoilenko 3e1865edbd Apply Jintech security audit remediation (sprint 3) — 87/92 findings fixed
- Fix missing await on FocusGroup.get_messages() (N-L1)
- Replace time.sleep with asyncio.sleep in key_theme_service and focus_group_service (N-P10)
- Replace flask import with quart in focus_groups.py (N-S3)
- Add logger.error before all 500 returns in focus_groups.py (N-P6)
- Add logging to silent except blocks across routes (N-M10, N-M11)
- Add @rate_limit to 6 remaining AI endpoints (N-H4)
- Add --confirm flag to populate scripts before delete_many (S-H2)
- Remove hardcoded Azure ID fallbacks from msal_service.py and msalConfig.ts (A-M2, F-H4)
- Centralize make_serializable() in utils.py, remove duplicates from 3 route files (N-P7)
- Replace all datetime.utcnow() with datetime.now(timezone.utc) across entire backend (M-L2)
- AuthContext.tsx: only mark token validated on 200 success, not on non-401 errors (F-H2)
- Rename authType → auth_type in auth.py (N-S4)
- Add security_report.md and security_report.pdf with full 92-finding status

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-20 12:51:18 +00:00


"""
Simple in-memory rate limiter for Quart endpoints.

Uses a sliding-window log of request timestamps, keyed by client IP address
by default or by a caller-supplied key such as a user ID.
"""
import asyncio
import time
from collections import defaultdict
from functools import wraps

from quart import jsonify, request


class RateLimiter:
    """Coroutine-safe in-memory rate limiter using a sliding window.

    The asyncio.Lock serializes coroutines on one event loop. It does not
    protect against access from other threads, and limits are not shared
    across worker processes.
    """

    def __init__(self):
        # {key: [timestamp, ...]} holding time.monotonic() values, oldest first
        self._buckets: dict[str, list[float]] = defaultdict(list)
        self._lock = asyncio.Lock()

    async def is_allowed(self, key: str, max_requests: int, window_seconds: int) -> bool:
        """Return True if the request is within the rate limit."""
        async with self._lock:
            now = time.monotonic()
            cutoff = now - window_seconds
            # Drop timestamps that have fallen out of the window
            bucket = [ts for ts in self._buckets[key] if ts > cutoff]
            self._buckets[key] = bucket
            if len(bucket) >= max_requests:
                return False
            bucket.append(now)
            return True


# Module-level limiter shared by every decorated route in this process
_limiter = RateLimiter()


def rate_limit(max_requests: int, window_seconds: int, key_func=None):
    """
    Decorator that rate-limits a Quart route.

    Args:
        max_requests: Maximum number of requests allowed per window.
        window_seconds: Length of the sliding window in seconds.
        key_func: Zero-argument callable that returns the rate-limit key
            for the current request. Defaults to the wrapped function's
            name plus the client IP address.
    """
    def decorator(f):
        @wraps(f)
        async def wrapper(*args, **kwargs):
            if key_func:
                key = key_func()
            else:
                # Default: rate limit per route function and client IP.
                # Behind a reverse proxy, remote_addr is the proxy's address
                # unless forwarded headers are handled upstream.
                key = f"{f.__name__}:{request.remote_addr}"
            allowed = await _limiter.is_allowed(key, max_requests, window_seconds)
            if not allowed:
                return jsonify({"message": "Too many requests. Please try again later."}), 429
            return await f(*args, **kwargs)
        return wrapper
    return decorator


def ip_key():
    """Rate limit key: endpoint name + client IP."""
    return f"{request.endpoint}:{request.remote_addr}"
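
The sliding-window behaviour of `is_allowed` can be checked without Quart or real waiting. The following is a minimal sketch, not part of the module: `SlidingWindowLog`, its `clock` parameter, and the example key are hypothetical names introduced here so that time can be faked in a test. It assumes the same rule as above, that a timestamp counts only while it is newer than `now - window_seconds`.

```python
import asyncio
from collections import defaultdict, deque


class SlidingWindowLog:
    """Hypothetical stand-in for RateLimiter with an injectable clock."""

    def __init__(self, clock):
        self._clock = clock                # callable returning seconds as float
        self._hits = defaultdict(deque)    # {key: deque of timestamps, oldest first}
        self._lock = asyncio.Lock()

    async def is_allowed(self, key, max_requests, window_seconds):
        async with self._lock:
            now = self._clock()
            hits = self._hits[key]
            # Expire timestamps at or before the cutoff (same rule as ts > cutoff)
            while hits and hits[0] <= now - window_seconds:
                hits.popleft()
            if len(hits) >= max_requests:
                return False
            hits.append(now)
            return True


async def demo():
    fake_now = [0.0]  # mutable cell read by the fake clock
    limiter = SlidingWindowLog(clock=lambda: fake_now[0])
    results = []
    # Two requests per 10 seconds: the third is rejected, and a request
    # after the first timestamp has expired is accepted again.
    for t in (0.0, 1.0, 2.0, 10.5):
        fake_now[0] = t
        results.append(await limiter.is_allowed("login:203.0.113.7", 2, 10))
    return results


results = asyncio.run(demo())
print(results)  # [True, True, False, True]
```

A rejected request is not recorded, so a client that keeps retrying while blocked does not extend its own lockout; the same holds for `RateLimiter.is_allowed` above.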