"""Authentication routes: registration, login, profile, Microsoft sign-in.

All routes are registered on ``auth_bp``.  Development-only shortcuts (the
built-in ``user``/``pass`` account, the ``"default_id"`` mock profile and the
unauthenticated ``/refresh-token`` endpoint) are disabled unless the
``AUTH_ENABLE_DEV_LOGIN`` environment variable is set to a truthy value.
"""

import logging
import os

from quart import Blueprint, request, jsonify

from app.auth.quart_jwt import create_access_token, jwt_required, get_jwt_identity
from app.models.user import User
from app.services.msal_service import MSALService

logger = logging.getLogger(__name__)

auth_bp = Blueprint('auth', __name__)

# Name of the environment variable that enables the development shortcuts.
_DEV_AUTH_ENV_VAR = "AUTH_ENABLE_DEV_LOGIN"
_TRUTHY_VALUES = frozenset({"1", "true", "yes", "on"})

# Credentials and profile of the built-in development account.
_DEV_USERNAME = "user"
_DEV_PASSWORD = "pass"
_DEV_PROFILE = {
    "username": "user",
    "email": "user@example.com",
    "role": "admin",
}


def _dev_auth_enabled() -> bool:
    """Return True when the development-only auth shortcuts are switched on.

    The flag is read on every call so it can be toggled without re-importing
    the module.
    """
    return os.environ.get(_DEV_AUTH_ENV_VAR, "").strip().lower() in _TRUTHY_VALUES


def _public_user(user) -> dict:
    """Return the username/email/role subset of a user record.

    ``role`` falls back to ``'user'`` when the record does not carry one.
    """
    return {
        "username": user['username'],
        "email": user['email'],
        "role": user.get('role', 'user'),
    }


@auth_bp.route('/register', methods=['POST'])
async def register():
    """Create a local account and return an access token for it.

    Expects a JSON object with non-empty ``username``, ``email`` and
    ``password``.

    Returns:
        201 with the token and ``new_user.to_dict()`` on success,
        400 when a required field is missing or the body is not a JSON object,
        409 when the username or email is already in use.
    """
    data = await request.get_json()
    required = ('username', 'email', 'password')
    # A JSON array/scalar has no ``.get``; treat it like a missing body.
    if not isinstance(data, dict) or not all(data.get(field) for field in required):
        return jsonify({"message": "Missing required fields"}), 400

    username = data.get('username')
    email = data.get('email')
    password = data.get('password')

    # Reject duplicates before creating anything.
    if await User.find_by_username(username):
        return jsonify({"message": "Username already taken"}), 409

    if await User.find_by_email(email):
        return jsonify({"message": "Email already registered"}), 409

    hashed_password = User.hash_password(password)
    new_user = User(username=username, email=email, password_hash=hashed_password)
    user_id = await new_user.save()

    access_token = create_access_token(identity=str(user_id))

    return jsonify({
        "message": "User registered successfully",
        "access_token": access_token,
        "user": new_user.to_dict()
    }), 201


@auth_bp.route('/login', methods=['POST'])
async def login():
    """Authenticate with username and password and return an access token.

    Returns:
        200 with the token and the user's username/email/role,
        400 when username or password is missing,
        401 when the credentials do not match a stored account,
        500 when the user lookup or any other step fails unexpectedly.
    """
    # Parsed outside the try block so the framework's own error response for a
    # malformed body is not converted into a 500 by the handlers below.
    data = await request.get_json()

    try:
        if not isinstance(data, dict) or not data.get('username') or not data.get('password'):
            return jsonify({"message": "Missing username or password"}), 400

        username = data.get('username')
        password = data.get('password')

        # Built-in development account.  It carries the admin role, so it is
        # only honoured when explicitly enabled through the environment.
        if _dev_auth_enabled() and username == _DEV_USERNAME and password == _DEV_PASSWORD:
            # Imported lazily: only this development path needs bson directly.
            from bson import ObjectId

            # A fresh ObjectId string is used as the token identity.
            default_id = str(ObjectId())
            access_token = create_access_token(identity=default_id)

            return jsonify({
                "message": "Login successful (default user)",
                "access_token": access_token,
                "user": dict(_DEV_PROFILE)
            }), 200

        try:
            user_data = await User.find_by_username(username)
            if not user_data:
                return jsonify({"message": "Invalid username or password"}), 401

            # Records without a local password hash cannot log in with a
            # password; answer 401 instead of failing on the missing key.
            stored_hash = user_data.get('password_hash')
            if not stored_hash or not User.check_password(stored_hash, password):
                return jsonify({"message": "Invalid username or password"}), 401

            access_token = create_access_token(identity=str(user_data['_id']))

            return jsonify({
                "message": "Login successful",
                "access_token": access_token,
                "user": _public_user(user_data)
            }), 200
        except Exception:
            logger.exception("Database error during login")
            return jsonify({"message": "Database error, please try again later"}), 500
    except Exception:
        logger.exception("Unexpected error in login route")
        return jsonify({"message": "Internal server error"}), 500


@auth_bp.route('/me', methods=['GET'])
@jwt_required()
async def get_profile():
    """Return username, email and role for the identity in the access token.

    Returns:
        200 with the profile,
        404 when no user matches the token identity,
        500 when the lookup fails.
    """
    user_id = get_jwt_identity()

    # Mock profile for the literal identity "default_id" (development only).
    # NOTE(review): nothing in this file issues that identity except
    # /refresh-token when a caller passes it explicitly - confirm it is needed.
    if _dev_auth_enabled() and user_id == "default_id":
        return jsonify(dict(_DEV_PROFILE)), 200

    try:
        user_data = await User.find_by_id(user_id)
        if not user_data:
            return jsonify({"message": "User not found"}), 404

        return jsonify(_public_user(user_data)), 200
    except Exception:
        # Report the failure instead of answering 200 with a made-up profile,
        # which would present placeholder data as the caller's own account.
        logger.exception("Error in get_profile")
        return jsonify({"message": "Internal server error"}), 500


@auth_bp.route('/microsoft', methods=['POST'])
async def microsoft_login():
    """Handle Microsoft OAuth authentication.

    Expects a JSON object with ``id_token``.  The token is validated through
    ``MSALService``; the matching user is looked up by Microsoft ID, then by
    email (linking the Microsoft ID to that record if it has none), and is
    created when no match exists.

    Returns:
        200 with a backend access token and the user's public fields,
        400 when the token is missing or lacks a Microsoft ID / email,
        401 when the token fails validation,
        500 when user creation or any other step fails unexpectedly.
    """
    # Parsed outside the try block so the framework's own error response for a
    # malformed body is not converted into a 500 by the handler below.
    data = await request.get_json()

    try:
        if not isinstance(data, dict) or not data.get('id_token'):
            return jsonify({"message": "Missing Microsoft ID token"}), 400

        id_token = data.get('id_token')

        msal_service = MSALService()
        microsoft_user_info = msal_service.validate_token(id_token)

        if not microsoft_user_info:
            return jsonify({"message": "Invalid Microsoft ID token"}), 401

        microsoft_id = microsoft_user_info.get('microsoft_id')
        email = microsoft_user_info.get('email')

        if not microsoft_id or not email:
            return jsonify({"message": "Unable to retrieve user information from Microsoft"}), 400

        existing_user = None

        try:
            # Prefer the Microsoft ID; fall back to the email address.
            existing_user = await User.find_by_microsoft_id(microsoft_id)

            if not existing_user:
                existing_user = await User.find_by_email(email)

                # Found by email only: link the Microsoft account to it.
                if existing_user and not existing_user.get('microsoft_id'):
                    await User.update_microsoft_id(existing_user['_id'], microsoft_id)
                    existing_user['microsoft_id'] = microsoft_id
                    existing_user['auth_type'] = 'microsoft'
        except Exception:
            # NOTE(review): a failed lookup falls through to account creation
            # below; if the failure is transient this may create a duplicate
            # account - confirm this best-effort behaviour is intended.
            logger.exception("Database error during Microsoft user lookup")

        if not existing_user:
            try:
                user_data = msal_service.create_user_data(microsoft_user_info)
                new_user = User(**user_data)
                user_id = await new_user.save()

                existing_user = {
                    "_id": user_id,
                    "username": user_data['username'],
                    "email": user_data['email'],
                    "role": user_data['role'],
                    "auth_type": user_data['auth_type'],
                    "microsoft_id": user_data['microsoft_id']
                }

                logger.info("Created new Microsoft user: %s", email)
            except Exception:
                logger.exception("Error creating Microsoft user")
                return jsonify({"message": "Failed to create user account"}), 500

        # Issue our own backend token for the resolved user.
        access_token = create_access_token(identity=str(existing_user['_id']))

        # Same response shape as the local login, plus the auth type.
        user_payload = _public_user(existing_user)
        user_payload["authType"] = "microsoft"

        return jsonify({
            "message": "Microsoft login successful",
            "access_token": access_token,
            "user": user_payload
        }), 200

    except Exception:
        logger.exception("Unexpected error in Microsoft login route")
        return jsonify({"message": "Internal server error"}), 500


@auth_bp.route('/refresh-token', methods=['POST'])
async def refresh_token():
    """Generate a new token for testing during JWT system migration.

    The token is issued for the ``user_id`` given in the JSON body
    (``'default_user'`` when absent) without any authentication, so the
    endpoint only works when the development shortcuts are enabled.

    Returns:
        200 with the new token,
        403 when the development shortcuts are disabled,
        500 when token creation fails.
    """
    if not _dev_auth_enabled():
        return jsonify({"message": "Token refresh is disabled"}), 403

    data = await request.get_json()
    if not isinstance(data, dict):
        data = {}
    user_id = data.get('user_id', 'default_user')

    try:
        access_token = create_access_token(user_id)
    except Exception:
        # Log the details server-side; do not echo exception text to clients.
        logger.exception("Token refresh failed")
        return jsonify({"message": "Token refresh failed"}), 500

    return jsonify({
        "message": "Token refreshed successfully",
        "access_token": access_token,
        "user_id": user_id,
        "system": "quart-jwt"
    }), 200