agent_tracker/crud.py
nickviljoen 54ecd31bdd Add registration form redesign with completion flow for LibreChat-synced agents
Rebuilds the agent registration form into 7 governance sections (Identity,
Classification, Autonomy, IP, Tech Stack, Data Safety, Performance, Declarations)
and introduces a completion flow for agents that come in via the LibreChat
collector without the new required fields.

- New form fields: business_entity, client_scope, agent_classification,
  autonomy_level, ip_ownership, foundation_model, validated_by/date, evals_method,
  plus nested safety / pii / declarations objects and a registration_complete flag.
- registration_complete defaults to true for form-submitted agents and false for
  collector-created ones; existing agents are grandfathered via a startup migration.
- Owner-by-email lookup so LibreChat-synced agents surface in the user's "My Agents"
  view with an Incomplete badge and Complete CTA. Submitting the completion form
  reassigns created_by from the collector marker to the user.
- Daily APScheduler job sends a digest reminder email per owner with a 7-day
  cooldown and 4-nudge cap (configurable). Manual trigger via
  POST /api/admin/completion-reminders/send.
- Admin banner + modal for collector agents whose contact email doesn't match an
  active user, with one-click reassignment.
- Gemini audit extended to also return agent_classification and an autonomy hint;
  applied to agents on next batch run alongside discipline/department.
- New filter dimensions on agent management + admin dashboard: Business Entity,
  Agent Type, Autonomy, plus a Compliance Risks quick toggle.
- CSV export/import gains 21 columns covering all governance fields.
- Discipline 'Optimization' renamed to 'Optimisation' with idempotent startup
  migration; Gemini system prompt and template dropdowns updated.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 18:26:55 +02:00

897 lines
33 KiB
Python

from datetime import datetime, timedelta
from bson import ObjectId
import database
from database import users_collection, agents_collection, agent_usage_collection, agent_ratings_collection
import auth
from auth import hash_password, verify_password
import re
async def get_user_by_email(email: str):
    """Look up a single user document by its exact ``email`` value.

    Returns whatever ``users_collection.find_one`` yields for the filter
    (the matching document, or ``None`` when nothing matches).
    """
    lookup = {"email": email}
    user_doc = await users_collection.find_one(lookup)
    return user_doc
async def get_user_by_azure_ad_id(azure_ad_id: str):
    """Look up a single user document by its Azure AD Object ID.

    Returns the result of ``users_collection.find_one`` for the
    ``azure_ad_id`` field (``None`` when nothing matches).
    """
    lookup = {"azure_ad_id": azure_ad_id}
    user_doc = await users_collection.find_one(lookup)
    return user_doc
async def create_user(email: str, password: str = None, full_name: str = None, is_admin: bool = False, auth_provider: str = "local"):
    """Insert a new, active user and return the stored document.

    ``hashed_password`` is written only when ``auth_provider`` is
    ``"local"`` and a non-empty ``password`` is supplied; other providers
    get no password field at all. The returned dict carries the ``_id``
    assigned by the insert.
    """
    timestamp = datetime.utcnow()
    new_user = dict(
        email=email,
        full_name=full_name,
        is_active=True,
        is_admin=is_admin,
        auth_provider=auth_provider,
        created_at=timestamp,
        updated_at=timestamp,
    )

    # Password material is stored for local accounts only.
    is_local_with_password = auth_provider == "local" and password
    if is_local_with_password:
        new_user["hashed_password"] = hash_password(password)

    insert_outcome = await users_collection.insert_one(new_user)
    new_user["_id"] = insert_outcome.inserted_id
    return new_user
async def create_or_update_azure_user(azure_profile: dict):
    """Create a user from an Azure AD profile, or refresh an existing one.

    The profile must provide ``azure_ad_id`` and ``email``; ``full_name``
    is optional. An existing user is located by Azure AD ID first and by
    email second. Existing users are switched to the ``azure_ad`` provider
    and keep their current ``full_name`` when the profile has none.

    Returns the user document as stored after the operation.

    Raises:
        ValueError: if ``azure_ad_id`` or ``email`` is missing from the profile.
    """
    azure_ad_id = azure_profile.get("azure_ad_id")
    email = azure_profile.get("email")
    if not (azure_ad_id and email):
        raise ValueError("Azure AD profile missing required fields")

    timestamp = datetime.utcnow()
    profile_name = azure_profile.get("full_name")

    # Azure AD ID is the primary key for matching; email is the fallback.
    matched_user = (await get_user_by_azure_ad_id(azure_ad_id)) or (await get_user_by_email(email))

    if not matched_user:
        # First sign-in: create a non-admin, active account.
        fresh_user = {
            "azure_ad_id": azure_ad_id,
            "email": email,
            "full_name": profile_name,
            "is_active": True,
            "is_admin": False,  # New users are not admin by default
            "auth_provider": "azure_ad",
            "created_at": timestamp,
            "updated_at": timestamp,
        }
        insert_outcome = await users_collection.insert_one(fresh_user)
        fresh_user["_id"] = insert_outcome.inserted_id
        return fresh_user

    # Returning user: attach / refresh the Azure AD details.
    selector = {"_id": matched_user["_id"]}
    changes = {
        "azure_ad_id": azure_ad_id,
        "email": email,
        "full_name": profile_name or matched_user.get("full_name"),
        "auth_provider": "azure_ad",
        "updated_at": timestamp,
    }
    await users_collection.update_one(selector, {"$set": changes})
    # Re-read so the caller gets the document as stored.
    return await users_collection.find_one(selector)
async def authenticate_user(email: str, password: str):
    """Authenticate a user with local credentials (fallback method).

    Returns the user document when the account exists, uses the
    ``"local"`` auth provider, has a stored ``hashed_password`` and the
    supplied password verifies against it. Returns ``None`` in every other
    case. Each step is traced to stdout.
    """
    print(f"🔐 CRUD: Authenticating user {email}")
    account = await get_user_by_email(email)
    if not account:
        print(f"🔐 CRUD: User {email} not found in database")
        return None

    provider = account.get("auth_provider")
    print(f"🔐 CRUD: User found - auth_provider: {provider}")
    print(f"🔐 CRUD: Has hashed_password: {'hashed_password' in account}")

    # Password login is only available to local accounts.
    if provider != "local":
        print(f"🔐 CRUD: User is not local auth provider: {provider}")
        return None

    stored_hash = account.get("hashed_password")
    if not stored_hash:
        print("🔐 CRUD: User has no hashed_password field")
        return None

    print("🔐 CRUD: Verifying password...")
    password_ok = verify_password(password, account["hashed_password"])
    print(f"🔐 CRUD: Password verification result: {password_ok}")
    if password_ok:
        print("🔐 CRUD: Authentication successful!")
        return account
    return None
# Agent CRUD operations
async def create_agent(agent_data: dict, user_id: str, skip_time_check: bool = False):
    """Insert a new agent owned by ``user_id`` and return the stored document.

    Unless ``skip_time_check`` is set, the insert is refused when the same
    user already created an agent with the same ``agent_name`` within the
    last five minutes. Quality-audit, risk, discipline, rating and token
    fields that the payload does not provide are initialised to their
    defaults (``False`` for ``quality_audit_status``, ``None`` otherwise).

    Raises:
        ValueError: when a same-named agent was created by this user in the
            last five minutes.
    """
    requested_name = agent_data.get("agent_name")

    if not skip_time_check:
        # Guard against accidental double submissions of the same form.
        window_start = datetime.utcnow() - timedelta(minutes=5)
        recent_duplicate = await agents_collection.find_one({
            "agent_name": requested_name,
            "created_by": user_id,
            "created_at": {"$gte": window_start},
        })
        if recent_duplicate:
            raise ValueError(f"Agent with name '{requested_name}' was already created recently. Please wait before creating another agent with the same name.")

    timestamp = datetime.utcnow()
    agent_doc = {
        **agent_data,
        "created_by": user_id,
        "created_at": timestamp,
        "updated_at": timestamp,
    }

    # Fields every agent document carries, filled in only when absent.
    field_defaults = (
        ("quality_audit_status", False),
        ("quality_audit_updated_by", None),
        ("quality_audit_updated_at", None),
        ("quality_audit_updated_by_name", None),
        ("risk_factor", None),
        ("discipline", None),
        ("rating", None),
        ("total_tokens", None),
        ("prompt_tokens", None),
        ("completion_tokens", None),
    )
    for field_name, default_value in field_defaults:
        agent_doc.setdefault(field_name, default_value)

    insert_outcome = await agents_collection.insert_one(agent_doc)
    agent_doc["_id"] = insert_outcome.inserted_id
    return agent_doc
async def get_agent_by_id(agent_id: str):
    """Fetch one agent document by the string form of its ObjectId.

    Returns the agent document, or ``None`` when nothing matches, when
    ``agent_id`` cannot be converted to an ``ObjectId``, or when the lookup
    itself fails (best-effort: errors are not propagated).
    """
    try:
        return await agents_collection.find_one({"_id": ObjectId(agent_id)})
    except Exception:
        # ``Exception`` rather than a bare ``except`` so that task
        # cancellation (asyncio.CancelledError) and interpreter-exit signals
        # are not swallowed and turned into a "not found" result.
        return None
async def get_agents_by_user(user_id: str, status_filter: str = None, limit: int = None, user_email: str = None):
    """List agents a user owns or is the contact person for, newest first.

    An agent qualifies when ``created_by`` equals ``user_id`` or, if
    ``user_email`` is given, when ``agent_contact_person`` equals that
    email ignoring case. ``status_filter`` narrows by ``agent_status`` and
    a truthy ``limit`` caps the number of results.
    """
    ownership_clauses = [{"created_by": user_id}]
    if user_email:
        # Anchored, escaped pattern: a case-insensitive exact match on the email.
        exact_email_pattern = "^" + re.escape(user_email) + "$"
        ownership_clauses.append(
            {"agent_contact_person": {"$regex": exact_email_pattern, "$options": "i"}}
        )

    criteria = {"$or": ownership_clauses}
    if status_filter:
        criteria["agent_status"] = status_filter

    newest_first = agents_collection.find(criteria).sort("created_at", -1)
    limited = newest_first.limit(limit) if limit else newest_first
    return await limited.to_list(length=None)
async def get_all_agents(status_filter: str = None, limit: int = None):
    """List all agents, newest first.

    ``status_filter`` restricts results to one ``agent_status``; a truthy
    ``limit`` caps how many documents are returned.
    """
    criteria = {"agent_status": status_filter} if status_filter else {}
    newest_first = agents_collection.find(criteria).sort("created_at", -1)
    limited = newest_first.limit(limit) if limit else newest_first
    return await limited.to_list(length=None)
async def search_agents(search_term: str, user_id: str = None):
    """Search agents by name, description, tags, department, or discipline.

    ``search_term`` is matched case-insensitively as a literal substring of
    any of those fields. When ``user_id`` is given, only agents whose
    ``created_by`` equals it are considered. Results are newest first.
    """
    # Escape the term so characters such as "+", "(" or "." typed by a user
    # are matched literally instead of being interpreted as regex syntax
    # (which could make the query fail or match far more than intended).
    # This mirrors the escaping done for emails in get_agents_by_user.
    literal_pattern = re.escape(search_term)

    searchable_fields = (
        "agent_name",
        "agent_description",
        "agent_tags",
        "agent_department",
        "discipline",
    )
    query = {
        "$or": [
            {field: {"$regex": literal_pattern, "$options": "i"}}
            for field in searchable_fields
        ]
    }
    if user_id:
        query["created_by"] = user_id

    return await agents_collection.find(query).sort("created_at", -1).to_list(length=None)
async def get_agent_stats(discipline_filter: str = None):
    """Count agents per ``agent_status``.

    Returns a dict mapping each status value to the number of agents with
    that status, optionally restricted to one ``discipline``.
    """
    count_by_status = {"$group": {"_id": "$agent_status", "count": {"$sum": 1}}}
    if discipline_filter:
        stages = [{"$match": {"discipline": discipline_filter}}, count_by_status]
    else:
        stages = [count_by_status]

    rows = await agents_collection.aggregate(stages).to_list(length=None)
    return {row["_id"]: row["count"] for row in rows}
async def get_analytics_summary(status_filter: str = None, discipline_filter: str = None):
    """Sum the usage counters stored on agent documents.

    Adds up ``total_messages``, ``total_tokens``, ``prompt_tokens``,
    ``completion_tokens``, ``conversation_count`` and ``unique_users``
    across the agents matching the optional status / discipline filters,
    treating missing or null values as 0. Returns a dict of those six
    totals (all zeros when no agent matches).
    """
    counter_fields = (
        "total_messages",
        "total_tokens",
        "prompt_tokens",
        "completion_tokens",
        "conversation_count",
        "unique_users",
    )

    criteria = {}
    if status_filter:
        criteria["agent_status"] = status_filter
    if discipline_filter:
        criteria["discipline"] = discipline_filter

    # One group over everything; $ifNull keeps absent counters from nulling a sum.
    totals_stage = {"_id": None}
    for name in counter_fields:
        totals_stage[name] = {"$sum": {"$ifNull": ["$" + name, 0]}}

    stages = [{"$match": criteria}] if criteria else []
    stages.append({"$group": totals_stage})

    rows = await agents_collection.aggregate(stages).to_list(length=1)
    if not rows:
        return {name: 0 for name in counter_fields}

    summary = rows[0]
    summary.pop("_id")
    return summary
async def get_discipline_breakdown(status_filter: str = None):
    """Count agents per ``discipline``.

    Returns a dict mapping discipline name to agent count, optionally
    restricted to one ``agent_status``. Agents with no discipline (null,
    missing, or empty string) are reported together under ``"Unassigned"``.
    """
    pipeline = []
    if status_filter:
        pipeline.append({"$match": {"agent_status": status_filter}})
    pipeline.append({"$group": {"_id": "$discipline", "count": {"$sum": 1}}})

    results = await agents_collection.aggregate(pipeline).to_list(length=None)

    # Several falsy group keys (e.g. None and "") all map to "Unassigned";
    # add their counts together instead of letting the last one overwrite
    # the others.
    breakdown = {}
    for row in results:
        label = row["_id"] or "Unassigned"
        breakdown[label] = breakdown.get(label, 0) + row["count"]
    return breakdown
async def get_aggregated_usage_timeline(limit_days: int = 90, status_filter: str = None, discipline_filter: str = None):
    """Aggregate ``usage_timeline`` across agents into system-wide daily totals.

    Every ``usage_timeline`` entry of the matching agents is grouped by its
    ``date`` and its ``message_count`` / ``token_count`` are summed (missing
    values count as 0). The most recent ``limit_days`` dates are returned,
    ordered oldest to newest, as a list of
    ``{"date", "message_count", "token_count"}`` dicts.

    NOTE(review): "most recent" relies on ``usage_timeline.date`` values
    sorting chronologically (e.g. ISO ``YYYY-MM-DD`` strings or datetimes) —
    confirm against the collector payload.
    """
    match_filter = {"usage_timeline": {"$exists": True, "$ne": []}}
    if status_filter:
        match_filter["agent_status"] = status_filter
    if discipline_filter:
        match_filter["discipline"] = discipline_filter

    pipeline = [
        {"$match": match_filter},
        {"$unwind": "$usage_timeline"},
        {
            "$group": {
                "_id": "$usage_timeline.date",
                "message_count": {"$sum": {"$ifNull": ["$usage_timeline.message_count", 0]}},
                "token_count": {"$sum": {"$ifNull": ["$usage_timeline.token_count", 0]}},
            }
        },
        # Sort newest-first BEFORE limiting so the window keeps the latest
        # days; an ascending sort here would keep the oldest days instead.
        {"$sort": {"_id": -1}},
        {"$limit": limit_days},
    ]
    results = await agents_collection.aggregate(pipeline).to_list(length=None)

    # Hand the window back in chronological order.
    results.reverse()
    return [
        {"date": r["_id"], "message_count": r["message_count"], "token_count": r["token_count"]}
        for r in results
    ]
async def get_top_agents(sort_field: str, limit: int = 5, status_filter: str = None, discipline_filter: str = None):
    """Return the top ``limit`` agents ranked by a numeric field.

    Only agents whose ``sort_field`` exists, is not null and is greater
    than 0 are considered, optionally narrowed by status and discipline.
    Each result carries ``_id`` (as a string), ``agent_name``,
    ``agent_status``, ``discipline``, ``total_messages``, ``total_tokens``
    and ``last_used``.
    """
    criteria = {sort_field: {"$exists": True, "$ne": None, "$gt": 0}}
    for key, wanted in (("agent_status", status_filter), ("discipline", discipline_filter)):
        if wanted:
            criteria[key] = wanted

    summary_fields = dict.fromkeys(
        ("agent_name", "agent_status", "discipline", "total_messages", "total_tokens", "last_used"),
        1,
    )
    stages = [
        {"$match": criteria},
        {"$sort": {sort_field: -1}},
        {"$limit": limit},
        {"$project": summary_fields},
    ]

    docs = await agents_collection.aggregate(stages).to_list(length=None)
    # Stringify the ObjectId in each result.
    return [{**doc, "_id": str(doc["_id"])} for doc in docs]
async def get_recently_active_agents(limit: int = 10, status_filter: str = None, discipline_filter: str = None):
    """Return up to ``limit`` agents ordered by most recent ``last_used``.

    Agents without a ``last_used`` value are skipped; status and discipline
    filters are optional. Each result carries ``_id`` (as a string),
    ``agent_name``, ``agent_status``, ``discipline``, ``total_messages``,
    ``total_tokens`` and ``last_used``.
    """
    criteria = {"last_used": {"$exists": True, "$ne": None}}
    for key, wanted in (("agent_status", status_filter), ("discipline", discipline_filter)):
        if wanted:
            criteria[key] = wanted

    summary_fields = dict.fromkeys(
        ("agent_name", "agent_status", "discipline", "total_messages", "total_tokens", "last_used"),
        1,
    )
    stages = [
        {"$match": criteria},
        {"$sort": {"last_used": -1}},
        {"$limit": limit},
        {"$project": summary_fields},
    ]

    docs = await agents_collection.aggregate(stages).to_list(length=None)
    # Stringify the ObjectId in each result.
    return [{**doc, "_id": str(doc["_id"])} for doc in docs]
async def update_agent(agent_id: str, update_data: dict, user_id: str = None, admin_user_info: dict = None, last_edited_by: str = None):
    """Apply ``update_data`` to an agent and return the updated document.

    Args:
        agent_id: String form of the agent's ObjectId.
        update_data: Fields to ``$set``. This dict is modified in place:
            ``updated_at`` is always added, plus ``last_edited_by`` and the
            quality-audit bookkeeping fields when applicable.
        user_id: When given, the update only applies if the agent's
            ``created_by`` equals it.
        admin_user_info: Dict with ``user_id`` and ``user_name`` / ``email``
            of the acting admin; required for quality-audit bookkeeping.
        last_edited_by: Stored as ``last_edited_by`` when provided.

    Returns:
        The refreshed agent document, or ``None`` when no document was
        modified or any error occurred (best-effort: errors are not raised).
    """
    try:
        filter_query = {"_id": ObjectId(agent_id)}
        if user_id:
            filter_query["created_by"] = user_id

        # Quality-audit changes record who flipped the flag and when.
        if "quality_audit_status" in update_data and admin_user_info:
            current_agent = await get_agent_by_id(agent_id)
            if current_agent and current_agent.get("quality_audit_status") != update_data["quality_audit_status"]:
                # Status is changing: stamp the audit fields with their own timestamp.
                update_data["quality_audit_updated_by"] = admin_user_info.get("user_id")
                update_data["quality_audit_updated_at"] = datetime.utcnow().isoformat()
                update_data["quality_audit_updated_by_name"] = admin_user_info.get("user_name", admin_user_info.get("email"))

                # If Quality Audit is being unchecked, clear Risk Factor.
                # NOTE(review): nesting of this check under the "status is
                # changing" branch is reconstructed from flattened source —
                # confirm against the repository copy.
                if update_data["quality_audit_status"] is False:
                    update_data["risk_factor"] = None

        update_data["updated_at"] = datetime.utcnow()
        if last_edited_by:
            update_data["last_edited_by"] = last_edited_by

        result = await agents_collection.update_one(
            filter_query,
            {"$set": update_data}
        )
        if result.modified_count:
            return await get_agent_by_id(agent_id)
        return None
    except Exception:
        # ``Exception`` rather than a bare ``except`` so task cancellation
        # (asyncio.CancelledError) and interpreter-exit signals propagate.
        return None
async def update_agent_quality_audit(agent_id: str, quality_audit_status: bool, admin_user_info: dict):
    """Update only the quality audit status of an agent (admin only).

    Args:
        agent_id: String form of the agent's ObjectId.
        quality_audit_status: New value for ``quality_audit_status``.
        admin_user_info: Dict with ``user_id`` and ``user_name`` / ``email``
            of the acting admin, recorded alongside the change.

    Returns:
        The agent document — unchanged if the status already had the
        requested value, refreshed otherwise — or ``None`` when the agent
        does not exist, nothing was modified, or an error occurred.
    """
    try:
        current_agent = await get_agent_by_id(agent_id)
        if not current_agent:
            return None

        # No-op when the flag already has the requested value, so the audit
        # trail is not re-stamped.
        if current_agent.get("quality_audit_status") == quality_audit_status:
            return current_agent

        update_data = {
            "quality_audit_status": quality_audit_status,
            "quality_audit_updated_by": admin_user_info.get("user_id"),
            "quality_audit_updated_at": datetime.utcnow().isoformat(),
            "quality_audit_updated_by_name": admin_user_info.get("user_name", admin_user_info.get("email"))
        }
        result = await agents_collection.update_one(
            {"_id": ObjectId(agent_id)},
            {"$set": update_data}
        )
        if result.modified_count:
            return await get_agent_by_id(agent_id)
        return None
    except Exception:
        # ``Exception`` rather than a bare ``except`` so task cancellation
        # (asyncio.CancelledError) and interpreter-exit signals propagate.
        return None
async def delete_agent(agent_id: str, user_id: str = None):
    """Delete an agent and report whether a document was removed.

    When ``user_id`` is given the agent is only deleted if its
    ``created_by`` equals it. Returns ``True`` if a document was deleted,
    ``False`` otherwise (including on any error, which is printed rather
    than raised).
    """
    try:
        print(f"🗑️ CRUD: Deleting agent {agent_id} with user_id filter: {user_id}")
        criteria = {"_id": ObjectId(agent_id)}
        if user_id:
            # Restrict the delete to agents created by this user.
            criteria["created_by"] = user_id
        print(f"🗑️ CRUD: Filter query: {criteria}")

        outcome = await agents_collection.delete_one(criteria)
        print(f"🗑️ CRUD: Delete result - deleted_count: {outcome.deleted_count}")
        was_deleted = outcome.deleted_count > 0
        return was_deleted
    except Exception as exc:
        print(f"🗑️ CRUD: Exception during delete: {exc}")
        return False
async def get_user_by_id(user_id: str):
    """Fetch one user document by the string form of its ObjectId.

    Returns the user document, or ``None`` when nothing matches, when
    ``user_id`` cannot be converted to an ``ObjectId``, or when the lookup
    itself fails (best-effort: errors are not propagated).
    """
    try:
        return await users_collection.find_one({"_id": ObjectId(user_id)})
    except Exception:
        # ``Exception`` rather than a bare ``except`` so task cancellation
        # (asyncio.CancelledError) and interpreter-exit signals propagate.
        return None
async def get_all_users():
    """Return every document in ``users_collection`` as a list."""
    everyone = users_collection.find({})
    return await everyone.to_list(length=None)
async def update_user(user_id: str, update_data: dict):
    """Apply ``update_data`` to a user and return the updated document.

    ``update_data`` is modified in place: ``updated_at`` is set to the
    current UTC time before the fields are written with ``$set``.

    Returns the refreshed user document, or ``None`` when no document was
    modified or any error occurred (best-effort: errors are not raised).
    """
    try:
        update_data["updated_at"] = datetime.utcnow()
        result = await users_collection.update_one(
            {"_id": ObjectId(user_id)},
            {"$set": update_data}
        )
        if result.modified_count:
            return await get_user_by_id(user_id)
        return None
    except Exception:
        # ``Exception`` rather than a bare ``except`` so task cancellation
        # (asyncio.CancelledError) and interpreter-exit signals propagate.
        return None
async def delete_user(user_id: str):
    """Delete a user by id and report whether a document was removed.

    Returns ``True`` if a document was deleted, ``False`` otherwise
    (including when ``user_id`` is not a valid ObjectId or the delete fails).
    """
    try:
        result = await users_collection.delete_one({"_id": ObjectId(user_id)})
        return result.deleted_count > 0
    except Exception:
        # ``Exception`` rather than a bare ``except`` so task cancellation
        # (asyncio.CancelledError) and interpreter-exit signals propagate.
        return False
async def admin_create_local_user(email: str, password: str, full_name: str = None, is_admin: bool = False):
    """Create a new local-auth user (admin only).

    Returns the document produced by ``create_user``.

    Raises:
        ValueError: if a user with this email already exists, or if the
            password is shorter than 8 characters.
    """
    # Refuse duplicates before anything else.
    if await get_user_by_email(email):
        raise ValueError("User with this email already exists")

    # Enforce the minimum password length.
    minimum_length = 8
    if len(password) < minimum_length:
        raise ValueError("Password must be at least 8 characters")

    new_user = await create_user(
        email=email,
        password=password,
        full_name=full_name,
        is_admin=is_admin,
        auth_provider="local",
    )
    return new_user
async def admin_reset_user_password(user_id: str, new_password: str):
    """Reset the password of a local user (admin only).

    Returns ``True`` when the stored document was modified.

    Raises:
        ValueError: if the user does not exist, is not a ``"local"`` auth
            user, or the new password is shorter than 8 characters.
    """
    target = await get_user_by_id(user_id)
    if not target:
        raise ValueError("User not found")

    is_local = target.get("auth_provider") == "local"
    if not is_local:
        raise ValueError("Cannot reset password for SSO users")

    if len(new_password) < 8:
        raise ValueError("Password must be at least 8 characters")

    changes = {
        "hashed_password": hash_password(new_password),
        "updated_at": datetime.utcnow(),
    }
    outcome = await users_collection.update_one(
        {"_id": ObjectId(user_id)},
        {"$set": changes},
    )
    return outcome.modified_count > 0
async def change_user_password(user_id: str, current_password: str, new_password: str):
    """Change the current user's password after verifying the old one.

    Returns ``True`` when the stored document was modified.

    Raises:
        ValueError: if the user does not exist, is not a ``"local"`` auth
            user, has no stored password, supplies a wrong current
            password, or the new password is shorter than 8 characters.
    """
    account = await get_user_by_id(user_id)
    if not account:
        raise ValueError("User not found")

    is_local = account.get("auth_provider") == "local"
    if not is_local:
        raise ValueError("Cannot change password for SSO users")

    if not account.get("hashed_password"):
        raise ValueError("User has no password set")

    # The caller must prove knowledge of the existing password.
    current_matches = verify_password(current_password, account["hashed_password"])
    if not current_matches:
        raise ValueError("Current password is incorrect")

    if len(new_password) < 8:
        raise ValueError("New password must be at least 8 characters")

    changes = {
        "hashed_password": hash_password(new_password),
        "updated_at": datetime.utcnow(),
    }
    outcome = await users_collection.update_one(
        {"_id": ObjectId(user_id)},
        {"$set": changes},
    )
    return outcome.modified_count > 0
async def get_agent_by_name(agent_name: str):
    """Return one agent whose ``agent_name`` equals the given name exactly.

    The lookup is not scoped to any owner; returns ``None`` when no agent
    has that name.
    """
    by_name = {"agent_name": agent_name}
    agent_doc = await agents_collection.find_one(by_name)
    return agent_doc
def _agent_data_differs(existing_agent: dict, new_agent_data: dict) -> bool:
    """Tell whether incoming agent data differs from the stored agent.

    Only the descriptive fields listed below are compared; the agent name
    and bookkeeping fields are ignored. A field missing from either side is
    treated as ``None``. Returns ``True`` as soon as one field differs.
    """
    tracked_fields = (
        "agent_description", "agent_purpose", "agent_version", "agent_status",
        "agent_location", "agent_department", "agent_contact_person",
        "agent_tags", "agent_userbase", "agent_capabilities", "agent_metadata",
        "url", "discipline",
    )
    return any(
        existing_agent.get(name) != new_agent_data.get(name)
        for name in tracked_fields
    )
async def create_agent_usage_record(agent_name: str, agent_data: dict):
    """Record a usage event for an agent and sync the agent if its data changed.

    A usage document containing ``agent_data`` is always inserted. If an
    agent with this name exists and its compared fields differ from
    ``agent_data``, every key of ``agent_data`` except ``agent_name`` is
    written onto the agent along with a fresh ``updated_at``.

    Returns the ``_id`` of the inserted usage document.

    Raises:
        Exception: wrapping any failure during the insert or the update.
    """
    recorded_at = datetime.utcnow()
    usage_entry = {
        "agent_name": agent_name,
        "agent_data": agent_data,
        "timestamp": recorded_at,
        "created_at": recorded_at,
    }
    try:
        insert_outcome = await agent_usage_collection.insert_one(usage_entry)

        stored_agent = await get_agent_by_name(agent_name)
        if stored_agent and _agent_data_differs(stored_agent, agent_data):
            # Push the incoming values onto the main record; the name is the
            # lookup key and is left untouched.
            changes = {
                key: value
                for key, value in agent_data.items()
                if key != "agent_name"
            }
            changes["updated_at"] = recorded_at
            await agents_collection.update_one(
                {"agent_name": agent_name},
                {"$set": changes},
            )

        return insert_outcome.inserted_id
    except Exception as e:
        raise Exception(f"Failed to store agent usage data: {str(e)}")
async def get_agent_usage_stats(agent_name: str, start_date: datetime = None, end_date: datetime = None):
    """Summarise recorded usage for one agent, optionally within a date range.

    ``start_date`` and ``end_date`` are inclusive bounds on the usage
    ``timestamp``. Returns a dict with ``total_usage_count`` and the
    ``first_usage`` / ``last_usage`` timestamps (``None`` when there is no
    usage in range).
    """
    criteria = {"agent_name": agent_name}

    time_bounds = {}
    if start_date:
        time_bounds["$gte"] = start_date
    if end_date:
        time_bounds["$lte"] = end_date
    if time_bounds:
        criteria["timestamp"] = time_bounds

    usage_total = await agent_usage_collection.count_documents(criteria)
    # Earliest and latest matching records, found by sorting on timestamp.
    earliest = await agent_usage_collection.find_one(criteria, sort=[("timestamp", 1)])
    latest = await agent_usage_collection.find_one(criteria, sort=[("timestamp", -1)])

    first_seen = earliest["timestamp"] if earliest else None
    last_seen = latest["timestamp"] if latest else None
    return {
        "total_usage_count": usage_total,
        "first_usage": first_seen,
        "last_usage": last_seen,
    }
async def get_agent_usage_by_period(agent_name: str, period: str = "daily", start_date: datetime = None, end_date: datetime = None):
    """Count an agent's usage records per time bucket.

    ``period`` selects the bucket: ``"daily"``, ``"weekly"`` or
    ``"monthly"``; any other value is treated as daily. ``start_date`` and
    ``end_date`` are inclusive bounds on the usage ``timestamp``. Returns a
    dict mapping the formatted bucket label to the number of records in it.
    """
    criteria = {"agent_name": agent_name}

    time_bounds = {}
    if start_date:
        time_bounds["$gte"] = start_date
    if end_date:
        time_bounds["$lte"] = end_date
    if time_bounds:
        criteria["timestamp"] = time_bounds

    # $dateToString format per supported period; daily is the fallback.
    bucket_formats = {
        "daily": "%Y-%m-%d",
        "weekly": "%Y-W%U",
        "monthly": "%Y-%m",
    }
    bucket_format = bucket_formats.get(period, "%Y-%m-%d")

    stages = [
        {"$match": criteria},
        {
            "$group": {
                "_id": {"$dateToString": {"format": bucket_format, "date": "$timestamp"}},
                "count": {"$sum": 1},
            }
        },
        {"$sort": {"_id": 1}},
    ]
    rows = await agent_usage_collection.aggregate(stages).to_list(length=None)
    return {row["_id"]: row["count"] for row in rows}
async def create_agent_from_collector(agent_data: dict):
    """Store an agent submitted through the agent collector API (no user owner).

    Every key of ``agent_data`` is stored as-is, including optional usage
    tracking fields such as ``usage_timeline``, ``conversation_count``,
    ``unique_users``, ``total_messages``, ``first_used`` and ``last_used``.
    ``agent_created_at`` / ``agent_updated_at`` are filled in on
    ``agent_data`` itself (as ISO strings) when missing or empty.

    The stored document is owned by the ``"agent_collector_api"`` marker
    and has ``registration_complete`` set to ``False`` unless the payload
    already carries that key.

    Returns the stored document including its ``_id``.

    Raises:
        Exception: wrapping any failure of the insert.
    """
    received_at = datetime.utcnow()

    # Backfill the agent's own timestamps when the payload omits them.
    for stamp_field in ("agent_created_at", "agent_updated_at"):
        if not agent_data.get(stamp_field):
            agent_data[stamp_field] = received_at.isoformat()

    agent_doc = dict(agent_data)
    agent_doc.update(
        created_by="agent_collector_api",  # marker for collector-created agents
        created_at=received_at,
        updated_at=received_at,
    )

    # Collector payloads lack the governance fields that only the form /
    # completion flow collects, so these agents default to incomplete.
    if "registration_complete" not in agent_doc:
        agent_doc["registration_complete"] = False

    try:
        insert_outcome = await agents_collection.insert_one(agent_doc)
        agent_doc["_id"] = insert_outcome.inserted_id
        return agent_doc
    except Exception as e:
        raise Exception(f"Failed to store agent data: {str(e)}")
# Per-user rating functions
async def upsert_agent_rating(agent_id: str, user_id: str, rating: float):
    """Store a user's rating for an agent, creating or replacing it.

    There is one rating document per (``agent_id``, ``user_id``) pair:
    ``rating`` and ``updated_at`` are always written, ``created_at`` only
    when the document is first inserted. Returns the update result object.
    """
    timestamp = datetime.utcnow()
    rating_key = {"agent_id": agent_id, "user_id": user_id}
    write_spec = {
        "$set": {"rating": rating, "updated_at": timestamp},
        "$setOnInsert": {"created_at": timestamp},
    }
    return await agent_ratings_collection.update_one(rating_key, write_spec, upsert=True)
async def get_agent_rating_stats(agent_id: str):
    """Compute the average rating and rating count for an agent.

    Returns ``{"avg_rating": <average rounded to 1 decimal>, "count": n}``,
    or ``{"avg_rating": None, "count": 0}`` when the agent has no ratings.
    """
    stages = [
        {"$match": {"agent_id": agent_id}},
        {"$group": {
            "_id": None,
            "avg_rating": {"$avg": "$rating"},
            "count": {"$sum": 1},
        }},
    ]
    rows = await agent_ratings_collection.aggregate(stages).to_list(length=1)
    if not rows:
        return {"avg_rating": None, "count": 0}

    summary = rows[0]
    return {
        "avg_rating": round(summary["avg_rating"], 1),
        "count": summary["count"],
    }
async def get_user_rating_for_agent(agent_id: str, user_id: str):
    """Return the rating this user gave the agent, or ``None`` if they gave none."""
    rating_key = {"agent_id": agent_id, "user_id": user_id}
    rating_doc = await agent_ratings_collection.find_one(rating_key)
    if not rating_doc:
        return None
    return rating_doc["rating"]
async def update_agent_average_rating(agent_id: str):
    """Recompute an agent's rating statistics and store them on the agent.

    Writes ``rating`` (the average), ``rating_count`` and a fresh
    ``updated_at`` onto the agent document, then returns the statistics
    dict from ``get_agent_rating_stats``.
    """
    stats = await get_agent_rating_stats(agent_id)
    changes = dict(
        rating=stats["avg_rating"],
        rating_count=stats["count"],
        updated_at=datetime.utcnow(),
    )
    agent_key = {"_id": ObjectId(agent_id)}
    await agents_collection.update_one(agent_key, {"$set": changes})
    return stats
async def get_incomplete_agents_for_user(user_email: str, user_id: str, is_admin: bool = False):
    """Return agents needing the new registration form completed, newest first.

    An agent is incomplete when ``registration_complete`` is anything other
    than ``True`` (including missing).

    Owner resolution mirrors the completion-flow design (PLAN §5b):
    - direct ownership (``created_by == user_id``), or
    - LibreChat-synced ownership (``created_by == "agent_collector_api"``
      AND ``agent_contact_person`` equals ``user_email``, ignoring case).
    Admins see all incomplete agents.
    """
    incomplete_filter = {"registration_complete": {"$ne": True}}
    if is_admin:
        query = incomplete_filter
    else:
        ownership_clauses = [{"created_by": user_id}]
        # Only add the contact-person clause when there is an email to match:
        # matching on an empty/None email would select collector agents that
        # simply have no contact person recorded.
        if user_email:
            # Case-insensitive exact match, the same comparison used when
            # listing a user's agents, so an agent shown as theirs is also
            # offered for completion regardless of how the email was cased.
            ownership_clauses.append({
                "created_by": "agent_collector_api",
                "agent_contact_person": {
                    "$regex": f"^{re.escape(user_email)}$",
                    "$options": "i",
                },
            })
        query = {"$and": [incomplete_filter, {"$or": ownership_clauses}]}

    cursor = agents_collection.find(query).sort("created_at", -1)
    return await cursor.to_list(length=None)
async def get_unresolved_owner_agents():
    """Return collector-created agents whose contact email matches no active user.

    An agent is unresolved when its ``agent_contact_person`` is empty or,
    after trimming whitespace and lower-casing, does not equal the email of
    any user with ``is_active`` set. These agents need an admin to reassign
    ownership or delete them.

    Each returned dict holds only ``_id``, ``agent_name``,
    ``agent_contact_person``, ``registration_complete`` and ``created_at``.
    """
    def _normalise(value):
        # Emails are compared trimmed and case-insensitively.
        return (value or "").strip().lower()

    cursor = agents_collection.find(
        {"created_by": "agent_collector_api"},
        {"agent_name": 1, "agent_contact_person": 1, "registration_complete": 1, "created_at": 1, "_id": 1}
    )
    candidates = await cursor.to_list(length=None)
    if not candidates:
        return []

    contact_emails = {_normalise(a.get("agent_contact_person")) for a in candidates}
    contact_emails.discard("")
    if not contact_emails:
        # No candidate names a contact at all, so none can be resolved and
        # there is no need to scan the users collection.
        return candidates

    user_cursor = users_collection.find(
        {"is_active": True, "email": {"$ne": None}},
        {"email": 1},
    )
    known_user_emails = set()
    async for user in user_cursor:
        user_email = _normalise(user.get("email"))
        if user_email:
            known_user_emails.add(user_email)

    # Contacts that belong to an active user; everything else is unresolved.
    resolved_emails = contact_emails & known_user_emails
    return [
        a for a in candidates
        if _normalise(a.get("agent_contact_person")) not in resolved_emails
    ]
async def reassign_agent_owner(agent_id: str, new_owner_user_id: str, new_contact_email: str):
    """Admin tool: hand an agent over to a real user.

    Sets ``created_by`` to ``new_owner_user_id`` and
    ``agent_contact_person`` to ``new_contact_email`` (each only when a
    non-empty value is given) and always refreshes ``updated_at``. Returns
    the agent document as read back after the update.
    """
    changes = {"updated_at": datetime.utcnow()}
    requested = (
        ("created_by", new_owner_user_id),
        ("agent_contact_person", new_contact_email),
    )
    for field_name, new_value in requested:
        if new_value:
            changes[field_name] = new_value

    agent_key = {"_id": ObjectId(agent_id)}
    await agents_collection.update_one(agent_key, {"$set": changes})
    return await agents_collection.find_one(agent_key)
async def complete_agent_registration(
    agent_id: str,
    completion_data: dict,
    completing_user_id: str,
    completing_user_email: str,
):
    """Apply a submitted registration-completion form to an agent.

    - Every key of ``completion_data`` is written onto the agent document.
    - ``registration_complete`` is set to ``True``; ``last_edited_by`` is
      set to the submitting user's email and ``updated_at`` is refreshed.
    - If the agent is still owned by the ``"agent_collector_api"`` marker,
      ``created_by`` is reassigned to the submitting user.

    Returns the agent document as read back after the update.

    Raises:
        ValueError: if no agent with ``agent_id`` exists.
    """
    agent_key = {"_id": ObjectId(agent_id)}
    current = await agents_collection.find_one(agent_key)
    if not current:
        raise ValueError(f"Agent {agent_id} not found")

    changes = dict(completion_data)
    # These bookkeeping values always win over anything in the form data.
    changes.update({
        "registration_complete": True,
        "last_edited_by": completing_user_email,
        "updated_at": datetime.utcnow(),
    })

    # Collector-created agents get handed to whoever finished registration.
    owned_by_collector = current.get("created_by") == "agent_collector_api"
    if owned_by_collector:
        changes["created_by"] = completing_user_id

    await agents_collection.update_one({"_id": ObjectId(agent_id)}, {"$set": changes})
    return await agents_collection.find_one({"_id": ObjectId(agent_id)})
async def migrate_pencil_agents_discipline():
    """Assign the 'Pencil Agents' discipline to unclassified pencil agents.

    Targets agents whose name contains "pencil" (any case) and whose
    ``discipline`` is null, empty or missing. Returns the number of agents
    modified.
    """
    no_discipline = [
        {"discipline": None},
        {"discipline": ""},
        {"discipline": {"$exists": False}},
    ]
    selector = {
        "agent_name": {"$regex": "pencil", "$options": "i"},
        "$or": no_discipline,
    }
    outcome = await agents_collection.update_many(
        selector,
        {"$set": {"discipline": "Pencil Agents"}},
    )
    return outcome.modified_count
async def migrate_optimisation_spelling():
    """Rename the discipline 'Optimization' (US) to 'Optimisation' (UK).

    Returns the number of agents modified; a second run finds nothing left
    to rename.
    """
    old_spelling = {"discipline": "Optimization"}
    new_spelling = {"$set": {"discipline": "Optimisation"}}
    outcome = await agents_collection.update_many(old_spelling, new_spelling)
    return outcome.modified_count
async def backfill_registration_complete():
    """Backfill ``registration_complete`` on agents that lack the field.

    Agents not created by the ``"agent_collector_api"`` marker are
    grandfathered as complete; collector-created agents are marked
    incomplete so they enter the completion-reminder flow. Agents that
    already have the field are left untouched.

    Returns a dict with the number of agents modified in each group:
    ``form_complete`` and ``collector_incomplete``.
    """
    collector_marker = "agent_collector_api"
    field_missing = {"$exists": False}

    # Pass 1: everything that did not come from the collector -> complete.
    grandfathered = await agents_collection.update_many(
        {"registration_complete": field_missing, "created_by": {"$ne": collector_marker}},
        {"$set": {"registration_complete": True}},
    )
    # Pass 2: collector-created agents -> incomplete.
    flagged = await agents_collection.update_many(
        {"registration_complete": field_missing, "created_by": collector_marker},
        {"$set": {"registration_complete": False}},
    )

    return {
        "form_complete": grandfathered.modified_count,
        "collector_incomplete": flagged.modified_count,
    }