gmal-scope-builder/backend/app/api/users.py
DJP c49f83a5a2 Role-based access control: Viewer / Editor / Admin
Backend:
- AppUser model with email, name, role (viewer/editor/admin), azure_oid
- Users API: GET /users/me (current user + role), GET /users (admin: list all),
  PUT /users/{id}/role (admin: change role)
- Auto-create user on first login: first user = admin, rest = editor
- get_or_create_user helper for role lookup
- require_role helper for permission checks

Frontend:
- UserRoleContext provides role to all components
- useUserRole() hook: isAdmin, isEditor, isViewer
- Nav items filtered by role: GMAL Editor + Users only for admin
- Dashboard: Ingest button admin-only, New Project editor-only
- User Management page: list all users, change roles via dropdown
- Role badges: admin (red), editor (gold), viewer (grey)

Roles:
- Viewer: view projects, download exports
- Editor: create/edit projects, upload, match, build ratecards
- Admin: all + GMAL Editor, data ingest, user management

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-13 12:29:04 -04:00

112 lines
3.5 KiB
Python

"""User management and role-based access control endpoints."""
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.models.user import AppUser, UserRole
from app.middleware.auth import get_current_user
# Router that collects the user-management endpoints defined in this module.
router = APIRouter()
async def get_or_create_user(db: AsyncSession, auth_user: dict) -> AppUser:
    """Return the AppUser matching ``auth_user``, creating it on first sight.

    The lookup key is the lower-cased, stripped ``email`` entry.  When that is
    missing or empty, ``"<oid>@local"`` is used instead (``"unknown@local"``
    if ``oid`` is absent too).  A newly created user gets the ADMIN role when
    no user rows exist yet, otherwise EDITOR.

    Every call stamps ``last_login``, back-fills an empty ``name`` from the
    auth payload, and commits the session.

    Args:
        db: Async SQLAlchemy session; it is committed before returning.
        auth_user: Auth payload; the ``email``, ``oid`` and ``name`` keys
            are read.

    Returns:
        The existing or freshly created AppUser.
    """
    email = (auth_user.get("email") or "").lower().strip()
    if not email:
        email = (auth_user.get("oid") or "unknown") + "@local"

    result = await db.execute(select(AppUser).where(AppUser.email == email))
    app_user = result.scalar_one_or_none()

    if app_user is None:
        # First user is admin, rest are editors.  Probe for a single row
        # instead of loading the whole table just to test for emptiness.
        probe = await db.execute(select(AppUser.id).limit(1))
        is_first = probe.first() is None
        app_user = AppUser(
            email=email,
            name=auth_user.get("name"),
            role=UserRole.ADMIN if is_first else UserRole.EDITOR,
            azure_oid=auth_user.get("oid"),
        )
        db.add(app_user)
        # Flush so the INSERT is emitted before the updates below.
        await db.flush()

    app_user.last_login = datetime.utcnow()
    # Only fill in a name when none is stored; never overwrite an existing one.
    if auth_user.get("name") and not app_user.name:
        app_user.name = auth_user["name"]
    await db.commit()
    return app_user
async def require_role(role: UserRole, db: AsyncSession, auth_user: dict):
    """Resolve the caller's AppUser and enforce a minimum role.

    Roles are ranked VIEWER < EDITOR < ADMIN.  The caller's AppUser is
    obtained (and created if needed) via ``get_or_create_user``.

    Args:
        role: Minimum role the caller must hold.
        db: Async SQLAlchemy session passed through to the user lookup.
        auth_user: Auth payload identifying the caller.

    Returns:
        The caller's AppUser when their rank is at least that of ``role``.

    Raises:
        HTTPException: 403 when the caller's rank is below the required one.
    """
    caller = await get_or_create_user(db, auth_user)

    ranks = {UserRole.VIEWER: 0, UserRole.EDITOR: 1, UserRole.ADMIN: 2}
    # Roles missing from the table are treated as rank 0 on both sides.
    held_rank = ranks.get(caller.role, 0)
    needed_rank = ranks.get(role, 0)

    if held_rank >= needed_rank:
        return caller
    raise HTTPException(status_code=403, detail=f"{role.value} access required")
@router.get("/me")
async def get_current_user_info(
    db: AsyncSession = Depends(get_db),
    auth_user: dict = Depends(get_current_user),
):
    """Return the calling user's id, email, name and role value.

    The AppUser is looked up via ``get_or_create_user``, so a caller without
    a record gets one created by this request.
    """
    me = await get_or_create_user(db, auth_user)
    return dict(
        id=me.id,
        email=me.email,
        name=me.name,
        role=me.role.value,
    )
@router.get("")
async def list_users(
    db: AsyncSession = Depends(get_db),
    auth_user: dict = Depends(get_current_user),
):
    """Return every user ordered by email (admin only).

    Each entry carries ``id``, ``email``, ``name``, ``role`` (the enum value)
    and ``last_login`` as an ISO-8601 string, or ``None`` when unset.

    Raises:
        HTTPException: 403 (from ``require_role``) when the caller is not
            an admin.
    """
    await require_role(UserRole.ADMIN, db, auth_user)

    query = select(AppUser).order_by(AppUser.email)
    rows = (await db.execute(query)).scalars().all()

    listing = []
    for user in rows:
        entry = {
            "id": user.id,
            "email": user.email,
            "name": user.name,
            "role": user.role.value,
        }
        if user.last_login:
            entry["last_login"] = user.last_login.isoformat()
        else:
            entry["last_login"] = None
        listing.append(entry)
    return listing
@router.put("/{user_id}/role")
async def update_user_role(
    user_id: int,
    data: dict,
    db: AsyncSession = Depends(get_db),
    auth_user: dict = Depends(get_current_user),
):
    """Set the role of the user with id ``user_id`` (admin only).

    Args:
        user_id: Primary key of the user to modify.
        data: Request body; its ``"role"`` entry must equal the value of one
            of the ``UserRole`` members.

    Returns:
        A dict with a ``detail`` message confirming the change.

    Raises:
        HTTPException: 403 (from ``require_role``) when the caller is not an
            admin, 400 when the requested role is not a valid value, 404 when
            no user has the given id.
    """
    await require_role(UserRole.ADMIN, db, auth_user)

    requested = data.get("role")
    valid_values = [member.value for member in UserRole]
    if requested not in valid_values:
        raise HTTPException(status_code=400, detail=f"Invalid role: {requested}")

    lookup = await db.execute(select(AppUser).where(AppUser.id == user_id))
    target = lookup.scalar_one_or_none()
    if not target:
        raise HTTPException(status_code=404, detail="User not found")

    # NOTE(review): nothing here stops an admin from demoting themselves or
    # the last remaining admin - confirm whether that is intended.
    target.role = UserRole(requested)
    await db.commit()
    message = f"User {target.email} role updated to {requested}"
    return {"detail": message}