from functools import update_wrapper
from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
Dict,
FrozenSet,
List,
Mapping,
Optional,
Sequence,
Tuple,
Type,
Union,
cast,
)
import dagster._check as check
from dagster._annotations import public
from dagster._config import Field, Shape
from dagster._config.config_type import ConfigType
from dagster._config.validate import validate_config
from dagster._core.definitions.composition import MappedInputPlaceholder
from dagster._core.definitions.dependency import (
DependencyDefinition,
DynamicCollectDependencyDefinition,
IDependencyDefinition,
MultiDependencyDefinition,
Node,
NodeHandle,
NodeInvocation,
SolidOutputHandle,
)
from dagster._core.definitions.events import AssetKey
from dagster._core.definitions.node_definition import NodeDefinition
from dagster._core.definitions.policy import RetryPolicy
from dagster._core.definitions.utils import check_valid_name
from dagster._core.errors import (
DagsterInvalidConfigError,
DagsterInvalidDefinitionError,
DagsterInvalidInvocationError,
DagsterInvalidSubsetError,
)
from dagster._core.selector.subset_selector import (
AssetSelectionData,
LeafNodeSelection,
OpSelectionData,
parse_op_selection,
)
from dagster._core.storage.io_manager import io_manager
from dagster._core.utils import str_format_set
from dagster._utils import merge_dicts
from .asset_layer import AssetLayer, build_asset_selection_job
from .config import ConfigMapping
from .dependency import DependencyDefinition
from .executor_definition import ExecutorDefinition, multi_or_in_process_executor
from .graph_definition import GraphDefinition, SubselectedGraphDefinition
from .hook_definition import HookDefinition
from .logger_definition import LoggerDefinition
from .metadata import MetadataEntry, PartitionMetadataEntry, RawMetadataValue
from .mode import ModeDefinition
from .partition import PartitionSetDefinition, PartitionedConfig, PartitionsDefinition
from .pipeline_definition import PipelineDefinition
from .preset import PresetDefinition
from .resource_definition import ResourceDefinition
from .run_request import RunRequest
from .utils import DEFAULT_IO_MANAGER_KEY
from .version_strategy import VersionStrategy
if TYPE_CHECKING:
from dagster._core.execution.execute_in_process_result import ExecuteInProcessResult
from dagster._core.execution.resources_init import InitResourceContext
from dagster._core.instance import DagsterInstance
from dagster._core.snap import PipelineSnapshot
class JobDefinition(PipelineDefinition):
    """A graph bound to the resources, executor, loggers, config and hooks needed to run it.

    Exists for backcompat as a single-mode ``PipelineDefinition``: resources, loggers, executor
    and config all live on one ``ModeDefinition``, which the public properties below read back.
    """

    _cached_partition_set: Optional["PartitionSetDefinition"]
    _subset_selection_data: Optional[Union[OpSelectionData, AssetSelectionData]]
    input_values: Mapping[str, object]

    def __init__(
        self,
        *,
        graph_def: GraphDefinition,
        resource_defs: Optional[Mapping[str, ResourceDefinition]] = None,
        executor_def: Optional[ExecutorDefinition] = None,
        logger_defs: Optional[Mapping[str, LoggerDefinition]] = None,
        name: Optional[str] = None,
        config: Optional[Union[ConfigMapping, Mapping[str, object], PartitionedConfig]] = None,
        description: Optional[str] = None,
        partitions_def: Optional[PartitionsDefinition] = None,
        tags: Optional[Mapping[str, Any]] = None,
        metadata: Optional[Mapping[str, RawMetadataValue]] = None,
        hook_defs: Optional[AbstractSet[HookDefinition]] = None,
        op_retry_policy: Optional[RetryPolicy] = None,
        version_strategy: Optional[VersionStrategy] = None,
        _subset_selection_data: Optional[Union[OpSelectionData, AssetSelectionData]] = None,
        asset_layer: Optional[AssetLayer] = None,
        input_values: Optional[Mapping[str, object]] = None,
        _metadata_entries: Optional[Sequence[Union[MetadataEntry, PartitionMetadataEntry]]] = None,
        _executor_def_specified: Optional[bool] = None,
        _logger_defs_specified: Optional[bool] = None,
        _preset_defs: Optional[Sequence[PresetDefinition]] = None,
    ):
        """
        Args:
            graph_def: The graph this job executes.
            resource_defs: Resources for the job. If nothing is registered under
                ``DEFAULT_IO_MANAGER_KEY``, ``default_job_io_manager`` is added under that key.
            executor_def: Executor for the job; defaults to ``multi_or_in_process_executor``.
            logger_defs: Loggers for the job; ``default_loggers()`` is used when this is
                ``None`` or empty.
            name: Job name; defaults to ``graph_def.name`` and must be a valid name.
            config: A ``ConfigMapping``, a ``PartitionedConfig``, or a mapping of run config.
                A mapping becomes both a ``"default"`` preset and the default values of the
                job's run config schema.
            description: Human-readable description.
            partitions_def: When given, ``config`` may only be a mapping or ``None``; that
                mapping (or ``{}``) is used as the run config for every partition.
            tags: Tags attached to the job.
            metadata: Raw metadata attached to the job.
            hook_defs: Hooks applied to every op in the job.
            op_retry_policy: Default retry policy for the job's ops.
            version_strategy: Version strategy for the job.
            asset_layer: Asset information for the job's graph.
            input_values: Values for top-level graph inputs; every key must name an input of
                ``graph_def``.
            _subset_selection_data, _metadata_entries, _executor_def_specified,
            _logger_defs_specified, _preset_defs: Private; used when copying/subsetting a job.

        Raises:
            DagsterInvalidDefinitionError: If ``input_values`` names an input ``graph_def``
                does not have.
        """
        from dagster._loggers import default_loggers

        check.inst_param(graph_def, "graph_def", GraphDefinition)
        resource_defs = check.opt_mapping_param(
            resource_defs, "resource_defs", key_type=str, value_type=ResourceDefinition
        )

        # We need to check whether an actual executor/logger def was passed in
        # before we set a default executor/logger defs. This is so we can
        # determine if someone passed in the default executor vs the system set
        # it directly. Once JobDefinition no longer subclasses
        # PipelineDefinition, we can change the default executor to be set
        # elsewhere to avoid the need for this check.
        self._executor_def_specified = (
            _executor_def_specified
            if _executor_def_specified is not None
            else executor_def is not None
        )
        self._logger_defs_specified = (
            _logger_defs_specified
            if _logger_defs_specified is not None
            else logger_defs is not None
        )
        executor_def = check.opt_inst_param(
            executor_def, "executor_def", ExecutorDefinition, default=multi_or_in_process_executor
        )
        check.opt_mapping_param(
            logger_defs,
            "logger_defs",
            key_type=str,
            value_type=LoggerDefinition,
        )
        logger_defs = logger_defs or default_loggers()
        name = check_valid_name(check.opt_str_param(name, "name", default=graph_def.name))

        config = check.opt_inst_param(config, "config", (Mapping, ConfigMapping, PartitionedConfig))
        description = check.opt_str_param(description, "description")
        partitions_def = check.opt_inst_param(
            partitions_def, "partitions_def", PartitionsDefinition
        )
        tags = check.opt_mapping_param(tags, "tags", key_type=str)
        metadata = check.opt_mapping_param(metadata, "metadata", key_type=str)
        hook_defs = check.opt_set_param(hook_defs, "hook_defs")
        op_retry_policy = check.opt_inst_param(op_retry_policy, "op_retry_policy", RetryPolicy)
        version_strategy = check.opt_inst_param(
            version_strategy, "version_strategy", VersionStrategy
        )
        _subset_selection_data = check.opt_inst_param(
            _subset_selection_data, "_subset_selection_data", (OpSelectionData, AssetSelectionData)
        )
        asset_layer = check.opt_inst_param(asset_layer, "asset_layer", AssetLayer)
        input_values = check.opt_mapping_param(input_values, "input_values", key_type=str)
        _metadata_entries = check.opt_sequence_param(_metadata_entries, "_metadata_entries")
        _preset_defs = check.opt_sequence_param(
            _preset_defs, "preset_defs", of_type=PresetDefinition
        )

        # Make sure an IO manager is always registered under the default key.
        if resource_defs and DEFAULT_IO_MANAGER_KEY in resource_defs:
            resource_defs_with_defaults = resource_defs
        else:
            resource_defs_with_defaults = merge_dicts(
                {DEFAULT_IO_MANAGER_KEY: default_job_io_manager}, resource_defs or {}
            )

        presets = []
        config_mapping = None
        partitioned_config = None

        if partitions_def:
            if isinstance(config, (ConfigMapping, PartitionedConfig)):
                check.failed(
                    "Can't supply a ConfigMapping or PartitionedConfig for 'config' when 'partitions_def' is supplied."
                )

            hardcoded_config = config if config else {}
            partitioned_config = PartitionedConfig(partitions_def, lambda _: hardcoded_config)

        # NOTE: this is deliberately a fresh `if` chain rather than a continuation of the
        # partitions branch, so a mapping config alongside partitions_def also produces the
        # default preset and config mapping below.
        if isinstance(config, ConfigMapping):
            config_mapping = config
        elif isinstance(config, PartitionedConfig):
            partitioned_config = config
        elif isinstance(config, Mapping):
            # The parameter check above admits any Mapping; normalise to a plain dict for the
            # preset and default-value helpers instead of rejecting non-dict mappings.
            config_dict = dict(config)
            check.invariant(
                len(_preset_defs) == 0,
                "Bad state: attempted to pass preset definitions to job alongside config dictionary.",
            )
            presets = [PresetDefinition(name="default", run_config=config_dict)]
            # Using config mapping here is a trick to make it so that the preset will be used even
            # when no config is supplied for the job.
            config_mapping = _config_mapping_with_default_value(
                get_run_config_schema_for_job(
                    graph_def, resource_defs_with_defaults, executor_def, logger_defs, asset_layer
                ),
                config_dict,
                name,
            )
        elif config is not None:
            check.failed(
                f"config param must be a ConfigMapping, a PartitionedConfig, or a dictionary, but "
                f"is an object of type {type(config)}"
            )

        # Exists for backcompat - JobDefinition is implemented as a single-mode pipeline.
        mode_def = ModeDefinition(
            resource_defs=resource_defs_with_defaults,
            logger_defs=logger_defs,
            executor_defs=[executor_def] if executor_def else None,
            _config_mapping=config_mapping,
            _partitioned_config=partitioned_config,
        )

        self._cached_partition_set: Optional["PartitionSetDefinition"] = None
        self._subset_selection_data = _subset_selection_data
        self.input_values = input_values
        for input_name in sorted(self.input_values):
            if not graph_def.has_input(input_name):
                raise DagsterInvalidDefinitionError(
                    f"Error when constructing JobDefinition '{name}': Input value provided for key '{input_name}', but job has no top-level input with that name."
                )

        super(JobDefinition, self).__init__(
            name=name,
            description=description,
            mode_defs=[mode_def],
            preset_defs=presets or _preset_defs,
            tags=tags,
            metadata=metadata,
            metadata_entries=_metadata_entries,
            hook_defs=hook_defs,
            solid_retry_policy=op_retry_policy,
            graph_def=graph_def,
            version_strategy=version_strategy,
            asset_layer=asset_layer,
        )

    @property
    def target_type(self) -> str:
        return "job"

    @property
    def is_job(self) -> bool:
        return True

    def describe_target(self):
        return f"{self.target_type} '{self.name}'"

    @public  # type: ignore
    @property
    def executor_def(self) -> ExecutorDefinition:
        return self.get_mode_definition().executor_defs[0]

    @public  # type: ignore
    @property
    def resource_defs(self) -> Mapping[str, ResourceDefinition]:
        return self.get_mode_definition().resource_defs

    @public  # type: ignore
    @property
    def partitioned_config(self) -> Optional[PartitionedConfig]:
        return self.get_mode_definition().partitioned_config

    @public  # type: ignore
    @property
    def config_mapping(self) -> Optional[ConfigMapping]:
        return self.get_mode_definition().config_mapping

    @public  # type: ignore
    @property
    def loggers(self) -> Mapping[str, LoggerDefinition]:
        return self.get_mode_definition().loggers

    @public
    def execute_in_process(
        self,
        run_config: Optional[Mapping[str, Any]] = None,
        instance: Optional["DagsterInstance"] = None,
        partition_key: Optional[str] = None,
        raise_on_error: bool = True,
        op_selection: Optional[Sequence[str]] = None,
        asset_selection: Optional[Sequence[AssetKey]] = None,
        run_id: Optional[str] = None,
        input_values: Optional[Mapping[str, object]] = None,
    ) -> "ExecuteInProcessResult":
        """
        Execute the Job in-process, gathering results in-memory.

        The `executor_def` on the Job will be ignored, and replaced with the in-process executor.
        If using the default `io_manager` (and the job has no version strategy), it will switch
        from filesystem to in-memory.

        Args:
            run_config (Optional[Mapping[str, Any]]):
                The configuration for the run
            instance (Optional[DagsterInstance]):
                The instance to execute against, an ephemeral one will be used if none provided.
            partition_key: (Optional[str])
                The string partition key that specifies the run config to execute. Can only be used
                to select run config for jobs with partitioned config, and cannot be combined with
                ``run_config``.
            raise_on_error (Optional[bool]): Whether or not to raise exceptions when they occur.
                Defaults to ``True``.
            op_selection (Optional[Sequence[str]]): A list of op selection queries (including single op
                names) to execute. For example:

                * ``['some_op']``: selects ``some_op`` itself.
                * ``['*some_op']``: select ``some_op`` and all its ancestors (upstream dependencies).
                * ``['*some_op+++']``: select ``some_op``, all its ancestors, and its descendants
                  (downstream dependencies) within 3 levels down.
                * ``['*some_op', 'other_op_a', 'other_op_b+']``: select ``some_op`` and all its
                  ancestors, ``other_op_a`` itself, and ``other_op_b`` and its direct child ops.
            asset_selection (Optional[Sequence[AssetKey]]): Asset keys to execute. Cannot be
                combined with ``op_selection``.
            run_id (Optional[str]): Run id passed through to the in-process execution.
            input_values (Optional[Mapping[str, Any]]):
                A dictionary that maps python objects to the top-level inputs of the job. Input values provided here will override input values that have been provided to the job directly.

        Returns:
            :py:class:`~dagster.ExecuteInProcessResult`
        """
        from dagster._core.definitions.executor_definition import execute_in_process_executor
        from dagster._core.execution.execute_in_process import core_execute_in_process

        run_config = check.opt_mapping_param(run_config, "run_config")
        op_selection = check.opt_sequence_param(op_selection, "op_selection", str)
        asset_selection = check.opt_sequence_param(asset_selection, "asset_selection", AssetKey)

        check.invariant(
            not (op_selection and asset_selection),
            "op_selection and asset_selection cannot both be provided as args to execute_in_process",
        )

        partition_key = check.opt_str_param(partition_key, "partition_key")
        input_values = check.opt_mapping_param(input_values, "input_values")

        # Combine provided input values at execute_in_process with input values
        # provided to the definition. Input values provided at
        # execute_in_process will override those provided on the definition.
        input_values = merge_dicts(self.input_values, input_values)

        resource_defs = dict(self.resource_defs)
        logger_defs = dict(self.loggers)
        ephemeral_job = JobDefinition(
            name=self._name,
            graph_def=self._graph_def,
            resource_defs=_swap_default_io_man(resource_defs, self),
            executor_def=execute_in_process_executor,
            logger_defs=logger_defs,
            hook_defs=self.hook_defs,
            config=self.config_mapping or self.partitioned_config,
            tags=self.tags,
            op_retry_policy=self._solid_retry_policy,
            version_strategy=self.version_strategy,
            asset_layer=self.asset_layer,
            input_values=input_values,
            _executor_def_specified=self._executor_def_specified,
            _logger_defs_specified=self._logger_defs_specified,
            _preset_defs=self._preset_defs,
        )

        ephemeral_job = ephemeral_job.get_job_def_for_subset_selection(
            op_selection, frozenset(asset_selection) if asset_selection else None
        )

        tags = None
        if partition_key:
            if not self.partitioned_config:
                check.failed(
                    f"Provided partition key `{partition_key}` for job `{self._name}` without a partitioned config"
                )
            check.invariant(
                not run_config,
                "Cannot provide both run_config and partition_key arguments to `execute_in_process`",
            )
            partition_set = self.get_partition_set_def()
            if not partition_set:
                check.failed(
                    f"Provided partition key `{partition_key}` for job `{self._name}` without a partitioned config"
                )

            partition = partition_set.get_partition(partition_key)
            run_config = partition_set.run_config_for_partition(partition)
            tags = partition_set.tags_for_partition(partition)

        return core_execute_in_process(
            ephemeral_pipeline=ephemeral_job,
            run_config=run_config,
            instance=instance,
            output_capturing_enabled=True,
            raise_on_error=raise_on_error,
            run_tags=tags,
            run_id=run_id,
            asset_selection=frozenset(asset_selection),
        )

    @property
    def op_selection_data(self) -> Optional[OpSelectionData]:
        return (
            self._subset_selection_data
            if isinstance(self._subset_selection_data, OpSelectionData)
            else None
        )

    @property
    def asset_selection_data(self) -> Optional[AssetSelectionData]:
        return (
            self._subset_selection_data
            if isinstance(self._subset_selection_data, AssetSelectionData)
            else None
        )

    @property
    def is_subset_pipeline(self) -> bool:
        return bool(self._subset_selection_data)

    def get_job_def_for_subset_selection(
        self,
        op_selection: Optional[Sequence[str]] = None,
        asset_selection: Optional[FrozenSet[AssetKey]] = None,
    ):
        """Return the subset of this job selected by ops or assets, or ``self`` if neither given."""
        check.invariant(
            not (op_selection and asset_selection),
            "op_selection and asset_selection cannot both be provided as args to execute_in_process",
        )
        if op_selection:
            return self._get_job_def_for_op_selection(op_selection)
        if asset_selection:
            return self._get_job_def_for_asset_selection(asset_selection)
        return self

    def _get_job_def_for_asset_selection(
        self,
        asset_selection: Optional[FrozenSet[AssetKey]] = None,
    ) -> "JobDefinition":
        """Build a job that materializes only ``asset_selection`` from this job's assets.

        Raises:
            DagsterInvalidSubsetError: If any selected key is not an asset of this job.
        """
        asset_selection = check.opt_set_param(asset_selection, "asset_selection", AssetKey)

        # Validate the whole selection in a single pass (the check does not depend on which
        # selected asset is being looked at, so there is no need to repeat it per asset).
        known_asset_keys = self.asset_layer.asset_keys
        nonexistent_assets = [asset for asset in asset_selection if asset not in known_asset_keys]
        if nonexistent_assets:
            nonexistent_asset_strings = [
                asset_str
                for asset_str in (asset.to_string() for asset in nonexistent_assets)
                if asset_str
            ]
            raise DagsterInvalidSubsetError(
                "Assets provided in asset_selection argument "
                f"{', '.join(nonexistent_asset_strings)} do not exist in parent asset group or job."
            )

        asset_selection_data = AssetSelectionData(
            asset_selection=asset_selection,
            parent_job_def=self,
        )

        check.invariant(
            self.asset_layer.assets_defs_by_key is not None,
            "Asset layer must have _asset_defs argument defined",
        )

        new_job = build_asset_selection_job(
            name=self.name,
            assets=set(self.asset_layer.assets_defs_by_key.values()),
            source_assets=self.asset_layer.source_assets_by_key.values(),
            executor_def=self.executor_def,
            resource_defs=self.resource_defs,
            description=self.description,
            tags=self.tags,
            asset_selection=asset_selection,
            asset_selection_data=asset_selection_data,
            config=self.config_mapping,
        )
        return new_job

    def _get_job_def_for_op_selection(
        self,
        op_selection: Optional[Sequence[str]] = None,
    ) -> "JobDefinition":
        """Build a job whose graph contains only the ops matched by ``op_selection``.

        Returns ``self`` when ``op_selection`` is empty.

        Raises:
            DagsterInvalidSubsetError: If the selected subset does not form a valid job.
        """
        if not op_selection:
            return self

        # The signature (and execute_in_process) accept any sequence, e.g. a tuple; a list is
        # still what gets stored on the OpSelectionData below.
        op_selection = list(check.opt_sequence_param(op_selection, "op_selection", str))

        resolved_op_selection_dict = parse_op_selection(self, op_selection)

        try:
            sub_graph = get_subselected_graph_definition(self.graph, resolved_op_selection_dict)

            # Keep directly-provided input values whose top-level input survives the subset;
            # values for inputs that were dropped would be rejected by the constructor.
            sub_input_values = {
                input_name: value
                for input_name, value in self.input_values.items()
                if sub_graph.has_input(input_name)
            }

            return JobDefinition(
                name=self.name,
                description=self.description,
                resource_defs=dict(self.resource_defs),
                logger_defs=dict(self.loggers),
                executor_def=self.executor_def,
                config=self.config_mapping or self.partitioned_config,
                tags=self.tags,
                _metadata_entries=self.metadata,
                hook_defs=self.hook_defs,
                op_retry_policy=self._solid_retry_policy,
                graph_def=sub_graph,
                version_strategy=self.version_strategy,
                input_values=sub_input_values,
                _executor_def_specified=self._executor_def_specified,
                _logger_defs_specified=self._logger_defs_specified,
                _subset_selection_data=OpSelectionData(
                    op_selection=op_selection,
                    resolved_op_selection=set(
                        resolved_op_selection_dict.keys()
                    ),  # equivalent to solids_to_execute. currently only gets top level nodes.
                    parent_job_def=self,  # used by pipeline snapshot lineage
                ),
                # TODO: subset this structure.
                # https://github.com/dagster-io/dagster/issues/7541
                asset_layer=self.asset_layer,
                _preset_defs=self._preset_defs,
            )
        except DagsterInvalidDefinitionError as exc:
            # This handles the case when you construct a subset such that an unsatisfied
            # input cannot be loaded from config. Instead of throwing a DagsterInvalidDefinitionError,
            # we re-raise a DagsterInvalidSubsetError.
            raise DagsterInvalidSubsetError(
                f"The attempted subset {str_format_set(resolved_op_selection_dict)} for graph "
                f"{self.graph.name} results in an invalid graph."
            ) from exc

    def get_partition_set_def(self) -> Optional["PartitionSetDefinition"]:
        """Return a partition set derived from the job's partitioned config.

        Returns ``None`` for a job without partitioned config. The result is built once and
        cached on the instance.
        """
        mode = self.get_mode_definition()
        if not mode.partitioned_config:
            return None

        if not self._cached_partition_set:
            tags_fn = mode.partitioned_config.tags_for_partition_fn
            if not tags_fn:
                tags_fn = lambda _: {}
            self._cached_partition_set = PartitionSetDefinition(
                job_name=self.name,
                name=f"{self.name}_partition_set",
                partitions_def=mode.partitioned_config.partitions_def,
                run_config_fn_for_partition=mode.partitioned_config.run_config_for_partition_fn,
                tags_fn_for_partition=tags_fn,
                mode=mode.name,
            )

        return self._cached_partition_set

    @public  # type: ignore
    @property
    def partitions_def(self) -> Optional[PartitionsDefinition]:
        mode = self.get_mode_definition()
        if not mode.partitioned_config:
            return None

        return mode.partitioned_config.partitions_def

    @public
    def run_request_for_partition(
        self,
        partition_key: str,
        run_key: Optional[str],
        tags: Optional[Mapping[str, str]] = None,
    ) -> RunRequest:
        """Build a ``RunRequest`` for one partition of this job.

        The run config comes from the partition; ``tags`` are merged with the partition's tags,
        and partition tags win on key conflicts.
        """
        partition_set = self.get_partition_set_def()
        if not partition_set:
            check.failed("Called run_request_for_partition on a non-partitioned job")

        partition = partition_set.get_partition(partition_key)
        run_config = partition_set.run_config_for_partition(partition)
        partition_tags = partition_set.tags_for_partition(partition)
        run_request_tags = {**tags, **partition_tags} if tags else partition_tags

        return RunRequest(
            run_key=run_key, run_config=run_config, tags=run_request_tags, job_name=self.name
        )

    @public
    def with_hooks(self, hook_defs: AbstractSet[HookDefinition]) -> "JobDefinition":
        """Apply a set of hooks to all op instances within the job.

        Returns a new job; this one is unchanged. The new hooks are added to the existing ones,
        and the rest of the job's settings (including input values, version strategy and
        metadata) carry over.
        """
        hook_defs = check.set_param(hook_defs, "hook_defs", of_type=HookDefinition)

        job_def = JobDefinition(
            name=self.name,
            graph_def=self._graph_def,
            resource_defs=dict(self.resource_defs),
            logger_defs=dict(self.loggers),
            executor_def=self.executor_def,
            # NOTE(review): the other copy helpers pass `self.config_mapping or
            # self.partitioned_config`; when a job has both (partitions_def plus a config
            # mapping) the two orders keep different halves — confirm which is intended.
            config=self.partitioned_config or self.config_mapping,
            tags=self.tags,
            _metadata_entries=self.metadata,
            hook_defs=hook_defs | self.hook_defs,
            description=self._description,
            op_retry_policy=self._solid_retry_policy,
            version_strategy=self.version_strategy,
            asset_layer=self.asset_layer,
            input_values=self.input_values,
            _subset_selection_data=self._subset_selection_data,
            _executor_def_specified=self._executor_def_specified,
            _logger_defs_specified=self._logger_defs_specified,
            _preset_defs=self._preset_defs,
        )

        update_wrapper(job_def, self, updated=())

        return job_def

    def get_parent_pipeline_snapshot(self) -> Optional["PipelineSnapshot"]:
        """Return the snapshot of the job this one was subset from, or ``None`` if not a subset."""
        if self.op_selection_data:
            return self.op_selection_data.parent_job_def.get_pipeline_snapshot()
        elif self.asset_selection_data:
            return self.asset_selection_data.parent_job_def.get_pipeline_snapshot()
        else:
            return None

    def has_direct_input_value(self, input_name: str) -> bool:
        return input_name in self.input_values

    def get_direct_input_value(self, input_name: str) -> object:
        """Return the value provided directly on the job for ``input_name``.

        Raises:
            DagsterInvalidInvocationError: If no value was provided for that input.
        """
        if input_name not in self.input_values:
            raise DagsterInvalidInvocationError(
                f"On job '{self.name}', attempted to retrieve input value for input named '{input_name}', but no value was provided. Provided input values: {sorted(list(self.input_values.keys()))}"
            )
        return self.input_values[input_name]

    def with_executor_def(self, executor_def: ExecutorDefinition) -> "JobDefinition":
        """Return a copy of this job that uses ``executor_def``.

        The copy records its executor as not user-specified (``_executor_def_specified=False``).
        NOTE(review): presumably so a later default-executor override can still replace it.
        """
        return JobDefinition(
            graph_def=self.graph,
            resource_defs=dict(self.resource_defs),
            executor_def=executor_def,
            logger_defs=dict(self.loggers),
            config=self.config_mapping or self.partitioned_config,
            name=self.name,
            description=self.description,
            tags=self.tags,
            _metadata_entries=self.metadata,
            hook_defs=self.hook_defs,
            op_retry_policy=self._solid_retry_policy,
            version_strategy=self.version_strategy,
            _subset_selection_data=self._subset_selection_data,
            asset_layer=self.asset_layer,
            input_values=self.input_values,
            _executor_def_specified=False,
            _logger_defs_specified=self._logger_defs_specified,
            _preset_defs=self._preset_defs,
        )

    def with_logger_defs(self, logger_defs: Mapping[str, LoggerDefinition]) -> "JobDefinition":
        """Return a copy of this job that uses ``logger_defs``.

        The copy records its loggers as not user-specified (``_logger_defs_specified=False``).
        """
        return JobDefinition(
            graph_def=self.graph,
            resource_defs=dict(self.resource_defs),
            executor_def=self.executor_def,
            logger_defs=logger_defs,
            config=self.config_mapping or self.partitioned_config,
            name=self.name,
            description=self.description,
            tags=self.tags,
            _metadata_entries=self.metadata,
            hook_defs=self.hook_defs,
            op_retry_policy=self._solid_retry_policy,
            version_strategy=self.version_strategy,
            _subset_selection_data=self._subset_selection_data,
            asset_layer=self.asset_layer,
            input_values=self.input_values,
            _executor_def_specified=self._executor_def_specified,
            _logger_defs_specified=False,
            _preset_defs=self._preset_defs,
        )
def _swap_default_io_man(resources: Mapping[str, ResourceDefinition], job: PipelineDefinition):
    """
    Used to create the user facing experience of the default io_manager
    switching to in-memory when using execute_in_process.

    The swap happens only when the resource registered under ``DEFAULT_IO_MANAGER_KEY`` is
    ``default_job_io_manager`` and ``job`` has no version strategy. Otherwise ``resources``
    itself is returned. The input mapping is never mutated; a new dict is built for the swap.
    """
    from dagster._core.storage.mem_io_manager import mem_io_manager

    current_io_manager = resources.get(DEFAULT_IO_MANAGER_KEY)
    # pylint: disable=comparison-with-callable
    if current_io_manager not in (default_job_io_manager,):
        return resources
    if job.version_strategy is not None:
        return resources

    return {**resources, DEFAULT_IO_MANAGER_KEY: mem_io_manager}
def _dep_key_of(node: Node) -> NodeInvocation:
    """Describe ``node`` as the ``NodeInvocation`` key used in a dependency dictionary.

    The invocation refers to the node's definition by name, aliases it to the node's own name,
    and carries over the node's tags, hook definitions and retry policy.
    """
    definition_name = node.definition.name
    return NodeInvocation(
        alias=node.name,
        name=definition_name,
        retry_policy=node.retry_policy,
        hook_defs=node.hook_defs,
        tags=node.tags,
    )
def get_subselected_graph_definition(
    graph: GraphDefinition,
    resolved_op_selection_dict: Mapping,
    parent_handle: Optional[NodeHandle] = None,
) -> SubselectedGraphDefinition:
    """Build a ``SubselectedGraphDefinition`` containing only the selected nodes of ``graph``.

    Args:
        graph: The graph to subset.
        resolved_op_selection_dict: Maps each selected node name in ``graph`` to either
            ``LeafNodeSelection`` (keep the node as a whole) or, for a nested graph, a mapping
            of the same shape describing which of its inner nodes to keep.
        parent_handle: Handle of the node that owns ``graph`` when recursing into a nested
            graph; ``None`` at the top level.

    Direct and dynamic-collect dependencies are kept only when the upstream node is also
    selected. Fan-in dependencies keep only their selected upstream outputs. Input and
    output mappings are kept only when they point at a selected node.
    """
    deps: Dict[
        Union[str, NodeInvocation],
        Dict[str, IDependencyDefinition],
    ] = {}
    selected_nodes: List[Tuple[str, NodeDefinition]] = []

    for node in graph.solids_in_topological_order:
        # skip if the node isn't selected
        if node.name not in resolved_op_selection_dict:
            continue

        node_handle = NodeHandle(node.name, parent=parent_handle)

        # rebuild graph if any nodes inside the graph are selected
        definition: Union[SubselectedGraphDefinition, NodeDefinition]
        if node.is_graph and resolved_op_selection_dict[node.name] is not LeafNodeSelection:
            definition = get_subselected_graph_definition(
                cast(GraphDefinition, node.definition),  # guaranteed by node.is_graph
                resolved_op_selection_dict[node.name],
                parent_handle=node_handle,
            )
        # use definition if the node as a whole is selected. this includes selecting the entire graph
        else:
            definition = node.definition
        selected_nodes.append((node.name, definition))

        # build dependencies for the node. we do it for both cases because nested graphs can have
        # inputs and outputs too. The per-node dict is created once and filled in place.
        node_deps: Dict[str, IDependencyDefinition] = {}
        deps[_dep_key_of(node)] = node_deps
        for input_handle in node.input_handles():
            input_name = input_handle.input_def.name
            if graph.dependency_structure.has_direct_dep(input_handle):
                output_handle = graph.dependency_structure.get_direct_dep(input_handle)
                if output_handle.solid.name in resolved_op_selection_dict:
                    node_deps[input_name] = DependencyDefinition(
                        solid=output_handle.solid.name, output=output_handle.output_def.name
                    )
            elif graph.dependency_structure.has_dynamic_fan_in_dep(input_handle):
                output_handle = graph.dependency_structure.get_dynamic_fan_in_dep(input_handle)
                if output_handle.solid.name in resolved_op_selection_dict:
                    node_deps[input_name] = DynamicCollectDependencyDefinition(
                        solid_name=output_handle.solid.name,
                        output_name=output_handle.output_def.name,
                    )
            elif graph.dependency_structure.has_fan_in_deps(input_handle):
                output_handles = graph.dependency_structure.get_fan_in_deps(input_handle)
                multi_dependencies = [
                    DependencyDefinition(
                        solid=output_handle.solid.name, output=output_handle.output_def.name
                    )
                    for output_handle in output_handles
                    if (
                        isinstance(output_handle, SolidOutputHandle)
                        and output_handle.solid.name in resolved_op_selection_dict
                    )
                ]
                # NOTE(review): this is still emitted when no upstream output is selected
                # (empty list), unlike the direct-dep case which leaves the input unconnected.
                node_deps[input_name] = MultiDependencyDefinition(
                    cast(
                        List[Union[DependencyDefinition, Type[MappedInputPlaceholder]]],
                        multi_dependencies,
                    )
                )
            # else input is unconnected

    # filter out unselected input/output mapping; build the name set once rather than a fresh
    # list for every mapping checked.
    selected_names = {name for name, _ in selected_nodes}
    new_input_mappings = [
        input_mapping
        for input_mapping in graph._input_mappings  # pylint: disable=protected-access
        if input_mapping.maps_to.solid_name in selected_names
    ]
    new_output_mappings = [
        output_mapping
        for output_mapping in graph._output_mappings  # pylint: disable=protected-access
        if output_mapping.maps_from.solid_name in selected_names
    ]

    return SubselectedGraphDefinition(
        parent_graph_def=graph,
        dependencies=deps,
        node_defs=[definition for _, definition in selected_nodes],
        input_mappings=new_input_mappings,
        output_mappings=new_output_mappings,
    )
def get_direct_input_values_from_job(target: PipelineDefinition) -> Mapping[str, Any]:
    """Return the input values provided directly on ``target`` when it is a job.

    Non-job pipelines have no directly-provided input values, so ``{}`` is returned for them.
    """
    if not target.is_job:
        return {}
    return cast(JobDefinition, target).input_values
@io_manager(
    description="Built-in filesystem IO manager that stores and retrieves values using pickling."
)
def default_job_io_manager(init_context: "InitResourceContext"):
    """Create the job's default IO manager, which pickles values to the filesystem.

    Values are stored under the storage directory of the instance on ``init_context``. The
    instance must be present; ``check.not_none`` fails otherwise.
    """
    from dagster._core.storage.fs_io_manager import PickledObjectFilesystemIOManager

    storage_dir = check.not_none(init_context.instance).storage_directory()
    return PickledObjectFilesystemIOManager(base_dir=storage_dir)
def _config_mapping_with_default_value(
    inner_schema: ConfigType,
    default_config: Dict[str, Any],
    job_name: str,
) -> ConfigMapping:
    """Wrap ``inner_schema`` in a pass-through ConfigMapping whose fields default to ``default_config``.

    Each top-level field of the (Shape) schema takes its default from ``default_config``.
    The field's own name is looked up first, then its alias if it has one. Fields without a
    supplied value are kept as-is. ``default_config`` is then validated against the new
    schema.

    Raises:
        DagsterInvalidConfigError: If ``default_config`` does not validate against the schema.
    """
    if not isinstance(inner_schema, Shape):
        check.failed("Only Shape (dictionary) config_schema allowed on Job ConfigMapping")

    def _passthrough(x):
        return x

    aliases = inner_schema.field_aliases
    fields_with_defaults = {}
    for field_name, field in inner_schema.fields.items():
        if field_name in default_config:
            source_key = field_name
        elif field_name in aliases and aliases[field_name] in default_config:
            # default was written under the field's alias
            source_key = aliases[field_name]
        else:
            fields_with_defaults[field_name] = field
            continue

        fields_with_defaults[field_name] = Field(
            config=field.config_type,
            default_value=default_config[source_key],
            description=field.description,
        )

    config_schema = Shape(
        fields=fields_with_defaults,
        description=(
            "This run config schema was automatically populated with default values "
            "from `default_config`."
        ),
        field_aliases=aliases,
    )

    validation_result = validate_config(config_schema, default_config)
    if not validation_result.success:
        raise DagsterInvalidConfigError(
            f"Error in config when building job '{job_name}' ",
            validation_result.errors,
            default_config,
        )

    return ConfigMapping(
        config_fn=_passthrough, config_schema=config_schema, receive_processed_config_values=False
    )
def get_run_config_schema_for_job(
    graph_def: GraphDefinition,
    resource_defs: Mapping[str, ResourceDefinition],
    executor_def: "ExecutorDefinition",
    logger_defs: Mapping[str, LoggerDefinition],
    asset_layer: Optional[AssetLayer],
) -> ConfigType:
    """Return the run config schema type of a job built from the given pieces.

    A throwaway ``JobDefinition`` is built and then inspected. It is named after the graph
    and has no ``config``. The schema returned is that of its ``"default"`` mode.
    """
    schema_probe = JobDefinition(
        name=graph_def.name,
        graph_def=graph_def,
        resource_defs=resource_defs,
        executor_def=executor_def,
        logger_defs=logger_defs,
        asset_layer=asset_layer,
    )
    default_mode_schema = schema_probe.get_run_config_schema("default")
    return default_mode_schema.run_config_schema_type