- _apply_silent_participant_override: new Python-level override that forces any participant who hasn't spoken yet into the next turn (fires before contrarian check, language-aware Russian/English call-out) - _execute_participant_respond: now sends call_out as a moderator message before generating the participant response (was silently skipped before, causing 0-action loops and incoherent conversation flow); uses .get() instead of [] to avoid KeyError when LLM omits optional fields - _apply_contrarian_override: language-aware call-out message (Russian if last moderator message contains Cyrillic) - conversation-decision-engine.md: explicit rules that call_out MUST name the participant and topic_context MUST give a specific angle; silent participants must be called before repeat speakers Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1401 lines
No EOL
68 KiB
Python
Executable file
1401 lines
No EOL
68 KiB
Python
Executable file
"""
|
||
Autonomous Conversation Controller
|
||
Orchestrates the autonomous conversation flow for focus groups using LLM decision making.
|
||
"""
|
||
|
||
from typing import Dict, Any, Optional, List
|
||
import asyncio
|
||
import time
|
||
from datetime import datetime, timedelta, timezone
|
||
import logging
|
||
|
||
from app.services.conversation_decision_service import ConversationDecisionService, ConversationDecisionError
|
||
from app.services.focus_group_response_service import generate_persona_response, FocusGroupResponseError
|
||
from app.services.ai_moderator_service import AIModeratorService
|
||
from app.models.focus_group import FocusGroup # Now fully async
|
||
from app.models.persona import Persona
|
||
|
||
|
||
class AutonomousConversationController:
|
||
"""Controller for managing autonomous conversation flow."""
|
||
|
||
def __init__(self, focus_group_id: str, logger: Optional[logging.Logger] = None):
|
||
self.focus_group_id = focus_group_id
|
||
self.logger = logger or logging.getLogger(__name__)
|
||
self.is_running = False
|
||
self.conversation_state = "idle" # idle, running, paused, completed, error
|
||
self.is_generating = False # Track when actively generating responses
|
||
self.last_action_time = None
|
||
self.action_count = 0
|
||
self.max_actions_per_session = 500 # Safety limit
|
||
|
||
# Timing configuration
|
||
self.min_delay_between_actions = 3 # seconds
|
||
self.max_delay_between_actions = 10 # seconds
|
||
self.response_timeout = 120 # seconds (LLM calls can be slow in production)
|
||
|
||
# Edge case tracking
|
||
self.consecutive_silence_count = 0
|
||
self.max_consecutive_silence = 3
|
||
self.participant_dominance_threshold = 0.4 # 40% of messages
|
||
|
||
# Reasoning history tracking
|
||
self.reasoning_history = [] # List of recent AI decisions with reasoning
|
||
self.max_reasoning_history = 20 # Keep last 20 decisions
|
||
|
||
# Initialize state from database on construction
|
||
self._initialize_state_from_database()
|
||
|
||
def _initialize_state_from_database(self):
|
||
"""Initialize the controller's state with defaults (database check happens during start)."""
|
||
# Set default state - actual database check will happen when start_autonomous_conversation is called
|
||
self.is_running = False
|
||
self.conversation_state = "idle"
|
||
self.logger.debug(f"Initialized controller with default state - database state will be checked on start")
|
||
|
||
async def start_autonomous_conversation(self, initial_prompt: Optional[str] = None) -> Dict[str, Any]:
|
||
"""
|
||
Start the autonomous conversation flow.
|
||
|
||
Args:
|
||
initial_prompt: Optional initial prompt to start the conversation
|
||
|
||
Returns:
|
||
Dictionary containing the start result
|
||
"""
|
||
try:
|
||
if self.is_running:
|
||
# If conversation is already running, reset it and restart
|
||
self.logger.info(f"Conversation already running for focus group {self.focus_group_id}, resetting state and restarting")
|
||
await self.stop_conversation("restart")
|
||
# Reset the running state to allow restart
|
||
self.is_running = False
|
||
self.conversation_state = "stopped"
|
||
|
||
# Validate focus group exists (using async model)
|
||
focus_group = await FocusGroup.find_by_id(self.focus_group_id)
|
||
if not focus_group:
|
||
return {"error": "Focus group not found"}
|
||
|
||
# Check if focus group has participants
|
||
participants = focus_group.get('participants', [])
|
||
if not participants:
|
||
return {"error": "Focus group has no participants"}
|
||
|
||
# Update focus group status (using async model)
|
||
await FocusGroup.update(self.focus_group_id, {
|
||
'status': 'ai_mode',
|
||
'autonomous_started_at': datetime.now(timezone.utc)
|
||
})
|
||
|
||
self.is_running = True
|
||
self.conversation_state = "running"
|
||
self.action_count = 0
|
||
self.consecutive_silence_count = 0
|
||
|
||
self.logger.info(f"Starting autonomous conversation for focus group {self.focus_group_id}")
|
||
|
||
# Add initial prompt if provided
|
||
if initial_prompt:
|
||
await self._add_moderator_message(initial_prompt, "question")
|
||
|
||
# Start the conversation loop and run it to completion
|
||
# (This will now run in background thread from the API route)
|
||
result = await self._run_conversation_loop()
|
||
|
||
return {
|
||
"message": "Autonomous conversation started",
|
||
"focus_group_id": self.focus_group_id,
|
||
"state": self.conversation_state,
|
||
"conversation_result": result
|
||
}
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"Error starting autonomous conversation: {str(e)}")
|
||
self.conversation_state = "error"
|
||
return {"error": f"Failed to start autonomous conversation: {str(e)}"}
|
||
|
||
async def stop_conversation(self, reason: str = "manual_stop") -> Dict[str, Any]:
|
||
"""
|
||
Stop the autonomous conversation.
|
||
|
||
Args:
|
||
reason: Reason for stopping the conversation
|
||
|
||
Returns:
|
||
Dictionary containing the stop result
|
||
"""
|
||
try:
|
||
self.is_running = False
|
||
self.conversation_state = "completed"
|
||
|
||
# Update focus group status (using async model)
|
||
status = 'completed' if reason in ['completed', 'discussion_guide_completed', 'natural_completion'] else 'active'
|
||
await FocusGroup.update(self.focus_group_id, {
|
||
'status': status,
|
||
'autonomous_ended_at': datetime.now(timezone.utc),
|
||
'completion_reason': reason
|
||
})
|
||
|
||
# GPT-5 fix: Emit AI status update to notify frontend of completion
|
||
# The FocusGroup.update() will trigger the websocket event automatically
|
||
|
||
# Mode events are now handled by AIModeratorService.end_session_with_concluding_statement()
|
||
# to prevent duplicate mode event indicators
|
||
|
||
self.logger.info(f"Stopped autonomous conversation for focus group {self.focus_group_id}: {reason}")
|
||
|
||
# Add completion message only for certain reasons (not manual_stop)
|
||
completion_messages = {
|
||
"natural_completion": "We've reached the end of our discussion guide. Thank you everyone for your valuable insights.",
|
||
"discussion_guide_completed": "We've covered all topics in our discussion guide. Thank you everyone for your valuable insights and participation.",
|
||
"time_limit": "We've reached our scheduled time limit. Thank you for your participation in this focus group.",
|
||
"error": "The session has been ended due to a technical issue. Thank you for your participation."
|
||
}
|
||
|
||
# Only add completion message for specific reasons (skip manual_stop)
|
||
if reason in completion_messages:
|
||
# Use the AI moderator service to properly end the session with mode events
|
||
from app.services.ai_moderator_service import AIModeratorService
|
||
|
||
ending_result = await AIModeratorService.end_session_with_concluding_statement(
|
||
self.focus_group_id, reason
|
||
)
|
||
|
||
if "error" in ending_result:
|
||
self.logger.error(f"Error ending session with concluding statement: {ending_result['error']}")
|
||
# Fallback to simple message
|
||
completion_message = completion_messages[reason]
|
||
await self._add_moderator_message(completion_message, "system")
|
||
else:
|
||
self.logger.info(f"Successfully ended session with concluding statement: {ending_result.get('concluding_statement', '')[:100]}...")
|
||
elif reason == "manual_stop":
|
||
# For manual stops, add a mode event to indicate AI session concluded
|
||
mode_event_id = await FocusGroup.add_mode_event(
|
||
focus_group_id=self.focus_group_id,
|
||
event_type='ai_session_concluded'
|
||
)
|
||
if mode_event_id:
|
||
self.logger.info(f"🎯 Added AI session concluded mode event for manual stop: {mode_event_id}")
|
||
else:
|
||
self.logger.warning(f"Failed to add AI session concluded mode event for manual stop")
|
||
|
||
# For discussion guide completion, ensure all items are marked as completed (100% progress)
|
||
if reason in ["discussion_guide_completed", "natural_completion"]:
|
||
await self._mark_all_questions_completed()
|
||
|
||
return {
|
||
"message": "Autonomous conversation stopped",
|
||
"focus_group_id": self.focus_group_id,
|
||
"state": self.conversation_state,
|
||
"reason": reason,
|
||
"action_count": self.action_count
|
||
}
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"Error stopping conversation: {str(e)}")
|
||
return {"error": f"Failed to stop conversation: {str(e)}"}
|
||
|
||
async def _run_conversation_loop(self) -> Dict[str, Any]:
|
||
"""
|
||
Main conversation loop that makes decisions and executes actions.
|
||
|
||
Returns:
|
||
Dictionary containing the loop result
|
||
"""
|
||
try:
|
||
while self.is_running and self.action_count < self.max_actions_per_session:
|
||
# Check if we should continue
|
||
if not await self._should_continue_conversation():
|
||
break
|
||
|
||
# Make a decision about the next action
|
||
decision = await self._make_conversation_decision()
|
||
|
||
if not decision:
|
||
self.consecutive_silence_count += 1
|
||
if self.consecutive_silence_count >= self.max_consecutive_silence:
|
||
self.logger.warning("Too many consecutive silent decisions, ending conversation")
|
||
await self.stop_conversation("excessive_silence")
|
||
break
|
||
continue
|
||
|
||
# Execute the decision
|
||
result = await self._execute_decision(decision)
|
||
|
||
# Update reasoning history with execution result
|
||
reasoning_id = decision.get('reasoning_id')
|
||
await self._update_reasoning_execution(reasoning_id, result)
|
||
|
||
if result.get("error"):
|
||
self.logger.error(f"Error executing decision: {result['error']}")
|
||
continue
|
||
|
||
# Reset silence count on successful action
|
||
self.consecutive_silence_count = 0
|
||
self.action_count += 1
|
||
self.last_action_time = datetime.now(timezone.utc)
|
||
|
||
# GPT-5 fix: Yield to eventlet hub after each action to flush WebSocket frames
|
||
await self._yield_to_eventlet()
|
||
|
||
# Wait before next action
|
||
await self._wait_between_actions()
|
||
|
||
# Check for session end conditions
|
||
if decision.get("action") == "end_session":
|
||
await self.stop_conversation("natural_completion")
|
||
break
|
||
|
||
# Handle loop exit conditions
|
||
if self.action_count >= self.max_actions_per_session:
|
||
await self.stop_conversation("action_limit_reached")
|
||
|
||
return {
|
||
"message": "Conversation loop completed",
|
||
"action_count": self.action_count,
|
||
"state": self.conversation_state
|
||
}
|
||
|
||
except Exception as e:
|
||
# Handle quota exhaustion — pause gracefully instead of erroring
|
||
try:
|
||
from app.models.quota import QuotaExceededError
|
||
if isinstance(e, QuotaExceededError):
|
||
self.logger.warning(
|
||
f"Quota exceeded for focus group {self.focus_group_id}: {e}"
|
||
)
|
||
self.is_running = False
|
||
self.conversation_state = "paused_quota"
|
||
await FocusGroup.update(self.focus_group_id, {"status": "paused_quota"})
|
||
try:
|
||
from app.websocket_manager_async import websocket_manager
|
||
await websocket_manager.emit_to_focus_group(
|
||
self.focus_group_id,
|
||
"quota_exceeded",
|
||
{
|
||
"scope": e.scope,
|
||
"limit_usd": e.limit_usd,
|
||
"used_usd": e.used_usd,
|
||
"focus_group_id": self.focus_group_id,
|
||
},
|
||
)
|
||
except Exception as _we:
|
||
self.logger.warning(f"Could not emit quota_exceeded WS event: {_we}")
|
||
return {"error": "quota_exceeded", "scope": e.scope}
|
||
except ImportError:
|
||
pass
|
||
self.logger.error(f"Error in conversation loop: {str(e)}")
|
||
self.conversation_state = "error"
|
||
await self.stop_conversation("error")
|
||
return {"error": f"Conversation loop error: {str(e)}"}
|
||
|
||
async def _should_continue_conversation(self) -> bool:
|
||
"""Check if the conversation should continue based on various conditions."""
|
||
try:
|
||
# Check focus group status (using async model)
|
||
focus_group = await FocusGroup.find_by_id(self.focus_group_id)
|
||
if not focus_group:
|
||
return False
|
||
|
||
current_status = focus_group.get('status', '')
|
||
|
||
# Check if autonomous mode should stop - any status other than 'ai_mode' means stop
|
||
if current_status != 'ai_mode':
|
||
self.is_running = False
|
||
self.logger.info(f"Autonomous conversation stopping due to status change: {current_status}")
|
||
return False
|
||
|
||
# Check if discussion guide is completed
|
||
if await self._is_discussion_guide_completed(focus_group):
|
||
await self.stop_conversation("discussion_guide_completed")
|
||
return False
|
||
|
||
# Note: Time limits are now informational only and don't stop conversations
|
||
# Users can see elapsed time in the UI but it won't auto-stop the session
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"Error checking conversation continuation: {str(e)}")
|
||
return False
|
||
|
||
async def _is_discussion_guide_completed(self, focus_group: Dict[str, Any]) -> bool:
|
||
"""
|
||
Check if the discussion guide has been completed.
|
||
|
||
Args:
|
||
focus_group: The focus group data
|
||
|
||
Returns:
|
||
True if the discussion guide is completed, False otherwise
|
||
"""
|
||
try:
|
||
discussion_guide = focus_group.get('discussionGuide', {})
|
||
|
||
# Handle legacy markdown format - never auto-complete for legacy guides
|
||
if isinstance(discussion_guide, str):
|
||
return False
|
||
|
||
# If no structured guide, don't auto-complete
|
||
if not discussion_guide or 'sections' not in discussion_guide:
|
||
return False
|
||
|
||
# Get current moderator position
|
||
moderator_position = focus_group.get('moderator_position', {
|
||
'section_index': 0,
|
||
'item_index': 0,
|
||
'item_type': 'activity'
|
||
})
|
||
|
||
sections = discussion_guide['sections']
|
||
section_index = moderator_position.get('section_index', 0)
|
||
item_index = moderator_position.get('item_index', 0)
|
||
item_type = moderator_position.get('item_type', 'activity')
|
||
|
||
# Check if we're past the last section
|
||
if section_index >= len(sections):
|
||
return True
|
||
|
||
# Check if we're in the last section and past all items
|
||
if section_index == len(sections) - 1:
|
||
last_section = sections[section_index]
|
||
|
||
# Get the total number of items in the last section
|
||
activities_count = len(last_section.get('activities', []))
|
||
questions_count = len(last_section.get('questions', []))
|
||
|
||
# If we're past all activities and questions, guide is complete
|
||
if item_type == 'activity' and item_index >= activities_count:
|
||
# Check if there are questions to move to
|
||
if questions_count > 0:
|
||
return False # Still have questions to cover
|
||
else:
|
||
return True # No questions, guide complete
|
||
elif item_type == 'question' and item_index >= questions_count:
|
||
return True # Past all questions in last section
|
||
|
||
return False
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"Error checking discussion guide completion: {str(e)}")
|
||
return False
|
||
|
||
async def _make_conversation_decision(self) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
Make a decision about the next conversation action using the LLM.
|
||
|
||
Returns:
|
||
Dictionary containing the decision (with reasoning_id added), or None if no decision could be made
|
||
"""
|
||
try:
|
||
decision = await ConversationDecisionService.decide_next_action(
|
||
self.focus_group_id,
|
||
temperature=0.7,
|
||
mode='ai'
|
||
)
|
||
|
||
self.logger.info(f"LLM Decision: {decision['action']} - {decision['reasoning']}")
|
||
|
||
all_messages = await FocusGroup.get_messages(self.focus_group_id)
|
||
|
||
# Python override 1: force silent participants in before contrarian logic
|
||
decision = await self._apply_silent_participant_override(decision, all_messages)
|
||
if decision.get('reasoning', '').startswith('Python override: silent'):
|
||
self.logger.info(f"Silent override: {decision['reasoning']}")
|
||
|
||
# Python override 2: contrarian when consensus detected
|
||
decision = await self._apply_contrarian_override(decision, all_messages)
|
||
if decision.get('reasoning', '').startswith('Python override: consensus'):
|
||
self.logger.info(f"Contrarian override: {decision['reasoning']}")
|
||
|
||
# Store reasoning in history for UI display and get the database ID
|
||
reasoning_id = await self._store_reasoning(decision)
|
||
|
||
# Add the reasoning_id to the decision for later use
|
||
decision['reasoning_id'] = reasoning_id
|
||
|
||
return decision
|
||
|
||
except ConversationDecisionError as e:
|
||
self.logger.error(f"Error making conversation decision: {str(e)}")
|
||
return None
|
||
except Exception as e:
|
||
self.logger.error(f"Unexpected error in decision making: {str(e)}")
|
||
return None
|
||
|
||
async def _apply_silent_participant_override(
|
||
self, decision: Dict[str, Any], all_messages: List[Dict[str, Any]]
|
||
) -> Dict[str, Any]:
|
||
"""Force a silent participant into the conversation.
|
||
|
||
Fires when any participant hasn't spoken at all AND the decision is not
|
||
already targeting them. Skips if action is end_session or probe_trigger
|
||
(those are more important).
|
||
"""
|
||
if decision.get('action') in ('end_session',):
|
||
return decision
|
||
|
||
focus_group = await FocusGroup.find_by_id(self.focus_group_id)
|
||
if not focus_group:
|
||
return decision
|
||
|
||
participant_ids = focus_group.get('participants', [])
|
||
if not participant_ids:
|
||
return decision
|
||
|
||
# Participants who have never spoken
|
||
spoke_ids = {m.get('senderId') for m in all_messages if m.get('senderId') != 'moderator'}
|
||
silent_ids = [pid for pid in participant_ids if str(pid) not in {str(s) for s in spoke_ids}]
|
||
|
||
if not silent_ids:
|
||
return decision # Everyone has spoken at least once
|
||
|
||
# Pick the first silent participant not spoken in the last 2 turns
|
||
recent_speakers = {m.get('senderId') for m in all_messages[-4:]}
|
||
candidates = [pid for pid in silent_ids if str(pid) not in {str(r) for r in recent_speakers}]
|
||
target_id = str(candidates[0] if candidates else silent_ids[0])
|
||
|
||
persona = await Persona.find_by_id(target_id)
|
||
if not persona:
|
||
return decision
|
||
|
||
target_name = persona.get('name', 'participant')
|
||
|
||
# Detect conversation language from last moderator message
|
||
last_mod = next(
|
||
(m.get('text', '') for m in reversed(all_messages) if m.get('senderId') == 'moderator'),
|
||
''
|
||
)
|
||
# Simple heuristic: Cyrillic chars → Russian call-out
|
||
is_russian = any('Ѐ' <= c <= 'ӿ' for c in last_mod)
|
||
if is_russian:
|
||
call_msg = f"{target_name}, мы ещё не слышали вас. Что думаете об этом?"
|
||
ctx = "Поделитесь своей точкой зрения на то, что обсуждалось. Будьте конкретны и честны в соответствии со своим характером."
|
||
else:
|
||
call_msg = f"{target_name}, we haven't heard from you yet — what's your take on this?"
|
||
ctx = "Share your perspective on what has been discussed. Be specific and authentic to your personality."
|
||
|
||
self.logger.info(f"🔕 Silent override: {target_name} has not spoken — forcing participation")
|
||
return {
|
||
'action': 'participant_respond',
|
||
'reasoning': f'Python override: silent participant — {target_name} has not spoken yet',
|
||
'details': {
|
||
'participant_id': target_id,
|
||
'call_out': call_msg,
|
||
'topic_context': ctx,
|
||
},
|
||
'discussion_guide_position_id': decision.get('discussion_guide_position_id', '1'),
|
||
}
|
||
|
||
async def _apply_contrarian_override(
|
||
self, decision: Dict[str, Any], all_messages: List[Dict[str, Any]]
|
||
) -> Dict[str, Any]:
|
||
"""Override moderator/probe action with a contrarian participant when consensus is detected.
|
||
|
||
Only fires when: action is moderator_speak or probe_trigger AND the last 8 participant
|
||
messages are heavily agreement-weighted AND a contrarian persona (low agreeableness or
|
||
high neuroticism) hasn't spoken in the last 6 turns.
|
||
"""
|
||
if decision.get('action') not in ('moderator_speak', 'probe_trigger'):
|
||
return decision
|
||
|
||
# Measure recent agreement level in the last 8 participant messages
|
||
participant_msgs = [m for m in all_messages if m.get('senderId') != 'moderator'][-8:]
|
||
if len(participant_msgs) < 3:
|
||
return decision # Not enough data
|
||
|
||
agreement_words = {'agree', 'yes', 'exactly', 'definitely', 'absolutely', 'same', 'too', 'also',
|
||
'согласен', 'согласна', 'конечно', 'точно', 'именно', 'да', 'тоже'}
|
||
disagreement_words = {'disagree', 'but', 'however', 'different', 'wrong', 'not',
|
||
'не', 'нет', 'однако', 'зато', 'хотя', 'против', 'сомневаюсь'}
|
||
agree_count = 0
|
||
total_words = 0
|
||
for m in participant_msgs:
|
||
words = m.get('text', '').lower().split()
|
||
total_words += len(words)
|
||
agree_count += sum(1 for w in words if w.strip(".,!?;:\"'") in agreement_words)
|
||
if total_words == 0:
|
||
return decision
|
||
agreement_ratio = agree_count / total_words
|
||
if agreement_ratio < 0.06: # ~6% threshold — below this, no override
|
||
return decision
|
||
|
||
# Find recent speakers (last 6 messages)
|
||
recent_speaker_ids = {m.get('senderId') for m in all_messages[-6:]}
|
||
|
||
# Load participants and find a contrarian not spoken recently
|
||
focus_group = await FocusGroup.find_by_id(self.focus_group_id)
|
||
if not focus_group:
|
||
return decision
|
||
participant_ids = focus_group.get('participants', [])
|
||
contrarian = None
|
||
for pid in participant_ids:
|
||
if pid in recent_speaker_ids:
|
||
continue
|
||
persona = await Persona.find_by_id(pid)
|
||
if not persona:
|
||
continue
|
||
ocean = persona.get('oceanTraits', {})
|
||
agreeableness = ocean.get('agreeableness', 50)
|
||
neuroticism = ocean.get('neuroticism', 50)
|
||
personality = str(persona.get('personality', '')).lower()
|
||
is_contrarian = (
|
||
agreeableness < 40
|
||
or neuroticism > 65
|
||
or any(w in personality for w in ('skeptic', 'sceptic', 'critical', 'скептик', 'критик'))
|
||
)
|
||
if is_contrarian:
|
||
contrarian = (pid, persona.get('name', 'participant'))
|
||
break
|
||
|
||
if not contrarian:
|
||
return decision
|
||
|
||
contrarian_id, contrarian_name = contrarian
|
||
self.logger.info(
|
||
f"🔄 Contrarian override: high agreement ({agreement_ratio:.0%}) → calling {contrarian_name}"
|
||
)
|
||
|
||
# Language-aware call-out
|
||
all_msgs_local = all_messages # already in scope
|
||
last_mod_text = next(
|
||
(m.get('text', '') for m in reversed(all_msgs_local) if m.get('senderId') == 'moderator'),
|
||
''
|
||
)
|
||
is_ru = any('Ѐ' <= c <= 'ӿ' for c in last_mod_text)
|
||
if is_ru:
|
||
call_msg = f"{contrarian_name}, все вроде соглашаются — а вы что думаете? Есть сомнения или возражения?"
|
||
ctx_msg = "Группа пришла к единому мнению. Выразите свои конкретные сомнения, риски или минусы с вашей точки зрения."
|
||
else:
|
||
call_msg = f"{contrarian_name}, the group seems to agree — do you have any reservations or a different take?"
|
||
ctx_msg = "The group has been agreeing. Express your specific reservations, concerns, or the downsides you see from your perspective."
|
||
|
||
return {
|
||
'action': 'participant_respond',
|
||
'reasoning': (
|
||
f'Python override: consensus ({agreement_ratio:.0%} agreement ratio) — '
|
||
f'calling contrarian {contrarian_name}'
|
||
),
|
||
'details': {
|
||
'participant_id': str(contrarian_id),
|
||
'call_out': call_msg,
|
||
'topic_context': ctx_msg,
|
||
},
|
||
'discussion_guide_position_id': decision.get('discussion_guide_position_id', '1'),
|
||
}
|
||
|
||
async def _store_reasoning(self, decision: Dict[str, Any]) -> Optional[str]:
|
||
"""
|
||
Store reasoning from AI decision for UI display.
|
||
|
||
Args:
|
||
decision: The decision dictionary from ConversationDecisionService
|
||
|
||
Returns:
|
||
The ID of the stored reasoning entry, or None if failed
|
||
"""
|
||
try:
|
||
reasoning_entry = {
|
||
'timestamp': datetime.now(timezone.utc).isoformat(),
|
||
'action': decision.get('action', 'unknown'),
|
||
'reasoning': decision.get('reasoning', 'No reasoning provided'),
|
||
'details': decision.get('details', {}),
|
||
'execution_status': 'pending',
|
||
'execution_result': None
|
||
}
|
||
|
||
# Store to database for persistence
|
||
reasoning_id = await FocusGroup.add_reasoning_entry(self.focus_group_id, reasoning_entry)
|
||
|
||
# Also keep in memory for quick access during active session
|
||
reasoning_entry['_id'] = reasoning_id # Add the database ID
|
||
self.reasoning_history.insert(0, reasoning_entry)
|
||
|
||
# Maintain max history size in memory
|
||
if len(self.reasoning_history) > self.max_reasoning_history:
|
||
self.reasoning_history = self.reasoning_history[:self.max_reasoning_history]
|
||
|
||
return reasoning_id
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"Error storing reasoning: {str(e)}")
|
||
return None
|
||
|
||
async def _update_reasoning_execution(self, reasoning_id: Optional[str], execution_result: Dict[str, Any]) -> None:
|
||
"""
|
||
Update the reasoning entry with execution results.
|
||
|
||
Args:
|
||
reasoning_id: The ID of the reasoning entry to update
|
||
execution_result: The result from executing the decision
|
||
"""
|
||
try:
|
||
# Update the database record
|
||
if reasoning_id:
|
||
await FocusGroup.update_reasoning_execution(self.focus_group_id, reasoning_id, execution_result)
|
||
|
||
# Also update in memory for quick access during active session
|
||
if self.reasoning_history:
|
||
latest_entry = self.reasoning_history[0]
|
||
latest_entry['execution_status'] = 'success' if not execution_result.get('error') else 'error'
|
||
latest_entry['execution_result'] = execution_result
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"Error updating reasoning execution: {str(e)}")
|
||
|
||
|
||
async def _execute_decision(self, decision: Dict[str, Any]) -> Dict[str, Any]:
|
||
"""
|
||
Execute a conversation decision.
|
||
|
||
Args:
|
||
decision: The decision to execute
|
||
|
||
Returns:
|
||
Dictionary containing the execution result
|
||
"""
|
||
try:
|
||
action = decision["action"]
|
||
details = decision["details"]
|
||
|
||
self.logger.info(f"Executing decision: {action}")
|
||
self.logger.debug(f"Decision details: {details}")
|
||
|
||
# For moderator_speak actions, update position FIRST so creative review detection works correctly
|
||
if action == "moderator_speak":
|
||
# Update position before creating message so current position is correct for image detection
|
||
self.logger.debug(f"Updating discussion guide position BEFORE moderator_speak execution")
|
||
await self._update_discussion_guide_position(decision)
|
||
result = await self._execute_moderator_speak(details)
|
||
|
||
elif action == "participant_respond":
|
||
result = await self._execute_participant_respond(details)
|
||
# Update position after other actions
|
||
if not result.get("error"):
|
||
self.logger.debug(f"Updating discussion guide position after successful {action}")
|
||
await self._update_discussion_guide_position(decision)
|
||
else:
|
||
self.logger.warning(f"Skipping position update due to action error: {result.get('error')}")
|
||
|
||
elif action == "participant_interaction":
|
||
result = await self._execute_participant_interaction(details)
|
||
# Update position after other actions
|
||
if not result.get("error"):
|
||
self.logger.debug(f"Updating discussion guide position after successful {action}")
|
||
await self._update_discussion_guide_position(decision)
|
||
else:
|
||
self.logger.warning(f"Skipping position update due to action error: {result.get('error')}")
|
||
|
||
elif action == "probe_trigger":
|
||
result = await self._execute_probe_trigger(details)
|
||
# Update position after other actions
|
||
if not result.get("error"):
|
||
self.logger.debug(f"Updating discussion guide position after successful {action}")
|
||
await self._update_discussion_guide_position(decision)
|
||
else:
|
||
self.logger.warning(f"Skipping position update due to action error: {result.get('error')}")
|
||
|
||
elif action == "end_session":
|
||
result = await self._execute_end_session(details)
|
||
# Update position after other actions
|
||
if not result.get("error"):
|
||
self.logger.debug(f"Updating discussion guide position after successful {action}")
|
||
await self._update_discussion_guide_position(decision)
|
||
else:
|
||
self.logger.warning(f"Skipping position update due to action error: {result.get('error')}")
|
||
|
||
else:
|
||
error_msg = f"Unknown action type: {action}"
|
||
self.logger.error(error_msg)
|
||
return {"error": error_msg}
|
||
|
||
self.logger.debug(f"Action {action} execution result: {result}")
|
||
|
||
return result
|
||
|
||
except Exception as e:
|
||
error_msg = f"Decision execution error: {str(e)}"
|
||
self.logger.error(f"❌ Error executing decision: {error_msg}")
|
||
import traceback
|
||
self.logger.error(f"❌ Full traceback: {traceback.format_exc()}")
|
||
return {"error": error_msg}
|
||
|
||
async def _execute_moderator_speak(self, details: Dict[str, Any]) -> Dict[str, Any]:
|
||
"""Execute moderator speak action."""
|
||
try:
|
||
content = details["content"]
|
||
message_type = details.get("message_type", "question")
|
||
|
||
# Add moderator message
|
||
message_id = await self._add_moderator_message(content, message_type)
|
||
|
||
return {
|
||
"message": "Moderator message added",
|
||
"message_id": message_id,
|
||
"content": content
|
||
}
|
||
|
||
except Exception as e:
|
||
return {"error": f"Error in moderator speak: {str(e)}"}
|
||
|
||
async def _execute_participant_respond(self, details: Dict[str, Any]) -> Dict[str, Any]:
|
||
"""Execute participant respond action."""
|
||
try:
|
||
self.is_generating = True
|
||
|
||
participant_id = details.get("participant_id") or details.get("id")
|
||
topic_context = details.get("topic_context") or details.get("topic") or ""
|
||
call_out = details.get("call_out") or details.get("callOut") or ""
|
||
|
||
if not participant_id:
|
||
return {"error": "participant_id missing from decision details"}
|
||
|
||
# Send moderator call-out message so the conversation flow is visible
|
||
if call_out:
|
||
await self._add_moderator_message(call_out, "question")
|
||
|
||
result = await self._generate_participant_response(participant_id, topic_context)
|
||
return result
|
||
|
||
except Exception as e:
|
||
error_msg = f"Error in participant respond: {str(e)}"
|
||
self.logger.error(f"❌ {error_msg}")
|
||
import traceback
|
||
self.logger.error(f"❌ Full traceback: {traceback.format_exc()}")
|
||
return {"error": error_msg}
|
||
finally:
|
||
self.is_generating = False # Always clear generating state
|
||
|
||
async def _execute_participant_interaction(self, details: Dict[str, Any]) -> Dict[str, Any]:
|
||
"""Execute participant interaction action."""
|
||
try:
|
||
self.is_generating = True # Mark as generating
|
||
|
||
participant_ids = details["participant_ids"]
|
||
moderator_prompt = details["moderator_prompt"]
|
||
|
||
# Add moderator prompt for interaction
|
||
await self._add_moderator_message(moderator_prompt, "question")
|
||
|
||
# Fetch all messages once so we can find what each person last said
|
||
all_messages = await FocusGroup.get_messages(self.focus_group_id)
|
||
|
||
results = []
|
||
for participant_id in participant_ids:
|
||
# Build context: include the other participant's most recent message
|
||
other_ids = [pid for pid in participant_ids if pid != participant_id]
|
||
p2p_context = moderator_prompt
|
||
if other_ids and all_messages:
|
||
other_msgs = [
|
||
m for m in all_messages
|
||
if m.get('senderId') in other_ids and m.get('type') != 'question'
|
||
]
|
||
if other_msgs:
|
||
last_msg = other_msgs[-1]
|
||
other_persona = await Persona.find_by_id(other_ids[0])
|
||
other_name = other_persona.get('name', 'another participant') if other_persona else 'another participant'
|
||
p2p_context = (
|
||
f"{moderator_prompt}\n\n"
|
||
f"[DIRECT RESPONSE: You are responding to {other_name} who just said: "
|
||
f'"{last_msg.get("text", "").strip()}"]'
|
||
)
|
||
|
||
result = await self._generate_participant_response(participant_id, p2p_context)
|
||
results.append(result)
|
||
|
||
return {
|
||
"message": "Participant interaction executed",
|
||
"participant_responses": results
|
||
}
|
||
|
||
except Exception as e:
|
||
return {"error": f"Error in participant interaction: {str(e)}"}
|
||
finally:
|
||
self.is_generating = False # Always clear generating state
|
||
|
||
async def _execute_probe_trigger(self, details: Dict[str, Any]) -> Dict[str, Any]:
|
||
"""Execute probe trigger action."""
|
||
try:
|
||
probe_question = details["probe_question"]
|
||
|
||
# Add probe question
|
||
message_id = await self._add_moderator_message(probe_question, "probe")
|
||
|
||
return {
|
||
"message": "Probe trigger executed",
|
||
"message_id": message_id,
|
||
"probe_question": probe_question
|
||
}
|
||
|
||
except Exception as e:
|
||
return {"error": f"Error in probe trigger: {str(e)}"}
|
||
|
||
async def _execute_end_session(self, details: Dict[str, Any]) -> Dict[str, Any]:
|
||
"""Execute end session action."""
|
||
try:
|
||
closing_message = details["closing_message"]
|
||
completion_reason = details["completion_reason"]
|
||
|
||
# Add closing message
|
||
await self._add_moderator_message(closing_message, "system")
|
||
|
||
# Advance position past current item so final question shows as completed
|
||
try:
|
||
advance_result = await AIModeratorService.advance_discussion(self.focus_group_id)
|
||
if advance_result.get('error'):
|
||
self.logger.info(f"Could not advance past final item when ending session: {advance_result['error']}")
|
||
else:
|
||
self.logger.info("Advanced moderator position past final item - session complete")
|
||
except Exception as e:
|
||
self.logger.info(f"Error advancing position when ending session: {str(e)}")
|
||
|
||
# Stop the conversation
|
||
await self.stop_conversation(completion_reason)
|
||
|
||
return {
|
||
"message": "Session ended",
|
||
"completion_reason": completion_reason
|
||
}
|
||
|
||
except Exception as e:
|
||
return {"error": f"Error ending session: {str(e)}"}
|
||
|
||
async def _generate_participant_response(self, participant_id: str, topic: str) -> Dict[str, Any]:
|
||
"""Generate a response from a participant."""
|
||
try:
|
||
|
||
# Get participant data
|
||
persona = await Persona.find_by_id(participant_id)
|
||
if not persona:
|
||
error_msg = f"Participant {participant_id} not found"
|
||
self.logger.error(error_msg)
|
||
return {"error": error_msg}
|
||
|
||
|
||
# Get focus group data
|
||
focus_group = await FocusGroup.find_by_id(self.focus_group_id)
|
||
if not focus_group:
|
||
error_msg = "Focus group not found"
|
||
self.logger.error(error_msg)
|
||
return {"error": error_msg}
|
||
|
||
|
||
# Get discussion guide
|
||
discussion_guide = focus_group.get('discussionGuide', '')
|
||
|
||
# Get the LLM model and GPT-5 parameters for this focus group
|
||
llm_model = focus_group.get('llm_model')
|
||
reasoning_effort = focus_group.get('reasoning_effort', 'medium')
|
||
verbosity = focus_group.get('verbosity', 'medium')
|
||
self.logger.info(f"🤖 Autonomous conversation using model: {llm_model or 'default (gpt-5.4)'} for focus group {self.focus_group_id}")
|
||
|
||
# Get recent messages and this persona's own history
|
||
messages = await FocusGroup.get_messages(self.focus_group_id)
|
||
recent_messages = messages[-20:] if len(messages) > 20 else messages
|
||
persona_own_history = [
|
||
m for m in messages
|
||
if m.get('senderId') == participant_id and m.get('type') != 'question'
|
||
][-8:] # Last 8 of their own messages for memory
|
||
|
||
# Generate response with timeout to prevent infinite hang
|
||
try:
|
||
response_text = await asyncio.wait_for(
|
||
generate_persona_response(
|
||
persona=persona,
|
||
current_topic=topic,
|
||
previous_messages=recent_messages,
|
||
temperature=0.7,
|
||
focus_group_id=self.focus_group_id,
|
||
llm_model=llm_model,
|
||
reasoning_effort=reasoning_effort,
|
||
verbosity=verbosity,
|
||
persona_own_history=persona_own_history,
|
||
),
|
||
timeout=self.response_timeout
|
||
)
|
||
except asyncio.TimeoutError:
|
||
error_msg = f"Participant response generation timed out after {self.response_timeout}s"
|
||
self.logger.error(f"⏱️ {error_msg}")
|
||
return {"error": error_msg}
|
||
except Exception as e:
|
||
self.logger.error(f"Error in generate_persona_response: {str(e)}")
|
||
import traceback
|
||
self.logger.error(f"❌ Full traceback: {traceback.format_exc()}")
|
||
raise
|
||
|
||
# Save message
|
||
message_data = {
|
||
"text": response_text,
|
||
"type": "response",
|
||
"senderId": participant_id
|
||
}
|
||
|
||
message_id = await FocusGroup.add_message(self.focus_group_id, message_data)
|
||
|
||
# GPT-5 fix: Yield after database write to flush WebSocket events
|
||
await self._yield_to_eventlet()
|
||
|
||
if not message_id:
|
||
error_msg = "Failed to save message to database - no message ID returned"
|
||
self.logger.error(error_msg)
|
||
return {"error": error_msg}
|
||
|
||
success_result = {
|
||
"message": "Participant response generated",
|
||
"participant_id": participant_id,
|
||
"response": response_text,
|
||
"message_id": message_id
|
||
}
|
||
return success_result
|
||
|
||
except FocusGroupResponseError as e:
|
||
error_msg = f"Error generating participant response: {str(e)}"
|
||
self.logger.error(f"❌ FocusGroupResponseError: {error_msg}")
|
||
return {"error": error_msg}
|
||
except Exception as e:
|
||
error_msg = f"Unexpected error generating response: {str(e)}"
|
||
self.logger.error(f"❌ Unexpected error: {error_msg}")
|
||
import traceback
|
||
self.logger.error(f"❌ Full traceback: {traceback.format_exc()}")
|
||
return {"error": error_msg}
|
||
|
||
async def _get_item_by_position_id(self, position_id: str) -> Optional[Dict[str, Any]]:
|
||
"""Get a discussion guide item by its position ID."""
|
||
try:
|
||
focus_group = await FocusGroup.find_by_id(self.focus_group_id)
|
||
if not focus_group:
|
||
return None
|
||
|
||
discussion_guide = focus_group.get('discussionGuide')
|
||
if not discussion_guide or not isinstance(discussion_guide, dict):
|
||
return None
|
||
|
||
sections = discussion_guide.get('sections', [])
|
||
|
||
# Search through all sections and subsections for the item
|
||
for section in sections:
|
||
# Check activities in main section
|
||
for activity in section.get('activities', []):
|
||
if activity.get('id') == position_id:
|
||
return activity
|
||
|
||
# Check questions in main section
|
||
for question in section.get('questions', []):
|
||
if question.get('id') == position_id:
|
||
return question
|
||
|
||
# Check subsections
|
||
for subsection in section.get('subsections', []):
|
||
# Check activities in subsection
|
||
for activity in subsection.get('activities', []):
|
||
if activity.get('id') == position_id:
|
||
return activity
|
||
|
||
# Check questions in subsection
|
||
for question in subsection.get('questions', []):
|
||
if question.get('id') == position_id:
|
||
return question
|
||
|
||
return None
|
||
|
||
except Exception as e:
|
||
self.logger.warning(f"Error getting item by position ID {position_id}: {e}")
|
||
return None
|
||
|
||
async def _add_moderator_message(self, content: str, message_type: str) -> Optional[str]:
|
||
"""Add a moderator message to the conversation."""
|
||
try:
|
||
# Initialize image detection variables
|
||
attached_assets = []
|
||
activates_visual_context = False
|
||
display_reference = None
|
||
|
||
# Check if current discussion guide item has image attachments
|
||
try:
|
||
from app.services.ai_moderator_service import AIModeratorService
|
||
from app.services.focus_group_response_service import extract_asset_filename_from_content
|
||
|
||
print(f"🔍 Checking current discussion guide item for image attachments")
|
||
|
||
moderator_status = await AIModeratorService.get_moderator_status(self.focus_group_id)
|
||
current_item = None
|
||
|
||
if moderator_status and 'moderator_position' in moderator_status:
|
||
focus_group = await FocusGroup.find_by_id(self.focus_group_id)
|
||
if focus_group:
|
||
discussion_guide = focus_group.get('discussionGuide')
|
||
if discussion_guide and isinstance(discussion_guide, dict):
|
||
sections = discussion_guide.get('sections', [])
|
||
pos = moderator_status['moderator_position']
|
||
section_idx = pos.get('section_index', 0)
|
||
item_idx = pos.get('item_index', 0)
|
||
item_type = pos.get('item_type', 'activity')
|
||
subsection_idx = pos.get('subsection_index')
|
||
|
||
if section_idx < len(sections):
|
||
section = sections[section_idx]
|
||
|
||
# Get current item from subsection or section
|
||
if subsection_idx is not None and section.get('subsections'):
|
||
subsections = section['subsections']
|
||
if subsection_idx < len(subsections):
|
||
subsection = subsections[subsection_idx]
|
||
items = subsection.get('questions' if item_type == 'question' else 'activities', [])
|
||
if item_idx < len(items):
|
||
current_item = items[item_idx]
|
||
else:
|
||
# Section level
|
||
items = section.get('questions' if item_type == 'question' else 'activities', [])
|
||
if item_idx < len(items):
|
||
current_item = items[item_idx]
|
||
|
||
# Check if current item has visual asset metadata
|
||
if current_item:
|
||
print(f"🔍 Current item: {current_item.get('content', '')[:50]}...")
|
||
|
||
metadata = current_item.get('metadata', {})
|
||
visual_asset = metadata.get('visual_asset')
|
||
|
||
if visual_asset:
|
||
# Found image in metadata - use it
|
||
asset_filename = visual_asset.get('filename')
|
||
display_reference = visual_asset.get('display_reference')
|
||
|
||
print(f"🎨 Found image in current item metadata: {display_reference} -> {asset_filename}")
|
||
|
||
attached_assets = [asset_filename]
|
||
activates_visual_context = True
|
||
|
||
# Generate AI description and enhance the content
|
||
try:
|
||
from app.services.image_description_service import ImageDescriptionService, ImageDescriptionError
|
||
|
||
print(f"🎨 AI MODE: Generating description for {asset_filename}")
|
||
description = await ImageDescriptionService.generate_description(self.focus_group_id, asset_filename)
|
||
|
||
# Enhance the content with the description using display reference if available
|
||
if display_reference:
|
||
enhanced_content = ImageDescriptionService.enhance_creative_review_question_with_display_reference(
|
||
content, display_reference, description
|
||
)
|
||
else:
|
||
# Fallback to old method for legacy content
|
||
enhanced_content = ImageDescriptionService.enhance_creative_review_question(
|
||
content, asset_filename, description
|
||
)
|
||
|
||
# Update the content to use enhanced version
|
||
content = enhanced_content
|
||
|
||
print(f"✅ AI MODE: Enhanced moderator message with image description")
|
||
|
||
except ImageDescriptionError as desc_error:
|
||
print(f"⚠️ AI MODE: Failed to generate image description: {desc_error}")
|
||
self.logger.warning(f"Failed to generate image description in autonomous mode: {desc_error}")
|
||
# Continue with original content
|
||
else:
|
||
# No visual asset metadata, try legacy content parsing
|
||
activity_content = current_item.get('content', '')
|
||
asset_filename = extract_asset_filename_from_content(activity_content)
|
||
|
||
if asset_filename:
|
||
print(f"🔍 Legacy: Found asset filename in content: {asset_filename}")
|
||
attached_assets = [asset_filename]
|
||
activates_visual_context = True
|
||
else:
|
||
print(f"🔍 No image attachments found for current item")
|
||
else:
|
||
print(f"🔍 No current discussion guide item found")
|
||
except Exception as e:
|
||
self.logger.warning(f"Error checking for creative review activity: {e}")
|
||
print(f"⚠️ Error checking creative review: {e}")
|
||
|
||
print(f"🔍 FINAL RESULT: attached_assets={attached_assets}, activates_visual_context={activates_visual_context}")
|
||
|
||
# Create visual asset metadata for frontend display
|
||
visual_asset_metadata = None
|
||
if activates_visual_context and attached_assets and len(attached_assets) > 0:
|
||
# Create visual asset metadata that frontend expects
|
||
visual_asset_metadata = {
|
||
"filename": attached_assets[0], # Use first asset
|
||
"displayReference": display_reference or attached_assets[0] # Use display reference or filename as fallback
|
||
}
|
||
|
||
# Create message data with visual context information
|
||
message_data = {
|
||
"text": content,
|
||
"type": message_type,
|
||
"senderId": "moderator",
|
||
"attached_assets": attached_assets,
|
||
"activates_visual_context": activates_visual_context,
|
||
"visual_asset": visual_asset_metadata # Frontend needs this for image display
|
||
}
|
||
|
||
message_id = await FocusGroup.add_message(self.focus_group_id, message_data)
|
||
|
||
# GPT-5 fix: Yield after database write to flush WebSocket events
|
||
await self._yield_to_eventlet()
|
||
|
||
# Visual context activation is handled automatically by FocusGroup.add_message()
|
||
# when activates_visual_context=True and attached_assets are present
|
||
if activates_visual_context and attached_assets:
|
||
self.logger.info(f"✅ Added moderator message with visual context activation: {attached_assets}")
|
||
print(f"✅ VISUAL CONTEXT ACTIVATED: {attached_assets}")
|
||
|
||
return message_id
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"Error adding moderator message: {str(e)}")
|
||
return None
|
||
|
||
async def _update_discussion_guide_position(self, decision: Dict[str, Any]) -> None:
|
||
"""Update moderator position based on LLM decision."""
|
||
try:
|
||
# Check if LLM provided discussion guide position mapping
|
||
position_id = decision.get('discussion_guide_position_id')
|
||
action = decision.get('action', 'unknown')
|
||
|
||
if position_id:
|
||
# Validate that the position ID exists
|
||
section_id, item_id = await self._validate_position_id(position_id)
|
||
if section_id and item_id:
|
||
# LLM specified valid position - set it precisely
|
||
try:
|
||
result = await AIModeratorService.set_moderator_position(self.focus_group_id, section_id, item_id)
|
||
position_desc = f"position '{position_id}'"
|
||
|
||
if result.get('error'):
|
||
self.logger.warning(f"Failed to set LLM-specified position to {position_desc}: {result['error']}")
|
||
self.logger.info("🔄 Continuing with current position - next LLM decision will try again")
|
||
else:
|
||
self.logger.debug(f"Successfully set moderator position to LLM-specified {position_desc} (from {action} action)")
|
||
except Exception as e:
|
||
self.logger.warning(f"Error setting LLM-specified position: {str(e)}")
|
||
self.logger.info("🔄 Continuing with current position - next LLM decision will try again")
|
||
else:
|
||
# Invalid position ID
|
||
self.logger.warning(f"🚫 LLM specified invalid position '{position_id}', continuing with current position")
|
||
self.logger.info("🔄 Next LLM decision will try again with valid position mapping")
|
||
else:
|
||
# LLM didn't specify a position - continue with current position
|
||
self.logger.info(f"🔄 LLM didn't specify position for {action} action, continuing with current position")
|
||
self.logger.debug("Next LLM decision should include position mapping for better tracking")
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"Error updating discussion guide position: {str(e)}")
|
||
|
||
async def _validate_position_id(self, position_id: str) -> tuple[Optional[str], Optional[str]]:
|
||
"""Validate that the position ID exists and return the section_id and item_id it maps to."""
|
||
try:
|
||
focus_group = await FocusGroup.find_by_id(self.focus_group_id)
|
||
if not focus_group:
|
||
return None, None
|
||
|
||
discussion_guide = focus_group.get('discussionGuide', {})
|
||
|
||
# Handle legacy format
|
||
if isinstance(discussion_guide, str):
|
||
self.logger.debug(f"Legacy discussion guide format, cannot validate position ID: {position_id}")
|
||
return None, None
|
||
|
||
# Handle structured format
|
||
if isinstance(discussion_guide, dict) and 'sections' in discussion_guide:
|
||
sections = discussion_guide['sections']
|
||
|
||
# Search through all sections for the position ID
|
||
for section in sections:
|
||
section_id = section.get('id')
|
||
|
||
# Check in top-level activities
|
||
if section.get('activities'):
|
||
for activity in section['activities']:
|
||
if activity.get('id') == position_id:
|
||
return section_id, position_id
|
||
|
||
# Check in top-level questions
|
||
if section.get('questions'):
|
||
for question in section['questions']:
|
||
if question.get('id') == position_id:
|
||
return section_id, position_id
|
||
|
||
# Check in subsections
|
||
if section.get('subsections'):
|
||
for subsection in section['subsections']:
|
||
# Check subsection activities
|
||
if subsection.get('activities'):
|
||
for activity in subsection['activities']:
|
||
if activity.get('id') == position_id:
|
||
return section_id, position_id
|
||
|
||
# Check subsection questions
|
||
if subsection.get('questions'):
|
||
for question in subsection['questions']:
|
||
if question.get('id') == position_id:
|
||
return section_id, position_id
|
||
|
||
# Position ID not found - log available IDs for debugging
|
||
available_positions = []
|
||
for section in sections:
|
||
# Collect from top-level activities and questions
|
||
if section.get('activities'):
|
||
available_positions.extend([a.get('id', 'no-id') for a in section['activities']])
|
||
if section.get('questions'):
|
||
available_positions.extend([q.get('id', 'no-id') for q in section['questions']])
|
||
|
||
# Collect from subsections
|
||
if section.get('subsections'):
|
||
for subsection in section['subsections']:
|
||
if subsection.get('activities'):
|
||
available_positions.extend([a.get('id', 'no-id') for a in subsection['activities']])
|
||
if subsection.get('questions'):
|
||
available_positions.extend([q.get('id', 'no-id') for q in subsection['questions']])
|
||
|
||
self.logger.warning(f"Position ID '{position_id}' not found. Available positions: {available_positions}")
|
||
return None, None
|
||
|
||
self.logger.debug(f"No structured discussion guide found, cannot validate position ID: {position_id}")
|
||
return None, None
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"Error validating position ID '{position_id}': {str(e)}")
|
||
return None, None
|
||
|
||
async def _fallback_advance_position(self) -> None:
|
||
"""Fallback method to advance position sequentially."""
|
||
try:
|
||
advance_result = await AIModeratorService.advance_discussion(self.focus_group_id)
|
||
if advance_result.get('error'):
|
||
self.logger.warning(f"Sequential advancement failed: {advance_result['error']}")
|
||
else:
|
||
self.logger.info(f"Sequential advancement successful: {advance_result.get('message', 'Success')}")
|
||
except Exception as e:
|
||
self.logger.warning(f"Failed to advance moderator position sequentially: {str(e)}")
|
||
|
||
async def _yield_to_eventlet(self):
|
||
"""Yield control to allow other tasks to run and flush WebSocket frames."""
|
||
try:
|
||
# Use asyncio sleep instead of socketio.sleep since we're in async context
|
||
await asyncio.sleep(0) # Yield to other tasks
|
||
except Exception as e:
|
||
self.logger.warning(f"Could not yield to event loop: {e}")
|
||
|
||
async def _wait_between_actions(self):
|
||
"""Wait an appropriate amount of time between actions."""
|
||
import random
|
||
|
||
# Random delay between min and max
|
||
delay = random.uniform(self.min_delay_between_actions, self.max_delay_between_actions)
|
||
await asyncio.sleep(delay)
|
||
|
||
async def get_conversation_status(self) -> Dict[str, Any]:
|
||
"""Get the current status of the autonomous conversation."""
|
||
try:
|
||
# Check the actual database state to determine if autonomous mode is truly running
|
||
focus_group = await FocusGroup.find_by_id(self.focus_group_id)
|
||
if focus_group:
|
||
db_status = focus_group.get('status', 'unknown')
|
||
|
||
# Determine if autonomous mode is actually running based on database state
|
||
is_ai_mode = db_status == 'ai_mode'
|
||
|
||
# Update instance variables to match database state
|
||
if is_ai_mode:
|
||
self.is_running = True
|
||
self.conversation_state = "running"
|
||
else:
|
||
self.is_running = False
|
||
self.conversation_state = "idle"
|
||
|
||
self.logger.debug(f"Status check - DB status: {db_status}, is_running: {self.is_running}")
|
||
else:
|
||
self.logger.warning(f"Focus group {self.focus_group_id} not found during status check")
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"Error checking database status: {str(e)}")
|
||
# Keep existing instance state if database check fails
|
||
|
||
# Load reasoning history from database to ensure it persists across controller instances
|
||
reasoning_history = await FocusGroup.get_reasoning_history(self.focus_group_id, self.max_reasoning_history)
|
||
|
||
return {
|
||
"focus_group_id": self.focus_group_id,
|
||
"is_running": self.is_running,
|
||
"conversation_state": self.conversation_state,
|
||
"is_generating": self.is_generating,
|
||
"action_count": self.action_count,
|
||
"last_action_time": self.last_action_time.isoformat() if self.last_action_time else None,
|
||
"consecutive_silence_count": self.consecutive_silence_count,
|
||
"reasoning_history": reasoning_history # Now loads from database
|
||
}
|
||
|
||
async def _mark_all_questions_completed(self) -> None:
|
||
"""
|
||
Mark all questions and activities in the discussion guide as completed by setting
|
||
the moderator position to indicate 100% completion.
|
||
"""
|
||
try:
|
||
focus_group = await FocusGroup.find_by_id(self.focus_group_id)
|
||
if not focus_group:
|
||
self.logger.error("Focus group not found when marking all questions completed")
|
||
return
|
||
|
||
discussion_guide = focus_group.get('discussionGuide', {})
|
||
|
||
# Handle legacy format - no position tracking needed
|
||
if isinstance(discussion_guide, str):
|
||
self.logger.debug("Legacy discussion guide format, no position update needed")
|
||
return
|
||
|
||
# Handle structured format
|
||
if not discussion_guide or 'sections' not in discussion_guide:
|
||
self.logger.debug("No structured discussion guide found, no position update needed")
|
||
return
|
||
|
||
sections = discussion_guide['sections']
|
||
if not sections:
|
||
self.logger.debug("No sections in discussion guide, no position update needed")
|
||
return
|
||
|
||
# Set moderator position to indicate all items are completed
|
||
# Position it past the last section to show 100% completion
|
||
last_section_index = len(sections) - 1
|
||
last_section = sections[last_section_index]
|
||
|
||
# Calculate the position that would indicate all items are done
|
||
# We set it past the last item in the last section
|
||
total_activities = len(last_section.get('activities', []))
|
||
total_questions = len(last_section.get('questions', []))
|
||
|
||
# Position it past all items to indicate completion
|
||
if total_questions > 0:
|
||
# If there are questions, position past the last question
|
||
completion_position = {
|
||
'section_index': last_section_index,
|
||
'item_index': total_questions, # Past the last question
|
||
'item_type': 'question'
|
||
}
|
||
elif total_activities > 0:
|
||
# If there are only activities, position past the last activity
|
||
completion_position = {
|
||
'section_index': last_section_index,
|
||
'item_index': total_activities, # Past the last activity
|
||
'item_type': 'activity'
|
||
}
|
||
else:
|
||
# Empty section, position past the section
|
||
completion_position = {
|
||
'section_index': last_section_index + 1,
|
||
'item_index': 0,
|
||
'item_type': 'activity'
|
||
}
|
||
|
||
# Update the focus group with the completion position
|
||
await FocusGroup.update(self.focus_group_id, {
|
||
'moderator_position': completion_position
|
||
})
|
||
|
||
self.logger.info(f"Marked all questions completed - set position to: {completion_position}")
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"Error marking all questions completed: {str(e)}") |