Report Generation

Aggregate three data sources and render a markdown report.

When to use

Use this for periodic business reporting. It demonstrates fan-out to independent data sources, fan-in aggregation, and rendering a human-readable artifact.

real-world/report-generation.py
"""
03_report_generation.py: Aggregate three data sources, render a markdown report.

Demonstrates:
- Fan-out to three independent data sources
- Fan-in into a single aggregator
- Rendering a human-readable markdown report as a text artifact
"""

from dagy import LocalArtifact, flow, task


@task
def fetch_sales() -> dict:
    return {
        "region": "EMEA",
        "total": 142_500.0,
        "orders": 320,
        "top_product": "Widget Pro",
    }


@task
def fetch_support() -> dict:
    return {
        "tickets_opened": 87,
        "tickets_closed": 79,
        "avg_resolution_hours": 4.2,
        "csat_score": 4.7,
    }


@task
def fetch_infra() -> dict:
    return {
        "uptime_pct": 99.97,
        "deployments": 12,
        "incidents": 1,
        "p99_latency_ms": 134,
    }


@task
def render_report(sources: list) -> LocalArtifact:
    """Combine all three data sources into a markdown report."""
    sales, support, infra = sources

    lines = [
        "# Weekly Business Report",
        "",
        "## Sales",
        f"- Region: {sales['region']}",
        f"- Total revenue: ${sales['total']:,.2f}",
        f"- Orders: {sales['orders']}",
        f"- Top product: {sales['top_product']}",
        "",
        "## Support",
        f"- Tickets opened / closed: {support['tickets_opened']} / {support['tickets_closed']}",
        f"- Avg resolution: {support['avg_resolution_hours']}h",
        f"- CSAT: {support['csat_score']}/5.0",
        "",
        "## Infrastructure",
        f"- Uptime: {infra['uptime_pct']}%",
        f"- Deployments: {infra['deployments']}",
        f"- Incidents: {infra['incidents']}",
        f"- P99 latency: {infra['p99_latency_ms']}ms",
    ]

    content = "\n".join(lines)
    return LocalArtifact(type="text", value=content, filename="weekly_report.md")


@flow(name="report_generation_flow")
def report_generation_flow() -> None:
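    # Fan-out: the three fetches do not depend on one another.
    sales = fetch_sales()
    support = fetch_support()
    infra = fetch_infra()

    # Fan-in: the list order must match the unpacking order in render_report.
    render_report([sales, support, infra])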


if __name__ == "__main__":
    result = report_generation_flow.run_local(max_workers=3)
    print(f"Run ID : {result.run_id}")
    print(f"Status : {result.status}")

How it works

  1. Three independent tasks fetch data: `fetch_sales`, `fetch_support`, and `fetch_infra`.
  2. Because none of them depends on another, all three can run in parallel (`max_workers=3`).
  3. `render_report` receives all three results as a single list.
  4. It unpacks the list positionally into `sales`, `support`, and `infra` dicts, so the list order in the flow must match that unpacking order.
  5. A markdown report is assembled line by line and returned as a text artifact.
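
The fan-out and fan-in steps above can be sketched in plain Python with the standard library's `concurrent.futures`. This is only an illustration of the pattern, not how `dagy` schedules tasks internally; `gather_sources` is a hypothetical helper, and the fetchers return trimmed-down versions of the sample data.

```python
from concurrent.futures import ThreadPoolExecutor


def fetch_sales() -> dict:
    return {"region": "EMEA", "total": 142_500.0}


def fetch_support() -> dict:
    return {"tickets_opened": 87, "tickets_closed": 79}


def fetch_infra() -> dict:
    return {"uptime_pct": 99.97, "incidents": 1}


def gather_sources(max_workers: int = 3) -> list:
    """Fan out to the three fetchers, then fan in to an ordered list."""
    fetchers = [fetch_sales, fetch_support, fetch_infra]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Fan-out: submit every fetcher before waiting on any result.
        futures = [pool.submit(fn) for fn in fetchers]
        # Fan-in: collect in submission order so positional unpacking is safe.
        return [future.result() for future in futures]


if __name__ == "__main__":
    sales, support, infra = gather_sources()
    print(sales["region"], support["tickets_opened"], infra["incidents"])
```

Collecting results in submission order, rather than completion order, is what keeps the positional unpacking in step 4 reliable regardless of which fetch finishes first.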

Inputs

  • None; the sample data is hard-coded in the three fetch tasks.

Outputs

  • A `weekly_report.md` text artifact with Sales, Support, and Infrastructure sections.
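
To preview the artifact's content without running the flow, the same f-strings can be applied to the sample values in plain Python. This is a sketch that leaves `dagy` out entirely; `render_markdown` is a hypothetical stand-in for the body of `render_report`.

```python
def render_markdown(sales: dict, support: dict, infra: dict) -> str:
    """Build the report text using the same lines as render_report."""
    lines = [
        "# Weekly Business Report",
        "",
        "## Sales",
        f"- Region: {sales['region']}",
        f"- Total revenue: ${sales['total']:,.2f}",
        f"- Orders: {sales['orders']}",
        f"- Top product: {sales['top_product']}",
        "",
        "## Support",
        f"- Tickets opened / closed: {support['tickets_opened']} / {support['tickets_closed']}",
        f"- Avg resolution: {support['avg_resolution_hours']}h",
        f"- CSAT: {support['csat_score']}/5.0",
        "",
        "## Infrastructure",
        f"- Uptime: {infra['uptime_pct']}%",
        f"- Deployments: {infra['deployments']}",
        f"- Incidents: {infra['incidents']}",
        f"- P99 latency: {infra['p99_latency_ms']}ms",
    ]
    return "\n".join(lines)


# Sample values copied from the three fetch tasks in the example.
report = render_markdown(
    {"region": "EMEA", "total": 142_500.0, "orders": 320, "top_product": "Widget Pro"},
    {"tickets_opened": 87, "tickets_closed": 79, "avg_resolution_hours": 4.2, "csat_score": 4.7},
    {"uptime_pct": 99.97, "deployments": 12, "incidents": 1, "p99_latency_ms": 134},
)

if __name__ == "__main__":
    print(report)
```

With the sample data, the `:,.2f` format spec renders the revenue line as `- Total revenue: $142,500.00`, and the report is 18 lines long with no trailing newline.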

Dependencies

  • `dagy` (provides `LocalArtifact`, `flow`, and `task`)