The dagster_databricks
package provides two main pieces of functionality:
A resource, databricks_pyspark_step_launcher
, which will execute a op within a Databricks
context on a cluster, such that the pyspark
resource uses the cluster’s Spark instance.
A function, create_databricks_job_op
, which creates a op that submits an external
configurable job to Databricks using the ‘Run Now’ API.
Note that, for the databricks_pyspark_step_launcher
, either S3 or Azure Data Lake Storage config
must be specified for ops to succeed, and the credentials for this storage must also be
stored as a Databricks Secret and stored in the resource config so that the Databricks cluster can
access storage.
Creates an op that launches a databricks job (not to be confused with Dagster’s job API).
As config, the op accepts a blob of the form described in Databricks’ job API: https://docs.databricks.com/dev-tools/api/latest/jobs.html.
An op definition.
Example
from dagster import graph
from dagster_databricks import create_databricks_job_op, databricks_client
sparkpi = create_databricks_job_op().configured(
{
"job": {
"name": "SparkPi Python job",
"new_cluster": {
"spark_version": "7.3.x-scala2.12",
"node_type_id": "i3.xlarge",
"num_workers": 2,
},
"spark_python_task": {"python_file": "dbfs:/docs/pi.py", "parameters": ["10"]},
}
},
name="sparkpi",
)
@graph
def my_spark():
sparkpi()
my_spark.to_job(
resource_defs={
"databricks_client": databricks_client.configured(
{"host": "my.workspace.url", "token": "my.access.token"}
)
}
)
Databricks job run configuration
The minimum number of workers to which the cluster can scale down when underutilized. It is also the initial number of workers the cluster will have after creation.
The maximum number of workers to which the cluster can scale up when overloaded. max_workers must be strictly greater than min_workers.
If num_workers, number of worker nodes that this cluster should have. A cluster has one Spark Driver and num_workers Executors for a total of num_workers + 1 Spark nodes.
The Spark version of the cluster. A list of available Spark versions can be retrieved by using the Runtime versions API call. This field is required.
An object containing a set of optional, user-specified Spark configuration key-value pairs. You can also pass in a string of extra JVM options to the driver and the executors via spark.driver.extraJavaOptions and spark.executor.extraJavaOptions respectively. Example Spark confs: {“spark.speculation”: true, “spark.streaming.ui.retainedBatches”: 5} or {“spark.driver.extraJavaOptions”: “-verbose:gc -XX:+PrintGCDetails”}
The nodes used in the cluster. Either the node types or an instance pool can be specified.
This field encodes, through a single value, the resources available to each of the Spark nodes in this cluster. For example, the Spark nodes can be provisioned and optimized for memory or compute intensive workloads. A list of available node types can be retrieved by using the List node types API call. This field is required.
The node type of the Spark driver. This field is optional; if unset, the driver node type is set as the same value as node_type_id defined above.
The optional ID of the instance pool to which the cluster belongs. Refer to the Instance Pools API for details.
Attributes related to clusters running on Amazon Web Services. If not specified at cluster creation, a set of default values is used. See aws_attributes at https://docs.databricks.com/dev-tools/api/latest/clusters.html.
The first first_on_demand nodes of the cluster will be placed on on-demand instances. If this value is greater than 0, the cluster driver node will be placed on an on-demand instance. If this value is greater than or equal to the current cluster size, all nodes will be placed on on-demand instances. If this value is less than the current cluster size, first_on_demand nodes will be placed on on-demand instances and the remainder will be placed on availability instances. This value does not affect cluster size and cannot be mutated over the lifetime of a cluster.
Availability type used for all subsequent nodes past the first_on_demand ones. Note: If first_on_demand is zero, this availability type will be used for the entire cluster.
Identifier for the availability zone/datacenter in which the cluster resides.
Nodes for this cluster will only be placed on AWS instances with this instance profile.
The max price for AWS spot instances, as a percentage of the corresponding instance type’s on-demand price.
The type of EBS volumes that will be launched with this cluster.
The number of volumes launched for each instance. You can choose up to 10 volumes.
The size of each EBS volume (in GiB) launched for each instance.
The number of IOPS per EBS gp3 volume.
The throughput per EBS gp3 volume, in MiB per second.
SSH public key contents that will be added to each Spark node in this cluster. The corresponding private keys can be used to login with the user name ubuntu on port 2200. Up to 10 keys can be specified.
Additional tags for cluster resources. Databricks tags all cluster resources (e.g., AWS instances and EBS volumes) with these tags in addition to default_tags. Note: - Tags are not supported on legacy node types such as compute-optimized and memory-optimized - Databricks allows at most 45 custom tagsMore restrictions may apply if using Azure Databricks; refer to the official docs for further details.
Recommended! The configuration for delivering Spark logs to a long-term storage destination. Only one destination can be specified for one cluster. If the conf is given, the logs will be delivered to the destination every 5 mins. The destination of driver logs is <destination>/<cluster-id>/driver, while the destination of executor logs is <destination>/<cluster-id>/executor.
DBFS storage information
DBFS destination, e.g. dbfs:/my/path
S3 storage information
S3 destination, e.g. s3://my-bucket/some-prefix. You must configure the cluster with an instance profile and the instance profile must have write access to the destination. You cannot use AWS keys.
S3 region, e.g. us-west-2. Either region or endpoint must be set. If both are set, endpoint is used.
S3 endpoint, e.g. https://s3-us-west-2.amazonaws.com. Either region or endpoint must be set. If both are set, endpoint is used.
(Optional) Enable server side encryption, false by default.
(Optional) The encryption type, it could be sse-s3 or sse-kms. It is used only when encryption is enabled and the default type is sse-s3.
(Optional) KMS key used if encryption is enabled and encryption type is set to sse-kms.
(Optional) Set canned access control list, e.g. bucket-owner-full-control.If canned_acl is set, the cluster instance profile must have s3:PutObjectAcl permission on the destination bucket and prefix. The full list of possible canned ACLs can be found at https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html#canned-acl. By default only the object owner gets full control. If you are using cross account role for writing data, you may want to set bucket-owner-full-control to make bucket owner able to read the logs.
The configuration for storing init scripts. Any number of scripts can be specified. The scripts are executed sequentially in the order provided. If cluster_log_conf is specified, init script logs are sent to <destination>/<cluster-id>/init_scripts.
An object containing a set of optional, user-specified environment variable key-value pairs. Key-value pair of the form (X,Y) are exported as is (i.e., export X=”Y”) while launching the driver and workers. To specify an additional set of SPARK_DAEMON_JAVA_OPTS, we recommend appending them to $SPARK_DAEMON_JAVA_OPTS as shown in the example below. This ensures that all default Databricks managed environmental variables are included as well. Example Spark environment variables: {“SPARK_WORKER_MEMORY”: “28000m”, “SPARK_LOCAL_DIRS”: “/local_disk0”} or {“SPARK_DAEMON_JAVA_OPTS”: “$SPARK_DAEMON_JAVA_OPTS -Dspark.shuffle.service.enabled=true”}
Autoscaling Local Storage: when enabled, this cluster dynamically acquires attitional disk space when its Spark workers are running low on disk space. This feature requires specific AWS permissions to function correctly - refer to https://docs.databricks.com/clusters/configure.html#autoscaling-local-storage for details.
The ID of an existing cluster that will be used for all runs of this job. When running jobs on an existing cluster, you may need to manually restart the cluster if it stops responding. Databricks suggests running jobs on new clusters for greater reliability.
An optional name for the run. The default value is Untitled
An optional list of libraries to be installed on the cluster that will execute the job. By default dagster, dagster-databricks and dagster-pyspark libraries will be included.
An optional timeout applied to each run of this job. The default behavior is to have no timeout.
An optional token that can be used to guarantee the idempotency of job run requests.If an active run with the provided token already exists, the request will not create a new run, but will return the ID of the existing run instead. If you specify the idempotency token, upon failure you can retry until the request succeeds. Databricks guarantees that exactly one run will be launched with that idempotency token. This token should have at most 64 characters.
{}
job permission spec; ref: https://docs.databricks.com/security/access-control/jobs-acl.html#job-permissions
cluster permission spec; ref: https://docs.databricks.com/security/access-control/cluster-acl.html#cluster-level-permissions
Databricks host, e.g. uksouth.azuredatabricks.com
Databricks access token
Dictionary of arbitrary environment variables to be set on the databricks cluster.
Databricks secrets to be exported as environment variables. Since runs will execute in the Databricks runtime environment, environment variables (such as those required for a StringSource config variable) will not be accessible to Dagster. These variables must be stored as Databricks secrets and specified here, which will ensure they are re-exported as environment variables accessible to Dagster upon execution.
Databricks storage configuration for either S3 or ADLS2. If access credentials for your Databricks storage are stored in Databricks secrets, this config indicates the secret scope and the secret keys used to access either S3 or ADLS2.
S3 storage secret configuration
The Databricks secret scope containing the storage secrets.
The key of a Databricks secret containing the S3 access key ID.
The key of a Databricks secret containing the S3 secret access key.
ADLS2 storage secret configuration
The Databricks secret scope containing the storage secrets.
The name of the storage account used to access data.
The key of a Databricks secret containing the storage account secret key.
Absolute path to root python package containing your Dagster code. If you set this value to a directory lower than the root package, and have user relative imports in your code (e.g. from .foo import bar), it’s likely you’ll encounter an import error on the remote step. Before every step run, the launcher will zip up the code in this local path, upload it to DBFS, and unzip it into the Python path of the remote Spark process. This gives the remote process access to up-to-date user code.
Absolute path to root python package containing your Dagster code. If you set this value to a directory lower than the root package, and have user relative imports in your code (e.g. from .foo import bar), it’s likely you’ll encounter an import error on the remote step. Before every step run, the launcher will zip up the code in this local path, upload it to DBFS, and unzip it into the Python path of the remote Spark process. This gives the remote process access to up-to-date user code.
Directory in DBFS to use for uploaded job code. Must be absolute.
Default Value: ‘/dagster_staging’
If set, and if the specified cluster is configured to export logs, the system will wait after job completion for the logs to appear in the configured location. Note that logs are copied every 5 minutes, so enabling this will add several minutes to the job runtime. NOTE: this integration will export stdout/stderrfrom the remote Databricks process automatically, so this option is not generally necessary.
Default Value: False
If the Databricks job run takes more than this many seconds, then consider it failed and terminate the step.
Default Value: 86400
How frequently Dagster will poll Databricks to determine the state of the job.
Default Value: 5.0
Resource for running ops as a Databricks Job.
When this resource is used, the op will be executed in Databricks using the ‘Run Submit’ API. Pipeline code will be zipped up and copied to a directory in DBFS along with the op’s execution context.
Use the ‘run_config’ configuration to specify the details of the Databricks cluster used, and the ‘storage’ key to configure persistent storage on that cluster. Storage is accessed by setting the credentials in the Spark context, as documented here for S3 and here for ADLS.