Migrate storage from JSON files to PostgreSQL (asyncpg)

- Add asyncpg connection pool (db/pool.py) with JSONB codec registration
- Add schema.sql with users, clients, dropdown_categories, export_templates, sheets tables
- Add migrate_json.py one-time migration script for existing JSON data
- Rewrite user_store, sheets/manager, api/clients, api/dropdowns, api/export as async DB-backed
- Update all callers (auth, sheets, admin, ai_command, export) to await async functions
- Add postgres:16-alpine service to docker-compose with named volume and health check
- App container depends_on postgres; DATABASE_URL injected via env
- Schema applied automatically on startup; global categories seeded if DB is empty

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Vadym Samoilenko 2026-03-23 19:51:37 +00:00
parent b670505956
commit 8da149b84e
18 changed files with 725 additions and 467 deletions

View file

@ -24,6 +24,9 @@ python-docx>=0.8.11
openpyxl>=3.1.0
xlrd>=2.0.1
# Database
asyncpg>=0.29.0
# Data
pandas>=2.0.0
numpy>=1.24.0

View file

@ -1,19 +1,20 @@
"""
Admin API user management and dropdown Excel upload.
Admin API user management, dropdown Excel upload, export templates.
All routes require admin role.
"""
import json
import logging
import os
from quart import Blueprint, jsonify, request
from ..auth.middleware import admin_required
from ..auth.user_store import list_users, set_role, set_active
from ..api.dropdowns import save_dropdowns, parse_excel_dropdowns, detect_excel_mapping
from ..api.clients import load_clients, get_client_by_id
from ..api.export import detect_csv_template, load_export_template, save_export_template, _client_template_path, INTERNAL_FIELDS
from ..api.clients import load_clients, get_client_by_id, set_client_custom_dropdowns
from ..api.export import (
detect_csv_template, load_export_template, save_export_template,
delete_export_template, has_export_template, INTERNAL_FIELDS,
)
logger = logging.getLogger(__name__)
@ -23,7 +24,7 @@ admin_bp = Blueprint('admin', __name__, url_prefix='/api/admin')
@admin_bp.route('/users', methods=['GET'])
@admin_required
async def get_users():
return jsonify({'users': list_users()})
return jsonify({'users': await list_users()})
@admin_bp.route('/users/<user_id>', methods=['PATCH'])
@ -33,12 +34,12 @@ async def update_user(user_id: str):
user = None
if 'role' in body:
user = set_role(user_id, body['role'])
user = await set_role(user_id, body['role'])
if user is None:
return jsonify({'error': 'invalid_role_or_not_found'}), 400
if 'active' in body:
user = set_active(user_id, bool(body['active']))
user = await set_active(user_id, bool(body['active']))
if user is None:
return jsonify({'error': 'not_found'}), 404
@ -50,7 +51,6 @@ def _read_xlsx_file(file) -> bytes:
def _extract_mapping(form) -> dict | None:
"""Extract mapping override from multipart form fields (name_col, status_col, media_col)."""
try:
if 'name_col' in form and 'status_col' in form and 'media_col' in form:
return {
@ -64,7 +64,6 @@ def _extract_mapping(form) -> dict | None:
async def _parse_uploaded_xlsx(files, form=None) -> tuple[list, str | None]:
"""Returns (categories, error_message). error_message is None on success."""
file = files.get('file')
if not file:
return [], 'no_file'
@ -85,7 +84,6 @@ async def _parse_uploaded_xlsx(files, form=None) -> tuple[list, str | None]:
@admin_bp.route('/dropdowns/detect-mapping', methods=['POST'])
@admin_required
async def detect_mapping():
"""Detect column mapping from an uploaded .xlsx without saving. Returns headers + mapping."""
files = await request.files
file = files.get('file')
if not file:
@ -104,13 +102,12 @@ async def detect_mapping():
@admin_bp.route('/dropdowns/upload', methods=['POST'])
@admin_required
async def upload_dropdowns():
"""Upload a new .xlsx to update global dropdown categories."""
files = await request.files
form = await request.form
categories, err = await _parse_uploaded_xlsx(files, form)
if err:
return jsonify({'error': err}), 400
save_dropdowns(categories)
await save_dropdowns(categories)
active_count = sum(1 for c in categories if c['status'] == 'Active')
return jsonify({'success': True, 'total': len(categories), 'active': active_count,
'archived': len(categories) - active_count})
@ -119,7 +116,6 @@ async def upload_dropdowns():
@admin_bp.route('/dropdowns/preview', methods=['POST'])
@admin_required
async def preview_dropdowns():
"""Preview parsed categories from an uploaded file without saving."""
files = await request.files
form = await request.form
categories, err = await _parse_uploaded_xlsx(files, form)
@ -133,7 +129,6 @@ async def preview_dropdowns():
@admin_bp.route('/clients/<client_id>/dropdowns/detect-mapping', methods=['POST'])
@admin_required
async def detect_client_mapping(client_id: str):
"""Detect column mapping from a per-client .xlsx upload."""
files = await request.files
file = files.get('file')
if not file:
@ -152,25 +147,15 @@ async def detect_client_mapping(client_id: str):
@admin_bp.route('/clients/<client_id>/dropdowns/upload', methods=['POST'])
@admin_required
async def upload_client_dropdowns(client_id: str):
"""Upload a per-client .xlsx dropdown file. Falls back to global if not set."""
if not get_client_by_id(client_id):
if not await get_client_by_id(client_id):
return jsonify({'error': 'client_not_found'}), 404
files = await request.files
form = await request.form
categories, err = await _parse_uploaded_xlsx(files, form)
if err:
return jsonify({'error': err}), 400
save_dropdowns(categories, client_id=client_id)
# Mark client as having custom dropdowns
clients = load_clients()
for c in clients:
if c['id'] == client_id:
c['hasCustomDropdowns'] = True
break
from ..api.clients import _save_clients
_save_clients(clients)
await save_dropdowns(categories, client_id=client_id)
await set_client_custom_dropdowns(client_id, True)
active_count = sum(1 for c in categories if c['status'] == 'Active')
return jsonify({'success': True, 'total': len(categories), 'active': active_count,
'archived': len(categories) - active_count})
@ -179,7 +164,6 @@ async def upload_client_dropdowns(client_id: str):
@admin_bp.route('/clients/<client_id>/dropdowns/preview', methods=['POST'])
@admin_required
async def preview_client_dropdowns(client_id: str):
"""Preview per-client dropdown file without saving."""
files = await request.files
form = await request.form
categories, err = await _parse_uploaded_xlsx(files, form)
@ -191,18 +175,8 @@ async def preview_client_dropdowns(client_id: str):
@admin_bp.route('/clients/<client_id>/dropdowns', methods=['DELETE'])
@admin_required
async def delete_client_dropdowns(client_id: str):
"""Remove per-client dropdown override — reverts to global."""
from ..config_runtime import server_config
path = os.path.join(server_config.CLIENTS_DROPDOWNS_DIR, f"{client_id}.json")
if os.path.exists(path):
os.remove(path)
clients = load_clients()
for c in clients:
if c['id'] == client_id:
c['hasCustomDropdowns'] = False
break
from ..api.clients import _save_clients
_save_clients(clients)
await save_dropdowns([], client_id=client_id)
await set_client_custom_dropdowns(client_id, False)
return jsonify({'success': True})
@ -211,7 +185,8 @@ async def delete_client_dropdowns(client_id: str):
@admin_bp.route('/export-template', methods=['GET'])
@admin_required
async def get_global_export_template():
return jsonify({'template': load_export_template(), 'fields': INTERNAL_FIELDS})
template = await load_export_template()
return jsonify({'template': template, 'fields': INTERNAL_FIELDS})
@admin_bp.route('/export-template/detect', methods=['POST'])
@ -224,8 +199,7 @@ async def detect_global_export_template():
if not (file.filename or '').lower().endswith('.csv'):
return jsonify({'error': 'Only .csv files accepted'}), 400
try:
data = file.read()
result = detect_csv_template(data)
result = detect_csv_template(file.read())
result['fields'] = INTERNAL_FIELDS
return jsonify(result)
except Exception as e:
@ -239,31 +213,25 @@ async def save_global_export_template():
template = body.get('template')
if not template or not isinstance(template, list):
return jsonify({'error': 'invalid_template'}), 400
save_export_template(template)
await save_export_template(template)
return jsonify({'success': True, 'columns': len(template)})
@admin_bp.route('/export-template', methods=['DELETE'])
@admin_required
async def delete_global_export_template():
from ..config_runtime import server_config
if os.path.exists(server_config.EXPORT_TEMPLATE_FILE):
os.remove(server_config.EXPORT_TEMPLATE_FILE)
await delete_export_template()
return jsonify({'success': True})
@admin_bp.route('/clients/<client_id>/export-template', methods=['GET'])
@admin_required
async def get_client_export_template(client_id: str):
if not get_client_by_id(client_id):
if not await get_client_by_id(client_id):
return jsonify({'error': 'client_not_found'}), 404
path = _client_template_path(client_id)
has_custom = os.path.exists(path)
return jsonify({
'template': load_export_template(client_id),
'hasCustomTemplate': has_custom,
'fields': INTERNAL_FIELDS,
})
has_custom = await has_export_template(client_id=client_id)
template = await load_export_template(client_id=client_id)
return jsonify({'template': template, 'hasCustomTemplate': has_custom, 'fields': INTERNAL_FIELDS})
@admin_bp.route('/clients/<client_id>/export-template/detect', methods=['POST'])
@ -276,8 +244,7 @@ async def detect_client_export_template(client_id: str):
if not (file.filename or '').lower().endswith('.csv'):
return jsonify({'error': 'Only .csv files accepted'}), 400
try:
data = file.read()
result = detect_csv_template(data)
result = detect_csv_template(file.read())
result['fields'] = INTERNAL_FIELDS
return jsonify(result)
except Exception as e:
@ -287,20 +254,18 @@ async def detect_client_export_template(client_id: str):
@admin_bp.route('/clients/<client_id>/export-template', methods=['POST'])
@admin_required
async def save_client_export_template(client_id: str):
if not get_client_by_id(client_id):
if not await get_client_by_id(client_id):
return jsonify({'error': 'client_not_found'}), 404
body = await request.get_json() or {}
template = body.get('template')
if not template or not isinstance(template, list):
return jsonify({'error': 'invalid_template'}), 400
save_export_template(template, client_id=client_id)
await save_export_template(template, client_id=client_id)
return jsonify({'success': True, 'columns': len(template)})
@admin_bp.route('/clients/<client_id>/export-template', methods=['DELETE'])
@admin_required
async def delete_client_export_template(client_id: str):
path = _client_template_path(client_id)
if os.path.exists(path):
os.remove(path)
await delete_export_template(client_id=client_id)
return jsonify({'success': True})

View file

@ -1,6 +1,5 @@
"""
AI command API processes natural language commands against a sheet.
Port of the 'command' action from ac-helper/api.php using Gemini via aiohttp.
"""
import json
@ -21,7 +20,6 @@ logger = logging.getLogger(__name__)
ai_bp = Blueprint('ai', __name__, url_prefix='/api/sheets')
# Speech-to-text correction map
SPEECH_CORRECTIONS = {
'delivery balls': 'deliverables',
'delivery ball': 'deliverable',
@ -58,8 +56,8 @@ def _preprocess(command: str) -> str:
return cmd
def _build_hierarchy_rules(client_id: str = None) -> str:
categories = _load_dropdowns(client_id)
async def _build_hierarchy_rules(client_id: str = None) -> str:
categories = await _load_dropdowns(client_id)
lines = []
for cat in categories:
if cat.get('status') != 'Active':
@ -102,14 +100,14 @@ async def run_command(sheet_id: str):
if not raw_command:
return jsonify({'error': 'empty_command'}), 400
data = load_sheet_data(user_id, sheet_id)
data = await load_sheet_data(user_id, sheet_id)
if data is None:
return jsonify({'error': 'sheet_not_found'}), 404
command = _preprocess(raw_command)
template = _load_prompt_template()
client_id = get_sheet_client_id(user_id, sheet_id)
hierarchy = _build_hierarchy_rules(client_id)
client_id = await get_sheet_client_id(user_id, sheet_id)
hierarchy = await _build_hierarchy_rules(client_id)
prompt = template.format(
current_date=date.today().isoformat(),
@ -154,7 +152,7 @@ async def run_command(sheet_id: str):
item.setdefault('Status', 'Booked')
item.setdefault('Quantity', 1)
data.append(item)
update_sheet(user_id, sheet_id, data)
await update_sheet(user_id, sheet_id, data)
return jsonify({'success': True, 'operation': 'create', 'count': len(items), 'data': data})
elif operation == 'update':
@ -165,7 +163,7 @@ async def run_command(sheet_id: str):
if not target_ids or row.get('Number') in target_ids:
row.update(values)
count += 1
update_sheet(user_id, sheet_id, data)
await update_sheet(user_id, sheet_id, data)
return jsonify({'success': True, 'operation': 'update', 'count': count, 'data': data})
elif operation == 'batch_update':
@ -179,7 +177,7 @@ async def run_command(sheet_id: str):
row.update(vals)
count += 1
break
update_sheet(user_id, sheet_id, data)
await update_sheet(user_id, sheet_id, data)
return jsonify({'success': True, 'operation': 'batch_update', 'count': count, 'data': data})
elif operation == 'question':

View file

@ -7,7 +7,7 @@ from quart import Blueprint, jsonify, request
from ..auth.msal_auth import msal_auth
from ..auth.middleware import auth_required, get_current_user
from ..auth.user_store import upsert_user
from ..auth.user_store import upsert_user, get_user
logger = logging.getLogger(__name__)
@ -31,7 +31,7 @@ async def validate_token():
if not user_info:
return jsonify({'valid': False, 'error': 'invalid_token'}), 401
stored = upsert_user(user_info['oid'], user_info.get('preferred_username', ''), user_info.get('name', ''))
stored = await upsert_user(user_info['oid'], user_info.get('preferred_username', ''), user_info.get('name', ''))
return jsonify({
'valid': True,
'user': {
@ -50,9 +50,8 @@ async def validate_token():
@auth_required
async def me():
"""Return current user profile including role."""
from ..auth.user_store import get_user as get_stored_user
user = await get_current_user()
stored = get_stored_user(user['oid']) or {}
stored = await get_user(user['oid']) or {}
return jsonify({
'id': user['oid'],
'email': user.get('preferred_username'),

View file

@ -1,52 +1,58 @@
"""
Client management API.
Clients group sheets and have their own dropdown (category/media) hierarchy.
Client management API PostgreSQL-backed.
"""
import json
import logging
import os
import random
import time
import random
from datetime import datetime, timezone
from quart import Blueprint, jsonify, request
from ..auth.middleware import auth_required, admin_required
from ..config_runtime import server_config
from ..db.pool import get_pool
logger = logging.getLogger(__name__)
clients_bp = Blueprint('clients', __name__, url_prefix='/api/clients')
def load_clients() -> list:
path = server_config.CLIENTS_FILE
if not os.path.exists(path):
return []
try:
with open(path, 'r') as f:
return json.load(f)
except Exception:
return []
async def load_clients() -> list:
pool = get_pool()
async with pool.acquire() as conn:
rows = await conn.fetch('SELECT * FROM clients ORDER BY name')
return [_row_to_dict(r) for r in rows]
def _save_clients(clients: list):
with open(server_config.CLIENTS_FILE, 'w') as f:
json.dump(clients, f, indent=2)
async def get_client_by_id(client_id: str) -> dict | None:
pool = get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow('SELECT * FROM clients WHERE id = $1', client_id)
return _row_to_dict(row) if row else None
def get_client_by_id(client_id: str) -> dict | None:
for c in load_clients():
if c['id'] == client_id:
return c
return None
async def set_client_custom_dropdowns(client_id: str, value: bool):
pool = get_pool()
async with pool.acquire() as conn:
await conn.execute(
'UPDATE clients SET has_custom_dropdowns = $2 WHERE id = $1',
client_id, value
)
def _row_to_dict(row) -> dict:
return {
'id': row['id'],
'name': row['name'],
'hasCustomDropdowns': row['has_custom_dropdowns'],
'created': row['created_at'].isoformat() if row['created_at'] else None,
}
@clients_bp.route('', methods=['GET'])
@auth_required
async def list_clients():
return jsonify({'clients': load_clients()})
return jsonify({'clients': await load_clients()})
@clients_bp.route('', methods=['POST'])
@ -58,30 +64,24 @@ async def create_client():
return jsonify({'error': 'name_required', 'message': 'Client name is required'}), 400
client_id = f"client_{int(time.time())}{random.randint(100, 999)}"
client = {
'id': client_id,
'name': name,
'created': datetime.now(timezone.utc).isoformat(),
'hasCustomDropdowns': False,
}
clients = load_clients()
clients.append(client)
_save_clients(clients)
return jsonify({'client': client}), 201
pool = get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow('''
INSERT INTO clients (id, name, has_custom_dropdowns)
VALUES ($1, $2, FALSE)
RETURNING *
''', client_id, name)
return jsonify({'client': _row_to_dict(row)}), 201
@clients_bp.route('/<client_id>', methods=['DELETE'])
@admin_required
async def delete_client(client_id: str):
clients = load_clients()
clients = [c for c in clients if c['id'] != client_id]
_save_clients(clients)
# Remove client-specific dropdown file if present
dropdown_path = os.path.join(server_config.CLIENTS_DROPDOWNS_DIR, f"{client_id}.json")
if os.path.exists(dropdown_path):
os.remove(dropdown_path)
pool = get_pool()
async with pool.acquire() as conn:
# Cascades to dropdown_categories via FK; export templates by scope
await conn.execute('DELETE FROM clients WHERE id = $1', client_id)
await conn.execute("DELETE FROM export_templates WHERE scope = $1", f'client:{client_id}')
return jsonify({'success': True})
@ -89,15 +89,13 @@ async def delete_client(client_id: str):
@admin_required
async def update_client(client_id: str):
body = await request.get_json() or {}
clients = load_clients()
updated = None
for c in clients:
if c['id'] == client_id:
if 'name' in body:
c['name'] = body['name'].strip()
updated = c
break
if not updated:
return jsonify({'error': 'not_found'}), 404
_save_clients(clients)
return jsonify({'client': updated})
pool = get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow('SELECT * FROM clients WHERE id = $1', client_id)
if not row:
return jsonify({'error': 'not_found'}), 404
name = body.get('name', row['name']).strip() or row['name']
row = await conn.fetchrow(
'UPDATE clients SET name = $2 WHERE id = $1 RETURNING *', client_id, name
)
return jsonify({'client': _row_to_dict(row)})

View file

@ -1,15 +1,12 @@
"""
Dropdown data API category / media type hierarchy.
Data is loaded from dropdowns.json (seeded from Excel, updatable by admin).
Data stored in PostgreSQL. Seeded from embedded SEED_CATEGORIES if DB is empty.
"""
import json
import logging
import os
from quart import Blueprint, jsonify, request
from ..config_runtime import server_config
from ..db.pool import get_pool
logger = logging.getLogger(__name__)
@ -144,50 +141,62 @@ SEED_CATEGORIES = [
]
def _load_dropdowns(client_id: str = None) -> list:
async def _load_dropdowns(client_id: str = None) -> list:
"""
Load dropdowns. If client_id is given, try the per-client file first,
then fall back to the global file, then SEED_CATEGORIES.
All files use the same schema: [{name, status, mediaTypes}].
Load categories from DB.
If client_id is given, tries per-client rows first, falls back to global.
"""
if client_id:
client_path = os.path.join(server_config.CLIENTS_DROPDOWNS_DIR, f"{client_id}.json")
if os.path.exists(client_path):
try:
with open(client_path, 'r') as f:
return json.load(f)
except Exception:
pass
pool = get_pool()
async with pool.acquire() as conn:
if client_id:
rows = await conn.fetch(
'SELECT name, status, media_types FROM dropdown_categories WHERE client_id = $1 ORDER BY name',
client_id
)
if rows:
return [_row_to_cat(r) for r in rows]
path = server_config.DROPDOWNS_FILE
if os.path.exists(path):
try:
with open(path, 'r') as f:
return json.load(f)
except Exception:
pass
return SEED_CATEGORIES
# Global (client_id IS NULL)
rows = await conn.fetch(
'SELECT name, status, media_types FROM dropdown_categories WHERE client_id IS NULL ORDER BY name'
)
return [_row_to_cat(r) for r in rows]
def save_dropdowns(categories: list, client_id: str = None):
"""Save dropdowns. Pass client_id to save a per-client override."""
if client_id:
path = os.path.join(server_config.CLIENTS_DROPDOWNS_DIR, f"{client_id}.json")
else:
path = server_config.DROPDOWNS_FILE
with open(path, 'w') as f:
json.dump(categories, f, indent=2)
async def save_dropdowns(categories: list, client_id: str = None):
"""Replace all categories for the given scope (global if client_id is None)."""
pool = get_pool()
async with pool.acquire() as conn:
async with conn.transaction():
if client_id:
await conn.execute(
'DELETE FROM dropdown_categories WHERE client_id = $1', client_id
)
else:
await conn.execute(
'DELETE FROM dropdown_categories WHERE client_id IS NULL'
)
for cat in categories:
await conn.execute('''
INSERT INTO dropdown_categories (client_id, name, status, media_types)
VALUES ($1, $2, $3, $4)
''', client_id, cat['name'], cat.get('status', 'Active'), cat.get('mediaTypes', []))
def _row_to_cat(row) -> dict:
return {
'name': row['name'],
'status': row['status'],
'mediaTypes': row['media_types'] if row['media_types'] else [],
}
# ── Sync helpers (file parsing — no DB involved) ────────────────────────────
def detect_excel_mapping(file_bytes: bytes) -> dict:
"""
Read the first row of an .xlsx and auto-detect column mapping for
name/status/media fields. Returns:
{
headers: [...], # all header strings from row 1
mapping: {name_col, status_col, media_col}, # 0-based indices
sample: [...] # up to 5 parsed rows using detected mapping
}
Read the first row of an .xlsx and auto-detect column mapping.
Returns: {headers, mapping: {name_col, status_col, media_col}, sample}
"""
import openpyxl
from io import BytesIO
@ -222,10 +231,7 @@ def detect_excel_mapping(file_bytes: bytes) -> dict:
def parse_excel_dropdowns(file_bytes: bytes, mapping: dict = None) -> list:
"""Parse an .xlsx file into [{name, status, mediaTypes}] list.
Default columns: A=Category name (0), E=Status (4), G=Media types (6).
Pass mapping={'name_col': int, 'status_col': int, 'media_col': int} to override.
"""
"""Parse an .xlsx into [{name, status, mediaTypes}]."""
import openpyxl
from io import BytesIO
wb = openpyxl.load_workbook(BytesIO(file_bytes))
@ -246,10 +252,12 @@ def parse_excel_dropdowns(file_bytes: bytes, mapping: dict = None) -> list:
return categories
# ── Routes ───────────────────────────────────────────────────────────────────
@dropdowns_bp.route('/categories', methods=['GET'])
async def get_categories():
client_id = request.args.get('client_id') or None
categories = _load_dropdowns(client_id)
categories = await _load_dropdowns(client_id)
active_only = request.args.get('active', 'true').lower() == 'true'
if active_only:
categories = [c for c in categories if c.get('status') == 'Active']
@ -260,4 +268,4 @@ async def get_categories():
async def get_all():
"""Full dropdown data including archived, for admin preview."""
client_id = request.args.get('client_id') or None
return jsonify({'categories': _load_dropdowns(client_id)})
return jsonify({'categories': await _load_dropdowns(client_id)})

View file

@ -1,19 +1,18 @@
"""
CSV export Activation Calendar format.
Supports custom export templates: client > user > global > built-in default.
Template data stored in PostgreSQL export_templates table.
"""
import csv
import io
import json
import logging
import os
from quart import Blueprint, make_response, jsonify, request
from ..auth.middleware import auth_required, get_user_id
from ..sheets.manager import load_sheet_data, get_sheet_client_id
from ..config_runtime import server_config
from ..db.pool import get_pool
logger = logging.getLogger(__name__)
@ -48,49 +47,77 @@ _DEFAULT_TEMPLATE = [
]
def _client_template_path(client_id: str) -> str:
return os.path.join(server_config.CLIENTS_DROPDOWNS_DIR, f'{client_id}_export.json')
def _user_template_path(user_id: str) -> str:
return os.path.join(server_config.USER_EXPORT_TEMPLATES_DIR, f'{user_id}.json')
def _load_json(path: str):
try:
with open(path) as f:
return json.load(f)
except Exception:
return None
def load_export_template(client_id: str = None, user_id: str = None) -> list:
async def load_export_template(client_id: str = None, user_id: str = None) -> list:
"""
Priority: client template user template global template built-in default.
"""
if client_id:
t = _load_json(_client_template_path(client_id))
if t:
return t
if user_id:
t = _load_json(_user_template_path(user_id))
if t:
return t
t = _load_json(server_config.EXPORT_TEMPLATE_FILE)
if t:
return t
pool = get_pool()
async with pool.acquire() as conn:
if client_id:
row = await conn.fetchrow(
'SELECT columns FROM export_templates WHERE scope = $1', f'client:{client_id}'
)
if row:
return row['columns']
if user_id:
row = await conn.fetchrow(
'SELECT columns FROM export_templates WHERE scope = $1', f'user:{user_id}'
)
if row:
return row['columns']
row = await conn.fetchrow(
"SELECT columns FROM export_templates WHERE scope = 'global'"
)
if row:
return row['columns']
return _DEFAULT_TEMPLATE
def save_export_template(template: list, client_id: str = None, user_id: str = None):
async def save_export_template(template: list, client_id: str = None, user_id: str = None):
if client_id:
path = _client_template_path(client_id)
scope = f'client:{client_id}'
elif user_id:
path = _user_template_path(user_id)
scope = f'user:{user_id}'
else:
path = server_config.EXPORT_TEMPLATE_FILE
with open(path, 'w') as f:
json.dump(template, f, indent=2)
scope = 'global'
pool = get_pool()
async with pool.acquire() as conn:
await conn.execute('''
INSERT INTO export_templates (scope, columns)
VALUES ($1, $2)
ON CONFLICT (scope) DO UPDATE SET columns = $2, updated_at = NOW()
''', scope, template)
async def delete_export_template(client_id: str = None, user_id: str = None):
if client_id:
scope = f'client:{client_id}'
elif user_id:
scope = f'user:{user_id}'
else:
scope = 'global'
pool = get_pool()
async with pool.acquire() as conn:
await conn.execute('DELETE FROM export_templates WHERE scope = $1', scope)
async def has_export_template(client_id: str = None, user_id: str = None) -> bool:
if client_id:
scope = f'client:{client_id}'
elif user_id:
scope = f'user:{user_id}'
else:
scope = 'global'
pool = get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow('SELECT 1 FROM export_templates WHERE scope = $1', scope)
return row is not None
def detect_csv_template(file_bytes: bytes) -> dict:
@ -153,12 +180,12 @@ def _build_csv(data: list, template: list) -> str:
@auth_required
async def export_csv(sheet_id: str):
user_id = get_user_id()
data = load_sheet_data(user_id, sheet_id)
data = await load_sheet_data(user_id, sheet_id)
if data is None:
return {'error': 'not_found'}, 404
client_id = get_sheet_client_id(user_id, sheet_id)
template = load_export_template(client_id=client_id, user_id=user_id)
client_id = await get_sheet_client_id(user_id, sheet_id)
template = await load_export_template(client_id=client_id, user_id=user_id)
csv_content = _build_csv(data, template)
response = await make_response(csv_content)
@ -176,9 +203,8 @@ user_export_bp = Blueprint('user_export', __name__, url_prefix='/api/export')
@auth_required
async def get_user_template():
user_id = get_user_id()
path = _user_template_path(user_id)
has_custom = os.path.exists(path)
template = load_export_template(user_id=user_id)
has_custom = await has_export_template(user_id=user_id)
template = await load_export_template(user_id=user_id)
return jsonify({'template': template, 'hasCustom': has_custom, 'fields': INTERNAL_FIELDS})
@ -207,7 +233,7 @@ async def save_user_template():
template = body.get('template')
if not template or not isinstance(template, list):
return jsonify({'error': 'invalid_template'}), 400
save_export_template(template, user_id=user_id)
await save_export_template(template, user_id=user_id)
return jsonify({'success': True, 'columns': len(template)})
@ -215,7 +241,5 @@ async def save_user_template():
@auth_required
async def delete_user_template():
user_id = get_user_id()
path = _user_template_path(user_id)
if os.path.exists(path):
os.remove(path)
await delete_export_template(user_id=user_id)
return jsonify({'success': True})

View file

@ -1,5 +1,5 @@
"""
Sheet CRUD API port of ac-helper api.php sheet management.
Sheet CRUD API PostgreSQL-backed.
All routes scoped to the authenticated user.
"""
@ -22,7 +22,7 @@ sheets_bp = Blueprint('sheets', __name__, url_prefix='/api/sheets')
@auth_required
async def list_sheets():
user_id = get_user_id()
sheets = get_user_sheets(user_id)
sheets = await get_user_sheets(user_id)
return jsonify({'sheets': sheets})
@ -34,18 +34,17 @@ async def create_new_sheet():
name = body.get('name', '')
data = body.get('data', [])
client_id = body.get('client_id', '')
sheet = create_sheet(user_id, name, data, client_id)
sheet = await create_sheet(user_id, name, data, client_id)
return jsonify({'sheet': sheet}), 201
@sheets_bp.route('/<sheet_id>/client', methods=['PATCH'])
@auth_required
async def update_sheet_client(sheet_id: str):
"""Update the client associated with an existing sheet."""
user_id = get_user_id()
body = await request.get_json() or {}
client_id = body.get('client_id', '')
set_sheet_client_id(user_id, sheet_id, client_id)
await set_sheet_client_id(user_id, sheet_id, client_id)
return jsonify({'success': True})
@ -53,7 +52,7 @@ async def update_sheet_client(sheet_id: str):
@auth_required
async def get_sheet(sheet_id: str):
user_id = get_user_id()
data = load_sheet_data(user_id, sheet_id)
data = await load_sheet_data(user_id, sheet_id)
if data is None:
return jsonify({'error': 'not_found'}), 404
return jsonify({'data': data})
@ -65,7 +64,7 @@ async def update_sheet_data(sheet_id: str):
user_id = get_user_id()
body = await request.get_json() or {}
data = body.get('data', [])
update_sheet(user_id, sheet_id, data)
await update_sheet(user_id, sheet_id, data)
return jsonify({'success': True})
@ -73,7 +72,7 @@ async def update_sheet_data(sheet_id: str):
@auth_required
async def delete_sheet_route(sheet_id: str):
user_id = get_user_id()
delete_sheet(user_id, sheet_id)
await delete_sheet(user_id, sheet_id)
return jsonify({'success': True})
@ -83,7 +82,7 @@ async def rename_sheet_route(sheet_id: str):
user_id = get_user_id()
body = await request.get_json() or {}
name = body.get('name', '')
success = rename_sheet(user_id, sheet_id, name)
success = await rename_sheet(user_id, sheet_id, name)
if not success:
return jsonify({'error': 'not_found'}), 404
return jsonify({'success': True})
@ -93,7 +92,7 @@ async def rename_sheet_route(sheet_id: str):
@auth_required
async def duplicate_sheet_route(sheet_id: str):
user_id = get_user_id()
sheet = duplicate_sheet(user_id, sheet_id)
sheet = await duplicate_sheet(user_id, sheet_id)
if sheet is None:
return jsonify({'error': 'not_found'}), 404
return jsonify({'sheet': sheet}), 201
@ -102,16 +101,12 @@ async def duplicate_sheet_route(sheet_id: str):
@sheets_bp.route('/<sheet_id>/import', methods=['POST'])
@auth_required
async def import_deliverables(sheet_id: str):
"""
Import a list of deliverables into an existing sheet.
Body: { "deliverables": [...], "mode": "append" | "replace" }
"""
user_id = get_user_id()
body = await request.get_json() or {}
incoming = body.get('deliverables', [])
mode = body.get('mode', 'append')
existing = load_sheet_data(user_id, sheet_id)
existing = await load_sheet_data(user_id, sheet_id)
if existing is None:
return jsonify({'error': 'not_found'}), 404
@ -121,11 +116,10 @@ async def import_deliverables(sheet_id: str):
row['Number'] = generate_next_id(base)
row.setdefault('Status', 'Booked')
row.setdefault('Quantity', 1)
# Strip internal brief metadata fields
for k in list(row.keys()):
if k.startswith('_'):
del row[k]
base.append(row)
update_sheet(user_id, sheet_id, base)
await update_sheet(user_id, sheet_id, base)
return jsonify({'success': True, 'imported': len(incoming), 'total': len(base)})

View file

@ -19,6 +19,7 @@ from .auth import msal_auth
from .jobs import JobManager
from .ws import ws_manager
from .runners.job_runner import start_background_workers, stop_background_workers
from .db import init_pool, close_pool
# API blueprints
from .api.auth import auth_bp
@ -65,9 +66,6 @@ def create_app() -> Quart:
server_config.ensure_directories()
# Seed dropdowns.json from embedded data if not present
_seed_dropdowns_if_needed()
job_manager = JobManager.get_instance()
# Register blueprints
@ -80,6 +78,10 @@ def create_app() -> Quart:
@app.before_serving
async def startup():
logger.info("Starting AC Tool server...")
# Connect to PostgreSQL and apply schema
await init_pool(server_config.DATABASE_URL)
await _apply_schema()
await _seed_dropdowns_if_needed()
await ws_manager.start_background_tasks()
global background_workers
background_workers = await start_background_workers(
@ -95,6 +97,7 @@ def create_app() -> Quart:
if background_workers:
await stop_background_workers(background_workers)
await ws_manager.stop_background_tasks()
await close_pool()
@app.route('/health')
async def health():
@ -170,6 +173,32 @@ def create_app() -> Quart:
return app
async def _apply_schema():
"""Create tables if they don't exist (idempotent)."""
from .db.pool import get_pool
schema_path = os.path.join(os.path.dirname(__file__), 'db', 'schema.sql')
with open(schema_path, 'r') as f:
sql = f.read()
pool = get_pool()
async with pool.acquire() as conn:
await conn.execute(sql)
logger.info("Database schema applied")
async def _seed_dropdowns_if_needed():
"""Seed global dropdown categories if the DB table is empty."""
from .db.pool import get_pool
pool = get_pool()
async with pool.acquire() as conn:
count = await conn.fetchval(
'SELECT COUNT(*) FROM dropdown_categories WHERE client_id IS NULL'
)
if count == 0:
from .api.dropdowns import SEED_CATEGORIES, save_dropdowns
await save_dropdowns(SEED_CATEGORIES)
logger.info(f"Seeded {len(SEED_CATEGORIES)} global dropdown categories")
def _register_spa(app: Quart):
"""Serve the Vite-built React frontend for all non-API routes."""
import os
@ -191,15 +220,6 @@ def _register_spa(app: Quart):
return await send_from_directory(dist, 'index.html')
def _seed_dropdowns_if_needed():
"""Write initial dropdowns.json from embedded seed data if file doesn't exist."""
path = server_config.DROPDOWNS_FILE
if os.path.exists(path):
return
from .api.dropdowns import SEED_CATEGORIES, save_dropdowns
save_dropdowns(SEED_CATEGORIES)
logger.info(f"Seeded {len(SEED_CATEGORIES)} categories to {path}")
async def periodic_cleanup(job_manager: JobManager):
while True:

View file

@ -21,7 +21,6 @@ def _check_emergency_token(token: str) -> Optional[Dict[str, Any]]:
et = server_config.EMERGENCY_TOKEN
if not et or not token:
return None
# Constant-time compare to prevent timing attacks
import hmac
if hmac.compare_digest(token, et):
email = server_config.EMERGENCY_USER_EMAIL
@ -39,11 +38,9 @@ async def _extract_token_user() -> Optional[Dict[str, Any]]:
if auth_header.startswith('Bearer '):
token = auth_header[7:]
else:
# Fallback for browser download links (window.open) which can't set headers
token = request.args.get('_token', '')
if not token:
return None
# Check emergency bypass before attempting MSAL validation
emergency = _check_emergency_token(token)
if emergency:
return emergency
@ -52,14 +49,14 @@ async def _extract_token_user() -> Optional[Dict[str, Any]]:
async def _resolve_user(token_user: Dict) -> Dict:
"""
Merge token claims with our users.json store.
Merge token claims with DB user store.
Creates the user record on first login; enriches token info with role.
"""
user_id = token_user['oid']
email = token_user.get('preferred_username', '')
name = token_user.get('name', '')
stored = upsert_user(user_id, email, name)
stored = await upsert_user(user_id, email, name)
return {**token_user, 'role': stored.get('role', 'user'), 'active': stored.get('active', True)}
@ -76,8 +73,7 @@ def auth_required(f: Callable) -> Callable:
'role': role,
'active': True,
}
# Ensure dev user exists in store
upsert_user(
await upsert_user(
server_config.DEV_USER_ID,
server_config.DEV_USER_EMAIL,
server_config.DEV_USER_NAME,
@ -113,7 +109,7 @@ def admin_required(f: Callable) -> Callable:
'role': role,
'active': True,
}
upsert_user(
await upsert_user(
server_config.DEV_USER_ID,
server_config.DEV_USER_EMAIL,
server_config.DEV_USER_NAME,

View file

@ -1,96 +1,86 @@
"""
User store manages users.json (roles, active status).
User store PostgreSQL-backed.
Keyed by Azure AD oid (object ID).
"""
import json
import logging
import os
from datetime import datetime, timezone
from typing import Dict, Optional
from ..config_runtime import server_config
from ..db.pool import get_pool
logger = logging.getLogger(__name__)
_LOCK_FILE = server_config.USERS_FILE + '.lock'
def _row_to_dict(row) -> Dict:
return {
'id': row['id'],
'email': row['email'],
'name': row['name'],
'role': row['role'],
'active': row['active'],
'created': row['created_at'].isoformat() if row['created_at'] else None,
'last_seen': row['last_seen_at'].isoformat() if row['last_seen_at'] else None,
}
def _load() -> Dict:
path = server_config.USERS_FILE
if not os.path.exists(path):
return {}
try:
with open(path, 'r') as f:
return json.load(f)
except Exception:
return {}
async def get_user(user_id: str) -> Optional[Dict]:
pool = get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow('SELECT * FROM users WHERE id = $1', user_id)
return _row_to_dict(row) if row else None
def _save(data: Dict):
path = server_config.USERS_FILE
with open(path, 'w') as f:
json.dump(data, f, indent=2)
def get_user(user_id: str) -> Optional[Dict]:
users = _load()
return users.get(user_id)
def upsert_user(user_id: str, email: str, name: str, role: Optional[str] = None) -> Dict:
async def upsert_user(user_id: str, email: str, name: str, role: Optional[str] = None) -> Dict:
"""
Create or update user. On creation defaults to 'user' role,
unless the email matches ADMIN_EMAIL env var (gets 'admin').
Create or update user. On first creation, grants admin if email is in ADMIN_EMAILS.
"""
users = _load()
existing = users.get(user_id)
pool = get_pool()
async with pool.acquire() as conn:
existing = await conn.fetchrow('SELECT * FROM users WHERE id = $1', user_id)
if existing is None:
# First login — admin if email is in ADMIN_EMAILS list
default_role = 'admin' if email and email.lower() in server_config.ADMIN_EMAILS else 'user'
user = {
'id': user_id,
'email': email,
'name': name,
'role': role or default_role,
'active': True,
'created': datetime.now(timezone.utc).isoformat(),
'last_seen': datetime.now(timezone.utc).isoformat(),
}
else:
user = {**existing}
user['email'] = email or existing.get('email', '')
user['name'] = name or existing.get('name', '')
user['last_seen'] = datetime.now(timezone.utc).isoformat()
if role is not None:
user['role'] = role
if existing is None:
default_role = 'admin' if email and email.lower() in server_config.ADMIN_EMAILS else 'user'
row = await conn.fetchrow('''
INSERT INTO users (id, email, name, role, active)
VALUES ($1, $2, $3, $4, TRUE)
RETURNING *
''', user_id, email or '', name or '', role or default_role)
else:
new_role = role if role is not None else existing['role']
row = await conn.fetchrow('''
UPDATE users
SET email = $2, name = $3, role = $4, last_seen_at = NOW()
WHERE id = $1
RETURNING *
''', user_id, email or existing['email'], name or existing['name'], new_role)
users[user_id] = user
_save(users)
return user
return _row_to_dict(row)
def list_users() -> list:
users = _load()
return sorted(users.values(), key=lambda u: u.get('last_seen', ''), reverse=True)
async def list_users() -> list:
pool = get_pool()
async with pool.acquire() as conn:
rows = await conn.fetch('SELECT * FROM users ORDER BY last_seen_at DESC')
return [_row_to_dict(r) for r in rows]
def set_role(user_id: str, role: str) -> Optional[Dict]:
async def set_role(user_id: str, role: str) -> Optional[Dict]:
if role not in ('user', 'admin'):
return None
users = _load()
if user_id not in users:
return None
users[user_id]['role'] = role
_save(users)
return users[user_id]
pool = get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow(
'UPDATE users SET role = $2 WHERE id = $1 RETURNING *', user_id, role
)
return _row_to_dict(row) if row else None
def set_active(user_id: str, active: bool) -> Optional[Dict]:
users = _load()
if user_id not in users:
return None
users[user_id]['active'] = active
_save(users)
return users[user_id]
async def set_active(user_id: str, active: bool) -> Optional[Dict]:
pool = get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow(
'UPDATE users SET active = $2 WHERE id = $1 RETURNING *', user_id, active
)
return _row_to_dict(row) if row else None

View file

@ -74,6 +74,9 @@ class ServerConfig:
GEMINI_API_KEY: str = os.getenv('GEMINI_API_KEY', '')
GEMINI_MODEL: str = os.getenv('GEMINI_MODEL', 'gemini-3-flash-preview')
# PostgreSQL
DATABASE_URL: str = os.getenv('DATABASE_URL', 'postgresql://achelper:achelper@localhost:5432/achelper')
# Data paths — mounted as Docker volume
DATA_DIR: str = os.getenv(
'DATA_DIR',

View file

@ -0,0 +1,4 @@
# Database module — asyncpg PostgreSQL connection pool
from .pool import init_pool, close_pool, get_pool
__all__ = ['init_pool', 'close_pool', 'get_pool']

View file

@ -0,0 +1,177 @@
"""
One-time migration: import existing JSON file data into PostgreSQL.
Run inside the container:
python -m server.db.migrate_json
Or from the project root:
cd backend && python -m server.db.migrate_json
"""
import asyncio
import json
import logging
import os
import sys
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def migrate():
# Import here to avoid circular issues at module level
from ..config_runtime import server_config
from ..db.pool import init_pool, get_pool
dsn = server_config.DATABASE_URL
if not dsn:
logger.error("DATABASE_URL not set — nothing to migrate")
sys.exit(1)
await init_pool(dsn)
pool = get_pool()
async with pool.acquire() as conn:
# ── Users ─────────────────────────────────────────────────────────────
users_file = server_config.USERS_FILE
if os.path.exists(users_file):
with open(users_file) as f:
users = json.load(f)
count = 0
for uid, u in users.items():
await conn.execute('''
INSERT INTO users (id, email, name, role, active, created_at, last_seen_at)
VALUES ($1, $2, $3, $4, $5,
COALESCE($6::timestamptz, NOW()),
COALESCE($7::timestamptz, NOW()))
ON CONFLICT (id) DO NOTHING
''', uid, u.get('email', ''), u.get('name', ''),
u.get('role', 'user'), u.get('active', True),
u.get('created'), u.get('last_seen'))
count += 1
logger.info(f"Migrated {count} users")
# ── Clients ────────────────────────────────────────────────────────────
clients_file = server_config.CLIENTS_FILE
if os.path.exists(clients_file):
with open(clients_file) as f:
clients = json.load(f)
for c in clients:
await conn.execute('''
INSERT INTO clients (id, name, has_custom_dropdowns, created_at)
VALUES ($1, $2, $3, COALESCE($4::timestamptz, NOW()))
ON CONFLICT (id) DO NOTHING
''', c['id'], c['name'], c.get('hasCustomDropdowns', False), c.get('created'))
logger.info(f"Migrated {len(clients)} clients")
# ── Global dropdowns ───────────────────────────────────────────────────
dropdowns_file = server_config.DROPDOWNS_FILE
if os.path.exists(dropdowns_file):
with open(dropdowns_file) as f:
categories = json.load(f)
# Delete existing global and re-insert (idempotent)
await conn.execute("DELETE FROM dropdown_categories WHERE client_id IS NULL")
for cat in categories:
await conn.execute('''
INSERT INTO dropdown_categories (client_id, name, status, media_types)
VALUES (NULL, $1, $2, $3)
''', cat['name'], cat.get('status', 'Active'), cat.get('mediaTypes', []))
logger.info(f"Migrated {len(categories)} global categories")
# ── Per-client dropdowns ───────────────────────────────────────────────
client_dd_dir = server_config.CLIENTS_DROPDOWNS_DIR
if os.path.isdir(client_dd_dir):
for fname in os.listdir(client_dd_dir):
if not fname.endswith('.json') or '_export' in fname:
continue
client_id = fname[:-5]
with open(os.path.join(client_dd_dir, fname)) as f:
cats = json.load(f)
await conn.execute("DELETE FROM dropdown_categories WHERE client_id = $1", client_id)
for cat in cats:
await conn.execute('''
INSERT INTO dropdown_categories (client_id, name, status, media_types)
VALUES ($1, $2, $3, $4)
''', client_id, cat['name'], cat.get('status', 'Active'), cat.get('mediaTypes', []))
logger.info(f"Migrated {len(cats)} categories for client {client_id}")
# ── Global export template ─────────────────────────────────────────────
tpl_file = server_config.EXPORT_TEMPLATE_FILE
if os.path.exists(tpl_file):
with open(tpl_file) as f:
tpl = json.load(f)
await conn.execute('''
INSERT INTO export_templates (scope, columns) VALUES ('global', $1)
ON CONFLICT (scope) DO UPDATE SET columns = $1, updated_at = NOW()
''', tpl)
logger.info("Migrated global export template")
# ── Per-client export templates ────────────────────────────────────────
if os.path.isdir(client_dd_dir):
for fname in os.listdir(client_dd_dir):
if not fname.endswith('_export.json'):
continue
client_id = fname[:-len('_export.json')]
with open(os.path.join(client_dd_dir, fname)) as f:
tpl = json.load(f)
scope = f'client:{client_id}'
await conn.execute('''
INSERT INTO export_templates (scope, columns) VALUES ($1, $2)
ON CONFLICT (scope) DO UPDATE SET columns = $2, updated_at = NOW()
''', scope, tpl)
logger.info(f"Migrated export template for client {client_id}")
# ── Per-user export templates ──────────────────────────────────────────
user_tpl_dir = server_config.USER_EXPORT_TEMPLATES_DIR
if os.path.isdir(user_tpl_dir):
for fname in os.listdir(user_tpl_dir):
if not fname.endswith('.json'):
continue
user_id = fname[:-5]
with open(os.path.join(user_tpl_dir, fname)) as f:
tpl = json.load(f)
scope = f'user:{user_id}'
await conn.execute('''
INSERT INTO export_templates (scope, columns) VALUES ($1, $2)
ON CONFLICT (scope) DO UPDATE SET columns = $2, updated_at = NOW()
''', scope, tpl)
logger.info(f"Migrated export template for user {user_id}")
# ── Sheets ─────────────────────────────────────────────────────────────
metadata_file = os.path.join(server_config.DATA_DIR, 'sheets_metadata.json')
sheets_dir = server_config.SHEETS_DIR
if os.path.exists(metadata_file):
with open(metadata_file) as f:
meta = json.load(f)
total = 0
for user_id, user_sheets in meta.items():
for sheet_meta in user_sheets:
sid = sheet_meta['id']
# Build safe filename (mirror manager.py logic)
import re
safe_uid = re.sub(r'[^a-zA-Z0-9_\-]', '_', user_id)
data_file = os.path.join(sheets_dir, f"{safe_uid}_{sid}.json")
data = []
if os.path.exists(data_file):
with open(data_file) as f:
data = json.load(f)
client_id = sheet_meta.get('client_id') or None
await conn.execute('''
INSERT INTO sheets
(id, user_id, name, client_id, data, item_count, created_at, modified_at)
VALUES ($1, $2, $3, $4, $5, $6,
COALESCE($7::timestamptz, NOW()),
COALESCE($8::timestamptz, NOW()))
ON CONFLICT (id) DO NOTHING
''', sid, user_id, sheet_meta.get('name', 'Untitled'),
client_id, data, len(data),
sheet_meta.get('created'), sheet_meta.get('modified'))
total += 1
logger.info(f"Migrated {total} sheets")
logger.info("Migration complete!")
if __name__ == '__main__':
asyncio.run(migrate())

44
backend/server/db/pool.py Normal file
View file

@ -0,0 +1,44 @@
"""
asyncpg connection pool with JSONB codec registration.
Call init_pool() once at startup and close_pool() at shutdown.
"""
import json
import logging
import asyncpg
logger = logging.getLogger(__name__)
_pool: asyncpg.Pool | None = None
async def _init_conn(conn: asyncpg.Connection):
"""Register JSONB/JSON codecs so Python dicts/lists are passed transparently."""
await conn.set_type_codec('jsonb', encoder=json.dumps, decoder=json.loads, schema='pg_catalog')
await conn.set_type_codec('json', encoder=json.dumps, decoder=json.loads, schema='pg_catalog')
async def init_pool(dsn: str):
global _pool
_pool = await asyncpg.create_pool(
dsn,
min_size=2,
max_size=10,
command_timeout=30,
init=_init_conn,
)
logger.info("PostgreSQL pool initialized")
async def close_pool():
global _pool
if _pool:
await _pool.close()
_pool = None
logger.info("PostgreSQL pool closed")
def get_pool() -> asyncpg.Pool:
if _pool is None:
raise RuntimeError("Database pool not initialized — call init_pool() first")
return _pool

View file

@ -0,0 +1,48 @@
-- AC Tool PostgreSQL schema
-- Run this once (idempotent — safe to re-run).
CREATE TABLE IF NOT EXISTS users (
id TEXT PRIMARY KEY,
email TEXT NOT NULL DEFAULT '',
name TEXT NOT NULL DEFAULT '',
role TEXT NOT NULL DEFAULT 'user',
active BOOLEAN NOT NULL DEFAULT TRUE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
last_seen_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS clients (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
has_custom_dropdowns BOOLEAN NOT NULL DEFAULT FALSE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- NULL client_id = global hierarchy; non-null = per-client override
CREATE TABLE IF NOT EXISTS dropdown_categories (
id SERIAL PRIMARY KEY,
client_id TEXT REFERENCES clients(id) ON DELETE CASCADE,
name TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'Active',
media_types JSONB NOT NULL DEFAULT '[]'
);
CREATE INDEX IF NOT EXISTS idx_dropdown_cat_client ON dropdown_categories(client_id);
-- scope: 'global' | 'client:<id>' | 'user:<id>'
CREATE TABLE IF NOT EXISTS export_templates (
scope TEXT PRIMARY KEY,
columns JSONB NOT NULL,
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS sheets (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
name TEXT NOT NULL,
client_id TEXT REFERENCES clients(id) ON DELETE SET NULL,
data JSONB NOT NULL DEFAULT '[]',
item_count INTEGER NOT NULL DEFAULT 0,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
modified_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_sheets_user ON sheets(user_id);

View file

@ -1,172 +1,124 @@
"""
Sheet management Python port of sheet_helpers.php.
File-based JSON storage, one metadata file + one data file per sheet.
Sheet management PostgreSQL-backed.
All functions are async.
"""
import json
import logging
import os
import re
import time
import random
from datetime import datetime, timezone
from typing import List, Optional, Dict
import time
import random
from ..config_runtime import server_config
from ..db.pool import get_pool
logger = logging.getLogger(__name__)
METADATA_FILE = os.path.join(server_config.DATA_DIR, 'sheets_metadata.json')
async def get_user_sheets(user_id: str) -> List[Dict]:
pool = get_pool()
async with pool.acquire() as conn:
rows = await conn.fetch(
'SELECT id, name, client_id, item_count, created_at, modified_at '
'FROM sheets WHERE user_id = $1 ORDER BY modified_at DESC',
user_id
)
return [_row_to_meta(r, user_id) for r in rows]
def _safe_user(user_id: str) -> str:
"""Sanitise user_id for use in filenames."""
return re.sub(r'[^a-zA-Z0-9_\-]', '_', user_id)
def _sheet_path(user_id: str, sheet_id: str) -> str:
return os.path.join(server_config.SHEETS_DIR, f"{_safe_user(user_id)}_{sheet_id}.json")
def _load_metadata() -> Dict:
if not os.path.exists(METADATA_FILE):
return {}
try:
with open(METADATA_FILE, 'r') as f:
return json.load(f)
except Exception:
return {}
def _save_metadata(meta: Dict):
with open(METADATA_FILE, 'w') as f:
json.dump(meta, f, indent=2)
def get_user_sheets(user_id: str) -> List[Dict]:
meta = _load_metadata()
return meta.get(user_id, [])
def create_sheet(user_id: str, name: str, data: List[dict] = None, client_id: str = '') -> Dict:
async def create_sheet(user_id: str, name: str, data: List[dict] = None, client_id: str = '') -> Dict:
if data is None:
data = []
sheet_id = str(int(time.time())) + str(random.randint(100, 999))
now = datetime.now(timezone.utc).isoformat()
sheet_name = name or f"Untitled Sheet — {datetime.now().strftime('%Y-%m-%d %H:%M')}"
client_id_val = client_id or None
sheet_meta = {
'id': sheet_id,
'name': name or f"Untitled Sheet — {datetime.now().strftime('%Y-%m-%d %H:%M')}",
'created': now,
'modified': now,
'itemCount': len(data),
'user': user_id,
'client_id': client_id,
}
# Write data file
path = _sheet_path(user_id, sheet_id)
with open(path, 'w') as f:
json.dump(data, f, indent=2)
# Update metadata
meta = _load_metadata()
meta.setdefault(user_id, []).append(sheet_meta)
_save_metadata(meta)
return sheet_meta
pool = get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow('''
INSERT INTO sheets (id, user_id, name, client_id, data, item_count)
VALUES ($1, $2, $3, $4, $5, $6)
RETURNING id, name, client_id, item_count, created_at, modified_at
''', sheet_id, user_id, sheet_name, client_id_val, data, len(data))
return _row_to_meta(row, user_id)
def load_sheet_data(user_id: str, sheet_id: str) -> Optional[List[dict]]:
path = _sheet_path(user_id, sheet_id)
if not os.path.exists(path):
return None
try:
with open(path, 'r') as f:
return json.load(f)
except Exception:
return None
async def load_sheet_data(user_id: str, sheet_id: str) -> Optional[List[dict]]:
pool = get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow(
'SELECT data FROM sheets WHERE id = $1 AND user_id = $2',
sheet_id, user_id
)
if row is None:
return None
return row['data'] if row['data'] is not None else []
def update_sheet(user_id: str, sheet_id: str, data: List[dict]) -> bool:
path = _sheet_path(user_id, sheet_id)
with open(path, 'w') as f:
json.dump(data, f, indent=2)
# Update metadata counts
meta = _load_metadata()
if user_id in meta:
for sheet in meta[user_id]:
if sheet['id'] == sheet_id:
sheet['modified'] = datetime.now(timezone.utc).isoformat()
sheet['itemCount'] = len(data)
break
_save_metadata(meta)
return True
async def update_sheet(user_id: str, sheet_id: str, data: List[dict]) -> bool:
pool = get_pool()
async with pool.acquire() as conn:
result = await conn.execute('''
UPDATE sheets SET data = $3, item_count = $4, modified_at = NOW()
WHERE id = $1 AND user_id = $2
''', sheet_id, user_id, data, len(data))
return result != 'UPDATE 0'
def delete_sheet(user_id: str, sheet_id: str):
path = _sheet_path(user_id, sheet_id)
if os.path.exists(path):
os.remove(path)
meta = _load_metadata()
if user_id in meta:
meta[user_id] = [s for s in meta[user_id] if s['id'] != sheet_id]
_save_metadata(meta)
async def delete_sheet(user_id: str, sheet_id: str):
pool = get_pool()
async with pool.acquire() as conn:
await conn.execute(
'DELETE FROM sheets WHERE id = $1 AND user_id = $2',
sheet_id, user_id
)
def rename_sheet(user_id: str, sheet_id: str, new_name: str) -> bool:
meta = _load_metadata()
if user_id not in meta:
return False
for sheet in meta[user_id]:
if sheet['id'] == sheet_id:
sheet['name'] = new_name
sheet['modified'] = datetime.now(timezone.utc).isoformat()
_save_metadata(meta)
return True
return False
async def rename_sheet(user_id: str, sheet_id: str, new_name: str) -> bool:
pool = get_pool()
async with pool.acquire() as conn:
result = await conn.execute('''
UPDATE sheets SET name = $3, modified_at = NOW()
WHERE id = $1 AND user_id = $2
''', sheet_id, user_id, new_name)
return result != 'UPDATE 0'
def duplicate_sheet(user_id: str, sheet_id: str) -> Optional[Dict]:
data = load_sheet_data(user_id, sheet_id)
if data is None:
return None
meta = _load_metadata()
original_name = "Copy of Sheet"
for sheet in meta.get(user_id, []):
if sheet['id'] == sheet_id:
original_name = f"Copy of {sheet['name']}"
break
return create_sheet(user_id, original_name, data)
async def duplicate_sheet(user_id: str, sheet_id: str) -> Optional[Dict]:
pool = get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow(
'SELECT name, data FROM sheets WHERE id = $1 AND user_id = $2',
sheet_id, user_id
)
if row is None:
return None
return await create_sheet(user_id, f"Copy of {row['name']}", row['data'])
def get_sheet_client_id(user_id: str, sheet_id: str) -> Optional[str]:
"""Return the client_id associated with a sheet, or None."""
meta = _load_metadata()
for sheet in meta.get(user_id, []):
if sheet['id'] == sheet_id:
return sheet.get('client_id') or None
return None
async def get_sheet_client_id(user_id: str, sheet_id: str) -> Optional[str]:
pool = get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow(
'SELECT client_id FROM sheets WHERE id = $1 AND user_id = $2',
sheet_id, user_id
)
if row is None:
return None
return row['client_id']
def set_sheet_client_id(user_id: str, sheet_id: str, client_id: str):
"""Update the client_id on an existing sheet."""
meta = _load_metadata()
if user_id in meta:
for sheet in meta[user_id]:
if sheet['id'] == sheet_id:
sheet['client_id'] = client_id
_save_metadata(meta)
return True
return False
async def set_sheet_client_id(user_id: str, sheet_id: str, client_id: str):
pool = get_pool()
async with pool.acquire() as conn:
await conn.execute('''
UPDATE sheets SET client_id = $3, modified_at = NOW()
WHERE id = $1 AND user_id = $2
''', sheet_id, user_id, client_id or None)
def generate_next_id(data: List[dict]) -> str:
"""Generate the next DEL-NNN id."""
"""Generate the next DEL-NNN id. Remains sync — operates on in-memory data."""
max_id = 0
for row in data:
num_str = row.get('Number', '').replace('DEL-', '')
@ -174,6 +126,18 @@ def generate_next_id(data: List[dict]) -> str:
n = int(num_str)
if n > max_id:
max_id = n
except ValueError:
except (ValueError, AttributeError):
pass
return f"DEL-{str(max_id + 1).zfill(3)}"
def _row_to_meta(row, user_id: str) -> Dict:
return {
'id': row['id'],
'name': row['name'],
'client_id': row['client_id'],
'itemCount': row['item_count'],
'user': user_id,
'created': row['created_at'].isoformat() if row['created_at'] else None,
'modified': row['modified_at'].isoformat() if row['modified_at'] else None,
}

View file

@ -27,15 +27,35 @@
version: '3.9'
services:
postgres:
image: postgres:16-alpine
container_name: ac-tool-db
restart: unless-stopped
environment:
POSTGRES_DB: achelper
POSTGRES_USER: achelper
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-achelper_secret}
volumes:
- postgres_data:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U achelper -d achelper"]
interval: 10s
timeout: 5s
retries: 5
app:
build: .
container_name: ac-tool
restart: unless-stopped
depends_on:
postgres:
condition: service_healthy
ports:
- "${APP_PORT:-8100}:8000"
volumes:
- ./data:/app/data
environment:
DATABASE_URL: postgresql://achelper:${POSTGRES_PASSWORD:-achelper_secret}@postgres:5432/achelper
# Auth — AZURE_* names in .env, MSAL_* names read by server/config_runtime.py
AZURE_TENANT_ID: ${AZURE_TENANT_ID:-e519c2e6-bc6d-4fdf-8d9c-923c2f002385}
AZURE_CLIENT_ID: ${AZURE_CLIENT_ID:-9079054c-9620-4757-a256-23413042f1ef}
@ -113,4 +133,7 @@ services:
interval: 30s
timeout: 10s
retries: 3
start_period: 20s
start_period: 30s
volumes:
postgres_data: