Rebuilds the agent registration form into 8 governance sections (Identity, Classification, Autonomy, IP, Tech Stack, Data Safety, Performance, Declarations) and introduces a completion flow for agents that come in via the LibreChat collector without the new required fields. - New form fields: business_entity, client_scope, agent_classification, autonomy_level, ip_ownership, foundation_model, validated_by/date, evals_method, plus nested safety / pii / declarations objects and a registration_complete flag. - registration_complete defaults to true for form-submitted agents and false for collector-created ones; existing agents are grandfathered via a startup migration. - Owner-by-email lookup so LibreChat-synced agents surface in the user's "My Agents" view with an Incomplete badge and Complete CTA. Submitting the completion form reassigns created_by from the collector marker to the user. - Daily APScheduler job sends a digest reminder email per owner with a 7-day cooldown and 4-nudge cap (configurable). Manual trigger via POST /api/admin/completion-reminders/send. - Admin banner + modal for collector agents whose contact email doesn't match an active user, with one-click reassignment. - Gemini audit extended to also return agent_classification and an autonomy hint; applied to agents on next batch run alongside discipline/department. - New filter dimensions on agent management + admin dashboard: Business Entity, Agent Type, Autonomy, plus a Compliance Risks quick toggle. - CSV export/import gains 21 columns covering all governance fields. - Discipline 'Optimization' renamed to 'Optimisation' with idempotent startup migration; Gemini system prompt and template dropdowns updated. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
897 lines
33 KiB
Python
897 lines
33 KiB
Python
from datetime import datetime, timedelta
|
|
from bson import ObjectId
|
|
import database
|
|
from database import users_collection, agents_collection, agent_usage_collection, agent_ratings_collection
|
|
import auth
|
|
from auth import hash_password, verify_password
|
|
import re
|
|
|
|
async def get_user_by_email(email: str):
    """Look up a single user document whose ``email`` field equals *email*.

    Returns whatever ``users_collection.find_one`` yields for that filter.
    """
    email_filter = {"email": email}
    user_doc = await users_collection.find_one(email_filter)
    return user_doc
|
|
|
|
async def get_user_by_azure_ad_id(azure_ad_id: str):
    """Look up a single user document by its ``azure_ad_id`` field.

    Returns whatever ``users_collection.find_one`` yields for that filter.
    """
    azure_filter = {"azure_ad_id": azure_ad_id}
    user_doc = await users_collection.find_one(azure_filter)
    return user_doc
|
|
|
|
async def create_user(email: str, password: str = None, full_name: str = None, is_admin: bool = False, auth_provider: str = "local"):
    """Insert a new user document and return it.

    The document is always stored as active with identical ``created_at`` and
    ``updated_at`` timestamps. A ``hashed_password`` is only added when
    ``auth_provider`` is ``"local"`` and a non-empty password was supplied.

    Returns:
        The inserted dict, with ``_id`` taken from the insert result.
    """
    timestamp = datetime.utcnow()
    new_user = dict(
        email=email,
        full_name=full_name,
        is_active=True,
        is_admin=is_admin,
        auth_provider=auth_provider,
        created_at=timestamp,
        updated_at=timestamp,
    )

    # Non-local (SSO) accounts never get a password hash stored.
    needs_password_hash = auth_provider == "local" and bool(password)
    if needs_password_hash:
        new_user["hashed_password"] = hash_password(password)

    insert_outcome = await users_collection.insert_one(new_user)
    new_user["_id"] = insert_outcome.inserted_id
    return new_user
|
|
|
|
async def create_or_update_azure_user(azure_profile: dict):
    """Create or refresh a user from an Azure AD profile and return the stored document.

    An existing user is located by ``azure_ad_id`` first and by ``email`` as a
    fallback. A match is updated in place (its ``auth_provider`` becomes
    ``"azure_ad"``); otherwise a new active, non-admin user is inserted.

    Args:
        azure_profile: Mapping read for ``azure_ad_id``, ``email`` and
            ``full_name``.

    Raises:
        ValueError: If ``azure_ad_id`` or ``email`` is missing or empty.
    """
    azure_ad_id = azure_profile.get("azure_ad_id")
    email = azure_profile.get("email")
    if not (azure_ad_id and email):
        raise ValueError("Azure AD profile missing required fields")

    timestamp = datetime.utcnow()

    # Prefer the Azure AD object ID; only fall back to the email address when
    # no user carries that ID yet.
    matched_user = await get_user_by_azure_ad_id(azure_ad_id) or await get_user_by_email(email)

    if not matched_user:
        # New users are not admin by default.
        fresh_user = {
            "azure_ad_id": azure_ad_id,
            "email": email,
            "full_name": azure_profile.get("full_name"),
            "is_active": True,
            "is_admin": False,
            "auth_provider": "azure_ad",
            "created_at": timestamp,
            "updated_at": timestamp,
        }
        insert_outcome = await users_collection.insert_one(fresh_user)
        fresh_user["_id"] = insert_outcome.inserted_id
        return fresh_user

    # Existing account: attach / refresh the Azure AD identity, keeping the
    # stored full name when the profile does not provide one.
    changes = {
        "azure_ad_id": azure_ad_id,
        "email": email,
        "full_name": azure_profile.get("full_name") or matched_user.get("full_name"),
        "auth_provider": "azure_ad",
        "updated_at": timestamp,
    }
    await users_collection.update_one({"_id": matched_user["_id"]}, {"$set": changes})

    # Re-read so the caller gets the document as stored after the update.
    return await users_collection.find_one({"_id": matched_user["_id"]})
|
|
|
|
async def authenticate_user(email: str, password: str):
    """Authenticate a user against locally stored credentials (fallback method).

    Progress is traced with ``print`` at every step.

    Returns:
        The user document when the user exists, has ``auth_provider`` equal to
        ``"local"``, has a stored ``hashed_password`` and ``verify_password``
        accepts *password*; otherwise ``None``.
    """
    print(f"🔐 CRUD: Authenticating user {email}")

    account = await get_user_by_email(email)
    if not account:
        print(f"🔐 CRUD: User {email} not found in database")
        return None

    provider = account.get("auth_provider")
    print(f"🔐 CRUD: User found - auth_provider: {provider}")
    print(f"🔐 CRUD: Has hashed_password: {'hashed_password' in account}")

    # Only authenticate local users with password
    if provider != "local":
        print(f"🔐 CRUD: User is not local auth provider: {provider}")
        return None

    stored_hash = account.get("hashed_password")
    if not stored_hash:
        print("🔐 CRUD: User has no hashed_password field")
        return None

    print("🔐 CRUD: Verifying password...")
    password_valid = verify_password(password, stored_hash)
    print(f"🔐 CRUD: Password verification result: {password_valid}")

    if password_valid:
        print("🔐 CRUD: Authentication successful!")
        return account
    return None
|
|
|
|
# Agent CRUD operations
|
|
async def create_agent(agent_data: dict, user_id: str, skip_time_check: bool = False):
    """Insert a new agent document owned by *user_id* and return it.

    Unless *skip_time_check* is set, the insert is refused when the same user
    created an agent with the same ``agent_name`` within the last 5 minutes.

    Fields from *agent_data* are copied into the stored document. Ownership
    and timestamps are then set, and quality-audit, risk, discipline, rating
    and token fields are filled with defaults when *agent_data* omits them.

    Raises:
        ValueError: If a same-named agent by this user was created recently.

    Returns:
        The inserted dict, with ``_id`` taken from the insert result.
    """
    requested_name = agent_data.get("agent_name")

    if not skip_time_check:
        # Duplicate guard: same name, same creator, created within 5 minutes.
        cutoff = datetime.utcnow() - timedelta(minutes=5)
        recent_duplicate = await agents_collection.find_one({
            "agent_name": requested_name,
            "created_by": user_id,
            "created_at": {"$gte": cutoff},
        })
        if recent_duplicate:
            raise ValueError(f"Agent with name '{requested_name}' was already created recently. Please wait before creating another agent with the same name.")

    timestamp = datetime.utcnow()
    agent_doc = {
        **agent_data,
        "created_by": user_id,
        "created_at": timestamp,
        "updated_at": timestamp,
    }

    # Defaults applied only for keys the caller did not provide.
    field_defaults = (
        ("quality_audit_status", False),
        ("quality_audit_updated_by", None),
        ("quality_audit_updated_at", None),
        ("quality_audit_updated_by_name", None),
        ("risk_factor", None),
        ("discipline", None),
        ("rating", None),
        ("total_tokens", None),
        ("prompt_tokens", None),
        ("completion_tokens", None),
    )
    for field_name, default_value in field_defaults:
        agent_doc.setdefault(field_name, default_value)

    insert_outcome = await agents_collection.insert_one(agent_doc)
    agent_doc["_id"] = insert_outcome.inserted_id
    return agent_doc
|
|
|
|
async def get_agent_by_id(agent_id: str):
    """Fetch one agent document by the string form of its ``_id``.

    Returns:
        The result of ``agents_collection.find_one``, or ``None`` when
        *agent_id* cannot be converted to an ``ObjectId`` or the lookup raises
        an ordinary exception.
    """
    try:
        return await agents_collection.find_one({"_id": ObjectId(agent_id)})
    except Exception:
        # Best-effort lookup: any ordinary failure is reported as "not found".
        # ``Exception`` (rather than a bare ``except``) lets task cancellation
        # (asyncio.CancelledError) and KeyboardInterrupt/SystemExit propagate.
        return None
|
|
|
|
async def get_agents_by_user(user_id: str, status_filter: str = None, limit: int = None, user_email: str = None):
    """List agents a user owns or is the contact person for, newest first.

    Args:
        user_id: Matched against ``created_by``.
        status_filter: When truthy, restricts results to that ``agent_status``.
        limit: When truthy, caps the number of documents returned.
        user_email: When truthy, agents whose ``agent_contact_person`` equals
            this address (case-insensitively) are included as well.
    """
    ownership_clauses = [{"created_by": user_id}]

    if user_email:
        # Anchor and escape so the address is matched literally and in full,
        # ignoring case only.
        contact_pattern = "^" + re.escape(user_email) + "$"
        ownership_clauses.append(
            {"agent_contact_person": {"$regex": contact_pattern, "$options": "i"}}
        )

    query = {"$or": ownership_clauses}
    if status_filter:
        query["agent_status"] = status_filter

    newest_first = agents_collection.find(query).sort("created_at", -1)
    if limit:
        newest_first = newest_first.limit(limit)
    return await newest_first.to_list(length=None)
|
|
|
|
async def get_all_agents(status_filter: str = None, limit: int = None):
    """List all agents, newest first.

    Args:
        status_filter: When truthy, restricts results to that ``agent_status``.
        limit: When truthy, caps the number of documents returned.
    """
    query = {"agent_status": status_filter} if status_filter else {}
    newest_first = agents_collection.find(query).sort("created_at", -1)
    selected = newest_first.limit(limit) if limit else newest_first
    return await selected.to_list(length=None)
|
|
|
|
async def search_agents(search_term: str, user_id: str = None):
    """Search agents by name, description, tags, department or discipline.

    The term is matched case-insensitively as a literal substring of
    ``agent_name``, ``agent_description``, ``agent_tags``,
    ``agent_department`` or ``discipline``.

    Args:
        search_term: Text to look for. It is regex-escaped before being sent
            to the database, so characters such as ``+``, ``(`` or ``.`` are
            matched literally instead of being treated as a pattern.
        user_id: When truthy, restricts results to agents with that
            ``created_by``.

    Returns:
        Matching agent documents, newest first.
    """
    # Escape so metacharacters in the term are not interpreted by ``$regex``;
    # this mirrors the escaping already used for e-mail matching in this module.
    literal_pattern = re.escape(search_term)
    searchable_fields = (
        "agent_name",
        "agent_description",
        "agent_tags",
        "agent_department",
        "discipline",
    )
    query = {
        "$or": [
            {field: {"$regex": literal_pattern, "$options": "i"}}
            for field in searchable_fields
        ]
    }

    if user_id:
        query["created_by"] = user_id

    return await agents_collection.find(query).sort("created_at", -1).to_list(length=None)
|
|
|
|
async def get_agent_stats(discipline_filter: str = None):
    """Count agents per ``agent_status``.

    Args:
        discipline_filter: When truthy, only agents with that ``discipline``
            are counted.

    Returns:
        A dict mapping each ``agent_status`` value to its document count.
    """
    stages = [{"$match": {"discipline": discipline_filter}}] if discipline_filter else []
    stages.append({"$group": {"_id": "$agent_status", "count": {"$sum": 1}}})

    grouped = await agents_collection.aggregate(stages).to_list(length=None)

    counts_by_status = {}
    for row in grouped:
        counts_by_status[row["_id"]] = row["count"]
    return counts_by_status
|
|
|
|
async def get_analytics_summary(status_filter: str = None, discipline_filter: str = None):
    """Sum the usage counters stored on agent documents.

    The summed fields are ``total_messages``, ``total_tokens``,
    ``prompt_tokens``, ``completion_tokens``, ``conversation_count`` and
    ``unique_users``; a missing or null value counts as 0.

    Args:
        status_filter: When truthy, only agents with that ``agent_status``.
        discipline_filter: When truthy, only agents with that ``discipline``.

    Returns:
        A dict with one total per field; all zeros when nothing matches.
    """
    metric_names = (
        "total_messages",
        "total_tokens",
        "prompt_tokens",
        "completion_tokens",
        "conversation_count",
        "unique_users",
    )

    match_filter = {}
    if status_filter:
        match_filter["agent_status"] = status_filter
    if discipline_filter:
        match_filter["discipline"] = discipline_filter

    # A single group (``_id: None``) collapses all matched agents into one row.
    group_spec = {"_id": None}
    for metric in metric_names:
        group_spec[metric] = {"$sum": {"$ifNull": ["$" + metric, 0]}}

    pipeline = [{"$match": match_filter}] if match_filter else []
    pipeline.append({"$group": group_spec})

    rows = await agents_collection.aggregate(pipeline).to_list(length=1)
    if not rows:
        return {metric: 0 for metric in metric_names}

    summary = rows[0]
    del summary["_id"]
    return summary
|
|
|
|
async def get_discipline_breakdown(status_filter: str = None):
    """Count agents per ``discipline`` value.

    Args:
        status_filter: When truthy, only agents with that ``agent_status``.

    Returns:
        A dict mapping discipline to count; a falsy discipline (missing, null
        or empty) is reported under ``"Unassigned"``.
    """
    stages = []
    if status_filter:
        stages.append({"$match": {"agent_status": status_filter}})
    stages.append({"$group": {"_id": "$discipline", "count": {"$sum": 1}}})

    grouped = await agents_collection.aggregate(stages).to_list(length=None)

    breakdown = {}
    for row in grouped:
        label = row["_id"] or "Unassigned"
        breakdown[label] = row["count"]
    return breakdown
|
|
|
|
async def get_aggregated_usage_timeline(limit_days: int = 90, status_filter: str = None, discipline_filter: str = None):
    """Aggregate ``usage_timeline`` entries across agents into per-date totals.

    Only agents with a non-empty ``usage_timeline`` (and matching the optional
    filters) contribute. Entries are grouped by their ``date`` value and their
    ``message_count`` / ``token_count`` summed, with missing counts treated as 0.

    Args:
        limit_days: Maximum number of distinct dates returned. When more dates
            exist, the most recent ones are kept.
        status_filter: When truthy, only agents with that ``agent_status``.
        discipline_filter: When truthy, only agents with that ``discipline``.

    Returns:
        A list of ``{"date", "message_count", "token_count"}`` dicts in
        ascending date order.
    """
    match_filter = {"usage_timeline": {"$exists": True, "$ne": []}}
    if status_filter:
        match_filter["agent_status"] = status_filter
    if discipline_filter:
        match_filter["discipline"] = discipline_filter

    pipeline = [
        {"$match": match_filter},
        {"$unwind": "$usage_timeline"},
        {
            "$group": {
                "_id": "$usage_timeline.date",
                "message_count": {"$sum": {"$ifNull": ["$usage_timeline.message_count", 0]}},
                "token_count": {"$sum": {"$ifNull": ["$usage_timeline.token_count", 0]}},
            }
        },
        # Sort newest-first before limiting so the window holds the most
        # recent dates; an ascending sort followed by $limit would keep the
        # oldest ones instead.
        # NOTE(review): assumes ``date`` values sort chronologically (e.g. ISO
        # ``YYYY-MM-DD`` strings or real dates) - confirm against the writer.
        {"$sort": {"_id": -1}},
        {"$limit": limit_days},
        # Restore ascending order for the caller.
        {"$sort": {"_id": 1}},
    ]
    results = await agents_collection.aggregate(pipeline).to_list(length=None)
    return [
        {"date": r["_id"], "message_count": r["message_count"], "token_count": r["token_count"]}
        for r in results
    ]
|
|
|
|
async def get_top_agents(sort_field: str, limit: int = 5, status_filter: str = None, discipline_filter: str = None):
    """Return the top *limit* agents ranked by *sort_field*, highest first.

    Only agents whose *sort_field* exists, is not null and is greater than 0
    are considered. Each result carries ``agent_name``, ``agent_status``,
    ``discipline``, ``total_messages``, ``total_tokens``, ``last_used`` and
    ``_id`` converted to a string.

    Args:
        sort_field: Name of the document field to rank by.
        limit: Maximum number of agents returned.
        status_filter: When truthy, only agents with that ``agent_status``.
        discipline_filter: When truthy, only agents with that ``discipline``.
    """
    match_filter = {sort_field: {"$exists": True, "$ne": None, "$gt": 0}}
    if status_filter:
        match_filter["agent_status"] = status_filter
    if discipline_filter:
        match_filter["discipline"] = discipline_filter

    projected_fields = (
        "agent_name",
        "agent_status",
        "discipline",
        "total_messages",
        "total_tokens",
        "last_used",
    )
    pipeline = [
        {"$match": match_filter},
        {"$sort": {sort_field: -1}},
        {"$limit": limit},
        {"$project": dict.fromkeys(projected_fields, 1)},
    ]

    top_agents = await agents_collection.aggregate(pipeline).to_list(length=None)
    # Stringify the ObjectId so the rows contain only plain values.
    for agent in top_agents:
        agent["_id"] = str(agent["_id"])
    return top_agents
|
|
|
|
async def get_recently_active_agents(limit: int = 10, status_filter: str = None, discipline_filter: str = None):
    """Return up to *limit* agents ordered by ``last_used``, most recent first.

    Agents without a ``last_used`` value are skipped. Each result carries
    ``agent_name``, ``agent_status``, ``discipline``, ``total_messages``,
    ``total_tokens``, ``last_used`` and ``_id`` converted to a string.

    Args:
        limit: Maximum number of agents returned.
        status_filter: When truthy, only agents with that ``agent_status``.
        discipline_filter: When truthy, only agents with that ``discipline``.
    """
    match_filter = {"last_used": {"$exists": True, "$ne": None}}
    if status_filter:
        match_filter["agent_status"] = status_filter
    if discipline_filter:
        match_filter["discipline"] = discipline_filter

    projected_fields = (
        "agent_name",
        "agent_status",
        "discipline",
        "total_messages",
        "total_tokens",
        "last_used",
    )
    pipeline = [
        {"$match": match_filter},
        {"$sort": {"last_used": -1}},
        {"$limit": limit},
        {"$project": dict.fromkeys(projected_fields, 1)},
    ]

    recent_agents = await agents_collection.aggregate(pipeline).to_list(length=None)
    # Stringify the ObjectId so the rows contain only plain values.
    for agent in recent_agents:
        agent["_id"] = str(agent["_id"])
    return recent_agents
|
|
|
|
async def update_agent(agent_id: str, update_data: dict, user_id: str = None, admin_user_info: dict = None, last_edited_by: str = None):
    """Apply *update_data* to an agent with ``$set`` and return the updated document.

    Args:
        agent_id: String form of the agent's ``_id``.
        update_data: Fields to set. NOTE: this dict is modified in place
            (``updated_at``, and possibly audit fields / ``last_edited_by``,
            are added to it).
        user_id: When truthy, the update only applies if the agent's
            ``created_by`` equals this value.
        admin_user_info: When truthy and *update_data* contains
            ``quality_audit_status``, audit bookkeeping is applied: if the
            status differs from the stored one, the ``quality_audit_updated_*``
            fields are stamped from this dict (``user_id``; ``user_name``
            falling back to ``email``), and unchecking the audit clears
            ``risk_factor``.
        last_edited_by: When truthy, stored as ``last_edited_by``.

    Returns:
        The re-read agent document when the update modified a document;
        ``None`` when nothing was modified or an ordinary exception occurred.
    """
    try:
        filter_query = {"_id": ObjectId(agent_id)}
        if user_id:
            filter_query["created_by"] = user_id

        # Handle Quality Audit updates specially
        if "quality_audit_status" in update_data and admin_user_info:
            # Get current agent to check if quality audit status is changing
            current_agent = await get_agent_by_id(agent_id)

            if current_agent and current_agent.get("quality_audit_status") != update_data["quality_audit_status"]:
                # Quality audit status is changing, update audit fields with separate timestamp
                update_data["quality_audit_updated_by"] = admin_user_info.get("user_id")
                update_data["quality_audit_updated_at"] = datetime.utcnow().isoformat()
                update_data["quality_audit_updated_by_name"] = admin_user_info.get("user_name", admin_user_info.get("email"))

                # If Quality Audit is being unchecked, clear Risk Factor
                if update_data["quality_audit_status"] is False:
                    update_data["risk_factor"] = None

        update_data["updated_at"] = datetime.utcnow()
        if last_edited_by:
            update_data["last_edited_by"] = last_edited_by

        result = await agents_collection.update_one(
            filter_query,
            {"$set": update_data},
        )
        if result.modified_count:
            return await get_agent_by_id(agent_id)
        return None
    except Exception:
        # Best-effort: ordinary failures (bad id, driver errors) are reported
        # as "no update". ``Exception`` (rather than a bare ``except``) lets
        # asyncio.CancelledError and KeyboardInterrupt/SystemExit propagate.
        return None
|
|
|
|
async def update_agent_quality_audit(agent_id: str, quality_audit_status: bool, admin_user_info: dict):
    """Update only the quality audit status of an agent (admin only).

    Args:
        agent_id: String form of the agent's ``_id``.
        quality_audit_status: New value for ``quality_audit_status``.
        admin_user_info: Dict read for ``user_id`` and ``user_name`` (falling
            back to ``email``) to stamp who made the change.

    Returns:
        The unchanged agent document when the status already has the
        requested value; the re-read document after a successful change;
        ``None`` when the agent does not exist, nothing was modified, or an
        ordinary exception occurred.
    """
    try:
        # Get current agent to ensure it exists
        current_agent = await get_agent_by_id(agent_id)
        if not current_agent:
            return None

        # Only update if status is actually changing
        if current_agent.get("quality_audit_status") == quality_audit_status:
            return current_agent

        update_data = {
            "quality_audit_status": quality_audit_status,
            "quality_audit_updated_by": admin_user_info.get("user_id"),
            "quality_audit_updated_at": datetime.utcnow().isoformat(),
            "quality_audit_updated_by_name": admin_user_info.get("user_name", admin_user_info.get("email")),
        }

        result = await agents_collection.update_one(
            {"_id": ObjectId(agent_id)},
            {"$set": update_data},
        )

        if result.modified_count:
            return await get_agent_by_id(agent_id)
        return None
    except Exception:
        # Best-effort: ordinary failures are reported as "no update".
        # ``Exception`` (rather than a bare ``except``) lets
        # asyncio.CancelledError and KeyboardInterrupt/SystemExit propagate.
        return None
|
|
|
|
async def delete_agent(agent_id: str, user_id: str = None):
    """Delete an agent by id, optionally only if *user_id* created it.

    Each step is traced with ``print``.

    Returns:
        ``True`` when a document was deleted; ``False`` when nothing matched
        or an exception occurred (the exception is printed, not raised).
    """
    try:
        print(f"🗑️ CRUD: Deleting agent {agent_id} with user_id filter: {user_id}")

        filter_query = {"_id": ObjectId(agent_id)}
        # Restrict to the creator's own agents only when a user id is given.
        if user_id:
            filter_query.update(created_by=user_id)
        print(f"🗑️ CRUD: Filter query: {filter_query}")

        outcome = await agents_collection.delete_one(filter_query)
        removed = outcome.deleted_count
        print(f"🗑️ CRUD: Delete result - deleted_count: {removed}")
        return removed > 0
    except Exception as e:
        print(f"🗑️ CRUD: Exception during delete: {e}")
        return False
|
|
|
|
async def get_user_by_id(user_id: str):
    """Fetch one user document by the string form of its ``_id``.

    Returns:
        The result of ``users_collection.find_one``, or ``None`` when
        *user_id* cannot be converted to an ``ObjectId`` or the lookup raises
        an ordinary exception.
    """
    try:
        return await users_collection.find_one({"_id": ObjectId(user_id)})
    except Exception:
        # Best-effort lookup: any ordinary failure is reported as "not found".
        # ``Exception`` (rather than a bare ``except``) lets task cancellation
        # (asyncio.CancelledError) and KeyboardInterrupt/SystemExit propagate.
        return None
|
|
|
|
async def get_all_users():
    """Return every document in ``users_collection`` as a list."""
    every_user = users_collection.find({})
    return await every_user.to_list(length=None)
|
|
|
|
async def update_user(user_id: str, update_data: dict):
    """Apply *update_data* to a user with ``$set`` and return the updated document.

    Args:
        user_id: String form of the user's ``_id``.
        update_data: Fields to set. NOTE: this dict is modified in place
            (``updated_at`` is added to it).

    Returns:
        The re-read user document when the update modified a document;
        ``None`` when nothing was modified or an ordinary exception occurred.
    """
    try:
        update_data["updated_at"] = datetime.utcnow()
        result = await users_collection.update_one(
            {"_id": ObjectId(user_id)},
            {"$set": update_data},
        )
        if result.modified_count:
            return await get_user_by_id(user_id)
        return None
    except Exception:
        # Best-effort: ordinary failures are reported as "no update".
        # ``Exception`` (rather than a bare ``except``) lets
        # asyncio.CancelledError and KeyboardInterrupt/SystemExit propagate.
        return None
|
|
|
|
async def delete_user(user_id: str):
    """Delete a user by the string form of its ``_id``.

    Returns:
        ``True`` when a document was deleted; ``False`` when nothing matched
        or an ordinary exception occurred.
    """
    try:
        result = await users_collection.delete_one({"_id": ObjectId(user_id)})
        return result.deleted_count > 0
    except Exception:
        # Best-effort: ordinary failures are reported as "not deleted".
        # ``Exception`` (rather than a bare ``except``) lets
        # asyncio.CancelledError and KeyboardInterrupt/SystemExit propagate.
        return False
|
|
|
|
async def admin_create_local_user(email: str, password: str, full_name: str = None, is_admin: bool = False):
    """Create a new local (password-based) user (admin only).

    Raises:
        ValueError: If a user with *email* already exists, or *password* is
            shorter than 8 characters.

    Returns:
        The document returned by ``create_user``.
    """
    # The e-mail uniqueness check runs before the password length check.
    if await get_user_by_email(email):
        raise ValueError("User with this email already exists")

    password_too_short = len(password) < 8
    if password_too_short:
        raise ValueError("Password must be at least 8 characters")

    new_user = await create_user(
        email=email,
        password=password,
        full_name=full_name,
        is_admin=is_admin,
        auth_provider="local",
    )
    return new_user
|
|
|
|
async def admin_reset_user_password(user_id: str, new_password: str):
    """Reset the password of a local user without checking the old one (admin only).

    Raises:
        ValueError: If the user does not exist, is not a ``"local"`` auth
            user, or *new_password* is shorter than 8 characters.

    Returns:
        ``True`` when the update modified the user document.
    """
    target_user = await get_user_by_id(user_id)
    if not target_user:
        raise ValueError("User not found")
    if target_user.get("auth_provider") != "local":
        raise ValueError("Cannot reset password for SSO users")
    if len(new_password) < 8:
        raise ValueError("Password must be at least 8 characters")

    password_change = {
        "hashed_password": hash_password(new_password),
        "updated_at": datetime.utcnow(),
    }
    outcome = await users_collection.update_one(
        {"_id": ObjectId(user_id)},
        {"$set": password_change},
    )
    return outcome.modified_count > 0
|
|
|
|
async def change_user_password(user_id: str, current_password: str, new_password: str):
    """Change a local user's own password after verifying the current one.

    Raises:
        ValueError: If the user does not exist, is not a ``"local"`` auth
            user, has no stored password, *current_password* does not verify,
            or *new_password* is shorter than 8 characters.

    Returns:
        ``True`` when the update modified the user document.
    """
    account = await get_user_by_id(user_id)
    if not account:
        raise ValueError("User not found")
    if account.get("auth_provider") != "local":
        raise ValueError("Cannot change password for SSO users")
    if not account.get("hashed_password"):
        raise ValueError("User has no password set")

    # The current password is verified before the new one is validated.
    current_matches = verify_password(current_password, account["hashed_password"])
    if not current_matches:
        raise ValueError("Current password is incorrect")
    if len(new_password) < 8:
        raise ValueError("New password must be at least 8 characters")

    password_change = {
        "hashed_password": hash_password(new_password),
        "updated_at": datetime.utcnow(),
    }
    outcome = await users_collection.update_one(
        {"_id": ObjectId(user_id)},
        {"$set": password_change},
    )
    return outcome.modified_count > 0
|
|
|
|
async def get_agent_by_name(agent_name: str):
    """Return one agent whose ``agent_name`` equals *agent_name* exactly.

    The lookup is not scoped to any owner.
    """
    name_filter = {"agent_name": agent_name}
    agent_doc = await agents_collection.find_one(name_filter)
    return agent_doc
|
|
|
|
def _agent_data_differs(existing_agent: dict, new_agent_data: dict) -> bool:
|
|
"""Compare agent data excluding name and metadata fields to detect differences"""
|
|
comparable_fields = [
|
|
"agent_description", "agent_purpose", "agent_version", "agent_status",
|
|
"agent_location", "agent_department", "agent_contact_person",
|
|
"agent_tags", "agent_userbase", "agent_capabilities", "agent_metadata",
|
|
"url", "discipline"
|
|
]
|
|
|
|
for field in comparable_fields:
|
|
existing_value = existing_agent.get(field)
|
|
new_value = new_agent_data.get(field)
|
|
|
|
if existing_value != new_value:
|
|
return True
|
|
|
|
return False
|
|
|
|
async def create_agent_usage_record(agent_name: str, agent_data: dict):
    """Store a usage record for an agent and sync the main record if it changed.

    A usage document (name, payload and timestamps) is inserted into
    ``agent_usage_collection``. If an agent with that name exists and
    ``_agent_data_differs`` reports a difference, every key of *agent_data*
    except ``agent_name`` is written onto the agent, plus a fresh
    ``updated_at``.

    Returns:
        The ``inserted_id`` of the usage document.

    Raises:
        Exception: Wrapping any failure from the steps above.
    """
    timestamp = datetime.utcnow()
    usage_doc = dict(
        agent_name=agent_name,
        agent_data=agent_data,
        timestamp=timestamp,
        created_at=timestamp,
    )

    try:
        insert_outcome = await agent_usage_collection.insert_one(usage_doc)

        stored_agent = await get_agent_by_name(agent_name)
        if stored_agent and _agent_data_differs(stored_agent, agent_data):
            # Never overwrite the name: it is the key used to find the agent.
            changes = dict(agent_data)
            changes.pop("agent_name", None)
            changes["updated_at"] = timestamp
            await agents_collection.update_one(
                {"agent_name": agent_name},
                {"$set": changes},
            )

        return insert_outcome.inserted_id
    except Exception as e:
        raise Exception(f"Failed to store agent usage data: {str(e)}")
|
|
|
|
async def get_agent_usage_stats(agent_name: str, start_date: datetime = None, end_date: datetime = None):
    """Summarise usage records of one agent, optionally within a date range.

    Args:
        agent_name: Matched against the usage record's ``agent_name``.
        start_date: When truthy, only records with ``timestamp`` >= this.
        end_date: When truthy, only records with ``timestamp`` <= this.

    Returns:
        A dict with ``total_usage_count`` and the ``first_usage`` /
        ``last_usage`` timestamps (``None`` when no record matches).
    """
    query = {"agent_name": agent_name}

    time_bounds = {}
    if start_date:
        time_bounds["$gte"] = start_date
    if end_date:
        time_bounds["$lte"] = end_date
    if time_bounds:
        query["timestamp"] = time_bounds

    usage_total = await agent_usage_collection.count_documents(query)

    # Earliest and latest matching record, found via opposite sort orders.
    earliest = await agent_usage_collection.find_one(query, sort=[("timestamp", 1)])
    latest = await agent_usage_collection.find_one(query, sort=[("timestamp", -1)])

    return {
        "total_usage_count": usage_total,
        "first_usage": earliest["timestamp"] if earliest else None,
        "last_usage": latest["timestamp"] if latest else None,
    }
|
|
|
|
async def get_agent_usage_by_period(agent_name: str, period: str = "daily", start_date: datetime = None, end_date: datetime = None):
    """Count an agent's usage records per time bucket.

    Args:
        agent_name: Matched against the usage record's ``agent_name``.
        period: ``"daily"``, ``"weekly"`` or ``"monthly"``; any other value is
            treated like ``"daily"``.
        start_date: When truthy, only records with ``timestamp`` >= this.
        end_date: When truthy, only records with ``timestamp`` <= this.

    Returns:
        A dict mapping the formatted bucket label to its record count.
    """
    query = {"agent_name": agent_name}

    time_bounds = {}
    if start_date:
        time_bounds["$gte"] = start_date
    if end_date:
        time_bounds["$lte"] = end_date
    if time_bounds:
        query["timestamp"] = time_bounds

    # Bucket label format handed to $dateToString; daily is the fallback.
    date_format = "%Y-%m-%d"
    if period == "weekly":
        date_format = "%Y-W%U"
    elif period == "monthly":
        date_format = "%Y-%m"

    bucket_label = {"$dateToString": {"format": date_format, "date": "$timestamp"}}
    pipeline = [
        {"$match": query},
        {"$group": {"_id": bucket_label, "count": {"$sum": 1}}},
        {"$sort": {"_id": 1}},
    ]

    buckets = await agent_usage_collection.aggregate(pipeline).to_list(length=None)

    counts_by_period = {}
    for bucket in buckets:
        counts_by_period[bucket["_id"]] = bucket["count"]
    return counts_by_period
|
|
|
|
async def create_agent_from_collector(agent_data: dict):
    """Create an agent from agent-collector API data (no user ownership).

    Every key in *agent_data* is stored as-is (for example optional
    usage-tracking fields the collector may send). The document is owned by
    the ``"agent_collector_api"`` marker, and ``registration_complete``
    defaults to ``False`` when the payload does not set it.

    NOTE: *agent_data* is modified in place: a falsy ``agent_created_at`` /
    ``agent_updated_at`` is replaced with the current UTC time in ISO format.

    Returns:
        The inserted dict, with ``_id`` taken from the insert result.

    Raises:
        Exception: Wrapping any failure of the insert.
    """
    timestamp = datetime.utcnow()

    # Fill in the collector-side timestamps only when they were not provided.
    for stamp_key in ("agent_created_at", "agent_updated_at"):
        if not agent_data.get(stamp_key):
            agent_data[stamp_key] = timestamp.isoformat()

    agent_doc = dict(agent_data)
    agent_doc.update(
        created_by="agent_collector_api",  # special marker for collector-created agents
        created_at=timestamp,
        updated_at=timestamp,
    )

    # Collector-sourced agents start incomplete: they lack the governance
    # fields that only the registration form / completion flow can populate.
    if "registration_complete" not in agent_doc:
        agent_doc["registration_complete"] = False

    try:
        insert_outcome = await agents_collection.insert_one(agent_doc)
        agent_doc["_id"] = insert_outcome.inserted_id
        return agent_doc
    except Exception as e:
        raise Exception(f"Failed to store agent data: {str(e)}")
|
|
|
|
|
|
# Per-user rating functions
|
|
|
|
async def upsert_agent_rating(agent_id: str, user_id: str, rating: float):
    """Insert or update one user's rating for one agent.

    The ``(agent_id, user_id)`` pair identifies the rating document.
    ``rating`` and ``updated_at`` are always written; ``created_at`` is set
    only when the document is first inserted.

    Returns:
        The result object of the ``update_one`` call.
    """
    timestamp = datetime.utcnow()
    rating_key = {"agent_id": agent_id, "user_id": user_id}
    changes = {
        "$set": {"rating": rating, "updated_at": timestamp},
        "$setOnInsert": {"created_at": timestamp},
    }
    return await agent_ratings_collection.update_one(rating_key, changes, upsert=True)
|
|
|
|
async def get_agent_rating_stats(agent_id: str):
    """Compute the average rating and number of ratings for an agent.

    Returns:
        ``{"avg_rating": <average rounded to 1 decimal>, "count": <n>}``, or
        ``{"avg_rating": None, "count": 0}`` when the agent has no ratings.
    """
    pipeline = [
        {"$match": {"agent_id": agent_id}},
        {"$group": {"_id": None, "avg_rating": {"$avg": "$rating"}, "count": {"$sum": 1}}},
    ]
    rows = await agent_ratings_collection.aggregate(pipeline).to_list(length=1)

    if not rows:
        return {"avg_rating": None, "count": 0}

    totals = rows[0]
    return {
        "avg_rating": round(totals["avg_rating"], 1),
        "count": totals["count"],
    }
|
|
|
|
async def get_user_rating_for_agent(agent_id: str, user_id: str):
    """Return the rating *user_id* gave *agent_id*, or ``None`` if there is none."""
    rating_key = {"agent_id": agent_id, "user_id": user_id}
    rating_doc = await agent_ratings_collection.find_one(rating_key)
    if not rating_doc:
        return None
    return rating_doc["rating"]
|
|
|
|
async def update_agent_average_rating(agent_id: str):
    """Recalculate an agent's rating stats and store them on the agent document.

    Writes ``rating`` (the average), ``rating_count`` and a fresh
    ``updated_at`` onto the agent.

    Returns:
        The stats dict produced by ``get_agent_rating_stats``.
    """
    stats = await get_agent_rating_stats(agent_id)

    rating_fields = dict(
        rating=stats["avg_rating"],
        rating_count=stats["count"],
        updated_at=datetime.utcnow(),
    )
    await agents_collection.update_one(
        {"_id": ObjectId(agent_id)},
        {"$set": rating_fields},
    )
    return stats
|
|
|
|
async def get_incomplete_agents_for_user(user_email: str, user_id: str, is_admin: bool = False):
    """Return agents that still need the registration form completed, newest first.

    An agent is incomplete when ``registration_complete`` is anything other
    than ``True`` (including missing).

    Ownership for non-admins is either:
    - direct (``created_by == user_id``), or
    - collector-synced (``created_by == "agent_collector_api"`` and
      ``agent_contact_person`` equal to *user_email*, compared
      case-insensitively).

    Admins see all incomplete agents.

    Args:
        user_email: The user's e-mail address. When falsy, only direct
            ownership is considered.
        user_id: Matched against ``created_by``.
        is_admin: When true, the ownership restriction is skipped.
    """
    incomplete_filter = {"registration_complete": {"$ne": True}}

    if is_admin:
        query = incomplete_filter
    else:
        ownership_clauses = [{"created_by": user_id}]
        # Add the e-mail clause only for a real address: matching on a
        # missing e-mail would otherwise select collector agents that have no
        # contact person at all.
        if user_email:
            # Escaped, anchored, case-insensitive match - the same comparison
            # get_agents_by_user uses, so an address differing only in case
            # resolves to the same owner in both views.
            contact_pattern = f"^{re.escape(user_email)}$"
            ownership_clauses.append({
                "created_by": "agent_collector_api",
                "agent_contact_person": {"$regex": contact_pattern, "$options": "i"},
            })
        query = {"$and": [incomplete_filter, {"$or": ownership_clauses}]}

    cursor = agents_collection.find(query).sort("created_at", -1)
    return await cursor.to_list(length=None)
|
|
|
|
|
|
async def get_unresolved_owner_agents():
    """Return collector-created agents whose contact person is not an active user.

    An agent is unresolved when its ``agent_contact_person`` is empty or,
    after trimming and lower-casing, does not equal the e-mail of any user
    with ``is_active`` set. Both complete and incomplete agents are included.

    Returns:
        A list of agent documents limited to ``_id``, ``agent_name``,
        ``agent_contact_person``, ``registration_complete`` and
        ``created_at``; empty when there are no collector-created agents.
    """
    projection = {
        "agent_name": 1,
        "agent_contact_person": 1,
        "registration_complete": 1,
        "created_at": 1,
        "_id": 1,
    }
    candidates = await agents_collection.find(
        {"created_by": "agent_collector_api"},
        projection,
    ).to_list(length=None)

    if not candidates:
        return []

    # Normalised (trimmed, lower-cased) e-mails of all active users.
    known_user_emails = set()
    user_cursor = users_collection.find(
        {"is_active": True, "email": {"$ne": None}},
        {"email": 1},
    )
    async for user in user_cursor:
        normalised = (user.get("email") or "").strip().lower()
        if normalised:
            known_user_emails.add(normalised)

    unresolved = []
    for agent in candidates:
        contact = (agent.get("agent_contact_person") or "").strip().lower()
        # An empty contact can never be in the set, so it is always unresolved.
        if not contact or contact not in known_user_emails:
            unresolved.append(agent)
    return unresolved
|
|
|
|
|
|
async def reassign_agent_owner(agent_id: str, new_owner_user_id: str, new_contact_email: str):
    """Admin tool: point an agent at a new owner and/or contact e-mail.

    ``created_by`` is set to *new_owner_user_id* and ``agent_contact_person``
    to *new_contact_email*, each only when the value is truthy; ``updated_at``
    is always refreshed.

    Returns:
        The agent document re-read after the update.
    """
    changes = {"updated_at": datetime.utcnow()}
    requested = (
        ("created_by", new_owner_user_id),
        ("agent_contact_person", new_contact_email),
    )
    for field_name, new_value in requested:
        if new_value:
            changes[field_name] = new_value

    await agents_collection.update_one({"_id": ObjectId(agent_id)}, {"$set": changes})
    return await agents_collection.find_one({"_id": ObjectId(agent_id)})
|
|
|
|
|
|
async def complete_agent_registration(
    agent_id: str,
    completion_data: dict,
    completing_user_id: str,
    completing_user_email: str,
):
    """Apply a submitted registration-completion form to an agent.

    - Every key of *completion_data* is written onto the agent document.
    - ``registration_complete`` is set to ``True``, ``last_edited_by`` to the
      submitting user's e-mail, and ``updated_at`` is refreshed (these win
      over same-named keys in *completion_data*).
    - If the agent is still owned by the ``"agent_collector_api"`` marker,
      ``created_by`` is reassigned to *completing_user_id*.

    Raises:
        ValueError: If no agent with *agent_id* exists.

    Returns:
        The agent document re-read after the update.
    """
    current = await agents_collection.find_one({"_id": ObjectId(agent_id)})
    if not current:
        raise ValueError(f"Agent {agent_id} not found")

    changes = dict(completion_data)
    changes.update(
        registration_complete=True,
        last_edited_by=completing_user_email,
        updated_at=datetime.utcnow(),
    )

    # Hand ownership from the collector marker to whoever finished the form.
    collector_owned = current.get("created_by") == "agent_collector_api"
    if collector_owned:
        changes["created_by"] = completing_user_id

    await agents_collection.update_one({"_id": ObjectId(agent_id)}, {"$set": changes})
    return await agents_collection.find_one({"_id": ObjectId(agent_id)})
|
|
|
|
|
|
async def migrate_pencil_agents_discipline():
    """Assign the 'Pencil Agents' discipline to unclassified "pencil" agents.

    Targets agents whose ``agent_name`` contains "pencil" (case-insensitive)
    and whose ``discipline`` is null, empty or missing.

    Returns:
        The number of documents modified.
    """
    name_has_pencil = {"$regex": "pencil", "$options": "i"}
    discipline_unset = [
        {"discipline": None},
        {"discipline": ""},
        {"discipline": {"$exists": False}},
    ]
    selector = {"agent_name": name_has_pencil, "$or": discipline_unset}

    outcome = await agents_collection.update_many(
        selector,
        {"$set": {"discipline": "Pencil Agents"}},
    )
    return outcome.modified_count
|
|
|
|
|
|
async def migrate_optimisation_spelling():
    """Rename the discipline 'Optimization' (US) to 'Optimisation' (UK).

    Returns:
        The number of documents modified.
    """
    old_spelling = {"discipline": "Optimization"}
    new_spelling = {"$set": {"discipline": "Optimisation"}}
    outcome = await agents_collection.update_many(old_spelling, new_spelling)
    return outcome.modified_count
|
|
|
|
|
|
async def backfill_registration_complete():
    """Backfill ``registration_complete`` on agents that do not have the field yet.

    Agents not created by the ``"agent_collector_api"`` marker are marked
    complete; collector-created agents are marked incomplete. Documents that
    already carry the field are left untouched.

    Returns:
        ``{"form_complete": <n>, "collector_incomplete": <m>}`` with the
        number of documents modified by each update.
    """
    field_missing = {"registration_complete": {"$exists": False}}

    form_outcome = await agents_collection.update_many(
        {**field_missing, "created_by": {"$ne": "agent_collector_api"}},
        {"$set": {"registration_complete": True}},
    )
    collector_outcome = await agents_collection.update_many(
        {**field_missing, "created_by": "agent_collector_api"},
        {"$set": {"registration_complete": False}},
    )

    return dict(
        form_complete=form_outcome.modified_count,
        collector_incomplete=collector_outcome.modified_count,
    )
|