Source code for dagster._utils.dagster_type
from dagster._core.definitions.events import Failure, TypeCheck
from dagster._core.definitions.pipeline_base import InMemoryPipeline
from dagster._core.definitions.pipeline_definition import PipelineDefinition
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.execution.api import create_execution_plan
from dagster._core.execution.context_creation_pipeline import scoped_pipeline_context
from dagster._core.instance import DagsterInstance
from dagster._core.types.dagster_type import resolve_dagster_type
from .typing_api import is_typing_type
[docs]def check_dagster_type(dagster_type, value):
"""Test a custom Dagster type.
Args:
dagster_type (Any): The Dagster type to test. Should be one of the
:ref:`built-in types <builtin>`, a dagster type explicitly constructed with
:py:func:`as_dagster_type`, :py:func:`@usable_as_dagster_type <dagster_type>`, or
:py:func:`PythonObjectDagsterType`, or a Python type.
value (Any): The runtime value to test.
Returns:
TypeCheck: The result of the type check.
Examples:
.. code-block:: python
assert check_dagster_type(Dict[Any, Any], {'foo': 'bar'}).success
"""
if is_typing_type(dagster_type):
raise DagsterInvariantViolationError(
(
"Must pass in a type from dagster module. You passed {dagster_type} "
"which is part of python's typing module."
).format(dagster_type=dagster_type)
)
dagster_type = resolve_dagster_type(dagster_type)
pipeline = InMemoryPipeline(PipelineDefinition([], "empty"))
pipeline_def = pipeline.get_definition()
instance = DagsterInstance.ephemeral()
execution_plan = create_execution_plan(pipeline)
pipeline_run = instance.create_run_for_pipeline(pipeline_def)
with scoped_pipeline_context(execution_plan, pipeline, {}, pipeline_run, instance) as context:
context = context.for_type(dagster_type)
try:
type_check = dagster_type.type_check(context, value)
except Failure as failure:
return TypeCheck(success=False, description=failure.description)
if not isinstance(type_check, TypeCheck):
raise DagsterInvariantViolationError(
"Type checks can only return TypeCheck. Type {type_name} returned {value}.".format(
type_name=dagster_type.display_name, value=repr(type_check)
)
)
return type_check