AS
AgSkills.dev
MARKETPLACE

microservices-patterns

Design microservices architectures with service boundaries, event-driven communication, and resilience patterns. Use when building distributed systems, decomposing monoliths, or implementing microservices.

29.3k
3.2k

Preview

SKILL.md
name
microservices-patterns
description
Design microservices architectures with service boundaries, event-driven communication, and resilience patterns. Use when building distributed systems, decomposing monoliths, or implementing microservices.

Microservices Patterns

Master microservices architecture patterns including service boundaries, inter-service communication, data management, and resilience patterns for building distributed systems.

When to Use This Skill

  • Decomposing monoliths into microservices
  • Designing service boundaries and contracts
  • Implementing inter-service communication
  • Managing distributed data and transactions
  • Building resilient distributed systems
  • Implementing service discovery and load balancing
  • Designing event-driven architectures

Core Concepts

1. Service Decomposition Strategies

By Business Capability

  • Organize services around business functions
  • Each service owns its domain
  • Example: OrderService, PaymentService, InventoryService

By Subdomain (DDD)

  • Core domain, supporting subdomains
  • Bounded contexts map to services
  • Clear ownership and responsibility

Strangler Fig Pattern

  • Gradually extract from monolith
  • New functionality as microservices
  • Proxy routes to old/new systems

2. Communication Patterns

Synchronous (Request/Response)

  • REST APIs
  • gRPC
  • GraphQL

Asynchronous (Events/Messages)

  • Event streaming (Kafka)
  • Message queues (RabbitMQ, SQS)
  • Pub/Sub patterns

3. Data Management

Database Per Service

  • Each service owns its data
  • No shared databases
  • Loose coupling

Saga Pattern

  • Distributed transactions
  • Compensating actions
  • Eventual consistency

4. Resilience Patterns

Circuit Breaker

  • Fail fast on repeated errors
  • Prevent cascade failures

Retry with Backoff

  • Transient fault handling
  • Exponential backoff

Bulkhead

  • Isolate resources
  • Limit impact of failures

Service Decomposition Patterns

Pattern 1: By Business Capability

# E-commerce example # Order Service class OrderService: """Handles order lifecycle.""" async def create_order(self, order_data: dict) -> Order: order = Order.create(order_data) # Publish event for other services await self.event_bus.publish( OrderCreatedEvent( order_id=order.id, customer_id=order.customer_id, items=order.items, total=order.total ) ) return order # Payment Service (separate service) class PaymentService: """Handles payment processing.""" async def process_payment(self, payment_request: PaymentRequest) -> PaymentResult: # Process payment result = await self.payment_gateway.charge( amount=payment_request.amount, customer=payment_request.customer_id ) if result.success: await self.event_bus.publish( PaymentCompletedEvent( order_id=payment_request.order_id, transaction_id=result.transaction_id ) ) return result # Inventory Service (separate service) class InventoryService: """Handles inventory management.""" async def reserve_items(self, order_id: str, items: List[OrderItem]) -> ReservationResult: # Check availability for item in items: available = await self.inventory_repo.get_available(item.product_id) if available < item.quantity: return ReservationResult( success=False, error=f"Insufficient inventory for {item.product_id}" ) # Reserve items reservation = await self.create_reservation(order_id, items) await self.event_bus.publish( InventoryReservedEvent( order_id=order_id, reservation_id=reservation.id ) ) return ReservationResult(success=True, reservation=reservation)

Pattern 2: API Gateway

from fastapi import FastAPI, HTTPException, Depends import httpx from circuitbreaker import circuit app = FastAPI() class APIGateway: """Central entry point for all client requests.""" def __init__(self): self.order_service_url = "http://order-service:8000" self.payment_service_url = "http://payment-service:8001" self.inventory_service_url = "http://inventory-service:8002" self.http_client = httpx.AsyncClient(timeout=5.0) @circuit(failure_threshold=5, recovery_timeout=30) async def call_order_service(self, path: str, method: str = "GET", **kwargs): """Call order service with circuit breaker.""" response = await self.http_client.request( method, f"{self.order_service_url}{path}", **kwargs ) response.raise_for_status() return response.json() async def create_order_aggregate(self, order_id: str) -> dict: """Aggregate data from multiple services.""" # Parallel requests order, payment, inventory = await asyncio.gather( self.call_order_service(f"/orders/{order_id}"), self.call_payment_service(f"/payments/order/{order_id}"), self.call_inventory_service(f"/reservations/order/{order_id}"), return_exceptions=True ) # Handle partial failures result = {"order": order} if not isinstance(payment, Exception): result["payment"] = payment if not isinstance(inventory, Exception): result["inventory"] = inventory return result @app.post("/api/orders") async def create_order( order_data: dict, gateway: APIGateway = Depends() ): """API Gateway endpoint.""" try: # Route to order service order = await gateway.call_order_service( "/orders", method="POST", json=order_data ) return {"order": order} except httpx.HTTPError as e: raise HTTPException(status_code=503, detail="Order service unavailable")

Communication Patterns

Pattern 1: Synchronous REST Communication

# Service A calls Service B import httpx from tenacity import retry, stop_after_attempt, wait_exponential class ServiceClient: """HTTP client with retries and timeout.""" def __init__(self, base_url: str): self.base_url = base_url self.client = httpx.AsyncClient( timeout=httpx.Timeout(5.0, connect=2.0), limits=httpx.Limits(max_keepalive_connections=20) ) @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10) ) async def get(self, path: str, **kwargs): """GET with automatic retries.""" response = await self.client.get(f"{self.base_url}{path}", **kwargs) response.raise_for_status() return response.json() async def post(self, path: str, **kwargs): """POST request.""" response = await self.client.post(f"{self.base_url}{path}", **kwargs) response.raise_for_status() return response.json() # Usage payment_client = ServiceClient("http://payment-service:8001") result = await payment_client.post("/payments", json=payment_data)

Pattern 2: Asynchronous Event-Driven

# Event-driven communication with Kafka from aiokafka import AIOKafkaProducer, AIOKafkaConsumer import json from dataclasses import dataclass, asdict from datetime import datetime @dataclass class DomainEvent: event_id: str event_type: str aggregate_id: str occurred_at: datetime data: dict class EventBus: """Event publishing and subscription.""" def __init__(self, bootstrap_servers: List[str]): self.bootstrap_servers = bootstrap_servers self.producer = None async def start(self): self.producer = AIOKafkaProducer( bootstrap_servers=self.bootstrap_servers, value_serializer=lambda v: json.dumps(v).encode() ) await self.producer.start() async def publish(self, event: DomainEvent): """Publish event to Kafka topic.""" topic = event.event_type await self.producer.send_and_wait( topic, value=asdict(event), key=event.aggregate_id.encode() ) async def subscribe(self, topic: str, handler: callable): """Subscribe to events.""" consumer = AIOKafkaConsumer( topic, bootstrap_servers=self.bootstrap_servers, value_deserializer=lambda v: json.loads(v.decode()), group_id="my-service" ) await consumer.start() try: async for message in consumer: event_data = message.value await handler(event_data) finally: await consumer.stop() # Order Service publishes event async def create_order(order_data: dict): order = await save_order(order_data) event = DomainEvent( event_id=str(uuid.uuid4()), event_type="OrderCreated", aggregate_id=order.id, occurred_at=datetime.now(), data={ "order_id": order.id, "customer_id": order.customer_id, "total": order.total } ) await event_bus.publish(event) # Inventory Service listens for OrderCreated async def handle_order_created(event_data: dict): """React to order creation.""" order_id = event_data["data"]["order_id"] items = event_data["data"]["items"] # Reserve inventory await reserve_inventory(order_id, items)

Pattern 3: Saga Pattern (Distributed Transactions)

# Saga orchestration for order fulfillment from enum import Enum from typing import List, Callable class SagaStep: """Single step in saga.""" def __init__( self, name: str, action: Callable, compensation: Callable ): self.name = name self.action = action self.compensation = compensation class SagaStatus(Enum): PENDING = "pending" COMPLETED = "completed" COMPENSATING = "compensating" FAILED = "failed" class OrderFulfillmentSaga: """Orchestrated saga for order fulfillment.""" def __init__(self): self.steps: List[SagaStep] = [ SagaStep( "create_order", action=self.create_order, compensation=self.cancel_order ), SagaStep( "reserve_inventory", action=self.reserve_inventory, compensation=self.release_inventory ), SagaStep( "process_payment", action=self.process_payment, compensation=self.refund_payment ), SagaStep( "confirm_order", action=self.confirm_order, compensation=self.cancel_order_confirmation ) ] async def execute(self, order_data: dict) -> SagaResult: """Execute saga steps.""" completed_steps = [] context = {"order_data": order_data} try: for step in self.steps: # Execute step result = await step.action(context) if not result.success: # Compensate await self.compensate(completed_steps, context) return SagaResult( status=SagaStatus.FAILED, error=result.error ) completed_steps.append(step) context.update(result.data) return SagaResult(status=SagaStatus.COMPLETED, data=context) except Exception as e: # Compensate on error await self.compensate(completed_steps, context) return SagaResult(status=SagaStatus.FAILED, error=str(e)) async def compensate(self, completed_steps: List[SagaStep], context: dict): """Execute compensating actions in reverse order.""" for step in reversed(completed_steps): try: await step.compensation(context) except Exception as e: # Log compensation failure print(f"Compensation failed for {step.name}: {e}") # Step implementations async def create_order(self, context: dict) -> StepResult: order = await order_service.create(context["order_data"]) return StepResult(success=True, data={"order_id": order.id}) async def cancel_order(self, context: dict): await order_service.cancel(context["order_id"]) async def reserve_inventory(self, context: dict) -> StepResult: result = await inventory_service.reserve( context["order_id"], context["order_data"]["items"] ) return StepResult( success=result.success, data={"reservation_id": result.reservation_id} ) async def release_inventory(self, context: dict): await inventory_service.release(context["reservation_id"]) async def process_payment(self, context: dict) -> StepResult: result = await payment_service.charge( context["order_id"], context["order_data"]["total"] ) return StepResult( success=result.success, data={"transaction_id": result.transaction_id}, error=result.error ) async def refund_payment(self, context: dict): await payment_service.refund(context["transaction_id"])

Resilience Patterns

Circuit Breaker Pattern

from enum import Enum from datetime import datetime, timedelta from typing import Callable, Any class CircuitState(Enum): CLOSED = "closed" # Normal operation OPEN = "open" # Failing, reject requests HALF_OPEN = "half_open" # Testing if recovered class CircuitBreaker: """Circuit breaker for service calls.""" def __init__( self, failure_threshold: int = 5, recovery_timeout: int = 30, success_threshold: int = 2 ): self.failure_threshold = failure_threshold self.recovery_timeout = recovery_timeout self.success_threshold = success_threshold self.failure_count = 0 self.success_count = 0 self.state = CircuitState.CLOSED self.opened_at = None async def call(self, func: Callable, *args, **kwargs) -> Any: """Execute function with circuit breaker.""" if self.state == CircuitState.OPEN: if self._should_attempt_reset(): self.state = CircuitState.HALF_OPEN else: raise CircuitBreakerOpenError("Circuit breaker is open") try: result = await func(*args, **kwargs) self._on_success() return result except Exception as e: self._on_failure() raise def _on_success(self): """Handle successful call.""" self.failure_count = 0 if self.state == CircuitState.HALF_OPEN: self.success_count += 1 if self.success_count >= self.success_threshold: self.state = CircuitState.CLOSED self.success_count = 0 def _on_failure(self): """Handle failed call.""" self.failure_count += 1 if self.failure_count >= self.failure_threshold: self.state = CircuitState.OPEN self.opened_at = datetime.now() if self.state == CircuitState.HALF_OPEN: self.state = CircuitState.OPEN self.opened_at = datetime.now() def _should_attempt_reset(self) -> bool: """Check if enough time passed to try again.""" return ( datetime.now() - self.opened_at > timedelta(seconds=self.recovery_timeout) ) # Usage breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30) async def call_payment_service(payment_data: dict): return await breaker.call( payment_client.process_payment, payment_data )

Resources

  • references/service-decomposition-guide.md: Breaking down monoliths
  • references/communication-patterns.md: Sync vs async patterns
  • references/saga-implementation.md: Distributed transactions
  • assets/circuit-breaker.py: Production circuit breaker
  • assets/event-bus-template.py: Kafka event bus implementation
  • assets/api-gateway-template.py: Complete API gateway

Best Practices

  1. Service Boundaries: Align with business capabilities
  2. Database Per Service: No shared databases
  3. API Contracts: Versioned, backward compatible
  4. Async When Possible: Events over direct calls
  5. Circuit Breakers: Fail fast on service failures
  6. Distributed Tracing: Track requests across services
  7. Service Registry: Dynamic service discovery
  8. Health Checks: Liveness and readiness probes

Common Pitfalls

  • Distributed Monolith: Tightly coupled services
  • Chatty Services: Too many inter-service calls
  • Shared Databases: Tight coupling through data
  • No Circuit Breakers: Cascade failures
  • Synchronous Everything: Tight coupling, poor resilience
  • Premature Microservices: Starting with microservices
  • Ignoring Network Failures: Assuming reliable network
  • No Compensation Logic: Can't undo failed transactions
GitHub Repository
wshobson/agents
Stars
29,322
Forks
3,213
Open Repository
Install Skill
Download ZIP1 files