- Fix missing await on FocusGroup.get_messages() (N-L1) - Replace time.sleep with asyncio.sleep in key_theme_service and focus_group_service (N-P10) - Replace flask import with quart in focus_groups.py (N-S3) - Add logger.error before all 500 returns in focus_groups.py (N-P6) - Add logging to silent except blocks across routes (N-M10, N-M11) - Add @rate_limit to 6 remaining AI endpoints (N-H4) - Add --confirm flag to populate scripts before delete_many (S-H2) - Remove hardcoded Azure ID fallbacks from msal_service.py and msalConfig.ts (A-M2, F-H4) - Centralize make_serializable() in utils.py, remove duplicates from 3 route files (N-P7) - Replace all datetime.utcnow() with datetime.now(timezone.utc) across entire backend (M-L2) - AuthContext.tsx: only mark token validated on 200 success, not on non-401 errors (F-H2) - Rename authType → auth_type in auth.py (N-S4) - Add security_report.md and security_report.pdf with full 92-finding status Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
190 lines
7.1 KiB
Python
Executable file
190 lines
7.1 KiB
Python
Executable file
from quart import Blueprint, request, jsonify
|
|
from app.auth.quart_jwt import create_access_token, jwt_required, get_jwt_identity
|
|
from app.models.user import User
|
|
from app.services.msal_service import MSALService
|
|
from app.utils.rate_limiter import rate_limit
|
|
|
|
auth_bp = Blueprint('auth', __name__)
|
|
|
|
@auth_bp.route('/register', methods=['POST'])
|
|
@rate_limit(max_requests=5, window_seconds=60)
|
|
async def register():
|
|
data = await request.get_json()
|
|
|
|
if not data or not data.get('username') or not data.get('email') or not data.get('password'):
|
|
return jsonify({"message": "Missing required fields"}), 400
|
|
|
|
username = data.get('username')
|
|
email = data.get('email')
|
|
password = data.get('password')
|
|
|
|
# Check if user already exists
|
|
if await User.find_by_username(username):
|
|
return jsonify({"message": "Username already taken"}), 409
|
|
if await User.find_by_email(email):
|
|
return jsonify({"message": "Email already registered"}), 409
|
|
|
|
# Create new user
|
|
hashed_password = User.hash_password(password)
|
|
new_user = User(username=username, email=email, password_hash=hashed_password)
|
|
user_id = await new_user.save()
|
|
|
|
# Generate access token
|
|
access_token = create_access_token(identity=str(user_id))
|
|
|
|
return jsonify({
|
|
"message": "User registered successfully",
|
|
"access_token": access_token,
|
|
"user": new_user.to_dict()
|
|
}), 201
|
|
|
|
@auth_bp.route('/login', methods=['POST'])
|
|
@rate_limit(max_requests=5, window_seconds=60)
|
|
async def login():
|
|
try:
|
|
data = await request.get_json()
|
|
|
|
if not data or not data.get('username') or not data.get('password'):
|
|
return jsonify({"message": "Missing username or password"}), 400
|
|
|
|
username = data.get('username')
|
|
password = data.get('password')
|
|
|
|
# Find user in database
|
|
try:
|
|
user_data = await User.find_by_username(username)
|
|
if not user_data:
|
|
return jsonify({"message": "Invalid username or password"}), 401
|
|
|
|
# Check password
|
|
if not User.check_password(user_data['password_hash'], password):
|
|
return jsonify({"message": "Invalid username or password"}), 401
|
|
|
|
# Generate access token
|
|
access_token = create_access_token(identity=str(user_data['_id']))
|
|
|
|
return jsonify({
|
|
"message": "Login successful",
|
|
"access_token": access_token,
|
|
"user": {
|
|
"username": user_data['username'],
|
|
"email": user_data['email'],
|
|
"role": user_data.get('role', 'user')
|
|
}
|
|
}), 200
|
|
except Exception as e:
|
|
import logging
|
|
logging.getLogger(__name__).error(f"Database error during login: {e}")
|
|
return jsonify({"message": "Database error, please try again later"}), 500
|
|
|
|
except Exception as e:
|
|
print(f"Unexpected error in login route: {e}")
|
|
return jsonify({"message": "Internal server error"}), 500
|
|
|
|
@auth_bp.route('/me', methods=['GET'])
|
|
@jwt_required()
|
|
async def get_profile():
|
|
import logging
|
|
user_id = get_jwt_identity()
|
|
|
|
try:
|
|
user_data = await User.find_by_id(user_id)
|
|
|
|
if not user_data:
|
|
return jsonify({"message": "User not found"}), 404
|
|
|
|
return jsonify({
|
|
"username": user_data['username'],
|
|
"email": user_data['email'],
|
|
"role": user_data.get('role', 'user')
|
|
}), 200
|
|
except Exception as e:
|
|
logging.getLogger(__name__).error(f"Error in get_profile: {e}")
|
|
return jsonify({"message": "Internal server error"}), 500
|
|
|
|
@auth_bp.route('/microsoft', methods=['POST'])
|
|
async def microsoft_login():
|
|
"""Handle Microsoft OAuth authentication."""
|
|
try:
|
|
data = await request.get_json()
|
|
|
|
if not data or not data.get('id_token'):
|
|
return jsonify({"message": "Missing Microsoft ID token"}), 400
|
|
|
|
id_token = data.get('id_token')
|
|
|
|
# Initialize MSAL service and validate the token
|
|
msal_service = MSALService()
|
|
microsoft_user_info = msal_service.validate_token(id_token)
|
|
|
|
if not microsoft_user_info:
|
|
return jsonify({"message": "Invalid Microsoft ID token"}), 401
|
|
|
|
microsoft_id = microsoft_user_info.get('microsoft_id')
|
|
email = microsoft_user_info.get('email')
|
|
|
|
if not microsoft_id or not email:
|
|
return jsonify({"message": "Unable to retrieve user information from Microsoft"}), 400
|
|
|
|
# Try to find existing user by Microsoft ID or email
|
|
existing_user = None
|
|
try:
|
|
# First try to find by Microsoft ID
|
|
existing_user = await User.find_by_microsoft_id(microsoft_id)
|
|
|
|
# If not found by Microsoft ID, try by email
|
|
if not existing_user:
|
|
existing_user = await User.find_by_email(email)
|
|
|
|
# If found by email but no Microsoft ID, update the user to link Microsoft account
|
|
if existing_user and not existing_user.get('microsoft_id'):
|
|
await User.update_microsoft_id(existing_user['_id'], microsoft_id)
|
|
existing_user['microsoft_id'] = microsoft_id
|
|
existing_user['auth_type'] = 'microsoft'
|
|
|
|
except Exception as e:
|
|
print(f"Database error during Microsoft user lookup: {e}")
|
|
# Continue to create new user if lookup fails
|
|
|
|
# Create new user if not found
|
|
if not existing_user:
|
|
try:
|
|
user_data = msal_service.create_user_data(microsoft_user_info)
|
|
new_user = User(**user_data)
|
|
user_id = await new_user.save()
|
|
|
|
existing_user = {
|
|
"_id": user_id,
|
|
"username": user_data['username'],
|
|
"email": user_data['email'],
|
|
"role": user_data['role'],
|
|
"auth_type": user_data['auth_type'],
|
|
"microsoft_id": user_data['microsoft_id']
|
|
}
|
|
|
|
print(f"Created new Microsoft user: {email}")
|
|
|
|
except Exception as e:
|
|
print(f"Error creating Microsoft user: {e}")
|
|
return jsonify({"message": "Failed to create user account"}), 500
|
|
|
|
# Generate our backend JWT access token
|
|
access_token = create_access_token(identity=str(existing_user['_id']))
|
|
|
|
# Return response in same format as local login
|
|
return jsonify({
|
|
"message": "Microsoft login successful",
|
|
"access_token": access_token,
|
|
"user": {
|
|
"username": existing_user['username'],
|
|
"email": existing_user['email'],
|
|
"role": existing_user.get('role', 'user'),
|
|
"auth_type": "microsoft"
|
|
}
|
|
}), 200
|
|
|
|
except Exception as e:
|
|
print(f"Unexpected error in Microsoft login route: {e}")
|
|
return jsonify({"message": "Internal server error"}), 500
|
|
|
|
|