"""Async CRUD helpers for users, agents, and agent usage records.

Every database-facing function awaits calls on the collections imported from
``database`` and returns the stored documents (or ``None`` / ``False`` when
the operation could not be completed).
"""

import re
from datetime import datetime, timedelta
from typing import Optional

from bson import ObjectId

import database
from database import users_collection, agents_collection, agent_usage_collection
import auth
from auth import hash_password, verify_password

# Minimum accepted length for local-account passwords.
_MIN_PASSWORD_LENGTH = 8

# Window in which a same-named agent from the same user counts as a duplicate.
_DUPLICATE_AGENT_WINDOW = timedelta(minutes=5)

# Fields compared by ``_agent_data_differs`` (name and metadata are excluded).
_COMPARABLE_AGENT_FIELDS = (
    "agent_description",
    "agent_purpose",
    "agent_version",
    "agent_status",
    "agent_location",
    "agent_department",
    "agent_contact_person",
    "agent_tags",
    "agent_userbase",
    "agent_capabilities",
    "agent_metadata",
    "url",
)

# Quality-audit fields given a default on every newly created agent.
_AGENT_AUDIT_DEFAULTS = {
    "quality_audit_status": False,
    "quality_audit_updated_by": None,
    "quality_audit_updated_at": None,
    "quality_audit_updated_by_name": None,
    "risk_factor": None,
}

# $dateToString formats keyed by reporting period.
_PERIOD_DATE_FORMATS = {
    "daily": "%Y-%m-%d",
    "weekly": "%Y-W%U",
    "monthly": "%Y-%m",
}


def _require_min_password_length(password: Optional[str], message: str) -> None:
    """Raise ``ValueError(message)`` when *password* is missing or too short."""
    if not password or len(password) < _MIN_PASSWORD_LENGTH:
        raise ValueError(message)


def _build_usage_query(agent_name: str,
                       start_date: Optional[datetime] = None,
                       end_date: Optional[datetime] = None) -> dict:
    """Build the usage-collection filter for an agent and optional date range."""
    query = {"agent_name": agent_name}
    date_filter = {}
    if start_date:
        date_filter["$gte"] = start_date
    if end_date:
        date_filter["$lte"] = end_date
    if date_filter:
        query["timestamp"] = date_filter
    return query


# User lookup / creation

async def get_user_by_email(email: str):
    """Return the user document whose ``email`` equals *email*, or ``None``."""
    return await users_collection.find_one({"email": email})


async def get_user_by_azure_ad_id(azure_ad_id: str):
    """Get user by Azure AD Object ID"""
    return await users_collection.find_one({"azure_ad_id": azure_ad_id})


async def create_user(email: str, password: Optional[str] = None,
                      full_name: Optional[str] = None, is_admin: bool = False,
                      auth_provider: str = "local"):
    """Insert a new active user and return the inserted document.

    ``hashed_password`` is stored only when *auth_provider* is ``"local"``
    and a non-empty *password* is given.
    """
    now = datetime.utcnow()
    user_doc = {
        "email": email,
        "full_name": full_name,
        "is_active": True,
        "is_admin": is_admin,
        "auth_provider": auth_provider,
        "created_at": now,
        "updated_at": now,
    }

    # Only add hashed_password for local auth users
    if auth_provider == "local" and password:
        user_doc["hashed_password"] = hash_password(password)

    result = await users_collection.insert_one(user_doc)
    user_doc["_id"] = result.inserted_id
    return user_doc


async def create_or_update_azure_user(azure_profile: dict):
    """Create or update a user from an Azure AD profile.

    The profile must carry ``azure_ad_id`` and ``email``; ``full_name`` is
    optional. An existing user is looked up by Azure AD ID first, then by
    email, and is switched to the ``azure_ad`` auth provider.

    Returns:
        The user document after the insert or update.

    Raises:
        ValueError: If ``azure_ad_id`` or ``email`` is missing from the profile.
    """
    azure_ad_id = azure_profile.get("azure_ad_id")
    email = azure_profile.get("email")

    if not azure_ad_id or not email:
        raise ValueError("Azure AD profile missing required fields")

    now = datetime.utcnow()

    # Check if user exists by Azure AD ID first, then by email
    existing_user = await get_user_by_azure_ad_id(azure_ad_id)
    if not existing_user:
        existing_user = await get_user_by_email(email)

    if existing_user:
        # Keep the stored name when the profile does not provide one.
        update_data = {
            "azure_ad_id": azure_ad_id,
            "email": email,
            "full_name": azure_profile.get("full_name") or existing_user.get("full_name"),
            "auth_provider": "azure_ad",
            "updated_at": now,
        }
        await users_collection.update_one(
            {"_id": existing_user["_id"]},
            {"$set": update_data}
        )
        # Return updated user
        return await users_collection.find_one({"_id": existing_user["_id"]})

    # Create new user from Azure AD profile
    user_doc = {
        "azure_ad_id": azure_ad_id,
        "email": email,
        "full_name": azure_profile.get("full_name"),
        "is_active": True,
        "is_admin": False,  # New users are not admin by default
        "auth_provider": "azure_ad",
        "created_at": now,
        "updated_at": now,
    }
    result = await users_collection.insert_one(user_doc)
    user_doc["_id"] = result.inserted_id
    return user_doc


async def authenticate_user(email: str, password: str):
    """Authenticate user with local credentials (fallback method).

    Returns the user document on success, or ``None`` when the user does not
    exist, is not a ``local`` account, has no stored hash, or the password
    does not verify.
    """
    print(f"🔐 CRUD: Authenticating user {email}")
    user = await get_user_by_email(email)
    if not user:
        print(f"🔐 CRUD: User {email} not found in database")
        return None

    print(f"🔐 CRUD: User found - auth_provider: {user.get('auth_provider')}")
    print(f"🔐 CRUD: Has hashed_password: {'hashed_password' in user}")

    # Only authenticate local users with password
    if user.get("auth_provider") != "local":
        print(f"🔐 CRUD: User is not local auth provider: {user.get('auth_provider')}")
        return None

    if not user.get("hashed_password"):
        print("🔐 CRUD: User has no hashed_password field")
        return None

    print("🔐 CRUD: Verifying password...")
    password_valid = verify_password(password, user["hashed_password"])
    print(f"🔐 CRUD: Password verification result: {password_valid}")

    if not password_valid:
        return None

    print("🔐 CRUD: Authentication successful!")
    return user


# Agent CRUD operations

async def create_agent(agent_data: dict, user_id: str, skip_time_check: bool = False):
    """Insert a new agent owned by *user_id* and return the inserted document.

    Unless *skip_time_check* is true, creation is rejected when the same user
    created an agent with the same ``agent_name`` within the last 5 minutes.
    Quality-audit fields and ``risk_factor`` are defaulted when absent.

    Raises:
        ValueError: If a same-named agent was created recently by this user.
    """
    if not skip_time_check:
        agent_name = agent_data.get("agent_name")
        recent_cutoff = datetime.utcnow() - _DUPLICATE_AGENT_WINDOW
        existing_agent = await agents_collection.find_one({
            "agent_name": agent_name,
            "created_by": user_id,
            "created_at": {"$gte": recent_cutoff}
        })
        if existing_agent:
            raise ValueError(f"Agent with name '{agent_name}' was already created recently. Please wait before creating another agent with the same name.")

    now = datetime.utcnow()
    agent_doc = {
        **agent_data,
        "created_by": user_id,
        "created_at": now,
        "updated_at": now,
    }

    # Initialize quality audit fields if not provided
    for field, default in _AGENT_AUDIT_DEFAULTS.items():
        agent_doc.setdefault(field, default)

    result = await agents_collection.insert_one(agent_doc)
    agent_doc["_id"] = result.inserted_id
    return agent_doc


async def get_agent_by_id(agent_id: str):
    """Return the agent with the given id, or ``None`` if the lookup fails."""
    try:
        return await agents_collection.find_one({"_id": ObjectId(agent_id)})
    except Exception:
        # Best-effort lookup: an invalid id or a failed query yields None.
        # ``Exception`` (not a bare except) lets task cancellation propagate.
        return None


async def get_agents_by_user(user_id: str, status_filter: Optional[str] = None,
                             limit: Optional[int] = None,
                             user_email: Optional[str] = None):
    """Return agents created by *user_id*, newest first.

    When *user_email* is given, agents whose ``agent_contact_person`` matches
    that email (case-insensitive, exact) are included as well. Results can be
    narrowed by ``agent_status`` and capped with *limit*.
    """
    # Build query to include both owned agents and contact person agents
    query_conditions = [{"created_by": user_id}]

    if user_email:
        # Escape special regex characters in email and do case-insensitive exact match
        escaped_email = re.escape(user_email)
        query_conditions.append(
            {"agent_contact_person": {"$regex": f"^{escaped_email}$", "$options": "i"}}
        )

    query = {"$or": query_conditions}
    if status_filter:
        query["agent_status"] = status_filter

    cursor = agents_collection.find(query).sort("created_at", -1)
    if limit:
        cursor = cursor.limit(limit)

    return await cursor.to_list(length=None)


async def get_all_agents(status_filter: Optional[str] = None, limit: Optional[int] = None):
    """Return all agents, newest first, optionally filtered by status and capped."""
    query = {}
    if status_filter:
        query["agent_status"] = status_filter

    cursor = agents_collection.find(query).sort("created_at", -1)
    if limit:
        cursor = cursor.limit(limit)

    return await cursor.to_list(length=None)


async def search_agents(search_term: str, user_id: Optional[str] = None):
    """Search agents by name, description, tags, or department.

    The match is a case-insensitive substring match. *search_term* is escaped
    before being used as a pattern, so regex metacharacters in user input
    (e.g. ``c++``) are matched literally instead of producing an invalid or
    expensive pattern. When *user_id* is given, only that user's agents are
    searched. Results are ordered newest first.
    """
    pattern = re.escape(search_term)
    searchable_fields = ("agent_name", "agent_description", "agent_tags", "agent_department")
    query = {
        "$or": [
            {field: {"$regex": pattern, "$options": "i"}}
            for field in searchable_fields
        ]
    }

    if user_id:
        query["created_by"] = user_id

    return await agents_collection.find(query).sort("created_at", -1).to_list(length=None)


async def get_agent_stats():
    """Get agent statistics as a mapping of ``agent_status`` to agent count."""
    pipeline = [
        {
            "$group": {
                "_id": "$agent_status",
                "count": {"$sum": 1}
            }
        }
    ]

    stats = await agents_collection.aggregate(pipeline).to_list(length=None)
    return {stat["_id"]: stat["count"] for stat in stats}


async def update_agent(agent_id: str, update_data: dict, user_id: Optional[str] = None,
                       admin_user_info: Optional[dict] = None,
                       last_edited_by: Optional[str] = None):
    """Apply *update_data* to an agent and return the updated document.

    When *user_id* is given the update only matches agents created by that
    user. If ``quality_audit_status`` is part of the update and
    *admin_user_info* is supplied, the audit bookkeeping fields are stamped
    when the status actually changes. ``updated_at`` is always refreshed.

    The caller's *update_data* dict is not modified.

    Returns:
        The updated agent document, or ``None`` when nothing was modified or
        an error occurred.
    """
    try:
        filter_query = {"_id": ObjectId(agent_id)}
        if user_id:
            filter_query["created_by"] = user_id

        # Work on a copy so the caller's dict is left untouched.
        changes = dict(update_data)

        # Handle Quality Audit updates specially
        if "quality_audit_status" in changes and admin_user_info:
            # Get current agent to check if quality audit status is changing
            current_agent = await get_agent_by_id(agent_id)
            if current_agent and current_agent.get("quality_audit_status") != changes["quality_audit_status"]:
                # Quality audit status is changing, update audit fields with separate timestamp
                changes["quality_audit_updated_by"] = admin_user_info.get("user_id")
                changes["quality_audit_updated_at"] = datetime.utcnow().isoformat()
                changes["quality_audit_updated_by_name"] = admin_user_info.get(
                    "user_name", admin_user_info.get("email")
                )

                # If Quality Audit is being unchecked, clear Risk Factor.
                # NOTE(review): nested under the status-change branch; confirm
                # this matches the intended nesting of the original code.
                if changes["quality_audit_status"] is False:
                    changes["risk_factor"] = None

        changes["updated_at"] = datetime.utcnow()
        if last_edited_by:
            changes["last_edited_by"] = last_edited_by

        result = await agents_collection.update_one(
            filter_query,
            {"$set": changes}
        )

        if result.modified_count:
            return await get_agent_by_id(agent_id)
        return None
    except Exception:
        # Best-effort update; ``Exception`` lets task cancellation propagate.
        return None


async def update_agent_quality_audit(agent_id: str, quality_audit_status: bool,
                                     admin_user_info: dict):
    """Update only the quality audit status of an agent (admin only).

    Returns:
        The current agent unchanged when the status already matches, the
        updated agent when it was modified, or ``None`` when the agent does
        not exist, nothing was modified, or an error occurred.
    """
    try:
        # Get current agent to ensure it exists
        current_agent = await get_agent_by_id(agent_id)
        if not current_agent:
            return None

        # Only update if status is actually changing
        if current_agent.get("quality_audit_status") == quality_audit_status:
            return current_agent

        update_data = {
            "quality_audit_status": quality_audit_status,
            "quality_audit_updated_by": admin_user_info.get("user_id"),
            "quality_audit_updated_at": datetime.utcnow().isoformat(),
            "quality_audit_updated_by_name": admin_user_info.get(
                "user_name", admin_user_info.get("email")
            ),
        }

        result = await agents_collection.update_one(
            {"_id": ObjectId(agent_id)},
            {"$set": update_data}
        )

        if result.modified_count:
            return await get_agent_by_id(agent_id)
        return None
    except Exception:
        # Best-effort update; ``Exception`` lets task cancellation propagate.
        return None


async def delete_agent(agent_id: str, user_id: Optional[str] = None):
    """Delete an agent, optionally restricted to one created by *user_id*.

    Returns ``True`` when a document was deleted, ``False`` otherwise
    (including on error).
    """
    try:
        print(f"🗑️ CRUD: Deleting agent {agent_id} with user_id filter: {user_id}")
        filter_query = {"_id": ObjectId(agent_id)}
        if user_id:
            filter_query["created_by"] = user_id

        print(f"🗑️ CRUD: Filter query: {filter_query}")
        result = await agents_collection.delete_one(filter_query)
        print(f"🗑️ CRUD: Delete result - deleted_count: {result.deleted_count}")
        return result.deleted_count > 0
    except Exception as e:
        print(f"🗑️ CRUD: Exception during delete: {e}")
        return False


# User management

async def get_user_by_id(user_id: str):
    """Return the user with the given id, or ``None`` if the lookup fails."""
    try:
        return await users_collection.find_one({"_id": ObjectId(user_id)})
    except Exception:
        # Best-effort lookup; ``Exception`` lets task cancellation propagate.
        return None


async def get_all_users():
    """Return every user document."""
    return await users_collection.find({}).to_list(length=None)


async def update_user(user_id: str, update_data: dict):
    """Apply *update_data* to a user, refreshing ``updated_at``.

    The caller's *update_data* dict is not modified.

    Returns:
        The updated user document, or ``None`` when nothing was modified or
        an error occurred.
    """
    try:
        changes = dict(update_data)
        changes["updated_at"] = datetime.utcnow()
        result = await users_collection.update_one(
            {"_id": ObjectId(user_id)},
            {"$set": changes}
        )
        if result.modified_count:
            return await get_user_by_id(user_id)
        return None
    except Exception:
        # Best-effort update; ``Exception`` lets task cancellation propagate.
        return None


async def delete_user(user_id: str):
    """Delete a user; return ``True`` when a document was deleted."""
    try:
        result = await users_collection.delete_one({"_id": ObjectId(user_id)})
        return result.deleted_count > 0
    except Exception:
        # Best-effort delete; ``Exception`` lets task cancellation propagate.
        return False


async def admin_create_local_user(email: str, password: str,
                                  full_name: Optional[str] = None,
                                  is_admin: bool = False):
    """Create a new local user (admin only).

    Raises:
        ValueError: If the email is already registered or the password is
            missing or shorter than 8 characters.
    """
    # Check if user already exists
    existing = await get_user_by_email(email)
    if existing:
        raise ValueError("User with this email already exists")

    # Validate password length
    _require_min_password_length(password, "Password must be at least 8 characters")

    return await create_user(
        email=email,
        password=password,
        full_name=full_name,
        is_admin=is_admin,
        auth_provider="local"
    )


async def admin_reset_user_password(user_id: str, new_password: str):
    """Reset password for a local user (admin only).

    Returns ``True`` when the stored document was modified.

    Raises:
        ValueError: If the user does not exist, is not a local account, or
            the new password is missing or shorter than 8 characters.
    """
    user = await get_user_by_id(user_id)
    if not user:
        raise ValueError("User not found")

    if user.get("auth_provider") != "local":
        raise ValueError("Cannot reset password for SSO users")

    _require_min_password_length(new_password, "Password must be at least 8 characters")

    hashed = hash_password(new_password)
    result = await users_collection.update_one(
        {"_id": ObjectId(user_id)},
        {"$set": {"hashed_password": hashed, "updated_at": datetime.utcnow()}}
    )
    return result.modified_count > 0


async def change_user_password(user_id: str, current_password: str, new_password: str):
    """Change password for current user (requires current password verification).

    Returns ``True`` when the stored document was modified.

    Raises:
        ValueError: If the user does not exist, is not a local account, has
            no password set, the current password does not verify, or the new
            password is missing or shorter than 8 characters.
    """
    user = await get_user_by_id(user_id)
    if not user:
        raise ValueError("User not found")

    if user.get("auth_provider") != "local":
        raise ValueError("Cannot change password for SSO users")

    if not user.get("hashed_password"):
        raise ValueError("User has no password set")

    # Verify current password
    if not verify_password(current_password, user["hashed_password"]):
        raise ValueError("Current password is incorrect")

    _require_min_password_length(new_password, "New password must be at least 8 characters")

    hashed = hash_password(new_password)
    result = await users_collection.update_one(
        {"_id": ObjectId(user_id)},
        {"$set": {"hashed_password": hashed, "updated_at": datetime.utcnow()}}
    )
    return result.modified_count > 0


# Agent usage tracking

async def get_agent_by_name(agent_name: str):
    """Get agent by exact name match globally"""
    return await agents_collection.find_one({"agent_name": agent_name})


def _agent_data_differs(existing_agent: dict, new_agent_data: dict) -> bool:
    """Compare agent data excluding name and metadata fields to detect differences.

    A field missing from either side is treated as ``None``.
    """
    return any(
        existing_agent.get(field) != new_agent_data.get(field)
        for field in _COMPARABLE_AGENT_FIELDS
    )


async def create_agent_usage_record(agent_name: str, agent_data: dict):
    """Create usage record for existing agent and update main record if data differs.

    Returns:
        The id of the inserted usage record.

    Raises:
        Exception: If storing the usage record or updating the agent fails;
            the underlying error is chained as the cause.
    """
    now = datetime.utcnow()
    usage_doc = {
        "agent_name": agent_name,
        "agent_data": agent_data,
        "timestamp": now,
        "created_at": now
    }

    try:
        result = await agent_usage_collection.insert_one(usage_doc)

        existing_agent = await get_agent_by_name(agent_name)
        if existing_agent and _agent_data_differs(existing_agent, agent_data):
            # Sync every reported field except the name onto the main record.
            update_data = {k: v for k, v in agent_data.items() if k != "agent_name"}
            update_data["updated_at"] = now
            await agents_collection.update_one(
                {"agent_name": agent_name},
                {"$set": update_data}
            )

        return result.inserted_id
    except Exception as e:
        raise Exception(f"Failed to store agent usage data: {str(e)}") from e


async def get_agent_usage_stats(agent_name: str, start_date: Optional[datetime] = None,
                                end_date: Optional[datetime] = None):
    """Get usage statistics for an agent within date range.

    Returns:
        A dict with ``total_usage_count``, ``first_usage`` and ``last_usage``
        (the latter two are ``None`` when no usage record matches).
    """
    query = _build_usage_query(agent_name, start_date, end_date)

    # Get total count
    total_count = await agent_usage_collection.count_documents(query)

    # Get first and last usage
    first_usage = await agent_usage_collection.find_one(query, sort=[("timestamp", 1)])
    last_usage = await agent_usage_collection.find_one(query, sort=[("timestamp", -1)])

    return {
        "total_usage_count": total_count,
        "first_usage": first_usage["timestamp"] if first_usage else None,
        "last_usage": last_usage["timestamp"] if last_usage else None
    }


async def get_agent_usage_by_period(agent_name: str, period: str = "daily",
                                    start_date: Optional[datetime] = None,
                                    end_date: Optional[datetime] = None):
    """Get usage data grouped by time period (daily, weekly, monthly).

    An unrecognised *period* falls back to daily grouping.

    Returns:
        A dict mapping each formatted period label to its usage count.
    """
    query = _build_usage_query(agent_name, start_date, end_date)

    # Define grouping format based on period
    date_format = _PERIOD_DATE_FORMATS.get(period, _PERIOD_DATE_FORMATS["daily"])

    pipeline = [
        {"$match": query},
        {
            "$group": {
                "_id": {"$dateToString": {"format": date_format, "date": "$timestamp"}},
                "count": {"$sum": 1}
            }
        },
        {"$sort": {"_id": 1}}
    ]

    results = await agent_usage_collection.aggregate(pipeline).to_list(length=None)
    return {result["_id"]: result["count"] for result in results}


async def create_agent_from_collector(agent_data: dict):
    """Create agent from agent collector API data (no user ownership).

    All fields of *agent_data* are stored as-is, including optional usage
    tracking fields:
    - usage_timeline: Array of {date, message_count} objects
    - conversation_count, unique_users, total_messages: Integer metrics
    - first_used, last_used: ISO datetime strings

    ``agent_created_at`` / ``agent_updated_at`` are filled with the current
    time (ISO string) when missing or empty. The caller's *agent_data* dict
    is not modified.

    Raises:
        Exception: If the insert fails; the underlying error is chained.
    """
    now = datetime.utcnow()
    payload = dict(agent_data)

    # Set automatic timestamps if not provided
    if not payload.get("agent_created_at"):
        payload["agent_created_at"] = now.isoformat()
    if not payload.get("agent_updated_at"):
        payload["agent_updated_at"] = now.isoformat()

    agent_doc = {
        **payload,  # Includes all fields including usage tracking fields if present
        "created_by": "agent_collector_api",  # Special marker for agent collector created agents
        "created_at": now,
        "updated_at": now,
    }

    try:
        result = await agents_collection.insert_one(agent_doc)
        agent_doc["_id"] = result.inserted_id
        return agent_doc
    except Exception as e:
        raise Exception(f"Failed to store agent data: {str(e)}") from e