cohorta/backend/app/services/focus_group_service.py
Vadym Samoilenko e01569c412
All checks were successful
Deploy to Production / deploy (push) Successful in 2m23s
feat: commit all app changes — billing API, new auth, design overhaul
Includes frontend redesign (Navigation, billingApi), backend updates
(auth routes, admin routes, LLM service refactor), MSAL removal,
and dependency updates.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-23 19:04:43 +01:00

468 lines
26 KiB
Python
Executable file

"""
Focus Group Service for Synthetic Society
This service provides functionality for generating discussion guides
and other focus group related operations using the LLM service.
"""
from app.services.llm_service import LLMService
from app.utils.prompt_loader import load_prompt, PromptLoaderError
from app.utils.discussion_guide_schema import DiscussionGuideValidator
from app.models.focus_group import FocusGroup
from typing import Dict, Any, Optional, List, Union
import json
import asyncio
import logging
import os
# Set up logging
logger = logging.getLogger(__name__)
# NOTE(review): basicConfig at import time configures the root logger for the
# whole process if no handler is installed yet. Kept unchanged because other
# modules may rely on it.
logging.basicConfig(level=logging.INFO)


class FocusGroupService:
    """Service for focus group operations.

    The public entry point is ``generate_discussion_guide``. Every other
    method is a private helper for building the prompt, validating the LLM
    output, or attaching uploaded-asset metadata to the generated guide.
    """

    @staticmethod
    async def generate_discussion_guide(
        focus_group_name: str,
        research_brief: str,
        discussion_topics: str,
        duration: int = 60,
        temperature: float = 0.7,
        max_retries: int = 3,
        focus_group_id: Optional[str] = None,
        llm_model: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Generate a focus group discussion guide using the LLM with retry logic.

        Args:
            focus_group_name: The name of the focus group
            research_brief: The research objectives and context
            discussion_topics: Comma-separated key topics to be covered
            duration: Duration of the focus group in minutes
            temperature: Controls randomness in generation
            max_retries: Maximum number of generation attempts
            focus_group_id: Optional focus group ID to check for uploaded assets
            llm_model: Optional LLM model to use for generation

        Returns:
            A structured JSON discussion guide (dict). When assets were
            uploaded, matching creative_review items carry a
            ``metadata.visual_asset`` entry.

        Raises:
            Exception: If the prompt cannot be loaded or all attempts fail
        """
        logger.info(f"Generating discussion guide: '{focus_group_name}' ({duration}min)")

        total_minutes = int(duration)
        section_times = FocusGroupService._allocate_section_times(total_minutes)

        # Parse the topics once; blank entries ("a,,b", trailing commas) are
        # not topics and must not inflate the recommended topic count.
        topic_list = [
            topic.strip()
            for topic in (discussion_topics or '').split(',')
            if topic.strip()
        ]
        scaling = FocusGroupService._content_scaling(total_minutes, len(topic_list))

        # Asset lookup is best-effort: a failure is logged and generation
        # proceeds as if nothing had been uploaded.
        uploaded_assets: List[Dict[str, Any]] = []
        if focus_group_id:
            try:
                uploaded_assets = (await FocusGroup.get_uploaded_assets(focus_group_id)) or []
                if uploaded_assets:
                    logger.info(f"Retrieved {len(uploaded_assets)} assets for focus group {focus_group_id}")
            except Exception as e:
                logger.error(f"Could not retrieve assets for focus group {focus_group_id}: {e}")

        has_assets = bool(uploaded_assets)
        if has_assets:
            # Formatted list of asset display references for the LLM
            uploaded_asset_list = '\n'.join(
                f"- {DiscussionGuideValidator.generate_display_reference(uploaded_assets, i)} "
                f"({asset.get('original_name', asset.get('original_filename', 'unknown'))})"
                for i, asset in enumerate(uploaded_assets)
            )
            assets_section = FocusGroupService._generate_assets_section(uploaded_assets)
        else:
            uploaded_asset_list = 'No assets uploaded'
            assets_section = 'No creative assets have been uploaded for this focus group.'

        template_vars = {
            'focus_group_name': focus_group_name,
            'research_brief': research_brief,
            'discussion_topics': ', '.join(topic_list),
            'duration': duration,
            'intro_time': section_times['intro_time'],
            'warmup_time': section_times['warmup_time'],
            'main_topics_time': section_times['main_topics_time'],
            'conclusion_time': section_times['conclusion_time'],
            'duration_category': scaling['duration_category'],
            'recommended_main_topics': scaling['recommended_main_topics'],
            'questions_per_warmup': scaling['questions_per_warmup'],
            'questions_per_subsection': scaling['questions_per_subsection'],
            'include_creative_exercises': scaling['include_creative_exercises'],
            'probe_questions_per_main': scaling['probe_questions_per_main'],
            'uploaded_assets': uploaded_assets,
            'has_assets': has_assets,
            'asset_count': len(uploaded_assets),
            'asset_requirement_note': ' (will require creative review activities)' if has_assets else '',
            'uploaded_asset_list': uploaded_asset_list,
            'assets_section': assets_section,
        }

        # Load and format the discussion guide prompt
        try:
            prompt = load_prompt('discussion-guide-generation', template_vars)
        except PromptLoaderError as e:
            error_msg = f"Error loading discussion guide prompt: {str(e)}"
            logger.error(error_msg)
            raise Exception(error_msg) from e
        logger.info(f"Starting discussion guide generation: {len(uploaded_assets)} assets, {llm_model or 'default'} model")

        # GPT models get an extra emphasis block prepended. It does not vary
        # between attempts, so it is built once, outside the retry loop.
        is_gpt_model = bool(llm_model) and llm_model.startswith('gpt')
        enhanced_prompt = prompt
        if is_gpt_model and has_assets:
            enhanced_prompt = FocusGroupService._build_gpt_asset_emphasis(uploaded_assets) + prompt

        # Retry logic with exponential backoff
        last_error: Optional[Exception] = None
        for attempt in range(1, max_retries + 1):
            try:
                response = await LLMService.generate_content(
                    prompt=enhanced_prompt,
                    temperature=temperature,
                    max_tokens=16000,  # Use a much higher token limit to avoid truncation
                    model_name=llm_model
                )
                clean_response = FocusGroupService._strip_code_fences(response)
                logger.info(f"Cleaned response (length: {len(clean_response)} chars)")

                try:
                    guide_json = json.loads(clean_response)
                except json.JSONDecodeError as e:
                    error_msg = f"Failed to parse generated response as JSON: {str(e)}"
                    logger.warning(error_msg)
                    logger.debug(f"Raw response that failed to parse: {clean_response[:500]}...")
                    last_error = Exception(error_msg)
                else:
                    logger.info("Successfully parsed JSON response")
                    is_valid, validation_errors = DiscussionGuideValidator.validate_json_structure(guide_json)
                    if is_valid:
                        if has_assets:
                            # Raises (and so triggers the next attempt) only
                            # for GPT models that still have attempts left.
                            FocusGroupService._verify_creative_review_coverage(
                                guide_json,
                                uploaded_assets,
                                llm_model,
                                is_gpt_model and attempt < max_retries,
                            )
                        logger.info(f"Discussion guide generation successful on attempt {attempt}/{max_retries}")
                        logger.info(f"Generated guide has {len(guide_json.get('sections', []))} sections")
                        if has_assets:
                            logger.info("Post-processing discussion guide to add visual asset metadata")
                            guide_json = FocusGroupService._add_visual_asset_metadata_to_guide(guide_json, uploaded_assets)
                        return guide_json
                    error_msg = f"Generated JSON failed validation: {validation_errors}"
                    logger.warning(error_msg)
                    last_error = Exception(error_msg)
            except Exception as e:
                logger.warning(f"Generation attempt {attempt} failed: {str(e)}")
                last_error = e

            # If this wasn't the last attempt, wait before retrying (exponential backoff)
            if attempt < max_retries:
                wait_time = 2 ** (attempt - 1)  # 1, 2, 4 seconds
                logger.info(f"Retrying in {wait_time}s (attempt {attempt + 1}/{max_retries})")
                await asyncio.sleep(wait_time)

        # All attempts failed
        final_error_msg = f"Discussion guide generation failed after {max_retries} attempts. Last error: {str(last_error)}"
        logger.error(final_error_msg)
        raise Exception(final_error_msg) from last_error

    @staticmethod
    def _allocate_section_times(total_minutes: int) -> Dict[str, int]:
        """Split the session length into intro, warm-up, main and conclusion minutes.

        Intro, warm-up and conclusion each get a share of the total with a
        5-minute floor; the main topics receive whatever remains, so the four
        values always sum to ``total_minutes``.

        NOTE(review): for sessions of 15 minutes or less the three floors use
        up the whole session and ``main_topics_time`` is zero or negative.
        """
        intro_time = max(5, int(total_minutes * 0.1))
        warmup_time = max(5, int(total_minutes * 0.15))
        conclusion_time = max(5, int(total_minutes * 0.1))
        return {
            'intro_time': intro_time,
            'warmup_time': warmup_time,
            'main_topics_time': total_minutes - (intro_time + warmup_time + conclusion_time),
            'conclusion_time': conclusion_time,
        }

    @staticmethod
    def _content_scaling(total_minutes: int, topic_count: int) -> Dict[str, Any]:
        """Return the content-density parameters for a session of the given length.

        Sessions up to 45 minutes are "short", up to 75 "medium", anything
        longer "long". The recommended number of main topics is capped per
        category and never drops below one, even when no topic was supplied.
        """
        if total_minutes <= 45:
            category, topic_cap, questions, creative, probes = "short", 2, 2, False, 1
        elif total_minutes <= 75:
            category, topic_cap, questions, creative, probes = "medium", 3, 3, True, 2
        else:  # 76+ minutes
            category, topic_cap, questions, creative, probes = "long", 4, 4, True, 3
        return {
            'duration_category': category,
            'recommended_main_topics': min(topic_cap, max(1, topic_count)),
            'questions_per_warmup': questions,
            'questions_per_subsection': questions,
            'include_creative_exercises': creative,
            'probe_questions_per_main': probes,
        }

    @staticmethod
    def _strip_code_fences(response: str) -> str:
        """Remove a leading ```json / ``` fence and a trailing ``` fence, if present."""
        cleaned = response.strip()
        if cleaned.startswith("```json"):
            cleaned = cleaned[7:].strip()
        elif cleaned.startswith("```"):
            cleaned = cleaned[3:].strip()
        if cleaned.endswith("```"):
            cleaned = cleaned[:-3].strip()
        return cleaned

    @staticmethod
    def _build_gpt_asset_emphasis(uploaded_assets: List[Dict[str, Any]]) -> str:
        """Build the extra instruction block prepended to the prompt for GPT models."""
        asset_total = len(uploaded_assets)
        parts = [
            "\n\n🚨🚨🚨 CRITICAL FOR GPT MODELS - READ THIS FIRST 🚨🚨🚨\n",
            f"YOU ABSOLUTELY MUST INCLUDE EXACTLY {asset_total} ACTIVITIES WITH type='creative_review'\n",
            "EACH activity must reference ONE of these display references in content AND include metadata:\n",
        ]
        for i, asset in enumerate(uploaded_assets):
            display_ref = DiscussionGuideValidator.generate_display_reference(uploaded_assets, i)
            parts.append(f"- Display Reference: '{display_ref}' -> Filename: {asset.get('filename', 'unknown')}\n")
        parts.append(f"FAILURE TO INCLUDE ALL {asset_total} CREATIVE_REVIEW ACTIVITIES WITH PROPER METADATA WILL RESULT IN INVALID OUTPUT\n")
        parts.append("🚨🚨🚨 END CRITICAL INSTRUCTIONS 🚨🚨🚨\n\n")
        return ''.join(parts)

    @staticmethod
    def _find_creative_review_items(guide_json: Dict[str, Any]) -> List[Any]:
        """Collect every activity or question whose type is "creative_review".

        Returns a list of ``(location_label, item_dict)`` tuples in document
        order: for each section its activities, then its questions, then the
        same for each of its subsections. Used by both the coverage check and
        the metadata post-processor so the two agree on what counts.
        """
        found: List[Any] = []
        if not isinstance(guide_json, dict):
            return found
        for section in guide_json.get('sections') or []:
            if not isinstance(section, dict):
                continue
            section_title = section.get('title', 'Unknown')
            containers = [(section_title, section)]
            for subsection in section.get('subsections') or []:
                if isinstance(subsection, dict):
                    containers.append(
                        (f"{section_title} > {subsection.get('title', 'Unknown')}", subsection)
                    )
            for label, container in containers:
                for key, suffix in (('activities', ''), ('questions', ' (question)')):
                    for item in container.get(key) or []:
                        if isinstance(item, dict) and item.get('type') == 'creative_review':
                            found.append((label + suffix, item))
        return found

    @staticmethod
    def _verify_creative_review_coverage(
        guide_json: Dict[str, Any],
        uploaded_assets: List[Dict[str, Any]],
        llm_model: Optional[str],
        retry_on_shortfall: bool,
    ) -> None:
        """Log how many creative_review items the guide contains versus assets uploaded.

        Raises:
            Exception: only when ``retry_on_shortfall`` is true and fewer
                creative_review items than assets were generated; the caller's
                retry loop treats this as a failed attempt.
        """
        found = FocusGroupService._find_creative_review_items(guide_json)
        creative_review_count = len(found)
        expected = len(uploaded_assets)

        logger.info(f"=== CREATIVE REVIEW VALIDATION RESULTS (Model: {llm_model or 'gpt-5.4'}) ===")
        logger.info(f"Found {creative_review_count} creative_review activities for {expected} uploaded assets")
        if found:
            logger.info("Creative review activities found:")
            for position, (location, item) in enumerate(found, start=1):
                content = item.get('content', 'No content')
                if content is None:  # LLM emitted "content": null
                    content = 'No content'
                logger.info(f" {position}. Section: {location}")
                logger.info(f" Content: {str(content)[:100] + '...'}")

        if creative_review_count == 0:
            logger.warning(f"❌ WARNING: No creative_review activities generated despite {expected} uploaded assets!")
            logger.warning(f"❌ This suggests {llm_model or 'gpt-5.4'} is not following the creative asset instructions")
            if retry_on_shortfall:
                # The retry reuses the same prompt; it is not strengthened.
                logger.warning("❌ GPT model failed to generate creative_review activities. Will retry.")
                raise Exception("GPT model failed to generate required creative_review activities")
        elif creative_review_count < expected:
            logger.warning(f"⚠️ WARNING: Only {creative_review_count} creative_review activities generated for {expected} assets")
            if retry_on_shortfall:
                logger.warning("⚠️ GPT model generated incomplete creative_review activities. Will retry.")
                raise Exception(f"GPT model generated only {creative_review_count}/{expected} required creative_review activities")
        else:
            logger.info(f"✅ Good: {creative_review_count} creative_review activities generated for {expected} assets")

    @staticmethod
    def _add_visual_asset_metadata_to_guide(guide_json: Dict[str, Any], uploaded_assets: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Post-process the discussion guide to add visual asset metadata to creative_review activities.
        This ensures that moderator systems can identify which asset each activity references.

        The guide is modified in place and also returned.
        """
        # Map lower-cased display references to asset data
        asset_mapping = {}
        for i, asset in enumerate(uploaded_assets):
            display_ref = DiscussionGuideValidator.generate_display_reference(uploaded_assets, i)
            asset_mapping[display_ref.lower()] = {
                'filename': asset.get('filename'),
                'display_reference': display_ref
            }

        processed_count = 0
        for _location, item in FocusGroupService._find_creative_review_items(guide_json):
            if FocusGroupService._add_metadata_to_activity(item, asset_mapping):
                processed_count += 1
        logger.info(f"✅ POST-PROCESS: Added metadata to {processed_count} creative_review activities")
        return guide_json

    @staticmethod
    def _add_metadata_to_activity(activity: Dict[str, Any], asset_mapping: Dict[str, Dict[str, str]]) -> bool:
        """
        Add visual asset metadata to a single activity based on its content.

        ``asset_mapping`` keys are lower-cased display references. A reference
        is matched as a whole token first, so "Asset 1" is not mistaken for a
        mention of "Asset 10"; if nothing matches that way, the plain substring
        match is used as a fallback.

        Returns True if metadata was added, False otherwise.
        """
        import re  # function-scope import: the module does not import re at top level

        raw_content = activity.get('content') or ''  # tolerate "content": null
        content = str(raw_content).lower()

        matched_asset = None
        for display_ref, asset_data in asset_mapping.items():
            if display_ref and re.search(r'(?<!\w)' + re.escape(display_ref) + r'(?!\w)', content):
                matched_asset = asset_data
                break
        if matched_asset is None:
            for display_ref, asset_data in asset_mapping.items():
                if display_ref and display_ref in content:
                    matched_asset = asset_data
                    break

        if matched_asset is None:
            logger.warning(f"⚠️ Could not match creative_review activity to asset: {str(raw_content)[:50]}...")
            return False

        # Replace a missing or non-dict metadata value (e.g. null) with a dict
        metadata = activity.get('metadata')
        if not isinstance(metadata, dict):
            metadata = {}
            activity['metadata'] = metadata
        metadata['visual_asset'] = {
            'filename': matched_asset['filename'],
            'display_reference': matched_asset['display_reference']
        }
        logger.info(f"📎 Added metadata to activity: {matched_asset['display_reference']} -> {matched_asset['filename']}")
        return True

    @staticmethod
    def _generate_assets_section(uploaded_assets: List[Dict[str, Any]]) -> str:
        """Generate the assets section content for the discussion guide prompt.

        Returns a fixed "no assets" sentence for an empty list; otherwise an
        instruction block listing every asset's display reference, original
        name and system filename.
        """
        if not uploaded_assets:
            return 'No creative assets have been uploaded for this focus group.'

        asset_count = len(uploaded_assets)
        # Display references and asset metadata for the LLM
        asset_entries = [
            {
                'display_reference': DiscussionGuideValidator.generate_display_reference(uploaded_assets, i),
                'filename': asset.get('filename', 'unknown'),
                'original_name': asset.get('original_name', asset.get('original_filename', 'unknown')),
            }
            for i, asset in enumerate(uploaded_assets)
        ]
        uploaded_asset_list = '\n'.join(
            f"- {entry['display_reference']} (original: {entry['original_name']})"
            for entry in asset_entries
        )
        asset_metadata_list = '\n'.join(
            f"- Display Reference: '{entry['display_reference']}' -> System Filename: {entry['filename']}"
            for entry in asset_entries
        )
        return f"""🚨 CRITICAL REQUIREMENT: This focus group has {asset_count} uploaded creative asset(s) that MUST be included in the discussion guide.
**MANDATORY CREATIVE REVIEW ACTIVITIES:**
YOU MUST CREATE EXACTLY {asset_count} "creative_review" ACTIVITIES - ONE FOR EACH ASSET BELOW:
**UPLOADED ASSETS:**
{uploaded_asset_list}
**CREATIVE REVIEW ACTIVITY REQUIREMENTS:**
- CREATE one "creative_review" activity for EACH asset listed above
- Each activity type MUST be "creative_review" (not "open_question" or any other type)
- MANDATORY: Reference the display name (e.g., "Asset 1", "My Campaign Ad") in the activity content - DO NOT use system filenames
- Example format: "Please review [DISPLAY_REFERENCE] on your screen. What is your immediate gut reaction? What words come to mind?"
- Distribute these activities throughout different sections (not all in one place)
- Allow 3-5 minutes per creative review activity
- Add 1-2 probe questions after each creative review
**IMPORTANT METADATA REQUIREMENTS:**
For each creative_review activity, you MUST also include metadata that maps the display reference to the system filename:
```json
{{
"id": "creative_review_1",
"type": "creative_review",
"content": "Please review Asset 1 on your screen. What is your immediate gut reaction?",
"metadata": {{
"visual_asset": {{
"filename": "fg-123-abc.jpg",
"display_reference": "Asset 1"
}}
}}
}}
```
**ASSET METADATA MAPPING:**
{asset_metadata_list}
**VALIDATION CHECKLIST:**
Before finalizing your JSON, verify:
□ You have created exactly {asset_count} activities with type "creative_review"
□ Each creative_review activity references a display name (not system filename) in the content
□ Each creative_review activity has proper metadata with visual_asset field
□ Creative review activities are spread across different sections of the guide
□ Each creative review activity has adequate time allocation
**CREATIVE ASSET INTEGRATION:**
- Integrate creative review activities naturally into the flow of discussion
- Place creative assets strategically within relevant topic sections
- Ensure creative reviews don't dominate the discussion - balance with other questions
- Use creative assets to support and enhance the main discussion topics"""