
Dagy SDK Cookbook

A comprehensive guide with 50+ practical examples for building workflows with the Dagy Python SDK. Each example is complete, runnable, and demonstrates real-world patterns.

Table of Contents

  1. Getting Started
  2. Data Engineering
  3. API & Web
  4. ML & AI
  5. File Processing
  6. Retry & Error Handling
  7. Scheduling
  8. Notifications & Monitoring
  9. Advanced Patterns
  10. Real-World Scenarios

Getting Started

1. Hello World - Simplest Flow

The most basic Dagy flow with a single task that returns a greeting.

from dagy import flow, task

@task
def greet(name: str) -> str:
    """Simple greeting task."""
    return f"Hello, {name}!"

@flow
def hello_world_flow(name: str = "World") -> str:
    """Simplest Dagy flow - single task."""
    return greet(name=name)

# Local execution
if __name__ == "__main__":
    result = hello_world_flow.run_local(name="Alice")
    print(result)  # Output: Hello, Alice!

Key Concepts:

  • @task decorator wraps functions as executable tasks
  • @flow decorator creates a workflow orchestrating tasks
  • run_local() executes the flow locally for testing
  • Return values pass between tasks and flows

2. Two-Task Chain - Passing Data Between Tasks

Chain two tasks where the first task's output becomes the second task's input.

from dagy import flow, task

@task
def fetch_user(user_id: int) -> dict:
    """Fetch user data from simulated database."""
    users = {
        1: {"id": 1, "name": "Alice", "email": "alice@example.com"},
        2: {"id": 2, "name": "Bob", "email": "bob@example.com"},
    }
    return users.get(user_id, {})

@task
def format_user_info(user: dict) -> str:
    """Format user data as a readable string."""
    if not user:
        return "User not found"
    return f"User: {user['name']} ({user['email']})"

@flow
def user_info_flow(user_id: int) -> str:
    """Chain two tasks: fetch user, then format info."""
    user_data = fetch_user(user_id=user_id)
    formatted = format_user_info(user=user_data)
    return formatted

# Local execution
if __name__ == "__main__":
    result = user_info_flow.run_local(user_id=1)
    print(result)  # Output: User: Alice (alice@example.com)

Key Concepts:

  • A task's return value can be passed straight into the next task as an argument
  • The calls in the flow body define the pipeline: fetch_user runs first, and its result feeds format_user_info
  • Each task is independent and reusable

3. Multi-Step Pipeline - 3+ Tasks with Data Flow

Build a more complex pipeline with multiple sequential tasks.

from dagy import flow, task
import json
from datetime import datetime

@task
def extract_data(source: str) -> list:
    """Extract raw data from source."""
    # Simulated data extraction
    return [
        {"id": 1, "value": 100, "timestamp": "2024-01-01"},
        {"id": 2, "value": 200, "timestamp": "2024-01-02"},
        {"id": 3, "value": 150, "timestamp": "2024-01-03"},
    ]

@task
def transform_data(raw_data: list) -> list:
    """Transform and clean data."""
    transformed = []
    for record in raw_data:
        transformed.append({
            "id": record["id"],
            "value": float(record["value"]),
            "date": record["timestamp"],
            "processed_at": datetime.now().isoformat(),
        })
    return transformed

@task
def validate_data(data: list) -> dict:
    """Validate transformed data and return stats."""
    valid_count = len(data)
    total_value = sum(record["value"] for record in data)
    return {
        "valid_records": valid_count,
        "total_value": total_value,
        "average": total_value / valid_count if valid_count > 0 else 0,
    }

@task
def store_results(stats: dict) -> str:
    """Store validation results."""
    result_json = json.dumps(stats, indent=2)
    # In production: store to database or file
    return result_json

@flow
def data_pipeline(source: str = "database") -> str:
    """Multi-step ETL pipeline."""
    raw = extract_data(source=source)
    transformed = transform_data(raw_data=raw)
    stats = validate_data(data=transformed)
    results = store_results(stats=stats)
    return results

# Local execution
if __name__ == "__main__":
    result = data_pipeline.run_local(source="production_db")
    print(result)

Key Concepts:

  • Multi-step sequential processing
  • Data transformation at each stage
  • Aggregation and validation steps

4. Parameterized Flow - Accepting Runtime Parameters

Create flexible flows that accept various parameters at runtime.

from dagy import flow, task
from enum import Enum

class DataFormat(str, Enum):
    """Supported data formats."""
    JSON = "json"
    CSV = "csv"
    XML = "xml"

@task
def load_data(source: str) -> list:
    """Load data from source."""
    return [
        {"id": 1, "name": "Item A", "price": 29.99},
        {"id": 2, "name": "Item B", "price": 49.99},
        {"id": 3, "name": "Item C", "price": 19.99},
    ]

@task
def format_output(data: list, format: DataFormat, include_total: bool = True) -> str:
    """Format data according to specified format."""
    if format == DataFormat.JSON:
        import json
        output = {"items": data}
        if include_total:
            # Round to cents to avoid float artifacts such as 99.97000000000001
            output["total"] = round(sum(item["price"] for item in data), 2)
        return json.dumps(output)

    elif format == DataFormat.CSV:
        lines = ["id,name,price"]
        for item in data:
            lines.append(f"{item['id']},{item['name']},{item['price']}")
        if include_total:
            total = sum(item["price"] for item in data)
            # Leave the name column empty so the total lines up under "price"
            lines.append(f"TOTAL,,{total:.2f}")
        return "\n".join(lines)

    # XML (and any unhandled format) is not implemented; fall back to repr
    return str(data)

@flow
def parameterized_flow(
    source: str = "default",
    output_format: DataFormat = DataFormat.JSON,
    include_total: bool = True,
) -> str:
    """Flow accepting multiple runtime parameters."""
    data = load_data(source=source)
    formatted = format_output(
        data=data,
        format=output_format,
        include_total=include_total
    )
    return formatted

# Local execution with different parameters
if __name__ == "__main__":
    # JSON format with totals
    result1 = parameterized_flow.run_local(
        source="api",
        output_format=DataFormat.JSON,
        include_total=True
    )
    print("JSON Output:")
    print(result1)

    # CSV format without totals
    result2 = parameterized_flow.run_local(
        source="database",
        output_format=DataFormat.CSV,
        include_total=False
    )
    print("\nCSV Output:")
    print(result2)

Key Concepts:

  • Type-annotated parameters for runtime configuration
  • Enum types for valid option constraints
  • Flexible flows supporting multiple use cases
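DataFormat.XML is declared above, but format_output has no XML branch and falls back to str(data). A minimal sketch of what that branch could return, using the standard library's xml.etree.ElementTree; the to_xml name and the <items>/<item>/<total> element names are illustrative assumptions, not part of the Dagy SDK.

```python
import xml.etree.ElementTree as ET


def to_xml(data: list, include_total: bool = True) -> str:
    """Serialize a list of flat dicts as an <items> XML document."""
    root = ET.Element("items")
    for item in data:
        element = ET.SubElement(root, "item")
        for key, value in item.items():
            # Each dict key becomes a child element holding the value as text
            ET.SubElement(element, key).text = str(value)
    if include_total:
        total = sum(item["price"] for item in data)
        ET.SubElement(root, "total").text = f"{total:.2f}"
    # encoding="unicode" returns a str without an XML declaration
    return ET.tostring(root, encoding="unicode")
```

Inside format_output, an elif branch for DataFormat.XML that returns to_xml(data, include_total) would wire it in.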

5. Local Development - Using run_local() for Testing

Best practices for testing flows locally before deployment.

from dagy import flow, task
import logging

# Configure logging for debugging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@task
def step_one(value: int) -> int:
    """First processing step."""
    logger.info(f"Step 1: Processing value {value}")
    result = value * 2
    logger.info(f"Step 1: Result {result}")
    return result

@task
def step_two(value: int) -> int:
    """Second processing step."""
    logger.info(f"Step 2: Processing value {value}")
    result = value + 10
    logger.info(f"Step 2: Result {result}")
    return result

@task
def step_three(value: int) -> dict:
    """Final processing step."""
    logger.info(f"Step 3: Processing value {value}")
    result = {
        "final_value": value,
        "is_even": value % 2 == 0,
        "square": value ** 2,
    }
    logger.info(f"Step 3: Result {result}")
    return result

@flow
def local_dev_flow(initial_value: int) -> dict:
    """Flow designed for local testing and debugging."""
    val1 = step_one(value=initial_value)
    val2 = step_two(value=val1)
    result = step_three(value=val2)
    return result

# Local testing with different parameters
if __name__ == "__main__":
    print("=== Test 1: Basic execution ===")
    result1 = local_dev_flow.run_local(initial_value=5)
    print(f"Result: {result1}\n")

    print("=== Test 2: Different input ===")
    result2 = local_dev_flow.run_local(initial_value=10)
    print(f"Result: {result2}\n")

    print("=== Test 3: Edge case - zero ===")
    result3 = local_dev_flow.run_local(initial_value=0)
    print(f"Result: {result3}\n")

    # Test with fail_fast to stop on first error
    print("=== Test 4: With fail_fast ===")
    result4 = local_dev_flow.run_local(initial_value=3, fail_fast=True)
    print(f"Result: {result4}\n")

    # Test with max_workers for parallel execution
    print("=== Test 5: With max_workers ===")
    result5 = local_dev_flow.run_local(initial_value=7, max_workers=2)
    print(f"Result: {result5}")

Key Concepts:

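  • fail_fast=True stops the run at the first task failure
  • max_workers limits how many tasks can run in parallel during local testing; every task in this flow depends on the previous one, so it has no visible effect here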
  • Logging for debugging task execution flow
  • Testing multiple scenarios before deployment

Data Engineering

6. CSV to JSON Converter

Convert CSV data to a JSON document that wraps the records with a count and metadata, then validate the result.

from dagy import flow, task
import csv
import json
import io

@task
def read_csv(csv_content: str) -> list:
    """Parse CSV content into list of dicts."""
    reader = csv.DictReader(io.StringIO(csv_content))
    return list(reader)

@task
def transform_to_json(records: list, pretty: bool = True) -> str:
    """Convert records to JSON format."""
    data = {
        "count": len(records),
        "records": records,
        "metadata": {
            "source": "csv_converter",
            "record_count": len(records),
        }
    }
    if pretty:
        return json.dumps(data, indent=2)
    return json.dumps(data)

@task
def validate_json(json_str: str) -> dict:
    """Validate and return JSON structure info."""
    data = json.loads(json_str)
    return {
        "is_valid": True,
        "record_count": data.get("count", 0),
        "has_metadata": "metadata" in data,
    }

@flow
def csv_to_json_flow(csv_content: str, pretty: bool = True) -> dict:
    """Convert CSV to JSON with validation."""
    records = read_csv(csv_content=csv_content)
    json_output = transform_to_json(records=records, pretty=pretty)
    validation = validate_json(json_str=json_output)
    return {
        "json": json_output,
        "validation": validation,
    }

# Local execution
if __name__ == "__main__":
    sample_csv = """name,email,age
Alice,alice@example.com,28
Bob,bob@example.com,35
Charlie,charlie@example.com,32"""

    result = csv_to_json_flow.run_local(csv_content=sample_csv, pretty=True)
    print(result["validation"])
    print(result["json"])

Key Concepts:

  • CSV parsing with DictReader
  • Wrapping records in a JSON envelope with count and metadata (DictReader returns every value as a string)
  • Output validation
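Because csv.DictReader returns every value as a string, the JSON above contains "age": "28" rather than 28. A minimal sketch of a schema mapping that converts selected columns before serialization; SCHEMA and coerce_record are hypothetical names, columns not listed in the schema stay strings, and empty cells are left as "" instead of failing conversion.

```python
import csv
import io
import json

# Hypothetical schema: maps a CSV column name to a conversion function
SCHEMA = {"age": int}


def coerce_record(record: dict, schema: dict) -> dict:
    """Return a copy of record with schema columns converted; others stay strings."""
    return {
        # Empty cells are kept as "" so int("") never raises
        key: schema[key](value) if key in schema and value != "" else value
        for key, value in record.items()
    }


if __name__ == "__main__":
    sample_csv = "name,email,age\nAlice,alice@example.com,28\n"
    rows = [coerce_record(r, SCHEMA) for r in csv.DictReader(io.StringIO(sample_csv))]
    print(json.dumps(rows))
```

The same helper could be applied inside read_csv so that transform_to_json receives typed values.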

7. S3 File Processor

Download files from S3, process them, and upload results.

from dagy import flow, task
import json

@task
def download_from_s3(bucket: str, key: str) -> str:
    """Download file from S3 bucket."""
    # Simulated S3 download
    print(f"Downloading s3://{bucket}/{key}")
    # In production: use boto3
    return f"Content of {key}"

@task
def process_file(content: str) -> dict:
    """Process downloaded file content."""
    lines = content.split("\n")
    return {
        "line_count": len(lines),
        "content_length": len(content),
        "processed": True,
    }

@task
def upload_to_s3(bucket: str, key: str, data: dict) -> str:
    """Upload processed results to S3."""
    json_content = json.dumps(data)
    output_key = f"processed/{key}.json"
    print(f"Uploading to s3://{bucket}/{output_key}")
    # In production: use boto3 to upload json_content to output_key
    return output_key

@flow
def s3_processor_flow(
    input_bucket: str,
    input_key: str,
    output_bucket: str,
) -> str:
    """Download, process, and upload S3 files."""
    content = download_from_s3(bucket=input_bucket, key=input_key)
    processed = process_file(content=content)
    output_key = upload_to_s3(
        bucket=output_bucket,
        key=input_key,
        data=processed
    )
    return output_key

# Local execution
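if __name__ == "__main__":
    output_bucket = "processed-data"
    result = s3_processor_flow.run_local(
        input_bucket="raw-data",
        input_key="file.txt",
        output_bucket=output_bucket,
    )
    # The flow returns only the object key, so prepend the bucket name
    print(f"Output location: s3://{output_bucket}/{result}")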

Key Concepts:

  • Cloud storage integration patterns
  • Content transformation
  • File output tracking
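The simulated tasks note that production code would use boto3. A sketch of the two calls they might make, assuming a boto3 S3 client and its get_object and put_object methods; the client is passed in as a parameter so the functions can be exercised locally with a stand-in object. The download_text and upload_json names are hypothetical, and error handling (missing keys, permissions) is omitted.

```python
import json


def download_text(client, bucket: str, key: str) -> str:
    """Read an S3 object and decode it as UTF-8 text."""
    response = client.get_object(Bucket=bucket, Key=key)
    # Body is a streaming object; read() returns the raw bytes
    return response["Body"].read().decode("utf-8")


def upload_json(client, bucket: str, key: str, data: dict) -> str:
    """Write data as JSON under processed/<key>.json and return the full URI."""
    output_key = f"processed/{key}.json"
    client.put_object(
        Bucket=bucket,
        Key=output_key,
        Body=json.dumps(data).encode("utf-8"),
        ContentType="application/json",
    )
    return f"s3://{bucket}/{output_key}"
```

In production the client would come from boto3.client("s3"); returning the full URI also spares the caller from rebuilding it.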

8. Database ETL Pipeline

Extract from a database, transform, and load into a target system.

from dagy import flow, task
from typing import List
from datetime import datetime

@task
def extract_from_db(query: str, limit: int = 1000) -> List[dict]:
    """Extract data from source database."""
    # Simulated database query
    print(f"Executing query with limit {limit}")
    return [
        {"id": i, "value": i * 10, "created": "2024-01-01"}
        for i in range(1, min(101, limit + 1))
    ]

@task
def transform_records(records: List[dict]) -> List[dict]:
    """Apply transformations to extracted records."""
    transformed = []
    for record in records:
        transformed.append({
            **record,
            "value_usd": record["value"] * 1.1,  # illustrative fixed conversion rate
            "processed_at": datetime.now().isoformat(),
            "status": "processed",
        })
    return transformed

@task
def load_to_warehouse(records: List[dict], table: str) -> dict:
    """Load transformed data to data warehouse."""
    # Simulated warehouse load
    print(f"Loading {len(records)} records to {table}")
    return {
        "table": table,
        "rows_loaded": len(records),
        "timestamp": datetime.now().isoformat(),
        "status": "success",
    }

@flow
def etl_pipeline(
    source_query: str,
    target_table: str,
    record_limit: int = 1000,
) -> dict:
    """Complete ETL pipeline: Extract → Transform → Load."""
    records = extract_from_db(query=source_query, limit=record_limit)
    transformed = transform_records(records=records)
    result = load_to_warehouse(records=transformed, table=target_table)
    return result

# Local execution
if __name__ == "__main__":
    result = etl_pipeline.run_local(
        source_query="SELECT * FROM raw_data",
        target_table="warehouse.processed_data",
        record_limit=100
    )
    print(result)

Key Concepts:

  • Extract phase with query execution
  • Transform phase with record mapping
  • Load phase with batch insertion
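The simulated load_to_warehouse only counts rows. A sketch of a batched load using the standard library's sqlite3 module as a stand-in warehouse; the processed_data table layout, the batch_size parameter, and the load_in_batches name are assumptions for illustration. Rows are inserted with executemany in fixed-size chunks, and the whole load commits as one transaction.

```python
import sqlite3
from typing import Iterator, List


def chunked(rows: List[dict], size: int) -> Iterator[List[dict]]:
    """Yield consecutive slices of rows with at most size items each."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


def load_in_batches(conn: sqlite3.Connection, rows: List[dict], batch_size: int = 500) -> int:
    """Insert rows into processed_data in batches and return the count inserted."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS processed_data "
        "(id INTEGER PRIMARY KEY, value REAL, value_usd REAL, status TEXT)"
    )
    inserted = 0
    with conn:  # commits on success, rolls back if any batch fails
        for batch in chunked(rows, batch_size):
            # Named placeholders are looked up in each dict; extra keys are ignored
            conn.executemany(
                "INSERT INTO processed_data (id, value, value_usd, status) "
                "VALUES (:id, :value, :value_usd, :status)",
                batch,
            )
            inserted += len(batch)
    return inserted
```

Batching keeps memory bounded for large extracts, while the single transaction means a failed batch leaves the table unchanged.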

9. Data Validation Pipeline

Validate data structure and report on quality issues.

from dagy import flow, task
from typing import List

@task
def load_data_batch(batch_id: str) -> List[dict]:
    """Load batch of data for validation."""
    # Simulated data load
    return [
        {"id": 1, "email": "alice@example.com", "age": 28},
        {"id": 2, "email": "bob@example.com", "age": 35},
        {"id": 3, "email": None, "age": 30},  # Invalid: missing email
        {"id": 4, "email": "invalid-email", "age": -5},  # Invalid: bad email and age
    ]

@task
def validate_schema(records: List[dict]) -> tuple:
    """Validate record schema and types."""
    required_fields = {"id", "email", "age"}
    errors = []
    valid_records = []

    for i, record in enumerate(records):
        record_errors = []

        # Check required fields: a key that is absent or set to None counts as missing
        missing = sorted(f for f in required_fields if record.get(f) is None)
        if missing:
            record_errors.append(f"Missing required field(s): {', '.join(missing)}")

        # Validate email
        email = record.get("email")
        if email is not None and "@" not in str(email):
            record_errors.append("Invalid email format")

        # Validate age (must be a number between 0 and 150)
        age = record.get("age")
        if age is not None and (not isinstance(age, (int, float)) or age < 0 or age > 150):
            record_errors.append("Invalid age value")

        if record_errors:
            errors.append({"row": i, "record": record, "errors": record_errors})
        else:
            valid_records.append(record)

    return valid_records, errors

@task
def generate_report(valid_records: List[dict], errors: List[dict]) -> dict:
    """Generate validation report."""
    total = len(valid_records) + len(errors)
    return {
        "total_records": total,
        "valid_records": len(valid_records),
        "invalid_records": len(errors),
        "validity_percentage": (len(valid_records) / total * 100) if total > 0 else 0,
        "errors": errors,
        "valid_sample": valid_records[:5] if valid_records else [],
    }

@flow
def validation_pipeline(batch_id: str) -> dict:
    """Validate data quality and generate report."""
    records = load_data_batch(batch_id=batch_id)
    valid, errors = validate_schema(records=records)
    report = generate_report(valid_records=valid, errors=errors)
    return report

# Local execution
if __name__ == "__main__":
    result = validation_pipeline.run_local(batch_id="batch_001")
    print(f"Valid records: {result['valid_records']}")
    print(f"Invalid records: {result['invalid_records']}")
    print(f"Validity: {result['validity_percentage']:.1f}%")
    if result['errors']:
        print(f"First error: {result['errors'][0]}")

Key Concepts:

  • Schema validation
  • Field-level validation rules
  • Error collection and reporting
  • Quality metrics computation
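The checks in validate_schema are hard-coded. A sketch of the same field-level rules expressed as data, so adding a rule means adding one table entry; the RULES table and validate_record name are hypothetical, and the email check is the same deliberately loose "contains @" test used above.

```python
from typing import Callable, Dict, List, Tuple

# Each field maps to (predicate, error message); the predicate gets the raw value
Rule = Tuple[Callable[[object], bool], str]

RULES: Dict[str, Rule] = {
    "id": (lambda v: isinstance(v, int), "id must be an integer"),
    "email": (lambda v: isinstance(v, str) and "@" in v, "Invalid email format"),
    "age": (
        # bool is a subclass of int, so exclude it explicitly
        lambda v: isinstance(v, (int, float)) and not isinstance(v, bool) and 0 <= v <= 150,
        "Invalid age value",
    ),
}


def validate_record(record: dict, rules: Dict[str, Rule] = RULES) -> List[str]:
    """Return error messages for record; an empty list means it is valid."""
    errors = []
    for field, (predicate, message) in rules.items():
        value = record.get(field)
        if value is None:
            errors.append(f"Missing required field: {field}")
        elif not predicate(value):
            errors.append(message)
    return errors
```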

10. Incremental Data Load

Process only new or updated records since last run.

from dagy import flow, task
from datetime import datetime, timedelta

@task
def get_last_checkpoint(checkpoint_file: str) -> datetime:
    """Get timestamp of last successful run."""
    # Simulated checkpoint retrieval
    # In production: read from database or file
    return datetime.now() - timedelta(days=1)

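@task
def fetch_updated_records(since: datetime) -> list:
    """Fetch only records updated since checkpoint."""
    # Simulated source table with records updated 0-48 hours ago
    now = datetime.now()
    records = [
        {"id": i, "updated": now - timedelta(hours=h)}
        for i, h in enumerate([0, 2, 4, 6, 24, 48])
    ]
    # Keep only records newer than the checkpoint (in production: WHERE updated > :since)
    return [r for r in records if r["updated"] > since]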

@task
def process_incremental(records: list) -> dict:
    """Process the incremental batch."""
    return {
        "batch_size": len(records),
        "processed_ids": [r["id"] for r in records],
        "timestamp": datetime.now().isoformat(),
    }

@task
def update_checkpoint(checkpoint_file: str, timestamp: datetime) -> str:
    """Update checkpoint with current timestamp."""
    # In production: write to database or file
    return f"Checkpoint updated to {timestamp}"

@flow
def incremental_load_flow(checkpoint_file: str) -> dict:
    """Load and process only new/updated records."""
    # Capture the new watermark before fetching, so records updated while
    # this run is in progress are picked up by the next run
    run_started = datetime.now()
    last_run = get_last_checkpoint(checkpoint_file=checkpoint_file)
    new_records = fetch_updated_records(since=last_run)
    result = process_incremental(records=new_records)
    checkpoint_msg = update_checkpoint(
        checkpoint_file=checkpoint_file,
        timestamp=run_started
    )
    return {"result": result, "checkpoint": checkpoint_msg}

# Local execution
if __name__ == "__main__":
    result = incremental_load_flow.run_local(
        checkpoint_file="/var/checkpoints/last_run.txt"
    )
    print(result)

Key Concepts:

  • Checkpoint/watermark pattern
  • Timestamp-based filtering
  • Incremental processing efficiency
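get_last_checkpoint and update_checkpoint are simulated. A sketch of a file-backed watermark using only the standard library: the timestamp is stored as ISO-8601 text, a missing file falls back to a caller-supplied default, and writes go to a temporary file that is renamed over the checkpoint with os.replace, so a failure mid-write leaves the previous checkpoint intact. The read_checkpoint and write_checkpoint names are hypothetical.

```python
import os
import tempfile
from datetime import datetime


def read_checkpoint(path: str, default: datetime) -> datetime:
    """Return the stored watermark, or default if no checkpoint exists yet."""
    try:
        with open(path, encoding="utf-8") as f:
            return datetime.fromisoformat(f.read().strip())
    except FileNotFoundError:
        return default


def write_checkpoint(path: str, timestamp: datetime) -> None:
    """Replace the checkpoint file with timestamp via write-then-rename."""
    directory = os.path.dirname(os.path.abspath(path))
    # The temp file lives in the same directory so os.replace is a plain rename
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".checkpoint-")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(timestamp.isoformat())
        os.replace(tmp_path, path)
    except BaseException:
        # Clean up the partial temp file and re-raise
        os.unlink(tmp_path)
        raise
```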

11. Data Deduplication

Detect and remove duplicate records from a dataset.

from dagy import flow, task
from typing import List

@task
def load_raw_data() -> List[dict]:
    """Load data that may contain duplicates."""
    return [
        {"id": 1, "email": "alice@example.com", "name": "Alice"},
        {"id": 2, "email": "bob@example.com", "name": "Bob"},
        {"id": 1, "email": "alice@example.com", "name": "Alice"},  # Duplicate
        {"id": 3, "email": "charlie@example.com", "name": "Charlie"},
        {"id": 2, "email": "bob@example.com", "name": "Bob"},  # Duplicate
        {"id": 4, "email": "dave@example.com", "name": "Dave"},
    ]

@task
def deduplicate_exact(records: List[dict]) -> tuple:
    """Remove exact duplicate records."""
    seen = set()
    unique = []
    duplicates = []

    for record in records:
        # Build a hashable key from every field, so only exact copies match
        key = tuple(sorted(record.items()))

        if key in seen:
            duplicates.append(record)
        else:
            seen.add(key)
            unique.append(record)

    return unique, duplicates

@task
def generate_dedup_report(unique: List[dict], duplicates: List[dict]) -> dict:
    """Report on deduplication results."""
    original_count = len(unique) + len(duplicates)
    return {
        "original_count": original_count,
        "unique_count": len(unique),
        "duplicate_count": len(duplicates),
        "dedup_percentage": (len(duplicates) / original_count * 100) if original_count > 0 else 0,
    }

@flow
def deduplication_flow() -> dict:
    """Identify and remove duplicate records."""
    data = load_raw_data()
    unique, duplicates = deduplicate_exact(records=data)
    report = generate_dedup_report(unique=unique, duplicates=duplicates)
    return {
        "report": report,
        "unique_records": unique,
    }

# Local execution
if __name__ == "__main__":
    result = deduplication_flow.run_local()
    print(f"Report: {result['report']}")
    print(f"Unique records: {len(result['unique_records'])}")

Key Concepts:

  • Set-based duplicate detection
  • Hashable key generation
  • Deduplication reporting
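deduplicate_exact only catches records that match field for field. When records should count as duplicates because a business key matches (for example, the same email under a different id), a key-based variant that keeps the first record per key is a common alternative. The dedupe_by_key name and the choice of email as the key are illustrative.

```python
from typing import Iterable, List, Tuple


def dedupe_by_key(records: Iterable[dict], key_fields: Tuple[str, ...]) -> Tuple[List[dict], List[dict]]:
    """Keep the first record for each key; return (unique, duplicates)."""
    seen = set()
    unique, duplicates = [], []
    for record in records:
        # Normalize values (trim, lowercase) so " Alice@Example.com " matches "alice@example.com"
        key = tuple(str(record.get(f, "")).strip().lower() for f in key_fields)
        if key in seen:
            duplicates.append(record)
        else:
            seen.add(key)
            unique.append(record)
    return unique, duplicates
```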

12. Aggregation Pipeline

Group data and compute aggregate statistics.

from dagy import flow, task
from typing import List
from collections import defaultdict

@task
def fetch_sales_data() -> List[dict]:
    """Fetch raw sales transactions."""
    return [
        {"date": "2024-01-01", "product": "Widget A", "quantity": 5, "price": 10.00},
        {"date": "2024-01-01", "product": "Widget B", "quantity": 3, "price": 15.00},
        {"date": "2024-01-02", "product": "Widget A", "quantity": 8, "price": 10.00},
        {"date": "2024-01-02", "product": "Widget C", "quantity": 2, "price": 25.00},
        {"date": "2024-01-03", "product": "Widget B", "quantity": 6, "price": 15.00},
    ]

@task
def aggregate_by_product(transactions: List[dict]) -> dict:
    """Aggregate sales by product."""
    aggregates = defaultdict(lambda: {"total_qty": 0, "total_revenue": 0, "count": 0})

    for txn in transactions:
        product = txn["product"]
        aggregates[product]["total_qty"] += txn["quantity"]
        aggregates[product]["total_revenue"] += txn["quantity"] * txn["price"]
        aggregates[product]["count"] += 1

    # Convert to regular dict and compute averages
    result = {}
    for product, agg in aggregates.items():
        result[product] = {
            **agg,
            "avg_qty_per_sale": agg["total_qty"] / agg["count"],
            "avg_price_per_unit": agg["total_revenue"] / agg["total_qty"],
        }

    return result

@task
def aggregate_by_date(transactions: List[dict]) -> dict:
    """Aggregate sales by date."""
    by_date = defaultdict(lambda: {"count": 0, "total": 0.0})

    for txn in transactions:
        date = txn["date"]
        revenue = txn["quantity"] * txn["price"]
        by_date[date]["count"] += 1
        by_date[date]["total"] += revenue

    return dict(by_date)

@flow
def aggregation_flow() -> dict:
    """Aggregate transaction data multiple ways."""
    transactions = fetch_sales_data()
    by_product = aggregate_by_product(transactions=transactions)
    by_date = aggregate_by_date(transactions=transactions)

    return {
        "by_product": by_product,
        "by_date": by_date,
        "total_transactions": len(transactions),
    }

# Local execution
if __name__ == "__main__":
    result = aggregation_flow.run_local()
    print("By Product:")
    for product, stats in result["by_product"].items():
        print(f"  {product}: {stats['total_qty']} units, ${stats['total_revenue']:.2f}")
    print("\nBy Date:")
    for date, stats in result["by_date"].items():
        print(f"  {date}: {stats['count']} sales, ${stats['total']:.2f}")

Key Concepts:

  • GroupBy aggregation pattern
  • Multiple aggregation dimensions
  • Computed metrics (averages, totals)
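aggregate_by_product and aggregate_by_date repeat the same group-and-sum loop. A sketch of one helper that groups by any combination of fields, which also makes a combined (date, product) dimension a single call; the group_totals name and its output shape are hypothetical.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def group_totals(transactions: List[dict], keys: Tuple[str, ...]) -> Dict[tuple, dict]:
    """Sum count, quantity, and revenue per unique combination of key fields."""
    totals: Dict[tuple, dict] = defaultdict(lambda: {"count": 0, "quantity": 0, "revenue": 0.0})
    for txn in transactions:
        group = tuple(txn[k] for k in keys)
        totals[group]["count"] += 1
        totals[group]["quantity"] += txn["quantity"]
        totals[group]["revenue"] += txn["quantity"] * txn["price"]
    return dict(totals)
```

Calling group_totals(transactions, ("product",)) yields the per-product totals, and ("date", "product") gives a two-dimensional breakdown without a third hand-written loop.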

13. Multi-Source Data Merge

Combine data from multiple sources with deduplication and conflict resolution.

from dagy import flow, task
from typing import List

@task
def fetch_source_a() -> List[dict]:
    """Fetch data from first source."""
    return [
        {"id": 1, "name": "Alice", "email": "alice@example.com", "source": "A"},
        {"id": 2, "name": "Bob", "email": "bob@example.com", "source": "A"},
    ]

@task
def fetch_source_b() -> List[dict]:
    """Fetch data from second source."""
    return [
        {"id": 2, "name": "Robert", "email": "bob@example.com", "source": "B"},
        {"id": 3, "name": "Charlie", "email": "charlie@example.com", "source": "B"},
    ]

@task
def merge_sources(records_a: List[dict], records_b: List[dict]) -> dict:
    """Merge records from multiple sources."""
    merged = {}
    conflicts = []

    # Process source A (record provenance so every merged record has "sources")
    for record in records_a:
        merged[record["id"]] = record.copy()
        merged[record["id"]]["sources"] = ["A"]

    # Process source B
    for record in records_b:
        record_id = record["id"]
        if record_id in merged:
            existing = merged[record_id]
            # Detect conflicts in name
            if existing.get("name") != record.get("name"):
                conflicts.append({
                    "id": record_id,
                    "field": "name",
                    "source_a": existing.get("name"),
                    "source_b": record.get("name"),
                })
            # Keep longer name (assumed more complete)
            if len(record.get("name", "")) > len(existing.get("name", "")):
                merged[record_id]["name"] = record["name"]
            merged[record_id]["sources"] = ["A", "B"]
        else:
            merged[record_id] = record.copy()
            merged[record_id]["sources"] = ["B"]

    return {
        "merged_records": list(merged.values()),
        "record_count": len(merged),
        "conflicts": conflicts,
    }

@flow
def multi_source_merge_flow() -> dict:
    """Merge data from multiple sources with conflict detection."""
    src_a = fetch_source_a()
    src_b = fetch_source_b()
    result = merge_sources(records_a=src_a, records_b=src_b)
    return result

# Local execution
if __name__ == "__main__":
    result = multi_source_merge_flow.run_local()
    print(f"Total records: {result['record_count']}")
    print(f"Conflicts: {len(result['conflicts'])}")
    for record in result["merged_records"]:
        print(f"  ID {record['id']}: {record['name']} (from {record['sources']})")

Key Concepts:

  • Multi-source data integration
  • Conflict detection and resolution
  • Deduplication during merge
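merge_sources hard-codes one resolution rule (the longer name wins). Another common strategy is source priority: for each field, take the non-null value from the highest-priority source and record where it came from. A sketch of that approach; the merge_by_priority name and the "origin" bookkeeping field are hypothetical.

```python
from typing import Dict, List


def merge_by_priority(sources: Dict[str, List[dict]], priority: List[str]) -> List[dict]:
    """Merge records by id; for each field, the earliest source in priority wins."""
    merged: Dict[object, dict] = {}
    # Walk sources from lowest to highest priority so later writes win
    for source in reversed(priority):
        for record in sources.get(source, []):
            target = merged.setdefault(record["id"], {"origin": {}})
            for field, value in record.items():
                # None never overwrites a value from a lower-priority source
                if value is not None:
                    target[field] = value
                    target["origin"][field] = source
    return list(merged.values())
```

Compared with the length heuristic, priority rules are predictable and the origin map keeps an audit trail per field.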

API & Web

14. REST API Poller

Poll a REST endpoint repeatedly and store the aggregated results.

from dagy import flow, task
import time
from datetime import datetime

@task
def poll_api_endpoint(
    url: str,
    poll_count: int = 3,
    interval_seconds: int = 1,
) -> list:
    """Poll API endpoint multiple times."""
    results = []

    for i in range(poll_count):
        # Simulated API call
        timestamp = datetime.now().isoformat()
        data = {
            "poll_number": i + 1,
            "timestamp": timestamp,
            "status": "success",
            "data": {"counter": i + 1, "value": (i + 1) * 100},
        }
        results.append(data)
        print(f"Poll {i + 1}/{poll_count}: {data}")

        if i < poll_count - 1:
            time.sleep(interval_seconds)

    return results

@task
def aggregate_poll_results(poll_results: list) -> dict:
    """Aggregate results from multiple polls."""
    successful = sum(1 for r in poll_results if r["status"] == "success")
    values = [r["data"]["value"] for r in poll_results]

    return {
        "total_polls": len(poll_results),
        "successful_polls": successful,
        "values": values,
        "average_value": sum(values) / len(values) if values else 0,
        "max_value": max(values) if values else None,
        "min_value": min(values) if values else None,
    }

@task
def store_poll_results(results: dict, destination: str) -> str:
    """Store aggregated poll results."""
    # Simulated storage
    print(f"Storing {len(results['values'])} results to {destination}")
    return f"Stored at {destination}"

@flow
def api_poller_flow(
    api_url: str,
    poll_count: int = 5,
    interval_seconds: int = 2,
) -> dict:
    """Poll API endpoint and store aggregated results."""
    poll_data = poll_api_endpoint(
        url=api_url,
        poll_count=poll_count,
        interval_seconds=interval_seconds
    )
    aggregates = aggregate_poll_results(poll_results=poll_data)
    storage_result = store_poll_results(
        results=aggregates,
        destination="/data/polls"
    )

    return {
        "aggregates": aggregates,
        "storage": storage_result,
    }

# Local execution
if __name__ == "__main__":
    result = api_poller_flow.run_local(
        api_url="https://api.example.com/data",
        poll_count=3,
        interval_seconds=1
    )
    print(f"Average: {result['aggregates']['average_value']}")

Key Concepts:

  • Repeated API polling with intervals
  • Result aggregation from multiple calls
  • Time-series data handling
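poll_api_endpoint polls a fixed number of times. A common alternative is to poll until a condition holds or a deadline passes. A sketch using time.monotonic for the deadline, which is unaffected by wall-clock adjustments; fetch and is_done are caller-supplied callables, and poll_until is a hypothetical name.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def poll_until(
    fetch: Callable[[], T],
    is_done: Callable[[T], bool],
    interval_seconds: float = 1.0,
    timeout_seconds: float = 30.0,
) -> T:
    """Call fetch() until is_done(result) is true; raise TimeoutError at the deadline."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        result = fetch()
        if is_done(result):
            return result
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(f"condition not met within {timeout_seconds}s")
        # Never sleep past the deadline
        time.sleep(min(interval_seconds, remaining))
```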

15. Webhook-Triggered Pipeline

Process incoming webhook payloads through a pipeline.

from dagy import flow, task
from datetime import datetime
from typing import Dict
import hashlib
import hmac

@task
def verify_webhook_signature(
    payload: Dict,
    signature: str,
    secret: str,
) -> bool:
    """Verify webhook signature for authenticity."""
    # Compute the expected HMAC-SHA256 signature. str(payload) keeps this
    # example self-contained; real providers sign the raw request body, so
    # verify against those exact bytes in production.
    payload_str = str(payload)
    expected_sig = hmac.new(
        secret.encode(),
        payload_str.encode(),
        hashlib.sha256
    ).hexdigest()

    # Constant-time comparison avoids leaking timing information
    return hmac.compare_digest(expected_sig, signature)

@task
def parse_webhook_data(payload: Dict) -> Dict:
    """Parse and validate webhook payload."""
    required_fields = {"event_type", "timestamp", "data"}

    if not all(f in payload for f in required_fields):
        raise ValueError("Missing required webhook fields")

    return {
        "event_type": payload["event_type"],
        "timestamp": payload["timestamp"],
        "data": payload["data"],
        "received_at": datetime.now().isoformat(),
    }

@task
def process_webhook_event(parsed_data: Dict) -> dict:
    """Process the parsed webhook event."""
    event_type = parsed_data["event_type"]

    handlers = {
        "user.created": lambda d: {"action": "create_profile", "user_id": d["data"].get("user_id")},
        "user.deleted": lambda d: {"action": "deactivate_profile", "user_id": d["data"].get("user_id")},
        "order.completed": lambda d: {"action": "process_fulfillment", "order_id": d["data"].get("order_id")},
    }

    handler = handlers.get(event_type)
    if not handler:
        return {"action": "unknown", "event": event_type}

    return handler(parsed_data)

@task
def store_webhook_record(processed: dict, webhook_id: str) -> str:
    """Store processed webhook for audit trail."""
    # Simulated storage
    return f"Webhook {webhook_id} processed: {processed['action']}"

@flow
def webhook_flow(
    payload: Dict,
    signature: str,
    secret: str,
    webhook_id: str = "unknown",
) -> dict:
    """Process incoming webhook with signature verification."""
    is_valid = verify_webhook_signature(
        payload=payload,
        signature=signature,
        secret=secret
    )

    if not is_valid:
        raise ValueError("Invalid webhook signature")

    parsed = parse_webhook_data(payload=payload)
    processed = process_webhook_event(parsed_data=parsed)
    audit_log = store_webhook_record(processed=processed, webhook_id=webhook_id)

    return {
        "success": True,
        "processed": processed,
        "audit_log": audit_log,
    }

# Local execution
if __name__ == "__main__":
    payload = {
        "event_type": "user.created",
        "timestamp": "2024-01-01T10:00:00Z",
        "data": {"user_id": "usr_123", "email": "new@example.com"},
    }

    secret = "webhook_secret_key"
    sig_payload = str(payload)
    signature = hmac.new(
        secret.encode(),
        sig_payload.encode(),
        hashlib.sha256
    ).hexdigest()

    result = webhook_flow.run_local(
        payload=payload,
        signature=signature,
        secret=secret,
        webhook_id="wh_001"
    )
    print(result)

Key Concepts:

  • Webhook signature verification
  • Event-type dispatch pattern
  • Audit trail logging
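The local example signs `str(payload)`. That output depends on Python's dict repr, not on the bytes a sender actually transmits. The sketch below is sturdier. It assumes sender and receiver agree to sign compact, key-sorted JSON; real providers usually sign the raw request body, so check your provider's documentation. It also compares digests in constant time with `hmac.compare_digest`. The helper names (`canonical_body`, `sign_body`, `verify_body`) are hypothetical, not part of Dagy.

```python
import hashlib
import hmac
import json
from typing import Any, Dict


def canonical_body(payload: Dict[str, Any]) -> bytes:
    """Serialize a payload deterministically (assumed signing convention)."""
    return json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8")


def sign_body(body: bytes, secret: str) -> str:
    """Return the hex HMAC-SHA256 of the body."""
    return hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()


def verify_body(body: bytes, signature: str, secret: str) -> bool:
    """Check a signature in constant time to avoid timing leaks."""
    expected = sign_body(body, secret)
    # Compare as bytes so non-ASCII input cannot raise TypeError
    return hmac.compare_digest(expected.encode("utf-8"), signature.encode("utf-8"))
```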

16. Multi-API Orchestration

Call multiple APIs in sequence, feeding each call's output into the next.

from dagy import flow, task

@task
def fetch_user_details(user_id: str) -> dict:
    """Fetch user details from API."""
    return {
        "user_id": user_id,
        "name": "Alice Smith",
        "email": "alice@example.com",
        "organization_id": "org_123",
    }

@task
def fetch_organization_info(org_id: str) -> dict:
    """Fetch organization details from API."""
    return {
        "org_id": org_id,
        "name": "Example Corp",
        "subscription_tier": "premium",
        "region": "us-west-2",
    }

@task
def fetch_user_permissions(user_id: str, org_id: str) -> list:
    """Fetch user permissions for organization."""
    return [
        "read:documents",
        "write:documents",
        "admin:organization",
    ]

@task
def aggregate_user_profile(
    user: dict,
    org: dict,
    permissions: list
) -> dict:
    """Aggregate all fetched data into complete profile."""
    return {
        "profile": {
            "user": user,
            "organization": org,
            "permissions": permissions,
            "effective_tier": org["subscription_tier"],
            "is_admin": "admin:organization" in permissions,
        },
        "completed_at": __import__("datetime").datetime.now().isoformat(),
    }

@flow
def multi_api_orchestration_flow(user_id: str) -> dict:
    """Orchestrate calls to multiple APIs with data flow."""
    user = fetch_user_details(user_id=user_id)
    org = fetch_organization_info(org_id=user["organization_id"])
    permissions = fetch_user_permissions(user_id=user_id, org_id=user["organization_id"])
    profile = aggregate_user_profile(user=user, org=org, permissions=permissions)

    return profile

# Local execution
if __name__ == "__main__":
    result = multi_api_orchestration_flow.run_local(user_id="usr_123")
    print(f"User: {result['profile']['user']['name']}")
    print(f"Org: {result['profile']['organization']['name']}")
    print(f"Is Admin: {result['profile']['is_admin']}")

Key Concepts:

  • Sequential API calls with data dependencies
  • Data aggregation from multiple sources
  • Cross-service orchestration pattern
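In the flow above, `fetch_organization_info` and `fetch_user_permissions` both depend only on the user record, so in principle they can run concurrently. This page doesn't say whether Dagy schedules them in parallel. The plain-Python sketch below shows the idea with `concurrent.futures`. It uses hypothetical stand-in functions (`fetch_org`, `fetch_permissions`) in place of the real API calls.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List


# Hypothetical stand-ins for the two API calls in the flow above
def fetch_org(org_id: str) -> Dict[str, str]:
    return {"org_id": org_id, "name": "Example Corp"}


def fetch_permissions(user_id: str, org_id: str) -> List[str]:
    return ["read:documents", "write:documents"]


def fetch_org_and_permissions(user: Dict[str, str]) -> Dict[str, object]:
    """Run the two independent lookups concurrently and wait for both."""
    org_id = user["organization_id"]
    with ThreadPoolExecutor(max_workers=2) as pool:
        org_future = pool.submit(fetch_org, org_id)
        perms_future = pool.submit(fetch_permissions, user["user_id"], org_id)
        # .result() blocks until each call finishes and re-raises any exception
        return {"org": org_future.result(), "permissions": perms_future.result()}
```

Threads suit this case because API calls are I/O-bound. The dependent call (`fetch_user_details`) must still finish first, because it supplies `organization_id`.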

17. API Rate-Limited Fetcher

Respect API rate limits while fetching data.

from dagy import flow, task
import time
from typing import List

@task
def fetch_with_rate_limit(
    endpoint: str,
    items: List[str],
    rate_limit_per_second: float = 2.0,
) -> List[dict]:
    """Fetch items respecting rate limits."""
    min_interval = 1.0 / rate_limit_per_second
    results = []

    for i, item in enumerate(items):
        # Simulated API fetch
        result = {
            "item": item,
            "data": f"Data for {item}",
            "timestamp": __import__("datetime").datetime.now().isoformat(),
        }
        results.append(result)
        print(f"Fetched {i + 1}/{len(items)}: {item}")

        # Rate limiting (except on last item)
        if i < len(items) - 1:
            time.sleep(min_interval)

    return results

@task
def batch_results(results: List[dict], batch_size: int = 5) -> List[List[dict]]:
    """Group results into batches."""
    batches = []
    for i in range(0, len(results), batch_size):
        batches.append(results[i:i + batch_size])
    return batches

@task
def process_batches(batches: List[List[dict]]) -> dict:
    """Process batches with aggregated stats."""
    return {
        "total_items": sum(len(b) for b in batches),
        "batch_count": len(batches),
        "batches_processed": len(batches),
        "avg_batch_size": sum(len(b) for b in batches) / len(batches) if batches else 0,
    }

@flow
def rate_limited_fetch_flow(
    items: List[str],
    rate_limit: float = 2.0,
) -> dict:
    """Fetch data with rate limiting and batch processing."""
    fetched = fetch_with_rate_limit(
        endpoint="/api/data",
        items=items,
        rate_limit_per_second=rate_limit
    )
    batches = batch_results(results=fetched, batch_size=5)
    stats = process_batches(batches=batches)

    return {
        "stats": stats,
        "items_fetched": len(fetched),
    }

# Local execution
if __name__ == "__main__":
    items = [f"item_{i}" for i in range(1, 11)]
    result = rate_limited_fetch_flow.run_local(
        items=items,
        rate_limit=2.0
    )
    print(result)

Key Concepts:

  • Rate limiting with min_interval calculation
  • Request throttling
  • Batch processing for efficiency
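`fetch_with_rate_limit` always sleeps the full `min_interval`, even when the fetch itself has already used part of that window. The sketch below sleeps only for the time remaining since the previous call. `IntervalRateLimiter` is hypothetical, not a Dagy API. The injectable `clock` and `sleep` parameters exist only to make it testable.

```python
import time
from typing import Callable, Optional


class IntervalRateLimiter:
    """Enforce a minimum interval between calls, counting time already spent."""

    def __init__(
        self,
        rate_per_second: float,
        clock: Callable[[], float] = time.monotonic,
        sleep: Callable[[float], None] = time.sleep,
    ) -> None:
        if rate_per_second <= 0:
            raise ValueError("rate_per_second must be positive")
        self.min_interval = 1.0 / rate_per_second
        self._clock = clock
        self._sleep = sleep
        self._last_call: Optional[float] = None

    def wait(self) -> float:
        """Block until the next call is allowed; return the seconds slept."""
        now = self._clock()
        slept = 0.0
        if self._last_call is not None:
            remaining = self.min_interval - (now - self._last_call)
            if remaining > 0:
                self._sleep(remaining)
                slept = remaining
                now = self._clock()
        self._last_call = now
        return slept
```

Inside the fetch loop, you would call `limiter.wait()` before each request instead of sleeping after it.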

18. GraphQL Data Collector

Query GraphQL endpoints and collect nested data.

from dagy import flow, task
from typing import Dict, List, Optional

@task
def query_graphql(query: str, variables: Optional[Dict] = None) -> dict:
    """Execute GraphQL query."""
    # Simulated GraphQL response
    return {
        "data": {
            "users": [
                {
                    "id": "usr_1",
                    "name": "Alice",
                    "posts": [
                        {"id": "post_1", "title": "First Post"},
                        {"id": "post_2", "title": "Second Post"},
                    ]
                },
                {
                    "id": "usr_2",
                    "name": "Bob",
                    "posts": [
                        {"id": "post_3", "title": "Hello World"},
                    ]
                },
            ]
        },
        "errors": None,
    }

@task
def flatten_graphql_response(response: dict) -> tuple:
    """Flatten nested GraphQL response into users and posts."""
    users = []
    posts = []

    for user in response["data"]["users"]:
        user_record = {"id": user["id"], "name": user["name"], "post_count": len(user.get("posts", []))}
        users.append(user_record)

        for post in user.get("posts", []):
            posts.append({
                **post,
                "author_id": user["id"],
                "author_name": user["name"],
            })

    return users, posts

@task
def generate_collection_report(users: List[dict], posts: List[dict]) -> dict:
    """Generate report on collected data."""
    return {
        "total_users": len(users),
        "total_posts": len(posts),
        "avg_posts_per_user": len(posts) / len(users) if users else 0,
        "users": users,
        "posts_sample": posts[:5],
    }

@flow
def graphql_collector_flow(query: str) -> dict:
    """Collect and flatten data from GraphQL endpoint."""
    response = query_graphql(query=query)
    users, posts = flatten_graphql_response(response=response)
    report = generate_collection_report(users=users, posts=posts)

    return report

# Local execution
if __name__ == "__main__":
    query = """
    query GetUsers {
        users {
            id
            name
            posts { id title }
        }
    }
    """
    result = graphql_collector_flow.run_local(query=query)
    print(f"Users: {result['total_users']}, Posts: {result['total_posts']}")

Key Concepts:

  • GraphQL query execution
  • Nested response flattening
  • Data normalization
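The simulated response includes an `errors` key that the flow never inspects. Under the usual GraphQL-over-HTTP convention, a request body is a JSON object with `query` and optional `variables`. A response may carry `errors` (a list of objects with a `message`) alongside or instead of `data`. Below is a minimal sketch of both sides; the helper names are ours. Sending the body is left to whichever HTTP client you use.

```python
from typing import Any, Dict, Optional


def build_graphql_request(query: str, variables: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Build the JSON body for a GraphQL POST request."""
    body: Dict[str, Any] = {"query": query}
    if variables:
        body["variables"] = variables
    return body


def check_graphql_errors(response: Dict[str, Any]) -> Dict[str, Any]:
    """Raise if the response reports errors or has no data; else return data."""
    errors = response.get("errors") or []
    if errors:
        messages = "; ".join(e.get("message", "unknown error") for e in errors)
        raise RuntimeError(f"GraphQL errors: {messages}")
    data = response.get("data")
    if data is None:
        raise RuntimeError("GraphQL response contained no data")
    return data
```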

19. Web Scraper Pipeline

Fetch, parse, and extract structured data from web content.

from dagy import flow, task
from typing import List
import re

@task
def fetch_webpage(url: str) -> str:
    """Fetch HTML content from webpage."""
    # Simulated HTML response
    return """
    <html>
        <head><title>Product Listings</title></head>
        <body>
            <div class="product">
                <h3>Widget A</h3>
                <span class="price">$19.99</span>
                <p class="description">High quality widget</p>
            </div>
            <div class="product">
                <h3>Widget B</h3>
                <span class="price">$29.99</span>
                <p class="description">Premium widget</p>
            </div>
            <div class="product">
                <h3>Widget C</h3>
                <span class="price">$9.99</span>
                <p class="description">Budget widget</p>
            </div>
        </body>
    </html>
    """

@task
def parse_products(html: str) -> List[dict]:
    """Parse HTML and extract product data."""
    products = []

    # Simple regex-based parsing (in production: use BeautifulSoup)
    product_pattern = r'<div class="product">(.*?)</div>'
    for product_html in re.findall(product_pattern, html, re.DOTALL):
        # Extract fields
        name_match = re.search(r'<h3>(.*?)</h3>', product_html)
        price_match = re.search(r'<span class="price">\$(.*?)</span>', product_html)
        desc_match = re.search(r'<p class="description">(.*?)</p>', product_html)

        if name_match and price_match:
            products.append({
                "name": name_match.group(1).strip(),
                "price": float(price_match.group(1)),
                "description": desc_match.group(1).strip() if desc_match else "",
            })

    return products

@task
def store_products(products: List[dict], destination: str) -> dict:
    """Store extracted products."""
    return {
        "stored_count": len(products),
        "destination": destination,
        "products": products,
    }

@flow
def web_scraper_flow(url: str) -> dict:
    """Scrape, parse, and store web content."""
    html = fetch_webpage(url=url)
    products = parse_products(html=html)
    result = store_products(products=products, destination="/data/products")

    return result

# Local execution
if __name__ == "__main__":
    result = web_scraper_flow.run_local(url="https://example.com/products")
    print(f"Products scraped: {result['stored_count']}")
    for product in result['products']:
        print(f"  {product['name']}: ${product['price']:.2f}")

Key Concepts:

  • HTML fetching and parsing
  • Regex-based data extraction
  • Data persistence
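As the comment in `parse_products` notes, regular expressions are brittle for HTML. The standard library's `html.parser.HTMLParser` can do the same extraction without adding a dependency such as BeautifulSoup. This is a sketch that assumes the markup shown above and that product `<div>`s contain no nested `<div>`s. The class and function names are hypothetical.

```python
from html.parser import HTMLParser
from typing import Dict, List, Optional


class ProductParser(HTMLParser):
    """Collect name/price/description from <div class="product"> blocks."""

    def __init__(self) -> None:
        super().__init__()
        self.products: List[Dict] = []
        self._current: Optional[Dict[str, str]] = None
        self._field: Optional[str] = None

    def handle_starttag(self, tag, attrs):
        classes = (dict(attrs).get("class") or "").split()
        if tag == "div" and "product" in classes:
            self._current = {"name": "", "price": "", "description": ""}
        elif self._current is not None:
            # Remember which field the following text belongs to
            if tag == "h3":
                self._field = "name"
            elif tag == "span" and "price" in classes:
                self._field = "price"
            elif tag == "p" and "description" in classes:
                self._field = "description"

    def handle_data(self, data):
        if self._current is not None and self._field:
            self._current[self._field] += data

    def handle_endtag(self, tag):
        if tag in ("h3", "span", "p"):
            self._field = None
        elif tag == "div" and self._current is not None:
            name = self._current["name"].strip()
            price_text = self._current["price"].strip().lstrip("$")
            # Keep only products with both a name and a price, like the regex version
            if name and price_text:
                self.products.append({
                    "name": name,
                    "price": float(price_text),
                    "description": self._current["description"].strip(),
                })
            self._current = None


def parse_products_html(html: str) -> List[Dict]:
    parser = ProductParser()
    parser.feed(html)
    parser.close()
    return parser.products
```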

ML & AI

20. Model Training Pipeline

Prepare data, train a model, and evaluate its performance.

from dagy import flow, task
from typing import Tuple, Dict

@task
def prepare_training_data(dataset_path: str) -> Tuple[list, list]:
    """Load and prepare training data."""
    # Simulated data loading
    features = [[1.0, 2.0], [2.0, 3.0], [3.0, 4.0], [4.0, 5.0]]
    labels = [0, 1, 0, 1]
    return features, labels

@task
def split_train_test(
    features: list,
    labels: list,
    test_split: float = 0.2
) -> dict:
    """Split data into training and test sets."""
    split_idx = int(len(features) * (1 - test_split))

    return {
        "X_train": features[:split_idx],
        "y_train": labels[:split_idx],
        "X_test": features[split_idx:],
        "y_test": labels[split_idx:],
    }

@task
def train_model(X_train: list, y_train: list) -> dict:
    """Train ML model on training data."""
    # Simulated model training
    model_config = {
        "algorithm": "logistic_regression",
        "epochs": 100,
        "learning_rate": 0.01,
    }

    # Simulated training metrics
    training_log = {
        "initial_loss": 0.85,
        "final_loss": 0.23,
        "epochs_completed": 100,
        "converged": True,
    }

    return {
        "model_config": model_config,
        "training_log": training_log,
        "model_id": "model_v1_20240101",
    }

@task
def evaluate_model(
    model: dict,
    X_test: list,
    y_test: list
) -> dict:
    """Evaluate model performance on test set."""
    # Simulated evaluation
    return {
        "model_id": model["model_id"],
        "accuracy": 0.92,
        "precision": 0.89,
        "recall": 0.95,
        "f1_score": 0.92,
        "test_samples": len(X_test),
    }

@flow
def model_training_pipeline(dataset_path: str) -> dict:
    """Complete ML pipeline: prepare → train → evaluate."""
    features, labels = prepare_training_data(dataset_path=dataset_path)
    split_data = split_train_test(features=features, labels=labels, test_split=0.2)
    model = train_model(X_train=split_data["X_train"], y_train=split_data["y_train"])
    metrics = evaluate_model(
        model=model,
        X_test=split_data["X_test"],
        y_test=split_data["y_test"]
    )

    return {
        "model": model,
        "evaluation": metrics,
        "status": "success" if metrics["accuracy"] > 0.8 else "needs_improvement",
    }

# Local execution
if __name__ == "__main__":
    result = model_training_pipeline.run_local(dataset_path="/data/training.csv")
    print(f"Model: {result['model']['model_id']}")
    print(f"Accuracy: {result['evaluation']['accuracy']:.2%}")

Key Concepts:

  • Data preparation and splitting
  • Model training orchestration
  • Metrics computation and evaluation
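`evaluate_model` returns fixed numbers. Computing the same metrics from real predictions takes only a few lines. Below is a sketch for binary labels, where 1 is the positive class; the function name `classification_metrics` is ours.

```python
from typing import Dict, Sequence


def classification_metrics(y_true: Sequence[int], y_pred: Sequence[int]) -> Dict[str, float]:
    """Accuracy, precision, recall, and F1 for binary labels (1 = positive)."""
    if len(y_true) != len(y_pred):
        raise ValueError("y_true and y_pred must have the same length")
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == 1 and p == 1)
    fp = sum(1 for t, p in pairs if t == 0 and p == 1)
    fn = sum(1 for t, p in pairs if t == 1 and p == 0)
    correct = sum(1 for t, p in pairs if t == p)

    accuracy = correct / len(pairs) if pairs else 0.0
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    # F1 is the harmonic mean of precision and recall
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return {"accuracy": accuracy, "precision": precision, "recall": recall, "f1_score": f1}
```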

21. Feature Engineering Pipeline

Compute derived features for machine learning models.

from dagy import flow, task
from typing import List, Dict, Tuple
import math

@task
def load_raw_data() -> List[Dict]:
    """Load raw data for feature engineering."""
    return [
        {"id": 1, "price": 100, "quantity": 5, "timestamp": "2024-01-01"},
        {"id": 2, "price": 150, "quantity": 3, "timestamp": "2024-01-02"},
        {"id": 3, "price": 200, "quantity": 8, "timestamp": "2024-01-03"},
    ]

@task
def compute_interaction_features(data: List[Dict]) -> List[Dict]:
    """Compute interaction features."""
    enhanced = []
    for record in data:
        record_copy = record.copy()
        record_copy["revenue"] = record["price"] * record["quantity"]
        record_copy["price_qty_ratio"] = record["price"] / max(record["quantity"], 1)
        enhanced.append(record_copy)
    return enhanced

@task
def compute_statistical_features(data: List[Dict]) -> Tuple[List[Dict], dict]:
    """Compute statistical aggregation features."""
    prices = [r["price"] for r in data]
    avg_price = sum(prices) / len(prices) if prices else 0
    max_price = max(prices) if prices else 0
    # Population standard deviation of price, used to standardize each value
    std_price = math.sqrt(sum((p - avg_price) ** 2 for p in prices) / len(prices)) if prices else 0

    enhanced = []
    for record in data:
        record_copy = record.copy()
        # z-score = (x - mean) / std; 0.0 when all prices are equal
        record_copy["price_zscore"] = (record["price"] - avg_price) / std_price if std_price > 0 else 0.0
        record_copy["above_avg_price"] = 1 if record["price"] > avg_price else 0
        enhanced.append(record_copy)

    return enhanced, {"avg_price": avg_price, "max_price": max_price, "std_price": std_price}

@task
def store_features(
    data: List[Dict],
    stats: dict,
    destination: str
) -> dict:
    """Store computed features."""
    return {
        "features_stored": len(data),
        "destination": destination,
        "feature_count": len(data[0]) if data else 0,
        "stats": stats,
    }

@flow
def feature_engineering_flow() -> dict:
    """Compute and store derived features."""
    raw_data = load_raw_data()
    with_interactions = compute_interaction_features(data=raw_data)
    with_stats, stats = compute_statistical_features(data=with_interactions)
    result = store_features(
        data=with_stats,
        stats=stats,
        destination="/data/features"
    )

    return {
        "result": result,
        "sample_record": with_stats[0] if with_stats else None,
    }

# Local execution
if __name__ == "__main__":
    result = feature_engineering_flow.run_local()
    print(f"Features stored: {result['result']['features_stored']}")
    print(f"Sample: {result['sample_record']}")

Key Concepts:

  • Interaction feature computation
  • Statistical feature generation
  • Feature normalization and scaling
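Besides the `price_zscore` feature, min-max scaling is another common normalization. It maps each value linearly into [0, 1]. Here is a minimal sketch; the helper name `min_max_scale` is ours, not part of Dagy.

```python
from typing import List, Sequence


def min_max_scale(values: Sequence[float]) -> List[float]:
    """Scale values linearly into [0, 1]; constant inputs map to 0.0."""
    if not values:
        return []
    lo, hi = min(values), max(values)
    span = hi - lo
    if span == 0:
        return [0.0 for _ in values]
    return [(v - lo) / span for v in values]
```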

22. Batch Inference Pipeline

Run predictions on batches of data.

from dagy import flow, task
from typing import List, Dict

@task
def load_model(model_path: str) -> dict:
    """Load trained model from disk."""
    # Simulated model loading
    return {
        "model_id": "model_v1",
        "path": model_path,
        "loaded_at": __import__("datetime").datetime.now().isoformat(),
    }

@task
def load_inference_batch(batch_id: str, batch_size: int = 32) -> List[Dict]:
    """Load batch of data for inference."""
    # Simulated batch loading
    return [
        {"id": i, "features": [float(i), float(i * 2)]}
        for i in range(1, batch_size + 1)
    ]

@task
def run_inference(model: dict, batch: List[Dict]) -> List[Dict]:
    """Run model inference on batch."""
    predictions = []
    for record in batch:
        # Simulated prediction
        pred_score = sum(record["features"]) / len(record["features"])
        predictions.append({
            "id": record["id"],
            "score": min(0.99, pred_score / 10),  # Normalize
            "prediction": 1 if pred_score > 5 else 0,
            "confidence": 0.92,
        })
    return predictions

@task
def aggregate_results(predictions: List[Dict]) -> dict:
    """Aggregate inference results."""
    scores = [p["score"] for p in predictions]
    class_counts = {}
    for pred in predictions:
        pred_class = pred["prediction"]
        class_counts[pred_class] = class_counts.get(pred_class, 0) + 1

    return {
        "total_predictions": len(predictions),
        "avg_score": sum(scores) / len(scores) if scores else 0,
        "class_distribution": class_counts,
        "predictions_sample": predictions[:5],
    }

@flow
def batch_inference_flow(batch_id: str, model_path: str) -> dict:
    """Run batch inference pipeline."""
    model = load_model(model_path=model_path)
    batch = load_inference_batch(batch_id=batch_id, batch_size=32)
    predictions = run_inference(model=model, batch=batch)
    aggregates = aggregate_results(predictions=predictions)

    return {
        "batch_id": batch_id,
        "results": aggregates,
    }

# Local execution
if __name__ == "__main__":
    result = batch_inference_flow.run_local(
        batch_id="batch_001",
        model_path="/models/model_v1.pkl"
    )
    print(f"Predictions: {result['results']['total_predictions']}")
    print(f"Avg score: {result['results']['avg_score']:.2f}")

Key Concepts:

  • Model loading from disk
  • Batch prediction execution
  • Result aggregation and distribution analysis
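`load_inference_batch` builds a single fixed-size batch. When the input is too large to hold in memory, a generator can stream fixed-size chunks instead. The sketch below is a plain-Python helper, not a Dagy API. On Python 3.12+, `itertools.batched` does the same job but yields tuples.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def iter_batches(records: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Yield consecutive lists of up to batch_size items; the last may be shorter."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    it = iter(records)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch
```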

23. A/B Test Analysis

Analyze experimental results and compute conversion and revenue lift between the control and treatment groups.

from dagy import flow, task
import math

@task
def fetch_experiment_results(experiment_id: str) -> tuple:
    """Fetch results for control and treatment groups."""
    # Simulated experiment data
    control = {
        "group": "control",
        "sample_size": 1000,
        "conversions": 120,
        "revenue": 45000,
    }
    treatment = {
        "group": "treatment",
        "sample_size": 1000,
        "conversions": 150,
        "revenue": 54000,
    }
    return control, treatment

@task
def compute_statistics(control: dict, treatment: dict) -> dict:
    """Compute statistical metrics for both groups."""
    ctrl_conv_rate = control["conversions"] / control["sample_size"]
    treat_conv_rate = treatment["conversions"] / treatment["sample_size"]

    ctrl_revenue_per_user = control["revenue"] / control["sample_size"]
    treat_revenue_per_user = treatment["revenue"] / treatment["sample_size"]

    return {
        "control": {
            "conversion_rate": ctrl_conv_rate,
            "revenue_per_user": ctrl_revenue_per_user,
            "total_revenue": control["revenue"],
        },
        "treatment": {
            "conversion_rate": treat_conv_rate,
            "revenue_per_user": treat_revenue_per_user,
            "total_revenue": treatment["revenue"],
        },
    }

@task
def compute_lift(stats: dict) -> dict:
    """Compute uplift metrics."""
    ctrl_conv = stats["control"]["conversion_rate"]
    treat_conv = stats["treatment"]["conversion_rate"]

    conversion_lift = ((treat_conv - ctrl_conv) / ctrl_conv * 100) if ctrl_conv > 0 else 0

    ctrl_revenue = stats["control"]["revenue_per_user"]
    treat_revenue = stats["treatment"]["revenue_per_user"]

    revenue_lift = ((treat_revenue - ctrl_revenue) / ctrl_revenue * 100) if ctrl_revenue > 0 else 0

    return {
        "conversion_lift_percentage": conversion_lift,
        "revenue_lift_percentage": revenue_lift,
        "winner": "treatment" if conversion_lift > 0 else "control",
    }

@flow
def ab_test_analysis_flow(experiment_id: str) -> dict:
    """Analyze A/B test results."""
    control, treatment = fetch_experiment_results(experiment_id=experiment_id)
    stats = compute_statistics(control=control, treatment=treatment)
    lift = compute_lift(stats=stats)

    return {
        "experiment_id": experiment_id,
        "statistics": stats,
        "lift": lift,
        "recommendation": f"Choose {lift['winner']} with {abs(lift['conversion_lift_percentage']):.1f}% lift",
    }

# Local execution
if __name__ == "__main__":
    result = ab_test_analysis_flow.run_local(experiment_id="exp_20240101")
    print(f"Winner: {result['lift']['winner']}")
    print(f"Conversion lift: {result['lift']['conversion_lift_percentage']:.2f}%")
    print(f"Recommendation: {result['recommendation']}")

Key Concepts:

  • Statistical metrics computation
  • Control vs treatment comparison
  • Lift calculation and winner selection
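The flow above picks a winner from the sign of the lift alone. A two-proportion z-test is a standard way to judge whether a difference in conversion rates is larger than chance. Below is a minimal standard-library sketch. The function name is ours, and the test assumes large, independent samples so that the normal approximation holds.

```python
import math
from typing import Dict


def two_proportion_z_test(conv_a: int, n_a: int, conv_b: int, n_b: int) -> Dict[str, float]:
    """Two-sided z-test for a difference between two conversion rates."""
    if n_a <= 0 or n_b <= 0:
        raise ValueError("sample sizes must be positive")
    p_a, p_b = conv_a / n_a, conv_b / n_b
    # Pooled rate under the null hypothesis that both groups convert equally
    p_pool = (conv_a + conv_b) / (n_a + n_b)
    se = math.sqrt(p_pool * (1 - p_pool) * (1 / n_a + 1 / n_b))
    if se == 0:
        return {"z": 0.0, "p_value": 1.0}
    z = (p_b - p_a) / se
    # Two-sided p-value: P(|Z| >= |z|) = erfc(|z| / sqrt(2)) for standard normal Z
    p_value = math.erfc(abs(z) / math.sqrt(2))
    return {"z": z, "p_value": p_value}
```

With the simulated data (120/1000 vs 150/1000), z is about 1.96 and the two-sided p-value is just under 0.05. That result sits right at the conventional threshold, so it is weak evidence on its own.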

24. Data Labeling Pipeline

Prepare data for manual or automated labeling.

from dagy import flow, task
from typing import List, Dict

@task
def load_unlabeled_data(source: str) -> List[Dict]:
    """Load data that needs labeling."""
    return [
        {"id": 1, "text": "Great product!", "category": None},
        {"id": 2, "text": "Terrible quality", "category": None},
        {"id": 3, "text": "Average at best", "category": None},
    ]

@task
def preprocess_for_labeling(data: List[Dict]) -> List[Dict]:
    """Preprocess data to make labeling easier."""
    processed = []
    for record in data:
        processed_record = record.copy()
        # Normalize text
        processed_record["text_normalized"] = record["text"].lower().strip()
        # Add metadata
        processed_record["text_length"] = len(record["text"])
        processed_record["word_count"] = len(record["text"].split())
        processed_record["ready_for_labeling"] = True
        processed.append(processed_record)
    return processed

@task
def create_labeling_batch(
    data: List[Dict],
    batch_size: int = 10
) -> List[List[Dict]]:
    """Create batches for labeling."""
    batches = []
    for i in range(0, len(data), batch_size):
        batches.append(data[i:i + batch_size])
    return batches

@task
def store_labeling_tasks(batches: List[List[Dict]], destination: str) -> dict:
    """Store labeling tasks for workers."""
    return {
        "total_tasks": sum(len(b) for b in batches),
        "batch_count": len(batches),
        "destination": destination,
        "task_format": "sentiment_classification",
    }

@flow
def labeling_pipeline(source: str) -> dict:
    """Prepare data for labeling."""
    data = load_unlabeled_data(source=source)
    preprocessed = preprocess_for_labeling(data=data)
    batches = create_labeling_batch(data=preprocessed, batch_size=10)
    result = store_labeling_tasks(batches=batches, destination="/labeling/tasks")

    return {
        "result": result,
        "total_records": len(data),
    }

# Local execution
if __name__ == "__main__":
    result = labeling_pipeline.run_local(source="feedback_data")
    print(f"Total records: {result['total_records']}")
    print(f"Batches created: {result['result']['batch_count']}")

Key Concepts:

  • Data normalization for labeling
  • Batch creation for workers
  • Task storage and tracking

25. Model Monitoring Pipeline

Detect model degradation by comparing recent accuracy against a baseline.

from dagy import flow, task
from typing import Dict

@task
def fetch_model_predictions(model_id: str, time_window: str = "1d") -> list:
    """Fetch recent model predictions."""
    # Simulated prediction data
    return [
        {"timestamp": "2024-01-01T10:00:00Z", "prediction": 0.85, "actual": 1},
        {"timestamp": "2024-01-01T11:00:00Z", "prediction": 0.72, "actual": 0},
        {"timestamp": "2024-01-01T12:00:00Z", "prediction": 0.91, "actual": 1},
    ]

@task
def compute_performance_metrics(predictions: list) -> dict:
    """Compute current performance metrics."""
    correct = sum(1 for p in predictions if (p["prediction"] > 0.5) == p["actual"])
    accuracy = correct / len(predictions) if predictions else 0

    return {
        "accuracy": accuracy,
        "prediction_count": len(predictions),
        "avg_confidence": sum(p["prediction"] for p in predictions) / len(predictions) if predictions else 0,
    }

@task
def detect_drift(current_metrics: dict, baseline_metrics: dict = None) -> dict:
    """Detect if model performance has degraded."""
    if baseline_metrics is None:
        baseline_metrics = {"accuracy": 0.92, "avg_confidence": 0.87}

    accuracy_drift = baseline_metrics["accuracy"] - current_metrics["accuracy"]
    confidence_drift = baseline_metrics["avg_confidence"] - current_metrics["avg_confidence"]

    has_drift = abs(accuracy_drift) > 0.05

    return {
        "has_drift": has_drift,
        "accuracy_change": accuracy_drift,
        "confidence_change": confidence_drift,
        "drift_severity": "high" if abs(accuracy_drift) > 0.1 else "low" if has_drift else "none",
    }

@task
def generate_alert(drift_info: dict) -> dict:
    """Generate alert if significant drift detected."""
    if drift_info["has_drift"]:
        return {
            "alert_level": "warning" if drift_info["drift_severity"] == "high" else "info",
            "message": f"Model drift detected: {drift_info['accuracy_change']:.2%}",
            "action_required": drift_info["drift_severity"] == "high",
        }
    return {"alert_level": "none", "message": "Model performing normally", "action_required": False}

@flow
def monitoring_pipeline(model_id: str) -> dict:
    """Monitor model performance and detect drift."""
    predictions = fetch_model_predictions(model_id=model_id)
    metrics = compute_performance_metrics(predictions=predictions)
    drift = detect_drift(current_metrics=metrics)
    alert = generate_alert(drift_info=drift)

    return {
        "model_id": model_id,
        "metrics": metrics,
        "drift": drift,
        "alert": alert,
    }

# Local execution
if __name__ == "__main__":
    result = monitoring_pipeline.run_local(model_id="model_v1")
    print(f"Accuracy: {result['metrics']['accuracy']:.2%}")
    print(f"Alert: {result['alert']['message']}")

Key Concepts:

  • Performance metric tracking
  • Threshold-based degradation detection against a baseline
  • Alert generation and escalation
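`detect_drift` compares accuracy to a baseline, which requires ground-truth labels. Drift in the distribution of prediction scores can be checked without labels. One widely used measure is the population stability index (PSI). This is a minimal sketch that assumes scores lie in [0, 1] and uses equal-width bins. The bin count, the smoothing epsilon, and the commonly quoted 0.1 / 0.25 thresholds are conventions, not Dagy settings.

```python
import math
from typing import List, Sequence


def _bin_shares(scores: Sequence[float], bins: int, eps: float) -> List[float]:
    """Share of scores in each equal-width bin over [0, 1], floored at eps."""
    if not scores:
        raise ValueError("scores must not be empty")
    counts = [0] * bins
    for s in scores:
        s = min(max(s, 0.0), 1.0)  # clamp into [0, 1]
        counts[min(int(s * bins), bins - 1)] += 1  # a score of 1.0 goes in the last bin
    # Flooring at eps avoids log(0) and division by zero for empty bins
    return [max(c / len(scores), eps) for c in counts]


def population_stability_index(
    baseline: Sequence[float],
    current: Sequence[float],
    bins: int = 10,
    eps: float = 1e-4,
) -> float:
    """PSI = sum over bins of (current - baseline) * ln(current / baseline)."""
    expected = _bin_shares(baseline, bins, eps)
    actual = _bin_shares(current, bins, eps)
    return sum((a - e) * math.log(a / e) for a, e in zip(actual, expected))
```

A PSI near 0 means the two score distributions match. Values above about 0.25 are commonly read as significant drift.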

File Processing

26. Image Resize Pipeline

Batch resize images and maintain aspect ratio.

from dagy import flow, task
from typing import List, Tuple

@task
def list_image_files(directory: str) -> List[str]:
    """List all image files in directory."""
    # Simulated file listing
    return [
        f"{directory}/image_001.jpg",
        f"{directory}/image_002.jpg",
        f"{directory}/image_003.png",
    ]

@task
def resize_image(
    image_path: str,
    target_width: int,
    target_height: int,
) -> dict:
    """Resize single image maintaining aspect ratio."""
    # Simulated image resize
    filename = image_path.split("/")[-1]
    return {
        "original": image_path,
        "output": f"/resized/{filename}",
        "original_dims": (1920, 1080),
        "target_dims": (target_width, target_height),
        "maintained_aspect_ratio": True,
    }

@task
def batch_resize(
    images: List[str],
    target_width: int = 800,
    target_height: int = 600,
) -> List[dict]:
    """Resize multiple images in batch."""
    results = []
    for image in images:
        result = resize_image(image, target_width, target_height)
        results.append(result)
    return results

@task
def compute_resize_stats(results: List[dict]) -> dict:
    """Compute batch resize statistics."""
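    return {
        "total_images": len(results),
        "successful": len(results),
        # Pixels removed across the batch (original area minus resized area); not bytes
        "total_space_saved": sum(
            r["original_dims"][0] * r["original_dims"][1]
            - r["target_dims"][0] * r["target_dims"][1]
            for r in results
        ),
        "avg_compression": 0.65,
    }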

@flow
def image_resize_pipeline(
    source_directory: str,
    target_width: int = 800,
    target_height: int = 600,
) -> dict:
    """Resize batch of images."""
    images = list_image_files(directory=source_directory)
    resized = batch_resize(images=images, target_width=target_width, target_height=target_height)
    stats = compute_resize_stats(results=resized)

    return {
        "stats": stats,
        "outputs": resized,
    }

# Local execution
if __name__ == "__main__":
    result = image_resize_pipeline.run_local(
        source_directory="/images/originals",
        target_width=800,
        target_height=600
    )
    print(f"Resized: {result['stats']['total_images']} images")
    print(f"Space saved: {result['stats']['total_space_saved']} pixels")

Key Concepts:

  • Batch file processing
  • Aspect ratio preservation
  • Compression metrics

27. PDF Processing Pipeline

Extract text and metadata from PDF files.

from dagy import flow, task
from typing import List, Dict

@task
def list_pdf_files(directory: str) -> List[str]:
    """List PDF files in directory."""
    return [
        f"{directory}/document_001.pdf",
        f"{directory}/document_002.pdf",
    ]

@task
def extract_pdf_content(pdf_path: str) -> dict:
    """Extract text and metadata from PDF."""
    # Simulated PDF extraction
    return {
        "path": pdf_path,
        "title": "Sample Document",
        "pages": 5,
        "text": "Lorem ipsum dolor sit amet...",
        "metadata": {
            "author": "John Doe",
            "created": "2024-01-01",
            "modified": "2024-01-15",
        },
    }

@task
def batch_extract_pdfs(pdf_files: List[str]) -> List[dict]:
    """Extract content from multiple PDFs."""
    results = []
    for pdf_file in pdf_files:
        content = extract_pdf_content(pdf_path=pdf_file)
        results.append(content)
    return results

@task
def index_pdf_content(extractions: List[dict]) -> dict:
    """Create searchable index of extracted content."""
    index = {
        "total_documents": len(extractions),
        "total_pages": sum(doc["pages"] for doc in extractions),
        "documents": [
            {
                "path": doc["path"],
                "title": doc["title"],
                "author": doc["metadata"].get("author"),
                "page_count": doc["pages"],
            }
            for doc in extractions
        ],
    }
    return index

@flow
def pdf_processing_flow(directory: str) -> dict:
    """Extract and index PDF content."""
    pdfs = list_pdf_files(directory=directory)
    extractions = batch_extract_pdfs(pdf_files=pdfs)
    index = index_pdf_content(extractions=extractions)

    return index

# Local execution
if __name__ == "__main__":
    result = pdf_processing_flow.run_local(directory="/documents/pdfs")
    print(f"Documents processed: {result['total_documents']}")
    print(f"Total pages: {result['total_pages']}")

Key Concepts:

  • PDF text extraction
  • Metadata parsing
  • Content indexing

28. Log File Analyzer

Parse, aggregate, and analyze log files.

from dagy import flow, task
from typing import List, Dict
import re
from collections import Counter

@task
def read_log_file(log_path: str) -> List[str]:
    """Read log file line by line."""
    # Simulated log data
    return [
        "2024-01-01T10:00:00 ERROR Database connection failed",
        "2024-01-01T10:01:00 INFO Request processed successfully",
        "2024-01-01T10:02:00 WARNING Slow query detected",
        "2024-01-01T10:03:00 ERROR Authentication failed for user_123",
        "2024-01-01T10:04:00 INFO Request processed successfully",
    ]

@task
def parse_log_lines(lines: List[str]) -> List[dict]:
    """Parse log lines into structured format."""
    parsed = []
    pattern = r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}) (\w+) (.*)"

    for line in lines:
        match = re.match(pattern, line)
        if match:
            parsed.append({
                "timestamp": match.group(1),
                "level": match.group(2),
                "message": match.group(3),
            })

    return parsed

@task
def aggregate_log_stats(logs: List[dict]) -> dict:
    """Aggregate logging statistics."""
    level_counts = Counter(log["level"] for log in logs)

    return {
        "total_logs": len(logs),
        "by_level": dict(level_counts),
        "error_count": level_counts.get("ERROR", 0),
        "warning_count": level_counts.get("WARNING", 0),
        "info_count": level_counts.get("INFO", 0),
    }

@task
def identify_patterns(logs: List[dict]) -> List[str]:
    """Return the three most frequent error messages."""
    errors = [log["message"] for log in logs if log["level"] == "ERROR"]
    # Count identical messages and keep the three most common
    return [message for message, _ in Counter(errors).most_common(3)]

@flow
def log_analysis_flow(log_path: str) -> dict:
    """Analyze log file for patterns and statistics."""
    lines = read_log_file(log_path=log_path)
    parsed = parse_log_lines(lines=lines)
    stats = aggregate_log_stats(logs=parsed)
    patterns = identify_patterns(logs=parsed)

    return {
        "statistics": stats,
        "error_patterns": patterns,
        "log_sample": parsed[:3],
    }

# Local execution
if __name__ == "__main__":
    result = log_analysis_flow.run_local(log_path="/var/log/app.log")
    print(f"Total logs: {result['statistics']['total_logs']}")
    print(f"Error count: {result['statistics']['error_count']}")
    print(f"Patterns: {result['error_patterns']}")

Key Concepts:

  • Regex-based log parsing
  • Log level aggregation
  • Pattern identification
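Real logs often repeat the same error with different IDs or numbers, so counting raw messages splits one pattern into many. A common refinement is to mask the variable parts before counting. This is a plain-Python sketch, independent of Dagy; the `normalize_message` and `top_error_patterns` names and the masking rules are illustrative assumptions to adapt to your own log format:

```python
import re
from collections import Counter
from typing import List, Tuple

# Illustrative masking rules: each variable token is replaced by a placeholder
_MASKS = [
    (re.compile(r"\buser_\d+\b"), "user_<ID>"),   # user identifiers such as user_123
    (re.compile(r"\b\d+(\.\d+)?\b"), "<N>"),       # standalone integers and decimals
]

def normalize_message(message: str) -> str:
    """Mask variable parts so similar messages share one pattern."""
    for pattern, placeholder in _MASKS:
        message = pattern.sub(placeholder, message)
    return message

def top_error_patterns(messages: List[str], limit: int = 3) -> List[Tuple[str, int]]:
    """Return the `limit` most frequent normalized patterns with their counts."""
    counts = Counter(normalize_message(m) for m in messages)
    return counts.most_common(limit)

errors = [
    "Authentication failed for user_123",
    "Authentication failed for user_456",
    "Database connection failed",
]
print(top_error_patterns(errors))
# [('Authentication failed for user_<ID>', 2), ('Database connection failed', 1)]
```

The two authentication failures now count as one pattern, which is usually what an operator wants to see first.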

29. Archive and Cleanup

Compress old files and delete expired content.

from dagy import flow, task
from datetime import datetime, timedelta
from typing import List

@task
def find_old_files(directory: str, days_old: int = 30) -> List[dict]:
    """Find files last modified more than `days_old` days ago."""
    now = datetime.now()
    cutoff_date = now - timedelta(days=days_old)

    # Simulated directory listing; a real task would stat files on disk
    candidates = [
        {"path": f"{directory}/file_001.log", "size": 5242880, "modified": now - timedelta(days=45)},
        {"path": f"{directory}/file_002.log", "size": 3145728, "modified": now - timedelta(days=60)},
    ]

    # Keep only files older than the cutoff, serializing timestamps as ISO strings
    return [
        {**f, "modified": f["modified"].isoformat()}
        for f in candidates
        if f["modified"] < cutoff_date
    ]

@task
def archive_files(old_files: List[dict], archive_path: str) -> dict:
    """Compress old files into archive."""
    total_size = sum(f["size"] for f in old_files)
    # Simulated compression: assume the archive is 30% of the original size
    compressed_size = int(total_size * 0.3)

    return {
        "files_archived": len(old_files),
        "original_size": total_size,
        "compressed_size": compressed_size,
        "archive_path": archive_path,
        # Fraction of space saved (about 0.7 here); 0.0 when there is nothing to archive
        "compression_ratio": 1 - (compressed_size / total_size) if total_size else 0.0,
    }

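@task
def delete_expired_files(old_files: List[dict], archive_result: dict) -> dict:
    """Delete files, but only once every one of them has been archived."""
    if archive_result["files_archived"] != len(old_files):
        # Never delete originals that are missing from the archive
        return {"files_deleted": 0, "space_freed": 0, "status": "skipped"}

    return {
        "files_deleted": len(old_files),
        "space_freed": sum(f["size"] for f in old_files),
        "status": "success",
    }

@flow
def archive_cleanup_flow(directory: str, days_old: int = 30) -> dict:
    """Archive old files, then delete the originals."""
    old_files = find_old_files(directory=directory, days_old=days_old)
    # Filesystem-safe timestamp (isoformat() would put colons in the file name)
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    archive_result = archive_files(old_files=old_files, archive_path=f"/archive/{stamp}.tar.gz")
    # Passing archive_result makes deletion wait for, and depend on, the archive step
    cleanup_result = delete_expired_files(old_files=old_files, archive_result=archive_result)

    return {
        "archive": archive_result,
        "cleanup": cleanup_result,
        # Net saving: originals removed minus the space the new archive occupies
        "total_space_freed": cleanup_result["space_freed"] - archive_result["compressed_size"],
    }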

# Local execution
if __name__ == "__main__":
    result = archive_cleanup_flow.run_local(directory="/var/log", days_old=30)
    print(f"Archived: {result['archive']['files_archived']} files")
    print(f"Space freed: {result['cleanup']['space_freed']} bytes")

Key Concepts:

  • File age detection
  • Compression and archiving
  • Cleanup operations
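The tasks above simulate the filesystem. For reference, the same steps can be done for real with the standard library's `pathlib` and `tarfile`. This is a plain-Python sketch, independent of Dagy; `find_files_older_than` and `archive_and_delete` are hypothetical names, and age is judged by modification time (`st_mtime`):

```python
import os
import tarfile
import tempfile
import time
from pathlib import Path
from typing import List

SECONDS_PER_DAY = 86400

def find_files_older_than(directory: str, days_old: int) -> List[Path]:
    """Return regular files whose modification time is older than `days_old` days."""
    cutoff = time.time() - days_old * SECONDS_PER_DAY
    return sorted(
        p for p in Path(directory).iterdir()
        if p.is_file() and p.stat().st_mtime < cutoff
    )

def archive_and_delete(files: List[Path], archive_path: str) -> int:
    """Write `files` to a gzip-compressed tar archive, then delete the originals.

    Returns the number of bytes freed by deleting the originals.
    """
    with tarfile.open(archive_path, "w:gz") as tar:
        for path in files:
            tar.add(path, arcname=path.name)  # store flat names inside the archive
    # The archive is closed (fully written) before anything is deleted
    freed = 0
    for path in files:
        freed += path.stat().st_size
        path.unlink()
    return freed

with tempfile.TemporaryDirectory() as tmp:
    old_file = Path(tmp, "old.log")
    old_file.write_text("x" * 100)
    new_file = Path(tmp, "new.log")
    new_file.write_text("y" * 50)

    # Backdate old.log by 45 days (access time, modification time)
    backdated = time.time() - 45 * SECONDS_PER_DAY
    os.utime(old_file, (backdated, backdated))

    stale = find_files_older_than(tmp, days_old=30)
    archive = os.path.join(tmp, "archive.tar.gz")
    freed = archive_and_delete(stale, archive)

    with tarfile.open(archive, "r:gz") as tar:
        archived_names = tar.getnames()
    remaining = sorted(p.name for p in Path(tmp).iterdir())

print(archived_names, freed, remaining)  # ['old.log'] 100 ['archive.tar.gz', 'new.log']
```

Deleting only after the archive has been closed mirrors the ordering the flow enforces: if archiving fails, the originals are left untouched.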

30. File Format Migration

Convert between different file formats.

from dagy import flow, task
from typing import List

@task
def list_files_to_convert(directory: str, source_format: str) -> List[str]:
    """List files with the source format's extension (simulated)."""
    return [
        f"{directory}/data_001.{source_format}",
        f"{directory}/data_002.{source_format}",
    ]

@task
def convert_format(
    file_path: str,
    source_format: str,
    target_format: str,
) -> dict:
    """Convert single file between formats."""
    filename = file_path.split("/")[-1].rsplit(".", 1)[0]
    output_path = f"/converted/{filename}.{target_format}"

    return {
        "source": file_path,
        "target": output_path,
        "source_format": source_format,
        "target_format": target_format,
        "status": "success",
    }

@task
def batch_convert(
    files: List[str],
    source_format: str,
    target_format: str,
) -> List[dict]:
    """Convert multiple files in batch."""
    results = []
    for file_path in files:
        result = convert_format(
            file_path=file_path,
            source_format=source_format,
            target_format=target_format,
        )
        results.append(result)
    return results

@task
def validate_conversions(conversions: List[dict]) -> dict:
    """Validate conversion results."""
    successful = len([c for c in conversions if c["status"] == "success"])

    return {
        "total_conversions": len(conversions),
        "successful": successful,
        "failed": len(conversions) - successful,
        "success_rate": successful / len(conversions) if conversions else 0,
    }

@flow
def format_migration_flow(
    directory: str,
    source_format: str = "csv",
    target_format: str = "json",
) -> dict:
    """Migrate files between formats."""
    files = list_files_to_convert(directory=directory, source_format=source_format)
    conversions = batch_convert(
        files=files,
        source_format=source_format,
        target_format=target_format
    )
    validation = validate_conversions(conversions=conversions)

    return {
        "validation": validation,
        "conversions": conversions,
    }

# Local execution
if __name__ == "__main__":
    result = format_migration_flow.run_local(
        directory="/data",
        source_format="csv",
        target_format="json"
    )
    print(f"Success rate: {result['validation']['success_rate']:.1%}")

Key Concepts:

  • File format conversion
  • Batch processing
  • Conversion validation
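`convert_format` above only computes the output path. A minimal sketch of an actual CSV-to-JSON conversion with the standard `csv` and `json` modules follows; it is plain Python, independent of Dagy, and `convert_csv_to_json` is a hypothetical helper. It assumes the CSV has a header row and keeps every value as a string, as `csv.DictReader` yields them:

```python
import csv
import json
import tempfile
from pathlib import Path

def convert_csv_to_json(source: str, target: str) -> int:
    """Convert a CSV file with a header row into a JSON array of objects.

    Returns the number of records written.
    """
    # newline="" is the documented way to open files for the csv module
    with open(source, newline="", encoding="utf-8") as f:
        records = list(csv.DictReader(f))
    Path(target).parent.mkdir(parents=True, exist_ok=True)
    with open(target, "w", encoding="utf-8") as f:
        json.dump(records, f, indent=2)
    return len(records)

with tempfile.TemporaryDirectory() as tmp:
    source = Path(tmp, "data_001.csv")
    source.write_text("id,name\n1,Alice\n2,Bob\n", encoding="utf-8")
    target = Path(tmp, "converted", "data_001.json")
    count = convert_csv_to_json(str(source), str(target))
    converted = json.loads(target.read_text(encoding="utf-8"))

print(count, converted)  # 2 [{'id': '1', 'name': 'Alice'}, {'id': '2', 'name': 'Bob'}]
```

Returning the record count gives a validation step like `validate_conversions` something concrete to check, for example comparing it with the source file's row count.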

Retry & Error Handling

31. Basic Retry

Simple retry with fixed delay between attempts.

from dagy import flow, task

@task(
    retries=3,
    retry_delay_seconds=2,
)
def unreliable_api_call(endpoint: str) -> dict:
    """Call external API that may fail."""
    import random

    # Simulates random failures (70% success rate)
    if random.random() < 0.7:
        return {"status": "success", "data": "API response"}
    else:
        raise Exception("API temporarily unavailable")

@task
def process_response(response: dict) -> str:
    """Process successful API response."""
    return f"Processed: {response['data']}"

@flow
def basic_retry_flow(endpoint: str) -> str:
    """Flow with basic retry on API calls."""
    response = unreliable_api_call(endpoint=endpoint)
    result = process_response(response=response)
    return result

# Local execution
if __name__ == "__main__":
    result = basic_retry_flow.run_local(endpoint="https://api.example.com/data")
    print(result)

Key Concepts:

  • retries parameter sets retry count
  • retry_delay_seconds for fixed delays between retries
  • Automatic retry on exception

32. Exponential Backoff

Increasing delays between retries to avoid overwhelming a struggling system.

from dagy import flow, task

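def exponential_backoff_schedule(attempt: int) -> float:
    """Delay in seconds before retry number `attempt` (0-indexed)."""
    # 2**attempt seconds: 1, 2, 4, 8, 16, ...
    return min(2 ** attempt, 60)  # Cap at 60 seconds

@task(
    retries=5,
    # One delay per retry: [1, 2, 4, 8, 16]. Assumes retry_delay_seconds accepts a
    # list of per-attempt delays; a single number would give a fixed delay instead.
    retry_delay_seconds=[exponential_backoff_schedule(n) for n in range(5)],
    # Retry only on timeouts and "unavailable" errors
    retry_condition_fn=lambda ex: "timeout" in str(ex).lower() or "unavailable" in str(ex).lower(),
)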
def flaky_database_call(query: str) -> list:
    """Database call that may timeout."""
    import random

    if random.random() < 0.8:
        return [{"id": 1, "value": "data"}]
    else:
        raise TimeoutError("Database connection timeout")

@task
def aggregate_results(data: list) -> dict:
    """Aggregate database results."""
    return {
        "rows": len(data),
        "data": data,
    }

@flow
def exponential_backoff_flow(query: str) -> dict:
    """Flow with exponential backoff retry strategy."""
    data = flaky_database_call(query=query)
    result = aggregate_results(data=data)
    return result

# Local execution
if __name__ == "__main__":
    result = exponential_backoff_flow.run_local(query="SELECT * FROM users")
    print(result)

Key Concepts:

  • Exponential backoff reduces system load
  • retry_condition_fn for selective retries
  • Backoff cap prevents excessive waits
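To see what a given `retries` count and cap translate into, it helps to compute the schedule directly. A plain-Python sketch, independent of Dagy; `backoff_delays` is a hypothetical helper using the same doubling-with-cap rule as `exponential_backoff_schedule`:

```python
from typing import List

def backoff_delays(retries: int, base: float = 1.0, cap: float = 60.0) -> List[float]:
    """Delay before each retry: base * 2**n for n = 0..retries-1, capped at `cap`."""
    return [min(base * 2 ** n, cap) for n in range(retries)]

print(backoff_delays(5))        # [1.0, 2.0, 4.0, 8.0, 16.0]
print(backoff_delays(8))        # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
print(sum(backoff_delays(5)))   # 31.0 seconds of waiting in the worst case
```

The worst-case total is worth checking against any timeout or SLA on the surrounding flow: five retries here add up to 31 seconds of waiting before the task finally fails.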

33. Retry with Jitter

Add randomness to retry delays to prevent the thundering herd problem, where many clients that failed together all retry at the same moment.

from dagy import flow, task
import random

@task(
    retries=4,
    retry_delay_seconds=2,
    retry_jitter_factor=0.3,  # Add up to 30% random jitter
)
def rate_limited_endpoint(item_id: str) -> dict:
    """Call rate-limited endpoint."""
    # Simulates rate limiting
    if random.random() < 0.6:
        return {"item_id": item_id, "data": "success"}
    else:
        raise Exception("Rate limit exceeded")

@task
def process_item(item_data: dict) -> str:
    """Process fetched item."""
    return f"Processed item {item_data['item_id']}"

@flow
def jitter_retry_flow(item_id: str) -> str:
    """Flow with jittered retry delays."""
    item = rate_limited_endpoint(item_id=item_id)
    result = process_item(item_data=item)
    return result

# Local execution
if __name__ == "__main__":
    result = jitter_retry_flow.run_local(item_id="item_123")
    print(result)

Key Concepts:

  • retry_jitter_factor adds randomness
  • Prevents synchronized retries from multiple clients
  • Improves overall system stability
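How exactly Dagy applies `retry_jitter_factor` is not shown here. A minimal plain-Python sketch of one common interpretation, assuming the factor stretches each delay by a random fraction between 0 and the factor (other schemes spread the delay in both directions); `jittered_delay` is a hypothetical helper:

```python
import random
from typing import List, Optional

def jittered_delay(delay: float, jitter_factor: float, rng: Optional[random.Random] = None) -> float:
    """Return `delay` lengthened by a random fraction in [0, jitter_factor].

    Assumption: jitter only lengthens the delay, never shortens it.
    """
    if rng is None:
        rng = random.Random()
    return delay * (1 + rng.uniform(0, jitter_factor))

rng = random.Random(42)  # seeded so the example is reproducible
delays: List[float] = [jittered_delay(2, 0.3, rng) for _ in range(5)]
print([round(d, 2) for d in delays])  # five different values between 2.0 and 2.6
```

Because each client draws its own random fraction, clients that failed at the same instant spread their retries over a window instead of arriving together.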

34. Conditional Retry

Retry only on specific exception types.

from dagy import flow, task

class TransientError(Exception):
    """Temporary error that should be retried."""
    pass

class PermanentError(Exception):
    """Permanent error that shouldn't be retried."""
    pass

def should_retry(exception: Exception) -> bool:
    """Determine if exception warrants retry."""
    return isinstance(exception, TransientError)

@task(
    retries=3,
    retry_delay_seconds=2,
    retry_condition_fn=should_retry,
)
def operation_with_error_types() -> str:
    """Operation that may fail with different error types."""
    import random

    error_type = random.choice(["transient", "permanent", "success"])

    if error_type == "success":
        return "Operation successful"
    elif error_type == "transient":
        raise TransientError("Temporary network issue")
    else:
        raise PermanentError("Invalid input provided")

@task
def handle_result(result: str) -> dict:
    """Handle operation result."""
    return {"result": result, "status": "completed"}

@flow
def conditional_retry_flow() -> dict:
    """Flow with conditional retry logic."""
    try:
        result = operation_with_error_types()
        return handle_result(result=result)
    except PermanentError as e:
        return {"error": str(e), "status": "failed", "retryable": False}

# Local execution
if __name__ == "__main__":
    result = conditional_retry_flow.run_local()
    print(result)

Key Concepts:

  • retry_condition_fn filters which exceptions trigger retries
  • Custom exception types for different failure modes
  • Fail-fast on permanent errors
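Conceptually, `retries`, `retry_delay_seconds`, and `retry_condition_fn` combine into a loop like the one below. This is a plain-Python sketch for intuition, not Dagy's implementation; `call_with_retries` is hypothetical, and the two exception classes are redefined here so the block runs on its own:

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")

def call_with_retries(
    fn: Callable[[], T],
    retries: int,
    delay_seconds: float,
    should_retry: Callable[[Exception], bool] = lambda exc: True,
) -> T:
    """Call `fn`, retrying up to `retries` times on exceptions accepted by `should_retry`."""
    for attempt in range(retries + 1):  # one initial attempt plus `retries` retries
        try:
            return fn()
        except Exception as exc:
            if attempt == retries or not should_retry(exc):
                raise  # out of retries, or a non-retryable error: fail fast
            time.sleep(delay_seconds)
    raise RuntimeError("retries must be >= 0")  # only reached when the loop never runs

class TransientError(Exception):
    """Temporary error that should be retried."""

class PermanentError(Exception):
    """Permanent error that shouldn't be retried."""

is_transient = lambda exc: isinstance(exc, TransientError)

calls = {"count": 0}

def flaky() -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise TransientError("Temporary network issue")
    return "ok"

result = call_with_retries(flaky, retries=3, delay_seconds=0.01, should_retry=is_transient)
print(result, calls["count"])  # ok 3

permanent_calls: List[int] = []

def bad_input() -> str:
    permanent_calls.append(1)
    raise PermanentError("Invalid input provided")

try:
    call_with_retries(bad_input, retries=3, delay_seconds=0.01, should_retry=is_transient)
except PermanentError:
    pass
print(len(permanent_calls))  # 1: permanent errors are not retried
```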

35. Custom Retry Strategy

Implement custom retry logic beyond standard parameters.

from dagy import flow, task
from datetime import datetime
import random
import time

@task
def smart_retry_call(
    endpoint: str,
    max_attempts: int = 5,
) -> dict:
    """Task that retries internally with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            # Simulated call that succeeds 70% of the time
            if random.random() < 0.7:
                return {
                    "success": True,
                    "attempt": attempt,
                    "timestamp": datetime.now().isoformat(),
                    "endpoint": endpoint,
                }
            raise ConnectionError(f"Attempt {attempt} failed")
        except ConnectionError:
            if attempt == max_attempts:
                raise  # give up after the final attempt

            # Exponential backoff within the task: 1, 2, 4, 8, ... capped at 30 seconds
            wait_time = min(2 ** (attempt - 1), 30)
            print(f"Retry {attempt + 1} after {wait_time}s")
            time.sleep(wait_time)

@task
def log_retry_details(result: dict) -> str:
    """Log details about retry process."""
    return f"Succeeded on attempt {result['attempt']}"

@flow
def custom_retry_flow(endpoint: str) -> str:
    """Flow with custom retry strategy embedded in task."""
    result = smart_retry_call(endpoint=endpoint, max_attempts=5)
    log_msg = log_retry_details(result=result)
    return log_msg

# Local execution
if __name__ == "__main__":
    result = custom_retry_flow.run_local(endpoint="https://api.example.com")
    print(result)

Key Concepts:

  • Embedded retry logic for complex strategies
  • Custom backoff calculations
  • Attempt tracking

36. Timeout with Fallback

Handle timeout gracefully with fallback logic.

from dagy import flow, task
from typing import Optional
import time

@task(
    timeout_seconds=5,
)
def slow_operation(duration: float = 3) -> str:
    """Operation that fails if it runs longer than timeout_seconds."""
    time.sleep(duration)
    return "Operation completed"

@task
def fallback_operation(original_input: str) -> str:
    """Fallback when timeout occurs."""
    return f"Using cached result for {original_input}"

@task
def merge_results(primary: Optional[str], fallback: Optional[str] = None) -> dict:
    """Prefer the primary result; use the fallback when there is none."""
    return {
        "primary_success": primary is not None,
        "used_fallback": fallback is not None,
        "result": primary if primary is not None else fallback,
    }

@flow
def timeout_fallback_flow(query: str, duration: float = 3) -> dict:
    """Handle timeout with fallback strategy."""
    primary: Optional[str] = None
    fallback_result: Optional[str] = None
    try:
        primary = slow_operation(duration=duration)
    except TimeoutError:
        # Assumes Dagy raises TimeoutError when timeout_seconds is exceeded
        fallback_result = fallback_operation(original_input=query)

    return merge_results(primary=primary, fallback=fallback_result)

# Local execution
if __name__ == "__main__":
    print(timeout_fallback_flow.run_local(query="search_term"))               # finishes within 5s
    print(timeout_fallback_flow.run_local(query="search_term", duration=10))  # exceeds the timeout

Key Concepts:

  • timeout_seconds parameter for task timeouts
  • Fallback execution on timeout
  • Graceful degradation pattern
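Outside an orchestrator, the same timeout-then-fallback pattern can be built on the standard `concurrent.futures` module. This is a plain-Python sketch, independent of Dagy; `call_with_timeout` is a hypothetical helper. Note that a thread cannot be forcibly stopped, so the slow call keeps running in the background after the fallback is returned:

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout
from typing import Callable, Tuple, TypeVar

T = TypeVar("T")

def call_with_timeout(fn: Callable[[], T], fallback: Callable[[], T], timeout: float) -> Tuple[T, bool]:
    """Run `fn` in a worker thread; if it exceeds `timeout` seconds, return `fallback()`.

    Returns (result, used_fallback).
    """
    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(fn)
    try:
        return future.result(timeout=timeout), False
    except FutureTimeout:
        return fallback(), True
    finally:
        executor.shutdown(wait=False)  # do not block on an abandoned slow call

def slow_call() -> str:
    time.sleep(0.5)
    return "fresh"

fast = call_with_timeout(lambda: "fresh", lambda: "cached", timeout=1.0)
slow = call_with_timeout(slow_call, lambda: "cached", timeout=0.1)
print(fast)  # ('fresh', False)
print(slow)  # ('cached', True)
```

Returning the `used_fallback` flag alongside the value lets callers report degraded results, much like `merge_results` does in the flow.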

Scheduling

37. Daily Report

Cron-scheduled daily aggregation and reporting.

from dagy import flow, task
from datetime import datetime, timedelta

@task
def fetch_daily_metrics(date: str) -> dict:
    """Fetch metrics for specified date."""
    return {
        "date": date,
        "active_users": 1250,
        "transactions": 3400,
        "revenue": 45230.50,
        "errors": 12,
    }

@task
def compute_daily_summary(metrics: dict) -> dict:
    """Compute daily summary statistics."""
    transactions = metrics["transactions"]
    return {
        **metrics,
        # Guard against days with no transactions
        "avg_transaction": metrics["revenue"] / transactions if transactions else 0.0,
        "error_rate": metrics["errors"] / transactions * 100 if transactions else 0.0,  # percent
    }

@task
def generate_report(summary: dict) -> str:
    """Generate human-readable daily report."""
    return f"""
Daily Report - {summary['date']}
================================
Active Users: {summary['active_users']}
Transactions: {summary['transactions']}
Revenue: ${summary['revenue']:,.2f}
Errors: {summary['errors']} ({summary['error_rate']:.2f}%)
Avg Transaction: ${summary['avg_transaction']:.2f}
"""

@task
def send_report_email(report: str, recipients: list) -> dict:
    """Send report via email."""
    return {
        "status": "sent",
        "recipients": recipients,
        "timestamp": datetime.now().isoformat(),
    }

@flow(
    name="daily_report",
    description="Daily metrics aggregation and report",
)
def daily_report_flow(report_date: str = None) -> dict:
    """Daily scheduled reporting flow."""
    if report_date is None:
        report_date = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")

    metrics = fetch_daily_metrics(date=report_date)
    summary = compute_daily_summary(metrics=metrics)
    report = generate_report(summary=summary)
    email_result = send_report_email(
        report=report,
        recipients=["ops@example.com", "ceo@example.com"]
    )

    return {"report": report, "email": email_result}

# Local execution (would be scheduled via deployment)
if __name__ == "__main__":
    result = daily_report_flow.run_local(report_date="2024-01-01")
    print(result["report"])

Key Concepts:

  • Daily scheduling pattern
  • Date parameter for flexibility
  • Email notifications
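The schedule itself is attached at deployment time, which this example does not show. As an assumed illustration, a standard cron expression such as `0 6 * * *` fires at 06:00 every day, and the flow's default of reporting on the previous day fits that: a run early in the morning covers the completed day before. A plain-Python sketch of that timing, using naive local times (DST transitions are ignored); both helper names are hypothetical:

```python
from datetime import datetime, timedelta

def next_daily_run(now: datetime, hour: int, minute: int = 0) -> datetime:
    """Next occurrence of hour:minute strictly after `now` (like cron "M H * * *")."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)  # today's slot has passed, use tomorrow's
    return candidate

def report_date_for(run_time: datetime) -> str:
    """Date reported on by a run: the previous calendar day, as in daily_report_flow."""
    return (run_time - timedelta(days=1)).strftime("%Y-%m-%d")

run = next_daily_run(datetime(2024, 1, 1, 7, 30), hour=6)
print(run)                   # 2024-01-02 06:00:00
print(report_date_for(run))  # 2024-01-01
```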

38. Hourly Sync

Interval-based data synchronization.

from dagy import flow, task
from datetime import datetime, timedelta

@task
def check_sync_status(last_sync: str = None) -> dict:
    """Check when last sync occurred."""
    if last_sync is None:
        last_sync = (datetime.now() - timedelta(hours=1)).isoformat()

    return {
        "last_sync": last_sync,
        "time_since_sync": 3600,  # seconds
        "needs_sync": True,
    }

@task
def fetch_incremental_data(since: str) -> list:
    """Fetch data changed since last sync."""
    return [
        {"id": 1, "changed_at": "2024-01-01T10:00:00Z", "action": "create"},
        {"id": 2, "changed_at": "2024-01-01T10:30:00Z", "action": "update"},
        {"id": 3, "changed_at": "2024-01-01T11:00:00Z", "action": "delete"},
    ]

@task
def sync_to_target(data: list) -> dict:
    """Sync data to target system."""
    return {
        "synced_count": len(data),
        "timestamp": datetime.now().isoformat(),
        "status": "success",
    }

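@task
def update_sync_checkpoint(sync_time: str, sync_result: dict) -> str:
    """Advance the checkpoint for the next sync, but only after a successful sync."""
    if sync_result["status"] != "success":
        return "Checkpoint unchanged: sync did not succeed"
    return f"Checkpoint updated to {sync_time}"

@flow(
    name="hourly_sync",
    description="Hourly incremental data sync",
)
def hourly_sync_flow(last_sync: str = None) -> dict:
    """Hourly data synchronization."""
    # Record the start time before fetching, so changes made while this run is in
    # progress fall after the new checkpoint and are picked up by the next run
    sync_started = datetime.now().isoformat()
    status = check_sync_status(last_sync=last_sync)
    data = fetch_incremental_data(since=status["last_sync"])
    sync_result = sync_to_target(data=data)
    checkpoint = update_sync_checkpoint(sync_time=sync_started, sync_result=sync_result)

    return {
        "sync_result": sync_result,
        "checkpoint": checkpoint,
    }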

# Local execution
if __name__ == "__main__":
    result = hourly_sync_flow.run_local()
    print(result)

Key Concepts:

  • Hourly execution pattern
  • Incremental data fetching
  • Checkpoint tracking
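An alternative to a wall-clock checkpoint is to checkpoint on the data itself: keep the latest `changed_at` value actually processed. A plain-Python sketch, independent of Dagy; `parse_ts` and `changes_since` are hypothetical helpers, and all timestamps are assumed to share the same timezone convention (here UTC with a trailing `Z`):

```python
from datetime import datetime
from typing import Dict, List, Optional, Tuple

def parse_ts(value: str) -> datetime:
    """Parse an ISO 8601 timestamp, accepting a trailing 'Z' (UTC) on older Pythons."""
    return datetime.fromisoformat(value.replace("Z", "+00:00"))

def changes_since(records: List[Dict], checkpoint: Optional[str]) -> Tuple[List[Dict], Optional[str]]:
    """Return records changed strictly after `checkpoint`, plus the new checkpoint.

    Records sharing the checkpoint's exact timestamp are skipped; sources with
    coarse timestamps may need >= plus de-duplication instead.
    """
    since = parse_ts(checkpoint) if checkpoint else None
    fresh = [r for r in records if since is None or parse_ts(r["changed_at"]) > since]
    if not fresh:
        return [], checkpoint  # nothing new: keep the old checkpoint
    newest = max(fresh, key=lambda r: parse_ts(r["changed_at"]))
    return fresh, newest["changed_at"]

records = [
    {"id": 1, "changed_at": "2024-01-01T10:00:00Z", "action": "create"},
    {"id": 2, "changed_at": "2024-01-01T10:30:00Z", "action": "update"},
    {"id": 3, "changed_at": "2024-01-01T11:00:00Z", "action": "delete"},
]
fresh, new_checkpoint = changes_since(records, "2024-01-01T10:00:00Z")
print([r["id"] for r in fresh], new_checkpoint)  # [2, 3] 2024-01-01T11:00:00Z

# A second run with the new checkpoint finds nothing and keeps it unchanged
second_run = changes_since(records, new_checkpoint)
```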

39. Business Hours Only

Process queued requests on a cron schedule that runs only on weekdays between 9:00 and 17:00.

from dagy import flow, task
from datetime import datetime

@task
def get_business_requests(queue: str) -> list:
    """Get requests from business hours queue."""
    return [
        {"id": 1, "type": "support", "priority": "high"},
        {"id": 2, "type": "feature", "priority": "medium"},
    ]

@task
def process_request(request: dict) -> dict:
    """Process single business request."""
    return {
        **request,
        "processed_at": datetime.now().isoformat(),
        "status": "completed",
    }

@task
def batch_process(requests: list) -> list:
    """Process multiple requests."""
    return [process_request(request=req) for req in requests]

@task
def send_summary(processed: list) -> dict:
    """Send summary to business team."""
    return {
        "total_processed": len(processed),
        "timestamp": datetime.now().isoformat(),
    }

@flow(
    name="business_hours_processor",
    description="Process requests during business hours",
)
def business_hours_flow() -> dict:
    """Process requests only during business hours."""
    requests = get_business_requests(queue="business")
    processed = batch_process(requests=requests)
    summary = send_summary(processed=processed)

    return {
        "processed_requests": processed,
        "summary": summary,
    }

# Local execution
if __name__ == "__main__":
    result = business_hours_flow.run_local()
    print(f"Processed: {result['summary']['total_processed']}")

Key Concepts:

  • Business hours scheduling (weekdays 9-5)
  • Request batching
  • Team notifications
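The business-hours restriction comes from the deployment schedule, which is not shown here. In standard 5-field cron syntax, `0 9-16 * * 1-5` fires on the hour from 09:00 through 16:00, Monday to Friday. A guard inside the flow is a useful second check when a run is triggered manually. A plain-Python sketch assuming naive local time; the constants and `is_business_hours` are hypothetical:

```python
from datetime import datetime

BUSINESS_START_HOUR = 9
BUSINESS_END_HOUR = 17  # exclusive: 17:00 is already outside business hours

def is_business_hours(moment: datetime) -> bool:
    """True on weekdays between 09:00 (inclusive) and 17:00 (exclusive)."""
    is_weekday = moment.weekday() < 5  # Monday == 0 ... Sunday == 6
    return is_weekday and BUSINESS_START_HOUR <= moment.hour < BUSINESS_END_HOUR

print(is_business_hours(datetime(2024, 1, 1, 10, 0)))  # True  (Monday 10:00)
print(is_business_hours(datetime(2024, 1, 6, 10, 0)))  # False (Saturday)
print(is_business_hours(datetime(2024, 1, 1, 17, 0)))  # False (Monday 17:00)
```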

40. Monthly Billing

First-of-month processing.

from dagy import flow, task
from datetime import datetime

@task
def identify_billable_accounts() -> list:
    """Find accounts to bill this month."""
    return [
        {"account_id": "acct_1", "subscription_tier": "pro", "amount": 99.99},
        {"account_id": "acct_2", "subscription_tier": "enterprise", "amount": 499.99},
        {"account_id": "acct_3", "subscription_tier": "pro", "amount": 99.99},
    ]

@task
def generate_invoices(accounts: list) -> list:
    """Generate invoices for all accounts."""
    now = datetime.now()  # one timestamp so every invoice in this run agrees
    invoices = []
    for account in accounts:
        invoices.append({
            "invoice_id": f"inv_{account['account_id']}_{now.strftime('%Y%m%d')}",
            "account_id": account["account_id"],
            "amount": account["amount"],
            "date": now.isoformat(),
        })
    return invoices

@task
def process_payments(invoices: list) -> dict:
    """Process payments for invoices."""
    return {
        "invoices_processed": len(invoices),
        "total_revenue": sum(inv["amount"] for inv in invoices),
        "timestamp": datetime.now().isoformat(),
    }

@task
def notify_accounts(invoices: list) -> str:
    """Send payment notifications."""
    return f"Notified {len(invoices)} accounts"

@flow(
    name="monthly_billing",
    description="Monthly billing and invoice processing",
)
def monthly_billing_flow() -> dict:
    """Monthly billing pipeline."""
    accounts = identify_billable_accounts()
    invoices = generate_invoices(accounts=accounts)
    payment_result = process_payments(invoices=invoices)
    notification = notify_accounts(invoices=invoices)

    return {
        "payment_result": payment_result,
        "notification": notification,
    }

# Local execution
if __name__ == "__main__":
    result = monthly_billing_flow.run_local()
    print(f"Revenue: ${result['payment_result']['total_revenue']:.2f}")

Key Concepts:

  • Monthly scheduling pattern
  • Batch invoice generation
  • Payment processing orchestration
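A run on the first of the month (for example the cron expression `0 0 1 * *`, midnight on day 1) usually bills for the month that just ended. Computing that period is easy to get wrong around month lengths and leap years; a plain-Python sketch with the standard `datetime` module, where `previous_billing_period` is a hypothetical helper:

```python
from datetime import date, timedelta
from typing import Tuple

def previous_billing_period(run_date: date) -> Tuple[date, date]:
    """Return (first day, last day) of the month before `run_date`."""
    first_of_this_month = run_date.replace(day=1)
    last_of_previous = first_of_this_month - timedelta(days=1)
    return last_of_previous.replace(day=1), last_of_previous

print(previous_billing_period(date(2024, 3, 1)))  # (datetime.date(2024, 2, 1), datetime.date(2024, 2, 29))
print(previous_billing_period(date(2024, 1, 1)))  # (datetime.date(2023, 12, 1), datetime.date(2023, 12, 31))
```

Stepping back one day from the first of the current month lets the calendar handle month lengths, so no per-month table is needed.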

41. One-Time Migration

Run-once setup flow for data migration.

from dagy import flow, task
from datetime import datetime

@task
def backup_current_state() -> str:
    """Create a backup before migrating and return its location."""
    # strftime keeps the path free of the colons isoformat() would add
    return f"/backup/{datetime.now().strftime('%Y%m%d_%H%M%S')}"

@task
def migrate_data(backup_location: str) -> dict:
    """Perform data migration."""
    return {
        "backup_location": backup_location,
        "records_migrated": 5000,
        "errors": 0,
        "status": "success",
    }

@task
def validate_migration(result: dict) -> bool:
    """Validate migration results."""
    return result["errors"] == 0

@task
def commit_migration(validated: bool) -> dict:
    """Finalize migration if valid."""
    if validated:
        return {
            "migration_status": "committed",
            "timestamp": datetime.now().isoformat(),
        }
    return {"migration_status": "rolled_back"}

@flow(
    name="data_migration",
    description="One-time data migration flow",
)
def migration_flow() -> dict:
    """Execute one-time data migration."""
    backup = backup_current_state()
    migration = migrate_data(backup_location=backup)
    is_valid = validate_migration(result=migration)
    commit = commit_migration(validated=is_valid)

    return {
        "backup": backup,
        "migration": migration,
        "commit": commit,
    }

# Local execution
if __name__ == "__main__":
    result = migration_flow.run_local()
    print(f"Status: {result['commit']['migration_status']}")

Key Concepts:

  • One-time migration pattern
  • Backup and validation
  • Commit/rollback logic
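Nothing in the flow itself stops it from running twice. One simple guard is a completion marker that is written only after a successful run. A plain-Python sketch, independent of Dagy; `run_once` and the marker-file approach are hypothetical, and dedicated migration tools typically record applied migrations in a database table instead:

```python
import json
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Callable, Dict, List

def run_once(marker_path: str, migration: Callable[[], Dict]) -> Dict:
    """Run `migration` only if no completion marker exists yet.

    The marker is written only after the migration returns successfully,
    so a failed attempt can simply be re-run.
    """
    marker = Path(marker_path)
    if marker.exists():
        return {"status": "skipped", "completed": json.loads(marker.read_text())}
    result = migration()
    marker.write_text(json.dumps({"finished_at": datetime.now().isoformat(), "result": result}))
    return {"status": "ran", "result": result}

calls: List[int] = []

def migrate() -> Dict:
    calls.append(1)
    return {"records_migrated": 5000}

with tempfile.TemporaryDirectory() as tmp:
    marker_file = f"{tmp}/migration_001.done"
    first = run_once(marker_file, migrate)
    second = run_once(marker_file, migrate)

print(first["status"], second["status"], len(calls))  # ran skipped 1
```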

Notifications & Monitoring

42. Failure Alert Flow

Notify operators when the pipeline fails.

from dagy import flow, task

@task(
    on_failure="log_failure"
)
def risky_operation(item_id: str) -> dict:
    """Operation that might fail."""
    if item_id == "bad_item":
        raise ValueError(f"Cannot process {item_id}")
    return {"item_id": item_id, "processed": True}

@task
def log_failure(error: Exception, context: dict) -> str:
    """Log failure details."""
    return f"Failure logged: {str(error)}"

@task
def send_alert(error_message: str) -> dict:
    """Send alert notification."""
    return {
        "alert_sent": True,
        "channel": "slack",
        "message": f"Pipeline failed: {error_message}",
        "recipients": ["ops@example.com"],
    }

@task
def create_incident(error_info: str) -> str:
    """Create incident ticket."""
    return f"Incident #INC-001 created for {error_info}"

@flow
def failure_alert_flow(item_id: str) -> dict:
    """Flow with failure notifications."""
    try:
        result = risky_operation(item_id=item_id)
        return {"status": "success", "result": result}
    except Exception as e:
        error_msg = str(e)
        log_msg = log_failure(error=e, context={"item_id": item_id})
        alert = send_alert(error_message=error_msg)
        incident = create_incident(error_info=error_msg)

        return {
            "status": "failed",
            "error": error_msg,
            "log": log_msg,
            "alert": alert,
            "incident": incident,
        }

# Local execution
if __name__ == "__main__":
    result = failure_alert_flow.run_local(item_id="bad_item")
    print(result)

Key Concepts:

  • Error handling with callbacks
  • Alert escalation
  • Incident tracking
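How Dagy resolves the `on_failure="log_failure"` reference is not shown here. As a plain-Python illustration of the callback pattern itself, a decorator can call a handler with the error and some context, then re-raise so the failure still propagates. The `on_failure` decorator below is hypothetical and unrelated to Dagy's parameter of the same name:

```python
import functools
from typing import Any, Callable, Dict, List

def on_failure(callback: Callable[[Exception, Dict[str, Any]], None]) -> Callable[[Callable], Callable]:
    """Decorator: call `callback(error, context)` when the wrapped function raises, then re-raise."""
    def decorator(fn: Callable) -> Callable:
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            try:
                return fn(*args, **kwargs)
            except Exception as exc:
                callback(exc, {"function": fn.__name__, "args": args, "kwargs": kwargs})
                raise  # the callback observes the failure; it does not swallow it
        return wrapper
    return decorator

failures: List[str] = []

def record_failure(error: Exception, context: Dict[str, Any]) -> None:
    failures.append(f"{context['function']}: {error}")

@on_failure(record_failure)
def risky_operation(item_id: str) -> dict:
    if item_id == "bad_item":
        raise ValueError(f"Cannot process {item_id}")
    return {"item_id": item_id, "processed": True}

risky_operation("good_item")
try:
    risky_operation("bad_item")
except ValueError:
    pass
print(failures)  # ['risky_operation: Cannot process bad_item']
```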

43. SLA Monitoring Flow

Alert when a run's duration exceeds the SLA threshold.

from dagy import flow, task
import time
from datetime import datetime

@task
def start_sla_timer() -> dict:
    """Record start time for SLA tracking."""
    return {"start_time": datetime.now().isoformat()}

@task
def execute_work(duration: float = 2) -> dict:
    """Execute main work and record when it finished."""
    time.sleep(duration)
    return {"work_completed": True, "end_time": datetime.now().isoformat()}

@task
def check_sla(timer: dict, work: dict, sla_seconds: int = 5) -> dict:
    """Check if SLA was met."""
    # Taking the work result as input makes this check run only after the work finishes
    start = datetime.fromisoformat(timer["start_time"])
    end = datetime.fromisoformat(work["end_time"])
    elapsed = (end - start).total_seconds()

    return {
        "elapsed_seconds": elapsed,
        "sla_seconds": sla_seconds,
        "sla_met": elapsed <= sla_seconds,
        "exceeded_by": max(0, elapsed - sla_seconds),
    }

@task
def alert_sla_breach(sla_info: dict) -> dict:
    """Alert if SLA breached."""
    if not sla_info["sla_met"]:
        return {
            "alert_level": "warning",
            "message": f"SLA breached by {sla_info['exceeded_by']:.1f}s",
            "severity": "high" if sla_info["exceeded_by"] > 10 else "medium",
        }
    return {"alert_level": "none", "message": "SLA met"}

@flow
def sla_monitoring_flow(sla_threshold: int = 5) -> dict:
    """Monitor operation against SLA."""
    timer = start_sla_timer()
    work = execute_work(duration=3)
    sla_check = check_sla(timer=timer, work=work, sla_seconds=sla_threshold)
    alert = alert_sla_breach(sla_info=sla_check)

    return {
        "sla_check": sla_check,
        "alert": alert,
    }

# Local execution
if __name__ == "__main__":
    result = sla_monitoring_flow.run_local(sla_threshold=5)
    print(f"SLA: {result['sla_check']['sla_met']}")
    print(f"Alert: {result['alert']['message']}")

Key Concepts:

  • SLA tracking
  • Duration monitoring
  • Performance alerts
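`datetime.now()` reads the wall clock, which can jump when the system time is adjusted. For measuring elapsed time within one process, Python's `time.monotonic()` is the safer clock because it never goes backward. A plain-Python sketch, independent of Dagy; `sla_timer` is a hypothetical context manager:

```python
import time
from contextlib import contextmanager
from typing import Any, Dict, Iterator

@contextmanager
def sla_timer(sla_seconds: float) -> Iterator[Dict[str, Any]]:
    """Time the enclosed block with a monotonic clock and record SLA status.

    The report is filled in when the block exits, even if the block raises.
    """
    report: Dict[str, Any] = {}
    start = time.monotonic()  # unaffected by system clock adjustments
    try:
        yield report
    finally:
        elapsed = time.monotonic() - start
        report["elapsed_seconds"] = elapsed
        report["sla_met"] = elapsed <= sla_seconds
        report["exceeded_by"] = max(0.0, elapsed - sla_seconds)

with sla_timer(sla_seconds=0.5) as sla:
    time.sleep(0.1)  # stand-in for the real work

print(sla["sla_met"], round(sla["elapsed_seconds"], 2))
```

Monotonic readings are only meaningful as differences within one process; to measure across tasks that may run on different machines, recorded wall-clock timestamps like those in the flow are still needed.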

44. Success Summary

Send completion digest with metrics.

from dagy import flow, task
from datetime import datetime

@task
def execute_workflow_steps() -> dict:
    """Execute main workflow."""
    return {
        "tasks_completed": 5,
        "records_processed": 1250,
        "errors": 0,
        "start_time": datetime.now(),
    }

@task
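def compute_metrics(result: dict) -> dict:
    """Compute workflow metrics."""
    duration_seconds = 42  # simulated run duration
    processed = result["records_processed"]
    return {
        **result,
        # Share of records processed without an error
        "success_rate": (processed - result["errors"]) / processed if processed else 1.0,
        "end_time": datetime.now(),
        "duration_seconds": duration_seconds,
        "throughput": processed / duration_seconds,
    }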

@task
def format_summary(metrics: dict) -> str:
    """Format metrics as summary message."""
    return f"""
Workflow Completed Successfully
===============================
Tasks Completed: {metrics['tasks_completed']}
Records Processed: {metrics['records_processed']}
Duration: {metrics['duration_seconds']}s
Throughput: {metrics['throughput']:.1f} records/sec
Success Rate: {metrics['success_rate']:.1%}
Errors: {metrics['errors']}
"""

@task
def send_summary_email(summary: str, recipients: list) -> dict:
    """Send summary to stakeholders."""
    return {
        "status": "sent",
        "recipients": recipients,
        "timestamp": datetime.now().isoformat(),
    }

@flow
def success_summary_flow(recipients: list = None) -> dict:
    """Send success completion digest."""
    if recipients is None:
        recipients = ["team@example.com"]

    workflow_result = execute_workflow_steps()
    metrics = compute_metrics(result=workflow_result)
    summary = format_summary(metrics=metrics)
    email = send_summary_email(summary=summary, recipients=recipients)

    return {
        "summary": summary,
        "email_status": email["status"],
    }

# Local execution
if __name__ == "__main__":
    result = success_summary_flow.run_local()
    print(result["summary"])

Key Concepts:

  • Completion metrics
  • Summary formatting
  • Team notifications

45. Multi-Channel Alert

Send alerts to Slack and email simultaneously.

from dagy import flow, task
from datetime import datetime

@task
def detect_alert_condition(metric: float, threshold: float = 0.8) -> bool:
    """Check if alert condition is met."""
    return metric > threshold

@task
def format_alert_message(condition: bool, metric: float) -> str:
    """Format alert message."""
    if condition:
        return f"ALERT: Metric {metric:.2f} exceeds threshold"
    return f"INFO: Metric {metric:.2f} is normal"

@task
def send_slack_notification(message: str, channel: str = "#alerts") -> dict:
    """Send alert to Slack."""
    return {
        "channel": channel,
        "message": message,
        "status": "sent",
        "timestamp": datetime.now().isoformat(),
    }

@task
def send_email_notification(message: str, recipients: list) -> dict:
    """Send alert via email."""
    return {
        "recipients": recipients,
        "subject": "System Alert",
        "body": message,
        "status": "sent",
    }

@task
def aggregate_notifications(slack: dict, email: dict) -> dict:
    """Aggregate notification results."""
    return {
        "channels_notified": ["slack", "email"],
        "slack": slack,
        "email": email,
        "all_sent": slack["status"] == "sent" and email["status"] == "sent",
    }

@flow
def multi_channel_alert_flow(
    metric: float,
    threshold: float = 0.8,
    recipients: list = None
) -> dict:
    """Send alerts to multiple channels."""
    if recipients is None:
        recipients = ["ops@example.com"]

    has_alert = detect_alert_condition(metric=metric, threshold=threshold)
    message = format_alert_message(condition=has_alert, metric=metric)

    slack_result = send_slack_notification(message=message, channel="#alerts")
    email_result = send_email_notification(message=message, recipients=recipients)
    aggregated = aggregate_notifications(slack=slack_result, email=email_result)

    return aggregated

# Local execution
if __name__ == "__main__":
    result = multi_channel_alert_flow.run_local(metric=0.95, threshold=0.8)
    print(f"All sent: {result['all_sent']}")

Key Concepts:

  • Multiple notification channels
  • Parallel alert dispatch
  • Delivery confirmation

Advanced Patterns

46. Parallel Processing

Execute multiple independent tasks in parallel.

from dagy import flow, task

@task
def process_region_a(data: list) -> dict:
    """Process data for region A."""
    return {
        "region": "A",
        "processed": len(data),
        "revenue": sum(d.get("value", 0) for d in data),
    }

@task
def process_region_b(data: list) -> dict:
    """Process data for region B."""
    return {
        "region": "B",
        "processed": len(data),
        "revenue": sum(d.get("value", 0) for d in data),
    }

@task
def process_region_c(data: list) -> dict:
    """Process data for region C."""
    return {
        "region": "C",
        "processed": len(data),
        "revenue": sum(d.get("value", 0) for d in data),
    }

@task
def aggregate_regional_results(
    region_a: dict,
    region_b: dict,
    region_c: dict
) -> dict:
    """Combine results from all regions."""
    all_results = [region_a, region_b, region_c]

    return {
        "total_processed": sum(r["processed"] for r in all_results),
        "total_revenue": sum(r["revenue"] for r in all_results),
        "by_region": all_results,
    }

@flow
def parallel_processing_flow() -> dict:
    """Process multiple regions in parallel."""
    region_data = [
        {"region": "A", "data": [{"value": 100}, {"value": 200}]},
        {"region": "B", "data": [{"value": 150}, {"value": 250}]},
        {"region": "C", "data": [{"value": 120}, {"value": 180}]},
    ]

    # These three calls don't depend on each other, so with max_workers > 1 they can run concurrently
    result_a = process_region_a(data=region_data[0]["data"])
    result_b = process_region_b(data=region_data[1]["data"])
    result_c = process_region_c(data=region_data[2]["data"])

    aggregated = aggregate_regional_results(
        region_a=result_a,
        region_b=result_b,
        region_c=result_c
    )

    return aggregated

# Local execution with parallel workers
if __name__ == "__main__":
    result = parallel_processing_flow.run_local(max_workers=3)
    print(f"Total revenue: ${result['total_revenue']}")

Key Concepts:

  • Independent task execution
  • max_workers for parallelism
  • Result aggregation from parallel paths
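The `max_workers` option is how the SDK runs independent tasks concurrently. As a rough mental model only (this is not Dagy's scheduler), the same fan-out/fan-in shape can be written with the standard library's `concurrent.futures`. It uses one parameterized function in place of the three near-identical region tasks. `process_region` and `run_regions_in_parallel` are illustrative names, not SDK functions.

```python
from concurrent.futures import ThreadPoolExecutor


def process_region(region: str, data: list) -> dict:
    """Summarize one region's records (same logic as the three region tasks above)."""
    return {
        "region": region,
        "processed": len(data),
        "revenue": sum(d.get("value", 0) for d in data),
    }


def run_regions_in_parallel(region_data: dict, max_workers: int = 3) -> dict:
    """Fan out one job per region, then fan the results back in."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Fan-out: every region is submitted independently
        futures = [
            pool.submit(process_region, region, data)
            for region, data in region_data.items()
        ]
        # Fan-in: result() waits for each job; list order matches submission order
        results = [f.result() for f in futures]

    return {
        "total_processed": sum(r["processed"] for r in results),
        "total_revenue": sum(r["revenue"] for r in results),
        "by_region": results,
    }


if __name__ == "__main__":
    summary = run_regions_in_parallel({
        "A": [{"value": 100}, {"value": 200}],
        "B": [{"value": 150}, {"value": 250}],
        "C": [{"value": 120}, {"value": 180}],
    })
    print(summary["total_revenue"])  # 1000
```

Keeping the per-region logic in one function also means a fourth region is a data change rather than a new task.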

47. Diamond Dependency

Task A fans out to two independent tasks, B and C, whose results converge in task D (A → B, C → D).

from dagy import flow, task

@task
def fetch_user_data(user_id: str) -> dict:
    """Fetch initial user data."""
    return {
        "user_id": user_id,
        "name": "Alice",
        "account_id": "acct_123",
    }

@task
def enrich_with_profile(user: dict) -> dict:
    """Enrich with profile information."""
    return {
        **user,
        "profile": {
            "tier": "premium",
            "created": "2024-01-01",
        }
    }

@task
def enrich_with_activity(user: dict) -> dict:
    """Enrich with activity information."""
    return {
        **user,
        "activity": {
            "last_login": "2024-01-15",
            "login_count": 42,
        }
    }

@task
def merge_enriched_data(profile_enriched: dict, activity_enriched: dict) -> dict:
    """Merge both enrichments."""
    return {
        "user_id": profile_enriched["user_id"],
        "name": profile_enriched["name"],
        "profile": profile_enriched.get("profile"),
        "activity": activity_enriched.get("activity"),
        "complete": True,
    }

@flow
def diamond_dependency_flow(user_id: str) -> dict:
    """Execute diamond dependency pattern: A→B,C→D."""
    # A: Fetch base data
    user = fetch_user_data(user_id=user_id)

    # B and C: Parallel enrichment (both depend on A)
    profile = enrich_with_profile(user=user)
    activity = enrich_with_activity(user=user)

    # D: Merge results (depends on B and C)
    merged = merge_enriched_data(profile_enriched=profile, activity_enriched=activity)

    return merged

# Local execution
if __name__ == "__main__":
    result = diamond_dependency_flow.run_local(user_id="usr_123")
    print(result)

Key Concepts:

  • Multiple dependency paths
  • Parallel middle stages
  • Convergence at final stage
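It can help to look at the dependency graph by itself to see why B and C can overlap while D must wait. The sketch below uses the standard library's `graphlib.TopologicalSorter` (Python 3.9+) to group the example's four task names into stages. It only models ordering and is not how Dagy schedules tasks internally. `execution_stages` is an illustrative helper.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks it depends on (the diamond above)
diamond = {
    "fetch_user_data": set(),
    "enrich_with_profile": {"fetch_user_data"},
    "enrich_with_activity": {"fetch_user_data"},
    "merge_enriched_data": {"enrich_with_profile", "enrich_with_activity"},
}


def execution_stages(graph: dict) -> list:
    """Group tasks into stages; tasks within one stage don't depend on each other."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    stages = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # every task whose dependencies are done
        stages.append(ready)
        sorter.done(*ready)
    return stages


if __name__ == "__main__":
    for number, stage in enumerate(execution_stages(diamond), start=1):
        print(number, stage)
    # 1 ['fetch_user_data']
    # 2 ['enrich_with_activity', 'enrich_with_profile']
    # 3 ['merge_enriched_data']
```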

48. Dynamic Parameters

Compute parameters at runtime for subsequent tasks.

from dagy import flow, task
import os

@task
def determine_execution_environment() -> dict:
    """Determine runtime environment and parameters."""
    env = os.environ.get("ENV", "dev")

    config = {
        "dev": {"batch_size": 10, "timeout": 30, "retry_count": 3},
        "staging": {"batch_size": 100, "timeout": 60, "retry_count": 2},
        "prod": {"batch_size": 1000, "timeout": 120, "retry_count": 1},
    }

    # Unknown ENV values silently fall back to the dev settings
    return config.get(env, config["dev"])

@task
def fetch_data_with_config(config: dict) -> list:
    """Fetch data using dynamic configuration."""
    batch_size = config["batch_size"]
    return [{"id": i, "value": i * 10} for i in range(batch_size)]

@task
def process_batch(data: list, timeout: int) -> dict:
    """Process batch with computed timeout."""
    return {
        "processed": len(data),
        "timeout_seconds": timeout,
        "status": "success",
    }

@flow
def dynamic_parameters_flow() -> dict:
    """Use runtime-computed parameters."""
    config = determine_execution_environment()
    data = fetch_data_with_config(config=config)
    result = process_batch(data=data, timeout=config["timeout"])

    return {
        "config": config,
        "result": result,
    }

# Local execution
if __name__ == "__main__":
    result = dynamic_parameters_flow.run_local()
    print(f"Configuration: {result['config']}")

Key Concepts:

  • Runtime parameter computation
  • Environment-based configuration
  • Dynamic task behavior
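With `config.get(env, config["dev"])`, a typo such as `ENV=production` quietly runs with dev-sized batches. If you would rather fail loudly, a stricter resolver might look like the sketch below. It is plain Python, independent of the SDK, and `resolve_config` is a hypothetical helper name.

```python
import os
from typing import Optional

# Same per-environment settings as determine_execution_environment above
CONFIGS = {
    "dev": {"batch_size": 10, "timeout": 30, "retry_count": 3},
    "staging": {"batch_size": 100, "timeout": 60, "retry_count": 2},
    "prod": {"batch_size": 1000, "timeout": 120, "retry_count": 1},
}


def resolve_config(env: Optional[str] = None) -> dict:
    """Return settings for env (default: the ENV variable, else "dev").

    Unknown names raise instead of falling back to dev.
    """
    name = env if env is not None else os.environ.get("ENV", "dev")
    if name not in CONFIGS:
        raise ValueError(f"Unknown ENV {name!r}; expected one of {sorted(CONFIGS)}")
    # Return a copy so callers can't modify the shared table
    return dict(CONFIGS[name])


if __name__ == "__main__":
    print(resolve_config("staging"))  # {'batch_size': 100, 'timeout': 60, 'retry_count': 2}
```

Accepting `env` as an argument also makes the function easy to test without touching environment variables.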

49. Checkpointing

Save intermediate state for recovery.

from dagy import flow, task
from datetime import datetime

@task
def load_checkpoint(checkpoint_file: str) -> dict:
    """Load the previous checkpoint, if one exists."""
    # Simulated checkpoint loading
    return {
        "checkpoint_exists": False,
        "last_processed_id": 0,
        "file": checkpoint_file,
    }

@task
def process_with_checkpoint(
    start_id: int,
    batch_size: int = 5
) -> dict:
    """Process data starting from checkpoint."""
    data = [{"id": i} for i in range(start_id, start_id + batch_size)]

    return {
        "processed_ids": [d["id"] for d in data],
        "last_id": start_id + batch_size - 1,
        "batch_size": len(data),
    }

@task
def save_checkpoint(progress: dict, checkpoint_file: str) -> str:
    """Save progress checkpoint."""
    checkpoint = {
        "last_processed_id": progress["last_id"],
        "timestamp": datetime.now().isoformat(),
    }

    # Simulated save; a real implementation would write `checkpoint` to checkpoint_file
    return f"Checkpoint saved: {checkpoint}"

@task
def resume_processing(progress: dict) -> dict:
    """Resume processing from checkpoint."""
    return {
        "resumed_from_id": progress["last_id"] + 1,
        "status": "resumed",
    }

@flow
def checkpointing_flow(checkpoint_file: str = "/checkpoint/state.json") -> dict:
    """Flow with checkpointing for recovery."""
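    checkpoint = load_checkpoint(checkpoint_file=checkpoint_file)
    # Resume after the last processed ID, or start from 0 when no checkpoint exists
    start_id = checkpoint["last_processed_id"] + 1 if checkpoint["checkpoint_exists"] else 0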

    progress = process_with_checkpoint(start_id=start_id, batch_size=5)
    saved = save_checkpoint(progress=progress, checkpoint_file=checkpoint_file)
    resumed = resume_processing(progress=progress)

    return {
        "progress": progress,
        "checkpoint": saved,
        "next_step": resumed,
    }

# Local execution
if __name__ == "__main__":
    result = checkpointing_flow.run_local()
    print(result)

Key Concepts:

  • State persistence
  • Recovery from checkpoints
  • Progress tracking
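The example above only simulates persistence. Below is a minimal file-backed version that assumes a local JSON file is an acceptable store. The helper names are hypothetical, not SDK functions. The key detail is writing to a temporary file in the same directory and swapping it in with `os.replace`, so an interrupted save leaves the previous checkpoint intact.

```python
import json
import os
import tempfile
from datetime import datetime


def load_checkpoint_file(path: str) -> dict:
    """Return the saved checkpoint, or a fresh-start default if none exists."""
    try:
        with open(path, encoding="utf-8") as f:
            return {"checkpoint_exists": True, **json.load(f)}
    except FileNotFoundError:
        # -1 means "nothing processed yet", so resuming at last + 1 starts from 0
        return {"checkpoint_exists": False, "last_processed_id": -1}


def save_checkpoint_file(path: str, last_processed_id: int) -> None:
    """Write the checkpoint atomically so a crash never leaves a half-written file."""
    state = {
        "last_processed_id": last_processed_id,
        "timestamp": datetime.now().isoformat(),
    }
    directory = os.path.dirname(os.path.abspath(path))
    os.makedirs(directory, exist_ok=True)
    # Temp file in the same directory, so os.replace is an atomic rename
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(state, f)
        os.replace(tmp_path, path)
    except BaseException:
        os.unlink(tmp_path)  # don't leave stray temp files behind
        raise
```

Using `-1` for a fresh start lets the caller always resume from `last_processed_id + 1`.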

50. Idempotent Tasks

Design tasks that can safely be re-run without repeating their side effects.

from dagy import flow, task
from datetime import datetime
import hashlib
import json

@task
def generate_idempotent_key(input_data: dict) -> str:
    """Generate a deterministic key from the input (same input, same key)."""
    # sort_keys makes the JSON, and therefore the hash, independent of key order
    data_str = json.dumps(input_data, sort_keys=True)
    return hashlib.sha256(data_str.encode()).hexdigest()

@task
def execute_idempotent_operation(
    operation_id: str,
    operation_data: dict
) -> dict:
    """Execute the operation; the flow checks for duplicates before calling this."""
    # Simulated operation
    return {
        "operation_id": operation_id,
        "data": operation_data,
        "result": "processed",
        "timestamp": datetime.now().isoformat(),
    }

@task
def check_operation_exists(operation_id: str) -> bool:
    """Check if operation was already executed."""
    # Simulated database lookup
    return False  # Not executed yet

@task
def store_operation_result(operation: dict) -> str:
    """Store result for idempotency tracking."""
    return f"Operation {operation['operation_id']} stored"

@flow
def idempotent_flow(input_data: dict) -> dict:
    """Execute operation with idempotency guarantees."""
    operation_id = generate_idempotent_key(input_data=input_data)

    # Check if already executed
    exists = check_operation_exists(operation_id=operation_id)

    if exists:
        return {"status": "skipped", "reason": "already_executed", "operation_id": operation_id}

    # Execute and store
    result = execute_idempotent_operation(
        operation_id=operation_id,
        operation_data=input_data
    )
    stored = store_operation_result(operation=result)

    return {
        "operation_id": operation_id,
        "result": result,
        "storage": stored,
    }

# Local execution
if __name__ == "__main__":
    input_data = {"user_id": "usr_123", "action": "create_account"}
    result = idempotent_flow.run_local(input_data=input_data)
    print(result)

Key Concepts:

  • Deterministic operation keys
  • Duplicate detection
  • Safe re-execution
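In the flow above, the "already executed?" lookup is simulated. The sketch below shows the whole check-execute-record loop in plain Python. A dict stands in for the durable store, so it is per-process and not safe for concurrent callers, which would need an atomic insert-if-absent in a real database. `IdempotentRunner` is a hypothetical class, not part of the SDK.

```python
import hashlib
import json
from typing import Any, Callable, Dict


def idempotency_key(payload: dict) -> str:
    """Same derivation as generate_idempotent_key: SHA-256 of key-sorted JSON."""
    data_str = json.dumps(payload, sort_keys=True)
    return hashlib.sha256(data_str.encode()).hexdigest()


class IdempotentRunner:
    """Run an operation at most once per key and replay the stored result on repeats."""

    def __init__(self) -> None:
        self._results: Dict[str, Any] = {}  # stand-in for a durable store

    def run(self, payload: dict, operation: Callable[[dict], Any]) -> dict:
        key = idempotency_key(payload)
        if key in self._results:
            # Duplicate request: skip the side effect, return the earlier result
            return {"status": "skipped", "operation_id": key, "result": self._results[key]}
        result = operation(payload)
        self._results[key] = result  # recorded only after the operation succeeded
        return {"status": "executed", "operation_id": key, "result": result}


def create_account(payload: dict) -> str:
    return f"account created for {payload['user_id']}"


if __name__ == "__main__":
    runner = IdempotentRunner()
    print(runner.run({"user_id": "usr_123", "action": "create_account"}, create_account)["status"])  # executed
    # Same data in a different key order hashes to the same key, so it is skipped
    print(runner.run({"action": "create_account", "user_id": "usr_123"}, create_account)["status"])  # skipped
```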

51. Resource-Intensive Flow

Limit concurrency and set timeouts for resource-intensive tasks.

from dagy import flow, task

@task(
    concurrency_limit=2,  # Limit parallel execution
    timeout_seconds=300,
)
def heavy_computation(data_size: int) -> dict:
    """CPU-intensive task."""
    # Simulated heavy computation
    result_sum = sum(range(data_size))

    return {
        "data_size": data_size,
        "result": result_sum,
        "computation_complete": True,
    }

@task
def aggregate_results(results: list) -> dict:
    """Aggregate computation results."""
    return {
        "total_computations": len(results),
        "combined_result": sum(r["result"] for r in results),
    }

@flow
def resource_intensive_flow(sizes: list = None) -> dict:
    """Execute resource-intensive operations with limits."""
    if sizes is None:
        sizes = [1000000, 2000000, 1500000]

    # One heavy_computation call per size; concurrency_limit=2 lets at most two run at once
    results = [heavy_computation(data_size=size) for size in sizes]
    aggregated = aggregate_results(results=results)

    return aggregated

# Local execution with limited workers
if __name__ == "__main__":
    result = resource_intensive_flow.run_local(max_workers=2)
    print(f"Total: {result['combined_result']}")

Key Concepts:

  • concurrency_limit for resource control
  • Timeout for long operations
  • Worker limiting

52. Long-Running Flow

Design flows for operations that take hours.

from dagy import flow, task
from datetime import datetime

@task
def start_long_operation(operation_id: str) -> dict:
    """Initiate long-running operation."""
    return {
        "operation_id": operation_id,
        "started_at": datetime.now().isoformat(),
        "estimated_duration_hours": 2,
    }

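@task(timeout_seconds=3 * 60 * 60)  # allow up to 3 hours for a ~2-hour operation
def poll_operation_status(operation_id: str, max_polls: int = 120) -> dict:
    """Poll operation status until it finishes (up to max_polls checks)."""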
    # Simulated polling
    return {
        "operation_id": operation_id,
        "status": "completed",
        "completed_at": datetime.now().isoformat(),
        "total_polls": 5,
    }

@task
def collect_long_operation_results(operation: dict) -> dict:
    """Collect results from completed operation."""
    return {
        "operation_id": operation["operation_id"],
        "result_count": 5000,
        "success_rate": 0.99,
        "errors": 50,
    }

@flow
def long_running_flow(operation_id: str) -> dict:
    """Execute long-running asynchronous operation."""
    start = start_long_operation(operation_id=operation_id)
    status = poll_operation_status(operation_id=operation_id)
    results = collect_long_operation_results(operation=status)

    duration = (
        datetime.fromisoformat(status["completed_at"]) -
        datetime.fromisoformat(start["started_at"])
    )

    return {
        "operation_id": operation_id,
        "duration_seconds": duration.total_seconds(),
        "results": results,
    }

# Local execution
if __name__ == "__main__":
    result = long_running_flow.run_local(operation_id="op_20240101_001")
    print(f"Duration: {result['duration_seconds']:.1f}s")
    print(f"Results: {result['results']['result_count']}")

Key Concepts:

  • Polling pattern for async operations
  • Status tracking
  • Long-duration timeout configuration
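The polling task above returns a simulated "completed" immediately. A real poll loop needs two things: a deadline, and a wait between checks that grows over time so an hours-long job isn't hammered with requests. Below is a plain-Python sketch under the assumption that the status source reports "completed" or "failed" when done. `poll_until_done` and `check_status` are hypothetical names.

```python
import time
from typing import Callable


def poll_until_done(
    check_status: Callable[[], str],
    timeout_seconds: float,
    initial_interval: float = 5.0,
    max_interval: float = 300.0,
) -> str:
    """Call check_status until it reports a terminal state or the deadline passes.

    The wait doubles after each poll (capped at max_interval): frequent checks
    early on, infrequent ones for long-running operations.
    """
    deadline = time.monotonic() + timeout_seconds
    interval = initial_interval
    while True:
        status = check_status()
        if status in ("completed", "failed"):
            return status
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(f"operation still {status!r} after {timeout_seconds}s")
        time.sleep(min(interval, remaining))  # never sleep past the deadline
        interval = min(interval * 2, max_interval)
```

`time.monotonic()` is used for the deadline because, unlike wall-clock time, it can't jump backwards when the system clock is adjusted.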

Real-World Scenarios

53. E-Commerce Order Pipeline

Complete order processing, from order creation through payment, inventory, and fulfillment.

from dagy import flow, task
from datetime import datetime

@task
def create_order(user_id: str, items: list, total: float) -> dict:
    """Create new order in system."""
    return {
        # Timestamp suffix, e.g. ord_usr_123_20240101120000
        "order_id": f"ord_{user_id}_{datetime.now().strftime('%Y%m%d%H%M%S')}",
        "user_id": user_id,
        "items": items,
        "total": total,
        "status": "created",
    }

@task
def process_payment(order: dict, payment_method: str) -> dict:
    """Process payment for order."""
    return {
        "order_id": order["order_id"],
        "amount": order["total"],
        "payment_method": payment_method,
        "transaction_id": f"txn_{order['order_id']}",
        "status": "completed",
    }

@task
def update_inventory(order: dict) -> dict:
    """Update inventory for ordered items."""
    return {
        "order_id": order["order_id"],
        "items_reserved": len(order["items"]),
        "status": "reserved",
    }

@task
def create_fulfillment(order: dict, inventory_update: dict) -> dict:
    """Create fulfillment for order."""
    return {
        "order_id": order["order_id"],
        "fulfillment_id": f"ful_{order['order_id']}",
        "status": "pending",
        "shipping_address": "123 Main St",
    }

@task
def send_order_confirmation(order: dict, payment: dict) -> str:
    """Send confirmation email to customer."""
    return f"Confirmation email sent to user {order['user_id']}"

@flow
def ecommerce_order_flow(
    user_id: str,
    items: list,
    total: float,
    payment_method: str = "credit_card",
) -> dict:
    """Complete order pipeline: create → pay → inventory → fulfill → notify."""
    order = create_order(user_id=user_id, items=items, total=total)
    payment = process_payment(order=order, payment_method=payment_method)
    inventory = update_inventory(order=order)
    fulfillment = create_fulfillment(order=order, inventory_update=inventory)
    confirmation = send_order_confirmation(order=order, payment=payment)

    return {
        "order": order,
        "payment": payment,
        "fulfillment": fulfillment,
        "confirmation": confirmation,
    }

# Local execution
if __name__ == "__main__":
    items = [
        {"sku": "WIDGET-001", "qty": 2, "price": 29.99},
        {"sku": "GADGET-001", "qty": 1, "price": 49.99},
    ]
    total = (2 * 29.99) + (1 * 49.99)

    result = ecommerce_order_flow.run_local(
        user_id="usr_123",
        items=items,
        total=total,
        payment_method="credit_card"
    )
    print(f"Order: {result['order']['order_id']}")
    print(f"Status: Payment {result['payment']['status']}")

Key Concepts:

  • Multi-stage order processing
  • Payment orchestration
  • Cross-system coordination
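The example computes `total` as a float, which is fine for a demo. However, binary floating point can't represent most cent amounts exactly. If you compute order totals yourself, one option (plain Python, not an SDK feature) is `decimal.Decimal`. In this sketch, prices are strings so no float enters the calculation, and `Decimal(str(...))` also accepts float prices like the example's. `order_total` is a hypothetical helper. This cookbook doesn't say whether Dagy accepts `Decimal` flow parameters, so passing `str(total)` is a conservative choice.

```python
from decimal import Decimal, ROUND_HALF_UP


def order_total(items: list) -> Decimal:
    """Sum qty * price exactly, then round to whole cents."""
    total = sum(
        (Decimal(str(item["price"])) * item["qty"] for item in items),
        Decimal("0"),
    )
    return total.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)


items = [
    {"sku": "WIDGET-001", "qty": 2, "price": "29.99"},
    {"sku": "GADGET-001", "qty": 1, "price": "49.99"},
]

if __name__ == "__main__":
    print(order_total(items))  # 109.97
```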

54. CI/CD Pipeline

Automate build, test, staging deployment, verification, and production release.

from dagy import flow, task
from datetime import datetime

@task
def checkout_code(repo: str, branch: str) -> dict:
    """Check out source code from repository."""
    return {
        "repo": repo,
        "branch": branch,
        "commit": "abc123def456",
        "status": "checked_out",
    }

@task
def build_application(code: dict) -> dict:
    """Build application artifacts."""
    return {
        "build_id": f"build_{code['commit'][:7]}",
        "artifact": "app-1.0.0.jar",
        "status": "success",
        "duration_seconds": 45,
    }

@task
def run_tests(build: dict) -> dict:
    """Run automated tests."""
    return {
        "build_id": build["build_id"],
        "tests_run": 250,
        "tests_passed": 248,
        "tests_failed": 2,
        "coverage": 0.87,
    }

@task(
    retries=2,
    timeout_seconds=300,
)
def deploy_to_staging(build: dict) -> dict:
    """Deploy to staging environment."""
    return {
        "build_id": build["build_id"],
        "environment": "staging",
        "url": "https://staging-app.example.com",
        "status": "deployed",
    }

@task
def run_smoke_tests(deployment: dict) -> bool:
    """Run smoke tests on deployed app."""
    return True  # Simulated: all smoke tests pass

@task
def deploy_to_production(build: dict) -> dict:
    """Deploy to production if all checks pass."""
    return {
        "build_id": build["build_id"],
        "environment": "production",
        "url": "https://app.example.com",
        "status": "deployed",
        "deployment_time": datetime.now().isoformat(),
    }

@flow
def cicd_pipeline_flow(repo: str, branch: str) -> dict:
    """Complete CI/CD: checkout → build → test → stage → verify → prod."""
    code = checkout_code(repo=repo, branch=branch)
    build = build_application(code=code)
    tests = run_tests(build=build)

    if tests["tests_failed"] > 0:
        return {
            "status": "failed",
            "reason": f"{tests['tests_failed']} test failures",
            "build_id": build["build_id"],
        }

    staging = deploy_to_staging(build=build)
    smoke_ok = run_smoke_tests(deployment=staging)

    if smoke_ok:
        prod = deploy_to_production(build=build)
        return {
            "status": "success",
            "build_id": build["build_id"],
            "staging": staging,
            "production": prod,
        }

    return {"status": "smoke_tests_failed", "build_id": build["build_id"]}

# Local execution
if __name__ == "__main__":
    result = cicd_pipeline_flow.run_local(
        repo="https://github.com/example/app.git",
        branch="main"
    )
    # With the simulated results above (2 failing tests), this prints "Status: failed"
    print(f"Status: {result['status']}")

Key Concepts:

  • Sequential build stages
  • Test-driven progression
  • Environment-based deployment
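The flow gates only on test failures, even though `run_tests` also reports `coverage`. Below is a hypothetical helper that checks both and collects every reason a build is blocked, not just the first. The 80% threshold is an arbitrary example value, not an SDK default.

```python
from typing import List, Tuple


def passes_quality_gate(tests: dict, min_coverage: float = 0.80) -> Tuple[bool, List[str]]:
    """Check run_tests-style results; return (ok, reasons) listing every failed check."""
    reasons = []
    if tests["tests_failed"] > 0:
        reasons.append(f"{tests['tests_failed']} test failures")
    if tests["coverage"] < min_coverage:
        reasons.append(f"coverage {tests['coverage']:.0%} is below {min_coverage:.0%}")
    return (not reasons, reasons)


if __name__ == "__main__":
    simulated = {"tests_run": 250, "tests_passed": 248, "tests_failed": 2, "coverage": 0.87}
    print(passes_quality_gate(simulated))  # (False, ['2 test failures'])
```

Returning all reasons at once lets a single failed pipeline run report everything that needs fixing.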

55. Customer Onboarding

Automated customer setup workflow.

from dagy import flow, task
from datetime import datetime

@task
def validate_signup(email: str, name: str) -> dict:
    """Validate customer signup data."""
    return {
        "email": email,
        "name": name,
        "valid": "@" in email and len(name) > 2,
    }

@task
def create_customer_account(validation: dict) -> dict:
    """Create account in system."""
    customer_id = f"cust_{int(datetime.now().timestamp())}"

    return {
        "customer_id": customer_id,
        "email": validation["email"],
        "name": validation["name"],
        "created_at": datetime.now().isoformat(),
        "status": "active",
    }

@task
def configure_account_settings(customer: dict) -> dict:
    """Configure default account settings."""
    return {
        "customer_id": customer["customer_id"],
        "settings": {
            "plan": "free",
            "notifications": True,
            "api_enabled": True,
        },
        "status": "configured",
    }

@task
def provision_resources(customer: dict, settings: dict) -> dict:
    """Provision API keys, storage, etc."""
    return {
        "customer_id": customer["customer_id"],
        "api_key": f"sk_{customer['customer_id']}",
        "storage_gb": 1,
        "status": "provisioned",
    }

@task
def send_welcome_email(customer: dict, resources: dict) -> str:
    """Send welcome email to new customer."""
    return f"Welcome email sent to {customer['email']}"

@task
def schedule_onboarding_call(customer: dict) -> dict:
    """Schedule initial onboarding call."""
    return {
        "customer_id": customer["customer_id"],
        "call_scheduled": True,
        "date": "2024-01-10T10:00:00Z",
        "duration_minutes": 30,
    }

@flow
def customer_onboarding_flow(
    email: str,
    name: str,
) -> dict:
    """Complete customer onboarding workflow."""
    validation = validate_signup(email=email, name=name)

    if not validation["valid"]:
        return {"status": "validation_failed", "reason": "Invalid input"}

    customer = create_customer_account(validation=validation)
    settings = configure_account_settings(customer=customer)
    resources = provision_resources(customer=customer, settings=settings)
    welcome = send_welcome_email(customer=customer, resources=resources)
    call = schedule_onboarding_call(customer=customer)

    return {
        "customer": customer,
        "resources": resources,
        "welcome_email": welcome,
        "onboarding_call": call,
        "status": "onboarded",
    }

# Local execution
if __name__ == "__main__":
    result = customer_onboarding_flow.run_local(
        email="newcustomer@example.com",
        name="Jane Doe"
    )
    print(f"Customer ID: {result['customer']['customer_id']}")
    print(f"Status: {result['status']}")

Key Concepts:

  • Multi-stage customer activation
  • Resource provisioning
  • Automated communications
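`create_customer_account` derives the ID from `int(datetime.now().timestamp())`, so two signups in the same second receive the same ID. If that matters for your system, a random UUID avoids the collision. The sketch keeps the example's `cust_` prefix; `new_customer_id` is a hypothetical helper.

```python
import uuid


def new_customer_id() -> str:
    """Return an ID like 'cust_<32 hex chars>' that is, for practical purposes, unique."""
    return f"cust_{uuid.uuid4().hex}"


if __name__ == "__main__":
    ids = {new_customer_id() for _ in range(1000)}
    print(len(ids))  # 1000: no duplicates even when generated back to back
```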

56. Financial Reconciliation

Match transactions and report discrepancies.

from dagy import flow, task
from typing import List

@task
def fetch_bank_transactions(account: str, date_range: str) -> List[dict]:
    """Fetch transactions from bank."""
    return [
        {"id": "bank_001", "amount": 100.00, "date": "2024-01-01", "description": "Payment"},
        {"id": "bank_002", "amount": 250.50, "date": "2024-01-02", "description": "Deposit"},
    ]

@task
def fetch_internal_transactions(account: str, date_range: str) -> List[dict]:
    """Fetch transactions from internal ledger."""
    return [
        {"id": "int_001", "amount": 100.00, "date": "2024-01-01", "description": "Payment"},
        {"id": "int_002", "amount": 250.50, "date": "2024-01-02", "description": "Deposit"},
        {"id": "int_003", "amount": 50.00, "date": "2024-01-02", "description": "Fee"},
    ]

@task
def match_transactions(bank: List[dict], internal: List[dict]) -> dict:
    """Match bank to internal transactions by amount (each internal record is used at most once)."""
    matched = []
    unmatched_bank = bank.copy()
    unmatched_internal = internal.copy()

    for ext in bank:
        for i, int_txn in enumerate(unmatched_internal):
            if ext["amount"] == int_txn["amount"]:
                matched.append({"bank": ext, "internal": int_txn})
                unmatched_internal.pop(i)
                unmatched_bank.remove(ext)
                break

    return {
        "matched": len(matched),
        "unmatched_bank": unmatched_bank,
        "unmatched_internal": unmatched_internal,
    }

@task
def generate_reconciliation_report(match_result: dict) -> dict:
    """Generate reconciliation report."""
    discrepancies = len(match_result["unmatched_bank"]) + len(match_result["unmatched_internal"])
    return {
        "matched_count": match_result["matched"],
        "discrepancies": discrepancies,
        # Balanced only when nothing is left unmatched on either side
        "reconciliation_status": "balanced" if discrepancies == 0 else "discrepancies_found",
        "unmatched_transactions": {
            "from_bank": match_result["unmatched_bank"],
            "from_internal": match_result["unmatched_internal"],
        }
    }

@flow
def reconciliation_flow(account: str, date_range: str) -> dict:
    """Reconcile bank and internal transactions."""
    bank_txns = fetch_bank_transactions(account=account, date_range=date_range)
    internal_txns = fetch_internal_transactions(account=account, date_range=date_range)
    matches = match_transactions(bank=bank_txns, internal=internal_txns)
    report = generate_reconciliation_report(match_result=matches)

    return report

# Local execution
if __name__ == "__main__":
    result = reconciliation_flow.run_local(
        account="ACC123",
        date_range="2024-01-01:2024-01-31"
    )
    print(f"Status: {result['reconciliation_status']}")
    print(f"Discrepancies: {result['discrepancies']}")

Key Concepts:

  • Transaction matching algorithm
  • Discrepancy detection
  • Financial reporting
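`match_transactions` pairs records on amount alone and compares floats for equality, so two equal amounts on different days can be paired. The sketch below is still a simplification, since real reconciliation usually also handles settlement lag and reference numbers. It requires both amount and date to agree, compares amounts as `Decimal`, and indexes the internal records once instead of rescanning them for every bank record. `match_by_amount_and_date` is a hypothetical name.

```python
from collections import defaultdict, deque
from decimal import Decimal


def match_by_amount_and_date(bank: list, internal: list) -> dict:
    """Pair bank and internal records whose amount and date both agree."""
    # Queue internal records under an (exact amount, date) key; Decimal(str(x))
    # avoids comparing binary floats for equality
    pending = defaultdict(deque)
    for txn in internal:
        pending[(Decimal(str(txn["amount"])), txn["date"])].append(txn)

    matched, unmatched_bank = [], []
    for txn in bank:
        queue = pending.get((Decimal(str(txn["amount"])), txn["date"]))
        if queue:
            # Each internal record is consumed by at most one bank record
            matched.append({"bank": txn, "internal": queue.popleft()})
        else:
            unmatched_bank.append(txn)

    # Whatever is still queued had no bank counterpart
    unmatched_internal = [txn for queue in pending.values() for txn in queue]
    return {
        "matched": len(matched),
        "unmatched_bank": unmatched_bank,
        "unmatched_internal": unmatched_internal,
    }
```

The return value has the same keys as `match_transactions`, so `generate_reconciliation_report` can consume it unchanged.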

57. Content Publishing Pipeline

Draft → review → transform → publish workflow.

from dagy import flow, task
from datetime import datetime
import html

@task
def fetch_draft_content(content_id: str) -> dict:
    """Fetch draft content."""
    return {
        "content_id": content_id,
        "title": "Sample Article",
        "body": "This is the article content...",
        "status": "draft",
        "created_by": "author@example.com",
    }

@task
def perform_content_review(content: dict) -> dict:
    """Review content for quality."""
    issues = []

    if len(content["title"]) < 5:
        issues.append("Title too short")
    if len(content["body"]) < 100:
        issues.append("Content too short")

    return {
        "content_id": content["content_id"],
        "approved": len(issues) == 0,
        "issues": issues,
    }

@task
def transform_for_publication(content: dict) -> dict:
    """Transform content to publication format."""
    # Escape author-supplied text so characters like < and & can't inject markup
    title = html.escape(content["title"])
    body = html.escape(content["body"])
    return {
        "content_id": content["content_id"],
        "title": content["title"],
        "body": content["body"],
        "slug": content["title"].lower().replace(" ", "-"),
        "html": f"<article><h1>{title}</h1><p>{body}</p></article>",
        "publication_ready": True,
    }

@task
def publish_to_website(transformed: dict) -> dict:
    """Publish to website."""
    return {
        "content_id": transformed["content_id"],
        "url": f"https://blog.example.com/{transformed['slug']}",
        "published_at": datetime.now().isoformat(),
        "status": "published",
    }

@flow
def publishing_pipeline_flow(content_id: str) -> dict:
    """Content publishing: draft → review → transform → publish."""
    content = fetch_draft_content(content_id=content_id)
    review = perform_content_review(content=content)

    if not review["approved"]:
        return {
            "status": "rejected",
            "content_id": content_id,
            "issues": review["issues"],
        }

    transformed = transform_for_publication(content=content)
    published = publish_to_website(transformed=transformed)

    return {
        "status": "published",
        "content_id": content_id,
        "url": published["url"],
    }

# Local execution
if __name__ == "__main__":
    result = publishing_pipeline_flow.run_local(content_id="content_001")
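    # The simulated draft body is under 100 characters, so review rejects it and this prints "Status: rejected"
    print(f"Status: {result['status']}")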

Key Concepts:

  • Multi-stage approval workflow
  • Content transformation
  • Publication orchestration
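The example builds the slug with `title.lower().replace(" ", "-")`, which keeps punctuation and accented characters. For instance, a title like `What's new?` becomes `what's-new?`. Below is a slightly more robust standard-library version. It shows one common approach and is not an SDK helper.

```python
import re
import unicodedata


def slugify(title: str) -> str:
    """Turn a title into a URL-safe slug: ASCII, lowercase, hyphen-separated."""
    # Decompose accented characters (é -> e + accent), then drop non-ASCII marks
    ascii_title = (
        unicodedata.normalize("NFKD", title).encode("ascii", "ignore").decode("ascii")
    )
    # Collapse every run of non-alphanumeric characters into a single hyphen
    slug = re.sub(r"[^a-z0-9]+", "-", ascii_title.lower())
    return slug.strip("-")


if __name__ == "__main__":
    print(slugify("Sample Article"))           # sample-article
    print(slugify("What's new in Dagy 1.0?"))  # what-s-new-in-dagy-1-0
    print(slugify("Café  &  Crème"))           # cafe-creme
```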

Best Practices

General Guidelines

  1. Task Design: Keep tasks focused and single-responsibility
  2. Error Handling: Configure retries and timeouts (for example retries= and timeout_seconds= on @task) for tasks that call external systems
  3. Data Flow: Leverage automatic parameter passing between tasks
  4. Testing: Always test flows locally before deployment with run_local()
  5. Monitoring: Implement alerts and logging for production flows
  6. Documentation: Add docstrings to all tasks and flows
  7. Configuration: Use parameters for environment-specific behavior
  8. Performance: Use max_workers with run_local() to run independent tasks in parallel
  9. Resilience: Implement checkpoints for long-running operations
  10. Idempotency: Design tasks to be safely re-executable

Deployment

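from dagy import DagyClient, build_artifact, deploy_artifact  # DagyClient import path assumed

# Build an artifact from your flow (your_flow_instance is a placeholder for a @flow function)
artifact = build_artifact(your_flow_instance)

# Deploy to the Dagy platform; in practice, load the API key from a secret store instead of hard-coding it
client = DagyClient(api_key="your-key", api_url="https://api.dagy.io")
deployment = deploy_artifact(artifact, client)

print(f"Deployed flow ID: {deployment['flow_id']}")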

Local Testing Pattern

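# Test with different parameters (your_flow, param1, and param2 are placeholders;
# this assumes the flow returns a dict with a "status" key)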
test_cases = [
    {"param1": "value1", "param2": "value2"},
    {"param1": "value3", "param2": "value4"},
]

for test_case in test_cases:
    result = your_flow.run_local(**test_case, fail_fast=True)
    assert result["status"] == "success"
    print(f"Test passed: {test_case}")


Last Updated: January 2024
SDK Version: 1.0+
Cookbook Version: 2.0