Source code for dagster_azure.adls2.resources
from azure.storage.filedatalake import DataLakeLeaseClient

from dagster import Field, Selector, StringSource, resource
from dagster._utils.merger import merge_dicts
from dagster_azure.blob.utils import create_blob_client

from .file_manager import ADLS2FileManager
from .utils import create_adls2_client

ADLS2_CLIENT_CONFIG = {
    "storage_account": Field(StringSource, description="The storage account name."),
    "credential": Field(
        Selector(
            {
                "sas": Field(StringSource, description="SAS token for the account."),
                "key": Field(StringSource, description="Shared Access Key for the account."),
            }
        ),
        description="The credentials with which to authenticate.",
    ),
}


@resource(ADLS2_CLIENT_CONFIG)
def adls2_resource(context):
"""Resource that gives ops access to Azure Data Lake Storage Gen2.
The underlying client is a :py:class:`~azure.storage.filedatalake.DataLakeServiceClient`.
Attach this resource definition to a :py:class:`~dagster.JobDefinition` in order to make it
available to your ops.
Example:
.. code-block:: python
from dagster import job, op
from dagster_azure.adls2 import adls2_resource
@op(required_resource_keys={'adls2'})
def example_adls2_op(context):
return list(context.resources.adls2.adls2_client.list_file_systems())
@job(resource_defs={"adls2": adls2_resource})
def my_job():
example_adls2_op()
Note that your ops must also declare that they require this resource with
`required_resource_keys`, or it will not be initialized for the execution of their compute
functions.
You may pass credentials to this resource using either a SAS token or a key, using
environment variables if desired:
.. code-block:: YAML
resources:
adls2:
config:
storage_account: my_storage_account
# str: The storage account name.
credential:
sas: my_sas_token
# str: the SAS token for the account.
key:
env: AZURE_DATA_LAKE_STORAGE_KEY
# str: The shared access key for the account.
"""
return _adls2_resource_from_config(context.resource_config)
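

# Hedged usage sketch (not part of the original module): `configured` is the
# standard Dagster API for binding config to a resource definition in code rather
# than in run-config YAML. The account name and environment variable below are
# placeholders.
example_configured_adls2 = adls2_resource.configured(
    {
        "storage_account": "my_storage_account",  # placeholder account name
        "credential": {"key": {"env": "AZURE_DATA_LAKE_STORAGE_KEY"}},
    }
)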


@resource(
    merge_dicts(
        ADLS2_CLIENT_CONFIG,
        {
            "adls2_file_system": Field(StringSource, description="ADLS Gen2 file system name."),
            "adls2_prefix": Field(StringSource, is_required=False, default_value="dagster"),
        },
    )
)
def adls2_file_manager(context):
    """FileManager that provides abstract access to ADLS2.

    Implements the :py:class:`~dagster._core.storage.file_manager.FileManager` API.
    """
    adls2_client = _adls2_resource_from_config(context.resource_config).adls2_client

    return ADLS2FileManager(
        adls2_client=adls2_client,
        file_system=context.resource_config["adls2_file_system"],
        prefix=context.resource_config["adls2_prefix"],
    )
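

# Hedged usage sketch (illustrative only): `adls2_file_manager` implements the
# generic Dagster FileManager API, so an op can persist bytes and receive a file
# handle back. The resource key and op/job names below are placeholders.
from dagster import job, op


@op(required_resource_keys={"file_manager"})
def example_write_op(context):
    # write_data is part of the FileManager API; here it returns an ADLS2FileHandle.
    return context.resources.file_manager.write_data(b"hello")


@job(resource_defs={"file_manager": adls2_file_manager})
def example_file_manager_job():
    example_write_op()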


class ADLS2Resource:
    """Resource containing clients to access Azure Data Lake Storage Gen2.

    Contains a client for both the Data Lake and Blob APIs, to work around the limitations
    of each.
    """

    def __init__(self, storage_account, credential):
        self._adls2_client = create_adls2_client(storage_account, credential)
        self._blob_client = create_blob_client(storage_account, credential)
        self._lease_client_constructor = DataLakeLeaseClient

    @property
    def adls2_client(self):
        return self._adls2_client

    @property
    def blob_client(self):
        return self._blob_client

    @property
    def lease_client_constructor(self):
        return self._lease_client_constructor
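

# Hedged sketch (illustrative only) of why `lease_client_constructor` is exposed:
# callers can take out a lease on a file before mutating it. The helper name and
# arguments below are placeholders; `acquire(lease_duration=-1)` requests an
# infinite lease in the azure-storage-file-datalake API.
def _example_with_lease(adls2_resource_instance, file_client):
    lease_client = adls2_resource_instance.lease_client_constructor(file_client)
    lease_client.acquire(lease_duration=-1)
    try:
        pass  # mutate the file while holding the lease
    finally:
        lease_client.release()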


def _adls2_resource_from_config(config):
    """Create an ADLS2Resource from validated config.

    Args:
        config: A configuration dictionary containing the fields in ADLS2_CLIENT_CONFIG.

    Returns:
        ADLS2Resource: A resource wrapping clients for the configured storage account.
    """
    storage_account = config["storage_account"]
    # "credential" is a Selector, so the validated config contains exactly one entry
    # ("sas" or "key"); popitem() pulls out that credential's value.
    credential = config["credential"].copy().popitem()[1]
    return ADLS2Resource(storage_account, credential)
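

# Hedged illustration (placeholder token): a validated Selector config is a
# single-entry dict, so popitem() yields the chosen (credential_type, value) pair,
# which is how _adls2_resource_from_config extracts the raw credential above.
_example_credential_config = {"sas": "my_sas_token"}
assert _example_credential_config.copy().popitem() == ("sas", "my_sas_token")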