Namespaces, Versioning & Tags
Three metadata dimensions organize flows across teams, environments, and release cycles.
| Dimension | Type | Mutable | Default | Where set |
|---|---|---|---|---|
| Namespace | str | No (set at registration) | Source file's directory relative to the CWD | deploy(namespace=...), CLI --namespace, API body |
| Version | str | No (immutable per registration) | "latest" | @flow(version=...), CLI --flow-version |
| Tags | Dict[str, str] | Yes (via re-deploy / API) | {} | deploy(tags=...), API body |
Namespaces
A namespace is a hierarchical string that groups related flows. It maps naturally to directory structure, team boundaries, or domain areas.
How namespaces are derived
When you call flow.deploy() or dagy deploy, the namespace is resolved in this order:
- Explicit value: if you pass namespace="data/ingestion", that value is used as-is.
- Auto-derived: if omitted, Dagy computes the namespace from the source file's directory path relative to the current working directory.
# File: pipelines/data/ingestion/load_orders.py
# CWD: pipelines/
result = my_flow.deploy()
# namespace → "data/ingestion" (auto-derived from CWD-relative path)
Auto-derivation logic (src/dagy/core/flow.py):
import inspect
from pathlib import Path
src = Path(inspect.getfile(fn)).resolve()  # fn: the flow's underlying function (assumed)
rel = src.parent.relative_to(Path.cwd())
namespace = str(rel) if str(rel) != "." else ""
If the source file is at the CWD root, namespace is an empty string.
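The resolution order above can be sketched as a small standalone function. This is an illustration, not Dagy's implementation: `resolve_namespace` is a hypothetical helper, the empty-string fallback for files outside the CWD is an assumption (the excerpt above would raise `ValueError` in that case), and `as_posix()` is used instead of `str(rel)` so the result keeps `/` separators on Windows.

```python
from pathlib import Path
from typing import Optional


def resolve_namespace(source_file: Path, cwd: Path, explicit: Optional[str] = None) -> str:
    """Hypothetical helper mirroring the documented resolution order."""
    # 1. An explicit value always wins and is used as-is.
    if explicit is not None:
        return explicit
    # 2. Otherwise derive the namespace from the file's directory relative to the CWD.
    try:
        rel = source_file.parent.relative_to(cwd)
    except ValueError:
        # Assumption: treat files outside the CWD as belonging to the flat namespace.
        return ""
    # Path(".") means the file sits directly in the CWD, which maps to "".
    return "" if rel == Path(".") else rel.as_posix()


cwd = Path("/repo/pipelines")
print(resolve_namespace(cwd / "data" / "ingestion" / "load_orders.py", cwd))
```

Passing the CWD as an argument rather than calling `Path.cwd()` inside the function keeps the sketch deterministic and easy to test.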
Setting namespaces
Python SDK:
result = my_flow.deploy(
    name="load-orders",
    namespace="data/ingestion",
)
CLI:
dagy deploy ./dist/artifact.zip \
    --deployment load-orders \
    --namespace "data/ingestion"
API (POST /flows):
{
  "flow_spec": { "name": "load_orders", "version": "1.0.0", "..." : "..." },
  "namespace": "data/ingestion",
  "deployment_name": "load-orders"
}
Namespace conventions
| Pattern | Example | Use case |
|---|---|---|
| team/project | data-eng/etl | Team-scoped ownership |
| domain/subdomain | billing/invoices | Domain-driven design |
| environment | staging | Environment separation |
| Flat | "" (empty) | Small projects, single team |
Namespaces are stored with the flow record and surfaced as filterable chips in the Flows UI. Clicking a namespace chip filters the grid to matching flows.
API Fields
| Model | Field |
|---|---|
| FlowRegisterRequest | namespace: Optional[str] |
| FlowListItem | namespace: Optional[str] |
| FlowDetailResponse | namespace: Optional[str] |
Versioning
Every flow registration is identified by a composite key: flow_name + flow_version.
Version format
Versions are free-form strings. Dagy does not enforce semver, but these are the common patterns:
| Strategy | Example | When to use |
|---|---|---|
| "latest" | my_flow:latest | Development, CI/CD auto-deploy (default) |
| Semantic | "1.0.0", "2.3.1" | Production releases with explicit rollback points |
| Build hash | "abc123" | Immutable builds tied to commit SHAs |
| Date-based | "2026-03-08" | Nightly builds |
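Because versions are free-form strings, build-hash and date-based labels can be generated in CI before deploying. The helpers below are a minimal sketch with hypothetical names. The 7-character prefix is an arbitrary choice, not a Dagy requirement.

```python
from datetime import date


def build_hash_version(commit_sha: str, length: int = 7) -> str:
    """Short version label taken from the start of a commit SHA (hypothetical helper)."""
    return commit_sha[:length]


def date_version(day: date) -> str:
    """ISO date label such as "2026-03-08" for nightly builds (hypothetical helper)."""
    return day.isoformat()


print(build_hash_version("abc123def4567890"))
print(date_version(date(2026, 3, 8)))
```

Either string can then be passed as the version in @flow(version=...) or --flow-version.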
How versions flow through the system
@flow(version="1.0.0")          ← declared in code
        │
        ▼
flow.build()                    ← baked into FlowSpec.version
        │
        ▼
flow.deploy()                   ← uploaded as flow_version="1.0.0"
        │                         (or "latest" if not specified in @flow)
        ▼
Flow(flow_name, flow_version)   ← persisted
        │
        ▼
Deployment                      ← deployment pins to a specific flow_version
        │
        ▼
Run.flow_version                ← each run records which version it executed
Setting the version
In the @flow decorator:
@flow(name="my_pipeline", version="1.0.0")
def my_pipeline():
    ...
The version field is embedded in FlowSpec at build time. If omitted, it defaults to "latest".
At deploy time (CLI):
dagy deploy ./dist/artifact.zip \
    --deployment my-pipeline \
    --flow-name my_pipeline \
    --flow-version 1.0.0
!!! note
    The CLI currently hardcodes flow_version="latest" when auto-detecting from the artifact. Use --flow-version to override with an explicit version.
Triggering a specific version:
# Python client
client.trigger_run({
    "flow_name": "my_pipeline",
    "flow_version": "1.0.0",
    "parameters": {"source": "s3://bucket/data.csv"},
})
# API
curl -X POST https://api.dagy.io/v1/runs \
    -H "Authorization: Bearer $TOKEN" \
    -H "Content-Type: application/json" \
    -d '{
      "flow_name": "my_pipeline",
      "flow_version": "1.0.0",
      "parameters": {}
    }'
Version immutability
A flow_name:flow_version pair is the fixed identity of a registration: once created, the version string itself cannot be changed. The contents behind that identity are not frozen, however. Re-deploying the same pair upserts the record, overwriting the artifact and spec. To release a new version of a flow, register it under a new flow_version.
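The upsert behavior can be pictured with a toy in-memory store keyed by the composite key. This is only a sketch: `FlowRecord` and `FlowRegistry` are hypothetical names, and a single `code_hash` field stands in for the artifact and spec.

```python
from dataclasses import dataclass
from typing import Dict, Tuple


@dataclass
class FlowRecord:
    flow_name: str
    flow_version: str
    code_hash: str  # stands in for the stored artifact and spec


class FlowRegistry:
    """Toy store keyed by (flow_name, flow_version); not Dagy's storage layer."""

    def __init__(self) -> None:
        self._records: Dict[Tuple[str, str], FlowRecord] = {}

    def register(self, record: FlowRecord) -> bool:
        """Upsert the record and return True if the key already existed."""
        key = (record.flow_name, record.flow_version)
        existed = key in self._records
        self._records[key] = record  # same key: contents are overwritten
        return existed

    def get(self, flow_name: str, flow_version: str) -> FlowRecord:
        return self._records[(flow_name, flow_version)]

    def __len__(self) -> int:
        return len(self._records)


registry = FlowRegistry()
registry.register(FlowRecord("order_etl", "2.1.0", "aaa"))
registry.register(FlowRecord("order_etl", "2.1.0", "bbb"))  # overwrites in place
registry.register(FlowRecord("order_etl", "2.2.0", "ccc"))  # new version, new record
print(len(registry), registry.get("order_etl", "2.1.0").code_hash)
```

In this model, the only way to keep the old contents retrievable is to register the change under a new flow_version.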
Deployment version counter
Each deployment also tracks a deployment_version (integer), which auto-increments on every re-deploy. This is separate from flow_version and represents how many times a deployment has been updated, useful for audit trails.
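The relationship between the two version fields can be sketched as follows. The `Deployment` class is hypothetical, and starting the counter at 1 on the first deploy is an assumption. The source only states that it increments on every re-deploy.

```python
from dataclasses import dataclass


@dataclass
class Deployment:
    name: str
    flow_version: str            # which flow version the deployment pins to
    deployment_version: int = 1  # assumption: the first deploy counts as 1

    def redeploy(self, flow_version: str) -> None:
        """Re-point the deployment and bump the audit counter."""
        self.flow_version = flow_version
        self.deployment_version += 1


d = Deployment(name="order-etl-prod", flow_version="2.1.0")
d.redeploy("2.1.0")  # same flow_version, counter still increments
d.redeploy("2.2.0")
print(d.flow_version, d.deployment_version)
```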
Code hash (change detection)
Every artifact built by dagy build or flow.deploy() includes a code_hash in its metadata.json. The hash is computed as:
SHA256( canonical_json(flow_spec, sort_keys=True) + "|" + source_file_hash )
This captures both DAG structure changes (tasks, edges, timeouts, retries) and source code changes (the Python file that defines the flow).
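A minimal sketch of the formula, under stated assumptions: `canonical_json` is approximated here with `json.dumps(..., sort_keys=True)` and compact separators, the source file hash is a SHA-256 hex digest of the file contents, and all strings are UTF-8 encoded. Dagy's actual canonicalization may differ, so hashes produced by this sketch are not expected to match real `code_hash` values.

```python
import hashlib
import json


def compute_code_hash(flow_spec: dict, source_file_hash: str) -> str:
    """SHA-256 over canonical spec JSON, a "|" separator, and the source file hash."""
    # Sorting keys makes the hash independent of dict insertion order.
    canonical = json.dumps(flow_spec, sort_keys=True, separators=(",", ":"))
    payload = f"{canonical}|{source_file_hash}"
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


source_hash = hashlib.sha256(b"def order_etl(): ...").hexdigest()
spec_a = {"name": "order_etl", "version": "2.1.0", "tasks": ["extract", "load"]}
spec_b = {"tasks": ["extract", "load"], "version": "2.1.0", "name": "order_etl"}

# Same spec with different key order gives the same hash.
print(compute_code_hash(spec_a, source_hash) == compute_code_hash(spec_b, source_hash))
```

Changing either the spec (for example, adding a task) or the source file contents produces a different digest, which is what makes the hash usable for change detection.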
When deploying, the CLI and SDK compare the local code_hash against the remote code_hash of the currently deployed version. If they match, the deployment is skipped with a message. Use --force (CLI) or force=True (SDK) to override.
| Scenario | Behavior |
|---|---|
| First deploy (no remote hash) | Deploys normally |
| Code or spec changed | Deploys normally |
| Nothing changed | Skipped (unless --force) |
| Remote flow has no code_hash (legacy) | Deploys normally |
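The table above reduces to a short decision function. `should_deploy` is a hypothetical name used for illustration. It treats a missing remote hash (first deploy or legacy record) as `None`.

```python
from typing import Optional


def should_deploy(local_hash: str, remote_hash: Optional[str], force: bool = False) -> bool:
    """Return True when the deploy should proceed, False when it can be skipped."""
    if force:
        return True  # --force / force=True always deploys
    if remote_hash is None:
        return True  # first deploy, or a legacy record without code_hash
    return local_hash != remote_hash  # deploy only if code or spec changed


print(should_deploy("abc", None))
print(should_deploy("abc", "abc"))
print(should_deploy("abc", "abc", force=True))
```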
API Fields
| Field | Type | Role |
|---|---|---|
| FlowSpecModel.version | str | Embedded in spec JSON |
| FlowRegisterResponse.flow_version | str | Returned after registration |
| RunResponse.flow_version | str | Returned with run status |
| FlowModel.code_hash | str (optional) | SHA-256 of flow spec + source code |
Tags
Tags are arbitrary key-value pairs (Dict[str, str]) attached to flows, deployments, and DAG drafts. They provide a flexible, non-hierarchical labeling system for filtering, searching, and organizing resources.
Setting tags
Python SDK (deploy()):
result = my_flow.deploy(
    name="load-orders-prod",
    tags={
        "team": "data-eng",
        "env": "prod",
        "cost-center": "analytics",
        "criticality": "high",
    },
)
API (POST /flows):
{
  "flow_spec": { "..." : "..." },
  "deployment_name": "load-orders-prod",
  "tags": {
    "team": "data-eng",
    "env": "prod",
    "cost-center": "analytics"
  }
}
Deployments API (POST /deployments):
{
  "name": "load-orders-prod",
  "flow_name": "load_orders",
  "flow_version": "1.0.0",
  "tags": {
    "team": "data-eng",
    "env": "prod"
  }
}
DAG Drafts (POST /drafts):
{
  "name": "My Draft Pipeline",
  "canvas_json": "{ ... }",
  "tags": {
    "project": "migration",
    "status": "wip"
  }
}
!!! info "CLI limitation"
    The dagy deploy CLI does not currently expose a --tags flag. Use the Python SDK or API to set tags. Tags set via the SDK's deploy() method are passed through to the API.
Tag conventions
| Key | Example values | Purpose |
|---|---|---|
| team | data-eng, ml-ops, platform | Ownership |
| env | dev, staging, prod | Environment targeting |
| cost-center | analytics, billing | Cost allocation |
| criticality | high, medium, low | Operational priority |
| project | migration-v2, q1-launch | Project tracking |
| data-classification | pii, public, internal | Compliance |
Tags in the UI
On the Flows page, tags appear as clickable chips on each flow card. Clicking a tag chip filters the grid to show only flows with that tag. Multiple tag filters are combined with AND logic: for example, selecting both team:data-eng and env:prod shows only production data-eng flows.
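The same AND semantics can be reproduced client-side on a list of flow items. This is a sketch with a hypothetical `filter_by_tags` helper. The items are plain dicts shaped like the `tags` field shown in the API models, not real response objects.

```python
from typing import Dict, List


def filter_by_tags(flows: List[dict], selected: Dict[str, str]) -> List[dict]:
    """Keep flows whose tags contain every selected key with a matching value."""
    return [
        f for f in flows
        # tags is optional in the API models, so treat None as an empty dict
        if all((f.get("tags") or {}).get(k) == v for k, v in selected.items())
    ]


flows = [
    {"name": "order_etl", "tags": {"team": "data-eng", "env": "prod"}},
    {"name": "order_etl_dev", "tags": {"team": "data-eng", "env": "dev"}},
    {"name": "train_model", "tags": {"team": "ml-ops", "env": "prod"}},
    {"name": "untagged", "tags": None},
]
matches = filter_by_tags(flows, {"team": "data-eng", "env": "prod"})
print([f["name"] for f in matches])
```

An empty selection matches every flow, which mirrors an unfiltered grid.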
API Fields
API models that carry tags:
| Model | Field |
|---|---|
| FlowRegisterRequest | tags: Optional[Dict[str, str]] |
| FlowRegisterResponse | tags: Optional[Dict[str, str]] |
| FlowListItem | tags: Optional[Dict[str, str]] |
| FlowDetailResponse | tags: Optional[Dict[str, str]] |
| DeploymentRequest | tags: Optional[Dict[str, str]] |
| DagDraftCreateRequest | tags: Optional[Dict[str, str]] |
| DagDraftUpdateRequest | tags: Optional[Dict[str, str]] |
| DagDraftResponse | tags: Optional[Dict[str, str]] |
Putting it all together
A typical production setup combines all three dimensions:
from dagy import flow, task


@task
def extract(source: str) -> list:
    ...


@task
def transform(records: list) -> list:
    ...


@task
def load(records: list) -> dict:
    ...


@flow(name="order_etl", version="2.1.0")
def order_etl(source: str = "s3://data/orders/"):
    raw = extract(source)
    clean = transform(raw)
    return load(clean)


# Deploy with full metadata
result = order_etl.deploy(
    name="order-etl-prod",
    namespace="data/orders",
    tags={
        "team": "data-eng",
        "env": "prod",
        "criticality": "high",
    },
    schedule="0 6 * * *",
)
This creates:
| Field | Value |
|---|---|
| flow_name | order_etl |
| flow_version | 2.1.0 |
| deployment_name | order-etl-prod |
| namespace | data/orders |
| tags | {"team": "data-eng", "env": "prod", "criticality": "high"} |
In the Flows UI, this flow appears under the data/orders namespace group with three tag chips. Team members can filter by any combination to quickly find what they need.
API lookup patterns
| What you want | Endpoint | Key fields |
|---|---|---|
| Get a specific flow version | GET /flows/{flow_name}/{flow_version} | flow_name + flow_version |
| List all flows (with namespace/tags in response) | GET /flows | Paginated, includes namespace and tags |
| Trigger a run for a specific version | POST /runs | flow_name + flow_version in body |
| Trigger a run via deployment | POST /runs | deployment in body (version resolved from deployment) |
| List runs filtered by flow | GET /runs?flow_name=order_etl | Indexed query |
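The two ways of triggering a run differ only in the request body. The builders below are a hedged sketch: the function names are hypothetical, the `deployment` key follows the table above, and the exact request schema (including whether `parameters` is required) is an assumption.

```python
from typing import Any, Dict, Optional


def run_request_for_version(
    flow_name: str, flow_version: str, parameters: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """Body for POST /runs that pins the run to one flow_name + flow_version."""
    return {
        "flow_name": flow_name,
        "flow_version": flow_version,
        "parameters": parameters or {},
    }


def run_request_for_deployment(
    deployment: str, parameters: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """Body for POST /runs that names a deployment; the version is resolved from it."""
    return {"deployment": deployment, "parameters": parameters or {}}


print(run_request_for_version("order_etl", "2.1.0", {"source": "s3://data/orders/"}))
print(run_request_for_deployment("order-etl-prod"))
```

Pinning by version suits reproducible backfills, while triggering by deployment follows whatever version the deployment currently points to.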