Audit Hooks

Reusable hook factory pattern for consistent audit logging across tasks.

When to use

Use this pattern when multiple tasks need the same hook behaviour. A factory function avoids duplication and lets you inject context via closures.

hooks/audit-hooks.py
"""
03_audit_hooks.py: Reusable hook factory applied to multiple tasks.

Demonstrates:
- make_log_hook(label) factory that returns a Hook
- The same logging behaviour shared across several tasks without duplication
- Hook closures capturing extra context
"""

import datetime

from dagy import RunContext, State, flow, task


def make_log_hook(label: str):
    """Return a Hook that prints a structured audit line tagged with `label`."""
    def hook(ctx: RunContext, state: State) -> None:
        ts = datetime.datetime.now(datetime.timezone.utc).strftime("%H:%M:%S")
        print(f"  [{ts}] {label:12s} | task={ctx.name:20s} | state={state.type:10s} | attempt={ctx.attempt}")
    return hook


# Shared hooks
_on_start = make_log_hook("AUDIT:start")
_on_end = make_log_hook("AUDIT:end")
_on_fail = make_log_hook("AUDIT:fail")


@task(on_running=[_on_start], on_completion=[_on_end], on_failure=[_on_fail])
def extract(source: str) -> list:
    return [{"id": i, "src": source} for i in range(5)]


@task(on_running=[_on_start], on_completion=[_on_end], on_failure=[_on_fail])
def transform_records(records: list) -> list:
    return [{**r, "processed": True} for r in records]


@task(on_running=[_on_start], on_completion=[_on_end], on_failure=[_on_fail])
def load(records: list) -> int:
    return len(records)


@flow(name="audit_hooks_flow")
def audit_hooks_flow(source: str = "db") -> None:
    raw = extract(source)
    processed = transform_records(raw)
    load(processed)


if __name__ == "__main__":
    result = audit_hooks_flow.run_local(source="db")
    print(f"Run ID : {result.run_id}")
    print(f"Status : {result.status}")

How it works

  1. `make_log_hook(label)` is a factory that returns a hook function.
  2. The returned hook captures `label` via closure and prints a structured audit line.
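  3. The same `_on_start`, `_on_end`, and `_on_fail` hooks are attached to all three tasks through the `on_running`, `on_completion`, and `on_failure` arguments of `@task`.
  4. Each task (`extract`, `transform_records`, `load`) therefore gets identical audit logging without repeating the logging code.
  5. Each audit line records a UTC timestamp (`HH:MM:SS`), the label, the task name, the state type, and the attempt number for the lifecycle event that fired the hook.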
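
The closure mechanics in the steps above can be tried without a scheduler. The sketch below is a standalone illustration, not part of the example file. `FakeContext` and `FakeState` are hypothetical stand-ins for the fields the hook reads from `RunContext` and `State`. `make_audit_hook`, the `pipeline` argument, the `sink` list, and the state names `"RUNNING"` and `"COMPLETED"` are also assumptions made for illustration. It shows a factory capturing more context than just the label.

```python
import datetime
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class FakeContext:
    """Hypothetical stand-in for RunContext (only the fields the hook reads)."""
    name: str
    attempt: int = 1


@dataclass
class FakeState:
    """Hypothetical stand-in for State."""
    type: str


def make_audit_hook(
    label: str, pipeline: str, sink: List[str]
) -> Callable[[FakeContext, FakeState], None]:
    """Return a hook that captures label, pipeline, and sink in its closure."""
    def hook(ctx: FakeContext, state: FakeState) -> None:
        ts = datetime.datetime.now(datetime.timezone.utc).strftime("%H:%M:%S")
        # Append instead of printing so the audit trail can be inspected later.
        sink.append(
            f"[{ts}] {label} | pipeline={pipeline} | task={ctx.name} "
            f"| state={state.type} | attempt={ctx.attempt}"
        )
    return hook


audit_log: List[str] = []
on_start = make_audit_hook("AUDIT:start", "nightly", audit_log)
on_end = make_audit_hook("AUDIT:end", "nightly", audit_log)

# Call the hooks by hand, the way a runner would at each lifecycle event.
on_start(FakeContext(name="extract"), FakeState(type="RUNNING"))
on_end(FakeContext(name="extract"), FakeState(type="COMPLETED"))

for line in audit_log:
    print(line)
```

Both hooks write to the same `audit_log` list because each closure holds a reference to it, while each keeps its own `label`. Whether such a hook can be passed to `@task` unchanged depends on the hook signature the library expects.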

Inputs

  • `source` (str, default `"db"`): data source name.

Outputs

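  • `load` returns the count of loaded records, but `audit_hooks_flow` is annotated `-> None` and does not return it. The audit trail is printed to stdout, followed by the run ID and status.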
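
To see how a single audit line is laid out, the format string from `make_log_hook` can be exercised on its own. This is a minimal sketch: `format_audit_line` is a hypothetical helper, and the timestamp and the state name `"RUNNING"` are made-up sample values, not output captured from a real run.

```python
def format_audit_line(ts: str, label: str, task_name: str, state_type: str, attempt: int) -> str:
    """Build one audit line using the same field widths as make_log_hook."""
    return (
        f"  [{ts}] {label:12s} | task={task_name:20s} "
        f"| state={state_type:10s} | attempt={attempt}"
    )


# Sample values only; a real run supplies these from the hook arguments.
line = format_audit_line("12:00:00", "AUDIT:start", "extract", "RUNNING", 1)
print(repr(line))
```

The widths (`12s`, `20s`, `10s`) left-align and pad each field with spaces, so the `|` separators line up across tasks. Values longer than the width are not truncated, so a long task name shifts the columns on that line.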

Dependencies

  • `dagy` (provides `RunContext`, `State`, `flow`, and `task`), `datetime` (stdlib)