AS
AgSkills.dev
MARKETPLACE

saga-orchestration

Implement saga patterns for distributed transactions and cross-aggregate workflows. Use when coordinating multi-step business processes, handling compensating transactions, or managing long-running workflows.

29.3k
3.2k

Preview

SKILL.md
name
saga-orchestration
description
Implement saga patterns for distributed transactions and cross-aggregate workflows. Use when coordinating multi-step business processes, handling compensating transactions, or managing long-running workflows.

Saga Orchestration

Patterns for managing distributed transactions and long-running business processes.

When to Use This Skill

  • Coordinating multi-service transactions
  • Implementing compensating transactions
  • Managing long-running business workflows
  • Handling failures in distributed systems
  • Building order fulfillment processes
  • Implementing approval workflows

Core Concepts

1. Saga Types

Choreography                    Orchestration
β”Œβ”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚Svc A│─►│Svc B│─►│Svc Cβ”‚     β”‚ Orchestratorβ”‚
β””β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”˜     β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜
   β”‚        β”‚        β”‚               β”‚
   β–Ό        β–Ό        β–Ό         β”Œβ”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”
 Event    Event    Event       β–Ό     β–Ό     β–Ό
                            β”Œβ”€β”€β”€β”€β”β”Œβ”€β”€β”€β”€β”β”Œβ”€β”€β”€β”€β”
                            β”‚Svc1β”‚β”‚Svc2β”‚β”‚Svc3β”‚
                            β””β”€β”€β”€β”€β”˜β””β”€β”€β”€β”€β”˜β””β”€β”€β”€β”€β”˜

2. Saga Execution States

StateDescription
StartedSaga initiated
PendingWaiting for step completion
CompensatingRolling back due to failure
CompletedAll steps succeeded
FailedSaga failed after compensation

Templates

Template 1: Saga Orchestrator Base

from abc import ABC, abstractmethod from dataclasses import dataclass, field from enum import Enum from typing import List, Dict, Any, Optional from datetime import datetime import uuid class SagaState(Enum): STARTED = "started" PENDING = "pending" COMPENSATING = "compensating" COMPLETED = "completed" FAILED = "failed" @dataclass class SagaStep: name: str action: str compensation: str status: str = "pending" result: Optional[Dict] = None error: Optional[str] = None executed_at: Optional[datetime] = None compensated_at: Optional[datetime] = None @dataclass class Saga: saga_id: str saga_type: str state: SagaState data: Dict[str, Any] steps: List[SagaStep] current_step: int = 0 created_at: datetime = field(default_factory=datetime.utcnow) updated_at: datetime = field(default_factory=datetime.utcnow) class SagaOrchestrator(ABC): """Base class for saga orchestrators.""" def __init__(self, saga_store, event_publisher): self.saga_store = saga_store self.event_publisher = event_publisher @abstractmethod def define_steps(self, data: Dict) -> List[SagaStep]: """Define the saga steps.""" pass @property @abstractmethod def saga_type(self) -> str: """Unique saga type identifier.""" pass async def start(self, data: Dict) -> Saga: """Start a new saga.""" saga = Saga( saga_id=str(uuid.uuid4()), saga_type=self.saga_type, state=SagaState.STARTED, data=data, steps=self.define_steps(data) ) await self.saga_store.save(saga) await self._execute_next_step(saga) return saga async def handle_step_completed(self, saga_id: str, step_name: str, result: Dict): """Handle successful step completion.""" saga = await self.saga_store.get(saga_id) # Update step for step in saga.steps: if step.name == step_name: step.status = "completed" step.result = result step.executed_at = datetime.utcnow() break saga.current_step += 1 saga.updated_at = datetime.utcnow() # Check if saga is complete if saga.current_step >= len(saga.steps): saga.state = SagaState.COMPLETED await self.saga_store.save(saga) await self._on_saga_completed(saga) else: saga.state = SagaState.PENDING await self.saga_store.save(saga) await self._execute_next_step(saga) async def handle_step_failed(self, saga_id: str, step_name: str, error: str): """Handle step failure - start compensation.""" saga = await self.saga_store.get(saga_id) # Mark step as failed for step in saga.steps: if step.name == step_name: step.status = "failed" step.error = error break saga.state = SagaState.COMPENSATING saga.updated_at = datetime.utcnow() await self.saga_store.save(saga) # Start compensation from current step backwards await self._compensate(saga) async def _execute_next_step(self, saga: Saga): """Execute the next step in the saga.""" if saga.current_step >= len(saga.steps): return step = saga.steps[saga.current_step] step.status = "executing" await self.saga_store.save(saga) # Publish command to execute step await self.event_publisher.publish( step.action, { "saga_id": saga.saga_id, "step_name": step.name, **saga.data } ) async def _compensate(self, saga: Saga): """Execute compensation for completed steps.""" # Compensate in reverse order for i in range(saga.current_step - 1, -1, -1): step = saga.steps[i] if step.status == "completed": step.status = "compensating" await self.saga_store.save(saga) await self.event_publisher.publish( step.compensation, { "saga_id": saga.saga_id, "step_name": step.name, "original_result": step.result, **saga.data } ) async def handle_compensation_completed(self, saga_id: str, step_name: str): """Handle compensation completion.""" saga = await self.saga_store.get(saga_id) for step in saga.steps: if step.name == step_name: step.status = "compensated" step.compensated_at = datetime.utcnow() break # Check if all compensations complete all_compensated = all( s.status in ("compensated", "pending", "failed") for s in saga.steps ) if all_compensated: saga.state = SagaState.FAILED await self._on_saga_failed(saga) await self.saga_store.save(saga) async def _on_saga_completed(self, saga: Saga): """Called when saga completes successfully.""" await self.event_publisher.publish( f"{self.saga_type}Completed", {"saga_id": saga.saga_id, **saga.data} ) async def _on_saga_failed(self, saga: Saga): """Called when saga fails after compensation.""" await self.event_publisher.publish( f"{self.saga_type}Failed", {"saga_id": saga.saga_id, "error": "Saga failed", **saga.data} )

Template 2: Order Fulfillment Saga

class OrderFulfillmentSaga(SagaOrchestrator): """Orchestrates order fulfillment across services.""" @property def saga_type(self) -> str: return "OrderFulfillment" def define_steps(self, data: Dict) -> List[SagaStep]: return [ SagaStep( name="reserve_inventory", action="InventoryService.ReserveItems", compensation="InventoryService.ReleaseReservation" ), SagaStep( name="process_payment", action="PaymentService.ProcessPayment", compensation="PaymentService.RefundPayment" ), SagaStep( name="create_shipment", action="ShippingService.CreateShipment", compensation="ShippingService.CancelShipment" ), SagaStep( name="send_confirmation", action="NotificationService.SendOrderConfirmation", compensation="NotificationService.SendCancellationNotice" ) ] # Usage async def create_order(order_data: Dict): saga = OrderFulfillmentSaga(saga_store, event_publisher) return await saga.start({ "order_id": order_data["order_id"], "customer_id": order_data["customer_id"], "items": order_data["items"], "payment_method": order_data["payment_method"], "shipping_address": order_data["shipping_address"] }) # Event handlers in each service class InventoryService: async def handle_reserve_items(self, command: Dict): try: # Reserve inventory reservation = await self.reserve( command["items"], command["order_id"] ) # Report success await self.event_publisher.publish( "SagaStepCompleted", { "saga_id": command["saga_id"], "step_name": "reserve_inventory", "result": {"reservation_id": reservation.id} } ) except InsufficientInventoryError as e: await self.event_publisher.publish( "SagaStepFailed", { "saga_id": command["saga_id"], "step_name": "reserve_inventory", "error": str(e) } ) async def handle_release_reservation(self, command: Dict): # Compensating action await self.release_reservation( command["original_result"]["reservation_id"] ) await self.event_publisher.publish( "SagaCompensationCompleted", { "saga_id": command["saga_id"], "step_name": "reserve_inventory" } )

Template 3: Choreography-Based Saga

from dataclasses import dataclass from typing import Dict, Any import asyncio @dataclass class SagaContext: """Passed through choreographed saga events.""" saga_id: str step: int data: Dict[str, Any] completed_steps: list class OrderChoreographySaga: """Choreography-based saga using events.""" def __init__(self, event_bus): self.event_bus = event_bus self._register_handlers() def _register_handlers(self): self.event_bus.subscribe("OrderCreated", self._on_order_created) self.event_bus.subscribe("InventoryReserved", self._on_inventory_reserved) self.event_bus.subscribe("PaymentProcessed", self._on_payment_processed) self.event_bus.subscribe("ShipmentCreated", self._on_shipment_created) # Compensation handlers self.event_bus.subscribe("PaymentFailed", self._on_payment_failed) self.event_bus.subscribe("ShipmentFailed", self._on_shipment_failed) async def _on_order_created(self, event: Dict): """Step 1: Order created, reserve inventory.""" await self.event_bus.publish("ReserveInventory", { "saga_id": event["order_id"], "order_id": event["order_id"], "items": event["items"] }) async def _on_inventory_reserved(self, event: Dict): """Step 2: Inventory reserved, process payment.""" await self.event_bus.publish("ProcessPayment", { "saga_id": event["saga_id"], "order_id": event["order_id"], "amount": event["total_amount"], "reservation_id": event["reservation_id"] }) async def _on_payment_processed(self, event: Dict): """Step 3: Payment done, create shipment.""" await self.event_bus.publish("CreateShipment", { "saga_id": event["saga_id"], "order_id": event["order_id"], "payment_id": event["payment_id"] }) async def _on_shipment_created(self, event: Dict): """Step 4: Complete - send confirmation.""" await self.event_bus.publish("OrderFulfilled", { "saga_id": event["saga_id"], "order_id": event["order_id"], "tracking_number": event["tracking_number"] }) # Compensation handlers async def _on_payment_failed(self, event: Dict): """Payment failed - release inventory.""" await self.event_bus.publish("ReleaseInventory", { "saga_id": event["saga_id"], "reservation_id": event["reservation_id"] }) await self.event_bus.publish("OrderFailed", { "order_id": event["order_id"], "reason": "Payment failed" }) async def _on_shipment_failed(self, event: Dict): """Shipment failed - refund payment and release inventory.""" await self.event_bus.publish("RefundPayment", { "saga_id": event["saga_id"], "payment_id": event["payment_id"] }) await self.event_bus.publish("ReleaseInventory", { "saga_id": event["saga_id"], "reservation_id": event["reservation_id"] })

Template 4: Saga with Timeouts

class TimeoutSagaOrchestrator(SagaOrchestrator): """Saga orchestrator with step timeouts.""" def __init__(self, saga_store, event_publisher, scheduler): super().__init__(saga_store, event_publisher) self.scheduler = scheduler async def _execute_next_step(self, saga: Saga): if saga.current_step >= len(saga.steps): return step = saga.steps[saga.current_step] step.status = "executing" step.timeout_at = datetime.utcnow() + timedelta(minutes=5) await self.saga_store.save(saga) # Schedule timeout check await self.scheduler.schedule( f"saga_timeout_{saga.saga_id}_{step.name}", self._check_timeout, {"saga_id": saga.saga_id, "step_name": step.name}, run_at=step.timeout_at ) await self.event_publisher.publish( step.action, {"saga_id": saga.saga_id, "step_name": step.name, **saga.data} ) async def _check_timeout(self, data: Dict): """Check if step has timed out.""" saga = await self.saga_store.get(data["saga_id"]) step = next(s for s in saga.steps if s.name == data["step_name"]) if step.status == "executing": # Step timed out - fail it await self.handle_step_failed( data["saga_id"], data["step_name"], "Step timed out" )

Best Practices

Do's

  • Make steps idempotent - Safe to retry
  • Design compensations carefully - They must work
  • Use correlation IDs - For tracing across services
  • Implement timeouts - Don't wait forever
  • Log everything - For debugging failures

Don'ts

  • Don't assume instant completion - Sagas take time
  • Don't skip compensation testing - Most critical part
  • Don't couple services - Use async messaging
  • Don't ignore partial failures - Handle gracefully

Resources

GitHub Repository
wshobson/agents
Stars
29,322
Forks
3,213
Open Repository
Install Skill
Download ZIP1 files