Analytics

The analytics module provides comprehensive tracking and analysis of feature flag evaluations. It includes event models, collectors for storing events, aggregators for computing metrics, and exporters for monitoring systems.

Overview

The analytics system follows a three-layer architecture:

  1. Events: FlagEvaluationEvent captures each flag evaluation

  2. Collectors: Store events (in-memory, database, or custom backends)

  3. Aggregators: Compute metrics from collected events

  4. Exporters: Push metrics to external systems (Prometheus, OpenTelemetry)

For usage guides and examples, see Flag Analytics.

Core API

FlagEvaluationEvent

class litestar_flags.analytics.FlagEvaluationEvent[source]

Bases: object

Event capturing a single feature flag evaluation.

Records detailed information about each flag evaluation for analytics, debugging, and monitoring purposes. This model follows the OpenFeature specification patterns for evaluation telemetry.

timestamp

When the evaluation occurred (UTC).

flag_key

The key of the evaluated flag.

value

The evaluated flag value (any type).

reason

The reason for the evaluation result.

variant

The variant key if a variant was selected.

targeting_key

The targeting key used for evaluation (e.g., user ID).

context_attributes

Additional context attributes used in evaluation.

evaluation_duration_ms

Time taken to evaluate the flag in milliseconds.

Example

>>> from datetime import datetime, UTC
>>> event = FlagEvaluationEvent(
...     timestamp=datetime.now(UTC),
...     flag_key="new_checkout",
...     value=True,
...     reason=EvaluationReason.TARGETING_MATCH,
...     variant="beta_users",
...     targeting_key="user-123",
...     context_attributes={"plan": "premium"},
...     evaluation_duration_ms=1.5,
... )
>>> event.flag_key
'new_checkout'
Parameters:
__init__(timestamp, flag_key, value, reason, variant=None, targeting_key=None, context_attributes=<factory>, evaluation_duration_ms=0.0)
Parameters:
Return type:

None

timestamp: datetime
flag_key: str
value: Any
reason: EvaluationReason
variant: str | None
targeting_key: str | None
context_attributes: dict[str, Any]
evaluation_duration_ms: float
to_dict()[source]

Convert to dictionary representation.

Return type:

dict[str, Any]

Returns:

Dictionary representation of the evaluation event.

FlagMetrics

class litestar_flags.analytics.FlagMetrics[source]

Bases: object

Aggregated metrics for a feature flag.

Contains computed statistics about flag evaluations including evaluation rate, unique users, distributions, and latency percentiles.

evaluation_rate

Evaluations per second in the measurement window.

unique_users

Count of unique targeting keys in the window.

variant_distribution

Count of evaluations per variant.

reason_distribution

Count of evaluations per reason.

error_rate

Percentage of evaluations that resulted in errors (0-100).

latency_p50

50th percentile latency in milliseconds.

latency_p90

90th percentile latency in milliseconds.

latency_p99

99th percentile latency in milliseconds.

total_evaluations

Total number of evaluations in the window.

window_start

Start of the measurement window.

window_end

End of the measurement window.

Example

>>> metrics = FlagMetrics(
...     evaluation_rate=10.5,
...     unique_users=150,
...     variant_distribution={"control": 75, "treatment": 75},
...     reason_distribution={"SPLIT": 150},
...     error_rate=0.0,
...     latency_p50=1.2,
...     latency_p90=2.5,
...     latency_p99=5.0,
... )
Parameters:
evaluation_rate: float
unique_users: int
variant_distribution: dict[str, int]
reason_distribution: dict[str, int]
error_rate: float
latency_p50: float
latency_p90: float
latency_p99: float
total_evaluations: int
window_start: datetime | None
window_end: datetime | None
to_dict()[source]

Convert metrics to dictionary representation.

Return type:

dict[str, Any]

Returns:

Dictionary representation of the metrics.

__init__(evaluation_rate=0.0, unique_users=0, variant_distribution=<factory>, reason_distribution=<factory>, error_rate=0.0, latency_p50=0.0, latency_p90=0.0, latency_p99=0.0, total_evaluations=0, window_start=None, window_end=None)
Parameters:
Return type:

None

AnalyticsCollector Protocol

class litestar_flags.analytics.AnalyticsCollector[source]

Bases: Protocol

Protocol for analytics collectors.

All analytics collector implementations must implement this protocol. Methods are async to support both sync and async backends.

Implementations:
  • InMemoryAnalyticsCollector: In-memory storage for development/testing

  • (Future) DatadogAnalyticsCollector: Datadog integration

  • (Future) PrometheusAnalyticsCollector: Prometheus metrics

Example

>>> class MyCollector:
...     async def record(self, event: FlagEvaluationEvent) -> None:
...         # Store or process the event
...         pass
...
...     async def flush(self) -> None:
...         # Flush any buffered events
...         pass
...
...     async def close(self) -> None:
...         # Clean up resources
...         pass
>>> isinstance(MyCollector(), AnalyticsCollector)
True
async record(event)[source]

Record a flag evaluation event.

This method should be fast and non-blocking. Implementations may buffer events and flush them asynchronously to avoid impacting flag evaluation latency.

Parameters:

event (FlagEvaluationEvent) – The evaluation event to record.

Return type:

None

async flush()[source]

Flush any buffered events.

Forces immediate processing of any buffered events. This is useful for graceful shutdown or when events need to be persisted immediately.

Return type:

None

async close()[source]

Close the collector and release any resources.

This method should flush any remaining events and clean up resources such as connections or background tasks.

Return type:

None

__init__(*args, **kwargs)

Collectors

InMemoryAnalyticsCollector

class litestar_flags.analytics.InMemoryAnalyticsCollector[source]

Bases: object

In-memory analytics collector for development and testing.

This collector stores evaluation events in memory with a configurable maximum size. When the maximum size is reached, oldest events are discarded to make room for new ones.

Thread-safe implementation using asyncio.Lock for concurrent access.

max_size

Maximum number of events to store.

Example

>>> from datetime import datetime, UTC
>>> from litestar_flags.analytics import FlagEvaluationEvent, InMemoryAnalyticsCollector
>>> from litestar_flags.types import EvaluationReason
>>> collector = InMemoryAnalyticsCollector(max_size=1000)
>>> event = FlagEvaluationEvent(
...     timestamp=datetime.now(UTC),
...     flag_key="my_flag",
...     value=True,
...     reason=EvaluationReason.STATIC,
... )
>>> await collector.record(event)
>>> events = await collector.get_events()
>>> len(events)
1
Parameters:

max_size (int)

__init__(max_size=10000)[source]

Initialize the in-memory analytics collector.

Parameters:

max_size (int) – Maximum number of events to store. Defaults to 10000. When exceeded, oldest events are discarded.

Return type:

None

property max_size: int

Return the maximum number of events to store.

Returns:

The configured maximum size.

async record(event)[source]

Record a flag evaluation event.

Thread-safe method that stores the event in memory. If the maximum size is exceeded, the oldest event is removed.

Parameters:

event (FlagEvaluationEvent) – The evaluation event to record.

Return type:

None

async flush()[source]

Flush buffered events.

For the in-memory collector, this is a no-op since events are stored immediately. Provided for protocol compliance.

Return type:

None

async close()[source]

Close the collector and clear all stored events.

Releases the stored events from memory.

Return type:

None

async get_events(flag_key=None, limit=None)[source]

Retrieve stored evaluation events.

This method is primarily intended for testing and debugging. It allows filtering and limiting the returned events.

Parameters:
  • flag_key (str | None) – If provided, only return events for this flag key.

  • limit (int | None) – If provided, return at most this many events (most recent first).

Return type:

list[FlagEvaluationEvent]

Returns:

List of stored evaluation events matching the criteria.

async get_event_count(flag_key=None)[source]

Get the count of stored events.

Parameters:

flag_key (str | None) – If provided, only count events for this flag key.

Return type:

int

Returns:

The number of stored events matching the criteria.

async clear()[source]

Clear all stored events.

Removes all events from memory without closing the collector.

Return type:

None

DatabaseAnalyticsCollector

class litestar_flags.analytics.collectors.database.DatabaseAnalyticsCollector[source]

Bases: object

Database analytics collector with batch writes.

Buffers evaluation events in memory and periodically flushes them to the database in batches for optimal write performance. Uses SQLAlchemy async sessions for non-blocking database operations.

Features:
  • Configurable batch size and flush interval

  • Background task for periodic flushing

  • Thread-safe event buffering with asyncio.Lock

  • Automatic flush on close for data integrity

  • Proper cleanup of database connections

batch_size

Maximum events to buffer before auto-flush.

flush_interval_seconds

Time between automatic flushes.

Example

>>> collector = await DatabaseAnalyticsCollector.create(
...     connection_string="postgresql+asyncpg://user:pass@localhost/db",
...     batch_size=100,
...     flush_interval_seconds=5.0,
... )
>>> try:
...     await collector.record(event)
... finally:
...     await collector.close()
Parameters:
__init__(engine, session_maker, batch_size=100, flush_interval_seconds=5.0)[source]

Initialize the database analytics collector.

Parameters:
  • engine (AsyncEngine) – The SQLAlchemy async engine.

  • session_maker (async_sessionmaker[AsyncSession]) – The session maker factory.

  • batch_size (int) – Maximum events to buffer before auto-flush. Defaults to 100.

  • flush_interval_seconds (float) – Seconds between automatic flushes. Defaults to 5.0.

Return type:

None

async classmethod create(connection_string, batch_size=100, flush_interval_seconds=5.0, create_tables=True, **engine_kwargs)[source]

Create a new database analytics collector.

Factory method that sets up the database connection and optionally creates the analytics_events table.

Parameters:
  • connection_string (str) – Database connection string (SQLAlchemy format).

  • batch_size (int) – Maximum events to buffer before auto-flush. Defaults to 100.

  • flush_interval_seconds (float) – Seconds between automatic flushes. Defaults to 5.0.

  • create_tables (bool) – Whether to create tables on startup. Defaults to True.

  • **engine_kwargs (Any) – Additional arguments for create_async_engine.

Return type:

DatabaseAnalyticsCollector

Returns:

Configured DatabaseAnalyticsCollector instance with background flush task running.

Example

>>> collector = await DatabaseAnalyticsCollector.create(
...     connection_string="sqlite+aiosqlite:///analytics.db",
...     batch_size=50,
...     flush_interval_seconds=10.0,
... )
property batch_size: int

Return the configured batch size.

Returns:

The maximum number of events to buffer before auto-flush.

property flush_interval_seconds: float

Return the configured flush interval.

Returns:

The time in seconds between automatic flushes.

async record(event)[source]

Record a flag evaluation event.

Buffers the event in memory. If the buffer reaches batch_size, automatically triggers a flush to the database.

Thread-safe method using asyncio.Lock.

Parameters:

event (FlagEvaluationEvent) – The evaluation event to record.

Raises:

RuntimeError – If the collector has been closed.

Return type:

None

async flush()[source]

Flush buffered events to the database.

Writes all buffered events to the database in a single transaction. This method is thread-safe and can be called concurrently with record().

If there are no buffered events, this is a no-op.

Return type:

None

async close()[source]

Close the collector and release resources.

Flushes any remaining buffered events, cancels the background flush task, and disposes of the database engine connection pool.

This method is idempotent - calling it multiple times is safe.

Return type:

None

async get_buffer_size()[source]

Get the current number of buffered events.

Return type:

int

Returns:

The number of events currently in the buffer.

async health_check()[source]

Check if the database connection is healthy.

Executes a simple query to verify connectivity.

Return type:

bool

Returns:

True if the database is reachable, False otherwise.

Aggregator

AnalyticsAggregator

class litestar_flags.analytics.AnalyticsAggregator[source]

Bases: object

Aggregator for computing metrics from feature flag evaluation events.

Supports multiple event sources including in-memory collectors and database sessions. Provides methods for computing various metrics including evaluation rates, unique users, distributions, and latencies.

The aggregator uses window-based aggregation, only considering events within the specified time window for each metric calculation.

source

The event source (InMemoryAnalyticsCollector or AsyncSession).

Example

>>> from litestar_flags.analytics import InMemoryAnalyticsCollector
>>> collector = InMemoryAnalyticsCollector()
>>> aggregator = AnalyticsAggregator(collector)
>>> rate = await aggregator.get_evaluation_rate("my_flag", window_seconds=60)
>>> metrics = await aggregator.get_flag_metrics("my_flag")
Parameters:

source (InMemoryAnalyticsCollector | AsyncSession)

__init__(source)[source]

Initialize the analytics aggregator.

Parameters:

source (InMemoryAnalyticsCollector | AsyncSession) – The event source to aggregate from. Can be an InMemoryAnalyticsCollector for in-memory events or an AsyncSession for database-backed events.

Return type:

None

async get_evaluation_rate(flag_key, window_seconds=60)[source]

Calculate the evaluation rate for a flag.

Parameters:
  • flag_key (str) – The key of the flag to measure.

  • window_seconds (int) – The time window in seconds (default: 60).

Return type:

float

Returns:

Evaluations per second within the time window.

async get_unique_users(flag_key, window_seconds=3600)[source]

Count unique targeting keys for a flag.

Parameters:
  • flag_key (str) – The key of the flag to measure.

  • window_seconds (int) – The time window in seconds (default: 3600).

Return type:

int

Returns:

Count of unique targeting keys within the time window.

async get_variant_distribution(flag_key, window_seconds=3600)[source]

Get the distribution of variants for a flag.

Parameters:
  • flag_key (str) – The key of the flag to measure.

  • window_seconds (int) – The time window in seconds (default: 3600).

Return type:

dict[str, int]

Returns:

Dictionary mapping variant names to evaluation counts.

async get_reason_distribution(flag_key, window_seconds=3600)[source]

Get the distribution of evaluation reasons for a flag.

Parameters:
  • flag_key (str) – The key of the flag to measure.

  • window_seconds (int) – The time window in seconds (default: 3600).

Return type:

dict[str, int]

Returns:

Dictionary mapping reason strings to evaluation counts.

async get_error_rate(flag_key, window_seconds=3600)[source]

Calculate the error rate for a flag.

Parameters:
  • flag_key (str) – The key of the flag to measure.

  • window_seconds (int) – The time window in seconds (default: 3600).

Return type:

float

Returns:

Percentage of evaluations that resulted in errors (0-100).

async get_latency_percentiles(flag_key, percentiles=None)[source]

Calculate latency percentiles for a flag.

Parameters:
  • flag_key (str) – The key of the flag to measure.

  • percentiles (list[float] | None) – List of percentiles to calculate (default: [50, 90, 99]).

Return type:

dict[float, float]

Returns:

Dictionary mapping percentile values to latencies in milliseconds.

async get_flag_metrics(flag_key, window_seconds=3600)[source]

Get all aggregated metrics for a flag.

This is a convenience method that computes all available metrics in a single call.

Parameters:
  • flag_key (str) – The key of the flag to measure.

  • window_seconds (int) – The time window in seconds (default: 3600).

Return type:

FlagMetrics

Returns:

FlagMetrics object containing all computed metrics.

Exporters

PrometheusExporter

class litestar_flags.analytics.PrometheusExporter[source]

Bases: object

Prometheus metrics exporter for feature flag evaluations.

Exposes feature flag metrics in Prometheus format for monitoring and alerting. This exporter implements the AnalyticsCollector protocol, allowing it to receive evaluation events directly.

Metrics exported:
  • feature_flag_evaluations_total: Counter of flag evaluations

    Labels: flag_key, reason, variant

  • feature_flag_evaluation_duration_seconds: Histogram of evaluation times

    Labels: flag_key

  • feature_flag_unique_users: Gauge of unique users per flag

    Labels: flag_key

  • feature_flag_error_rate: Gauge of error rate per flag

    Labels: flag_key

registry

The Prometheus registry to use for metrics.

prefix

Optional prefix for metric names.

Example

>>> exporter = PrometheusExporter()
>>> await exporter.record(evaluation_event)
>>> # Metrics are automatically updated
Parameters:
__init__(registry=None, prefix='', duration_buckets=(0.0001, 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0))[source]

Initialize the Prometheus exporter.

Parameters:
  • registry (CollectorRegistry | None) – Custom Prometheus registry. If not provided, uses the default global registry.

  • prefix (str) – Optional prefix for metric names (e.g., “myapp” -> “myapp_feature_flag_*”).

  • duration_buckets (tuple[float, ...]) – Custom histogram buckets for duration measurements in seconds.

Raises:

ImportError – If prometheus_client is not installed.

Return type:

None

property registry: CollectorRegistry

Get the Prometheus registry.

Returns:

The Prometheus registry used by this exporter.

property evaluations_counter: Counter

Get the evaluations counter metric.

Returns:

The Counter metric for flag evaluations.

property duration_histogram: Histogram

Get the duration histogram metric.

Returns:

The Histogram metric for evaluation durations.

property unique_users_gauge: Gauge

Get the unique users gauge metric.

Returns:

The Gauge metric for unique users.

property error_rate_gauge: Gauge

Get the error rate gauge metric.

Returns:

The Gauge metric for error rates.

async record(event)[source]

Record a single analytics event.

Updates all Prometheus metrics based on the evaluation event. This method implements the AnalyticsCollector protocol.

Parameters:

event (FlagEvaluationEvent) – The analytics event to record.

Return type:

None

async record_batch(events)[source]

Record multiple analytics events in a batch.

Parameters:

events (list[FlagEvaluationEvent]) – List of analytics events to record.

Return type:

None

async flush()[source]

Flush any buffered data.

For Prometheus, metrics are updated immediately, so this is a no-op. Provided for AnalyticsCollector protocol compliance.

Return type:

None

async close()[source]

Close the exporter and clean up resources.

Clears internal tracking state but does not unregister metrics from the Prometheus registry.

Return type:

None

update_from_metrics(flag_key, metrics)[source]

Update gauge metrics from a metrics object or dictionary.

This method syncs Prometheus gauges with pre-aggregated statistics from a FlagMetrics object or a compatible dictionary.

Parameters:
  • flag_key (str) – The flag key to update metrics for.

  • metrics (MetricsProvider | dict[str, Any]) – A MetricsProvider (like FlagMetrics) or a dictionary with keys: unique_users, error_rate (0-100), total_evaluations.

Return type:

None

Example

>>> from litestar_flags.analytics.aggregator import AnalyticsAggregator
>>> aggregator = AnalyticsAggregator(collector)
>>> metrics = aggregator.get_flag_metrics("feature_a")
>>> exporter.update_from_metrics("feature_a", metrics)
async update_from_aggregator(aggregator, flag_keys, window_seconds=3600)[source]

Update gauge metrics from an analytics aggregator.

This method is useful for syncing Prometheus gauges with pre-aggregated statistics from an AnalyticsAggregator.

Parameters:
  • aggregator (Any) – An AnalyticsAggregator instance with get_flag_metrics or get_flag_metrics_async method.

  • flag_keys (Sequence[str]) – List of flag keys to update metrics for.

  • window_seconds (int) – Time window for metric aggregation (default: 3600).

Return type:

None

Example

>>> from litestar_flags.analytics.aggregator import AnalyticsAggregator
>>> aggregator = AnalyticsAggregator(collector)
>>> await exporter.update_from_aggregator(
...     aggregator=aggregator,
...     flag_keys=["feature_a", "feature_b"],
... )
get_tracked_flag_keys()[source]

Get list of flag keys that have been tracked.

Return type:

list[str]

Returns:

List of flag keys that have recorded evaluations.

async reset_flag_stats(flag_key)[source]

Reset internal tracking stats for a specific flag.

This clears the unique users set and error tracking for the flag. Note: This does not reset Prometheus metrics themselves.

Parameters:

flag_key (str) – The flag key to reset stats for.

Return type:

None

async reset_all_stats()[source]

Reset all internal tracking stats.

Clears all unique user sets and error tracking. Note: This does not reset Prometheus metrics themselves.

Return type:

None

OTelAnalyticsExporter

class litestar_flags.analytics.exporters.otel.OTelAnalyticsExporter[source]

Bases: object

OpenTelemetry exporter for feature flag analytics.

Exports feature flag evaluation events as OpenTelemetry spans and metrics. Implements the AnalyticsCollector protocol for seamless integration with the analytics pipeline.

This exporter can optionally wrap an existing OTelHook instance to share tracer and meter configurations, or create its own OpenTelemetry instruments.

Metrics exported:
  • feature_flag.analytics.events_recorded: Counter of recorded analytics events

    Labels: flag_key, reason

  • feature_flag.analytics.batch_size: Histogram of batch sizes when flushing

    Labels: (none)

tracer

The OpenTelemetry tracer for creating spans.

meter

The OpenTelemetry meter for recording metrics.

batch_size

Maximum number of events to buffer before auto-flush.

flush_interval

Time in seconds between automatic flushes.

Example

>>> exporter = OTelAnalyticsExporter(batch_size=50)
>>> await exporter.record(evaluation_event)
>>> # Events are batched and flushed automatically
>>> await exporter.close()
Parameters:
  • tracer (Tracer | None)

  • meter (Meter | None)

  • otel_hook (OTelHook | None)

  • tracer_name (str)

  • meter_name (str)

  • batch_size (int)

  • flush_interval (float)

  • record_values (bool)

  • create_spans (bool)

__init__(tracer=None, meter=None, otel_hook=None, tracer_name='litestar_flags.analytics', meter_name='litestar_flags.analytics', batch_size=100, flush_interval=30.0, record_values=False, create_spans=True)[source]

Initialize the OpenTelemetry analytics exporter.

Parameters:
  • tracer (Tracer | None) – Custom tracer instance. If not provided, uses otel_hook’s tracer or creates one using tracer_name.

  • meter (Meter | None) – Custom meter instance. If not provided, uses otel_hook’s meter or creates one using meter_name.

  • otel_hook (OTelHook | None) – Existing OTelHook instance to share tracer/meter from. If provided, tracer and meter arguments are ignored.

  • tracer_name (str) – Name for the default tracer if none provided.

  • meter_name (str) – Name for the default meter if none provided.

  • batch_size (int) – Maximum number of events to buffer before auto-flush. Set to 0 to disable batching.

  • flush_interval (float) – Time in seconds between automatic flushes. Set to 0 to disable automatic flushing.

  • record_values (bool) – Whether to record flag values in spans. Disabled by default for privacy/security reasons.

  • create_spans (bool) – Whether to create spans for each event. Set to False to only record metrics without span overhead.

Raises:

ImportError – If opentelemetry-api is not installed.

Return type:

None

property tracer: Tracer

Get the tracer instance.

Returns:

The OpenTelemetry tracer used by this exporter.

property meter: Meter

Get the meter instance.

Returns:

The OpenTelemetry meter used by this exporter.

property events_recorded_counter: Counter

Get the events recorded counter metric.

Returns:

The Counter metric for recorded analytics events.

property batch_size_histogram: Histogram

Get the batch size histogram metric.

Returns:

The Histogram metric for batch sizes.

property buffer_size: int

Get the current number of buffered events.

Returns:

Number of events currently in the buffer.

async record(event)[source]

Record a flag evaluation event.

Buffers the event and flushes when batch_size is reached. This method implements the AnalyticsCollector protocol.

Parameters:

event (FlagEvaluationEvent) – The evaluation event to record.

Return type:

None

async flush()[source]

Flush any buffered events.

Forces immediate processing of all buffered events, creating spans and recording metrics for each event.

Return type:

None

async close()[source]

Close the exporter and release resources.

Flushes remaining events and cancels the background flush task.

Return type:

None

record_sync(event)[source]

Record an event synchronously without batching.

This method processes the event immediately without buffering. Useful for low-volume scenarios or when immediate recording is required.

Parameters:

event (FlagEvaluationEvent) – The evaluation event to record.

Return type:

None

async record_batch(events)[source]

Record multiple events in a batch.

Adds all events to the buffer and triggers a flush.

Parameters:

events (list[FlagEvaluationEvent]) – List of evaluation events to record.

Return type:

None

Helper Functions

litestar_flags.analytics.exporters.otel.create_exporter_from_hook(otel_hook, batch_size=100, flush_interval=30.0, record_values=False, create_spans=True)[source]

Create an OTelAnalyticsExporter from an existing OTelHook.

This factory function creates an analytics exporter that shares the tracer and meter from an existing OTelHook instance, ensuring consistent instrumentation across flag evaluation and analytics.

Parameters:
  • otel_hook (OTelHook) – The OTelHook instance to share tracer/meter from.

  • batch_size (int) – Maximum number of events to buffer before auto-flush.

  • flush_interval (float) – Time in seconds between automatic flushes.

  • record_values (bool) – Whether to record flag values in spans.

  • create_spans (bool) – Whether to create spans for each event.

Return type:

OTelAnalyticsExporter

Returns:

A configured OTelAnalyticsExporter instance.

Raises:

ImportError – If opentelemetry-api is not installed.

Example

>>> from litestar_flags.contrib.otel import OTelHook
>>> hook = OTelHook()
>>> exporter = create_exporter_from_hook(hook)
>>> await exporter.record(event)

Database Models

AnalyticsEventModel

class litestar_flags.analytics.models.AnalyticsEventModel[source]

Bases: UUIDv7AuditBase

SQLAlchemy model for analytics events.

Stores feature flag evaluation events for analysis and reporting. Designed for high-volume writes with appropriate indexes for common queries.

timestamp

When the evaluation occurred.

flag_key

The key of the evaluated flag.

value

The evaluated value stored as JSON.

reason

Why this value was returned.

variant

The variant key if applicable.

targeting_key

The key used for targeting (e.g., user ID).

context_attributes

Additional context attributes as JSON.

evaluation_duration_ms

Evaluation time in milliseconds.

timestamp: Mapped[datetime]
flag_key: Mapped[str]
reason: Mapped[str]
id: UUID

UUID Primary key column.

value: Mapped[dict[str, Any] | None]
variant: Mapped[str | None]
targeting_key: Mapped[str | None]
context_attributes: Mapped[dict[str, Any]]
evaluation_duration_ms: Mapped[float | None]
created_at: datetime | None

Date/time of instance creation.

updated_at: datetime | None

Date/time of instance last update.

__init__(**kwargs)

A simple constructor that allows initialization from kwargs.

Sets attributes on the constructed instance using the names and values in kwargs.

Only keys that are present as attributes of the instance’s class are allowed. These could be, for example, any mapped columns or relationships.

Constants

Availability Flags

litestar_flags.analytics.PROMETHEUS_AVAILABLE

Boolean indicating if prometheus_client is installed and available.

litestar_flags.analytics.exporters.otel.OTEL_AVAILABLE

Boolean indicating if opentelemetry-api is installed and available.

See Also