- Add `await db.refresh(user)` after `db.flush()` in create_user and update_user so server-generated `updated_at` is available before model_validate (async SQLAlchemy cannot lazy-load expired attributes)
- Add DialogDescription to satisfy Radix UI aria requirement
- Wrap form fields in <form> to resolve browser password-not-in-form warning

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
132 lines
4.2 KiB
Python
132 lines
4.2 KiB
Python
from uuid import UUID
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Query, status
|
|
from sqlalchemy import func, select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.auth.service import AuthService
|
|
from app.dependencies import get_db, require_role
|
|
from app.models.user import User, UserClient, UserStatus
|
|
from app.schemas.common import PaginatedResponse
|
|
from app.schemas.user import UserCreate, UserResponse, UserUpdate
|
|
|
|
# Router for the user-management endpoints; every route is served under /users.
router = APIRouter(tags=["users"], prefix="/users")

# Module-level AuthService instance; the handlers use it to hash passwords.
auth_service = AuthService()
|
|
|
|
|
|
@router.post("", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(
    body: UserCreate,
    db: AsyncSession = Depends(get_db),
    current_user: dict = Depends(require_role(["admin"])),
) -> UserResponse:
    """Create a user and link it to the requested clients (admin only).

    Args:
        body: Payload providing email, name, password, role and client_ids.
        db: Async database session for this request.
        current_user: Result of the admin role check; not read in the body.

    Returns:
        The newly created user, validated into ``UserResponse``.

    Raises:
        HTTPException: 400 if a user with the same email already exists.
    """
    # Reject the request up front when the email is already taken.
    email_lookup = await db.execute(select(User).where(User.email == body.email))
    if email_lookup.scalar_one_or_none() is not None:
        raise HTTPException(status_code=400, detail="Email already registered")

    hashed_password = auth_service.hash_password(body.password)
    new_user = User(
        email=body.email,
        name=body.name,
        password_hash=hashed_password,
        role=body.role,
        status=UserStatus.active,
    )
    db.add(new_user)
    # Flush before building the association rows, which read new_user.id.
    await db.flush()

    # One UserClient association row per requested client.
    db.add_all(
        [UserClient(user_id=new_user.id, client_id=cid) for cid in body.client_ids]
    )
    await db.flush()

    # Reload the row so every column is populated before validation; an async
    # session cannot lazy-load expired attributes inside model_validate.
    await db.refresh(new_user)
    return UserResponse.model_validate(new_user)
|
|
|
|
|
|
@router.get("", response_model=PaginatedResponse[UserResponse])
async def list_users(
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    db: AsyncSession = Depends(get_db),
    current_user: dict = Depends(require_role(["admin"])),
) -> PaginatedResponse[UserResponse]:
    """List users, newest first, one page at a time (admin only).

    Args:
        page: 1-based page number.
        page_size: Number of users per page (1-100).
        db: Async database session for this request.
        current_user: Result of the admin role check; not read in the body.

    Returns:
        A ``PaginatedResponse`` holding the requested page of users, the
        total user count and the total number of pages (at least 1).
    """
    count_result = await db.execute(select(func.count(User.id)))
    total = count_result.scalar() or 0

    page_query = (
        select(User)
        # created_at alone is not a total order: rows sharing a timestamp
        # could be duplicated or skipped across pages. User.id breaks ties so
        # OFFSET/LIMIT pagination is deterministic.
        .order_by(User.created_at.desc(), User.id.desc())
        .offset((page - 1) * page_size)
        .limit(page_size)
    )
    result = await db.execute(page_query)
    users = [UserResponse.model_validate(row) for row in result.scalars().all()]

    # Ceiling division; an empty table is still reported as a single page.
    pages = max(1, (total + page_size - 1) // page_size)
    return PaginatedResponse(
        items=users, total=total, page=page, page_size=page_size, pages=pages
    )
|
|
|
|
|
|
@router.get("/{user_id}", response_model=UserResponse)
async def get_user(
    user_id: UUID,
    db: AsyncSession = Depends(get_db),
    current_user: dict = Depends(require_role(["admin"])),
) -> UserResponse:
    """Fetch a single user by primary key (admin only).

    Args:
        user_id: Identifier of the user to fetch.
        db: Async database session for this request.
        current_user: Result of the admin role check; not read in the body.

    Returns:
        The matching user, validated into ``UserResponse``.

    Raises:
        HTTPException: 404 if no user has the given ID.
    """
    lookup = await db.execute(select(User).where(User.id == user_id))
    found = lookup.scalar_one_or_none()
    if found is not None:
        return UserResponse.model_validate(found)
    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
|
|
|
|
|
|
@router.put("/{user_id}", response_model=UserResponse)
async def update_user(
    user_id: UUID,
    body: UserUpdate,
    db: AsyncSession = Depends(get_db),
    current_user: dict = Depends(require_role(["admin"])),
) -> UserResponse:
    """Apply a partial update to a user (admin only).

    Only fields explicitly present in the request body are written. A
    ``password`` value is never stored directly: it is hashed and saved as
    ``password_hash``.

    Args:
        user_id: Identifier of the user to update.
        body: Fields to change; unset fields are left untouched.
        db: Async database session for this request.
        current_user: Result of the admin role check; not read in the body.

    Returns:
        The updated user, validated into ``UserResponse``.

    Raises:
        HTTPException: 404 if no user has the given ID.
    """
    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if user is None:
        raise HTTPException(status_code=404, detail="User not found")

    update_data = body.model_dump(exclude_unset=True)
    if "password" in update_data:
        # Always drop the plaintext key so it is never set on the model.
        new_password = update_data.pop("password")
        # exclude_unset keeps an explicitly sent null; skip hashing in that
        # case instead of handing None to the hasher.
        if new_password is not None:
            update_data["password_hash"] = auth_service.hash_password(new_password)

    for field, value in update_data.items():
        setattr(user, field, value)

    await db.flush()
    # Reload the row so every column is populated before validation; an async
    # session cannot lazy-load expired attributes inside model_validate.
    await db.refresh(user)
    return UserResponse.model_validate(user)
|
|
|
|
|
|
@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(
    user_id: UUID,
    db: AsyncSession = Depends(get_db),
    current_user: dict = Depends(require_role(["admin"])),
) -> None:
    """Soft-delete a user by marking it inactive (admin only).

    The row is kept; only its ``status`` is changed to ``UserStatus.inactive``.

    Args:
        user_id: Identifier of the user to deactivate.
        db: Async database session for this request.
        current_user: Result of the admin role check; not read in the body.

    Raises:
        HTTPException: 404 if no user has the given ID.
    """
    lookup = await db.execute(select(User).where(User.id == user_id))
    target = lookup.scalar_one_or_none()
    if target is None:
        raise HTTPException(status_code=404, detail="User not found")

    # Deactivate instead of deleting the row, then push the change to the DB.
    target.status = UserStatus.inactive
    await db.flush()
|