Back to examples
Error Handling

Timeout

Task-level timeout enforcement to prevent runaway tasks.

When to use

Use `timeout_seconds` when a task should be killed if it takes too long; e.g. slow API calls or compute-heavy operations.

error-handling/timeout.py
"""
04_timeout.py: Task-level timeout enforcement.

Demonstrates:
- timeout_seconds=2 on a task
- A task that completes well within the limit (shows timeout does not interfere)
"""

import time

from dagy import flow, task


@task(timeout_seconds=2)
def slow_but_fine(delay: float) -> str:
    """Sleeps for `delay` seconds. Must be < timeout_seconds to succeed."""
    print(f"  Working for {delay}s (timeout=2s)...")
    time.sleep(delay)
    return f"Completed after {delay}s"


@task
def report(message: str) -> None:
    print(f"  {message}")


@flow(name="timeout_flow")
def timeout_flow(delay: float = 0.1) -> None:
    msg = slow_but_fine(delay)
    report(msg)


if __name__ == "__main__":
    result = timeout_flow.run_local(delay=0.1)
    print(f"Run ID : {result.run_id}")
    print(f"Status : {result.status}")

How it works

  1. `@task(timeout_seconds=2)` sets a 2-second execution limit.
  2. `slow_but_fine` sleeps for a configurable delay. If the delay exceeds the timeout, the task is cancelled.
  3. In this example the delay is well within the limit, showing timeout does not interfere with normal execution.
  4. `report` prints the result message.

Inputs

  • `delay` (float, default `0.1`): how long the task sleeps.

Outputs

  • A completion message string.

Dependencies

  • `dagy`, `time` (stdlib)