import logging
import re
from datetime import datetime, timedelta

from bson import ObjectId

import database
from database import users_collection, agents_collection, agent_usage_collection, agent_ratings_collection
import auth
from auth import hash_password, verify_password

logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# Users
# ---------------------------------------------------------------------------

async def get_user_by_email(email: str):
    """Return the user whose ``email`` matches exactly (case-sensitive), or ``None``."""
    return await users_collection.find_one({"email": email})


async def get_user_by_azure_ad_id(azure_ad_id: str):
    """Return the user linked to the given Azure AD Object ID, or ``None``."""
    return await users_collection.find_one({"azure_ad_id": azure_ad_id})


async def create_user(
    email: str,
    password: str = None,
    full_name: str = None,
    is_admin: bool = False,
    auth_provider: str = "local",
):
    """Insert a new active user and return the stored document (including ``_id``).

    Args:
        email: Login email, stored as given.
        password: Plain-text password; hashed and stored only for local users.
        full_name: Optional display name.
        is_admin: Whether the new user is an administrator.
        auth_provider: ``"local"`` for password users; any other value marks an SSO user.
    """
    now = datetime.utcnow()
    user_doc = {
        "email": email,
        "full_name": full_name,
        "is_active": True,
        "is_admin": is_admin,
        "auth_provider": auth_provider,
        "created_at": now,
        "updated_at": now,
    }
    # Only local users authenticate with a password, so only they get a hash.
    if auth_provider == "local" and password:
        user_doc["hashed_password"] = hash_password(password)

    result = await users_collection.insert_one(user_doc)
    user_doc["_id"] = result.inserted_id
    return user_doc


async def create_or_update_azure_user(azure_profile: dict):
    """Create or update a user from an Azure AD profile and return the user document.

    The existing user is looked up by Azure AD Object ID first, then by email.
    A match is updated in place, and its ``auth_provider`` becomes
    ``"azure_ad"``. After that, ``authenticate_user`` no longer accepts a
    local password for that account. Otherwise a new non-admin user is
    inserted.

    Raises:
        ValueError: if the profile lacks ``azure_ad_id`` or ``email``.
    """
    azure_ad_id = azure_profile.get("azure_ad_id")
    email = azure_profile.get("email")
    if not azure_ad_id or not email:
        raise ValueError("Azure AD profile missing required fields")

    now = datetime.utcnow()

    existing_user = await get_user_by_azure_ad_id(azure_ad_id)
    if not existing_user:
        existing_user = await get_user_by_email(email)

    if existing_user:
        user_filter = {"_id": existing_user["_id"]}
        update_data = {
            "azure_ad_id": azure_ad_id,
            "email": email,
            "full_name": azure_profile.get("full_name") or existing_user.get("full_name"),
            "auth_provider": "azure_ad",
            "updated_at": now,
        }
        await users_collection.update_one(user_filter, {"$set": update_data})
        return await users_collection.find_one(user_filter)

    user_doc = {
        "azure_ad_id": azure_ad_id,
        "email": email,
        "full_name": azure_profile.get("full_name"),
        "is_active": True,
        "is_admin": False,  # New users are not admin by default
        "auth_provider": "azure_ad",
        "created_at": now,
        "updated_at": now,
    }
    result = await users_collection.insert_one(user_doc)
    user_doc["_id"] = result.inserted_id
    return user_doc


async def authenticate_user(email: str, password: str):
    """Authenticate a user with local credentials (fallback method).

    Returns the user document on success. Returns ``None`` when:

    - the user is unknown;
    - the user is not a local-auth user;
    - the user has no stored hash;
    - the password is wrong.

    Diagnostics go to the module logger at DEBUG level. They are not printed
    to stdout, so auth details are not emitted by default.
    """
    logger.debug("CRUD: Authenticating user %s", email)
    user = await get_user_by_email(email)
    if not user:
        logger.debug("CRUD: User %s not found in database", email)
        return None

    provider = user.get("auth_provider")
    # Only authenticate local users with password
    if provider != "local":
        logger.debug("CRUD: User is not local auth provider: %s", provider)
        return None

    if not user.get("hashed_password"):
        logger.debug("CRUD: User has no hashed_password field")
        return None

    if not verify_password(password, user["hashed_password"]):
        logger.debug("CRUD: Password verification failed for %s", email)
        return None

    logger.debug("CRUD: Authentication successful for %s", email)
    return user


# ---------------------------------------------------------------------------
# Agents
# ---------------------------------------------------------------------------

# Window in which the same user may not re-create an agent with the same name.
_DUPLICATE_AGENT_WINDOW = timedelta(minutes=5)

# Fields ``create_agent`` initialises with these values when the caller omits them.
_AGENT_FIELD_DEFAULTS = {
    "quality_audit_status": False,
    "quality_audit_updated_by": None,
    "quality_audit_updated_at": None,
    "quality_audit_updated_by_name": None,
    "risk_factor": None,
    "discipline": None,
    "rating": None,
    "total_tokens": None,
    "prompt_tokens": None,
    "completion_tokens": None,
}

# Projection shared by the "top agents" and "recently active agents" listings.
_AGENT_SUMMARY_PROJECTION = {
    "agent_name": 1,
    "agent_status": 1,
    "discipline": 1,
    "total_messages": 1,
    "total_tokens": 1,
    "last_used": 1,
}

# Per-agent counters summed by ``get_analytics_summary``.
_ANALYTICS_FIELDS = (
    "total_messages",
    "total_tokens",
    "prompt_tokens",
    "completion_tokens",
    "conversation_count",
    "unique_users",
)


def _agent_filter(status_filter: str = None, discipline_filter: str = None, base: dict = None) -> dict:
    """Return a copy of ``base`` with the optional status/discipline constraints added."""
    query = dict(base) if base else {}
    if status_filter:
        query["agent_status"] = status_filter
    if discipline_filter:
        query["discipline"] = discipline_filter
    return query


def _quality_audit_stamp(admin_user_info: dict) -> dict:
    """Return the audit-trail fields recorded when an agent's quality audit status changes."""
    return {
        "quality_audit_updated_by": admin_user_info.get("user_id"),
        # Stored as an ISO string (unlike ``updated_at``, which is a datetime).
        "quality_audit_updated_at": datetime.utcnow().isoformat(),
        "quality_audit_updated_by_name": admin_user_info.get("user_name", admin_user_info.get("email")),
    }


async def _list_agent_summaries(pipeline: list) -> list:
    """Run an aggregation on agents and return the results with ``_id`` stringified."""
    results = await agents_collection.aggregate(pipeline).to_list(length=None)
    for row in results:
        row["_id"] = str(row["_id"])
    return results


async def create_agent(agent_data: dict, user_id: str, skip_time_check: bool = False):
    """Insert an agent owned by ``user_id`` and return the stored document.

    Unless ``skip_time_check`` is set, the call is rejected if the same user
    created an agent with the same name within the duplicate window (5 min).
    Missing quality-audit, rating and token fields are initialised from
    ``_AGENT_FIELD_DEFAULTS``.

    Raises:
        ValueError: on a recent duplicate.
    """
    if not skip_time_check:
        recent_duplicate = await agents_collection.find_one({
            "agent_name": agent_data.get("agent_name"),
            "created_by": user_id,
            "created_at": {"$gte": datetime.utcnow() - _DUPLICATE_AGENT_WINDOW},
        })
        if recent_duplicate:
            raise ValueError(f"Agent with name '{agent_data.get('agent_name')}' was already created recently. Please wait before creating another agent with the same name.")

    now = datetime.utcnow()
    agent_doc = {
        **agent_data,
        "created_by": user_id,
        "created_at": now,
        "updated_at": now,
    }
    for field, default in _AGENT_FIELD_DEFAULTS.items():
        agent_doc.setdefault(field, default)

    result = await agents_collection.insert_one(agent_doc)
    agent_doc["_id"] = result.inserted_id
    return agent_doc


async def get_agent_by_id(agent_id: str):
    """Return the agent with the given ObjectId string, or ``None`` if malformed or unknown."""
    if not ObjectId.is_valid(agent_id):
        return None
    return await agents_collection.find_one({"_id": ObjectId(agent_id)})


async def get_agents_by_user(user_id: str, status_filter: str = None, limit: int = None, user_email: str = None):
    """Return agents the user owns or is the contact person for, newest first.

    The contact-person match is a case-insensitive exact match on ``user_email``.
    """
    ownership = [{"created_by": user_id}]
    if user_email:
        # Escape regex metacharacters so the email is matched literally.
        ownership.append({"agent_contact_person": {"$regex": f"^{re.escape(user_email)}$", "$options": "i"}})

    query = {"$or": ownership}
    if status_filter:
        query["agent_status"] = status_filter

    cursor = agents_collection.find(query).sort("created_at", -1)
    if limit:
        cursor = cursor.limit(limit)
    return await cursor.to_list(length=None)


async def get_all_agents(status_filter: str = None, limit: int = None):
    """Return all agents (optionally filtered by status), newest first."""
    cursor = agents_collection.find(_agent_filter(status_filter)).sort("created_at", -1)
    if limit:
        cursor = cursor.limit(limit)
    return await cursor.to_list(length=None)


async def search_agents(search_term: str, user_id: str = None):
    """Search agents by name, description, tags, department or discipline.

    The term is matched literally and case-insensitively as a substring.
    It is regex-escaped, so user input cannot inject patterns or break the
    query.
    """
    pattern = {"$regex": re.escape(search_term), "$options": "i"}
    query = {
        "$or": [
            {field: pattern}
            for field in ("agent_name", "agent_description", "agent_tags", "agent_department", "discipline")
        ]
    }
    if user_id:
        query["created_by"] = user_id
    return await agents_collection.find(query).sort("created_at", -1).to_list(length=None)


async def get_agent_stats(discipline_filter: str = None):
    """Return ``{agent_status: count}`` over all agents (optionally one discipline)."""
    match_filter = _agent_filter(discipline_filter=discipline_filter)
    pipeline = [{"$match": match_filter}] if match_filter else []
    pipeline.append({"$group": {"_id": "$agent_status", "count": {"$sum": 1}}})
    stats = await agents_collection.aggregate(pipeline).to_list(length=None)
    return {stat["_id"]: stat["count"] for stat in stats}


async def get_analytics_summary(status_filter: str = None, discipline_filter: str = None):
    """Sum the per-agent counters in ``_ANALYTICS_FIELDS`` across matching agents.

    Missing counters count as 0. ``unique_users`` is a plain sum of per-agent
    values, so a user active on several agents is counted once per agent.
    """
    match_filter = _agent_filter(status_filter, discipline_filter)
    pipeline = [{"$match": match_filter}] if match_filter else []
    pipeline.append({
        "$group": {
            "_id": None,
            **{field: {"$sum": {"$ifNull": [f"${field}", 0]}} for field in _ANALYTICS_FIELDS},
        }
    })

    results = await agents_collection.aggregate(pipeline).to_list(length=1)
    if not results:
        return dict.fromkeys(_ANALYTICS_FIELDS, 0)
    summary = results[0]
    summary.pop("_id", None)
    return summary


async def get_discipline_breakdown(status_filter: str = None):
    """Return ``{discipline: count}``; agents without a discipline count as ``"Unassigned"``."""
    match_filter = _agent_filter(status_filter)
    pipeline = [{"$match": match_filter}] if match_filter else []
    pipeline.append({"$group": {"_id": "$discipline", "count": {"$sum": 1}}})
    results = await agents_collection.aggregate(pipeline).to_list(length=None)
    return {(r["_id"] or "Unassigned"): r["count"] for r in results}


async def get_aggregated_usage_timeline(limit_days: int = 90, status_filter: str = None, discipline_filter: str = None):
    """Aggregate ``usage_timeline`` entries across agents into daily totals.

    Returns a chronologically ascending list of
    ``{"date", "message_count", "token_count"}`` dicts. It covers the most
    recent ``limit_days`` distinct dates, or all dates when ``limit_days`` is
    falsy. Assumes ``usage_timeline.date`` values sort chronologically (e.g.
    ISO ``YYYY-MM-DD`` strings) — TODO confirm against the collector payload.
    """
    match_filter = _agent_filter(
        status_filter,
        discipline_filter,
        base={"usage_timeline": {"$exists": True, "$ne": []}},
    )
    pipeline = [
        {"$match": match_filter},
        {"$unwind": "$usage_timeline"},
        {
            "$group": {
                "_id": "$usage_timeline.date",
                "message_count": {"$sum": {"$ifNull": ["$usage_timeline.message_count", 0]}},
                "token_count": {"$sum": {"$ifNull": ["$usage_timeline.token_count", 0]}},
            }
        },
    ]
    if limit_days:
        # Keep the newest ``limit_days`` dates (sorting ascending first would keep the oldest).
        pipeline += [{"$sort": {"_id": -1}}, {"$limit": limit_days}]
    pipeline.append({"$sort": {"_id": 1}})

    results = await agents_collection.aggregate(pipeline).to_list(length=None)
    return [
        {"date": r["_id"], "message_count": r["message_count"], "token_count": r["token_count"]}
        for r in results
    ]


async def get_top_agents(sort_field: str, limit: int = 5, status_filter: str = None, discipline_filter: str = None):
    """Return the top ``limit`` agents by a positive numeric ``sort_field`` (summary projection)."""
    match_filter = _agent_filter(
        status_filter,
        discipline_filter,
        base={sort_field: {"$exists": True, "$ne": None, "$gt": 0}},
    )
    return await _list_agent_summaries([
        {"$match": match_filter},
        {"$sort": {sort_field: -1}},
        {"$limit": limit},
        {"$project": _AGENT_SUMMARY_PROJECTION},
    ])


async def get_recently_active_agents(limit: int = 10, status_filter: str = None, discipline_filter: str = None):
    """Return the ``limit`` most recently used agents, newest ``last_used`` first."""
    match_filter = _agent_filter(
        status_filter,
        discipline_filter,
        base={"last_used": {"$exists": True, "$ne": None}},
    )
    return await _list_agent_summaries([
        {"$match": match_filter},
        {"$sort": {"last_used": -1}},
        {"$limit": limit},
        {"$project": _AGENT_SUMMARY_PROJECTION},
    ])


async def update_agent(agent_id: str, update_data: dict, user_id: str = None, admin_user_info: dict = None, last_edited_by: str = None):
    """Apply ``update_data`` to an agent and return the refreshed document.

    Args:
        agent_id: Agent ObjectId string.
        update_data: Fields to ``$set``. The caller's dict is not modified.
        user_id: When given, only an agent created by this user is updated.
        admin_user_info: When given together with a changed
            ``quality_audit_status``, the audit-trail fields are stamped from it.
        last_edited_by: Recorded as ``last_edited_by`` when provided.

    Returns:
        The updated agent. Returns ``None`` if the id is malformed, no agent
        matched the filter, or nothing was modified.
    """
    if not ObjectId.is_valid(agent_id):
        return None
    filter_query = {"_id": ObjectId(agent_id)}
    if user_id:
        filter_query["created_by"] = user_id

    changes = dict(update_data)

    # Handle Quality Audit updates specially
    if "quality_audit_status" in changes and admin_user_info:
        new_status = changes["quality_audit_status"]
        current_agent = await get_agent_by_id(agent_id)
        if current_agent and current_agent.get("quality_audit_status") != new_status:
            changes.update(_quality_audit_stamp(admin_user_info))
            # If Quality Audit is being unchecked, clear Risk Factor.
            # NOTE(review): placed inside the "status is changing" branch; the
            # collapsed original did not show its nesting — confirm.
            if new_status is False:
                changes["risk_factor"] = None

    changes["updated_at"] = datetime.utcnow()
    if last_edited_by:
        changes["last_edited_by"] = last_edited_by

    result = await agents_collection.update_one(filter_query, {"$set": changes})
    if result.modified_count:
        return await get_agent_by_id(agent_id)
    return None


async def update_agent_quality_audit(agent_id: str, quality_audit_status: bool, admin_user_info: dict):
    """Update only the quality audit status of an agent (admin only).

    Returns the agent unchanged if the status already matches. Returns the
    refreshed agent after a change. Returns ``None`` if the agent does not
    exist, the id is malformed, or nothing was modified.
    """
    current_agent = await get_agent_by_id(agent_id)
    if not current_agent:
        return None

    # Only update if status is actually changing
    if current_agent.get("quality_audit_status") == quality_audit_status:
        return current_agent

    update_data = {"quality_audit_status": quality_audit_status, **_quality_audit_stamp(admin_user_info)}
    result = await agents_collection.update_one({"_id": current_agent["_id"]}, {"$set": update_data})
    if result.modified_count:
        return await get_agent_by_id(agent_id)
    return None


async def delete_agent(agent_id: str, user_id: str = None):
    """Delete an agent, restricted to ``user_id``'s agents when given.

    Returns ``True`` if a document was deleted. Returns ``False`` for a
    malformed id, no match, or a database error. Database errors are logged
    with a traceback.
    """
    if not ObjectId.is_valid(agent_id):
        logger.warning("CRUD: Cannot delete agent, invalid id %r", agent_id)
        return False

    filter_query = {"_id": ObjectId(agent_id)}
    if user_id:
        filter_query["created_by"] = user_id

    logger.info("CRUD: Deleting agent %s with filter %s", agent_id, filter_query)
    try:
        result = await agents_collection.delete_one(filter_query)
    except Exception:
        # Deliberate best-effort: report failure to the caller instead of raising.
        logger.exception("CRUD: Exception during delete of agent %s", agent_id)
        return False

    logger.info("CRUD: Delete result - deleted_count: %s", result.deleted_count)
    return result.deleted_count > 0
async def get_user_by_id(user_id: str):
    """Return the user with the given ObjectId string, or ``None`` if malformed or unknown."""
    if not ObjectId.is_valid(user_id):
        return None
    return await users_collection.find_one({"_id": ObjectId(user_id)})


async def get_all_users():
    """Return every user document as stored (including any ``hashed_password`` field)."""
    return await users_collection.find({}).to_list(length=None)


async def update_user(user_id: str, update_data: dict):
    """``$set`` ``update_data`` on a user, stamping ``updated_at``, and return the refreshed user.

    Returns ``None`` for a malformed id or when nothing was modified. The
    caller's ``update_data`` dict is not changed.
    """
    if not ObjectId.is_valid(user_id):
        return None
    changes = {**update_data, "updated_at": datetime.utcnow()}
    result = await users_collection.update_one({"_id": ObjectId(user_id)}, {"$set": changes})
    if result.modified_count:
        return await get_user_by_id(user_id)
    return None


async def delete_user(user_id: str):
    """Delete a user; return ``True`` if one was removed (``False`` for a malformed id)."""
    if not ObjectId.is_valid(user_id):
        return False
    result = await users_collection.delete_one({"_id": ObjectId(user_id)})
    return result.deleted_count > 0


# Minimum length accepted for locally managed passwords.
MIN_PASSWORD_LENGTH = 8


def _check_password_length(password: str, message: str) -> None:
    """Raise ``ValueError(message)`` if ``password`` is missing or shorter than the minimum."""
    if len(password or "") < MIN_PASSWORD_LENGTH:
        raise ValueError(message)


async def _store_password_hash(user_oid, password: str) -> bool:
    """Hash ``password`` onto the user with ``_id == user_oid``; return whether it was written."""
    result = await users_collection.update_one(
        {"_id": user_oid},
        {"$set": {"hashed_password": hash_password(password), "updated_at": datetime.utcnow()}},
    )
    return result.modified_count > 0


async def admin_create_local_user(email: str, password: str, full_name: str = None, is_admin: bool = False):
    """Create a new local user (admin only).

    Raises:
        ValueError: if the email is already registered or the password is too short.
    """
    if await get_user_by_email(email):
        raise ValueError("User with this email already exists")
    _check_password_length(password, "Password must be at least 8 characters")

    return await create_user(
        email=email,
        password=password,
        full_name=full_name,
        is_admin=is_admin,
        auth_provider="local",
    )


async def admin_reset_user_password(user_id: str, new_password: str):
    """Reset password for a local user (admin only); return whether it was stored.

    Raises:
        ValueError: unknown user, SSO user, or a too-short password.
    """
    user = await get_user_by_id(user_id)
    if not user:
        raise ValueError("User not found")
    if user.get("auth_provider") != "local":
        raise ValueError("Cannot reset password for SSO users")
    _check_password_length(new_password, "Password must be at least 8 characters")

    return await _store_password_hash(user["_id"], new_password)


async def change_user_password(user_id: str, current_password: str, new_password: str):
    """Change the caller's own password after verifying the current one.

    Raises:
        ValueError: unknown user, SSO user, no stored password, wrong current
            password, or a too-short new password.
    """
    user = await get_user_by_id(user_id)
    if not user:
        raise ValueError("User not found")
    if user.get("auth_provider") != "local":
        raise ValueError("Cannot change password for SSO users")
    if not user.get("hashed_password"):
        raise ValueError("User has no password set")
    if not verify_password(current_password, user["hashed_password"]):
        raise ValueError("Current password is incorrect")
    _check_password_length(new_password, "New password must be at least 8 characters")

    return await _store_password_hash(user["_id"], new_password)


async def get_agent_by_name(agent_name: str):
    """Get agent by exact name match globally"""
    return await agents_collection.find_one({"agent_name": agent_name})


# Descriptive fields compared by ``_agent_data_differs`` (name and metadata excluded).
_COMPARABLE_AGENT_FIELDS = (
    "agent_description", "agent_purpose", "agent_version", "agent_status",
    "agent_location", "agent_department", "agent_contact_person", "agent_tags",
    "agent_userbase", "agent_capabilities", "agent_metadata", "url", "discipline",
)


def _agent_data_differs(existing_agent: dict, new_agent_data: dict) -> bool:
    """Return ``True`` if any comparable field differs (a missing key counts as ``None``)."""
    return any(
        existing_agent.get(field) != new_agent_data.get(field)
        for field in _COMPARABLE_AGENT_FIELDS
    )


async def create_agent_usage_record(agent_name: str, agent_data: dict):
    """Store a usage record for ``agent_name`` and return its inserted id.

    If the agent exists and a comparable field differs, every supplied field
    except ``agent_name`` is written back to the agent.

    Raises:
        Exception: wrapping (and chaining) any storage failure.
    """
    now = datetime.utcnow()
    usage_doc = {
        "agent_name": agent_name,
        "agent_data": agent_data,
        "timestamp": now,
        "created_at": now,
    }

    try:
        result = await agent_usage_collection.insert_one(usage_doc)

        existing_agent = await get_agent_by_name(agent_name)
        if existing_agent and _agent_data_differs(existing_agent, agent_data):
            update_data = {k: v for k, v in agent_data.items() if k != "agent_name"}
            update_data["updated_at"] = now
            await agents_collection.update_one({"agent_name": agent_name}, {"$set": update_data})

        return result.inserted_id
    except Exception as e:
        raise Exception(f"Failed to store agent usage data: {str(e)}") from e


def _usage_query(agent_name: str, start_date: datetime = None, end_date: datetime = None) -> dict:
    """Build the usage-collection filter for an agent with an optional inclusive date range."""
    query = {"agent_name": agent_name}
    date_filter = {}
    if start_date:
        date_filter["$gte"] = start_date
    if end_date:
        date_filter["$lte"] = end_date
    if date_filter:
        query["timestamp"] = date_filter
    return query


async def get_agent_usage_stats(agent_name: str, start_date: datetime = None, end_date: datetime = None):
    """Return total usage count and first/last usage timestamps within the date range."""
    query = _usage_query(agent_name, start_date, end_date)

    total_count = await agent_usage_collection.count_documents(query)
    first_usage = await agent_usage_collection.find_one(query, sort=[("timestamp", 1)])
    last_usage = await agent_usage_collection.find_one(query, sort=[("timestamp", -1)])

    return {
        "total_usage_count": total_count,
        "first_usage": first_usage["timestamp"] if first_usage else None,
        "last_usage": last_usage["timestamp"] if last_usage else None,
    }


# ``$dateToString`` formats per grouping period; unknown periods fall back to daily.
_PERIOD_FORMATS = {
    "daily": "%Y-%m-%d",
    "weekly": "%Y-W%U",
    "monthly": "%Y-%m",
}


async def get_agent_usage_by_period(agent_name: str, period: str = "daily", start_date: datetime = None, end_date: datetime = None):
    """Return ``{period_label: usage_count}`` grouped daily, weekly or monthly, ascending by label."""
    date_format = _PERIOD_FORMATS.get(period, _PERIOD_FORMATS["daily"])
    pipeline = [
        {"$match": _usage_query(agent_name, start_date, end_date)},
        {
            "$group": {
                "_id": {"$dateToString": {"format": date_format, "date": "$timestamp"}},
                "count": {"$sum": 1},
            }
        },
        {"$sort": {"_id": 1}},
    ]
    results = await agent_usage_collection.aggregate(pipeline).to_list(length=None)
    return {row["_id"]: row["count"] for row in results}


async def create_agent_from_collector(agent_data: dict):
    """Create agent from agent collector API data (no user ownership).

    All supplied fields are stored, including optional usage tracking fields:
    - usage_timeline: Array of {date, message_count} objects
    - conversation_count, unique_users, total_messages: Integer metrics
    - first_used, last_used: ISO datetime strings

    Empty ``agent_created_at``/``agent_updated_at`` are filled with the current
    time. The caller's dict is not modified.

    Raises:
        Exception: wrapping (and chaining) any storage failure.
    """
    now = datetime.utcnow()
    agent_data = dict(agent_data)

    # Set automatic timestamps if not provided
    if not agent_data.get("agent_created_at"):
        agent_data["agent_created_at"] = now.isoformat()
    if not agent_data.get("agent_updated_at"):
        agent_data["agent_updated_at"] = now.isoformat()

    agent_doc = {
        **agent_data,
        "created_by": "agent_collector_api",  # Special marker for agent collector created agents
        "created_at": now,
        "updated_at": now,
    }
    # Collector-sourced agents always start incomplete. They lack the new
    # governance fields (business_entity, autonomy_level, ip_ownership,
    # declarations, etc.), which only the form / completion flow can populate.
    # The owner gets nudged via the completion-reminder email until they
    # finish registration.
    agent_doc.setdefault("registration_complete", False)

    try:
        result = await agents_collection.insert_one(agent_doc)
    except Exception as e:
        raise Exception(f"Failed to store agent data: {str(e)}") from e
    agent_doc["_id"] = result.inserted_id
    return agent_doc


# Per-user rating functions

async def upsert_agent_rating(agent_id: str, user_id: str, rating: float):
    """Insert or update a user's individual rating for an agent"""
    now = datetime.utcnow()
    return await agent_ratings_collection.update_one(
        {"agent_id": agent_id, "user_id": user_id},
        {
            "$set": {"rating": rating, "updated_at": now},
            "$setOnInsert": {"created_at": now},
        },
        upsert=True,
    )


async def get_agent_rating_stats(agent_id: str):
    """Return ``{"avg_rating", "count"}`` for an agent; avg is rounded to 1 dp, or ``None``."""
    pipeline = [
        {"$match": {"agent_id": agent_id}},
        {"$group": {"_id": None, "avg_rating": {"$avg": "$rating"}, "count": {"$sum": 1}}},
    ]
    results = await agent_ratings_collection.aggregate(pipeline).to_list(length=1)
    if not results:
        return {"avg_rating": None, "count": 0}

    stats = results[0]
    average = stats["avg_rating"]
    # $avg yields null when every rating is null; round(None) would raise.
    return {
        "avg_rating": round(average, 1) if average is not None else None,
        "count": stats["count"],
    }


async def get_user_rating_for_agent(agent_id: str, user_id: str):
    """Return the current user's own rating for an agent, or ``None``."""
    doc = await agent_ratings_collection.find_one({"agent_id": agent_id, "user_id": user_id})
    return doc.get("rating") if doc else None


async def update_agent_average_rating(agent_id: str):
    """Recompute the agent's average rating, store it on the agent, and return the stats."""
    stats = await get_agent_rating_stats(agent_id)
    await agents_collection.update_one(
        {"_id": ObjectId(agent_id)},
        {"$set": {
            "rating": stats["avg_rating"],
            "rating_count": stats["count"],
            "updated_at": datetime.utcnow(),
        }},
    )
    return stats


def _normalize_email(value) -> str:
    """Return ``value`` stripped and lower-cased; ``None``/empty become ``""``."""
    return (value or "").strip().lower()


def _email_regex(email: str) -> dict:
    """Mongo ``$regex`` clause matching ``email`` literally, case-insensitively, ignoring surrounding spaces."""
    return {"$regex": rf"^\s*{re.escape(email.strip())}\s*$", "$options": "i"}


def _incomplete_agents_query(user_email: str, user_id: str, is_admin: bool) -> dict:
    """Build the filter used by ``get_incomplete_agents_for_user``."""
    incomplete_filter = {"registration_complete": {"$ne": True}}
    if is_admin:
        return incomplete_filter

    owners = [{"created_by": user_id}]
    # Without an email, a contact-person clause would match every agent that has no contact.
    if user_email and user_email.strip():
        owners.append({"$and": [
            {"created_by": "agent_collector_api"},
            {"agent_contact_person": _email_regex(user_email)},
        ]})
    return {"$and": [incomplete_filter, {"$or": owners}]}


async def get_incomplete_agents_for_user(user_email: str, user_id: str, is_admin: bool = False):
    """Return agents needing the new registration form completed, newest first.

    Owner resolution mirrors the completion-flow design (PLAN §5b):
      - direct ownership (`created_by == user_id`), or
      - LibreChat-synced ownership (`created_by == "agent_collector_api"` AND
        `agent_contact_person` equals `user_email`, case-insensitively and
        ignoring surrounding whitespace — the same normalisation
        `get_unresolved_owner_agents` uses).
    Admins see all incomplete agents.
    """
    query = _incomplete_agents_query(user_email, user_id, is_admin)
    cursor = agents_collection.find(query).sort("created_at", -1)
    return await cursor.to_list(length=None)


async def get_unresolved_owner_agents():
    """Return collector-created agents whose `agent_contact_person` matches no active user.

    These agents will not get a completion-reminder email. An admin has to
    reassign ownership or delete them. Emails are compared case-insensitively
    after stripping whitespace.
    """
    candidates = await agents_collection.find(
        {"created_by": "agent_collector_api"},
        {"agent_name": 1, "agent_contact_person": 1, "registration_complete": 1, "created_at": 1, "_id": 1},
    ).to_list(length=None)
    if not candidates:
        return []

    # If no candidate names a contact at all, none can resolve; skip the user scan.
    if not any(_normalize_email(a.get("agent_contact_person")) for a in candidates):
        return candidates

    user_cursor = users_collection.find({"is_active": True, "email": {"$ne": None}}, {"email": 1})
    known_user_emails = {_normalize_email(u.get("email")) async for u in user_cursor}
    known_user_emails.discard("")

    return [
        agent for agent in candidates
        if _normalize_email(agent.get("agent_contact_person")) not in known_user_emails
    ]


async def reassign_agent_owner(agent_id: str, new_owner_user_id: str, new_contact_email: str):
    """Admin tool: reassign a collector-created agent to a real user.

    Sets `created_by` and/or `agent_contact_person` (each only when provided)
    so the new owner picks up the completion-reminder flow on the next run.
    Returns the refreshed agent document.
    """
    agent_oid = ObjectId(agent_id)
    update = {"updated_at": datetime.utcnow()}
    if new_owner_user_id:
        update["created_by"] = new_owner_user_id
    if new_contact_email:
        update["agent_contact_person"] = new_contact_email

    await agents_collection.update_one({"_id": agent_oid}, {"$set": update})
    return await agents_collection.find_one({"_id": agent_oid})


# Keys a completion payload may not set directly: `_id` is immutable, and
# ownership only changes via the collector-reassignment rule below.
_PROTECTED_COMPLETION_KEYS = frozenset({"_id", "created_by"})


async def complete_agent_registration(
    agent_id: str,
    completion_data: dict,
    completing_user_id: str,
    completing_user_email: str,
):
    """Apply registration-form completion to an agent and return the refreshed document.

    - Writes the completion fields onto the agent (except `_id` / `created_by`).
    - Sets `registration_complete = True` and `last_edited_by`.
    - If the agent was created by the collector marker, reassigns `created_by`
      to the submitting user so they can manage it normally afterwards.

    Raises:
        ValueError: if the id is malformed or no such agent exists.
    """
    if not ObjectId.is_valid(agent_id):
        raise ValueError(f"Agent {agent_id} not found")
    agent_oid = ObjectId(agent_id)

    existing = await agents_collection.find_one({"_id": agent_oid})
    if not existing:
        raise ValueError(f"Agent {agent_id} not found")

    update_fields = {k: v for k, v in completion_data.items() if k not in _PROTECTED_COMPLETION_KEYS}
    update_fields.update({
        "registration_complete": True,
        "last_edited_by": completing_user_email,
        "updated_at": datetime.utcnow(),
    })
    # Reassign ownership from the collector marker to the user who finished registration.
    if existing.get("created_by") == "agent_collector_api":
        update_fields["created_by"] = completing_user_id

    await agents_collection.update_one({"_id": agent_oid}, {"$set": update_fields})
    return await agents_collection.find_one({"_id": agent_oid})


async def migrate_pencil_agents_discipline():
    """Set discipline 'Pencil Agents' on agents named like 'pencil' with no discipline; return modified count."""
    no_discipline = [
        {"discipline": None},
        {"discipline": ""},
        {"discipline": {"$exists": False}},
    ]
    result = await agents_collection.update_many(
        {"agent_name": {"$regex": "pencil", "$options": "i"}, "$or": no_discipline},
        {"$set": {"discipline": "Pencil Agents"}},
    )
    return result.modified_count


async def migrate_optimisation_spelling():
    """Rename discipline from 'Optimization' (US) to 'Optimisation' (UK); return modified count."""
    result = await agents_collection.update_many(
        {"discipline": "Optimization"},
        {"$set": {"discipline": "Optimisation"}},
    )
    return result.modified_count


async def backfill_registration_complete():
    """One-time backfill of `registration_complete` for agents that pre-date the field.

    Form-registered agents are grandfathered as complete, because their
    existing fields are valid under the legacy schema. Collector-created
    agents are marked incomplete so they enter the completion-reminder flow.
    Returns the modified counts for each group.
    """
    missing_flag = {"registration_complete": {"$exists": False}}

    form_result = await agents_collection.update_many(
        {**missing_flag, "created_by": {"$ne": "agent_collector_api"}},
        {"$set": {"registration_complete": True}},
    )
    collector_result = await agents_collection.update_many(
        {**missing_flag, "created_by": "agent_collector_api"},
        {"$set": {"registration_complete": False}},
    )
    return {
        "form_complete": form_result.modified_count,
        "collector_incomplete": collector_result.modified_count,
    }