Complete
Snowflake, SQL, Data Quality

Lightweight Data Quality Framework for Snowflake

April 10, 2026

Overview

A data quality framework that runs entirely inside Snowflake, with no external infrastructure to deploy. Rules are stored in a configuration table; a scheduled Task runs the checks whenever a Stream reports that new data has arrived.

Architecture

raw table
    ↓ (Stream)
quality_check_task (every 5 min)
    ↓
dq.rules (config table) → run checks
    ↓
dq.results (results table)
    ↓
dq.dashboard (secure view for BI tools)
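The Stream and Task in the diagram are not defined anywhere in this write-up. The following is a minimal sketch of the DDL they might use, rendered from Python so it can be passed to session.sql() one statement at a time. The object names (dq.orders_stream, dq.orders_quality_check_task) and the warehouse name dq_wh are hypothetical; only the 5-minute schedule and the call to dq.run_checks come from the text above.

```python
def render_stream_and_task_ddl(table: str, warehouse: str) -> list[str]:
    """Return DDL statements that wire a stream and a 5-minute task to dq.run_checks.

    The names are spliced into the SQL unescaped, so they should come from
    trusted configuration, never from user input.
    """
    stream = f"dq.{table}_stream"            # hypothetical naming convention
    task = f"dq.{table}_quality_check_task"  # hypothetical naming convention
    return [
        f"CREATE STREAM IF NOT EXISTS {stream} ON TABLE {table}",
        (
            f"CREATE OR REPLACE TASK {task}\n"
            f"  WAREHOUSE = {warehouse}\n"
            f"  SCHEDULE = '5 MINUTE'\n"
            # The task is skipped on any tick where the stream has no new rows.
            f"  WHEN SYSTEM$STREAM_HAS_DATA('{stream}')\n"
            f"AS\n"
            f"  CALL dq.run_checks('{table}')"
        ),
        # Tasks are created suspended and have to be resumed explicitly.
        f"ALTER TASK {task} RESUME",
    ]


statements = render_stream_and_task_ddl("orders", "dq_wh")
for statement in statements:
    print(statement + ";")
```

One caveat with this wiring: a stream's offset only advances when the stream is consumed by a DML statement. A procedure that reads the base table directly would leave the stream non-empty, so the WHEN condition would stay true on every tick after the first load unless the check run also consumes the stream.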

Rule Configuration Table

CREATE TABLE dq.rules (
  rule_id      INT AUTOINCREMENT,
  table_name   VARCHAR,
  column_name  VARCHAR,
  rule_type    VARCHAR,   -- 'not_null', 'unique', 'range', 'regex', 'referential'
  rule_param   VARIANT,   -- JSON params: {"min": 0, "max": 100}
  severity     VARCHAR,   -- 'critical', 'warning'
  is_active    BOOLEAN DEFAULT TRUE
);

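-- Example rules
-- A string literal inserted into a VARIANT column is stored as a string, not as
-- a JSON object, and PARSE_JSON is not allowed inside a VALUES list of an INSERT,
-- so the rows go through INSERT ... SELECT instead.
INSERT INTO dq.rules (table_name, column_name, rule_type, rule_param, severity)
SELECT $1, $2, $3, PARSE_JSON($4), $5
FROM VALUES
  ('orders', 'order_id',    'not_null',    NULL,                                       'critical'),
  ('orders', 'amount',      'range',       '{"min":0,"max":99999}',                    'critical'),
  -- [.] matches a literal dot without a backslash, which would otherwise need
  -- escaping once for the SQL string and again for JSON.
  ('orders', 'email',       'regex',       '{"pattern":".*@.*[.].*"}',                 'warning'),
  ('orders', 'customer_id', 'referential', '{"ref_table":"customers","ref_col":"id"}', 'critical');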
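Because rules are free-form rows, a typo in rule_type or a missing parameter would only surface when a check runs. A small validator can catch that before a rule is inserted. This is a sketch under assumptions: the required keys per rule type are inferred from the example rows above (the write-up does not specify them), and validate_rule is a hypothetical helper, not part of the framework.

```python
import json

# Required rule_param keys per rule type, inferred from the example rules.
REQUIRED_PARAMS = {
    "not_null": set(),
    "unique": set(),
    "range": {"min", "max"},
    "regex": {"pattern"},
    "referential": {"ref_table", "ref_col"},
}
SEVERITIES = {"critical", "warning"}


def validate_rule(rule_type, rule_param, severity):
    """Return a list of problems; an empty list means the rule looks well-formed.

    rule_param is the JSON text destined for the VARIANT column, or None.
    """
    problems = []
    if severity not in SEVERITIES:
        problems.append(f"unknown severity: {severity!r}")
    if rule_type not in REQUIRED_PARAMS:
        problems.append(f"unknown rule_type: {rule_type!r}")
        return problems
    params = json.loads(rule_param) if rule_param else {}
    missing = REQUIRED_PARAMS[rule_type] - set(params)
    if missing:
        problems.append(f"{rule_type} rule is missing params: {sorted(missing)}")
    return problems


print(validate_rule("range", '{"min":0,"max":99999}', "critical"))  # []
print(validate_rule("range", '{"min":0}', "critical"))
```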

Automated Check Runner

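CREATE OR REPLACE PROCEDURE dq.run_checks(p_table VARCHAR)
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = '3.11'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'run_checks'
AS $$
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, current_timestamp

# NOTE: build_check_sql(rule) is not defined in this listing. It has to be added
# to this handler body and must return a query whose single result cell is the
# number of rows that violate the rule.

def run_checks(session: Session, p_table: str) -> str:
    # Filter with column expressions rather than an f-string, so a quote in
    # p_table cannot change the meaning of the filter.
    rules = session.table("dq.rules").filter(
        (col("table_name") == p_table) & col("is_active")
    ).collect()

    results = []
    for rule in rules:
        sql = build_check_sql(rule)
        fail_count = session.sql(sql).collect()[0][0]
        # Keys match the column names that dq.dashboard selects from dq.results.
        results.append({
            "table_name": p_table,
            "column_name": rule["COLUMN_NAME"],
            "rule_type": rule["RULE_TYPE"],
            "fail_count": fail_count,
            "severity": rule["SEVERITY"],
        })

    # create_dataframe cannot infer a schema from an empty list, so skip the write.
    if results:
        (
            session.create_dataframe(results)
            # Evaluate CURRENT_TIMESTAMP() in Snowflake; the original stored the
            # text "CURRENT_TIMESTAMP()" instead of a timestamp.
            .with_column("checked_at", current_timestamp())
            .write.mode("append")
            .save_as_table("dq.results")
        )
    return f"Checked {len(rules)} rules on {p_table}."
$$;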
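The procedure calls build_check_sql, which the listing does not define. Below is a minimal sketch of what such a helper could look like, not the author's implementation. It assumes each rule is a mapping with upper-case keys (as in rule["COLUMN_NAME"] above), that RULE_PARAM arrives as JSON text, and that each generated query returns the number of failing rows in its first cell. The SQL chosen for each rule type is one reasonable interpretation of the five rule types listed in the configuration table.

```python
import json
import re

# Plain, optionally dotted identifiers only, since names are spliced into SQL.
_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_$]*(\.[A-Za-z_][A-Za-z0-9_$]*)*$")


def _ident(name):
    if not _IDENTIFIER.match(name):
        raise ValueError(f"unsafe identifier: {name!r}")
    return name


def build_check_sql(rule):
    """Return a query whose single result cell is the count of failing rows."""
    table = _ident(rule["TABLE_NAME"])
    column = _ident(rule["COLUMN_NAME"])
    raw = rule["RULE_PARAM"]
    params = json.loads(raw) if raw else {}
    rule_type = rule["RULE_TYPE"]

    if rule_type == "not_null":
        return f"SELECT COUNT(*) FROM {table} WHERE {column} IS NULL"
    if rule_type == "unique":
        # Every repeat of a value beyond its first occurrence counts as a failure.
        return f"SELECT COUNT({column}) - COUNT(DISTINCT {column}) FROM {table}"
    if rule_type == "range":
        # float() rejects anything that is not a number before it reaches the SQL.
        lo, hi = float(params["min"]), float(params["max"])
        return f"SELECT COUNT(*) FROM {table} WHERE {column} NOT BETWEEN {lo} AND {hi}"
    if rule_type == "regex":
        # Escape backslashes and single quotes for a SQL string literal.
        pattern = params["pattern"].replace("\\", "\\\\").replace("'", "''")
        return f"SELECT COUNT(*) FROM {table} WHERE NOT REGEXP_LIKE({column}, '{pattern}')"
    if rule_type == "referential":
        ref_table = _ident(params["ref_table"])
        ref_col = _ident(params["ref_col"])
        return (
            f"SELECT COUNT(*) FROM {table} t "
            f"WHERE t.{column} IS NOT NULL AND NOT EXISTS "
            f"(SELECT 1 FROM {ref_table} r WHERE r.{ref_col} = t.{column})"
        )
    raise ValueError(f"unsupported rule_type: {rule_type!r}")


example = {
    "TABLE_NAME": "orders",
    "COLUMN_NAME": "amount",
    "RULE_TYPE": "range",
    "RULE_PARAM": '{"min":0,"max":99999}',
}
print(build_check_sql(example))
# SELECT COUNT(*) FROM orders WHERE amount NOT BETWEEN 0.0 AND 99999.0
```

In this sketch NULL values are left to the not_null rule: the range, regex, and referential checks do not count them as failures.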

Dashboard View

CREATE SECURE VIEW dq.dashboard AS
SELECT
  table_name,
  column_name,
  rule_type,
  severity,
  SUM(fail_count)                                     AS total_failures,
  MAX(checked_at)                                     AS last_checked,
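  -- Share of check runs in the window that found at least one failing row.
  -- dq.results does not record how many rows each run scanned, so a per-row
  -- failure rate cannot be derived from it.
  ROUND(100.0 * COUNT_IF(fail_count > 0) / COUNT(*), 2) AS failure_rate_pct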
FROM dq.results
WHERE checked_at >= DATEADD('day', -7, CURRENT_TIMESTAMP())
GROUP BY 1, 2, 3, 4
ORDER BY severity, total_failures DESC;

Connect this view to Tableau, Sigma, or any BI tool for a live DQ dashboard.