""" Main Quart application for Brief Extractor GUI server """ import asyncio import json import logging import os import signal import sys from datetime import datetime from typing import Optional, List from quart import Quart, websocket, request, jsonify from quart_cors import cors import structlog from .config_runtime import server_config from .auth import msal_auth from .jobs import JobManager from .ws import ws_manager, WebSocketManager from .runners.job_runner import start_background_workers, stop_background_workers # Import API blueprints from .api.auth import auth_bp from .api.config import config_bp from .api.jobs import jobs_bp # Configure structured logging structlog.configure( processors=[ structlog.stdlib.filter_by_level, structlog.stdlib.add_logger_name, structlog.stdlib.add_log_level, structlog.stdlib.PositionalArgumentsFormatter(), structlog.processors.TimeStamper(fmt="ISO"), structlog.processors.StackInfoRenderer(), structlog.processors.format_exc_info, structlog.processors.JSONRenderer() ], context_class=dict, logger_factory=structlog.stdlib.LoggerFactory(), wrapper_class=structlog.stdlib.BoundLogger, cache_logger_on_first_use=True, ) logger = structlog.get_logger(__name__) # Global worker tasks background_workers: List[asyncio.Task] = [] def create_app() -> Quart: """ Create and configure the Quart application Returns: Configured Quart application """ app = Quart(__name__) # Configure CORS cors_config = server_config.get_cors_config() cors(app, **cors_config) # Configure app app.config.update({ 'MAX_CONTENT_LENGTH': server_config.MAX_CONTENT_LENGTH, 'SECRET_KEY': server_config.SESSION_SECRET, 'SECURE_COOKIES': server_config.SECURE_COOKIES, 'HTTPS_ONLY': server_config.HTTPS_ONLY }) # Ensure required directories exist server_config.ensure_directories() # Initialize components job_manager = JobManager.get_instance() # Register blueprints app.register_blueprint(auth_bp) app.register_blueprint(config_bp) app.register_blueprint(jobs_bp) @app.before_serving async def startup(): """Initialize services on app startup""" try: logger.info("Starting Brief Extractor GUI server...") # Start WebSocket background tasks await ws_manager.start_background_tasks() # Start job processing workers global background_workers background_workers = await start_background_workers( job_manager, ws_manager, num_workers=server_config.MAX_CONCURRENT_JOBS ) # Schedule periodic cleanup cleanup_task = asyncio.create_task(periodic_cleanup(job_manager)) background_workers.append(cleanup_task) logger.info("Server startup completed successfully") except Exception as e: logger.error(f"Failed to start server: {e}", exc_info=True) raise @app.after_serving async def shutdown(): """Cleanup on app shutdown""" try: logger.info("Shutting down Brief Extractor GUI server...") # Stop background workers global background_workers if background_workers: await stop_background_workers(background_workers) # Stop WebSocket tasks await ws_manager.stop_background_tasks() logger.info("Server shutdown completed") except Exception as e: logger.error(f"Error during shutdown: {e}", exc_info=True) # Add signal handling to the app import signal def signal_handler(signum, frame): logger.info(f"Received signal {signum}, shutting down...") # Set shutdown flag for Hypercorn import asyncio try: # Cancel all background tasks global background_workers if background_workers: for worker in background_workers: if not worker.done(): worker.cancel() # Get the event loop and schedule shutdown loop = asyncio.get_event_loop() if loop.is_running(): loop.stop() except Exception as e: logger.error(f"Error in signal handler: {e}") # Force exit if needed os._exit(0) signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) @app.route('/') async def root(): """Root endpoint - health check""" return jsonify({ 'service': 'Brief Extractor GUI', 'status': 'running', 'timestamp': datetime.utcnow().isoformat(), 'version': '1.0.0', 'devMode': server_config.DEV_MODE }) @app.route('/health') async def health(): """Health check endpoint""" try: # Check queue status queue_size = await job_manager.get_queue_size() active_jobs = await job_manager.get_active_jobs_count() # Check WebSocket connections ws_stats = await ws_manager.get_connection_stats() return jsonify({ 'status': 'healthy', 'timestamp': datetime.utcnow().isoformat(), 'queue': { 'pending': queue_size, 'active': active_jobs }, 'websockets': ws_stats, 'config': { 'devMode': server_config.DEV_MODE, 'maxConcurrentJobs': server_config.MAX_CONCURRENT_JOBS, 'maxUploadSize': f"{server_config.MAX_UPLOAD_SIZE_MB}MB" } }) except Exception as e: logger.error(f"Health check failed: {e}") return jsonify({ 'status': 'unhealthy', 'error': str(e), 'timestamp': datetime.utcnow().isoformat() }), 500 @app.websocket('/ws') async def websocket_handler(): """WebSocket endpoint for real-time updates""" client = None try: # Extract user info from WebSocket headers or query params user_id = None if server_config.DEV_MODE: user_id = 'dev-user-id' else: # Try to get auth from headers (won't work in browsers) auth_header = websocket.headers.get('Authorization') if auth_header: token = auth_header.replace('Bearer ', '') user_info = await msal_auth.validate_token(token) if user_info: user_id = user_info['oid'] # If no auth header, try to get user from query params # Frontend should pass the access token as a query parameter if not user_id: token = websocket.args.get('token') if token: user_info = await msal_auth.validate_token(token) if user_info: user_id = user_info['oid'] logger.info(f"WebSocket authenticated via query param for user: {user_id}") # If still no user_id, reject connection if not user_id: logger.warning("WebSocket connection rejected - no valid authentication") await websocket.send(json.dumps({'error': 'unauthorized', 'message': 'Authentication required'})) return # Register WebSocket client client = await ws_manager.register_client(user_id) # Send initial queue snapshot jobs_data = job_manager.serialize_all() await ws_manager.send_queue_snapshot(client, jobs_data) # Keep connection alive and handle incoming messages while True: try: message = await websocket.receive() if message: data = json.loads(message) # Handle ping messages if data.get('type') == 'ping': await client.send({'type': 'pong'}) # Handle other message types as needed # For now, we don't need to handle any incoming messages except Exception as e: logger.warning(f"WebSocket message error: {e}") break except Exception as e: logger.error(f"WebSocket error: {e}") finally: if client: await ws_manager.unregister_client(client.client_id) @app.errorhandler(400) async def bad_request(error): return jsonify({ 'error': 'bad_request', 'message': 'Invalid request' }), 400 @app.errorhandler(401) async def unauthorized(error): return jsonify({ 'error': 'unauthorized', 'message': 'Authentication required' }), 401 @app.errorhandler(403) async def forbidden(error): return jsonify({ 'error': 'forbidden', 'message': 'Access denied' }), 403 @app.errorhandler(404) async def not_found(error): return jsonify({ 'error': 'not_found', 'message': 'Resource not found' }), 404 @app.errorhandler(413) async def payload_too_large(error): return jsonify({ 'error': 'file_too_large', 'message': f'File size exceeds {server_config.MAX_UPLOAD_SIZE_MB}MB limit' }), 413 @app.errorhandler(500) async def internal_error(error): logger.error(f"Internal server error: {error}", exc_info=True) return jsonify({ 'error': 'internal_error', 'message': 'Internal server error' }), 500 return app async def periodic_cleanup(job_manager: JobManager): """ Periodic cleanup task for expired jobs and files Args: job_manager: JobManager instance """ while True: try: await asyncio.sleep(3600) # Run every hour logger.info("Starting periodic cleanup...") cleaned_count = await job_manager.cleanup_expired_jobs() if cleaned_count > 0: logger.info(f"Periodic cleanup completed: {cleaned_count} items removed") except asyncio.CancelledError: logger.info("Periodic cleanup task cancelled") break except Exception as e: logger.error(f"Periodic cleanup error: {e}") def setup_signal_handlers(app: Quart): """ Set up signal handlers for graceful shutdown Args: app: Quart application instance """ def signal_handler(signum, frame): logger.info(f"Received signal {signum}, shutting down...") sys.exit(0) signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) if __name__ == '__main__': # Create application app = create_app() # Set up signal handlers setup_signal_handlers(app) # Configure logging logging.basicConfig( level=logging.INFO if not server_config.DEBUG else logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) # Run with Hypercorn import hypercorn.asyncio from hypercorn import Config config = Config() config.bind = [f"{server_config.HOST}:{server_config.PORT}"] config.workers = server_config.WORKERS config.use_reloader = server_config.DEBUG config.accesslog = "-" # Log to stdout logger.info(f"Starting server on {server_config.HOST}:{server_config.PORT}") logger.info(f"Development mode: {server_config.DEV_MODE}") logger.info(f"Workers: {server_config.WORKERS}") try: hypercorn.asyncio.serve(app, config) except KeyboardInterrupt: logger.info("Server stopped by user") except Exception as e: logger.error(f"Server failed to start: {e}", exc_info=True) sys.exit(1)