Graph Patterns
Diamond
Classic diamond topology: ingest → [clean, enrich] → combine.
When to use
Use the diamond pattern when the same input needs two independent transformations whose results are then merged. It is one of the most common DAG shapes in real pipelines.
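To make the shape concrete, the sketch below describes the diamond as a plain dependency map and groups its nodes into execution waves. It uses only Python's standard-library `graphlib` (Python 3.9+), not dagy; the `diamond` dict and the `execution_waves` helper are illustrative names, not part of any library.

```python
from graphlib import TopologicalSorter

# Map each node to the set of nodes it depends on.
diamond = {
    "clean": {"ingest"},
    "enrich": {"ingest"},
    "combine": {"clean", "enrich"},
}


def execution_waves(graph: dict) -> list:
    """Group nodes into waves; nodes in one wave do not depend on each other."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only for stable output
        waves.append(ready)
        sorter.done(*ready)
    return waves


waves = execution_waves(diamond)
print(waves)  # [['ingest'], ['clean', 'enrich'], ['combine']]
```

The middle wave holds both `clean` and `enrich`, which is why the two branches are candidates for parallel execution.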
graph-patterns/diamond.py
"""
03_diamond.py: Full diamond pattern: ingest -> [clean, enrich] -> combine.

Demonstrates:
- Fan-out from a single source into two parallel branches
- Fan-in: both branches feed a single downstream combiner
- The classic "diamond" DAG topology
"""

from dagy import flow, task


@task
def ingest(source: str) -> dict:
    return {
        "source": source,
        "records": [
            {"id": 1, "value": 10, "category": "A"},
            {"id": 2, "value": None, "category": "B"},
            {"id": 3, "value": 30, "category": "A"},
        ],
    }


@task
def clean(payload: dict) -> dict:
    """Drop records with null values."""
    valid = [r for r in payload["records"] if r["value"] is not None]
    return {**payload, "records": valid, "dropped": len(payload["records"]) - len(valid)}


@task
def enrich(payload: dict) -> dict:
    """Add a 'score' field to records that have a non-null value."""
    enriched = [
        {**r, "score": r["value"] * 1.5}
        for r in payload["records"]
        if r["value"] is not None
    ]
    return {**payload, "records": enriched}


@task
def combine(branches: list) -> dict:
    """Merge results from clean and enrich branches."""
    cleaned, enriched = branches
    return {
        "source": cleaned["source"],
        "clean_count": len(cleaned["records"]),
        "dropped": cleaned["dropped"],
        "enriched_count": len(enriched["records"]),
        "scores": [r["score"] for r in enriched["records"]],
    }


@flow(name="diamond_flow")
def diamond_flow(source: str = "raw_feed") -> None:
    raw = ingest(source)
    # Fan-out: both branches depend on raw but not each other.
    cleaned = clean(raw)
    enriched = enrich(raw)
    # Fan-in: combine receives results from both branches.
    combine([cleaned, enriched])


if __name__ == "__main__":
    result = diamond_flow.run_local(max_workers=2)
    print(f"Run ID : {result.run_id}")
    print(f"Status : {result.status}")

How it works
- `ingest` loads raw records with some null values.
- `clean` drops records with null values (branch 1).
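- `enrich` adds a computed `score` field (`value * 1.5`) to every record with a non-null value (branch 2). It reads the raw payload, not the output of `clean`.
- Both branches can run in parallel because each depends only on `ingest`, not on the other; the example runs with `max_workers=2`.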
- `combine` receives `[cleaned, enriched]` as a list and merges the results.
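The fan-out and fan-in steps above can be sketched without any workflow library. The version below uses the standard-library `ThreadPoolExecutor`; it is an illustration of the pattern, not a description of how dagy schedules tasks. The helper `fan_out_fan_in` and the stand-in functions `drop_nulls`, `score`, and `merge` are hypothetical names introduced for this example.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Sequence


def fan_out_fan_in(
    value: Any,
    branches: Sequence[Callable[[Any], Any]],
    combiner: Callable[[list], Any],
    max_workers: int = 2,
) -> Any:
    """Run every branch on the same input concurrently, then combine the results."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Fan-out: each branch receives the same upstream value.
        futures = [pool.submit(branch, value) for branch in branches]
        # Fan-in: collect results in branch order, not completion order.
        results = [future.result() for future in futures]
    return combiner(results)


# Hypothetical stand-ins for the two branches and the combiner.
def drop_nulls(values: list) -> list:
    return [v for v in values if v is not None]


def score(values: list) -> list:
    return [v * 1.5 for v in values if v is not None]


def merge(branch_results: list) -> dict:
    kept, scores = branch_results
    return {"kept": kept, "scores": scores}


merged = fan_out_fan_in([10, None, 30], [drop_nulls, score], merge)
print(merged)  # {'kept': [10, 30], 'scores': [15.0, 45.0]}
```

Collecting results in submission order keeps the combiner's unpacking (`kept, scores = branch_results`) stable regardless of which branch finishes first, mirroring how `combine` unpacks `[cleaned, enriched]`.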
Inputs
- `source` (str, default `"raw_feed"`): the data source name.
Outputs
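- `combine` returns a dict with `source`, `clean_count`, `dropped`, `enriched_count`, and `scores`. The flow function itself returns `None`; the script prints the run ID and status.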
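For the three sample records, the combined dict can be worked out by calling plain-Python copies of the task bodies directly. This sketch deliberately omits the `@task` and `@flow` decorators and runs everything sequentially, so it shows only the data that `combine` produces, not how dagy executes or exposes it.

```python
# Plain-Python copies of the task bodies from the example (no dagy, no scheduler).
def clean(payload: dict) -> dict:
    valid = [r for r in payload["records"] if r["value"] is not None]
    return {**payload, "records": valid, "dropped": len(payload["records"]) - len(valid)}


def enrich(payload: dict) -> dict:
    enriched = [
        {**r, "score": r["value"] * 1.5}
        for r in payload["records"]
        if r["value"] is not None
    ]
    return {**payload, "records": enriched}


def combine(branches: list) -> dict:
    cleaned, enriched = branches
    return {
        "source": cleaned["source"],
        "clean_count": len(cleaned["records"]),
        "dropped": cleaned["dropped"],
        "enriched_count": len(enriched["records"]),
        "scores": [r["score"] for r in enriched["records"]],
    }


# Same payload that ingest("raw_feed") returns in the example.
raw = {
    "source": "raw_feed",
    "records": [
        {"id": 1, "value": 10, "category": "A"},
        {"id": 2, "value": None, "category": "B"},
        {"id": 3, "value": 30, "category": "A"},
    ],
}

summary = combine([clean(raw), enrich(raw)])
print(summary)
# {'source': 'raw_feed', 'clean_count': 2, 'dropped': 1, 'enriched_count': 2, 'scores': [15.0, 45.0]}
```

Record 2 has a null `value`, so `clean` drops it and `enrich` skips it, which is why both counts are 2 and `dropped` is 1.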
Dependencies
`dagy`