The dagster-airflow package allows you to export Dagster jobs as Airflow DAGs, as well as to import Airflow DAGs into Dagster jobs.
Dagster is a fully-featured orchestrator and does not require a system like Airflow to deploy, execute, or schedule jobs. The sections below cover the main scenarios for using Dagster with Airflow.
You can compile Dagster jobs into DAGs that can be understood by Airflow. Each op in the job becomes an Airflow task. For example, here's a Dagster job:
import csv
import requests
from dagster import job, op
@op
def download_cereals():
    response = requests.get("https://docs.dagster.io/assets/cereal.csv")
    lines = response.text.split("\n")
    return [row for row in csv.DictReader(lines)]

@op
def find_sugariest(context, cereals):
    sorted_by_sugar = sorted(cereals, key=lambda cereal: cereal["sugars"])
    context.log.info(f'{sorted_by_sugar[-1]["name"]} is the sugariest cereal')

@job
def hello_cereal_job():
    find_sugariest(download_cereals())
To make this job available inside Airflow, you can write an Airflow DAG definition file that invokes make_airflow_dag:
# pylint: disable=unused-variable
import datetime
from dagster_airflow.factory import make_airflow_dag
DEFAULT_ARGS = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime.datetime(2019, 11, 7),
    "email": ["airflow@example.com"],
    "email_on_failure": False,
    "email_on_retry": False,
}

dag, tasks = make_airflow_dag(
    module_name="docs_snippets.integrations.airflow.hello_cereal",
    job_name="hello_cereal_job",
    dag_kwargs={"default_args": DEFAULT_ARGS, "max_active_runs": 1},
)
If you run this code interactively, you'll see that dag and tasks are ordinary Airflow objects, just as you'd expect to see when defining an Airflow pipeline manually:
>>> dag
<DAG: hello_cereal_job>
>>> tasks
[<Task(DagsterPythonOperator): hello_cereal>]
Like other Airflow DAG definition files, this file should go inside $AIRFLOW_HOME/dags. The docs_snippets.integrations.airflow.hello_cereal module that's passed as the value for the module_name argument must be importable via sys.path.
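A common pitfall here is the importability requirement: the process loading the DAG definition file must be able to import the module named by module_name. Below is a minimal sketch of how you might verify importability before deploying; the directory and module names are hypothetical stand-ins for your own project layout:

```python
import importlib
import os
import sys
import tempfile

# Create a throwaway directory with a hypothetical module standing in
# for your project root (in practice this is your real code location).
project_root = tempfile.mkdtemp()
with open(os.path.join(project_root, "my_dagster_jobs.py"), "w") as f:
    f.write("JOB_NAME = 'hello_cereal_job'\n")

# If the project root isn't already on sys.path, append it so the
# module name you'd pass to make_airflow_dag can be resolved.
if project_root not in sys.path:
    sys.path.append(project_root)

# This import succeeding is exactly the condition Airflow needs.
module = importlib.import_module("my_dagster_jobs")
print(module.JOB_NAME)
```

In a real deployment you would typically ensure importability by installing your package into the environment Airflow runs in, rather than mutating sys.path at runtime.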
After this, the DAG should show up inside Airflow.
The approach above runs each op inside an operator that's similar to the Airflow PythonOperator. If you instead want to containerize your Dagster job and run it using an operator that's similar to the Airflow DockerOperator, you can use make_airflow_dag_containerized.
As in the uncontainerized case, you'll put a new Python file defining your DAG in the directory in which Airflow looks for DAGs:
# pylint: disable=unused-variable
import datetime
from dagster_airflow.factory import make_airflow_dag_containerized
DEFAULT_ARGS = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime.datetime(2019, 11, 7),
    "email": ["airflow@example.com"],
    "email_on_failure": False,
    "email_on_retry": False,
}

dag, steps = make_airflow_dag_containerized(
    module_name="docs_snippets.integrations.airflow.hello_cereal",
    job_name="hello_cereal_job",
    image="dagster-airflow-demo-repository",
    dag_kwargs={"default_args": DEFAULT_ARGS, "max_active_runs": 1},
)
The image argument is the name of the Docker image. Running in a containerized context requires a persistent intermediate storage layer available to the Dagster containers, such as a network filesystem, S3, or GCS. You can pass op_kwargs through to the DagsterDockerOperator to use custom TLS settings, the private registry of your choice, etc., just as you would configure the ordinary Airflow DockerOperator.
If you want your containerized job to be available to Airflow operators running on other machines (for example, in environments where Airflow workers are running remotely) you'll need to push your Docker image to a Docker registry so that remote instances of Docker can pull the image by name, or otherwise ensure that the image is available on remote nodes.
Dagster can convert Airflow operators to Dagster ops for any Airflow operators that require no instance-wide configuration via airflow.cfg. Using airflow_operator_to_op, you can convert instantiated Airflow operators to be executed within a Dagster op. Optionally, you can pass in a list of Airflow connection objects utilized by the operator:
# Import paths may vary with your Airflow version; in Airflow 2+,
# SimpleHttpOperator lives in airflow.providers.http.operators.http.
from airflow.models import Connection
from airflow.operators.http_operator import SimpleHttpOperator

from dagster import job
from dagster_airflow import airflow_operator_to_op

http_task = SimpleHttpOperator(task_id="http_task", method="GET", endpoint="images")
connections = [Connection(conn_id="http_default", host="https://google.com")]
dagster_op = airflow_operator_to_op(http_task, connections=connections)

@job
def my_http_job():
    dagster_op()
To specify extra parameters on an instantiated Airflow connection, pass a JSON string to the set_extra() method of the Connection object:
import json

s3_conn = Connection(conn_id="s3_conn", conn_type="s3")
s3_conn.set_extra(
    json.dumps(
        {
            "aws_access_key_id": "my_access_key",
            "aws_secret_access_key": "my_secret_key",
        }
    )
)
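Because set_extra() stores a plain JSON string, you can sanity-check the payload with the standard library alone before attaching it to a connection. A minimal sketch, mirroring the keys from the example above (no Airflow installation is assumed here):

```python
import json

# Build the extras payload exactly as it would be passed to set_extra().
extra = json.dumps(
    {
        "aws_access_key_id": "my_access_key",
        "aws_secret_access_key": "my_secret_key",
    }
)

# Round-trip it to confirm it is valid JSON with the expected fields.
decoded = json.loads(extra)
print(sorted(decoded.keys()))
```

Catching a malformed extras string this way is cheaper than debugging a failed connection at task runtime.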
This example demonstrates how to use make_dagster_job_from_airflow_dag to compile an Airflow DAG into a Dagster job that can be executed (and explored) the same way as a Dagster-native job.
There are two jobs in the repo:
airflow_simple_dag demonstrates the use of Airflow templates.
airflow_complex_dag shows the translation of a more complex dependency structure.
from dagster_airflow.dagster_job_factory import make_dagster_job_from_airflow_dag
from with_airflow.airflow_complex_dag import complex_dag
from with_airflow.airflow_simple_dag import simple_dag
from dagster import repository
airflow_simple_dag = make_dagster_job_from_airflow_dag(simple_dag)
airflow_complex_dag = make_dagster_job_from_airflow_dag(complex_dag)
@repository
def with_airflow():
    return [airflow_complex_dag, airflow_simple_dag]
Note that the "execution_date" for the Airflow DAG is specified through the job tags. To specify tags, call:
airflow_simple_dag_with_execution_date = make_dagster_job_from_airflow_dag(
    dag=simple_dag, tags={"airflow_execution_date": "2021-11-01 00:00:00"}
)
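The airflow_execution_date tag value must be a parseable datetime string. A quick way to validate the format used above with the standard library (a sketch; Dagster's actual parsing may accept other formats as well):

```python
from datetime import datetime

# The tag value from the example above.
execution_date = "2021-11-01 00:00:00"

# Confirm it parses as a datetime before using it as a job tag,
# rather than discovering a typo at job launch time.
parsed = datetime.strptime(execution_date, "%Y-%m-%d %H:%M:%S")
print(parsed.isoformat())
```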