master_adapt_detect/memory_manager.py
2025-10-01 14:32:55 -05:00

119 lines
No EOL
4.4 KiB
Python

#!/usr/bin/env python3
"""
Memory Manager - Prevents memory exhaustion during concurrent processing
"""
import psutil
import os
import time
import logging
from functools import wraps
class MemoryManager:
    """Memory management utility to prevent system crashes.

    Reads RAM and swap statistics through ``psutil`` and offers helpers that
    callers can use to decide whether to start, delay, or limit
    memory-hungry work.
    """

    def __init__(self, max_memory_percent=80, max_swap_percent=80):
        """
        Initialize memory manager

        Args:
            max_memory_percent: Maximum memory usage percentage before throttling
            max_swap_percent: Maximum swap usage percentage before warning (does not throttle)
        """
        self.max_memory_percent = max_memory_percent
        self.max_swap_percent = max_swap_percent
        self.logger = logging.getLogger('master_adapt_detect')

    def get_memory_usage(self):
        """Get current memory and swap usage.

        Returns:
            dict with keys ``memory_percent``, ``memory_available_gb``,
            ``swap_percent`` and ``swap_used_gb``. The ``*_gb`` values are
            byte counts divided by 1024**3.
        """
        memory = psutil.virtual_memory()
        swap = psutil.swap_memory()
        bytes_per_gb = 1024 ** 3
        return {
            'memory_percent': memory.percent,
            'memory_available_gb': memory.available / bytes_per_gb,
            'swap_percent': swap.percent,
            'swap_used_gb': swap.used / bytes_per_gb,
        }

    def is_memory_safe(self):
        """Check if memory usage is within safe limits (only RAM, not swap).

        Returns:
            False when RAM usage exceeds ``max_memory_percent``, True
            otherwise. High swap usage only produces a warning.
        """
        usage = self.get_memory_usage()

        # Warn about swap usage but don't block processing
        if usage['swap_percent'] > self.max_swap_percent:
            self.logger.warning(
                "High swap usage: %.1f%% - Performance may be degraded",
                usage['swap_percent'],
            )

        # Only block processing for high RAM usage
        if usage['memory_percent'] > self.max_memory_percent:
            self.logger.warning("High memory usage: %.1f%%", usage['memory_percent'])
            return False
        return True

    def wait_for_memory_safe(self, timeout=30, poll_interval=1.0):
        """Wait for memory to return to safe levels.

        Args:
            timeout: Maximum number of seconds to wait.
            poll_interval: Seconds to sleep between checks; each sleep is
                capped by the time remaining so the wait does not run past
                ``timeout``.

        Returns:
            True as soon as ``is_memory_safe()`` reports True, False if the
            timeout elapses first.
        """
        # Monotonic clock: the deadline is unaffected by wall-clock changes.
        deadline = time.monotonic() + timeout
        while not self.is_memory_safe():
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                self.logger.error("Memory did not return to safe levels within timeout")
                return False
            self.logger.info("Waiting for memory to return to safe levels...")
            time.sleep(min(poll_interval, remaining))
        return True

    def limit_concurrent_processes(self, max_processes=None):
        """Calculate safe number of concurrent processes based on memory.

        Args:
            max_processes: Upper bound requested by the caller. When None,
                the bound is estimated from available memory.

        Returns:
            The smaller of the requested/estimated bound and the CPU count,
            never less than 1.
        """
        # Always get available memory for logging
        available_gb = psutil.virtual_memory().available / (1024 ** 3)

        if max_processes is None:
            # Conservative estimate based on available memory
            # Assume each process needs ~2GB for feature processing
            max_processes = int(available_gb / 2)

        # psutil.cpu_count() may return None when the count cannot be
        # determined; fall back to os.cpu_count() and finally to 1 so the
        # comparison below never sees None.
        cpu_count = psutil.cpu_count() or os.cpu_count() or 1

        # Don't exceed the CPU count, and always allow at least one process
        # (guards against a zero/negative request or estimate).
        safe_processes = max(1, min(max_processes, cpu_count))
        self.logger.info(
            "Limiting concurrent processes to %s (Memory: %.1fGB available)",
            safe_processes,
            available_gb,
        )
        return safe_processes
def memory_safe_execution(memory_manager):
    """Build a decorator that gates a function on the manager's memory check.

    Args:
        memory_manager: Object providing ``is_memory_safe()``,
            ``wait_for_memory_safe()`` and a ``logger`` attribute.

    Returns:
        A decorator. The decorated function raises ``MemoryError`` without
        running when memory is unsafe and the wait does not succeed;
        otherwise it runs normally. A garbage collection pass is forced
        after every call that reaches the wrapped function.
    """
    import gc

    def decorate(target):
        @wraps(target)
        def guarded_call(*call_args, **call_kwargs):
            # Pre-flight check: give memory a chance to recover before running.
            needs_wait = not memory_manager.is_memory_safe()
            if needs_wait:
                memory_manager.logger.warning("Memory usage high, waiting before execution...")
                recovered = memory_manager.wait_for_memory_safe()
                if not recovered:
                    raise MemoryError("Memory usage too high to safely execute function")

            try:
                outcome = target(*call_args, **call_kwargs)
            except MemoryError as exc:
                # Record which function ran out of memory, then propagate.
                memory_manager.logger.error(f"Memory error in {target.__name__}: {exc}")
                raise
            else:
                return outcome
            finally:
                # Force garbage collection whether the call succeeded or not.
                gc.collect()

        return guarded_call

    return decorate
def reduce_feature_count(features, max_features=10000):
    """Reduce feature count to prevent memory explosion.

    Args:
        features: Sequence of objects exposing a numeric ``response``
            attribute.
        max_features: Maximum number of features to keep; must be >= 0.

    Returns:
        ``features`` itself when it already has at most ``max_features``
        items; otherwise a new list holding the ``max_features`` items with
        the largest ``response``, ordered by ascending response.

    Raises:
        ValueError: If ``max_features`` is negative.
    """
    if max_features < 0:
        raise ValueError(f"max_features must be non-negative, got {max_features}")

    if len(features) <= max_features:
        return features

    if max_features == 0:
        # A ``[-0:]`` slice would select every index instead of none.
        return []

    # Keep best features based on response strength
    import numpy as np

    responses = [f.response for f in features]
    strongest = np.argsort(responses)[-max_features:]
    return [features[i] for i in strongest]