contract-query/backend/app/core/auth.py
2025-08-14 15:03:33 -05:00

81 lines
No EOL
2.7 KiB
Python

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from typing import Optional
from motor.motor_asyncio import AsyncIOMotorDatabase
from bson import ObjectId
from .security import verify_token
from ..config.database import get_database
from ..models.user import UserInDB, UserRole
# Shared HTTP Bearer scheme instance; injected via Depends() in
# get_current_user to obtain the request's authorization credentials.
security = HTTPBearer()
async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: AsyncIOMotorDatabase = Depends(get_database),
) -> UserInDB:
    """Resolve the request's bearer token to the matching stored user.

    The token is passed to ``verify_token``; the ``sub`` entry of the
    returned payload is interpreted as the user's ``_id`` and looked up in
    the ``users`` collection.

    Args:
        credentials: Bearer credentials supplied by the ``security`` scheme.
        db: Database handle supplied by ``get_database``.

    Returns:
        The user document wrapped in ``UserInDB``.

    Raises:
        HTTPException: 401 when the payload has no ``sub``, when ``sub`` is
            not a valid ObjectId, or when no user has that ``_id``.
    """
    # Local import: bson is already a dependency of this module; importing
    # here keeps the new name in scope without touching the import block.
    from bson.errors import InvalidId

    credentials_error = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # NOTE(review): verify_token is assumed to return a dict-like payload
    # (or raise on a bad token) -- confirm against .security.
    payload = verify_token(credentials.credentials)
    user_id = payload.get("sub")
    if user_id is None:
        raise credentials_error

    # A malformed or non-string subject would otherwise surface as an
    # unhandled InvalidId/TypeError (HTTP 500); treat it as bad credentials.
    try:
        object_id = ObjectId(user_id)
    except (InvalidId, TypeError):
        raise credentials_error from None

    user = await db.users.find_one({"_id": object_id})
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User not found",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return UserInDB(**user)
async def get_current_active_user(
    current_user: UserInDB = Depends(get_current_user),
) -> UserInDB:
    """Return the authenticated user, rejecting accounts that are not active.

    Args:
        current_user: User resolved by ``get_current_user``.

    Returns:
        The same user when ``is_active`` is truthy.

    Raises:
        HTTPException: 400 when ``current_user.is_active`` is falsy.
    """
    if current_user.is_active:
        return current_user
    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail="Inactive user",
    )
async def get_current_admin_user(
    current_user: UserInDB = Depends(get_current_active_user),
) -> UserInDB:
    """Return the active user only when their role is ``UserRole.ADMIN``.

    Args:
        current_user: User resolved by ``get_current_active_user``.

    Returns:
        The same user when they hold the admin role.

    Raises:
        HTTPException: 403 when the user's role is not admin.
    """
    if current_user.role == UserRole.ADMIN:
        return current_user
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Not enough permissions",
    )
async def has_index_access(
    index_id: str,
    current_user: UserInDB = Depends(get_current_active_user),
) -> bool:
    """Report whether ``current_user`` may use the index ``index_id``.

    Args:
        index_id: Identifier of the index being checked.
        current_user: User resolved by ``get_current_active_user``.

    Returns:
        ``True`` for admins; otherwise whether ``index_id`` is contained in
        the user's ``index_access``.
    """
    is_admin = current_user.role == UserRole.ADMIN
    # Admins bypass the per-index list entirely; everyone else needs an
    # explicit entry.
    return True if is_admin else index_id in current_user.index_access
def require_index_access(index_id: str):
    """Build a dependency that admits only users with access to ``index_id``.

    Args:
        index_id: Identifier of the index the returned dependency guards.

    Returns:
        An async callable that returns the current user when
        ``has_index_access`` grants access and raises a 403
        ``HTTPException`` otherwise.
    """

    async def check_access(
        current_user: UserInDB = Depends(get_current_active_user),
    ):
        # index_id is captured from the enclosing call, so each returned
        # dependency is bound to one index.
        allowed = await has_index_access(index_id, current_user)
        if allowed:
            return current_user
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Access denied to this index",
        )

    return check_access