376 lines
No EOL
14 KiB
Python
Executable file
376 lines
No EOL
14 KiB
Python
Executable file
"""
|
|
AI Runner Service
|
|
|
|
Provides a single dedicated thread with an asyncio event loop for all AI conversations.
|
|
This fixes Motor loop affinity issues and improves scalability by avoiding one-thread-per-conversation.
|
|
|
|
Based on GPT-5 recommendations for clean async/threading architecture.
|
|
"""
|
|
|
|
import asyncio
|
|
import threading
|
|
import logging
|
|
from typing import Dict, Any, Optional, Callable, Awaitable
|
|
from datetime import datetime
|
|
from concurrent.futures import Future
|
|
import weakref
|
|
|
|
from app.db import get_db
|
|
from motor.motor_asyncio import AsyncIOMotorClient
|
|
|
|
|
|
class AIRunnerService:
    """Singleton service that runs all AI conversations in a dedicated thread with single event loop.

    ``AIRunnerService()`` always returns the same instance (see ``__new__``).
    The runner thread owns one asyncio event loop; conversation tasks, the task
    registry and its lock are all used on that loop. The synchronous public
    methods hand work to the loop with ``asyncio.run_coroutine_threadsafe``;
    ``stop``, ``stop_conversation`` and ``get_active_conversations`` then block
    on the result with a timeout, so calling them from the runner thread itself
    would only stall until that timeout expires.
    """

    _instance: Optional['AIRunnerService'] = None
    _lock = threading.Lock()

    def __new__(cls):
        # Double-checked creation: the unlocked test is the fast path, the
        # locked re-test stops two threads from both creating an instance.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        # __new__ hands back the cached instance, so __init__ runs on every
        # AIRunnerService() call; only set up state the first time.
        if hasattr(self, '_initialized'):
            return

        self.logger = logging.getLogger(__name__)
        self._thread: Optional[threading.Thread] = None
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._running = False
        self._stopping = False
        # Set by the runner thread once startup has either succeeded or failed.
        self._ready = threading.Event()

        # Task registry for tracking and cancellation: focus_group_id -> task
        self._active_conversations: Dict[str, asyncio.Future] = {}
        # Created on the AI loop by _initialize_async_resources(); building it
        # here would tie it to whichever thread first constructed the service.
        self._task_registry_lock: Optional[asyncio.Lock] = None

        # Database client for AI operations (will be created on AI loop)
        self._db_client: Optional[AsyncIOMotorClient] = None
        self._db = None

        self._initialized = True

    def start(self, timeout: float = 30.0) -> None:
        """Start the AI runner thread and event loop.

        Blocks until the runner thread has created its loop and finished
        ``_initialize_async_resources()`` (or failed doing so), so a successful
        start means ``submit_conversation()`` can be used immediately.

        Args:
            timeout: Maximum number of seconds to wait for startup.
        """
        if self._thread is not None and self._thread.is_alive():
            self.logger.warning("AI Runner already running")
            return

        self.logger.info("Starting AI Runner service...")
        self._stopping = False
        # Never let a loop left over from a previous run look like a fresh one.
        self._loop = None
        self._ready.clear()
        self._thread = threading.Thread(
            target=self._run_event_loop, name="ai-runner", daemon=True
        )
        self._thread.start()

        # Wait until the runner thread reports that startup succeeded or failed.
        if self._ready.wait(timeout) and self._running:
            self.logger.info("AI Runner service started successfully")
        else:
            self.logger.error("Failed to start AI Runner service")

    def stop(self) -> None:
        """Stop the AI runner service gracefully (idempotent).

        Cancels all active conversations (waiting up to 3 s), asks the loop to
        stop, and joins the runner thread (waiting up to 5 s). Once the thread
        has exited, the thread and loop references are cleared so that a later
        ``start()`` builds fresh ones instead of reusing a closed loop.
        """
        self.logger.info("Stopping AI Runner service...")
        self._stopping = True

        # Snapshot the references; the runner thread may change state meanwhile.
        thread = self._thread
        loop = self._loop

        if loop is not None and not loop.is_closed():
            # Cancel all active conversations
            future = None
            try:
                future = asyncio.run_coroutine_threadsafe(self._cancel_all_conversations(), loop)
                future.result(timeout=3.0)
                self.logger.info("All AI conversations cancelled")
            except Exception as e:
                if future is not None:
                    # Don't leave the cancellation coroutine pending on a timeout.
                    future.cancel()
                self.logger.warning(f"Error cancelling conversations (continuing shutdown): {e}")

            # Stop the event loop
            try:
                loop.call_soon_threadsafe(loop.stop)
                self.logger.info("AI Runner event loop stop requested")
            except Exception as e:
                self.logger.warning(f"Error stopping event loop (continuing): {e}")

        # Always try to join thread if it exists
        if thread is not None and thread.is_alive():
            self.logger.info("Waiting for AI Runner thread to finish...")
            thread.join(timeout=5.0)
            if thread.is_alive():
                self.logger.warning("AI Runner thread did not shut down gracefully")
            else:
                self.logger.info("AI Runner thread joined successfully")

        # Only forget the thread/loop once the thread is really gone; a still
        # running thread keeps start() from spawning a second one.
        if thread is None or not thread.is_alive():
            self._thread = None
            self._loop = None

        self._running = False
        self.logger.info("AI Runner service shutdown complete")

    def _run_event_loop(self) -> None:
        """Thread target: create, initialise, run and finally tear down the AI event loop.

        Always sets ``_ready`` (on success or failure) so ``start()`` can
        return, and on exit leaves ``_running`` False and the loop closed.
        """
        loop: Optional[asyncio.AbstractEventLoop] = None
        try:
            # Create new event loop for this thread; keep a local reference so
            # this thread only ever touches its own loop.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            self._loop = loop

            # Initialize async resources on this loop
            loop.run_until_complete(self._initialize_async_resources())

            self._running = True
            self.logger.info("AI Runner event loop started")
            self._ready.set()

            # Runs until stop() schedules loop.stop()
            loop.run_forever()

            # Cleanup phase after loop.stop()
            self.logger.info("AI Runner event loop stopped, cleaning up...")
            loop.run_until_complete(self._cleanup_async_resources())

            # Cancel any remaining tasks (the loop is not running here, so none
            # of them is the current task).
            pending = list(asyncio.all_tasks(loop))
            for t in pending:
                t.cancel()
            if pending:
                loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
                self.logger.info(f"Cancelled {len(pending)} remaining tasks")

            # Shutdown async generators and default executor. On Python < 3.9
            # shutdown_default_executor does not exist; the AttributeError is
            # logged at debug level and ignored.
            try:
                loop.run_until_complete(loop.shutdown_asyncgens())
            except Exception as e:
                self.logger.debug(f"Error shutting down async generators: {e}")

            try:
                loop.run_until_complete(loop.shutdown_default_executor())
            except Exception as e:
                self.logger.debug(f"Error shutting down default executor: {e}")

        except Exception as e:
            self.logger.exception(f"AI Runner event loop error: {e}")
        finally:
            # Mark not running before closing, so submitters stop using the loop.
            self._running = False
            # Unblock start() even when startup failed.
            self._ready.set()
            try:
                if loop is not None:
                    loop.close()
                    self.logger.info("AI Runner event loop closed")
            except Exception as e:
                self.logger.debug(f"Error closing loop: {e}")

            self.logger.info("AI Runner thread cleanup complete")

    async def _initialize_async_resources(self) -> None:
        """Initialize async resources (database handle, task-registry lock) on the AI loop.

        Raises:
            Whatever ``get_db()`` raises; the error is logged and re-raised so
            the runner thread aborts startup.
        """
        try:
            # Initialize database connection
            self._db = await get_db()
            self.logger.info("AI Runner: Database connection initialized")

            # Create task registry lock on this loop
            self._task_registry_lock = asyncio.Lock()

            self.logger.info("AI Runner: Async resources initialized")

        except Exception as e:
            self.logger.error(f"Failed to initialize AI Runner async resources: {e}")
            raise

    async def _cleanup_async_resources(self) -> None:
        """Cancel remaining conversations and close the database client, logging any error."""
        try:
            # Cancel any remaining tasks
            await self._cancel_all_conversations()

            # NOTE(review): _db_client is never assigned in this class (the
            # handle comes from get_db()); confirm who owns closing the client.
            if self._db_client:
                self._db_client.close()

            self.logger.info("AI Runner: Async resources cleaned up")

        except Exception as e:
            self.logger.error(f"Error cleaning up AI Runner resources: {e}")

    async def _cancel_all_conversations(self) -> None:
        """Cancel every registered conversation task and wait for them to finish."""
        if self._task_registry_lock is None:
            # Startup never got far enough to create the registry.
            return

        async with self._task_registry_lock:
            tasks_to_cancel = list(self._active_conversations.values())
            self._active_conversations.clear()

            if tasks_to_cancel:
                self.logger.info(f"Cancelling {len(tasks_to_cancel)} active conversations")
                for task in tasks_to_cancel:
                    if not task.done():
                        task.cancel()

        # Wait for cancellation to complete
        if tasks_to_cancel:
            try:
                await asyncio.gather(*tasks_to_cancel, return_exceptions=True)
            except Exception as e:
                self.logger.debug(f"Expected cancellation errors: {e}")

    def submit_conversation(self, focus_group_id: str, coro: Awaitable[Any]) -> Future:
        """
        Submit a conversation coroutine to run on the AI event loop.

        Any conversation still running for the same ``focus_group_id`` is
        cancelled before this one starts.

        Args:
            focus_group_id: The focus group ID for tracking
            coro: The coroutine (or other awaitable) to execute

        Returns:
            ``concurrent.futures.Future`` that will contain the result; cancelling
            it cancels the conversation.

        Raises:
            RuntimeError: If the runner is not running.
        """
        if not self._running or not self._loop:
            raise RuntimeError("AI Runner is not running")

        # Use run_coroutine_threadsafe to schedule on the AI loop
        return asyncio.run_coroutine_threadsafe(
            self._run_conversation_with_tracking(focus_group_id, coro),
            self._loop,
        )

    async def _run_conversation_with_tracking(self, focus_group_id: str, coro: Awaitable[Any]) -> Any:
        """Run ``coro`` as the tracked conversation for ``focus_group_id``.

        A still-running conversation for the same focus group is cancelled and
        waited for *before* ``coro`` is scheduled, so two conversations for one
        focus group never run at the same time. When the task finishes it is
        removed from the registry unless it has already been replaced.
        """
        task: Optional[asyncio.Future] = None
        try:
            async with self._task_registry_lock:
                existing_task = self._active_conversations.get(focus_group_id)
                if existing_task is not None and not existing_task.done():
                    self.logger.info(f"Cancelling existing conversation for focus group {focus_group_id}")
                    existing_task.cancel()
                    # asyncio.wait() does not re-raise the old task's outcome
                    # (that task's own wrapper awaits and reports it), but it
                    # does propagate cancellation of *this* wrapper.
                    await asyncio.wait({existing_task})

                # Schedule only now that the old conversation has finished.
                task = asyncio.ensure_future(coro)
                self._active_conversations[focus_group_id] = task
        except BaseException:
            # coro was never scheduled; close it so Python doesn't warn that
            # it was never awaited.
            if task is None and asyncio.iscoroutine(coro):
                coro.close()
            raise

        try:
            # Run the conversation
            self.logger.info(f"Starting AI conversation for focus group {focus_group_id}")
            result = await task
            self.logger.info(f"AI conversation completed for focus group {focus_group_id}")
            return result

        except asyncio.CancelledError:
            self.logger.info(f"AI conversation cancelled for focus group {focus_group_id}")
            raise
        except Exception as e:
            self.logger.error(f"AI conversation error for focus group {focus_group_id}: {e}")
            raise
        finally:
            # Unregister the task, unless a newer conversation replaced it.
            async with self._task_registry_lock:
                if self._active_conversations.get(focus_group_id) is task:
                    del self._active_conversations[focus_group_id]

    def stop_conversation(self, focus_group_id: str) -> bool:
        """
        Stop a specific conversation, waiting up to 5 seconds.

        Args:
            focus_group_id: The focus group ID to stop

        Returns:
            True if conversation was found and cancelled, False otherwise
            (including when the runner is not running or the request fails).
        """
        if not self._running or not self._loop:
            return False

        # Schedule cancellation on the AI loop
        future = None
        try:
            future = asyncio.run_coroutine_threadsafe(
                self._cancel_conversation(focus_group_id),
                self._loop,
            )
            return future.result(timeout=5.0)
        except Exception as e:
            if future is not None:
                future.cancel()
            self.logger.error(f"Error stopping conversation {focus_group_id}: {e}")
            return False

    async def _cancel_conversation(self, focus_group_id: str) -> bool:
        """Cancel a specific conversation task; True if a running task was cancelled."""
        async with self._task_registry_lock:
            task = self._active_conversations.get(focus_group_id)
            if task is None or task.done():
                return False
            task.cancel()
            # Wait for the task to wind down without re-raising its outcome;
            # an error during its cleanup still means it has been stopped.
            await asyncio.wait({task})
            return True

    def get_active_conversations(self) -> Dict[str, Dict[str, Any]]:
        """Get information about active conversations (empty dict on any failure)."""
        if not self._running or not self._loop:
            return {}

        future = None
        try:
            future = asyncio.run_coroutine_threadsafe(
                self._get_conversation_info(),
                self._loop,
            )
            return future.result(timeout=2.0)
        except Exception as e:
            if future is not None:
                future.cancel()
            self.logger.error(f"Error getting conversation info: {e}")
            return {}

    async def _get_conversation_info(self) -> Dict[str, Dict[str, Any]]:
        """Snapshot the registry as ``{id: {'status', 'cancelled', 'exception'}}``."""
        async with self._task_registry_lock:
            info = {}
            for focus_group_id, task in self._active_conversations.items():
                done = task.done()
                cancelled = done and task.cancelled()
                # Task.exception() raises CancelledError on a cancelled task,
                # so only ask finished, non-cancelled tasks.
                exc = task.exception() if done and not cancelled else None
                info[focus_group_id] = {
                    'status': 'completed' if done else 'running',
                    'cancelled': cancelled,
                    'exception': str(exc) if exc is not None else None,
                }
            return info

    @property
    def is_running(self) -> bool:
        """Check if the AI Runner is running."""
        return self._running

    @property
    def active_conversation_count(self) -> int:
        """Get count of active conversations."""
        return len(self._active_conversations)
|
|
|
|
|
|
# Global AI Runner instance
# Cached handle returned by get_ai_runner(); shutdown_ai_runner() resets it to None.
_ai_runner: Optional[AIRunnerService] = None
|
|
|
|
def get_ai_runner() -> AIRunnerService:
    """Return the shared AI Runner, constructing it on the first call.

    The instance is cached in the module-level ``_ai_runner``. Because
    ``AIRunnerService.__new__`` always hands back one shared instance,
    constructing it again would yield the same object anyway.
    """
    global _ai_runner
    runner = _ai_runner
    if runner is None:
        runner = AIRunnerService()
        _ai_runner = runner
    return runner
|
|
|
|
def init_ai_runner() -> None:
    """Start the shared AI Runner unless it already reports itself as running."""
    runner = get_ai_runner()
    if runner.is_running:
        return
    runner.start()
|
|
|
|
def shutdown_ai_runner() -> None:
    """Stop the shared AI Runner if it is running, then drop the module's reference.

    The module-level reference is cleared even when the runner was not
    running. ``AIRunnerService`` keeps its own class-level ``_instance``, so a
    later ``get_ai_runner()`` receives that same object again.
    """
    global _ai_runner
    runner = _ai_runner
    if runner is not None and runner.is_running:
        runner.stop()
    _ai_runner = None