from datetime import datetime, timezone

import bcrypt
from bson import ObjectId

from app.db import get_db


class User:
    """Application user plus async persistence helpers for the ``users`` collection.

    An instance carries the fields of a user that is about to be created
    (see :meth:`save`). The static ``find_*`` / ``update`` / credit / token
    helpers work directly on stored documents and return raw documents or
    primitive values, not ``User`` instances.
    """

    def __init__(
        self,
        username,
        email,
        password_hash=None,
        role="user",
        auth_type="local",
        email_verified=False,
        email_verify_token=None,
        email_verify_expires=None,
        consent_terms_at=None,
        consent_data_processing_at=None,
    ):
        """Store the given field values on the instance; performs no I/O."""
        self.username = username
        self.email = email
        self.password_hash = password_hash
        self.role = role
        self.auth_type = auth_type
        self.email_verified = email_verified
        self.email_verify_token = email_verify_token
        self.email_verify_expires = email_verify_expires
        self.consent_terms_at = consent_terms_at
        self.consent_data_processing_at = consent_data_processing_at

    @staticmethod
    def hash_password(password):
        """Return a bcrypt hash of *password* (freshly salted) as a UTF-8 ``str``."""
        salt = bcrypt.gensalt()
        hashed = bcrypt.hashpw(password.encode("utf-8"), salt)
        return hashed.decode("utf-8")

    @staticmethod
    def check_password(password_hash, password):
        """Return True if *password* matches the stored bcrypt *password_hash*.

        Returns False (rather than raising) when either value is missing:
        ``password_hash`` defaults to None in the constructor and is stored
        as-is by :meth:`save`, so an account may have no hash at all.
        """
        if not password_hash or password is None:
            return False
        return bcrypt.checkpw(
            password.encode("utf-8"), password_hash.encode("utf-8")
        )

    @staticmethod
    async def find_by_username(username):
        """Return the result of ``find_one`` for an exact ``username`` match."""
        db = await get_db()
        return await db.users.find_one({"username": username})

    @staticmethod
    async def find_by_email(email):
        """Return the result of ``find_one`` for an exact ``email`` match."""
        db = await get_db()
        return await db.users.find_one({"email": email})

    @staticmethod
    async def find_by_id(user_id):
        """Return the result of ``find_one`` for ``_id == ObjectId(user_id)``.

        ``ObjectId(user_id)`` is evaluated unguarded, so whatever it raises
        for an unparseable id propagates to the caller.
        """
        db = await get_db()
        return await db.users.find_one({"_id": ObjectId(user_id)})

    @staticmethod
    async def find_all(
        query: dict | None = None, skip: int = 0, limit: int = 50
    ) -> list:
        """Return up to *limit* documents matching *query*, sorted by username ascending.

        A missing or empty *query* matches every document. *skip* and *limit*
        are passed to the cursor for pagination.
        """
        db = await get_db()
        cursor = (
            db.users.find(query or {}).skip(skip).limit(limit).sort("username", 1)
        )
        return await cursor.to_list(length=limit)

    @staticmethod
    async def count(query: dict | None = None) -> int:
        """Return the number of documents matching *query* (all documents if omitted)."""
        db = await get_db()
        return await db.users.count_documents(query or {})

    @staticmethod
    async def update(user_id, fields: dict) -> bool:
        """``$set`` *fields* on the user with the given id.

        Returns True when a document matched the id, whether or not any
        value actually changed (the check is on ``matched_count``).
        """
        db = await get_db()
        result = await db.users.update_one(
            {"_id": ObjectId(user_id)},
            {"$set": fields},
        )
        return result.matched_count > 0

    @staticmethod
    async def bump_token_version(user_id) -> int:
        """Increment the user's ``token_version`` by one and return the new value.

        Returns 1 when no document matched the id.
        """
        db = await get_db()
        result = await db.users.find_one_and_update(
            {"_id": ObjectId(user_id)},
            {"$inc": {"token_version": 1}},
            # True is the value of PyMongo's ReturnDocument.AFTER:
            # return the document as it looks after the update.
            return_document=True,
        )
        return result.get("token_version", 1) if result else 1

    @staticmethod
    async def get_token_version(user_id) -> int:
        """Return the user's ``token_version``; 0 if the user or the field is missing."""
        db = await get_db()
        doc = await db.users.find_one(
            {"_id": ObjectId(user_id)}, {"token_version": 1}
        )
        return doc.get("token_version", 0) if doc else 0

    def to_dict(self):
        """Return username, email, role and auth_type as a dict.

        The password hash, verification token and consent fields are
        deliberately not part of the result.
        """
        return {
            "username": self.username,
            "email": self.email,
            "role": self.role,
            "auth_type": self.auth_type,
        }

    @staticmethod
    async def deduct_credits(user_id: str, amount: int) -> int | None:
        """Atomically deduct credits.

        The balance check and the decrement happen in a single
        ``find_one_and_update`` whose filter requires
        ``credits_balance >= amount``.

        Returns:
            The new balance, or None if no document matched the filter
            (unknown user or insufficient funds).

        Raises:
            ValueError: if *amount* is negative. A negative amount would
                pass the balance filter and turn the ``$inc`` into a credit
                grant.
        """
        if amount < 0:
            raise ValueError("amount must be non-negative")
        db = await get_db()
        result = await db.users.find_one_and_update(
            {"_id": ObjectId(user_id), "credits_balance": {"$gte": amount}},
            {"$inc": {"credits_balance": -amount}},
            return_document=True,  # post-update document
            projection={"credits_balance": 1},
        )
        if result is None:
            return None
        return result["credits_balance"]

    @staticmethod
    async def grant_credits(user_id: str, amount: int) -> int:
        """Add credits to user balance.

        Returns the new balance, or 0 when no document matched the id
        (``upsert=False``, so no user is created).

        NOTE(review): *amount* is not validated here; a negative value would
        decrement the balance without the sufficiency check that
        :meth:`deduct_credits` performs. Confirm whether callers rely on that.
        """
        db = await get_db()
        result = await db.users.find_one_and_update(
            {"_id": ObjectId(user_id)},
            {"$inc": {"credits_balance": amount}},
            return_document=True,  # post-update document
            upsert=False,
            projection={"credits_balance": 1},
        )
        return result["credits_balance"] if result else 0

    async def save(self):
        """Insert this user as a new document and return the inserted id.

        The document always starts with ``credits_balance`` 0 and a
        timezone-aware UTC ``created_at``. The verification and consent
        fields are written only when they hold a truthy value.
        """
        db = await get_db()
        user_data = {
            "username": self.username,
            "email": self.email,
            "password_hash": self.password_hash,
            "role": self.role,
            "auth_type": self.auth_type,
            "credits_balance": 0,
            "email_verified": self.email_verified,
            "created_at": datetime.now(timezone.utc),
        }
        optional_fields = (
            "email_verify_token",
            "email_verify_expires",
            "consent_terms_at",
            "consent_data_processing_at",
        )
        for field_name in optional_fields:
            value = getattr(self, field_name)
            if value:
                user_data[field_name] = value
        result = await db.users.insert_one(user_data)
        return result.inserted_id