Overview
A zero-infrastructure data quality framework that runs entirely inside Snowflake (SQL plus a Snowpark Python stored procedure). Rules are stored in a configuration table, and a scheduled Task runs the checks whenever a Stream reports new data.
Architecture
raw table
↓ (Stream)
quality_check_task (Task, every 5 min)
↓
dq.rules (config table) → dq.run_checks (stored procedure)
↓
dq.results (results table)
↓
dq.dashboard (secure view for BI tools)
Rule Configuration Table
-- One row per data-quality rule; dq.run_checks reads the active rows for a given table.
-- IF NOT EXISTS keeps the script re-runnable without dropping configured rules.
CREATE TABLE IF NOT EXISTS dq.rules (
    rule_id     INT AUTOINCREMENT PRIMARY KEY,  -- Snowflake does not enforce PRIMARY KEY; informational
    table_name  VARCHAR NOT NULL,
    column_name VARCHAR NOT NULL,
    rule_type   VARCHAR NOT NULL,               -- 'not_null', 'unique', 'range', 'regex', 'referential'
    rule_param  VARIANT,                        -- JSON object, e.g. {"min": 0, "max": 100}; NULL when unused
    severity    VARCHAR NOT NULL,               -- 'critical', 'warning'
    -- NOT NULL so a rule can never be silently skipped by a NULL flag.
    is_active   BOOLEAN DEFAULT TRUE NOT NULL
);
-- Example rules.
-- rule_param must be stored as a JSON object, so each parameter string goes through
-- PARSE_JSON. VALUES rows cannot call PARSE_JSON, so the rows are selected FROM VALUES.
INSERT INTO dq.rules (table_name, column_name, rule_type, rule_param, severity)
SELECT
    column1,
    column2,
    column3,
    PARSE_JSON(column4),
    column5
FROM VALUES
    ('orders', 'order_id',    'not_null',    NULL,                                       'critical'),
    ('orders', 'amount',      'range',       '{"min":0,"max":99999}',                    'critical'),
    -- [.] matches a literal dot without a backslash; "\." is an invalid JSON escape.
    ('orders', 'email',       'regex',       '{"pattern":".*@.*[.].*"}',                 'warning'),
    ('orders', 'customer_id', 'referential', '{"ref_table":"customers","ref_col":"id"}', 'critical');
Automated Check Runner
-- Runs every active rule in dq.rules for one table and appends one row per rule
-- (TABLE_NAME, COLUMN_NAME, RULE_TYPE, FAIL_COUNT, SEVERITY, CHECKED_AT) to dq.results.
-- Returns a short summary string. Raises on an unknown rule_type, missing rule params,
-- or a table/column name that is not a plain identifier.
CREATE OR REPLACE PROCEDURE dq.run_checks(p_table VARCHAR)
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = '3.11'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'run_checks'
AS $$
import json
import re

from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, current_timestamp

# Table/column names cannot be bound as parameters, so they are spliced into SQL text.
# Accept only plain unquoted identifiers (tables may be db.schema.table qualified).
_IDENT = r"[A-Za-z_][A-Za-z0-9_$]*"
_COLUMN_RE = re.compile(rf"^{_IDENT}$")
_TABLE_RE = re.compile(rf"^{_IDENT}(\.{_IDENT}){{0,2}}$")


def _identifier(name, pattern, what):
    """Return name unchanged if it fully matches pattern, else raise ValueError."""
    if not isinstance(name, str) or not pattern.match(name):
        raise ValueError(f"Invalid {what} in dq.rules: {name!r}")
    return name


def build_check_sql(table_name, column_name, rule_type, params):
    """Build a query whose single value is the number of rows violating one rule.

    Returns (sql, binds), where binds are qmark values for Session.sql(params=...).
    Raises ValueError for an unknown rule_type, missing params, or a bad identifier.
    """
    tbl = _identifier(table_name, _TABLE_RE, "table name")
    c = _identifier(column_name, _COLUMN_RE, "column name")

    if rule_type == "not_null":
        return f"SELECT COUNT(*) FROM {tbl} WHERE {c} IS NULL", []

    if rule_type == "unique":
        # Surplus rows beyond the first occurrence of each non-NULL value.
        return f"SELECT COUNT({c}) - COUNT(DISTINCT {c}) FROM {tbl}", []

    if rule_type == "range":
        conds, binds = [], []
        if params.get("min") is not None:
            conds.append(f"{c} < ?")
            binds.append(params["min"])
        if params.get("max") is not None:
            conds.append(f"{c} > ?")
            binds.append(params["max"])
        if not conds:
            raise ValueError("range rule needs 'min' and/or 'max' in rule_param")
        return f"SELECT COUNT(*) FROM {tbl} WHERE {' OR '.join(conds)}", binds

    if rule_type == "regex":
        pattern = params.get("pattern")
        if not pattern:
            raise ValueError("regex rule needs 'pattern' in rule_param")
        # REGEXP_LIKE anchors the pattern at both ends; NULLs are left to not_null rules.
        return (
            f"SELECT COUNT(*) FROM {tbl} WHERE {c} IS NOT NULL AND NOT REGEXP_LIKE({c}, ?)",
            [pattern],
        )

    if rule_type == "referential":
        ref_tbl = _identifier(params.get("ref_table"), _TABLE_RE, "ref_table")
        ref_col = _identifier(params.get("ref_col"), _COLUMN_RE, "ref_col")
        return (
            f"SELECT COUNT(*) FROM {tbl} AS child WHERE child.{c} IS NOT NULL "
            f"AND NOT EXISTS (SELECT 1 FROM {ref_tbl} AS parent "
            f"WHERE parent.{ref_col} = child.{c})",
            [],
        )

    raise ValueError(f"Unknown rule_type in dq.rules: {rule_type!r}")


def run_checks(session: Session, p_table: str) -> str:
    # p_table is compared as a value through the DataFrame API, never spliced into SQL.
    rules = (
        session.table("dq.rules")
        .filter((col("TABLE_NAME") == p_table) & col("IS_ACTIVE"))
        .collect()
    )

    results = []
    for rule in rules:
        # VARIANT values come back from collect() as JSON text.
        raw_params = rule["RULE_PARAM"]
        params = json.loads(raw_params) if raw_params else {}
        if not isinstance(params, dict):
            # A JSON null/scalar rule_param carries no named parameters.
            params = {}
        sql, binds = build_check_sql(
            rule["TABLE_NAME"], rule["COLUMN_NAME"], rule["RULE_TYPE"], params
        )
        fail_count = session.sql(sql, params=binds or None).collect()[0][0]
        results.append([
            rule["TABLE_NAME"],
            rule["COLUMN_NAME"],
            rule["RULE_TYPE"],
            fail_count,
            rule["SEVERITY"],
        ])

    # create_dataframe cannot infer a schema from an empty list, so skip the write.
    if results:
        (
            session.create_dataframe(
                results,
                schema=["TABLE_NAME", "COLUMN_NAME", "RULE_TYPE", "FAIL_COUNT", "SEVERITY"],
            )
            # Server-side timestamp, so dq.dashboard can compare it with CURRENT_TIMESTAMP().
            .with_column("CHECKED_AT", current_timestamp())
            .write.mode("append")
            .save_as_table("dq.results")
        )
    return f"Checked {len(rules)} rules on {p_table}."
$$;
Dashboard View
-- Seven-day rollup of dq.results, one row per (table, column, rule, severity), for BI tools.
CREATE OR REPLACE SECURE VIEW dq.dashboard AS
SELECT
    table_name,
    column_name,
    rule_type,
    severity,
    SUM(fail_count) AS total_failures,
    MAX(checked_at) AS last_checked,
    -- Percentage of check runs in the window that found at least one failing row.
    -- (SUM(fail_count) / COUNT(*) is failing rows per run, which is not a percentage.)
    ROUND(100.0 * COUNT_IF(fail_count > 0) / COUNT(*), 2) AS failure_rate_pct
FROM dq.results
WHERE checked_at >= DATEADD('day', -7, CURRENT_TIMESTAMP())
GROUP BY
    table_name,
    column_name,
    rule_type,
    severity
ORDER BY
    -- Explicit severity rank instead of relying on alphabetical order.
    CASE severity WHEN 'critical' THEN 0 WHEN 'warning' THEN 1 ELSE 2 END,
    total_failures DESC;
Connect this view to Tableau, Sigma, or any BI tool for a live DQ dashboard.