brief-extractor/backend/server/app.py
2026-03-06 18:42:46 +00:00

386 lines
No EOL
12 KiB
Python
Executable file

"""
Main Quart application for Brief Extractor GUI server
"""
import asyncio
import json
import logging
import os
import signal
import sys
from datetime import datetime
from typing import Optional, List
from quart import Quart, websocket, request, jsonify
from quart_cors import cors
import structlog
from .config_runtime import server_config
from .auth import msal_auth
from .jobs import JobManager
from .ws import ws_manager, WebSocketManager
from .runners.job_runner import start_background_workers, stop_background_workers
# Import API blueprints
from .api.auth import auth_bp
from .api.config import config_bp
from .api.jobs import jobs_bp
# Structured logging setup.
# The processors run in the listed order on every log event; the final
# JSONRenderer turns the event into a JSON string.
_LOG_PROCESSORS = [
    structlog.stdlib.filter_by_level,
    structlog.stdlib.add_logger_name,
    structlog.stdlib.add_log_level,
    structlog.stdlib.PositionalArgumentsFormatter(),
    structlog.processors.TimeStamper(fmt="ISO"),
    structlog.processors.StackInfoRenderer(),
    structlog.processors.format_exc_info,
    structlog.processors.JSONRenderer(),
]

structlog.configure(
    processors=_LOG_PROCESSORS,
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    wrapper_class=structlog.stdlib.BoundLogger,
    cache_logger_on_first_use=True,
)

# Module-wide structured logger
logger = structlog.get_logger(__name__)

# Background tasks created at startup (job workers plus the cleanup task);
# kept at module level so the shutdown hook and signal handler can reach them.
background_workers: List[asyncio.Task] = []
def create_app() -> Quart:
    """
    Create and configure the Quart application

    Wires up CORS, app configuration, the API blueprints, the
    startup/shutdown hooks, process signal handlers, the ``/`` and
    ``/health`` routes, the ``/ws`` WebSocket endpoint and JSON error
    handlers.

    Note: calling this function installs SIGINT/SIGTERM handlers for the
    whole process as a side effect.

    Returns:
        Configured Quart application
    """
    app = Quart(__name__)

    # Configure CORS
    cors_config = server_config.get_cors_config()
    cors(app, **cors_config)

    # Configure app
    app.config.update({
        'MAX_CONTENT_LENGTH': server_config.MAX_CONTENT_LENGTH,
        'SECRET_KEY': server_config.SESSION_SECRET,
        'SECURE_COOKIES': server_config.SECURE_COOKIES,
        'HTTPS_ONLY': server_config.HTTPS_ONLY
    })

    # Ensure required directories exist
    server_config.ensure_directories()

    # Initialize components
    job_manager = JobManager.get_instance()

    # Register blueprints
    app.register_blueprint(auth_bp)
    app.register_blueprint(config_bp)
    app.register_blueprint(jobs_bp)

    @app.before_serving
    async def startup():
        """Initialize services on app startup"""
        # Rebinds the module-level list, hence the global declaration.
        global background_workers
        try:
            logger.info("Starting Brief Extractor GUI server...")

            # Start WebSocket background tasks
            await ws_manager.start_background_tasks()

            # Start job processing workers
            background_workers = await start_background_workers(
                job_manager,
                ws_manager,
                num_workers=server_config.MAX_CONCURRENT_JOBS
            )

            # Schedule periodic cleanup; tracked with the workers so it is
            # stopped together with them on shutdown.
            cleanup_task = asyncio.create_task(periodic_cleanup(job_manager))
            background_workers.append(cleanup_task)

            logger.info("Server startup completed successfully")
        except Exception as e:
            logger.error(f"Failed to start server: {e}", exc_info=True)
            raise

    @app.after_serving
    async def shutdown():
        """Cleanup on app shutdown"""
        logger.info("Shutting down Brief Extractor GUI server...")
        clean = True

        # Each stage is guarded on its own so that a failure while stopping
        # the workers cannot prevent the WebSocket tasks from being stopped.
        try:
            if background_workers:
                await stop_background_workers(background_workers)
        except Exception as e:
            clean = False
            logger.error(f"Error during shutdown: {e}", exc_info=True)

        try:
            await ws_manager.stop_background_tasks()
        except Exception as e:
            clean = False
            logger.error(f"Error during shutdown: {e}", exc_info=True)

        if clean:
            logger.info("Server shutdown completed")

    def signal_handler(signum, frame):
        """Cancel background tasks and stop the event loop on SIGINT/SIGTERM"""
        logger.info(f"Received signal {signum}, shutting down...")
        try:
            # Cancel all background tasks
            for worker in background_workers:
                if not worker.done():
                    worker.cancel()

            # Get the event loop and stop it if it is currently running
            loop = asyncio.get_event_loop()
            if loop.is_running():
                loop.stop()
        except Exception as e:
            logger.error(f"Error in signal handler: {e}")
            # Force exit if needed
            # NOTE(review): the forced exit is assumed to belong to this
            # error path only - confirm against the original formatting.
            os._exit(0)

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    @app.route('/')
    async def root():
        """Root endpoint - health check"""
        return jsonify({
            'service': 'Brief Extractor GUI',
            'status': 'running',
            'timestamp': datetime.utcnow().isoformat(),
            'version': '1.0.0',
            'devMode': server_config.DEV_MODE
        })

    @app.route('/health')
    async def health():
        """Health check endpoint"""
        try:
            # Check queue status
            queue_size = await job_manager.get_queue_size()
            active_jobs = await job_manager.get_active_jobs_count()

            # Check WebSocket connections
            ws_stats = await ws_manager.get_connection_stats()

            return jsonify({
                'status': 'healthy',
                'timestamp': datetime.utcnow().isoformat(),
                'queue': {
                    'pending': queue_size,
                    'active': active_jobs
                },
                'websockets': ws_stats,
                'config': {
                    'devMode': server_config.DEV_MODE,
                    'maxConcurrentJobs': server_config.MAX_CONCURRENT_JOBS,
                    'maxUploadSize': f"{server_config.MAX_UPLOAD_SIZE_MB}MB"
                }
            })
        except Exception as e:
            logger.error(f"Health check failed: {e}")
            return jsonify({
                'status': 'unhealthy',
                'error': str(e),
                'timestamp': datetime.utcnow().isoformat()
            }), 500

    @app.websocket('/ws')
    async def websocket_handler():
        """WebSocket endpoint for real-time updates"""
        client = None
        try:
            # Extract user info from WebSocket headers or query params
            user_id = None
            if server_config.DEV_MODE:
                user_id = 'dev-user-id'
            else:
                # Try to get auth from headers (won't work in browsers)
                auth_header = websocket.headers.get('Authorization')
                if auth_header:
                    # Strip only a leading "Bearer " scheme; a blanket
                    # replace() would also alter the token itself if that
                    # text happened to occur inside it.
                    bearer_prefix = 'Bearer '
                    if auth_header.startswith(bearer_prefix):
                        token = auth_header[len(bearer_prefix):]
                    else:
                        token = auth_header
                    user_info = await msal_auth.validate_token(token)
                    if user_info:
                        user_id = user_info['oid']

                # If no auth header, try to get user from query params
                # Frontend should pass the access token as a query parameter
                if not user_id:
                    token = websocket.args.get('token')
                    if token:
                        user_info = await msal_auth.validate_token(token)
                        if user_info:
                            user_id = user_info['oid']
                            logger.info(f"WebSocket authenticated via query param for user: {user_id}")

            # If still no user_id, reject connection
            if not user_id:
                logger.warning("WebSocket connection rejected - no valid authentication")
                await websocket.send(json.dumps({'error': 'unauthorized', 'message': 'Authentication required'}))
                return

            # Register WebSocket client
            client = await ws_manager.register_client(user_id)

            # Send initial queue snapshot
            jobs_data = job_manager.serialize_all()
            await ws_manager.send_queue_snapshot(client, jobs_data)

            # Keep connection alive and handle incoming messages
            while True:
                try:
                    message = await websocket.receive()
                    if not message:
                        continue

                    # A single malformed frame should not cost the client its
                    # connection: log it and wait for the next message.
                    try:
                        data = json.loads(message)
                    except (TypeError, ValueError) as e:
                        logger.warning(f"WebSocket message ignored - invalid JSON: {e}")
                        continue
                    if not isinstance(data, dict):
                        logger.warning("WebSocket message ignored - expected a JSON object")
                        continue

                    # Handle ping messages
                    if data.get('type') == 'ping':
                        await client.send({'type': 'pong'})

                    # Handle other message types as needed
                    # For now, we don't need to handle any incoming messages
                except Exception as e:
                    # Receive/send failures end the session
                    logger.warning(f"WebSocket message error: {e}")
                    break
        except Exception as e:
            logger.error(f"WebSocket error: {e}")
        finally:
            # Only clients that were actually registered need unregistering
            if client:
                await ws_manager.unregister_client(client.client_id)

    @app.errorhandler(400)
    async def bad_request(error):
        return jsonify({
            'error': 'bad_request',
            'message': 'Invalid request'
        }), 400

    @app.errorhandler(401)
    async def unauthorized(error):
        return jsonify({
            'error': 'unauthorized',
            'message': 'Authentication required'
        }), 401

    @app.errorhandler(403)
    async def forbidden(error):
        return jsonify({
            'error': 'forbidden',
            'message': 'Access denied'
        }), 403

    @app.errorhandler(404)
    async def not_found(error):
        return jsonify({
            'error': 'not_found',
            'message': 'Resource not found'
        }), 404

    @app.errorhandler(413)
    async def payload_too_large(error):
        return jsonify({
            'error': 'file_too_large',
            'message': f'File size exceeds {server_config.MAX_UPLOAD_SIZE_MB}MB limit'
        }), 413

    @app.errorhandler(500)
    async def internal_error(error):
        logger.error(f"Internal server error: {error}", exc_info=True)
        return jsonify({
            'error': 'internal_error',
            'message': 'Internal server error'
        }), 500

    return app
async def periodic_cleanup(job_manager: JobManager, interval_seconds: float = 3600):
    """
    Periodic cleanup task for expired jobs and files

    Sleeps for ``interval_seconds``, then asks the job manager to clean up
    expired jobs, and repeats until the task is cancelled. Errors raised by
    a cleanup pass are logged and the loop continues with the next pass.

    Args:
        job_manager: JobManager instance
        interval_seconds: Delay between cleanup passes, in seconds
            (defaults to one hour)
    """
    while True:
        try:
            # Sleep first: the first pass runs one full interval after start
            await asyncio.sleep(interval_seconds)
            logger.info("Starting periodic cleanup...")
            cleaned_count = await job_manager.cleanup_expired_jobs()
            if cleaned_count > 0:
                logger.info(f"Periodic cleanup completed: {cleaned_count} items removed")
        except asyncio.CancelledError:
            # Cancellation is absorbed here, so the task finishes normally
            logger.info("Periodic cleanup task cancelled")
            break
        except Exception as e:
            # Keep the loop alive; include the traceback for diagnosis
            logger.error(f"Periodic cleanup error: {e}", exc_info=True)
def setup_signal_handlers(app: Quart):
    """
    Install SIGINT/SIGTERM handlers that log the signal and exit the process

    Args:
        app: Quart application instance (accepted but not used here)
    """
    def _exit_on_signal(signum, frame):
        # Record which signal arrived, then exit with status 0
        logger.info(f"Received signal {signum}, shutting down...")
        sys.exit(0)

    # Same handler for both termination signals
    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, _exit_on_signal)
if __name__ == '__main__':
    # Script entry point: build the app, configure logging and serve it
    # with Hypercorn until interrupted.

    # Create application
    app = create_app()

    # Set up signal handlers
    setup_signal_handlers(app)

    # Configure logging
    logging.basicConfig(
        level=logging.INFO if not server_config.DEBUG else logging.DEBUG,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

    # Run with Hypercorn
    import hypercorn.asyncio
    from hypercorn import Config

    config = Config()
    config.bind = [f"{server_config.HOST}:{server_config.PORT}"]
    # NOTE(review): Hypercorn's API-usage docs indicate the worker count is
    # not honoured when serving programmatically via serve() - confirm.
    config.workers = server_config.WORKERS
    config.use_reloader = server_config.DEBUG
    config.accesslog = "-"  # Log to stdout

    logger.info(f"Starting server on {server_config.HOST}:{server_config.PORT}")
    logger.info(f"Development mode: {server_config.DEV_MODE}")
    logger.info(f"Workers: {server_config.WORKERS}")

    try:
        # serve() is a coroutine function: calling it only creates a
        # coroutine object, so it has to be driven by an event loop or the
        # server never actually starts.
        asyncio.run(hypercorn.asyncio.serve(app, config))
    except KeyboardInterrupt:
        logger.info("Server stopped by user")
    except Exception as e:
        logger.error(f"Server failed to start: {e}", exc_info=True)
        sys.exit(1)