Dagster orchestrates dbt alongside other technologies, so you can combine dbt with Spark, Python, etc. in a single workflow. Dagster's software-defined asset (SDA) abstractions make it simple to define data assets that depend on specific dbt models, or to define the computation required to compute the sources that your dbt models depend on.
This guide focuses on how to work with dbt models through the SDA framework, which we recommend for most use cases.
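A software-defined asset contains an asset key, a set of upstream asset keys, and an operation that is responsible for computing the asset from its upstream dependencies. Models defined in a dbt project are conceptually similar to Dagster's software-defined assets:

- The asset key for a dbt model is derived from the name of the model.
- The upstream dependencies of a dbt model are defined with `ref` or `source` calls within the model's definition.
- The computation that produces a dbt model from its upstream dependencies is the SQL within the model's definition.

These similarities make it natural to interact with dbt models as SDAs. Dagster has built-in support for loading dbt models, seeds, and snapshots as SDAs. This allows you to:

- Run subsets of your dbt models, seeds, and snapshots from Dagster, for example on a schedule.
- Define dependencies between individual dbt models and other data assets, such as Python assets that consume a dbt model's output.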
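The `ref`/`source` correspondence can be made concrete with a minimal sketch in plain Python. It scans a model's SQL for those calls and turns them into upstream keys. This is not how `dagster-dbt` works, because Dagster reads dependencies from dbt's compiled metadata. The regular expressions assume single-argument `ref('...')` calls and two-argument `source('...', '...')` calls with literal, quoted arguments. The example SQL and model names are hypothetical.

```python
import re

# Single-argument ref('model') calls only (hypothetical simplification).
REF_RE = re.compile(r"""\{\{\s*ref\(\s*['"]([^'"]+)['"]\s*\)\s*\}\}""")
# Two-argument source('source_name', 'table') calls with quoted literals.
SOURCE_RE = re.compile(
    r"""\{\{\s*source\(\s*['"]([^'"]+)['"]\s*,\s*['"]([^'"]+)['"]\s*\)\s*\}\}"""
)


def upstream_keys(model_sql: str) -> list[tuple[str, ...]]:
    """Return the upstream keys implied by a model's ref/source calls."""
    keys: list[tuple[str, ...]] = [(name,) for name in REF_RE.findall(model_sql)]
    # A source is identified by its source name plus its table name.
    keys += [(src, table) for src, table in SOURCE_RE.findall(model_sql)]
    return keys


example_sql = """
select o.id, c.name
from {{ source('jaffle_shop', 'orders') }} as o
join {{ ref('stg_customers') }} as c on o.customer_id = c.id
"""
print(upstream_keys(example_sql))  # [('stg_customers',), ('jaffle_shop', 'orders')]
```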
For smaller dbt projects, where compilation time is not a concern, the simplest way to load your dbt assets into Dagster is the following:
```python
from dagster_dbt import load_assets_from_dbt_project

dbt_assets = load_assets_from_dbt_project(project_dir="path/to/dbt/project")
```
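The `load_assets_from_dbt_project` function compiles your dbt project and generates a set of software-defined assets from the resulting project metadata, one for each model in the project.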
For larger projects, the overhead involved with recompiling the entire project may be a concern. In these cases, you can load dbt models from an existing dbt `manifest.json` file:

```python
import json

from dagster_dbt import load_assets_from_dbt_manifest

# json.load expects an open file object, not a path
with open("path/to/dbt/manifest.json", encoding="utf8") as f:
    manifest_json = json.load(f)

dbt_assets = load_assets_from_dbt_manifest(manifest_json)
```
Note: if you make any changes to your dbt project that change the structure of the project (such as changing the dependencies of a model or adding a new one), you'll need to regenerate your manifest file for those changes to be reflected in Dagster.
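The note above follows from where the dependencies come from. `load_assets_from_dbt_manifest` uses the dependency information recorded in the manifest, not your current SQL. The sketch below reads a heavily trimmed, hypothetical manifest and maps each model to its parents using the `nodes`, `resource_type`, and `depends_on` fields. Real manifests contain many more fields, and this illustrates the idea rather than reproducing the library's parsing code.

```python
import json

# Heavily trimmed, hypothetical manifest.json contents.
MANIFEST_TEXT = """
{
  "nodes": {
    "model.my_project.stg_orders": {
      "resource_type": "model",
      "name": "stg_orders",
      "depends_on": {"nodes": ["source.my_project.jaffle_shop.orders"]}
    },
    "model.my_project.order_summary": {
      "resource_type": "model",
      "name": "order_summary",
      "depends_on": {"nodes": ["model.my_project.stg_orders"]}
    }
  }
}
"""


def model_dependencies(manifest: dict) -> dict[str, list[str]]:
    """Map each model's unique_id to the unique_ids it depends on."""
    return {
        unique_id: node["depends_on"]["nodes"]
        for unique_id, node in manifest["nodes"].items()
        if node["resource_type"] == "model"
    }


manifest = json.loads(MANIFEST_TEXT)
deps = model_dependencies(manifest)
for model, parents in deps.items():
    print(model, "<-", parents)
```

If `order_summary` gained a new `ref` in its SQL, this mapping would not change until dbt wrote a new manifest.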
Assets loaded from dbt require a dbt resource, which is responsible for firing off dbt CLI commands. The `dagster-dbt` integration provides the `dbt_cli_resource` for this purpose. This resource can be configured with CLI flags that will be passed into every dbt invocation.
The most important flag to set is `project_dir`, which points Dagster at the directory of your dbt project. For a full list of configuration options, refer to the `dbt_cli_resource` API docs.
You can configure this resource and add it to your dbt assets by doing the following:
```python
from dagster import with_resources
from dagster_dbt import dbt_cli_resource, load_assets_from_dbt_project

DBT_PROJECT_PATH = "path/to/dbt_project"

dbt_assets = with_resources(
    load_assets_from_dbt_project(DBT_PROJECT_PATH),
    {
        # configure the dbt resource once; its flags apply to every dbt call
        "dbt": dbt_cli_resource.configured(
            {"project_dir": DBT_PROJECT_PATH},
        )
    },
)
```
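Configuring the resource once means the same flags accompany every dbt command that Dagster runs. The helper below, `build_dbt_command`, is hypothetical and only sketches that idea by turning a config dict into a dbt CLI argument list. Snake_case keys become `--kebab-case` flags, `True` becomes a bare flag, and `None`/`False` values are skipped. It is not the `dbt_cli_resource` implementation.

```python
import shlex


def build_dbt_command(subcommand: str, config: dict) -> list[str]:
    """Hypothetical: turn resource-style config into a dbt CLI argument list."""
    args = ["dbt", subcommand]
    for key, value in config.items():
        flag = "--" + key.replace("_", "-")
        if value is True:
            args.append(flag)  # boolean flags take no value
        elif value is not False and value is not None:
            args.extend([flag, str(value)])
    return args


shared_config = {"project_dir": "path/to/dbt_project", "target": "dev"}
cmd = build_dbt_command("run", shared_config)
print(shlex.join(cmd))  # dbt run --project-dir path/to/dbt_project --target dev
```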
Once you have your dbt assets, you can define a job that runs some or all of these assets on a schedule:
```python
from dagster import ScheduleDefinition, define_asset_job, repository

# every asset in the repository
run_everything_job = define_asset_job("run_everything", selection="*")

# only my_model and everything downstream of it
run_something_job = define_asset_job("run_something", selection="my_model*")


@repository
def my_repo():
    return [
        *dbt_assets,  # unpack the list of dbt assets loaded above
        ScheduleDefinition(
            job=run_something_job,
            cron_schedule="@daily",
        ),
        ScheduleDefinition(
            job=run_everything_job,
            cron_schedule="@weekly",
        ),
    ]
```
Refer to the Schedule documentation for more info on running jobs on a schedule.
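The `my_model*` selection can be modeled roughly with the sketch below. It collects an asset plus everything reachable downstream of it, using a breadth-first search over a hypothetical parent-to-children graph. Dagster resolves selections against its own asset graph, so this illustrates the semantics only and is not its implementation.

```python
from collections import deque


def select_downstream(root: str, children: dict[str, list[str]]) -> set[str]:
    """Return root plus every asset reachable by following child edges."""
    selected = {root}
    queue = deque([root])
    while queue:
        current = queue.popleft()
        for child in children.get(current, []):
            if child not in selected:
                selected.add(child)
                queue.append(child)
    return selected


# Hypothetical asset graph: parent -> list of direct children.
children = {
    "my_model": ["model_a", "model_b"],
    "model_a": ["model_c"],
    "other_model": ["model_b"],
}
print(sorted(select_downstream("my_model", children)))
# ['model_a', 'model_b', 'model_c', 'my_model']
```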
In Dagster, each asset has an asset key to identify it. Dagster automatically generates these keys for each dbt node in the project as well as the sources for each node.
For models, seeds, and snapshots, the default asset key will be the configured schema for that node (if any), concatenated with the name of the node.
For example, suppose you have configured a custom schema for a subdirectory in your `dbt_project.yml` file:

```yaml
models:
  my_project:
    marketing:
      +schema: marketing
```

Then the asset key for a model named `some_model` will be `marketing/some_model`. If you have not configured a custom schema, then the asset key will simply be `some_model`.
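The default key rule for models, seeds, and snapshots fits in a small function. `default_model_key` is a hypothetical helper that assumes you already know the node's custom schema (or that it has none). It is not part of `dagster-dbt`.

```python
from typing import Optional


def default_model_key(name: str, custom_schema: Optional[str] = None) -> list[str]:
    """Custom schema (if configured) followed by the node name."""
    return [custom_schema, name] if custom_schema else [name]


print("/".join(default_model_key("some_model", "marketing")))  # marketing/some_model
print("/".join(default_model_key("some_model")))  # some_model
```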
For sources, the default asset key will be the name of the source concatenated with the name of the source table.
For example, the asset key for the source table defined in the following `sources.yaml` will be `jaffle_shop/orders`:

```yaml
sources:
  - name: jaffle_shop
    tables:
      - name: orders
```
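The source key rule can be sketched the same way. This hypothetical version operates on the `sources.yaml` above after it has been parsed into a Python dict, and it leaves out the parsing step so that the example needs no YAML library.

```python
# The sources.yaml above, as the dict a YAML parser would produce.
sources_config = {
    "sources": [
        {"name": "jaffle_shop", "tables": [{"name": "orders"}]},
    ]
}


def source_keys(config: dict) -> list[list[str]]:
    """Source name followed by table name, for every declared table."""
    return [
        [source["name"], table["name"]]
        for source in config["sources"]
        for table in source.get("tables", [])
    ]


print(["/".join(key) for key in source_keys(sources_config)])  # ['jaffle_shop/orders']
```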
A common pattern is to use the prefix of an asset key to indicate what database an asset is stored in. For example, you might want all of your assets stored in Snowflake to start with the prefix `snowflake`.

To add a prefix to the models generated by your dbt project, you can pass in a `key_prefix` argument to either the `load_assets_from_dbt_manifest` or `load_assets_from_dbt_project` function:
```python
from dagster_dbt import load_assets_from_dbt_project

dbt_assets = load_assets_from_dbt_project(
    "path/to/dbt_project",
    key_prefix="snowflake",
)
```
Note: The `key_prefix` argument only applies to models. If you want to apply a prefix to the source keys that Dagster generates, pass in a `source_key_prefix` argument:
```python
from dagster_dbt import load_assets_from_dbt_project

dbt_assets = load_assets_from_dbt_project(
    "path/to/dbt_project",
    source_key_prefix="snowflake",
)
```
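The split between `key_prefix` and `source_key_prefix` can be summarized with a hypothetical helper, `apply_prefixes`. It prepends the model prefix to model keys and the source prefix to source keys. It assumes single-string prefixes, as in the examples above, and mirrors the documented behavior rather than the library's code.

```python
from typing import Optional


def apply_prefixes(
    key: list[str],
    is_source: bool,
    key_prefix: Optional[str] = None,
    source_key_prefix: Optional[str] = None,
) -> list[str]:
    """Hypothetical: key_prefix applies to models, source_key_prefix to sources."""
    prefix = source_key_prefix if is_source else key_prefix
    return [prefix, *key] if prefix else list(key)


model_key = apply_prefixes(["marketing", "some_model"], is_source=False, key_prefix="snowflake")
source_key = apply_prefixes(["jaffle_shop", "orders"], is_source=True, key_prefix="snowflake")
print(model_key)  # ['snowflake', 'marketing', 'some_model']
print(source_key)  # ['jaffle_shop', 'orders']
```

The second call shows why `source_key_prefix` exists: `key_prefix` alone leaves source keys untouched.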
Dagster allows you to define assets that are downstream of specific dbt models. One property of dbt-based assets is that an external tool (in this case, dbt) stores each model in the database itself, rather than Dagster directly storing the tables that are updated.
Because Dagster does not store these tables itself, there is a range of ways to load a dbt model as input to a Python function. For example, you might want to load the contents as a Pandas dataframe or into a PySpark session. You can specify this loading behavior on each downstream asset. For example, if you wanted to consume a dbt model with the asset key `my_dbt_model` as a Pandas dataframe, that would look something like the following:
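```python
from dagster import AssetIn, asset


@asset(
    # load this input with the IO manager registered under "pandas_df_manager"
    ins={"my_dbt_model": AssetIn(input_manager_key="pandas_df_manager")},
)
def my_downstream_asset(my_dbt_model):
    # my_dbt_model is a Pandas dataframe; filter rows with a boolean mask
    return my_dbt_model[my_dbt_model["foo"] == "bar"]
```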
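The implementation of your IO manager depends on the type of object you want to load into Python (for example, a Pandas dataframe) and on the database that dbt writes your models to.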
A simple IO manager implementation that loads data from a dbt-managed table into a Pandas dataframe would look something like the following:
```python
import pandas as pd
from dagster import IOManager, io_manager


class PandasIOManager(IOManager):
    def __init__(self, con_string: str):
        self._con = con_string

    def handle_output(self, context, obj):
        # dbt handles outputs for us
        pass

    def load_input(self, context) -> pd.DataFrame:
        """Load the contents of a table as a pandas DataFrame."""
        # use the last component of the asset key as the table name
        table_name = context.asset_key.path[-1]
        return pd.read_sql(f"SELECT * FROM {table_name}", con=self._con)


@io_manager(config_schema={"con_string": str})
def pandas_io_manager(context):
    return PandasIOManager(context.resource_config["con_string"])
```
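The `load_input` logic can be tried without a warehouse or pandas. The sketch below swaps in the standard-library `sqlite3` module and returns plain row tuples. As in `PandasIOManager`, the table name comes from the last component of the asset key, so a prefix such as `snowflake` is ignored. Unlike the f-string above, it also quotes the table name as an SQL identifier. The helper names, the table, and its rows are made up for the example.

```python
import sqlite3


def quote_identifier(name: str) -> str:
    """Double-quote an SQL identifier, escaping embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def load_table(con: sqlite3.Connection, asset_key_path: list[str]) -> list[tuple]:
    """Read every row of the table named by the last asset key component."""
    table_name = asset_key_path[-1]
    return con.execute(f"SELECT * FROM {quote_identifier(table_name)}").fetchall()


# Stand-in for a table that dbt has already built.
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE my_dbt_model (foo TEXT, amount INTEGER)")
con.executemany("INSERT INTO my_dbt_model VALUES (?, ?)", [("bar", 1), ("baz", 2)])

rows = load_table(con, ["snowflake", "my_dbt_model"])
print(rows)
```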
Once your IO manager is defined, you can supply it like any other resource when calling `with_resources`:
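```python
from dagster import with_resources
from dagster_dbt import dbt_cli_resource, load_assets_from_dbt_project

# my_downstream_asset and pandas_io_manager are defined in the snippets above.
# The downstream asset is included because it is the asset that requires
# the "pandas_df_manager" input manager.
all_assets = with_resources(
    [*load_assets_from_dbt_project(...), my_downstream_asset],
    {
        "dbt": dbt_cli_resource.configured(
            {"project_dir": "path/to/dbt_project"},
        ),
        "pandas_df_manager": pandas_io_manager.configured(
            {"con_string": "..."},
        ),
    },
)
```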
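An asset that names an `input_manager_key` only works if a resource with that key is supplied to it, just as the dbt assets need a `dbt` resource. The toy check below, `missing_resource_keys`, is hypothetical and only models this bookkeeping. For each asset, it reports the required resource keys that were not provided. The requirement table mirrors the examples on this page, with `dbt_models` standing in for the loaded dbt assets.

```python
def missing_resource_keys(
    required: dict[str, set[str]], provided: set[str]
) -> dict[str, set[str]]:
    """For each asset, report required resource keys that were not provided."""
    return {
        asset: keys - provided
        for asset, keys in required.items()
        if keys - provided
    }


# Hypothetical requirements mirroring the examples on this page.
required = {
    "dbt_models": {"dbt"},
    "my_downstream_asset": {"pandas_df_manager"},
}
print(missing_resource_keys(required, {"dbt"}))
# {'my_downstream_asset': {'pandas_df_manager'}}
print(missing_resource_keys(required, {"dbt", "pandas_df_manager"}))  # {}
```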
If you find a bug or want to add a feature to the `dagster-dbt` library, we invite you to contribute.

If you have questions on using dbt with Dagster, we'd love to hear from you.