
Quickstart

Build and run your first Dagy pipeline in under 5 minutes.

Prerequisites

  • Python 3.10 or later. See Installation to install the SDK and CLI.
  • For remote deployment: run dagy login and ensure AWS credentials are configured.

1. Create a flow

Create a file called my_pipeline.py with a few tasks and a flow:

# my_pipeline.py
from dagy import flow, task


@task
def ingest(source: str) -> list:
    """Simulate reading records from a source."""
    return [{"id": 1, "value": 10}, {"id": 2, "value": 20}]


@task
def transform(records: list) -> list:
    """Double every value."""
    return [{**r, "value": r["value"] * 2} for r in records]


@task
def report(records: list) -> str:
    """Summarise the results."""
    total = sum(r["value"] for r in records)
    return f"Processed {len(records)} records. Total: {total}"


@flow(name="my_pipeline", version="1.0.0")
def my_pipeline(source: str = "local"):
    """Ingest → transform → report."""
    raw = ingest(source)
    cleaned = transform(raw)
    return report(cleaned)

Calling ingest(source) inside the flow body does not run the function immediately. It records a DAG node and returns a TaskOutput placeholder. Passing that placeholder to transform(raw) creates the dependency edge automatically.
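The following self-contained sketch shows how a decorator can record calls instead of running them. It is an illustration under stated assumptions, not Dagy's implementation. The task decorator, TaskOutput and Graph classes below are hypothetical stand-ins. The node IDs only imitate the ingest-1 style that dagy runs show prints later on this page.

```python
# Hypothetical sketch of deferred task recording; not Dagy's real code.
from typing import Any, Callable


class TaskOutput:
    """Placeholder returned while the flow is being recorded."""

    def __init__(self, node_id: str) -> None:
        self.node_id = node_id


class Graph:
    """Records task calls as nodes and placeholder arguments as edges."""

    def __init__(self) -> None:
        self.nodes: dict[str, tuple[Callable[..., Any], tuple[Any, ...]]] = {}
        self.edges: list[tuple[str, str]] = []
        self._counts: dict[str, int] = {}

    def add(self, fn: Callable[..., Any], args: tuple[Any, ...]) -> TaskOutput:
        n = self._counts.get(fn.__name__, 0) + 1
        self._counts[fn.__name__] = n
        node_id = f"{fn.__name__}-{n}"
        self.nodes[node_id] = (fn, args)
        # Every placeholder argument becomes an upstream -> downstream edge.
        for arg in args:
            if isinstance(arg, TaskOutput):
                self.edges.append((arg.node_id, node_id))
        return TaskOutput(node_id)

    def execute(self) -> dict[str, Any]:
        # A placeholder can only be passed after it exists, so recording
        # order is already a valid topological order.
        results: dict[str, Any] = {}
        for node_id, (fn, args) in self.nodes.items():
            real = [results[a.node_id] if isinstance(a, TaskOutput) else a for a in args]
            results[node_id] = fn(*real)
        return results


GRAPH = Graph()


def task(fn: Callable[..., Any]) -> Callable[..., TaskOutput]:
    """Calling the decorated function records a node instead of running it."""
    def record(*args: Any) -> TaskOutput:
        return GRAPH.add(fn, args)
    return record


@task
def ingest(source: str) -> list:
    return [{"id": 1, "value": 10}, {"id": 2, "value": 20}]


@task
def transform(records: list) -> list:
    return [{**r, "value": r["value"] * 2} for r in records]


raw = ingest("local")      # nothing has run yet; raw is a TaskOutput
cleaned = transform(raw)   # records the edge ingest-1 -> transform-1
print(GRAPH.edges)         # [('ingest-1', 'transform-1')]
print(GRAPH.execute()[cleaned.node_id])
# [{'id': 1, 'value': 20}, {'id': 2, 'value': 40}]
```

The function bodies run only inside execute(). The flow body's job is to describe the graph.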


2. Run a test execution

Verify the pipeline end-to-end before deploying.

Python:

from my_pipeline import my_pipeline

result = my_pipeline.run_local(source="local")
print(f"Run ID : {result.run_id}")
print(f"Status : {result.status}")

CLI:

dagy run my_pipeline.py:my_pipeline --param source=local

Expected output:

Run completed: my_pipeline-20260304T120000Z-a1b2c3  status=SUCCEEDED

Task artifacts are written to ~/.dagy/artifacts/<run_id>/ and run metadata is persisted in ~/.dagy/dagy.duckdb.

Tasks that do not depend on each other can run in parallel. To allow this, increase --max-workers:

dagy run my_pipeline.py:my_pipeline --param source=local --max-workers 4
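Extra workers help only when the DAG contains tasks that are independent of each other. The sketch below shows why, using a simplified "wave" scheduler built on the standard library's ThreadPoolExecutor. It is an illustration, not a description of Dagy's executor. The run_dag function, the dependency format and the clean_a/clean_b task names are all hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


def run_dag(
    tasks: dict[str, Callable[[], object]],
    deps: dict[str, set[str]],
    max_workers: int = 1,
) -> list[list[str]]:
    """Run tasks in dependency waves and return the waves that were executed.

    Every task in a wave has all of its upstreams finished, so the tasks in
    one wave are independent and can share the worker pool.
    """
    done: set[str] = set()
    waves: list[list[str]] = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while len(done) < len(tasks):
            ready = sorted(
                t for t in tasks if t not in done and deps.get(t, set()) <= done
            )
            if not ready:
                raise ValueError("cycle or unknown dependency in the DAG")
            futures = [pool.submit(tasks[t]) for t in ready]
            for fut in futures:
                fut.result()  # re-raise a task's exception, if any
            done.update(ready)
            waves.append(ready)
    return waves


def noop() -> None:
    pass


# my_pipeline is a chain: each wave holds one task, so extra workers idle.
chain = {"ingest": noop, "transform": noop, "report": noop}
chain_deps = {"transform": {"ingest"}, "report": {"transform"}}
print(run_dag(chain, chain_deps, max_workers=4))
# [['ingest'], ['transform'], ['report']]

# A fan-out: clean_a and clean_b only need ingest, so they share a wave.
fan = {"ingest": noop, "clean_a": noop, "clean_b": noop}
fan_deps = {"clean_a": {"ingest"}, "clean_b": {"ingest"}}
print(run_dag(fan, fan_deps, max_workers=4))
# [['ingest'], ['clean_a', 'clean_b']]
```

A production scheduler would usually start each task as soon as its own upstreams finish, rather than waiting for a whole wave. The wave form just keeps the sketch short.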

3. Deploy the flow

Once the flow passes local testing, package and deploy it.

Option A: One-step Python deploy

flow.deploy() builds the artifact, uploads it to S3, and registers it with the API. It automatically detects code changes by comparing a code_hash (SHA-256 of the flow spec + source) against the currently deployed version. If nothing changed, the deploy is skipped. Pass force=True to override.
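The change-detection rule can be sketched with hashlib from the standard library. This page does not say how Dagy serialises the flow spec before hashing. The canonical-JSON encoding, the example spec fields and the should_deploy helper below are therefore assumptions for illustration.

```python
from __future__ import annotations

import hashlib
import json


def code_hash(flow_spec: dict, source: str) -> str:
    """SHA-256 over a canonical JSON dump of the spec followed by the source."""
    h = hashlib.sha256()
    # sort_keys makes the hash independent of dict insertion order.
    h.update(json.dumps(flow_spec, sort_keys=True).encode("utf-8"))
    h.update(source.encode("utf-8"))
    return h.hexdigest()


def should_deploy(new_hash: str, deployed_hash: str | None, force: bool = False) -> bool:
    """Deploy if forced, if nothing is deployed yet, or if the hash changed."""
    return force or deployed_hash is None or new_hash != deployed_hash


spec = {"name": "my_pipeline", "version": "1.0.0", "tasks": ["ingest", "transform", "report"]}
src = "def ingest(source): ...\n"
h1 = code_hash(spec, src)

print(should_deploy(h1, deployed_hash=h1))               # False: unchanged, skipped
print(should_deploy(h1, deployed_hash=h1, force=True))   # True: forced
print(should_deploy(code_hash(spec, src + "# edit\n"), deployed_hash=h1))  # True
```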

The name (deployment name) defaults to the source file stem (e.g. my_pipeline.py → my_pipeline). The namespace groups related pipelines and defaults to the current working directory's path relative to the git root:

result = my_pipeline.deploy()
# deployment_name → "my_pipeline"  (derived from filename)
# namespace       → "examples/01_basics"  (derived from CWD relative to git root)
print(f"Deployment : {result.deployment_name}")
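The two default derivations can be sketched with pathlib. The sketch assumes the git root is the directory that git rev-parse --show-toplevel reports, and it works on POSIX-style paths. The helper names are hypothetical. The sketch does not cover what Dagy does when you run from the repository root itself, where the relative path is ".".

```python
import subprocess
from pathlib import PurePosixPath


def default_deployment_name(flow_file: str) -> str:
    """Deployment name = source file stem, e.g. my_pipeline.py -> my_pipeline."""
    return PurePosixPath(flow_file).stem


def default_namespace(cwd: str, git_root: str) -> str:
    """Namespace = current working directory relative to the git root."""
    return PurePosixPath(cwd).relative_to(PurePosixPath(git_root)).as_posix()


def find_git_root() -> str:
    """Ask git for the top-level directory of the current repository."""
    out = subprocess.run(
        ["git", "rev-parse", "--show-toplevel"],
        capture_output=True, text=True, check=True,
    )
    return out.stdout.strip()


print(default_deployment_name("my_pipeline.py"))               # my_pipeline
print(default_namespace("/repo/examples/01_basics", "/repo"))  # examples/01_basics
```

In a real checkout you would pass os.getcwd() and find_git_root() as the two arguments to default_namespace.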

Pass explicit values to override the defaults:

result = my_pipeline.deploy(
    name="my-pipeline-prod",
    namespace="data/ingestion",
    tags={"team": "data-eng", "env": "prod"},
    flow_kwargs={"source": "s3://my-bucket/data.json"},
    schedule="0 8 * * *",   # run daily at 08:00 UTC
)

The tags dictionary (arbitrary key-value pairs) is stored with the flow registration and is surfaced as filterable chips on the Flows page.

Option B: Two-step CLI deploy

Step 1: build the artifact:

dagy build my_pipeline.py:my_pipeline --output-dir ./dist

This creates ./dist/my_pipeline/<build_id>/artifact.zip containing the serialised flow spec, metadata, manifest, and source file.

Step 2: upload and register:

dagy deploy ./dist/my_pipeline/<build_id>/artifact.zip \
  --deployment my-pipeline \
  --flow-name my_pipeline \
  --flow-version 1.0.0

Expected output:

Deployed to s3://dagy-artifacts-.../dagy/flows/my_pipeline/1.0.0/artifact.zip
and registered as my_pipeline:1.0.0

Add --schedule "0 8 * * *" to attach a cron schedule at deploy time. Add --force to deploy even when the code hasn't changed.
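The cron string "0 8 * * *" breaks down into five fields: minute 0, hour 8, any day of month, any month, any weekday. Per the comment in the deploy example, that means daily at 08:00 UTC. The sketch below computes the next fire time for that daily form only. next_daily_run is a hypothetical helper, and it deliberately rejects anything it does not understand: ranges, steps, lists and weekday filters.

```python
from datetime import datetime, timedelta, timezone


def next_daily_run(cron: str, after: datetime) -> datetime:
    """Next fire time for a daily 'M H * * *' expression, in UTC."""
    minute, hour, dom, month, dow = cron.split()
    if (dom, month, dow) != ("*", "*", "*"):
        raise ValueError("only 'M H * * *' is supported by this sketch")
    after = after.astimezone(timezone.utc)
    candidate = after.replace(hour=int(hour), minute=int(minute), second=0, microsecond=0)
    # Cron fires strictly after the reference time, so roll over to tomorrow.
    if candidate <= after:
        candidate += timedelta(days=1)
    return candidate


now = datetime(2026, 3, 4, 12, 0, tzinfo=timezone.utc)
print(next_daily_run("0 8 * * *", now))  # 2026-03-05 08:00:00+00:00
print(next_daily_run("0 6 * * *", datetime(2026, 3, 4, 5, 30, tzinfo=timezone.utc)))
# 2026-03-04 06:00:00+00:00
```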


4. List deployed flows

Confirm the flow is registered:

dagy flows list

Example output:

┌──────────────┬──────────────┬──────────────────────────┐
│ flow_name    │ flow_version │ created_at               │
├──────────────┼──────────────┼──────────────────────────┤
│ my_pipeline  │ 1.0.0        │ 2026-03-04T12:00:00Z     │
└──────────────┴──────────────┴──────────────────────────┘

Use --profile staging or --profile prod to query a specific environment.

Namespace, version and tags

Every deployed flow carries three metadata dimensions:

| Field | Type | Purpose | Default |
|---|---|---|---|
| namespace | str | Hierarchical grouping (e.g. data/ingestion) | CWD path relative to git root |
| flow_version | str | Immutable version identifier (e.g. 1.0.0) | "latest" |
| tags | Dict[str, str] | Arbitrary key-value labels (e.g. env=prod) | {} |

On the Flows page, namespace and tag chips appear automatically once flows with those fields are registered. Clicking a chip filters the grid to matching flows; no backend changes are needed.
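The chips are derived from fields the flows already carry, so the filtering can happen entirely on the client. The sketch below shows that filtering logic under two assumptions that this page does not state. First, a namespace chip also matches nested namespaces ("data" matches "data/ingestion"). Second, selecting several tag chips means all of them must match. FlowRecord and filter_flows are hypothetical names.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class FlowRecord:
    flow_name: str
    flow_version: str = "latest"
    namespace: str = ""
    tags: dict[str, str] = field(default_factory=dict)


def filter_flows(
    flows: list[FlowRecord],
    namespace: str | None = None,
    tags: dict[str, str] | None = None,
) -> list[FlowRecord]:
    """Keep flows in `namespace` (or nested below it) that carry every selected tag."""
    selected = tags or {}
    out: list[FlowRecord] = []
    for f in flows:
        if namespace and not (f.namespace == namespace or f.namespace.startswith(namespace + "/")):
            continue
        if all(f.tags.get(k) == v for k, v in selected.items()):
            out.append(f)
    return out


flows = [
    FlowRecord("my_pipeline", "1.0.0", "data/ingestion", {"team": "data-eng", "env": "prod"}),
    FlowRecord("parameterised_flow", "0.1.0", "examples/01_basics", {"env": "dev"}),
]
print([f.flow_name for f in filter_flows(flows, tags={"env": "prod"})])  # ['my_pipeline']
print([f.flow_name for f in filter_flows(flows, namespace="data")])      # ['my_pipeline']
```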

For a deep dive on derivation rules, storage layout, and best practices, see Namespaces, Versioning & Tags.


5. Run a deployed flow

Trigger a run of the deployed flow by its deployment name:

dagy run my-pipeline --param source=s3://my-bucket/data.json

With an explicit environment profile:

dagy run my-pipeline --param source=s3://my-bucket/data.json --profile prod

Expected output:

{'run_id': 'my-pipeline-abc123', 'status': 'QUEUED', ...}

The run is queued and executed asynchronously by the backend. Use dagy runs list or dagy logs (see below) to check progress.
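Because the run is asynchronous, scripts usually poll until it finishes. Below is a generic polling loop. get_status is a hypothetical callable you would supply, for example one that parses dagy runs list output or calls the API. Only SUCCEEDED and FAILED are treated as terminal, because those are the states shown on this page; other terminal states may exist.

```python
import time
from typing import Callable

# Terminal states taken from the examples on this page; there may be others.
TERMINAL_STATES = {"SUCCEEDED", "FAILED"}


def wait_for_run(
    get_status: Callable[[], str],
    timeout_s: float = 600.0,
    poll_s: float = 5.0,
) -> str:
    """Poll get_status() until a terminal state or until timeout_s elapses."""
    deadline = time.monotonic() + timeout_s
    while True:
        status = get_status()
        if status in TERMINAL_STATES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"run still {status} after {timeout_s} s")
        time.sleep(poll_s)


# Simulated status sequence standing in for a real status lookup.
statuses = iter(["QUEUED", "QUEUED", "SUCCEEDED"])
print(wait_for_run(lambda: next(statuses), poll_s=0))  # SUCCEEDED
```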


6. View runs and logs

List recent local runs and their statuses:

dagy runs list

Example output:

my_pipeline-20260304-abc123  SUCCEEDED  2026-03-04T12:00:00Z  my_pipeline:1.0.0
my_pipeline-20260303-def456  FAILED     2026-03-03T09:00:00Z  my_pipeline:1.0.0

Inspect a run in detail (tasks, attempts, timings):

dagy runs show my_pipeline-20260304-abc123

Example output:

Run: my_pipeline-20260304-abc123  status=SUCCEEDED  flow=my_pipeline:1.0.0
Start: 2026-03-04T12:00:00Z  End: 2026-03-04T12:00:03Z
- ingest-1     ingest     status=SUCCEEDED  attempt=1
- transform-1  transform  status=SUCCEEDED  attempt=1
- report-1     report     status=SUCCEEDED  attempt=1

Stream the full run log:

dagy logs my_pipeline-20260304-abc123

Drill into a specific task's log:

dagy logs my_pipeline-20260304-abc123 --task transform-1

Log files live at:

~/.dagy/runs/<run_id>/run.log
~/.dagy/runs/<run_id>/task_runs/<task_run_id>/task.log
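If you would rather read logs directly than through dagy logs, the layout above maps onto a small helper. This sketch assumes the log files are UTF-8 text. run_log_path and tail are hypothetical helpers, and home is a parameter only so the example output is reproducible.

```python
from __future__ import annotations

from collections import deque
from pathlib import Path


def run_log_path(run_id: str, task_run_id: str | None = None, home: Path | None = None) -> Path:
    """Path of the run log, or of one task's log if task_run_id is given."""
    run_dir = (home or Path.home()) / ".dagy" / "runs" / run_id
    if task_run_id is None:
        return run_dir / "run.log"
    return run_dir / "task_runs" / task_run_id / "task.log"


def tail(path: Path, n: int = 20) -> list[str]:
    """Return the last n lines without loading the whole file into memory."""
    with path.open(encoding="utf-8") as fh:
        return list(deque(fh, maxlen=n))


p = run_log_path("my_pipeline-20260304-abc123", "transform-1", home=Path("/home/me"))
print(p.as_posix())
# /home/me/.dagy/runs/my_pipeline-20260304-abc123/task_runs/transform-1/task.log
```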

7. Pass parameters to a flow

Declaring parameters

Parameters are plain Python function arguments on the @flow function. Type annotations are used for validation and are surfaced in the UI; defaults make a parameter optional.

from dagy import flow, task


@task
def describe_job(name: str, batch_size: int, dry_run: bool, threshold: float) -> str:
    mode = "DRY RUN" if dry_run else "LIVE"
    return f"[{mode}] Job '{name}': batch_size={batch_size}, threshold={threshold}"


@task
def run_job(description: str, dry_run: bool) -> dict:
    if dry_run:
        return {"status": "skipped", "reason": "dry_run=True"}
    return {"status": "completed", "description": description}


@flow(name="parameterised_flow", version="0.1.0", validate_parameters=True)
def parameterised_flow(
    name: str = "nightly-sync",
    batch_size: int = 100,
    dry_run: bool = True,
    threshold: float = 0.95,
) -> None:
    desc = describe_job(name, batch_size, dry_run, threshold)
    run_job(desc, dry_run)

validate_parameters=True is the default and is shown explicitly here for clarity. With validation enabled, Dagy raises a ParameterTypeError at call time if a value does not match its declared type.
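The kind of check this performs can be approximated with inspect.signature. This is a sketch, not Dagy's validator. ParameterTypeError is redefined locally as a stand-in, and only plain class annotations are checked. Accepting an int where a float is declared is this sketch's own choice.

```python
import inspect
from typing import Any, Callable


class ParameterTypeError(TypeError):
    """Local stand-in for Dagy's error of the same name."""


def validate_parameters(fn: Callable[..., Any], **kwargs: Any) -> dict[str, Any]:
    """Bind kwargs to fn's signature, fill defaults, and type-check each value."""
    sig = inspect.signature(fn)
    bound = sig.bind(**kwargs)  # TypeError for unknown or missing parameters
    bound.apply_defaults()
    for name, value in bound.arguments.items():
        expected = sig.parameters[name].annotation
        # Skip unannotated, Any, and non-class annotations such as list[str].
        if expected is inspect.Parameter.empty or expected is Any or not isinstance(expected, type):
            continue
        if expected in (int, float) and isinstance(value, bool):
            # bool subclasses int, so True would otherwise pass as a number.
            raise ParameterTypeError(f"{name}: expected {expected.__name__}, got bool")
        if expected is float and isinstance(value, int):
            continue  # sketch's choice: allow 1 where 1.0 is expected
        if not isinstance(value, expected):
            raise ParameterTypeError(
                f"{name}: expected {expected.__name__}, got {type(value).__name__}"
            )
    return dict(bound.arguments)


def parameterised_flow(
    name: str = "nightly-sync",
    batch_size: int = 100,
    dry_run: bool = True,
    threshold: float = 0.95,
) -> None:
    """Plain-function copy of the flow signature above."""


print(validate_parameters(parameterised_flow, batch_size=500))
# {'name': 'nightly-sync', 'batch_size': 500, 'dry_run': True, 'threshold': 0.95}

try:
    validate_parameters(parameterised_flow, batch_size="500")
except ParameterTypeError as exc:
    print(exc)  # batch_size: expected int, got str
```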

Passing parameters: local Python

# All defaults
result = parameterised_flow.run_local()

# Override specific parameters
result = parameterised_flow.run_local(
    name="prod-sync",
    batch_size=500,
    dry_run=False,
    threshold=0.8,
)
print(result.run_id, result.status)

Passing parameters: CLI

Use --param key=value (repeat for each parameter). Dagy coerces values to the declared type automatically.

# Run locally with overrides
dagy run my_pipeline.py:parameterised_flow \
  --param name=prod-sync \
  --param batch_size=500 \
  --param dry_run=false \
  --param threshold=0.8

# Run a deployed flow remotely
dagy run parameterised-flow \
  --param name=prod-sync \
  --param batch_size=500 \
  --param dry_run=false
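Each --param flag carries one key=value string. The sketch below collects repeated flags into a dict using argparse's append action. parse_params and the toy parser are hypothetical stand-ins for Dagy's CLI, which would then coerce each string to the declared type. Splitting on the first "=" keeps values that themselves contain "=" (such as query strings) intact. In this sketch, a repeated key keeps its last value.

```python
import argparse


def parse_params(pairs: list[str]) -> dict[str, str]:
    """Collect repeated 'key=value' strings into a dict of raw strings."""
    params: dict[str, str] = {}
    for pair in pairs:
        key, sep, value = pair.partition("=")  # split on the first '=' only
        if not sep or not key:
            raise ValueError(f"expected key=value, got {pair!r}")
        params[key] = value  # a repeated key keeps its last value
    return params


parser = argparse.ArgumentParser(prog="run")
parser.add_argument("target")
parser.add_argument("--param", action="append", default=None)
args = parser.parse_args([
    "my_pipeline.py:parameterised_flow",
    "--param", "name=prod-sync",
    "--param", "batch_size=500",
    "--param", "dry_run=false",
])
print(parse_params(args.param or []))
# {'name': 'prod-sync', 'batch_size': '500', 'dry_run': 'false'}
```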

Passing parameters: Python deploy() / flow_kwargs

Supply parameters at build time through flow_kwargs. These are baked into the artifact and become the default values used when the deployment runs:

result = parameterised_flow.deploy(
    name="parameters-job",
    flow_kwargs={
        "batch_size": 50,
        "dry_run": False,
        "threshold": 0.8,
    },
    schedule="0 6 * * *",   # optional cron
)
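One way to picture how the values combine at run time is a layered merge. The function's own defaults come first, then the flow_kwargs baked in at deploy time, then any per-run --param overrides. Letting that last layer win over flow_kwargs is an assumption; the text above only says flow_kwargs become the defaults. resolve_parameters is a hypothetical helper.

```python
from __future__ import annotations

import inspect
from typing import Any, Callable


def resolve_parameters(
    fn: Callable[..., Any],
    flow_kwargs: dict[str, Any] | None = None,
    run_params: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Signature defaults < flow_kwargs baked in at deploy time < per-run overrides."""
    resolved = {
        name: p.default
        for name, p in inspect.signature(fn).parameters.items()
        if p.default is not inspect.Parameter.empty
    }
    resolved.update(flow_kwargs or {})
    resolved.update(run_params or {})
    return resolved


def parameterised_flow(
    name: str = "nightly-sync",
    batch_size: int = 100,
    dry_run: bool = True,
    threshold: float = 0.95,
) -> None:
    """Plain-function copy of the flow signature above."""


baked = {"batch_size": 50, "dry_run": False, "threshold": 0.8}
print(resolve_parameters(parameterised_flow, baked))
# {'name': 'nightly-sync', 'batch_size': 50, 'dry_run': False, 'threshold': 0.8}
print(resolve_parameters(parameterised_flow, baked, {"batch_size": 500})["batch_size"])  # 500
```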

Passing parameters: API

curl -X POST https://api.dagy.io/runs \
  -H "Authorization: Bearer $DAGY_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "flow_name": "parameterised_flow",
    "parameters": {
      "name": "prod-sync",
      "batch_size": 500,
      "dry_run": false,
      "threshold": 0.8
    }
  }'
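The same request can be built with Python's standard library, which may be handy in scripts where curl is not available. This mirrors the endpoint, headers and body of the curl example. The response format is not documented on this page, so the send step is left commented out and only prints whatever JSON comes back.

```python
import json
import os
import urllib.request


def build_run_request(flow_name: str, parameters: dict, token: str) -> urllib.request.Request:
    """Build the same POST /runs request as the curl example (not sent here)."""
    body = json.dumps({"flow_name": flow_name, "parameters": parameters}).encode("utf-8")
    return urllib.request.Request(
        "https://api.dagy.io/runs",
        data=body,
        method="POST",
        headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
    )


req = build_run_request(
    "parameterised_flow",
    {"name": "prod-sync", "batch_size": 500, "dry_run": False, "threshold": 0.8},
    token=os.environ.get("DAGY_TOKEN", ""),
)

# To actually send it:
# with urllib.request.urlopen(req) as resp:
#     print(json.load(resp))
```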

Passing parameters: UI (Run Now dialog)

When a flow has declared parameters, the Run Now dialog in the Flows page renders a labeled input for each one, pre-filled with its default value. Values are coerced to their declared types (int, float, bool, str) before the run is triggered.

  1. Open Flows in the sidebar.
  2. Hover over a flow card and click Run.
  3. Each parameter appears with its name, type badge, and default value filled in.
  4. Edit the values you want to override, then click Trigger run.

!!! tip "Required vs optional"
    Parameters without a default are marked required. The dialog will send whatever value you enter, so make sure to fill them in before triggering.

Supported types

| Python type | CLI string | UI coercion |
|---|---|---|
| str | key=hello | verbatim |
| int | key=100 | int(value) |
| float | key=0.95 | float(value) |
| bool | key=false | "true" / "1" / "True" → True, everything else → False |
| Any / unannotated | key=value | verbatim string |
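The coercion rules in the table above fit in a few lines. The sketch below is for illustration, and coerce is a hypothetical name. It reproduces the table literally, which means a value such as "yes" or "TRUE" becomes False for a bool parameter.

```python
from typing import Any


def coerce(raw: str, declared: Any) -> Any:
    """Convert a raw string to the declared type using the table's rules."""
    if declared is int:
        return int(raw)
    if declared is float:
        return float(raw)
    if declared is bool:
        # Only these three spellings mean True; anything else is False.
        return raw in ("true", "1", "True")
    return raw  # str, Any or unannotated: passed through verbatim


print(coerce("500", int))    # 500
print(coerce("0.8", float))  # 0.8
print(coerce("True", bool))  # True
print(coerce("yes", bool))   # False
```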

Next steps

| Topic | Link |
|---|---|
| Iterate on flows locally | Local Development |
| Namespaces, versioning & tags | Namespaces, Versioning & Tags |
| Full SDK documentation | SDK Overview |
| Every CLI flag | CLI Reference |
| Retries, timeouts, hooks | Retries & Timeouts |
| Cron and interval scheduling | Scheduling |
| More parameter examples | SDK Examples |