Execution

Materializing Assets

dagster.materialize(assets, run_config=None, instance=None, resources=None, partition_key=None)[source]

Executes a single-threaded, in-process run which materializes provided assets.

By default, will materialize assets to the local filesystem.

Parameters:
  • assets (Sequence[Union[AssetsDefinition, SourceAsset]]) – The assets to materialize. Can also provide SourceAsset objects to fill dependencies for asset defs.

  • resources (Optional[Mapping[str, object]]) – The resources needed for execution. Can provide resource instances directly, or resource definitions. Note that if provided resources conflict with resources directly on assets, an error will be thrown.

  • run_config (Optional[Any]) – The run config to use for the run that materializes the assets.

  • partition_key (Optional[str]) – The string partition key that specifies the run config to execute. Can only be used to select run config for assets with partitioned config.

Returns:

The result of the execution.

Return type:

ExecuteInProcessResult
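
A minimal usage sketch (the asset names are illustrative): two dependent assets materialized to the local filesystem via the default io manager.

from dagster import asset, materialize

@asset
def upstream():
    return 1

@asset
def downstream(upstream):
    return upstream + 1

# Materializes both assets, writing outputs to the local filesystem.
result = materialize([upstream, downstream])
assert result.success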

dagster.materialize_to_memory(assets, run_config=None, instance=None, resources=None, partition_key=None)[source]

Executes a single-threaded, in-process run which materializes provided assets in memory.

Will explicitly use mem_io_manager() for all required io manager keys. If any io managers are directly provided using the resources argument, a DagsterInvariantViolationError will be thrown.

Parameters:
  • assets (Sequence[Union[AssetsDefinition, SourceAsset]]) – The assets to materialize. Can also provide SourceAsset objects to fill dependencies for asset defs.

  • run_config (Optional[Any]) – The run config to use for the run that materializes the assets.

  • resources (Optional[Mapping[str, object]]) – The resources needed for execution. Can provide resource instances directly, or resource definitions. If provided resources conflict with resources directly on assets, an error will be thrown.

  • partition_key (Optional[str]) – The string partition key that specifies the run config to execute. Can only be used to select run config for assets with partitioned config.

Returns:

The result of the execution.

Return type:

ExecuteInProcessResult
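
A usage sketch (the asset name is illustrative); since outputs stay in memory, they can be read back directly from the result.

from dagster import asset, materialize_to_memory

@asset
def the_asset():
    return 5

# No io manager touches disk; the output is read back from the result.
result = materialize_to_memory([the_asset])
assert result.success
assert result.output_for_node("the_asset") == 5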

Executing Jobs

class dagster.JobDefinition(*, graph_def, resource_defs=None, executor_def=None, logger_defs=None, name=None, config=None, description=None, partitions_def=None, tags=None, metadata=None, hook_defs=None, op_retry_policy=None, version_strategy=None, _subset_selection_data=None, asset_layer=None, input_values=None, _metadata_entries=None, _executor_def_specified=None, _logger_defs_specified=None, _preset_defs=None)[source]
execute_in_process(run_config=None, instance=None, partition_key=None, raise_on_error=True, op_selection=None, asset_selection=None, run_id=None, input_values=None)[source]

Execute the Job in-process, gathering results in-memory.

The executor_def on the Job will be ignored, and replaced with the in-process executor. If using the default io_manager, it will switch from filesystem to in-memory.

Parameters:
  • run_config (Optional[Mapping[str, Any]]) – The configuration for the run.

  • instance (Optional[DagsterInstance]) – The instance to execute against, an ephemeral one will be used if none provided.

  • partition_key (Optional[str]) – The string partition key that specifies the run config to execute. Can only be used to select run config for jobs with partitioned config.

  • raise_on_error (Optional[bool]) – Whether or not to raise exceptions when they occur. Defaults to True.

  • op_selection (Optional[Sequence[str]]) –

    A list of op selection queries (including single op names) to execute. For example:

    • ['some_op']: selects some_op itself.

    • ['*some_op']: select some_op and all its ancestors (upstream dependencies).

    • ['*some_op+++']: select some_op, all its ancestors, and its descendants (downstream dependencies) within 3 levels down.

    • ['*some_op', 'other_op_a', 'other_op_b+']: select some_op and all its ancestors, other_op_a itself, and other_op_b and its direct child ops.

  • input_values (Optional[Mapping[str, Any]]) – A dictionary that maps python objects to the top-level inputs of the job. Input values provided here will override input values that have been provided to the job directly.

Returns:

ExecuteInProcessResult
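
A minimal usage sketch (op and job names are illustrative):

from dagster import job, op

@op
def an_op():
    return 1

@job
def the_job():
    an_op()

# Runs with the in-process executor and an in-memory io manager.
result = the_job.execute_in_process()
assert result.success
assert result.output_for_node("an_op") == 1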

dagster.execute_job(job, instance, run_config=None, tags=None, raise_on_error=False, op_selection=None, reexecution_options=None)[source]

Execute a job synchronously.

This API represents Dagster’s Python entrypoint for out-of-process execution. For most testing purposes, execute_in_process() will be more suitable, but when you want to run execution using an out-of-process executor (such as dagster.multiprocess_executor), use execute_job.

execute_job expects a persistent DagsterInstance for execution, meaning the $DAGSTER_HOME environment variable must be set. It also expects a reconstructable pointer to a JobDefinition so that it can be reconstructed in separate processes. This can be done by wrapping the JobDefinition in a call to dagster.reconstructable().

from dagster import DagsterInstance, execute_job, job, reconstructable

@job
def the_job():
    ...

instance = DagsterInstance.get()
result = execute_job(reconstructable(the_job), instance=instance)
assert result.success

If using the to_job() method to construct the JobDefinition, then the invocation must be wrapped in a module-scope function, which can be passed to reconstructable.

from dagster import graph, reconstructable

@graph
def the_graph():
    ...

def define_job():
    return the_graph.to_job(...)

result = execute_job(reconstructable(define_job), ...)

Since execute_job is potentially executing outside of the current process, output objects need to be retrieved by use of the provided job’s io managers. Output objects can be retrieved by opening the result of execute_job as a context manager.

from dagster import execute_job

with execute_job(...) as result:
    output_obj = result.output_for_node("some_op")

execute_job can also be used to reexecute a run by providing a ReexecutionOptions object.

from dagster import DagsterInstance, ReexecutionOptions, execute_job, reconstructable

instance = DagsterInstance.get()

options = ReexecutionOptions.from_failure(run_id=failed_run_id, instance=instance)
execute_job(reconstructable(the_job), instance=instance, reexecution_options=options)

Parameters:
  • job (ReconstructableJob) – A reconstructable pointer to a JobDefinition.

  • instance (DagsterInstance) – The instance to execute against.

  • run_config (Optional[dict]) – The configuration that parametrizes this run, as a dict.

  • tags (Optional[Dict[str, Any]]) – Arbitrary key-value pairs that will be added to run logs.

  • raise_on_error (Optional[bool]) – Whether or not to raise exceptions when they occur. Defaults to False.

  • op_selection (Optional[List[str]]) –

    A list of op selection queries (including single op names) to execute. For example:

    • ['some_op']: selects some_op itself.

    • ['*some_op']: select some_op and all its ancestors (upstream dependencies).

    • ['*some_op+++']: select some_op, all its ancestors, and its descendants (downstream dependencies) within 3 levels down.

    • ['*some_op', 'other_op_a', 'other_op_b+']: select some_op and all its ancestors, other_op_a itself, and other_op_b and its direct child ops.

  • reexecution_options (Optional[ReexecutionOptions]) – Reexecution options to provide to the run, if this run is intended to be a reexecution of a previous run. Cannot be used in tandem with the op_selection argument.

Returns:

The result of job execution.

Return type:

JobExecutionResult

class dagster.ReexecutionOptions(parent_run_id, step_selection=[])[source]

Reexecution options for python-based execution in Dagster.

Parameters:
  • parent_run_id (str) – The run_id of the run to reexecute.

  • step_selection (Sequence[str]) –

    The list of step selections to reexecute. Must be a subset or match of the set of steps executed in the original run. For example:

    • ['some_op']: selects some_op itself.

    • ['*some_op']: select some_op and all its ancestors (upstream dependencies).

    • ['*some_op+++']: select some_op, all its ancestors, and its descendants (downstream dependencies) within 3 levels down.

    • ['*some_op', 'other_op_a', 'other_op_b+']: select some_op and all its ancestors, other_op_a itself, and other_op_b and its direct child ops.
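
A sketch of reexecuting a subset of steps from a prior run; previous_run_id and the_job are placeholders for the original run’s id and a job defined as in the execute_job examples above.

from dagster import DagsterInstance, ReexecutionOptions, execute_job, reconstructable

instance = DagsterInstance.get()

# Reexecute some_op and its direct children from the prior run.
options = ReexecutionOptions(
    parent_run_id=previous_run_id,
    step_selection=["some_op+"],
)
execute_job(reconstructable(the_job), instance=instance, reexecution_options=options)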

Executing Graphs

class dagster.GraphDefinition(name, *, description=None, node_defs=None, dependencies=None, input_mappings=None, output_mappings=None, config=None, tags=None, **kwargs)[source]

Defines a Dagster graph.

A graph is made up of

  • Nodes, which can either be an op (the functional unit of computation), or another graph.

  • Dependencies, which determine how the values produced by nodes as outputs flow from one node to another. This tells Dagster how to arrange nodes into a directed, acyclic graph (DAG) of compute.

End users should prefer the @graph decorator. GraphDefinition is generally intended to be used by framework authors or for programmatically generated graphs.

Parameters:
  • name (str) – The name of the graph. Must be unique within any GraphDefinition or JobDefinition containing the graph.

  • description (Optional[str]) – A human-readable description of the graph.

  • node_defs (Optional[Sequence[NodeDefinition]]) – The set of ops / graphs used in this graph.

  • dependencies (Optional[Dict[Union[str, NodeInvocation], Dict[str, DependencyDefinition]]]) – A structure that declares the dependencies of each op’s inputs on the outputs of other ops in the graph. Keys of the top level dict are either the string names of ops in the graph or, in the case of aliased ops, NodeInvocations. Values of the top level dict are themselves dicts, which map input names belonging to the op or aliased op to DependencyDefinitions.

  • input_mappings (Optional[Sequence[InputMapping]]) – Defines the inputs to the nested graph, and how they map to the inputs of its constituent ops.

  • output_mappings (Optional[Sequence[OutputMapping]]) – Defines the outputs of the nested graph, and how they map from the outputs of its constituent ops.

  • config (Optional[ConfigMapping]) – Defines the config of the graph, and how its schema maps to the config of its constituent ops.

  • tags (Optional[Dict[str, Any]]) – Arbitrary metadata for any execution of the graph. Values that are not strings will be json encoded and must meet the criteria that json.loads(json.dumps(value)) == value. These tag values may be overwritten by tag values provided at invocation time.

Examples

@op
def return_one():
    return 1

@op
def add_one(num):
    return num + 1

graph_def = GraphDefinition(
    name='basic',
    node_defs=[return_one, add_one],
    dependencies={'add_one': {'num': DependencyDefinition('return_one')}},
)

execute_in_process(run_config=None, instance=None, resources=None, raise_on_error=True, op_selection=None, run_id=None, input_values=None)[source]

Execute this graph in-process, collecting results in-memory.

Parameters:
  • run_config (Optional[Mapping[str, Any]]) – Run config to provide to execution. The configuration for the underlying graph should exist under the “ops” key.

  • instance (Optional[DagsterInstance]) – The instance to execute against, an ephemeral one will be used if none provided.

  • resources (Optional[Mapping[str, Any]]) – The resources needed if any are required. Can provide resource instances directly, or resource definitions.

  • raise_on_error (Optional[bool]) – Whether or not to raise exceptions when they occur. Defaults to True.

  • op_selection (Optional[List[str]]) –

    A list of op selection queries (including single op names) to execute. For example:

    • ['some_op']: selects some_op itself.

    • ['*some_op']: select some_op and all its ancestors (upstream dependencies).

    • ['*some_op+++']: select some_op, all its ancestors, and its descendants (downstream dependencies) within 3 levels down.

    • ['*some_op', 'other_op_a', 'other_op_b+']: select some_op and all its ancestors, other_op_a itself, and other_op_b and its direct child ops.

  • input_values (Optional[Mapping[str, Any]]) – A dictionary that maps python objects to the top-level inputs of the graph.

Returns:

ExecuteInProcessResult
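
Building on the graph_def example above, a minimal sketch:

result = graph_def.execute_in_process()
assert result.success
assert result.output_for_node("add_one") == 2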

Execution results

class dagster.ExecuteInProcessResult(event_list, dagster_run, output_capture, job_def)[source]

Result object returned by in-process testing APIs.

Users should not instantiate this object directly. Used for retrieving run success, events, and outputs from execution methods that return this object.

This object is returned by:

  • dagster.GraphDefinition.execute_in_process()

  • dagster.JobDefinition.execute_in_process()

  • dagster.materialize_to_memory()

  • dagster.materialize()

property all_events

All dagster events emitted during execution.

Type:

List[DagsterEvent]

output_for_node(node_str, output_name='result')[source]

Retrieves output value with a particular name from the in-process run of the job.

Parameters:
  • node_str (str) – Name of the op/graph whose output should be retrieved. If the intended graph/op is nested within another graph, the syntax is outer_graph.inner_node.

  • output_name (Optional[str]) – Name of the output on the op/graph to retrieve. Defaults to result, the default output name in dagster.

Returns:

The value of the retrieved output.

Return type:

Any

output_value(output_name='result')[source]

Retrieves output of top-level job, if an output is returned.

Parameters:

output_name (Optional[str]) – The name of the output to retrieve. Defaults to result, the default output name in dagster.

Returns:

The value of the retrieved output.

Return type:

Any

property run_id

The unique identifier of the executed run.
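
A sketch of inspecting a result (the op and job are illustrative); events can be filtered by event_type using DagsterEventType, documented below.

from dagster import DagsterEventType, job, op

@op
def emit():
    return "hello"

@job
def my_job():
    emit()

result = my_job.execute_in_process()
assert result.success
print(result.run_id)

# Filter the full event stream down to step-success events.
step_successes = [
    event for event in result.all_events
    if event.event_type == DagsterEventType.STEP_SUCCESS
]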

class dagster.ExecuteJobResult(job_def, reconstruct_context, event_list, dagster_run)[source]

Result object returned by dagster.execute_job().

Used for retrieving run success, events, and outputs from execute_job. Users should not directly instantiate this class.

Events and run information can be retrieved off of the object directly. In order to access outputs, the ExecuteJobResult object needs to be opened as a context manager, which will re-initialize the resources from execution.

output_for_node(node_str, output_name='result')[source]

Retrieves output value with a particular name from the run of the job.

In order to use this method, the ExecuteJobResult object must be opened as a context manager. If this method is used without opening the context manager, it will result in a DagsterInvariantViolationError.

Parameters:
  • node_str (str) – Name of the op/graph whose output should be retrieved. If the intended graph/op is nested within another graph, the syntax is outer_graph.inner_node.

  • output_name (Optional[str]) – Name of the output on the op/graph to retrieve. Defaults to result, the default output name in dagster.

Returns:

The value of the retrieved output.

Return type:

Any

output_value(output_name='result')[source]

Retrieves output of top-level job, if an output is returned.

In order to use this method, the ExecuteJobResult object must be opened as a context manager. If this method is used without opening the context manager, it will result in a DagsterInvariantViolationError. If the top-level job has no output, calling this method will also result in a DagsterInvariantViolationError.

Parameters:

output_name (Optional[str]) – The name of the output to retrieve. Defaults to result, the default output name in dagster.

Returns:

The value of the retrieved output.

Return type:

Any

property run_id

The unique identifier of the executed run.

class dagster.DagsterEvent(event_type_value, pipeline_name, step_handle=None, solid_handle=None, step_kind_value=None, logging_tags=None, event_specific_data=None, message=None, pid=None, step_key=None)[source]

Events yielded by solid and pipeline execution.

Users should not instantiate this class.

event_type_value

Value for a DagsterEventType.

Type:

str

pipeline_name
Type:

str

solid_handle
Type:

NodeHandle

step_kind_value

Value for a StepKind.

Type:

str

logging_tags
Type:

Dict[str, str]

event_specific_data

Type must correspond to event_type_value.

Type:

Any

message
Type:

str

pid
Type:

int

step_key

DEPRECATED

Type:

Optional[str]

property event_type

The type of this event.

Type:

DagsterEventType

class dagster.DagsterEventType(value)[source]

The types of events that may be yielded by solid and pipeline execution.

Reconstructable jobs

class dagster.reconstructable(target)[source]

Create a ReconstructablePipeline from a function that returns a PipelineDefinition/JobDefinition, or a function decorated with @pipeline/@job.

When your pipeline/job must cross process boundaries, e.g., for execution on multiple nodes or in different systems (like dagstermill), Dagster must know how to reconstruct the pipeline/job on the other side of the process boundary.

Passing a job created with dagster.GraphDefinition.to_job() to reconstructable() requires you to wrap that job’s definition in a module-scoped function and pass that function instead:

from dagster import graph, reconstructable

@graph
def my_graph():
    ...

def define_my_job():
    return my_graph.to_job()

reconstructable(define_my_job)

This function implements a very conservative strategy for reconstruction, so that its behavior is easy to predict, but as a consequence it is not able to reconstruct certain kinds of pipelines or jobs, such as those defined by lambdas, in nested scopes (e.g., dynamically within a method call), or in interactive environments such as the Python REPL or Jupyter notebooks.

If you need to reconstruct objects constructed in these ways, you should use build_reconstructable_job() instead, which allows you to specify your own reconstruction strategy.

Examples:

from dagster import job, reconstructable

@job
def foo_job():
    ...

reconstructable_foo_job = reconstructable(foo_job)


@graph
def foo():
    ...

def make_bar_job():
    return foo.to_job()

reconstructable_bar_job = reconstructable(make_bar_job)

Executors

dagster.multi_or_in_process_executor ExecutorDefinition[source]

The default executor for a job.

This is the executor available by default on a JobDefinition that does not provide a custom executor. It has a multiprocessing-enabled mode and a single-process mode; multiprocessing mode is enabled by default. Switching between multiprocess and in-process mode can be achieved via config:

execution:
  config:
    multiprocess:


execution:
  config:
    in_process:

When using the multiprocess mode, max_concurrent and retries can also be configured.

execution:
  config:
    multiprocess:
      max_concurrent: 4
      retries:
        enabled:

The max_concurrent arg is optional and tells the execution engine how many processes may run concurrently. By default, or if you set max_concurrent to 0, this is the return value of multiprocessing.cpu_count().

When using in_process mode, only retries can be configured.

Execution priority can be configured using the dagster/priority tag via solid/op metadata, where the higher the number, the higher the priority. 0 is the default, and both positive and negative numbers can be used.

dagster.in_process_executor ExecutorDefinition[source]

The in-process executor executes all steps in a single process.

For legacy pipelines, this will be the default executor. To select it explicitly, include the following top-level fragment in config:

execution:
  in_process:

Execution priority can be configured using the dagster/priority tag via solid/op metadata, where the higher the number, the higher the priority. 0 is the default, and both positive and negative numbers can be used.

dagster.multiprocess_executor ExecutorDefinition[source]

The multiprocess executor executes each step in an individual process.

Jobs that do not specify a custom executor run with multiprocessing enabled by default (via multi_or_in_process_executor, described above). For jobs or legacy pipelines, to configure the multiprocess executor explicitly, include a fragment such as the following in your run config:

execution:
  config:
    multiprocess:
      max_concurrent: 4

The max_concurrent arg is optional and tells the execution engine how many processes may run concurrently. By default, or if you set max_concurrent to 0, this is the return value of multiprocessing.cpu_count().

Execution priority can be configured using the dagster/priority tag via solid/op metadata, where the higher the number, the higher the priority. 0 is the default, and both positive and negative numbers can be used.
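
Executors can also be attached in code; a sketch, assuming a job that opts into the multiprocess executor via executor_def (op and job names are illustrative):

from dagster import job, multiprocess_executor, op

@op
def my_op():
    ...

# Every step of this job will run in its own process.
@job(executor_def=multiprocess_executor)
def parallel_job():
    my_op()

Note that JobDefinition.execute_in_process() will still swap in the in-process executor, as described above; out-of-process execution of such a job goes through execute_job().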

Contexts

class dagster.OpExecutionContext(step_execution_context)[source]

The context object that can be made available as the first argument to an op’s compute function.

The context object provides system information such as resources, config, and logging to an op’s compute function. Users should not instantiate this object directly. To construct an OpExecutionContext for testing purposes, use dagster.build_op_context().

Example:

from dagster import op

@op
def hello_world(context: OpExecutionContext):
    context.log.info("Hello, world!")

asset_partition_key_for_input(input_name)

Returns the asset partition key for the given input.

asset_partition_key_for_output(output_name='result')

Returns the asset partition key for the given output. Defaults to “result”, which is the name of the default output.
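
For example, a sketch of reading the partition key inside a partitioned asset, assuming a DailyPartitionsDefinition (the asset name is illustrative):

from dagster import DailyPartitionsDefinition, asset

@asset(partitions_def=DailyPartitionsDefinition(start_date="2023-01-01"))
def partitioned_asset(context):
    # For a run of partition "2023-01-02", this returns "2023-01-02".
    key = context.asset_partition_key_for_output()
    ...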

asset_partitions_def_for_input(input_name)

The PartitionsDefinition on the upstream asset corresponding to this input.

asset_partitions_def_for_output(output_name='result')

The PartitionsDefinition on the asset corresponding to this output.

asset_partitions_time_window_for_output(output_name='result')

The time window for the partitions of the output asset.

Raises an error if either of the following is true:

  • The output asset has no partitioning.

  • The output asset is not partitioned with a TimeWindowPartitionsDefinition.

get_mapping_key()

Which mapping_key this execution is for if downstream of a DynamicOutput, otherwise None.

get_tag(key)

Get a logging tag.

Parameters:

key (str) – The tag to get.

Returns:

The value of the tag, if present.

Return type:

Optional[str]

property has_partition_key

Whether the current run is a partitioned run.

has_tag(key)

Check if a logging tag is set.

Parameters:

key (str) – The tag to check.

Returns:

Whether the tag is set.

Return type:

bool

property instance

The current Dagster instance.

Type:

DagsterInstance

property job_def

The currently executing job.

Type:

JobDefinition

property job_name

The name of the currently executing job.

Type:

str

property log

The log manager available in the execution context.

Type:

DagsterLogManager

log_event(event)

Log an AssetMaterialization, AssetObservation, or ExpectationResult from within the body of an op.

Events logged with this method will appear in the list of DagsterEvents, as well as the event log.

Parameters:

event (Union[AssetMaterialization, Materialization, AssetObservation, ExpectationResult]) – The event to log.

Examples:

from dagster import op, AssetMaterialization

@op
def log_materialization(context):
    context.log_event(AssetMaterialization("foo"))

property op_config

The parsed config specific to this op.

property op_def

The current op definition.

Type:

OpDefinition

property partition_key

The partition key for the current run.

Raises an error if the current run is not a partitioned run.

property partition_time_window

The partition time window for the current run.

Raises an error if the current run is not a partitioned run, or if the job’s partition definition is not a TimeWindowPartitionsDefinition.

property pdb

Gives access to pdb debugging from within the op.

Example:

@op
def debug(context):
    context.pdb.set_trace()

Type:

dagster.utils.forked_pdb.ForkedPdb

property resources

The currently available resources.

Type:

Resources

property retry_number

Which retry attempt is currently executing, i.e., 0 for the initial attempt, 1 for the first retry, etc.

property run_config

The run config for the current execution.

Type:

dict

property run_id

The id of the current execution’s run.

Type:

str

dagster.build_op_context(resources=None, op_config=None, resources_config=None, instance=None, config=None, partition_key=None, mapping_key=None)[source]

Builds op execution context from provided parameters.

op is currently built on top of solid, and thus this function creates a SolidExecutionContext. build_op_context can be used as either a function or a context manager. If a provided resource is a context manager, then build_op_context must be used as a context manager. This function can be used to provide the context argument when directly invoking an op.

Parameters:
  • resources (Optional[Dict[str, Any]]) – The resources to provide to the context. These can be either values or resource definitions.

  • config (Optional[Any]) – The op config to provide to the context.

  • instance (Optional[DagsterInstance]) – The dagster instance configured for the context. Defaults to DagsterInstance.ephemeral().

  • mapping_key (Optional[str]) – A key representing the mapping key from an upstream dynamic output. Can be accessed using context.get_mapping_key().

  • partition_key (Optional[str]) – String value representing partition key to execute with.

Examples

context = build_op_context()
op_to_invoke(context)

with build_op_context(resources={"foo": context_manager_resource}) as context:
    op_to_invoke(context)

class dagster.TypeCheckContext(run_id, log_manager, scoped_resources_builder, dagster_type)[source]

The context object available to a type check function on a DagsterType.

log

Centralized log dispatch from user code.

Type:

DagsterLogManager

resources

An object whose attributes contain the resources available to this op.

Type:

Any

run_id

The id of this job run.

Type:

str
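
A sketch of a type check function that receives this context; the PositiveInt type and its check logic are illustrative:

from dagster import DagsterType, TypeCheckContext

def positive_int_check(context: TypeCheckContext, value) -> bool:
    # The context provides centralized logging during the check.
    context.log.info(f"Type-checking value: {value}")
    return isinstance(value, int) and value > 0

PositiveInt = DagsterType(name="PositiveInt", type_check_fn=positive_int_check)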

Job configuration

dagster.validate_run_config(job_def=None, run_config=None, mode=None, pipeline_def=None)[source]

Function to validate a provided run config blob against a given job. For legacy APIs, a pipeline/mode can also be passed in.

If validation is successful, this function will return a dictionary representation of the validated config actually used during execution.

Parameters:
  • job_def (Union[PipelineDefinition, JobDefinition]) – The job definition to validate run config against

  • run_config (Optional[Dict[str, Any]]) – The run config to validate

  • mode (str) – The mode of the pipeline to validate against (different modes may require different config)

  • pipeline_def (PipelineDefinition) – The pipeline definition to validate run config against.

Returns:

A dictionary representation of the validated config.

Return type:

Dict[str, Any]
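
A usage sketch (the op, job, and config schema are illustrative); invalid config raises a DagsterInvalidConfigError rather than returning.

from dagster import job, op, validate_run_config

@op(config_schema={"date": str})
def configurable_op(context):
    ...

@job
def configurable_job():
    configurable_op()

# Returns the fully validated config dictionary on success.
validated = validate_run_config(
    configurable_job,
    run_config={"ops": {"configurable_op": {"config": {"date": "2023-01-01"}}}},
)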

Run Config Schema

The run_config used for jobs has the following schema:

{
  # configuration for execution, required if executors require config
  execution: {
    # the name of one, and only one available executor, typically 'in_process' or 'multiprocess'
    __executor_name__: {
      # executor-specific config, if required or permitted
      config: {
        ...
      }
    }
  },

  # configuration for loggers, required if loggers require config
  loggers: {
    # the name of an available logger
    __logger_name__: {
      # logger-specific config, if required or permitted
      config: {
        ...
      }
    },
    ...
  },

  # configuration for resources, required if resources require config
  resources: {
    # the name of a resource
    __resource_name__: {
      # resource-specific config, if required or permitted
      config: {
        ...
      }
    },
    ...
  },

  # configuration for underlying ops, required if ops require config
  ops: {

    # these keys align with the names of the ops, or their alias in this job
    __op_name__: {

      # pass any data that was defined via config_field
      config: ...,

      # configurably specify input values, keyed by input name
      inputs: {
        __input_name__: {
          # if a dagster_type_loader is specified, that schema must be satisfied here;
          # scalar, built-in types will generally allow their values to be specified directly:
          value: ...
        }
      },

    }
  },

}
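
For concreteness, a sketch instantiating the schema above for a hypothetical job with one configured op; the op name, config values, input name, and executor name are illustrative:

run_config = {
    # select the multiprocess executor and cap concurrency
    "execution": {"multiprocess": {"config": {"max_concurrent": 4}}},
    "ops": {
        "my_op": {
            # data defined via the op's config_field
            "config": {"date": "2023-01-01"},
            # a scalar input specified directly by value
            "inputs": {"num": {"value": 5}},
        }
    },
}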