Real World
Data Validation
Schema validation with conditional retry on bad upstream data.
When to use
Use this when consuming data from unreliable upstream sources. The pattern checks each record against a required-field schema, retries the fetch only when validation fails, and produces a normalised output report.
real-world/data-validation.py
"""
02_data_validation.py: Schema validation with retry on bad data.

Demonstrates:
- Fetch → validate schema → transform → report pattern
- Raising on schema violations
- retry_condition_fn to only retry on validation errors
- Producing a structured JSON artifact report
"""
import json

from dagy import LocalArtifact, flow, task

_fetch_calls = 0

# Simulated upstream that returns bad data on first call, good data after.
_PAYLOADS = [
    # Call 1: missing required field "timestamp"
    [{"id": 1, "value": 10}, {"id": 2, "value": 20}],
    # Call 2: valid
    [
        {"id": 1, "value": 10, "timestamp": "2026-03-01T00:00:00Z"},
        {"id": 2, "value": 20, "timestamp": "2026-03-01T01:00:00Z"},
        {"id": 3, "value": 30, "timestamp": "2026-03-01T02:00:00Z"},
    ],
]


def is_validation_error(exc: Exception) -> bool:
    return isinstance(exc, ValueError)


@task(retries=2, retry_delay_seconds=0.02, retry_condition_fn=is_validation_error)
def fetch_and_validate() -> list:
    """Fetches data and raises ValueError if the schema is wrong."""
    global _fetch_calls
    payload = _PAYLOADS[min(_fetch_calls, len(_PAYLOADS) - 1)]
    _fetch_calls += 1
    required_fields = {"id", "value", "timestamp"}
    for record in payload:
        missing = required_fields - set(record.keys())
        if missing:
            raise ValueError(f"Record {record} missing fields: {missing}")
    return payload


@task
def transform_records(records: list) -> list:
    """Normalise: add 'value_normalised' as value / max_value."""
    max_val = max(r["value"] for r in records)
    return [
        {**r, "value_normalised": round(r["value"] / max_val, 4)}
        for r in records
    ]


@task
def build_report(records: list) -> LocalArtifact:
    report = {
        "record_count": len(records),
        "value_range": {
            "min": min(r["value"] for r in records),
            "max": max(r["value"] for r in records),
        },
        "records": records,
    }
    return LocalArtifact(
        type="json",
        value=json.dumps(report, indent=2),
        filename="validation_report.json",
    )


@flow(name="data_validation_flow")
def data_validation_flow() -> None:
    records = fetch_and_validate()
    transformed = transform_records(records)
    build_report(transformed)


if __name__ == "__main__":
    result = data_validation_flow.run_local()
    print(f"Run ID : {result.run_id}")
    print(f"Status : {result.status}")
How it works
- `fetch_and_validate` fetches data and checks each record for the required fields `id`, `value`, and `timestamp`. On the first call it receives bad data (missing `timestamp`) and raises `ValueError`.
- `retry_condition_fn=is_validation_error` ensures only `ValueError` triggers a retry.
- On retry, the function gets valid data and succeeds.
- `transform_records` normalises values relative to the maximum.
- `build_report` produces a JSON artifact with the validated, transformed records.
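The conditional-retry behaviour described above can be approximated without `dagy`. The sketch below is a stand-alone illustration, not dagy's implementation: `run_with_retries`, `flaky_fetch`, and `broken_fetch` are hypothetical names, and it assumes that `retries=2` means up to two further attempts after the first failure and that an exception rejected by the condition function is re-raised immediately.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(
    fn: Callable[[], T],
    retries: int,
    delay_seconds: float,
    should_retry: Callable[[Exception], bool],
) -> T:
    """Hypothetical helper: call fn, retrying only when should_retry(exc) is true."""
    attempt = 0
    while True:
        try:
            return fn()
        except Exception as exc:
            # Give up when the retry budget is spent or the error is not retryable.
            if attempt >= retries or not should_retry(exc):
                raise
            attempt += 1
            time.sleep(delay_seconds)


def is_validation_error(exc: Exception) -> bool:
    return isinstance(exc, ValueError)


calls = {"n": 0}


def flaky_fetch() -> list:
    """Fails schema validation on the first call, succeeds on the second."""
    calls["n"] += 1
    if calls["n"] == 1:
        raise ValueError("missing fields: {'timestamp'}")
    return [{"id": 1, "value": 10, "timestamp": "2026-03-01T00:00:00Z"}]


def broken_fetch() -> list:
    """Raises a non-validation error, which the condition function rejects."""
    raise KeyError("unexpected failure")


records = run_with_retries(
    flaky_fetch, retries=2, delay_seconds=0.02, should_retry=is_validation_error
)
print(len(records), calls["n"])  # prints: 1 2
```

Calling `run_with_retries(broken_fetch, ...)` with the same condition function raises `KeyError` on the first attempt, because `is_validation_error` returns `False` for it.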
Inputs
- None; data is simulated internally.
Outputs
- `validation_report.json` artifact with record count, value range, and normalised records.
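For the valid payload in the example, the report contents can be worked out by hand. The sketch below repeats the transform and report steps in plain Python, without `dagy` or `LocalArtifact`, to show what the JSON should contain. The `records` list mirrors the second entry of `_PAYLOADS`; how the artifact is actually stored on disk is not covered here.

```python
import json

# Same records as the valid (second) payload in the example.
records = [
    {"id": 1, "value": 10, "timestamp": "2026-03-01T00:00:00Z"},
    {"id": 2, "value": 20, "timestamp": "2026-03-01T01:00:00Z"},
    {"id": 3, "value": 30, "timestamp": "2026-03-01T02:00:00Z"},
]

# Normalise each value against the maximum, rounded to 4 decimal places.
max_val = max(r["value"] for r in records)
transformed = [
    {**r, "value_normalised": round(r["value"] / max_val, 4)} for r in records
]

report = {
    "record_count": len(transformed),
    "value_range": {
        "min": min(r["value"] for r in transformed),
        "max": max(r["value"] for r in transformed),
    },
    "records": transformed,
}

# Serialise the same way build_report does, then parse it back to inspect it.
report_json = json.dumps(report, indent=2)
parsed = json.loads(report_json)
print(parsed["record_count"], parsed["value_range"])
# prints: 3 {'min': 10, 'max': 30}
print([r["value_normalised"] for r in parsed["records"]])
# prints: [0.3333, 0.6667, 1.0]
```

Because the normalisation divides by the maximum value, a payload whose largest `value` is 0 would raise `ZeroDivisionError`; the example data avoids that case.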
Dependencies
`dagy` (provides `flow`, `task`, and `LocalArtifact`), `json` (stdlib)