Implemented simple authentication for testing and admin panel for user management: Backend: - Add simple email/password login for test users (admin@test.local, user@test.local) - Implement RBAC (Role-Based Access Control) with Permission enum - Create admin endpoints for user management and system analytics - Add bcrypt password hashing for test users - Create script to generate test users in database Frontend: - Add SimpleLogin component for test authentication - Create AdminPanel with user management and system analytics - Add role-based navigation (Admin tab visible only for admins) - Update AuthContext to support both MSAL and simple login - Add API methods for admin operations Features: - Admins can view all users, manage roles, activate/deactivate accounts - Admins can view system-wide analytics (users, conversations, tokens, costs) - Regular users only see their own chats and usage - Role badges in UI show user role (user/admin/superadmin) Note: Simple authentication is for testing only. Production uses Azure AD MSAL. Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
216 lines
5.5 KiB
Python
216 lines
5.5 KiB
Python
"""
|
|
Authentication Endpoints
|
|
|
|
API endpoints for authentication (Azure AD MSAL and simple test-user login) and session management
|
|
"""
|
|
|
|
import logging
|
|
from fastapi import APIRouter, Depends, HTTPException, status, Request
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.database import get_db
|
|
from app.services.auth_service import AuthService
|
|
from app.schemas.auth import (
|
|
SimpleLoginRequest,
|
|
LoginRequest,
|
|
LoginResponse,
|
|
RefreshTokenRequest,
|
|
RefreshTokenResponse,
|
|
UserInfo,
|
|
LogoutResponse
|
|
)
|
|
from app.core.middleware import get_current_active_user
|
|
from app.models.user import User
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
router = APIRouter(prefix="/auth", tags=["Authentication"])
|
|
|
|
|
|
@router.post("/login/simple", response_model=LoginResponse)
async def login_simple(
    request: SimpleLoginRequest,
    http_request: Request,
    db: AsyncSession = Depends(get_db)
):
    """
    Login with email and password (for test users only)

    Simple authentication for test users without Azure AD.

    Args:
        request: Login request with email and password; may also carry
            ip_address / user_agent overrides
        http_request: HTTP request, used as the fallback source for the
            client IP and user agent
        db: Database session

    Returns:
        LoginResponse built from the result of AuthService.login_simple

    Raises:
        HTTPException: 401 if the auth service returns a falsy result
    """
    auth_service = AuthService(db)

    # Extract client info. Values in the request body win over what the
    # connection reports.
    # NOTE(review): a body-supplied ip_address is client-controlled and can be
    # spoofed; confirm it is only used for informational/audit purposes.
    #
    # Starlette sets `client` to None when the server provides no peer address,
    # so guard the attribute access instead of failing with AttributeError.
    client = http_request.client
    ip_address = request.ip_address or (client.host if client else None)
    user_agent = request.user_agent or http_request.headers.get("user-agent")

    # Authenticate.
    # ip_address may be None here (as user_agent already could be); assumes
    # AuthService tolerates a missing value -- TODO confirm.
    result = await auth_service.login_simple(
        email=request.email,
        password=request.password,
        ip_address=ip_address,
        user_agent=user_agent
    )

    if not result:
        # Lazy %-style arguments: formatting only happens if the record is emitted.
        logger.warning(
            "Simple login failed for %s from IP: %s", request.email, ip_address
        )
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Authentication failed. Invalid credentials or not a test user."
        )

    logger.info(
        "User %s logged in via simple auth from %s",
        result['user']['email'],
        ip_address,
    )

    return LoginResponse(**result)
|
|
|
|
|
|
@router.post("/login", response_model=LoginResponse)
async def login(
    request: LoginRequest,
    http_request: Request,
    db: AsyncSession = Depends(get_db)
):
    """
    Login with Azure AD MSAL token

    Passes the Azure AD ID token to AuthService.login and returns the
    resulting session tokens and user info.

    Args:
        request: Login request with ID token; may also carry
            ip_address / user_agent overrides
        http_request: HTTP request, used as the fallback source for the
            client IP and user agent
        db: Database session

    Returns:
        LoginResponse built from the result of AuthService.login

    Raises:
        HTTPException: 401 if the auth service returns a falsy result
    """
    auth_service = AuthService(db)

    # Extract client info. Values in the request body win over what the
    # connection reports.
    # NOTE(review): a body-supplied ip_address is client-controlled and can be
    # spoofed; confirm it is only used for informational/audit purposes.
    #
    # Starlette sets `client` to None when the server provides no peer address,
    # so guard the attribute access instead of failing with AttributeError.
    client = http_request.client
    ip_address = request.ip_address or (client.host if client else None)
    user_agent = request.user_agent or http_request.headers.get("user-agent")

    # Authenticate.
    # ip_address may be None here (as user_agent already could be); assumes
    # AuthService tolerates a missing value -- TODO confirm.
    result = await auth_service.login(
        id_token=request.id_token,
        ip_address=ip_address,
        user_agent=user_agent
    )

    if not result:
        # Lazy %-style arguments: formatting only happens if the record is emitted.
        logger.warning("Login failed from IP: %s", ip_address)
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Authentication failed. Invalid Azure AD token or inactive account."
        )

    logger.info(
        "User %s logged in from %s", result['user']['email'], ip_address
    )

    return LoginResponse(**result)
|
|
|
|
|
|
@router.post("/refresh", response_model=RefreshTokenResponse)
async def refresh_token(
    request: RefreshTokenRequest,
    db: AsyncSession = Depends(get_db),
):
    """
    Exchange a refresh token for a new access token

    Delegates to AuthService.refresh_access_token and wraps whatever it
    returns in a RefreshTokenResponse.

    Args:
        request: Request body carrying the refresh token
        db: Database session handed to the auth service

    Returns:
        RefreshTokenResponse built from the service result

    Raises:
        HTTPException: 401 when the service returns a falsy result
    """
    refreshed = await AuthService(db).refresh_access_token(request.refresh_token)

    # Success path first; anything falsy from the service is a rejection.
    if refreshed:
        return RefreshTokenResponse(**refreshed)

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid or expired refresh token",
    )
|
|
|
|
|
|
@router.post("/logout", response_model=LogoutResponse)
async def logout(
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db),
):
    """
    Acknowledge a logout for the authenticated user

    As written, this handler only records the logout in the log and returns
    a success payload. It does not use the database session and does not
    revoke any token.

    Args:
        current_user: Authenticated user resolved by get_current_active_user
        db: Database session (currently unused by this handler)

    Returns:
        LogoutResponse with a success message
    """
    # Invalidating the session would require the access token, which would
    # have to be read from the Authorization header; that is not implemented
    # here, so the logout is only logged.
    logger.info(f"User {current_user.id} logged out")

    response = LogoutResponse(message="Successfully logged out", success=True)
    return response
|
|
|
|
|
|
@router.get("/me", response_model=UserInfo)
async def get_current_user_info(
    current_user: User = Depends(get_current_active_user),
):
    """
    Return the profile of the currently authenticated user

    Args:
        current_user: Authenticated user resolved by get_current_active_user

    Returns:
        UserInfo populated from the user's id, email, display name, role
        and active flag
    """
    # Collect the exposed fields first; the id is stringified for the response.
    profile = {
        "id": str(current_user.id),
        "email": current_user.email,
        "display_name": current_user.display_name,
        "role": current_user.role,
        "is_active": current_user.is_active,
    }
    return UserInfo(**profile)
|
|
|
|
|
|
@router.get("/health")
async def auth_health_check():
    """
    Report liveness of the authentication router

    Returns:
        A static mapping with "status" and "service" keys
    """
    # Constant payload: no dependencies are probed by this endpoint.
    return dict(status="healthy", service="authentication")
|