""" Autonomous Conversation Controller Orchestrates the autonomous conversation flow for focus groups using LLM decision making. """ from typing import Dict, Any, Optional, List import asyncio import time from datetime import datetime, timedelta, timezone import logging from app.services.conversation_decision_service import ConversationDecisionService, ConversationDecisionError from app.services.focus_group_response_service import generate_persona_response, FocusGroupResponseError from app.services.ai_moderator_service import AIModeratorService from app.models.focus_group import FocusGroup # Now fully async from app.models.persona import Persona class AutonomousConversationController: """Controller for managing autonomous conversation flow.""" def __init__(self, focus_group_id: str, logger: Optional[logging.Logger] = None): self.focus_group_id = focus_group_id self.logger = logger or logging.getLogger(__name__) self.is_running = False self.conversation_state = "idle" # idle, running, paused, completed, error self.is_generating = False # Track when actively generating responses self.last_action_time = None self.action_count = 0 self.max_actions_per_session = 500 # Safety limit # Timing configuration self.min_delay_between_actions = 3 # seconds self.max_delay_between_actions = 10 # seconds self.response_timeout = 120 # seconds (LLM calls can be slow in production) # Edge case tracking self.consecutive_silence_count = 0 self.max_consecutive_silence = 3 self.participant_dominance_threshold = 0.4 # 40% of messages # Reasoning history tracking self.reasoning_history = [] # List of recent AI decisions with reasoning self.max_reasoning_history = 20 # Keep last 20 decisions # Initialize state from database on construction self._initialize_state_from_database() def _initialize_state_from_database(self): """Initialize the controller's state with defaults (database check happens during start).""" # Set default state - actual database check will happen when 
start_autonomous_conversation is called self.is_running = False self.conversation_state = "idle" self.logger.debug(f"Initialized controller with default state - database state will be checked on start") async def start_autonomous_conversation(self, initial_prompt: Optional[str] = None) -> Dict[str, Any]: """ Start the autonomous conversation flow. Args: initial_prompt: Optional initial prompt to start the conversation Returns: Dictionary containing the start result """ try: if self.is_running: # If conversation is already running, reset it and restart self.logger.info(f"Conversation already running for focus group {self.focus_group_id}, resetting state and restarting") await self.stop_conversation("restart") # Reset the running state to allow restart self.is_running = False self.conversation_state = "stopped" # Validate focus group exists (using async model) focus_group = await FocusGroup.find_by_id(self.focus_group_id) if not focus_group: return {"error": "Focus group not found"} # Check if focus group has participants participants = focus_group.get('participants', []) if not participants: return {"error": "Focus group has no participants"} # Update focus group status (using async model) await FocusGroup.update(self.focus_group_id, { 'status': 'ai_mode', 'autonomous_started_at': datetime.now(timezone.utc) }) self.is_running = True self.conversation_state = "running" self.action_count = 0 self.consecutive_silence_count = 0 self.logger.info(f"Starting autonomous conversation for focus group {self.focus_group_id}") # Add initial prompt if provided if initial_prompt: await self._add_moderator_message(initial_prompt, "question") # Start the conversation loop and run it to completion # (This will now run in background thread from the API route) result = await self._run_conversation_loop() return { "message": "Autonomous conversation started", "focus_group_id": self.focus_group_id, "state": self.conversation_state, "conversation_result": result } except Exception as e: 
self.logger.error(f"Error starting autonomous conversation: {str(e)}") self.conversation_state = "error" return {"error": f"Failed to start autonomous conversation: {str(e)}"} async def stop_conversation(self, reason: str = "manual_stop") -> Dict[str, Any]: """ Stop the autonomous conversation. Args: reason: Reason for stopping the conversation Returns: Dictionary containing the stop result """ try: self.is_running = False self.conversation_state = "completed" # Update focus group status (using async model) status = 'completed' if reason in ['completed', 'discussion_guide_completed', 'natural_completion'] else 'active' await FocusGroup.update(self.focus_group_id, { 'status': status, 'autonomous_ended_at': datetime.now(timezone.utc), 'completion_reason': reason }) # GPT-5 fix: Emit AI status update to notify frontend of completion # The FocusGroup.update() will trigger the websocket event automatically # Mode events are now handled by AIModeratorService.end_session_with_concluding_statement() # to prevent duplicate mode event indicators self.logger.info(f"Stopped autonomous conversation for focus group {self.focus_group_id}: {reason}") # Add completion message only for certain reasons (not manual_stop) completion_messages = { "natural_completion": "We've reached the end of our discussion guide. Thank you everyone for your valuable insights.", "discussion_guide_completed": "We've covered all topics in our discussion guide. Thank you everyone for your valuable insights and participation.", "time_limit": "We've reached our scheduled time limit. Thank you for your participation in this focus group.", "error": "The session has been ended due to a technical issue. Thank you for your participation." 
} # Only add completion message for specific reasons (skip manual_stop) if reason in completion_messages: # Use the AI moderator service to properly end the session with mode events from app.services.ai_moderator_service import AIModeratorService ending_result = await AIModeratorService.end_session_with_concluding_statement( self.focus_group_id, reason ) if "error" in ending_result: self.logger.error(f"Error ending session with concluding statement: {ending_result['error']}") # Fallback to simple message completion_message = completion_messages[reason] await self._add_moderator_message(completion_message, "system") else: self.logger.info(f"Successfully ended session with concluding statement: {ending_result.get('concluding_statement', '')[:100]}...") elif reason == "manual_stop": # For manual stops, add a mode event to indicate AI session concluded mode_event_id = await FocusGroup.add_mode_event( focus_group_id=self.focus_group_id, event_type='ai_session_concluded' ) if mode_event_id: self.logger.info(f"🎯 Added AI session concluded mode event for manual stop: {mode_event_id}") else: self.logger.warning(f"Failed to add AI session concluded mode event for manual stop") # For discussion guide completion, ensure all items are marked as completed (100% progress) if reason in ["discussion_guide_completed", "natural_completion"]: await self._mark_all_questions_completed() return { "message": "Autonomous conversation stopped", "focus_group_id": self.focus_group_id, "state": self.conversation_state, "reason": reason, "action_count": self.action_count } except Exception as e: self.logger.error(f"Error stopping conversation: {str(e)}") return {"error": f"Failed to stop conversation: {str(e)}"} async def _run_conversation_loop(self) -> Dict[str, Any]: """ Main conversation loop that makes decisions and executes actions. 
Returns: Dictionary containing the loop result """ try: while self.is_running and self.action_count < self.max_actions_per_session: # Check if we should continue if not await self._should_continue_conversation(): break # Make a decision about the next action decision = await self._make_conversation_decision() if not decision: self.consecutive_silence_count += 1 if self.consecutive_silence_count >= self.max_consecutive_silence: self.logger.warning("Too many consecutive silent decisions, ending conversation") await self.stop_conversation("excessive_silence") break continue # Execute the decision result = await self._execute_decision(decision) # Update reasoning history with execution result reasoning_id = decision.get('reasoning_id') await self._update_reasoning_execution(reasoning_id, result) if result.get("error"): self.logger.error(f"Error executing decision: {result['error']}") continue # Reset silence count on successful action self.consecutive_silence_count = 0 self.action_count += 1 self.last_action_time = datetime.now(timezone.utc) # GPT-5 fix: Yield to eventlet hub after each action to flush WebSocket frames await self._yield_to_eventlet() # Wait before next action await self._wait_between_actions() # Check for session end conditions if decision.get("action") == "end_session": await self.stop_conversation("natural_completion") break # Handle loop exit conditions if self.action_count >= self.max_actions_per_session: await self.stop_conversation("action_limit_reached") return { "message": "Conversation loop completed", "action_count": self.action_count, "state": self.conversation_state } except Exception as e: # Handle quota exhaustion — pause gracefully instead of erroring try: from app.models.quota import QuotaExceededError if isinstance(e, QuotaExceededError): self.logger.warning( f"Quota exceeded for focus group {self.focus_group_id}: {e}" ) self.is_running = False self.conversation_state = "paused_quota" await FocusGroup.update(self.focus_group_id, 
{"status": "paused_quota"}) try: from app.websocket_manager_async import websocket_manager await websocket_manager.emit_to_focus_group( self.focus_group_id, "quota_exceeded", { "scope": e.scope, "limit_usd": e.limit_usd, "used_usd": e.used_usd, "focus_group_id": self.focus_group_id, }, ) except Exception as _we: self.logger.warning(f"Could not emit quota_exceeded WS event: {_we}") return {"error": "quota_exceeded", "scope": e.scope} except ImportError: pass self.logger.error(f"Error in conversation loop: {str(e)}") self.conversation_state = "error" await self.stop_conversation("error") return {"error": f"Conversation loop error: {str(e)}"} async def _should_continue_conversation(self) -> bool: """Check if the conversation should continue based on various conditions.""" try: # Check focus group status (using async model) focus_group = await FocusGroup.find_by_id(self.focus_group_id) if not focus_group: return False current_status = focus_group.get('status', '') # Check if autonomous mode should stop - any status other than 'ai_mode' means stop if current_status != 'ai_mode': self.is_running = False self.logger.info(f"Autonomous conversation stopping due to status change: {current_status}") return False # Check if discussion guide is completed if await self._is_discussion_guide_completed(focus_group): await self.stop_conversation("discussion_guide_completed") return False # Note: Time limits are now informational only and don't stop conversations # Users can see elapsed time in the UI but it won't auto-stop the session return True except Exception as e: self.logger.error(f"Error checking conversation continuation: {str(e)}") return False async def _is_discussion_guide_completed(self, focus_group: Dict[str, Any]) -> bool: """ Check if the discussion guide has been completed. 
Args: focus_group: The focus group data Returns: True if the discussion guide is completed, False otherwise """ try: discussion_guide = focus_group.get('discussionGuide', {}) # Handle legacy markdown format - never auto-complete for legacy guides if isinstance(discussion_guide, str): return False # If no structured guide, don't auto-complete if not discussion_guide or 'sections' not in discussion_guide: return False # Get current moderator position moderator_position = focus_group.get('moderator_position', { 'section_index': 0, 'item_index': 0, 'item_type': 'activity' }) sections = discussion_guide['sections'] section_index = moderator_position.get('section_index', 0) item_index = moderator_position.get('item_index', 0) item_type = moderator_position.get('item_type', 'activity') # Check if we're past the last section if section_index >= len(sections): return True # Check if we're in the last section and past all items if section_index == len(sections) - 1: last_section = sections[section_index] # Get the total number of items in the last section activities_count = len(last_section.get('activities', [])) questions_count = len(last_section.get('questions', [])) # If we're past all activities and questions, guide is complete if item_type == 'activity' and item_index >= activities_count: # Check if there are questions to move to if questions_count > 0: return False # Still have questions to cover else: return True # No questions, guide complete elif item_type == 'question' and item_index >= questions_count: return True # Past all questions in last section return False except Exception as e: self.logger.error(f"Error checking discussion guide completion: {str(e)}") return False async def _make_conversation_decision(self) -> Optional[Dict[str, Any]]: """ Make a decision about the next conversation action using the LLM. 
Returns: Dictionary containing the decision (with reasoning_id added), or None if no decision could be made """ try: decision = await ConversationDecisionService.decide_next_action( self.focus_group_id, temperature=0.7, mode='ai' ) self.logger.info(f"LLM Decision: {decision['action']} - {decision['reasoning']}") # Store reasoning in history for UI display and get the database ID reasoning_id = await self._store_reasoning(decision) # Add the reasoning_id to the decision for later use decision['reasoning_id'] = reasoning_id return decision except ConversationDecisionError as e: self.logger.error(f"Error making conversation decision: {str(e)}") return None except Exception as e: self.logger.error(f"Unexpected error in decision making: {str(e)}") return None async def _store_reasoning(self, decision: Dict[str, Any]) -> Optional[str]: """ Store reasoning from AI decision for UI display. Args: decision: The decision dictionary from ConversationDecisionService Returns: The ID of the stored reasoning entry, or None if failed """ try: reasoning_entry = { 'timestamp': datetime.now(timezone.utc).isoformat(), 'action': decision.get('action', 'unknown'), 'reasoning': decision.get('reasoning', 'No reasoning provided'), 'details': decision.get('details', {}), 'execution_status': 'pending', 'execution_result': None } # Store to database for persistence reasoning_id = await FocusGroup.add_reasoning_entry(self.focus_group_id, reasoning_entry) # Also keep in memory for quick access during active session reasoning_entry['_id'] = reasoning_id # Add the database ID self.reasoning_history.insert(0, reasoning_entry) # Maintain max history size in memory if len(self.reasoning_history) > self.max_reasoning_history: self.reasoning_history = self.reasoning_history[:self.max_reasoning_history] return reasoning_id except Exception as e: self.logger.error(f"Error storing reasoning: {str(e)}") return None async def _update_reasoning_execution(self, reasoning_id: Optional[str], execution_result: 
Dict[str, Any]) -> None: """ Update the reasoning entry with execution results. Args: reasoning_id: The ID of the reasoning entry to update execution_result: The result from executing the decision """ try: # Update the database record if reasoning_id: await FocusGroup.update_reasoning_execution(self.focus_group_id, reasoning_id, execution_result) # Also update in memory for quick access during active session if self.reasoning_history: latest_entry = self.reasoning_history[0] latest_entry['execution_status'] = 'success' if not execution_result.get('error') else 'error' latest_entry['execution_result'] = execution_result except Exception as e: self.logger.error(f"Error updating reasoning execution: {str(e)}") async def _execute_decision(self, decision: Dict[str, Any]) -> Dict[str, Any]: """ Execute a conversation decision. Args: decision: The decision to execute Returns: Dictionary containing the execution result """ try: action = decision["action"] details = decision["details"] self.logger.info(f"Executing decision: {action}") self.logger.debug(f"Decision details: {details}") # For moderator_speak actions, update position FIRST so creative review detection works correctly if action == "moderator_speak": # Update position before creating message so current position is correct for image detection self.logger.debug(f"Updating discussion guide position BEFORE moderator_speak execution") await self._update_discussion_guide_position(decision) result = await self._execute_moderator_speak(details) elif action == "participant_respond": result = await self._execute_participant_respond(details) # Update position after other actions if not result.get("error"): self.logger.debug(f"Updating discussion guide position after successful {action}") await self._update_discussion_guide_position(decision) else: self.logger.warning(f"Skipping position update due to action error: {result.get('error')}") elif action == "participant_interaction": result = await 
self._execute_participant_interaction(details) # Update position after other actions if not result.get("error"): self.logger.debug(f"Updating discussion guide position after successful {action}") await self._update_discussion_guide_position(decision) else: self.logger.warning(f"Skipping position update due to action error: {result.get('error')}") elif action == "probe_trigger": result = await self._execute_probe_trigger(details) # Update position after other actions if not result.get("error"): self.logger.debug(f"Updating discussion guide position after successful {action}") await self._update_discussion_guide_position(decision) else: self.logger.warning(f"Skipping position update due to action error: {result.get('error')}") elif action == "end_session": result = await self._execute_end_session(details) # Update position after other actions if not result.get("error"): self.logger.debug(f"Updating discussion guide position after successful {action}") await self._update_discussion_guide_position(decision) else: self.logger.warning(f"Skipping position update due to action error: {result.get('error')}") else: error_msg = f"Unknown action type: {action}" self.logger.error(error_msg) return {"error": error_msg} self.logger.debug(f"Action {action} execution result: {result}") return result except Exception as e: error_msg = f"Decision execution error: {str(e)}" self.logger.error(f"❌ Error executing decision: {error_msg}") import traceback self.logger.error(f"❌ Full traceback: {traceback.format_exc()}") return {"error": error_msg} async def _execute_moderator_speak(self, details: Dict[str, Any]) -> Dict[str, Any]: """Execute moderator speak action.""" try: content = details["content"] message_type = details.get("message_type", "question") # Add moderator message message_id = await self._add_moderator_message(content, message_type) return { "message": "Moderator message added", "message_id": message_id, "content": content } except Exception as e: return {"error": f"Error in 
moderator speak: {str(e)}"} async def _execute_participant_respond(self, details: Dict[str, Any]) -> Dict[str, Any]: """Execute participant respond action.""" try: self.is_generating = True # Mark as generating participant_id = details["participant_id"] topic_context = details["topic_context"] # Generate participant response result = await self._generate_participant_response(participant_id, topic_context) return result except Exception as e: error_msg = f"Error in participant respond: {str(e)}" self.logger.error(f"❌ {error_msg}") import traceback self.logger.error(f"❌ Full traceback: {traceback.format_exc()}") return {"error": error_msg} finally: self.is_generating = False # Always clear generating state async def _execute_participant_interaction(self, details: Dict[str, Any]) -> Dict[str, Any]: """Execute participant interaction action.""" try: self.is_generating = True # Mark as generating participant_ids = details["participant_ids"] moderator_prompt = details["moderator_prompt"] # Add moderator prompt for interaction await self._add_moderator_message(moderator_prompt, "question") # Generate responses from both participants results = [] for participant_id in participant_ids: result = await self._generate_participant_response(participant_id, moderator_prompt) results.append(result) return { "message": "Participant interaction executed", "participant_responses": results } except Exception as e: return {"error": f"Error in participant interaction: {str(e)}"} finally: self.is_generating = False # Always clear generating state async def _execute_probe_trigger(self, details: Dict[str, Any]) -> Dict[str, Any]: """Execute probe trigger action.""" try: probe_question = details["probe_question"] # Add probe question message_id = await self._add_moderator_message(probe_question, "probe") return { "message": "Probe trigger executed", "message_id": message_id, "probe_question": probe_question } except Exception as e: return {"error": f"Error in probe trigger: {str(e)}"} async 
def _execute_end_session(self, details: Dict[str, Any]) -> Dict[str, Any]: """Execute end session action.""" try: closing_message = details["closing_message"] completion_reason = details["completion_reason"] # Add closing message await self._add_moderator_message(closing_message, "system") # Advance position past current item so final question shows as completed try: advance_result = await AIModeratorService.advance_discussion(self.focus_group_id) if advance_result.get('error'): self.logger.info(f"Could not advance past final item when ending session: {advance_result['error']}") else: self.logger.info("Advanced moderator position past final item - session complete") except Exception as e: self.logger.info(f"Error advancing position when ending session: {str(e)}") # Stop the conversation await self.stop_conversation(completion_reason) return { "message": "Session ended", "completion_reason": completion_reason } except Exception as e: return {"error": f"Error ending session: {str(e)}"} async def _generate_participant_response(self, participant_id: str, topic: str) -> Dict[str, Any]: """Generate a response from a participant.""" try: # Get participant data persona = await Persona.find_by_id(participant_id) if not persona: error_msg = f"Participant {participant_id} not found" self.logger.error(error_msg) return {"error": error_msg} # Get focus group data focus_group = await FocusGroup.find_by_id(self.focus_group_id) if not focus_group: error_msg = "Focus group not found" self.logger.error(error_msg) return {"error": error_msg} # Get discussion guide discussion_guide = focus_group.get('discussionGuide', '') # Get the LLM model and GPT-5 parameters for this focus group llm_model = focus_group.get('llm_model') reasoning_effort = focus_group.get('reasoning_effort', 'medium') verbosity = focus_group.get('verbosity', 'medium') self.logger.info(f"🤖 Autonomous conversation using model: {llm_model or 'default (gemini-3-pro-preview)'} for focus group {self.focus_group_id}") # 
Get recent messages messages = await FocusGroup.get_messages(self.focus_group_id) recent_messages = messages[-20:] if len(messages) > 20 else messages # Generate response with timeout to prevent infinite hang try: response_text = await asyncio.wait_for( generate_persona_response( persona=persona, current_topic=topic, previous_messages=recent_messages, temperature=0.7, focus_group_id=self.focus_group_id, llm_model=llm_model, reasoning_effort=reasoning_effort, verbosity=verbosity ), timeout=self.response_timeout ) except asyncio.TimeoutError: error_msg = f"Participant response generation timed out after {self.response_timeout}s" self.logger.error(f"⏱️ {error_msg}") return {"error": error_msg} except Exception as e: self.logger.error(f"Error in generate_persona_response: {str(e)}") import traceback self.logger.error(f"❌ Full traceback: {traceback.format_exc()}") raise # Save message message_data = { "text": response_text, "type": "response", "senderId": participant_id } message_id = await FocusGroup.add_message(self.focus_group_id, message_data) # GPT-5 fix: Yield after database write to flush WebSocket events await self._yield_to_eventlet() if not message_id: error_msg = "Failed to save message to database - no message ID returned" self.logger.error(error_msg) return {"error": error_msg} success_result = { "message": "Participant response generated", "participant_id": participant_id, "response": response_text, "message_id": message_id } return success_result except FocusGroupResponseError as e: error_msg = f"Error generating participant response: {str(e)}" self.logger.error(f"❌ FocusGroupResponseError: {error_msg}") return {"error": error_msg} except Exception as e: error_msg = f"Unexpected error generating response: {str(e)}" self.logger.error(f"❌ Unexpected error: {error_msg}") import traceback self.logger.error(f"❌ Full traceback: {traceback.format_exc()}") return {"error": error_msg} async def _get_item_by_position_id(self, position_id: str) -> Optional[Dict[str, 
Any]]: """Get a discussion guide item by its position ID.""" try: focus_group = await FocusGroup.find_by_id(self.focus_group_id) if not focus_group: return None discussion_guide = focus_group.get('discussionGuide') if not discussion_guide or not isinstance(discussion_guide, dict): return None sections = discussion_guide.get('sections', []) # Search through all sections and subsections for the item for section in sections: # Check activities in main section for activity in section.get('activities', []): if activity.get('id') == position_id: return activity # Check questions in main section for question in section.get('questions', []): if question.get('id') == position_id: return question # Check subsections for subsection in section.get('subsections', []): # Check activities in subsection for activity in subsection.get('activities', []): if activity.get('id') == position_id: return activity # Check questions in subsection for question in subsection.get('questions', []): if question.get('id') == position_id: return question return None except Exception as e: self.logger.warning(f"Error getting item by position ID {position_id}: {e}") return None async def _add_moderator_message(self, content: str, message_type: str) -> Optional[str]: """Add a moderator message to the conversation.""" try: # Initialize image detection variables attached_assets = [] activates_visual_context = False display_reference = None # Check if current discussion guide item has image attachments try: from app.services.ai_moderator_service import AIModeratorService from app.services.focus_group_response_service import extract_asset_filename_from_content print(f"🔍 Checking current discussion guide item for image attachments") moderator_status = await AIModeratorService.get_moderator_status(self.focus_group_id) current_item = None if moderator_status and 'moderator_position' in moderator_status: focus_group = await FocusGroup.find_by_id(self.focus_group_id) if focus_group: discussion_guide = 
focus_group.get('discussionGuide') if discussion_guide and isinstance(discussion_guide, dict): sections = discussion_guide.get('sections', []) pos = moderator_status['moderator_position'] section_idx = pos.get('section_index', 0) item_idx = pos.get('item_index', 0) item_type = pos.get('item_type', 'activity') subsection_idx = pos.get('subsection_index') if section_idx < len(sections): section = sections[section_idx] # Get current item from subsection or section if subsection_idx is not None and section.get('subsections'): subsections = section['subsections'] if subsection_idx < len(subsections): subsection = subsections[subsection_idx] items = subsection.get('questions' if item_type == 'question' else 'activities', []) if item_idx < len(items): current_item = items[item_idx] else: # Section level items = section.get('questions' if item_type == 'question' else 'activities', []) if item_idx < len(items): current_item = items[item_idx] # Check if current item has visual asset metadata if current_item: print(f"🔍 Current item: {current_item.get('content', '')[:50]}...") metadata = current_item.get('metadata', {}) visual_asset = metadata.get('visual_asset') if visual_asset: # Found image in metadata - use it asset_filename = visual_asset.get('filename') display_reference = visual_asset.get('display_reference') print(f"🎨 Found image in current item metadata: {display_reference} -> {asset_filename}") attached_assets = [asset_filename] activates_visual_context = True # Generate AI description and enhance the content try: from app.services.image_description_service import ImageDescriptionService, ImageDescriptionError print(f"🎨 AI MODE: Generating description for {asset_filename}") description = await ImageDescriptionService.generate_description(self.focus_group_id, asset_filename) # Enhance the content with the description using display reference if available if display_reference: enhanced_content = 
ImageDescriptionService.enhance_creative_review_question_with_display_reference( content, display_reference, description ) else: # Fallback to old method for legacy content enhanced_content = ImageDescriptionService.enhance_creative_review_question( content, asset_filename, description ) # Update the content to use enhanced version content = enhanced_content print(f"✅ AI MODE: Enhanced moderator message with image description") except ImageDescriptionError as desc_error: print(f"⚠️ AI MODE: Failed to generate image description: {desc_error}") self.logger.warning(f"Failed to generate image description in autonomous mode: {desc_error}") # Continue with original content else: # No visual asset metadata, try legacy content parsing activity_content = current_item.get('content', '') asset_filename = extract_asset_filename_from_content(activity_content) if asset_filename: print(f"🔍 Legacy: Found asset filename in content: {asset_filename}") attached_assets = [asset_filename] activates_visual_context = True else: print(f"🔍 No image attachments found for current item") else: print(f"🔍 No current discussion guide item found") except Exception as e: self.logger.warning(f"Error checking for creative review activity: {e}") print(f"⚠️ Error checking creative review: {e}") print(f"🔍 FINAL RESULT: attached_assets={attached_assets}, activates_visual_context={activates_visual_context}") # Create visual asset metadata for frontend display visual_asset_metadata = None if activates_visual_context and attached_assets and len(attached_assets) > 0: # Create visual asset metadata that frontend expects visual_asset_metadata = { "filename": attached_assets[0], # Use first asset "displayReference": display_reference or attached_assets[0] # Use display reference or filename as fallback } # Create message data with visual context information message_data = { "text": content, "type": message_type, "senderId": "moderator", "attached_assets": attached_assets, "activates_visual_context": 
async def _update_discussion_guide_position(self, decision: Dict[str, Any]) -> None:
    """Move the moderator to the guide position named in an LLM decision.

    Reads ``discussion_guide_position_id`` from ``decision``. When it is
    present and resolves through ``_validate_position_id``, the position is
    applied with ``AIModeratorService.set_moderator_position``. Every
    failure path is logged and swallowed, leaving the current position
    untouched.

    Args:
        decision: LLM decision dict. Only ``discussion_guide_position_id``
            and ``action`` (used for log messages) are read here.
    """
    try:
        position_id = decision.get('discussion_guide_position_id')
        action = decision.get('action', 'unknown')

        # No position supplied: nothing to update.
        if not position_id:
            self.logger.info(f"🔄 LLM didn't specify position for {action} action, continuing with current position")
            self.logger.debug("Next LLM decision should include position mapping for better tracking")
            return

        # The ID must map to a real item of the discussion guide.
        section_id, item_id = await self._validate_position_id(position_id)
        if not (section_id and item_id):
            self.logger.warning(f"🚫 LLM specified invalid position '{position_id}', continuing with current position")
            self.logger.info("🔄 Next LLM decision will try again with valid position mapping")
            return

        # Separate try so a service failure is reported as a warning rather
        # than through the generic error handler below.
        try:
            result = await AIModeratorService.set_moderator_position(self.focus_group_id, section_id, item_id)
            position_desc = f"position '{position_id}'"
            if result.get('error'):
                self.logger.warning(f"Failed to set LLM-specified position to {position_desc}: {result['error']}")
                self.logger.info("🔄 Continuing with current position - next LLM decision will try again")
            else:
                self.logger.debug(f"Successfully set moderator position to LLM-specified {position_desc} (from {action} action)")
        except Exception as e:
            self.logger.warning(f"Error setting LLM-specified position: {str(e)}")
            self.logger.info("🔄 Continuing with current position - next LLM decision will try again")
    except Exception as e:
        self.logger.error(f"Error updating discussion guide position: {str(e)}")


async def _validate_position_id(self, position_id: str) -> tuple[Optional[str], Optional[str]]:
    """Resolve a position ID against the focus group's discussion guide.

    Args:
        position_id: ID of an activity or question in the guide.

    Returns:
        ``(section_id, position_id)`` when an item with that ID exists,
        where ``section_id`` is the ``id`` of the top-level section that
        holds the item (also for items nested in a subsection). Returns
        ``(None, None)`` when the focus group is missing, the guide is a
        legacy string or has no ``sections``, the ID is unknown, or any
        exception occurs.
    """
    def iter_items(section):
        """Yield a section's items in lookup order.

        Order: top-level activities, top-level questions, then each
        subsection's activities followed by its questions.
        """
        yield from section.get('activities') or []
        yield from section.get('questions') or []
        for subsection in section.get('subsections') or []:
            yield from subsection.get('activities') or []
            yield from subsection.get('questions') or []

    try:
        focus_group = await FocusGroup.find_by_id(self.focus_group_id)
        if not focus_group:
            return None, None

        discussion_guide = focus_group.get('discussionGuide', {})

        # Legacy guides are stored as a plain string and carry no item IDs.
        if isinstance(discussion_guide, str):
            self.logger.debug(f"Legacy discussion guide format, cannot validate position ID: {position_id}")
            return None, None

        if not (isinstance(discussion_guide, dict) and 'sections' in discussion_guide):
            self.logger.debug(f"No structured discussion guide found, cannot validate position ID: {position_id}")
            return None, None

        # Single pass: return on the first match, otherwise remember every
        # ID seen so the warning below can list the valid choices.
        available_positions = []
        for section in discussion_guide['sections']:
            section_id = section.get('id')
            for item in iter_items(section):
                if item.get('id') == position_id:
                    return section_id, position_id
                available_positions.append(item.get('id', 'no-id'))

        self.logger.warning(f"Position ID '{position_id}' not found. Available positions: {available_positions}")
        return None, None
    except Exception as e:
        self.logger.error(f"Error validating position ID '{position_id}': {str(e)}")
        return None, None


async def _fallback_advance_position(self) -> None:
    """Advance the moderator position sequentially.

    Delegates to ``AIModeratorService.advance_discussion`` and only logs
    the outcome; neither an error result nor an exception is propagated.
    """
    try:
        advance_result = await AIModeratorService.advance_discussion(self.focus_group_id)
        if advance_result.get('error'):
            self.logger.warning(f"Sequential advancement failed: {advance_result['error']}")
        else:
            self.logger.info(f"Sequential advancement successful: {advance_result.get('message', 'Success')}")
    except Exception as e:
        self.logger.warning(f"Failed to advance moderator position sequentially: {str(e)}")


async def _yield_to_eventlet(self):
    """Yield control so other tasks can run.

    Despite the name, this awaits ``asyncio.sleep(0)`` rather than an
    eventlet/socketio sleep. A failure to yield is logged as a warning
    and otherwise ignored.
    """
    try:
        # A zero-second sleep suspends this coroutine for one loop iteration.
        await asyncio.sleep(0)
    except Exception as e:
        self.logger.warning(f"Could not yield to event loop: {e}")


async def _wait_between_actions(self):
    """Sleep for a random delay between two conversation actions.

    The delay is drawn uniformly from ``min_delay_between_actions`` to
    ``max_delay_between_actions`` (seconds).
    """
    # Local import: only this method needs ``random``.
    import random

    delay = random.uniform(self.min_delay_between_actions, self.max_delay_between_actions)
    await asyncio.sleep(delay)


async def get_conversation_status(self) -> Dict[str, Any]:
    """Report the current status of the autonomous conversation.

    The focus group's stored ``status`` is treated as the source of truth:
    ``'ai_mode'`` marks the controller as running, any other value as
    idle, and the instance flags are updated to match. Both database
    reads are best-effort; if either fails, the values already held on
    the instance are reported instead of raising.

    Returns:
        Dict with ``focus_group_id``, ``is_running``,
        ``conversation_state``, ``is_generating``, ``action_count``,
        ``last_action_time`` (ISO string or ``None``),
        ``consecutive_silence_count`` and ``reasoning_history``.
    """
    try:
        focus_group = await FocusGroup.find_by_id(self.focus_group_id)
        if focus_group:
            db_status = focus_group.get('status', 'unknown')

            # NOTE(review): any status other than 'ai_mode' resets the state
            # to "idle", which also overwrites a local paused/completed/error
            # value - confirm this is intended.
            if db_status == 'ai_mode':
                self.is_running = True
                self.conversation_state = "running"
            else:
                self.is_running = False
                self.conversation_state = "idle"

            self.logger.debug(f"Status check - DB status: {db_status}, is_running: {self.is_running}")
        else:
            self.logger.warning(f"Focus group {self.focus_group_id} not found during status check")
    except Exception as e:
        # Keep the existing instance state if the database check fails.
        self.logger.error(f"Error checking database status: {str(e)}")

    # Prefer the persisted history so it survives across controller
    # instances. This read used to be unguarded, so a database outage made
    # the whole status call raise even though the check above tolerates it.
    try:
        reasoning_history = await FocusGroup.get_reasoning_history(self.focus_group_id, self.max_reasoning_history)
    except Exception as e:
        self.logger.error(f"Error loading reasoning history: {str(e)}")
        reasoning_history = list(self.reasoning_history)

    return {
        "focus_group_id": self.focus_group_id,
        "is_running": self.is_running,
        "conversation_state": self.conversation_state,
        "is_generating": self.is_generating,
        "action_count": self.action_count,
        "last_action_time": self.last_action_time.isoformat() if self.last_action_time else None,
        "consecutive_silence_count": self.consecutive_silence_count,
        "reasoning_history": reasoning_history
    }


async def _mark_all_questions_completed(self) -> None:
    """Set the moderator position past the end of the discussion guide.

    The stored ``moderator_position`` is placed one past the last question
    of the last section (or one past its last activity when it has no
    questions, or at the start of the following section index when it has
    neither). Legacy string guides and guides without sections are left
    untouched. Errors are logged, never raised.

    NOTE(review): only top-level ``questions``/``activities`` of the last
    section are counted; items inside its subsections are not - confirm
    this matches how progress is computed.
    """
    try:
        focus_group = await FocusGroup.find_by_id(self.focus_group_id)
        if not focus_group:
            self.logger.error("Focus group not found when marking all questions completed")
            return

        discussion_guide = focus_group.get('discussionGuide', {})

        # Legacy format has no position tracking.
        if isinstance(discussion_guide, str):
            self.logger.debug("Legacy discussion guide format, no position update needed")
            return

        if not discussion_guide or 'sections' not in discussion_guide:
            self.logger.debug("No structured discussion guide found, no position update needed")
            return

        sections = discussion_guide['sections']
        if not sections:
            self.logger.debug("No sections in discussion guide, no position update needed")
            return

        last_section_index = len(sections) - 1
        last_section = sections[last_section_index]

        # ``or []`` treats an explicit None value like a missing key instead
        # of letting len() raise and abort the completion marking.
        total_activities = len(last_section.get('activities') or [])
        total_questions = len(last_section.get('questions') or [])

        if total_questions > 0:
            # One past the last question.
            completion_position = {
                'section_index': last_section_index,
                'item_index': total_questions,
                'item_type': 'question'
            }
        elif total_activities > 0:
            # Only activities: one past the last activity.
            completion_position = {
                'section_index': last_section_index,
                'item_index': total_activities,
                'item_type': 'activity'
            }
        else:
            # Empty last section: point past the section itself.
            completion_position = {
                'section_index': last_section_index + 1,
                'item_index': 0,
                'item_type': 'activity'
            }

        await FocusGroup.update(self.focus_group_id, {
            'moderator_position': completion_position
        })

        self.logger.info(f"Marked all questions completed - set position to: {completion_position}")
    except Exception as e:
        self.logger.error(f"Error marking all questions completed: {str(e)}")