Back to examples
Hooks

Flow Hooks

Lifecycle hooks at the flow level; fire once for the entire pipeline run.

When to use

Use flow hooks when you need a single notification or action at the start/end of the entire pipeline, rather than per-task.

hooks/flow-hooks.py
"""
02_flow_hooks.py: Lifecycle hooks at the flow level.

Demonstrates:
- on_running, on_completion, on_failure hooks on @flow
- Flow-level hooks fire once for the entire pipeline run
"""

from dagy import RunContext, State, flow, task


def flow_started(ctx: RunContext, state: State) -> None:
    print(f"  [FLOW running]   '{ctx.name}' run_id={ctx.run_id} params={ctx.parameters}")


def flow_done(ctx: RunContext, state: State) -> None:
    print(f"  [FLOW completed] '{ctx.name}' run_id={ctx.run_id}")


def flow_failed(ctx: RunContext, state: State) -> None:
    print(f"  [FLOW failed]    '{ctx.name}' run_id={ctx.run_id} msg={state.message}")


@task
def step_one(n: int) -> int:
    return n + 1


@task
def step_two(n: int) -> int:
    return n * 10


@flow(
    name="flow_hooks_flow",
    on_running=[flow_started],
    on_completion=[flow_done],
    on_failure=[flow_failed],
)
def flow_hooks_flow(n: int = 5) -> None:
    a = step_one(n)
    step_two(a)


if __name__ == "__main__":
    result = flow_hooks_flow.run_local(n=5)
    print(f"Run ID : {result.run_id}")
    print(f"Status : {result.status}")

How it works

  1. Define flow-level hooks with the same `(RunContext, State)` signature.
  2. Attach to `@flow` via `on_running`, `on_completion`, `on_failure`.
  3. `flow_started` fires once when the pipeline begins.
  4. `flow_done` fires once when all tasks complete successfully.
  5. `flow_failed` fires if any task causes the pipeline to fail.

Inputs

  • `n` (int, default `5`): input number.

Outputs

  • The computed result, with flow-level hook messages.

Dependencies

  • `dagy`, `RunContext`, `State`