- Replace server-side ConfidentialClientApplication + OAuth callback with MSAL browser popup flow (PKCE, no client_secret required) - Backend: add POST /sso-token endpoint that validates Azure AD ID token via Microsoft JWKS, issues session cookie; remove /login + /callback - Frontend: install @azure/msal-browser + @azure/msal-react, wrap app with MsalProvider, login page uses loginPopup() → sends id_token to backend - Pass NEXT_PUBLIC_AZURE_* env vars through next.config.mjs Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
154 lines
5.4 KiB
Python
154 lines
5.4 KiB
Python
import os
|
|
import uuid
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Optional
|
|
|
|
import httpx
|
|
from jose import jwt, JWTError
|
|
from sqlmodel import select
|
|
|
|
from models.sql.user import UserModel
|
|
from models.sql.team import TeamModel
|
|
from models.sql.team_membership import TeamMembershipModel
|
|
|
|
|
|
class AuthService:
    """Authentication helper: Azure AD ID-token validation, session JWTs, dev login.

    Configuration is read from environment variables when the service is
    constructed:

    * ``AZURE_AD_TENANT_ID`` / ``AZURE_AD_CLIENT_ID`` -- Azure AD settings. When
      the tenant id is unset the service is in "dev mode".
    * ``JWT_SECRET_KEY`` -- HS256 secret used to sign session tokens.
    * ``DEV_AUTH_PASSWORD`` -- shared password that enables :meth:`dev_login`.
    """

    # Minimum time between JWKS re-fetches that are triggered by an unknown
    # ``kid``. Without a floor, tokens carrying random key ids would make the
    # service hit the JWKS endpoint on every request.
    _JWKS_MIN_REFRESH_INTERVAL = timedelta(minutes=5)

    def __init__(self):
        """Load settings from the environment.

        Raises:
            ValueError: if Azure AD is not configured (dev mode) and
                ``DEV_AUTH_PASSWORD`` is not set either, i.e. there would be
                no way to log in.
        """
        self.tenant_id = os.getenv("AZURE_AD_TENANT_ID")
        self.client_id = os.getenv("AZURE_AD_CLIENT_ID")
        # NOTE(review): the fallback secret is a fixed, publicly visible string
        # and is used whenever JWT_SECRET_KEY is unset -- including when Azure
        # AD is configured. Confirm every real deployment sets JWT_SECRET_KEY.
        self.jwt_secret = os.getenv("JWT_SECRET_KEY", "dev-secret-change-me")
        self.jwt_algorithm = "HS256"
        self.jwt_expiry_hours = 24
        self.dev_password = os.getenv("DEV_AUTH_PASSWORD")

        # Require explicit dev password if using dev mode
        if self.is_dev_mode and not self.dev_password:
            raise ValueError(
                "DEV_AUTH_PASSWORD must be set in .env file when using dev mode. "
                "Set AZURE_AD_TENANT_ID to enable Azure AD SSO instead."
            )

        # Cached JWKS document and the time it was fetched (see
        # validate_azure_token for the refresh policy).
        self._jwks_cache: Optional[dict] = None
        self._jwks_fetched_at: Optional[datetime] = None

    @property
    def is_dev_mode(self) -> bool:
        """True when no Azure AD tenant id is configured."""
        return not self.tenant_id

    @property
    def dev_login_enabled(self) -> bool:
        """Dev login is available whenever DEV_AUTH_PASSWORD is set, regardless of Azure AD config."""
        return bool(self.dev_password)

    def _jwks_needs_refresh(self, token_kid: Optional[str]) -> bool:
        """Decide whether the JWKS must be (re)fetched before verifying a token.

        A fetch is needed when nothing is cached yet, or when the token names a
        ``kid`` that is not in the cached key set (signing keys were rotated)
        and the last fetch is older than ``_JWKS_MIN_REFRESH_INTERVAL``.
        """
        if not self._jwks_cache:
            return True
        known_kids = {key.get("kid") for key in self._jwks_cache.get("keys", [])}
        if not token_kid or token_kid in known_kids:
            return False
        if self._jwks_fetched_at is None:
            return True
        age = datetime.now(timezone.utc) - self._jwks_fetched_at
        return age >= self._JWKS_MIN_REFRESH_INTERVAL

    async def _fetch_jwks(self) -> dict:
        """Download the tenant's JWKS document from login.microsoftonline.com."""
        jwks_uri = f"https://login.microsoftonline.com/{self.tenant_id}/discovery/v2.0/keys"
        async with httpx.AsyncClient(timeout=10) as client:
            resp = await client.get(jwks_uri)
            resp.raise_for_status()
            return resp.json()

    async def validate_azure_token(self, id_token: str) -> dict:
        """Validate Azure AD ID token from MSAL SPA flow using Microsoft's public JWKS.

        The signature (RS256) and the audience (``client_id``) are verified.
        The JWKS is cached on the instance and re-fetched when the token refers
        to a key id that is not in the cache, so key rotation does not require
        a process restart.

        Args:
            id_token: the encoded ID token sent by the client.

        Returns:
            The token's claims.

        Raises:
            ValueError: if Azure AD is not configured (dev mode).
            jose.JWTError: if the token is malformed or fails verification.
            httpx.HTTPError: if the JWKS document cannot be fetched.
        """
        if self.is_dev_mode:
            raise ValueError("Azure AD not configured")

        # Only used to pick the signing key; nothing in the header is trusted.
        # A malformed token raises JWTError here, before any network call.
        token_kid = jwt.get_unverified_header(id_token).get("kid")

        # Fetch JWKS (Microsoft public keys)
        if self._jwks_needs_refresh(token_kid):
            self._jwks_cache = await self._fetch_jwks()
            self._jwks_fetched_at = datetime.now(timezone.utc)

        # NOTE(review): issuer / tenant ("iss", "tid") are not checked here;
        # only signature and audience are. Confirm that is sufficient for how
        # the app registration is configured (single- vs multi-tenant).
        return jwt.decode(
            id_token,
            self._jwks_cache,
            algorithms=["RS256"],
            audience=self.client_id,
        )

    def create_session_jwt(self, user: "UserModel") -> str:
        """Issue a signed session token for ``user``.

        The payload carries the user's id (``sub``), email and role, plus
        ``iat`` and an ``exp`` that is ``jwt_expiry_hours`` after ``iat``.
        """
        # One timestamp so that exp is exactly iat + lifetime.
        issued_at = datetime.now(timezone.utc)
        payload = {
            "sub": str(user.id),
            "email": user.email,
            "role": user.role,
            "exp": issued_at + timedelta(hours=self.jwt_expiry_hours),
            "iat": issued_at,
        }
        return jwt.encode(payload, self.jwt_secret, algorithm=self.jwt_algorithm)

    def validate_token(self, token: str) -> Optional[dict]:
        """Decode a session token; return its payload, or None if it is invalid."""
        try:
            return jwt.decode(token, self.jwt_secret, algorithms=[self.jwt_algorithm])
        except JWTError:
            return None

    async def _record_login(self, user: "UserModel", display_name, session) -> "UserModel":
        """Stamp ``last_login_at``, refresh the display name, and commit."""
        user.last_login_at = datetime.now(timezone.utc)
        user.display_name = display_name
        session.add(user)
        await session.commit()
        return user

    async def get_or_create_user(self, claims: dict, session) -> "UserModel":
        """Find user by azure_oid, then by email, or create a new one.

        A newly created user is added to the default team
        (``TeamModel.is_default``) when one exists.

        Args:
            claims: identity claims; reads ``oid``, ``preferred_username`` /
                ``email`` and ``name``.
            session: async database session used for lookups and commits.

        Returns:
            The existing (updated) or newly created user.

        Raises:
            ValueError: if no user matches ``oid`` and the claims carry no
                email. Without this check the email lookup would compare
                against NULL and could match an unrelated account.
        """
        azure_oid = claims.get("oid")
        email = claims.get("preferred_username") or claims.get("email")
        # Fall back to the email when "name" is missing *or* empty/null.
        display_name = claims.get("name") or email

        # Try find by azure_oid
        if azure_oid:
            result = await session.execute(
                select(UserModel).where(UserModel.azure_oid == azure_oid)
            )
            user = result.scalar_one_or_none()
            if user:
                return await self._record_login(user, display_name, session)

        if not email:
            raise ValueError("Token claims contain no email or preferred_username")

        # Try find by email
        result = await session.execute(
            select(UserModel).where(UserModel.email == email)
        )
        user = result.scalar_one_or_none()
        if user:
            # Link the Azure identity on first SSO login of a pre-existing user.
            if azure_oid and not user.azure_oid:
                user.azure_oid = azure_oid
            return await self._record_login(user, display_name, session)

        # Create new user
        now = datetime.now(timezone.utc)
        user = UserModel(
            azure_oid=azure_oid,
            email=email,
            display_name=display_name,
            role="user",
            is_active=True,
            last_login_at=now,
            created_at=now,
            updated_at=now,
        )
        session.add(user)
        # Flush so user.id is populated before the membership row references it.
        await session.flush()

        # Auto-add to the default team
        default_result = await session.execute(
            select(TeamModel).where(TeamModel.is_default == True)  # noqa: E712
        )
        default_team = default_result.scalar_one_or_none()
        if default_team:
            membership = TeamMembershipModel(
                user_id=user.id,
                team_id=default_team.id,
                assigned_at=datetime.now(timezone.utc),
            )
            session.add(membership)

        await session.commit()
        return user

    async def dev_login(self, email: str, password: str, session) -> Optional["UserModel"]:
        """Dev login: validate password, get or create user. Works even when Azure AD is configured.

        Returns:
            The user on success; None when dev login is disabled, the password
            does not match, or ``email`` is empty.
        """
        import hmac

        if not self.dev_login_enabled:
            return None
        if not isinstance(password, str):
            return None
        # Constant-time comparison; bytes are used because compare_digest
        # rejects non-ASCII str arguments.
        if not hmac.compare_digest(
            password.encode("utf-8"), self.dev_password.encode("utf-8")
        ):
            return None
        if not email:
            return None

        claims = {
            "email": email,
            # "jane.doe@example.com" -> "Jane Doe"
            "name": email.split("@")[0].replace(".", " ").title(),
        }
        return await self.get_or_create_user(claims, session)
|