Backend:
- AppUser model with email, name, role (viewer/editor/admin), azure_oid
- Users API: GET /users/me (current user + role), GET /users (admin: list all),
PUT /users/{id}/role (admin: change role)
- Auto-create user on first login: first user = admin, rest = editor
- get_or_create_user helper for role lookup
- require_role helper for permission checks
Frontend:
- UserRoleContext provides role to all components
- useUserRole() hook: isAdmin, isEditor, isViewer
- Nav items filtered by role: GMAL Editor + Users only for admin
- Dashboard: Ingest button admin-only, New Project editor-only
- User Management page: list all users, change roles via dropdown
- Role badges: admin (red), editor (gold), viewer (grey)
Roles:
- Viewer: view projects, download exports
- Editor: create/edit projects, upload, match, build ratecards
- Admin: all + GMAL Editor, data ingest, user management
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
112 lines
3.5 KiB
Python
112 lines
3.5 KiB
Python
"""User management and role-based access control endpoints."""
|
|
|
|
from datetime import datetime
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.database import get_db
|
|
from app.models.user import AppUser, UserRole
|
|
from app.middleware.auth import get_current_user
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
async def get_or_create_user(db: AsyncSession, auth_user: dict) -> AppUser:
|
|
"""Get or create an AppUser from the auth user dict."""
|
|
email = (auth_user.get("email") or "").lower().strip()
|
|
if not email:
|
|
email = (auth_user.get("oid") or "unknown") + "@local"
|
|
|
|
result = await db.execute(select(AppUser).where(AppUser.email == email))
|
|
app_user = result.scalar_one_or_none()
|
|
|
|
if not app_user:
|
|
# First user is admin, rest are editors
|
|
count_result = await db.execute(select(AppUser))
|
|
is_first = len(count_result.scalars().all()) == 0
|
|
|
|
app_user = AppUser(
|
|
email=email,
|
|
name=auth_user.get("name"),
|
|
role=UserRole.ADMIN if is_first else UserRole.EDITOR,
|
|
azure_oid=auth_user.get("oid"),
|
|
)
|
|
db.add(app_user)
|
|
await db.flush()
|
|
|
|
app_user.last_login = datetime.utcnow()
|
|
if auth_user.get("name") and not app_user.name:
|
|
app_user.name = auth_user["name"]
|
|
await db.commit()
|
|
return app_user
|
|
|
|
|
|
async def require_role(role: UserRole, db: AsyncSession, auth_user: dict):
|
|
"""Check that the current user has at least the given role."""
|
|
app_user = await get_or_create_user(db, auth_user)
|
|
role_order = {UserRole.VIEWER: 0, UserRole.EDITOR: 1, UserRole.ADMIN: 2}
|
|
if role_order.get(app_user.role, 0) < role_order.get(role, 0):
|
|
raise HTTPException(status_code=403, detail=f"{role.value} access required")
|
|
return app_user
|
|
|
|
|
|
@router.get("/me")
|
|
async def get_current_user_info(
|
|
db: AsyncSession = Depends(get_db),
|
|
auth_user: dict = Depends(get_current_user),
|
|
):
|
|
app_user = await get_or_create_user(db, auth_user)
|
|
return {
|
|
"id": app_user.id,
|
|
"email": app_user.email,
|
|
"name": app_user.name,
|
|
"role": app_user.role.value,
|
|
}
|
|
|
|
|
|
@router.get("")
|
|
async def list_users(
|
|
db: AsyncSession = Depends(get_db),
|
|
auth_user: dict = Depends(get_current_user),
|
|
):
|
|
"""List all users (admin only)."""
|
|
await require_role(UserRole.ADMIN, db, auth_user)
|
|
|
|
result = await db.execute(select(AppUser).order_by(AppUser.email))
|
|
users = result.scalars().all()
|
|
return [
|
|
{
|
|
"id": u.id,
|
|
"email": u.email,
|
|
"name": u.name,
|
|
"role": u.role.value,
|
|
"last_login": u.last_login.isoformat() if u.last_login else None,
|
|
}
|
|
for u in users
|
|
]
|
|
|
|
|
|
@router.put("/{user_id}/role")
|
|
async def update_user_role(
|
|
user_id: int,
|
|
data: dict,
|
|
db: AsyncSession = Depends(get_db),
|
|
auth_user: dict = Depends(get_current_user),
|
|
):
|
|
"""Change a user's role (admin only)."""
|
|
await require_role(UserRole.ADMIN, db, auth_user)
|
|
|
|
new_role = data.get("role")
|
|
if new_role not in [r.value for r in UserRole]:
|
|
raise HTTPException(status_code=400, detail=f"Invalid role: {new_role}")
|
|
|
|
result = await db.execute(select(AppUser).where(AppUser.id == user_id))
|
|
target = result.scalar_one_or_none()
|
|
if not target:
|
|
raise HTTPException(status_code=404, detail="User not found")
|
|
|
|
target.role = UserRole(new_role)
|
|
await db.commit()
|
|
return {"detail": f"User {target.email} role updated to {new_role}"}
|