import functools
import logging
from typing import Optional

from opentelemetry import trace
# from opentelemetry.exporter.gcp.trace import CloudTraceSpanExporter  # Disabled for local dev
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.pymongo import PymongoInstrumentor
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

from ..core.config import settings

logger = logging.getLogger(__name__)


def setup_tracing(
    app_name: str = "accessible-video-api",
    service_version: str = "1.0.0",
) -> TracerProvider:
    """Initialize OpenTelemetry tracing for the application.

    Builds a ``TracerProvider`` carrying the service resource attributes,
    installs it as the global provider and attaches a span processor chosen
    from ``settings``:

    * ``app_env == "prod"`` with a GCP project id: no exporter is attached
      (the Cloud Trace exporter is currently commented out).
    * otherwise, if ``otel_exporter_otlp_endpoint`` is set: OTLP over gRPC.
    * otherwise: console exporter.

    Args:
        app_name: Value reported as ``service.name``.
        service_version: Value reported as ``service.version``.

    Returns:
        The tracer provider that was installed globally.
    """
    # Create resource with service information
    resource = Resource.create({
        "service.name": app_name,
        "service.version": service_version,
        "service.namespace": "accessible-video",
        "deployment.environment": settings.app_env,
    })

    # Set up tracer provider
    tracer_provider = TracerProvider(resource=resource)
    trace.set_tracer_provider(tracer_provider)

    # Configure span processor and exporter based on environment
    if settings.app_env == "prod" and settings.gcp_project_id:
        # Use Google Cloud Trace in production (disabled for local dev)
        # cloud_trace_exporter = CloudTraceSpanExporter(
        #     project_id=settings.gcp_project_id
        # )
        # span_processor = BatchSpanProcessor(cloud_trace_exporter)
        # tracer_provider.add_span_processor(span_processor)
        logger.info("Google Cloud Trace disabled for local dev")
    elif settings.otel_exporter_otlp_endpoint:
        # Use OTLP exporter for other observability platforms.
        # Imported lazily so the OTLP package is only needed when configured.
        from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

        otlp_exporter = OTLPSpanExporter(
            endpoint=settings.otel_exporter_otlp_endpoint,
            headers={},
        )
        span_processor = BatchSpanProcessor(otlp_exporter)
        tracer_provider.add_span_processor(span_processor)
        logger.info(
            "Configured OTLP trace exporter: %s",
            settings.otel_exporter_otlp_endpoint,
        )
    else:
        # Development mode - use console exporter
        from opentelemetry.sdk.trace.export import ConsoleSpanExporter

        console_exporter = ConsoleSpanExporter()
        span_processor = BatchSpanProcessor(console_exporter)
        tracer_provider.add_span_processor(span_processor)
        logger.info("Configured console trace exporter for development")

    logger.info("OpenTelemetry tracing initialized for %s", app_name)
    return tracer_provider


def instrument_fastapi_app(app) -> None:
    """Instrument a FastAPI application with automatic tracing.

    Args:
        app: The FastAPI application instance to instrument.
    """
    FastAPIInstrumentor.instrument_app(
        app,
        tracer_provider=trace.get_tracer_provider(),
        excluded_urls="health,metrics",  # Don't trace health checks
    )
    logger.info("FastAPI instrumentation enabled")


def instrument_dependencies() -> None:
    """Instrument external dependencies (MongoDB, Redis) for automatic tracing."""
    # Instrument MongoDB
    PymongoInstrumentor().instrument(
        tracer_provider=trace.get_tracer_provider()
    )
    logger.info("MongoDB instrumentation enabled")

    # Instrument Redis
    RedisInstrumentor().instrument(
        tracer_provider=trace.get_tracer_provider()
    )
    logger.info("Redis instrumentation enabled")


def get_tracer(name: str = "accessible-video"):
    """Return a tracer from the global provider for manual instrumentation.

    Args:
        name: Instrumentation scope name passed to ``trace.get_tracer``.
    """
    return trace.get_tracer(name)


def trace_async_operation(operation_name: str, **attributes):
    """Decorator factory that traces an async function in its own span.

    The span is named ``operation_name`` and carries ``attributes``. On
    completion ``operation.result`` is set to ``"success"`` or ``"error"``;
    on error ``operation.error_message`` holds ``str(exception)`` and the
    exception is re-raised unchanged.

    Args:
        operation_name: Span name.
        **attributes: Initial span attributes.

    Returns:
        A decorator for ``async def`` callables.
    """
    def decorator(func):
        # wraps() keeps the wrapped coroutine's name, docstring and signature
        # visible to frameworks that introspect the callable.
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            tracer = get_tracer()
            with tracer.start_as_current_span(
                operation_name,
                attributes=attributes,
            ) as span:
                try:
                    result = await func(*args, **kwargs)
                except Exception as e:
                    span.set_attribute("operation.result", "error")
                    span.set_attribute("operation.error_message", str(e))
                    # No manual record_exception(): start_as_current_span
                    # records the exception and sets ERROR status by default
                    # when it leaves the with-block, so recording here would
                    # emit a duplicate exception event.
                    raise
                span.set_attribute("operation.result", "success")
                return result
        return wrapper
    return decorator


def trace_job_pipeline(job_id: str, pipeline_stage: str):
    """Decorator factory that traces one stage of a job pipeline.

    The span is named ``job_pipeline.<pipeline_stage>`` and tagged with
    ``job.id`` and ``job.pipeline_stage``. ``job.result`` is set to
    ``"success"`` or ``"error"``; on error ``job.error_message`` holds
    ``str(exception)`` and the exception is re-raised unchanged.

    Args:
        job_id: Identifier stored in the ``job.id`` attribute.
        pipeline_stage: Stage name used in the span name and attributes.

    Returns:
        A decorator for ``async def`` callables.
    """
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            tracer = get_tracer()
            with tracer.start_as_current_span(
                f"job_pipeline.{pipeline_stage}",
                attributes={
                    "job.id": job_id,
                    "job.pipeline_stage": pipeline_stage,
                },
            ) as span:
                try:
                    result = await func(*args, **kwargs)
                except Exception as e:
                    span.set_attribute("job.result", "error")
                    span.set_attribute("job.error_message", str(e))
                    # Exception event/status are added by the span context
                    # manager on exit; see trace_async_operation.
                    raise
                span.set_attribute("job.result", "success")
                return result
        return wrapper
    return decorator


def trace_ai_operation(operation_type: str, language: Optional[str] = None):
    """Decorator factory that traces an AI service call.

    The span is named ``ai.<operation_type>``. ``ai.provider`` is
    ``"gemini"`` when ``operation_type`` contains ``"gemini"`` and
    ``"google_translate"`` otherwise. When the wrapped coroutine returns a
    dict, its ``confidence`` and ``language`` entries (if present) are copied
    to ``ai.confidence`` and ``ai.detected_language``.

    Args:
        operation_type: Operation label used in the span name and attributes.
        language: Optional language stored as ``ai.language`` when truthy.

    Returns:
        A decorator for ``async def`` callables.
    """
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            tracer = get_tracer()

            span_attributes = {
                "ai.operation_type": operation_type,
                "ai.provider": "gemini" if "gemini" in operation_type else "google_translate",
            }
            if language:
                span_attributes["ai.language"] = language

            with tracer.start_as_current_span(
                f"ai.{operation_type}",
                attributes=span_attributes,
            ) as span:
                try:
                    result = await func(*args, **kwargs)

                    # Add result attributes if available
                    if isinstance(result, dict):
                        if "confidence" in result:
                            span.set_attribute("ai.confidence", result["confidence"])
                        if "language" in result:
                            span.set_attribute("ai.detected_language", result["language"])

                    span.set_attribute("ai.result", "success")
                    return result
                except Exception as e:
                    span.set_attribute("ai.result", "error")
                    span.set_attribute("ai.error_message", str(e))
                    # Exception event/status are added by the span context
                    # manager on exit; see trace_async_operation.
                    raise
        return wrapper
    return decorator


def trace_storage_operation(operation_type: str, file_path: str):
    """Decorator factory that traces a storage operation.

    The span is named ``storage.<operation_type>`` and tagged with the
    operation, the path and a fixed ``storage.provider`` of ``"gcs"``. When
    the wrapped coroutine returns a string starting with ``gs://`` it is
    stored as ``storage.result_uri``.

    Args:
        operation_type: Operation label used in the span name and attributes.
        file_path: Value stored in the ``storage.path`` attribute.

    Returns:
        A decorator for ``async def`` callables.
    """
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            tracer = get_tracer()
            with tracer.start_as_current_span(
                f"storage.{operation_type}",
                attributes={
                    "storage.operation": operation_type,
                    "storage.path": file_path,
                    "storage.provider": "gcs",
                },
            ) as span:
                try:
                    result = await func(*args, **kwargs)
                    span.set_attribute("storage.result", "success")
                    if isinstance(result, str) and result.startswith("gs://"):
                        span.set_attribute("storage.result_uri", result)
                    return result
                except Exception as e:
                    span.set_attribute("storage.result", "error")
                    span.set_attribute("storage.error_message", str(e))
                    # Exception event/status are added by the span context
                    # manager on exit; see trace_async_operation.
                    raise
        return wrapper
    return decorator


class TracingContext:
    """Context manager for manual span creation with attributes.

    Entering starts a span, makes it the current span and returns it; leaving
    restores the previous current span and ends this one. If the body raises,
    the span gets ``error`` / ``error_message`` attributes and an exception
    event, and the exception propagates to the caller.
    """

    def __init__(self, span_name: str, attributes: Optional[dict] = None):
        """Store the span name and attributes; the span starts on ``__enter__``.

        Args:
            span_name: Name of the span to create.
            attributes: Initial span attributes; ``None`` means no attributes.
        """
        self.span_name = span_name
        self.attributes = attributes or {}
        self.tracer = get_tracer()
        self.span = None
        # Activation scope returned by trace.use_span(); set in __enter__.
        self._scope = None

    def __enter__(self):
        self.span = self.tracer.start_span(self.span_name, attributes=self.attributes)
        # start_span() alone does not activate the span, so spans created in
        # the with-body would not be parented to it. use_span() makes it
        # current and ends it on exit. record_exception is disabled because
        # __exit__ records the exception itself.
        self._scope = trace.use_span(
            self.span,
            end_on_exit=True,
            record_exception=False,
            set_status_on_exception=True,
        )
        self._scope.__enter__()
        return self.span

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type:
            self.span.set_attribute("error", True)
            self.span.set_attribute("error_message", str(exc_val))
            self.span.record_exception(exc_val)
        # Restores the previous current span and ends this one.
        self._scope.__exit__(exc_type, exc_val, exc_tb)
        # Never suppress the caller's exception.
        return False


# Convenience functions for common tracing patterns
def trace_api_request(endpoint: str, user_id: Optional[str] = None) -> TracingContext:
    """Create a ``TracingContext`` for an API request.

    The span is named ``api.<endpoint>`` with every ``/`` replaced by ``_``.

    Args:
        endpoint: Route stored as ``http.route``.
        user_id: Optional user id stored as ``user.id`` when truthy.
    """
    attributes = {
        "http.route": endpoint,
        "component": "api",
    }
    if user_id:
        attributes["user.id"] = user_id

    return TracingContext(f"api.{endpoint.replace('/', '_')}", attributes)


def trace_celery_task(task_name: str, job_id: Optional[str] = None) -> TracingContext:
    """Create a ``TracingContext`` for a Celery task execution.

    The span is named ``celery.<task_name>``.

    Args:
        task_name: Task name stored as ``celery.task_name``.
        job_id: Optional job id stored as ``job.id`` when truthy.
    """
    attributes = {
        "celery.task_name": task_name,
        "component": "worker",
    }
    if job_id:
        attributes["job.id"] = job_id

    return TracingContext(f"celery.{task_name}", attributes)