asyncpg requires datetime instances for TIMESTAMPTZ columns, not strings. Added a _parse_dt() helper that converts ISO strings (with or without a tz offset) to timezone-aware datetimes and returns None when the value is missing or unparseable; callers then fall back to the current UTC time. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
193 lines
9.2 KiB
Python
193 lines
9.2 KiB
Python
"""
|
|
One-time migration: import existing JSON file data into PostgreSQL.
|
|
|
|
Run inside the container:
|
|
python -m server.db.migrate_json
|
|
|
|
Or from the project root:
|
|
cd backend && python -m server.db.migrate_json
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
|
|
# Configure the root logger at INFO level as soon as this module is loaded,
# so the progress messages logged by the migration are emitted.
logging.basicConfig(level=logging.INFO)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _parse_dt(value) -> datetime | None:
|
|
"""Parse an ISO timestamp string (or None) into a timezone-aware datetime."""
|
|
if not value:
|
|
return None
|
|
try:
|
|
dt = datetime.fromisoformat(value)
|
|
if dt.tzinfo is None:
|
|
dt = dt.replace(tzinfo=timezone.utc)
|
|
return dt
|
|
except (ValueError, TypeError):
|
|
return None
|
|
|
|
|
|
def _load_json(path):
    """Return the decoded contents of the UTF-8 JSON file at *path*."""
    # Explicit encoding so decoding does not depend on the process locale.
    with open(path, encoding="utf-8") as fh:
        return json.load(fh)


async def _migrate_users(conn, users_file):
    """Insert every user from *users_file*; rows whose id already exists are kept."""
    if not os.path.exists(users_file):
        return
    users = _load_json(users_file)
    now = datetime.now(timezone.utc)
    for uid, u in users.items():
        await conn.execute(
            '''
            INSERT INTO users (id, email, name, role, active, created_at, last_seen_at)
            VALUES ($1, $2, $3, $4, $5, $6, $7)
            ON CONFLICT (id) DO NOTHING
            ''',
            uid, u.get('email', ''), u.get('name', ''),
            u.get('role', 'user'), u.get('active', True),
            # Missing or unparseable timestamps fall back to the migration time.
            _parse_dt(u.get('created')) or now,
            _parse_dt(u.get('last_seen')) or now,
        )
    # Counts processed entries, including ones skipped by ON CONFLICT.
    logger.info(f"Migrated {len(users)} users")


async def _migrate_clients(conn, clients_file):
    """Insert every client from *clients_file*; rows whose id already exists are kept."""
    if not os.path.exists(clients_file):
        return
    clients = _load_json(clients_file)
    now = datetime.now(timezone.utc)
    for c in clients:
        await conn.execute(
            '''
            INSERT INTO clients (id, name, has_custom_dropdowns, created_at)
            VALUES ($1, $2, $3, $4)
            ON CONFLICT (id) DO NOTHING
            ''',
            c['id'], c['name'], c.get('hasCustomDropdowns', False),
            _parse_dt(c.get('created')) or now,
        )
    logger.info(f"Migrated {len(clients)} clients")


async def _replace_categories(conn, client_id, categories):
    """Atomically replace the dropdown categories of one client (``None`` = global)."""
    # Delete and re-insert inside one transaction so a failure part-way
    # through cannot leave the client with its categories wiped.
    async with conn.transaction():
        if client_id is None:
            # "= $1" never matches NULL, so the global set needs IS NULL.
            await conn.execute("DELETE FROM dropdown_categories WHERE client_id IS NULL")
        else:
            await conn.execute("DELETE FROM dropdown_categories WHERE client_id = $1", client_id)
        for cat in categories:
            await conn.execute(
                '''
                INSERT INTO dropdown_categories (client_id, name, status, media_types)
                VALUES ($1, $2, $3, $4)
                ''',
                client_id, cat['name'], cat.get('status', 'Active'), cat.get('mediaTypes', []),
            )


async def _upsert_export_template(conn, scope, columns):
    """Insert or overwrite the export template stored under *scope*."""
    # NOTE(review): *columns* is passed as the decoded JSON object; this
    # presumably relies on a JSON codec registered by init_pool() - confirm.
    await conn.execute(
        '''
        INSERT INTO export_templates (scope, columns) VALUES ($1, $2)
        ON CONFLICT (scope) DO UPDATE SET columns = $2, updated_at = NOW()
        ''',
        scope, columns,
    )


async def _migrate_sheets(conn, metadata_file, sheets_dir):
    """Insert every sheet listed in *metadata_file*, loading its rows from *sheets_dir*."""
    # Local import: `re` is not imported at module level in this file.
    import re

    if not os.path.exists(metadata_file):
        return

    unsafe_chars = re.compile(r'[^a-zA-Z0-9_\-]')
    meta = _load_json(metadata_file)
    total = 0
    for user_id, user_sheets in meta.items():
        # Build safe filename prefix (mirrors manager.py logic, per the
        # original comment); it only depends on the user, so compute it once.
        safe_uid = unsafe_chars.sub('_', user_id)
        for sheet_meta in user_sheets:
            sid = sheet_meta['id']
            data_file = os.path.join(sheets_dir, f"{safe_uid}_{sid}.json")
            # A sheet without a data file is migrated with no rows.
            data = _load_json(data_file) if os.path.exists(data_file) else []

            # Normalise empty/falsy client ids to NULL.
            client_id = sheet_meta.get('client_id') or None
            now = datetime.now(timezone.utc)
            await conn.execute(
                '''
                INSERT INTO sheets
                    (id, user_id, name, client_id, data, item_count, created_at, modified_at)
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
                ON CONFLICT (id) DO NOTHING
                ''',
                sid, user_id, sheet_meta.get('name', 'Untitled'),
                client_id, data, len(data),
                _parse_dt(sheet_meta.get('created')) or now,
                _parse_dt(sheet_meta.get('modified')) or now,
            )
            total += 1
    logger.info(f"Migrated {total} sheets")


async def migrate():
    """Import the existing JSON data files into PostgreSQL.

    Migrates, in order: users, clients, global and per-client dropdown
    categories, global / per-client / per-user export templates, and sheets.
    Sources that do not exist on disk are skipped. Users, clients and sheets
    use ``ON CONFLICT DO NOTHING``; dropdown categories are deleted and
    re-inserted; export templates are upserted.

    Raises:
        SystemExit: with status 1 when ``server_config.DATABASE_URL`` is empty.
    """
    # Import here to avoid circular issues at module level
    from ..config_runtime import server_config
    from ..db.pool import init_pool, get_pool

    dsn = server_config.DATABASE_URL
    if not dsn:
        logger.error("DATABASE_URL not set — nothing to migrate")
        sys.exit(1)

    await init_pool(dsn)
    pool = get_pool()

    async with pool.acquire() as conn:
        # ── Users ─────────────────────────────────────────────────────────────
        await _migrate_users(conn, server_config.USERS_FILE)

        # ── Clients ────────────────────────────────────────────────────────────
        await _migrate_clients(conn, server_config.CLIENTS_FILE)

        # ── Global dropdowns ───────────────────────────────────────────────────
        dropdowns_file = server_config.DROPDOWNS_FILE
        if os.path.exists(dropdowns_file):
            categories = _load_json(dropdowns_file)
            await _replace_categories(conn, None, categories)
            logger.info(f"Migrated {len(categories)} global categories")

        # ── Per-client dropdowns ───────────────────────────────────────────────
        client_dd_dir = server_config.CLIENTS_DROPDOWNS_DIR
        if os.path.isdir(client_dd_dir):
            for fname in os.listdir(client_dd_dir):
                # Export templates share this directory and are handled below.
                # NOTE(review): the substring test also skips any client whose
                # id merely contains "_export" - confirm that is intended.
                if not fname.endswith('.json') or '_export' in fname:
                    continue
                client_id = fname[:-len('.json')]
                cats = _load_json(os.path.join(client_dd_dir, fname))
                await _replace_categories(conn, client_id, cats)
                logger.info(f"Migrated {len(cats)} categories for client {client_id}")

        # ── Global export template ─────────────────────────────────────────────
        tpl_file = server_config.EXPORT_TEMPLATE_FILE
        if os.path.exists(tpl_file):
            await _upsert_export_template(conn, 'global', _load_json(tpl_file))
            logger.info("Migrated global export template")

        # ── Per-client export templates ────────────────────────────────────────
        if os.path.isdir(client_dd_dir):
            for fname in os.listdir(client_dd_dir):
                if not fname.endswith('_export.json'):
                    continue
                client_id = fname[:-len('_export.json')]
                tpl = _load_json(os.path.join(client_dd_dir, fname))
                await _upsert_export_template(conn, f'client:{client_id}', tpl)
                logger.info(f"Migrated export template for client {client_id}")

        # ── Per-user export templates ──────────────────────────────────────────
        user_tpl_dir = server_config.USER_EXPORT_TEMPLATES_DIR
        if os.path.isdir(user_tpl_dir):
            for fname in os.listdir(user_tpl_dir):
                if not fname.endswith('.json'):
                    continue
                user_id = fname[:-len('.json')]
                tpl = _load_json(os.path.join(user_tpl_dir, fname))
                await _upsert_export_template(conn, f'user:{user_id}', tpl)
                logger.info(f"Migrated export template for user {user_id}")

        # ── Sheets ─────────────────────────────────────────────────────────────
        await _migrate_sheets(
            conn,
            os.path.join(server_config.DATA_DIR, 'sheets_metadata.json'),
            server_config.SHEETS_DIR,
        )

    logger.info("Migration complete!")
|
|
|
|
|
|
# Script entry point: run the async migration to completion when this module
# is executed directly (e.g. via `python -m server.db.migrate_json`).
if __name__ == '__main__':
    asyncio.run(migrate())
|