States & Lifecycle
Every task and flow execution in Dagy transitions through a defined set of states. Lifecycle hooks let you react to these transitions for logging, notifications, and custom logic.
States
Run States
| State | Description |
|---|---|
| QUEUED | Run has been created and is waiting for execution |
| RUNNING | Run is actively executing |
| SUCCEEDED | All tasks completed successfully |
| FAILED | One or more tasks failed (after exhausting retries) |
| CANCELLED | Run was cancelled via API or user action |
Task States
| State | Description |
|---|---|
| RUNNING | Task is executing (or retrying) |
| SUCCEEDED | Task completed successfully |
| FAILED | Task failed after all retry attempts |
| SKIPPED | Task was skipped because an upstream dependency failed |
| TIMED_OUT | Task exceeded its timeout_seconds limit |
State Transitions
Flow Lifecycle
QUEUED → RUNNING → SUCCEEDED
                 → FAILED
                 → CANCELLED
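A flow moves to RUNNING when execution begins. It ends in SUCCEEDED if all tasks succeed. It ends in FAILED if any task fails: immediately when fail_fast is enabled, otherwise once all remaining tasks have been attempted. A run cancelled via the API or by a user ends in CANCELLED.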
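The flow diagram can be read as a small transition table. The sketch below is illustrative only: RUN_TRANSITIONS, can_transition, and is_terminal are hypothetical names, not part of Dagy, and the table assumes that FAILED and CANCELLED are reached from RUNNING and that only the arrows shown are legal.

```python
from typing import Dict, Set

# Allowed run-state transitions, transcribed from the flow lifecycle diagram.
# Assumption: only the arrows in the diagram are legal, so a direct
# QUEUED -> CANCELLED move is not modeled here.
RUN_TRANSITIONS: Dict[str, Set[str]] = {
    "QUEUED": {"RUNNING"},
    "RUNNING": {"SUCCEEDED", "FAILED", "CANCELLED"},
    "SUCCEEDED": set(),
    "FAILED": set(),
    "CANCELLED": set(),
}


def can_transition(current: str, target: str) -> bool:
    """Return True if the table allows moving from current to target."""
    return target in RUN_TRANSITIONS.get(current, set())


def is_terminal(state: str) -> bool:
    """Return True for known states that have no outgoing transitions."""
    return state in RUN_TRANSITIONS and not RUN_TRANSITIONS[state]
```

A table like this makes it cheap to reject an out-of-order status update before it is stored.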
Task Lifecycle
RUNNING → SUCCEEDED
        → FAILED → (retry) → RUNNING
        → TIMED_OUT → (retry) → RUNNING
SKIPPED (if upstream dependency failed)
Key behaviors:
- Retries: A failed task transitions back to RUNNING for each retry attempt, up to retries times
- Timeouts: A TaskTimeoutError is treated like any other failure and can trigger retries
- Skipping: If any upstream dependency is FAILED or SKIPPED, the task is marked SKIPPED without execution
- Fail-fast: When fail_fast=True (default), the first task failure cancels remaining inflight tasks and raises TaskFailedError
Dependency Resolution
The local executor processes tasks using a dependency-aware scheduling loop:
- Ready check: A task is ready when all its depends_on task IDs have a status in statuses
- Failure propagation: If any dependency is FAILED or SKIPPED, the task is immediately marked SKIPPED
- Parallel execution: Ready tasks are submitted to a ThreadPoolExecutor up to max_workers concurrency
- Cycle detection: If no tasks can be scheduled and none are inflight, a DependencyCycleError is raised
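The four steps above can be sketched as a single loop. This is a simplified illustration, not Dagy's executor: run_dag and its arguments are hypothetical, DependencyCycleError is a local stand-in for the library's exception, and retries, timeouts, and fail-fast are left out.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from typing import Callable, Dict, List


class DependencyCycleError(Exception):
    """Local stand-in for the Dagy exception of the same name."""


def run_dag(
    tasks: Dict[str, Callable[[], object]],
    depends_on: Dict[str, List[str]],
    max_workers: int = 4,
) -> Dict[str, str]:
    statuses: Dict[str, str] = {}  # task_id -> final status
    inflight = {}                  # Future -> task_id
    pending = set(tasks)

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while pending or inflight:
            progressed = False
            for task_id in sorted(pending):
                deps = depends_on.get(task_id, [])
                # Ready check: every dependency must have a recorded status.
                if not all(dep in statuses for dep in deps):
                    continue
                pending.discard(task_id)
                progressed = True
                # Failure propagation: skip without executing.
                if any(statuses[dep] in ("FAILED", "SKIPPED") for dep in deps):
                    statuses[task_id] = "SKIPPED"
                else:
                    # Parallel execution: the pool caps concurrency at max_workers.
                    inflight[pool.submit(tasks[task_id])] = task_id

            if inflight:
                done, _ = wait(list(inflight), return_when=FIRST_COMPLETED)
                for future in done:
                    finished = inflight.pop(future)
                    statuses[finished] = "FAILED" if future.exception() else "SUCCEEDED"
            elif pending and not progressed:
                # Cycle detection: nothing schedulable and nothing inflight.
                raise DependencyCycleError(f"unresolvable tasks: {sorted(pending)}")
    return statuses
```

In this sketch a dependency on an unknown task ID never receives a status, so it is reported through the same error path as a true cycle.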
State Object
The State dataclass represents a point-in-time state:
from dagy import running, completed, failed
state = running() # State(type="running")
state = completed() # State(type="completed")
state = failed("timeout exceeded") # State(type="failed", message="timeout exceeded")
Each state includes an automatic UTC timestamp:
@dataclass(frozen=True)
class State:
    type: str
    message: Optional[str] = None
    timestamp: str = ""  # Auto-set to UTC ISO timestamp
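One common way to auto-fill a field on a frozen dataclass is a __post_init__ method that writes through object.__setattr__. The sketch below assumes that approach. It re-declares State and failed locally for illustration and is not taken from Dagy's source.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass(frozen=True)
class State:
    type: str
    message: Optional[str] = None
    timestamp: str = ""

    def __post_init__(self) -> None:
        # Frozen dataclasses reject normal attribute assignment, so the
        # default timestamp is filled in through object.__setattr__.
        if not self.timestamp:
            now = datetime.now(timezone.utc).isoformat()
            object.__setattr__(self, "timestamp", now)


def failed(message: Optional[str] = None) -> State:
    """Hypothetical equivalent of the failed() helper shown earlier."""
    return State(type="failed", message=message)


state = failed("timeout exceeded")
print(state.type, state.message, state.timestamp)
```

Because the instance is frozen, hooks that receive a State cannot mutate it by accident.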
Lifecycle Hooks
Hooks are callables invoked at state transitions. Both @task and @flow decorators accept four hook lists:
| Hook | Trigger | State |
|---|---|---|
| on_running | Before each execution attempt | running() |
| on_retry | After failure, before retry delay | failed("retrying after error: ...") |
| on_completion | After successful execution | completed() |
| on_failure | After final failure (retries exhausted) | failed(str(exc)) |
Hook Signature
A hook is any callable that accepts (RunContext, State):
from dagy import Hook, RunContext, State

def my_hook(context: RunContext, state: State) -> None:
    print(f"[{context.name}] attempt {context.attempt}: {state.type}")
RunContext
The RunContext provided to hooks contains:
@dataclass(frozen=True)
class RunContext:
    kind: str               # "task" or "flow"
    name: str               # Task or flow name
    attempt: int            # Current attempt (1-indexed)
    max_retries: int        # Maximum retry count
    parameters: Dict        # Runtime parameters
    run_id: Optional[str]   # Run identifier (when available)
Example: Slack Notification on Failure
from dagy import task, RunContext, State

# send_slack_message is a placeholder for your own Slack client call.
def notify_slack(context: RunContext, state: State) -> None:
    if state.type == "failed":
        send_slack_message(
            channel="#alerts",
            text=f"Task {context.name} failed after {context.attempt} attempts: {state.message}"
        )

@task(retries=3, on_failure=[notify_slack])
def critical_task():
    ...
Rich Task Run State (Flow Executor)
When using the Flow Executor backend (execution_mode set to in-process or task-isolated), each task run record in DynamoDB includes additional observability fields:
| Field | Description |
|---|---|
| input_payload | Serialized inputs passed to the task function |
| output_payload | Serialized return value from the task (small payloads stored inline) |
| resolved_params | Final parameter values after resolution and validation |
| stack_trace | Full Python traceback on failure |
| retry_count | Number of retry attempts executed |
| artifact_refs | References to artifacts produced by the task |
| output_s3_uri | S3 URI where the task output is persisted |
| state_s3_uri | S3 URI for the full task state snapshot |
Small payloads (under ~300 KB) are stored inline in DynamoDB for fast access. Larger payloads are stored in S3, with the URI recorded in the output_s3_uri field.
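The inline-versus-S3 split can be pictured as a size check on the serialized output. The sketch below is an assumption-heavy illustration: build_output_fields, INLINE_LIMIT_BYTES, the bucket name, and the key layout are all hypothetical, the exact threshold Dagy uses is not stated beyond "~300 KB", and the upload itself is omitted.

```python
import json
from typing import Any, Dict

# Approximates the "~300 KB" threshold; the real cutoff is an assumption.
INLINE_LIMIT_BYTES = 300 * 1024


def build_output_fields(
    run_id: str,
    task_id: str,
    output: Any,
    bucket: str = "example-dagy-bucket",  # hypothetical bucket name
) -> Dict[str, str]:
    """Decide which task-run field should carry the serialized output."""
    body = json.dumps(output)
    if len(body.encode("utf-8")) < INLINE_LIMIT_BYTES:
        # Small payload: keep it inline in the task run record.
        return {"output_payload": body}
    # Large payload: record only a reference. The key layout is invented
    # for this example, and no upload is performed here.
    return {"output_s3_uri": f"s3://{bucket}/runs/{run_id}/{task_id}/output.json"}
```

Measuring the encoded byte length rather than the character count matters because non-ASCII output can be several bytes per character.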
Hook Execution Order
For a task with retries=2 (three attempts in total) that succeeds on its last attempt, hooks fire in this order:
on_running → (attempt 1 fails)
on_retry → (delay)
on_running → (attempt 2 fails)
on_retry → (delay)
on_running → (attempt 3 succeeds)
on_completion
If all attempts fail:
on_running → (attempt 1 fails)
on_retry → (delay)
on_running → (attempt 2 fails)
on_retry → (delay)
on_running → (attempt 3 fails)
on_failure
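Both orderings can be reproduced with a small retry loop. This is a sketch rather than Dagy's implementation: run_with_hooks, record, and retry_delay are hypothetical names, and each hook call is replaced by appending the hook name and attempt number to a list.

```python
import time
from typing import Callable, List, Tuple

Event = Tuple[str, int]  # (hook name, attempt number)


def run_with_hooks(
    fn: Callable[[], object],
    retries: int,
    record: List[Event],
    retry_delay: float = 0.0,
) -> object:
    """Run fn with up to `retries` retries, recording which hook would fire."""
    attempt = 1
    while True:
        record.append(("on_running", attempt))
        try:
            result = fn()
        except Exception:
            if attempt > retries:
                # Retries exhausted: final failure, then re-raise.
                record.append(("on_failure", attempt))
                raise
            # on_retry fires after the failure and before the delay.
            record.append(("on_retry", attempt))
            time.sleep(retry_delay)
            attempt += 1
        else:
            record.append(("on_completion", attempt))
            return result


def always_fails() -> None:
    raise RuntimeError("boom")


events: List[Event] = []
try:
    run_with_hooks(always_fails, retries=2, record=events)
except RuntimeError:
    pass
print([name for name, _ in events])
```

Note that on_failure fires once, on the last attempt only. Earlier failures are reported through on_retry.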
Local Executor Hooks
The local executor provides an additional set of observability hooks for run_local():
| Hook | Signature | Description |
|---|---|---|
| on_task_start | (task_id, attempt) | Task execution begins |
| on_task_success | (task_id, attempt) | Task succeeded |
| on_task_failure | (task_id, attempt, message) | Task failed (final) |
| on_task_retry | (task_id, attempt, message) | Task retrying |
| on_task_output | (task_id, attempt, line) | Captured stdout/stderr line |
These hooks power the local run logging and event recording system. Run metadata is written to ~/.dagy/runs/<run_id>/metadata.json.
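As an illustration of the signatures in the table, the sketch below collects events in memory. EventRecorder is a hypothetical class, task_id is assumed to be a string, and how these callables are registered with run_local() is not covered in this section, so the usage calls them directly.

```python
from typing import Dict, List


class EventRecorder:
    """Collects local-executor events in memory (illustrative only)."""

    def __init__(self) -> None:
        self.events: List[Dict[str, object]] = []

    def _add(self, event: str, task_id: str, attempt: int, **extra: object) -> None:
        self.events.append({"event": event, "task_id": task_id, "attempt": attempt, **extra})

    def on_task_start(self, task_id: str, attempt: int) -> None:
        self._add("start", task_id, attempt)

    def on_task_success(self, task_id: str, attempt: int) -> None:
        self._add("success", task_id, attempt)

    def on_task_failure(self, task_id: str, attempt: int, message: str) -> None:
        self._add("failure", task_id, attempt, message=message)

    def on_task_retry(self, task_id: str, attempt: int, message: str) -> None:
        self._add("retry", task_id, attempt, message=message)

    def on_task_output(self, task_id: str, attempt: int, line: str) -> None:
        self._add("output", task_id, attempt, line=line)


# Direct calls stand in for what the executor would do during a run.
recorder = EventRecorder()
recorder.on_task_start("extract", 1)
recorder.on_task_output("extract", 1, "fetched 10 rows")
recorder.on_task_retry("extract", 1, "connection reset")
recorder.on_task_start("extract", 2)
recorder.on_task_success("extract", 2)
```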
State Storage
Local (DuckDB)
run_local() tracks state in a local DuckDB database at ~/.dagy/dagy.duckdb:
- create_run(): records flow run start
- update_run(): updates status and end time
- create_task_run(): records task start with attempt count
- update_task_run(): updates task status, end time, and error
Remote (API)
The API stores run and task states in DynamoDB. State updates are processed asynchronously via SQS events (run_status_update). The API tracks additional fields like started_at, completed_at, and error_message.