ECS Fargate Execution Guide

ECS Fargate execution allows you to run Dagy DAG flows as containerized workloads in AWS ECS, suitable for workloads exceeding Lambda's 15-minute timeout or requiring more than 10 GB of memory. This guide covers setup, usage, and operational patterns.

Getting Started with ECS Fargate Execution

Prerequisites

Before deploying ECS Fargate execution, ensure you have:

  1. An AWS account with appropriate IAM permissions to create EC2, ECS, Lambda, DynamoDB, and S3 resources
  2. AWS CDK installed and configured (version 2.x or later)
  3. Python 3.12+ installed locally for development
  4. Docker installed for building and testing container images
  5. The Dagy SDK and API installed in your Python environment

Verify your setup by running:

aws sts get-caller-identity
cdk --version
docker --version
python --version

Building and Pushing the Worker Docker Image

The ECS worker container image is built from Dockerfile.worker in the project root. This image contains the Dagy SDK, boto3, and the worker entrypoint at src/dagy_api/ecs/worker_main.py.

To build the image locally:

docker build -f Dockerfile.worker -t dagy-worker:latest .
docker tag dagy-worker:latest dagy-worker:v1.0.0

To push the image to an ECR repository:

First, create an ECR repository if it doesn't exist:

aws ecr create-repository --repository-name dagy-worker --region us-east-1

Then authenticate Docker to the registry and push:

aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin 123456789.dkr.ecr.us-east-1.amazonaws.com
docker tag dagy-worker:latest 123456789.dkr.ecr.us-east-1.amazonaws.com/dagy-worker:latest
docker push 123456789.dkr.ecr.us-east-1.amazonaws.com/dagy-worker:latest

The CDK stack in infrastructure/dagy_stack.py can be configured to use your ECR image. Set the lambda_image_uri in StackConfig to point to your image:

config = StackConfig(
    environment="production",
    app_name="dagy",
    lambda_image_uri="123456789.dkr.ecr.us-east-1.amazonaws.com/dagy-worker:latest",
    ...
)

Fargate tasks always pull their image from a registry, so a locally built image must be pushed before it can run, even for development and testing. For production, always use a properly versioned image tag from a persistent registry rather than latest.

Launching DAGs from Lambda

The primary way to launch DAGs on ECS Fargate is through the DagLauncher class, which is designed to be imported and used from Lambda functions or other Python code.

Single DAG Launch

The basic pattern imports DagLauncher and calls launch():

from dagy_api.ecs.launcher import DagLauncher, DagLaunchRequest

def lambda_handler(event, context):
    launcher = DagLauncher()

    # Pass the launch parameters as keyword arguments
    result = launcher.launch(
        flow_name="etl_pipeline",
        flow_version="1.0.0",
        org_id="acme-corp",
        environment="production",
        parameters={
            "date": "2024-03-14",
            "source_bucket": "s3://data-input"
        },
        correlation_id="req-12345",
    )

    return {
        "statusCode": 200 if result.success else 500,
        "body": {
            "run_id": result.run_id,
            "task_arn": result.task_arn,
            "success": result.success,
            "error": result.error_message,
        }
    }

The DagLauncher constructor reads configuration from environment variables set by the CDK stack:

  • DAGY_ECS_CLUSTER_ARN: ARN of the ECS cluster
  • DAGY_ECS_EXECUTION_ROLE_ARN: ECS task execution role
  • DAGY_ECS_TASK_ROLE_ARN: ECS task runtime role
  • DAGY_ECS_WORKER_IMAGE: ECR image URI for dagy-worker
  • DAGY_ECS_SUBNETS: Comma-separated subnet IDs
  • DAGY_ECS_SECURITY_GROUPS: Comma-separated security group IDs
  • DAGY_ECS_LOG_GROUP: CloudWatch log group name
  • DAGY_ARTIFACT_BUCKET: S3 bucket for flow artifacts
  • DAGY_TRACES_BUCKET: S3 bucket for exception traces

These variables are normally populated from the CDK stack's outputs. Alternatively, you can pass the values explicitly to DagLauncher:

launcher = DagLauncher(
    cluster_arn="arn:aws:ecs:us-east-1:123456789:cluster/dagy",
    task_execution_role_arn="arn:aws:iam::123456789:role/ecs-exec-role",
    task_role_arn="arn:aws:iam::123456789:role/ecs-task-role",
    worker_image="123456789.dkr.ecr.us-east-1.amazonaws.com/dagy-worker:latest",
    subnets=["subnet-12345", "subnet-67890"],
    security_groups=["sg-12345"],
    log_group="/ecs/dagy-worker",
    region="us-east-1",
)
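
When relying on the environment variables listed above, it can help to check them once at startup so a missing value fails fast with a clear message. The following is a minimal sketch: the variable names come from the list above, but load_launcher_env is a hypothetical helper, not part of the Dagy API.

```python
import os

# Names copied from the list above; the helper itself is hypothetical.
REQUIRED_ENV_VARS = (
    "DAGY_ECS_CLUSTER_ARN",
    "DAGY_ECS_EXECUTION_ROLE_ARN",
    "DAGY_ECS_TASK_ROLE_ARN",
    "DAGY_ECS_WORKER_IMAGE",
    "DAGY_ECS_SUBNETS",
    "DAGY_ECS_SECURITY_GROUPS",
    "DAGY_ECS_LOG_GROUP",
    "DAGY_ARTIFACT_BUCKET",
    "DAGY_TRACES_BUCKET",
)

# These two variables hold comma-separated lists.
LIST_VARS = ("DAGY_ECS_SUBNETS", "DAGY_ECS_SECURITY_GROUPS")


def load_launcher_env(environ=None):
    """Return the launcher settings, or raise naming every missing variable."""
    env = os.environ if environ is None else environ
    missing = [name for name in REQUIRED_ENV_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    settings = {name: env[name] for name in REQUIRED_ENV_VARS}
    for name in LIST_VARS:
        settings[name] = [item.strip() for item in settings[name].split(",") if item.strip()]
    return settings
```

Passing a plain dict as environ makes the check easy to exercise in unit tests without touching the real process environment.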

Launch Request Model

The DagLaunchRequest dataclass encapsulates all launch parameters:

@dataclass
class DagLaunchRequest:
    flow_name: str                                    # Required
    org_id: str                                       # Required
    flow_version: str = "latest"                      # Default to latest version
    parameters: Dict[str, Any] = field(default_factory=dict)
    environment: str = ""                             # "develop", "staging", "production"
    deployment_name: str = ""                         # Named deployment for dep packages
    correlation_id: str = ""                          # Distributed trace ID
    trigger_source: str = "lambda"                    # Audit trail
    triggered_by: str = ""                            # Lambda function ARN, etc.

    # Resource overrides
    workload_profile: Optional[str] = None            # "small", "medium", "large", etc.
    cpu: Optional[str] = None                         # Explicit CPU (256, 512, 1024, ...)
    memory: Optional[str] = None                      # Explicit memory
    ephemeral_storage_gib: Optional[int] = None       # 21-200 GiB

    # Additional context
    extra_env_vars: Dict[str, str] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)

Launch Response

The launch() method returns a DagLaunchResult:

@dataclass(frozen=True)
class DagLaunchResult:
    success: bool                          # True if task was submitted
    run_id: str                            # UUID of the execution
    flow_name: str
    task_arn: Optional[str] = None         # ECS task ARN for polling
    correlation_id: str = ""
    error_message: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)

A successful result (success=True) means the ECS task was submitted. The task may still fail during execution; check the run status via get_run_status() or query DynamoDB directly.

Batch and Partitioned DAG Launches

For fan-out workloads, launch_batch() submits multiple independent runs:

results = launcher.launch_batch(
    requests=[
        {"flow_name": "process_shard", "parameters": {"shard": "0"}},
        {"flow_name": "process_shard", "parameters": {"shard": "1"}},
        {"flow_name": "process_shard", "parameters": {"shard": "2"}},
        {"flow_name": "process_shard", "parameters": {"shard": "3"}},
    ],
    org_id="acme-corp",
    environment="production",
    correlation_id="batch-req-12345",  # Parent correlation ID
)

# Each request gets a child correlation ID: batch-req-12345:0, batch-req-12345:1, etc.
for i, result in enumerate(results):
    print(f"Shard {i}: {result.run_id} -> {result.task_arn}")

This submits all requests in parallel and returns immediately. Use the results to track each run independently.
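
The child correlation ID scheme and the per-run tracking described above can be sketched in plain Python. Both helpers are hypothetical, and the SimpleNamespace objects only stand in for the results that launch_batch() would return; the sketch assumes each result exposes success, run_id, and error_message as in DagLaunchResult.

```python
from types import SimpleNamespace


def child_correlation_ids(parent_id, count):
    """Return child IDs in the parent:index form described above."""
    return [f"{parent_id}:{index}" for index in range(count)]


def split_results(results):
    """Separate submitted runs from failed submissions using .success."""
    submitted = [r for r in results if r.success]
    failed = [r for r in results if not r.success]
    return submitted, failed


# Stand-in results for illustration; real ones come from launch_batch().
demo_results = [
    SimpleNamespace(success=True, run_id="run-0", error_message=None),
    SimpleNamespace(success=False, run_id="run-1", error_message="capacity"),
]
submitted, failed = split_results(demo_results)
for result in failed:
    print(f"Submission failed for {result.run_id}: {result.error_message}")
```

Splitting the results right after the batch call makes it straightforward to retry only the failed submissions while polling the submitted ones.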

Configuring Workload Profiles and Resource Overrides

Named Workload Profiles

Dagy provides six named workload profiles for common use cases. Specify a profile in the launch request:

result = launcher.launch(
    flow_name="train_model",
    org_id="acme-corp",
    workload_profile="large",  # Use 2048 CPU / 4096 MB memory
)

The available profiles are:

  • small: 256 CPU / 512 MB (minimal, batch-friendly)
  • medium: 1024 CPU / 2048 MB (standard Python workloads)
  • large: 2048 CPU / 4096 MB (parallel workloads)
  • xlarge: 4096 CPU / 8192 MB (heavy computation)
  • heavy: 8192 CPU / 16384 MB (dense in-memory processing)
  • max: 16384 CPU / 32768 MB (maximum Fargate CPU; more memory requires an explicit override)
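
As an illustration, the profile list above can be expressed as a lookup table. This is a sketch only: the values are copied from the list, while WORKLOAD_PROFILES and resolve_profile are hypothetical names and not necessarily how Dagy resolves profiles internally.

```python
# CPU units and memory (MB) copied from the profile list above.
WORKLOAD_PROFILES = {
    "small": (256, 512),
    "medium": (1024, 2048),
    "large": (2048, 4096),
    "xlarge": (4096, 8192),
    "heavy": (8192, 16384),
    "max": (16384, 32768),
}


def resolve_profile(name):
    """Return the cpu/memory strings for a named profile."""
    try:
        cpu, memory = WORKLOAD_PROFILES[name]
    except KeyError:
        known = ", ".join(WORKLOAD_PROFILES)
        raise ValueError(
            f"Unknown workload profile {name!r}; expected one of: {known}"
        ) from None
    return {"cpu": str(cpu), "memory": str(memory)}
```

The values are returned as strings because the cpu and memory launch parameters shown in this guide are strings.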

Explicit Resource Overrides

For fine-tuned control, specify CPU and memory explicitly:

result = launcher.launch(
    flow_name="etl_pipeline",
    org_id="acme-corp",
    cpu="2048",      # 2 vCPU
    memory="4096",   # 4 GB
)

The backend validates that the CPU/memory combination is valid for Fargate. Valid combinations include:

  • 256 CPU: 512, 1024, 2048 MB
  • 512 CPU: 1024, 2048, 3072, 4096 MB
  • 1024 CPU: 2048-8192 MB (in 1 GB increments)
  • 2048 CPU: 4096-16384 MB (in 1 GB increments)
  • 4096 CPU: 8192-30720 MB (in 1 GB increments)
  • 8192 CPU: 16384-61440 MB (in 4 GB increments)
  • 16384 CPU: 32768-122880 MB (in 8 GB increments)
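
The combinations listed above can be checked before submitting a launch. The following is a minimal sketch of such a check; is_valid_fargate_size is a hypothetical helper that mirrors the table, not the backend's actual validation code.

```python
# Allowed memory values (MB) per CPU setting, taken from the list above.
VALID_MEMORY_MB = {
    256: (512, 1024, 2048),
    512: tuple(range(1024, 4096 + 1, 1024)),
    1024: tuple(range(2048, 8192 + 1, 1024)),
    2048: tuple(range(4096, 16384 + 1, 1024)),
    4096: tuple(range(8192, 30720 + 1, 1024)),
    8192: tuple(range(16384, 61440 + 1, 4096)),
    16384: tuple(range(32768, 122880 + 1, 8192)),
}


def is_valid_fargate_size(cpu, memory):
    """Return True if the cpu/memory pair appears in the table above.

    Accepts strings or integers, since launch parameters are strings.
    """
    try:
        cpu_units, memory_mb = int(cpu), int(memory)
    except (TypeError, ValueError):
        return False
    return memory_mb in VALID_MEMORY_MB.get(cpu_units, ())
```

For example, is_valid_fargate_size("2048", "4096") is accepted, while is_valid_fargate_size("256", "4096") is rejected because 256 CPU units support at most 2048 MB.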

Ephemeral Storage

ECS Fargate provides 20 GiB of ephemeral storage by default, and a task can request between 21 and 200 GiB, available at /tmp. By default, Dagy allocates 21 GiB (the smallest configurable value), which should be sufficient for most workloads. To increase it:

result = launcher.launch(
    flow_name="large_data_process",
    org_id="acme-corp",
    ephemeral_storage_gib=100,  # 100 GiB for intermediate files
)

Use ephemeral storage for temporary files, unpacked artifacts, and intermediate results. Do not rely on it for persistence; it is cleaned up when the task stops.
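
In an ECS task definition, the requested size is carried in the ephemeralStorage setting as sizeInGiB. The sketch below shows a range check that produces that structure; ephemeral_storage_setting is a hypothetical helper, and whether Dagy builds the setting this way is an assumption.

```python
def ephemeral_storage_setting(size_gib=21):
    """Validate the requested size and return the ECS ephemeralStorage block."""
    # bool is a subclass of int, so reject it explicitly.
    if isinstance(size_gib, bool) or not isinstance(size_gib, int):
        raise TypeError("ephemeral_storage_gib must be an integer")
    if not 21 <= size_gib <= 200:
        raise ValueError("ephemeral_storage_gib must be between 21 and 200")
    return {"sizeInGiB": size_gib}
```

Rejecting out-of-range values before the task is registered gives a clearer error than waiting for the ECS API to refuse the request.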

Adding Dependency Packages

Dagy supports packaging and distributing custom Python dependencies via the dependency package system. This is useful for shared code, proprietary libraries, or pre-compiled extensions.

Creating a Dependency Package

A dependency package is a ZIP or tar.gz file containing:

my-deps/
├── requirements.txt          # pip install dependencies
├── __init__.py               # Empty, marks directory as Python package
└── custom_module/
    ├── __init__.py
    └── helpers.py            # Your custom code

Create the package:

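mkdir -p my-deps/custom_module
echo "requests>=2.28.0" > my-deps/requirements.txt
# Empty __init__.py files mark the directories as Python packages
touch my-deps/__init__.py my-deps/custom_module/__init__.py
echo "def helper_function(): pass" > my-deps/custom_module/helpers.py
# Zip from inside the directory so archive paths are relative to its root
(cd my-deps && zip -r ../my-deps.zip .)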

Push the package to S3:

aws s3 cp my-deps.zip s3://your-artifact-bucket/dep-packages/my-deps.zip

Associating with a Deployment

In the Dagy system, deployments group dependency packages. Define a deployment:

from dagy_api.persistence.models import DeploymentModel

DeploymentModel(
    deployment_name="production",
    environment="production",
    dep_package_slugs=["my-deps"],  # References the package by slug
).save()

The slug is a unique identifier for the dependency package. It should be registered separately:

from dagy_api.persistence.models import DepPackageModel

DepPackageModel(
    slug="my-deps",
    package_s3_uri="s3://your-artifact-bucket/dep-packages/my-deps.zip",
    org_id="acme-corp",
    version="1.0.0",
).save()

Using Dependency Packages in a Launch

When launching a DAG, specify the deployment_name:

result = launcher.launch(
    flow_name="my_pipeline",
    org_id="acme-corp",
    deployment_name="production",  # Pulls in dep packages from this deployment
    parameters={"data": "s3://bucket/input.csv"}
)

The launcher resolves the deployment, fetches the dep_package_slugs, and maps them to S3 URIs. These URIs are passed to the ECS worker via DAGY_DEP_PACKAGE_URIS. The worker downloads, extracts, and installs each package before executing the DAG.

Packages are installed to a site-packages directory and added to sys.path, making them importable from your flow code.
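
The extract-and-import step can be pictured with the standard library alone. This is a rough sketch under stated assumptions, not the worker's actual code: install_local_package is a hypothetical name, the archive is assumed to be a ZIP already downloaded to local disk, and the pip install of requirements.txt is left out.

```python
import sys
import tempfile
import zipfile
from pathlib import Path


def install_local_package(archive_path, target_dir=None):
    """Extract a ZIP dependency package and make its modules importable."""
    target = Path(target_dir) if target_dir else Path(tempfile.mkdtemp(prefix="dagy-deps-"))
    with zipfile.ZipFile(archive_path) as archive:
        archive.extractall(target)
    # Put the extraction directory first so its modules are found on import.
    if str(target) not in sys.path:
        sys.path.insert(0, str(target))
    return target
```

After the call, a package laid out as shown earlier would be importable with a statement such as from custom_module import helpers.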

Setting Up Scheduled DAG Execution

For recurring workloads, use EventBridge to trigger DAG launches on a schedule.

EventBridge Rule and Lambda Target

The CDK stack can provision scheduled rules. Alternatively, create a rule manually:

aws events put-rule \
  --name dagy-etl-daily \
  --schedule-expression "cron(0 9 * * ? *)" \
  --state ENABLED \
  --description "Run ETL pipeline daily at 9 AM UTC"

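Allow EventBridge to invoke the launcher function. Lambda targets are authorized through the function's resource-based policy rather than a RoleArn on the target:

aws lambda add-permission \
  --function-name dag-launcher \
  --statement-id dagy-etl-daily \
  --action lambda:InvokeFunction \
  --principal events.amazonaws.com \
  --source-arn arn:aws:events:us-east-1:123456789:rule/dagy-etl-daily

Then register the Lambda as the rule's target and set Input to pass the launch request. The JSON form of --targets avoids quoting problems with the embedded payload:

aws events put-targets \
  --rule dagy-etl-daily \
  --targets '[{"Id":"1","Arn":"arn:aws:lambda:us-east-1:123456789:function:dag-launcher","Input":"{\"action\":\"launch\",\"flow_name\":\"etl_pipeline\",\"org_id\":\"acme-corp\",\"environment\":\"production\"}"}]'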

At the scheduled time, EventBridge invokes the Lambda, which invokes DagLauncher. The DAG execution begins on ECS.
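
On the Lambda side, the scheduled payload has to be turned into launch arguments. The following is a minimal sketch of that translation: handle_scheduled_event is a hypothetical helper, the launch callable stands in for DagLauncher().launch, and the "schedule" value for trigger_source is an assumption.

```python
def handle_scheduled_event(event, launch):
    """Translate the EventBridge Input payload into launch keyword arguments.

    `launch` is any callable with the same keyword interface as
    DagLauncher.launch; injecting it keeps the function easy to test.
    """
    if event.get("action") != "launch":
        raise ValueError(f"Unsupported action: {event.get('action')!r}")
    missing = [key for key in ("flow_name", "org_id") if not event.get(key)]
    if missing:
        raise ValueError("Missing required fields: " + ", ".join(missing))
    # Everything except the routing key is forwarded to the launcher.
    kwargs = {key: value for key, value in event.items() if key != "action"}
    kwargs.setdefault("trigger_source", "schedule")  # assumed audit value
    return launch(**kwargs)
```

In a real handler this would be called as handle_scheduled_event(event, DagLauncher().launch), so malformed schedule payloads are rejected before any ECS task is submitted.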

Cron Expression Format

EventBridge cron expressions have six fields (minutes, hours, day of month, month, day of week, year), one more than standard Unix cron. One of the two day fields must be ?, and all times are in UTC:

  • 0 9 * * ? * – Every day at 9 AM UTC
  • 0 */6 * * ? * – Every 6 hours
  • 0 0 ? * MON-FRI * – Weekdays at midnight
  • 0 0 1 * ? * – First day of every month at midnight

For simple fixed intervals, use rate expressions instead:

  • rate(1 hour) – Every hour
  • rate(30 minutes) – Every 30 minutes
  • rate(5 days) – Every 5 days
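
Schedule expressions are easy to get subtly wrong, so generating them from code can help. The sketch below builds both forms and enforces two EventBridge rules: one of the day fields must be ?, and the rate unit is singular only when the value is 1. The function names are hypothetical.

```python
def cron_expression(minutes, hours, day_of_month, month, day_of_week, year="*"):
    """Build a six-field EventBridge cron expression."""
    # EventBridge does not allow day-of-month and day-of-week to both be set;
    # this sketch requires exactly one of them to be "?".
    if (day_of_month == "?") == (day_of_week == "?"):
        raise ValueError("exactly one of day_of_month and day_of_week must be '?'")
    return f"cron({minutes} {hours} {day_of_month} {month} {day_of_week} {year})"


def rate_expression(value, unit):
    """Build a rate expression; unit is 'minute', 'hour', or 'day'."""
    if unit not in ("minute", "hour", "day"):
        raise ValueError(f"unsupported unit: {unit!r}")
    if isinstance(value, bool) or not isinstance(value, int) or value < 1:
        raise ValueError("value must be a positive integer")
    suffix = "" if value == 1 else "s"
    return f"rate({value} {unit}{suffix})"
```

The returned strings can be passed directly as the --schedule-expression value of aws events put-rule.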

API-Driven DAG Launch Examples

For external systems (web dashboards, REST APIs, webhooks), expose DAG launch as an HTTP endpoint.

API Gateway and Lambda Integration

Create an API Gateway HTTP API:

aws apigatewayv2 create-api --name dagy-flows --protocol-type HTTP --target <lambda-arn>

The Lambda handler receives the HTTP request:

import json

from dagy_api.ecs.launcher import DagLauncher


def api_handler(event, context):
    """Handle HTTP POST to /flows/{flowName}/runs"""
    # API Gateway may send null for missing path/query parameters or body
    flow_name = (event.get("pathParameters") or {}).get("flowName", "")
    query = event.get("queryStringParameters") or {}
    body = json.loads(event.get("body") or "{}")
    # org_id arrives in the JSON body; fall back to the query string
    org_id = body.get("org_id") or query.get("org_id", "")
    parameters = body.get("parameters", {})

    launcher = DagLauncher()
    result = launcher.launch(
        flow_name=flow_name,
        org_id=org_id,
        parameters=parameters,
        environment="production",
        trigger_source="api",
        triggered_by=event.get("requestContext", {}).get("http", {}).get("sourceIp"),
    )

    return {
        "statusCode": 200 if result.success else 500,
        "body": json.dumps({
            "run_id": result.run_id,
            "task_arn": result.task_arn,
            "success": result.success,
        }),
    }

Example request:

curl -X POST https://api.example.com/flows/etl_pipeline/runs \
  -H "Content-Type: application/json" \
  -d '{
    "org_id": "acme-corp",
    "parameters": {
      "date": "2024-03-14",
      "source": "s3://bucket/input.csv"
    }
  }'

Response:

{
  "run_id": "550e8400-e29b-41d4-a716-446655440000",
  "task_arn": "arn:aws:ecs:us-east-1:123456789:task/dagy/...",
  "success": true
}

Polling for Run Status

After receiving a run_id, the client can poll for status:

import json

from dagy_api.ecs.launcher import DagLauncher


def get_run_handler(event, context):
    # pathParameters can be null when the route has no path variables
    run_id = (event.get("pathParameters") or {}).get("runId", "")

    launcher = DagLauncher()
    status = launcher.get_run_status(run_id)

    if not status:
        return {"statusCode": 404, "body": json.dumps({"error": "Run not found"})}

    return {
        "statusCode": 200,
        "body": json.dumps({
            "run_id": status["run_id"],
            "flow_name": status["flow_name"],
            "status": status["status"],
            "started_at": status["started_at"],
            # These fields may be unset while the run is still in progress
            "completed_at": status.get("completed_at"),
            "error_message": status.get("error_message"),
        }),
    }

The status field can be QUEUED, RUNNING, SUCCEEDED, FAILED, or CANCELLED.
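
A client that needs the final outcome can poll until the run reaches one of the terminal states above. The following is a minimal sketch of such a loop: wait_for_run is a hypothetical helper, and get_status is assumed to behave like launcher.get_run_status, returning a dict with a status key or a falsy value if the run is not visible yet.

```python
import time

# Terminal states from the status list above.
TERMINAL_STATUSES = {"SUCCEEDED", "FAILED", "CANCELLED"}


def wait_for_run(get_status, run_id, interval=5.0, timeout=900.0,
                 sleep=time.sleep, clock=time.monotonic):
    """Poll get_status(run_id) until the run finishes or the timeout expires."""
    deadline = clock() + timeout
    while True:
        status = get_status(run_id)
        if status and status.get("status") in TERMINAL_STATUSES:
            return status
        if clock() >= deadline:
            raise TimeoutError(f"Run {run_id} did not finish within {timeout} seconds")
        sleep(interval)
```

Because Lambda invocations are themselves time-limited, a loop like this is better suited to a client script or a long-running service than to the launching Lambda.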

Adding New DAGs or Execution Profiles

Packaging a New DAG

To create a new DAG, use the Dagy SDK:

from dagy import DAG, task

@task
def extract(source_path):
    import pandas as pd
    return pd.read_csv(source_path)

@task
def transform(data):
    return data.assign(processed=True)

@task
def load(data, destination):
    data.to_parquet(destination)

with DAG("etl_pipeline", version="1.0.0") as dag:
    data = extract(source_path="{{ parameters.source }}")
    transformed = transform(data=data)
    load(transformed, destination="{{ parameters.destination }}")

Save this as flows/etl_pipeline.py. Build the DAG:

dagy build flows/etl_pipeline.py --output artifact.zip --version 1.0.0

This creates an artifact.zip containing flow_spec.json, metadata.json, and your Python source.

Uploading the Artifact

Push the artifact to S3:

aws s3 cp artifact.zip s3://your-artifact-bucket/flows/etl_pipeline/1.0.0/artifact.zip

Registering the Flow

Create a flow record in DynamoDB:

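import json
from pathlib import Path

from dagy_api.persistence.models import FlowModel

# Assumes artifact.zip has been extracted to ./artifact
# (for example: unzip artifact.zip -d artifact)
flow_spec = json.loads(Path("artifact/flow_spec.json").read_text())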

FlowModel(
    flow_name="etl_pipeline",
    flow_version="1.0.0",
    org_id="acme-corp",
    spec=json.dumps(flow_spec),
    artifact_s3_uri="s3://your-artifact-bucket/flows/etl_pipeline/1.0.0/artifact.zip",
).save()

Now you can launch:

launcher.launch(
    flow_name="etl_pipeline",
    flow_version="1.0.0",
    org_id="acme-corp",
)

Creating Execution Profiles

Execution profiles combine a flow version with specific resource and deployment settings. While Dagy doesn't have a built-in "profile" model, you can implement this in your application:

EXECUTION_PROFILES = {
    "etl_pipeline:quick": {
        "flow_name": "etl_pipeline",
        "flow_version": "1.0.0",
        "workload_profile": "medium",
        "deployment_name": "production",
    },
    "etl_pipeline:large_dataset": {
        "flow_name": "etl_pipeline",
        "flow_version": "1.0.0",
        "workload_profile": "xlarge",
        "deployment_name": "production",
    },
    "etl_pipeline:dev": {
        "flow_name": "etl_pipeline",
        "flow_version": "dev-branch",
        "workload_profile": "small",
        "deployment_name": "develop",
    },
}

# Launch using a profile
profile = EXECUTION_PROFILES["etl_pipeline:large_dataset"]
result = launcher.launch(
    org_id="acme-corp",
    environment="production",
    **profile,
)

Environment-Specific Configuration

Different environments (develop, staging, production) often need different resource allocations and deployments.

Multi-Environment Setup

Define environment-specific defaults:

ENVIRONMENT_CONFIG = {
    "develop": {
        "subnets": ["subnet-dev-1", "subnet-dev-2"],
        "security_groups": ["sg-dev"],
        "workload_profile": "small",
        "deployment_name": "develop",
    },
    "staging": {
        "subnets": ["subnet-staging-1", "subnet-staging-2"],
        "security_groups": ["sg-staging"],
        "workload_profile": "medium",
        "deployment_name": "staging",
    },
    "production": {
        "subnets": ["subnet-prod-1", "subnet-prod-2", "subnet-prod-3"],
        "security_groups": ["sg-prod"],
        "workload_profile": "large",
        "deployment_name": "production",
    },
}

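def launch_for_environment(flow_name, environment, **overrides):
    config = ENVIRONMENT_CONFIG[environment]
    # Network settings are DagLauncher constructor arguments, not launch
    # parameters. This assumes the remaining settings come from the
    # DAGY_ECS_* environment variables.
    launcher = DagLauncher(
        subnets=config["subnets"],
        security_groups=config["security_groups"],
    )
    launch_args = {
        "workload_profile": config["workload_profile"],
        "deployment_name": config["deployment_name"],
        **overrides,  # Overrides can customize per-launch
    }
    return launcher.launch(
        flow_name=flow_name,
        environment=environment,
        org_id="acme-corp",
        **launch_args,
    )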

Parameter Templating

Build environment-specific parameter values at launch time, for example by deriving bucket names from the environment:

environment = "production"

result = launcher.launch(
    flow_name="etl_pipeline",
    org_id="acme-corp",
    environment=environment,
    parameters={
        "output_bucket": f"s3://dagy-outputs-{environment}",
        "temp_bucket": f"s3://dagy-temp-{environment}",
    },
)

Your DAG can reference these parameters:

@task
def extract():
    import os
    output_bucket = os.environ.get("OUTPUT_BUCKET")  # From parameters
    ...

Secret Management for DAG Runtime

Sensitive information (database passwords, API keys, tokens) should never be stored in flow definitions or parameters. Dagy supports injecting secrets from AWS Secrets Manager.

Creating a Secret

Store a secret in Secrets Manager:

aws secretsmanager create-secret \
  --name prod/db-password \
  --secret-string "my-secure-password"

aws secretsmanager create-secret \
  --name prod/api-key \
  --secret-string "sk-1234567890"

Injecting Secrets at Launch

DagLauncher does not expose a secrets_env mapping yet. With DagLauncher and EcsBackend extended to accept one, a launch could look like this:

# Note: secrets_env is not part of the current API. This call only works
# after DagLauncher and EcsBackend are extended to support it.
result = launcher.launch(
    flow_name="api_pipeline",
    org_id="acme-corp",
    secrets_env={
        "DB_PASSWORD": "arn:aws:secretsmanager:us-east-1:123456789:secret:prod/db-password",
        "API_KEY": "arn:aws:secretsmanager:us-east-1:123456789:secret:prod/api-key",
    },
)

The ECS task execution role must have secretsmanager:GetSecretValue permission on these ARNs. ECS injects the secret values as environment variables in the container, and your code reads them:

@task
def connect_database():
    import os
    db_password = os.environ["DB_PASSWORD"]  # Injected by ECS
    # Use password to connect

Secret values are never stored in the task definition, which only references each secret by ARN. Take care not to log the values from your own flow code.
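
If secrets_env support is added to the launcher, the mapping would need to end up in the container definition's secrets list, which is how ECS injects Secrets Manager values as environment variables. The following is a minimal sketch of that conversion; build_container_secrets is a hypothetical name, and restricting the ARNs to Secrets Manager is a choice made for this example.

```python
def build_container_secrets(secrets_env):
    """Convert {ENV_NAME: secret_arn} into the ECS container `secrets` list."""
    entries = []
    for name, arn in sorted(secrets_env.items()):
        if not arn.startswith("arn:aws:secretsmanager:"):
            raise ValueError(f"{name}: expected a Secrets Manager ARN, got {arn!r}")
        # ECS resolves valueFrom at task start; the value never appears here.
        entries.append({"name": name, "valueFrom": arn})
    return entries
```

Only the ARNs pass through this code, so the secret values themselves stay out of the launcher, the task definition, and any logs it writes.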

Monitoring and Observability

Viewing Logs

All ECS worker logs are written to CloudWatch Logs in JSON format. View logs for a specific run:

aws logs get-log-events \
  --log-group-name /ecs/dagy-worker \
  --log-stream-name <run_id> \
  --start-time $(date -d '5 minutes ago' +%s)000 \
  --query 'events[].message' \
  --output text

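Search for specific events with a CloudWatch Logs Insights query. start-query returns a queryId; pass it to aws logs get-query-results to read the matches: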

aws logs start-query \
  --log-group-name /ecs/dagy-worker \
  --start-time $(date -d '1 hour ago' +%s) \
  --end-time $(date +%s) \
  --query-string 'fields @timestamp, run_id, event_type | filter event_type = "task_failed"'
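
Because the worker logs are JSON, messages that have already been downloaded can also be filtered locally. The sketch below assumes each log message is a single JSON object with an event_type field, as the query above implies; filter_events is a hypothetical helper.

```python
import json


def filter_events(lines, event_type):
    """Return the parsed log records whose event_type matches."""
    matches = []
    for line in lines:
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue  # Skip non-JSON lines such as interpreter warnings
        if isinstance(record, dict) and record.get("event_type") == event_type:
            matches.append(record)
    return matches
```

This is handy for ad hoc debugging of a single run, while Logs Insights remains the better fit for searching across many runs.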

Tracking Runs

Query DynamoDB for run status:

from dagy_api.persistence.models import RunModel

run = RunModel.get(run_id)
print(f"Status: {run.status}")
print(f"Started: {run.started_at}")
print(f"Completed: {run.completed_at}")
print(f"Error: {run.error_message}")

Accessing Exception Traces

When a DAG fails, the full exception traceback is uploaded to S3:

aws s3 cp s3://your-traces-bucket/exceptions/<org_id>/<run_id>/traceback.txt .
cat traceback.txt

This helps with debugging failures without needing to access CloudWatch Logs.


This guide covers the most common patterns for launching and managing DAG executions on ECS Fargate. For advanced scenarios (custom metrics, VPC configuration, multi-region), refer to the architecture document and CDK stack configuration.