Error Handling
Timeout
Task-level timeout enforcement to prevent runaway tasks.
When to use
Use `timeout_seconds` when a task should be cancelled if it runs too long, for example slow API calls or compute-heavy operations.
error-handling/timeout.py
"""
04_timeout.py: Task-level timeout enforcement.

Demonstrates:
- timeout_seconds=2 on a task
- A task that completes well within the limit (shows timeout does not interfere)
"""
import time

from dagy import flow, task


@task(timeout_seconds=2)
def slow_but_fine(delay: float) -> str:
    """Sleeps for `delay` seconds. Must be < timeout_seconds to succeed."""
    print(f" Working for {delay}s (timeout=2s)...")
    time.sleep(delay)
    return f"Completed after {delay}s"


@task
def report(message: str) -> None:
    print(f" {message}")


@flow(name="timeout_flow")
def timeout_flow(delay: float = 0.1) -> None:
    msg = slow_but_fine(delay)
    report(msg)


if __name__ == "__main__":
    result = timeout_flow.run_local(delay=0.1)
    print(f"Run ID : {result.run_id}")
    print(f"Status : {result.status}")

How it works
- `@task(timeout_seconds=2)` sets a 2-second execution limit.
- `slow_but_fine` sleeps for a configurable delay. If the delay exceeds the timeout, the task is cancelled.
- In this example the delay is well within the limit, showing timeout does not interfere with normal execution.
- `report` prints the result message.
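The general idea behind a task-level timeout can be sketched in plain Python. This is an illustration only, not how `dagy` implements `timeout_seconds`: the helper `run_with_timeout` and the function `sleepy` are hypothetical names, and the exception a timed-out `dagy` task raises, or the status it reports, is not shown in this example. The sketch runs a function in a worker thread and stops waiting once the limit passes.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout


def run_with_timeout(func, timeout_seconds, *args):
    """Hypothetical helper: run func(*args), giving up after timeout_seconds."""
    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(func, *args)
    try:
        # Raises concurrent.futures.TimeoutError if no result arrives in time.
        return future.result(timeout=timeout_seconds)
    finally:
        # Do not block on a worker that may still be running.
        executor.shutdown(wait=False)


def sleepy(delay):
    """Stand-in for a slow task (hypothetical, mirrors slow_but_fine)."""
    time.sleep(delay)
    return f"Completed after {delay}s"


# Delay well within the limit: the result comes back normally.
fast = run_with_timeout(sleepy, 0.5, 0.05)

# Delay longer than the limit: the caller gets a timeout error instead.
try:
    run_with_timeout(sleepy, 0.1, 0.5)
    timed_out = False
except FutureTimeout:
    timed_out = True

print(fast)
print(timed_out)
```

A thread cannot be forcibly stopped in Python, so in this sketch the worker keeps sleeping in the background after the caller gives up. An engine that truly terminates a runaway task would more likely run it in a separate process.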
Inputs
- `delay` (float, default `0.1`): how long the task sleeps.
Outputs
- A completion message string, returned by `slow_but_fine` and printed by `report`.
Dependencies
`dagy`, `time` (stdlib)