Graph Patterns
Fan-In
Three independent tasks merge into one aggregator.
When to use
Use fan-in when you need to collect results from multiple independent sources and merge them in a single downstream task.
graph-patterns/fan-in.py
"""
02_fan_in.py: Three independent tasks merge into one aggregator.

Demonstrates:
- Fan-in: multiple upstream TaskOutputs collected into a list and passed to one task
- The aggregator task receives a list of results from independent branches
"""
from dagy import flow, task


@task
def fetch_users() -> list:
    return [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]


@task
def fetch_orders() -> list:
    return [{"order_id": 100, "user_id": 1}, {"order_id": 101, "user_id": 2}]


@task
def fetch_products() -> list:
    return [{"sku": "A1", "price": 9.99}, {"sku": "B2", "price": 24.50}]


@task
def merge_report(sources: list) -> dict:
    """sources is a list of [users, orders, products]."""
    users, orders, products = sources
    return {
        "user_count": len(users),
        "order_count": len(orders),
        "product_count": len(products),
        "summary": f"{len(users)} users, {len(orders)} orders, {len(products)} products",
    }


@flow(name="fan_in_flow")
def fan_in_flow() -> None:
    users = fetch_users()
    orders = fetch_orders()
    products = fetch_products()
    # Pass a list of TaskOutputs. DAGY creates edges from all three to merge_report.
    merge_report([users, orders, products])


if __name__ == "__main__":
    result = fan_in_flow.run_local(max_workers=3)
    print(f"Run ID : {result.run_id}")
    print(f"Status : {result.status}")

How it works
- Three independent tasks (`fetch_users`, `fetch_orders`, `fetch_products`) each return data.
- Their outputs are collected into a list: `[users, orders, products]`.
- `merge_report` receives the list and unpacks it into `users`, `orders`, and `products`.
- DAGY creates edges from all three source tasks to `merge_report` automatically.
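
To make the automatic edge creation more concrete, here is a minimal, framework-independent sketch of how a list of upstream outputs could be turned into graph edges. It is not DAGY's implementation: `ToyOutput`, `ToyGraph`, and `call` are hypothetical names invented for this illustration, and DAGY's internals may work quite differently.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class ToyOutput:
    """Hypothetical placeholder for the result of a task that has not run yet."""
    producer: str


@dataclass
class ToyGraph:
    """Hypothetical graph that records (upstream, downstream) edges."""
    edges: list = field(default_factory=list)

    def call(self, name, *args):
        # Scan every argument, looking inside lists and tuples, for placeholders.
        for upstream in self._find_outputs(args):
            self.edges.append((upstream.producer, name))
        # Return a placeholder so later calls can depend on this one.
        return ToyOutput(producer=name)

    def _find_outputs(self, value):
        if isinstance(value, ToyOutput):
            yield value
        elif isinstance(value, (list, tuple)):
            for item in value:
                yield from self._find_outputs(item)


graph = ToyGraph()
users = graph.call("fetch_users")
orders = graph.call("fetch_orders")
products = graph.call("fetch_products")
graph.call("merge_report", [users, orders, products])

print(graph.edges)
```

The three fetch calls take no placeholders, so they add no edges. The final call finds three placeholders inside the list and records one edge per source, which is the fan-in shape described above.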
Inputs
- None; data is generated internally by each fetch task.
Outputs
- A merged report dict from `merge_report` with `user_count`, `order_count`, `product_count`, and a `summary` string covering all three sources.
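
As a quick check on the output shape, the aggregation logic can be run as plain Python, without the `@task` decorator. In this sketch `merge_report_plain` is a hypothetical undecorated copy of `merge_report`, fed the same sample data the fetch tasks return. How DAGY exposes a task's return value on the run result is not covered here.

```python
def merge_report_plain(sources: list) -> dict:
    """Hypothetical undecorated copy of merge_report, for illustration only."""
    users, orders, products = sources
    return {
        "user_count": len(users),
        "order_count": len(orders),
        "product_count": len(products),
        "summary": f"{len(users)} users, {len(orders)} orders, {len(products)} products",
    }


# Same sample data as the three fetch tasks in the example.
users = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
orders = [{"order_id": 100, "user_id": 1}, {"order_id": 101, "user_id": 2}]
products = [{"sku": "A1", "price": 9.99}, {"sku": "B2", "price": 24.50}]

report = merge_report_plain([users, orders, products])
print(report["summary"])  # 2 users, 2 orders, 2 products
```

Note that the unpacking in the first line of the function depends on list order, so the caller must always pass the sources as `[users, orders, products]`.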
Dependencies
`dagy`