ECS Fargate Execution Guide
ECS Fargate execution allows you to run Dagy DAG flows as containerized workloads in AWS ECS, suitable for workloads exceeding Lambda's 15-minute timeout or requiring more than 10 GB of memory. This guide covers setup, usage, and operational patterns.
Getting Started with ECS Fargate Execution
Prerequisites
Before deploying ECS Fargate execution, ensure you have:
- An AWS account with appropriate IAM permissions to create EC2, ECS, Lambda, DynamoDB, and S3 resources
- AWS CDK installed and configured (version 2.x or later)
- Python 3.12+ installed locally for development
- Docker installed for building and testing container images
- The Dagy SDK and API installed in your Python environment
Verify your setup by running:
aws sts get-caller-identity
cdk --version
docker --version
python --version
Building and Pushing the Worker Docker Image
The ECS worker container image is built from Dockerfile.worker in the project root. This image contains the Dagy SDK, boto3, and the worker entrypoint at src/dagy_api/ecs/worker_main.py.
To build the image locally:
docker build -f Dockerfile.worker -t dagy-worker:latest .
docker tag dagy-worker:latest dagy-worker:v1.0.0
To push the image to an ECR repository:
First, create an ECR repository if it doesn't exist:
aws ecr create-repository --repository-name dagy-worker --region us-east-1
Then authenticate Docker to the registry and push:
aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin 123456789.dkr.ecr.us-east-1.amazonaws.com
docker tag dagy-worker:latest 123456789.dkr.ecr.us-east-1.amazonaws.com/dagy-worker:latest
docker push 123456789.dkr.ecr.us-east-1.amazonaws.com/dagy-worker:latest
The CDK stack in infrastructure/dagy_stack.py can be configured to use your ECR image. Set the lambda_image_uri in StackConfig to point to your image:
config = StackConfig(
    environment="production",
    app_name="dagy",
    lambda_image_uri="123456789.dkr.ecr.us-east-1.amazonaws.com/dagy-worker:latest",
    ...
)
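Fargate pulls the worker image from a registry when the task starts, so a locally built image is useful only for testing the container with Docker on your own machine. For production, always use a properly versioned image tag (for example v1.0.0 rather than latest) from a persistent registry such as ECR.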
Launching DAGs from Lambda
The primary way to launch DAGs on ECS Fargate is through the DagLauncher class, which is designed to be imported and used from Lambda functions or other Python code.
Single DAG Launch
The basic pattern imports DagLauncher and calls launch():
from dagy_api.ecs.launcher import DagLauncher, DagLaunchRequest

def lambda_handler(event, context):
    launcher = DagLauncher()

    # Pass the launch parameters as keyword arguments
    result = launcher.launch(
        flow_name="etl_pipeline",
        flow_version="1.0.0",
        org_id="acme-corp",
        environment="production",
        parameters={
            "date": "2024-03-14",
            "source_bucket": "s3://data-input"
        },
        correlation_id="req-12345",
    )

    return {
        "statusCode": 200 if result.success else 500,
        "body": {
            "run_id": result.run_id,
            "task_arn": result.task_arn,
            "success": result.success,
            "error": result.error_message,
        }
    }
The DagLauncher constructor reads configuration from environment variables set by the CDK stack:
- DAGY_ECS_CLUSTER_ARN: ARN of the ECS cluster
- DAGY_ECS_EXECUTION_ROLE_ARN: ECS task execution role
- DAGY_ECS_TASK_ROLE_ARN: ECS task runtime role
- DAGY_ECS_WORKER_IMAGE: ECR image URI for dagy-worker
- DAGY_ECS_SUBNETS: Comma-separated subnet IDs
- DAGY_ECS_SECURITY_GROUPS: Comma-separated security group IDs
- DAGY_ECS_LOG_GROUP: CloudWatch log group name
- DAGY_ARTIFACT_BUCKET: S3 bucket for flow artifacts
- DAGY_TRACES_BUCKET: S3 bucket for exception traces
These variables are typically set by the CDK stack as outputs. Alternatively, you can pass them explicitly to DagLauncher:
launcher = DagLauncher(
    cluster_arn="arn:aws:ecs:us-east-1:123456789:cluster/dagy",
    task_execution_role_arn="arn:aws:iam::123456789:role/ecs-exec-role",
    task_role_arn="arn:aws:iam::123456789:role/ecs-task-role",
    worker_image="123456789.dkr.ecr.us-east-1.amazonaws.com/dagy-worker:latest",
    subnets=["subnet-12345", "subnet-67890"],
    security_groups=["sg-12345"],
    log_group="/ecs/dagy-worker",
    region="us-east-1",
)
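As an illustration of the environment-variable configuration listed above, the following sketch collects the variables into constructor-style keyword arguments and splits the comma-separated ones. The function name, the dictionary keys, and the choice to treat every listed variable as required are assumptions made for this example; the actual DagLauncher loading logic is not shown in this guide.

```python
import os
from typing import Dict, List, Mapping, Optional

# Environment variable names come from the list above; the keys mirror the
# DagLauncher keyword arguments shown in the explicit-constructor example.
SCALAR_VARS = {
    "cluster_arn": "DAGY_ECS_CLUSTER_ARN",
    "task_execution_role_arn": "DAGY_ECS_EXECUTION_ROLE_ARN",
    "task_role_arn": "DAGY_ECS_TASK_ROLE_ARN",
    "worker_image": "DAGY_ECS_WORKER_IMAGE",
    "log_group": "DAGY_ECS_LOG_GROUP",
}
LIST_VARS = {
    "subnets": "DAGY_ECS_SUBNETS",
    "security_groups": "DAGY_ECS_SECURITY_GROUPS",
}


def split_csv(value: str) -> List[str]:
    """Split a comma-separated string, dropping blanks and surrounding spaces."""
    return [item.strip() for item in value.split(",") if item.strip()]


def load_launcher_settings(env: Optional[Mapping[str, str]] = None) -> Dict[str, object]:
    """Hypothetical helper: build launcher settings from environment variables.

    Assumption: all listed variables are required; the real launcher may
    apply defaults instead of failing.
    """
    env = os.environ if env is None else env
    names = list(SCALAR_VARS.values()) + list(LIST_VARS.values())
    missing = sorted(name for name in names if not env.get(name))
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")

    settings: Dict[str, object] = {key: env[name] for key, name in SCALAR_VARS.items()}
    for key, name in LIST_VARS.items():
        settings[key] = split_csv(env[name])
    return settings
```

Failing fast on a missing variable surfaces a misconfigured Lambda at cold start rather than at the first launch attempt.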
Launch Request Model
The DagLaunchRequest dataclass encapsulates all launch parameters:
@dataclass
class DagLaunchRequest:
    flow_name: str                          # Required
    org_id: str                             # Required
    flow_version: str = "latest"            # Default to latest version
    parameters: Dict[str, Any] = field(default_factory=dict)
    environment: str = ""                   # "develop", "staging", "production"
    deployment_name: str = ""               # Named deployment for dep packages
    correlation_id: str = ""                # Distributed trace ID
    trigger_source: str = "lambda"          # Audit trail
    triggered_by: str = ""                  # Lambda function ARN, etc.

    # Resource overrides
    workload_profile: Optional[str] = None  # "small", "medium", "large", etc.
    cpu: Optional[str] = None               # Explicit CPU (256, 512, 1024, ...)
    memory: Optional[str] = None            # Explicit memory
    ephemeral_storage_gib: Optional[int] = None  # 21-200 GiB

    # Additional context
    extra_env_vars: Dict[str, str] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)
Launch Response
The launch() method returns a DagLaunchResult:
@dataclass(frozen=True)
class DagLaunchResult:
    success: bool                     # True if task was submitted
    run_id: str                       # UUID of the execution
    flow_name: str
    task_arn: Optional[str] = None    # ECS task ARN for polling
    correlation_id: str = ""
    error_message: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)
A successful result (success=True) means the ECS task was submitted. The task may still fail during execution; check the run status via get_run_status() or query DynamoDB directly.
Batch and Partitioned DAG Launches
For fan-out workloads, launch_batch() submits multiple independent runs:
results = launcher.launch_batch(
    requests=[
        {"flow_name": "process_shard", "parameters": {"shard": "0"}},
        {"flow_name": "process_shard", "parameters": {"shard": "1"}},
        {"flow_name": "process_shard", "parameters": {"shard": "2"}},
        {"flow_name": "process_shard", "parameters": {"shard": "3"}},
    ],
    org_id="acme-corp",
    environment="production",
    correlation_id="batch-req-12345",  # Parent correlation ID
)

# Each request gets a child correlation ID: batch-req-12345:0, batch-req-12345:1, etc.
for i, result in enumerate(results):
    print(f"Shard {i}: {result.run_id} -> {result.task_arn}")
This submits all requests in parallel and returns immediately. Use the results to track each run independently.
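The child correlation ID scheme mentioned in the example comment (parent ID, a colon, then the request index) can be reproduced on the client side, for instance to search logs for one shard. The helper below is a hypothetical sketch of that naming rule only; how launch_batch() assigns the IDs internally is not shown in this guide.

```python
from typing import Any, Dict, List


def with_child_correlation_ids(
    requests: List[Dict[str, Any]], parent_correlation_id: str
) -> List[Dict[str, Any]]:
    """Return copies of the requests, each tagged with '<parent>:<index>'.

    Hypothetical helper: it mirrors the naming rule described above and
    leaves the input dictionaries untouched.
    """
    return [
        {**request, "correlation_id": f"{parent_correlation_id}:{index}"}
        for index, request in enumerate(requests)
    ]


if __name__ == "__main__":
    shards = [
        {"flow_name": "process_shard", "parameters": {"shard": str(n)}}
        for n in range(4)
    ]
    for request in with_child_correlation_ids(shards, "batch-req-12345"):
        print(request["correlation_id"])
```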
Configuring Workload Profiles and Resource Overrides
Named Workload Profiles
Dagy provides six named workload profiles for common use cases. Specify a profile in the launch request:
result = launcher.launch(
    flow_name="train_model",
    org_id="acme-corp",
    workload_profile="large",  # Use 2048 CPU / 4096 MB memory
)
The available profiles are:
- small: 256 CPU / 512 MB (minimal, batch-friendly)
- medium: 1024 CPU / 2048 MB (standard Python workloads)
- large: 2048 CPU / 4096 MB (parallel workloads)
- xlarge: 4096 CPU / 8192 MB (heavy computation)
- heavy: 8192 CPU / 16384 MB (dense in-memory processing)
- max: 16384 CPU / 32768 MB (maximum Fargate CPU; larger memory sizes require explicit overrides)
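The profile list above maps naturally onto a lookup table. The sketch below shows one way an application could resolve a profile name to CPU and memory strings. The function name, the "medium" fallback, and the rule that explicit cpu or memory values win over the profile are assumptions for illustration; this guide does not state how Dagy resolves conflicts between the two.

```python
from typing import Dict, Optional, Tuple

# CPU units and memory (MB) per profile, transcribed from the list above.
WORKLOAD_PROFILES: Dict[str, Tuple[str, str]] = {
    "small": ("256", "512"),
    "medium": ("1024", "2048"),
    "large": ("2048", "4096"),
    "xlarge": ("4096", "8192"),
    "heavy": ("8192", "16384"),
    "max": ("16384", "32768"),
}


def resolve_resources(
    workload_profile: Optional[str] = None,
    cpu: Optional[str] = None,
    memory: Optional[str] = None,
    default_profile: str = "medium",  # assumed default, not confirmed by Dagy
) -> Tuple[str, str]:
    """Hypothetical resolver: explicit cpu/memory override the profile values."""
    name = workload_profile or default_profile
    if name not in WORKLOAD_PROFILES:
        raise ValueError(f"Unknown workload profile: {name!r}")
    profile_cpu, profile_memory = WORKLOAD_PROFILES[name]
    return (cpu or profile_cpu, memory or profile_memory)
```

For example, resolve_resources("large", memory="8192") yields 2048 CPU units with 8192 MB under these assumptions.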
Explicit Resource Overrides
For fine-tuned control, specify CPU and memory explicitly:
result = launcher.launch(
    flow_name="etl_pipeline",
    org_id="acme-corp",
    cpu="2048",     # 2 vCPU
    memory="4096",  # 4 GB
)
The backend validates that the CPU/memory combination is valid for Fargate. Valid combinations include:
- 256 CPU: 512, 1024, 2048 MB
- 512 CPU: 1024, 2048, 3072, 4096 MB
- 1024 CPU: 2048-8192 MB (in 1 GB increments)
- 2048 CPU: 4096-16384 MB (in 1 GB increments)
- 4096 CPU: 8192-30720 MB (in 1 GB increments)
- 8192 CPU: 16384-61440 MB (in 4 GB increments)
- 16384 CPU: 32768-122880 MB (in 8 GB increments)
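To catch an invalid size before submitting a launch, the combinations listed above can be checked locally. This is a minimal sketch that encodes exactly that list; the function name is hypothetical and the backend's own validation remains authoritative.

```python
from typing import Dict, Set


def _memory_steps(start_mb: int, stop_mb: int, step_mb: int) -> Set[int]:
    """All memory sizes from start to stop (inclusive) in fixed increments."""
    return set(range(start_mb, stop_mb + 1, step_mb))


# CPU units -> allowed memory sizes in MB, transcribed from the list above.
VALID_FARGATE_SIZES: Dict[int, Set[int]] = {
    256: {512, 1024, 2048},
    512: _memory_steps(1024, 4096, 1024),
    1024: _memory_steps(2048, 8192, 1024),
    2048: _memory_steps(4096, 16384, 1024),
    4096: _memory_steps(8192, 30720, 1024),
    8192: _memory_steps(16384, 61440, 4096),
    16384: _memory_steps(32768, 122880, 8192),
}


def is_valid_fargate_size(cpu: str, memory: str) -> bool:
    """Return True if the cpu/memory strings form a listed combination."""
    try:
        cpu_units, memory_mb = int(cpu), int(memory)
    except (TypeError, ValueError):
        return False
    return memory_mb in VALID_FARGATE_SIZES.get(cpu_units, set())


if __name__ == "__main__":
    print(is_valid_fargate_size("2048", "4096"))  # listed combination
    print(is_valid_fargate_size("256", "4096"))   # too much memory for 256 CPU
```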
Ephemeral Storage
ECS Fargate provides ephemeral storage at /tmp, configurable from 21 to 200 GiB (tasks that do not configure it receive 20 GiB). By default, Dagy allocates 21 GiB (the configurable minimum), which should be sufficient for most workloads. To increase it:
result = launcher.launch(
    flow_name="large_data_process",
    org_id="acme-corp",
    ephemeral_storage_gib=100,  # 100 GiB for intermediate files
)
Use ephemeral storage for temporary files, unpacked artifacts, and intermediate results. Do not rely on it for persistence; it is cleaned up when the task stops.
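A small guard can reject out-of-range storage requests before launch, and a task can check the remaining space before writing large intermediate files. Both helpers below are hypothetical sketches; the 21-200 GiB bounds come from the launch request model above, and shutil.disk_usage is part of the Python standard library.

```python
import shutil

MIN_EPHEMERAL_GIB = 21   # lower bound from the launch request model
MAX_EPHEMERAL_GIB = 200  # upper bound from the launch request model


def validate_ephemeral_storage(gib: int) -> int:
    """Return gib unchanged if it is an integer within the allowed range."""
    if isinstance(gib, bool) or not isinstance(gib, int):
        raise TypeError("ephemeral_storage_gib must be an integer")
    if not MIN_EPHEMERAL_GIB <= gib <= MAX_EPHEMERAL_GIB:
        raise ValueError(
            f"ephemeral_storage_gib must be between {MIN_EPHEMERAL_GIB} "
            f"and {MAX_EPHEMERAL_GIB}, got {gib}"
        )
    return gib


def free_gib(path: str = "/tmp") -> float:
    """Free space, in GiB, on the filesystem that contains path."""
    return shutil.disk_usage(path).free / 2**30
```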
Adding Dependency Packages
Dagy supports packaging and distributing custom Python dependencies via the dependency package system. This is useful for shared code, proprietary libraries, or pre-compiled extensions.
Creating a Dependency Package
A dependency package is a ZIP or tar.gz file containing:
my-deps/
├── requirements.txt      # pip install dependencies
├── __init__.py           # Empty, marks directory as Python package
└── custom_module/
    ├── __init__.py
    └── helpers.py        # Your custom code
Create the package:
mkdir -p my-deps/custom_module
echo "requests>=2.28.0" > my-deps/requirements.txt
touch my-deps/__init__.py my-deps/custom_module/__init__.py
echo "def helper_function(): pass" > my-deps/custom_module/helpers.py
cd my-deps
zip -r ../my-deps.zip .
cd ..
Push the package to S3:
aws s3 cp my-deps.zip s3://your-artifact-bucket/dep-packages/my-deps.zip
Associating with a Deployment
In the Dagy system, deployments group dependency packages. Define a deployment:
from dagy_api.persistence.models import DeploymentModel

DeploymentModel(
    deployment_name="production",
    environment="production",
    dep_package_slugs=["my-deps"],  # References the package by slug
).save()
The slug is a unique identifier for the dependency package. It should be registered separately:
from dagy_api.persistence.models import DepPackageModel

DepPackageModel(
    slug="my-deps",
    package_s3_uri="s3://your-artifact-bucket/dep-packages/my-deps.zip",
    org_id="acme-corp",
    version="1.0.0",
).save()
Using Dependency Packages in a Launch
When launching a DAG, specify the deployment_name:
result = launcher.launch(
    flow_name="my_pipeline",
    org_id="acme-corp",
    deployment_name="production",  # Pulls in dep packages from this deployment
    parameters={"data": "s3://bucket/input.csv"}
)
The launcher resolves the deployment, fetches the dep_package_slugs, and maps them to S3 URIs. These URIs are passed to the ECS worker via DAGY_DEP_PACKAGE_URIS. The worker downloads, extracts, and installs each package before executing the DAG.
Packages are installed to a site-packages directory and added to sys.path, making them importable from your flow code.
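The extract-and-import step described above can be approximated with the standard library. This sketch covers only unpacking a ZIP package and adding it to sys.path; downloading from S3 and installing requirements.txt are deliberately left out, and the function name is hypothetical rather than the worker's actual implementation.

```python
import sys
import zipfile
from pathlib import Path


def install_dep_package(archive_path: Path, target_dir: Path) -> Path:
    """Extract a ZIP dependency package and make its contents importable.

    Simplified illustration: the real worker also downloads the archive
    and installs the packages listed in requirements.txt.
    """
    target_dir.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(archive_path) as archive:
        archive.extractall(target_dir)

    # Put the extracted directory first so its modules take precedence.
    path_entry = str(target_dir)
    if path_entry not in sys.path:
        sys.path.insert(0, path_entry)
    return target_dir
```

After the call, a module packaged as custom_module/helpers.py would be importable as custom_module.helpers from flow code.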
Setting Up Scheduled DAG Execution
For recurring workloads, use EventBridge to trigger DAG launches on a schedule.
EventBridge Rule and Lambda Target
The CDK stack can provision scheduled rules. Alternatively, create a rule manually:
aws events put-rule \
--name dagy-etl-daily \
--schedule-expression "cron(0 9 * * ? *)" \
--state ENABLED \
--description "Run ETL pipeline daily at 9 AM UTC"
Create or update the Lambda target:
aws events put-targets \
--rule dagy-etl-daily \
--targets "Id"="1","Arn"="arn:aws:lambda:us-east-1:123456789:function:dag-launcher","RoleArn"="arn:aws:iam::123456789:role/EventBridgeInvokeRole"
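To pass the launch request as the event payload, set Input on the target. Calling put-targets again with the same Id replaces the earlier target definition: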
aws events put-targets \
--rule dagy-etl-daily \
--targets "Id"="1","Arn"="arn:aws:lambda:us-east-1:123456789:function:dag-launcher","Input"='{"action":"launch","flow_name":"etl_pipeline","org_id":"acme-corp","environment":"production"}'
At the scheduled time, EventBridge invokes the Lambda function, which calls DagLauncher to start the DAG run on ECS. The function's resource-based policy must allow events.amazonaws.com to invoke it.
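On the Lambda side, the handler needs to turn the scheduled payload into a launch call. The sketch below dispatches on the "action" key used in the Input example above and forwards the remaining fields to a launch callable such as DagLauncher().launch. The function name, the list of forwarded fields, and the "schedule" trigger_source value are assumptions for illustration.

```python
from typing import Any, Callable, Dict

# Payload keys forwarded to the launcher; names follow the launch request model.
LAUNCH_FIELDS = ("flow_name", "flow_version", "org_id", "environment", "parameters")


def handle_scheduled_event(event: Dict[str, Any], launch: Callable[..., Any]) -> Dict[str, Any]:
    """Hypothetical dispatcher for payloads sent by the EventBridge rule.

    `launch` is any callable with the keyword interface of DagLauncher.launch
    that returns an object exposing `success` and `run_id`.
    """
    if event.get("action") != "launch":
        return {"statusCode": 400, "error": f"unsupported action: {event.get('action')!r}"}

    kwargs = {key: event[key] for key in LAUNCH_FIELDS if key in event}
    missing = [key for key in ("flow_name", "org_id") if not kwargs.get(key)]
    if missing:
        return {"statusCode": 400, "error": f"missing fields: {', '.join(missing)}"}

    kwargs["trigger_source"] = "schedule"  # assumed label for the audit trail
    result = launch(**kwargs)
    return {"statusCode": 200 if result.success else 500, "run_id": result.run_id}
```

Passing the launcher in as a callable keeps the dispatcher easy to unit test without AWS access.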
Cron Expression Format
EventBridge cron expressions have six fields (minutes, hours, day of month, month, day of week, year). Unlike standard Unix cron, day of month and day of week cannot both be specified; one of them must be ?:
- 0 9 * * ? *: Every day at 9 AM UTC
- 0 */6 * * ? *: Every 6 hours
- 0 0 ? * MON-FRI *: Weekdays at midnight
- 0 0 1 * ? *: First day of every month at midnight
For simple fixed intervals, use rate expressions:
- rate(1 hour): Every hour
- rate(30 minutes): Every 30 minutes
- rate(5 days): Every 5 days
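A quick structural check can catch the most common mistakes in a schedule expression before it reaches put-rule. The function below is a hypothetical sketch that verifies only the cron(...) wrapper, the six-field count, and that day of month and day of week are not both specified; it does not validate the values inside each field.

```python
from typing import List


def check_eventbridge_cron(expression: str) -> List[str]:
    """Return a list of structural problems; an empty list means none found."""
    if not (expression.startswith("cron(") and expression.endswith(")")):
        return ["expression must be wrapped in cron(...)"]

    fields = expression[len("cron("):-1].split()
    if len(fields) != 6:
        return [f"expected 6 fields, found {len(fields)}"]

    problems: List[str] = []
    day_of_month, day_of_week = fields[2], fields[4]
    # EventBridge rejects expressions that specify both day fields.
    if day_of_month != "?" and day_of_week != "?":
        problems.append("one of day-of-month and day-of-week must be '?'")
    return problems


if __name__ == "__main__":
    for candidate in ("cron(0 9 * * ? *)", "cron(0 9 * * * *)", "cron(0 9 * * ?)"):
        print(candidate, check_eventbridge_cron(candidate))
```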
API-Driven DAG Launch Examples
For external systems (web dashboards, REST APIs, webhooks), expose DAG launch as an HTTP endpoint.
API Gateway and Lambda Integration
Create an API Gateway HTTP API:
aws apigatewayv2 create-api --name dagy-flows --protocol-type HTTP --target <lambda-arn>
The Lambda handler receives the HTTP request:
import json

from dagy_api.ecs.launcher import DagLauncher

def api_handler(event, context):
    """Handle HTTP POST to /flows/{flowName}/runs"""
    # Absent path parameters, query parameters, or body may arrive as None
    flow_name = (event.get("pathParameters") or {}).get("flowName", "")
    body = json.loads(event.get("body") or "{}")
    # org_id is read from the JSON body, falling back to the query string
    query = event.get("queryStringParameters") or {}
    org_id = body.get("org_id") or query.get("org_id", "")
    parameters = body.get("parameters", {})

    launcher = DagLauncher()
    result = launcher.launch(
        flow_name=flow_name,
        org_id=org_id,
        parameters=parameters,
        environment="production",
        trigger_source="api",
        triggered_by=event.get("requestContext", {}).get("http", {}).get("sourceIp"),
    )

    return {
        "statusCode": 200 if result.success else 500,
        "body": json.dumps({
            "run_id": result.run_id,
            "task_arn": result.task_arn,
            "success": result.success,
        }),
    }
Example request:
curl -X POST https://api.example.com/flows/etl_pipeline/runs \
  -H "Content-Type: application/json" \
  -d '{
    "org_id": "acme-corp",
    "parameters": {
      "date": "2024-03-14",
      "source": "s3://bucket/input.csv"
    }
  }'
Response:
{
  "run_id": "550e8400-e29b-41d4-a716-446655440000",
  "task_arn": "arn:aws:ecs:us-east-1:123456789:task/dagy/...",
  "success": true
}
Polling for Run Status
After receiving a run_id, the client can poll for status:
def get_run_handler(event, context):
    # pathParameters may be None when the route has no path variables
    run_id = (event.get("pathParameters") or {}).get("runId", "")

    launcher = DagLauncher()
    status = launcher.get_run_status(run_id)

    if not status:
        return {"statusCode": 404, "body": json.dumps({"error": "Run not found"})}

    return {
        "statusCode": 200,
        "body": json.dumps({
            "run_id": status["run_id"],
            "flow_name": status["flow_name"],
            "status": status["status"],
            "started_at": status["started_at"],
            "completed_at": status["completed_at"],
            "error_message": status["error_message"],
        }),
    }
The status field can be QUEUED, RUNNING, SUCCEEDED, FAILED, or CANCELLED.
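A client that needs to block until a run finishes can poll with exponential backoff and stop at a terminal status. The sketch below treats SUCCEEDED, FAILED, and CANCELLED as terminal, based on the status list above. The function name, the delay values, and the choice to keep polling while the status lookup returns nothing are assumptions; get_status stands in for a callable such as DagLauncher().get_run_status.

```python
import time
from typing import Any, Callable, Dict, Optional

TERMINAL_STATUSES = {"SUCCEEDED", "FAILED", "CANCELLED"}


def wait_for_run(
    get_status: Callable[[str], Optional[Dict[str, Any]]],
    run_id: str,
    timeout_s: float = 900.0,
    initial_delay_s: float = 2.0,
    max_delay_s: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, Any]:
    """Poll until the run reaches a terminal status or the timeout expires."""
    deadline = time.monotonic() + timeout_s
    delay = initial_delay_s
    while True:
        status = get_status(run_id)
        # A missing record is treated as "not visible yet" and polled again.
        if status and status.get("status") in TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"run {run_id} did not finish within {timeout_s}s")
        sleep(delay)
        delay = min(delay * 2, max_delay_s)  # back off up to the cap
```

Capping the delay keeps long runs from being polled too rarely while still reducing read traffic compared with a fixed short interval.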
Adding New DAGs or Execution Profiles
Packaging a New DAG
To create a new DAG, use the Dagy SDK:
from dagy import DAG, task

@task
def extract(source_path):
    import pandas as pd
    return pd.read_csv(source_path)

@task
def transform(data):
    return data.assign(processed=True)

@task
def load(data, destination):
    data.to_parquet(destination)

with DAG("etl_pipeline", version="1.0.0") as dag:
    data = extract(source_path="{{ parameters.source }}")
    transformed = transform(data=data)
    load(transformed, destination="{{ parameters.destination }}")
Save this as flows/etl_pipeline.py. Build the DAG:
dagy build flows/etl_pipeline.py --output artifact.zip --version 1.0.0
This creates an artifact.zip containing flow_spec.json, metadata.json, and your Python source.
Uploading the Artifact
Push the artifact to S3:
aws s3 cp artifact.zip s3://your-artifact-bucket/flows/etl_pipeline/1.0.0/artifact.zip
Registering the Flow
Create a flow record in DynamoDB:
import json
from pathlib import Path

from dagy_api.persistence.models import FlowModel

# Assumes artifact.zip has been extracted to ./artifact
flow_spec = json.loads(Path("artifact/flow_spec.json").read_text())

FlowModel(
    flow_name="etl_pipeline",
    flow_version="1.0.0",
    org_id="acme-corp",
    spec=json.dumps(flow_spec),
    artifact_s3_uri="s3://your-artifact-bucket/flows/etl_pipeline/1.0.0/artifact.zip",
).save()
Now you can launch:
launcher.launch(
    flow_name="etl_pipeline",
    flow_version="1.0.0",
    org_id="acme-corp",
)
Creating Execution Profiles
Execution profiles combine a flow version with specific resource and deployment settings. While Dagy doesn't have a built-in "profile" model, you can implement this in your application:
EXECUTION_PROFILES = {
    "etl_pipeline:quick": {
        "flow_name": "etl_pipeline",
        "flow_version": "1.0.0",
        "workload_profile": "medium",
        "deployment_name": "production",
    },
    "etl_pipeline:large_dataset": {
        "flow_name": "etl_pipeline",
        "flow_version": "1.0.0",
        "workload_profile": "xlarge",
        "deployment_name": "production",
    },
    "etl_pipeline:dev": {
        "flow_name": "etl_pipeline",
        "flow_version": "dev-branch",
        "workload_profile": "small",
        "deployment_name": "develop",
    },
}

# Launch using a profile
profile = EXECUTION_PROFILES["etl_pipeline:large_dataset"]
result = launcher.launch(
    org_id="acme-corp",
    environment="production",
    **profile,
)
Environment-Specific Configuration
Different environments (develop, staging, production) often need different resource allocations and deployments.
Multi-Environment Setup
Define environment-specific defaults:
ENVIRONMENT_CONFIG = {
    "develop": {
        "subnets": ["subnet-dev-1", "subnet-dev-2"],
        "security_groups": ["sg-dev"],
        "workload_profile": "small",
        "deployment_name": "develop",
    },
    "staging": {
        "subnets": ["subnet-staging-1", "subnet-staging-2"],
        "security_groups": ["sg-staging"],
        "workload_profile": "medium",
        "deployment_name": "staging",
    },
    "production": {
        "subnets": ["subnet-prod-1", "subnet-prod-2", "subnet-prod-3"],
        "security_groups": ["sg-prod"],
        "workload_profile": "large",
        "deployment_name": "production",
    },
}

def launch_for_environment(flow_name, environment, **overrides):
    config = dict(ENVIRONMENT_CONFIG[environment])
    # Subnets and security groups are DagLauncher settings, not launch() fields.
    # Assumption: settings not passed here still fall back to the DAGY_ECS_* variables.
    launcher = DagLauncher(
        subnets=config.pop("subnets"),
        security_groups=config.pop("security_groups"),
    )
    return launcher.launch(
        flow_name=flow_name,
        environment=environment,
        org_id="acme-corp",
        **{**config, **overrides},  # Overrides can customize per-launch
    )
Parameter Templating
Inject environment-specific values by building the parameters at launch time:
environment = "production"

result = launcher.launch(
    flow_name="etl_pipeline",
    org_id="acme-corp",
    environment=environment,
    parameters={
        "output_bucket": f"s3://dagy-outputs-{environment}",
        "temp_bucket": f"s3://dagy-temp-{environment}",
    },
)
Your DAG can reference these parameters:
@task
def extract():
    import os
    output_bucket = os.environ.get("OUTPUT_BUCKET")  # From parameters
    ...
Secret Management for DAG Runtime
Sensitive information (database passwords, API keys, tokens) should never be stored in flow definitions or parameters. Dagy supports injecting secrets from AWS Secrets Manager.
Creating a Secret
Store a secret in Secrets Manager:
aws secretsmanager create-secret \
--name prod/db-password \
--secret-string "my-secure-password"
aws secretsmanager create-secret \
--name prod/api-key \
--secret-string "sk-1234567890"
Injecting Secrets at Launch
DagLauncher does not expose a secrets_env mapping yet. The example below shows the intended usage once DagLauncher and EcsBackend are extended to accept it:
# Note: secrets_env is not available until DagLauncher and EcsBackend
# are extended to support it
result = launcher.launch(
    flow_name="api_pipeline",
    org_id="acme-corp",
    secrets_env={
        "DB_PASSWORD": "arn:aws:secretsmanager:us-east-1:123456789:secret:prod/db-password",
        "API_KEY": "arn:aws:secretsmanager:us-east-1:123456789:secret:prod/api-key",
    },
)
The ECS task execution role must have secretsmanager:GetSecretValue permission on these ARNs. ECS injects the secret values as environment variables in the container, and your code reads them:
@task
def connect_database():
    import os
    db_password = os.environ["DB_PASSWORD"]  # Injected by ECS
    # Use password to connect
Secret values are never logged or stored in the task definition; it contains only the secret ARNs.
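ECS container definitions reference secrets through a secrets list whose entries carry a name (the environment variable) and a valueFrom (the secret ARN). The sketch below shows how a secrets_env mapping could be translated into that shape when DagLauncher is extended. The function name and the ARN sanity check are assumptions for illustration, not part of the current Dagy code.

```python
from typing import Dict, List


def build_container_secrets(secrets_env: Dict[str, str]) -> List[Dict[str, str]]:
    """Convert {ENV_NAME: secret_arn} into ECS container-definition entries.

    Hypothetical helper: only ARNs are placed in the result, so secret
    values never appear in the task definition.
    """
    entries: List[Dict[str, str]] = []
    for name, arn in sorted(secrets_env.items()):
        # Light sanity check; ECS performs the authoritative validation.
        if not arn.startswith("arn:") or ":secretsmanager:" not in arn:
            raise ValueError(f"{name}: expected a Secrets Manager ARN, got {arn!r}")
        entries.append({"name": name, "valueFrom": arn})
    return entries
```

Sorting the entries keeps the generated definition stable across launches, which makes diffs between task definition revisions easier to read.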
Monitoring and Observability
Viewing Logs
All ECS worker logs are written to CloudWatch Logs in JSON format. View logs for a specific run:
aws logs get-log-events \
--log-group-name /ecs/dagy-worker \
--log-stream-name <run_id> \
--start-time $(date -d '5 minutes ago' +%s)000 \
--query 'events[].message' \
--output text
Search for specific events:
aws logs start-query \
--log-group-name /ecs/dagy-worker \
--start-time $(date -d '1 hour ago' +%s) \
--end-time $(date +%s) \
--query-string 'fields @timestamp, run_id, event_type | filter event_type = "task_failed"'
Tracking Runs
Query DynamoDB for run status:
from dagy_api.persistence.models import RunModel
run = RunModel.get(run_id)
print(f"Status: {run.status}")
print(f"Started: {run.started_at}")
print(f"Completed: {run.completed_at}")
print(f"Error: {run.error_message}")
Accessing Exception Traces
When a DAG fails, the full exception traceback is uploaded to S3:
aws s3 cp s3://your-traces-bucket/exceptions/<org_id>/<run_id>/traceback.txt .
cat traceback.txt
This helps with debugging failures without needing to access CloudWatch Logs.
This guide covers the most common patterns for launching and managing DAG executions on ECS Fargate. For advanced scenarios (custom metrics, VPC configuration, multi-region), refer to the architecture document and CDK stack configuration.