Barclays-banner-builder/backend/app/api/admin.py
Vadym Samoilenko e53892013f Replace TOV checkboxes with free-text field (tov_text)
- Add tov_text (nullable Text) column to system_prompts table
- Migration 0002: ALTER TABLE ADD COLUMN tov_text
- AdminPage: checkboxes replaced with textarea — write TOV rules like a prompt
- copy_generation: use tov_text verbatim instead of building from boolean flags
- Legacy boolean flags kept in DB for backward compat, no longer in UI/schemas

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-17 14:05:51 +01:00

259 lines
8.1 KiB
Python

"""Admin panel API — System Prompt, TOV configuration, and User management.
Only users with role='admin' can access these endpoints.
All previous prompt versions are preserved for FCA audit trail.
"""
from __future__ import annotations
import uuid
from datetime import datetime, timezone
import bcrypt
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel
from sqlalchemy import select, update, func
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.auth import get_current_user
from app.database import get_db
from app.models.system_prompt import SystemPrompt
from app.models.user import User
router = APIRouter()
def require_admin(current_user: User = Depends(get_current_user)) -> User:
if current_user.role != "admin":
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required")
return current_user
class SystemPromptCreate(BaseModel):
label: str
system_text: str
tov_text: str | None = None
tov_banned_phrases: str = "say hello to,give you peace of mind,it's a win-win,win-win,game-changer"
short_title_max: int = 32
long_body_max: int = 128
cta_max: int = 50
class SystemPromptOut(BaseModel):
id: uuid.UUID
version: int
label: str
system_text: str
tov_text: str | None = None
tov_banned_phrases: str
short_title_max: int
long_body_max: int
cta_max: int
is_active: bool
created_at: datetime
class Config:
from_attributes = True
@router.get("/system-prompts", response_model=list[SystemPromptOut])
async def list_system_prompts(
db: AsyncSession = Depends(get_db),
admin: User = Depends(require_admin),
):
result = await db.execute(select(SystemPrompt).order_by(SystemPrompt.version.desc()))
return result.scalars().all()
@router.get("/system-prompts/active", response_model=SystemPromptOut)
async def get_active_prompt(
db: AsyncSession = Depends(get_db),
admin: User = Depends(require_admin),
):
result = await db.execute(select(SystemPrompt).where(SystemPrompt.is_active == True)) # noqa: E712
prompt = result.scalar_one_or_none()
if not prompt:
raise HTTPException(status_code=404, detail="No active system prompt")
return prompt
@router.post("/system-prompts", response_model=SystemPromptOut, status_code=201)
async def create_system_prompt(
payload: SystemPromptCreate,
db: AsyncSession = Depends(get_db),
admin: User = Depends(require_admin),
):
# Determine next version number
result = await db.execute(select(SystemPrompt).order_by(SystemPrompt.version.desc()).limit(1))
latest = result.scalar_one_or_none()
next_version = (latest.version + 1) if latest else 1
prompt = SystemPrompt(
version=next_version,
created_by_id=admin.id,
created_at=datetime.now(timezone.utc),
**payload.model_dump(),
)
db.add(prompt)
await db.commit()
await db.refresh(prompt)
return prompt
@router.post("/system-prompts/{prompt_id}/activate", response_model=SystemPromptOut)
async def activate_system_prompt(
prompt_id: uuid.UUID,
db: AsyncSession = Depends(get_db),
admin: User = Depends(require_admin),
):
# Deactivate all
await db.execute(update(SystemPrompt).values(is_active=False))
result = await db.execute(select(SystemPrompt).where(SystemPrompt.id == prompt_id))
prompt = result.scalar_one_or_none()
if not prompt:
raise HTTPException(status_code=404, detail="System prompt not found")
prompt.is_active = True
await db.commit()
await db.refresh(prompt)
return prompt
@router.put("/system-prompts/{prompt_id}", response_model=SystemPromptOut)
async def update_system_prompt(
prompt_id: uuid.UUID,
payload: SystemPromptCreate,
db: AsyncSession = Depends(get_db),
admin: User = Depends(require_admin),
):
"""Update creates a new version (immutable audit trail) and deactivates old."""
result = await db.execute(select(SystemPrompt).where(SystemPrompt.id == prompt_id))
old = result.scalar_one_or_none()
if not old:
raise HTTPException(status_code=404, detail="System prompt not found")
was_active = old.is_active
old.is_active = False
new_version = old.version + 1
new_prompt = SystemPrompt(
version=new_version,
created_by_id=admin.id,
created_at=datetime.now(timezone.utc),
is_active=was_active,
**payload.model_dump(),
)
db.add(new_prompt)
await db.commit()
await db.refresh(new_prompt)
return new_prompt
# ── User management ───────────────────────────────────────────────────────────
class UserOut(BaseModel):
id: uuid.UUID
email: str
role: str
created_at: datetime
class Config:
from_attributes = True
class UserCreate(BaseModel):
email: str
password: str
role: str = "user"
class UserUpdate(BaseModel):
email: str | None = None
role: str | None = None
password: str | None = None
@router.get("/users", response_model=list[UserOut])
async def list_users(
db: AsyncSession = Depends(get_db),
admin: User = Depends(require_admin),
):
result = await db.execute(select(User).order_by(User.created_at))
return result.scalars().all()
@router.post("/users", response_model=UserOut, status_code=201)
async def create_user(
payload: UserCreate,
db: AsyncSession = Depends(get_db),
admin: User = Depends(require_admin),
):
existing = await db.execute(select(User).where(func.lower(User.email) == payload.email.lower()))
if existing.scalar_one_or_none():
raise HTTPException(status_code=409, detail="Email already registered")
if payload.role not in ("user", "admin"):
raise HTTPException(status_code=422, detail="Role must be 'user' or 'admin'")
hashed = bcrypt.hashpw(payload.password.encode(), bcrypt.gensalt()).decode()
user = User(email=payload.email.lower(), hashed_password=hashed, role=payload.role)
db.add(user)
await db.commit()
await db.refresh(user)
return user
@router.patch("/users/{user_id}", response_model=UserOut)
async def update_user(
user_id: uuid.UUID,
payload: UserUpdate,
db: AsyncSession = Depends(get_db),
admin: User = Depends(require_admin),
):
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=404, detail="User not found")
if payload.role is not None:
if payload.role not in ("user", "admin"):
raise HTTPException(status_code=422, detail="Role must be 'user' or 'admin'")
# Prevent removing last admin
if user.role == "admin" and payload.role != "admin":
count = await db.execute(select(func.count()).select_from(User).where(User.role == "admin"))
if count.scalar() <= 1:
raise HTTPException(status_code=409, detail="Cannot demote the only admin")
user.role = payload.role
if payload.email is not None:
user.email = payload.email.lower()
if payload.password is not None:
user.hashed_password = bcrypt.hashpw(payload.password.encode(), bcrypt.gensalt()).decode()
await db.commit()
await db.refresh(user)
return user
@router.delete("/users/{user_id}", status_code=204)
async def delete_user(
user_id: uuid.UUID,
db: AsyncSession = Depends(get_db),
admin: User = Depends(require_admin),
):
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=404, detail="User not found")
if user.id == admin.id:
raise HTTPException(status_code=409, detail="Cannot delete your own account")
if user.role == "admin":
count = await db.execute(select(func.count()).select_from(User).where(User.role == "admin"))
if count.scalar() <= 1:
raise HTTPException(status_code=409, detail="Cannot delete the only admin")
await db.delete(user)
await db.commit()