Source code for litestar_flags.analytics.exporters.otel

"""OpenTelemetry analytics exporter for feature flag metrics.

This module provides OpenTelemetry integration for exporting feature flag
analytics as spans and metrics, enabling distributed tracing and observability.

Example:
    Basic usage with OpenTelemetry::

        from litestar_flags.analytics.exporters.otel import OTelAnalyticsExporter

        # Create the exporter
        exporter = OTelAnalyticsExporter()

        # Record evaluation events
        await exporter.record(event)

        # Flush batched events
        await exporter.flush()

    Custom tracer and meter::

        from opentelemetry import trace, metrics

        tracer = trace.get_tracer("my-app")
        meter = metrics.get_meter("my-app")

        exporter = OTelAnalyticsExporter(tracer=tracer, meter=meter)

    Using with the existing OTelHook::

        from litestar_flags.contrib.otel import OTelHook
        from litestar_flags.analytics.exporters.otel import OTelAnalyticsExporter

        hook = OTelHook()
        exporter = OTelAnalyticsExporter(otel_hook=hook)

Requires:
    opentelemetry-api>=1.20.0

"""

from __future__ import annotations

import asyncio
from collections import deque
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from opentelemetry.metrics import Counter, Histogram, Meter
    from opentelemetry.trace import Tracer

    from litestar_flags.analytics.models import FlagEvaluationEvent
    from litestar_flags.contrib.otel import OTelHook

# Handle optional opentelemetry import
try:
    from opentelemetry import metrics as otel_metrics
    from opentelemetry import trace as otel_trace
    from opentelemetry.trace import SpanKind, StatusCode

    OTEL_AVAILABLE = True
except ImportError:
    OTEL_AVAILABLE = False
    otel_trace = None  # type: ignore[assignment]
    otel_metrics = None  # type: ignore[assignment]
    SpanKind = None  # type: ignore[assignment]
    StatusCode = None  # type: ignore[assignment]


__all__ = ["OTEL_AVAILABLE", "OTelAnalyticsExporter"]

# Span names for analytics events
SPAN_NAME_ANALYTICS_EVENT = "feature_flag.analytics.event"
SPAN_NAME_ANALYTICS_BATCH = "feature_flag.analytics.batch"

# Semantic conventions for analytics spans
ATTR_FLAG_KEY = "feature_flag.key"
ATTR_FLAG_VALUE = "feature_flag.value"
ATTR_FLAG_VARIANT = "feature_flag.variant"
ATTR_FLAG_REASON = "feature_flag.reason"
ATTR_TARGETING_KEY = "feature_flag.targeting_key"
ATTR_EVALUATION_DURATION_MS = "feature_flag.evaluation_duration_ms"
ATTR_EVENT_TIMESTAMP = "feature_flag.event_timestamp"
ATTR_BATCH_SIZE = "feature_flag.analytics.batch_size"

# Metric names for analytics
METRIC_EVENTS_RECORDED = "feature_flag.analytics.events_recorded"
METRIC_BATCH_SIZE = "feature_flag.analytics.batch_size"

# Default batch configuration
DEFAULT_BATCH_SIZE = 100
DEFAULT_FLUSH_INTERVAL_SECONDS = 30.0


[docs] class OTelAnalyticsExporter: """OpenTelemetry exporter for feature flag analytics. Exports feature flag evaluation events as OpenTelemetry spans and metrics. Implements the AnalyticsCollector protocol for seamless integration with the analytics pipeline. This exporter can optionally wrap an existing OTelHook instance to share tracer and meter configurations, or create its own OpenTelemetry instruments. Metrics exported: - feature_flag.analytics.events_recorded: Counter of recorded analytics events Labels: flag_key, reason - feature_flag.analytics.batch_size: Histogram of batch sizes when flushing Labels: (none) Attributes: tracer: The OpenTelemetry tracer for creating spans. meter: The OpenTelemetry meter for recording metrics. batch_size: Maximum number of events to buffer before auto-flush. flush_interval: Time in seconds between automatic flushes. Example: >>> exporter = OTelAnalyticsExporter(batch_size=50) >>> await exporter.record(evaluation_event) >>> # Events are batched and flushed automatically >>> await exporter.close() """
[docs] def __init__( self, tracer: Tracer | None = None, meter: Meter | None = None, otel_hook: OTelHook | None = None, tracer_name: str = "litestar_flags.analytics", meter_name: str = "litestar_flags.analytics", batch_size: int = DEFAULT_BATCH_SIZE, flush_interval: float = DEFAULT_FLUSH_INTERVAL_SECONDS, record_values: bool = False, create_spans: bool = True, ) -> None: """Initialize the OpenTelemetry analytics exporter. Args: tracer: Custom tracer instance. If not provided, uses otel_hook's tracer or creates one using tracer_name. meter: Custom meter instance. If not provided, uses otel_hook's meter or creates one using meter_name. otel_hook: Existing OTelHook instance to share tracer/meter from. If provided, tracer and meter arguments are ignored. tracer_name: Name for the default tracer if none provided. meter_name: Name for the default meter if none provided. batch_size: Maximum number of events to buffer before auto-flush. Set to 0 to disable batching. flush_interval: Time in seconds between automatic flushes. Set to 0 to disable automatic flushing. record_values: Whether to record flag values in spans. Disabled by default for privacy/security reasons. create_spans: Whether to create spans for each event. Set to False to only record metrics without span overhead. Raises: ImportError: If opentelemetry-api is not installed. """ if not OTEL_AVAILABLE or otel_trace is None or otel_metrics is None: raise ImportError( "opentelemetry-api is required for OTelAnalyticsExporter. " "Install it with: pip install litestar-flags[otel]" ) # Use otel_hook's instruments if provided, otherwise create our own if otel_hook is not None: self._tracer: Tracer = otel_hook.tracer self._meter: Meter = otel_hook.meter else: self._tracer = tracer or otel_trace.get_tracer(tracer_name) self._meter = meter or otel_metrics.get_meter(meter_name) self._batch_size = batch_size self._flush_interval = flush_interval self._record_values = record_values self._create_spans = create_spans # Event buffer for batching self._buffer: deque[FlagEvaluationEvent] = deque() self._buffer_lock = asyncio.Lock() # Background flush task self._flush_task: asyncio.Task[None] | None = None self._closed = False # Create analytics-specific metrics instruments self._events_recorded_counter: Counter = self._meter.create_counter( # type: ignore[assignment] name=METRIC_EVENTS_RECORDED, unit="1", description="Number of feature flag analytics events recorded", ) self._batch_size_histogram: Histogram = self._meter.create_histogram( # type: ignore[assignment] name=METRIC_BATCH_SIZE, unit="1", description="Size of analytics event batches when flushed", )
@property def tracer(self) -> Tracer: """Get the tracer instance. Returns: The OpenTelemetry tracer used by this exporter. """ return self._tracer @property def meter(self) -> Meter: """Get the meter instance. Returns: The OpenTelemetry meter used by this exporter. """ return self._meter @property def events_recorded_counter(self) -> Counter: """Get the events recorded counter metric. Returns: The Counter metric for recorded analytics events. """ return self._events_recorded_counter @property def batch_size_histogram(self) -> Histogram: """Get the batch size histogram metric. Returns: The Histogram metric for batch sizes. """ return self._batch_size_histogram @property def buffer_size(self) -> int: """Get the current number of buffered events. Returns: Number of events currently in the buffer. """ return len(self._buffer)
[docs] async def record(self, event: FlagEvaluationEvent) -> None: """Record a flag evaluation event. Buffers the event and flushes when batch_size is reached. This method implements the AnalyticsCollector protocol. Args: event: The evaluation event to record. """ if self._closed: return async with self._buffer_lock: self._buffer.append(event) # Start background flush task if not running and interval is set if self._flush_task is None and self._flush_interval > 0: self._flush_task = asyncio.create_task(self._background_flush()) # Auto-flush if batch size reached if self._batch_size > 0 and len(self._buffer) >= self._batch_size: await self._flush_buffer()
[docs] async def flush(self) -> None: """Flush any buffered events. Forces immediate processing of all buffered events, creating spans and recording metrics for each event. """ async with self._buffer_lock: await self._flush_buffer()
[docs] async def close(self) -> None: """Close the exporter and release resources. Flushes remaining events and cancels the background flush task. """ self._closed = True # Cancel background flush task if self._flush_task is not None: self._flush_task.cancel() try: await self._flush_task except asyncio.CancelledError: pass self._flush_task = None # Flush remaining events await self.flush()
async def _background_flush(self) -> None: """Background task for periodic flushing.""" while not self._closed: try: await asyncio.sleep(self._flush_interval) if not self._closed: await self.flush() except asyncio.CancelledError: break except Exception: # noqa: S110 # Log error but continue running pass async def _flush_buffer(self) -> None: """Flush the current buffer. Must be called with _buffer_lock held. """ if not self._buffer: return # Get all events from buffer events = list(self._buffer) self._buffer.clear() # Record batch size metric batch_size = len(events) self._batch_size_histogram.record(batch_size) # Create a parent span for the batch if creating spans if self._create_spans and batch_size > 1: with self._tracer.start_as_current_span( name=SPAN_NAME_ANALYTICS_BATCH, kind=SpanKind.INTERNAL, attributes={ATTR_BATCH_SIZE: batch_size}, ) as batch_span: for event in events: self._process_event(event) batch_span.set_status(StatusCode.OK) else: # Process events without batch span for event in events: self._process_event(event) def _process_event(self, event: FlagEvaluationEvent) -> None: """Process a single analytics event. Creates a span (if enabled) and records metrics for the event. Args: event: The evaluation event to process. """ flag_key = event.flag_key reason = event.reason.value if hasattr(event.reason, "value") else str(event.reason) # Record event counter metric metric_attributes = { ATTR_FLAG_KEY: flag_key, ATTR_FLAG_REASON: reason, } self._events_recorded_counter.add(1, metric_attributes) # Create span if enabled if self._create_spans: self._create_event_span(event) def _create_event_span(self, event: FlagEvaluationEvent) -> None: """Create a span for an analytics event. Args: event: The evaluation event to create a span for. """ flag_key = event.flag_key reason = event.reason.value if hasattr(event.reason, "value") else str(event.reason) # Build span attributes attributes: dict[str, Any] = { ATTR_FLAG_KEY: flag_key, ATTR_FLAG_REASON: reason, ATTR_EVENT_TIMESTAMP: event.timestamp.isoformat(), } if event.variant: attributes[ATTR_FLAG_VARIANT] = event.variant if event.targeting_key: attributes[ATTR_TARGETING_KEY] = event.targeting_key if event.evaluation_duration_ms > 0: attributes[ATTR_EVALUATION_DURATION_MS] = event.evaluation_duration_ms if self._record_values: # Convert value to string for span attribute value_str = str(event.value) if len(value_str) <= 256: # Limit attribute size attributes[ATTR_FLAG_VALUE] = value_str # Create and immediately end the span (event is already complete) with self._tracer.start_as_current_span( name=SPAN_NAME_ANALYTICS_EVENT, kind=SpanKind.INTERNAL, attributes=attributes, ) as span: span.set_status(StatusCode.OK)
[docs] def record_sync(self, event: FlagEvaluationEvent) -> None: """Record an event synchronously without batching. This method processes the event immediately without buffering. Useful for low-volume scenarios or when immediate recording is required. Args: event: The evaluation event to record. """ if self._closed: return self._process_event(event)
[docs] async def record_batch(self, events: list[FlagEvaluationEvent]) -> None: """Record multiple events in a batch. Adds all events to the buffer and triggers a flush. Args: events: List of evaluation events to record. """ if self._closed or not events: return async with self._buffer_lock: self._buffer.extend(events) await self._flush_buffer()
[docs] def create_exporter_from_hook( otel_hook: OTelHook, batch_size: int = DEFAULT_BATCH_SIZE, flush_interval: float = DEFAULT_FLUSH_INTERVAL_SECONDS, record_values: bool = False, create_spans: bool = True, ) -> OTelAnalyticsExporter: """Create an OTelAnalyticsExporter from an existing OTelHook. This factory function creates an analytics exporter that shares the tracer and meter from an existing OTelHook instance, ensuring consistent instrumentation across flag evaluation and analytics. Args: otel_hook: The OTelHook instance to share tracer/meter from. batch_size: Maximum number of events to buffer before auto-flush. flush_interval: Time in seconds between automatic flushes. record_values: Whether to record flag values in spans. create_spans: Whether to create spans for each event. Returns: A configured OTelAnalyticsExporter instance. Raises: ImportError: If opentelemetry-api is not installed. Example: >>> from litestar_flags.contrib.otel import OTelHook >>> hook = OTelHook() >>> exporter = create_exporter_from_hook(hook) >>> await exporter.record(event) """ if not OTEL_AVAILABLE: raise ImportError( "opentelemetry-api is required for create_exporter_from_hook. " "Install it with: pip install litestar-flags[otel]" ) return OTelAnalyticsExporter( otel_hook=otel_hook, batch_size=batch_size, flush_interval=flush_interval, record_values=record_values, create_spans=create_spans, )