Linear Chain
A sequential A → B → C → D pipeline that passes data through each stage.
When to use
Use this when you need a straightforward sequential pipeline where each step depends on the previous one's output.
basics/linear-chain.py
"""
02_linear_chain.py: Sequential A -> B -> C -> D pipeline.

Demonstrates:
- Passing task outputs to downstream tasks
- How DAGY builds a dependency graph automatically
- Each task receives the previous task's return value
"""
from dagy import flow, task


@task
def ingest(source: str) -> dict:
    """Step 1: load raw data from a named source."""
    return {"source": source, "records": [1, 2, 3, 4, 5]}


@task
def filter_records(payload: dict) -> dict:
    """Step 2: keep only even records."""
    evens = [r for r in payload["records"] if r % 2 == 0]
    return {**payload, "records": evens}


@task
def enrich(payload: dict) -> dict:
    """Step 3: add metadata."""
    return {**payload, "count": len(payload["records"]), "processed": True}


@task
def summarise(payload: dict) -> str:
    """Step 4: produce a human-readable summary."""
    return (
        f"Source={payload['source']} "
        f"Records={payload['records']} "
        f"Count={payload['count']}"
    )


@flow(name="linear_chain")
def linear_chain_flow(source: str = "demo") -> None:
    raw = ingest(source)
    filtered = filter_records(raw)
    enriched = enrich(filtered)
    summarise(enriched)


if __name__ == "__main__":
    result = linear_chain_flow.deploy(name="linear_chain")
    print(f"Deployed : {result.flow_name}:{result.flow_version}")
    print(f"Deployment: {result.deployment_name}")

How it works
- `ingest` loads raw data from a named source and returns a dict.
- `filter_records` receives the dict and keeps only even records.
- `enrich` adds metadata (count, processed flag) to the payload.
- `summarise` produces a human-readable summary string.
- DAGY automatically builds the dependency graph from the call chain inside the flow.
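
One way a call chain can yield a dependency graph is sketched below. This is not DAGY's implementation, which the page does not show. It is a minimal illustration that assumes each task call returns a handle and that any handle passed as an argument becomes an upstream edge. The names `Node`, `traced`, and `EDGES` are hypothetical.

```python
from dataclasses import dataclass

# Hypothetical tracer, NOT DAGY's internals: records edges instead of running tasks.
EDGES: list[tuple[str, str]] = []


@dataclass
class Node:
    """Handle returned by a traced task call; stands in for its future output."""
    name: str


def traced(fn):
    """Replace fn with a wrapper that records dependencies from its arguments."""
    def wrapper(*args):
        node = Node(fn.__name__)
        for arg in args:
            # A Node argument means this call depends on the task that produced it.
            if isinstance(arg, Node):
                EDGES.append((arg.name, node.name))
        return node
    return wrapper


@traced
def ingest(source): ...


@traced
def filter_records(payload): ...


@traced
def enrich(payload): ...


@traced
def summarise(payload): ...


# Same call chain as the flow body above.
raw = ingest("demo")
filtered = filter_records(raw)
enriched = enrich(filtered)
summarise(enriched)

print(EDGES)
# [('ingest', 'filter_records'), ('filter_records', 'enrich'), ('enrich', 'summarise')]
```

Because each call takes the previous call's handle as its only argument, the recorded edges form a single chain with no branches, which is the A → B → C → D shape described at the top of this page.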
Inputs
- `source` (str, default `"demo"`): the data source name.
Outputs
- A summary string with source, records, and count, returned by the final `summarise` task. The flow function itself is annotated `-> None`.
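
For the default `source="demo"`, the summary can be checked by running the task bodies as plain functions. The sketch below copies the four bodies from the example and drops the `@task` and `@flow` decorators, so it bypasses DAGY entirely. It shows only the data transformation, not how DAGY schedules or returns results.

```python
# Plain-Python rerun of the four stages; no DAGY decorators or scheduling.
def ingest(source: str) -> dict:
    return {"source": source, "records": [1, 2, 3, 4, 5]}


def filter_records(payload: dict) -> dict:
    evens = [r for r in payload["records"] if r % 2 == 0]
    return {**payload, "records": evens}


def enrich(payload: dict) -> dict:
    return {**payload, "count": len(payload["records"]), "processed": True}


def summarise(payload: dict) -> str:
    return (
        f"Source={payload['source']} "
        f"Records={payload['records']} "
        f"Count={payload['count']}"
    )


# Each stage consumes the previous stage's return value.
summary = summarise(enrich(filter_records(ingest("demo"))))
print(summary)  # Source=demo Records=[2, 4] Count=2
```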
Dependencies
`dagy`