# Source code for dagster_fivetran.asset_defs

from typing import List, Optional

from dagster_fivetran.resources import DEFAULT_POLL_INTERVAL
from dagster_fivetran.utils import generate_materializations

from dagster import AssetKey, AssetOut, AssetsDefinition, Output
from dagster import _check as check
from dagster import multi_asset
from dagster._annotations import experimental


@experimental
def build_fivetran_assets(
    connector_id: str,
    destination_tables: List[str],
    poll_interval: float = DEFAULT_POLL_INTERVAL,
    poll_timeout: Optional[float] = None,
    io_manager_key: Optional[str] = None,
    asset_key_prefix: Optional[List[str]] = None,
) -> List[AssetsDefinition]:
    """
    Build a set of assets for a given Fivetran connector.

    Returns a single-element list containing an AssetsDefinition which connects the asset keys
    derived from ``destination_tables`` to the computation that will update them. Internally,
    executes a Fivetran sync for a given ``connector_id``, and polls until that sync completes,
    raising an error if it is unsuccessful. Requires the use of the
    :py:class:`~dagster_fivetran.fivetran_resource`, which allows it to communicate with the
    Fivetran API.

    Args:
        connector_id (str): The Fivetran Connector ID that this op will sync. You can retrieve
            this value from the "Setup" tab of a given connector in the Fivetran UI.
        destination_tables (List[str]): `schema_name.table_name` for each table that you want to
            be represented in the Dagster asset graph for this connection.
        poll_interval (float): The time (in seconds) that will be waited between successive
            polls.
        poll_timeout (Optional[float]): The maximum time that will be waited before this
            operation is timed out. By default, this will never time out.
        io_manager_key (Optional[str]): The io_manager to be used to handle each of these assets.
        asset_key_prefix (Optional[List[str]]): A prefix for the asset keys inside this asset.
            If left blank, assets will have a key of `AssetKey([schema_name, table_name])`.

    Returns:
        List[AssetsDefinition]: A list holding the one multi-asset definition that performs the
        sync and emits an output for every tracked destination table.

    Examples:

        .. code-block:: python

            from dagster import repository, with_resources

            from dagster_fivetran import fivetran_resource
            from dagster_fivetran.asset_defs import build_fivetran_assets

            my_fivetran_resource = fivetran_resource.configured(
                {
                    "api_key": {"env": "FIVETRAN_API_KEY"},
                    "api_secret": {"env": "FIVETRAN_API_SECRET"},
                }
            )

            fivetran_assets = build_fivetran_assets(
                connector_id="foobar",
                destination_tables=["schema1.table1", "schema2.table2"],
            )

            @repository
            def repo():
                return with_resources(
                    fivetran_assets,
                    resource_defs={"fivetran": my_fivetran_resource},
                )

    """
    # Normalizes a missing prefix so it can be concatenated with the table path below.
    asset_key_prefix = check.opt_list_param(asset_key_prefix, "asset_key_prefix", of_type=str)

    # One key per requested table: the prefix followed by the dot-separated parts of the
    # table identifier. A set gives cheap membership tests inside the op body.
    tracked_asset_keys = {
        AssetKey(asset_key_prefix + table.split(".")) for table in destination_tables
    }

    @multi_asset(
        name=f"fivetran_sync_{connector_id}",
        outs={
            # The output name is the key path joined by "_"; the op body rebuilds the same
            # name from each materialization's key to route it to the matching output.
            "_".join(key.path): AssetOut(io_manager_key=io_manager_key, key=key)
            for key in tracked_asset_keys
        },
        required_resource_keys={"fivetran"},
        compute_kind="fivetran",
    )
    def _assets(context):
        fivetran_output = context.resources.fivetran.sync_and_poll(
            connector_id=connector_id,
            poll_interval=poll_interval,
            poll_timeout=poll_timeout,
        )
        for materialization in generate_materializations(
            fivetran_output, asset_key_prefix=asset_key_prefix
        ):
            # scan through all tables actually created, if it was expected then emit an Output.
            # otherwise, emit a runtime AssetMaterialization
            if materialization.asset_key in tracked_asset_keys:
                yield Output(
                    value=None,
                    output_name="_".join(materialization.asset_key.path),
                    metadata={
                        entry.label: entry.entry_data
                        for entry in materialization.metadata_entries
                    },
                )
            else:
                yield materialization

    return [_assets]