from functools import update_wrapper
from typing import (
    TYPE_CHECKING,
    AbstractSet,
    Any,
    Dict,
    FrozenSet,
    List,
    Mapping,
    Optional,
    Sequence,
    Tuple,
    Type,
    Union,
    cast,
)
import dagster._check as check
from dagster._annotations import public
from dagster._config import Field, Shape
from dagster._config.config_type import ConfigType
from dagster._config.validate import validate_config
from dagster._core.definitions.composition import MappedInputPlaceholder
from dagster._core.definitions.dependency import (
    DependencyDefinition,
    DynamicCollectDependencyDefinition,
    IDependencyDefinition,
    MultiDependencyDefinition,
    Node,
    NodeHandle,
    NodeInvocation,
    SolidOutputHandle,
)
from dagster._core.definitions.events import AssetKey
from dagster._core.definitions.node_definition import NodeDefinition
from dagster._core.definitions.policy import RetryPolicy
from dagster._core.definitions.utils import check_valid_name
from dagster._core.errors import (
    DagsterInvalidConfigError,
    DagsterInvalidDefinitionError,
    DagsterInvalidInvocationError,
    DagsterInvalidSubsetError,
)
from dagster._core.selector.subset_selector import (
    AssetSelectionData,
    LeafNodeSelection,
    OpSelectionData,
    parse_op_selection,
)
from dagster._core.storage.io_manager import io_manager
from dagster._core.utils import str_format_set
from dagster._utils import merge_dicts
from .asset_layer import AssetLayer, build_asset_selection_job
from .config import ConfigMapping
from .dependency import DependencyDefinition
from .executor_definition import ExecutorDefinition, multi_or_in_process_executor
from .graph_definition import GraphDefinition, SubselectedGraphDefinition
from .hook_definition import HookDefinition
from .logger_definition import LoggerDefinition
from .metadata import MetadataEntry, PartitionMetadataEntry, RawMetadataValue
from .mode import ModeDefinition
from .partition import PartitionSetDefinition, PartitionedConfig, PartitionsDefinition
from .pipeline_definition import PipelineDefinition
from .preset import PresetDefinition
from .resource_definition import ResourceDefinition
from .run_request import RunRequest
from .utils import DEFAULT_IO_MANAGER_KEY
from .version_strategy import VersionStrategy
if TYPE_CHECKING:
    from dagster._core.execution.execute_in_process_result import ExecuteInProcessResult
    from dagster._core.execution.resources_init import InitResourceContext
    from dagster._core.instance import DagsterInstance
    from dagster._core.snap import PipelineSnapshot
class JobDefinition(PipelineDefinition):
    _cached_partition_set: Optional["PartitionSetDefinition"]
    _subset_selection_data: Optional[Union[OpSelectionData, AssetSelectionData]]
    input_values: Mapping[str, object]
    def __init__(
        self,
        *,
        graph_def: GraphDefinition,
        resource_defs: Optional[Mapping[str, ResourceDefinition]] = None,
        executor_def: Optional[ExecutorDefinition] = None,
        logger_defs: Optional[Mapping[str, LoggerDefinition]] = None,
        name: Optional[str] = None,
        config: Optional[Union[ConfigMapping, Mapping[str, object], PartitionedConfig]] = None,
        description: Optional[str] = None,
        partitions_def: Optional[PartitionsDefinition] = None,
        tags: Optional[Mapping[str, Any]] = None,
        metadata: Optional[Mapping[str, RawMetadataValue]] = None,
        hook_defs: Optional[AbstractSet[HookDefinition]] = None,
        op_retry_policy: Optional[RetryPolicy] = None,
        version_strategy: Optional[VersionStrategy] = None,
        _subset_selection_data: Optional[Union[OpSelectionData, AssetSelectionData]] = None,
        asset_layer: Optional[AssetLayer] = None,
        input_values: Optional[Mapping[str, object]] = None,
        _metadata_entries: Optional[Sequence[Union[MetadataEntry, PartitionMetadataEntry]]] = None,
        _executor_def_specified: Optional[bool] = None,
        _logger_defs_specified: Optional[bool] = None,
        _preset_defs: Optional[Sequence[PresetDefinition]] = None,
    ):
        """Validate the arguments and build a job as a single-mode pipeline.

        All arguments are keyword-only. The constructor:

        * type-checks every argument with ``dagster._check``;
        * records whether an executor / loggers were explicitly supplied *before*
          defaults are applied (``multi_or_in_process_executor`` and
          ``default_loggers()`` respectively);
        * adds ``default_job_io_manager`` under ``DEFAULT_IO_MANAGER_KEY`` unless the
          caller already supplied a resource under that key;
        * turns ``config`` / ``partitions_def`` into a config mapping, a partitioned
          config and/or a ``"default"`` preset;
        * wraps everything in one ``ModeDefinition`` and delegates to
          ``PipelineDefinition.__init__``.

        Args:
            graph_def: The graph this job executes. Also supplies the default name.
            resource_defs: Resources keyed by resource key.
            executor_def: Executor for the job; defaults to
                ``multi_or_in_process_executor``.
            logger_defs: Loggers keyed by name; defaults to ``default_loggers()``.
            name: Job name; defaults to ``graph_def.name``.
            config: A ``ConfigMapping``, a ``PartitionedConfig`` or a dict of run
                config. Only a plain dict is accepted together with ``partitions_def``.
            description: Human-readable description.
            partitions_def: When given, a ``PartitionedConfig`` is built that returns
                the hard-coded ``config`` (or ``{}``) for every partition.
            tags: Tags forwarded to the pipeline definition.
            metadata: Metadata forwarded to the pipeline definition.
            hook_defs: Hooks forwarded to the pipeline definition.
            op_retry_policy: Forwarded as ``solid_retry_policy``.
            version_strategy: Forwarded to the pipeline definition.
            _subset_selection_data: Selection data stored on the instance when this
                job is a subset of another job.
            asset_layer: Forwarded to the pipeline definition and used when computing
                the run config schema for a dict ``config``.
            input_values: Values for top-level graph inputs, keyed by input name.
            _metadata_entries: Forwarded as ``metadata_entries``.
            _executor_def_specified: Overrides the computed "executor was explicitly
                supplied" flag when not ``None``.
            _logger_defs_specified: Overrides the computed "loggers were explicitly
                supplied" flag when not ``None``.
            _preset_defs: Presets to carry over; must be empty when ``config`` is a dict.

        Raises:
            DagsterInvalidDefinitionError: If ``input_values`` names an input the graph
                does not have.
        """
        # Imported lazily inside the constructor rather than at module import time.
        from dagster._loggers import default_loggers
        check.inst_param(graph_def, "graph_def", GraphDefinition)
        resource_defs = check.opt_mapping_param(
            resource_defs, "resource_defs", key_type=str, value_type=ResourceDefinition
        )
        # We need to check whether an actual executor/logger def was passed in
        # before we set a default executor/logger defs. This is so we can
        # determine if someone passed in the default executor vs the system set
        # it directly. Once JobDefinition no longer subclasses
        # PipelineDefinition, we can change the default executor to be set
        # elsewhere to avoid the need for this check.
        self._executor_def_specified = (
            _executor_def_specified
            if _executor_def_specified is not None
            else executor_def is not None
        )
        self._logger_defs_specified = (
            _logger_defs_specified
            if _logger_defs_specified is not None
            else logger_defs is not None
        )
        executor_def = check.opt_inst_param(
            executor_def, "executor_def", ExecutorDefinition, default=multi_or_in_process_executor
        )
        # Validation only: the return value is discarded so that a falsy
        # ``logger_defs`` (None or empty) falls through to the defaults below.
        check.opt_mapping_param(
            logger_defs,
            "logger_defs",
            key_type=str,
            value_type=LoggerDefinition,
        )
        logger_defs = logger_defs or default_loggers()
        name = check_valid_name(check.opt_str_param(name, "name", default=graph_def.name))
        config = check.opt_inst_param(config, "config", (Mapping, ConfigMapping, PartitionedConfig))
        description = check.opt_str_param(description, "description")
        partitions_def = check.opt_inst_param(
            partitions_def, "partitions_def", PartitionsDefinition
        )
        tags = check.opt_mapping_param(tags, "tags", key_type=str)
        metadata = check.opt_mapping_param(metadata, "metadata", key_type=str)
        hook_defs = check.opt_set_param(hook_defs, "hook_defs")
        op_retry_policy = check.opt_inst_param(op_retry_policy, "op_retry_policy", RetryPolicy)
        version_strategy = check.opt_inst_param(
            version_strategy, "version_strategy", VersionStrategy
        )
        _subset_selection_data = check.opt_inst_param(
            _subset_selection_data, "_subset_selection_data", (OpSelectionData, AssetSelectionData)
        )
        asset_layer = check.opt_inst_param(asset_layer, "asset_layer", AssetLayer)
        input_values = check.opt_mapping_param(input_values, "input_values", key_type=str)
        _metadata_entries = check.opt_sequence_param(_metadata_entries, "_metadata_entries")
        _preset_defs = check.opt_sequence_param(
            _preset_defs, "preset_defs", of_type=PresetDefinition
        )
        # Only inject the default IO manager when the caller has not provided one
        # under the default key; caller-supplied resources win in the merge.
        if resource_defs and DEFAULT_IO_MANAGER_KEY in resource_defs:
            resource_defs_with_defaults = resource_defs
        else:
            resource_defs_with_defaults = merge_dicts(
                {DEFAULT_IO_MANAGER_KEY: default_job_io_manager}, resource_defs or {}
            )
        presets = []
        config_mapping = None
        partitioned_config = None
        if partitions_def:
            if isinstance(config, (ConfigMapping, PartitionedConfig)):
                check.failed(
                    "Can't supply a ConfigMapping or PartitionedConfig for 'config' when 'partitions_def' is supplied."
                )
            # Every partition resolves to the same hard-coded run config.
            hardcoded_config = config if config else {}
            partitioned_config = PartitionedConfig(partitions_def, lambda _: hardcoded_config)
        # NOTE: this chain is evaluated independently of the ``partitions_def`` branch
        # above, so a dict ``config`` combined with ``partitions_def`` also produces a
        # default preset and a config mapping here.
        if isinstance(config, ConfigMapping):
            config_mapping = config
        elif isinstance(config, PartitionedConfig):
            partitioned_config = config
        elif isinstance(config, dict):
            check.invariant(
                len(_preset_defs) == 0,
                "Bad state: attempted to pass preset definitions to job alongside config dictionary.",
            )
            presets = [PresetDefinition(name="default", run_config=config)]
            # Using config mapping here is a trick to make it so that the preset will be used even
            # when no config is supplied for the job.
            config_mapping = _config_mapping_with_default_value(
                get_run_config_schema_for_job(
                    graph_def, resource_defs_with_defaults, executor_def, logger_defs, asset_layer
                ),
                config,
                name,
            )
        elif config is not None:
            # Reached by a Mapping that is not a ``dict`` instance: it passes the
            # ``opt_inst_param`` check above but is rejected here.
            check.failed(
                f"config param must be a ConfigMapping, a PartitionedConfig, or a dictionary, but "
                f"is an object of type {type(config)}"
            )
        # Exists for backcompat - JobDefinition is implemented as a single-mode pipeline.
        mode_def = ModeDefinition(
            resource_defs=resource_defs_with_defaults,
            logger_defs=logger_defs,
            executor_defs=[executor_def] if executor_def else None,
            _config_mapping=config_mapping,
            _partitioned_config=partitioned_config,
        )
        # Starts empty; filled in by ``get_partition_set_def``.
        self._cached_partition_set: Optional["PartitionSetDefinition"] = None
        self._subset_selection_data = _subset_selection_data
        self.input_values = input_values
        # Iterate in sorted order so the first offending input reported is deterministic.
        for input_name in sorted(list(self.input_values.keys())):
            if not graph_def.has_input(input_name):
                raise DagsterInvalidDefinitionError(
                    f"Error when constructing JobDefinition '{name}': Input value provided for key '{input_name}', but job has no top-level input with that name."
                )
        # A preset derived from a dict ``config`` takes precedence over ``_preset_defs``
        # (the invariant above guarantees both are never non-empty at once).
        super(JobDefinition, self).__init__(
            name=name,
            description=description,
            mode_defs=[mode_def],
            preset_defs=presets or _preset_defs,
            tags=tags,
            metadata=metadata,
            metadata_entries=_metadata_entries,
            hook_defs=hook_defs,
            solid_retry_policy=op_retry_policy,
            graph_def=graph_def,
            version_strategy=version_strategy,
            asset_layer=asset_layer,
        )
    @property
    def target_type(self) -> str:
        return "job"
    @property
    def is_job(self) -> bool:
        return True
    def describe_target(self):
        """Return a short label combining the target type and the quoted job name."""
        kind = self.target_type
        job_name = self.name
        return f"{kind} '{job_name}'"
    @public  # type: ignore
    @property
    def executor_def(self) -> ExecutorDefinition:
        """The first executor definition registered on the job's mode definition."""
        mode_def = self.get_mode_definition()
        executors = mode_def.executor_defs
        return executors[0]
    @public  # type: ignore
    @property
    def resource_defs(self) -> Mapping[str, ResourceDefinition]:
        """Resource definitions of the job's mode definition, keyed by resource key."""
        mode_def = self.get_mode_definition()
        return mode_def.resource_defs
    @public  # type: ignore
    @property
    def partitioned_config(self) -> Optional[PartitionedConfig]:
        """Partitioned config stored on the job's mode definition, if any."""
        mode_def = self.get_mode_definition()
        return mode_def.partitioned_config
    @public  # type: ignore
    @property
    def config_mapping(self) -> Optional[ConfigMapping]:
        """Config mapping stored on the job's mode definition, if any."""
        mode_def = self.get_mode_definition()
        return mode_def.config_mapping
    @public  # type: ignore
    @property
    def loggers(self) -> Mapping[str, LoggerDefinition]:
        """Logger definitions of the job's mode definition, keyed by logger name."""
        mode_def = self.get_mode_definition()
        return mode_def.loggers
[docs]    @public
    def execute_in_process(
        self,
        run_config: Optional[Mapping[str, Any]] = None,
        instance: Optional["DagsterInstance"] = None,
        partition_key: Optional[str] = None,
        raise_on_error: bool = True,
        op_selection: Optional[Sequence[str]] = None,
        asset_selection: Optional[Sequence[AssetKey]] = None,
        run_id: Optional[str] = None,
        input_values: Optional[Mapping[str, object]] = None,
    ) -> "ExecuteInProcessResult":
        """
        Execute the Job in-process, gathering results in-memory.
        The `executor_def` on the Job will be ignored, and replaced with the in-process executor.
        If using the default `io_manager`, it will switch from filesystem to in-memory.
        Args:
            run_config (Optional[Mapping[str, Any]]:
                The configuration for the run
            instance (Optional[DagsterInstance]):
                The instance to execute against, an ephemeral one will be used if none provided.
            partition_key: (Optional[str])
                The string partition key that specifies the run config to execute. Can only be used
                to select run config for jobs with partitioned config.
            raise_on_error (Optional[bool]): Whether or not to raise exceptions when they occur.
                Defaults to ``True``.
            op_selection (Optional[Sequence[str]]): A list of op selection queries (including single op
                names) to execute. For example:
                * ``['some_op']``: selects ``some_op`` itself.
                * ``['*some_op']``: select ``some_op`` and all its ancestors (upstream dependencies).
                * ``['*some_op+++']``: select ``some_op``, all its ancestors, and its descendants
                (downstream dependencies) within 3 levels down.
                * ``['*some_op', 'other_op_a', 'other_op_b+']``: select ``some_op`` and all its
                ancestors, ``other_op_a`` itself, and ``other_op_b`` and its direct child ops.
            input_values (Optional[Mapping[str, Any]]):
                A dictionary that maps python objects to the top-level inputs of the job. Input values provided here will override input values that have been provided to the job directly.
        Returns:
            :py:class:`~dagster.ExecuteInProcessResult`
        """
        from dagster._core.definitions.executor_definition import execute_in_process_executor
        from dagster._core.execution.execute_in_process import core_execute_in_process
        run_config = check.opt_mapping_param(run_config, "run_config")
        op_selection = check.opt_sequence_param(op_selection, "op_selection", str)
        asset_selection = check.opt_sequence_param(asset_selection, "asset_selection", AssetKey)
        check.invariant(
            not (op_selection and asset_selection),
            "op_selection and asset_selection cannot both be provided as args to execute_in_process",
        )
        partition_key = check.opt_str_param(partition_key, "partition_key")
        input_values = check.opt_mapping_param(input_values, "input_values")
        # Combine provided input values at execute_in_process with input values
        # provided to the definition. Input values provided at
        # execute_in_process will override those provided on the definition.
        input_values = merge_dicts(self.input_values, input_values)
        resource_defs = dict(self.resource_defs)
        logger_defs = dict(self.loggers)
        ephemeral_job = JobDefinition(
            name=self._name,
            graph_def=self._graph_def,
            resource_defs=_swap_default_io_man(resource_defs, self),
            executor_def=execute_in_process_executor,
            logger_defs=logger_defs,
            hook_defs=self.hook_defs,
            config=self.config_mapping or self.partitioned_config,
            tags=self.tags,
            op_retry_policy=self._solid_retry_policy,
            version_strategy=self.version_strategy,
            asset_layer=self.asset_layer,
            input_values=input_values,
            _executor_def_specified=self._executor_def_specified,
            _logger_defs_specified=self._logger_defs_specified,
            _preset_defs=self._preset_defs,
        )
        ephemeral_job = ephemeral_job.get_job_def_for_subset_selection(
            op_selection, frozenset(asset_selection) if asset_selection else None
        )
        tags = None
        if partition_key:
            if not self.partitioned_config:
                check.failed(
                    f"Provided partition key `{partition_key}` for job `{self._name}` without a partitioned config"
                )
            check.invariant(
                not run_config,
                "Cannot provide both run_config and partition_key arguments to `execute_in_process`",
            )
            partition_set = self.get_partition_set_def()
            if not partition_set:
                check.failed(
                    f"Provided partition key `{partition_key}` for job `{self._name}` without a partitioned config"
                )
            partition = partition_set.get_partition(partition_key)
            run_config = partition_set.run_config_for_partition(partition)
            tags = partition_set.tags_for_partition(partition)
        return core_execute_in_process(
            ephemeral_pipeline=ephemeral_job,
            run_config=run_config,
            instance=instance,
            output_capturing_enabled=True,
            raise_on_error=raise_on_error,
            run_tags=tags,
            run_id=run_id,
            asset_selection=frozenset(asset_selection),
        ) 
    @property
    def op_selection_data(self) -> Optional[OpSelectionData]:
        """The stored subset selection data when it is an ``OpSelectionData``, else ``None``."""
        selection = self._subset_selection_data
        if isinstance(selection, OpSelectionData):
            return selection
        return None
    @property
    def asset_selection_data(self) -> Optional[AssetSelectionData]:
        """The stored subset selection data when it is an ``AssetSelectionData``, else ``None``."""
        selection = self._subset_selection_data
        if isinstance(selection, AssetSelectionData):
            return selection
        return None
    @property
    def is_subset_pipeline(self) -> bool:
        if self._subset_selection_data:
            return True
        return False
    def get_job_def_for_subset_selection(
        self,
        op_selection: Optional[Sequence[str]] = None,
        asset_selection: Optional[FrozenSet[AssetKey]] = None,
    ):
        """Return the job narrowed to an op selection or an asset selection.

        At most one of the two selections may be non-empty. When neither is given,
        this job itself is returned.
        """
        both_supplied = bool(op_selection and asset_selection)
        check.invariant(
            not both_supplied,
            "op_selection and asset_selection cannot both be provided as args to execute_in_process",
        )

        if op_selection:
            return self._get_job_def_for_op_selection(op_selection)
        if not asset_selection:
            # Nothing to narrow down: hand back the unmodified job.
            return self
        return self._get_job_def_for_asset_selection(asset_selection)
    def _get_job_def_for_asset_selection(
        self,
        asset_selection: Optional[FrozenSet[AssetKey]] = None,
    ) -> "JobDefinition":
        """Build a new job restricted to the given asset keys.

        Args:
            asset_selection: Asset keys to select. ``None`` is normalized to an empty set.

        Returns:
            The job produced by ``build_asset_selection_job`` for the selection.

        Raises:
            DagsterInvalidSubsetError: If any selected key is not among the asset
                layer's asset keys.
        """
        asset_selection = check.opt_set_param(asset_selection, "asset_selection", AssetKey)

        # Validate the whole selection in a single pass. The check does not depend on
        # any individual element, so there is no need to repeat it per selected asset.
        if asset_selection:
            known_asset_keys = self.asset_layer.asset_keys
            nonexistent_assets = [
                asset for asset in asset_selection if asset not in known_asset_keys
            ]
            if nonexistent_assets:
                # Keys whose string form is empty are left out of the message.
                nonexistent_asset_strings = [
                    asset_str
                    for asset_str in (asset.to_string() for asset in nonexistent_assets)
                    if asset_str
                ]
                raise DagsterInvalidSubsetError(
                    "Assets provided in asset_selection argument "
                    f"{', '.join(nonexistent_asset_strings)} do not exist in parent asset group or job."
                )

        asset_selection_data = AssetSelectionData(
            asset_selection=asset_selection,
            parent_job_def=self,
        )

        check.invariant(
            self.asset_layer.assets_defs_by_key is not None,
            "Asset layer must have _asset_defs argument defined",
        )

        new_job = build_asset_selection_job(
            name=self.name,
            assets=set(self.asset_layer.assets_defs_by_key.values()),
            source_assets=self.asset_layer.source_assets_by_key.values(),
            executor_def=self.executor_def,
            resource_defs=self.resource_defs,
            description=self.description,
            tags=self.tags,
            asset_selection=asset_selection,
            asset_selection_data=asset_selection_data,
            config=self.config_mapping,
        )
        return new_job
    def _get_job_def_for_op_selection(
        self,
        op_selection: Optional[Sequence[str]] = None,
    ) -> "JobDefinition":
        """Build a new job containing only the ops matched by ``op_selection``.

        An empty or missing selection returns this job unchanged. If the resulting
        graph is invalid, ``DagsterInvalidSubsetError`` is raised, chained to the
        underlying ``DagsterInvalidDefinitionError``.
        """
        if not op_selection:
            return self

        op_selection = check.opt_list_param(op_selection, "op_selection", str)
        resolved = parse_op_selection(self, op_selection)

        try:
            subset_graph = get_subselected_graph_definition(self.graph, resolved)
            job_kwargs = dict(
                name=self.name,
                description=self.description,
                resource_defs=dict(self.resource_defs),
                logger_defs=dict(self.loggers),
                executor_def=self.executor_def,
                config=self.config_mapping or self.partitioned_config,
                tags=self.tags,
                hook_defs=self.hook_defs,
                op_retry_policy=self._solid_retry_policy,
                graph_def=subset_graph,
                version_strategy=self.version_strategy,
                _executor_def_specified=self._executor_def_specified,
                _logger_defs_specified=self._logger_defs_specified,
            )
            job_kwargs["_subset_selection_data"] = OpSelectionData(
                op_selection=op_selection,
                # equivalent to solids_to_execute. currently only gets top level nodes.
                resolved_op_selection=set(resolved.keys()),
                # used by pipeline snapshot lineage
                parent_job_def=self,
            )
            # TODO: subset this structure.
            # https://github.com/dagster-io/dagster/issues/7541
            job_kwargs["asset_layer"] = self.asset_layer
            job_kwargs["_preset_defs"] = self._preset_defs
            return JobDefinition(**job_kwargs)
        except DagsterInvalidDefinitionError as exc:
            # This handles the case when you construct a subset such that an unsatisfied
            # input cannot be loaded from config. Instead of throwing a DagsterInvalidDefinitionError,
            # we re-raise a DagsterInvalidSubsetError.
            raise DagsterInvalidSubsetError(
                f"The attempted subset {str_format_set(resolved)} for graph "
                f"{self.graph.name} results in an invalid graph."
            ) from exc
    def get_partition_set_def(self) -> Optional["PartitionSetDefinition"]:
        """Return the partition set for this job, building and caching it on first use.

        Returns ``None`` when the job's mode definition has no partitioned config.
        """
        mode_def = self.get_mode_definition()
        partitioned = mode_def.partitioned_config
        if not partitioned:
            return None

        if self._cached_partition_set:
            return self._cached_partition_set

        # Fall back to a function yielding no tags when the config does not supply one.
        tags_fn = partitioned.tags_for_partition_fn or (lambda _: {})
        self._cached_partition_set = PartitionSetDefinition(
            name=f"{self.name}_partition_set",
            job_name=self.name,
            mode=mode_def.name,
            partitions_def=partitioned.partitions_def,
            run_config_fn_for_partition=partitioned.run_config_for_partition_fn,
            tags_fn_for_partition=tags_fn,
        )
        return self._cached_partition_set
    @public  # type: ignore
    @property
    def partitions_def(self) -> Optional[PartitionsDefinition]:
        """Partitions definition of the job's partitioned config, or ``None`` without one."""
        partitioned = self.get_mode_definition().partitioned_config
        return partitioned.partitions_def if partitioned else None
    @public
    def run_request_for_partition(
        self,
        partition_key: str,
        run_key: Optional[str],
        tags: Optional[Mapping[str, str]] = None,
    ) -> RunRequest:
        """Create a ``RunRequest`` for the partition identified by ``partition_key``.

        Run config and tags come from the job's partition set. Caller-supplied
        ``tags`` are merged in, with the partition's own tags taking precedence on
        key collisions. Fails if the job has no partition set.
        """
        partition_set = self.get_partition_set_def()
        if not partition_set:
            check.failed("Called run_request_for_partition on a non-partitioned job")

        partition = partition_set.get_partition(partition_key)
        run_config = partition_set.run_config_for_partition(partition)
        partition_tags = partition_set.tags_for_partition(partition)

        if tags:
            run_request_tags = {**tags, **partition_tags}
        else:
            run_request_tags = partition_tags

        return RunRequest(
            job_name=self.name,
            run_key=run_key,
            run_config=run_config,
            tags=run_request_tags,
        )
[docs]    @public
    def with_hooks(self, hook_defs: AbstractSet[HookDefinition]) -> "JobDefinition":
        """Apply a set of hooks to all op instances within the job."""
        hook_defs = check.set_param(hook_defs, "hook_defs", of_type=HookDefinition)
        job_def = JobDefinition(
            name=self.name,
            graph_def=self._graph_def,
            resource_defs=dict(self.resource_defs),
            logger_defs=dict(self.loggers),
            executor_def=self.executor_def,
            config=self.partitioned_config or self.config_mapping,
            tags=self.tags,
            hook_defs=hook_defs | self.hook_defs,
            description=self._description,
            op_retry_policy=self._solid_retry_policy,
            asset_layer=self.asset_layer,
            _subset_selection_data=self._subset_selection_data,
            _executor_def_specified=self._executor_def_specified,
            _logger_defs_specified=self._logger_defs_specified,
            _preset_defs=self._preset_defs,
        )
        update_wrapper(job_def, self, updated=())
        return job_def 
    def get_parent_pipeline_snapshot(self) -> Optional["PipelineSnapshot"]:
        """Return the pipeline snapshot of the parent job this subset was derived from.

        Op selection data is consulted first, then asset selection data. ``None`` is
        returned when neither is present.
        """
        selection_data = self.op_selection_data or self.asset_selection_data
        if not selection_data:
            return None
        return selection_data.parent_job_def.get_pipeline_snapshot()
    def has_direct_input_value(self, input_name: str) -> bool:
        return input_name in self.input_values
    def get_direct_input_value(self, input_name: str) -> object:
        if input_name not in self.input_values:
            raise DagsterInvalidInvocationError(
                f"On job '{self.name}', attempted to retrieve input value for input named '{input_name}', but no value was provided. Provided input values: {sorted(list(self.input_values.keys()))}"
            )
        return self.input_values[input_name]
    def with_executor_def(self, executor_def: ExecutorDefinition) -> "JobDefinition":
        """Return a copy of this job that uses ``executor_def``.

        Every other setting is carried over. The copy is flagged as *not* having a
        user-specified executor (``_executor_def_specified=False``).
        """
        carried_over = dict(
            graph_def=self.graph,
            resource_defs=dict(self.resource_defs),
            logger_defs=dict(self.loggers),
            config=self.config_mapping or self.partitioned_config,
            name=self.name,
            description=self.description,
            tags=self.tags,
            _metadata_entries=self.metadata,
            hook_defs=self.hook_defs,
            op_retry_policy=self._solid_retry_policy,
            version_strategy=self.version_strategy,
            _subset_selection_data=self._subset_selection_data,
            asset_layer=self.asset_layer,
            input_values=self.input_values,
            _logger_defs_specified=self._logger_defs_specified,
            _preset_defs=self._preset_defs,
        )
        return JobDefinition(
            executor_def=executor_def,
            _executor_def_specified=False,
            **carried_over,
        )
    def with_logger_defs(self, logger_defs: Mapping[str, LoggerDefinition]) -> "JobDefinition":
        """Return a copy of this job that uses ``logger_defs``.

        Every other setting is carried over. The copy is flagged as *not* having
        user-specified loggers (``_logger_defs_specified=False``).
        """
        carried_over = dict(
            graph_def=self.graph,
            resource_defs=dict(self.resource_defs),
            executor_def=self.executor_def,
            config=self.config_mapping or self.partitioned_config,
            name=self.name,
            description=self.description,
            tags=self.tags,
            _metadata_entries=self.metadata,
            hook_defs=self.hook_defs,
            op_retry_policy=self._solid_retry_policy,
            version_strategy=self.version_strategy,
            _subset_selection_data=self._subset_selection_data,
            asset_layer=self.asset_layer,
            input_values=self.input_values,
            _executor_def_specified=self._executor_def_specified,
            _preset_defs=self._preset_defs,
        )
        return JobDefinition(
            logger_defs=logger_defs,
            _logger_defs_specified=False,
            **carried_over,
        )
def _swap_default_io_man(resources: Mapping[str, ResourceDefinition], job: PipelineDefinition):
    """
    Used to create the user facing experience of the default io_manager
    switching to in-memory when using execute_in_process.

    The swap only happens when the resource under the default IO manager key is
    ``default_job_io_manager`` and the job has no version strategy. Otherwise
    ``resources`` is returned as-is. The input mapping is never mutated.
    """
    from dagster._core.storage.mem_io_manager import mem_io_manager

    uses_builtin_default = (
        # pylint: disable=comparison-with-callable
        resources.get(DEFAULT_IO_MANAGER_KEY)
        in [default_job_io_manager]
    )
    if not uses_builtin_default or job.version_strategy is not None:
        return resources

    swapped = dict(resources)
    swapped[DEFAULT_IO_MANAGER_KEY] = mem_io_manager
    return swapped
def _dep_key_of(node: Node) -> NodeInvocation:
    """Build the ``NodeInvocation`` used as this node's key in a dependency dictionary.

    The node's definition name becomes ``name`` and the node's own name becomes
    ``alias``. Tags, hooks and retry policy are copied from the node.
    """
    definition_name = node.definition.name
    invocation = NodeInvocation(
        name=definition_name,
        alias=node.name,
        tags=node.tags,
        hook_defs=node.hook_defs,
        retry_policy=node.retry_policy,
    )
    return invocation
def get_subselected_graph_definition(
    graph: GraphDefinition,
    resolved_op_selection_dict: Mapping,
    parent_handle: Optional[NodeHandle] = None,
) -> SubselectedGraphDefinition:
    """Build a ``SubselectedGraphDefinition`` containing only the selected nodes.

    Args:
        graph: The graph to subset.
        resolved_op_selection_dict: Mapping keyed by the names of selected nodes. For
            a graph node, a value other than ``LeafNodeSelection`` is treated as the
            nested selection for that sub-graph and is resolved recursively.
        parent_handle: Handle of the node containing ``graph`` when recursing.

    Returns:
        A sub-selected graph whose dependencies, input mappings and output mappings
        only refer to selected nodes.
    """
    deps: Dict[
        Union[str, NodeInvocation],
        Dict[str, IDependencyDefinition],
    ] = {}

    selected_nodes: List[Tuple[str, NodeDefinition]] = []
    dependency_structure = graph.dependency_structure

    for node in graph.solids_in_topological_order:
        node_handle = NodeHandle(node.name, parent=parent_handle)
        # skip if the node isn't selected
        if node.name not in resolved_op_selection_dict:
            continue

        # rebuild graph if any nodes inside the graph are selected
        definition: Union[SubselectedGraphDefinition, NodeDefinition]
        if node.is_graph and resolved_op_selection_dict[node.name] is not LeafNodeSelection:
            definition = get_subselected_graph_definition(
                cast(GraphDefinition, node.definition),  # guaranteed by node.is_graph
                resolved_op_selection_dict[node.name],
                parent_handle=node_handle,
            )
        # use definition if the node as a whole is selected. this includes selecting the entire graph
        else:
            definition = node.definition
        selected_nodes.append((node.name, definition))

        # build dependencies for the node. we do it for both cases because nested graphs can have
        # inputs and outputs too
        # The dependency key is computed once per node; the per-node dict is filled in
        # below through this local reference.
        node_deps: Dict[str, IDependencyDefinition] = {}
        deps[_dep_key_of(node)] = node_deps

        for input_handle in node.input_handles():
            input_name = input_handle.input_def.name
            if dependency_structure.has_direct_dep(input_handle):
                output_handle = dependency_structure.get_direct_dep(input_handle)
                # Drop the edge when the upstream node was not selected.
                if output_handle.solid.name in resolved_op_selection_dict:
                    node_deps[input_name] = DependencyDefinition(
                        solid=output_handle.solid.name, output=output_handle.output_def.name
                    )
            elif dependency_structure.has_dynamic_fan_in_dep(input_handle):
                output_handle = dependency_structure.get_dynamic_fan_in_dep(input_handle)
                if output_handle.solid.name in resolved_op_selection_dict:
                    node_deps[input_name] = DynamicCollectDependencyDefinition(
                        solid_name=output_handle.solid.name,
                        output_name=output_handle.output_def.name,
                    )
            elif dependency_structure.has_fan_in_deps(input_handle):
                output_handles = dependency_structure.get_fan_in_deps(input_handle)
                # Keep only fan-in sources that are real output handles of selected nodes.
                multi_dependencies = [
                    DependencyDefinition(
                        solid=output_handle.solid.name, output=output_handle.output_def.name
                    )
                    for output_handle in output_handles
                    if (
                        isinstance(output_handle, SolidOutputHandle)
                        and output_handle.solid.name in resolved_op_selection_dict
                    )
                ]
                node_deps[input_name] = MultiDependencyDefinition(
                    cast(
                        List[Union[DependencyDefinition, Type[MappedInputPlaceholder]]],
                        multi_dependencies,
                    )
                )
            # else input is unconnected

    # filter out unselected input/output mapping
    # Build the name set once so each mapping is tested with an O(1) lookup.
    selected_names = {name for name, _ in selected_nodes}
    new_input_mappings = [
        input_mapping
        for input_mapping in graph._input_mappings  # pylint: disable=protected-access
        if input_mapping.maps_to.solid_name in selected_names
    ]
    new_output_mappings = [
        output_mapping
        for output_mapping in graph._output_mappings  # pylint: disable=protected-access
        if output_mapping.maps_from.solid_name in selected_names
    ]

    return SubselectedGraphDefinition(
        parent_graph_def=graph,
        dependencies=deps,
        node_defs=[definition for _, definition in selected_nodes],
        input_mappings=new_input_mappings,
        output_mappings=new_output_mappings,
    )
def get_direct_input_values_from_job(target: PipelineDefinition) -> Mapping[str, Any]:
    """Return the input values attached to ``target`` when it is a job.

    Args:
        target: The pipeline or job definition to inspect.

    Returns:
        ``target.input_values`` when ``target.is_job`` is truthy, otherwise an
        empty dict.
    """
    # Anything that is not a job contributes no direct input values.
    if not target.is_job:
        return {}
    # The cast only narrows the static type; no conversion happens at runtime.
    job_def = cast(JobDefinition, target)
    return job_def.input_values
@io_manager(
    description="Built-in filesystem IO manager that stores and retrieves values using pickling."
)
def default_job_io_manager(init_context: "InitResourceContext"):
    """Create the pickling filesystem IO manager for the current instance.

    Args:
        init_context: Resource initialization context; its ``instance`` is
            passed through ``check.not_none`` before use.

    Returns:
        A ``PickledObjectFilesystemIOManager`` whose ``base_dir`` is the
        instance's storage directory.
    """
    # Imported at call time rather than module load
    # (presumably to avoid a circular import -- TODO confirm).
    from dagster._core.storage.fs_io_manager import PickledObjectFilesystemIOManager

    dagster_instance = check.not_none(init_context.instance)
    storage_root = dagster_instance.storage_directory()
    return PickledObjectFilesystemIOManager(base_dir=storage_root)
def _config_mapping_with_default_value(
    inner_schema: ConfigType,
    default_config: Dict[str, Any],
    job_name: str,
) -> ConfigMapping:
    """Build a pass-through ``ConfigMapping`` whose schema embeds ``default_config``.

    Every field of ``inner_schema`` that has a value in ``default_config`` is
    rebuilt as a ``Field`` with the same config type and description and that
    value as its default. The value is looked up by field name first, then by
    the field's alias in ``inner_schema.field_aliases``. Fields with no value
    are reused unchanged. The rebuilt schema is then validated against
    ``default_config``.

    Args:
        inner_schema: The job's run config schema; ``check.failed`` is invoked
            if it is not a ``Shape``.
        default_config: Default run config values keyed by field name or alias.
        job_name: Name of the job, used only in the validation error message.

    Returns:
        A ``ConfigMapping`` with an identity ``config_fn``, the rebuilt schema,
        and ``receive_processed_config_values=False``.

    Raises:
        DagsterInvalidConfigError: If ``default_config`` does not validate
            against the rebuilt schema.
    """
    if not isinstance(inner_schema, Shape):
        check.failed("Only Shape (dictionary) config_schema allowed on Job ConfigMapping")

    aliases = inner_schema.field_aliases
    fields_with_defaults = {}
    for field_name, original_field in inner_schema.fields.items():
        # Resolve which key of default_config (if any) supplies this field's
        # default: the field's own name takes precedence over its alias.
        if field_name in default_config:
            default_key = field_name
        elif field_name in aliases and aliases[field_name] in default_config:
            default_key = aliases[field_name]
        else:
            # No default supplied; keep the field exactly as declared.
            fields_with_defaults[field_name] = original_field
            continue

        fields_with_defaults[field_name] = Field(
            config=original_field.config_type,
            default_value=default_config[default_key],
            description=original_field.description,
        )

    schema_description = (
        "This run config schema was automatically populated with default values "
        "from `default_config`."
    )
    config_schema = Shape(
        fields=fields_with_defaults,
        description=schema_description,
        field_aliases=aliases,
    )

    # Validate the defaults against the schema that now embeds them.
    validation = validate_config(config_schema, default_config)
    if not validation.success:
        raise DagsterInvalidConfigError(
            f"Error in config when building job '{job_name}' ",
            validation.errors,
            default_config,
        )

    def config_fn(x):
        # Identity mapping: the supplied run config is passed through as-is.
        return x

    return ConfigMapping(
        config_fn=config_fn,
        config_schema=config_schema,
        receive_processed_config_values=False,
    )
def get_run_config_schema_for_job(
    graph_def: GraphDefinition,
    resource_defs: Mapping[str, ResourceDefinition],
    executor_def: "ExecutorDefinition",
    logger_defs: Mapping[str, LoggerDefinition],
    asset_layer: Optional[AssetLayer],
) -> ConfigType:
    """Compute the run config schema type for a job built from ``graph_def``.

    Args:
        graph_def: Graph the job wraps; its name is also used as the job name.
        resource_defs: Resource definitions keyed by resource key.
        executor_def: Executor definition for the job.
        logger_defs: Logger definitions keyed by logger name.
        asset_layer: Optional asset layer forwarded to the job.

    Returns:
        The ``run_config_schema_type`` of the job's ``"default"`` run config
        schema.
    """
    # This job is built only to derive the schema; the function neither
    # returns nor stores it.
    scratch_job = JobDefinition(
        name=graph_def.name,
        graph_def=graph_def,
        resource_defs=resource_defs,
        executor_def=executor_def,
        logger_defs=logger_defs,
        asset_layer=asset_layer,
    )
    run_config_schema = scratch_job.get_run_config_schema("default")
    return run_config_schema.run_config_schema_type