304 lines
No EOL
12 KiB
Python
Executable file
304 lines
No EOL
12 KiB
Python
Executable file
import os
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import asyncio
|
|
import signal
|
|
import threading
|
|
|
|
# Set up temp directories FIRST, before any imports that might use temp files
|
|
def setup_early_temp_directories():
    """Create a writable ``temp`` directory beside this file and make it the temp location.

    The directory is created (if needed), chmod-ed to 0o755 and probed with a
    throw-away file. On success ``TMPDIR``, ``TEMP`` and ``TMP`` are pointed at
    it and ``tempfile.tempdir`` is overridden, so code imported afterwards
    picks it up.

    Returns:
        The absolute path of the temp directory, or ``None`` when the directory
        could not be created or written to (a warning is printed in that case
        and the environment is left untouched).
    """
    backend_dir = os.path.dirname(os.path.abspath(__file__))
    temp_dir = os.path.join(backend_dir, 'temp')

    try:
        os.makedirs(temp_dir, exist_ok=True)
        os.chmod(temp_dir, 0o755)

        # Test write permissions with a probe file.
        test_file = os.path.join(temp_dir, 'test_write_early')
        try:
            with open(test_file, 'w') as f:
                f.write('test')
        finally:
            # Remove the probe even when the write itself failed, so a
            # half-written file is never left behind. If open() failed there
            # is nothing to remove.
            try:
                os.remove(test_file)
            except FileNotFoundError:
                pass
    except OSError as e:
        print(f"Warning: Early temp directory setup failed: {e}")
        return None

    # Set environment variables BEFORE any imports that use tempfile.
    os.environ['TMPDIR'] = temp_dir
    os.environ['TEMP'] = temp_dir
    os.environ['TMP'] = temp_dir
    tempfile.tempdir = temp_dir

    print(f"✓ Early temp directory setup: {temp_dir}")
    return temp_dir
|
|
|
|
# Run the temp-directory setup before importing the app package: the call sets
# TMPDIR/TEMP/TMP and tempfile.tempdir, so it has to happen ahead of any import
# that might create temp files. Do not move these imports above the call.
setup_early_temp_directories()

from app import create_app
# NOTE(review): User is not referenced in the visible part of this file;
# presumably imported for its side effects or legacy reasons - confirm.
from app.models.user import User

# Create the ASGI app (which wraps Quart + SocketIO)
asgi_app = create_app()
# Extract the Quart app from the ASGI wrapper; run_server() attaches its
# before_serving / after_serving hooks to this object.
quart_app = asgi_app.quart_app
|
|
|
|
# Initialize database on startup
|
|
async def initialize_database():
    """Startup hook for database initialization; currently a no-op.

    Default user creation was removed for security reasons; use the
    populate_db.py script instead.
    """
    return None
|
|
|
|
# Use the ASGI app for the server: `app` is a module-level alias of the ASGI
# wrapper (not the inner Quart app).
# NOTE(review): run_server() passes asgi_app to Hypercorn directly, so this
# alias is presumably for external launchers that import `app` - confirm.
app = asgi_app
|
|
|
|
async def startup():
    """Run the application's one-time startup work (database initialization)."""
    pending_init = initialize_database()
    await pending_init
|
|
|
|
# Signal handlers are now managed within the asyncio event loop for proper async shutdown
|
|
|
|
async def run_server():
    """Serve the ASGI app with Hypercorn until a shutdown signal arrives.

    Steps, in order:

    1. Optionally register a SIGUSR1 stack-dump handler via ``faulthandler``.
    2. Build the Hypercorn ``Config`` (bind address, timeouts).
    3. Attach ``before_serving`` / ``after_serving`` hooks to ``quart_app``.
       The shutdown hook stops SocketIO, the AI runner and the database, then
       cancels leftover SocketIO/EngineIO tasks.
    4. Install SIGINT/SIGTERM handlers that set a shutdown event. A second
       signal force-exits the process with ``os._exit(1)``.
    5. Start a watchdog that prints task/thread diagnostics if shutdown is
       still in progress 4 seconds after the signal.
    6. Await ``hypercorn.asyncio.serve`` with the event as shutdown trigger.

    Returns:
        None, once Hypercorn's ``serve`` has returned.
    """
    import faulthandler

    import hypercorn.asyncio
    from hypercorn import Config

    # Enable the stack-dump signal for debugging hangs. faulthandler.register()
    # is not available on Windows (and neither is SIGUSR1), so only register
    # when both exist instead of falling back to SIGBREAK and crashing.
    stack_dump_signal = getattr(signal, 'SIGUSR1', None)
    stack_dump_enabled = (
        stack_dump_signal is not None and hasattr(faulthandler, 'register')
    )
    if stack_dump_enabled:
        faulthandler.register(stack_dump_signal)

    print("Starting Quart + SocketIO app with hypercorn ASGI server...")
    print("📡 WebSocket functionality enabled")
    print("🤖 AI Runner active for autonomous conversations")
    print("⚡ All operations async and non-blocking")
    print("🛑 Use Ctrl-C for graceful shutdown")
    if stack_dump_enabled:
        print("🔍 Debug: Send SIGUSR1 for stack dump if it hangs")
    print("Started Semblance back end service")

    # Create hypercorn config with debug settings
    config = Config()
    config.bind = ["0.0.0.0:5137"]
    config.debug = False
    config.use_reloader = False
    config.loglevel = "info"  # Enable more logging
    # NOTE(review): confirm that Hypercorn's Config honours a `lifespan`
    # attribute; it is kept exactly as the original set it.
    config.lifespan = "on"
    config.startup_timeout = 60
    config.shutdown_timeout = 5  # Shorter for faster shutdown
    config.graceful_timeout = 3  # Shorter for faster shutdown
    config.keep_alive_timeout = 2  # Cut lingering connections faster

    def shutdown_ai_runner_safe():
        """Stop the AI runner; meant to run in a worker thread, never raises."""
        try:
            from app.services.ai_runner_service import shutdown_ai_runner
            shutdown_ai_runner()
        except Exception as e:
            print(f"Error in AI Runner shutdown: {e}")

    async def close_database_connections():
        """Close the app's database connections in a worker thread; never raises."""
        try:
            # close_db_connections is a synchronous call, so keep it off the loop.
            from app.db import close_db_connections
            await asyncio.to_thread(close_db_connections)
        except Exception as e:
            print(f"Database connection close error: {e}")

    async def disconnect_socketio_clients(socketio_server):
        """Disconnect every client in the default namespace, one by one."""
        print("🔌 Disconnecting all SocketIO clients...")
        try:
            participants = socketio_server.manager.get_participants('/', None) or []
            for entry in list(participants):
                # Depending on the python-socketio version, participants are
                # bare sids or (sid, eio_sid) pairs - accept both.
                sid = entry[0] if isinstance(entry, tuple) else entry
                try:
                    await socketio_server.disconnect(sid, namespace='/')
                except Exception as disconnect_err:
                    print(f"Error disconnecting {sid}: {disconnect_err}")
            print("✅ All SocketIO clients disconnected")
        except Exception as disconnect_err:
            print(f"Error during client disconnection: {disconnect_err}")

    async def stop_socketio_server(socketio_server):
        """Call whichever shutdown method the SocketIO/EngineIO server offers."""
        try:
            if hasattr(socketio_server, 'shutdown'):
                await socketio_server.shutdown()
                print("✅ SocketIO server shutdown complete")
            elif hasattr(socketio_server, 'eio') and hasattr(socketio_server.eio, 'shutdown'):
                # Fall back to the underlying engine.io server.
                await socketio_server.eio.shutdown()
                print("✅ EngineIO shutdown complete")
            else:
                print("⚠️ No shutdown method available - relying on task cancellation")
        except Exception as shutdown_err:
            print(f"SocketIO shutdown error: {shutdown_err}")

    # Substrings identifying SocketIO/EngineIO coroutines (built once, not per task).
    socketio_patterns = (
        'engineio.async_socket',
        'engineio.async_server',
        'socketio.async_server',
        'AsyncSocket._websocket_handler',
        'AsyncSocket._send_ping',
        'AsyncServer._service_task',
    )

    async def cancel_socketio_tasks():
        """Cancel any remaining SocketIO/EngineIO tasks; never raises."""
        try:
            print("🧹 Cancelling remaining SocketIO tasks...")
            current = asyncio.current_task()
            cancelled = []

            for task in asyncio.all_tasks():
                if task.done() or task is current:
                    continue

                # Match on whatever module/qualname the coroutine object exposes.
                try:
                    coro = task.get_coro()
                    module = getattr(coro, '__module__', '')
                    qualname = getattr(coro, '__qualname__', '')
                    full_name = f'{module}.{qualname}'
                except Exception:
                    continue  # Ignore task inspection errors

                if any(pattern in full_name for pattern in socketio_patterns):
                    task.cancel()
                    cancelled.append(task)

            if cancelled:
                print(f"🧹 Cancelled {len(cancelled)} SocketIO tasks")
                # Give cancellations up to 0.5s to complete, returning earlier
                # once every cancelled task has finished.
                await asyncio.wait(cancelled, timeout=0.5)

        except Exception as e:
            print(f"Task cancellation error: {e}")

    # Add startup tasks to Quart app
    @quart_app.before_serving
    async def startup_task():
        await startup()

    # Add shutdown tasks to Quart app with timeout protection
    @quart_app.after_serving
    async def shutdown_task():
        print("🛑 Quart app shutting down...")

        # 1. Shutdown SocketIO first to stop WebSocket tasks
        try:
            print("🔌 Shutting down SocketIO...")
            from app.extensions import socketio_server

            await disconnect_socketio_clients(socketio_server)
            await stop_socketio_server(socketio_server)
        except Exception as e:
            print(f"⚠️ Overall SocketIO shutdown error: {e}")

        # 2. Shutdown AI Runner. On timeout only the await is abandoned; the
        #    worker thread cannot be cancelled and keeps running.
        try:
            await asyncio.wait_for(
                asyncio.to_thread(shutdown_ai_runner_safe),
                timeout=3.0,  # Shorter timeout
            )
            print("✅ AI Runner shutdown complete")
        except asyncio.TimeoutError:
            print("⏱️ AI Runner shutdown timed out; continuing")
        except Exception as e:
            print(f"⚠️ AI Runner shutdown error: {e}")

        # 3. Close database connections
        try:
            print("🗄️ Closing database connections...")
            await close_database_connections()
            print("✅ Database connections closed")
        except Exception as e:
            print(f"⚠️ Database close error: {e}")

        # 4. Cancel any remaining engineio/socketio tasks
        await cancel_socketio_tasks()

    # Create shutdown event for Hypercorn
    shutdown_event = asyncio.Event()
    _second_signal = False  # For double Ctrl-C force exit

    def _raise_shutdown():
        """Signal handler with force-exit on second Ctrl-C."""
        nonlocal _second_signal

        if _second_signal:
            print("🔥 Second Ctrl-C - forcing exit!")
            os._exit(1)

        _second_signal = True
        print("\n🛑 Shutdown signal received...")
        print("📊 Active threads:", [t.name for t in threading.enumerate()])
        shutdown_event.set()

    async def shutdown_watchdog():
        """Print diagnostics if shutdown is still running 4s after the signal."""
        # Start the clock at the shutdown signal, not at server startup.
        await shutdown_event.wait()
        await asyncio.sleep(4)  # Should be enough with the enhanced cleanup
        print("⏱️ Shutdown taking too long - dumping diagnostics...")

        # Dump asyncio tasks
        print("\n=== Active asyncio tasks ===")
        for task in asyncio.all_tasks():
            if not task.done():
                print(f"- {task.get_name()}: {task}")

        # Dump threads
        print("\n=== Active threads ===")
        for thread in threading.enumerate():
            print(f"- {thread.name}: daemon={thread.daemon}, alive={thread.is_alive()}")

        print("🔥 Use second Ctrl-C to force exit if needed")

    # Keep a reference: the event loop only holds weak references to tasks.
    watchdog = asyncio.create_task(shutdown_watchdog(), name="shutdown-watchdog")

    # Register signal handlers
    loop = asyncio.get_running_loop()
    try:
        loop.add_signal_handler(signal.SIGINT, _raise_shutdown)
        loop.add_signal_handler(signal.SIGTERM, _raise_shutdown)
    except NotImplementedError:
        # Windows fallback: plain signal handlers that hop back onto the loop.
        signal.signal(signal.SIGINT, lambda *_: loop.call_soon_threadsafe(_raise_shutdown))
        signal.signal(signal.SIGTERM, lambda *_: loop.call_soon_threadsafe(_raise_shutdown))

    # Let Hypercorn handle shutdown gracefully with the trigger
    print("🔍 Debug: Starting Hypercorn serve with shutdown_trigger...")
    try:
        await hypercorn.asyncio.serve(
            asgi_app,
            config,
            shutdown_trigger=shutdown_event.wait,
        )
    finally:
        # The server is done (or failed); the watchdog has nothing left to watch.
        watchdog.cancel()

    print("🛑 Hypercorn server stopped")
|
|
|
|
if __name__ == '__main__':
    # Script entry point: verify hypercorn is importable, run the server, and
    # exit with status 1 on a missing dependency or an unexpected error.
    try:
        # Check if hypercorn is available, so a missing dependency produces an
        # actionable message instead of a traceback from inside run_server().
        try:
            import hypercorn.asyncio
            from hypercorn import Config
        except ImportError as e:
            print("❌ Error: hypercorn is required for WebSocket functionality!")
            print("Please install hypercorn: pip install hypercorn")
            print(f"ImportError: {e}")
            sys.exit(1)

        # Run the server with proper shutdown handling
        asyncio.run(run_server())

    except KeyboardInterrupt:
        print("\n🛑 Keyboard interrupt - shutting down...")
    except Exception as e:
        print(f"❌ Unexpected error: {e}")
        # Best-effort cleanup of the AI runner before exiting with an error.
        try:
            from app.services.ai_runner_service import shutdown_ai_runner
            shutdown_ai_runner()
        except Exception as cleanup_err:
            # Report instead of silently swallowing; KeyboardInterrupt and
            # SystemExit are no longer caught here.
            print(f"⚠️ AI Runner cleanup failed: {cleanup_err}")
        sys.exit(1)

    print("👋 Server stopped")