forge/backend/scripts/reimport_storage.py

109 lines
3.3 KiB
Python

#!/usr/bin/env python3
"""
Re-import existing files from storage directory into database
"""
import os
import sys
from pathlib import Path
from datetime import datetime
import mimetypes
sys.path.insert(0, '/app')
from app.database import SessionLocal
from app.models.user import User
from app.models.asset import Asset
def get_file_type(mime_type):
"""Determine file type from MIME type"""
if not mime_type:
return 'other'
if mime_type.startswith('image/'):
return 'image'
if mime_type.startswith('video/'):
return 'video'
if mime_type.startswith('audio/'):
return 'audio'
if mime_type.startswith('text/') or 'document' in mime_type:
return 'document'
return 'other'
def reimport_files():
db = SessionLocal()
try:
# Get or create default user
user = db.query(User).filter(User.email == "test@forge.ai").first()
if not user:
user = User(
email="test@forge.ai",
name="Test User",
is_active=True
)
db.add(user)
db.commit()
db.refresh(user)
print(f"✓ Created user: {user.email}")
storage_path = "/app/storage"
imported = 0
skipped = 0
# Scan all subdirectories
for subdir in ['images', 'videos', 'audio', 'audios', 'documents']:
dir_path = os.path.join(storage_path, subdir)
if not os.path.exists(dir_path):
continue
print(f"\n📁 Scanning {subdir}/...")
for filename in os.listdir(dir_path):
file_path = os.path.join(dir_path, filename)
if not os.path.isfile(file_path):
continue
# Check if already exists
existing = db.query(Asset).filter(Asset.file_path == file_path).first()
if existing:
skipped += 1
continue
# Get file info
file_stat = os.stat(file_path)
mime_type, _ = mimetypes.guess_type(filename)
file_type = get_file_type(mime_type)
# Create asset record
asset = Asset(
user_id=user.id,
original_filename=filename,
stored_filename=filename,
file_path=file_path,
file_type=file_type,
mime_type=mime_type or 'application/octet-stream',
created_at=datetime.fromtimestamp(file_stat.st_ctime)
)
db.add(asset)
imported += 1
if imported % 50 == 0:
db.commit()
print(f" Imported {imported} files...")
db.commit()
print(f"\n✅ Import complete!")
print(f" Imported: {imported} files")
print(f" Skipped: {skipped} files (already in database)")
except Exception as e:
print(f"❌ Error: {e}")
import traceback
traceback.print_exc()
finally:
db.close()
if __name__ == "__main__":
reimport_files()