Resources

@dagster.resource(config_schema=None, description=None, required_resource_keys=None, version=None)[source]

Define a resource.

The decorated function should accept an InitResourceContext and return an instance of the resource. This function will become the resource_fn of an underlying ResourceDefinition.

If the decorated function yields once rather than returning (in the manner of functions decorable with @contextlib.contextmanager) then the body of the function after the yield will be run after execution resolves, allowing users to write their own teardown/cleanup logic.

Parameters:
  • config_schema (Optional[ConfigSchema]) – The schema for the config. Configuration data is available in init_context.resource_config. If not set, Dagster will accept any config provided.

  • description (Optional[str]) – A human-readable description of the resource.

  • version (Optional[str]) – (Experimental) The version of a resource function. Two wrapped resource functions should only have the same version if they produce the same resource definition when provided with the same inputs.

  • required_resource_keys (Optional[Set[str]]) – Keys for the resources required by this resource.

class dagster.ResourceDefinition(resource_fn, config_schema=None, description=None, required_resource_keys=None, version=None)[source]

Core class for defining resources.

Resources are scoped ways to make external resources (like database connections) available during job execution and to clean up after execution resolves.

If resource_fn yields once rather than returning (in the manner of functions decorable with @contextlib.contextmanager) then the body of the function after the yield will be run after execution resolves, allowing users to write their own teardown/cleanup logic.

Depending on your executor, resources may be instantiated and cleaned up more than once in a job execution.

Parameters:
  • resource_fn (Callable[[InitResourceContext], Any]) – User-provided function to instantiate the resource, which will be made available to executions keyed on the context.resources object.

  • config_schema (Optional[ConfigSchema]) – The schema for the config. If set, Dagster will check that config provided for the resource matches this schema and fail if it does not. If not set, Dagster will accept any config provided for the resource.

  • description (Optional[str]) – A human-readable description of the resource.

  • required_resource_keys (Optional[Set[str]]) – Keys for the resources required by this resource. A DagsterInvariantViolationError will be raised during initialization if dependencies are cyclic.

  • version (Optional[str]) – (Experimental) The version of the resource’s definition fn. Two wrapped resource functions should only have the same version if they produce the same resource definition when provided with the same inputs.

class dagster.InitResourceContext(resource_config, resources, resource_def=None, instance=None, dagster_run=None, log_manager=None)[source]

The context object available as the argument to the initialization function of a dagster.ResourceDefinition.

Users should not instantiate this object directly. To construct an InitResourceContext for testing purposes, use dagster.build_init_resource_context().

resource_config

The configuration data provided by the run config. The schema for this data is defined by the config_schema argument to ResourceDefinition.

Type:

Any

resource_def

The definition of the resource currently being constructed.

Type:

ResourceDefinition

log_manager

The log manager for this run of the job or pipeline.

Type:

DagsterLogManager

resources

The resources that are available to the resource that we are initializing.

Type:

ScopedResources

dagster_run

The dagster run to use. When initializing resources outside of execution context, this will be None.

Type:

Optional[PipelineRun]

run_id

The id for this run of the job or pipeline. When initializing resources outside of execution context, this will be None.

Type:

Optional[str]

pipeline_run

(legacy) The dagster run to use. When initializing resources outside of execution context, this will be None.

Type:

Optional[PipelineRun]

Example:

from dagster import resource, InitResourceContext

@resource
def the_resource(init_context: InitResourceContext):
    init_context.log.info("Hello, world!")

dagster.make_values_resource(**kwargs)[source]

A helper function that creates a ResourceDefinition to take in user-defined values.

This is useful for sharing values between ops.

Parameters:

**kwargs – Arbitrary keyword arguments that will be passed to the config schema of the returned resource definition. If not set, Dagster will accept any config provided for the resource.

For example:

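from dagster import job, make_values_resource, op

@op(required_resource_keys={"globals"})
def my_op(context):
    # The values resource is a dictionary of the configured values.
    print(context.resources.globals["my_str_var"])

@job(resource_defs={"globals": make_values_resource(my_str_var=str, my_int_var=int)})
def my_job():
    my_op()

Returns: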

A resource that passes in user-defined values.

Return type:

ResourceDefinition

dagster.build_init_resource_context(config=None, resources=None, instance=None)[source]

Builds resource initialization context from provided parameters.

build_init_resource_context can be used as either a function or a context manager. If any resource provided to build_init_resource_context is a context manager, then build_init_resource_context must itself be used as a context manager. This function can be used to provide the context argument when invoking a resource directly.

Parameters:
  • resources (Optional[Dict[str, Any]]) – The resources to provide to the context. These can be either values or resource definitions.

  • config (Optional[Any]) – The resource config to provide to the context.

  • instance (Optional[DagsterInstance]) – The dagster instance configured for the context. Defaults to DagsterInstance.ephemeral().

Examples

context = build_init_resource_context()
resource_to_init(context)

with build_init_resource_context(
    resources={"foo": context_manager_resource}
) as context:
    resource_to_init(context)

dagster.build_resources(resources, instance=None, resource_config=None, pipeline_run=None, log_manager=None)[source]

Context manager that yields resources using provided resource definitions and run config.

This API allows for using resources in an independent context. Resources will be initialized with the provided resource_config and, optionally, pipeline_run. The resulting resources will be yielded on an object keyed identically to the mapping provided for resources. Upon exiting the context, resources will also be torn down safely.

Parameters:
  • resources (Mapping[str, Any]) – Resource instances or definitions to build. All required resource dependencies to a given resource must be contained within this dictionary, or the resource build will fail.

  • instance (Optional[DagsterInstance]) – The dagster instance configured to instantiate resources on.

  • resource_config (Optional[Mapping[str, Any]]) – A dict representing the config to be provided to each resource during initialization and teardown.

  • pipeline_run (Optional[PipelineRun]) – The pipeline run to provide during resource initialization and teardown. If the provided resources require either the pipeline_run or run_id attributes of the provided context during resource initialization and/or teardown, this must be provided, or initialization will fail.

  • log_manager (Optional[DagsterLogManager]) – Log Manager to use during resource initialization. Defaults to system log manager.

Examples:

from dagster import resource, build_resources

@resource
def the_resource():
    return "foo"

with build_resources(resources={"from_def": the_resource, "from_val": "bar"}) as resources:
    assert resources.from_def == "foo"
    assert resources.from_val == "bar"

dagster.with_resources(definitions, resource_defs, resource_config_by_key=None)[source]

Adds dagster resources to copies of resource-requiring dagster definitions.

An error will be thrown if any provided definitions have a conflicting resource definition provided for a key provided to resource_defs. Resource config can be provided, with keys in the config dictionary corresponding to the keys for each resource definition. If any definition has unsatisfied resource keys after applying with_resources, an error will be thrown.

Parameters:
  • definitions (Iterable[ResourceAddable]) – Dagster definitions to provide resources to.

  • resource_defs (Mapping[str, ResourceDefinition]) – Mapping of resource keys to ResourceDefinition objects to satisfy resource requirements of provided dagster definitions.

  • resource_config_by_key (Optional[Mapping[str, Any]]) – Specifies config for provided resources. The key in this dictionary corresponds to configuring the same key in the resource_defs dictionary.

Examples:

from dagster import asset, resource, with_resources

@resource(config_schema={"bar": str})
def foo_resource():
    ...

@asset(required_resource_keys={"foo"})
def asset1(context):
    foo = context.resources.foo
    ...

@asset(required_resource_keys={"foo"})
def asset2(context):
    foo = context.resources.foo
    ...

asset1_with_foo, asset2_with_foo = with_resources(
    [asset1, asset2],
    # Bind foo_resource to the "foo" key required by both assets.
    resource_defs={"foo": foo_resource},
    resource_config_by_key={
        "foo": {
            "config": {"bar": ...}
        }
    }
)