"""Admin panel API — System Prompt, TOV configuration, and User management. Only users with role='admin' can access these endpoints. All previous prompt versions are preserved for FCA audit trail. """ from __future__ import annotations import uuid from datetime import datetime, timezone import bcrypt from fastapi import APIRouter, Depends, HTTPException, status from pydantic import BaseModel from sqlalchemy import select, update, func from sqlalchemy.ext.asyncio import AsyncSession from app.api.auth import get_current_user from app.database import get_db from app.models.system_prompt import SystemPrompt from app.models.user import User router = APIRouter() def require_admin(current_user: User = Depends(get_current_user)) -> User: if current_user.role != "admin": raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required") return current_user class SystemPromptCreate(BaseModel): label: str system_text: str tov_text: str | None = None tov_banned_phrases: str = "say hello to,give you peace of mind,it's a win-win,win-win,game-changer" short_title_max: int = 32 long_body_max: int = 128 cta_max: int = 50 class SystemPromptOut(BaseModel): id: uuid.UUID version: int label: str system_text: str tov_text: str | None = None tov_banned_phrases: str short_title_max: int long_body_max: int cta_max: int is_active: bool created_at: datetime class Config: from_attributes = True @router.get("/system-prompts", response_model=list[SystemPromptOut]) async def list_system_prompts( db: AsyncSession = Depends(get_db), admin: User = Depends(require_admin), ): result = await db.execute(select(SystemPrompt).order_by(SystemPrompt.version.desc())) return result.scalars().all() @router.get("/system-prompts/active", response_model=SystemPromptOut) async def get_active_prompt( db: AsyncSession = Depends(get_db), admin: User = Depends(require_admin), ): result = await db.execute(select(SystemPrompt).where(SystemPrompt.is_active == True)) # noqa: E712 prompt = result.scalar_one_or_none() if not prompt: raise HTTPException(status_code=404, detail="No active system prompt") return prompt @router.post("/system-prompts", response_model=SystemPromptOut, status_code=201) async def create_system_prompt( payload: SystemPromptCreate, db: AsyncSession = Depends(get_db), admin: User = Depends(require_admin), ): # Determine next version number result = await db.execute(select(SystemPrompt).order_by(SystemPrompt.version.desc()).limit(1)) latest = result.scalar_one_or_none() next_version = (latest.version + 1) if latest else 1 prompt = SystemPrompt( version=next_version, created_by_id=admin.id, created_at=datetime.now(timezone.utc), **payload.model_dump(), ) db.add(prompt) await db.commit() await db.refresh(prompt) return prompt @router.post("/system-prompts/{prompt_id}/activate", response_model=SystemPromptOut) async def activate_system_prompt( prompt_id: uuid.UUID, db: AsyncSession = Depends(get_db), admin: User = Depends(require_admin), ): # Deactivate all await db.execute(update(SystemPrompt).values(is_active=False)) result = await db.execute(select(SystemPrompt).where(SystemPrompt.id == prompt_id)) prompt = result.scalar_one_or_none() if not prompt: raise HTTPException(status_code=404, detail="System prompt not found") prompt.is_active = True await db.commit() await db.refresh(prompt) return prompt @router.put("/system-prompts/{prompt_id}", response_model=SystemPromptOut) async def update_system_prompt( prompt_id: uuid.UUID, payload: SystemPromptCreate, db: AsyncSession = Depends(get_db), admin: User = Depends(require_admin), ): """Update creates a new version (immutable audit trail) and deactivates old.""" result = await db.execute(select(SystemPrompt).where(SystemPrompt.id == prompt_id)) old = result.scalar_one_or_none() if not old: raise HTTPException(status_code=404, detail="System prompt not found") was_active = old.is_active old.is_active = False new_version = old.version + 1 new_prompt = SystemPrompt( version=new_version, created_by_id=admin.id, created_at=datetime.now(timezone.utc), is_active=was_active, **payload.model_dump(), ) db.add(new_prompt) await db.commit() await db.refresh(new_prompt) return new_prompt # ── User management ─────────────────────────────────────────────────────────── class UserOut(BaseModel): id: uuid.UUID email: str role: str created_at: datetime class Config: from_attributes = True class UserCreate(BaseModel): email: str password: str role: str = "user" class UserUpdate(BaseModel): email: str | None = None role: str | None = None password: str | None = None @router.get("/users", response_model=list[UserOut]) async def list_users( db: AsyncSession = Depends(get_db), admin: User = Depends(require_admin), ): result = await db.execute(select(User).order_by(User.created_at)) return result.scalars().all() @router.post("/users", response_model=UserOut, status_code=201) async def create_user( payload: UserCreate, db: AsyncSession = Depends(get_db), admin: User = Depends(require_admin), ): existing = await db.execute(select(User).where(func.lower(User.email) == payload.email.lower())) if existing.scalar_one_or_none(): raise HTTPException(status_code=409, detail="Email already registered") if payload.role not in ("user", "admin"): raise HTTPException(status_code=422, detail="Role must be 'user' or 'admin'") hashed = bcrypt.hashpw(payload.password.encode(), bcrypt.gensalt()).decode() user = User(email=payload.email.lower(), hashed_password=hashed, role=payload.role) db.add(user) await db.commit() await db.refresh(user) return user @router.patch("/users/{user_id}", response_model=UserOut) async def update_user( user_id: uuid.UUID, payload: UserUpdate, db: AsyncSession = Depends(get_db), admin: User = Depends(require_admin), ): result = await db.execute(select(User).where(User.id == user_id)) user = result.scalar_one_or_none() if not user: raise HTTPException(status_code=404, detail="User not found") if payload.role is not None: if payload.role not in ("user", "admin"): raise HTTPException(status_code=422, detail="Role must be 'user' or 'admin'") # Prevent removing last admin if user.role == "admin" and payload.role != "admin": count = await db.execute(select(func.count()).select_from(User).where(User.role == "admin")) if count.scalar() <= 1: raise HTTPException(status_code=409, detail="Cannot demote the only admin") user.role = payload.role if payload.email is not None: user.email = payload.email.lower() if payload.password is not None: user.hashed_password = bcrypt.hashpw(payload.password.encode(), bcrypt.gensalt()).decode() await db.commit() await db.refresh(user) return user @router.delete("/users/{user_id}", status_code=204) async def delete_user( user_id: uuid.UUID, db: AsyncSession = Depends(get_db), admin: User = Depends(require_admin), ): result = await db.execute(select(User).where(User.id == user_id)) user = result.scalar_one_or_none() if not user: raise HTTPException(status_code=404, detail="User not found") if user.id == admin.id: raise HTTPException(status_code=409, detail="Cannot delete your own account") if user.role == "admin": count = await db.execute(select(func.count()).select_from(User).where(User.role == "admin")) if count.scalar() <= 1: raise HTTPException(status_code=409, detail="Cannot delete the only admin") await db.delete(user) await db.commit()