Overview
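This proof of concept (POC) implements a complete ETL pipeline with Snowpark DataFrames — no pandas, and the pipeline's data never leaves Snowflake. All computation runs inside the platform through lazy evaluation. The DataFrame operations only build a query plan, and that plan executes in Snowflake when an action such as `collect()` or `merge()` is called.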
Pipeline Stages
Extract
def extract(session: Session, source_schema: str) -> DataFrame:
    """Return the unprocessed rows of the ``<source_schema>.raw_events`` table.

    Only the columns needed downstream are kept. The returned DataFrame is
    lazy: nothing is queried until an action is invoked on it.

    Args:
        session: Active Snowpark session used to resolve the table.
        source_schema: Schema that contains the ``raw_events`` staging table.

    Returns:
        DataFrame with columns event_id, user_id, event_type, payload and
        created_at, limited to rows where ``processed_flag`` equals FALSE.
    """
    raw_events = session.table(f"{source_schema}.raw_events")
    # `== False` builds a SQL comparison (a Snowpark Column), so `not` / `is`
    # cannot be used here. NOTE(review): a NULL flag compares as NULL, so such
    # rows are excluded as well — confirm that is intended.
    unprocessed = raw_events.filter(col("processed_flag") == False)  # noqa: E712
    wanted_columns = (
        "event_id",
        "user_id",
        "event_type",
        "payload",
        "created_at",
    )
    return unprocessed.select(*wanted_columns)
Transform
from snowflake.snowpark.functions import parse_json, col, to_timestamp
def transform(df: DataFrame) -> DataFrame:
    """Parse the JSON ``payload``, cast types, and drop incomplete rows.

    Adds these columns:
    - ``event_ts``: ``created_at`` passed through ``to_timestamp``.
    - ``action``: the payload's ``action`` field, cast to string.
    - ``value``: the payload's ``value`` field, cast to float.

    It then removes ``payload`` and ``created_at``. Finally it discards every
    row that has a NULL in any remaining column. That includes rows whose
    payload was NULL or lacked ``action``/``value``.

    Args:
        df: Rows shaped like the output of ``extract()`` (event_id, user_id,
            event_type, payload, created_at).

    Returns:
        Lazy DataFrame with columns event_id, user_id, event_type, event_ts,
        action and value.
    """
    return (
        df
        # Parse the payload once into a helper column instead of emitting a
        # separate PARSE_JSON for every field extracted below.
        .with_column("payload_json", parse_json(col("payload")))
        .with_column("event_ts", to_timestamp(col("created_at")))
        .with_column("action", col("payload_json")["action"].cast("string"))
        .with_column("value", col("payload_json")["value"].cast("float"))
        .drop("payload", "created_at", "payload_json")
        # Drop any row that still has a NULL in any column.
        .na.drop(how="any")
    )
Load
from snowflake.snowpark import MergeResult
def load(session: Session, df: DataFrame, target: str) -> MergeResult:
    """Upsert ``df`` into the ``target`` table with a MERGE keyed on event_id.

    For target rows whose ``event_id`` matches a source row, ``action`` and
    ``value`` are overwritten. Source rows with no match are inserted with
    event_id, user_id, action, value and event_ts.

    Args:
        session: Active Snowpark session used to resolve ``target``.
        df: Transformed events, e.g. the output of ``transform()``.
        target: Target table name, e.g. ``CURATED.EVENTS``.

    Returns:
        The ``MergeResult`` returned by ``Table.merge``.
    """
    # The MERGE clause builders live in snowpark.functions; import them here
    # so this function does not depend on module-level imports it never had.
    from snowflake.snowpark.functions import when_matched, when_not_matched

    target_table = session.table(target)
    # NOTE(review): by default Snowflake errors when several source rows match
    # the same target row (nondeterministic MERGE). Presumably event_id is
    # unique in ``df`` — confirm, or deduplicate upstream.
    return target_table.merge(
        df,
        target_table["event_id"] == df["event_id"],
        [
            when_matched().update({"action": df["action"], "value": df["value"]}),
            when_not_matched().insert({
                "event_id": df["event_id"],
                "user_id": df["user_id"],
                "action": df["action"],
                "value": df["value"],
                "event_ts": df["event_ts"],
            }),
        ],
    )
Running the Pipeline
# Run the pipeline end to end. Presumably --source-schema is passed to
# extract() and --target-table to load(); the CLI entry point is not shown
# here — confirm.
python pipeline.py --source-schema RAW --target-table CURATED.EVENTS
Testing
# test_transform.py
from pipeline import transform
def test_transform_drops_nulls(session):
    """transform() keeps the complete row and drops the one with a NULL payload."""
    test_data = session.create_dataframe(
        [
            ("e1", "u1", "click", '{"action":"buy","value":9.99}', "2026-01-01"),
            ("e2", "u2", "view", None, "2026-01-01"),  # null payload -> dropped
        ],
        schema=["event_id", "user_id", "event_type", "payload", "created_at"],
    )
    result = transform(test_data).collect()
    assert len(result) == 1
    # Snowflake upper-cases unquoted identifiers (ACTION, EVENT_ID, ...), and
    # Snowpark Row name lookup is case-sensitive. Normalise the keys so the
    # assertions do not depend on identifier casing.
    row = {name.lower(): value for name, value in result[0].as_dict().items()}
    assert row["event_id"] == "e1"
    assert row["action"] == "buy"