Migrate task result delivery from WebSocket to HTTP polling
Backend:
- task_manager.py: add result/error/completed_at storage, TTL sweeper (5min), store_task_result() helper
- tasks.py: add GET /<task_id> endpoint returning stored result; cancel route stores 'cancelled' status
- __init__.py: start TTL sweeper on app startup
- All 8 bg functions: store result before emitting lightweight WS hint (no payload data)
Frontend:
- src/lib/taskPolling.ts: waitForTaskResult() — polls GET /tasks/{id} every 2s, WS hint triggers immediate poll, 5min timeout
- src/hooks/useTaskPolling.ts: drop-in replacement for useCancellableGeneration using polling
- Migrate 6 Promise-based WS listeners → waitForTaskResult() in DiscussionPanel, FocusGroupSession (×2), PersonaProfile, PersonaModificationModal, useDiscussionGuideGeneration
- Migrate 3 hook-based consumers → useTaskPolling in AIRecruiter, SyntheticUsers, BulkExportProgressModal
Fixes WS Promise leak: polling survives disconnects, background tabs, page reloads.
WS events retained as zero-payload hints for near-zero latency when connected.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
c7034634e3
commit
1b387daacf
18 changed files with 659 additions and 394 deletions
|
|
@ -156,6 +156,12 @@ def create_app():
|
|||
app.register_blueprint(folders_bp, url_prefix='/api/folders')
|
||||
app.register_blueprint(tasks_bp, url_prefix='/api/tasks')
|
||||
|
||||
@app.before_serving
|
||||
async def start_task_sweeper():
|
||||
import asyncio
|
||||
from app.services.task_manager import get_task_manager
|
||||
asyncio.create_task(get_task_manager().start_sweeper())
|
||||
|
||||
# Health check endpoint
|
||||
@app.route('/api/health', methods=['GET'])
|
||||
def health_check():
|
||||
|
|
|
|||
|
|
@ -1138,24 +1138,29 @@ async def _run_persona_generation_bg(
|
|||
|
||||
print(f"🎉 Backend: Unified generation complete - {len(completed_personas)} personas created")
|
||||
|
||||
await websocket_manager.emit_to_user(user_id, 'task_completed', {
|
||||
'task_id': task_id,
|
||||
'task_type': 'persona_full_generation',
|
||||
'message': f'Successfully generated {len(completed_personas)} personas',
|
||||
from app.services.task_manager import store_task_result
|
||||
await store_task_result(task_id, 'completed', result={
|
||||
'personas_created': len(completed_personas),
|
||||
'errors_count': len(failed_personas),
|
||||
'partial_success': len(failed_personas) > 0 and len(completed_personas) > 0
|
||||
})
|
||||
await websocket_manager.emit_to_user(user_id, 'task_completed', {
|
||||
'task_id': task_id,
|
||||
'task_type': 'persona_full_generation'
|
||||
})
|
||||
|
||||
except asyncio.CancelledError:
|
||||
app.logger.info(f"Persona generation task {task_id} cancelled")
|
||||
from app.services.task_manager import store_task_result
|
||||
await store_task_result(task_id, 'cancelled')
|
||||
await websocket_manager.emit_to_user(user_id, 'task_cancelled', {
|
||||
'task_id': task_id,
|
||||
'task_type': 'persona_full_generation',
|
||||
'message': 'Generation cancelled'
|
||||
'task_type': 'persona_full_generation'
|
||||
})
|
||||
except Exception as e:
|
||||
app.logger.error(f"Persona generation task {task_id} failed: {str(e)}")
|
||||
from app.services.task_manager import store_task_result
|
||||
await store_task_result(task_id, 'failed', error=str(e))
|
||||
await websocket_manager.emit_to_user(user_id, 'task_failed', {
|
||||
'task_id': task_id,
|
||||
'task_type': 'persona_full_generation',
|
||||
|
|
|
|||
|
|
@ -209,15 +209,22 @@ Be genuine and specific in your feedback, drawing on your personal experiences a
|
|||
message_id = FocusGroup.add_message(focus_group_id, message_data)
|
||||
|
||||
if message_id:
|
||||
await websocket_manager.emit_to_user(user_id, 'task_completed', {
|
||||
'task_id': task_id,
|
||||
'task_type': 'generate_response',
|
||||
from datetime import datetime
|
||||
from app.services.task_manager import store_task_result
|
||||
await store_task_result(task_id, 'completed', result={
|
||||
'response': response_text,
|
||||
'message_id': message_id,
|
||||
'persona_id': persona_id,
|
||||
'focus_group_id': focus_group_id
|
||||
'focus_group_id': focus_group_id,
|
||||
'timestamp': datetime.now().isoformat()
|
||||
})
|
||||
await websocket_manager.emit_to_user(user_id, 'task_completed', {
|
||||
'task_id': task_id,
|
||||
'task_type': 'generate_response'
|
||||
})
|
||||
else:
|
||||
from app.services.task_manager import store_task_result
|
||||
await store_task_result(task_id, 'failed', error='Failed to save message to database')
|
||||
await websocket_manager.emit_to_user(user_id, 'task_failed', {
|
||||
'task_id': task_id,
|
||||
'task_type': 'generate_response',
|
||||
|
|
@ -225,13 +232,16 @@ Be genuine and specific in your feedback, drawing on your personal experiences a
|
|||
})
|
||||
|
||||
except asyncio.CancelledError:
|
||||
from app.services.task_manager import store_task_result
|
||||
await store_task_result(task_id, 'cancelled')
|
||||
await websocket_manager.emit_to_user(user_id, 'task_cancelled', {
|
||||
'task_id': task_id,
|
||||
'task_type': 'generate_response',
|
||||
'message': 'Response generation was cancelled'
|
||||
'task_type': 'generate_response'
|
||||
})
|
||||
except Exception as e:
|
||||
app.logger.error(f"Error in _run_generate_response_bg: {str(e)}")
|
||||
from app.services.task_manager import store_task_result
|
||||
await store_task_result(task_id, 'failed', error=str(e))
|
||||
await websocket_manager.emit_to_user(user_id, 'task_failed', {
|
||||
'task_id': task_id,
|
||||
'task_type': 'generate_response',
|
||||
|
|
@ -316,6 +326,8 @@ async def _run_key_themes_bg(app, task_id, user_id, focus_group_id, temperature)
|
|||
theme_ids = await FocusGroup.add_generated_themes(focus_group_id, themes)
|
||||
|
||||
if not theme_ids:
|
||||
from app.services.task_manager import store_task_result
|
||||
await store_task_result(task_id, 'failed', error='The themes were generated but could not be saved to the database')
|
||||
await websocket_manager.emit_to_user(user_id, 'task_failed', {
|
||||
'task_id': task_id,
|
||||
'task_type': 'key_themes_generation',
|
||||
|
|
@ -334,21 +346,27 @@ async def _run_key_themes_bg(app, task_id, user_id, focus_group_id, temperature)
|
|||
"source": "generated"
|
||||
})
|
||||
|
||||
await websocket_manager.emit_to_user(user_id, 'task_completed', {
|
||||
'task_id': task_id,
|
||||
'task_type': 'key_themes_generation',
|
||||
from app.services.task_manager import store_task_result
|
||||
await store_task_result(task_id, 'completed', result={
|
||||
'themes': formatted_themes,
|
||||
'focus_group_id': focus_group_id
|
||||
})
|
||||
await websocket_manager.emit_to_user(user_id, 'task_completed', {
|
||||
'task_id': task_id,
|
||||
'task_type': 'key_themes_generation'
|
||||
})
|
||||
|
||||
except asyncio.CancelledError:
|
||||
from app.services.task_manager import store_task_result
|
||||
await store_task_result(task_id, 'cancelled')
|
||||
await websocket_manager.emit_to_user(user_id, 'task_cancelled', {
|
||||
'task_id': task_id,
|
||||
'task_type': 'key_themes_generation',
|
||||
'message': 'Key themes generation was cancelled'
|
||||
'task_type': 'key_themes_generation'
|
||||
})
|
||||
except Exception as e:
|
||||
app.logger.error(f"Error in _run_key_themes_bg: {str(e)}")
|
||||
from app.services.task_manager import store_task_result
|
||||
await store_task_result(task_id, 'failed', error=str(e))
|
||||
await websocket_manager.emit_to_user(user_id, 'task_failed', {
|
||||
'task_id': task_id,
|
||||
'task_type': 'key_themes_generation',
|
||||
|
|
|
|||
|
|
@ -833,23 +833,31 @@ async def _run_discussion_guide_generation_bg(app, task_id, user_id, focus_group
|
|||
except Exception as summary_err:
|
||||
app.logger.warning(f"Failed to generate summary (non-critical): {summary_err}")
|
||||
|
||||
await websocket_manager.emit_to_user(user_id, 'task_completed', {
|
||||
'task_id': task_id,
|
||||
'task_type': 'discussion_guide_generation',
|
||||
'message': f'Successfully generated discussion guide for {focus_group_name}',
|
||||
from app.services.task_manager import store_task_result
|
||||
await store_task_result(task_id, 'completed', result={
|
||||
'discussionGuide': discussion_guide,
|
||||
'summary': summary
|
||||
})
|
||||
await websocket_manager.emit_to_user(user_id, 'task_completed', {
|
||||
'task_id': task_id,
|
||||
'task_type': 'discussion_guide_generation',
|
||||
'message': f'Successfully generated discussion guide for {focus_group_name}'
|
||||
})
|
||||
|
||||
except asyncio.CancelledError:
|
||||
from app.services.task_manager import store_task_result
|
||||
await store_task_result(task_id, 'cancelled')
|
||||
await websocket_manager.emit_to_user(user_id, 'task_cancelled', {
|
||||
'task_id': task_id,
|
||||
'message': 'Discussion guide generation cancelled'
|
||||
'task_type': 'discussion_guide_generation'
|
||||
})
|
||||
except Exception as e:
|
||||
app.logger.error(f"Discussion guide generation background task failed: {str(e)}")
|
||||
from app.services.task_manager import store_task_result
|
||||
await store_task_result(task_id, 'failed', error=str(e))
|
||||
await websocket_manager.emit_to_user(user_id, 'task_failed', {
|
||||
'task_id': task_id,
|
||||
'task_type': 'discussion_guide_generation',
|
||||
'message': str(e)
|
||||
})
|
||||
|
||||
|
|
@ -1598,18 +1606,25 @@ async def _run_describe_asset_bg(app, task_id, user_id, focus_group_id, asset_fi
|
|||
async with app.app_context():
|
||||
try:
|
||||
description = await ImageDescriptionService.generate_description(focus_group_id, asset_filename)
|
||||
await websocket_manager.emit_to_user(user_id, 'task_completed', {
|
||||
'task_id': task_id,
|
||||
'task_type': 'describe_asset',
|
||||
from app.services.task_manager import store_task_result
|
||||
await store_task_result(task_id, 'completed', result={
|
||||
'description': description,
|
||||
'asset_filename': asset_filename
|
||||
})
|
||||
await websocket_manager.emit_to_user(user_id, 'task_completed', {
|
||||
'task_id': task_id,
|
||||
'task_type': 'describe_asset'
|
||||
})
|
||||
except asyncio.CancelledError:
|
||||
from app.services.task_manager import store_task_result
|
||||
await store_task_result(task_id, 'cancelled')
|
||||
await websocket_manager.emit_to_user(user_id, 'task_cancelled', {
|
||||
'task_id': task_id,
|
||||
'task_type': 'describe_asset'
|
||||
})
|
||||
except Exception as e:
|
||||
from app.services.task_manager import store_task_result
|
||||
await store_task_result(task_id, 'failed', error=str(e))
|
||||
await websocket_manager.emit_to_user(user_id, 'task_failed', {
|
||||
'task_id': task_id,
|
||||
'task_type': 'describe_asset',
|
||||
|
|
|
|||
|
|
@ -210,20 +210,29 @@ async def _run_modify_persona_bg(app, task_id, user_id, persona_id, modification
|
|||
verbosity=verbosity,
|
||||
preview_only=preview_only
|
||||
)
|
||||
await websocket_manager.emit_to_user(user_id, 'task_completed', {
|
||||
'task_id': task_id,
|
||||
'task_type': 'persona_modification',
|
||||
from app.services.task_manager import store_task_result
|
||||
await store_task_result(task_id, 'completed', result={
|
||||
'persona': make_serializable(modified_persona_data),
|
||||
'preview_only': preview_only
|
||||
})
|
||||
await websocket_manager.emit_to_user(user_id, 'task_completed', {
|
||||
'task_id': task_id,
|
||||
'task_type': 'persona_modification'
|
||||
})
|
||||
except asyncio.CancelledError:
|
||||
from app.services.task_manager import store_task_result
|
||||
await store_task_result(task_id, 'cancelled')
|
||||
await websocket_manager.emit_to_user(user_id, 'task_cancelled', {'task_id': task_id})
|
||||
except PersonaModificationError as e:
|
||||
logger.error(f"Persona modification error: {e}")
|
||||
await websocket_manager.emit_to_user(user_id, 'task_failed', {'task_id': task_id, 'message': str(e)})
|
||||
from app.services.task_manager import store_task_result
|
||||
await store_task_result(task_id, 'failed', error=str(e))
|
||||
await websocket_manager.emit_to_user(user_id, 'task_failed', {'task_id': task_id, 'task_type': 'persona_modification', 'message': str(e)})
|
||||
except Exception as e:
|
||||
logger.error(f"Unexpected error in _run_modify_persona_bg: {e}")
|
||||
await websocket_manager.emit_to_user(user_id, 'task_failed', {'task_id': task_id, 'message': str(e)})
|
||||
from app.services.task_manager import store_task_result
|
||||
await store_task_result(task_id, 'failed', error=str(e))
|
||||
await websocket_manager.emit_to_user(user_id, 'task_failed', {'task_id': task_id, 'task_type': 'persona_modification', 'message': str(e)})
|
||||
|
||||
@personas_bp.route('/<persona_id>/export-profile', methods=['POST'])
|
||||
@jwt_required()
|
||||
|
|
@ -279,32 +288,41 @@ async def _run_export_profile_bg(app, task_id, user_id, persona_id, llm_model, t
|
|||
llm_model=llm_model,
|
||||
temperature=temperature
|
||||
)
|
||||
from app.services.task_manager import store_task_result
|
||||
if result.get('success'):
|
||||
await websocket_manager.emit_to_user(user_id, 'task_completed', {
|
||||
'task_id': task_id,
|
||||
'task_type': 'export_profile',
|
||||
await store_task_result(task_id, 'completed', result={
|
||||
'success': True,
|
||||
'markdown_content': result['markdown_content'],
|
||||
'persona_name': result['persona_name'],
|
||||
'model_used': result.get('model_used')
|
||||
})
|
||||
await websocket_manager.emit_to_user(user_id, 'task_completed', {
|
||||
'task_id': task_id,
|
||||
'task_type': 'export_profile'
|
||||
})
|
||||
else:
|
||||
logger.debug(f"LLM generation failed, using fallback for persona {persona_id}")
|
||||
fallback_markdown = export_service.generate_fallback_markdown(persona_data)
|
||||
await websocket_manager.emit_to_user(user_id, 'task_completed', {
|
||||
'task_id': task_id,
|
||||
'task_type': 'export_profile',
|
||||
await store_task_result(task_id, 'completed', result={
|
||||
'success': True,
|
||||
'markdown_content': fallback_markdown,
|
||||
'persona_name': persona_data.get('name', 'Unknown'),
|
||||
'model_used': 'fallback',
|
||||
'warning': 'Used fallback formatting due to LLM error'
|
||||
})
|
||||
await websocket_manager.emit_to_user(user_id, 'task_completed', {
|
||||
'task_id': task_id,
|
||||
'task_type': 'export_profile'
|
||||
})
|
||||
except asyncio.CancelledError:
|
||||
from app.services.task_manager import store_task_result
|
||||
await store_task_result(task_id, 'cancelled')
|
||||
await websocket_manager.emit_to_user(user_id, 'task_cancelled', {'task_id': task_id})
|
||||
except Exception as e:
|
||||
logger.error(f"Unexpected error in _run_export_profile_bg: {e}")
|
||||
await websocket_manager.emit_to_user(user_id, 'task_failed', {'task_id': task_id, 'message': str(e)})
|
||||
from app.services.task_manager import store_task_result
|
||||
await store_task_result(task_id, 'failed', error=str(e))
|
||||
await websocket_manager.emit_to_user(user_id, 'task_failed', {'task_id': task_id, 'task_type': 'export_profile', 'message': str(e)})
|
||||
|
||||
@personas_bp.route('/bulk-export', methods=['POST'])
|
||||
@jwt_required()
|
||||
|
|
|
|||
|
|
@ -13,6 +13,25 @@ logger = logging.getLogger(__name__)
|
|||
tasks_bp = Blueprint('tasks', __name__)
|
||||
|
||||
|
||||
@tasks_bp.route('/<task_id>', methods=['GET'])
|
||||
@jwt_required()
|
||||
async def get_task_result(task_id: str):
|
||||
"""
|
||||
Poll for task status and result.
|
||||
Returns 200 with {task_id, status, task_type, result?, error?} if found.
|
||||
Returns 404 if task not found (expired or never existed).
|
||||
"""
|
||||
try:
|
||||
task_manager = get_task_manager()
|
||||
data = await task_manager.get_task_status_dict(task_id)
|
||||
if not data:
|
||||
return jsonify({'error': 'Task not found or expired', 'task_id': task_id}), 404
|
||||
return jsonify(data), 200
|
||||
except Exception as e:
|
||||
logger.error(f"Error fetching task result {task_id}: {str(e)}")
|
||||
return jsonify({'error': 'Internal server error', 'task_id': task_id}), 500
|
||||
|
||||
|
||||
@tasks_bp.route('/<task_id>', methods=['DELETE'])
|
||||
@jwt_required()
|
||||
async def cancel_task(task_id: str):
|
||||
|
|
@ -39,7 +58,9 @@ async def cancel_task(task_id: str):
|
|||
'error': 'Task not found or already completed',
|
||||
'task_id': task_id
|
||||
}), 404
|
||||
|
||||
|
||||
await task_manager.store_result(task_id, 'cancelled')
|
||||
|
||||
# Send WebSocket notification about cancellation
|
||||
websocket_manager = get_async_websocket_manager()
|
||||
if task_info and task_info.user_id:
|
||||
|
|
|
|||
|
|
@ -454,35 +454,37 @@ class BulkPersonaExportService:
|
|||
if export_dir and os.path.exists(export_dir):
|
||||
import shutil
|
||||
shutil.rmtree(export_dir, ignore_errors=True)
|
||||
|
||||
|
||||
from app.services.task_manager import store_task_result
|
||||
await store_task_result(task_id, 'cancelled')
|
||||
if self.websocket_manager:
|
||||
await self.websocket_manager.emit_to_user(
|
||||
user_id,
|
||||
'task_cancelled',
|
||||
{
|
||||
'task_id': task_id,
|
||||
'message': 'Export cancelled successfully'
|
||||
}
|
||||
{'task_id': task_id}
|
||||
)
|
||||
|
||||
|
||||
return False, "Export cancelled by user", task_id
|
||||
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Bulk export error: {e}")
|
||||
if export_dir and os.path.exists(export_dir):
|
||||
import shutil
|
||||
shutil.rmtree(export_dir, ignore_errors=True)
|
||||
|
||||
|
||||
from app.services.task_manager import store_task_result
|
||||
await store_task_result(task_id, 'failed', error=str(e))
|
||||
if self.websocket_manager:
|
||||
await self.websocket_manager.emit_to_user(
|
||||
user_id,
|
||||
'task_failed',
|
||||
{
|
||||
'task_id': task_id,
|
||||
'task_type': 'bulk_persona_export',
|
||||
'message': f'Export failed: {str(e)}'
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
return False, f"Export failed: {str(e)}", task_id
|
||||
|
||||
async def _export_as_markdown_zip(
|
||||
|
|
@ -551,20 +553,22 @@ class BulkPersonaExportService:
|
|||
total_personas, total_personas
|
||||
)
|
||||
|
||||
from app.services.task_manager import store_task_result
|
||||
await store_task_result(task_id, 'completed', result={
|
||||
'file_path': zip_path
|
||||
})
|
||||
if self.websocket_manager:
|
||||
await self.websocket_manager.emit_to_user(
|
||||
user_id,
|
||||
'task_completed',
|
||||
{
|
||||
'task_id': task_id,
|
||||
'message': f'Successfully exported {total_personas} persona profiles',
|
||||
'file_path': zip_path,
|
||||
'file_size': file_size
|
||||
'task_type': 'bulk_persona_export'
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
return True, zip_path, task_id
|
||||
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Markdown ZIP export error: {e}")
|
||||
return False, f"Markdown export failed: {str(e)}", task_id
|
||||
|
|
@ -618,20 +622,22 @@ class BulkPersonaExportService:
|
|||
total_personas, total_personas
|
||||
)
|
||||
|
||||
from app.services.task_manager import store_task_result
|
||||
await store_task_result(task_id, 'completed', result={
|
||||
'file_path': zip_path
|
||||
})
|
||||
if self.websocket_manager:
|
||||
await self.websocket_manager.emit_to_user(
|
||||
user_id,
|
||||
'task_completed',
|
||||
{
|
||||
'task_id': task_id,
|
||||
'message': f'Successfully exported {total_personas} persona JSON files',
|
||||
'file_path': zip_path,
|
||||
'file_size': os.path.getsize(zip_path)
|
||||
'task_type': 'bulk_persona_export'
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
return True, zip_path, task_id
|
||||
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"JSON ZIP export error: {e}")
|
||||
return False, f"JSON export failed: {str(e)}", task_id
|
||||
|
|
@ -732,20 +738,22 @@ class BulkPersonaExportService:
|
|||
total_personas, total_personas
|
||||
)
|
||||
|
||||
from app.services.task_manager import store_task_result
|
||||
await store_task_result(task_id, 'completed', result={
|
||||
'file_path': zip_path
|
||||
})
|
||||
if self.websocket_manager:
|
||||
await self.websocket_manager.emit_to_user(
|
||||
user_id,
|
||||
'task_completed',
|
||||
{
|
||||
'task_id': task_id,
|
||||
'message': f'Successfully exported {total_personas} persona CSV files',
|
||||
'file_path': zip_path,
|
||||
'file_size': file_size
|
||||
'task_type': 'bulk_persona_export'
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
return True, zip_path, task_id
|
||||
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"CSV ZIP export error: {e}")
|
||||
return False, f"CSV export failed: {str(e)}", task_id
|
||||
|
|
@ -25,6 +25,9 @@ class TaskInfo:
|
|||
self.metadata = metadata or {}
|
||||
self.created_at = datetime.now(timezone.utc)
|
||||
self.status = "running"
|
||||
self.result: Optional[Dict[str, Any]] = None
|
||||
self.error: Optional[str] = None
|
||||
self.completed_at: Optional[datetime] = None
|
||||
|
||||
|
||||
class TaskManager:
|
||||
|
|
@ -106,7 +109,8 @@ class TaskManager:
|
|||
# Cancel the task
|
||||
task_info.task.cancel()
|
||||
task_info.status = "cancelled"
|
||||
|
||||
task_info.completed_at = datetime.now(timezone.utc)
|
||||
|
||||
logger.info(f"Cancelled task {task_id} of type {task_info.task_type}")
|
||||
return True
|
||||
|
||||
|
|
@ -124,14 +128,57 @@ class TaskManager:
|
|||
if task_info.user_id == user_id and not task_info.task.done()
|
||||
}
|
||||
|
||||
async def _cleanup_completed_task(self, task_id: str):
|
||||
"""Internal method to clean up completed tasks."""
|
||||
async def store_result(self, task_id: str, status: str, result: Dict[str, Any] = None, error: str = None):
|
||||
"""Store task result for polling. Called by background functions."""
|
||||
async with self._task_lock:
|
||||
task_info = self._tasks.get(task_id)
|
||||
if task_info:
|
||||
logger.info(f"Cleaning up completed task {task_id}")
|
||||
del self._tasks[task_id]
|
||||
task_info.status = status # 'completed', 'failed', 'cancelled'
|
||||
task_info.result = result
|
||||
task_info.error = error
|
||||
task_info.completed_at = datetime.now(timezone.utc)
|
||||
logger.info(f"Stored result for task {task_id}: status={status}")
|
||||
|
||||
async def get_task_status_dict(self, task_id: str) -> Optional[Dict[str, Any]]:
|
||||
"""Get task status and result for polling endpoint."""
|
||||
async with self._task_lock:
|
||||
info = self._tasks.get(task_id)
|
||||
if not info:
|
||||
return None
|
||||
return {
|
||||
'task_id': info.task_id,
|
||||
'status': info.status,
|
||||
'task_type': info.task_type,
|
||||
'result': info.result,
|
||||
'error': info.error,
|
||||
'created_at': info.created_at.isoformat(),
|
||||
}
|
||||
|
||||
async def _cleanup_completed_task(self, task_id: str):
|
||||
"""Background task done — retain result for polling (TTL sweeper handles cleanup)."""
|
||||
logger.info(f"Task {task_id} asyncio Task finished — result retained for polling TTL")
|
||||
|
||||
RESULT_TTL_SECONDS = 300 # 5 minutes
|
||||
|
||||
async def sweep_expired_tasks(self):
|
||||
"""Remove tasks whose results have been retained past TTL."""
|
||||
now = datetime.now(timezone.utc)
|
||||
async with self._task_lock:
|
||||
expired = [
|
||||
tid for tid, info in self._tasks.items()
|
||||
if info.completed_at and (now - info.completed_at).total_seconds() > self.RESULT_TTL_SECONDS
|
||||
]
|
||||
for tid in expired:
|
||||
del self._tasks[tid]
|
||||
if expired:
|
||||
logger.info(f"Swept {len(expired)} expired task results")
|
||||
|
||||
async def start_sweeper(self):
|
||||
"""Background loop to periodically clean up expired task results."""
|
||||
while True:
|
||||
await asyncio.sleep(60)
|
||||
await self.sweep_expired_tasks()
|
||||
|
||||
async def get_active_task_count(self) -> int:
|
||||
"""Get the number of currently active tasks."""
|
||||
async with self._task_lock:
|
||||
|
|
@ -174,13 +221,18 @@ async def register_cancellable_task(
|
|||
async def cancel_task_by_id(task_id: str) -> bool:
|
||||
"""
|
||||
Convenience function to cancel a task by ID.
|
||||
|
||||
|
||||
Returns:
|
||||
True if task was found and cancelled, False otherwise
|
||||
"""
|
||||
return await get_task_manager().cancel_task(task_id)
|
||||
|
||||
|
||||
async def store_task_result(task_id: str, status: str, result: Dict[str, Any] = None, error: str = None):
|
||||
"""Convenience function to store a task result."""
|
||||
await get_task_manager().store_result(task_id, status, result, error)
|
||||
|
||||
|
||||
class CancellableTask:
|
||||
"""
|
||||
Context manager for creating cancellable tasks with automatic cleanup.
|
||||
|
|
|
|||
|
|
@ -9,8 +9,7 @@ import AIRecruiterForm, { formSchema } from './ai-recruiter/AIRecruiterForm';
|
|||
import PersonaReviewList from './ai-recruiter/PersonaReviewList';
|
||||
import { generateSyntheticPersonas } from '@/utils/personaGenerator';
|
||||
import { usePersonaStorage, GENERATED_PERSONAS_KEY } from '@/hooks/usePersonaStorage';
|
||||
import { useCancellableGeneration } from '@/hooks/useCancellableGeneration';
|
||||
import { getSocket } from '@/services/websocketServiceNew';
|
||||
import { useTaskPolling } from '@/hooks/useTaskPolling';
|
||||
import ProgressModal from '@/components/ui/ProgressModal';
|
||||
import { Persona } from "@/types/persona";
|
||||
|
||||
|
|
@ -24,10 +23,7 @@ export default function AIRecruiter({ targetFolderId, targetFolderName }: AIRecr
|
|||
const navigate = useNavigate();
|
||||
const { loadPersonas, savePersonas } = usePersonaStorage();
|
||||
|
||||
// Get WebSocket instance from singleton service
|
||||
const socket = getSocket();
|
||||
|
||||
const [generationState, generationControls] = useCancellableGeneration('persona generation', socket);
|
||||
const [generationState, generationControls] = useTaskPolling('persona generation');
|
||||
const [generatedPersonas, setGeneratedPersonas] = useState<Persona[]>([]);
|
||||
const [selectedPersonas, setSelectedPersonas] = useState<string[]>([]);
|
||||
const [showReview, setShowReview] = useState(false);
|
||||
|
|
@ -63,23 +59,22 @@ export default function AIRecruiter({ targetFolderId, targetFolderName }: AIRecr
|
|||
}
|
||||
}, [generationState.isGenerating, generationState.isCancelling, generationToastId]);
|
||||
|
||||
// Handle WebSocket task completion — personas arrive here
|
||||
// Handle task completion via polling result
|
||||
useEffect(() => {
|
||||
const handleTaskCompleted = (event: CustomEvent) => {
|
||||
const data = event.detail;
|
||||
if (data.task_id !== taskIdRef.current) return;
|
||||
if (!generationState.isComplete && !generationState.hasError) return;
|
||||
|
||||
generationControls.completeGeneration();
|
||||
if (generationToastId) {
|
||||
toast.dismiss(generationToastId);
|
||||
setGenerationToastId(null);
|
||||
}
|
||||
if (generationToastId) {
|
||||
toast.dismiss(generationToastId);
|
||||
setGenerationToastId(null);
|
||||
}
|
||||
|
||||
const count = data.personas_created || 0;
|
||||
const errorsCount = data.errors_count || 0;
|
||||
if (generationState.isComplete) {
|
||||
const result = generationState.result || {};
|
||||
const count = result.personas_created || 0;
|
||||
const errorsCount = result.errors_count || 0;
|
||||
|
||||
if (count > 0) {
|
||||
if (data.partial_success && errorsCount > 0) {
|
||||
if (result.partial_success && errorsCount > 0) {
|
||||
toast.success("Some personas generated successfully", {
|
||||
description: `${count} created, ${errorsCount} failed.`,
|
||||
duration: 8000
|
||||
|
|
@ -94,28 +89,11 @@ export default function AIRecruiter({ targetFolderId, targetFolderName }: AIRecr
|
|||
generationControls.failGeneration("No personas were generated");
|
||||
toast.error("Failed to generate personas", { description: "No personas were returned." });
|
||||
}
|
||||
};
|
||||
|
||||
const handleTaskFailed = (event: CustomEvent) => {
|
||||
const data = event.detail;
|
||||
if (data.task_id !== taskIdRef.current) return;
|
||||
|
||||
const errorMessage = data.message || "Please try again or adjust your parameters";
|
||||
generationControls.failGeneration(errorMessage);
|
||||
if (generationToastId) {
|
||||
toast.dismiss(generationToastId);
|
||||
setGenerationToastId(null);
|
||||
}
|
||||
} else if (generationState.hasError) {
|
||||
const errorMessage = generationState.error || "Please try again or adjust your parameters";
|
||||
toast.error("Failed to generate personas", { description: errorMessage, duration: 6000 });
|
||||
};
|
||||
|
||||
window.addEventListener('ws:task_completed', handleTaskCompleted as EventListener);
|
||||
window.addEventListener('ws:task_failed', handleTaskFailed as EventListener);
|
||||
return () => {
|
||||
window.removeEventListener('ws:task_completed', handleTaskCompleted as EventListener);
|
||||
window.removeEventListener('ws:task_failed', handleTaskFailed as EventListener);
|
||||
};
|
||||
}, [generationToastId, targetFolderName]);
|
||||
}
|
||||
}, [generationState.isComplete, generationState.hasError]);
|
||||
|
||||
async function onSubmit(values: z.infer<typeof formSchema>) {
|
||||
try {
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ import ReasoningPanel from './ReasoningPanel';
|
|||
import { Persona } from '@/types/persona';
|
||||
import { ModeEvent, Message } from './types';
|
||||
import { focusGroupsApi, focusGroupAiApi } from '@/lib/api';
|
||||
import { waitForTaskResult } from '@/lib/taskPolling';
|
||||
import { toast } from 'sonner';
|
||||
import ModeSwitchMarker from './ModeSwitchMarker';
|
||||
|
||||
|
|
@ -906,40 +907,24 @@ const DiscussionPanel = ({
|
|||
const taskId = response.data?.task_id;
|
||||
|
||||
if (response.status === 202 && taskId) {
|
||||
return new Promise((resolve, reject) => {
|
||||
const handleCompleted = (event: CustomEvent) => {
|
||||
const detail = event.detail;
|
||||
if (detail.task_id !== taskId) return;
|
||||
cleanup();
|
||||
resolve({
|
||||
data: {
|
||||
response: detail.response,
|
||||
message_id: detail.message_id,
|
||||
persona_id: detail.persona_id,
|
||||
focus_group_id: detail.focus_group_id,
|
||||
timestamp: detail.timestamp
|
||||
}
|
||||
});
|
||||
const taskResult = await waitForTaskResult(taskId);
|
||||
if (taskResult.status === 'completed' && taskResult.result) {
|
||||
return {
|
||||
...response,
|
||||
data: {
|
||||
...response.data,
|
||||
response: taskResult.result.response,
|
||||
message_id: taskResult.result.message_id,
|
||||
persona_id: taskResult.result.persona_id,
|
||||
focus_group_id: taskResult.result.focus_group_id,
|
||||
timestamp: taskResult.result.timestamp
|
||||
}
|
||||
};
|
||||
const handleFailed = (event: CustomEvent) => {
|
||||
if (event.detail.task_id !== taskId) return;
|
||||
cleanup();
|
||||
reject(new Error(event.detail.message || 'Generation failed'));
|
||||
};
|
||||
const handleCancelled = (event: CustomEvent) => {
|
||||
if (event.detail.task_id !== taskId) return;
|
||||
cleanup();
|
||||
resolve(null);
|
||||
};
|
||||
const cleanup = () => {
|
||||
window.removeEventListener('ws:task_completed', handleCompleted as EventListener);
|
||||
window.removeEventListener('ws:task_failed', handleFailed as EventListener);
|
||||
window.removeEventListener('ws:task_cancelled', handleCancelled as EventListener);
|
||||
};
|
||||
window.addEventListener('ws:task_completed', handleCompleted as EventListener);
|
||||
window.addEventListener('ws:task_failed', handleFailed as EventListener);
|
||||
window.addEventListener('ws:task_cancelled', handleCancelled as EventListener);
|
||||
});
|
||||
} else if (taskResult.status === 'failed') {
|
||||
throw new Error(taskResult.error || 'Response generation failed');
|
||||
} else {
|
||||
return null; // cancelled
|
||||
}
|
||||
}
|
||||
|
||||
// Fallback: sync response
|
||||
|
|
|
|||
|
|
@ -31,6 +31,7 @@ import {
|
|||
import { Textarea } from "@/components/ui/textarea";
|
||||
import { personasApi } from '@/lib/api';
|
||||
import { toastService } from '@/lib/toast';
|
||||
import { waitForTaskResult } from '@/lib/taskPolling';
|
||||
import { useCancellableGeneration } from '@/hooks/useCancellableGeneration';
|
||||
import { getSocket } from '@/services/websocketServiceNew';
|
||||
import ProgressModal from '@/components/ui/ProgressModal';
|
||||
|
|
@ -117,38 +118,18 @@ export default function PersonaModificationModal({
|
|||
|
||||
if (response.status === 202 && response.data?.task_id) {
|
||||
const taskId = response.data.task_id;
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
const handleCompleted = (event: CustomEvent) => {
|
||||
if (event.detail.task_id !== taskId) return;
|
||||
cleanup();
|
||||
modificationControls.completeGeneration();
|
||||
toastService.success("Preview generated successfully!", {
|
||||
description: `Ready to review proposed changes to ${persona.name}`
|
||||
});
|
||||
onPersonaPreview(event.detail.persona);
|
||||
handleClose();
|
||||
resolve();
|
||||
};
|
||||
const handleFailed = (event: CustomEvent) => {
|
||||
if (event.detail.task_id !== taskId) return;
|
||||
cleanup();
|
||||
modificationControls.failGeneration(event.detail.message);
|
||||
reject(new Error(event.detail.message));
|
||||
};
|
||||
const handleCancelled = (event: CustomEvent) => {
|
||||
if (event.detail.task_id !== taskId) return;
|
||||
cleanup();
|
||||
resolve();
|
||||
};
|
||||
const cleanup = () => {
|
||||
window.removeEventListener('ws:task_completed', handleCompleted as EventListener);
|
||||
window.removeEventListener('ws:task_failed', handleFailed as EventListener);
|
||||
window.removeEventListener('ws:task_cancelled', handleCancelled as EventListener);
|
||||
};
|
||||
window.addEventListener('ws:task_completed', handleCompleted as EventListener);
|
||||
window.addEventListener('ws:task_failed', handleFailed as EventListener);
|
||||
window.addEventListener('ws:task_cancelled', handleCancelled as EventListener);
|
||||
});
|
||||
const taskResult = await waitForTaskResult(taskId);
|
||||
if (taskResult.status === 'completed' && taskResult.result?.persona) {
|
||||
modificationControls.completeGeneration();
|
||||
toastService.success("Preview generated successfully!", {
|
||||
description: `Ready to review proposed changes to ${persona.name}`
|
||||
});
|
||||
onPersonaPreview(taskResult.result.persona);
|
||||
handleClose();
|
||||
} else if (taskResult.status === 'failed') {
|
||||
throw new Error(taskResult.error || 'Modification failed');
|
||||
}
|
||||
// cancelled: do nothing
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ import { ArrowLeft, Edit, Home, Users, User, Download, Bot } from 'lucide-react'
|
|||
import { useNavigation } from '@/contexts/NavigationContext';
|
||||
import { focusGroupsApi, personasApi } from '@/lib/api';
|
||||
import { toastService } from '@/lib/toast';
|
||||
import { waitForTaskResult } from '@/lib/taskPolling';
|
||||
|
||||
import { PersonaSidebar } from './PersonaSidebar';
|
||||
import { PersonaAttitudinalProfile } from './PersonaAttitudinalProfile';
|
||||
|
|
@ -112,54 +113,36 @@ export default function PersonaProfile() {
|
|||
|
||||
if (response.status === 202 && response.data?.task_id) {
|
||||
const taskId = response.data.task_id;
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
const handleCompleted = (event: CustomEvent) => {
|
||||
if (event.detail.task_id !== taskId) return;
|
||||
cleanup();
|
||||
const { markdown_content, persona_name, model_used, warning } = event.detail;
|
||||
if (markdown_content) {
|
||||
const currentDate = new Date().toISOString().split('T')[0];
|
||||
const safePersonaName = persona_name.replace(/[^a-zA-Z0-9\-\s]/g, '').replace(/\s+/g, '-').toLowerCase();
|
||||
const filename = `${safePersonaName}-profile-${currentDate}.md`;
|
||||
const element = document.createElement('a');
|
||||
const file = new Blob([markdown_content], { type: 'text/markdown' });
|
||||
element.href = URL.createObjectURL(file);
|
||||
element.download = filename;
|
||||
document.body.appendChild(element);
|
||||
element.click();
|
||||
document.body.removeChild(element);
|
||||
if (warning) {
|
||||
toastService.success("Profile downloaded with fallback formatting", {
|
||||
description: `${persona_name} profile saved as ${filename}`
|
||||
});
|
||||
} else {
|
||||
const modelDisplay = model_used === 'gpt-4.1' ? 'GPT-4.1' : model_used;
|
||||
toastService.success("Profile downloaded successfully", {
|
||||
description: `${persona_name} profile processed with ${modelDisplay} and saved as ${filename}`
|
||||
});
|
||||
}
|
||||
const taskResult = await waitForTaskResult(taskId);
|
||||
if (taskResult.status === 'completed' && taskResult.result) {
|
||||
const { markdown_content, persona_name, model_used, warning } = taskResult.result;
|
||||
if (markdown_content) {
|
||||
const currentDate = new Date().toISOString().split('T')[0];
|
||||
const safePersonaName = persona_name.replace(/[^a-zA-Z0-9\-\s]/g, '').replace(/\s+/g, '-').toLowerCase();
|
||||
const filename = `${safePersonaName}-profile-${currentDate}.md`;
|
||||
const element = document.createElement('a');
|
||||
const file = new Blob([markdown_content], { type: 'text/markdown' });
|
||||
element.href = URL.createObjectURL(file);
|
||||
element.download = filename;
|
||||
document.body.appendChild(element);
|
||||
element.click();
|
||||
document.body.removeChild(element);
|
||||
if (warning) {
|
||||
toastService.success("Profile downloaded with fallback formatting", {
|
||||
description: `${persona_name} profile saved as ${filename}`
|
||||
});
|
||||
} else {
|
||||
const modelDisplay = model_used === 'gpt-4.1' ? 'GPT-4.1' : model_used;
|
||||
toastService.success("Profile downloaded successfully", {
|
||||
description: `${persona_name} profile processed with ${modelDisplay} and saved as ${filename}`
|
||||
});
|
||||
}
|
||||
resolve();
|
||||
};
|
||||
const handleFailed = (event: CustomEvent) => {
|
||||
if (event.detail.task_id !== taskId) return;
|
||||
cleanup();
|
||||
reject(new Error(event.detail.message));
|
||||
};
|
||||
const handleCancelled = (event: CustomEvent) => {
|
||||
if (event.detail.task_id !== taskId) return;
|
||||
cleanup();
|
||||
resolve();
|
||||
};
|
||||
const cleanup = () => {
|
||||
window.removeEventListener('ws:task_completed', handleCompleted as EventListener);
|
||||
window.removeEventListener('ws:task_failed', handleFailed as EventListener);
|
||||
window.removeEventListener('ws:task_cancelled', handleCancelled as EventListener);
|
||||
};
|
||||
window.addEventListener('ws:task_completed', handleCompleted as EventListener);
|
||||
window.addEventListener('ws:task_failed', handleFailed as EventListener);
|
||||
window.addEventListener('ws:task_cancelled', handleCancelled as EventListener);
|
||||
});
|
||||
}
|
||||
} else if (taskResult.status === 'failed') {
|
||||
throw new Error(taskResult.error || 'Export failed');
|
||||
} else {
|
||||
return; // cancelled
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
import React, { useState, useEffect, useRef } from 'react';
|
||||
import { waitForTaskResult } from '@/lib/taskPolling';
|
||||
import { Dialog, DialogContent, DialogDescription, DialogHeader, DialogTitle } from '@/components/ui/dialog';
|
||||
import { Progress } from '@/components/ui/progress';
|
||||
import { Button } from '@/components/ui/button';
|
||||
|
|
@ -63,11 +64,11 @@ export const BulkExportProgressModal: React.FC<BulkExportProgressModalProps> = (
|
|||
}
|
||||
}, [isOpen, taskId]);
|
||||
|
||||
// Set up WebSocket event listeners for bulk export progress
|
||||
// Keep real-time WS listener for granular progress updates
|
||||
useEffect(() => {
|
||||
const handleBulkExportProgress = (event: CustomEvent) => {
|
||||
const data: BulkExportProgressData = event.detail;
|
||||
|
||||
|
||||
// Only handle events for our task
|
||||
if (data.task_id !== stateRef.current.taskId) {
|
||||
return;
|
||||
|
|
@ -79,70 +80,51 @@ export const BulkExportProgressModal: React.FC<BulkExportProgressModalProps> = (
|
|||
setCurrentPersonaName(data.current_persona_name || null);
|
||||
};
|
||||
|
||||
const handleTaskCompleted = (event: CustomEvent) => {
|
||||
const data = event.detail;
|
||||
|
||||
if (data.task_id !== stateRef.current.taskId) {
|
||||
return;
|
||||
}
|
||||
|
||||
setIsComplete(true);
|
||||
setProgress(100);
|
||||
setCurrentItem('Export completed successfully!');
|
||||
|
||||
// Store the file path for download
|
||||
if (data.file_path) {
|
||||
setDownloadFilePath(data.file_path);
|
||||
}
|
||||
|
||||
toast.success(`Successfully exported ${personaCount} persona profiles!`);
|
||||
};
|
||||
|
||||
const handleTaskFailed = (event: CustomEvent) => {
|
||||
const data = event.detail;
|
||||
|
||||
if (data.task_id !== stateRef.current.taskId) {
|
||||
return;
|
||||
}
|
||||
|
||||
setHasError(true);
|
||||
setErrorMessage(data.message || 'Export failed');
|
||||
setCurrentItem('Export failed');
|
||||
toast.error('Export failed: ' + (data.message || 'Unknown error'));
|
||||
};
|
||||
|
||||
const handleTaskCancelled = (event: CustomEvent) => {
|
||||
const data = event.detail;
|
||||
|
||||
if (data.task_id !== stateRef.current.taskId) {
|
||||
return;
|
||||
}
|
||||
|
||||
setIsCancelling(false);
|
||||
setCurrentItem('Export cancelled');
|
||||
toast.success('Export cancelled successfully');
|
||||
|
||||
// Close modal after short delay
|
||||
setTimeout(() => {
|
||||
onClose();
|
||||
}, 1000);
|
||||
};
|
||||
|
||||
// Register window event listeners
|
||||
window.addEventListener('ws:bulk_export_progress', handleBulkExportProgress as EventListener);
|
||||
window.addEventListener('ws:task_completed', handleTaskCompleted as EventListener);
|
||||
window.addEventListener('ws:task_failed', handleTaskFailed as EventListener);
|
||||
window.addEventListener('ws:task_cancelled', handleTaskCancelled as EventListener);
|
||||
|
||||
// Cleanup listeners
|
||||
return () => {
|
||||
window.removeEventListener('ws:bulk_export_progress', handleBulkExportProgress as EventListener);
|
||||
window.removeEventListener('ws:task_completed', handleTaskCompleted as EventListener);
|
||||
window.removeEventListener('ws:task_failed', handleTaskFailed as EventListener);
|
||||
window.removeEventListener('ws:task_cancelled', handleTaskCancelled as EventListener);
|
||||
};
|
||||
}, []); // Only set up once
|
||||
|
||||
// Poll for task completion / failure / cancellation via HTTP
|
||||
useEffect(() => {
|
||||
if (!taskId) return;
|
||||
|
||||
let cancelled = false;
|
||||
|
||||
waitForTaskResult(taskId).then((taskResult) => {
|
||||
if (cancelled) return;
|
||||
|
||||
if (taskResult.status === 'completed') {
|
||||
setIsComplete(true);
|
||||
setProgress(100);
|
||||
setCurrentItem('Export completed successfully!');
|
||||
if (taskResult.result?.file_path) {
|
||||
setDownloadFilePath(taskResult.result.file_path);
|
||||
}
|
||||
toast.success(`Successfully exported ${personaCount} persona profiles!`);
|
||||
} else if (taskResult.status === 'failed') {
|
||||
const msg = taskResult.error || 'Export failed';
|
||||
setHasError(true);
|
||||
setErrorMessage(msg);
|
||||
setCurrentItem('Export failed');
|
||||
toast.error('Export failed: ' + msg);
|
||||
} else if (taskResult.status === 'cancelled') {
|
||||
setIsCancelling(false);
|
||||
setCurrentItem('Export cancelled');
|
||||
toast.success('Export cancelled successfully');
|
||||
setTimeout(() => {
|
||||
onClose();
|
||||
}, 1000);
|
||||
}
|
||||
});
|
||||
|
||||
return () => {
|
||||
cancelled = true;
|
||||
};
|
||||
}, [taskId]);
|
||||
|
||||
const handleCancel = async () => {
|
||||
if (!taskId || isCancelling) return;
|
||||
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ import { UseFormReturn } from 'react-hook-form';
|
|||
import { toast } from 'sonner';
|
||||
import { focusGroupsApi } from '@/lib/api';
|
||||
import { useCancellableGeneration } from '@/hooks/useCancellableGeneration';
|
||||
import { waitForTaskResult } from '@/lib/taskPolling';
|
||||
import { getSocket } from '@/services/websocketServiceNew';
|
||||
|
||||
interface DiscussionGuideGenerationOptions {
|
||||
|
|
@ -87,47 +88,25 @@ export function useDiscussionGuideGeneration({
|
|||
guideGenerationControls.setTaskId(taskId);
|
||||
}
|
||||
|
||||
// Backend returns 202 immediately — wait for result via WebSocket
|
||||
// Backend returns 202 immediately — wait for result via HTTP polling
|
||||
if (response.status === 202 && taskId) {
|
||||
return new Promise<string>((resolve, reject) => {
|
||||
const handleCompleted = (event: CustomEvent) => {
|
||||
const detail = event.detail;
|
||||
if (detail.task_id !== taskId) return;
|
||||
cleanup();
|
||||
if (detail.discussionGuide) {
|
||||
guideGenerationControls.completeGeneration();
|
||||
resolve(detail.discussionGuide);
|
||||
} else {
|
||||
guideGenerationControls.failGeneration('No guide returned');
|
||||
reject(new Error('No discussion guide returned'));
|
||||
}
|
||||
};
|
||||
|
||||
const handleFailed = (event: CustomEvent) => {
|
||||
const detail = event.detail;
|
||||
if (detail.task_id !== taskId) return;
|
||||
cleanup();
|
||||
guideGenerationControls.failGeneration(detail.message || 'Generation failed');
|
||||
reject(new Error(detail.message || 'Generation failed'));
|
||||
};
|
||||
|
||||
const handleCancelled = (event: CustomEvent) => {
|
||||
const detail = event.detail;
|
||||
if (detail.task_id !== taskId) return;
|
||||
cleanup();
|
||||
resolve('');
|
||||
};
|
||||
|
||||
const cleanup = () => {
|
||||
window.removeEventListener('ws:task_completed', handleCompleted as EventListener);
|
||||
window.removeEventListener('ws:task_failed', handleFailed as EventListener);
|
||||
window.removeEventListener('ws:task_cancelled', handleCancelled as EventListener);
|
||||
};
|
||||
|
||||
window.addEventListener('ws:task_completed', handleCompleted as EventListener);
|
||||
window.addEventListener('ws:task_failed', handleFailed as EventListener);
|
||||
window.addEventListener('ws:task_cancelled', handleCancelled as EventListener);
|
||||
});
|
||||
const taskResult = await waitForTaskResult(taskId);
|
||||
if (taskResult.status === 'completed') {
|
||||
const guide = taskResult.result?.discussionGuide;
|
||||
if (guide) {
|
||||
guideGenerationControls.completeGeneration();
|
||||
return guide;
|
||||
} else {
|
||||
guideGenerationControls.failGeneration('No guide returned');
|
||||
throw new Error('No discussion guide returned');
|
||||
}
|
||||
} else if (taskResult.status === 'failed') {
|
||||
guideGenerationControls.failGeneration(taskResult.error || 'Generation failed');
|
||||
throw new Error(taskResult.error || 'Generation failed');
|
||||
} else {
|
||||
// cancelled
|
||||
return '';
|
||||
}
|
||||
}
|
||||
|
||||
// Fallback: synchronous response with guide in body
|
||||
|
|
|
|||
157
src/hooks/useTaskPolling.ts
Normal file
157
src/hooks/useTaskPolling.ts
Normal file
|
|
@ -0,0 +1,157 @@
|
|||
/**
|
||||
* useTaskPolling — Drop-in replacement for useCancellableGeneration.
|
||||
* Polls GET /tasks/{taskId} instead of listening for WebSocket events.
|
||||
* Same state shape and controls interface for backward compatibility.
|
||||
*/
|
||||
import { useState, useEffect, useRef, useCallback } from 'react';
|
||||
import { waitForTaskResult } from '@/lib/taskPolling';
|
||||
import { cancelTaskWithFeedback } from '@/lib/taskCancellation';
|
||||
|
||||
interface TaskPollingState {
|
||||
isGenerating: boolean;
|
||||
isCancelling: boolean;
|
||||
hasError: boolean;
|
||||
isComplete: boolean;
|
||||
taskId: string | null;
|
||||
error: string | null;
|
||||
}
|
||||
|
||||
interface TaskPollingControls {
|
||||
startGeneration: (taskId?: string) => void;
|
||||
setTaskId: (id: string) => void;
|
||||
completeGeneration: () => void;
|
||||
failGeneration: (error: string) => void;
|
||||
cancelGeneration: () => Promise<boolean>;
|
||||
resetGeneration: () => void;
|
||||
}
|
||||
|
||||
const initialState: TaskPollingState = {
|
||||
isGenerating: false,
|
||||
isCancelling: false,
|
||||
hasError: false,
|
||||
isComplete: false,
|
||||
taskId: null,
|
||||
error: null,
|
||||
};
|
||||
|
||||
export function useTaskPolling(
|
||||
taskDescription: string = 'generation',
|
||||
_socket?: any // ignored — kept for API compatibility with useCancellableGeneration
|
||||
): [TaskPollingState, TaskPollingControls] {
|
||||
const [state, setState] = useState<TaskPollingState>(initialState);
|
||||
const stateRef = useRef(state);
|
||||
stateRef.current = state;
|
||||
|
||||
// When taskId is set and generation is active, start polling
|
||||
useEffect(() => {
|
||||
const { taskId, isGenerating } = state;
|
||||
if (!taskId || !isGenerating) return;
|
||||
|
||||
let cancelled = false;
|
||||
|
||||
const run = async () => {
|
||||
try {
|
||||
const result = await waitForTaskResult(taskId, { intervalMs: 2000, timeoutMs: 300000 });
|
||||
|
||||
if (cancelled) return;
|
||||
|
||||
if (result.status === 'completed') {
|
||||
setState(prev => ({ ...prev, isGenerating: false, isComplete: true, hasError: false, error: null }));
|
||||
} else if (result.status === 'failed') {
|
||||
setState(prev => ({
|
||||
...prev,
|
||||
isGenerating: false,
|
||||
isCancelling: false,
|
||||
hasError: true,
|
||||
error: result.error || 'Generation failed',
|
||||
}));
|
||||
} else if (result.status === 'cancelled') {
|
||||
setState(initialState);
|
||||
}
|
||||
} catch (err: any) {
|
||||
if (cancelled) return;
|
||||
setState(prev => ({
|
||||
...prev,
|
||||
isGenerating: false,
|
||||
isCancelling: false,
|
||||
hasError: true,
|
||||
error: err.message || 'Polling failed',
|
||||
}));
|
||||
}
|
||||
};
|
||||
|
||||
run();
|
||||
|
||||
return () => {
|
||||
cancelled = true;
|
||||
};
|
||||
}, [state.taskId, state.isGenerating]);
|
||||
|
||||
const startGeneration = useCallback((taskId?: string) => {
|
||||
setState({
|
||||
isGenerating: true,
|
||||
isCancelling: false,
|
||||
hasError: false,
|
||||
isComplete: false,
|
||||
taskId: taskId || null,
|
||||
error: null,
|
||||
});
|
||||
}, []);
|
||||
|
||||
const setTaskId = useCallback((id: string) => {
|
||||
setState(prev => ({ ...prev, taskId: id }));
|
||||
}, []);
|
||||
|
||||
const completeGeneration = useCallback(() => {
|
||||
setState(prev => ({
|
||||
...prev,
|
||||
isGenerating: false,
|
||||
isCancelling: false,
|
||||
isComplete: true,
|
||||
hasError: false,
|
||||
error: null,
|
||||
}));
|
||||
}, []);
|
||||
|
||||
const failGeneration = useCallback((error: string) => {
|
||||
setState(prev => ({
|
||||
...prev,
|
||||
isGenerating: false,
|
||||
isCancelling: false,
|
||||
hasError: true,
|
||||
isComplete: false,
|
||||
error,
|
||||
}));
|
||||
}, []);
|
||||
|
||||
const cancelGeneration = useCallback(async (): Promise<boolean> => {
|
||||
const currentTaskId = stateRef.current.taskId;
|
||||
if (!currentTaskId || stateRef.current.isCancelling) return false;
|
||||
|
||||
setState(prev => ({ ...prev, isCancelling: true }));
|
||||
const success = await cancelTaskWithFeedback(currentTaskId, taskDescription);
|
||||
|
||||
if (success) {
|
||||
setState(initialState);
|
||||
} else {
|
||||
setState(prev => ({ ...prev, isCancelling: false }));
|
||||
}
|
||||
|
||||
return success;
|
||||
}, [taskDescription]);
|
||||
|
||||
const resetGeneration = useCallback(() => {
|
||||
setState(initialState);
|
||||
}, []);
|
||||
|
||||
const controls: TaskPollingControls = {
|
||||
startGeneration,
|
||||
setTaskId,
|
||||
completeGeneration,
|
||||
failGeneration,
|
||||
cancelGeneration,
|
||||
resetGeneration,
|
||||
};
|
||||
|
||||
return [state, controls];
|
||||
}
|
||||
112
src/lib/taskPolling.ts
Normal file
112
src/lib/taskPolling.ts
Normal file
|
|
@ -0,0 +1,112 @@
|
|||
/**
|
||||
* HTTP polling utility for async LLM task results.
|
||||
* Replaces WebSocket-based task_completed event listeners.
|
||||
*
|
||||
* Backend stores results in task_manager (5-min TTL).
|
||||
* Poll GET /tasks/{taskId} until status !== 'running'.
|
||||
* WS hint events (ws:task_completed etc.) trigger immediate polls when WS is connected.
|
||||
*/
|
||||
import api from './api';
|
||||
|
||||
export interface TaskPollResult {
|
||||
task_id: string;
|
||||
status: 'running' | 'completed' | 'failed' | 'cancelled';
|
||||
task_type: string;
|
||||
result?: Record<string, any>;
|
||||
error?: string;
|
||||
created_at?: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for an async task to complete by polling the backend.
|
||||
*
|
||||
* @param taskId - The task ID returned from the 202 response
|
||||
* @param opts.intervalMs - Polling interval (default: 2000ms)
|
||||
* @param opts.timeoutMs - Max wait time before rejecting (default: 300000ms = 5 min)
|
||||
* @returns Resolved TaskPollResult with status 'completed', 'failed', or 'cancelled'
|
||||
* @throws Error on timeout or network errors after retries
|
||||
*/
|
||||
export async function waitForTaskResult(
|
||||
taskId: string,
|
||||
opts: { intervalMs?: number; timeoutMs?: number } = {}
|
||||
): Promise<TaskPollResult> {
|
||||
const { intervalMs = 2000, timeoutMs = 300000 } = opts;
|
||||
const startTime = Date.now();
|
||||
let consecutiveErrors = 0;
|
||||
const MAX_CONSECUTIVE_ERRORS = 5;
|
||||
|
||||
return new Promise<TaskPollResult>((resolve, reject) => {
|
||||
let stopped = false;
|
||||
let timer: ReturnType<typeof setInterval> | null = null;
|
||||
|
||||
const stop = () => {
|
||||
stopped = true;
|
||||
if (timer) {
|
||||
clearInterval(timer);
|
||||
timer = null;
|
||||
}
|
||||
window.removeEventListener('ws:task_completed', wsHint);
|
||||
window.removeEventListener('ws:task_failed', wsHint);
|
||||
window.removeEventListener('ws:task_cancelled', wsHint);
|
||||
};
|
||||
|
||||
const poll = async () => {
|
||||
if (stopped) return;
|
||||
|
||||
// Check timeout
|
||||
if (Date.now() - startTime > timeoutMs) {
|
||||
stop();
|
||||
reject(new Error(`Task ${taskId} polling timed out after ${timeoutMs / 1000}s`));
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const response = await api.get<TaskPollResult>(`/tasks/${taskId}`);
|
||||
const data = response.data;
|
||||
consecutiveErrors = 0;
|
||||
|
||||
if (data.status !== 'running') {
|
||||
stop();
|
||||
resolve(data);
|
||||
}
|
||||
// status === 'running': continue polling
|
||||
} catch (err: any) {
|
||||
consecutiveErrors++;
|
||||
|
||||
if (err.response?.status === 404) {
|
||||
// Task not found — may be registering (race) or expired
|
||||
// Wait up to 10s before giving up
|
||||
if (Date.now() - startTime > 10000) {
|
||||
stop();
|
||||
reject(new Error(`Task ${taskId} not found — may have expired`));
|
||||
}
|
||||
// else: keep polling, task might still be registering
|
||||
return;
|
||||
}
|
||||
|
||||
if (consecutiveErrors >= MAX_CONSECUTIVE_ERRORS) {
|
||||
stop();
|
||||
reject(new Error(`Task ${taskId} polling failed after ${MAX_CONSECUTIVE_ERRORS} consecutive errors`));
|
||||
}
|
||||
// else: transient error, keep polling
|
||||
}
|
||||
};
|
||||
|
||||
// Listen for WS hint events to trigger immediate poll
|
||||
const wsHint = (event: Event) => {
|
||||
const detail = (event as CustomEvent).detail;
|
||||
if (detail?.task_id === taskId) {
|
||||
poll(); // immediate poll on WS hint
|
||||
}
|
||||
};
|
||||
|
||||
window.addEventListener('ws:task_completed', wsHint);
|
||||
window.addEventListener('ws:task_failed', wsHint);
|
||||
window.addEventListener('ws:task_cancelled', wsHint);
|
||||
|
||||
// Start polling interval
|
||||
timer = setInterval(poll, intervalMs);
|
||||
// Poll immediately
|
||||
poll();
|
||||
});
|
||||
}
|
||||
|
|
@ -14,6 +14,7 @@ import {
|
|||
Bot
|
||||
} from 'lucide-react';
|
||||
import { toastService } from '@/lib/toast';
|
||||
import { waitForTaskResult } from '@/lib/taskPolling';
|
||||
import { Button } from '@/components/ui/button';
|
||||
import { Tabs, TabsContent, TabsList, TabsTrigger } from '@/components/ui/tabs';
|
||||
import { Dialog, DialogContent, DialogDescription, DialogFooter, DialogHeader, DialogTitle } from '@/components/ui/dialog';
|
||||
|
|
@ -1572,52 +1573,35 @@ const FocusGroupSession = () => {
|
|||
if (taskId) themeGenerationControls.setTaskId(taskId);
|
||||
|
||||
if (response.status === 202 && taskId) {
|
||||
await new Promise<void>((resolve) => {
|
||||
const handleCompleted = (event: CustomEvent) => {
|
||||
const detail = event.detail;
|
||||
if (detail.task_id !== taskId) return;
|
||||
cleanup();
|
||||
if (detail.themes) {
|
||||
setThemes(prev => [...prev, ...detail.themes]);
|
||||
setTimeout(() => {
|
||||
themeGenerationControls.completeGeneration();
|
||||
toastService.success(`Generated ${detail.themes.length} key themes`, {
|
||||
description: "New themes have been added to the analysis."
|
||||
});
|
||||
}, 3000);
|
||||
} else {
|
||||
setTimeout(() => {
|
||||
themeGenerationControls.completeGeneration();
|
||||
toastService.warning("No new themes were generated", {
|
||||
description: "Try again when the discussion has more content."
|
||||
});
|
||||
}, 3000);
|
||||
}
|
||||
resolve();
|
||||
};
|
||||
const handleFailed = (event: CustomEvent) => {
|
||||
if (event.detail.task_id !== taskId) return;
|
||||
cleanup();
|
||||
themeGenerationControls.failGeneration(event.detail.message || 'Failed to generate key themes');
|
||||
toastService.error("Failed to generate key themes", {
|
||||
description: "There was an error analyzing the discussion. Please try again."
|
||||
});
|
||||
resolve();
|
||||
};
|
||||
const handleCancelled = (event: CustomEvent) => {
|
||||
if (event.detail.task_id !== taskId) return;
|
||||
cleanup();
|
||||
resolve();
|
||||
};
|
||||
const cleanup = () => {
|
||||
window.removeEventListener('ws:task_completed', handleCompleted as EventListener);
|
||||
window.removeEventListener('ws:task_failed', handleFailed as EventListener);
|
||||
window.removeEventListener('ws:task_cancelled', handleCancelled as EventListener);
|
||||
};
|
||||
window.addEventListener('ws:task_completed', handleCompleted as EventListener);
|
||||
window.addEventListener('ws:task_failed', handleFailed as EventListener);
|
||||
window.addEventListener('ws:task_cancelled', handleCancelled as EventListener);
|
||||
});
|
||||
if (themeGenerationControls.setTaskId) themeGenerationControls.setTaskId(taskId);
|
||||
|
||||
const taskResult = await waitForTaskResult(taskId);
|
||||
if (taskResult.status === 'completed') {
|
||||
const themes = taskResult.result?.themes;
|
||||
if (themes && themes.length > 0) {
|
||||
setThemes(prevThemes => [...prevThemes, ...themes]);
|
||||
setTimeout(() => {
|
||||
themeGenerationControls.completeGeneration();
|
||||
toastService.success(`Generated ${themes.length} key themes`, {
|
||||
description: "New themes have been added to the analysis."
|
||||
});
|
||||
}, 3000);
|
||||
} else {
|
||||
setTimeout(() => {
|
||||
themeGenerationControls.completeGeneration();
|
||||
toastService.warning("No new themes were generated", {
|
||||
description: "Try again when the discussion has more content."
|
||||
});
|
||||
}, 3000);
|
||||
}
|
||||
} else if (taskResult.status === 'failed') {
|
||||
themeGenerationControls.failGeneration(taskResult.error || 'Failed');
|
||||
toastService.error("Failed to generate key themes", {
|
||||
description: taskResult.error || "There was an error analyzing the discussion."
|
||||
});
|
||||
}
|
||||
// cancelled: silently stop
|
||||
return;
|
||||
} else if (response.data && response.data.themes) {
|
||||
// Fallback: sync response
|
||||
setThemes(prevThemes => [...prevThemes, ...response.data.themes]);
|
||||
|
|
@ -2099,31 +2083,14 @@ const FocusGroupSession = () => {
|
|||
const res = await focusGroupsApi.describeAsset(id, assetFilename);
|
||||
if (res.status === 202 && res.data?.task_id) {
|
||||
const taskId = res.data.task_id;
|
||||
return new Promise<typeof res>((resolve, reject) => {
|
||||
const cleanup = () => {
|
||||
window.removeEventListener('ws:task_completed', onCompleted as EventListener);
|
||||
window.removeEventListener('ws:task_failed', onFailed as EventListener);
|
||||
window.removeEventListener('ws:task_cancelled', onCancelled as EventListener);
|
||||
};
|
||||
const onCompleted = (e: CustomEvent) => {
|
||||
if (e.detail.task_id !== taskId) return;
|
||||
cleanup();
|
||||
resolve({ ...res, data: { ...res.data, description: e.detail.description } });
|
||||
};
|
||||
const onFailed = (e: CustomEvent) => {
|
||||
if (e.detail.task_id !== taskId) return;
|
||||
cleanup();
|
||||
reject(new Error(e.detail.message || 'Description failed'));
|
||||
};
|
||||
const onCancelled = (e: CustomEvent) => {
|
||||
if (e.detail.task_id !== taskId) return;
|
||||
cleanup();
|
||||
resolve({ ...res, data: { ...res.data, description: null } });
|
||||
};
|
||||
window.addEventListener('ws:task_completed', onCompleted as EventListener);
|
||||
window.addEventListener('ws:task_failed', onFailed as EventListener);
|
||||
window.addEventListener('ws:task_cancelled', onCancelled as EventListener);
|
||||
});
|
||||
const taskResult = await waitForTaskResult(taskId);
|
||||
if (taskResult.status === 'completed') {
|
||||
return { ...res, data: { ...res.data, description: taskResult.result?.description } };
|
||||
} else if (taskResult.status === 'failed') {
|
||||
throw new Error(taskResult.error || 'Description failed');
|
||||
} else {
|
||||
return { ...res, data: { ...res.data, description: null } }; // cancelled
|
||||
}
|
||||
}
|
||||
return res;
|
||||
})();
|
||||
|
|
|
|||
|
|
@ -39,8 +39,7 @@ import { Label } from "@/components/ui/label";
|
|||
import { Checkbox } from "@/components/ui/checkbox";
|
||||
import { Persona } from '@/types/persona';
|
||||
import { usePersonaStorage } from '@/hooks/usePersonaStorage';
|
||||
import { useCancellableGeneration } from '@/hooks/useCancellableGeneration';
|
||||
import { getSocket } from '@/services/websocketServiceNew';
|
||||
import { useTaskPolling } from '@/hooks/useTaskPolling';
|
||||
import { personasApi, aiPersonasApi, foldersApi } from '@/lib/api';
|
||||
import { toastService } from '@/lib/toast';
|
||||
import ProgressModal from '@/components/ui/ProgressModal';
|
||||
|
|
@ -87,9 +86,8 @@ const SyntheticUsers = () => {
|
|||
const [mode, setMode] = useState<'view' | 'create'>('view');
|
||||
const [creationMode, setCreationMode] = useState<'manual' | 'ai'>('ai');
|
||||
|
||||
// WebSocket and cancellable generation for summary generation
|
||||
const socket = getSocket();
|
||||
const [summaryGenerationState, summaryGenerationControls] = useCancellableGeneration('persona summary generation', socket);
|
||||
// Polling-based cancellable generation for summary generation
|
||||
const [summaryGenerationState, summaryGenerationControls] = useTaskPolling('persona summary generation');
|
||||
const [isSummaryProgressModalOpen, setIsSummaryProgressModalOpen] = useState(false);
|
||||
const [summaryProgressDescription, setSummaryProgressDescription] = useState('');
|
||||
// Bulk export no longer needs cancellable generation - it's instant
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue