
data-quality-frameworks

Implement data quality validation with Great Expectations, dbt tests, and data contracts. Use when building data quality pipelines, implementing validation rules, or establishing data contracts.


SKILL.md

```yaml
---
name: data-quality-frameworks
description: Implement data quality validation with Great Expectations, dbt tests, and data contracts. Use when building data quality pipelines, implementing validation rules, or establishing data contracts.
---
```

Data Quality Frameworks

Production patterns for implementing data quality with Great Expectations, dbt tests, and data contracts to ensure reliable data pipelines.

When to Use This Skill

  • Implementing data quality checks in pipelines
  • Setting up Great Expectations validation
  • Building comprehensive dbt test suites
  • Establishing data contracts between teams
  • Monitoring data quality metrics
  • Automating data validation in CI/CD

Core Concepts

1. Data Quality Dimensions

| Dimension | Description | Example Check |
|---|---|---|
| Completeness | No missing values | `expect_column_values_to_not_be_null` |
| Uniqueness | No duplicates | `expect_column_values_to_be_unique` |
| Validity | Values in expected range | `expect_column_values_to_be_in_set` |
| Accuracy | Data matches reality | Cross-reference validation |
| Consistency | No contradictions | `expect_column_pair_values_A_to_be_greater_than_B` |
| Timeliness | Data is recent | `expect_column_max_to_be_between` |
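Before reaching for a framework, the first three dimensions can be measured with a few lines of plain Python. A minimal sketch over in-memory rows (the `profile` helper and sample data are illustrative, not part of Great Expectations):

```python
from collections import Counter

def profile(rows, column, valid_values=None):
    """Compute completeness, uniqueness, and validity ratios for one column."""
    values = [r.get(column) for r in rows]
    non_null = [v for v in values if v is not None]
    counts = Counter(non_null)
    return {
        "completeness": len(non_null) / len(values),  # share of non-null values
        "uniqueness": len(counts) / len(non_null),    # distinct / non-null
        "validity": (
            sum(1 for v in non_null if v in valid_values) / len(non_null)
            if valid_values else None                 # only meaningful with a value set
        ),
    }

# Hypothetical sample: one duplicate order_id, one null, one invalid status
rows = [
    {"order_id": "a1", "status": "pending"},
    {"order_id": "a2", "status": "shipped"},
    {"order_id": "a2", "status": "unknown"},
    {"order_id": None, "status": "pending"},
]
print(profile(rows, "order_id"))
print(profile(rows, "status", valid_values={"pending", "shipped", "delivered"}))
```

The framework expectations in the table do the same arithmetic, but add batching, thresholds (`mostly`), and reporting on top.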

2. Testing Pyramid for Data

```
          /\
         /  \     Integration Tests (cross-table)
        /────\
       /      \   Unit Tests (single column)
      /────────\
     /          \ Schema Tests (structure)
    /────────────\
```
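The three tiers can be illustrated as plain-Python checks over in-memory rows (all names and data here are hypothetical; in practice each tier maps onto the Great Expectations and dbt patterns in this skill):

```python
orders = [{"order_id": 1, "customer_id": 10, "amount": 25.0}]
customers = [{"customer_id": 10}]

# Schema tier: the table has the structure we expect
def check_schema(rows, required_columns):
    return all(required_columns <= set(r) for r in rows)

# Unit tier: a single column satisfies a rule
def check_amount_positive(rows):
    return all(r["amount"] > 0 for r in rows)

# Integration tier: cross-table referential integrity
def check_orders_have_customers(orders, customers):
    known = {c["customer_id"] for c in customers}
    return all(o["customer_id"] in known for o in orders)

assert check_schema(orders, {"order_id", "customer_id", "amount"})
assert check_amount_positive(orders)
assert check_orders_have_customers(orders, customers)
```

Schema checks are cheap and broad, so they form the base; cross-table checks are slower and fewer, so they sit at the top.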

Quick Start

Great Expectations Setup

```bash
# Install
pip install great_expectations

# Initialize project
great_expectations init

# Create datasource
great_expectations datasource new
```
```python
import great_expectations as gx

# Create context
context = gx.get_context()

# Create expectation suite
suite = context.add_expectation_suite("orders_suite")

# Add expectations
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeUnique(column="order_id")
)

# Validate (assumes a "daily_orders" checkpoint has already been configured)
results = context.run_checkpoint(checkpoint_name="daily_orders")
```

Patterns

Pattern 1: Great Expectations Suite

```python
# expectations/orders_suite.py
import great_expectations as gx
from great_expectations.core import ExpectationSuite
from great_expectations.core.expectation_configuration import ExpectationConfiguration


def build_orders_suite() -> ExpectationSuite:
    """Build comprehensive orders expectation suite."""
    suite = ExpectationSuite(expectation_suite_name="orders_suite")

    # Schema expectations
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_table_columns_to_match_set",
        kwargs={
            "column_set": ["order_id", "customer_id", "amount", "status", "created_at"],
            "exact_match": False,  # Allow additional columns
        },
    ))

    # Primary key
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "order_id"},
    ))
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_unique",
        kwargs={"column": "order_id"},
    ))

    # Foreign key
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "customer_id"},
    ))

    # Categorical values
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_in_set",
        kwargs={
            "column": "status",
            "value_set": ["pending", "processing", "shipped", "delivered", "cancelled"],
        },
    ))

    # Numeric ranges
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_between",
        kwargs={
            "column": "amount",
            "min_value": 0,
            "max_value": 100000,
            "strict_min": True,  # amount > 0
        },
    ))

    # Date validity
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_dateutil_parseable",
        kwargs={"column": "created_at"},
    ))

    # Freshness - data should be recent
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_max_to_be_between",
        kwargs={
            "column": "created_at",
            "min_value": {"$PARAMETER": "now - timedelta(days=1)"},
            "max_value": {"$PARAMETER": "now"},
        },
    ))

    # Row count sanity
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_table_row_count_to_be_between",
        kwargs={
            "min_value": 1000,  # Expect at least 1000 rows
            "max_value": 10000000,
        },
    ))

    # Statistical expectations
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_mean_to_be_between",
        kwargs={
            "column": "amount",
            "min_value": 50,
            "max_value": 500,
        },
    ))

    return suite
```

Pattern 2: Great Expectations Checkpoint

```yaml
# great_expectations/checkpoints/orders_checkpoint.yml
name: orders_checkpoint
config_version: 1.0
class_name: Checkpoint
run_name_template: "%Y%m%d-%H%M%S-orders-validation"
validations:
  - batch_request:
      datasource_name: warehouse
      data_connector_name: default_inferred_data_connector_name
      data_asset_name: orders
      data_connector_query:
        index: -1  # Latest batch
    expectation_suite_name: orders_suite
action_list:
  - name: store_validation_result
    action:
      class_name: StoreValidationResultAction
  - name: store_evaluation_parameters
    action:
      class_name: StoreEvaluationParametersAction
  - name: update_data_docs
    action:
      class_name: UpdateDataDocsAction
  # Slack notification on failure
  - name: send_slack_notification
    action:
      class_name: SlackNotificationAction
      slack_webhook: ${SLACK_WEBHOOK}
      notify_on: failure
      renderer:
        module_name: great_expectations.render.renderer.slack_renderer
        class_name: SlackRenderer
```
```python
# Run checkpoint
import great_expectations as gx

context = gx.get_context()
result = context.run_checkpoint(checkpoint_name="orders_checkpoint")

if not result.success:
    failed_expectations = [
        r for r in result.run_results.values() if not r.success
    ]
    raise ValueError(f"Data quality check failed: {failed_expectations}")
```

Pattern 3: dbt Data Tests

```yaml
# models/marts/core/_core__models.yml
version: 2

models:
  - name: fct_orders
    description: Order fact table
    tests:
      # Table-level tests
      - dbt_utils.recency:
          datepart: day
          field: created_at
          interval: 1
      - dbt_utils.at_least_one
      - dbt_utils.expression_is_true:
          expression: "total_amount >= 0"
    columns:
      - name: order_id
        description: Primary key
        tests:
          - unique
          - not_null
      - name: customer_id
        description: Foreign key to dim_customers
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id
      - name: order_status
        tests:
          - accepted_values:
              values: ["pending", "processing", "shipped", "delivered", "cancelled"]
      - name: total_amount
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: ">= 0"
      - name: created_at
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: "<= current_timestamp"

  - name: dim_customers
    columns:
      - name: customer_id
        tests:
          - unique
          - not_null
      - name: email
        tests:
          - unique
          - not_null
          # Custom regex test
          - dbt_utils.expression_is_true:
              expression: "email ~ '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}$'"
```

Pattern 4: Custom dbt Tests

```sql
-- tests/generic/test_row_count_in_range.sql
{% test row_count_in_range(model, min_count, max_count) %}

with row_count as (
    select count(*) as cnt
    from {{ model }}
)

select cnt
from row_count
where cnt < {{ min_count }}
   or cnt > {{ max_count }}

{% endtest %}

-- Usage in schema.yml:
-- tests:
--   - row_count_in_range:
--       min_count: 1000
--       max_count: 10000000
```
```sql
-- tests/generic/test_sequential_values.sql
{% test sequential_values(model, column_name, interval=1) %}

with lagged as (
    select
        {{ column_name }},
        lag({{ column_name }}) over (order by {{ column_name }}) as prev_value
    from {{ model }}
)

select *
from lagged
where {{ column_name }} - prev_value != {{ interval }}
  and prev_value is not null

{% endtest %}
```
```sql
-- tests/singular/assert_orders_customers_match.sql
-- Singular test: specific business rule
with orders_customers as (
    select distinct customer_id
    from {{ ref('fct_orders') }}
),

dim_customers as (
    select customer_id
    from {{ ref('dim_customers') }}
),

orphaned_orders as (
    select o.customer_id
    from orders_customers o
    left join dim_customers c using (customer_id)
    where c.customer_id is null
)

select * from orphaned_orders
-- Test passes if this returns 0 rows
```

Pattern 5: Data Contracts

```yaml
# contracts/orders_contract.yaml
apiVersion: datacontract.com/v1.0.0
kind: DataContract
metadata:
  name: orders
  version: 1.0.0
  owner: data-platform-team
  contact: data-[email protected]
info:
  title: Orders Data Contract
  description: Contract for order event data from the ecommerce platform
  purpose: Analytics, reporting, and ML features
servers:
  production:
    type: snowflake
    account: company.us-east-1
    database: ANALYTICS
    schema: CORE
terms:
  usage: Internal analytics only
  limitations: PII must not be exposed in downstream marts
  billing: Charged per query TB scanned
schema:
  type: object
  properties:
    order_id:
      type: string
      format: uuid
      description: Unique order identifier
      required: true
      unique: true
      pii: false
    customer_id:
      type: string
      format: uuid
      description: Customer identifier
      required: true
      pii: true
      piiClassification: indirect
    total_amount:
      type: number
      minimum: 0
      maximum: 100000
      description: Order total in USD
    created_at:
      type: string
      format: date-time
      description: Order creation timestamp
      required: true
    status:
      type: string
      enum: [pending, processing, shipped, delivered, cancelled]
      description: Current order status
quality:
  type: SodaCL
  specification:
    checks for orders:
      - row_count > 0
      - missing_count(order_id) = 0
      - duplicate_count(order_id) = 0
      - invalid_count(status) = 0:
          valid values: [pending, processing, shipped, delivered, cancelled]
      - freshness(created_at) < 24h
sla:
  availability: 99.9%
  freshness: 1 hour
  latency: 5 minutes
```
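A contract is only useful if something enforces it. As a rough sketch, the required/range/enum constraints from the contract can be checked against incoming records with stdlib Python (here the `CONSTRAINTS` dict mirrors the YAML by hand; a real setup would parse the contract file and cover every field):

```python
# Hand-copied subset of the contract's constraints (illustrative only)
CONSTRAINTS = {
    "order_id": {"required": True},
    "total_amount": {"minimum": 0, "maximum": 100000},
    "status": {"enum": {"pending", "processing", "shipped", "delivered", "cancelled"}},
}

def violations(record, constraints=CONSTRAINTS):
    """Return a list of human-readable contract violations for one record."""
    errors = []
    for field, rules in constraints.items():
        value = record.get(field)
        if rules.get("required") and value is None:
            errors.append(f"{field}: required field is missing")
            continue
        if value is None:
            continue  # optional field, absent
        if "minimum" in rules and value < rules["minimum"]:
            errors.append(f"{field}: {value} below minimum {rules['minimum']}")
        if "maximum" in rules and value > rules["maximum"]:
            errors.append(f"{field}: {value} above maximum {rules['maximum']}")
        if "enum" in rules and value not in rules["enum"]:
            errors.append(f"{field}: {value!r} not in allowed values")
    return errors

print(violations({"order_id": "a1", "total_amount": 42.0, "status": "shipped"}))  # []
print(violations({"total_amount": -5, "status": "lost"}))
```

In practice this enforcement usually lives in the producer's CI (reject schema changes that break the contract) and in the consumer's pipeline (quarantine violating rows), with SodaCL or Great Expectations doing the checking at scale.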

Pattern 6: Automated Quality Pipeline

```python
# quality_pipeline.py
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List

import great_expectations as gx


@dataclass
class QualityResult:
    table: str
    passed: bool
    total_expectations: int
    failed_expectations: int
    details: List[Dict[str, Any]]
    timestamp: datetime


class DataQualityPipeline:
    """Orchestrate data quality checks across tables"""

    def __init__(self, context: gx.DataContext):
        self.context = context
        self.results: List[QualityResult] = []

    def validate_table(self, table: str, suite: str) -> QualityResult:
        """Validate a single table against expectation suite"""
        checkpoint_config = {
            "name": f"{table}_validation",
            "config_version": 1.0,
            "class_name": "Checkpoint",
            "validations": [{
                "batch_request": {
                    "datasource_name": "warehouse",
                    "data_asset_name": table,
                },
                "expectation_suite_name": suite,
            }],
        }
        result = self.context.run_checkpoint(**checkpoint_config)

        # Parse results
        validation_result = list(result.run_results.values())[0]
        results = validation_result.results
        failed = [r for r in results if not r.success]

        return QualityResult(
            table=table,
            passed=result.success,
            total_expectations=len(results),
            failed_expectations=len(failed),
            details=[{
                "expectation": r.expectation_config.expectation_type,
                "success": r.success,
                "observed_value": r.result.get("observed_value"),
            } for r in results],
            timestamp=datetime.now(),
        )

    def run_all(self, tables: Dict[str, str]) -> Dict[str, QualityResult]:
        """Run validation for all tables"""
        results = {}
        for table, suite in tables.items():
            print(f"Validating {table}...")
            results[table] = self.validate_table(table, suite)
        return results

    def generate_report(self, results: Dict[str, QualityResult]) -> str:
        """Generate quality report"""
        report = ["# Data Quality Report", f"Generated: {datetime.now()}", ""]

        total_passed = sum(1 for r in results.values() if r.passed)
        total_tables = len(results)
        report.append(f"## Summary: {total_passed}/{total_tables} tables passed")
        report.append("")

        for table, result in results.items():
            status = "βœ…" if result.passed else "❌"
            report.append(f"### {status} {table}")
            report.append(f"- Expectations: {result.total_expectations}")
            report.append(f"- Failed: {result.failed_expectations}")
            if not result.passed:
                report.append("- Failed checks:")
                for detail in result.details:
                    if not detail["success"]:
                        report.append(f"  - {detail['expectation']}: {detail['observed_value']}")
            report.append("")

        return "\n".join(report)


# Usage
context = gx.get_context()
pipeline = DataQualityPipeline(context)

tables_to_validate = {
    "orders": "orders_suite",
    "customers": "customers_suite",
    "products": "products_suite",
}

results = pipeline.run_all(tables_to_validate)
report = pipeline.generate_report(results)

# Fail pipeline if any table failed
if not all(r.passed for r in results.values()):
    print(report)
    raise ValueError("Data quality checks failed!")
```

Best Practices

Do's

  • Test early - Validate source data before transformations
  • Test incrementally - Add tests as you find issues
  • Document expectations - Clear descriptions for each test
  • Alert on failures - Integrate with monitoring
  • Version contracts - Track schema changes

Don'ts

  • Don't test everything - Focus on critical columns
  • Don't ignore warnings - They often precede failures
  • Don't skip freshness - Stale data is bad data
  • Don't hardcode thresholds - Use dynamic baselines
  • Don't test in isolation - Test relationships too
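The "dynamic baselines" point can be made concrete: derive thresholds from trailing observations instead of hardcoding them. A minimal sketch using a mean ± 3σ rule over recent daily row counts (function names and numbers are illustrative):

```python
from statistics import mean, stdev

def dynamic_bounds(history, k=3.0):
    """Derive (low, high) thresholds from trailing observations: mean ± k*std."""
    mu = mean(history)
    sigma = stdev(history) if len(history) > 1 else 0.0
    return mu - k * sigma, mu + k * sigma

# Hypothetical trailing week of daily row counts
daily_row_counts = [10120, 9980, 10340, 10050, 9900, 10210, 10100]
low, high = dynamic_bounds(daily_row_counts)

def check_row_count(todays_count):
    """Pass if today's volume is within the learned band."""
    return low <= todays_count <= high

print(check_row_count(10150))
print(check_row_count(4000))   # sudden drop falls outside the band
```

The same idea feeds the `min_value`/`max_value` kwargs of `expect_table_row_count_to_be_between` via evaluation parameters, so the expectation tightens or loosens as the data evolves.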

Resources

GitHub Repository
wshobson/agents
Stars
28,275
Forks
3,111