changes.d/7089.fix.md (1 change: 1 addition & 0 deletions)
@@ -0,0 +1 @@
+Fixes a memory build-up problem exposed by large workflows of many interconnected tasks.
cylc/flow/data_store_mgr.py (28 changes: 21 additions & 7 deletions)
@@ -231,6 +231,15 @@
     WORKFLOW: {'log_records': 10}
 }
 
+# Data types for protobuf memory reset
+RESET_PROTOBUF_TYPES = {
+    TASKS,
+    TASK_PROXIES,
+    FAMILIES,
+    FAMILY_PROXIES,
+    WORKFLOW,
+}
+
 # internal runtime to protobuf field name mapping
 RUNTIME_CFG_MAP_TO_FIELD = {
     'completion': 'completion',
@@ -402,7 +411,7 @@ def runtime_from_partial(rtconfig, runtimeold: Optional[PbRuntime] = None):
 
 
 def reset_protobuf_object(msg_class, msg_orig):
-    """Reset object to clear memory build-up."""
+    """Reset upb-protobuf object to clear memory build-up."""
     # See: https://github.com/protocolbuffers/protobuf/issues/19674
     # The new message instantiation needs to happen on a separate line.
     new_msg = msg_class()
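The remainder of this helper's body is collapsed in the diff, so here is a minimal sketch of the reset pattern it implements, assuming the hidden lines copy the original message into the fresh instance and return it (the standard workaround described in the linked protobuf issue; the CopyFrom step is an assumption, not confirmed by the visible diff):

```python
# Hedged sketch, not the verbatim Cylc implementation.
# upb-backed protobuf messages can retain arena memory across repeated
# MergeFrom calls; copying into a freshly allocated message releases it.
def reset_protobuf_object_sketch(msg_class, msg_orig):
    """Rebuild a message so accumulated upb arena memory is freed."""
    # Instantiating on its own line keeps the new message's lifetime
    # separate from the expression that still references the old one.
    new_msg = msg_class()
    new_msg.CopyFrom(msg_orig)  # assumed: standard protobuf CopyFrom API
    return new_msg
```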
@@ -421,6 +430,10 @@ def apply_delta(key, delta, data):
 
     # Merge in updated fields
     if getattr(delta, 'updated', False):
+        # Flag for clearing memory accumulation.
+        reset_protobuf = False
+        if key in RESET_PROTOBUF_TYPES:
+            reset_protobuf = True
         if key == WORKFLOW:
             # Clear fields that require overwrite with delta
             field_set = {f.name for f, _ in delta.updated.ListFields()}
@@ -437,6 +450,10 @@ def apply_delta(key, delta, data):
                 lst = getattr(data[key], field_name)
                 while len(lst) > max_len:
                     lst.pop(0)
+
+            # Reset upb memory allocation post delta merge.
+            if reset_protobuf:
+                data[key] = reset_protobuf_object(MESSAGE_MAP[key], data[key])
         else:
             for element in delta.updated:
                 try:
@@ -446,8 +463,9 @@ def apply_delta(key, delta, data):
                         if field.name in CLEAR_FIELD_MAP[key]:
                             data_element.ClearField(field.name)
                     data_element.MergeFrom(element)
-                    # Clear memory accumulation
-                    if key in (TASKS, FAMILIES):
+
+                    # Reset upb memory allocation post delta merge.
+                    if reset_protobuf:
                         data[key][element.id] = reset_protobuf_object(
                             MESSAGE_MAP[key],
                             data_element
@@ -461,10 +479,6 @@ def apply_delta(key, delta, data):
                     )
                     continue
 
-        # Clear memory accumulation
-        if key == WORKFLOW:
-            data[key] = reset_protobuf_object(PbWorkflow, data[key])
-
     # Prune data elements
     if hasattr(delta, 'pruned'):
         # UIS flag to prune workflow, set externally.
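Taken together, the data_store_mgr.py changes replace the two hard-coded reset sites (TASKS/FAMILIES per element, WORKFLOW after the merge loop) with a single reset_protobuf flag driven by RESET_PROTOBUF_TYPES. A condensed, hypothetical sketch of the resulting control flow (field clearing, list trimming, and error handling omitted):

```python
# Illustrative sketch only; names follow the diff, structure is simplified.
def apply_delta_sketch(key, delta, data):
    if getattr(delta, 'updated', False):
        # One flag now gates the reset for all five protobuf types.
        reset_protobuf = key in RESET_PROTOBUF_TYPES
        if key == WORKFLOW:
            data[key].MergeFrom(delta.updated)
            if reset_protobuf:
                # Replace the single workflow message wholesale.
                data[key] = reset_protobuf_object(MESSAGE_MAP[key], data[key])
        else:
            for element in delta.updated:
                data[key][element.id].MergeFrom(element)
                if reset_protobuf:
                    # Replace each merged element individually.
                    data[key][element.id] = reset_protobuf_object(
                        MESSAGE_MAP[key], data[key][element.id]
                    )
```

Note the behavioural change visible in the diff: TASK_PROXIES and FAMILY_PROXIES now receive the per-element reset too, where previously only TASKS, FAMILIES, and the workflow message were reset.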
tests/integration/scripts/test_set.py (8 changes: 5 additions & 3 deletions)
@@ -156,9 +156,9 @@ async def test_data_store(
     async with start(schd):
         await schd.update_data_structure()
         data = schd.data_store_mgr.data[schd.tokens.id]
-        task_a: PbTaskProxy = data[TASK_PROXIES][
-            schd.pool.get_task(IntegerPoint('1'), 'a').tokens.id
-        ]
+        # 1/a is not the same data-store object after the set, and is not
+        # in the pool after it has succeeded, so keep the ID, not the object.
+        task_a_id = schd.pool.get_task(IntegerPoint('1'), 'a').tokens.id
 
         # set the 1/a:succeeded prereq of 1/z
         schd.pool.set_prereqs_and_outputs(
@@ -174,6 +174,7 @@ async def test_data_store(
             {TaskTokens('1', 'a')}, ['x'], [], ['1']
         )
         await schd.update_data_structure()
+        task_a: PbTaskProxy = data[TASK_PROXIES][task_a_id]
         assert task_a.state == TASK_STATUS_WAITING
         assert task_a.outputs['x'].satisfied is True
         assert task_a.outputs['succeeded'].satisfied is False
@@ -183,6 +184,7 @@ async def test_data_store(
             {TaskTokens('1', 'a')}, ['succeeded'], [], ['1']
         )
         await schd.update_data_structure()
+        task_a: PbTaskProxy = data[TASK_PROXIES][task_a_id]
         assert task_a.state == TASK_STATUS_SUCCEEDED
         assert task_a.outputs['x'].satisfied is True
         assert task_a.outputs['succeeded'].satisfied is True
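The test changes follow directly from the reset: because reset_protobuf_object swaps in a fresh message, a PbTaskProxy reference captured before an update goes stale, so the test now re-fetches by ID after each update_data_structure() call. A hypothetical fragment (assuming it runs inside the async test body) illustrating the failure mode a cached reference would hit:

```python
# Illustrative only: shows why a cached reference no longer works.
stale = data[TASK_PROXIES][task_a_id]   # reference to the old message
await schd.update_data_structure()      # reset swaps in a fresh object
fresh = data[TASK_PROXIES][task_a_id]
assert fresh is not stale               # identity changes after the reset
# 'stale' still holds pre-update state; assertions must use 'fresh'.
```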