import bcrypt
from bson import ObjectId

from app.db import get_db


class User:
    """User account model stored in the ``users`` collection.

    Instances hold the fields of a not-yet-persisted user; call ``save`` to
    insert them. The static ``find_*`` / ``update*`` helpers work directly on
    the collection and return raw documents (dicts) rather than ``User``
    instances.

    Every database helper awaits ``get_db()`` and then awaits the collection
    call, so they must be run inside an event loop.
    (presumably a Motor database handle -- confirm against ``app.db``)
    """

    def __init__(self, username, email, password_hash=None, role="user",
                 auth_type="local", microsoft_id=None):
        """Store the account fields on the instance without touching the DB."""
        self.username = username
        self.email = email
        # None when no local password has been set for the account.
        self.password_hash = password_hash
        self.role = role
        self.auth_type = auth_type
        self.microsoft_id = microsoft_id

    @staticmethod
    def hash_password(password: str) -> str:
        """Return a bcrypt hash of *password* as a UTF-8 string.

        A fresh salt is generated on every call, so hashing the same password
        twice yields different strings.
        """
        salt = bcrypt.gensalt()
        hashed = bcrypt.hashpw(password.encode('utf-8'), salt)
        return hashed.decode('utf-8')

    @staticmethod
    def check_password(password_hash, password) -> bool:
        """Return True if *password* matches the stored *password_hash*.

        Note the argument order: the stored hash comes first.

        A missing or empty hash never matches. ``password_hash`` defaults to
        None in ``__init__`` and is persisted as-is by ``save``, so accounts
        without a local password must be rejected here instead of raising
        ``AttributeError`` on ``None.encode``.
        """
        if not password_hash:
            return False
        return bcrypt.checkpw(
            password.encode('utf-8'),
            password_hash.encode('utf-8'),
        )

    @staticmethod
    async def find_by_username(username):
        """Return the user document with this ``username``, or None."""
        db = await get_db()
        return await db.users.find_one({"username": username})

    @staticmethod
    async def find_by_email(email):
        """Return the user document with this ``email``, or None."""
        db = await get_db()
        return await db.users.find_one({"email": email})

    @staticmethod
    async def find_by_id(user_id):
        """Return the user document whose ``_id`` is *user_id*, or None.

        *user_id* is converted with ``ObjectId(user_id)``; a value that is not
        a valid ObjectId makes that constructor raise.
        """
        db = await get_db()
        return await db.users.find_one({"_id": ObjectId(user_id)})

    @staticmethod
    async def find_by_microsoft_id(microsoft_id):
        """Return the user document with this ``microsoft_id``, or None."""
        db = await get_db()
        return await db.users.find_one({"microsoft_id": microsoft_id})

    @staticmethod
    async def update_microsoft_id(user_id, microsoft_id):
        """Link a Microsoft identity to the user and mark it as Microsoft-auth.

        Sets ``microsoft_id`` and forces ``auth_type`` to ``"microsoft"``.

        Returns:
            True only if the document was actually modified. This is based on
            ``modified_count``, so re-applying identical values to an existing
            user returns False (unlike ``update``, which uses
            ``matched_count``).
        """
        db = await get_db()
        result = await db.users.update_one(
            {"_id": ObjectId(user_id)},
            {"$set": {"microsoft_id": microsoft_id, "auth_type": "microsoft"}},
        )
        return result.modified_count > 0

    @staticmethod
    async def find_all(query: dict = None, skip: int = 0, limit: int = 50) -> list:
        """Return one page of user documents sorted by ``username`` ascending.

        Args:
            query: Filter document; None (or an empty dict) matches all users.
            skip: Number of documents to skip.
            limit: Maximum number of documents to return.
        """
        db = await get_db()
        # Written in the order the query applies them: sort, then skip, then
        # limit. The chaining order of these cursor options does not affect
        # the result.
        cursor = db.users.find(query or {}).sort("username", 1).skip(skip).limit(limit)
        return await cursor.to_list(length=limit)

    @staticmethod
    async def count(query: dict = None) -> int:
        """Return how many user documents match *query* (all users if None)."""
        db = await get_db()
        return await db.users.count_documents(query or {})

    @staticmethod
    async def update(user_id, fields: dict) -> bool:
        """Apply ``$set`` with *fields* to the user identified by *user_id*.

        Returns:
            True if a document with that ``_id`` was matched, whether or not
            any value actually changed.
        """
        db = await get_db()
        result = await db.users.update_one(
            {"_id": ObjectId(user_id)},
            {"$set": fields},
        )
        return result.matched_count > 0

    @staticmethod
    async def bump_token_version(user_id) -> int:
        """Increment the user's ``token_version`` by one and return the new value.

        ``$inc`` creates the field when it is absent, so the first bump on a
        user saved without ``token_version`` yields 1.

        Returns:
            The post-increment version, or 1 when no user matches *user_id*.
            NOTE(review): ``get_token_version`` returns 0 for an unknown user
            while this returns 1 -- confirm the mismatch is intended.
        """
        db = await get_db()
        result = await db.users.find_one_and_update(
            {"_id": ObjectId(user_id)},
            {"$inc": {"token_version": 1}},
            # True is the value of pymongo's ReturnDocument.AFTER: return the
            # document as it looks after the increment.
            return_document=True,
        )
        if not result:
            return 1
        return result.get("token_version", 1)

    @staticmethod
    async def get_token_version(user_id) -> int:
        """Return the user's current ``token_version``.

        Returns 0 when the user does not exist or has no ``token_version``
        field yet.
        """
        db = await get_db()
        # The projection limits the fetched document to token_version
        # (the driver still includes _id).
        doc = await db.users.find_one({"_id": ObjectId(user_id)}, {"token_version": 1})
        if not doc:
            return 0
        return doc.get("token_version", 0)

    def to_dict(self):
        """Return the instance fields as a dict, omitting ``password_hash``."""
        return {
            "username": self.username,
            "email": self.email,
            "role": self.role,
            "auth_type": self.auth_type,
            "microsoft_id": self.microsoft_id,
        }

    async def save(self):
        """Insert this user as a new document and return its inserted ``_id``.

        Unlike ``to_dict``, the stored document includes ``password_hash``.
        This always performs an insert; it does not update an existing user.
        """
        db = await get_db()
        user_data = {
            "username": self.username,
            "email": self.email,
            "password_hash": self.password_hash,
            "role": self.role,
            "auth_type": self.auth_type,
            "microsoft_id": self.microsoft_id,
        }
        result = await db.users.insert_one(user_data)
        return result.inserted_id