diff --git a/requirements.txt b/requirements.txt index 643b617..c68b7b1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -38,3 +38,7 @@ PyExifTool>=0.5.6 # Web Framework Flask>=3.0.0 +Werkzeug>=3.0.0 + +# Authentication & SSO +msal>=1.20.0 # Microsoft Authentication Library for SSO (optional) diff --git a/src/auth.py b/src/auth.py new file mode 100644 index 0000000..37b89e4 --- /dev/null +++ b/src/auth.py @@ -0,0 +1,324 @@ +"""Authentication and authorization module.""" + +import os +import secrets +from functools import wraps +from flask import session, redirect, url_for, request +from typing import Dict, Optional +from .database import Database +from .utils import get_logger + +logger = get_logger(__name__) + +# Initialize database +db = Database() + + +def login_required(f): + """ + Decorator to require login for routes. + + Usage: + @app.route('/protected') + @login_required + def protected_route(): + return 'Protected content' + """ + @wraps(f) + def decorated_function(*args, **kwargs): + if 'user_id' not in session: + # Save the original URL to redirect after login + return redirect(url_for('login', next=request.url)) + + # Check if session is still valid in database + session_id = session.get('session_id') + if session_id: + db_session = db.get_session(session_id) + if not db_session: + # Session expired or invalid + session.clear() + return redirect(url_for('login', next=request.url)) + + return f(*args, **kwargs) + return decorated_function + + +def authenticate_user(username: str, password: str) -> Dict: + """ + Authenticate user with username and password. + + Args: + username: Username + password: Plain text password + + Returns: + Dictionary with 'success' boolean and either 'user' dict or 'error' message + """ + try: + # Import werkzeug for password verification + from werkzeug.security import check_password_hash + + # Check test user first (hardcoded for testing) + if username == 'tester' and password == 'oliveradmin': + user = db.get_user_by_username('tester') + if user: + logger.info(f"Test user '{username}' authenticated successfully") + return {'success': True, 'user': user} + + # Check database for other users + user = db.get_user_by_username(username) + + if user and user.get('password_hash'): + if check_password_hash(user['password_hash'], password): + logger.info(f"User '{username}' authenticated successfully (database)") + return {'success': True, 'user': user} + + logger.warning(f"Authentication failed for user '{username}'") + return {'success': False, 'error': 'Invalid username or password'} + + except ImportError: + logger.error("werkzeug not available - cannot verify passwords") + return {'success': False, 'error': 'Authentication system not available'} + except Exception as e: + logger.error(f"Authentication error: {e}") + return {'success': False, 'error': 'Authentication error occurred'} + + +def create_user_session(user: Dict, ip_address: Optional[str] = None, user_agent: Optional[str] = None) -> str: + """ + Create a new session for authenticated user. + + Args: + user: User dictionary from database + ip_address: Client IP address + user_agent: Client user agent string + + Returns: + Session ID + """ + session_id = secrets.token_urlsafe(32) + user_id = user['id'] + + # Create session in database + success = db.create_session( + user_id=user_id, + session_id=session_id, + expires_in_hours=24, + ip_address=ip_address, + user_agent=user_agent + ) + + if success: + # Update last login timestamp + db.update_last_login(user_id) + + # Log the login action + db.log_action(user_id, 'login', f'IP: {ip_address}') + + logger.info(f"Created session for user {user['username']} (ID: {user_id})") + return session_id + else: + logger.error(f"Failed to create session for user {user_id}") + return None + + +def destroy_user_session(session_id: str, user_id: Optional[int] = None): + """ + Destroy user session (logout). + + Args: + session_id: Session ID to destroy + user_id: Optional user ID for logging + """ + db.delete_session(session_id) + + if user_id: + db.log_action(user_id, 'logout', f'Session: {session_id}') + logger.info(f"User {user_id} logged out") + + +def get_current_user() -> Optional[Dict]: + """ + Get current logged-in user from session. + + Returns: + User dictionary or None if not logged in + """ + user_id = session.get('user_id') + if user_id: + return db.get_user_by_id(user_id) + return None + + +def cleanup_sessions(): + """Clean up expired sessions from database.""" + db.cleanup_expired_sessions() + + +class MicrosoftSSO: + """Microsoft SSO authentication handler using MSAL.""" + + def __init__(self): + """Initialize Microsoft SSO with environment variables.""" + self.client_id = os.getenv('AZURE_CLIENT_ID') + self.client_secret = os.getenv('AZURE_CLIENT_SECRET') + self.tenant_id = os.getenv('AZURE_TENANT_ID') + self.redirect_uri = os.getenv('REDIRECT_URI', 'http://localhost:5001/auth/callback') + + # Check if SSO is configured + if not all([self.client_id, self.client_secret, self.tenant_id]): + self.enabled = False + logger.warning("Microsoft SSO not configured (missing Azure credentials)") + return + + try: + import msal + self.authority = f"https://login.microsoftonline.com/{self.tenant_id}" + self.app = msal.ConfidentialClientApplication( + self.client_id, + authority=self.authority, + client_credential=self.client_secret + ) + self.enabled = True + logger.info("Microsoft SSO initialized successfully") + except ImportError: + self.enabled = False + logger.warning("Microsoft SSO not available (msal library not installed)") + except Exception as e: + self.enabled = False + logger.error(f"Failed to initialize Microsoft SSO: {e}") + + def get_auth_url(self, state: Optional[str] = None) -> Optional[str]: + """ + Get Microsoft login URL. + + Args: + state: State parameter for CSRF protection + + Returns: + Authorization URL or None if SSO not enabled + """ + if not self.enabled: + return None + + try: + return self.app.get_authorization_request_url( + scopes=["User.Read"], + state=state, + redirect_uri=self.redirect_uri + ) + except Exception as e: + logger.error(f"Error generating auth URL: {e}") + return None + + def acquire_token(self, auth_code: str) -> Optional[Dict]: + """ + Exchange authorization code for access token. + + Args: + auth_code: Authorization code from Microsoft + + Returns: + Token result dictionary or None if failed + """ + if not self.enabled: + return None + + try: + result = self.app.acquire_token_by_authorization_code( + auth_code, + scopes=["User.Read"], + redirect_uri=self.redirect_uri + ) + return result + except Exception as e: + logger.error(f"Error acquiring token: {e}") + return None + + def get_user_info(self, access_token: str) -> Optional[Dict]: + """ + Get user info from Microsoft Graph API. + + Args: + access_token: Access token from Microsoft + + Returns: + User info dictionary or None if failed + """ + if not self.enabled: + return None + + try: + import requests + headers = {'Authorization': f'Bearer {access_token}'} + response = requests.get( + 'https://graph.microsoft.com/v1.0/me', + headers=headers, + timeout=10 + ) + + if response.status_code == 200: + return response.json() + else: + logger.error(f"Graph API error: {response.status_code}") + return None + + except Exception as e: + logger.error(f"Error fetching user info: {e}") + return None + + def create_or_update_user(self, user_info: Dict) -> Optional[Dict]: + """ + Create or update user from SSO login. + + Args: + user_info: User information from Microsoft Graph + + Returns: + User dictionary or None if failed + """ + try: + email = user_info.get('mail') or user_info.get('userPrincipalName') + username = email.split('@')[0] if email else user_info.get('displayName', 'unknown') + full_name = user_info.get('displayName') + + # Check if user exists + user = db.get_user_by_username(username) + + if not user: + # Create new user + user_id = db.create_user( + username=username, + email=email, + full_name=full_name, + auth_method='sso' + ) + + if user_id: + user = db.get_user_by_id(user_id) + logger.info(f"Created new SSO user: {username}") + else: + logger.error(f"Failed to create SSO user: {username}") + return None + else: + logger.info(f"Existing SSO user logged in: {username}") + + return user + + except Exception as e: + logger.error(f"Error creating/updating SSO user: {e}") + return None + + +# Initialize Microsoft SSO +sso = MicrosoftSSO() + + +def is_sso_enabled() -> bool: + """Check if Microsoft SSO is enabled and configured.""" + return sso.enabled + + +def get_sso_instance() -> MicrosoftSSO: + """Get Microsoft SSO instance.""" + return sso diff --git a/src/database.py b/src/database.py new file mode 100644 index 0000000..dd45ae8 --- /dev/null +++ b/src/database.py @@ -0,0 +1,414 @@ +"""Database management for user authentication and sessions.""" + +import sqlite3 +from datetime import datetime, timedelta +from typing import Optional, Dict, List +from pathlib import Path +from .utils import get_logger + +logger = get_logger(__name__) + + +class Database: + """SQLite database manager for Oliver Metadata Tool.""" + + def __init__(self, db_path: str = 'oliver_metadata.db'): + """ + Initialize database connection. + + Args: + db_path: Path to SQLite database file + """ + self.db_path = db_path + self.conn = sqlite3.connect(db_path, check_same_thread=False) + self.conn.row_factory = sqlite3.Row + self.create_tables() + logger.info(f"Database initialized at {db_path}") + + def create_tables(self): + """Create database tables if they don't exist.""" + # Users table + self.conn.execute(''' + CREATE TABLE IF NOT EXISTS users ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + username TEXT UNIQUE NOT NULL, + password_hash TEXT, + email TEXT, + full_name TEXT, + auth_method TEXT DEFAULT 'local', + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + last_login TIMESTAMP, + is_active BOOLEAN DEFAULT 1 + ) + ''') + + # Sessions table + self.conn.execute(''' + CREATE TABLE IF NOT EXISTS sessions ( + session_id TEXT PRIMARY KEY, + user_id INTEGER NOT NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + expires_at TIMESTAMP NOT NULL, + ip_address TEXT, + user_agent TEXT, + FOREIGN KEY (user_id) REFERENCES users (id) + ) + ''') + + # Audit log table + self.conn.execute(''' + CREATE TABLE IF NOT EXISTS audit_log ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id INTEGER NOT NULL, + action TEXT NOT NULL, + details TEXT, + timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (user_id) REFERENCES users (id) + ) + ''') + + # Create indexes for performance + self.conn.execute(''' + CREATE INDEX IF NOT EXISTS idx_sessions_user_id + ON sessions(user_id) + ''') + + self.conn.execute(''' + CREATE INDEX IF NOT EXISTS idx_sessions_expires_at + ON sessions(expires_at) + ''') + + self.conn.execute(''' + CREATE INDEX IF NOT EXISTS idx_audit_user_id + ON audit_log(user_id) + ''') + + self.conn.execute(''' + CREATE INDEX IF NOT EXISTS idx_audit_timestamp + ON audit_log(timestamp) + ''') + + self.conn.commit() + logger.info("Database tables created/verified") + + # Create test user if not exists + self.create_test_user() + + def create_test_user(self): + """Create test user (tester/oliveradmin) if doesn't exist.""" + try: + cursor = self.conn.execute('SELECT id FROM users WHERE username = ?', ('tester',)) + if not cursor.fetchone(): + # Import werkzeug here to avoid dependency issues + try: + from werkzeug.security import generate_password_hash + password_hash = generate_password_hash('oliveradmin') + + self.conn.execute(''' + INSERT INTO users (username, password_hash, email, full_name, auth_method) + VALUES (?, ?, ?, ?, ?) + ''', ('tester', password_hash, 'tester@oliver.local', 'Test User', 'local')) + self.conn.commit() + logger.info("Test user 'tester' created successfully") + except ImportError: + logger.warning("werkzeug not available - test user not created") + except Exception as e: + logger.error(f"Error creating test user: {e}") + + def get_user_by_username(self, username: str) -> Optional[Dict]: + """ + Get user by username. + + Args: + username: Username to look up + + Returns: + User dictionary or None if not found + """ + try: + cursor = self.conn.execute(''' + SELECT * FROM users WHERE username = ? AND is_active = 1 + ''', (username,)) + row = cursor.fetchone() + return dict(row) if row else None + except Exception as e: + logger.error(f"Error fetching user '{username}': {e}") + return None + + def get_user_by_id(self, user_id: int) -> Optional[Dict]: + """ + Get user by ID. + + Args: + user_id: User ID to look up + + Returns: + User dictionary or None if not found + """ + try: + cursor = self.conn.execute(''' + SELECT * FROM users WHERE id = ? AND is_active = 1 + ''', (user_id,)) + row = cursor.fetchone() + return dict(row) if row else None + except Exception as e: + logger.error(f"Error fetching user ID {user_id}: {e}") + return None + + def create_user( + self, + username: str, + password_hash: Optional[str] = None, + email: Optional[str] = None, + full_name: Optional[str] = None, + auth_method: str = 'local' + ) -> Optional[int]: + """ + Create a new user. + + Args: + username: Unique username + password_hash: Hashed password (for local auth) + email: User email + full_name: User's full name + auth_method: Authentication method ('local' or 'sso') + + Returns: + User ID if successful, None otherwise + """ + try: + cursor = self.conn.execute(''' + INSERT INTO users (username, password_hash, email, full_name, auth_method) + VALUES (?, ?, ?, ?, ?) + ''', (username, password_hash, email, full_name, auth_method)) + self.conn.commit() + + user_id = cursor.lastrowid + logger.info(f"Created user '{username}' (ID: {user_id})") + return user_id + + except sqlite3.IntegrityError: + logger.warning(f"User '{username}' already exists") + return None + except Exception as e: + logger.error(f"Error creating user '{username}': {e}") + return None + + def update_last_login(self, user_id: int): + """ + Update user's last login timestamp. + + Args: + user_id: User ID + """ + try: + self.conn.execute(''' + UPDATE users SET last_login = CURRENT_TIMESTAMP WHERE id = ? + ''', (user_id,)) + self.conn.commit() + except Exception as e: + logger.error(f"Error updating last login for user {user_id}: {e}") + + def create_session( + self, + user_id: int, + session_id: str, + expires_in_hours: int = 24, + ip_address: Optional[str] = None, + user_agent: Optional[str] = None + ) -> bool: + """ + Create new session for user. + + Args: + user_id: User ID + session_id: Unique session identifier + expires_in_hours: Session expiration time in hours + ip_address: Client IP address + user_agent: Client user agent string + + Returns: + True if successful + """ + try: + expires_at = datetime.now() + timedelta(hours=expires_in_hours) + self.conn.execute(''' + INSERT INTO sessions (session_id, user_id, expires_at, ip_address, user_agent) + VALUES (?, ?, ?, ?, ?) + ''', (session_id, user_id, expires_at, ip_address, user_agent)) + self.conn.commit() + logger.info(f"Created session for user {user_id}") + return True + except Exception as e: + logger.error(f"Error creating session: {e}") + return False + + def get_session(self, session_id: str) -> Optional[Dict]: + """ + Get session by ID. + + Args: + session_id: Session ID to look up + + Returns: + Session dictionary or None if not found/expired + """ + try: + cursor = self.conn.execute(''' + SELECT s.*, u.username, u.email, u.full_name + FROM sessions s + JOIN users u ON s.user_id = u.id + WHERE s.session_id = ? AND s.expires_at > CURRENT_TIMESTAMP + ''', (session_id,)) + row = cursor.fetchone() + return dict(row) if row else None + except Exception as e: + logger.error(f"Error fetching session: {e}") + return None + + def delete_session(self, session_id: str) -> bool: + """ + Delete session (logout). + + Args: + session_id: Session ID to delete + + Returns: + True if successful + """ + try: + self.conn.execute('DELETE FROM sessions WHERE session_id = ?', (session_id,)) + self.conn.commit() + logger.info(f"Deleted session {session_id}") + return True + except Exception as e: + logger.error(f"Error deleting session: {e}") + return False + + def cleanup_expired_sessions(self): + """Remove expired sessions from database.""" + try: + cursor = self.conn.execute(''' + DELETE FROM sessions WHERE expires_at < CURRENT_TIMESTAMP + ''') + self.conn.commit() + deleted_count = cursor.rowcount + if deleted_count > 0: + logger.info(f"Cleaned up {deleted_count} expired sessions") + except Exception as e: + logger.error(f"Error cleaning up sessions: {e}") + + def log_action(self, user_id: int, action: str, details: Optional[str] = None): + """ + Log user action to audit trail. + + Args: + user_id: User ID performing the action + action: Action description (e.g., "login", "update_metadata", "export") + details: Optional additional details (JSON string or plain text) + """ + try: + self.conn.execute(''' + INSERT INTO audit_log (user_id, action, details) + VALUES (?, ?, ?) + ''', (user_id, action, details)) + self.conn.commit() + except Exception as e: + logger.error(f"Error logging action: {e}") + + def get_user_activity( + self, + user_id: int, + limit: int = 100, + offset: int = 0 + ) -> List[Dict]: + """ + Get user activity log. + + Args: + user_id: User ID + limit: Maximum number of records to return + offset: Offset for pagination + + Returns: + List of activity records + """ + try: + cursor = self.conn.execute(''' + SELECT * FROM audit_log + WHERE user_id = ? + ORDER BY timestamp DESC + LIMIT ? OFFSET ? + ''', (user_id, limit, offset)) + + return [dict(row) for row in cursor.fetchall()] + except Exception as e: + logger.error(f"Error fetching user activity: {e}") + return [] + + def get_all_users(self, include_inactive: bool = False) -> List[Dict]: + """ + Get all users. + + Args: + include_inactive: Include inactive users + + Returns: + List of user dictionaries + """ + try: + query = 'SELECT * FROM users' + if not include_inactive: + query += ' WHERE is_active = 1' + query += ' ORDER BY created_at DESC' + + cursor = self.conn.execute(query) + return [dict(row) for row in cursor.fetchall()] + except Exception as e: + logger.error(f"Error fetching users: {e}") + return [] + + def get_stats(self) -> Dict: + """ + Get database statistics. + + Returns: + Statistics dictionary + """ + try: + stats = {} + + # Count users + cursor = self.conn.execute('SELECT COUNT(*) as count FROM users WHERE is_active = 1') + stats['active_users'] = cursor.fetchone()['count'] + + # Count active sessions + cursor = self.conn.execute(''' + SELECT COUNT(*) as count FROM sessions + WHERE expires_at > CURRENT_TIMESTAMP + ''') + stats['active_sessions'] = cursor.fetchone()['count'] + + # Count audit log entries + cursor = self.conn.execute('SELECT COUNT(*) as count FROM audit_log') + stats['audit_entries'] = cursor.fetchone()['count'] + + # Recent activity (last 24 hours) + cursor = self.conn.execute(''' + SELECT COUNT(*) as count FROM audit_log + WHERE timestamp > datetime('now', '-24 hours') + ''') + stats['recent_activity'] = cursor.fetchone()['count'] + + return stats + except Exception as e: + logger.error(f"Error fetching stats: {e}") + return {} + + def close(self): + """Close database connection.""" + try: + self.conn.close() + logger.info("Database connection closed") + except Exception as e: + logger.error(f"Error closing database: {e}") diff --git a/templates/login.html b/templates/login.html new file mode 100644 index 0000000..5815198 --- /dev/null +++ b/templates/login.html @@ -0,0 +1,239 @@ + + +
+ + +Sign in to continue
+