""" Main Quart application — AC Tool (AC Helper + Brief Extractor unified) """ import asyncio import json import logging import os import signal from datetime import datetime from typing import List from quart import Quart, websocket, jsonify from quart_cors import cors import structlog from .config_runtime import server_config from .auth import msal_auth from .jobs import JobManager from .ws import ws_manager from .runners.job_runner import start_background_workers, stop_background_workers from .db import init_pool, close_pool # API blueprints from .api.auth import auth_bp from .api.jobs import jobs_bp from .api.config import config_bp from .api.sheets import sheets_bp from .api.export import export_bp, user_export_bp from .api.ai_command import ai_bp from .api.dropdowns import dropdowns_bp from .api.admin import admin_bp from .api.clients import clients_bp structlog.configure( processors=[ structlog.stdlib.filter_by_level, structlog.stdlib.add_logger_name, structlog.stdlib.add_log_level, structlog.stdlib.PositionalArgumentsFormatter(), structlog.processors.TimeStamper(fmt="ISO"), structlog.processors.StackInfoRenderer(), structlog.processors.format_exc_info, structlog.processors.JSONRenderer(), ], context_class=dict, logger_factory=structlog.stdlib.LoggerFactory(), wrapper_class=structlog.stdlib.BoundLogger, cache_logger_on_first_use=True, ) logger = structlog.get_logger(__name__) background_workers: List[asyncio.Task] = [] def create_app() -> Quart: app = Quart(__name__) cors_cfg = server_config.get_cors_config() cors(app, **cors_cfg) app.config.update({ 'MAX_CONTENT_LENGTH': server_config.MAX_CONTENT_LENGTH, 'SECRET_KEY': server_config.SESSION_SECRET, }) server_config.ensure_directories() job_manager = JobManager.get_instance() # Register blueprints for bp in [auth_bp, jobs_bp, config_bp, sheets_bp, export_bp, user_export_bp, ai_bp, dropdowns_bp, admin_bp, clients_bp]: app.register_blueprint(bp) # Serve React SPA static files (built by Vite into /app/frontend/dist) _register_spa(app) @app.before_serving async def startup(): logger.info("Starting AC Tool server...") # Connect to PostgreSQL and apply schema await init_pool(server_config.DATABASE_URL) await _apply_schema() await _seed_dropdowns_if_needed() await ws_manager.start_background_tasks() global background_workers background_workers = await start_background_workers( job_manager, ws_manager, num_workers=server_config.MAX_CONCURRENT_JOBS ) background_workers.append(asyncio.create_task(periodic_cleanup(job_manager))) logger.info("Server started", dev_mode=server_config.DEV_MODE) @app.after_serving async def shutdown(): logger.info("Shutting down AC Tool server...") global background_workers if background_workers: await stop_background_workers(background_workers) await ws_manager.stop_background_tasks() await close_pool() @app.route('/health') async def health(): queue_size = await job_manager.get_queue_size() active_jobs = await job_manager.get_active_jobs_count() ws_stats = await ws_manager.get_connection_stats() return jsonify({ 'status': 'healthy', 'timestamp': datetime.utcnow().isoformat(), 'queue': {'pending': queue_size, 'active': active_jobs}, 'websockets': ws_stats, 'devMode': server_config.DEV_MODE, }) @app.websocket('/ws') async def websocket_handler(): client = None try: if server_config.DEV_MODE: user_id = server_config.DEV_USER_ID else: user_id = None token = websocket.args.get('token') or (websocket.headers.get('Authorization', '')[7:]) if token: from .auth.msal_auth import msal_auth as _msal info = await _msal.validate_token(token) if info: user_id = info['oid'] if not user_id: await websocket.send(json.dumps({'error': 'unauthorized'})) return client = await ws_manager.register_client(user_id) jobs_data = job_manager.serialize_all() await ws_manager.send_queue_snapshot(client, jobs_data) while True: try: msg = await websocket.receive() if msg: data = json.loads(msg) if data.get('type') == 'ping': await client.send({'type': 'pong'}) except Exception: break except Exception as e: logger.error(f"WebSocket error: {e}") finally: if client: await ws_manager.unregister_client(client.client_id) # Error handlers @app.errorhandler(401) async def unauthorized(e): return jsonify({'error': 'unauthorized'}), 401 @app.errorhandler(403) async def forbidden(e): return jsonify({'error': 'forbidden'}), 403 @app.errorhandler(404) async def not_found(e): return jsonify({'error': 'not_found'}), 404 @app.errorhandler(413) async def too_large(e): return jsonify({'error': 'file_too_large', 'message': f'Max {server_config.MAX_UPLOAD_SIZE_MB}MB'}), 413 @app.errorhandler(500) async def internal(e): return jsonify({'error': 'internal_error'}), 500 return app async def _apply_schema(): """Create tables if they don't exist (idempotent).""" from .db.pool import get_pool schema_path = os.path.join(os.path.dirname(__file__), 'db', 'schema.sql') with open(schema_path, 'r') as f: sql = f.read() pool = get_pool() async with pool.acquire() as conn: await conn.execute(sql) logger.info("Database schema applied") async def _seed_dropdowns_if_needed(): """Seed global dropdown categories if the DB table is empty.""" from .db.pool import get_pool pool = get_pool() async with pool.acquire() as conn: count = await conn.fetchval( 'SELECT COUNT(*) FROM dropdown_categories WHERE client_id IS NULL' ) if count == 0: from .api.dropdowns import SEED_CATEGORIES, save_dropdowns await save_dropdowns(SEED_CATEGORIES) logger.info(f"Seeded {len(SEED_CATEGORIES)} global dropdown categories") def _register_spa(app: Quart): """Serve the Vite-built React frontend for all non-API routes.""" import os from quart import send_from_directory, send_file dist = os.environ.get('FRONTEND_DIST', os.path.join(os.path.dirname(os.path.dirname(__file__)), '..', 'frontend', 'dist')) dist = os.path.abspath(dist) if not os.path.isdir(dist): logger.warning(f"Frontend dist not found at {dist} — API-only mode") return @app.route('/', defaults={'path': ''}) @app.route('/') async def serve_spa(path): full = os.path.join(dist, path) if path and os.path.isfile(full): return await send_from_directory(dist, path) return await send_from_directory(dist, 'index.html') async def periodic_cleanup(job_manager: JobManager): while True: try: await asyncio.sleep(3600) cleaned = await job_manager.cleanup_expired_jobs() if cleaned: logger.info(f"Periodic cleanup: {cleaned} items removed") except asyncio.CancelledError: break except Exception as e: logger.error(f"Cleanup error: {e}")