"""Pre-built workflows for feature flag governance.
This module provides ready-to-use workflow definitions for common
feature flag management scenarios.
"""
from __future__ import annotations
from datetime import timedelta
from typing import TYPE_CHECKING, Any
from litestar_workflows import BaseMachineStep, Edge, TimerStep, WorkflowContext, WorkflowDefinition
from litestar_flags.contrib.workflows.steps import (
ApplyFlagChangeStep,
ManagerApprovalStep,
NotifyStakeholdersStep,
QAValidationStep,
RolloutStep,
ValidateFlagChangeStep,
)
from litestar_flags.contrib.workflows.types import RolloutStage
if TYPE_CHECKING:
    # Needed only for type annotations. The module uses
    # ``from __future__ import annotations``, so annotations are never
    # evaluated at runtime and this import can stay behind the guard.
    from litestar_flags.protocols import StorageBackend
# Public API of this module: only the two workflow classes are exported.
# The ``_RejectedStep`` helper defined below is internal and deliberately
# left out.
__all__ = [
    "FlagApprovalWorkflow",
    "ScheduledRolloutWorkflow",
]
class _RejectedStep(BaseMachineStep):
    """Internal step for rejected changes.

    This is a terminal step that marks the workflow as rejected
    without performing any actions.
    """

    def __init__(self) -> None:
        """Initialize rejected step."""
        super().__init__(name="rejected", description="Change request was rejected")

    async def execute(self, context: WorkflowContext) -> dict[str, Any]:
        """Mark the change request as rejected.

        Args:
            context: Workflow context; read for the ``"manager_comments"``
                and ``"errors"`` keys.

        Returns:
            A dict with ``"rejected": True`` and a ``"reason"`` taken from
            the manager comments if present, otherwise from the recorded
            errors, otherwise ``["Unknown reason"]``.
        """
        # Chain with ``or`` so that an *empty* or ``None`` errors value also
        # falls back to the placeholder. A ``.get`` default alone only covers
        # a missing key and would leave the reason empty.
        reason = context.get("manager_comments") or context.get("errors") or ["Unknown reason"]
        return {
            "rejected": True,
            "reason": reason,
        }
class FlagApprovalWorkflow:
    """Workflow for approving feature flag changes.

    This workflow implements a standard approval process:

    1. Validate the change request
    2. Manager approval (human step)
    3. QA validation (human step, optional)
    4. Apply the change
    5. Notify stakeholders (optional)

    A failed validation or a declined approval routes to a terminal
    ``rejected`` step instead.

    Example:
        Registering the workflow::

            from litestar_workflows import WorkflowRegistry
            from litestar_flags.contrib.workflows import FlagApprovalWorkflow

            registry = WorkflowRegistry()
            registry.register(FlagApprovalWorkflow)

        Starting an approval workflow::

            from litestar_flags.contrib.workflows import FlagChangeRequest, ChangeType

            request = FlagChangeRequest(
                flag_key="new_checkout_flow",
                change_type=ChangeType.CREATE,
                requested_by="developer@example.com",
                reason="Launch new checkout experience",
                flag_data={
                    "name": "New Checkout Flow",
                    "description": "Enables the redesigned checkout",
                    "default_enabled": False,
                },
            )
            instance = await engine.start_workflow(
                "flag_approval",
                initial_data={"request": request.to_dict()},
            )
    """

    __workflow_name__ = "flag_approval"
    __workflow_version__ = "1.0.0"
    __workflow_description__ = "Approval workflow for feature flag changes"

    def __init__(self, storage: StorageBackend | None = None) -> None:
        """Initialize the workflow.

        Args:
            storage: Storage backend for flag operations. It is kept on the
                instance; :meth:`get_definition` is a classmethod and takes
                its own ``storage`` argument.
        """
        self._storage = storage

    @classmethod
    def get_definition(
        cls,
        storage: StorageBackend | None = None,
        require_qa: bool = True,
        notify_on_complete: bool = True,
    ) -> WorkflowDefinition:
        """Get the workflow definition.

        Args:
            storage: Storage backend for flag operations.
            require_qa: Whether to require QA validation step.
            notify_on_complete: Whether to notify stakeholders on completion.

        Returns:
            The workflow definition.
        """
        # Steps that exist in every variant of the workflow.
        steps: dict[str, Any] = {
            "validate": ValidateFlagChangeStep(storage=storage),
            "manager_approval": ManagerApprovalStep(),
            "apply_change": ApplyFlagChangeStep(storage=storage),
            "rejected": _RejectedStep(),
        }
        # Each decision point has a complementary pair of edges: proceed when
        # the flag in the context is truthy, otherwise go to "rejected".
        edges = [
            Edge("validate", "manager_approval", condition=lambda ctx: ctx.get("valid", False)),
            Edge("validate", "rejected", condition=lambda ctx: not ctx.get("valid", False)),
        ]
        terminal_steps = {"rejected"}

        if require_qa:
            # Manager approval hands over to QA, which gates the change.
            steps["qa_validation"] = QAValidationStep()
            edges.extend(
                [
                    Edge("manager_approval", "qa_validation", condition=lambda ctx: ctx.get("manager_approved", False)),
                    Edge("manager_approval", "rejected", condition=lambda ctx: not ctx.get("manager_approved", False)),
                    Edge("qa_validation", "apply_change", condition=lambda ctx: ctx.get("qa_validated", False)),
                    Edge("qa_validation", "rejected", condition=lambda ctx: not ctx.get("qa_validated", False)),
                ]
            )
        else:
            # Without QA, manager approval gates the change directly.
            edges.extend(
                [
                    Edge("manager_approval", "apply_change", condition=lambda ctx: ctx.get("manager_approved", False)),
                    Edge("manager_approval", "rejected", condition=lambda ctx: not ctx.get("manager_approved", False)),
                ]
            )

        if notify_on_complete:
            steps["notify"] = NotifyStakeholdersStep()
            edges.append(Edge("apply_change", "notify"))
            terminal_steps.add("notify")
        else:
            # No notification step, so applying the change ends the workflow.
            terminal_steps.add("apply_change")

        return WorkflowDefinition(
            name=cls.__workflow_name__,
            version=cls.__workflow_version__,
            description=cls.__workflow_description__,
            steps=steps,
            edges=edges,
            initial_step="validate",
            terminal_steps=terminal_steps,
        )
class ScheduledRolloutWorkflow:
    """Workflow for gradual feature flag rollouts.

    This workflow implements a staged rollout process:

    1. Start at 5% (INITIAL stage)
    2. Wait, then increase to 25% (EARLY stage)
    3. Wait, then increase to 50% (HALF stage)
    4. Wait, then increase to 75% (MAJORITY stage)
    5. Wait, then increase to 100% (FULL stage)

    Between each stage, there's a configurable wait period to
    monitor for issues before proceeding.

    Example:
        Starting a rollout workflow::

            instance = await engine.start_workflow(
                "scheduled_rollout",
                initial_data={
                    "flag_key": "new_feature",
                },
            )
    """

    __workflow_name__ = "scheduled_rollout"
    __workflow_version__ = "1.0.0"
    __workflow_description__ = "Gradual rollout workflow with staged percentages"

    @classmethod
    def get_definition(
        cls,
        storage: StorageBackend | None = None,
        stage_delay_minutes: int = 60,
        stages: list[RolloutStage] | None = None,
    ) -> WorkflowDefinition:
        """Get the workflow definition.

        Args:
            storage: Storage backend for flag operations.
            stage_delay_minutes: Minutes to wait between rollout stages.
            stages: List of rollout stages to execute. Defaults to all stages.

        Returns:
            The workflow definition.

        Raises:
            ValueError: If ``stages`` is an empty list, or if it contains the
                same stage more than once.
        """
        if stages is None:
            stages = [
                RolloutStage.INITIAL,
                RolloutStage.EARLY,
                RolloutStage.HALF,
                RolloutStage.MAJORITY,
                RolloutStage.FULL,
            ]

        # Validate before building anything: an empty list has no initial
        # step, and it used to surface as a bare IndexError further down.
        if not stages:
            msg = "stages must contain at least one rollout stage"
            raise ValueError(msg)

        # Step names are derived from the stage value, so a repeated stage
        # would overwrite its own step and make the wait/rollout edges loop.
        step_names = [f"rollout_{stage.value}" for stage in stages]
        if len(set(step_names)) != len(step_names):
            msg = "stages must not contain the same rollout stage more than once"
            raise ValueError(msg)

        steps: dict[str, Any] = {}
        edges: list[Edge] = []
        prev_step: str | None = None

        for stage, step_name in zip(stages, step_names):
            steps[step_name] = RolloutStep(target_stage=stage, storage=storage)
            if prev_step is not None:
                # Add wait step between stages
                wait_name = f"wait_before_{stage.value}"
                steps[wait_name] = TimerStep(
                    name=wait_name,
                    duration=timedelta(minutes=stage_delay_minutes),
                    description=f"Wait before {stage.value} rollout ({stage.percentage}%)",
                )
                edges.append(Edge(prev_step, wait_name))
                edges.append(Edge(wait_name, step_name))
            prev_step = step_name

        # Add notification at the end; ``prev_step`` is always set here
        # because ``stages`` is guaranteed non-empty.
        steps["notify_complete"] = NotifyStakeholdersStep(
            notification_channels=["email", "slack"],
        )
        edges.append(Edge(prev_step, "notify_complete"))

        return WorkflowDefinition(
            name=cls.__workflow_name__,
            version=cls.__workflow_version__,
            description=cls.__workflow_description__,
            steps=steps,
            edges=edges,
            initial_step=step_names[0],
            terminal_steps={"notify_complete"},
        )