Real World

ETL Pipeline

In-memory CSV Extract → Transform → Load pipeline with JSON artifact output.

When to use

Use this as a template for any ETL workflow. It demonstrates the full extract-transform-load pattern with artifact persistence, using only the Python standard library besides `dagy` itself.

real-world/etl-pipeline.py
"""
etl-pipeline.py: In-memory CSV ETL pipeline.

Demonstrates:
- A realistic Extract → Transform → Load pattern using stdlib only
- Producing a JSON artifact at the end
- No dependencies beyond dagy itself (csv, io and json are all stdlib)
"""

import csv
import io
import json

from dagy import LocalArtifact, flow, task

# Inline CSV source (simulates reading from a file or S3 object)
_CSV_DATA = """\
id,product,quantity,unit_price
1,Widget A,10,4.99
2,Widget B,3,14.50
3,Gadget X,7,29.99
4,Widget A,2,4.99
5,Gadget Y,1,99.00
"""


@task
def extract() -> list:
    """Read CSV rows into a list of dicts."""
    reader = csv.DictReader(io.StringIO(_CSV_DATA.strip()))
    return [
        {
            "id": int(row["id"]),
            "product": row["product"],
            "quantity": int(row["quantity"]),
            "unit_price": float(row["unit_price"]),
        }
        for row in reader
    ]


@task
def transform(rows: list) -> list:
    """Add a 'total' column and drop low-value rows (total < 20)."""
    enriched = []
    for row in rows:
        total = row["quantity"] * row["unit_price"]
        if total >= 20:
            enriched.append({**row, "total": round(total, 2)})
    return enriched


@task
def aggregate(rows: list) -> dict:
    """Summarise: group by product, sum totals."""
    summary: dict[str, float] = {}
    for row in rows:
        summary[row["product"]] = round(
            summary.get(row["product"], 0.0) + row["total"], 2
        )
    return {
        "by_product": summary,
        "grand_total": round(sum(summary.values()), 2),
        "row_count": len(rows),
    }


@task
def load(summary: dict) -> LocalArtifact:
    """Persist the aggregated result as a JSON artifact."""
    return LocalArtifact(
        type="json",
        value=json.dumps(summary, indent=2),
        filename="etl_output.json",
    )


@flow(name="etl_pipeline")
def etl_pipeline() -> None:
    raw = extract()
    filtered = transform(raw)
    summary = aggregate(filtered)
    load(summary)


if __name__ == "__main__":
    result = etl_pipeline.run_local()
    print(f"Run ID : {result.run_id}")
    print(f"Status : {result.status}")

How it works

  1. `extract` parses inline CSV data into a list of typed dicts.
  2. `transform` adds a `total` column (quantity × unit_price) and drops rows whose total is below 20.
  3. `aggregate` groups the remaining rows by product, sums their totals, and records the grand total and row count.
  4. `load` persists the aggregated summary as a JSON artifact.
  5. Apart from `dagy`, there are no external dependencies; `csv`, `io`, and `json` are all stdlib.
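Because each task body is ordinary Python, the extract, transform and aggregate logic can be checked without dagy at all. The sketch below copies the three bodies as plain functions (no `@task`, no flow; `CSV_DATA` is a local copy of `_CSV_DATA`) and chains them on the same data. It assumes nothing about how dagy schedules, caches, or passes results between tasks; it is only a convenient way to unit-test the logic in isolation.

```python
import csv
import io

# Local copy of the example's inline CSV.
CSV_DATA = """\
id,product,quantity,unit_price
1,Widget A,10,4.99
2,Widget B,3,14.50
3,Gadget X,7,29.99
4,Widget A,2,4.99
5,Gadget Y,1,99.00
"""


def extract(text: str) -> list[dict]:
    """Plain-function copy of the `extract` task: parse CSV text into typed dicts."""
    reader = csv.DictReader(io.StringIO(text.strip()))
    return [
        {
            "id": int(row["id"]),
            "product": row["product"],
            "quantity": int(row["quantity"]),
            "unit_price": float(row["unit_price"]),
        }
        for row in reader
    ]


def transform(rows: list[dict]) -> list[dict]:
    """Add 'total' and keep only rows whose total is at least 20."""
    kept = []
    for row in rows:
        total = row["quantity"] * row["unit_price"]
        if total >= 20:
            kept.append({**row, "total": round(total, 2)})
    return kept


def aggregate(rows: list[dict]) -> dict:
    """Sum totals per product, plus a grand total and row count."""
    by_product: dict[str, float] = {}
    for row in rows:
        running = by_product.get(row["product"], 0.0)
        by_product[row["product"]] = round(running + row["total"], 2)
    return {
        "by_product": by_product,
        "grand_total": round(sum(by_product.values()), 2),
        "row_count": len(rows),
    }


summary = aggregate(transform(extract(CSV_DATA)))
print(summary)
```

With the example data, row 4 (2 × 4.99 = 9.98) falls below the threshold, so `row_count` is 4 and the `Widget A` total comes from row 1 alone (49.9); the grand total is 402.33.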

Inputs

  • None; CSV data is embedded in the source.
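The inline string stands in for reading from a file or S3 object. To read a real local CSV instead, only the extract step needs to change. A minimal sketch, assuming a file with the same four columns; `extract_from_file` is a hypothetical name, not part of dagy, and S3 access (which needs a client library) is not shown.

```python
import csv
import tempfile
from pathlib import Path


def extract_from_file(path: Path) -> list[dict]:
    """Hypothetical file-based variant of `extract`: same columns, same typing."""
    # The csv module docs recommend newline="" when opening files for csv readers.
    with path.open(newline="", encoding="utf-8") as f:
        return [
            {
                "id": int(row["id"]),
                "product": row["product"],
                "quantity": int(row["quantity"]),
                "unit_price": float(row["unit_price"]),
            }
            for row in csv.DictReader(f)
        ]


# Demo: write a two-row sample to a temporary file and parse it back.
with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "orders.csv"
    sample.write_text(
        "id,product,quantity,unit_price\n1,Widget A,10,4.99\n5,Gadget Y,1,99.00\n",
        encoding="utf-8",
    )
    rows = extract_from_file(sample)

print(rows)
```

How a file path would be passed into the flow as a parameter depends on dagy's flow API, which this example does not cover, so that wiring is left out.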

Outputs

  • `etl_output.json` artifact with per-product totals, the grand total, and the row count.
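The artifact body is the string `json.dumps(summary, indent=2)`. The sketch below shows that this string round-trips to the same dict, using the summary the example data produces. It writes the file with `pathlib` into a temporary directory purely as a stdlib stand-in; where `LocalArtifact` actually stores `etl_output.json` is not specified here and is not assumed.

```python
import json
import tempfile
from pathlib import Path

# Summary the example data produces (row 4, 2 x 4.99 = 9.98, is filtered out).
summary = {
    "by_product": {"Widget A": 49.9, "Widget B": 43.5, "Gadget X": 209.93, "Gadget Y": 99.0},
    "grand_total": 402.33,
    "row_count": 4,
}

# Same serialisation the `load` task passes as LocalArtifact(value=...).
payload = json.dumps(summary, indent=2)

# Stdlib stand-in for persistence: write to a temp dir, then read it back.
with tempfile.TemporaryDirectory() as tmp:
    out_path = Path(tmp) / "etl_output.json"
    out_path.write_text(payload, encoding="utf-8")
    reloaded = json.loads(out_path.read_text(encoding="utf-8"))

print(payload)
```

Because `json` serialises floats with their shortest round-tripping representation, the reloaded values compare equal to the originals.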

Dependencies

  • `dagy` (`flow`, `task`, `LocalArtifact`), `csv` (stdlib), `io` (stdlib), `json` (stdlib)