Concurrency
Concurrency Limit
Cap the number of simultaneous executions of a single task.
When to use
Use `concurrency_limit` when a task accesses a shared resource with limited capacity (database connection pool, rate-limited API, etc.).
concurrency/concurrency-limit.py
"""
02_concurrency_limit.py: Cap simultaneous executions of a single task.
Demonstrates:
- concurrency_limit=2 on @task: at most 2 instances of the task run at the same time
- Useful for tasks that share a limited external resource (e.g., DB connection pool)
"""
import time
import threading

from dagy import flow, task

# Module-level tracking state. This assumes run_local executes tasks in
# threads of this process, so all workers see the same globals.
_active = 0  # bounded_worker instances currently running
_peak = 0  # highest value _active has reached
_lock = threading.Lock()  # guards _active and _peak


@task(concurrency_limit=2)
def bounded_worker(worker_id: int) -> dict:
    """Track peak concurrency to verify the limit is respected."""
    global _active, _peak
    with _lock:
        _active += 1
        _peak = max(_peak, _active)
    try:
        time.sleep(0.01)  # simulate work while occupying a slot
    finally:
        with _lock:
            _active -= 1
    return {"worker_id": worker_id, "done": True}


@task
def collect(results: list) -> dict:
    done = [r["worker_id"] for r in results if r["done"]]
    return {"completed_workers": done, "count": len(done)}


@flow(name="concurrency_limit_flow")
def concurrency_limit_flow() -> None:
    results = [bounded_worker(i) for i in range(6)]
    collect(results)


if __name__ == "__main__":
    result = concurrency_limit_flow.run_local(max_workers=6)
    print(f"Run ID : {result.run_id}")
    print(f"Status : {result.status}")
    print(f"Peak   : {_peak} concurrent workers (concurrency_limit=2, 6 workers)")

How it works
- `@task(concurrency_limit=2)` ensures at most 2 instances of `bounded_worker` run at any time.
- Six workers are created in the flow, but at most 2 execute at the same time; the rest wait for a slot.
- Module-level state guarded by a lock tracks peak concurrency so the limit can be verified.
- `collect` gathers all results and reports which workers completed.
- `max_workers=6` allows the executor to use 6 threads, but the task-level limit constrains actual concurrency.
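The interaction between a six-thread executor and a limit of 2 can be illustrated with the standard library alone. The following is a minimal sketch, not `dagy`'s implementation: it assumes the limit behaves like a semaphore acquired around each task call, and the names `limited_worker`, `run_all`, `LIMIT`, and `MAX_WORKERS` are hypothetical.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

LIMIT = 2        # stands in for concurrency_limit=2 (assumed semantics)
MAX_WORKERS = 6  # stands in for max_workers=6

_slots = threading.BoundedSemaphore(LIMIT)
_lock = threading.Lock()
_active = 0  # workers currently inside the limited section
_peak = 0    # highest value _active has reached


def limited_worker(worker_id: int) -> dict:
    """Do the work only while holding one of the LIMIT slots."""
    global _active, _peak
    with _slots:  # blocks while LIMIT workers are already running
        with _lock:
            _active += 1
            _peak = max(_peak, _active)
        try:
            time.sleep(0.01)  # simulated work
        finally:
            with _lock:
                _active -= 1
    return {"worker_id": worker_id, "done": True}


def run_all() -> tuple:
    """Submit six workers to a six-thread pool; return results and observed peak."""
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        results = list(pool.map(limited_worker, range(6)))
    return results, _peak


if __name__ == "__main__":
    results, peak = run_all()
    print(f"completed: {len(results)}, peak concurrency: {peak}")
```

All six threads may start immediately, but the semaphore keeps the observed peak at or below `LIMIT`, which is the same relationship the last bullet describes between `max_workers` and the task-level limit.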
Inputs
- None; worker IDs are generated in a loop.
Outputs
- `collect` returns a dict with a `completed_workers` list and a `count`; the flow itself returns `None`.
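To make the output shape concrete, here is the body of `collect` run as a plain function, without the `@task` decorator, on hand-built sample data. It assumes all six workers finish and that `collect` receives the worker results as a list of dicts, as its signature suggests; the `sample` list is illustrative, not captured from a real run.

```python
def collect(results: list) -> dict:
    # Same logic as the example's collect task, minus the decorator.
    done = [r["worker_id"] for r in results if r["done"]]
    return {"completed_workers": done, "count": len(done)}


# Hypothetical results, shaped like the dicts bounded_worker returns.
sample = [{"worker_id": i, "done": True} for i in range(6)]
summary = collect(sample)
print(summary)
# {'completed_workers': [0, 1, 2, 3, 4, 5], 'count': 6}
```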
Dependencies
`dagy`, `time` (stdlib), `threading` (stdlib)