Real World
ETL Pipeline
In-memory CSV Extract → Transform → Load pipeline with JSON artifact output.
When to use
Use this as a template for ETL workflows. It demonstrates the full extract-transform-load pattern with artifact persistence, using only `dagy` and the Python standard library.
real-world/etl-pipeline.py
"""
01_etl_pipeline.py: In-memory CSV ETL pipeline.

Demonstrates:
- A realistic Extract → Transform → Load pattern using stdlib only
- Producing a JSON artifact at the end
- No external dependencies (csv, io, json from stdlib)
"""
import csv
import io
import json

from dagy import LocalArtifact, flow, task

# Inline CSV source (simulates reading from a file or S3 object)
_CSV_DATA = """\
id,product,quantity,unit_price
1,Widget A,10,4.99
2,Widget B,3,14.50
3,Gadget X,7,29.99
4,Widget A,2,4.99
5,Gadget Y,1,99.00
"""


@task
def extract() -> list:
    """Read CSV rows into a list of dicts."""
    reader = csv.DictReader(io.StringIO(_CSV_DATA.strip()))
    return [
        {
            "id": int(row["id"]),
            "product": row["product"],
            "quantity": int(row["quantity"]),
            "unit_price": float(row["unit_price"]),
        }
        for row in reader
    ]


@task
def transform(rows: list) -> list:
    """Add a 'total' column and filter low-value rows (total < 20)."""
    enriched = []
    for row in rows:
        total = row["quantity"] * row["unit_price"]
        if total >= 20:
            enriched.append({**row, "total": round(total, 2)})
    return enriched


@task
def aggregate(rows: list) -> dict:
    """Summarise: group by product, sum totals."""
    summary: dict[str, float] = {}
    for row in rows:
        summary[row["product"]] = round(
            summary.get(row["product"], 0.0) + row["total"], 2
        )
    return {
        "by_product": summary,
        "grand_total": round(sum(summary.values()), 2),
        "row_count": len(rows),
    }


@task
def load(summary: dict) -> LocalArtifact:
    """Persist the aggregated result as a JSON artifact."""
    return LocalArtifact(
        type="json",
        value=json.dumps(summary, indent=2),
        filename="etl_output.json",
    )


@flow(name="etl_pipeline")
def etl_pipeline() -> None:
    raw = extract()
    filtered = transform(raw)
    summary = aggregate(filtered)
    load(summary)


if __name__ == "__main__":
    result = etl_pipeline.run_local()
    print(f"Run ID : {result.run_id}")
    print(f"Status : {result.status}")

How it works
- `extract` parses inline CSV data into a list of typed dicts.
- `transform` adds a `total` column (quantity × unit_price) and drops rows whose total is below 20.
- `aggregate` groups by product and sums totals.
- `load` persists the aggregated summary as a JSON artifact.
- No dependencies beyond `dagy`; `csv`, `io`, and `json` are all stdlib.
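To check the numbers without running a flow, the same steps can be sketched as plain functions. This is a minimal sketch, not the example itself: it drops the `@task` and `@flow` decorators, and the names `extract_rows`, `transform_rows`, `aggregate_rows`, and the `min_total` parameter are illustrative and do not come from `dagy` or the original listing.

```python
import csv
import io

# Same sample data as the inline CSV source in the example.
CSV_DATA = """\
id,product,quantity,unit_price
1,Widget A,10,4.99
2,Widget B,3,14.50
3,Gadget X,7,29.99
4,Widget A,2,4.99
5,Gadget Y,1,99.00
"""


def extract_rows(text: str) -> list:
    """Parse CSV text into a list of typed dicts."""
    return [
        {
            "id": int(row["id"]),
            "product": row["product"],
            "quantity": int(row["quantity"]),
            "unit_price": float(row["unit_price"]),
        }
        for row in csv.DictReader(io.StringIO(text.strip()))
    ]


def transform_rows(rows: list, min_total: float = 20) -> list:
    """Add a 'total' key and keep rows whose total is at least min_total."""
    enriched = []
    for row in rows:
        total = row["quantity"] * row["unit_price"]
        if total >= min_total:
            enriched.append({**row, "total": round(total, 2)})
    return enriched


def aggregate_rows(rows: list) -> dict:
    """Group by product and sum the per-row totals."""
    by_product = {}
    for row in rows:
        by_product[row["product"]] = round(
            by_product.get(row["product"], 0.0) + row["total"], 2
        )
    return {
        "by_product": by_product,
        "grand_total": round(sum(by_product.values()), 2),
        "row_count": len(rows),
    }


summary = aggregate_rows(transform_rows(extract_rows(CSV_DATA)))
print(summary)
```

With the sample data, row 4 (2 × 4.99 = 9.98) falls below the threshold and is dropped, so four rows remain and `Widget A` contributes only the 49.9 from row 1.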
Inputs
- None; CSV data is embedded in the source.
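The inline string stands in for a real source. One way the extract step might read from a file instead is sketched below, using only the `csv` module. The helper name `read_rows` and its `path` parameter are hypothetical, and wrapping it in `@task` is left out because how `dagy` passes arguments to tasks is not covered here.

```python
import csv
from pathlib import Path


def read_rows(path: Path) -> list:
    """Hypothetical helper: parse a CSV file with the example's four columns."""
    # newline="" is the setting the csv module documentation recommends
    # when handing a file object to a reader.
    with path.open(newline="", encoding="utf-8") as handle:
        return [
            {
                "id": int(row["id"]),
                "product": row["product"],
                "quantity": int(row["quantity"]),
                "unit_price": float(row["unit_price"]),
            }
            for row in csv.DictReader(handle)
        ]
```

The returned rows have the same shape as those produced by `extract`, so the downstream steps would not need to change.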
Outputs
- `etl_output.json` artifact with per-product totals, the grand total, and the row count.
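A quick consistency check on the JSON payload could look like the sketch below. It takes the JSON text directly, since how to read a `LocalArtifact` back is not shown here. The function name `check_summary` is hypothetical, and the sample values are worked out by hand from the inline CSV rather than copied from a real run.

```python
import json


def check_summary(payload: str) -> dict:
    """Hypothetical check: parse the JSON text and verify the totals agree."""
    data = json.loads(payload)
    expected = round(sum(data["by_product"].values()), 2)
    # Compare with a small tolerance instead of exact float equality.
    if abs(data["grand_total"] - expected) >= 0.005:
        raise ValueError(
            f"grand_total {data['grand_total']} does not match sum {expected}"
        )
    return data


# Payload shaped like the one built in load(), with hand-computed values.
sample = json.dumps(
    {
        "by_product": {
            "Widget A": 49.9,
            "Widget B": 43.5,
            "Gadget X": 209.93,
            "Gadget Y": 99.0,
        },
        "grand_total": 402.33,
        "row_count": 4,
    },
    indent=2,
)
checked = check_summary(sample)
print(sorted(checked))
```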
Dependencies
`dagy` (provides `LocalArtifact`, `flow`, and `task`), `csv` (stdlib), `io` (stdlib), `json` (stdlib)