This guide is applicable to Dagster Cloud.
This guide details a workflow to test Dagster code in your cloud environment without impacting your production data. To highlight this functionality, we’ll leverage Dagster Cloud branch deployments and a Snowflake database to:
With these tools, we can merge changes with confidence in the impact on our data platform and with assurance that our code will execute as intended.
Here’s an overview of the main concepts we’ll be using:
This guide is an extension of the Transitioning Data Pipelines from Development to Production guide, illustrating a workflow for staging deployments. We’ll use the examples from this guide to build a workflow atop Dagster Cloud’s branch deployment feature.
To complete the steps in this guide, you'll need:
branch_deployments.yml
GitHub workflow filerepo
)We have a PRODUCTION
Snowflake database with a schema named HACKER_NEWS
. In our production cloud environment, we’d like to write tables to Snowflake containing subsets of Hacker News data. These tables will be:
ITEMS
- A table containing the entire datasetCOMMENTS
- A table containing data about commentsSTORIES
- A table containing data about storiesTo set up a branch deployment workflow to construct and test these tables, we will:
PRODUCTION_CLONE_<ID>
, where <ID>
is the pull request ID of the branch. Then we'll create a branch deployment and test our Hacker News assets against our newly cloned database.In production, we want to write three tables to Snowflake:ITEMS
, COMMENTS
, and STORIES
. We can define these tables as assets as follows:
# assets.py
import pandas as pd
import requests
from dagster import asset
@asset(
config_schema={"N": int},
io_manager_key="snowflake_io_manager",
)
def items(context) -> pd.DataFrame:
"""Items from the Hacker News API: each is a story or a comment on a story."""
rows = []
max_id = requests.get(
"https://hacker-news.firebaseio.com/v0/maxitem.json", timeout=5
).json()
# Hacker News API is 1-indexed, so adjust range by 1
for item_id in range(max_id - context.op_config["N"] + 1, max_id + 1):
item_url = f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json"
rows.append(requests.get(item_url, timeout=5).json())
# ITEM_FIELD_NAMES is a list of the column names in the Hacker News dataset
result = pd.DataFrame(rows, columns=ITEM_FIELD_NAMES).drop_duplicates(subset=["id"])
result.rename(columns={"by": "user_id"}, inplace=True)
return result
@asset(
io_manager_key="snowflake_io_manager",
)
def comments(items: pd.DataFrame) -> pd.DataFrame:
"""Comments from the Hacker News API."""
return items[items["type"] == "comment"]
@asset(
io_manager_key="snowflake_io_manager",
)
def stories(items: pd.DataFrame) -> pd.DataFrame:
"""Stories from the Hacker News API."""
return items[items["type"] == "story"]
As you can see, our assets use an I/O manager named snowflake_io_manager
. Using I/O managers and other resources allow us to swap out implementations per environment without modifying our business logic.
At runtime, we’d like to determine which environment our code is running in: branch deployment, or production. This information dictates how our code should execute, specifically with which credentials and with which database.
To ensure we can't accidentally write to production from within our branch deployment, we’ll use a different set of credentials from production and write to our database clone.
Dagster automatically sets certain environment variables containing deployment metadata, allowing us to read these environment variables to discern between deployments. We can access the DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT
environment variable to determine the currently executing environment.
Because we want to configure our assets to write to Snowflake using a different set of credentials and database in each environment, we’ll configure a separate I/O manager for each environment:
# repository.py
from dagster import repository, with_resources
from ..assets import comments, items, stories
snowflake_io_manager = build_snowflake_io_manager([SnowflakePandasTypeHandler()])
@repository
def repo():
snowflake_config = {
"account": "abc1234.us-east-1",
"user": "system@company.com",
"password": {"env": "SYSTEM_SNOWFLAKE_PASSWORD"},
"schema": "HACKER_NEWS",
}
resource_defs = {
"branch": {
"snowflake_io_manager": snowflake_io_manager.configured(
{
**snowflake_config,
"database": f"PRODUCTION_CLONE_{os.getenv('DAGSTER_CLOUD_PULL_REQUEST_ID')}",
}
),
},
"production": {
"snowflake_io_manager": snowflake_io_manager.configured(
{
**snowflake_config,
"database": "PRODUCTION",
}
),
},
}
def get_current_env():
is_branch_depl = os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT")
assert is_branch_depl != None # env var must be set
return "branch" if is_branch_depl else "prod"
return [
with_resources(
[items, comments, stories], resource_defs=resource_defs[get_current_env()]
),
]
Refer to the Dagster Cloud environment variables documentation for more info about available environment variables.
The branch_deployments.yml
workflow file from the in .github/workflows
defines a dagster_cloud_build_push
job that builds each branch deployment. In order to add this workflow file, it must be located in your project directory under .github/workflows
. We can modify this workflow file to add a custom step that launches a job within our repository that will clone the production database upon deployment.
We’ll first need to define a job that clones our PRODUCTION
database for each branch deployment. Later, in our GitHub actions workflow, we can trigger this job to run upon each redeploy. Each clone will be named PRODUCTION_CLONE_<ID>
with <ID>
representing the pull request ID, ensuring each branch deployment has a unique clone. This job will drop a database clone if it exists and then reclone from production, ensuring each redeployment has a fresh clone of PRODUCTION
:
@op(required_resource_keys={"snowflake"})
def drop_database_clone(context):
context.resources.snowflake.execute_query(
f"DROP DATABASE IF EXISTS PRODUCTION_CLONE_{os.environ['DAGSTER_CLOUD_PULL_REQUEST_ID']}"
)
@op(required_resource_keys={"snowflake"}, ins={"start": In(Nothing)})
def clone_production_database(context):
context.resources.snowflake.execute_query(
f"CREATE DATABASE PRODUCTION_CLONE_{os.environ['DAGSTER_CLOUD_PULL_REQUEST_ID']} CLONE \"PRODUCTION\""
)
@graph
def clone_prod():
clone_production_database(start=drop_database_clone())
We’ve defined drop_database_clone
and clone_production_database
to utilize the Snowflake resource. The Snowflake resource will use the same configuration as the Snowflake I/O manager to generate a connection to Snowflake. However, while our I/O manager writes outputs to Snowflake, the Snowflake resource executes queries against Snowflake.
We can then add the clone_prod
job to our repository, configuring it with the resources corresponding to the current environment. We can modify the resource mapping by environment in our repository as follows:
resource_defs = {
"branch": {
"snowflake_io_manager": snowflake_io_manager.configured(
{
**snowflake_config,
"database": f"PRODUCTION_CLONE_{os.getenv('DAGSTER_CLOUD_PULL_REQUEST_ID')}",
}
),
"snowflake": snowflake_resource.configured(
{
**snowflake_config,
"database": f"PRODUCTION_CLONE_{os.getenv('DAGSTER_CLOUD_PULL_REQUEST_ID')}",
}
),
},
"production": {
"snowflake_io_manager": snowflake_io_manager.configured(
{
**snowflake_config,
"database": "PRODUCTION",
}
),
"snowflake": snowflake_resource.configured(
{**snowflake_config, "database": "PRODUCTION"}
),
},
}
Then, we can add the clone_prod
job to our repository:
@repository
def repo():
...
branch_deployment_jobs = [
clone_prod.to_job(resource_defs=resource_defs[get_current_env()])
]
return [
with_resources(
[items, comments, stories], resource_defs=resource_defs[get_current_env()]
),
*(
branch_deployment_jobs
if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT")
else []
),
]
The branch_deployments.yml
file located in .github/workflows/branch_deployments.yml
defines a dagster_cloud_build_push
job with a series of steps that launch a branch deployment. Because we want to queue a run of clone_prod
within each deployment after it launches, we'll add an additional step at the end dagster_cloud_build_push
. This job is triggered on multiple pull request events: opened
, synchronize
, reopen
, and closed
. This means that upon future pushes to the branch, we'll trigger a run of clone_prod
. The if
condition below ensures that clone_prod
will not be be run if the pull request is closed:
# .github/workflows/branch_deployments.yml
name: Dagster Branch Deployments
on:
pull_request:
types: [opened, synchronize, reopened, closed]
env:
DAGSTER_CLOUD_URL: ${{ secrets.DAGSTER_CLOUD_URL }}
jobs:
dagster_cloud_build_push:
runs-on: ubuntu-latest
name: Dagster Branch Deployments
strategy:
...
steps:
# Existing steps here
...
- name: Clone Snowflake schema upon launch
if: github.event.action != 'closed'
uses: dagster-io/cloud-branch-deployments-action/run@main
with:
location: ${{ toJson(matrix.location) }}
deployment: ${{ steps.deploy.outputs.deployment }}
repository: repo
job: clone_prod
env:
DAGSTER_CLOUD_API_TOKEN: ${{ secrets.DAGSTER_CLOUD_API_TOKEN }}
Opening a pull request for our current branch will automatically kick off a branch deployment. After the deployment launches, we can confirm that the clone_prod
job has run:
Alternatively, the logs for the branch deployment workflow can be found in the "Actions" tab on the GitHub pull request.
We can also view our database in Snowflake to confirm that a clone exists for each branch deployment. When we materialize our assets within our branch deployment, we’ll now be writing to our clone of PRODUCTION
. Within Snowflake, we can run queries against this clone to confirm the validity of our data:
Now that we’ve confirmed that our assets materialize correctly, we can merge this branch! But first, we’ll configure our branch deployments to drop the schema clone upon closing our branch. We can add another job to our repository that reuses our drop_database_clone
op:
@graph
def drop_prod_clone():
drop_database_clone()
@repository
def repo():
...
branch_deployment_jobs = [
clone_prod.to_job(resource_defs=resource_defs[get_current_env()]),
drop_prod_clone.to_job(resource_defs=resource_defs[get_current_env()]),
]
return [
with_resources(
[items, comments, stories], resource_defs=resource_defs[get_current_env()]
),
*(
branch_deployment_jobs
if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT")
else []
),
]
Then, we can add an additional step to our branch_deployments.yml
file that queues a run of our drop_prod_clone
job:
# .github/workflows/branch_deployments.yml
name: Dagster Branch Deployments
on:
pull_request:
types: [opened, synchronize, reopened, closed]
env:
DAGSTER_CLOUD_URL: ${{ secrets.DAGSTER_CLOUD_URL }}
jobs:
dagster_cloud_build_push:
runs-on: ubuntu-latest
name: Dagster Branch Deployments
strategy:
...
steps:
# Existing steps here
...
- name: Clone Snowflake schema upon launch
...
- name: Delete schema clone upon PR close
if: github.event.action == 'closed'
uses: dagster-io/cloud-branch-deployments-action/run@main
with:
location: ${{ toJson(matrix.location) }}
deployment: ${{ steps.deploy.outputs.deployment }}
repository: repo
job: drop_prod_clone
env:
DAGSTER_CLOUD_API_TOKEN: ${{ secrets.DAGSTER_CLOUD_API_TOKEN }}
After merging our branch, viewing our Snowflake database will confirm that our branch deployment step has successfully deleted our database clone.
We’ve now built an elegant workflow that enables future branch deployments to automatically have access to their own clones of our production database that are cleaned up upon merge!