Basics
Logging
Capture print() and Python logging output as structured task events.
When to use
Use this when you need visibility into what your tasks print and log. DAGY captures both `print()` and `logging.*` calls and stores them as `task_log` events.
basics/logging.py
"""
05_logging.py: Verify that print() and logging are fully captured.

Demonstrates:
  - print() output captured via thread-local stdout intercept
  - logging.debug / .info / .warning / .error captured via root log handler
  - Both sources end up as task_log events in events.jsonl (and task.log)

After running locally you can inspect the captured output at:
  ~/.dagy/runs/<run_id>/events.jsonl                  ← all events for the run
  ~/.dagy/runs/<run_id>/task_runs/<id>/events.jsonl   ← per-task events

Three-stage pipeline:
  generate_dataset → validate_records → summarise_results
"""

import logging
import random
import time

from dagy import flow, task

logger = logging.getLogger(__name__)


# ─── Tasks ────────────────────────────────────────────────────────────────────

@task
def generate_dataset(n: int) -> list[dict]:
    """Step 1: produce a synthetic dataset and log its creation."""
    print(f"[generate_dataset] generating {n} records")
    records = [{"id": i, "value": i * 3, "valid": i % 5 != 0} for i in range(1, n + 1)]
    time.sleep(random.uniform(0.5, 5.0))
    logger.debug("raw records: %s", records)
    logger.info("generated %d records (%d valid)", n, sum(r["valid"] for r in records))
    print(f"[generate_dataset] done - {len(records)} records ready")
    return records


@task
def validate_records(records: list[dict]) -> dict:
    """Step 2: split records into valid/invalid and warn on each rejected one."""
    print(f"[validate_records] validating {len(records)} records")
    time.sleep(random.uniform(0.3, 5.0))
    valid, invalid = [], []
    for r in records:
        if r["valid"]:
            valid.append(r)
        else:
            invalid.append(r)
            logger.warning("rejected record id=%d (value=%d)", r["id"], r["value"])
    logger.info("validation complete: %d accepted, %d rejected", len(valid), len(invalid))
    print(f"[validate_records] accepted={len(valid)} rejected={len(invalid)}")
    return {"valid": valid, "invalid": invalid, "total": len(records)}


@task
def summarise_results(report: dict, threshold: float) -> str:
    """Step 3: compute pass-rate and emit an error log if it falls below the threshold."""
    valid_count = len(report["valid"])
    total = report["total"]
    pass_rate = valid_count / total if total else 0.0
    print(f"[summarise_results] pass_rate={pass_rate:.1%} threshold={threshold:.1%}")
    time.sleep(random.uniform(0.2, 5.0))
    logger.debug("full report: %s", report)
    if pass_rate < threshold:
        logger.error(
            "pass rate %.1f%% is below threshold %.1f%%",
            pass_rate * 100,
            threshold * 100,
        )
    else:
        logger.info("pass rate %.1f%% meets threshold %.1f%%", pass_rate * 100, threshold * 100)
    summary = (
        f"total={total} valid={valid_count} "
        f"invalid={len(report['invalid'])} pass_rate={pass_rate:.1%}"
    )
    print(f"[summarise_results] {summary}")
    return summary


# ─── Flow ─────────────────────────────────────────────────────────────────────

@flow(name="logging_test")
def logging_test_flow(n: int = 12, threshold: float = 0.75) -> None:
    """Pipeline whose only purpose is to exercise the log capture system."""
    records = generate_dataset(n)
    report = validate_records(records)
    summarise_results(report, threshold)


# ─── Entry point ──────────────────────────────────────────────────────────────

if __name__ == "__main__":
    # Lower the root logger level so that debug/info/warning/error all flow
    # through the DAGY capture handler and appear in events.jsonl.
    logging.basicConfig(level=logging.DEBUG, format="%(levelname)s %(name)s: %(message)s")
    result = logging_test_flow.deploy(
        name="logging-test",
        schedule="*/5 * * * *",
        timezone="Asia/Kolkata",
    )
    # LocalRunContext (no API configured) exposes run_id and status directly.
    # _DeployResult (remote API) exposes flow_name / deployment_name.
    if hasattr(result, "run_id"):
        print(f"\nrun_id : {result.run_id}")
        print(f"status : {result.status}")
        print("\nLog files written to:")
        print(f" ~/.dagy/runs/{result.run_id}/events.jsonl")
        print(f" ~/.dagy/runs/{result.run_id}/run.log")
    else:
        print(f"Deployed : {result.flow_name}:{result.flow_version}")
        print(f"Deployment: {result.deployment_name}")

How it works
- `generate_dataset` creates synthetic records and logs via both `print()` and `logging`.
- `validate_records` splits records into valid/invalid, warning on each rejection.
- `summarise_results` computes a pass rate and emits an error log if below threshold.
- DAGY intercepts stdout via a thread-local redirect and attaches a root log handler.
- All captured output is written as `task_log` events to the run-level `events.jsonl` and to each task run's own `events.jsonl`.
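To make the capture mechanism above more concrete, here is a minimal stdlib-only sketch of the general technique: a thread-local stdout proxy plus a handler on the root logger. It is not DAGY's actual implementation. The names `ThreadLocalStdout`, `ListHandler`, `run_captured`, and `demo_task`, as well as the event fields `source`, `level`, and `message`, are assumptions made up for illustration.

```python
import contextlib
import logging
import sys
import threading


class ThreadLocalStdout:
    """Stand-in for sys.stdout that routes writes to a per-thread sink (hypothetical)."""

    def __init__(self, fallback):
        self._fallback = fallback        # the real stdout, used when nothing is capturing
        self._local = threading.local()  # holds the sink for the current thread only

    @contextlib.contextmanager
    def capture(self, sink):
        """Send this thread's print() output to `sink` for the duration of the block."""
        self._local.sink = sink
        try:
            yield
        finally:
            del self._local.sink

    def write(self, text):
        sink = getattr(self._local, "sink", None)
        if sink is None:
            return self._fallback.write(text)
        if text.strip():  # print() writes the trailing newline separately; skip it
            sink.append({"type": "task_log", "source": "print", "message": text.rstrip("\n")})
        return len(text)

    def flush(self):
        self._fallback.flush()


class ListHandler(logging.Handler):
    """Root-logger handler that appends each record to a list (hypothetical)."""

    def __init__(self, sink):
        super().__init__(level=logging.DEBUG)
        self._sink = sink

    def emit(self, record):
        self._sink.append({
            "type": "task_log",
            "source": "logging",
            "level": record.levelname,
            "message": record.getMessage(),
        })


def run_captured(fn, *args):
    """Run fn(*args) and return (result, captured events)."""
    events = []
    proxy = ThreadLocalStdout(sys.stdout)
    handler = ListHandler(events)
    root = logging.getLogger()
    old_stdout, old_level = sys.stdout, root.level
    sys.stdout = proxy
    root.addHandler(handler)
    root.setLevel(logging.DEBUG)
    try:
        with proxy.capture(events):
            result = fn(*args)
    finally:
        # Always restore global state, even if the task raised.
        root.removeHandler(handler)
        root.setLevel(old_level)
        sys.stdout = old_stdout
    return result, events


def demo_task(n):
    print(f"generating {n} records")
    logging.getLogger("demo").warning("rejected record id=%d", 5)
    return n


result, events = run_captured(demo_task, 3)
```

Because the sink lives in `threading.local()`, output from other threads that have no active capture still reaches the real stdout, which is the main reason to prefer this over a plain global redirect.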
Inputs
- `n` (int, default `12`): number of records to generate.
- `threshold` (float, default `0.75`): minimum pass rate; `summarise_results` logs an error when the computed rate falls below it.
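As a quick check of how the two inputs interact, the sketch below reproduces the example's validity rule (a record is invalid when its id is a multiple of 5) without any DAGY dependency. The helper name `expected_pass_rate` is hypothetical and exists only for this illustration.

```python
def expected_pass_rate(n: int) -> float:
    """Fraction of ids in 1..n that are not multiples of 5 (the example's validity rule)."""
    if n <= 0:
        return 0.0
    valid = sum(1 for i in range(1, n + 1) if i % 5 != 0)
    return valid / n


rate = expected_pass_rate(12)  # ids 5 and 10 are rejected, so 10 of 12 pass
print(f"{rate:.1%}")           # 83.3%
```

With the defaults, 83.3% is above the 0.75 threshold, so the run ends with an info log rather than an error. Raising `threshold` above that rate (for example to `0.9`) would exercise the error path instead.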
Outputs
- A summary string returned by `summarise_results` with the total, valid, and invalid counts and the pass rate. The flow itself returns `None`.
- Log events written to `~/.dagy/runs/<run_id>/events.jsonl`.
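One way to inspect the captured output afterwards is to read the JSON Lines file and keep only the log events. The sketch below assumes each line is a JSON object with a `type` field whose value is `task_log` for captured output; that field name, the `message` field, the helper `read_task_logs`, and the sample lines are all assumptions for illustration, so check a real `events.jsonl` for the actual schema.

```python
import json
import tempfile
from pathlib import Path


def read_task_logs(events_path: Path) -> list[dict]:
    """Return the events whose (assumed) `type` field equals "task_log"."""
    logs = []
    with events_path.open(encoding="utf-8") as fh:
        for line in fh:
            line = line.strip()
            if not line:
                continue  # tolerate blank lines
            event = json.loads(line)
            if event.get("type") == "task_log":
                logs.append(event)
    return logs


# Made-up sample file standing in for ~/.dagy/runs/<run_id>/events.jsonl.
with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "events.jsonl"
    sample.write_text(
        '{"type": "task_started", "task": "validate_records"}\n'
        '{"type": "task_log", "message": "rejected record id=5 (value=15)"}\n',
        encoding="utf-8",
    )
    logs = read_task_logs(sample)
```

For a real run, pass the path to that run's `events.jsonl` instead of the temporary sample file.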
Dependencies
`dagy`, `logging` (stdlib), `random` (stdlib), `time` (stdlib)