ECS Fargate Execution Framework Architecture
The Dagy ECS Fargate execution framework enables distributed execution of DAG workloads on AWS ECS Fargate, suitable for long-running workloads that exceed Lambda's 15-minute execution limit or require more than 10 GB of memory. The framework follows a control-plane pattern where Lambda functions coordinate DAG launches while containerized ECS Fargate tasks execute the actual work.
Overview
The architecture is built on three core components:
- Control Plane (Lambda): The DagLauncher class in src/dagy_api/ecs/launcher.py accepts launch requests, resolves dependencies, creates run records, and submits work to ECS.
- Data Plane (ECS Fargate): The dagy-worker container running src/dagy_api/ecs/worker_main.py executes DAGs, manages artifacts, and reports status back to DynamoDB.
- Status Reconciler (Lambda): The reconcile_ecs_runs() function in src/dagy_api/ecs/reconciler.py periodically reconciles stale task statuses.
Execution Framework Design
Control-Plane Pattern
The ECS execution framework implements a Lambda-based control plane that decouples task submission from task execution. This pattern provides several advantages over direct execution:
The control plane is stateless and horizontally scalable. Multiple Lambda instances can handle simultaneous launch requests without coordination. The DagLauncher class in src/dagy_api/ecs/launcher.py is instantiated fresh for each request, reading cluster configuration from environment variables set by the CDK stack.
Task submission is asynchronous and resilient. Once a task is submitted to ECS, the control plane immediately returns a launch result containing the task ARN. If the ECS worker crashes or encounters infrastructure failures, the separate status reconciler can detect and handle recovery.
The control plane is responsible for resolving all static configuration before task submission. It queries DynamoDB to resolve the flow definition, validates artifacts exist in S3, resolves dependency packages, and creates the initial run record. This ensures the ECS worker only needs to download and execute, without needing to validate the configuration.
ECS Fargate Runtime Model
ECS Fargate is a serverless container compute service that manages the underlying EC2 infrastructure. Dagy uses Fargate with the awsvpc network mode, which attaches each task to a VPC subnet with an ENI. This provides:
- Fine-grained security control through security groups. Each Dagy task can be isolated to specific security groups with restricted ingress/egress rules.
- Predictable networking. Tasks can be deployed to private subnets with no public IP, accessing S3 and DynamoDB through VPC endpoints.
- Flexible resource sizing. Fargate supports a matrix of vCPU and memory combinations from 256 CPU units / 512 MB up to 16384 CPU units / 120 GB (122880 MB).
- Ephemeral storage. Each task gets 20 GiB of ephemeral storage by default, configurable from 21 to 200 GiB. It is available to the container filesystem (including /tmp) and is suitable for intermediate artifacts and unpacked dependencies.
The CDK stack in infrastructure/dagy_stack.py provisions the ECS cluster with both Fargate and Fargate Spot capacity providers. Fargate Spot offers up to 70% cost savings at the risk of interruption with a 2-minute notice period, making it ideal for batch workloads that can tolerate occasional failures and reruns.
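A run_task call that chooses between the two capacity providers might be assembled as in the sketch below. The keys follow the boto3 ECS client's run_task parameters; the helper name build_run_task_kwargs and the use_spot flag are hypothetical and not taken from the Dagy code.

```python
def build_run_task_kwargs(cluster_arn, task_definition_arn, subnets,
                          security_groups, use_spot=False):
    """Build keyword arguments for boto3's ecs.run_task() (hypothetical helper)."""
    provider = "FARGATE_SPOT" if use_spot else "FARGATE"
    return {
        "cluster": cluster_arn,
        "taskDefinition": task_definition_arn,
        "count": 1,
        # launchType is deliberately omitted: ECS rejects requests that set
        # both launchType and capacityProviderStrategy.
        "capacityProviderStrategy": [{"capacityProvider": provider, "weight": 1}],
        "networkConfiguration": {
            "awsvpcConfiguration": {
                "subnets": list(subnets),
                "securityGroups": list(security_groups),
                # Private subnets: no public IP on the task ENI.
                "assignPublicIp": "DISABLED",
            }
        },
    }
```

The result can be passed as ecs.run_task(**kwargs). Spot is best reserved for runs that can be resubmitted after an interruption.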
Lambda-to-ECS Launch Flow
The launch flow proceeds through these steps:
First, the control plane validates the launch request and resolves required resources. The DagLauncher.launch() method in src/dagy_api/ecs/launcher.py accepts a DagLaunchRequest containing the flow name, version, parameters, and optional resource overrides.
Second, the flow definition is resolved from DynamoDB via the _resolve_flow() method. For "latest" version requests, a query returns the most recent version scoped to the org_id. The method validates that the flow has an artifact_s3_uri pointing to a packaged artifact in S3.
Third, dependency packages are resolved via _resolve_dep_packages(). For named deployments, the method queries the DeploymentModel to extract dep_package_slugs, then queries DepPackageModel to map those slugs to S3 URIs. This enables different deployments to use different sets of dependencies.
Fourth, a run record is created in DynamoDB with status QUEUED. The run_id is generated as a UUID. Additional metadata like run_slug, trigger_source, and triggered_by are stored for observability. A FlowSpec is built from the artifact's flow_spec.json and serialized.
Fifth, the EcsBackend.submit_run() method in src/dagy_api/backends/ecs.py takes over. It resolves resource sizing based on workload profile or explicit overrides, registers an ECS task definition, and calls ecs.run_task() with the cluster ARN.
Sixth, the ECS control plane provisions and launches the task. The task enters PROVISIONING, then PENDING, then RUNNING status as resources are allocated and the container image is pulled.
Seventh, the ECS worker container starts and begins execution. It updates the run record to RUNNING with started_at timestamp.
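Eighth, the control plane returns a DagLaunchResult containing success, run_id, and task_arn for caller tracking. It returns as soon as ecs.run_task() accepts the submission in step five; steps six and seven proceed asynchronously and the launcher does not wait for them.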
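The control-plane portion of this flow can be sketched with in-memory stand-ins. Everything below is illustrative: the dataclasses are simplified versions of DagLaunchRequest and DagLaunchResult, the flows and runs dictionaries stand in for DynamoDB tables, and submit_run is a hypothetical callable standing in for EcsBackend.submit_run().

```python
import uuid
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class DagLaunchRequest:  # simplified stand-in
    flow_name: str
    org_id: str
    flow_version: str = "latest"
    parameters: dict = field(default_factory=dict)


@dataclass
class DagLaunchResult:  # simplified stand-in
    success: bool
    run_id: Optional[str] = None
    task_arn: Optional[str] = None
    error: Optional[str] = None


def launch(request, flows, runs, submit_run):
    """Resolve the flow, create a QUEUED run record, submit, and return."""
    # flows maps (org_id, flow_name) to a list of versions, oldest first (assumed).
    versions = flows.get((request.org_id, request.flow_name), [])
    if request.flow_version == "latest":
        flow = versions[-1] if versions else None
    else:
        flow = next((f for f in versions
                     if f["flow_version"] == request.flow_version), None)
    if flow is None or not flow.get("artifact_s3_uri"):
        return DagLaunchResult(success=False, error="flow not found or has no artifact")

    run_id = str(uuid.uuid4())
    runs[run_id] = {"status": "QUEUED", "org_id": request.org_id,
                    "flow_name": request.flow_name,
                    "flow_version": flow["flow_version"],
                    "parameters": request.parameters}
    # Submission is the last synchronous step; execution happens elsewhere.
    task_arn = submit_run(run_id, flow)
    runs[run_id]["external_id"] = task_arn
    return DagLaunchResult(success=True, run_id=run_id, task_arn=task_arn)
```

The run record is still QUEUED when launch() returns; only the worker moves it to RUNNING.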
DAG Execution Lifecycle
The worker implements a structured lifecycle for each DAG execution, with clear phase transitions and error handling.
Startup Phase: The worker begins in main() of src/dagy_api/ecs/worker_main.py. It sets up structured JSON logging via StructuredFormatter, parses configuration from environment variables via WorkerConfig.from_env(), and validates required inputs. If validation fails (missing run_id, artifact URI, etc.), the worker exits with code EXIT_INVALID_INPUT (2).
Infrastructure Phase: The worker initializes the DynamoDB repository and updates the run status to RUNNING with the current timestamp. This signals to clients and the reconciler that the task has started. If DynamoDB is unreachable, the worker exits with code EXIT_INFRA_ERROR (3).
Artifact Download Phase: The worker downloads the flow artifact from S3 using _download_artifact(). The artifact is a ZIP file containing flow_spec.json, metadata.json, and Python source files. The ZIP is extracted to a workspace directory with safe path validation to prevent zip-slip path traversal (the same class of flaw as CVE-2007-4559 in Python's tarfile module).
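One common way to implement safe path validation is to resolve every archive member against the workspace and reject anything that lands outside it. The sketch below shows that technique with the standard library; safe_extract is a hypothetical name, and the worker's actual check may differ.

```python
import zipfile
from pathlib import Path


def safe_extract(zip_path, dest_dir):
    """Extract zip_path into dest_dir, refusing members that escape dest_dir."""
    dest = Path(dest_dir).resolve()
    with zipfile.ZipFile(zip_path) as archive:
        for info in archive.infolist():
            target = (dest / info.filename).resolve()
            # "../x" and absolute paths both resolve outside the workspace.
            if not target.is_relative_to(dest):
                raise ValueError(f"unsafe path in archive: {info.filename}")
        # Only extract once every member has been validated.
        archive.extractall(dest)
        return archive.namelist()
```

Validating all members before extracting any of them means a malicious archive leaves nothing behind in the workspace. Path.is_relative_to requires Python 3.9 or later.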
Dependency Install Phase: The worker downloads and installs dependency packages via _install_dep_packages(). Each package URI is downloaded from S3, extracted, and any requirements.txt is processed with pip to install packages into a site-packages directory. The site-packages are added to sys.path for import resolution.
FlowSpec Load Phase: The worker loads the FlowSpec from flow_spec.json via _load_flow_spec(). It deserializes task definitions and task run specifications from JSON. It also attempts to load the Python source module and flow object for dynamic rebuild support.
DAG Execution Phase: The worker executes the DAG via _execute_dag() using the LocalExecutor with ThreadPoolExecutor for parallelism. Task runs are scheduled as their dependencies complete. For each task run, the worker resolves the task callable, invokes the executor, and stores results in the DynamoDB task_runs table. If a task fails with no retries remaining, its downstream tasks are skipped.
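The scheduling behaviour described in the DAG Execution Phase can be illustrated with a small dependency-driven scheduler. This is a sketch, not the LocalExecutor itself: run_dag is a hypothetical function, retries and DynamoDB writes are left out, and it assumes that skipping propagates to every transitive downstream task.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def run_dag(tasks, deps, max_workers=4):
    """tasks: name -> zero-argument callable; deps: name -> set of upstream names.

    Returns a dict mapping each task name to SUCCEEDED, FAILED, or SKIPPED.
    """
    status = {}
    pending = set(tasks)
    running = {}  # future -> task name
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while pending or running:
            progressed = False
            for name in sorted(pending):
                upstream = deps.get(name, set())
                if any(status.get(u) in ("FAILED", "SKIPPED") for u in upstream):
                    status[name] = "SKIPPED"  # an upstream task did not succeed
                elif all(status.get(u) == "SUCCEEDED" for u in upstream):
                    running[pool.submit(tasks[name])] = name
                else:
                    continue  # still waiting on upstream tasks
                pending.discard(name)
                progressed = True
            if running:
                done, _ = wait(list(running), return_when=FIRST_COMPLETED)
                for future in done:
                    name = running.pop(future)
                    status[name] = "FAILED" if future.exception() else "SUCCEEDED"
            elif not progressed:
                raise ValueError(f"cycle or unknown dependency among: {sorted(pending)}")
    return status
```

Independent branches keep running when a sibling branch fails; only tasks downstream of the failure are skipped.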
Status Reporting Phase: Upon successful completion, the worker updates the run status to SUCCEEDED with completed_at timestamp and records usage metrics. Upon failure, it updates the status to FAILED with an error_message (truncated to 2048 chars) and uploads the full exception traceback to S3 via _capture_exception_to_s3().
Cleanup Phase: The workspace directory is removed in the finally block, ensuring temporary files don't accumulate.
Exit codes communicate the failure mode: 0 for success, 1 for execution failures (task errors), 2 for invalid input, 3 for infrastructure errors.
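A minimal sketch of how the phases could map onto these exit codes follows. EXIT_INVALID_INPUT and EXIT_INFRA_ERROR are the names used above; EXIT_SUCCESS, EXIT_EXECUTION_FAILED, the two exception classes, and run_worker are assumed names introduced only for illustration.

```python
EXIT_SUCCESS = 0            # assumed name
EXIT_EXECUTION_FAILED = 1   # assumed name
EXIT_INVALID_INPUT = 2
EXIT_INFRA_ERROR = 3


class InvalidInputError(Exception):
    """Hypothetical: raised when required configuration is missing."""


class InfraError(Exception):
    """Hypothetical: raised when DynamoDB or S3 cannot be reached."""


def run_worker(validate, connect, execute, cleanup=lambda: None):
    """Run the phases in order and translate failures into an exit code."""
    try:
        try:
            validate()
        except InvalidInputError:
            return EXIT_INVALID_INPUT
        try:
            connect()
        except InfraError:
            return EXIT_INFRA_ERROR
        try:
            execute()
        except Exception:
            return EXIT_EXECUTION_FAILED
        return EXIT_SUCCESS
    finally:
        # Mirrors the cleanup phase: always runs, whatever the outcome.
        cleanup()
```

An entry point would typically finish with sys.exit(run_worker(...)) so that ECS records the code as the container's exit code.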
Task Definition Model and Multi-Tenancy
ECS task definitions are registered dynamically for each run in _register_task_definition() of src/dagy_api/backends/ecs.py. The task definition specifies:
The container image: Always the dagy-worker image pushed to ECR. Tags allow for rollout control (dagy-worker:latest vs dagy-worker:v1.2.3).
Environment variables: All runtime configuration is passed as environment variables, including run_id, flow_name, artifact_s3_uri, org_id, and parameters. This supports the worker's stateless design.
IAM roles: The task execution role (ecs-exec-role) is used by ECS to pull the image and inject secrets. The task role (ecs-task-role) is assumed by the container for S3 and DynamoDB access.
Resource limits: cpu and memory are specified per Fargate compatibility matrix (e.g., 1024 CPU / 2048 MB). Ephemeral storage can be overridden (21-200 GiB).
Logging configuration: awslogs driver sends stdout/stderr to CloudWatch Logs under /ecs/dagy-worker/{run_id}.
Secrets injection: Optional secret references via valueFrom point to Secrets Manager ARNs for injecting sensitive data without exposing in logs.
Multi-tenancy is enforced through org_id isolation. Each run is tagged with its org_id, which is passed to the worker via environment variable. The worker uses org_id to scope its DynamoDB queries and S3 object paths. The ECS worker runs with fine-grained IAM permissions that limit S3 access to specific org_id prefixes, preventing cross-tenant data leakage.
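Put together, a per-run task definition might look like the dictionary built below. The keys follow the boto3 ecs.register_task_definition() parameters; the function name, the family naming scheme, and the choice of run_id as the log stream prefix are assumptions made for this sketch.

```python
def build_task_definition(run_id, image, cpu, memory, env, secrets_env,
                          execution_role_arn, task_role_arn, region,
                          ephemeral_storage_gib=None):
    """Build kwargs for ecs.register_task_definition() (hypothetical helper)."""
    definition = {
        "family": f"dagy-worker-{run_id}",       # assumed naming scheme
        "requiresCompatibilities": ["FARGATE"],
        "networkMode": "awsvpc",
        "cpu": str(cpu),                          # task-level sizes are strings
        "memory": str(memory),
        "executionRoleArn": execution_role_arn,   # used by ECS: pull image, fetch secrets
        "taskRoleArn": task_role_arn,             # assumed by the worker process
        "containerDefinitions": [{
            "name": "dagy-worker",
            "image": image,
            "essential": True,
            "environment": [{"name": k, "value": str(v)}
                            for k, v in sorted(env.items())],
            # valueFrom references are resolved by ECS at container start.
            "secrets": [{"name": k, "valueFrom": arn}
                        for k, arn in sorted(secrets_env.items())],
            "logConfiguration": {
                "logDriver": "awslogs",
                "options": {
                    "awslogs-group": "/ecs/dagy-worker",
                    "awslogs-region": region,
                    "awslogs-stream-prefix": run_id,
                },
            },
        }],
    }
    if ephemeral_storage_gib is not None:
        definition["ephemeralStorage"] = {"sizeInGiB": ephemeral_storage_gib}
    return definition
```

Secret values never appear in the returned dictionary; only their ARNs do.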
IAM Model and Role Separation
The framework uses three distinct IAM roles for proper privilege isolation:
DAG Launcher Role (dag-launcher-role)
This role is assumed by Lambda functions that invoke DagLauncher. It has limited permissions:
- ecs:DescribeTaskDefinition, ecs:RegisterTaskDefinition, ecs:RunTask (to launch tasks)
- dynamodb:PutItem, dynamodb:GetItem, dynamodb:UpdateItem (to create and update run records)
- s3:GetObject (to read flow artifacts and validate S3 URIs)
- logs:CreateLogStream, logs:PutLogEvents (to write structured logs)
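The launcher role does NOT have permissions to assume other roles or modify IAM policies, which prevents privilege escalation. Note that ecs:RegisterTaskDefinition and ecs:RunTask also require iam:PassRole on the task execution role and the task role; that grant should be limited to those two role ARNs.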
ECS Task Execution Role (ecs-exec-role)
This role is used by ECS itself (not the container) to perform actions on behalf of the task:
- ecr:GetAuthorizationToken, ecr:BatchGetImage, ecr:GetDownloadUrlForLayer (to pull the dagy-worker image)
- logs:CreateLogGroup, logs:CreateLogStream, logs:PutLogEvents (to emit logs to CloudWatch Logs)
- secretsmanager:GetSecretValue (to inject secrets at runtime)
The execution role is scoped to the specific log group /ecs/dagy-worker and the ECR repository containing dagy-worker images.
ECS Task Runtime Role (ecs-task-role)
This role is assumed by the container process (worker_main.py) and has permissions to access data:
- s3:GetObject (for the {org_id}/{run_id}/* prefix, to download artifacts and dependency packages)
- s3:PutObject (for the {org_id}/{run_id}/* prefix, to upload exception traces)
- dynamodb:Query, dynamodb:GetItem, dynamodb:PutItem, dynamodb:UpdateItem (to read flows, create/update run records and task runs)
- cloudwatch:PutMetricData (optional, for custom metrics)
The role uses resource conditions to scope S3 access to objects with the org_id prefix, enforcing tenant isolation at the IAM level.
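As an illustration of prefix-scoped access, the sketch below generates an IAM policy document for a single tenant. It is an assumption-laden example, not the policy the CDK stack deploys: the function name is hypothetical, and the prefixes ({org_id}/ for artifacts, exceptions/{org_id}/ for traces) are taken from the storage layout described elsewhere in this document.

```python
import json


def task_role_s3_policy(artifacts_bucket, traces_bucket, org_id):
    """Return an IAM policy document limiting S3 access to one org's prefixes."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "ReadOrgArtifacts",
                "Effect": "Allow",
                "Action": ["s3:GetObject"],
                "Resource": f"arn:aws:s3:::{artifacts_bucket}/{org_id}/*",
            },
            {
                "Sid": "WriteOrgTraces",
                "Effect": "Allow",
                "Action": ["s3:PutObject"],
                "Resource": f"arn:aws:s3:::{traces_bucket}/exceptions/{org_id}/*",
            },
        ],
    }


def render_policy(policy):
    """Serialize the policy the way IAM APIs expect it (a JSON string)."""
    return json.dumps(policy, indent=2)
```

Because the prefix ends in a slash before the wildcard, an org_id of "acme" cannot match objects belonging to "acme-corp".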
Network Design
VPC and Subnet Configuration
The ECS cluster is deployed to a VPC specified in CDK configuration. For production deployments, private subnets are recommended to prevent accidental public exposure. The dagy-worker tasks are launched in awsvpc mode, which creates an elastic network interface (ENI) in the specified subnet.
Security groups control inbound and outbound traffic. For ECS tasks, a typical configuration allows:
- No inbound traffic (tasks are pull-based, not exposed as services)
- Outbound HTTPS to S3, DynamoDB, and CloudWatch Logs (either via NAT Gateway or VPC endpoints)
- Outbound to Secrets Manager and ECR if using secrets injection or custom images
Service Discovery and Load Balancing
Individual ECS tasks are not discoverable via service discovery because they are ephemeral compute resources. They do not listen on network ports for incoming requests. Instead, they receive their run configuration through environment variables, download artifacts from S3, and push status updates to DynamoDB.
If external clients need to trigger DAG launches, they invoke Lambda via API Gateway, which invokes DagLauncher. The launcher immediately returns the run_id and task_arn for polling.
Egress and S3/DynamoDB Access
Tasks access AWS services (S3, DynamoDB, CloudWatch Logs, Secrets Manager, and ECR) over HTTPS, either through VPC endpoints or through the services' public endpoints via a NAT Gateway.
If tasks run in private subnets without a NAT Gateway, VPC endpoints must be configured for:
- s3 (gateway or interface endpoint; also needed for ECR image pulls, because image layers are served from S3)
- dynamodb (gateway endpoint; an interface endpoint is also available)
- logs (interface endpoint)
- secretsmanager (interface endpoint)
- ecr.api and ecr.dkr (interface endpoints)
This enables secure access without egress through a NAT Gateway, which typically reduces both cost and latency.
Configuration and Secrets Model
Environment Variables
The worker receives configuration entirely via environment variables set by the control plane. This list includes:
Required identifiers:
- DAGY_RUN_ID: UUID of the execution
- DAGY_FLOW_NAME: Name of the flow to execute
- DAGY_FLOW_VERSION: Version of the flow (e.g., "1.0.0" or "latest")
- DAGY_ARTIFACT_S3_URI: Full S3 URI (s3://bucket/key) of the flow artifact
Tenant isolation:
- DAGY_ORG_ID: Organization/tenant ID for multi-tenancy
Runtime parameters:
- DAGY_PARAMETERS: JSON-encoded dictionary of flow parameters
Observability:
- DAGY_CORRELATION_ID: Request ID for distributed tracing
- DAGY_ENVIRONMENT: Environment name (develop/staging/production)
- DAGY_RUN_SLUG: Human-readable slug (flow-name-short-uuid)
- DAGY_DEPLOYMENT_NAME: Deployment context
Dependencies:
- DAGY_DEP_PACKAGE_URIS: Comma-separated S3 URIs of dependency packages
Execution tuning:
- DAGY_MAX_WORKERS: Number of concurrent task threads (default 4)
- DAGY_LOG_LEVEL: Logging verbosity (default INFO)
AWS resources:
- DAGY_ARTIFACTS_BUCKET: S3 bucket for run artifacts
- DAGY_TRACES_BUCKET: S3 bucket for exception traces
- DAGY_FLOWS, DAGY_RUNS, DAGY_TASK_RUNS, DAGY_DEPLOYMENTS, DAGY_DEP_PACKAGES: DynamoDB table names
Secrets:
- DAGY_SECRETS_KEY: Encryption key for Dagy-managed secrets (optional)
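Parsing these variables could look like the sketch below. It is a reduced stand-in for WorkerConfig.from_env() covering only some of the variables; treating exactly the four "required identifiers" as mandatory, and the field names themselves, are assumptions.

```python
import json
import os
from dataclasses import dataclass, field

REQUIRED = ("DAGY_RUN_ID", "DAGY_FLOW_NAME", "DAGY_FLOW_VERSION", "DAGY_ARTIFACT_S3_URI")


@dataclass
class WorkerConfig:  # reduced stand-in, not the real class
    run_id: str
    flow_name: str
    flow_version: str
    artifact_s3_uri: str
    org_id: str = ""
    parameters: dict = field(default_factory=dict)
    dep_package_uris: list = field(default_factory=list)
    max_workers: int = 4
    log_level: str = "INFO"

    @classmethod
    def from_env(cls, env=None):
        env = os.environ if env is None else env
        missing = [name for name in REQUIRED if not env.get(name)]
        if missing:
            raise ValueError("missing required variables: " + ", ".join(missing))
        # Comma-separated list; empty entries are dropped.
        uris = [u.strip() for u in env.get("DAGY_DEP_PACKAGE_URIS", "").split(",")
                if u.strip()]
        return cls(
            run_id=env["DAGY_RUN_ID"],
            flow_name=env["DAGY_FLOW_NAME"],
            flow_version=env["DAGY_FLOW_VERSION"],
            artifact_s3_uri=env["DAGY_ARTIFACT_S3_URI"],
            org_id=env.get("DAGY_ORG_ID", ""),
            parameters=json.loads(env.get("DAGY_PARAMETERS") or "{}"),
            dep_package_uris=uris,
            max_workers=int(env.get("DAGY_MAX_WORKERS", "4")),
            log_level=env.get("DAGY_LOG_LEVEL", "INFO"),
        )
```

Accepting an optional env mapping keeps the parser testable without mutating os.environ.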
Secrets Manager Injection
Sensitive runtime configuration (database passwords, API keys) can be injected from AWS Secrets Manager without exposing the values in the task definition or logs. The control plane can specify a secrets_env mapping from environment variable name to secret ARN:
    {
      "DB_PASSWORD": "arn:aws:secretsmanager:us-east-1:123456789012:secret:prod/db-password",
      "API_KEY": "arn:aws:secretsmanager:us-east-1:123456789012:secret:prod/api-key"
    }
These are passed to ECS as container secrets (valueFrom references), which ECS injects at container startup. The container receives the secret values in environment variables without them appearing in logs or task definition history.
The ECS task execution role must have secretsmanager:GetSecretValue permission on the ARNs.
Runtime Overrides
The DagLaunchRequest supports per-run resource overrides:
- workload_profile: Named preset (small/medium/large/xlarge/heavy/max)
- cpu: Explicit CPU (256, 512, 1024, ..., 16384)
- memory: Explicit memory (512, 1024, ..., 122880)
- ephemeral_storage_gib: Storage (21-200)
- extra_env_vars: Additional environment variables
The backend validates that cpu/memory combinations are valid for Fargate and applies them to the task definition.
State and Artifact Persistence Model
DynamoDB Tables
The framework uses five DynamoDB tables for state storage:
DAGY_FLOWS: Stores flow definitions with schema:
- Partition key: flow_name
- Sort key: flow_version
- Attributes: spec (JSON), artifact_s3_uri, org_id, created_at
- Indexes: org_id_index for multi-tenant queries
DAGY_RUNS: Stores execution records with schema:
- Partition key: run_id
- Attributes: org_id, flow_name, flow_version, status, parameters, created_at, started_at, completed_at, error_message, external_id (task ARN)
- Indexes: flow_name_index, status_runs_index (for reconciler queries)
DAGY_TASK_RUNS: Stores per-task execution records with schema:
- Partition key: run_id
- Sort key: task_run_id
- Attributes: task_name, status, attempt, max_retries, start_time, end_time, error, executor
DAGY_DEPLOYMENTS: Stores deployment metadata with schema:
- Partition key: deployment_name
- Attributes: dep_package_slugs, environment, created_at
DAGY_DEP_PACKAGES: Stores dependency package references with schema:
- Partition key: slug
- Attributes: package_s3_uri, org_id, version, created_at
S3 Artifacts and Traces
Artifacts are stored in the bucket named by DAGY_ARTIFACTS_BUCKET under {org_id}/{run_id}/artifact.zip. The artifact is a ZIP file containing:
- flow_spec.json: Serialized FlowSpec
- metadata.json: Flow metadata (entrypoint, etc.)
- *.py: Python source files
- requirements.txt (optional): Dependencies
Exception traces are stored in DAGY_TRACES_BUCKET under exceptions/{org_id}/{run_id}/traceback.txt when a DAG execution fails.
Write Patterns
The control plane performs writes immediately upon launch request:
- PutItem into DAGY_RUNS with status QUEUED
- RunTask on ECS (asynchronous)
- Return run_id to caller
The worker performs writes as execution progresses:
- UpdateItem on DAGY_RUNS to status RUNNING with started_at
- PutItem into DAGY_TASK_RUNS for each task (as they start and complete)
- UpdateItem on DAGY_RUNS to status SUCCEEDED/FAILED with completed_at
The reconciler performs conditional writes:
- Query DAGY_RUNS for status=RUNNING and executor=ecs
- For each run, describe the ECS task
- If task is STOPPED, UpdateItem on DAGY_RUNS with new status
This pattern ensures the control plane never waits on DAG execution: the worker updates state asynchronously, and the reconciler periodically detects and corrects stale states.
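The reconciler's write pattern can be sketched against in-memory stand-ins. Here runs is a dictionary in place of DAGY_RUNS, describe_task is a hypothetical callable in place of ecs.describe_tasks(), and the mapping from exit code to final status is an assumption. In DynamoDB the guard would be a condition expression on status, so that a late worker update is not overwritten.

```python
def reconcile_runs(runs, describe_task):
    """Mark RUNNING ECS runs as finished when their task has already stopped.

    runs: run_id -> record with status, executor, external_id (task ARN).
    describe_task: callable(task_arn) -> {"lastStatus": str, "exit_code": int | None}.
    Returns the run_ids that were updated.
    """
    updated = []
    for run_id, run in runs.items():
        if run.get("status") != "RUNNING" or run.get("executor") != "ecs":
            continue
        task = describe_task(run["external_id"])
        if task.get("lastStatus") != "STOPPED":
            continue  # task is still provisioning or running
        # Conditional write: only touch the record if it is still RUNNING.
        if run["status"] == "RUNNING":
            run["status"] = "SUCCEEDED" if task.get("exit_code") == 0 else "FAILED"
            updated.append(run_id)
    return updated
```

Runs whose worker already wrote a terminal status are never selected, so the reconciler only acts on tasks that stopped without reporting.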
Observability Model
Structured JSON Logging
All logs are emitted as single-line JSON objects for easy parsing by CloudWatch Insights. The StructuredFormatter in src/dagy_api/ecs/worker_main.py emits logs with fields:
    {
      "timestamp": "2024-03-14T10:30:45.123Z",
      "level": "INFO",
      "logger": "dagy.ecs.worker",
      "message": "DAG execution starting",
      "event_type": "dag_execution_start",
      "run_id": "550e8400-e29b-41d4-a716-446655440000",
      "flow_name": "etl_pipeline",
      "org_id": "org-123",
      "correlation_id": "req-abc-123",
      "task_count": 3,
      "task_list": ["extract", "transform", "load"]
    }
Custom event_type fields enable filtering and alerting on specific lifecycle events:
- worker_start: Worker container initialization
- validation_error: Configuration validation failure
- artifact_download_start/complete: S3 artifact operations
- dep_install_start/complete: Dependency package installation
- dag_execution_start: DAG execution beginning
- task_start/end: Individual task lifecycle
- worker_complete/failed: Execution completion
- exception_captured: Exception trace upload
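A formatter that produces single-line JSON of this shape can be built on the standard logging module. The class below is a sketch under assumed names (JsonLineFormatter, CONTEXT_FIELDS) and is not the StructuredFormatter source; context fields are expected to arrive through the logger's extra argument.

```python
import json
import logging
from datetime import datetime, timezone


class JsonLineFormatter(logging.Formatter):
    """Render each log record as one JSON object on a single line."""

    # Context fields copied onto the output when present on the record.
    CONTEXT_FIELDS = ("event_type", "run_id", "flow_name", "org_id", "correlation_id")

    def format(self, record):
        created = datetime.fromtimestamp(record.created, tz=timezone.utc)
        payload = {
            "timestamp": created.isoformat(timespec="milliseconds").replace("+00:00", "Z"),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        for key in self.CONTEXT_FIELDS:
            if hasattr(record, key):
                payload[key] = getattr(record, key)
        # json.dumps escapes newlines, so the output stays on one line.
        return json.dumps(payload)
```

Usage: attach it to a handler with handler.setFormatter(JsonLineFormatter()) and log with logger.info("DAG execution starting", extra={"event_type": "dag_execution_start", "run_id": run_id}).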
CloudWatch Integration
Logs are collected in /ecs/dagy-worker CloudWatch log group. A log stream is created per run_id for easy isolation. The control plane can stream logs using:
    logs.describe_log_streams(logGroupName='/ecs/dagy-worker', logStreamNamePrefix=run_id)
    logs.get_log_events(logGroupName='...', logStreamName='...')
CloudWatch Insights queries can correlate logs across runs:
    fields @timestamp, run_id, event_type, duration_seconds
    | filter status = "FAILED"
    | stats sum(duration_seconds) as total_time by flow_name
Correlation IDs and Distributed Tracing
Every launch request receives a correlation_id (UUID or custom value). This ID is:
- Stored in the run record as correlation_id
- Passed to the worker via DAGY_CORRELATION_ID environment variable
- Included in every log line emitted by the worker
- Propagated to task_run records for cross-run correlation
For batch launches (launch_batch), a parent correlation_id is generated, and each run gets a child correlation_id (parent:0, parent:1, etc.) for hierarchical tracing.
External systems (orchestrators, monitoring dashboards) can query by correlation_id to see the full trace of a multi-step workflow.
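The parent and child ID scheme for batch launches is simple enough to show directly. The helper name below is hypothetical; only the parent:index format comes from the description above.

```python
import uuid


def batch_correlation_ids(count, parent=None):
    """Return (parent_id, [child ids]) using the parent:index convention."""
    parent = parent or str(uuid.uuid4())
    children = [f"{parent}:{index}" for index in range(count)]
    return parent, children
```

Because every child ID starts with the parent ID, a prefix match on correlation_id retrieves the whole batch.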
Metrics and Alarms
The framework records usage events and aggregates for billing and monitoring. The worker calls _record_usage() with:
- duration_seconds: Total execution time
- task_count: Number of tasks in the flow
- error_count: 0 for success, 1 for failure
- estimated_cost_usd: Computed based on duration and resource profile
Custom CloudWatch metrics can be published for:
- RunCompleted (Count, filtered by executor=ecs)
- RunDuration (Histogram, by flow_name)
- RunFailureRate (Percentage, by environment)
Supported Execution Patterns
Direct Invocation
A Lambda function or SDK directly imports DagLauncher and calls launch():
    from dagy_api.ecs.launcher import DagLauncher
    launcher = DagLauncher()
    # Submit the run; the call returns once ECS accepts the task.
    result = launcher.launch(
        flow_name="my_etl",
        flow_version="1.0.0",
        org_id="acme-corp",
        parameters={"date": "2024-03-14"},
    )
    print(result.run_id, result.task_arn)
The call is synchronous only up to task submission: it returns as soon as the task is submitted to ECS, not when the DAG finishes.
Scheduled Execution
An EventBridge rule triggers a Lambda on a schedule, which invokes DagLauncher:
    Rule: "trigger-etl-every-day-at-9am"
    Target: Lambda function running DagLauncher
    Event: {"action": "launch", "flow_name": "etl_pipeline", "org_id": "acme-corp"}
The CDK stack can provision such rules for recurring workloads.
API-Driven Execution
An HTTP API powered by API Gateway dispatches launch requests to a Lambda handler:
    POST /api/flows/{flowName}/runs
    Body: {"org_id": "acme-corp", "parameters": {...}}
    Response: {"run_id": "...", "task_arn": "..."}
The control plane handler in src/dag_launcher_lambda.py parses the event and delegates to DagLauncher.
Lambda-Triggered Execution
An S3 event (object created), SQS event, or SNS message triggers a Lambda that invokes DagLauncher in response. This enables reactive workload submission.
Batch and Partitioned Execution
For fan-out patterns, launcher.launch_batch() submits multiple independent runs:
    results = launcher.launch_batch([
        {"flow_name": "process_shard", "parameters": {"shard": "0"}},
        {"flow_name": "process_shard", "parameters": {"shard": "1"}},
        {"flow_name": "process_shard", "parameters": {"shard": "2"}},
    ], org_id="acme-corp", environment="production")
Each request is processed independently and can fail without affecting siblings.
Retry and Replay Model
Idempotent Reruns
The framework supports rerunning failed runs as new executions. Each run_id is unique, so a rerun creates a new run record and new task run records. If a run fails, the client can:
- Inspect the error_message and exception trace in S3
- Fix the underlying issue (update the flow, adjust parameters, etc.)
- Call launcher.launch() again with a new run (different run_id)
The new run will start from the beginning. There is no built-in checkpoint/restart within a single run.
Task-Level Retries
Individual tasks within a flow can be configured with retries in the FlowSpec. The TaskSpec supports:
- retries: Number of times to retry on failure (default 0)
- retry_delay_seconds: Delay between retry attempts
- retry_jitter_factor: Randomization factor applied to the retry delay
The ECS worker's LocalExecutor handles task retries. When a task fails and has retries remaining, the executor waits for retry_delay_seconds (plus jitter), then re-invokes the task. The attempt count and error are recorded in the task_run record.
If all retries are exhausted, the task fails and downstream tasks are skipped.
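The retry loop can be sketched as follows. The parameter names match the TaskSpec fields above, but the function itself is hypothetical, and the jitter formula (delay plus up to delay times factor) is one plausible reading rather than the LocalExecutor's confirmed behaviour.

```python
import random
import time


def run_with_retries(task, retries=0, retry_delay_seconds=0.0,
                     retry_jitter_factor=0.0, sleep=time.sleep, rng=random.random):
    """Call task() until it succeeds or retries are exhausted.

    Returns (result, attempts). Re-raises the last error once no retries remain.
    """
    attempt = 0
    while True:
        attempt += 1
        try:
            return task(), attempt
        except Exception:
            if attempt > retries:
                raise  # retries exhausted: the task is failed
            # Assumed formula: base delay plus a random share of delay * factor.
            jitter = retry_delay_seconds * retry_jitter_factor * rng()
            sleep(retry_delay_seconds + jitter)
```

Injecting sleep and rng makes the timing deterministic in tests; production callers would leave the defaults.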
Checkpoint Awareness
The worker does not persist checkpoints. A task's output (return value) is held in memory and passed as input to dependent tasks, so it is lost if the container stops. If the container is OOM-killed before all tasks complete, the run is marked FAILED and the user must resubmit.
For long-running flows, checkpoint strategies can be implemented in user code:
- Save intermediate results to S3 within the task
- On retry or new run, load the checkpoint and skip prior tasks
- Use idempotent operations to avoid re-processing data
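A user-level checkpoint helper following these three points might look like the sketch below. To stay self-contained it writes to a local directory; in a real flow the same read-or-compute logic would target S3. The function name and the JSON file layout are hypothetical.

```python
import json
from pathlib import Path


def with_checkpoint(checkpoint_dir, key, compute):
    """Return (result, reused). Reuses a saved result for key if one exists."""
    path = Path(checkpoint_dir) / f"{key}.json"
    if path.exists():
        # A prior run or attempt already finished this step: skip the work.
        return json.loads(path.read_text()), True
    result = compute()
    path.parent.mkdir(parents=True, exist_ok=True)
    # Write to a temporary file and rename, so a crash mid-write cannot
    # leave a truncated checkpoint behind.
    tmp = path.with_name(path.name + ".tmp")
    tmp.write_text(json.dumps(result))
    tmp.replace(path)
    return result, False
```

The key should encode everything that determines the result (for example flow name, task name, and input date) so that a changed input never reuses a stale checkpoint.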
Workload Profiles for Right-Sizing
The framework provides six named workload profiles to simplify resource selection:
| Profile | CPU | Memory | Use Case |
|---|---|---|---|
| small | 256 (0.25 vCPU) | 512 MB | Minimal workloads, long-running batch jobs with low CPU |
| medium | 1024 (1 vCPU) | 2048 MB | Standard Python scripts, data transformation |
| large | 2048 (2 vCPU) | 4096 MB | Parallel task execution, machine learning inference |
| xlarge | 4096 (4 vCPU) | 8192 MB | Heavy compute, multi-threaded workloads |
| heavy | 8192 (8 vCPU) | 16384 MB | Dense workloads, large data processing in memory |
| max | 16384 (16 vCPU) | 32768 MB | Maximum vCPU available on Fargate; memory can be raised up to 122880 MB with an explicit override |
A launch request specifies the profile:
    launcher.launch(
        flow_name="train_model",
        org_id="acme-corp",
        workload_profile="large",
    )
The backend maps the profile name to CPU and memory via the WORKLOAD_PROFILES dict and checks that the combination is valid for Fargate.
Users can also specify explicit cpu and memory, which override any profile. The backend checks the combination with validate_fargate_resources().
Ephemeral storage defaults to 21 GiB, the smallest size that can be set explicitly on Fargate, and can be overridden per run. All profiles use the same ephemeral storage; it is independent of CPU/memory selection.
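Profile lookup and validation can be sketched as below. The profile values come from the table above and the memory ranges from the published Fargate task size matrix; the function bodies, the resolve_resources helper, and the behaviour when only one of cpu or memory is given are assumptions rather than the backend's actual code.

```python
WORKLOAD_PROFILES = {
    "small": (256, 512), "medium": (1024, 2048), "large": (2048, 4096),
    "xlarge": (4096, 8192), "heavy": (8192, 16384), "max": (16384, 32768),
}

# Valid memory values (MiB) for each Fargate CPU size (CPU units).
_FARGATE_MEMORY = {
    256: (512, 1024, 2048),
    512: range(1024, 4096 + 1, 1024),
    1024: range(2048, 8192 + 1, 1024),
    2048: range(4096, 16384 + 1, 1024),
    4096: range(8192, 30720 + 1, 1024),
    8192: range(16384, 61440 + 1, 4096),
    16384: range(32768, 122880 + 1, 8192),
}


def validate_fargate_resources(cpu, memory):
    """Return True if (cpu, memory) is a combination Fargate accepts."""
    return memory in _FARGATE_MEMORY.get(cpu, ())


def resolve_resources(workload_profile=None, cpu=None, memory=None):
    """Combine a named profile with explicit overrides and validate the result."""
    base_cpu, base_memory = (WORKLOAD_PROFILES[workload_profile]
                             if workload_profile else (None, None))
    cpu = cpu if cpu is not None else base_cpu
    memory = memory if memory is not None else base_memory
    if cpu is None or memory is None:
        raise ValueError("specify a workload_profile or both cpu and memory")
    if not validate_fargate_resources(cpu, memory):
        raise ValueError(f"invalid Fargate combination: cpu={cpu}, memory={memory}")
    return cpu, memory
```

For example, resolve_resources("max", memory=122880) keeps 16 vCPU from the profile and raises memory to the Fargate ceiling.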
This architecture is designed for multi-tenant, highly scalable DAG execution with clear separation of concerns, fine-grained security, and comprehensive observability. The control-plane pattern enables rapid task submission while the worker's self-contained execution model ensures resilience and auditability.