Analytics¶
The analytics module provides comprehensive tracking and analysis of feature flag evaluations. It includes event models, collectors for storing events, aggregators for computing metrics, and exporters for monitoring systems.
Overview¶
The analytics system follows a three-layer architecture:
Events: FlagEvaluationEvent captures each flag evaluation
Collectors: Store events (in-memory, database, or custom backends)
Aggregators: Compute metrics from collected events
Exporters: Push metrics to external systems (Prometheus, OpenTelemetry)
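The flow through these layers can be sketched with plain-Python stand-ins. The `Event`, `Collector`, and `Aggregator` classes below are simplified illustrations of the layering, not the library's actual implementations:

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime, timezone

# Simplified stand-ins for the three layers. The real classes
# (FlagEvaluationEvent, the collectors, AnalyticsAggregator) live in
# litestar_flags.analytics and carry many more fields and options.

@dataclass
class Event:  # stand-in for FlagEvaluationEvent
    timestamp: datetime
    flag_key: str
    value: object
    reason: str

class Collector:  # stand-in for an AnalyticsCollector implementation
    def __init__(self) -> None:
        self.events: list[Event] = []

    async def record(self, event: Event) -> None:
        self.events.append(event)

class Aggregator:  # stand-in for AnalyticsAggregator
    def __init__(self, collector: Collector) -> None:
        self.collector = collector

    def total_evaluations(self, flag_key: str) -> int:
        return sum(1 for e in self.collector.events if e.flag_key == flag_key)

async def main() -> int:
    collector = Collector()
    for _ in range(3):
        event = Event(datetime.now(timezone.utc), "new_checkout", True, "STATIC")
        await collector.record(event)  # Events layer feeds the Collectors layer
    return Aggregator(collector).total_evaluations("new_checkout")  # Aggregators layer

total = asyncio.run(main())
print(total)
```

An exporter would then read either the raw events or the aggregated numbers and push them to a monitoring system.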
For usage guides and examples, see Flag Analytics.
Core API¶
FlagEvaluationEvent¶
- class litestar_flags.analytics.FlagEvaluationEvent[source]¶
Bases:
object
Event capturing a single feature flag evaluation.
Records detailed information about each flag evaluation for analytics, debugging, and monitoring purposes. This model follows the OpenFeature specification patterns for evaluation telemetry.
- timestamp¶
When the evaluation occurred (UTC).
- flag_key¶
The key of the evaluated flag.
- value¶
The evaluated flag value (any type).
- reason¶
The reason for the evaluation result.
- variant¶
The variant key if a variant was selected.
- targeting_key¶
The targeting key used for evaluation (e.g., user ID).
- context_attributes¶
Additional context attributes used in evaluation.
- evaluation_duration_ms¶
Time taken to evaluate the flag in milliseconds.
Example
>>> from datetime import datetime, UTC
>>> event = FlagEvaluationEvent(
...     timestamp=datetime.now(UTC),
...     flag_key="new_checkout",
...     value=True,
...     reason=EvaluationReason.TARGETING_MATCH,
...     variant="beta_users",
...     targeting_key="user-123",
...     context_attributes={"plan": "premium"},
...     evaluation_duration_ms=1.5,
... )
>>> event.flag_key
'new_checkout'
- Parameters:
- __init__(timestamp, flag_key, value, reason, variant=None, targeting_key=None, context_attributes=<factory>, evaluation_duration_ms=0.0)¶
- reason: EvaluationReason¶
FlagMetrics¶
- class litestar_flags.analytics.FlagMetrics[source]¶
Bases:
object
Aggregated metrics for a feature flag.
Contains computed statistics about flag evaluations including evaluation rate, unique users, distributions, and latency percentiles.
- evaluation_rate¶
Evaluations per second in the measurement window.
- unique_users¶
Count of unique targeting keys in the window.
- variant_distribution¶
Count of evaluations per variant.
- reason_distribution¶
Count of evaluations per reason.
- error_rate¶
Percentage of evaluations that resulted in errors (0-100).
- latency_p50¶
50th percentile latency in milliseconds.
- latency_p90¶
90th percentile latency in milliseconds.
- latency_p99¶
99th percentile latency in milliseconds.
- total_evaluations¶
Total number of evaluations in the window.
- window_start¶
Start of the measurement window.
- window_end¶
End of the measurement window.
Example
>>> metrics = FlagMetrics(
...     evaluation_rate=10.5,
...     unique_users=150,
...     variant_distribution={"control": 75, "treatment": 75},
...     reason_distribution={"SPLIT": 150},
...     error_rate=0.0,
...     latency_p50=1.2,
...     latency_p90=2.5,
...     latency_p99=5.0,
... )
- Parameters:
- __init__(evaluation_rate=0.0, unique_users=0, variant_distribution=<factory>, reason_distribution=<factory>, error_rate=0.0, latency_p50=0.0, latency_p90=0.0, latency_p99=0.0, total_evaluations=0, window_start=None, window_end=None)¶
- Parameters:
- Return type:
None
AnalyticsCollector Protocol¶
- class litestar_flags.analytics.AnalyticsCollector[source]¶
Bases:
Protocol
Protocol for analytics collectors.
All analytics collector implementations must implement this protocol. Methods are async to support both sync and async backends.
- Implementations:
InMemoryAnalyticsCollector: In-memory storage for development/testing
(Future) DatadogAnalyticsCollector: Datadog integration
(Future) PrometheusAnalyticsCollector: Prometheus metrics
Example
>>> class MyCollector:
...     async def record(self, event: FlagEvaluationEvent) -> None:
...         # Store or process the event
...         pass
...
...     async def flush(self) -> None:
...         # Flush any buffered events
...         pass
...
...     async def close(self) -> None:
...         # Clean up resources
...         pass
>>> isinstance(MyCollector(), AnalyticsCollector)
True
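The `isinstance()` check in the example above works because the protocol is runtime-checkable: conformance is structural rather than inheritance-based. A self-contained illustration of the mechanism (the `AnalyticsCollector` defined here is a local stand-in, not the library's class):

```python
from typing import Protocol, runtime_checkable

# Local stand-in showing how a runtime-checkable Protocol makes
# isinstance() succeed through structural typing alone.
@runtime_checkable
class AnalyticsCollector(Protocol):
    async def record(self, event: object) -> None: ...
    async def flush(self) -> None: ...
    async def close(self) -> None: ...

class MyCollector:  # note: no inheritance from AnalyticsCollector
    async def record(self, event: object) -> None:
        pass

    async def flush(self) -> None:
        pass

    async def close(self) -> None:
        pass

class Incomplete:  # missing flush() and close()
    async def record(self, event: object) -> None:
        pass

print(isinstance(MyCollector(), AnalyticsCollector))   # all methods present
print(isinstance(Incomplete(), AnalyticsCollector))    # fails the structural check
```

Note that `runtime_checkable` only verifies method presence, not signatures, so static type checking remains the stronger guarantee.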
- async record(event)[source]¶
Record a flag evaluation event.
This method should be fast and non-blocking. Implementations may buffer events and flush them asynchronously to avoid impacting flag evaluation latency.
- Parameters:
event (FlagEvaluationEvent) – The evaluation event to record.
- Return type:
None
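The buffer-and-flush pattern suggested above can be sketched as follows. `BufferedCollector` is a hypothetical custom implementation, not part of the library:

```python
import asyncio

class BufferedCollector:
    """Hypothetical collector: record() only appends; flush() does the slow work."""

    def __init__(self, batch_size: int = 100) -> None:
        self.batch_size = batch_size
        self._buffer: list[object] = []
        self._lock = asyncio.Lock()
        self.flushed: list[object] = []  # stands in for a slow backend

    async def record(self, event: object) -> None:
        # Fast path: append under the lock, then flush outside it if full.
        async with self._lock:
            self._buffer.append(event)
            should_flush = len(self._buffer) >= self.batch_size
        if should_flush:
            await self.flush()

    async def flush(self) -> None:
        async with self._lock:
            batch, self._buffer = self._buffer, []
        # A real backend would perform a network or database write here.
        self.flushed.extend(batch)

    async def close(self) -> None:
        await self.flush()

async def demo() -> tuple[int, int]:
    c = BufferedCollector(batch_size=3)
    for i in range(4):
        await c.record(i)
    buffered = len(c._buffer)  # one event remains after the auto-flush at 3
    await c.close()
    return buffered, len(c.flushed)

buffered, flushed = asyncio.run(demo())
print(buffered, flushed)
```

Keeping the flush outside the lock's critical section keeps `record()` cheap, which is the latency property the protocol documentation asks for.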
- async flush()[source]¶
Flush any buffered events.
Forces immediate processing of any buffered events. This is useful for graceful shutdown or when events need to be persisted immediately.
- Return type:
None
- async close()[source]¶
Close the collector and release any resources.
This method should flush any remaining events and clean up resources such as connections or background tasks.
- Return type:
None
- __init__(*args, **kwargs)¶
Collectors¶
InMemoryAnalyticsCollector¶
- class litestar_flags.analytics.InMemoryAnalyticsCollector[source]¶
Bases:
object
In-memory analytics collector for development and testing.
This collector stores evaluation events in memory with a configurable maximum size. When the maximum size is reached, oldest events are discarded to make room for new ones.
Thread-safe implementation using asyncio.Lock for concurrent access.
- max_size¶
Maximum number of events to store.
Example
>>> from datetime import datetime, UTC
>>> from litestar_flags.analytics import FlagEvaluationEvent, InMemoryAnalyticsCollector
>>> from litestar_flags.types import EvaluationReason
>>> collector = InMemoryAnalyticsCollector(max_size=1000)
>>> event = FlagEvaluationEvent(
...     timestamp=datetime.now(UTC),
...     flag_key="my_flag",
...     value=True,
...     reason=EvaluationReason.STATIC,
... )
>>> await collector.record(event)
>>> events = await collector.get_events()
>>> len(events)
1
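The discard-oldest behavior described above is what `collections.deque` with `maxlen` provides; a minimal sketch of the idea, not the collector's actual internals:

```python
from collections import deque

# A bounded buffer that silently drops the oldest entry when full,
# mirroring the described max_size behavior.
max_size = 3
events: deque[str] = deque(maxlen=max_size)

for key in ["e1", "e2", "e3", "e4", "e5"]:
    events.append(key)  # once full, each append evicts from the left

print(list(events))  # the oldest events were discarded
```

A `maxlen` deque makes both the append and the eviction O(1), which is why it is a natural fit for a fixed-size event buffer.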
- Parameters:
max_size (int)
- __init__(max_size=10000)[source]¶
Initialize the in-memory analytics collector.
- Parameters:
max_size (int) – Maximum number of events to store. Defaults to 10000. When exceeded, oldest events are discarded.
- Return type:
None
- property max_size: int¶
Return the maximum number of events to store.
- Returns:
The configured maximum size.
- async record(event)[source]¶
Record a flag evaluation event.
Thread-safe method that stores the event in memory. If the maximum size is exceeded, the oldest event is removed.
- Parameters:
event (FlagEvaluationEvent) – The evaluation event to record.
- Return type:
None
- async flush()[source]¶
Flush buffered events.
For the in-memory collector, this is a no-op since events are stored immediately. Provided for protocol compliance.
- Return type:
None
- async close()[source]¶
Close the collector and clear all stored events.
Releases the stored events from memory.
- Return type:
None
- async get_events(flag_key=None, limit=None)[source]¶
Retrieve stored evaluation events.
This method is primarily intended for testing and debugging. It allows filtering and limiting the returned events.
DatabaseAnalyticsCollector¶
- class litestar_flags.analytics.collectors.database.DatabaseAnalyticsCollector[source]¶
Bases:
object
Database analytics collector with batch writes.
Buffers evaluation events in memory and periodically flushes them to the database in batches for optimal write performance. Uses SQLAlchemy async sessions for non-blocking database operations.
- Features:
Configurable batch size and flush interval
Background task for periodic flushing
Thread-safe event buffering with asyncio.Lock
Automatic flush on close for data integrity
Proper cleanup of database connections
- batch_size¶
Maximum events to buffer before auto-flush.
- flush_interval_seconds¶
Time between automatic flushes.
Example
>>> collector = await DatabaseAnalyticsCollector.create(
...     connection_string="postgresql+asyncpg://user:pass@localhost/db",
...     batch_size=100,
...     flush_interval_seconds=5.0,
... )
>>> try:
...     await collector.record(event)
... finally:
...     await collector.close()
- Parameters:
engine (AsyncEngine)
session_maker (async_sessionmaker[AsyncSession])
batch_size (int)
flush_interval_seconds (float)
- __init__(engine, session_maker, batch_size=100, flush_interval_seconds=5.0)[source]¶
Initialize the database analytics collector.
- Parameters:
engine (AsyncEngine) – The SQLAlchemy async engine.
session_maker (async_sessionmaker[AsyncSession]) – The session maker factory.
batch_size (int) – Maximum events to buffer before auto-flush. Defaults to 100.
flush_interval_seconds (float) – Seconds between automatic flushes. Defaults to 5.0.
- Return type:
None
- async classmethod create(connection_string, batch_size=100, flush_interval_seconds=5.0, create_tables=True, **engine_kwargs)[source]¶
Create a new database analytics collector.
Factory method that sets up the database connection and optionally creates the analytics_events table.
- Parameters:
connection_string (str) – Database connection string (SQLAlchemy format).
batch_size (int) – Maximum events to buffer before auto-flush. Defaults to 100.
flush_interval_seconds (float) – Seconds between automatic flushes. Defaults to 5.0.
create_tables (bool) – Whether to create tables on startup. Defaults to True.
**engine_kwargs (Any) – Additional arguments for create_async_engine.
- Return type:
DatabaseAnalyticsCollector
- Returns:
Configured DatabaseAnalyticsCollector instance with background flush task running.
Example
>>> collector = await DatabaseAnalyticsCollector.create(
...     connection_string="sqlite+aiosqlite:///analytics.db",
...     batch_size=50,
...     flush_interval_seconds=10.0,
... )
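The background flush task that `create()` starts can be approximated with a plain asyncio loop. `PeriodicFlusher` below is a hypothetical sketch with no real database; the actual collector writes its batches through SQLAlchemy sessions:

```python
import asyncio

class PeriodicFlusher:
    """Hypothetical sketch of a collector with a background flush loop."""

    def __init__(self, flush_interval_seconds: float) -> None:
        self.flush_interval_seconds = flush_interval_seconds
        self.buffer: list[object] = []
        self.written: list[object] = []  # stands in for database rows
        # Must be constructed inside a running event loop.
        self._task = asyncio.ensure_future(self._flush_loop())

    async def _flush_loop(self) -> None:
        while True:
            await asyncio.sleep(self.flush_interval_seconds)
            await self.flush()

    async def record(self, event: object) -> None:
        self.buffer.append(event)

    async def flush(self) -> None:
        batch, self.buffer = self.buffer, []
        self.written.extend(batch)  # real code: one batched INSERT per flush

    async def close(self) -> None:
        self._task.cancel()  # stop the background loop
        await self.flush()   # final flush for data integrity

async def demo() -> int:
    c = PeriodicFlusher(flush_interval_seconds=0.01)
    await c.record("evt-1")
    await c.record("evt-2")
    await asyncio.sleep(0.05)  # give the background task time to flush
    await c.close()
    return len(c.written)

written = asyncio.run(demo())
print(written)
```

Cancelling the task before the final flush, as `close()` does here, avoids a race between the shutdown flush and a concurrently scheduled periodic flush.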
- property batch_size: int¶
Return the configured batch size.
- Returns:
The maximum number of events to buffer before auto-flush.
- property flush_interval_seconds: float¶
Return the configured flush interval.
- Returns:
The time in seconds between automatic flushes.
- async record(event)[source]¶
Record a flag evaluation event.
Buffers the event in memory. If the buffer reaches batch_size, automatically triggers a flush to the database.
Thread-safe method using asyncio.Lock.
- Parameters:
event (FlagEvaluationEvent) – The evaluation event to record.
- Raises:
RuntimeError – If the collector has been closed.
- Return type:
None
- async flush()[source]¶
Flush buffered events to the database.
Writes all buffered events to the database in a single transaction. This method is thread-safe and can be called concurrently with record().
If there are no buffered events, this is a no-op.
- Return type:
None
- async close()[source]¶
Close the collector and release resources.
Flushes any remaining buffered events, cancels the background flush task, and disposes of the database engine connection pool.
This method is idempotent - calling it multiple times is safe.
- Return type:
None
Aggregator¶
AnalyticsAggregator¶
- class litestar_flags.analytics.AnalyticsAggregator[source]¶
Bases:
object
Aggregator for computing metrics from feature flag evaluation events.
Supports multiple event sources including in-memory collectors and database sessions. Provides methods for computing various metrics including evaluation rates, unique users, distributions, and latencies.
The aggregator uses window-based aggregation, only considering events within the specified time window for each metric calculation.
- source¶
The event source (InMemoryAnalyticsCollector or AsyncSession).
Example
>>> from litestar_flags.analytics import InMemoryAnalyticsCollector
>>> collector = InMemoryAnalyticsCollector()
>>> aggregator = AnalyticsAggregator(collector)
>>> rate = await aggregator.get_evaluation_rate("my_flag", window_seconds=60)
>>> metrics = await aggregator.get_flag_metrics("my_flag")
- Parameters:
source (InMemoryAnalyticsCollector | AsyncSession)
- __init__(source)[source]¶
Initialize the analytics aggregator.
- Parameters:
source (InMemoryAnalyticsCollector | AsyncSession) – The event source to aggregate from. Can be an InMemoryAnalyticsCollector for in-memory events or an AsyncSession for database-backed events.
- Return type:
None
- async get_evaluation_rate(flag_key, window_seconds=60)[source]¶
Calculate the evaluation rate for a flag.
- async get_unique_users(flag_key, window_seconds=3600)[source]¶
Count unique targeting keys for a flag.
- async get_variant_distribution(flag_key, window_seconds=3600)[source]¶
Get the distribution of variants for a flag.
- async get_reason_distribution(flag_key, window_seconds=3600)[source]¶
Get the distribution of evaluation reasons for a flag.
- async get_latency_percentiles(flag_key, percentiles=None)[source]¶
Calculate latency percentiles for a flag.
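One common way to compute such percentiles from raw durations is the nearest-rank method; the sketch below illustrates the idea and is not necessarily the library's exact algorithm:

```python
def percentile(samples: list[float], pct: float) -> float:
    """Nearest-rank percentile: smallest value with at least pct% of samples at or below it."""
    if not samples:
        return 0.0
    ordered = sorted(samples)
    rank = max(1, round(pct / 100 * len(ordered)))  # 1-based nearest rank
    return ordered[rank - 1]

# Evaluation durations in milliseconds.
durations = [1.0, 1.2, 1.1, 2.5, 1.3, 5.0, 1.2, 1.4, 1.1, 9.9]

p50 = percentile(durations, 50)
p90 = percentile(durations, 90)
p99 = percentile(durations, 99)
print(p50, p90, p99)  # 1.2 5.0 9.9
```

Note how p99 is dominated by the single slowest evaluation, which is exactly why tail percentiles are more informative than averages for latency monitoring.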
Exporters¶
PrometheusExporter¶
- class litestar_flags.analytics.PrometheusExporter[source]¶
Bases:
object
Prometheus metrics exporter for feature flag evaluations.
Exposes feature flag metrics in Prometheus format for monitoring and alerting. This exporter implements the AnalyticsCollector protocol, allowing it to receive evaluation events directly.
- Metrics exported:
- feature_flag_evaluations_total: Counter of flag evaluations
Labels: flag_key, reason, variant
- feature_flag_evaluation_duration_seconds: Histogram of evaluation times
Labels: flag_key
- feature_flag_unique_users: Gauge of unique users per flag
Labels: flag_key
- feature_flag_error_rate: Gauge of error rate per flag
Labels: flag_key
- registry¶
The Prometheus registry to use for metrics.
- prefix¶
Optional prefix for metric names.
Example
>>> exporter = PrometheusExporter()
>>> await exporter.record(evaluation_event)
>>> # Metrics are automatically updated
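Labeled counters keep one independent count per unique (flag_key, reason, variant) combination. That behavior can be modeled without prometheus_client by keying counts on label tuples; this is a conceptual model only, since the real exporter uses prometheus_client metric objects:

```python
from collections import Counter

# Conceptual model of the feature_flag_evaluations_total counter:
# one monotonically increasing count per unique label combination.
evaluations_total: Counter[tuple[str, str, str]] = Counter()

def record(flag_key: str, reason: str, variant: str = "") -> None:
    evaluations_total[(flag_key, reason, variant)] += 1

record("new_checkout", "TARGETING_MATCH", "beta")
record("new_checkout", "TARGETING_MATCH", "beta")
record("new_checkout", "DEFAULT", "")

print(evaluations_total[("new_checkout", "TARGETING_MATCH", "beta")])
print(evaluations_total[("new_checkout", "DEFAULT", "")])
```

Each distinct label combination becomes its own time series in Prometheus, which is why high-cardinality labels such as user IDs are kept out of the label set.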
- __init__(registry=None, prefix='', duration_buckets=(0.0001, 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0))[source]¶
Initialize the Prometheus exporter.
- Parameters:
registry (CollectorRegistry | None) – Custom Prometheus registry. If not provided, uses the default global registry.
prefix (str) – Optional prefix for metric names (e.g., “myapp” -> “myapp_feature_flag_*”).
duration_buckets (tuple[float, ...]) – Custom histogram buckets for duration measurements in seconds.
- Raises:
ImportError – If prometheus_client is not installed.
- Return type:
None
- property registry: CollectorRegistry¶
Get the Prometheus registry.
- Returns:
The Prometheus registry used by this exporter.
- property evaluations_counter: Counter¶
Get the evaluations counter metric.
- Returns:
The Counter metric for flag evaluations.
- property duration_histogram: Histogram¶
Get the duration histogram metric.
- Returns:
The Histogram metric for evaluation durations.
- property unique_users_gauge: Gauge¶
Get the unique users gauge metric.
- Returns:
The Gauge metric for unique users.
- property error_rate_gauge: Gauge¶
Get the error rate gauge metric.
- Returns:
The Gauge metric for error rates.
- async record(event)[source]¶
Record a single analytics event.
Updates all Prometheus metrics based on the evaluation event. This method implements the AnalyticsCollector protocol.
- Parameters:
event (FlagEvaluationEvent) – The analytics event to record.
- Return type:
None
- async record_batch(events)[source]¶
Record multiple analytics events in a batch.
- Parameters:
events (list[FlagEvaluationEvent]) – List of analytics events to record.
- Return type:
None
- async flush()[source]¶
Flush any buffered data.
For Prometheus, metrics are updated immediately, so this is a no-op. Provided for AnalyticsCollector protocol compliance.
- Return type:
None
- async close()[source]¶
Close the exporter and clean up resources.
Clears internal tracking state but does not unregister metrics from the Prometheus registry.
- Return type:
None
- update_from_metrics(flag_key, metrics)[source]¶
Update gauge metrics from a metrics object or dictionary.
This method syncs Prometheus gauges with pre-aggregated statistics from a FlagMetrics object or a compatible dictionary.
- Parameters:
- Return type:
Example
>>> from litestar_flags.analytics.aggregator import AnalyticsAggregator
>>> aggregator = AnalyticsAggregator(collector)
>>> metrics = await aggregator.get_flag_metrics("feature_a")
>>> exporter.update_from_metrics("feature_a", metrics)
- async update_from_aggregator(aggregator, flag_keys, window_seconds=3600)[source]¶
Update gauge metrics from an analytics aggregator.
This method is useful for syncing Prometheus gauges with pre-aggregated statistics from an AnalyticsAggregator.
- Parameters:
- Return type:
Example
>>> from litestar_flags.analytics.aggregator import AnalyticsAggregator
>>> aggregator = AnalyticsAggregator(collector)
>>> await exporter.update_from_aggregator(
...     aggregator=aggregator,
...     flag_keys=["feature_a", "feature_b"],
... )
OTelAnalyticsExporter¶
- class litestar_flags.analytics.exporters.otel.OTelAnalyticsExporter[source]¶
Bases:
object
OpenTelemetry exporter for feature flag analytics.
Exports feature flag evaluation events as OpenTelemetry spans and metrics. Implements the AnalyticsCollector protocol for seamless integration with the analytics pipeline.
This exporter can optionally wrap an existing OTelHook instance to share tracer and meter configurations, or create its own OpenTelemetry instruments.
- Metrics exported:
- feature_flag.analytics.events_recorded: Counter of recorded analytics events
Labels: flag_key, reason
- feature_flag.analytics.batch_size: Histogram of batch sizes when flushing
Labels: (none)
- tracer¶
The OpenTelemetry tracer for creating spans.
- meter¶
The OpenTelemetry meter for recording metrics.
- batch_size¶
Maximum number of events to buffer before auto-flush.
- flush_interval¶
Time in seconds between automatic flushes.
Example
>>> exporter = OTelAnalyticsExporter(batch_size=50)
>>> await exporter.record(evaluation_event)
>>> # Events are batched and flushed automatically
>>> await exporter.close()
- Parameters:
- __init__(tracer=None, meter=None, otel_hook=None, tracer_name='litestar_flags.analytics', meter_name='litestar_flags.analytics', batch_size=100, flush_interval=30.0, record_values=False, create_spans=True)[source]¶
Initialize the OpenTelemetry analytics exporter.
- Parameters:
tracer (Tracer | None) – Custom tracer instance. If not provided, uses otel_hook’s tracer or creates one using tracer_name.
meter (Meter | None) – Custom meter instance. If not provided, uses otel_hook’s meter or creates one using meter_name.
otel_hook (OTelHook | None) – Existing OTelHook instance to share tracer/meter from. If provided, tracer and meter arguments are ignored.
tracer_name (str) – Name for the default tracer if none provided.
meter_name (str) – Name for the default meter if none provided.
batch_size (int) – Maximum number of events to buffer before auto-flush. Set to 0 to disable batching.
flush_interval (float) – Time in seconds between automatic flushes. Set to 0 to disable automatic flushing.
record_values (bool) – Whether to record flag values in spans. Disabled by default for privacy/security reasons.
create_spans (bool) – Whether to create spans for each event. Set to False to only record metrics without span overhead.
- Raises:
ImportError – If opentelemetry-api is not installed.
- Return type:
None
- property tracer: Tracer¶
Get the tracer instance.
- Returns:
The OpenTelemetry tracer used by this exporter.
- property meter: Meter¶
Get the meter instance.
- Returns:
The OpenTelemetry meter used by this exporter.
- property events_recorded_counter: Counter¶
Get the events recorded counter metric.
- Returns:
The Counter metric for recorded analytics events.
- property batch_size_histogram: Histogram¶
Get the batch size histogram metric.
- Returns:
The Histogram metric for batch sizes.
- property buffer_size: int¶
Get the current number of buffered events.
- Returns:
Number of events currently in the buffer.
- async record(event)[source]¶
Record a flag evaluation event.
Buffers the event and flushes when batch_size is reached. This method implements the AnalyticsCollector protocol.
- Parameters:
event (FlagEvaluationEvent) – The evaluation event to record.
- Return type:
None
- async flush()[source]¶
Flush any buffered events.
Forces immediate processing of all buffered events, creating spans and recording metrics for each event.
- Return type:
None
- async close()[source]¶
Close the exporter and release resources.
Flushes remaining events and cancels the background flush task.
- Return type:
None
- record_sync(event)[source]¶
Record an event synchronously without batching.
This method processes the event immediately without buffering. Useful for low-volume scenarios or when immediate recording is required.
- Parameters:
event (FlagEvaluationEvent) – The evaluation event to record.
- Return type:
None
- async record_batch(events)[source]¶
Record multiple events in a batch.
Adds all events to the buffer and triggers a flush.
- Parameters:
events (list[FlagEvaluationEvent]) – List of evaluation events to record.
- Return type:
None
Helper Functions¶
- litestar_flags.analytics.exporters.otel.create_exporter_from_hook(otel_hook, batch_size=100, flush_interval=30.0, record_values=False, create_spans=True)[source]¶
Create an OTelAnalyticsExporter from an existing OTelHook.
This factory function creates an analytics exporter that shares the tracer and meter from an existing OTelHook instance, ensuring consistent instrumentation across flag evaluation and analytics.
- Parameters:
otel_hook (OTelHook) – The OTelHook instance to share tracer/meter from.
batch_size (int) – Maximum number of events to buffer before auto-flush.
flush_interval (float) – Time in seconds between automatic flushes.
record_values (bool) – Whether to record flag values in spans.
create_spans (bool) – Whether to create spans for each event.
- Return type:
OTelAnalyticsExporter
- Returns:
A configured OTelAnalyticsExporter instance.
- Raises:
ImportError – If opentelemetry-api is not installed.
Example
>>> from litestar_flags.contrib.otel import OTelHook
>>> hook = OTelHook()
>>> exporter = create_exporter_from_hook(hook)
>>> await exporter.record(event)
Database Models¶
AnalyticsEventModel¶
- class litestar_flags.analytics.models.AnalyticsEventModel[source]¶
Bases:
UUIDv7AuditBase
SQLAlchemy model for analytics events.
Stores feature flag evaluation events for analysis and reporting. Designed for high-volume writes with appropriate indexes for common queries.
- timestamp¶
When the evaluation occurred.
- flag_key¶
The key of the evaluated flag.
- value¶
The evaluated value stored as JSON.
- reason¶
Why this value was returned.
- variant¶
The variant key if applicable.
- targeting_key¶
The key used for targeting (e.g., user ID).
- context_attributes¶
Additional context attributes as JSON.
- evaluation_duration_ms¶
Evaluation time in milliseconds.
- timestamp: Mapped[datetime]¶
- flag_key: Mapped[str]¶
- reason: Mapped[str]¶
- id: UUID¶
UUID Primary key column.
- value: Mapped[dict[str, Any] | None]¶
- variant: Mapped[str | None]¶
- targeting_key: Mapped[str | None]¶
- context_attributes: Mapped[dict[str, Any]]¶
- evaluation_duration_ms: Mapped[float | None]¶
- created_at: datetime | None¶
Date/time of instance creation.
- updated_at: datetime | None¶
Date/time of instance last update.
- __init__(**kwargs)¶
A simple constructor that allows initialization from kwargs.
Sets attributes on the constructed instance using the names and values in kwargs.
Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.
Constants¶
Availability Flags¶
- litestar_flags.analytics.PROMETHEUS_AVAILABLE¶
Boolean indicating if prometheus_client is installed and available.
- litestar_flags.analytics.exporters.otel.OTEL_AVAILABLE¶
Boolean indicating if opentelemetry-api is installed and available.
See Also¶
Flag Analytics - User guide with examples
Types and Enums -
EvaluationReason enum and other types
Models - Flag and rule models