Dagy Integration Guide
This guide covers integrating Dagy with popular services and platforms. Each integration includes prerequisites, configuration steps, code examples, and best practices.
Table of Contents
- Amazon S3
- Apache Kafka
- PostgreSQL / MySQL
- MongoDB
- Redis
- Elasticsearch / OpenSearch
- Slack
- Email (SES / SMTP)
- REST APIs
- Snowflake / BigQuery
- Docker / Custom Containers
- CI/CD Systems (GitHub Actions)
Amazon S3
Amazon S3 is a scalable object storage service. Use S3 in Dagy to read/write data, trigger flows on new files, and archive results.
Prerequisites
pip install boto3
Configuration
Store AWS credentials in Dagy secrets:
dagy secret set AWS_ACCESS_KEY_ID "your-access-key"
dagy secret set AWS_SECRET_ACCESS_KEY "your-secret-key"
dagy secret set AWS_REGION "us-east-1"
Or use IAM roles (recommended for ECS/Lambda backends):
# dagy.yml
backend: ecs
ecs_config:
task_role_arn: "arn:aws:iam::ACCOUNT:role/DagyECSTaskRole"
Reading Files from S3
from dagy import flow, task
import boto3
from botocore.exceptions import ClientError
@task
def read_s3_file(bucket: str, key: str) -> str:
    """Fetch an S3 object and return its body decoded as UTF-8 text.

    Raises:
        FileNotFoundError: when S3 reports the ``NoSuchKey`` error code.
        ClientError: any other S3 client error, re-raised unchanged.
    """
    try:
        body = boto3.client('s3').get_object(Bucket=bucket, Key=key)['Body']
        return body.read().decode('utf-8')
    except ClientError as err:
        # Only a missing key is translated; everything else propagates as-is.
        if err.response['Error']['Code'] != 'NoSuchKey':
            raise
        raise FileNotFoundError(f"File not found: s3://{bucket}/{key}")
@task
def read_s3_json(bucket: str, key: str) -> dict:
    """Download an S3 object and parse its body as JSON."""
    import json

    s3 = boto3.client('s3')
    raw_body = s3.get_object(Bucket=bucket, Key=key)['Body'].read()
    return json.loads(raw_body)
@task
def read_s3_parquet(bucket: str, key: str):
    """Load the Parquet object at ``s3://{bucket}/{key}`` with pandas."""
    import pandas as pd

    return pd.read_parquet(f"s3://{bucket}/{key}")
@flow
def read_flow():
    """Read one text, one JSON and one Parquet object from ``my-bucket``.

    Returns the three results as a ``(text, json, dataframe)`` tuple.
    """
    plain_text = read_s3_file("my-bucket", "data.txt")
    settings = read_s3_json("my-bucket", "config.json")
    frame = read_s3_parquet("my-bucket", "data.parquet")
    return plain_text, settings, frame
Writing Results to S3
from dagy import flow, task
import boto3
import json
@task
def write_s3_file(bucket: str, key: str, content: str) -> str:
    """Upload ``content`` as UTF-8 bytes and return the object's ``s3://`` URI."""
    s3 = boto3.client('s3')
    encoded = content.encode('utf-8')
    s3.put_object(Bucket=bucket, Key=key, Body=encoded)
    destination = f"s3://{bucket}/{key}"
    return destination
@task
def write_s3_json(bucket: str, key: str, data: dict) -> str:
    """Serialize ``data`` to JSON, upload it, and return the ``s3://`` URI.

    The object is stored with content type ``application/json``.
    """
    s3 = boto3.client('s3')
    upload_args = {
        "Bucket": bucket,
        "Key": key,
        "Body": json.dumps(data),
        "ContentType": 'application/json',
    }
    s3.put_object(**upload_args)
    return f"s3://{bucket}/{key}"
@task
def write_s3_parquet(bucket: str, key: str, df) -> str:
    """Write a DataFrame to S3 as Parquet and return the ``s3://`` path.

    Args:
        bucket: Destination bucket name.
        key: Destination object key.
        df: Object exposing ``to_parquet(path)`` (a pandas DataFrame in the
            guide's usage).

    Returns:
        The ``s3://bucket/key`` path that was written.
    """
    # pandas is not imported here: the function only calls a method on ``df``.
    s3_path = f"s3://{bucket}/{key}"
    df.to_parquet(s3_path)
    return s3_path
@flow
def write_flow():
    """Write a small JSON status document and a text summary to ``my-bucket``."""
    status_payload = {"status": "success", "records": 100}
    target_bucket = "my-bucket"
    write_s3_json(target_bucket, "results.json", status_payload)
    write_s3_file(target_bucket, "summary.txt", "Processing complete")
S3 Event Sensor (Trigger on New Files)
from dagy import flow, task, SensorTask
from dagy.sensors import S3Sensor
import time
class S3FileSensor(SensorTask):
    """Polling sensor that reports new or updated objects under an S3 prefix.

    "Already seen" tracking lives in memory on the instance
    (``last_modified``), so the same sensor object must be reused across
    polls; a freshly constructed sensor reports existing objects again.
    """

    def __init__(self, bucket: str, prefix: str, poll_interval: int = 60):
        # NOTE(review): SensorTask.__init__ is not invoked (unchanged from the
        # original) — confirm the base class needs no initialisation.
        self.bucket = bucket
        self.prefix = prefix
        self.poll_interval = poll_interval  # stored only; poll() never sleeps
        self.last_modified = {}  # object key -> LastModified timestamp last reported

    def poll(self) -> "dict | None":
        """Return the first new or updated object under the prefix, if any.

        Returns:
            ``{"bucket", "key", "size"}`` for the first object whose
            ``LastModified`` is unseen or newer than the recorded value, or
            ``None`` when nothing changed. At most one object is reported per
            call; later changes are picked up by subsequent polls.
        """
        import boto3

        s3 = boto3.client('s3')
        # list_objects_v2 returns a bounded page of keys per call; paginate so
        # objects beyond the first page are also observed.
        paginator = s3.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=self.bucket, Prefix=self.prefix):
            for obj in page.get('Contents', []):
                key = obj['Key']
                modified_at = obj['LastModified'].timestamp()
                previous = self.last_modified.get(key)
                if previous is None or previous < modified_at:
                    self.last_modified[key] = modified_at
                    return {"bucket": self.bucket, "key": key, "size": obj['Size']}
        return None
@task
def process_s3_file(file_info: dict) -> str:
    """Download the object described by ``file_info`` and report its length.

    ``file_info`` must carry ``bucket``, ``key`` and ``size`` entries. The
    body is decoded as UTF-8 and the returned message reports ``len()`` of the
    decoded text.
    """
    bucket, key = file_info['bucket'], file_info['key']
    print(f"Processing {bucket}/{key} (size: {file_info['size']} bytes)")

    import boto3

    response = boto3.client('s3').get_object(Bucket=bucket, Key=key)
    text = response['Body'].read().decode('utf-8')
    return f"Processed {len(text)} bytes"
@flow
def s3_event_flow():
    """Poll ``my-bucket/incoming/`` once and process the reported file, if any.

    Returns the processing result, or ``None`` when the poll found nothing.
    """
    detected = S3FileSensor(bucket="my-bucket", prefix="incoming/").poll()
    if not detected:
        return None
    return process_s3_file(detected)
S3 File Processing Pipeline (Complete Example)
from dagy import flow, task
import boto3
import json
@task
def read_raw_data(bucket: str, key: str) -> dict:
    """Fetch an S3 object and return its JSON-decoded content."""
    response = boto3.client('s3').get_object(Bucket=bucket, Key=key)
    payload = response['Body'].read()
    return json.loads(payload)
@task
def transform_data(data: dict) -> dict:
    """Summarize raw input into a small status record.

    ``record_count`` is the length of ``data["records"]`` (0 when absent) and
    ``timestamp`` is copied from the input (``None`` when absent).
    """
    records = data.get("records", [])
    summary = {"processed": True}
    summary["record_count"] = len(records)
    summary["timestamp"] = data.get("timestamp")
    summary["status"] = "transformed"
    return summary
@task
def validate_data(data: dict) -> bool:
    """Return True only when every required key is present in ``data``."""
    for required in ("processed", "record_count", "status"):
        if required not in data:
            return False
    return True
@task
def write_results(bucket: str, key: str, data: dict) -> str:
    """Upload ``data`` as a JSON object and return a confirmation message."""
    body = json.dumps(data)
    client = boto3.client('s3')
    client.put_object(Bucket=bucket, Key=key, Body=body, ContentType='application/json')
    return f"Written to s3://{bucket}/{key}"
@flow
def s3_pipeline():
    """Read raw JSON, transform and validate it, then write it back to S3.

    Returns ``{"status": "validation_failed"}`` when validation fails,
    otherwise a success dict that carries the write confirmation.
    """
    source = read_raw_data("input-bucket", "raw/data.json")
    summary = transform_data(source)
    if not validate_data(summary):
        return {"status": "validation_failed"}
    confirmation = write_results("output-bucket", "processed/data.json", summary)
    return {"status": "success", "result": confirmation}
Best Practices & Gotchas
- Connection pooling: Boto3 automatically pools connections. Reuse the same client instance across tasks when possible.
- Credentials: Use IAM roles when running on ECS/Lambda, avoid hardcoding credentials.
- Large files: For files >100MB, use S3 transfer manager for multipart uploads:
    from boto3.s3.transfer import S3Transfer
    s3_transfer = S3Transfer(s3_client)
    s3_transfer.upload_file('local_file.txt', 'bucket', 'key')
- Error handling: Always catch `ClientError` and handle specific error codes.
- Pagination: For listing large buckets, use a paginator:
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket='bucket', Prefix='prefix/'):
        pass  # process page
Apache Kafka
Apache Kafka is a distributed event streaming platform. Use Kafka in Dagy to consume messages, process streams, and produce results.
Prerequisites
pip install aiokafka
# OR
pip install confluent-kafka
Configuration
Store Kafka credentials in Dagy secrets:
dagy secret set KAFKA_BOOTSTRAP_SERVERS "broker1:9092,broker2:9092"
dagy secret set KAFKA_USERNAME "dagy-user"
dagy secret set KAFKA_PASSWORD "secure-password"
Consuming Messages from Kafka
from dagy import flow, task
import asyncio
@task
async def consume_kafka_messages(
    topic: str,
    group_id: str,
    max_messages: int = 10
) -> list:
    """Collect up to ``max_messages`` JSON-decoded values from a Kafka topic.

    Connection settings are read from the ``KAFKA_BOOTSTRAP_SERVERS``,
    ``KAFKA_USERNAME`` and ``KAFKA_PASSWORD`` environment variables and the
    consumer authenticates with SASL/PLAIN over TLS.

    Args:
        topic: Topic to read from.
        group_id: Consumer group used for offset tracking.
        max_messages: Number of messages to collect before returning.

    Returns:
        The deserialized message values, in arrival order.

    Note:
        Iteration waits for messages, so the call does not return until
        ``max_messages`` messages have been received.
    """
    # Local imports keep the snippet self-contained: the deserializer below
    # needs ``json``, which this snippet's module-level imports do not provide.
    import json
    import os

    from aiokafka import AIOKafkaConsumer

    if max_messages <= 0:
        # Nothing requested; avoid connecting and consuming a message.
        return []

    consumer = AIOKafkaConsumer(
        topic,
        bootstrap_servers=os.getenv('KAFKA_BOOTSTRAP_SERVERS'),
        group_id=group_id,
        security_protocol='SASL_SSL',
        sasl_mechanism='PLAIN',
        sasl_plain_username=os.getenv('KAFKA_USERNAME'),
        sasl_plain_password=os.getenv('KAFKA_PASSWORD'),
        auto_offset_reset='earliest',
        value_deserializer=lambda m: json.loads(m.decode('utf-8'))
    )
    messages = []
    try:
        await consumer.start()
        async for msg in consumer:
            messages.append(msg.value)
            if len(messages) >= max_messages:
                break
    finally:
        # Always stop the consumer, even when consuming fails.
        await consumer.stop()
    return messages
@task
async def consume_kafka_stream(
    topic: str,
    group_id: str,
    duration_seconds: int = 30
) -> list:
    """Consume JSON messages from a topic for roughly ``duration_seconds``.

    Messages are fetched in short, time-bounded batches so the function
    returns once the duration has elapsed even when the topic is idle.

    Args:
        topic: Topic to read from.
        group_id: Consumer group used for offset tracking.
        duration_seconds: Wall-clock budget for consuming.

    Returns:
        The deserialized message values received within the window.
    """
    # Local imports keep the snippet self-contained (``json`` is needed by
    # the deserializer and is not imported at the top of this snippet).
    import json
    import os
    import time

    from aiokafka import AIOKafkaConsumer

    consumer = AIOKafkaConsumer(
        topic,
        bootstrap_servers=os.getenv('KAFKA_BOOTSTRAP_SERVERS'),
        group_id=group_id,
        auto_offset_reset='latest',
        value_deserializer=lambda m: json.loads(m.decode('utf-8'))
    )
    messages = []
    try:
        await consumer.start()
        # A monotonic clock is immune to wall-clock adjustments.
        deadline = time.monotonic() + duration_seconds
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            # Poll for at most one second at a time so the deadline is
            # honoured even when no messages arrive.
            batches = await consumer.getmany(
                timeout_ms=max(1, int(min(remaining, 1.0) * 1000))
            )
            for records in batches.values():
                messages.extend(record.value for record in records)
    finally:
        await consumer.stop()
    return messages
@flow
async def kafka_consume_flow():
    """Read a batch from the ``events`` topic as group ``dagy-consumer``."""
    consume_args = {"topic": "events", "group_id": "dagy-consumer"}
    batch = await consume_kafka_messages(**consume_args)
    return batch
Producing Messages to Kafka
from dagy import flow, task
import json
@task
async def produce_kafka_message(
    topic: str,
    message: dict,
    key: str = None
) -> str:
    """Send one JSON-serialized message to Kafka and wait for the ack.

    Args:
        topic: Destination topic.
        message: Payload; serialized with ``json.dumps`` and UTF-8 encoded.
        key: Optional message key, UTF-8 encoded when provided.

    Returns:
        A message naming the partition and offset the record was written to.
    """
    from aiokafka import AIOKafkaProducer
    import os
    producer = AIOKafkaProducer(
        bootstrap_servers=os.getenv('KAFKA_BOOTSTRAP_SERVERS'),
        security_protocol='SASL_SSL',
        sasl_mechanism='PLAIN',
        sasl_plain_username=os.getenv('KAFKA_USERNAME'),
        sasl_plain_password=os.getenv('KAFKA_PASSWORD'),
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )
    try:
        await producer.start()
        # send_and_wait returns a RecordMetadata record with more than two
        # fields; read partition/offset by name instead of tuple-unpacking.
        metadata = await producer.send_and_wait(
            topic,
            value=message,
            key=key.encode('utf-8') if key else None
        )
        return (
            f"Message sent to partition {metadata.partition} "
            f"at offset {metadata.offset}"
        )
    finally:
        await producer.stop()
@task
async def batch_produce_kafka(topic: str, messages: list) -> str:
    """Send each message to ``topic`` in order, waiting for every ack.

    Returns a summary string with the number of messages given.
    """
    import os

    from aiokafka import AIOKafkaProducer

    def _to_json_bytes(value):
        return json.dumps(value).encode('utf-8')

    kafka_producer = AIOKafkaProducer(
        bootstrap_servers=os.getenv('KAFKA_BOOTSTRAP_SERVERS'),
        value_serializer=_to_json_bytes,
    )
    try:
        await kafka_producer.start()
        for payload in messages:
            await kafka_producer.send_and_wait(topic, value=payload)
        total = len(messages)
        return f"Produced {total} messages"
    finally:
        await kafka_producer.stop()
@flow
async def kafka_produce_flow():
    """Publish a sample ``user_signup`` event to the ``events`` topic."""
    signup_event = dict(
        event="user_signup",
        user_id="12345",
        timestamp="2024-01-15T10:30:00Z",
    )
    return await produce_kafka_message("events", signup_event, key="user_12345")
Kafka Stream Processing Pattern
from dagy import flow, task
import json
@task
async def process_kafka_stream(
    input_topic: str,
    output_topic: str,
    group_id: str,
    max_messages: int = 100
) -> dict:
    """Consume messages, wrap each one, and publish it to another topic.

    Each input value is wrapped as ``{"original", "processed",
    "processing_timestamp"}`` and sent to ``output_topic``; processing stops
    after ``max_messages`` messages.

    Returns:
        ``{"processed": <count>, "status": "success"}``.
    """
    import datetime
    import os

    from aiokafka import AIOKafkaConsumer, AIOKafkaProducer

    consumer = AIOKafkaConsumer(
        input_topic,
        bootstrap_servers=os.getenv('KAFKA_BOOTSTRAP_SERVERS'),
        group_id=group_id,
        auto_offset_reset='earliest',
        value_deserializer=lambda m: json.loads(m.decode('utf-8'))
    )
    producer = AIOKafkaProducer(
        bootstrap_servers=os.getenv('KAFKA_BOOTSTRAP_SERVERS'),
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )
    processed_count = 0
    try:
        await consumer.start()
        await producer.start()
        async for msg in consumer:
            # Naive UTC timestamp (same string format as before) without the
            # deprecated datetime.utcnow().
            now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
            processed_msg = {
                "original": msg.value,
                "processed": True,
                "processing_timestamp": now_utc.isoformat()
            }
            await producer.send_and_wait(output_topic, value=processed_msg)
            processed_count += 1
            if processed_count >= max_messages:
                break
    finally:
        # Nested so the producer is stopped even if stopping the consumer fails.
        try:
            await consumer.stop()
        finally:
            await producer.stop()
    return {"processed": processed_count, "status": "success"}
@flow
async def kafka_stream_flow():
    """Run the consume-process-produce task from ``raw_events`` to ``processed_events``."""
    summary = await process_kafka_stream(
        input_topic="raw_events",
        output_topic="processed_events",
        group_id="stream-processor",
    )
    return summary
Kafka Configuration via Dagy Secrets
from dagy import flow, task
import os
@task
def get_kafka_config() -> dict:
    """Build a Kafka settings dict from environment variables.

    Servers, username and password come from ``KAFKA_BOOTSTRAP_SERVERS``,
    ``KAFKA_USERNAME`` and ``KAFKA_PASSWORD`` (``None`` when unset); the
    security protocol and SASL mechanism are fixed values.
    """
    env_backed = {
        "bootstrap_servers": 'KAFKA_BOOTSTRAP_SERVERS',
        "username": 'KAFKA_USERNAME',
        "password": 'KAFKA_PASSWORD',
    }
    settings = {name: os.getenv(variable) for name, variable in env_backed.items()}
    settings["security_protocol"] = "SASL_SSL"
    settings["sasl_mechanism"] = "PLAIN"
    return settings
@flow
def kafka_config_flow():
    """Return the Kafka settings produced by ``get_kafka_config``."""
    return get_kafka_config()
Best Practices & Gotchas
- Consumer groups: Always use consumer groups for coordinated consumption and offset management.
- Error handling: Handle `KafkaError` and `GroupCoordinatorNotAvailableError`:
    from aiokafka.errors import KafkaError
    try:
        await consumer.start()
    except KafkaError as e:
        print(f"Kafka error: {e}")
- Async/await: Kafka operations are async. Use Step Functions backend for better async support.
- Partitioning: For parallelism, use multiple consumer instances on different partitions.
- Performance tuning: Adjust `batch_size`, `linger_ms`, and `acks` for throughput vs. durability trade-offs.
PostgreSQL / MySQL
Relational databases are ideal for structured data. Use PostgreSQL or MySQL in Dagy for ETL operations.
Prerequisites
pip install psycopg2-binary sqlalchemy
# OR for MySQL
pip install mysql-connector-python sqlalchemy
Configuration
Store database credentials in Dagy secrets:
dagy secret set DB_HOST "postgres.example.com"
dagy secret set DB_PORT "5432"
dagy secret set DB_NAME "dagy_db"
dagy secret set DB_USER "dagy_user"
dagy secret set DB_PASSWORD "secure-password"
Database Connections
from dagy import flow, task
import psycopg2
from psycopg2.extras import RealDictCursor
import os
def get_db_connection():
    """Open a new psycopg2 connection configured from ``DB_*`` environment variables.

    ``DB_PORT`` falls back to ``'5432'`` when unset.
    """
    settings = {
        "host": os.getenv('DB_HOST'),
        "port": os.getenv('DB_PORT', '5432'),
        "database": os.getenv('DB_NAME'),
        "user": os.getenv('DB_USER'),
        "password": os.getenv('DB_PASSWORD'),
    }
    return psycopg2.connect(**settings)
@task
def query_database(sql: str, params: tuple = None) -> list:
    """Run a SELECT and return every row as a plain ``dict``.

    ``params`` are bound by the driver; an empty tuple is used when omitted.
    The connection is closed whether or not the query succeeds.
    """
    connection = get_db_connection()
    try:
        dict_cursor = connection.cursor(cursor_factory=RealDictCursor)
        dict_cursor.execute(sql, params if params else ())
        fetched = dict_cursor.fetchall()
        dict_cursor.close()
        rows = []
        for record in fetched:
            rows.append(dict(record))
        return rows
    finally:
        connection.close()
@task
def execute_update(sql: str, params: tuple = None) -> int:
    """Execute an INSERT/UPDATE/DELETE statement and commit it.

    Args:
        sql: Statement text using ``%s`` placeholders.
        params: Values bound to the placeholders; empty tuple when omitted.

    Returns:
        The number of rows affected, as reported by the cursor.

    Raises:
        Exception: Any driver error is re-raised after the transaction has
            been rolled back.
    """
    conn = get_db_connection()
    try:
        # The cursor context manager closes the cursor on failure as well.
        with conn.cursor() as cursor:
            cursor.execute(sql, params or ())
            rows_affected = cursor.rowcount
        conn.commit()
        return rows_affected
    except Exception:
        conn.rollback()
        raise  # bare raise keeps the original traceback intact
    finally:
        conn.close()
@task
def batch_insert(table: str, rows: list, columns: list) -> int:
    """Insert several rows into ``table`` in a single transaction.

    Args:
        table: Target table name.
        rows: Mappings that contain at least the keys listed in ``columns``.
        columns: Column names, in the order used for the INSERT.

    Returns:
        Total number of rows inserted across all statements (0 for an empty
        ``rows`` list).

    Raises:
        Exception: Any driver error is re-raised after a rollback.
    """
    conn = get_db_connection()
    try:
        placeholders = ",".join(["%s"] * len(columns))
        col_names = ",".join(columns)
        # NOTE(review): table and column names are interpolated into the SQL
        # text; they must come from trusted code, never from user input.
        sql = f"INSERT INTO {table} ({col_names}) VALUES ({placeholders})"
        inserted = 0
        with conn.cursor() as cursor:
            for row in rows:
                cursor.execute(sql, tuple(row[col] for col in columns))
                # rowcount only reflects the latest statement; accumulate it.
                inserted += cursor.rowcount
        conn.commit()
        return inserted
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()
@flow
def query_flow():
    """Fetch every ``users`` row whose status is ``active``."""
    statement = "SELECT * FROM users WHERE status = %s"
    active_users = query_database(statement, ('active',))
    return active_users
Connection Pooling with SQLAlchemy
from dagy import flow, task
from sqlalchemy import create_engine, text
import os
# Process-wide engine cache so all tasks share one connection pool.
_ENGINE = None


def get_engine():
    """Return the shared SQLAlchemy engine, creating it on first use.

    The engine (and therefore its pool) is built once per process from the
    ``DB_*`` environment variables and reused by later calls; building a new
    engine per call would give every call its own pool.

    Returns:
        The cached engine configured with ``pool_size=10``,
        ``max_overflow=20`` and ``pool_pre_ping=True``.
    """
    global _ENGINE
    if _ENGINE is None:
        from sqlalchemy.engine import URL

        # URL.create escapes special characters in credentials, which plain
        # string formatting of the connection URL does not.
        url = URL.create(
            "postgresql+psycopg2",
            username=os.getenv('DB_USER'),
            password=os.getenv('DB_PASSWORD'),
            host=os.getenv('DB_HOST'),
            port=int(os.getenv('DB_PORT', '5432')),
            database=os.getenv('DB_NAME'),
        )
        _ENGINE = create_engine(
            url,
            pool_size=10,
            max_overflow=20,
            pool_pre_ping=True,  # Verify connection before using
            echo=False
        )
    return _ENGINE
@task
def query_with_pool(sql: str) -> list:
    """Execute ``sql`` on a pooled connection and return rows as dicts."""
    with get_engine().connect() as connection:
        rows = []
        for record in connection.execute(text(sql)):
            rows.append(dict(record._mapping))
        return rows
@task
def insert_with_pool(table: str, data: dict) -> int:
    """Insert one row built from ``data`` using a pooled connection.

    Args:
        table: Target table name.
        data: Mapping of column name to value.

    Returns:
        The number of rows inserted, as reported by the result.
    """
    columns = list(data)
    # text() understands named binds (":name"), not DB-API "%s" markers, so
    # generate one neutral bind name per column and pass a dict of values.
    bind_names = [f"p{index}" for index in range(len(columns))]
    col_sql = ",".join(columns)
    val_sql = ",".join(f":{name}" for name in bind_names)
    # NOTE(review): table and column names are interpolated into the SQL
    # text; they must come from trusted code, never from user input.
    sql = f"INSERT INTO {table} ({col_sql}) VALUES ({val_sql})"
    params = dict(zip(bind_names, data.values()))

    engine = get_engine()
    # begin() commits on success and rolls back if the block raises.
    with engine.begin() as conn:
        result = conn.execute(text(sql), params)
        return result.rowcount
@flow
def pool_flow():
    """Count the rows in ``users`` through the pooled query task."""
    count_query = "SELECT COUNT(*) as cnt FROM users"
    return query_with_pool(count_query)
ETL Pattern: Extract → Transform → Load
from dagy import flow, task
@task
def extract_data(source_query: str) -> list:
    """Run ``source_query`` against the source database and return all rows.

    Connection settings come from the ``DB_*`` environment variables;
    ``DB_PORT`` defaults to ``'5432'``, matching the guide's other
    connection code.

    Returns:
        The rows as returned by ``cursor.fetchall()``.
    """
    # Local imports keep this snippet runnable on its own.
    import os

    import psycopg2

    conn = psycopg2.connect(
        host=os.getenv('DB_HOST'),
        port=os.getenv('DB_PORT', '5432'),
        database=os.getenv('DB_NAME'),
        user=os.getenv('DB_USER'),
        password=os.getenv('DB_PASSWORD')
    )
    try:
        # The cursor context manager closes the cursor on failure as well.
        with conn.cursor() as cursor:
            cursor.execute(source_query)
            return cursor.fetchall()
    finally:
        conn.close()
@task
def transform_records(raw_records: list) -> list:
    """Turn ``(id, name, ...)`` rows into dicts with an upper-cased name.

    Every output dict also carries a ``processed_at`` UTC ISO timestamp and
    the fixed status ``"transformed"``.
    """
    import datetime

    def _convert(row):
        return {
            "id": row[0],
            "name": row[1].upper(),
            "processed_at": datetime.datetime.utcnow().isoformat(),
            "status": "transformed",
        }

    return [_convert(row) for row in raw_records]
@task
def load_data(table: str, records: list) -> str:
    """Insert transformed records into ``table`` in one transaction.

    Args:
        table: Target table name.
        records: Dicts with ``id``, ``name``, ``processed_at`` and ``status``.

    Returns:
        A summary string with the number of records given.

    Raises:
        KeyError: If a record lacks one of the required keys (raised before
            any connection is opened).
        Exception: Any driver error is re-raised after a rollback.
    """
    # Local imports keep this snippet runnable on its own.
    import os

    import psycopg2

    # Build the parameter rows first so malformed input fails fast.
    params = [
        (record['id'], record['name'], record['processed_at'], record['status'])
        for record in records
    ]
    # NOTE(review): the table name is interpolated into the SQL text; it must
    # come from trusted code, never from user input.
    sql = f"""
    INSERT INTO {table} (id, name, processed_at, status)
    VALUES (%s, %s, %s, %s)
    """
    conn = psycopg2.connect(
        host=os.getenv('DB_HOST'),
        port=os.getenv('DB_PORT', '5432'),
        database=os.getenv('DB_NAME'),
        user=os.getenv('DB_USER'),
        password=os.getenv('DB_PASSWORD')
    )
    try:
        with conn.cursor() as cursor:
            cursor.executemany(sql, params)
        conn.commit()
        return f"Loaded {len(records)} records"
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()
@flow
def etl_flow():
    """Extract active users, transform them, and load them into ``processed_users``."""
    source_rows = extract_data("SELECT id, name FROM users WHERE active = true")
    prepared = transform_records(source_rows)
    return load_data("processed_users", prepared)
Best Practices & Gotchas
- Connection pooling: Always use connection pools in production. Don't create new connections per task.
- Prepared statements: Use parameterized queries to prevent SQL injection:
    cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
- Transaction handling: Always commit/rollback explicitly:
    try:
        cursor.execute(...)
        conn.commit()
    except Exception:
        conn.rollback()
        raise
- Timeout handling: Set appropriate timeouts for long-running queries (psycopg2 passes server options at connect time; `set_session()` has no `options` argument):
    conn = psycopg2.connect(..., options='-c statement_timeout=30000')
- Bulk operations: Use batch operations for better performance with large datasets.
MongoDB
MongoDB is a NoSQL document database. Use it in Dagy for flexible document storage and processing.
Prerequisites
pip install pymongo
Configuration
dagy secret set MONGO_URI "mongodb+srv://user:password@cluster.mongodb.net/dbname"
dagy secret set MONGO_DATABASE "dagy_db"
CRUD Operations
from dagy import flow, task
from pymongo import MongoClient
from bson.objectid import ObjectId
import os
import json
def get_mongo_client():
    """Create a ``MongoClient`` for the URI stored in ``MONGO_URI``."""
    mongo_uri = os.getenv('MONGO_URI')
    return MongoClient(mongo_uri)
@task
def insert_document(collection: str, document: dict) -> str:
    """Insert one document and return its new ``_id`` as a string."""
    mongo = get_mongo_client()
    try:
        target = mongo[os.getenv('MONGO_DATABASE')][collection]
        outcome = target.insert_one(document)
        return str(outcome.inserted_id)
    finally:
        mongo.close()
@task
def insert_many(collection: str, documents: list) -> list:
    """Insert several documents and return their ``_id`` values as strings."""
    mongo = get_mongo_client()
    try:
        database = mongo[os.getenv('MONGO_DATABASE')]
        outcome = database[collection].insert_many(documents)
        return list(map(str, outcome.inserted_ids))
    finally:
        mongo.close()
@task
def find_document(collection: str, query: dict) -> dict:
    """Return the first document matching ``query`` with a string ``_id``.

    When nothing matches, the falsy result of ``find_one`` is returned as-is.
    """
    mongo = get_mongo_client()
    try:
        match = mongo[os.getenv('MONGO_DATABASE')][collection].find_one(query)
        if not match:
            return match
        match['_id'] = str(match['_id'])
        return match
    finally:
        mongo.close()
@task
def find_many(collection: str, query: dict, limit: int = 100) -> list:
    """Return up to ``limit`` matching documents, each with a string ``_id``."""
    mongo = get_mongo_client()
    try:
        database = mongo[os.getenv('MONGO_DATABASE')]
        cursor = database[collection].find(query).limit(limit)
        found = []
        for record in cursor:
            record['_id'] = str(record['_id'])
            found += [record]
        return found
    finally:
        mongo.close()
@task
def update_document(collection: str, query: dict, update: dict) -> int:
    """Apply ``update`` via ``$set`` to the first match; return the modified count."""
    mongo = get_mongo_client()
    try:
        target = mongo[os.getenv('MONGO_DATABASE')][collection]
        change = {"$set": update}
        return target.update_one(query, change).modified_count
    finally:
        mongo.close()
@task
def delete_document(collection: str, query: dict) -> int:
    """Delete the first document matching ``query``; return the deleted count."""
    mongo = get_mongo_client()
    try:
        target = mongo[os.getenv('MONGO_DATABASE')][collection]
        return target.delete_one(query).deleted_count
    finally:
        mongo.close()
@flow
def mongodb_crud_flow():
    """Insert a user, read it, mark it inactive, and return the re-read document."""
    email_filter = {"email": "alice@example.com"}
    new_user = {
        "name": "Alice",
        "email": "alice@example.com",
        "status": "active",
    }

    # Create, then read back (the first read's result is not used further).
    inserted_id = insert_document("users", new_user)
    user = find_document("users", dict(email_filter))

    # Update by primary key, then return the refreshed document.
    update_document("users", {"_id": ObjectId(inserted_id)}, {"status": "inactive"})
    return find_document("users", dict(email_filter))
MongoDB Aggregation Pipeline
from dagy import flow, task
from pymongo import MongoClient
import os
@task
def aggregate_data(collection: str, pipeline: list) -> list:
    """Run an aggregation pipeline and return the result documents.

    ``ObjectId`` values in ``_id`` are converted to strings, matching the
    guide's find helpers. Other ``_id`` values — including compound group
    keys, which are dicts — are returned unchanged.

    Args:
        collection: Collection to aggregate over.
        pipeline: List of aggregation stages.

    Returns:
        The aggregation results as a list of dicts.
    """
    from bson import ObjectId

    client = MongoClient(os.getenv('MONGO_URI'))
    try:
        db = client[os.getenv('MONGO_DATABASE')]
        results = list(db[collection].aggregate(pipeline))
        for doc in results:
            # Check for ObjectId rather than dict: ObjectId is the type that
            # needs stringifying, and a dict key must keep its structure.
            if isinstance(doc.get('_id'), ObjectId):
                doc['_id'] = str(doc['_id'])
        return results
    finally:
        client.close()
@task
def group_and_sum(collection: str, group_field: str, sum_field: str) -> list:
    """Group by ``group_field``, summing ``sum_field`` and counting documents.

    Results are sorted by ``total`` in descending order.
    """
    group_stage = {
        "$group": {
            "_id": f"${group_field}",
            "total": {"$sum": f"${sum_field}"},
            "count": {"$sum": 1},
        }
    }
    sort_stage = {"$sort": {"total": -1}}
    return aggregate_data(collection, [group_stage, sort_stage])
@task
def match_and_project(collection: str, match: dict, projection: dict) -> list:
    """Filter with ``$match``, shape with ``$project``, and cap at 100 results."""
    stages = []
    stages.append({"$match": match})
    stages.append({"$project": projection})
    stages.append({"$limit": 100})
    return aggregate_data(collection, stages)
@flow
def mongodb_aggregation_flow():
    """Return the ten users with the highest total across completed orders."""
    only_completed = {"$match": {"status": "completed"}}
    per_user_totals = {
        "$group": {
            "_id": "$user_id",
            "total_spent": {"$sum": "$amount"},
            "order_count": {"$sum": 1},
        }
    }
    biggest_first = {"$sort": {"total_spent": -1}}
    top_ten = {"$limit": 10}
    stages = [only_completed, per_user_totals, biggest_first, top_ten]
    return aggregate_data("orders", stages)
MongoDB Data Processing Flow
from dagy import flow, task
@task
def load_user_data(query: dict) -> list:
    """Fetch up to 1000 ``users`` documents matching ``query``."""
    max_users = 1000
    return find_many("users", query, limit=max_users)
@task
def enrich_user_data(users: list) -> list:
    """Return copies of the user documents with computed fields added.

    Added fields:
        enriched_at: Naive-UTC ISO timestamp, identical for the whole batch.
        name_upper: Upper-cased ``name`` (empty string when missing or None).
        account_age_days: Whole days between ``created_at`` and now;
            ``created_at`` may be an ISO string or a ``datetime`` and
            defaults to ``2024-01-01T00:00:00`` when missing.

    The input dicts are not modified.
    """
    import datetime

    utc = datetime.timezone.utc
    # Capture "now" once so every derived field in the batch is consistent.
    now = datetime.datetime.now(utc).replace(tzinfo=None)
    stamp = now.isoformat()

    enriched = []
    for user in users:
        created = user.get('created_at') or '2024-01-01T00:00:00'
        if not isinstance(created, datetime.datetime):
            created = datetime.datetime.fromisoformat(created)
        if created.tzinfo is not None:
            # Normalize aware values to naive UTC so the subtraction is valid.
            created = created.astimezone(utc).replace(tzinfo=None)

        record = dict(user)  # shallow copy: leave the caller's dict untouched
        record['enriched_at'] = stamp
        record['name_upper'] = (user.get('name') or '').upper()
        record['account_age_days'] = (now - created).days
        enriched.append(record)
    return enriched
@task
def save_enriched_data(users: list) -> str:
    """Insert enriched user documents into the ``users_enriched`` collection.

    Args:
        users: Documents to insert. An empty list is a no-op, because
            ``insert_many`` rejects empty input.

    Returns:
        A summary string with the number of documents saved.
    """
    if not users:
        return "Saved 0 enriched documents"

    # Local imports keep this snippet runnable on its own.
    import os

    from pymongo import MongoClient

    client = MongoClient(os.getenv('MONGO_URI'))
    try:
        db = client[os.getenv('MONGO_DATABASE')]
        result = db['users_enriched'].insert_many(users)
        return f"Saved {len(result.inserted_ids)} enriched documents"
    finally:
        client.close()
@flow
def mongodb_processing_flow():
    """Load active users, enrich them, and save the enriched copies."""
    active_users = load_user_data({"status": "active"})
    with_extras = enrich_user_data(active_users)
    return save_enriched_data(with_extras)
Best Practices & Gotchas
- Indexes: Create indexes on frequently queried fields for better performance.
- Bulk operations: Use `bulk_write()` for better performance with large datasets:
    from pymongo import UpdateOne
    db.collection.bulk_write([UpdateOne({...}, {"$set": {...}}) for ...])
- Connection pooling: MongoClient automatically pools connections.
- Error handling: Handle `DuplicateKeyError` and `OperationFailure`:
    from pymongo.errors import DuplicateKeyError, OperationFailure
- Document validation: Validate schema before inserting to catch issues early.
Redis
Redis is an in-memory data store. Use it in Dagy for caching, distributed locking, and event-driven triggers.
Prerequisites
pip install redis
Configuration
dagy secret set REDIS_HOST "redis.example.com"
dagy secret set REDIS_PORT "6379"
dagy secret set REDIS_DB "0"
dagy secret set REDIS_PASSWORD "optional-password"
Caching Intermediate Results
from dagy import flow, task
import redis
import json
import os
def get_redis_client():
    """Build a Redis client from the ``REDIS_*`` environment variables.

    Defaults: host ``localhost``, port 6379, db 0, no password. Responses are
    decoded to ``str``.
    """
    host = os.getenv('REDIS_HOST', 'localhost')
    port = int(os.getenv('REDIS_PORT', 6379))
    database = int(os.getenv('REDIS_DB', 0))
    secret = os.getenv('REDIS_PASSWORD')
    return redis.Redis(
        host=host,
        port=port,
        db=database,
        password=secret,
        decode_responses=True,
    )
@task
def cache_set(key: str, value: dict, ttl: int = 3600) -> bool:
    """Store ``value`` as JSON under ``key`` with a TTL of ``ttl`` seconds."""
    serialized = json.dumps(value)
    return get_redis_client().setex(key, ttl, serialized)
@task
def cache_get(key: str):
    """Return the JSON-decoded value stored at ``key``, or ``None`` if absent."""
    stored = get_redis_client().get(key)
    return json.loads(stored) if stored else None
@task
def expensive_computation(data_id: str) -> dict:
    """Return the cached result for ``data_id``, computing it on a cache miss.

    A miss sleeps for two seconds to stand in for real work, then stores the
    result under ``computation:<data_id>`` for 7200 seconds.
    """
    import datetime
    import time

    cache_key = f"computation:{data_id}"

    hit = cache_get(cache_key)
    if hit:
        return hit

    time.sleep(2)  # stand-in for the expensive part
    computed = {
        "data_id": data_id,
        "result": "computed",
        "timestamp": datetime.datetime.utcnow().isoformat(),
    }
    cache_set(cache_key, computed, ttl=7200)
    return computed
@flow
def cache_accelerated_flow():
    """Run the same computation twice; the second call can be served from cache."""
    first = expensive_computation("item1")
    second = expensive_computation("item1")  # Uses cache
    return [first, second]
Distributed Locking for Idempotent Tasks
from dagy import flow, task
import redis
import time
import os
@task
def distributed_lock_task(lock_key: str, task_id: str, duration: int = 300) -> str:
    """Run a unit of work while holding a Redis lock.

    Args:
        lock_key: Redis key that represents the lock.
        task_id: Token identifying this holder; it should be unique per run
            so that one holder can never release another holder's lock.
        duration: Lock expiry in seconds, so a crashed holder cannot block
            others forever.

    Returns:
        A completion message, or ``"Lock held by <holder>"`` when the lock is
        already taken.
    """
    # Compare-and-delete must be one atomic step: with a separate GET and
    # DEL, the lock could expire and be re-acquired by someone else between
    # the two calls, and that new holder's lock would be deleted.
    release_script = (
        "if redis.call('get', KEYS[1]) == ARGV[1] then "
        "return redis.call('del', KEYS[1]) "
        "end "
        "return 0"
    )
    client = get_redis_client()
    # SET NX EX acquires the lock and sets its expiry in a single command.
    acquired = client.set(lock_key, task_id, nx=True, ex=duration)
    if not acquired:
        current_holder = client.get(lock_key)
        return f"Lock held by {current_holder}"
    try:
        print(f"Lock acquired for {lock_key}")
        time.sleep(2)  # placeholder for the protected work
        return f"Work completed for {lock_key}"
    finally:
        client.eval(release_script, 1, lock_key, task_id)
@task
def idempotent_operation(operation_id: str) -> str:
    """Run the locked task for ``operation_id`` under a 60-second lock.

    The lock key is shared by every run of the same operation
    (``operation:<operation_id>``), while the holder token is unique per run.
    With a shared token, a run whose lock had expired could release the lock
    of a later run of the same operation.

    Returns:
        The message produced by ``distributed_lock_task``.
    """
    import uuid

    lock_key = f"operation:{operation_id}"
    holder_token = f"{operation_id}:{uuid.uuid4().hex}"
    return distributed_lock_task(lock_key, holder_token, duration=60)
@flow
def idempotent_flow():
    """Run the locked operation ``op123`` and return its outcome message."""
    return idempotent_operation("op123")
Redis Pub/Sub for Event-Driven Triggers
from dagy import flow, task
import redis
import json
import os
import threading
@task
def publish_event(channel: str, event: dict) -> int:
    """Publish ``event`` as JSON on ``channel``; return the value Redis reports."""
    payload = json.dumps(event)
    return get_redis_client().publish(channel, payload)
@task
def subscribe_and_process(channel: str, timeout: int = 30) -> list:
    """Collect JSON events published on ``channel`` for ``timeout`` seconds.

    Messages are polled with a bounded wait, so the function returns once the
    timeout elapses even if nothing is published.

    Args:
        channel: Redis pub/sub channel to subscribe to.
        timeout: Total listening time in seconds.

    Returns:
        The decoded events received within the window, in arrival order.
    """
    import time

    client = redis.Redis(
        host=os.getenv('REDIS_HOST', 'localhost'),
        port=int(os.getenv('REDIS_PORT', 6379)),
        # Honour the same db/password secrets as the guide's other clients;
        # both fall back to the library defaults when unset.
        db=int(os.getenv('REDIS_DB', 0)),
        password=os.getenv('REDIS_PASSWORD'),
        decode_responses=True
    )
    pubsub = client.pubsub()
    events = []
    try:
        pubsub.subscribe(channel)
        deadline = time.monotonic() + timeout
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            # Wait at most one second per poll so the deadline is respected.
            message = pubsub.get_message(
                ignore_subscribe_messages=True,
                timeout=min(remaining, 1.0),
            )
            if message is not None and message['type'] == 'message':
                events.append(json.loads(message['data']))
    finally:
        # Release the subscription even when decoding or the connection fails.
        pubsub.close()
    return events
@task
def event_processor(events: list) -> dict:
    """Wrap ``events`` in a summary with their count and a UTC ISO timestamp."""
    import datetime

    summary = {"event_count": len(events), "events": events}
    summary["processed_at"] = datetime.datetime.utcnow().isoformat()
    return summary
@flow
def pubsub_flow():
    """Publish a sample login event on ``user_events`` and report the result."""
    import datetime

    login_event = {
        "type": "user_action",
        "user_id": "123",
        "action": "login",
        "timestamp": datetime.datetime.utcnow().isoformat(),
    }
    receivers = publish_event("user_events", login_event)
    return f"Published to {receivers} subscribers"
Best Practices & Gotchas
- Key naming: Use structured key names like `entity:id:field` for clarity.
- TTL: Always set TTL on keys to prevent unbounded memory growth.
- Serialization: Use JSON for complex data, be aware of serialization overhead.
- Cluster mode: In cluster mode, use `redis.cluster.RedisCluster` instead of `redis.Redis`.
- Connection pooling: `redis.Redis` handles connection pooling automatically.
Elasticsearch / OpenSearch
Elasticsearch and OpenSearch are search and analytics engines. Use them in Dagy for full-text search, logging, and analytics.
Prerequisites
pip install elasticsearch
# OR for OpenSearch
pip install opensearch-py
Configuration
dagy secret set ELASTICSEARCH_HOSTS "https://elasticsearch.example.com:9200"
dagy secret set ELASTICSEARCH_USERNAME "elastic"
dagy secret set ELASTICSEARCH_PASSWORD "password"
Indexing Data
from dagy import flow, task
from elasticsearch import Elasticsearch
import os
import datetime
def get_es_client():
    """Build an Elasticsearch client from environment configuration.

    Environment variables:
        ELASTICSEARCH_HOSTS: One URL or a comma-separated list
            (default ``http://localhost:9200``).
        ELASTICSEARCH_USERNAME / ELASTICSEARCH_PASSWORD: Basic-auth
            credentials (defaults ``elastic`` / ``changeme``).
        ELASTICSEARCH_VERIFY_CERTS: Set to ``false``/``0``/``no``/``off`` to
            skip TLS certificate verification, e.g. for local self-signed
            setups. Verification is on by default because the client sends
            credentials.
    """
    raw_hosts = os.getenv('ELASTICSEARCH_HOSTS', 'http://localhost:9200')
    hosts = [host.strip() for host in raw_hosts.split(',') if host.strip()]

    verify_flag = os.getenv('ELASTICSEARCH_VERIFY_CERTS', 'true').strip().lower()
    verify_certs = verify_flag not in ('0', 'false', 'no', 'off')

    # NOTE(review): the fallback credentials are placeholders; set the
    # secrets explicitly in real deployments.
    return Elasticsearch(
        hosts=hosts,
        basic_auth=(
            os.getenv('ELASTICSEARCH_USERNAME', 'elastic'),
            os.getenv('ELASTICSEARCH_PASSWORD', 'changeme')
        ),
        verify_certs=verify_certs
    )
@task
def index_document(index: str, doc_id: str, document: dict) -> str:
    """Index ``document`` under ``doc_id`` and return the ``_id`` from the response."""
    response = get_es_client().index(index=index, id=doc_id, document=document)
    return response['_id']
@task
def bulk_index(index: str, documents: list) -> dict:
    """Index many documents in one bulk request.

    Each document's ``id`` field is used as its ``_id``; documents without
    one fall back to their position in the list. Per-item errors are
    collected instead of raised.

    Returns:
        ``{"success": <count>, "failed": <count>, "failures": <details>}``.
    """
    from elasticsearch.helpers import bulk

    es = get_es_client()
    actions = []
    for position, doc in enumerate(documents):
        actions.append({
            "_index": index,
            "_id": doc.get("id", position),
            "_source": doc,
        })
    ok_count, errors = bulk(es, actions, raise_on_error=False)
    return {"success": ok_count, "failed": len(errors), "failures": errors}
@task
def search_documents(index: str, query: dict, size: int = 10) -> list:
    """Run ``query`` and return each hit's source with its ``_id`` attached."""
    response = get_es_client().search(index=index, query=query, size=size)
    found = []
    for hit in response['hits']['hits']:
        source = hit['_source']
        source['_id'] = hit['_id']
        found += [source]
    return found
@task
def search_text(index: str, text: str, field: str = "content") -> list:
    """Run a ``match`` query with automatic fuzziness for ``text`` on ``field``."""
    match_options = {"query": text, "fuzziness": "AUTO"}
    return search_documents(index, {"match": {field: match_options}})
@flow
def elasticsearch_index_flow():
    """Bulk-index two sample articles into the ``articles`` index."""
    first_article = {
        "id": 1,
        "title": "Getting Started with Dagy",
        "content": "Learn how to use Dagy for orchestration",
        "timestamp": datetime.datetime.utcnow().isoformat(),
    }
    second_article = {
        "id": 2,
        "title": "Advanced Dagy Patterns",
        "content": "Master advanced patterns with Dagy",
        "timestamp": datetime.datetime.utcnow().isoformat(),
    }
    return bulk_index("articles", [first_article, second_article])
Aggregations and Analytics
from dagy import flow, task
@task
def aggregate_by_field(index: str, field: str, size: int = 10) -> list:
    """Return up to ``size`` ``terms`` buckets for ``field`` with their counts.

    Only aggregation data is requested (``size=0`` hits). Each entry is
    ``{"key": <value>, "count": <doc_count>}``.
    """
    terms_agg = {"terms": {"field": field, "size": size}}
    response = get_es_client().search(
        index=index,
        query={"match_all": {}},
        aggs={"top_values": terms_agg},
        size=0,
    )
    summary = []
    for bucket in response['aggregations']['top_values']['buckets']:
        summary.append({"key": bucket['key'], "count": bucket['doc_count']})
    return summary
@task
def date_histogram(index: str, date_field: str, interval: str = "1d") -> list:
    """Count documents per calendar interval of ``date_field``.

    Each entry is ``{"date": <key_as_string>, "count": <doc_count>}``.
    """
    histogram_agg = {
        "date_histogram": {
            "field": date_field,
            "calendar_interval": interval,
        }
    }
    response = get_es_client().search(
        index=index,
        query={"match_all": {}},
        aggs={"timeline": histogram_agg},
        size=0,
    )
    points = []
    for bucket in response['aggregations']['timeline']['buckets']:
        points.append({"date": bucket['key_as_string'], "count": bucket['doc_count']})
    return points
@flow
def analytics_flow():
    """Compute top authors and a document timeline for the ``articles`` index."""
    report = {}
    report["top_authors"] = aggregate_by_field("articles", "author.keyword")
    report["timeline"] = date_histogram("articles", "timestamp")
    return report
Log Indexing Pipeline
from dagy import flow, task
import datetime
@task
def parse_logs(log_lines: list) -> list:
    """Convert ``<timestamp> <LEVEL> <message>`` lines into structured dicts.

    Lines that do not match the expected layout are skipped. Every parsed
    entry also gets an ``indexed_at`` UTC ISO timestamp.
    """
    import re

    line_format = re.compile(r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})\s+(\w+)\s+(.+)')
    parsed = []
    for raw_line in log_lines:
        found = line_format.match(raw_line)
        if found is None:
            continue
        when, severity, text = found.groups()
        parsed.append({
            "timestamp": when,
            "level": severity,
            "message": text,
            "indexed_at": datetime.datetime.utcnow().isoformat(),
        })
    return parsed
@task
def index_logs(index: str, documents: list) -> dict:
    """Send parsed log documents to ``bulk_index``.

    Args:
        index: Target index name.
        documents: Documents to index.

    Returns:
        The result of ``bulk_index``.
    """
    return bulk_index(index, documents)
@flow
def log_indexing_flow():
    """Parse three sample log lines and index them into ``application-logs``.

    Returns:
        The result of ``index_logs``.
    """
    sample_lines = [
        "2024-01-15T10:30:00 INFO Application started",
        "2024-01-15T10:30:01 DEBUG Loading configuration",
        "2024-01-15T10:30:02 ERROR Connection failed",
    ]
    structured = parse_logs(sample_lines)
    return index_logs("application-logs", structured)
Best Practices & Gotchas
- Index naming: Use time-based index names for log data: logs-YYYY.MM.DD.
- Mapping: Define explicit mappings for consistent field types:
mapping = {"properties": {"timestamp": {"type": "date"}, "level": {"type": "keyword"}}}
client.indices.put_mapping(index="logs", body=mapping)
- Refresh interval: Adjust refresh_interval based on latency requirements.
- Sharding: Plan shard count based on data volume and query patterns.
Slack
Slack is a messaging platform. Use Dagy to send notifications, reports, and interactive messages.
Prerequisites
pip install slack-sdk
Configuration
dagy secret set SLACK_BOT_TOKEN "xoxb-your-token"
dagy secret set SLACK_CHANNEL_ID "C1234567890"
dagy secret set SLACK_WEBHOOK_URL "https://hooks.slack.com/services/..."
Sending Notifications via Slack SDK
from dagy import flow, task
from slack_sdk import WebClient
import os
def get_slack_client():
    """Build a Slack ``WebClient`` from the ``SLACK_BOT_TOKEN`` environment variable."""
    bot_token = os.environ.get('SLACK_BOT_TOKEN')
    return WebClient(token=bot_token)
@task
def send_slack_message(channel: str, text: str) -> str:
    """Post a plain-text message to a Slack channel.

    Args:
        channel: Channel to post to.
        text: Message text.

    Returns:
        The ``ts`` field of the API response.
    """
    response = get_slack_client().chat_postMessage(channel=channel, text=text)
    return response['ts']
@task
def send_slack_blocks(channel: str, blocks: list) -> str:
    """Post a Block Kit message to a Slack channel.

    Args:
        channel: Channel to post to.
        blocks: Block definitions passed as ``blocks``.

    Returns:
        The ``ts`` field of the API response.
    """
    slack = get_slack_client()
    response = slack.chat_postMessage(channel=channel, blocks=blocks)
    posted_ts = response['ts']
    return posted_ts
@task
def send_slack_file(channel: str, file_path: str, title: str) -> str:
    """Upload a local file to a Slack channel.

    Uses ``files_upload_v2``; the legacy ``files_upload`` method is
    deprecated by Slack.

    Args:
        channel: Channel to share the file in.
            NOTE(review): the v2 upload expects a channel ID rather than a
            channel name -- confirm callers pass an ID.
        file_path: Path of the file to upload.
        title: Title shown for the file in Slack.

    Returns:
        The ID of the uploaded file.
    """
    client = get_slack_client()
    # The SDK accepts a path directly and handles reading the file itself.
    result = client.files_upload_v2(channel=channel, file=file_path, title=title)
    return result['file']['id']
@flow
def slack_notification_flow():
    """Post a fixed completion message to the channel in ``SLACK_CHANNEL_ID``.

    Returns:
        The value returned by ``send_slack_message``.
    """
    channel_id = os.getenv('SLACK_CHANNEL_ID')
    return send_slack_message(channel_id, "Processing completed successfully!")
Slack Rich Messages with Blocks
from dagy import flow, task
@task
def send_status_report(channel: str, status: str, count: int) -> str:
    """Send a formatted processing report to Slack.

    The message has a header, a two-field section (status and record
    count), a divider and a context line with the generation time.

    Args:
        channel: Channel to post to.
        status: Status text shown in the report.
        count: Number of records shown in the report.

    Returns:
        The value returned by ``send_slack_blocks``.
    """
    # Local import replaces the original ``__import__('datetime')`` hack;
    # no Slack client is created here because ``send_slack_blocks`` owns it.
    import datetime

    generated_at = datetime.datetime.utcnow().isoformat()
    blocks = [
        {
            "type": "header",
            "text": {"type": "plain_text", "text": "Processing Report"},
        },
        {
            "type": "section",
            "fields": [
                {"type": "mrkdwn", "text": f"*Status:*\n{status}"},
                {"type": "mrkdwn", "text": f"*Records:*\n{count}"},
            ],
        },
        {"type": "divider"},
        {
            "type": "context",
            "elements": [
                {"type": "mrkdwn", "text": f"Generated at {generated_at}"},
            ],
        },
    ]
    return send_slack_blocks(channel, blocks)
@task
def send_error_alert(channel: str, error: str) -> str:
    """Send a "Task Failed" alert with the error text in a code block.

    Args:
        channel: Channel to post to.
        error: Error text; truncated if it would not fit in one section.

    Returns:
        The value returned by ``send_slack_blocks``.
    """
    # Slack limits section text to 3000 characters; long tracebacks would
    # otherwise make the whole request fail. Reserve room for the fences.
    max_body = 3000 - len("``````")
    if len(error) > max_body:
        error = error[: max_body - 3] + "..."

    blocks = [
        {
            "type": "section",
            "text": {"type": "mrkdwn", "text": ":x: *Task Failed*"},
        },
        {
            "type": "section",
            "text": {"type": "mrkdwn", "text": f"```{error}```"},
        },
    ]
    # send_slack_blocks builds its own client, so none is created here.
    return send_slack_blocks(channel, blocks)
@flow
def slack_reporting_flow():
    """Send a sample status report to the channel in ``SLACK_CHANNEL_ID``.

    Returns:
        The value returned by ``send_status_report``.
    """
    target_channel = os.getenv('SLACK_CHANNEL_ID')
    return send_status_report(target_channel, "Success", 1000)
Interactive Messages with Buttons
from dagy import flow, task
@task
def send_approval_message(channel: str, request_id: str) -> str:
    """Send an approval request with Approve / Deny buttons.

    Args:
        channel: Channel to post to.
        request_id: Identifier shown in the message and embedded in each
            button's ``action_id``.

    Returns:
        The message timestamp returned by ``send_slack_blocks``.
    """
    def button(label: str, value: str, style: str) -> dict:
        # Both buttons share a shape; only label, value and style differ.
        return {
            "type": "button",
            "text": {"type": "plain_text", "text": label},
            "value": value,
            "style": style,
            "action_id": f"{value}_{request_id}",
        }

    blocks = [
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f"*Approval Needed*\nRequest: {request_id}",
            },
        },
        {
            "type": "actions",
            "elements": [
                button("Approve", "approve", "primary"),
                button("Deny", "deny", "danger"),
            ],
        },
    ]
    # send_slack_blocks already returns the ``ts`` string; indexing it with
    # ['ts'] (as before) raised TypeError.
    return send_slack_blocks(channel, blocks)
@flow
def approval_workflow():
    """Request approval for the sample request ``REQ123`` via Slack.

    Returns:
        The value returned by ``send_approval_message``.
    """
    channel_id = os.getenv('SLACK_CHANNEL_ID')
    return send_approval_message(channel_id, "REQ123")
Best Practices & Gotchas
- Rate limiting: Slack has rate limits. Implement exponential backoff:
import time
retry_count = 0
while retry_count < 3:
    try:
        client.chat_postMessage(...)
        break
    except Exception as e:
        if "rate_limited" in str(e):
            time.sleep(2 ** retry_count)
        retry_count += 1
- Message updates: Use chat_update() to edit existing messages instead of sending new ones.
- Thread replies: Use the thread_ts parameter to reply in threads: client.chat_postMessage(channel=channel, text="Reply", thread_ts=ts)
Email (SES / SMTP)
Email is useful for notifications and reports. Use AWS SES or SMTP in Dagy to send emails.
Prerequisites
# For AWS SES
pip install boto3
# For SMTP
pip install secure-smtplib # Optional, standard library has smtplib
Configuration
# For AWS SES
dagy secret set AWS_SES_REGION "us-east-1"
# For SMTP
dagy secret set SMTP_HOST "smtp.gmail.com"
dagy secret set SMTP_PORT "587"
dagy secret set SMTP_USERNAME "your-email@gmail.com"
dagy secret set SMTP_PASSWORD "your-app-password"
dagy secret set SMTP_FROM_ADDRESS "your-email@gmail.com"
Sending Email via AWS SES
from dagy import flow, task
import boto3
import os
def get_ses_client():
    """Create a boto3 SES client for ``AWS_SES_REGION`` (default ``us-east-1``)."""
    region = os.getenv('AWS_SES_REGION', 'us-east-1')
    return boto3.client('ses', region_name=region)
@task
def send_email_ses(
    to_addresses: list,
    subject: str,
    body_text: str,
    body_html: str = None
) -> str:
    """Send one email through AWS SES.

    The sender address is read from ``SMTP_FROM_ADDRESS``.

    Args:
        to_addresses: Recipient addresses.
        subject: Subject line.
        body_text: Plain-text body.
        body_html: Optional HTML body, added when truthy.

    Returns:
        The ``MessageId`` from the SES response.
    """
    ses = get_ses_client()
    body = {'Text': {'Data': body_text}}
    if body_html:
        body['Html'] = {'Data': body_html}
    reply = ses.send_email(
        Source=os.getenv('SMTP_FROM_ADDRESS'),
        Destination={'ToAddresses': to_addresses},
        Message={'Subject': {'Data': subject}, 'Body': body},
    )
    return reply['MessageId']
@task
def send_bulk_email_ses(
    recipients: list,
    subject: str,
    body_text: str
) -> list:
    """Send the same plain-text email to each recipient individually.

    Args:
        recipients: Recipient addresses; one message is sent per address.
        subject: Subject line.
        body_text: Plain-text body.

    Returns:
        The values returned by ``send_email_ses``, in recipient order.
    """
    # send_email_ses creates its own SES client, so none is built here
    # (the original constructed one and never used it).
    return [
        send_email_ses([recipient], subject, body_text)
        for recipient in recipients
    ]
@flow
def email_notification_flow():
    """Send a sample "Processing Complete" email via SES.

    Returns:
        The value returned by ``send_email_ses``.
    """
    recipients = ["user@example.com"]
    body = "Your data processing has completed successfully."
    return send_email_ses(
        to_addresses=recipients,
        subject="Processing Complete",
        body_text=body,
    )
Sending Email via SMTP
from dagy import flow, task
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import os
@task
def send_email_smtp(
    to_addresses: list,
    subject: str,
    body_text: str,
    body_html: str = None
) -> str:
    """Send an email over SMTP with STARTTLS.

    Connection settings come from ``SMTP_HOST``, ``SMTP_PORT`` (default
    587), ``SMTP_USERNAME``, ``SMTP_PASSWORD`` and ``SMTP_FROM_ADDRESS``.

    Args:
        to_addresses: Recipient addresses.
        subject: Subject line.
        body_text: Plain-text body.
        body_html: Optional HTML alternative, attached when truthy.

    Returns:
        ``"Sent to N recipients"`` where N counts only the recipients the
        server accepted.

    Raises:
        smtplib.SMTPException: On connection, login or delivery errors
            (including when every recipient is refused).
    """
    smtp_server = os.getenv('SMTP_HOST')
    smtp_port = int(os.getenv('SMTP_PORT', 587))
    username = os.getenv('SMTP_USERNAME')
    password = os.getenv('SMTP_PASSWORD')
    from_address = os.getenv('SMTP_FROM_ADDRESS')

    msg = MIMEMultipart('alternative')
    msg['Subject'] = subject
    msg['From'] = from_address
    msg['To'] = ', '.join(to_addresses)
    msg.attach(MIMEText(body_text, 'plain'))
    if body_html:
        msg.attach(MIMEText(body_html, 'html'))

    with smtplib.SMTP(smtp_server, smtp_port) as server:
        server.starttls()
        server.login(username, password)
        # sendmail returns a dict with one entry per refused recipient;
        # ignoring it would over-report the number of deliveries.
        refused = server.sendmail(from_address, to_addresses, msg.as_string())

    accepted = len(to_addresses) - len(refused)
    return f"Sent to {accepted} recipients"
@flow
def smtp_email_flow():
    """Send a sample report email via SMTP.

    Returns:
        The value returned by ``send_email_smtp``.
    """
    recipients = ["user@example.com"]
    return send_email_smtp(
        to_addresses=recipients,
        subject="Report",
        body_text="Here is your report.",
    )
Email Templates and HTML Content
from dagy import flow, task
@task
def send_html_email(
    to_addresses: list,
    subject: str,
    data: dict
) -> str:
    """Send a processing report as an HTML email with a text fallback.

    Args:
        to_addresses: Recipient addresses.
        subject: Subject line.
        data: Must contain ``status``, ``count`` and ``timestamp``.

    Returns:
        The value returned by ``send_email_smtp``.

    Raises:
        KeyError: If ``data`` lacks one of the required keys.
    """
    import html

    status = str(data['status'])
    count = str(data['count'])
    timestamp = str(data['timestamp'])

    # Escape before interpolating so values containing <, > or & cannot
    # break or inject markup into the HTML body.
    html_body = f"""
    <html>
    <body>
    <h1>Processing Report</h1>
    <p>Status: <strong>{html.escape(status)}</strong></p>
    <p>Records processed: <strong>{html.escape(count)}</strong></p>
    <p>Timestamp: {html.escape(timestamp)}</p>
    <hr>
    <p>Best regards,<br/>Dagy Platform</p>
    </body>
    </html>
    """
    return send_email_smtp(
        to_addresses=to_addresses,
        subject=subject,
        body_text=f"Status: {status}, Count: {count}",
        body_html=html_body,
    )
@task
def send_report_email(
    to_addresses: list,
    results: dict
) -> str:
    """Email a "Dagy Processing Report" built from ``results``.

    The report status is always ``"Success"``; the count is taken from
    ``results["count"]`` (0 when absent) and the timestamp is generated
    at call time.

    Args:
        to_addresses: Recipient addresses.
        results: Processing results.

    Returns:
        The value returned by ``send_html_email``.
    """
    import datetime

    report = {
        "status": "Success",
        "count": results.get("count", 0),
        "timestamp": datetime.datetime.utcnow().isoformat(),
    }
    return send_html_email(
        to_addresses=to_addresses,
        subject="Dagy Processing Report",
        data=report,
    )
@flow
def report_email_flow():
    """Send a report for a sample result set to ``admin@example.com``.

    Returns:
        The value returned by ``send_report_email``.
    """
    sample_results = {"count": 1000, "status": "complete"}
    recipients = ["admin@example.com"]
    return send_report_email(recipients, sample_results)
Best Practices & Gotchas
- Rate limiting: SES and SMTP have rate limits. Implement backoff.
- Attachments: Use MIMEBase for file attachments:
from email import encoders
from email.mime.base import MIMEBase
part = MIMEBase('application', 'octet-stream')
part.set_payload(open('file.pdf', 'rb').read())
encoders.encode_base64(part)
msg.attach(part)
- DKIM/SPF: Verify domain in SES to avoid spam folder.
REST APIs
Most services expose REST APIs. Use HTTP clients in Dagy to integrate with any service.
Prerequisites
pip install httpx requests
HTTP Client Patterns
from dagy import flow, task
import httpx
import os
@task
def get_request(url: str, headers: dict = None, params: dict = None) -> dict:
    """Issue a GET request and return the decoded JSON body.

    Args:
        url: Target URL.
        headers: Optional request headers.
        params: Optional query parameters.

    Returns:
        The parsed JSON response.

    Raises:
        httpx.HTTPStatusError: If ``raise_for_status`` rejects the response.
    """
    with httpx.Client() as http:
        reply = http.get(url, headers=headers, params=params)
        reply.raise_for_status()
        payload = reply.json()
    return payload
@task
def post_request(url: str, data: dict, headers: dict = None) -> dict:
    """Issue a POST request with a JSON body and return the decoded JSON reply.

    Args:
        url: Target URL.
        data: Payload sent as JSON.
        headers: Optional request headers.

    Returns:
        The parsed JSON response.

    Raises:
        httpx.HTTPStatusError: If ``raise_for_status`` rejects the response.
    """
    with httpx.Client() as http:
        reply = http.post(url, json=data, headers=headers)
        reply.raise_for_status()
        payload = reply.json()
    return payload
@task
def put_request(url: str, data: dict, headers: dict = None) -> dict:
    """Issue a PUT request with a JSON body and return the decoded JSON reply.

    Args:
        url: Target URL.
        data: Payload sent as JSON.
        headers: Optional request headers.

    Returns:
        The parsed JSON response.

    Raises:
        httpx.HTTPStatusError: If ``raise_for_status`` rejects the response.
    """
    with httpx.Client() as http:
        reply = http.put(url, json=data, headers=headers)
        reply.raise_for_status()
        payload = reply.json()
    return payload
@task
def delete_request(url: str, headers: dict = None) -> dict:
    """Issue a DELETE request.

    Args:
        url: Target URL.
        headers: Optional request headers.

    Returns:
        The parsed JSON response, or ``{}`` when the server sends no body
        (for example ``204 No Content``).

    Raises:
        httpx.HTTPStatusError: If ``raise_for_status`` rejects the response.
    """
    with httpx.Client() as client:
        response = client.delete(url, headers=headers)
        response.raise_for_status()
        # Successful deletes commonly have an empty body; calling .json()
        # on it would raise a decode error.
        if response.status_code == 204 or not response.content:
            return {}
        return response.json()
@flow
def api_flow():
    """Demonstrate a GET followed by a POST against the example API.

    Returns:
        The value returned by the POST request.
    """
    # GET request (its result is not used further)
    get_request("https://api.example.com/users/1")
    # POST request
    new_user_payload = {"name": "Alice", "email": "alice@example.com"}
    return post_request("https://api.example.com/users", new_user_payload)
Authentication Handling
from dagy import flow, task
import httpx
import os
@task
def api_with_bearer_token(url: str, token: str) -> dict:
    """GET ``url`` with an ``Authorization: Bearer`` header.

    Args:
        url: Target URL.
        token: Bearer token.

    Returns:
        The value returned by ``get_request``.
    """
    return get_request(url, headers={"Authorization": f"Bearer {token}"})
@task
def api_with_api_key(url: str, api_key: str) -> dict:
    """GET ``url`` with the key sent in an ``X-API-Key`` header.

    Args:
        url: Target URL.
        api_key: API key value.

    Returns:
        The value returned by ``get_request``.
    """
    return get_request(url, headers={"X-API-Key": api_key})
@task
def api_with_basic_auth(url: str, username: str, password: str) -> dict:
    """GET ``url`` with an HTTP Basic ``Authorization`` header.

    Args:
        url: Target URL.
        username: Account name.
        password: Account password.

    Returns:
        The value returned by ``get_request``.
    """
    import base64

    raw_pair = f"{username}:{password}".encode()
    encoded_pair = base64.b64encode(raw_pair).decode()
    return get_request(url, headers={"Authorization": f"Basic {encoded_pair}"})
@task
def api_with_oauth(
    auth_url: str,
    client_id: str,
    client_secret: str,
    api_url: str
) -> dict:
    """Call an API using the OAuth 2.0 client-credentials grant.

    Args:
        auth_url: Token endpoint.
        client_id: OAuth client ID.
        client_secret: OAuth client secret.
        api_url: API endpoint called with the obtained token.

    Returns:
        The value returned by ``get_request`` for ``api_url``.

    Raises:
        httpx.HTTPStatusError: If the token endpoint rejects the request.
        KeyError: If the token response has no ``access_token``.
    """
    # RFC 6749 requires token requests to be form-encoded
    # (application/x-www-form-urlencoded); sending JSON is rejected by
    # spec-compliant servers, so ``data=`` is used rather than ``json=``.
    with httpx.Client() as client:
        token_response = client.post(
            auth_url,
            data={
                "grant_type": "client_credentials",
                "client_id": client_id,
                "client_secret": client_secret,
            },
        )
        token_response.raise_for_status()
        access_token = token_response.json()['access_token']

    # Call API with token
    headers = {"Authorization": f"Bearer {access_token}"}
    return get_request(api_url, headers=headers)
@flow
def auth_flow():
    """Fetch example data using the bearer token in ``API_TOKEN``.

    Returns:
        The value returned by ``api_with_bearer_token``.
    """
    bearer = os.getenv('API_TOKEN')
    return api_with_bearer_token("https://api.example.com/data", bearer)
Pagination and Rate Limiting
from dagy import flow, task
import httpx
import time
@task
def paginated_request(
    base_url: str,
    initial_page: int = 1,
    page_size: int = 100
) -> list:
    """Collect the ``results`` of every page of a paginated endpoint.

    Pages are requested with ``page`` / ``per_page`` query parameters
    until a page has no ``results`` or reports ``has_next`` as ``False``.

    Args:
        base_url: Endpoint URL.
        initial_page: First page number to request.
        page_size: Value sent as ``per_page``.

    Returns:
        All collected result items, in page order.
    """
    collected = []
    current_page = initial_page
    with httpx.Client() as http:
        while True:
            reply = http.get(
                base_url,
                params={"page": current_page, "per_page": page_size},
            )
            reply.raise_for_status()
            payload = reply.json()
            batch = payload.get('results')
            if not batch:
                break
            collected.extend(batch)
            current_page += 1
            # Only an explicit False stops paging; a missing flag does not.
            if payload.get('has_next') is False:
                break
    return collected
@task
def rate_limited_request(
    url: str,
    max_requests_per_second: int = 5,
    max_retries: int = 3
) -> dict:
    """GET ``url`` with client-side throttling and retry on HTTP errors.

    Attempts are spaced at least ``1 / max_requests_per_second`` seconds
    apart, and failed attempts back off exponentially (1s, 2s, 4s, ...).

    Args:
        url: Target URL.
        max_requests_per_second: Upper bound on the attempt rate.
        max_retries: Total number of attempts.

    Returns:
        The parsed JSON response.

    Raises:
        httpx.HTTPStatusError: If the final attempt still fails (including
            a final 429, which previously fell through and returned None).
        ValueError: If ``max_retries`` is less than 1.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    min_interval = 1 / max_requests_per_second
    last_request_time = None
    with httpx.Client(timeout=10.0) as client:
        for attempt in range(max_retries):
            # Rate limiting: record the time of every attempt, not only of
            # successful ones, so the spacing applies between retries.
            if last_request_time is not None:
                elapsed = time.monotonic() - last_request_time
                if elapsed < min_interval:
                    time.sleep(min_interval - elapsed)
            last_request_time = time.monotonic()
            try:
                response = client.get(url)
                response.raise_for_status()
                return response.json()
            except httpx.HTTPStatusError as e:
                if attempt == max_retries - 1:
                    raise
                wait_time = 2 ** attempt
                if e.response.status_code == 429:  # Rate limited
                    print(f"Rate limited, waiting {wait_time}s")
                time.sleep(wait_time)
@flow
def pagination_flow():
    """Fetch every item from the example endpoint and report the total.

    Returns:
        A summary string with the number of fetched items.
    """
    items = paginated_request("https://api.example.com/items")
    item_count = len(items)
    return f"Fetched {item_count} items"
Error Handling and Retries
from dagy import flow, task
import httpx
import time
@task
def resilient_request(
    url: str,
    max_retries: int = 3,
    backoff_factor: float = 2.0
) -> dict:
    """GET ``url``, retrying transient failures with exponential backoff.

    Connection errors, timeouts, HTTP 429 and 5xx responses are retried.
    Other HTTP error statuses are raised immediately because repeating
    the same request will not change the outcome.

    Args:
        url: Target URL.
        max_retries: Total number of attempts.
        backoff_factor: Base of the wait time ``backoff_factor ** attempt``.

    Returns:
        The parsed JSON response.

    Raises:
        httpx.HTTPStatusError: For a non-retryable status, or when the
            last attempt still returns 429 / 5xx.
        httpx.ConnectError: If the last attempt fails to connect.
        httpx.TimeoutException: If the last attempt times out.
        ValueError: If ``max_retries`` is less than 1.
    """
    if max_retries < 1:
        # Without this guard the loop never runs and ``raise None`` fails.
        raise ValueError("max_retries must be at least 1")

    last_exception = None
    with httpx.Client(timeout=30.0) as client:
        for attempt in range(max_retries):
            try:
                response = client.get(url)
                response.raise_for_status()
                return response.json()
            except httpx.HTTPStatusError as e:
                status = e.response.status_code
                if status != 429 and status < 500:
                    raise
                last_exception = e
            except (httpx.ConnectError, httpx.TimeoutException) as e:
                last_exception = e
            if attempt < max_retries - 1:
                wait_time = backoff_factor ** attempt
                print(f"Request failed, retrying in {wait_time}s...")
                time.sleep(wait_time)
    raise last_exception
@task
def safe_api_call(url: str) -> dict:
    """Call ``resilient_request`` and convert any failure into an error dict.

    Args:
        url: Target URL.

    Returns:
        The response payload, or ``{"error": ..., "url": ...}`` if the
        request raised.
    """
    try:
        payload = resilient_request(url, max_retries=3)
    except Exception as exc:
        return {"error": str(exc), "url": url}
    return payload
@flow
def error_handling_flow():
    """Fetch example data through ``safe_api_call``.

    Returns:
        The value returned by ``safe_api_call``.
    """
    return safe_api_call("https://api.example.com/data")
Multi-API Data Aggregation
from dagy import flow, task
import httpx
@task
def fetch_from_multiple_apis(api_urls: list) -> list:
    """Fetch from multiple APIs concurrently.

    Requests run on a small thread pool that shares one ``httpx.Client``;
    a failure of one URL does not affect the others.

    Args:
        api_urls: URLs to fetch.

    Returns:
        One dict per URL, in input order: ``{"url", "data", "status":
        "success"}`` on success or ``{"url", "error", "status": "failed"}``
        on failure.
    """
    from concurrent.futures import ThreadPoolExecutor

    if not api_urls:
        # ThreadPoolExecutor rejects max_workers=0.
        return []

    def fetch_one(client, url):
        try:
            response = client.get(url)
            response.raise_for_status()
            return {"url": url, "data": response.json(), "status": "success"}
        except Exception as e:
            # Best effort by design: record the failure and keep going.
            return {"url": url, "error": str(e), "status": "failed"}

    with httpx.Client() as client:
        with ThreadPoolExecutor(max_workers=min(len(api_urls), 8)) as pool:
            # map() yields results in input order regardless of completion order.
            return list(pool.map(lambda url: fetch_one(client, url), api_urls))
@task
def aggregate_results(results: list) -> dict:
    """Summarise per-source fetch results.

    Args:
        results: Dicts with a ``status`` key; entries whose status is
            ``"success"`` must also carry ``data``.

    Returns:
        A dict with ``total_sources``, ``successful``, ``failed`` and the
        list of successful payloads under ``data``.
    """
    payloads = []
    failure_count = 0
    for entry in results:
        outcome = entry['status']
        if outcome == 'success':
            payloads.append(entry['data'])
        elif outcome == 'failed':
            failure_count += 1
    return {
        "total_sources": len(results),
        "successful": len(payloads),
        "failed": failure_count,
        "data": payloads,
    }
@flow
def multi_api_flow():
    """Fetch three example APIs and summarise the outcome.

    Returns:
        The value returned by ``aggregate_results``.
    """
    endpoints = [
        "https://api1.example.com/data",
        "https://api2.example.com/data",
        "https://api3.example.com/data",
    ]
    fetched = fetch_from_multiple_apis(endpoints)
    return aggregate_results(fetched)
Best Practices & Gotchas
- Timeouts: Always set explicit timeouts to avoid hanging: client.get(url, timeout=30.0)
- Connection pooling: Reuse clients within tasks for connection pooling.
- Error codes: Handle specific HTTP error codes (429, 503, etc.) differently.
- SSL verification: In production, keep SSL verification enabled.
Snowflake / BigQuery
Cloud data warehouses are ideal for analytics. Integrate Dagy with Snowflake or BigQuery for ETL workflows.
Prerequisites
# For Snowflake
pip install snowflake-connector-python
# For BigQuery
pip install google-cloud-bigquery
Configuration
# Snowflake
dagy secret set SNOWFLAKE_ACCOUNT "xy12345.us-east-1"
dagy secret set SNOWFLAKE_USER "dagy_user"
dagy secret set SNOWFLAKE_PASSWORD "password"
dagy secret set SNOWFLAKE_WAREHOUSE "COMPUTE_WH"
dagy secret set SNOWFLAKE_DATABASE "ANALYTICS"
# BigQuery
dagy secret set GCP_PROJECT_ID "my-gcp-project"
dagy secret set GOOGLE_APPLICATION_CREDENTIALS "/path/to/credentials.json"
Snowflake Integration
from dagy import flow, task
import snowflake.connector
import os
def get_snowflake_connection():
    """Open a Snowflake connection configured from ``SNOWFLAKE_*`` environment variables."""
    env_by_option = {
        "account": 'SNOWFLAKE_ACCOUNT',
        "user": 'SNOWFLAKE_USER',
        "password": 'SNOWFLAKE_PASSWORD',
        "warehouse": 'SNOWFLAKE_WAREHOUSE',
        "database": 'SNOWFLAKE_DATABASE',
    }
    options = {option: os.getenv(var) for option, var in env_by_option.items()}
    return snowflake.connector.connect(**options)
@task
def query_snowflake(sql: str) -> list:
    """Run a query on Snowflake and fetch every row.

    Args:
        sql: SQL statement to execute.

    Returns:
        The rows returned by ``fetchall``.
    """
    connection = get_snowflake_connection()
    try:
        cur = connection.cursor()
        cur.execute(sql)
        rows = cur.fetchall()
        cur.close()
    finally:
        # The connection is closed whether or not the query succeeded.
        connection.close()
    return rows
@task
def load_to_snowflake(table: str, data: list, columns: list) -> int:
    """Insert rows into a Snowflake table.

    Args:
        table: Target table name. Interpolated into the SQL text, so it
            must come from trusted code, never from user input.
        data: Row dicts; a column missing from a row is inserted as NULL.
        columns: Column names to insert, in order (trusted, like ``table``).

    Returns:
        The number of rows submitted for insertion.
    """
    if not data:
        return 0

    placeholders = ",".join(["%s"] * len(columns))
    col_names = ",".join(columns)
    sql = f"INSERT INTO {table} ({col_names}) VALUES ({placeholders})"
    rows = [[row.get(col) for col in columns] for row in data]

    conn = get_snowflake_connection()
    try:
        cursor = conn.cursor()
        try:
            # One batched call instead of a round trip per row. The old
            # per-row loop also returned only the last statement's rowcount.
            cursor.executemany(sql, rows)
            conn.commit()
        finally:
            cursor.close()
    finally:
        conn.close()
    return len(rows)
@task
def copy_into_snowflake(
    table: str,
    s3_path: str,
    aws_key_id: str,
    aws_secret_key: str
) -> str:
    """Load Parquet data from S3 into a table using ``COPY INTO``.

    Args:
        table: Target table name (trusted; interpolated into the SQL).
        s3_path: Bucket and prefix without the ``s3://`` scheme.
        aws_key_id: AWS access key ID embedded in the statement.
        aws_secret_key: AWS secret key embedded in the statement.

    Returns:
        ``"Data loaded successfully"`` once the statement has been committed.
    """
    # NOTE(review): the credentials are embedded in the SQL text. All four
    # arguments must be trusted values; consider a Snowflake storage
    # integration so no keys need to appear in the statement at all.
    sql = f"""
    COPY INTO {table}
    FROM 's3://{s3_path}'
    CREDENTIALS=(AWS_KEY_ID='{aws_key_id}' AWS_SECRET_KEY='{aws_secret_key}')
    FILE_FORMAT=(TYPE='PARQUET')
    """
    conn = get_snowflake_connection()
    try:
        # Nested try: if cursor() itself fails there is no cursor to close,
        # and the connection must still be released.
        cursor = conn.cursor()
        try:
            cursor.execute(sql)
            conn.commit()
        finally:
            cursor.close()
    finally:
        conn.close()
    return "Data loaded successfully"
@flow
def snowflake_etl_flow():
    """Copy up to 1000 staged rows into ``analytics.processed_data``.

    Each source row contributes its first column as ``id`` and its
    upper-cased second column as ``name``; ``status`` is ``"processed"``.

    Returns:
        A summary string with the count reported by ``load_to_snowflake``.
    """
    # Query source
    source_rows = query_snowflake("SELECT * FROM staging.raw_data LIMIT 1000")
    # Transform
    processed = []
    for record in source_rows:
        processed.append(
            {"id": record[0], "name": record[1].upper(), "status": "processed"}
        )
    # Load
    target_columns = ["id", "name", "status"]
    row_count = load_to_snowflake(
        "analytics.processed_data", processed, target_columns
    )
    return f"Loaded {row_count} records"
BigQuery Integration
from dagy import flow, task
from google.cloud import bigquery
import os
def get_bigquery_client():
    """Create a BigQuery client for the project named in ``GCP_PROJECT_ID``."""
    project_id = os.getenv('GCP_PROJECT_ID')
    return bigquery.Client(project=project_id)
@task
def query_bigquery(sql: str) -> list:
    """Run a query on BigQuery and return the rows as dicts.

    Args:
        sql: SQL statement to execute.

    Returns:
        One dict per result row.
    """
    job = get_bigquery_client().query(sql)
    return list(map(dict, job.result()))
@task
def load_to_bigquery(table_id: str, rows: list) -> str:
    """Insert JSON rows into a BigQuery table.

    Args:
        table_id: Fully qualified table ID.
        rows: Row dicts to insert.

    Returns:
        ``"Loaded N rows"`` on success.

    Raises:
        RuntimeError: If BigQuery reports insert errors. The previous
            version returned them inside a normal-looking string, so
            callers could not tell a failed load from a successful one.
    """
    if not rows:
        # Nothing to send; skip the API call entirely.
        return "Loaded 0 rows"

    errors = get_bigquery_client().insert_rows_json(table_id, rows)
    if errors:
        raise RuntimeError(f"BigQuery insert into {table_id} failed: {errors}")
    return f"Loaded {len(rows)} rows"
@task
def create_bigquery_table(
    table_id: str,
    schema: list
) -> str:
    """Create a BigQuery table, tolerating one that already exists.

    Args:
        table_id: Fully qualified table ID.
        schema: Schema passed to ``bigquery.Table``.

    Returns:
        A confirmation string naming the table.
    """
    bq = get_bigquery_client()
    definition = bigquery.Table(table_id, schema=schema)
    created = bq.create_table(definition, exists_ok=True)
    qualified_name = f"{created.project}.{created.dataset_id}.{created.table_id}"
    return f"Table {qualified_name} created"
@flow
def bigquery_etl_flow():
    """Copy up to 100 rows from the source table into the target table.

    Returns:
        The value returned by ``load_to_bigquery``.
    """
    source_rows = query_bigquery(
        "SELECT * FROM `project.dataset.source_table` LIMIT 100"
    )
    return load_to_bigquery("project.dataset.target_table", source_rows)
Warehouse ETL Pattern
from dagy import flow, task
@task
def extract_from_warehouse(query: str, warehouse: str = "snowflake") -> list:
    """Run a query on the selected data warehouse.

    Args:
        query: SQL statement to execute.
        warehouse: ``"snowflake"`` or ``"bigquery"``.

    Returns:
        The rows returned by the warehouse-specific query task.

    Raises:
        ValueError: If ``warehouse`` is not a supported name (previously
            this silently returned ``None``).
    """
    if warehouse == "snowflake":
        return query_snowflake(query)
    if warehouse == "bigquery":
        return query_bigquery(query)
    raise ValueError(f"Unsupported warehouse: {warehouse!r}")
@task
def transform_warehouse_data(data: list) -> list:
    """Stamp each row with a ``transformed_at`` time.

    Dict rows are copied; any other row is wrapped as ``{"value": row}``.

    Args:
        data: Rows to transform.

    Returns:
        A new list of dicts, one per input row.
    """
    import datetime

    enriched = []
    for record in data:
        if isinstance(record, dict):
            entry = dict(record)
        else:
            entry = {"value": record}
        entry["transformed_at"] = datetime.datetime.utcnow().isoformat()
        enriched.append(entry)
    return enriched
@task
def load_to_warehouse(
    table: str,
    data: list,
    warehouse: str = "snowflake"
) -> str:
    """Load rows into the selected data warehouse.

    Args:
        table: Target table.
        data: Row dicts; for Snowflake the keys of the first row are used
            as the column list.
        warehouse: ``"snowflake"`` or ``"bigquery"``.

    Returns:
        ``"Loaded N records"`` where N is the number of rows submitted.

    Raises:
        ValueError: If ``warehouse`` is not a supported name (previously
            an ``UnboundLocalError``).
    """
    if warehouse == "snowflake":
        columns = list(data[0].keys()) if data else []
        load_to_snowflake(table, data, columns)
    elif warehouse == "bigquery":
        # load_to_bigquery returns a message, not a count, so its result
        # must not be embedded in the summary below.
        load_to_bigquery(table, data)
    else:
        raise ValueError(f"Unsupported warehouse: {warehouse!r}")
    return f"Loaded {len(data)} records"
@flow
def warehouse_etl_flow():
    """Extract, transform and load sample data within Snowflake.

    Returns:
        The value returned by ``load_to_warehouse``.
    """
    target = "snowflake"
    raw_rows = extract_from_warehouse(
        "SELECT * FROM raw_data LIMIT 1000",
        warehouse=target,
    )
    stamped_rows = transform_warehouse_data(raw_rows)
    return load_to_warehouse("processed_data", stamped_rows, warehouse=target)
Best Practices & Gotchas
- Staging: Use staging tables for intermediate transformations.
- Partitioning: Partition large tables by date for better query performance.
- Clustering: Cluster BigQuery tables on frequently filtered columns so queries scan less data.
- Cost control: Monitor and optimize queries to control costs.
Docker / Custom Containers
The ECS backend supports custom Docker containers. Use it for complex tasks requiring specific environments or dependencies.
Prerequisites
# Docker image with Dagy installed
FROM python:3.11-slim
RUN pip install dagy
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . /app
WORKDIR /app
Configuring Worker Images
# dagy.yml
backend: ecs
ecs_config:
task_role_arn: "arn:aws:iam::ACCOUNT:role/DagyECSTaskRole"
execution_role_arn: "arn:aws:iam::ACCOUNT:role/DagyECSExecutionRole"
container_image: "123456789.dkr.ecr.us-east-1.amazonaws.com/dagy-worker:latest"
cpu: "512"
memory: "1024"
environment:
LOG_LEVEL: "INFO"
WORKERS: "4"
Using Custom Docker Images
from dagy import flow, task
@task
def process_with_custom_image() -> str:
    """Run ``custom-tool --process`` from the container image.

    Returns:
        The tool's standard output.

    Raises:
        subprocess.CalledProcessError: If the tool exits with a non-zero
            status (previously the failure was ignored and whatever was on
            stdout was returned).
        FileNotFoundError: If ``custom-tool`` is not installed in the image.
    """
    import subprocess

    # Your custom tools available in container
    result = subprocess.run(
        ["custom-tool", "--process"],
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout
@task
def ml_inference(model_path: str, data: dict) -> dict:
    """Load a Keras model and score a single feature vector.

    Args:
        model_path: Path handed to ``load_model``.
        data: Must contain ``features``.

    Returns:
        ``{"prediction": <float>}`` taken from the first output value.
    """
    # Custom ML libraries available in container
    import tensorflow as tf

    network = tf.keras.models.load_model(model_path)
    batch = [data['features']]
    scores = network.predict(batch)
    return {"prediction": float(scores[0][0])}
@task
def process_gpu_task(data: list) -> str:
    """Report which torch device the task would run on.

    Args:
        data: Accepted but not used by this example.

    Returns:
        A string naming the selected device.
    """
    # GPU available in ECS task with GPU support
    import torch

    has_gpu = torch.cuda.is_available()
    device = torch.device("cuda" if has_gpu else "cpu")
    if has_gpu:
        print(f"Using GPU: {torch.cuda.get_device_name(0)}")
    return f"Processing on {device}"
@flow
def custom_container_flow():
    """Run the custom-image task and return its output."""
    return process_with_custom_image()
Passing Environment Variables
from dagy import flow, task
import os
@task
def use_environment_variables() -> dict:
    """Read ``APP_NAME``, ``LOG_LEVEL`` and ``WORKERS`` from the environment.

    Returns:
        A dict with ``app_name``, ``log_level`` and ``workers``; a value
        is ``None`` when its variable is unset.
    """
    env_by_key = {
        "app_name": 'APP_NAME',
        "log_level": 'LOG_LEVEL',
        "workers": 'WORKERS',
    }
    return {key: os.getenv(var) for key, var in env_by_key.items()}
@task
def configure_from_env() -> str:
    """Build a PostgreSQL connection string from the environment.

    ``DATABASE_HOST`` defaults to ``localhost`` and ``DATABASE_PORT`` to
    ``5432``.

    Returns:
        A message containing the connection string.
    """
    db_host = os.environ.get('DATABASE_HOST', 'localhost')
    db_port = os.environ.get('DATABASE_PORT', '5432')
    dsn = f"postgresql://{db_host}:{db_port}/dagy"
    return f"Connecting to {dsn}"
@flow
def env_config_flow():
    """Return the configuration read by ``use_environment_variables``."""
    return use_environment_variables()
Building and Pushing Docker Images
# Build image
docker build -t dagy-worker:latest .
# Tag for ECR
docker tag dagy-worker:latest 123456789.dkr.ecr.us-east-1.amazonaws.com/dagy-worker:latest
# Push to ECR
aws ecr get-login-password --region us-east-1 | \
docker login --username AWS --password-stdin 123456789.dkr.ecr.us-east-1.amazonaws.com
docker push 123456789.dkr.ecr.us-east-1.amazonaws.com/dagy-worker:latest
Custom Container Flow Example
from dagy import flow, task
@task
def heavy_computation() -> int:
    """Sum the squares of the integers from 0 to 9,999,999.

    Returns:
        The computed sum.
    """
    # This runs in ECS with configured CPU/memory
    total = 0
    for value in range(10_000_000):
        total += value * value
    return total
@task
def memory_intensive_task(size_gb: int) -> str:
    """Allocate a float32 array of ``size_gb`` gigabytes.

    Args:
        size_gb: Size to allocate, in units of 10**9 bytes.

    Returns:
        A message stating how much memory was requested.
    """
    # This runs in ECS with allocated memory
    import numpy as np

    # Derive the element count from the dtype size. The previous constant
    # (125_000_000 float32 elements per GB) is only 0.5 GB.
    bytes_per_gb = 10 ** 9
    element_count = size_gb * bytes_per_gb // np.dtype(np.float32).itemsize
    # Bound to a name so the buffer stays alive until the task returns.
    data = np.zeros((element_count,), dtype=np.float32)
    return f"Allocated {size_gb}GB of memory"
@flow
def resource_intensive_flow():
    """Run the CPU-heavy and memory-heavy example tasks.

    Returns:
        A dict with the ``computation`` and ``memory`` results.
    """
    outcome = {}
    outcome["computation"] = heavy_computation()
    outcome["memory"] = memory_intensive_task(2)  # 2GB
    return outcome
Best Practices & Gotchas
- Image size: Keep Docker images small by using multi-stage builds.
- Caching: Use Docker layer caching to speed up builds.
- Security: Don't include credentials in images, use IAM roles instead.
- Logging: Ensure logs are captured and sent to CloudWatch:
ENV PYTHONUNBUFFERED=1
CI/CD Systems (GitHub Actions)
Integrate Dagy with GitHub Actions to trigger flows from CI/CD pipelines.
Prerequisites
pip install dagy # Dagy CLI
Configuration
# Store Dagy credentials in GitHub Secrets
DAGY_API_TOKEN: "your-api-token"
DAGY_WORKSPACE: "your-workspace"
Triggering Dagy Flows from CI/CD
# .github/workflows/deploy.yml
name: Deploy and Run Dagy Flow
on:
push:
branches: [main]
jobs:
deploy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Install Dagy
run: pip install dagy
- name: Deploy Flow
run: |
dagy login --token ${{ secrets.DAGY_API_TOKEN }}
dagy deploy --workspace ${{ secrets.DAGY_WORKSPACE }}
- name: Run Flow
run: |
dagy run --flow deployment_flow --workspace ${{ secrets.DAGY_WORKSPACE }}
Using Dagy CLI in Pipelines
# .github/workflows/test.yml
name: Run Tests with Dagy
on: [pull_request]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Install Dagy
run: pip install dagy
- name: Run Integration Tests
run: |
dagy run --flow test_integration_flow \
--param input_file=test_data.json \
--workspace ${{ secrets.DAGY_WORKSPACE }}
- name: Check Results
run: |
dagy status --flow test_integration_flow \
--workspace ${{ secrets.DAGY_WORKSPACE }}
Webhook-Based Integration
# .github/workflows/webhook.yml
name: Webhook Trigger
on:
  workflow_dispatch:
    inputs:
      flow_name:
        description: 'Dagy flow to run'
        required: true
      parameters:
        description: 'Flow parameters (JSON)'
        required: false
        # Default keeps the request body valid JSON when nothing is entered.
        default: '{}'
jobs:
  trigger:
    runs-on: ubuntu-latest
    steps:
      - name: Trigger Dagy Flow
        # Inputs are passed through env vars rather than expanded inside the
        # script, so their content cannot be interpreted as shell commands.
        env:
          FLOW_NAME: ${{ github.event.inputs.flow_name }}
          FLOW_PARAMETERS: ${{ github.event.inputs.parameters }}
          DAGY_API_TOKEN: ${{ secrets.DAGY_API_TOKEN }}
        run: |
          curl --fail -X POST \
            "https://api.dagy.io/flows/${FLOW_NAME}/run" \
            -H "Authorization: Bearer ${DAGY_API_TOKEN}" \
            -H "Content-Type: application/json" \
            -d "{\"parameters\": ${FLOW_PARAMETERS}}"
CI/CD Deployment Flow Example
from dagy import flow, task
import subprocess
import os
@task
def checkout_code(repo_url: str, ref: str) -> str:
    """Clone a repository into a fresh directory and check out ``ref``.

    Args:
        repo_url: URL of the repository to clone.
        ref: Branch, tag or commit to check out.

    Returns:
        Path of the working copy.

    Raises:
        subprocess.CalledProcessError: If ``git clone`` or ``git checkout``
            fails (previously ignored, so later steps ran on a missing or
            stale checkout).
    """
    import tempfile

    # A unique directory per run: cloning into a fixed /tmp/repo fails as
    # soon as a previous run has left that directory behind.
    repo_path = tempfile.mkdtemp(prefix="dagy-repo-")
    # "--" stops a URL starting with "-" from being parsed as an option.
    subprocess.run(["git", "clone", "--", repo_url, repo_path], check=True)
    subprocess.run(["git", "-C", repo_path, "checkout", ref], check=True)
    return repo_path
@task
def run_tests(repo_path: str) -> dict:
    """Run ``pytest -v`` inside the repository.

    Args:
        repo_path: Directory in which pytest is executed.

    Returns:
        A dict with ``returncode``, ``stdout``, ``stderr`` and ``passed``
        (``True`` when the return code is 0).
    """
    completed = subprocess.run(
        ["pytest", "-v"],
        cwd=repo_path,
        capture_output=True,
        text=True,
    )
    exit_code = completed.returncode
    return {
        "returncode": exit_code,
        "stdout": completed.stdout,
        "stderr": completed.stderr,
        "passed": exit_code == 0,
    }
@task
def build_artifact(repo_path: str) -> str:
    """Build a wheel for the project into ``<repo_path>/dist``.

    Args:
        repo_path: Root directory of the project to build.

    Returns:
        Path of the directory containing the built wheel.

    Raises:
        subprocess.CalledProcessError: If the build fails.
    """
    # ``setup.py build`` writes to build/, not dist/, so the returned path
    # never existed. ``pip wheel`` puts the artifact where callers expect it.
    subprocess.run(
        ["python", "-m", "pip", "wheel", ".", "--no-deps", "--wheel-dir", "dist"],
        cwd=repo_path,
        capture_output=True,
        check=True,
    )
    return f"{repo_path}/dist"
@task
def deploy_artifact(artifact_path: str, environment: str) -> str:
    """Placeholder deployment step: log the target and report success.

    Args:
        artifact_path: Location of the artifact to deploy.
        environment: Name of the target environment.

    Returns:
        A confirmation string naming the environment.
    """
    # Deployment logic
    print(f"Deploying {artifact_path} to {environment}")
    confirmation = f"Deployed to {environment}"
    return confirmation
@task
def notify_deployment(
    status: str,
    commit_sha: str,
    environment: str
) -> str:
    """Post a deployment status message to the Slack webhook.

    The webhook URL is read from ``SLACK_WEBHOOK_URL``.

    Args:
        status: Deployment outcome shown in the message.
        commit_sha: Commit that was deployed.
        environment: Target environment.

    Returns:
        ``"Notification sent"`` once Slack has accepted the message.

    Raises:
        RuntimeError: If ``SLACK_WEBHOOK_URL`` is not set.
        requests.HTTPError: If the webhook responds with an error status.
    """
    import requests

    webhook_url = os.getenv('SLACK_WEBHOOK_URL')
    if not webhook_url:
        raise RuntimeError("SLACK_WEBHOOK_URL is not set")

    message = {
        "text": f"Deployment {status} for {commit_sha} to {environment}"
    }
    # requests has no default timeout; without one a stalled connection
    # would hang the task indefinitely.
    response = requests.post(webhook_url, json=message, timeout=10)
    # Do not report success when the webhook rejected the message.
    response.raise_for_status()
    return "Notification sent"
@flow
def ci_cd_deployment_flow(
    repo_url: str,
    commit_sha: str,
    environment: str = "staging"
):
    """Check out, test, build, deploy and announce a commit.

    If the tests fail, a ``FAILED`` notification is sent and the flow
    aborts before anything is built.

    Args:
        repo_url: Repository to deploy from.
        commit_sha: Commit to check out and deploy.
        environment: Target environment.

    Returns:
        A dict with the ``deployment`` result and the ``environment``.

    Raises:
        Exception: If the test suite does not pass.
    """
    # Checkout code
    working_copy = checkout_code(repo_url, commit_sha)

    # Run tests
    if not run_tests(working_copy)['passed']:
        notify_deployment("FAILED", commit_sha, environment)
        raise Exception("Tests failed")

    # Build, then deploy
    built = build_artifact(working_copy)
    outcome = deploy_artifact(built, environment)

    # Notify
    notify_deployment("SUCCESS", commit_sha, environment)
    return {"deployment": outcome, "environment": environment}
Best Practices & Gotchas
- Secrets: Always use GitHub Secrets for sensitive data.
- Timeouts: Set appropriate timeouts for long-running flows.
- Notifications: Implement proper notification on success/failure.
- Rollback: Plan rollback procedures for failed deployments.
Summary
This guide covers integrating Dagy with 12 major services and platforms. Key takeaways:
- Use connection pooling for databases and HTTP clients
- Store credentials in secrets, never hardcode
- Implement error handling and retries for resilience
- Choose the right backend: Lambda for short tasks, Step Functions for medium/parallel, ECS for long-running
- Test integrations locally before deploying to production
- Monitor and log all integrations for debugging
For more information, visit the Dagy documentation at https://dagy.io/docs.