- Add tov_text (nullable Text) column to system_prompts table - Migration 0002: ALTER TABLE ADD COLUMN tov_text - AdminPage: checkboxes replaced with textarea — write TOV rules like a prompt - copy_generation: use tov_text verbatim instead of building from boolean flags - Legacy boolean flags kept in DB for backward compat, no longer in UI/schemas Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
259 lines
8.1 KiB
Python
259 lines
8.1 KiB
Python
"""Admin panel API — System Prompt, TOV configuration, and User management.
|
|
|
|
Only users with role='admin' can access these endpoints.
|
|
All previous prompt versions are preserved for FCA audit trail.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
|
|
import bcrypt
|
|
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from pydantic import BaseModel
|
|
from sqlalchemy import select, update, func
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.api.auth import get_current_user
|
|
from app.database import get_db
|
|
from app.models.system_prompt import SystemPrompt
|
|
from app.models.user import User
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
def require_admin(current_user: User = Depends(get_current_user)) -> User:
|
|
if current_user.role != "admin":
|
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required")
|
|
return current_user
|
|
|
|
|
|
class SystemPromptCreate(BaseModel):
|
|
label: str
|
|
system_text: str
|
|
tov_text: str | None = None
|
|
tov_banned_phrases: str = "say hello to,give you peace of mind,it's a win-win,win-win,game-changer"
|
|
short_title_max: int = 32
|
|
long_body_max: int = 128
|
|
cta_max: int = 50
|
|
|
|
|
|
class SystemPromptOut(BaseModel):
|
|
id: uuid.UUID
|
|
version: int
|
|
label: str
|
|
system_text: str
|
|
tov_text: str | None = None
|
|
tov_banned_phrases: str
|
|
short_title_max: int
|
|
long_body_max: int
|
|
cta_max: int
|
|
is_active: bool
|
|
created_at: datetime
|
|
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
|
|
@router.get("/system-prompts", response_model=list[SystemPromptOut])
|
|
async def list_system_prompts(
|
|
db: AsyncSession = Depends(get_db),
|
|
admin: User = Depends(require_admin),
|
|
):
|
|
result = await db.execute(select(SystemPrompt).order_by(SystemPrompt.version.desc()))
|
|
return result.scalars().all()
|
|
|
|
|
|
@router.get("/system-prompts/active", response_model=SystemPromptOut)
|
|
async def get_active_prompt(
|
|
db: AsyncSession = Depends(get_db),
|
|
admin: User = Depends(require_admin),
|
|
):
|
|
result = await db.execute(select(SystemPrompt).where(SystemPrompt.is_active == True)) # noqa: E712
|
|
prompt = result.scalar_one_or_none()
|
|
if not prompt:
|
|
raise HTTPException(status_code=404, detail="No active system prompt")
|
|
return prompt
|
|
|
|
|
|
@router.post("/system-prompts", response_model=SystemPromptOut, status_code=201)
|
|
async def create_system_prompt(
|
|
payload: SystemPromptCreate,
|
|
db: AsyncSession = Depends(get_db),
|
|
admin: User = Depends(require_admin),
|
|
):
|
|
# Determine next version number
|
|
result = await db.execute(select(SystemPrompt).order_by(SystemPrompt.version.desc()).limit(1))
|
|
latest = result.scalar_one_or_none()
|
|
next_version = (latest.version + 1) if latest else 1
|
|
|
|
prompt = SystemPrompt(
|
|
version=next_version,
|
|
created_by_id=admin.id,
|
|
created_at=datetime.now(timezone.utc),
|
|
**payload.model_dump(),
|
|
)
|
|
db.add(prompt)
|
|
await db.commit()
|
|
await db.refresh(prompt)
|
|
return prompt
|
|
|
|
|
|
@router.post("/system-prompts/{prompt_id}/activate", response_model=SystemPromptOut)
|
|
async def activate_system_prompt(
|
|
prompt_id: uuid.UUID,
|
|
db: AsyncSession = Depends(get_db),
|
|
admin: User = Depends(require_admin),
|
|
):
|
|
# Deactivate all
|
|
await db.execute(update(SystemPrompt).values(is_active=False))
|
|
|
|
result = await db.execute(select(SystemPrompt).where(SystemPrompt.id == prompt_id))
|
|
prompt = result.scalar_one_or_none()
|
|
if not prompt:
|
|
raise HTTPException(status_code=404, detail="System prompt not found")
|
|
|
|
prompt.is_active = True
|
|
await db.commit()
|
|
await db.refresh(prompt)
|
|
return prompt
|
|
|
|
|
|
@router.put("/system-prompts/{prompt_id}", response_model=SystemPromptOut)
|
|
async def update_system_prompt(
|
|
prompt_id: uuid.UUID,
|
|
payload: SystemPromptCreate,
|
|
db: AsyncSession = Depends(get_db),
|
|
admin: User = Depends(require_admin),
|
|
):
|
|
"""Update creates a new version (immutable audit trail) and deactivates old."""
|
|
result = await db.execute(select(SystemPrompt).where(SystemPrompt.id == prompt_id))
|
|
old = result.scalar_one_or_none()
|
|
if not old:
|
|
raise HTTPException(status_code=404, detail="System prompt not found")
|
|
|
|
was_active = old.is_active
|
|
old.is_active = False
|
|
|
|
new_version = old.version + 1
|
|
new_prompt = SystemPrompt(
|
|
version=new_version,
|
|
created_by_id=admin.id,
|
|
created_at=datetime.now(timezone.utc),
|
|
is_active=was_active,
|
|
**payload.model_dump(),
|
|
)
|
|
db.add(new_prompt)
|
|
await db.commit()
|
|
await db.refresh(new_prompt)
|
|
return new_prompt
|
|
|
|
|
|
# ── User management ───────────────────────────────────────────────────────────
|
|
|
|
class UserOut(BaseModel):
|
|
id: uuid.UUID
|
|
email: str
|
|
role: str
|
|
created_at: datetime
|
|
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
|
|
class UserCreate(BaseModel):
|
|
email: str
|
|
password: str
|
|
role: str = "user"
|
|
|
|
|
|
class UserUpdate(BaseModel):
|
|
email: str | None = None
|
|
role: str | None = None
|
|
password: str | None = None
|
|
|
|
|
|
@router.get("/users", response_model=list[UserOut])
|
|
async def list_users(
|
|
db: AsyncSession = Depends(get_db),
|
|
admin: User = Depends(require_admin),
|
|
):
|
|
result = await db.execute(select(User).order_by(User.created_at))
|
|
return result.scalars().all()
|
|
|
|
|
|
@router.post("/users", response_model=UserOut, status_code=201)
|
|
async def create_user(
|
|
payload: UserCreate,
|
|
db: AsyncSession = Depends(get_db),
|
|
admin: User = Depends(require_admin),
|
|
):
|
|
existing = await db.execute(select(User).where(func.lower(User.email) == payload.email.lower()))
|
|
if existing.scalar_one_or_none():
|
|
raise HTTPException(status_code=409, detail="Email already registered")
|
|
|
|
if payload.role not in ("user", "admin"):
|
|
raise HTTPException(status_code=422, detail="Role must be 'user' or 'admin'")
|
|
|
|
hashed = bcrypt.hashpw(payload.password.encode(), bcrypt.gensalt()).decode()
|
|
user = User(email=payload.email.lower(), hashed_password=hashed, role=payload.role)
|
|
db.add(user)
|
|
await db.commit()
|
|
await db.refresh(user)
|
|
return user
|
|
|
|
|
|
@router.patch("/users/{user_id}", response_model=UserOut)
|
|
async def update_user(
|
|
user_id: uuid.UUID,
|
|
payload: UserUpdate,
|
|
db: AsyncSession = Depends(get_db),
|
|
admin: User = Depends(require_admin),
|
|
):
|
|
result = await db.execute(select(User).where(User.id == user_id))
|
|
user = result.scalar_one_or_none()
|
|
if not user:
|
|
raise HTTPException(status_code=404, detail="User not found")
|
|
|
|
if payload.role is not None:
|
|
if payload.role not in ("user", "admin"):
|
|
raise HTTPException(status_code=422, detail="Role must be 'user' or 'admin'")
|
|
# Prevent removing last admin
|
|
if user.role == "admin" and payload.role != "admin":
|
|
count = await db.execute(select(func.count()).select_from(User).where(User.role == "admin"))
|
|
if count.scalar() <= 1:
|
|
raise HTTPException(status_code=409, detail="Cannot demote the only admin")
|
|
user.role = payload.role
|
|
|
|
if payload.email is not None:
|
|
user.email = payload.email.lower()
|
|
|
|
if payload.password is not None:
|
|
user.hashed_password = bcrypt.hashpw(payload.password.encode(), bcrypt.gensalt()).decode()
|
|
|
|
await db.commit()
|
|
await db.refresh(user)
|
|
return user
|
|
|
|
|
|
@router.delete("/users/{user_id}", status_code=204)
|
|
async def delete_user(
|
|
user_id: uuid.UUID,
|
|
db: AsyncSession = Depends(get_db),
|
|
admin: User = Depends(require_admin),
|
|
):
|
|
result = await db.execute(select(User).where(User.id == user_id))
|
|
user = result.scalar_one_or_none()
|
|
if not user:
|
|
raise HTTPException(status_code=404, detail="User not found")
|
|
|
|
if user.id == admin.id:
|
|
raise HTTPException(status_code=409, detail="Cannot delete your own account")
|
|
|
|
if user.role == "admin":
|
|
count = await db.execute(select(func.count()).select_from(User).where(User.role == "admin"))
|
|
if count.scalar() <= 1:
|
|
raise HTTPException(status_code=409, detail="Cannot delete the only admin")
|
|
|
|
await db.delete(user)
|
|
await db.commit()
|