ppt-tool/backend/services/auth_service.py
Vadym Samoilenko f2f729a50b Switch Azure AD auth to MSAL SPA (browser-side token exchange)
- Replace server-side ConfidentialClientApplication + OAuth callback
  with MSAL browser popup flow (PKCE, no client_secret required)
- Backend: add POST /sso-token endpoint that validates Azure AD ID token
  via Microsoft JWKS, issues session cookie; remove /login + /callback
- Frontend: install @azure/msal-browser + @azure/msal-react, wrap app
  with MsalProvider, login page uses loginPopup() → sends id_token to backend
- Pass NEXT_PUBLIC_AZURE_* env vars through next.config.mjs

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-23 12:34:52 +00:00

154 lines
5.4 KiB
Python

import hmac
import logging
import os
import uuid
from datetime import datetime, timedelta, timezone
from typing import Optional

import httpx
from jose import jwt, JWTError
from sqlmodel import select

from models.sql.user import UserModel
from models.sql.team import TeamModel
from models.sql.team_membership import TeamMembershipModel
class AuthService:
    """Authentication helpers: Azure AD ID-token validation, session JWTs, dev login.

    Configuration is read from environment variables at construction time:

    * ``AZURE_AD_TENANT_ID`` / ``AZURE_AD_CLIENT_ID`` -- Azure AD SSO settings.
      When the tenant id is unset the service runs in "dev mode".
    * ``JWT_SECRET_KEY`` -- HMAC secret used to sign session JWTs.
    * ``DEV_AUTH_PASSWORD`` -- shared password that enables ``dev_login``.
    """

    # Fallback signing secret used when JWT_SECRET_KEY is not set.
    _DEFAULT_JWT_SECRET = "dev-secret-change-me"

    # A failed token validation may trigger a JWKS refetch (to pick up rotated
    # signing keys), but never more often than this, so that a stream of
    # invalid tokens cannot turn into a stream of outbound HTTP requests.
    _JWKS_MIN_REFRESH_INTERVAL = timedelta(minutes=5)

    def __init__(self):
        """Load settings from the environment.

        Raises:
            ValueError: if running in dev mode (no ``AZURE_AD_TENANT_ID``)
                without ``DEV_AUTH_PASSWORD`` being set.
        """
        self.tenant_id = os.getenv("AZURE_AD_TENANT_ID")
        self.client_id = os.getenv("AZURE_AD_CLIENT_ID")
        self.jwt_secret = os.getenv("JWT_SECRET_KEY", self._DEFAULT_JWT_SECRET)
        self.jwt_algorithm = "HS256"
        self.jwt_expiry_hours = 24
        self.dev_password = os.getenv("DEV_AUTH_PASSWORD")

        # Require explicit dev password if using dev mode
        if self.is_dev_mode and not self.dev_password:
            raise ValueError(
                "DEV_AUTH_PASSWORD must be set in .env file when using dev mode. "
                "Set AZURE_AD_TENANT_ID to enable Azure AD SSO instead."
            )

        # With Azure AD enabled, session cookies signed with the well-known
        # fallback secret can be forged by anyone. Warn instead of raising so
        # existing deployments keep starting.
        if not self.is_dev_mode and self.jwt_secret == self._DEFAULT_JWT_SECRET:
            logging.getLogger(__name__).warning(
                "JWT_SECRET_KEY is not set; session tokens are signed with the "
                "built-in default secret. Set JWT_SECRET_KEY to a strong random value."
            )

        self._jwks_cache: Optional[dict] = None
        self._jwks_fetched_at: Optional[datetime] = None

    @property
    def is_dev_mode(self) -> bool:
        """True when no Azure AD tenant id is configured."""
        return not self.tenant_id

    @property
    def dev_login_enabled(self) -> bool:
        """Dev login is available whenever DEV_AUTH_PASSWORD is set, regardless of Azure AD config."""
        return bool(self.dev_password)

    async def _refresh_jwks(self) -> None:
        """Download the tenant's signing keys (JWKS) and replace the in-memory cache."""
        jwks_uri = f"https://login.microsoftonline.com/{self.tenant_id}/discovery/v2.0/keys"
        async with httpx.AsyncClient(timeout=10) as client:
            resp = await client.get(jwks_uri)
            resp.raise_for_status()
            self._jwks_cache = resp.json()
        self._jwks_fetched_at = datetime.now(timezone.utc)

    def _jwks_refresh_allowed(self) -> bool:
        """Return True if the cached JWKS is old enough to be refetched."""
        fetched_at = self._jwks_fetched_at
        if fetched_at is None:
            return True
        age = datetime.now(timezone.utc) - fetched_at
        return age >= self._JWKS_MIN_REFRESH_INTERVAL

    def _decode_azure_token(self, id_token: str) -> dict:
        """Verify ``id_token`` against the cached JWKS and return its claims."""
        # NOTE(review): only the RS256 signature, the standard time claims and
        # the audience are checked here; the `iss`/`tid` claims are not pinned
        # to the configured tenant. Microsoft's ID-token guidance recommends
        # validating the issuer -- confirm whether `issuer=` should be passed.
        return jwt.decode(
            id_token,
            self._jwks_cache,
            algorithms=["RS256"],
            audience=self.client_id,
        )

    async def validate_azure_token(self, id_token: str) -> dict:
        """Validate Azure AD ID token from MSAL SPA flow using Microsoft's public JWKS.

        The JWKS document is fetched on first use and cached on the instance.
        If validation fails against a cached key set that is older than
        ``_JWKS_MIN_REFRESH_INTERVAL``, the keys are refetched once and the
        token is validated again, so signing-key rotation does not require a
        process restart.

        Args:
            id_token: the raw (encoded) ID token sent by the browser.

        Returns:
            The decoded token claims.

        Raises:
            ValueError: if Azure AD is not configured (dev mode).
            JWTError: if the token fails validation.
            httpx.HTTPError: if the JWKS document cannot be downloaded.
        """
        if self.is_dev_mode:
            raise ValueError("Azure AD not configured")

        # Fetch JWKS (Microsoft public keys)
        if not self._jwks_cache:
            await self._refresh_jwks()

        try:
            return self._decode_azure_token(id_token)
        except JWTError:
            # Keys fetched moments ago cannot be stale; also bounds refetches
            # caused by tokens that are simply invalid.
            if not self._jwks_refresh_allowed():
                raise

        await self._refresh_jwks()
        return self._decode_azure_token(id_token)

    def create_session_jwt(self, user: UserModel) -> str:
        """Issue a signed session JWT for ``user``.

        The payload carries the user's id (``sub``), email and role, plus
        ``iat`` and an ``exp`` set ``jwt_expiry_hours`` after issuance.
        """
        issued_at = datetime.now(timezone.utc)  # single timestamp so iat/exp agree
        payload = {
            "sub": str(user.id),
            "email": user.email,
            "role": user.role,
            "exp": issued_at + timedelta(hours=self.jwt_expiry_hours),
            "iat": issued_at,
        }
        return jwt.encode(payload, self.jwt_secret, algorithm=self.jwt_algorithm)

    def validate_token(self, token: str) -> Optional[dict]:
        """Decode a session JWT; return its payload, or None if it is invalid."""
        try:
            return jwt.decode(token, self.jwt_secret, algorithms=[self.jwt_algorithm])
        except JWTError:
            return None

    async def get_or_create_user(self, claims: dict, session) -> UserModel:
        """Find a user by ``oid`` (then by email) or create a new one.

        Existing users get ``last_login_at`` and ``display_name`` refreshed; a
        user matched by email with no stored ``azure_oid`` is linked to the
        incoming one. Newly created users get role ``"user"`` and are added to
        the team flagged ``is_default`` if one exists.

        Args:
            claims: token claims; reads ``oid``, ``preferred_username``,
                ``email`` and ``name``.
            session: async database session (must provide ``execute``, ``add``,
                ``flush`` and ``commit``).

        Returns:
            The existing or newly created user.

        Raises:
            ValueError: if the claims contain neither ``oid`` nor an email, so
                there is nothing to identify the user by.
        """
        azure_oid = claims.get("oid")
        email = claims.get("preferred_username") or claims.get("email")
        if not azure_oid and not email:
            raise ValueError("Token claims contain neither 'oid' nor an email address")

        # `or` (not a .get default) so an explicit null "name" falls back too.
        display_name = claims.get("name") or email
        now = datetime.now(timezone.utc)

        user = None

        # Try find by azure_oid
        if azure_oid:
            result = await session.execute(
                select(UserModel).where(UserModel.azure_oid == azure_oid)
            )
            user = result.scalar_one_or_none()

        # Try find by email. Skipped when there is no email: comparing against
        # None would match rows whose email is NULL, i.e. somebody else.
        if user is None and email:
            result = await session.execute(
                select(UserModel).where(UserModel.email == email)
            )
            user = result.scalar_one_or_none()
            # NOTE(review): a user matched by email whose stored azure_oid
            # differs from the incoming one is still accepted -- confirm that
            # this is intended.
            if user is not None and azure_oid and not user.azure_oid:
                user.azure_oid = azure_oid

        if user is not None:
            user.last_login_at = now
            if display_name:
                # Never blank out an existing name with an empty claim.
                user.display_name = display_name
            session.add(user)
            await session.commit()
            return user

        # Create new user
        user = UserModel(
            azure_oid=azure_oid,
            email=email,
            display_name=display_name,
            role="user",
            is_active=True,
            last_login_at=now,
            created_at=now,
            updated_at=now,
        )
        session.add(user)
        await session.flush()  # populate user.id for the membership row below

        # Auto-add to Oliver Team
        oliver_result = await session.execute(
            select(TeamModel).where(TeamModel.is_default == True)  # noqa: E712
        )
        oliver_team = oliver_result.scalar_one_or_none()
        if oliver_team:
            membership = TeamMembershipModel(
                user_id=user.id,
                team_id=oliver_team.id,
                assigned_at=now,
            )
            session.add(membership)

        await session.commit()
        return user

    async def dev_login(self, email: str, password: str, session) -> Optional[UserModel]:
        """Dev login: validate password, get or create user. Works even when Azure AD is configured.

        Returns:
            The matching or newly created user, or None when dev login is
            disabled, the email is empty, or the password does not match.
        """
        if not self.dev_login_enabled:
            return None
        if not email or not isinstance(password, str):
            return None

        # Constant-time comparison; bytes are used because compare_digest
        # rejects non-ASCII str arguments.
        supplied = password.encode("utf-8")
        expected = self.dev_password.encode("utf-8")
        if not hmac.compare_digest(supplied, expected):
            return None

        claims = {
            "email": email,
            "name": email.split("@")[0].replace(".", " ").title(),
        }
        return await self.get_or_create_user(claims, session)