#!/usr/bin/env python3 """ Memory Manager - Prevents memory exhaustion during concurrent processing """ import psutil import os import time import logging from functools import wraps class MemoryManager: """Memory management utility to prevent system crashes""" def __init__(self, max_memory_percent=80, max_swap_percent=80): """ Initialize memory manager Args: max_memory_percent: Maximum memory usage percentage before throttling max_swap_percent: Maximum swap usage percentage before warning (does not throttle) """ self.max_memory_percent = max_memory_percent self.max_swap_percent = max_swap_percent self.logger = logging.getLogger('master_adapt_detect') def get_memory_usage(self): """Get current memory and swap usage""" memory = psutil.virtual_memory() swap = psutil.swap_memory() return { 'memory_percent': memory.percent, 'memory_available_gb': memory.available / (1024**3), 'swap_percent': swap.percent, 'swap_used_gb': swap.used / (1024**3) } def is_memory_safe(self): """Check if memory usage is within safe limits (only RAM, not swap)""" usage = self.get_memory_usage() # Warn about swap usage but don't block processing if usage['swap_percent'] > self.max_swap_percent: self.logger.warning(f"High swap usage: {usage['swap_percent']:.1f}% - Performance may be degraded") # Only block processing for high RAM usage if usage['memory_percent'] > self.max_memory_percent: self.logger.warning(f"High memory usage: {usage['memory_percent']:.1f}%") return False return True def wait_for_memory_safe(self, timeout=30): """Wait for memory to return to safe levels""" start_time = time.time() while not self.is_memory_safe(): if time.time() - start_time > timeout: self.logger.error("Memory did not return to safe levels within timeout") return False self.logger.info("Waiting for memory to return to safe levels...") time.sleep(1) return True def limit_concurrent_processes(self, max_processes=None): """Calculate safe number of concurrent processes based on memory""" # Always get available memory for logging available_gb = psutil.virtual_memory().available / (1024**3) if max_processes is None: # Conservative estimate based on available memory # Assume each process needs ~2GB for feature processing max_processes = max(1, int(available_gb / 2)) cpu_count = psutil.cpu_count() # Don't exceed CPU count or memory-based limit safe_processes = min(max_processes, cpu_count) self.logger.info(f"Limiting concurrent processes to {safe_processes} (Memory: {available_gb:.1f}GB available)") return safe_processes def memory_safe_execution(memory_manager): """Decorator to ensure memory-safe execution of functions""" def decorator(func): @wraps(func) def wrapper(*args, **kwargs): # Check memory before execution if not memory_manager.is_memory_safe(): memory_manager.logger.warning("Memory usage high, waiting before execution...") if not memory_manager.wait_for_memory_safe(): raise MemoryError("Memory usage too high to safely execute function") try: return func(*args, **kwargs) except MemoryError as e: memory_manager.logger.error(f"Memory error in {func.__name__}: {e}") raise finally: # Force garbage collection import gc gc.collect() return wrapper return decorator def reduce_feature_count(features, max_features=10000): """Reduce feature count to prevent memory explosion""" if len(features) > max_features: # Keep best features based on response strength import numpy as np responses = [f.response for f in features] indices = np.argsort(responses)[-max_features:] reduced_features = [features[i] for i in indices] return reduced_features return features