
SDK API Reference

Complete reference for all public classes, decorators, and functions in the Dagy SDK.

Decorators

@task

Wraps a Python function as a Dagy task, adding retries, timeouts, concurrency limits, parameter validation, and lifecycle hooks.

from dagy import task

@task(
    name: Optional[str] = None,
    description: Optional[str] = None,
    executor: Optional[str] = None,
    retries: int = 0,
    retry_delay_seconds: Optional[float | list[float] | Callable] = None,
    retry_jitter_factor: Optional[float] = None,
    timeout_seconds: Optional[float] = None,
    concurrency_limit: Optional[int] = None,
    validate_parameters: bool = True,
    on_running: Optional[list[Hook]] = None,
    on_completion: Optional[list[Hook]] = None,
    on_failure: Optional[list[Hook]] = None,
    on_retry: Optional[list[Hook]] = None,
    retry_condition_fn: Optional[Callable[[Exception], bool]] = None,
)

Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | Function name | Display name for the task |
| description | str | Docstring | Human-readable description |
| executor | str | None | Preferred backend: "lambda", "step-functions", or "ecs" |
| retries | int | 0 | Maximum number of retry attempts on failure |
| retry_delay_seconds | float, list[float], or Callable | None | Delay between retries: a float for a fixed delay, a list for per-attempt delays, or a callable for a computed delay |
| retry_jitter_factor | float | None | Random jitter multiplier (between 0.0 and 1.0) added to the retry delay |
| timeout_seconds | float | None | Hard timeout, enforced via ThreadPoolExecutor |
| concurrency_limit | int | None | Maximum number of concurrent instances of this task |
| validate_parameters | bool | True | Validate parameter types before execution |
| on_running | list[Hook] | None | Hooks invoked when the task starts running |
| on_completion | list[Hook] | None | Hooks invoked on successful completion |
| on_failure | list[Hook] | None | Hooks invoked on failure (after all retries) |
| on_retry | list[Hook] | None | Hooks invoked before each retry attempt |
| retry_condition_fn | Callable[[Exception], bool] | None | Function that decides whether to retry, based on the exception |
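timeout_seconds is described as a hard timeout enforced via ThreadPoolExecutor. The sketch below shows that general standard-library pattern: submit the call to a worker thread and wait on the future with a timeout. It illustrates the technique only and is not Dagy's implementation; run_with_timeout and slow_call are hypothetical names. One property of the pattern worth knowing: Python cannot kill a thread from outside, so the caller stops waiting but the worker may keep running in the background.

```python
import concurrent.futures
import time
from typing import Any, Callable


def run_with_timeout(fn: Callable[..., Any], timeout_seconds: float, *args: Any, **kwargs: Any) -> Any:
    """Run fn(*args, **kwargs) in a worker thread, waiting at most timeout_seconds.

    Hypothetical helper illustrating the ThreadPoolExecutor timeout pattern.
    Raises the built-in TimeoutError if the call does not finish in time.
    """
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = executor.submit(fn, *args, **kwargs)
    try:
        return future.result(timeout=timeout_seconds)
    except concurrent.futures.TimeoutError:
        raise TimeoutError(f"call exceeded {timeout_seconds}s") from None
    finally:
        # A running thread cannot be killed from outside, so stop waiting
        # and let the worker finish in the background.
        executor.shutdown(wait=False)


def slow_call() -> str:
    time.sleep(0.5)  # simulates a task that overruns its time budget
    return "done"


try:
    run_with_timeout(slow_call, timeout_seconds=0.05)
except TimeoutError as exc:
    print(f"timed out: {exc}")
```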

Usage:

from dagy import task

# Bare decorator (no arguments)
@task
def simple_task(x: int) -> int:
    return x * 2

# With configuration
@task(retries=3, retry_delay_seconds=[1, 5, 30], timeout_seconds=120)
def robust_task(url: str) -> dict:
    ...

# With conditional retry
@task(
    retries=5,
    retry_condition_fn=lambda e: isinstance(e, (ConnectionError, TimeoutError))
)
def api_call(endpoint: str) -> dict:
    ...

# With jitter
@task(retries=3, retry_delay_seconds=2.0, retry_jitter_factor=0.5)
def jittered_task():
    ...
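The parameter table lists three forms for retry_delay_seconds plus an optional jitter factor. The sketch below shows one plausible way those settings could resolve to a concrete delay for a given retry. Several details are assumptions for illustration, not documented Dagy behavior: a list shorter than the number of retries reuses its last entry, a callable receives the 1-indexed retry number, and jitter adds a random amount up to delay × retry_jitter_factor. compute_retry_delay is a hypothetical name.

```python
import random
from typing import Callable, Optional, Sequence, Union

DelaySpec = Union[None, float, Sequence[float], Callable[[int], float]]


def compute_retry_delay(
    retry_number: int,
    retry_delay_seconds: DelaySpec = None,
    retry_jitter_factor: Optional[float] = None,
    rng: Optional[random.Random] = None,
) -> float:
    """Return the delay in seconds before retry `retry_number` (1-indexed).

    Assumptions (not documented Dagy behavior):
    - None means retry immediately.
    - A list gives one delay per retry; the last entry is reused past its end.
    - A callable receives the retry number and returns the delay.
    - Jitter adds a random amount in [0, delay * retry_jitter_factor].
    """
    if retry_number < 1:
        raise ValueError("retry_number is 1-indexed")
    if retry_delay_seconds is None:
        base = 0.0
    elif callable(retry_delay_seconds):
        base = float(retry_delay_seconds(retry_number))
    elif isinstance(retry_delay_seconds, (int, float)):
        base = float(retry_delay_seconds)
    else:
        delays = list(retry_delay_seconds)
        # Reuse the last entry once retries run past the end of the list.
        base = float(delays[min(retry_number, len(delays)) - 1]) if delays else 0.0
    if retry_jitter_factor:
        rng = rng or random.Random()
        base += rng.uniform(0.0, base * retry_jitter_factor)
    return base


# Mirrors @task(retries=3, retry_delay_seconds=[1, 5, 30]) with no jitter.
delays = [compute_retry_delay(n, [1, 5, 30]) for n in (1, 2, 3)]  # [1.0, 5.0, 30.0]
```

Passing a seeded random.Random as rng makes jittered delays reproducible in tests.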

@flow

Wraps a Python function as a Dagy flow (DAG).

from dagy import flow

@flow(
    name: Optional[str] = None,
    description: Optional[str] = None,
    retries: int = 0,
    retry_delay_seconds: Optional[float | list[float] | Callable] = None,
    retry_jitter_factor: Optional[float] = None,
    validate_parameters: bool = True,
    on_running: Optional[list[Hook]] = None,
    on_completion: Optional[list[Hook]] = None,
    on_failure: Optional[list[Hook]] = None,
    on_retry: Optional[list[Hook]] = None,
    retry_condition_fn: Optional[Callable[[Exception], bool]] = None,
)

Parameters:

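| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | Function name | Flow identifier (used in the API, CLI, and UI) |
| description | str | Docstring | Human-readable description |
| retries | int | 0 | Flow-level retry attempts |
| retry_delay_seconds | float, list[float], or Callable | None | Delay between flow retries |
| retry_jitter_factor | float | None | Jitter factor for flow retry delays |
| validate_parameters | bool | True | Validate flow parameters before execution |
| on_running | list[Hook] | None | Hooks invoked when the flow starts running |
| on_completion | list[Hook] | None | Hooks invoked on flow success |
| on_failure | list[Hook] | None | Hooks invoked on flow failure |
| on_retry | list[Hook] | None | Hooks invoked before each flow retry |
| retry_condition_fn | Callable[[Exception], bool] | None | Function that decides whether to retry the flow, based on the exception |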

Usage:

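from dagy import flow

# extract, transform, and load are @task functions defined elsewhere. Inside a flow,
# each call returns a TaskOutput, so passing one task's result to another records a dependency.
@flow(name="daily_etl", description="Daily data pipeline")
def daily_etl(date: str, source: str):
    data = extract(source, date)
    cleaned = transform(data)
    load(cleaned)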

Note: The @flow decorator does not accept a version parameter. Flow versions are assigned at deploy time via flow.deploy() or the CLI --flow-version flag. The build() method accepts an optional version argument (default: "1").

Core Classes

Task

Represents a decorated task function.

Properties:

| Property | Type | Description |
|---|---|---|
| fn | Callable | The wrapped function |
| name | str | Task name |
| description | Optional[str] | Task description |
| executor | Optional[str] | Preferred executor |
| retries | int | Maximum retry count |
| retry_delay_seconds | float, list[float], Callable, or None | Retry delay configuration |
| retry_jitter_factor | Optional[float] | Jitter factor |
| timeout_seconds | Optional[float] | Hard timeout |
| concurrency_limit | Optional[int] | Maximum concurrency |

Methods:

| Method | Description |
|---|---|
| __call__(*args, **kwargs) | Outside a flow, executes the task locally. Inside a flow, registers a task run and returns a TaskOutput instead of executing. |
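Task.__call__ behaves differently depending on whether a flow is being built. One common way to implement that kind of dual-mode call is a context variable that holds the current flow's recorder. The toy model below illustrates the idea with the standard library only; Output, Recorder, toy_task, and trace are hypothetical names, and this is not Dagy's implementation. Inside a recording context, calling a task returns a placeholder carrying a task_run_id, and any placeholder passed as an argument becomes a dependency.

```python
import contextvars
import functools
import itertools
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Iterator, List


@dataclass(frozen=True)
class Output:
    """Placeholder for a not-yet-computed result, analogous to TaskOutput."""
    task_run_id: str


@dataclass
class Recorder:
    """Collects task runs while a flow function is being traced."""
    runs: List[Dict[str, Any]] = field(default_factory=list)
    ids: Iterator[int] = field(default_factory=itertools.count)

    def record(self, name: str, args: tuple, kwargs: dict) -> Output:
        run_id = f"{name}-{next(self.ids)}"
        # Any Output passed in as an argument marks an upstream dependency.
        deps = [a.task_run_id for a in (*args, *kwargs.values()) if isinstance(a, Output)]
        self.runs.append({"id": run_id, "task_name": name, "depends_on": deps})
        return Output(run_id)


# None means "not inside a flow"; trace() sets it while the flow body runs.
_current_recorder: contextvars.ContextVar = contextvars.ContextVar("current_recorder", default=None)


def toy_task(fn: Callable[..., Any]) -> Callable[..., Any]:
    @functools.wraps(fn)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        recorder = _current_recorder.get()
        if recorder is None:
            return fn(*args, **kwargs)  # outside a flow: run immediately
        return recorder.record(fn.__name__, args, kwargs)  # inside a flow: defer
    return wrapper


def trace(flow_fn: Callable[..., Any], *args: Any, **kwargs: Any) -> List[Dict[str, Any]]:
    """Run the flow body with recording enabled and return the task runs it created."""
    recorder = Recorder()
    token = _current_recorder.set(recorder)
    try:
        flow_fn(*args, **kwargs)
    finally:
        _current_recorder.reset(token)
    return recorder.runs


@toy_task
def extract(source: str) -> List[int]:
    return [1, 2, 3]


@toy_task
def transform(rows: List[int]) -> List[int]:
    return [r * 10 for r in rows]


def pipeline(source: str) -> None:
    transform(extract(source))


print(transform([1, 2]))        # runs immediately: [10, 20]
print(trace(pipeline, "demo"))  # extract-0, then transform-1 depending on extract-0
```

Because the recorder lives in a ContextVar rather than a module global, concurrent traces in different threads or async tasks do not see each other's state.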

Flow

Represents a decorated flow function.

Properties:

| Property | Type | Description |
|---|---|---|
| fn | Callable | The wrapped function |
| name | str | Flow name |
| description | Optional[str] | Flow description |

Methods:

| Method | Description |
|---|---|
| build(*args, **kwargs) | Build the FlowSpec DAG without executing it |
| run_local(*args, fail_fast=True, max_workers=1, **kwargs) | Execute locally with metadata tracking |
| run(*args, **kwargs) | Execute via the API if configured; otherwise fall back to run_local() |
| deploy(name=None, *, bucket=None, api_url=None, schedule=None, default_executor=None, execution_mode="nano", tags=None, flow_kwargs=None, force=False) | Build the artifact, upload it to S3, and register it with the API (details below) |

deploy() compares the artifact's code_hash with that of the deployed version and skips the upload when they match, unless force=True. When deploying remotely it returns a _DeployResult with .flow_name, .flow_version, .deployment_name, and .skipped; when it falls back to local execution it returns a LocalRunContext. execution_mode defaults to "nano"; valid values are nano, micro, small, medium, large, and xlarge.

FlowSpec

Serialized DAG specification. Created by Flow.build().

@dataclass(frozen=True)
class FlowSpec:
    name: str                          # Flow name
    version: str                       # Version string (default: "1")
    created_at: str                    # ISO timestamp
    tasks: Dict[str, TaskSpec]         # {task_name: spec}
    task_runs: List[TaskRunSpec]       # Execution plan
    edges: List[List[str]]             # [[source, target], ...]
    outputs: Any                       # Serialized outputs
    parameters: List[Dict[str, Any]]   # Flow parameter definitions
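FlowSpec.edges stores the DAG as [source, target] pairs. Anything that executes a spec needs an order in which each task run starts only after its upstream runs finish. The sketch below derives one with the standard library's graphlib (Python 3.9+), assuming each edge means "target depends on source"; execution_order and the node names are hypothetical. It groups runs into batches that could run in parallel and raises graphlib.CycleError (a ValueError subclass) if the edges contain a cycle.

```python
from graphlib import TopologicalSorter
from typing import Dict, Iterable, List, Sequence, Set


def execution_order(task_run_ids: Iterable[str], edges: Sequence[Sequence[str]]) -> List[List[str]]:
    """Group task runs into batches; every run depends only on runs in earlier batches.

    Assumes each edge is [source, target], meaning target depends on source.
    Raises graphlib.CycleError if the edges contain a cycle.
    """
    # TopologicalSorter expects {node: set_of_predecessors}.
    graph: Dict[str, Set[str]] = {run_id: set() for run_id in task_run_ids}
    for source, target in edges:
        graph.setdefault(target, set()).add(source)
        graph.setdefault(source, set())
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    batches: List[List[str]] = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only to make the output deterministic
        batches.append(ready)
        sorter.done(*ready)
    return batches


edges = [["extract", "transform"], ["transform", "load"], ["extract", "audit"]]
print(execution_order(["extract", "transform", "load", "audit"], edges))
# [['extract'], ['audit', 'transform'], ['load']]
```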

TaskSpec

Definition of a task within a FlowSpec.

@dataclass(frozen=True)
class TaskSpec:
    name: str                          # Task name
    import_path: str                   # Python module:function
    description: Optional[str]         # Description
    default_executor: Optional[str]    # Preferred executor
    retries: int                       # Retry count
    retry_delay_seconds: Optional[Any] # Retry delay config
    timeout_seconds: Optional[float]   # Hard timeout
    concurrency_limit: Optional[int]   # Max concurrency

TaskRunSpec

A specific execution instance of a task within a flow run.

@dataclass(frozen=True)
class TaskRunSpec:
    id: str                            # Unique run ID
    task_name: str                     # Reference to TaskSpec
    depends_on: List[str]              # IDs of the upstream task runs
    args: List[Any]                    # Positional arguments
    kwargs: Dict[str, Any]             # Keyword arguments
    executor: Optional[str]            # Executor override

TaskOutput

Reference to a task's output, used for dependency tracking.

class TaskOutput:
    task_run_id: str                   # Unique task run identifier

RunContext

Runtime context for the current task or flow run. Lifecycle hooks receive it as their context argument.

class RunContext:
    kind: str              # "task" or "flow"
    name: str              # Task/flow name
    attempt: int           # Current attempt number (1-indexed)
    max_retries: int       # Max retry count
    parameters: Dict       # Runtime parameters

State

Task or flow execution state.

class State:
    type: str              # PENDING, RUNNING, SUCCEEDED, FAILED, SKIPPED, CANCELLED
    message: Optional[str] # Human-readable message

State factories:

from dagy import running, completed, failed

state = running()               # State(type="RUNNING")
state = completed()             # State(type="SUCCEEDED")
state = failed("timeout")       # State(type="FAILED", message="timeout")

Hook

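Base class for lifecycle hooks. To create a custom hook, subclass Hook and implement __call__(context, state), which receives the current RunContext and State.

from __future__ import annotations  # lets the RunContext and State annotations stand without importing them

from dagy import Hook

class SlackNotifyHook(Hook):
    def __call__(self, context: RunContext, state: State):
        # Send a Slack notification (implementation omitted)
        ...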
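The hook parameters on @task and @flow fire at different points: on_running when a run starts, on_retry before each retry, on_completion on success, and on_failure once no further retry will happen. The toy runner below sketches that sequence together with retries and retry_condition_fn so the call order is concrete. It is not Dagy's runner: Context is a simplified stand-in for RunContext, plain state strings stand in for State, run_with_hooks and recorder are hypothetical names, and firing on_running at the start of every attempt and passing "FAILED" to on_retry are assumptions.

```python
from dataclasses import dataclass
from typing import Any, Callable, List, Optional, Sequence, Tuple


@dataclass
class Context:
    """Simplified stand-in for RunContext."""
    kind: str
    name: str
    attempt: int      # 1-indexed, as in RunContext
    max_retries: int


# A hook takes the context and a state name such as "RUNNING" or "FAILED".
HookFn = Callable[[Context, str], None]


def _fire(hooks: Sequence[HookFn], ctx: Context, state: str) -> None:
    for hook in hooks:
        hook(ctx, state)


def run_with_hooks(
    fn: Callable[[], Any],
    name: str,
    retries: int = 0,
    retry_condition_fn: Optional[Callable[[Exception], bool]] = None,
    on_running: Sequence[HookFn] = (),
    on_completion: Sequence[HookFn] = (),
    on_failure: Sequence[HookFn] = (),
    on_retry: Sequence[HookFn] = (),
) -> Any:
    for attempt in range(1, retries + 2):  # first attempt plus up to `retries` retries
        ctx = Context("task", name, attempt, retries)
        _fire(on_running, ctx, "RUNNING")
        try:
            result = fn()
        except Exception as exc:
            can_retry = attempt <= retries and (retry_condition_fn is None or retry_condition_fn(exc))
            if not can_retry:
                _fire(on_failure, ctx, "FAILED")  # retries exhausted or condition said no
                raise
            _fire(on_retry, ctx, "FAILED")  # this attempt failed; another follows
            continue
        _fire(on_completion, ctx, "SUCCEEDED")
        return result


def recorder(tag: str, log: List[Tuple[str, int, str]]) -> HookFn:
    """Build a hook that appends (tag, attempt, state) to log."""
    def hook(ctx: Context, state: str) -> None:
        log.append((tag, ctx.attempt, state))
    return hook


log: List[Tuple[str, int, str]] = []
outcomes = iter([ConnectionError("transient"), ConnectionError("transient"), None])


def flaky() -> str:
    err = next(outcomes)
    if err is not None:
        raise err
    return "ok"


result = run_with_hooks(
    flaky,
    "api_call",
    retries=5,
    retry_condition_fn=lambda e: isinstance(e, (ConnectionError, TimeoutError)),
    on_retry=[recorder("retry", log)],
    on_completion=[recorder("completion", log)],
)
# log: [("retry", 1, "FAILED"), ("retry", 2, "FAILED"), ("completion", 3, "SUCCEEDED")]
```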

Deployment Functions

build_artifact(flow) -> LocalArtifact

Packages a flow and its dependencies into a ZIP artifact for deployment. The artifact's metadata.json includes a code_hash: a SHA-256 digest of the canonical flow spec and source code, used for change detection during deployment.

from dagy import build_artifact

artifact = build_artifact(my_flow)
print(artifact.path)  # Path to ZIP file
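code_hash exists so that deploy() can skip the upload when nothing has changed. The sketch below shows the general technique: serialize the spec canonically (here JSON with sorted keys and fixed separators, an assumption), hash it together with the source text using SHA-256, and compare against the deployed hash. compute_code_hash and needs_upload are hypothetical names; Dagy's exact canonicalization is not documented here, so these digests will not match real artifact hashes.

```python
import hashlib
import json
from typing import Any, Dict, Optional


def compute_code_hash(spec: Dict[str, Any], source: str) -> str:
    """SHA-256 over a canonical JSON encoding of the spec plus the source code.

    Sorted keys and fixed separators make the encoding independent of dict
    insertion order and whitespace, so equal specs give equal hashes.
    """
    digest = hashlib.sha256()
    digest.update(json.dumps(spec, sort_keys=True, separators=(",", ":")).encode("utf-8"))
    digest.update(b"\0")  # separator so spec bytes and source bytes cannot run together
    digest.update(source.encode("utf-8"))
    return digest.hexdigest()


def needs_upload(local_hash: str, deployed_hash: Optional[str], force: bool = False) -> bool:
    """Upload unless the hash is unchanged; force=True always uploads."""
    return force or local_hash != deployed_hash


h1 = compute_code_hash({"name": "daily_etl", "version": "1"}, "def f(): ...")
h2 = compute_code_hash({"version": "1", "name": "daily_etl"}, "def f(): ...")
print(needs_upload(h1, h2))  # False: key order does not affect the hash
```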

deploy_artifact(artifact, client)

Uploads a built artifact to S3 and registers the flow with the API.

from dagy import DagyClient, deploy_artifact

client = DagyClient(base_url="https://api.dagy.io", token="<token>")
deploy_artifact(artifact, client)

Updating Deployment Settings

After deployment, you can update runtime settings (execution mode, schedule, executor, tags, and dependency packages) through the PUT /deployments/{name}/settings API endpoint, without redeploying the artifact:

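import requests

token = "<token>"  # access token or API key, as used for DagyClient

resp = requests.put(
    "https://api.dagy.io/v1/deployments/daily-etl/settings",
    headers={"Authorization": f"Bearer {token}"},
    json={
        "execution_mode": "micro",
        "schedule": "0 9 * * 1-5",  # 09:00 on weekdays (Monday to Friday)
    },
    timeout=30,
)
resp.raise_for_status()  # surface HTTP errors instead of parsing an error body
updated = resp.json()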

These settings can also be updated in the web UI through the Flow Settings dialog on the Flows page. See the API endpoints reference for the full list of fields.
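Before calling the settings endpoint, it can help to validate values locally and send only the fields you intend to change, as the request example does with execution_mode and schedule. The sketch below checks execution_mode against the values listed for deploy() and does a basic five-field shape check on the cron schedule, matching the format of the example. build_settings_payload is a hypothetical helper, and both the five-field cron assumption and the assumption that omitted fields are left unchanged should be confirmed against the API reference.

```python
from typing import Any, Dict, Optional

# Values listed for deploy(execution_mode=...) in this reference.
EXECUTION_MODES = {"nano", "micro", "small", "medium", "large", "xlarge"}


def build_settings_payload(
    execution_mode: Optional[str] = None,
    schedule: Optional[str] = None,
) -> Dict[str, Any]:
    """Return a JSON body containing only the settings that were provided."""
    payload: Dict[str, Any] = {}
    if execution_mode is not None:
        if execution_mode not in EXECUTION_MODES:
            raise ValueError(
                f"unknown execution_mode {execution_mode!r}; expected one of {sorted(EXECUTION_MODES)}"
            )
        payload["execution_mode"] = execution_mode
    if schedule is not None:
        # Shape check only (assumption): a standard cron expression has five fields.
        if len(schedule.split()) != 5:
            raise ValueError(f"expected a 5-field cron expression, got {schedule!r}")
        payload["schedule"] = schedule
    return payload


payload = build_settings_payload(execution_mode="micro", schedule="0 9 * * 1-5")
# {'execution_mode': 'micro', 'schedule': '0 9 * * 1-5'}
```

The resulting dict can be passed as the json= argument of the PUT request.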

DagyClient

HTTP client for the Dagy API.

from dagy import DagyClient

client = DagyClient(
    base_url="https://api.dagy.io",
    token="<access_token_or_api_key>"
)

Local Execution

Flow.run_local()

Execute a flow locally for development and testing. Uses a ThreadPoolExecutor and tracks metadata in a local DuckDB database.

result = my_flow.run_local(
    param1="value",
    fail_fast=True,      # Stop on first failure
    max_workers=1,       # Number of worker threads (1 runs tasks one at a time)
)

Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| fail_fast | bool | True | Stop execution on the first task failure |
| max_workers | int | 1 | Number of parallel workers |
| cleanup_keep_last | int | 20 | Number of most recent local run records to keep |
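run_local() runs tasks on a ThreadPoolExecutor with max_workers threads and, with fail_fast=True, stops on the first task failure. The toy scheduler below sketches how those two options can interact over a dependency graph: ready tasks are submitted as their upstreams finish, and after a failure with fail_fast=True no new tasks are started, although tasks already running are allowed to finish. run_dag is a hypothetical name; it does not reproduce Dagy's DuckDB metadata tracking, and marking tasks downstream of a failure as SKIPPED when fail_fast=False is an assumption about desired behavior.

```python
import concurrent.futures as cf
from graphlib import TopologicalSorter
from typing import Callable, Dict, Set


def run_dag(
    tasks: Dict[str, Callable[[], object]],
    deps: Dict[str, Set[str]],
    max_workers: int = 1,
    fail_fast: bool = True,
) -> Dict[str, str]:
    """Run tasks in dependency order on a thread pool; return a state name per task.

    deps maps a task name to the names of its upstream tasks.
    """
    sorter = TopologicalSorter({name: deps.get(name, set()) for name in tasks})
    sorter.prepare()  # raises graphlib.CycleError on cyclic deps
    states: Dict[str, str] = {}
    failed = False
    with cf.ThreadPoolExecutor(max_workers=max_workers) as pool:
        running: Dict[cf.Future, str] = {}
        while sorter.is_active():
            for name in sorter.get_ready():
                upstream_ok = all(states.get(up) == "SUCCEEDED" for up in deps.get(name, set()))
                if (fail_fast and failed) or not upstream_ok:
                    states[name] = "SKIPPED"  # never started
                    sorter.done(name)
                else:
                    running[pool.submit(tasks[name])] = name
            if not running:
                continue  # skipped tasks may have unblocked others; fetch them
            finished, _ = cf.wait(running, return_when=cf.FIRST_COMPLETED)
            for future in finished:
                name = running.pop(future)
                states[name] = "SUCCEEDED" if future.exception() is None else "FAILED"
                failed = failed or states[name] == "FAILED"
                sorter.done(name)
    return states


def boom() -> None:
    raise RuntimeError("transform failed")


tasks = {"extract": lambda: 1, "transform": boom, "audit": lambda: 2, "load": lambda: 3}
deps = {"transform": {"extract"}, "audit": {"extract"}, "load": {"transform"}}
print(run_dag(tasks, deps, max_workers=2, fail_fast=False))
```

With fail_fast=False, the independent audit branch still completes while load, which sits downstream of the failed transform, is skipped.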

CLI Commands

dagy build <module.py>         # Build artifact
dagy deploy <module.py>        # Build + deploy
dagy runs list                 # List recent runs
dagy runs show <run_id>        # Show run details
dagy flows list                # List deployed flows
dagy config                    # Configure CLI profiles