ppt-tool/backend/services/auth_service.py
Vadym Samoilenko 19222ab36d Fix auth for optical-dev.oliver.solutions/ppt-tool/ deployment
- AZURE_AD_REDIRECT_URI set to https://optical-dev.oliver.solutions/ppt-tool/
- Root page intercepts ?code= from Azure AD and forwards to backend callback
- Post-OAuth redirect uses NEXT_PUBLIC_BASE_PATH env var (/ppt-tool/dashboard)
- Cookie secure flag driven by COOKIE_SECURE env var (true in prod)
- Dev login now works alongside Azure AD when DEV_AUTH_PASSWORD is set
- Login page shows both Microsoft SSO and dev form when both modes enabled
- docker-compose.prod.yml: add COOKIE_SECURE=true and NEXT_PUBLIC_BASE_PATH

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-20 17:38:42 +00:00

166 lines
5.9 KiB
Python

import os
import uuid
from datetime import datetime, timedelta, timezone
from typing import Optional
from jose import jwt, JWTError
from sqlmodel import select
from models.sql.user import UserModel
from models.sql.team import TeamModel
from models.sql.team_membership import TeamMembershipModel
class AuthService:
    """Authentication helper: Azure AD (MSAL) sign-in, dev login, session JWTs.

    Configuration is read from environment variables when the service is
    constructed:

    * ``AZURE_AD_TENANT_ID`` / ``AZURE_AD_CLIENT_ID`` / ``AZURE_AD_CLIENT_SECRET``
      / ``AZURE_AD_REDIRECT_URI`` -- Azure AD settings. When the tenant id is
      missing the service is in "dev mode" and the Azure AD methods raise.
    * ``JWT_SECRET_KEY`` -- HS256 signing key for session tokens.
    * ``DEV_AUTH_PASSWORD`` -- shared password that enables :meth:`dev_login`.
    """

    # Publicly known fallback signing key; only acceptable for local development.
    _DEFAULT_JWT_SECRET = "dev-secret-change-me"

    def __init__(self):
        """Load settings from the environment.

        Raises:
            ValueError: if neither Azure AD (tenant id) nor a dev password is
                configured, i.e. there would be no way to log in.
        """
        # Local import, like ``msal`` below, so the module import block is untouched.
        import logging

        self.tenant_id = os.getenv("AZURE_AD_TENANT_ID")
        self.client_id = os.getenv("AZURE_AD_CLIENT_ID")
        self.client_secret = os.getenv("AZURE_AD_CLIENT_SECRET")
        self.redirect_uri = os.getenv(
            "AZURE_AD_REDIRECT_URI", "http://localhost/api/v1/auth/callback"
        )
        self.jwt_secret = os.getenv("JWT_SECRET_KEY", self._DEFAULT_JWT_SECRET)
        self.jwt_algorithm = "HS256"
        self.jwt_expiry_hours = 24
        self.dev_password = os.getenv("DEV_AUTH_PASSWORD")
        self._msal_app = None  # created lazily by the ``msal_app`` property

        # Require explicit dev password if using dev mode
        if self.is_dev_mode and not self.dev_password:
            raise ValueError(
                "DEV_AUTH_PASSWORD must be set in .env file when using dev mode. "
                "Set AZURE_AD_TENANT_ID to enable Azure AD SSO instead."
            )

        # With SSO enabled, signing sessions with the well-known fallback key
        # would let anyone mint valid tokens. Warn loudly instead of failing so
        # existing deployments keep starting.
        if not self.is_dev_mode and self.jwt_secret == self._DEFAULT_JWT_SECRET:
            logging.getLogger(__name__).warning(
                "JWT_SECRET_KEY is not set; session tokens are signed with the "
                "built-in development secret. Set JWT_SECRET_KEY in production."
            )

    @property
    def is_dev_mode(self) -> bool:
        """True when no Azure AD tenant id is configured."""
        return not self.tenant_id

    @property
    def dev_login_enabled(self) -> bool:
        """Dev login is available whenever DEV_AUTH_PASSWORD is set, regardless of Azure AD config."""
        return bool(self.dev_password)

    @property
    def msal_app(self):
        """Lazily built ``msal.ConfidentialClientApplication``; ``None`` in dev mode."""
        if self._msal_app is None and not self.is_dev_mode:
            import msal

            self._msal_app = msal.ConfidentialClientApplication(
                self.client_id,
                authority=f"https://login.microsoftonline.com/{self.tenant_id}",
                client_credential=self.client_secret,
            )
        return self._msal_app

    def get_authorization_url(self) -> str:
        """Return the Azure AD authorization URL for the configured redirect URI.

        Raises:
            ValueError: if Azure AD is not configured (dev mode).
        """
        if self.is_dev_mode:
            raise ValueError("Azure AD not configured — use dev login")
        return self.msal_app.get_authorization_request_url(
            scopes=["User.Read"],
            redirect_uri=self.redirect_uri,
        )

    async def exchange_code_for_token(self, code: str) -> dict:
        """Exchange an authorization ``code`` for the MSAL token result dict.

        Raises:
            ValueError: if Azure AD is not configured, or if the MSAL result
                contains an ``error`` key.
        """
        if self.is_dev_mode:
            raise ValueError("Azure AD not configured — use dev login")
        # NOTE(review): this MSAL call is synchronous; inside this coroutine it
        # presumably blocks the event loop for the duration of the request --
        # consider offloading to a worker thread.
        result = self.msal_app.acquire_token_by_authorization_code(
            code,
            scopes=["User.Read"],
            redirect_uri=self.redirect_uri,
        )
        if "error" in result:
            raise ValueError(
                f"Token exchange failed: {result.get('error_description', result.get('error'))}"
            )
        return result

    def create_session_jwt(self, user: "UserModel") -> str:
        """Return a signed session JWT for ``user``, valid for ``jwt_expiry_hours``."""
        # One timestamp so ``iat`` and ``exp`` are exactly ``jwt_expiry_hours`` apart.
        issued_at = datetime.now(timezone.utc)
        payload = {
            "sub": str(user.id),
            "email": user.email,
            "role": user.role,
            "exp": issued_at + timedelta(hours=self.jwt_expiry_hours),
            "iat": issued_at,
        }
        return jwt.encode(payload, self.jwt_secret, algorithm=self.jwt_algorithm)

    def validate_token(self, token: str) -> Optional[dict]:
        """Decode ``token`` and return its payload, or ``None`` if decoding fails."""
        try:
            return jwt.decode(token, self.jwt_secret, algorithms=[self.jwt_algorithm])
        except JWTError:
            return None

    async def _record_login(self, user, display_name, session, azure_oid=None):
        """Stamp ``user`` with the current login time and name, then commit."""
        # Link the Azure object id the first time an email-matched user signs in via SSO.
        if azure_oid and not user.azure_oid:
            user.azure_oid = azure_oid
        user.last_login_at = datetime.now(timezone.utc)
        user.display_name = display_name
        session.add(user)
        await session.commit()
        return user

    async def get_or_create_user(self, claims: dict, session) -> "UserModel":
        """Find a user by Azure object id, then by email, or create a new one.

        A newly created user gets role ``"user"`` and is added to the team
        flagged ``is_default`` when such a team exists.

        Args:
            claims: identity claims; reads ``oid``, ``preferred_username``,
                ``email`` and ``name``.
            session: database session used with ``await`` (presumably an async
                SQLAlchemy/SQLModel session -- confirm against callers).

        Raises:
            ValueError: if no user matches by ``oid`` and the claims carry no
                email, since an email lookup on ``None`` would match rows with
                a NULL email.
        """
        azure_oid = claims.get("oid")
        email = claims.get("preferred_username") or claims.get("email")
        # ``or`` (not a .get default) so a present-but-empty name falls back too.
        display_name = claims.get("name") or email

        # Try find by azure_oid
        if azure_oid:
            by_oid = await session.execute(
                select(UserModel).where(UserModel.azure_oid == azure_oid)
            )
            user = by_oid.scalar_one_or_none()
            if user:
                return await self._record_login(user, display_name, session)

        if not email:
            raise ValueError(
                "Identity claims contain no email (preferred_username/email); "
                "cannot look up or create user"
            )

        # Try find by email
        by_email = await session.execute(
            select(UserModel).where(UserModel.email == email)
        )
        user = by_email.scalar_one_or_none()
        if user:
            return await self._record_login(
                user, display_name, session, azure_oid=azure_oid
            )

        # Create new user
        now = datetime.now(timezone.utc)
        user = UserModel(
            azure_oid=azure_oid,
            email=email,
            display_name=display_name,
            role="user",
            is_active=True,
            last_login_at=now,
            created_at=now,
            updated_at=now,
        )
        session.add(user)
        # Flush so ``user.id`` is available for the membership row below.
        await session.flush()

        # Auto-add to the default team
        default_result = await session.execute(
            select(TeamModel).where(TeamModel.is_default == True)  # noqa: E712
        )
        default_team = default_result.scalar_one_or_none()
        if default_team:
            session.add(
                TeamMembershipModel(
                    user_id=user.id,
                    team_id=default_team.id,
                    assigned_at=now,
                )
            )
        await session.commit()
        return user

    async def dev_login(self, email: str, password: str, session) -> Optional["UserModel"]:
        """Dev login: validate password, get or create user. Works even when Azure AD is configured.

        Returns:
            The matching or newly created user, or ``None`` when dev login is
            disabled or the password does not match.
        """
        import hmac

        if not self.dev_login_enabled:
            return None
        if not isinstance(password, str):
            return None
        # Constant-time comparison; bytes so non-ASCII input cannot raise TypeError.
        if not hmac.compare_digest(
            password.encode("utf-8"), self.dev_password.encode("utf-8")
        ):
            return None
        claims = {
            "email": email,
            "name": email.split("@")[0].replace(".", " ").title(),
        }
        return await self.get_or_create_user(claims, session)