from datetime import datetime, time
from typing import Any, Callable, Dict, Iterable, List, NamedTuple, Optional, Sequence, Union, cast
import pendulum
import dagster._check as check
from dagster._annotations import PublicAttr, public
from dagster._utils.partitions import DEFAULT_HOURLY_FORMAT_WITHOUT_TIMEZONE
from dagster._utils.schedules import schedule_execution_time_iterator
from .partition import (
DEFAULT_DATE_FORMAT,
Partition,
PartitionedConfig,
PartitionsDefinition,
ScheduleType,
get_cron_schedule,
)
from .partition_key_range import PartitionKeyRange
class TimeWindow(NamedTuple):
    """A half-open datetime interval ``[start, end)``.

    The ``start`` instant belongs to the window; the ``end`` instant does not.
    """

    # Inclusive lower bound of the window.
    start: PublicAttr[datetime]
    # Exclusive upper bound of the window.
    end: PublicAttr[datetime]
class TimeWindowPartitionsDefinition(
    PartitionsDefinition[TimeWindow],  # pylint: disable=unsubscriptable-object
    NamedTuple(
        "_TimeWindowPartitions",
        [
            ("schedule_type", PublicAttr[ScheduleType]),
            ("start", PublicAttr[datetime]),
            ("timezone", PublicAttr[str]),
            ("fmt", PublicAttr[str]),
            ("end_offset", PublicAttr[int]),
            ("minute_offset", PublicAttr[int]),
            ("hour_offset", PublicAttr[int]),
            ("day_offset", PublicAttr[Optional[int]]),
        ],
    ),
):
    """A partitions definition whose partitions are consecutive time windows.

    Window boundaries come from the cron schedule derived from ``schedule_type`` together with
    ``minute_offset``, ``hour_offset`` and ``day_offset``, evaluated in ``timezone``. Each
    partition key is the start of its window rendered with ``fmt``.

    Attributes:
        schedule_type (ScheduleType): Cadence of the windows.
        start (datetime): Time from which windows are generated.
        timezone (str): Timezone name used to interpret and generate window boundaries.
        fmt (str): ``strftime``/``strptime`` format used for partition keys.
        end_offset (int): Number of windows ending after the current time that
            ``get_partitions`` still includes; a negative value drops that many trailing
            windows instead.
        minute_offset (int): Minute of the hour at which windows are split.
        hour_offset (int): Hour of the day at which windows are split.
        day_offset (Optional[int]): Day (of the week or of the month, depending on
            ``schedule_type``) at which windows are split.
    """

    def __new__(  # pylint: disable=arguments-differ
        cls,
        schedule_type: ScheduleType,
        start: Union[datetime, str],
        timezone: Optional[str],
        fmt: str,
        end_offset: int,
        minute_offset: int = 0,
        hour_offset: int = 0,
        day_offset: Optional[int] = None,
    ):
        """Create the definition, normalizing ``start`` and ``timezone``.

        Args:
            schedule_type (ScheduleType): Cadence of the windows.
            start (Union[datetime, str]): First moment covered. A string is parsed with ``fmt``.
            timezone (Optional[str]): Timezone name; a falsy value is replaced by ``"UTC"``.
            fmt (str): Format used for partition keys (and for parsing a string ``start``).
            end_offset (int): See the class-level attribute of the same name.
            minute_offset (int): Minute of the hour at which windows are split.
            hour_offset (int): Hour of the day at which windows are split.
            day_offset (Optional[int]): Day at which windows are split, if applicable.
        """
        # A textual start is interpreted with the same format used for partition keys.
        parsed_start = datetime.strptime(start, fmt) if isinstance(start, str) else start
        resolved_timezone = timezone or "UTC"

        return super(TimeWindowPartitionsDefinition, cls).__new__(
            cls,
            schedule_type,
            parsed_start,
            resolved_timezone,
            fmt,
            end_offset,
            minute_offset,
            hour_offset,
            day_offset,
        )

    def get_partitions(
        self, current_time: Optional[datetime] = None
    ) -> Sequence[Partition[TimeWindow]]:
        """Return the partitions that exist as of ``current_time``.

        Windows are generated from ``self.start``. Every window that has ended by the reference
        time is included, followed by up to ``end_offset`` windows that end after it. A negative
        ``end_offset`` removes that many partitions from the end of the result.

        Args:
            current_time (Optional[datetime]): Reference time; when omitted, the current time in
                ``self.timezone`` is used.
        """
        if current_time:
            reference = pendulum.instance(current_time, tz=self.timezone)
        else:
            reference = pendulum.now(self.timezone)
        cutoff = reference.timestamp()

        unfinished_included = 0
        collected: List[Partition[TimeWindow]] = []
        for window in self._iterate_time_windows(self.start):
            window_end = window.end.timestamp()
            still_open = window_end > cutoff

            # Stop at the first unfinished window once the end_offset allowance is used up.
            if still_open and unfinished_included >= self.end_offset:
                break

            collected.append(Partition(value=window, name=window.start.strftime(self.fmt)))
            if still_open:
                unfinished_included += 1

        if self.end_offset < 0:
            collected = collected[: self.end_offset]

        return collected

    def __str__(self) -> str:
        """Describe the cadence, start, timezone and (if non-zero) the end offset."""
        description = f"{self.schedule_type.value.capitalize()}, starting {self.start.strftime(self.fmt)} {self.timezone}."
        if self.end_offset == 0:
            return description
        return (
            description
            + f" End offsetted by {self.end_offset} partition{'' if self.end_offset == 1 else 's'}."
        )

    def time_window_for_partition_key(self, partition_key: str) -> TimeWindow:
        """Return the first time window generated from the start time encoded in the key."""
        key_start = self.start_time_for_partition_key(partition_key)
        windows = iter(self._iterate_time_windows(key_start))
        return next(windows)

    def start_time_for_partition_key(self, partition_key: str) -> datetime:
        """Parse ``partition_key`` with ``self.fmt`` and localize it to ``self.timezone``."""
        parsed = datetime.strptime(partition_key, self.fmt)
        return pendulum.instance(parsed, tz=self.timezone)

    def get_default_partition_mapping(self):
        """Return a new ``TimeWindowPartitionMapping``."""
        # Imported at call time rather than at module import
        # (presumably to avoid a circular import -- confirm).
        from dagster._core.definitions.time_window_partition_mapping import (
            TimeWindowPartitionMapping,
        )

        return TimeWindowPartitionMapping()

    def get_partition_keys_in_range(self, partition_key_range: PartitionKeyRange) -> Sequence[str]:
        """Return the keys from ``partition_key_range.start`` through ``.end``, inclusive."""
        first_start = self.start_time_for_partition_key(partition_key_range.start)
        last_start = self.start_time_for_partition_key(partition_key_range.end)

        keys: List[str] = []
        for window in self._iterate_time_windows(first_start):
            # Windows arrive in chronological order, so the first one past the end key ends the scan.
            if window.start > last_start:
                break
            keys.append(window.start.strftime(self.fmt))

        return keys

    @public
    def get_cron_schedule(
        self,
        minute_of_hour: Optional[int] = None,
        hour_of_day: Optional[int] = None,
        day_of_week: Optional[int] = None,
        day_of_month: Optional[int] = None,
    ):
        """Return a cron schedule that runs at the cadence of this partitioning.

        Args:
            minute_of_hour (Optional[int]): Minute to execute at. Defaults to ``minute_offset``.
            hour_of_day (Optional[int]): Hour to execute at. Defaults to ``hour_offset``. Must be
                left unset for hourly partitions.
            day_of_week (Optional[int]): Day of the week to execute on; used for weekly
                partitions, where it defaults to ``day_offset`` or 0. Must be left unset for
                daily partitions.
            day_of_month (Optional[int]): Day of the month to execute on; used for monthly
                partitions, where it defaults to ``day_offset`` or 1. Must be left unset for
                daily partitions.
        """
        cadence = self.schedule_type

        minute = cast(
            int,
            check.opt_int_param(minute_of_hour, "minute_of_hour", default=self.minute_offset),
        )

        if cadence == ScheduleType.HOURLY:
            check.invariant(
                hour_of_day is None, "Cannot set hour parameter with hourly partitions."
            )
        hour = cast(
            int, check.opt_int_param(hour_of_day, "hour_of_day", default=self.hour_offset)
        )
        execution_time = time(hour=hour, minute=minute)

        if cadence == ScheduleType.DAILY:
            check.invariant(
                day_of_week is None, "Cannot set day of week parameter with daily partitions."
            )
            check.invariant(
                day_of_month is None, "Cannot set day of month parameter with daily partitions."
            )

        if cadence == ScheduleType.MONTHLY:
            execution_day = check.opt_int_param(
                day_of_month, "day_of_month", default=self.day_offset or 1
            )
        elif cadence == ScheduleType.WEEKLY:
            execution_day = check.opt_int_param(
                day_of_week, "day_of_week", default=self.day_offset or 0
            )
        else:
            # Hourly and daily cadences have no day component.
            execution_day = 0

        return get_cron_schedule(self.schedule_type, execution_time, execution_day)

    def _iterate_time_windows(self, start: datetime) -> Iterable[TimeWindow]:
        """Yield consecutive time windows whose first window starts at or after ``start``.

        The generator has no stopping condition of its own; callers are expected to break out.
        """
        split_time = time(self.hour_offset, self.minute_offset)
        start_timestamp = pendulum.instance(start, tz=self.timezone).timestamp()
        cron_schedule = get_cron_schedule(
            schedule_type=self.schedule_type,
            time_of_day=split_time,
            execution_day=self.day_offset,
        )
        ticks = schedule_execution_time_iterator(
            start_timestamp=start_timestamp,
            cron_schedule=cron_schedule,
            execution_timezone=self.timezone,
        )

        # Skip any tick that falls before the requested start.
        window_start = next(ticks)
        while window_start.timestamp() < start_timestamp:
            window_start = next(ticks)

        # Each pair of adjacent ticks bounds one window.
        while True:
            window_end = next(ticks)
            yield TimeWindow(window_start, window_end)
            window_start = window_end
class DailyPartitionsDefinition(TimeWindowPartitionsDefinition):
    def __new__(
        cls,
        start_date: Union[datetime, str],
        minute_offset: int = 0,
        hour_offset: int = 0,
        timezone: Optional[str] = None,
        fmt: Optional[str] = None,
        end_offset: int = 0,
    ):
        """A partition set with one partition per day.

        Partitioning begins at midnight on ``start_date``. Unless ``end_offset`` is positive, the
        final partition in the set ends before the current time. When ``minute_offset`` and/or
        ``hour_offset`` are given, every partition starts and ends at
        ``hour_offset:minute_offset`` of its day.

        Args:
            start_date (Union[datetime.datetime, str]): The first date in the set of partitions,
                as a datetime or as a string in the ``fmt`` format.
            minute_offset (int): Minutes past the hour at which partitions are "split".
                Defaults to 0.
            hour_offset (int): Hours past 00:00 at which partitions are "split". Defaults to 0.
            timezone (Optional[str]): The timezone in which each date should exist. Accepts the
                names from the `IANA time zone database <https://www.iana.org/time-zones>`_,
                e.g. "America/Los_Angeles".
            fmt (Optional[str]): The date format to use. Defaults to `%Y-%m-%d`.
            end_offset (int): Number of additional partitions by which to extend the set. With 0
                (the default) the last partition ends before the current time; with 1 the
                second-to-last partition ends before the current time, and so on.

        .. code-block:: python

            DailyPartitionsDefinition(start_date="2022-03-12")
            # creates partitions (2022-03-12-00:00, 2022-03-13-00:00), (2022-03-13-00:00, 2022-03-14-00:00), ...

            DailyPartitionsDefinition(start_date="2022-03-12", minute_offset=15, hour_offset=16)
            # creates partitions (2022-03-12-16:15, 2022-03-13-16:15), (2022-03-13-16:15, 2022-03-14-16:15), ...
        """
        return super(DailyPartitionsDefinition, cls).__new__(
            cls,
            schedule_type=ScheduleType.DAILY,
            start=start_date,
            timezone=timezone,
            # Fall back to the date-only key format when the caller gives none.
            fmt=fmt or DEFAULT_DATE_FORMAT,
            end_offset=end_offset,
            minute_offset=minute_offset,
            hour_offset=hour_offset,
        )
def wrap_time_window_tags_fn(
    tags_fn: Optional[Callable[[datetime, datetime], Dict[str, str]]]
) -> Callable[[Partition], Dict[str, str]]:
    """Adapt a ``(start, end) -> tags`` function into one that takes a partition.

    The returned callable passes the first two elements of ``partition.value`` to ``tags_fn``.
    When ``tags_fn`` is falsy, the returned callable yields an empty dict for every partition.
    """

    def _tags_for_partition(partition: Partition) -> Dict[str, str]:
        if tags_fn:
            window_start = cast(datetime, partition.value[0])
            window_end = cast(datetime, partition.value[1])
            return tags_fn(window_start, window_end)
        return {}

    return _tags_for_partition
def daily_partitioned_config(
    start_date: Union[datetime, str],
    minute_offset: int = 0,
    hour_offset: int = 0,
    timezone: Optional[str] = None,
    fmt: Optional[str] = None,
    end_offset: int = 0,
    tags_for_partition_fn: Optional[Callable[[datetime, datetime], Dict[str, str]]] = None,
) -> Callable[[Callable[[datetime, datetime], Dict[str, Any]]], PartitionedConfig]:
    """Decorator that defines run config over a set of daily partitions.

    The decorated function receives the start and end datetimes that bound one daily partition
    and must return the run config dictionary for it. The object produced by the decorator can
    be passed as the config argument of a Job.

    Partitioning begins at midnight on ``start_date``. Unless ``end_offset`` is positive, the
    final partition in the set ends before the current time. When ``minute_offset`` and/or
    ``hour_offset`` are given, every partition starts and ends at ``hour_offset:minute_offset``
    of its day.

    Args:
        start_date (Union[datetime.datetime, str]): The first date in the set of partitions, as
            a datetime or as a string in the ``fmt`` format.
        minute_offset (int): Minutes past the hour at which partitions are "split".
            Defaults to 0.
        hour_offset (int): Hours past 00:00 at which partitions are "split". Defaults to 0.
        timezone (Optional[str]): The timezone in which each date should exist. Accepts the
            names from the `IANA time zone database <https://www.iana.org/time-zones>`_,
            e.g. "America/Los_Angeles".
        fmt (Optional[str]): The date format to use. Defaults to `%Y-%m-%d`.
        end_offset (int): Number of additional partitions by which to extend the set. With 0
            (the default) the last partition ends before the current time; with 1 the
            second-to-last partition ends before the current time, and so on.
        tags_for_partition_fn (Optional[Callable[[datetime, datetime], Dict[str, str]]]):
            Optional function that receives a partition's start and end datetimes and returns
            the tags for that partition.

    .. code-block:: python

        @daily_partitioned_config(start_date="2022-03-12")
        # creates partitions (2022-03-12-00:00, 2022-03-13-00:00), (2022-03-13-00:00, 2022-03-14-00:00), ...

        @daily_partitioned_config(start_date="2022-03-12", minute_offset=15, hour_offset=16)
        # creates partitions (2022-03-12-16:15, 2022-03-13-16:15), (2022-03-13-16:15, 2022-03-14-16:15), ...
    """

    def _decorate(fn: Callable[[datetime, datetime], Dict[str, Any]]) -> PartitionedConfig:
        check.callable_param(fn, "fn")

        def _run_config_for_partition(partition):
            # The partition value holds the window bounds: element 0 is the start, 1 the end.
            return fn(partition.value[0], partition.value[1])

        daily_partitions = DailyPartitionsDefinition(
            start_date=start_date,
            minute_offset=minute_offset,
            hour_offset=hour_offset,
            timezone=timezone,
            fmt=fmt,
            end_offset=end_offset,
        )
        return PartitionedConfig(
            run_config_for_partition_fn=_run_config_for_partition,
            partitions_def=daily_partitions,
            decorated_fn=fn,
            tags_for_partition_fn=wrap_time_window_tags_fn(tags_for_partition_fn),
        )

    return _decorate
class HourlyPartitionsDefinition(TimeWindowPartitionsDefinition):
    def __new__(
        cls,
        start_date: Union[datetime, str],
        minute_offset: int = 0,
        timezone: Optional[str] = None,
        fmt: Optional[str] = None,
        end_offset: int = 0,
    ):
        """A set of hourly partitions.

        The first partition in the set will start on the start_date at midnight. The last
        partition in the set will end before the current time, unless the end_offset argument is
        set to a positive number. If minute_offset is provided, the start and end times of each
        partition will be minute_offset past the hour.

        Args:
            start_date (Union[datetime.datetime, str]): The first date in the set of partitions.
                Can provide in either a datetime or string format.
            minute_offset (int): Number of minutes past the hour to "split" the partition.
                Defaults to 0.
            timezone (Optional[str]): The timezone in which each date should exist.
                Supported strings for timezones are the ones provided by the
                `IANA time zone database <https://www.iana.org/time-zones>`_ - e.g.
                "America/Los_Angeles".
            fmt (Optional[str]): The date format to use. Defaults to
                ``DEFAULT_HOURLY_FORMAT_WITHOUT_TIMEZONE`` (an hour-resolution format,
                presumably `%Y-%m-%d-%H:%M` -- confirm in ``dagster._utils.partitions``),
                not the date-only `%Y-%m-%d`.
            end_offset (int): Extends the partition set by a number of partitions equal to the
                value passed. If end_offset is 0 (the default), the last partition ends before
                the current time. If end_offset is 1, the second-to-last partition ends before
                the current time, and so on.

        .. code-block:: python

            HourlyPartitionsDefinition(start_date=datetime(2022, 3, 12))
            # creates partitions (2022-03-12-00:00, 2022-03-12-01:00), (2022-03-12-01:00, 2022-03-12-02:00), ...

            HourlyPartitionsDefinition(start_date=datetime(2022, 3, 12), minute_offset=15)
            # creates partitions (2022-03-12-00:15, 2022-03-12-01:15), (2022-03-12-01:15, 2022-03-12-02:15), ...
        """
        # Hourly keys need a time component, so the default differs from the date-only format
        # used by the daily/weekly/monthly definitions.
        _fmt = fmt or DEFAULT_HOURLY_FORMAT_WITHOUT_TIMEZONE

        # hour_offset and day_offset are deliberately not forwarded; the base-class defaults apply.
        return super(HourlyPartitionsDefinition, cls).__new__(
            cls,
            schedule_type=ScheduleType.HOURLY,
            start=start_date,
            minute_offset=minute_offset,
            timezone=timezone,
            fmt=_fmt,
            end_offset=end_offset,
        )
def hourly_partitioned_config(
    start_date: Union[datetime, str],
    minute_offset: int = 0,
    timezone: Optional[str] = None,
    fmt: Optional[str] = None,
    end_offset: int = 0,
    tags_for_partition_fn: Optional[Callable[[datetime, datetime], Dict[str, str]]] = None,
) -> Callable[[Callable[[datetime, datetime], Dict[str, Any]]], PartitionedConfig]:
    """Defines run config over a set of hourly partitions.

    The decorated function should accept a start datetime and end datetime, which represent the
    bounds of the hourly partition the config should delineate.

    The decorated function should return a run config dictionary.

    The resulting object created by this decorator can be provided to the config argument of a
    Job. The first partition in the set will start at the start_date at midnight. The last
    partition in the set will end before the current time, unless the end_offset argument is set
    to a positive number. If minute_offset is provided, the start and end times of each partition
    will be minute_offset past the hour.

    Args:
        start_date (Union[datetime.datetime, str]): The first date in the set of partitions. Can
            provide in either a datetime or string format.
        minute_offset (int): Number of minutes past the hour to "split" the partition. Defaults
            to 0.
        timezone (Optional[str]): The timezone in which each date should exist.
            Supported strings for timezones are the ones provided by the
            `IANA time zone database <https://www.iana.org/time-zones>`_ - e.g.
            "America/Los_Angeles".
        fmt (Optional[str]): The date format to use. When omitted, the default of
            ``HourlyPartitionsDefinition`` applies (an hour-resolution format rather than the
            date-only `%Y-%m-%d`).
        end_offset (int): Extends the partition set by a number of partitions equal to the value
            passed. If end_offset is 0 (the default), the last partition ends before the current
            time. If end_offset is 1, the second-to-last partition ends before the current time,
            and so on.
        tags_for_partition_fn (Optional[Callable[[datetime, datetime], Dict[str, str]]]):
            Optional function that receives a partition's start and end datetimes and returns
            the tags for that partition.

    .. code-block:: python

        @hourly_partitioned_config(start_date=datetime(2022, 3, 12))
        # creates partitions (2022-03-12-00:00, 2022-03-12-01:00), (2022-03-12-01:00, 2022-03-12-02:00), ...

        @hourly_partitioned_config(start_date=datetime(2022, 3, 12), minute_offset=15)
        # creates partitions (2022-03-12-00:15, 2022-03-12-01:15), (2022-03-12-01:15, 2022-03-12-02:15), ...
    """

    def inner(fn: Callable[[datetime, datetime], Dict[str, Any]]) -> PartitionedConfig:
        check.callable_param(fn, "fn")

        return PartitionedConfig(
            # The partition value holds the window bounds: element 0 is the start, 1 the end.
            run_config_for_partition_fn=lambda partition: fn(
                partition.value[0], partition.value[1]
            ),
            partitions_def=HourlyPartitionsDefinition(
                start_date=start_date,
                minute_offset=minute_offset,
                timezone=timezone,
                fmt=fmt,
                end_offset=end_offset,
            ),
            decorated_fn=fn,
            tags_for_partition_fn=wrap_time_window_tags_fn(tags_for_partition_fn),
        )

    return inner
class MonthlyPartitionsDefinition(TimeWindowPartitionsDefinition):
    def __new__(
        cls,
        start_date: Union[datetime, str],
        minute_offset: int = 0,
        hour_offset: int = 0,
        day_offset: int = 1,
        timezone: Optional[str] = None,
        fmt: Optional[str] = None,
        end_offset: int = 0,
    ):
        """A partition set with one partition per month.

        Partitioning begins at midnight on the soonest first of the month after ``start_date``.
        Unless ``end_offset`` is positive, the final partition in the set ends before the current
        time. When ``day_offset`` is given, every partition starts and ends on that day of the
        month. When ``minute_offset`` and/or ``hour_offset`` are given, every partition starts
        and ends at ``hour_offset:minute_offset`` of that day.

        Args:
            start_date (Union[datetime.datetime, str]): The first date in the set of partitions
                will be midnight on the soonest first of the month following start_date. May be
                a datetime or a string in the ``fmt`` format.
            minute_offset (int): Minutes past the hour at which partitions are "split".
                Defaults to 0.
            hour_offset (int): Hours past 00:00 at which partitions are "split". Defaults to 0.
            day_offset (int): Day of the month at which partitions are "split". Defaults to 1.
            timezone (Optional[str]): The timezone in which each date should exist. Accepts the
                names from the `IANA time zone database <https://www.iana.org/time-zones>`_,
                e.g. "America/Los_Angeles".
            fmt (Optional[str]): The date format to use. Defaults to `%Y-%m-%d`.
            end_offset (int): Number of additional partitions by which to extend the set. With 0
                (the default) the last partition ends before the current time; with 1 the
                second-to-last partition ends before the current time, and so on.

        .. code-block:: python

            MonthlyPartitionsDefinition(start_date="2022-03-12")
            # creates partitions (2022-04-01-00:00, 2022-05-01-00:00), (2022-05-01-00:00, 2022-06-01-00:00), ...

            MonthlyPartitionsDefinition(start_date="2022-03-12", minute_offset=15, hour_offset=3, day_offset=5)
            # creates partitions (2022-04-05-03:15, 2022-05-05-03:15), (2022-05-05-03:15, 2022-06-05-03:15), ...
        """
        return super(MonthlyPartitionsDefinition, cls).__new__(
            cls,
            schedule_type=ScheduleType.MONTHLY,
            start=start_date,
            timezone=timezone,
            # Fall back to the date-only key format when the caller gives none.
            fmt=fmt or DEFAULT_DATE_FORMAT,
            end_offset=end_offset,
            minute_offset=minute_offset,
            hour_offset=hour_offset,
            day_offset=day_offset,
        )
def monthly_partitioned_config(
    start_date: Union[datetime, str],
    minute_offset: int = 0,
    hour_offset: int = 0,
    day_offset: int = 1,
    timezone: Optional[str] = None,
    fmt: Optional[str] = None,
    end_offset: int = 0,
    tags_for_partition_fn: Optional[Callable[[datetime, datetime], Dict[str, str]]] = None,
) -> Callable[[Callable[[datetime, datetime], Dict[str, Any]]], PartitionedConfig]:
    """Decorator that defines run config over a set of monthly partitions.

    The decorated function receives the start and end datetimes that bound one monthly partition
    and must return the run config dictionary for it. The object produced by the decorator can
    be passed as the config argument of a Job.

    Partitioning begins at midnight on the soonest first of the month after ``start_date``.
    Unless ``end_offset`` is positive, the final partition in the set ends before the current
    time. When ``day_offset`` is given, every partition starts and ends on that day of the month.
    When ``minute_offset`` and/or ``hour_offset`` are given, every partition starts and ends at
    ``hour_offset:minute_offset`` of that day.

    Args:
        start_date (Union[datetime.datetime, str]): The first date in the set of partitions will
            be midnight on the soonest first of the month following start_date. May be a
            datetime or a string in the ``fmt`` format.
        minute_offset (int): Minutes past the hour at which partitions are "split".
            Defaults to 0.
        hour_offset (int): Hours past 00:00 at which partitions are "split". Defaults to 0.
        day_offset (int): Day of the month at which partitions are "split". Defaults to 1.
        timezone (Optional[str]): The timezone in which each date should exist. Accepts the
            names from the `IANA time zone database <https://www.iana.org/time-zones>`_,
            e.g. "America/Los_Angeles".
        fmt (Optional[str]): The date format to use. Defaults to `%Y-%m-%d`.
        end_offset (int): Number of additional partitions by which to extend the set. With 0
            (the default) the last partition ends before the current time; with 1 the
            second-to-last partition ends before the current time, and so on.
        tags_for_partition_fn (Optional[Callable[[datetime, datetime], Dict[str, str]]]):
            Optional function that receives a partition's start and end datetimes and returns
            the tags for that partition.

    .. code-block:: python

        @monthly_partitioned_config(start_date="2022-03-12")
        # creates partitions (2022-04-01-00:00, 2022-05-01-00:00), (2022-05-01-00:00, 2022-06-01-00:00), ...

        @monthly_partitioned_config(start_date="2022-03-12", minute_offset=15, hour_offset=3, day_offset=5)
        # creates partitions (2022-04-05-03:15, 2022-05-05-03:15), (2022-05-05-03:15, 2022-06-05-03:15), ...
    """

    def _decorate(fn: Callable[[datetime, datetime], Dict[str, Any]]) -> PartitionedConfig:
        check.callable_param(fn, "fn")

        def _run_config_for_partition(partition):
            # The partition value holds the window bounds: element 0 is the start, 1 the end.
            return fn(partition.value[0], partition.value[1])

        monthly_partitions = MonthlyPartitionsDefinition(
            start_date=start_date,
            minute_offset=minute_offset,
            hour_offset=hour_offset,
            day_offset=day_offset,
            timezone=timezone,
            fmt=fmt,
            end_offset=end_offset,
        )
        return PartitionedConfig(
            run_config_for_partition_fn=_run_config_for_partition,
            partitions_def=monthly_partitions,
            decorated_fn=fn,
            tags_for_partition_fn=wrap_time_window_tags_fn(tags_for_partition_fn),
        )

    return _decorate
class WeeklyPartitionsDefinition(TimeWindowPartitionsDefinition):
    def __new__(
        cls,
        start_date: Union[datetime, str],
        minute_offset: int = 0,
        hour_offset: int = 0,
        day_offset: int = 0,
        timezone: Optional[str] = None,
        fmt: Optional[str] = None,
        end_offset: int = 0,
    ):
        """A partition set with one partition per week.

        The first partition in the set will start at the start_date. Unless ``end_offset`` is
        positive, the final partition in the set ends before the current time. When
        ``day_offset`` is given, every partition starts and ends on the corresponding day of the
        week (0-indexed, with Sunday as the start of the week). When ``minute_offset`` and/or
        ``hour_offset`` are given, every partition starts and ends at
        ``hour_offset:minute_offset`` of that day.

        Args:
            start_date (Union[datetime.datetime, str]): The first date in the set of partitions
                will be Sunday at midnight following start_date. May be a datetime or a string
                in the ``fmt`` format.
            minute_offset (int): Minutes past the hour at which partitions are "split".
                Defaults to 0.
            hour_offset (int): Hours past 00:00 at which partitions are "split". Defaults to 0.
            day_offset (int): Day of the week at which partitions are "split". Defaults to 0
                (Sunday).
            timezone (Optional[str]): The timezone in which each date should exist. Accepts the
                names from the `IANA time zone database <https://www.iana.org/time-zones>`_,
                e.g. "America/Los_Angeles".
            fmt (Optional[str]): The date format to use. Defaults to `%Y-%m-%d`.
            end_offset (int): Number of additional partitions by which to extend the set. With 0
                (the default) the last partition ends before the current time; with 1 the
                second-to-last partition ends before the current time, and so on.

        .. code-block:: python

            WeeklyPartitionsDefinition(start_date="2022-03-12")
            # creates partitions (2022-03-13-00:00, 2022-03-20-00:00), (2022-03-20-00:00, 2022-03-27-00:00), ...

            WeeklyPartitionsDefinition(start_date="2022-03-12", minute_offset=15, hour_offset=3, day_offset=6)
            # creates partitions (2022-03-12-03:15, 2022-03-19-03:15), (2022-03-19-03:15, 2022-03-26-03:15), ...
        """
        return super(WeeklyPartitionsDefinition, cls).__new__(
            cls,
            schedule_type=ScheduleType.WEEKLY,
            start=start_date,
            timezone=timezone,
            # Fall back to the date-only key format when the caller gives none.
            fmt=fmt or DEFAULT_DATE_FORMAT,
            end_offset=end_offset,
            minute_offset=minute_offset,
            hour_offset=hour_offset,
            day_offset=day_offset,
        )
def weekly_partitioned_config(
    start_date: Union[datetime, str],
    minute_offset: int = 0,
    hour_offset: int = 0,
    day_offset: int = 0,
    timezone: Optional[str] = None,
    fmt: Optional[str] = None,
    end_offset: int = 0,
    tags_for_partition_fn: Optional[Callable[[datetime, datetime], Dict[str, str]]] = None,
) -> Callable[[Callable[[datetime, datetime], Dict[str, Any]]], PartitionedConfig]:
    """Decorator that defines run config over a set of weekly partitions.

    The decorated function receives the start and end datetimes that bound one weekly partition
    and must return the run config dictionary for it. The object produced by the decorator can
    be passed as the config argument of a Job.

    The first partition in the set will start at the start_date. Unless ``end_offset`` is
    positive, the final partition in the set ends before the current time. When ``day_offset``
    is given, every partition starts and ends on the corresponding day of the week (0-indexed,
    with Sunday as the start of the week). When ``minute_offset`` and/or ``hour_offset`` are
    given, every partition starts and ends at ``hour_offset:minute_offset`` of that day.

    Args:
        start_date (Union[datetime.datetime, str]): The first date in the set of partitions will
            be Sunday at midnight following start_date. May be a datetime or a string in the
            ``fmt`` format.
        minute_offset (int): Minutes past the hour at which partitions are "split".
            Defaults to 0.
        hour_offset (int): Hours past 00:00 at which partitions are "split". Defaults to 0.
        day_offset (int): Day of the week at which partitions are "split". Defaults to 0
            (Sunday).
        timezone (Optional[str]): The timezone in which each date should exist. Accepts the
            names from the `IANA time zone database <https://www.iana.org/time-zones>`_,
            e.g. "America/Los_Angeles".
        fmt (Optional[str]): The date format to use. Defaults to `%Y-%m-%d`.
        end_offset (int): Number of additional partitions by which to extend the set. With 0
            (the default) the last partition ends before the current time; with 1 the
            second-to-last partition ends before the current time, and so on.
        tags_for_partition_fn (Optional[Callable[[datetime, datetime], Dict[str, str]]]):
            Optional function that receives a partition's start and end datetimes and returns
            the tags for that partition.

    .. code-block:: python

        @weekly_partitioned_config(start_date="2022-03-12")
        # creates partitions (2022-03-13-00:00, 2022-03-20-00:00), (2022-03-20-00:00, 2022-03-27-00:00), ...

        @weekly_partitioned_config(start_date="2022-03-12", minute_offset=15, hour_offset=3, day_offset=6)
        # creates partitions (2022-03-12-03:15, 2022-03-19-03:15), (2022-03-19-03:15, 2022-03-26-03:15), ...
    """

    def _decorate(fn: Callable[[datetime, datetime], Dict[str, Any]]) -> PartitionedConfig:
        check.callable_param(fn, "fn")

        def _run_config_for_partition(partition):
            # The partition value holds the window bounds: element 0 is the start, 1 the end.
            return fn(partition.value[0], partition.value[1])

        weekly_partitions = WeeklyPartitionsDefinition(
            start_date=start_date,
            minute_offset=minute_offset,
            hour_offset=hour_offset,
            day_offset=day_offset,
            timezone=timezone,
            fmt=fmt,
            end_offset=end_offset,
        )
        return PartitionedConfig(
            run_config_for_partition_fn=_run_config_for_partition,
            partitions_def=weekly_partitions,
            decorated_fn=fn,
            tags_for_partition_fn=wrap_time_window_tags_fn(tags_for_partition_fn),
        )

    return _decorate