Hooks
Audit Hooks
Reusable hook factory pattern for consistent audit logging across tasks.
When to use
Use this pattern when multiple tasks need the same hook behaviour. A factory function defines the hook body once and lets each returned hook carry its own context (such as a label) through a closure.
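Why use a factory function rather than defining hooks inline in a loop? Python looks up a closed-over variable when the function is called, not when it is defined. A lambda created in a loop therefore sees the loop variable's final value, while each call to a factory function gets its own scope. The minimal plain-Python sketch below has no `dagy` imports. `make_label_hook` is a hypothetical simplification that returns its label instead of logging it.

```python
from typing import Callable, List

labels = ["AUDIT:start", "AUDIT:end", "AUDIT:fail"]


def make_label_hook(label: str) -> Callable[[], str]:
    # Each call creates a fresh scope, so `hook` keeps this call's `label`.
    def hook() -> str:
        return label
    return hook


# Pitfall: all three lambdas read the same variable `label`,
# which holds "AUDIT:fail" once the loop has finished.
loop_hooks: List[Callable[[], str]] = []
for label in labels:
    loop_hooks.append(lambda: label)

# Factory: one independent closure per label.
factory_hooks = [make_label_hook(lbl) for lbl in labels]

print([h() for h in loop_hooks])     # ['AUDIT:fail', 'AUDIT:fail', 'AUDIT:fail']
print([h() for h in factory_hooks])  # ['AUDIT:start', 'AUDIT:end', 'AUDIT:fail']
```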
hooks/audit-hooks.py
"""
03_audit_hooks.py: Reusable hook factory applied to multiple tasks.
Demonstrates:
- make_log_hook(label) factory that returns a Hook
- The same logging behaviour shared across several tasks without duplication
- Hook closures capturing extra context
"""
import datetime

from dagy import RunContext, State, flow, task


def make_log_hook(label: str):
    """Return a Hook that prints a structured audit line tagged with `label`."""
    def hook(ctx: RunContext, state: State) -> None:
        # `label` comes from the enclosing make_log_hook call (closure); time is UTC.
        ts = datetime.datetime.now(datetime.timezone.utc).strftime("%H:%M:%S")
        print(f" [{ts}] {label:12s} | task={ctx.name:20s} | state={state.type:10s} | attempt={ctx.attempt}")
    return hook


# Shared hooks: each is built once and attached to every task below
_on_start = make_log_hook("AUDIT:start")
_on_end = make_log_hook("AUDIT:end")
_on_fail = make_log_hook("AUDIT:fail")


@task(on_running=[_on_start], on_completion=[_on_end], on_failure=[_on_fail])
def extract(source: str) -> list:
    # Five sample records tagged with the source name
    return [{"id": i, "src": source} for i in range(5)]


@task(on_running=[_on_start], on_completion=[_on_end], on_failure=[_on_fail])
def transform_records(records: list) -> list:
    # Copy each record and add a "processed" flag
    return [{**r, "processed": True} for r in records]


@task(on_running=[_on_start], on_completion=[_on_end], on_failure=[_on_fail])
def load(records: list) -> int:
    return len(records)


@flow(name="audit_hooks_flow")
def audit_hooks_flow(source: str = "db") -> None:
    raw = extract(source)
    processed = transform_records(raw)
    load(processed)


if __name__ == "__main__":
    result = audit_hooks_flow.run_local(source="db")
    print(f"Run ID : {result.run_id}")
    print(f"Status : {result.status}")
How it works
- `make_log_hook(label)` is a factory that returns a hook function.
- The returned hook captures `label` via closure and prints a structured audit line.
- The same `_on_start`, `_on_end`, `_on_fail` hooks are reused on all three tasks.
- Each task (`extract`, `transform_records`, `load`) gets consistent audit logging.
- For every lifecycle event that has a hook attached (running, completion, failure), the audit line records the UTC time, task name, state type, and attempt number.
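You can mimic the lifecycle described above without `dagy` to see which hook fires when. In this sketch, `toy_task` is a hypothetical decorator written only for illustration, not `dagy.task`. It calls the `on_running` hooks before the function runs. It then calls the `on_completion` hooks or the `on_failure` hooks, depending on whether the function raised. The following are all assumptions:

- the state names (`RUNNING`, `COMPLETED`, `FAILED`)
- the fixed `attempt=1`
- the extra `sink` parameter on the factory

dagy's real hook ordering, retry handling, and `State` objects may differ.

```python
import functools
from types import SimpleNamespace
from typing import Any, Callable, Dict, List, Sequence

Hook = Callable[[Any, Any], None]


def make_log_hook(label: str, sink: List[str]) -> Hook:
    # Like the example's factory, but the closure also captures `sink`
    # (a hypothetical extra parameter) and appends to it instead of printing.
    def hook(ctx: Any, state: Any) -> None:
        sink.append(f"{label} | task={ctx.name} | state={state.type} | attempt={ctx.attempt}")
    return hook


def toy_task(on_running: Sequence[Hook] = (), on_completion: Sequence[Hook] = (),
             on_failure: Sequence[Hook] = ()):
    """Hypothetical stand-in for a hook-aware task decorator; not dagy's implementation."""
    def decorate(fn):
        @functools.wraps(fn)
        def run(*args, **kwargs):
            ctx = SimpleNamespace(name=fn.__name__, attempt=1)  # no retries in this toy
            for h in on_running:
                h(ctx, SimpleNamespace(type="RUNNING"))
            try:
                result = fn(*args, **kwargs)
            except Exception:
                for h in on_failure:
                    h(ctx, SimpleNamespace(type="FAILED"))
                raise
            for h in on_completion:
                h(ctx, SimpleNamespace(type="COMPLETED"))
            return result
        return run
    return decorate


trail: List[str] = []

# Build the hook lists once, then unpack them into every decorator call.
AUDIT_HOOKS: Dict[str, List[Hook]] = {
    "on_running": [make_log_hook("AUDIT:start", trail)],
    "on_completion": [make_log_hook("AUDIT:end", trail)],
    "on_failure": [make_log_hook("AUDIT:fail", trail)],
}


@toy_task(**AUDIT_HOOKS)
def extract(source: str) -> list:
    return [{"id": i, "src": source} for i in range(5)]


@toy_task(**AUDIT_HOOKS)
def load(records: list) -> int:
    if not records:
        raise ValueError("nothing to load")  # forces the failure path
    return len(records)


count = load(extract("db"))
try:
    load([])
except ValueError:
    pass

for line in trail:
    print(line)
```

The sketch collects the three hook lists in one dict and passes them with `**AUDIT_HOOKS`, which is plain Python keyword unpacking. The example's `@task(...)` decorators take the hooks as keyword arguments, so the same approach should work there to avoid repeating the hook lists on every task.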
Inputs
- `source` (str, default `"db"`): data source name.
Outputs
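- An audit trail on stdout, one line per hooked lifecycle event. `load` returns the count of loaded records, but `audit_hooks_flow` does not pass it on (its return type is `None`), so the script prints only the run ID and status.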
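The hook reads only `ctx.name`, `ctx.attempt`, and `state.type`, so you can check its stdout audit line without running a flow. This sketch reuses the example's hook body, with the dagy type hints dropped. It captures the output with `contextlib.redirect_stdout`. In place of `RunContext` and `State`, it passes `SimpleNamespace` stand-ins. This is an assumption: the real objects carry more attributes, and `state.type` may be an enum whose formatting differs from a plain string.

```python
import contextlib
import datetime
import io
from types import SimpleNamespace


def make_log_hook(label: str):
    # Same body as the example's factory; dagy type hints dropped so it runs standalone.
    def hook(ctx, state) -> None:
        ts = datetime.datetime.now(datetime.timezone.utc).strftime("%H:%M:%S")
        print(f" [{ts}] {label:12s} | task={ctx.name:20s} | state={state.type:10s} | attempt={ctx.attempt}")
    return hook


on_fail = make_log_hook("AUDIT:fail")

# Hypothetical stand-ins exposing only the attributes the hook reads.
ctx = SimpleNamespace(name="load", attempt=2)
state = SimpleNamespace(type="FAILED")

buf = io.StringIO()
with contextlib.redirect_stdout(buf):
    on_fail(ctx, state)

# Split the pipe-separated audit line into fields and drop the padding.
fields = [part.strip() for part in buf.getvalue().strip().split("|")]
print(fields)
```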
Dependencies
`dagy` (`RunContext`, `State`, `flow`, `task`), `datetime` (stdlib)