import logging from quart import Blueprint, request, jsonify, Response, send_file, current_app from app.auth.quart_jwt import jwt_required, get_jwt_identity from app.models.focus_group import FocusGroup logger = logging.getLogger(__name__) from app.models.persona import Persona from app.services.focus_group_service import FocusGroupService from app.services.image_description_service import ImageDescriptionService, ImageDescriptionError from app.services.task_manager import CancellableTask from bson import ObjectId import datetime import json import os import asyncio import uuid import tempfile from werkzeug.utils import secure_filename from werkzeug.datastructures import FileStorage from app.utils import make_serializable # Direct file processing utility for temp directory issues def process_files_directly_from_request_stream(request, logger): """ Process uploaded files directly from request stream before it gets consumed. This is the primary fallback when temp directories are unavailable. """ try: from io import BytesIO from werkzeug.datastructures import FileStorage import re # Get the raw input stream - try cached data first raw_data = None # Try to get cached data from before_request hook try: from quart import g if hasattr(g, 'cached_request_data'): raw_data = g.cached_request_data logger.info("Using cached request data from before_request hook") except Exception as g_err: logger.debug(f"Could not access request cache from g: {g_err}") # If no cached data, try to read from stream if not raw_data: try: raw_data = request.stream.read() if not raw_data: raw_data = request.get_data(cache=False) except Exception as stream_error: logger.warning(f"Could not read from request stream: {stream_error}") try: raw_data = request.get_data(cache=False) except Exception as get_data_error: logger.warning(f"Could not get data from request: {get_data_error}") raw_data = None if not raw_data: logger.warning("No raw data available in request stream") return [] logger.info(f"Processing {len(raw_data)} bytes from request stream") # Look for multipart boundary in Content-Type header content_type = request.headers.get('Content-Type', '') boundary_match = re.search(r'boundary=([^;,\s]+)', content_type) if not boundary_match: logger.warning("No multipart boundary found in Content-Type") return [] boundary = boundary_match.group(1).strip('"') logger.info(f"Using boundary: {boundary}") # Split by boundary parts = raw_data.split(f'--{boundary}'.encode()) files = [] for i, part in enumerate(parts): if b'Content-Disposition: form-data' in part and b'filename=' in part: try: # Extract filename filename_match = re.search(rb'filename="([^"]+)"', part) if not filename_match: continue filename = filename_match.group(1).decode('utf-8') logger.info(f"Processing file: {filename}") # Find the file data (after the headers, marked by \r\n\r\n) headers_end = part.find(b'\r\n\r\n') if headers_end == -1: logger.warning(f"No header boundary found for {filename}") continue # Extract file data file_data = part[headers_end + 4:] # Clean up trailing boundary markers and CRLF file_data = file_data.rstrip(b'\r\n-') if len(file_data) > 0: # Determine content type from headers if available content_type_match = re.search(rb'Content-Type:\s*([^\r\n]+)', part) detected_content_type = 'image/jpeg' # default if content_type_match: detected_content_type = content_type_match.group(1).decode('utf-8').strip() # Create a FileStorage-like object from the extracted data file_stream = BytesIO(file_data) file_obj = FileStorage( stream=file_stream, filename=filename, content_type=detected_content_type ) files.append(file_obj) logger.info(f"Successfully extracted: {filename} ({len(file_data)} bytes, {detected_content_type})") else: logger.warning(f"No data found for file: {filename}") except Exception as part_error: logger.warning(f"Failed to process file part: {part_error}") continue logger.info(f"Direct stream processing completed: {len(files)} files extracted") return files except Exception as e: logger.error(f"Direct stream processing failed: {e}") return [] def process_files_directly_from_request(request, logger): """ Process uploaded files directly from request data without using temp files. This is a fallback when the system can't access temp directories. """ try: from io import BytesIO from werkzeug.datastructures import FileStorage import re # Try to extract files from the raw request data # This is a simplified approach for small files raw_data = request.get_data() if not raw_data: logger.warning("No raw data in request") return [] # Look for multipart boundary content_type = request.headers.get('Content-Type', '') boundary_match = re.search(r'boundary=([^;]+)', content_type) if not boundary_match: logger.warning("No multipart boundary found") return [] boundary = boundary_match.group(1).strip('"') # Split by boundary parts = raw_data.split(f'--{boundary}'.encode()) files = [] for part in parts: if b'Content-Disposition: form-data' in part and b'filename=' in part: try: # Extract filename filename_match = re.search(rb'filename="([^"]+)"', part) if not filename_match: continue filename = filename_match.group(1).decode('utf-8') # Find the file data (after the headers) data_start = part.find(b'\r\n\r\n') if data_start == -1: continue file_data = part[data_start + 4:] # Remove trailing boundary markers file_data = file_data.rstrip(b'\r\n-') if len(file_data) > 0: # Create a FileStorage-like object from the extracted data file_obj = FileStorage( stream=BytesIO(file_data), filename=filename, content_type='image/jpeg' # Default, will be validated later ) files.append(file_obj) logger.info(f"Extracted file: {filename} ({len(file_data)} bytes)") except Exception as part_error: logger.warning(f"Failed to process part: {part_error}") continue return files except Exception as e: logger.error(f"Direct file processing failed: {e}") return [] # Asset upload utility functions - defined early for initialization def setup_temp_directory(): """Set up a temporary directory for file processing.""" try: # Try to use the backend directory as temp space if system temp is not available base_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) # Go up to backend/ temp_dir = os.path.join(base_dir, 'temp') # Only try to create and use temp directory if we have write permissions try: os.makedirs(temp_dir, exist_ok=True) # Test write permissions test_file = os.path.join(temp_dir, 'test_write') with open(test_file, 'w') as f: f.write('test') os.remove(test_file) # Set the TMPDIR environment variable to use our custom temp directory os.environ['TMPDIR'] = temp_dir tempfile.tempdir = temp_dir return temp_dir except (OSError, PermissionError): # If we can't write to temp directory, return None to skip temp directory usage logger.warning(f"Cannot write to temp directory {temp_dir}, will process files directly") return None except Exception as e: logger.warning(f"Could not set up temp directory: {e}") return None focus_groups_bp = Blueprint('focus_groups', __name__) # Initialize temp directory when the module is imported try: setup_temp_directory() except Exception as e: logger.warning(f"Could not initialize temp directory during module import: {e}") # Request data cache for direct processing request_data_cache = {} # Temporarily disable this before_request handler due to Quart ASGI context issues # @focus_groups_bp.before_request def cache_multipart_data(): """Cache multipart request data only when temp directories are unavailable.""" try: from quart import request, g # Safely check if we have an active request context if not request: return # Safely check request properties - handle Quart/Flask differences endpoint = getattr(request, 'endpoint', None) method = getattr(request, 'method', None) content_type = getattr(request, 'content_type', None) # Only cache for asset upload endpoints when temp directory issues are expected if (endpoint and 'upload_assets' in str(endpoint) and method == 'POST' and content_type and 'multipart/form-data' in content_type): # Check if temp directory is available - if so, let Quart handle normally temp_dir = setup_temp_directory() if temp_dir: # Temp directory is available, skip caching to allow normal processing return # Enable the rest of the caching logic if needed # For now, just return to prevent context errors return else: # Not an upload endpoint, skip processing return except (RuntimeError, AttributeError, Exception) as e: # Handle "Working outside of request context" gracefully # This can happen during startup or shutdown with ASGI return @focus_groups_bp.route('', methods=['GET']) @focus_groups_bp.route('/', methods=['GET']) @jwt_required() async def get_focus_groups(): import logging logger = logging.getLogger('app.focus_groups') try: logger.debug("=== GET focus groups API called ===") user_id = get_jwt_identity() logger.debug(f"User ID from JWT: {user_id}") focus_groups = await FocusGroup.find_by_user(user_id) logger.debug(f"Found {len(focus_groups)} total focus groups") # Make focus groups serializable logger.debug("Converting focus groups to serializable format") serializable_groups = make_serializable(focus_groups) logger.debug(f"Returning {len(serializable_groups)} serialized focus groups") logger.debug(f"Sample focus group data: {serializable_groups[:1] if serializable_groups else 'None'}") return jsonify(serializable_groups), 200 except Exception as e: logger.error(f"Error in get_focus_groups: {e}") logger.exception("Full exception traceback:") return jsonify({"error": str(e)}), 500 @focus_groups_bp.route('/all', methods=['GET']) @jwt_required() async def get_all_focus_groups(): try: user_id = get_jwt_identity() focus_groups = await FocusGroup.find_by_user(user_id) serializable_groups = make_serializable(focus_groups) return jsonify(serializable_groups), 200 except Exception as e: logger.error(f"Error in get_all_focus_groups: {e}") return jsonify({"error": str(e)}), 500 @focus_groups_bp.route('/', methods=['GET']) @jwt_required() async def get_focus_group(focus_group_id): try: user_id = get_jwt_identity() focus_group = await FocusGroup.find_by_id(focus_group_id) if not focus_group: return jsonify({"message": "Focus group not found"}), 404 if focus_group.get("created_by") and focus_group.get("created_by") != user_id: return jsonify({"message": "Permission denied"}), 403 # Process participants count if needed if 'participants' in focus_group and focus_group['participants'] and isinstance(focus_group['participants'], list): focus_group['participants_count'] = len(focus_group['participants']) # Expand participants data if 'participants' in focus_group and focus_group['participants']: participants_data = [] for persona_id in focus_group['participants']: try: persona = await Persona.find_by_id(persona_id) if persona: participants_data.append(persona) except Exception as e: logger.error(f"Error fetching participant {persona_id}: {e}") focus_group['participants_data'] = participants_data # Make focus group serializable serializable_group = make_serializable(focus_group) return jsonify(serializable_group), 200 except Exception as e: logger.error(f"Error in get_focus_group: {e}") return jsonify({"error": str(e)}), 500 @focus_groups_bp.route('', methods=['POST']) @focus_groups_bp.route('/', methods=['POST']) @jwt_required() async def create_focus_group(): try: user_id = get_jwt_identity() # Use default user ID if not authenticated if not user_id: user_id = 'default_id' data = await request.get_json() if not data or not data.get('name'): return jsonify({"message": "Missing required fields"}), 400 # Remove _id fields if present to avoid conflicts if '_id' in data: del data['_id'] if 'id' in data: del data['id'] # Process participants if they're just IDs if 'participants' in data and isinstance(data['participants'], list): # Store participants count as a number if 'participants_count' not in data: data['participants_count'] = len(data['participants']) focus_group_id = await FocusGroup.create(data, user_id) # Get the created focus group to return focus_group = await FocusGroup.find_by_id(focus_group_id) return jsonify({ "message": "Focus group created successfully", "focus_group_id": focus_group_id, "focus_group": make_serializable(focus_group) }), 201 except Exception as e: logger.error(f"Error creating focus group: {e}") return jsonify({"message": f"Failed to create focus group: {str(e)}"}), 500 @focus_groups_bp.route('/', methods=['PUT']) @jwt_required() async def update_focus_group(focus_group_id): logger.debug(f"FOCUS GROUP UPDATE: focus_group_id={focus_group_id}") data = await request.get_json() if data and 'llm_model' in data: logger.debug(f"LLM MODEL UPDATE: {data['llm_model']} for {focus_group_id}") if not data: return jsonify({"message": "No data provided"}), 400 user_id = get_jwt_identity() focus_group = await FocusGroup.find_by_id(focus_group_id) if not focus_group: return jsonify({"message": "Focus group not found"}), 404 if focus_group.get("created_by") != user_id: return jsonify({"message": "Permission denied"}), 403 success = await FocusGroup.update(focus_group_id, data, user_id=user_id) if success: return jsonify({"message": "Focus group updated successfully"}), 200 else: logger.error(f"Failed to update focus group {focus_group_id}") return jsonify({"message": "Failed to update focus group"}), 500 @focus_groups_bp.route('/', methods=['DELETE']) @jwt_required() async def delete_focus_group(focus_group_id): user_id = get_jwt_identity() focus_group = await FocusGroup.find_by_id(focus_group_id) if not focus_group: return jsonify({"message": "Focus group not found"}), 404 if focus_group.get("created_by") != user_id: return jsonify({"message": "Permission denied"}), 403 success = await FocusGroup.delete(focus_group_id, user_id=user_id) if success: return jsonify({"message": "Focus group deleted successfully"}), 200 else: logger.error(f"Failed to delete focus group {focus_group_id}") return jsonify({"message": "Failed to delete focus group"}), 500 @focus_groups_bp.route('//participants', methods=['POST']) @jwt_required() async def add_participant(focus_group_id): user_id = get_jwt_identity() data = await request.get_json() if not data or not data.get('persona_id'): return jsonify({"message": "Missing persona_id"}), 400 persona_id = data.get('persona_id') # Verify focus group exists and belongs to user focus_group = await FocusGroup.find_by_id(focus_group_id) if not focus_group: return jsonify({"message": "Focus group not found"}), 404 if focus_group.get("created_by") and focus_group.get("created_by") != user_id: return jsonify({"message": "Permission denied"}), 403 # Verify persona exists persona = await Persona.find_by_id(persona_id) if not persona: return jsonify({"message": "Persona not found"}), 404 success = await FocusGroup.add_participant(focus_group_id, persona_id) if success: return jsonify({"message": "Participant added successfully"}), 200 else: logger.error(f"Failed to add participant {persona_id} to focus group {focus_group_id}") return jsonify({"message": "Failed to add participant"}), 500 @focus_groups_bp.route('//participants/', methods=['DELETE']) @jwt_required() async def remove_participant(focus_group_id, persona_id): user_id = get_jwt_identity() # Verify focus group exists and belongs to user focus_group = await FocusGroup.find_by_id(focus_group_id) if not focus_group: return jsonify({"message": "Focus group not found"}), 404 if focus_group.get("created_by") and focus_group.get("created_by") != user_id: return jsonify({"message": "Permission denied"}), 403 success = await FocusGroup.remove_participant(focus_group_id, persona_id) if success: return jsonify({"message": "Participant removed successfully"}), 200 else: logger.error(f"Failed to remove participant {persona_id} from focus group {focus_group_id}") return jsonify({"message": "Failed to remove participant"}), 500 @focus_groups_bp.route('//messages', methods=['GET']) @jwt_required() async def get_focus_group_messages(focus_group_id): """Get all messages for a focus group, including mode events.""" try: user_id = get_jwt_identity() # Verify focus group exists and belongs to user focus_group = await FocusGroup.find_by_id(focus_group_id) if not focus_group: return jsonify({"message": "Focus group not found"}), 404 if focus_group.get("created_by") and focus_group.get("created_by") != user_id: return jsonify({"message": "Permission denied"}), 403 # Get messages and mode events messages = await FocusGroup.get_messages(focus_group_id) mode_events = await FocusGroup.get_mode_events(focus_group_id) # Make messages and events serializable and convert field names for frontend compatibility serializable_messages = make_serializable(messages) # Convert visual_asset field to visualAsset for frontend compatibility for message in serializable_messages: if 'visual_asset' in message and message['visual_asset']: message['visualAsset'] = message['visual_asset'] del message['visual_asset'] serializable_mode_events = make_serializable(mode_events) return jsonify({ "messages": serializable_messages, "mode_events": serializable_mode_events }), 200 except Exception as e: logger.error(f"Error in get_focus_group_messages: {e}") return jsonify({"error": str(e)}), 500 @focus_groups_bp.route('//messages', methods=['POST']) @jwt_required() async def add_focus_group_message(focus_group_id): """Add a new message to a focus group.""" try: # Get message data from request data = await request.get_json() if not data or not data.get('text'): return jsonify({"message": "Missing required fields"}), 400 # Verify focus group exists focus_group = await FocusGroup.find_by_id(focus_group_id) if not focus_group: return jsonify({"message": "Focus group not found"}), 404 # Handle visual asset metadata for messages with visual context if data.get('visualAsset') and data.get('visualAsset', {}).get('filename'): visual_asset = data.get('visualAsset') filename = visual_asset.get('filename') # Add asset information for legacy compatibility data['attached_assets'] = [filename] data['activates_visual_context'] = True # Store visual asset metadata in proper format for database data['visual_asset'] = { 'filename': visual_asset.get('filename'), 'displayReference': visual_asset.get('displayReference') } logger.debug(f"MESSAGE WITH VISUAL ASSET: {visual_asset.get('displayReference')} -> {filename}") # Activate visual assets in the focus group for LLM context try: success = await FocusGroup._activate_visual_assets(focus_group_id, [filename], None) if success: logger.debug(f"VISUAL CONTEXT ACTIVATED: {filename} ({visual_asset.get('displayReference')})") else: logger.debug(f"⚠️ Failed to activate visual context for: {filename}") except Exception as activation_error: logger.debug(f"⚠️ Error activating visual context: {activation_error}") # Legacy fallback: Check if this is a facilitator message with a creative asset (for backward compatibility) elif data.get('senderId') == 'facilitator': try: from app.services.focus_group_response_service import extract_asset_filename_from_content # Extract asset filename from message text message_text = data.get('text', '') asset_filename = extract_asset_filename_from_content(message_text) if asset_filename: # Add visual context information to the message data data['attached_assets'] = [asset_filename] data['activates_visual_context'] = True logger.debug(f"LEGACY FACILITATOR MESSAGE: Detected creative asset: {asset_filename}") logger.debug(f"Message text: {message_text}") # Activate visual assets in the focus group for LLM context try: success = await FocusGroup._activate_visual_assets(focus_group_id, [asset_filename], None) if success: logger.debug(f"VISUAL CONTEXT ACTIVATED: {asset_filename}") else: logger.debug(f"⚠️ Failed to activate visual context for: {asset_filename}") except Exception as activation_error: logger.debug(f"⚠️ Error activating visual context: {activation_error}") except Exception as e: logger.debug(f"⚠️ Error checking for facilitator creative asset: {e}") # Debug: Log all message data for manual position setting if data.get('senderId') == 'moderator' and data.get('type') == 'question': logger.debug(f"🔍 MODERATOR MESSAGE DEBUG:") logger.debug(f" - Message text: {data.get('text', '')}") logger.debug(f" - Attached assets: {data.get('attached_assets', [])}") logger.debug(f" - Activates visual context: {data.get('activates_visual_context', False)}") # Add message message_id = await FocusGroup.add_message(focus_group_id, data) if not message_id: return jsonify({"message": "Failed to add message"}), 500 return jsonify({ "message": "Message added successfully", "message_id": message_id }), 201 except Exception as e: logger.error(f"Error in add_focus_group_message: {e}") return jsonify({"error": str(e)}), 500 @focus_groups_bp.route('//messages/', methods=['PATCH']) @jwt_required() async def update_focus_group_message(focus_group_id, message_id): """Update a message in a focus group, currently only for highlighted status.""" try: # Get message data from request data = await request.get_json() if data is None or 'highlighted' not in data: return jsonify({"message": "Missing highlighted field"}), 400 # Verify focus group exists focus_group = await FocusGroup.find_by_id(focus_group_id) if not focus_group: return jsonify({"message": "Focus group not found"}), 404 # Update message highlight status success = await FocusGroup.update_message_highlight( focus_group_id, message_id, data['highlighted'] ) if not success: return jsonify({"message": "Failed to update message highlight status"}), 500 return jsonify({ "message": "Message highlight status updated successfully" }), 200 except Exception as e: logger.error(f"Error in update_focus_group_message: {e}") return jsonify({"error": str(e)}), 500 @focus_groups_bp.route('//notes', methods=['GET']) @jwt_required() async def get_focus_group_notes(focus_group_id): """Get all notes for a focus group.""" try: # Verify focus group exists focus_group = await FocusGroup.find_by_id(focus_group_id) if not focus_group: return jsonify({"message": "Focus group not found"}), 404 # Get notes notes = await FocusGroup.get_notes(focus_group_id) # Make notes serializable serializable_notes = make_serializable(notes) return jsonify(serializable_notes), 200 except Exception as e: logger.error(f"Error in get_focus_group_notes: {e}") return jsonify({"error": str(e)}), 500 @focus_groups_bp.route('//notes', methods=['POST']) @jwt_required() async def add_focus_group_note(focus_group_id): """Add a new note to a focus group.""" try: # Get note data from request data = await request.get_json() if not data or not data.get('content'): return jsonify({"message": "Missing required fields"}), 400 # Verify focus group exists focus_group = await FocusGroup.find_by_id(focus_group_id) if not focus_group: return jsonify({"message": "Focus group not found"}), 404 # Add note note_id = await FocusGroup.add_note(focus_group_id, data) if not note_id: return jsonify({"message": "Failed to add note"}), 500 # Get the created note to return notes = await FocusGroup.get_notes(focus_group_id) created_note = None for note in notes: if str(note.get('_id', '')) == str(note_id): created_note = note break return jsonify({ "message": "Note added successfully", "note_id": note_id, "note": make_serializable(created_note) if created_note else None }), 201 except Exception as e: logger.error(f"Error in add_focus_group_note: {e}") return jsonify({"error": str(e)}), 500 @focus_groups_bp.route('//notes/', methods=['DELETE']) @jwt_required() async def delete_focus_group_note(focus_group_id, note_id): """Delete a note from a focus group.""" try: # Verify focus group exists focus_group = await FocusGroup.find_by_id(focus_group_id) if not focus_group: return jsonify({"message": "Focus group not found"}), 404 # Delete note success = await FocusGroup.delete_note(focus_group_id, note_id) if not success: return jsonify({"message": "Failed to delete note"}), 500 return jsonify({ "message": "Note deleted successfully" }), 200 except Exception as e: logger.error(f"Error in delete_focus_group_note: {e}") return jsonify({"error": str(e)}), 500 @focus_groups_bp.route('/generate-discussion-guide', methods=['POST']) @focus_groups_bp.route('//generate-discussion-guide', methods=['POST']) @jwt_required() async def generate_discussion_guide(focus_group_id=None): """Generate a discussion guide — returns task_id immediately (202), delivers result via WebSocket.""" try: data = await request.get_json() if not data: return jsonify({"error": "Missing request data", "can_retry": False}), 400 required_fields = ['name', 'description', 'objective', 'topic'] missing_fields = [f for f in required_fields if f not in data or not data[f]] if missing_fields: return jsonify({ "error": f"Missing required fields: {', '.join(missing_fields)}", "can_retry": False }), 400 user_id = get_jwt_identity() from app.services.task_manager import get_task_manager from app.websocket_manager_async import get_async_websocket_manager task_manager = get_task_manager() task_id = task_manager.generate_task_id() app = current_app._get_current_object() bg_task = asyncio.create_task( _run_discussion_guide_generation_bg(app, task_id, user_id, focus_group_id, data) ) await task_manager.register_task(bg_task, 'discussion_guide_generation', user_id, { 'focus_group_name': data['name'], 'focus_group_id': focus_group_id }, task_id=task_id) websocket_manager = get_async_websocket_manager() await websocket_manager.emit_to_user(user_id, 'task_started', { 'task_id': task_id, 'task_type': 'discussion_guide_generation', 'message': f'Started generating discussion guide for {data["name"]}' }) return jsonify({'task_id': task_id, 'message': 'Discussion guide generation started'}), 202 except Exception as e: logger.error(f"Discussion guide generation setup failed: {str(e)}") return jsonify({"error": str(e), "can_retry": True}), 500 async def _run_discussion_guide_generation_bg(app, task_id, user_id, focus_group_id, data): """Background coroutine: generates discussion guide and delivers result via WebSocket.""" from app.websocket_manager_async import get_async_websocket_manager websocket_manager = get_async_websocket_manager() async with app.app_context(): try: focus_group_name = data['name'] research_brief = f"{data['description']}\n\nResearch Objective: {data['objective']}" discussion_topics = data['topic'] duration = data.get('duration', 60) topic_mapping = { 'product-feedback': 'Product Feedback', 'creative-testing': 'Creative Testing', 'messaging-evaluation': 'Messaging Evaluation', 'user-experience': 'User Experience', 'market-research': 'Market Research' } formatted_topic = topic_mapping.get(discussion_topics, discussion_topics) if isinstance(discussion_topics, str) else 'General Discussion' llm_model = data.get('llm_model') if focus_group_id and not llm_model: try: fg = await FocusGroup.find_by_id(focus_group_id) if fg: llm_model = fg.get('llm_model') except Exception: pass discussion_guide = await FocusGroupService.generate_discussion_guide( focus_group_name=focus_group_name, research_brief=research_brief, discussion_topics=formatted_topic, duration=duration, temperature=0.7, focus_group_id=focus_group_id, llm_model=llm_model ) summary = None try: from app.services.focus_group_summary_service import generate_focus_group_summary summary = await generate_focus_group_summary({ 'name': focus_group_name, 'topic': formatted_topic, 'duration': duration, 'description': data.get('description', ''), 'discussionGuide': discussion_guide }, llm_model=llm_model) if focus_group_id and summary: await FocusGroup.update(focus_group_id, {'summary': summary}) except Exception as summary_err: app.logger.warning(f"Failed to generate summary (non-critical): {summary_err}") from app.services.task_manager import store_task_result await store_task_result(task_id, 'completed', result={ 'discussionGuide': discussion_guide, 'summary': summary }) await websocket_manager.emit_to_user(user_id, 'task_completed', { 'task_id': task_id, 'task_type': 'discussion_guide_generation', 'message': f'Successfully generated discussion guide for {focus_group_name}' }) except asyncio.CancelledError: from app.services.task_manager import store_task_result await store_task_result(task_id, 'cancelled') await websocket_manager.emit_to_user(user_id, 'task_cancelled', { 'task_id': task_id, 'task_type': 'discussion_guide_generation' }) except Exception as e: app.logger.error(f"Discussion guide generation background task failed: {str(e)}") from app.services.task_manager import store_task_result await store_task_result(task_id, 'failed', error=str(e)) await websocket_manager.emit_to_user(user_id, 'task_failed', { 'task_id': task_id, 'task_type': 'discussion_guide_generation', 'message': str(e) }) def convert_discussion_guide_to_markdown(discussion_guide, focus_group_name=None): """ Convert a discussion guide to markdown format. Handles both structured (JSON) and legacy (string) formats. """ timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") # Handle legacy string format if isinstance(discussion_guide, str): title = f"Discussion Guide: {focus_group_name}" if focus_group_name else "Discussion Guide" return f"""# {title} **Generated:** {timestamp} **Format:** Legacy Text Format --- {discussion_guide} --- *Exported from Semblance Synthetic Society*""" # Handle structured format if isinstance(discussion_guide, dict): title = f"Discussion Guide: {focus_group_name}" if focus_group_name else discussion_guide.get('title', 'Discussion Guide') total_duration = discussion_guide.get('total_duration', 'Unknown') markdown = f"""# {title} **Duration:** {total_duration} minutes **Generated:** {timestamp} --- """ # Process sections sections = discussion_guide.get('sections', []) for section_index, section in enumerate(sections): section_title = section.get('title', f'Section {section_index + 1}') markdown += f"## Section {section_index + 1}: {section_title}\n\n" # Add section content/description if section.get('content'): markdown += f"*{section['content']}*\n\n" # Add activities activities = section.get('activities', []) if activities: markdown += "### Activities\n\n" for activity_index, activity in enumerate(activities): markdown += format_discussion_item_markdown(activity, activity_index + 1, 'Activity') markdown += "\n" # Add questions questions = section.get('questions', []) if questions: markdown += "### Questions\n\n" for question_index, question in enumerate(questions): markdown += format_discussion_item_markdown(question, question_index + 1, 'Question') markdown += "\n" # Add subsections subsections = section.get('subsections', []) if subsections: for subsection_index, subsection in enumerate(subsections): subsection_title = subsection.get('title', f'Subsection {subsection_index + 1}') markdown += f"### Subsection {subsection_index + 1}: {subsection_title}\n\n" # Subsection activities sub_activities = subsection.get('activities', []) if sub_activities: markdown += "#### Activities\n\n" for activity_index, activity in enumerate(sub_activities): markdown += format_discussion_item_markdown(activity, activity_index + 1, 'Activity') markdown += "\n" # Subsection questions sub_questions = subsection.get('questions', []) if sub_questions: markdown += "#### Questions\n\n" for question_index, question in enumerate(sub_questions): markdown += format_discussion_item_markdown(question, question_index + 1, 'Question') markdown += "\n" markdown += "---\n\n" markdown += "*Exported from Semblance Synthetic Society*" return markdown # Fallback for unknown format return f"""# Discussion Guide **Generated:** {timestamp} --- Unable to parse discussion guide format. Raw content: {str(discussion_guide)} --- *Exported from Semblance Synthetic Society*""" def format_discussion_item_markdown(item, index, item_type): """Format a discussion item (question or activity) as markdown.""" item_type_display = item.get('type', 'unknown').replace('_', ' ').title() content = item.get('content', '') time_limit = item.get('time_limit') markdown = f"{index}. **{item_type_display}**" if time_limit: markdown += f" *({time_limit} min)*" markdown += f"\n {content}\n" # Add probe questions for questions if item_type == 'Question' and item.get('probes'): markdown += "\n **Probe Questions:**\n" for probe in item['probes']: markdown += f" - {probe}\n" markdown += "\n" return markdown def generate_discussion_guide_filename(focus_group_name=None, guide_title=None): """Generate a filename for the discussion guide download.""" date = datetime.datetime.now().strftime("%Y-%m-%d") base_name = 'discussion-guide' if focus_group_name: # Sanitize focus group name for filename sanitized_name = ''.join(c for c in focus_group_name if c.isalnum() or c in (' ', '-', '_')).rstrip() sanitized_name = sanitized_name.replace(' ', '-').lower() base_name = f"discussion-guide-{sanitized_name}" elif guide_title: # Fallback to guide title sanitized_title = ''.join(c for c in guide_title if c.isalnum() or c in (' ', '-', '_')).rstrip() sanitized_title = sanitized_title.replace(' ', '-').lower() base_name = f"discussion-guide-{sanitized_title}" return f"{base_name}-{date}.md" @focus_groups_bp.route('//discussion-guide/download', methods=['GET']) @jwt_required() async def download_discussion_guide(focus_group_id): """ Download the discussion guide for a focus group as a markdown file. Returns: A markdown file download of the discussion guide """ import logging logger = logging.getLogger('app.focus_groups') try: logger.debug(f"=== DOWNLOAD DISCUSSION GUIDE API called for focus group {focus_group_id} ===") # Verify focus group exists focus_group = await FocusGroup.find_by_id(focus_group_id) if not focus_group: logger.warning(f"Focus group not found: {focus_group_id}") return jsonify({"error": "Focus group not found"}), 404 focus_group_name = focus_group.get('name', 'Unnamed Focus Group') logger.info(f"Focus group found: {focus_group_name}") # Get discussion guide discussion_guide = focus_group.get('discussionGuide') if not discussion_guide: logger.warning(f"No discussion guide found for focus group {focus_group_id}") return jsonify({"error": "No discussion guide found for this focus group"}), 404 logger.info(f"Discussion guide found, type: {type(discussion_guide)}") # Convert to markdown try: markdown_content = convert_discussion_guide_to_markdown(discussion_guide, focus_group_name) logger.info(f"Discussion guide converted to markdown, length: {len(markdown_content)} characters") except Exception as e: logger.error(f"Failed to convert discussion guide to markdown: {str(e)}") return jsonify({"error": "Failed to convert discussion guide to markdown"}), 500 # Generate filename filename = generate_discussion_guide_filename(focus_group_name) logger.info(f"Generated filename: {filename}") # Create response with markdown content response = Response( markdown_content, mimetype='text/markdown', headers={ 'Content-Disposition': f'attachment; filename="{filename}"', 'Content-Type': 'text/markdown; charset=utf-8' } ) logger.info(f"✅ DOWNLOAD DISCUSSION GUIDE API completed successfully") return response except Exception as e: logger.error(f"Error in download_discussion_guide: {str(e)}") logger.exception("Full exception traceback:") return jsonify({ "error": "Failed to download discussion guide", "message": str(e) }), 500 # Additional asset upload utility functions def get_upload_folder(focus_group_id): """Get the upload folder path for a focus group.""" # Use absolute path to avoid working directory issues base_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) # Go up to backend/ upload_dir = os.path.join(base_dir, 'uploads', f'focus-group-{focus_group_id}') return upload_dir def ensure_upload_folder(focus_group_id): """Ensure the upload folder exists for a focus group.""" upload_dir = get_upload_folder(focus_group_id) # Try to create subdirectory, but fall back to flat storage if filesystem is read-only try: os.makedirs(upload_dir, exist_ok=True) return upload_dir except (OSError, PermissionError) as e: logger.warning(f"Cannot create subdirectory {upload_dir}: {e}") logger.warning("Falling back to flat file storage in main uploads directory") # Use main uploads directory instead base_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) main_upload_dir = os.path.join(base_dir, 'uploads') # Test if main directory is writable if os.path.isdir(main_upload_dir) and os.access(main_upload_dir, os.W_OK): return main_upload_dir else: raise OSError(f"Main uploads directory is not writable: {main_upload_dir}") IMAGE_EXTENSIONS = {'jpg', 'jpeg', 'png', 'gif', 'webp', 'bmp', 'tiff'} DOCUMENT_EXTENSIONS = {'pdf', 'docx', 'doc', 'txt', 'md', 'csv', 'xlsx', 'xls', 'pptx', 'ppt', 'rtf'} ALLOWED_EXTENSIONS = IMAGE_EXTENSIONS | DOCUMENT_EXTENSIONS DOCUMENT_MIME_TYPES = { 'pdf': 'application/pdf', 'docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document', 'doc': 'application/msword', 'txt': 'text/plain', 'md': 'text/markdown', 'csv': 'text/csv', 'xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', 'xls': 'application/vnd.ms-excel', 'pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation', 'ppt': 'application/vnd.ms-powerpoint', 'rtf': 'application/rtf', } def is_allowed_file(filename, allowed_extensions=None): """Check if file has an allowed extension.""" if allowed_extensions is None: allowed_extensions = ALLOWED_EXTENSIONS return '.' in filename and \ filename.rsplit('.', 1)[1].lower() in allowed_extensions def validate_asset_file(file): """Validate uploaded asset file (images and documents).""" if not file: return False, "No file provided" if file.filename == '': return False, "No file selected" if not is_allowed_file(file.filename): return False, f"File type not allowed. Supported types: images (JPG, PNG, GIF, WebP) and documents (PDF, DOCX, TXT, XLSX, etc.)" # Check file size (50MB limit for documents, 10MB for images) ext = file.filename.rsplit('.', 1)[1].lower() if '.' in file.filename else '' size_limit = 50 * 1024 * 1024 if ext in DOCUMENT_EXTENSIONS else 10 * 1024 * 1024 try: current_pos = file.tell() file.seek(0, os.SEEK_END) file_size = file.tell() file.seek(current_pos) if file_size > size_limit: limit_mb = size_limit // (1024 * 1024) return False, f"File size exceeds {limit_mb}MB limit" except Exception as e: logger.warning(f"Could not validate file size: {e}") return True, "Valid file" # Keep old name as alias for backwards compat validate_image_file = validate_asset_file def save_uploaded_file_directly(file, file_path): """Save uploaded file directly to avoid temporary file issues.""" try: # Create the directory if it doesn't exist os.makedirs(os.path.dirname(file_path), exist_ok=True) # Read file content in chunks and write directly with open(file_path, 'wb') as f: file.seek(0) # Ensure we're at the beginning while True: chunk = file.read(8192) # Read in 8KB chunks if not chunk: break f.write(chunk) return True except Exception as e: logger.error(f"Error saving file directly: {e}") return False @focus_groups_bp.route('//assets', methods=['POST']) @jwt_required() async def upload_assets(focus_group_id): """Upload creative assets for a focus group.""" import logging logger = logging.getLogger('app.focus_groups') try: logger.debug(f"=== UPLOAD ASSETS API called for focus group {focus_group_id} ===") # Check for replace flag (Quart async form access) form_data = await request.form replace_existing = form_data.get('replace', '').lower() == 'true' logger.info(f"Replace existing assets flag: {replace_existing}") # Set up temporary directory for file processing (optional) temp_dir = setup_temp_directory() if temp_dir: logger.info(f"Using custom temporary directory: {temp_dir}") else: logger.info("No temp directory available, processing files directly") # Verify focus group exists focus_group = await FocusGroup.find_by_id(focus_group_id) if not focus_group: logger.warning(f"Focus group not found: {focus_group_id}") return jsonify({"error": "Focus group not found"}), 404 # If replace flag is set, clear existing assets first if replace_existing: logger.info("Replace flag set - clearing existing assets") existing_assets = focus_group.get('uploaded_assets', []) if existing_assets: logger.info(f"Deleting {len(existing_assets)} existing assets") # Delete existing files from disk for existing_asset in existing_assets: filename = existing_asset.get('filename') if filename: # Try to delete from both possible locations file_deleted = False # Try subdirectory location first upload_dir = get_upload_folder(focus_group_id) subdirectory_path = os.path.join(upload_dir, filename) if os.path.exists(subdirectory_path): try: os.remove(subdirectory_path) file_deleted = True logger.info(f"Deleted existing asset from subdirectory: {filename}") except Exception as e: logger.warning(f"Failed to delete {filename} from subdirectory: {e}") # Try flat storage location if not file_deleted: base_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) main_upload_dir = os.path.join(base_dir, 'uploads') flat_path = os.path.join(main_upload_dir, filename) if os.path.exists(flat_path): try: os.remove(flat_path) file_deleted = True logger.info(f"Deleted existing asset from main uploads: {filename}") except Exception as e: logger.warning(f"Failed to delete {filename} from main uploads: {e}") if not file_deleted: logger.warning(f"Could not find or delete existing asset file: {filename}") # Clear assets from database success = await FocusGroup.clear_uploaded_assets(focus_group_id) if success: logger.info("Successfully cleared existing assets from database") else: logger.error("Failed to clear existing assets from database") return jsonify({"error": "Failed to clear existing assets"}), 500 # Try standard Flask file processing first (since temp directories are now working) files = None flask_processing_failed = False # Debug request information logger.info(f"Request content type: {request.content_type}") logger.info(f"Request content length: {request.content_length}") logger.info(f"Request method: {request.method}") # Get files using Quart async pattern files_data = await request.files logger.info(f"Request files keys: {list(files_data.keys())}") try: if 'assets' not in files_data: logger.warning(f"No 'assets' key in request.files. Available keys: {list(files_data.keys())}") return jsonify({"error": "No files provided"}), 400 files = files_data.getlist('assets') if not files or all(f.filename == '' for f in files): logger.warning("No files selected") return jsonify({"error": "No files selected"}), 400 logger.info(f"Successfully got {len(files)} files via standard Flask processing") except Exception as file_access_error: logger.warning(f"Standard Flask file processing failed: {file_access_error}") flask_processing_failed = True files = None # Fallback to direct processing if Flask processing failed if files is None and flask_processing_failed: logger.info("Attempting direct file processing as fallback...") try: if request.content_type and 'multipart/form-data' in request.content_type: files = process_files_directly_from_request_stream(request, logger) if files: logger.info(f"Successfully extracted {len(files)} files via direct processing") else: logger.warning("Direct processing found no files") except Exception as direct_error: logger.error(f"Direct file processing also failed: {direct_error}") return jsonify({ "error": "File upload temporarily unavailable", "details": "Server configuration issue with file processing. Please try uploading smaller files or contact support.", "code": "TEMP_DIR_ERROR", "can_retry": True }), 503 # Validate that we have files to process if not files: return jsonify({"error": "No valid files to process"}), 400 # Ensure upload directory exists upload_dir = ensure_upload_folder(focus_group_id) logger.info(f"Upload directory: {upload_dir}") uploaded_assets = [] errors = [] for file in files: try: # Validate file is_valid, error_message = validate_image_file(file) if not is_valid: errors.append(f"{file.filename}: {error_message}") continue # Generate unique filename with focus group prefix original_filename = secure_filename(file.filename) file_extension = original_filename.rsplit('.', 1)[1].lower() unique_filename = f"fg-{focus_group_id}-{uuid.uuid4().hex}.{file_extension}" # Save file using direct method to avoid temp directory issues file_path = os.path.join(upload_dir, unique_filename) # Try direct save first if not save_uploaded_file_directly(file, file_path): # Fallback to standard save method (Quart async version) try: await file.save(file_path) except Exception as save_error: logger.error(f"Both direct and standard file save failed: {save_error}") errors.append(f"{file.filename}: Save failed - {str(save_error)}") continue # Get file info file_size = os.path.getsize(file_path) # Determine mime type detected_mime = file.mimetype if not detected_mime or detected_mime in ('application/octet-stream', ''): detected_mime = DOCUMENT_MIME_TYPES.get(file_extension, f"image/{file_extension}") # Create asset metadata asset_metadata = { "filename": unique_filename, "original_name": original_filename, "size": file_size, "mime_type": detected_mime, "asset_type": "document" if file_extension in DOCUMENT_EXTENSIONS else "image", "upload_date": datetime.datetime.now(datetime.timezone.utc), "file_path": file_path } uploaded_assets.append(asset_metadata) logger.info(f"Successfully uploaded: {original_filename} -> {unique_filename}") except Exception as e: error_msg = f"{file.filename}: Upload failed - {str(e)}" errors.append(error_msg) logger.error(error_msg) if not uploaded_assets and errors: return jsonify({ "error": "All file uploads failed", "details": errors }), 400 # Update focus group with asset metadata if uploaded_assets: logger.info(f"Updating focus group {focus_group_id} with {len(uploaded_assets)} assets") logger.info(f"Asset metadata to save: {uploaded_assets}") success = await FocusGroup.add_uploaded_assets(focus_group_id, uploaded_assets) logger.info(f"Database update success: {success}") if not success: logger.error(f"Failed to save asset metadata to database for focus group {focus_group_id}") # Cleanup uploaded files if database update failed for asset in uploaded_assets: try: if os.path.exists(asset["file_path"]): os.remove(asset["file_path"]) except Exception as cleanup_err: logger.warning(f"Failed to delete asset file during cleanup: {cleanup_err}") return jsonify({"error": "Failed to update focus group with asset metadata"}), 500 else: logger.info(f"Successfully saved asset metadata to database") # DEBUG: Verify the data was saved by reading it back try: verification_assets = await FocusGroup.get_uploaded_assets(focus_group_id) logger.info(f"Verification: Found {len(verification_assets)} assets after save") logger.info(f"Verification asset data: {verification_assets}") except Exception as verify_error: logger.error(f"Verification failed: {verify_error}") response_data = { "message": f"Successfully uploaded {len(uploaded_assets)} asset(s)", "uploaded_assets": len(uploaded_assets), "assets": [ { "filename": asset["filename"], "original_name": asset["original_name"], "size": asset["size"], "mime_type": asset["mime_type"] } for asset in uploaded_assets ] } if errors: response_data["errors"] = errors response_data["message"] += f" ({len(errors)} failed)" logger.info(f"✅ UPLOAD ASSETS API completed successfully - {len(uploaded_assets)} assets uploaded") return jsonify(response_data), 201 except Exception as e: logger.error(f"Error in upload_assets: {str(e)}") logger.exception("Full exception traceback:") return jsonify({ "error": "Failed to upload assets", "message": str(e) }), 500 @focus_groups_bp.route('//assets', methods=['GET']) @jwt_required() async def get_assets(focus_group_id): """Get list of uploaded assets for a focus group.""" try: # Verify focus group exists focus_group = await FocusGroup.find_by_id(focus_group_id) if not focus_group: return jsonify({"error": "Focus group not found"}), 404 # Get assets from focus group data assets = focus_group.get('uploaded_assets', []) # Return serializable asset data (exclude file_path for security) asset_list = [] for asset in assets: asset_info = { "filename": asset.get("filename"), "original_name": asset.get("original_name"), "user_assigned_name": asset.get("user_assigned_name"), "size": asset.get("size"), "mime_type": asset.get("mime_type"), "upload_date": asset.get("upload_date") } asset_list.append(make_serializable(asset_info)) return jsonify({ "assets": asset_list, "count": len(asset_list) }), 200 except Exception as e: logger.error(f"Error in get_assets: {e}") return jsonify({"error": str(e)}), 500 @focus_groups_bp.route('//assets/', methods=['GET']) @jwt_required() async def serve_asset(focus_group_id, filename): """Serve an uploaded asset file.""" try: # Verify focus group exists focus_group = await FocusGroup.find_by_id(focus_group_id) if not focus_group: return jsonify({"error": "Focus group not found"}), 404 # Verify asset exists in focus group metadata assets = focus_group.get('uploaded_assets', []) asset = next((a for a in assets if a.get('filename') == filename), None) if not asset: return jsonify({"error": "Asset not found"}), 404 # Get file path - check both subdirectory and flat storage locations file_path = None # First try subdirectory location upload_dir = get_upload_folder(focus_group_id) subdirectory_path = os.path.join(upload_dir, filename) if os.path.isfile(subdirectory_path): file_path = subdirectory_path else: # Try flat storage location (main uploads directory) base_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) main_upload_dir = os.path.join(base_dir, 'uploads') flat_path = os.path.join(main_upload_dir, filename) if os.path.isfile(flat_path): file_path = flat_path # Check if file exists if not file_path or not os.path.exists(file_path): return jsonify({"error": "Asset file not found on disk"}), 404 # Serve the file (Quart uses attachment_filename instead of download_name) return await send_file( file_path, mimetype=asset.get('mime_type', 'image/jpeg'), as_attachment=False, attachment_filename=asset.get('original_name', filename) ) except Exception as e: logger.error(f"Error in serve_asset: {e}") return jsonify({"error": str(e)}), 500 @focus_groups_bp.route('//assets/', methods=['DELETE']) @jwt_required() async def delete_asset(focus_group_id, filename): """Delete an uploaded asset.""" try: # Verify focus group exists focus_group = await FocusGroup.find_by_id(focus_group_id) if not focus_group: return jsonify({"error": "Focus group not found"}), 404 # Remove asset from focus group metadata success = await FocusGroup.remove_uploaded_asset(focus_group_id, filename) if not success: return jsonify({"error": "Failed to update focus group metadata"}), 500 # Remove file from disk - check both locations file_deleted = False # Try subdirectory location first upload_dir = get_upload_folder(focus_group_id) subdirectory_path = os.path.join(upload_dir, filename) if os.path.exists(subdirectory_path): os.remove(subdirectory_path) file_deleted = True # Try flat storage location if not file_deleted: base_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) main_upload_dir = os.path.join(base_dir, 'uploads') flat_path = os.path.join(main_upload_dir, filename) if os.path.exists(flat_path): os.remove(flat_path) file_deleted = True return jsonify({"message": "Asset deleted successfully"}), 200 except Exception as e: logger.error(f"Error in delete_asset: {e}") return jsonify({"error": str(e)}), 500 @focus_groups_bp.route('//assets/', methods=['PATCH']) @jwt_required() async def update_asset_name(focus_group_id, filename): """Update the user assigned name for an uploaded asset.""" try: # Verify focus group exists focus_group = await FocusGroup.find_by_id(focus_group_id) if not focus_group: return jsonify({"error": "Focus group not found"}), 404 # Get request data data = await request.get_json() if not data or 'user_assigned_name' not in data: return jsonify({"error": "Missing user_assigned_name field"}), 400 user_assigned_name = data['user_assigned_name'] # Validate that the asset exists assets = focus_group.get('uploaded_assets', []) asset = next((a for a in assets if a.get('filename') == filename), None) if not asset: return jsonify({"error": "Asset not found"}), 404 # Update the asset name success = await FocusGroup.update_asset_name(focus_group_id, filename, user_assigned_name) if not success: return jsonify({"error": "Failed to update asset name"}), 500 return jsonify({ "message": "Asset name updated successfully", "filename": filename, "user_assigned_name": user_assigned_name }), 200 except Exception as e: logger.error(f"Error in update_asset_name: {e}") return jsonify({"error": str(e)}), 500 @focus_groups_bp.route('//test-endpoint', methods=['POST']) @jwt_required() def test_endpoint(focus_group_id): """Test endpoint to verify routing is working.""" logger.debug(f"🔍 TEST ENDPOINT: Called for focus group {focus_group_id}") return jsonify({"message": "Test endpoint reached", "focus_group_id": focus_group_id}), 200 @focus_groups_bp.route('//test-websocket', methods=['POST']) @jwt_required() def test_websocket_emission(focus_group_id): """GPT-5 Sanity Check: Test WebSocket emission end-to-end.""" from app.models.focus_group import emit_websocket_event logger.debug(f"🔧 GPT-5 TEST: Testing WebSocket emission for focus group {focus_group_id}") # Test simple message emission as GPT-5 suggested emit_websocket_event("message_update", focus_group_id, { "id": "test-ping-" + str(uuid.uuid4())[:8], "text": "🔧 GPT-5 Test Ping", "sender": {"name": "Test System"}, "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat() }) return jsonify({ "message": "GPT-5 WebSocket test emission sent", "focus_group_id": focus_group_id, "event": "message_update" }), 200 async def _run_describe_asset_bg(app, task_id, user_id, focus_group_id, asset_filename): from app.websocket_manager_async import get_async_websocket_manager websocket_manager = get_async_websocket_manager() async with app.app_context(): try: description = await ImageDescriptionService.generate_description(focus_group_id, asset_filename) from app.services.task_manager import store_task_result await store_task_result(task_id, 'completed', result={ 'description': description, 'asset_filename': asset_filename }) await websocket_manager.emit_to_user(user_id, 'task_completed', { 'task_id': task_id, 'task_type': 'describe_asset' }) except asyncio.CancelledError: from app.services.task_manager import store_task_result await store_task_result(task_id, 'cancelled') await websocket_manager.emit_to_user(user_id, 'task_cancelled', { 'task_id': task_id, 'task_type': 'describe_asset' }) except Exception as e: from app.services.task_manager import store_task_result await store_task_result(task_id, 'failed', error=str(e)) await websocket_manager.emit_to_user(user_id, 'task_failed', { 'task_id': task_id, 'task_type': 'describe_asset', 'message': str(e) }) @focus_groups_bp.route('//describe-asset', methods=['POST']) @jwt_required() async def describe_asset(focus_group_id): """Generate AI description of an asset for enhanced creative review questions.""" logger.debug(f"API ENDPOINT: describe-asset called for focus group {focus_group_id}") try: # Verify focus group exists focus_group = await FocusGroup.find_by_id(focus_group_id) if not focus_group: logger.error(f"API: Focus group {focus_group_id} not found") return jsonify({"error": "Focus group not found"}), 404 # Get asset filename from request data = await request.get_json() if not data or 'asset_filename' not in data: logger.error(f"API: Missing asset_filename in request") return jsonify({"error": "Missing asset_filename in request"}), 400 asset_filename = data['asset_filename'] user_id = get_jwt_identity() from app.services.task_manager import get_task_manager from app.websocket_manager_async import get_async_websocket_manager task_manager = get_task_manager() task_id = task_manager.generate_task_id() websocket_manager = get_async_websocket_manager() app = current_app._get_current_object() bg_task = asyncio.create_task( _run_describe_asset_bg(app, task_id, user_id, focus_group_id, asset_filename) ) await task_manager.register_task(bg_task, 'describe_asset', user_id, {'focus_group_id': focus_group_id, 'asset_filename': asset_filename}, task_id=task_id) await websocket_manager.emit_to_user(user_id, 'task_started', { 'task_id': task_id, 'task_type': 'describe_asset', 'message': f'Started generating description for {asset_filename}' }) return jsonify({'task_id': task_id, 'message': 'Asset description generation started'}), 202 except Exception as e: logger.error(f"Error in describe_asset: {e}") return jsonify({"error": str(e)}), 500