"""
Conversation State Manager

Manages conversation state, analytics, and tracking for autonomous focus groups.
"""

from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta, timezone
from collections import Counter, defaultdict
from itertools import groupby
import json
import logging

from app.models.focus_group import FocusGroup

logger = logging.getLogger(__name__)


class ConversationStateManager:
    """Manager for conversation state and analytics of a single focus group.

    Results of the two public read methods are cached in memory for
    ``cache_ttl`` seconds. The state cache and the analytics cache each
    carry their own refresh timestamp so that refreshing one never
    extends the lifetime of the other.
    """

    # senderId value that marks a message as coming from the moderator.
    MODERATOR_ID = 'moderator'

    # Status written by start_autonomous_mode(); the state report must
    # recognise it as autonomous alongside any 'autonomous*' status.
    AUTONOMOUS_STATUS = 'ai_mode'

    # Characters stripped from both ends of a whitespace-separated token.
    _PUNCTUATION = '.,!?;:"()[]{}'

    _POSITIVE_WORDS = frozenset([
        'good', 'great', 'excellent', 'love', 'like', 'amazing',
        'wonderful', 'agree', 'yes',
    ])
    _NEGATIVE_WORDS = frozenset([
        'bad', 'terrible', 'hate', 'dislike', 'awful', 'horrible',
        'disagree', 'no', 'problem',
    ])
    _STOP_WORDS = frozenset([
        'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to',
        'for', 'of', 'with', 'by', 'is', 'are', 'was', 'were', 'i',
        'you', 'it', 'we', 'they', 'this', 'that',
    ])

    def __init__(self, focus_group_id: str):
        self.focus_group_id = focus_group_id
        self.state_cache = {}
        self.analytics_cache = {}
        self.cache_ttl = 60  # seconds
        # Time of the most recent refresh of either cache. Kept for
        # callers that inspect it; validity checks use the per-cache
        # timestamps below.
        self.last_cache_update = None
        self._state_cached_at = None
        self._analytics_cached_at = None

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def get_conversation_state(self) -> Dict[str, Any]:
        """
        Get the current conversation state.

        Returns:
            Dictionary containing conversation state information, or a
            dictionary with a single "error" key when the focus group is
            missing or any step raises.
        """
        try:
            # Check cache first
            if self._is_cache_valid():
                return self.state_cache

            focus_group = await FocusGroup.find_by_id(self.focus_group_id)
            if not focus_group:
                return {"error": "Focus group not found"}

            messages = await FocusGroup.get_messages(self.focus_group_id) or []
            participants = focus_group.get('participants', [])

            state = {
                "focus_group_id": self.focus_group_id,
                "status": focus_group.get('status', 'unknown'),
                "total_messages": len(messages),
                "participants": participants,
                "created_at": focus_group.get('created_at'),
                "updated_at": focus_group.get('updated_at'),
                "conversation_flow": self._analyze_conversation_flow(messages),
                "current_topic": self._get_current_topic(messages),
                "participation_stats": self._calculate_participation_stats(messages, participants),
                "conversation_health": self._assess_conversation_health(messages),
                "moderator_position": focus_group.get('moderator_position', {}),
                "autonomous_state": {
                    "is_autonomous": self._is_autonomous_status(focus_group.get('status')),
                    "started_at": focus_group.get('autonomous_started_at'),
                    "paused_at": focus_group.get('autonomous_paused_at'),
                    "resumed_at": focus_group.get('autonomous_resumed_at'),
                    "ended_at": focus_group.get('autonomous_ended_at'),
                },
            }

            # Update cache
            now = datetime.now(timezone.utc)
            self.state_cache = state
            self._state_cached_at = now
            self.last_cache_update = now

            return state

        except Exception as e:
            # Boundary handler: callers expect an error dict, not an
            # exception, but the traceback must not be lost.
            logger.exception(
                "Error getting conversation state for focus group %s",
                self.focus_group_id,
            )
            return {"error": f"Error getting conversation state: {e}"}

    async def get_conversation_analytics(self) -> Dict[str, Any]:
        """
        Get detailed conversation analytics.

        Returns:
            Dictionary containing conversation analytics, or a dictionary
            with a single "error" key when the focus group is missing or
            any step raises.
        """
        try:
            # Check cache first
            if self._is_analytics_cache_valid():
                return self.analytics_cache

            focus_group = await FocusGroup.find_by_id(self.focus_group_id)
            if not focus_group:
                return {"error": "Focus group not found"}

            messages = await FocusGroup.get_messages(self.focus_group_id) or []
            participants = focus_group.get('participants', [])

            analytics = {
                "focus_group_id": self.focus_group_id,
                "overview": self._generate_overview_analytics(messages, participants),
                "participation": self._generate_participation_analytics(messages, participants),
                "conversation_flow": self._generate_flow_analytics(messages),
                "sentiment_analysis": self._generate_sentiment_analytics(messages),
                "topic_analysis": self._generate_topic_analytics(messages),
                "timing_analysis": self._generate_timing_analytics(messages),
                "quality_metrics": self._generate_quality_metrics(messages),
                "recommendations": self._generate_recommendations(messages, participants),
            }

            # Update cache
            now = datetime.now(timezone.utc)
            self.analytics_cache = analytics
            self._analytics_cached_at = now
            self.last_cache_update = now

            return analytics

        except Exception as e:
            logger.exception(
                "Error getting conversation analytics for focus group %s",
                self.focus_group_id,
            )
            return {"error": f"Error getting conversation analytics: {e}"}

    async def update_conversation_state(self, updates: Dict[str, Any]) -> Dict[str, Any]:
        """
        Update conversation state.

        Args:
            updates: Dictionary of updates to apply

        Returns:
            Dictionary containing update result
        """
        try:
            # Update focus group
            success = await FocusGroup.update(self.focus_group_id, updates)
            if not success:
                return {"error": "Failed to update conversation state"}

            # Clear cache to force refresh
            self._clear_cache()
            return {
                "message": "Conversation state updated",
                "focus_group_id": self.focus_group_id,
                "updates": updates,
            }

        except Exception as e:
            logger.exception(
                "Error updating conversation state for focus group %s",
                self.focus_group_id,
            )
            return {"error": f"Error updating conversation state: {e}"}

    async def start_autonomous_mode(self) -> Dict[str, Any]:
        """Start autonomous conversation mode."""
        return await self.update_conversation_state({
            'status': self.AUTONOMOUS_STATUS,
            'autonomous_started_at': datetime.now(timezone.utc),
        })

    async def end_autonomous_mode(self, reason: str = "completed") -> Dict[str, Any]:
        """End autonomous conversation mode.

        Args:
            reason: Why the mode ended. "completed" marks the focus group
                as completed; any other reason returns it to 'active'.
        """
        status = 'completed' if reason == "completed" else 'active'
        return await self.update_conversation_state({
            'status': status,
            'autonomous_ended_at': datetime.now(timezone.utc),
            'completion_reason': reason,
        })

    # ------------------------------------------------------------------
    # Cache helpers
    # ------------------------------------------------------------------

    def _is_fresh(self, cached_at: Optional[datetime]) -> bool:
        """Return True when ``cached_at`` is set and younger than ``cache_ttl``."""
        if cached_at is None:
            return False
        elapsed = (datetime.now(timezone.utc) - cached_at).total_seconds()
        return elapsed < self.cache_ttl

    def _is_cache_valid(self) -> bool:
        """Check if the state cache is still valid."""
        return bool(self.state_cache) and self._is_fresh(self._state_cached_at)

    def _is_analytics_cache_valid(self) -> bool:
        """Check if the analytics cache is still valid."""
        return bool(self.analytics_cache) and self._is_fresh(self._analytics_cached_at)

    def _clear_cache(self):
        """Clear all caches."""
        self.state_cache = {}
        self.analytics_cache = {}
        self.last_cache_update = None
        self._state_cached_at = None
        self._analytics_cached_at = None

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    @classmethod
    def _is_autonomous_status(cls, status: Any) -> bool:
        """Return True for 'autonomous*' statuses and for AUTONOMOUS_STATUS."""
        if not isinstance(status, str):
            return False
        return status.startswith('autonomous') or status == cls.AUTONOMOUS_STATUS

    @staticmethod
    def _message_text(msg: Dict[str, Any]) -> str:
        """Return the message's 'text' as a string; '' when missing or not a str."""
        text = msg.get('text')
        return text if isinstance(text, str) else ''

    @classmethod
    def _tokenize(cls, text: str) -> List[str]:
        """Lower-case ``text`` and split it into punctuation-stripped, non-empty tokens."""
        stripped = (word.strip(cls._PUNCTUATION) for word in text.lower().split())
        return [word for word in stripped if word]

    def _participant_message_counts(self, messages: List[Dict[str, Any]]) -> Counter:
        """Count messages per senderId, ignoring moderator messages."""
        return Counter(
            msg.get('senderId')
            for msg in messages
            if msg.get('senderId') != self.MODERATOR_ID
        )

    # ------------------------------------------------------------------
    # State building blocks
    # ------------------------------------------------------------------

    def _analyze_conversation_flow(self, messages: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Analyze the flow of conversation.

        The flow type is derived from how many distinct senders appear in
        the last 10 messages: one is a monologue, two a dialogue, more a
        group discussion.
        """
        if not messages:
            return {"status": "no_messages", "flow_type": "none"}

        # Count message types
        message_types = Counter(msg.get('type', 'unknown') for msg in messages)

        recent_senders = [msg.get('senderId', 'unknown') for msg in messages[-10:]]
        unique_recent_senders = len(set(recent_senders))
        total_recent = len(recent_senders)

        if unique_recent_senders == 1:
            flow_type = "monologue"
        elif unique_recent_senders == 2:
            flow_type = "dialogue"
        else:
            flow_type = "group_discussion"

        return {
            # messages is non-empty here, so the conversation is active.
            "status": "active",
            "flow_type": flow_type,
            "message_types": dict(message_types),
            "recent_participation": {
                "unique_speakers": unique_recent_senders,
                "total_messages": total_recent,
                "diversity_ratio": unique_recent_senders / total_recent,
            },
        }

    def _get_current_topic(self, messages: List[Dict[str, Any]]) -> str:
        """Get the current topic of discussion.

        Returns the text of the most recent moderator message of type
        'question', cut to 100 characters (with a trailing "..." only
        when something was actually cut off).
        """
        if not messages:
            return "No discussion yet"

        # Get the most recent moderator question
        for msg in reversed(messages):
            if msg.get('senderId') == self.MODERATOR_ID and msg.get('type') == 'question':
                text = self._message_text(msg)
                if not text:
                    return 'Unknown topic'
                return text[:100] + "..." if len(text) > 100 else text

        return "General discussion"

    def _calculate_participation_stats(self, messages: List[Dict[str, Any]],
                                       participants: List[str]) -> Dict[str, Any]:
        """Calculate participation statistics.

        Args:
            messages: All messages of the conversation.
            participants: IDs of the registered participants.

        Returns:
            Dictionary with participant totals, a balance label and a
            per-participant breakdown (only filled when there are messages).
        """
        stats = {
            "total_participants": len(participants),
            "active_participants": 0,
            "participation_balance": "unknown",
            "participant_details": {},
        }

        if not messages:
            return stats

        # Count messages per participant
        participant_counts = self._participant_message_counts(messages)
        stats["active_participants"] = len(participant_counts)

        # Calculate participation balance
        if participant_counts:
            max_messages = max(participant_counts.values())
            min_messages = min(participant_counts.values())

            if max_messages - min_messages <= 2:
                stats["participation_balance"] = "balanced"
            elif max_messages > min_messages * 2:
                stats["participation_balance"] = "unbalanced"
            else:
                stats["participation_balance"] = "moderately_balanced"

        # Detailed participant stats. The total is computed once instead
        # of once per participant.
        total_participant_messages = sum(participant_counts.values())
        for participant_id in participants:
            count = participant_counts[participant_id]  # Counter: 0 when absent, no insert
            percentage = (
                (count / total_participant_messages) * 100
                if total_participant_messages else 0
            )
            stats["participant_details"][participant_id] = {
                "message_count": count,
                "participation_percentage": percentage,
                "is_active": count > 0,
            }

        return stats

    def _assess_conversation_health(self, messages: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Assess the health of the conversation.

        Four independent checks each contribute 25 points to the score.
        """
        if not messages:
            return {"status": "inactive", "score": 0, "indicators": []}

        health_indicators = []
        health_score = 0

        # Check message frequency (messages carrying a timestamp)
        timestamped = [m for m in messages if m.get('created_at')]
        if len(timestamped) >= 5:
            health_score += 25
            health_indicators.append("active_discussion")

        # Check participation diversity over the last 10 messages
        unique_senders = len({
            m.get('senderId') for m in messages[-10:]
            if m.get('senderId') != self.MODERATOR_ID
        })
        if unique_senders >= 3:
            health_score += 25
            health_indicators.append("diverse_participation")

        # Check message quality (length as proxy)
        texts = [self._message_text(m) for m in messages]
        avg_length = sum(len(text) for text in texts) / len(texts)
        if avg_length >= 50:
            health_score += 25
            health_indicators.append("substantive_responses")

        # Check for questions and engagement
        question_count = sum(1 for text in texts if '?' in text)
        if question_count >= 3:
            health_score += 25
            health_indicators.append("engaged_inquiry")

        # Determine overall health status
        if health_score >= 75:
            status = "excellent"
        elif health_score >= 50:
            status = "good"
        elif health_score >= 25:
            status = "fair"
        else:
            status = "poor"

        return {
            "status": status,
            "score": health_score,
            "indicators": health_indicators,
        }

    # ------------------------------------------------------------------
    # Analytics building blocks
    # ------------------------------------------------------------------

    def _generate_overview_analytics(self, messages: List[Dict[str, Any]],
                                     participants: List[str]) -> Dict[str, Any]:
        """Generate overview analytics."""
        participant_senders = [
            m.get('senderId') for m in messages
            if m.get('senderId') != self.MODERATOR_ID
        ]
        total_length = sum(len(self._message_text(m)) for m in messages)

        return {
            "total_messages": len(messages),
            "participant_messages": len(participant_senders),
            "moderator_messages": len(messages) - len(participant_senders),
            "total_participants": len(participants),
            "active_participants": len(set(participant_senders)),
            "avg_message_length": total_length / len(messages) if messages else 0,
            "conversation_duration": self._calculate_conversation_duration(messages),
        }

    def _generate_participation_analytics(self, messages: List[Dict[str, Any]],
                                          participants: List[str]) -> Dict[str, Any]:
        """Generate participation analytics."""
        participant_stats = defaultdict(lambda: {
            "message_count": 0,
            "total_words": 0,
            "avg_message_length": 0,
            "participation_percentage": 0,
        })

        total_participant_messages = 0

        for msg in messages:
            sender = msg.get('senderId')
            if sender == self.MODERATOR_ID:
                continue
            entry = participant_stats[sender]
            entry["message_count"] += 1
            entry["total_words"] += len(self._message_text(msg).split())
            total_participant_messages += 1

        # Calculate percentages and averages
        for stats in participant_stats.values():
            if stats["message_count"] > 0:
                stats["avg_message_length"] = stats["total_words"] / stats["message_count"]
                stats["participation_percentage"] = (
                    stats["message_count"] / total_participant_messages
                ) * 100

        return {
            "participant_stats": dict(participant_stats),
            "participation_balance": self._calculate_participation_balance(participant_stats),
            "dominant_participants": self._identify_dominant_participants(participant_stats),
            "quiet_participants": self._identify_quiet_participants(participants, participant_stats),
        }

    def _generate_flow_analytics(self, messages: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Generate conversation flow analytics.

        An interaction is recorded for every pair of adjacent messages
        whose senders differ. The same keys are returned whether or not
        there are messages.
        """
        if not messages:
            return {
                "interaction_patterns": [],
                "flow_quality": "none",
                "turn_taking_analysis": self._analyze_turn_taking(messages),
            }

        # Analyze interaction patterns
        interaction_patterns = []
        for current, following in zip(messages, messages[1:]):
            current_sender = current.get('senderId')
            next_sender = following.get('senderId')
            if current_sender != next_sender:
                interaction_patterns.append({
                    "from": current_sender,
                    "to": next_sender,
                    "type": "response" if next_sender != self.MODERATOR_ID else "moderation",
                })

        return {
            "interaction_patterns": interaction_patterns[-10:],  # Last 10 interactions
            "flow_quality": self._assess_flow_quality(interaction_patterns),
            "turn_taking_analysis": self._analyze_turn_taking(messages),
        }

    def _generate_sentiment_analytics(self, messages: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Generate sentiment analytics.

        Simple keyword-based scoring: each message scores +1, -1 or 0
        depending on whether it contains more distinct positive or
        negative keywords. Keywords are matched against whole tokens so
        that e.g. "know" does not count as "no" and "disagree" does not
        count as "agree".
        """
        sentiment_scores = []
        for msg in messages:
            tokens = set(self._tokenize(self._message_text(msg)))
            positive_count = len(tokens & self._POSITIVE_WORDS)
            negative_count = len(tokens & self._NEGATIVE_WORDS)

            if positive_count > negative_count:
                sentiment_scores.append(1)
            elif negative_count > positive_count:
                sentiment_scores.append(-1)
            else:
                sentiment_scores.append(0)

        avg_sentiment = sum(sentiment_scores) / len(sentiment_scores) if sentiment_scores else 0

        if avg_sentiment > 0.2:
            overall = "positive"
        elif avg_sentiment < -0.2:
            overall = "negative"
        else:
            overall = "neutral"

        return {
            "overall_sentiment": overall,
            "sentiment_trend": self._calculate_sentiment_trend(sentiment_scores),
            "sentiment_distribution": {
                "positive": sentiment_scores.count(1),
                "neutral": sentiment_scores.count(0),
                "negative": sentiment_scores.count(-1),
            },
        }

    def _generate_topic_analytics(self, messages: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Generate topic analytics.

        Simple topic extraction based on word frequency: words longer
        than three characters that are not stop words are counted.
        """
        all_words = []
        for msg in messages:
            all_words.extend(self._tokenize(self._message_text(msg)))

        # Filter out common words
        word_freq = Counter(
            word for word in all_words
            if len(word) > 3 and word not in self._STOP_WORDS
        )

        return {
            "top_topics": [
                {"word": word, "frequency": freq}
                for word, freq in word_freq.most_common(10)
            ],
            "topic_diversity": len(word_freq),
            "vocabulary_richness": len(set(all_words)) / len(all_words) if all_words else 0,
        }

    def _generate_timing_analytics(self, messages: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Generate timing analytics.

        Response time is the gap in seconds between adjacent messages
        whose 'created_at' values are both datetime objects. The same
        keys are returned whether or not there are messages.
        """
        if not messages:
            return {
                "avg_response_time": 0,
                "message_frequency": 0,
                "response_time_distribution": self._categorize_response_times([]),
            }

        # Calculate response times (simplified)
        response_times = []
        for previous, current in zip(messages, messages[1:]):
            previous_time = previous.get('created_at')
            current_time = current.get('created_at')
            if isinstance(current_time, datetime) and isinstance(previous_time, datetime):
                response_times.append((current_time - previous_time).total_seconds())

        avg_response_time = sum(response_times) / len(response_times) if response_times else 0
        duration_minutes = self._calculate_conversation_duration(messages)

        return {
            "avg_response_time": avg_response_time,
            # Clamp the divisor to one minute so very short conversations
            # do not divide by zero or report inflated rates.
            "message_frequency": len(messages) / max(1, duration_minutes),
            "response_time_distribution": self._categorize_response_times(response_times),
        }

    def _generate_quality_metrics(self, messages: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Generate quality metrics.

        Engagement counts '?' and '!' occurrences per message (one point
        each, at most two per message); depth is twice the average word
        count. Both are capped at 100.
        """
        if not messages:
            return {"engagement_score": 0, "depth_score": 0, "quality_score": 0}

        texts = [self._message_text(msg) for msg in messages]

        # Calculate engagement (questions, exclamations, etc.)
        engagement_indicators = sum(('?' in text) + ('!' in text) for text in texts)
        engagement_score = min(100, (engagement_indicators / len(messages)) * 100)

        # Calculate depth (message length, detail)
        total_words = sum(len(text.split()) for text in texts)
        avg_words_per_message = total_words / len(messages)
        depth_score = min(100, avg_words_per_message * 2)  # Rough scoring

        # Overall quality score
        quality_score = (engagement_score + depth_score) / 2

        return {
            "engagement_score": engagement_score,
            "depth_score": depth_score,
            "quality_score": quality_score,
            "total_words": total_words,
            "avg_words_per_message": avg_words_per_message,
        }

    def _generate_recommendations(self, messages: List[Dict[str, Any]],
                                  participants: List[str]) -> List[str]:
        """Generate recommendations based on analytics.

        Returns:
            Up to five human-readable recommendation strings.
        """
        recommendations = []

        # Analyze participation. Counter lookups of absent participants
        # return 0 without inserting a key, so the dominance check below
        # only ever sees senders that actually posted.
        participant_counts = self._participant_message_counts(messages)

        # Check for quiet participants
        quiet_participants = [p for p in participants if participant_counts[p] == 0]
        if quiet_participants:
            recommendations.append(
                f"Encourage participation from {len(quiet_participants)} quiet participants"
            )

        # Check for dominant participants
        if participant_counts:
            max_messages = max(participant_counts.values())
            total_messages = sum(participant_counts.values())
            if max_messages > total_messages * 0.4:
                recommendations.append(
                    "One participant is dominating the conversation - consider moderating"
                )

        if messages:
            texts = [self._message_text(msg) for msg in messages]

            # Check conversation depth
            avg_length = sum(len(text) for text in texts) / len(texts)
            if avg_length < 30:
                recommendations.append(
                    "Responses are quite brief - consider asking follow-up questions"
                )

            # Check for questions
            question_count = sum(1 for text in texts if '?' in text)
            if question_count < len(messages) * 0.1:
                recommendations.append(
                    "Low level of inquiry - encourage more questions between participants"
                )

        return recommendations[:5]  # Return top 5 recommendations

    # ------------------------------------------------------------------
    # Low-level calculations
    # ------------------------------------------------------------------

    def _calculate_conversation_duration(self, messages: List[Dict[str, Any]]) -> float:
        """Calculate conversation duration in minutes.

        Only 'created_at' values that are datetime objects are
        considered; fewer than two of them yields 0.
        """
        valid_timestamps = [
            msg.get('created_at') for msg in messages
            if isinstance(msg.get('created_at'), datetime)
        ]
        if len(valid_timestamps) < 2:
            return 0

        return (max(valid_timestamps) - min(valid_timestamps)).total_seconds() / 60

    def _calculate_participation_balance(self, participant_stats: Dict[str, Dict[str, Any]]) -> str:
        """Calculate participation balance from per-participant message counts."""
        if not participant_stats:
            return "no_data"

        message_counts = [stats["message_count"] for stats in participant_stats.values()]
        max_messages = max(message_counts)
        min_messages = min(message_counts)

        if max_messages - min_messages <= 2:
            return "balanced"
        if max_messages > min_messages * 3:
            return "highly_unbalanced"
        return "moderately_unbalanced"

    def _identify_dominant_participants(self, participant_stats: Dict[str, Dict[str, Any]]) -> List[str]:
        """Identify dominant participants (more than 40% of participant messages)."""
        return [
            participant
            for participant, stats in participant_stats.items()
            if stats["participation_percentage"] > 40
        ]

    def _identify_quiet_participants(self, participants: List[str],
                                     participant_stats: Dict[str, Dict[str, Any]]) -> List[str]:
        """Identify quiet participants (registered but without any message)."""
        return [
            participant
            for participant in participants
            # Membership is tested first so a defaultdict passed in is
            # never asked to create an entry for an absent participant.
            if participant not in participant_stats
            or participant_stats[participant]["message_count"] == 0
        ]

    def _assess_flow_quality(self, interaction_patterns: List[Dict[str, Any]]) -> str:
        """Assess the quality of conversation flow.

        Quality is the share of distinct (from, to) pairs among all
        recorded interactions.
        """
        if not interaction_patterns:
            return "none"

        # Check for varied interactions
        total = len(interaction_patterns)
        unique_interactions = len({(p["from"], p["to"]) for p in interaction_patterns})

        if unique_interactions >= total * 0.7:
            return "excellent"
        if unique_interactions >= total * 0.5:
            return "good"
        if unique_interactions >= total * 0.3:
            return "fair"
        return "poor"

    def _analyze_turn_taking(self, messages: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Analyze turn-taking patterns.

        Measures the lengths of runs of consecutive messages from the
        same sender.
        """
        if len(messages) < 3:
            return {"pattern": "insufficient_data"}

        # Length of each run of consecutive messages from the same sender
        consecutive_counts = [
            sum(1 for _ in run)
            for _, run in groupby(messages, key=lambda msg: msg.get('senderId'))
        ]

        avg_consecutive = sum(consecutive_counts) / len(consecutive_counts)

        if avg_consecutive <= 2:
            pattern = "healthy"
        elif avg_consecutive <= 4:
            pattern = "some_domination"
        else:
            pattern = "poor_turn_taking"

        return {
            "pattern": pattern,
            "avg_consecutive_messages": avg_consecutive,
            "max_consecutive": max(consecutive_counts),
        }

    def _calculate_sentiment_trend(self, sentiment_scores: List[int]) -> str:
        """Calculate sentiment trend by comparing the first half with the second half."""
        if len(sentiment_scores) < 3:
            return "insufficient_data"

        # Compare first half vs second half; with at least three scores
        # both halves are non-empty.
        mid_point = len(sentiment_scores) // 2
        first_half = sentiment_scores[:mid_point]
        second_half = sentiment_scores[mid_point:]
        first_half_avg = sum(first_half) / len(first_half)
        second_half_avg = sum(second_half) / len(second_half)

        if second_half_avg > first_half_avg + 0.2:
            return "improving"
        if second_half_avg < first_half_avg - 0.2:
            return "declining"
        return "stable"

    def _categorize_response_times(self, response_times: List[float]) -> Dict[str, int]:
        """Categorize response times (in seconds) into fast / medium / slow buckets."""
        fast = sum(1 for t in response_times if t < 30)  # < 30 seconds
        medium = sum(1 for t in response_times if 30 <= t < 120)  # 30s - 2min
        slow = sum(1 for t in response_times if t >= 120)  # >= 2min

        return {"fast": fast, "medium": medium, "slow": slow}