presenton/servers/fastapi/api/v1/auth/router.py

83 lines
2.5 KiB
Python

from fastapi import APIRouter, HTTPException, Request
from pydantic import BaseModel, Field
from starlette.responses import JSONResponse
from utils.simple_auth import (
clear_session_cookie,
create_session_token,
get_auth_status,
get_session_token_from_request,
is_auth_configured,
set_session_cookie,
setup_initial_credentials,
verify_credentials,
)
API_V1_AUTH_ROUTER = APIRouter(prefix="/api/v1/auth", tags=["Auth"])
class AuthCredentialsRequest(BaseModel):
username: str = Field(min_length=3, max_length=128)
password: str = Field(min_length=6, max_length=256)
@API_V1_AUTH_ROUTER.get("/status")
async def get_status(request: Request):
token = get_session_token_from_request(request)
return get_auth_status(token)
@API_V1_AUTH_ROUTER.get("/verify")
async def verify_session(request: Request):
auth_status = get_auth_status(get_session_token_from_request(request))
if not auth_status["configured"] or not auth_status["authenticated"]:
raise HTTPException(status_code=401, detail="Unauthorized")
return {
"authenticated": True,
"username": auth_status.get("username"),
}
@API_V1_AUTH_ROUTER.post("/setup")
async def setup_credentials(body: AuthCredentialsRequest, request: Request):
if is_auth_configured():
raise HTTPException(status_code=409, detail="Credentials already configured")
try:
setup_initial_credentials(body.username, body.password)
except ValueError as exc:
raise HTTPException(status_code=400, detail=str(exc)) from exc
username = body.username.strip()
return JSONResponse(
{
"configured": True,
"authenticated": False,
"username": username,
}
)
@API_V1_AUTH_ROUTER.post("/login")
async def login(body: AuthCredentialsRequest, request: Request):
if not is_auth_configured():
raise HTTPException(status_code=428, detail="Login setup is required")
if not verify_credentials(body.username, body.password):
raise HTTPException(status_code=401, detail="Unauthorized")
username = body.username.strip()
token = create_session_token(username)
response = JSONResponse(
{"configured": True, "authenticated": True, "username": username}
)
set_session_cookie(response, token, request)
return response
@API_V1_AUTH_ROUTER.post("/logout")
async def logout(request: Request):
response = JSONResponse({"success": True})
clear_session_cookie(response, request)
return response