oliver-metadata-tool/app/routers/auth.py
SamoilenkoVadym f8711c1ace Switch SSO to client-side MSAL.js (no client secret needed)
- Frontend MSAL.js handles Azure AD popup login
- Backend validates access token via Graph API
- Removed server-side MSAL redirect flow (get_auth_url, acquire_token)
- MicrosoftSSO class simplified: only needs Graph API validation
- No AZURE_CLIENT_SECRET required

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-02-09 21:42:10 +00:00

190 lines
6 KiB
Python

"""Authentication router: login, logout, Microsoft SSO."""
import secrets
import logging
from typing import Dict
from fastapi import APIRouter, Request, Depends, Form
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from ..config import get_settings, Settings
from ..dependencies import get_auth_service, get_current_user_optional
from ..security import limiter
from ..services.auth_service import AuthService
logger = logging.getLogger(__name__)
router = APIRouter(tags=["auth"])
# Templates are set from main.py after mounting
_templates: Jinja2Templates = None
def set_templates(templates: Jinja2Templates):
global _templates
_templates = templates
@router.get("/login", response_class=HTMLResponse)
async def login_page(
request: Request,
error: str = None,
info: str = None,
settings: Settings = Depends(get_settings),
auth: AuthService = Depends(get_auth_service),
):
"""Render login page."""
# If already logged in, redirect to index
user = await get_current_user_optional(request)
if user:
root = request.scope.get("root_path", "")
return RedirectResponse(url=f"{root}/", status_code=302)
return _templates.TemplateResponse(
"login.html",
{
"request": request,
"error": error,
"info": info,
"sso_enabled": auth.sso_enabled,
"azure_client_id": settings.AZURE_CLIENT_ID if auth.sso_enabled else "",
"azure_tenant_id": settings.AZURE_TENANT_ID if auth.sso_enabled else "",
"enable_test_user": settings.ENABLE_TEST_USER,
"app_version": settings.APP_VERSION,
},
)
@router.post("/login")
@limiter.limit("5/minute")
async def login_submit(
request: Request,
username: str = Form(...),
password: str = Form(...),
settings: Settings = Depends(get_settings),
auth: AuthService = Depends(get_auth_service),
):
"""Process login form. Rate limited to 5 attempts per minute."""
username = username.strip()
if not username or not password:
return _templates.TemplateResponse(
"login.html",
{
"request": request,
"error": "Please enter both username and password",
"sso_enabled": auth.sso_enabled,
"enable_test_user": settings.ENABLE_TEST_USER,
"app_version": settings.APP_VERSION,
},
)
result = auth.authenticate_user(username, password)
if not result["success"]:
return _templates.TemplateResponse(
"login.html",
{
"request": request,
"error": result.get("error"),
"sso_enabled": auth.sso_enabled,
"enable_test_user": settings.ENABLE_TEST_USER,
"app_version": settings.APP_VERSION,
},
)
user = result["user"]
session_id = auth.create_session(
user=user,
ip_address=request.client.host if request.client else None,
user_agent=request.headers.get("user-agent"),
)
if not session_id:
return _templates.TemplateResponse(
"login.html",
{
"request": request,
"error": "Failed to create session",
"sso_enabled": auth.sso_enabled,
"enable_test_user": settings.ENABLE_TEST_USER,
"app_version": settings.APP_VERSION,
},
)
# Set session data
request.session["user_id"] = user["id"]
request.session["username"] = user["username"]
request.session["session_id"] = session_id
root = request.scope.get("root_path", "")
next_url = request.query_params.get("next", "/")
# Prefix with root_path if next_url is a relative path
if next_url.startswith("/") and not next_url.startswith(root):
next_url = f"{root}{next_url}"
return RedirectResponse(url=next_url, status_code=302)
@router.get("/logout")
async def logout(
request: Request,
auth: AuthService = Depends(get_auth_service),
):
"""Logout and destroy session."""
user_id = request.session.get("user_id")
session_id = request.session.get("session_id")
if session_id:
auth.destroy_session(session_id, user_id)
request.session.clear()
root = request.scope.get("root_path", "")
return RedirectResponse(url=f"{root}/login", status_code=302)
@router.post("/auth/azure-token")
async def auth_azure_token(
request: Request,
auth: AuthService = Depends(get_auth_service),
):
"""Validate Azure AD access token from client-side MSAL.js.
Frontend handles the OAuth popup/redirect via MSAL.js,
then POSTs the access_token here for server-side validation.
"""
from ..dependencies import get_database
from fastapi.responses import JSONResponse
data = await request.json()
access_token = data.get("access_token", "")
if not access_token:
return JSONResponse({"error": "No access token provided"}, status_code=400)
# Validate token by calling Microsoft Graph API
user_info = auth.sso.get_user_info(access_token)
if not user_info:
return JSONResponse({"error": "Invalid or expired token"}, status_code=401)
# Create or update user from Azure AD info
db = get_database()
user = auth.sso.create_or_update_user(user_info, db)
if not user:
return JSONResponse({"error": "Failed to create user account"}, status_code=500)
# Create session
session_id = auth.create_session(
user=user,
ip_address=request.client.host if request.client else None,
user_agent=request.headers.get("user-agent"),
)
if not session_id:
return JSONResponse({"error": "Failed to create session"}, status_code=500)
# Set session cookies
request.session["user_id"] = user["id"]
request.session["username"] = user["username"]
request.session["session_id"] = session_id
root = request.scope.get("root_path", "")
return {"success": True, "redirect": f"{root}/"}