Some checks failed
Deploy to Production / deploy (push) Failing after 0s
- Replace cyan/violet design tokens with warm dark slate + orange (#E89B3C) palette - Add Space Grotesk display font; new utilities: .outline-display, .orange-band, .corner-card, .persona-orb - New brand components: Logo (hexagonal SVG), Header (pill nav + glass blur), Footer (4-col), PublicLayout, AppLayout, UserDropdown - Rewrite Index.tsx as full sales funnel: Hero → Stats → Orange band → How it works → Pricing (API) → FAQ → Final CTA - Rewrite Dashboard.tsx with real API data: credits balance, MTD spend, personas count, focus groups count, active tasks, recent transactions - Rewrite auth pages (Login, Register, VerifyEmail, NotFound, Billing) with two-column orange-panel layout - Replace hardcoded mock numbers in Dashboard with billingApi / personasApi / focusGroupsApi / usageApi calls - Delete legacy components: Navigation.tsx, Hero.tsx, FeatureCard.tsx - Add nested layout routing in App.tsx: PublicLayout for guests, AppLayout for protected routes - Color sweep inner pages: replace all purple-500/600 with primary token - Purge all semblance / Oliver / optical-dev references; rename semblance_app_documentation.md → cohorta_app_documentation.md; update backend scripts to cohorta_db Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
246 lines
No EOL
8.9 KiB
Python
Executable file
246 lines
No EOL
8.9 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
Legacy Folder Migration Script
|
|
|
|
This script migrates legacy folders to be compatible with the hierarchy system
|
|
by adding missing fields that are required for drag & drop functionality.
|
|
|
|
Usage:
|
|
python migrate_legacy_folders.py --dry-run # Preview changes
|
|
python migrate_legacy_folders.py # Execute migration
|
|
"""
|
|
|
|
import asyncio
|
|
import sys
|
|
import argparse
|
|
from datetime import datetime, timezone
|
|
from motor.motor_asyncio import AsyncIOMotorClient
|
|
import os
|
|
import logging
|
|
|
|
# Setup logging
|
|
logging.basicConfig(level=logging.INFO)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
async def _try_connect(uri, timeout_ms, verify_write=False):
    """Open a client for ``uri`` and return ``(client, database)`` once verified.

    The connection is verified by pinging ``cohorta_db``. When ``verify_write``
    is true, a probe document is also inserted into and deleted from
    ``test_collection``. If any step raises, the client is closed before the
    exception propagates, so failed attempts do not leave clients open.
    """
    motor_client = AsyncIOMotorClient(uri, serverSelectionTimeoutMS=timeout_ms)
    try:
        database = motor_client.cohorta_db
        await database.command('ping')
        if verify_write:
            probe = await database.test_collection.insert_one({"test": "migration_test"})
            await database.test_collection.delete_one({"_id": probe.inserted_id})
    except Exception:
        motor_client.close()
        raise
    return motor_client, database


async def get_db_connection():
    """Connect to ``cohorta_db`` and return ``(client, database)``.

    Host and port are read from ``MONGO_HOST`` / ``MONGO_PORT`` (defaults
    ``localhost`` / ``27017``). Connection attempts are made in this order:

    1. a fixed list of well-known default credentials (2 s timeout each);
    2. an unauthenticated connection, which must also pass a write probe
       (this probe writes to ``test_collection`` regardless of dry-run mode);
    3. the ``MONGO_USER`` / ``MONGO_PASS`` environment credentials, if both
       are set.

    Returns:
        A tuple of the open Motor client and its ``cohorta_db`` database.

    Raises:
        Exception: if none of the attempts produced a working connection.
    """
    # Local import: only this function needs it, and user names / passwords
    # must be percent-escaped before being embedded in a MongoDB URI.
    from urllib.parse import quote_plus

    mongo_user = os.environ.get('MONGO_USER')
    mongo_pass = os.environ.get('MONGO_PASS')
    mongo_host = os.environ.get('MONGO_HOST', 'localhost')
    mongo_port = os.environ.get('MONGO_PORT', '27017')

    # NOTE(review): default passwords are probed before the explicitly
    # configured environment credentials; the original order is preserved
    # here - confirm that this precedence is intended.
    standard_credentials = [
        {"user": "admin", "pass": "admin", "db": "admin"},
        {"user": "mongodb", "pass": "mongodb", "db": "admin"},
        {"user": "root", "pass": "root", "db": "admin"},
        {"user": "user", "pass": "pass", "db": "admin"},
    ]

    # 1. Well-known default credentials.
    for creds in standard_credentials:
        uri = (
            f"mongodb://{quote_plus(creds['user'])}:{quote_plus(creds['pass'])}"
            f"@{mongo_host}:{mongo_port}/cohorta_db?authSource={creds['db']}"
        )
        try:
            motor_client, database = await _try_connect(uri, 2000)
        except Exception as e:
            # Expected for most entries; keep a trace without adding noise
            # at the default INFO level.
            logger.debug("Default credentials '%s' failed: %s", creds['user'], e)
            continue
        logger.info(f"Connected to MongoDB with credentials: {creds['user']}")
        return motor_client, database

    # 2. No authentication (must also be writable).
    try:
        motor_client, database = await _try_connect(
            f'mongodb://{mongo_host}:{mongo_port}', 5000, verify_write=True
        )
    except Exception as e:
        logger.error(f"Could not connect without auth: {e}")
    else:
        logger.info("Connected to MongoDB without authentication")
        return motor_client, database

    # 3. Credentials supplied through the environment.
    if mongo_user and mongo_pass:
        uri = (
            f"mongodb://{quote_plus(mongo_user)}:{quote_plus(mongo_pass)}"
            f"@{mongo_host}:{mongo_port}/cohorta_db?authSource=admin"
        )
        try:
            motor_client, database = await _try_connect(uri, 5000)
        except Exception as e:
            logger.error(f"Failed with environment credentials: {e}")
        else:
            logger.info(f"Connected to MongoDB with env credentials: {mongo_user}")
            return motor_client, database

    raise Exception("Could not establish database connection")
|
|
|
|
async def find_user_id(database, username):
    """Return the stringified ``_id`` of the user named ``username``, or None.

    None is returned when no document in ``users`` has that username (a
    warning is logged) and also when the lookup raises (the error is logged).
    """
    try:
        record = await database.users.find_one({"username": username})
        if not record:
            logger.warning(f"User '{username}' not found in database")
            return None
        return str(record["_id"])
    except Exception as exc:
        logger.error(f"Error finding user '{username}': {exc}")
        return None
|
|
|
|
async def find_legacy_folders(database):
    """Return every document in ``folders`` that has no ``level`` field.

    Returns:
        A list of folder documents; empty when nothing needs migrating.

    Raises:
        Exception: whatever the query raised, after it has been logged.
            Failures are propagated rather than reported as an empty list,
            so a broken query is not mistaken for "all folders are up to
            date".
    """
    try:
        # Folders created before the hierarchy system lack the 'level' field.
        cursor = database.folders.find({"level": {"$exists": False}})
        return await cursor.to_list(length=None)
    except Exception as e:
        logger.error(f"Error finding legacy folders: {e}")
        raise
|
|
|
|
async def migrate_folder(database, folder, user_id, dry_run=False):
    """Add the missing hierarchy fields to a single folder document.

    Fields added when absent from ``folder``: ``level`` (0), ``parent_folder_id``
    (None), ``created_by`` (only when ``user_id`` is truthy) and
    ``created_at``. ``updated_at`` is refreshed whenever at least one of
    those fields is being added.

    Args:
        database: database handle exposing a ``folders`` collection.
        folder: the folder document as read from the database.
        user_id: value to store in ``created_by``; falsy to skip that field.
        dry_run: when true, only log what would be written.

    Returns:
        True when the folder needed no update, was (or would be) updated, or
        the update changed nothing; False when the update raised.
    """
    folder_id = folder["_id"]
    folder_name = folder.get("name", "Unknown")

    # One timestamp for the whole update so created_at and updated_at agree.
    now = datetime.now(timezone.utc)

    update_fields = {}

    # Legacy folders all become root-level folders.
    if "level" not in folder:
        update_fields["level"] = 0
        logger.info(" Adding level: 0")

    if "parent_folder_id" not in folder:
        update_fields["parent_folder_id"] = None
        logger.info(" Adding parent_folder_id: None")

    if "created_by" not in folder and user_id:
        update_fields["created_by"] = user_id
        logger.info(f" Adding created_by: {user_id}")

    if "created_at" not in folder:
        update_fields["created_at"] = now
        logger.info(f" Adding created_at: {update_fields['created_at']}")

    # This check must run before updated_at is added; otherwise the dict is
    # never empty and the "nothing to do" path can never be taken.
    if not update_fields:
        logger.info(f" No updates needed for folder '{folder_name}'")
        return True

    update_fields["updated_at"] = now

    if dry_run:
        logger.info(f" [DRY RUN] Would update folder '{folder_name}' with: {update_fields}")
        return True

    try:
        result = await database.folders.update_one(
            {"_id": folder_id},
            {"$set": update_fields},
        )

        if result.modified_count > 0:
            logger.info(f" Successfully updated folder '{folder_name}'")
        elif result.matched_count == 0:
            # The document disappeared between the query and the update.
            logger.warning(f" Folder '{folder_name}' no longer exists; nothing to update")
        else:
            logger.warning(f" No changes made to folder '{folder_name}' (already up to date)")
        return True
    except Exception as e:
        logger.error(f" Error updating folder '{folder_name}': {e}")
        return False
|
|
|
|
async def migrate_legacy_folders(dry_run=False, username='user'):
    """Find all legacy folders and migrate them one by one.

    Args:
        dry_run: when true, log the planned changes without writing them.
        username: account whose ID is stored as ``created_by`` on folders
            that lack it. If the account cannot be found, ``created_by`` is
            left unset.

    Returns:
        True when there was nothing to migrate or every folder was processed
        successfully; False when any folder failed or an exception occurred.
    """
    client = None
    try:
        logger.info("Connecting to MongoDB...")
        client, database = await get_db_connection()

        logger.info(f"Looking up user ID for username '{username}'...")
        user_id = await find_user_id(database, username)
        if user_id:
            logger.info(f"Found user ID: {user_id}")
        else:
            logger.warning(f"Could not find user '{username}' - will skip created_by field")

        logger.info("Searching for legacy folders...")
        legacy_folders = await find_legacy_folders(database)

        if not legacy_folders:
            logger.info("No legacy folders found. All folders are up to date!")
            return True

        total = len(legacy_folders)
        logger.info(f"Found {total} legacy folder(s) to migrate:")

        # Summary of what is about to be processed.
        for folder in legacy_folders:
            folder_name = folder.get("name", "Unknown")
            logger.info(f" - {folder_name} (ID: {folder['_id']})")

        if dry_run:
            logger.info("\n--- DRY RUN MODE - No actual changes will be made ---")
        else:
            logger.info("\n--- EXECUTING MIGRATION ---")

        success_count = 0
        for i, folder in enumerate(legacy_folders, 1):
            folder_name = folder.get("name", "Unknown")
            logger.info(f"\n[{i}/{total}] Processing folder: {folder_name}")

            if await migrate_folder(database, folder, user_id, dry_run):
                success_count += 1
            else:
                logger.error(f"Failed to migrate folder: {folder_name}")

        logger.info("\n--- MIGRATION COMPLETE ---")
        logger.info(f"Successfully processed: {success_count}/{total} folders")

        failed_count = total - success_count
        if dry_run:
            logger.info("Run without --dry-run flag to execute the actual migration.")
        elif failed_count == 0:
            logger.info("Migration completed successfully!")
        else:
            # Do not claim success when some folders could not be updated.
            logger.error(f"Migration finished with {failed_count} failed folder(s)")

        return failed_count == 0

    except Exception as e:
        logger.error(f"Migration failed: {e}")
        return False
    finally:
        # Compare with None explicitly instead of relying on the client's
        # truth value.
        if client is not None:
            client.close()
|
|
|
|
def main():
    """Parse the command line, run the migration and exit with its status.

    Exits with status 0 when the migration reports success and 1 otherwise.
    """
    cli = argparse.ArgumentParser(description='Migrate legacy folders for drag & drop compatibility')
    cli.add_argument(
        '--dry-run',
        action='store_true',
        help='Preview changes without executing them',
    )
    options = cli.parse_args()

    mode_label = 'DRY RUN' if options.dry_run else 'EXECUTE'
    logger.info("=== Legacy Folder Migration Script ===")
    logger.info(f"Mode: {mode_label}")

    # Drive the async migration to completion on a fresh event loop.
    migrated_ok = asyncio.run(migrate_legacy_folders(dry_run=options.dry_run))

    if not migrated_ok:
        logger.error("Migration failed!")
        sys.exit(1)
    sys.exit(0)
|
|
|
|
# Run the migration only when this file is executed as a script.
if __name__ == "__main__":
    main()