"""
Conversation Context Service

Aggregates and formats all context needed for LLM-based conversation decisions.
Also handles multimodal conversation context building with visual assets.
"""

from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta, timezone
import json
import os
from collections import defaultdict, Counter

from app.models.focus_group import FocusGroup
from app.models.persona import Persona


# Punctuation stripped from both ends of a token before keyword matching, so
# that "great!" or "agree." are recognised. Shared by the sentiment and topic
# heuristics so both tokenize the same way.
_TOKEN_STRIP_CHARS = '.,!?;:"()[]{}'

# Keyword sets for the naive sentiment heuristic (frozensets: O(1) membership,
# built once at import time instead of on every call).
_POSITIVE_WORDS = frozenset({
    'good', 'great', 'excellent', 'love', 'like', 'amazing', 'wonderful',
    'fantastic', 'agree', 'yes', 'definitely', 'absolutely',
})
_NEGATIVE_WORDS = frozenset({
    'bad', 'terrible', 'hate', 'dislike', 'awful', 'horrible', 'disagree',
    'no', 'never', 'wrong', 'problem', 'issue',
})
_AGREEMENT_WORDS = frozenset({
    'agree', 'yes', 'exactly', 'definitely', 'absolutely', 'same', 'similar',
    'too', 'also',
})
_DISAGREEMENT_WORDS = frozenset({
    'disagree', 'no', 'but', 'however', 'different', 'opposite', 'wrong', 'not',
})

# Common words ignored by the emerging-topic word-frequency heuristic.
_STOP_WORDS = frozenset({
    'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of',
    'with', 'by', 'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have',
    'has', 'had', 'do', 'does', 'did', 'will', 'would', 'could', 'should',
    'may', 'might', 'can', 'this', 'that', 'these', 'those', 'i', 'you', 'he',
    'she', 'it', 'we', 'they', 'me', 'him', 'her', 'us', 'them', 'my', 'your',
    'his', 'its', 'our', 'their',
})


class ConversationContextService:
    """Service for aggregating conversation context for LLM decision making."""

    @staticmethod
    async def get_full_context(focus_group_id: str) -> Dict[str, Any]:
        """
        Get complete conversation context for LLM decision making.

        Args:
            focus_group_id: The focus group ID

        Returns:
            Dictionary with the keys 'focus_group_topic', 'focus_group_duration',
            'current_time' (elapsed minutes since 'created_at', rounded to one
            decimal), 'discussion_guide_context', 'current_section',
            'participants_context', 'conversation_history' and
            'conversation_analytics'.

        Raises:
            Exception: On any failure (including an unknown focus group). The
                original error is attached as ``__cause__``.
        """
        try:
            # Get focus group data
            focus_group = await FocusGroup.find_by_id(focus_group_id)
            if not focus_group:
                raise ValueError(f"Focus group {focus_group_id} not found")

            # Get all participants
            participants = await ConversationContextService._get_participants_context(focus_group)

            # Get conversation history
            messages = await FocusGroup.get_messages(focus_group_id)
            conversation_history = ConversationContextService._format_conversation_history(messages)

            # Get conversation analytics
            analytics = ConversationContextService._analyze_conversation(messages, participants)

            # Get discussion guide context
            discussion_guide_context = ConversationContextService._get_discussion_guide_context(focus_group)

            # Calculate elapsed time
            created_at = focus_group.get('created_at')
            if created_at:
                # MongoDB may return naive datetimes — make timezone-aware before subtracting
                if created_at.tzinfo is None:
                    created_at = created_at.replace(tzinfo=timezone.utc)
                elapsed_minutes = (datetime.now(timezone.utc) - created_at).total_seconds() / 60
            else:
                elapsed_minutes = 0

            return {
                'focus_group_topic': focus_group.get('topic', 'Unknown'),
                'focus_group_duration': focus_group.get('duration', 60),
                'current_time': round(elapsed_minutes, 1),
                'discussion_guide_context': discussion_guide_context,
                'current_section': ConversationContextService._get_current_section(focus_group),
                'participants_context': participants,
                'conversation_history': conversation_history,
                'conversation_analytics': analytics
            }
        except Exception as e:
            # Same exception type and message as before; `from e` keeps the
            # original traceback reachable for debugging.
            raise Exception(f"Error getting conversation context: {str(e)}") from e

    @staticmethod
    async def _get_participants_context(focus_group: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Get formatted participant context with demographics and OCEAN traits.

        Each ID in ``focus_group['participants']`` is looked up through
        ``Persona.find_by_id``; IDs with no persona are skipped.

        Args:
            focus_group: The focus group record

        Returns:
            One context dictionary per persona found, in participant order.
        """
        participants = []
        for participant_id in focus_group.get('participants', []):
            persona = await Persona.find_by_id(participant_id)
            if not persona:
                continue

            participants.append({
                'id': participant_id,
                'name': persona.get('name', 'Unknown'),
                'demographics': {
                    'age': persona.get('age', 'Unknown'),
                    'gender': persona.get('gender', 'Unknown'),
                    'occupation': persona.get('occupation', 'Unknown'),
                    'location': persona.get('location', 'Unknown'),
                    'education': persona.get('education', 'Unknown')
                },
                'personality': {
                    'description': persona.get('personality', 'No description'),
                    'ocean_traits': persona.get('oceanTraits', {}),
                    'goals': persona.get('goals', []),
                    'frustrations': persona.get('frustrations', []),
                    'motivations': persona.get('motivations', [])
                },
                'interests': persona.get('interests', ''),
                'background': {
                    'think_feel_do': persona.get('thinkFeelDo', {}),
                    'scenarios': persona.get('scenarios', [])
                }
            })

        return participants

    @staticmethod
    def _message_text(msg: Dict[str, Any]) -> str:
        """Return the message's 'text', treating a missing or None value as ''."""
        return msg.get('text') or ''

    @staticmethod
    def _format_conversation_history(messages: List[Dict[str, Any]], limit: int = 20) -> List[Dict[str, Any]]:
        """
        Format recent conversation history for LLM consumption.

        Args:
            messages: Messages in chronological order
            limit: Maximum number of trailing messages to keep

        Returns:
            The last ``limit`` messages, each normalised to the keys 'id',
            'sender_id', 'sender_type', 'message_type', 'content', 'timestamp'
            and 'highlighted'.
        """
        if not messages:
            return []

        # Get recent messages
        recent_messages = messages[-limit:] if len(messages) > limit else messages

        return [
            {
                'id': msg.get('_id', msg.get('id', 'unknown')),
                'sender_id': msg.get('senderId', 'unknown'),
                'sender_type': 'moderator' if msg.get('senderId') == 'moderator' else 'participant',
                'message_type': msg.get('type', 'response'),
                'content': msg.get('text', ''),
                'timestamp': msg.get('created_at', ''),
                'highlighted': msg.get('highlighted', False)
            }
            for msg in recent_messages
        ]

    @staticmethod
    def _analyze_conversation(messages: List[Dict[str, Any]], participants: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Analyze conversation for participation patterns and sentiment.

        Args:
            messages: Messages in chronological order
            participants: Participant contexts (currently not used by the analysis)

        Returns:
            Dictionary with 'total_messages', per-sender 'participant_stats'
            (moderator excluded), 'recent_activity' over the last 10 messages,
            'sentiment_analysis' and 'topic_emergence'.
        """
        if not messages:
            return {
                'total_messages': 0,
                'participant_stats': {},
                'recent_activity': {},
                'sentiment_analysis': {},
                'topic_emergence': []
            }

        # Participation statistics
        participant_stats = defaultdict(lambda: {
            'total_messages': 0,
            'recent_messages': 0,
            'last_message_index': -1,
            'avg_message_length': 0,
            'participation_percentage': 0
        })

        # Count messages by participant
        total_participant_messages = 0
        message_lengths = defaultdict(list)
        recent_start = len(messages) - 10  # index of the first "recent" message

        for i, msg in enumerate(messages):
            sender_id = msg.get('senderId')
            if sender_id == 'moderator':
                continue

            stats = participant_stats[sender_id]
            stats['total_messages'] += 1
            stats['last_message_index'] = i
            # _message_text guards against a stored text of None.
            message_lengths[sender_id].append(len(ConversationContextService._message_text(msg)))
            total_participant_messages += 1

            # Count recent messages (last 10)
            if i >= recent_start:
                stats['recent_messages'] += 1

        # Calculate averages and percentages
        for participant_id, stats in participant_stats.items():
            lengths = message_lengths[participant_id]
            if lengths:
                stats['avg_message_length'] = sum(lengths) / len(lengths)
            if total_participant_messages > 0:
                stats['participation_percentage'] = (stats['total_messages'] / total_participant_messages) * 100

        # Recent activity analysis (messages is non-empty here, so is the slice)
        recent_messages = messages[-10:]
        last_message = recent_messages[-1]
        recent_participant_senders = [
            m.get('senderId') for m in recent_messages if m.get('senderId') != 'moderator'
        ]
        recent_activity = {
            'last_speaker': last_message.get('senderId', 'unknown'),
            'last_message_type': last_message.get('type', 'unknown'),
            'messages_in_last_10': len(recent_participant_senders),
            'unique_speakers_in_last_10': len(set(recent_participant_senders))
        }

        # Basic sentiment analysis (simple keyword-based for now)
        sentiment_analysis = ConversationContextService._analyze_sentiment(recent_messages)

        # Topic emergence detection
        topic_emergence = ConversationContextService._detect_emerging_topics(recent_messages)

        return {
            'total_messages': len(messages),
            'participant_stats': dict(participant_stats),
            'recent_activity': recent_activity,
            'sentiment_analysis': sentiment_analysis,
            'topic_emergence': topic_emergence
        }

    @staticmethod
    def _analyze_sentiment(messages: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Basic keyword-based sentiment analysis of recent messages.

        Tokens are lower-cased, whitespace-split and stripped of surrounding
        punctuation before being matched against the keyword sets, so
        "Great!" counts as a positive word.

        Args:
            messages: Messages to analyse

        Returns:
            Dictionary with 'overall_sentiment' ('positive' / 'negative' /
            'neutral'), 'agreement_level' and 'energy_level' (both 0..1) and,
            for non-empty input, 'sentiment_variance'.
        """
        if not messages:
            return {'overall_sentiment': 'neutral', 'agreement_level': 0.5, 'energy_level': 0.5}

        positive_count = 0
        negative_count = 0
        agreement_count = 0
        disagreement_count = 0
        total_words = 0

        for msg in messages:
            tokens = ConversationContextService._message_text(msg).lower().split()
            total_words += len(tokens)

            for token in tokens:
                word = token.strip(_TOKEN_STRIP_CHARS)

                if word in _POSITIVE_WORDS:
                    positive_count += 1
                elif word in _NEGATIVE_WORDS:
                    negative_count += 1

                if word in _AGREEMENT_WORDS:
                    agreement_count += 1
                elif word in _DISAGREEMENT_WORDS:
                    disagreement_count += 1

        # Calculate sentiment scores
        if total_words > 0:
            positive_ratio = positive_count / total_words
            negative_ratio = negative_count / total_words
            agreement_ratio = agreement_count / total_words
            disagreement_ratio = disagreement_count / total_words
        else:
            positive_ratio = negative_ratio = agreement_ratio = disagreement_ratio = 0

        # Determine overall sentiment: one side must outweigh the other by 50%
        if positive_ratio > negative_ratio * 1.5:
            overall_sentiment = 'positive'
        elif negative_ratio > positive_ratio * 1.5:
            overall_sentiment = 'negative'
        else:
            overall_sentiment = 'neutral'

        # Calculate agreement level
        agreement_total = agreement_ratio + disagreement_ratio
        agreement_level = agreement_ratio / agreement_total if agreement_total > 0 else 0.5

        return {
            'overall_sentiment': overall_sentiment,
            'agreement_level': agreement_level,
            'energy_level': min(1.0, (positive_ratio + negative_ratio) * 10),  # Rough energy estimate
            'sentiment_variance': abs(positive_ratio - negative_ratio)
        }

    @staticmethod
    def _detect_emerging_topics(messages: List[Dict[str, Any]]) -> List[str]:
        """
        Detect emerging topics from recent conversation via word frequency.

        Args:
            messages: Messages to scan

        Returns:
            Up to 10 words (longer than 3 characters, not stop words) that
            occur more than once, most frequent first.
        """
        if not messages:
            return []

        word_freq = Counter()
        for msg in messages:
            text = ConversationContextService._message_text(msg).lower()
            for token in text.split():
                word = token.strip(_TOKEN_STRIP_CHARS)
                if len(word) > 3 and word not in _STOP_WORDS:
                    word_freq[word] += 1

        # Get most frequent words as potential topics
        return [word for word, count in word_freq.most_common(10) if count > 1]

    @staticmethod
    def _format_probes(probes: Any) -> Optional[str]:
        """Join a non-empty list of str/dict/other probes with '; '; None otherwise."""
        if not isinstance(probes, list) or not probes:
            return None

        probe_strings = []
        for probe in probes:
            if isinstance(probe, str):
                probe_strings.append(probe)
            elif isinstance(probe, dict):
                # Extract content from probe dict; str() keeps the join safe
                # when the stored content is not a string.
                probe_strings.append(str(probe.get('content', probe)))
            else:
                probe_strings.append(str(probe))
        return '; '.join(probe_strings)

    @staticmethod
    def _format_guide_items(items: List[Dict[str, Any]], bullet: str, default_id_prefix: str) -> List[str]:
        """Render activities/questions as '[ID: …] content (Time: …)' lines plus probes."""
        lines = []
        for index, item in enumerate(items):
            item_id = item.get('id', f'{default_id_prefix}-{index}')
            item_content = item.get('content', 'No content')
            item_time = item.get('time_limit', 'No limit')
            lines.append(f" {bullet} [ID: {item_id}] {item_content} (Time: {item_time} min)\n")

            # Add probes if available
            probes_text = ConversationContextService._format_probes(item.get('probes'))
            if probes_text is not None:
                lines.append(f" Probes: {probes_text}\n")
        return lines

    @staticmethod
    def _get_discussion_guide_context(focus_group: Dict[str, Any]) -> str:
        """
        Get full detailed discussion guide context for the LLM.

        Args:
            focus_group: The focus group record

        Returns:
            The guide itself when 'discussionGuide' is a string; a rendered
            outline (sections, activities, questions, subsections, probes,
            each with its ID) when it is a dict; otherwise
            "No discussion guide available".
        """
        discussion_guide = focus_group.get('discussionGuide', '')

        if isinstance(discussion_guide, str):
            return discussion_guide
        if not isinstance(discussion_guide, dict):
            return "No discussion guide available"

        # For structured discussion guides, provide complete detail for LLM decision making
        format_items = ConversationContextService._format_guide_items
        title = discussion_guide.get('title', 'Unknown')
        sections = discussion_guide.get('sections') or []

        parts = [
            f"DISCUSSION GUIDE: {title}\n",
            f"Total Duration: {discussion_guide.get('total_duration', 'Unknown')} minutes\n\n",
        ]

        for i, section in enumerate(sections):
            section_id = section.get('id', f'section-{i}')
            section_title = section.get('title', 'Unknown Section')
            section_duration = section.get('duration', 0)
            section_type = section.get('type', 'unknown')

            parts.append(f"SECTION {i+1}: {section_title} [ID: {section_id}]\n")
            parts.append(f"- Type: {section_type}\n")
            parts.append(f"- Duration: {section_duration} minutes\n")

            # Add section content if available
            if section.get('content'):
                parts.append(f"- Overview: {section['content']}\n")

            # Add detailed activities
            activities = section.get('activities', [])
            if activities:
                parts.append(f"- Activities ({len(activities)}):\n")
                parts.extend(format_items(activities, '•', 'activity'))

            # Add detailed questions
            questions = section.get('questions', [])
            if questions:
                parts.append(f"- Questions ({len(questions)}):\n")
                parts.extend(format_items(questions, '•', 'question'))

            # Add subsections if available
            subsections = section.get('subsections', [])
            if subsections:
                parts.append(f"- Subsections ({len(subsections)}):\n")
                for subsection in subsections:
                    subsection_id = subsection.get('id', 'unknown')
                    subsection_title = subsection.get('title', 'Unknown')
                    subsection_duration = subsection.get('duration', 0)
                    parts.append(f" • {subsection_title} [ID: {subsection_id}] ({subsection_duration} min)\n")

                    # Add subsection questions with prominent IDs
                    subsection_questions = subsection.get('questions', [])
                    if subsection_questions:
                        parts.append(f" Questions ({len(subsection_questions)}):\n")
                        parts.extend(format_items(subsection_questions, '-', 'sub-question'))

            parts.append("\n")

        return "".join(parts)

    @staticmethod
    def _get_current_section(focus_group: Dict[str, Any]) -> str:
        """
        Get current section information.

        Args:
            focus_group: The focus group record

        Returns:
            "<title> (<type>)" for the section at
            ``moderator_position['section_index']`` of a structured guide, or
            "Discussion in progress" when there is no structured guide or the
            index does not address a section.
        """
        moderator_position = focus_group.get('moderator_position') or {}
        discussion_guide = focus_group.get('discussionGuide', {})

        if isinstance(discussion_guide, dict) and 'sections' in discussion_guide:
            sections = discussion_guide['sections'] or []
            section_index = moderator_position.get('section_index', 0)
            # Reject negative indices explicitly: Python would otherwise wrap
            # around and report the last section.
            if isinstance(section_index, int) and 0 <= section_index < len(sections):
                current_section = sections[section_index]
                return f"{current_section.get('title', 'Unknown')} ({current_section.get('type', 'unknown')})"

        return "Discussion in progress"

    @staticmethod
    def format_context_for_llm(context: Dict[str, Any]) -> Dict[str, Any]:
        """
        Format the context dictionary for LLM consumption.

        Args:
            context: A context dictionary as produced by ``get_full_context``

        Returns:
            Dictionary with the same top-level keys as ``context`` in which
            'participants_context', 'conversation_history' and
            'conversation_analytics' have been rendered to markdown-style text.
        """
        # Format participants context
        participant_parts = []
        for participant in context.get('participants_context', []):
            demographics = participant['demographics']
            personality = participant.get('personality', {})
            ocean_traits = personality.get('ocean_traits', {})

            participant_parts.append(f"\n**{participant['name']}** (ID: {participant['id']})\n")
            participant_parts.append(
                f"- Demographics: {demographics['age']}, {demographics['gender']}, {demographics['occupation']}\n"
            )
            participant_parts.append(f"- Location: {demographics['location']}\n")
            participant_parts.append(f"- Personality: {participant['personality']['description']}\n")

            if ocean_traits:
                traits = [
                    f"{trait.capitalize()}: {value}/100"
                    for trait, value in ocean_traits.items()
                    if isinstance(value, (int, float))
                ]
                participant_parts.append("- OCEAN Traits: " + ", ".join(traits) + "\n")

            goals = personality.get('goals', [])
            if goals:
                # A single goal stored as a bare string must not be sliced
                # into characters; non-string entries must not break the join.
                if isinstance(goals, str):
                    goals = [goals]
                participant_parts.append(f"- Goals: {', '.join(str(goal) for goal in goals[:3])}\n")

            interests = participant.get('interests', '')
            if interests:
                participant_parts.append(f"- Interests: {interests}\n")

            participant_parts.append("\n")
        participants_text = "".join(participant_parts)

        # Format conversation history
        history_parts = []
        for msg in context.get('conversation_history', []):
            if msg['sender_type'] == 'moderator':
                sender_name = msg['sender_id']
            else:
                sender_name = f"Participant {msg['sender_id']}"
            history_parts.append(f"**{sender_name}**: {msg['content']}\n")
        history_text = "".join(history_parts)

        # Format analytics (one markdown item per line, with a leading and a
        # trailing newline)
        analytics = context.get('conversation_analytics', {})
        recent_activity = analytics.get('recent_activity', {})
        sentiment = analytics.get('sentiment_analysis', {})
        analytics_text = "\n".join([
            "",
            "**Participation Statistics:**",
            f"- Total messages: {analytics.get('total_messages', 0)}",
            f"- Recent activity: {recent_activity.get('messages_in_last_10', 0)} messages in last 10",
            f"- Last speaker: {recent_activity.get('last_speaker', 'unknown')}",
            "",
            "**Sentiment Analysis:**",
            f"- Overall sentiment: {sentiment.get('overall_sentiment', 'neutral')}",
            f"- Agreement level: {sentiment.get('agreement_level', 0.5):.1%}",
            f"- Energy level: {sentiment.get('energy_level', 0.5):.1%}",
            "",
            "**Emerging Topics:**",
            ', '.join(analytics.get('topic_emergence', [])[:5]),
            "",
        ])

        return {
            'focus_group_topic': context.get('focus_group_topic', 'Unknown'),
            'focus_group_duration': context.get('focus_group_duration', 60),
            'current_time': context.get('current_time', 0),
            'discussion_guide_context': context.get('discussion_guide_context', 'No guide available'),
            'current_section': context.get('current_section', 'Unknown'),
            'participants_context': participants_text,
            'conversation_history': history_text,
            'conversation_analytics': analytics_text
        }

    # ================== MULTIMODAL CONVERSATION CONTEXT METHODS ==================

    @staticmethod
    async def build_multimodal_context(focus_group_id: str, messages: Optional[List[Dict[str, Any]]] = None) -> Dict[str, Any]:
        """
        Build complete multimodal conversation context including text and images in proper sequence.

        Args:
            focus_group_id: The focus group ID
            messages: Optional list of messages (if not provided, will fetch from database)

        Returns:
            Dictionary containing structured conversation context for LLM consumption

        Raises:
            Exception: On any failure; the original error is attached as ``__cause__``.
        """
        try:
            print(f"🎯 Building multimodal context for focus group {focus_group_id}")

            # Get messages with visual context if not provided
            if messages is None:
                messages = await FocusGroup.get_messages_with_visual_context(focus_group_id)

            # Get active visual context
            active_visual_context = await FocusGroup.get_active_visual_context(focus_group_id)

            print(f" - Total messages: {len(messages)}")
            print(f" - Active visual assets: {len(active_visual_context)}")

            # Build visual timeline
            visual_timeline = ConversationContextService._extract_visual_timeline(active_visual_context)

            # Build conversation context with images interspersed
            conversation_context = ConversationContextService._prepare_llm_context(
                messages, visual_timeline, focus_group_id
            )

            # Build text-only context for backwards compatibility
            text_context = ConversationContextService._format_text_context_simple(messages)

            result = {
                "has_visual_context": len(active_visual_context) > 0,
                "conversation_context": conversation_context,
                "text_context": text_context,  # For backwards compatibility
                "visual_timeline": visual_timeline,
                "total_messages": len(messages),
                "total_visual_assets": len(active_visual_context)
            }

            print(f"✅ Built multimodal context: {len(conversation_context)} context items")
            return result

        except Exception as e:
            print(f"❌ Error building multimodal context: {e}")
            raise Exception(f"Error building conversation context: {str(e)}") from e

    @staticmethod
    def _extract_visual_timeline(active_visual_context: List[Dict[str, Any]]) -> Dict[int, List[Dict[str, Any]]]:
        """
        Extract timeline of when each image was introduced in the conversation.

        Args:
            active_visual_context: List of active visual asset records

        Returns:
            Dictionary mapping sequence numbers to lists of assets activated at that point
            (assets without 'activated_at_sequence' are grouped under 0)
        """
        visual_timeline: Dict[int, List[Dict[str, Any]]] = {}

        for asset in active_visual_context:
            sequence = asset.get("activated_at_sequence", 0)
            visual_timeline.setdefault(sequence, []).append(asset)

        print(f"📸 Visual timeline: {len(visual_timeline)} activation points")
        return visual_timeline

    @staticmethod
    def _prepare_llm_context(
        messages: List[Dict[str, Any]],
        visual_timeline: Dict[int, List[Dict[str, Any]]],
        focus_group_id: str
    ) -> List[Dict[str, Any]]:
        """
        Prepare conversation context for LLM consumption with images interspersed in proper sequence.

        Args:
            messages: List of messages in chronological order
            visual_timeline: Timeline of visual asset activations
            focus_group_id: Focus group ID for asset path resolution

        Returns:
            List of context items including both text and image elements
        """
        conversation_context = []

        # Sequence numbers are 1-based positions in `messages`.
        for sequence, message in enumerate(messages, start=1):
            # Add text message to context
            sender = message.get('senderId', 'Unknown')
            text = message.get('text', '')
            msg_type = message.get('type', 'response')

            # Format sender name
            if msg_type == 'question':
                formatted_sender = "MODERATOR"
            elif msg_type == 'system':
                formatted_sender = "SYSTEM"
            else:
                formatted_sender = sender

            conversation_context.append({
                "type": "text",
                "content": f"{formatted_sender}: {text}",
                "sequence": sequence,
                "message_id": message.get("_id"),
                "sender_id": sender,
                "message_type": msg_type
            })

            # Add any visual assets that were activated at this sequence point
            for asset in visual_timeline.get(sequence, []):
                asset_path = ConversationContextService._resolve_asset_path(
                    focus_group_id, asset["filename"]
                )
                display_reference = asset.get("display_reference", asset["filename"])

                conversation_context.append({
                    "type": "image",
                    "path": asset_path,
                    "filename": asset["filename"],
                    "display_reference": display_reference,
                    "sequence": sequence,
                    "activated_at_message_id": asset.get("activated_at_message_id"),
                    "activation_timestamp": asset.get("activation_timestamp")
                })

                print(f"🖼️ Added image to context: {asset['filename']} ({display_reference}) at sequence {sequence}")

        return conversation_context

    @staticmethod
    def _resolve_asset_path(focus_group_id: str, filename: str) -> str:
        """
        Resolve the full path to a creative asset file.

        Args:
            focus_group_id: The focus group ID
            filename: The asset filename

        Returns:
            ``<three directories above this file>/uploads/focus-group-<id>/<filename>``
        """
        # NOTE(review): `filename` is joined as-is; if it can ever come from
        # untrusted input, confirm it is sanitised upstream (an absolute path
        # or '..' segments would escape the uploads directory).
        base_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))  # Go up to backend/
        return os.path.join(base_dir, 'uploads', f'focus-group-{focus_group_id}', filename)

    @staticmethod
    def _format_text_context_simple(messages: List[Dict[str, Any]]) -> str:
        """
        Format messages as text-only context for backwards compatibility.

        Args:
            messages: List of messages

        Returns:
            Formatted text context string (last 50 messages, one per line)
        """
        if not messages:
            return "No previous messages."

        formatted = []
        # Limit to the most recent messages for context
        for msg in messages[-50:]:
            sender = msg.get('senderId', 'Unknown')
            text = msg.get('text', '')
            msg_type = msg.get('type', 'response')

            # Format differently based on message type
            if msg_type == 'question':
                formatted.append(f"MODERATOR ({sender}): {text}")
            elif msg_type == 'system':
                formatted.append(f"SYSTEM: {text}")
            else:
                formatted.append(f"{sender}: {text}")

        return "\n".join(formatted)

    @staticmethod
    async def get_current_visual_assets(focus_group_id: str) -> List[str]:
        """
        Get list of asset paths that are currently active in conversation context.

        Args:
            focus_group_id: The focus group ID

        Returns:
            List of full paths to currently active visual assets; an empty
            list if the lookup fails (the error is printed, not raised)
        """
        try:
            active_context = await FocusGroup.get_active_visual_context(focus_group_id)

            asset_paths = [
                ConversationContextService._resolve_asset_path(focus_group_id, asset["filename"])
                for asset in active_context
            ]

            print(f"🎨 Current visual assets for {focus_group_id}: {len(asset_paths)} files")
            return asset_paths

        except Exception as e:
            # Deliberate best-effort: callers get "no assets" rather than an error.
            print(f"❌ Error getting current visual assets: {e}")
            return []

    @staticmethod
    async def has_visual_context(focus_group_id: str) -> bool:
        """
        Check if a focus group currently has any active visual context.

        Args:
            focus_group_id: The focus group ID

        Returns:
            True if there are active visual assets, False otherwise (including
            when the lookup fails; the error is printed, not raised)
        """
        try:
            active_context = await FocusGroup.get_active_visual_context(focus_group_id)
            return len(active_context) > 0
        except Exception as e:
            print(f"❌ Error checking visual context: {e}")
            return False