Quickstart
Build and run your first Dagy pipeline in under 5 minutes.
Prerequisites
- Python 3.10 or later. See Installation to install the SDK and CLI.
- For remote deployment: run dagy login and ensure your AWS credentials are configured.
1. Create a flow
Create a file called my_pipeline.py with a few tasks and a flow:
# my_pipeline.py
from dagy import flow, task

@task
def ingest(source: str) -> list:
    """Simulate reading records from a source."""
    return [{"id": 1, "value": 10}, {"id": 2, "value": 20}]

@task
def transform(records: list) -> list:
    """Double every value."""
    return [{**r, "value": r["value"] * 2} for r in records]

@task
def report(records: list) -> str:
    """Summarise the results."""
    total = sum(r["value"] for r in records)
    return f"Processed {len(records)} records. Total: {total}"

@flow(name="my_pipeline", version="1.0.0")
def my_pipeline(source: str = "local"):
    """Ingest → transform → report."""
    raw = ingest(source)
    cleaned = transform(raw)
    return report(cleaned)
Calling ingest(source) inside the flow body does not run the function immediately. It records a DAG node and returns a TaskOutput placeholder. Passing that placeholder to transform(raw) creates the dependency edge automatically.
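To make the deferred-call model concrete, here is a minimal, self-contained sketch of the idea. It neither uses nor reproduces Dagy's internals: task, TaskOutput, FlowGraph, build_graph and run_graph are hypothetical names. Calling a decorated task while a flow is being recorded stores a node and returns a placeholder; any placeholder passed as an argument becomes an edge, and execution later substitutes the real values.

```python
from __future__ import annotations

import functools
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class TaskOutput:
    """Placeholder returned by a task call while a flow is being recorded."""
    node_id: str


@dataclass
class FlowGraph:
    """Recorded task calls (nodes) and placeholder dependencies (edges)."""
    nodes: dict[str, tuple[Callable[..., Any], tuple, dict]] = field(default_factory=dict)
    edges: list[tuple[str, str]] = field(default_factory=list)


_current_graph: FlowGraph | None = None  # set only while build_graph runs


def task(fn: Callable[..., Any]) -> Callable[..., TaskOutput]:
    """Hypothetical decorator: record a node instead of calling fn."""
    @functools.wraps(fn)
    def wrapper(*args: Any, **kwargs: Any) -> TaskOutput:
        graph = _current_graph
        if graph is None:
            raise RuntimeError("tasks can only be called inside a flow")
        calls = sum(1 for recorded, _, _ in graph.nodes.values() if recorded is fn)
        node_id = f"{fn.__name__}-{calls + 1}"  # ingest-1, transform-1, ...
        graph.nodes[node_id] = (fn, args, kwargs)
        # Each placeholder argument becomes an upstream -> downstream edge.
        for value in (*args, *kwargs.values()):
            if isinstance(value, TaskOutput):
                graph.edges.append((value.node_id, node_id))
        return TaskOutput(node_id)
    return wrapper


def build_graph(flow_fn: Callable[..., Any], *args: Any, **kwargs: Any) -> FlowGraph:
    """Evaluate the flow body once to record its DAG; no task body runs here."""
    global _current_graph
    _current_graph = FlowGraph()
    try:
        flow_fn(*args, **kwargs)
        return _current_graph
    finally:
        _current_graph = None


def run_graph(graph: FlowGraph) -> dict[str, Any]:
    """Execute nodes in recording order, replacing placeholders with results."""
    results: dict[str, Any] = {}

    def resolve(value: Any) -> Any:
        return results[value.node_id] if isinstance(value, TaskOutput) else value

    for node_id, (fn, args, kwargs) in graph.nodes.items():
        results[node_id] = fn(*(resolve(a) for a in args),
                              **{k: resolve(v) for k, v in kwargs.items()})
    return results


# Usage with simplified versions of the quickstart tasks.
@task
def ingest(source: str) -> list:
    return [10, 20]

@task
def transform(values: list) -> list:
    return [v * 2 for v in values]

@task
def report(values: list) -> str:
    return f"Total: {sum(values)}"

def my_pipeline(source: str = "local"):
    return report(transform(ingest(source)))

graph = build_graph(my_pipeline)
print(graph.edges)                   # [('ingest-1', 'transform-1'), ('transform-1', 'report-1')]
print(run_graph(graph)["report-1"])  # Total: 60
```

Because a placeholder can only exist after the node that produces it has been recorded, recording order is already a valid topological order, which is why run_graph can simply iterate over the nodes.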
2. Run a test execution
Verify the pipeline end-to-end before deploying.
Python:
result = my_pipeline.run_local(source="local")
print(f"Run ID : {result.run_id}")
print(f"Status : {result.status}")
CLI:
dagy run my_pipeline.py:my_pipeline --param source=local
Expected output:
Run completed: my_pipeline-20260304T120000Z-a1b2c3 status=SUCCEEDED
Task artifacts are written to ~/.dagy/artifacts/<run_id>/ and run metadata is persisted in ~/.dagy/dagy.duckdb.
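To run independent tasks in parallel, increase --max-workers (tasks in a linear chain such as ingest → transform → report still run one after another):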
dagy run my_pipeline.py:my_pipeline --param source=local --max-workers 4
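The effect of a worker limit can be sketched with Python's concurrent.futures. This is an illustration under assumptions, not Dagy's scheduler: run_dag and the task names clean_a and clean_b are hypothetical. A task is submitted only once all of its upstream tasks have finished, and the pool size caps how many submitted tasks run at the same time.

```python
import time
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from typing import Any, Callable


def run_dag(
    tasks: dict[str, Callable[[], Any]],
    upstream: dict[str, set[str]],
    max_workers: int = 1,
) -> list[str]:
    """Run every task once all of its upstream tasks have finished.

    tasks maps a task id to a zero-argument callable; upstream maps a task id
    to the ids it depends on. Returns task ids in completion order.
    """
    waiting = {name: set(upstream.get(name, ())) for name in tasks}
    running: dict[Future, str] = {}
    finished: list[str] = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while waiting or running:
            # Submit every task whose dependencies are satisfied; the pool
            # executes at most max_workers of them concurrently.
            for name in [n for n, deps in waiting.items() if not deps]:
                del waiting[name]
                running[pool.submit(tasks[name])] = name
            if not running:
                raise ValueError(f"cycle or unknown dependency among {sorted(waiting)}")
            done, _ = wait(running, return_when=FIRST_COMPLETED)
            for future in done:
                name = running.pop(future)
                future.result()  # re-raise the task's exception, if any
                finished.append(name)
                for deps in waiting.values():
                    deps.discard(name)
    return finished


def step(name: str) -> Callable[[], str]:
    """Stand-in task that simulates 0.1 s of work."""
    def run() -> str:
        time.sleep(0.1)
        return name
    return run


order = run_dag(
    {n: step(n) for n in ["ingest", "clean_a", "clean_b", "report"]},
    {"clean_a": {"ingest"}, "clean_b": {"ingest"}, "report": {"clean_a", "clean_b"}},
    max_workers=4,
)
```

Here clean_a and clean_b both depend only on ingest, so with max_workers=4 they run concurrently; with max_workers=1 they would run one after the other.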
3. Deploy the flow
Once the flow passes local testing, package and deploy it.
Option A: One-step Python deploy
flow.deploy() builds the artifact, uploads it to S3, and registers it with the API. It detects code changes automatically by comparing a code_hash (a SHA-256 of the flow spec and source) with the hash of the currently deployed version. If nothing has changed, the deploy is skipped; pass force=True to deploy anyway.
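Hash-based change detection can be sketched as follows. The exact bytes Dagy hashes are not specified here, so serialising the spec as sorted-key JSON and separating it from the source with a NUL byte are assumptions; code_hash and should_deploy are hypothetical helpers.

```python
import hashlib
import json
from typing import Any, Optional


def code_hash(flow_spec: dict[str, Any], source: str) -> str:
    """SHA-256 over a canonical JSON form of the spec plus the source text.

    Sorted keys keep dict ordering from changing the hash. JSON output never
    contains a raw NUL byte, so the separator keeps the two parts distinct.
    """
    digest = hashlib.sha256()
    digest.update(json.dumps(flow_spec, sort_keys=True, separators=(",", ":")).encode("utf-8"))
    digest.update(b"\0")
    digest.update(source.encode("utf-8"))
    return digest.hexdigest()


def should_deploy(new_hash: str, deployed_hash: Optional[str], force: bool = False) -> bool:
    """Deploy when forced, when nothing is deployed yet, or when the hash changed."""
    return force or deployed_hash is None or new_hash != deployed_hash


spec = {"name": "my_pipeline", "version": "1.0.0"}
deployed = code_hash(spec, "def my_pipeline(): ...\n")
edited = code_hash(spec, "def my_pipeline(source='local'): ...\n")
print(should_deploy(deployed, deployed))  # False: unchanged, skip
print(should_deploy(edited, deployed))    # True: source changed
```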
The deployment name (name) defaults to the source file stem (e.g. my_pipeline.py → my_pipeline). The namespace groups related pipelines and defaults to the current working directory's path relative to the git root:
result = my_pipeline.deploy()
# deployment_name → "my_pipeline" (derived from filename)
# namespace → "examples/01_basics" (derived from CWD relative to git root)
print(f"Deployment : {result.deployment_name}")
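Both defaults can be approximated with pathlib. This sketch assumes the git root is the nearest ancestor directory containing .git; the helper names are hypothetical, and returning "" at the repository root or None outside a repository are guesses, since that behaviour is not documented here.

```python
from pathlib import Path
from typing import Optional


def find_git_root(start: Path) -> Optional[Path]:
    """Return the nearest directory at or above `start` that contains `.git`."""
    for candidate in (start, *start.parents):
        if (candidate / ".git").exists():  # .git may be a directory or a file (worktrees)
            return candidate
    return None


def default_deployment_name(source_file: Path) -> str:
    """The file stem: my_pipeline.py -> my_pipeline."""
    return source_file.stem


def default_namespace(cwd: Path) -> Optional[str]:
    """The working directory relative to the git root, with forward slashes."""
    cwd = cwd.resolve()
    root = find_git_root(cwd)
    if root is None:
        return None  # assumption: outside a repository there is no derived namespace
    relative = cwd.relative_to(root)
    return "" if relative == Path(".") else relative.as_posix()
```

Run from examples/01_basics inside a repository, default_namespace(Path.cwd()) gives "examples/01_basics", matching the comment in the example above.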
Pass explicit values to override the defaults:
result = my_pipeline.deploy(
    name="my-pipeline-prod",
    namespace="data/ingestion",
    tags={"team": "data-eng", "env": "prod"},
    flow_kwargs={"source": "s3://my-bucket/data.json"},
    schedule="0 8 * * *",  # run daily at 08:00 UTC
)
The tags dictionary (arbitrary key-value pairs) is stored with the flow registration and is surfaced as filterable chips on the Flows page.
Option B: Two-step CLI deploy
Step 1: build the artifact:
dagy build my_pipeline.py:my_pipeline --output-dir ./dist
This creates ./dist/my_pipeline/<build_id>/artifact.zip containing the serialised flow spec, metadata, manifest, and source file.
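For orientation, packaging an artifact of this kind can be sketched with zipfile. The entry names (flow_spec.json, metadata.json, manifest.json), the manifest format (one SHA-256 per entry) and build_artifact itself are hypothetical; inspect a real artifact.zip produced by dagy build for the actual layout.

```python
import hashlib
import json
import zipfile
from pathlib import Path
from typing import Any


def build_artifact(
    output_dir: Path,
    flow_name: str,
    build_id: str,
    flow_spec: dict[str, Any],
    metadata: dict[str, Any],
    source_file: Path,
) -> Path:
    """Write <output_dir>/<flow_name>/<build_id>/artifact.zip (hypothetical entry names)."""
    entries = {
        "flow_spec.json": json.dumps(flow_spec, indent=2, sort_keys=True).encode("utf-8"),
        "metadata.json": json.dumps(metadata, indent=2, sort_keys=True).encode("utf-8"),
        source_file.name: source_file.read_bytes(),
    }
    # The manifest records a SHA-256 for every other entry so contents can be verified.
    manifest = {name: hashlib.sha256(data).hexdigest() for name, data in entries.items()}
    entries["manifest.json"] = json.dumps(manifest, indent=2, sort_keys=True).encode("utf-8")

    target = output_dir / flow_name / build_id / "artifact.zip"
    target.parent.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(target, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for name, data in entries.items():
            zf.writestr(name, data)
    return target
```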
Step 2: upload and register:
dagy deploy ./dist/my_pipeline/<build_id>/artifact.zip \
--deployment my-pipeline \
--flow-name my_pipeline \
--flow-version 1.0.0
Expected output:
Deployed to s3://dagy-artifacts-.../dagy/flows/my_pipeline/1.0.0/artifact.zip
and registered as my_pipeline:1.0.0
Add --schedule "0 8 * * *" to attach a cron schedule at deploy time. Add --force to deploy even when the code hasn't changed.
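Cron expressions such as "0 8 * * *" have five fields: minute, hour, day of month, month and day of week. The hypothetical cron_matches below checks whether a given UTC time matches a simplified expression (only *, single numbers and comma lists), which can help sanity-check a schedule before attaching it. It is not Dagy's scheduler and ignores ranges, steps and names.

```python
from datetime import datetime, timezone


def _field_matches(field: str, value: int) -> bool:
    """One cron field: '*', a single number, or a comma-separated list of numbers."""
    return field == "*" or value in {int(part) for part in field.split(",")}


def cron_matches(expr: str, when: datetime) -> bool:
    """True if the aware datetime `when`, taken in UTC, matches a 5-field cron expression.

    Simplified: no ranges (1-5), steps (*/15) or names (MON), which real cron supports.
    """
    if when.tzinfo is None:
        raise ValueError("pass a timezone-aware datetime")
    minute, hour, day_of_month, month, day_of_week = expr.split()
    when = when.astimezone(timezone.utc)
    cron_weekday = (when.weekday() + 1) % 7  # cron: 0 (or 7) = Sunday; Python: 0 = Monday
    dow_ok = _field_matches(day_of_week, cron_weekday) or (
        cron_weekday == 0 and _field_matches(day_of_week, 7)
    )
    dom_ok = _field_matches(day_of_month, when.day)
    # Standard cron ORs the two day fields when both are restricted.
    if day_of_month != "*" and day_of_week != "*":
        day_ok = dom_ok or dow_ok
    else:
        day_ok = dom_ok and dow_ok
    return (
        _field_matches(minute, when.minute)
        and _field_matches(hour, when.hour)
        and _field_matches(month, when.month)
        and day_ok
    )


print(cron_matches("0 8 * * *", datetime(2026, 3, 4, 8, 0, tzinfo=timezone.utc)))  # True
print(cron_matches("0 8 * * *", datetime(2026, 3, 4, 9, 0, tzinfo=timezone.utc)))  # False
```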
4. List deployed flows
Confirm the flow is registered:
dagy flows list
Example output:
┌──────────────┬──────────────┬──────────────────────────┐
│ flow_name │ flow_version │ created_at │
├──────────────┼──────────────┼──────────────────────────┤
│ my_pipeline │ 1.0.0 │ 2026-03-04T12:00:00Z │
└──────────────┴──────────────┴──────────────────────────┘
Use --profile staging or --profile prod to query a specific environment.
Namespace, version and tags
Every deployed flow carries three metadata dimensions:
| Field | Type | Purpose | Default |
|---|---|---|---|
| namespace | str | Hierarchical grouping (e.g. data/ingestion) | CWD path relative to git root |
| flow_version | str | Immutable version identifier (e.g. 1.0.0) | "latest" |
| tags | Dict[str, str] | Arbitrary key-value labels (e.g. env=prod) | {} |
On the Flows page, namespace and tag chips appear automatically once flows with those fields are registered. Clicking a chip filters the grid to the matching flows; no backend changes are needed.
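The chip behaviour can be sketched over in-memory records. FlowRecord, available_chips and filter_flows are hypothetical, and treating a namespace chip as also matching nested namespaces (data matches data/ingestion) is an assumption based on namespaces being hierarchical; the Flows page may match exactly.

```python
from dataclasses import dataclass, field
from typing import Iterable, Optional


@dataclass
class FlowRecord:
    """The three metadata fields from the table above, plus the flow name."""
    flow_name: str
    namespace: str
    flow_version: str = "latest"
    tags: dict[str, str] = field(default_factory=dict)


def available_chips(flows: Iterable[FlowRecord]) -> tuple[list[str], list[tuple[str, str]]]:
    """Distinct namespaces and tag pairs, i.e. the chips that would be shown."""
    flows = list(flows)
    namespaces = sorted({f.namespace for f in flows if f.namespace})
    tag_pairs = sorted({(k, v) for f in flows for k, v in f.tags.items()})
    return namespaces, tag_pairs


def in_namespace(flow_namespace: str, chip: str) -> bool:
    """Exact match or nested below the chip: data/ingestion is in data, database is not."""
    chip = chip.rstrip("/")
    return flow_namespace == chip or flow_namespace.startswith(chip + "/")


def filter_flows(
    flows: Iterable[FlowRecord],
    namespace: Optional[str] = None,
    tags: Optional[dict[str, str]] = None,
) -> list[FlowRecord]:
    """Keep flows matching the namespace chip (if any) and every selected tag chip."""
    wanted = tags or {}
    return [
        f for f in flows
        if (namespace is None or in_namespace(f.namespace, namespace))
        and all(f.tags.get(k) == v for k, v in wanted.items())
    ]
```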
For a deep dive on derivation rules, storage layout, and best practices, see Namespaces, Versioning & Tags.
5. Run a deployed flow
Trigger a run of the deployed flow by its deployment name:
dagy run my-pipeline --param source=s3://my-bucket/data.json
With an explicit environment profile:
dagy run my-pipeline --param source=s3://my-bucket/data.json --profile prod
Expected output:
{'run_id': 'my-pipeline-abc123', 'status': 'QUEUED', ...}
The run is queued and executed asynchronously by the backend. Use dagy runs list or dagy logs (see below) to check progress.
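To wait for a queued run from a script, a polling loop is enough. This sketch assumes a hypothetical get_status callable (for example, a wrapper around the API or the CLI) that returns status strings; SUCCEEDED and FAILED are the terminal statuses shown in this guide, and any others are not covered.

```python
import time
from typing import Callable

TERMINAL_STATUSES = {"SUCCEEDED", "FAILED"}  # assumption: other terminal states may exist


def wait_for_run(
    get_status: Callable[[str], str],
    run_id: str,
    poll_interval: float = 5.0,
    timeout: float = 600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_status(run_id) until it reports a terminal status or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        status = get_status(run_id)
        if status in TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"run {run_id} still {status} after {timeout}s")
        sleep(poll_interval)
```

Injecting sleep keeps the loop testable without real delays.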
6. View runs and logs
List recent local runs and their statuses:
dagy runs list
my_pipeline-20260304-abc123 SUCCEEDED 2026-03-04T12:00:00Z my_pipeline:1.0.0
my_pipeline-20260303-def456 FAILED 2026-03-03T09:00:00Z my_pipeline:1.0.0
Inspect a run in detail (tasks, attempts, timings):
dagy runs show my_pipeline-20260304-abc123
Run: my_pipeline-20260304-abc123 status=SUCCEEDED flow=my_pipeline:1.0.0
Start: 2026-03-04T12:00:00Z End: 2026-03-04T12:00:03Z
- ingest-1 ingest status=SUCCEEDED attempt=1
- transform-1 transform status=SUCCEEDED attempt=1
- report-1 report status=SUCCEEDED attempt=1
Stream the full run log:
dagy logs my_pipeline-20260304-abc123
Drill into a specific task's log:
dagy logs my_pipeline-20260304-abc123 --task transform-1
Log files live at:
~/.dagy/runs/<run_id>/run.log
~/.dagy/runs/<run_id>/task_runs/<task_run_id>/task.log
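If you need these logs from a script, the paths can be built directly from the layout above. log_path and tail are hypothetical helpers; dagy logs remains the documented way to read logs.

```python
from collections import deque
from pathlib import Path
from typing import Optional

DAGY_HOME = Path.home() / ".dagy"


def log_path(run_id: str, task_run_id: Optional[str] = None, home: Path = DAGY_HOME) -> Path:
    """Run log, or a single task's log when task_run_id is given."""
    run_dir = home / "runs" / run_id
    if task_run_id is None:
        return run_dir / "run.log"
    return run_dir / "task_runs" / task_run_id / "task.log"


def tail(path: Path, lines: int = 20) -> list[str]:
    """Return the last `lines` lines of a log file without keeping the whole file in memory."""
    with path.open(encoding="utf-8", errors="replace") as fh:
        return [line.rstrip("\n") for line in deque(fh, maxlen=lines)]
```

For example, tail(log_path("my_pipeline-20260304-abc123", "transform-1")) returns the last 20 lines of that task's log.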
7. Pass parameters to a flow
Declaring parameters
Parameters are plain Python function arguments on the @flow function. Type annotations are used for validation and are surfaced in the UI; defaults make a parameter optional.
from dagy import flow, task

@task
def describe_job(name: str, batch_size: int, dry_run: bool, threshold: float) -> str:
    mode = "DRY RUN" if dry_run else "LIVE"
    return f"[{mode}] Job '{name}': batch_size={batch_size}, threshold={threshold}"

@task
def run_job(description: str, dry_run: bool) -> dict:
    if dry_run:
        return {"status": "skipped", "reason": "dry_run=True"}
    return {"status": "completed", "description": description}

@flow(name="parameterised_flow", version="0.1.0", validate_parameters=True)
def parameterised_flow(
    name: str = "nightly-sync",
    batch_size: int = 100,
    dry_run: bool = True,
    threshold: float = 0.95,
) -> None:
    desc = describe_job(name, batch_size, dry_run, threshold)
    run_job(desc, dry_run)
With validate_parameters=True (the default, shown explicitly here), Dagy raises a ParameterTypeError at call time if a value does not match its declared type.
Passing parameters: local Python
# All defaults
result = parameterised_flow.run_local()
# Override specific parameters
result = parameterised_flow.run_local(
    name="prod-sync",
    batch_size=500,
    dry_run=False,
    threshold=0.8,
)
print(result.run_id, result.status)
Passing parameters: CLI
Use --param key=value (repeat for each parameter). Dagy coerces values to the declared type automatically.
# Run locally with overrides
dagy run my_pipeline.py:parameterised_flow \
--param name=prod-sync \
--param batch_size=500 \
--param dry_run=false \
--param threshold=0.8
# Run a deployed flow remotely
dagy run parameterised-flow \
--param name=prod-sync \
--param batch_size=500 \
--param dry_run=false
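Repeated --param key=value options map naturally onto argparse's append action. In the sketch below (parse_params and the prog name are hypothetical), each pair is split on the first = only, so values containing = survive; rejecting a repeated key is an assumption. Values stay strings here; conversion to the declared types is a separate step.

```python
import argparse


def parse_params(pairs: list[str]) -> dict[str, str]:
    """Turn ["name=prod-sync", "batch_size=500"] into a dict of raw strings.

    Splits on the first '=' only, so values such as URLs may contain '='.
    """
    params: dict[str, str] = {}
    for pair in pairs:
        key, sep, value = pair.partition("=")
        if not sep or not key:
            raise ValueError(f"expected key=value, got {pair!r}")
        if key in params:
            # Assumption: treat repeats as an error rather than "last one wins".
            raise ValueError(f"parameter {key!r} given more than once")
        params[key] = value
    return params


parser = argparse.ArgumentParser(prog="run-sketch")
parser.add_argument("--param", action="append", default=[], metavar="KEY=VALUE")
args = parser.parse_args(["--param", "name=prod-sync", "--param", "batch_size=500"])
print(parse_params(args.param))  # {'name': 'prod-sync', 'batch_size': '500'}
```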
Passing parameters: Python deploy() / flow_kwargs
Supply parameters at build time through flow_kwargs. These are baked into the artifact and become the default values used when the deployment runs:
result = parameterised_flow.deploy(
    name="parameters-job",
    flow_kwargs={
        "batch_size": 50,
        "dry_run": False,
        "threshold": 0.8,
    },
    schedule="0 6 * * *",  # optional cron
)
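The resulting precedence can be sketched as a dictionary merge: function defaults first, then flow_kwargs, then values supplied at run time. That run-time values (for example --param on dagy run) override the baked-in ones is an assumption suggested by flow_kwargs becoming "default values"; effective_parameters and parameterised_flow_signature are hypothetical.

```python
import inspect
from typing import Any, Callable


def effective_parameters(
    fn: Callable[..., Any],
    flow_kwargs: dict[str, Any],
    run_params: dict[str, Any],
) -> dict[str, Any]:
    """Function defaults < values baked in via flow_kwargs < values supplied at run time."""
    defaults = {
        name: p.default
        for name, p in inspect.signature(fn).parameters.items()
        if p.default is not inspect.Parameter.empty
    }
    return {**defaults, **flow_kwargs, **run_params}


def parameterised_flow_signature(
    name: str = "nightly-sync",
    batch_size: int = 100,
    dry_run: bool = True,
    threshold: float = 0.95,
) -> None:
    """Plain stand-in with the same parameters as parameterised_flow (no Dagy decorator)."""


print(effective_parameters(
    parameterised_flow_signature,
    flow_kwargs={"batch_size": 50, "dry_run": False, "threshold": 0.8},
    run_params={"name": "prod-sync"},
))
# {'name': 'prod-sync', 'batch_size': 50, 'dry_run': False, 'threshold': 0.8}
```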
Passing parameters: API
curl -X POST https://api.dagy.io/runs \
  -H "Authorization: Bearer $DAGY_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "flow_name": "parameterised_flow",
    "parameters": {
      "name": "prod-sync",
      "batch_size": 500,
      "dry_run": false,
      "threshold": 0.8
    }
  }'
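The same request can be built with Python's standard library. build_run_request and trigger_run are hypothetical helpers that mirror the curl call above (endpoint, headers and JSON body); the response format is not documented here, so trigger_run simply decodes whatever JSON comes back.

```python
import json
import urllib.request

payload = {
    "flow_name": "parameterised_flow",
    "parameters": {"name": "prod-sync", "batch_size": 500, "dry_run": False, "threshold": 0.8},
}


def build_run_request(
    payload: dict, token: str, url: str = "https://api.dagy.io/runs"
) -> urllib.request.Request:
    """Build (but do not send) a POST request equivalent to the curl call."""
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),  # False is serialised as JSON false
        headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
        method="POST",
    )


def trigger_run(payload: dict, token: str) -> dict:
    """Send the request and decode the JSON response."""
    with urllib.request.urlopen(build_run_request(payload, token), timeout=30) as resp:
        return json.load(resp)


# In a script: trigger_run(payload, os.environ["DAGY_TOKEN"])  (requires `import os`)
```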
Passing parameters: UI (Run Now dialog)
When a flow has declared parameters, the Run Now dialog in the Flows page renders a labeled input for each one, pre-filled with its default value. Values are coerced to their declared types (int, float, bool, str) before the run is triggered.
- Open Flows in the sidebar.
- Hover over a flow card and click Run.
- Each parameter appears with its name, type badge, and default value filled in.
- Edit the values you want to override, then click Trigger run.
!!! tip "Required vs optional"
    Parameters without a default are marked required. The dialog sends whatever value you enter, so fill in every required parameter before triggering the run.
Supported types
| Python type | CLI string | UI coercion |
|---|---|---|
| str | key=hello | verbatim |
| int | key=100 | int(value) |
| float | key=0.95 | float(value) |
| bool | key=false | "true" / "1" / "True" → True, everything else → False |
| Any / unannotated | key=value | verbatim string |
Next steps
| Topic | Link |
|---|---|
| Iterate on flows locally | Local Development |
| Namespaces, versioning & tags | Namespaces, Versioning & Tags |
| Full SDK documentation | SDK Overview |
| Every CLI flag | CLI Reference |
| Retries, timeouts, hooks | Retries & Timeouts |
| Cron and interval scheduling | Scheduling |
| More parameter examples | SDK Examples |