video-accessibility/backend/app/core/security.py
Vadym Samoilenko ca312d48fa chore(lint): fix all ruff errors — 0 warnings remaining
- B904 (55): add `from err` / `from None` to raise-in-except across 13 files
- F821 (1): add missing HTTPException import in routes_language_qc.py
- F841 (7): remove unused variable assignments (current_user, job_title, tts_provider, etc.)
- W293 (13): strip trailing whitespace from blank lines
- C416 (4): rewrite unnecessary dict comprehensions as dict()
- C401 (1): rewrite unnecessary generator as set comprehension
- E701 (4): split multi-statement lines in cost_tracker.py
- E741 (1): rename ambiguous `l` to `lang` in cloud_run_dispatch.py
- B007 (4): prefix unused loop variables with _ in tts.py, video_renderer.py
- I001 (1): sort imports in tasks/__init__.py (move stdlib to top)
- E402 (3): move threading/time/signals imports to top of tasks/__init__.py
- UP042 (9): replace (str, Enum) with StrEnum in all model/schema enums

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-13 17:13:08 +01:00

61 lines
1.9 KiB
Python

from datetime import datetime, timedelta, timezone
from typing import Any

from fastapi import HTTPException, status
from jose import JWTError, jwt
from passlib.context import CryptContext

from .config import settings

# Shared passlib context used by verify_password() and get_password_hash().
# bcrypt is the only configured scheme; deprecated="auto" asks passlib to
# treat every non-default scheme as deprecated, which has no effect until a
# second scheme is added.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def create_access_token(
    subject: str | Any,
    expires_delta: timedelta | None = None,
    org_ids: list[str] | None = None,
) -> str:
    """Create a signed JWT access token for ``subject``.

    The payload carries ``exp``, ``sub`` and a constant ``v`` claim set to 2
    (presumably a token-format version marker -- confirm with the consumers).

    Args:
        subject: Identifier stored in the ``sub`` claim; coerced with ``str()``.
        expires_delta: Lifetime of the token. When ``None`` the token lives
            for ``settings.jwt_access_ttl_min`` minutes.
        org_ids: Optional organisation ids; added as the ``org_ids`` claim
            only when the list is non-empty.

    Returns:
        The JWT encoded with ``settings.jwt_secret`` and ``settings.jwt_alg``.
    """
    # Only ``None`` selects the default TTL; an explicit zero-length delta is
    # honoured instead of being silently replaced by the default.
    if expires_delta is None:
        expires_delta = timedelta(minutes=settings.jwt_access_ttl_min)

    # Timezone-aware "now": datetime.utcnow() is deprecated since Python 3.12
    # and yields a naive value that is easy to misinterpret as local time.
    expire = datetime.now(timezone.utc) + expires_delta

    to_encode: dict[str, Any] = {"exp": expire, "sub": str(subject), "v": 2}
    if org_ids:
        to_encode["org_ids"] = org_ids
    return jwt.encode(to_encode, settings.jwt_secret, algorithm=settings.jwt_alg)
def create_refresh_token(
    subject: str | Any, expires_delta: timedelta | None = None
) -> str:
    """Create a signed JWT refresh token for ``subject``.

    The payload carries ``exp``, ``sub`` and ``"type": "refresh"``.

    Args:
        subject: Identifier stored in the ``sub`` claim; coerced with ``str()``.
        expires_delta: Lifetime of the token. When ``None`` the token lives
            for ``settings.jwt_refresh_ttl_days`` days.

    Returns:
        The JWT encoded with ``settings.jwt_secret`` and ``settings.jwt_alg``.
    """
    # Only ``None`` selects the default TTL; an explicit zero-length delta is
    # honoured instead of being silently replaced by the default.
    if expires_delta is None:
        expires_delta = timedelta(days=settings.jwt_refresh_ttl_days)

    # Timezone-aware "now": datetime.utcnow() is deprecated since Python 3.12
    # and yields a naive value that is easy to misinterpret as local time.
    expire = datetime.now(timezone.utc) + expires_delta

    to_encode = {"exp": expire, "sub": str(subject), "type": "refresh"}
    return jwt.encode(to_encode, settings.jwt_secret, algorithm=settings.jwt_alg)
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check ``plain_password`` against the stored ``hashed_password``.

    A falsy stored hash (empty string or ``None``) never matches, and the
    password context is not consulted in that case.
    """
    has_stored_hash = bool(hashed_password)
    return has_stored_hash and pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
    """Return the hash of ``password`` produced by the module's ``pwd_context``."""
    hashed = pwd_context.hash(password)
    return hashed
def decode_token(token: str) -> dict[str, Any]:
    """Decode ``token`` and return its claims.

    The token is decoded with ``settings.jwt_secret``, accepting only the
    algorithm named by ``settings.jwt_alg``.

    Raises:
        HTTPException: 401 with detail "Could not validate credentials" when
            the JWT library raises ``JWTError``. The underlying error is
            deliberately dropped from the exception chain.
    """
    accepted_algorithms = [settings.jwt_alg]
    try:
        claims = jwt.decode(token, settings.jwt_secret, algorithms=accepted_algorithms)
    except JWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
        ) from None
    else:
        return claims