projection-patterns

Build read models and projections from event streams. Use when implementing CQRS read sides, building materialized views, or optimizing query performance in event-sourced systems.

Projection Patterns

Comprehensive guide to building projections and read models for event-sourced systems.

When to Use This Skill

  • Building CQRS read models
  • Creating materialized views from events
  • Optimizing query performance
  • Implementing real-time dashboards
  • Building search indexes from events
  • Aggregating data across streams

Core Concepts

1. Projection Architecture

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│ Event Store │────►│ Projector   │────►│ Read Model  │
│             │     │             │     │ (Database)  │
│ ┌─────────┐ │     │ ┌─────────┐ │     │ ┌─────────┐ │
│ │ Events  │ │     │ │ Handler │ │     │ │ Tables  │ │
│ └─────────┘ │     │ │ Logic   │ │     │ │ Views   │ │
│             │     │ └─────────┘ │     │ │ Cache   │ │
└─────────────┘     └─────────────┘     └─────────────┘

2. Projection Types

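| Type       | Description                 | Use Case               |
|------------|-----------------------------|------------------------|
| Live       | Real-time from subscription | Current state queries  |
| Catchup    | Process historical events   | Rebuilding read models |
| Persistent | Stores checkpoint           | Resume after restart   |
| Inline     | Same transaction as write   | Strong consistency     |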
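
The catch-up and live types are often combined: a projection first replays history, then follows new events. The following is a minimal in-memory sketch of that hand-off, not part of the skill's templates. The function name `catch_up_then_live`, the use of an `asyncio.Queue` as the live feed, and `None` as a stop signal are all assumptions made for illustration.

```python
import asyncio
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class Event:
    event_type: str
    global_position: int


async def catch_up_then_live(
    history: List[Event],
    live: asyncio.Queue,
    apply: Callable[[Event], None],
) -> int:
    """Replay history, then follow the live feed. Returns the last position applied."""
    position = 0

    # Catch-up phase: replay stored events in order.
    for event in history:
        apply(event)
        position = event.global_position

    # Live phase: consume new events as they arrive.
    while True:
        event = await live.get()
        if event is None:  # hypothetical stop signal for this sketch
            break
        # The two phases may overlap; skip anything already applied.
        if event.global_position <= position:
            continue
        apply(event)
        position = event.global_position

    return position
```

The position check in the live phase is what keeps an event from being applied twice when it appears both in the history and in the subscription.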

Templates

Template 1: Basic Projector

import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List

import asyncpg  # used for type hints in the database-backed templates


@dataclass
class Event:
    stream_id: str
    event_type: str
    data: dict
    version: int
    global_position: int


class Projection(ABC):
    """Base class for projections."""

    @property
    @abstractmethod
    def name(self) -> str:
        """Unique projection name for checkpointing."""
        pass

    @abstractmethod
    def handles(self) -> List[str]:
        """List of event types this projection handles."""
        pass

    @abstractmethod
    async def apply(self, event: Event) -> None:
        """Apply event to the read model."""
        pass


class Projector:
    """Runs projections from event store."""

    def __init__(self, event_store, checkpoint_store):
        self.event_store = event_store
        self.checkpoint_store = checkpoint_store
        self.projections: List[Projection] = []

    def register(self, projection: Projection):
        self.projections.append(projection)

    async def run(self, batch_size: int = 100):
        """Run all projections continuously."""
        while True:
            for projection in self.projections:
                await self._run_projection(projection, batch_size)
            await asyncio.sleep(0.1)

    async def _run_projection(self, projection: Projection, batch_size: int) -> int:
        """Process one batch and return the number of events read."""
        checkpoint = await self.checkpoint_store.get(projection.name)
        position = checkpoint or 0

        events = await self.event_store.read_all(position, batch_size)
        processed = 0

        for event in events:
            if event.event_type in projection.handles():
                await projection.apply(event)

            # Advance the checkpoint for every event, handled or not,
            # so unhandled events are not read again on the next pass.
            await self.checkpoint_store.save(
                projection.name,
                event.global_position
            )
            processed += 1

        return processed

    async def rebuild(self, projection: Projection):
        """Rebuild a projection from scratch."""
        await self.checkpoint_store.delete(projection.name)
        # Optionally clear read model tables
        # Read batch after batch until the event store has nothing left;
        # a single call would stop after the first 1000 events.
        while await self._run_projection(projection, batch_size=1000):
            pass
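
Template 1 leaves `event_store` and `checkpoint_store` untyped. As a rough sketch of the contract the `Projector` appears to rely on, the in-memory stand-ins below implement `read_all(position, batch_size)`, `get`, `save`, and `delete`. The class names, the `append` helper, and the choice that positions start at 1 and that `read_all` returns events strictly after `position` are assumptions for illustration, not something the skill specifies.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class Event:
    stream_id: str
    event_type: str
    data: dict
    version: int
    global_position: int


class InMemoryEventStore:
    """Hypothetical store: global positions start at 1 and grow by one per append."""

    def __init__(self) -> None:
        self._events: List[Event] = []

    def append(self, stream_id: str, event_type: str, data: dict) -> Event:
        # Per-stream version is the count of prior events in that stream plus one.
        version = sum(1 for e in self._events if e.stream_id == stream_id) + 1
        event = Event(stream_id, event_type, data, version, len(self._events) + 1)
        self._events.append(event)
        return event

    async def read_all(self, position: int, batch_size: int) -> List[Event]:
        # Events strictly after `position`, so a saved checkpoint is never re-read.
        return self._events[position:position + batch_size]


class InMemoryCheckpointStore:
    """Hypothetical checkpoint store keyed by projection name."""

    def __init__(self) -> None:
        self._positions: Dict[str, int] = {}

    async def get(self, name: str) -> Optional[int]:
        return self._positions.get(name)

    async def save(self, name: str, position: int) -> None:
        self._positions[name] = position

    async def delete(self, name: str) -> None:
        self._positions.pop(name, None)
```

Stand-ins like these can be handy for unit-testing a projection without a database. A real store would also need durable writes and concurrency control.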

Template 2: Order Summary Projection

class OrderSummaryProjection(Projection):
    """Projects order events to a summary read model."""

    def __init__(self, db_pool: asyncpg.Pool):
        self.pool = db_pool

    @property
    def name(self) -> str:
        return "order_summary"

    def handles(self) -> List[str]:
        return [
            "OrderCreated",
            "OrderItemAdded",
            "OrderItemRemoved",
            "OrderShipped",
            "OrderCompleted",
            "OrderCancelled"
        ]

    async def apply(self, event: Event) -> None:
        handlers = {
            "OrderCreated": self._handle_created,
            "OrderItemAdded": self._handle_item_added,
            "OrderItemRemoved": self._handle_item_removed,
            "OrderShipped": self._handle_shipped,
            "OrderCompleted": self._handle_completed,
            "OrderCancelled": self._handle_cancelled,
        }

        handler = handlers.get(event.event_type)
        if handler:
            await handler(event)

    async def _handle_created(self, event: Event):
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                INSERT INTO order_summaries
                (order_id, customer_id, status, total_amount, item_count, created_at)
                VALUES ($1, $2, $3, $4, $5, $6)
                """,
                event.data['order_id'],
                event.data['customer_id'],
                'pending',
                0,
                0,
                event.data['created_at']
            )

    async def _handle_item_added(self, event: Event):
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                UPDATE order_summaries
                SET total_amount = total_amount + $2,
                    item_count = item_count + 1,
                    updated_at = NOW()
                WHERE order_id = $1
                """,
                event.data['order_id'],
                event.data['price'] * event.data['quantity']
            )

    async def _handle_item_removed(self, event: Event):
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                UPDATE order_summaries
                SET total_amount = total_amount - $2,
                    item_count = item_count - 1,
                    updated_at = NOW()
                WHERE order_id = $1
                """,
                event.data['order_id'],
                event.data['price'] * event.data['quantity']
            )

    async def _handle_shipped(self, event: Event):
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                UPDATE order_summaries
                SET status = 'shipped',
                    shipped_at = $2,
                    updated_at = NOW()
                WHERE order_id = $1
                """,
                event.data['order_id'],
                event.data['shipped_at']
            )

    async def _handle_completed(self, event: Event):
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                UPDATE order_summaries
                SET status = 'completed',
                    completed_at = $2,
                    updated_at = NOW()
                WHERE order_id = $1
                """,
                event.data['order_id'],
                event.data['completed_at']
            )

    async def _handle_cancelled(self, event: Event):
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                UPDATE order_summaries
                SET status = 'cancelled',
                    cancelled_at = $2,
                    cancellation_reason = $3,
                    updated_at = NOW()
                WHERE order_id = $1
                """,
                event.data['order_id'],
                event.data['cancelled_at'],
                event.data.get('reason')
            )

Template 3: Elasticsearch Search Projection

from elasticsearch import AsyncElasticsearch


class ProductSearchProjection(Projection):
    """Projects product events to Elasticsearch for full-text search."""

    def __init__(self, es_client: AsyncElasticsearch):
        self.es = es_client
        self.index = "products"

    @property
    def name(self) -> str:
        return "product_search"

    def handles(self) -> List[str]:
        return [
            "ProductCreated",
            "ProductUpdated",
            "ProductPriceChanged",
            "ProductDeleted"
        ]

    async def apply(self, event: Event) -> None:
        if event.event_type == "ProductCreated":
            await self.es.index(
                index=self.index,
                id=event.data['product_id'],
                document={
                    'name': event.data['name'],
                    'description': event.data['description'],
                    'category': event.data['category'],
                    'price': event.data['price'],
                    'tags': event.data.get('tags', []),
                    'created_at': event.data['created_at']
                }
            )

        elif event.event_type == "ProductUpdated":
            await self.es.update(
                index=self.index,
                id=event.data['product_id'],
                doc={
                    'name': event.data['name'],
                    'description': event.data['description'],
                    'category': event.data['category'],
                    'tags': event.data.get('tags', []),
                    'updated_at': event.data['updated_at']
                }
            )

        elif event.event_type == "ProductPriceChanged":
            await self.es.update(
                index=self.index,
                id=event.data['product_id'],
                doc={
                    'price': event.data['new_price'],
                    'price_updated_at': event.data['changed_at']
                }
            )

        elif event.event_type == "ProductDeleted":
            await self.es.delete(
                index=self.index,
                id=event.data['product_id']
            )

Template 4: Aggregating Projection

class DailySalesProjection(Projection):
    """Aggregates sales data by day for reporting."""

    def __init__(self, db_pool: asyncpg.Pool):
        self.pool = db_pool

    @property
    def name(self) -> str:
        return "daily_sales"

    def handles(self) -> List[str]:
        return ["OrderCompleted", "OrderRefunded"]

    async def apply(self, event: Event) -> None:
        if event.event_type == "OrderCompleted":
            await self._increment_sales(event)
        elif event.event_type == "OrderRefunded":
            await self._decrement_sales(event)

    async def _increment_sales(self, event: Event):
        date = event.data['completed_at'][:10]  # YYYY-MM-DD
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                INSERT INTO daily_sales (date, total_orders, total_revenue, total_items)
                VALUES ($1, 1, $2, $3)
                ON CONFLICT (date) DO UPDATE SET
                    total_orders = daily_sales.total_orders + 1,
                    total_revenue = daily_sales.total_revenue + $2,
                    total_items = daily_sales.total_items + $3,
                    updated_at = NOW()
                """,
                date,
                event.data['total_amount'],
                event.data['item_count']
            )

    async def _decrement_sales(self, event: Event):
        date = event.data['original_completed_at'][:10]
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                UPDATE daily_sales SET
                    total_orders = total_orders - 1,
                    total_revenue = total_revenue - $2,
                    total_refunds = total_refunds + $2,
                    updated_at = NOW()
                WHERE date = $1
                """,
                date,
                event.data['refund_amount']
            )
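
Template 4 passes the `YYYY-MM-DD` slice to the query as a string. If the `date` column is a PostgreSQL `DATE` (the schema is not shown, so this is an assumption), asyncpg expects a `datetime.date` argument rather than a `str`. A small conversion helper along these lines may be needed; the name `event_day` is hypothetical.

```python
from datetime import date


def event_day(iso_timestamp: str) -> date:
    """Return the calendar day of an ISO 8601 timestamp such as '2024-03-05T14:30:00Z'."""
    # The first ten characters of an ISO 8601 timestamp are YYYY-MM-DD.
    # No time zone conversion is applied: the day is taken as written.
    return date.fromisoformat(iso_timestamp[:10])
```

With this helper, `event_day(event.data['completed_at'])` would replace the string slice as the `$1` argument.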

Template 5: Multi-Table Projection

class CustomerActivityProjection(Projection):
    """Projects customer activity across multiple tables."""

    def __init__(self, db_pool: asyncpg.Pool):
        self.pool = db_pool

    @property
    def name(self) -> str:
        return "customer_activity"

    def handles(self) -> List[str]:
        return [
            "CustomerCreated",
            "OrderCompleted",
            "ReviewSubmitted",
            "CustomerTierChanged"
        ]

    async def apply(self, event: Event) -> None:
        async with self.pool.acquire() as conn:
            async with conn.transaction():
                if event.event_type == "CustomerCreated":
                    # Insert into customers table
                    await conn.execute(
                        """
                        INSERT INTO customers (customer_id, email, name, tier, created_at)
                        VALUES ($1, $2, $3, 'bronze', $4)
                        """,
                        event.data['customer_id'],
                        event.data['email'],
                        event.data['name'],
                        event.data['created_at']
                    )
                    # Initialize activity summary
                    await conn.execute(
                        """
                        INSERT INTO customer_activity_summary
                        (customer_id, total_orders, total_spent, total_reviews)
                        VALUES ($1, 0, 0, 0)
                        """,
                        event.data['customer_id']
                    )

                elif event.event_type == "OrderCompleted":
                    # Update activity summary
                    await conn.execute(
                        """
                        UPDATE customer_activity_summary SET
                            total_orders = total_orders + 1,
                            total_spent = total_spent + $2,
                            last_order_at = $3
                        WHERE customer_id = $1
                        """,
                        event.data['customer_id'],
                        event.data['total_amount'],
                        event.data['completed_at']
                    )
                    # Insert into order history
                    await conn.execute(
                        """
                        INSERT INTO customer_order_history
                        (customer_id, order_id, amount, completed_at)
                        VALUES ($1, $2, $3, $4)
                        """,
                        event.data['customer_id'],
                        event.data['order_id'],
                        event.data['total_amount'],
                        event.data['completed_at']
                    )

                elif event.event_type == "ReviewSubmitted":
                    await conn.execute(
                        """
                        UPDATE customer_activity_summary SET
                            total_reviews = total_reviews + 1,
                            last_review_at = $2
                        WHERE customer_id = $1
                        """,
                        event.data['customer_id'],
                        event.data['submitted_at']
                    )

                elif event.event_type == "CustomerTierChanged":
                    await conn.execute(
                        """
                        UPDATE customers SET tier = $2, updated_at = NOW()
                        WHERE customer_id = $1
                        """,
                        event.data['customer_id'],
                        event.data['new_tier']
                    )

Best Practices

Do's

  • Make projections idempotent - Safe to replay
  • Use transactions - For multi-table updates
  • Store checkpoints - Resume after failures
  • Monitor lag - Alert on projection delays
  • Plan for rebuilds - Design for reconstruction

Don'ts

  • Don't couple projections - Each is independent
  • Don't skip error handling - Log and alert on failures
  • Don't ignore ordering - Events must be processed in order
  • Don't over-normalize - Denormalize for query patterns
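
The error-handling and ordering points pull against each other: skipping a failing event keeps the projection moving but breaks ordering. One possible policy, sketched below, is to retry with backoff, log each failure, and halt the projection if the event still fails so that later events are never applied ahead of it. `apply_in_order`, `ProjectionHalted`, and the retry parameters are hypothetical names chosen for this sketch.

```python
import asyncio
import logging
from typing import Any, Awaitable, Callable, Iterable

logger = logging.getLogger("projections")


class ProjectionHalted(Exception):
    """Raised when an event keeps failing, so the caller can alert and stop."""


async def apply_in_order(
    events: Iterable[Any],
    apply: Callable[[Any], Awaitable[None]],
    max_attempts: int = 3,
    base_delay: float = 0.0,
) -> int:
    """Apply events sequentially and return how many were applied.

    A failing event is retried with exponential backoff. If it still fails,
    processing stops instead of skipping ahead, which preserves ordering.
    """
    applied = 0
    for event in events:
        for attempt in range(1, max_attempts + 1):
            try:
                await apply(event)
                break
            except Exception as exc:
                logger.exception(
                    "apply failed (attempt %d/%d) for %r", attempt, max_attempts, event
                )
                if attempt == max_attempts:
                    raise ProjectionHalted(f"giving up on {event!r}") from exc
                await asyncio.sleep(base_delay * 2 ** (attempt - 1))
        applied += 1
    return applied
```

Halting trades availability for correctness. A projection that tolerates gaps could instead route the failing event to a dead-letter store, at the cost of the ordering guarantee.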

Resources

GitHub Repository
wshobson/agents