Dagy Flow Node Architecture

---

Comprehensive reference for the flow node framework — design, lifecycle, connectors, extensibility, APIs, and persistence.


1. Framework Design Overview

The Dagy Flow Node Architecture provides a unified, type-safe framework for defining, registering, executing, and composing workflow nodes. It spans three layers:

Python SDK (src/dagy/nodes/) — The core framework where nodes are defined as Python classes extending FlowNode. Each node declares its metadata, typed connectors, configuration schema, and async execution logic.

FastAPI Backend (src/dagy_api/nodes/) — REST API for node registration, discovery, and connection validation. Custom nodes are persisted in DynamoDB and served alongside built-in nodes.

Next.js Frontend (web/src/components/flow-builder/) — The visual Flow Designer where users drag-drop generic Task nodes, configure code execution, and connect them via typed connectors with real-time validation. Task nodes are universally applicable and execute arbitrary code; the backend framework supports custom node types but the frontend presents only the Task abstraction.

Design Principles

  • Convention over configuration: Nodes auto-register via __init_subclass__ — just subclass FlowNode and your node is discoverable.
  • Type safety end-to-end: Connector data types are enforced at connection time (both frontend and backend), preventing invalid pipelines.
  • Backward compatibility: Legacy node definitions without connectors are automatically wrapped with default input/output connectors.
  • Separation of concerns: Node definition (metadata + connectors + schema) is fully separate from execution logic.

2. Node Lifecycle

A flow node progresses through a well-defined lifecycle during execution:

PENDING → QUEUED → RUNNING → COMPLETED
                          ↘ FAILED → RETRYING → RUNNING
                          ↘ TIMED_OUT
                          ↘ CANCELLED
                          ↘ SKIPPED

Lifecycle Hooks

The FlowNode.run() method orchestrates the full lifecycle:

async def run(self, context: ExecutionContext) -> ExecutionResult:
    # 1. Validate configuration
    errors = await self.validate_config()
    if errors:
        return ExecutionResult.from_error(...)

    # 2. Pre-execution hook (setup, resource allocation)
    await self.pre_execute(context)

    # 3. Execute the node's core logic
    result = await self.execute(context)

    # 4. Post-execution hook (cleanup, metric recording)
    result = await self.post_execute(context, result)

    return result

If an exception occurs during execution, the on_error() hook is called, which can return a recovery ExecutionResult or None to propagate the error.
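
The run() listing above does not show the error path. A minimal, self-contained sketch of how it might fit in is given here. It assumes on_error() receives the context and the exception (the signature is not given in this document); SketchNode and the dict-based results are stand-ins for illustration, not the real FlowNode or ExecutionResult.

```python
import asyncio
from typing import Any, Optional


class SketchNode:
    """Stand-in for FlowNode; only the hook names come from this document."""

    async def validate_config(self) -> list[str]:
        return []  # no configuration errors in this sketch

    async def pre_execute(self, context: dict) -> None:
        pass

    async def execute(self, context: dict) -> dict[str, Any]:
        raise ValueError("upstream payload was empty")

    async def post_execute(self, context: dict, result: dict) -> dict:
        return result

    async def on_error(self, context: dict, exc: Exception) -> Optional[dict]:
        # Assumed signature. Return a result to recover, or None to propagate.
        return {"success": False, "error": str(exc), "error_type": type(exc).__name__}

    async def run(self, context: dict) -> dict[str, Any]:
        errors = await self.validate_config()
        if errors:
            return {"success": False, "error": "; ".join(errors), "error_type": "ConfigError"}
        try:
            await self.pre_execute(context)
            result = await self.execute(context)
            return await self.post_execute(context, result)
        except Exception as exc:
            recovered = await self.on_error(context, exc)
            if recovered is None:
                raise  # no recovery result: propagate the original error
            return recovered


result = asyncio.run(SketchNode().run({"run_id": "r-1"}))
```

Here execute() always raises, so run() returns the recovery result built by on_error() instead of letting the exception escape.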

State Transitions

State       Description
PENDING     Node is defined but not yet scheduled
QUEUED      Node is waiting for upstream dependencies
RUNNING     Node is actively executing
COMPLETED   Execution finished successfully
FAILED      Execution failed (may trigger retry)
RETRYING    Failed execution is being retried
TIMED_OUT   Execution exceeded the timeout
CANCELLED   Execution was cancelled externally
SKIPPED     Node was skipped (conditional branch)
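
The lifecycle diagram can be read as a small transition table. The sketch below takes the diagram literally, with every branch leaving RUNNING; that reading is an assumption, and the engine may allow other edges (for example skipping a node before it runs). The NodeState enum here is a local stand-in for the one in execution.py, and its string values are illustrative.

```python
from enum import Enum


class NodeState(str, Enum):
    """Local stand-in for the framework's NodeState."""
    PENDING = "PENDING"
    QUEUED = "QUEUED"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    RETRYING = "RETRYING"
    TIMED_OUT = "TIMED_OUT"
    CANCELLED = "CANCELLED"
    SKIPPED = "SKIPPED"


# Edges copied from the diagram; states without an entry are terminal.
ALLOWED_TRANSITIONS: dict[NodeState, frozenset[NodeState]] = {
    NodeState.PENDING: frozenset({NodeState.QUEUED}),
    NodeState.QUEUED: frozenset({NodeState.RUNNING}),
    NodeState.RUNNING: frozenset({
        NodeState.COMPLETED,
        NodeState.FAILED,
        NodeState.TIMED_OUT,
        NodeState.CANCELLED,
        NodeState.SKIPPED,
    }),
    NodeState.FAILED: frozenset({NodeState.RETRYING}),
    NodeState.RETRYING: frozenset({NodeState.RUNNING}),
}


def can_transition(current: NodeState, target: NodeState) -> bool:
    """Return True if the diagram contains an edge current -> target."""
    return target in ALLOWED_TRANSITIONS.get(current, frozenset())
```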

3. Connector Model

Connectors are the typed ports through which nodes exchange data. Every node declares its connectors via the connectors() class method.

Connector Definition

@dataclass(frozen=True)
class ConnectorDef:
    id: str                          # Unique within the node (e.g., "data_in", "output")
    name: str                        # Human-readable name
    description: str                 # Tooltip text
    direction: ConnectorDirection    # INBOUND or OUTBOUND
    data_types: frozenset[DataType]  # Accepted/produced types
    cardinality: ConnectionCardinality  # How many connections allowed
    required: bool = False           # Must be connected for valid DAG
    position_hint: str | None = None # UI positioning ("top", "bottom-left", etc.)
    metadata: dict = field(default_factory=dict)

Frontend Task Nodes: All Task nodes in the frontend are created with default connectors:

  • Input (id: "input", inbound, top) — Accepts any data type, zero_or_many cardinality
  • Output (id: "output", outbound, bottom) — Produces any data type, zero_or_many cardinality

Data Types

The framework defines 15 data types with an implicit compatibility matrix:

Type        Can implicitly connect to
any         Everything (wildcard)
string      json, document
number      string
boolean     string, number
json        string, document
list        json
dataframe   json, list
embedding   list
event       json

Direct type matches always succeed. The any type is a universal wildcard — it connects to everything.
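
The table can be expressed as a lookup from source type to accepted target types. This is a sketch under two assumptions that the text does not spell out: any matches on either side of a connection, and conversions are not transitive (number connects to string, but not onward to json). The names IMPLICIT_TARGETS and is_compatible are illustrative, not part of the SDK.

```python
# Implicit conversions copied from the table above: source type -> target types.
IMPLICIT_TARGETS: dict[str, frozenset[str]] = {
    "string": frozenset({"json", "document"}),
    "number": frozenset({"string"}),
    "boolean": frozenset({"string", "number"}),
    "json": frozenset({"string", "document"}),
    "list": frozenset({"json"}),
    "dataframe": frozenset({"json", "list"}),
    "embedding": frozenset({"list"}),
    "event": frozenset({"json"}),
}


def is_compatible(source: str, target: str) -> bool:
    """Return True if data of type `source` may flow into a `target` connector."""
    # Direct matches always succeed; "any" is treated as a wildcard on both sides.
    if source == target or "any" in (source, target):
        return True
    return target in IMPLICIT_TARGETS.get(source, frozenset())
```

Note that the relation is directional: a dataframe output can feed a list input, but a list output cannot feed a dataframe input.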

Connection Cardinality

Cardinality    Allowed connections
one            Exactly 1
zero_or_one    0 or 1
zero_or_many   0 or more (unlimited)
one_or_many    1 or more (unlimited)
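
Each cardinality amounts to a (minimum, maximum) pair. The sketch below separates two questions that come up at different times: whether one more edge may be attached while editing, and whether the finished graph satisfies the lower bound. The helper names are hypothetical.

```python
from typing import Optional

# (minimum, maximum) connections per cardinality; None means no upper bound.
CARDINALITY_BOUNDS: dict[str, tuple[int, Optional[int]]] = {
    "one": (1, 1),
    "zero_or_one": (0, 1),
    "zero_or_many": (0, None),
    "one_or_many": (1, None),
}


def can_add_connection(cardinality: str, existing: int) -> bool:
    """True if attaching one more connection stays within the upper bound."""
    _, maximum = CARDINALITY_BOUNDS[cardinality]
    return maximum is None or existing < maximum


def is_satisfied(cardinality: str, existing: int) -> bool:
    """True if the current connection count is within both bounds."""
    minimum, maximum = CARDINALITY_BOUNDS[cardinality]
    return existing >= minimum and (maximum is None or existing <= maximum)
```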

Connection Validation

Connections are validated at three levels:

  1. Direction: Source must be outbound, target must be inbound
  2. Type compatibility: At least one source type must be compatible with at least one target type
  3. Cardinality: Neither connector may exceed its maximum connection count

from dagy.nodes.connectors import validate_connection

errors = validate_connection(
    source_connector=src_node.get_connector("data_out"),
    target_connector=tgt_node.get_connector("data_in"),
    existing_source_connections=1,
    existing_target_connections=0,
)
# errors: List[str] — empty means valid

Frontend Connector Rendering

Each connector renders as a React Flow Handle positioned dynamically based on position_hint:

  • "top", "top-left", "top-right" → Inbound handles along the top edge
  • "bottom", "bottom-left", "bottom-right" → Outbound handles along the bottom edge

Hovering a connector shows a tooltip with its name, description, data types, and required status. Multiple connectors on the same edge are spaced evenly using leftPercent = (index + 1) / (count + 1) * 100.
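
The spacing formula is easy to check by hand. The same arithmetic in Python, for illustration only (the real implementation lives in the frontend, and handle_positions is a hypothetical name):

```python
def handle_positions(count: int) -> list[float]:
    """Left offsets, in percent, for `count` handles spaced evenly along one edge."""
    return [(index + 1) / (count + 1) * 100 for index in range(count)]


single = handle_positions(1)  # one handle sits at the midpoint
triple = handle_positions(3)  # three handles sit at the quarter points
```

Dividing by count + 1 rather than count leaves equal margins at both ends of the edge, so no handle lands on a corner.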


4. Extension Pattern: Custom Nodes

Creating a Custom Node (Python)

from dagy.nodes.base import FlowNode
from dagy.nodes.connectors import ConnectorDef, ConnectorDirection, DataType
from dagy.nodes.metadata import (
    NodeMetadata, NodeCategory, NodeConfigField, ConfigFieldType,
    SelectOption,  # used in config_schema() below; assumed to be exported from this module
)
from dagy.nodes.execution import ExecutionContext, ExecutionResult

class MyCustomNode(FlowNode):
    """My custom data processing node."""

    @classmethod
    def metadata(cls) -> NodeMetadata:
        return NodeMetadata(
            node_type="my_custom_node",
            label="My Custom Node",
            description="Processes data in a custom way",
            category=NodeCategory.TRANSFORM,
            icon="Wand2",
            color="violet",
            official=False,
            tags=["custom", "transform"],
        )

    @classmethod
    def connectors(cls) -> list[ConnectorDef]:
        return [
            ConnectorDef(
                id="data_in",
                name="Input Data",
                description="Data to process",
                direction=ConnectorDirection.INBOUND,
                data_types=frozenset({DataType.JSON, DataType.DATAFRAME}),
                required=True,
            ),
            ConnectorDef(
                id="result_out",
                name="Result",
                description="Processed output",
                direction=ConnectorDirection.OUTBOUND,
                data_types=frozenset({DataType.JSON}),
            ),
        ]

    @classmethod
    def config_schema(cls) -> list[NodeConfigField]:
        return [
            NodeConfigField(
                name="transform_mode",
                label="Transform Mode",
                field_type=ConfigFieldType.SELECT,
                required=True,
                default="normalize",
                options=[
                    SelectOption("Normalize", "normalize"),
                    SelectOption("Aggregate", "aggregate"),
                ],
            ),
        ]

    async def execute(self, context: ExecutionContext) -> ExecutionResult:
        input_data = context.get_upstream_data("data_in")
        mode = self.config.get("transform_mode", "normalize")

        context.log_info(f"Processing with mode={mode}")
        result = {"processed": True, "mode": mode}

        return ExecutionResult.from_success({"result_out": result})

That's it. Because FlowNode uses __init_subclass__, importing this module auto-registers the node. It will appear in the node registry, the API, and the Flow Designer.

Registering via API (for non-Python nodes)

POST /nodes/registry
{
  "node_type": "my_api_node",
  "label": "My API Node",
  "description": "Registered via API",
  "category": "custom",
  "icon": "Plug",
  "color": "blue",
  "connectors": [
    {"id": "in", "name": "Input", "direction": "inbound", "data_types": ["any"]},
    {"id": "out", "name": "Output", "direction": "outbound", "data_types": ["json"]}
  ],
  "config_schema": [
    {"name": "api_url", "label": "API URL", "type": "string", "required": true}
  ]
}
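
From Python, the same request could be assembled with the standard library. In this sketch BASE_URL is a placeholder, since the document does not state where the API is served; the X-Org-Id header is taken from the request example in section 7. The request is built but not sent, so the snippet runs offline.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8000"  # hypothetical; substitute your deployment's URL

payload = {
    "node_type": "my_api_node",
    "label": "My API Node",
    "description": "Registered via API",
    "category": "custom",
    "icon": "Plug",
    "color": "blue",
    "connectors": [
        {"id": "in", "name": "Input", "direction": "inbound", "data_types": ["any"]},
        {"id": "out", "name": "Output", "direction": "outbound", "data_types": ["json"]},
    ],
    "config_schema": [
        {"name": "api_url", "label": "API URL", "type": "string", "required": True},
    ],
}


def build_register_request(body: dict, org_id: str) -> urllib.request.Request:
    """Build (but do not send) the POST /nodes/registry request."""
    return urllib.request.Request(
        url=f"{BASE_URL}/nodes/registry",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json", "X-Org-Id": org_id},
        method="POST",
    )


request = build_register_request(payload, org_id="my_org")
# urllib.request.urlopen(request) would send it.
```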

5. Registration and Discovery Flow

Auto-Registration (Python)

1. Developer creates MyNode(FlowNode) in my_nodes.py
2. Python's __init_subclass__ fires → FlowNode._registered_subclasses["my_node"] = MyNode
3. On app startup: node_registry.sync_from_subclasses() picks up all registered classes
4. API serves them alongside DynamoDB-stored custom nodes
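
Step 2 relies on a standard Python feature: __init_subclass__ runs on the base class each time a subclass is defined. A stripped-down sketch of the mechanism follows. FlowNodeSketch is a stand-in, and the node_type class attribute is a simplification; the real FlowNode presumably derives the key from metadata().

```python
from typing import ClassVar


class FlowNodeSketch:
    """Stand-in for FlowNode showing only the registration mechanism."""

    _registered_subclasses: ClassVar[dict[str, type]] = {}
    node_type: ClassVar[str] = ""  # hypothetical attribute used as the registry key

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # The class body has already run, so cls.node_type is set by now.
        if cls.node_type:  # skip intermediates that declare no type
            FlowNodeSketch._registered_subclasses[cls.node_type] = cls


class MyNode(FlowNodeSketch):
    node_type = "my_node"


registered = FlowNodeSketch._registered_subclasses
```

No explicit register call appears anywhere: defining MyNode is enough, which is why importing a module is all that discovery has to do.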

Package Discovery

from dagy.nodes.registry import node_registry

# Discover all nodes in a package
count = node_registry.discover_package("dagy.nodes.builtin")
# Walks all submodules, imports them, triggering __init_subclass__

# Discover built-in nodes specifically
count = node_registry.discover_builtin()
# Imports dagy.nodes.builtin which imports all builtin modules
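
Walking a package and importing each submodule can be done with the standard library alone. The sketch below uses pkgutil.walk_packages and importlib.import_module; whether the real discover_package() is built this way, and whether its return value counts modules or nodes, is not stated here. The demo walks the standard-library json package so that it runs anywhere.

```python
import importlib
import pkgutil


def discover_package(package_name: str) -> int:
    """Import every submodule of a package; return how many modules were imported."""
    package = importlib.import_module(package_name)
    count = 0
    # walk_packages yields submodules recursively; the prefix makes names absolute.
    for module_info in pkgutil.walk_packages(package.__path__, prefix=f"{package_name}."):
        importlib.import_module(module_info.name)  # triggers any class-definition side effects
        count += 1
    return count


imported = discover_package("json")
```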

Registry Singleton

The node_registry singleton provides:

  • register(node_cls) — Register a node class
  • get(node_type) → Type[FlowNode] or None
  • list_all() → List of definition dicts
  • list_by_category(category) → Filtered list
  • search(query) → Full-text search across type, label, description, tags
  • create_instance(node_type, config, instance_id) → Instantiated node
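
As an illustration of what search(query) might do, the sketch below runs a case-insensitive substring match over the four fields named above. The actual matching strategy (tokenization, ranking) is not documented here, so treat this as an assumption; the sample definitions reuse the two example nodes from section 4.

```python
def search_definitions(definitions: list[dict], query: str) -> list[dict]:
    """Return definitions whose type, label, description, or tags contain the query."""
    needle = query.strip().lower()
    if not needle:
        return []
    matches = []
    for definition in definitions:
        searchable = [
            definition.get("node_type", ""),
            definition.get("label", ""),
            definition.get("description", ""),
            *definition.get("tags", []),
        ]
        if any(needle in text.lower() for text in searchable):
            matches.append(definition)
    return matches


definitions = [
    {
        "node_type": "my_custom_node",
        "label": "My Custom Node",
        "description": "Processes data in a custom way",
        "tags": ["custom", "transform"],
    },
    {
        "node_type": "my_api_node",
        "label": "My API Node",
        "description": "Registered via API",
        "tags": [],
    },
]

by_tag = search_definitions(definitions, "transform")
by_label = search_definitions(definitions, "API")
```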

6. Execution Model

ExecutionContext

Every node receives an ExecutionContext containing:

  • run_id, task_run_id — Identifiers for the current execution
  • flow_name, flow_version — Which flow is running
  • node_type, node_instance_id — Which node instance
  • attempt, max_retries — Retry tracking
  • parameters — Runtime parameters passed to the flow
  • secrets — Decrypted secrets (accessed via get_secret())
  • upstream_results — Output from upstream nodes (accessed via get_upstream_data())
  • environment, correlation_id, metadata — Additional context

ExecutionResult

@dataclass
class ExecutionResult:
    success: bool
    outputs: dict[str, Any]    # Keyed by connector ID
    error: str | None
    error_type: str | None
    state: NodeState
    elapsed_seconds: float
    logs: list[str]
    metrics: dict[str, float]
    metadata: dict[str, Any]

    @classmethod
    def from_success(cls, outputs: dict) -> "ExecutionResult": ...
    @classmethod
    def from_error(cls, error: str, error_type: str = "ExecutionError") -> "ExecutionResult": ...

Outputs are keyed by connector ID (e.g., {"data_out": [...], "metadata_out": {...}}), enabling the execution engine to route data to the correct downstream connectors.
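
A sketch of that routing step, assuming edges are stored as (source node, source connector, target node, target connector) records. The Edge type and route_outputs helper are hypothetical; the engine's real data structures are not shown in this document.

```python
from typing import Any, NamedTuple


class Edge(NamedTuple):
    """Hypothetical edge record: one connection between two connectors."""
    source_node: str
    source_connector: str
    target_node: str
    target_connector: str


def route_outputs(
    node_id: str, outputs: dict[str, Any], edges: list[Edge]
) -> dict[tuple[str, str], list[Any]]:
    """Map (target_node, target_connector) to the values delivered there."""
    deliveries: dict[tuple[str, str], list[Any]] = {}
    for edge in edges:
        # Only follow edges that leave this node from a connector it populated.
        if edge.source_node == node_id and edge.source_connector in outputs:
            key = (edge.target_node, edge.target_connector)
            deliveries.setdefault(key, []).append(outputs[edge.source_connector])
    return deliveries


edges = [
    Edge("ingest", "data_out", "validate", "data_in"),
    Edge("ingest", "metadata_out", "audit", "input"),
    Edge("other", "data_out", "validate", "data_in"),
]
outputs = {"data_out": [1, 2, 3], "metadata_out": {"rows": 3}}
deliveries = route_outputs("ingest", outputs, edges)
```

Values are collected in lists because an inbound connector with zero_or_many cardinality may receive data from several edges.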

Error Handling and Retries

The execution engine handles retries externally based on the node's retry configuration. The lifecycle:

execute() fails → on_error() called → if returns result, use it
                                     → if returns None, mark FAILED
                                     → if attempt < max_retries, mark RETRYING → re-execute
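
The flow above could be driven by a loop like the following. This is a sketch, not the engine's code: it assumes attempts are numbered from 0, models execute and on_error as plain async callables, and records state names in a list so the sequence can be inspected.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def run_with_retries(
    execute: Callable[[int], Awaitable[dict]],
    on_error: Callable[[Exception], Awaitable[Optional[dict]]],
    max_retries: int,
) -> tuple[dict, list[str]]:
    """Run execute() until it succeeds, recovers, or exhausts its retries."""
    states: list[str] = []
    attempt = 0
    while True:
        states.append("RUNNING")
        try:
            result = await execute(attempt)
            states.append("COMPLETED")
            return result, states
        except Exception as exc:
            recovered = await on_error(exc)
            if recovered is not None:
                return recovered, states  # the recovery result carries its own state
            states.append("FAILED")
            if attempt >= max_retries:
                return {"success": False, "error": str(exc)}, states
            states.append("RETRYING")
            attempt += 1


async def flaky(attempt: int) -> dict:
    if attempt < 2:
        raise RuntimeError(f"transient failure on attempt {attempt}")
    return {"success": True, "attempt": attempt}


async def no_recovery(exc: Exception) -> Optional[dict]:
    return None  # always propagate, so the retry policy decides


result, states = asyncio.run(run_with_retries(flaky, no_recovery, max_retries=3))
```

A production engine would also wait between attempts (the Task node exposes retryDelaySeconds for this); the delay is omitted to keep the sketch short.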

7. API Contracts

Node Registry Endpoints

Method   Path                           Description
GET      /nodes/registry                List all nodes (builtin + custom for org)
GET      /nodes/registry/builtin        List built-in nodes only
GET      /nodes/registry/custom         List org's custom nodes
GET      /nodes/registry/categories     List categories with counts
GET      /nodes/registry/search?q=...   Search nodes by query
GET      /nodes/registry/{node_type}    Get single node definition
POST     /nodes/registry                Register a custom node
PUT      /nodes/registry/{node_type}    Update a custom node
DELETE   /nodes/registry/{node_type}    Delete a custom node
POST     /nodes/validate-connection     Validate a connection

Request/Response Examples

List nodes:

GET /nodes/registry?org_id=my_org
→ { "items": [...], "total": 27, "builtin_count": 24, "custom_count": 3 }

Register custom node:

POST /nodes/registry
Content-Type: application/json
X-Org-Id: my_org

{
  "node_type": "custom_enricher",
  "label": "Data Enricher",
  "description": "Enriches records with external data",
  "category": "transform",
  "icon": "Sparkles",
  "color": "violet",
  "connectors": [...],
  "config_schema": [...]
}
→ { "node_type": "custom_enricher", "label": "Data Enricher", ... }

Validate connection:

POST /nodes/validate-connection
{
  "source_node_type": "s3_ingest",
  "source_connector_id": "data_out",
  "target_node_type": "schema_validator",
  "target_connector_id": "data_in",
  "existing_source_connections": 0,
  "existing_target_connections": 0
}
→ { "valid": true, "errors": [] }

Frontend Registry Client

The node-registry-client.ts provides a cached client:

import { fetchNodeDefinitions, searchNodeDefinitions } from "./node-registry-client";

// Fetches from backend with 60s cache, falls back to static definitions
const nodes = await fetchNodeDefinitions(orgId, authHeaders);

// Search with full-text matching
const results = await searchNodeDefinitions("embedding", orgId);

8. Persistence Design

DynamoDB Tables

DAGY_NODE_DEFINITIONS — Stores custom node definitions.

Attribute            Type      Key / Notes
node_type            String    Partition Key
org_id               String    GSI partition
category             String    GSI partition
label                String
description          String
icon                 String
color                String
version              String
author               String
official             Boolean
tags                 List
connectors_json      String    JSON-serialized connectors
config_schema_json   String    JSON-serialized config schema
import_path          String    Python import path
enabled              Boolean
deprecated           Boolean
created_at           String    ISO timestamp
updated_at           String    ISO timestamp

GSIs:

  • gsi_org_node_defs — Query by org_id, sorted by updated_at
  • gsi_category_node_defs — Query by category, sorted by category_sort
  • gsi_org_category_node_defs — Query by org_id, sorted by category_sort

DAGY_NODE_VERSIONS — Stores version history for custom nodes.

Attribute             Type     Key / Notes
node_type             String   Partition Key
version               String   Sort Key
org_id                String
definition_snapshot   String   Full JSON snapshot
change_summary        String
created_by            String
created_at            String   ISO timestamp

PynamoDB Models

import os

from pynamodb.attributes import UnicodeAttribute
from pynamodb.models import Model


class NodeDefinitionModel(Model):
    class Meta:
        # Table name and region can be overridden per environment.
        table_name = os.environ.get("DAGY_NODE_DEFINITIONS_TABLE", "DAGY_NODE_DEFINITIONS")
        region = os.environ.get("AWS_REGION", "us-east-1")

    node_type = UnicodeAttribute(hash_key=True)
    org_id = UnicodeAttribute()
    # ... (see src/dagy_api/nodes/models.py for full definition)

9. Frontend Node Model

Simplified Task-Only Design

The frontend Flow Builder presents a single universal Task node type for maximum flexibility and simplicity. All flow logic is expressed through generic Task nodes with configurable code execution:

Field               Type       Purpose
code                textarea   JavaScript or Python code to execute in the task
importPath          string     Optional module/import path for dependencies
retries             integer    Number of retry attempts on failure (0–10)
retryDelaySeconds   integer    Delay between retry attempts (seconds)
timeoutSeconds      integer    Task execution timeout (seconds)
concurrencyLimit    integer    Max parallel executions (0 = unlimited)

Connectors: All Task nodes have two default connectors:

  • Input (inbound, top) — Accepts any data type, unlimited connections
  • Output (outbound, bottom) — Produces any data type, unlimited connections

Node Definition Catalog (node-definitions.ts): About 110 lines, containing a single Task entry. The getNodeTypeByType() function falls back to Task for any unknown type, so flows that reference node types the frontend does not know still load.

Backend Node Framework (Reference)

The backend Python SDK (src/dagy/nodes/) supports custom node types organized by category. These are available for direct Python development and API registration, but are not exposed in the frontend UI:

  • Ingestion — S3, Postgres, HTTP, Kafka, BigQuery readers
  • Transform — Schema validation, data normalization, pandas, SQL, deduplication
  • LLM — OpenAI embeddings, GPT-4 classification, Claude summarization, LLM routing
  • Vector DB — Pinecone, Weaviate, Chroma integrations
  • Notifications — Slack alerts, email, webhooks, S3 writes
  • Custom Code — Python functions, shell commands, Docker containers
  • Control Flow — Conditional branches, merge, delay/wait

These 28 node types remain available in the backend for:

  1. Direct Python SDK usage (developers can subclass them)
  2. API-based registration and discovery
  3. Backend execution and flow validation

However, they are not rendered in the frontend UI. Instead, developers configure Task nodes with the appropriate code logic to replicate their functionality.


10. Frontend Architecture

Component Hierarchy

FlowBuilder
├── FlowConfigPanel (flow-level: name, version, executor, environment)
├── Toolbar (undo/redo, save, deploy, zoom)
├── NodeLibraryPanel (single "Task" entry, drag source)
├── ReactFlow Canvas
│   ├── FlowNodeComponent (per node)
│   │   ├── ConnectorHandle[] (dynamic inbound/outbound handles)
│   │   └── ConnectorTooltip (hover info)
│   ├── MiniMap
│   └── Controls
├── NodeConfigPanel (selected Task node configuration)
│   ├── Config tab (retries, timeout, concurrency, importPath)
│   └── Code tab (code editor for task logic)
├── ValidationPanel (error/warning list)
└── DeployDialog (flow name, version, environment)

Key Modules

Module                    Purpose
node-definitions.ts       Static Task node definition (~110 lines); getNodeTypeByType() falls back to Task for unknown types
node-registry-client.ts   Backend API client with caching (optional, for custom nodes if needed)
connection-validator.ts   Frontend connection validation logic
flow-node.tsx             Custom React Flow node with dynamic connectors
serializer.ts             Canvas ↔ FlowSpec bidirectional conversion; guessNodeType() always returns Task; flowSpecToCanvas() maps all tasks to generic Task visual
validation.ts             DAG validation (cycles, orphans, required fields, connectors)
types.ts                  TypeScript type definitions
flow-config-panel.tsx     Flow-level configuration (name, version, executor, environment)

Connection Validation Flow (Frontend)

User drags edge from source handle to target handle
  → onConnect fires in flow-builder.tsx
  → validatePendingConnection() called
    → Finds source/target connectors by handle ID
    → Checks direction (outbound → inbound)
    → Checks data type compatibility
    → Checks cardinality limits
  → If valid: edge is created with connector metadata
  → If invalid: edge is rejected, console warning logged

Frontend Serialization (Task-Only Model)

The serializer.ts module handles conversion between the visual canvas and flow specifications:

guessNodeType(): Always returns the Task node type, regardless of input. This ensures all nodes—whether loaded from a backend spec or newly created—are represented as Task nodes in the frontend.

flowSpecToCanvas(): Maps all loaded tasks from a flow specification to the generic Task visual representation. Each node is instantiated with:

  • Default Input and Output connectors (any type, zero_or_many cardinality)
  • Config fields populated from the task's metadata (code, importPath, retries, etc.)
  • Visual positioning preserved from the specification

Round-trip fidelity: When a user saves a flow, Task node configurations are serialized back to the flow spec, preserving all runtime parameters. The backend then executes these Task nodes using its extensible execution framework.


11. File Structure

src/dagy/nodes/
├── __init__.py              # Public exports
├── base.py                  # FlowNode abstract base class
├── connectors.py            # ConnectorDef, DataType, validation
├── metadata.py              # NodeMetadata, NodeCategory, NodeConfigField
├── execution.py             # ExecutionContext, ExecutionResult, NodeState
├── exceptions.py            # Exception hierarchy
├── registry.py              # NodeRegistry singleton
└── builtin/
    ├── __init__.py           # Imports all modules to trigger registration
    ├── ingestion.py          # S3, Postgres, HTTP, Kafka, BigQuery
    ├── transform.py          # Validator, Normalizer, Pandas, SQL, Dedup
    ├── llm.py                # OpenAI, GPT-4, Claude, Router
    ├── vectordb.py           # Pinecone, Weaviate, Chroma
    ├── notification.py       # Slack, Email, Webhook, S3 Write
    └── control_flow.py       # Branch, Merge, Wait, Python, Shell, Docker

src/dagy_api/nodes/
├── __init__.py
├── models.py                # PynamoDB DynamoDB models
└── router.py                # FastAPI router with all endpoints

web/src/components/flow-builder/
├── index.ts                 # Public exports
├── types.ts                 # TypeScript type definitions
├── node-definitions.ts      # Single Task node definition (~110 lines)
├── node-registry-client.ts  # Backend API client (optional for custom nodes)
├── connection-validator.ts  # Frontend connection validation
├── flow-node.tsx            # React Flow custom node component
├── flow-builder.tsx         # Main canvas component
├── node-panel.tsx           # Node sidebar (single Task entry)
├── node-config-panel.tsx    # Task node configuration (Config + Code tabs)
├── flow-config-panel.tsx    # Flow-level configuration (name, version, executor, environment)
├── serializer.ts            # Canvas ↔ FlowSpec conversion (guessNodeType → Task)
├── validation.ts            # DAG validation
├── validation-panel.tsx     # Validation error display
├── deploy-dialog.tsx        # Deployment dialog
└── toolbar.tsx              # Canvas toolbar

infrastructure/
└── dagy_stack.py            # CDK: DAGY_NODE_DEFINITIONS, DAGY_NODE_VERSIONS tables