Concurrency
Parallel Tasks
Run independent tasks concurrently using a worker pool.
When to use
Use `max_workers` when you have many independent tasks that can benefit from parallel execution (e.g. fetching data from multiple sources).
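Here is a rough way to see what `max_workers` buys. Suppose n independent tasks each take the same time t and the pool has w workers. They then finish in about ceil(n / w) rounds. The sketch below encodes that idealized estimate. It assumes equal task durations and no scheduling overhead, and `estimated_wall_time` is a hypothetical helper, not a DAGY API.

```python
import math


def estimated_wall_time(n_tasks: int, max_workers: int, task_seconds: float) -> float:
    """Idealized wall time for equal-length independent tasks on a worker pool.

    Hypothetical helper (not part of DAGY); ignores scheduling and startup overhead.
    """
    if max_workers < 1:
        raise ValueError("max_workers must be at least 1")
    # Each round runs up to max_workers tasks side by side.
    rounds = math.ceil(n_tasks / max_workers)
    return rounds * task_seconds


# The parallel-tasks example: five fetches of 0.02 s each.
sequential = estimated_wall_time(5, 1, 0.02)  # 5 rounds, one task per round
pooled = estimated_wall_time(5, 4, 0.02)      # 2 rounds: four tasks, then one
print(f"sequential ~{sequential:.2f}s, max_workers=4 ~{pooled:.2f}s")
```

In this model, the example's five fetches take about 0.10 s one at a time and about 0.04 s with `max_workers=4`. Workers beyond the number of ready tasks would sit idle.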
concurrency/parallel-tasks.py
```python
"""
01_parallel_tasks.py: Independent tasks executed in parallel.
Demonstrates:
- run_local(max_workers=4): worker pool size
- Tasks with no dependency on each other are scheduled concurrently
"""
import time

from dagy import flow, task


@task
def fetch_segment(segment_id: int) -> dict:
    """Simulate a latency-bound fetch (fast in tests)."""
    time.sleep(0.02)
    return {"segment": segment_id, "rows": segment_id * 100}


@task
def aggregate(segments: list) -> dict:
    total = sum(s["rows"] for s in segments)
    return {"total_rows": total, "segment_count": len(segments)}


@flow(name="parallel_tasks_flow")
def parallel_tasks_flow() -> None:
    # Five independent fetch tasks, run concurrently with max_workers=4.
    seg0 = fetch_segment(0)
    seg1 = fetch_segment(1)
    seg2 = fetch_segment(2)
    seg3 = fetch_segment(3)
    seg4 = fetch_segment(4)
    aggregate([seg0, seg1, seg2, seg3, seg4])


if __name__ == "__main__":
    result = parallel_tasks_flow.run_local(max_workers=4)
    print(f"Run ID : {result.run_id}")
    print(f"Status : {result.status}")
```
How it works
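- The flow creates five `fetch_segment` tasks, and none of them takes another's output as input.
- DAGY detects that they are independent and schedules them concurrently.
- `run_local(max_workers=4)` caps the worker pool at four. Four fetches run at once, and the fifth starts when a worker frees up.
- `aggregate` depends on all five results, so it runs only after every fetch finishes. It receives the results as a list and computes the totals.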
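The scheduling rule above can be sketched with the standard library. A task is ready once every task it depends on has finished, so grouping tasks into "waves" of ready tasks shows which ones can run together. This illustrates the general technique (a level-by-level topological sort) and assumes the dependencies are known up front as a dict. It is not DAGY's actual scheduler, and the node names are hypothetical labels for the flow's task calls.

```python
from graphlib import TopologicalSorter

# Dependency graph for parallel_tasks_flow: each key maps to the tasks it waits on.
# Node names are illustrative labels, not identifiers DAGY exposes.
graph = {f"fetch_segment_{i}": set() for i in range(5)}
graph["aggregate"] = {f"fetch_segment_{i}" for i in range(5)}


def ready_waves(dependencies: dict) -> list:
    """Group tasks into waves; every task in a wave could run concurrently."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        # get_ready() returns every task whose dependencies are all done.
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        # Mark the whole wave finished so its dependents become ready.
        sorter.done(*ready)
    return waves


for level, wave in enumerate(ready_waves(graph)):
    print(level, wave)
```

Wave 0 holds the five fetches and wave 1 holds `aggregate`. A real scheduler typically starts each task as soon as its own dependencies finish rather than waiting for a whole wave. The waves here just make the independence visible.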
Inputs
- None; segment IDs are hard-coded.
Outputs
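- The `aggregate` task returns a dict with `total_rows` and `segment_count`. For segments 0 to 4 that is `{"total_rows": 1000, "segment_count": 5}`. The script itself prints only the run ID and status.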
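For comparison, the same fan-out/fan-in pattern can be written directly with `concurrent.futures.ThreadPoolExecutor`, whose `max_workers` argument also caps how many tasks run at once. This is a standard-library sketch of the pattern, not how DAGY runs the flow. It reuses the example's task bodies as plain functions and returns the aggregate dict, which the DAGY flow body does not return. `run_parallel` is a hypothetical name.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fetch_segment(segment_id: int) -> dict:
    """Same body as the DAGY task: simulate a latency-bound fetch."""
    time.sleep(0.02)
    return {"segment": segment_id, "rows": segment_id * 100}


def aggregate(segments: list) -> dict:
    total = sum(s["rows"] for s in segments)
    return {"total_rows": total, "segment_count": len(segments)}


def run_parallel(segment_ids: list, max_workers: int = 4) -> dict:
    # Fan out: at most max_workers fetches run concurrently in worker threads.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map yields results in input order, waiting for each as needed.
        segments = list(pool.map(fetch_segment, segment_ids))
    # Fan in: aggregate runs only after every fetch has returned.
    return aggregate(segments)


print(run_parallel([0, 1, 2, 3, 4]))  # {'total_rows': 1000, 'segment_count': 5}
```

Threads suit this example because each fetch mostly waits on `time.sleep`. CPU-heavy tasks in plain Python would usually call for a process pool instead.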
Dependencies
`dagy`, `time` (stdlib)