semblance-dev/backend/app/routes/auth.py
Vadym Samoilenko 915c81b8f1 Complete phases D–G: quota enforcement, token invalidation, admin writes, backfill
Backend:
- token_version in JWT (bump_token_version, get_token_version on User model);
  jwt_required checks tv claim → 401 on mismatch; login routes embed version
- Quota pre-flight in all 3 LLM public methods (QuotaExceededError bubbles up)
- AI runner catches QuotaExceededError → sets status paused_quota + emits WS event
- Admin routes: POST /users (create), POST /users/<id>/reset-password,
  POST /pricing, GET /focus-groups with aggregated cost; PUT /users/<id>
  now bumps token_version on disable or role change
- backfill_usage.py: idempotent estimated-event generator for historical data,
  tiktoken for GPT models, char/3.8 for Gemini, --dry-run flag

Frontend:
- 402 interceptor dispatches quota_exceeded CustomEvent
- adminApi: createUser, resetPassword, createPricing, listFocusGroups
- UsersTab: New User dialog + Reset Password in edit dialog
- PricingTab: New Price dialog (model, provider, input/output/cached prices)
- FocusGroupsTab: focus groups table sorted by total cost
- Admin.tsx: 4th tab (Focus Groups)
- FocusGroupSession: admin-only cost badge + dismissable quota exceeded banner

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-24 18:34:48 +01:00

192 lines
7.4 KiB
Python
Executable file

from quart import Blueprint, request, jsonify
from app.auth.quart_jwt import create_access_token, jwt_required, get_jwt_identity
from app.models.user import User
from app.services.msal_service import MSALService
from app.utils.rate_limiter import rate_limit
auth_bp = Blueprint('auth', __name__)
@auth_bp.route('/register', methods=['POST'])
@rate_limit(max_requests=5, window_seconds=60)
async def register():
data = await request.get_json()
if not data or not data.get('username') or not data.get('email') or not data.get('password'):
return jsonify({"message": "Missing required fields"}), 400
username = data.get('username')
email = data.get('email')
password = data.get('password')
# Check if user already exists
if await User.find_by_username(username):
return jsonify({"message": "Username already taken"}), 409
if await User.find_by_email(email):
return jsonify({"message": "Email already registered"}), 409
# Create new user
hashed_password = User.hash_password(password)
new_user = User(username=username, email=email, password_hash=hashed_password)
user_id = await new_user.save()
# Generate access token
access_token = create_access_token(identity=str(user_id))
return jsonify({
"message": "User registered successfully",
"access_token": access_token,
"user": new_user.to_dict()
}), 201
@auth_bp.route('/login', methods=['POST'])
@rate_limit(max_requests=5, window_seconds=60)
async def login():
try:
data = await request.get_json()
if not data or not data.get('username') or not data.get('password'):
return jsonify({"message": "Missing username or password"}), 400
username = data.get('username')
password = data.get('password')
# Find user in database
try:
user_data = await User.find_by_username(username)
if not user_data:
return jsonify({"message": "Invalid username or password"}), 401
# Check password
if not User.check_password(user_data['password_hash'], password):
return jsonify({"message": "Invalid username or password"}), 401
# Generate access token (embed token_version for invalidation support)
tv = user_data.get("token_version", 0)
access_token = create_access_token(identity=str(user_data['_id']), token_version=tv)
return jsonify({
"message": "Login successful",
"access_token": access_token,
"user": {
"username": user_data['username'],
"email": user_data['email'],
"role": user_data.get('role', 'user')
}
}), 200
except Exception as e:
import logging
logging.getLogger(__name__).error(f"Database error during login: {e}")
return jsonify({"message": "Database error, please try again later"}), 500
except Exception as e:
print(f"Unexpected error in login route: {e}")
return jsonify({"message": "Internal server error"}), 500
@auth_bp.route('/me', methods=['GET'])
@jwt_required()
async def get_profile():
import logging
user_id = get_jwt_identity()
try:
user_data = await User.find_by_id(user_id)
if not user_data:
return jsonify({"message": "User not found"}), 404
return jsonify({
"username": user_data['username'],
"email": user_data['email'],
"role": user_data.get('role', 'user')
}), 200
except Exception as e:
logging.getLogger(__name__).error(f"Error in get_profile: {e}")
return jsonify({"message": "Internal server error"}), 500
@auth_bp.route('/microsoft', methods=['POST'])
async def microsoft_login():
"""Handle Microsoft OAuth authentication."""
try:
data = await request.get_json()
if not data or not data.get('id_token'):
return jsonify({"message": "Missing Microsoft ID token"}), 400
id_token = data.get('id_token')
# Initialize MSAL service and validate the token
msal_service = MSALService()
microsoft_user_info = msal_service.validate_token(id_token)
if not microsoft_user_info:
return jsonify({"message": "Invalid Microsoft ID token"}), 401
microsoft_id = microsoft_user_info.get('microsoft_id')
email = microsoft_user_info.get('email')
if not microsoft_id or not email:
return jsonify({"message": "Unable to retrieve user information from Microsoft"}), 400
# Try to find existing user by Microsoft ID or email
existing_user = None
try:
# First try to find by Microsoft ID
existing_user = await User.find_by_microsoft_id(microsoft_id)
# If not found by Microsoft ID, try by email
if not existing_user:
existing_user = await User.find_by_email(email)
# If found by email but no Microsoft ID, update the user to link Microsoft account
if existing_user and not existing_user.get('microsoft_id'):
await User.update_microsoft_id(existing_user['_id'], microsoft_id)
existing_user['microsoft_id'] = microsoft_id
existing_user['auth_type'] = 'microsoft'
except Exception as e:
print(f"Database error during Microsoft user lookup: {e}")
# Continue to create new user if lookup fails
# Create new user if not found
if not existing_user:
try:
user_data = msal_service.create_user_data(microsoft_user_info)
new_user = User(**user_data)
user_id = await new_user.save()
existing_user = {
"_id": user_id,
"username": user_data['username'],
"email": user_data['email'],
"role": user_data['role'],
"auth_type": user_data['auth_type'],
"microsoft_id": user_data['microsoft_id']
}
print(f"Created new Microsoft user: {email}")
except Exception as e:
print(f"Error creating Microsoft user: {e}")
return jsonify({"message": "Failed to create user account"}), 500
# Generate our backend JWT access token (embed token_version for invalidation support)
_tv = existing_user.get("token_version", 0)
access_token = create_access_token(identity=str(existing_user['_id']), token_version=_tv)
# Return response in same format as local login
return jsonify({
"message": "Microsoft login successful",
"access_token": access_token,
"user": {
"username": existing_user['username'],
"email": existing_user['email'],
"role": existing_user.get('role', 'user'),
"auth_type": "microsoft"
}
}), 200
except Exception as e:
print(f"Unexpected error in Microsoft login route: {e}")
return jsonify({"message": "Internal server error"}), 500