SDK Examples

This page provides a quick overview of common Dagy SDK patterns. For the complete collection of 57 example flows covering all use cases, see the [SDK Cookbook](cookbook.md).

Quick Examples

Hello World

from dagy import flow, task

@task
def greet(name: str) -> str:
    return f"Hello, {name}!"

@flow(name="hello_world", version="1.0.0")
def hello_world(name: str = "World"):
    return greet(name)

# Test locally
result = hello_world.run_local(name="Dagy")
print(result.run_id, result.status)

Task Chain with Data Passing

from dagy import flow, task

@task
def extract(source: str) -> list:
    return [{"id": 1}, {"id": 2}]

@task
def transform(records: list) -> list:
    return [r | {"processed": True} for r in records]

@task
def load(records: list, destination: str) -> int:
    return len(records)

@flow(name="etl", version="1.0.0")
def etl_pipeline(source: str, destination: str):
    raw = extract(source)
    cleaned = transform(raw)
    return load(cleaned, destination)

# Run locally
result = etl_pipeline.run_local(source="input.json", destination="output/")
print(result.run_id, result.status)

Retries with Exponential Backoff

from dagy import task

@task(retries=3, retry_delay_seconds=[2, 10, 60])
def call_api(url: str) -> dict:
    import httpx
    response = httpx.get(url, timeout=30)
    response.raise_for_status()
    return response.json()
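The delay list reads naturally as one wait per retry. As a rough illustration of that behaviour, the plain-Python sketch below retries a callable against a per-attempt delay schedule. It does not use or reproduce Dagy's internals: `run_with_retries` and its `sleep` parameter are hypothetical names, and the idea that the Nth delay precedes the Nth retry is an assumption.

```python
import time
from typing import Callable, Sequence, TypeVar

T = TypeVar("T")


def run_with_retries(
    fn: Callable[[], T],
    delays: Sequence[float],
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn once, then retry once per entry in delays.

    Hypothetical helper: assumes delays[i] is the wait before retry i + 1.
    """
    attempts = len(delays) + 1  # one initial call plus one retry per delay
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            if attempt == len(delays):
                raise  # retries exhausted: surface the last error
            sleep(delays[attempt])
    raise AssertionError("unreachable")


# Usage: a callable that fails twice, then succeeds on the third call.
calls = {"count": 0}


def flaky() -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient failure")
    return "ok"


waited: list = []  # record requested delays instead of actually sleeping
outcome = run_with_retries(flaky, delays=[2, 10, 60], sleep=waited.append)
print(outcome, waited)
```

Passing `waited.append` as `sleep` keeps the example instant while still showing which delays would have been applied; the third delay is never used because the third call succeeds.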

Timeout Protection

from dagy import task

@task(timeout_seconds=300, retries=1)
def long_computation(data: list) -> dict:
    # Hard-killed after 5 minutes
    ...

Backend Selection

from dagy import task

@task(executor="ecs")  # Force ECS for resource-intensive work
def train_model(dataset_path: str) -> dict:
    ...

@task(executor="lambda")  # Force Lambda for quick tasks
def notify(result: dict) -> None:
    ...

Deploy and Run

# One-step: build, upload, register, and trigger a run
result = etl_pipeline.deploy(
    name="daily-etl",
    flow_kwargs={"source": "s3://bucket/data.json", "destination": "s3://bucket/out/"},
    schedule="0 6 * * *",  # daily at 06:00 UTC
)
print(result.run_id, result.status)

Or split into build + deploy using the CLI:

# Build the artifact
dagy build etl_pipeline.py:etl_pipeline --output-dir ./dist

# Upload and register
dagy deploy ./dist/etl/<build_id>/artifact.zip \
  --deployment daily-etl \
  --flow-name etl \
  --flow-version 1.0.0 \
  --schedule "0 6 * * *"
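The `schedule` value in both forms is a five-field cron expression. As a reading aid, the sketch below splits such an expression into its named fields. `describe_cron` is a hypothetical helper for illustration only; it is not part of the Dagy SDK or CLI, and it does no validation beyond counting fields.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_cron(expression: str) -> dict:
    """Map each field of a five-field cron expression to its name."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}: {expression!r}")
    return dict(zip(CRON_FIELDS, parts))


print(describe_cron("0 6 * * *"))
```

Read this way, `0 6 * * *` is minute 0 of hour 6 on every day of every month, which matches the "daily at 06:00" comment in the deploy call.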

Deploy with Execution Mode

Use execution_mode to control how the Flow Executor backend runs your flow:

# Deploy with micro execution (each task runs in its own Lambda)
result = etl_pipeline.deploy(
    name="daily-etl",
    execution_mode="micro",
    flow_kwargs={"source": "s3://bucket/data.json"},
)

# Deploy with nano execution (entire flow in one Lambda)
result = etl_pipeline.deploy(
    name="fast-etl",
    execution_mode="nano",
)

Or via the CLI:

dagy deploy ./dist/artifact.zip \
  --deployment daily-etl \
  --flow-name etl \
  --flow-version 1.0.0 \
  --execution-mode micro

Update Deployment Settings

After a flow is deployed, update its runtime settings without redeploying the artifact:

# Change execution mode and schedule via the API
curl -X PUT https://api.dagy.io/v1/deployments/daily-etl/settings \
  -H "Authorization: Bearer {token}" \
  -H "Content-Type: application/json" \
  -d '{
    "execution_mode": "micro",
    "schedule": "0 9 * * 1-5",
    "dep_package_slugs": ["pandas-layer", "numpy-utils"]
  }'

You can also update settings from the web UI via the Flow Settings dialog on the Flows page. Only the fields you include in the request are changed; omitted fields remain unchanged. See the API reference for all available fields.
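The same request can be prepared from Python. The following is a minimal sketch using only the standard library, assuming the endpoint and headers shown in the curl example; `build_settings_request` is a hypothetical helper, the token is a placeholder, and the request is built but not sent.

```python
import json
import urllib.request

API_BASE = "https://api.dagy.io/v1"  # taken from the curl example


def build_settings_request(
    deployment: str, token: str, changes: dict
) -> urllib.request.Request:
    """Prepare a PUT request carrying only the settings to change."""
    return urllib.request.Request(
        url=f"{API_BASE}/deployments/{deployment}/settings",
        data=json.dumps(changes).encode("utf-8"),
        method="PUT",
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
    )


# Only the schedule is included, so the other settings are left untouched.
request = build_settings_request(
    "daily-etl", "<token>", {"schedule": "0 9 * * 1-5"}
)
print(request.get_method(), request.full_url)
# Sending is left to the caller, e.g. urllib.request.urlopen(request).
```

Keeping the body limited to the fields that should change relies on the partial-update behaviour described above.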

Task Logging

Any print() call inside a task is captured as a task_log event, visible in the web UI's Event Log (Tree, Gantt, and Logs views). Click a task bar in the Gantt chart to open a floating panel showing that task's logs, timing, and errors.

Use task logs to record meaningful progress: input sizes, computed stats, validation outcomes, and quality gate decisions rather than generic "started" / "done" messages.

@task(timeout_seconds=30)
def extract_user_features(raw: dict) -> dict:
    print(f"Processing {len(raw['events'])} events for user-level features")
    # ... feature extraction logic ...
    print(f"Extracted features for {len(features)} users ({buyers} buyers, avg purchase_rate={avg_rate:.4f})")
    return {"type": "user_features", "count": len(features), "features": features}

@task(timeout_seconds=30)
def validate_features(matrix: dict) -> dict:
    dims = matrix["dimensions"]
    print(f"Validating feature matrix: {dims['users']} users, {dims['sessions']} sessions, {dims['products']} products")
    # ... validation logic ...
    print(f"Validation {'PASSED' if is_valid else 'FAILED'}: {critical} critical, {warnings} warnings")
    return {"valid": is_valid, "issues": issues}  # ... remaining fields elided

@task(timeout_seconds=30)
def detect_feature_drift(matrix: dict) -> dict:
    print("Comparing current distributions against historical baselines (z-threshold=2.0)")
    # ... drift detection logic ...
    for feat, r in drift_results.items():
        status = "DRIFTED" if r["drifted"] else "stable"
        print(f"  {feat}: current_mean={r['current_mean']}, baseline={r['baseline_mean']}, z={r['z_score']} [{status}]")
    print(f"Drift check complete: {drifted_count}/{len(drift_results)} features drifted")
    return {"drift_detected": any_drift}  # ... remaining fields elided

For a complete 20-task example with logging in every task, see examples/09_complex/02_ml_feature_pipeline.py.
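For intuition about how `print()` output can become per-task log records, here is a minimal sketch built on the standard library's `contextlib.redirect_stdout`. It is an illustration under assumptions, not Dagy's capture mechanism: `run_and_capture` and the record shape (`task`, `message`) are hypothetical.

```python
import contextlib
import io
from typing import Any, Callable


def run_and_capture(task_name: str, fn: Callable[..., Any], *args: Any) -> tuple:
    """Run fn, returning its result plus one log record per printed line."""
    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        result = fn(*args)
    records = [
        {"task": task_name, "message": line}
        for line in buffer.getvalue().splitlines()
    ]
    return result, records


def count_events(raw: dict) -> int:
    print(f"Processing {len(raw['events'])} events")
    return len(raw["events"])


result, logs = run_and_capture("count_events", count_events, {"events": [1, 2, 3]})
for record in logs:
    print(record["task"], "|", record["message"])
```

Because each printed line becomes its own record in this model, one informative line per milestone (sizes, counts, gate decisions) is easier to scan than many generic ones.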

Complex Pipeline Examples

The examples/09_complex/ directory contains production-grade pipelines that combine multiple Dagy patterns:

| Example | Tasks | Patterns |
|---|---|---|
| 01_multi_source_etl.py | 15 | 3 parallel sources → merge → validate + enrich → quality gate → multi-format output |
| 02_ml_feature_pipeline.py | 20 | Ingest → 5 parallel feature branches → assemble → validate + drift detect → training set + 3 artifact outputs |

ml_feature_pipeline demonstrates:

  • Wide fan-out / fan-in: raw events split into 5 independent feature extraction branches (user, session, product, temporal, interaction), then merge into a unified feature matrix
  • Nested diamond: the feature matrix feeds parallel validation and drift detection, which converge in a quality reconciliation gate
  • Structured task logging: every task logs input sizes, computed statistics, drift scores, and gate decisions via print(), visible per-task in the Gantt chart's floating log panel
  • Metrics hooks: reusable _metrics_hook() factory collects phase timings across all tasks
  • Retry with jitter: load_feature_registry retries transient errors with retry_condition_fn
  • Timeout protection: 30–60s timeout_seconds on every task
  • Multi-artifact output: feature_matrix.json, drift_report.txt, data_quality_card.json

ingest_raw_events ─┬─► extract_user_features ──────────┐
                   ├─► extract_session_features ────────┤
                   ├─► extract_product_features ────────┤
                   ├─► extract_temporal_features ───────┼─► assemble_feature_matrix ─┬─► validate_features ──────┐
                   └─► extract_interaction_features ────┘                            │                          │
                                                                                      └─► detect_feature_drift ──┼─► reconcile_quality
                                                                                                                  │
load_feature_registry ────────────────────────────────────────────────────────────────────────────────────────────┘
                                                                                                                  │
                                                                                      ┌──────────────────────────┘
                                                                                      ▼
                                                                             build_training_set ─┬─► export_feature_matrix
                                                                                                  ├─► generate_drift_report
                                                                                                  └─► generate_data_quality_card
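The fan-out, fan-in, and nested diamond in the first half of the diagram can be expressed as a dependency map. The sketch below uses Python's standard `graphlib` (3.9+) to group those tasks into stages that could run in parallel. It reflects one reading of the diagram up to `reconcile_quality` and is independent of how Dagy schedules tasks; `parallel_stages` is a hypothetical helper.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks it depends on (one reading of the diagram).
EXTRACTORS = [
    "extract_user_features",
    "extract_session_features",
    "extract_product_features",
    "extract_temporal_features",
    "extract_interaction_features",
]
GRAPH = {name: {"ingest_raw_events"} for name in EXTRACTORS}  # fan-out
GRAPH["assemble_feature_matrix"] = set(EXTRACTORS)  # fan-in
GRAPH["validate_features"] = {"assemble_feature_matrix"}
GRAPH["detect_feature_drift"] = {"assemble_feature_matrix"}
GRAPH["reconcile_quality"] = {"validate_features", "detect_feature_drift"}


def parallel_stages(graph: dict) -> list:
    """Group tasks into stages whose members have no unmet dependencies."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    stages = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only for stable output
        stages.append(ready)
        sorter.done(*ready)
    return stages


for index, stage in enumerate(parallel_stages(GRAPH)):
    print(index, stage)
```

The five extractors land in a single stage, and `validate_features` and `detect_feature_drift` share the stage after `assemble_feature_matrix`, which is the diamond that `reconcile_quality` closes.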

More Examples

The SDK Cookbook contains 57 complete examples organized into 10 categories: getting started, data engineering, API integrations, ML/AI, file processing, retry patterns, scheduling, notifications, advanced patterns, and real-world scenarios.