ac-tool/backend/server/app.py
Vadym Samoilenko 8da149b84e Migrate storage from JSON files to PostgreSQL (asyncpg)
- Add asyncpg connection pool (db/pool.py) with JSONB codec registration
- Add schema.sql with users, clients, dropdown_categories, export_templates, sheets tables
- Add migrate_json.py one-time migration script for existing JSON data
- Rewrite user_store, sheets/manager, api/clients, api/dropdowns, api/export as async DB-backed
- Update all callers (auth, sheets, admin, ai_command, export) to await async functions
- Add postgres:16-alpine service to docker-compose with named volume and health check
- App container depends_on postgres; DATABASE_URL injected via env
- Schema applied automatically on startup; global categories seeded if DB is empty

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-23 19:51:37 +00:00

234 lines
7.7 KiB
Python

"""
Main Quart application — AC Tool (AC Helper + Brief Extractor unified)
"""
import asyncio
import json
import logging
import os
import signal
from datetime import datetime
from typing import List
from quart import Quart, websocket, jsonify
from quart_cors import cors
import structlog
from .config_runtime import server_config
from .auth import msal_auth
from .jobs import JobManager
from .ws import ws_manager
from .runners.job_runner import start_background_workers, stop_background_workers
from .db import init_pool, close_pool
# API blueprints
from .api.auth import auth_bp
from .api.jobs import jobs_bp
from .api.config import config_bp
from .api.sheets import sheets_bp
from .api.export import export_bp, user_export_bp
from .api.ai_command import ai_bp
from .api.dropdowns import dropdowns_bp
from .api.admin import admin_bp
from .api.clients import clients_bp
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.TimeStamper(fmt="ISO"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.JSONRenderer(),
],
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
cache_logger_on_first_use=True,
)
logger = structlog.get_logger(__name__)
background_workers: List[asyncio.Task] = []
def create_app() -> Quart:
app = Quart(__name__)
cors_cfg = server_config.get_cors_config()
cors(app, **cors_cfg)
app.config.update({
'MAX_CONTENT_LENGTH': server_config.MAX_CONTENT_LENGTH,
'SECRET_KEY': server_config.SESSION_SECRET,
})
server_config.ensure_directories()
job_manager = JobManager.get_instance()
# Register blueprints
for bp in [auth_bp, jobs_bp, config_bp, sheets_bp, export_bp, user_export_bp, ai_bp, dropdowns_bp, admin_bp, clients_bp]:
app.register_blueprint(bp)
# Serve React SPA static files (built by Vite into /app/frontend/dist)
_register_spa(app)
@app.before_serving
async def startup():
logger.info("Starting AC Tool server...")
# Connect to PostgreSQL and apply schema
await init_pool(server_config.DATABASE_URL)
await _apply_schema()
await _seed_dropdowns_if_needed()
await ws_manager.start_background_tasks()
global background_workers
background_workers = await start_background_workers(
job_manager, ws_manager, num_workers=server_config.MAX_CONCURRENT_JOBS
)
background_workers.append(asyncio.create_task(periodic_cleanup(job_manager)))
logger.info("Server started", dev_mode=server_config.DEV_MODE)
@app.after_serving
async def shutdown():
logger.info("Shutting down AC Tool server...")
global background_workers
if background_workers:
await stop_background_workers(background_workers)
await ws_manager.stop_background_tasks()
await close_pool()
@app.route('/health')
async def health():
queue_size = await job_manager.get_queue_size()
active_jobs = await job_manager.get_active_jobs_count()
ws_stats = await ws_manager.get_connection_stats()
return jsonify({
'status': 'healthy',
'timestamp': datetime.utcnow().isoformat(),
'queue': {'pending': queue_size, 'active': active_jobs},
'websockets': ws_stats,
'devMode': server_config.DEV_MODE,
})
@app.websocket('/ws')
async def websocket_handler():
client = None
try:
if server_config.DEV_MODE:
user_id = server_config.DEV_USER_ID
else:
user_id = None
token = websocket.args.get('token') or (websocket.headers.get('Authorization', '')[7:])
if token:
from .auth.msal_auth import msal_auth as _msal
info = await _msal.validate_token(token)
if info:
user_id = info['oid']
if not user_id:
await websocket.send(json.dumps({'error': 'unauthorized'}))
return
client = await ws_manager.register_client(user_id)
jobs_data = job_manager.serialize_all()
await ws_manager.send_queue_snapshot(client, jobs_data)
while True:
try:
msg = await websocket.receive()
if msg:
data = json.loads(msg)
if data.get('type') == 'ping':
await client.send({'type': 'pong'})
except Exception:
break
except Exception as e:
logger.error(f"WebSocket error: {e}")
finally:
if client:
await ws_manager.unregister_client(client.client_id)
# Error handlers
@app.errorhandler(401)
async def unauthorized(e):
return jsonify({'error': 'unauthorized'}), 401
@app.errorhandler(403)
async def forbidden(e):
return jsonify({'error': 'forbidden'}), 403
@app.errorhandler(404)
async def not_found(e):
return jsonify({'error': 'not_found'}), 404
@app.errorhandler(413)
async def too_large(e):
return jsonify({'error': 'file_too_large', 'message': f'Max {server_config.MAX_UPLOAD_SIZE_MB}MB'}), 413
@app.errorhandler(500)
async def internal(e):
return jsonify({'error': 'internal_error'}), 500
return app
async def _apply_schema():
"""Create tables if they don't exist (idempotent)."""
from .db.pool import get_pool
schema_path = os.path.join(os.path.dirname(__file__), 'db', 'schema.sql')
with open(schema_path, 'r') as f:
sql = f.read()
pool = get_pool()
async with pool.acquire() as conn:
await conn.execute(sql)
logger.info("Database schema applied")
async def _seed_dropdowns_if_needed():
"""Seed global dropdown categories if the DB table is empty."""
from .db.pool import get_pool
pool = get_pool()
async with pool.acquire() as conn:
count = await conn.fetchval(
'SELECT COUNT(*) FROM dropdown_categories WHERE client_id IS NULL'
)
if count == 0:
from .api.dropdowns import SEED_CATEGORIES, save_dropdowns
await save_dropdowns(SEED_CATEGORIES)
logger.info(f"Seeded {len(SEED_CATEGORIES)} global dropdown categories")
def _register_spa(app: Quart):
"""Serve the Vite-built React frontend for all non-API routes."""
import os
from quart import send_from_directory, send_file
dist = os.environ.get('FRONTEND_DIST', os.path.join(os.path.dirname(os.path.dirname(__file__)), '..', 'frontend', 'dist'))
dist = os.path.abspath(dist)
if not os.path.isdir(dist):
logger.warning(f"Frontend dist not found at {dist} — API-only mode")
return
@app.route('/', defaults={'path': ''})
@app.route('/<path:path>')
async def serve_spa(path):
full = os.path.join(dist, path)
if path and os.path.isfile(full):
return await send_from_directory(dist, path)
return await send_from_directory(dist, 'index.html')
async def periodic_cleanup(job_manager: JobManager):
while True:
try:
await asyncio.sleep(3600)
cleaned = await job_manager.cleanup_expired_jobs()
if cleaned:
logger.info(f"Periodic cleanup: {cleaned} items removed")
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Cleanup error: {e}")