"""
Conversation Decision Service

Uses LLM to make intelligent decisions about conversation flow,
participant selection, and moderation.
"""

from typing import Dict, Any, Optional, List
import asyncio
import json
import logging
import traceback

from app.services.llm_service import LLMService, LLMServiceError
from app.services.conversation_context_service import ConversationContextService
from app.utils.prompt_loader import load_prompt, PromptLoaderError

logger = logging.getLogger(__name__)

# Upper bound (seconds) on a single LLM decision call so a stuck request
# cannot hang the conversation loop forever.
_LLM_DECISION_TIMEOUT_SECONDS = 60


class ConversationDecisionError(Exception):
    """Exception raised for errors in conversation decision making."""
    pass


class ConversationDecisionService:
    """Service for making LLM-based conversation decisions."""

    # Keys that must be present in decision['details'] for each valid action.
    # The keys of this mapping are also the complete set of accepted actions.
    _REQUIRED_DETAILS_BY_ACTION = {
        'moderator_speak': ('message_type', 'content'),
        'participant_respond': ('participant_id', 'call_out', 'topic_context'),
        'participant_interaction': ('participant_ids', 'interaction_type', 'moderator_prompt'),
        'probe_trigger': ('trigger_type', 'probe_question'),
        'end_session': ('completion_reason', 'closing_message'),
    }

    @staticmethod
    async def _get_llm_model(focus_group_id: str) -> Optional[Any]:
        """Return the focus group's 'llm_model' value, or None if the group is not found."""
        # Imported lazily, as the original code did (possibly to avoid an
        # import cycle between services and models - not verifiable from here).
        from app.models.focus_group import FocusGroup

        focus_group = await FocusGroup.find_by_id(focus_group_id)
        return focus_group.get('llm_model') if focus_group else None

    @staticmethod
    def _log_decision(decision: Dict[str, Any]) -> None:
        """Log and print the essential information of an already validated decision."""
        action = decision.get('action', 'unknown')
        reasoning = decision.get('reasoning', 'No reasoning provided')

        if action == 'participant_respond':
            participant_id = decision.get('details', {}).get('participant_id', 'unknown')
            logger.info(
                "🎯 LLM DECISION RESULT: %s for participant %s - %s",
                action, participant_id, reasoning,
            )
            print(f"✅ Decision: {action} for participant {participant_id}")
        else:
            logger.info("🎯 LLM DECISION RESULT: %s - %s", action, reasoning)
            print(f"✅ Decision: {action}")

    @staticmethod
    async def decide_next_action(focus_group_id: str, temperature: float = 0.7, mode: str = "ai") -> Dict[str, Any]:
        """
        Use LLM to decide the next action in the conversation.

        Args:
            focus_group_id: The focus group ID
            temperature: LLM temperature for decision making
            mode: The conversation mode - "manual" selects the
                'conversation-participant-selection' prompt; any other value
                (default "ai") selects the 'conversation-decision-engine' prompt

        Returns:
            Dictionary containing the decision and action details. It has
            passed ``_validate_decision`` before being returned.

        Raises:
            ConversationDecisionError: If the prompt cannot be loaded, the LLM
                call fails or times out, the response has an invalid
                structure, or any other unexpected error occurs
        """
        print(f"🎯 Decision request: {mode} mode for focus group {focus_group_id}")

        try:
            from app.services.llm_usage_context import set_llm_context
            set_llm_context(feature="conversation_decision", focus_group_id=focus_group_id)

            # Get full conversation context
            context = await ConversationContextService.get_full_context(focus_group_id)
            formatted_context = ConversationContextService.format_context_for_llm(context)

            # Load the appropriate prompt based on mode
            if mode == "manual":
                prompt_name = 'conversation-participant-selection'
            else:
                prompt_name = 'conversation-decision-engine'
            try:
                prompt = load_prompt(prompt_name, formatted_context)
            except PromptLoaderError as e:
                print(f"❌ Error loading {mode} mode prompt: {str(e)}")
                raise ConversationDecisionError(f"Error loading {mode} mode prompt: {str(e)}") from e

            # Get LLM model for this focus group
            llm_model = await ConversationDecisionService._get_llm_model(focus_group_id)

            # Only the LLM call and the response parsing live inside this try,
            # so that the validation error raised below is not re-wrapped as
            # an "unexpected" LLM processing error.
            try:
                response = await asyncio.wait_for(
                    LLMService.generate_content(
                        prompt=prompt,
                        temperature=temperature,
                        model_name=llm_model
                    ),
                    timeout=_LLM_DECISION_TIMEOUT_SECONDS
                )

                # Parse the JSON response
                decision = LLMService.parse_json_response(response)
            except asyncio.TimeoutError as e:
                print(f"⏱️ LLM decision timed out after {_LLM_DECISION_TIMEOUT_SECONDS}s")
                raise ConversationDecisionError(
                    f"LLM decision timed out after {_LLM_DECISION_TIMEOUT_SECONDS}s"
                ) from e
            except LLMServiceError as e:
                print(f"❌ LLM Service Error: {str(e)}")
                raise ConversationDecisionError(f"Error getting LLM decision: {str(e)}") from e
            except Exception as e:
                print(f"❌ Unexpected error in LLM processing: {str(e)}")
                raise ConversationDecisionError(f"Unexpected error in LLM processing: {str(e)}") from e

            # Validate decision structure
            if not ConversationDecisionService._validate_decision(decision):
                print(f"❌ Invalid decision structure from LLM: {decision}")
                raise ConversationDecisionError("Invalid decision structure from LLM")

            ConversationDecisionService._log_decision(decision)
            return decision

        except ConversationDecisionError:
            # Re-raise ConversationDecisionError as-is
            raise
        except Exception as e:
            print(f"❌ Unexpected error in conversation decision making: {str(e)}")
            print(f"❌ Full traceback: {traceback.format_exc()}")
            raise ConversationDecisionError(f"Error in conversation decision making: {str(e)}") from e

    @staticmethod
    def _validate_decision(decision: Dict[str, Any]) -> bool:
        """
        Validate that the LLM decision has the correct structure.

        A decision is valid when it is a dict with 'action', 'reasoning' and
        'details' keys, the action is one of the known actions, 'details' is a
        dict holding every key required for that action, and - if present -
        'discussion_guide_position_id' is a non-blank string. For
        'participant_interaction', 'participant_ids' must also be a list.

        Args:
            decision: The parsed LLM response

        Returns:
            True if the structure is valid, False otherwise
        """
        if not isinstance(decision, dict):
            return False

        # Check required fields
        if any(field not in decision for field in ('action', 'reasoning', 'details')):
            return False

        # Validate optional discussion_guide_position_id field if present
        if 'discussion_guide_position_id' in decision:
            position_id = decision['discussion_guide_position_id']
            if not isinstance(position_id, str) or not position_id.strip():
                return False

        # Check action type. The isinstance guard keeps an unhashable action
        # (e.g. a list) from raising TypeError on the dict lookup.
        action = decision['action']
        if not isinstance(action, str):
            return False
        required_details = ConversationDecisionService._REQUIRED_DETAILS_BY_ACTION.get(action)
        if required_details is None:
            return False

        # Validate details based on action type
        details = decision['details']
        if not isinstance(details, dict):
            return False
        if not all(field in details for field in required_details):
            return False

        if action == 'participant_interaction':
            return isinstance(details['participant_ids'], list)
        return True

    @staticmethod
    async def select_next_participant(focus_group_id: str, current_topic: str, temperature: float = 0.7) -> Dict[str, Any]:
        """
        Use LLM to select the next participant to respond.

        Delegates to ``decide_next_action`` (default "ai" mode) and reshapes
        the result.

        Args:
            focus_group_id: The focus group ID
            current_topic: The current topic being discussed (currently not
                used by the implementation)
            temperature: LLM temperature for selection

        Returns:
            Dictionary containing participant selection details when the
            decision is 'participant_respond'; otherwise a dictionary with
            'alternative_action' and the full 'decision'

        Raises:
            ConversationDecisionError: If the decision cannot be obtained
        """
        try:
            decision = await ConversationDecisionService.decide_next_action(focus_group_id, temperature)

            if decision['action'] != 'participant_respond':
                # If LLM decided on a different action, return that instead
                return {
                    'alternative_action': decision['action'],
                    'decision': decision
                }

            details = decision['details']
            return {
                'participant_id': details['participant_id'],
                'call_out': details['call_out'],
                'topic_context': details['topic_context'],
                'reasoning': decision['reasoning']
            }

        except ConversationDecisionError:
            raise
        except Exception as e:
            raise ConversationDecisionError(f"Error selecting participant: {str(e)}") from e

    @staticmethod
    async def detect_probe_triggers(focus_group_id: str, temperature: float = 0.7) -> Dict[str, Any]:
        """
        Use LLM to detect if probe triggers are needed.

        Args:
            focus_group_id: The focus group ID
            temperature: LLM temperature for detection

        Returns:
            Dictionary containing probe trigger information; 'trigger_detected'
            is True only when the decision's action is 'probe_trigger'

        Raises:
            ConversationDecisionError: If the decision cannot be obtained
        """
        try:
            decision = await ConversationDecisionService.decide_next_action(focus_group_id, temperature)

            if decision['action'] != 'probe_trigger':
                return {
                    'trigger_detected': False,
                    'alternative_action': decision['action'],
                    'decision': decision
                }

            details = decision['details']
            return {
                'trigger_detected': True,
                'trigger_type': details['trigger_type'],
                'probe_question': details['probe_question'],
                'target_participants': details.get('target_participants', []),
                'reasoning': decision['reasoning']
            }

        except ConversationDecisionError:
            raise
        except Exception as e:
            raise ConversationDecisionError(f"Error detecting probe triggers: {str(e)}") from e

    @staticmethod
    async def generate_moderator_response(focus_group_id: str, context: str, temperature: float = 0.7) -> Dict[str, Any]:
        """
        Use LLM to generate appropriate moderator response.

        Args:
            focus_group_id: The focus group ID
            context: Additional context for the response (currently not used
                by the implementation)
            temperature: LLM temperature for generation

        Returns:
            Dictionary containing moderator response details when the decision
            is 'moderator_speak'; otherwise a dictionary with
            'alternative_action' and the full 'decision'

        Raises:
            ConversationDecisionError: If the decision cannot be obtained
        """
        try:
            decision = await ConversationDecisionService.decide_next_action(focus_group_id, temperature)

            if decision['action'] != 'moderator_speak':
                return {
                    'alternative_action': decision['action'],
                    'decision': decision
                }

            details = decision['details']
            return {
                'message_type': details['message_type'],
                'content': details['content'],
                'target_participants': details.get('target_participants', []),
                'reasoning': decision['reasoning']
            }

        except ConversationDecisionError:
            raise
        except Exception as e:
            raise ConversationDecisionError(f"Error generating moderator response: {str(e)}") from e

    @staticmethod
    async def detect_persona_interactions(focus_group_id: str, temperature: float = 0.7) -> Dict[str, Any]:
        """
        Use LLM to detect when personas should interact directly.

        Args:
            focus_group_id: The focus group ID
            temperature: LLM temperature for detection

        Returns:
            Dictionary containing persona interaction details;
            'interaction_needed' is True only when the decision's action is
            'participant_interaction'

        Raises:
            ConversationDecisionError: If the decision cannot be obtained
        """
        try:
            decision = await ConversationDecisionService.decide_next_action(focus_group_id, temperature)

            if decision['action'] != 'participant_interaction':
                return {
                    'interaction_needed': False,
                    'alternative_action': decision['action'],
                    'decision': decision
                }

            details = decision['details']
            return {
                'interaction_needed': True,
                'participant_ids': details['participant_ids'],
                'interaction_type': details['interaction_type'],
                'moderator_prompt': details['moderator_prompt'],
                'reasoning': decision['reasoning']
            }

        except ConversationDecisionError:
            raise
        except Exception as e:
            raise ConversationDecisionError(f"Error detecting persona interactions: {str(e)}") from e

    @staticmethod
    async def should_end_session(focus_group_id: str, temperature: float = 0.7) -> Dict[str, Any]:
        """
        Use LLM to determine if the session should end.

        Args:
            focus_group_id: The focus group ID
            temperature: LLM temperature for decision

        Returns:
            Dictionary containing session ending decision; 'should_end' is
            True only when the decision's action is 'end_session', otherwise
            the chosen action is reported under 'continue_action'

        Raises:
            ConversationDecisionError: If the decision cannot be obtained
        """
        try:
            decision = await ConversationDecisionService.decide_next_action(focus_group_id, temperature)

            if decision['action'] != 'end_session':
                return {
                    'should_end': False,
                    'continue_action': decision['action'],
                    'decision': decision
                }

            details = decision['details']
            return {
                'should_end': True,
                'completion_reason': details['completion_reason'],
                'closing_message': details['closing_message'],
                'reasoning': decision['reasoning']
            }

        except ConversationDecisionError:
            raise
        except Exception as e:
            raise ConversationDecisionError(f"Error determining session end: {str(e)}") from e

    @staticmethod
    async def get_conversation_insights(focus_group_id: str, temperature: float = 0.7) -> Dict[str, Any]:
        """
        Use LLM to generate insights about the current conversation state.

        Unlike ``decide_next_action``, the parsed response is returned without
        structural validation and the LLM call has no timeout.

        Args:
            focus_group_id: The focus group ID
            temperature: LLM temperature for analysis

        Returns:
            Dictionary containing conversation insights, as parsed from the
            LLM's JSON response

        Raises:
            ConversationDecisionError: If any step fails
        """
        try:
            # Get conversation context
            context = await ConversationContextService.get_full_context(focus_group_id)

            # Create a specialized prompt for insights
            insight_prompt = f"""
            Analyze the current focus group conversation and provide insights:

            {ConversationContextService.format_context_for_llm(context)}

            Please provide insights in the following JSON format:
            {{
                "participation_balance": "balanced" | "unbalanced" | "needs_attention",
                "conversation_energy": "high" | "medium" | "low",
                "topic_engagement": "high" | "medium" | "low",
                "sentiment_trend": "positive" | "neutral" | "negative",
                "key_themes": ["theme1", "theme2", "theme3"],
                "recommendations": ["rec1", "rec2", "rec3"],
                "next_suggested_action": "specific recommendation for next step"
            }}
            """

            # Get LLM model for this focus group
            llm_model = await ConversationDecisionService._get_llm_model(focus_group_id)

            response = await LLMService.generate_content(
                prompt=insight_prompt,
                temperature=temperature,
                model_name=llm_model
            )

            return LLMService.parse_json_response(response)

        except Exception as e:
            raise ConversationDecisionError(f"Error generating conversation insights: {str(e)}") from e