# Source code for litestar_flags.analytics.exporters.prometheus

"""Prometheus metrics exporter for feature flag analytics.

This module provides Prometheus metrics integration for feature flag evaluations,
enabling monitoring and alerting through Prometheus-compatible systems.

Example:
    Basic usage with Prometheus::

        from litestar_flags.analytics.exporters.prometheus import PrometheusExporter

        # Create the exporter
        exporter = PrometheusExporter()

        # Record evaluation events directly
        await exporter.record(event)

        # Or update from an aggregator
        await exporter.update_from_aggregator(aggregator, ["flag_1", "flag_2"])

    Custom registry::

        from prometheus_client import CollectorRegistry

        registry = CollectorRegistry()
        exporter = PrometheusExporter(registry=registry, prefix="myapp")

Requires:
    prometheus_client>=0.17.0

"""

from __future__ import annotations

import asyncio
import logging
from collections import defaultdict
from typing import TYPE_CHECKING, Any, Protocol, cast, runtime_checkable

if TYPE_CHECKING:
    from collections.abc import Sequence

    from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram

    from litestar_flags.analytics.models import FlagEvaluationEvent

# Handle optional prometheus_client import.
# The runtime objects are bound to ``Prom*`` aliases; the un-aliased names
# (CollectorRegistry, Counter, Gauge, Histogram) are imported above only under
# TYPE_CHECKING, for use in annotations.
try:
    from prometheus_client import REGISTRY as PROM_REGISTRY
    from prometheus_client import CollectorRegistry as PromCollectorRegistry
    from prometheus_client import Counter as PromCounter
    from prometheus_client import Gauge as PromGauge
    from prometheus_client import Histogram as PromHistogram

    # Import succeeded: the exporter can be constructed.
    PROMETHEUS_AVAILABLE = True
except ImportError:
    # prometheus_client is not installed. Keep this module importable by
    # binding every alias to None; PrometheusExporter.__init__ checks
    # PROMETHEUS_AVAILABLE / PromCounter and raises ImportError in that case.
    PROMETHEUS_AVAILABLE = False
    PROM_REGISTRY = None  # type: ignore[assignment]
    PromCollectorRegistry = None  # type: ignore[assignment]
    PromCounter = None  # type: ignore[assignment]
    PromGauge = None  # type: ignore[assignment]
    PromHistogram = None  # type: ignore[assignment]


__all__ = ["PROMETHEUS_AVAILABLE", "PrometheusExporter"]

# Upper bounds, in seconds, of the evaluation-duration histogram buckets.
# Coverage runs from 100 microseconds up to 1 second.
DEFAULT_DURATION_BUCKETS = (
    # Sub-millisecond: 100us, 500us
    0.0001, 0.0005,
    # Milliseconds: 1ms, 5ms, 10ms, 25ms, 50ms
    0.001, 0.005, 0.01, 0.025, 0.05,
    # Tenths of a second and up: 100ms, 250ms, 500ms, 1s
    0.1, 0.25, 0.5, 1.0,
)


@runtime_checkable
class MetricsProvider(Protocol):
    """Structural type for anything that exposes aggregated flag metrics.

    Any object carrying the three read-only attributes below satisfies the
    protocol; the FlagMetrics dataclass is the intended implementation.
    Because the protocol is runtime-checkable, ``isinstance`` can be used to
    test for the presence of these attributes.

    """

    @property
    def total_evaluations(self) -> int:
        """Return the total number of evaluations."""

    @property
    def unique_users(self) -> int:
        """Return how many distinct users evaluated the flag."""

    @property
    def error_rate(self) -> float:
        """Return the error rate expressed as a percentage (0-100)."""


class PrometheusExporter:
    """Prometheus metrics exporter for feature flag evaluations.

    Exposes feature flag metrics in Prometheus format for monitoring and
    alerting. This exporter implements the AnalyticsCollector protocol,
    allowing it to receive evaluation events directly.

    Metrics exported (each name is prefixed with ``"<prefix>_"`` when a prefix
    is configured):
        - feature_flag_evaluations_total: Counter of flag evaluations
            Labels: flag_key, reason, variant
        - feature_flag_evaluation_duration_seconds: Histogram of evaluation times
            Labels: flag_key
        - feature_flag_unique_users: Gauge of unique users per flag
            Labels: flag_key
        - feature_flag_error_rate: Gauge of error rate per flag (0.0 to 1.0)
            Labels: flag_key

    Attributes:
        registry: The Prometheus registry the metrics are registered on.
        evaluations_counter: The evaluations Counter.
        duration_histogram: The evaluation-duration Histogram.
        unique_users_gauge: The unique-users Gauge.
        error_rate_gauge: The error-rate Gauge.

    Example:
        >>> exporter = PrometheusExporter()
        >>> await exporter.record(evaluation_event)
        >>> # Metrics are automatically updated

    """

    def __init__(
        self,
        registry: CollectorRegistry | None = None,
        prefix: str = "",
        duration_buckets: tuple[float, ...] = DEFAULT_DURATION_BUCKETS,
    ) -> None:
        """Initialize the Prometheus exporter.

        Args:
            registry: Custom Prometheus registry. If not provided, uses the
                default global registry.
            prefix: Optional prefix for metric names
                (e.g., "myapp" -> "myapp_feature_flag_*").
            duration_buckets: Custom histogram buckets for duration
                measurements in seconds.

        Raises:
            ImportError: If prometheus_client is not installed.

        """
        if not PROMETHEUS_AVAILABLE or PromCounter is None:
            raise ImportError(
                "prometheus_client is required for PrometheusExporter. "
                "Install it with: pip install litestar-flags[prometheus]"
            )

        # Explicit None check: only a missing registry falls back to the global
        # one, so a caller-supplied registry is never replaced based on truthiness.
        self._registry: CollectorRegistry = PROM_REGISTRY if registry is None else registry  # type: ignore[assignment]
        self._prefix = f"{prefix}_" if prefix else ""
        # Guards the in-memory bookkeeping below (not the Prometheus metrics).
        self._lock = asyncio.Lock()

        # Track unique users per flag (using sets)
        self._unique_users: dict[str, set[str]] = defaultdict(set)

        # Track error counts and total counts for error rate calculation
        self._error_counts: dict[str, int] = defaultdict(int)
        self._total_counts: dict[str, int] = defaultdict(int)

        # Create metrics
        self._evaluations_counter: Counter = PromCounter(  # type: ignore[assignment]
            f"{self._prefix}feature_flag_evaluations_total",
            "Total number of feature flag evaluations",
            labelnames=["flag_key", "reason", "variant"],
            registry=self._registry,
        )

        self._duration_histogram: Histogram = PromHistogram(  # type: ignore[assignment]
            f"{self._prefix}feature_flag_evaluation_duration_seconds",
            "Duration of feature flag evaluations in seconds",
            labelnames=["flag_key"],
            buckets=duration_buckets,
            registry=self._registry,
        )

        self._unique_users_gauge: Gauge = PromGauge(  # type: ignore[assignment]
            f"{self._prefix}feature_flag_unique_users",
            "Number of unique users who evaluated each flag",
            labelnames=["flag_key"],
            registry=self._registry,
        )

        self._error_rate_gauge: Gauge = PromGauge(  # type: ignore[assignment]
            f"{self._prefix}feature_flag_error_rate",
            "Error rate for feature flag evaluations (0.0 to 1.0)",
            labelnames=["flag_key"],
            registry=self._registry,
        )

    @property
    def registry(self) -> CollectorRegistry:
        """Get the Prometheus registry.

        Returns:
            The Prometheus registry used by this exporter.

        """
        return self._registry

    @property
    def evaluations_counter(self) -> Counter:
        """Get the evaluations counter metric.

        Returns:
            The Counter metric for flag evaluations.

        """
        return self._evaluations_counter

    @property
    def duration_histogram(self) -> Histogram:
        """Get the duration histogram metric.

        Returns:
            The Histogram metric for evaluation durations.

        """
        return self._duration_histogram

    @property
    def unique_users_gauge(self) -> Gauge:
        """Get the unique users gauge metric.

        Returns:
            The Gauge metric for unique users.

        """
        return self._unique_users_gauge

    @property
    def error_rate_gauge(self) -> Gauge:
        """Get the error rate gauge metric.

        Returns:
            The Gauge metric for error rates.

        """
        return self._error_rate_gauge

    async def record(self, event: FlagEvaluationEvent) -> None:
        """Record a single analytics event.

        Updates all Prometheus metrics based on the evaluation event.
        This method implements the AnalyticsCollector protocol.

        Args:
            event: The analytics event to record. ``flag_key``, ``reason``,
                ``variant`` and ``targeting_key`` are read directly;
                ``latency_ms`` / ``evaluation_duration_ms`` are optional.

        """
        flag_key = event.flag_key
        # ``reason`` may be a plain string or an enum-like object exposing ``.value``.
        reason = event.reason if isinstance(event.reason, str) else event.reason.value
        variant = event.variant or ""

        # Update evaluations counter
        self._evaluations_counter.labels(
            flag_key=flag_key,
            reason=reason,
            variant=variant,
        ).inc()

        # Update duration histogram (convert from ms to seconds). The first
        # truthy attribute wins; a missing or zero latency is not observed.
        latency_ms = getattr(event, "latency_ms", None) or getattr(event, "evaluation_duration_ms", None) or 0.0
        if latency_ms > 0:
            self._duration_histogram.labels(flag_key=flag_key).observe(latency_ms / 1000.0)

        async with self._lock:
            # Track unique users; events without a targeting key are not attributed.
            targeting_key = event.targeting_key
            if targeting_key:
                self._unique_users[flag_key].add(targeting_key)
                self._unique_users_gauge.labels(flag_key=flag_key).set(len(self._unique_users[flag_key]))

            # Track errors for error rate calculation
            self._total_counts[flag_key] += 1
            if reason == "ERROR":
                self._error_counts[flag_key] += 1

            # The total was just incremented, so it is at least 1 and the
            # division is always defined.
            error_rate = self._error_counts[flag_key] / self._total_counts[flag_key]
            self._error_rate_gauge.labels(flag_key=flag_key).set(error_rate)

    async def record_batch(self, events: list[FlagEvaluationEvent]) -> None:
        """Record multiple analytics events in a batch.

        Events are recorded one at a time, in order, via :meth:`record`.

        Args:
            events: List of analytics events to record.

        """
        for event in events:
            await self.record(event)

    async def flush(self) -> None:
        """Flush any buffered data.

        For Prometheus, metrics are updated immediately, so this is a no-op.
        Provided for AnalyticsCollector protocol compliance.

        """
        # Prometheus metrics are updated immediately, nothing to flush

    async def close(self) -> None:
        """Close the exporter and clean up resources.

        Clears internal tracking state but does not unregister metrics
        from the Prometheus registry.

        """
        # Same clean-up as reset_all_stats (which takes the lock itself).
        await self.reset_all_stats()

    def update_from_metrics(
        self,
        flag_key: str,
        metrics: MetricsProvider | dict[str, Any],
    ) -> None:
        """Update gauge metrics from a metrics object or dictionary.

        This method syncs Prometheus gauges with pre-aggregated statistics
        from a FlagMetrics object or a compatible dictionary. Only the
        unique-users and error-rate gauges are written.

        Args:
            flag_key: The flag key to update metrics for.
            metrics: A MetricsProvider (like FlagMetrics) or a dictionary with
                keys: unique_users, error_rate (0-100), total_evaluations.
                Missing or falsy dictionary values are treated as 0.

        Example:
            >>> from litestar_flags.analytics.aggregator import AnalyticsAggregator
            >>> aggregator = AnalyticsAggregator(collector)
            >>> metrics = aggregator.get_flag_metrics("feature_a")
            >>> exporter.update_from_metrics("feature_a", metrics)

        """
        unique_users: int
        error_rate: float

        if isinstance(metrics, dict):
            metrics_dict = cast(dict[str, Any], metrics)
            unique_users = int(metrics_dict.get("unique_users") or 0)
            error_rate = float(metrics_dict.get("error_rate") or 0.0)
        else:
            unique_users = metrics.unique_users
            error_rate = metrics.error_rate

        # Update unique users gauge
        self._unique_users_gauge.labels(flag_key=flag_key).set(unique_users)

        # Update error rate gauge (convert from percentage to 0-1 ratio)
        self._error_rate_gauge.labels(flag_key=flag_key).set(error_rate / 100.0)

    async def update_from_aggregator(
        self,
        aggregator: Any,
        flag_keys: Sequence[str],
        window_seconds: int = 3600,
    ) -> None:
        """Update gauge metrics from an analytics aggregator.

        This method is useful for syncing Prometheus gauges with pre-aggregated
        statistics from an AnalyticsAggregator. It is best-effort: a failure
        for one flag is logged as a warning and the remaining flags are still
        processed.

        Args:
            aggregator: An AnalyticsAggregator instance with get_flag_metrics
                or get_flag_metrics_async method. If it has neither, nothing
                is updated.
            flag_keys: List of flag keys to update metrics for.
            window_seconds: Time window for metric aggregation (default: 3600).

        Example:
            >>> from litestar_flags.analytics.aggregator import AnalyticsAggregator
            >>> aggregator = AnalyticsAggregator(collector)
            >>> await exporter.update_from_aggregator(
            ...     aggregator=aggregator,
            ...     flag_keys=["feature_a", "feature_b"],
            ... )

        """
        logger = logging.getLogger(__name__)

        for flag_key in flag_keys:
            try:
                # Try async method first, then fall back to sync
                if hasattr(aggregator, "get_flag_metrics_async"):
                    metrics = await aggregator.get_flag_metrics_async(flag_key, window_seconds)
                elif hasattr(aggregator, "get_flag_metrics"):
                    metrics = aggregator.get_flag_metrics(flag_key, window_seconds)
                else:
                    continue

                self.update_from_metrics(flag_key, metrics)
            except Exception:
                # Do not let one flag's failure disrupt the update of the
                # others, but leave a trace instead of hiding the error.
                logger.warning(
                    "Failed to update Prometheus gauges for flag %r",
                    flag_key,
                    exc_info=True,
                )

    def get_tracked_flag_keys(self) -> list[str]:
        """Get list of flag keys that have been tracked.

        Returns:
            Sorted list of flag keys that have recorded evaluations. Sorting
            makes the result deterministic across calls and processes.

        """
        return sorted(self._unique_users.keys() | self._total_counts.keys())

    async def reset_flag_stats(self, flag_key: str) -> None:
        """Reset internal tracking stats for a specific flag.

        This clears the unique users set and error tracking for the flag.
        Note: This does not reset Prometheus metrics themselves.

        Args:
            flag_key: The flag key to reset stats for. Unknown keys are ignored.

        """
        async with self._lock:
            # pop() with a default removes the entry if present and never
            # creates one on the defaultdicts.
            self._unique_users.pop(flag_key, None)
            self._error_counts.pop(flag_key, None)
            self._total_counts.pop(flag_key, None)

    async def reset_all_stats(self) -> None:
        """Reset all internal tracking stats.

        Clears all unique user sets and error tracking.
        Note: This does not reset Prometheus metrics themselves.

        """
        async with self._lock:
            self._unique_users.clear()
            self._error_counts.clear()
            self._total_counts.clear()