Creating Custom Flow Nodes
This guide walks you through building custom nodes for the Dagy Flow Designer, from a minimal example to production-grade patterns. Custom nodes extend the platform's capabilities by adding domain-specific processing steps that integrate seamlessly with the visual builder and execution engine.
Prerequisites
Before creating custom nodes, you should be familiar with:
- Python 3.10+ (async/await, dataclasses, type hints)
- The Dagy SDK basics (@flow, @task decorators); see SDK Overview
- The connector model concepts; see Flow Node Architecture
How Custom Nodes Work
Custom nodes are Python classes that extend the FlowNode abstract base class. When Python loads your class, the framework auto-registers it via the __init_subclass__ hook, so no manual registration step is needed. Once registered, your node appears in the Node Registry API and in the Flow Designer sidebar under "Custom Nodes", and it is available for execution in any flow.
The registration flow:
You write MyNode(FlowNode) → Python imports the module
→ __init_subclass__ fires → FlowNode._registered_subclasses["my_node"] = MyNode
→ node_registry.sync_from_subclasses() picks it up on startup
→ API serves it alongside built-in nodes
→ Flow Designer fetches it and shows it in the sidebar
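The first two arrows in the flow above can be sketched in plain Python. This is a minimal stand-in rather than Dagy's actual base class: FlowNodeSketch is a hypothetical name, and deriving the registry key from the class name is an assumption (the real framework may read it from metadata() instead).

```python
import re


class FlowNodeSketch:
    """Hypothetical stand-in for FlowNode; not the real Dagy base class."""

    # Maps a node key to its class, mirroring the _registered_subclasses dict above.
    _registered_subclasses: dict = {}

    def __init_subclass__(cls, **kwargs) -> None:
        super().__init_subclass__(**kwargs)
        # Assumption: the key is the class name converted to snake_case.
        key = re.sub(r"(?<!^)(?=[A-Z])", "_", cls.__name__).lower()
        FlowNodeSketch._registered_subclasses[key] = cls


class MyNode(FlowNodeSketch):
    """Defining the subclass is enough; no explicit register() call is made."""


print(FlowNodeSketch._registered_subclasses)
```

Because the hook runs when the class statement executes, the module that defines the subclass has to be imported before the registry can see it.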
Step 1: Create Your Node File
Create a new Python file in your project. For this guide, we'll build a "Sentiment Analyzer" node that classifies text as positive, negative, or neutral.
# File: src/my_org/nodes/sentiment.py
from __future__ import annotations

from dagy.nodes.base import FlowNode
from dagy.nodes.connectors import ConnectorDef, ConnectorDirection, DataType, ConnectionCardinality
from dagy.nodes.metadata import (
    NodeMetadata, NodeCategory, NodeConfigField,
    ConfigFieldType, SelectOption,
)
from dagy.nodes.execution import ExecutionContext, ExecutionResult
These are the core imports you'll use in every custom node. Let's break them down:
- FlowNode: The abstract base class your node extends
- ConnectorDef: Defines an input or output port on your node
- ConnectorDirection: INBOUND or OUTBOUND
- DataType: The type of data your connector accepts/produces (e.g., STRING, JSON, DATAFRAME)
- ConnectionCardinality: How many connections a connector allows
- NodeMetadata: Your node's identity (name, category, icon, color)
- NodeConfigField: A configuration field that appears in the node config panel
- ExecutionContext: Runtime information passed to your node during execution
- ExecutionResult: What your node returns after executing
Step 2: Define the Class and Metadata
Every node must implement three abstract methods: metadata(), connectors(), and execute().
class SentimentAnalyzerNode(FlowNode):
    """Analyze text sentiment using a configurable model."""

    @classmethod
    def metadata(cls) -> NodeMetadata:
        return NodeMetadata(
            node_type="sentiment_analyzer",        # Unique identifier (snake_case)
            label="Sentiment Analyzer",            # Display name in the UI
            description="Classify text as positive, negative, or neutral",
            category=NodeCategory.LLM,             # Which sidebar category
            icon="ThumbsUp",                       # Lucide icon name
            color="purple",                        # Node color theme
            version="1.0.0",                       # Semantic version
            author="My Org",                       # Your org/name
            official=False,                        # False for custom nodes
            tags=["sentiment", "nlp", "text"],     # Searchable tags
            documentation_url="https://docs.myorg.com/nodes/sentiment",
        )
Metadata fields explained:
| Field | Required | Description |
|---|---|---|
| node_type | Yes | Unique snake_case identifier. Must be globally unique across all nodes. |
| label | Yes | Human-readable name shown in the Flow Designer sidebar and node header. |
| description | Yes | One-line description shown in tooltips and search results. |
| category | Yes | One of: INGESTION, TRANSFORM, LLM, VECTORDB, NOTIFICATION, CONTROL_FLOW, CUSTOM |
| icon | No | Name of a Lucide icon (e.g., "Brain", "Database", "Sparkles"). |
| color | No | Color theme: "blue", "violet", "purple", "emerald", "amber", "slate", or any Tailwind color. |
| version | No | Semantic version string. Increment when the node interface changes. |
| author | No | Your name or organization. |
| official | No | Set to False for custom nodes. True is reserved for Dagy built-ins. |
| tags | No | List of searchable keywords. Helps users find your node. |
| documentation_url | No | Link to detailed documentation for this node. |
| deprecated | No | Set to True to mark as deprecated. Shows a warning in the UI. |
| deprecation_message | No | Message shown when a deprecated node is used. |
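The node_type rules in the table (snake_case and globally unique) are easy to check before you ship a node. The helper below is a hypothetical pre-flight check, not part of the Dagy SDK; the exact pattern Dagy enforces is not stated in this guide, so the regular expression is an assumption.

```python
import re

# Assumed definition of snake_case: lowercase words joined by single underscores.
_SNAKE_CASE = re.compile(r"^[a-z][a-z0-9]*(_[a-z0-9]+)*$")


def check_node_type(node_type: str, existing: set[str]) -> list[str]:
    """Return a list of problems with a proposed node_type (empty if none)."""
    problems = []
    if not _SNAKE_CASE.match(node_type):
        problems.append(f"{node_type!r} is not snake_case")
    if node_type in existing:
        problems.append(f"{node_type!r} is already registered")
    return problems


print(check_node_type("sentiment_analyzer", {"http_request"}))
print(check_node_type("SentimentAnalyzer", {"http_request"}))
```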
Step 3: Define Connectors
Connectors are the typed input/output ports on your node. They determine what data your node accepts and produces, and the Flow Designer renders them as connection handles.
    @classmethod
    def connectors(cls) -> list[ConnectorDef]:
        return [
            # --- Inbound connectors (inputs) ---
            ConnectorDef(
                id="text_in",                          # Unique ID within this node
                name="Text Input",                     # Label shown on hover
                description="Text to analyze for sentiment",
                direction=ConnectorDirection.INBOUND,
                data_types=frozenset({DataType.STRING, DataType.JSON}),
                cardinality=ConnectionCardinality.ZERO_OR_MANY,
                required=True,                         # DAG validation warns if unconnected
                position_hint="top",                   # UI position
            ),
            # --- Outbound connectors (outputs) ---
            ConnectorDef(
                id="positive_out",
                name="Positive",
                description="Text classified as positive sentiment",
                direction=ConnectorDirection.OUTBOUND,
                data_types=frozenset({DataType.JSON}),
                cardinality=ConnectionCardinality.ZERO_OR_MANY,
                position_hint="bottom-left",
            ),
            ConnectorDef(
                id="negative_out",
                name="Negative",
                description="Text classified as negative sentiment",
                direction=ConnectorDirection.OUTBOUND,
                data_types=frozenset({DataType.JSON}),
                cardinality=ConnectionCardinality.ZERO_OR_MANY,
                position_hint="bottom-right",
            ),
            ConnectorDef(
                id="neutral_out",
                name="Neutral",
                description="Text classified as neutral sentiment",
                direction=ConnectorDirection.OUTBOUND,
                data_types=frozenset({DataType.JSON}),
                cardinality=ConnectionCardinality.ZERO_OR_MANY,
                position_hint="bottom",
            ),
        ]
Connector design guidelines:
- ID naming: Use the <purpose>_in / <purpose>_out convention (e.g., text_in, result_out).
- Data types: Accept the broadest reasonable set. If your node can handle both STRING and JSON, include both.
- Cardinality: Use ZERO_OR_MANY for most cases. Use ONE only when exactly one connection is required.
- Required: Set to True for essential inputs. The Flow Designer shows warnings for unconnected required connectors.
- Position hints: Control where handles appear on the node in the UI:
  - Inbound: "top", "top-left", "top-right"
  - Outbound: "bottom", "bottom-left", "bottom-right"
Data type compatibility: When a user draws a connection in the Flow Designer, the system checks type compatibility. The ANY type is a universal wildcard. Some types have implicit coercions (e.g., STRING can connect to JSON targets). See the full compatibility matrix.
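To make the compatibility rule concrete, here is a minimal sketch of such a check. DataTypeSketch and can_connect are hypothetical names, and the coercion table contains only the STRING to JSON rule mentioned above; the real matrix has more entries, and treating ANY as a wildcard on both ends is an assumption.

```python
from enum import Enum


class DataTypeSketch(Enum):
    """Hypothetical subset of the DataType enum, for illustration only."""
    ANY = "any"
    STRING = "string"
    JSON = "json"
    DATAFRAME = "dataframe"


# Assumed coercion table: source type -> target types it can implicitly feed.
_COERCIONS = {DataTypeSketch.STRING: {DataTypeSketch.JSON}}


def can_connect(source_types, target_types) -> bool:
    """Return True if an outbound connector may feed an inbound connector."""
    source_types, target_types = set(source_types), set(target_types)
    # ANY on either side matches everything.
    if DataTypeSketch.ANY in source_types or DataTypeSketch.ANY in target_types:
        return True
    for src in source_types:
        # An exact match or an implicit coercion is enough.
        if src in target_types or _COERCIONS.get(src, set()) & target_types:
            return True
    return False


print(can_connect({DataTypeSketch.STRING}, {DataTypeSketch.JSON}))
print(can_connect({DataTypeSketch.JSON}, {DataTypeSketch.STRING}))
```

Note that the coercion is directional in this sketch: STRING may feed a JSON target, but JSON may not feed a STRING target.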
Step 4: Define Configuration Schema (Optional)
If your node needs user-configurable settings, define them via config_schema(). These fields appear in the Node Configuration Panel when the user selects your node.
    @classmethod
    def config_schema(cls) -> list[NodeConfigField]:
        return [
            NodeConfigField(
                name="model",
                label="Sentiment Model",
                field_type=ConfigFieldType.SELECT,
                required=True,
                default="vader",
                description="Which sentiment analysis model to use",
                options=[
                    SelectOption("VADER (fast, rule-based)", "vader"),
                    SelectOption("TextBlob (moderate)", "textblob"),
                    SelectOption("Custom API", "custom_api"),
                ],
            ),
            NodeConfigField(
                name="threshold",
                label="Confidence Threshold",
                field_type=ConfigFieldType.NUMBER,
                required=False,
                default=0.6,
                description="Minimum confidence score to classify (0.0–1.0)",
                min_value=0.0,
                max_value=1.0,
            ),
            NodeConfigField(
                name="api_endpoint",
                label="Custom API Endpoint",
                field_type=ConfigFieldType.STRING,
                required=False,
                placeholder="https://api.myorg.com/sentiment",
                description="Required when model is 'Custom API'",
                depends_on={"model": "custom_api"},  # Only shown when model=custom_api
            ),
            NodeConfigField(
                name="api_key_ref",
                label="API Key (Secret Reference)",
                field_type=ConfigFieldType.SECRET_REF,
                required=False,
                placeholder="secret:sentiment_api_key",
                description="Reference to a secret containing the API key",
                depends_on={"model": "custom_api"},
            ),
        ]
Available field types:
| Type | Renders As | Use For |
|---|---|---|
| STRING | Text input | Short text values (URLs, names, keys) |
| NUMBER | Numeric input | Integers or floats with optional min/max |
| BOOLEAN | Toggle switch | On/off flags |
| SELECT | Dropdown | Choosing from a fixed list of options |
| TEXTAREA | Multi-line text | Code, SQL queries, templates, prompts |
| JSON | JSON editor | Complex structured configuration |
| SECRET_REF | Text input with lock icon | References to secrets (e.g., secret:my_key) |
| FILE_PATH | Text input with folder icon | File paths |
| CODE | Code editor | Python/SQL code with syntax highlighting |
| KEY_VALUE | Key-value pair editor | Dynamic dictionaries |
| CONNECTION_REF | Connection picker | References to other connections |
The depends_on field: Shows a config field conditionally based on another field's value. In the example above, api_endpoint only appears when the user selects "Custom API" as the model.
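The visibility rule can be sketched as a small predicate. This is an illustration of the behavior described above, not the Flow Designer's actual code; is_field_visible is a hypothetical helper, and requiring every key in depends_on to match is an assumption for the multi-key case.

```python
from typing import Optional


def is_field_visible(depends_on: Optional[dict], config: dict) -> bool:
    """Return True if a config field should be shown for the current values."""
    if not depends_on:
        return True  # Fields without depends_on are always visible
    # Assumption: every listed key must equal its expected value.
    return all(config.get(key) == expected for key, expected in depends_on.items())


api_endpoint_rule = {"model": "custom_api"}
print(is_field_visible(api_endpoint_rule, {"model": "vader"}))
print(is_field_visible(api_endpoint_rule, {"model": "custom_api"}))
```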
Step 5: Implement Execution Logic
The execute() method contains your node's core logic. It receives an ExecutionContext and must return an ExecutionResult.
    async def execute(self, context: ExecutionContext) -> ExecutionResult:
        # 1. Get upstream data from connected nodes
        input_data = context.get_upstream_data("text_in")
        if not input_data:
            return ExecutionResult.from_error("No input data received on text_in connector")

        # 2. Read configuration
        model = self.config.get("model", "vader")
        threshold = float(self.config.get("threshold", 0.6))
        context.log_info(f"Analyzing sentiment with model={model}, threshold={threshold}")

        # 3. Process each text item
        positive_items = []
        negative_items = []
        neutral_items = []
        for item in (input_data if isinstance(input_data, list) else [input_data]):
            # Accept plain strings, dicts with a "text" key, or anything printable
            if isinstance(item, str):
                text = item
            elif isinstance(item, dict):
                text = item.get("text", str(item))
            else:
                text = str(item)
            # Analyze sentiment with the configured model
            score = await self._analyze(text, model, context)
            result = {"text": text, "score": score, "model": model}
            if score >= threshold:
                positive_items.append(result)
            elif score <= -threshold:
                negative_items.append(result)
            else:
                neutral_items.append(result)
        context.log_info(
            f"Results: {len(positive_items)} positive, "
            f"{len(negative_items)} negative, {len(neutral_items)} neutral"
        )

        # 4. Return outputs keyed by connector ID
        return ExecutionResult.from_success({
            "positive_out": positive_items,
            "negative_out": negative_items,
            "neutral_out": neutral_items,
        })

    async def _analyze(self, text: str, model: str, context: ExecutionContext) -> float:
        """Analyze a single text and return a sentiment score (-1.0 to 1.0)."""
        if model == "vader":
            from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
            analyzer = SentimentIntensityAnalyzer()
            scores = analyzer.polarity_scores(text)
            return scores["compound"]
        elif model == "textblob":
            from textblob import TextBlob
            blob = TextBlob(text)
            return blob.sentiment.polarity
        elif model == "custom_api":
            import httpx
            endpoint = self.config["api_endpoint"]
            api_key = context.get_secret(self.config.get("api_key_ref", ""))
            async with httpx.AsyncClient() as client:
                resp = await client.post(
                    endpoint,
                    json={"text": text},
                    headers={"Authorization": f"Bearer {api_key}"},
                )
                resp.raise_for_status()
                return resp.json()["score"]
        return 0.0  # Unknown model: treat as neutral
Key execution patterns:
- context.get_upstream_data("connector_id"): Retrieves data from the named inbound connector. Returns None if nothing was sent.
- self.config: Dictionary of the user's configuration values from the node config panel.
- context.get_secret("secret:name"): Retrieves a decrypted secret by reference.
- context.log_info/warning/error/debug(msg): Logs messages that appear in the execution logs.
- ExecutionResult.from_success(outputs): Returns a successful result. The outputs dict must be keyed by outbound connector IDs.
- ExecutionResult.from_error(msg): Returns a failed result.
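The routing step of execute() can be isolated into a pure function, which makes the threshold behavior easy to reason about and to test without the SDK. route_by_score is a hypothetical helper written for this illustration; the output keys match the connector IDs used in this guide.

```python
def route_by_score(items: list[dict], threshold: float) -> dict[str, list[dict]]:
    """Split scored items into the three outbound connectors by threshold."""
    outputs = {"positive_out": [], "negative_out": [], "neutral_out": []}
    for item in items:
        score = item["score"]
        if score >= threshold:
            outputs["positive_out"].append(item)
        elif score <= -threshold:
            outputs["negative_out"].append(item)
        else:
            outputs["neutral_out"].append(item)
    return outputs


scored = [
    {"text": "great", "score": 0.8},
    {"text": "awful", "score": -0.7},
    {"text": "fine", "score": 0.1},
]
routed = route_by_score(scored, threshold=0.6)
print({name: len(batch) for name, batch in routed.items()})
```

A score exactly equal to the threshold is routed to positive_out, and a score exactly equal to its negation goes to negative_out, because both comparisons are inclusive.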
Step 6: Add Lifecycle Hooks (Optional)
For advanced use cases, override lifecycle hooks to add setup, cleanup, error recovery, or post-processing logic.
    async def validate_config(self) -> list[str]:
        """Validate configuration before execution. Return a list of error messages."""
        errors = []
        model = self.config.get("model")
        if model == "custom_api":
            if not self.config.get("api_endpoint"):
                errors.append("API endpoint is required when using Custom API model")
            if not self.config.get("api_key_ref"):
                errors.append("API key reference is required when using Custom API model")
        return errors

    async def pre_execute(self, context: ExecutionContext) -> None:
        """Called before execute(). Use for resource allocation, warm-up, etc."""
        context.log_debug("Warming up sentiment model...")

    async def post_execute(self, context: ExecutionContext, result: ExecutionResult) -> ExecutionResult:
        """Called after execute(). Use for metrics, cleanup, or result modification."""
        if result.success:
            total = sum(len(v) for v in result.outputs.values() if isinstance(v, list))
            context.log_info(f"Processed {total} items total")
        return result

    async def on_error(self, context: ExecutionContext, error: Exception) -> ExecutionResult | None:
        """Called when execute() raises an exception. Return a result to recover, or None to propagate."""
        context.log_error(f"Sentiment analysis failed: {error}")
        if context.attempt < context.max_retries:
            context.log_info(f"Will retry (attempt {context.attempt + 1}/{context.max_retries})")
            return None  # Let the retry mechanism handle it
        # On final failure, return a structured error result instead of raising
        return ExecutionResult.from_error(str(error), error_type=type(error).__name__)

    def on_cancel(self, context: ExecutionContext) -> None:
        """Called synchronously when the node is cancelled. Use for cleanup."""
        pass
Lifecycle execution order:
validate_config() → pre_execute() → execute() → post_execute()
↓ (on exception)
on_error()
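The ordering in the diagram can be sketched as a small driver. This is not Dagy's execution engine: RecordingNode and run_node are hypothetical, results are plain dicts, and two details are assumptions because the guide does not state them (that only exceptions from execute() reach on_error(), and that post_execute() is skipped after a recovery).

```python
import asyncio


class RecordingNode:
    """Hypothetical stand-in for a FlowNode subclass; records which hooks ran."""

    def __init__(self, fail: bool = False) -> None:
        self.fail = fail
        self.calls = []

    async def validate_config(self) -> list:
        self.calls.append("validate_config")
        return []

    async def pre_execute(self, context) -> None:
        self.calls.append("pre_execute")

    async def execute(self, context) -> dict:
        self.calls.append("execute")
        if self.fail:
            raise RuntimeError("boom")
        return {"success": True}

    async def post_execute(self, context, result: dict) -> dict:
        self.calls.append("post_execute")
        return result

    async def on_error(self, context, error: Exception):
        self.calls.append("on_error")
        return {"success": False, "error": str(error)}


async def run_node(node, context=None) -> dict:
    """Call the lifecycle hooks in the documented order."""
    errors = await node.validate_config()
    if errors:
        return {"success": False, "error": "; ".join(errors)}
    await node.pre_execute(context)
    try:
        result = await node.execute(context)
    except Exception as exc:
        recovered = await node.on_error(context, exc)
        if recovered is None:
            raise  # on_error declined to recover, so the exception propagates
        return recovered
    return await node.post_execute(context, result)


ok, failing = RecordingNode(), RecordingNode(fail=True)
asyncio.run(run_node(ok))
asyncio.run(run_node(failing))
print(ok.calls)
print(failing.calls)
```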
Step 7: Make Your Node Discoverable
For the node registry to find your node at startup, its module must be imported. There are three approaches:
Option A: Add to a Package's __init__.py
If you're building a package of custom nodes:
# File: src/my_org/nodes/__init__.py
from . import sentiment # noqa: F401 — import triggers __init_subclass__
from . import dedup # noqa: F401
from . import enricher # noqa: F401
Option B: Use Package Discovery
If your nodes are in a Python package, the registry can discover them automatically:
# In your app startup code
from dagy.nodes.registry import node_registry
# Discovers all FlowNode subclasses in the package
count = node_registry.discover_package("my_org.nodes")
print(f"Discovered {count} custom nodes")
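Package discovery of this kind usually comes down to importing every module in a package so that each class definition runs. The sketch below shows that idea with the standard library only; import_submodules is a hypothetical helper, not the implementation of discover_package, and it is demonstrated on the stdlib json package purely because that package is always available.

```python
import importlib
import pkgutil


def import_submodules(package_name: str) -> list[str]:
    """Import each direct submodule of a package and return their names."""
    package = importlib.import_module(package_name)
    imported = []
    prefix = package.__name__ + "."
    for info in pkgutil.iter_modules(package.__path__, prefix=prefix):
        if info.name.endswith("__main__"):
            continue  # Skip entry-point modules; they are not node definitions
        # Importing the module executes its class statements, which is what
        # would trigger __init_subclass__ for any FlowNode subclasses inside.
        importlib.import_module(info.name)
        imported.append(info.name)
    return imported


print(import_submodules("json"))
```

This version looks only at direct children of the package; nested subpackages would need pkgutil.walk_packages or a recursive call.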
Option C: Register via the REST API
For nodes that aren't Python classes (or for dynamically registered nodes):
curl -X POST https://api.dagy.io/nodes/registry \
-H "Content-Type: application/json" \
-H "X-Org-Id: my_org" \
-d '{
"node_type": "sentiment_analyzer",
"label": "Sentiment Analyzer",
"description": "Classify text sentiment",
"category": "llm",
"icon": "ThumbsUp",
"color": "purple",
"connectors": [
{"id": "text_in", "name": "Text Input", "direction": "inbound", "data_types": ["string", "json"], "required": true},
{"id": "positive_out", "name": "Positive", "direction": "outbound", "data_types": ["json"]},
{"id": "negative_out", "name": "Negative", "direction": "outbound", "data_types": ["json"]},
{"id": "neutral_out", "name": "Neutral", "direction": "outbound", "data_types": ["json"]}
],
"config_schema": [
{"name": "model", "label": "Model", "type": "select", "required": true, "options": [
{"label": "VADER", "value": "vader"}, {"label": "TextBlob", "value": "textblob"}
]}
]
}'
Step 8: Add a Frontend Definition (Optional)
If you want your node to appear in the Flow Designer even when the backend is unavailable (offline-first), add it to node-definitions.ts:
// File: web/src/components/flow-builder/node-definitions.ts
// Add to the NODE_TYPES array:
{
type: "sentiment_analyzer",
node_type: "sentiment_analyzer",
label: "Sentiment Analyzer",
description: "Classify text as positive, negative, or neutral",
category: "llm",
icon: "ThumbsUp",
color: "purple",
official: false,
inbound_connector_count: 1,
outbound_connector_count: 3,
connectors: [
{ id: "text_in", name: "Text Input", description: "Text to analyze", direction: "inbound", data_types: ["string", "json"], cardinality: "zero_or_many", required: true, position_hint: "top", metadata: {} },
{ id: "positive_out", name: "Positive", description: "Positive sentiment results", direction: "outbound", data_types: ["json"], cardinality: "zero_or_many", required: false, position_hint: "bottom-left", metadata: {} },
{ id: "negative_out", name: "Negative", description: "Negative sentiment results", direction: "outbound", data_types: ["json"], cardinality: "zero_or_many", required: false, position_hint: "bottom-right", metadata: {} },
{ id: "neutral_out", name: "Neutral", description: "Neutral sentiment results", direction: "outbound", data_types: ["json"], cardinality: "zero_or_many", required: false, position_hint: "bottom", metadata: {} },
],
schema: {
fields: [
{ name: "model", label: "Sentiment Model", type: "select", required: true, default: "vader", options: [
{ label: "VADER", value: "vader" },
{ label: "TextBlob", value: "textblob" },
{ label: "Custom API", value: "custom_api" },
]},
{ name: "threshold", label: "Confidence Threshold", type: "number", required: false, default: 0.6 },
...commonTaskFields,
],
},
},
Note: This is only needed for nodes you want available offline. Nodes registered via the Python SDK or the REST API are fetched automatically by node-registry-client.ts when the user opens the Flow Designer.
Complete Example
Here's the full sentiment analyzer node in one file:
# File: src/my_org/nodes/sentiment.py
"""Sentiment Analyzer node for Dagy Flow Designer."""
from __future__ import annotations

from dagy.nodes.base import FlowNode
from dagy.nodes.connectors import ConnectorDef, ConnectorDirection, DataType, ConnectionCardinality
from dagy.nodes.metadata import (
    NodeMetadata, NodeCategory, NodeConfigField,
    ConfigFieldType, SelectOption,
)
from dagy.nodes.execution import ExecutionContext, ExecutionResult


class SentimentAnalyzerNode(FlowNode):
    """Analyze text sentiment and route to positive/negative/neutral outputs."""

    @classmethod
    def metadata(cls) -> NodeMetadata:
        return NodeMetadata(
            node_type="sentiment_analyzer",
            label="Sentiment Analyzer",
            description="Classify text as positive, negative, or neutral",
            category=NodeCategory.LLM,
            icon="ThumbsUp",
            color="purple",
            version="1.0.0",
            author="My Org",
            official=False,
            tags=["sentiment", "nlp", "text", "classification"],
        )

    @classmethod
    def connectors(cls) -> list[ConnectorDef]:
        return [
            ConnectorDef(
                id="text_in",
                name="Text Input",
                description="Text to analyze for sentiment",
                direction=ConnectorDirection.INBOUND,
                data_types=frozenset({DataType.STRING, DataType.JSON}),
                cardinality=ConnectionCardinality.ZERO_OR_MANY,
                required=True,
                position_hint="top",
            ),
            ConnectorDef(
                id="positive_out",
                name="Positive",
                description="Text classified as positive sentiment",
                direction=ConnectorDirection.OUTBOUND,
                data_types=frozenset({DataType.JSON}),
                cardinality=ConnectionCardinality.ZERO_OR_MANY,
                position_hint="bottom-left",
            ),
            ConnectorDef(
                id="negative_out",
                name="Negative",
                description="Text classified as negative sentiment",
                direction=ConnectorDirection.OUTBOUND,
                data_types=frozenset({DataType.JSON}),
                cardinality=ConnectionCardinality.ZERO_OR_MANY,
                position_hint="bottom-right",
            ),
            ConnectorDef(
                id="neutral_out",
                name="Neutral",
                description="Text classified as neutral sentiment",
                direction=ConnectorDirection.OUTBOUND,
                data_types=frozenset({DataType.JSON}),
                cardinality=ConnectionCardinality.ZERO_OR_MANY,
                position_hint="bottom",
            ),
        ]

    @classmethod
    def config_schema(cls) -> list[NodeConfigField]:
        return [
            NodeConfigField(
                name="model",
                label="Sentiment Model",
                field_type=ConfigFieldType.SELECT,
                required=True,
                default="vader",
                options=[
                    SelectOption("VADER (fast, rule-based)", "vader"),
                    SelectOption("TextBlob (moderate)", "textblob"),
                    SelectOption("Custom API", "custom_api"),
                ],
            ),
            NodeConfigField(
                name="threshold",
                label="Confidence Threshold",
                field_type=ConfigFieldType.NUMBER,
                required=False,
                default=0.6,
                min_value=0.0,
                max_value=1.0,
            ),
            NodeConfigField(
                name="api_endpoint",
                label="Custom API Endpoint",
                field_type=ConfigFieldType.STRING,
                required=False,
                placeholder="https://api.myorg.com/sentiment",
                depends_on={"model": "custom_api"},
            ),
            NodeConfigField(
                name="api_key_ref",
                label="API Key",
                field_type=ConfigFieldType.SECRET_REF,
                required=False,
                placeholder="secret:sentiment_api_key",
                depends_on={"model": "custom_api"},
            ),
        ]

    async def validate_config(self) -> list[str]:
        errors = []
        if self.config.get("model") == "custom_api":
            if not self.config.get("api_endpoint"):
                errors.append("API endpoint is required for Custom API model")
            if not self.config.get("api_key_ref"):
                errors.append("API key reference is required for Custom API model")
        return errors

    async def execute(self, context: ExecutionContext) -> ExecutionResult:
        input_data = context.get_upstream_data("text_in")
        if not input_data:
            return ExecutionResult.from_error("No input data received on text_in")
        model = self.config.get("model", "vader")
        threshold = float(self.config.get("threshold", 0.6))
        context.log_info(f"Analyzing sentiment: model={model}, threshold={threshold}")
        positive, negative, neutral = [], [], []
        items = input_data if isinstance(input_data, list) else [input_data]
        for item in items:
            # Accept plain strings, dicts with a "text" key, or anything printable
            if isinstance(item, str):
                text = item
            elif isinstance(item, dict):
                text = item.get("text", str(item))
            else:
                text = str(item)
            score = await self._score(text, model, context)
            result = {"text": text, "score": score, "model": model}
            if score >= threshold:
                positive.append(result)
            elif score <= -threshold:
                negative.append(result)
            else:
                neutral.append(result)
        context.log_info(f"+{len(positive)} / -{len(negative)} / ~{len(neutral)}")
        return ExecutionResult.from_success({
            "positive_out": positive,
            "negative_out": negative,
            "neutral_out": neutral,
        })

    async def _score(self, text: str, model: str, ctx: ExecutionContext) -> float:
        if model == "vader":
            from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
            return SentimentIntensityAnalyzer().polarity_scores(text)["compound"]
        elif model == "textblob":
            from textblob import TextBlob
            return TextBlob(text).sentiment.polarity
        elif model == "custom_api":
            import httpx
            key = ctx.get_secret(self.config.get("api_key_ref", ""))
            async with httpx.AsyncClient() as c:
                r = await c.post(
                    self.config["api_endpoint"],
                    json={"text": text},
                    headers={"Authorization": f"Bearer {key}"},
                )
                r.raise_for_status()
                return r.json()["score"]
        return 0.0  # Unknown model: treat as neutral

    async def on_error(self, context: ExecutionContext, error: Exception) -> ExecutionResult | None:
        context.log_error(f"Sentiment analysis failed: {error}")
        return None  # Propagate error, let retry mechanism handle it
Common Patterns
Split/Route Pattern
Route data to different outputs based on conditions (like the Sentiment Analyzer above or the built-in Conditional Branch):
# Two outbound connectors with position hints for visual clarity
ConnectorDef(id="true_out", ..., position_hint="bottom-left"),
ConnectorDef(id="false_out", ..., position_hint="bottom-right"),
Fan-In / Merge Pattern
Accept data from multiple upstream nodes:
ConnectorDef(id="input_a", name="Input A", direction=ConnectorDirection.INBOUND, position_hint="top-left"),
ConnectorDef(id="input_b", name="Input B", direction=ConnectorDirection.INBOUND, position_hint="top-right"),
Error Channel Pattern
Include a dedicated error output for downstream error handling:
ConnectorDef(id="output_out", ..., position_hint="bottom-left"),
ConnectorDef(id="error_out", name="Error", data_types=frozenset({DataType.ERROR}), position_hint="bottom-right"),
Secret Handling
Always use secret references rather than raw credentials:
# In config_schema:
NodeConfigField(name="api_key", field_type=ConfigFieldType.SECRET_REF, ...)
# In execute:
api_key = context.get_secret(self.config["api_key"])
Metrics and Logging
Track performance metrics in your node:
async def execute(self, context: ExecutionContext) -> ExecutionResult:
    import time
    start = time.monotonic()
    # ... your logic ...
    elapsed = time.monotonic() - start
    return ExecutionResult(
        success=True,
        outputs={"result_out": data},
        metrics={"processing_time_ms": elapsed * 1000, "items_processed": len(data)},
        # ... other fields
    )
Testing Your Node
Unit Testing
import pytest
from my_org.nodes.sentiment import SentimentAnalyzerNode
from dagy.nodes.execution import ExecutionContext, ExecutionResult


@pytest.mark.asyncio
async def test_sentiment_positive():
    node = SentimentAnalyzerNode(
        config={"model": "vader", "threshold": 0.5},
        instance_id="test-1",
    )
    context = ExecutionContext(
        run_id="test-run",
        task_run_id="test-task",
        flow_name="test",
        flow_version="1.0.0",
        node_type="sentiment_analyzer",
        node_instance_id="test-1",
        upstream_results={"text_in": "I love this product! It's amazing!"},
    )
    result = await node.execute(context)
    assert result.success
    assert len(result.outputs["positive_out"]) == 1
    assert len(result.outputs["negative_out"]) == 0


@pytest.mark.asyncio
async def test_validation_requires_api_endpoint():
    node = SentimentAnalyzerNode(
        config={"model": "custom_api"},
        instance_id="test-2",
    )
    errors = await node.validate_config()
    assert "API endpoint is required" in errors[0]
Integration Testing with the Registry
from dagy.nodes.registry import node_registry


def test_node_registers():
    node_registry.sync_from_subclasses()
    definition = node_registry.get("sentiment_analyzer")
    assert definition is not None
    assert definition.metadata().label == "Sentiment Analyzer"
    assert len(definition.connectors()) == 4  # 1 in + 3 out
Versioning and Deprecation
When you need to update a node's interface (connectors or config schema):
- Increment the version in metadata().
- The API stores the previous version in the DAGY_NODE_VERSIONS table.
- If the change is breaking, set deprecated=True on the old version and provide a deprecation_message pointing to the new node.
- Existing flows continue to work with the old version until they're updated.
    @classmethod
    def metadata(cls) -> NodeMetadata:
        return NodeMetadata(
            node_type="sentiment_analyzer_v2",
            label="Sentiment Analyzer v2",
            version="2.0.0",
            # ... rest of metadata
        )
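Under semantic versioning, a breaking interface change corresponds to a major-version bump, so a release script can use the version string to decide whether the old node needs a deprecation notice. The helpers below are hypothetical and only handle plain MAJOR.MINOR.PATCH strings; Dagy may apply its own rules.

```python
def parse_version(version: str) -> tuple[int, int, int]:
    """Parse a plain 'MAJOR.MINOR.PATCH' string into a tuple of ints."""
    major, minor, patch = (int(part) for part in version.split("."))
    return major, minor, patch


def is_breaking_bump(old: str, new: str) -> bool:
    """Return True if moving from old to new increases the major version."""
    return parse_version(new)[0] > parse_version(old)[0]


print(is_breaking_bump("1.0.0", "2.0.0"))
print(is_breaking_bump("1.0.0", "1.1.0"))
```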
Further Reading
- Flow Node Architecture: Full technical reference for the framework
- SDK Overview: The @flow and @task decorator API
- API Endpoints: Node Registry REST API reference
- Flow Builder Guide: Using the visual Flow Designer