semblance-dev/backend/app/services/ai_runner_service.py
2025-12-19 19:26:16 +00:00

376 lines
No EOL
14 KiB
Python
Executable file

"""
AI Runner Service
Provides a single dedicated thread with an asyncio event loop for all AI conversations.
This fixes Motor loop affinity issues and improves scalability by avoiding one-thread-per-conversation.
Based on GPT-5 recommendations for clean async/threading architecture.
"""
import asyncio
import threading
import logging
from typing import Dict, Any, Optional, Callable, Awaitable
from datetime import datetime
from concurrent.futures import Future
import weakref
from app.db import get_db
from motor.motor_asyncio import AsyncIOMotorClient
class AIRunnerService:
    """Singleton service that runs all AI conversations in a dedicated thread with single event loop.

    Lifecycle:
        * ``start()`` spawns a daemon thread that creates its own asyncio loop,
          initialises the loop-bound resources and then runs the loop forever.
        * ``submit_conversation()`` hands a coroutine over to that loop from
          any thread and returns a ``concurrent.futures.Future``.
        * ``stop()`` cancels the tracked conversations, stops the loop and
          joins the thread.

    At most one conversation task is tracked per focus group id; submitting a
    new conversation for an id cancels the one currently tracked for it.

    The blocking helpers (``stop``, ``stop_conversation``,
    ``get_active_conversations``) wait on a future that is resolved by the
    runner loop, so they must be called from a thread other than the runner
    thread.
    """

    _instance: Optional['AIRunnerService'] = None
    _lock = threading.Lock()

    # Upper bound (seconds) that start() blocks while the runner thread
    # creates its loop and initialises the loop-bound resources.
    _STARTUP_TIMEOUT = 10.0

    def __new__(cls):
        # Check, lock, check again: only the first caller creates the instance.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        # __init__ runs on every AIRunnerService() call; only the first one
        # may set up state, later calls must not reset a live service.
        if hasattr(self, '_initialized'):
            return
        self.logger = logging.getLogger(__name__)
        self._thread: Optional[threading.Thread] = None
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._running = False
        self._stopping = False
        # Set by the runner thread once start-up has either succeeded or failed.
        self._ready = threading.Event()
        # Task registry for tracking and cancellation
        self._active_conversations: Dict[str, asyncio.Task] = {}  # focus_group_id -> Task
        # Created in _initialize_async_resources() so that the lock is built
        # on the AI loop rather than on whichever thread constructs the service.
        self._task_registry_lock: Optional[asyncio.Lock] = None
        # Database client for AI operations (will be created on AI loop)
        self._db_client: Optional[AsyncIOMotorClient] = None
        self._db = None
        self._initialized = True

    def start(self) -> None:
        """Start the AI runner thread and event loop.

        Blocks until the runner thread reports that its loop and async
        resources are initialised (or that initialisation failed), for at most
        ``_STARTUP_TIMEOUT`` seconds.  Failures are logged, not raised.
        """
        thread = self._thread
        if self._running or (thread is not None and thread.is_alive()):
            self.logger.warning("AI Runner already running")
            return
        self.logger.info("Starting AI Runner service...")
        self._stopping = False
        self._ready.clear()
        self._thread = threading.Thread(target=self._run_event_loop, daemon=True)
        self._thread.start()
        # Wait for the loop to be ready.  The runner thread sets the event
        # both on success and on failure, so this cannot spin forever.
        if not self._ready.wait(timeout=self._STARTUP_TIMEOUT):
            self.logger.error(
                f"AI Runner service did not become ready within {self._STARTUP_TIMEOUT} seconds"
            )
            return
        if self._running:
            self.logger.info("AI Runner service started successfully")
        else:
            self.logger.error("Failed to start AI Runner service")

    def stop(self) -> None:
        """Stop the AI runner service gracefully (idempotent).

        Cancels tracked conversations (waiting up to 3 seconds), asks the loop
        to stop and joins the runner thread (waiting up to 5 seconds).  Errors
        in any step are logged and shutdown continues.
        """
        self.logger.info("Stopping AI Runner service...")
        self._stopping = True
        # Get references (they might change during shutdown)
        thread = self._thread
        loop = self._loop
        if loop is not None and not loop.is_closed():
            # Cancel all active conversations
            try:
                future = self._schedule(loop, self._cancel_all_conversations())
                future.result(timeout=3.0)
                self.logger.info("All AI conversations cancelled")
            except Exception as e:
                self.logger.warning(f"Error cancelling conversations (continuing shutdown): {e}")
            # Stop the event loop
            try:
                loop.call_soon_threadsafe(loop.stop)
                self.logger.info("AI Runner event loop stop requested")
            except Exception as e:
                self.logger.warning(f"Error stopping event loop (continuing): {e}")
        # Always try to join thread if it exists
        if thread and thread.is_alive():
            self.logger.info("Waiting for AI Runner thread to finish...")
            thread.join(timeout=5.0)
            if thread.is_alive():
                self.logger.warning("AI Runner thread did not shut down gracefully")
            else:
                self.logger.info("AI Runner thread joined successfully")
        self._running = False
        self.logger.info("AI Runner service shutdown complete")

    @staticmethod
    def _schedule(loop: asyncio.AbstractEventLoop, coro) -> Future:
        """Schedule *coro* on *loop* from another thread.

        If the loop refuses the coroutine (for example because it is already
        closed) the coroutine object is closed before the error is re-raised,
        so it does not linger as a never-awaited coroutine.
        """
        try:
            return asyncio.run_coroutine_threadsafe(coro, loop)
        except Exception:
            coro.close()
            raise

    def _run_event_loop(self) -> None:
        """Main thread function that runs the asyncio event loop."""
        loop: Optional[asyncio.AbstractEventLoop] = None
        try:
            # Create new event loop for this thread
            loop = asyncio.new_event_loop()
            self._loop = loop
            asyncio.set_event_loop(loop)
            # Initialize async resources on this loop
            loop.run_until_complete(self._initialize_async_resources())
            self._running = True
            self.logger.info("AI Runner event loop started")
            # Only now is the service usable; release start().
            self._ready.set()
            # Run the event loop
            loop.run_forever()
            # The loop has been stopped: refuse new work while cleaning up.
            self._running = False
            # Cleanup phase after loop.stop()
            self.logger.info("AI Runner event loop stopped, cleaning up...")
            loop.run_until_complete(self._cleanup_async_resources())
            # Cancel any remaining tasks
            pending = list(asyncio.all_tasks(loop=loop))
            for t in pending:
                t.cancel()
            if pending:
                loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
                self.logger.info(f"Cancelled {len(pending)} remaining tasks")
            # Shutdown async generators and default executor (Python 3.9+)
            try:
                loop.run_until_complete(loop.shutdown_asyncgens())
            except Exception as e:
                self.logger.debug(f"Error shutting down async generators: {e}")
            try:
                loop.run_until_complete(loop.shutdown_default_executor())
            except Exception as e:
                self.logger.debug(f"Error shutting down default executor: {e}")
        except Exception as e:
            self.logger.error(f"AI Runner event loop error: {e}")
        finally:
            # Close the loop
            try:
                if loop is not None:
                    loop.close()
                    self.logger.info("AI Runner event loop closed")
            except Exception as e:
                self.logger.debug(f"Error closing loop: {e}")
            self._running = False
            # Do not leave a closed loop behind: a later start()/stop() on the
            # same singleton must not mistake it for a live one.
            if self._loop is loop:
                self._loop = None
            # Also releases start() when initialisation failed.
            self._ready.set()
            self.logger.info("AI Runner thread cleanup complete")

    async def _initialize_async_resources(self) -> None:
        """Initialize async resources (database, task registry lock) on the AI loop.

        Raises:
            Exception: whatever ``get_db()`` raises, after logging it.
        """
        try:
            # Initialize database connection
            self._db = await get_db()
            self.logger.info("AI Runner: Database connection initialized")
            # Create task registry lock on this loop
            self._task_registry_lock = asyncio.Lock()
            self.logger.info("AI Runner: Async resources initialized")
        except Exception as e:
            self.logger.error(f"Failed to initialize AI Runner async resources: {e}")
            raise

    async def _cleanup_async_resources(self) -> None:
        """Cleanup async resources; errors are logged and swallowed."""
        try:
            # Cancel any remaining tasks
            await self._cancel_all_conversations()
            # Close database connections
            # NOTE(review): nothing in this class assigns _db_client, so this
            # only fires if it is set from outside - confirm intended owner.
            if self._db_client:
                self._db_client.close()
            self.logger.info("AI Runner: Async resources cleaned up")
        except Exception as e:
            self.logger.error(f"Error cleaning up AI Runner resources: {e}")

    async def _cancel_all_conversations(self) -> None:
        """Cancel all active conversation tasks and wait for them to finish."""
        lock = self._task_registry_lock
        if lock is None:
            # Async resources were never initialised; nothing can be tracked.
            return
        async with lock:
            tasks_to_cancel = list(self._active_conversations.values())
            self._active_conversations.clear()
        if not tasks_to_cancel:
            return
        self.logger.info(f"Cancelling {len(tasks_to_cancel)} active conversations")
        for task in tasks_to_cancel:
            if not task.done():
                task.cancel()
        # Wait for cancellation to complete
        try:
            await asyncio.gather(*tasks_to_cancel, return_exceptions=True)
        except Exception as e:
            self.logger.debug(f"Expected cancellation errors: {e}")

    def submit_conversation(self, focus_group_id: str, coro: Awaitable[Any]) -> Future:
        """
        Submit a conversation coroutine to run on the AI event loop.

        Args:
            focus_group_id: The focus group ID for tracking
            coro: The coroutine to execute

        Returns:
            Future that will contain the result

        Raises:
            RuntimeError: if the AI Runner is not running.
        """
        # Snapshot the loop: the runner thread clears self._loop on shutdown.
        loop = self._loop
        if not self._running or loop is None:
            raise RuntimeError("AI Runner is not running")
        # Use run_coroutine_threadsafe to schedule on the AI loop
        return self._schedule(loop, self._run_conversation_with_tracking(focus_group_id, coro))

    async def _run_conversation_with_tracking(self, focus_group_id: str, coro: Awaitable[Any]) -> Any:
        """Run a conversation coroutine with proper task tracking.

        Any conversation currently tracked for ``focus_group_id`` is cancelled
        and awaited *before* the new coroutine is scheduled, so two
        conversations for the same focus group do not overlap.

        Returns:
            The result of ``coro``.

        Raises:
            asyncio.CancelledError: if the conversation is cancelled.
            Exception: whatever ``coro`` raises (logged, then re-raised).
        """
        lock = self._task_registry_lock
        task: Optional[asyncio.Task] = None
        try:
            async with lock:
                # Cancel existing conversation for this focus group if any
                existing_task = self._active_conversations.get(focus_group_id)
                if existing_task is not None and not existing_task.done():
                    self.logger.info(f"Cancelling existing conversation for focus group {focus_group_id}")
                    existing_task.cancel()
                    # asyncio.wait() does not re-raise the outcome of the old
                    # task, so an error it raises while unwinding cannot
                    # abort this (unrelated) new conversation.
                    await asyncio.wait({existing_task})
                # Create and register the task for this conversation
                task = asyncio.create_task(coro)
                self._active_conversations[focus_group_id] = task
            # Run the conversation
            self.logger.info(f"Starting AI conversation for focus group {focus_group_id}")
            result = await task
            self.logger.info(f"AI conversation completed for focus group {focus_group_id}")
            return result
        except asyncio.CancelledError:
            self.logger.info(f"AI conversation cancelled for focus group {focus_group_id}")
            raise
        except Exception as e:
            self.logger.error(f"AI conversation error for focus group {focus_group_id}: {e}")
            raise
        finally:
            if task is None:
                # We were interrupted before scheduling: release the coroutine
                # instead of leaving it un-awaited.
                close = getattr(coro, "close", None)
                if close is not None:
                    close()
            else:
                # Unregister the task, unless a newer conversation replaced it
                async with lock:
                    if self._active_conversations.get(focus_group_id) is task:
                        del self._active_conversations[focus_group_id]

    def stop_conversation(self, focus_group_id: str) -> bool:
        """
        Stop a specific conversation.

        Blocks the calling thread for up to 5 seconds.

        Args:
            focus_group_id: The focus group ID to stop

        Returns:
            True if conversation was found and cancelled, False otherwise
        """
        loop = self._loop
        if not self._running or loop is None:
            return False
        try:
            # Schedule cancellation on the AI loop
            future = self._schedule(loop, self._cancel_conversation(focus_group_id))
            return future.result(timeout=5.0)
        except Exception as e:
            self.logger.error(f"Error stopping conversation {focus_group_id}: {e}")
            return False

    async def _cancel_conversation(self, focus_group_id: str) -> bool:
        """Cancel a specific conversation task.

        Returns:
            True if a not-yet-finished task was found and cancelled, else False.
        """
        async with self._task_registry_lock:
            task = self._active_conversations.get(focus_group_id)
            if task is None or task.done():
                return False
            task.cancel()
            # Wait without re-raising: the task's own outcome (including an
            # error raised while it unwinds) is reported through the future
            # returned by submit_conversation(), not through this call.
            await asyncio.wait({task})
            return True

    def get_active_conversations(self) -> Dict[str, Dict[str, Any]]:
        """Get information about active conversations.

        Blocks the calling thread for up to 2 seconds.  Returns an empty dict
        when the runner is not running or the query fails.
        """
        loop = self._loop
        if not self._running or loop is None:
            return {}
        try:
            future = self._schedule(loop, self._get_conversation_info())
            return future.result(timeout=2.0)
        except Exception as e:
            self.logger.error(f"Error getting conversation info: {e}")
            return {}

    async def _get_conversation_info(self) -> Dict[str, Dict[str, Any]]:
        """Get conversation information from the AI loop.

        Returns:
            Mapping of focus group id to a dict with keys ``status``
            (``'running'`` or ``'completed'``), ``cancelled`` and ``exception``
            (string form of the task's exception, or None).
        """
        async with self._task_registry_lock:
            info = {}
            for focus_group_id, task in self._active_conversations.items():
                done = task.done()
                cancelled = done and task.cancelled()
                # Task.exception() raises CancelledError for a cancelled task,
                # so it may only be consulted for tasks that finished otherwise.
                error = task.exception() if done and not cancelled else None
                info[focus_group_id] = {
                    'status': 'completed' if done else 'running',
                    'cancelled': cancelled,
                    'exception': str(error) if error is not None else None,
                }
            return info

    @property
    def is_running(self) -> bool:
        """Check if the AI Runner is running."""
        return self._running

    @property
    def active_conversation_count(self) -> int:
        """Get count of active conversations."""
        return len(self._active_conversations)
# Global AI Runner instance
# Module-level handle to the runner: None until get_ai_runner() creates it,
# and reset to None again by shutdown_ai_runner().
_ai_runner: Optional[AIRunnerService] = None
def get_ai_runner() -> AIRunnerService:
    """Return the module-wide AI Runner, creating it on first use.

    The runner is only constructed here; it is not started.
    """
    global _ai_runner
    runner = _ai_runner
    if runner is not None:
        return runner
    # First request: build the service and remember it for later callers.
    runner = AIRunnerService()
    _ai_runner = runner
    return runner
def init_ai_runner() -> None:
    """Make sure the global AI Runner exists and has been started.

    Does nothing when the runner already reports itself as running.
    """
    runner = get_ai_runner()
    if runner.is_running:
        return
    runner.start()
def shutdown_ai_runner() -> None:
    """Shutdown the AI Runner service and drop the module-level reference.

    ``stop()`` is called whenever a runner object exists, not only when it
    reports ``is_running``: that flag is still False while the runner thread
    is initialising (and after a failed start), and skipping ``stop()`` in
    those states would leave the thread and its loop behind.  ``stop()`` is
    documented as idempotent, so calling it on an idle runner is harmless.

    Does nothing when no runner has been created.
    """
    global _ai_runner
    runner = _ai_runner
    if runner is None:
        return
    try:
        runner.stop()
    finally:
        # Always forget the handle, even if stop() raised unexpectedly.
        _ai_runner = None