Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/how_tos/map_time_config.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ dst_table_name = "ev_charging_datetime"
hours_per_year = 12 * 7 * 24
num_time_arrays = 3
df = pd.DataFrame({
"id": np.concat([np.repeat(i, hours_per_year) for i in range(1, 1 + num_time_arrays)]),
"id": np.concatenate([np.repeat(i, hours_per_year) for i in range(1, 1 + num_time_arrays)]),
"month": np.tile(np.repeat(range(1, 13), 7 * 24), num_time_arrays),
"day_of_week": np.tile(np.tile(np.repeat(range(7), 24), 12), num_time_arrays),
"hour": np.tile(np.tile(range(24), 12 * 7), num_time_arrays),
Expand Down
9 changes: 4 additions & 5 deletions src/chronify/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@
TableSchema,
)
from chronify.store import Store
from chronify.time import RepresentativePeriodFormat
from chronify.time import RepresentativePeriodFormat, TimeDataType
from chronify.time_configs import (
AnnualTimeRange,
DatetimeRange,
DatetimeRangeWithTZColumn,
IndexTimeRangeNTZ,
IndexTimeRangeTZ,
IndexTimeRange,
IndexTimeRangeWithTZColumn,
RepresentativePeriodTimeNTZ,
RepresentativePeriodTimeTZ,
Expand All @@ -42,9 +41,8 @@
"CsvTableSchema",
"DatetimeRange",
"DatetimeRangeWithTZColumn",
"IndexTimeRange",
"IndexTimeRangeWithTZColumn",
"IndexTimeRangeNTZ",
"IndexTimeRangeTZ",
"InvalidOperation",
"InvalidParameter",
"InvalidTable",
Expand All @@ -59,6 +57,7 @@
"TableSchema",
"TimeBaseModel",
"TimeBasedDataAdjustment",
"TimeDataType",
)

__version__ = metadata.metadata("chronify")["Version"]
Expand Down
133 changes: 59 additions & 74 deletions src/chronify/datetime_range_generator.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
from datetime import datetime, tzinfo
from typing import Generator, Optional
from zoneinfo import ZoneInfo
from itertools import chain

import pandas as pd

from chronify.time import (
LeapDayAdjustmentType,
TimeDataType,
)
from chronify.time_configs import DatetimeRanges, DatetimeRange, DatetimeRangeWithTZColumn
from chronify.time_utils import adjust_timestamp_by_dst_offset, get_tzname
from chronify.time_utils import get_tzname, list_timestamps
from chronify.time_range_generator_base import TimeRangeGeneratorBase
from chronify.exceptions import InvalidValue

Expand All @@ -25,58 +25,19 @@ def __init__(
self._model = model
self._adjustment = leap_day_adjustment or LeapDayAdjustmentType.NONE

def _iter_timestamps(
self, start: Optional[datetime] = None
) -> Generator[datetime, None, None]:
"""
if start is supplied, override self._model.start
"""
if start is None:
start = self._model.start
tz = start.tzinfo

for i in range(self._model.length):
if not tz:
cur = adjust_timestamp_by_dst_offset(
start + i * self._model.resolution, self._model.resolution
)
else:
# always step in standard time
cur_utc = start.astimezone(ZoneInfo("UTC")) + i * self._model.resolution
cur = adjust_timestamp_by_dst_offset(
cur_utc.astimezone(tz), self._model.resolution
)

is_leap_year = (
pd.Timestamp(f"{cur.year}-01-01") + pd.Timedelta(days=365)
).year == cur.year
if not is_leap_year:
yield pd.Timestamp(cur)
continue

month = cur.month
day = cur.day
if not (
self._adjustment == LeapDayAdjustmentType.DROP_FEB29 and month == 2 and day == 29
):
if not (
self._adjustment == LeapDayAdjustmentType.DROP_DEC31
and month == 12
and day == 31
):
if not (
self._adjustment == LeapDayAdjustmentType.DROP_JAN1
and month == 1
and day == 1
):
yield pd.Timestamp(cur)
def _iter_timestamps(self) -> Generator[datetime, None, None]:
    """Yield each timestamp of the configured range, one at a time.

    The sequence comes from ``list_timestamps``, called with the model's start,
    length and resolution plus this generator's leap-day adjustment.
    """
    model = self._model
    yield from list_timestamps(model.start, model.length, model.resolution, self._adjustment)

def list_time_columns(self) -> list[str]:
    """Return the time column names reported by the underlying model."""
    columns = self._model.list_time_columns()
    return columns

def list_distinct_timestamps_from_dataframe(self, df: pd.DataFrame) -> list[datetime]:
    """Return the sorted distinct values of the model's time column in ``df``.

    Values that are not ``datetime`` instances (for example numpy datetime64
    scalars) are converted to ``pd.Timestamp``. An empty column yields an empty
    list instead of failing on the first-element type check.
    """
    result = sorted(df[self._model.time_column].unique())
    # Only the first element is inspected; guard it so an empty frame is accepted.
    if result and not isinstance(result[0], datetime):
        result = [pd.Timestamp(x) for x in result]
    return result

Expand All @@ -92,15 +53,18 @@ def __init__(
super().__init__(model, leap_day_adjustment=leap_day_adjustment)
assert isinstance(self._model, DatetimeRange)

def list_timestamps(self) -> list[datetime]:
return list(self._iter_timestamps())
def list_timestamps(self) -> list[pd.Timestamp]:
    """Return every timestamp of the configured range as a list.

    Delegates to the ``list_timestamps`` helper with the model's start, length
    and resolution and this generator's leap-day adjustment.
    """
    model = self._model
    return list_timestamps(model.start, model.length, model.resolution, self._adjustment)


class DatetimeRangeGeneratorExternalTimeZone(DatetimeRangeGeneratorBase):
"""Generates datetime ranges based on a DatetimeRangeWithTZColumn model.
datetime ranges will be tz-naive and can be listed by time_zone name using special class func
"""Generate datetime ranges based on a DatetimeRangeWithTZColumn model.

Datetime ranges will be tz-naive and can be listed by time_zone name using special class functions.
These ranges may be localized by the time_zone name.
# TODO: add offset as a column

"""

def __init__(
Expand All @@ -117,42 +81,63 @@ def __init__(
)
raise InvalidValue(msg)

def _list_timestamps(self, time_zone: Optional[tzinfo]) -> list[datetime]:
"""always return tz-naive timestamps relative to input time_zone"""
if self._model.start_time_is_tz_naive():
# For clock-time-aligned data, iterate naively without timezone conversion.
# All timezones get the same clock times (e.g., midnight everywhere).
start = None
else:
if time_zone:
start = self._model.start.astimezone(time_zone)
else:
start = self._model.start.replace(tzinfo=None)
timestamps = list(self._iter_timestamps(start=start))
return [x.replace(tzinfo=None) for x in timestamps]

def list_timestamps(self) -> list[datetime]:
"""return ordered timestamps across all time zones in the order of the time zones."""
def _list_timestamps_by_time_zone(self, time_zone: Optional[tzinfo]) -> list[pd.Timestamp]:
    """Return the timestamps expected in the dataframe for one time zone.

    The start of the range depends on two model properties: whether the model's
    start time is tz-naive, and the model's dtype. The timestamps are then built
    by ``list_timestamps`` from that start, the model's length and resolution,
    and the leap-day adjustment. The returned timestamps follow the model's
    dtype: tz-naive for TIMESTAMP_NTZ, tz-aware for TIMESTAMP_TZ.

    Raises InvalidValue when the (start_time_is_tz_naive, dtype) pair is not one
    of the four handled combinations.
    """
    model = self._model
    start_is_naive = model.start_time_is_tz_naive()
    dtype = model.dtype

    if start_is_naive is True and dtype == TimeDataType.TIMESTAMP_NTZ:
        # Aligned in local standard time: every time zone shares the same
        # tz-naive timestamps, which represent local standard time (no DST).
        start = model.start
    elif start_is_naive is True and dtype == TimeDataType.TIMESTAMP_TZ:
        # Aligned in local standard time: each time zone gets its own tz-aware
        # timestamps that line up once adjusted to local standard time.
        start = model.start.replace(tzinfo=time_zone)
    elif start_is_naive is False and dtype == TimeDataType.TIMESTAMP_NTZ:
        # Aligned in absolute time: each time zone gets its own tz-naive
        # timestamps that line up once localized to that time zone.
        if time_zone:
            start = model.start.astimezone(time_zone).replace(tzinfo=None)
        else:
            start = model.start.replace(tzinfo=None)
    elif start_is_naive is False and dtype == TimeDataType.TIMESTAMP_TZ:
        # Aligned in absolute time: every time zone shares the same tz-aware timestamps.
        start = model.start
    else:
        msg = f"Unsupported combination of start_time_is_tz_naive and dtype: {self._model}"
        raise InvalidValue(msg)

    return list_timestamps(start, model.length, model.resolution, self._adjustment)

def list_timestamps(self) -> list[pd.Timestamp]:
    """Return the timestamps of all time zones as one flat list.

    The per-zone lists from ``list_timestamps_by_time_zone`` are concatenated in
    the order of that mapping, so a timestamp shared by several zones appears
    once per zone. Elements keep whatever tz-awareness the per-zone lists carry;
    they are not converted here.
    """
    per_zone = self.list_timestamps_by_time_zone()
    # from_iterable avoids unpacking every per-zone list into call arguments.
    return list(chain.from_iterable(per_zone.values()))

def list_timestamps_by_time_zone(self) -> dict[str, list[datetime]]:
"""for each time zone, returns full timestamp iteration (duplicates allowed)"""
def list_timestamps_by_time_zone(self) -> dict[str, list[pd.Timestamp]]:
    """Return the full timestamp list for each of the model's time zones.

    Keys are the names produced by ``get_tzname`` for each entry of
    ``self._model.get_time_zones()``; values come from
    ``_list_timestamps_by_time_zone`` for that zone. Zones that map to the same
    name overwrite each other, with the last one winning.
    """
    return {
        get_tzname(tz): self._list_timestamps_by_time_zone(time_zone=tz)
        for tz in self._model.get_time_zones()
    }

def list_distinct_timestamps_by_time_zone_from_dataframe(
    self, df: pd.DataFrame
) -> dict[str, list[datetime]]:
    """Return the sorted distinct timestamps found in ``df`` for each time zone.

    Keys are the distinct values of the model's time zone column, in sorted
    order; each value is the sorted list of distinct entries of the time column
    for that zone, after conversion with ``pd.to_datetime``.

    The conversion is applied to a private copy of the two columns, so the
    caller's dataframe is left untouched.
    """
    tz_col = self._model.get_time_zone_column()
    t_col = self._model.time_column

    # Copy first: assigning the converted column onto ``df`` would mutate the caller's data.
    pairs = df[[tz_col, t_col]].copy()
    pairs[t_col] = pd.to_datetime(pairs[t_col])
    pairs = pairs.drop_duplicates()

    return {
        tz_name: sorted(pairs.loc[pairs[tz_col] == tz_name, t_col].tolist())
        for tz_name in sorted(pairs[tz_col].unique())
    }
1 change: 0 additions & 1 deletion src/chronify/duckdb/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ def add_datetime_column(
timestamps: list[datetime],
) -> DuckDBPyRelation:
"""Add a datetime column to the relation."""
# TODO
raise NotImplementedError
# values = []
# columns = ",".join(rel.columns)
Expand Down
31 changes: 20 additions & 11 deletions src/chronify/sqlalchemy/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,21 @@
import pandas as pd
from numpy.dtypes import DateTime64DType, ObjectDType
from pandas import DatetimeTZDtype
from chronify.time import TimeDataType
from sqlalchemy import Connection, Engine, Selectable, text

from chronify.exceptions import InvalidOperation, InvalidParameter
from chronify.time_configs import DatetimeRangeBase, DatetimeRange, TimeBaseModel
from chronify.time_configs import (
DatetimeRangeBase,
TimeBaseModel,
DatetimeRange,
DatetimeRangeWithTZColumn,
)
from chronify.utils.path_utils import check_overwrite, delete_if_exists, to_path

# Copied from Pandas/Polars
DbWriteMode: TypeAlias = Literal["replace", "append", "fail"]
DatetimeRangeWithDtype: TypeAlias = DatetimeRange | DatetimeRangeWithTZColumn


def read_database(
Expand All @@ -35,7 +42,7 @@ def read_database(
df = conn.execute(query).cursor.fetch_df() # type: ignore
case "sqlite":
df = pd.read_sql(query, conn, params=params)
if isinstance(config, DatetimeRange):
if isinstance(config, (DatetimeRange, DatetimeRangeWithTZColumn)):
_convert_database_output_for_datetime(df, config)
case "hive":
df = _read_from_hive(query, conn, config, params)
Expand Down Expand Up @@ -81,17 +88,16 @@ def _check_one_config_per_datetime_column(configs: Sequence[TimeBaseModel]) -> N


def _convert_database_input_for_datetime(
df: pd.DataFrame, config: DatetimeRange, copied: bool
df: pd.DataFrame, config: DatetimeRangeWithDtype, copied: bool
) -> tuple[pd.DataFrame, bool]:
if config.start_time_is_tz_naive():
if config.dtype == TimeDataType.TIMESTAMP_NTZ:
return df, copied

if copied:
df2 = df
else:
df2 = df.copy()
copied = True

if isinstance(df2[config.time_column].dtype, DatetimeTZDtype):
df2[config.time_column] = df2[config.time_column].dt.tz_convert("UTC")
else:
Expand All @@ -100,9 +106,11 @@ def _convert_database_input_for_datetime(
return df2, copied


def _convert_database_output_for_datetime(df: pd.DataFrame, config: DatetimeRange) -> None:
def _convert_database_output_for_datetime(
df: pd.DataFrame, config: DatetimeRangeWithDtype
) -> None:
if config.time_column in df.columns:
if not config.start_time_is_tz_naive():
if config.dtype == TimeDataType.TIMESTAMP_TZ:
if isinstance(df[config.time_column].dtype, ObjectDType):
df[config.time_column] = pd.to_datetime(df[config.time_column], utc=True)
else:
Expand All @@ -120,6 +128,7 @@ def _write_to_duckdb(
) -> None:
assert conn._dbapi_connection is not None
assert conn._dbapi_connection.driver_connection is not None

match if_table_exists:
case "append":
query = f"INSERT INTO {table_name} SELECT * FROM df"
Expand All @@ -131,6 +140,7 @@ def _write_to_duckdb(
case _:
msg = f"{if_table_exists=}"
raise InvalidOperation(msg)

conn._dbapi_connection.driver_connection.sql(query)


Expand Down Expand Up @@ -190,9 +200,9 @@ def _read_from_hive(
) -> pd.DataFrame:
df = pd.read_sql_query(query, conn, params=params)
if (
isinstance(config, DatetimeRange)
isinstance(config, (DatetimeRange, DatetimeRangeWithTZColumn))
and config.time_column in df.columns
and not config.start_time_is_tz_naive()
and config.dtype == TimeDataType.TIMESTAMP_TZ
):
# This is tied to the fact that we set the Spark session to UTC.
# Otherwise, there is confusion with the computer's local time zone.
Expand All @@ -210,7 +220,7 @@ def _write_to_sqlite(
_check_one_config_per_datetime_column(configs)
copied = False
for config in configs:
if isinstance(config, DatetimeRange):
if isinstance(config, (DatetimeRange, DatetimeRangeWithTZColumn)):
df, copied = _convert_database_input_for_datetime(df, config, copied)
df.to_sql(table_name, conn, if_exists=if_table_exists, index=False)

Expand Down Expand Up @@ -251,7 +261,6 @@ def write_query_to_parquet(
if not overwrite:
msg = "write_table_to_parquet with Hive requires overwrite=True"
raise InvalidOperation(msg)
# TODO: partition columns
if partition_columns:
msg = "write_table_to_parquet with Hive doesn't support partition_columns"
raise InvalidOperation(msg)
Expand Down
Loading