ppt-tool/backend/services/auth_service.py
Vadym Samoilenko cf21ba4516 Phase 1-2: Foundation + Admin Panel & Client Management
Phase 1 (Foundation):
- Project restructure (presenton-main → backend/ + frontend/)
- Database schema (8 new models, Alembic config, seed script)
- Auth (Azure AD SSO + dev bypass, JWT sessions, AuthMiddleware)
- RBAC (access_service, rbac_middleware, admin routers)
- Audit logging (fire-and-forget, AuditMiddleware, admin router)
- i18n (react-i18next with 5 namespace files)

Phase 2 (Admin Panel & Client Management):
- Admin panel shell (sidebar layout, role guard, 12 pages)
- Redux admin slice with 18 async thunks
- User management (role changes, deactivation)
- Client management (CRUD, brand config, team management)
- Brand config editor (colors, fonts, logos, voice rules)
- Master deck upload & parser (PPTX → HTML → React pipeline)
- Audit log viewer with filters and CSV/JSON export

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 15:37:17 +00:00

154 lines
5.4 KiB
Python

import os
import uuid
from datetime import datetime, timedelta, timezone
from typing import Optional
from jose import jwt, JWTError
from sqlmodel import select
from models.sql.user import UserModel
from models.sql.team import TeamModel
from models.sql.team_membership import TeamMembershipModel
class AuthService:
def __init__(self):
self.tenant_id = os.getenv("AZURE_AD_TENANT_ID")
self.client_id = os.getenv("AZURE_AD_CLIENT_ID")
self.client_secret = os.getenv("AZURE_AD_CLIENT_SECRET")
self.redirect_uri = os.getenv("AZURE_AD_REDIRECT_URI", "http://localhost/api/v1/auth/callback")
self.jwt_secret = os.getenv("JWT_SECRET_KEY", "dev-secret-change-me")
self.jwt_algorithm = "HS256"
self.jwt_expiry_hours = 24
self.dev_password = os.getenv("DEV_AUTH_PASSWORD", "devpass123")
self._msal_app = None
@property
def is_dev_mode(self) -> bool:
return not self.tenant_id
@property
def msal_app(self):
if self._msal_app is None and not self.is_dev_mode:
import msal
self._msal_app = msal.ConfidentialClientApplication(
self.client_id,
authority=f"https://login.microsoftonline.com/{self.tenant_id}",
client_credential=self.client_secret,
)
return self._msal_app
def get_authorization_url(self) -> str:
if self.is_dev_mode:
raise ValueError("Azure AD not configured — use dev login")
result = self.msal_app.get_authorization_request_url(
scopes=["User.Read"],
redirect_uri=self.redirect_uri,
)
return result
async def exchange_code_for_token(self, code: str) -> dict:
if self.is_dev_mode:
raise ValueError("Azure AD not configured — use dev login")
result = self.msal_app.acquire_token_by_authorization_code(
code,
scopes=["User.Read"],
redirect_uri=self.redirect_uri,
)
if "error" in result:
raise ValueError(f"Token exchange failed: {result.get('error_description', result.get('error'))}")
return result
def create_session_jwt(self, user: UserModel) -> str:
payload = {
"sub": str(user.id),
"email": user.email,
"role": user.role,
"exp": datetime.now(timezone.utc) + timedelta(hours=self.jwt_expiry_hours),
"iat": datetime.now(timezone.utc),
}
return jwt.encode(payload, self.jwt_secret, algorithm=self.jwt_algorithm)
def validate_token(self, token: str) -> Optional[dict]:
try:
payload = jwt.decode(token, self.jwt_secret, algorithms=[self.jwt_algorithm])
return payload
except JWTError:
return None
async def get_or_create_user(self, claims: dict, session) -> UserModel:
"""Find user by azure_oid or create new one. Auto-adds to Oliver Team."""
azure_oid = claims.get("oid")
email = claims.get("preferred_username") or claims.get("email")
display_name = claims.get("name", email)
# Try find by azure_oid
if azure_oid:
result = await session.execute(
select(UserModel).where(UserModel.azure_oid == azure_oid)
)
user = result.scalar_one_or_none()
if user:
user.last_login_at = datetime.now(timezone.utc)
user.display_name = display_name
session.add(user)
await session.commit()
return user
# Try find by email
result = await session.execute(
select(UserModel).where(UserModel.email == email)
)
user = result.scalar_one_or_none()
if user:
if azure_oid and not user.azure_oid:
user.azure_oid = azure_oid
user.last_login_at = datetime.now(timezone.utc)
user.display_name = display_name
session.add(user)
await session.commit()
return user
# Create new user
user = UserModel(
azure_oid=azure_oid,
email=email,
display_name=display_name,
role="user",
is_active=True,
last_login_at=datetime.now(timezone.utc),
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
)
session.add(user)
await session.flush()
# Auto-add to Oliver Team
oliver_result = await session.execute(
select(TeamModel).where(TeamModel.is_default == True) # noqa: E712
)
oliver_team = oliver_result.scalar_one_or_none()
if oliver_team:
membership = TeamMembershipModel(
user_id=user.id,
team_id=oliver_team.id,
assigned_at=datetime.now(timezone.utc),
)
session.add(membership)
await session.commit()
return user
async def dev_login(self, email: str, password: str, session) -> Optional[UserModel]:
"""Dev-mode login: validate password, get or create user."""
if not self.is_dev_mode:
return None
if password != self.dev_password:
return None
claims = {
"email": email,
"name": email.split("@")[0].replace(".", " ").title(),
}
return await self.get_or_create_user(claims, session)