solventum-image-metadata/src/auth.py
SamoilenkoVadym 3deaa5ef40 Initial commit: Oliver Metadata Tool (FastAPI)
Complete Flask → FastAPI migration with:
- FastAPI app with session auth, Azure AD SSO, rate limiting
- SQLite-backed session store (survives restarts)
- Bulk AI metadata generation with SSE progress
- Admin panel (user management, audit log, AI usage)
- Subpath deployment support (ROOT_PATH config)
- Docker + deploy.sh for production deployment
- Test suite (auth, upload, templates, imports, admin, sessions)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-02-09 21:23:42 +00:00

324 lines
9.7 KiB
Python

"""Authentication and authorization module."""
import os
import secrets
from functools import wraps
from flask import session, redirect, url_for, request
from typing import Dict, Optional
from .database import Database
from .utils import get_logger
logger = get_logger(__name__)
# Initialize database
db = Database()
def login_required(f):
"""
Decorator to require login for routes.
Usage:
@app.route('/protected')
@login_required
def protected_route():
return 'Protected content'
"""
@wraps(f)
def decorated_function(*args, **kwargs):
if 'user_id' not in session:
# Save the original URL to redirect after login
return redirect(url_for('login', next=request.url))
# Check if session is still valid in database
session_id = session.get('session_id')
if session_id:
db_session = db.get_session(session_id)
if not db_session:
# Session expired or invalid
session.clear()
return redirect(url_for('login', next=request.url))
return f(*args, **kwargs)
return decorated_function
def authenticate_user(username: str, password: str) -> Dict:
"""
Authenticate user with username and password.
Args:
username: Username
password: Plain text password
Returns:
Dictionary with 'success' boolean and either 'user' dict or 'error' message
"""
try:
# Import werkzeug for password verification
from werkzeug.security import check_password_hash
# Check test user first (hardcoded for testing)
if username == 'tester' and password == 'oliveradmin':
user = db.get_user_by_username('tester')
if user:
logger.info(f"Test user '{username}' authenticated successfully")
return {'success': True, 'user': user}
# Check database for other users
user = db.get_user_by_username(username)
if user and user.get('password_hash'):
if check_password_hash(user['password_hash'], password):
logger.info(f"User '{username}' authenticated successfully (database)")
return {'success': True, 'user': user}
logger.warning(f"Authentication failed for user '{username}'")
return {'success': False, 'error': 'Invalid username or password'}
except ImportError:
logger.error("werkzeug not available - cannot verify passwords")
return {'success': False, 'error': 'Authentication system not available'}
except Exception as e:
logger.error(f"Authentication error: {e}")
return {'success': False, 'error': 'Authentication error occurred'}
def create_user_session(user: Dict, ip_address: Optional[str] = None, user_agent: Optional[str] = None) -> str:
"""
Create a new session for authenticated user.
Args:
user: User dictionary from database
ip_address: Client IP address
user_agent: Client user agent string
Returns:
Session ID
"""
session_id = secrets.token_urlsafe(32)
user_id = user['id']
# Create session in database
success = db.create_session(
user_id=user_id,
session_id=session_id,
expires_in_hours=24,
ip_address=ip_address,
user_agent=user_agent
)
if success:
# Update last login timestamp
db.update_last_login(user_id)
# Log the login action
db.log_action(user_id, 'login', f'IP: {ip_address}')
logger.info(f"Created session for user {user['username']} (ID: {user_id})")
return session_id
else:
logger.error(f"Failed to create session for user {user_id}")
return None
def destroy_user_session(session_id: str, user_id: Optional[int] = None):
"""
Destroy user session (logout).
Args:
session_id: Session ID to destroy
user_id: Optional user ID for logging
"""
db.delete_session(session_id)
if user_id:
db.log_action(user_id, 'logout', f'Session: {session_id}')
logger.info(f"User {user_id} logged out")
def get_current_user() -> Optional[Dict]:
"""
Get current logged-in user from session.
Returns:
User dictionary or None if not logged in
"""
user_id = session.get('user_id')
if user_id:
return db.get_user_by_id(user_id)
return None
def cleanup_sessions():
"""Clean up expired sessions from database."""
db.cleanup_expired_sessions()
class MicrosoftSSO:
"""Microsoft SSO authentication handler using MSAL."""
def __init__(self):
"""Initialize Microsoft SSO with environment variables."""
self.client_id = os.getenv('AZURE_CLIENT_ID')
self.client_secret = os.getenv('AZURE_CLIENT_SECRET')
self.tenant_id = os.getenv('AZURE_TENANT_ID')
self.redirect_uri = os.getenv('REDIRECT_URI', 'http://localhost:5001/auth/callback')
# Check if SSO is configured
if not all([self.client_id, self.client_secret, self.tenant_id]):
self.enabled = False
logger.warning("Microsoft SSO not configured (missing Azure credentials)")
return
try:
import msal
self.authority = f"https://login.microsoftonline.com/{self.tenant_id}"
self.app = msal.ConfidentialClientApplication(
self.client_id,
authority=self.authority,
client_credential=self.client_secret
)
self.enabled = True
logger.info("Microsoft SSO initialized successfully")
except ImportError:
self.enabled = False
logger.warning("Microsoft SSO not available (msal library not installed)")
except Exception as e:
self.enabled = False
logger.error(f"Failed to initialize Microsoft SSO: {e}")
def get_auth_url(self, state: Optional[str] = None) -> Optional[str]:
"""
Get Microsoft login URL.
Args:
state: State parameter for CSRF protection
Returns:
Authorization URL or None if SSO not enabled
"""
if not self.enabled:
return None
try:
return self.app.get_authorization_request_url(
scopes=["User.Read"],
state=state,
redirect_uri=self.redirect_uri
)
except Exception as e:
logger.error(f"Error generating auth URL: {e}")
return None
def acquire_token(self, auth_code: str) -> Optional[Dict]:
"""
Exchange authorization code for access token.
Args:
auth_code: Authorization code from Microsoft
Returns:
Token result dictionary or None if failed
"""
if not self.enabled:
return None
try:
result = self.app.acquire_token_by_authorization_code(
auth_code,
scopes=["User.Read"],
redirect_uri=self.redirect_uri
)
return result
except Exception as e:
logger.error(f"Error acquiring token: {e}")
return None
def get_user_info(self, access_token: str) -> Optional[Dict]:
"""
Get user info from Microsoft Graph API.
Args:
access_token: Access token from Microsoft
Returns:
User info dictionary or None if failed
"""
if not self.enabled:
return None
try:
import requests
headers = {'Authorization': f'Bearer {access_token}'}
response = requests.get(
'https://graph.microsoft.com/v1.0/me',
headers=headers,
timeout=10
)
if response.status_code == 200:
return response.json()
else:
logger.error(f"Graph API error: {response.status_code}")
return None
except Exception as e:
logger.error(f"Error fetching user info: {e}")
return None
def create_or_update_user(self, user_info: Dict) -> Optional[Dict]:
"""
Create or update user from SSO login.
Args:
user_info: User information from Microsoft Graph
Returns:
User dictionary or None if failed
"""
try:
email = user_info.get('mail') or user_info.get('userPrincipalName')
username = email.split('@')[0] if email else user_info.get('displayName', 'unknown')
full_name = user_info.get('displayName')
# Check if user exists
user = db.get_user_by_username(username)
if not user:
# Create new user
user_id = db.create_user(
username=username,
email=email,
full_name=full_name,
auth_method='sso'
)
if user_id:
user = db.get_user_by_id(user_id)
logger.info(f"Created new SSO user: {username}")
else:
logger.error(f"Failed to create SSO user: {username}")
return None
else:
logger.info(f"Existing SSO user logged in: {username}")
return user
except Exception as e:
logger.error(f"Error creating/updating SSO user: {e}")
return None
# Initialize Microsoft SSO
sso = MicrosoftSSO()
def is_sso_enabled() -> bool:
"""Check if Microsoft SSO is enabled and configured."""
return sso.enabled
def get_sso_instance() -> MicrosoftSSO:
"""Get Microsoft SSO instance."""
return sso