Dagy SDK Cookbook
A comprehensive guide with 50+ practical examples for building workflows with the Dagy Python SDK. Each example is complete, runnable, and demonstrates real-world patterns.
Table of Contents
- Getting Started
- Data Engineering
- API & Web
- ML & AI
- File Processing
- Retry & Error Handling
- Scheduling
- Notifications & Monitoring
- Advanced Patterns
- Real-World Scenarios
Getting Started
1. Hello World - Simplest Flow
The most basic Dagy flow with a single task that returns a greeting.
from dagy import flow, task

@task
def greet(name: str) -> str:
    """Simple greeting task."""
    return f"Hello, {name}!"

@flow
def hello_world_flow(name: str = "World") -> str:
    """Simplest Dagy flow - single task."""
    return greet(name=name)

# Local execution
if __name__ == "__main__":
    result = hello_world_flow.run_local(name="Alice")
    print(result)  # Output: Hello, Alice!
Key Concepts:
- @task decorator wraps functions as executable tasks
- @flow decorator creates a workflow orchestrating tasks
- run_local() executes the flow locally for testing
- Return values pass between tasks and flows
2. Two-Task Chain - Passing Data Between Tasks
Chain two tasks where the first task's output becomes the second task's input.
from dagy import flow, task

@task
def fetch_user(user_id: int) -> dict:
    """Fetch user data from simulated database."""
    users = {
        1: {"id": 1, "name": "Alice", "email": "alice@example.com"},
        2: {"id": 2, "name": "Bob", "email": "bob@example.com"},
    }
    return users.get(user_id, {})

@task
def format_user_info(user: dict) -> str:
    """Format user data as a readable string."""
    if not user:
        return "User not found"
    return f"User: {user['name']} ({user['email']})"

@flow
def user_info_flow(user_id: int) -> str:
    """Chain two tasks: fetch user, then format info."""
    user_data = fetch_user(user_id=user_id)
    formatted = format_user_info(user=user_data)
    return formatted

# Local execution
if __name__ == "__main__":
    result = user_info_flow.run_local(user_id=1)
    print(result)  # Output: User: Alice (alice@example.com)
Key Concepts:
- Task outputs automatically become inputs to subsequent tasks
- Data flows naturally through the pipeline
- Each task is independent and reusable
3. Multi-Step Pipeline - 3+ Tasks with Data Flow
Build a more complex pipeline with multiple sequential tasks.
from dagy import flow, task
import json
from datetime import datetime

@task
def extract_data(source: str) -> list:
    """Extract raw data from source."""
    # Simulated data extraction
    return [
        {"id": 1, "value": 100, "timestamp": "2024-01-01"},
        {"id": 2, "value": 200, "timestamp": "2024-01-02"},
        {"id": 3, "value": 150, "timestamp": "2024-01-03"},
    ]

@task
def transform_data(raw_data: list) -> list:
    """Transform and clean data."""
    transformed = []
    for record in raw_data:
        transformed.append({
            "id": record["id"],
            "value": float(record["value"]),
            "date": record["timestamp"],
            "processed_at": datetime.now().isoformat(),
        })
    return transformed

@task
def validate_data(data: list) -> dict:
    """Validate transformed data and return stats."""
    valid_count = len(data)
    total_value = sum(record["value"] for record in data)
    return {
        "valid_records": valid_count,
        "total_value": total_value,
        "average": total_value / valid_count if valid_count > 0 else 0,
    }

@task
def store_results(stats: dict) -> str:
    """Store validation results."""
    result_json = json.dumps(stats, indent=2)
    # In production: store to database or file
    return result_json

@flow
def data_pipeline(source: str = "database") -> str:
    """Multi-step ETL pipeline."""
    raw = extract_data(source=source)
    transformed = transform_data(raw_data=raw)
    stats = validate_data(data=transformed)
    results = store_results(stats=stats)
    return results

# Local execution
if __name__ == "__main__":
    result = data_pipeline.run_local(source="production_db")
    print(result)
Key Concepts:
- Multi-step sequential processing
- Data transformation at each stage
- Aggregation and validation steps
4. Parameterized Flow - Accepting Runtime Parameters
Create flexible flows that accept various parameters at runtime.
from dagy import flow, task
import json
from enum import Enum

class DataFormat(str, Enum):
    """Supported data formats."""
    JSON = "json"
    CSV = "csv"
    XML = "xml"

@task
def load_data(source: str) -> list:
    """Load data from source."""
    # Simulated data; `source` is not used in this example
    return [
        {"id": 1, "name": "Item A", "price": 29.99},
        {"id": 2, "name": "Item B", "price": 49.99},
        {"id": 3, "name": "Item C", "price": 19.99},
    ]

@task
def format_output(data: list, format: DataFormat, include_total: bool = True) -> str:
    """Format data according to specified format."""
    if format == DataFormat.JSON:
        output = {"items": data}
        if include_total:
            output["total"] = sum(item["price"] for item in data)
        return json.dumps(output)
    elif format == DataFormat.CSV:
        lines = ["id,name,price"]
        for item in data:
            lines.append(f"{item['id']},{item['name']},{item['price']}")
        if include_total:
            total = sum(item["price"] for item in data)
            # Leave the name column empty so the total lines up under "price"
            lines.append(f"TOTAL,,{total:.2f}")
        return "\n".join(lines)
    # XML is declared in DataFormat but not implemented here; fall back to str()
    return str(data)

@flow
def parameterized_flow(
    source: str = "default",
    output_format: DataFormat = DataFormat.JSON,
    include_total: bool = True,
) -> str:
    """Flow accepting multiple runtime parameters."""
    data = load_data(source=source)
    formatted = format_output(
        data=data,
        format=output_format,
        include_total=include_total,
    )
    return formatted

# Local execution with different parameters
if __name__ == "__main__":
    # JSON format with totals
    result1 = parameterized_flow.run_local(
        source="api",
        output_format=DataFormat.JSON,
        include_total=True,
    )
    print("JSON Output:")
    print(result1)
    # CSV format without totals
    result2 = parameterized_flow.run_local(
        source="database",
        output_format=DataFormat.CSV,
        include_total=False,
    )
    print("\nCSV Output:")
    print(result2)
Key Concepts:
- Type-annotated parameters for runtime configuration
- Enum types for valid option constraints
- Flexible flows supporting multiple use cases
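DataFormat declares an XML option, but format_output above falls back to str(data) for it. The following is a minimal sketch of what an XML branch could produce, using only the standard library's xml.etree.ElementTree. The function name format_as_xml and the <items>/<item>/<total> element names are illustrative assumptions, not part of the Dagy SDK.

```python
import xml.etree.ElementTree as ET


def format_as_xml(data: list, include_total: bool = True) -> str:
    """Serialize a list of item dicts as an <items> XML document (hypothetical helper)."""
    root = ET.Element("items")
    for item in data:
        # One <item> element per record; every field becomes a child element
        elem = ET.SubElement(root, "item")
        for key, value in item.items():
            ET.SubElement(elem, key).text = str(value)
    if include_total:
        total = sum(item["price"] for item in data)
        ET.SubElement(root, "total").text = f"{total:.2f}"
    # encoding="unicode" makes tostring return str instead of bytes
    return ET.tostring(root, encoding="unicode")


if __name__ == "__main__":
    items = [
        {"id": 1, "name": "Item A", "price": 29.99},
        {"id": 2, "name": "Item B", "price": 49.99},
    ]
    print(format_as_xml(items))
```

Inside format_output, this could be wired in as an `elif format == DataFormat.XML:` branch that returns `format_as_xml(data, include_total)`.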
5. Local Development - Using run_local() for Testing
Best practices for testing flows locally before deployment.
from dagy import flow, task
import logging

# Configure logging for debugging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@task
def step_one(value: int) -> int:
    """First processing step."""
    logger.info(f"Step 1: Processing value {value}")
    result = value * 2
    logger.info(f"Step 1: Result {result}")
    return result

@task
def step_two(value: int) -> int:
    """Second processing step."""
    logger.info(f"Step 2: Processing value {value}")
    result = value + 10
    logger.info(f"Step 2: Result {result}")
    return result

@task
def step_three(value: int) -> dict:
    """Final processing step."""
    logger.info(f"Step 3: Processing value {value}")
    result = {
        "final_value": value,
        "is_even": value % 2 == 0,
        "square": value ** 2,
    }
    logger.info(f"Step 3: Result {result}")
    return result

@flow
def local_dev_flow(initial_value: int) -> dict:
    """Flow designed for local testing and debugging."""
    val1 = step_one(value=initial_value)
    val2 = step_two(value=val1)
    result = step_three(value=val2)
    return result

# Local testing with different parameters
if __name__ == "__main__":
    print("=== Test 1: Basic execution ===")
    result1 = local_dev_flow.run_local(initial_value=5)
    print(f"Result: {result1}\n")
    print("=== Test 2: Different input ===")
    result2 = local_dev_flow.run_local(initial_value=10)
    print(f"Result: {result2}\n")
    print("=== Test 3: Edge case - zero ===")
    result3 = local_dev_flow.run_local(initial_value=0)
    print(f"Result: {result3}\n")
    # fail_fast=True stops the run at the first failing task
    print("=== Test 4: With fail_fast ===")
    result4 = local_dev_flow.run_local(initial_value=3, fail_fast=True)
    print(f"Result: {result4}\n")
    # max_workers caps parallel task execution; this flow is a linear chain,
    # so each step still waits for the previous one
    print("=== Test 5: With max_workers ===")
    result5 = local_dev_flow.run_local(initial_value=7, max_workers=2)
    print(f"Result: {result5}")
Key Concepts:
- fail_fast=True stops the pipeline on the first task failure
- max_workers controls parallelism in local testing (it only helps when tasks are independent; the three steps here form a chain)
- Logging for debugging task execution flow
- Testing multiple scenarios before deployment
Data Engineering
6. CSV to JSON Converter
Transform CSV data to JSON format with schema mapping.
from dagy import flow, task
import csv
import json
import io

@task
def read_csv(csv_content: str) -> list:
    """Parse CSV content into list of dicts."""
    reader = csv.DictReader(io.StringIO(csv_content))
    return list(reader)

@task
def transform_to_json(records: list, pretty: bool = True) -> str:
    """Convert records to JSON format."""
    data = {
        "count": len(records),
        "records": records,
        "metadata": {
            "source": "csv_converter",
            "record_count": len(records),
        },
    }
    if pretty:
        return json.dumps(data, indent=2)
    return json.dumps(data)

@task
def validate_json(json_str: str) -> dict:
    """Validate and return JSON structure info."""
    data = json.loads(json_str)
    return {
        "is_valid": True,
        "record_count": data.get("count", 0),
        "has_metadata": "metadata" in data,
    }

@flow
def csv_to_json_flow(csv_content: str, pretty: bool = True) -> dict:
    """Convert CSV to JSON with validation."""
    records = read_csv(csv_content=csv_content)
    json_output = transform_to_json(records=records, pretty=pretty)
    validation = validate_json(json_str=json_output)
    return {
        "json": json_output,
        "validation": validation,
    }

# Local execution
if __name__ == "__main__":
    sample_csv = """name,email,age
Alice,alice@example.com,28
Bob,bob@example.com,35
Charlie,charlie@example.com,32"""
    result = csv_to_json_flow.run_local(csv_content=sample_csv, pretty=True)
    print(result["validation"])
    print(result["json"])
Key Concepts:
- CSV parsing with DictReader
- Schema-aware transformation
- Output validation
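csv.DictReader returns every value as a string, so "age" arrives as "28" rather than 28, and the converter above passes column names through unchanged. A minimal sketch of an explicit schema mapping follows. The SCHEMA dictionary, its target field names, and the apply_schema helper are hypothetical choices for illustration.

```python
import csv
import io

# Hypothetical schema: source column -> (target field name, converter)
SCHEMA = {
    "name": ("full_name", str.strip),
    "email": ("email", lambda v: v.strip().lower()),
    "age": ("age", int),
}


def apply_schema(csv_content: str, schema: dict) -> list:
    """Parse CSV text and map each row onto the target schema."""
    records = []
    for row in csv.DictReader(io.StringIO(csv_content)):
        mapped = {}
        for source_col, (target, convert) in schema.items():
            # DictReader yields strings; convert each value to its target type
            mapped[target] = convert(row[source_col])
        records.append(mapped)
    return records


if __name__ == "__main__":
    sample = "name,email,age\nAlice,Alice@Example.com,28\nBob,bob@example.com,35"
    for record in apply_schema(sample, SCHEMA):
        print(record)
```

A column missing from the input raises KeyError, and a non-numeric age raises ValueError. That makes schema drift fail loudly instead of silently producing partial records.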
7. S3 File Processor
Download files from S3, process them, and upload results.
from dagy import flow, task
import json

@task
def download_from_s3(bucket: str, key: str) -> str:
    """Download file from S3 bucket."""
    # Simulated S3 download
    print(f"Downloading s3://{bucket}/{key}")
    # In production: use boto3
    return f"Content of {key}"

@task
def process_file(content: str) -> dict:
    """Process downloaded file content."""
    lines = content.split("\n")
    return {
        "line_count": len(lines),
        "content_length": len(content),
        "processed": True,
    }

@task
def upload_to_s3(bucket: str, key: str, data: dict) -> str:
    """Upload processed results to S3."""
    json_content = json.dumps(data)
    output_key = f"processed/{key}.json"
    print(f"Uploading to s3://{bucket}/{output_key}")
    # In production: upload json_content with boto3
    return output_key

@flow
def s3_processor_flow(
    input_bucket: str,
    input_key: str,
    output_bucket: str,
) -> str:
    """Download, process, and upload S3 files."""
    content = download_from_s3(bucket=input_bucket, key=input_key)
    processed = process_file(content=content)
    output_key = upload_to_s3(
        bucket=output_bucket,
        key=input_key,
        data=processed,
    )
    return output_key

# Local execution
if __name__ == "__main__":
    output_bucket = "processed-data"
    result = s3_processor_flow.run_local(
        input_bucket="raw-data",
        input_key="file.txt",
        output_bucket=output_bucket,
    )
    # The flow returns only the object key, so prepend the bucket name
    print(f"Output location: s3://{output_bucket}/{result}")
Key Concepts:
- Cloud storage integration patterns
- Content transformation
- File output tracking
8. Database ETL Pipeline
Extract from database, transform, and load to target system.
from dagy import flow, task
from typing import List
from datetime import datetime

@task
def extract_from_db(query: str, limit: int = 1000) -> List[dict]:
    """Extract data from source database."""
    # Simulated database query
    print(f"Executing query with limit {limit}")
    return [
        {"id": i, "value": i * 10, "created": "2024-01-01"}
        for i in range(1, min(101, limit + 1))
    ]

@task
def transform_records(records: List[dict]) -> List[dict]:
    """Apply transformations to extracted records."""
    transformed = []
    for record in records:
        transformed.append({
            **record,
            "value_usd": record["value"] * 1.1,
            "processed_at": datetime.now().isoformat(),
            "status": "processed",
        })
    return transformed

@task
def load_to_warehouse(records: List[dict], table: str) -> dict:
    """Load transformed data to data warehouse."""
    # Simulated warehouse load
    print(f"Loading {len(records)} records to {table}")
    return {
        "table": table,
        "rows_loaded": len(records),
        "timestamp": datetime.now().isoformat(),
        "status": "success",
    }

@flow
def etl_pipeline(
    source_query: str,
    target_table: str,
    record_limit: int = 1000,
) -> dict:
    """Complete ETL pipeline: Extract → Transform → Load."""
    records = extract_from_db(query=source_query, limit=record_limit)
    transformed = transform_records(records=records)
    result = load_to_warehouse(records=transformed, table=target_table)
    return result

# Local execution
if __name__ == "__main__":
    result = etl_pipeline.run_local(
        source_query="SELECT * FROM raw_data",
        target_table="warehouse.processed_data",
        record_limit=100,
    )
    print(result)
Key Concepts:
- Extract phase with query execution
- Transform phase with record mapping
- Load phase with batch insertion
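The load_to_warehouse task only simulates the write. As a rough illustration of batch insertion, the sketch below uses the standard library's sqlite3 as a stand-in for a warehouse. The processed_data table, its columns, the default batch size of 500, and both helper names are assumptions for this example.

```python
import sqlite3
from typing import Iterator, List


def chunked(records: List[dict], size: int) -> Iterator[List[dict]]:
    """Yield successive slices of at most `size` records."""
    for start in range(0, len(records), size):
        yield records[start:start + size]


def load_in_batches(conn: sqlite3.Connection, records: List[dict], batch_size: int = 500) -> int:
    """Insert records batch by batch, committing once per batch."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS processed_data "
        "(id INTEGER PRIMARY KEY, value REAL, value_usd REAL, status TEXT)"
    )
    loaded = 0
    for batch in chunked(records, batch_size):
        rows = [(r["id"], r["value"], r["value_usd"], r["status"]) for r in batch]
        # `with conn` commits the batch on success and rolls it back on error
        with conn:
            conn.executemany(
                "INSERT INTO processed_data (id, value, value_usd, status) VALUES (?, ?, ?, ?)",
                rows,
            )
        loaded += len(rows)
    return loaded


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    sample = [
        {"id": i, "value": i * 10, "value_usd": i * 11.0, "status": "processed"}
        for i in range(1, 101)
    ]
    print(load_in_batches(conn, sample, batch_size=30))
```

Committing per batch bounds how much work is lost if a batch fails. The trade-off is that a failure midway leaves earlier batches committed, so a rerun needs idempotent inserts or a cleanup step.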
9. Data Validation Pipeline
Validate data structure and report on quality issues.
from dagy import flow, task
from typing import List

@task
def load_data_batch(batch_id: str) -> List[dict]:
    """Load batch of data for validation."""
    # Simulated data load
    return [
        {"id": 1, "email": "alice@example.com", "age": 28},
        {"id": 2, "email": "bob@example.com", "age": 35},
        {"id": 3, "email": None, "age": 30},  # Invalid: missing email
        {"id": 4, "email": "invalid-email", "age": -5},  # Invalid: bad email and age
    ]

@task
def validate_schema(records: List[dict]) -> tuple:
    """Validate record schema and types."""
    required_fields = {"id", "email", "age"}
    errors = []
    valid_records = []
    for i, record in enumerate(records):
        record_errors = []
        # Check required fields (a None value counts as missing)
        missing = sorted(f for f in required_fields if record.get(f) is None)
        if missing:
            record_errors.append(f"Missing required field(s): {', '.join(missing)}")
        # Validate email
        email = record.get("email")
        if email is not None and "@" not in str(email):
            record_errors.append("Invalid email format")
        # Validate age
        age = record.get("age")
        if age is not None and (not isinstance(age, (int, float)) or age < 0 or age > 150):
            record_errors.append("Invalid age value")
        if record_errors:
            errors.append({"row": i, "record": record, "errors": record_errors})
        else:
            valid_records.append(record)
    return valid_records, errors

@task
def generate_report(valid_records: List[dict], errors: List[dict]) -> dict:
    """Generate validation report."""
    total = len(valid_records) + len(errors)
    return {
        "total_records": total,
        "valid_records": len(valid_records),
        "invalid_records": len(errors),
        "validity_percentage": (len(valid_records) / total * 100) if total > 0 else 0,
        "errors": errors,
        "valid_sample": valid_records[:5],
    }

@flow
def validation_pipeline(batch_id: str) -> dict:
    """Validate data quality and generate report."""
    records = load_data_batch(batch_id=batch_id)
    valid, errors = validate_schema(records=records)
    report = generate_report(valid_records=valid, errors=errors)
    return report

# Local execution
if __name__ == "__main__":
    result = validation_pipeline.run_local(batch_id="batch_001")
    print(f"Valid records: {result['valid_records']}")
    print(f"Invalid records: {result['invalid_records']}")
    print(f"Validity: {result['validity_percentage']:.1f}%")
    if result['errors']:
        print(f"First error: {result['errors'][0]}")
Key Concepts:
- Schema validation
- Field-level validation rules
- Error collection and reporting
- Quality metrics computation
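As the number of checks grows, hard-coded if statements become hard to review. One common alternative is a declarative rule table. The sketch below keeps each field's rules as (message, predicate) pairs. The RULES contents, the deliberately simple email regex, and the check_record name are illustrative assumptions, and the regex is not a full RFC 5322 validator.

```python
import re
from typing import Callable, Dict, List, Tuple

EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")

# Hypothetical rule table: field -> list of (error message, predicate that must hold)
RULES: Dict[str, List[Tuple[str, Callable[[object], bool]]]] = {
    "id": [("id is required", lambda v: v is not None)],
    "email": [
        ("email is required", lambda v: v is not None),
        ("invalid email format", lambda v: v is None or bool(EMAIL_RE.match(str(v)))),
    ],
    "age": [
        ("age must be a number between 0 and 150",
         lambda v: v is None or (isinstance(v, (int, float)) and 0 <= v <= 150)),
    ],
}


def check_record(record: dict, rules=RULES) -> List[str]:
    """Return every rule violation for one record (an empty list means valid)."""
    errors = []
    for field, field_rules in rules.items():
        value = record.get(field)
        for message, predicate in field_rules:
            if not predicate(value):
                errors.append(f"{field}: {message}")
    return errors
```

validate_schema could then call check_record for each row. Adding a rule becomes a one-line change to RULES rather than a new branch in the loop.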
10. Incremental Data Load
Process only new or updated records since the last run.
from dagy import flow, task
from datetime import datetime, timedelta

@task
def get_last_checkpoint(checkpoint_file: str) -> datetime:
    """Get timestamp of last successful run."""
    # Simulated checkpoint retrieval
    # In production: read from database or file
    return datetime.now() - timedelta(days=1)

@task
def fetch_updated_records(since: datetime) -> list:
    """Fetch only records updated since checkpoint."""
    # Simulated source rows; in production, filter in the query (e.g. WHERE updated > :since)
    now = datetime.now()
    records = [
        {"id": i, "updated": now - timedelta(hours=h)}
        for i, h in enumerate([0, 2, 4, 6, 24, 48])
    ]
    return [r for r in records if r["updated"] > since]

@task
def process_incremental(records: list) -> dict:
    """Process the incremental batch."""
    return {
        "batch_size": len(records),
        "processed_ids": [r["id"] for r in records],
        "timestamp": datetime.now().isoformat(),
    }

@task
def update_checkpoint(checkpoint_file: str, timestamp: datetime) -> str:
    """Update checkpoint with current timestamp."""
    # In production: write to database or file
    return f"Checkpoint updated to {timestamp}"

@flow
def incremental_load_flow(checkpoint_file: str) -> dict:
    """Load and process only new/updated records."""
    # Capture the new watermark before fetching, so rows updated during this run
    # are picked up by the next run instead of being skipped
    run_started = datetime.now()
    last_run = get_last_checkpoint(checkpoint_file=checkpoint_file)
    new_records = fetch_updated_records(since=last_run)
    result = process_incremental(records=new_records)
    checkpoint_msg = update_checkpoint(
        checkpoint_file=checkpoint_file,
        timestamp=run_started,
    )
    return {"result": result, "checkpoint": checkpoint_msg}

# Local execution
if __name__ == "__main__":
    result = incremental_load_flow.run_local(
        checkpoint_file="/var/checkpoints/last_run.txt"
    )
    print(result)
Key Concepts:
- Checkpoint/watermark pattern
- Timestamp-based filtering
- Incremental processing efficiency
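The checkpoint tasks above are stubs ("In production: read from database or file"). Below is a minimal sketch of the file-based variant. It assumes a JSON file holding an ISO-8601 "last_run" value, and both the file layout and the helper names are hypothetical. Writing to a temporary file and then calling os.replace keeps a crash mid-write from leaving a truncated checkpoint; on POSIX the rename is atomic.

```python
import json
import os
import tempfile
from datetime import datetime
from typing import Optional


def read_checkpoint(path: str) -> Optional[datetime]:
    """Return the stored watermark, or None on the very first run."""
    if not os.path.exists(path):
        return None
    with open(path, encoding="utf-8") as f:
        return datetime.fromisoformat(json.load(f)["last_run"])


def write_checkpoint(path: str, timestamp: datetime) -> None:
    """Persist the watermark via a temp file in the same directory, then rename."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump({"last_run": timestamp.isoformat()}, f)
        # os.replace overwrites the target in one step
        os.replace(tmp_path, path)
    except Exception:
        # Don't leave a stray temp file behind if the write fails
        os.unlink(tmp_path)
        raise
```

A None result from read_checkpoint is the natural signal to fall back to a full load on the first run.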
11. Data Deduplication
Detect and remove duplicate records from a dataset.
from dagy import flow, task
from typing import List

@task
def load_raw_data() -> List[dict]:
    """Load data that may contain duplicates."""
    return [
        {"id": 1, "email": "alice@example.com", "name": "Alice"},
        {"id": 2, "email": "bob@example.com", "name": "Bob"},
        {"id": 1, "email": "alice@example.com", "name": "Alice"},  # Duplicate
        {"id": 3, "email": "charlie@example.com", "name": "Charlie"},
        {"id": 2, "email": "bob@example.com", "name": "Bob"},  # Duplicate
        {"id": 4, "email": "dave@example.com", "name": "Dave"},
    ]

@task
def deduplicate_exact(records: List[dict]) -> tuple:
    """Remove exact duplicate records."""
    seen = set()
    unique = []
    duplicates = []
    for record in records:
        # Hashable key built from every field, so only fully identical records match
        key = tuple(sorted(record.items()))
        if key in seen:
            duplicates.append(record)
        else:
            seen.add(key)
            unique.append(record)
    return unique, duplicates

@task
def generate_dedup_report(unique: List[dict], duplicates: List[dict]) -> dict:
    """Report on deduplication results."""
    original = len(unique) + len(duplicates)
    return {
        "original_count": original,
        "unique_count": len(unique),
        "duplicate_count": len(duplicates),
        "dedup_percentage": (len(duplicates) / original * 100) if original > 0 else 0,
    }

@flow
def deduplication_flow() -> dict:
    """Identify and remove duplicate records."""
    data = load_raw_data()
    unique, duplicates = deduplicate_exact(records=data)
    report = generate_dedup_report(unique=unique, duplicates=duplicates)
    return {
        "report": report,
        "unique_records": unique,
    }

# Local execution
if __name__ == "__main__":
    result = deduplication_flow.run_local()
    print(f"Report: {result['report']}")
    print(f"Unique records: {len(result['unique_records'])}")
Key Concepts:
- Set-based duplicate detection
- Hashable key generation
- Deduplication reporting
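Exact matching misses duplicates that differ only in formatting, such as "Alice@Example.com" versus " alice@example.com ". A common refinement is to deduplicate on a normalized business key. The sketch below assumes email is that key; dedupe_by_key and email_key are hypothetical names.

```python
from typing import Callable, List, Tuple


def dedupe_by_key(records: List[dict], key: Callable[[dict], object]) -> Tuple[List[dict], List[dict]]:
    """Keep the first record seen for each key; return (unique, duplicates)."""
    seen = set()
    unique, duplicates = [], []
    for record in records:
        k = key(record)
        if k in seen:
            duplicates.append(record)
        else:
            seen.add(k)
            unique.append(record)
    return unique, duplicates


def email_key(record: dict) -> str:
    """Normalize the email so case and surrounding whitespace don't hide duplicates."""
    return record["email"].strip().lower()
```

"Keep the first" is only one survivorship policy. Keeping the most recently updated record is equally common and would need a sort before deduplicating.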
12. Aggregation Pipeline
Group data and compute aggregate statistics.
from dagy import flow, task
from typing import List
from collections import defaultdict

@task
def fetch_sales_data() -> List[dict]:
    """Fetch raw sales transactions."""
    return [
        {"date": "2024-01-01", "product": "Widget A", "quantity": 5, "price": 10.00},
        {"date": "2024-01-01", "product": "Widget B", "quantity": 3, "price": 15.00},
        {"date": "2024-01-02", "product": "Widget A", "quantity": 8, "price": 10.00},
        {"date": "2024-01-02", "product": "Widget C", "quantity": 2, "price": 25.00},
        {"date": "2024-01-03", "product": "Widget B", "quantity": 6, "price": 15.00},
    ]

@task
def aggregate_by_product(transactions: List[dict]) -> dict:
    """Aggregate sales by product."""
    aggregates = defaultdict(lambda: {"total_qty": 0, "total_revenue": 0, "count": 0})
    for txn in transactions:
        product = txn["product"]
        aggregates[product]["total_qty"] += txn["quantity"]
        aggregates[product]["total_revenue"] += txn["quantity"] * txn["price"]
        aggregates[product]["count"] += 1
    # Convert to regular dict and compute averages
    result = {}
    for product, agg in aggregates.items():
        result[product] = {
            **agg,
            "avg_qty_per_sale": agg["total_qty"] / agg["count"],
            "avg_price_per_unit": agg["total_revenue"] / agg["total_qty"],
        }
    return result

@task
def aggregate_by_date(transactions: List[dict]) -> dict:
    """Aggregate sales by date."""
    by_date = defaultdict(lambda: {"count": 0, "total": 0.0})
    for txn in transactions:
        date = txn["date"]
        revenue = txn["quantity"] * txn["price"]
        by_date[date]["count"] += 1
        by_date[date]["total"] += revenue
    return dict(by_date)

@flow
def aggregation_flow() -> dict:
    """Aggregate transaction data multiple ways."""
    transactions = fetch_sales_data()
    by_product = aggregate_by_product(transactions=transactions)
    by_date = aggregate_by_date(transactions=transactions)
    return {
        "by_product": by_product,
        "by_date": by_date,
        "total_transactions": len(transactions),
    }

# Local execution
if __name__ == "__main__":
    result = aggregation_flow.run_local()
    print("By Product:")
    for product, stats in result["by_product"].items():
        print(f"  {product}: {stats['total_qty']} units, ${stats['total_revenue']:.2f}")
    print("\nBy Date:")
    for date, stats in result["by_date"].items():
        print(f"  {date}: {stats['count']} sales, ${stats['total']:.2f}")
Key Concepts:
- GroupBy aggregation pattern
- Multiple aggregation dimensions
- Computed metrics (averages, totals)
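The aggregation above accumulates prices as floats. That works for this sample data, but binary floating point cannot represent most decimal fractions exactly (for example, 0.1 + 0.2 != 0.3 in Python), so large money totals can drift. A minimal sketch using decimal.Decimal follows. The function name and the round-half-up policy to cents are assumptions for illustration.

```python
from collections import defaultdict
from decimal import Decimal, ROUND_HALF_UP


def revenue_by_product(transactions: list) -> dict:
    """Sum revenue per product with exact decimal arithmetic, rounded to cents."""
    totals = defaultdict(Decimal)  # Decimal() starts each product at Decimal("0")
    for txn in transactions:
        # Decimal(str(x)) keeps the price as written instead of its binary float approximation
        totals[txn["product"]] += txn["quantity"] * Decimal(str(txn["price"]))
    return {
        product: total.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
        for product, total in totals.items()
    }
```

Ideally prices would be read as strings or Decimal at the source. Converting through str() only recovers the value the float was printed as.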
13. Multi-Source Data Merge
Combine data from multiple sources with deduplication and conflict resolution.
from dagy import flow, task
from typing import List

@task
def fetch_source_a() -> List[dict]:
    """Fetch data from first source."""
    return [
        {"id": 1, "name": "Alice", "email": "alice@example.com", "source": "A"},
        {"id": 2, "name": "Bob", "email": "bob@example.com", "source": "A"},
    ]

@task
def fetch_source_b() -> List[dict]:
    """Fetch data from second source."""
    return [
        {"id": 2, "name": "Robert", "email": "bob@example.com", "source": "B"},
        {"id": 3, "name": "Charlie", "email": "charlie@example.com", "source": "B"},
    ]

@task
def merge_sources(records_a: List[dict], records_b: List[dict]) -> dict:
    """Merge records from multiple sources."""
    merged = {}
    conflicts = []
    # Process source A (every merged record gets a "sources" list)
    for record in records_a:
        merged[record["id"]] = {**record, "sources": ["A"]}
    # Process source B
    for record in records_b:
        record_id = record["id"]
        if record_id in merged:
            existing = merged[record_id]
            # Detect conflicts in name
            if existing.get("name") != record.get("name"):
                conflicts.append({
                    "id": record_id,
                    "field": "name",
                    "source_a": existing.get("name"),
                    "source_b": record.get("name"),
                })
                # Keep longer name (assumed more complete)
                if len(record.get("name", "")) > len(existing.get("name", "")):
                    existing["name"] = record["name"]
            existing["sources"].append("B")
        else:
            merged[record_id] = {**record, "sources": ["B"]}
    return {
        "merged_records": list(merged.values()),
        "record_count": len(merged),
        "conflicts": conflicts,
    }

@flow
def multi_source_merge_flow() -> dict:
    """Merge data from multiple sources with conflict detection."""
    src_a = fetch_source_a()
    src_b = fetch_source_b()
    result = merge_sources(records_a=src_a, records_b=src_b)
    return result

# Local execution
if __name__ == "__main__":
    result = multi_source_merge_flow.run_local()
    print(f"Total records: {result['record_count']}")
    print(f"Conflicts: {len(result['conflicts'])}")
    for record in result["merged_records"]:
        print(f"  ID {record['id']}: {record['name']} (from {record['sources']})")
Key Concepts:
- Multi-source data integration
- Conflict detection and resolution
- Deduplication during merge
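"Keep the longer name" is a single hard-coded rule. A more general pattern is source precedence: rank the sources, and for each field take the value from the highest-ranked source that actually provides one. The sketch below is a hypothetical generalization, not part of the flow above. It treats None and "" as "no value" and skips the per-record "source" tag.

```python
from typing import Dict, List


def merge_with_precedence(records_by_source: Dict[str, List[dict]], priority: List[str]) -> Dict[int, dict]:
    """Merge records by id; for each field, the highest-priority non-empty value wins.

    `priority` lists source names from highest to lowest precedence.
    """
    merged: Dict[int, dict] = {}
    # Walk sources from lowest to highest priority so later writes override earlier ones
    for source in reversed(priority):
        for record in records_by_source.get(source, []):
            target = merged.setdefault(record["id"], {"id": record["id"], "sources": []})
            target["sources"].append(source)
            for field, value in record.items():
                if field in ("id", "source") or value in (None, ""):
                    continue
                target[field] = value
    return merged
```

Unlike the length heuristic, this rule is explicit and auditable: `priority=["B", "A"]` means "trust B's values, fill gaps from A".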
API & Web
14. REST API Poller
Poll a REST endpoint repeatedly and store the results.
from dagy import flow, task
import time
from datetime import datetime

@task
def poll_api_endpoint(
    url: str,
    poll_count: int = 3,
    interval_seconds: int = 1,
) -> list:
    """Poll API endpoint multiple times."""
    results = []
    for i in range(poll_count):
        # Simulated API call
        timestamp = datetime.now().isoformat()
        data = {
            "poll_number": i + 1,
            "timestamp": timestamp,
            "status": "success",
            "data": {"counter": i + 1, "value": (i + 1) * 100},
        }
        results.append(data)
        print(f"Poll {i + 1}/{poll_count}: {data}")
        if i < poll_count - 1:
            time.sleep(interval_seconds)
    return results

@task
def aggregate_poll_results(poll_results: list) -> dict:
    """Aggregate results from multiple polls."""
    successful = sum(1 for r in poll_results if r["status"] == "success")
    values = [r["data"]["value"] for r in poll_results]
    return {
        "total_polls": len(poll_results),
        "successful_polls": successful,
        "values": values,
        "average_value": sum(values) / len(values) if values else 0,
        "max_value": max(values) if values else None,
        "min_value": min(values) if values else None,
    }

@task
def store_poll_results(results: dict, destination: str) -> str:
    """Store aggregated poll results."""
    # Simulated storage
    print(f"Storing {len(results['values'])} results to {destination}")
    return f"Stored at {destination}"

@flow
def api_poller_flow(
    api_url: str,
    poll_count: int = 5,
    interval_seconds: int = 2,
) -> dict:
    """Poll API endpoint and store aggregated results."""
    poll_data = poll_api_endpoint(
        url=api_url,
        poll_count=poll_count,
        interval_seconds=interval_seconds,
    )
    aggregates = aggregate_poll_results(poll_results=poll_data)
    storage_result = store_poll_results(
        results=aggregates,
        destination="/data/polls",
    )
    return {
        "aggregates": aggregates,
        "storage": storage_result,
    }

# Local execution
if __name__ == "__main__":
    result = api_poller_flow.run_local(
        api_url="https://api.example.com/data",
        poll_count=3,
        interval_seconds=1,
    )
    print(f"Average: {result['aggregates']['average_value']}")
Key Concepts:
- Repeated API polling with intervals
- Result aggregation from multiple calls
- Time-series data handling
15. Webhook-Triggered Pipeline
Process incoming webhook payloads through a pipeline.
from dagy import flow, task
from datetime import datetime
from typing import Dict
import hashlib
import hmac
import json

def canonical_json(payload: Dict) -> str:
    """Serialize the payload deterministically so sender and receiver sign identical bytes."""
    return json.dumps(payload, sort_keys=True, separators=(",", ":"))

@task
def verify_webhook_signature(
    payload: Dict,
    signature: str,
    secret: str,
) -> bool:
    """Verify webhook signature for authenticity."""
    # Compute HMAC-SHA256 over the canonical JSON form (str(dict) is not a stable wire format)
    expected_sig = hmac.new(
        secret.encode(),
        canonical_json(payload).encode(),
        hashlib.sha256,
    ).hexdigest()
    # Constant-time comparison avoids leaking how many characters matched
    return hmac.compare_digest(expected_sig, signature)

@task
def parse_webhook_data(payload: Dict) -> Dict:
    """Parse and validate webhook payload."""
    required_fields = {"event_type", "timestamp", "data"}
    if not all(f in payload for f in required_fields):
        raise ValueError("Missing required webhook fields")
    return {
        "event_type": payload["event_type"],
        "timestamp": payload["timestamp"],
        "data": payload["data"],
        "received_at": datetime.now().isoformat(),
    }

@task
def process_webhook_event(parsed_data: Dict) -> dict:
    """Process the parsed webhook event."""
    event_type = parsed_data["event_type"]
    handlers = {
        "user.created": lambda d: {"action": "create_profile", "user_id": d["data"].get("user_id")},
        "user.deleted": lambda d: {"action": "deactivate_profile", "user_id": d["data"].get("user_id")},
        "order.completed": lambda d: {"action": "process_fulfillment", "order_id": d["data"].get("order_id")},
    }
    handler = handlers.get(event_type)
    if not handler:
        return {"action": "unknown", "event": event_type}
    return handler(parsed_data)

@task
def store_webhook_record(processed: dict, webhook_id: str) -> str:
    """Store processed webhook for audit trail."""
    # Simulated storage
    return f"Webhook {webhook_id} processed: {processed['action']}"

@flow
def webhook_flow(
    payload: Dict,
    signature: str,
    secret: str,
    webhook_id: str = "unknown",
) -> dict:
    """Process incoming webhook with signature verification."""
    is_valid = verify_webhook_signature(
        payload=payload,
        signature=signature,
        secret=secret,
    )
    if not is_valid:
        raise ValueError("Invalid webhook signature")
    parsed = parse_webhook_data(payload=payload)
    processed = process_webhook_event(parsed_data=parsed)
    audit_log = store_webhook_record(processed=processed, webhook_id=webhook_id)
    return {
        "success": True,
        "processed": processed,
        "audit_log": audit_log,
    }

# Local execution
if __name__ == "__main__":
    payload = {
        "event_type": "user.created",
        "timestamp": "2024-01-01T10:00:00Z",
        "data": {"user_id": "usr_123", "email": "new@example.com"},
    }
    secret = "webhook_secret_key"
    # Sign exactly the same canonical form the verifier recomputes
    signature = hmac.new(
        secret.encode(),
        canonical_json(payload).encode(),
        hashlib.sha256,
    ).hexdigest()
    result = webhook_flow.run_local(
        payload=payload,
        signature=signature,
        secret=secret,
        webhook_id="wh_001",
    )
    print(result)
Key Concepts:
- Webhook signature verification
- Event-type dispatch pattern
- Audit trail logging
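In practice, webhook senders usually compute the HMAC over the raw HTTP request body. The receiver should therefore verify those exact bytes before parsing JSON, because re-serializing a parsed dict may not reproduce them. Many providers also send the digest with an algorithm prefix; GitHub's X-Hub-Signature-256 header, for instance, uses "sha256=<hex>". The sketch below assumes that prefix convention, so check your provider's documentation for the actual header name and format.

```python
import hashlib
import hmac


def sign_body(raw_body: bytes, secret: str) -> str:
    """Hex HMAC-SHA256 of the exact bytes received on the wire."""
    return hmac.new(secret.encode(), raw_body, hashlib.sha256).hexdigest()


def verify_body(raw_body: bytes, signature_header: str, secret: str, prefix: str = "sha256=") -> bool:
    """Check a prefixed signature header in constant time (prefix is an assumed convention)."""
    if not signature_header.startswith(prefix):
        return False
    received = signature_header[len(prefix):]
    return hmac.compare_digest(sign_body(raw_body, secret), received)
```

Only after verify_body succeeds would the body be decoded with json.loads and handed to parse_webhook_data.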
16. Multi-API Orchestration
Call multiple APIs in sequence, with data flowing through the pipeline.
from dagy import flow, task
from datetime import datetime

@task
def fetch_user_details(user_id: str) -> dict:
    """Fetch user details from API."""
    return {
        "user_id": user_id,
        "name": "Alice Smith",
        "email": "alice@example.com",
        "organization_id": "org_123",
    }

@task
def fetch_organization_info(org_id: str) -> dict:
    """Fetch organization details from API."""
    return {
        "org_id": org_id,
        "name": "Example Corp",
        "subscription_tier": "premium",
        "region": "us-west-2",
    }

@task
def fetch_user_permissions(user_id: str, org_id: str) -> list:
    """Fetch user permissions for organization."""
    return [
        "read:documents",
        "write:documents",
        "admin:organization",
    ]

@task
def aggregate_user_profile(
    user: dict,
    org: dict,
    permissions: list,
) -> dict:
    """Aggregate all fetched data into complete profile."""
    return {
        "profile": {
            "user": user,
            "organization": org,
            "permissions": permissions,
            "effective_tier": org["subscription_tier"],
            "is_admin": "admin:organization" in permissions,
        },
        "completed_at": datetime.now().isoformat(),
    }

@flow
def multi_api_orchestration_flow(user_id: str) -> dict:
    """Orchestrate calls to multiple APIs with data flow."""
    user = fetch_user_details(user_id=user_id)
    org = fetch_organization_info(org_id=user["organization_id"])
    permissions = fetch_user_permissions(user_id=user_id, org_id=user["organization_id"])
    profile = aggregate_user_profile(user=user, org=org, permissions=permissions)
    return profile

# Local execution
if __name__ == "__main__":
    result = multi_api_orchestration_flow.run_local(user_id="usr_123")
    print(f"User: {result['profile']['user']['name']}")
    print(f"Org: {result['profile']['organization']['name']}")
    print(f"Is Admin: {result['profile']['is_admin']}")
Key Concepts:
- Sequential API calls with data dependencies
- Data aggregation from multiple sources
- Cross-service orchestration pattern
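In this flow, the organization lookup and the permissions lookup both depend only on the user record, not on each other, so they could overlap in time. This section does not show whether Dagy runs independent tasks concurrently on its own. The plain-Python sketch below uses concurrent.futures to illustrate the idea outside Dagy, and fetch_org and fetch_permissions are hypothetical stand-ins for real HTTP calls.

```python
from concurrent.futures import ThreadPoolExecutor


def fetch_org(org_id: str) -> dict:
    # Hypothetical stand-in for an HTTP call
    return {"org_id": org_id, "subscription_tier": "premium"}


def fetch_permissions(user_id: str, org_id: str) -> list:
    # Hypothetical stand-in for an HTTP call
    return ["read:documents", "admin:organization"]


def fetch_profile_parts(user: dict) -> tuple:
    """Run the two calls that depend only on `user` concurrently."""
    org_id = user["organization_id"]
    with ThreadPoolExecutor(max_workers=2) as pool:
        org_future = pool.submit(fetch_org, org_id)
        perms_future = pool.submit(fetch_permissions, user["user_id"], org_id)
        # .result() blocks until done and re-raises any exception from the worker thread
        return org_future.result(), perms_future.result()
```

Threads suit I/O-bound API calls like these. The total latency becomes roughly the slower of the two calls rather than their sum.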
17. API Rate-Limited Fetcher
Respect API rate limits while fetching data.
from dagy import flow, task
import time
from datetime import datetime
from typing import List

@task
def fetch_with_rate_limit(
    endpoint: str,
    items: List[str],
    rate_limit_per_second: float = 2.0,
) -> List[dict]:
    """Fetch items respecting rate limits."""
    min_interval = 1.0 / rate_limit_per_second
    results = []
    for i, item in enumerate(items):
        # Simulated API fetch
        result = {
            "item": item,
            "data": f"Data for {item}",
            "timestamp": datetime.now().isoformat(),
        }
        results.append(result)
        print(f"Fetched {i + 1}/{len(items)}: {item}")
        # Rate limiting (except on last item)
        if i < len(items) - 1:
            time.sleep(min_interval)
    return results

@task
def batch_results(results: List[dict], batch_size: int = 5) -> List[List[dict]]:
    """Group results into batches."""
    batches = []
    for i in range(0, len(results), batch_size):
        batches.append(results[i:i + batch_size])
    return batches

@task
def process_batches(batches: List[List[dict]]) -> dict:
    """Process batches with aggregated stats."""
    return {
        "total_items": sum(len(b) for b in batches),
        "batch_count": len(batches),
        "batches_processed": len(batches),
        "avg_batch_size": sum(len(b) for b in batches) / len(batches) if batches else 0,
    }

@flow
def rate_limited_fetch_flow(
    items: List[str],
    rate_limit: float = 2.0,
) -> dict:
    """Fetch data with rate limiting and batch processing."""
    fetched = fetch_with_rate_limit(
        endpoint="/api/data",
        items=items,
        rate_limit_per_second=rate_limit,
    )
    batches = batch_results(results=fetched, batch_size=5)
    stats = process_batches(batches=batches)
    return {
        "stats": stats,
        "items_fetched": len(fetched),
    }

# Local execution
if __name__ == "__main__":
    items = [f"item_{i}" for i in range(1, 11)]
    result = rate_limited_fetch_flow.run_local(
        items=items,
        rate_limit=2.0,
    )
    print(result)
Key Concepts:
- Rate limiting with min_interval calculation
- Request throttling
- Batch processing for efficiency
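The task above sleeps a fixed min_interval between calls. Whenever the fetch itself takes time, the real request rate therefore falls below the limit. Below is a minimal plain-Python sketch, independent of Dagy, that instead spaces the start times of calls 1 / rate apart. throttled_map and its injectable clock/sleep parameters are hypothetical helpers for illustration. FakeClock lets the schedule be inspected without waiting.

```python
import time
from typing import Callable, Iterable, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")

def throttled_map(
    fn: Callable[[T], R],
    items: Iterable[T],
    rate_per_second: float,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> List[R]:
    """Call fn on each item, starting calls no more often than rate_per_second."""
    if rate_per_second <= 0:
        raise ValueError("rate_per_second must be positive")
    min_interval = 1.0 / rate_per_second
    results: List[R] = []
    next_allowed = clock()
    for item in items:
        now = clock()
        if now < next_allowed:
            # Wait only for the part of the interval not already spent on work
            sleep(next_allowed - now)
        start = max(now, next_allowed)
        results.append(fn(item))
        # Schedule the next start relative to this start, not to when fn finished
        next_allowed = start + min_interval
    return results

class FakeClock:
    """Deterministic clock for demos: time only advances when sleep() is called."""
    def __init__(self) -> None:
        self.t = 0.0
        self.sleeps: List[float] = []
    def now(self) -> float:
        return self.t
    def sleep(self, seconds: float) -> None:
        self.sleeps.append(seconds)
        self.t += seconds

fake = FakeClock()
doubled = throttled_map(lambda x: x * 2, [1, 2, 3], 2.0, clock=fake.now, sleep=fake.sleep)
```

With a real clock and a 2-per-second limit, a fetch that takes 0.3 s is followed by a 0.2 s sleep rather than 0.5 s.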
18. GraphQL Data Collector
Query GraphQL endpoints and collect nested data.
from dagy import flow, task
from typing import Dict, List, Optional
@task
def query_graphql(query: str, variables: Optional[Dict] = None) -> dict:
    """Execute GraphQL query."""
    # Simulated GraphQL response
    return {
        "data": {
            "users": [
                {
                    "id": "usr_1",
                    "name": "Alice",
                    "posts": [
                        {"id": "post_1", "title": "First Post"},
                        {"id": "post_2", "title": "Second Post"},
                    ]
                },
                {
                    "id": "usr_2",
                    "name": "Bob",
                    "posts": [
                        {"id": "post_3", "title": "Hello World"},
                    ]
                },
            ]
        },
        "errors": None,
    }
@task
def flatten_graphql_response(response: dict) -> tuple:
    """Flatten nested GraphQL response into users and posts."""
    # Fail loudly instead of silently flattening a failed or partial response
    if response.get("errors"):
        raise RuntimeError(f"GraphQL query failed: {response['errors']}")
    users = []
    posts = []
    for user in response["data"]["users"]:
        user_record = {"id": user["id"], "name": user["name"], "post_count": len(user.get("posts", []))}
        users.append(user_record)
        for post in user.get("posts", []):
            posts.append({
                **post,
                "author_id": user["id"],
                "author_name": user["name"],
            })
    return users, posts
@task
def generate_collection_report(users: List[dict], posts: List[dict]) -> dict:
    """Generate report on collected data."""
    return {
        "total_users": len(users),
        "total_posts": len(posts),
        "avg_posts_per_user": len(posts) / len(users) if users else 0,
        "users": users,
        "posts_sample": posts[:5],
    }
@flow
def graphql_collector_flow(query: str) -> dict:
    """Collect and flatten data from GraphQL endpoint."""
    response = query_graphql(query=query)
    users, posts = flatten_graphql_response(response=response)
    report = generate_collection_report(users=users, posts=posts)
    return report
# Local execution
if __name__ == "__main__":
    query = """
    query GetUsers {
      users {
        id
        name
        posts { id title }
      }
    }
    """
    result = graphql_collector_flow.run_local(query=query)
    print(f"Users: {result['total_users']}, Posts: {result['total_posts']}")
Key Concepts:
- GraphQL query execution
- Nested response flattening
- Data normalization
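query_graphql above returns a canned response. Against a real server, GraphQL is commonly served over HTTP as a POST whose JSON body carries query and, optionally, variables. The standard-library sketch below assumes such a JSON-over-POST endpoint. The endpoint URL and the helper names build_graphql_request and execute_graphql are hypothetical. Building the request separately makes it easy to inspect without any network access.

```python
import json
import urllib.request
from typing import Any, Dict, Optional

def build_graphql_request(
    endpoint: str,
    query: str,
    variables: Optional[Dict[str, Any]] = None,
) -> urllib.request.Request:
    """Build a POST request carrying the query (and variables, if any) as JSON."""
    payload: Dict[str, Any] = {"query": query}
    if variables:
        payload["variables"] = variables
    return urllib.request.Request(
        endpoint,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )

def execute_graphql(
    endpoint: str,
    query: str,
    variables: Optional[Dict[str, Any]] = None,
    timeout: float = 10.0,
) -> Dict[str, Any]:
    """Send the request and return the "data" field, raising if "errors" is non-empty."""
    request = build_graphql_request(endpoint, query, variables)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        body = json.loads(response.read().decode("utf-8"))
    if body.get("errors"):
        raise RuntimeError(f"GraphQL errors: {body['errors']}")
    return body["data"]

request = build_graphql_request(
    "https://api.example.com/graphql",  # hypothetical endpoint
    "query GetUser($id: ID!) { user(id: $id) { id name } }",
    {"id": "usr_1"},
)
```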
19. Web Scraper Pipeline
Fetch, parse, and extract structured data from web content.
from dagy import flow, task
from typing import List
import re
@task
def fetch_webpage(url: str) -> str:
    """Fetch HTML content from webpage."""
    # Simulated HTML response
    return """
    <html>
      <head><title>Product Listings</title></head>
      <body>
        <div class="product">
          <h3>Widget A</h3>
          <span class="price">$19.99</span>
          <p class="description">High quality widget</p>
        </div>
        <div class="product">
          <h3>Widget B</h3>
          <span class="price">$29.99</span>
          <p class="description">Premium widget</p>
        </div>
        <div class="product">
          <h3>Widget C</h3>
          <span class="price">$9.99</span>
          <p class="description">Budget widget</p>
        </div>
      </body>
    </html>
    """
@task
def parse_products(html: str) -> List[dict]:
    """Parse HTML and extract product data."""
    products = []
    # Simple regex-based parsing (in production: use BeautifulSoup)
    product_pattern = r'<div class="product">(.*?)</div>'
    for product_html in re.findall(product_pattern, html, re.DOTALL):
        # Extract fields
        name_match = re.search(r'<h3>(.*?)</h3>', product_html)
        price_match = re.search(r'<span class="price">\$(.*?)</span>', product_html)
        desc_match = re.search(r'<p class="description">(.*?)</p>', product_html)
        if name_match and price_match:
            products.append({
                "name": name_match.group(1).strip(),
                # Drop thousands separators such as "1,299.00" before converting
                "price": float(price_match.group(1).replace(",", "")),
                "description": desc_match.group(1).strip() if desc_match else "",
            })
    return products
@task
def store_products(products: List[dict], destination: str) -> dict:
    """Store extracted products."""
    return {
        "stored_count": len(products),
        "destination": destination,
        "products": products,
    }
@flow
def web_scraper_flow(url: str) -> dict:
    """Scrape, parse, and store web content."""
    html = fetch_webpage(url=url)
    products = parse_products(html=html)
    result = store_products(products=products, destination="/data/products")
    return result
# Local execution
if __name__ == "__main__":
    result = web_scraper_flow.run_local(url="https://example.com/products")
    print(f"Products scraped: {result['stored_count']}")
    for product in result['products']:
        print(f"  {product['name']}: ${product['price']:.2f}")
Key Concepts:
- HTML fetching and parsing
- Regex-based data extraction
- Data persistence
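The parse_products comment recommends BeautifulSoup for production. As a dependency-free middle ground, the standard library's html.parser.HTMLParser tokenizes tags, so extra classes, attribute order, or whitespace do not break extraction the way exact-match regexes can. The sketch below assumes the flat markup shown above, with no nested div inside a product. ProductParser and parse_products_html are hypothetical names.

```python
from html.parser import HTMLParser
from typing import Dict, List, Optional

class ProductParser(HTMLParser):
    """Collect name/price/description from <div class="product"> blocks."""

    def __init__(self) -> None:
        super().__init__()
        self.products: List[Dict] = []
        self._current: Optional[Dict[str, str]] = None
        self._field: Optional[str] = None

    def handle_starttag(self, tag, attrs):
        classes = (dict(attrs).get("class") or "").split()
        if tag == "div" and "product" in classes:
            self._current = {"name": "", "price": "", "description": ""}
        elif self._current is not None:
            if tag == "h3":
                self._field = "name"
            elif tag == "span" and "price" in classes:
                self._field = "price"
            elif tag == "p" and "description" in classes:
                self._field = "description"

    def handle_endtag(self, tag):
        if tag in ("h3", "span", "p"):
            self._field = None
        elif tag == "div" and self._current is not None:
            cur = self._current
            # Keep only products that have both a name and a price
            if cur["name"].strip() and cur["price"].strip():
                self.products.append({
                    "name": cur["name"].strip(),
                    "price": float(cur["price"].strip().lstrip("$").replace(",", "")),
                    "description": cur["description"].strip(),
                })
            self._current = None

    def handle_data(self, data):
        # Text is appended to whichever field is currently open
        if self._current is not None and self._field:
            self._current[self._field] += data

def parse_products_html(html: str) -> List[Dict]:
    parser = ProductParser()
    parser.feed(html)
    parser.close()
    return parser.products

sample = """
<div class="product"><h3>Widget A</h3><span class="price">$19.99</span>
<p class="description">High quality widget</p></div>
<div class="product featured"><h3>Widget B</h3><span class="price">$1,299.00</span></div>
"""
products = parse_products_html(sample)
```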
ML & AI
20. Model Training Pipeline
Prepare data, train model, and evaluate performance.
from dagy import flow, task
from typing import Tuple, Dict
@task
def prepare_training_data(dataset_path: str) -> Tuple[list, list]:
    """Load and prepare training data."""
    # Simulated data loading
    features = [[1.0, 2.0], [2.0, 3.0], [3.0, 4.0], [4.0, 5.0]]
    labels = [0, 1, 0, 1]
    return features, labels
@task
def split_train_test(
    features: list,
    labels: list,
    test_split: float = 0.2
) -> dict:
    """Split data into training and test sets."""
    split_idx = int(len(features) * (1 - test_split))
    return {
        "X_train": features[:split_idx],
        "y_train": labels[:split_idx],
        "X_test": features[split_idx:],
        "y_test": labels[split_idx:],
    }
@task
def train_model(X_train: list, y_train: list) -> dict:
    """Train ML model on training data."""
    # Simulated model training
    model_config = {
        "algorithm": "logistic_regression",
        "epochs": 100,
        "learning_rate": 0.01,
    }
    # Simulated training metrics
    training_log = {
        "initial_loss": 0.85,
        "final_loss": 0.23,
        "epochs_completed": 100,
        "converged": True,
    }
    return {
        "model_config": model_config,
        "training_log": training_log,
        "model_id": "model_v1_20240101",
    }
@task
def evaluate_model(
    model: dict,
    X_test: list,
    y_test: list
) -> dict:
    """Evaluate model performance on test set."""
    # Simulated evaluation
    return {
        "model_id": model["model_id"],
        "accuracy": 0.92,
        "precision": 0.89,
        "recall": 0.95,
        "f1_score": 0.92,
        "test_samples": len(X_test),
    }
@flow
def model_training_pipeline(dataset_path: str) -> dict:
    """Complete ML pipeline: prepare → train → evaluate."""
    features, labels = prepare_training_data(dataset_path=dataset_path)
    split_data = split_train_test(features=features, labels=labels, test_split=0.2)
    model = train_model(X_train=split_data["X_train"], y_train=split_data["y_train"])
    metrics = evaluate_model(
        model=model,
        X_test=split_data["X_test"],
        y_test=split_data["y_test"]
    )
    return {
        "model": model,
        "evaluation": metrics,
        "status": "success" if metrics["accuracy"] > 0.8 else "needs_improvement",
    }
# Local execution
if __name__ == "__main__":
    result = model_training_pipeline.run_local(dataset_path="/data/training.csv")
    print(f"Model: {result['model']['model_id']}")
    print(f"Accuracy: {result['evaluation']['accuracy']:.2%}")
Key Concepts:
- Data preparation and splitting
- Model training orchestration
- Metrics computation and evaluation
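split_train_test slices off the first 80% of rows in their original order. If the source data is sorted (by date, label, or customer), the test set may then look nothing like the training set. Unless a time-ordered split is intended, a seeded shuffle before splitting avoids this. Here is a minimal sketch in which shuffled_train_test_split is a hypothetical helper. scikit-learn's train_test_split covers the same idea through its shuffle, random_state, and stratify parameters.

```python
import random
from typing import Dict, List, Sequence

def shuffled_train_test_split(
    features: Sequence,
    labels: Sequence,
    test_split: float = 0.2,
    seed: int = 42,
) -> Dict[str, List]:
    """Shuffle indices with a fixed seed, then split features and labels together."""
    if len(features) != len(labels):
        raise ValueError("features and labels must have the same length")
    if not 0.0 < test_split < 1.0:
        raise ValueError("test_split must be between 0 and 1")
    indices = list(range(len(features)))
    # A dedicated Random instance keeps the split reproducible without touching global state
    random.Random(seed).shuffle(indices)
    split_idx = int(len(indices) * (1 - test_split))
    train_idx, test_idx = indices[:split_idx], indices[split_idx:]
    return {
        "X_train": [features[i] for i in train_idx],
        "y_train": [labels[i] for i in train_idx],
        "X_test": [features[i] for i in test_idx],
        "y_test": [labels[i] for i in test_idx],
    }

features = [[float(i), float(i + 1)] for i in range(10)]
labels = [i % 2 for i in range(10)]
split = shuffled_train_test_split(features, labels, test_split=0.2, seed=7)
```

Because features and labels are indexed through the same shuffled index list, each label stays paired with its feature row.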
21. Feature Engineering Pipeline
Compute derived features for machine learning models.
from dagy import flow, task
from typing import Dict, List, Tuple
import math
@task
def load_raw_data() -> List[Dict]:
    """Load raw data for feature engineering."""
    return [
        {"id": 1, "price": 100, "quantity": 5, "timestamp": "2024-01-01"},
        {"id": 2, "price": 150, "quantity": 3, "timestamp": "2024-01-02"},
        {"id": 3, "price": 200, "quantity": 8, "timestamp": "2024-01-03"},
    ]
@task
def compute_interaction_features(data: List[Dict]) -> List[Dict]:
    """Compute interaction features."""
    enhanced = []
    for record in data:
        record_copy = record.copy()
        record_copy["revenue"] = record["price"] * record["quantity"]
        record_copy["price_qty_ratio"] = record["price"] / max(record["quantity"], 1)
        enhanced.append(record_copy)
    return enhanced
@task
def compute_statistical_features(data: List[Dict]) -> Tuple[List[Dict], dict]:
    """Compute statistical aggregation features."""
    prices = [r["price"] for r in data]
    avg_price = sum(prices) / len(prices) if prices else 0
    max_price = max(prices) if prices else 0
    # Population standard deviation of price
    std_price = math.sqrt(sum((p - avg_price) ** 2 for p in prices) / len(prices)) if prices else 0
    enhanced = []
    for record in data:
        record_copy = record.copy()
        # Standard z-score: distance from the mean in standard deviations (0 if all prices are equal)
        record_copy["price_zscore"] = (record["price"] - avg_price) / std_price if std_price > 0 else 0.0
        record_copy["above_avg_price"] = 1 if record["price"] > avg_price else 0
        enhanced.append(record_copy)
    return enhanced, {"avg_price": avg_price, "max_price": max_price, "std_price": std_price}
@task
def store_features(
    data: List[Dict],
    stats: dict,
    destination: str
) -> dict:
    """Store computed features."""
    return {
        "features_stored": len(data),
        "destination": destination,
        "feature_count": len(data[0]) if data else 0,
        "stats": stats,
    }
@flow
def feature_engineering_flow() -> dict:
    """Compute and store derived features."""
    raw_data = load_raw_data()
    with_interactions = compute_interaction_features(data=raw_data)
    with_stats, stats = compute_statistical_features(data=with_interactions)
    result = store_features(
        data=with_stats,
        stats=stats,
        destination="/data/features"
    )
    return {
        "result": result,
        "sample_record": with_stats[0] if with_stats else None,
    }
# Local execution
if __name__ == "__main__":
    result = feature_engineering_flow.run_local()
    print(f"Features stored: {result['result']['features_stored']}")
    print(f"Sample: {result['sample_record']}")
Key Concepts:
- Interaction feature computation
- Statistical feature generation
- Feature normalization and scaling
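compute_statistical_features derives its averages from whatever batch it receives, which suits a one-off analysis. When the features feed a model, a common practice is different. Fit the scaling parameters once on training data, store them (for example alongside the stats the flow already returns), and reuse them at inference so new records are scaled the same way. Below is a minimal sketch in which fit_standardizer and apply_standardizer are hypothetical helpers.

```python
import math
from typing import Dict, List

def fit_standardizer(values: List[float]) -> Dict[str, float]:
    """Compute the mean and population standard deviation from training values."""
    if not values:
        raise ValueError("need at least one value to fit")
    mean = sum(values) / len(values)
    std = math.sqrt(sum((v - mean) ** 2 for v in values) / len(values))
    return {"mean": mean, "std": std}

def apply_standardizer(values: List[float], params: Dict[str, float]) -> List[float]:
    """Scale values with previously fitted parameters (0.0 when std is zero)."""
    mean, std = params["mean"], params["std"]
    return [(v - mean) / std if std > 0 else 0.0 for v in values]

train_prices = [100.0, 150.0, 200.0]
params = fit_standardizer(train_prices)
# New records are scaled with the training mean/std, not their own
scaled_new = apply_standardizer([150.0, 250.0], params)
```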
22. Batch Inference Pipeline
Run predictions on batches of data.
from dagy import flow, task
from datetime import datetime
from typing import List, Dict
@task
def load_model(model_path: str) -> dict:
    """Load trained model from disk."""
    # Simulated model loading
    return {
        "model_id": "model_v1",
        "path": model_path,
        "loaded_at": datetime.now().isoformat(),
    }
@task
def load_inference_batch(batch_id: str, batch_size: int = 32) -> List[Dict]:
    """Load batch of data for inference."""
    # Simulated batch loading
    return [
        {"id": i, "features": [float(i), float(i * 2)]}
        for i in range(1, batch_size + 1)
    ]
@task
def run_inference(model: dict, batch: List[Dict]) -> List[Dict]:
    """Run model inference on batch."""
    predictions = []
    for record in batch:
        # Simulated prediction
        pred_score = sum(record["features"]) / len(record["features"])
        predictions.append({
            "id": record["id"],
            "score": min(0.99, pred_score / 10),  # Scale into [0, 0.99]
            "prediction": 1 if pred_score > 5 else 0,
            "confidence": 0.92,
        })
    return predictions
@task
def aggregate_results(predictions: List[Dict]) -> dict:
    """Aggregate inference results."""
    scores = [p["score"] for p in predictions]
    class_counts = {}
    for pred in predictions:
        pred_class = pred["prediction"]
        class_counts[pred_class] = class_counts.get(pred_class, 0) + 1
    return {
        "total_predictions": len(predictions),
        "avg_score": sum(scores) / len(scores) if scores else 0,
        "class_distribution": class_counts,
        "predictions_sample": predictions[:5],
    }
@flow
def batch_inference_flow(batch_id: str, model_path: str) -> dict:
    """Run batch inference pipeline."""
    model = load_model(model_path=model_path)
    batch = load_inference_batch(batch_id=batch_id, batch_size=32)
    predictions = run_inference(model=model, batch=batch)
    aggregates = aggregate_results(predictions=predictions)
    return {
        "batch_id": batch_id,
        "results": aggregates,
    }
# Local execution
if __name__ == "__main__":
    result = batch_inference_flow.run_local(
        batch_id="batch_001",
        model_path="/models/model_v1.pkl"
    )
    print(f"Predictions: {result['results']['total_predictions']}")
    print(f"Avg score: {result['results']['avg_score']:.2f}")
Key Concepts:
- Model loading from disk
- Batch prediction execution
- Result aggregation and distribution analysis
23. A/B Test Analysis
Analyze experimental results and compute statistical significance.
from dagy import flow, task
import math
@task
def fetch_experiment_results(experiment_id: str) -> tuple:
    """Fetch results for control and treatment groups."""
    # Simulated experiment data
    control = {
        "group": "control",
        "sample_size": 1000,
        "conversions": 120,
        "revenue": 45000,
    }
    treatment = {
        "group": "treatment",
        "sample_size": 1000,
        "conversions": 150,
        "revenue": 54000,
    }
    return control, treatment
@task
def compute_statistics(control: dict, treatment: dict) -> dict:
    """Compute statistical metrics for both groups."""
    ctrl_conv_rate = control["conversions"] / control["sample_size"]
    treat_conv_rate = treatment["conversions"] / treatment["sample_size"]
    ctrl_revenue_per_user = control["revenue"] / control["sample_size"]
    treat_revenue_per_user = treatment["revenue"] / treatment["sample_size"]
    return {
        "control": {
            "sample_size": control["sample_size"],
            "conversion_rate": ctrl_conv_rate,
            "revenue_per_user": ctrl_revenue_per_user,
            "total_revenue": control["revenue"],
        },
        "treatment": {
            "sample_size": treatment["sample_size"],
            "conversion_rate": treat_conv_rate,
            "revenue_per_user": treat_revenue_per_user,
            "total_revenue": treatment["revenue"],
        },
    }
@task
def compute_lift(stats: dict, alpha: float = 0.05) -> dict:
    """Compute uplift metrics and a two-proportion z-test on conversion rates."""
    ctrl_conv = stats["control"]["conversion_rate"]
    treat_conv = stats["treatment"]["conversion_rate"]
    conversion_lift = ((treat_conv - ctrl_conv) / ctrl_conv * 100) if ctrl_conv > 0 else 0
    ctrl_revenue = stats["control"]["revenue_per_user"]
    treat_revenue = stats["treatment"]["revenue_per_user"]
    revenue_lift = ((treat_revenue - ctrl_revenue) / ctrl_revenue * 100) if ctrl_revenue > 0 else 0
    # Two-proportion z-test using the pooled conversion rate
    n_ctrl = stats["control"]["sample_size"]
    n_treat = stats["treatment"]["sample_size"]
    pooled = (ctrl_conv * n_ctrl + treat_conv * n_treat) / (n_ctrl + n_treat)
    std_err = math.sqrt(pooled * (1 - pooled) * (1 / n_ctrl + 1 / n_treat))
    z_score = (treat_conv - ctrl_conv) / std_err if std_err > 0 else 0.0
    # Two-sided p-value from the standard normal CDF: Phi(z) = (1 + erf(z / sqrt(2))) / 2
    p_value = 2 * (1 - 0.5 * (1 + math.erf(abs(z_score) / math.sqrt(2))))
    significant = p_value < alpha
    if significant:
        winner = "treatment" if conversion_lift > 0 else "control"
    else:
        winner = "inconclusive"
    return {
        "conversion_lift_percentage": conversion_lift,
        "revenue_lift_percentage": revenue_lift,
        "z_score": z_score,
        "p_value": p_value,
        "significant": significant,
        "winner": winner,
    }
@flow
def ab_test_analysis_flow(experiment_id: str) -> dict:
    """Analyze A/B test results."""
    control, treatment = fetch_experiment_results(experiment_id=experiment_id)
    stats = compute_statistics(control=control, treatment=treatment)
    lift = compute_lift(stats=stats)
    if lift["significant"]:
        recommendation = (
            f"Choose {lift['winner']} with {abs(lift['conversion_lift_percentage']):.1f}% lift "
            f"(p = {lift['p_value']:.3f})"
        )
    else:
        recommendation = f"No significant difference yet (p = {lift['p_value']:.3f}); keep collecting data"
    return {
        "experiment_id": experiment_id,
        "statistics": stats,
        "lift": lift,
        "recommendation": recommendation,
    }
# Local execution
if __name__ == "__main__":
    result = ab_test_analysis_flow.run_local(experiment_id="exp_20240101")
    print(f"Winner: {result['lift']['winner']}")
    print(f"Conversion lift: {result['lift']['conversion_lift_percentage']:.2f}%")
    print(f"P-value: {result['lift']['p_value']:.4f}")
    print(f"Recommendation: {result['recommendation']}")
Key Concepts:
- Statistical metrics computation
- Control vs treatment comparison
- Lift calculation and significance testing
24. Data Labeling Pipeline
Prepare data for manual or automated labeling.
from dagy import flow, task
from typing import List, Dict
@task
def load_unlabeled_data(source: str) -> List[Dict]:
    """Load data that needs labeling."""
    return [
        {"id": 1, "text": "Great product!", "category": None},
        {"id": 2, "text": "Terrible quality", "category": None},
        {"id": 3, "text": "Average at best", "category": None},
    ]
@task
def preprocess_for_labeling(data: List[Dict]) -> List[Dict]:
    """Preprocess data to make labeling easier."""
    processed = []
    for record in data:
        processed_record = record.copy()
        # Normalize text
        processed_record["text_normalized"] = record["text"].lower().strip()
        # Add metadata
        processed_record["text_length"] = len(record["text"])
        processed_record["word_count"] = len(record["text"].split())
        processed_record["ready_for_labeling"] = True
        processed.append(processed_record)
    return processed
@task
def create_labeling_batch(
    data: List[Dict],
    batch_size: int = 10
) -> List[List[Dict]]:
    """Create batches for labeling."""
    batches = []
    for i in range(0, len(data), batch_size):
        batches.append(data[i:i + batch_size])
    return batches
@task
def store_labeling_tasks(batches: List[List[Dict]], destination: str) -> dict:
    """Store labeling tasks for workers."""
    return {
        "total_tasks": sum(len(b) for b in batches),
        "batch_count": len(batches),
        "destination": destination,
        "task_format": "sentiment_classification",
    }
@flow
def labeling_pipeline(source: str) -> dict:
    """Prepare data for labeling."""
    data = load_unlabeled_data(source=source)
    preprocessed = preprocess_for_labeling(data=data)
    batches = create_labeling_batch(data=preprocessed, batch_size=10)
    result = store_labeling_tasks(batches=batches, destination="/labeling/tasks")
    return {
        "result": result,
        "total_records": len(data),
    }
# Local execution
if __name__ == "__main__":
    result = labeling_pipeline.run_local(source="feedback_data")
    print(f"Total records: {result['total_records']}")
    print(f"Batches created: {result['result']['batch_count']}")
Key Concepts:
- Data normalization for labeling
- Batch creation for workers
- Task storage and tracking
25. Model Monitoring Pipeline
Detect data drift and model degradation.
from dagy import flow, task
from typing import Optional
@task
def fetch_model_predictions(model_id: str, time_window: str = "1d") -> list:
    """Fetch recent model predictions."""
    # Simulated prediction data
    return [
        {"timestamp": "2024-01-01T10:00:00Z", "prediction": 0.85, "actual": 1},
        {"timestamp": "2024-01-01T11:00:00Z", "prediction": 0.72, "actual": 0},
        {"timestamp": "2024-01-01T12:00:00Z", "prediction": 0.91, "actual": 1},
    ]
@task
def compute_performance_metrics(predictions: list) -> dict:
    """Compute current performance metrics."""
    correct = sum(1 for p in predictions if (p["prediction"] > 0.5) == p["actual"])
    accuracy = correct / len(predictions) if predictions else 0
    return {
        "accuracy": accuracy,
        "prediction_count": len(predictions),
        "avg_confidence": sum(p["prediction"] for p in predictions) / len(predictions) if predictions else 0,
    }
@task
def detect_drift(current_metrics: dict, baseline_metrics: Optional[dict] = None) -> dict:
    """Detect if model performance has moved away from the baseline."""
    if baseline_metrics is None:
        baseline_metrics = {"accuracy": 0.92, "avg_confidence": 0.87}
    # Signed changes: negative values mean the metric fell below the baseline
    accuracy_change = current_metrics["accuracy"] - baseline_metrics["accuracy"]
    confidence_change = current_metrics["avg_confidence"] - baseline_metrics["avg_confidence"]
    has_drift = abs(accuracy_change) > 0.05
    return {
        "has_drift": has_drift,
        "accuracy_change": accuracy_change,
        "confidence_change": confidence_change,
        "drift_severity": "high" if abs(accuracy_change) > 0.1 else "low" if has_drift else "none",
    }
@task
def generate_alert(drift_info: dict) -> dict:
    """Generate alert if significant drift detected."""
    if drift_info["has_drift"]:
        return {
            "alert_level": "warning" if drift_info["drift_severity"] == "high" else "info",
            "message": f"Model drift detected: accuracy changed by {drift_info['accuracy_change']:+.2%} vs. baseline",
            "action_required": drift_info["drift_severity"] == "high",
        }
    return {"alert_level": "none", "message": "Model performing normally", "action_required": False}
@flow
def monitoring_pipeline(model_id: str) -> dict:
    """Monitor model performance and detect drift."""
    predictions = fetch_model_predictions(model_id=model_id)
    metrics = compute_performance_metrics(predictions=predictions)
    drift = detect_drift(current_metrics=metrics)
    alert = generate_alert(drift_info=drift)
    return {
        "model_id": model_id,
        "metrics": metrics,
        "drift": drift,
        "alert": alert,
    }
# Local execution
if __name__ == "__main__":
    result = monitoring_pipeline.run_local(model_id="model_v1")
    print(f"Accuracy: {result['metrics']['accuracy']:.2%}")
    print(f"Alert: {result['alert']['message']}")
Key Concepts:
- Performance metric tracking
- Threshold-based drift detection against a baseline
- Alert generation and escalation
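The drift check above compares accuracy with a fixed baseline, so it needs ground-truth labels. Distribution checks on the prediction scores themselves can flag drift before labels arrive. One common score is the population stability index (PSI), computed over binned proportions as sum((a - e) * ln(a / e)). Below is a minimal sketch, assuming scores in [0, 1] and equal-width bins. population_stability_index is a hypothetical helper. A commonly quoted rule of thumb treats PSI below 0.1 as stable and above 0.25 as a major shift. Treat those thresholds as starting points rather than fixed rules.

```python
import math
from typing import List, Sequence

def population_stability_index(
    expected: Sequence[float],
    actual: Sequence[float],
    bins: int = 10,
    eps: float = 1e-6,
) -> float:
    """PSI between two samples of scores in [0, 1], using equal-width bins."""
    if not expected or not actual:
        raise ValueError("both samples must be non-empty")

    def proportions(values: Sequence[float]) -> List[float]:
        counts = [0] * bins
        for v in values:
            if not 0.0 <= v <= 1.0:
                raise ValueError(f"score {v} is outside [0, 1]")
            counts[min(int(v * bins), bins - 1)] += 1  # a score of exactly 1.0 goes in the last bin
        # eps keeps log() finite when a bin is empty in one of the samples
        return [max(c / len(values), eps) for c in counts]

    exp_p, act_p = proportions(expected), proportions(actual)
    return sum((a - e) * math.log(a / e) for e, a in zip(exp_p, act_p))

baseline_scores = [i / 10 + 0.05 for i in range(10)]  # spread across all bins
shifted_scores = [0.95] * 10                          # everything in the top bin
```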
File Processing
26. Image Resize Pipeline
Batch resize images and maintain aspect ratio.
from dagy import flow, task
from typing import List
@task
def list_image_files(directory: str) -> List[str]:
    """List all image files in directory."""
    # Simulated file listing
    return [
        f"{directory}/image_001.jpg",
        f"{directory}/image_002.jpg",
        f"{directory}/image_003.png",
    ]
@task
def resize_image(
    image_path: str,
    target_width: int,
    target_height: int,
) -> dict:
    """Resize a single image to fit inside the target box, preserving aspect ratio."""
    filename = image_path.split("/")[-1]
    # Simulated source dimensions
    original_width, original_height = 1920, 1080
    # Scale by the tighter of the two constraints so neither side exceeds the target
    scale = min(target_width / original_width, target_height / original_height)
    output_width = max(1, round(original_width * scale))
    output_height = max(1, round(original_height * scale))
    return {
        "original": image_path,
        "output": f"/resized/{filename}",
        "original_dims": (original_width, original_height),
        "target_dims": (target_width, target_height),
        "output_dims": (output_width, output_height),
        "maintained_aspect_ratio": True,
    }
@task
def batch_resize(
    images: List[str],
    target_width: int = 800,
    target_height: int = 600,
) -> List[dict]:
    """Resize multiple images in batch."""
    results = []
    for image in images:
        result = resize_image(image, target_width, target_height)
        results.append(result)
    return results
@task
def compute_resize_stats(results: List[dict]) -> dict:
    """Compute batch resize statistics from the actual output dimensions."""
    original_pixels = sum(r["original_dims"][0] * r["original_dims"][1] for r in results)
    output_pixels = sum(r["output_dims"][0] * r["output_dims"][1] for r in results)
    return {
        "total_images": len(results),
        "successful": len(results),
        "pixels_removed": original_pixels - output_pixels,
        "pixel_reduction_ratio": 1 - output_pixels / original_pixels if original_pixels else 0,
    }
@flow
def image_resize_pipeline(
    source_directory: str,
    target_width: int = 800,
    target_height: int = 600,
) -> dict:
    """Resize batch of images."""
    images = list_image_files(directory=source_directory)
    resized = batch_resize(images=images, target_width=target_width, target_height=target_height)
    stats = compute_resize_stats(results=resized)
    return {
        "stats": stats,
        "outputs": resized,
    }
# Local execution
if __name__ == "__main__":
    result = image_resize_pipeline.run_local(
        source_directory="/images/originals",
        target_width=800,
        target_height=600
    )
    print(f"Resized: {result['stats']['total_images']} images")
    print(f"Pixels removed: {result['stats']['pixels_removed']}")
    print(f"Pixel reduction: {result['stats']['pixel_reduction_ratio']:.1%}")
Key Concepts:
- Batch file processing
- Aspect ratio preservation
- Pixel-reduction metrics
27. PDF Processing Pipeline
Extract text and metadata from PDF files.
from dagy import flow, task
from typing import List, Dict
@task
def list_pdf_files(directory: str) -> List[str]:
    """List PDF files in directory."""
    return [
        f"{directory}/document_001.pdf",
        f"{directory}/document_002.pdf",
    ]
@task
def extract_pdf_content(pdf_path: str) -> dict:
    """Extract text and metadata from PDF."""
    # Simulated PDF extraction
    return {
        "path": pdf_path,
        "title": "Sample Document",
        "pages": 5,
        "text": "Lorem ipsum dolor sit amet...",
        "metadata": {
            "author": "John Doe",
            "created": "2024-01-01",
            "modified": "2024-01-15",
        },
    }
@task
def batch_extract_pdfs(pdf_files: List[str]) -> List[dict]:
    """Extract content from multiple PDFs."""
    results = []
    for pdf_file in pdf_files:
        content = extract_pdf_content(pdf_path=pdf_file)
        results.append(content)
    return results
@task
def index_pdf_content(extractions: List[dict]) -> dict:
    """Create searchable index of extracted content."""
    index = {
        "total_documents": len(extractions),
        "total_pages": sum(doc["pages"] for doc in extractions),
        "documents": [
            {
                "path": doc["path"],
                "title": doc["title"],
                "author": doc["metadata"].get("author"),
                "page_count": doc["pages"],
            }
            for doc in extractions
        ],
    }
    return index
@flow
def pdf_processing_flow(directory: str) -> dict:
    """Extract and index PDF content."""
    pdfs = list_pdf_files(directory=directory)
    extractions = batch_extract_pdfs(pdf_files=pdfs)
    index = index_pdf_content(extractions=extractions)
    return index
# Local execution
if __name__ == "__main__":
    result = pdf_processing_flow.run_local(directory="/documents/pdfs")
    print(f"Documents processed: {result['total_documents']}")
    print(f"Total pages: {result['total_pages']}")
Key Concepts:
- PDF text extraction
- Metadata parsing
- Content indexing
28. Log File Analyzer
Parse, aggregate, and analyze log files.
from dagy import flow, task
from typing import List, Dict
import re
from collections import Counter
@task
def read_log_file(log_path: str) -> List[str]:
    """Read log file line by line."""
    # Simulated log data
    return [
        "2024-01-01T10:00:00 ERROR Database connection failed",
        "2024-01-01T10:01:00 INFO Request processed successfully",
        "2024-01-01T10:02:00 WARNING Slow query detected",
        "2024-01-01T10:03:00 ERROR Authentication failed for user_123",
        "2024-01-01T10:04:00 INFO Request processed successfully",
    ]
@task
def parse_log_lines(lines: List[str]) -> List[dict]:
    """Parse log lines into structured format."""
    parsed = []
    pattern = r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}) (\w+) (.*)"
    for line in lines:
        match = re.match(pattern, line)
        if match:
            parsed.append({
                "timestamp": match.group(1),
                "level": match.group(2),
                "message": match.group(3),
            })
    return parsed
@task
def aggregate_log_stats(logs: List[dict]) -> dict:
    """Aggregate logging statistics."""
    level_counts = Counter(log["level"] for log in logs)
    return {
        "total_logs": len(logs),
        "by_level": dict(level_counts),
        "error_count": level_counts.get("ERROR", 0),
        "warning_count": level_counts.get("WARNING", 0),
        "info_count": level_counts.get("INFO", 0),
    }
@task
def identify_patterns(logs: List[dict], top_n: int = 3) -> List[str]:
    """Identify the most common error messages."""
    error_counts = Counter(log["message"] for log in logs if log["level"] == "ERROR")
    # Most frequent first; ties keep the order in which they were first seen
    return [message for message, _ in error_counts.most_common(top_n)]
@flow
def log_analysis_flow(log_path: str) -> dict:
    """Analyze log file for patterns and statistics."""
    lines = read_log_file(log_path=log_path)
    parsed = parse_log_lines(lines=lines)
    stats = aggregate_log_stats(logs=parsed)
    patterns = identify_patterns(logs=parsed)
    return {
        "statistics": stats,
        "error_patterns": patterns,
        "log_sample": parsed[:3],
    }
# Local execution
if __name__ == "__main__":
    result = log_analysis_flow.run_local(log_path="/var/log/app.log")
    print(f"Total logs: {result['statistics']['total_logs']}")
    print(f"Error count: {result['statistics']['error_count']}")
    print(f"Patterns: {result['error_patterns']}")
Key Concepts:
- Regex-based log parsing
- Log level aggregation
- Pattern identification
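Counting raw error messages treats "Authentication failed for user_123" and "Authentication failed for user_456" as two different errors. Normalizing variable tokens before counting lets such messages group into one pattern. The two regex rules below are illustrative assumptions that real logs will need tuned. normalize_message and top_error_patterns are hypothetical helpers.

```python
import re
from collections import Counter
from typing import List, Tuple

def normalize_message(message: str) -> str:
    """Replace variable tokens so similar messages group together."""
    # Identifiers like user_123 -> user_<id>
    message = re.sub(r"\b([A-Za-z]+)_\d+\b", r"\1_<id>", message)
    # Standalone numbers like 30 -> <num>
    message = re.sub(r"\b\d+\b", "<num>", message)
    return message

def top_error_patterns(messages: List[str], top_n: int = 3) -> List[Tuple[str, int]]:
    """Return (pattern, count) pairs, most frequent first."""
    return Counter(normalize_message(m) for m in messages).most_common(top_n)

patterns = top_error_patterns([
    "Authentication failed for user_123",
    "Authentication failed for user_456",
    "Database connection failed",
    "Query timed out after 30 seconds",
])
```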
29. Archive and Cleanup
Compress old files and delete expired content.
from dagy import flow, task
from datetime import datetime, timedelta
from typing import List
@task
def find_old_files(directory: str, days_old: int = 30) -> List[dict]:
    """Find files older than specified days."""
    cutoff_date = datetime.now() - timedelta(days=days_old)
    # Simulated directory listing
    candidates = [
        {"path": f"{directory}/file_001.log", "size": 5242880, "modified": (datetime.now() - timedelta(days=45)).isoformat()},
        {"path": f"{directory}/file_002.log", "size": 3145728, "modified": (datetime.now() - timedelta(days=60)).isoformat()},
    ]
    # Keep only files last modified before the cutoff
    return [f for f in candidates if datetime.fromisoformat(f["modified"]) < cutoff_date]
@task
def archive_files(old_files: List[dict], archive_path: str) -> dict:
    """Compress old files into archive."""
    total_size = sum(f["size"] for f in old_files)
    # Simulated compression (assume 70% compression ratio)
    compressed_size = int(total_size * 0.3)
    return {
        "files_archived": len(old_files),
        "original_size": total_size,
        "compressed_size": compressed_size,
        "archive_path": archive_path,
        "compression_ratio": 1 - (compressed_size / total_size) if total_size else 0,
    }
@task
def delete_expired_files(old_files: List[dict]) -> dict:
    """Delete files after archiving."""
    deleted_count = len(old_files)
    total_freed = sum(f["size"] for f in old_files)
    return {
        "files_deleted": deleted_count,
        "space_freed": total_freed,
        "status": "success",
    }
@flow
def archive_cleanup_flow(directory: str, days_old: int = 30) -> dict:
    """Archive old files and cleanup."""
    old_files = find_old_files(directory=directory, days_old=days_old)
    # Colon-free timestamp keeps the archive name a valid filename on every platform
    archive_name = datetime.now().strftime("%Y%m%dT%H%M%S")
    archive_result = archive_files(old_files=old_files, archive_path=f"/archive/{archive_name}.tar.gz")
    cleanup_result = delete_expired_files(old_files=old_files)
    return {
        "archive": archive_result,
        "cleanup": cleanup_result,
        "total_space_freed": archive_result["original_size"],
    }
# Local execution
if __name__ == "__main__":
    result = archive_cleanup_flow.run_local(directory="/var/log", days_old=30)
    print(f"Archived: {result['archive']['files_archived']} files")
    print(f"Space freed: {result['cleanup']['space_freed']} bytes")
Key Concepts:
- File age detection
- Compression and archiving
- Cleanup operations
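archive_files and delete_expired_files only simulate their work, and nothing in the simulated flow confirms that the archive is complete before files are deleted. Below is a standard-library sketch of the real operations using tarfile and pathlib. It re-opens the archive and checks that every file is present before deleting anything. archive_and_delete is a hypothetical helper that uses modification time (st_mtime) as the age signal. The demo runs entirely inside a temporary directory.

```python
import os
import tarfile
import tempfile
import time
from pathlib import Path
from typing import Dict, List

def archive_and_delete(directory: str, archive_path: str, days_old: int = 30) -> Dict[str, int]:
    """Tar+gzip files older than days_old, verify the archive, then delete the originals."""
    cutoff = time.time() - days_old * 86400
    old_files: List[Path] = [
        p for p in Path(directory).iterdir()
        if p.is_file() and p.stat().st_mtime < cutoff
    ]
    if not old_files:
        return {"files_archived": 0, "space_freed": 0}
    with tarfile.open(archive_path, "w:gz") as tar:
        for path in old_files:
            tar.add(path, arcname=path.name)  # store bare names, not absolute paths
    # Confirm every file made it into the archive before deleting anything
    with tarfile.open(archive_path, "r:gz") as tar:
        archived = set(tar.getnames())
    missing = [p.name for p in old_files if p.name not in archived]
    if missing:
        raise RuntimeError(f"Archive is missing {missing}; nothing was deleted")
    freed = 0
    for path in old_files:
        freed += path.stat().st_size
        path.unlink()
    return {"files_archived": len(old_files), "space_freed": freed}

with tempfile.TemporaryDirectory() as tmp:
    src = Path(tmp, "logs")
    src.mkdir()
    old = src / "old.log"
    old.write_text("x" * 100)
    (src / "new.log").write_text("y" * 10)
    forty_days_ago = time.time() - 40 * 86400
    os.utime(old, (forty_days_ago, forty_days_ago))  # backdate the "old" file
    summary = archive_and_delete(str(src), str(Path(tmp, "old_logs.tar.gz")))
    remaining = sorted(p.name for p in src.iterdir())
```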
30. File Format Migration
Convert between different file formats.
from dagy import flow, task
from typing import List
@task
def list_files_to_convert(directory: str, source_format: str) -> List[str]:
    """List files of source format."""
    # Simulated file listing
    return [
        f"{directory}/data_001.{source_format}",
        f"{directory}/data_002.{source_format}",
    ]
@task
def convert_format(
    file_path: str,
    source_format: str,
    target_format: str,
) -> dict:
    """Convert single file between formats."""
    filename = file_path.split("/")[-1].rsplit(".", 1)[0]
    output_path = f"/converted/{filename}.{target_format}"
    return {
        "source": file_path,
        "target": output_path,
        "source_format": source_format,
        "target_format": target_format,
        "status": "success",
    }
@task
def batch_convert(
    files: List[str],
    source_format: str,
    target_format: str,
) -> List[dict]:
    """Convert multiple files in batch."""
    results = []
    for file in files:
        result = convert_format(file, source_format, target_format)
        results.append(result)
    return results
@task
def validate_conversions(conversions: List[dict]) -> dict:
    """Validate conversion results."""
    successful = len([c for c in conversions if c["status"] == "success"])
    return {
        "total_conversions": len(conversions),
        "successful": successful,
        "failed": len(conversions) - successful,
        "success_rate": successful / len(conversions) if conversions else 0,
    }
@flow
def format_migration_flow(
    directory: str,
    source_format: str = "csv",
    target_format: str = "json",
) -> dict:
    """Migrate files between formats."""
    files = list_files_to_convert(directory=directory, source_format=source_format)
    conversions = batch_convert(
        files=files,
        source_format=source_format,
        target_format=target_format
    )
    validation = validate_conversions(conversions=conversions)
    return {
        "validation": validation,
        "conversions": conversions,
    }
# Local execution
if __name__ == "__main__":
    result = format_migration_flow.run_local(
        directory="/data",
        source_format="csv",
        target_format="json"
    )
    print(f"Success rate: {result['validation']['success_rate']:.1%}")
Key Concepts:
- File format conversion
- Batch processing
- Conversion validation
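convert_format only computes an output path. For the csv-to-json case the flow defaults to, the standard library's csv.DictReader and json modules can do the conversion itself. Below is a minimal sketch with hypothetical helper names. Values stay strings because CSV carries no type information. Quoted fields containing commas are handled by the csv module.

```python
import csv
import io
import json
from typing import Dict, List, TextIO

def csv_to_records(source: TextIO) -> List[Dict[str, str]]:
    """Read CSV with a header row into a list of dicts."""
    return list(csv.DictReader(source))

def convert_csv_file_to_json(csv_path: str, json_path: str) -> int:
    """Convert one CSV file to a JSON array of objects; returns the row count."""
    # The csv module docs recommend newline="" when opening CSV files
    with open(csv_path, newline="", encoding="utf-8") as src:
        records = csv_to_records(src)
    with open(json_path, "w", encoding="utf-8") as dst:
        json.dump(records, dst, indent=2)
    return len(records)

csv_text = 'id,name\n1,Alice\n2,"Smith, Bob"\n'
records = csv_to_records(io.StringIO(csv_text))
json_text = json.dumps(records, indent=2)
```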
Retry & Error Handling
31. Basic Retry
Simple retry with fixed delay between attempts.
from dagy import flow, task
@task(
    retries=3,
    retry_delay_seconds=2,
)
def unreliable_api_call(endpoint: str) -> dict:
    """Call external API that may fail."""
    import random
    # Simulates random failures (70% success rate)
    if random.random() < 0.7:
        return {"status": "success", "data": "API response"}
    else:
        raise Exception("API temporarily unavailable")
@task
def process_response(response: dict) -> str:
    """Process successful API response."""
    return f"Processed: {response['data']}"
@flow
def basic_retry_flow(endpoint: str) -> str:
    """Flow with basic retry on API calls."""
    response = unreliable_api_call(endpoint=endpoint)
    result = process_response(response=response)
    return result
# Local execution
if __name__ == "__main__":
    result = basic_retry_flow.run_local(endpoint="https://api.example.com/data")
    print(result)
Key Concepts:
- retries parameter sets the retry count
- retry_delay_seconds sets a fixed delay between retries
- Automatic retry on exception
32. Exponential Backoff
Increase the delay between retries to avoid overwhelming the system.
from dagy import flow, task
def exponential_backoff_schedule(attempt: int) -> float:
    """Calculate exponential backoff delay."""
    # 2^attempt seconds: 1, 2, 4, 8, 16...
    return min(2 ** attempt, 60)  # Cap at 60 seconds
@task(
    retries=5,
    # Base delay. exponential_backoff_schedule is not passed to the decorator here;
    # how per-attempt delays are supplied depends on Dagy's retry options.
    retry_delay_seconds=1,
    retry_condition_fn=lambda ex: "timeout" in str(ex).lower() or "unavailable" in str(ex).lower(),
)
def flaky_database_call(query: str) -> list:
    """Database call that may timeout."""
    import random
    if random.random() < 0.8:
        return [{"id": 1, "value": "data"}]
    else:
        raise TimeoutError("Database connection timeout")
@task
def aggregate_results(data: list) -> dict:
    """Aggregate database results."""
    return {
        "rows": len(data),
        "data": data,
    }
@flow
def exponential_backoff_flow(query: str) -> dict:
    """Flow with exponential backoff retry strategy."""
    data = flaky_database_call(query=query)
    result = aggregate_results(data=data)
    return result
# Local execution
if __name__ == "__main__":
    result = exponential_backoff_flow.run_local(query="SELECT * FROM users")
    print(result)
Key Concepts:
- Exponential backoff reduces system load
- retry_condition_fn for selective retries
- Backoff cap prevents excessive waits
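As written, the decorator above uses a fixed 1-second base delay, and exponential_backoff_schedule is never called. The plain-Python sketch below applies a capped doubling schedule inside a retry loop, independent of Dagy's own retry machinery. backoff_delays and call_with_backoff are hypothetical helpers. sleep is injectable so the delays can be inspected without waiting.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")

def backoff_delays(retries: int, base: float = 1.0, cap: float = 60.0) -> List[float]:
    """Delay before retry n (0-based) is min(base * 2**n, cap)."""
    return [min(base * 2 ** n, cap) for n in range(retries)]

def call_with_backoff(
    fn: Callable[[], T],
    retries: int = 5,
    base: float = 1.0,
    cap: float = 60.0,
    should_retry: Callable[[Exception], bool] = lambda exc: True,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying up to `retries` times with capped exponential delays."""
    delays = backoff_delays(retries, base, cap)
    for attempt in range(retries + 1):
        try:
            return fn()
        except Exception as exc:
            # Give up when out of retries or when the error is not worth retrying
            if attempt == retries or not should_retry(exc):
                raise
            sleep(delays[attempt])
    raise AssertionError("unreachable")

outcomes: list = [TimeoutError("timeout"), TimeoutError("timeout"), "ok"]
slept: List[float] = []

def flaky() -> str:
    """Fail twice with a timeout, then succeed."""
    result = outcomes.pop(0)
    if isinstance(result, Exception):
        raise result
    return result

value = call_with_backoff(flaky, retries=5, sleep=slept.append)
```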
33. Retry with Jitter
Add randomness to retry delays to prevent the thundering herd problem.
from dagy import flow, task
import random
@task(
    retries=4,
    retry_delay_seconds=2,
    retry_jitter_factor=0.3,  # Add up to 30% random jitter
)
def rate_limited_endpoint(item_id: str) -> dict:
    """Call rate-limited endpoint."""
    # Simulates rate limiting (60% success rate)
    if random.random() < 0.6:
        return {"item_id": item_id, "data": "success"}
    else:
        raise Exception("Rate limit exceeded")
@task
def process_item(item_data: dict) -> str:
    """Process fetched item."""
    return f"Processed item {item_data['item_id']}"
@flow
def jitter_retry_flow(item_id: str) -> str:
    """Flow with jittered retry delays."""
    item = rate_limited_endpoint(item_id=item_id)
    result = process_item(item_data=item)
    return result
# Local execution
if __name__ == "__main__":
    result = jitter_retry_flow.run_local(item_id="item_123")
    print(result)
Key Concepts:
- retry_jitter_factor adds randomness to retry delays
- Prevents synchronized retries from multiple clients
- Improves overall system stability
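The effect of a factor like 0.3 on a 2-second delay can be sketched in plain Python. This assumes jitter means adding a random fraction between 0 and the factor to the base delay. That matches the "up to 30%" comment in the example but is not confirmed SDK behavior. jittered_delay is a hypothetical helper.

```python
import random
from typing import Optional


def jittered_delay(base_delay: float, jitter_factor: float,
                   rng: Optional[random.Random] = None) -> float:
    """Stretch base_delay by a random fraction in [0, jitter_factor].

    Assumed semantics for illustration: base_delay=2 and jitter_factor=0.3
    give a delay between 2.0 and 2.6 seconds.
    """
    rng = rng or random.Random()
    return base_delay * (1 + rng.uniform(0, jitter_factor))


if __name__ == "__main__":
    rng = random.Random(42)  # Seeded so the demo is reproducible
    print([round(jittered_delay(2, 0.3, rng), 2) for _ in range(5)])
```

Each client draws its own random stretch. Clients that failed at the same moment therefore spread their next attempts over a window instead of retrying in lockstep.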
34. Conditional Retry
Retry only on specific exception types.
from dagy import flow, task
import random
class TransientError(Exception):
    """Temporary error that should be retried."""
class PermanentError(Exception):
    """Permanent error that shouldn't be retried."""
def should_retry(exception: Exception) -> bool:
    """Retry only transient errors."""
    return isinstance(exception, TransientError)
@task(
    retries=3,
    retry_delay_seconds=2,
    retry_condition_fn=should_retry,
)
def operation_with_error_types() -> str:
    """Operation that may fail with different error types (simulated)."""
    error_type = random.choice(["transient", "permanent", "success"])
    if error_type == "success":
        return "Operation successful"
    elif error_type == "transient":
        raise TransientError("Temporary network issue")
    else:
        raise PermanentError("Invalid input provided")
@task
def handle_result(result: str) -> dict:
    """Handle operation result."""
    return {"result": result, "status": "completed"}
@flow
def conditional_retry_flow() -> dict:
    """Flow with conditional retry logic."""
    try:
        result = operation_with_error_types()
        return handle_result(result=result)
    except PermanentError as e:
        # A TransientError that exhausts all retries is not caught here and still propagates
        return {"error": str(e), "status": "failed", "retryable": False}
# Local execution
if __name__ == "__main__":
    result = conditional_retry_flow.run_local()
    print(result)
Key Concepts:
- retry_condition_fn filters which exceptions trigger retries
- Custom exception types for different failure modes
- Fail-fast on permanent errors
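The fail-fast decision that retry_condition_fn drives can be sketched without the SDK. call_with_retry is a hypothetical helper and not part of Dagy. It retries only when the predicate accepts the exception, and it re-raises anything else on the first failure.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retry(
    fn: Callable[[], T],
    retries: int,
    should_retry: Callable[[Exception], bool],
    delay_seconds: float = 0.0,
) -> T:
    """Call fn, retrying up to `retries` times on exceptions should_retry accepts."""
    if retries < 0:
        raise ValueError("retries must be >= 0")
    for attempt in range(retries + 1):
        try:
            return fn()
        except Exception as exc:
            # Fail fast on non-retryable errors, and stop once retries are exhausted
            if not should_retry(exc) or attempt == retries:
                raise
            time.sleep(delay_seconds)


if __name__ == "__main__":
    class TransientError(Exception):
        pass

    outcomes = iter([TransientError("blip"), TransientError("blip"), "ok"])

    def flaky() -> str:
        result = next(outcomes)
        if isinstance(result, Exception):
            raise result
        return result

    print(call_with_retry(flaky, retries=3,
                          should_retry=lambda e: isinstance(e, TransientError)))  # ok
```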
35. Custom Retry Strategy
Implement custom retry logic beyond standard parameters.
from dagy import flow, task
from datetime import datetime
import random
import time
@task
def smart_retry_call(
    endpoint: str,
    max_attempts: int = 5,
) -> dict:
    """Task with a custom retry strategy."""
    # Simulate a flaky call (70% success) with exponential backoff between attempts
    for attempt in range(1, max_attempts + 1):
        try:
            if random.random() < 0.7:
                return {
                    "success": True,
                    "attempt": attempt,
                    "timestamp": datetime.now().isoformat(),
                    "endpoint": endpoint,
                }
            raise Exception(f"Attempt {attempt} failed")
        except Exception:
            if attempt == max_attempts:
                raise
            # Exponential backoff within the task: 1, 2, 4, 8... seconds, capped at 30
            wait_time = min(2 ** (attempt - 1), 30)
            print(f"Retry {attempt + 1} after {wait_time}s")
            time.sleep(wait_time)
    raise ValueError("max_attempts must be at least 1")
@task
def log_retry_details(result: dict) -> str:
    """Log details about the retry process."""
    return f"Succeeded on attempt {result['attempt']}"
@flow
def custom_retry_flow(endpoint: str) -> str:
    """Flow with custom retry strategy embedded in task."""
    result = smart_retry_call(endpoint=endpoint, max_attempts=5)
    log_msg = log_retry_details(result=result)
    return log_msg
# Local execution
if __name__ == "__main__":
    result = custom_retry_flow.run_local(endpoint="https://api.example.com")
    print(result)
Key Concepts:
- Embedded retry logic for complex strategies
- Custom backoff calculations
- Attempt tracking
36. Timeout with Fallback
Handle timeout gracefully with fallback logic.
from dagy import flow, task
from typing import Optional, Union
import time
@task(
    timeout_seconds=5,
)
def slow_operation(duration: float = 3) -> str:
    """Operation that times out if it runs longer than 5 seconds."""
    time.sleep(duration)
    return "Operation completed"
@task
def fallback_operation(original_input: str) -> str:
    """Fallback when timeout occurs."""
    return f"Using cached result for {original_input}"
@task
def merge_results(primary: Union[str, dict], fallback: Optional[str] = None) -> dict:
    """Merge primary and fallback results."""
    # The primary result is a string on success and an error dict on timeout
    primary_ok = not (isinstance(primary, dict) and "error" in primary)
    return {
        "primary_success": primary_ok,
        "used_fallback": fallback is not None,
        "result": primary if primary_ok else fallback,
    }
@flow
def timeout_fallback_flow(query: str, duration: float = 3) -> dict:
    """Handle timeout with fallback strategy."""
    try:
        primary = slow_operation(duration=duration)
        fallback_result = None
    except TimeoutError:
        # Assumes the task timeout surfaces in the flow as TimeoutError
        primary = {"error": "Operation timed out"}
        fallback_result = fallback_operation(original_input=query)
    result = merge_results(primary=primary, fallback=fallback_result)
    return result
# Local execution
if __name__ == "__main__":
    result = timeout_fallback_flow.run_local(query="search_term")
    print(result)
    # Pass duration=6 to exceed the 5-second timeout and exercise the fallback
Key Concepts:
- timeout_seconds parameter for task timeouts
- Fallback execution on timeout
- Graceful degradation pattern
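The same degrade-gracefully idea can be sketched with the standard library's concurrent.futures, independently of how the SDK enforces timeout_seconds. run_with_timeout is a hypothetical helper. It waits a bounded time for the primary call and otherwise returns the fallback.

```python
import concurrent.futures
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_timeout(fn: Callable[[], T], timeout_seconds: float,
                     fallback: Callable[[], T]) -> T:
    """Return fn() if it finishes within timeout_seconds, else fallback()."""
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = executor.submit(fn)
    try:
        return future.result(timeout=timeout_seconds)
    except concurrent.futures.TimeoutError:
        return fallback()
    finally:
        # Don't block on the slow call; its thread keeps running until fn returns
        executor.shutdown(wait=False)


if __name__ == "__main__":
    def slow_search() -> str:
        time.sleep(2)
        return "fresh result"

    print(run_with_timeout(slow_search, timeout_seconds=0.5,
                           fallback=lambda: "cached result"))  # cached result
```

Python threads cannot be cancelled, so the slow call keeps running in the background after the fallback is returned. A timeout that must actually stop the work needs a separate process or cooperative cancellation.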
Scheduling
37. Daily Report
Cron-scheduled daily aggregation and reporting.
from dagy import flow, task
from datetime import datetime, timedelta
from typing import Optional
@task
def fetch_daily_metrics(date: str) -> dict:
    """Fetch metrics for the specified date (simulated)."""
    return {
        "date": date,
        "active_users": 1250,
        "transactions": 3400,
        "revenue": 45230.50,
        "errors": 12,
    }
@task
def compute_daily_summary(metrics: dict) -> dict:
    """Compute daily summary statistics."""
    return {
        **metrics,
        "avg_transaction": metrics["revenue"] / metrics["transactions"],
        "error_rate": metrics["errors"] / metrics["transactions"] * 100,
    }
@task
def generate_report(summary: dict) -> str:
    """Generate human-readable daily report."""
    return f"""
Daily Report - {summary['date']}
================================
Active Users: {summary['active_users']}
Transactions: {summary['transactions']}
Revenue: ${summary['revenue']:,.2f}
Errors: {summary['errors']} ({summary['error_rate']:.2f}%)
Avg Transaction: ${summary['avg_transaction']:.2f}
"""
@task
def send_report_email(report: str, recipients: list) -> dict:
    """Send report via email (simulated)."""
    return {
        "status": "sent",
        "recipients": recipients,
        "timestamp": datetime.now().isoformat(),
    }
@flow(
    name="daily_report",
    description="Daily metrics aggregation and report",
)
def daily_report_flow(report_date: Optional[str] = None) -> dict:
    """Daily scheduled reporting flow."""
    if report_date is None:
        # Default to yesterday, the most recent complete day
        report_date = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
    metrics = fetch_daily_metrics(date=report_date)
    summary = compute_daily_summary(metrics=metrics)
    report = generate_report(summary=summary)
    email_result = send_report_email(
        report=report,
        recipients=["ops@example.com", "ceo@example.com"],
    )
    return {"report": report, "email": email_result}
# Local execution (in production this would be scheduled via a deployment)
if __name__ == "__main__":
    result = daily_report_flow.run_local(report_date="2024-01-01")
    print(result["report"])
Key Concepts:
- Daily scheduling pattern
- Date parameter for flexibility
- Email notifications
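Because daily_report_flow computes yesterday from datetime.now(), backfills and tests depend on the clock. A small helper that takes today's date as an argument keeps the calculation deterministic. previous_day is hypothetical and uses only the standard library.

```python
from datetime import date, timedelta
from typing import Optional


def previous_day(today: Optional[date] = None) -> str:
    """Return the day before `today` as YYYY-MM-DD (defaults to the current date)."""
    today = today or date.today()
    return (today - timedelta(days=1)).isoformat()


if __name__ == "__main__":
    print(previous_day(date(2024, 3, 1)))  # 2024-02-29 (2024 is a leap year)
    print(previous_day(date(2024, 1, 1)))  # 2023-12-31
```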
38. Hourly Sync
Interval-based data synchronization.
from dagy import flow, task
from datetime import datetime, timedelta
from typing import Optional
@task
def check_sync_status(last_sync: Optional[str] = None) -> dict:
    """Check when the last sync occurred."""
    if last_sync is None:
        last_sync = (datetime.now() - timedelta(hours=1)).isoformat()
    return {
        "last_sync": last_sync,
        "time_since_sync": 3600,  # seconds (simulated)
        "needs_sync": True,
    }
@task
def fetch_incremental_data(since: str) -> list:
    """Fetch data changed since the last sync (simulated)."""
    return [
        {"id": 1, "changed_at": "2024-01-01T10:00:00Z", "action": "create"},
        {"id": 2, "changed_at": "2024-01-01T10:30:00Z", "action": "update"},
        {"id": 3, "changed_at": "2024-01-01T11:00:00Z", "action": "delete"},
    ]
@task
def sync_to_target(data: list) -> dict:
    """Sync data to target system (simulated)."""
    return {
        "synced_count": len(data),
        "timestamp": datetime.now().isoformat(),
        "status": "success",
    }
@task
def update_sync_checkpoint(sync_time: str) -> str:
    """Update checkpoint for next sync."""
    return f"Checkpoint updated to {sync_time}"
@flow(
    name="hourly_sync",
    description="Hourly incremental data sync",
)
def hourly_sync_flow(last_sync: Optional[str] = None) -> dict:
    """Hourly data synchronization."""
    status = check_sync_status(last_sync=last_sync)
    data = fetch_incremental_data(since=status["last_sync"])
    sync_result = sync_to_target(data=data)
    # Checkpoint the time the sync actually completed, not when the flow body ran
    checkpoint = update_sync_checkpoint(sync_time=sync_result["timestamp"])
    return {
        "sync_result": sync_result,
        "checkpoint": checkpoint,
    }
# Local execution
if __name__ == "__main__":
    result = hourly_sync_flow.run_local()
    print(result)
Key Concepts:
- Hourly execution pattern
- Incremental data fetching
- Checkpoint tracking
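The simulated fetch_incremental_data ignores its since argument. The plain-Python sketch below shows what the incremental filter and the next checkpoint might look like. incremental_batch and parse_ts are hypothetical. All timestamps are assumed to carry a timezone (the trailing Z), because Python refuses to compare naive and timezone-aware datetimes.

```python
from datetime import datetime
from typing import List, Tuple


def parse_ts(value: str) -> datetime:
    """Parse an ISO 8601 timestamp, accepting a trailing 'Z' for UTC."""
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def incremental_batch(records: List[dict], since: str) -> Tuple[List[dict], str]:
    """Return records changed after `since` and the checkpoint for the next run.

    The new checkpoint is the newest changed_at seen, or `since` if nothing
    changed, so an empty run never moves the checkpoint forward.
    """
    cutoff = parse_ts(since)
    changed = [r for r in records if parse_ts(r["changed_at"]) > cutoff]
    if not changed:
        return [], since
    newest = max(changed, key=lambda r: parse_ts(r["changed_at"]))
    return changed, newest["changed_at"]


if __name__ == "__main__":
    records = [
        {"id": 1, "changed_at": "2024-01-01T10:00:00Z"},
        {"id": 2, "changed_at": "2024-01-01T10:30:00Z"},
        {"id": 3, "changed_at": "2024-01-01T11:00:00Z"},
    ]
    batch, checkpoint = incremental_batch(records, "2024-01-01T10:00:00Z")
    print([r["id"] for r in batch], checkpoint)  # [2, 3] 2024-01-01T11:00:00Z
```

The strict > comparison assumes no two changes share a timestamp at the checkpoint boundary. If they can, a common alternative is >= combined with de-duplication by record id.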
39. Business Hours Only
Cron scheduling for weekdays 9-5.
from dagy import flow, task
from datetime import datetime
@task
def get_business_requests(queue: str) -> list:
    """Get requests from the business hours queue (simulated)."""
    return [
        {"id": 1, "type": "support", "priority": "high"},
        {"id": 2, "type": "feature", "priority": "medium"},
    ]
# Plain helper rather than a task, so batch_process does not rely on
# calling one task from inside another
def process_request(request: dict) -> dict:
    """Process a single business request."""
    return {
        **request,
        "processed_at": datetime.now().isoformat(),
        "status": "completed",
    }
@task
def batch_process(requests: list) -> list:
    """Process multiple requests in one task."""
    return [process_request(req) for req in requests]
@task
def send_summary(processed: list) -> dict:
    """Send summary to business team (simulated)."""
    return {
        "total_processed": len(processed),
        "timestamp": datetime.now().isoformat(),
    }
@flow(
    name="business_hours_processor",
    description="Process requests during business hours",
)
def business_hours_flow() -> dict:
    """Process business-hours requests; the weekday 9-5 window is set by the deployment schedule."""
    requests = get_business_requests(queue="business")
    processed = batch_process(requests=requests)
    summary = send_summary(processed=processed)
    return {
        "processed_requests": processed,
        "summary": summary,
    }
# Local execution
if __name__ == "__main__":
    result = business_hours_flow.run_local()
    print(f"Processed: {result['summary']['total_processed']}")
Key Concepts:
- Business hours scheduling (weekdays 9-5)
- Request batching
- Team notifications
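The example does not show the schedule itself. In standard five-field cron syntax, 0 9-16 * * 1-5 fires at the top of each hour from 9:00 through 16:00, Monday to Friday. How a Dagy deployment accepts a cron string is not covered here. A guard inside the flow can also skip runs triggered outside the window. is_business_hours is a hypothetical helper.

```python
from datetime import datetime


def is_business_hours(moment: datetime, start_hour: int = 9, end_hour: int = 17) -> bool:
    """True on Monday-Friday between start_hour (inclusive) and end_hour (exclusive)."""
    is_weekday = moment.weekday() < 5  # Monday == 0 ... Sunday == 6
    return is_weekday and start_hour <= moment.hour < end_hour


if __name__ == "__main__":
    print(is_business_hours(datetime(2024, 1, 1, 10, 0)))  # True (a Monday)
    print(is_business_hours(datetime(2024, 1, 6, 10, 0)))  # False (a Saturday)
```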
40. Monthly Billing
First-of-month processing.
from dagy import flow, task
from datetime import datetime
@task
def identify_billable_accounts() -> list:
    """Find accounts to bill this month (simulated)."""
    return [
        {"account_id": "acct_1", "subscription_tier": "pro", "amount": 99.99},
        {"account_id": "acct_2", "subscription_tier": "enterprise", "amount": 499.99},
        {"account_id": "acct_3", "subscription_tier": "pro", "amount": 99.99},
    ]
@task
def generate_invoices(accounts: list) -> list:
    """Generate invoices for all accounts."""
    now = datetime.now()  # One timestamp so every invoice in the run matches
    invoices = []
    for account in accounts:
        invoices.append({
            "invoice_id": f"inv_{account['account_id']}_{now.strftime('%Y%m%d')}",
            "account_id": account["account_id"],
            "amount": account["amount"],
            "date": now.isoformat(),
        })
    return invoices
@task
def process_payments(invoices: list) -> dict:
    """Process payments for invoices (simulated)."""
    return {
        "invoices_processed": len(invoices),
        "total_revenue": sum(inv["amount"] for inv in invoices),
        "timestamp": datetime.now().isoformat(),
    }
@task
def notify_accounts(invoices: list) -> str:
    """Send payment notifications (simulated)."""
    return f"Notified {len(invoices)} accounts"
@flow(
    name="monthly_billing",
    description="Monthly billing and invoice processing",
)
def monthly_billing_flow() -> dict:
    """Monthly billing pipeline."""
    accounts = identify_billable_accounts()
    invoices = generate_invoices(accounts=accounts)
    payment_result = process_payments(invoices=invoices)
    notification = notify_accounts(invoices=invoices)
    return {
        "payment_result": payment_result,
        "notification": notification,
    }
# Local execution
if __name__ == "__main__":
    result = monthly_billing_flow.run_local()
    print(f"Revenue: ${result['payment_result']['total_revenue']:.2f}")
Key Concepts:
- Monthly scheduling pattern
- Batch invoice generation
- Payment processing orchestration
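A first-of-month run usually bills the month that just ended. The sketch below derives that period from any date using only the standard library. previous_month_period is hypothetical, and its output could feed invoice IDs or billing queries.

```python
from datetime import date, timedelta
from typing import Optional, Tuple


def previous_month_period(today: Optional[date] = None) -> Tuple[date, date]:
    """Return the first and last day of the month before `today`."""
    today = today or date.today()
    last_day = today.replace(day=1) - timedelta(days=1)  # Day before this month's 1st
    return last_day.replace(day=1), last_day


if __name__ == "__main__":
    start, end = previous_month_period(date(2024, 3, 1))
    print(start, end)  # 2024-02-01 2024-02-29
```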
41. One-Time Migration
Run-once setup flow for data migration.
from dagy import flow, task
from datetime import datetime
@task
def backup_current_state() -> str:
    """Create backup before migration (simulated)."""
    return f"Backup created at /backup/{datetime.now().isoformat()}"
@task
def migrate_data(backup_location: str) -> dict:
    """Perform data migration (simulated)."""
    return {
        "backup_location": backup_location,
        "records_migrated": 5000,
        "errors": 0,
        "status": "success",
    }
@task
def validate_migration(result: dict) -> bool:
    """Validate migration results."""
    return result["errors"] == 0
@task
def commit_migration(validated: bool) -> dict:
    """Finalize the migration if valid; otherwise roll back."""
    if validated:
        return {
            "migration_status": "committed",
            "timestamp": datetime.now().isoformat(),
        }
    return {"migration_status": "rolled_back"}
@flow(
    name="data_migration",
    description="One-time data migration flow",
)
def migration_flow() -> dict:
    """Execute one-time data migration."""
    backup = backup_current_state()
    migration = migrate_data(backup_location=backup)
    is_valid = validate_migration(result=migration)
    commit = commit_migration(validated=is_valid)
    return {
        "backup": backup,
        "migration": migration,
        "commit": commit,
    }
# Local execution
if __name__ == "__main__":
    result = migration_flow.run_local()
    print(f"Status: {result['commit']['migration_status']}")
Key Concepts:
- One-time migration pattern
- Backup and validation
- Commit/rollback logic
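The backup, validate, then commit-or-roll-back sequence can be sketched in memory. migrate_with_rollback is a hypothetical helper. It runs the migration on a deep copy, so the original state stays untouched unless validation passes. A real migration would snapshot a database or files rather than a dict.

```python
import copy
from typing import Callable, Tuple


def migrate_with_rollback(
    state: dict,
    migrate: Callable[[dict], dict],
    validate: Callable[[dict], bool],
) -> Tuple[dict, bool]:
    """Run migrate on a copy of state; return (resulting_state, committed)."""
    working = copy.deepcopy(state)  # The original is never mutated
    try:
        migrated = migrate(working)
    except Exception:
        return state, False  # Migration crashed: keep the original (roll back)
    if not validate(migrated):
        return state, False  # Validation failed: roll back
    return migrated, True


if __name__ == "__main__":
    def add_emails(s: dict) -> dict:
        for user in s["users"]:
            user["email"] = f"{user['name']}@example.com"
        return s

    original = {"users": [{"id": 1, "name": "alice"}]}
    new_state, committed = migrate_with_rollback(
        original, add_emails, lambda s: all("email" in u for u in s["users"]))
    print(committed, new_state)
```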
Notifications & Monitoring
42. Failure Alert Flow
Notify operators when pipeline fails.
from dagy import flow, task
@task(
    on_failure="log_failure"
)
def risky_operation(item_id: str) -> dict:
    """Operation that might fail."""
    if item_id == "bad_item":
        raise ValueError(f"Cannot process {item_id}")
    return {"item_id": item_id, "processed": True}
@task
def log_failure(error: Exception, context: dict) -> str:
    """Log failure details."""
    return f"Failure logged: {error}"
@task
def send_alert(error_message: str) -> dict:
    """Send alert notification (simulated)."""
    return {
        "alert_sent": True,
        "channel": "slack",
        "message": f"Pipeline failed: {error_message}",
        "recipients": ["ops@example.com"],
    }
@task
def create_incident(error_info: str) -> str:
    """Create incident ticket (simulated)."""
    return f"Incident #INC-001 created for {error_info}"
@flow
def failure_alert_flow(item_id: str) -> dict:
    """Flow with failure notifications."""
    try:
        result = risky_operation(item_id=item_id)
        return {"status": "success", "result": result}
    except Exception as e:
        error_msg = str(e)
        log_msg = log_failure(error=e, context={"item_id": item_id})
        alert = send_alert(error_message=error_msg)
        incident = create_incident(error_info=error_msg)
        return {
            "status": "failed",
            "error": error_msg,
            "alert": alert,
            "incident": incident,
        }
# Local execution
if __name__ == "__main__":
    result = failure_alert_flow.run_local(item_id="bad_item")
    print(result)
Key Concepts:
- Error handling with callbacks
- Alert escalation
- Incident tracking
43. SLA Monitoring Flow
Alert when run duration exceeds SLA threshold.
from dagy import flow, task
import time
from datetime import datetime
@task
def start_sla_timer() -> dict:
    """Record start time for SLA tracking."""
    return {"start_time": datetime.now()}
@task
def execute_work(duration: float = 2) -> dict:
    """Execute main work (simulated with sleep)."""
    time.sleep(duration)
    return {"work_completed": True}
@task
def check_sla(start_time: dict, work: dict, sla_seconds: int = 5) -> dict:
    """Check if the SLA was met."""
    # `work` is not read here; taking it as an input makes this task wait for
    # execute_work, so the elapsed time covers the work itself
    end_time = datetime.now()
    elapsed = (end_time - start_time["start_time"]).total_seconds()
    return {
        "elapsed_seconds": elapsed,
        "sla_seconds": sla_seconds,
        "sla_met": elapsed <= sla_seconds,
        "exceeded_by": max(0, elapsed - sla_seconds),
    }
@task
def alert_sla_breach(sla_info: dict) -> dict:
    """Alert if the SLA was breached."""
    if not sla_info["sla_met"]:
        return {
            "alert_level": "warning",
            "message": f"SLA breached by {sla_info['exceeded_by']:.1f}s",
            "severity": "high" if sla_info["exceeded_by"] > 10 else "medium",
        }
    return {"alert_level": "none", "message": "SLA met"}
@flow
def sla_monitoring_flow(sla_threshold: int = 5, work_duration: float = 3) -> dict:
    """Monitor operation against SLA."""
    timer = start_sla_timer()
    work = execute_work(duration=work_duration)
    sla_check = check_sla(start_time=timer, work=work, sla_seconds=sla_threshold)
    alert = alert_sla_breach(sla_info=sla_check)
    return {
        "sla_check": sla_check,
        "alert": alert,
    }
# Local execution
if __name__ == "__main__":
    result = sla_monitoring_flow.run_local(sla_threshold=5)
    print(f"SLA: {result['sla_check']['sla_met']}")
    print(f"Alert: {result['alert']['message']}")
Key Concepts:
- SLA tracking
- Duration monitoring
- Performance alerts
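The flow measures elapsed time with datetime.now(). That is a wall-clock reading, and it can jump when the system clock is adjusted. Inside a single task, time.monotonic() is the safer clock for durations. Python only guarantees that the difference between two of its readings is meaningful, so start and end should be taken in the same process. sla_timer is a hypothetical context manager.

```python
import time
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def sla_timer(sla_seconds: float) -> Iterator[dict]:
    """Time the wrapped block with a monotonic clock and record SLA status."""
    info: dict = {"sla_seconds": sla_seconds}
    start = time.monotonic()  # Unaffected by system clock adjustments
    try:
        yield info
    finally:
        elapsed = time.monotonic() - start
        info.update(
            elapsed_seconds=elapsed,
            sla_met=elapsed <= sla_seconds,
            exceeded_by=max(0.0, elapsed - sla_seconds),
        )


if __name__ == "__main__":
    with sla_timer(sla_seconds=1.0) as sla:
        time.sleep(0.2)  # Stand-in for the real work
    print(sla)
```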
44. Success Summary
Send completion digest with metrics.
from dagy import flow, task
from datetime import datetime
from typing import Optional
@task
def execute_workflow_steps() -> dict:
    """Execute main workflow (simulated)."""
    return {
        "tasks_completed": 5,
        "records_processed": 1250,
        "errors": 0,
        "start_time": datetime.now(),
    }
@task
def compute_metrics(result: dict) -> dict:
    """Compute workflow metrics."""
    duration_seconds = 42  # Simulated run duration
    return {
        **result,
        "success_rate": 1.0,
        "end_time": datetime.now(),
        "duration_seconds": duration_seconds,
        "throughput": result["records_processed"] / duration_seconds,
    }
@task
def format_summary(metrics: dict) -> str:
    """Format metrics as summary message."""
    return f"""
Workflow Completed Successfully
===============================
Tasks Completed: {metrics['tasks_completed']}
Records Processed: {metrics['records_processed']}
Duration: {metrics['duration_seconds']}s
Throughput: {metrics['throughput']:.1f} records/sec
Success Rate: {metrics['success_rate']:.1%}
Errors: {metrics['errors']}
"""
@task
def send_summary_email(summary: str, recipients: list) -> dict:
    """Send summary to stakeholders (simulated)."""
    return {
        "status": "sent",
        "recipients": recipients,
        "timestamp": datetime.now().isoformat(),
    }
@flow
def success_summary_flow(recipients: Optional[list] = None) -> dict:
    """Send success completion digest."""
    if recipients is None:
        recipients = ["team@example.com"]
    workflow_result = execute_workflow_steps()
    metrics = compute_metrics(result=workflow_result)
    summary = format_summary(metrics=metrics)
    email = send_summary_email(summary=summary, recipients=recipients)
    return {
        "summary": summary,
        "email_status": email["status"],
    }
# Local execution
if __name__ == "__main__":
    result = success_summary_flow.run_local()
    print(result["summary"])
Key Concepts:
- Completion metrics
- Summary formatting
- Team notifications
45. Multi-Channel Alert
Send alerts to Slack and email simultaneously.
from dagy import flow, task
from datetime import datetime
from typing import Optional
@task
def detect_alert_condition(metric: float, threshold: float = 0.8) -> bool:
    """Check if the alert condition is met."""
    return metric > threshold
@task
def format_alert_message(condition: bool, metric: float) -> str:
    """Format alert message."""
    if condition:
        return f"ALERT: Metric {metric:.2f} exceeds threshold"
    return f"INFO: Metric {metric:.2f} is normal"
@task
def send_slack_notification(message: str, channel: str = "#alerts") -> dict:
    """Send alert to Slack (simulated)."""
    return {
        "channel": channel,
        "message": message,
        "status": "sent",
        "timestamp": datetime.now().isoformat(),
    }
@task
def send_email_notification(message: str, recipients: list) -> dict:
    """Send alert via email (simulated)."""
    return {
        "recipients": recipients,
        "subject": "System Alert",
        "body": message,
        "status": "sent",
    }
@task
def aggregate_notifications(slack: dict, email: dict) -> dict:
    """Aggregate notification results."""
    return {
        "channels_notified": ["slack", "email"],
        "slack": slack,
        "email": email,
        "all_sent": slack["status"] == "sent" and email["status"] == "sent",
    }
@flow
def multi_channel_alert_flow(
    metric: float,
    threshold: float = 0.8,
    recipients: Optional[list] = None,
) -> dict:
    """Send alerts to multiple channels."""
    if recipients is None:
        recipients = ["ops@example.com"]
    has_alert = detect_alert_condition(metric=metric, threshold=threshold)
    message = format_alert_message(condition=has_alert, metric=metric)
    # Both sends depend only on `message`, so they can run concurrently
    slack_result = send_slack_notification(message=message, channel="#alerts")
    email_result = send_email_notification(message=message, recipients=recipients)
    aggregated = aggregate_notifications(slack=slack_result, email=email_result)
    return aggregated
# Local execution
if __name__ == "__main__":
    result = multi_channel_alert_flow.run_local(metric=0.95, threshold=0.8)
    print(f"All sent: {result['all_sent']}")
Key Concepts:
- Multiple notification channels
- Parallel alert dispatch
- Delivery confirmation
Advanced Patterns
46. Parallel Processing
Execute multiple independent tasks in parallel.
from dagy import flow, task
@task
def process_region(region: str, data: list) -> dict:
    """Process data for a single region."""
    return {
        "region": region,
        "processed": len(data),
        "revenue": sum(d.get("value", 0) for d in data),
    }
@task
def aggregate_regional_results(
    region_a: dict,
    region_b: dict,
    region_c: dict,
) -> dict:
    """Combine results from all regions."""
    all_results = [region_a, region_b, region_c]
    return {
        "total_processed": sum(r["processed"] for r in all_results),
        "total_revenue": sum(r["revenue"] for r in all_results),
        "by_region": all_results,
    }
@flow
def parallel_processing_flow() -> dict:
    """Process multiple regions in parallel."""
    region_data = [
        {"region": "A", "data": [{"value": 100}, {"value": 200}]},
        {"region": "B", "data": [{"value": 150}, {"value": 250}]},
        {"region": "C", "data": [{"value": 120}, {"value": 180}]},
    ]
    # The three task runs are independent of each other, so with
    # max_workers > 1 the executor can run them concurrently
    result_a = process_region(region="A", data=region_data[0]["data"])
    result_b = process_region(region="B", data=region_data[1]["data"])
    result_c = process_region(region="C", data=region_data[2]["data"])
    aggregated = aggregate_regional_results(
        region_a=result_a,
        region_b=result_b,
        region_c=result_c,
    )
    return aggregated
# Local execution with parallel workers
if __name__ == "__main__":
    result = parallel_processing_flow.run_local(max_workers=3)
    print(f"Total revenue: ${result['total_revenue']}")
Key Concepts:
- Independent task execution
- max_workers for parallelism
- Result aggregation from parallel paths
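Conceptually, a max_workers setting bounds how many independent tasks run at once. The standard-library sketch below shows the same fan-out/fan-in shape with a thread pool. fan_out_fan_in is hypothetical and says nothing about how the Dagy executor is implemented.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List


def process_region(region: str, data: List[dict]) -> dict:
    """Summarise one region's records."""
    return {
        "region": region,
        "processed": len(data),
        "revenue": sum(d.get("value", 0) for d in data),
    }


def fan_out_fan_in(regions: Dict[str, List[dict]], max_workers: int = 3) -> dict:
    """Process each region in a worker thread, then aggregate the results."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map preserves input order, so results line up with regions.keys()
        results = list(pool.map(process_region, regions.keys(), regions.values()))
    return {
        "total_processed": sum(r["processed"] for r in results),
        "total_revenue": sum(r["revenue"] for r in results),
        "by_region": results,
    }


if __name__ == "__main__":
    regions = {
        "A": [{"value": 100}, {"value": 200}],
        "B": [{"value": 150}, {"value": 250}],
        "C": [{"value": 120}, {"value": 180}],
    }
    print(fan_out_fan_in(regions)["total_revenue"])  # 1000
```

Threads suit I/O-bound work. CPU-bound pure-Python work generally needs ProcessPoolExecutor instead because of the global interpreter lock.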
47. Diamond Dependency
The A → (B, C) → D pattern: one upstream task feeds two independent tasks, whose results merge in a final task.
from dagy import flow, task
@task
def fetch_user_data(user_id: str) -> dict:
    """Fetch initial user data (simulated)."""
    return {
        "user_id": user_id,
        "name": "Alice",
        "account_id": "acct_123",
    }
@task
def enrich_with_profile(user: dict) -> dict:
    """Enrich with profile information."""
    return {
        **user,
        "profile": {
            "tier": "premium",
            "created": "2024-01-01",
        },
    }
@task
def enrich_with_activity(user: dict) -> dict:
    """Enrich with activity information."""
    return {
        **user,
        "activity": {
            "last_login": "2024-01-15",
            "login_count": 42,
        },
    }
@task
def merge_enriched_data(profile_enriched: dict, activity_enriched: dict) -> dict:
    """Merge both enrichments."""
    return {
        "user_id": profile_enriched["user_id"],
        "name": profile_enriched["name"],
        "profile": profile_enriched.get("profile"),
        "activity": activity_enriched.get("activity"),
        "complete": True,
    }
@flow
def diamond_dependency_flow(user_id: str) -> dict:
    """Execute diamond dependency pattern: A→B,C→D."""
    # A: Fetch base data
    user = fetch_user_data(user_id=user_id)
    # B and C: Parallel enrichment (both depend on A)
    profile = enrich_with_profile(user=user)
    activity = enrich_with_activity(user=user)
    # D: Merge results (depends on B and C)
    merged = merge_enriched_data(profile_enriched=profile, activity_enriched=activity)
    return merged
# Local execution
if __name__ == "__main__":
    result = diamond_dependency_flow.run_local(user_id="usr_123")
    print(result)
Key Concepts:
- Multiple dependency paths
- Parallel middle stages
- Convergence at final stage
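The execution order a diamond produces can be computed with the standard library's graphlib module (Python 3.9+). execution_waves is a hypothetical helper that groups tasks into waves whose dependencies are all complete. It illustrates dependency-driven scheduling in general, not Dagy's internal scheduler.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set


def execution_waves(graph: Dict[str, Set[str]]) -> List[List[str]]:
    """Group tasks into waves; every task in a wave has all its dependencies done."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # Raises graphlib.CycleError (a ValueError) on cycles
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # Sorted only for stable output
        waves.append(ready)
        sorter.done(*ready)
    return waves


if __name__ == "__main__":
    # Each task maps to the set of tasks it depends on
    diamond = {
        "fetch_user_data": set(),
        "enrich_with_profile": {"fetch_user_data"},
        "enrich_with_activity": {"fetch_user_data"},
        "merge_enriched_data": {"enrich_with_profile", "enrich_with_activity"},
    }
    print(execution_waves(diamond))
    # [['fetch_user_data'], ['enrich_with_activity', 'enrich_with_profile'], ['merge_enriched_data']]
```

Every task in the middle wave can run concurrently, which is exactly the parallelism the diamond exposes.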
48. Dynamic Parameters
Compute parameters at runtime for subsequent tasks.
from dagy import flow, task
import os
@task
def determine_execution_environment() -> dict:
    """Determine runtime environment and parameters."""
    env = os.environ.get("ENV", "dev")
    config = {
        "dev": {"batch_size": 10, "timeout": 30, "retry_count": 3},
        "staging": {"batch_size": 100, "timeout": 60, "retry_count": 2},
        "prod": {"batch_size": 1000, "timeout": 120, "retry_count": 1},
    }
    # Unknown environments fall back to the dev settings
    return config.get(env, config["dev"])
@task
def fetch_data_with_config(config: dict) -> list:
    """Fetch data using dynamic configuration (simulated)."""
    batch_size = config["batch_size"]
    return [{"id": i, "value": i * 10} for i in range(batch_size)]
@task
def process_batch(data: list, timeout: int) -> dict:
    """Process batch with computed timeout."""
    return {
        "processed": len(data),
        "timeout_seconds": timeout,
        "status": "success",
    }
@flow
def dynamic_parameters_flow() -> dict:
    """Use runtime-computed parameters."""
    config = determine_execution_environment()
    data = fetch_data_with_config(config=config)
    result = process_batch(data=data, timeout=config["timeout"])
    return {
        "config": config,
        "result": result,
    }
# Local execution
if __name__ == "__main__":
    result = dynamic_parameters_flow.run_local()
    print(f"Configuration: {result['config']}")
Key Concepts:
- Runtime parameter computation
- Environment-based configuration
- Dynamic task behavior
49. Checkpointing
Save intermediate state for recovery.
from dagy import flow, task
from datetime import datetime
@task
def load_checkpoint(checkpoint_file: str) -> dict:
    """Load the previous checkpoint if one exists."""
    # Simulated checkpoint loading: no checkpoint found yet
    return {
        "checkpoint_exists": False,
        "last_processed_id": 0,
        "file": checkpoint_file,
    }
@task
def process_with_checkpoint(
    start_id: int,
    batch_size: int = 5,
) -> dict:
    """Process one batch of data starting at start_id."""
    data = [{"id": i} for i in range(start_id, start_id + batch_size)]
    return {
        "processed_ids": [d["id"] for d in data],
        "last_id": start_id + batch_size - 1,
        "batch_size": len(data),
    }
@task
def save_checkpoint(progress: dict, checkpoint_file: str) -> str:
    """Save progress checkpoint."""
    checkpoint = {
        "last_processed_id": progress["last_id"],
        "timestamp": datetime.now().isoformat(),
    }
    # Simulated save
    return f"Checkpoint saved: {checkpoint}"
@task
def resume_processing(progress: dict) -> dict:
    """Report where the next run should resume."""
    return {
        "resumed_from_id": progress["last_id"] + 1,
        "status": "resumed",
    }
@flow
def checkpointing_flow(checkpoint_file: str = "/checkpoint/state.json") -> dict:
    """Flow with checkpointing for recovery."""
    checkpoint = load_checkpoint(checkpoint_file=checkpoint_file)
    # Resume after the last processed ID; start at 0 when there is no checkpoint
    if checkpoint["checkpoint_exists"]:
        start_id = checkpoint["last_processed_id"] + 1
    else:
        start_id = 0
    progress = process_with_checkpoint(start_id=start_id, batch_size=5)
    saved = save_checkpoint(progress=progress, checkpoint_file=checkpoint_file)
    resumed = resume_processing(progress=progress)
    return {
        "progress": progress,
        "checkpoint": saved,
        "next_step": resumed,
    }
# Local execution
if __name__ == "__main__":
    result = checkpointing_flow.run_local()
    print(result)
Key Concepts:
- State persistence
- Recovery from checkpoints
- Progress tracking
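The checkpoint tasks in this example only simulate loading and saving. A minimal file-based version using JSON might look like this. read_checkpoint_file and write_checkpoint_file are hypothetical names. The sketch writes to a temporary file in the same directory and then calls os.replace, so a process crash mid-write leaves the previous checkpoint intact.

```python
import json
import os
import tempfile
from typing import Optional


def read_checkpoint_file(path: str) -> Optional[dict]:
    """Return the saved checkpoint, or None if no checkpoint file exists yet."""
    try:
        with open(path, encoding="utf-8") as fh:
            return json.load(fh)
    except FileNotFoundError:
        return None


def write_checkpoint_file(path: str, state: dict) -> None:
    """Write state to a temp file, then swap it into place in one step."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            json.dump(state, fh)
        os.replace(tmp_path, path)  # Atomic on POSIX (same filesystem)
    except BaseException:
        os.remove(tmp_path)
        raise


if __name__ == "__main__":
    path = os.path.join(tempfile.gettempdir(), "dagy_checkpoint_demo.json")  # Hypothetical path
    previous = read_checkpoint_file(path)
    start_id = previous["last_processed_id"] + 1 if previous else 0
    write_checkpoint_file(path, {"last_processed_id": start_id + 4})
    print(f"Processed IDs {start_id}..{start_id + 4}")
```

For durability across power loss, the temporary file would also need to be flushed and fsync'd before the replace. That step is omitted here.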
50. Idempotent Tasks
Design tasks that are safe to re-run without repeating side effects.
from dagy import flow, task
from datetime import datetime
import hashlib
import json
@task
def generate_idempotent_key(input_data: dict) -> str:
    """Generate a deterministic key from the input."""
    data_str = json.dumps(input_data, sort_keys=True)
    return hashlib.sha256(data_str.encode()).hexdigest()
@task
def execute_idempotent_operation(
    operation_id: str,
    operation_data: dict,
) -> dict:
    """Execute the operation (the idempotency check happens in the flow)."""
    # Simulated operation
    return {
        "operation_id": operation_id,
        "data": operation_data,
        "result": "processed",
        "timestamp": datetime.now().isoformat(),
    }
@task
def check_operation_exists(operation_id: str) -> bool:
    """Check if the operation was already executed."""
    # Simulated database lookup
    return False  # Not executed yet
@task
def store_operation_result(operation: dict) -> str:
    """Store the result for idempotency tracking."""
    return f"Operation {operation['operation_id']} stored"
@flow
def idempotent_flow(input_data: dict) -> dict:
    """Execute operation with idempotency guarantees."""
    operation_id = generate_idempotent_key(input_data=input_data)
    # Skip if this exact input was already processed
    exists = check_operation_exists(operation_id=operation_id)
    if exists:
        return {"status": "skipped", "reason": "already_executed", "operation_id": operation_id}
    # Execute and store
    result = execute_idempotent_operation(
        operation_id=operation_id,
        operation_data=input_data,
    )
    stored = store_operation_result(operation=result)
    return {
        "operation_id": operation_id,
        "result": result,
        "storage": stored,
    }
# Local execution
if __name__ == "__main__":
    input_data = {"user_id": "usr_123", "action": "create_account"}
    result = idempotent_flow.run_local(input_data=input_data)
    print(result)
Key Concepts:
- Deterministic operation keys
- Duplicate detection
- Safe re-execution
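Skipping duplicates requires a store of keys that have already run. The sketch below keeps results in an in-memory dict keyed by a SHA-256 hash of the canonical JSON payload. IdempotentRunner and idempotency_key are hypothetical.

```python
import hashlib
import json
from typing import Callable, Dict


def idempotency_key(payload: dict) -> str:
    """Deterministic key: identical payloads hash the same regardless of key order."""
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


class IdempotentRunner:
    """Run each distinct payload once; repeats return the stored result."""

    def __init__(self, operation: Callable[[dict], dict]):
        self.operation = operation
        self.results: Dict[str, dict] = {}  # Stand-in for a durable store

    def run(self, payload: dict) -> dict:
        key = idempotency_key(payload)
        if key not in self.results:
            self.results[key] = self.operation(payload)
        return self.results[key]


if __name__ == "__main__":
    runner = IdempotentRunner(lambda payload: {"result": "processed", "data": payload})
    first = runner.run({"user_id": "usr_123", "action": "create_account"})
    again = runner.run({"action": "create_account", "user_id": "usr_123"})  # Same payload, new key order
    print(first is again)  # True: the second call reused the stored result
```

An in-memory dict only deduplicates within one process. Across runs or workers, the keys need durable storage, and the check-then-insert should be atomic (for example, a unique constraint on the key column) so two concurrent runs cannot both pass the existence check.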
51. Resource-Intensive Flow
Configure flow for resource-intensive operations.
from dagy import flow, task
from typing import Optional
@task(
    concurrency_limit=2,  # At most 2 runs of this task at once
    timeout_seconds=300,
)
def heavy_computation(data_size: int) -> dict:
    """CPU-intensive task."""
    # Simulated heavy computation
    result_sum = sum(range(data_size))
    return {
        "data_size": data_size,
        "result": result_sum,
        "computation_complete": True,
    }
@task
def aggregate_results(results: list) -> dict:
    """Aggregate computation results."""
    return {
        "total_computations": len(results),
        "combined_result": sum(r["result"] for r in results),
    }
@flow
def resource_intensive_flow(sizes: Optional[list] = None) -> dict:
    """Execute resource-intensive operations with limits."""
    if sizes is None:
        sizes = [1000000, 2000000, 1500000]
    # Each call is a separate task run; concurrency_limit caps how many run at once
    results = [heavy_computation(data_size=size) for size in sizes]
    aggregated = aggregate_results(results=results)
    return aggregated
# Local execution with limited workers
if __name__ == "__main__":
    result = resource_intensive_flow.run_local(max_workers=2)
    print(f"Total: {result['combined_result']}")
Key Concepts:
- concurrency_limit for resource control
- Timeout for long operations
- Worker limiting
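A cap like concurrency_limit=2 can be sketched with threading.BoundedSemaphore. This limits concurrency within a single process only. Whether Dagy's concurrency_limit applies per process or across workers is not specified here. ConcurrencyLimiter is a hypothetical class that also records peak concurrency so the cap can be checked.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class ConcurrencyLimiter:
    """Allow at most `limit` callers inside the guarded section at once."""

    def __init__(self, limit: int):
        self._slots = threading.BoundedSemaphore(limit)
        self._lock = threading.Lock()
        self.active = 0
        self.peak = 0  # Highest concurrency observed, for inspection

    def run(self, fn, *args):
        with self._slots:  # Blocks while `limit` calls are already running
            with self._lock:
                self.active += 1
                self.peak = max(self.peak, self.active)
            try:
                return fn(*args)
            finally:
                with self._lock:
                    self.active -= 1


if __name__ == "__main__":
    limiter = ConcurrencyLimiter(limit=2)

    def heavy(n: int) -> int:
        time.sleep(0.1)  # Stand-in for real work
        return sum(range(n))

    with ThreadPoolExecutor(max_workers=6) as pool:
        totals = list(pool.map(lambda n: limiter.run(heavy, n), [10**5, 2 * 10**5, 3 * 10**5]))
    print(totals, "peak concurrency:", limiter.peak)
```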
52. Long-Running Flow
Design flows for operations that take hours.
from dagy import flow, task
from datetime import datetime
@task
def start_long_operation(operation_id: str) -> dict:
    """Initiate long-running operation (simulated)."""
    return {
        "operation_id": operation_id,
        "started_at": datetime.now().isoformat(),
        "estimated_duration_hours": 2,
    }
@task
def poll_operation_status(operation_id: str, max_polls: int = 120) -> dict:
    """Poll operation status until it completes."""
    # Simulated polling: a real implementation would check the status
    # repeatedly, up to max_polls times, before giving up
    return {
        "operation_id": operation_id,
        "status": "completed",
        "completed_at": datetime.now().isoformat(),
        "total_polls": 5,
    }
@task
def collect_long_operation_results(operation: dict) -> dict:
    """Collect results from completed operation (simulated)."""
    return {
        "operation_id": operation["operation_id"],
        "result_count": 5000,
        "success_rate": 0.99,
        "errors": 50,
    }
@flow
def long_running_flow(operation_id: str) -> dict:
    """Execute long-running asynchronous operation."""
    start = start_long_operation(operation_id=operation_id)
    # Reading the ID from `start` makes polling wait until the operation has started
    status = poll_operation_status(operation_id=start["operation_id"])
    results = collect_long_operation_results(operation=status)
    duration = (
        datetime.fromisoformat(status["completed_at"])
        - datetime.fromisoformat(start["started_at"])
    )
    return {
        "operation_id": operation_id,
        "duration_seconds": duration.total_seconds(),
        "results": results,
    }
# Local execution
if __name__ == "__main__":
    result = long_running_flow.run_local(operation_id="op_20240101_001")
    print(f"Duration: {result['duration_seconds']:.1f}s")
    print(f"Results: {result['results']['result_count']}")
Key Concepts:
- Polling pattern for async operations
- Status tracking
- Long-duration timeout configuration
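poll_operation_status returns immediately because it is simulated. A real poll loop typically sleeps between status checks and gives up at a deadline. poll_until_done is a hypothetical helper. The terminal status names "completed" and "failed" are assumptions. The clock and sleep functions are injectable so the loop can be tested without waiting.

```python
import time
from typing import Callable


def poll_until_done(
    get_status: Callable[[], str],
    interval_seconds: float = 30.0,
    timeout_seconds: float = 2 * 60 * 60,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Poll get_status until it returns a terminal status or the deadline passes."""
    deadline = clock() + timeout_seconds
    while True:
        status = get_status()
        if status in ("completed", "failed"):
            return status
        if clock() >= deadline:
            raise TimeoutError(f"Operation still {status!r} after {timeout_seconds}s")
        sleep(interval_seconds)


if __name__ == "__main__":
    fake_statuses = iter(["running", "running", "completed"])
    print(poll_until_done(lambda: next(fake_statuses), interval_seconds=0.1))  # completed
```

The two-hour default mirrors the estimated_duration_hours value in the example. In practice, the deadline should sit comfortably above the expected duration.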
Real-World Scenarios
53. E-Commerce Order Pipeline
Complete order processing from order to fulfillment.
from dagy import flow, task
from typing import Dict
@task
def create_order(user_id: str, items: list, total: float) -> dict:
"""Create new order in system."""
return {
"order_id": f"ord_{user_id}_{__import__('datetime').datetime.now().isoformat().replace(':', '').replace('-', '')}",
"user_id": user_id,
"items": items,
"total": total,
"status": "created",
}
@task
def process_payment(order: dict, payment_method: str) -> dict:
"""Process payment for order."""
return {
"order_id": order["order_id"],
"amount": order["total"],
"payment_method": payment_method,
"transaction_id": f"txn_{order['order_id']}",
"status": "completed",
}
@task
def update_inventory(order: dict) -> dict:
"""Update inventory for ordered items."""
return {
"order_id": order["order_id"],
"items_reserved": len(order["items"]),
"status": "reserved",
}
@task
def create_fulfillment(order: dict, inventory_update: dict) -> dict:
"""Create fulfillment for order."""
return {
"order_id": order["order_id"],
"fulfillment_id": f"ful_{order['order_id']}",
"status": "pending",
"shipping_address": "123 Main St",
}
@task
def send_order_confirmation(order: dict, payment: dict) -> str:
"""Send confirmation email to customer."""
return f"Confirmation email sent to user {order['user_id']}"
@flow
def ecommerce_order_flow(
user_id: str,
items: list,
total: float,
payment_method: str = "credit_card",
) -> dict:
"""Complete order pipeline: create → pay → inventory → fulfill → notify."""
order = create_order(user_id=user_id, items=items, total=total)
payment = process_payment(order=order, payment_method=payment_method)
inventory = update_inventory(order=order)
fulfillment = create_fulfillment(order=order, inventory_update=inventory)
confirmation = send_order_confirmation(order=order, payment=payment)
return {
"order": order,
"payment": payment,
"fulfillment": fulfillment,
"confirmation": confirmation,
}
# Local execution
if __name__ == "__main__":
items = [
{"sku": "WIDGET-001", "qty": 2, "price": 29.99},
{"sku": "GADGET-001", "qty": 1, "price": 49.99},
]
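# Derive the total from the line items so the two cannot disagree
total = round(sum(item["qty"] * item["price"] for item in items), 2)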
result = ecommerce_order_flow.run_local(
user_id="usr_123",
items=items,
total=total,
payment_method="credit_card"
)
print(f"Order: {result['order']['order_id']}")
print(f"Payment status: {result['payment']['status']}")
Key Concepts:
- Multi-stage order processing
- Payment orchestration
- Cross-system coordination
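The __main__ block above computes total with float arithmetic and passes it to the flow separately from items. The two values can therefore drift apart, and binary floats cannot represent prices like 29.99 exactly. The following is a minimal sketch that derives the total from the line items with decimal.Decimal. order_total is a hypothetical helper and is not part of Dagy.

```python
from decimal import Decimal, ROUND_HALF_UP


def order_total(items: list) -> Decimal:
    """Sum qty * price over the line items, rounded to cents."""
    total = sum(
        # str() keeps the price as written instead of the float's binary approximation
        (Decimal(str(item["price"])) * item["qty"] for item in items),
        Decimal("0"),
    )
    return total.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
```

For the two sample items, this returns Decimal("109.97").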
54. CI/CD Pipeline
Automate a release: check out, build, test, deploy to staging, run smoke tests, then promote to production.
from dagy import flow, task
from datetime import datetime
@task
def checkout_code(repo: str, branch: str) -> dict:
"""Check out source code from repository."""
return {
"repo": repo,
"branch": branch,
"commit": "abc123def456",
"status": "checked_out",
}
@task
def build_application(code: dict) -> dict:
"""Build application artifacts."""
return {
"build_id": f"build_{code['commit'][:7]}",
"artifact": "app-1.0.0.jar",
"status": "success",
"duration_seconds": 45,
}
@task
def run_tests(build: dict) -> dict:
"""Run automated tests."""
return {
"build_id": build["build_id"],
"tests_run": 250,
"tests_passed": 248,
"tests_failed": 2,
"coverage": 0.87,
}
@task(
retries=2,
timeout_seconds=300,
)
def deploy_to_staging(build: dict) -> dict:
"""Deploy to staging environment."""
return {
"build_id": build["build_id"],
"environment": "staging",
"url": "https://staging-app.example.com",
"status": "deployed",
}
@task
def run_smoke_tests(deployment: dict) -> bool:
"""Run smoke tests on deployed app."""
return True # Simulated: all smoke tests pass
@task
def deploy_to_production(build: dict) -> dict:
"""Deploy the build to production (the flow calls this only after all checks pass)."""
return {
"build_id": build["build_id"],
"environment": "production",
"url": "https://app.example.com",
"status": "deployed",
"deployment_time": datetime.now().isoformat(),
}
@flow
def cicd_pipeline_flow(repo: str, branch: str) -> dict:
"""Complete CI/CD: checkout → build → test → stage → verify → prod."""
code = checkout_code(repo=repo, branch=branch)
build = build_application(code=code)
tests = run_tests(build=build)
if tests["tests_failed"] > 0:
return {
"status": "failed",
"reason": f"{tests['tests_failed']} test failures",
"build_id": build["build_id"],
}
staging = deploy_to_staging(build=build)
smoke_ok = run_smoke_tests(deployment=staging)
if smoke_ok:
prod = deploy_to_production(build=build)
return {
"status": "success",
"build_id": build["build_id"],
"staging": staging,
"production": prod,
}
return {"status": "smoke_tests_failed", "build_id": build["build_id"], "staging": staging}
# Local execution
if __name__ == "__main__":
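# run_tests is simulated with 2 failures, so this run stops at the test gate
# and prints "Status: failed"; set tests_failed to 0 to exercise the deploy path.
result = cicd_pipeline_flow.run_local(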
repo="https://github.com/example/app.git",
branch="main"
)
print(f"Status: {result['status']}")
Key Concepts:
- Sequential build stages
- Quality gates: failing tests stop the pipeline before any deployment
- Environment-based deployment
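cicd_pipeline_flow decides whether to promote a build using tests_failed alone, even though run_tests also reports coverage. The following is a minimal sketch of a combined quality gate. It is written as a plain function so it can be unit-tested without Dagy. evaluate_quality_gate is a hypothetical name, and the 80% coverage floor and zero-failure limit are illustrative assumptions.

```python
from typing import List, Tuple


def evaluate_quality_gate(
    tests: dict, min_coverage: float = 0.80, max_failures: int = 0
) -> Tuple[bool, List[str]]:
    """Return (passed, reasons) for a run_tests-style result dict."""
    reasons = []
    if tests["tests_failed"] > max_failures:
        reasons.append(f"{tests['tests_failed']} test failures (allowed: {max_failures})")
    if tests["coverage"] < min_coverage:
        reasons.append(f"coverage {tests['coverage']:.0%} below {min_coverage:.0%}")
    # The gate passes only when no reason to block was collected
    return (not reasons, reasons)
```

Because the reasons are returned alongside the verdict, the flow can include them in its "failed" result instead of building a single hard-coded message.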
55. Customer Onboarding
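Automated setup for a new customer: validation, account creation, default settings, resource provisioning, and welcome communications.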
from dagy import flow, task
from datetime import datetime
@task
def validate_signup(email: str, name: str) -> dict:
"""Validate customer signup data."""
return {
"email": email,
"name": name,
"valid": "@" in email and len(name) > 2,
}
@task
def create_customer_account(validation: dict) -> dict:
"""Create account in system."""
customer_id = f"cust_{int(datetime.now().timestamp())}"
return {
"customer_id": customer_id,
"email": validation["email"],
"name": validation["name"],
"created_at": datetime.now().isoformat(),
"status": "active",
}
@task
def configure_account_settings(customer: dict) -> dict:
"""Configure default account settings."""
return {
"customer_id": customer["customer_id"],
"settings": {
"plan": "free",
"notifications": True,
"api_enabled": True,
},
"status": "configured",
}
@task
def provision_resources(customer: dict, settings: dict) -> dict:
"""Provision API keys, storage, etc."""
return {
"customer_id": customer["customer_id"],
"api_key": f"sk_{customer['customer_id']}",
"storage_gb": 1,
"status": "provisioned",
}
@task
def send_welcome_email(customer: dict, resources: dict) -> str:
"""Send welcome email to new customer."""
return f"Welcome email sent to {customer['email']}"
@task
def schedule_onboarding_call(customer: dict) -> dict:
"""Schedule initial onboarding call."""
return {
"customer_id": customer["customer_id"],
"call_scheduled": True,
"date": "2024-01-10T10:00:00Z",
"duration_minutes": 30,
}
@flow
def customer_onboarding_flow(
email: str,
name: str,
) -> dict:
"""Complete customer onboarding workflow."""
validation = validate_signup(email=email, name=name)
if not validation["valid"]:
return {"status": "validation_failed", "reason": "Invalid input"}
customer = create_customer_account(validation=validation)
settings = configure_account_settings(customer=customer)
resources = provision_resources(customer=customer, settings=settings)
welcome = send_welcome_email(customer=customer, resources=resources)
call = schedule_onboarding_call(customer=customer)
return {
"customer": customer,
"resources": resources,
"welcome_email": welcome,
"onboarding_call": call,
"status": "onboarded",
}
# Local execution
if __name__ == "__main__":
result = customer_onboarding_flow.run_local(
email="newcustomer@example.com",
name="Jane Doe"
)
print(f"Customer ID: {result['customer']['customer_id']}")
print(f"Status: {result['status']}")
Key Concepts:
- Multi-stage customer activation
- Resource provisioning
- Automated communications
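create_customer_account builds customer_id from the current Unix timestamp in seconds. Re-running the task after a partial failure therefore creates a second account with a different ID, and two signups in the same second get the same ID. The following is a minimal sketch of a deterministic alternative, in line with the Idempotency guideline: it derives the ID from the normalized email address using SHA-256. customer_id_for is a hypothetical helper. Lowercasing the whole address is an assumption: RFC 5321 allows case-sensitive local parts, although most mail providers ignore case.

```python
import hashlib


def customer_id_for(email: str) -> str:
    """Derive a stable customer ID from a normalized email address."""
    normalized = email.strip().lower()
    digest = hashlib.sha256(normalized.encode("utf-8")).hexdigest()
    # 16 hex characters (64 bits) keeps IDs short while making collisions unlikely
    return f"cust_{digest[:16]}"
```

With a deterministic ID, a retry produces the same customer_id, so the account-creation step can upsert instead of inserting a duplicate.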
56. Financial Reconciliation
Match bank transactions against the internal ledger and report discrepancies on either side.
from dagy import flow, task
from typing import List
@task
def fetch_bank_transactions(account: str, date_range: str) -> List[dict]:
"""Fetch transactions from bank."""
return [
{"id": "bank_001", "amount": 100.00, "date": "2024-01-01", "description": "Payment"},
{"id": "bank_002", "amount": 250.50, "date": "2024-01-02", "description": "Deposit"},
]
@task
def fetch_internal_transactions(account: str, date_range: str) -> List[dict]:
"""Fetch transactions from internal ledger."""
return [
{"id": "int_001", "amount": 100.00, "date": "2024-01-01", "description": "Payment"},
{"id": "int_002", "amount": 250.50, "date": "2024-01-02", "description": "Deposit"},
{"id": "int_003", "amount": 50.00, "date": "2024-01-02", "description": "Fee"},
]
@task
def match_transactions(bank: List[dict], internal: List[dict]) -> dict:
"""Match bank to internal transactions."""
matched = []
unmatched_bank = bank.copy()
unmatched_internal = internal.copy()
for ext in bank:
for i, int_txn in enumerate(unmatched_internal):
if ext["amount"] == int_txn["amount"]:
matched.append({"bank": ext, "internal": int_txn})
unmatched_internal.pop(i)
unmatched_bank.remove(ext)
break
return {
"matched": len(matched),
"unmatched_bank": unmatched_bank,
"unmatched_internal": unmatched_internal,
}
@task
def generate_reconciliation_report(match_result: dict) -> dict:
"""Generate reconciliation report."""
return {
"matched_count": match_result["matched"],
"discrepancies": len(match_result["unmatched_bank"]) + len(match_result["unmatched_internal"]),
# Balanced only when nothing is left unmatched on either side
"reconciliation_status": "balanced" if not match_result["unmatched_bank"] and not match_result["unmatched_internal"] else "discrepancies_found",
"unmatched_transactions": {
"from_bank": match_result["unmatched_bank"],
"from_internal": match_result["unmatched_internal"],
}
}
@flow
def reconciliation_flow(account: str, date_range: str) -> dict:
"""Reconcile bank and internal transactions."""
bank_txns = fetch_bank_transactions(account=account, date_range=date_range)
internal_txns = fetch_internal_transactions(account=account, date_range=date_range)
matches = match_transactions(bank=bank_txns, internal=internal_txns)
report = generate_reconciliation_report(match_result=matches)
return report
# Local execution
if __name__ == "__main__":
result = reconciliation_flow.run_local(
account="ACC123",
date_range="2024-01-01:2024-01-31"
)
print(f"Status: {result['reconciliation_status']}")
print(f"Discrepancies: {result['discrepancies']}")
Key Concepts:
- Transaction matching algorithm
- Discrepancy detection
- Financial reporting
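match_transactions compares amounts only and rescans the internal list for every bank row. The following is a minimal sketch of key-based matching. It assumes that a bank row and a ledger row correspond when they share the same date and amount, and it compares amounts as Decimal values rounded to cents rather than as raw floats. match_by_key is a hypothetical name. It returns the same shape as match_transactions, so generate_reconciliation_report can consume it unchanged.

```python
from collections import defaultdict
from decimal import Decimal
from typing import Dict, List, Tuple


def match_by_key(bank: List[dict], internal: List[dict]) -> dict:
    """Match transactions one-to-one when they share the same (date, amount) key."""
    def key(txn: dict) -> Tuple[str, Decimal]:
        # Compare amounts as cents-precision Decimals to avoid float equality issues
        return (txn["date"], Decimal(str(txn["amount"])).quantize(Decimal("0.01")))

    # Bucket internal transactions by key; each bucket is a queue of candidates
    buckets: Dict[Tuple[str, Decimal], List[dict]] = defaultdict(list)
    for txn in internal:
        buckets[key(txn)].append(txn)

    matched, unmatched_bank = [], []
    for txn in bank:
        candidates = buckets.get(key(txn))
        if candidates:
            matched.append({"bank": txn, "internal": candidates.pop(0)})
        else:
            unmatched_bank.append(txn)

    # Whatever is still in a bucket had no bank counterpart
    unmatched_internal = [t for bucket in buckets.values() for t in bucket]
    return {
        "matched": len(matched),
        "unmatched_bank": unmatched_bank,
        "unmatched_internal": unmatched_internal,
    }
```

Bucketing by key makes matching roughly linear in the number of transactions. Popping candidates one at a time keeps duplicates (for example, two identical 10.00 charges on the same day) matched one-to-one.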
57. Content Publishing Pipeline
Draft → review → transform → publish workflow.
from dagy import flow, task
from datetime import datetime
import html
@task
def fetch_draft_content(content_id: str) -> dict:
"""Fetch draft content."""
return {
"content_id": content_id,
"title": "Sample Article",
"body": "This is the article content...",
"status": "draft",
"created_by": "author@example.com",
}
@task
def perform_content_review(content: dict) -> dict:
"""Review content for quality."""
issues = []
if len(content["title"]) < 5:
issues.append("Title too short")
if len(content["body"]) < 100:
issues.append("Content too short")
return {
"content_id": content["content_id"],
"approved": len(issues) == 0,
"issues": issues,
}
@task
def transform_for_publication(content: dict) -> dict:
"""Transform content to publication format."""
return {
"content_id": content["content_id"],
"title": content["title"],
"body": content["body"],
"slug": content["title"].lower().replace(" ", "-"),
# Escape title and body so draft text cannot inject HTML into the page
"html": f"<article><h1>{html.escape(content['title'])}</h1><p>{html.escape(content['body'])}</p></article>",
"publication_ready": True,
}
@task
def publish_to_website(transformed: dict) -> dict:
"""Publish to website."""
return {
"content_id": transformed["content_id"],
"url": f"https://blog.example.com/{transformed['slug']}",
"published_at": datetime.now().isoformat(),
"status": "published",
}
@flow
def publishing_pipeline_flow(content_id: str) -> dict:
"""Content publishing: draft → review → transform → publish."""
content = fetch_draft_content(content_id=content_id)
review = perform_content_review(content=content)
if not review["approved"]:
return {
"status": "rejected",
"content_id": content_id,
"issues": review["issues"],
}
transformed = transform_for_publication(content=content)
published = publish_to_website(transformed=transformed)
return {
"status": "published",
"content_id": content_id,
"url": published["url"],
}
# Local execution
if __name__ == "__main__":
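# The sample draft body is under 100 characters, so review rejects it;
# use a longer body in fetch_draft_content to exercise the publish path.
result = publishing_pipeline_flow.run_local(content_id="content_001")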
print(f"Status: {result['status']}")
Key Concepts:
- Multi-stage approval workflow
- Content transformation
- Publication orchestration
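transform_for_publication builds the slug with title.lower().replace(" ", "-"), which keeps punctuation and non-ASCII characters in the URL. For example, a title like "Café: 10 Tips!" would become "café:-10-tips!". The following is a minimal standard-library sketch of a stricter slugify. The exact rules (ASCII only, hyphen-separated) are an assumption, not a Dagy or blog-platform requirement.

```python
import re
import unicodedata


def slugify(title: str) -> str:
    """Lowercase ASCII slug: strip accents, collapse non-alphanumerics to '-'."""
    # NFKD splits accented letters into base letter + combining mark; the mark is then dropped
    ascii_title = (
        unicodedata.normalize("NFKD", title).encode("ascii", "ignore").decode("ascii")
    )
    slug = re.sub(r"[^a-z0-9]+", "-", ascii_title.lower())
    return slug.strip("-")
```

For example, slugify("Café: 10 Tips & Tricks!") returns "cafe-10-tips-tricks".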
Best Practices
General Guidelines
- Task Design: Keep each task focused on a single responsibility
- Error Handling: Configure retries and timeouts on tasks that call unreliable systems
- Data Flow: Pass task return values directly as inputs to downstream tasks
- Testing: Test flows locally with run_local() before deploying them
- Monitoring: Implement alerts and logging for production flows
- Documentation: Add docstrings to all tasks and flows
- Configuration: Use flow parameters for environment-specific behavior
- Performance: Use max_workers for parallel task execution
- Resilience: Implement checkpoints for long-running operations
- Idempotency: Design tasks to be safely re-executable
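The Resilience guideline recommends checkpoints for long-running operations. No Dagy checkpoint API is shown here, so the following is a plain-Python sketch under that assumption. Each named step's result is stored in a local JSON file, and a re-run returns the saved result instead of repeating the work. run_with_checkpoint and the file layout are hypothetical, and results must be JSON-serializable.

```python
import json
from pathlib import Path
from typing import Any, Callable


def run_with_checkpoint(step_name: str, checkpoint_file: Path, fn: Callable[[], Any]) -> Any:
    """Run fn once per step_name; later calls return the saved result."""
    state = json.loads(checkpoint_file.read_text()) if checkpoint_file.exists() else {}
    if step_name in state:
        return state[step_name]  # Completed in an earlier run: skip the work
    result = fn()
    state[step_name] = result
    # Write to a temp file, then rename, so a crash mid-write leaves the old checkpoint intact
    tmp = checkpoint_file.with_name(checkpoint_file.name + ".tmp")
    tmp.write_text(json.dumps(state))
    tmp.replace(checkpoint_file)
    return result
```

Keying the checkpoint by step name also supports the Idempotency guideline: re-executing a flow after a failure only redoes the steps that never finished.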
Deployment
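import os
# DagyClient is assumed to be exported by the top-level dagy package
from dagy import DagyClient, build_artifact, deploy_artifact
# Build artifact for deployment (your_flow_instance is your @flow-decorated function)
artifact = build_artifact(your_flow_instance)
# Deploy to Dagy platform; read the API key from the environment (hypothetical variable name)
client = DagyClient(api_key=os.environ["DAGY_API_KEY"], api_url="https://api.dagy.io")
deployment = deploy_artifact(artifact, client)
print(f"Deployed flow ID: {deployment['flow_id']}")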
Local Testing Pattern
# Test with different parameters (your_flow, param1 and param2 are placeholders)
test_cases = [
{"param1": "value1", "param2": "value2"},
{"param1": "value3", "param2": "value4"},
]
for test_case in test_cases:
result = your_flow.run_local(**test_case, fail_fast=True)
assert result["status"] == "success", f"Unexpected result for {test_case}: {result}"
print(f"Test passed: {test_case}")
Additional Resources
- Dagy Official Documentation
- Python Decorators Guide
- Task Configuration Reference
- Flow Configuration Reference
- Deployment Guide
Last Updated: January 2024
SDK Version: 1.0+
Cookbook Version: 2.0