import logging from bson import ObjectId from app.db import get_db from datetime import datetime, timezone logger = logging.getLogger(__name__) class Folder: @staticmethod async def create(folder_data, user_id): """Create a new folder with hierarchical support.""" db = await get_db() # Add metadata folder_data["created_at"] = datetime.now(timezone.utc) folder_data["created_by"] = user_id # Handle hierarchy parent_folder_id = folder_data.get("parent_folder_id") if parent_folder_id: # Validate parent folder exists parent_folder = await db.folders.find_one({"_id": ObjectId(parent_folder_id)}) if not parent_folder: raise ValueError("Parent folder not found") # Validate hierarchy depth (max 2 levels: parent -> child) parent_level = parent_folder.get("level", 0) if parent_level >= 1: raise ValueError("Maximum folder depth exceeded (max 2 levels)") folder_data["level"] = parent_level + 1 else: folder_data["parent_folder_id"] = None folder_data["level"] = 0 # Note: No longer storing persona_ids in folders - using persona-centric storage result = await db.folders.insert_one(folder_data) return str(result.inserted_id) @staticmethod async def find_by_id(folder_id): """Find a folder by its ID.""" db = await get_db() try: folder = await db.folders.find_one({"_id": ObjectId(folder_id)}) if folder: folder["_id"] = str(folder["_id"]) return folder except Exception as e: logger.error(f"Error in find_by_id: {e}") return None @staticmethod async def find_by_user(user_id, limit=100): """Find all folders created by a specific user.""" db = await get_db() cursor = db.folders.find({"created_by": user_id}).sort("created_at", -1).limit(limit) folders = await cursor.to_list(length=limit) result = [] for folder in folders: folder["_id"] = str(folder["_id"]) result.append(folder) return result @staticmethod async def get_all(limit=100): """Get all folders (for debugging/admin purposes).""" try: db = await get_db() cursor = db.folders.find().sort("created_at", -1).limit(limit) folders = await cursor.to_list(length=limit) result = [] for folder in folders: folder["_id"] = str(folder["_id"]) result.append(folder) return result except Exception as e: logger.error(f"Error in Folder.get_all: {e}") return [] @staticmethod async def update(folder_id, data): """Update a folder.""" db = await get_db() # Create a copy of the data to avoid modifying the original filtered_data = data.copy() # Remove fields that shouldn't be updated if '_id' in filtered_data: del filtered_data['_id'] if 'id' in filtered_data: del filtered_data['id'] if 'created_at' in filtered_data: del filtered_data['created_at'] if 'created_by' in filtered_data: del filtered_data['created_by'] # Set the updated timestamp filtered_data["updated_at"] = datetime.now(timezone.utc) result = await db.folders.update_one( {"_id": ObjectId(folder_id)}, {"$set": filtered_data} ) return result.modified_count > 0 @staticmethod async def delete(folder_id): """Delete a folder.""" db = await get_db() try: result = await db.folders.delete_one({"_id": ObjectId(folder_id)}) return result.deleted_count > 0 except Exception as e: logger.error(f"Error in delete: {e}") return False @staticmethod async def add_persona(folder_id, persona_id): """Add a persona to a folder (persona-centric storage).""" db = await get_db() try: logger.debug(f"FOLDER ADD_PERSONA: folder_id={folder_id}, persona_id={persona_id}") # Check if persona exists persona = await db.personas.find_one({"_id": ObjectId(persona_id)}) if not persona: logger.warning(f"FOLDER ADD_PERSONA: Persona {persona_id} not found") return False logger.debug(f"FOLDER ADD_PERSONA: Found persona {persona.get('name', 'Unknown')} ({persona_id})") logger.debug(f"FOLDER ADD_PERSONA: Current folder_ids: {persona.get('folder_ids', 'None')}") # Only update the persona's folder_ids - single source of truth persona_result = await db.personas.update_one( {"_id": ObjectId(persona_id)}, {"$addToSet": {"folder_ids": folder_id}, "$set": {"updated_at": datetime.now(timezone.utc)}} ) logger.debug(f"FOLDER ADD_PERSONA: Update result - modified_count: {persona_result.modified_count}, matched_count: {persona_result.matched_count}") # Verify the update updated_persona = await db.personas.find_one({"_id": ObjectId(persona_id)}) logger.debug(f"FOLDER ADD_PERSONA: Updated folder_ids: {updated_persona.get('folder_ids', 'None')}") # Update folder's updated_at timestamp await db.folders.update_one( {"_id": ObjectId(folder_id)}, {"$set": {"updated_at": datetime.now(timezone.utc)}} ) return persona_result.modified_count > 0 except Exception as e: logger.error(f"FOLDER ADD_PERSONA ERROR: {e}") import traceback logger.error(f"FOLDER ADD_PERSONA TRACEBACK: {traceback.format_exc()}") return False @staticmethod async def remove_persona(folder_id, persona_id): """Remove a persona from a folder (persona-centric storage).""" db = await get_db() try: logger.debug(f"FOLDER REMOVE_PERSONA: folder_id={folder_id}, persona_id={persona_id}") # Check if persona exists persona = await db.personas.find_one({"_id": ObjectId(persona_id)}) if not persona: logger.warning(f"FOLDER REMOVE_PERSONA: Persona {persona_id} not found") return False logger.debug(f"FOLDER REMOVE_PERSONA: Found persona {persona.get('name', 'Unknown')} ({persona_id})") logger.debug(f"FOLDER REMOVE_PERSONA: Current folder_ids: {persona.get('folder_ids', 'None')}") # Only update the persona's folder_ids - single source of truth persona_result = await db.personas.update_one( {"_id": ObjectId(persona_id)}, {"$pull": {"folder_ids": folder_id}, "$set": {"updated_at": datetime.now(timezone.utc)}} ) logger.debug(f"FOLDER REMOVE_PERSONA: Update result - modified_count: {persona_result.modified_count}, matched_count: {persona_result.matched_count}") # Verify the update updated_persona = await db.personas.find_one({"_id": ObjectId(persona_id)}) logger.debug(f"FOLDER REMOVE_PERSONA: Updated folder_ids: {updated_persona.get('folder_ids', 'None')}") # Update folder's updated_at timestamp await db.folders.update_one( {"_id": ObjectId(folder_id)}, {"$set": {"updated_at": datetime.now(timezone.utc)}} ) return persona_result.modified_count > 0 except Exception as e: logger.error(f"FOLDER REMOVE_PERSONA ERROR: {e}") import traceback logger.error(f"FOLDER REMOVE_PERSONA TRACEBACK: {traceback.format_exc()}") return False @staticmethod async def add_personas_batch(folder_id, persona_ids): """Add multiple personas to a folder (persona-centric storage).""" db = await get_db() try: logger.debug(f"FOLDER ADD_PERSONAS_BATCH: folder_id={folder_id}, persona_ids={persona_ids}") # Add folder to each persona's folder_ids - single source of truth persona_results = [] for persona_id in persona_ids: try: logger.debug(f"FOLDER BATCH: Processing persona {persona_id}") # Check if persona exists persona = await db.personas.find_one({"_id": ObjectId(persona_id)}) if not persona: logger.warning(f"FOLDER BATCH: Persona {persona_id} not found") persona_results.append(False) continue logger.debug(f"FOLDER BATCH: Found persona {persona.get('name', 'Unknown')}") logger.debug(f"FOLDER BATCH: Current folder_ids: {persona.get('folder_ids', 'None')}") result = await db.personas.update_one( {"_id": ObjectId(persona_id)}, {"$addToSet": {"folder_ids": folder_id}, "$set": {"updated_at": datetime.now(timezone.utc)}} ) logger.debug(f"FOLDER BATCH: Update result for {persona_id} - modified: {result.modified_count}") persona_results.append(result.modified_count > 0) except Exception as e: logger.error(f"FOLDER BATCH ERROR for persona {persona_id}: {e}") persona_results.append(False) # Update folder's updated_at timestamp await db.folders.update_one( {"_id": ObjectId(folder_id)}, {"$set": {"updated_at": datetime.now(timezone.utc)}} ) success_count = sum(1 for r in persona_results if r) logger.debug(f"FOLDER ADD_PERSONAS_BATCH: {success_count}/{len(persona_ids)} personas updated successfully") return any(persona_results) except Exception as e: logger.error(f"FOLDER ADD_PERSONAS_BATCH ERROR: {e}") import traceback logger.error(f"FOLDER ADD_PERSONAS_BATCH TRACEBACK: {traceback.format_exc()}") return False @staticmethod async def remove_personas_batch(folder_id, persona_ids): """Remove multiple personas from a folder (persona-centric storage).""" db = await get_db() try: logger.debug(f"FOLDER REMOVE_PERSONAS_BATCH: folder_id={folder_id}, persona_ids={persona_ids}") # Remove folder from each persona's folder_ids - single source of truth persona_results = [] for persona_id in persona_ids: try: logger.debug(f"FOLDER REMOVE_BATCH: Processing persona {persona_id}") # Check if persona exists persona = await db.personas.find_one({"_id": ObjectId(persona_id)}) if not persona: logger.warning(f"FOLDER REMOVE_BATCH: Persona {persona_id} not found") persona_results.append(False) continue logger.debug(f"FOLDER REMOVE_BATCH: Found persona {persona.get('name', 'Unknown')}") logger.debug(f"FOLDER REMOVE_BATCH: Current folder_ids: {persona.get('folder_ids', 'None')}") result = await db.personas.update_one( {"_id": ObjectId(persona_id)}, {"$pull": {"folder_ids": folder_id}, "$set": {"updated_at": datetime.now(timezone.utc)}} ) logger.debug(f"FOLDER REMOVE_BATCH: Update result for {persona_id} - modified: {result.modified_count}") persona_results.append(result.modified_count > 0) except Exception as e: logger.error(f"FOLDER REMOVE_BATCH ERROR for persona {persona_id}: {e}") persona_results.append(False) # Update folder's updated_at timestamp await db.folders.update_one( {"_id": ObjectId(folder_id)}, {"$set": {"updated_at": datetime.now(timezone.utc)}} ) success_count = sum(1 for r in persona_results if r) logger.debug(f"FOLDER REMOVE_PERSONAS_BATCH: {success_count}/{len(persona_ids)} personas updated successfully") return any(persona_results) except Exception as e: logger.error(f"FOLDER REMOVE_PERSONAS_BATCH ERROR: {e}") import traceback logger.error(f"FOLDER REMOVE_PERSONAS_BATCH TRACEBACK: {traceback.format_exc()}") return False @staticmethod async def get_folders_containing_persona(persona_id, user_id=None): """Find all folders that contain a specific persona (persona-centric storage).""" db = await get_db() try: # Get the persona to see which folders it belongs to persona = await db.personas.find_one({"_id": ObjectId(persona_id)}) if not persona or not persona.get("folder_ids"): return [] # Get folders by their IDs folder_ids = [ObjectId(fid) for fid in persona["folder_ids"]] query = {"_id": {"$in": folder_ids}} # Optionally filter by user if user_id: query["created_by"] = user_id cursor = db.folders.find(query) folders = await cursor.to_list(length=None) result = [] for folder in folders: folder["_id"] = str(folder["_id"]) result.append(folder) return result except Exception as e: logger.error(f"Error getting folders for persona {persona_id}: {e}") return [] @staticmethod async def get_folder_tree(user_id=None, limit=100): """Get all folders organized in a hierarchical tree structure.""" db = await get_db() try: # Get all folders query = {} if user_id: query["created_by"] = user_id cursor = db.folders.find(query).sort("created_at", -1).limit(limit) folders = await cursor.to_list(length=limit) # Convert ObjectIds to strings processed_folders = [] for folder in folders: folder["_id"] = str(folder["_id"]) if folder.get("parent_folder_id"): folder["parent_folder_id"] = str(folder["parent_folder_id"]) processed_folders.append(folder) return processed_folders except Exception as e: logger.error(f"Error in Folder.get_folder_tree: {e}") return [] @staticmethod async def get_descendants(folder_id): """Get all descendant folders of a given folder.""" db = await get_db() try: descendants = [] # Get immediate children children_cursor = db.folders.find({"parent_folder_id": folder_id}) children = await children_cursor.to_list(length=None) for child in children: child["_id"] = str(child["_id"]) if child.get("parent_folder_id"): child["parent_folder_id"] = str(child["parent_folder_id"]) descendants.append(child) # Since we only support 2 levels, children can't have children # But we'll keep this structure for potential future expansion return descendants except Exception as e: logger.error(f"Error getting descendants for folder {folder_id}: {e}") return [] @staticmethod async def get_sibling_names(parent_id): """Get names of all folders that would be siblings in the target location.""" db = await get_db() try: if parent_id: # Get folders at the same level under the parent siblings_cursor = db.folders.find({"parent_folder_id": parent_id}) else: # Get root level folders siblings_cursor = db.folders.find({"$or": [{"parent_folder_id": None}, {"level": 0}]}) siblings = await siblings_cursor.to_list(length=None) return [folder.get("name", "") for folder in siblings] except Exception as e: logger.error(f"Error getting sibling names: {e}") return [] @staticmethod def generate_unique_name(desired_name, existing_names): """Generate a unique name by adding suffix if conflicts exist.""" if desired_name not in existing_names: return desired_name counter = 1 while f"{desired_name}_{counter}" in existing_names: counter += 1 return f"{desired_name}_{counter}" @staticmethod async def move_folder(folder_id, new_parent_id, user_id): """Move a folder to a new parent with automatic hierarchy flattening.""" db = await get_db() try: # Get the folder to move folder = await db.folders.find_one({"_id": ObjectId(folder_id)}) if not folder: return False, "Folder not found" # Ownership check (M-H2) if user_id and folder.get("created_by") != user_id: return False, "Permission denied" # Check if trying to move into current parent (redundant operation) if new_parent_id and folder.get("parent_folder_id") == new_parent_id: return False, "Folder is already in this location" # If moving to root and folder has children, do nothing (it's already at root) if not new_parent_id: descendants = await Folder.get_descendants(folder_id) if len(descendants) > 0: return True, "Folder with children is already at root level" # Validate new parent new_level = 0 if new_parent_id: parent_folder = await db.folders.find_one({"_id": ObjectId(new_parent_id)}) if not parent_folder: return False, "Parent folder not found" # Check if moving into descendant (prevent circular reference) if parent_folder.get("parent_folder_id") == folder_id: return False, "Cannot move folder into its own descendant" parent_level = parent_folder.get("level", 0) if parent_level >= 1: return False, "Maximum folder depth exceeded" new_level = parent_level + 1 # Get children of the folder being moved descendants = await Folder.get_descendants(folder_id) # Get existing sibling names at the destination to handle conflicts existing_names = await Folder.get_sibling_names(new_parent_id) moved_folders = [] # First, move the main folder original_name = folder.get("name", "") unique_name = Folder.generate_unique_name(original_name, existing_names) if unique_name != original_name: # Update the folder name if there was a conflict await db.folders.update_one( {"_id": ObjectId(folder_id)}, {"$set": {"name": unique_name, "updated_at": datetime.now(timezone.utc)}} ) # Move the main folder update_data = { "parent_folder_id": new_parent_id, "level": new_level, "updated_at": datetime.now(timezone.utc) } result = await db.folders.update_one( {"_id": ObjectId(folder_id)}, {"$set": update_data} ) if result.modified_count > 0: moved_folders.append({"name": unique_name, "original_name": original_name}) existing_names.append(unique_name) # Add to existing names for subsequent conflicts # If there are children, flatten them to the same level if len(descendants) > 0: for child in descendants: child_name = child.get("name", "") unique_child_name = Folder.generate_unique_name(child_name, existing_names) # Update child folder name if there was a conflict if unique_child_name != child_name: await db.folders.update_one( {"_id": ObjectId(child["_id"])}, {"$set": {"name": unique_child_name, "updated_at": datetime.now(timezone.utc)}} ) # Move child to the same parent as the moved folder (flattening) child_result = await db.folders.update_one( {"_id": ObjectId(child["_id"])}, {"$set": { "parent_folder_id": new_parent_id, "level": new_level, "updated_at": datetime.now(timezone.utc) }} ) if child_result.modified_count > 0: moved_folders.append({"name": unique_child_name, "original_name": child_name}) existing_names.append(unique_child_name) # Add to existing names # Build success message if len(moved_folders) == 1: message = f"Folder moved successfully" else: subfolder_names = [f["name"] for f in moved_folders[1:]] # Exclude main folder message = f"Folder and {len(subfolder_names)} subfolders moved successfully (flattened): {', '.join(subfolder_names)}" return len(moved_folders) > 0, message except Exception as e: logger.error(f"Error moving folder {folder_id}: {e}") return False, f"Error moving folder: {str(e)}" @staticmethod async def delete_hierarchy(folder_id, user_id): """Delete a folder and all its descendants.""" db = await get_db() try: # Get the folder to delete folder = await db.folders.find_one({"_id": ObjectId(folder_id)}) if not folder: return False, "Folder not found" # Ownership check (M-H2) if user_id and folder.get("created_by") != user_id: return False, "Permission denied" # Get all descendants descendants = await Folder.get_descendants(folder_id) all_folder_ids = [folder_id] + [desc["_id"] for desc in descendants] # Remove folder references from all personas for fid in all_folder_ids: await db.personas.update_many( {"folder_ids": fid}, {"$pull": {"folder_ids": fid}, "$set": {"updated_at": datetime.now(timezone.utc)}} ) # Delete all folders in the hierarchy object_ids = [ObjectId(fid) for fid in all_folder_ids] result = await db.folders.delete_many({"_id": {"$in": object_ids}}) return result.deleted_count > 0, f"Deleted {result.deleted_count} folders" except Exception as e: logger.error(f"Error deleting folder hierarchy {folder_id}: {e}") return False, f"Error deleting folder: {str(e)}"