video-accessibility/backend/app/core/dependencies.py
Vadym Samoilenko 31199f8705 chore: push all session changes — backend hardening, tests, apache config, deploy scripts
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-30 15:52:14 +01:00

249 lines
8.3 KiB
Python

from fastapi import Depends, HTTPException, Request, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from motor.motor_asyncio import AsyncIOMotorDatabase
from ..models.user import User, UserRole
from .database import get_database
from .security import decode_token
# Bearer-token extractor; injected via Depends(security) in get_current_user.
security = HTTPBearer()

# Roles for which get_accessible_project_ids returns None (unrestricted).
# Only admins bypass tenant isolation; other staff are scoped by team membership
STAFF_ROLES = {UserRole.ADMIN}
async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: AsyncIOMotorDatabase = Depends(get_database),
) -> User:
    """Resolve the request's bearer access token to a ``User``.

    The token is decoded with ``decode_token``; refresh tokens are rejected,
    the ``sub`` claim is used as the ``_id`` of the user document, and the
    document is loaded from ``db.users``.

    Args:
        credentials: Bearer credentials extracted by the ``security`` scheme.
        db: Database handle providing the ``users`` collection.

    Returns:
        The ``User`` built from the stored document. If the token carries a
        non-empty ``org_ids`` claim it is attached as a transient attribute.

    Raises:
        HTTPException: 401 when the token is a refresh token, has no ``sub``
            claim, or references a user that does not exist. Whatever
            ``decode_token`` raises for a malformed/expired token propagates
            unchanged (its behavior is not visible here).
    """

    def _unauthorized(detail: str) -> HTTPException:
        # RFC 7235 requires a WWW-Authenticate challenge on every 401 response.
        return HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=detail,
            headers={"WWW-Authenticate": "Bearer"},
        )

    payload = decode_token(credentials.credentials)

    # Refresh tokens must never be accepted where an access token is expected.
    if payload.get("type") == "refresh":
        raise _unauthorized("Could not validate credentials")

    user_id: str | None = payload.get("sub")
    if user_id is None:
        raise _unauthorized("Could not validate credentials")

    user_doc = await db.users.find_one({"_id": user_id})
    if user_doc is None:
        raise _unauthorized("User not found")

    user = User(**user_doc)

    # Attach org_ids hint from token as transient attribute (never used for authz).
    # Written through __dict__ so it does not go through the model's own
    # attribute assignment.
    token_org_ids = payload.get("org_ids", [])
    if token_org_ids:
        user.__dict__["org_ids"] = token_org_ids
    return user
def require_role(required_role: UserRole):
    """Build a dependency that admits only ``required_role`` holders or admins.

    Args:
        required_role: The role a non-admin user must hold.

    Returns:
        An async dependency that yields the authenticated user, or raises
        ``HTTPException`` 403 when the user's role is neither
        ``required_role`` nor ``UserRole.ADMIN``.
    """

    async def role_checker(current_user: User = Depends(get_current_user)) -> User:
        actual_role = current_user.role
        is_allowed = actual_role == required_role or actual_role == UserRole.ADMIN
        if is_allowed:
            return current_user
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Insufficient permissions",
        )

    return role_checker
def require_roles(*required_roles: UserRole):
    """Build a dependency that admits any of ``required_roles``, or admins.

    Args:
        *required_roles: Roles that are allowed through.

    Returns:
        An async dependency that yields the authenticated user, or raises
        ``HTTPException`` 403 when the user's role is not listed and is not
        ``UserRole.ADMIN``.
    """

    async def roles_checker(current_user: User = Depends(get_current_user)) -> User:
        actual_role = current_user.role
        is_allowed = actual_role in required_roles or actual_role == UserRole.ADMIN
        if is_allowed:
            return current_user
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Insufficient permissions",
        )

    return roles_checker
async def get_current_user_optional(
    request: Request,
    db: AsyncIOMotorDatabase = Depends(get_database),
) -> User | None:
    """Best-effort variant of user resolution that never raises.

    Reads the ``Authorization`` header directly and returns the matching
    ``User`` when it holds a usable bearer access token. Every failure path
    (missing header, wrong scheme, malformed header, refresh token, missing
    ``sub``, unknown user, or any exception raised along the way) yields
    ``None``.
    """
    raw_header: str = request.headers.get("Authorization")
    if not raw_header:
        return None

    try:
        # Unpacking raises ValueError unless the header has exactly two parts;
        # that is swallowed below like any other failure.
        auth_scheme, bearer_token = raw_header.split()
        if auth_scheme.lower() != "bearer":
            return None

        claims = decode_token(bearer_token)
        is_refresh = claims.get("type") == "refresh"
        subject: str = None if is_refresh else claims.get("sub")
        if subject is None:
            return None

        record = await db.users.find_one({"_id": subject})
        return None if record is None else User(**record)
    except Exception:
        # Deliberately broad: an optional lookup must not fail the request.
        return None
async def _active_project_ids_for_clients(
    db: AsyncIOMotorDatabase,
    client_ids: list[str],
) -> list[str]:
    """Return stringified IDs of active projects whose client_id is in *client_ids*."""
    if not client_ids:
        # Nothing to match; skip the round-trip.
        return []
    projects = await db.projects.find(
        {"client_id": {"$in": client_ids}, "is_active": True},
        {"_id": 1},
    ).to_list(None)
    return [str(p["_id"]) for p in projects]


async def get_accessible_project_ids(
    user: User,
    db: AsyncIOMotorDatabase,
) -> list[str] | None:
    """
    Returns project IDs the user may access, or None meaning "see everything".

    Resolution order (first match wins):

    1. Role in ``STAFF_ROLES`` (admins) → None (unrestricted).
    2. Cached memberships → active projects of the orgs the user belongs to.
    3. Team membership (legacy) → active projects of the teams' clients.
       Teams without a usable ``client_id`` are ignored; if the user is in
       teams but none has one, the result is ``[]`` (fail closed).
    4. PROJECT_MANAGER (legacy) → active projects of ``user.pm_client_ids``,
       or ``[]`` when that list is empty.
    5. REVIEWER/LINGUIST/PRODUCTION with no team → None, so existing staff
       aren't locked out before teams are configured.
    6. Anyone else (e.g. CLIENT with no memberships and no teams) → ``[]``.
    """
    if user.role in STAFF_ROLES:
        return None

    user_id = str(user.id)

    # Primary path: memberships via the authz helper (the original note says
    # this is a Redis-backed cache with a 60s TTL shared with authz.py).
    from .authz import (
        _cached_memberships,  # local import to avoid circular dep at module level
    )

    memberships_map = await _cached_memberships(user_id, db)
    org_ids = list(memberships_map.keys())
    if org_ids:
        return await _active_project_ids_for_clients(db, org_ids)

    # Legacy fallback: team membership (used by REVIEWER/LINGUIST/PRODUCTION and legacy CLIENT)
    teams = await db.teams.find(
        {"member_user_ids": user_id},
        {"client_id": 1},
    ).to_list(None)
    if teams:
        # Skip teams whose client_id is missing or null: a missing key would
        # raise KeyError, and a null inside "$in" would match every project
        # that has no client_id at all, leaking across tenants.
        team_client_ids = list({t["client_id"] for t in teams if t.get("client_id")})
        return await _active_project_ids_for_clients(db, team_client_ids)

    # PM legacy: scoped via pm_client_ids
    if user.role == UserRole.PROJECT_MANAGER:
        return await _active_project_ids_for_clients(db, user.pm_client_ids or [])

    # Staff with no team assignments → unrestricted until teams are configured
    if user.role in {UserRole.REVIEWER, UserRole.LINGUIST, UserRole.PRODUCTION}:
        return None

    # CLIENT with no memberships and no teams → show nothing
    return []
async def get_user_org_ids(user: User, db: AsyncIOMotorDatabase) -> list[str] | None:
    """Return the org IDs a user belongs to; ``None`` means unrestricted (ADMIN).

    Sources are consulted in priority order and the first non-empty one wins:
    membership documents, then ``pm_client_ids`` for project managers
    (legacy), then the ``client_id`` of teams listing the user (legacy).
    Returns ``[]`` when none of them yields anything.
    """
    if user.role == UserRole.ADMIN:
        return None

    uid = str(user.id)

    # Primary: Membership collection (entries without an organization are skipped)
    membership_cursor = db.memberships.find({"user_id": uid}, {"organization_id": 1})
    member_orgs: list[str] = [
        str(doc["organization_id"])
        async for doc in membership_cursor
        if doc.get("organization_id")
    ]
    if member_orgs:
        return member_orgs

    # PM legacy: pm_client_ids
    if user.role == UserRole.PROJECT_MANAGER:
        return list(user.pm_client_ids or [])

    # Staff legacy: team.member_user_ids
    team_docs = await db.teams.find({"member_user_ids": uid}, {"client_id": 1}).to_list(None)
    if not team_docs:
        return []
    return [str(team["client_id"]) for team in team_docs if team.get("client_id")]
async def assert_job_in_user_org(job: dict, user: User, db: AsyncIOMotorDatabase) -> None:
    """Ensure *user* may access *job*; otherwise raise 404.

    A 404 (rather than 403) is used so that the existence of a job in another
    organization is not disclosed.

    Checks, in order:
      * ADMIN users, and users for whom ``get_user_org_ids`` returns ``None``,
        always pass.
      * If the job has an ``organization_id``, it must be one of the user's orgs.
      * Otherwise, if the job has a ``project_id``, that project's
        ``client_id`` must be one of the user's orgs.
      * Otherwise (legacy), the job's ``client_id`` must equal the user's id.

    Raises:
        HTTPException: 404 "Job not found" when no check passes.
    """
    if user.role == UserRole.ADMIN:
        return

    allowed_orgs = await get_user_org_ids(user, db)
    if allowed_orgs is None:
        return  # unrestricted

    not_found = HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Job not found")

    org_on_job = job.get("organization_id")
    if org_on_job:
        if org_on_job not in allowed_orgs:
            raise not_found
        return

    # No organization_id: fall back to the owning project's client
    parent_project_id = job.get("project_id")
    if parent_project_id:
        parent = await db.projects.find_one({"_id": parent_project_id}, {"client_id": 1})
        if not (parent and parent.get("client_id") in allowed_orgs):
            raise not_found
        return

    # Legacy: client_id == creator user_id
    legacy_owner = job.get("client_id")
    if not (legacy_owner and legacy_owner == str(user.id)):
        raise not_found
def require_pm_for_client(client_id_param: str = "client_id"):
    """Build a dependency restricting a route to admins or the client's PM.

    Args:
        client_id_param: Name of the path parameter that holds the client ID.

    Returns:
        An async dependency that yields the authenticated user. It raises
        ``HTTPException`` 403 when the user is neither an admin nor a project
        manager, or is a project manager whose ``pm_client_ids`` does not
        contain the client ID taken from the request path.
    """

    async def checker(
        request: Request,
        current_user: User = Depends(get_current_user),
    ) -> User:
        role = current_user.role
        if role == UserRole.ADMIN:
            return current_user
        if role != UserRole.PROJECT_MANAGER:
            raise HTTPException(status_code=403, detail="Insufficient permissions")

        managed_clients = current_user.pm_client_ids or []
        requested_client = request.path_params.get(client_id_param)
        if requested_client in managed_clients:
            return current_user
        raise HTTPException(status_code=403, detail="Not a manager for this client")

    return checker