Graph Patterns

Diamond

Classic diamond topology: ingest → [clean, enrich] → combine.

When to use

Use the diamond pattern when the same input needs two independent transformations whose results are then merged. Because neither branch depends on the other, the two can run in parallel. This shape is common in real-world DAGs.
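The dependency shape can be made concrete without `dagy` at all. The sketch below uses Python's standard-library `graphlib` (Python 3.9+) to group the four nodes into "waves" of tasks that are ready at the same time. The `DIAMOND` mapping and the `execution_waves` helper are illustrative names, not part of `dagy`, and this is not a description of how `dagy` schedules work internally.

```python
from graphlib import TopologicalSorter

# Each node maps to the set of nodes it depends on (hypothetical
# representation chosen for this sketch).
DIAMOND = {
    "ingest": set(),
    "clean": {"ingest"},
    "enrich": {"ingest"},
    "combine": {"clean", "enrich"},
}


def execution_waves(graph: dict) -> list:
    """Group nodes into waves; nodes in one wave have no mutual dependencies."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        # get_ready() returns every node whose predecessors are all done.
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


if __name__ == "__main__":
    print(execution_waves(DIAMOND))
    # [['ingest'], ['clean', 'enrich'], ['combine']]
```

The middle wave contains both `clean` and `enrich`, which is the fan-out; `combine` only becomes ready once both are done, which is the fan-in.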

graph-patterns/diamond.py
"""
diamond.py: Full diamond pattern (ingest -> [clean, enrich] -> combine).

Demonstrates:
- Fan-out from a single source into two parallel branches
- Fan-in: both branches feed a single downstream combiner
- The classic "diamond" DAG topology
"""

from dagy import flow, task


@task
def ingest(source: str) -> dict:
    return {
        "source": source,
        "records": [
            {"id": 1, "value": 10, "category": "A"},
            {"id": 2, "value": None, "category": "B"},
            {"id": 3, "value": 30, "category": "A"},
        ],
    }


@task
def clean(payload: dict) -> dict:
    """Drop records with null values."""
    valid = [r for r in payload["records"] if r["value"] is not None]
    return {**payload, "records": valid, "dropped": len(payload["records"]) - len(valid)}


@task
def enrich(payload: dict) -> dict:
    """Add a 'score' field to records that have a non-null value."""
    enriched = [
        {**r, "score": r["value"] * 1.5}
        for r in payload["records"]
        if r["value"] is not None
    ]
    return {**payload, "records": enriched}


@task
def combine(branches: list) -> dict:
    """Merge results from clean and enrich branches."""
    cleaned, enriched = branches
    return {
        "source": cleaned["source"],
        "clean_count": len(cleaned["records"]),
        "dropped": cleaned["dropped"],
        "enriched_count": len(enriched["records"]),
        "scores": [r["score"] for r in enriched["records"]],
    }


@flow(name="diamond_flow")
def diamond_flow(source: str = "raw_feed") -> None:
    raw = ingest(source)

    # Fan-out: both branches depend on raw but not each other.
    cleaned = clean(raw)
    enriched = enrich(raw)

    # Fan-in: combine receives results from both branches.
    combine([cleaned, enriched])


if __name__ == "__main__":
    result = diamond_flow.run_local(max_workers=2)
    print(f"Run ID : {result.run_id}")
    print(f"Status : {result.status}")

How it works

  1. `ingest` returns three sample records, one of which has a null `value`.
  2. `clean` drops records with a null `value` and records how many were dropped (branch 1).
  3. `enrich` adds a computed `score` field (`value * 1.5`) to records with a non-null `value` (branch 2).
  4. Both branches can run in parallel because each depends only on `ingest`, not on the other.
  5. `combine` receives `[cleaned, enriched]` as a list and merges the results.
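The steps above can be reproduced in plain Python to see what each stage yields for the sample data. This is a minimal sketch that assumes nothing about `dagy`: the task bodies are copied from the example without the decorators, and a standard-library `ThreadPoolExecutor` stands in for whatever scheduler `dagy` uses. The `run_diamond` function is a hypothetical name introduced for illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def ingest(source: str) -> dict:
    return {
        "source": source,
        "records": [
            {"id": 1, "value": 10, "category": "A"},
            {"id": 2, "value": None, "category": "B"},
            {"id": 3, "value": 30, "category": "A"},
        ],
    }


def clean(payload: dict) -> dict:
    """Drop records with null values and count how many were dropped."""
    valid = [r for r in payload["records"] if r["value"] is not None]
    return {**payload, "records": valid, "dropped": len(payload["records"]) - len(valid)}


def enrich(payload: dict) -> dict:
    """Add a 'score' field to records that have a non-null value."""
    enriched = [
        {**r, "score": r["value"] * 1.5}
        for r in payload["records"]
        if r["value"] is not None
    ]
    return {**payload, "records": enriched}


def combine(branches: list) -> dict:
    """Merge results from the clean and enrich branches."""
    cleaned, enriched = branches
    return {
        "source": cleaned["source"],
        "clean_count": len(cleaned["records"]),
        "dropped": cleaned["dropped"],
        "enriched_count": len(enriched["records"]),
        "scores": [r["score"] for r in enriched["records"]],
    }


def run_diamond(source: str = "raw_feed") -> dict:
    """Hypothetical stand-in for the flow: fan out to two threads, then fan in."""
    raw = ingest(source)
    with ThreadPoolExecutor(max_workers=2) as pool:
        # Both submissions read `raw` and neither waits on the other.
        cleaned_future = pool.submit(clean, raw)
        enriched_future = pool.submit(enrich, raw)
        # Fan-in: block until both branches have finished.
        return combine([cleaned_future.result(), enriched_future.result()])


if __name__ == "__main__":
    print(run_diamond())
```

For the sample records this yields `clean_count` 2, `dropped` 1, `enriched_count` 2, and `scores` `[15.0, 45.0]`. Both branches only read `raw` and build new dicts, so sharing it across threads is safe here; a branch that mutated its input would need its own copy.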

Inputs

  • `source` (str, default `"raw_feed"`): the data source name.

Outputs

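  • `combine` produces a dict with the keys `source`, `clean_count`, `dropped`, `enriched_count`, and `scores`.
  • The flow function itself returns `None`; the example reads `run_id` and `status` from the object returned by `run_local`.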

Dependencies

  • `dagy`