Resources are objects that are shared across the implementations of multiple software-defined assets and ops and that can be plugged in after defining those ops and assets.
Resources typically model external components that assets and ops interact with. For example, a resource might be a connection to a data warehouse like Snowflake or a service like Slack.
So, why use resources?
Name | Description |
---|---|
@resource | The decorator used to define resources. The decorated function is called a resource_fn . The decorator returns a ResourceDefinition . |
ResourceDefinition | Class for resource definitions. You almost never want to use initialize this class directly. Instead, you should use the @resource which returns a ResourceDefinition . |
InitResourceContext | The context object provided to a resource during initialization. This object contains required resource, config, and other run information. |
build_init_resource_context | Function for building an InitResourceContext outside of execution, intended to be used when testing a resource. |
build_resources | Function for initializing a set of resources outside of the context of a job's execution. |
with_resources | Function for providing resources to software-defined assets and source assets. |
To define a resource, use the @resource
decorator. Wrap a function that takes an init_context
as the first parameter, which is an instance of InitResourceContext
. From this function, return or yield the object that you would like to be available as a resource.
from dagster import resource
class ExternalCerealFetcher:
def fetch_new_cereals(self, start_ts, end_ts):
pass
@resource
def cereal_fetcher(init_context):
return ExternalCerealFetcher()
Software-defined assets use resource keys to access resources:
from dagster import asset
@asset(required_resource_keys={"foo"})
def asset_requires_resource(context):
do_something_with_resource(context.resources.foo)
Resources can be provided to software-defined assets using the with_resources
function. This function takes in a sequence of assets and returns transformed versions of those assets with the provided resources specified:
from dagster import repository, with_resources
@repository
def repo():
return [
*with_resources(
definitions=[asset_requires_resource],
resource_defs={"foo": foo_resource},
)
]
When defining asset jobs (using define_asset_job
), you don't need to provide resources to the job directly. The job will make use of the resources provided to the assets.
Like software-defined assets, ops use resource keys to access resources:
from dagster import op
CREATE_TABLE_1_QUERY = "create table_1 as select * from table_0"
@op(required_resource_keys={"database"})
def op_requires_resources(context):
context.resources.database.execute_query(CREATE_TABLE_1_QUERY)
Jobs provide resources to the ops inside them. A job has a dictionary that maps resource keys to resource definitions. You can supply this dictionary to the resource_defs
argument when using either of the ways to construct a job: GraphDefinition.to_job
or @job
.
Supplying resources when using GraphDefinition.to_job
is especially common, because you can build multiple jobs from the same graph that are distinguished by their different resources.
from dagster import graph
@graph
def do_database_stuff():
op_requires_resources()
do_database_stuff_prod = do_database_stuff.to_job(
resource_defs={"database": database_resource_a}
)
do_database_stuff_dev = do_database_stuff.to_job(
resource_defs={"database": database_resource_b}
)
Supplying resources to the @job
, i.e. when there aren't multiple jobs for the same graph, is also useful. For example, if you want to use an off-the-shelf resource or supply configuration in one place instead of in every op.
from dagster import job
@job(resource_defs={"database": database_resource})
def do_database_stuff_job():
op_requires_resources()
ResourceDefinitions
can have a config schema, which allows you to customize behavior at runtime through run configuration.For example, let's say we wanted to pass a connection string to our DatabaseConnection
resource.
class DatabaseConnection:
def __init__(self, connection: str):
self.connection = connection
@resource(config_schema={"connection": str})
def db_resource(init_context):
connection = init_context.resource_config["connection"]
return DatabaseConnection(connection)
Dagster resources can serve as context managers, for scenarios where it is necessary to perform some sort of cleanup of the resource after execution. Let’s take the example of a database connection. We might want to clean up the connection once we are done using it. We can incorporate this into our resource like so:
@resource
@contextmanager
def db_connection():
try:
db_conn = get_db_connection()
yield db_conn
finally:
cleanup_db_connection(db_conn)
At spinup time, Dagster will run the code within the try block, and be expecting a single yield. Having more than one yield will cause an error. The yielded object will be available to code that requires the resource:
@op(required_resource_keys={"db_connection"})
def use_db_connection(context):
db_conn = context.resources.db_connection
...
Once execution finishes, the finally block of the resource init function will run. In the case of our db_connection resource, this will run the cleanup function.
An important nuance is that resources are initialized (and torn down) once per process. This means that if using the in-process executor, which runs all steps in a single process, resources will be initialized at the beginning of execution, and torn down after every single step is finished executing. In contrast, when using the multiprocess executor (or other out-of-process executors), where there is a single process for each step, at the beginning of each step execution, the resource will be initialized, and at the end of that step’s execution, the finally block will be run.
You can test the initialization of a resource by invoking the resource definition. This will run the underlying decorated function.
from dagster import resource
@resource
def my_resource(_):
return "foo"
def test_my_resource():
assert my_resource(None) == "foo"
If your resource requires other resources or config, then you can provide a InitResourceContext
object by using the build_init_resource_context
function.
from dagster import build_init_resource_context, resource
@resource(required_resource_keys={"foo"}, config_schema={"bar": str})
def my_resource_requires_context(init_context):
return init_context.resources.foo, init_context.resource_config["bar"]
def test_my_resource_with_context():
init_context = build_init_resource_context(
resources={"foo": "foo_str"}, config={"bar": "bar_str"}
)
assert my_resource_requires_context(init_context) == ("foo_str", "bar_str")
If your resource is a context manager, then you can open it as one using python's with
syntax.
from contextlib import contextmanager
from dagster import resource
@resource
@contextmanager
def my_cm_resource(_):
yield "foo"
def test_cm_resource():
with my_cm_resource(None) as initialized_resource:
assert initialized_resource == "foo"
There are scenarios where you might want to reuse the code written within your resources outside of the context of execution. Consider a case where you have a resource db_connection
, and you want to use that resource outside of the context of an execution. You can use the build_resources
API to initialize this resource outside of execution.
from dagster import resource, build_resources
@resource
def the_credentials():
...
@resource(required_resource_keys={"credentials"})
def the_db_connection(init_context):
get_the_db_connection(init_context.resources.credentials)
def uses_db_connection():
with build_resources(
{"db_connection": the_db_connection, "credentials": the_credentials}
) as resources:
conn = resources.db_connection
...
Resources can depend upon other resources. Use the required_resource_keys
parameter of the @resource
decorator to specify which resources to depend upon. Access the required resources through the context object provided to the wrapped function.
from dagster import resource
@resource
def credentials():
return ("bad_username", "easy_password")
@resource(required_resource_keys={"credentials"})
def client(init_context):
username, password = init_context.resources.credentials
return Client(username, password)
Now, consider an op that will use the client
resource:
from dagster import graph, op
@op(required_resource_keys={"client"})
def get_client(context):
return context.resources.client
When constructing a job that includes that op, we provide the resource client
, but also credentials
, because client
requires it.
@job(resource_defs={"credentials": credentials, "client": client})
def connect():
get_client()
For more examples of resources, check out the following in our Hacker News example: