Source code for dagster_celery_k8s.launcher

import sys
from typing import cast

import kubernetes
from dagster_k8s.job import (
    DagsterK8sJobConfig,
    construct_dagster_k8s_job,
    get_job_name_from_run_id,
    get_user_defined_k8s_config,
)
from dagster_k8s.utils import delete_job

from dagster import DagsterInvariantViolationError, MetadataEntry
from dagster import _check as check
from dagster._config import process_config, resolve_to_config_type
from dagster._core.events import EngineEventData
from dagster._core.execution.retries import RetryMode
from dagster._core.launcher import LaunchRunContext, RunLauncher
from dagster._core.launcher.base import CheckRunHealthResult, WorkerStatus
from dagster._core.origin import PipelinePythonOrigin
from dagster._core.storage.pipeline_run import PipelineRun, PipelineRunStatus
from dagster._core.storage.tags import DOCKER_IMAGE_TAG
from dagster._serdes import ConfigurableClass, ConfigurableClassData
from dagster._utils import frozentags, merge_dicts
from dagster._utils.error import serializable_error_info_from_exc_info

from .config import CELERY_K8S_CONFIG_KEY, celery_k8s_executor_config


class CeleryK8sRunLauncher(RunLauncher, ConfigurableClass):
    """In contrast to the :py:class:`K8sRunLauncher`, which launches dagster runs as single K8s
    Jobs, this run launcher is intended for use in concert with
    :py:func:`dagster_celery_k8s.celery_k8s_job_executor`.

    With this run launcher, execution is delegated to:

        1. A run worker Kubernetes Job, which traverses the dagster run execution plan and
           submits steps to Celery queues for execution;
        2. The step executions which are submitted to Celery queues are picked up by Celery
           workers, and each step execution spawns a step execution Kubernetes Job. See the
           implementation defined in :py:func:`dagster_celery_k8s.executor.create_k8s_job_task`.

    You can configure a Dagster instance to use this RunLauncher by adding a section to your
    ``dagster.yaml`` like the following:

    .. code-block:: yaml

        run_launcher:
          module: dagster_celery_k8s.launcher
          class: CeleryK8sRunLauncher
          config:
            instance_config_map: "dagster-k8s-instance-config-map"
            dagster_home: "/some/path"
            postgres_password_secret: "dagster-k8s-pg-password"
            broker: "some_celery_broker_url"
            backend: "some_celery_backend_url"

    """

    def __init__(
        self,
        instance_config_map,
        dagster_home,
        postgres_password_secret,
        load_incluster_config=True,
        kubeconfig_file=None,
        broker=None,
        backend=None,
        include=None,
        config_source=None,
        retries=None,
        inst_data=None,
        k8s_client_batch_api=None,
        env_config_maps=None,
        env_secrets=None,
        volume_mounts=None,
        volumes=None,
        service_account_name=None,
        image_pull_policy=None,
        image_pull_secrets=None,
        labels=None,
        fail_pod_on_run_failure=None,
    ):
        self._inst_data = check.opt_inst_param(inst_data, "inst_data", ConfigurableClassData)

        if load_incluster_config:
            check.invariant(
                kubeconfig_file is None,
                "`kubeconfig_file` is set but `load_incluster_config` is True.",
            )
            kubernetes.config.load_incluster_config()
        else:
            check.opt_str_param(kubeconfig_file, "kubeconfig_file")
            kubernetes.config.load_kube_config(kubeconfig_file)

        self._fixed_batch_api = k8s_client_batch_api

        self.instance_config_map = check.str_param(instance_config_map, "instance_config_map")
        self.dagster_home = check.str_param(dagster_home, "dagster_home")
        self.postgres_password_secret = check.str_param(
            postgres_password_secret, "postgres_password_secret"
        )
        self.broker = check.opt_str_param(broker, "broker")
        self.backend = check.opt_str_param(backend, "backend")
        self.include = check.opt_list_param(include, "include")
        self.config_source = check.opt_dict_param(config_source, "config_source")

        retries = check.opt_dict_param(retries, "retries") or {"enabled": {}}
        self.retries = RetryMode.from_config(retries)

        self._env_config_maps = check.opt_list_param(
            env_config_maps, "env_config_maps", of_type=str
        )
        self._env_secrets = check.opt_list_param(env_secrets, "env_secrets", of_type=str)
        self._volume_mounts = check.opt_list_param(volume_mounts, "volume_mounts")
        self._volumes = check.opt_list_param(volumes, "volumes")
        self._service_account_name = check.opt_str_param(
            service_account_name, "service_account_name"
        )
        self._image_pull_policy = check.opt_str_param(
            image_pull_policy, "image_pull_policy", "IfNotPresent"
        )
        self._image_pull_secrets = check.opt_list_param(
            image_pull_secrets, "image_pull_secrets", of_type=dict
        )
        self._labels = check.opt_dict_param(labels, "labels", key_type=str, value_type=str)
        self._fail_pod_on_run_failure = check.opt_bool_param(
            fail_pod_on_run_failure, "fail_pod_on_run_failure"
        )

        super().__init__()

    @property
    def _batch_api(self):
        return self._fixed_batch_api if self._fixed_batch_api else kubernetes.client.BatchV1Api()

    @classmethod
    def config_type(cls):
        from dagster_celery.executor import CELERY_CONFIG

        return merge_dicts(DagsterK8sJobConfig.config_type_run_launcher(), CELERY_CONFIG)

    @classmethod
    def from_config_value(cls, inst_data, config_value):
        return cls(inst_data=inst_data, **config_value)

    @property
    def inst_data(self):
        return self._inst_data

    def launch_run(self, context: LaunchRunContext) -> None:
        run = context.pipeline_run

        job_name = get_job_name_from_run_id(run.run_id)
        pod_name = job_name
        exc_config = _get_validated_celery_k8s_executor_config(run.run_config)

        job_image_from_executor_config = exc_config.get("job_image")

        pipeline_origin = cast(PipelinePythonOrigin, context.pipeline_code_origin)
        repository_origin = pipeline_origin.repository_origin

        job_image = repository_origin.container_image

        if job_image:
            if job_image_from_executor_config:
                # Report before reassigning job_image, so the message shows the
                # user-code deployment image rather than the executor-config image.
                self._instance.report_engine_event(
                    f"You have specified a job_image {job_image_from_executor_config} in your "
                    f"executor configuration, but also {job_image} in your user-code deployment. "
                    f"Using the job image {job_image_from_executor_config} from executor "
                    "configuration as it takes precedence.",
                    run,
                    cls=self.__class__,
                )
                job_image = job_image_from_executor_config
        else:
            if not job_image_from_executor_config:
                raise DagsterInvariantViolationError(
                    "You have not specified a job_image in your executor configuration. "
                    "To resolve this error, specify the job_image configuration in the executor "
                    "config section in your run config. \n"
                    "Note: You may also be seeing this error because you are using the configured API. "
                    "Using configured with the celery-k8s executor is not supported at this time, "
                    "and the job_image must be configured at the top-level executor config without "
                    "using configured."
                )

            job_image = job_image_from_executor_config

        job_config = self.get_k8s_job_config(job_image, exc_config)

        self._instance.add_run_tags(
            run.run_id,
            {DOCKER_IMAGE_TAG: job_config.job_image},
        )

        user_defined_k8s_config = get_user_defined_k8s_config(frozentags(run.tags))

        from dagster._cli.api import ExecuteRunArgs

        run_args = ExecuteRunArgs(
            pipeline_origin=pipeline_origin,
            pipeline_run_id=run.run_id,
            instance_ref=self._instance.get_ref(),
            set_exit_code_on_failure=self._fail_pod_on_run_failure,
        ).get_command_args()

        job = construct_dagster_k8s_job(
            job_config,
            args=run_args,
            job_name=job_name,
            pod_name=pod_name,
            component="run_worker",
            user_defined_k8s_config=user_defined_k8s_config,
            labels={
                "dagster/job": pipeline_origin.pipeline_name,
                "dagster/run-id": run.run_id,
            },
        )

        job_namespace = exc_config.get("job_namespace")

        self._instance.report_engine_event(
            "Creating Kubernetes run worker job",
            run,
            EngineEventData(
                [
                    MetadataEntry("Kubernetes Job name", value=job_name),
                    MetadataEntry("Kubernetes Namespace", value=job_namespace),
                    MetadataEntry("Run ID", value=run.run_id),
                ]
            ),
            cls=self.__class__,
        )

        self._batch_api.create_namespaced_job(body=job, namespace=job_namespace)
        self._instance.report_engine_event(
            "Kubernetes run worker job created",
            run,
            EngineEventData(
                [
                    MetadataEntry("Kubernetes Job name", value=job_name),
                    MetadataEntry("Kubernetes Namespace", value=job_namespace),
                    MetadataEntry("Run ID", value=run.run_id),
                ]
            ),
            cls=self.__class__,
        )

    def get_k8s_job_config(self, job_image, exc_config):
        return DagsterK8sJobConfig(
            dagster_home=self.dagster_home,
            instance_config_map=self.instance_config_map,
            postgres_password_secret=self.postgres_password_secret,
            job_image=check.opt_str_param(job_image, "job_image"),
            image_pull_policy=exc_config.get("image_pull_policy", self._image_pull_policy),
            image_pull_secrets=exc_config.get("image_pull_secrets", [])
            + self._image_pull_secrets,
            service_account_name=exc_config.get(
                "service_account_name", self._service_account_name
            ),
            env_config_maps=exc_config.get("env_config_maps", []) + self._env_config_maps,
            env_secrets=exc_config.get("env_secrets", []) + self._env_secrets,
            volume_mounts=exc_config.get("volume_mounts", []) + self._volume_mounts,
            volumes=exc_config.get("volumes", []) + self._volumes,
            labels=merge_dicts(self._labels, exc_config.get("labels", {})),
        )

    # https://github.com/dagster-io/dagster/issues/2741
    def can_terminate(self, run_id):
        check.str_param(run_id, "run_id")

        pipeline_run = self._instance.get_run_by_id(run_id)
        if not pipeline_run:
            return False
        if pipeline_run.status != PipelineRunStatus.STARTED:
            return False
        return True

    def terminate(self, run_id):
        check.str_param(run_id, "run_id")

        run = self._instance.get_run_by_id(run_id)
        if not run:
            return False

        can_terminate = self.can_terminate(run_id)
        if not can_terminate:
            self._instance.report_engine_event(
                message="Unable to terminate dagster job: can_terminate returned {}.".format(
                    can_terminate
                ),
                pipeline_run=run,
                cls=self.__class__,
            )
            return False

        job_name = get_job_name_from_run_id(run_id)
        job_namespace = self.get_namespace_from_run_config(run_id)

        self._instance.report_run_canceling(run)

        try:
            termination_result = delete_job(job_name=job_name, namespace=job_namespace)
            if termination_result:
                self._instance.report_engine_event(
                    message="Dagster Job was terminated successfully.",
                    pipeline_run=run,
                    cls=self.__class__,
                )
            else:
                self._instance.report_engine_event(
                    message="Dagster Job was not terminated successfully; delete_job returned {}".format(
                        termination_result
                    ),
                    pipeline_run=run,
                    cls=self.__class__,
                )
            return termination_result
        except Exception:
            self._instance.report_engine_event(
                message="Dagster Job was not terminated successfully; encountered error in delete_job",
                pipeline_run=run,
                engine_event_data=EngineEventData.engine_error(
                    serializable_error_info_from_exc_info(sys.exc_info())
                ),
                cls=self.__class__,
            )

    def get_namespace_from_run_config(self, run_id):
        check.str_param(run_id, "run_id")

        pipeline_run = self._instance.get_run_by_id(run_id)
        run_config = pipeline_run.run_config
        executor_config = _get_validated_celery_k8s_executor_config(run_config)
        return executor_config.get("job_namespace")

    @property
    def supports_check_run_worker_health(self):
        return True

    def check_run_worker_health(self, run: PipelineRun):
        job_namespace = _get_validated_celery_k8s_executor_config(run.run_config).get(
            "job_namespace"
        )
        job_name = get_job_name_from_run_id(run.run_id)
        try:
            job = self._batch_api.read_namespaced_job(namespace=job_namespace, name=job_name)
        except Exception:
            return CheckRunHealthResult(
                WorkerStatus.UNKNOWN, str(serializable_error_info_from_exc_info(sys.exc_info()))
            )
        if job.status.failed:
            return CheckRunHealthResult(WorkerStatus.FAILED, "K8s job failed")
        return CheckRunHealthResult(WorkerStatus.RUNNING)
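

# Illustrative sketch, not used by the launcher: the job-image precedence rule
# that launch_run applies above, extracted into a hypothetical helper for
# clarity. An image set in the executor config wins over the image recorded on
# the user-code deployment; if neither is set, the launch fails with
# DagsterInvariantViolationError, just as launch_run does.
def _example_resolve_job_image(executor_config_image, user_code_deployment_image):
    if executor_config_image:
        # Executor config takes precedence, even when the user-code deployment
        # also records a container image.
        return executor_config_image
    if user_code_deployment_image:
        # Fall back to the image recorded on the repository origin.
        return user_code_deployment_image
    raise DagsterInvariantViolationError(
        "You have not specified a job_image in your executor configuration."
    )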

def _get_validated_celery_k8s_executor_config(run_config):
    check.dict_param(run_config, "run_config")

    executor_config = run_config.get("execution", {})
    execution_config_schema = resolve_to_config_type(celery_k8s_executor_config())

    # In run config on jobs, we don't have an executor key
    if CELERY_K8S_CONFIG_KEY not in executor_config:
        execution_run_config = executor_config.get("config", {})
    else:
        execution_run_config = (run_config["execution"][CELERY_K8S_CONFIG_KEY] or {}).get(
            "config", {}
        )

    res = process_config(execution_config_schema, execution_run_config)

    check.invariant(
        res.success,
        "Incorrect execution schema provided. Note: You may also be seeing this error "
        "because you are using the configured API. "
        "Using configured with the {config_key} executor is not supported at this time, "
        "and all executor config must be directly in the run config without using configured.".format(
            config_key=CELERY_K8S_CONFIG_KEY,
        ),
    )

    return res.value
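

# A hedged usage sketch, runnable only when this module is executed directly:
# the two run-config shapes the helper above accepts, mirroring its two
# branches. The image and namespace values are placeholders, not defaults, and
# this block is illustrative rather than part of the launcher's API.
if __name__ == "__main__":
    # Pipeline-style run config: executor config nests under the celery-k8s
    # executor key.
    pipeline_style = {
        "execution": {
            CELERY_K8S_CONFIG_KEY: {
                "config": {"job_image": "example/image:tag", "job_namespace": "dagster"}
            }
        }
    }
    # Job-style run config: no executor key under "execution", so the config
    # dict sits directly at "execution.config".
    job_style = {
        "execution": {"config": {"job_image": "example/image:tag", "job_namespace": "dagster"}}
    }
    for cfg in (pipeline_style, job_style):
        print(_get_validated_celery_k8s_executor_config(cfg))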