diff --git a/.gitignore b/.gitignore index b18c09ad1eb..f7e26153db0 100644 --- a/.gitignore +++ b/.gitignore @@ -11,6 +11,9 @@ __pycache__/ # vscode .vscode +# pycharm +.idea + # processed workflow configs *.rc.processed *.cylc.processed diff --git a/.mailmap b/.mailmap index aca6b1acd25..5f1a688518b 100644 --- a/.mailmap +++ b/.mailmap @@ -58,4 +58,5 @@ github-actions[bot] github-actions[bot] GitHub Action Diquan Jabbour <165976689+Diquan-BOM@users.noreply.github.com> Maxime Rio +Christopher Bennett ChrisPaulBennett Christopher Bennett christopher.bennett \ No newline at end of file diff --git a/changes.d/6663.feat.md b/changes.d/6663.feat.md new file mode 100644 index 00000000000..9a40cc43d59 --- /dev/null +++ b/changes.d/6663.feat.md @@ -0,0 +1 @@ +Adding CPU time and Max RSS to Analysis Tools diff --git a/cylc/flow/cfgspec/globalcfg.py b/cylc/flow/cfgspec/globalcfg.py index d4328e4e934..866e429f79a 100644 --- a/cylc/flow/cfgspec/globalcfg.py +++ b/cylc/flow/cfgspec/globalcfg.py @@ -1471,6 +1471,28 @@ def default_for( .. versionadded:: 8.0.0 ''') + + with Conf('profile'): + Conf('activate', VDR.V_BOOLEAN, False, desc=''' + A Boolean that sets if the cylc profiler will be used + + .. versionadded:: 8.0.0 + ''') + Conf('cgroups path', VDR.V_STRING, + default='/sys/fs/cgroup', + desc=''' + The path to the cgroups filesystem. The default value + (/sys/fs/cgroup) is the standard location for cgroups on + linux and should work in most circumstances''') + Conf('polling interval', VDR.V_INTEGER, + default=10, + desc=''' + The interval (in seconds) at which the profiler will + poll the cgroups filesystem for resource usage data. + The default value of 10 seconds should be sufficient for + most use cases, but can be adjusted as needed. + ''') + Conf('job runner', VDR.V_STRING, 'background', desc=f''' The system used to run jobs on the platform. 
diff --git a/cylc/flow/etc/job.sh b/cylc/flow/etc/job.sh index c64edfda298..16849e9f4b2 100755 --- a/cylc/flow/etc/job.sh +++ b/cylc/flow/etc/job.sh @@ -139,6 +139,12 @@ cylc__job__main() { mkdir -p "$(dirname "${CYLC_TASK_WORK_DIR}")" || true mkdir -p "${CYLC_TASK_WORK_DIR}" cd "${CYLC_TASK_WORK_DIR}" + + if [[ "${CYLC_PROFILE}" == "True" ]] ; then + cylc profile -m "${CYLC_CGROUP}" -i "${CYLC_POLLING_INTERVAL}" & + export profiler_pid="$!" + fi + # Env-Script, User Environment, Pre-Script, Script and Post-Script # Run user scripts in subshell to protect cylc job script from interference. # Waiting on background process allows signal traps to trigger immediately. @@ -157,11 +163,15 @@ cylc__job__main() { cylc__set_return "$ret_code" fi } + # Grab the max rss and cpu_time and clean up before changing directory + cylc__kill_profiler # Empty work directory remove cd rmdir "${CYLC_TASK_WORK_DIR}" 2>'/dev/null' || true # Send task succeeded message + wait "${CYLC_TASK_MESSAGE_STARTED_PID}" 2>'/dev/null' || true + cylc message -- "${CYLC_WORKFLOW_ID}" "${CYLC_TASK_JOB}" 'succeeded' || true # (Ignore shellcheck "globbing and word splitting" warning here). # shellcheck disable=SC2086 @@ -187,6 +197,14 @@ cylc__set_return() { return "${1:-0}" } +############################################################################### +# Save the data using cylc message and exit the profiler +cylc__kill_profiler() { + if [[ -n "${profiler_pid:-}" ]] && ps -p "$profiler_pid" > /dev/null; then + kill -s SIGINT "${profiler_pid}" || true + fi +} + ############################################################################### # Disable selected or all (if no arguments given) fail traps. # Globals: @@ -268,6 +286,9 @@ cylc__job_finish_err() { # (Ignore shellcheck "globbing and word splitting" warning here). 
# shellcheck disable=SC2086 trap '' ${CYLC_VACATION_SIGNALS:-} ${CYLC_FAIL_SIGNALS} + + cylc__kill_profiler + if [[ -n "${CYLC_TASK_MESSAGE_STARTED_PID:-}" ]]; then wait "${CYLC_TASK_MESSAGE_STARTED_PID}" 2>'/dev/null' || true fi diff --git a/cylc/flow/job_file.py b/cylc/flow/job_file.py index 930331dc5a4..cc44aa42e03 100644 --- a/cylc/flow/job_file.py +++ b/cylc/flow/job_file.py @@ -224,8 +224,16 @@ def _write_task_environment(self, handle, job_conf): '\n export CYLC_TASK_TRY_NUMBER=%s' % job_conf['try_num']) handle.write( "\n export CYLC_TASK_FLOW_NUMBERS=" - f"{','.join(str(f) for f in job_conf['flow_nums'])}" - ) + f"{','.join(str(f) for f in job_conf['flow_nums'])}") + handle.write( + "\n export CYLC_PROFILE=" + f"{job_conf['platform']['profile']['activate']}") + handle.write( + "\n export CYLC_CGROUP=" + f"{job_conf['platform']['profile']['cgroups path']}") + handle.write( + "\n export CYLC_POLLING_INTERVAL=" + f"{job_conf['platform']['profile']['polling interval']}") # Standard parameter environment variables for var, val in job_conf['param_var'].items(): handle.write('\n export CYLC_TASK_PARAM_%s="%s"' % (var, val)) diff --git a/cylc/flow/scripts/profiler.py b/cylc/flow/scripts/profiler.py new file mode 100755 index 00000000000..ef26ed35256 --- /dev/null +++ b/cylc/flow/scripts/profiler.py @@ -0,0 +1,264 @@ +#!/usr/bin/env python3 +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. 
+# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +"""cylc profiler [OPTIONS] + +Profiler which periodically polls cgroups to track +the resource usage of jobs running on the node. +""" + +import os +import re +import sys +import time +import signal +import asyncio + +from pathlib import Path +from functools import partial +from dataclasses import dataclass + +from cylc.flow.exceptions import CylcError +from cylc.flow.terminal import cli_function +from cylc.flow.option_parsers import CylcOptionParser as COP +from cylc.flow.network.client_factory import get_client + + +PID_REGEX = re.compile(r"([^:]*\d{6,}.*)") +RE_INT = re.compile(r'\d+') + + +@dataclass +class Process: + """Class for representing CPU and Memory usage of a process""" + cgroup_memory_path: str + cgroup_cpu_path: str + memory_allocated_path: str + cgroup_version: int + + +def stop_profiler(process, comms_timeout, *args): + """This function will be executed when the SIGINT signal is sent + to this process""" + + max_rss, cpu_time, memory_allocated = get_resource_usage(process) + + graphql_mutation = """ + mutation($WORKFLOWS: [WorkflowID]!, + $MESSAGES: [[String]], $JOB: String!, $TIME: String) { + message(workflows: $WORKFLOWS, messages:$MESSAGES, + taskJob:$JOB, eventTime:$TIME) { + result + } + } + """ + + graphql_request_variables = { + "WORKFLOWS": [os.environ.get('CYLC_WORKFLOW_ID')], + "MESSAGES": [[ + "DEBUG", + f"cpu_time {cpu_time} " + f"max_rss {max_rss} " + f"mem_alloc {memory_allocated}"]], + "JOB": os.environ.get('CYLC_TASK_JOB'), + "TIME": "now" + } + + pclient = get_client(os.environ.get('CYLC_WORKFLOW_ID'), + timeout=comms_timeout) + + async def send_cylc_message(): + await pclient.async_request( + 'graphql', + {'request_string': graphql_mutation, + 'variables': graphql_request_variables}, + ) + + asyncio.run(send_cylc_message()) + sys.exit(0) + + +def get_resource_usage(process): + # If a task fails instantly, or 
finishes very quickly (< 1 second), + # the get config function doesn't have time to run + if (process.cgroup_memory_path is None + or process.cgroup_cpu_path is None + or process.memory_allocated_path is None): + return 0, 0, 0 + max_rss = parse_memory_file(process) + cpu_time = parse_cpu_file(process) + memory_allocated = parse_memory_allocated(process) + return max_rss, cpu_time, memory_allocated + + +def parse_memory_file(process: Process): + """Open the memory stat file and copy the appropriate data""" + + if process.cgroup_version == 2: + with open(process.cgroup_memory_path, 'r') as f: + for line in f: + if "anon" in line: + return int(''.join(filter(str.isdigit, line))) + else: + with open(process.cgroup_memory_path, 'r') as f: + for line in f: + if "total_rss" in line: + return int(''.join(filter(str.isdigit, line))) + + raise CylcError("Unable to find memory usage data") + + +def parse_memory_allocated(process: Process) -> int: + """Open the memory stat file and copy the appropriate data""" + if process.cgroup_version == 2: + cgroup_memory_path = Path(process.memory_allocated_path) + for _ in range(5): + with open(cgroup_memory_path / "memory.max", 'r') as f: + line = f.readline() + if "max" not in line: + return int(line) + cgroup_memory_path = cgroup_memory_path.parent + return 0 + else: # Memory limit not tracked for cgroups v1 + return 0 + + +def parse_cpu_file(process: Process) -> int: + """Open the CPU stat file and return the appropriate data""" + + if process.cgroup_version == 2: + with open(process.cgroup_cpu_path, 'r') as f: + for line in f: + if "usage_usec" in line: + return int(RE_INT.findall(line)[0]) // 1000 + raise ValueError("Unable to find cpu usage data") + else: + with open(process.cgroup_cpu_path, 'r') as f: + for line in f: + # Cgroups v1 uses nanoseconds + return int(line) // 1000000 + + raise CylcError("Unable to find cpu usage data") + + +def get_cgroup_version(cgroup_location: str, cgroup_name: str) -> int: + if 
Path.exists(Path(cgroup_location + cgroup_name)): + return 2 + elif Path.exists(Path(cgroup_location + "/memory" + cgroup_name)): + return 1 + else: + raise FileNotFoundError("Cgroup not found at " + + cgroup_location + cgroup_name) + + +def get_cgroup_name(): + """Get the cgroup directory for the current process""" + + # fugly hack to allow functional tests to use test data + if 'profiler_test_env_var' in os.environ: + return os.getenv('profiler_test_env_var') + + # Get the PID of the current process + pid = os.getpid() + try: + # Get the cgroup information for the current process + with open('/proc/' + str(pid) + '/cgroup', 'r') as f: + result = f.read() + result = PID_REGEX.search(result).group() + return result + except FileNotFoundError: + raise CylcError( + '/proc/' + str(pid) + '/cgroup not found') from None + + except AttributeError as err: + raise AttributeError("No cgroup found for process:", pid) from err + + +def get_cgroup_paths(location) -> Process: + cgroup_name = get_cgroup_name() + cgroup_version = get_cgroup_version(location, cgroup_name) + if cgroup_version == 2: + return Process( + cgroup_memory_path=location + + cgroup_name + "/" + "memory.stat", + cgroup_cpu_path=location + + cgroup_name + "/" + "cpu.stat", + memory_allocated_path=location + cgroup_name, + cgroup_version=cgroup_version, + ) + + elif cgroup_version == 1: + return Process( + cgroup_memory_path=location + "memory/" + + cgroup_name + "/memory.stat", + cgroup_cpu_path=location + "cpu/" + + cgroup_name + "/cpuacct.usage", + memory_allocated_path="", + cgroup_version=cgroup_version, + ) + raise CylcError("Unable to determine cgroup version") + + +def profile(_process: Process, delay, keep_looping=lambda: True): + # The infinite loop that will constantly poll the cgroup + # The lambda function is used to allow the loop to be stopped in unit tests + + while keep_looping(): + # Write cpu / memory usage data to disk + # CPU_TIME = parse_cpu_file(process.cgroup_cpu_path, version) + 
time.sleep(delay) + + +def get_option_parser() -> COP: + parser = COP( + __doc__, + comms=True, + argdoc=[ + ], + ) + parser.add_option( + "-i", type=int, + help="interval between query cycles in seconds", dest="delay") + parser.add_option( + "-m", type=str, help="Location of cgroups directory", + dest="cgroup_location") + + return parser + + +@cli_function(get_option_parser) +def main(_parser: COP, options) -> None: + """CLI main.""" + _main(options) + + +def _main(options) -> None: + # get cgroup information + process = get_cgroup_paths(options.cgroup_location) + + # Register the stop_profiler function with the signal library + _stop_profiler = partial(stop_profiler, process, options.comms_timeout) + signal.signal(signal.SIGINT, _stop_profiler) + signal.signal(signal.SIGHUP, _stop_profiler) + signal.signal(signal.SIGTERM, _stop_profiler) + + # run profiler run + profile(process, options.delay) + + +if __name__ == "__main__": + arg_parser = get_option_parser() + _main(arg_parser.parse_args([])) diff --git a/setup.cfg b/setup.cfg index afb4f8cc707..c26ffcf65f1 100644 --- a/setup.cfg +++ b/setup.cfg @@ -168,6 +168,7 @@ cylc.command = ping = cylc.flow.scripts.ping:main play = cylc.flow.scripts.play:main poll = cylc.flow.scripts.poll:main + profile = cylc.flow.scripts.profiler:main psutils = cylc.flow.scripts.psutil:main reinstall = cylc.flow.scripts.reinstall:main release = cylc.flow.scripts.release:main diff --git a/tests/functional/jobscript/03-profiler.t b/tests/functional/jobscript/03-profiler.t new file mode 100644 index 00000000000..2a910e6705c --- /dev/null +++ b/tests/functional/jobscript/03-profiler.t @@ -0,0 +1,82 @@ +#!/usr/bin/env bash +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. 
+# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +#------------------------------------------------------------------------------- +# Cylc profile test +# NOTE: This test will run the Cylc profiler on the given test platform. +# The test platform may need to be configured for this to work (e.g. +# "cgroups path" may need to be set). +export REQUIRE_PLATFORM='runner:?(pbs|slurm) comms:tcp' +. "$(dirname "$0")/test_header" +set_test_number 8 + +create_test_global_config " +[platforms] + [[${CYLC_TEST_PLATFORM}]] + [[[profile]]] + activate = True + polling interval = 10 + [[localhost]] + [[[profile]]] + activate = True + polling interval = 10 + cgroups path = the/thing/that/should/not/be +" + +init_workflow "${TEST_NAME_BASE}" <<'__FLOW_CONFIG__' +#!Jinja2 + +[scheduling] + [[graph]] + R1 = the_good & the_bad? 
& the_ugly + +[runtime] + [[the_good]] + # this task should succeed normally + platform = {{ environ['CYLC_TEST_PLATFORM'] }} + script = sleep 1 + [[the_bad]] + # this task should fail (it should still send profiling info) + platform = {{ environ['CYLC_TEST_PLATFORM'] }} + script = sleep 5; false + [[the_ugly]] + # this task should succeed despite the broken profiler configuration + platform = localhost + script = sleep 5 +__FLOW_CONFIG__ + +run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}" +workflow_run_ok "${TEST_NAME_BASE}-run" cylc play --debug --no-detach "${WORKFLOW_NAME}" + +# ensure the cpu and memory messages were received and that these messages +# were received before the succeeded message +log_scan "${TEST_NAME_BASE}-task-succeeded" \ + "${WORKFLOW_RUN_DIR}/log/scheduler/log" 1 0 \ + '1/the_good.*(received)cpu_time.*max_rss*' \ + '1/the_good.*(received)succeeded' + +# ensure the cpu and memory messages were received and that these messages +# were received before the failed message +log_scan "${TEST_NAME_BASE}-task-succeeded" \ + "${WORKFLOW_RUN_DIR}/log/scheduler/log" 1 0 \ + '1/the_bad.*(received)cpu_time.*max_rss*' \ + '1/the_bad.*(received)failed' + +# ensure this task succeeded despite the broken profiler configuration +grep_workflow_log_ok "${TEST_NAME_BASE}-broken" '1/the_ugly.*(received)succeeded' +grep_ok 'FileNotFoundError: Cgroup not found' "$(cylc cat-log "${WORKFLOW_NAME}//1/the_ugly" -f e -m p)" + +purge diff --git a/tests/functional/jobscript/03-profiler/flow.cylc b/tests/functional/jobscript/03-profiler/flow.cylc new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/functional/jobscript/04-profiler-e2e.t b/tests/functional/jobscript/04-profiler-e2e.t new file mode 100644 index 00000000000..8e960647233 --- /dev/null +++ b/tests/functional/jobscript/04-profiler-e2e.t @@ -0,0 +1,88 @@ +#!/usr/bin/env bash +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. 
+# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +#------------------------------------------------------------------------------- +# Cylc profile test +# NOTE: This test will run the Cylc profiler on the given test platform. +# The test platform may need to be configured for this to work (e.g. +# "cgroups path" may need to be set). + +. "$(dirname "$0")/test_header" + +if [[ "$OSTYPE" != "linux-gnu"* ]]; then + skip_all "Tests not compatibile with $OSTYPE" +fi + +set_test_number 7 + +mkdir -p "${PWD}/cgroups_test_data" + +echo 'anon 12345678' > cgroups_test_data/memory.stat +echo '123456789' > cgroups_test_data/memory.max +printf "blah blah 123456\nusage_usec 56781234" > cgroups_test_data/cpu.stat + +export profiler_test_env_var='/cgroups_test_data' +create_test_global_config " +[platforms] + [[localhost]] + [[[profile]]] + activate = True + cgroups path = ${PWD} +" + +init_workflow "${TEST_NAME_BASE}" <<'__FLOW_CONFIG__' +#!Jinja2 + +[scheduling] + [[graph]] + R1 = the_good & the_bad? 
& the_ugly + +[runtime] + [[the_good]] + # this task should succeed normally + platform = localhost + script = sleep 5 + [[the_bad]] + # this task should fail (it should still send profiling info) + platform = localhost + script = sleep 5; false + [[the_ugly]] + # this task should succeed despite the broken profiler configuration + platform = localhost + script = sleep 1 +__FLOW_CONFIG__ + +run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}" +workflow_run_ok "${TEST_NAME_BASE}-run" cylc play --debug --no-detach "${WORKFLOW_NAME}" + +# ensure the cpu and memory messages were received and that these messages +# were received before the succeeded message +log_scan "${TEST_NAME_BASE}-task-succeeded" \ + "${WORKFLOW_RUN_DIR}/log/scheduler/log" 1 0 \ + '1/the_good.*(received)cpu_time.*max_rss*' \ + '1/the_good.*(received)succeeded' + +# ensure the cpu and memory messages were received and that these messages +# were received before the failed message +log_scan "${TEST_NAME_BASE}-task-succeeded" \ + "${WORKFLOW_RUN_DIR}/log/scheduler/log" 1 0 \ + '1/the_bad.*(received)cpu_time.*max_rss*' \ + '1/the_bad.*failed' + +# ensure this task succeeded despite the broken profiler configuration +grep_workflow_log_ok "${TEST_NAME_BASE}-broken" '1/the_ugly.*(received)succeeded' + +purge diff --git a/tests/functional/jobscript/04-profiler-e2e/flow.cylc b/tests/functional/jobscript/04-profiler-e2e/flow.cylc new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/unit/scripts/test_profiler.py b/tests/unit/scripts/test_profiler.py new file mode 100644 index 00000000000..6ec020c66e8 --- /dev/null +++ b/tests/unit/scripts/test_profiler.py @@ -0,0 +1,343 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. 
+# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Tests for functions contained in cylc.flow.scripts.profiler +from cylc.flow.scripts.profiler import (parse_memory_file, + parse_cpu_file, + parse_memory_allocated, + get_cgroup_name, + get_cgroup_version, + get_cgroup_paths, + get_resource_usage, + stop_profiler, + profile, + Process) +import pytest +from unittest import mock +from cylc.flow.exceptions import CylcError + + +def test_stop_profiler(mocker, monkeypatch, tmpdir): + monkeypatch.setenv('CYLC_WORKFLOW_ID', "test_value") + + class MockedClient: + def __init__(self, *a, **k): + pass + + async def async_request(self, *a, **k): + pass + + mocker.patch("cylc.flow.scripts.profiler.get_client", MockedClient) + + mem_file = tmpdir.join("memory_file.txt") + mem_file.write('total_rss 1234') + cpu_file = tmpdir.join("cpu_file.txt") + cpu_file.write('5678') + mem_allocated_file = tmpdir.join("memory_allocated.txt") + mem_allocated_file.write('99999') + + process_object = Process( + cgroup_memory_path=mem_file, + cgroup_cpu_path=cpu_file, + memory_allocated_path=mem_allocated_file, + cgroup_version=1) + with pytest.raises(SystemExit) as excinfo: + stop_profiler(process_object, 1) + + assert excinfo.type == SystemExit + assert excinfo.value.code == 0 + + +def test_get_resource_usage(mocker, monkeypatch, tmpdir): + monkeypatch.setenv('CYLC_WORKFLOW_ID', "test_value") + + process_object = Process( + 
cgroup_memory_path=None, + cgroup_cpu_path=None, + memory_allocated_path=None, + cgroup_version=1) + + max_rss, cpu_time, memory_allocated = get_resource_usage(process_object) + + assert max_rss == 0 + assert cpu_time == 0 + assert memory_allocated == 0 + + +def test_parse_memory_file(mocker, tmpdir): + + mem_file_v1 = tmpdir.join("memory_file_v1.txt") + mem_file_v1.write('total_rss=1024') + mem_file_v2 = tmpdir.join("memory_file_v2.txt") + mem_file_v2.write('anon=666') + cpu_file = tmpdir.join("cpu_file.txt") + cpu_file.write('5678') + mem_allocated_file = tmpdir.join("memory_allocated.txt") + mem_allocated_file.write('99999') + + good_process_object_v1 = Process( + cgroup_memory_path=mem_file_v1, + cgroup_cpu_path=cpu_file, + memory_allocated_path=mem_allocated_file, + cgroup_version=1) + good_process_object_v2 = Process( + cgroup_memory_path=mem_file_v2, + cgroup_cpu_path=cpu_file, + memory_allocated_path=mem_allocated_file, + cgroup_version=2) + bad_process_object = Process( + cgroup_memory_path='', + cgroup_cpu_path='', + memory_allocated_path='', + cgroup_version=1) + + with pytest.raises(FileNotFoundError): + parse_memory_file(bad_process_object) + + # Test the parse_memory_file function + assert parse_memory_file(good_process_object_v1) == 1024 + assert parse_memory_file(good_process_object_v2) == 666 + + +def test_parse_cpu_file(mocker, tmpdir): + + mem_file = tmpdir.join("memory_file.txt") + mem_file.write('1024') + cpu_file_v1_good = tmpdir.join("cpu_file_v1_good.txt") + cpu_file_v1_good.write('1234567890') + cpu_file_v1_bad = tmpdir.join("cpu_file_v1_bad.txt") + cpu_file_v1_bad.write("I'm your dream, mind ashtray") + cpu_file_v2_good = tmpdir.join("cpu_file_v2_good.txt") + cpu_file_v2_good.write('usage_usec=1234567890') + cpu_file_v2_bad = tmpdir.join("cpu_file_v2_bad.txt") + cpu_file_v2_bad.write('Give me fuel, give me fire, ' + 'give me that which I desire') + mem_allocated_file = tmpdir.join("memory_allocated.txt") + mem_allocated_file.write('99999') 
+ + good_process_object_v1 = Process( + cgroup_memory_path=mem_file, + cgroup_cpu_path=cpu_file_v1_good, + memory_allocated_path=mem_allocated_file, + cgroup_version=1) + good_process_object_v2 = Process( + cgroup_memory_path=mem_file, + cgroup_cpu_path=cpu_file_v2_good, + memory_allocated_path=mem_allocated_file, + cgroup_version=2) + bad_process_object_v1_1 = Process( + cgroup_memory_path='', + cgroup_cpu_path='', + memory_allocated_path='', + cgroup_version=1) + bad_process_object_v1_2 = Process( + cgroup_memory_path=mem_file, + cgroup_cpu_path=cpu_file_v1_bad, + memory_allocated_path=mem_allocated_file, + cgroup_version=1) + bad_process_object_v2 = Process( + cgroup_memory_path=mem_file, + cgroup_cpu_path=cpu_file_v2_bad, + memory_allocated_path=mem_allocated_file, + cgroup_version=2) + + assert parse_cpu_file(good_process_object_v1) == 1234 + assert parse_cpu_file(good_process_object_v2) == 1234567 + + with pytest.raises(FileNotFoundError): + parse_cpu_file(bad_process_object_v1_1) + with pytest.raises(ValueError): + parse_cpu_file(bad_process_object_v1_2) + with pytest.raises(ValueError): + parse_cpu_file(bad_process_object_v2) + + +def test_get_cgroup_name(mocker): + + mock_file = mocker.mock_open(read_data="0::bad/test/cgroup/place") + mocker.patch("builtins.open", mock_file) + with pytest.raises(AttributeError): + get_cgroup_name() + + mock_file = mocker.mock_open(read_data="0::good/cgroup/place/2222222") + mocker.patch("builtins.open", mock_file) + assert get_cgroup_name() == "good/cgroup/place/2222222" + + +def test_parse_memory_allocated(tmp_path_factory): + good_mem_dir = tmp_path_factory.mktemp("mem_dir") + mem_allocated_file = good_mem_dir / "memory.max" + mem_allocated_file.write_text('99999') + + # We currently do not track memory allocated for cgroups v1 + good_process_object_v1 = Process( + cgroup_memory_path='', + cgroup_cpu_path='', + memory_allocated_path=str(good_mem_dir), + cgroup_version=1) + + good_process_object_v2 = Process( + 
cgroup_memory_path='', + cgroup_cpu_path='', + memory_allocated_path=str(good_mem_dir), + cgroup_version=2) + + bad_process_object_v2_1 = Process( + cgroup_memory_path='', + cgroup_cpu_path='', + memory_allocated_path='/', + cgroup_version=2) + + assert parse_memory_allocated(good_process_object_v1) == 0 + assert parse_memory_allocated(good_process_object_v2) == 99999 + with pytest.raises(FileNotFoundError): + parse_memory_file(bad_process_object_v2_1) + + # Nested directories with 'max' value + base_dir = tmp_path_factory.mktemp("base") + + dir_1 = base_dir / "dir_1" + dir_1.mkdir() + mem_file_1 = dir_1 / "memory.max" + mem_file_1.write_text("max") + + dir_2 = dir_1 / "dir_2" + dir_2.mkdir() + mem_file_2 = dir_2 / "memory.max" + mem_file_2.write_text("max") + + dir_3 = dir_2 / "dir_3" + dir_3.mkdir() + mem_file_3 = dir_3 / "memory.max" + mem_file_3.write_text("max") + + dir_4 = dir_3 / "dir_4" + dir_4.mkdir() + mem_file_4 = dir_4 / "memory.max" + mem_file_4.write_text("max") + + dir_5 = dir_4 / "dir_5" + dir_5.mkdir() + mem_file_5 = dir_5 / "memory.max" + mem_file_5.write_text("max") + + bad_process_object_v2_2 = Process( + cgroup_memory_path='', + cgroup_cpu_path='', + memory_allocated_path=str(dir_5), + cgroup_version=2) + + assert parse_memory_allocated(bad_process_object_v2_2) == 0 + + +def test_get_cgroup_name_file_not_found(mocker): + + def mock_os_pid(): + return 'The Thing That Should Not Be' + + mocker.patch("os.getpid", mock_os_pid) + with pytest.raises(CylcError): + get_cgroup_name() + + +def test_get_cgroup_version(mocker): + + # Mock the Path.exists function call to return True + mocker.patch("pathlib.Path.exists", return_value=True) + assert get_cgroup_version('stuff/in/place', + 'more_stuff') == 2 + + with mock.patch('pathlib.Path.exists', side_effect=[False, True]): + assert get_cgroup_version('stuff/in/place', + 'more_stuff') == 1 + + # Mock the Path.exists function call to return False + mocker.patch("pathlib.Path.exists", return_value=False) + 
with pytest.raises(FileNotFoundError): + get_cgroup_version('stuff/in/other/place', + 'things') + + +def test_get_cgroup_paths(mocker): + mocker.patch("cylc.flow.scripts.profiler.get_cgroup_name", + return_value='test_name') + mocker.patch("cylc.flow.scripts.profiler.get_cgroup_version", + return_value=2) + process = get_cgroup_paths("test_location/") + assert process.cgroup_memory_path == "test_location/test_name/memory.stat" + assert process.cgroup_cpu_path == "test_location/test_name/cpu.stat" + + mocker.patch("cylc.flow.scripts.profiler.get_cgroup_name", + return_value='test_name') + mocker.patch("cylc.flow.scripts.profiler.get_cgroup_version", + return_value=1) + + process = get_cgroup_paths("test_location/") + assert (process.cgroup_memory_path == + "test_location/memory/test_name/memory.stat") + assert (process.cgroup_cpu_path == + "test_location/cpu/test_name/cpuacct.usage") + + mocker.patch("cylc.flow.scripts.profiler.get_cgroup_name", + return_value='test_name') + mocker.patch("cylc.flow.scripts.profiler.get_cgroup_version", + return_value=3) + with pytest.raises(CylcError): + get_cgroup_paths("test_location/") + + +def test_profile_data(mocker): + # This test should run without error + mocker.patch("cylc.flow.scripts.profiler.get_cgroup_name", + return_value='test_name') + mocker.patch("cylc.flow.scripts.profiler.get_cgroup_version", + return_value=2) + process = get_cgroup_paths("test_location/") + + mock_file = mocker.mock_open(read_data="") + mocker.patch("builtins.open", mock_file) + mocker.patch("cylc.flow.scripts.profiler.parse_memory_file", + return_value=0) + mocker.patch("cylc.flow.scripts.profiler.parse_cpu_file", + return_value=2048) + run_once = mock.Mock(side_effect=[True, False]) + profile(process, 1, run_once) + + +@pytest.fixture +def options(mocker): + opts = mocker.Mock() + opts.cgroup_location = "/fake/path" + opts.comms_timeout = 10 + opts.delay = 1 + return opts + + +def test_main(mocker, options): + mock_get_cgroup_paths = 
mocker.patch( + "cylc.flow.scripts.profiler.get_cgroup_paths" + ) + mock_signal = mocker.patch("cylc.flow.scripts.profiler.signal.signal") + mock_profile = mocker.patch("cylc.flow.scripts.profiler.profile") + + mock_get_cgroup_paths.return_value = mocker.Mock() + + from cylc.flow.scripts.profiler import _main + _main(options) + + mock_get_cgroup_paths.assert_called_once_with("/fake/path") + assert mock_signal.call_count == 3 + mock_profile.assert_called_once() diff --git a/tests/unit/test_job_file.py b/tests/unit/test_job_file.py index 7e6b7982519..3ccd8d2923c 100644 --- a/tests/unit/test_job_file.py +++ b/tests/unit/test_job_file.py @@ -399,11 +399,21 @@ def test_write_task_environment(): 'CYLC_TASK_NAMESPACE_HIERARCHY="baa moo"\n export ' 'CYLC_TASK_TRY_NUMBER=1\n export ' 'CYLC_TASK_FLOW_NUMBERS=1\n export ' + 'CYLC_PROFILE=true\n export ' + 'CYLC_CGROUP=exit_light\n export ' + 'CYLC_POLLING_INTERVAL=1\n export ' 'CYLC_TASK_PARAM_duck="quack"\n export ' 'CYLC_TASK_PARAM_mouse="squeak"\n ' 'CYLC_TASK_WORK_DIR_BASE=\'farm_noises/work_d\'\n}') job_conf = { - "platform": {'communication method': 'ssh'}, + "platform": { + 'communication method': 'ssh', + 'profile': { + "activate": "true", + "cgroups path": 'exit_light', + "polling interval": 1 + } + }, "job_d": "1/moo/01", "namespace_hierarchy": ["baa", "moo"], "dependencies": ['moo', 'neigh', 'quack'],