Source code for litestar_flags.analytics.protocols

"""Protocols and interfaces for analytics collectors."""

from __future__ import annotations

from typing import TYPE_CHECKING, Protocol, runtime_checkable

if TYPE_CHECKING:
    from litestar_flags.analytics.models import FlagEvaluationEvent

__all__ = ["AnalyticsCollector"]


[docs] @runtime_checkable class AnalyticsCollector(Protocol): """Protocol for analytics collectors. All analytics collector implementations must implement this protocol. Methods are async to support both sync and async backends. Implementations: - InMemoryAnalyticsCollector: In-memory storage for development/testing - (Future) DatadogAnalyticsCollector: Datadog integration - (Future) PrometheusAnalyticsCollector: Prometheus metrics Example: >>> class MyCollector: ... async def record(self, event: FlagEvaluationEvent) -> None: ... # Store or process the event ... pass ... ... async def flush(self) -> None: ... # Flush any buffered events ... pass ... ... async def close(self) -> None: ... # Clean up resources ... pass >>> isinstance(MyCollector(), AnalyticsCollector) True """
[docs] async def record(self, event: FlagEvaluationEvent) -> None: """Record a flag evaluation event. This method should be fast and non-blocking. Implementations may buffer events and flush them asynchronously to avoid impacting flag evaluation latency. Args: event: The evaluation event to record. """ ...
[docs] async def flush(self) -> None: """Flush any buffered events. Forces immediate processing of any buffered events. This is useful for graceful shutdown or when events need to be persisted immediately. """ ...
[docs] async def close(self) -> None: """Close the collector and release any resources. This method should flush any remaining events and clean up resources such as connections or background tasks. """ ...