Complete Flask → FastAPI migration with: - FastAPI app with session auth, Azure AD SSO, rate limiting - SQLite-backed session store (survives restarts) - Bulk AI metadata generation with SSE progress - Admin panel (user management, audit log, AI usage) - Subpath deployment support (ROOT_PATH config) - Docker + deploy.sh for production deployment - Test suite (auth, upload, templates, imports, admin, sessions) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
108 lines
3.2 KiB
Python
108 lines
3.2 KiB
Python
"""Admin service: user management, audit log, AI usage stats."""
|
|
|
|
import logging
|
|
from typing import Dict, List, Optional
|
|
from datetime import datetime
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class AdminService:
|
|
"""Business logic for admin operations."""
|
|
|
|
def __init__(self, database):
|
|
self.db = database
|
|
|
|
# --- User Management ---
|
|
|
|
def list_users(self, include_inactive: bool = False) -> List[Dict]:
|
|
"""Get all users with sanitized output (no password hashes)."""
|
|
users = self.db.get_all_users(include_inactive=include_inactive)
|
|
for user in users:
|
|
user.pop("password_hash", None)
|
|
return users
|
|
|
|
def get_user(self, user_id: int) -> Optional[Dict]:
|
|
"""Get single user by ID."""
|
|
user = self.db.get_user_by_id(user_id)
|
|
if user:
|
|
user.pop("password_hash", None)
|
|
return user
|
|
|
|
def create_user(
|
|
self,
|
|
username: str,
|
|
email: str = "",
|
|
full_name: str = "",
|
|
role: str = "user",
|
|
password: str = None,
|
|
auth_method: str = "local",
|
|
) -> Optional[int]:
|
|
"""Create a new user."""
|
|
password_hash = None
|
|
if password:
|
|
from werkzeug.security import generate_password_hash
|
|
password_hash = generate_password_hash(password)
|
|
|
|
return self.db.create_user(
|
|
username=username,
|
|
password_hash=password_hash,
|
|
email=email,
|
|
full_name=full_name,
|
|
auth_method=auth_method,
|
|
role=role,
|
|
)
|
|
|
|
def update_user(self, user_id: int, updates: Dict) -> bool:
|
|
"""Update user fields (role, is_active, full_name, email)."""
|
|
allowed_fields = {"role", "is_active", "full_name", "email"}
|
|
filtered = {k: v for k, v in updates.items() if k in allowed_fields}
|
|
if not filtered:
|
|
return False
|
|
return self.db.update_user(user_id, filtered)
|
|
|
|
def deactivate_user(self, user_id: int) -> bool:
|
|
"""Deactivate a user account."""
|
|
return self.db.update_user(user_id, {"is_active": 0})
|
|
|
|
def activate_user(self, user_id: int) -> bool:
|
|
"""Reactivate a user account."""
|
|
return self.db.update_user(user_id, {"is_active": 1})
|
|
|
|
# --- Audit Log ---
|
|
|
|
def get_audit_log(
|
|
self,
|
|
user_id: Optional[int] = None,
|
|
action: Optional[str] = None,
|
|
limit: int = 100,
|
|
offset: int = 0,
|
|
) -> List[Dict]:
|
|
"""Get audit log with optional filters."""
|
|
return self.db.get_audit_log(
|
|
user_id=user_id,
|
|
action=action,
|
|
limit=limit,
|
|
offset=offset,
|
|
)
|
|
|
|
# --- AI Usage Stats ---
|
|
|
|
def get_ai_usage_stats(self) -> Dict:
|
|
"""Get aggregate AI usage statistics."""
|
|
return self.db.get_ai_usage_stats()
|
|
|
|
def get_ai_usage_by_user(self, limit: int = 50) -> List[Dict]:
|
|
"""Get AI usage broken down by user."""
|
|
return self.db.get_ai_usage_by_user(limit=limit)
|
|
|
|
# --- Dashboard Stats ---
|
|
|
|
def get_dashboard_stats(self) -> Dict:
|
|
"""Get combined statistics for admin dashboard."""
|
|
db_stats = self.db.get_stats()
|
|
ai_stats = self.db.get_ai_usage_stats()
|
|
return {
|
|
**db_stats,
|
|
"ai_usage": ai_stats,
|
|
}
|