Back to examples
Basics
Parameters
Typed flow parameters with defaults and validation.
When to use
Use this when your pipeline needs configurable inputs. DAGY introspects the function signature to build a parameter schema automatically.
basics/parameters.py
"""
03_parameters.py: Typed flow parameters with defaults.
Parameters are declared as typed Python function arguments. No separate
Parameter class or schema file is needed. DAGY introspects the signature
to build the parameter schema automatically.
Rules:
- Arguments WITH defaults → optional parameters (caller may omit them)
- Arguments WITHOUT defaults → required parameters (caller must supply them)
- validate_parameters=True → types are enforced at call time; passing a
string where an int is expected raises TypeError
Three calling modes are shown in __main__:
1. run_local() - execute locally with all defaults
2. run_local(...) - execute locally with explicit overrides
3. deploy(...) - register with the DAGY backend, baking in build-time values
"""
from dagy import flow, task
from dagy.exceptions import ParameterTypeError
@task
def describe_job(name: str, batch_size: int, dry_run: bool, threshold: float) -> str:
mode = "DRY RUN" if dry_run else "LIVE"
print(f"Describing job: name={name}, batch_size={batch_size}, dry_run={dry_run}, threshold={threshold}")
return (
f"[{mode}] Job '{name}': batch_size={batch_size}, threshold={threshold}"
)
@task
def run_job(description: str, dry_run: bool) -> dict:
print(f"Running job: description={description}, dry_run={dry_run}")
if dry_run:
return {"status": "skipped", "reason": "dry_run=True"}
return {"status": "completed", "description": description}
@flow(name="parameterised_flow", validate_parameters=True)
def parameterised_flow(
name: str = "nightly-sync",
batch_size: int = 100,
dry_run: bool = True,
threshold: float = 0.95,
) -> None:
print(f"Parameters: name={name}, batch_size={batch_size}, dry_run={dry_run}, threshold={threshold}")
desc = describe_job(name, batch_size, dry_run, threshold)
run_job(desc, dry_run)
if __name__ == "__main__":
result = parameterised_flow.deploy(
name="parameters-job",
flow_kwargs={ # baked-in values; caller can still override at runtime
"batch_size": 50,
"dry_run": False,
"threshold": 0.8,
},
)
print(result)How it works
- Declare typed arguments with defaults on the `@flow` function; these become optional parameters.
- Set `validate_parameters=True` on `@flow` to enforce types at call time.
- `describe_job` receives the parameter values and builds a description string.
- `run_job` conditionally executes based on `dry_run`.
- Use `flow_kwargs` in `deploy()` to bake in default overrides for production.
Inputs
- `name` (str, default `"nightly-sync"`): job name.
- `batch_size` (int, default `100`): records per batch.
- `dry_run` (bool, default `True`): skip execution if true.
- `threshold` (float, default `0.95`): quality threshold.
Outputs
- A dict with status and description of the job result.
Dependencies
`dagy`