Source code for dagster._serdes.config_class

import importlib
from abc import ABC, abstractmethod
from typing import Any, Dict, NamedTuple

import dagster._check as check
from dagster._utils import convert_dagster_submodule_name
from dagster._utils.yaml_utils import load_run_config_yaml

from .serdes import DefaultNamedTupleSerializer, WhitelistMap, whitelist_for_serdes


class ConfigurableClassDataSerializer(DefaultNamedTupleSerializer):
    @classmethod
    def value_to_storage_dict(
        cls,
        value: NamedTuple,
        whitelist_map: WhitelistMap,
        descent_path: str,
    ) -> Dict[str, Any]:
        dct = super().value_to_storage_dict(value, whitelist_map, descent_path)
        dct["module_name"] = convert_dagster_submodule_name(value.module_name, "public")  # type: ignore
        return dct


[docs]@whitelist_for_serdes(serializer=ConfigurableClassDataSerializer) class ConfigurableClassData( NamedTuple( "_ConfigurableClassData", [ ("module_name", str), ("class_name", str), ("config_yaml", str), ], ) ): """Serializable tuple describing where to find a class and the config fragment that should be used to instantiate it. Users should not instantiate this class directly. Classes intended to be serialized in this way should implement the :py:class:`dagster.serdes.ConfigurableClass` mixin. """ def __new__(cls, module_name, class_name, config_yaml): return super(ConfigurableClassData, cls).__new__( cls, convert_dagster_submodule_name(check.str_param(module_name, "module_name"), "private"), check.str_param(class_name, "class_name"), check.str_param(config_yaml, "config_yaml"), ) @property def config_dict(self): return load_run_config_yaml(self.config_yaml) def info_dict(self): return { "module": self.module_name, "class": self.class_name, "config": self.config_dict, } def rehydrate(self): from dagster._config import process_config, resolve_to_config_type from dagster._core.errors import DagsterInvalidConfigError try: module = importlib.import_module(self.module_name) except ModuleNotFoundError: check.failed( f"Couldn't import module {self.module_name} when attempting to load the " f"configurable class {self.module_name}.{self.class_name}" ) try: klass = getattr(module, self.class_name) except AttributeError: check.failed( f"Couldn't find class {self.class_name} in module when attempting to load the " f"configurable class {self.module_name}.{self.class_name}" ) if not issubclass(klass, ConfigurableClass): raise check.CheckError( klass, f"class {self.class_name} in module {self.module_name}", ConfigurableClass, ) config_dict = self.config_dict result = process_config(resolve_to_config_type(klass.config_type()), config_dict) if not result.success: raise DagsterInvalidConfigError( f"Errors whilst loading configuration for {klass.config_type()}.", result.errors, config_dict, ) return klass.from_config_value(self, result.value)
[docs]class ConfigurableClass(ABC): """Abstract mixin for classes that can be loaded from config. This supports a powerful plugin pattern which avoids both a) a lengthy, hard-to-synchronize list of conditional imports / optional extras_requires in dagster core and b) a magic directory or file in which third parties can place plugin packages. Instead, the intention is to make, e.g., run storage, pluggable with a config chunk like: .. code-block:: yaml run_storage: module: very_cool_package.run_storage class: SplendidRunStorage config: magic_word: "quux" This same pattern should eventually be viable for other system components, e.g. engines. The ``ConfigurableClass`` mixin provides the necessary hooks for classes to be instantiated from an instance of ``ConfigurableClassData``. Pieces of the Dagster system which we wish to make pluggable in this way should consume a config type such as: .. code-block:: python {'module': str, 'class': str, 'config': Field(Permissive())} """ @property @abstractmethod def inst_data(self): """ Subclass must be able to return the inst_data as a property if it has been constructed through the from_config_value code path. """ @classmethod @abstractmethod def config_type(cls): """dagster.ConfigType: The config type against which to validate a config yaml fragment serialized in an instance of ``ConfigurableClassData``. """ raise NotImplementedError(f"{cls.__name__} must implement the config_type classmethod") @staticmethod @abstractmethod def from_config_value(inst_data, config_value): """New up an instance of the ConfigurableClass from a validated config value. Called by ConfigurableClassData.rehydrate. Args: config_value (dict): The validated config value to use. Typically this should be the ``value`` attribute of a :py:class:`~dagster._core.types.evaluator.evaluation.EvaluateValueResult`. A common pattern is for the implementation to align the config_value with the signature of the ConfigurableClass's constructor: .. code-block:: python @staticmethod def from_config_value(inst_data, config_value): return MyConfigurableClass(inst_data=inst_data, **config_value) """ raise NotImplementedError( "ConfigurableClass subclasses must implement the from_config_value staticmethod" )
def class_from_code_pointer(module_name, class_name): try: module = importlib.import_module(module_name) except ModuleNotFoundError: check.failed( "Couldn't import module {module_name} when attempting to load the " "class {klass}".format( module_name=module_name, klass=module_name + "." + class_name, ) ) try: return getattr(module, class_name) except AttributeError: check.failed( "Couldn't find class {class_name} in module when attempting to load the " "class {klass}".format( class_name=class_name, klass=module_name + "." + class_name, ) )