"""Database management for user authentication and sessions."""

import os
import sqlite3
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Dict, List, Optional

from .utils import get_logger

logger = get_logger(__name__)

# Text layout SQLite uses for CURRENT_TIMESTAMP (which is always UTC).
# Session expiry values are stored in this exact layout so that the SQL
# comparisons against CURRENT_TIMESTAMP are like-for-like.
_SQLITE_TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S'


class Database:
    """SQLite database manager for Oliver Metadata Tool.

    Owns a single ``sqlite3`` connection (``self.conn``) opened with
    ``check_same_thread=False`` and ``sqlite3.Row`` as the row factory.
    Most methods catch exceptions, log them and return a neutral value
    (``None``, ``False``, ``[]`` or ``{}``) instead of raising.

    The instance can be used as a context manager; the connection is closed
    when the ``with`` block exits.
    """

    def __init__(self, db_path: Optional[str] = None):
        """
        Initialize database connection.

        Opens (or creates) the database file and makes sure the schema exists.

        Args:
            db_path: Path to SQLite database file
                     (None = auto-detect based on environment)
        """
        if db_path is None:
            db_path = self._default_db_path()

        self.db_path = db_path
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        self.conn.row_factory = sqlite3.Row
        self.create_tables()

        logger.info(f"Database initialized at {db_path}")

    def __enter__(self) -> "Database":
        """Return the instance itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the connection on leaving a ``with`` block."""
        self.close()

    @staticmethod
    def _default_db_path() -> str:
        """
        Pick the database location from the environment.

        Returns:
            ``/app/data/oliver_metadata.db`` when the ``DOCKER_MODE``
            environment variable equals ``true`` (case-insensitive; the
            directory is created if missing), otherwise
            ``oliver_metadata.db`` relative to the current directory.
        """
        docker_mode = os.getenv('DOCKER_MODE', 'false').lower() == 'true'
        if docker_mode:
            # Use persistent data directory in Docker
            db_dir = Path('/app/data')
            db_dir.mkdir(parents=True, exist_ok=True)
            return str(db_dir / 'oliver_metadata.db')

        # Use current directory for local development
        return 'oliver_metadata.db'

    def create_tables(self):
        """
        Create database tables and indexes if they don't exist.

        Unlike most other methods, errors are not caught here: a failure
        propagates to the caller (including ``__init__``).
        """
        # "with self.conn" commits on success and rolls back on error.
        with self.conn:
            # Users table
            self.conn.execute('''
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    username TEXT UNIQUE NOT NULL,
                    password_hash TEXT,
                    email TEXT,
                    full_name TEXT,
                    auth_method TEXT DEFAULT 'local',
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    last_login TIMESTAMP,
                    is_active BOOLEAN DEFAULT 1
                )
            ''')

            # Sessions table
            self.conn.execute('''
                CREATE TABLE IF NOT EXISTS sessions (
                    session_id TEXT PRIMARY KEY,
                    user_id INTEGER NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    expires_at TIMESTAMP NOT NULL,
                    ip_address TEXT,
                    user_agent TEXT,
                    FOREIGN KEY (user_id) REFERENCES users (id)
                )
            ''')

            # Audit log table
            self.conn.execute('''
                CREATE TABLE IF NOT EXISTS audit_log (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    user_id INTEGER NOT NULL,
                    action TEXT NOT NULL,
                    details TEXT,
                    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (user_id) REFERENCES users (id)
                )
            ''')

            # Create indexes for performance
            self.conn.execute('''
                CREATE INDEX IF NOT EXISTS idx_sessions_user_id
                ON sessions(user_id)
            ''')
            self.conn.execute('''
                CREATE INDEX IF NOT EXISTS idx_sessions_expires_at
                ON sessions(expires_at)
            ''')
            self.conn.execute('''
                CREATE INDEX IF NOT EXISTS idx_audit_user_id
                ON audit_log(user_id)
            ''')
            self.conn.execute('''
                CREATE INDEX IF NOT EXISTS idx_audit_timestamp
                ON audit_log(timestamp)
            ''')

        logger.info("Database tables created/verified")

    def get_user_by_username(self, username: str) -> Optional[Dict]:
        """
        Get active user by username.

        Args:
            username: Username to look up

        Returns:
            User dictionary, or None if no active user matches or the
            query fails (the failure is logged)
        """
        try:
            cursor = self.conn.execute('''
                SELECT * FROM users
                WHERE username = ? AND is_active = 1
            ''', (username,))
            row = cursor.fetchone()
            return dict(row) if row else None
        except Exception as e:
            logger.error(f"Error fetching user '{username}': {e}")
            return None

    def get_user_by_id(self, user_id: int) -> Optional[Dict]:
        """
        Get active user by ID.

        Args:
            user_id: User ID to look up

        Returns:
            User dictionary, or None if no active user matches or the
            query fails (the failure is logged)
        """
        try:
            cursor = self.conn.execute('''
                SELECT * FROM users
                WHERE id = ? AND is_active = 1
            ''', (user_id,))
            row = cursor.fetchone()
            return dict(row) if row else None
        except Exception as e:
            logger.error(f"Error fetching user ID {user_id}: {e}")
            return None

    def create_user(
        self,
        username: str,
        password_hash: Optional[str] = None,
        email: Optional[str] = None,
        full_name: Optional[str] = None,
        auth_method: str = 'local'
    ) -> Optional[int]:
        """
        Create a new user.

        Args:
            username: Unique username
            password_hash: Hashed password (for local auth)
            email: User email
            full_name: User's full name
            auth_method: Authentication method ('local' or 'sso')

        Returns:
            User ID if successful, None otherwise (duplicate username or
            any other database error; both are logged)
        """
        try:
            # Commit on success, roll back if the INSERT fails.
            with self.conn:
                cursor = self.conn.execute('''
                    INSERT INTO users
                        (username, password_hash, email, full_name, auth_method)
                    VALUES (?, ?, ?, ?, ?)
                ''', (username, password_hash, email, full_name, auth_method))

            user_id = cursor.lastrowid
            logger.info(f"Created user '{username}' (ID: {user_id})")
            return user_id
        except sqlite3.IntegrityError:
            logger.warning(f"User '{username}' already exists")
            return None
        except Exception as e:
            logger.error(f"Error creating user '{username}': {e}")
            return None

    def update_last_login(self, user_id: int):
        """
        Update user's last login timestamp to the current time.

        Errors are logged and swallowed.

        Args:
            user_id: User ID
        """
        try:
            with self.conn:
                self.conn.execute('''
                    UPDATE users
                    SET last_login = CURRENT_TIMESTAMP
                    WHERE id = ?
                ''', (user_id,))
        except Exception as e:
            logger.error(f"Error updating last login for user {user_id}: {e}")

    def create_session(
        self,
        user_id: int,
        session_id: str,
        expires_in_hours: int = 24,
        ip_address: Optional[str] = None,
        user_agent: Optional[str] = None
    ) -> bool:
        """
        Create new session for user.

        The expiry is stored as UTC text in SQLite's CURRENT_TIMESTAMP
        layout, because every query in this class compares ``expires_at``
        against CURRENT_TIMESTAMP, which SQLite reports in UTC.

        Args:
            user_id: User ID
            session_id: Unique session identifier
            expires_in_hours: Session expiration time in hours
            ip_address: Client IP address
            user_agent: Client user agent string

        Returns:
            True if successful, False if the insert failed (logged)
        """
        try:
            expiry = datetime.now(timezone.utc) + timedelta(hours=expires_in_hours)
            # Bind an explicit string rather than a datetime object: this
            # keeps the stored format identical to CURRENT_TIMESTAMP and
            # avoids sqlite3's implicit datetime adapter.
            expires_at = expiry.strftime(_SQLITE_TIMESTAMP_FORMAT)

            with self.conn:
                self.conn.execute('''
                    INSERT INTO sessions
                        (session_id, user_id, expires_at, ip_address, user_agent)
                    VALUES (?, ?, ?, ?, ?)
                ''', (session_id, user_id, expires_at, ip_address, user_agent))

            logger.info(f"Created session for user {user_id}")
            return True
        except Exception as e:
            logger.error(f"Error creating session: {e}")
            return False

    def get_session(self, session_id: str) -> Optional[Dict]:
        """
        Get a non-expired session by ID.

        The result contains all session columns plus the owning user's
        ``username``, ``email`` and ``full_name``.

        Args:
            session_id: Session ID to look up

        Returns:
            Session dictionary or None if not found/expired, or if the
            query fails (logged)
        """
        # NOTE(review): unlike the user lookups, this query does not filter
        # on u.is_active, so a deactivated user's live session still
        # resolves. Confirm whether that is intended.
        try:
            cursor = self.conn.execute('''
                SELECT s.*, u.username, u.email, u.full_name
                FROM sessions s
                JOIN users u ON s.user_id = u.id
                WHERE s.session_id = ?
                AND s.expires_at > CURRENT_TIMESTAMP
            ''', (session_id,))
            row = cursor.fetchone()
            return dict(row) if row else None
        except Exception as e:
            logger.error(f"Error fetching session: {e}")
            return None

    def delete_session(self, session_id: str) -> bool:
        """
        Delete session (logout).

        Args:
            session_id: Session ID to delete

        Returns:
            True if the statement ran (even when no row matched),
            False if it failed (logged)
        """
        try:
            with self.conn:
                self.conn.execute(
                    'DELETE FROM sessions WHERE session_id = ?',
                    (session_id,)
                )
            logger.info(f"Deleted session {session_id}")
            return True
        except Exception as e:
            logger.error(f"Error deleting session: {e}")
            return False

    def cleanup_expired_sessions(self):
        """
        Remove expired sessions from database.

        A session is treated as expired once ``expires_at`` is no longer
        strictly in the future, mirroring the ``>`` check in
        ``get_session``. Errors are logged and swallowed.
        """
        try:
            with self.conn:
                cursor = self.conn.execute('''
                    DELETE FROM sessions
                    WHERE expires_at <= CURRENT_TIMESTAMP
                ''')

            deleted_count = cursor.rowcount
            if deleted_count > 0:
                logger.info(f"Cleaned up {deleted_count} expired sessions")
        except Exception as e:
            logger.error(f"Error cleaning up sessions: {e}")

    def log_action(self, user_id: int, action: str, details: Optional[str] = None):
        """
        Log user action to audit trail.

        Errors are logged and swallowed.

        Args:
            user_id: User ID performing the action
            action: Action description (e.g., "login", "update_metadata", "export")
            details: Optional additional details (JSON string or plain text)
        """
        try:
            with self.conn:
                self.conn.execute('''
                    INSERT INTO audit_log (user_id, action, details)
                    VALUES (?, ?, ?)
                ''', (user_id, action, details))
        except Exception as e:
            logger.error(f"Error logging action: {e}")

    def get_user_activity(
        self,
        user_id: int,
        limit: int = 100,
        offset: int = 0
    ) -> List[Dict]:
        """
        Get user activity log, newest first.

        Args:
            user_id: User ID
            limit: Maximum number of records to return
            offset: Offset for pagination

        Returns:
            List of activity records (empty if the query fails; logged)
        """
        try:
            cursor = self.conn.execute('''
                SELECT * FROM audit_log
                WHERE user_id = ?
                ORDER BY timestamp DESC
                LIMIT ? OFFSET ?
            ''', (user_id, limit, offset))
            return [dict(row) for row in cursor.fetchall()]
        except Exception as e:
            logger.error(f"Error fetching user activity: {e}")
            return []

    def get_all_users(self, include_inactive: bool = False) -> List[Dict]:
        """
        Get all users, most recently created first.

        Args:
            include_inactive: Include inactive users

        Returns:
            List of user dictionaries (empty if the query fails; logged)
        """
        try:
            query = 'SELECT * FROM users'
            if not include_inactive:
                query += ' WHERE is_active = 1'
            query += ' ORDER BY created_at DESC'

            cursor = self.conn.execute(query)
            return [dict(row) for row in cursor.fetchall()]
        except Exception as e:
            logger.error(f"Error fetching users: {e}")
            return []

    def get_stats(self) -> Dict:
        """
        Get database statistics.

        Returns:
            Dictionary with the keys ``active_users``, ``active_sessions``,
            ``audit_entries`` and ``recent_activity`` (audit entries from
            the last 24 hours), or an empty dictionary if any query fails
            (logged)
        """
        try:
            stats = {}

            # Count users
            cursor = self.conn.execute(
                'SELECT COUNT(*) as count FROM users WHERE is_active = 1'
            )
            stats['active_users'] = cursor.fetchone()['count']

            # Count active sessions
            cursor = self.conn.execute('''
                SELECT COUNT(*) as count FROM sessions
                WHERE expires_at > CURRENT_TIMESTAMP
            ''')
            stats['active_sessions'] = cursor.fetchone()['count']

            # Count audit log entries
            cursor = self.conn.execute('SELECT COUNT(*) as count FROM audit_log')
            stats['audit_entries'] = cursor.fetchone()['count']

            # Recent activity (last 24 hours)
            cursor = self.conn.execute('''
                SELECT COUNT(*) as count FROM audit_log
                WHERE timestamp > datetime('now', '-24 hours')
            ''')
            stats['recent_activity'] = cursor.fetchone()['count']

            return stats
        except Exception as e:
            logger.error(f"Error fetching stats: {e}")
            return {}

    def close(self):
        """Close database connection; errors are logged and swallowed."""
        try:
            self.conn.close()
            logger.info("Database connection closed")
        except Exception as e:
            logger.error(f"Error closing database: {e}")