143 lines
No EOL
5 KiB
Python
143 lines
No EOL
5 KiB
Python
from flask import Blueprint, current_app, jsonify, request
from flask_jwt_extended import create_access_token, jwt_required, get_jwt_identity

from app.models.user import User
|
|
|
|
# Blueprint on which the /register, /login and /me routes below are registered.
auth_bp = Blueprint('auth', __name__)
|
|
|
|
@auth_bp.route('/register', methods=['POST'])
def register():
    """Create a new user account and return a JWT access token for it.

    Expects a JSON object with non-empty ``username``, ``email`` and
    ``password`` fields.

    Returns:
        201 with ``message``, ``access_token`` and ``user`` on success.
        400 when the body is not a JSON object or a required field is
        missing/empty.
        409 when the username or the email is already in use.
        500 when a lookup, the save, or token creation raises.
    """
    # silent=True: a malformed or non-JSON body yields None instead of raising,
    # so every bad payload gets the same JSON 400 response.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # A JSON list/string/number has no .get(); reject it up front.
        return jsonify({"message": "Missing required fields"}), 400

    username = data.get('username')
    email = data.get('email')
    password = data.get('password')
    if not (username and email and password):
        return jsonify({"message": "Missing required fields"}), 400

    try:
        # Check if user already exists.
        # NOTE(review): check-then-save is not atomic; presumably the storage
        # layer enforces uniqueness as well - confirm.
        if User.find_by_username(username):
            return jsonify({"message": "Username already taken"}), 409
        if User.find_by_email(email):
            return jsonify({"message": "Email already registered"}), 409

        # Create new user
        hashed_password = User.hash_password(password)
        new_user = User(username=username, email=email, password_hash=hashed_password)
        user_id = new_user.save()

        # Generate access token
        access_token = create_access_token(identity=str(user_id))

        return jsonify({
            "message": "User registered successfully",
            "access_token": access_token,
            "user": new_user.to_dict()
        }), 201
    except Exception:
        # Route-level boundary: log with traceback and answer with JSON, the
        # same way the login route reports unexpected failures.
        current_app.logger.exception("Unexpected error in register route")
        return jsonify({"message": "Internal server error"}), 500
|
|
|
|
def _default_login_enabled():
    """Return True when the built-in development account may be used.

    The account is enabled in debug mode, in testing mode, or when the
    ``ALLOW_DEFAULT_LOGIN`` config key is set to a truthy value.
    """
    return bool(
        current_app.debug
        or current_app.testing
        or current_app.config.get('ALLOW_DEFAULT_LOGIN', False)
    )


@auth_bp.route('/login', methods=['POST'])
def login():
    """Authenticate a user and return a JWT access token.

    Expects a JSON object with non-empty ``username`` and ``password``.

    Returns:
        200 with ``message``, ``access_token`` and a ``user`` summary
        (username, email, role) on success.
        400 when the body is not a JSON object or a field is missing/empty.
        401 when the username is unknown or the password does not match.
        500 when the user lookup or token creation raises.
    """
    try:
        # silent=True: a malformed or non-JSON body yields None instead of
        # raising, so it is reported as a 400 rather than a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"message": "Missing username or password"}), 400

        username = data.get('username')
        password = data.get('password')
        if not username or not password:
            return jsonify({"message": "Missing username or password"}), 400

        # Default credentials for development/testing only. They grant an
        # admin role, so they must never be accepted unconditionally.
        if _default_login_enabled() and username == "user" and password == "pass":
            # Imported lazily so bson is only needed when this path is used.
            from bson import ObjectId

            # The mock user gets a fresh, validly formatted ObjectId string.
            default_id = str(ObjectId())
            access_token = create_access_token(identity=default_id)

            return jsonify({
                "message": "Login successful (default user)",
                "access_token": access_token,
                "user": {
                    "username": "user",
                    "email": "user@example.com",
                    "role": "admin"
                }
            }), 200

        # Try to find user in database
        try:
            user_data = User.find_by_username(username)
            # Same message for unknown user and wrong password, so the
            # response does not reveal which usernames exist.
            if not user_data:
                return jsonify({"message": "Invalid username or password"}), 401
            if not User.check_password(user_data['password_hash'], password):
                return jsonify({"message": "Invalid username or password"}), 401

            # Generate access token
            access_token = create_access_token(identity=str(user_data['_id']))

            return jsonify({
                "message": "Login successful",
                "access_token": access_token,
                "user": {
                    "username": user_data['username'],
                    "email": user_data['email'],
                    "role": user_data.get('role', 'user')
                }
            }), 200
        except Exception:
            current_app.logger.exception("Database error during login")
            return jsonify({"message": "Database error, please try again later"}), 500

    except Exception:
        current_app.logger.exception("Unexpected error in login route")
        return jsonify({"message": "Internal server error"}), 500
|
|
|
|
@auth_bp.route('/me', methods=['GET'])
@jwt_required()
def get_profile():
    """Return the profile of the user identified by the request's JWT.

    Returns:
        200 with ``username``, ``email`` and ``role`` when the user is found
        (``role`` falls back to ``'user'`` when the record has none).
        404 when no user matches the token identity.
        500 when the lookup or response construction raises.
    """
    user_id = get_jwt_identity()

    # Handle the default_id case specially: return mock user data.
    # NOTE(review): login() issues a random ObjectId string for the default
    # account, so this branch only matches tokens whose identity is literally
    # "default_id" - confirm which identity is intended.
    if user_id == "default_id":
        return jsonify({
            "username": "user",
            "email": "user@example.com",
            "role": "admin"
        }), 200

    try:
        user_data = User.find_by_id(user_id)

        if not user_data:
            return jsonify({"message": "User not found"}), 404

        return jsonify({
            "username": user_data['username'],
            "email": user_data['email'],
            "role": user_data.get('role', 'user')
        }), 200
    except Exception:
        # Report the failure instead of answering 200 with a made-up profile:
        # the caller must never receive data that is not their own.
        current_app.logger.exception("Error in get_profile")
        return jsonify({"message": "Database error, please try again later"}), 500