Back to examples
Graph Patterns

Fan-In

Three independent tasks merge into one aggregator.

When to use

Use fan-in when you need to collect results from multiple independent sources and merge them in a single downstream task.

graph-patterns/fan-in.py
"""
02_fan_in.py: Three independent tasks merge into one aggregator.

Demonstrates:
- Fan-in: multiple upstream TaskOutputs collected into a list and passed to one task
- The aggregator task receives a list of results from independent branches
"""

from dagy import flow, task


@task
def fetch_users() -> list:
    return [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]


@task
def fetch_orders() -> list:
    return [{"order_id": 100, "user_id": 1}, {"order_id": 101, "user_id": 2}]


@task
def fetch_products() -> list:
    return [{"sku": "A1", "price": 9.99}, {"sku": "B2", "price": 24.50}]


@task
def merge_report(sources: list) -> dict:
    """sources is a list of [users, orders, products]."""
    users, orders, products = sources
    return {
        "user_count": len(users),
        "order_count": len(orders),
        "product_count": len(products),
        "summary": f"{len(users)} users, {len(orders)} orders, {len(products)} products",
    }


@flow(name="fan_in_flow")
def fan_in_flow() -> None:
    users = fetch_users()
    orders = fetch_orders()
    products = fetch_products()

    # Pass a list of TaskOutputs. DAGY creates edges from all three to merge_report.
    merge_report([users, orders, products])


if __name__ == "__main__":
    result = fan_in_flow.run_local(max_workers=3)
    print(f"Run ID : {result.run_id}")
    print(f"Status : {result.status}")

How it works

  1. Three independent tasks (`fetch_users`, `fetch_orders`, `fetch_products`) each return data.
  2. Their outputs are collected into a list: `[users, orders, products]`.
  3. `merge_report` receives the list and destructures it into individual sources.
  4. DAGY creates edges from all three source tasks to `merge_report` automatically.

Inputs

  • None; data is generated internally by each fetch task.

Outputs

  • A merged report dict with counts and summary from all three sources.

Dependencies

  • `dagy`