contract-query/backend/app/api/v1/auth.py
2025-08-14 15:03:33 -05:00

303 lines
No EOL
10 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, status # type: ignore
from fastapi.security import HTTPBearer # type: ignore
from motor.motor_asyncio import AsyncIOMotorDatabase # type: ignore
from pydantic import BaseModel # type: ignore
from datetime import timedelta
from bson import ObjectId # type: ignore
import logging
from ...config.database import get_database
from ...config.settings import settings
from ...core.security import verify_password, get_password_hash, create_access_token
from ...models.user import UserInDB, UserCreate, UserRole, UserResponse, AuthMethod
from ...core.auth import get_current_active_user
from ...services.sso_service import sso_service
# Router on which every authentication endpoint in this module is registered.
router = APIRouter()
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class LoginRequest(BaseModel):
    """Request body for the local (email + password) login endpoints."""

    # Email used to look up the user document.
    email: str
    # Plain-text password; checked against the stored hash.
    password: str
class LoginResponse(BaseModel):
    """Response body of the endpoints that issue an access token."""

    # Token produced by create_access_token.
    access_token: str
    # Token scheme; the endpoints in this module always send "bearer".
    token_type: str
    # Summary of the authenticated user (id, email, role, ...).
    user: dict
class RegisterRequest(BaseModel):
    """Request body for the user registration endpoint."""

    # Email of the account to create.
    email: str
    # Plain-text password; hashed before it is stored.
    password: str
    # Requested role; falls back to UserRole.USER when omitted.
    role: UserRole = UserRole.USER
class SSOLoginRequest(BaseModel):
    """Request body for the SSO token validation endpoint."""

    # Token obtained by the client from the SSO provider; handed to
    # sso_service.process_sso_login for validation.
    access_token: str
class SSOConfigResponse(BaseModel):
    """SSO settings exposed to the frontend."""

    # Populated from settings.azure_client_id.
    client_id: str
    # Populated from settings.azure_authority.
    authority: str
    # Populated from settings.azure_redirect_uri.
    redirect_uri: str
    # Populated from settings.sso_enabled.
    enabled: bool
@router.post("/login", response_model=LoginResponse)
async def login(
    login_data: LoginRequest,
    db: AsyncIOMotorDatabase = Depends(get_database),
):
    """Authenticate a user with local credentials and return an access token.

    Args:
        login_data: Email and plain-text password submitted by the client.
        db: Database handle injected by FastAPI.

    Returns:
        LoginResponse holding a bearer token and a summary of the user. The
        summary carries the same keys as the ones returned by ``/refresh``
        and ``/sso/validate``.

    Raises:
        HTTPException: 401 when the email is unknown, the account has no
            local password, or the password does not match; 400 when the
            account is inactive.
    """

    def unauthorized(detail: str) -> HTTPException:
        # Every 401 raised here carries the same bearer challenge header.
        return HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=detail,
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Find user by email
    user = await db.users.find_one({"email": login_data.email})
    if not user:
        raise unauthorized("Incorrect email or password")

    user_obj = UserInDB(**user)

    # Accounts without a stored password hash cannot log in locally.
    if not user_obj.hashed_password:
        raise unauthorized("User account requires SSO authentication")

    # Verify password
    if not verify_password(login_data.password, user_obj.hashed_password):
        raise unauthorized("Incorrect email or password")

    # The active check runs only after the password has been verified.
    if not user_obj.is_active:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Inactive user",
        )

    # Create access token
    access_token = create_access_token(data={"sub": str(user_obj.id)})

    return LoginResponse(
        access_token=access_token,
        token_type="bearer",
        user={
            "id": str(user_obj.id),
            "email": user_obj.email,
            "role": user_obj.role,
            "is_active": user_obj.is_active,
            # Included so the payload matches /refresh and /sso/validate.
            "auth_method": user_obj.auth_method,
            "sso_provider": user_obj.sso_provider,
            "sso_name": user_obj.sso_name,
            "index_access": user_obj.index_access,
        },
    )
@router.post("/register", response_model=dict)
async def register(
    register_data: RegisterRequest,
    db: AsyncIOMotorDatabase = Depends(get_database),
):
    """Create a new local user account.

    Args:
        register_data: Email, plain-text password and requested role.
        db: Database handle injected by FastAPI.

    Returns:
        A dict with a confirmation message and the new document id as a string.

    Raises:
        HTTPException: 400 when a user with the same email already exists.
    """
    # NOTE(review): the role is taken straight from the request body and this
    # endpoint declares no authentication dependency -- confirm that callers
    # are meant to be able to choose their own role.
    duplicate = await db.users.find_one({"email": register_data.email})
    if duplicate:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="User with this email already exists",
        )

    # Hash first, then build the document from the validated model.
    password_hash = get_password_hash(register_data.password)
    new_user = UserCreate(
        email=register_data.email,
        password=register_data.password,
        role=register_data.role,
    )
    document = new_user.dict()
    document["hashed_password"] = password_hash
    # The plain-text password must never reach the database.
    document.pop("password")
    document["index_access"] = []

    insert_result = await db.users.insert_one(document)
    new_id = str(insert_result.inserted_id)
    return {"message": "User registered successfully", "user_id": new_id}
@router.get("/me", response_model=UserResponse)
async def get_current_user_info(
    current_user: UserInDB = Depends(get_current_active_user),
):
    """Return the profile of the user resolved by get_current_active_user.

    Args:
        current_user: User injected by the get_current_active_user dependency.

    Returns:
        UserResponse populated field by field from ``current_user``.
    """
    profile = {
        "_id": current_user.id,
        "email": current_user.email,
        "role": current_user.role,
        "is_active": current_user.is_active,
        "index_access": current_user.index_access,
        "auth_method": current_user.auth_method,
        "sso_provider": current_user.sso_provider,
        "sso_name": current_user.sso_name,
        "last_sso_login": current_user.last_sso_login,
        "created_at": current_user.created_at,
        "updated_at": current_user.updated_at,
    }
    return UserResponse(**profile)
@router.post("/refresh", response_model=LoginResponse)
async def refresh_token(
    current_user: UserInDB = Depends(get_current_active_user),
):
    """Issue a fresh access token for the already authenticated user.

    Args:
        current_user: User injected by the get_current_active_user dependency.

    Returns:
        LoginResponse with a new bearer token and a summary of the user.
    """
    user_id = str(current_user.id)
    new_token = create_access_token(data={"sub": user_id})

    summary = {
        "id": str(current_user.id),
        "email": current_user.email,
        "role": current_user.role,
        "is_active": current_user.is_active,
        "auth_method": current_user.auth_method,
        "sso_provider": current_user.sso_provider,
        "sso_name": current_user.sso_name,
        "index_access": current_user.index_access,
    }
    return LoginResponse(access_token=new_token, token_type="bearer", user=summary)
@router.post("/logout")
async def logout():
    """Acknowledge a logout request.

    Nothing is changed on the server in this handler; the client is expected
    to discard its token.
    """
    acknowledgement = {"message": "Logged out successfully"}
    return acknowledgement
@router.get("/sso/config", response_model=SSOConfigResponse)
async def get_sso_config():
    """Return the SSO settings the frontend needs.

    Returns:
        SSOConfigResponse built from the Azure-related settings.

    Raises:
        HTTPException: 404 when SSO is disabled; 500 when any of the client
            id, authority or redirect URI settings is empty.
    """
    if not settings.sso_enabled:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="SSO is not enabled",
        )

    required_values = (
        settings.azure_client_id,
        settings.azure_authority,
        settings.azure_redirect_uri,
    )
    # A single falsy value means the configuration is incomplete.
    if any(not value for value in required_values):
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="SSO is not properly configured",
        )

    return SSOConfigResponse(
        client_id=settings.azure_client_id,
        authority=settings.azure_authority,
        redirect_uri=settings.azure_redirect_uri,
        enabled=settings.sso_enabled,
    )
@router.post("/sso/validate", response_model=LoginResponse)
async def sso_login(sso_data: SSOLoginRequest):
    """Validate an SSO token and issue an internal access token.

    Args:
        sso_data: Request body carrying the SSO provider's access token.

    Returns:
        LoginResponse with an internal bearer token and a summary of the user
        returned by ``sso_service.process_sso_login``.

    Raises:
        HTTPException: 404 when SSO is disabled; any HTTPException raised
            while processing the token is passed through unchanged; every
            other failure is reported as a 500 with a generic message.
    """
    logger.info("=== SSO Login Request ===")
    logger.info("SSO enabled: %s", settings.sso_enabled)
    # Only the token length is logged, never the token itself.
    logger.info(
        "Token length: %s",
        len(sso_data.access_token) if sso_data.access_token else "None",
    )

    if not settings.sso_enabled:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="SSO is not enabled",
        )

    try:
        logger.info("Starting SSO token processing...")

        # Process SSO login using the service
        user = await sso_service.process_sso_login(sso_data.access_token)
        logger.info("SSO processing successful, user: %s", user.email)

        # Create our internal JWT token
        access_token = create_access_token(data={"sub": str(user.id)})
        logger.info("Internal JWT token created successfully")

        return LoginResponse(
            access_token=access_token,
            token_type="bearer",
            user={
                "id": str(user.id),
                "email": user.email,
                "role": user.role,
                "is_active": user.is_active,
                "auth_method": user.auth_method,
                "sso_provider": user.sso_provider,
                "sso_name": user.sso_name,
                "index_access": user.index_access,
            },
        )
    except HTTPException:
        # Deliberate HTTP errors keep their own status code and detail.
        raise
    except Exception as exc:
        # The full error and traceback go to the server log only; the client
        # gets a generic message so internal details are not disclosed.
        logger.exception("SSO authentication failed: %s", exc)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="SSO authentication failed",
        ) from exc
@router.post("/login/local", response_model=LoginResponse)
async def local_login(
    login_data: LoginRequest,
    db: AsyncIOMotorDatabase = Depends(get_database),
):
    """Backup local login restricted to the admin account.

    Args:
        login_data: Email and plain-text password submitted by the client.
        db: Database handle injected by FastAPI.

    Returns:
        The LoginResponse produced by the regular ``login`` handler.

    Raises:
        HTTPException: 404 when ``settings.allow_local_admin`` is falsy; 401
            when the email is not the admin address; plus whatever ``login``
            raises.
    """
    local_auth_allowed = settings.allow_local_admin
    if not local_auth_allowed:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Local authentication is disabled",
        )

    # Exact, case-sensitive match against the single backup admin address.
    backup_admin_email = "admin@oliver.agency"
    is_backup_admin = login_data.email == backup_admin_email
    if not is_backup_admin:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Local authentication only available for admin account",
        )

    # Delegate to the regular login handler for credential checks.
    response = await login(login_data, db)
    return response
# Initialize default users
@router.post("/init-users")
async def init_default_users(
    db: AsyncIOMotorDatabase = Depends(get_database),
):
    """Create the default admin and regular user when they are missing.

    An account whose email is already present is left untouched, so calling
    the endpoint again does not overwrite existing users.

    Args:
        db: Database handle injected by FastAPI.

    Returns:
        A dict with a confirmation message.
    """
    # Function-scope import: the module itself only imports timedelta.
    from datetime import datetime, timezone

    # NOTE(review): the default passwords are hard-coded and this endpoint
    # declares no authentication dependency -- confirm it is not reachable in
    # production, or move the credentials to configuration.
    seed_accounts = (
        ("admin@oliver.agency", "admin123", UserRole.ADMIN),
        ("user@oliver.agency", "user123", UserRole.USER),
    )

    for email, password, role in seed_accounts:
        already_present = await db.users.find_one({"email": email})
        if already_present:
            continue

        # Store a real creation time rather than null timestamps.
        now = datetime.now(timezone.utc)
        await db.users.insert_one(
            {
                "email": email,
                "hashed_password": get_password_hash(password),
                "role": role,
                "is_active": True,
                "auth_method": AuthMethod.LOCAL,
                "index_access": [],
                "created_at": now,
                "updated_at": now,
            }
        )

    return {"message": "Default users initialized"}