Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/6999.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Don't infer task output optionality from the right hand side of graph triggers.
29 changes: 29 additions & 0 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2394,6 +2394,35 @@ def load_graph(self):

self.set_required_outputs(task_output_opt)

# Print optional outputs, which can cause graph branching or branch
# termination at runtime.
if cylc.flow.flags.verbosity > 1:
# Print inferred output optionality, for debugging graph parser.
# (Note: this excludes tasks that default to success-required
# via the RHS of a trigger with no explicit output attached).
outputs = [
f" \u2022 {name}:{output} "
f"{'optional' if optional else 'required'}"
for (name, output), (optional, _, _) in task_output_opt.items()
]
if outputs:
print(
"Optional and required outputs inferred from the graph:\n"
f"{'\n'.join(outputs)}"
)
elif cylc.flow.flags.verbosity > 0:
# Just print optional outputs: the important ones for users.
optionals = [
f" \u2022 {name}:{output} optional"
for (name, output), (optional, _, _) in task_output_opt.items()
if optional
]
if optionals:
print(
"Optional outputs inferred from the graph:\n"
f"{'\n'.join(optionals)}"
)

# Detect use of xtrigger names with '@' prefix (creates a task).
overlap = set(self.taskdefs.keys()).intersection(
list(self.cfg['scheduling']['xtriggers']))
Expand Down
104 changes: 82 additions & 22 deletions cylc/flow/graph_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,44 @@ class GraphParser:
NODE(<REMOTE-WORKFLOW-QUALIFIER>)(:QUALIFIER)
* Outputs (boo:x) are ignored as triggers on the RHS to allow chaining:
"foo => bar:x => baz & qux"
"""

Chained triggers get decomposed into pairs for processing, e.g.:
x # lone node
a | b => c & d => e
becomes:
[None, x],
[None, a],
[None, b],
[a|b, c&d], --> [a|b, c], [a|b, d]
[c&d, e], --> [c, e], [d, e]
(Lone nodes and left-side nodes make [None, left] for auto-triggering.)

LEFT sides are TASK OUTPUTS (a plain name is short for "task:succeeded"):
a:x & b? & c => d
means:
a:x & b:succeeded? & c:succeeded => d

RIGHT sides are just TASKS to trigger when the left side outputs complete:
[left] => d # does NOT imply d:succeeded is required

But output optionality can be *explicitly* set on the right, if you like:
[left] => d?
[left] => e:x
means
[left] => d # AND d:succeeded is optional
[left] => e # AND e:x is required

How the graph parser determines output optionality:
In chained triggers every node appears on the right of a pair, so we
only need rights to set (and check consitency of) optional outputs.
x # [None, x] ... x is end-of-chain
a => b => c # [None, a], [a, b], [b, c] ... c is end-of-chain

But note we infer :succeeded (for plain task names) except at the end
of a chain that is not a lone node (i.e. for x, but not c).
i.e.: if NOT end-of-chain OR left is None.

"""
CYLC7_COMPAT = "CYLC 7 BACK-COMPAT"

OP_AND = '&'
Expand Down Expand Up @@ -285,6 +321,7 @@ def __init__(
self.original: Dict = {}
self.workflow_state_polling_tasks: Dict = {}
self.expire_triggers = expire_triggers
self.end_of_chain_nodes: set[str] = set()

# Record task outputs as optional or required:
# {(name, output): (is_optional, is_member)}
Expand Down Expand Up @@ -472,6 +509,11 @@ def parse_graph(self, graph_string: str) -> None:
for i in range(0, len(chain) - 1):
pairs.add((chain[i], chain[i + 1]))

# Record end of chain nodes (can be multiple with &)
self.end_of_chain_nodes.update(
chain[-1].split(self.__class__.OP_AND)
)

# Get a set of RH nodes which are not at the LH of another pair:
# terminals = {p[1] for p in pairs}.difference({p[0] for p in pairs})

Expand All @@ -481,6 +523,7 @@ def parse_graph(self, graph_string: str) -> None:

for pair in sorted(pairs, key=lambda p: str(p[0])):
self._proc_dep_pair(pair, check_terminals, lefts, rights)

self.terminals = rights.difference(lefts)
for right in self.terminals:
left = check_terminals.get(right)
Expand Down Expand Up @@ -621,6 +664,10 @@ def _proc_dep_pair(
expr = re.sub(this, that, expr)
else:
# Make success triggers explicit.
if name in self.family_map:
raise GraphParseError(
f"Family trigger required: {left} => {right}"
)
Comment on lines +667 to +670
Copy link
Member Author

@hjoliver hjoliver Sep 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug in master: the code here was adding :succeeded to bare family names on the left side of trigger pairs: FAM --> FAM:succeeded.

This results in failed validation later, but with a cryptic error message.

E.g. for a => FAM => c on master:

$ cylc val .
GraphParseError: Illegal family trigger in FAM:succeeded

On this branch:

$ cylc val .
GraphParseError: Family trigger required: FAM => c

n_trig = TASK_OUTPUT_SUCCEEDED
if offset:
this = r'\b%s\b%s(?!:)' % (
Expand Down Expand Up @@ -796,6 +843,7 @@ def _set_output_opt(
for outp in [TASK_OUTPUT_SUCCEEDED, TASK_OUTPUT_FAILED]:
self._set_output_opt(
name, outp, optional, suicide, fam_member)
return
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug in master, possibly inconsequential though.


try:
prev_optional, prev_default, prev_fixed = (
Expand Down Expand Up @@ -913,39 +961,51 @@ def _compute_triggers(
if name in self.family_map:
fam = True
rhs_members = self.family_map[name]
if not output:
# (Plain family name on RHS).
# Make implicit success explicit.
if not output and not expr:
# Infer :succeed-all for lone family nodes, e.g. for
# "R1 = FAM": members default to success required, or
# for "FAM:succeed-all?" they default to success optional.
output = QUAL_FAM_SUCCEED_ALL
elif output.startswith("finish"):
elif output and output.startswith("finish"):
if optional:
raise GraphParseError(
f"Family pseudo-output {name}:{output} can't be"
" optional")
# But implicit optional for the real succeed/fail outputs.
optional = True
try:
outputs = self.__class__.fam_to_mem_output_map[output]
except KeyError:
# Illegal family trigger on RHS of a pair.
raise GraphParseError(
f"Illegal family trigger: {name}:{output}"
) from None
else:
# Implicit optional for the real succeed/fail outputs.
optional = True
if output:
try:
outputs = self.__class__.fam_to_mem_output_map[output]
except KeyError:
# Illegal family trigger on RHS of a pair.
raise GraphParseError(
f"Illegal family trigger: {name}:{output}"
) from None
else:
outputs = ['']
else:
fam = False
if not output:
# Make implicit success explicit.
output = TASK_OUTPUT_SUCCEEDED
else:
rhs_members = [name]
if output:
# Convert to standard output names if necessary.
output = TaskTrigger.standardise_name(output)
rhs_members = [name]
outputs = [output]
elif optional or (
right not in self.end_of_chain_nodes
or not expr
):
# Infer "name:succeeded?" for explicit "name?"
# Infer "name:succeeded" for plain "name"
# if not (end-of-chain and left is not None)
output = TASK_OUTPUT_SUCCEEDED
outputs = [output] # may be [None]

for mem in rhs_members:
if not offset:
# Nodes with offsets on the RHS do not define triggers.
self._set_triggers(mem, suicide, trigs, expr, orig_expr)
for output in outputs:
# But they must be consistent with output optionality.
self._set_output_opt(mem, output, optional, suicide, fam)
if output:
# Infer optionality for explicit outputs on RHS.
self._set_output_opt(
mem, output, optional, suicide, fam)
Loading