""" Key Theme Generation Service This service provides functions for generating key themes from focus group discussions. """ import asyncio import logging import re from typing import Dict, Any, List, Optional from app.services.llm_service import LLMService, LLMServiceError from app.utils.prompt_loader import load_prompt, PromptLoaderError from app.models.focus_group import FocusGroup from app.models.persona import Persona class KeyThemeServiceError(Exception): """Exception raised for errors in key theme generation.""" pass class KeyThemeService: """Service for generating key themes from focus group discussions.""" @staticmethod async def generate_key_themes( focus_group_id: str, temperature: float = 0.7, llm_model: Optional[str] = None ) -> List[Dict[str, str]]: """ Generate key themes from a focus group discussion. Args: focus_group_id: The ID of the focus group temperature: Controls randomness in generation (default: 0.7) llm_model: Optional LLM model to use for generation Returns: A list of key theme objects with title and description fields Raises: KeyThemeServiceError: If there's an issue with the generation process """ logger = logging.getLogger(__name__) from app.services.llm_usage_context import set_llm_context set_llm_context(feature="key_themes", focus_group_id=focus_group_id) logger.info(f"Starting key theme generation for focus group {focus_group_id} with temperature {temperature}") logger.info(f"Using LLM model: {llm_model or 'default'}") try: # Get the focus group focus_group = await FocusGroup.find_by_id(focus_group_id) if not focus_group: raise KeyThemeServiceError(f"Focus group not found with ID: {focus_group_id}") # Get all messages from the focus group messages = await FocusGroup.get_messages(focus_group_id) if not messages: raise KeyThemeServiceError("No messages found in this focus group") logger.info(f"Found {len(messages)} messages in focus group {focus_group_id}") # Get all participants (personas) in the focus group participants_data = [] if 'participants' in focus_group and focus_group['participants']: for persona_id in focus_group['participants']: try: persona = await Persona.find_by_id(persona_id) if persona: participants_data.append(persona) except Exception as e: print(f"Error fetching participant {persona_id}: {e}") # Generate key themes using LLM return await KeyThemeService._extract_themes_from_discussion( messages=messages, participants=participants_data, discussion_guide=focus_group.get('discussionGuide', ''), temperature=temperature, llm_model=llm_model ) except Exception as e: raise KeyThemeServiceError(f"Error generating key themes: {str(e)}") @staticmethod async def _extract_themes_from_discussion( messages: List[Dict[str, Any]], participants: List[Dict[str, Any]], discussion_guide: str, temperature: float = 0.7, llm_model: Optional[str] = None ) -> List[Dict[str, str]]: """ Extract key themes from a discussion using LLM. Args: messages: List of discussion messages participants: List of participant personas discussion_guide: The discussion guide for the focus group temperature: Controls randomness in generation llm_model: Optional LLM model to use for generation Returns: A list of key theme objects with title and description Raises: KeyThemeServiceError: If there's an issue with the LLM processing """ logger = logging.getLogger(__name__) logger.info(f"Beginning theme extraction from {len(messages)} messages") logger.info(f"Theme extraction using LLM model: {llm_model or 'default (gemini-3-pro-preview)'}") try: # Load and prepare the prompt for the LLM try: prompt = KeyThemeService._build_theme_extraction_prompt( messages=messages, participants=participants, discussion_guide=discussion_guide ) logger.debug("Successfully loaded and built theme extraction prompt") except PromptLoaderError as e: logger.error(f"Failed to load theme extraction prompt: {str(e)}") raise KeyThemeServiceError(f"Error loading theme extraction prompt: {str(e)}") # Load system prompt try: system_prompt = load_prompt('key-theme-system') logger.debug("Successfully loaded system prompt") except PromptLoaderError as e: logger.error(f"Failed to load system prompt: {str(e)}") raise KeyThemeServiceError(f"Error loading system prompt: {str(e)}") # Call the LLM to generate themes with retry logic max_retries = 3 last_error = None logger.info(f"Starting LLM theme generation with maximum {max_retries} attempts") for attempt in range(max_retries): attempt_num = attempt + 1 logger.info(f"Attempt {attempt_num}/{max_retries}: Calling LLM ({llm_model or 'gemini-3-pro-preview'}) for theme generation") try: themes = await LLMService.generate_structured_array( prompt=prompt, temperature=temperature, system_prompt=system_prompt, model_name=llm_model ) logger.info(f"Attempt {attempt_num}/{max_retries}: LLM ({llm_model or 'gemini-3-pro-preview'}) call successful, received {len(themes)} themes") # Validate the response structure validated_themes = [] for theme in themes: if isinstance(theme, dict) and 'title' in theme and 'description' in theme: validated_theme = { 'title': theme['title'], 'description': theme['description'] } # Add quotes if present if 'quotes' in theme and isinstance(theme['quotes'], list): # Validate and clean quotes format, extracting message IDs validated_quotes = [] for quote in theme['quotes']: if isinstance(quote, str) and quote.strip(): quote_data = KeyThemeService._parse_quote_with_message_id(quote.strip()) # Validate that the quote exists in the original messages if KeyThemeService._validate_quote_exists(quote_data, messages): validated_quotes.append(quote_data) else: logger.warning(f"Quote validation failed for theme '{theme.get('title', 'Unknown')}': {quote[:100]}...") validated_theme['quotes'] = validated_quotes else: validated_theme['quotes'] = [] validated_themes.append(validated_theme) logger.info(f"Theme generation completed successfully with {len(validated_themes)} validated themes using {llm_model or 'gemini-3-pro-preview'}") return validated_themes except LLMServiceError as e: last_error = e error_message = str(e).lower() logger.warning(f"Attempt {attempt_num}/{max_retries}: LLM call failed with error: {str(e)}") # Check if this is a retryable error (Google API internal errors) if "500" in error_message or "internal error" in error_message: if attempt < max_retries - 1: # Wait before retrying (exponential backoff) wait_time = 2 ** attempt # 1s, 2s, 4s logger.info(f"Retryable error detected. Waiting {wait_time} seconds before retry {attempt_num + 1}/{max_retries}") await asyncio.sleep(wait_time) continue else: logger.error(f"Retryable error detected but max retries ({max_retries}) reached") else: logger.error(f"Non-retryable error detected: {str(e)}") # If it's not a retryable error or we've exhausted retries, re-raise raise KeyThemeServiceError(f"LLM error: {str(e)}") # If we've exhausted all retries, raise the last error logger.error(f"All {max_retries} attempts failed. Final error: {str(last_error)}") raise KeyThemeServiceError(f"LLM error after {max_retries} attempts: {str(last_error)}") except Exception as e: raise KeyThemeServiceError(f"Error extracting themes: {str(e)}") @staticmethod def _build_theme_extraction_prompt( messages: List[Dict[str, Any]], participants: List[Dict[str, Any]], discussion_guide: str ) -> str: """ Build the prompt for theme extraction. Args: messages: List of discussion messages participants: List of participant personas discussion_guide: The discussion guide for the focus group Returns: A formatted prompt string for the LLM """ # Format the discussion messages with IDs for quote tracking formatted_messages = [] for msg in messages: sender_id = msg.get('senderId', '') sender_name = "AI Moderator" if sender_id == "moderator" else f"Participant {sender_id}" # Find the participant name if available for participant in participants: participant_id = participant.get('_id', '') or participant.get('id', '') if participant_id == sender_id: sender_name = participant.get('name', sender_name) break text = msg.get('text', '') message_id = msg.get('id', '') or msg.get('_id', '') # Include message ID in the formatted message for quote tracking formatted_messages.append(f"[MSG_ID:{message_id}] {sender_name}: {text}") # Format the participant profiles formatted_profiles = [] for participant in participants: name = participant.get('name', 'Unknown') age = participant.get('age', 'Unknown') occupation = participant.get('occupation', 'Unknown') background = participant.get('background', '') profile = f"Name: {name}\nAge: {age}\nOccupation: {occupation}" if background: profile += f"\nBackground: {background}" formatted_profiles.append(profile) # Join formatted profiles and messages with newlines profiles_text = "\n".join(formatted_profiles) messages_text = "\n".join(formatted_messages) # Load and format the theme extraction prompt try: prompt = load_prompt('key-theme-extraction', { 'discussion_guide': discussion_guide, 'profiles_text': profiles_text, 'messages_text': messages_text }) except PromptLoaderError as e: raise KeyThemeServiceError(f"Error loading theme extraction prompt: {str(e)}") return prompt @staticmethod def _parse_quote_with_message_id(quote: str) -> dict: """ Parse a quote string to extract message ID, speaker, and text. Expected format: "[MSG_ID:message_id] [Speaker Name]: quote text" Args: quote: The quote string to parse Returns: A dictionary with 'message_id', 'speaker', 'text', and 'original' fields """ logger = logging.getLogger(__name__) # Initialize default structure quote_data = { 'message_id': None, 'speaker': None, 'text': quote, 'original': quote } try: # Try to parse format: [MSG_ID:message_id] [Speaker Name]: quote text (legacy with brackets) msg_id_pattern_brackets = r'^\[MSG_ID:([^\]]+)\]\s*\[([^\]]+)\]:\s*(.*)$' match = re.match(msg_id_pattern_brackets, quote) if match: quote_data['message_id'] = match.group(1) quote_data['speaker'] = match.group(2) quote_data['text'] = match.group(3) logger.debug(f"Successfully parsed quote with message ID (bracketed format): {quote_data['message_id']}") return quote_data # Try to parse format: [MSG_ID:message_id] Speaker Name: quote text (current LLM format) msg_id_pattern = r'^\[MSG_ID:([^\]]+)\]\s*([^:]+):\s*(.*)$' match = re.match(msg_id_pattern, quote) if match: quote_data['message_id'] = match.group(1) quote_data['speaker'] = match.group(2).strip() quote_data['text'] = match.group(3) logger.debug(f"Successfully parsed quote with message ID (standard format): {quote_data['message_id']}") return quote_data # Fallback: Try legacy format [Speaker Name]: quote text legacy_pattern = r'^\[([^\]]+)\]:\s*(.*)$' legacy_match = re.match(legacy_pattern, quote) if legacy_match: quote_data['speaker'] = legacy_match.group(1) quote_data['text'] = legacy_match.group(2) logger.warning(f"Quote using legacy format without message ID: {quote[:50]}...") return quote_data # If no pattern matches, log warning and return as-is logger.warning(f"Quote does not match expected format: {quote[:50]}...") return quote_data except Exception as e: logger.error(f"Error parsing quote '{quote[:50]}...': {str(e)}") return quote_data @staticmethod def _validate_quote_exists(quote_data: dict, messages: List[Dict[str, Any]]) -> bool: """ Validate that a quote actually exists in the original messages. Args: quote_data: The parsed quote data with message_id, speaker, text, etc. messages: List of original discussion messages Returns: True if the quote can be validated, False otherwise """ logger = logging.getLogger(__name__) quote_text = quote_data.get('text', '').strip() message_id = quote_data.get('message_id') if not quote_text: logger.warning("Quote validation failed: empty quote text") return False # Strategy 1: Direct message ID lookup (most reliable) if message_id: target_message = None for msg in messages: msg_id = msg.get('id', '') or msg.get('_id', '') if msg_id == message_id: target_message = msg break if target_message: msg_text = target_message.get('text', '') # Check if quote text exists in the target message if quote_text.lower() in msg_text.lower() or msg_text.lower() in quote_text.lower(): logger.debug(f"Quote validated via message ID {message_id}") return True else: logger.warning(f"Quote text doesn't match message ID {message_id}: quote='{quote_text[:50]}...', msg='{msg_text[:50]}...'") # Fall through to text-based validation else: logger.warning(f"Message ID {message_id} not found in messages") # Fall through to text-based validation # Strategy 2: Text-based validation (fallback) # Normalize text for comparison def normalize_text(text): return text.lower().strip().replace('\n', ' ').replace('\r', '') normalized_quote = normalize_text(quote_text) # Look for the quote text in any message for msg in messages: msg_text = normalize_text(msg.get('text', '')) # Check for exact substring match if normalized_quote in msg_text or msg_text in normalized_quote: logger.debug(f"Quote validated via text matching in message: {msg.get('id', 'unknown')}") return True # Check for high similarity (fuzzy matching) if len(normalized_quote) > 10 and len(msg_text) > 10: # Simple word overlap check quote_words = set(normalized_quote.split()) msg_words = set(msg_text.split()) if len(quote_words) > 0 and len(msg_words) > 0: overlap = len(quote_words.intersection(msg_words)) quote_word_ratio = overlap / len(quote_words) msg_word_ratio = overlap / len(msg_words) # If 70% of quote words are in message, or 70% of message words are in quote if quote_word_ratio >= 0.7 or msg_word_ratio >= 0.7: logger.debug(f"Quote validated via fuzzy matching (overlap: {overlap}/{len(quote_words)} words)") return True logger.warning(f"Quote validation failed: no matching message found for '{quote_text[:50]}...'") return False