cohorta/backend/app/models/user.py
Vadym Samoilenko 5491d2d73d
Some checks failed
Deploy to Production / deploy (push) Failing after 0s
Rebrand to Cohorta + full UI redesign + registration with email verification
- Complete dark-theme redesign inspired by ai-impress.com (navy + cyan + violet palette)
- New Syne display font + gradient logo mark + SVG favicon
- New Navigation: glass-morphism, gradient logo, Get Started CTA
- New Hero: animated glow orbs, mock focus-group chat UI, stats row
- New landing: Features grid, How-It-Works steps, CTA banner
- New Footer: AImpress LTD branding, © AImpress LTD. All rights reserved.
- New Login page: dark card, password visibility toggle, link to Register
- New Register page: full form, benefits row, 50 free credits pitch
- New VerifyEmail page: token verification flow with auto-redirect
- Backend: email_service.py using Resend API for verification emails
- Backend: /api/auth/register, /verify-email, /resend-verification endpoints
- User model: email_verified, email_verify_token, email_verify_expires fields
- Gitea Actions CI/CD: auto-deploy to aimpress server on push to main

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-23 18:40:08 +01:00

133 lines
4.6 KiB
Python
Executable file

import bcrypt
from bson import ObjectId
from app.db import get_db
class User:
def __init__(self, username, email, password_hash=None, role="user", auth_type="local",
email_verified=False, email_verify_token=None, email_verify_expires=None):
self.username = username
self.email = email
self.password_hash = password_hash
self.role = role
self.auth_type = auth_type
self.email_verified = email_verified
self.email_verify_token = email_verify_token
self.email_verify_expires = email_verify_expires
@staticmethod
def hash_password(password):
salt = bcrypt.gensalt()
hashed = bcrypt.hashpw(password.encode('utf-8'), salt)
return hashed.decode('utf-8')
@staticmethod
def check_password(password_hash, password):
return bcrypt.checkpw(password.encode('utf-8'), password_hash.encode('utf-8'))
@staticmethod
async def find_by_username(username):
db = await get_db()
user_data = await db.users.find_one({"username": username})
return user_data
@staticmethod
async def find_by_email(email):
db = await get_db()
user_data = await db.users.find_one({"email": email})
return user_data
@staticmethod
async def find_by_id(user_id):
db = await get_db()
user_data = await db.users.find_one({"_id": ObjectId(user_id)})
return user_data
@staticmethod
async def find_all(query: dict = None, skip: int = 0, limit: int = 50) -> list:
db = await get_db()
cursor = db.users.find(query or {}).skip(skip).limit(limit).sort("username", 1)
return await cursor.to_list(length=limit)
@staticmethod
async def count(query: dict = None) -> int:
db = await get_db()
return await db.users.count_documents(query or {})
@staticmethod
async def update(user_id, fields: dict) -> bool:
db = await get_db()
result = await db.users.update_one(
{"_id": ObjectId(user_id)},
{"$set": fields}
)
return result.matched_count > 0
@staticmethod
async def bump_token_version(user_id) -> int:
db = await get_db()
result = await db.users.find_one_and_update(
{"_id": ObjectId(user_id)},
{"$inc": {"token_version": 1}},
return_document=True
)
return result.get("token_version", 1) if result else 1
@staticmethod
async def get_token_version(user_id) -> int:
db = await get_db()
doc = await db.users.find_one({"_id": ObjectId(user_id)}, {"token_version": 1})
return doc.get("token_version", 0) if doc else 0
def to_dict(self):
return {
"username": self.username,
"email": self.email,
"role": self.role,
"auth_type": self.auth_type,
}
@staticmethod
async def deduct_credits(user_id: str, amount: int) -> int | None:
"""Atomically deduct credits. Returns new balance, or None if insufficient funds."""
db = await get_db()
result = await db.users.find_one_and_update(
{"_id": ObjectId(user_id), "credits_balance": {"$gte": amount}},
{"$inc": {"credits_balance": -amount}},
return_document=True,
projection={"credits_balance": 1},
)
if result is None:
return None
return result["credits_balance"]
@staticmethod
async def grant_credits(user_id: str, amount: int) -> int:
"""Add credits to user balance. Returns new balance."""
db = await get_db()
result = await db.users.find_one_and_update(
{"_id": ObjectId(user_id)},
{"$inc": {"credits_balance": amount}},
return_document=True,
upsert=False,
projection={"credits_balance": 1},
)
return result["credits_balance"] if result else 0
async def save(self):
db = await get_db()
user_data = {
"username": self.username,
"email": self.email,
"password_hash": self.password_hash,
"role": self.role,
"auth_type": self.auth_type,
"credits_balance": 0,
"email_verified": self.email_verified,
}
if self.email_verify_token:
user_data["email_verify_token"] = self.email_verify_token
if self.email_verify_expires:
user_data["email_verify_expires"] = self.email_verify_expires
result = await db.users.insert_one(user_data)
return result.inserted_id