import json
import os
import docker.client
from dagster_celery.config import DEFAULT_CONFIG, dict_wrapper
from dagster_celery.core_execution_loop import DELEGATE_MARKER, core_celery_execution_loop
from dagster_celery.defaults import broker_url, result_backend
from dagster_celery.executor import CELERY_CONFIG
from dagster import DagsterInstance, Executor, Field, MetadataEntry, Permissive, StringSource
from dagster import _check as check
from dagster import executor, multiple_process_executor_requirements
from dagster._cli.api import ExecuteStepArgs
from dagster._core.events import EngineEventData
from dagster._core.events.utils import filter_dagster_events_from_cli_logs
from dagster._core.execution.retries import RetryMode
from dagster._core.storage.pipeline_run import PipelineRun
from dagster._serdes import pack_value, serialize_dagster_namedtuple, unpack_value
from dagster._utils import merge_dicts
CELERY_DOCKER_CONFIG_KEY = "celery-docker"
def celery_docker_config():
additional_config = {
"docker": Field(
{
"image": Field(
StringSource,
is_required=False,
description="The docker image to be used for step execution.",
),
"registry": Field(
{
"url": Field(StringSource),
"username": Field(StringSource),
"password": Field(StringSource),
},
is_required=False,
description="Information for using a non local/public docker registry",
),
"env_vars": Field(
[str],
is_required=False,
description="The list of environment variables names to forward from the celery worker in to the docker container",
),
"network": Field(
str,
is_required=False,
description="Name of the network this container will be connected to at creation time",
),
"container_kwargs": Field(
Permissive(),
is_required=False,
description="Additional keyword args for the docker container",
),
},
is_required=True,
description="The configuration for interacting with docker in the celery worker.",
),
}
cfg = merge_dicts(CELERY_CONFIG, additional_config)
return cfg
[docs]@executor(
name=CELERY_DOCKER_CONFIG_KEY,
config_schema=celery_docker_config(),
requirements=multiple_process_executor_requirements(),
)
def celery_docker_executor(init_context):
"""Celery-based executor which launches tasks in docker containers.
The Celery executor exposes config settings for the underlying Celery app under
the ``config_source`` key. This config corresponds to the "new lowercase settings" introduced
in Celery version 4.0 and the object constructed from config will be passed to the
:py:class:`celery.Celery` constructor as its ``config_source`` argument.
(See https://docs.celeryq.dev/en/stable/userguide/configuration.html for details.)
The executor also exposes the ``broker``, `backend`, and ``include`` arguments to the
:py:class:`celery.Celery` constructor.
In the most common case, you may want to modify the ``broker`` and ``backend`` (e.g., to use
Redis instead of RabbitMQ). We expect that ``config_source`` will be less frequently
modified, but that when op executions are especially fast or slow, or when there are
different requirements around idempotence or retry, it may make sense to execute jobs
with variations on these settings.
To use the `celery_docker_executor`, set it as the `executor_def` when defining a job:
.. code-block:: python
from dagster import job
from dagster_celery_docker.executor import celery_docker_executor
@job(executor_def=celery_docker_executor)
def celery_enabled_job():
pass
Then you can configure the executor as follows:
.. code-block:: YAML
execution:
config:
docker:
image: 'my_repo.com/image_name:latest'
registry:
url: 'my_repo.com'
username: 'my_user'
password: {env: 'DOCKER_PASSWORD'}
env_vars: ["DAGSTER_HOME"] # environment vars to pass from celery worker to docker
container_kwargs: # keyword args to be passed to the container. example:
volumes: ['/home/user1/:/mnt/vol2','/var/www:/mnt/vol1']
broker: 'pyamqp://guest@localhost//' # Optional[str]: The URL of the Celery broker
backend: 'rpc://' # Optional[str]: The URL of the Celery results backend
include: ['my_module'] # Optional[List[str]]: Modules every worker should import
config_source: # Dict[str, Any]: Any additional parameters to pass to the
#... # Celery workers. This dict will be passed as the `config_source`
#... # argument of celery.Celery().
Note that the YAML you provide here must align with the configuration with which the Celery
workers on which you hope to run were started. If, for example, you point the executor at a
different broker than the one your workers are listening to, the workers will never be able to
pick up tasks for execution.
In deployments where the celery_docker_job_executor is used all appropriate celery and dagster_celery
commands must be invoked with the `-A dagster_celery_docker.app` argument.
"""
exc_cfg = init_context.executor_config
return CeleryDockerExecutor(
broker=exc_cfg.get("broker"),
backend=exc_cfg.get("backend"),
config_source=exc_cfg.get("config_source"),
include=exc_cfg.get("include"),
retries=RetryMode.from_config(exc_cfg.get("retries")),
docker_config=exc_cfg.get("docker"),
)
class CeleryDockerExecutor(Executor):
def __init__(
self,
retries,
docker_config,
broker=None,
backend=None,
include=None,
config_source=None,
):
self._retries = check.inst_param(retries, "retries", RetryMode)
self.broker = check.opt_str_param(broker, "broker", default=broker_url)
self.backend = check.opt_str_param(backend, "backend", default=result_backend)
self.include = check.opt_list_param(include, "include", of_type=str)
self.config_source = dict_wrapper(
dict(DEFAULT_CONFIG, **check.opt_dict_param(config_source, "config_source"))
)
self.docker_config = check.dict_param(docker_config, "docker_config")
@property
def retries(self):
return self._retries
def execute(self, plan_context, execution_plan):
return core_celery_execution_loop(
plan_context, execution_plan, step_execution_fn=_submit_task_docker
)
def app_args(self):
return {
"broker": self.broker,
"backend": self.backend,
"include": self.include,
"config_source": self.config_source,
"retries": self.retries,
}
def _submit_task_docker(app, plan_context, step, queue, priority, known_state):
execute_step_args = ExecuteStepArgs(
pipeline_origin=plan_context.reconstructable_pipeline.get_python_origin(),
pipeline_run_id=plan_context.pipeline_run.run_id,
step_keys_to_execute=[step.key],
instance_ref=plan_context.instance.get_ref(),
retry_mode=plan_context.executor.retries.for_inner_plan(),
known_state=known_state,
)
task = create_docker_task(app)
task_signature = task.si(
execute_step_args_packed=pack_value(execute_step_args),
docker_config=plan_context.executor.docker_config,
)
return task_signature.apply_async(
priority=priority,
queue=queue,
routing_key="{queue}.execute_step_docker".format(queue=queue),
)
def create_docker_task(celery_app, **task_kwargs):
@celery_app.task(bind=True, name="execute_step_docker", **task_kwargs)
def _execute_step_docker(
self,
execute_step_args_packed,
docker_config,
):
"""Run step execution in a Docker container."""
execute_step_args = unpack_value(
check.dict_param(
execute_step_args_packed,
"execute_step_args_packed",
)
)
check.inst_param(execute_step_args, "execute_step_args", ExecuteStepArgs)
check.dict_param(docker_config, "docker_config")
instance = DagsterInstance.from_ref(execute_step_args.instance_ref)
pipeline_run = instance.get_run_by_id(execute_step_args.pipeline_run_id)
check.inst(
pipeline_run,
PipelineRun,
"Could not load run {}".format(execute_step_args.pipeline_run_id),
)
step_keys_str = ", ".join(execute_step_args.step_keys_to_execute)
input_json = serialize_dagster_namedtuple(execute_step_args)
command = "dagster api execute_step {}".format(json.dumps(input_json))
docker_image = (
docker_config["image"]
if docker_config.get("image")
else pipeline_run.pipeline_code_origin.repository_origin.container_image
)
if not docker_image:
raise Exception("No docker image specified by either the job or the repository")
client = docker.client.from_env()
if docker_config.get("registry"):
client.login(
registry=docker_config["registry"]["url"],
username=docker_config["registry"]["username"],
password=docker_config["registry"]["password"],
)
# Post event for starting execution
engine_event = instance.report_engine_event(
"Executing steps {} in Docker container {}".format(step_keys_str, docker_image),
pipeline_run,
EngineEventData(
[
MetadataEntry("Step keys", value=step_keys_str),
MetadataEntry("Image", value=docker_image),
MetadataEntry("Celery worker", value=self.request.hostname),
],
marker_end=DELEGATE_MARKER,
),
CeleryDockerExecutor,
step_key=execute_step_args.step_keys_to_execute[0],
)
serialized_events = [serialize_dagster_namedtuple(engine_event)]
docker_env = {}
if docker_config.get("env_vars"):
docker_env = {env_name: os.getenv(env_name) for env_name in docker_config["env_vars"]}
container_kwargs = check.opt_dict_param(
docker_config.get("container_kwargs"), "container_kwargs", key_type=str
)
# set defaults for detach and auto_remove
container_kwargs["detach"] = container_kwargs.get("detach", False)
container_kwargs["auto_remove"] = container_kwargs.get("auto_remove", True)
# if environment variables are provided via container_kwargs, merge with env_vars
if container_kwargs.get("environment") is not None:
e_vars = container_kwargs.get("environment")
if isinstance(e_vars, dict):
docker_env.update(e_vars)
else:
for v in e_vars:
key, val = v.split("=")
docker_env[key] = val
del container_kwargs["environment"]
try:
docker_response = client.containers.run(
docker_image,
command=command,
# pass through this worker's environment for things like AWS creds etc.
environment=docker_env,
network=docker_config.get("network", None),
**container_kwargs,
)
res = docker_response.decode("utf-8")
except docker.errors.ContainerError as err:
entries = [MetadataEntry("Job image", value=docker_image)]
if err.stderr is not None:
entries.append(MetadataEntry("Docker stderr", value=err.stderr))
instance.report_engine_event(
"Failed to run steps {} in Docker container {}".format(step_keys_str, docker_image),
pipeline_run,
EngineEventData(entries),
CeleryDockerExecutor,
step_key=execute_step_args.step_keys_to_execute[0],
)
raise
else:
if res is None:
raise Exception("No response from execute_step in CeleryDockerExecutor")
events = filter_dagster_events_from_cli_logs(res.split("\n"))
serialized_events += [serialize_dagster_namedtuple(event) for event in events]
return serialized_events
return _execute_step_docker