Dagy Flow Node Architecture
---
Comprehensive reference for the flow node framework — design, lifecycle, connectors, extensibility, APIs, and persistence.
1. Framework Design Overview
The Dagy Flow Node Architecture provides a unified, type-safe framework for defining, registering, executing, and composing workflow nodes. It spans three layers:
Python SDK (src/dagy/nodes/) — The core framework where nodes are defined as Python classes extending FlowNode. Each node declares its metadata, typed connectors, configuration schema, and async execution logic.
FastAPI Backend (src/dagy_api/nodes/) — REST API for node registration, discovery, and connection validation. Custom nodes are persisted in DynamoDB and served alongside built-in nodes.
Next.js Frontend (web/src/components/flow-builder/) — The visual Flow Designer where users drag-drop generic Task nodes, configure code execution, and connect them via typed connectors with real-time validation. Task nodes are universally applicable and execute arbitrary code; the backend framework supports custom node types but the frontend presents only the Task abstraction.
Design Principles
- Convention over configuration: Nodes auto-register via __init_subclass__, so simply subclassing FlowNode makes a node discoverable.
- Type safety end-to-end: Connector data types are enforced at connection time (both frontend and backend), preventing invalid pipelines.
- Backward compatibility: Legacy node definitions without connectors are automatically wrapped with default input/output connectors.
- Separation of concerns: Node definition (metadata + connectors + schema) is fully separate from execution logic.
2. Node Lifecycle
A flow node progresses through a well-defined lifecycle during execution:
PENDING → QUEUED → RUNNING → COMPLETED
↘ FAILED → RETRYING → RUNNING
↘ TIMED_OUT
↘ CANCELLED
↘ SKIPPED
Lifecycle Hooks
The FlowNode.run() method orchestrates the full lifecycle:
```python
async def run(self, context: ExecutionContext) -> ExecutionResult:
    # 1. Validate configuration
    errors = await self.validate_config()
    if errors:
        return ExecutionResult.from_error(...)
    # 2. Pre-execution hook (setup, resource allocation)
    await self.pre_execute(context)
    # 3. Execute the node's core logic
    result = await self.execute(context)
    # 4. Post-execution hook (cleanup, metric recording)
    result = await self.post_execute(context, result)
    return result
```
If an exception occurs during execution, the on_error() hook is called, which can return a recovery ExecutionResult or None to propagate the error.
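The excerpt above does not show where run() catches exceptions. The following self-contained sketch mimics the hook order with stub classes so it runs without dagy installed. SketchNode, the trimmed-down ExecutionResult, the on_error(context, exc) signature, and wrapping pre_execute/execute/post_execute in a single try block are all assumptions made for illustration.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class ExecutionResult:
    """Hypothetical stand-in for ExecutionResult, keeping only the fields used here."""

    success: bool
    outputs: dict[str, Any] = field(default_factory=dict)
    error: Optional[str] = None

    @classmethod
    def from_success(cls, outputs: dict[str, Any]) -> "ExecutionResult":
        return cls(success=True, outputs=outputs)

    @classmethod
    def from_error(cls, error: str) -> "ExecutionResult":
        return cls(success=False, error=error)


class SketchNode:
    """Hypothetical node that records the order in which lifecycle hooks run."""

    def __init__(self, fail: bool = False, recover: bool = False) -> None:
        self.fail = fail        # make execute() raise
        self.recover = recover  # make on_error() return a recovery result
        self.calls: list[str] = []

    async def validate_config(self) -> list[str]:
        self.calls.append("validate_config")
        return []

    async def pre_execute(self, context: dict) -> None:
        self.calls.append("pre_execute")

    async def execute(self, context: dict) -> ExecutionResult:
        self.calls.append("execute")
        if self.fail:
            raise RuntimeError("boom")
        return ExecutionResult.from_success({"output": 42})

    async def post_execute(self, context: dict, result: ExecutionResult) -> ExecutionResult:
        self.calls.append("post_execute")
        return result

    async def on_error(self, context: dict, exc: Exception) -> Optional[ExecutionResult]:
        self.calls.append("on_error")
        return ExecutionResult.from_success({"output": None}) if self.recover else None

    async def run(self, context: dict) -> ExecutionResult:
        errors = await self.validate_config()
        if errors:
            return ExecutionResult.from_error("; ".join(errors))
        try:
            await self.pre_execute(context)
            result = await self.execute(context)
            return await self.post_execute(context, result)
        except Exception as exc:
            recovery = await self.on_error(context, exc)
            if recovery is None:
                raise exc  # no recovery result: propagate the original error
            return recovery


if __name__ == "__main__":
    node = SketchNode(fail=True, recover=True)
    result = asyncio.run(node.run({}))
    print(result.success, node.calls)
```

A successful node runs all four steps in order. A failing node skips post_execute and either returns the on_error() result or re-raises.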
State Transitions
| State | Description |
|---|---|
PENDING | Node is defined but not yet scheduled |
QUEUED | Node is waiting for upstream dependencies |
RUNNING | Node is actively executing |
COMPLETED | Execution finished successfully |
FAILED | Execution failed (may trigger retry) |
RETRYING | Failed execution is being retried |
TIMED_OUT | Execution exceeded the timeout |
CANCELLED | Execution was cancelled externally |
SKIPPED | Node was skipped (conditional branch) |
3. Connector Model
Connectors are the typed ports through which nodes exchange data. Every node declares its connectors via the connectors() class method.
Connector Definition
```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class ConnectorDef:
    id: str                              # Unique within the node (e.g., "data_in", "output")
    name: str                            # Human-readable name
    description: str                     # Tooltip text
    direction: ConnectorDirection        # INBOUND or OUTBOUND
    data_types: frozenset[DataType]      # Accepted/produced types
    cardinality: ConnectionCardinality   # How many connections allowed
    required: bool = False               # Must be connected for valid DAG
    position_hint: str | None = None     # UI positioning ("top", "bottom-left", etc.)
    metadata: dict = field(default_factory=dict)
```
Frontend Task Nodes: All Task nodes in the frontend are created with default connectors:
- Input (id: "input", inbound, top): accepts any data type, zero_or_many cardinality
- Output (id: "output", outbound, bottom): produces any data type, zero_or_many cardinality
Data Types
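The framework defines 15 data types. The matrix below lists the implicit conversions between them, reading from the source type (left) to the target types it can feed (right):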
| Type | Can implicitly connect to |
|---|---|
any | Everything (wildcard) |
string | json, document |
number | string |
boolean | string, number |
json | string, document |
list | json |
dataframe | json, list |
embedding | list |
event | json |
Direct type matches always succeed. The any type is a universal wildcard — it connects to everything.
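A minimal sketch of this check follows. It assumes the matrix is directional (source on the left, target on the right), that any matches on either side, and that conversions do not chain. IMPLICIT_CONVERSIONS, types_compatible and connectors_compatible are hypothetical names, not the dagy.nodes.connectors API.

```python
# Implicit conversions from the matrix above: source type -> target types it may feed.
IMPLICIT_CONVERSIONS: dict[str, frozenset[str]] = {
    "string": frozenset({"json", "document"}),
    "number": frozenset({"string"}),
    "boolean": frozenset({"string", "number"}),
    "json": frozenset({"string", "document"}),
    "list": frozenset({"json"}),
    "dataframe": frozenset({"json", "list"}),
    "embedding": frozenset({"list"}),
    "event": frozenset({"json"}),
}


def types_compatible(source: str, target: str) -> bool:
    if source == target:
        return True  # direct matches always succeed
    if source == "any" or target == "any":
        return True  # assumption: the wildcard applies on either side
    # Assumption: conversions are not chained (boolean -> string -> json is rejected).
    return target in IMPLICIT_CONVERSIONS.get(source, frozenset())


def connectors_compatible(source_types: set[str], target_types: set[str]) -> bool:
    # At least one produced type must be compatible with at least one accepted type.
    return any(types_compatible(s, t) for s in source_types for t in target_types)
```

For example, a dataframe output can feed a connector that accepts list, but an embedding output cannot feed one that accepts only json.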
Connection Cardinality
| Cardinality | Allowed connections |
|---|---|
one | Exactly 1 |
zero_or_one | 0 or 1 |
zero_or_many | Unlimited |
one_or_many | At least 1 |
Connection Validation
Connections are validated at three levels:
- Direction: Source must be outbound, target must be inbound
- Type compatibility: At least one source type must be compatible with at least one target type
- Cardinality: Neither connector may exceed its maximum connection count
```python
from dagy.nodes.connectors import validate_connection

errors = validate_connection(
    source_connector=src_node.get_connector("data_out"),
    target_connector=tgt_node.get_connector("data_in"),
    existing_source_connections=1,
    existing_target_connections=0,
)
# errors is a list[str]; an empty list means the connection is valid
```
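To illustrate the three checks in isolation, here is a self-contained sketch. It is not dagy's actual validate_connection. Direction, Cardinality and Connector are simplified stand-ins. The sketch assumes the existing_* counts exclude the connection being added, and that the lower bound of one / one_or_many is enforced by DAG-level required checks rather than per edge.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Optional


class Direction(Enum):
    INBOUND = "inbound"
    OUTBOUND = "outbound"


class Cardinality(Enum):
    ONE = "one"
    ZERO_OR_ONE = "zero_or_one"
    ZERO_OR_MANY = "zero_or_many"
    ONE_OR_MANY = "one_or_many"


# Upper bound per cardinality; None means unlimited.
MAX_CONNECTIONS: dict[Cardinality, Optional[int]] = {
    Cardinality.ONE: 1,
    Cardinality.ZERO_OR_ONE: 1,
    Cardinality.ZERO_OR_MANY: None,
    Cardinality.ONE_OR_MANY: None,
}


@dataclass(frozen=True)
class Connector:
    id: str
    direction: Direction
    data_types: frozenset
    cardinality: Cardinality = Cardinality.ZERO_OR_MANY


def exact_or_any(source: str, target: str) -> bool:
    # Minimal compatibility rule: direct match or the "any" wildcard.
    return source == target or "any" in (source, target)


def validate_connection_sketch(
    source: Connector,
    target: Connector,
    existing_source_connections: int,
    existing_target_connections: int,
    compatible: Callable[[str, str], bool] = exact_or_any,
) -> list[str]:
    errors: list[str] = []
    # 1. Direction
    if source.direction is not Direction.OUTBOUND:
        errors.append(f"source connector '{source.id}' must be outbound")
    if target.direction is not Direction.INBOUND:
        errors.append(f"target connector '{target.id}' must be inbound")
    # 2. Type compatibility: any source type against any target type
    if not any(compatible(s, t) for s in source.data_types for t in target.data_types):
        errors.append(f"no compatible data types between '{source.id}' and '{target.id}'")
    # 3. Cardinality: adding this edge must not exceed either connector's maximum
    for connector, existing in ((source, existing_source_connections),
                                (target, existing_target_connections)):
        limit = MAX_CONNECTIONS[connector.cardinality]
        if limit is not None and existing + 1 > limit:
            errors.append(f"connector '{connector.id}' allows at most {limit} connection(s)")
    return errors
```

The compatible parameter lets you plug in a fuller rule, such as the implicit-conversion matrix, in place of the exact-or-any default.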
Frontend Connector Rendering
Each connector renders as a React Flow Handle positioned dynamically based on position_hint:
- "top", "top-left", "top-right" → inbound handles along the top edge
- "bottom", "bottom-left", "bottom-right" → outbound handles along the bottom edge
Hovering a connector shows a tooltip with its name, description, data types, and required status. Multiple connectors on the same edge are spaced evenly using leftPercent = (index + 1) / (count + 1) * 100.
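The spacing rule can be sketched as follows. This is a Python rendering for illustration only, since the real logic lives in the React component. layout_handles is a hypothetical helper that places handles by direction alone and ignores the -left/-right variants of position_hint.

```python
from typing import Iterable

# Inbound handles sit on the top edge, outbound handles on the bottom edge.
EDGE_FOR_DIRECTION = {"inbound": "top", "outbound": "bottom"}


def left_percent(index: int, count: int) -> float:
    """Horizontal offset of the index-th of `count` handles on one edge."""
    return (index + 1) / (count + 1) * 100


def layout_handles(connectors: Iterable[tuple[str, str]]) -> dict[str, tuple[str, float]]:
    """Map connector id -> (edge, leftPercent), spacing handles evenly per edge."""
    by_edge: dict[str, list[str]] = {"top": [], "bottom": []}
    for connector_id, direction in connectors:
        by_edge[EDGE_FOR_DIRECTION[direction]].append(connector_id)
    layout: dict[str, tuple[str, float]] = {}
    for edge, ids in by_edge.items():
        for index, connector_id in enumerate(ids):
            layout[connector_id] = (edge, left_percent(index, len(ids)))
    return layout
```

A single handle lands at 50%. Two inbound handles land at roughly 33% and 67%, so the gaps to the node's corners match the gap between them.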
4. Extension Pattern: Custom Nodes
Creating a Custom Node (Python)
```python
from dagy.nodes.base import FlowNode
from dagy.nodes.connectors import (
    ConnectionCardinality,
    ConnectorDef,
    ConnectorDirection,
    DataType,
)
from dagy.nodes.execution import ExecutionContext, ExecutionResult
from dagy.nodes.metadata import (
    ConfigFieldType,
    NodeCategory,
    NodeConfigField,
    NodeMetadata,
    SelectOption,  # assumed to be exported alongside NodeConfigField
)


class MyCustomNode(FlowNode):
    """My custom data processing node."""

    @classmethod
    def metadata(cls) -> NodeMetadata:
        return NodeMetadata(
            node_type="my_custom_node",
            label="My Custom Node",
            description="Processes data in a custom way",
            category=NodeCategory.TRANSFORM,
            icon="Wand2",
            color="violet",
            official=False,
            tags=["custom", "transform"],
        )

    @classmethod
    def connectors(cls) -> list[ConnectorDef]:
        # ConnectorDef declares cardinality without a default, so it is passed
        # explicitly; the enum member names are assumed to mirror the values
        # one / zero_or_one / zero_or_many / one_or_many.
        return [
            ConnectorDef(
                id="data_in",
                name="Input Data",
                description="Data to process",
                direction=ConnectorDirection.INBOUND,
                data_types=frozenset({DataType.JSON, DataType.DATAFRAME}),
                cardinality=ConnectionCardinality.ONE,
                required=True,
            ),
            ConnectorDef(
                id="result_out",
                name="Result",
                description="Processed output",
                direction=ConnectorDirection.OUTBOUND,
                data_types=frozenset({DataType.JSON}),
                cardinality=ConnectionCardinality.ZERO_OR_MANY,
            ),
        ]

    @classmethod
    def config_schema(cls) -> list[NodeConfigField]:
        return [
            NodeConfigField(
                name="transform_mode",
                label="Transform Mode",
                field_type=ConfigFieldType.SELECT,
                required=True,
                default="normalize",
                options=[
                    SelectOption("Normalize", "normalize"),
                    SelectOption("Aggregate", "aggregate"),
                ],
            ),
        ]

    async def execute(self, context: ExecutionContext) -> ExecutionResult:
        # Data delivered to the "data_in" connector by upstream nodes
        input_data = context.get_upstream_data("data_in")
        mode = self.config.get("transform_mode", "normalize")
        context.log_info(f"Processing with mode={mode}")
        # Placeholder result; a real node would transform input_data here
        result = {"processed": True, "mode": mode}
        # Outputs are keyed by outbound connector ID
        return ExecutionResult.from_success({"result_out": result})
```
That's it. Because FlowNode uses __init_subclass__, importing this module auto-registers the node. It will appear in the node registry, the API, and the Flow Designer.
Registering via API (for non-Python nodes)
```
POST /nodes/registry
{
  "node_type": "my_api_node",
  "label": "My API Node",
  "description": "Registered via API",
  "category": "custom",
  "icon": "Plug",
  "color": "blue",
  "connectors": [
    {"id": "in", "name": "Input", "direction": "inbound", "data_types": ["any"]},
    {"id": "out", "name": "Output", "direction": "outbound", "data_types": ["json"]}
  ],
  "config_schema": [
    {"name": "api_url", "label": "API URL", "type": "string", "required": true}
  ]
}
```
5. Registration and Discovery Flow
Auto-Registration (Python)
1. Developer creates MyNode(FlowNode) in my_nodes.py
2. Python's __init_subclass__ fires → FlowNode._registered_subclasses["my_node"] = MyNode
3. On app startup: node_registry.sync_from_subclasses() picks up all registered classes
4. API serves them alongside DynamoDB-stored custom nodes
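The mechanism behind steps 2 and 3 can be sketched with a hypothetical FlowNodeSketch base class. Two simplifications are assumed for this example: node_type is read from a class attribute rather than from metadata(), and classes that do not declare their own node_type are skipped.

```python
from typing import ClassVar


class FlowNodeSketch:
    """Hypothetical stand-in for FlowNode, showing only auto-registration."""

    _registered_subclasses: ClassVar[dict[str, type]] = {}
    node_type: ClassVar[str] = ""  # assumption: the real class reads this from metadata()

    def __init_subclass__(cls, **kwargs) -> None:
        super().__init_subclass__(**kwargs)
        # Register only classes that declare their own node_type, so abstract
        # intermediate base classes are not added to the registry.
        if "node_type" in cls.__dict__:
            FlowNodeSketch._registered_subclasses[cls.node_type] = cls


class AbstractTransform(FlowNodeSketch):
    """Intermediate base class: declares no node_type, so it is not registered."""


class MyNode(AbstractTransform):
    node_type = "my_node"


# Step 3 analogue: the registry copies whatever has been registered so far.
registry = dict(FlowNodeSketch._registered_subclasses)
```

Registration happens when the class statement executes, which is why importing a module (or walking a package with discover_package()) is enough to make its nodes discoverable.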
Package Discovery
```python
from dagy.nodes.registry import node_registry

# Discover all nodes in a package:
# walks all submodules and imports them, triggering __init_subclass__
count = node_registry.discover_package("dagy.nodes.builtin")

# Discover built-in nodes specifically:
# imports dagy.nodes.builtin, which imports all builtin modules
count = node_registry.discover_builtin()
```
Registry Singleton
The node_registry singleton provides:
- register(node_cls): register a node class
- get(node_type) → Type[FlowNode] or None
- list_all() → list of definition dicts
- list_by_category(category) → filtered list
- search(query) → full-text search across type, label, description, and tags
- create_instance(node_type, config, instance_id) → instantiated node
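A rough in-memory sketch of these lookups follows. It assumes definitions are plain dicts with node_type, label, description, category and tags keys. NodeRegistrySketch is hypothetical: the real registry stores FlowNode classes, and its search may rank results rather than use the case-insensitive substring match shown here. The node types registered in the usage lines are made up for the example.

```python
from typing import Any, Optional


class NodeRegistrySketch:
    """Hypothetical in-memory registry over definition dicts."""

    def __init__(self) -> None:
        self._definitions: dict[str, dict[str, Any]] = {}

    def register(self, definition: dict[str, Any]) -> None:
        self._definitions[definition["node_type"]] = definition

    def get(self, node_type: str) -> Optional[dict[str, Any]]:
        return self._definitions.get(node_type)

    def list_all(self) -> list[dict[str, Any]]:
        return list(self._definitions.values())

    def list_by_category(self, category: str) -> list[dict[str, Any]]:
        return [d for d in self._definitions.values() if d.get("category") == category]

    def search(self, query: str) -> list[dict[str, Any]]:
        # Case-insensitive substring match against type, label, description and tags.
        needle = query.lower()

        def fields(d: dict[str, Any]) -> list[str]:
            return [d["node_type"], d.get("label", ""), d.get("description", ""), *d.get("tags", [])]

        return [d for d in self._definitions.values() if any(needle in f.lower() for f in fields(d))]


demo = NodeRegistrySketch()
demo.register({"node_type": "openai_embeddings", "label": "OpenAI Embeddings",
               "category": "llm", "tags": ["embedding"]})
demo.register({"node_type": "my_custom_node", "label": "My Custom Node",
               "category": "transform", "tags": ["custom", "transform"]})
```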
6. Execution Model
ExecutionContext
Every node receives an ExecutionContext containing:
- run_id, task_run_id: identifiers for the current execution
- flow_name, flow_version: which flow is running
- node_type, node_instance_id: which node instance
- attempt, max_retries: retry tracking
- parameters: runtime parameters passed to the flow
- secrets: decrypted secrets (accessed via get_secret())
- upstream_results: output from upstream nodes (accessed via get_upstream_data())
- environment, correlation_id, metadata: additional context
ExecutionResult
```python
from dataclasses import dataclass
from typing import Any

# NodeState is defined in the same module (execution.py).


@dataclass
class ExecutionResult:
    success: bool
    outputs: dict[str, Any]  # Keyed by connector ID
    error: str | None
    error_type: str | None
    state: NodeState
    elapsed_seconds: float
    logs: list[str]
    metrics: dict[str, float]
    metadata: dict[str, Any]

    @classmethod
    def from_success(cls, outputs: dict) -> "ExecutionResult": ...

    @classmethod
    def from_error(cls, error: str, error_type: str = "ExecutionError") -> "ExecutionResult": ...
```
Outputs are keyed by connector ID (e.g., {"data_out": [...], "metadata_out": {...}}), enabling the execution engine to route data to the correct downstream connectors.
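Routing by connector ID can be sketched as shown below. Edge and route_outputs are hypothetical. The sketch collects every delivered value into a list per target connector, so zero_or_many inputs can receive several upstream results. That is an assumption about how the engine feeds get_upstream_data().

```python
from collections import defaultdict
from typing import Any, NamedTuple


class Edge(NamedTuple):
    source_node: str
    source_connector: str
    target_node: str
    target_connector: str


def route_outputs(
    node_outputs: dict[str, dict[str, Any]], edges: list[Edge]
) -> dict[str, dict[str, list[Any]]]:
    """Build each target node's inputs, keyed by its inbound connector ID."""
    inbox: dict[str, dict[str, list[Any]]] = defaultdict(lambda: defaultdict(list))
    for edge in edges:
        outputs = node_outputs.get(edge.source_node, {})
        if edge.source_connector in outputs:  # connectors that produced nothing are not routed
            inbox[edge.target_node][edge.target_connector].append(outputs[edge.source_connector])
    return {node: dict(connectors) for node, connectors in inbox.items()}
```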
Error Handling and Retries
The execution engine handles retries externally based on the node's retry configuration. The lifecycle:
```
execute() fails → on_error() called
    → if it returns a result, use it
    → if it returns None, mark FAILED
        → if attempt < max_retries, mark RETRYING → re-execute
```
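The retry flow can be sketched as an async loop. run_with_retries is hypothetical and rests on three assumptions: attempt is a 0-based count of retries already made (so max_retries=3 allows up to four executions), a non-None on_error() result counts as COMPLETED, and the retry delay is a simple fixed sleep.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


async def run_with_retries(
    execute: Callable[[int], Awaitable[Any]],
    on_error: Callable[[Exception], Awaitable[Optional[Any]]],
    max_retries: int,
    retry_delay_seconds: float = 0.0,
) -> tuple[str, Optional[Any], list[str]]:
    """Return (final_state, result, state_history) for one node."""
    history: list[str] = []
    attempt = 0  # assumption: 0-based count of retries already made
    while True:
        history.append("RUNNING")
        try:
            result = await execute(attempt)
        except Exception as exc:
            recovery = await on_error(exc)
            if recovery is not None:
                # on_error() returned a result: use it instead of failing
                history.append("COMPLETED")
                return "COMPLETED", recovery, history
            history.append("FAILED")
            if attempt >= max_retries:
                return "FAILED", None, history
            attempt += 1
            history.append("RETRYING")
            await asyncio.sleep(retry_delay_seconds)
        else:
            history.append("COMPLETED")
            return "COMPLETED", result, history
```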
7. API Contracts
Node Registry Endpoints
| Method | Path | Description |
|---|---|---|
GET | /nodes/registry | List all nodes (builtin + custom for org) |
GET | /nodes/registry/builtin | List built-in nodes only |
GET | /nodes/registry/custom | List org's custom nodes |
GET | /nodes/registry/categories | List categories with counts |
GET | /nodes/registry/search?q=... | Search nodes by query |
GET | /nodes/registry/{node_type} | Get single node definition |
POST | /nodes/registry | Register a custom node |
PUT | /nodes/registry/{node_type} | Update a custom node |
DELETE | /nodes/registry/{node_type} | Delete a custom node |
POST | /nodes/validate-connection | Validate a connection |
Request/Response Examples
List nodes:
```
GET /nodes/registry?org_id=my_org
→ { "items": [...], "total": 27, "builtin_count": 24, "custom_count": 3 }
```
Register custom node:
```
POST /nodes/registry
Content-Type: application/json
X-Org-Id: my_org

{
  "node_type": "custom_enricher",
  "label": "Data Enricher",
  "description": "Enriches records with external data",
  "category": "transform",
  "icon": "Sparkles",
  "color": "violet",
  "connectors": [...],
  "config_schema": [...]
}
→ { "node_type": "custom_enricher", "label": "Data Enricher", ... }
```
Validate connection:
```
POST /nodes/validate-connection
{
  "source_node_type": "s3_ingest",
  "source_connector_id": "data_out",
  "target_node_type": "schema_validator",
  "target_connector_id": "data_in",
  "existing_source_connections": 0,
  "existing_target_connections": 0
}
→ { "valid": true, "errors": [] }
```
Frontend Registry Client
The node-registry-client.ts provides a cached client:
```typescript
import { fetchNodeDefinitions, searchNodeDefinitions } from "./node-registry-client";

// Fetches from backend with 60s cache, falls back to static definitions
const nodes = await fetchNodeDefinitions(orgId, authHeaders);

// Search with full-text matching
const results = await searchNodeDefinitions("embedding", orgId);
```
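For reference, the caching and fallback behaviour described in that comment can be sketched in Python, the document's backend language. This is not the TypeScript client's code. CachedNodeDefinitions and STATIC_DEFINITIONS are hypothetical, and the sketch assumes a per-org cache and that a failed fetch returns the static catalog without caching it.

```python
import time
from typing import Any, Callable

# Hypothetical static catalog used when the backend is unreachable.
STATIC_DEFINITIONS: list[dict[str, Any]] = [{"node_type": "task", "label": "Task"}]


class CachedNodeDefinitions:
    """Per-org cache of node definitions with a time-to-live and a static fallback."""

    def __init__(
        self,
        fetch: Callable[[str], list[dict[str, Any]]],
        ttl_seconds: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch = fetch
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so tests can control time
        self._cache: dict[str, tuple[float, list[dict[str, Any]]]] = {}

    def get(self, org_id: str) -> list[dict[str, Any]]:
        now = self._clock()
        cached = self._cache.get(org_id)
        if cached is not None and now - cached[0] < self._ttl:
            return cached[1]  # fresh cache hit: no backend call
        try:
            definitions = self._fetch(org_id)
        except Exception:
            return STATIC_DEFINITIONS  # backend failed: serve the static catalog
        self._cache[org_id] = (now, definitions)
        return definitions
```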
8. Persistence Design
DynamoDB Tables
DAGY_NODE_DEFINITIONS — Stores custom node definitions.
| Attribute | Type | Key / Notes |
|---|---|---|
node_type | String | Partition Key |
org_id | String | GSI partition |
category | String | GSI partition |
label | String | — |
description | String | — |
icon | String | — |
color | String | — |
version | String | — |
author | String | — |
official | Boolean | — |
tags | List | — |
connectors_json | String | JSON-serialized connectors |
config_schema_json | String | JSON-serialized config schema |
import_path | String | Python import path |
enabled | Boolean | — |
deprecated | Boolean | — |
created_at | String | ISO timestamp |
updated_at | String | ISO timestamp |
GSIs:
- gsi_org_node_defs: query by org_id, sorted by updated_at
- gsi_category_node_defs: query by category, sorted by category_sort
- gsi_org_category_node_defs: query by org_id, sorted by category_sort
DAGY_NODE_VERSIONS — Stores version history for custom nodes.
| Attribute | Type | Key / Notes |
|---|---|---|
node_type | String | Partition Key |
version | String | Sort Key |
org_id | String | — |
definition_snapshot | String | Full JSON snapshot |
change_summary | String | — |
created_by | String | — |
created_at | String | ISO timestamp |
PynamoDB Models
```python
import os

from pynamodb.attributes import UnicodeAttribute
from pynamodb.models import Model


class NodeDefinitionModel(Model):
    class Meta:
        table_name = os.environ.get("DAGY_NODE_DEFINITIONS_TABLE", "DAGY_NODE_DEFINITIONS")
        region = os.environ.get("AWS_REGION", "us-east-1")

    node_type = UnicodeAttribute(hash_key=True)
    org_id = UnicodeAttribute()
    # ... (see src/dagy_api/nodes/models.py for full definition)
```
9. Frontend Node Model
Simplified Task-Only Design
The frontend Flow Builder presents a single universal Task node type for maximum flexibility and simplicity. All flow logic is expressed through generic Task nodes with configurable code execution:
| Field | Type | Purpose |
|---|---|---|
code | textarea | JavaScript or Python code to execute in the task |
importPath | string | Optional module/import path for dependencies |
retries | integer | Number of retry attempts on failure (0–10) |
retryDelaySeconds | integer | Delay between retry attempts (seconds) |
timeoutSeconds | integer | Task execution timeout (seconds) |
concurrencyLimit | integer | Max parallel executions (0 = unlimited) |
Connectors: All Task nodes have two default connectors:
- Input (inbound, top) — Accepts any data type, unlimited connections
- Output (outbound, bottom) — Produces any data type, unlimited connections
Node Definition Catalog (node-definitions.ts): Contains a single Task entry (about 110 lines). The getNodeTypeByType() function falls back to Task for any unknown type, ensuring forward compatibility.
Backend Node Framework (Reference)
The backend Python SDK (src/dagy/nodes/) supports custom node types organized by category. These are available for direct Python development and API registration, but are not exposed in the frontend UI:
- Ingestion — S3, Postgres, HTTP, Kafka, BigQuery readers
- Transform — Schema validation, data normalization, pandas, SQL, deduplication
- LLM — OpenAI embeddings, GPT-4 classification, Claude summarization, LLM routing
- Vector DB — Pinecone, Weaviate, Chroma integrations
- Notifications — Slack alerts, email, webhooks, S3 writes
- Custom Code — Python functions, shell commands, Docker containers
- Control Flow — Conditional branches, merge, delay/wait
These 28 node types remain available in the backend for:
- Direct Python SDK usage (developers can subclass them)
- API-based registration and discovery
- Backend execution and flow validation
However, they are not rendered in the frontend UI. Instead, developers configure Task nodes with the appropriate code logic to replicate their functionality.
10. Frontend Architecture
Component Hierarchy
```
FlowBuilder
├── FlowConfigPanel (flow-level: name, version, executor, environment)
├── Toolbar (undo/redo, save, deploy, zoom)
├── NodeLibraryPanel (single "Task" entry, drag source)
├── ReactFlow Canvas
│   ├── FlowNodeComponent (per node)
│   │   ├── ConnectorHandle[] (dynamic inbound/outbound handles)
│   │   └── ConnectorTooltip (hover info)
│   ├── MiniMap
│   └── Controls
├── NodeConfigPanel (selected Task node configuration)
│   ├── Config tab (retries, timeout, concurrency, importPath)
│   └── Code tab (code editor for task logic)
├── ValidationPanel (error/warning list)
└── DeployDialog (flow name, version, environment)
```
Key Modules
| Module | Purpose |
|---|---|
node-definitions.ts | Static Task node definition (~110 lines); getNodeTypeByType() falls back to Task for unknown types |
node-registry-client.ts | Backend API client with caching (optional, for custom nodes if needed) |
connection-validator.ts | Frontend connection validation logic |
flow-node.tsx | Custom React Flow node with dynamic connectors |
serializer.ts | Canvas ↔ FlowSpec bidirectional conversion; guessNodeType() always returns Task; flowSpecToCanvas() maps all tasks to generic Task visual |
validation.ts | DAG validation (cycles, orphans, required fields, connectors) |
types.ts | TypeScript type definitions |
flow-config-panel.tsx | Flow-level configuration (name, version, executor, environment) |
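The cycle and orphan checks attributed to validation.ts can be sketched with Kahn's topological sort. validate_dag is a hypothetical Python rendering, not the frontend module. It assumes an orphan is a node with no edges in a flow of more than one node, and it omits the required-field and connector checks.

```python
from collections import defaultdict, deque


def validate_dag(nodes: list[str], edges: list[tuple[str, str]]) -> list[str]:
    """Return validation errors: unknown endpoints, cycles, and orphaned nodes."""
    errors: list[str] = []
    indegree = {node: 0 for node in nodes}
    adjacency: dict[str, list[str]] = defaultdict(list)
    connected: set[str] = set()
    for src, dst in edges:
        if src not in indegree or dst not in indegree:
            errors.append(f"edge {src} -> {dst} references an unknown node")
            continue
        adjacency[src].append(dst)
        indegree[dst] += 1
        connected.update((src, dst))

    # Kahn's algorithm: repeatedly remove nodes with no remaining inbound edges.
    # Any node never removed lies on a cycle or downstream of one.
    queue = deque(node for node in nodes if indegree[node] == 0)
    removed = 0
    while queue:
        node = queue.popleft()
        removed += 1
        for nxt in adjacency[node]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                queue.append(nxt)
    if removed < len(nodes):
        errors.append("flow contains a cycle")

    # Orphan check (assumed definition): in a multi-node flow, a node with no edges at all.
    if len(nodes) > 1:
        errors.extend(f"node '{node}' is orphaned" for node in nodes if node not in connected)
    return errors
```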
Connection Validation Flow (Frontend)
```
User drags edge from source handle to target handle
  → onConnect fires in flow-builder.tsx
  → validatePendingConnection() called
  → Finds source/target connectors by handle ID
  → Checks direction (outbound → inbound)
  → Checks data type compatibility
  → Checks cardinality limits
  → If valid: edge is created with connector metadata
  → If invalid: edge is rejected, console warning logged
```
Frontend Serialization (Task-Only Model)
The serializer.ts module handles conversion between the visual canvas and flow specifications:
guessNodeType(): Always returns the Task node type, regardless of input. This ensures all nodes—whether loaded from a backend spec or newly created—are represented as Task nodes in the frontend.
flowSpecToCanvas(): Maps all loaded tasks from a flow specification to the generic Task visual representation. Each node is instantiated with:
- Default Input and Output connectors (any type, zero_or_many cardinality)
- Config fields populated from the task's metadata (code, importPath, retries, etc.)
- Visual positioning preserved from the specification
Round-trip fidelity: When a user saves a flow, Task node configurations are serialized back to the flow spec, preserving all runtime parameters. The backend then executes these Task nodes using its extensible execution framework.
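A minimal sketch of the Task-only mapping and the round trip follows. The flow spec shape (a tasks list whose entries carry name, position and config keys at the top level) and the function names are assumptions made for illustration. In this sketch, only the config keys listed in TASK_CONFIG_FIELDS survive the round trip.

```python
from typing import Any

# Config keys of the frontend Task node (from the table in section 9).
TASK_CONFIG_FIELDS = ("code", "importPath", "retries", "retryDelaySeconds",
                      "timeoutSeconds", "concurrencyLimit")

# Default Task connectors: any data type, zero_or_many cardinality.
DEFAULT_CONNECTORS = (
    {"id": "input", "direction": "inbound", "position_hint": "top",
     "data_types": ["any"], "cardinality": "zero_or_many"},
    {"id": "output", "direction": "outbound", "position_hint": "bottom",
     "data_types": ["any"], "cardinality": "zero_or_many"},
)


def guess_node_type(task: dict[str, Any]) -> str:
    return "task"  # every task maps to the generic Task node, whatever its contents


def flow_spec_to_canvas(spec: dict[str, Any]) -> list[dict[str, Any]]:
    return [
        {
            "id": task["name"],
            "type": guess_node_type(task),
            "position": task.get("position", {"x": 0, "y": 0}),
            "connectors": [dict(c) for c in DEFAULT_CONNECTORS],
            "config": {k: task[k] for k in TASK_CONFIG_FIELDS if k in task},
        }
        for task in spec.get("tasks", [])
    ]


def canvas_to_flow_spec(nodes: list[dict[str, Any]]) -> dict[str, Any]:
    return {"tasks": [{"name": n["id"], "position": n["position"], **n["config"]} for n in nodes]}
```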
11. File Structure
```
src/dagy/nodes/
├── __init__.py # Public exports
├── base.py # FlowNode abstract base class
├── connectors.py # ConnectorDef, DataType, validation
├── metadata.py # NodeMetadata, NodeCategory, NodeConfigField
├── execution.py # ExecutionContext, ExecutionResult, NodeState
├── exceptions.py # Exception hierarchy
├── registry.py # NodeRegistry singleton
└── builtin/
    ├── __init__.py # Imports all modules to trigger registration
    ├── ingestion.py # S3, Postgres, HTTP, Kafka, BigQuery
    ├── transform.py # Validator, Normalizer, Pandas, SQL, Dedup
    ├── llm.py # OpenAI, GPT-4, Claude, Router
    ├── vectordb.py # Pinecone, Weaviate, Chroma
    ├── notification.py # Slack, Email, Webhook, S3 Write
    └── control_flow.py # Branch, Merge, Wait, Python, Shell, Docker

src/dagy_api/nodes/
├── __init__.py
├── models.py # PynamoDB DynamoDB models
└── router.py # FastAPI router with all endpoints

web/src/components/flow-builder/
├── index.ts # Public exports
├── types.ts # TypeScript type definitions
├── node-definitions.ts # Single Task node definition (~110 lines)
├── node-registry-client.ts # Backend API client (optional for custom nodes)
├── connection-validator.ts # Frontend connection validation
├── flow-node.tsx # React Flow custom node component
├── flow-builder.tsx # Main canvas component
├── node-panel.tsx # Node sidebar (single Task entry)
├── node-config-panel.tsx # Task node configuration (Config + Code tabs)
├── flow-config-panel.tsx # Flow-level configuration (name, version, executor, environment)
├── serializer.ts # Canvas ↔ FlowSpec conversion (guessNodeType → Task)
├── validation.ts # DAG validation
├── validation-panel.tsx # Validation error display
├── deploy-dialog.tsx # Deployment dialog
└── toolbar.tsx # Canvas toolbar

infrastructure/
└── dagy_stack.py # CDK: DAGY_NODE_DEFINITIONS, DAGY_NODE_VERSIONS tables
```