← Back to POCs
Complete
Snowflake · Snowpark · Python · ETL

End-to-End ETL Pipeline with Snowpark

May 20, 2026 · View on GitHub

Overview

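This POC implements a full ETL pipeline using Snowpark DataFrames — no pandas, and no data leaving Snowflake. All computation runs inside Snowflake. Snowpark DataFrames are lazily evaluated: each stage only builds up a query plan, and nothing executes until an action such as `collect()` or `merge()` is called.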

Pipeline Stages

Extract

def extract(session: Session, source_schema: str) -> DataFrame:
    """Return the not-yet-processed rows of ``<source_schema>.raw_events``.

    The result is a lazy Snowpark DataFrame; no query is issued until an
    action consumes it.

    Args:
        session: Active Snowpark session used to resolve the staging table.
        source_schema: Schema that holds the ``raw_events`` table.

    Returns:
        A DataFrame with columns event_id, user_id, event_type, payload and
        created_at, restricted to rows whose processed_flag is false.
    """
    raw_events = session.table(f"{source_schema}.raw_events")
    # `is False` cannot be overloaded, so `==` is required to build a SQL
    # predicate here. NOTE(review): under SQL three-valued logic, rows whose
    # processed_flag is NULL are excluded as well — confirm the column is
    # NOT NULL (or defaults to FALSE) so new rows are not silently skipped.
    unprocessed = raw_events.filter(col("processed_flag") == False)  # noqa: E712
    return unprocessed.select(
        "event_id", "user_id", "event_type", "payload", "created_at"
    )

Transform

from snowflake.snowpark.functions import parse_json, col, to_timestamp

def transform(df: DataFrame) -> DataFrame:
    """Flatten the JSON payload into typed columns and discard incomplete rows.

    Adds ``event_ts`` (``created_at`` passed through ``to_timestamp``),
    ``action`` (payload field ``action`` cast to string) and ``value``
    (payload field ``value`` cast to float). It then removes the raw
    ``payload`` and ``created_at`` columns and drops every row that still
    contains a null in any column.

    Args:
        df: Events DataFrame with at least ``payload`` (JSON text) and
            ``created_at`` columns.

    Returns:
        The transformed (lazy) DataFrame.
    """
    # Build the parsed-payload expression once and index into it twice.
    payload_json = parse_json(col("payload"))
    typed = (
        df.with_column("event_ts", to_timestamp(col("created_at")))
        .with_column("action", payload_json["action"].cast("string"))
        .with_column("value", payload_json["value"].cast("float"))
    )
    # Rows with a NULL payload end up with NULL action/value and are removed
    # by the null-drop below, along with rows null in any other column.
    return typed.drop("payload", "created_at").na.drop()

Load

from snowflake.snowpark import DataFrame, MergeResult, Session
from snowflake.snowpark.functions import when_matched, when_not_matched

def load(session: Session, df: DataFrame, target: str) -> MergeResult:
    """Upsert transformed events into ``target`` with a single MERGE.

    Rows are matched on ``event_id``. A match overwrites only ``action`` and
    ``value``. An unmatched source row is inserted with event_id, user_id,
    action, value and event_ts.

    Args:
        session: Active Snowpark session used to resolve ``target``.
        df: Transformed events (the output of ``transform``).
        target: Target table name, e.g. ``CURATED.EVENTS``.

    Returns:
        Snowpark's ``MergeResult`` for the executed MERGE.
    """
    target_table = session.table(target)
    # NOTE(review): Snowflake MERGE expects at most one source row per target
    # row. Duplicate event_ids in ``df`` can make the statement fail or behave
    # nondeterministically — confirm raw_events guarantees unique event_ids.
    return target_table.merge(
        df,
        target_table["event_id"] == df["event_id"],
        [
            # Existing events: refresh only the payload-derived fields.
            when_matched().update({"action": df["action"], "value": df["value"]}),
            # New events: insert the full curated row.
            when_not_matched().insert({
                "event_id": df["event_id"],
                "user_id":  df["user_id"],
                "action":   df["action"],
                "value":    df["value"],
                "event_ts": df["event_ts"],
            }),
        ],
    )

Running the Pipeline

python pipeline.py --source-schema RAW --target-table CURATED.EVENTS

Testing

# test_transform.py
from pipeline import transform


def test_transform_drops_nulls(session):
    """transform() keeps the complete event and drops the one with a NULL payload.

    NOTE(review): ``session`` is assumed to be a pytest fixture yielding a
    Snowpark Session (e.g. defined in conftest.py); it is not shown here.
    """
    test_data = session.create_dataframe(
        [
            ("e1", "u1", "click", '{"action":"buy","value":9.99}', "2026-01-01"),
            ("e2", "u2", "view", None, "2026-01-01"),  # null payload → dropped
        ],
        schema=["event_id", "user_id", "event_type", "payload", "created_at"],
    )

    result = transform(test_data).collect()

    assert len(result) == 1
    row = result[0]
    # Snowflake stores unquoted identifiers upper-case, so collected Row fields
    # are named EVENT_ID, ACTION, VALUE, ... — lower-case keys raise KeyError.
    assert row["EVENT_ID"] == "e1"
    assert row["ACTION"] == "buy"
    assert abs(row["VALUE"] - 9.99) < 1e-9