#!/usr/bin/env python3 """Create test users for the accessible video platform.""" import asyncio from datetime import datetime from passlib.context import CryptContext from motor.motor_asyncio import AsyncIOMotorClient from app.core.config import settings from app.models.user import UserRole pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") async def create_test_users(): """Create test users in the database.""" print("Connecting to MongoDB...") client = AsyncIOMotorClient(settings.mongodb_uri) db = client[settings.mongodb_db] # Test connection await client.admin.command('ping') print("Connected to MongoDB successfully") users_collection = db.users # Check if users already exist existing_admin = await users_collection.find_one({"email": "admin@example.com"}) existing_reviewer = await users_collection.find_one({"email": "reviewer@example.com"}) test_users = [ { "_id": "admin-001", "email": "admin@example.com", "hashed_password": pwd_context.hash("admin"), "full_name": "Admin User", "role": UserRole.ADMIN.value, "auth_provider": "local", "is_active": True, "created_at": datetime.utcnow(), "updated_at": datetime.utcnow(), }, { "_id": "reviewer-001", "email": "reviewer@example.com", "hashed_password": pwd_context.hash("reviewer"), "full_name": "Reviewer User", "role": UserRole.REVIEWER.value, "auth_provider": "local", "is_active": True, "created_at": datetime.utcnow(), "updated_at": datetime.utcnow(), }, { "_id": "production-001", "email": "production@example.com", "hashed_password": pwd_context.hash("production"), "full_name": "Production User", "role": UserRole.PRODUCTION.value, "auth_provider": "local", "is_active": True, "created_at": datetime.utcnow(), "updated_at": datetime.utcnow(), }, { "_id": "client-001", "email": "client@example.com", "hashed_password": pwd_context.hash("client123"), "full_name": "Client User", "role": UserRole.CLIENT.value, "auth_provider": "local", "is_active": True, "created_at": datetime.utcnow(), "updated_at": datetime.utcnow(), } ] for user in test_users: existing = await users_collection.find_one({"email": user["email"]}) if existing: print(f"User {user['email']} already exists, skipping...") continue result = await users_collection.insert_one(user) print(f"Created user: {user['email']} (ID: {result.inserted_id})") # Show all users print("\nAll users in database:") async for user in users_collection.find({}, {"email": 1, "role": 1, "is_active": 1}): print(f" {user['email']} - {user['role']} - Active: {user['is_active']}") client.close() print("Done!") if __name__ == "__main__": asyncio.run(create_test_users())