Data isolation:
- GET /tasks/<id>: verify requesting user owns the task (403 if not)
- DELETE /tasks/<id>: same ownership check
- GET /tasks/status: add @jwt_required()
- GET /personas/<id>: add ownership check (403 if created_by != user)
- GET /focus-groups/<id>: add ownership check
- GET /focus-groups/<id>/messages: add ownership check
- POST/DELETE /focus-groups/<id>/participants: add ownership check

Fix conversation/decision 500:
- Convert POST /conversation/decision to async 202+background (was synchronous LLM → timed out / LLM errors → 500)
- Frontend polls waitForTaskResult for decision result before calling generateResponseAsync
- GET /conversation/insights: return empty insights (200) on LLM error instead of 500

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
164 lines
No EOL
5.4 KiB
Python
Executable file
164 lines
No EOL
5.4 KiB
Python
Executable file
"""
|
|
Task management routes for handling cancellable operations.
|
|
"""
|
|
|
|
from quart import Blueprint, jsonify, request
|
|
from app.services.task_manager import get_task_manager
|
|
from app.websocket_manager_async import get_async_websocket_manager
|
|
from app.auth.quart_jwt import jwt_required, get_jwt_identity
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
tasks_bp = Blueprint('tasks', __name__)
|
|
|
|
|
|
@tasks_bp.route('/<task_id>', methods=['GET'])
@jwt_required()
async def get_task_result(task_id: str):
    """
    Report the status (and result, once available) of a single task.

    Meant to be polled by clients waiting on a background task.

    Args:
        task_id: Identifier of the task, taken from the URL path.

    Returns:
        200 with {task_id, status, task_type, result?, error?} when the task
            exists and the caller is allowed to see it.
        404 when the task manager has no info for ``task_id`` (expired or
            never existed).
        403 when the task records a ``user_id`` different from the JWT
            identity of the caller.
        500 when any unexpected exception is raised while handling the request.
    """
    try:
        requester = get_jwt_identity()
        manager = get_task_manager()

        info = await manager.get_task_info(task_id)
        if not info:
            return jsonify({'error': 'Task not found or expired', 'task_id': task_id}), 404

        # Only tasks that record an owner are access-restricted; a task with
        # an empty user_id passes this check for any authenticated caller.
        owner = info.user_id
        if owner and owner != requester:
            return jsonify({'error': 'Access denied', 'task_id': task_id}), 403

        payload = await manager.get_task_status_dict(task_id)
        return jsonify(payload), 200
    except Exception as exc:
        logger.error(f"Error fetching task result {task_id}: {str(exc)}")
        return jsonify({'error': 'Internal server error', 'task_id': task_id}), 500
|
|
|
|
|
|
@tasks_bp.route('/<task_id>', methods=['DELETE'])
@jwt_required()
async def cancel_task(task_id: str):
    """
    Cancel a running task by its ID.

    The task is looked up first so that ownership can be verified and so the
    task's type and owner are still available for the notification after the
    cancellation has happened.

    Args:
        task_id: The unique identifier of the task to cancel

    Returns:
        200 with {message, task_id, task_type} when the task was cancelled.
        404 when the task is unknown, or when the task manager reports that
            it could not be cancelled (not found or already completed).
        403 when the task records a ``user_id`` different from the JWT
            identity of the caller.
        500 when an unexpected exception is raised before the cancellation
            has been recorded.
    """
    try:
        task_manager = get_task_manager()

        # Get task info before cancellation for ownership check + WebSocket notification
        task_info = await task_manager.get_task_info(task_id)
        if not task_info:
            return jsonify({'error': 'Task not found or already completed', 'task_id': task_id}), 404

        user_id = get_jwt_identity()
        if task_info.user_id and task_info.user_id != user_id:
            return jsonify({'error': 'Access denied', 'task_id': task_id}), 403

        # Attempt to cancel the task
        cancelled = await task_manager.cancel_task(task_id)
        if not cancelled:
            return jsonify({
                'error': 'Task not found or already completed',
                'task_id': task_id
            }), 404

        await task_manager.store_result(task_id, 'cancelled')

        # Send WebSocket notification about cancellation. At this point the
        # task is already cancelled, so the notification is best-effort: a
        # failure here (or while building the message) must not turn a
        # successful cancellation into a 500 response.
        try:
            websocket_manager = get_async_websocket_manager()
            if task_info.user_id:
                await websocket_manager.emit_to_user(
                    task_info.user_id,
                    'task_cancelled',
                    {
                        'task_id': task_id,
                        'task_type': task_info.task_type,
                        'message': f'{task_info.task_type.replace("_", " ").title()} cancelled successfully'
                    }
                )
        except Exception:
            logger.exception(f"Failed to send cancellation notification for task {task_id}")

        logger.info(f"Successfully cancelled task {task_id}")

        return jsonify({
            'message': 'Task cancelled successfully',
            'task_id': task_id,
            'task_type': task_info.task_type
        }), 200

    except Exception as e:
        logger.error(f"Error cancelling task {task_id}: {str(e)}")
        return jsonify({
            'error': 'Internal server error while cancelling task',
            'task_id': task_id
        }), 500
|
|
|
|
|
|
@tasks_bp.route('/user/me', methods=['GET'])
@jwt_required()
async def get_user_tasks():
    """
    Get all active tasks for the authenticated user.

    Returns:
        200 with {tasks, count}, where each entry of ``tasks`` carries
            task_id, task_type, status, created_at (ISO 8601 string) and
            metadata.
        500 when an unexpected exception is raised while collecting the tasks.
    """
    # Bound before the try so the except handler can always log it, even when
    # get_jwt_identity() itself is the call that fails.
    user_id = None
    try:
        user_id = get_jwt_identity()
        task_manager = get_task_manager()
        user_tasks = await task_manager.get_user_tasks(user_id)

        # Convert task info to JSON-serializable format
        tasks_data = [
            {
                'task_id': task_id,
                'task_type': task_info.task_type,
                'status': task_info.status,
                'created_at': task_info.created_at.isoformat(),
                'metadata': task_info.metadata
            }
            for task_id, task_info in user_tasks.items()
        ]

        return jsonify({
            'tasks': tasks_data,
            'count': len(tasks_data)
        }), 200

    except Exception as e:
        logger.error(f"Error fetching tasks for user {user_id}: {str(e)}")
        return jsonify({
            'error': 'Internal server error while fetching user tasks'
        }), 500
|
|
|
|
|
|
@tasks_bp.route('/status', methods=['GET'])
@jwt_required()
async def get_task_status():
    """
    Report overall task manager health (for debugging/monitoring).

    Returns:
        200 with {active_tasks, status}, where ``active_tasks`` is the count
            reported by the task manager and ``status`` is 'operational'.
        500 when an unexpected exception is raised while reading the count.
    """
    try:
        manager = get_task_manager()
        stats = {
            'active_tasks': await manager.get_active_task_count(),
            'status': 'operational'
        }
        return jsonify(stats), 200
    except Exception as exc:
        logger.error(f"Error fetching task manager status: {str(exc)}")
        failure = {'error': 'Internal server error while fetching status'}
        return jsonify(failure), 500