- Frontend MSAL.js handles Azure AD popup login - Backend validates access token via Graph API - Removed server-side MSAL redirect flow (get_auth_url, acquire_token) - MicrosoftSSO class simplified: only needs Graph API validation - No AZURE_CLIENT_SECRET required Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
190 lines
6 KiB
Python
190 lines
6 KiB
Python
"""Authentication router: login, logout, Microsoft SSO."""
|
|
|
|
import secrets
|
|
import logging
|
|
from typing import Dict
|
|
from fastapi import APIRouter, Request, Depends, Form
|
|
from fastapi.responses import HTMLResponse, RedirectResponse
|
|
from fastapi.templating import Jinja2Templates
|
|
|
|
from ..config import get_settings, Settings
|
|
from ..dependencies import get_auth_service, get_current_user_optional
|
|
from ..security import limiter
|
|
from ..services.auth_service import AuthService
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
router = APIRouter(tags=["auth"])
|
|
|
|
# Templates are set from main.py after mounting
|
|
_templates: Jinja2Templates = None
|
|
|
|
|
|
def set_templates(templates: Jinja2Templates):
|
|
global _templates
|
|
_templates = templates
|
|
|
|
|
|
@router.get("/login", response_class=HTMLResponse)
|
|
async def login_page(
|
|
request: Request,
|
|
error: str = None,
|
|
info: str = None,
|
|
settings: Settings = Depends(get_settings),
|
|
auth: AuthService = Depends(get_auth_service),
|
|
):
|
|
"""Render login page."""
|
|
# If already logged in, redirect to index
|
|
user = await get_current_user_optional(request)
|
|
if user:
|
|
root = request.scope.get("root_path", "")
|
|
return RedirectResponse(url=f"{root}/", status_code=302)
|
|
|
|
return _templates.TemplateResponse(
|
|
"login.html",
|
|
{
|
|
"request": request,
|
|
"error": error,
|
|
"info": info,
|
|
"sso_enabled": auth.sso_enabled,
|
|
"azure_client_id": settings.AZURE_CLIENT_ID if auth.sso_enabled else "",
|
|
"azure_tenant_id": settings.AZURE_TENANT_ID if auth.sso_enabled else "",
|
|
"enable_test_user": settings.ENABLE_TEST_USER,
|
|
"app_version": settings.APP_VERSION,
|
|
},
|
|
)
|
|
|
|
|
|
@router.post("/login")
|
|
@limiter.limit("5/minute")
|
|
async def login_submit(
|
|
request: Request,
|
|
username: str = Form(...),
|
|
password: str = Form(...),
|
|
settings: Settings = Depends(get_settings),
|
|
auth: AuthService = Depends(get_auth_service),
|
|
):
|
|
"""Process login form. Rate limited to 5 attempts per minute."""
|
|
username = username.strip()
|
|
if not username or not password:
|
|
return _templates.TemplateResponse(
|
|
"login.html",
|
|
{
|
|
"request": request,
|
|
"error": "Please enter both username and password",
|
|
"sso_enabled": auth.sso_enabled,
|
|
"enable_test_user": settings.ENABLE_TEST_USER,
|
|
"app_version": settings.APP_VERSION,
|
|
},
|
|
)
|
|
|
|
result = auth.authenticate_user(username, password)
|
|
|
|
if not result["success"]:
|
|
return _templates.TemplateResponse(
|
|
"login.html",
|
|
{
|
|
"request": request,
|
|
"error": result.get("error"),
|
|
"sso_enabled": auth.sso_enabled,
|
|
"enable_test_user": settings.ENABLE_TEST_USER,
|
|
"app_version": settings.APP_VERSION,
|
|
},
|
|
)
|
|
|
|
user = result["user"]
|
|
session_id = auth.create_session(
|
|
user=user,
|
|
ip_address=request.client.host if request.client else None,
|
|
user_agent=request.headers.get("user-agent"),
|
|
)
|
|
|
|
if not session_id:
|
|
return _templates.TemplateResponse(
|
|
"login.html",
|
|
{
|
|
"request": request,
|
|
"error": "Failed to create session",
|
|
"sso_enabled": auth.sso_enabled,
|
|
"enable_test_user": settings.ENABLE_TEST_USER,
|
|
"app_version": settings.APP_VERSION,
|
|
},
|
|
)
|
|
|
|
# Set session data
|
|
request.session["user_id"] = user["id"]
|
|
request.session["username"] = user["username"]
|
|
request.session["session_id"] = session_id
|
|
|
|
root = request.scope.get("root_path", "")
|
|
next_url = request.query_params.get("next", "/")
|
|
# Prefix with root_path if next_url is a relative path
|
|
if next_url.startswith("/") and not next_url.startswith(root):
|
|
next_url = f"{root}{next_url}"
|
|
return RedirectResponse(url=next_url, status_code=302)
|
|
|
|
|
|
@router.get("/logout")
|
|
async def logout(
|
|
request: Request,
|
|
auth: AuthService = Depends(get_auth_service),
|
|
):
|
|
"""Logout and destroy session."""
|
|
user_id = request.session.get("user_id")
|
|
session_id = request.session.get("session_id")
|
|
|
|
if session_id:
|
|
auth.destroy_session(session_id, user_id)
|
|
|
|
request.session.clear()
|
|
root = request.scope.get("root_path", "")
|
|
return RedirectResponse(url=f"{root}/login", status_code=302)
|
|
|
|
|
|
@router.post("/auth/azure-token")
|
|
async def auth_azure_token(
|
|
request: Request,
|
|
auth: AuthService = Depends(get_auth_service),
|
|
):
|
|
"""Validate Azure AD access token from client-side MSAL.js.
|
|
|
|
Frontend handles the OAuth popup/redirect via MSAL.js,
|
|
then POSTs the access_token here for server-side validation.
|
|
"""
|
|
from ..dependencies import get_database
|
|
from fastapi.responses import JSONResponse
|
|
|
|
data = await request.json()
|
|
access_token = data.get("access_token", "")
|
|
|
|
if not access_token:
|
|
return JSONResponse({"error": "No access token provided"}, status_code=400)
|
|
|
|
# Validate token by calling Microsoft Graph API
|
|
user_info = auth.sso.get_user_info(access_token)
|
|
if not user_info:
|
|
return JSONResponse({"error": "Invalid or expired token"}, status_code=401)
|
|
|
|
# Create or update user from Azure AD info
|
|
db = get_database()
|
|
user = auth.sso.create_or_update_user(user_info, db)
|
|
if not user:
|
|
return JSONResponse({"error": "Failed to create user account"}, status_code=500)
|
|
|
|
# Create session
|
|
session_id = auth.create_session(
|
|
user=user,
|
|
ip_address=request.client.host if request.client else None,
|
|
user_agent=request.headers.get("user-agent"),
|
|
)
|
|
|
|
if not session_id:
|
|
return JSONResponse({"error": "Failed to create session"}, status_code=500)
|
|
|
|
# Set session cookies
|
|
request.session["user_id"] = user["id"]
|
|
request.session["username"] = user["username"]
|
|
request.session["session_id"] = session_id
|
|
|
|
root = request.scope.get("root_path", "")
|
|
return {"success": True, "redirect": f"{root}/"}
|