States & Lifecycle

Every task and flow execution in Dagy transitions through a defined set of states. Lifecycle hooks let you react to these transitions for logging, notifications, and custom logic.

States

Run States

State       Description
QUEUED      Run has been created and is waiting for execution
RUNNING     Run is actively executing
SUCCEEDED   All tasks completed successfully
FAILED      One or more tasks failed (after exhausting retries)
CANCELLED   Run was cancelled via API or user action

Task States

State       Description
RUNNING     Task is executing (or retrying)
SUCCEEDED   Task completed successfully
FAILED      Task failed after all retry attempts
SKIPPED     Task was skipped because an upstream dependency failed
TIMED_OUT   Task exceeded its timeout_seconds limit

State Transitions

Flow Lifecycle

QUEUED → RUNNING → SUCCEEDED
                 → FAILED
      → CANCELLED

A flow moves to RUNNING when execution begins. It ends in SUCCEEDED if all tasks succeed. It ends in FAILED if any task fails: as soon as the first failure occurs when fail_fast is enabled, or otherwise once all tasks have been attempted.
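
The flow diagram above can be read as a small transition table. The following is a minimal sketch, not Dagy's implementation: RunState, ALLOWED, can_transition, and is_terminal are hypothetical names, and the edges are copied from the diagram as drawn. The diagram shows CANCELLED branching from QUEUED; whether a RUNNING flow can also move to CANCELLED is not stated here, so that edge is left out as an assumption.

```python
from enum import Enum


class RunState(Enum):
    QUEUED = "QUEUED"
    RUNNING = "RUNNING"
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"


# Edges copied from the diagram; terminal states have no outgoing edges.
# Assumption: CANCELLED is only reachable from QUEUED, as the diagram is drawn.
ALLOWED = {
    RunState.QUEUED: {RunState.RUNNING, RunState.CANCELLED},
    RunState.RUNNING: {RunState.SUCCEEDED, RunState.FAILED},
    RunState.SUCCEEDED: set(),
    RunState.FAILED: set(),
    RunState.CANCELLED: set(),
}


def can_transition(current: RunState, target: RunState) -> bool:
    """Return True if the table permits moving from current to target."""
    return target in ALLOWED[current]


def is_terminal(state: RunState) -> bool:
    """A state is terminal when no other state can follow it."""
    return not ALLOWED[state]
```

A table like this is convenient for validating status updates before they are stored, for example rejecting a late RUNNING event for a flow that has already reached SUCCEEDED.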

Task Lifecycle

RUNNING → SUCCEEDED
        → FAILED → (retry) → RUNNING
        → TIMED_OUT → (retry) → RUNNING
SKIPPED (if any upstream dependency is FAILED or SKIPPED)

Key behaviors:

  • Retries: A failed task transitions back to RUNNING for each retry attempt, up to retries times
  • Timeouts: A TaskTimeoutError is treated like any other failure and can trigger retries
  • Skipping: If any upstream dependency is FAILED or SKIPPED, the task is marked SKIPPED without execution
  • Fail-fast: When fail_fast=True (default), the first task failure cancels remaining inflight tasks and raises TaskFailedError
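
The retry and timeout behaviors above can be illustrated with a small loop. This is a sketch under stated assumptions, not the executor's code: run_with_retries is a hypothetical helper, TaskTimeoutError is redefined locally as a stand-in so the example runs without Dagy, and retry delays and fail_fast are left out.

```python
from typing import Callable, List, Tuple


class TaskTimeoutError(Exception):
    """Local stand-in for the timeout error named above."""


def run_with_retries(fn: Callable[[], object], retries: int) -> Tuple[str, List[str]]:
    """Run fn once plus up to `retries` retries; return (final_state, trail)."""
    trail: List[str] = []
    for _ in range(retries + 1):  # 1 initial attempt + `retries` retries
        trail.append("RUNNING")
        try:
            fn()
        except TaskTimeoutError:
            trail.append("TIMED_OUT")  # a timeout is retried like any other failure
        except Exception:
            trail.append("FAILED")
        else:
            trail.append("SUCCEEDED")
            break
    return trail[-1], trail


calls = {"n": 0}


def flaky() -> str:
    """Times out on the first call, then succeeds."""
    calls["n"] += 1
    if calls["n"] == 1:
        raise TaskTimeoutError("too slow")
    return "ok"


final, trail = run_with_retries(flaky, retries=2)
print(final, trail)
# SUCCEEDED ['RUNNING', 'TIMED_OUT', 'RUNNING', 'SUCCEEDED']
```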

Dependency Resolution

The local executor processes tasks using a dependency-aware scheduling loop:

  1. Ready check: A task is ready when every task ID in its depends_on list has a status recorded in statuses (the executor's map of task statuses)
  2. Failure propagation: If any dependency is FAILED or SKIPPED, the task is immediately marked SKIPPED
  3. Parallel execution: Ready tasks are submitted to a ThreadPoolExecutor up to max_workers concurrency
  4. Cycle detection: If no tasks can be scheduled and none are inflight, a DependencyCycleError is raised
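
The four steps above can be sketched as a single loop. This is an illustrative approximation rather than the local executor's actual code: run_graph and its arguments are hypothetical, DependencyCycleError is redefined locally as a stand-in, and retries, timeouts, and fail_fast are omitted to keep the scheduling logic visible.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from typing import Callable, Dict, List


class DependencyCycleError(Exception):
    """Local stand-in for the error named in step 4."""


def run_graph(
    tasks: Dict[str, Callable[[], object]],
    depends_on: Dict[str, List[str]],
    max_workers: int = 4,
) -> Dict[str, str]:
    statuses: Dict[str, str] = {}  # task_id -> final status
    inflight = {}                  # future -> task_id
    pending = set(tasks)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while pending or inflight:
            progressed = False
            for task_id in sorted(pending):
                deps = depends_on.get(task_id, [])
                if any(statuses.get(d) in ("FAILED", "SKIPPED") for d in deps):
                    statuses[task_id] = "SKIPPED"  # step 2: failure propagation
                    pending.discard(task_id)
                    progressed = True
                elif all(d in statuses for d in deps) and len(inflight) < max_workers:
                    # step 1 passed: every dependency has a status; step 3: submit
                    inflight[pool.submit(tasks[task_id])] = task_id
                    pending.discard(task_id)
                    progressed = True
            if inflight:
                done, _ = wait(list(inflight), return_when=FIRST_COMPLETED)
                for fut in done:
                    finished = inflight.pop(fut)
                    statuses[finished] = "FAILED" if fut.exception() else "SUCCEEDED"
            elif not progressed:
                # step 4: nothing schedulable and nothing running
                raise DependencyCycleError(f"unschedulable tasks: {sorted(pending)}")
    return statuses
```

With tasks a and b having no dependencies, c depending on a, and d depending on b, a failure in b leaves c free to run while d is marked SKIPPED. Two tasks that depend on each other never become ready, so the loop raises DependencyCycleError on its first pass.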

State Object

The State dataclass represents a point-in-time state:

from dagy import running, completed, failed

state = running()                  # State(type="running")
state = completed()                # State(type="completed")
state = failed("timeout exceeded") # State(type="failed", message="timeout exceeded")

Each state includes an automatic UTC timestamp:

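from dataclasses import dataclass
from typing import Optional

@dataclass(frozen=True)
class State:
    type: str                      # "running", "completed", or "failed"
    message: Optional[str] = None  # Optional detail, such as an error message
    timestamp: str = ""            # Auto-set to UTC ISO timestamp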
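
A frozen dataclass cannot assign to its own fields in the usual way, so it is worth showing one way an empty timestamp default could be filled in automatically. The sketch below is an assumption about the mechanism, not Dagy's source: it redefines State locally, sets the timestamp in __post_init__ through object.__setattr__, and includes a stand-in for the failed() helper.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass(frozen=True)
class State:
    type: str
    message: Optional[str] = None
    timestamp: str = ""

    def __post_init__(self) -> None:
        # Frozen dataclasses block normal assignment, so bypass it once here.
        if not self.timestamp:
            now = datetime.now(timezone.utc).isoformat()
            object.__setattr__(self, "timestamp", now)


def failed(message: Optional[str] = None) -> State:
    """Stand-in for the failed() helper shown earlier."""
    return State(type="failed", message=message)


state = failed("timeout exceeded")
print(state.type, state.message, state.timestamp)
```

An explicitly supplied timestamp is kept as-is, which is useful when rebuilding a State from stored data.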

Lifecycle Hooks

Hooks are callables invoked at state transitions. Both @task and @flow decorators accept four hook lists:

Hook            Trigger                                   State
on_running      Before each execution attempt             running()
on_retry        After failure, before retry delay         failed("retrying after error: ...")
on_completion   After successful execution                completed()
on_failure      After final failure (retries exhausted)   failed(str(exc))

Hook Signature

A hook is any callable that accepts (RunContext, State):

from dagy import Hook, RunContext, State

def my_hook(context: RunContext, state: State) -> None:
    print(f"[{context.name}] attempt {context.attempt}: {state.type}")

RunContext

The RunContext provided to hooks contains:

from dataclasses import dataclass
from typing import Dict, Optional

@dataclass(frozen=True)
class RunContext:
    kind: str              # "task" or "flow"
    name: str              # Task or flow name
    attempt: int           # Current attempt (1-indexed)
    max_retries: int       # Maximum retry count
    parameters: Dict       # Runtime parameters
    run_id: Optional[str]  # Run identifier (when available)
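
To show how these fields might be used together, here is a sketch of a hook that emits one JSON line per transition. RunContext and State are redefined locally as stand-ins (with defaults added for convenience) so the example runs without Dagy; in real code they would be imported from dagy. format_event and log_hook are hypothetical names, and retries_left is a value derived here from attempt and max_retries, not a field Dagy provides.

```python
import json
from dataclasses import dataclass, field
from typing import Dict, Optional


# Local stand-ins mirroring the documented fields.
@dataclass(frozen=True)
class State:
    type: str
    message: Optional[str] = None
    timestamp: str = ""


@dataclass(frozen=True)
class RunContext:
    kind: str
    name: str
    attempt: int
    max_retries: int
    parameters: Dict = field(default_factory=dict)
    run_id: Optional[str] = None


def format_event(context: RunContext, state: State) -> str:
    """Render one hook invocation as a JSON line."""
    return json.dumps(
        {
            "kind": context.kind,
            "name": context.name,
            "attempt": context.attempt,
            # attempt is 1-indexed, so attempt 1 has used no retries yet
            "retries_left": max(context.max_retries - (context.attempt - 1), 0),
            "run_id": context.run_id,
            "state": state.type,
            "message": state.message,
        },
        sort_keys=True,
    )


def log_hook(context: RunContext, state: State) -> None:
    """Hook with the (RunContext, State) signature; prints one JSON line."""
    print(format_event(context, state))


ctx = RunContext(kind="task", name="extract", attempt=2, max_retries=3)
log_hook(ctx, State(type="failed", message="boom"))
```

Because log_hook only reads its arguments, the same function could be listed under any of the four hook lists.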

Example: Slack Notification on Failure

from dagy import task, RunContext, State

# send_slack_message is not defined here; supply your own Slack helper.
def notify_slack(context: RunContext, state: State) -> None:
    if state.type == "failed":
        send_slack_message(
            channel="#alerts",
            text=f"Task {context.name} failed after {context.attempt} attempts: {state.message}"
        )

@task(retries=3, on_failure=[notify_slack])
def critical_task():
    ...

Rich Task Run State (Flow Executor)

When using the Flow Executor backend (execution_mode set to in-process or task-isolated), each task run record in DynamoDB includes additional observability fields:

Field             Description
input_payload     Serialized inputs passed to the task function
output_payload    Serialized return value from the task (small payloads stored inline)
resolved_params   Final parameter values after resolution and validation
stack_trace       Full Python traceback on failure
retry_count       Number of retry attempts executed
artifact_refs     References to artifacts produced by the task
output_s3_uri     S3 URI where the task output is persisted
state_s3_uri      S3 URI for the full task state snapshot

Small payloads (under ~300 KB) are stored inline in DynamoDB for fast access. Larger payloads are stored in S3, with the URI recorded in the output_s3_uri field.
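
The inline-versus-S3 rule amounts to a size check on the serialized output. The following sketch only illustrates that decision: choose_storage and INLINE_LIMIT_BYTES are hypothetical names, JSON is assumed as the serialization format, and the cutoff is taken as exactly 300 KiB even though the text gives it only approximately.

```python
import json
from typing import Any, Tuple

# Assumed cutoff: the text says "under ~300 KB", so the exact value may differ.
INLINE_LIMIT_BYTES = 300 * 1024


def choose_storage(value: Any, limit: int = INLINE_LIMIT_BYTES) -> Tuple[str, int]:
    """Return ("inline" or "s3", serialized size in bytes) for a task output."""
    size = len(json.dumps(value).encode("utf-8"))
    return ("inline" if size < limit else "s3", size)


print(choose_storage({"rows": 3})[0])          # inline
print(choose_storage("x" * (400 * 1024))[0])   # s3
```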

Hook Execution Order

For a task with retries=2 whose third attempt succeeds, hooks fire in this order:

on_running  → (attempt 1 fails)
on_retry    → (delay)
on_running  → (attempt 2 fails)
on_retry    → (delay)
on_running  → (attempt 3 succeeds)
on_completion

If all attempts fail:

on_running  → (attempt 1 fails)
on_retry    → (delay)
on_running  → (attempt 2 fails)
on_retry    → (delay)
on_running  → (attempt 3 fails)
on_failure
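
The two sequences above can be reproduced with a short simulation. This is a sketch of the ordering only, assuming one initial attempt plus up to retries further attempts; simulate_hooks is a hypothetical helper that records hook names instead of calling real hooks, and it skips the retry delay.

```python
from typing import Callable, List


def simulate_hooks(fn: Callable[[int], None], retries: int) -> List[str]:
    """Return the hook names that would fire, in order, for one task."""
    fired: List[str] = []
    for attempt in range(1, retries + 2):
        fired.append("on_running")          # before each attempt
        try:
            fn(attempt)
        except Exception:
            if attempt <= retries:
                fired.append("on_retry")    # failure with retries remaining
            else:
                fired.append("on_failure")  # final failure
        else:
            fired.append("on_completion")
            break
    return fired


def succeeds_on_third(attempt: int) -> None:
    if attempt < 3:
        raise RuntimeError("transient")


def always_fails(attempt: int) -> None:
    raise RuntimeError("permanent")


print(simulate_hooks(succeeds_on_third, retries=2))
# ['on_running', 'on_retry', 'on_running', 'on_retry', 'on_running', 'on_completion']
print(simulate_hooks(always_fails, retries=2))
# ['on_running', 'on_retry', 'on_running', 'on_retry', 'on_running', 'on_failure']
```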

Local Executor Hooks

The local executor provides an additional set of observability hooks for run_local():

Hook              Signature                     Description
on_task_start     (task_id, attempt)            Task execution begins
on_task_success   (task_id, attempt)            Task succeeded
on_task_failure   (task_id, attempt, message)   Task failed (final)
on_task_retry     (task_id, attempt, message)   Task retrying
on_task_output    (task_id, attempt, line)      Captured stdout/stderr line

These hooks power the local run logging and event recording system. Run metadata is written to ~/.dagy/runs/<run_id>/metadata.json.
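
As an illustration of callbacks with these signatures, the sketch below collects events in memory. EventRecorder is a hypothetical class, the argument types (str, int, str) are assumptions, and how the callbacks are passed to run_local() is not covered here, so the example invokes them directly.

```python
from typing import List, Optional, Tuple

Event = Tuple[str, str, int, Optional[str]]


class EventRecorder:
    """Collects local-executor callbacks as (event, task_id, attempt, detail)."""

    def __init__(self) -> None:
        self.events: List[Event] = []

    def on_task_start(self, task_id: str, attempt: int) -> None:
        self.events.append(("start", task_id, attempt, None))

    def on_task_success(self, task_id: str, attempt: int) -> None:
        self.events.append(("success", task_id, attempt, None))

    def on_task_failure(self, task_id: str, attempt: int, message: str) -> None:
        self.events.append(("failure", task_id, attempt, message))

    def on_task_retry(self, task_id: str, attempt: int, message: str) -> None:
        self.events.append(("retry", task_id, attempt, message))

    def on_task_output(self, task_id: str, attempt: int, line: str) -> None:
        self.events.append(("output", task_id, attempt, line))

    def failures(self) -> List[str]:
        """Task IDs that reported a final failure."""
        return [task_id for event, task_id, _, _ in self.events if event == "failure"]


# Simulate the calls an executor might make for one task that fails twice.
recorder = EventRecorder()
recorder.on_task_start("load", 1)
recorder.on_task_retry("load", 1, "connection reset")
recorder.on_task_start("load", 2)
recorder.on_task_failure("load", 2, "connection reset")
print(recorder.failures())  # ['load']
```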

State Storage

Local (DuckDB)

run_local() tracks state in a local DuckDB database at ~/.dagy/dagy.duckdb:

  • create_run(): records flow run start
  • update_run(): updates status and end time
  • create_task_run(): records task start with attempt count
  • update_task_run(): updates task status, end time, and error

Remote (API)

The API stores run and task states in DynamoDB. State updates are processed asynchronously via SQS events (run_status_update). The API tracks additional fields like started_at, completed_at, and error_message.