Report Generation
Aggregate three data sources and render a markdown report.
When to use
Use this for periodic business reporting. It demonstrates fan-out to independent data sources, fan-in aggregation, and rendering a human-readable artifact.
real-world/report-generation.py
"""
03_report_generation.py: Aggregate three data sources, render a markdown report.
Demonstrates:
- Fan-out to three independent data sources
- Fan-in into a single aggregator
- Rendering a human-readable markdown report as a text artifact
"""

from dagy import LocalArtifact, flow, task


@task
def fetch_sales() -> dict:
    return {
        "region": "EMEA",
        "total": 142_500.0,
        "orders": 320,
        "top_product": "Widget Pro",
    }


@task
def fetch_support() -> dict:
    return {
        "tickets_opened": 87,
        "tickets_closed": 79,
        "avg_resolution_hours": 4.2,
        "csat_score": 4.7,
    }


@task
def fetch_infra() -> dict:
    return {
        "uptime_pct": 99.97,
        "deployments": 12,
        "incidents": 1,
        "p99_latency_ms": 134,
    }


@task
def render_report(sources: list) -> LocalArtifact:
    """Combine all three data sources into a markdown report."""
    sales, support, infra = sources
    lines = [
        "# Weekly Business Report",
        "",
        "## Sales",
        f"- Region: {sales['region']}",
        f"- Total revenue: ${sales['total']:,.2f}",
        f"- Orders: {sales['orders']}",
        f"- Top product: {sales['top_product']}",
        "",
        "## Support",
        f"- Tickets opened / closed: {support['tickets_opened']} / {support['tickets_closed']}",
        f"- Avg resolution: {support['avg_resolution_hours']}h",
        f"- CSAT: {support['csat_score']}/5.0",
        "",
        "## Infrastructure",
        f"- Uptime: {infra['uptime_pct']}%",
        f"- Deployments: {infra['deployments']}",
        f"- Incidents: {infra['incidents']}",
        f"- P99 latency: {infra['p99_latency_ms']}ms",
    ]
    content = "\n".join(lines)
    return LocalArtifact(type="text", value=content, filename="weekly_report.md")


@flow(name="report_generation_flow")
def report_generation_flow() -> None:
    sales = fetch_sales()
    support = fetch_support()
    infra = fetch_infra()
    render_report([sales, support, infra])


if __name__ == "__main__":
    result = report_generation_flow.run_local(max_workers=3)
    print(f"Run ID : {result.run_id}")
    print(f"Status : {result.status}")

How it works
- Three independent tasks fetch data: `fetch_sales`, `fetch_support`, `fetch_infra`.
- None of the three depends on another, so they can run in parallel; `max_workers=3` allows one worker per task.
- `render_report` receives all three results as a list.
- It unpacks the list into the `sales`, `support`, and `infra` dicts, so the list passed in the flow must keep that order.
- A markdown report is assembled line-by-line and returned as a text artifact.
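
The fan-out and fan-in steps above can be sketched without `dagy`, using only the standard library. This is an illustration of the pattern, not how `dagy` schedules tasks internally: `run_report` is a hypothetical helper, the fetch functions return a trimmed subset of the sample data, and `ThreadPoolExecutor` stands in for whatever executor `run_local` uses.

```python
from concurrent.futures import ThreadPoolExecutor


def fetch_sales() -> dict:
    return {"region": "EMEA", "total": 142_500.0}


def fetch_support() -> dict:
    return {"tickets_opened": 87, "tickets_closed": 79}


def fetch_infra() -> dict:
    return {"uptime_pct": 99.97, "incidents": 1}


def render_report(sources: list) -> str:
    # Fan-in: unpack the results in the order they were submitted.
    sales, support, infra = sources
    lines = [
        "# Weekly Business Report",
        f"- Total revenue: ${sales['total']:,.2f}",
        f"- Tickets opened / closed: {support['tickets_opened']} / {support['tickets_closed']}",
        f"- Uptime: {infra['uptime_pct']}%",
    ]
    return "\n".join(lines)


def run_report() -> str:
    # Hypothetical helper: plays the role of the flow function.
    fetchers = [fetch_sales, fetch_support, fetch_infra]
    # Fan-out: submit each independent source to its own worker.
    with ThreadPoolExecutor(max_workers=3) as pool:
        futures = [pool.submit(fn) for fn in fetchers]
        # Collect in submission order so the unpacking in render_report stays valid.
        results = [future.result() for future in futures]
    return render_report(results)


if __name__ == "__main__":
    print(run_report())
```

Collecting results in submission order, rather than completion order, is what keeps the positional unpacking in `render_report` safe even when the sources finish at different times.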
Inputs
- None. Each fetch task returns hard-coded sample data.
Outputs
- `weekly_report.md` text artifact with sales, support, and infra sections.
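
To give a sense of what the artifact contains, here is a minimal sketch that renders only the Sales section from the sample values. `render_sales_section` is a hypothetical helper introduced for illustration; the format strings are the ones used in `render_report`.

```python
sales = {
    "region": "EMEA",
    "total": 142_500.0,
    "orders": 320,
    "top_product": "Widget Pro",
}


def render_sales_section(sales: dict) -> str:
    # Hypothetical helper: same f-strings as the Sales block of render_report.
    lines = [
        "## Sales",
        f"- Region: {sales['region']}",
        # ",.2f" adds a thousands separator and fixes two decimal places.
        f"- Total revenue: ${sales['total']:,.2f}",
        f"- Orders: {sales['orders']}",
        f"- Top product: {sales['top_product']}",
    ]
    return "\n".join(lines)


print(render_sales_section(sales))
# ## Sales
# - Region: EMEA
# - Total revenue: $142,500.00
# - Orders: 320
# - Top product: Widget Pro
```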
Dependencies
`dagy` (provides `LocalArtifact`, `flow`, and `task`)