386 lines
No EOL
12 KiB
Python
Executable file
386 lines
No EOL
12 KiB
Python
Executable file
"""
|
|
Main Quart application for Brief Extractor GUI server
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import signal
|
|
import sys
|
|
from datetime import datetime
|
|
from typing import Optional, List
|
|
|
|
from quart import Quart, websocket, request, jsonify
|
|
from quart_cors import cors
|
|
import structlog
|
|
|
|
from .config_runtime import server_config
|
|
from .auth import msal_auth
|
|
from .jobs import JobManager
|
|
from .ws import ws_manager, WebSocketManager
|
|
from .runners.job_runner import start_background_workers, stop_background_workers
|
|
|
|
# Import API blueprints
|
|
from .api.auth import auth_bp
|
|
from .api.config import config_bp
|
|
from .api.jobs import jobs_bp
|
|
|
|
# Structured logging setup.
# The processors run in the order listed; the last one renders each event
# as JSON, so it must stay at the end of the chain.
_LOG_PROCESSORS = [
    structlog.stdlib.filter_by_level,
    structlog.stdlib.add_logger_name,
    structlog.stdlib.add_log_level,
    structlog.stdlib.PositionalArgumentsFormatter(),
    structlog.processors.TimeStamper(fmt="ISO"),
    structlog.processors.StackInfoRenderer(),
    structlog.processors.format_exc_info,
    structlog.processors.JSONRenderer(),
]

structlog.configure(
    processors=_LOG_PROCESSORS,
    wrapper_class=structlog.stdlib.BoundLogger,
    logger_factory=structlog.stdlib.LoggerFactory(),
    context_class=dict,
    cache_logger_on_first_use=True,
)

# Logger shared by everything defined in this module
logger = structlog.get_logger(__name__)

# Tasks created at startup (job workers plus the periodic cleanup task);
# kept at module level so the shutdown hook and signal handler can reach them
background_workers: List[asyncio.Task] = []
|
|
|
|
def create_app() -> Quart:
    """
    Create and configure the Quart application

    Builds the app, applies CORS and core settings from ``server_config``,
    registers the auth/config/jobs blueprints, and attaches the lifecycle
    hooks, the ``/``, ``/health`` and ``/ws`` endpoints and the JSON error
    handlers defined below.

    Side effect: installs process-wide SIGINT/SIGTERM handlers.

    Returns:
        Configured Quart application
    """
    app = Quart(__name__)

    # Configure CORS
    cors_config = server_config.get_cors_config()
    cors(app, **cors_config)

    # Configure app
    app.config.update({
        'MAX_CONTENT_LENGTH': server_config.MAX_CONTENT_LENGTH,
        'SECRET_KEY': server_config.SESSION_SECRET,
        'SECURE_COOKIES': server_config.SECURE_COOKIES,
        'HTTPS_ONLY': server_config.HTTPS_ONLY
    })

    # Ensure required directories exist
    server_config.ensure_directories()

    # Initialize components
    # The nested handlers below close over this job_manager instance
    job_manager = JobManager.get_instance()

    # Register blueprints
    app.register_blueprint(auth_bp)
    app.register_blueprint(config_bp)
    app.register_blueprint(jobs_bp)

    @app.before_serving
    async def startup():
        """
        Initialize services on app startup

        Starts the WebSocket manager's background tasks, the job workers and
        the periodic cleanup task. Any failure is logged and re-raised.
        """
        try:
            logger.info("Starting Brief Extractor GUI server...")

            # Start WebSocket background tasks
            await ws_manager.start_background_tasks()

            # Start job processing workers
            # Rebinds the module-level list so shutdown() sees the new tasks
            global background_workers
            background_workers = await start_background_workers(
                job_manager,
                ws_manager,
                num_workers=server_config.MAX_CONCURRENT_JOBS
            )

            # Schedule periodic cleanup
            # Tracked alongside the workers so it is stopped with them
            cleanup_task = asyncio.create_task(periodic_cleanup(job_manager))
            background_workers.append(cleanup_task)

            logger.info("Server startup completed successfully")

        except Exception as e:
            logger.error(f"Failed to start server: {e}", exc_info=True)
            raise

    @app.after_serving
    async def shutdown():
        """
        Cleanup on app shutdown

        Stops the background workers, then the WebSocket background tasks.
        Errors are logged and not re-raised.
        """
        try:
            logger.info("Shutting down Brief Extractor GUI server...")

            # Stop background workers
            global background_workers
            if background_workers:
                await stop_background_workers(background_workers)

            # Stop WebSocket tasks
            # NOTE(review): skipped if stopping the workers raised above
            await ws_manager.stop_background_tasks()

            logger.info("Server shutdown completed")

        except Exception as e:
            logger.error(f"Error during shutdown: {e}", exc_info=True)

    # Add signal handling to the app
    # NOTE(review): the __main__ block later calls setup_signal_handlers(),
    # which replaces these handlers when the module is run as a script.
    import signal

    def signal_handler(signum, frame):
        """
        Handle SIGINT/SIGTERM: cancel unfinished background tasks, stop the
        event loop if it is running, then terminate the process.
        """
        logger.info(f"Received signal {signum}, shutting down...")
        # Set shutdown flag for Hypercorn
        import asyncio
        try:
            # Cancel all background tasks
            global background_workers
            if background_workers:
                for worker in background_workers:
                    if not worker.done():
                        worker.cancel()

            # Get the event loop and schedule shutdown
            loop = asyncio.get_event_loop()
            if loop.is_running():
                loop.stop()
        except Exception as e:
            logger.error(f"Error in signal handler: {e}")

        # Force exit if needed
        # NOTE(review): os._exit() ends the process immediately, so the
        # after_serving shutdown() hook presumably never runs on this path --
        # confirm the hard exit is intended.
        os._exit(0)

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    @app.route('/')
    async def root():
        """Root endpoint - health check returning service name, status, UTC timestamp, version and dev-mode flag"""
        return jsonify({
            'service': 'Brief Extractor GUI',
            'status': 'running',
            'timestamp': datetime.utcnow().isoformat(),
            'version': '1.0.0',
            'devMode': server_config.DEV_MODE
        })

    @app.route('/health')
    async def health():
        """
        Health check endpoint

        Reports queue size, active job count, WebSocket connection stats and
        selected configuration values. On any exception, responds with
        status 500 and an 'unhealthy' payload.
        """
        try:
            # Check queue status
            queue_size = await job_manager.get_queue_size()
            active_jobs = await job_manager.get_active_jobs_count()

            # Check WebSocket connections
            ws_stats = await ws_manager.get_connection_stats()

            return jsonify({
                'status': 'healthy',
                'timestamp': datetime.utcnow().isoformat(),
                'queue': {
                    'pending': queue_size,
                    'active': active_jobs
                },
                'websockets': ws_stats,
                'config': {
                    'devMode': server_config.DEV_MODE,
                    'maxConcurrentJobs': server_config.MAX_CONCURRENT_JOBS,
                    'maxUploadSize': f"{server_config.MAX_UPLOAD_SIZE_MB}MB"
                }
            })

        except Exception as e:
            logger.error(f"Health check failed: {e}")
            # NOTE(review): the raw exception text is returned to the caller
            return jsonify({
                'status': 'unhealthy',
                'error': str(e),
                'timestamp': datetime.utcnow().isoformat()
            }), 500

    @app.websocket('/ws')
    async def websocket_handler():
        """
        WebSocket endpoint for real-time updates

        Resolves a user id (fixed 'dev-user-id' in DEV_MODE; otherwise the
        'oid' of a token validated by msal_auth, taken from the Authorization
        header or the 'token' query parameter), registers the client, sends
        the serialized job list, then answers 'ping' messages with 'pong'
        until receiving or parsing a message fails.
        """
        client = None
        try:
            # Extract user info from WebSocket headers or query params
            user_id = None

            if server_config.DEV_MODE:
                user_id = 'dev-user-id'
            else:
                # Try to get auth from headers (won't work in browsers)
                auth_header = websocket.headers.get('Authorization')
                if auth_header:
                    token = auth_header.replace('Bearer ', '')
                    user_info = await msal_auth.validate_token(token)
                    if user_info:
                        user_id = user_info['oid']

                # If no auth header, try to get user from query params
                # Frontend should pass the access token as a query parameter
                if not user_id:
                    token = websocket.args.get('token')
                    if token:
                        user_info = await msal_auth.validate_token(token)
                        if user_info:
                            user_id = user_info['oid']
                            logger.info(f"WebSocket authenticated via query param for user: {user_id}")

            # If still no user_id, reject connection
            if not user_id:
                logger.warning("WebSocket connection rejected - no valid authentication")
                await websocket.send(json.dumps({'error': 'unauthorized', 'message': 'Authentication required'}))
                return

            # Register WebSocket client
            client = await ws_manager.register_client(user_id)

            # Send initial queue snapshot
            jobs_data = job_manager.serialize_all()
            await ws_manager.send_queue_snapshot(client, jobs_data)

            # Keep connection alive and handle incoming messages
            while True:
                try:
                    message = await websocket.receive()

                    if message:
                        data = json.loads(message)

                        # Handle ping messages
                        if data.get('type') == 'ping':
                            await client.send({'type': 'pong'})

                        # Handle other message types as needed
                        # For now, we don't need to handle any incoming messages

                except Exception as e:
                    # Any receive/parse failure (including malformed JSON)
                    # ends the message loop for this connection
                    logger.warning(f"WebSocket message error: {e}")
                    break

        except Exception as e:
            logger.error(f"WebSocket error: {e}")

        finally:
            # Unregister only if registration completed
            if client:
                await ws_manager.unregister_client(client.client_id)

    @app.errorhandler(400)
    async def bad_request(error):
        """Return a JSON body for HTTP 400 responses"""
        return jsonify({
            'error': 'bad_request',
            'message': 'Invalid request'
        }), 400

    @app.errorhandler(401)
    async def unauthorized(error):
        """Return a JSON body for HTTP 401 responses"""
        return jsonify({
            'error': 'unauthorized',
            'message': 'Authentication required'
        }), 401

    @app.errorhandler(403)
    async def forbidden(error):
        """Return a JSON body for HTTP 403 responses"""
        return jsonify({
            'error': 'forbidden',
            'message': 'Access denied'
        }), 403

    @app.errorhandler(404)
    async def not_found(error):
        """Return a JSON body for HTTP 404 responses"""
        return jsonify({
            'error': 'not_found',
            'message': 'Resource not found'
        }), 404

    @app.errorhandler(413)
    async def payload_too_large(error):
        """Return a JSON body for HTTP 413 responses, quoting the configured upload limit"""
        return jsonify({
            'error': 'file_too_large',
            'message': f'File size exceeds {server_config.MAX_UPLOAD_SIZE_MB}MB limit'
        }), 413

    @app.errorhandler(500)
    async def internal_error(error):
        """Log the error and return a generic JSON body for HTTP 500 responses"""
        logger.error(f"Internal server error: {error}", exc_info=True)
        return jsonify({
            'error': 'internal_error',
            'message': 'Internal server error'
        }), 500

    return app
|
|
|
|
async def periodic_cleanup(job_manager: JobManager):
    """
    Run the job manager's expiry cleanup once per hour, forever

    Each cycle sleeps first and then calls ``cleanup_expired_jobs()``.
    Cancellation ends the task quietly; any other error is logged and the
    cycle repeats.

    Args:
        job_manager: JobManager instance
    """
    while True:
        try:
            # Wait a full hour before each pass
            await asyncio.sleep(3600)

            logger.info("Starting periodic cleanup...")
            removed = await job_manager.cleanup_expired_jobs()

            # Only report passes that actually removed something
            if removed > 0:
                logger.info(f"Periodic cleanup completed: {removed} items removed")

        except asyncio.CancelledError:
            logger.info("Periodic cleanup task cancelled")
            return
        except Exception as exc:
            # Keep the task alive; the next pass runs after the next sleep
            logger.error(f"Periodic cleanup error: {exc}")
|
|
|
|
def setup_signal_handlers(app: Quart):
    """
    Install SIGINT and SIGTERM handlers that log the signal and exit

    Args:
        app: Quart application instance (not used by the handler itself)
    """
    def _exit_on_signal(sig_number, _stack_frame):
        logger.info(f"Received signal {sig_number}, shutting down...")
        sys.exit(0)

    # The same handler serves both termination signals
    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, _exit_on_signal)
|
|
|
|
if __name__ == '__main__':
    # Script entry point: build the app, configure logging and serve it
    # with Hypercorn on the configured host and port.

    # Create application
    app = create_app()

    # Set up signal handlers
    setup_signal_handlers(app)

    # Configure logging
    logging.basicConfig(
        level=logging.INFO if not server_config.DEBUG else logging.DEBUG,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

    # Run with Hypercorn
    import hypercorn.asyncio
    from hypercorn import Config

    config = Config()
    config.bind = [f"{server_config.HOST}:{server_config.PORT}"]
    # NOTE(review): the workers / use_reloader settings are presumably only
    # honoured by Hypercorn's own runner, not by serve() -- confirm.
    config.workers = server_config.WORKERS
    config.use_reloader = server_config.DEBUG
    config.accesslog = "-"  # Log to stdout

    logger.info(f"Starting server on {server_config.HOST}:{server_config.PORT}")
    logger.info(f"Development mode: {server_config.DEV_MODE}")
    logger.info(f"Workers: {server_config.WORKERS}")

    try:
        # serve() is a coroutine function: calling it only creates a
        # coroutine object, so it has to be driven by an event loop or the
        # server never actually starts.
        asyncio.run(hypercorn.asyncio.serve(app, config))
    except KeyboardInterrupt:
        logger.info("Server stopped by user")
    except Exception as e:
        logger.error(f"Server failed to start: {e}", exc_info=True)
        sys.exit(1)