Hooks
Task Hooks
Lifecycle hooks (on_running, on_completion, on_failure) attached to individual tasks.
When to use
Use task hooks when you need to observe, log, or react to state changes of specific tasks, for example to send a notification when a critical task fails.
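As a rough illustration of the notification case, the sketch below builds a failure hook from an injectable sender. The names `make_failure_notifier` and `send` are hypothetical, and the `SimpleNamespace` objects are stand-ins that only mimic the `name`, `attempt`, and `message` attributes used on this page, so the snippet runs without `dagy` installed.

```python
from types import SimpleNamespace
from typing import Any, Callable


def make_failure_notifier(send: Callable[[str], None]) -> Callable[[Any, Any], None]:
    """Build a hook with the (ctx, state) -> None shape used on this page.

    `send` is a hypothetical delivery function (email, chat webhook, pager).
    """

    def notify(ctx: Any, state: Any) -> None:
        # Only attributes shown in this example are read: name, attempt, message.
        send(f"Task {ctx.name!r} failed on attempt {ctx.attempt}: {state.message}")

    return notify


# Stand-ins for RunContext and State (assumption: not dagy's real classes).
sent: list = []
hook = make_failure_notifier(sent.append)
hook(
    SimpleNamespace(name="critical_task", attempt=1),
    SimpleNamespace(message="Not ready on attempt 1"),
)
print(sent[0])
```

In a real flow the returned function would presumably be placed in the list passed as `on_failure=[...]`, in the same way as the hooks in the listing on this page.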
hooks/task-hooks.py
"""
01_task_hooks.py: Lifecycle hooks at the task level.

Demonstrates:
- on_running, on_completion, on_failure hooks on @task
- Hook signature: Callable[[RunContext, State], None]
- Accessing RunContext.name, .attempt, .parameters inside a hook
"""
from dagy import Hook, RunContext, State, flow, task

_attempt_tracker: dict[str, int] = {}


def on_running_hook(ctx: RunContext, state: State) -> None:
    print(f" [HOOK running] {ctx.name} attempt={ctx.attempt} params={ctx.parameters}")


def on_completion_hook(ctx: RunContext, state: State) -> None:
    print(f" [HOOK completed] {ctx.name} attempt={ctx.attempt}")


def on_failure_hook(ctx: RunContext, state: State) -> None:
    print(f" [HOOK failed] {ctx.name} attempt={ctx.attempt} msg={state.message}")


@task(
    retries=2,
    retry_delay_seconds=0.02,
    on_running=[on_running_hook],
    on_completion=[on_completion_hook],
    on_failure=[on_failure_hook],
)
def unreliable_task(value: int) -> int:
    _attempt_tracker["unreliable"] = _attempt_tracker.get("unreliable", 0) + 1
    n = _attempt_tracker["unreliable"]
    if n < 2:
        raise ValueError(f"Not ready on attempt {n}")
    return value * 2


@flow(name="task_hooks_flow")
def task_hooks_flow(value: int = 7) -> None:
    unreliable_task(value)


if __name__ == "__main__":
    result = task_hooks_flow.run_local(value=7)
    print(f"Run ID : {result.run_id}")
    print(f"Status : {result.status}")

How it works
- Define hook functions with the signature `(ctx: RunContext, state: State) -> None`.
- Attach them to `@task` via `on_running`, `on_completion`, `on_failure` lists.
- `RunContext` gives access to `name`, `attempt`, `parameters`, and `run_id`.
- `State` provides the current state `type` and optional `message` (for failures).
- The example task allows up to two retries (`retries=2`) but raises only on its first attempt, so you see the failure hook fire once, then the completion hook when the second attempt succeeds.
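The firing order described above can be reproduced with a toy runner in plain Python. This is a simplified model under stated assumptions, not `dagy`'s implementation: `Ctx`, `St`, `run_with_hooks`, and the state type strings are hypothetical, and firing the failure hook on every failed attempt follows the description on this page rather than confirmed library behaviour.

```python
from dataclasses import dataclass, field
from typing import Callable, Optional


@dataclass
class Ctx:  # hypothetical stand-in for RunContext
    name: str
    attempt: int
    parameters: dict = field(default_factory=dict)


@dataclass
class St:  # hypothetical stand-in for State
    type: str
    message: Optional[str] = None


HookFn = Callable[[Ctx, St], None]


def run_with_hooks(fn, params, retries, on_running, on_completion, on_failure):
    """Toy runner: call fn up to retries + 1 times, firing hooks per attempt."""
    for attempt in range(1, retries + 2):
        ctx = Ctx(fn.__name__, attempt, params)
        for hook in on_running:
            hook(ctx, St("RUNNING"))
        try:
            result = fn(**params)
        except Exception as exc:
            # Assumption: the failure hook fires on each failed attempt.
            for hook in on_failure:
                hook(ctx, St("FAILED", str(exc)))
            continue
        for hook in on_completion:
            hook(ctx, St("COMPLETED"))
        return result
    return None  # all attempts failed; a real runner would surface the error


events: list = []
calls = {"n": 0}


def unreliable(value: int) -> int:
    # Mirrors the example task: fail on the first call, succeed afterwards.
    calls["n"] += 1
    if calls["n"] < 2:
        raise ValueError(f"Not ready on attempt {calls['n']}")
    return value * 2


def record(ctx: Ctx, state: St) -> None:
    events.append(f"{state.type}:{ctx.attempt}")


result = run_with_hooks(unreliable, {"value": 7}, 2, [record], [record], [record])
print(events)
print(result)
```

Under these assumptions the recorded order is running, failed, running, completed, and the returned value is 14. The third permitted attempt is never used because the second one succeeds.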
Inputs
- `value` (int, default `7`): input value to double.
Outputs
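- `unreliable_task` returns the doubled value (`14` for the default input); the flow itself returns `None`.
- Hook messages, the run ID, and the run status are printed to stdout.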
Dependencies
`dagy` (provides `Hook`, `RunContext`, `State`, `flow`, and `task`)