Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/7048.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Stopped the evaluated value of `initial cycle point = now` changing on reload/restart.
9 changes: 5 additions & 4 deletions cylc/flow/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,10 @@ async def reload_workflow(schd: 'Scheduler', reload_global: bool = False):
schd.reload_pending = 'loading the workflow definition'
schd.update_data_store() # update workflow status msg
schd._update_workflow_state()
# Things that can't change on workflow reload:
schd._set_workflow_params(
schd.workflow_db_mgr.pri_dao.select_workflow_params()
)
LOG.info("Reloading the workflow definition.")
config = schd.load_flow_file(is_reload=True)
except (ParsecError, CylcConfigError) as exc:
Expand All @@ -589,10 +593,7 @@ async def reload_workflow(schd: 'Scheduler', reload_global: bool = False):
else:
schd.reload_pending = 'applying the new config'
old_tasks = set(schd.config.get_task_name_list())
# Things that can't change on workflow reload:
schd._set_workflow_params(
schd.workflow_db_mgr.pri_dao.select_workflow_params()
)

schd.apply_new_config(config, is_reload=True)
schd.broadcast_mgr.linearized_ancestors = (
schd.config.get_linearized_ancestors()
Expand Down
23 changes: 7 additions & 16 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,8 @@ def __init__(
raise WorkflowConfigError("missing [scheduling][[graph]] section.")
# (The check that 'graph' is defined is below).

# Override the workflow defn with an initial point from the CLI.
# Override the workflow defn with an initial point from the CLI
# or from reload/restart:
icp_str = getattr(self.options, 'icp', None)
if icp_str is not None:
self.cfg['scheduling']['initial cycle point'] = icp_str
Expand Down Expand Up @@ -564,7 +565,7 @@ def __init__(
self._upg_wflow_event_names()

self.mem_log("config.py: before load_graph()")
self.load_graph()
self._load_graph()
self.mem_log("config.py: after load_graph()")

self._set_completion_expressions()
Expand Down Expand Up @@ -684,10 +685,10 @@ def prelim_process_graph(self) -> None:
all(item in ['graph', '1', 'R1'] for item in graphdict)
):
# Pure acyclic graph, assume integer cycling mode with '1' cycle
self.cfg['scheduling']['cycling mode'] = INTEGER_CYCLING_TYPE
for key in ('initial cycle point', 'final cycle point'):
if key not in self.cfg['scheduling']:
self.cfg['scheduling'][key] = '1'
self.cfg['scheduling']['cycling mode'] = INTEGER_CYCLING_TYPE

def process_utc_mode(self):
"""Set UTC mode from config or from stored value on restart.
Expand Down Expand Up @@ -761,7 +762,6 @@ def process_initial_cycle_point(self) -> None:
Sets:
self.initial_point
self.cfg['scheduling']['initial cycle point']
self.evaluated_icp
Raises:
WorkflowConfigError - if it fails to validate
"""
Expand All @@ -775,11 +775,6 @@ def process_initial_cycle_point(self) -> None:
raise WorkflowConfigError(
"This workflow requires an initial cycle point.")
icp = _parse_iso_cycle_point(orig_icp)
self.evaluated_icp = None
if icp != orig_icp:
# now/next()/previous() was used, need to store
# evaluated point in DB
self.evaluated_icp = icp
self.initial_point = get_point(icp).standardise()
self.cfg['scheduling']['initial cycle point'] = str(self.initial_point)

Expand Down Expand Up @@ -2311,7 +2306,7 @@ def _close_families(l_id, r_id, clf_map):

return lret, rret

def load_graph(self):
def _load_graph(self):
"""Parse and load dependency graph."""
LOG.debug("Parsing the dependency graph")

Expand All @@ -2335,18 +2330,14 @@ def load_graph(self):
section = get_sequence_cls().get_async_expr()
graphdict[section] = graphdict.pop('graph')

icp = self.cfg['scheduling']['initial cycle point']
icp = str(self.initial_point)
fcp = self.cfg['scheduling']['final cycle point']

# Make a stack of sections and graphs [(sec1, graph1), ...]
sections = []
for section, value in self.cfg['scheduling']['graph'].items():
# Substitute initial and final cycle points.
if icp:
section = section.replace("^", icp)
elif "^" in section:
raise WorkflowConfigError("Initial cycle point referenced"
" (^) but not defined.")
section = section.replace("^", icp)
if fcp:
section = section.replace("$", fcp)
elif "$" in section:
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/cycling/integer.py
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,7 @@ def init_from_cfg(_):
pass


def get_dump_format(cycling_type=None):
def get_dump_format() -> None:
"""Return cycle point string dump format."""
# Not used for integer cycling.
return None
Expand Down
5 changes: 4 additions & 1 deletion cylc/flow/cycling/iso8601.py
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,9 @@ def __lt__(self, other):
def __str__(self):
return self.value

def __repr__(self) -> str:
return f"<{type(self).__name__} {self.value}>"

def __hash__(self) -> int:
return hash(self.value)

Expand Down Expand Up @@ -902,7 +905,7 @@ def init(num_expanded_year_digits=0, custom_dump_format=None, time_zone=None,
return WorkflowSpecifics


def get_dump_format():
def get_dump_format() -> str:
"""Return cycle point string dump format."""
return WorkflowSpecifics.DUMP_FORMAT

Expand Down
28 changes: 24 additions & 4 deletions cylc/flow/cycling/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,21 @@
Each task may have multiple sequences, e.g. 12-hourly and 6-hourly.
"""

from typing import Optional, Type, overload
from typing import (
Literal,
Optional,
Type,
overload,
)

from cylc.flow.cycling import PointBase, integer, iso8601
from metomi.isodatetime.data import Calendar

from cylc.flow.cycling import (
PointBase,
integer,
iso8601,
)


ISO8601_CYCLING_TYPE = iso8601.CYCLER_TYPE_ISO8601
INTEGER_CYCLING_TYPE = integer.CYCLER_TYPE_INTEGER
Expand Down Expand Up @@ -88,8 +98,18 @@ def get_point_cls(cycling_type: Optional[str] = None) -> Type[PointBase]:
return POINTS[cycling_type]


def get_dump_format(cycling_type=None):
"""Return cycle point dump format, or None."""
@overload
def get_dump_format(cycling_type: Literal["integer"]) -> None:
...


@overload
def get_dump_format(cycling_type: Literal["iso8601"]) -> str:
...


def get_dump_format(cycling_type: Literal["integer", "iso8601"]) -> str | None:
"""Return cycle point dump format (None for integer mode)."""
return DUMP_FORMAT_GETTERS[cycling_type]()


Expand Down
10 changes: 5 additions & 5 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1228,7 +1228,7 @@ def apply_new_config(self, config, is_reload=False):
})

def _set_workflow_params(
self, params: Iterable[Tuple[str, Optional[str]]]
self, params: Iterable[tuple[str, str | None]]
) -> None:
"""Set workflow params on restart/reload.

Expand All @@ -1240,20 +1240,20 @@ def _set_workflow_params(
* A flag to indicate if the workflow should be paused or not.
* Original workflow run time zone.
"""
LOG.info('LOADING workflow parameters')
LOG.info("LOADING saved workflow parameters")
for key, value in params:
if key == self.workflow_db_mgr.KEY_RUN_MODE:
self.options.run_mode = value or RunMode.LIVE.value
LOG.info(f"+ run mode = {value}")
if value is None:
continue
if key in self.workflow_db_mgr.KEY_INITIAL_CYCLE_POINT_COMPATS:
if key == self.workflow_db_mgr.KEY_INITIAL_CYCLE_POINT:
self.options.icp = value
LOG.info(f"+ initial point = {value}")
elif key in self.workflow_db_mgr.KEY_START_CYCLE_POINT_COMPATS:
elif key == self.workflow_db_mgr.KEY_START_CYCLE_POINT:
self.options.startcp = value
LOG.info(f"+ start point = {value}")
elif key in self.workflow_db_mgr.KEY_FINAL_CYCLE_POINT_COMPATS:
elif key == self.workflow_db_mgr.KEY_FINAL_CYCLE_POINT:
if self.is_restart and self.options.fcp == 'reload':
LOG.debug(f"- final point = {value} (ignored)")
elif self.options.fcp is None:
Expand Down
12 changes: 2 additions & 10 deletions cylc/flow/workflow_db_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,8 @@ class WorkflowDatabaseManager:
"""Manage the workflow runtime private and public databases."""

KEY_INITIAL_CYCLE_POINT = 'icp'
KEY_INITIAL_CYCLE_POINT_COMPATS = (
KEY_INITIAL_CYCLE_POINT, 'initial_point')
KEY_START_CYCLE_POINT = 'startcp'
KEY_START_CYCLE_POINT_COMPATS = (
KEY_START_CYCLE_POINT, 'start_point')
KEY_FINAL_CYCLE_POINT = 'fcp'
KEY_FINAL_CYCLE_POINT_COMPATS = (KEY_FINAL_CYCLE_POINT, 'final_point')
KEY_STOP_CYCLE_POINT = 'stopcp'
KEY_UUID_STR = 'uuid_str'
KEY_CYLC_VERSION = 'cylc_version'
Expand Down Expand Up @@ -337,7 +332,7 @@ def put_workflow_params(self, schd: 'Scheduler') -> None:
This method queues the relevant insert statements.

Arguments:
schd (cylc.flow.scheduler.Scheduler): scheduler object.
schd: scheduler object.
"""
self.db_deletes_map[self.TABLE_WORKFLOW_PARAMS].append({})
self.db_inserts_map[self.TABLE_WORKFLOW_PARAMS].extend([
Expand All @@ -353,11 +348,8 @@ def put_workflow_params(self, schd: 'Scheduler') -> None:
])

# Store raw initial cycle point in the DB.
value = schd.config.evaluated_icp
value = None if value == 'reload' else value
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initial cycle point cannot be "reload" (#4313)

self.put_workflow_params_1(
self.KEY_INITIAL_CYCLE_POINT,
value or str(schd.config.initial_point)
self.KEY_INITIAL_CYCLE_POINT, str(schd.config.initial_point)
)

for key in (
Expand Down
16 changes: 8 additions & 8 deletions tests/functional/cylc-combination-scripts/09-vr-icp-now.t
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.

#------------------------------------------------------------------------------
# Ensure that validate step of Cylc VR cannot change the options object.
# Ensure that validate step of `cylc vr` does not set the --icp option for the
# restart step, as this would cause an InputError.
# See https://github.com/cylc/cylc-flow/issues/6262

. "$(dirname "$0")/test_header"
Expand All @@ -26,14 +27,13 @@ WORKFLOW_ID=$(workflow_id)

cp -r "${TEST_SOURCE_DIR}/${TEST_NAME_BASE}/flow.cylc" .

run_ok "${TEST_NAME_BASE}-vip" \
cylc vip . \
--workflow-name "${WORKFLOW_ID}" \
--no-detach \
--no-run-name
run_ok "${TEST_NAME_BASE}-vip" cylc vip . \
--workflow-name "${WORKFLOW_ID}" \
--no-detach \
--no-run-name \
--mode simulation

echo "# Some Comment" >> flow.cylc

run_ok "${TEST_NAME_BASE}-vr" \
cylc vr "${WORKFLOW_ID}" \
--stop-cycle-point 2020-01-01T00:02Z
cylc vr "${WORKFLOW_ID}" --no-detach --mode simulation
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
[scheduler]
[[events]]
restart timeout = PT0S
[scheduling]
initial cycle point = 2020
stop after cycle point = 2020-01-01T00:01Z
final cycle point = 2020
[[graph]]
PT1M = foo
P1Y = foo
[runtime]
[[foo]]
[[[simulation]]]
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/param_expand/01-basic.t
Original file line number Diff line number Diff line change
Expand Up @@ -381,9 +381,9 @@ cmp_ok '19.cylc' <<'__FLOW_CONFIG__'
[[templates]]
lang = %(lang)s
[scheduling]
cycling mode = integer
initial cycle point = 1
final cycle point = 1
cycling mode = integer
[[graph]]
R1 = <lang=c++> => <lang = fortran-2008>
[runtime]
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/reload/23-cycle-point-time-zone.t
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ poll_grep_workflow_log "Reload completed"
cylc stop --now --now "${WORKFLOW_NAME}"

log_scan "${TEST_NAME_BASE}-log-scan" "${WORKFLOW_RUN_DIR}/log/scheduler/log" 1 0 \
'LOADING workflow parameters' \
'LOADING saved workflow parameters' \
'+ cycle point time zone = +0100'

purge
2 changes: 1 addition & 1 deletion tests/functional/restart/52-cycle-point-time-zone.t
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ poll_workflow_running
cylc stop "${WORKFLOW_NAME}"

log_scan "${TEST_NAME_BASE}-log-scan" "${WORKFLOW_RUN_DIR}/log/scheduler/log" 1 0 \
'LOADING workflow parameters' \
'LOADING saved workflow parameters' \
'+ cycle point time zone = +0100'

purge
59 changes: 59 additions & 0 deletions tests/integration/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from typing import Any
import pytest

from cylc.flow import commands
from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
from cylc.flow.cfgspec.globalcfg import GlobalConfig
from cylc.flow.exceptions import (
Expand All @@ -33,6 +34,7 @@
from cylc.flow.parsec.exceptions import ListValueError
from cylc.flow.parsec.fileparse import read_and_proc
from cylc.flow.pathutil import get_workflow_run_pub_db_path
from cylc.flow.scheduler import Scheduler

Fixture = Any
param = pytest.param
Expand Down Expand Up @@ -778,3 +780,60 @@ async def test_task_event_bad_custom_template(
with pytest.raises(WorkflowConfigError, match=exception):
async with start(schd):
pass


async def test_icp_now_reload(
flow, scheduler, start, monkeypatch: pytest.MonkeyPatch, log_filter
):
"""initial cycle point = 'now' should not change from original value on
reload/restart, and sequences should remain intact.

https://github.com/cylc/cylc-flow/issues/7047
"""
def set_time(value):
monkeypatch.setattr(
'cylc.flow.config.get_current_time_string',
lambda *a, **k: f"2005-01-01T{value}Z",
)

wid = flow({
'scheduling': {
'initial cycle point': 'now',
'graph': {
'R1': 'cold => foo',
'PT15M': 'foo[-PT15M] => foo',
},
},
})
schd: Scheduler = scheduler(wid)

def main_check(icp):
assert str(schd.config.initial_point) == icp
assert schd.pool.get_task_ids() == {
f'{icp}/cold',
}
assert {str(seq) for seq in schd.config.sequences} == {
f'R1/{icp}/P0Y',
f'R/{icp}/PT15M',
}

set_time('06:00')
async with start(schd):
expected_icp = '20050101T0600Z'
main_check(expected_icp)

set_time('06:03')
await commands.run_cmd(commands.reload_workflow(schd))

main_check(expected_icp)

await commands.run_cmd(
commands.set_prereqs_and_outputs(
schd, [f'{expected_icp}/cold'], []
)
)
# Downstream task should have spawned on sequence:
assert schd.pool.get_task_ids() == {
f'{expected_icp}/foo',
}
assert not log_filter(level=logging.WARNING)
Loading
Loading