The configured
API offers a way to configure a Dagster entity at definition time.
Name | Description |
---|---|
@configured | The decorator to configure a Dagster entity. |
configured | The method to configure a Dagster entity. |
The primary purpose of Dagster config is to provide values to ops and resources when running a job. Sometimes, however, you may find yourself with an op or resource that requires configuration, and you might not want whoever is running the job to need to provide that configuration. I.e. you may know the values of the config you want to provide at definition time instead of runtime.
When is this useful? Often library authors provide very flexible and configurable ops that can be used in a wide variety of operational contexts. For example, in our dbt integration, there is an op that could allow a user to run arbitrary dbt commands on a deployed instance, and leverage our config editor to make this easier.
However, typically you do not want this level of flexibility in a deployed job. You want most configuration options set in code and fixed for deployed. configured
provides the bridge between these worlds by offering a way to provide configuration at definition time. When invoked on a Dagster entity, it returns an interchangeable object with the given configuration "baked in".
configured
is available with the following definitions:
There are different ways to invoke configured
on an entity.
You can invoke the configured
as a method on a given entity.
east_unsigned_s3_session = s3_session.configured(
{"region": "us-east-1", "use_unsigned_session": False}
)
We also provide a configured
decorator that makes it easy to create a function-configured version of an object. You can find more information in the @configured
API reference.
@configured(s3_session)
def west_unsigned_s3_session(_init_context):
return {"region": "us-west-1", "use_unsigned_session": False}
If the config to supply to the object is constant, you may alternatively invoke this and call the result with a dict of config values to be curried. You can find more information in the @configured
API reference.
west_signed_s3_session = configured(s3_session)(
{"region": "us-west-1", "use_unsigned_session": False}
)
In other cases, it's useful to partially fill out the configuration at definition time and leave other configurations for runtime. For these cases, configured
can be used as a decorator, accepting a function that translates from runtime config to config that satisfies the entity's config schema. It returns an entity with the "outer" config schema as its schema.
from dagster import configured, resource
@resource(config_schema={"region": str, "use_unsigned_session": bool})
def s3_session(_init_context):
"""Connect to S3"""
@configured(s3_session, config_schema={"region": str})
def unsigned_s3_session(config):
return {"region": config["region"], "use_unsigned_session": False}
You can use the configured
API with any definition type in the same way. For example, to configure an op, you can simply invoke configured
on the op definition:
from dagster import Field, configured, op
@op(
config_schema={
"iterations": int,
"word": Field(str, is_required=False, default_value="hello"),
}
)
def example(context):
for _ in range(context.op_config["iterations"]):
context.log.info(context.op_config["word"])
# This example is fully configured. With this syntax, a name must be explicitly provided.
configured_example = configured(example, name="configured_example")(
{"iterations": 6, "word": "wheaties"}
)
# This example is partially configured: `iterations` is passed through
# The decorator yields an op named 'another_configured_example' (from the decorated function)
# with `int` as the `config_schema`.
@configured(example, int)
def another_configured_example(config):
return {"iterations": config, "word": "wheaties"}
A common pattern in the development cycle is to use different configuration for each environment. For example, you might connect to a local database during local development, and connect to a production database in your cloud environment. You can use the configured
API to select between different configurations at repository definition time:
@op(required_resource_keys={"database"})
def get_one(context):
context.resources.database.execute_query("SELECT 1")
@graph
def get_one_from_db():
get_one()
@repository
def my_repo():
resources_by_env = {
"local": {
"database": database_client.configured(
{
"username": {"env": "DEV_USER"},
"password": {"env": "DEV_PASSWORD"},
"hostname": "localhost",
"db_name": "DEVELOPMENT",
"port": "5432",
}
)
},
"production": {
"database": database_client.configured(
{
"username": {"env": "SYSTEM_USER"},
"password": {"env": "SYSTEM_PASSWORD"},
"hostname": "abccompany",
"db_name": "PRODUCTION",
"port": "5432",
}
)
},
}
return [get_one_from_db.to_job(resource_defs=resources_by_env[get_env()])]
# end_database_example
When using the decorator syntax (@configured
), the resulting op definition will inherit the name of the function being decorated (like another_configured_example
in the above example). When configuring an op completely with a config dictionary rather than with a function (as with configured_example
), you must add the positional argument name
in the call to configured
. When naming ops, remember that op definitions must have unique names within a repository or job.
@op(
config_schema={
"is_sample": Field(bool, is_required=False, default_value=False),
},
ins={"xs": In(List[Int])},
)
def get_dataset(context, xs):
if context.op_config["is_sample"]:
return xs[:5]
else:
return xs
# If we want to use the same op configured in multiple ways in the same job,
# we have to specify unique names when configuring them:
sample_dataset = configured(get_dataset, name="sample_dataset")({"is_sample": True})
full_dataset = configured(get_dataset, name="full_dataset")({"is_sample": False})
@job
def datasets():
sample_dataset()
full_dataset()
For more examples of jobs, check out the following in our Hacker News example: