diff --git a/changes.d/6999.feat.md b/changes.d/6999.feat.md new file mode 100644 index 0000000000..aba0ad5353 --- /dev/null +++ b/changes.d/6999.feat.md @@ -0,0 +1 @@ +Don't infer task output optionality from the right hand side of graph triggers. diff --git a/cylc/flow/config.py b/cylc/flow/config.py index 5b5ab40e26..519dd49afc 100644 --- a/cylc/flow/config.py +++ b/cylc/flow/config.py @@ -2394,6 +2394,29 @@ def load_graph(self): self.set_required_outputs(task_output_opt) + # Print inferred output optionality, for debugging graph parser. + # Note this excludes tasks that just default to success-required. + optionals = [ + f" \u2022 {name}:{output}" + for (name, output), (optional, _, _) in task_output_opt.items() + if optional + ] + requireds = [ + f" \u2022 {name}:{output}" + for (name, output), (optional, _, _) in task_output_opt.items() + if not optional + ] + if optionals: + LOG.debug( + "Optional outputs inferred from the graph:\n" + f"{'\n'.join(optionals)}" + ) + if requireds: + LOG.debug( + "Required outputs inferred from the graph:\n" + f"{'\n'.join(requireds)}" + ) + # Detect use of xtrigger names with '@' prefix (creates a task). overlap = set(self.taskdefs.keys()).intersection( list(self.cfg['scheduling']['xtriggers'])) diff --git a/cylc/flow/graph_parser.py b/cylc/flow/graph_parser.py index b0046913aa..c3f2969e8d 100644 --- a/cylc/flow/graph_parser.py +++ b/cylc/flow/graph_parser.py @@ -104,8 +104,44 @@ class GraphParser: NODE()(:QUALIFIER) * Outputs (boo:x) are ignored as triggers on the RHS to allow chaining: "foo => bar:x => baz & qux" - """ + Chained triggers get decomposed into pairs for processing, e.g.: + x # lone node + a | b => c & d => e + becomes: + [None, x], + [None, a], + [None, b], + [a|b, c&d], --> [a|b, c], [a|b, d] + [c&d, e], --> [c, e], [d, e] + (Lone nodes and left-side nodes make [None, left] for auto-triggering.) + + LEFT sides are TASK OUTPUTS (a plain name is short for "task:succeeded"): + a:x & b? & c => d + means: + a:x & b:succeeded? & c:succeeded => d + + RIGHT sides are just TASKS to trigger when the left side outputs complete: + [left] => d # does NOT imply d:succeeded is required + + But output optionality can be *explicitly* set on the right, if you like: + [left] => d? + [left] => e:x + means + [left] => d # AND d:succeeded is optional + [left] => e # AND e:x is required + + How the graph parser determines output optionality: + In chained triggers every node appears on the right of a pair, so we + only need rights to set (and check consitency of) optional outputs. + x # [None, x] ... x is end-of-chain + a => b => c # [None, a], [a, b], [b, c] ... c is end-of-chain + + But note we infer :succeeded (for plain task names) except at the end + of a chain that is not a lone node (i.e. for x, but not c). + i.e.: if NOT end-of-chain OR left is None. + + """ CYLC7_COMPAT = "CYLC 7 BACK-COMPAT" OP_AND = '&' @@ -285,6 +321,7 @@ def __init__( self.original: Dict = {} self.workflow_state_polling_tasks: Dict = {} self.expire_triggers = expire_triggers + self.end_of_chain_nodes: set[str] = set() # Record task outputs as optional or required: # {(name, output): (is_optional, is_member)} @@ -472,6 +509,11 @@ def parse_graph(self, graph_string: str) -> None: for i in range(0, len(chain) - 1): pairs.add((chain[i], chain[i + 1])) + # Record end of chain nodes (can be multiple with &) + self.end_of_chain_nodes.update( + chain[-1].split(self.__class__.OP_AND) + ) + # Get a set of RH nodes which are not at the LH of another pair: # terminals = {p[1] for p in pairs}.difference({p[0] for p in pairs}) @@ -481,6 +523,7 @@ def parse_graph(self, graph_string: str) -> None: for pair in sorted(pairs, key=lambda p: str(p[0])): self._proc_dep_pair(pair, check_terminals, lefts, rights) + self.terminals = rights.difference(lefts) for right in self.terminals: left = check_terminals.get(right) @@ -621,6 +664,10 @@ def _proc_dep_pair( expr = re.sub(this, that, expr) else: # Make success triggers explicit. + if name in self.family_map: + raise GraphParseError( + f"Family trigger required: {left} => {right}" + ) n_trig = TASK_OUTPUT_SUCCEEDED if offset: this = r'\b%s\b%s(?!:)' % ( @@ -796,6 +843,7 @@ def _set_output_opt( for outp in [TASK_OUTPUT_SUCCEEDED, TASK_OUTPUT_FAILED]: self._set_output_opt( name, outp, optional, suicide, fam_member) + return try: prev_optional, prev_default, prev_fixed = ( @@ -913,39 +961,51 @@ def _compute_triggers( if name in self.family_map: fam = True rhs_members = self.family_map[name] - if not output: - # (Plain family name on RHS). - # Make implicit success explicit. + if not output and not expr: + # Infer :succeed-all for lone family nodes, e.g. for + # "R1 = FAM": members default to success required, or + # for "FAM:succeed-all?" they default to success optional. output = QUAL_FAM_SUCCEED_ALL - elif output.startswith("finish"): + elif output and output.startswith("finish"): if optional: raise GraphParseError( f"Family pseudo-output {name}:{output} can't be" " optional") - # But implicit optional for the real succeed/fail outputs. - optional = True - try: - outputs = self.__class__.fam_to_mem_output_map[output] - except KeyError: - # Illegal family trigger on RHS of a pair. - raise GraphParseError( - f"Illegal family trigger: {name}:{output}" - ) from None + else: + # Implicit optional for the real succeed/fail outputs. + optional = True + if output: + try: + outputs = self.__class__.fam_to_mem_output_map[output] + except KeyError: + # Illegal family trigger on RHS of a pair. + raise GraphParseError( + f"Illegal family trigger: {name}:{output}" + ) from None + else: + outputs = [''] else: fam = False - if not output: - # Make implicit success explicit. - output = TASK_OUTPUT_SUCCEEDED - else: + rhs_members = [name] + if output: # Convert to standard output names if necessary. output = TaskTrigger.standardise_name(output) - rhs_members = [name] - outputs = [output] + elif optional or ( + right not in self.end_of_chain_nodes + or not expr + ): + # Infer "name:succeeded?" for explicit "name?" + # Infer "name:succeeded" for plain "name" + # if not (end-of-chain and left is not None) + output = TASK_OUTPUT_SUCCEEDED + outputs = [output] # may be [None] for mem in rhs_members: if not offset: # Nodes with offsets on the RHS do not define triggers. self._set_triggers(mem, suicide, trigs, expr, orig_expr) for output in outputs: - # But they must be consistent with output optionality. - self._set_output_opt(mem, output, optional, suicide, fam) + if output: + # Infer optionality for explicit outputs on RHS. + self._set_output_opt( + mem, output, optional, suicide, fam) diff --git a/tests/integration/test_optional_outputs.py b/tests/integration/test_optional_outputs.py index 1b8e390668..4101db2456 100644 --- a/tests/integration/test_optional_outputs.py +++ b/tests/integration/test_optional_outputs.py @@ -24,7 +24,9 @@ from itertools import combinations from typing import TYPE_CHECKING +import logging import pytest +import itertools from cylc.flow.commands import ( run_cmd, @@ -32,6 +34,7 @@ ) from cylc.flow.cycling.integer import IntegerPoint from cylc.flow.cycling.iso8601 import ISO8601Point +from cylc.flow.exceptions import WorkflowConfigError from cylc.flow.id import TaskTokens, Tokens from cylc.flow.network.resolvers import TaskMsg from cylc.flow.task_events_mgr import ( @@ -57,6 +60,9 @@ from cylc.flow.scheduler import Scheduler +OPT_BOTH_ERR = "Output {} can't be both required and optional" + + def reset_outputs(itask: 'TaskProxy'): """Undo the consequences of setting task outputs. @@ -508,3 +514,438 @@ async def test_removed_taskdef( assert z_1.state.outputs._completion_expression == '' z_1.state.outputs.set_message_complete(TASK_OUTPUT_FAILED) assert z_1.is_complete() + + +@pytest.mark.parametrize( + 'graph, err', + [ + pytest.param( + """ + a + a? + """, + OPT_BOTH_ERR.format("a:succeeded"), + ), + pytest.param( + """ + a => b + a? + """, + OPT_BOTH_ERR.format("a:succeeded"), + ), + pytest.param( + """ + a? => b + a + """, + OPT_BOTH_ERR.format("a:succeeded"), + ), + pytest.param( + """ + a => b? + b + """, + OPT_BOTH_ERR.format("b:succeeded"), + ), + pytest.param( + """ + a => b:succeeded + b? + """, + OPT_BOTH_ERR.format("b:succeeded"), + ), + pytest.param( + """ + a => b:succeeded + c => b? + """, + OPT_BOTH_ERR.format("b:succeeded"), + ), + pytest.param( + """ + c:x => d + a => c:x? + """, + OPT_BOTH_ERR.format("c:x"), + ), + pytest.param( + """ + c:x? => d + a => c:x + """, + OPT_BOTH_ERR.format("c:x"), + ), + pytest.param( + """ + FAM:finish-all? + """, + "Family pseudo-output FAM:finish-all can't be optional", + ), + pytest.param( + """ + a => b => c + b? + """, + OPT_BOTH_ERR.format("b:succeeded"), + ), + pytest.param( + """ + a => FAM => c + """, + "Family trigger required: FAM => c", + ), + ], + ids=itertools.count() +) +async def test_optional_outputs_consistency(flow, validate, graph, err): + """Check that inconsistent output optionality fails validation.""" + id_ = flow( + { + 'scheduling': { + 'graph': { + 'R1': graph + }, + }, + 'runtime': { + 'FAM': {}, + 'm1, m2': { + 'inherit': 'FAM', + }, + 'c': { + 'outputs': { + 'x': 'x', + }, + }, + }, + }, + ) + with pytest.raises(WorkflowConfigError) as exc_ctx: + validate(id_) + assert err in str(exc_ctx.value) + + +@pytest.mark.parametrize( + 'graph, expected', + [ + pytest.param( + "a => b", + { + ("a", "succeeded"): True, # inferred + ("b", "succeeded"): True, # default + ("a", "failed"): None, # (not set) + ("b", "failed"): None, # (not set) + }, + ), + pytest.param( + """ + a => b + b? + """, + { + ("a", "succeeded"): True, # inferred + ("b", "succeeded"): False, # inferred + ("b", "failed"): None, # (not set) + }, + ), + pytest.param( + """ + a:failed => b + """, + { + ("a", "failed"): True, + ("b", "succeeded"): True, + ("a", "succeeded"): None, + ("b", "failed"): None, + }, + ), + pytest.param( + """ + a => b + b + """, + { + ("a", "succeeded"): True, + ("b", "succeeded"): True, + }, + ), + pytest.param( + """ + a => b + b? + """, + { + ("a", "succeeded"): True, + ("b", "succeeded"): False, + }, + ), + pytest.param( + """ + a? => b + """, + { + ("a", "succeeded"): False, + ("b", "succeeded"): True, + }, + ), + pytest.param( + """ + a? => b + b? + """, + { + ("a", "succeeded"): False, + ("b", "succeeded"): False, + }, + ), + pytest.param( + """ + a? => b? + """, + { + ("a", "succeeded"): False, + ("b", "succeeded"): False, + }, + ), + pytest.param( + """ + FAM + """, + { + ("m1", "succeeded"): True, # family default + ("m2", "succeeded"): True, # family default + }, + ), + pytest.param( + """ + FAM:succeed-all? + """, + { + ("m1", "succeeded"): False, # family default + ("m2", "succeeded"): False, # family default + }, + ), + pytest.param( + """ + FAM + m1? + """, + { + ("m1", "succeeded"): False, # inferred + ("m2", "succeeded"): True, # family default + }, + ), + pytest.param( + """ + a => FAM + """, + { + ("a", "succeeded"): True, # inferred + ("m1", "succeeded"): True, # default + ("m2", "succeeded"): True, # default + }, + ), + pytest.param( + """ + a => FAM + m2? + """, + { + ("a", "succeeded"): True, # inferred + ("m1", "succeeded"): True, # default + ("m2", "succeeded"): False, # inferred (override default) + }, + ), + pytest.param( + """ + a => FAM:finish-all + """, + { + ("a", "succeeded"): True, # inferred + ("m1", "succeeded"): False, # family default + ("m2", "succeeded"): False, # family default + }, + ), + pytest.param( + """ + FAM:succeed-any => a + """, + { + ("a", "succeeded"): True, # inferred + ("m1", "succeeded"): True, # family default + ("m2", "succeeded"): True, # family default + }, + ), + pytest.param( + """ + FAM:succeed-any? => a + """, + { + ("a", "succeeded"): True, # inferred + ("m1", "succeeded"): False, # family default + ("m2", "succeeded"): False, # family default + }, + ), + pytest.param( + """ + FAM:succeed-any => a + m1? + """, + { + ("a", "succeeded"): True, + ("m1", "succeeded"): False, + ("m2", "succeeded"): True, + }, + ), + pytest.param( + """ + a & b? => c + """, + { + ("a", "succeeded"): True, # inferred + ("b", "succeeded"): False, # inferred + ("c", "succeeded"): True, # default + }, + ), + pytest.param( + """ + a => c:x + """, + { + ("a", "succeeded"): True, # inferred + ("c", "succeeded"): True, # default + ("c", "x"): True, # inferred + }, + ), + pytest.param( + """ + a => c:x? + """, + { + ("a", "succeeded"): True, # inferred + ("c", "succeeded"): True, # default + ("c", "x"): False, # inferred + }, + ), + pytest.param( + """ + a => b => c # infer :succeeded for b inside chain + """, + { + ("a", "succeeded"): True, # inferred + ("b", "succeeded"): True, # inferred + ("c", "succeeded"): True, # default + }, + ), + pytest.param( + # Check we don't infer c:succeeded at end-of-chain + # when there's an & at the end. + """ + a => b & c + c? + """, + { + ("a", "succeeded"): True, # inferred + ("b", "succeeded"): True, # default + ("c", "succeeded"): False, # inferred + }, + ), + ], + ids=itertools.count() +) +async def test_optional_outputs_inference( + flow, validate, graph, expected +): + """Check task output optionality after graph parsing. + + This checks taskdef.outputs, which holds inferred and default values. + + """ + id = flow( + { + 'scheduling': { + 'graph': { + 'R1': graph + }, + }, + 'runtime': { + 'FAM': {}, + 'm1, m2': { + 'inherit': 'FAM', + }, + 'c': { + 'outputs': { + 'x': 'x', + }, + }, + }, + } + ) + config = validate(id) + for (task, output), exp in expected.items(): + tdef = config.get_taskdef(task) + (_, required) = tdef.outputs[output] + assert required == exp + + +async def test_log_outputs(flow, validate, caplog): + """Test logging of optional and required outputs inferred from the graph. + + This probes output optionality inferred by the graph parser, so it does + not include RHS-only tasks that just default to :succeeded required. + + """ + id = flow( + { + 'scheduling': { + 'graph': { + 'R1': """ + # (b:succeeded required by default, not by inference) + a? => FAM:succeed-all? => b + m1 + a? => c:x? + a? => c:y + """, + }, + }, + 'runtime': { + 'FAM': {}, + 'm1, m2': { + 'inherit': 'FAM', + }, + 'c': { + "outputs": { + "x": "x", + "y": "y" + } + } + } + } + ) + caplog.set_level(logging.DEBUG) + validate(id) + + found_opt = False + found_req = False + + for record in caplog.records: + msg = record.message + if "Optional outputs inferred from the graph:" in msg: + found_opt = True + for output in ["a:succeeded", "m2:succeeded", "c:x"]: + assert output in msg + for output in [ + "b:succeeded", "m1:succeeded", "c:y", "c:succeeded" + ]: + assert output not in msg + elif "Required outputs inferred from the graph:" in msg: + found_req = True + for output in ["m1:succeeded", "c:y"]: + assert output in msg + for output in [ + "m2:succeeded", "b:succeeded", "a:succeeded", "c:x", + "c:succeeded" + ]: + assert output not in msg + + assert found_opt + assert found_req diff --git a/tests/unit/test_graph_parser.py b/tests/unit/test_graph_parser.py index 98022b0721..bc6e18f674 100644 --- a/tests/unit/test_graph_parser.py +++ b/tests/unit/test_graph_parser.py @@ -140,15 +140,17 @@ def test_graph_syntax_errors_2(seq, graph, expected_err): ), # See https://github.com/cylc/cylc-flow/issues/6523 # For the next 4 tests: + # NB "foo:succeeded" now explicit on the right (this used to test + # inferred "foo:succeeded" from "=> foo", which we no longer infer). param( # Yes I know it's circular, but it's here to # demonstrate that the test below is broken: - "foo:finished => foo", + "foo:finished => foo:succeeded", 'Output foo:succeeded can\'t be both required and optional', id='finish-implies-success-optional' ), param( - "foo[-P1]:finish => foo", + "foo[-P1]:finish => foo:succeeded", 'Output foo:succeeded can\'t be both required and optional', id='finish-implies-success-optional-offset' ), @@ -159,7 +161,7 @@ def test_graph_syntax_errors_2(seq, graph, expected_err): id='succeed-or-failed-mustbe-optional' ), param( - "foo[-P1]:succeeded? | foo[-P1]:failed? => foo", + "foo[-P1]:succeeded? | foo[-P1]:failed? => foo:succeeded", 'Output foo:succeeded can\'t be both required and optional', id='succeed-or-failed-implies-success-optional' ), @@ -705,14 +707,21 @@ def test_parse_graph_fails_with_too_many_continuations(before, after): def test_task_optional_outputs(): - """Test optional outputs are correctly parsed from graph.""" + """Test optional outputs are correctly parsed from graph. + + This checks "task_output_opt" dict which holds output optionality inferred + from the graph. Note since https://github.com/cylc/cylc-flow/pull/6999 + we no longer infer optionality from *implicit* outputs on RHS of triggers, + i.e. "a => b" does not imply b:succeeded is a required output. + + """ OPTIONAL = True REQUIRED = False gp = GraphParser() gp.parse_graph( """ - a1 => b1 - a2:succeed => b2 + a1 => b1 # does not imply b1:succeeded ... + a2:succeed => b2:succeeded a3:succeed => b3:succeed c1? => d1? @@ -726,10 +735,11 @@ def test_task_optional_outputs(): ) for i in range(1, 4): for task in (f'a{i}', f'b{i}'): - assert ( - gp.task_output_opt[(task, TASK_OUTPUT_SUCCEEDED)] - == (REQUIRED, False, True) - ) + if task != "b1": + assert ( + gp.task_output_opt[(task, TASK_OUTPUT_SUCCEEDED)] + == (REQUIRED, False, True) + ) for task in (f'c{i}', f'd{i}'): assert ( @@ -834,7 +844,7 @@ def test_cannot_be_required(): ], [ "FAM => foo", # bare family on LHS - "Illegal family trigger" + "Family trigger required: FAM => foo" ], [ "FAM:expire-all => foo",