"""Application metrics for the accessible video platform.

Defines the OpenTelemetry instruments used across the service, a
module-level ``app_metrics`` instance, a timing context manager, and
small convenience helpers for the most common recording patterns.
"""

import time
from typing import Any, Callable, Optional

from opentelemetry import metrics
# from opentelemetry.exporter.prometheus import PrometheusMetricReader  # Disabled for local dev
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.resources import Resource
from prometheus_client import start_http_server

from ..core.config import settings
from ..core.logging import get_logger

logger = get_logger(__name__)


class ApplicationMetrics:
    """Central metrics collection for the accessible video platform."""

    def __init__(self) -> None:
        """Create the meter and every instrument used by the service."""
        self.setup_metrics()

        # Job processing metrics
        self.job_total_counter = self.meter.create_counter(
            name="jobs_total",
            description="Total number of jobs created",
            unit="1",
        )
        # "Gauges" here are up/down counters: callers add +1 / -1 deltas.
        self.job_status_gauge = self.meter.create_up_down_counter(
            name="jobs_by_status",
            description="Current number of jobs by status",
            unit="1",
        )
        self.job_processing_duration = self.meter.create_histogram(
            name="job_processing_duration_seconds",
            description="Time taken to process jobs through each stage",
            unit="s",
        )

        # AI service metrics
        self.ai_requests_counter = self.meter.create_counter(
            name="ai_requests_total",
            description="Total AI service requests",
            unit="1",
        )
        self.ai_request_duration = self.meter.create_histogram(
            name="ai_request_duration_seconds",
            description="Duration of AI service requests",
            unit="s",
        )
        self.ai_confidence_histogram = self.meter.create_histogram(
            name="ai_confidence_score",
            description="AI confidence scores distribution",
            unit="1",
        )

        # Storage metrics
        self.storage_operations_counter = self.meter.create_counter(
            name="storage_operations_total",
            description="Total storage operations",
            unit="1",
        )
        self.storage_operation_duration = self.meter.create_histogram(
            name="storage_operation_duration_seconds",
            description="Duration of storage operations",
            unit="s",
        )

        # Queue metrics
        self.queue_tasks_counter = self.meter.create_counter(
            name="celery_tasks_total",
            description="Total Celery tasks processed",
            unit="1",
        )
        self.queue_task_duration = self.meter.create_histogram(
            name="celery_task_duration_seconds",
            description="Duration of Celery task execution",
            unit="s",
        )

        # User activity metrics
        self.auth_attempts_counter = self.meter.create_counter(
            name="auth_attempts_total",
            description="Total authentication attempts",
            unit="1",
        )
        self.active_users_gauge = self.meter.create_up_down_counter(
            name="active_users",
            description="Number of currently active users",
            unit="1",
        )

        # Rate limiting metrics
        self.rate_limit_counter = self.meter.create_counter(
            name="rate_limit_checks_total",
            description="Total rate limit checks performed",
            unit="1",
        )

        # Request validation metrics
        self.validation_counter = self.meter.create_counter(
            name="request_validation_total",
            description="Total request validations performed",
            unit="1",
        )
        self.validation_duration = self.meter.create_histogram(
            name="request_validation_duration_seconds",
            description="Duration of request validation",
            unit="s",
        )

    def setup_metrics(self) -> None:
        """Install the global meter provider and create ``self.meter``.

        The Prometheus metric reader is currently commented out, so the
        provider is created with a resource only and no metric readers.
        """
        resource = Resource.create({
            "service.name": "accessible-video-api",
            "service.version": "1.0.0",
            "deployment.environment": settings.app_env,
        })

        # Set up Prometheus metrics reader (disabled for local dev)
        # prometheus_reader = PrometheusMetricReader()

        # Create metrics provider
        provider = MeterProvider(
            resource=resource,
            # metric_readers=[prometheus_reader]  # Disabled for local dev
        )
        metrics.set_meter_provider(provider)

        # Get meter for this service
        self.meter = metrics.get_meter("accessible-video-api")

        # The reader is disabled above, so do not claim an exporter is active.
        logger.info(
            "Metrics provider initialized (Prometheus metric reader disabled)"
        )

    def start_prometheus_server(self, port: int = 8001) -> None:
        """Start the ``prometheus_client`` HTTP server on ``port``.

        Best-effort: any failure is logged and swallowed so that a metrics
        endpoint problem does not take the application down.

        NOTE(review): with the Prometheus metric reader disabled in
        ``setup_metrics``, the OpenTelemetry instruments are presumably not
        exposed on this endpoint - confirm before relying on it.
        """
        try:
            start_http_server(port)
            logger.info(f"Prometheus metrics server started on port {port}")
        except Exception as e:
            logger.error(f"Failed to start Prometheus server: {e}")

    # Job metrics methods
    def record_job_created(self, client_id: str, language: str) -> None:
        """Count one newly created job, labelled by client and language."""
        # NOTE(review): client_id may be a high-cardinality label - confirm
        # that the number of distinct clients is bounded.
        self.job_total_counter.add(
            1,
            attributes={
                "client_id": client_id,
                "source_language": language,
                "action": "created",
            },
        )

    def record_job_status_change(
        self, job_id: str, old_status: str, new_status: str
    ) -> None:
        """Move one job from ``old_status`` to ``new_status``.

        ``job_id`` is accepted for interface compatibility but is not used
        as an attribute.
        """
        # Decrement old status count
        self.job_status_gauge.add(-1, attributes={"status": old_status})
        # Increment new status count
        self.job_status_gauge.add(1, attributes={"status": new_status})

    def record_job_processing_time(
        self, stage: str, duration_seconds: float, job_id: str
    ) -> None:
        """Record how long ``stage`` took for the given job."""
        # NOTE(review): job_id as an attribute likely yields one series per
        # job; consider dropping it if cardinality becomes a problem.
        self.job_processing_duration.record(
            duration_seconds,
            attributes={
                "stage": stage,
                "job_id": job_id,
            },
        )

    # AI service metrics methods
    def record_ai_request(
        self, service: str, operation: str, language: Optional[str] = None
    ) -> None:
        """Count one AI service request; ``language`` is added when truthy."""
        attributes = {
            "service": service,
            "operation": operation,
        }
        if language:
            attributes["language"] = language

        self.ai_requests_counter.add(1, attributes=attributes)

    def record_ai_request_duration(
        self, service: str, operation: str, duration_seconds: float
    ) -> None:
        """Record the duration of one AI service request."""
        self.ai_request_duration.record(
            duration_seconds,
            attributes={
                "service": service,
                "operation": operation,
            },
        )

    def record_ai_confidence(self, confidence: float, service: str) -> None:
        """Record one AI confidence score for ``service``."""
        self.ai_confidence_histogram.record(
            confidence,
            attributes={"service": service},
        )

    # Storage metrics methods
    def record_storage_operation(
        self, operation: str, file_type: str, success: bool
    ) -> None:
        """Count one storage operation with a success/error result label."""
        self.storage_operations_counter.add(
            1,
            attributes={
                "operation": operation,
                "file_type": file_type,
                "result": "success" if success else "error",
            },
        )

    def record_storage_duration(
        self, operation: str, duration_seconds: float
    ) -> None:
        """Record the duration of one storage operation."""
        self.storage_operation_duration.record(
            duration_seconds,
            attributes={"operation": operation},
        )

    # Queue metrics methods
    def record_celery_task(
        self, task_name: str, queue: str, result: str
    ) -> None:
        """Count one Celery task execution."""
        self.queue_tasks_counter.add(
            1,
            attributes={
                "task_name": task_name,
                "queue": queue,
                "result": result,
            },
        )

    def record_celery_task_duration(
        self, task_name: str, duration_seconds: float
    ) -> None:
        """Record the duration of one Celery task execution."""
        self.queue_task_duration.record(
            duration_seconds,
            attributes={"task_name": task_name},
        )

    # Auth metrics methods
    def record_auth_attempt(
        self, result: str, user_role: Optional[str] = None
    ) -> None:
        """Count one authentication attempt; ``user_role`` added when truthy."""
        attributes = {"result": result}
        if user_role:
            attributes["user_role"] = user_role

        self.auth_attempts_counter.add(1, attributes=attributes)

    def update_active_users(self, count_change: int, user_role: str) -> None:
        """Apply a signed delta to the active-user count for ``user_role``."""
        self.active_users_gauge.add(
            count_change,
            attributes={"user_role": user_role},
        )


# Global metrics instance (created, and the meter provider installed, at import)
app_metrics = ApplicationMetrics()


class MetricsTimer:
    """Context manager that times its body and reports the elapsed seconds.

    On exit, ``metric_recorder`` is called as
    ``metric_recorder(duration, *args, **kwargs)``: the elapsed time is
    always the FIRST positional argument. The recorder is invoked whether or
    not the body raised, and exceptions from the body are not suppressed.
    """

    def __init__(
        self, metric_recorder: Callable[..., Any], *args: Any, **kwargs: Any
    ) -> None:
        self.metric_recorder = metric_recorder
        self.args = args
        self.kwargs = kwargs
        self.start_time: Optional[float] = None

    def __enter__(self) -> "MetricsTimer":
        # perf_counter is intended for measuring intervals and is not
        # affected by system clock adjustments, unlike time.time().
        self.start_time = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        # Compare against None explicitly so a falsy start value cannot
        # cause the measurement to be skipped.
        if self.start_time is not None:
            duration = time.perf_counter() - self.start_time
            self.metric_recorder(duration, *self.args, **self.kwargs)


# Convenience functions for common metrics patterns.
#
# MetricsTimer passes the duration as the first positional argument, while
# the ApplicationMetrics.record_* methods take it in a later position. Each
# helper therefore wraps the recorder in a small closure that puts the
# duration in the right slot.
def time_job_processing(stage: str, job_id: str) -> MetricsTimer:
    """Return a timer that records a job processing stage duration."""
    return MetricsTimer(
        lambda duration: app_metrics.record_job_processing_time(
            stage, duration, job_id
        )
    )


def time_ai_request(service: str, operation: str) -> MetricsTimer:
    """Return a timer that records an AI service request duration."""
    return MetricsTimer(
        lambda duration: app_metrics.record_ai_request_duration(
            service, operation, duration
        )
    )


def time_storage_operation(operation: str) -> MetricsTimer:
    """Return a timer that records a storage operation duration."""
    return MetricsTimer(
        lambda duration: app_metrics.record_storage_duration(
            operation, duration
        )
    )


def time_celery_task(task_name: str) -> MetricsTimer:
    """Return a timer that records a Celery task execution duration."""
    return MetricsTimer(
        lambda duration: app_metrics.record_celery_task_duration(
            task_name, duration
        )
    )


def track_rate_limit_metrics(
    identifier: str, is_allowed: bool, current_requests: int, limit: int
) -> None:
    """Count one rate-limit check.

    The identifier type is the text before the first ``":"`` in
    ``identifier``, or ``"unknown"`` when there is no colon.
    ``current_requests`` and ``limit`` are accepted for interface
    compatibility but are not recorded.
    """
    if hasattr(app_metrics, 'rate_limit_counter'):
        prefix, separator, _ = identifier.partition(":")
        app_metrics.rate_limit_counter.add(
            1,
            attributes={
                "identifier_type": prefix if separator else "unknown",
                "is_allowed": str(is_allowed),
                "status": "allowed" if is_allowed else "blocked",
            },
        )


def track_validation_metrics(
    endpoint: str,
    method: str,
    is_valid: bool,
    validation_time: float,
    error_types: list,
) -> None:
    """Count one request validation and record how long it took.

    ``error_types`` is joined with commas into a single label value, or
    ``"none"`` when it is empty.
    """
    if hasattr(app_metrics, 'validation_counter'):
        app_metrics.validation_counter.add(
            1,
            attributes={
                "endpoint": endpoint,
                "method": method,
                "is_valid": str(is_valid),
                "error_types": ",".join(error_types) if error_types else "none",
            },
        )

    if hasattr(app_metrics, 'validation_duration'):
        app_metrics.validation_duration.record(
            validation_time,
            attributes={
                "endpoint": endpoint,
                "method": method,
            },
        )