ECS Fargate Execution Framework Architecture

Overview

The Dagy ECS Fargate execution framework enables distributed execution of DAG workloads on AWS ECS Fargate, suitable for long-running workloads that exceed Lambda's 15-minute execution limit or require more than 10 GB of memory. The framework follows a control-plane pattern where Lambda functions coordinate DAG launches while containerized ECS Fargate tasks execute the actual work.

The architecture is built on three core components:

  1. Control Plane (Lambda): The DagLauncher class in src/dagy_api/ecs/launcher.py accepts launch requests, resolves dependencies, creates run records, and submits work to ECS.
  2. Data Plane (ECS Fargate): The dagy-worker container running src/dagy_api/ecs/worker_main.py executes DAGs, manages artifacts, and reports status back to DynamoDB.
  3. Status Reconciler (Lambda): The reconcile_ecs_runs() function in src/dagy_api/ecs/reconciler.py periodically reconciles stale task statuses.

Execution Framework Design

Control-Plane Pattern

The ECS execution framework implements a Lambda-based control plane that decouples task submission from task execution. This pattern provides several advantages over direct execution:

The control plane is stateless and horizontally scalable. Multiple Lambda instances can handle simultaneous launch requests without coordination. The DagLauncher class in src/dagy_api/ecs/launcher.py is instantiated fresh for each request, reading cluster configuration from environment variables set by the CDK stack.

Task submission is asynchronous and resilient. Once a task is submitted to ECS, the control plane immediately returns a launch result containing the task ARN. If the ECS worker crashes or hits an infrastructure failure, the separate status reconciler detects the stopped task and corrects the run record.

The control plane is responsible for resolving all static configuration before task submission. It queries DynamoDB to resolve the flow definition, validates artifacts exist in S3, resolves dependency packages, and creates the initial run record. This ensures the ECS worker only needs to download and execute, without needing to validate the configuration.

ECS Fargate Runtime Model

ECS Fargate is a serverless container compute service that manages the underlying EC2 infrastructure. Dagy uses Fargate with the awsvpc network mode, which attaches each task to a VPC subnet with an ENI. This provides:

Fine-grained security control through security groups. Each Dagy task can be isolated to specific security groups with restricted ingress/egress rules.

Predictable networking. Tasks can be deployed to private subnets with no public IP, accessing S3 and DynamoDB through VPC endpoints.

Flexible resource sizing. Fargate supports a matrix of vCPU and memory combinations from 256 CPU units / 512 MB up to 16384 CPU units / 120 GB.

Ephemeral storage. Each task gets 20 GiB of ephemeral storage by default, configurable from 21 to 200 GiB, suitable for intermediate artifacts and unpacked dependencies under /tmp.

The CDK stack in infrastructure/dagy_stack.py provisions the ECS cluster with both Fargate and Fargate Spot capacity providers. Fargate Spot offers up to 70% cost savings at the risk of interruption with a 2-minute notice period, making it ideal for batch workloads that can tolerate occasional failures and reruns.

Lambda-to-ECS Launch Flow

The launch flow proceeds through these steps:

First, the control plane validates the launch request and resolves required resources. The DagLauncher.launch() method in src/dagy_api/ecs/launcher.py accepts a DagLaunchRequest containing the flow name, version, parameters, and optional resource overrides.

Second, the flow definition is resolved from DynamoDB via the _resolve_flow() method. For "latest" version requests, a query returns the most recent version scoped to the org_id. The method validates that the flow has an artifact_s3_uri pointing to a packaged artifact in S3.

Third, dependency packages are resolved via _resolve_dep_packages(). For named deployments, the method queries the DeploymentModel to extract dep_package_slugs, then queries DepPackageModel to map those slugs to S3 URIs. This enables different deployments to use different sets of dependencies.

Fourth, a run record is created in DynamoDB with status QUEUED. The run_id is generated as a UUID. Additional metadata such as run_slug, trigger_source, and triggered_by is stored for observability. A FlowSpec is built from the artifact's flow_spec.json and serialized.

Fifth, the EcsBackend.submit_run() method in src/dagy_api/backends/ecs.py takes over. It resolves resource sizing based on workload profile or explicit overrides, registers an ECS task definition, and calls ecs.run_task() with the cluster ARN.

Sixth, the control plane returns immediately with a DagLaunchResult containing success, run_id, and task_arn for caller tracking. It does not wait for the task to start.

Seventh, ECS provisions and launches the task asynchronously. The task moves through PROVISIONING, then PENDING, then RUNNING status as resources are allocated and the container image is pulled.

Eighth, the ECS worker container starts and begins execution. It updates the run record to RUNNING with a started_at timestamp.
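
The ordering of the control-plane steps can be sketched in a few lines. This is a minimal illustration under stated assumptions, not the actual DagLauncher: the launch function, the flows and runs dictionaries, the error field, and the submit callable are hypothetical stand-ins for the DynamoDB repository and EcsBackend.submit_run(), and "latest" resolution is reduced to a plain dictionary lookup.

```python
import uuid
from dataclasses import dataclass, field
from typing import Callable, Dict, Optional, Tuple


@dataclass
class DagLaunchRequest:
    # Field names follow the prose above; the real class may differ.
    flow_name: str
    org_id: str
    flow_version: str = "latest"
    parameters: Dict[str, str] = field(default_factory=dict)


@dataclass
class DagLaunchResult:
    success: bool
    run_id: Optional[str] = None
    task_arn: Optional[str] = None
    error: Optional[str] = None  # hypothetical field, for illustration only


def launch(
    request: DagLaunchRequest,
    flows: Dict[Tuple[str, str, str], dict],
    runs: Dict[str, dict],
    submit: Callable[[str, str, Dict[str, str]], str],
) -> DagLaunchResult:
    """Resolve the flow, record a QUEUED run, submit it, and return at once."""
    flow = flows.get((request.org_id, request.flow_name, request.flow_version))
    if flow is None or not flow.get("artifact_s3_uri"):
        return DagLaunchResult(success=False, error="flow or artifact not found")

    run_id = str(uuid.uuid4())
    runs[run_id] = {
        "status": "QUEUED",
        "org_id": request.org_id,
        "flow_name": request.flow_name,
    }

    # Stand-in for EcsBackend.submit_run(); assumed to return the task ARN.
    task_arn = submit(run_id, flow["artifact_s3_uri"], request.parameters)
    runs[run_id]["external_id"] = task_arn

    # No waiting here: the worker moves the record to RUNNING on its own.
    return DagLaunchResult(success=True, run_id=run_id, task_arn=task_arn)
```

The run record is written before submission so that a task which starts quickly always finds its record in place.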

DAG Execution Lifecycle

The worker implements a structured lifecycle for each DAG execution, with clear phase transitions and error handling.

Startup Phase: The worker begins in main() of src/dagy_api/ecs/worker_main.py. It sets up structured JSON logging via StructuredFormatter, parses configuration from environment variables via WorkerConfig.from_env(), and validates required inputs. If validation fails (missing run_id, artifact URI, etc.), the worker exits with code EXIT_INVALID_INPUT (2).

Infrastructure Phase: The worker initializes the DynamoDB repository and updates the run status to RUNNING with the current timestamp. This signals to clients and the reconciler that the task has started. If DynamoDB is unreachable, the worker exits with code EXIT_INFRA_ERROR (3).

Artifact Download Phase: The worker downloads the flow artifact from S3 using _download_artifact(). The artifact is a ZIP file containing flow_spec.json, metadata.json, and Python source files. The ZIP is extracted to a workspace directory with safe path validation to prevent zip-slip path traversal (the same class of vulnerability as CVE-2007-4559 in Python's tarfile module).
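
One way to perform the safe path validation is to resolve every member path before extracting and reject anything that lands outside the workspace. The sketch below is illustrative; safe_extract is a hypothetical helper name, not the worker's actual function. Python's ZipFile.extractall already strips ".." components on its own, so the explicit check mainly serves to fail loudly on a suspicious archive rather than silently rewriting its paths.

```python
import io
import zipfile
from pathlib import Path


def safe_extract(zf: zipfile.ZipFile, dest: Path) -> None:
    """Extract every member, refusing any path that would escape dest."""
    root = dest.resolve()
    for member in zf.infolist():
        target = (root / member.filename).resolve()
        # A member such as "../../etc/passwd" resolves to a path outside root.
        if target != root and root not in target.parents:
            raise ValueError(f"unsafe path in archive: {member.filename}")
    # Only reached when every member stays inside the workspace.
    zf.extractall(root)
```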

Dependency Install Phase: The worker downloads and installs dependency packages via _install_dep_packages(). Each package URI is downloaded from S3, extracted, and any requirements.txt is processed with pip to install packages into a site-packages directory. The site-packages are added to sys.path for import resolution.

FlowSpec Load Phase: The worker loads the FlowSpec from flow_spec.json via _load_flow_spec(). It deserializes task definitions and task run specifications from JSON. It also attempts to load the Python source module and flow object for dynamic rebuild support.

DAG Execution Phase: The worker executes the DAG via _execute_dag() using the LocalExecutor with ThreadPoolExecutor for parallelism. Task runs are scheduled as their dependencies complete. For each task run, the worker resolves the task callable, invokes the executor, and stores results in the DynamoDB task_runs table. If a task fails with no retries remaining, every task downstream of it is skipped.
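
The scheduling rule (run a task once all of its upstream tasks succeed, skip it once any upstream task has failed or been skipped) can be sketched with the standard library. This is an assumption-laden illustration rather than the LocalExecutor itself: run_dag and its arguments are hypothetical, and retries and DynamoDB writes are left out.

```python
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from typing import Callable, Dict, List


def run_dag(
    tasks: Dict[str, Callable[[], object]],
    deps: Dict[str, List[str]],
    max_workers: int = 4,
) -> Dict[str, str]:
    """Run tasks in dependency order; skip anything downstream of a failure."""
    status: Dict[str, str] = {}
    pending = set(tasks)
    running: Dict[Future, str] = {}

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while pending or running:
            progressed = False
            for name in sorted(pending):
                upstream = deps.get(name, [])
                if any(status.get(d) in ("FAILED", "SKIPPED") for d in upstream):
                    status[name] = "SKIPPED"
                    pending.discard(name)
                    progressed = True
                elif all(status.get(d) == "SUCCEEDED" for d in upstream):
                    running[pool.submit(tasks[name])] = name
                    pending.discard(name)
                    progressed = True
            if running:
                # Block until at least one in-flight task finishes.
                done, _ = wait(running, return_when=FIRST_COMPLETED)
                for fut in done:
                    name = running.pop(fut)
                    status[name] = "FAILED" if fut.exception() else "SUCCEEDED"
            elif not progressed:
                raise ValueError("dependency cycle or unknown dependency")
    return status
```

With tasks extract, transform (depends on extract), and load (depends on transform), a failure in transform leaves load as SKIPPED while unrelated branches still run.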

Status Reporting Phase: Upon successful completion, the worker updates the run status to SUCCEEDED with completed_at timestamp and records usage metrics. Upon failure, it updates the status to FAILED with an error_message (truncated to 2048 chars) and uploads the full exception traceback to S3 via _capture_exception_to_s3().

Cleanup Phase: The workspace directory is removed in the finally block, ensuring temporary files don't accumulate.

Exit codes communicate the failure mode: 0 for success, 1 for execution failures (task errors), 2 for invalid input, 3 for infrastructure errors.
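
The mapping from failure mode to exit code can be expressed as a small wrapper. The numeric values come from the text above; the ExitCode enum, the two exception classes, and run_worker are hypothetical names chosen for this sketch.

```python
from enum import IntEnum
from typing import Callable


class ExitCode(IntEnum):
    SUCCESS = 0
    EXECUTION_FAILED = 1
    INVALID_INPUT = 2  # EXIT_INVALID_INPUT in the text above
    INFRA_ERROR = 3    # EXIT_INFRA_ERROR in the text above


class InvalidInputError(Exception):
    """Hypothetical: raised when required configuration is missing."""


class InfraError(Exception):
    """Hypothetical: raised when DynamoDB or S3 cannot be reached."""


def run_worker(body: Callable[[], None]) -> int:
    """Run body() and translate the outcome into a process exit code."""
    try:
        body()
    except InvalidInputError:
        return ExitCode.INVALID_INPUT
    except InfraError:
        return ExitCode.INFRA_ERROR
    except Exception:
        # Any other error is treated as a task or DAG execution failure.
        return ExitCode.EXECUTION_FAILED
    return ExitCode.SUCCESS
```

A real entry point would pass the returned value to sys.exit() so that ECS records it as the container exit code.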

Task Definition Model and Multi-Tenancy

ECS task definitions are registered dynamically for each run in _register_task_definition() of src/dagy_api/backends/ecs.py. The task definition specifies:

The container image: Always the dagy-worker image pushed to ECR. Tags allow for rollout control (dagy-worker:latest vs dagy-worker:v1.2.3).

Environment variables: All runtime configuration is passed as environment variables, including run_id, flow_name, artifact_s3_uri, org_id, and parameters. This supports the worker's stateless design.

IAM roles: The task execution role (ecs-exec-role) is used by ECS to pull the image and inject secrets. The task role (ecs-task-role) is assumed by the container for S3 and DynamoDB access.

Resource limits: cpu and memory are specified per Fargate compatibility matrix (e.g., 1024 CPU / 2048 MB). Ephemeral storage can be overridden (21-200 GiB).

Logging configuration: The awslogs driver sends stdout/stderr to the /ecs/dagy-worker log group in CloudWatch Logs, with the run_id as the stream prefix.

Secrets injection: Optional secret references via valueFrom point to Secrets Manager ARNs, injecting sensitive data without exposing it in logs.
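
The pieces listed above map onto the keyword arguments of the ECS RegisterTaskDefinition API. The sketch below only builds the argument dictionary and makes no AWS call. The function name build_task_definition, the family naming scheme, the container name, and the default region are assumptions made for illustration; the real _register_task_definition() may assemble this differently.

```python
from typing import Dict


def build_task_definition(
    run_id: str,
    image: str,
    env: Dict[str, str],
    secrets: Dict[str, str],
    cpu: int,
    memory: int,
    exec_role_arn: str,
    task_role_arn: str,
    region: str = "us-east-1",
    ephemeral_gib: int = 21,
) -> dict:
    """Return keyword arguments shaped for ecs.register_task_definition()."""
    container = {
        "name": "dagy-worker",
        "image": image,
        "essential": True,
        "environment": [{"name": k, "value": v} for k, v in sorted(env.items())],
        # Only the ARN is stored here; ECS resolves the value at container start.
        "secrets": [{"name": k, "valueFrom": arn} for k, arn in sorted(secrets.items())],
        "logConfiguration": {
            "logDriver": "awslogs",
            "options": {
                "awslogs-group": "/ecs/dagy-worker",
                "awslogs-region": region,
                "awslogs-stream-prefix": run_id,
            },
        },
    }
    return {
        "family": f"dagy-worker-{run_id}",  # hypothetical naming scheme
        "requiresCompatibilities": ["FARGATE"],
        "networkMode": "awsvpc",
        "cpu": str(cpu),  # the ECS API expects task-level sizes as strings
        "memory": str(memory),
        "executionRoleArn": exec_role_arn,
        "taskRoleArn": task_role_arn,
        "ephemeralStorage": {"sizeInGiB": ephemeral_gib},
        "containerDefinitions": [container],
    }
```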

Multi-tenancy is enforced through org_id isolation. Each run is tagged with its org_id, which is passed to the worker via environment variable. The worker uses org_id as a partition key for DynamoDB queries and S3 object paths. The ECS worker runs with fine-grained IAM permissions that limit S3 access to specific org_id prefixes, preventing cross-tenant data leakage.

IAM Model and Role Separation

The framework uses three distinct IAM roles for proper privilege isolation:

DAG Launcher Role (dag-launcher-role)

This role is assumed by Lambda functions that invoke DagLauncher. It has limited permissions:

  • ecs:DescribeTaskDefinition, ecs:RegisterTaskDefinition, ecs:RunTask (to launch tasks)
  • dynamodb:PutItem, dynamodb:GetItem, dynamodb:UpdateItem (to create and update run records)
  • s3:GetObject (to read flow artifacts and validate S3 URIs)
  • logs:CreateLogStream, logs:PutLogEvents (to write structured logs)

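The launcher role does NOT have permissions to assume other roles or modify IAM policies. This prevents privilege escalation. Because ecs:RegisterTaskDefinition and ecs:RunTask reference the execution and task roles, the launcher also needs iam:PassRole, which should be restricted to exactly those two role ARNs.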

ECS Task Execution Role (ecs-exec-role)

This role is used by ECS itself (not the container) to perform actions on behalf of the task:

  • ecr:GetAuthorizationToken, ecr:BatchGetImage, ecr:GetDownloadUrlForLayer (to pull the dagy-worker image)
  • logs:CreateLogGroup, logs:CreateLogStream, logs:PutLogEvents (to emit logs to CloudWatch Logs)
  • secretsmanager:GetSecretValue (to inject secrets at runtime)

The execution role is scoped to the specific log group /ecs/dagy-worker and the ECR repository containing dagy-worker images.

ECS Task Runtime Role (ecs-task-role)

This role is assumed by the container process (worker_main.py) and has permissions to access data:

  • s3:GetObject (on the {org_id}/{run_id}/* prefix, to download artifacts and dependency packages)
  • s3:PutObject (on the exceptions/{org_id}/{run_id}/* prefix of the traces bucket, to upload exception traces)
  • dynamodb:Query, dynamodb:GetItem, dynamodb:PutItem, dynamodb:UpdateItem (to read flows, create/update run records and task runs)
  • cloudwatch:PutMetricData (optional, for custom metrics)

The role uses resource conditions to scope S3 access to objects with the org_id prefix, enforcing tenant isolation at the IAM level.
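
As a rough illustration of prefix scoping, the policy document below restricts reads and writes to a single tenant's prefixes by way of resource ARNs. It is one possible shape, not the policy the CDK stack actually generates: the function name, the statement IDs, and the bucket layout (taken from the S3 section of this document) are assumptions.

```python
import json


def task_role_s3_policy(artifacts_bucket: str, traces_bucket: str, org_id: str) -> dict:
    """Build an IAM policy document limiting S3 access to one tenant's prefixes."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "ReadTenantArtifacts",
                "Effect": "Allow",
                "Action": "s3:GetObject",
                # Objects under another org_id prefix do not match this ARN.
                "Resource": f"arn:aws:s3:::{artifacts_bucket}/{org_id}/*",
            },
            {
                "Sid": "WriteTenantTraces",
                "Effect": "Allow",
                "Action": "s3:PutObject",
                "Resource": f"arn:aws:s3:::{traces_bucket}/exceptions/{org_id}/*",
            },
        ],
    }


# Example: render the policy for a hypothetical tenant.
policy_json = json.dumps(task_role_s3_policy("dagy-artifacts", "dagy-traces", "acme-corp"), indent=2)
```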

Network Design

VPC and Subnet Configuration

The ECS cluster is deployed to a VPC specified in CDK configuration. For production deployments, private subnets are recommended to prevent accidental public exposure. The dagy-worker tasks are launched in awsvpc mode, which creates an elastic network interface (ENI) in the specified subnet.

Security groups control inbound and outbound traffic. For ECS tasks, a typical configuration allows:

  • No inbound traffic (tasks are pull-based, not exposed as services)
  • Outbound HTTPS to S3, DynamoDB, and CloudWatch Logs (either via NAT Gateway or VPC endpoints)
  • Outbound to Secrets Manager and ECR if using secrets injection or custom images

Service Discovery and Load Balancing

Individual ECS tasks are not registered with service discovery because they are ephemeral compute resources. They do not listen on network ports for incoming requests. Instead, each task receives its work assignment through environment variables at launch and pushes status updates to DynamoDB.

If external clients need to trigger DAG launches, they invoke Lambda via API Gateway, which invokes DagLauncher. The launcher immediately returns the run_id and task_arn for polling.

Egress and S3/DynamoDB Access

Tasks access AWS services (S3, DynamoDB, CloudWatch Logs, Secrets Manager, and ECR) over HTTPS, either through VPC endpoints or through the services' public endpoints via a NAT Gateway.

If the VPC has no internet gateway and tasks are in private subnets, VPC endpoints must be configured for:

  • s3 (gateway or interface endpoint)
  • dynamodb (gateway or interface endpoint)
  • logs (interface endpoint)
  • secretsmanager (interface endpoint)
  • ecr.api and ecr.dkr (interface endpoints)

This enables private access to these services without egress through a NAT Gateway, which can reduce both cost and latency.

Configuration and Secrets Model

Environment Variables

The worker receives configuration entirely via environment variables set by the control plane. This list includes:

Required identifiers:

  • DAGY_RUN_ID: UUID of the execution
  • DAGY_FLOW_NAME: Name of the flow to execute
  • DAGY_FLOW_VERSION: Version of the flow (e.g., "1.0.0" or "latest")
  • DAGY_ARTIFACT_S3_URI: Full S3 URI (s3://bucket/key) of the flow artifact

Tenant isolation:

  • DAGY_ORG_ID: Organization/tenant ID for multi-tenancy

Runtime parameters:

  • DAGY_PARAMETERS: JSON-encoded dictionary of flow parameters

Observability:

  • DAGY_CORRELATION_ID: Request ID for distributed tracing
  • DAGY_ENVIRONMENT: Environment name (develop/staging/production)
  • DAGY_RUN_SLUG: Human-readable slug (flow-name-short-uuid)
  • DAGY_DEPLOYMENT_NAME: Deployment context

Dependencies:

  • DAGY_DEP_PACKAGE_URIS: Comma-separated S3 URIs of dependency packages

Execution tuning:

  • DAGY_MAX_WORKERS: Number of concurrent task threads (default 4)
  • DAGY_LOG_LEVEL: Logging verbosity (default INFO)

AWS resources:

  • DAGY_ARTIFACTS_BUCKET: S3 bucket for run artifacts
  • DAGY_TRACES_BUCKET: S3 bucket for exception traces
  • DAGY_FLOWS, DAGY_RUNS, DAGY_TASK_RUNS, DAGY_DEPLOYMENTS, DAGY_DEP_PACKAGES: DynamoDB table names

Secrets:

  • DAGY_SECRETS_KEY: Encryption key for Dagy-managed secrets (optional)
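
A configuration loader along the lines of WorkerConfig.from_env() could look like the sketch below. It covers only a subset of the variables and treats the four "Required identifiers" as mandatory; the field names, defaults, and error type are assumptions, and the real class may differ.

```python
import json
import os
from dataclasses import dataclass, field
from typing import Dict, List, Mapping, Optional


@dataclass
class WorkerConfig:
    run_id: str
    flow_name: str
    flow_version: str
    artifact_s3_uri: str
    org_id: str = ""
    parameters: Dict[str, object] = field(default_factory=dict)
    dep_package_uris: List[str] = field(default_factory=list)
    max_workers: int = 4
    log_level: str = "INFO"

    @classmethod
    def from_env(cls, env: Optional[Mapping[str, str]] = None) -> "WorkerConfig":
        """Parse DAGY_* variables; env defaults to the process environment."""
        env = os.environ if env is None else env
        required = ("DAGY_RUN_ID", "DAGY_FLOW_NAME", "DAGY_FLOW_VERSION", "DAGY_ARTIFACT_S3_URI")
        missing = [name for name in required if not env.get(name)]
        if missing:
            raise ValueError("missing required variables: " + ", ".join(missing))
        uris = env.get("DAGY_DEP_PACKAGE_URIS", "")
        return cls(
            run_id=env["DAGY_RUN_ID"],
            flow_name=env["DAGY_FLOW_NAME"],
            flow_version=env["DAGY_FLOW_VERSION"],
            artifact_s3_uri=env["DAGY_ARTIFACT_S3_URI"],
            org_id=env.get("DAGY_ORG_ID", ""),
            # DAGY_PARAMETERS is a JSON-encoded dictionary.
            parameters=json.loads(env.get("DAGY_PARAMETERS") or "{}"),
            # DAGY_DEP_PACKAGE_URIS is a comma-separated list of S3 URIs.
            dep_package_uris=[u.strip() for u in uris.split(",") if u.strip()],
            max_workers=int(env.get("DAGY_MAX_WORKERS", "4")),
            log_level=env.get("DAGY_LOG_LEVEL", "INFO"),
        )
```

Accepting an explicit mapping keeps the loader testable without mutating os.environ.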

Secrets Manager Injection

Sensitive runtime configuration (database passwords, API keys) can be injected from AWS Secrets Manager without storing the values in the task definition or exposing them in logs. The control plane can specify a secrets_env mapping:

{
    "DB_PASSWORD": "arn:aws:secretsmanager:us-east-1:123456789012:secret:prod/db-password",
    "API_KEY": "arn:aws:secretsmanager:us-east-1:123456789012:secret:prod/api-key"
}

These are passed to ECS as container secrets (valueFrom references), which ECS injects at container startup. The container receives the secret values in environment variables without them appearing in logs or task definition history.

The ECS task execution role must have secretsmanager:GetSecretValue permission on the ARNs.

Runtime Overrides

The DagLaunchRequest supports per-run resource overrides:

  • workload_profile: Named preset (small/medium/large/xlarge/heavy/max)
  • cpu: Explicit CPU (256, 512, 1024, ..., 16384)
  • memory: Explicit memory (512, 1024, ..., 122880)
  • ephemeral_storage_gib: Storage (21-200)
  • extra_env_vars: Additional environment variables

The backend validates that cpu/memory combinations are valid for Fargate and applies them to the task definition.
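
The kind of check the backend performs can be sketched against the Fargate task-size matrix. The table below reflects the CPU and memory combinations Fargate accepts as of this writing; the function name is_valid_fargate_size is hypothetical and is not the backend's actual validator.

```python
from typing import Dict, Tuple

# CPU units -> allowed memory values in MiB.
FARGATE_MEMORY: Dict[int, Tuple[int, ...]] = {
    256: (512, 1024, 2048),
    512: tuple(range(1024, 4096 + 1, 1024)),
    1024: tuple(range(2048, 8192 + 1, 1024)),
    2048: tuple(range(4096, 16384 + 1, 1024)),
    4096: tuple(range(8192, 30720 + 1, 1024)),
    8192: tuple(range(16384, 61440 + 1, 4096)),
    16384: tuple(range(32768, 122880 + 1, 8192)),
}


def is_valid_fargate_size(cpu: int, memory: int, ephemeral_storage_gib: int = 21) -> bool:
    """Return True when the combination is one Fargate would accept."""
    if memory not in FARGATE_MEMORY.get(cpu, ()):
        return False
    # An explicit ephemeral storage size must fall between 21 and 200 GiB.
    return 21 <= ephemeral_storage_gib <= 200
```

Rejecting an invalid pair in the control plane gives the caller an immediate error instead of a failed RegisterTaskDefinition call later in the flow.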

State and Artifact Persistence Model

DynamoDB Tables

The framework uses five DynamoDB tables for state storage:

DAGY_FLOWS: Stores flow definitions with schema:

  • Partition key: flow_name
  • Sort key: flow_version
  • Attributes: spec (JSON), artifact_s3_uri, org_id, created_at
  • Indexes: org_id_index for multi-tenant queries

DAGY_RUNS: Stores execution records with schema:

  • Partition key: run_id
  • Attributes: org_id, flow_name, flow_version, status, parameters, created_at, started_at, completed_at, error_message, external_id (task ARN)
  • Indexes: flow_name_index, status_runs_index (for reconciler queries)

DAGY_TASK_RUNS: Stores per-task execution records with schema:

  • Partition key: run_id
  • Sort key: task_run_id
  • Attributes: task_name, status, attempt, max_retries, start_time, end_time, error, executor

DAGY_DEPLOYMENTS: Stores deployment metadata with schema:

  • Partition key: deployment_name
  • Attributes: dep_package_slugs, environment, created_at

DAGY_DEP_PACKAGES: Stores dependency package references with schema:

  • Partition key: slug
  • Attributes: package_s3_uri, org_id, version, created_at

S3 Artifacts and Traces

Artifacts are stored in DAGY_ARTIFACTS_BUCKET under {org_id}/{run_id}/artifact.zip. The artifact is a ZIP file containing:

  • flow_spec.json: Serialized FlowSpec
  • metadata.json: Flow metadata (entrypoint, etc.)
  • *.py: Python source files
  • requirements.txt (optional): Dependencies

Exception traces are stored in DAGY_TRACES_BUCKET under exceptions/{org_id}/{run_id}/traceback.txt when a DAG execution fails.

Write Patterns

The control plane performs writes immediately upon launch request:

  1. PutItem into DAGY_RUNS with status QUEUED
  2. RunTask on ECS (asynchronous)
  3. Return run_id to caller

The worker performs writes as execution progresses:

  1. UpdateItem on DAGY_RUNS to status RUNNING with started_at
  2. PutItem into DAGY_TASK_RUNS for each task (as they start and complete)
  3. UpdateItem on DAGY_RUNS to status SUCCEEDED/FAILED with completed_at

The reconciler performs conditional writes:

  1. Query DAGY_RUNS for status=RUNNING and executor=ecs
  2. For each run, describe the ECS task
  3. If task is STOPPED, UpdateItem on DAGY_RUNS with new status

This pattern ensures the control plane never blocks on DAG execution: it writes the run record, submits the task, and returns, while the worker updates state asynchronously. The reconciler periodically detects stale states and corrects them.
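
The reconciler's decision logic can be sketched independently of AWS by injecting the lookups. In this illustration, describe stands in for an ECS DescribeTasks call and update for a conditional DynamoDB write; both callables, the function names, and the choice to treat an unknown task as FAILED are assumptions rather than the behavior of reconcile_ecs_runs().

```python
from typing import Callable, Iterable, Optional


def status_from_task(task: Optional[dict]) -> Optional[str]:
    """Map an ECS task description to a terminal run status, or None if still live."""
    if task is None:
        return "FAILED"  # assumption: a task ECS no longer knows about is lost
    if task.get("lastStatus") != "STOPPED":
        return None
    exit_codes = [c.get("exitCode") for c in task.get("containers", [])]
    ok = bool(exit_codes) and all(code == 0 for code in exit_codes)
    return "SUCCEEDED" if ok else "FAILED"


def reconcile(
    runs: Iterable[dict],
    describe: Callable[[str], Optional[dict]],
    update: Callable[[str, str], bool],
) -> int:
    """Correct stale RUNNING records; return how many were changed."""
    fixed = 0
    for run in runs:
        new_status = status_from_task(describe(run["external_id"]))
        # update() is expected to write only if the record is still RUNNING
        # (for example via a condition expression) and report whether it did.
        if new_status and update(run["run_id"], new_status):
            fixed += 1
    return fixed
```

Making the write conditional avoids overwriting a status that the worker itself managed to record between the query and the update.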

Observability Model

Structured JSON Logging

All logs are emitted as single-line JSON objects for easy parsing by CloudWatch Logs Insights. The StructuredFormatter in src/dagy_api/ecs/worker_main.py emits logs with fields such as:

{
  "timestamp": "2024-03-14T10:30:45.123Z",
  "level": "INFO",
  "logger": "dagy.ecs.worker",
  "message": "DAG execution starting",
  "event_type": "dag_execution_start",
  "run_id": "550e8400-e29b-41d4-a716-446655440000",
  "flow_name": "etl_pipeline",
  "org_id": "org-123",
  "correlation_id": "req-abc-123",
  "task_count": 3,
  "task_list": ["extract", "transform", "load"]
}

Custom event_type fields enable filtering and alerting on specific lifecycle events:

  • worker_start: Worker container initialization
  • validation_error: Configuration validation failure
  • artifact_download_start/complete: S3 artifact operations
  • dep_install_start/complete: Dependency package installation
  • dag_execution_start: DAG execution beginning
  • task_start/end: Individual task lifecycle
  • worker_complete/failed: Execution completion
  • exception_captured: Exception trace upload

CloudWatch Integration

Logs are collected in the /ecs/dagy-worker CloudWatch log group. A log stream is created per run_id for easy isolation. The control plane can read a run's logs using:

import boto3
logs = boto3.client("logs")
streams = logs.describe_log_streams(logGroupName="/ecs/dagy-worker", logStreamNamePrefix=run_id)["logStreams"]
events = logs.get_log_events(logGroupName="/ecs/dagy-worker", logStreamName=streams[0]["logStreamName"])["events"]

CloudWatch Logs Insights queries can aggregate logs across runs:

fields @timestamp, run_id, event_type, duration_seconds
| filter status = "FAILED"
| stats sum(duration_seconds) as total_time by flow_name

Correlation IDs and Distributed Tracing

Every launch request receives a correlation_id (UUID or custom value). This ID is:

  • Stored in the run record as correlation_id
  • Passed to the worker via DAGY_CORRELATION_ID environment variable
  • Included in every log line emitted by the worker
  • Propagated to task_run records for cross-run correlation

For batch launches (launch_batch), a parent correlation_id is generated, and each run gets a child correlation_id (parent:0, parent:1, etc.) for hierarchical tracing.

External systems (orchestrators, monitoring dashboards) can query by correlation_id to see the full trace of a multi-step workflow.
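
The parent and child scheme for batch launches amounts to suffixing an index onto a shared parent ID. A minimal sketch, where batch_correlation_ids is a hypothetical helper name:

```python
import uuid
from typing import List, Optional


def batch_correlation_ids(count: int, parent: Optional[str] = None) -> List[str]:
    """Return child IDs of the form parent:0, parent:1, ... for a batch."""
    # A fresh UUID is generated when the caller supplies no parent ID.
    parent = parent or str(uuid.uuid4())
    return [f"{parent}:{i}" for i in range(count)]
```

Because every child ID starts with the parent ID, a prefix match on correlation_id retrieves the whole batch.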

Metrics and Alarms

The framework records usage events and aggregates for billing and monitoring. The worker calls _record_usage() with:

  • duration_seconds: Total execution time
  • task_count: Number of tasks in the flow
  • error_count: 0 for success, 1 for failure
  • estimated_cost_usd: Computed based on duration and resource profile

Custom CloudWatch metrics can be published for:

  • RunCompleted (Count, filtered by executor=ecs)
  • RunDuration (Seconds, by flow_name)
  • RunFailureRate (Percent, by environment)

Supported Execution Patterns

Direct Invocation

A Lambda function or SDK directly imports DagLauncher and calls launch():

from dagy_api.ecs.launcher import DagLauncher

launcher = DagLauncher()
result = launcher.launch(
    flow_name="my_etl",
    flow_version="1.0.0",
    org_id="acme-corp",
    parameters={"date": "2024-03-14"}
)
print(result.run_id, result.task_arn)

From the caller's perspective the call is synchronous, but it returns as soon as the task is submitted and does not wait for the DAG to finish.

Scheduled Execution

An EventBridge rule triggers a Lambda on a schedule, which invokes DagLauncher:

Rule: "trigger-etl-every-day-at-9am"
Target: Lambda function running DagLauncher
Event: {"action": "launch", "flow_name": "etl_pipeline", "org_id": "acme-corp"}

The CDK stack can provision such rules for recurring workloads.

API-Driven Execution

An HTTP API powered by API Gateway dispatches launch requests to a Lambda handler:

POST /api/flows/{flowName}/runs
Body: {"org_id": "acme-corp", "parameters": {...}}
Response: {"run_id": "...", "task_arn": "..."}

The control plane handler in src/dag_launcher_lambda.py parses the event and delegates to DagLauncher.

Lambda-Triggered Execution

An S3 event (object created), SQS event, or SNS message triggers a Lambda that invokes DagLauncher in response. This enables reactive workload submission.

Batch and Partitioned Execution

For fan-out patterns, launcher.launch_batch() submits multiple independent runs:

results = launcher.launch_batch([
    {"flow_name": "process_shard", "parameters": {"shard": "0"}},
    {"flow_name": "process_shard", "parameters": {"shard": "1"}},
    {"flow_name": "process_shard", "parameters": {"shard": "2"}},
], org_id="acme-corp", environment="production")

Each request is processed independently and can fail without affecting siblings.

Retry and Replay Model

Idempotent Reruns

The framework supports rerunning failed runs as new, independent runs. Each run_id is unique, so every launch creates a new run record and new task run records. If a run fails, the client can:

  1. Inspect the error_message and exception trace in S3
  2. Fix the underlying issue (update the flow, adjust parameters, etc.)
  3. Call launcher.launch() again with a new run (different run_id)

The new run will start from the beginning. There is no built-in checkpoint/restart within a single run.

Task-Level Retries

Individual tasks within a flow can be configured with retries in the FlowSpec. The TaskSpec supports:

  • retries: Number of times to retry on failure (default 0)
  • retry_delay_seconds: Delay between retry attempts
  • retry_jitter_factor: Randomization factor applied to the retry delay

The ECS worker's LocalExecutor handles task retries. When a task fails and has retries remaining, the executor waits for retry_delay_seconds (plus jitter), then re-invokes the task. The attempt count and error are recorded in the task_run record.

If all retries are exhausted, the task fails and downstream tasks are skipped.
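
The retry loop can be sketched as follows. The parameter names match the TaskSpec fields above, but the jitter formula (delay multiplied by the factor and a random number between 0 and 1) is an assumption, as are the injectable sleep and rng arguments, which exist only to make the sketch testable.

```python
import random
import time
from typing import Callable, Tuple, TypeVar

T = TypeVar("T")


def run_with_retries(
    task: Callable[[], T],
    retries: int = 0,
    retry_delay_seconds: float = 0.0,
    retry_jitter_factor: float = 0.0,
    sleep: Callable[[float], None] = time.sleep,
    rng: Callable[[], float] = random.random,
) -> Tuple[T, int]:
    """Call task(); on failure wait for delay plus jitter, then try again.

    Returns (result, attempts). Re-raises the last error once retries run out.
    """
    attempt = 0
    while True:
        attempt += 1
        try:
            return task(), attempt
        except Exception:
            if attempt > retries:
                raise  # retries exhausted: the caller marks the task FAILED
            jitter = retry_delay_seconds * retry_jitter_factor * rng()
            sleep(retry_delay_seconds + jitter)
```

With retries=2, a task is attempted at most three times before the failure propagates and downstream tasks are skipped.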

Checkpoint Awareness

The worker passes data between tasks in memory and does not persist checkpoints. When a task produces an output (return value), that output is held in memory and hydrated into the inputs of dependent tasks. If the container is killed before all tasks complete (for example, by an out-of-memory condition), those outputs are lost, the run ends up marked FAILED, and the user must resubmit.

For long-running flows, checkpoint strategies can be implemented in user code:

  • Save intermediate results to S3 within the task
  • On retry or new run, load the checkpoint and skip prior tasks
  • Use idempotent operations to avoid re-processing data
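
A user-code checkpoint along these lines can be as small as a lookup before the expensive step. In the sketch below, store is any mutable mapping and stands in for S3 (a real task would read and write objects under its own prefix); with_checkpoint is a hypothetical helper, and results are assumed to be JSON-serializable.

```python
import json
from typing import Callable, MutableMapping, TypeVar

T = TypeVar("T")


def with_checkpoint(store: MutableMapping[str, str], key: str, compute: Callable[[], T]) -> T:
    """Return the saved result for key if present; otherwise compute and save it."""
    if key in store:
        # A previous run already finished this step, so skip the work.
        return json.loads(store[key])
    result = compute()
    store[key] = json.dumps(result)
    return result
```

Using a key derived from the flow parameters (for example the processing date) rather than the run_id lets a new run pick up checkpoints left by a failed one.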

Workload Profiles for Right-Sizing

The framework provides six named workload profiles to simplify resource selection:

Profile  CPU               Memory    Use Case
small    256 (0.25 vCPU)   512 MB    Minimal workloads, long-running batch jobs with low CPU
medium   1024 (1 vCPU)     2048 MB   Standard Python scripts, data transformation
large    2048 (2 vCPU)     4096 MB   Parallel task execution, machine learning inference
xlarge   4096 (4 vCPU)     8192 MB   Heavy compute, multi-threaded workloads
heavy    8192 (8 vCPU)     16384 MB  Dense workloads, large data processing in memory
max      16384 (16 vCPU)   32768 MB  Maximum vCPU available on Fargate

A launch request specifies the profile:

launcher.launch(
    flow_name="train_model",
    org_id="acme-corp",
    workload_profile="large"
)

The backend maps the profile name to CPU and memory via WORKLOAD_PROFILES dict and validates the combination is valid for Fargate.

Users can also specify explicit cpu and memory, which overrides any profile. The backend validates using validate_fargate_resources() that the combination is valid.

Ephemeral storage defaults to 21 GiB (the smallest size Fargate accepts when storage is set explicitly) but can be overridden per run. All profiles use the same ephemeral storage; it is independent of CPU/memory selection.
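
Profile resolution with explicit overrides can be sketched as a dictionary lookup. The values mirror the profile table above; the tuple layout of WORKLOAD_PROFILES and the resolve_resources function are assumptions for illustration, and the real backend additionally validates the result against the Fargate matrix.

```python
from typing import Dict, Optional, Tuple

# Profile name -> (CPU units, memory in MB), as listed in the table above.
WORKLOAD_PROFILES: Dict[str, Tuple[int, int]] = {
    "small": (256, 512),
    "medium": (1024, 2048),
    "large": (2048, 4096),
    "xlarge": (4096, 8192),
    "heavy": (8192, 16384),
    "max": (16384, 32768),
}


def resolve_resources(
    workload_profile: Optional[str] = None,
    cpu: Optional[int] = None,
    memory: Optional[int] = None,
) -> Tuple[int, int]:
    """Return (cpu, memory); explicit values take precedence over the profile."""
    if cpu is not None and memory is not None:
        return cpu, memory
    if workload_profile not in WORKLOAD_PROFILES:
        raise ValueError(f"unknown workload profile: {workload_profile!r}")
    profile_cpu, profile_memory = WORKLOAD_PROFILES[workload_profile]
    return (
        cpu if cpu is not None else profile_cpu,
        memory if memory is not None else profile_memory,
    )
```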


This architecture is designed for multi-tenant, highly scalable DAG execution with clear separation of concerns, fine-grained security, and comprehensive observability. The control-plane pattern enables rapid task submission while the worker's self-contained execution model ensures resilience and auditability.