
Logging

Capture print() and Python logging output as structured task events.

When to use

Use this when you need visibility into what your tasks print and log. DAGY captures both `print()` output and `logging.*` calls and stores each one as a `task_log` event.
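To make "stores them as `task_log` events" concrete, here is a single-threaded, stdlib-only sketch of the general technique. A handler on the root logger turns log records into event dicts, and a stdout replacement turns printed lines into the same kind of dict. This is not DAGY's code: the event keys (`type`, `source`, `level`, `message`) and the names `EventHandler`, `StdoutToEvents`, and `run_captured` are hypothetical.

```python
import json
import logging
from contextlib import redirect_stdout

# Hypothetical event shape, for illustration only (not DAGY's real schema).
events: list[dict] = []


class EventHandler(logging.Handler):
    """Root-level handler that turns each LogRecord into a task_log-style dict."""

    def emit(self, record: logging.LogRecord) -> None:
        events.append({
            "type": "task_log",
            "source": "logging",
            "level": record.levelname,
            "message": record.getMessage(),  # applies the %-style args
        })


class StdoutToEvents:
    """File-like object that turns each completed line of print() output into an event."""

    def __init__(self) -> None:
        self._buffer = ""

    def write(self, text: str) -> int:
        self._buffer += text
        # Emit one event per complete line; keep any trailing partial line buffered.
        while "\n" in self._buffer:
            line, self._buffer = self._buffer.split("\n", 1)
            events.append({"type": "task_log", "source": "stdout", "level": None, "message": line})
        return len(text)

    def flush(self) -> None:
        pass

    def finish(self) -> None:
        # Emit whatever was printed without a final newline.
        if self._buffer:
            events.append({"type": "task_log", "source": "stdout", "level": None, "message": self._buffer})
            self._buffer = ""


def run_captured(fn, *args):
    """Hypothetical helper: run fn with both print() and logging captured as events."""
    root = logging.getLogger()
    handler = EventHandler(level=logging.DEBUG)
    saved_level = root.level
    root.addHandler(handler)
    root.setLevel(logging.DEBUG)
    writer = StdoutToEvents()
    try:
        with redirect_stdout(writer):
            return fn(*args)
    finally:
        writer.finish()
        root.removeHandler(handler)
        root.setLevel(saved_level)


logger = logging.getLogger("demo")


def tiny_task(n: int) -> int:
    print(f"generating {n} records")
    logger.info("generated %d records", n)
    return n


run_captured(tiny_task, 3)
for event in events:
    print(json.dumps(event))
```

Because loggers created with `logging.getLogger(__name__)` propagate to the root logger by default, one root handler sees records from every module without per-module setup.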

basics/logging.py
"""
logging.py: Verify that print() and logging are fully captured.

Demonstrates:
- print() output captured via thread-local stdout intercept
- logging.debug / .info / .warning / .error captured via root log handler
- Both sources end up as task_log events in events.jsonl (and task.log)

After running locally you can inspect the captured output at:
  ~/.dagy/runs/<run_id>/events.jsonl        ← all events for the run
  ~/.dagy/runs/<run_id>/task_runs/<id>/events.jsonl  ← per-task events

Three-stage pipeline:
  generate_dataset → validate_records → summarise_results
"""

import logging
import random
import time

from dagy import flow, task

logger = logging.getLogger(__name__)


# ─── Tasks ────────────────────────────────────────────────────────────────────


@task
def generate_dataset(n: int) -> list[dict]:
    """Step 1: produce a synthetic dataset and log its creation."""
    print(f"[generate_dataset] generating {n} records")

    records = [{"id": i, "value": i * 3, "valid": i % 5 != 0} for i in range(1, n + 1)]
    time.sleep(random.uniform(0.5, 5.0))

    logger.debug("raw records: %s", records)
    logger.info("generated %d records (%d valid)", n, sum(r["valid"] for r in records))

    print(f"[generate_dataset] done - {len(records)} records ready")
    return records


@task
def validate_records(records: list[dict]) -> dict:
    """Step 2: split records into valid/invalid and warn on each rejected one."""
    print(f"[validate_records] validating {len(records)} records")
    time.sleep(random.uniform(0.3, 5.0))

    valid, invalid = [], []
    for r in records:
        if r["valid"]:
            valid.append(r)
        else:
            invalid.append(r)
            logger.warning("rejected record id=%d (value=%d)", r["id"], r["value"])

    logger.info("validation complete: %d accepted, %d rejected", len(valid), len(invalid))
    print(f"[validate_records] accepted={len(valid)}  rejected={len(invalid)}")

    return {"valid": valid, "invalid": invalid, "total": len(records)}


@task
def summarise_results(report: dict, threshold: float) -> str:
    """Step 3: compute pass-rate and emit an error log if it falls below the threshold."""
    valid_count = len(report["valid"])
    total = report["total"]
    pass_rate = valid_count / total if total else 0.0

    print(f"[summarise_results] pass_rate={pass_rate:.1%}  threshold={threshold:.1%}")
    time.sleep(random.uniform(0.2, 5.0))

    logger.debug("full report: %s", report)

    if pass_rate < threshold:
        logger.error(
            "pass rate %.1f%% is below threshold %.1f%%",
            pass_rate * 100,
            threshold * 100,
        )
    else:
        logger.info("pass rate %.1f%% meets threshold %.1f%%", pass_rate * 100, threshold * 100)

    summary = (
        f"total={total}  valid={valid_count}  "
        f"invalid={len(report['invalid'])}  pass_rate={pass_rate:.1%}"
    )
    print(f"[summarise_results] {summary}")
    return summary


# ─── Flow ─────────────────────────────────────────────────────────────────────


@flow(name="logging_test")
def logging_test_flow(n: int = 12, threshold: float = 0.75) -> None:
    """Pipeline whose only purpose is to exercise the log capture system."""
    records = generate_dataset(n)
    report = validate_records(records)
    summarise_results(report, threshold)


# ─── Entry point ──────────────────────────────────────────────────────────────


if __name__ == "__main__":
    # Lower the root logger level so that debug/info/warning/error all flow
    # through the DAGY capture handler and appear in events.jsonl.
    logging.basicConfig(level=logging.DEBUG, format="%(levelname)s %(name)s: %(message)s")

    result = logging_test_flow.deploy(
        name="logging-test",
        schedule="*/5 * * * *",
        timezone="Asia/Kolkata",
    )

    # LocalRunContext (no API configured) exposes run_id and status directly.
    # _DeployResult (remote API) exposes flow_name, flow_version and deployment_name.
    if hasattr(result, "run_id"):
        print(f"\nrun_id : {result.run_id}")
        print(f"status : {result.status}")
        print("\nLog files written to:")
        print(f"  ~/.dagy/runs/{result.run_id}/events.jsonl")
        print(f"  ~/.dagy/runs/{result.run_id}/run.log")
    else:
        print(f"Deployed : {result.flow_name}:{result.flow_version}")
        print(f"Deployment: {result.deployment_name}")
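The `basicConfig(level=logging.DEBUG)` call matters because the root logger's default level is `WARNING`. The stdlib-only sketch below shows which records from a module-level logger reach a root handler at each level. It assumes DAGY's capture handler behaves like any other root handler and only sees records that pass the logger's effective level; `ListHandler` and `levels_reaching_root` are illustrative names.

```python
import logging


class ListHandler(logging.Handler):
    """Collects the level name of every record that reaches it."""

    def __init__(self) -> None:
        super().__init__(level=logging.NOTSET)
        self.levels: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.levels.append(record.levelname)


def levels_reaching_root(root_level: int) -> list[str]:
    root = logging.getLogger()
    handler = ListHandler()
    saved_level = root.level
    root.addHandler(handler)
    root.setLevel(root_level)
    try:
        # Like the module logger in logging.py: no level of its own (NOTSET),
        # so it inherits the root's level and propagates records to the root.
        logger = logging.getLogger("levels_demo")
        logger.debug("debug")
        logger.info("info")
        logger.warning("warning")
        logger.error("error")
    finally:
        root.removeHandler(handler)
        root.setLevel(saved_level)
    return handler.levels


print(levels_reaching_root(logging.WARNING))  # ['WARNING', 'ERROR']
print(levels_reaching_root(logging.DEBUG))    # ['DEBUG', 'INFO', 'WARNING', 'ERROR']
```

One caveat: `logging.basicConfig` does nothing, including setting the level, if the root logger already has handlers, unless `force=True` is passed. `logging.getLogger().setLevel(logging.DEBUG)` lowers the level unconditionally.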

How it works

  1. `generate_dataset` creates synthetic records and logs via both `print()` and `logging`.
  2. `validate_records` splits records into valid/invalid, warning on each rejection.
  3. `summarise_results` computes a pass rate and emits an error log if it falls below `threshold` (an info log otherwise).
  4. DAGY intercepts stdout with a thread-local redirect and attaches a handler to the root logger.
  5. Both sources are stored as `task_log` events in the run's `events.jsonl` and in each task run's own `events.jsonl`.
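The stdout intercept in step 4 is described as thread-local. The reason is that `sys.stdout` is a single process-wide object, so a plain swap such as `contextlib.redirect_stdout` would mix the output of tasks running in parallel threads. A minimal stdlib sketch of per-thread routing, assuming tasks run in separate threads; `ThreadLocalStdout` and `fake_task` are hypothetical names, not DAGY's implementation:

```python
import sys
import threading


class ThreadLocalStdout:
    """Stand-in for sys.stdout that sends each thread's writes to its own buffer."""

    def __init__(self, fallback) -> None:
        self._fallback = fallback        # the real stdout, used when not capturing
        self._local = threading.local()  # holds a separate .buffer per thread

    def start_capture(self) -> None:
        self._local.buffer = []

    def stop_capture(self) -> str:
        text = "".join(getattr(self._local, "buffer", None) or [])
        self._local.buffer = None
        return text

    def write(self, text: str) -> int:
        buffer = getattr(self._local, "buffer", None)
        if buffer is None:               # this thread is not capturing
            return self._fallback.write(text)
        buffer.append(text)
        return len(text)

    def flush(self) -> None:
        self._fallback.flush()


captured: dict[str, str] = {}
barrier = threading.Barrier(2)


def fake_task(name: str, proxy: ThreadLocalStdout) -> None:
    proxy.start_capture()
    barrier.wait()                       # make both threads print concurrently
    for i in range(3):
        print(f"[{name}] line {i}")
    captured[name] = proxy.stop_capture()


original_stdout = sys.stdout
proxy = ThreadLocalStdout(original_stdout)
sys.stdout = proxy                       # one process-wide swap, per-thread routing
try:
    threads = [threading.Thread(target=fake_task, args=(name, proxy)) for name in ("task_a", "task_b")]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
finally:
    sys.stdout = original_stdout

print(captured["task_a"], end="")
print(captured["task_b"], end="")
```

Each thread's buffer ends up holding only its own three lines, even though both threads wrote through the same `sys.stdout` object at the same time.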

Inputs

  • `n` (int, default `12`): number of records to generate.
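  • `threshold` (float, default `0.75`): minimum acceptable pass rate, as a fraction between 0 and 1; a lower pass rate makes `summarise_results` log an error.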
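The pass rate for a given `n` follows directly from the validity rule in `generate_dataset` (every record whose id is a multiple of 5 is invalid). The sketch below reproduces that arithmetic without DAGY or the random sleeps; `pass_rate_for` is an illustrative helper, not part of the example.

```python
def pass_rate_for(n: int) -> float:
    """Pass rate produced by the example's validity rule for n records."""
    records = [{"id": i, "value": i * 3, "valid": i % 5 != 0} for i in range(1, n + 1)]
    valid_count = sum(r["valid"] for r in records)
    return valid_count / len(records) if records else 0.0


n, threshold = 12, 0.75                     # the flow's default inputs
rate = pass_rate_for(n)
invalid_ids = [i for i in range(1, n + 1) if i % 5 == 0]
print(invalid_ids)                                         # [5, 10]
print(f"pass_rate={rate:.1%}  threshold={threshold:.1%}")  # pass_rate=83.3%  threshold=75.0%
print("logger.error" if rate < threshold else "logger.info")  # logger.info

# Lowest pass rate reachable by changing n alone: at most one record in five is invalid.
lowest = min(pass_rate_for(k) for k in range(1, 101))
print(lowest)                                              # 0.8
```

Since the pass rate never drops below 0.8, the `logger.error` branch is not reached with the default threshold of 0.75. To exercise it, pass a threshold above 0.8, for example `logging_test_flow(n=12, threshold=0.9)`, where 83.3% falls below 90%.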

Outputs

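  • `summarise_results` returns (and prints) a summary string with the total, valid, and invalid counts and the pass rate; the flow itself returns `None`.
  • Log events written to `~/.dagy/runs/<run_id>/events.jsonl` (all events for the run) and `~/.dagy/runs/<run_id>/task_runs/<id>/events.jsonl` (per-task events).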
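Judging by its `.jsonl` extension, `events.jsonl` is presumably a JSON Lines file (one JSON object per line), so it can be inspected with the stdlib `json` module. The field names below (`type` with value `"task_log"`, and `message`) are assumptions for illustration, and the sample events are made up; check a real `events.jsonl` from your run for DAGY's actual schema before relying on them. `task_log_messages` is a hypothetical helper.

```python
import json
import tempfile
from pathlib import Path


def task_log_messages(events_path: Path) -> list[str]:
    """Return the message of every task_log event in a JSON Lines file."""
    messages = []
    with events_path.open(encoding="utf-8") as fh:
        for line in fh:
            line = line.strip()
            if not line:
                continue  # tolerate blank lines
            event = json.loads(line)
            if event.get("type") == "task_log":
                messages.append(event.get("message", ""))
    return messages


# Self-contained demo with made-up events in the assumed schema.
sample = [
    {"type": "other", "task": "generate_dataset"},
    {"type": "task_log", "task": "generate_dataset", "message": "[generate_dataset] generating 12 records"},
    {"type": "task_log", "task": "validate_records", "message": "rejected record id=5 (value=15)"},
]
with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "events.jsonl"
    path.write_text("\n".join(json.dumps(e) for e in sample) + "\n", encoding="utf-8")
    messages = task_log_messages(path)

print(messages)
```

Against a real run, the same function would be pointed at `Path.home() / ".dagy" / "runs" / run_id / "events.jsonl"`, with `run_id` taken from the value the script prints.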

Dependencies

  • `dagy`, `logging` (stdlib), `random` (stdlib), `time` (stdlib)