Flow Node Framework — Implementation Review
**Date:** 2026-03-14
**Scope:** Full implementation audit of the FlowNode framework across Python SDK, REST API, DynamoDB persistence, and React frontend
**Files reviewed:** 18 source files across 4 layers
Executive Summary
The Flow Node framework is architecturally sound with clean abstractions, a well-designed type system, and good separation of concerns across the stack. The __init_subclass__ auto-registration pattern, frozen dataclasses for immutable connector definitions, and the layered validation approach (Python → API → Frontend) are all strong design choices.
However, the review identified 27 findings across security, reliability, scalability, and completeness dimensions. The most critical gaps are: the absence of authentication/authorization on all API endpoints, no timeout enforcement during node execution, missing output validation against declared connector schemas, race conditions in the global registry, and a module-level cache in the frontend client that is not keyed by organization.
Findings by Priority
P0 — Critical (Must Fix Before Production)
1. API Endpoints Have No Authentication or Authorization
File: src/dagy_api/nodes/router.py
Lines: 172-178, 189, 230, 265
The _get_org_id_from_request dependency is a hardcoded stub returning "default_org". Every endpoint accepts org_id as a plain query parameter, meaning any caller can impersonate any organization. The DELETE and PUT endpoints check item.org_id != org_id but this is meaningless when the org_id is attacker-controlled.
Impact: Any unauthenticated user can read, create, modify, or delete any organization's custom nodes.
Recommendation: Wire the org_id extraction into the existing Clerk/JWT auth middleware. Use Depends(_get_org_id_from_request) and extract from verified JWT claims. Add RBAC permission checks (at minimum nodes:read, nodes:write, nodes:delete) using the existing 4-role permission model.
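A minimal sketch of the permission check, assuming the auth middleware has already verified the JWT and exposes its claims as a mapping. The claim names (`org_id`, `role`), the role names, the `ROLE_PERMISSIONS` table, and `org_id_from_claims` are hypothetical placeholders; the real 4-role model may differ.

```python
from typing import Dict, FrozenSet, Mapping

# Hypothetical role-to-permission table; the real 4-role model may differ.
ROLE_PERMISSIONS: Dict[str, FrozenSet[str]] = {
    "viewer": frozenset({"nodes:read"}),
    "editor": frozenset({"nodes:read", "nodes:write"}),
    "admin": frozenset({"nodes:read", "nodes:write", "nodes:delete"}),
    "owner": frozenset({"nodes:read", "nodes:write", "nodes:delete"}),
}


class PermissionDenied(Exception):
    """Raised when verified claims do not grant the requested permission."""


def org_id_from_claims(claims: Mapping[str, str], required: str) -> str:
    """Return the org_id from already-verified JWT claims, or raise.

    The org_id comes only from the token, never from a query parameter.
    """
    org_id = claims.get("org_id")
    if not org_id:
        raise PermissionDenied("token carries no org_id claim")
    granted = ROLE_PERMISSIONS.get(claims.get("role", ""), frozenset())
    if required not in granted:
        raise PermissionDenied(f"missing permission: {required}")
    return org_id
```

In the router, a function like this would sit behind the existing dependency so that each endpoint names the permission it needs and receives a trusted org_id in return.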
2. No Timeout Enforcement in Node Execution
File: src/dagy/nodes/base.py
Lines: 366-470
The run() method calls await self.execute(context) with no timeout wrapper. A misbehaving or malicious custom node can block the execution engine indefinitely. The NodeTimeoutError exception exists but is never raised by the framework.
Impact: A single hung node can block an entire execution worker, leading to resource exhaustion.
Recommendation: Wrap the execute() call with asyncio.wait_for():
import asyncio
timeout = self.config.get("_timeout_seconds", 300)  # or from metadata
try:
    result = await asyncio.wait_for(self.execute(context), timeout=timeout)
except asyncio.TimeoutError:
    context.mark_timed_out(timeout)
    raise NodeTimeoutError(f"Timed out after {timeout}s", node_type=self.get_node_type())
3. Arbitrary Code Execution via source_code Field
File: src/dagy_api/nodes/models.py line 128, src/dagy_api/nodes/router.py lines 76, 94
The RegisterNodeRequest and NodeDefinitionModel both accept a source_code field containing raw Python. If any code path ever exec()s this (e.g., during node instantiation from a custom definition), it becomes an arbitrary code execution vector.
Impact: Even if not currently executed, the field's presence creates a latent RCE risk. Any future developer connecting source_code to a runtime loader creates a critical vulnerability.
Recommendation: Either remove the source_code field entirely and require nodes to be deployed as Python packages (via import_path), or add strict sandboxing (e.g., RestrictedPython) if inline code execution is a deliberate feature. At minimum, add a prominent # SECURITY: Never exec() this field comment and validate the field length.
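If the package-based route is taken, the import_path itself becomes untrusted input and is worth validating before anything imports it. A rough sketch, assuming an import_path of the form `package.module:ClassName` and an allowlisted package prefix; both the format and the `dagy_custom_nodes.` prefix are assumptions for illustration, not taken from the codebase.

```python
import re

# Hypothetical allowlist of package prefixes that custom nodes may live in.
ALLOWED_PREFIXES = ("dagy_custom_nodes.",)

# "package.module:ClassName" is an assumed format for import_path.
_IMPORT_PATH_RE = re.compile(
    r"[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)*:[A-Za-z_][A-Za-z0-9_]*"
)


def validate_import_path(import_path: str) -> str:
    """Reject paths that are malformed or outside the allowlisted packages."""
    if not _IMPORT_PATH_RE.fullmatch(import_path):
        raise ValueError(f"malformed import_path: {import_path!r}")
    if not import_path.startswith(ALLOWED_PREFIXES):
        raise ValueError(f"import_path outside allowed packages: {import_path!r}")
    return import_path
```

This only checks the shape and location of the path; it does not make the referenced code safe to run.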
4. Registry Singleton Is Not Thread-Safe
File: src/dagy/nodes/registry.py
Lines: 49-53, 59-97
NodeRegistry uses plain dict for _nodes and _metadata_cache with no locking. If registry methods run on more than one thread in the same process (for example, FastAPI runs synchronous endpoints in a thread pool), concurrent register(), discover_package(), and list_all() calls can corrupt state or produce inconsistent reads.
Impact: Race conditions during concurrent node discovery or registration, potential RuntimeError: dictionary changed size during iteration.
Recommendation: Guard all mutations with a threading.Lock (or an RLock if registration can re-enter), and have read paths such as list_all() iterate over a snapshot taken under the same lock. For the FastAPI hot path, consider making _get_builtin_definitions() cache its result after the first call rather than re-discovering on every request.
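A minimal sketch of a lock-guarded registry. `LockedRegistry` is an illustrative stand-in, not the real NodeRegistry; only the locking pattern is the point.

```python
import threading
from typing import Dict, List


class LockedRegistry:
    """Illustrative stand-in for NodeRegistry; not the real class."""

    def __init__(self) -> None:
        # RLock so a registration hook that re-enters the registry does not deadlock.
        self._lock = threading.RLock()
        self._nodes: Dict[str, type] = {}

    def register(self, node_type: str, node_cls: type) -> None:
        # All mutations happen under the lock.
        with self._lock:
            self._nodes[node_type] = node_cls

    def list_all(self) -> List[str]:
        # Build the result under the lock so callers iterate over a snapshot,
        # never over the live dict.
        with self._lock:
            return sorted(self._nodes)
```

Returning a snapshot is what avoids "dictionary changed size during iteration": the caller's loop no longer touches the dict that other threads are mutating.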
P1 — High (Should Fix Before GA)
5. No Output Validation Against Declared Connectors
File: src/dagy/nodes/base.py
Lines: 417-444
After execute() returns an ExecutionResult, the framework never validates that result.outputs keys match the declared outbound connector IDs, or that the output data types match the connector's data_types. A node declaring data_out: DataType.JSON can return a raw string and the framework will silently pass it downstream.
Recommendation: Add post-execution output validation in run():
declared_outputs = {c.id for c in self.__class__.outbound_connectors()}
actual_outputs = set(result.outputs.keys())
unexpected = actual_outputs - declared_outputs
if unexpected:
    self._logger.warning("Node produced undeclared outputs: %s", unexpected)
6. _get_builtin_definitions() Rediscovers on Every API Call
File: src/dagy_api/nodes/router.py
Lines: 142-151
Every call to list_all_nodes, list_categories, get_node_definition, register_custom_node, and validate_connection_endpoint invokes _get_builtin_definitions(), which calls node_registry.discover_builtin() + sync_from_subclasses() + list_all(). This involves importing all builtin modules, iterating all registered nodes, and serializing them on every HTTP request.
Impact: Unnecessary CPU overhead on every API call. With 27 built-in nodes, this adds measurable latency.
Recommendation: Cache the result of _get_builtin_definitions(), either in a module-level variable or with @functools.lru_cache. Built-in definitions never change at runtime, so the cache never needs invalidation in production.
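A minimal sketch of the caching approach using functools.lru_cache. `_discover_builtin_definitions` is a hypothetical stand-in for the discover_builtin() + sync_from_subclasses() + list_all() sequence, and the two node types it returns are made up.

```python
from functools import lru_cache
from typing import Any, Dict, Tuple


def _discover_builtin_definitions() -> Tuple[Dict[str, Any], ...]:
    """Hypothetical stand-in for the expensive discovery + serialization step."""
    return ({"node_type": "http_request"}, {"node_type": "merge"})


@lru_cache(maxsize=1)
def get_builtin_definitions() -> Tuple[Dict[str, Any], ...]:
    # Runs discovery once; later calls return the same cached tuple.
    # A tuple is used so callers cannot append to or reorder the cached result.
    return _discover_builtin_definitions()
```

Tests that register extra nodes can call `get_builtin_definitions.cache_clear()` to force a fresh discovery.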
7. Frontend Module-Level Cache Is Not Per-Org
File: web/src/components/flow-builder/node-registry-client.ts
Lines: 23-25
cachedNodes is a module-level singleton. In a multi-tenant SaaS where different orgs have different custom nodes, switching org context in the browser, or sharing the module across requests in SSR scenarios, can serve cached data that belongs to a different org.
Recommendation: Key the cache by org_id:
const cache = new Map<string, { nodes: NodeTypeDefinition[]; timestamp: number }>();
8. validate_connection_endpoint Duplicates Python SDK Logic
File: src/dagy_api/nodes/router.py
Lines: 421-495
The connection validation endpoint reimplements the type compatibility and cardinality checks using dict-based logic instead of delegating to the well-tested ConnectorDef and validate_connection() from the SDK. This creates two divergent validation paths that can drift.
Recommendation: Deserialize the connector dicts into ConnectorDef objects using ConnectorDef.from_dict() and call validate_connection() directly.
9. NodeDefinitionModel Lacks Conditional Writes
File: src/dagy_api/nodes/router.py
Lines: 284-347, 350-395
register_custom_node uses a check-then-write pattern: it queries for existence, then saves. Under concurrent requests, two callers can both pass the existence check; because node_type is the hash key, the second save then silently overwrites the first registration instead of being rejected.
Similarly, update_custom_node reads then writes without a condition expression, enabling lost-update races.
Recommendation: Use DynamoDB conditional writes:
item.save(condition=NodeDefinitionModel.node_type.does_not_exist()) # for create
item.save(condition=NodeDefinitionModel.updated_at == expected_version) # for update
10. No Version History Written on Updates
File: src/dagy_api/nodes/router.py
Lines: 350-395
NodeVersionModel exists for version history, but update_custom_node never writes a version record. The version history table is effectively dead code.
Recommendation: Before updating the main record, snapshot the current state into NodeVersionModel:
NodeVersionModel(
    node_type=node_type,
    version=item.version,
    org_id=item.org_id,
    definition_snapshot=item.to_api_dict(),
    created_at=now,
).save()
11. ConnectorDef.metadata Uses Mutable Default on Frozen Dataclass
File: src/dagy/nodes/connectors.py
Lines: 77, 102
ConnectorDef is frozen=True but metadata: Dict[str, Any] uses a mutable dict as the default factory. While the dataclass itself is frozen (can't reassign self.metadata), the dict's contents can still be mutated (connector.metadata["key"] = "value"), breaking the immutability contract.
Recommendation: Use types.MappingProxyType for the metadata field, or document that metadata should not be mutated after construction.
12. NodeMetadata.tags Uses Mutable List on Frozen Dataclass
File: src/dagy/nodes/metadata.py
Lines: 149, 180
Same issue as finding 11. tags: List[str] can be mutated after construction despite the frozen=True decorator.
Recommendation: Use tuple[str, ...] instead of List[str] for tags.
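Findings 11 and 12 can be addressed with the same pattern. A minimal sketch, assuming a simplified connector shape; `FrozenConnector` and its fields are illustrative only and do not mirror the real ConnectorDef or NodeMetadata.

```python
from dataclasses import dataclass, field
from types import MappingProxyType
from typing import Any, Mapping, Tuple


@dataclass(frozen=True)
class FrozenConnector:
    """Illustrative only; not the real ConnectorDef or NodeMetadata."""

    id: str
    tags: Tuple[str, ...] = ()
    metadata: Mapping[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        # frozen=True blocks normal assignment, so use object.__setattr__
        # to swap in a read-only view over a private copy of the dict.
        object.__setattr__(self, "metadata", MappingProxyType(dict(self.metadata)))
        # Coerce any iterable of tags (e.g. a list) into an immutable tuple.
        object.__setattr__(self, "tags", tuple(self.tags))
```

Copying the dict before wrapping it matters: a MappingProxyType over the caller's own dict would still change whenever the caller mutates that dict.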
P2 — Medium (Should Fix Before Scale)
13. on_cancel Is Synchronous While All Other Hooks Are Async
File: src/dagy/nodes/base.py
Lines: 295-301
on_cancel is defined as def on_cancel(self, context) (sync), while pre_execute, post_execute, on_error, and validate_config are all async def. This inconsistency means a node that needs to do async cleanup on cancellation (e.g., closing database connections, aborting HTTP requests) cannot await inside on_cancel.
Recommendation: Change to async def on_cancel(self, context: ExecutionContext) -> None.
14. State Transition Validation Missing
File: src/dagy/nodes/execution.py
Lines: 174-201
State transitions are unguarded. Any code can call mark_completed() after mark_failed(), or mark_running() after mark_completed(). The NodeState enum defines 9 states but there's no state machine enforcing valid transitions.
Recommendation: Add a guard to each mark_* method:
_VALID_TRANSITIONS = {
    NodeState.PENDING: {NodeState.RUNNING, NodeState.SKIPPED, NodeState.QUEUED},
    NodeState.QUEUED: {NodeState.RUNNING, NodeState.CANCELLED},
    NodeState.RUNNING: {NodeState.COMPLETED, NodeState.FAILED, NodeState.TIMED_OUT, NodeState.CANCELLED},
    # ...
}
def _transition_to(self, new_state: NodeState) -> None:
    # States missing from the table have no valid outgoing transitions.
    if new_state not in _VALID_TRANSITIONS.get(self._state, set()):
        raise ValueError(f"Invalid transition: {self._state} -> {new_state}")
    self._state = new_state
15. ExecutionContext.secrets Stored in Plain Text
File: src/dagy/nodes/execution.py
Lines: 71
secrets: Dict[str, str] stores resolved secret values as plain strings in the dataclass. These appear in repr(), debugger inspections, and potentially in serialized logs.
Recommendation: Wrap secret values in a SecretStr class that masks __repr__ and __str__, similar to Pydantic's SecretStr.
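A minimal sketch of such a wrapper. This is a standalone illustration rather than Pydantic's implementation; the `get_secret_value()` name is borrowed from Pydantic's SecretStr so call sites would look familiar.

```python
class SecretStr:
    """Minimal masking wrapper; a sketch, not Pydantic's implementation."""

    __slots__ = ("_value",)

    def __init__(self, value: str) -> None:
        self._value = value

    def get_secret_value(self) -> str:
        # The only way to read the underlying value, which keeps
        # every access point easy to find in code review.
        return self._value

    def __repr__(self) -> str:
        return "SecretStr('**********')"

    def __str__(self) -> str:
        return "**********"
```

Because containers call repr() on their elements, a `Dict[str, SecretStr]` is masked in repr() and in log lines that format the whole dict.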
16. DynamoDB GSI Uses read_capacity_units = 1 with PAY_PER_REQUEST
File: src/dagy_api/nodes/models.py
Lines: 30-33, 42-45, 54-57
The GSI Meta classes set read_capacity_units = 1 and write_capacity_units = 1, but the main table uses billing_mode = "PAY_PER_REQUEST" (on-demand). When the table is on-demand, GSI capacity settings are ignored by DynamoDB, so the values have no effect, but the inconsistency is confusing. More importantly, the CDK stack creates the table as PAY_PER_REQUEST while the PynamoDB model declares explicit capacities; the two definitions should agree.
Recommendation: Remove read_capacity_units and write_capacity_units from GSI Meta classes to match the on-demand billing mode.
17. No Pagination on List Endpoints
File: src/dagy_api/nodes/router.py
Lines: 184-219
list_all_nodes returns all nodes in a single response. With custom nodes from many organizations, this could grow unbounded.
Recommendation: Add limit and offset (or cursor-based) pagination parameters and return a next_cursor in the response.
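A rough sketch of cursor-based pagination over an in-memory list. The `paginate` helper, the `MAX_LIMIT` cap, and the cursor encoding are all assumptions for illustration; with DynamoDB the cursor would more naturally wrap the LastEvaluatedKey returned by the query instead of an offset.

```python
import base64
import json
from typing import List, Optional, Sequence, Tuple, TypeVar

T = TypeVar("T")
MAX_LIMIT = 100  # assumed upper bound on page size


def _encode_cursor(offset: int) -> str:
    # Opaque, URL-safe token so clients do not depend on the offset format.
    raw = json.dumps({"offset": offset}).encode("utf-8")
    return base64.urlsafe_b64encode(raw).decode("ascii")


def _decode_cursor(cursor: Optional[str]) -> int:
    if cursor is None:
        return 0
    raw = base64.urlsafe_b64decode(cursor.encode("ascii"))
    return max(0, int(json.loads(raw)["offset"]))


def paginate(
    items: Sequence[T], cursor: Optional[str] = None, limit: int = 50
) -> Tuple[List[T], Optional[str]]:
    """Return one page of items and the cursor for the next page (or None)."""
    limit = max(1, min(limit, MAX_LIMIT))
    start = _decode_cursor(cursor)
    page = list(items[start:start + limit])
    end = start + len(page)
    next_cursor = _encode_cursor(end) if end < len(items) else None
    return page, next_cursor
```

A client keeps requesting pages with the returned next_cursor until it comes back as None.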
18. Frontend getGroupedNodeDefinitions Has Redundant Category Check
File: web/src/components/flow-builder/node-registry-client.ts
Lines: 209-216
The function checks if NODE_CATEGORIES already contains control_flow before adding it. Since NODE_CATEGORIES was already updated to include control_flow (the duplicate key fix), this conditional is dead code.
Recommendation: Remove the conditional spread. Just use categories: NODE_CATEGORIES.
19. Data Type Compatibility Is Duplicated in Three Places
Files: connectors.py:54-64, types.ts:192-202, router.py:465-481
The implicit type coercion matrix is defined independently in Python (DataType.is_compatible), TypeScript (DATA_TYPE_COMPAT), and re-implemented inline in the API validation endpoint. Any addition or change to the coercion rules requires synchronized updates in three files.
Recommendation: Make the Python implementation authoritative. The frontend should fetch the compatibility matrix from the API (add a GET /nodes/type-compatibility endpoint) rather than maintaining its own copy. At minimum, add a comment in each file pointing to the canonical definition.
P3 — Low (Nice to Have / Tech Debt)
20. Built-in Nodes Have Placeholder Execute Methods
Files: All files in src/dagy/nodes/builtin/
Every built-in node's execute() method returns placeholder data (e.g., {"_placeholder": True}). While these serve as scaffolding, having 27 nodes that all silently return fake data is risky — a developer testing flows could mistake placeholder output for real results.
Recommendation: Add a warning log: context.log_warning("Using placeholder implementation — not connected to real service") in each placeholder execute method. Alternatively, raise NotImplementedError and have the execution engine handle it gracefully.
21. FlowNode._registered_subclasses Is a Class Variable Shared Across All Subclasses
File: src/dagy/nodes/base.py
Lines: 119
_registered_subclasses: ClassVar[Dict[str, Type["FlowNode"]]] = {} is defined on the base class. Because it's a mutable class variable, it's shared across all subclasses — which is the intent, but it means any subclass can inadvertently clear it (MyNode._registered_subclasses.clear()).
Recommendation: Low risk in practice. Consider moving the mapping to a private module-level _registered_subclasses dict so that subclasses no longer expose it as an attribute.
22. No Audit Logging on Node CRUD Operations
File: src/dagy_api/nodes/router.py
The register_custom_node, update_custom_node, and delete_custom_node endpoints only emit Python logger.info calls. There's no structured audit trail (who created/modified/deleted what, when, from what IP).
Recommendation: Integrate with the existing audit logging infrastructure mentioned in the docs (the platform already has audit logging on all mutations). Emit structured audit events with user_id, action, resource, and timestamp.
23. Missing __init__.py for src/dagy_api/nodes/
File: src/dagy_api/nodes/
The nodes directory under dagy_api may need an __init__.py for proper Python packaging. Depending on the project's use of implicit namespace packages, this could cause import issues.
Recommendation: Verify the import works without it; if not, add an empty __init__.py.
24. ConnectorTooltip Position Hardcoded to Top/Bottom Only
File: web/src/components/flow-builder/flow-node.tsx
Lines: 107-146
The tooltip only handles "top" and "bottom" positions, but ConnectorDef.position_hint supports "left" and "right" as well. Handles placed on the left/right edges will have incorrectly positioned tooltips.
Recommendation: Add "left" and "right" positioning cases to the tooltip component.
25. No from_dict on ExecutionResult or ExecutionContext
File: src/dagy/nodes/execution.py
ExecutionResult has to_dict() but no from_dict() classmethod. If results need to be deserialized (e.g., from a serialized execution log or a resumed run), there's no standard way to reconstruct the object.
Recommendation: Add from_dict() classmethods to both ExecutionResult and ExecutionContext.
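A minimal sketch of the round trip on a simplified dataclass. `ResultSketch` and its fields are hypothetical; the real ExecutionResult has its own fields and an existing to_dict().

```python
from dataclasses import asdict, dataclass, field, fields
from typing import Any, Dict, Optional


@dataclass
class ResultSketch:
    """Hypothetical stand-in for ExecutionResult; real fields may differ."""

    success: bool
    outputs: Dict[str, Any] = field(default_factory=dict)
    error: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "ResultSketch":
        # Drop unknown keys so an older reader tolerates payloads
        # written by a newer version of the class.
        known = {f.name for f in fields(cls)}
        return cls(**{k: v for k, v in data.items() if k in known})
```

Ignoring unknown keys is a design choice for forward compatibility; a stricter variant could raise on them instead.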
26. Frontend DagEdge Extension May Not Survive React Flow Serialization
File: web/src/components/flow-builder/types.ts
Lines: 134-138
DagEdge extends Edge with sourceConnectorId and targetConnectorId as top-level properties. React Flow stores edge data in the data property, not as top-level fields. These custom fields may be silently dropped during React Flow's internal state management.
Recommendation: Move sourceConnectorId and targetConnectorId into the data property to survive React Flow's serialization/deserialization cycle:
export type DagEdge = Edge & {
  data?: {
    sourceConnectorId?: string;
    targetConnectorId?: string;
  };
};
27. NodeRegistry.search() Calls list_all() Which Serializes All Nodes
File: src/dagy/nodes/registry.py
Lines: 171-187
Every search call serializes all registered nodes into dicts just to do string matching. For a registry with many nodes, this is wasteful.
Recommendation: Search against the _metadata_cache directly instead of serializing:
def search(self, query: str) -> List[Dict[str, Any]]:
    q = query.lower()
    matching_types = [
        nt for nt, meta in self._metadata_cache.items()
        if q in f"{meta.node_type} {meta.label} {meta.description} {' '.join(meta.tags)}".lower()
    ]
    return [self._nodes[nt].to_definition_dict() for nt in matching_types]
Requirements Gap Analysis
| Requirement | Status | Notes |
|---|---|---|
| FlowNode Abstract Base Class | ✅ Complete | Clean design with lifecycle hooks |
| Connector Model with typed data flow | ✅ Complete | 15 data types, implicit coercion, cardinality |
| Connection Validation | ✅ Complete | Python + API + Frontend triple validation |
| Execution Architecture with lifecycle | ⚠️ Partial | No timeout enforcement, no output validation |
| Node Registry with auto-discovery | ✅ Complete | __init_subclass__ + package discovery |
| DynamoDB Persistence | ✅ Complete | 2 tables, 3 GSIs, but no conditional writes |
| REST API (CRUD + validate) | ⚠️ Partial | 10 endpoints work, but no auth, no pagination |
| Frontend Integration | ✅ Complete | React Flow handles, tooltips, validation |
| RBAC / Multi-tenant Safety | ❌ Missing | No auth on any endpoint |
| Audit Logging | ❌ Missing | Only Python logger.info |
| Version History | ⚠️ Partial | Model exists, never written to |
| Error Channel Support | ✅ Complete | DataType.ERROR + error_out connectors |
| Control Flow Nodes | ✅ Complete | Conditional branch, merge, wait |
Enterprise Readiness Assessment
| Dimension | Rating | Key Gap |
|---|---|---|
| Security | 🔴 Not Ready | No authentication on any endpoint; source_code field is latent RCE |
| Auditability | 🔴 Not Ready | No structured audit trail for CRUD operations |
| Scalability | 🟡 Needs Work | No pagination, per-request rediscovery, no caching |
| Maintainability | 🟢 Good | Clean abstractions, good separation of concerns, comprehensive docs |
| Observability | 🟡 Needs Work | Execution logging exists but no metrics emission |
| Extensibility | 🟢 Good | __init_subclass__ pattern, config schema, lifecycle hooks |
| Reliability | 🟡 Needs Work | No timeout enforcement, no conditional writes, no state machine |
| Multi-tenant Safety | 🔴 Not Ready | Org isolation is cosmetic (query param, not JWT claim) |
Recommended Fix Order
- Wire authentication to all /nodes/ endpoints (P0-1)
- Add timeout enforcement in FlowNode.run() (P0-2)
- Remove or sandbox source_code field (P0-3)
- Add thread safety to NodeRegistry (P0-4)
- Cache built-in definitions in the API layer (P1-6)
- Add output validation post-execution (P1-5)
- Use conditional writes in DynamoDB (P1-9)
- Write version history on updates (P1-10)
- Key frontend cache by org_id (P1-7)
- Unify validation logic — API should delegate to SDK (P1-8)