-
Notifications
You must be signed in to change notification settings - Fork 95
CPU and Max RSS Analysis tools #6663
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
ChrisPaulBennett
wants to merge
112
commits into
cylc:master
Choose a base branch
from
ChrisPaulBennett:cylc_profiler
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
112 commits
Select commit
Hold shift + click to select a range
246a4c1
Initial profiler implementation (non working)
ChrisPaulBennett 9727be9
CPU/Memory Logging working
ChrisPaulBennett 171f7ee
GH Actions: use explicit `bash` shell & other defaults
MetRonnie 7d50d0b
host-select: fix compatibility with force-condemned hosts
oliver-sanders 8608088
Update cylc/flow/cfgspec/globalcfg.py
oliver-sanders 060056b
actions: update build action to support python 3.13
oliver-sanders a0cdcd6
host-select: fix compatibility with force-condemned hosts
oliver-sanders 3d4a187
Update cylc/flow/cfgspec/globalcfg.py
oliver-sanders 3057edd
tests: convert unittest to pytest
oliver-sanders a6ecd29
tests/u: test_subprocpool.py::test_run_command_writes_to_err
oliver-sanders e5119b4
Time Series now working
ChrisPaulBennett 9f593fc
Profiler sends KB instead of bytes
ChrisPaulBennett 5f84c2f
Modifying unit tests
ChrisPaulBennett 47a4a98
host-select: fix compatibility with force-condemned hosts
oliver-sanders 356abff
Update cylc/flow/cfgspec/globalcfg.py
oliver-sanders 6ed1770
tests: convert unittest to pytest
oliver-sanders 4cfcffe
tests/u: test_subprocpool.py::test_run_command_writes_to_err
oliver-sanders 1e5b804
Linting
ChrisPaulBennett 38c31f5
Fail gracefully if cgroups cannot be found
ChrisPaulBennett 339af9c
tests: convert unittest to pytest
oliver-sanders 391559b
tests/u: test_subprocpool.py::test_run_command_writes_to_err
oliver-sanders b9c080c
Adding profiler unit tests
ChrisPaulBennett 7091711
adding pycharm files to .gitignore file
ChrisPaulBennett ad6b3a1
Fixing my terrible rebasing
ChrisPaulBennett 5bddeef
Merge remote-tracking branch 'refs/remotes/origin/master' into cylc_p…
ChrisPaulBennett afcdd81
MyPy Linting
ChrisPaulBennett 378dcb9
More unit tests
ChrisPaulBennett ca1e796
Linting
ChrisPaulBennett 58fae4f
Adding towncrier fragment
ChrisPaulBennett 64e9b2b
Review changes
ChrisPaulBennett 07348b7
Review changes
ChrisPaulBennett b485fd7
Review Changes
ChrisPaulBennett 5793b23
Review Changes
ChrisPaulBennett 73545ac
Review Changes
ChrisPaulBennett bf1b9c9
Review Changes
ChrisPaulBennett 30d4382
profiler: add functional test for cgroup profiling
oliver-sanders 49fcbc8
Review Changes
ChrisPaulBennett c63a250
Added polling interval configuration
ChrisPaulBennett 939e128
Update unit tests
ChrisPaulBennett 4928405
Added name to CONTRIBUTING.md
ChrisPaulBennett 66acd1f
Added name to .mailmap
ChrisPaulBennett 935fdc6
Merge remote-tracking branch 'Oliver/profiler' into cylc_profiler
ChrisPaulBennett cacf077
Fixed syntax error
ChrisPaulBennett 99f9ae5
Fixed syntax error
ChrisPaulBennett a542523
Fixed the issue where CPU / Max RSS data is not available in the even…
ChrisPaulBennett 1a63365
Update cylc/flow/cfgspec/globalcfg.py
ChrisPaulBennett 048606b
Refactored max rss and cpu time data flow
ChrisPaulBennett b05c38d
Updating unit tests
ChrisPaulBennett 80963cd
Linting
ChrisPaulBennett 5d3cb0c
Linting
ChrisPaulBennett f51794b
Refactoring so that the jq command is not used
ChrisPaulBennett 27a0879
Shellchecker linting
ChrisPaulBennett 6efc7cf
Linting
ChrisPaulBennett 9a5a4fa
Removing json usage
ChrisPaulBennett 5a620ed
Merge remote-tracking branch 'ChrisPaulBennett/cylc_profiler' into cy…
ChrisPaulBennett f43bd83
Merge branch 'refs/heads/master' into cylc_profiler
ChrisPaulBennett 8f7c419
Adding e2e functional tests
ChrisPaulBennett 6f8c3f3
Linting
ChrisPaulBennett 2e02286
Linting
ChrisPaulBennett 2923917
Fixing functional tests
ChrisPaulBennett ce4122b
Merge remote-tracking branch 'refs/remotes/origin/master' into cylc_p…
ChrisPaulBennett 15c29fa
Kill profiler more reliably
ChrisPaulBennett c09aa6f
Added unit test coverage
ChrisPaulBennett a29cf82
linting
ChrisPaulBennett 172b453
linting
ChrisPaulBennett 59200fc
Corrected cgroup versions
ChrisPaulBennett 35daea9
Merge branch 'refs/heads/master' into cylc_profiler
ChrisPaulBennett 9189ac3
Updating unit tests
ChrisPaulBennett 31f2c18
Updating unit tests
ChrisPaulBennett baaa1a6
Merge branch 'refs/heads/master' into cylc_profiler
ChrisPaulBennett 038348a
Updating unit tests
ChrisPaulBennett 2975ea4
Updating unit tests
ChrisPaulBennett 0448e70
Merge branch 'master' into cylc_profiler
ChrisPaulBennett 3cff4a1
Merge branch 'master' into cylc_profiler
ChrisPaulBennett c8f3ab5
Merge remote-tracking branch 'ChrisPaulBennett/cylc_profiler' into cy…
ChrisPaulBennett 5e994fb
updating .mailmap
ChrisPaulBennett 5c8585e
Updating .mailmap
ChrisPaulBennett 0c68cec
testing macos
ChrisPaulBennett 7eddcf2
testing macos
ChrisPaulBennett 2cd9991
testing macos
ChrisPaulBennett c1a5686
testing macos
ChrisPaulBennett 2bc9ec6
testing macos
ChrisPaulBennett 8ca760d
testing macos
ChrisPaulBennett ebbb8f0
testing macos
ChrisPaulBennett 6f65ebd
testing macos
ChrisPaulBennett 54553c1
testing macos
ChrisPaulBennett 556f3ba
testing macos
ChrisPaulBennett f70c1d3
testing macos
ChrisPaulBennett 1e93434
change how profiler is killed to be POSIX compliant
ChrisPaulBennett 797b8ae
Added recording of memory allocation
ChrisPaulBennett 5bfaf05
Updating unit tests
ChrisPaulBennett 3c32069
profiler: remove global variables
oliver-sanders 8592107
Refactored to global variables
ChrisPaulBennett 6c95914
Code review changes
ChrisPaulBennett e30a1a4
Linting
ChrisPaulBennett c52f7fb
Linting
ChrisPaulBennett 19ba481
Updating unit tests
ChrisPaulBennett 48bb9c4
Linting
ChrisPaulBennett fb0a156
Adding unit test coverage
ChrisPaulBennett ba94659
Adding unit test coverage
ChrisPaulBennett ed7f4fc
Adding unit test coverage
ChrisPaulBennett 5de1e27
Adding unit test coverage
ChrisPaulBennett 2762d56
Linting
ChrisPaulBennett e1978c1
Linting
ChrisPaulBennett 61f0ceb
Unit test coverage
ChrisPaulBennett 2b27c25
Linting
ChrisPaulBennett 31fa1fb
Merge remote-tracking branch 'refs/remotes/origin/master' into cylc_p…
ChrisPaulBennett f26ed40
Adding test coverage
ChrisPaulBennett 80c38d0
Linting
ChrisPaulBennett c54cff5
Implemented usage of CylcError
ChrisPaulBennett f5e5af8
Linting
ChrisPaulBennett 1598cd3
Fix functional tests
ChrisPaulBennett File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Some comments aren't visible on the classic Files Changed page.
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -58,4 +58,5 @@ github-actions[bot] <[email protected]> | |
| github-actions[bot] <[email protected]> GitHub Action | ||
| Diquan Jabbour <[email protected]> | ||
| Maxime Rio <[email protected]> | ||
| Christopher Bennett <[email protected]> ChrisPaulBennett <[email protected]> | ||
| Christopher Bennett <[email protected]> christopher.bennett <[email protected]> | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Adding CPU time and Max RSS to Analysis Tools |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,264 @@ | ||
| #!/usr/bin/env python3 | ||
| # THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. | ||
| # Copyright (C) NIWA & British Crown (Met Office) & Contributors. | ||
| # | ||
| # This program is free software: you can redistribute it and/or modify | ||
| # it under the terms of the GNU General Public License as published by | ||
| # the Free Software Foundation, either version 3 of the License, or | ||
| # (at your option) any later version. | ||
| # | ||
| # This program is distributed in the hope that it will be useful, | ||
| # but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
| # GNU General Public License for more details. | ||
| # | ||
| # You should have received a copy of the GNU General Public License | ||
| # along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
| """cylc profiler [OPTIONS] | ||
|
|
||
| Profiler which periodically polls cgroups to track | ||
| the resource usage of jobs running on the node. | ||
| """ | ||
|
|
||
| import os | ||
| import re | ||
| import sys | ||
| import time | ||
| import signal | ||
| import asyncio | ||
|
|
||
| from pathlib import Path | ||
| from functools import partial | ||
| from dataclasses import dataclass | ||
|
|
||
| from cylc.flow.exceptions import CylcError | ||
| from cylc.flow.terminal import cli_function | ||
| from cylc.flow.option_parsers import CylcOptionParser as COP | ||
| from cylc.flow.network.client_factory import get_client | ||
|
|
||
|
|
||
| PID_REGEX = re.compile(r"([^:]*\d{6,}.*)") | ||
| RE_INT = re.compile(r'\d+') | ||
|
|
||
|
|
||
| @dataclass | ||
| class Process: | ||
| """Class for representing CPU and Memory usage of a process""" | ||
| cgroup_memory_path: str | ||
| cgroup_cpu_path: str | ||
| memory_allocated_path: str | ||
| cgroup_version: int | ||
|
|
||
|
|
||
| def stop_profiler(process, comms_timeout, *args): | ||
| """This function will be executed when the SIGINT signal is sent | ||
| to this process""" | ||
|
|
||
| max_rss, cpu_time, memory_allocated = get_resource_usage(process) | ||
|
|
||
| graphql_mutation = """ | ||
| mutation($WORKFLOWS: [WorkflowID]!, | ||
| $MESSAGES: [[String]], $JOB: String!, $TIME: String) { | ||
| message(workflows: $WORKFLOWS, messages:$MESSAGES, | ||
| taskJob:$JOB, eventTime:$TIME) { | ||
| result | ||
| } | ||
| } | ||
| """ | ||
|
|
||
| graphql_request_variables = { | ||
| "WORKFLOWS": [os.environ.get('CYLC_WORKFLOW_ID')], | ||
| "MESSAGES": [[ | ||
| "DEBUG", | ||
| f"cpu_time {cpu_time} " | ||
| f"max_rss {max_rss} " | ||
| f"mem_alloc {memory_allocated}"]], | ||
| "JOB": os.environ.get('CYLC_TASK_JOB'), | ||
| "TIME": "now" | ||
| } | ||
|
|
||
| pclient = get_client(os.environ.get('CYLC_WORKFLOW_ID'), | ||
| timeout=comms_timeout) | ||
|
|
||
| async def send_cylc_message(): | ||
| await pclient.async_request( | ||
| 'graphql', | ||
| {'request_string': graphql_mutation, | ||
| 'variables': graphql_request_variables}, | ||
| ) | ||
|
|
||
| asyncio.run(send_cylc_message()) | ||
| sys.exit(0) | ||
|
|
||
|
|
||
| def get_resource_usage(process): | ||
| # If a task fails instantly, or finishes very quickly (< 1 second), | ||
| # the get config function doesn't have time to run | ||
| if (process.cgroup_memory_path is None | ||
| or process.cgroup_cpu_path is None | ||
| or process.memory_allocated_path is None): | ||
| return 0, 0, 0 | ||
| max_rss = parse_memory_file(process) | ||
| cpu_time = parse_cpu_file(process) | ||
| memory_allocated = parse_memory_allocated(process) | ||
| return max_rss, cpu_time, memory_allocated | ||
|
|
||
|
|
||
| def parse_memory_file(process: Process): | ||
| """Open the memory stat file and copy the appropriate data""" | ||
|
|
||
| if process.cgroup_version == 2: | ||
| with open(process.cgroup_memory_path, 'r') as f: | ||
| for line in f: | ||
| if "anon" in line: | ||
| return int(''.join(filter(str.isdigit, line))) | ||
| else: | ||
| with open(process.cgroup_memory_path, 'r') as f: | ||
| for line in f: | ||
| if "total_rss" in line: | ||
| return int(''.join(filter(str.isdigit, line))) | ||
|
|
||
| raise CylcError("Unable to find memory usage data") | ||
|
|
||
|
|
||
| def parse_memory_allocated(process: Process) -> int: | ||
| """Open the memory stat file and copy the appropriate data""" | ||
| if process.cgroup_version == 2: | ||
| cgroup_memory_path = Path(process.memory_allocated_path) | ||
| for _ in range(5): | ||
| with open(cgroup_memory_path / "memory.max", 'r') as f: | ||
| line = f.readline() | ||
| if "max" not in line: | ||
| return int(line) | ||
| cgroup_memory_path = cgroup_memory_path.parent | ||
| return 0 | ||
| else: # Memory limit not tracked for cgroups v1 | ||
| return 0 | ||
|
|
||
|
|
||
| def parse_cpu_file(process: Process) -> int: | ||
| """Open the CPU stat file and return the appropriate data""" | ||
|
|
||
| if process.cgroup_version == 2: | ||
| with open(process.cgroup_cpu_path, 'r') as f: | ||
| for line in f: | ||
| if "usage_usec" in line: | ||
| return int(RE_INT.findall(line)[0]) // 1000 | ||
| raise ValueError("Unable to find cpu usage data") | ||
| else: | ||
| with open(process.cgroup_cpu_path, 'r') as f: | ||
| for line in f: | ||
| # Cgroups v1 uses nanoseconds | ||
| return int(line) // 1000000 | ||
|
|
||
| raise CylcError("Unable to find cpu usage data") | ||
|
|
||
|
|
||
| def get_cgroup_version(cgroup_location: str, cgroup_name: str) -> int: | ||
| if Path.exists(Path(cgroup_location + cgroup_name)): | ||
| return 2 | ||
| elif Path.exists(Path(cgroup_location + "/memory" + cgroup_name)): | ||
| return 1 | ||
| else: | ||
| raise FileNotFoundError("Cgroup not found at " + | ||
| cgroup_location + cgroup_name) | ||
|
|
||
|
|
||
| def get_cgroup_name(): | ||
| """Get the cgroup directory for the current process""" | ||
|
|
||
| # fugly hack to allow functional tests to use test data | ||
ChrisPaulBennett marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if 'profiler_test_env_var' in os.environ: | ||
| return os.getenv('profiler_test_env_var') | ||
|
|
||
| # Get the PID of the current process | ||
| pid = os.getpid() | ||
| try: | ||
| # Get the cgroup information for the current process | ||
| with open('/proc/' + str(pid) + '/cgroup', 'r') as f: | ||
| result = f.read() | ||
| result = PID_REGEX.search(result).group() | ||
| return result | ||
| except FileNotFoundError: | ||
| raise CylcError( | ||
| '/proc/' + str(pid) + '/cgroup not found') from None | ||
|
|
||
| except AttributeError as err: | ||
| raise AttributeError("No cgroup found for process:", pid) from err | ||
|
|
||
|
|
||
| def get_cgroup_paths(location) -> Process: | ||
| cgroup_name = get_cgroup_name() | ||
| cgroup_version = get_cgroup_version(location, cgroup_name) | ||
| if cgroup_version == 2: | ||
| return Process( | ||
| cgroup_memory_path=location + | ||
| cgroup_name + "/" + "memory.stat", | ||
| cgroup_cpu_path=location + | ||
| cgroup_name + "/" + "cpu.stat", | ||
| memory_allocated_path=location + cgroup_name, | ||
| cgroup_version=cgroup_version, | ||
| ) | ||
|
|
||
| elif cgroup_version == 1: | ||
| return Process( | ||
| cgroup_memory_path=location + "memory/" + | ||
| cgroup_name + "/memory.stat", | ||
| cgroup_cpu_path=location + "cpu/" + | ||
| cgroup_name + "/cpuacct.usage", | ||
| memory_allocated_path="", | ||
| cgroup_version=cgroup_version, | ||
| ) | ||
| raise CylcError("Unable to determine cgroup version") | ||
|
|
||
|
|
||
| def profile(_process: Process, delay, keep_looping=lambda: True): | ||
| # The infinite loop that will constantly poll the cgroup | ||
| # The lambda function is used to allow the loop to be stopped in unit tests | ||
|
|
||
| while keep_looping(): | ||
| # Write cpu / memory usage data to disk | ||
| # CPU_TIME = parse_cpu_file(process.cgroup_cpu_path, version) | ||
| time.sleep(delay) | ||
|
|
||
|
|
||
| def get_option_parser() -> COP: | ||
| parser = COP( | ||
| __doc__, | ||
| comms=True, | ||
| argdoc=[ | ||
| ], | ||
| ) | ||
| parser.add_option( | ||
| "-i", type=int, | ||
| help="interval between query cycles in seconds", dest="delay") | ||
| parser.add_option( | ||
| "-m", type=str, help="Location of cgroups directory", | ||
| dest="cgroup_location") | ||
|
|
||
| return parser | ||
|
|
||
|
|
||
| @cli_function(get_option_parser) | ||
| def main(_parser: COP, options) -> None: | ||
| """CLI main.""" | ||
| _main(options) | ||
|
|
||
|
|
||
| def _main(options) -> None: | ||
| # get cgroup information | ||
| process = get_cgroup_paths(options.cgroup_location) | ||
|
|
||
| # Register the stop_profiler function with the signal library | ||
| _stop_profiler = partial(stop_profiler, process, options.comms_timeout) | ||
| signal.signal(signal.SIGINT, _stop_profiler) | ||
| signal.signal(signal.SIGHUP, _stop_profiler) | ||
| signal.signal(signal.SIGTERM, _stop_profiler) | ||
|
|
||
| # run profiler run | ||
| profile(process, options.delay) | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| arg_parser = get_option_parser() | ||
| _main(arg_parser.parse_args([])) | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.