Parallel Tasks

Run independent tasks concurrently using a worker pool.

When to use

Pass `max_workers` to `run_local` when a flow contains many independent tasks that benefit from parallel execution, such as latency-bound fetches from multiple sources.
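As a rough back-of-the-envelope sketch of the benefit, the snippet below estimates wall time for the numbers used in the example on this page (five tasks of about 0.02 s each, four workers). It assumes equal-length tasks and ignores scheduling overhead; `estimated_wall_time` is a hypothetical helper written for illustration, not part of `dagy`.

```python
import math


def estimated_wall_time(n_tasks: int, max_workers: int, task_seconds: float) -> float:
    """Idealized wall time for equal-length, independent tasks on a fixed-size pool.

    Tasks run in "waves" of at most max_workers; each wave lasts task_seconds.
    """
    waves = math.ceil(n_tasks / max_workers)
    return waves * task_seconds


# One worker runs the five tasks back to back; four workers need two waves.
sequential = estimated_wall_time(n_tasks=5, max_workers=1, task_seconds=0.02)
pooled = estimated_wall_time(n_tasks=5, max_workers=4, task_seconds=0.02)
print(f"1 worker: ~{sequential:.2f}s, 4 workers: ~{pooled:.2f}s")
```

Under these assumptions a fifth worker would cut the estimate to a single wave, which is the kind of trade-off to weigh when choosing a pool size.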

concurrency/parallel-tasks.py
"""
01_parallel_tasks.py: Independent tasks executed in parallel.

Demonstrates:
- run_local(max_workers=4): worker pool size
- Tasks with no dependency on each other are scheduled concurrently
"""

import time

from dagy import flow, task


@task
def fetch_segment(segment_id: int) -> dict:
    """Simulate a latency-bound fetch (fast in tests)."""
    time.sleep(0.02)
    return {"segment": segment_id, "rows": segment_id * 100}


@task
def aggregate(segments: list) -> dict:
    total = sum(s["rows"] for s in segments)
    return {"total_rows": total, "segment_count": len(segments)}


@flow(name="parallel_tasks_flow")
def parallel_tasks_flow() -> None:
    # Five independent fetch tasks. With max_workers=4, at most four run at
    # once; the fifth starts as soon as a worker is free.
    seg0 = fetch_segment(0)
    seg1 = fetch_segment(1)
    seg2 = fetch_segment(2)
    seg3 = fetch_segment(3)
    seg4 = fetch_segment(4)

    aggregate([seg0, seg1, seg2, seg3, seg4])


if __name__ == "__main__":
    result = parallel_tasks_flow.run_local(max_workers=4)
    print(f"Run ID : {result.run_id}")
    print(f"Status : {result.status}")

How it works

  1. Five `fetch_segment` tasks are defined in the flow; none depends on another.
  2. DAGY detects the independence and schedules them concurrently.
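  3. `run_local(max_workers=4)` caps the pool at 4 workers, so at most four tasks run at once; the fifth `fetch_segment` starts once a worker frees up.
  4. `aggregate` depends on all five fetches, so it runs after they finish, receives their results as a list, and computes totals.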

Inputs

  • None; the segment IDs (0 to 4) are hard-coded in the flow.

Outputs

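  • The `aggregate` task produces a dict with `total_rows` and `segment_count`. The flow itself returns `None`; the script prints the run ID and status from the `run_local` result.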

Dependencies

  • `dagy`, `time` (stdlib)