Hooks
Flow Hooks
Lifecycle hooks at the flow level; fire once for the entire pipeline run.
When to use
Use flow hooks when you need a single notification or action at the start/end of the entire pipeline, rather than per-task.
hooks/flow-hooks.py
```python
"""
02_flow_hooks.py: Lifecycle hooks at the flow level.

Demonstrates:
- on_running, on_completion, on_failure hooks on @flow
- Flow-level hooks fire once for the entire pipeline run
"""

from dagy import RunContext, State, flow, task


def flow_started(ctx: RunContext, state: State) -> None:
    print(f" [FLOW running] '{ctx.name}' run_id={ctx.run_id} params={ctx.parameters}")


def flow_done(ctx: RunContext, state: State) -> None:
    print(f" [FLOW completed] '{ctx.name}' run_id={ctx.run_id}")


def flow_failed(ctx: RunContext, state: State) -> None:
    print(f" [FLOW failed] '{ctx.name}' run_id={ctx.run_id} msg={state.message}")


@task
def step_one(n: int) -> int:
    return n + 1


@task
def step_two(n: int) -> int:
    return n * 10


@flow(
    name="flow_hooks_flow",
    on_running=[flow_started],
    on_completion=[flow_done],
    on_failure=[flow_failed],
)
def flow_hooks_flow(n: int = 5) -> None:
    a = step_one(n)
    step_two(a)


if __name__ == "__main__":
    result = flow_hooks_flow.run_local(n=5)
    print(f"Run ID : {result.run_id}")
    print(f"Status : {result.status}")
```
How it works
- Define each flow-level hook as a function that takes `(RunContext, State)` and returns `None`; all three hooks share this signature.
- Attach to `@flow` via `on_running`, `on_completion`, `on_failure`.
- `flow_started` fires once when the pipeline begins.
- `flow_done` fires once when all tasks complete successfully.
- `flow_failed` fires if any task causes the pipeline to fail; the failure message is available as `state.message`.
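The firing order described above can be sketched with a small stand-in runner. This is not dagy's implementation: `DemoContext`, `DemoState`, `run_with_flow_hooks`, and the `"completed"`/`"failed"` return strings are hypothetical names chosen for illustration, and the runner simply calls plain functions in sequence. It only shows the once-per-run behaviour: `on_running` fires before any task, then exactly one of `on_completion` or `on_failure` fires.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class DemoContext:
    """Hypothetical stand-in for RunContext (only the fields used here)."""
    name: str
    run_id: str


@dataclass
class DemoState:
    """Hypothetical stand-in for State."""
    message: Optional[str] = None


Hook = Callable[[DemoContext, DemoState], None]


def run_with_flow_hooks(
    ctx: DemoContext,
    tasks: List[Callable[[int], int]],
    value: int,
    on_running: List[Hook],
    on_completion: List[Hook],
    on_failure: List[Hook],
) -> str:
    """Run tasks in order, firing each flow-level hook list at most once."""
    for hook in on_running:  # once, before any task runs
        hook(ctx, DemoState())
    try:
        for task_fn in tasks:
            value = task_fn(value)
    except Exception as exc:
        for hook in on_failure:  # once, when the first task raises
            hook(ctx, DemoState(message=str(exc)))
        return "failed"
    for hook in on_completion:  # once, after every task has succeeded
        hook(ctx, DemoState())
    return "completed"


def boom(n: int) -> int:
    """A task that always fails, used to trigger the failure hooks."""
    raise ValueError("bad input")


events: List[str] = []
ctx = DemoContext(name="flow_hooks_flow", run_id="demo-run")
hooks = dict(
    on_running=[lambda c, s: events.append("running")],
    on_completion=[lambda c, s: events.append("completed")],
    on_failure=[lambda c, s: events.append(f"failed: {s.message}")],
)

ok = run_with_flow_hooks(ctx, [lambda n: n + 1, lambda n: n * 10], 5, **hooks)
bad = run_with_flow_hooks(ctx, [lambda n: n + 1, boom], 5, **hooks)
print(ok, bad, events)
# completed failed ['running', 'completed', 'running', 'failed: bad input']
```

Each run records one `running` event followed by a single terminal event, regardless of how many tasks the run contains.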
Inputs
- `n` (int, default `5`): input number.
Outputs
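- Hook messages printed by `flow_started` and `flow_done` (or `flow_failed` if the run fails), followed by the run ID and status printed from the result of `run_local`. The flow function itself returns `None`.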
Dependencies
`dagy` (provides `RunContext`, `State`, `flow`, and `task`)