Source code for litestar_flags.analytics.collectors.database

"""Database analytics collector using SQLAlchemy async sessions."""

from __future__ import annotations

import asyncio
import logging
from typing import TYPE_CHECKING, Any

from litestar_flags.models.base import HAS_ADVANCED_ALCHEMY

if not HAS_ADVANCED_ALCHEMY:
    raise ImportError(
        "Database analytics collector requires 'advanced-alchemy'. Install with: pip install litestar-flags[database]"
    )

from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

from litestar_flags.analytics.models import AnalyticsEventModel

if TYPE_CHECKING:
    from sqlalchemy.ext.asyncio import AsyncEngine

    from litestar_flags.analytics.models import FlagEvaluationEvent

__all__ = ["DatabaseAnalyticsCollector"]

logger = logging.getLogger(__name__)


[docs] class DatabaseAnalyticsCollector: """Database analytics collector with batch writes. Buffers evaluation events in memory and periodically flushes them to the database in batches for optimal write performance. Uses SQLAlchemy async sessions for non-blocking database operations. Features: - Configurable batch size and flush interval - Background task for periodic flushing - Thread-safe event buffering with asyncio.Lock - Automatic flush on close for data integrity - Proper cleanup of database connections Attributes: batch_size: Maximum events to buffer before auto-flush. flush_interval_seconds: Time between automatic flushes. Example: >>> collector = await DatabaseAnalyticsCollector.create( ... connection_string="postgresql+asyncpg://user:pass@localhost/db", ... batch_size=100, ... flush_interval_seconds=5.0, ... ) >>> try: ... await collector.record(event) ... finally: ... await collector.close() """
[docs] def __init__( self, engine: AsyncEngine, session_maker: async_sessionmaker[AsyncSession], batch_size: int = 100, flush_interval_seconds: float = 5.0, ) -> None: """Initialize the database analytics collector. Args: engine: The SQLAlchemy async engine. session_maker: The session maker factory. batch_size: Maximum events to buffer before auto-flush. Defaults to 100. flush_interval_seconds: Seconds between automatic flushes. Defaults to 5.0. """ self._engine = engine self._session_maker = session_maker self._batch_size = batch_size self._flush_interval_seconds = flush_interval_seconds self._buffer: list[FlagEvaluationEvent] = [] self._lock = asyncio.Lock() self._flush_task: asyncio.Task[None] | None = None self._closed = False
[docs] @classmethod async def create( cls, connection_string: str, batch_size: int = 100, flush_interval_seconds: float = 5.0, create_tables: bool = True, **engine_kwargs: Any, ) -> DatabaseAnalyticsCollector: """Create a new database analytics collector. Factory method that sets up the database connection and optionally creates the analytics_events table. Args: connection_string: Database connection string (SQLAlchemy format). batch_size: Maximum events to buffer before auto-flush. Defaults to 100. flush_interval_seconds: Seconds between automatic flushes. Defaults to 5.0. create_tables: Whether to create tables on startup. Defaults to True. **engine_kwargs: Additional arguments for create_async_engine. Returns: Configured DatabaseAnalyticsCollector instance with background flush task running. Example: >>> collector = await DatabaseAnalyticsCollector.create( ... connection_string="sqlite+aiosqlite:///analytics.db", ... batch_size=50, ... flush_interval_seconds=10.0, ... ) """ engine = create_async_engine( connection_string, echo=engine_kwargs.pop("echo", False), **engine_kwargs, ) if create_tables: async with engine.begin() as conn: from advanced_alchemy.base import orm_registry # Import to register the model from litestar_flags.analytics.models import AnalyticsEventModel _ = AnalyticsEventModel # Ensure model is registered await conn.run_sync(orm_registry.metadata.create_all) session_maker = async_sessionmaker(engine, expire_on_commit=False) collector = cls( engine=engine, session_maker=session_maker, batch_size=batch_size, flush_interval_seconds=flush_interval_seconds, ) # Start the background flush task collector._start_flush_task() return collector
@property def batch_size(self) -> int: """Return the configured batch size. Returns: The maximum number of events to buffer before auto-flush. """ return self._batch_size @property def flush_interval_seconds(self) -> float: """Return the configured flush interval. Returns: The time in seconds between automatic flushes. """ return self._flush_interval_seconds def _start_flush_task(self) -> None: """Start the background flush task.""" if self._flush_task is None or self._flush_task.done(): self._flush_task = asyncio.create_task(self._periodic_flush()) async def _periodic_flush(self) -> None: """Periodically flush buffered events to the database. Runs in a background task and flushes events at the configured interval. Continues running until the collector is closed. """ while not self._closed: try: await asyncio.sleep(self._flush_interval_seconds) if not self._closed: await self.flush() except asyncio.CancelledError: # Task was cancelled, exit gracefully break except Exception: logger.exception("Error during periodic flush")
[docs] async def record(self, event: FlagEvaluationEvent) -> None: """Record a flag evaluation event. Buffers the event in memory. If the buffer reaches batch_size, automatically triggers a flush to the database. Thread-safe method using asyncio.Lock. Args: event: The evaluation event to record. Raises: RuntimeError: If the collector has been closed. """ if self._closed: raise RuntimeError("Cannot record to a closed collector") async with self._lock: self._buffer.append(event) should_flush = len(self._buffer) >= self._batch_size if should_flush: await self.flush()
[docs] async def flush(self) -> None: """Flush buffered events to the database. Writes all buffered events to the database in a single transaction. This method is thread-safe and can be called concurrently with record(). If there are no buffered events, this is a no-op. """ async with self._lock: if not self._buffer: return events_to_flush = self._buffer.copy() self._buffer.clear() if not events_to_flush: return try: async with self._session_maker() as session: models = [self._event_to_model(event) for event in events_to_flush] session.add_all(models) await session.commit() logger.debug("Flushed %d analytics events to database", len(models)) except Exception: # On failure, re-add events to buffer for retry async with self._lock: self._buffer = events_to_flush + self._buffer logger.exception("Failed to flush analytics events to database") raise
def _event_to_model(self, event: FlagEvaluationEvent) -> AnalyticsEventModel: """Convert a FlagEvaluationEvent to an AnalyticsEventModel. Args: event: The evaluation event to convert. Returns: SQLAlchemy model ready for database insertion. """ # Convert the value to a JSON-serializable format value: dict[str, Any] | None = None if event.value is not None: if isinstance(event.value, dict): value = event.value else: value = {"value": event.value} return AnalyticsEventModel( timestamp=event.timestamp, flag_key=event.flag_key, value=value, reason=event.reason.value if hasattr(event.reason, "value") else str(event.reason), variant=event.variant, targeting_key=event.targeting_key, context_attributes=event.context_attributes, evaluation_duration_ms=event.evaluation_duration_ms, )
[docs] async def close(self) -> None: """Close the collector and release resources. Flushes any remaining buffered events, cancels the background flush task, and disposes of the database engine connection pool. This method is idempotent - calling it multiple times is safe. """ if self._closed: return self._closed = True # Cancel the periodic flush task if self._flush_task is not None and not self._flush_task.done(): self._flush_task.cancel() try: await self._flush_task except asyncio.CancelledError: pass # Flush any remaining events try: # Temporarily unset closed flag to allow flush self._closed = False async with self._lock: if self._buffer: events_to_flush = self._buffer.copy() self._buffer.clear() else: events_to_flush = [] if events_to_flush: async with self._session_maker() as session: models = [self._event_to_model(event) for event in events_to_flush] session.add_all(models) await session.commit() logger.debug("Flushed %d remaining analytics events on close", len(models)) except Exception: logger.exception("Failed to flush remaining events on close") finally: self._closed = True # Dispose of the engine await self._engine.dispose() logger.debug("Database analytics collector closed")
[docs] async def get_buffer_size(self) -> int: """Get the current number of buffered events. Returns: The number of events currently in the buffer. """ async with self._lock: return len(self._buffer)
[docs] async def health_check(self) -> bool: """Check if the database connection is healthy. Executes a simple query to verify connectivity. Returns: True if the database is reachable, False otherwise. """ try: from sqlalchemy import select async with self._session_maker() as session: await session.execute(select(1)) return True except Exception: return False