"""User management and role-based access control endpoints.""" from datetime import datetime from fastapi import APIRouter, Depends, HTTPException from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.database import get_db from app.models.user import AppUser, UserRole from app.middleware.auth import get_current_user router = APIRouter() async def get_or_create_user(db: AsyncSession, auth_user: dict) -> AppUser: """Get or create an AppUser from the auth user dict.""" email = (auth_user.get("email") or "").lower().strip() if not email: email = (auth_user.get("oid") or "unknown") + "@local" result = await db.execute(select(AppUser).where(AppUser.email == email)) app_user = result.scalar_one_or_none() if not app_user: # First user is admin, rest are editors count_result = await db.execute(select(AppUser)) is_first = len(count_result.scalars().all()) == 0 app_user = AppUser( email=email, name=auth_user.get("name"), role=UserRole.ADMIN if is_first else UserRole.EDITOR, azure_oid=auth_user.get("oid"), ) db.add(app_user) await db.flush() app_user.last_login = datetime.utcnow() if auth_user.get("name") and not app_user.name: app_user.name = auth_user["name"] await db.commit() return app_user async def require_role(role: UserRole, db: AsyncSession, auth_user: dict): """Check that the current user has at least the given role.""" app_user = await get_or_create_user(db, auth_user) role_order = {UserRole.VIEWER: 0, UserRole.EDITOR: 1, UserRole.ADMIN: 2} if role_order.get(app_user.role, 0) < role_order.get(role, 0): raise HTTPException(status_code=403, detail=f"{role.value} access required") return app_user @router.get("/me") async def get_current_user_info( db: AsyncSession = Depends(get_db), auth_user: dict = Depends(get_current_user), ): app_user = await get_or_create_user(db, auth_user) return { "id": app_user.id, "email": app_user.email, "name": app_user.name, "role": app_user.role.value, } @router.get("") async def list_users( db: AsyncSession = Depends(get_db), auth_user: dict = Depends(get_current_user), ): """List all users (admin only).""" await require_role(UserRole.ADMIN, db, auth_user) result = await db.execute(select(AppUser).order_by(AppUser.email)) users = result.scalars().all() return [ { "id": u.id, "email": u.email, "name": u.name, "role": u.role.value, "last_login": u.last_login.isoformat() if u.last_login else None, } for u in users ] @router.put("/{user_id}/role") async def update_user_role( user_id: int, data: dict, db: AsyncSession = Depends(get_db), auth_user: dict = Depends(get_current_user), ): """Change a user's role (admin only).""" await require_role(UserRole.ADMIN, db, auth_user) new_role = data.get("role") if new_role not in [r.value for r in UserRole]: raise HTTPException(status_code=400, detail=f"Invalid role: {new_role}") result = await db.execute(select(AppUser).where(AppUser.id == user_id)) target = result.scalar_one_or_none() if not target: raise HTTPException(status_code=404, detail="User not found") target.role = UserRole(new_role) await db.commit() return {"detail": f"User {target.email} role updated to {new_role}"}