""" Configuration API endpoints for model selection and system settings """ import logging from quart import Blueprint, jsonify, request, g from ..auth.middleware import dev_mode_bypass, get_user_id from ..jobs.models import ModelConfiguration from ..jobs.manager import JobManager logger = logging.getLogger(__name__) config_bp = Blueprint('config', __name__, url_prefix='/api/config') @config_bp.route('/models', methods=['GET']) @dev_mode_bypass async def get_available_models(): """ Get list of available models with pricing and capabilities Returns: List of available models with metadata """ try: models = JobManager.get_available_models() return jsonify({ 'models': [model.to_dict() for model in models] }) except Exception as e: logger.error(f"Failed to get available models: {e}") return jsonify({ 'error': 'configuration_error', 'message': 'Failed to retrieve available models' }), 500 @config_bp.route('/defaults', methods=['GET']) @dev_mode_bypass async def get_default_config(): """ Get default model configuration Returns: Default model configuration settings """ try: default_config = JobManager.get_default_model_config() return jsonify({ 'config': default_config.to_dict() }) except Exception as e: logger.error(f"Failed to get default config: {e}") return jsonify({ 'error': 'configuration_error', 'message': 'Failed to retrieve default configuration' }), 500 @config_bp.route('/estimate', methods=['POST']) @dev_mode_bypass async def estimate_processing_cost(): """ Estimate processing cost for given models and file size Expects: { "modelConfig": { "primaryModels": ["model1", "model2"], "consolidationModel": "model3" }, "fileSizeBytes": 12345, "estimatedTokens": 10000 } Returns: Cost breakdown by model and total estimated cost """ try: data = await request.get_json() if not data: return jsonify({ 'error': 'invalid_request', 'message': 'Request body required' }), 400 model_config_data = data.get('modelConfig', {}) file_size = data.get('fileSizeBytes', 0) estimated_tokens = data.get('estimatedTokens') # If no token estimate provided, estimate based on file size if not estimated_tokens: # Rough heuristic: 4 characters per token, with document structure overhead estimated_tokens = min(file_size // 3, 100000) # Cap at 100k tokens # Parse model configuration try: model_config = ModelConfiguration.from_dict(model_config_data) except Exception as e: return jsonify({ 'error': 'invalid_model_config', 'message': f'Invalid model configuration: {e}' }), 400 # Get all models to estimate all_models = model_config.primary_models + [model_config.consolidation_model] # Estimate cost using provider manager from ..jobs.manager import JobManager job_manager = JobManager.get_instance() cost_breakdown = job_manager.provider_manager.estimate_total_cost( model_keys=all_models, estimated_input_tokens=estimated_tokens, estimated_output_tokens=estimated_tokens // 2 # Assume 50% of input as output ) # Separate primary and consolidation costs primary_cost = sum( cost_breakdown.get(model, 0) for model in model_config.primary_models ) consolidation_cost = cost_breakdown.get(model_config.consolidation_model, 0) return jsonify({ 'estimatedTokens': estimated_tokens, 'costBreakdown': { 'primaryModels': { model: cost_breakdown.get(model, 0) for model in model_config.primary_models }, 'consolidationModel': { model_config.consolidation_model: consolidation_cost }, 'primaryTotal': primary_cost, 'consolidationTotal': consolidation_cost, 'grandTotal': cost_breakdown.get('total', 0) } }) except Exception as e: logger.error(f"Cost estimation error: {e}") return jsonify({ 'error': 'estimation_error', 'message': 'Failed to estimate processing cost' }), 500 @config_bp.route('/validate', methods=['POST']) @dev_mode_bypass async def validate_model_config(): """ Validate a model configuration Expects: { "modelConfig": { "primaryModels": ["model1", "model2"], "consolidationModel": "model3", "minimumSuccessThreshold": 1 } } Returns: Validation result with any warnings or errors """ try: data = await request.get_json() if not data: return jsonify({ 'error': 'invalid_request', 'message': 'Request body required' }), 400 model_config_data = data.get('modelConfig', {}) try: model_config = ModelConfiguration.from_dict(model_config_data) except Exception as e: return jsonify({ 'valid': False, 'error': f'Invalid model configuration: {e}' }), 400 # Validate models exist available_models = [model.key for model in JobManager.get_available_models()] warnings = [] errors = [] # Check primary models for model in model_config.primary_models: if model not in available_models: errors.append(f"Primary model '{model}' is not available") # Check consolidation model if model_config.consolidation_model not in available_models: errors.append(f"Consolidation model '{model_config.consolidation_model}' is not available") # Check minimum success threshold if model_config.minimum_success_threshold > len(model_config.primary_models): warnings.append( f"Minimum success threshold ({model_config.minimum_success_threshold}) " f"is higher than number of primary models ({len(model_config.primary_models)})" ) # Check for duplicate models if len(set(model_config.primary_models)) != len(model_config.primary_models): warnings.append("Duplicate models detected in primary models list") # Check if consolidation model is also in primary models if model_config.consolidation_model in model_config.primary_models: warnings.append("Consolidation model is also used as a primary model") return jsonify({ 'valid': len(errors) == 0, 'errors': errors, 'warnings': warnings, 'modelCount': { 'primary': len(model_config.primary_models), 'consolidation': 1, 'total': len(set(model_config.primary_models + [model_config.consolidation_model])) } }) except Exception as e: logger.error(f"Model config validation error: {e}") return jsonify({ 'error': 'validation_error', 'message': 'Failed to validate model configuration' }), 500 @config_bp.route('/system', methods=['GET']) @dev_mode_bypass async def get_system_info(): """ Get system configuration and status information Returns: System information for admin/debugging purposes """ try: from ..config_runtime import server_config from ..jobs.manager import JobManager job_manager = JobManager.get_instance() # Get system stats queue_size = await job_manager.get_queue_size() active_jobs = await job_manager.get_active_jobs_count() return jsonify({ 'system': { 'devMode': server_config.DEV_MODE, 'maxConcurrentJobs': server_config.MAX_CONCURRENT_JOBS, 'maxUploadSizeMB': server_config.MAX_UPLOAD_SIZE_MB, 'fileRetentionHours': server_config.FILE_RETENTION_HOURS, 'allowedExtensions': list(server_config.ALLOWED_EXTENSIONS) }, 'queue': { 'pending': queue_size, 'active': active_jobs, 'maxConcurrent': server_config.MAX_CONCURRENT_JOBS } }) except Exception as e: logger.error(f"Failed to get system info: {e}") return jsonify({ 'error': 'system_error', 'message': 'Failed to retrieve system information' }), 500