"""
AI Runner Service

Provides a single dedicated thread with an asyncio event loop for all AI
conversations. This fixes Motor loop affinity issues and improves scalability
by avoiding one-thread-per-conversation.

Based on GPT-5 recommendations for clean async/threading architecture.
"""

import asyncio
import threading
import logging
from typing import Dict, Any, Optional, Callable, Awaitable
from datetime import datetime
from concurrent.futures import Future
import weakref

from app.db import get_db
from motor.motor_asyncio import AsyncIOMotorClient


class AIRunnerService:
    """Singleton service that runs all AI conversations in a dedicated thread with single event loop.

    Public methods (``start``, ``stop``, ``submit_conversation``,
    ``stop_conversation``, ``get_active_conversations``) are called from other
    threads and hand work to the loop via ``asyncio.run_coroutine_threadsafe``.
    The ``async`` methods run on the AI loop itself.
    """

    _instance: Optional['AIRunnerService'] = None
    _lock = threading.Lock()

    # Upper bound on how long start() blocks waiting for the loop thread to
    # finish initializing its async resources.
    _STARTUP_TIMEOUT: float = 30.0

    def __new__(cls):
        # Check / lock / re-check so that only one instance is ever created.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        # __init__ runs on every AIRunnerService() call; only the first call
        # may set up state. The class lock keeps two threads from both
        # passing the guard and clobbering each other's state.
        with self._lock:
            if hasattr(self, '_initialized'):
                return

            self.logger = logging.getLogger(__name__)
            self._thread: Optional[threading.Thread] = None
            self._loop: Optional[asyncio.AbstractEventLoop] = None
            self._running = False
            self._stopping = False

            # Set by the loop thread once initialization has succeeded or
            # failed, so start() can wait for a definite outcome.
            self._ready = threading.Event()

            # Task registry for tracking and cancellation
            self._active_conversations: Dict[str, asyncio.Future] = {}  # focus_group_id -> Task
            # Created on the AI loop in _initialize_async_resources(); an
            # asyncio.Lock must not be constructed outside the loop it serves.
            self._task_registry_lock: Optional[asyncio.Lock] = None

            # Database client for AI operations (will be created on AI loop)
            self._db_client: Optional[AsyncIOMotorClient] = None
            self._db = None

            self._initialized = True

    def start(self) -> None:
        """Start the AI runner thread and block until its loop is ready.

        Returns once the loop thread has finished initializing (success or
        failure) or after ``_STARTUP_TIMEOUT`` seconds. The outcome is logged;
        callers can check ``is_running``.
        """
        if self._running or (self._thread is not None and self._thread.is_alive()):
            self.logger.warning("AI Runner already running")
            return

        self.logger.info("Starting AI Runner service...")
        self._stopping = False
        self._ready.clear()
        self._thread = threading.Thread(target=self._run_event_loop, daemon=True)
        self._thread.start()

        # Wait for the thread to report a result instead of polling
        # self._loop, which is assigned before initialization has run.
        if not self._ready.wait(timeout=self._STARTUP_TIMEOUT):
            self.logger.error(
                f"AI Runner service did not become ready within {self._STARTUP_TIMEOUT}s"
            )
            return

        if self._running:
            self.logger.info("AI Runner service started successfully")
        else:
            self.logger.error("Failed to start AI Runner service")

    def stop(self) -> None:
        """Stop the AI runner service gracefully (idempotent).

        Cancels active conversations, asks the loop to stop, then joins the
        thread. Each step is best-effort: errors are logged and shutdown
        continues.
        """
        self.logger.info("Stopping AI Runner service...")
        self._stopping = True

        # Get references (they might change during shutdown)
        thread = self._thread
        loop = self._loop

        if loop is not None:
            # Cancel all active conversations
            try:
                future = asyncio.run_coroutine_threadsafe(self._cancel_all_conversations(), loop)
                future.result(timeout=3.0)
                self.logger.info("All AI conversations cancelled")
            except Exception as e:
                self.logger.warning(f"Error cancelling conversations (continuing shutdown): {e}")

            # Stop the event loop
            try:
                loop.call_soon_threadsafe(loop.stop)
                self.logger.info("AI Runner event loop stop requested")
            except Exception as e:
                self.logger.warning(f"Error stopping event loop (continuing): {e}")

        # Always try to join thread if it exists
        if thread and thread.is_alive():
            self.logger.info("Waiting for AI Runner thread to finish...")
            thread.join(timeout=5.0)
            if thread.is_alive():
                self.logger.warning("AI Runner thread did not shut down gracefully")
            else:
                self.logger.info("AI Runner thread joined successfully")

        self._running = False
        self.logger.info("AI Runner service shutdown complete")

    def _run_event_loop(self) -> None:
        """Main thread function that runs the asyncio event loop.

        Creates the loop, initializes async resources, runs until
        ``loop.stop()`` is requested, then cleans up and closes the loop.
        Always signals ``_ready`` so a waiting ``start()`` is released.
        """
        loop: Optional[asyncio.AbstractEventLoop] = None
        try:
            # Create new event loop for this thread
            loop = asyncio.new_event_loop()
            self._loop = loop
            asyncio.set_event_loop(loop)

            # Initialize async resources on this loop
            loop.run_until_complete(self._initialize_async_resources())

            self._running = True
            self._ready.set()
            self.logger.info("AI Runner event loop started")

            # Run the event loop. If stop() was requested before the loop
            # existed it could not schedule loop.stop(), so skip straight to
            # cleanup instead of running forever.
            if not self._stopping:
                loop.run_forever()

            # Cleanup phase after loop.stop()
            self.logger.info("AI Runner event loop stopped, cleaning up...")
            loop.run_until_complete(self._cleanup_async_resources())

            # Cancel any remaining tasks. The loop is not running here, so
            # there is no "current task" to exclude.
            pending = list(asyncio.all_tasks(loop=loop))
            for t in pending:
                t.cancel()
            if pending:
                loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
                self.logger.info(f"Cancelled {len(pending)} remaining tasks")

            # Shutdown async generators and default executor (Python 3.9+)
            try:
                loop.run_until_complete(loop.shutdown_asyncgens())
            except Exception as e:
                self.logger.debug(f"Error shutting down async generators: {e}")

            try:
                loop.run_until_complete(loop.shutdown_default_executor())
            except Exception as e:
                self.logger.debug(f"Error shutting down default executor: {e}")

        except Exception as e:
            self.logger.error(f"AI Runner event loop error: {e}")
        finally:
            # Close the loop
            try:
                if loop is not None:
                    loop.close()
                    self.logger.info("AI Runner event loop closed")
            except Exception as e:
                self.logger.debug(f"Error closing loop: {e}")

            # Drop the closed loop so later calls do not try to schedule on
            # it and a subsequent start() does not see a stale reference.
            self._loop = None
            self._running = False
            # Release start() if initialization failed before _ready was set.
            self._ready.set()
            self.logger.info("AI Runner thread cleanup complete")

    async def _initialize_async_resources(self) -> None:
        """Initialize async resources (database, HTTP clients) on the AI loop.

        Raises:
            Exception: whatever ``get_db()`` raises, after logging it.
        """
        try:
            # Initialize database connection
            self._db = await get_db()
            self.logger.info("AI Runner: Database connection initialized")

            # Create task registry lock on this loop
            self._task_registry_lock = asyncio.Lock()

            self.logger.info("AI Runner: Async resources initialized")

        except Exception as e:
            self.logger.error(f"Failed to initialize AI Runner async resources: {e}")
            raise

    async def _cleanup_async_resources(self) -> None:
        """Cleanup async resources. Errors are logged, not raised."""
        try:
            # Cancel any remaining tasks
            await self._cancel_all_conversations()

            # Close database connections
            if self._db_client:
                self._db_client.close()

            self.logger.info("AI Runner: Async resources cleaned up")

        except Exception as e:
            self.logger.error(f"Error cleaning up AI Runner resources: {e}")

    async def _cancel_all_conversations(self) -> None:
        """Cancel all active conversation tasks and wait for them to finish."""
        # The lock only exists once the loop has initialized its resources.
        if self._task_registry_lock is None:
            return

        # Snapshot and clear under the lock; cancel outside it so the
        # wrappers' own unregister step can take the lock without waiting.
        async with self._task_registry_lock:
            tasks_to_cancel = list(self._active_conversations.values())
            self._active_conversations.clear()

        if not tasks_to_cancel:
            return

        self.logger.info(f"Cancelling {len(tasks_to_cancel)} active conversations")
        for task in tasks_to_cancel:
            if not task.done():
                task.cancel()

        # Wait for cancellation to complete
        try:
            await asyncio.gather(*tasks_to_cancel, return_exceptions=True)
        except Exception as e:
            self.logger.debug(f"Expected cancellation errors: {e}")

    def submit_conversation(self, focus_group_id: str, coro: Awaitable[Any]) -> Future:
        """
        Submit a conversation coroutine to run on the AI event loop.

        Args:
            focus_group_id: The focus group ID for tracking
            coro: The coroutine to execute

        Returns:
            Future that will contain the result

        Raises:
            RuntimeError: if the AI Runner is not running.
        """
        # Read the loop once: the runner thread may clear it concurrently.
        loop = self._loop
        if not self._running or loop is None:
            raise RuntimeError("AI Runner is not running")

        # Use run_coroutine_threadsafe to schedule on the AI loop
        return asyncio.run_coroutine_threadsafe(
            self._run_conversation_with_tracking(focus_group_id, coro),
            loop
        )

    async def _run_conversation_with_tracking(self, focus_group_id: str, coro: Awaitable[Any]) -> Any:
        """Run a conversation coroutine with proper task tracking.

        Any conversation already registered for ``focus_group_id`` is
        cancelled and awaited before the new one is scheduled, so two
        conversations for the same focus group never run side by side.

        Returns:
            The result of ``coro``.

        Raises:
            asyncio.CancelledError: if the conversation is cancelled.
            Exception: whatever ``coro`` raises.
        """
        task: Optional[asyncio.Future] = None
        try:
            async with self._task_registry_lock:
                # Cancel existing conversation for this focus group if any
                existing_task = self._active_conversations.get(focus_group_id)
                if existing_task is not None and not existing_task.done():
                    self.logger.info(f"Cancelling existing conversation for focus group {focus_group_id}")
                    existing_task.cancel()
                    # return_exceptions=True: however the old task ends
                    # (cancelled or raising during teardown) must not abort
                    # the new conversation.
                    await asyncio.gather(existing_task, return_exceptions=True)

                # ensure_future accepts any awaitable, not only coroutines.
                task = asyncio.ensure_future(coro)
                self._active_conversations[focus_group_id] = task
        except BaseException:
            # Interrupted before the conversation was scheduled: close the
            # coroutine so it is not left un-awaited.
            if task is None and asyncio.iscoroutine(coro):
                coro.close()
            raise

        try:
            # Run the conversation
            self.logger.info(f"Starting AI conversation for focus group {focus_group_id}")
            result = await task
            self.logger.info(f"AI conversation completed for focus group {focus_group_id}")
            return result

        except asyncio.CancelledError:
            self.logger.info(f"AI conversation cancelled for focus group {focus_group_id}")
            raise
        except Exception as e:
            self.logger.error(f"AI conversation error for focus group {focus_group_id}: {e}")
            raise
        finally:
            # Unregister the task, but only if the slot still holds *this*
            # task; a newer conversation may have replaced it.
            async with self._task_registry_lock:
                if self._active_conversations.get(focus_group_id) is task:
                    del self._active_conversations[focus_group_id]

    def stop_conversation(self, focus_group_id: str) -> bool:
        """
        Stop a specific conversation.

        Args:
            focus_group_id: The focus group ID to stop

        Returns:
            True if conversation was found and cancelled, False otherwise
        """
        loop = self._loop
        if not self._running or loop is None:
            return False

        # Schedule cancellation on the AI loop
        future = asyncio.run_coroutine_threadsafe(
            self._cancel_conversation(focus_group_id),
            loop
        )

        try:
            return future.result(timeout=5.0)
        except Exception as e:
            self.logger.error(f"Error stopping conversation {focus_group_id}: {e}")
            return False

    async def _cancel_conversation(self, focus_group_id: str) -> bool:
        """Cancel a specific conversation task.

        Returns:
            True if a running task was found and cancelled, False otherwise.
        """
        async with self._task_registry_lock:
            task = self._active_conversations.get(focus_group_id)
            if task is None or task.done():
                return False

            task.cancel()
            # Wait for the task to finish; an error raised while it unwinds
            # does not change the fact that it was cancelled.
            await asyncio.gather(task, return_exceptions=True)
            return True

    def get_active_conversations(self) -> Dict[str, Dict[str, Any]]:
        """Get information about active conversations.

        Returns:
            Mapping of focus group ID to a dict with ``status``,
            ``cancelled`` and ``exception`` keys; empty if the runner is not
            running or the query fails.
        """
        loop = self._loop
        if not self._running or loop is None:
            return {}

        future = asyncio.run_coroutine_threadsafe(
            self._get_conversation_info(),
            loop
        )

        try:
            return future.result(timeout=2.0)
        except Exception as e:
            self.logger.error(f"Error getting conversation info: {e}")
            return {}

    async def _get_conversation_info(self) -> Dict[str, Dict[str, Any]]:
        """Get conversation information from the AI loop."""
        async with self._task_registry_lock:
            info: Dict[str, Dict[str, Any]] = {}
            for focus_group_id, task in self._active_conversations.items():
                done = task.done()
                cancelled = done and task.cancelled()

                # Task.exception() raises CancelledError for a cancelled task
                # (and InvalidStateError for an unfinished one), so only ask
                # when the task finished without being cancelled.
                error: Optional[str] = None
                if done and not cancelled:
                    exc = task.exception()
                    if exc is not None:
                        error = str(exc)

                info[focus_group_id] = {
                    'status': 'completed' if done else 'running',
                    'cancelled': cancelled,
                    'exception': error
                }
            return info

    @property
    def is_running(self) -> bool:
        """Check if the AI Runner is running."""
        return self._running

    @property
    def active_conversation_count(self) -> int:
        """Get count of active conversations."""
        return len(self._active_conversations)


# Global AI Runner instance
_ai_runner: Optional[AIRunnerService] = None


def get_ai_runner() -> AIRunnerService:
    """Get the global AI Runner instance."""
    global _ai_runner
    if _ai_runner is None:
        _ai_runner = AIRunnerService()
    return _ai_runner


def init_ai_runner() -> None:
    """Initialize and start the AI Runner service."""
    ai_runner = get_ai_runner()
    if not ai_runner.is_running:
        ai_runner.start()


def shutdown_ai_runner() -> None:
    """Shutdown the AI Runner service.

    Clears the module-level reference; because ``AIRunnerService`` is a
    singleton, a later ``get_ai_runner()`` returns the same instance.
    """
    global _ai_runner
    if _ai_runner and _ai_runner.is_running:
        _ai_runner.stop()
    _ai_runner = None