Retries & Timeouts
Dagy provides configurable retry policies, backoff strategies, jitter, and hard timeouts at both the task and flow level.
Retry Configuration
Both @task and @flow decorators accept retry parameters:
from dagy import task

@task(
    retries=3,
    retry_delay_seconds=2.0,
    retry_jitter_factor=0.5,
    retry_condition_fn=lambda e: isinstance(e, ConnectionError),
)
def fetch_data(url: str) -> dict:
    ...
| Parameter | Type | Default | Description |
|---|---|---|---|
| retries | int | 0 | Maximum number of retry attempts after a failure |
| retry_delay_seconds | float, list, or Callable | None | Delay between retries |
| retry_jitter_factor | float | None | Scales the random jitter added to each delay |
| retry_condition_fn | Callable[[Exception], bool] | None | Predicate that decides whether to retry |
Delay Strategies
The retry_delay_seconds parameter supports four patterns:
Fixed delay
A single number repeats for every retry attempt:
@task(retries=3, retry_delay_seconds=2.0)
def fixed():
    ...
# Delays: [2.0, 2.0, 2.0]
Custom sequence
A list specifies the delay for each attempt. If the list is shorter than the retry count, the last value is repeated:
@task(retries=3, retry_delay_seconds=[1, 5, 30])
def escalating():
    ...
# Delays: [1.0, 5.0, 30.0]
Dynamic (callable)
A callable receives the retry count and returns a list of delays:
@task(retries=5, retry_delay_seconds=lambda n: [2**i for i in range(n)])
def exponential():
    ...
# Delays: [1, 2, 4, 8, 16]
No delay
When retry_delay_seconds is None (default), retries are immediate with zero delay.
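The four patterns can be read as one normalization step that turns retry_delay_seconds into one delay per retry. The sketch below assumes exactly the rules stated above; resolve_delays is a hypothetical helper, not part of Dagy's documented API, and truncating a list that is longer than the retry count (and rejecting an empty list) are assumptions the docs do not specify.

```python
from typing import Callable, List, Sequence, Union

# Hypothetical alias mirroring the accepted forms of retry_delay_seconds.
DelaySpec = Union[None, float, Sequence[float], Callable[[int], Sequence[float]]]


def resolve_delays(spec: DelaySpec, retries: int) -> List[float]:
    """Return one delay (in seconds) per retry attempt.

    Hypothetical helper, not Dagy's API: it only restates the documented rules.
    """
    if retries <= 0:
        return []
    if spec is None:
        # No delay: every retry starts immediately.
        return [0.0] * retries
    if callable(spec):
        # Dynamic: the callable receives the retry count and returns a list.
        spec = spec(retries)
    if isinstance(spec, (int, float)):
        # Fixed: the same delay before every retry.
        return [float(spec)] * retries
    values = [float(v) for v in spec]
    if not values:
        raise ValueError("retry_delay_seconds list must not be empty")  # assumption
    # Custom sequence: repeat the last value if the list is too short
    # (assumption: values beyond the retry count are ignored).
    padded = values + [values[-1]] * (retries - len(values))
    return padded[:retries]
```

For example, resolve_delays([1, 5], 4) pads the list to [1.0, 5.0, 5.0, 5.0].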
Jitter
Jitter adds randomness to retry delays to prevent thundering-herd problems when multiple tasks retry simultaneously.
Formula:
final_delay = base_delay + (base_delay × jitter_factor × random())
Where random() is a uniform value in [0, 1).
Example: With retry_delay_seconds=2.0 and retry_jitter_factor=0.5:
- Jitter range per attempt: 0 to 2.0 × 0.5 = 1.0 seconds
- Final delay range: 2.0 to 3.0 seconds
@task(retries=3, retry_delay_seconds=1.0, retry_jitter_factor=0.5)
def jittered():
    ...
# Each delay: base 1.0 + random jitter up to 0.5
The retry_jitter_factor must be >= 0. A negative value raises RetryConfigurationError.
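The jitter formula and the validation rule translate into a few lines of Python. This is a minimal sketch: jittered_delay is a hypothetical helper, RetryConfigurationError is defined locally as a stand-in for Dagy's exception, and the rand parameter exists only so the randomness can be pinned in tests. It validates on every call for brevity; when Dagy itself performs the check is not specified here.

```python
import random
from typing import Callable, Optional


class RetryConfigurationError(Exception):
    """Local stand-in for Dagy's RetryConfigurationError (not imported from Dagy)."""


def jittered_delay(
    base_delay: float,
    jitter_factor: Optional[float],
    rand: Callable[[], float] = random.random,
) -> float:
    """final_delay = base_delay + base_delay * jitter_factor * random()."""
    if jitter_factor is None:
        return base_delay  # no jitter configured
    if jitter_factor < 0:
        raise RetryConfigurationError("retry_jitter_factor must be >= 0")
    # random.random() is uniform in [0.0, 1.0), matching random() in the formula.
    return base_delay + base_delay * jitter_factor * rand()
```

With rand pinned to 0.5, jittered_delay(2.0, 0.5, rand=lambda: 0.5) returns 2.5, the midpoint of the 2.0 to 3.0 second range from the example above.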
Conditional Retry
The retry_condition_fn parameter lets you selectively retry only specific exception types:
from dagy import task

def only_transient(exc: Exception) -> bool:
    return isinstance(exc, (ConnectionError, TimeoutError))

@task(retries=5, retry_delay_seconds=1.0, retry_condition_fn=only_transient)
def api_call(endpoint: str) -> dict:
    ...
Logic:
- The base condition checks attempt < max_retries
- If retry_condition_fn is provided, both conditions must be true to retry
- The function receives the caught exception instance
- Return True to retry, False to fail immediately
This is useful for distinguishing transient errors (network timeouts, rate limits) from permanent failures (invalid input, authentication errors).
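The decision described under Logic can be written as a single predicate. This is a hedged sketch, not Dagy's code: should_retry is hypothetical, and it assumes attempt counts the retries already made (0 before the first retry), since the exact counter behind attempt < max_retries is not specified here.

```python
from typing import Callable, Optional

RetryCondition = Callable[[Exception], bool]


def should_retry(
    attempt: int,
    max_retries: int,
    exc: Exception,
    retry_condition_fn: Optional[RetryCondition] = None,
) -> bool:
    """Hypothetical restatement of the retry decision described above."""
    # Base condition (assumption: attempt = retries already made).
    if not attempt < max_retries:
        return False
    # Optional predicate: both conditions must hold to retry.
    if retry_condition_fn is not None:
        return bool(retry_condition_fn(exc))
    return True


def only_transient(exc: Exception) -> bool:
    # Same predicate as in the example above.
    return isinstance(exc, (ConnectionError, TimeoutError))
```

With only_transient, a ValueError fails immediately even when retries remain, while a ConnectionError is retried until the limit is reached.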
Timeouts
Hard timeouts prevent tasks from running indefinitely. Specify timeout_seconds on the @task decorator:
@task(timeout_seconds=30.0)
def must_finish_fast():
    ...
How timeouts work
Local execution: Timeouts are enforced using concurrent.futures.ThreadPoolExecutor. The task runs in a worker thread, and if it exceeds the timeout, a TaskTimeoutError is raised.
Key behaviors:
- Timeouts are enforced per attempt, not across all retries
- Each retry attempt gets a fresh timeout window
- TaskTimeoutError is treated like any other exception; it can trigger retries
@task(retries=3, retry_delay_seconds=1.0, timeout_seconds=5.0)
def robust_fetch(url: str) -> dict:
    # Each attempt has 5 seconds. If it times out, it is retried up to 3 times.
    ...
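The per-attempt timeout can be reproduced with the standard library alone. In this sketch, run_with_timeout and run_with_retries are hypothetical helpers and TaskTimeoutError is a local stand-in for Dagy's exception; it illustrates the documented behavior (a fresh timeout window per attempt, timeouts handled like other failures) rather than Dagy's actual executor.

```python
import concurrent.futures
import time
from typing import Any, Callable


class TaskTimeoutError(Exception):
    """Local stand-in for Dagy's TaskTimeoutError (not imported from Dagy)."""


def run_with_timeout(fn: Callable[[], Any], timeout_seconds: float) -> Any:
    """Run a single attempt of fn in a worker thread with a hard timeout."""
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = executor.submit(fn)
    try:
        done, _ = concurrent.futures.wait([future], timeout=timeout_seconds)
        if not done:
            raise TaskTimeoutError(f"attempt exceeded {timeout_seconds}s")
        return future.result()  # re-raises any exception fn raised
    finally:
        # Return without waiting: a timed-out worker thread cannot be killed
        # and keeps running in the background until fn returns.
        executor.shutdown(wait=False)


def run_with_retries(
    fn: Callable[[], Any],
    retries: int,
    timeout_seconds: float,
    delay: float = 0.0,
) -> Any:
    """Give every attempt a fresh timeout window; a timeout counts as a failure."""
    for attempt in range(retries + 1):
        try:
            return run_with_timeout(fn, timeout_seconds)
        except Exception:  # TaskTimeoutError is handled like any other error
            if attempt == retries:
                raise  # retries exhausted: surface the last error
            time.sleep(delay)
```

One consequence of the thread-based approach: waiting with a timeout only stops the caller from waiting. Python threads cannot be forcibly stopped, so a timed-out attempt keeps running in its worker thread until it returns on its own.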
Timeouts with Step Functions
When deployed to Step Functions, timeout_seconds maps directly to the ASL TimeoutSeconds property on the Task state.
Retry Behavior Across Backends
| Feature | Local Executor | Step Functions |
|---|---|---|
| Retry delays | Full support (fixed, list, callable) | Fixed interval only (IntervalSeconds) |
| Jitter | SDK-computed per attempt | Mapped to BackoffRate: 2.0 |
| Timeout | ThreadPoolExecutor enforcement | ASL TimeoutSeconds |
| Conditional retry | Full retry_condition_fn support | Not supported (retries on all errors) |
| Error routing | Exception propagation | ASL Catch → HandleTaskFailure state |
Note: The Step Functions backend translates retry delays to max(1, int(interval)), so fractional seconds are lost. List-based and callable delays use only the first interval value.
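The timeout mapping and the translation rules above can be sketched as a function that builds an ASL Task state. to_asl_task_state and first_interval are hypothetical helpers, not Dagy's deployer. Rounding timeout_seconds the same way as retry delays is an assumption (ASL requires TimeoutSeconds to be a positive integer), as is treating a zero or missing jitter factor as no BackoffRate. States.ALL and arn:aws:states:::lambda:invoke are standard ASL identifiers; the Next or End field a complete Task state needs is left out.

```python
from typing import Any, Callable, Dict, Optional, Sequence, Union

DelaySpec = Union[None, float, Sequence[float], Callable[[int], Sequence[float]]]


def first_interval(spec: DelaySpec, retries: int) -> float:
    """Return the first delay value; later list or callable values are dropped."""
    if spec is None:
        return 0.0
    if callable(spec):
        spec = spec(retries)
    if isinstance(spec, (int, float)):
        return float(spec)
    return float(spec[0]) if len(spec) > 0 else 0.0


def to_asl_task_state(
    resource: str,
    retries: int = 0,
    delay: DelaySpec = None,
    jitter_factor: Optional[float] = None,
    timeout_seconds: Optional[float] = None,
) -> Dict[str, Any]:
    """Build a partial ASL Task state (Next/End omitted)."""
    state: Dict[str, Any] = {"Type": "Task", "Resource": resource}
    if timeout_seconds is not None:
        # Assumption: rounded like retry delays, since ASL needs a positive integer.
        state["TimeoutSeconds"] = max(1, int(timeout_seconds))
    if retries > 0:
        retrier: Dict[str, Any] = {
            "ErrorEquals": ["States.ALL"],  # retry_condition_fn is not translated
            "IntervalSeconds": max(1, int(first_interval(delay, retries))),
            "MaxAttempts": retries,
        }
        if jitter_factor:  # assumption: None or 0.0 means no jitter
            retrier["BackoffRate"] = 2.0  # jitter maps to BackoffRate: 2.0
        state["Retry"] = [retrier]
    # Failures that survive all retries are routed to a failure-handling state.
    state["Catch"] = [{"ErrorEquals": ["States.ALL"], "Next": "HandleTaskFailure"}]
    return state
```

For instance, retry_delay_seconds=[1.5, 5, 30] becomes a single Retry entry with IntervalSeconds 1, so both the later values and the half second are lost.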
Lifecycle Hooks During Retries
Hooks are invoked at each stage of the retry lifecycle:
| Hook | When |
|---|---|
| on_running | Before each attempt (including retries) |
| on_retry | After a failure, before the retry delay |
| on_completion | After a successful attempt |
| on_failure | After the final failure (all retries exhausted) |
from dagy import task, Hook, RunContext, State

class LogRetry(Hook):
    def __call__(self, context: RunContext, state: State):
        print(f"Retry attempt {context.attempt}/{context.max_retries}: {state.message}")

@task(retries=3, on_retry=[LogRetry()])
def monitored_task():
    ...
The RunContext provided to hooks includes:
| Field | Description |
|---|---|
| kind | "task" or "flow" |
| name | Task or flow name |
| attempt | Current attempt number (1-indexed) |
| max_retries | Maximum retry count |
| parameters | Runtime parameters |
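The hook order in the tables above can be illustrated with a small runner. This is a simplified sketch, not Dagy's engine: run_task is hypothetical, RunContext is a local dataclass with the documented fields, hooks are plain callables rather than Hook subclasses, and a string message stands in for State. It assumes a task with max_retries retries makes at most max_retries + 1 attempts, numbered from 1.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional


@dataclass
class RunContext:
    """Local stand-in carrying the documented fields (not Dagy's class)."""

    kind: str
    name: str
    attempt: int  # 1-indexed
    max_retries: int
    parameters: Dict[str, Any] = field(default_factory=dict)


# A hook here is a plain callable; the string stands in for Dagy's State.
HookFn = Callable[[RunContext, str], None]


def run_task(
    fn: Callable[..., Any],
    name: str,
    max_retries: int,
    hooks: Dict[str, List[HookFn]],
    parameters: Optional[Dict[str, Any]] = None,
) -> Any:
    params = parameters or {}

    def fire(event: str, ctx: RunContext, message: str) -> None:
        for hook in hooks.get(event, []):
            hook(ctx, message)

    for attempt in range(1, max_retries + 2):  # at most max_retries + 1 attempts
        ctx = RunContext("task", name, attempt, max_retries, params)
        fire("on_running", ctx, "Running")  # before every attempt
        try:
            result = fn(**params)
        except Exception as exc:
            if attempt > max_retries:
                fire("on_failure", ctx, f"Failed: {exc}")  # retries exhausted
                raise
            fire("on_retry", ctx, f"Retrying: {exc}")  # before the retry delay
            continue  # a real runner would sleep for the retry delay here
        fire("on_completion", ctx, "Completed")  # after a successful attempt
        return result
```

A task that fails twice and then succeeds therefore sees on_running, on_retry, on_running, on_retry, on_running, on_completion, with attempt going 1, 1, 2, 2, 3, 3.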