semblance-dev/backend/app/routes/focus_group_ai.py
Vadym Samoilenko bc4138f332 Final pieces: decorators on LLM routes, usage self-service, billing page, WS events
Backend:
- @active_required + @with_user_context applied to all LLM-invoking routes
  in personas.py, focus_group_ai.py, ai_personas.py
- backend/app/routes/usage.py: GET /api/usage/me (MTD summary by feature),
  GET /api/usage/focus-groups/<id> (owner or admin)
- Registered usage_bp in app/__init__.py
- llm_service._record_usage now emits usage_update WS event to focus group room

Frontend:
- useMyUsage + useFocusGroupUsage hooks
- MyUsage.tsx: personal billing dashboard (cost cards + per-feature table)
- /billing route (ProtectedRoute) + Billing nav link
- FocusGroupSession: quota_warning amber banner with Progress bar,
  quota_exceeded + quota_warning WS events wired via websocketServiceNew

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-24 18:43:13 +01:00

1312 lines
No EOL
54 KiB
Python
Executable file

"""
Focus Group AI Routes
These endpoints handle AI-assisted focus group functionality, including persona responses
and key theme generation.
"""
from quart import Blueprint, request, jsonify, current_app
from app.auth.quart_jwt import jwt_required, get_jwt_identity
from typing import Dict, List, Any
import time
import concurrent.futures
import asyncio
from app.services.focus_group_response_service import (
generate_persona_response,
generate_creative_review_response,
extract_asset_filename_from_content,
FocusGroupResponseError
)
from app.services.key_theme_service import (
KeyThemeService,
KeyThemeServiceError
)
from app.services.task_manager import CancellableTask
from app.services.ai_moderator_service import AIModeratorService
from app.services.autonomous_conversation_controller import AutonomousConversationController
from app.services.conversation_decision_service import ConversationDecisionService, ConversationDecisionError
from app.services.conversation_state_manager import ConversationStateManager
from app.services.ai_runner_service import get_ai_runner
from app.services.image_description_service import ImageDescriptionService, ImageDescriptionError
from app.models.focus_group import FocusGroup
from app.models.persona import Persona
from app.utils.rate_limiter import rate_limit
from app.utils import active_required, with_user_context
# Create the blueprint
focus_group_ai_bp = Blueprint('focus_group_ai', __name__)
def _user_key():
return f"{request.endpoint}:{get_jwt_identity()}"
@focus_group_ai_bp.route('/generate-response', methods=['POST'])
@jwt_required()
@active_required
@with_user_context
@rate_limit(max_requests=10, window_seconds=60, key_func=_user_key)
async def generate_ai_response():
"""
Generate a response from a persona in a focus group discussion.
Request body:
{
"focus_group_id": "focus_group_id",
"persona_id": "persona_id",
"current_topic": "What do you think about this product?",
"temperature": 0.7, # Optional
"message_limit": 10 # Optional, number of previous messages to include
}
Returns immediately with 202 + task_id; result delivered via WebSocket task_completed event.
"""
try:
data = (await request.get_json()) or {}
# Validate required fields
required_fields = ['focus_group_id', 'persona_id', 'current_topic']
missing_fields = [field for field in required_fields if field not in data]
if missing_fields:
return jsonify({
"error": "Missing required fields",
"missing": missing_fields
}), 400
focus_group_id = data['focus_group_id']
persona_id = data['persona_id']
current_topic = data['current_topic']
temperature = data.get('temperature', 0.7)
user_id = get_jwt_identity()
# Validate focus group exists
focus_group = await FocusGroup.find_by_id(focus_group_id)
if not focus_group:
return jsonify({"error": "Focus group not found"}), 404
# Validate persona exists
persona = await Persona.find_by_id(persona_id)
if not persona:
return jsonify({"error": "Persona not found"}), 404
# Validate persona is part of the focus group
if 'participants' not in focus_group or persona_id not in focus_group['participants']:
return jsonify({
"error": "Persona is not a participant in this focus group"
}), 400
from app.services.task_manager import get_task_manager
from app.websocket_manager_async import get_async_websocket_manager
task_manager = get_task_manager()
task_id = task_manager.generate_task_id()
websocket_manager = get_async_websocket_manager()
app = current_app._get_current_object()
bg_task = asyncio.create_task(
_run_generate_response_bg(app, task_id, user_id, focus_group_id, persona_id, current_topic, temperature)
)
await task_manager.register_task(
bg_task, 'generate_response', user_id,
{'focus_group_id': focus_group_id, 'persona_id': persona_id},
task_id=task_id
)
await websocket_manager.emit_to_user(user_id, 'task_started', {
'task_id': task_id,
'task_type': 'generate_response',
'message': f'Started generating response for persona {persona_id}'
})
return jsonify({'task_id': task_id, 'message': 'Response generation started'}), 202
except Exception as e:
current_app.logger.error(f"Unexpected error starting generate_response: {str(e)}")
return jsonify({"error": "Internal server error", "message": str(e)}), 500
async def _run_generate_response_bg(app, task_id, user_id, focus_group_id, persona_id, current_topic, temperature):
from app.websocket_manager_async import get_async_websocket_manager
websocket_manager = get_async_websocket_manager()
async with app.app_context():
try:
# Get focus group and persona
focus_group = await FocusGroup.find_by_id(focus_group_id)
persona = await Persona.find_by_id(persona_id)
llm_model = focus_group.get('llm_model')
reasoning_effort = focus_group.get('reasoning_effort', 'low')
verbosity = focus_group.get('verbosity', 'medium')
# Get previous messages
messages = await FocusGroup.get_messages(focus_group_id)
recent_messages = messages
# Check for active visual context
from app.services.conversation_context_service import ConversationContextService
has_visual_context = await ConversationContextService.has_visual_context(focus_group_id)
# Build multimodal conversation context
try:
multimodal_context = await ConversationContextService.build_multimodal_context(focus_group_id, recent_messages)
except Exception as e:
app.logger.warning(f"Error building multimodal context: {e}")
multimodal_context = {
"has_visual_context": False,
"conversation_context": [],
"text_context": "",
"visual_timeline": {},
"total_messages": len(recent_messages),
"total_visual_assets": 0
}
# Generate response
if has_visual_context:
from app.services.llm_service import LLMService
from app.utils.prompt_loader import load_prompt
persona_details = _format_persona_details_for_context(persona)
prompt = load_prompt('focus-group-response', {
'persona_details': persona_details,
'current_topic': current_topic,
'previous_messages': multimodal_context['text_context'],
'length_instructions': _get_response_length_instructions(persona, recent_messages, current_topic),
'is_creative_review': True,
'creative_instructions': """
VISUAL CONTEXT AVAILABLE:
You are participating in a focus group discussion where visual materials have been shown. The images in your conversation context are part of the ongoing discussion. Please provide your authentic reaction and feedback based on your personality, background, and preferences, taking into account both the conversation history and any visual materials you can see.
Consider:
- Your first impression of any visuals shown
- How the visual materials relate to the discussion topic
- Any specific elements that catch your attention
- How the visuals might appeal to people like you
- Any suggestions or concerns you might have
- The ongoing conversation context
Be genuine and specific in your feedback, drawing on your personal experiences and preferences.
"""
})
response_text = await LLMService.generate_contextual_response(
prompt=prompt,
conversation_context=multimodal_context['conversation_context'],
temperature=temperature,
model_name=llm_model,
reasoning_effort=reasoning_effort if llm_model in ('gpt-5', 'gpt-5.4-2026-03-05') else None,
verbosity=verbosity if llm_model in ('gpt-5', 'gpt-5.4-2026-03-05') else None
)
else:
response_text = await generate_persona_response(
persona=persona,
current_topic=current_topic,
previous_messages=recent_messages,
temperature=temperature,
focus_group_id=focus_group_id,
llm_model=llm_model,
reasoning_effort=reasoning_effort,
verbosity=verbosity
)
# Save message to DB
message_data = {
"text": response_text,
"type": "response",
"senderId": persona_id
}
message_id = FocusGroup.add_message(focus_group_id, message_data)
if message_id:
from datetime import datetime
from app.services.task_manager import store_task_result
await store_task_result(task_id, 'completed', result={
'response': response_text,
'message_id': message_id,
'persona_id': persona_id,
'focus_group_id': focus_group_id,
'timestamp': datetime.now().isoformat()
})
await websocket_manager.emit_to_user(user_id, 'task_completed', {
'task_id': task_id,
'task_type': 'generate_response'
})
else:
from app.services.task_manager import store_task_result
await store_task_result(task_id, 'failed', error='Failed to save message to database')
await websocket_manager.emit_to_user(user_id, 'task_failed', {
'task_id': task_id,
'task_type': 'generate_response',
'message': 'Failed to save message to database'
})
except asyncio.CancelledError:
from app.services.task_manager import store_task_result
await store_task_result(task_id, 'cancelled')
await websocket_manager.emit_to_user(user_id, 'task_cancelled', {
'task_id': task_id,
'task_type': 'generate_response'
})
except Exception as e:
app.logger.error(f"Error in _run_generate_response_bg: {str(e)}")
from app.services.task_manager import store_task_result
await store_task_result(task_id, 'failed', error=str(e))
await websocket_manager.emit_to_user(user_id, 'task_failed', {
'task_id': task_id,
'task_type': 'generate_response',
'message': str(e)
})
@focus_group_ai_bp.route('/generate-key-themes', methods=['POST'])
@jwt_required()
@active_required
@with_user_context
@rate_limit(max_requests=10, window_seconds=60, key_func=_user_key)
async def generate_key_themes():
"""
Generate key themes from a focus group discussion.
Request body:
{
"focus_group_id": "focus_group_id",
"temperature": 0.7 # Optional
}
Returns immediately with 202 + task_id; result delivered via WebSocket task_completed event.
"""
try:
data = (await request.get_json()) or {}
# Validate required fields
if 'focus_group_id' not in data:
return jsonify({
"error": "Missing required field: focus_group_id"
}), 400
focus_group_id = data['focus_group_id']
temperature = data.get('temperature', 0.7)
user_id = get_jwt_identity()
# Validate focus group exists
focus_group = await FocusGroup.find_by_id(focus_group_id)
if not focus_group:
return jsonify({"error": "Focus group not found"}), 404
from app.services.task_manager import get_task_manager
from app.websocket_manager_async import get_async_websocket_manager
task_manager = get_task_manager()
task_id = task_manager.generate_task_id()
websocket_manager = get_async_websocket_manager()
app = current_app._get_current_object()
bg_task = asyncio.create_task(
_run_key_themes_bg(app, task_id, user_id, focus_group_id, temperature)
)
await task_manager.register_task(
bg_task, 'key_themes_generation', user_id,
{'focus_group_id': focus_group_id},
task_id=task_id
)
await websocket_manager.emit_to_user(user_id, 'task_started', {
'task_id': task_id,
'task_type': 'key_themes_generation',
'message': f'Started generating key themes for focus group {focus_group_id}'
})
return jsonify({'task_id': task_id, 'message': 'Key themes generation started'}), 202
except Exception as e:
current_app.logger.error(f"Unexpected error starting generate_key_themes: {str(e)}")
return jsonify({"error": "Internal server error", "message": str(e)}), 500
async def _run_key_themes_bg(app, task_id, user_id, focus_group_id, temperature):
from app.websocket_manager_async import get_async_websocket_manager
websocket_manager = get_async_websocket_manager()
async with app.app_context():
try:
focus_group = await FocusGroup.find_by_id(focus_group_id)
llm_model = focus_group.get('llm_model')
themes = await KeyThemeService.generate_key_themes(
focus_group_id=focus_group_id,
temperature=temperature,
llm_model=llm_model
)
app.logger.info(f"Generated {len(themes)} key themes for focus group {focus_group_id}")
theme_ids = await FocusGroup.add_generated_themes(focus_group_id, themes)
if not theme_ids:
from app.services.task_manager import store_task_result
await store_task_result(task_id, 'failed', error='The themes were generated but could not be saved to the database')
await websocket_manager.emit_to_user(user_id, 'task_failed', {
'task_id': task_id,
'task_type': 'key_themes_generation',
'message': 'The themes were generated but could not be saved to the database'
})
return
formatted_themes = []
for i, theme in enumerate(themes):
if i < len(theme_ids):
formatted_themes.append({
"id": theme_ids[i],
"title": theme["title"],
"description": theme["description"],
"quotes": theme.get("quotes", []),
"source": "generated"
})
from app.services.task_manager import store_task_result
await store_task_result(task_id, 'completed', result={
'themes': formatted_themes,
'focus_group_id': focus_group_id
})
await websocket_manager.emit_to_user(user_id, 'task_completed', {
'task_id': task_id,
'task_type': 'key_themes_generation'
})
except asyncio.CancelledError:
from app.services.task_manager import store_task_result
await store_task_result(task_id, 'cancelled')
await websocket_manager.emit_to_user(user_id, 'task_cancelled', {
'task_id': task_id,
'task_type': 'key_themes_generation'
})
except Exception as e:
app.logger.error(f"Error in _run_key_themes_bg: {str(e)}")
from app.services.task_manager import store_task_result
await store_task_result(task_id, 'failed', error=str(e))
await websocket_manager.emit_to_user(user_id, 'task_failed', {
'task_id': task_id,
'task_type': 'key_themes_generation',
'message': str(e)
})
@focus_group_ai_bp.route('/key-themes/<focus_group_id>', methods=['GET'])
@jwt_required()
@active_required
async def get_key_themes(focus_group_id):
"""
Get all generated key themes for a focus group.
Returns:
A JSON object containing the generated key themes
"""
try:
# Validate focus group exists
focus_group = await FocusGroup.find_by_id(focus_group_id)
if not focus_group:
return jsonify({"error": "Focus group not found"}), 404
# Get themes
themes = await FocusGroup.get_generated_themes(focus_group_id)
# Format themes for response
formatted_themes = []
for theme in themes:
formatted_themes.append({
"id": theme.get("id", ""),
"title": theme.get("title", ""),
"description": theme.get("description", ""),
"quotes": theme.get("quotes", []),
"source": theme.get("source", "generated")
})
return jsonify({
"message": "Key themes retrieved successfully",
"themes": formatted_themes,
"focus_group_id": focus_group_id
}), 200
except Exception as e:
current_app.logger.error(f"Error getting key themes: {str(e)}")
return jsonify({
"error": "Failed to get key themes",
"message": str(e)
}), 500
@focus_group_ai_bp.route('/key-themes/<focus_group_id>/<theme_id>', methods=['DELETE'])
@jwt_required()
@active_required
async def delete_key_theme(focus_group_id, theme_id):
"""
Delete a key theme from a focus group.
Returns:
A JSON object confirming the deletion
"""
try:
# Validate focus group exists
focus_group = await FocusGroup.find_by_id(focus_group_id)
if not focus_group:
return jsonify({"error": "Focus group not found"}), 404
# Delete theme
success = await FocusGroup.delete_generated_theme(focus_group_id, theme_id)
if not success:
return jsonify({
"error": "Failed to delete theme",
"message": "Theme not found or could not be deleted"
}), 404
return jsonify({
"message": "Theme deleted successfully",
"theme_id": theme_id,
"focus_group_id": focus_group_id
}), 200
except Exception as e:
current_app.logger.error(f"Error deleting key theme: {str(e)}")
return jsonify({
"error": "Failed to delete key theme",
"message": str(e)
}), 500
@focus_group_ai_bp.route('/moderator/status/<focus_group_id>', methods=['GET'])
@jwt_required()
@active_required
async def get_moderator_status(focus_group_id):
"""
Get the current moderator status for a focus group.
Returns:
A JSON object containing the current moderator status
"""
try:
status = await AIModeratorService.get_moderator_status(focus_group_id)
if "error" in status:
return jsonify(status), 404 if "not found" in status["error"] else 500
return jsonify({
"message": "Moderator status retrieved successfully",
"status": status,
"focus_group_id": focus_group_id
}), 200
except Exception as e:
current_app.logger.error(f"Error getting moderator status: {str(e)}")
return jsonify({
"error": "Failed to get moderator status",
"message": str(e)
}), 500
@focus_group_ai_bp.route('/moderator/advance/<focus_group_id>', methods=['POST'])
@jwt_required()
@active_required
@with_user_context
@rate_limit(max_requests=10, window_seconds=60, key_func=_user_key)
async def advance_moderator_discussion(focus_group_id):
"""
Advance the moderator to the next item in the discussion guide.
For manual mode, also use AI to decide which participant should respond next.
Request body (optional):
{
"generate_participant_response": true, # Default: true for manual mode, false for autonomous mode
"temperature": 0.7
}
Returns:
A JSON object containing the moderator response, updated position, and optionally participant response
"""
try:
data = (await request.get_json()) or {}
temperature = data.get('temperature', 0.7)
# Check if focus group is in autonomous mode
focus_group = await FocusGroup.find_by_id(focus_group_id)
if not focus_group:
return jsonify({"error": "Focus group not found"}), 404
# Get the LLM model for this focus group
llm_model = focus_group.get('llm_model')
is_autonomous_mode = focus_group.get('status', '').startswith('autonomous_')
# Default: generate participant response for manual mode, not for autonomous mode
generate_participant_response = data.get('generate_participant_response', not is_autonomous_mode)
result = await AIModeratorService.advance_discussion(focus_group_id)
if "error" in result:
return jsonify(result), 404 if "not found" in result["error"] else 500
# If we have a moderator response, save it as a message
if result.get("moderator_response") and not result.get("completed", False):
# Check if this is a creative review activity that needs visual asset activation
attached_assets = []
activates_visual_context = False
current_item = result.get("current_item")
if current_item:
# Try to get asset info from metadata (new metadata-driven approach)
asset_filename = None
display_reference = None
metadata = current_item.get('metadata', {})
visual_asset = metadata.get('visual_asset')
if visual_asset:
# Use metadata (preferred method)
asset_filename = visual_asset.get('filename')
display_reference = visual_asset.get('display_reference')
print(f"🎨 Found asset metadata: {display_reference} -> {asset_filename}")
else:
# Fallback to content parsing (legacy support)
activity_content = current_item.get("content", "")
asset_filename = extract_asset_filename_from_content(activity_content)
print(f"🎨 Legacy asset filename extraction: {asset_filename}")
if asset_filename:
print(f"🎨 ADVANCE DISCUSSION: Item with image detected (type: {current_item.get('type')})")
print(f"🎨 Asset: {display_reference or 'legacy'} -> {asset_filename}")
if asset_filename:
attached_assets = [asset_filename]
# Generate AI description and enhance the moderator response
try:
print(f"🎨 AI MODE: Generating description for {asset_filename}")
description = await ImageDescriptionService.generate_description(focus_group_id, asset_filename)
# Enhance the moderator response with the description using display reference if available
if display_reference:
enhanced_response = ImageDescriptionService.enhance_creative_review_question_with_display_reference(
result["moderator_response"], display_reference, description
)
else:
# Fallback to old method for legacy content
enhanced_response = ImageDescriptionService.enhance_creative_review_question(
result["moderator_response"], asset_filename, description
)
# Update the result with enhanced response
result["moderator_response"] = enhanced_response
result["ai_description_generated"] = True
result["ai_description"] = description
print(f"✅ AI MODE: Enhanced moderator response with image description")
except ImageDescriptionError as desc_error:
print(f"⚠️ AI MODE: Failed to generate image description: {desc_error}")
result["ai_description_error"] = str(desc_error)
result["ai_description_generated"] = False
# Continue with original response
activates_visual_context = True
print(f"🎨 ADVANCE DISCUSSION: Will activate visual context for asset: {asset_filename}")
# Create visual asset metadata for frontend display
visual_asset_metadata = None
if activates_visual_context and attached_assets and len(attached_assets) > 0:
# Create visual asset metadata that frontend expects
visual_asset_metadata = {
"filename": attached_assets[0], # Use first asset
"displayReference": display_reference or attached_assets[0] # Use display reference or filename as fallback
}
message_data = {
"text": result["moderator_response"],
"type": "question",
"senderId": "moderator",
"attached_assets": attached_assets,
"activates_visual_context": activates_visual_context,
"visual_asset": visual_asset_metadata # Frontend needs this for image display
}
message_id = FocusGroup.add_message(focus_group_id, message_data)
# Visual context activation is handled automatically by FocusGroup.add_message()
# when activates_visual_context=True and attached_assets are present
if activates_visual_context and attached_assets:
print(f"✅ ADVANCE DISCUSSION: Visual context activated for {attached_assets}")
if message_id:
result["message_id"] = message_id
else:
current_app.logger.warning("Failed to save moderator message to database")
# For manual mode, also generate AI participant response
if generate_participant_response and not result.get("completed", False):
try:
# Use AI to decide which participant should respond next
decision = await ConversationDecisionService.decide_next_action(focus_group_id, temperature, 'ai')
if decision.get('action') == 'participant_respond':
participant_id = decision['details']['participant_id']
topic_context = decision['details']['topic_context']
# Get participant data
persona = await Persona.find_by_id(participant_id)
if persona:
# Get recent messages for context
messages = await FocusGroup.get_messages(focus_group_id)
recent_messages = messages[-20:] if len(messages) > 20 else messages
# Generate participant response
response_text = await generate_persona_response(
persona=persona,
current_topic=topic_context,
previous_messages=recent_messages,
temperature=temperature,
focus_group_id=focus_group_id,
llm_model=llm_model
)
# Save participant message
participant_message_data = {
"text": response_text,
"type": "response",
"senderId": participant_id
}
participant_message_id = FocusGroup.add_message(focus_group_id, participant_message_data)
# Add participant response info to result
result["participant_response"] = {
"participant_id": participant_id,
"response": response_text,
"message_id": participant_message_id,
"ai_decision": decision
}
else:
current_app.logger.warning(f"Participant {participant_id} not found for response generation")
else:
# AI decided on a different action
result["ai_decision"] = decision
current_app.logger.info(f"AI suggested action '{decision.get('action')}' instead of participant response")
except (ConversationDecisionError, FocusGroupResponseError) as e:
current_app.logger.warning(f"Could not generate participant response: {str(e)}")
result["participant_response_error"] = str(e)
except Exception as e:
current_app.logger.error(f"Unexpected error generating participant response: {str(e)}")
result["participant_response_error"] = f"Unexpected error: {str(e)}"
return jsonify(result), 200
except Exception as e:
current_app.logger.error(f"Error advancing moderator discussion: {str(e)}")
return jsonify({
"error": "Failed to advance discussion",
"message": str(e)
}), 500
@focus_group_ai_bp.route('/moderator/position/<focus_group_id>', methods=['PUT'])
@jwt_required()
@active_required
async def set_moderator_position(focus_group_id):
"""
Set the moderator position to a specific section and item.
Request body:
{
"section_id": "section_id",
"item_id": "item_id" # Optional
}
Returns:
A JSON object confirming the position change
"""
try:
data = (await request.get_json()) or {}
# Validate required fields
if 'section_id' not in data:
return jsonify({
"error": "Missing required field: section_id"
}), 400
section_id = data['section_id']
item_id = data.get('item_id')
result = await AIModeratorService.set_moderator_position(focus_group_id, section_id, item_id)
if "error" in result:
return jsonify(result), 404 if "not found" in result["error"] else 400
# WebSocket event is already emitted by AIModeratorService.set_moderator_position()
# No need to emit duplicate event here
return jsonify(result), 200
except Exception as e:
current_app.logger.error(f"Error setting moderator position: {str(e)}")
return jsonify({
"error": "Failed to set moderator position",
"message": str(e)
}), 500
@focus_group_ai_bp.route('/autonomous/start/<focus_group_id>', methods=['POST'])
@jwt_required()
@active_required
@with_user_context
@rate_limit(max_requests=10, window_seconds=60, key_func=_user_key)
async def start_autonomous_conversation(focus_group_id):
"""
Start autonomous conversation for a focus group.
Request body:
{
"initial_prompt": "Optional initial prompt to start the conversation"
}
Returns:
A JSON object containing the start result
"""
try:
current_app.logger.info(f"=== START AUTONOMOUS CONVERSATION API called for focus group {focus_group_id} ===")
data = (await request.get_json()) or {}
initial_prompt = data.get('initial_prompt')
current_app.logger.info(f"Request data: {data}")
# Create autonomous conversation controller
current_app.logger.info("Creating AutonomousConversationController...")
controller = AutonomousConversationController(focus_group_id, current_app.logger)
current_app.logger.info("Controller created successfully")
current_app.logger.info("Preparing to submit conversation to AI Runner...")
# Get the AI Runner service and submit the conversation
ai_runner = get_ai_runner()
if not ai_runner.is_running:
current_app.logger.error("AI Runner service is not running")
return jsonify({"error": "AI Runner service is not available"}), 503
# Set status to ai_mode NOW (before submitting to background runner) so that
# the frontend's immediate status poll after this response reads the correct state.
# The controller will also set it, making this idempotent.
from datetime import datetime, timezone
try:
await FocusGroup.update(focus_group_id, {
'status': 'ai_mode',
'autonomous_started_at': datetime.now(timezone.utc)
})
current_app.logger.info("Set focus group status to ai_mode before AI runner submission")
except Exception as e:
current_app.logger.warning(f"Failed to pre-set ai_mode status: {e}")
# Submit the conversation to the AI Runner (non-blocking)
current_app.logger.info("Submitting conversation to AI Runner...")
try:
future = ai_runner.submit_conversation(
focus_group_id,
controller.start_autonomous_conversation(initial_prompt)
)
current_app.logger.info("Conversation submitted to AI Runner successfully")
except Exception as e:
current_app.logger.error(f"Failed to submit conversation to AI Runner: {e}")
return jsonify({"error": f"Failed to start AI conversation: {str(e)}"}), 500
# Log the AI mode start event
try:
user_id = get_jwt_identity() # Get user ID if available
mode_event_id = await FocusGroup.add_mode_event(focus_group_id, 'ai_mode_started', user_id)
current_app.logger.info(f"Logged AI mode start event: {mode_event_id}")
except Exception as e:
current_app.logger.warning(f"Failed to log AI mode start event: {e}")
# Return immediately with a success response
result = {
"message": "Autonomous conversation started",
"focus_group_id": focus_group_id,
"state": "starting",
"background": True,
"ai_runner_active": True
}
current_app.logger.info(f"Controller returned result: {result}")
if "error" in result:
current_app.logger.warning(f"Controller returned error: {result}")
return jsonify(result), 400
current_app.logger.info(f"Returning success response: {result}")
return jsonify(result), 200
except Exception as e:
current_app.logger.error(f"Exception in start_autonomous_conversation: {str(e)}")
current_app.logger.exception("Full exception traceback:")
return jsonify({
"error": "Failed to start autonomous conversation",
"message": str(e)
}), 500
@focus_group_ai_bp.route('/autonomous/stop/<focus_group_id>', methods=['POST'])
@jwt_required()
@active_required
async def stop_autonomous_conversation(focus_group_id):
"""
Stop autonomous conversation for a focus group.
Request body:
{
"reason": "Optional reason for stopping (default: manual_stop)"
}
Returns:
A JSON object containing the stop result
"""
try:
data = (await request.get_json()) or {}
reason = data.get('reason', 'manual_stop')
current_app.logger.info(f"=== STOP AUTONOMOUS CONVERSATION API called for focus group {focus_group_id} ===")
current_app.logger.info(f"Stop reason: {reason}")
# Create autonomous conversation controller
# Use AI Runner to stop the conversation
current_app.logger.info("Requesting AI Runner to stop conversation...")
ai_runner = get_ai_runner()
if ai_runner.is_running:
success = ai_runner.stop_conversation(focus_group_id)
if success:
current_app.logger.info(f"Successfully requested stop for focus group {focus_group_id}")
else:
current_app.logger.warning(f"No active conversation found for focus group {focus_group_id}")
else:
current_app.logger.warning("AI Runner is not running, cannot stop conversation")
# Update focus group status in database
from datetime import datetime, timezone
status = 'completed' if reason in ['completed', 'discussion_guide_completed', 'natural_completion'] else 'active'
await FocusGroup.update(focus_group_id, {
'status': status,
'autonomous_ended_at': datetime.now(timezone.utc),
'completion_reason': reason
})
current_app.logger.info(f"Signaled autonomous conversation to stop for focus group {focus_group_id}: {reason}")
# Add mode event for AI session concluded (regardless of reason)
user_id = get_jwt_identity() if get_jwt_identity() else None
mode_event_id = await FocusGroup.add_mode_event(focus_group_id, 'ai_session_concluded', user_id)
if mode_event_id:
current_app.logger.info(f"🎯 Added AI session concluded mode event: {mode_event_id}")
else:
current_app.logger.warning(f"Failed to add AI session concluded mode event for focus group {focus_group_id}")
# Return immediately with a success response like start_autonomous_conversation
result = {
"message": "Autonomous conversation stopping",
"focus_group_id": focus_group_id,
"state": "stopping",
"background": True
}
current_app.logger.info(f"Returning success response for stop request: {result}")
return jsonify(result), 200
except Exception as e:
current_app.logger.error(f"Error stopping autonomous conversation: {str(e)}")
return jsonify({
"error": "Failed to stop autonomous conversation",
"message": str(e)
}), 500
@focus_group_ai_bp.route('/autonomous/status/<focus_group_id>', methods=['GET'])
@jwt_required()
@active_required
async def get_autonomous_conversation_status(focus_group_id):
"""
Get the status of autonomous conversation for a focus group.
Returns:
A JSON object containing the conversation status
"""
try:
# Create autonomous conversation controller
controller = AutonomousConversationController(focus_group_id, current_app.logger)
# Get status
status = await controller.get_conversation_status()
return jsonify({
"message": "Autonomous conversation status retrieved",
"status": status,
"focus_group_id": focus_group_id
}), 200
except Exception as e:
current_app.logger.error(f"Error getting autonomous conversation status: {str(e)}")
return jsonify({
"error": "Failed to get autonomous conversation status",
"message": str(e)
}), 500
@focus_group_ai_bp.route('/conversation/state/<focus_group_id>', methods=['GET'])
@jwt_required()
@active_required
async def get_conversation_state(focus_group_id):
"""
Get the current conversation state for a focus group.
Returns:
A JSON object containing the conversation state
"""
try:
# Create conversation state manager
state_manager = ConversationStateManager(focus_group_id)
# Get state
state = await state_manager.get_conversation_state()
if "error" in state:
return jsonify(state), 404 if "not found" in state["error"] else 500
return jsonify({
"message": "Conversation state retrieved",
"state": state,
"focus_group_id": focus_group_id
}), 200
except Exception as e:
current_app.logger.error(f"Error getting conversation state: {str(e)}")
return jsonify({
"error": "Failed to get conversation state",
"message": str(e)
}), 500
@focus_group_ai_bp.route('/conversation/analytics/<focus_group_id>', methods=['GET'])
@jwt_required()
@active_required
async def get_conversation_analytics(focus_group_id):
"""
Get detailed conversation analytics for a focus group.
Returns:
A JSON object containing conversation analytics
"""
try:
# Create conversation state manager
state_manager = ConversationStateManager(focus_group_id)
# Get analytics
analytics = await state_manager.get_conversation_analytics()
if "error" in analytics:
return jsonify(analytics), 404 if "not found" in analytics["error"] else 500
return jsonify({
"message": "Conversation analytics retrieved",
"analytics": analytics,
"focus_group_id": focus_group_id
}), 200
except Exception as e:
current_app.logger.error(f"Error getting conversation analytics: {str(e)}")
return jsonify({
"error": "Failed to get conversation analytics",
"message": str(e)
}), 500
@focus_group_ai_bp.route('/conversation/decision/<focus_group_id>', methods=['POST'])
@jwt_required()
@active_required
@with_user_context
@rate_limit(max_requests=10, window_seconds=60, key_func=_user_key)
async def make_conversation_decision(focus_group_id):
"""
Make a conversation decision using the LLM decision engine.
Returns 202 immediately; poll GET /tasks/{task_id} for result.
"""
try:
user_id = get_jwt_identity()
data = (await request.get_json()) or {}
temperature = data.get('temperature', 0.7)
mode = data.get('mode', 'ai')
from app.services.task_manager import get_task_manager
task_manager = get_task_manager()
task_id = task_manager.generate_task_id()
app = current_app._get_current_object()
bg_task = asyncio.create_task(
_run_conversation_decision_bg(app, task_id, user_id, focus_group_id, temperature, mode)
)
await task_manager.register_task(
bg_task, 'conversation_decision', user_id,
{'focus_group_id': focus_group_id}, task_id=task_id
)
return jsonify({'task_id': task_id, 'message': 'Decision started'}), 202
except Exception as e:
current_app.logger.error(f"Error starting conversation decision: {str(e)}")
return jsonify({"error": "Failed to start conversation decision", "message": str(e)}), 500
async def _run_conversation_decision_bg(app, task_id, user_id, focus_group_id, temperature, mode):
from app.services.task_manager import store_task_result
from app.websocket_manager_async import get_async_websocket_manager
websocket_manager = get_async_websocket_manager()
async with app.app_context():
try:
decision = await ConversationDecisionService.decide_next_action(focus_group_id, temperature, mode)
await store_task_result(task_id, 'completed', result={'decision': decision, 'focus_group_id': focus_group_id})
await websocket_manager.emit_to_user(user_id, 'task_completed', {
'task_id': task_id, 'task_type': 'conversation_decision'
})
except Exception as e:
current_app.logger.error(f"conversation_decision bg error: {str(e)}")
await store_task_result(task_id, 'failed', error=str(e))
await websocket_manager.emit_to_user(user_id, 'task_failed', {
'task_id': task_id, 'task_type': 'conversation_decision', 'error': str(e)
})
@focus_group_ai_bp.route('/conversation/insights/<focus_group_id>', methods=['GET'])
@jwt_required()
@active_required
@with_user_context
async def get_conversation_insights(focus_group_id):
"""
Get LLM-generated insights about the conversation.
Returns:
A JSON object containing conversation insights
"""
try:
insights = await ConversationDecisionService.get_conversation_insights(focus_group_id)
return jsonify({
"message": "Conversation insights generated",
"insights": insights,
"focus_group_id": focus_group_id
}), 200
except Exception as e:
# Return empty insights on LLM error — dashboard handles missing data gracefully
current_app.logger.error(f"Error getting conversation insights: {str(e)}")
return jsonify({
"message": "Insights unavailable",
"insights": {},
"focus_group_id": focus_group_id
}), 200
@focus_group_ai_bp.route('/conversation/intervene/<focus_group_id>', methods=['POST'])
@jwt_required()
@active_required
@rate_limit(max_requests=10, window_seconds=60, key_func=_user_key)
async def manual_intervention(focus_group_id):
"""
Manually intervene in autonomous conversation.
Request body:
{
"action": "pause" | "resume" | "redirect" | "call_participant",
"message": "Optional message to add",
"participant_id": "Optional participant to call on"
}
Returns:
A JSON object containing the intervention result
"""
try:
data = (await request.get_json()) or {}
action = data.get('action', 'pause')
message = data.get('message')
participant_id = data.get('participant_id')
# Create state manager
state_manager = ConversationStateManager(focus_group_id)
result = None
if action == 'pause':
result = state_manager.pause_autonomous_mode()
elif action == 'resume':
result = state_manager.resume_autonomous_mode()
elif action == 'redirect' and message:
# Add moderator message
FocusGroup.add_message(focus_group_id, {
"text": message,
"type": "question",
"senderId": "moderator"
})
result = {"message": "Moderator message added"}
elif action == 'call_participant' and participant_id:
# Add moderator message calling on specific participant
persona = await Persona.find_by_id(participant_id)
if persona:
call_message = f"{persona.get('name', 'Participant')}, what are your thoughts on this?"
FocusGroup.add_message(focus_group_id, {
"text": call_message,
"type": "question",
"senderId": "moderator"
})
result = {"message": f"Called on {persona.get('name', 'participant')}"}
else:
result = {"error": "Participant not found"}
else:
result = {"error": "Invalid action or missing parameters"}
if result and "error" not in result:
return jsonify({
"message": "Manual intervention successful",
"action": action,
"result": result,
"focus_group_id": focus_group_id
}), 200
else:
return jsonify(result), 400
except Exception as e:
current_app.logger.error(f"Error in manual intervention: {str(e)}")
return jsonify({
"error": "Failed to perform manual intervention",
"message": str(e)
}), 500
@focus_group_ai_bp.route('/conversation/reasoning-history/<focus_group_id>', methods=['GET'])
@jwt_required()
@active_required
async def get_reasoning_history(focus_group_id):
"""
Get the AI reasoning history for an autonomous conversation.
Returns:
A JSON object containing the reasoning history
"""
try:
# Create autonomous conversation controller to get reasoning history
controller = AutonomousConversationController(focus_group_id)
status = await controller.get_conversation_status()
reasoning_history = status.get('reasoning_history', [])
return jsonify({
"message": "Reasoning history retrieved successfully",
"reasoning_history": reasoning_history,
"focus_group_id": focus_group_id,
"total_decisions": len(reasoning_history)
}), 200
except Exception as e:
current_app.logger.error(f"Error getting reasoning history: {str(e)}")
return jsonify({
"error": "Failed to get reasoning history",
"message": str(e)
}), 500
@focus_group_ai_bp.route('/moderator/end-session/<focus_group_id>', methods=['POST'])
@jwt_required()
@active_required
@with_user_context
@rate_limit(max_requests=10, window_seconds=60, key_func=_user_key)
async def end_focus_group_session(focus_group_id):
"""
End a focus group session with a concluding moderator statement.
Request body (optional):
{
"reason": "manual_stop" | "auto_complete" | "timeout" | "session_ended"
}
Returns:
A JSON object containing the concluding statement and session end confirmation
"""
try:
current_app.logger.info(f"=== END FOCUS GROUP SESSION API called for focus group {focus_group_id} ===")
data = (await request.get_json()) or {}
reason = data.get('reason', 'session_ended')
current_app.logger.info(f"Session ending reason: {reason}")
# Validate focus group exists
focus_group = await FocusGroup.find_by_id(focus_group_id)
if not focus_group:
current_app.logger.warning(f"Focus group not found: {focus_group_id}")
return jsonify({"error": "Focus group not found"}), 404
current_app.logger.info(f"Focus group found: {focus_group.get('name', 'Unnamed')}")
# End the session with concluding statement
result = await AIModeratorService.end_session_with_concluding_statement(focus_group_id, reason)
if "error" in result:
current_app.logger.error(f"Error ending session: {result['error']}")
return jsonify(result), 400 if "not found" in result["error"] else 500
current_app.logger.info(f"Session ended successfully with concluding statement")
current_app.logger.info(f"Concluding statement: {result.get('concluding_statement', '')[:100]}...")
return jsonify(result), 200
except Exception as e:
current_app.logger.error(f"Unexpected error ending focus group session: {str(e)}")
return jsonify({
"error": "Failed to end session",
"message": str(e)
}), 500
# Helper functions for the new contextual response system
def _format_persona_details_for_context(persona: Dict[str, Any]) -> str:
"""Format persona details for the prompt context."""
details = []
# Basic demographics
details.append(f"Name: {persona.get('name', 'Unknown')}")
details.append(f"Age: {persona.get('age', 'Unknown')}")
details.append(f"Gender: {persona.get('gender', 'Unknown')}")
details.append(f"Occupation: {persona.get('occupation', 'Unknown')}")
details.append(f"Education: {persona.get('education', 'Unknown')}")
details.append(f"Location: {persona.get('location', 'Unknown')}")
# Personality characteristics
details.append(f"Personality: {persona.get('personality', 'Not specified')}")
# OCEAN traits if available
ocean = persona.get('oceanTraits', {})
if ocean:
traits = []
if 'openness' in ocean:
traits.append(f"Openness: {ocean['openness']}/100")
if 'conscientiousness' in ocean:
traits.append(f"Conscientiousness: {ocean['conscientiousness']}/100")
if 'extraversion' in ocean:
traits.append(f"Extraversion: {ocean['extraversion']}/100")
if 'agreeableness' in ocean:
traits.append(f"Agreeableness: {ocean['agreeableness']}/100")
if 'neuroticism' in ocean:
traits.append(f"Neuroticism: {ocean['neuroticism']}/100")
if traits:
details.append("OCEAN Traits:")
details.extend([f"- {trait}" for trait in traits])
# Goals, frustrations, motivations
if 'goals' in persona and persona['goals']:
details.append("Goals:")
details.extend([f"- {goal}" for goal in persona['goals']])
if 'frustrations' in persona and persona['frustrations']:
details.append("Frustrations:")
details.extend([f"- {frustration}" for frustration in persona['frustrations']])
if 'motivations' in persona and persona['motivations']:
details.append("Motivations:")
details.extend([f"- {motivation}" for motivation in persona['motivations']])
# Think, feel, do
tfd = persona.get('thinkFeelDo', {})
if tfd:
if 'thinks' in tfd and tfd['thinks']:
details.append("Thinks:")
details.extend([f"- {thought}" for thought in tfd['thinks']])
if 'feels' in tfd and tfd['feels']:
details.append("Feels:")
details.extend([f"- {feeling}" for feeling in tfd['feels']])
if 'does' in tfd and tfd['does']:
details.append("Does:")
details.extend([f"- {action}" for action in tfd['does']])
# Join all details with line breaks
return "\n".join(details)
def _get_response_length_instructions(persona: Dict[str, Any], previous_messages: List[Dict[str, Any]], current_topic: str) -> str:
"""Get response length instructions based on persona and context."""
# Import the helper from focus_group_response_service
from app.services.focus_group_response_service import _determine_response_length_preference, _get_length_specific_instructions
length_preference = _determine_response_length_preference(persona, previous_messages, current_topic)
return _get_length_specific_instructions(length_preference)