
Flow Node Framework — Implementation Review

**Date:** 2026-03-14

**Scope:** Full implementation audit of the FlowNode framework across Python SDK, REST API, DynamoDB persistence, and React frontend

**Files reviewed:** 18 source files across 4 layers


Executive Summary

The Flow Node framework is architecturally sound with clean abstractions, a well-designed type system, and good separation of concerns across the stack. The __init_subclass__ auto-registration pattern, frozen dataclasses for immutable connector definitions, and the layered validation approach (Python → API → Frontend) are all strong design choices.

However, the review identified 27 findings across security, reliability, scalability, and completeness. The most critical gaps are: the absence of authentication/authorization on all API endpoints, no timeout enforcement during node execution, missing output validation against declared connector schemas, race conditions in the global registry, and a module-level cache in the frontend client that is not scoped per organization.


Findings by Priority

P0 — Critical (Must Fix Before Production)

1. API Endpoints Have No Authentication or Authorization

File: src/dagy_api/nodes/router.py Lines: 172-178, 189, 230, 265

The _get_org_id_from_request dependency is a hardcoded stub returning "default_org". Every endpoint accepts org_id as a plain query parameter, meaning any caller can impersonate any organization. The DELETE and PUT endpoints check item.org_id != org_id but this is meaningless when the org_id is attacker-controlled.

Impact: Any unauthenticated user can read, create, modify, or delete any organization's custom nodes.

Recommendation: Wire the org_id extraction into the existing Clerk/JWT auth middleware. Use Depends(_get_org_id_from_request) and extract from verified JWT claims. Add RBAC permission checks (at minimum nodes:read, nodes:write, nodes:delete) using the existing 4-role permission model.
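
The permission check described above could look roughly like the following. This is a minimal sketch, assuming the auth middleware has already verified the JWT and passes the handler a claims dict. The claim keys (`sub`, `org_id`, `role`), the role names, and the role-to-permission mapping are hypothetical placeholders, not the platform's actual 4-role model.

```python
from dataclasses import dataclass

# Hypothetical role -> permission mapping; the real 4-role model may differ.
ROLE_PERMISSIONS = {
    "viewer": {"nodes:read"},
    "editor": {"nodes:read", "nodes:write"},
    "admin": {"nodes:read", "nodes:write", "nodes:delete"},
    "owner": {"nodes:read", "nodes:write", "nodes:delete"},
}


class PermissionDenied(Exception):
    """Raised when verified claims do not grant the required permission."""


@dataclass(frozen=True)
class Principal:
    user_id: str
    org_id: str
    role: str


def principal_from_claims(claims: dict) -> Principal:
    """Build a Principal from already-verified JWT claims, never from query params."""
    try:
        return Principal(user_id=claims["sub"], org_id=claims["org_id"], role=claims["role"])
    except KeyError as exc:
        raise PermissionDenied(f"missing claim: {exc.args[0]}") from None


def require_permission(principal: Principal, permission: str) -> None:
    """Raise PermissionDenied unless the principal's role grants the permission."""
    if permission not in ROLE_PERMISSIONS.get(principal.role, set()):
        raise PermissionDenied(f"{principal.role!r} lacks {permission!r}")
```

With this shape, the existing `item.org_id != org_id` comparison becomes meaningful, because `org_id` comes from `principal.org_id` rather than from caller-supplied input.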


2. No Timeout Enforcement in Node Execution

File: src/dagy/nodes/base.py Lines: 366-470

The run() method calls await self.execute(context) with no timeout wrapper. A misbehaving or malicious custom node can block the execution engine indefinitely. The NodeTimeoutError exception exists but is never raised by the framework.

Impact: A single hung node can block an entire execution worker, leading to resource exhaustion.

Recommendation: Wrap the execute() call with asyncio.wait_for():

import asyncio
timeout = self.config.get("_timeout_seconds", 300)  # or from metadata
try:
    result = await asyncio.wait_for(self.execute(context), timeout=timeout)
except asyncio.TimeoutError:
    context.mark_timed_out(timeout)
    raise NodeTimeoutError(f"Timed out after {timeout}s", node_type=self.get_node_type())

3. Arbitrary Code Execution via source_code Field

File: src/dagy_api/nodes/models.py line 128, src/dagy_api/nodes/router.py lines 76, 94

The RegisterNodeRequest and NodeDefinitionModel both accept a source_code field containing raw Python. If any code path ever exec()s this (e.g., during node instantiation from a custom definition), it becomes an arbitrary code execution vector.

Impact: Even if not currently executed, the field's presence creates a latent RCE risk. Any future developer connecting source_code to a runtime loader creates a critical vulnerability.

Recommendation: Either remove the source_code field entirely and require nodes to be deployed as Python packages (via import_path), or add strict sandboxing (e.g., RestrictedPython) if inline code execution is a deliberate feature. At minimum, add a prominent # SECURITY: Never exec() this field comment and validate the field length.


4. Registry Singleton Is Not Thread-Safe

File: src/dagy/nodes/registry.py Lines: 49-53, 59-97

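NodeRegistry uses plain dict for _nodes and _metadata_cache. Under threaded execution (for example, when FastAPI runs synchronous handlers in its threadpool), concurrent register(), discover_package(), and list_all() calls can corrupt state or produce inconsistent reads. Separate uvicorn worker processes do not share the registry, so the risk is within a single process.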

Impact: Race conditions during concurrent node discovery or registration, potential RuntimeError: dictionary changed size during iteration.

Recommendation: Guard all mutations with a threading.Lock, and have read paths such as list_all() iterate over a snapshot copy taken under the same lock. For the FastAPI hot path, consider making _get_builtin_definitions() cache its result after the first call rather than re-discovering on every request.
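
A minimal sketch of the locking pattern, using a simplified stand-in class rather than the real NodeRegistry (the class name and its reduced method set are assumptions for illustration):

```python
import threading
from typing import Dict, List


class ThreadSafeRegistry:
    """Stand-in for NodeRegistry; only the locking pattern is the point."""

    def __init__(self) -> None:
        self._lock = threading.RLock()
        self._nodes: Dict[str, type] = {}

    def register(self, node_type: str, node_cls: type) -> None:
        # All writes go through the lock.
        with self._lock:
            self._nodes[node_type] = node_cls

    def list_all(self) -> List[str]:
        # Copy under the lock, then work on the snapshot outside it, so a
        # concurrent register() cannot change the dict mid-iteration.
        with self._lock:
            snapshot = dict(self._nodes)
        return sorted(snapshot)
```

An RLock is used here so that a locked method can call another locked method on the same registry without deadlocking; a plain Lock would work if no such re-entrant calls exist.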


P1 — High (Should Fix Before GA)

5. No Output Validation Against Declared Connectors

File: src/dagy/nodes/base.py Lines: 417-444

After execute() returns an ExecutionResult, the framework never validates that result.outputs keys match the declared outbound connector IDs, or that the output data types match the connector's data_types. A node declaring data_out: DataType.JSON can return a raw string and the framework will silently pass it downstream.

Recommendation: Add post-execution output validation in run():

declared_outputs = {c.id for c in self.__class__.outbound_connectors()}
actual_outputs = set(result.outputs.keys())
unexpected = actual_outputs - declared_outputs
if unexpected:
    self._logger.warning("Node produced undeclared outputs: %s", unexpected)

6. _get_builtin_definitions() Rediscovers on Every API Call

File: src/dagy_api/nodes/router.py Lines: 142-151

Every call to list_all_nodes, list_categories, get_node_definition, register_custom_node, and validate_connection_endpoint invokes _get_builtin_definitions(), which calls node_registry.discover_builtin() + sync_from_subclasses() + list_all(). This involves importing all builtin modules, iterating all registered nodes, and serializing them on every HTTP request.

Impact: Unnecessary CPU overhead on every API call. With 27 built-in nodes, this adds measurable latency.

Recommendation: Cache the result of _get_builtin_definitions() at module level or in a @lru_cache. Built-in definitions never change at runtime.


7. Frontend Module-Level Cache Is Not Per-Org

File: web/src/components/flow-builder/node-registry-client.ts Lines: 23-25

cachedNodes is a module-level singleton. In a multi-tenant SaaS where different orgs have different custom nodes, switching org context (or sharing the module across requests during SSR) can serve stale data from the wrong org.

Recommendation: Key the cache by org_id:

const cache = new Map<string, { nodes: NodeTypeDefinition[]; timestamp: number }>();

8. validate_connection_endpoint Duplicates Python SDK Logic

File: src/dagy_api/nodes/router.py Lines: 421-495

The connection validation endpoint reimplements the type compatibility and cardinality checks using dict-based logic instead of delegating to the well-tested ConnectorDef and validate_connection() from the SDK. This creates two divergent validation paths that can drift.

Recommendation: Deserialize the connector dicts into ConnectorDef objects using ConnectorDef.from_dict() and call validate_connection() directly.


9. NodeDefinitionModel Lacks Conditional Writes

File: src/dagy_api/nodes/router.py Lines: 284-347, 350-395

register_custom_node uses a check-then-write pattern: it queries for existence, then saves. Under concurrent requests, two callers could both pass the existence check. Because node_type is the hash key, the second save then silently overwrites the first registration instead of being rejected.

Similarly, update_custom_node reads then writes without a condition expression, enabling lost-update races.

Recommendation: Use DynamoDB conditional writes:

item.save(condition=NodeDefinitionModel.node_type.does_not_exist())  # for create
item.save(condition=NodeDefinitionModel.updated_at == expected_version)  # for update

10. No Version History Written on Updates

File: src/dagy_api/nodes/router.py Lines: 350-395

NodeVersionModel exists for version history, but update_custom_node never writes a version record. The version history table is effectively dead code.

Recommendation: Before updating the main record, snapshot the current state into NodeVersionModel:

NodeVersionModel(
    node_type=node_type,
    version=item.version,
    org_id=item.org_id,
    definition_snapshot=item.to_api_dict(),
    created_at=now,
).save()

11. ConnectorDef.metadata Uses Mutable Default on Frozen Dataclass

File: src/dagy/nodes/connectors.py Lines: 77, 102

ConnectorDef is frozen=True but metadata: Dict[str, Any] uses a mutable dict as the default factory. While the dataclass itself is frozen (can't reassign self.metadata), the dict's contents can still be mutated (connector.metadata["key"] = "value"), breaking the immutability contract.

Recommendation: Use types.MappingProxyType for the metadata field, or document that metadata should not be mutated after construction.
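
A minimal sketch of the MappingProxyType approach, using a simplified stand-in for ConnectorDef that models only `id` and `metadata` (the class name and reduced field set are assumptions):

```python
from dataclasses import dataclass, field
from types import MappingProxyType
from typing import Any, Mapping


@dataclass(frozen=True)
class ConnectorDefSketch:
    """Simplified stand-in for ConnectorDef; only id and metadata are modeled."""

    id: str
    metadata: Mapping[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        # Frozen dataclasses block normal assignment, so use object.__setattr__.
        # Copy first so later changes to the caller's dict do not leak in.
        object.__setattr__(self, "metadata", MappingProxyType(dict(self.metadata)))
```

With this in place, `connector.metadata["key"] = "value"` raises TypeError, while reads behave like a normal dict. Nested mutable values inside the metadata are not protected by the proxy.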


12. NodeMetadata.tags Uses Mutable List on Frozen Dataclass

File: src/dagy/nodes/metadata.py Lines: 149, 180

Same issue as finding 11. tags: List[str] can be mutated after construction despite the frozen=True decorator.

Recommendation: Use tuple[str, ...] instead of List[str] for tags.


P2 — Medium (Should Fix Before Scale)

13. on_cancel Is Synchronous While All Other Hooks Are Async

File: src/dagy/nodes/base.py Lines: 295-301

on_cancel is defined as def on_cancel(self, context) (sync), while pre_execute, post_execute, on_error, and validate_config are all async def. This inconsistency means a node that needs to do async cleanup on cancellation (e.g., closing database connections, aborting HTTP requests) cannot await inside on_cancel.

Recommendation: Change to async def on_cancel(self, context: ExecutionContext) -> None.
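
A minimal sketch of how an awaitable on_cancel could be driven from run(). The class is a stand-in for FlowNode with the context argument omitted, and the way the real framework detects cancellation is an assumption here (asyncio task cancellation):

```python
import asyncio


class NodeSketch:
    """Minimal stand-in for FlowNode showing an awaitable cancellation hook."""

    def __init__(self) -> None:
        self.cleaned_up = False

    async def execute(self) -> None:
        await asyncio.sleep(60)  # simulates long-running work

    async def on_cancel(self) -> None:
        await asyncio.sleep(0)  # placeholder for async cleanup, e.g. closing a client
        self.cleaned_up = True

    async def run(self) -> None:
        try:
            await self.execute()
        except asyncio.CancelledError:
            await self.on_cancel()
            raise  # re-raise so the caller still observes the cancellation


async def main() -> bool:
    node = NodeSketch()
    task = asyncio.create_task(node.run())
    await asyncio.sleep(0)  # let the task start executing
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return node.cleaned_up
```

Re-raising CancelledError after cleanup matters: swallowing it would make the task appear to finish normally.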


14. State Transition Validation Missing

File: src/dagy/nodes/execution.py Lines: 174-201

State transitions are unguarded. Any code can call mark_completed() after mark_failed(), or mark_running() after mark_completed(). The NodeState enum defines 9 states but there's no state machine enforcing valid transitions.

Recommendation: Add a guard to each mark_* method:

_VALID_TRANSITIONS = {
    NodeState.PENDING: {NodeState.RUNNING, NodeState.SKIPPED, NodeState.QUEUED},
    NodeState.QUEUED: {NodeState.RUNNING, NodeState.CANCELLED},
    NodeState.RUNNING: {NodeState.COMPLETED, NodeState.FAILED, NodeState.TIMED_OUT, NodeState.CANCELLED},
    # ... remaining states elided; terminal states map to an empty set
}

# Method on ExecutionContext, called by each mark_* method.
def _transition_to(self, new_state: NodeState) -> None:
    if new_state not in _VALID_TRANSITIONS.get(self._state, set()):
        raise ValueError(f"Invalid transition: {self._state} -> {new_state}")
    self._state = new_state

15. ExecutionContext.secrets Stored in Plain Text

File: src/dagy/nodes/execution.py Lines: 71

secrets: Dict[str, str] stores resolved secret values as plain strings in the dataclass. These appear in repr(), debugger inspections, and potentially in serialized logs.

Recommendation: Wrap secret values in a SecretStr class that masks __repr__ and __str__, similar to Pydantic's SecretStr.
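
A minimal sketch of such a wrapper. The accessor name get_secret_value mirrors Pydantic's SecretStr; the mask string is an arbitrary choice:

```python
class SecretStr:
    """Wraps a secret so it is masked in repr(), str(), and log formatting."""

    __slots__ = ("_value",)

    def __init__(self, value: str) -> None:
        self._value = value

    def get_secret_value(self) -> str:
        """Explicit accessor; the intended way to read the underlying value."""
        return self._value

    def __repr__(self) -> str:
        return "SecretStr('**********')"

    def __str__(self) -> str:
        return "**********"
```

Typing the field as `Dict[str, SecretStr]` means the dataclass repr and any f-string or %s log formatting show only the mask, and code that needs the real value has to call get_secret_value() deliberately.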


16. DynamoDB GSI Uses read_capacity_units = 1 with PAY_PER_REQUEST

File: src/dagy_api/nodes/models.py Lines: 30-33, 42-45, 54-57

The GSI Meta classes set read_capacity_units = 1 and write_capacity_units = 1, but the main table uses billing_mode = "PAY_PER_REQUEST" (on-demand). DynamoDB ignores GSI capacity settings when the table is on-demand, so the values have no effect, but the inconsistency is confusing. More importantly, the CDK stack creates the table as PAY_PER_REQUEST while the PynamoDB model declares explicit capacities; the two definitions should agree.

Recommendation: Remove read_capacity_units and write_capacity_units from GSI Meta classes to match the on-demand billing mode.


17. No Pagination on List Endpoints

File: src/dagy_api/nodes/router.py Lines: 184-219

list_all_nodes returns all nodes in a single response. With custom nodes from many organizations, this could grow unbounded.

Recommendation: Add limit and offset (or cursor-based) pagination parameters and return a next_cursor in the response.
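
A minimal sketch of cursor-based pagination over an in-memory list of node types. The function names and the choice of an opaque base64 cursor holding the last returned node_type are assumptions; against DynamoDB the cursor would more likely wrap the query's last evaluated key:

```python
import base64
from typing import List, Optional, Tuple


def encode_cursor(last_node_type: str) -> str:
    """Encode the last returned key as an opaque, URL-safe cursor."""
    return base64.urlsafe_b64encode(last_node_type.encode()).decode()


def decode_cursor(cursor: str) -> str:
    return base64.urlsafe_b64decode(cursor.encode()).decode()


def paginate(
    node_types: List[str], limit: int, cursor: Optional[str] = None
) -> Tuple[List[str], Optional[str]]:
    """Return one page of node types plus a next_cursor (None on the last page)."""
    if limit < 1:
        raise ValueError("limit must be at least 1")
    ordered = sorted(node_types)
    if cursor is not None:
        after = decode_cursor(cursor)
        # Resume strictly after the last key the client has already seen.
        ordered = [nt for nt in ordered if nt > after]
    page = ordered[:limit]
    has_more = len(ordered) > limit
    next_cursor = encode_cursor(page[-1]) if has_more else None
    return page, next_cursor
```

Keying the cursor on a stable sort key rather than a numeric offset keeps pages consistent when nodes are inserted or deleted between requests.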


18. Frontend getGroupedNodeDefinitions Has Redundant Category Check

File: web/src/components/flow-builder/node-registry-client.ts Lines: 209-216

The function checks if NODE_CATEGORIES already contains control_flow before adding it. Since NODE_CATEGORIES was already updated to include control_flow (the duplicate key fix), this conditional is dead code.

Recommendation: Remove the conditional spread. Just use categories: NODE_CATEGORIES.


19. Data Type Compatibility Is Duplicated in Three Places

Files: connectors.py:54-64, types.ts:192-202, router.py:465-481

The implicit type coercion matrix is defined independently in Python (DataType.is_compatible), TypeScript (DATA_TYPE_COMPAT), and re-implemented inline in the API validation endpoint. Any addition or change to the coercion rules requires synchronized updates in three files.

Recommendation: Make the Python implementation authoritative. The frontend should fetch the compatibility matrix from the API (add a GET /nodes/type-compatibility endpoint) rather than maintaining its own copy. At minimum, add a comment in each file pointing to the canonical definition.


P3 — Low (Nice to Have / Tech Debt)

20. Built-in Nodes Have Placeholder Execute Methods

Files: All files in src/dagy/nodes/builtin/

Every built-in node's execute() method returns placeholder data (e.g., {"_placeholder": True}). While these serve as scaffolding, having 27 nodes that all silently return fake data is risky — a developer testing flows could mistake placeholder output for real results.

Recommendation: Add a warning log: context.log_warning("Using placeholder implementation — not connected to real service") in each placeholder execute method. Alternatively, raise NotImplementedError and have the execution engine handle it gracefully.


21. FlowNode._registered_subclasses Is a Class Variable Shared Across All Subclasses

File: src/dagy/nodes/base.py Lines: 119

_registered_subclasses: ClassVar[Dict[str, Type["FlowNode"]]] = {} is defined on the base class. Because it's a mutable class variable, it's shared across all subclasses — which is the intent, but it means any subclass can inadvertently clear it (MyNode._registered_subclasses.clear()).

Recommendation: This is a minor concern but worth noting. Consider making it a module-level _registered_subclasses dict instead of a class variable.


22. No Audit Logging on Node CRUD Operations

File: src/dagy_api/nodes/router.py

The register_custom_node, update_custom_node, and delete_custom_node endpoints only emit Python logger.info calls. There's no structured audit trail (who created/modified/deleted what, when, from what IP).

Recommendation: Integrate with the existing audit logging infrastructure mentioned in the docs (the platform already has audit logging on all mutations). Emit structured audit events with user_id, action, resource, and timestamp.
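
A minimal sketch of the event shape, using a standard-library logger as a stand-in for the platform's audit infrastructure. The logger name, the function names, and the extra org_id and source_ip fields are assumptions for illustration:

```python
import json
import logging
from datetime import datetime, timezone
from typing import Dict

audit_logger = logging.getLogger("audit.nodes")  # hypothetical logger name


def build_audit_event(
    user_id: str, org_id: str, action: str, resource: str, source_ip: str
) -> Dict[str, str]:
    """Assemble one audit record with a UTC ISO-8601 timestamp."""
    return {
        "user_id": user_id,
        "org_id": org_id,
        "action": action,
        "resource": resource,
        "source_ip": source_ip,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }


def emit_audit_event(event: Dict[str, str]) -> None:
    """Write the event as a single JSON line so it can be parsed downstream."""
    audit_logger.info(json.dumps(event, sort_keys=True))
```

Each CRUD handler would call emit_audit_event once per successful mutation, with user_id and org_id taken from verified credentials rather than request parameters.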


23. Missing __init__.py for src/dagy_api/nodes/

File: src/dagy_api/nodes/

The nodes directory under dagy_api may need an __init__.py for proper Python packaging. Depending on the project's use of implicit namespace packages, this could cause import issues.

Recommendation: Verify the import works without it; if not, add an empty __init__.py.


24. ConnectorTooltip Position Hardcoded to Top/Bottom Only

File: web/src/components/flow-builder/flow-node.tsx Lines: 107-146

The tooltip only handles "top" and "bottom" positions, but ConnectorDef.position_hint supports "left" and "right" as well. Handles placed on the left/right edges will have incorrectly positioned tooltips.

Recommendation: Add "left" and "right" positioning cases to the tooltip component.


25. No from_dict on ExecutionResult or ExecutionContext

File: src/dagy/nodes/execution.py

ExecutionResult has to_dict() but no from_dict() classmethod. If results need to be deserialized (e.g., from a serialized execution log or a resumed run), there's no standard way to reconstruct the object.

Recommendation: Add from_dict() classmethods to both ExecutionResult and ExecutionContext.
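
A minimal sketch of a to_dict()/from_dict() round trip. The fields shown are a hypothetical subset; the real ExecutionResult may carry different or additional fields:

```python
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, Optional


@dataclass
class ExecutionResultSketch:
    """Hypothetical subset of ExecutionResult fields; the real class may differ."""

    outputs: Dict[str, Any] = field(default_factory=dict)
    error: Optional[str] = None
    duration_ms: float = 0.0

    def to_dict(self) -> Dict[str, Any]:
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "ExecutionResultSketch":
        # Ignore unknown keys so records written by a newer version still load;
        # missing keys fall back to the dataclass defaults.
        known = {name: data[name] for name in cls.__dataclass_fields__ if name in data}
        return cls(**known)
```

Tolerating unknown keys is a design choice; a stricter variant could raise on them to catch schema drift early.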


26. Frontend DagEdge Extension May Not Survive React Flow Serialization

File: web/src/components/flow-builder/types.ts Lines: 134-138

DagEdge extends Edge with sourceConnectorId and targetConnectorId as top-level properties. React Flow stores edge data in the data property, not as top-level fields. These custom fields may be silently dropped during React Flow's internal state management.

Recommendation: Move sourceConnectorId and targetConnectorId into the data property to survive React Flow's serialization/deserialization cycle:

export type DagEdge = Edge & {
  data?: {
    sourceConnectorId?: string;
    targetConnectorId?: string;
  };
};

27. NodeRegistry.search() Calls list_all() Which Serializes All Nodes

File: src/dagy/nodes/registry.py Lines: 171-187

Every search call serializes all registered nodes into dicts just to do string matching. For a registry with many nodes, this is wasteful.

Recommendation: Search against the _metadata_cache directly instead of serializing:

def search(self, query: str) -> List[Dict[str, Any]]:
    q = query.lower()
    matching_types = [
        nt for nt, meta in self._metadata_cache.items()
        if q in f"{meta.node_type} {meta.label} {meta.description} {' '.join(meta.tags)}".lower()
    ]
    return [self._nodes[nt].to_definition_dict() for nt in matching_types]

Requirements Gap Analysis

| Requirement | Status | Notes |
|---|---|---|
| FlowNode Abstract Base Class | ✅ Complete | Clean design with lifecycle hooks |
| Connector Model with typed data flow | ✅ Complete | 15 data types, implicit coercion, cardinality |
| Connection Validation | ✅ Complete | Python + API + Frontend triple validation |
| Execution Architecture with lifecycle | ⚠️ Partial | No timeout enforcement, no output validation |
| Node Registry with auto-discovery | ✅ Complete | __init_subclass__ + package discovery |
| DynamoDB Persistence | ✅ Complete | 2 tables, 3 GSIs, but no conditional writes |
| REST API (CRUD + validate) | ⚠️ Partial | 10 endpoints work, but no auth, no pagination |
| Frontend Integration | ✅ Complete | React Flow handles, tooltips, validation |
| RBAC / Multi-tenant Safety | ❌ Missing | No auth on any endpoint |
| Audit Logging | ❌ Missing | Only Python logger.info |
| Version History | ⚠️ Partial | Model exists, never written to |
| Error Channel Support | ✅ Complete | DataType.ERROR + error_out connectors |
| Control Flow Nodes | ✅ Complete | Conditional branch, merge, wait |

Enterprise Readiness Assessment

| Dimension | Rating | Key Gap |
|---|---|---|
| Security | 🔴 Not Ready | No authentication on any endpoint; source_code field is latent RCE |
| Auditability | 🔴 Not Ready | No structured audit trail for CRUD operations |
| Scalability | 🟡 Needs Work | No pagination, per-request rediscovery, no caching |
| Maintainability | 🟢 Good | Clean abstractions, good separation of concerns, comprehensive docs |
| Observability | 🟡 Needs Work | Execution logging exists but no metrics emission |
| Extensibility | 🟢 Good | __init_subclass__ pattern, config schema, lifecycle hooks |
| Reliability | 🟡 Needs Work | No timeout enforcement, no conditional writes, no state machine |
| Multi-tenant Safety | 🔴 Not Ready | Org isolation is cosmetic (query param, not JWT claim) |

Recommended Fix Order

  1. Wire authentication to all /nodes/ endpoints (P0-1)
  2. Add timeout enforcement in FlowNode.run() (P0-2)
  3. Remove or sandbox source_code field (P0-3)
  4. Add thread safety to NodeRegistry (P0-4)
  5. Cache built-in definitions in the API layer (P1-6)
  6. Add output validation post-execution (P1-5)
  7. Use conditional writes in DynamoDB (P1-9)
  8. Write version history on updates (P1-10)
  9. Key frontend cache by org_id (P1-7)
  10. Unify validation logic — API should delegate to SDK (P1-8)