SDK Overview
The Dagy SDK is a Python library for defining, building, deploying, and running data pipelines. It provides a decorator-based API for defining flows and tasks, local execution for development, and packaging tools for cloud deployment.
Installation
pip install dagy
Or with uv:
uv add dagy
Core Concepts
Tasks
A task is the smallest unit of work in Dagy. It wraps a Python function and adds orchestration capabilities like retries, timeouts, and lifecycle hooks.
from dagy import task
@task(retries=3, timeout_seconds=60)
def extract_data(source_url: str) -> dict:
    """Fetch data from a source."""
    import httpx
    response = httpx.get(source_url)
    response.raise_for_status()  # raise on HTTP 4xx/5xx so a failed request can be retried
    return response.json()
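To make the retry semantics concrete, here is a minimal stand-in decorator in plain Python. It is not Dagy's implementation: `retrying_task` and its parameters are hypothetical, and it models only retries with a fixed delay. Enforcing something like timeout_seconds needs a separate mechanism (a worker thread, a subprocess, or a signal), which this sketch leaves out.

```python
import functools
import time


def retrying_task(retries: int = 0, retry_delay_seconds: float = 0.0):
    """Hypothetical stand-in for @task that only models retries (no timeouts)."""

    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            failures = 0
            while True:
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    failures += 1
                    if failures > retries:
                        raise  # retries exhausted: re-raise the last error
                    time.sleep(retry_delay_seconds)  # fixed delay between attempts

        return wrapper

    return decorator


calls = {"n": 0}


@retrying_task(retries=3)  # up to 4 attempts in total: 1 initial + 3 retries
def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return "ok"


print(flaky(), calls["n"])  # ok 3
```

With retries=3 the function may run up to four times; here it succeeds on the third attempt, so the error is never seen by the caller.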
Flows
A flow is a directed acyclic graph (DAG) of tasks. It defines the execution order and data flow between tasks. Flows are defined as Python functions that call tasks.
from dagy import flow
# transform_data and load_data are assumed to be @task functions defined like extract_data
@flow(name="etl_pipeline", version="1.0.0")
def etl_pipeline(source_url: str, destination: str):
    """Extract, transform, and load data."""
    raw = extract_data(source_url)
    cleaned = transform_data(raw)
    load_data(cleaned, destination)
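Because a flow is a DAG, any valid execution order is a topological order of its tasks, and tasks that become ready at the same time have no dependency on each other. The sketch below uses the standard library's graphlib on a hypothetical diamond-shaped variant of the ETL flow (two independent steps after extraction). The graph and `parallel_stages` helper are illustrative only; Dagy's own scheduler is not shown here.

```python
from graphlib import TopologicalSorter

# Hypothetical diamond-shaped flow: each key maps a task to its upstream tasks.
graph = {
    "extract": set(),
    "clean": {"extract"},
    "validate": {"extract"},
    "load": {"clean", "validate"},
}


def parallel_stages(dag: dict[str, set[str]]) -> list[list[str]]:
    """Group tasks into stages; tasks in the same stage have no edges between them."""
    sorter = TopologicalSorter(dag)
    sorter.prepare()  # raises graphlib.CycleError if the graph contains a cycle
    stages = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # all upstream tasks of these have finished
        stages.append(ready)
        sorter.done(*ready)
    return stages


print(parallel_stages(graph))  # [['extract'], ['clean', 'validate'], ['load']]
```

The "acyclic" requirement is what makes this work: a cycle would leave some task waiting on itself, which is why prepare() rejects it.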
Task Outputs
When a task is called within a flow context, it returns a TaskOutput object instead of the actual value. Passing a TaskOutput to another task records a dependency between the two tasks, and these recorded dependencies are how Dagy constructs the DAG.
result = extract_data(source_url) # Returns TaskOutput, not dict
transform_data(result) # Creates dependency edge
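The mechanism can be modeled in a few lines of plain Python. In this sketch `Ref`, `deferred`, and `EDGES` are hypothetical names, not Dagy's TaskOutput machinery: calling a wrapped function returns a placeholder instead of running it, and any placeholder passed as an argument records an edge from the producing task to the consuming one.

```python
from dataclasses import dataclass

EDGES: list[tuple[str, str]] = []  # (upstream task, downstream task)


@dataclass(frozen=True)
class Ref:
    """Hypothetical stand-in for TaskOutput: names the task that will produce a value."""

    task_name: str


def deferred(fn):
    """Wrap fn so that calling it records dependency edges and returns a Ref."""

    def wrapper(*args, **kwargs):
        for value in (*args, *kwargs.values()):
            if isinstance(value, Ref):
                EDGES.append((value.task_name, fn.__name__))
        return Ref(fn.__name__)  # the real work is deferred until the DAG runs

    return wrapper


@deferred
def extract_data(source_url: str) -> dict: ...


@deferred
def transform_data(raw: dict) -> dict: ...


result = extract_data("https://example.com/data.json")
transform_data(result)
print(EDGES)  # [('extract_data', 'transform_data')]
```

Plain values such as the URL string create no edges; only placeholders returned by other tasks do.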
Key Exports
The SDK exports these primary symbols:
from dagy import (
    # Decorators
    flow,             # Define a flow (DAG)
    task,             # Define a task (node)
    # Core Classes
    Flow,             # Flow class
    Task,             # Task class
    FlowSpec,         # Serialized DAG specification
    TaskOutput,       # Task output reference
    # Runtime
    RunContext,       # Runtime context (name, attempt, parameters)
    Hook,             # Lifecycle hook base class
    State,            # Task/flow state
    running,          # Running state factory
    completed,        # Completed state factory
    failed,           # Failed state factory
    # Deployment
    DagyClient,       # API client
    build_artifact,   # Package flow as ZIP
    deploy_artifact,  # Upload to S3 + register
    LocalArtifact,    # Local artifact for testing
    # Data Models
    Deployment,       # Deployment metadata
    Executor,         # Executor configuration
    Run,              # Run metadata
)
Workflow
1. Define
Create tasks and flows using decorators:
from dagy import flow, task
@task(retries=2, retry_delay_seconds=5)
def fetch(url: str) -> str:
    import httpx
    response = httpx.get(url)
    response.raise_for_status()  # raise on HTTP errors so the retries can take effect
    return response.text
@task(timeout_seconds=30)
def parse(html: str) -> dict:
    # Placeholder logic: the first 50 characters stand in for a real title
    return {"title": html[:50], "length": len(html)}
@flow(name="web_scraper", version="1.0.0")
def web_scraper(url: str):
    html = fetch(url)
    return parse(html)
2. Test Locally
Use run_local() for development and testing:
result = web_scraper.run_local(url="https://example.com")
print(result)
3. Build
Package the flow and its dependencies into a ZIP artifact:
from dagy import build_artifact
artifact = build_artifact(web_scraper)
Or via CLI:
dagy build my_flows.py
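As a rough illustration of what packaging involves, the sketch below writes flow source files into an in-memory ZIP with the standard zipfile module. The function name `build_zip` and the archive layout (source files at the root plus a requirements file) are assumptions; the actual contents of a Dagy artifact are not specified on this page.

```python
import io
import zipfile


def build_zip(files: dict[str, str]) -> bytes:
    """Hypothetical packager: map archive paths to source text and return ZIP bytes."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for arcname in sorted(files):  # sorted for a stable entry order
            zf.writestr(arcname, files[arcname])
    return buffer.getvalue()


artifact_bytes = build_zip({"my_flows.py": "print('hello')\n", "requirements.txt": "httpx\n"})
with zipfile.ZipFile(io.BytesIO(artifact_bytes)) as zf:
    print(zf.namelist())  # ['my_flows.py', 'requirements.txt']
```

Writing entries in sorted order keeps the archive layout stable from build to build.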
4. Deploy
Use flow.deploy() to build, upload, and register a deployment in one call. If the code has not changed since the last deploy (as determined by its code_hash), Dagy skips the deploy automatically; pass force=True to deploy anyway:
result = web_scraper.deploy(name="web-scraper")
if result.skipped:
    print("No changes detected, deploy skipped")
else:
    print(result.deployment_name, result.flow_version)
Or build and deploy separately via the CLI:
# Build the artifact zip
dagy build my_flows.py:web_scraper --output-dir ./dist
# Upload to S3 and register with the API
dagy deploy ./dist/web_scraper/<build_id>/artifact.zip \
--deployment web-scraper \
--flow-name web_scraper \
--flow-version 1.0.0 \
--profile production
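The skip-if-unchanged behavior amounts to comparing a content hash against the one recorded for the last deployment. This sketch assumes code_hash is a SHA-256 digest over the flow's source files taken in a deterministic order; what Dagy actually feeds into code_hash (dependencies, version string, and so on) may differ, and `code_hash` and `should_deploy` here are hypothetical helpers.

```python
import hashlib


def code_hash(files: dict[str, str]) -> str:
    """Hash file paths and contents in sorted order so the result is deterministic."""
    digest = hashlib.sha256()
    for path in sorted(files):
        digest.update(path.encode())
        digest.update(b"\0")  # separator so ("ab", "c") and ("a", "bc") hash differently
        digest.update(files[path].encode())
        digest.update(b"\0")
    return digest.hexdigest()


def should_deploy(files: dict[str, str], last_deployed_hash: str, force: bool = False) -> bool:
    """Deploy when forced or when the code differs from the last deployment."""
    return force or code_hash(files) != last_deployed_hash


source = {"my_flows.py": "def web_scraper(url): ...\n"}
previous = code_hash(source)
print(should_deploy(source, previous))              # False: unchanged, so skip
print(should_deploy(source, previous, force=True))  # True: force overrides the check
```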
5. Run
Trigger runs via the CLI, Python SDK, or a schedule:
# Via CLI (remote deployment)
dagy run web-scraper --param url=https://example.com
# Via CLI (local execution)
dagy run my_flows.py:web_scraper --param url=https://example.com
# Via Python SDK
result = web_scraper.run(url="https://example.com")
print(result.run_id, result.status)
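`dagy run` accepts either a deployment name or a `file.py:flow_name` target. The sketch below shows one plausible way to tell the two apart and to collect repeated `--param key=value` flags into keyword arguments. `resolve_target` and `parse_params` are hypothetical helpers, and the CLI's actual parsing rules (for example, whether values are type-converted) may differ.

```python
from typing import Optional


def resolve_target(target: str) -> tuple[str, str, Optional[str]]:
    """Classify a run target as a local file:flow pair or a remote deployment name."""
    path, sep, flow_name = target.partition(":")
    if sep and path.endswith(".py") and flow_name:
        return "local", path, flow_name
    return "remote", target, None


def parse_params(pairs: list[str]) -> dict[str, str]:
    """Turn repeated key=value strings into a dict, splitting on the first '='."""
    params: dict[str, str] = {}
    for pair in pairs:
        key, sep, value = pair.partition("=")
        if not sep or not key:
            raise ValueError(f"expected key=value, got {pair!r}")
        params[key] = value  # values stay strings in this sketch
    return params


print(resolve_target("my_flows.py:web_scraper"))  # ('local', 'my_flows.py', 'web_scraper')
print(resolve_target("web-scraper"))              # ('remote', 'web-scraper', None)
print(parse_params(["url=https://example.com"]))  # {'url': 'https://example.com'}
```

Splitting on the first `=` only keeps values such as query strings intact.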
Execution Backends
Dagy routes each flow to one of three execution backends, chosen to fit its workload:
| Backend | Best For | Max Duration | Parallel |
|---|---|---|---|
| Lambda | Short, simple tasks | 15 min | No |
| Step Functions | Medium, parallel workflows | 1 year | Yes |
| ECS Fargate | Long-running, resource-intensive | Unlimited | Yes |
The BackendRouter picks a backend automatically based on expected duration, memory requirements, and task count. To override its choice, pass the executor parameter:
@task(executor="ecs")
def train_model(data: dict) -> dict:
    # Long-running, memory-intensive task
    ...
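A simplified version of the routing decision, driven only by the limits in the backend table, might look like the following. `choose_backend`, its inputs, and the rule order are hypothetical, not the BackendRouter's actual logic; only "ecs" appears as an executor value in this document, so the other enum strings are placeholders. The 15-minute and 10,240 MB figures are AWS Lambda's published limits, and one year is the maximum duration of a Step Functions Standard workflow.

```python
from enum import Enum

LAMBDA_MAX_SECONDS = 15 * 60                      # AWS Lambda's 15-minute execution limit
LAMBDA_MAX_MEMORY_MB = 10_240                     # AWS Lambda's maximum memory setting
STEP_FUNCTIONS_MAX_SECONDS = 365 * 24 * 60 * 60   # Standard workflows run up to one year


class Backend(Enum):
    LAMBDA = "lambda"
    STEP_FUNCTIONS = "step_functions"
    ECS = "ecs"


def choose_backend(expected_seconds: float, memory_mb: int, task_count: int) -> Backend:
    """Hypothetical routing rules derived from the backend table, not Dagy's router."""
    if memory_mb > LAMBDA_MAX_MEMORY_MB:
        return Backend.ECS  # more memory than Lambda offers: resource-intensive
    if task_count == 1 and expected_seconds <= LAMBDA_MAX_SECONDS:
        return Backend.LAMBDA  # one short task needs no orchestration
    if task_count > 1 and expected_seconds <= STEP_FUNCTIONS_MAX_SECONDS:
        return Backend.STEP_FUNCTIONS  # several tasks that may run in parallel
    return Backend.ECS  # a single long task, or longer than Step Functions allows


print(choose_backend(expected_seconds=60, memory_mb=512, task_count=1).value)  # lambda
```

Checking memory first reflects the table's "resource-intensive" row: a task that exceeds Lambda's memory ceiling cannot run there regardless of how short it is.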
Flow Node Framework
In addition to the decorator-based @flow / @task API, Dagy provides a class-based Flow Node framework for building reusable, composable nodes with typed connectors. This is the framework that powers the visual Flow Designer.
When to Use FlowNode vs @task
Use @task when you want simple Python functions with orchestration. Use FlowNode when you need typed input/output connectors, rich configuration schemas, visual representation in the Flow Designer, or reusable node packages.
Quick Example
from dagy.nodes.base import FlowNode
from dagy.nodes.connectors import ConnectorDef, ConnectorDirection, DataType
from dagy.nodes.metadata import NodeMetadata, NodeCategory
from dagy.nodes.execution import ExecutionContext, ExecutionResult
class MyTransformNode(FlowNode):
    @classmethod
    def metadata(cls) -> NodeMetadata:
        return NodeMetadata(
            node_type="my_transform",
            label="My Transform",
            description="Transforms data",
            category=NodeCategory.TRANSFORM,
        )
    @classmethod
    def connectors(cls) -> list[ConnectorDef]:
        return [
            ConnectorDef(id="data_in", name="Input", direction=ConnectorDirection.INBOUND,
                         data_types=frozenset({DataType.JSON}), required=True),
            ConnectorDef(id="data_out", name="Output", direction=ConnectorDirection.OUTBOUND,
                         data_types=frozenset({DataType.JSON})),
        ]
    async def execute(self, context: ExecutionContext) -> ExecutionResult:
        data = context.get_upstream_data("data_in")
        result = transform(data)  # your logic
        return ExecutionResult.from_success({"data_out": result})
Importing the module that defines this class registers the node automatically through __init_subclass__, and the node then appears in the Node Registry API and the Flow Designer sidebar.
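The registration pattern itself is standard Python: `__init_subclass__` runs once for every subclass at the moment its class statement executes, which is why an import is enough. The sketch below is a self-contained illustration with a hypothetical `NodeBase` and a plain `node_type` class attribute; Dagy's FlowNode reads node identity from metadata() instead and keeps it in its own NodeRegistry.

```python
class NodeBase:
    """Hypothetical base class that registers every subclass when it is defined."""

    registry: dict[str, type] = {}  # shared by all subclasses on purpose
    node_type: str = ""

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        if not cls.node_type:
            raise TypeError(f"{cls.__name__} must set node_type")
        if cls.node_type in NodeBase.registry:
            raise ValueError(f"duplicate node_type {cls.node_type!r}")
        NodeBase.registry[cls.node_type] = cls


class MyTransformNode(NodeBase):
    node_type = "my_transform"  # defining the class is enough to register it


print(sorted(NodeBase.registry))  # ['my_transform']
```

Rejecting duplicate identifiers at class-definition time surfaces naming collisions as soon as the offending module is imported.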
Key Exports (Node Framework)
from dagy.nodes import (
    FlowNode,               # Abstract base class
    ConnectorDef,           # Connector definition
    ConnectorDirection,     # INBOUND / OUTBOUND
    DataType,               # STRING, JSON, DATAFRAME, etc.
    ConnectionCardinality,  # ONE, ZERO_OR_MANY, etc.
    NodeMetadata,           # Node identity
    NodeConfigField,        # Config field definition
    NodeRegistry,           # Registry class
    node_registry,          # Global registry singleton
    ExecutionContext,       # Runtime context
    ExecutionResult,        # Execution output
    NodeState,              # PENDING, RUNNING, COMPLETED, etc.
)
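Typed connectors let the Flow Designer reject mismatched wiring before a flow runs. The sketch below shows one plausible rule: an edge must go from an OUTBOUND connector to an INBOUND one, and the two must share at least one data type. `Connector`, `Direction`, `DataType`, and `can_connect` are local stand-ins that mirror the names above, not Dagy's classes; the overlap rule is an assumption, and ConnectionCardinality is not modeled.

```python
from dataclasses import dataclass
from enum import Enum


class DataType(Enum):
    STRING = "string"
    JSON = "json"
    DATAFRAME = "dataframe"


class Direction(Enum):
    INBOUND = "inbound"
    OUTBOUND = "outbound"


@dataclass(frozen=True)
class Connector:
    id: str
    direction: Direction
    data_types: frozenset[DataType]


def can_connect(source: Connector, target: Connector) -> bool:
    """Valid if the edge runs OUTBOUND -> INBOUND and the data types overlap."""
    return (
        source.direction is Direction.OUTBOUND
        and target.direction is Direction.INBOUND
        and bool(source.data_types & target.data_types)
    )


out_json = Connector("data_out", Direction.OUTBOUND, frozenset({DataType.JSON}))
in_json = Connector("data_in", Direction.INBOUND, frozenset({DataType.JSON, DataType.STRING}))
in_frame = Connector("frame_in", Direction.INBOUND, frozenset({DataType.DATAFRAME}))
print(can_connect(out_json, in_json))   # True
print(can_connect(out_json, in_frame))  # False
```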
For the complete guide to building custom nodes, see Creating Custom Nodes. For the full technical reference, see Flow Node Architecture.
Next Steps
- SDK API Reference: Complete decorator and class documentation
- SDK Cookbook: 57 complete example flows
- Creating Custom Nodes: Build reusable nodes for the Flow Designer
- Flow Node Architecture: Node framework design and connector model
- Retries & Timeouts: Fault tolerance configuration
- Scheduling: Cron and interval scheduling