# semblance_backup/backend/run.py
# 2025-12-19 19:26:16 +00:00

# 305 lines
# No EOL
# 12 KiB
# Python
# Executable file

import os
import subprocess
import sys
import tempfile
import asyncio
import signal
import threading
# Set up temp directories FIRST, before any imports that might use temp files
def setup_early_temp_directories():
    """Redirect temp-file locations to a ``temp`` folder beside this file.

    The folder is created if necessary, given mode ``0o755`` and verified to
    be writable by creating and deleting a small probe file.  On success the
    ``TMPDIR``, ``TEMP`` and ``TMP`` environment variables and
    ``tempfile.tempdir`` are all pointed at it.

    Returns:
        The absolute path of the temp directory, or ``None`` when any step
        failed (a warning is printed instead of raising).
    """
    here = os.path.dirname(os.path.abspath(__file__))
    scratch_dir = os.path.join(here, 'temp')
    probe_path = os.path.join(scratch_dir, 'test_write_early')

    try:
        os.makedirs(scratch_dir, exist_ok=True)
        os.chmod(scratch_dir, 0o755)

        # Prove the directory is writable: write a probe file, then delete it.
        with open(probe_path, 'w') as probe:
            probe.write('test')
        os.remove(probe_path)

        # Must happen before anything that uses tempfile gets imported.
        for var_name in ('TMPDIR', 'TEMP', 'TMP'):
            os.environ[var_name] = scratch_dir
        tempfile.tempdir = scratch_dir

        # Kept inside the try so that a failing print is reported as a
        # warning too, rather than escaping to the caller.
        print(f"✓ Early temp directory setup: {scratch_dir}")
        return scratch_dir
    except Exception as exc:
        print(f"Warning: Early temp directory setup failed: {exc}")
        return None
# Run the temp-directory setup BEFORE importing `app`: the imports below must
# come after this call so that any temp-file use they trigger already sees the
# redirected location. Do not hoist these imports to the top of the file.
setup_early_temp_directories()
from app import create_app
from app.models.user import User
# Create the ASGI app (which wraps Quart + SocketIO)
asgi_app = create_app()
# Extract the Quart app from the ASGI wrapper; run_server() registers its
# before_serving / after_serving hooks on this object.
quart_app = asgi_app.quart_app
# Database bootstrap, awaited once from startup()
async def initialize_database():
    """Perform the database initialisation needed before serving.

    At present this only awaits ``User.create_default_user()``; per the
    original inline note that call creates the default user if it doesn't
    already exist (the existence check lives in the model, not here).
    """
    await User.create_default_user()
# Module-level alias: `app` is the ASGI wrapper (not the inner Quart app),
# i.e. the object that is meant to be handed to the server.
app = asgi_app
async def startup():
    """Run the one-time startup work for the application.

    Currently this consists solely of awaiting ``initialize_database()``.
    """
    await initialize_database()
# Signal handlers are now managed within the asyncio event loop for proper async shutdown
async def run_server():
    """Serve the ASGI app with Hypercorn until a shutdown signal arrives.

    Steps performed:

    * install a faulthandler hook for on-demand stack dumps (where supported),
    * build the Hypercorn ``Config`` (bind address, timeouts),
    * register ``before_serving`` / ``after_serving`` hooks on ``quart_app``
      that run ``startup()`` and an ordered cleanup (SocketIO, AI Runner,
      database connections, leftover SocketIO/EngineIO tasks),
    * install SIGINT/SIGTERM handlers that set an ``asyncio.Event`` used as
      Hypercorn's ``shutdown_trigger``; a second signal force-exits the
      process with ``os._exit(1)``,
    * start a watchdog that prints diagnostics if shutdown is still in
      progress 4 seconds after the first signal.

    Must be awaited from a running event loop (see ``asyncio.run`` in the
    ``__main__`` section).
    """
    import hypercorn.asyncio
    from hypercorn import Config
    import faulthandler

    # Enable fault handler for debugging. faulthandler.register() is not
    # available on Windows, so only install the on-demand SIGUSR1 dump when
    # both the function and the signal exist; otherwise fall back to
    # faulthandler.enable() instead of crashing at startup.
    if hasattr(faulthandler, 'register') and hasattr(signal, 'SIGUSR1'):
        faulthandler.register(signal.SIGUSR1)
    else:
        faulthandler.enable()

    print("Starting Quart + SocketIO app with hypercorn ASGI server...")
    print("📡 WebSocket functionality enabled")
    print("🤖 AI Runner active for autonomous conversations")
    print("⚡ All operations async and non-blocking")
    print("🛑 Use Ctrl-C for graceful shutdown")
    print("🔍 Debug: Send SIGUSR1 for stack dump if it hangs")
    print("Started Semblance back end service")

    # Create hypercorn config with debug settings
    config = Config()
    config.bind = ["0.0.0.0:5137"]
    config.debug = False
    config.use_reloader = False
    config.loglevel = "info"  # Enable more logging
    config.lifespan = "on"  # Ensure lifespan is enabled
    config.startup_timeout = 60
    config.shutdown_timeout = 5  # Shorter for faster shutdown
    config.graceful_timeout = 3  # Shorter for faster shutdown
    config.keep_alive_timeout = 2  # Cut lingering connections faster

    def shutdown_ai_runner_safe():
        """Safe AI Runner shutdown that doesn't block the main event loop.

        Runs in a worker thread (see ``asyncio.to_thread`` in
        ``shutdown_task``); errors are printed rather than raised.
        """
        try:
            from app.services.ai_runner_service import shutdown_ai_runner
            shutdown_ai_runner()
        except Exception as e:
            print(f"Error in AI Runner shutdown: {e}")

    async def close_database_connections():
        """Close all database connections to stop PyMongo background threads."""
        try:
            # Close the global Motor client singleton
            from app.db import close_db_connections
            await asyncio.to_thread(close_db_connections)
        except Exception as e:
            print(f"Database connection close error: {e}")

    # Substrings identifying SocketIO/EngineIO coroutines in "<module>.<qualname>".
    socketio_patterns = (
        'engineio.async_socket',
        'engineio.async_server',
        'socketio.async_server',
        'AsyncSocket._websocket_handler',
        'AsyncSocket._send_ping',
        'AsyncServer._service_task',
    )

    def _coroutine_module(coro):
        """Return the name of the module in which *coro* was defined.

        Coroutine objects carry no ``__module__`` attribute, so the name is
        read from the globals of the coroutine's frame when one is available.
        """
        frame = getattr(coro, 'cr_frame', None)
        if frame is not None:
            return frame.f_globals.get('__name__', '')
        return getattr(coro, '__module__', '')

    async def cancel_socketio_tasks():
        """Cancel any remaining SocketIO/EngineIO tasks."""
        try:
            print("🧹 Cancelling remaining SocketIO tasks...")
            cancelled_count = 0
            this_task = asyncio.current_task()
            for task in list(asyncio.all_tasks()):
                if task.done() or task is this_task:
                    continue
                # Check if this is a SocketIO/EngineIO related task
                try:
                    coro = task.get_coro()
                    module = _coroutine_module(coro)
                    qualname = getattr(coro, '__qualname__', '')
                    full_name = f'{module}.{qualname}'
                    if any(pattern in full_name for pattern in socketio_patterns):
                        task.cancel()
                        cancelled_count += 1
                except Exception:
                    pass  # Ignore task inspection errors
            if cancelled_count > 0:
                print(f"🧹 Cancelled {cancelled_count} SocketIO tasks")
                # Give cancellations time to complete
                await asyncio.sleep(0.5)
        except Exception as e:
            print(f"Task cancellation error: {e}")

    # Add startup tasks to Quart app
    @quart_app.before_serving
    async def startup_task():
        await startup()

    # Add shutdown tasks to Quart app with timeout protection
    @quart_app.after_serving
    async def shutdown_task():
        print("🛑 Quart app shutting down...")

        # 1. Shutdown SocketIO first to stop WebSocket tasks
        try:
            print("🔌 Shutting down SocketIO...")
            from app.extensions import socketio_server

            # First disconnect all clients
            print("🔌 Disconnecting all SocketIO clients...")
            try:
                participants = socketio_server.manager.get_participants('/', None) or []
                for entry in list(participants):
                    # NOTE(review): newer python-socketio managers yield
                    # (sid, eio_sid) pairs here while older ones yield bare
                    # sids - accept both; confirm against the pinned version.
                    sid = entry[0] if isinstance(entry, tuple) else entry
                    try:
                        await socketio_server.disconnect(sid, namespace='/')
                    except Exception as disconnect_err:
                        print(f"Error disconnecting {sid}: {disconnect_err}")
                print("✅ All SocketIO clients disconnected")
            except Exception as disconnect_err:
                print(f"Error during client disconnection: {disconnect_err}")

            # Then shutdown the server
            try:
                if hasattr(socketio_server, 'shutdown'):
                    await socketio_server.shutdown()
                    print("✅ SocketIO server shutdown complete")
                elif hasattr(socketio_server, 'eio') and hasattr(socketio_server.eio, 'shutdown'):
                    # Try shutting down the underlying engine.io server
                    await socketio_server.eio.shutdown()
                    print("✅ EngineIO shutdown complete")
                else:
                    print("⚠️ No shutdown method available - relying on task cancellation")
            except Exception as shutdown_err:
                print(f"SocketIO shutdown error: {shutdown_err}")
        except Exception as e:
            print(f"⚠️ Overall SocketIO shutdown error: {e}")

        # 2. Shutdown AI Runner (in a thread, bounded by a short timeout)
        try:
            await asyncio.wait_for(
                asyncio.to_thread(shutdown_ai_runner_safe),
                timeout=3.0,  # Shorter timeout
            )
            print("✅ AI Runner shutdown complete")
        except asyncio.TimeoutError:
            print("⏱️ AI Runner shutdown timed out; continuing")
        except Exception as e:
            print(f"⚠️ AI Runner shutdown error: {e}")

        # 3. Close database connections
        try:
            print("🗄️ Closing database connections...")
            await close_database_connections()
            print("✅ Database connections closed")
        except Exception as e:
            print(f"⚠️ Database close error: {e}")

        # 4. Cancel any remaining engineio/socketio tasks
        await cancel_socketio_tasks()

    # Create shutdown event for Hypercorn
    shutdown_event = asyncio.Event()
    _second_signal = False  # For double Ctrl-C force exit

    def _raise_shutdown():
        """Signal handler with force-exit on second Ctrl-C."""
        nonlocal _second_signal
        if _second_signal:
            print("🔥 Second Ctrl-C - forcing exit!")
            os._exit(1)
        _second_signal = True
        print("\n🛑 Shutdown signal received...")
        print("📊 Active threads:", [t.name for t in threading.enumerate()])
        shutdown_event.set()

    # Watchdog to ensure serve() returns
    async def shutdown_watchdog():
        # Stay idle until shutdown has actually been requested; only then
        # start the 4 second grace period. The task is cancelled as soon as
        # serve() returns, so reaching the prints below means shutdown is
        # still in progress.
        await shutdown_event.wait()
        await asyncio.sleep(4)
        print("⏱️ Shutdown taking too long - dumping diagnostics...")
        # Dump asyncio tasks
        print("\n=== Active asyncio tasks ===")
        for task in asyncio.all_tasks():
            if not task.done():
                print(f"- {task.get_name()}: {task}")
        # Dump threads
        print("\n=== Active threads ===")
        for thread in threading.enumerate():
            print(f"- {thread.name}: daemon={thread.daemon}, alive={thread.is_alive()}")
        print("🔥 Use second Ctrl-C to force exit if needed")

    # Keep a reference so the task can be cancelled once the server stops.
    watchdog_task = asyncio.create_task(shutdown_watchdog(), name="shutdown-watchdog")

    # Register signal handlers
    loop = asyncio.get_running_loop()
    try:
        loop.add_signal_handler(signal.SIGINT, _raise_shutdown)
        loop.add_signal_handler(signal.SIGTERM, _raise_shutdown)
    except NotImplementedError:
        # Windows fallback
        signal.signal(signal.SIGINT, lambda *_: loop.call_soon_threadsafe(_raise_shutdown))
        signal.signal(signal.SIGTERM, lambda *_: loop.call_soon_threadsafe(_raise_shutdown))

    # Let Hypercorn handle shutdown gracefully with the trigger
    print("🔍 Debug: Starting Hypercorn serve with shutdown_trigger...")
    try:
        await hypercorn.asyncio.serve(
            asgi_app,
            config,
            shutdown_trigger=shutdown_event.wait,
        )
    finally:
        # The server is done (or failed): the watchdog has nothing left to watch.
        watchdog_task.cancel()
    print("🛑 Hypercorn server stopped")
if __name__ == '__main__':
    # Script entry point: verify hypercorn is importable, then run the server
    # until it stops. Exit status is 1 on a missing dependency or an
    # unexpected error; a KeyboardInterrupt is treated as a normal stop.
    try:
        # Check if hypercorn is available
        try:
            import hypercorn.asyncio
            from hypercorn import Config
        except ImportError as e:
            print("❌ Error: hypercorn is required for WebSocket functionality!")
            print("Please install hypercorn: pip install hypercorn")
            print(f"ImportError: {e}")
            sys.exit(1)
        # Run the server with proper shutdown handling
        asyncio.run(run_server())
    except KeyboardInterrupt:
        print("\n🛑 Keyboard interrupt - shutting down...")
    except Exception as e:
        print(f"❌ Unexpected error: {e}")
        # Best-effort cleanup of the AI Runner. Catch Exception only, so that
        # Ctrl-C / SystemExit during cleanup are not silently swallowed.
        try:
            from app.services.ai_runner_service import shutdown_ai_runner
            shutdown_ai_runner()
        except Exception as cleanup_err:
            print(f"⚠️ AI Runner cleanup failed: {cleanup_err}")
        sys.exit(1)
    print("👋 Server stopped")