Python SDK

SDK Overview

The Dagy SDK is a Python library for defining, building, deploying, and running data pipelines. It provides a decorator-based API for defining flows and tasks, local execution for development, and packaging tools for cloud deployment.

Installation

pip install dagy

Or with uv:

uv add dagy

Core Concepts

Tasks

A task is the smallest unit of work in Dagy. It wraps a Python function and adds orchestration capabilities like retries, timeouts, and lifecycle hooks.

from dagy import task

@task(retries=3, timeout_seconds=60)
def extract_data(source_url: str) -> dict:
    """Fetch data from a source."""
    import httpx
    response = httpx.get(source_url)
    return response.json()
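
To make the retries parameter more concrete, here is a minimal sketch of retry semantics in plain Python. It does not use Dagy and should not be read as Dagy's implementation: with_retries is a hypothetical decorator, and the sketch assumes that retries=3 means one initial attempt plus up to three re-attempts.

```python
import functools
import time


def with_retries(retries: int = 0, retry_delay_seconds: float = 0.0):
    """Hypothetical stand-in for the retry behavior of a task decorator."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            attempts_allowed = retries + 1  # assumption: first call + N retries
            for attempt in range(1, attempts_allowed + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == attempts_allowed:
                        raise  # out of attempts: surface the last error
                    time.sleep(retry_delay_seconds)
        return wrapper
    return decorator


calls = {"count": 0}


@with_retries(retries=3)
def flaky_extract() -> dict:
    """Fails twice, then succeeds, to exercise the retry loop."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient failure")
    return {"rows": 10}


result = flaky_extract()  # succeeds on the third attempt
```

The function body stays free of retry logic; the decorator owns that concern, which is the same separation the @task decorator offers.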

Flows

A flow is a directed acyclic graph (DAG) of tasks. It defines the execution order and data flow between tasks. Flows are defined as Python functions that call tasks.

from dagy import flow

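# extract_data is the task defined above; transform_data and load_data
# are assumed to be tasks defined the same way.
@flow(name="etl_pipeline", version="1.0.0")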
def etl_pipeline(source_url: str, destination: str):
    """Extract, transform, and load data."""
    raw = extract_data(source_url)
    cleaned = transform_data(raw)
    load_data(cleaned, destination)
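
As an illustration of how a DAG determines execution order, the following sketch uses the standard library's graphlib module rather than Dagy itself. The dependency map is written by hand here and mirrors the etl_pipeline flow; how Dagy stores and schedules its graph internally is not shown in this document.

```python
from graphlib import TopologicalSorter

# Map each task name to the set of tasks it depends on.
dependencies = {
    "extract_data": set(),
    "transform_data": {"extract_data"},
    "load_data": {"transform_data"},
}

# static_order() yields tasks so that every task comes after its dependencies.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```

Because this graph is a simple chain, there is only one valid order: extract_data, then transform_data, then load_data.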

Task Outputs

When a task is called within a flow context, it returns a TaskOutput object instead of the actual value. A TaskOutput is a reference to a result that has not been computed yet. Passing it to another task records a dependency between the two tasks, which is how the DAG is constructed.

result = extract_data(source_url)  # Returns TaskOutput, not dict
transform_data(result)  # Creates dependency edge
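
The idea of returning a reference instead of a value can be sketched in a few lines of plain Python. This is not Dagy's TaskOutput: OutputRef and GraphBuilder are hypothetical names used only to show how calling tasks can record dependency edges without running any task body.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass(frozen=True)
class OutputRef:
    """Hypothetical placeholder for a task result that does not exist yet."""
    task_name: str


@dataclass
class GraphBuilder:
    """Collects (upstream, downstream) edges as tasks are called."""
    edges: list[tuple[str, str]] = field(default_factory=list)

    def task(self, func: Callable) -> Callable:
        def call(*args):
            # Any argument that is an OutputRef marks an upstream dependency.
            for arg in args:
                if isinstance(arg, OutputRef):
                    self.edges.append((arg.task_name, func.__name__))
            # Return a reference instead of running the function body.
            return OutputRef(func.__name__)
        return call


graph = GraphBuilder()


@graph.task
def extract_data(source_url):
    ...


@graph.task
def transform_data(raw):
    ...


result = extract_data("https://example.com/data.json")  # an OutputRef
transform_data(result)  # records the edge extract_data -> transform_data
print(graph.edges)
```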

Key Exports

The SDK exports these primary symbols:

from dagy import (
    # Decorators
    flow,           # Define a flow (DAG)
    task,           # Define a task (node)

    # Core Classes
    Flow,           # Flow class
    Task,           # Task class
    FlowSpec,       # Serialized DAG specification
    TaskOutput,     # Task output reference

    # Runtime
    RunContext,     # Runtime context (name, attempt, parameters)
    Hook,           # Lifecycle hook base class
    State,          # Task/flow state
    running,        # Running state factory
    completed,      # Completed state factory
    failed,         # Failed state factory

    # Deployment
    DagyClient,     # API client
    build_artifact, # Package flow as ZIP
    deploy_artifact,# Upload to S3 + register
    LocalArtifact,  # Local artifact for testing

    # Data Models
    Deployment,     # Deployment metadata
    Executor,       # Executor configuration
    Run,            # Run metadata
)

Workflow

1. Define

Create tasks and flows using decorators:

from dagy import flow, task

@task(retries=2, retry_delay_seconds=5)
def fetch(url: str) -> str:
    import httpx
    return httpx.get(url).text

@task(timeout_seconds=30)
def parse(html: str) -> dict:
    return {"title": html[:50], "length": len(html)}

@flow(name="web_scraper", version="1.0.0")
def web_scraper(url: str):
    html = fetch(url)
    return parse(html)

2. Test Locally

Use run_local() for development and testing:

result = web_scraper.run_local(url="https://example.com")
print(result)

3. Build

Package the flow and its dependencies into a ZIP artifact:

from dagy import build_artifact
artifact = build_artifact(web_scraper)

Or via CLI:

dagy build my_flows.py
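
For intuition about what packaging a flow as a ZIP involves, here is a sketch using only the standard library. The archive layout, the manifest.json file, and the build_zip helper are all assumptions made for illustration; the actual contents of a Dagy artifact are not specified in this document.

```python
import io
import json
import zipfile


def build_zip(files: dict[str, str], manifest: dict) -> bytes:
    """Pack source files plus a JSON manifest into an in-memory ZIP."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as archive:
        for path, content in sorted(files.items()):
            archive.writestr(path, content)
        # Hypothetical manifest describing the packaged flow.
        archive.writestr("manifest.json", json.dumps(manifest, sort_keys=True))
    return buffer.getvalue()


artifact_bytes = build_zip(
    files={"my_flows.py": "# flow source goes here\n"},
    manifest={"flow_name": "web_scraper", "flow_version": "1.0.0"},
)

# Read the archive back to confirm what it contains.
with zipfile.ZipFile(io.BytesIO(artifact_bytes)) as archive:
    names = sorted(archive.namelist())
```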

4. Deploy

Use flow.deploy() to build the artifact, upload it, and register the deployment in one call. Dagy compares the flow's code_hash and skips the deploy automatically when the code hasn't changed. Pass force=True to deploy anyway:

result = web_scraper.deploy(name="web-scraper")
if result.skipped:
    print("No changes detected, deploy skipped")
else:
    print(result.deployment_name, result.flow_version)

Or build and deploy separately via the CLI:

# Build the artifact zip
dagy build my_flows.py:web_scraper --output-dir ./dist

# Upload to S3 and register with the API
dagy deploy ./dist/web_scraper/<build_id>/artifact.zip \
  --deployment web-scraper \
  --flow-name web_scraper \
  --flow-version 1.0.0 \
  --profile production
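
The skip-if-unchanged behavior of flow.deploy() can be sketched as a hash comparison. This is an illustration only: it assumes the hash is a SHA-256 digest of the source text, and the code_hash and should_deploy functions below are hypothetical helpers, not Dagy APIs.

```python
import hashlib
from typing import Optional


def code_hash(source: str) -> str:
    """Hash of flow source code (assumption: SHA-256 over UTF-8 text)."""
    return hashlib.sha256(source.encode("utf-8")).hexdigest()


def should_deploy(source: str, last_deployed_hash: Optional[str],
                  force: bool = False) -> bool:
    """Deploy when forced, or when the code differs from the last deploy."""
    return force or code_hash(source) != last_deployed_hash


source_v1 = "def web_scraper(url): ..."
deployed = code_hash(source_v1)

unchanged = should_deploy(source_v1, deployed)             # same code: skip
forced = should_deploy(source_v1, deployed, force=True)    # force overrides
changed = should_deploy(source_v1 + "\n# edit", deployed)  # new code: deploy
```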

5. Run

Trigger runs via the CLI, Python SDK, or a schedule:

# Via CLI (remote deployment)
dagy run web-scraper --param url=https://example.com

# Via CLI (local execution)
dagy run my_flows.py:web_scraper --param url=https://example.com

# Via Python SDK
result = web_scraper.run(url="https://example.com")
print(result.run_id, result.status)

Execution Backends

Dagy routes flows to the optimal execution backend:

Backend        | Best For                         | Max Duration | Parallel
Lambda         | Short, simple tasks              | 15 min       | No
Step Functions | Medium, parallel workflows       | 1 year       | Yes
ECS Fargate    | Long-running, resource-intensive | Unlimited    | Yes

The BackendRouter automatically selects a backend based on duration, memory, and task count. To override its choice, pass the executor parameter:

@task(executor="ecs")
def train_model(data: dict) -> dict:
    # Long-running, memory-intensive task
    ...
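
A routing decision of this kind can be sketched as a small function. Everything below is an assumption made for illustration: the FlowProfile fields, the thresholds, and the "lambda" and "step_functions" identifiers are hypothetical (only "ecs" appears in this document), and the real BackendRouter may weigh its inputs differently.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FlowProfile:
    """Hypothetical inputs a router might consider."""
    expected_minutes: float
    memory_mb: int
    task_count: int


def choose_backend(profile: FlowProfile, override: Optional[str] = None) -> str:
    """Pick a backend name; thresholds are illustrative assumptions."""
    if override is not None:
        return override  # an explicit executor choice wins
    if profile.expected_minutes > 60 or profile.memory_mb > 10_240:
        return "ecs"  # long-running or memory-heavy work
    if profile.task_count > 1 or profile.expected_minutes > 15:
        return "step_functions"  # multi-task, or longer than Lambda allows
    return "lambda"  # short, single-task work


small = choose_backend(FlowProfile(expected_minutes=2, memory_mb=512, task_count=1))
medium = choose_backend(FlowProfile(expected_minutes=30, memory_mb=1024, task_count=5))
large = choose_backend(FlowProfile(expected_minutes=240, memory_mb=16_384, task_count=3))
pinned = choose_backend(FlowProfile(2, 512, 1), override="ecs")
```

Checking the override first keeps the explicit choice authoritative, which matches the role the executor parameter plays above.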

Flow Node Framework

In addition to the decorator-based @flow / @task API, Dagy provides a class-based Flow Node framework for building reusable, composable nodes with typed connectors. This is the framework that powers the visual Flow Designer.

When to Use FlowNode vs @task

Use @task when you want simple Python functions with orchestration. Use FlowNode when you need typed input/output connectors, rich configuration schemas, visual representation in the Flow Designer, or reusable node packages.

Quick Example

from dagy.nodes.base import FlowNode
from dagy.nodes.connectors import ConnectorDef, ConnectorDirection, DataType
from dagy.nodes.metadata import NodeMetadata, NodeCategory
from dagy.nodes.execution import ExecutionContext, ExecutionResult

class MyTransformNode(FlowNode):
    @classmethod
    def metadata(cls) -> NodeMetadata:
        return NodeMetadata(
            node_type="my_transform",
            label="My Transform",
            description="Transforms data",
            category=NodeCategory.TRANSFORM,
        )

    @classmethod
    def connectors(cls) -> list[ConnectorDef]:
        return [
            ConnectorDef(id="data_in", name="Input", direction=ConnectorDirection.INBOUND,
                        data_types=frozenset({DataType.JSON}), required=True),
            ConnectorDef(id="data_out", name="Output", direction=ConnectorDirection.OUTBOUND,
                        data_types=frozenset({DataType.JSON})),
        ]

    async def execute(self, context: ExecutionContext) -> ExecutionResult:
        data = context.get_upstream_data("data_in")
        result = transform(data)  # transform() stands in for your own logic
        return ExecutionResult.from_success({"data_out": result})

Importing this module auto-registers the node via __init_subclass__. It immediately appears in the Node Registry API and the Flow Designer sidebar.
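
The auto-registration mechanism relies on a standard Python feature: __init_subclass__ runs on the base class whenever a subclass is defined. The sketch below shows the pattern in isolation. NodeBase and its registry dictionary are hypothetical stand-ins, not Dagy's FlowNode or NodeRegistry.

```python
class NodeBase:
    """Hypothetical base class that records every subclass by node_type."""
    registry: dict[str, type] = {}
    node_type: str = ""

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        if not cls.node_type:
            raise TypeError(f"{cls.__name__} must set node_type")
        # Runs once, at class definition time, for each subclass.
        NodeBase.registry[cls.node_type] = cls


class MyTransform(NodeBase):
    node_type = "my_transform"


# Defining the class was enough; no explicit register() call is needed.
registered = NodeBase.registry["my_transform"]
```

Because registration happens at class definition time, a node is only registered once the module that defines it has been imported.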

Key Exports (Node Framework)

from dagy.nodes import (
    FlowNode,              # Abstract base class
    ConnectorDef,          # Connector definition
    ConnectorDirection,    # INBOUND / OUTBOUND
    DataType,              # STRING, JSON, DATAFRAME, etc.
    ConnectionCardinality, # ONE, ZERO_OR_MANY, etc.
    NodeMetadata,          # Node identity
    NodeConfigField,       # Config field definition
    NodeRegistry,          # Registry class
    node_registry,         # Global registry singleton
    ExecutionContext,      # Runtime context
    ExecutionResult,       # Execution output
    NodeState,             # PENDING, RUNNING, COMPLETED, etc.
)

For the complete guide to building custom nodes, see Creating Custom Nodes. For the full technical reference, see Flow Node Architecture.

Next Steps