Partitioning allows a job or software-defined asset to correspond to a set of entities with identical structure but different parameters.
A partitioned job is a job where each run corresponds to a partition key. Most commonly, each partition key represents a time window, so when the job executes, it processes the data that falls within that time window.
A partitioned asset is an asset that's composed of a set of partitions, which can be materialized and tracked independently. Most commonly, each partition represents all the records in a data set that fall within a particular time window. Depending on where the asset is stored, each partition might correspond to a file or a slice of a table in a database.
It's common to construct a partitioned job that materializes a particular set of partitioned assets every time it runs.
Having defined a partitioned job or asset, you can view runs and materializations by partition, launch backfills that each target a set of partitions, and define schedules that fill in one partition each time they run.
Name | Description |
---|---|
PartitionsDefinition | Superclass - defines the set of partitions that can be materialized for an asset. |
HourlyPartitionsDefinition | A partitions definition with a partition for each hour. |
DailyPartitionsDefinition | A partitions definition with a partition for each day. |
WeeklyPartitionsDefinition | A partitions definition with a partition for each week. |
MonthlyPartitionsDefinition | A partitions definition with a partition for each month. |
StaticPartitionsDefinition | A partitions definition with a fixed set of partitions. |
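These partitions definitions can also be constructed and inspected directly. Below is a minimal sketch; the variable names are arbitrary and not part of the API:
from dagster import DailyPartitionsDefinition, StaticPartitionsDefinition

# A fixed set of partition keys
color_partitions = StaticPartitionsDefinition(["red", "green", "blue"])
assert color_partitions.get_partition_keys() == ["red", "green", "blue"]

# One partition per day, from the start date up to the current day
daily_partitions = DailyPartitionsDefinition(start_date="2022-01-01")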
A software-defined asset can be assigned a PartitionsDefinition, which determines the set of partitions that compose it. Once an asset has a set of partitions, you can launch materializations of individual partitions and view the materialization history by partition in Dagit.
For example, below is an asset with a partition for each day since the first day of 2022:
from dagster import DailyPartitionsDefinition, asset


@asset(partitions_def=DailyPartitionsDefinition(start_date="2022-01-01"))
def my_daily_partitioned_asset(context):
    context.log.info(
        f"Processing asset partition '{context.asset_partition_key_for_output()}'"
    )
When an asset is unpartitioned, the default IO manager stores it in a file whose location is based on the asset's key. When an asset is partitioned, the default IO manager stores each partition in a separate file, all underneath a directory whose location is based on the asset's key.
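For example, here's a sketch of materializing a single partition of the asset above from Python using dagster's materialize helper. The partition key shown is arbitrary, and the exact on-disk layout produced by the default IO manager is an implementation detail that may vary:
from dagster import materialize

# Launches an in-process materialization of just the 2022-01-15 partition.
# With the default filesystem IO manager, the output for this partition is
# stored in its own file under a directory derived from the asset's key.
result = materialize([my_daily_partitioned_asset], partition_key="2022-01-15")
assert result.success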
To view all partitions for an asset, open the Definition tab of the asset's details page. The bar in the Partitions section represents all of the partitions for the asset.
In the following image, the partitions bar is entirely gray. This is because none of the partitions have been materialized:
When you materialize a partitioned asset, you choose which partitions to materialize, and Dagster will launch a run for each partition.
Note: If you choose more than one partition, the Dagster Daemon needs to be running to queue the multiple runs.
After you've materialized a partition, it will show up as green in the partitions bar.
To view materializations by partition, navigate to the Activity tab:
When a partitioned asset depends on another partitioned asset, each partition in the downstream asset depends on one or more partitions in the upstream asset.
A few rules govern partition-to-partition dependencies:
- When the upstream asset and downstream asset have the same PartitionsDefinition, each partition in the downstream asset depends on the same partition in the upstream asset.
- When the upstream asset and downstream asset are both time window-partitioned, each partition in the downstream asset depends on all partitions in the upstream asset that intersect its time window. For example, if an asset with a DailyPartitionsDefinition depends on an asset with an HourlyPartitionsDefinition, then partition 2022-04-12 of the daily asset would depend on 24 partitions of the hourly asset: 2022-04-12-00:00 through 2022-04-12-23:00, as shown in the sketch below.
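Here's a minimal sketch of both rules; the asset names (hourly_events, daily_summary, daily_report) are hypothetical. Materializing the 2022-04-12 partition of daily_summary loads the 24 hourly partitions of hourly_events that fall within that day, while each partition of daily_report depends on the same-day partition of daily_summary because the two share a partitions definition:
from dagster import DailyPartitionsDefinition, HourlyPartitionsDefinition, asset

hourly_partitions = HourlyPartitionsDefinition(start_date="2022-04-12-00:00")
daily_partitions = DailyPartitionsDefinition(start_date="2022-04-12")


@asset(partitions_def=hourly_partitions)
def hourly_events():
    ...


@asset(partitions_def=daily_partitions)
def daily_summary(hourly_events):
    # Receives the upstream hourly partitions that intersect this daily window
    ...


@asset(partitions_def=daily_partitions)
def daily_report(daily_summary):
    # Same PartitionsDefinition as daily_summary: a one-to-one partition dependency
    ...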
A partitioned asset job is a job that materializes a particular set of partitioned assets every time it runs.
from dagster import (
    AssetSelection,
    HourlyPartitionsDefinition,
    asset,
    define_asset_job,
    repository,
)

hourly_partitions_def = HourlyPartitionsDefinition(start_date="2022-05-31-00:00")


@asset(partitions_def=hourly_partitions_def)
def asset1():
    ...


@asset(partitions_def=hourly_partitions_def)
def asset2():
    ...


partitioned_asset_job = define_asset_job(
    name="asset_1_and_2_job",
    selection=AssetSelection.assets(asset1, asset2),
    partitions_def=hourly_partitions_def,
)


@repository
def repo():
    return [asset1, asset2, partitioned_asset_job]
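Because the job's partitions are hourly time windows, you can also run it on a matching cadence. Below is a sketch that assumes build_schedule_from_partitioned_job (described later on this page) accepts the partitioned asset job defined above; the resulting schedule would be returned from the repository alongside the job:
from dagster import build_schedule_from_partitioned_job

# One run per hour, each targeting the partition for the most recently completed hour
asset_1_and_2_schedule = build_schedule_from_partitioned_job(partitioned_asset_job)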
Name | Description |
---|---|
PartitionedConfig | Determines a set of partitions and how to generate run config for a partition. |
@daily_partitioned_config | Decorator for constructing partitioned config where each partition is a date. |
@hourly_partitioned_config | Decorator for constructing partitioned config where each partition is an hour of a date. |
@weekly_partitioned_config | Decorator for constructing partitioned config where each partition is a week. |
@monthly_partitioned_config | Decorator for constructing partitioned config where each partition is a month. |
@static_partitioned_config | Decorator for constructing partitioned config for a static set of partition keys. |
@dynamic_partitioned_config | Decorator for constructing partitioned config for a set of partition keys that can grow over time. |
build_schedule_from_partitioned_job | A function that constructs a schedule whose interval matches the partitioning of a partitioned job. |
When defining a job that doesn't use software-defined assets, you can make it partitioned by supplying a PartitionedConfig object as its config.
The most common kind of partitioned job is a time-partitioned job - each partition is a time window, and each run for a partition processes data within that time window.
Before we define a partitioned job, let's look at a non-partitioned job that computes some data for a given date:
from dagster import job, op


@op(config_schema={"date": str})
def process_data_for_date(context):
    date = context.op_config["date"]
    context.log.info(f"processing data for {date}")


@job
def do_stuff():
    process_data_for_date()
It takes, as config, a string date, which specifies the date to compute data for. For example, if you wanted to compute data for May 5th, 2020, you would execute the job with the following config:
ops:
  process_data_for_date:
    config:
      date: "2020-05-05"
With the job above, it's possible to supply any value for the date param, which means that, if you wanted to launch a backfill, Dagster wouldn't know what values to run it on. You can instead build a partitioned job that operates on a defined set of dates.
First, you define the PartitionedConfig. In this case, because each partition is a date, you can use the @daily_partitioned_config decorator. It defines the full set of partitions - every date between the start date and the current date - as well as how to determine the run config for a given partition.
from dagster import daily_partitioned_config
from datetime import datetime


@daily_partitioned_config(start_date=datetime(2020, 1, 1))
def my_partitioned_config(start: datetime, _end: datetime):
    return {
        "ops": {
            "process_data_for_date": {"config": {"date": start.strftime("%Y-%m-%d")}}
        }
    }
Then you can build a job that uses the PartitionedConfig by supplying it to the config argument when you construct the job:
@job(config=my_partitioned_config)
def do_stuff_partitioned():
    process_data_for_date()
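To see how this connects to the manual config shown earlier: for a given partition key, the PartitionedConfig generates the equivalent run config automatically. A quick sketch:
# The run config generated for the 2020-05-05 partition matches the YAML above.
run_config = my_partitioned_config.get_run_config_for_partition_key("2020-05-05")
assert run_config == {
    "ops": {"process_data_for_date": {"config": {"date": "2020-05-05"}}}
}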
In Dagit, you can view runs by partition in the Partitions tab of a Job page.
In the "Run Matrix", each column corresponds to one of the partitions in the job. The time listed corresponds to the start time of the partition. Each row corresponds to one of the steps in the job. You can click on an individual box to navigate to logs and run information for the step.
You can view and use partitions in the Dagit Launchpad tab for a job. In the top bar, you can select from the list of all available partitions. Within the config editor, the config for the selected partition will be populated.
In the screenshot below, we select the 2020-01-02 partition, and we can see that the run config for the partition has been populated in the editor.
In addition to the @daily_partitioned_config decorator, Dagster also provides @monthly_partitioned_config, @weekly_partitioned_config, and @hourly_partitioned_config. See the API docs for each of these decorators for more information on how partitions are built based on different start_date, minute_offset, hour_offset, and day_offset inputs.
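For instance, here's a minimal sketch of an hourly partitioned config whose hour-long windows begin at 15 minutes past the hour because of minute_offset=15. The config function name is hypothetical:
from datetime import datetime

from dagster import hourly_partitioned_config


@hourly_partitioned_config(start_date=datetime(2020, 1, 1), minute_offset=15)
def my_hourly_offset_config(start: datetime, _end: datetime):
    # start and _end bound the hour-long time window for the partition
    return {
        "ops": {
            "process_data_for_date": {
                "config": {"date": start.strftime("%Y-%m-%d %H:%M")}
            }
        }
    }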
Not all jobs are partitioned by time. Here's a partitioned job where the partitions are continents:
from dagster import job, op, static_partitioned_config

CONTINENTS = [
    "Africa",
    "Antarctica",
    "Asia",
    "Europe",
    "North America",
    "Oceania",
    "South America",
]


@static_partitioned_config(partition_keys=CONTINENTS)
def continent_config(partition_key: str):
    return {"ops": {"continent_op": {"config": {"continent_name": partition_key}}}}


@op(config_schema={"continent_name": str})
def continent_op(context):
    context.log.info(context.op_config["continent_name"])


@job(config=continent_config)
def continent_job():
    continent_op()
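As a quick sanity check, you can execute a single static partition in-process; here's a sketch using the continent_job defined above:
# Runs continent_job with the config generated for the "Africa" partition
result = continent_job.execute_in_process(partition_key="Africa")
assert result.success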
It's common that, when you have a partitioned job, you want to run it on a schedule. For example, if your job has a partition for each date, you likely want to run that job every day, on the partition for that day.
The build_schedule_from_partitioned_job function allows you to construct a schedule from a date-partitioned job. It creates a schedule whose interval matches the spacing of the job's partitions. If you wanted to create a schedule for the do_stuff_partitioned job defined above, you could write:
from dagster import build_schedule_from_partitioned_job, job


@job(config=my_partitioned_config)
def do_stuff_partitioned():
    ...


do_stuff_partitioned_schedule = build_schedule_from_partitioned_job(
    do_stuff_partitioned,
)
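To turn the schedule on, include it in a repository alongside the job. A sketch, with a hypothetical repository name:
from dagster import repository


@repository
def partitioned_job_repo():
    return [do_stuff_partitioned, do_stuff_partitioned_schedule]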
Schedules can also be made from static partitioned jobs. If you wanted to make a schedule for the continent_job above that runs each partition, you could write:
from dagster import schedule


@schedule(cron_schedule="0 0 * * *", job=continent_job)
def continent_schedule():
    for c in CONTINENTS:
        request = continent_job.run_request_for_partition(partition_key=c, run_key=c)
        yield request
Or you can define a schedule that runs a subselection of the partitions:
@schedule(cron_schedule="0 0 * * *", job=continent_job)
def antarctica_schedule():
request = continent_job.run_request_for_partition(
partition_key="Antarctica", run_key=None
)
yield request
Refer to the Schedules documentation for more info about constructing both schedule types.
Invoking a PartitionedConfig object will directly invoke the decorated function. If you want to check whether the generated run config is valid for the config of the job, you can use the validate_run_config function.
from dagster import validate_run_config, daily_partitioned_config
from datetime import datetime


@daily_partitioned_config(start_date=datetime(2020, 1, 1))
def my_partitioned_config(start: datetime, _end: datetime):
    return {
        "ops": {
            "process_data_for_date": {"config": {"date": start.strftime("%Y-%m-%d")}}
        }
    }


def test_my_partitioned_config():
    # assert that the decorated function returns the expected output
    run_config = my_partitioned_config(datetime(2020, 1, 3), datetime(2020, 1, 4))
    assert run_config == {
        "ops": {"process_data_for_date": {"config": {"date": "2020-01-03"}}}
    }

    # assert that the output of the decorated function is valid configuration for the
    # do_stuff_partitioned job
    assert validate_run_config(do_stuff_partitioned, run_config)
If you want to test that your PartitionedConfig creates the partitions you expect, you can use the get_partition_keys or get_run_config_for_partition_key functions.
@daily_partitioned_config(start_date=datetime(2020, 1, 1), minute_offset=15)
def my_offset_partitioned_config(start: datetime, _end: datetime):
    return {
        "ops": {
            "process_data": {
                "config": {
                    "start": start.strftime("%Y-%m-%d-%H:%M"),
                    "end": _end.strftime("%Y-%m-%d-%H:%M"),
                }
            }
        }
    }


@op(config_schema={"start": str, "end": str})
def process_data(context):
    s = context.op_config["start"]
    e = context.op_config["end"]
    context.log.info(f"processing data for {s} - {e}")


@job(config=my_offset_partitioned_config)
def do_more_stuff_partitioned():
    process_data()


def test_my_offset_partitioned_config():
    # test that the partition keys are what you expect
    keys = my_offset_partitioned_config.get_partition_keys()
    assert keys[0] == "2020-01-01"
    assert keys[1] == "2020-01-02"

    # test that the run_config for a partition is valid for do_more_stuff_partitioned
    run_config = my_offset_partitioned_config.get_run_config_for_partition_key(keys[0])
    assert validate_run_config(do_more_stuff_partitioned, run_config)

    # test that the contents of run_config are what you expect
    assert run_config == {
        "ops": {
            "process_data": {
                "config": {"start": "2020-01-01-00:15", "end": "2020-01-02-00:15"}
            }
        }
    }
To run a partitioned job in-process on a particular partition, you can supply a value for the partition_key argument of JobDefinition.execute_in_process:
def test_do_stuff_partitioned():
    assert do_stuff_partitioned.execute_in_process(partition_key="2020-01-01").success
For more examples of partitions, check out the following in our Hacker News example: