Retries & Timeouts

Dagy provides configurable retry policies, backoff strategies, jitter, and hard timeouts at both the task and flow level.

Retry Configuration

Both @task and @flow decorators accept retry parameters:

from dagy import task

@task(
    retries=3,
    retry_delay_seconds=2.0,
    retry_jitter_factor=0.5,
    retry_condition_fn=lambda e: isinstance(e, ConnectionError),
)
def fetch_data(url: str) -> dict:
    ...

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| retries | int | 0 | Maximum retry attempts on failure |
| retry_delay_seconds | float, list, or Callable | None | Delay between retries |
| retry_jitter_factor | float | None | Random jitter multiplier added to delay |
| retry_condition_fn | Callable[[Exception], bool] | None | Predicate that decides whether to retry |

Delay Strategies

The retry_delay_seconds parameter supports four patterns:

Fixed delay

A single number repeats for every retry attempt:

@task(retries=3, retry_delay_seconds=2.0)
def fixed():
    ...
# Delays: [2.0, 2.0, 2.0]

Custom sequence

A list specifies the delay for each attempt. If the list is shorter than the retry count, the last value is repeated:

@task(retries=3, retry_delay_seconds=[1, 5, 30])
def escalating():
    ...
# Delays: [1.0, 5.0, 30.0]

Dynamic (callable)

A callable receives the retry count and returns a list of delays:

@task(retries=5, retry_delay_seconds=lambda n: [2**i for i in range(n)])
def exponential():
    ...
# Delays: [1, 2, 4, 8, 16]

No delay

When retry_delay_seconds is None (default), retries are immediate with zero delay.
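
The four patterns can be read as one normalization step that turns any accepted value into one delay per retry. The sketch below is an illustration only: resolve_delays is a hypothetical helper, not part of Dagy's API, and truncating a list that is longer than the retry count is an assumption the text above does not confirm.

```python
from typing import Callable, List, Sequence, Union

DelaySpec = Union[None, int, float, Sequence[float], Callable[[int], Sequence[float]]]


def resolve_delays(retries: int, spec: DelaySpec) -> List[float]:
    """Expand a retry_delay_seconds-style value into one delay per retry.

    Hypothetical helper for illustration; not Dagy's implementation.
    """
    if retries <= 0:
        return []
    if spec is None:
        # No delay: every retry is immediate.
        return [0.0] * retries
    if callable(spec):
        # Dynamic: the callable receives the retry count.
        spec = spec(retries)
    if isinstance(spec, (int, float)):
        # Fixed delay: the same value for every retry.
        return [float(spec)] * retries
    delays = [float(d) for d in spec]
    if not delays:
        return [0.0] * retries
    # Short list: repeat the last value until there is one delay per retry.
    delays += [delays[-1]] * (retries - len(delays))
    # Long list: drop the extras (assumption, not stated in the docs).
    return delays[:retries]
```

For example, resolve_delays(4, [1, 5]) yields [1.0, 5.0, 5.0, 5.0], which matches the "last value is repeated" rule for custom sequences.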

Jitter

Jitter adds randomness to retry delays to prevent thundering-herd problems when multiple tasks retry simultaneously.

Formula:

final_delay = base_delay + (base_delay × jitter_factor × random())

Where random() is a uniform value in [0, 1).

Example: With retry_delay_seconds=2.0 and retry_jitter_factor=0.5:

  • Jitter range per attempt: 0 to 2.0 × 0.5 = 1.0
  • Final delay range: 2.0 to 3.0 seconds

@task(retries=3, retry_delay_seconds=1.0, retry_jitter_factor=0.5)
def jittered():
    ...
# Each delay: base 1.0 + random jitter up to 0.5

The retry_jitter_factor must be >= 0. A negative value raises RetryConfigurationError.
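
The formula and the validation rule can be sketched in a few lines. This is a minimal illustration, not Dagy's code: apply_jitter is a hypothetical helper, the RetryConfigurationError class below is a local stand-in (its real base class is not documented here), and whether Dagy validates at decoration time or when the delay is computed is an assumption.

```python
import random
from typing import Callable, Optional


class RetryConfigurationError(ValueError):
    """Local stand-in for Dagy's error of the same name (base class assumed)."""


def apply_jitter(
    base_delay: float,
    jitter_factor: Optional[float],
    rand: Callable[[], float] = random.random,
) -> float:
    """Return base_delay + base_delay * jitter_factor * rand()."""
    if jitter_factor is None:
        # No jitter configured: the base delay is used unchanged.
        return base_delay
    if jitter_factor < 0:
        raise RetryConfigurationError("retry_jitter_factor must be >= 0")
    # rand() is uniform in [0, 1), so the added jitter never exceeds
    # base_delay * jitter_factor.
    return base_delay + base_delay * jitter_factor * rand()
```

Passing rand as a parameter keeps the sketch deterministic in tests; with rand returning 0.5, a base delay of 2.0 and a factor of 0.5 give 2.5 seconds.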

Conditional Retry

The retry_condition_fn parameter restricts retries to exceptions that match a predicate, typically specific exception types:

from dagy import task

def only_transient(exc: Exception) -> bool:
    return isinstance(exc, (ConnectionError, TimeoutError))

@task(retries=5, retry_delay_seconds=1.0, retry_condition_fn=only_transient)
def api_call(endpoint: str) -> dict:
    ...

Logic:

  1. The base condition checks attempt < max_retries
  2. If retry_condition_fn is provided, both the base condition and the function's result must be true to retry
  3. The function receives the caught exception instance
  4. Return True to retry, False to fail immediately

This is useful for distinguishing transient errors (network timeouts, rate limits) from permanent failures (invalid input, authentication errors).
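
The decision logic above can be sketched as a single function. should_retry is a hypothetical name used for illustration, and the sketch assumes that attempt counts the retries already consumed (starting at 0), which is one reading of the attempt < max_retries check.

```python
from typing import Callable, Optional


def should_retry(
    attempt: int,
    max_retries: int,
    exc: Exception,
    retry_condition_fn: Optional[Callable[[Exception], bool]] = None,
) -> bool:
    """Combine the base retry budget check with the optional predicate."""
    # Base condition: there must be retry budget left.
    # Assumption: `attempt` is the number of retries already used.
    if attempt >= max_retries:
        return False
    # Without a predicate, any exception is retried.
    if retry_condition_fn is None:
        return True
    # With a predicate, it receives the caught exception instance.
    return bool(retry_condition_fn(exc))


def only_transient(exc: Exception) -> bool:
    return isinstance(exc, (ConnectionError, TimeoutError))
```

With only_transient as the predicate, a ConnectionError is retried while budget remains, and a ValueError fails immediately regardless of the remaining budget.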

Timeouts

Hard timeouts prevent tasks from running indefinitely. Specify timeout_seconds on the @task decorator:

@task(timeout_seconds=30.0)
def must_finish_fast():
    ...

How timeouts work

Local execution: Timeouts are enforced using concurrent.futures.ThreadPoolExecutor. The task runs in a worker thread, and if it exceeds the timeout, a TaskTimeoutError is raised.

Key behaviors:

  • Timeouts are enforced per attempt, not across all retries
  • Each retry attempt gets a fresh timeout window
  • TaskTimeoutError is treated like any other exception; it can trigger retries

@task(retries=3, retry_delay_seconds=1.0, timeout_seconds=5.0)
def robust_fetch(url: str) -> dict:
    # Each attempt has 5 seconds. If it times out, retries up to 3 times.
    ...
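
To make the per-attempt mechanism concrete, here is a minimal sketch of a thread-based timeout built on concurrent.futures. It is not Dagy's implementation: run_with_timeout is a hypothetical helper and TaskTimeoutError is a local stand-in class. One property of this approach in plain Python is worth knowing: a worker thread cannot be forcibly stopped, so in this sketch the timed-out function keeps running in the background after the error is raised.

```python
import concurrent.futures
import time
from typing import Any, Callable


class TaskTimeoutError(Exception):
    """Local stand-in for Dagy's TaskTimeoutError."""


def run_with_timeout(fn: Callable[..., Any], timeout_seconds: float, *args: Any) -> Any:
    """Run fn(*args) in a worker thread and give up after timeout_seconds."""
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = executor.submit(fn, *args)
    try:
        # Blocks for at most timeout_seconds waiting for the result.
        return future.result(timeout=timeout_seconds)
    except concurrent.futures.TimeoutError as exc:
        raise TaskTimeoutError(f"task exceeded {timeout_seconds}s") from exc
    finally:
        # Do not wait for the worker: a running thread cannot be interrupted,
        # so waiting here would defeat the timeout.
        executor.shutdown(wait=False)
```

Calling run_with_timeout once per attempt gives each retry its own fresh window, which is the per-attempt behavior described above.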

Timeouts with Step Functions

When deployed to Step Functions, timeout_seconds maps directly to the ASL TimeoutSeconds property on the Task state.

Retry Behavior Across Backends

| Feature | Local Executor | Step Functions |
| --- | --- | --- |
| Retry delays | Full support (fixed, list, callable) | Fixed interval only (IntervalSeconds) |
| Jitter | SDK-computed per attempt | Mapped to BackoffRate: 2.0 |
| Timeout | ThreadPoolExecutor enforcement | ASL TimeoutSeconds |
| Conditional retry | Full retry_condition_fn support | Not supported (retries on all errors) |
| Error routing | Exception propagation | ASL Catch → HandleTaskFailure state |

Note: The Step Functions backend converts each retry delay to max(1, int(interval)), so fractional seconds are truncated and any delay under one second becomes 1. List-based and callable delays use only the first interval value.
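
As an illustration of this translation, the sketch below builds an ASL retrier from the same parameters. to_asl_retrier is a hypothetical helper, and the exact retrier Dagy emits is not shown in this document: the States.ALL error match follows from "retries on all errors", the BackoffRate value is taken from the table above, and treating a missing delay as an interval of 0 is an assumption.

```python
from typing import Any, Callable, Dict, Sequence, Union

DelaySpec = Union[None, int, float, Sequence[float], Callable[[int], Sequence[float]]]


def to_asl_retrier(retries: int, retry_delay_seconds: DelaySpec) -> Dict[str, Any]:
    """Build one ASL Retry entry from Dagy-style retry parameters (sketch)."""
    spec = retry_delay_seconds
    if callable(spec):
        spec = spec(retries)
    if spec is None:
        interval = 0.0  # assumption: no delay configured
    elif isinstance(spec, (int, float)):
        interval = float(spec)
    else:
        values = list(spec)
        # Only the first value of a list or callable result is used.
        interval = float(values[0]) if values else 0.0
    return {
        "ErrorEquals": ["States.ALL"],  # no conditional retry on this backend
        "IntervalSeconds": max(1, int(interval)),  # whole seconds, at least 1
        "MaxAttempts": retries,
        "BackoffRate": 2.0,
    }
```

A delay of 2.5 becomes an IntervalSeconds of 2, a delay of 0.5 becomes 1, and the list [5, 10, 30] collapses to 5.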

Lifecycle Hooks During Retries

Hooks are invoked at each stage of the retry lifecycle:

| Hook | When |
| --- | --- |
| on_running | Before each attempt (including retries) |
| on_retry | After a failure, before the retry delay |
| on_completion | After a successful attempt |
| on_failure | After the final failure (all retries exhausted) |

from dagy import task, Hook, RunContext, State

class LogRetry(Hook):
    def __call__(self, context: RunContext, state: State):
        print(f"Retry attempt {context.attempt}/{context.max_retries}: {state.message}")

@task(retries=3, on_retry=[LogRetry()])
def monitored_task():
    ...

The RunContext provided to hooks includes:

| Field | Description |
| --- | --- |
| kind | "task" or "flow" |
| name | Task or flow name |
| attempt | Current attempt number (1-indexed) |
| max_retries | Maximum retry count |
| parameters | Runtime parameters |
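
The hook ordering can be traced with a small simulation of the retry loop. Everything here is a sketch under stated assumptions: the RunContext dataclass is a local stand-in carrying the fields listed above, run_with_hooks and emit are hypothetical names, and the retry delay itself is omitted.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Tuple


@dataclass
class RunContext:
    """Local stand-in with the documented fields; not Dagy's class."""
    kind: str
    name: str
    attempt: int
    max_retries: int
    parameters: Dict[str, Any] = field(default_factory=dict)


def run_with_hooks(
    fn: Callable[..., Any],
    name: str,
    retries: int,
    emit: Callable[[str, RunContext], None],
    **parameters: Any,
) -> Any:
    """Run fn with retries, calling emit(hook_name, context) at each stage."""
    ctx = RunContext("task", name, attempt=0, max_retries=retries, parameters=parameters)
    for attempt in range(1, retries + 2):  # one initial attempt plus the retries
        ctx.attempt = attempt
        emit("on_running", ctx)
        try:
            result = fn(**parameters)
        except Exception:
            if attempt <= retries:
                emit("on_retry", ctx)
                # A real executor would sleep for the retry delay here.
                continue
            emit("on_failure", ctx)
            raise
        emit("on_completion", ctx)
        return result


events: List[Tuple[str, int]] = []
calls = {"n": 0}


def flaky() -> str:
    # Fails on the first two calls, then succeeds.
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient")
    return "ok"


result = run_with_hooks(
    flaky, "flaky", retries=3,
    emit=lambda hook, ctx: events.append((hook, ctx.attempt)),
)
```

After this run, events records on_running and on_retry for attempts 1 and 2, then on_running and on_completion for attempt 3; on_failure never fires because the task succeeded before the retries ran out.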