Creating Custom Flow Nodes

This guide walks you through building custom nodes for the Dagy Flow Designer, from a minimal example to production-grade patterns. Custom nodes extend the platform's capabilities by adding domain-specific processing steps that integrate seamlessly with the visual builder and execution engine.

Prerequisites

Before creating custom nodes, you should be familiar with:

  • Python 3.10+ (async/await, dataclasses, type hints)
  • The Dagy SDK basics (@flow, @task decorators) — see SDK Overview
  • The connector model concepts — see Flow Node Architecture

How Custom Nodes Work

Custom nodes are Python classes that extend the FlowNode abstract base class. When Python imports the module that defines your class, the framework auto-registers it via the __init_subclass__ hook, so no manual registration step is needed. Once registered, your node appears in the Node Registry API and in the Flow Designer sidebar under "Custom Nodes", and it is available for execution in any flow.

The registration flow:

You write MyNode(FlowNode) → Python imports the module
  → __init_subclass__ fires → FlowNode._registered_subclasses["my_node"] = MyNode
  → node_registry.sync_from_subclasses() picks it up on startup
  → API serves it alongside built-in nodes
  → Flow Designer fetches it and shows it in the sidebar
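
The first two arrows of this flow rely on a standard Python feature. The sketch below shows the mechanism in isolation; FlowNodeSketch and its node_type class attribute are stand-ins invented for illustration, and the real FlowNode presumably derives the key from metadata() instead.

```python
from __future__ import annotations


class FlowNodeSketch:
    """Stand-in for FlowNode (hypothetical, not the real Dagy base class)."""

    # Maps node_type -> subclass, mirroring the _registered_subclasses
    # dictionary named in the registration flow above.
    _registered_subclasses: dict[str, type[FlowNodeSketch]] = {}
    node_type: str = ""  # assumption: the registry key lives on the class

    def __init_subclass__(cls, **kwargs):
        # Python calls this hook once for every subclass, at class creation.
        super().__init_subclass__(**kwargs)
        if cls.node_type:
            FlowNodeSketch._registered_subclasses[cls.node_type] = cls


class MyNode(FlowNodeSketch):
    node_type = "my_node"


# Defining (importing) the class was enough to register it.
registered = FlowNodeSketch._registered_subclasses["my_node"]
```

Because the hook runs at class creation time, a node whose module is never imported is never registered, which is why Step 7 matters.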

Step 1: Create Your Node File

Create a new Python file in your project. For this guide, we'll build a "Sentiment Analyzer" node that classifies text as positive, negative, or neutral.

# File: src/my_org/nodes/sentiment.py

from __future__ import annotations

from dagy.nodes.base import FlowNode
from dagy.nodes.connectors import ConnectorDef, ConnectorDirection, DataType, ConnectionCardinality
from dagy.nodes.metadata import (
    NodeMetadata, NodeCategory, NodeConfigField,
    ConfigFieldType, SelectOption,
)
from dagy.nodes.execution import ExecutionContext, ExecutionResult

These are the core imports you'll use in every custom node. Let's break them down:

  • FlowNode — The abstract base class your node extends
  • ConnectorDef — Defines an input or output port on your node
  • ConnectorDirection — INBOUND or OUTBOUND
  • DataType — The type of data your connector accepts/produces (e.g., STRING, JSON, DATAFRAME)
  • ConnectionCardinality — How many connections a connector allows
  • NodeMetadata — Your node's identity (name, category, icon, color)
  • NodeConfigField — A configuration field that appears in the node config panel
  • ExecutionContext — Runtime information passed to your node during execution
  • ExecutionResult — What your node returns after executing

Step 2: Define the Class and Metadata

Every node must implement three abstract methods: metadata(), connectors(), and execute().

class SentimentAnalyzerNode(FlowNode):
    """Analyze text sentiment using a configurable model."""

    @classmethod
    def metadata(cls) -> NodeMetadata:
        return NodeMetadata(
            node_type="sentiment_analyzer",       # Unique identifier (snake_case)
            label="Sentiment Analyzer",            # Display name in the UI
            description="Classify text as positive, negative, or neutral",
            category=NodeCategory.LLM,             # Which sidebar category
            icon="ThumbsUp",                       # Lucide icon name
            color="purple",                        # Node color theme
            version="1.0.0",                       # Semantic version
            author="My Org",                       # Your org/name
            official=False,                        # False for custom nodes
            tags=["sentiment", "nlp", "text"],     # Searchable tags
            documentation_url="https://docs.myorg.com/nodes/sentiment",
        )

Metadata fields explained:

Field                Required  Description
node_type            Yes       Unique snake_case identifier. Must be globally unique across all nodes.
label                Yes       Human-readable name shown in the Flow Designer sidebar and node header.
description          Yes       One-line description shown in tooltips and search results.
category             Yes       One of: INGESTION, TRANSFORM, LLM, VECTORDB, NOTIFICATION, CONTROL_FLOW, CUSTOM
icon                 No        Name of a Lucide icon (e.g., "Brain", "Database", "Sparkles").
color                No        Color theme: "blue", "violet", "purple", "emerald", "amber", "slate", or any Tailwind color.
version              No        Semantic version string. Increment when the node interface changes.
author               No        Your name or organization.
official             No        Set to False for custom nodes. True is reserved for Dagy built-ins.
tags                 No        List of searchable keywords. Helps users find your node.
documentation_url    No        Link to detailed documentation for this node.
deprecated           No        Set to True to mark as deprecated. Shows a warning in the UI.
deprecation_message  No        Message shown when a deprecated node is used.
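
Since node_type must be snake_case, a small check in your own test suite can catch naming mistakes early. This is a minimal sketch; the exact pattern Dagy enforces is not stated in this guide, so the regular expression below is an assumption (lowercase letters and digits, words separated by single underscores).

```python
import re

# Assumed definition of snake_case; Dagy's own validation may differ.
SNAKE_CASE = re.compile(r"[a-z][a-z0-9]*(?:_[a-z0-9]+)*")


def is_valid_node_type(node_type: str) -> bool:
    """Return True if node_type looks like a snake_case identifier."""
    return SNAKE_CASE.fullmatch(node_type) is not None
```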

Step 3: Define Connectors

Connectors are the typed input/output ports on your node. They determine what data your node accepts and produces, and the Flow Designer renders them as connection handles.

    @classmethod
    def connectors(cls) -> list[ConnectorDef]:
        return [
            # --- Inbound connectors (inputs) ---
            ConnectorDef(
                id="text_in",                        # Unique ID within this node
                name="Text Input",                   # Label shown on hover
                description="Text to analyze for sentiment",
                direction=ConnectorDirection.INBOUND,
                data_types=frozenset({DataType.STRING, DataType.JSON}),
                cardinality=ConnectionCardinality.ZERO_OR_MANY,
                required=True,                       # DAG validation warns if unconnected
                position_hint="top",                 # UI position
            ),
            # --- Outbound connectors (outputs) ---
            ConnectorDef(
                id="positive_out",
                name="Positive",
                description="Text classified as positive sentiment",
                direction=ConnectorDirection.OUTBOUND,
                data_types=frozenset({DataType.JSON}),
                cardinality=ConnectionCardinality.ZERO_OR_MANY,
                position_hint="bottom-left",
            ),
            ConnectorDef(
                id="negative_out",
                name="Negative",
                description="Text classified as negative sentiment",
                direction=ConnectorDirection.OUTBOUND,
                data_types=frozenset({DataType.JSON}),
                cardinality=ConnectionCardinality.ZERO_OR_MANY,
                position_hint="bottom-right",
            ),
            ConnectorDef(
                id="neutral_out",
                name="Neutral",
                description="Text classified as neutral sentiment",
                direction=ConnectorDirection.OUTBOUND,
                data_types=frozenset({DataType.JSON}),
                cardinality=ConnectionCardinality.ZERO_OR_MANY,
                position_hint="bottom",
            ),
        ]

Connector design guidelines:

  • ID naming: Use <purpose>_in / <purpose>_out convention (e.g., text_in, result_out)
  • Data types: Accept the broadest reasonable set. If your node can handle both STRING and JSON, include both.
  • Cardinality: Use ZERO_OR_MANY for most cases. Use ONE only when exactly one connection is required.
  • Required: Set to True for essential inputs. The Flow Designer shows warnings for unconnected required connectors.
  • Position hints: Control where handles appear on the node in the UI:
    • Inbound: "top", "top-left", "top-right"
    • Outbound: "bottom", "bottom-left", "bottom-right"

Data type compatibility: When a user draws a connection in the Flow Designer, the system checks type compatibility. The ANY type is a universal wildcard. Some types have implicit coercions (e.g., STRING can connect to JSON targets). See the full compatibility matrix.
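
The check described above can be pictured as follows. This is only a sketch: DataTypeSketch is a stand-in enum, and the coercion table holds just the one coercion this guide mentions (STRING to JSON); the real matrix is defined by the platform.

```python
from enum import Enum


class DataTypeSketch(Enum):
    """Stand-in for DataType with a few of the members named in this guide."""
    ANY = "any"
    STRING = "string"
    JSON = "json"
    DATAFRAME = "dataframe"


# Hypothetical coercion table: source type -> target types it may also feed.
IMPLICIT_COERCIONS = {DataTypeSketch.STRING: {DataTypeSketch.JSON}}


def is_compatible(source_types, target_types) -> bool:
    """Return True if an outbound connector may feed an inbound connector."""
    targets = set(target_types)
    sources = set(source_types)
    # ANY on either side acts as a universal wildcard.
    if DataTypeSketch.ANY in sources or DataTypeSketch.ANY in targets:
        return True
    for src in sources:
        if src in targets:
            return True  # exact type match
        if IMPLICIT_COERCIONS.get(src, set()) & targets:
            return True  # allowed implicit coercion
    return False
```

Note that coercion is directional in this sketch: a STRING output may feed a JSON input, but not the other way around.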

Step 4: Define Configuration Schema (Optional)

If your node needs user-configurable settings, define them via config_schema(). These fields appear in the Node Configuration Panel when the user selects your node.

    @classmethod
    def config_schema(cls) -> list[NodeConfigField]:
        return [
            NodeConfigField(
                name="model",
                label="Sentiment Model",
                field_type=ConfigFieldType.SELECT,
                required=True,
                default="vader",
                description="Which sentiment analysis model to use",
                options=[
                    SelectOption("VADER (fast, rule-based)", "vader"),
                    SelectOption("TextBlob (moderate)", "textblob"),
                    SelectOption("Custom API", "custom_api"),
                ],
            ),
            NodeConfigField(
                name="threshold",
                label="Confidence Threshold",
                field_type=ConfigFieldType.NUMBER,
                required=False,
                default=0.6,
                description="Minimum confidence score to classify (0.0–1.0)",
                min_value=0.0,
                max_value=1.0,
            ),
            NodeConfigField(
                name="api_endpoint",
                label="Custom API Endpoint",
                field_type=ConfigFieldType.STRING,
                required=False,
                placeholder="https://api.myorg.com/sentiment",
                description="Required when model is 'Custom API'",
                depends_on={"model": "custom_api"},  # Only shown when model=custom_api
            ),
            NodeConfigField(
                name="api_key_ref",
                label="API Key (Secret Reference)",
                field_type=ConfigFieldType.SECRET_REF,
                required=False,
                placeholder="secret:sentiment_api_key",
                description="Reference to a secret containing the API key",
                depends_on={"model": "custom_api"},
            ),
        ]

Available field types:

Type            Renders As                   Use For
STRING          Text input                   Short text values (URLs, names, keys)
NUMBER          Numeric input                Integers or floats with optional min/max
BOOLEAN         Toggle switch                On/off flags
SELECT          Dropdown                     Choosing from a fixed list of options
TEXTAREA        Multi-line text              Code, SQL queries, templates, prompts
JSON            JSON editor                  Complex structured configuration
SECRET_REF      Text input with lock icon    References to secrets (e.g., secret:my_key)
FILE_PATH       Text input with folder icon  File paths
CODE            Code editor                  Python/SQL code with syntax highlighting
KEY_VALUE       Key-value pair editor        Dynamic dictionaries
CONNECTION_REF  Connection picker            References to other connections

The depends_on field: Shows a config field conditionally based on another field's value. In the example above, api_endpoint only appears when the user selects "Custom API" as the model.
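
To make the depends_on rule concrete, here is a minimal sketch of how a config panel could decide which fields to show. Fields are modeled as plain dictionaries for illustration, and the assumption that multiple depends_on keys must all match is mine, not something this guide specifies.

```python
def is_field_visible(depends_on, config) -> bool:
    """A field with no depends_on is always shown; otherwise every
    listed key must equal the expected value (assumed AND semantics)."""
    if not depends_on:
        return True
    return all(config.get(name) == expected for name, expected in depends_on.items())


def visible_fields(fields, config):
    """Return the names of the fields that should be rendered."""
    return [f["name"] for f in fields if is_field_visible(f.get("depends_on"), config)]


# Simplified version of the schema from Step 4.
FIELDS = [
    {"name": "model"},
    {"name": "threshold"},
    {"name": "api_endpoint", "depends_on": {"model": "custom_api"}},
    {"name": "api_key_ref", "depends_on": {"model": "custom_api"}},
]

shown_for_vader = visible_fields(FIELDS, {"model": "vader"})
shown_for_custom = visible_fields(FIELDS, {"model": "custom_api"})
```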

Step 5: Implement Execution Logic

The execute() method contains your node's core logic. It receives an ExecutionContext and must return an ExecutionResult.

    async def execute(self, context: ExecutionContext) -> ExecutionResult:
        # 1. Get upstream data from connected nodes
        input_data = context.get_upstream_data("text_in")
        if not input_data:
            return ExecutionResult.from_error("No input data received on text_in connector")

        # 2. Read configuration
        model = self.config.get("model", "vader")
        threshold = float(self.config.get("threshold", 0.6))

        context.log_info(f"Analyzing sentiment with model={model}, threshold={threshold}")

        # 3. Process each text item
        positive_items = []
        negative_items = []
        neutral_items = []

        for item in (input_data if isinstance(input_data, list) else [input_data]):
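            # Strings pass through; dicts use their "text" key; anything else is stringified
            text = item if isinstance(item, str) else (item.get("text", str(item)) if isinstance(item, dict) else str(item))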

            # Analyze sentiment (example using VADER)
            score = await self._analyze(text, model, context)

            result = {"text": text, "score": score, "model": model}

            if score >= threshold:
                positive_items.append(result)
            elif score <= -threshold:
                negative_items.append(result)
            else:
                neutral_items.append(result)

        context.log_info(
            f"Results: {len(positive_items)} positive, "
            f"{len(negative_items)} negative, {len(neutral_items)} neutral"
        )

        # 4. Return outputs keyed by connector ID
        return ExecutionResult.from_success({
            "positive_out": positive_items,
            "negative_out": negative_items,
            "neutral_out": neutral_items,
        })

    async def _analyze(self, text: str, model: str, context: ExecutionContext) -> float:
        """Analyze a single text and return a sentiment score (-1.0 to 1.0)."""
        if model == "vader":
            from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
            analyzer = SentimentIntensityAnalyzer()
            scores = analyzer.polarity_scores(text)
            return scores["compound"]

        elif model == "textblob":
            from textblob import TextBlob
            blob = TextBlob(text)
            return blob.sentiment.polarity

        elif model == "custom_api":
            import httpx
            endpoint = self.config["api_endpoint"]
            api_key = context.get_secret(self.config.get("api_key_ref", ""))
            async with httpx.AsyncClient() as client:
                resp = await client.post(
                    endpoint,
                    json={"text": text},
                    headers={"Authorization": f"Bearer {api_key}"},
                )
                resp.raise_for_status()
                return resp.json()["score"]

        return 0.0

Key execution patterns:

  • context.get_upstream_data("connector_id") — Retrieves data from the named inbound connector. Returns None if nothing was sent.
  • self.config — Dictionary of the user's configuration values from the node config panel.
  • context.get_secret("secret:name") — Retrieves a decrypted secret by reference.
  • context.log_info/warning/error/debug(msg) — Logs messages that appear in the execution logs.
  • ExecutionResult.from_success(outputs) — Returns a successful result. The outputs dict must be keyed by outbound connector IDs.
  • ExecutionResult.from_error(msg) — Returns a failed result.
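
The routing rule inside execute() is easier to reason about, and to unit test, when pulled out as a pure function. The helper name route_by_score is hypothetical; the comparisons are the same ones used in the example above.

```python
def route_by_score(score: float, threshold: float) -> str:
    """Map a sentiment score in [-1.0, 1.0] to an outbound connector ID."""
    if score >= threshold:
        return "positive_out"
    if score <= -threshold:
        return "negative_out"
    return "neutral_out"
```

Scores exactly at the threshold count as positive (or negative at the mirrored boundary). With a threshold of 0.0 the neutral branch can never be reached, so a small positive threshold is the sensible minimum.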

Step 6: Add Lifecycle Hooks (Optional)

For advanced use cases, override lifecycle hooks to add setup, cleanup, error recovery, or post-processing logic.

    async def validate_config(self) -> list[str]:
        """Validate configuration before execution. Return a list of error messages."""
        errors = []
        model = self.config.get("model")
        if model == "custom_api":
            if not self.config.get("api_endpoint"):
                errors.append("API endpoint is required when using Custom API model")
            if not self.config.get("api_key_ref"):
                errors.append("API key reference is required when using Custom API model")
        return errors

    async def pre_execute(self, context: ExecutionContext) -> None:
        """Called before execute(). Use for resource allocation, warm-up, etc."""
        context.log_debug("Warming up sentiment model...")

    async def post_execute(self, context: ExecutionContext, result: ExecutionResult) -> ExecutionResult:
        """Called after execute(). Use for metrics, cleanup, or result modification."""
        if result.success:
            total = sum(len(v) for v in result.outputs.values() if isinstance(v, list))
            context.log_info(f"Processed {total} items total")
        return result

    async def on_error(self, context: ExecutionContext, error: Exception) -> ExecutionResult | None:
        """Called when execute() raises an exception. Return a result to recover, or None to propagate."""
        context.log_error(f"Sentiment analysis failed: {error}")
        if context.attempt < context.max_retries:
            context.log_info(f"Will retry (attempt {context.attempt + 1}/{context.max_retries})")
            return None  # Let the retry mechanism handle it
        # On the final attempt, return a structured error result instead of re-raising
        return ExecutionResult.from_error(str(error), error_type=type(error).__name__)

    def on_cancel(self, context: ExecutionContext) -> None:
        """Called synchronously when the node is cancelled. Use for cleanup."""
        pass

Lifecycle execution order:

validate_config()  →  pre_execute()  →  execute()  →  post_execute()
                                          ↓ (on exception)
                                       on_error()
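
The ordering can be sketched as a small driver. This is not the Dagy execution engine: RecordingNode is a hypothetical stand-in, results are plain dictionaries, and two behaviors are assumptions made for the sketch (validation errors abort the run, and a result recovered by on_error() skips post_execute()).

```python
import asyncio


class RecordingNode:
    """Hypothetical stand-in node that records which hooks ran."""

    def __init__(self, fail: bool = False):
        self.calls = []
        self.fail = fail

    async def validate_config(self):
        self.calls.append("validate_config")
        return []

    async def pre_execute(self, context):
        self.calls.append("pre_execute")

    async def execute(self, context):
        self.calls.append("execute")
        if self.fail:
            raise RuntimeError("boom")
        return {"success": True}

    async def post_execute(self, context, result):
        self.calls.append("post_execute")
        return result

    async def on_error(self, context, error):
        self.calls.append("on_error")
        return {"success": False, "error": str(error)}


async def run_lifecycle(node, context=None):
    errors = await node.validate_config()
    if errors:  # assumption: invalid config aborts before pre_execute()
        return {"success": False, "error": "; ".join(errors)}
    await node.pre_execute(context)
    try:
        result = await node.execute(context)
    except Exception as exc:
        recovered = await node.on_error(context, exc)
        if recovered is None:
            raise  # None means "propagate", as described for on_error()
        return recovered
    return await node.post_execute(context, result)
```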

Step 7: Make Your Node Discoverable

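For the node registry to find a Python node class at startup, the module that defines it must be imported. Options A and B arrange that import; Option C registers a node definition through the REST API instead.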

Option A: Add to a Package's __init__.py

If you're building a package of custom nodes:

# File: src/my_org/nodes/__init__.py
from . import sentiment  # noqa: F401 — import triggers __init_subclass__
from . import dedup      # noqa: F401
from . import enricher   # noqa: F401

Option B: Use Package Discovery

If your nodes are in a Python package, the registry can discover them automatically:

# In your app startup code
from dagy.nodes.registry import node_registry

# Discovers all FlowNode subclasses in the package
count = node_registry.discover_package("my_org.nodes")
print(f"Discovered {count} custom nodes")
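
How discover_package() is implemented is not shown in this guide. One plausible approach, sketched here with only the standard library, is to import every submodule of the package so that each FlowNode subclass gets defined and its __init_subclass__ hook fires. The function name import_submodules is hypothetical.

```python
import importlib
import pkgutil


def import_submodules(package_name: str) -> list[str]:
    """Import every module under a package and return the imported names."""
    package = importlib.import_module(package_name)
    imported = []
    # walk_packages yields (module_finder, name, ispkg) for each submodule,
    # descending into subpackages as it goes.
    for info in pkgutil.walk_packages(package.__path__, prefix=package.__name__ + "."):
        importlib.import_module(info.name)
        imported.append(info.name)
    return imported
```

Importing a module runs its top-level code, so node modules should avoid side effects beyond defining classes.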

Option C: Register via the REST API

For nodes that aren't Python classes (or for dynamically registered nodes):

curl -X POST https://api.dagy.io/nodes/registry \
  -H "Content-Type: application/json" \
  -H "X-Org-Id: my_org" \
  -d '{
    "node_type": "sentiment_analyzer",
    "label": "Sentiment Analyzer",
    "description": "Classify text sentiment",
    "category": "llm",
    "icon": "ThumbsUp",
    "color": "purple",
    "connectors": [
      {"id": "text_in", "name": "Text Input", "direction": "inbound", "data_types": ["string", "json"], "required": true},
      {"id": "positive_out", "name": "Positive", "direction": "outbound", "data_types": ["json"]},
      {"id": "negative_out", "name": "Negative", "direction": "outbound", "data_types": ["json"]},
      {"id": "neutral_out", "name": "Neutral", "direction": "outbound", "data_types": ["json"]}
    ],
    "config_schema": [
      {"name": "model", "label": "Model", "type": "select", "required": true, "options": [
        {"label": "VADER", "value": "vader"}, {"label": "TextBlob", "value": "textblob"}
      ]}
    ]
  }'
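
The same request can be prepared from Python. This sketch uses only the standard library and builds the request without sending it; the URL, headers, and payload keys are copied from the curl example above, while the helper name build_register_request is hypothetical. The payload is shortened to a single connector for brevity.

```python
import json
import urllib.request


def build_register_request(base_url: str, org_id: str, definition: dict) -> urllib.request.Request:
    """Build (but do not send) the POST request that registers a node."""
    body = json.dumps(definition).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/nodes/registry",
        data=body,
        headers={"Content-Type": "application/json", "X-Org-Id": org_id},
        method="POST",
    )


definition = {
    "node_type": "sentiment_analyzer",
    "label": "Sentiment Analyzer",
    "description": "Classify text sentiment",
    "category": "llm",
    "connectors": [
        {"id": "text_in", "name": "Text Input", "direction": "inbound",
         "data_types": ["string", "json"], "required": True},
    ],
}

request = build_register_request("https://api.dagy.io", "my_org", definition)
# To send it: urllib.request.urlopen(request)
```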

Step 8: Add a Frontend Definition (Optional)

If you want your node to appear in the Flow Designer even when the backend is unavailable (offline-first), add it to node-definitions.ts:

// File: web/src/components/flow-builder/node-definitions.ts
// Add to the NODE_TYPES array:

{
  type: "sentiment_analyzer",
  node_type: "sentiment_analyzer",
  label: "Sentiment Analyzer",
  description: "Classify text as positive, negative, or neutral",
  category: "llm",
  icon: "ThumbsUp",
  color: "purple",
  official: false,
  inbound_connector_count: 1,
  outbound_connector_count: 3,
  connectors: [
    { id: "text_in", name: "Text Input", description: "Text to analyze", direction: "inbound", data_types: ["string", "json"], cardinality: "zero_or_many", required: true, position_hint: "top", metadata: {} },
    { id: "positive_out", name: "Positive", description: "Positive sentiment results", direction: "outbound", data_types: ["json"], cardinality: "zero_or_many", required: false, position_hint: "bottom-left", metadata: {} },
    { id: "negative_out", name: "Negative", description: "Negative sentiment results", direction: "outbound", data_types: ["json"], cardinality: "zero_or_many", required: false, position_hint: "bottom-right", metadata: {} },
    { id: "neutral_out", name: "Neutral", description: "Neutral sentiment results", direction: "outbound", data_types: ["json"], cardinality: "zero_or_many", required: false, position_hint: "bottom", metadata: {} },
  ],
  schema: {
    fields: [
      { name: "model", label: "Sentiment Model", type: "select", required: true, default: "vader", options: [
        { label: "VADER", value: "vader" },
        { label: "TextBlob", value: "textblob" },
        { label: "Custom API", value: "custom_api" },
      ]},
      { name: "threshold", label: "Confidence Threshold", type: "number", required: false, default: 0.6 },
      ...commonTaskFields,
    ],
  },
},

Note: This is only needed for nodes you want available offline. Nodes registered via the Python SDK or the REST API are fetched automatically by node-registry-client.ts when the user opens the Flow Designer.

Complete Example

Here's the full sentiment analyzer node in one file:

# File: src/my_org/nodes/sentiment.py
"""Sentiment Analyzer node for Dagy Flow Designer."""

from __future__ import annotations

from dagy.nodes.base import FlowNode
from dagy.nodes.connectors import ConnectorDef, ConnectorDirection, DataType, ConnectionCardinality
from dagy.nodes.metadata import (
    NodeMetadata, NodeCategory, NodeConfigField,
    ConfigFieldType, SelectOption,
)
from dagy.nodes.execution import ExecutionContext, ExecutionResult


class SentimentAnalyzerNode(FlowNode):
    """Analyze text sentiment and route to positive/negative/neutral outputs."""

    @classmethod
    def metadata(cls) -> NodeMetadata:
        return NodeMetadata(
            node_type="sentiment_analyzer",
            label="Sentiment Analyzer",
            description="Classify text as positive, negative, or neutral",
            category=NodeCategory.LLM,
            icon="ThumbsUp",
            color="purple",
            version="1.0.0",
            author="My Org",
            official=False,
            tags=["sentiment", "nlp", "text", "classification"],
        )

    @classmethod
    def connectors(cls) -> list[ConnectorDef]:
        return [
            ConnectorDef(
                id="text_in",
                name="Text Input",
                description="Text to analyze for sentiment",
                direction=ConnectorDirection.INBOUND,
                data_types=frozenset({DataType.STRING, DataType.JSON}),
                cardinality=ConnectionCardinality.ZERO_OR_MANY,
                required=True,
                position_hint="top",
            ),
            ConnectorDef(
                id="positive_out",
                name="Positive",
                description="Text classified as positive sentiment",
                direction=ConnectorDirection.OUTBOUND,
                data_types=frozenset({DataType.JSON}),
                cardinality=ConnectionCardinality.ZERO_OR_MANY,
                position_hint="bottom-left",
            ),
            ConnectorDef(
                id="negative_out",
                name="Negative",
                description="Text classified as negative sentiment",
                direction=ConnectorDirection.OUTBOUND,
                data_types=frozenset({DataType.JSON}),
                cardinality=ConnectionCardinality.ZERO_OR_MANY,
                position_hint="bottom-right",
            ),
            ConnectorDef(
                id="neutral_out",
                name="Neutral",
                description="Text classified as neutral sentiment",
                direction=ConnectorDirection.OUTBOUND,
                data_types=frozenset({DataType.JSON}),
                cardinality=ConnectionCardinality.ZERO_OR_MANY,
                position_hint="bottom",
            ),
        ]

    @classmethod
    def config_schema(cls) -> list[NodeConfigField]:
        return [
            NodeConfigField(
                name="model",
                label="Sentiment Model",
                field_type=ConfigFieldType.SELECT,
                required=True,
                default="vader",
                options=[
                    SelectOption("VADER (fast, rule-based)", "vader"),
                    SelectOption("TextBlob (moderate)", "textblob"),
                    SelectOption("Custom API", "custom_api"),
                ],
            ),
            NodeConfigField(
                name="threshold",
                label="Confidence Threshold",
                field_type=ConfigFieldType.NUMBER,
                required=False,
                default=0.6,
                min_value=0.0,
                max_value=1.0,
            ),
            NodeConfigField(
                name="api_endpoint",
                label="Custom API Endpoint",
                field_type=ConfigFieldType.STRING,
                required=False,
                placeholder="https://api.myorg.com/sentiment",
                depends_on={"model": "custom_api"},
            ),
            NodeConfigField(
                name="api_key_ref",
                label="API Key",
                field_type=ConfigFieldType.SECRET_REF,
                required=False,
                placeholder="secret:sentiment_api_key",
                depends_on={"model": "custom_api"},
            ),
        ]

    async def validate_config(self) -> list[str]:
        errors = []
        if self.config.get("model") == "custom_api":
            if not self.config.get("api_endpoint"):
                errors.append("API endpoint is required for Custom API model")
            if not self.config.get("api_key_ref"):
                errors.append("API key reference is required for Custom API model")
        return errors

    async def execute(self, context: ExecutionContext) -> ExecutionResult:
        input_data = context.get_upstream_data("text_in")
        if not input_data:
            return ExecutionResult.from_error("No input data received on text_in")

        model = self.config.get("model", "vader")
        threshold = float(self.config.get("threshold", 0.6))

        context.log_info(f"Analyzing sentiment: model={model}, threshold={threshold}")

        positive, negative, neutral = [], [], []
        items = input_data if isinstance(input_data, list) else [input_data]

        for item in items:
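            # Strings pass through; dicts use their "text" key; anything else is stringified
            text = item if isinstance(item, str) else (item.get("text", str(item)) if isinstance(item, dict) else str(item))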
            score = await self._score(text, model, context)
            result = {"text": text, "score": score, "model": model}

            if score >= threshold:
                positive.append(result)
            elif score <= -threshold:
                negative.append(result)
            else:
                neutral.append(result)

        context.log_info(f"+{len(positive)} / -{len(negative)} / ~{len(neutral)}")
        return ExecutionResult.from_success({
            "positive_out": positive,
            "negative_out": negative,
            "neutral_out": neutral,
        })

    async def _score(self, text: str, model: str, ctx: ExecutionContext) -> float:
        if model == "vader":
            from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
            return SentimentIntensityAnalyzer().polarity_scores(text)["compound"]
        elif model == "textblob":
            from textblob import TextBlob
            return TextBlob(text).sentiment.polarity
        elif model == "custom_api":
            import httpx
            key = ctx.get_secret(self.config.get("api_key_ref", ""))
            async with httpx.AsyncClient() as c:
                r = await c.post(
                    self.config["api_endpoint"],
                    json={"text": text},
                    headers={"Authorization": f"Bearer {key}"},
                )
                r.raise_for_status()
                return r.json()["score"]
        return 0.0

    async def on_error(self, context: ExecutionContext, error: Exception) -> ExecutionResult | None:
        context.log_error(f"Sentiment analysis failed: {error}")
        return None  # Propagate error, let retry mechanism handle it

Common Patterns

Split/Route Pattern

Route data to different outputs based on conditions (like the Sentiment Analyzer above or the built-in Conditional Branch):

# Two outbound connectors with position hints for visual clarity
ConnectorDef(id="true_out", ..., position_hint="bottom-left"),
ConnectorDef(id="false_out", ..., position_hint="bottom-right"),

Fan-In / Merge Pattern

Accept data from multiple upstream nodes:

ConnectorDef(id="input_a", name="Input A", direction=ConnectorDirection.INBOUND, position_hint="top-left"),
ConnectorDef(id="input_b", name="Input B", direction=ConnectorDirection.INBOUND, position_hint="top-right"),
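
Inside execute(), a merge node then has to combine whatever arrived on each inbound connector. Since get_upstream_data() returns None when nothing was sent, a small normalizing helper keeps the merge logic simple. Both helper names below are hypothetical.

```python
def as_list(value) -> list:
    """Normalize upstream data: None -> [], scalar -> [scalar], list -> copy."""
    if value is None:
        return []
    return list(value) if isinstance(value, list) else [value]


def merge_inputs(input_a, input_b) -> list:
    """Concatenate the data from two inbound connectors, A first."""
    return as_list(input_a) + as_list(input_b)
```

In a node, the arguments would come from context.get_upstream_data("input_a") and context.get_upstream_data("input_b").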

Error Channel Pattern

Include a dedicated error output for downstream error handling:

ConnectorDef(id="output_out", ..., position_hint="bottom-left"),
ConnectorDef(id="error_out", name="Error", direction=ConnectorDirection.OUTBOUND, data_types=frozenset({DataType.ERROR}), position_hint="bottom-right"),
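
With those two connectors in place, the execution logic can send each failed item to the error output instead of failing the whole node. The sketch below shows one way to build the outputs dictionary; the helper name and the shape of the error records are assumptions made for illustration.

```python
def process_with_error_channel(items, transform) -> dict:
    """Apply transform to each item, routing failures to error_out."""
    outputs = {"output_out": [], "error_out": []}
    for item in items:
        try:
            outputs["output_out"].append(transform(item))
        except Exception as exc:
            # Keep the failing item and the reason for downstream handlers.
            outputs["error_out"].append({
                "item": item,
                "error_type": type(exc).__name__,
                "message": str(exc),
            })
    return outputs


demo = process_with_error_channel(["1", "x", "3"], int)
```

The returned dictionary is keyed by outbound connector ID, so it can be passed directly to ExecutionResult.from_success().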

Secret Handling

Always use secret references rather than raw credentials:

# In config_schema:
NodeConfigField(name="api_key", field_type=ConfigFieldType.SECRET_REF, ...)

# In execute:
api_key = context.get_secret(self.config["api_key"])
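
A validate_config() hook can also reject values that look like raw credentials. This sketch assumes references always use the "secret:" prefix shown in this guide's placeholders; whether the platform enforces that format is not stated here, and is_secret_ref is a hypothetical helper.

```python
SECRET_PREFIX = "secret:"  # prefix taken from the examples in this guide


def is_secret_ref(value) -> bool:
    """Return True for values shaped like 'secret:<name>'."""
    return (
        isinstance(value, str)
        and value.startswith(SECRET_PREFIX)
        and len(value) > len(SECRET_PREFIX)
    )
```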

Metrics and Logging

Track performance metrics in your node:

import time

async def execute(self, context: ExecutionContext) -> ExecutionResult:
    start = time.monotonic()  # monotonic clock: unaffected by system time changes

    # ... your logic, which produces `data` (the items to emit) ...

    elapsed = time.monotonic() - start
    return ExecutionResult(
        success=True,
        outputs={"result_out": data},
        metrics={"processing_time_ms": elapsed * 1000, "items_processed": len(data)},
        # ... other fields
    )

Testing Your Node

Unit Testing

import pytest
from my_org.nodes.sentiment import SentimentAnalyzerNode
from dagy.nodes.execution import ExecutionContext, ExecutionResult

@pytest.mark.asyncio
async def test_sentiment_positive():
    node = SentimentAnalyzerNode(
        config={"model": "vader", "threshold": 0.5},
        instance_id="test-1",
    )
    context = ExecutionContext(
        run_id="test-run",
        task_run_id="test-task",
        flow_name="test",
        flow_version="1.0.0",
        node_type="sentiment_analyzer",
        node_instance_id="test-1",
        upstream_results={"text_in": "I love this product! It's amazing!"},
    )
    result = await node.execute(context)
    assert result.success
    assert len(result.outputs["positive_out"]) == 1
    assert len(result.outputs["negative_out"]) == 0

@pytest.mark.asyncio
async def test_validation_requires_api_endpoint():
    node = SentimentAnalyzerNode(
        config={"model": "custom_api"},
        instance_id="test-2",
    )
    errors = await node.validate_config()
    assert "API endpoint is required" in errors[0]

Integration Testing with the Registry

from dagy.nodes.registry import node_registry

def test_node_registers():
    node_registry.sync_from_subclasses()
    definition = node_registry.get("sentiment_analyzer")
    assert definition is not None
    assert definition.metadata().label == "Sentiment Analyzer"
    assert len(definition.connectors()) == 4  # 1 in + 3 out

Versioning and Deprecation

When you need to update a node's interface (connectors or config schema):

  1. Increment the version in metadata().
  2. The API stores the previous version in the DAGY_NODE_VERSIONS table.
  3. If the change is breaking, set deprecated=True on the old version and provide a deprecation_message pointing to the new node.
  4. Existing flows continue to work with the old version until they're updated.

@classmethod
def metadata(cls) -> NodeMetadata:
    return NodeMetadata(
        node_type="sentiment_analyzer_v2",
        label="Sentiment Analyzer v2",
        version="2.0.0",
        # ... rest of metadata
    )
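
To decide whether an update counts as breaking, one option is to follow the usual semantic versioning convention, where a major version bump signals an incompatible change. The sketch below assumes plain MAJOR.MINOR.PATCH strings; how Dagy itself compares versions is not described in this guide, and both helper names are hypothetical.

```python
def parse_version(version: str) -> tuple[int, int, int]:
    """Parse 'MAJOR.MINOR.PATCH' into a tuple of integers."""
    major, minor, patch = (int(part) for part in version.split("."))
    return major, minor, patch


def is_breaking_change(old: str, new: str) -> bool:
    """Treat a major version increase as a breaking interface change."""
    return parse_version(new)[0] > parse_version(old)[0]
```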

Further Reading