Enhance authentication handling in FastAPI by adding basic authentication support and improving session token management. Introduced new utility functions for retrieving configured usernames and basic auth credentials, and updated middleware to utilize these enhancements for better session management.

This commit is contained in:
sudipnext 2026-04-23 17:00:17 +05:45
parent ad3b31a359
commit f8da1802fd
3 changed files with 56 additions and 2 deletions

View file

@ -3,7 +3,12 @@ from starlette.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
from utils.get_env import get_can_change_keys_env
from utils.simple_auth import get_auth_status, get_session_token_from_request
from utils.simple_auth import (
get_auth_status,
get_basic_auth_credentials_from_request,
get_session_token_from_request,
verify_credentials,
)
from utils.user_config import update_env_with_user_config
@ -55,6 +60,13 @@ class SessionAuthMiddleware(BaseHTTPMiddleware):
)
if not auth_status["authenticated"]:
basic_credentials = get_basic_auth_credentials_from_request(request)
if basic_credentials and verify_credentials(
basic_credentials[0], basic_credentials[1]
):
request.state.auth_username = basic_credentials[0].strip()
return await call_next(request)
return JSONResponse(
status_code=401,
content={"detail": "Unauthorized"},

View file

@ -2,12 +2,23 @@ import aiohttp
from fastapi import HTTPException
from templates.presentation_layout import PresentationLayoutModel
from utils.simple_auth import (
SESSION_COOKIE_NAME,
create_session_token,
get_configured_auth_username,
)
async def get_layout_by_name(layout_name: str) -> PresentationLayoutModel:
url = f"http://localhost/api/template?group={layout_name}"
headers = {}
auth_username = get_configured_auth_username()
if auth_username:
internal_token = create_session_token(auth_username)
headers["Cookie"] = f"{SESSION_COOKIE_NAME}={internal_token}"
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
async with session.get(url, headers=headers) as response:
if response.status != 200:
error_text = await response.text()
raise HTTPException(

View file

@ -98,6 +98,14 @@ def is_auth_configured() -> bool:
return bool(config.get("AUTH_USERNAME") and config.get("AUTH_PASSWORD_HASH"))
def get_configured_auth_username() -> Optional[str]:
config = _load_user_config()
username = config.get("AUTH_USERNAME")
if isinstance(username, str) and username.strip():
return username.strip()
return None
def setup_initial_credentials(username: str, password: str) -> None:
cleaned_username = (username or "").strip()
if len(cleaned_username) < 3:
@ -244,6 +252,29 @@ def get_session_token_from_request(request: Request) -> Optional[str]:
return None
def get_basic_auth_credentials_from_request(
request: Request,
) -> Optional[tuple[str, str]]:
auth_header = request.headers.get("Authorization", "")
if not auth_header.lower().startswith("basic "):
return None
encoded_value = auth_header[6:].strip()
if not encoded_value:
return None
try:
decoded_value = base64.b64decode(encoded_value).decode("utf-8")
except Exception:
return None
if ":" not in decoded_value:
return None
username, password = decoded_value.split(":", 1)
return username, password
def get_auth_status(session_token: Optional[str] = None) -> dict:
config = _load_user_config()
configured = bool(config.get("AUTH_USERNAME") and config.get("AUTH_PASSWORD_HASH"))