""" Task management routes for handling cancellable operations. """ from quart import Blueprint, jsonify, request from app.services.task_manager import get_task_manager from app.websocket_manager_async import get_async_websocket_manager from app.auth.quart_jwt import jwt_required, get_jwt_identity import logging logger = logging.getLogger(__name__) tasks_bp = Blueprint('tasks', __name__) @tasks_bp.route('/', methods=['GET']) @jwt_required() async def get_task_result(task_id: str): """ Poll for task status and result. Returns 200 with {task_id, status, task_type, result?, error?} if found. Returns 404 if task not found (expired or never existed). Returns 403 if task belongs to a different user. """ try: user_id = get_jwt_identity() task_manager = get_task_manager() task_info = await task_manager.get_task_info(task_id) if not task_info: return jsonify({'error': 'Task not found or expired', 'task_id': task_id}), 404 if task_info.user_id and task_info.user_id != user_id: return jsonify({'error': 'Access denied', 'task_id': task_id}), 403 data = await task_manager.get_task_status_dict(task_id) return jsonify(data), 200 except Exception as e: logger.error(f"Error fetching task result {task_id}: {str(e)}") return jsonify({'error': 'Internal server error', 'task_id': task_id}), 500 @tasks_bp.route('/', methods=['DELETE']) @jwt_required() async def cancel_task(task_id: str): """ Cancel a running task by its ID. Args: task_id: The unique identifier of the task to cancel Returns: JSON response indicating success or failure """ try: task_manager = get_task_manager() # Get task info before cancellation for ownership check + WebSocket notification task_info = await task_manager.get_task_info(task_id) if not task_info: return jsonify({'error': 'Task not found or already completed', 'task_id': task_id}), 404 user_id = get_jwt_identity() if task_info.user_id and task_info.user_id != user_id: return jsonify({'error': 'Access denied', 'task_id': task_id}), 403 # Attempt to cancel the task cancelled = await task_manager.cancel_task(task_id) if not cancelled: return jsonify({ 'error': 'Task not found or already completed', 'task_id': task_id }), 404 await task_manager.store_result(task_id, 'cancelled') # Send WebSocket notification about cancellation websocket_manager = get_async_websocket_manager() if task_info and task_info.user_id: await websocket_manager.emit_to_user( task_info.user_id, 'task_cancelled', { 'task_id': task_id, 'task_type': task_info.task_type, 'message': f'{task_info.task_type.replace("_", " ").title()} cancelled successfully' } ) logger.info(f"Successfully cancelled task {task_id}") return jsonify({ 'message': 'Task cancelled successfully', 'task_id': task_id, 'task_type': task_info.task_type if task_info else None }), 200 except Exception as e: logger.error(f"Error cancelling task {task_id}: {str(e)}") return jsonify({ 'error': 'Internal server error while cancelling task', 'task_id': task_id }), 500 @tasks_bp.route('/user/me', methods=['GET']) @jwt_required() async def get_user_tasks(): """ Get all active tasks for the authenticated user. Returns: JSON response with list of active tasks """ try: user_id = get_jwt_identity() task_manager = get_task_manager() user_tasks = await task_manager.get_user_tasks(user_id) # Convert task info to JSON-serializable format tasks_data = [] for task_id, task_info in user_tasks.items(): tasks_data.append({ 'task_id': task_id, 'task_type': task_info.task_type, 'status': task_info.status, 'created_at': task_info.created_at.isoformat(), 'metadata': task_info.metadata }) return jsonify({ 'tasks': tasks_data, 'count': len(tasks_data) }), 200 except Exception as e: logger.error(f"Error fetching tasks for user {user_id}: {str(e)}") return jsonify({ 'error': 'Internal server error while fetching user tasks' }), 500 @tasks_bp.route('/status', methods=['GET']) @jwt_required() async def get_task_status(): """ Get overall task manager status (for debugging/monitoring). Returns: JSON response with task manager statistics """ try: task_manager = get_task_manager() active_count = await task_manager.get_active_task_count() return jsonify({ 'active_tasks': active_count, 'status': 'operational' }), 200 except Exception as e: logger.error(f"Error fetching task manager status: {str(e)}") return jsonify({ 'error': 'Internal server error while fetching status' }), 500