video-accessibility/backend/app/core/security.py
Vadym Samoilenko 4623b89aeb feat(mt-16): JWT org_ids claim + transient user.org_ids in deps
- create_access_token gains optional org_ids: list[str] param; encodes
  {exp, sub, org_ids, v:2} — org_ids is a prefilter hint only, never
  used as authorization source of truth (Redis cache is authoritative)
- Login, MS login, refresh endpoints: fetch memberships and include
  org_ids in issued access tokens via _get_user_org_ids() helper
- routes_invitations.py accept flow: same org_ids population on token
- get_current_user: reads org_ids from payload, attaches as transient
  user.__dict__["org_ids"] — available to OrgScopedQuery for prefilter
- Force logout: rotate JWT_SECRET env var at deployment time (no code
  change needed; all existing tokens immediately invalidated)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-29 20:46:39 +01:00

59 lines
1.8 KiB
Python

from datetime import datetime, timedelta
from typing import Any, Optional, Union
from fastapi import HTTPException, status
from jose import JWTError, jwt
from passlib.context import CryptContext
from .config import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def create_access_token(
subject: Union[str, Any],
expires_delta: Optional[timedelta] = None,
org_ids: list[str] | None = None,
) -> str:
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=settings.jwt_access_ttl_min)
to_encode: dict[str, Any] = {"exp": expire, "sub": str(subject), "v": 2}
if org_ids:
to_encode["org_ids"] = org_ids
encoded_jwt = jwt.encode(to_encode, settings.jwt_secret, algorithm=settings.jwt_alg)
return encoded_jwt
def create_refresh_token(
subject: Union[str, Any], expires_delta: Optional[timedelta] = None
) -> str:
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(days=settings.jwt_refresh_ttl_days)
to_encode = {"exp": expire, "sub": str(subject), "type": "refresh"}
encoded_jwt = jwt.encode(to_encode, settings.jwt_secret, algorithm=settings.jwt_alg)
return encoded_jwt
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
def decode_token(token: str) -> dict[str, Any]:
try:
payload = jwt.decode(token, settings.jwt_secret, algorithms=[settings.jwt_alg])
return payload
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
)