SDK API Reference
Complete reference for all public classes, decorators, and functions in the Dagy SDK.
Decorators
@task
Wraps a Python function as a Dagy task with orchestration capabilities.
from dagy import task
@task(
    name: Optional[str] = None,
    description: Optional[str] = None,
    executor: Optional[str] = None,
    retries: int = 0,
    retry_delay_seconds: Optional[float | list[float] | Callable] = None,
    retry_jitter_factor: Optional[float] = None,
    timeout_seconds: Optional[float] = None,
    concurrency_limit: Optional[int] = None,
    validate_parameters: bool = True,
    on_running: Optional[list[Hook]] = None,
    on_completion: Optional[list[Hook]] = None,
    on_failure: Optional[list[Hook]] = None,
    on_retry: Optional[list[Hook]] = None,
    retry_condition_fn: Optional[Callable[[Exception], bool]] = None,
)
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | Function name | Display name for the task |
| description | str | Docstring | Human-readable description |
| executor | str | None | Preferred backend: "lambda", "step-functions", or "ecs" |
| retries | int | 0 | Maximum number of retry attempts on failure |
| retry_delay_seconds | float \| list[float] \| Callable | None | Delay between retries: a float for a fixed delay, a list for per-attempt delays, or a callable for dynamic delays |
| retry_jitter_factor | float | None | Random jitter multiplier (0.0–1.0) added to the retry delay |
| timeout_seconds | float | None | Hard timeout enforced via ThreadPoolExecutor |
| concurrency_limit | int | None | Maximum number of concurrent instances of this task |
| validate_parameters | bool | True | Validate parameter types before execution |
| on_running | list[Hook] | None | Hooks invoked when the task starts running |
| on_completion | list[Hook] | None | Hooks invoked on successful completion |
| on_failure | list[Hook] | None | Hooks invoked on failure (after all retries are exhausted) |
| on_retry | list[Hook] | None | Hooks invoked before each retry attempt |
| retry_condition_fn | Callable[[Exception], bool] | None | Function that receives the raised exception and decides whether to retry |
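The three forms of retry_delay_seconds and the jitter factor can be illustrated with a small stand-alone helper. This is a sketch under stated assumptions, not Dagy's implementation. resolve_retry_delay is a hypothetical name. The table above does not say how a list shorter than the retry count is handled, what a callable receives, or how jitter is sampled. The sketch assumes three things: a list reuses its last entry, a callable receives the 1-indexed retry number, and jitter adds a uniform random amount between 0 and factor * delay.

```python
import random
from typing import Callable, Optional, Sequence, Union

# Hypothetical alias mirroring the accepted forms of retry_delay_seconds
RetryDelay = Union[float, Sequence[float], Callable[[int], float], None]


def resolve_retry_delay(
    retry_number: int,
    retry_delay_seconds: RetryDelay,
    retry_jitter_factor: Optional[float] = None,
    rng: Optional[random.Random] = None,
) -> float:
    """Return the delay in seconds before retry `retry_number` (1 = first retry).

    Assumptions, not confirmed Dagy behavior:
    - None (or an empty list) means no delay.
    - A list indexed past its end reuses its last entry.
    - A callable receives the 1-indexed retry number.
    - Jitter adds uniform(0, factor * base_delay).
    """
    if retry_delay_seconds is None:
        base = 0.0
    elif callable(retry_delay_seconds):
        base = float(retry_delay_seconds(retry_number))
    elif isinstance(retry_delay_seconds, (int, float)):
        base = float(retry_delay_seconds)
    else:
        delays = list(retry_delay_seconds)
        base = float(delays[min(retry_number, len(delays)) - 1]) if delays else 0.0

    if retry_jitter_factor:
        rng = rng or random.Random()
        base += rng.uniform(0.0, retry_jitter_factor * base)
    return base


# Usage: the per-attempt list from the robust_task example
print(resolve_retry_delay(2, [1, 5, 30]))  # 5.0
```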
Usage:
from dagy import task

# Bare decorator (no arguments)
@task
def simple_task(x: int) -> int:
    return x * 2

# With configuration
@task(retries=3, retry_delay_seconds=[1, 5, 30], timeout_seconds=120)
def robust_task(url: str) -> dict:
    ...

# With conditional retry: retry only on network-related errors
@task(
    retries=5,
    retry_condition_fn=lambda e: isinstance(e, (ConnectionError, TimeoutError)),
)
def api_call(endpoint: str) -> dict:
    ...

# With jitter
@task(retries=3, retry_delay_seconds=2.0, retry_jitter_factor=0.5)
def jittered_task():
    ...
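The interplay between retries and retry_condition_fn can be sketched as a plain retry loop. run_with_retries is a hypothetical helper, not part of the Dagy SDK. The sketch makes two assumptions:
- retries counts retries after the first attempt, so retries=3 allows up to four calls. This matches the 1-indexed attempt on RunContext.
- An exception that retry_condition_fn rejects is re-raised immediately.

Hooks and jitter are omitted. The sleep function is injectable so the loop can be exercised without real waiting.

```python
import time
from typing import Any, Callable, Optional


def run_with_retries(
    fn: Callable[[], Any],
    retries: int = 0,
    retry_delay_seconds: float = 0.0,
    retry_condition_fn: Optional[Callable[[Exception], bool]] = None,
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Call fn, retrying on failure (hypothetical sketch, not the SDK's code).

    attempt is 1-indexed, so retries=3 allows at most 4 calls in total.
    """
    attempt = 1
    while True:
        try:
            return fn()
        except Exception as exc:
            out_of_retries = attempt > retries
            # When a condition is given, only exceptions it accepts are retried
            rejected = retry_condition_fn is not None and not retry_condition_fn(exc)
            if out_of_retries or rejected:
                raise
            attempt += 1
            sleep(retry_delay_seconds)


# Usage: a call that fails twice with a retryable error, then succeeds
calls = []

def flaky_api_call() -> str:
    calls.append(len(calls) + 1)
    if len(calls) < 3:
        raise ConnectionError("temporary network failure")
    return "ok"

result = run_with_retries(
    flaky_api_call,
    retries=5,
    retry_condition_fn=lambda e: isinstance(e, (ConnectionError, TimeoutError)),
    sleep=lambda seconds: None,  # skip real waiting in this demo
)
```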
@flow
Wraps a Python function as a Dagy flow (DAG).
from dagy import flow
@flow(
    name: Optional[str] = None,
    description: Optional[str] = None,
    retries: int = 0,
    retry_delay_seconds: Optional[float | list[float] | Callable] = None,
    retry_jitter_factor: Optional[float] = None,
    validate_parameters: bool = True,
    on_running: Optional[list[Hook]] = None,
    on_completion: Optional[list[Hook]] = None,
    on_failure: Optional[list[Hook]] = None,
    on_retry: Optional[list[Hook]] = None,
    retry_condition_fn: Optional[Callable[[Exception], bool]] = None,
)
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | Function name | Flow identifier (used in the API, CLI, and UI) |
| description | str | Docstring | Human-readable description |
| retries | int | 0 | Maximum number of flow-level retry attempts |
| retry_delay_seconds | float \| list[float] \| Callable | None | Delay between flow retries |
| retry_jitter_factor | float | None | Random jitter multiplier (0.0–1.0) added to flow retry delays |
| validate_parameters | bool | True | Validate flow parameters before execution |
| on_running | list[Hook] | None | Hooks invoked when the flow starts running |
| on_completion | list[Hook] | None | Hooks invoked on successful flow completion |
| on_failure | list[Hook] | None | Hooks invoked on flow failure |
| on_retry | list[Hook] | None | Hooks invoked before each flow retry attempt |
| retry_condition_fn | Callable[[Exception], bool] | None | Function that receives the raised exception and decides whether to retry the flow |
Usage:
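from dagy import flow

# extract, transform, and load are @task-decorated functions defined elsewhere
@flow(name="daily_etl", description="Daily data pipeline")
def daily_etl(date: str, source: str):
    data = extract(source, date)
    cleaned = transform(data)
    load(cleaned)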
Note: The @flow decorator does not accept a version parameter. Flow versions are assigned at deploy time via flow.deploy() or the CLI --flow-version flag. The build() method accepts an optional version argument (default: "1").
Core Classes
Task
Represents a decorated task function.
Properties:
| Property | Type | Description |
|---|---|---|
| fn | Callable | The wrapped function |
| name | str | Task name |
| description | Optional[str] | Task description |
| executor | Optional[str] | Preferred executor |
| retries | int | Maximum retry count |
| retry_delay_seconds | Optional[float \| list[float] \| Callable] | Retry delay configuration |
| retry_jitter_factor | Optional[float] | Jitter factor |
| timeout_seconds | Optional[float] | Hard timeout |
| concurrency_limit | Optional[int] | Maximum concurrency |
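The decorator reference says only that timeout_seconds is enforced via ThreadPoolExecutor. A common pattern with that class is to submit the call to a worker thread and wait on the future with a timeout. The sketch below assumes Dagy does something similar; call_with_timeout is a hypothetical name. It also shows a general Python limitation: a thread cannot be forcibly stopped. On timeout, the caller stops waiting, but the timed-out function may keep running in the background.

```python
import concurrent.futures
import time
from typing import Any, Callable


def call_with_timeout(fn: Callable[..., Any], timeout_seconds: float, *args: Any, **kwargs: Any) -> Any:
    """Run fn(*args, **kwargs) in a worker thread; stop waiting after timeout_seconds.

    Hypothetical helper. Raises concurrent.futures.TimeoutError on timeout.
    """
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = executor.submit(fn, *args, **kwargs)
    try:
        return future.result(timeout=timeout_seconds)
    finally:
        # wait=False: do not block here until a timed-out fn finishes
        executor.shutdown(wait=False)


print(call_with_timeout(sum, 1.0, [1, 2, 3]))  # 6

try:
    call_with_timeout(time.sleep, 0.05, 0.5)
except concurrent.futures.TimeoutError:
    print("timed out")
```

The executor is shut down explicitly with wait=False rather than used as a context manager. Leaving a with block waits for the worker to finish, which would defeat the timeout.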
Methods:
| Method | Description |
|---|---|
| __call__(*args, **kwargs) | Outside a flow, executes the task locally; inside a flow context, registers a task run and returns a TaskOutput |
Flow
Represents a decorated flow function.
Properties:
| Property | Type | Description |
|---|---|---|
| fn | Callable | The wrapped function |
| name | str | Flow name |
| description | Optional[str] | Flow description |
Methods:
| Method | Description |
|---|---|
| build(*args, **kwargs) | Builds the FlowSpec DAG without executing any tasks |
| run_local(*args, fail_fast=True, max_workers=1, **kwargs) | Executes the flow locally with metadata tracking |
| run(*args, **kwargs) | Executes via the API if configured; otherwise falls back to run_local() |
| deploy(name=None, *, bucket=None, api_url=None, schedule=None, default_executor=None, execution_mode="nano", tags=None, flow_kwargs=None, force=False) | Builds the artifact, uploads it to S3, and registers the flow with the API. Returns a _DeployResult (remote) exposing .flow_name, .flow_version, .deployment_name, and .skipped, or a LocalRunContext (local fallback). Compares the artifact's code_hash with the deployed version and skips the upload when they match, unless force=True. execution_mode defaults to "nano"; valid values are nano, micro, small, medium, large, and xlarge. |
FlowSpec
Serialized DAG specification. Created by Flow.build().
@dataclass(frozen=True)
class FlowSpec:
    name: str                          # Flow name
    version: str                       # Version string (default: "1")
    created_at: str                    # ISO timestamp
    tasks: Dict[str, TaskSpec]         # {task_name: spec}
    task_runs: List[TaskRunSpec]       # Execution plan
    edges: List[List[str]]             # [[source, target], ...]
    outputs: Any                       # Serialized outputs
    parameters: List[Dict[str, Any]]   # Flow parameter definitions
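FlowSpec.edges is a list of [source, target] pairs, so any topological order of that graph is a valid execution order. The sketch below uses the standard library's graphlib. It assumes, though the spec does not say so, that edge endpoints are task run IDs from task_runs. execution_order is a hypothetical helper, not an SDK function.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set


def execution_order(task_run_ids: List[str], edges: List[List[str]]) -> List[str]:
    """Return task run IDs ordered so each edge's source precedes its target.

    Hypothetical helper; assumes edges are [source_id, target_id] pairs whose
    IDs all appear in task_run_ids. Raises graphlib.CycleError on a cycle.
    """
    # TopologicalSorter expects {node: set of predecessors}
    predecessors: Dict[str, Set[str]] = {run_id: set() for run_id in task_run_ids}
    for source, target in edges:
        predecessors[target].add(source)
    return list(TopologicalSorter(predecessors).static_order())


print(execution_order(
    ["extract", "transform", "load"],
    [["extract", "transform"], ["transform", "load"]],
))  # ['extract', 'transform', 'load']
```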
TaskSpec
Definition of a task within a FlowSpec.
@dataclass(frozen=True)
class TaskSpec:
    name: str                          # Task name
    import_path: str                   # Import path in module:function form
    description: Optional[str]         # Description
    default_executor: Optional[str]    # Preferred executor
    retries: int                       # Retry count
    retry_delay_seconds: Optional[Any] # Retry delay config
    timeout_seconds: Optional[float]   # Hard timeout
    concurrency_limit: Optional[int]   # Max concurrency
TaskRunSpec
A specific execution instance of a task within a flow run.
@dataclass(frozen=True)
class TaskRunSpec:
    id: str                   # Unique run ID
    task_name: str            # Reference to TaskSpec
    depends_on: List[str]     # Task IDs this depends on
    args: List[Any]           # Positional arguments
    kwargs: Dict[str, Any]    # Keyword arguments
    executor: Optional[str]   # Executor override
TaskOutput
Reference to a task's output, used for dependency tracking.
class TaskOutput:
    task_run_id: str  # Unique task run identifier
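TaskOutput is what makes dependency tracking work. Inside a flow, a task call returns a reference instead of a value, and passing that reference into another task call records an edge. The toy tracer below models the behavior described for Task.__call__ and FlowSpec.edges; it is not how Dagy is implemented. ToyTaskOutput, ToyTaskRun, ToyTracer, and the run-ID format are all hypothetical, and only positional arguments are handled.

```python
from dataclasses import dataclass, field
from typing import Any, List


@dataclass(frozen=True)
class ToyTaskOutput:
    """Hypothetical stand-in for TaskOutput: a reference to a task run's result."""
    task_run_id: str


@dataclass
class ToyTaskRun:
    id: str
    task_name: str
    depends_on: List[str]
    args: List[Any]


@dataclass
class ToyTracer:
    """Records task calls instead of executing them (hypothetical sketch)."""
    runs: List[ToyTaskRun] = field(default_factory=list)

    def call(self, task_name: str, *args: Any) -> ToyTaskOutput:
        run_id = f"{task_name}-{len(self.runs)}"
        # Any positional argument that is itself a task output becomes a dependency
        deps = [a.task_run_id for a in args if isinstance(a, ToyTaskOutput)]
        self.runs.append(ToyTaskRun(run_id, task_name, deps, list(args)))
        return ToyTaskOutput(run_id)

    def edges(self) -> List[List[str]]:
        # One [source, target] pair per recorded dependency
        return [[dep, run.id] for run in self.runs for dep in run.depends_on]


# Usage: trace the shape of the daily_etl flow
tracer = ToyTracer()
data = tracer.call("extract", "s3://bucket/raw", "2024-01-01")
cleaned = tracer.call("transform", data)
tracer.call("load", cleaned)
print(tracer.edges())  # [['extract-0', 'transform-1'], ['transform-1', 'load-2']]
```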
RunContext
Runtime context available during task execution.
class RunContext:
    kind: str          # "task" or "flow"
    name: str          # Task/flow name
    attempt: int       # Current attempt number (1-indexed)
    max_retries: int   # Max retry count
    parameters: Dict   # Runtime parameters
State
Task or flow execution state.
class State:
    type: str                # PENDING, RUNNING, SUCCEEDED, FAILED, SKIPPED, CANCELLED
    message: Optional[str]   # Human-readable message
State factories:
from dagy import running, completed, failed
state = running() # State(type="RUNNING")
state = completed() # State(type="SUCCEEDED")
state = failed("timeout") # State(type="FAILED", message="timeout")
Hook
Base class for lifecycle hooks. Extend this to create custom hooks.
from dagy import Hook

class SlackNotifyHook(Hook):
    def __call__(self, context: RunContext, state: State):
        # Send Slack notification
        ...
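A hook is a callable that receives (context, state), so you can unit-test its logic without a live run by calling it with stand-in objects. StubRunContext, StubState, RecordingHook, and fire_hooks are hypothetical test doubles that mirror only the fields documented above. In real code you would subclass Hook and receive Dagy's own RunContext and State. The in-order dispatch in fire_hooks is an assumption about how a runner might invoke a hook list.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class StubRunContext:
    """Hypothetical stand-in mirroring the documented RunContext fields."""
    kind: str
    name: str
    attempt: int
    max_retries: int
    parameters: Dict[str, Any] = field(default_factory=dict)


@dataclass
class StubState:
    """Hypothetical stand-in mirroring the documented State fields."""
    type: str
    message: Optional[str] = None


class RecordingHook:
    """Hook-shaped callable that records each (context, state) it receives."""

    def __init__(self) -> None:
        self.events: List[str] = []

    def __call__(self, context: StubRunContext, state: StubState) -> None:
        detail = f" ({state.message})" if state.message else ""
        self.events.append(
            f"{context.kind}:{context.name} attempt {context.attempt} -> {state.type}{detail}"
        )


def fire_hooks(hooks: List[Any], context: StubRunContext, state: StubState) -> None:
    # Hypothetical dispatcher: call each hook in list order with (context, state)
    for hook in hooks:
        hook(context, state)


hook = RecordingHook()
ctx = StubRunContext(kind="task", name="api_call", attempt=2, max_retries=5)
fire_hooks([hook], ctx, StubState("FAILED", "timeout"))
print(hook.events)  # ['task:api_call attempt 2 -> FAILED (timeout)']
```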
Deployment Functions
build_artifact(flow) -> LocalArtifact
Packages a flow and its dependencies into a ZIP artifact for deployment. The artifact's metadata.json includes a code_hash: a SHA-256 digest of the canonical flow spec and source code, used for change detection during deployment.
from dagy import build_artifact
artifact = build_artifact(my_flow)
print(artifact.path) # Path to ZIP file
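The documented change-detection rule works in two steps: hash the canonical flow spec together with the source code, then skip the upload when that hash matches the deployed one, unless force=True. The sketch below implements this with hashlib. The exact canonicalization Dagy uses is not documented here. compute_code_hash serializes the spec as JSON with sorted keys and compact separators as one plausible choice. Both helpers are hypothetical.

```python
import hashlib
import json
from typing import Any, Dict, Optional


def compute_code_hash(flow_spec: Dict[str, Any], source_code: str) -> str:
    """SHA-256 over a canonical JSON spec plus source code (hypothetical scheme)."""
    # sort_keys and compact separators yield identical bytes for equal specs,
    # regardless of dict insertion order
    canonical_spec = json.dumps(flow_spec, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256()
    digest.update(canonical_spec.encode("utf-8"))
    digest.update(b"\n")  # separator between the spec and the source
    digest.update(source_code.encode("utf-8"))
    return digest.hexdigest()


def should_upload(local_hash: str, deployed_hash: Optional[str], force: bool = False) -> bool:
    """Mirror the documented deploy() rule: skip when hashes match, unless force=True."""
    return force or local_hash != deployed_hash


local = compute_code_hash({"name": "daily_etl", "version": "1"}, "def daily_etl(): ...\n")
print(should_upload(local, deployed_hash=local))  # False
```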
deploy_artifact(artifact, client)
Uploads a built artifact to S3 and registers the flow with the API.
from dagy import DagyClient, deploy_artifact
client = DagyClient(base_url="https://api.dagy.io", token="<token>")
deploy_artifact(artifact, client)
Updating Deployment Settings
After deployment, you can update runtime settings (execution mode, schedule, executor, tags, dependency packages) without redeploying the artifact using the PUT /deployments/{name}/settings API endpoint:
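import requests

token = "<access_token_or_api_key>"  # placeholder: your access token or API key

resp = requests.put(
    "https://api.dagy.io/v1/deployments/daily-etl/settings",
    headers={"Authorization": f"Bearer {token}"},
    json={
        "execution_mode": "micro",
        "schedule": "0 9 * * 1-5",  # cron: 09:00, Monday to Friday
    },
)
resp.raise_for_status()
updated = resp.json()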
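If you prefer not to depend on requests, you can build the same call with the standard library. build_settings_request is a hypothetical helper. It rejects execution modes outside the values listed for Flow.deploy(). It also sends only the fields you set, which assumes the endpoint leaves omitted settings unchanged; check the API endpoint reference to confirm.

```python
import json
import urllib.request
from typing import Any, Dict, Optional

# Valid execution modes as listed for Flow.deploy()
EXECUTION_MODES = ("nano", "micro", "small", "medium", "large", "xlarge")


def build_settings_request(
    base_url: str,
    deployment_name: str,
    token: str,
    execution_mode: Optional[str] = None,
    schedule: Optional[str] = None,
) -> urllib.request.Request:
    """Build (but do not send) a PUT request for deployment settings (hypothetical)."""
    if execution_mode is not None and execution_mode not in EXECUTION_MODES:
        raise ValueError(f"execution_mode must be one of {EXECUTION_MODES}")
    # Include only the fields the caller set (assumes omitted fields stay unchanged)
    fields = {"execution_mode": execution_mode, "schedule": schedule}
    payload: Dict[str, Any] = {k: v for k, v in fields.items() if v is not None}
    return urllib.request.Request(
        url=f"{base_url}/v1/deployments/{deployment_name}/settings",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
        method="PUT",
    )


req = build_settings_request(
    "https://api.dagy.io", "daily-etl", "<token>", execution_mode="micro", schedule="0 9 * * 1-5"
)
print(req.get_method(), req.full_url)
```

Pass the returned request to urllib.request.urlopen() to send it.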
These settings can also be updated from the web UI via the Flow Settings dialog on the Flows page. See API endpoints for the full field reference.
DagyClient
HTTP client for the Dagy API.
from dagy import DagyClient
client = DagyClient(
    base_url="https://api.dagy.io",
    token="<access_token_or_api_key>",
)
Local Execution
Flow.run_local()
Execute a flow locally for development and testing. Uses a ThreadPoolExecutor and tracks metadata in a local DuckDB database.
result = my_flow.run_local(
    param1="value",
    fail_fast=True,  # Stop on first failure
    max_workers=1,   # Number of parallel task workers
)
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| fail_fast | bool | True | Stop execution on the first task failure |
| max_workers | int | 1 | Number of parallel task workers |
| cleanup_keep_last | int | 20 | Number of most recent local run records to keep |
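The interaction of max_workers and fail_fast can be illustrated with a small ThreadPoolExecutor-based scheduler. run_dag_locally is a hypothetical sketch, not the SDK's runner. It takes zero-argument callables and a depends_on map, and applies these rules:
- A task starts only after all of its dependencies succeed.
- Tasks downstream of a failure are marked SKIPPED.
- With fail_fast=True, no new tasks start after the first failure, though already-running tasks are allowed to finish.

It does no DuckDB metadata tracking.

```python
import concurrent.futures
from typing import Any, Callable, Dict, List


def run_dag_locally(
    tasks: Dict[str, Callable[[], Any]],
    depends_on: Dict[str, List[str]],
    max_workers: int = 1,
    fail_fast: bool = True,
) -> Dict[str, str]:
    """Run callables in dependency order and return each task's final state.

    Hypothetical sketch using the state names from State (PENDING, RUNNING,
    SUCCEEDED, FAILED, SKIPPED).
    """
    states: Dict[str, str] = {name: "PENDING" for name in tasks}
    running: Dict[concurrent.futures.Future, str] = {}
    failed = False

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        while True:
            if not (fail_fast and failed):
                # Start every pending task whose dependencies have all succeeded
                for name, state in states.items():
                    if state != "PENDING":
                        continue
                    deps = depends_on.get(name, [])
                    if any(states[d] in ("FAILED", "SKIPPED") for d in deps):
                        states[name] = "SKIPPED"
                    elif all(states[d] == "SUCCEEDED" for d in deps):
                        states[name] = "RUNNING"
                        running[pool.submit(tasks[name])] = name
            if not running:
                break
            done, _ = concurrent.futures.wait(
                running, return_when=concurrent.futures.FIRST_COMPLETED
            )
            for future in done:
                name = running.pop(future)
                if future.exception() is None:
                    states[name] = "SUCCEEDED"
                else:
                    states[name] = "FAILED"
                    failed = True

    # Tasks never started (fail_fast, or blocked by a failure) end up SKIPPED
    for name, state in states.items():
        if state == "PENDING":
            states[name] = "SKIPPED"
    return states


print(run_dag_locally(
    {"extract": lambda: 1, "transform": lambda: 2, "load": lambda: 3},
    {"transform": ["extract"], "load": ["transform"]},
))
```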
CLI Commands
dagy build <module.py> # Build artifact
dagy deploy <module.py> # Build + deploy
dagy runs list # List recent runs
dagy runs show <run_id> # Show run details
dagy flows list # List deployed flows
dagy config # Configure CLI profiles