- Per-persona message history: each persona now sees its own 8 previous responses, preventing repetition and enabling position evolution
- OCEAN archetype labels in decision engine context: instead of raw numbers, the decision LLM now sees "agreeableness: 72/100 [HIGH] — consensus-seeker"
- P2P interaction context: when participants interact directly, each one now knows who they are responding to and what that person last said
- Python-level contrarian override: when the agreement ratio in recent messages reaches 6% or more and a contrarian persona (low agreeableness or high neuroticism) hasn't spoken recently, Python overrides the moderator/probe action to call on them

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1315 lines
No EOL
63 KiB
Python
Executable file
1315 lines
No EOL
63 KiB
Python
Executable file
"""
|
|
Autonomous Conversation Controller
|
|
Orchestrates the autonomous conversation flow for focus groups using LLM decision making.
|
|
"""
|
|
|
|
from typing import Dict, Any, Optional, List
|
|
import asyncio
|
|
import time
|
|
from datetime import datetime, timedelta, timezone
|
|
import logging
|
|
|
|
from app.services.conversation_decision_service import ConversationDecisionService, ConversationDecisionError
|
|
from app.services.focus_group_response_service import generate_persona_response, FocusGroupResponseError
|
|
from app.services.ai_moderator_service import AIModeratorService
|
|
from app.models.focus_group import FocusGroup # Now fully async
|
|
from app.models.persona import Persona
|
|
|
|
|
|
class AutonomousConversationController:
|
|
"""Controller for managing autonomous conversation flow."""
|
|
|
|
def __init__(self, focus_group_id: str, logger: Optional[logging.Logger] = None):
|
|
self.focus_group_id = focus_group_id
|
|
self.logger = logger or logging.getLogger(__name__)
|
|
self.is_running = False
|
|
self.conversation_state = "idle" # idle, running, paused, completed, error
|
|
self.is_generating = False # Track when actively generating responses
|
|
self.last_action_time = None
|
|
self.action_count = 0
|
|
self.max_actions_per_session = 500 # Safety limit
|
|
|
|
# Timing configuration
|
|
self.min_delay_between_actions = 3 # seconds
|
|
self.max_delay_between_actions = 10 # seconds
|
|
self.response_timeout = 120 # seconds (LLM calls can be slow in production)
|
|
|
|
# Edge case tracking
|
|
self.consecutive_silence_count = 0
|
|
self.max_consecutive_silence = 3
|
|
self.participant_dominance_threshold = 0.4 # 40% of messages
|
|
|
|
# Reasoning history tracking
|
|
self.reasoning_history = [] # List of recent AI decisions with reasoning
|
|
self.max_reasoning_history = 20 # Keep last 20 decisions
|
|
|
|
# Initialize state from database on construction
|
|
self._initialize_state_from_database()
|
|
|
|
def _initialize_state_from_database(self):
|
|
"""Initialize the controller's state with defaults (database check happens during start)."""
|
|
# Set default state - actual database check will happen when start_autonomous_conversation is called
|
|
self.is_running = False
|
|
self.conversation_state = "idle"
|
|
self.logger.debug(f"Initialized controller with default state - database state will be checked on start")
|
|
|
|
async def start_autonomous_conversation(self, initial_prompt: Optional[str] = None) -> Dict[str, Any]:
    """
    Start the autonomous conversation flow and run it until the loop exits.

    The conversation loop is awaited here, so this coroutine only returns once
    the loop has finished (the original comments indicate the API route runs it
    in the background).

    Args:
        initial_prompt: Optional initial prompt to start the conversation;
            when given it is posted as a moderator "question" first.

    Returns:
        On success, a dictionary with "message", "focus_group_id", "state"
        and "conversation_result" (the loop's own result). Otherwise a
        dictionary with a single "error" key.
    """
    try:
        if self.is_running:
            # If conversation is already running, reset it and restart
            self.logger.info(f"Conversation already running for focus group {self.focus_group_id}, resetting state and restarting")
            await self.stop_conversation("restart")
            # Reset the running state to allow restart
            self.is_running = False
            self.conversation_state = "stopped"

        # Validate focus group exists (using async model)
        focus_group = await FocusGroup.find_by_id(self.focus_group_id)
        if not focus_group:
            return {"error": "Focus group not found"}

        # A conversation needs at least one participant
        if not focus_group.get('participants', []):
            return {"error": "Focus group has no participants"}

        # Update focus group status (using async model)
        await FocusGroup.update(self.focus_group_id, {
            'status': 'ai_mode',
            'autonomous_started_at': datetime.now(timezone.utc)
        })

        self.is_running = True
        self.conversation_state = "running"
        self.action_count = 0
        self.consecutive_silence_count = 0

        self.logger.info(f"Starting autonomous conversation for focus group {self.focus_group_id}")

        # Add initial prompt if provided
        if initial_prompt:
            await self._add_moderator_message(initial_prompt, "question")

        # Run the conversation loop to completion
        result = await self._run_conversation_loop()

        return {
            "message": "Autonomous conversation started",
            "focus_group_id": self.focus_group_id,
            "state": self.conversation_state,
            "conversation_result": result
        }

    except Exception as e:
        self.logger.error(f"Error starting autonomous conversation: {str(e)}")
        # Clear the running flag as well: a failure after is_running was set
        # (e.g. while posting the initial prompt) must not leave the
        # controller claiming to be running while in the "error" state.
        self.is_running = False
        self.conversation_state = "error"
        return {"error": f"Failed to start autonomous conversation: {str(e)}"}
|
|
|
|
async def stop_conversation(self, reason: str = "manual_stop") -> Dict[str, Any]:
    """
    Stop the autonomous conversation and record why it stopped.

    Args:
        reason: Reason for stopping the conversation. It decides the
            persisted status ('completed' vs 'active'), whether a concluding
            statement is produced, and whether all guide questions are
            marked completed.

    Returns:
        Dictionary with "message", "focus_group_id", "state", "reason" and
        "action_count", or a dictionary with an "error" key on failure.
    """
    try:
        self.is_running = False
        self.conversation_state = "completed"

        # Persist the new status; only genuine completions close the group.
        finished_reasons = ('completed', 'discussion_guide_completed', 'natural_completion')
        new_status = 'completed' if reason in finished_reasons else 'active'
        # Original note: the FocusGroup.update() call is expected to trigger
        # the websocket event that informs the frontend.
        await FocusGroup.update(self.focus_group_id, {
            'status': new_status,
            'autonomous_ended_at': datetime.now(timezone.utc),
            'completion_reason': reason
        })

        self.logger.info(f"Stopped autonomous conversation for focus group {self.focus_group_id}: {reason}")

        # Fallback closing texts, used only if the moderator service cannot
        # produce a concluding statement. "manual_stop" deliberately has none.
        fallback_texts = {
            "natural_completion": "We've reached the end of our discussion guide. Thank you everyone for your valuable insights.",
            "discussion_guide_completed": "We've covered all topics in our discussion guide. Thank you everyone for your valuable insights and participation.",
            "time_limit": "We've reached our scheduled time limit. Thank you for your participation in this focus group.",
            "error": "The session has been ended due to a technical issue. Thank you for your participation."
        }

        if reason in fallback_texts:
            # Let the AI moderator service end the session (it also handles
            # the mode events, per the original comments).
            outcome = await AIModeratorService.end_session_with_concluding_statement(
                self.focus_group_id, reason
            )
            if "error" in outcome:
                self.logger.error(f"Error ending session with concluding statement: {outcome['error']}")
                # Fallback to simple message
                await self._add_moderator_message(fallback_texts[reason], "system")
            else:
                preview = outcome.get('concluding_statement', '')[:100]
                self.logger.info(f"Successfully ended session with concluding statement: {preview}...")
        elif reason == "manual_stop":
            # For manual stops, add a mode event to indicate AI session concluded
            mode_event_id = await FocusGroup.add_mode_event(
                focus_group_id=self.focus_group_id,
                event_type='ai_session_concluded'
            )
            if not mode_event_id:
                self.logger.warning("Failed to add AI session concluded mode event for manual stop")
            else:
                self.logger.info(f"🎯 Added AI session concluded mode event for manual stop: {mode_event_id}")

        # A completed guide should show 100% progress
        if reason in ("discussion_guide_completed", "natural_completion"):
            await self._mark_all_questions_completed()

        summary = {
            "message": "Autonomous conversation stopped",
            "focus_group_id": self.focus_group_id,
            "state": self.conversation_state,
            "reason": reason,
            "action_count": self.action_count
        }
        return summary

    except Exception as e:
        self.logger.error(f"Error stopping conversation: {str(e)}")
        return {"error": f"Failed to stop conversation: {str(e)}"}
|
|
|
|
async def _run_conversation_loop(self) -> Dict[str, Any]:
    """
    Main conversation loop that makes decisions and executes actions.

    Each iteration checks whether the conversation may continue, asks for
    the next decision, executes it, and records the execution result. The
    loop ends when the controller is stopped, a continuation check fails,
    too many consecutive decisions could not be made, an "end_session"
    action succeeds, or max_actions_per_session is reached.

    Returns:
        Dictionary with "message", "action_count" and "state" when the loop
        exits normally; a dictionary with an "error" key when the loop is
        aborted (quota exhaustion additionally carries "scope").
    """
    try:
        while self.is_running and self.action_count < self.max_actions_per_session:
            # Check if we should continue
            if not await self._should_continue_conversation():
                break

            # Make a decision about the next action
            decision = await self._make_conversation_decision()

            if not decision:
                self.consecutive_silence_count += 1
                if self.consecutive_silence_count >= self.max_consecutive_silence:
                    self.logger.warning("Too many consecutive silent decisions, ending conversation")
                    await self.stop_conversation("excessive_silence")
                    break
                continue

            # Execute the decision and record the outcome against its reasoning entry
            result = await self._execute_decision(decision)
            await self._update_reasoning_execution(decision.get('reasoning_id'), result)

            if result.get("error"):
                # NOTE(review): failed executions are retried immediately and
                # count toward neither action_count nor the silence counter,
                # so a persistently failing action is not bounded here.
                self.logger.error(f"Error executing decision: {result['error']}")
                continue

            # Reset silence count on successful action
            self.consecutive_silence_count = 0
            self.action_count += 1
            self.last_action_time = datetime.now(timezone.utc)

            # Yield after each action so pending WebSocket frames get flushed
            await self._yield_to_eventlet()

            # Handle a successful end_session before pacing: there is no
            # next action, so sleeping first only delays the final stop.
            if decision.get("action") == "end_session":
                # NOTE(review): _execute_end_session() has already called
                # stop_conversation() with the LLM-provided reason; this
                # second call is kept because it guarantees the 'completed'
                # status, but it may produce a second concluding statement.
                await self.stop_conversation("natural_completion")
                break

            # Wait before next action
            await self._wait_between_actions()

        # Handle loop exit conditions
        if self.action_count >= self.max_actions_per_session:
            await self.stop_conversation("action_limit_reached")

        return {
            "message": "Conversation loop completed",
            "action_count": self.action_count,
            "state": self.conversation_state
        }

    except Exception as e:
        # Handle quota exhaustion — pause gracefully instead of erroring.
        # Only the import itself is guarded against ImportError.
        try:
            from app.models.quota import QuotaExceededError
        except ImportError:
            QuotaExceededError = None

        if QuotaExceededError is not None and isinstance(e, QuotaExceededError):
            self.logger.warning(
                f"Quota exceeded for focus group {self.focus_group_id}: {e}"
            )
            self.is_running = False
            self.conversation_state = "paused_quota"
            await FocusGroup.update(self.focus_group_id, {"status": "paused_quota"})
            try:
                from app.websocket_manager_async import websocket_manager
                await websocket_manager.emit_to_focus_group(
                    self.focus_group_id,
                    "quota_exceeded",
                    {
                        "scope": e.scope,
                        "limit_usd": e.limit_usd,
                        "used_usd": e.used_usd,
                        "focus_group_id": self.focus_group_id,
                    },
                )
            except Exception as _we:
                # Notifying the frontend is best-effort
                self.logger.warning(f"Could not emit quota_exceeded WS event: {_we}")
            return {"error": "quota_exceeded", "scope": e.scope}

        self.logger.error(f"Error in conversation loop: {str(e)}")
        await self.stop_conversation("error")
        # stop_conversation() always sets the state to "completed"; assign
        # the error state afterwards so it is not overwritten.
        self.conversation_state = "error"
        return {"error": f"Conversation loop error: {str(e)}"}
|
|
|
|
async def _should_continue_conversation(self) -> bool:
    """Decide whether the loop may run another iteration.

    Returns False when the focus group cannot be loaded, when its status is
    anything other than 'ai_mode', when the discussion guide is completed
    (the conversation is stopped in that case), or when the check itself
    fails. Time limits are informational only and never stop the session.
    """
    try:
        group = await FocusGroup.find_by_id(self.focus_group_id)
        if not group:
            return False

        status_now = group.get('status', '')
        still_autonomous = status_now == 'ai_mode'
        if not still_autonomous:
            # Any status other than 'ai_mode' means autonomous mode must stop
            self.is_running = False
            self.logger.info(f"Autonomous conversation stopping due to status change: {status_now}")
            return False

        guide_done = await self._is_discussion_guide_completed(group)
        if guide_done:
            await self.stop_conversation("discussion_guide_completed")
            return False

        return True

    except Exception as e:
        self.logger.error(f"Error checking conversation continuation: {str(e)}")
        return False
|
|
|
|
async def _is_discussion_guide_completed(self, focus_group: Dict[str, Any]) -> bool:
|
|
"""
|
|
Check if the discussion guide has been completed.
|
|
|
|
Args:
|
|
focus_group: The focus group data
|
|
|
|
Returns:
|
|
True if the discussion guide is completed, False otherwise
|
|
"""
|
|
try:
|
|
discussion_guide = focus_group.get('discussionGuide', {})
|
|
|
|
# Handle legacy markdown format - never auto-complete for legacy guides
|
|
if isinstance(discussion_guide, str):
|
|
return False
|
|
|
|
# If no structured guide, don't auto-complete
|
|
if not discussion_guide or 'sections' not in discussion_guide:
|
|
return False
|
|
|
|
# Get current moderator position
|
|
moderator_position = focus_group.get('moderator_position', {
|
|
'section_index': 0,
|
|
'item_index': 0,
|
|
'item_type': 'activity'
|
|
})
|
|
|
|
sections = discussion_guide['sections']
|
|
section_index = moderator_position.get('section_index', 0)
|
|
item_index = moderator_position.get('item_index', 0)
|
|
item_type = moderator_position.get('item_type', 'activity')
|
|
|
|
# Check if we're past the last section
|
|
if section_index >= len(sections):
|
|
return True
|
|
|
|
# Check if we're in the last section and past all items
|
|
if section_index == len(sections) - 1:
|
|
last_section = sections[section_index]
|
|
|
|
# Get the total number of items in the last section
|
|
activities_count = len(last_section.get('activities', []))
|
|
questions_count = len(last_section.get('questions', []))
|
|
|
|
# If we're past all activities and questions, guide is complete
|
|
if item_type == 'activity' and item_index >= activities_count:
|
|
# Check if there are questions to move to
|
|
if questions_count > 0:
|
|
return False # Still have questions to cover
|
|
else:
|
|
return True # No questions, guide complete
|
|
elif item_type == 'question' and item_index >= questions_count:
|
|
return True # Past all questions in last section
|
|
|
|
return False
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error checking discussion guide completion: {str(e)}")
|
|
return False
|
|
|
|
async def _make_conversation_decision(self) -> Optional[Dict[str, Any]]:
    """
    Make a decision about the next conversation action using the LLM.

    The LLM decision may be replaced by the Python-level contrarian
    override. The final decision is stored as a reasoning entry and the
    entry's id is attached under 'reasoning_id'.

    Returns:
        Dictionary containing the decision (with reasoning_id added), or
        None if no decision could be made
    """
    try:
        decision = await ConversationDecisionService.decide_next_action(
            self.focus_group_id,
            temperature=0.7,
            mode='ai'
        )

        self.logger.info(f"LLM Decision: {decision['action']} - {decision['reasoning']}")

        # Python-level contrarian override: if consensus detected and LLM
        # didn't call a skeptic. The override is an optional refinement, so
        # a failure while computing it must not discard the LLM decision.
        try:
            all_messages = await FocusGroup.get_messages(self.focus_group_id)
            final_decision = await self._apply_contrarian_override(decision, all_messages)
        except Exception as e:
            self.logger.warning(f"Contrarian override skipped due to error: {str(e)}")
            final_decision = decision

        # The override returns the very same object when it does not fire,
        # so identity tells whether the decision was replaced.
        if final_decision is not decision:
            self.logger.info(
                f"Decision overridden to: {final_decision['action']} - {final_decision['reasoning']}"
            )
        decision = final_decision

        # Store reasoning in history for UI display and get the database ID
        reasoning_id = await self._store_reasoning(decision)

        # Add the reasoning_id to the decision for later use
        decision['reasoning_id'] = reasoning_id

        return decision

    except ConversationDecisionError as e:
        self.logger.error(f"Error making conversation decision: {str(e)}")
        return None
    except Exception as e:
        self.logger.error(f"Unexpected error in decision making: {str(e)}")
        return None
|
|
|
|
async def _apply_contrarian_override(
|
|
self, decision: Dict[str, Any], all_messages: List[Dict[str, Any]]
|
|
) -> Dict[str, Any]:
|
|
"""Override moderator/probe action with a contrarian participant when consensus is detected.
|
|
|
|
Only fires when: action is moderator_speak or probe_trigger AND the last 8 participant
|
|
messages are heavily agreement-weighted AND a contrarian persona (low agreeableness or
|
|
high neuroticism) hasn't spoken in the last 6 turns.
|
|
"""
|
|
if decision.get('action') not in ('moderator_speak', 'probe_trigger'):
|
|
return decision
|
|
|
|
# Measure recent agreement level in the last 8 participant messages
|
|
participant_msgs = [m for m in all_messages if m.get('senderId') != 'moderator'][-8:]
|
|
if len(participant_msgs) < 3:
|
|
return decision # Not enough data
|
|
|
|
agreement_words = {'agree', 'yes', 'exactly', 'definitely', 'absolutely', 'same', 'too', 'also',
|
|
'согласен', 'согласна', 'конечно', 'точно', 'именно', 'да', 'тоже'}
|
|
disagreement_words = {'disagree', 'but', 'however', 'different', 'wrong', 'not',
|
|
'не', 'нет', 'однако', 'зато', 'хотя', 'против', 'сомневаюсь'}
|
|
agree_count = 0
|
|
total_words = 0
|
|
for m in participant_msgs:
|
|
words = m.get('text', '').lower().split()
|
|
total_words += len(words)
|
|
agree_count += sum(1 for w in words if w.strip(".,!?;:\"'") in agreement_words)
|
|
if total_words == 0:
|
|
return decision
|
|
agreement_ratio = agree_count / total_words
|
|
if agreement_ratio < 0.06: # ~6% threshold — below this, no override
|
|
return decision
|
|
|
|
# Find recent speakers (last 6 messages)
|
|
recent_speaker_ids = {m.get('senderId') for m in all_messages[-6:]}
|
|
|
|
# Load participants and find a contrarian not spoken recently
|
|
focus_group = await FocusGroup.find_by_id(self.focus_group_id)
|
|
if not focus_group:
|
|
return decision
|
|
participant_ids = focus_group.get('participants', [])
|
|
contrarian = None
|
|
for pid in participant_ids:
|
|
if pid in recent_speaker_ids:
|
|
continue
|
|
persona = await Persona.find_by_id(pid)
|
|
if not persona:
|
|
continue
|
|
ocean = persona.get('oceanTraits', {})
|
|
agreeableness = ocean.get('agreeableness', 50)
|
|
neuroticism = ocean.get('neuroticism', 50)
|
|
personality = str(persona.get('personality', '')).lower()
|
|
is_contrarian = (
|
|
agreeableness < 40
|
|
or neuroticism > 65
|
|
or any(w in personality for w in ('skeptic', 'sceptic', 'critical', 'скептик', 'критик'))
|
|
)
|
|
if is_contrarian:
|
|
contrarian = (pid, persona.get('name', 'participant'))
|
|
break
|
|
|
|
if not contrarian:
|
|
return decision
|
|
|
|
contrarian_id, contrarian_name = contrarian
|
|
self.logger.info(
|
|
f"🔄 Contrarian override: high agreement ({agreement_ratio:.0%}) → calling {contrarian_name}"
|
|
)
|
|
return {
|
|
'action': 'participant_respond',
|
|
'reasoning': (
|
|
f'Python override: consensus detected (agreement ratio {agreement_ratio:.0%}), '
|
|
f'calling contrarian {contrarian_name}'
|
|
),
|
|
'details': {
|
|
'participant_id': str(contrarian_id),
|
|
'call_out': f"{contrarian_name}, we haven't heard from you — what's your take?",
|
|
'topic_context': (
|
|
'The group has been agreeing. Express your specific reservations, concerns, '
|
|
'or the downsides you see from your perspective.'
|
|
),
|
|
},
|
|
'discussion_guide_position_id': decision.get('discussion_guide_position_id', '1'),
|
|
}
|
|
|
|
async def _store_reasoning(self, decision: Dict[str, Any]) -> Optional[str]:
    """
    Persist the reasoning behind a decision and cache it in memory.

    Args:
        decision: The decision dictionary whose action, reasoning and
            details are recorded.

    Returns:
        The ID of the stored reasoning entry, or None if storing failed
    """
    try:
        entry = {
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'action': decision.get('action', 'unknown'),
            'reasoning': decision.get('reasoning', 'No reasoning provided'),
            'details': decision.get('details', {}),
            # Filled in later by _update_reasoning_execution()
            'execution_status': 'pending',
            'execution_result': None
        }

        # Database first, so the entry is persistent
        stored_id = await FocusGroup.add_reasoning_entry(self.focus_group_id, entry)

        # Newest entry goes to the front of the in-memory history
        entry['_id'] = stored_id
        self.reasoning_history.insert(0, entry)

        # Drop the oldest entries once the cap is exceeded
        limit = self.max_reasoning_history
        if len(self.reasoning_history) > limit:
            self.reasoning_history = self.reasoning_history[:limit]

        return stored_id

    except Exception as e:
        self.logger.error(f"Error storing reasoning: {str(e)}")
        return None
|
|
|
|
async def _update_reasoning_execution(self, reasoning_id: Optional[str], execution_result: Dict[str, Any]) -> None:
|
|
"""
|
|
Update the reasoning entry with execution results.
|
|
|
|
Args:
|
|
reasoning_id: The ID of the reasoning entry to update
|
|
execution_result: The result from executing the decision
|
|
"""
|
|
try:
|
|
# Update the database record
|
|
if reasoning_id:
|
|
await FocusGroup.update_reasoning_execution(self.focus_group_id, reasoning_id, execution_result)
|
|
|
|
# Also update in memory for quick access during active session
|
|
if self.reasoning_history:
|
|
latest_entry = self.reasoning_history[0]
|
|
latest_entry['execution_status'] = 'success' if not execution_result.get('error') else 'error'
|
|
latest_entry['execution_result'] = execution_result
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error updating reasoning execution: {str(e)}")
|
|
|
|
|
|
async def _execute_decision(self, decision: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""
|
|
Execute a conversation decision.
|
|
|
|
Args:
|
|
decision: The decision to execute
|
|
|
|
Returns:
|
|
Dictionary containing the execution result
|
|
"""
|
|
try:
|
|
action = decision["action"]
|
|
details = decision["details"]
|
|
|
|
self.logger.info(f"Executing decision: {action}")
|
|
self.logger.debug(f"Decision details: {details}")
|
|
|
|
# For moderator_speak actions, update position FIRST so creative review detection works correctly
|
|
if action == "moderator_speak":
|
|
# Update position before creating message so current position is correct for image detection
|
|
self.logger.debug(f"Updating discussion guide position BEFORE moderator_speak execution")
|
|
await self._update_discussion_guide_position(decision)
|
|
result = await self._execute_moderator_speak(details)
|
|
|
|
elif action == "participant_respond":
|
|
result = await self._execute_participant_respond(details)
|
|
# Update position after other actions
|
|
if not result.get("error"):
|
|
self.logger.debug(f"Updating discussion guide position after successful {action}")
|
|
await self._update_discussion_guide_position(decision)
|
|
else:
|
|
self.logger.warning(f"Skipping position update due to action error: {result.get('error')}")
|
|
|
|
elif action == "participant_interaction":
|
|
result = await self._execute_participant_interaction(details)
|
|
# Update position after other actions
|
|
if not result.get("error"):
|
|
self.logger.debug(f"Updating discussion guide position after successful {action}")
|
|
await self._update_discussion_guide_position(decision)
|
|
else:
|
|
self.logger.warning(f"Skipping position update due to action error: {result.get('error')}")
|
|
|
|
elif action == "probe_trigger":
|
|
result = await self._execute_probe_trigger(details)
|
|
# Update position after other actions
|
|
if not result.get("error"):
|
|
self.logger.debug(f"Updating discussion guide position after successful {action}")
|
|
await self._update_discussion_guide_position(decision)
|
|
else:
|
|
self.logger.warning(f"Skipping position update due to action error: {result.get('error')}")
|
|
|
|
elif action == "end_session":
|
|
result = await self._execute_end_session(details)
|
|
# Update position after other actions
|
|
if not result.get("error"):
|
|
self.logger.debug(f"Updating discussion guide position after successful {action}")
|
|
await self._update_discussion_guide_position(decision)
|
|
else:
|
|
self.logger.warning(f"Skipping position update due to action error: {result.get('error')}")
|
|
|
|
else:
|
|
error_msg = f"Unknown action type: {action}"
|
|
self.logger.error(error_msg)
|
|
return {"error": error_msg}
|
|
|
|
self.logger.debug(f"Action {action} execution result: {result}")
|
|
|
|
return result
|
|
|
|
except Exception as e:
|
|
error_msg = f"Decision execution error: {str(e)}"
|
|
self.logger.error(f"❌ Error executing decision: {error_msg}")
|
|
import traceback
|
|
self.logger.error(f"❌ Full traceback: {traceback.format_exc()}")
|
|
return {"error": error_msg}
|
|
|
|
async def _execute_moderator_speak(self, details: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Execute moderator speak action."""
|
|
try:
|
|
content = details["content"]
|
|
message_type = details.get("message_type", "question")
|
|
|
|
# Add moderator message
|
|
message_id = await self._add_moderator_message(content, message_type)
|
|
|
|
return {
|
|
"message": "Moderator message added",
|
|
"message_id": message_id,
|
|
"content": content
|
|
}
|
|
|
|
except Exception as e:
|
|
return {"error": f"Error in moderator speak: {str(e)}"}
|
|
|
|
async def _execute_participant_respond(self, details: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Execute participant respond action."""
|
|
try:
|
|
self.is_generating = True # Mark as generating
|
|
|
|
participant_id = details["participant_id"]
|
|
topic_context = details["topic_context"]
|
|
|
|
|
|
# Generate participant response
|
|
result = await self._generate_participant_response(participant_id, topic_context)
|
|
|
|
|
|
return result
|
|
|
|
except Exception as e:
|
|
error_msg = f"Error in participant respond: {str(e)}"
|
|
self.logger.error(f"❌ {error_msg}")
|
|
import traceback
|
|
self.logger.error(f"❌ Full traceback: {traceback.format_exc()}")
|
|
return {"error": error_msg}
|
|
finally:
|
|
self.is_generating = False # Always clear generating state
|
|
|
|
async def _execute_participant_interaction(self, details: Dict[str, Any]) -> Dict[str, Any]:
    """Execute a direct interaction between several participants.

    The moderator prompt is posted first; then each listed participant
    responds in order. Each participant's context quotes the most recent
    non-question message written by any of the other listed participants,
    including responses generated earlier in this same interaction.

    Args:
        details: Must contain "participant_ids" and "moderator_prompt".

    Returns:
        Dictionary with "message" and "participant_responses", or a
        dictionary with an "error" key when the interaction fails or when
        every participant response failed. `is_generating` is True for the
        duration of the call and always cleared afterwards.
    """
    try:
        self.is_generating = True  # Mark as generating

        participant_ids = details["participant_ids"]
        moderator_prompt = details["moderator_prompt"]

        # Add moderator prompt for interaction
        await self._add_moderator_message(moderator_prompt, "question")

        # Work on a private snapshot of the transcript; responses produced
        # below are appended so later responders see what was just said.
        transcript = list(await FocusGroup.get_messages(self.focus_group_id) or [])

        results = []
        for participant_id in participant_ids:
            other_ids = [pid for pid in participant_ids if pid != participant_id]
            p2p_context = moderator_prompt

            other_msgs = [
                m for m in transcript
                if m.get('senderId') in other_ids and m.get('type') != 'question'
            ]
            if other_msgs:
                last_msg = other_msgs[-1]
                # Attribute the quote to whoever actually wrote it, which is
                # not necessarily the first of the other participants.
                other_persona = await Persona.find_by_id(last_msg.get('senderId'))
                other_name = other_persona.get('name', 'another participant') if other_persona else 'another participant'
                quoted_text = str(last_msg.get("text") or "").strip()
                p2p_context = (
                    f"{moderator_prompt}\n\n"
                    f"[DIRECT RESPONSE: You are responding to {other_name} who just said: "
                    f'"{quoted_text}"]'
                )

            result = await self._generate_participant_response(participant_id, p2p_context)
            results.append(result)

            if not result.get("error"):
                transcript.append({
                    'senderId': participant_id,
                    'text': result.get('response', ''),
                    'type': 'response',
                })

        # If nobody managed to respond, report the failure instead of a
        # success, consistent with the single-participant action.
        if results and all(r.get("error") for r in results):
            return {
                "error": "Error in participant interaction: all participant responses failed",
                "participant_responses": results
            }

        return {
            "message": "Participant interaction executed",
            "participant_responses": results
        }

    except Exception as e:
        return {"error": f"Error in participant interaction: {str(e)}"}
    finally:
        self.is_generating = False  # Always clear generating state
|
|
|
|
async def _execute_probe_trigger(self, details: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Execute probe trigger action."""
|
|
try:
|
|
probe_question = details["probe_question"]
|
|
|
|
# Add probe question
|
|
message_id = await self._add_moderator_message(probe_question, "probe")
|
|
|
|
return {
|
|
"message": "Probe trigger executed",
|
|
"message_id": message_id,
|
|
"probe_question": probe_question
|
|
}
|
|
|
|
except Exception as e:
|
|
return {"error": f"Error in probe trigger: {str(e)}"}
|
|
|
|
async def _execute_end_session(self, details: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Execute end session action."""
|
|
try:
|
|
closing_message = details["closing_message"]
|
|
completion_reason = details["completion_reason"]
|
|
|
|
# Add closing message
|
|
await self._add_moderator_message(closing_message, "system")
|
|
|
|
# Advance position past current item so final question shows as completed
|
|
try:
|
|
advance_result = await AIModeratorService.advance_discussion(self.focus_group_id)
|
|
if advance_result.get('error'):
|
|
self.logger.info(f"Could not advance past final item when ending session: {advance_result['error']}")
|
|
else:
|
|
self.logger.info("Advanced moderator position past final item - session complete")
|
|
except Exception as e:
|
|
self.logger.info(f"Error advancing position when ending session: {str(e)}")
|
|
|
|
# Stop the conversation
|
|
await self.stop_conversation(completion_reason)
|
|
|
|
return {
|
|
"message": "Session ended",
|
|
"completion_reason": completion_reason
|
|
}
|
|
|
|
except Exception as e:
|
|
return {"error": f"Error ending session: {str(e)}"}
|
|
|
|
async def _generate_participant_response(self, participant_id: str, topic: str) -> Dict[str, Any]:
    """Generate one participant's response and save it as a message.

    Args:
        participant_id: Id of the persona that should respond.
        topic: Topic/context text handed to the response generator.

    Returns:
        Dictionary with "message", "participant_id", "response" and
        "message_id" on success, otherwise a dictionary with an "error" key
        (persona or focus group missing, generation timeout or failure,
        message not saved).
    """
    import traceback

    try:
        # Look up the persona and the focus group it belongs to
        persona = await Persona.find_by_id(participant_id)
        if not persona:
            error_msg = f"Participant {participant_id} not found"
            self.logger.error(error_msg)
            return {"error": error_msg}

        focus_group = await FocusGroup.find_by_id(self.focus_group_id)
        if not focus_group:
            error_msg = "Focus group not found"
            self.logger.error(error_msg)
            return {"error": error_msg}

        # Model selection and generation parameters configured on the group
        llm_model = focus_group.get('llm_model')
        reasoning_effort = focus_group.get('reasoning_effort', 'medium')
        verbosity = focus_group.get('verbosity', 'medium')
        self.logger.info(f"🤖 Autonomous conversation using model: {llm_model or 'default (gpt-5.4)'} for focus group {self.focus_group_id}")

        # Shared context: the 20 most recent messages.
        # Personal memory: the persona's own last 8 non-question messages.
        messages = await FocusGroup.get_messages(self.focus_group_id)
        recent_messages = messages if len(messages) <= 20 else messages[-20:]
        own_messages = [
            msg for msg in messages
            if msg.get('senderId') == participant_id and msg.get('type') != 'question'
        ]
        persona_own_history = own_messages[-8:]

        # Bound the generation time so a hung LLM call cannot stall the loop
        try:
            generation = generate_persona_response(
                persona=persona,
                current_topic=topic,
                previous_messages=recent_messages,
                temperature=0.7,
                focus_group_id=self.focus_group_id,
                llm_model=llm_model,
                reasoning_effort=reasoning_effort,
                verbosity=verbosity,
                persona_own_history=persona_own_history,
            )
            response_text = await asyncio.wait_for(generation, timeout=self.response_timeout)
        except asyncio.TimeoutError:
            error_msg = f"Participant response generation timed out after {self.response_timeout}s"
            self.logger.error(f"⏱️ {error_msg}")
            return {"error": error_msg}
        except Exception as e:
            # Log here, then let the outer handlers build the error result
            self.logger.error(f"Error in generate_persona_response: {str(e)}")
            self.logger.error(f"❌ Full traceback: {traceback.format_exc()}")
            raise

        # Save message
        message_id = await FocusGroup.add_message(self.focus_group_id, {
            "text": response_text,
            "type": "response",
            "senderId": participant_id
        })

        # Yield after the database write so WebSocket events get flushed
        await self._yield_to_eventlet()

        if not message_id:
            error_msg = "Failed to save message to database - no message ID returned"
            self.logger.error(error_msg)
            return {"error": error_msg}

        return {
            "message": "Participant response generated",
            "participant_id": participant_id,
            "response": response_text,
            "message_id": message_id
        }

    except FocusGroupResponseError as e:
        error_msg = f"Error generating participant response: {str(e)}"
        self.logger.error(f"❌ FocusGroupResponseError: {error_msg}")
        return {"error": error_msg}
    except Exception as e:
        error_msg = f"Unexpected error generating response: {str(e)}"
        self.logger.error(f"❌ Unexpected error: {error_msg}")
        self.logger.error(f"❌ Full traceback: {traceback.format_exc()}")
        return {"error": error_msg}
|
|
|
|
async def _get_item_by_position_id(self, position_id: str) -> Optional[Dict[str, Any]]:
    """Look up a discussion guide item (activity or question) by its ID.

    Loads the focus group and scans every section of its structured
    discussion guide. Within a section the search order is: section-level
    activities, section-level questions, then each subsection's activities
    followed by that subsection's questions. The first item whose ``id``
    equals ``position_id`` is returned.

    Args:
        position_id: The ``id`` value of the item to find.

    Returns:
        The matching item dict, or ``None`` when the focus group is missing,
        the guide is not a non-empty dict, nothing matches, or an error
        occurs (errors are logged as warnings and never raised).
    """
    try:
        focus_group = await FocusGroup.find_by_id(self.focus_group_id)
        if not focus_group:
            return None

        discussion_guide = focus_group.get('discussionGuide')
        if not discussion_guide or not isinstance(discussion_guide, dict):
            return None

        # ``or []`` treats a key stored as an explicit None like a missing
        # key. Without it a single None-valued list raised TypeError and
        # aborted the whole search, even when the item lived in a later
        # section (and _validate_position_id would still report it valid).
        for section in discussion_guide.get('sections') or []:
            # The section itself first, then its subsections, in order.
            containers = [section, *(section.get('subsections') or [])]
            for container in containers:
                for key in ('activities', 'questions'):
                    for item in container.get(key) or []:
                        if item.get('id') == position_id:
                            return item

        return None

    except Exception as e:
        # Best-effort lookup: malformed guide data must not break the caller.
        self.logger.warning(f"Error getting item by position ID {position_id}: {e}")
        return None
|
|
|
|
async def _find_item_at_moderator_position(self) -> Optional[Dict[str, Any]]:
    """Return the discussion guide item at the moderator's current position.

    Reads the position from ``AIModeratorService.get_moderator_status`` and
    resolves it against the focus group's structured discussion guide.

    Returns:
        The item dict, or ``None`` when there is no position, no focus
        group, no structured guide, or any index is out of range.
    """
    moderator_status = await AIModeratorService.get_moderator_status(self.focus_group_id)
    if not moderator_status or 'moderator_position' not in moderator_status:
        return None

    focus_group = await FocusGroup.find_by_id(self.focus_group_id)
    if not focus_group:
        return None

    discussion_guide = focus_group.get('discussionGuide')
    if not discussion_guide or not isinstance(discussion_guide, dict):
        return None

    sections = discussion_guide.get('sections', [])
    pos = moderator_status['moderator_position']
    section_idx = pos.get('section_index', 0)
    item_idx = pos.get('item_index', 0)
    item_type = pos.get('item_type', 'activity')
    subsection_idx = pos.get('subsection_index')

    if section_idx >= len(sections):
        return None

    # Items live either directly on the section or on one of its subsections.
    container = sections[section_idx]
    if subsection_idx is not None and container.get('subsections'):
        subsections = container['subsections']
        if subsection_idx >= len(subsections):
            return None
        container = subsections[subsection_idx]

    items = container.get('questions' if item_type == 'question' else 'activities', [])
    return items[item_idx] if item_idx < len(items) else None


async def _add_moderator_message(self, content: str, message_type: str) -> Optional[str]:
    """Save a moderator message, attaching any image tied to the current guide item.

    The current discussion guide item is inspected for an image, first via
    ``metadata['visual_asset']`` and then by parsing a filename out of the
    item's content (legacy format). When the metadata names an image, an AI
    description is requested and folded into ``content``. Image detection
    is best-effort: any failure there is logged and the message is still
    saved without visual context.

    Args:
        content: Moderator message text; may be replaced by an enhanced
            version that includes the image description.
        message_type: Value stored in the message's ``type`` field.

    Returns:
        The value returned by ``FocusGroup.add_message``, or ``None`` if
        saving the message raised.
    """
    try:
        attached_assets: List[str] = []
        activates_visual_context = False
        display_reference = None

        try:
            from app.services.focus_group_response_service import extract_asset_filename_from_content

            self.logger.debug("🔍 Checking current discussion guide item for image attachments")
            current_item = await self._find_item_at_moderator_position()

            if not current_item:
                self.logger.debug("🔍 No current discussion guide item found")
            else:
                self.logger.debug(f"🔍 Current item: {current_item.get('content', '')[:50]}...")

                # ``or {}`` tolerates metadata / visual_asset stored as None.
                metadata = current_item.get('metadata') or {}
                visual_asset = metadata.get('visual_asset') or {}
                asset_filename = visual_asset.get('filename')

                if asset_filename:
                    display_reference = visual_asset.get('display_reference')
                    self.logger.debug(
                        f"🎨 Found image in current item metadata: {display_reference} -> {asset_filename}"
                    )
                    attached_assets = [asset_filename]
                    activates_visual_context = True

                    from app.services.image_description_service import (
                        ImageDescriptionService,
                        ImageDescriptionError,
                    )

                    try:
                        self.logger.debug(f"🎨 AI MODE: Generating description for {asset_filename}")
                        description = await ImageDescriptionService.generate_description(
                            self.focus_group_id, asset_filename
                        )
                        if display_reference:
                            content = ImageDescriptionService.enhance_creative_review_question_with_display_reference(
                                content, display_reference, description
                            )
                        else:
                            # Legacy content has no display reference.
                            content = ImageDescriptionService.enhance_creative_review_question(
                                content, asset_filename, description
                            )
                        self.logger.debug("✅ AI MODE: Enhanced moderator message with image description")
                    except ImageDescriptionError as desc_error:
                        # Keep the original content; the image is still attached.
                        self.logger.warning(
                            f"Failed to generate image description in autonomous mode: {desc_error}"
                        )
                else:
                    # No usable metadata filename (a visual_asset without one
                    # previously produced attached_assets == [None]); fall
                    # back to parsing the filename out of the item content.
                    asset_filename = extract_asset_filename_from_content(current_item.get('content', ''))
                    if asset_filename:
                        self.logger.debug(f"🔍 Legacy: Found asset filename in content: {asset_filename}")
                        attached_assets = [asset_filename]
                        activates_visual_context = True
                    else:
                        self.logger.debug("🔍 No image attachments found for current item")
        except Exception as e:
            self.logger.warning(f"Error checking for creative review activity: {e}")

        self.logger.debug(
            f"🔍 FINAL RESULT: attached_assets={attached_assets}, "
            f"activates_visual_context={activates_visual_context}"
        )

        # Metadata in the shape the original comments say the frontend expects.
        visual_asset_metadata = None
        if activates_visual_context and attached_assets:
            visual_asset_metadata = {
                "filename": attached_assets[0],
                # Fall back to the filename when no display reference exists.
                "displayReference": display_reference or attached_assets[0],
            }

        message_data = {
            "text": content,
            "type": message_type,
            "senderId": "moderator",
            "attached_assets": attached_assets,
            "activates_visual_context": activates_visual_context,
            "visual_asset": visual_asset_metadata,
        }

        message_id = await FocusGroup.add_message(self.focus_group_id, message_data)

        # Yield after the database write so other tasks can run.
        await self._yield_to_eventlet()

        # NOTE(review): the original comments state that
        # FocusGroup.add_message() performs the visual context activation
        # itself when these fields are set; confirm against that method.
        if activates_visual_context and attached_assets:
            self.logger.info(f"✅ Added moderator message with visual context activation: {attached_assets}")

        return message_id

    except Exception as e:
        self.logger.error(f"Error adding moderator message: {str(e)}")
        return None
|
|
|
|
async def _update_discussion_guide_position(self, decision: Dict[str, Any]) -> None:
    """Move the moderator to the guide position named in an LLM decision.

    Reads ``discussion_guide_position_id`` and ``action`` from ``decision``.
    When a position ID is present and ``_validate_position_id`` resolves it,
    the position is applied through
    ``AIModeratorService.set_moderator_position``. In every other case
    (no ID, unknown ID, service error) the current position is left alone
    and the situation is only logged.

    Args:
        decision: Decision dict produced by the decision engine.

    Returns:
        None. Failures are logged, never raised.
    """
    retry_note = "🔄 Continuing with current position - next LLM decision will try again"

    try:
        requested_id = decision.get('discussion_guide_position_id')
        action_name = decision.get('action', 'unknown')

        # Guard 1: the decision carries no position at all.
        if not requested_id:
            self.logger.info(f"🔄 LLM didn't specify position for {action_name} action, continuing with current position")
            self.logger.debug("Next LLM decision should include position mapping for better tracking")
            return

        # Guard 2: the position does not exist in the discussion guide.
        section_id, item_id = await self._validate_position_id(requested_id)
        if not (section_id and item_id):
            self.logger.warning(f"🚫 LLM specified invalid position '{requested_id}', continuing with current position")
            self.logger.info("🔄 Next LLM decision will try again with valid position mapping")
            return

        # Valid position: hand it to the moderator service.
        try:
            outcome = await AIModeratorService.set_moderator_position(self.focus_group_id, section_id, item_id)
            target = f"position '{requested_id}'"

            if not outcome.get('error'):
                self.logger.debug(f"Successfully set moderator position to LLM-specified {target} (from {action_name} action)")
            else:
                self.logger.warning(f"Failed to set LLM-specified position to {target}: {outcome['error']}")
                self.logger.info(retry_note)
        except Exception as e:
            self.logger.warning(f"Error setting LLM-specified position: {str(e)}")
            self.logger.info(retry_note)

    except Exception as e:
        self.logger.error(f"Error updating discussion guide position: {str(e)}")
|
|
|
|
async def _validate_position_id(self, position_id: str) -> tuple[Optional[str], Optional[str]]:
    """Check that ``position_id`` names an item in the discussion guide.

    Items are visited section by section: section-level activities,
    section-level questions, then each subsection's activities and
    questions. The first item whose ``id`` equals ``position_id`` wins.

    Args:
        position_id: Item ID proposed by the LLM.

    Returns:
        ``(section_id, position_id)`` for the enclosing top-level section
        when found; ``(None, None)`` when the focus group is missing, the
        guide is a legacy string or has no ``sections``, the ID is unknown
        (the known IDs are logged), or an error occurs.
    """
    not_found = (None, None)

    def walk(guide_sections):
        """Lazily yield ``(section_id, item)`` pairs in search order."""
        for sec in guide_sections:
            sec_id = sec.get('id')
            for key in ('activities', 'questions'):
                for entry in sec.get(key) or []:
                    yield sec_id, entry
            for sub in sec.get('subsections') or []:
                for key in ('activities', 'questions'):
                    for entry in sub.get(key) or []:
                        yield sec_id, entry

    try:
        focus_group = await FocusGroup.find_by_id(self.focus_group_id)
        if not focus_group:
            return not_found

        guide = focus_group.get('discussionGuide', {})

        # Legacy guides are plain text and carry no item IDs.
        if isinstance(guide, str):
            self.logger.debug(f"Legacy discussion guide format, cannot validate position ID: {position_id}")
            return not_found

        if not (isinstance(guide, dict) and 'sections' in guide):
            self.logger.debug(f"No structured discussion guide found, cannot validate position ID: {position_id}")
            return not_found

        sections = guide['sections']

        for owning_section_id, entry in walk(sections):
            if entry.get('id') == position_id:
                return owning_section_id, position_id

        # Nothing matched: list every known ID to make the log actionable.
        available_positions = [entry.get('id', 'no-id') for _, entry in walk(sections)]
        self.logger.warning(f"Position ID '{position_id}' not found. Available positions: {available_positions}")
        return not_found

    except Exception as e:
        self.logger.error(f"Error validating position ID '{position_id}': {str(e)}")
        return not_found
|
|
|
|
async def _fallback_advance_position(self) -> None:
    """Advance the moderator one step via ``AIModeratorService.advance_discussion``.

    The result is only logged: a warning when the service reports an
    ``error`` or raises, an info line otherwise. Nothing is returned or
    re-raised.
    """
    try:
        outcome = await AIModeratorService.advance_discussion(self.focus_group_id)
        if not outcome.get('error'):
            self.logger.info(f"Sequential advancement successful: {outcome.get('message', 'Success')}")
        else:
            self.logger.warning(f"Sequential advancement failed: {outcome['error']}")
    except Exception as e:
        self.logger.warning(f"Failed to advance moderator position sequentially: {str(e)}")
|
|
|
|
async def _yield_to_eventlet(self):
    """Give other tasks on the event loop a chance to run.

    Awaits a zero-length ``asyncio.sleep``. The original author's intent
    was to let pending WebSocket frames flush after a database write. Any
    ``Exception`` raised while yielding is logged as a warning and
    suppressed.
    """
    no_delay = 0
    try:
        # A zero-second sleep suspends this coroutine for one loop pass.
        await asyncio.sleep(no_delay)
    except Exception as exc:
        self.logger.warning(f"Could not yield to event loop: {exc}")
|
|
|
|
async def _wait_between_actions(self):
    """Sleep for a random interval between two conversation actions.

    The delay is drawn uniformly from the range
    ``self.min_delay_between_actions`` to ``self.max_delay_between_actions``.
    """
    from random import uniform

    shortest = self.min_delay_between_actions
    longest = self.max_delay_between_actions
    pause_seconds = uniform(shortest, longest)
    await asyncio.sleep(pause_seconds)
|
|
|
|
async def get_conversation_status(self) -> Dict[str, Any]:
    """Report the autonomous conversation's current status.

    The focus group's stored ``status`` is treated as the source of truth:
    ``'ai_mode'`` sets ``is_running=True`` / ``conversation_state="running"``
    on this controller, any other value sets ``False`` / ``"idle"``. If the
    focus group is missing or the lookup raises, the in-memory values are
    kept. Reasoning history is then loaded through
    ``FocusGroup.get_reasoning_history``; that call is not guarded, so its
    exceptions propagate to the caller.

    Returns:
        Dict with ``focus_group_id``, ``is_running``, ``conversation_state``,
        ``is_generating``, ``action_count``, ``last_action_time`` (ISO
        string or ``None``), ``consecutive_silence_count`` and
        ``reasoning_history``.
    """
    try:
        record = await FocusGroup.find_by_id(self.focus_group_id)
        if not record:
            self.logger.warning(f"Focus group {self.focus_group_id} not found during status check")
        else:
            db_status = record.get('status', 'unknown')

            # Mirror the persisted state onto this controller instance.
            running_in_db = db_status == 'ai_mode'
            self.is_running = running_in_db
            self.conversation_state = "running" if running_in_db else "idle"

            self.logger.debug(f"Status check - DB status: {db_status}, is_running: {self.is_running}")
    except Exception as e:
        # The in-memory state stays untouched when the lookup fails.
        self.logger.error(f"Error checking database status: {str(e)}")

    # Read from storage rather than from this instance.
    reasoning_history = await FocusGroup.get_reasoning_history(self.focus_group_id, self.max_reasoning_history)

    last_action = self.last_action_time
    status_report = {
        "focus_group_id": self.focus_group_id,
        "is_running": self.is_running,
        "conversation_state": self.conversation_state,
        "is_generating": self.is_generating,
        "action_count": self.action_count,
        "last_action_time": last_action.isoformat() if last_action else None,
        "consecutive_silence_count": self.consecutive_silence_count,
        "reasoning_history": reasoning_history,
    }
    return status_report
|
|
|
|
async def _mark_all_questions_completed(self) -> None:
    """Store a moderator position that lies past the end of the guide.

    Only the last section's own ``questions`` and ``activities`` are
    counted (subsections are not inspected). The stored position is:

    * past the last question, if the last section has questions;
    * otherwise past the last activity, if it has activities;
    * otherwise item 0 of the section index after the last one.

    Nothing is written when the focus group is missing, the guide is a
    legacy string, or it has no (non-empty) ``sections``. Errors are
    logged and never raised.
    """
    try:
        focus_group = await FocusGroup.find_by_id(self.focus_group_id)
        if not focus_group:
            self.logger.error("Focus group not found when marking all questions completed")
            return

        guide = focus_group.get('discussionGuide', {})

        # Legacy guides are plain text and have no position to track.
        if isinstance(guide, str):
            self.logger.debug("Legacy discussion guide format, no position update needed")
            return

        if not guide or 'sections' not in guide:
            self.logger.debug("No structured discussion guide found, no position update needed")
            return

        sections = guide['sections']
        if not sections:
            self.logger.debug("No sections in discussion guide, no position update needed")
            return

        final_idx = len(sections) - 1
        final_section = sections[final_idx]
        activity_count = len(final_section.get('activities', []))
        question_count = len(final_section.get('questions', []))

        # An index equal to the list length is one past its last item.
        if question_count > 0:
            section_index, item_index, item_type = final_idx, question_count, 'question'
        elif activity_count > 0:
            section_index, item_index, item_type = final_idx, activity_count, 'activity'
        else:
            # Empty last section: point at the section after it instead.
            section_index, item_index, item_type = final_idx + 1, 0, 'activity'

        completion_position = {
            'section_index': section_index,
            'item_index': item_index,
            'item_type': item_type,
        }

        await FocusGroup.update(self.focus_group_id, {'moderator_position': completion_position})

        self.logger.info(f"Marked all questions completed - set position to: {completion_position}")

    except Exception as e:
        self.logger.error(f"Error marking all questions completed: {str(e)}")