Linear Chain

A sequential A → B → C → D pipeline that passes data through each stage.

When to use

Use this when you need a straightforward sequential pipeline where each step depends on the previous one's output.
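Stripped of any framework, a linear chain is function composition: the first stage's output becomes the second stage's input, and so on. The following is a minimal plain-Python sketch of that idea. It does not use DAGY, and the helper `run_chain` and the toy stages are hypothetical.

```python
from functools import reduce
from typing import Any, Callable, Sequence


def run_chain(steps: Sequence[Callable[[Any], Any]], initial: Any) -> Any:
    """Hypothetical helper: feed `initial` into the first step, then each output into the next."""
    return reduce(lambda value, step: step(value), steps, initial)


# Four toy stages standing in for A -> B -> C -> D.
steps = [
    lambda x: x + 1,  # A
    lambda x: x * 2,  # B
    lambda x: x - 3,  # C
    str,              # D: render the final value as text
]

print(run_chain(steps, 5))  # 5 -> 6 -> 12 -> 9 -> prints 9
```

Because each stage sees only the previous stage's return value, the order of `steps` is the dependency order.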

basics/linear-chain.py
"""
02_linear_chain.py: Sequential A -> B -> C -> D pipeline.

Demonstrates:
- Passing task outputs to downstream tasks
- How DAGY builds a dependency graph automatically
- Each task receives the previous task's return value
"""

from dagy import flow, task


@task
def ingest(source: str) -> dict:
    """Step 1: load raw data from a named source."""
    return {"source": source, "records": [1, 2, 3, 4, 5]}


@task
def filter_records(payload: dict) -> dict:
    """Step 2: keep only even records."""
    evens = [r for r in payload["records"] if r % 2 == 0]
    return {**payload, "records": evens}


@task
def enrich(payload: dict) -> dict:
    """Step 3: add metadata."""
    return {**payload, "count": len(payload["records"]), "processed": True}


@task
def summarise(payload: dict) -> str:
    """Step 4: produce a human-readable summary."""
    return (
        f"Source={payload['source']}  "
        f"Records={payload['records']}  "
        f"Count={payload['count']}"
    )


@flow(name="linear_chain")
def linear_chain_flow(source: str = "demo") -> None:
    raw = ingest(source)
    filtered = filter_records(raw)
    enriched = enrich(filtered)
    summarise(enriched)


if __name__ == "__main__":
    result = linear_chain_flow.deploy(name="linear_chain")
    print(f"Deployed  : {result.flow_name}:{result.flow_version}")
    print(f"Deployment: {result.deployment_name}")

How it works

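  1. `ingest` takes the source name and returns a dict holding that name plus a fixed list of sample records (`[1, 2, 3, 4, 5]`).
  2. `filter_records` receives that dict and returns a copy containing only the even records (`[2, 4]`).
  3. `enrich` adds metadata to the payload: a `count` of the remaining records and a `processed` flag.
  4. `summarise` turns the enriched payload into a human-readable summary string.
  5. DAGY builds the dependency graph automatically from the call chain inside the flow: because each task is called with the previous task's return value, the graph is ingest → filter_records → enrich → summarise.
  6. Running the script directly does not execute the pipeline. It calls `linear_chain_flow.deploy(name="linear_chain")` and prints the deployed flow's name, version, and deployment name.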
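This page does not describe how DAGY infers the graph internally. The following is a minimal sketch of one way a framework *could* infer edges from a call chain. Each task's return value is wrapped together with the name of the task that produced it, and whenever a task receives such a wrapped value, an upstream → downstream edge is recorded. Everything here (`task`, `Result`, `EDGES`) is hypothetical plain Python, not DAGY's API. The four task bodies are copied from the example above.

```python
from dataclasses import dataclass
from functools import wraps
from typing import Any, Callable, List, Tuple

# Hypothetical edge list of (upstream, downstream) task names; not a DAGY object.
EDGES: List[Tuple[str, str]] = []


@dataclass
class Result:
    """A task's return value tagged with the name of the task that produced it."""
    producer: str
    value: Any


def task(fn: Callable[..., Any]) -> Callable[..., Result]:
    """Hypothetical task decorator that records data dependencies between calls."""
    @wraps(fn)
    def wrapper(*args: Any) -> Result:
        plain_args = []
        for arg in args:
            if isinstance(arg, Result):
                # Receiving another task's output adds an edge: producer -> this task.
                EDGES.append((arg.producer, fn.__name__))
                plain_args.append(arg.value)
            else:
                plain_args.append(arg)
        return Result(producer=fn.__name__, value=fn(*plain_args))
    return wrapper


@task
def ingest(source: str) -> dict:
    return {"source": source, "records": [1, 2, 3, 4, 5]}


@task
def filter_records(payload: dict) -> dict:
    return {**payload, "records": [r for r in payload["records"] if r % 2 == 0]}


@task
def enrich(payload: dict) -> dict:
    return {**payload, "count": len(payload["records"]), "processed": True}


@task
def summarise(payload: dict) -> str:
    return (
        f"Source={payload['source']}  "
        f"Records={payload['records']}  "
        f"Count={payload['count']}"
    )


# Same call chain as in linear_chain_flow.
summary = summarise(enrich(filter_records(ingest("demo"))))
print(EDGES)
# [('ingest', 'filter_records'), ('filter_records', 'enrich'), ('enrich', 'summarise')]
print(summary.value)
# Source=demo  Records=[2, 4]  Count=2
```

In this design, edges come from data flow, not from call order: a task called with no upstream `Result` arguments (like `ingest`) becomes a root of the graph.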

Inputs

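  • `source` (str, default `"demo"`): the data source name. In this example it is only carried through to the summary, because `ingest` always returns the same sample records.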

Outputs

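  • The final `summarise` task returns a summary string with the source, the filtered records, and their count; with the default input this is `Source=demo  Records=[2, 4]  Count=2`. The flow function itself discards this value and returns `None`.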

Dependencies

  • `dagy`