Back to examples
Hooks

Task Hooks

Lifecycle hooks (on_running, on_completion, on_failure) attached to individual tasks.

When to use

Use task hooks when you need to observe, log, or react to state changes of specific tasks; e.g. send a notification when a critical task fails.

hooks/task-hooks.py
"""
01_task_hooks.py: Lifecycle hooks at the task level.

Demonstrates:
- on_running, on_completion, on_failure hooks on @task
- Hook signature: Callable[[RunContext, State], None]
- Accessing RunContext.name, .attempt, .parameters inside a hook
"""

from dagy import Hook, RunContext, State, flow, task

_attempt_tracker: dict[str, int] = {}


def on_running_hook(ctx: RunContext, state: State) -> None:
    print(f"  [HOOK running]    {ctx.name} attempt={ctx.attempt} params={ctx.parameters}")


def on_completion_hook(ctx: RunContext, state: State) -> None:
    print(f"  [HOOK completed]  {ctx.name} attempt={ctx.attempt}")


def on_failure_hook(ctx: RunContext, state: State) -> None:
    print(f"  [HOOK failed]     {ctx.name} attempt={ctx.attempt} msg={state.message}")


@task(
    retries=2,
    retry_delay_seconds=0.02,
    on_running=[on_running_hook],
    on_completion=[on_completion_hook],
    on_failure=[on_failure_hook],
)
def unreliable_task(value: int) -> int:
    _attempt_tracker["unreliable"] = _attempt_tracker.get("unreliable", 0) + 1
    n = _attempt_tracker["unreliable"]
    if n < 2:
        raise ValueError(f"Not ready on attempt {n}")
    return value * 2


@flow(name="task_hooks_flow")
def task_hooks_flow(value: int = 7) -> None:
    unreliable_task(value)


if __name__ == "__main__":
    result = task_hooks_flow.run_local(value=7)
    print(f"Run ID : {result.run_id}")
    print(f"Status : {result.status}")

How it works

  1. Define hook functions with the signature `(ctx: RunContext, state: State) -> None`.
  2. Attach them to `@task` via `on_running`, `on_completion`, `on_failure` lists.
  3. `RunContext` gives access to `name`, `attempt`, `parameters`, and `run_id`.
  4. `State` provides the current state `type` and optional `message` (for failures).
  5. The example task retries twice, so you see the failure hook fire, then the completion hook on success.

Inputs

  • `value` (int, default `7`): input value to double.

Outputs

  • The doubled value, with hook messages printed to stdout.

Dependencies

  • `dagy`, `RunContext`, `State`, `Hook`