Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 58 additions & 31 deletions cylc/flow/cycling/iso8601.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,46 @@

import contextlib
from functools import lru_cache
import os
import re
from typing import List, Optional, TYPE_CHECKING, Tuple
from typing import TYPE_CHECKING, List, Optional, Tuple

from metomi.isodatetime.data import Calendar, CALENDAR, Duration
from metomi.isodatetime.data import CALENDAR, Calendar, Duration
from metomi.isodatetime.dumpers import TimePointDumper
from metomi.isodatetime.timezone import (
get_local_time_zone, get_local_time_zone_format, TimeZoneFormatMode)
from metomi.isodatetime.exceptions import IsodatetimeError
from metomi.isodatetime.parsers import ISO8601SyntaxError
from cylc.flow.time_parser import CylcTimeParser
from metomi.isodatetime.timezone import (
TimeZoneFormatMode,
get_local_time_zone,
get_local_time_zone_format,
)

from cylc.flow.cycling import (
PointBase, IntervalBase, SequenceBase, ExclusionBase, cmp
ExclusionBase,
IntervalBase,
PointBase,
SequenceBase,
cmp,
)
from cylc.flow.exceptions import (
CylcConfigError,
IntervalParsingError,
PointParsingError,
SequenceDegenerateError,
WorkflowConfigError
WorkflowConfigError,
)
from cylc.flow.wallclock import get_current_time_string
from cylc.flow.parsec.validate import IllegalValueError
from cylc.flow.time_parser import CylcTimeParser
from cylc.flow.wallclock import get_current_time_string


if TYPE_CHECKING:
from metomi.isodatetime.data import TimePoint
from metomi.isodatetime.parsers import (
DurationParser, TimePointParser, TimeRecurrenceParser)
DurationParser,
TimePointParser,
TimeRecurrenceParser,
)

CYCLER_TYPE_ISO8601 = "iso8601"
CYCLER_TYPE_SORT_KEY_ISO8601 = 1
Expand All @@ -57,6 +70,17 @@
"(incompatible with [cylc]cycle point num expanded year digits = %s ?)")


# NOTE: We cache some datetime cycling operations to improve compute
# perforance. For profiling, this can be disabled by setting the environment
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# perforance. For profiling, this can be disabled by setting the environment
# performance. For profiling, this can be disabled by setting the environment

# variable CYLC_CYCLER_LRU_CACHE_SIZE=0.

# The number of cycling operations to cache:
_LRU_CACHE_SIZE = int(os.environ.get('CYLC_CYCLER_LRU_CACHE_SIZE', '10000'))

# A smaller cache for use with larger objecs (to reduce memory impact):
_LARGE_LRU_CACHE_SIZE = int(_LRU_CACHE_SIZE / 100) if _LRU_CACHE_SIZE else 0


class WorkflowSpecifics:

"""Store workflow-setup-specific constants and utilities here."""
Expand Down Expand Up @@ -123,7 +147,7 @@ def sub(self, other):
))

@staticmethod
@lru_cache(10000)
@lru_cache(_LRU_CACHE_SIZE)
def _iso_point_add(point_string, interval_string, _calendar_mode):
"""Add the parsed point_string to the parsed interval_string."""
point = point_parse(point_string)
Expand All @@ -134,23 +158,23 @@ def _cmp(self, other: 'ISO8601Point') -> int:
return self._iso_point_cmp(self.value, other.value, CALENDAR.mode)

@staticmethod
@lru_cache(10000)
@lru_cache(_LRU_CACHE_SIZE)
def _iso_point_cmp(point_string, other_point_string, _calendar_mode):
"""Compare the parsed point_string to the other one."""
point = point_parse(point_string)
other_point = point_parse(other_point_string)
return cmp(point, other_point)

@staticmethod
@lru_cache(10000)
@lru_cache(_LRU_CACHE_SIZE)
def _iso_point_sub_interval(point_string, interval_string, _calendar_mode):
"""Return the parsed point_string minus the parsed interval_string."""
point = point_parse(point_string)
interval = interval_parse(interval_string)
return str(point - interval)

@staticmethod
@lru_cache(10000)
@lru_cache(_LRU_CACHE_SIZE)
def _iso_point_sub_point(point_string, other_point_string, _calendar_mode):
"""Return the difference between the two parsed point strings."""
point = point_parse(point_string)
Expand Down Expand Up @@ -216,7 +240,7 @@ def __bool__(self):
return self._iso_interval_nonzero(self.value)

@staticmethod
@lru_cache(10000)
@lru_cache(_LRU_CACHE_SIZE)
def _iso_interval_abs(interval_string, other_interval_string):
"""Return the absolute (non-negative) value of an interval_string."""
interval = interval_parse(interval_string)
Expand All @@ -226,38 +250,38 @@ def _iso_interval_abs(interval_string, other_interval_string):
return interval_string

@staticmethod
@lru_cache(10000)
@lru_cache(_LRU_CACHE_SIZE)
def _iso_interval_add(interval_string, other_interval_string):
"""Return one parsed interval_string plus the other one."""
interval = interval_parse(interval_string)
other = interval_parse(other_interval_string)
return str(interval + other)

@staticmethod
@lru_cache(10000)
@lru_cache(_LRU_CACHE_SIZE)
def _iso_interval_cmp(interval_string, other_interval_string):
"""Compare one parsed interval_string with the other one."""
interval = interval_parse(interval_string)
other = interval_parse(other_interval_string)
return cmp(interval, other)

@staticmethod
@lru_cache(10000)
@lru_cache(_LRU_CACHE_SIZE)
def _iso_interval_sub(interval_string, other_interval_string):
"""Subtract one parsed interval_string from the other one."""
interval = interval_parse(interval_string)
other = interval_parse(other_interval_string)
return str(interval - other)

@staticmethod
@lru_cache(10000)
@lru_cache(_LRU_CACHE_SIZE)
def _iso_interval_mul(interval_string, factor):
"""Multiply one parsed interval_string's values by factor."""
interval = interval_parse(interval_string)
return str(interval * factor)

@staticmethod
@lru_cache(10000)
@lru_cache(_LRU_CACHE_SIZE)
def _iso_interval_nonzero(interval_string):
"""Return whether the parsed interval_string is a null interval."""
interval = interval_parse(interval_string)
Expand Down Expand Up @@ -318,7 +342,6 @@ class ISO8601Sequence(SequenceBase):

TYPE = CYCLER_TYPE_ISO8601
TYPE_SORT_KEY = CYCLER_TYPE_SORT_KEY_ISO8601
_MAX_CACHED_POINTS = 100

__slots__ = ('dep_section', 'context_start_point', 'context_end_point',
'offset', '_cached_first_point_values',
Expand Down Expand Up @@ -346,7 +369,9 @@ def __init__(

# cache is_on_sequence
# see B019 - https://github.com/PyCQA/flake8-bugbear#list-of-warnings
self.is_on_sequence = lru_cache(maxsize=100)(self._is_on_sequence)
self.is_on_sequence = lru_cache(_LARGE_LRU_CACHE_SIZE)(
self._is_on_sequence
)

if (
context_start_point is None
Expand Down Expand Up @@ -462,8 +487,7 @@ def is_valid(self, point):
return self._cached_valid_point_booleans[point.value]
except KeyError:
is_valid = self.is_on_sequence(point)
if (len(self._cached_valid_point_booleans) >
self._MAX_CACHED_POINTS):
if len(self._cached_valid_point_booleans) > _LARGE_LRU_CACHE_SIZE:
self._cached_valid_point_booleans.popitem()
self._cached_valid_point_booleans[point.value] = is_valid
return is_valid
Expand Down Expand Up @@ -555,14 +579,15 @@ def _check_and_cache_next_point(self, point, next_point):
)

# Cache the answer for point -> next_point.
if (len(self._cached_next_point_values) >
self._MAX_CACHED_POINTS):
if len(self._cached_next_point_values) > _LARGE_LRU_CACHE_SIZE:
self._cached_next_point_values.popitem()
self._cached_next_point_values[point.value] = next_point.value

# Cache next_point as a valid starting point for this recurrence.
if (len(self._cached_next_point_values) >
self._MAX_CACHED_POINTS):
if (
_LARGE_LRU_CACHE_SIZE
and len(self._cached_next_point_values) > _LARGE_LRU_CACHE_SIZE
):
self._cached_recent_valid_points.pop(0)
self._cached_recent_valid_points.append(next_point)

Expand Down Expand Up @@ -600,8 +625,10 @@ def get_first_point(
# Check multiple exclusions
if ret and ret in self.exclusions:
return self.get_next_point_on_sequence(ret)
if (len(self._cached_first_point_values) >
self._MAX_CACHED_POINTS):
if (
len(self._cached_first_point_values)
> _LARGE_LRU_CACHE_SIZE
):
self._cached_first_point_values.popitem()
self._cached_first_point_values[point.value] = (
first_point_value)
Expand Down Expand Up @@ -950,7 +977,7 @@ def is_offset_absolute(offset_string):
return False


@lru_cache(10000)
@lru_cache(_LRU_CACHE_SIZE)
def _interval_parse(interval_string):
"""Parse an interval_string into a proper Duration object."""
return WorkflowSpecifics.interval_parser.parse(interval_string)
Expand All @@ -965,7 +992,7 @@ def point_parse(point_string: str) -> 'TimePoint':
)


@lru_cache(10000)
@lru_cache(_LRU_CACHE_SIZE)
def _point_parse(point_string: str, _dump_fmt, _tz) -> 'TimePoint':
"""Parse a point_string into a proper TimePoint object.

Expand Down
Loading