Source code for dagster._core.definitions.time_window_partition_mapping

from datetime import datetime, timedelta
from typing import Optional, cast

import dagster._check as check
from dagster._core.definitions.partition import PartitionsDefinition, ScheduleType
from dagster._core.definitions.partition_key_range import PartitionKeyRange
from dagster._core.definitions.partition_mapping import PartitionMapping
from dagster._core.definitions.time_window_partitions import TimeWindowPartitionsDefinition
from dagster._core.errors import DagsterInvalidDefinitionError


class TimeWindowPartitionMapping(PartitionMapping):
    """
    The default mapping between two TimeWindowPartitionsDefinitions.

    A partition in the downstream partitions definition is mapped to all partitions in the
    upstream asset whose time windows overlap it.

    This means that, if the upstream and downstream partitions definitions share the same time
    period, then this mapping is essentially the identity partition mapping - plus conversion of
    datetime formats.

    If the upstream time period is coarser than the downstream time period, then each partition
    in the downstream asset will map to a single (larger) upstream partition. E.g. if the
    downstream is hourly and the upstream is daily, then each hourly partition in the downstream
    will map to the daily partition in the upstream that contains that hour.

    If the upstream time period is finer than the downstream time period, then each partition in
    the downstream asset will map to multiple upstream partitions. E.g. if the downstream is daily
    and the upstream is hourly, then each daily partition in the downstream asset will map to the
    24 hourly partitions in the upstream that occur on that day.
    """

    def get_upstream_partitions_for_partition_range(
        self,
        downstream_partition_key_range: Optional[PartitionKeyRange],
        downstream_partitions_def: Optional[PartitionsDefinition],
        upstream_partitions_def: PartitionsDefinition,
    ) -> PartitionKeyRange:
        """Map a range of downstream partition keys to the upstream keys whose windows overlap it.

        Args:
            downstream_partition_key_range: The downstream key range to map. Must not be None.
            downstream_partitions_def: The downstream partitions definition. Must not be None.
            upstream_partitions_def: The upstream partitions definition.

        Returns:
            The corresponding range of upstream partition keys.

        Raises:
            DagsterInvalidDefinitionError: If either definition is not a
                TimeWindowPartitionsDefinition, or if their timezones differ.
        """
        # The downstream side must be partitioned for this mapping to make sense; ``check.failed``
        # raises rather than returning.
        if downstream_partitions_def is None or downstream_partition_key_range is None:
            check.failed("downstream asset is not partitioned")

        return self._map_partitions(
            downstream_partitions_def, upstream_partitions_def, downstream_partition_key_range
        )

    def get_downstream_partitions_for_partition_range(
        self,
        upstream_partition_key_range: PartitionKeyRange,
        downstream_partitions_def: Optional[PartitionsDefinition],
        upstream_partitions_def: PartitionsDefinition,
    ) -> PartitionKeyRange:
        """Map a range of upstream partition keys to the downstream keys whose windows overlap it.

        Args:
            upstream_partition_key_range: The upstream key range to map.
            downstream_partitions_def: The downstream partitions definition. Must not be None.
            upstream_partitions_def: The upstream partitions definition.

        Returns:
            The corresponding range of downstream partition keys.

        Raises:
            DagsterInvalidDefinitionError: If either definition is not a
                TimeWindowPartitionsDefinition, or if their timezones differ.
        """
        if downstream_partitions_def is None:
            check.failed("downstream asset is not partitioned")

        return self._map_partitions(
            upstream_partitions_def, downstream_partitions_def, upstream_partition_key_range
        )

    def _map_partitions(
        self,
        from_partitions_def: PartitionsDefinition,
        to_partitions_def: PartitionsDefinition,
        from_partition_key_range: PartitionKeyRange,
    ) -> PartitionKeyRange:
        """Translate a key range from one time-window partitioning to another.

        Keys of ``from_partition_key_range`` are parsed with ``from_partitions_def.fmt``, mapped
        according to the relative sizes of the two schedule types, and formatted with
        ``to_partitions_def.fmt``.

        Raises:
            DagsterInvalidDefinitionError: If either definition is not a
                TimeWindowPartitionsDefinition, or if their timezones differ.
        """
        # Both sides must be time-window partitioned. Validating ``to_partitions_def`` here
        # (not just ``from_partitions_def``) ensures a bad target raises this error up front
        # instead of an AttributeError on ``.timezone`` / ``.schedule_type`` / ``.fmt`` below.
        if not isinstance(from_partitions_def, TimeWindowPartitionsDefinition) or not isinstance(
            to_partitions_def, TimeWindowPartitionsDefinition
        ):
            raise DagsterInvalidDefinitionError(
                "TimeWindowPartitionMappings can only operate on TimeWindowPartitionsDefinitions"
            )

        from_partitions_def = cast(TimeWindowPartitionsDefinition, from_partitions_def)
        to_partitions_def = cast(TimeWindowPartitionsDefinition, to_partitions_def)

        if to_partitions_def.timezone != from_partitions_def.timezone:
            raise DagsterInvalidDefinitionError("Timezones don't match")

        to_period = to_partitions_def.schedule_type
        from_period = from_partitions_def.schedule_type

        from_start_dt = datetime.strptime(from_partition_key_range.start, from_partitions_def.fmt)
        from_end_dt = datetime.strptime(from_partition_key_range.end, from_partitions_def.fmt)

        if to_period > from_period:
            # Target windows are coarser: each source key lies inside exactly one target window,
            # which is found by truncating the datetime to the start of that window.
            to_start_dt = round_datetime_to_period(from_start_dt, to_period)
            to_end_dt = round_datetime_to_period(from_end_dt, to_period)
        elif to_period < from_period:
            # Target windows are finer: the range runs from the first target window of the first
            # source window to the last target window of the last source window (end of the last
            # source window minus one target period).
            # NOTE(review): this relies on ``ScheduleType.delta`` for ``from_period``; confirm it
            # is defined for MONTHLY before mapping from monthly partitions.
            to_start_dt = from_start_dt
            to_end_dt = (from_end_dt + from_period.delta) - to_period.delta
        else:
            # Same period: only the key format may differ.
            to_start_dt = from_start_dt
            to_end_dt = from_end_dt

        return PartitionKeyRange(
            to_start_dt.strftime(to_partitions_def.fmt),
            to_end_dt.strftime(to_partitions_def.fmt),
        )
def round_datetime_to_period(dt: datetime, period: ScheduleType) -> datetime:
    """Truncate ``dt`` to the start of the ``period``-sized window that contains it.

    HOURLY zeroes minutes and below; DAILY truncates to midnight; WEEKLY truncates to midnight of
    the preceding (or same) Monday, because ``datetime.weekday()`` counts Monday as 0; MONTHLY
    truncates to midnight on the first day of the month. Any other period fails a ``check``.
    """
    if period == ScheduleType.HOURLY:
        return dt.replace(minute=0, second=0, microsecond=0)

    # Every remaining period starts at midnight, so compute that once.
    midnight = dt.replace(hour=0, minute=0, second=0, microsecond=0)

    if period == ScheduleType.DAILY:
        return midnight

    if period == ScheduleType.WEEKLY:
        # NOTE(review): weeks are aligned to Monday here; confirm this matches the weekly
        # partitions definitions this is used with.
        return midnight - timedelta(days=midnight.weekday())

    if period == ScheduleType.MONTHLY:
        return midnight.replace(day=1)

    check.failed("Unknown schedule type")