Skip to content

Commit aa5e37d

Browse files
Adding profiler unit tests
1 parent 5da9f4c commit aa5e37d

File tree

2 files changed

+135
-79
lines changed

2 files changed

+135
-79
lines changed

cylc/flow/scripts/profiler.py

Lines changed: 65 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
# along with this program. If not, see <http://www.gnu.org/licenses/>.
1717
"""cylc profiler [OPTIONS]
1818
19-
Profiler which periodically polls PBS cgroups to track
19+
Profiler which periodically polls cgroups to track
2020
the resource usage of jobs running on the node.
2121
"""
2222

@@ -53,14 +53,14 @@ def get_option_parser() -> COP:
5353

5454

5555
@cli_function(get_option_parser)
56-
def main(parser, options):
56+
def main(parser: COP, options) -> None:
5757
"""CLI main."""
5858
# Register the stop_profiler function with the signal library
5959
signal.signal(signal.SIGINT, stop_profiler)
6060
signal.signal(signal.SIGHUP, stop_profiler)
6161
signal.signal(signal.SIGTERM, stop_profiler)
6262

63-
profile(options)
63+
get_config(options)
6464

6565

6666
@dataclass
@@ -77,40 +77,43 @@ def stop_profiler(*args):
7777
sys.exit(0)
7878

7979

80-
def parse_memory_file(process):
80+
def parse_memory_file(cgroup_memory_path):
8181
"""Open the memory stat file and copy the appropriate data"""
8282

83-
with open(process.cgroup_memory_path, 'r') as f:
83+
with open(cgroup_memory_path, 'r') as f:
8484
for line in f:
8585
return int(line) // 1024
8686

8787

88-
def parse_cpu_file(process, cgroup_version):
88+
def parse_cpu_file(cgroup_cpu_path, cgroup_version):
8989
"""Open the memory stat file and return the appropriate data"""
9090

9191
if cgroup_version == 1:
92-
with open(process.cgroup_cpu_path, 'r') as f:
92+
with open(cgroup_cpu_path, 'r') as f:
9393
for line in f:
9494
if "usage_usec" in line:
9595
return int(RE_INT.findall(line)[0]) // 1000
9696
elif cgroup_version == 2:
97-
with open(process.cgroup_cpu_path, 'r') as f:
97+
with open(cgroup_cpu_path, 'r') as f:
9898
for line in f:
9999
# Cgroups v2 uses nanoseconds
100100
return int(line) / 1000000
101-
else:
102-
raise FileNotFoundError("cpu usage files not found")
103101

104102

105103
def write_data(data, filename):
106-
try:
107-
with open(filename, 'w') as f:
108-
f.write(data + "\n")
109-
except IOError as err:
110-
raise IOError("Unable to write data to file:" + filename) from err
104+
with open(filename, 'w') as f:
105+
f.write(data + "\n")
106+
111107

108+
def get_cgroup_version(
    cgroup_location: "str | Path", cgroup_name: "str | Path"
) -> "int | None":
    """Work out which cgroup directory layout is present on this node.

    The two arguments are joined by plain concatenation (no separator is
    inserted), so ``cgroup_name`` is expected to carry its own leading
    separator. Both ``str`` and ``Path`` values are accepted.

    Args:
        cgroup_location: Root directory under which the cgroups live.
        cgroup_name: Location of the cgroup relative to that root.

    Returns:
        1 if ``<cgroup_location><cgroup_name>`` exists,
        2 if ``<cgroup_location>/memory<cgroup_name>`` exists,
        None if neither path exists.
    """
    # HPC uses cgroups v2 and SPICE uses cgroups v1
    # NOTE(review): the numbers returned here are this module's own layout
    # labels; confirm how they map onto the kernel's cgroup v1/v2 naming.

    # Normalise to str first: ``Path + str`` is not a supported operation.
    location = str(cgroup_location)
    name = str(cgroup_name)

    if Path(location + name).exists():
        return 1
    if Path(location + "/memory" + name).exists():
        return 2
    # Neither layout was found; the caller decides how to report that.
    return None
112114

113-
def get_cgroup_dir():
115+
116+
def get_cgroup_name():
114117
"""Get the cgroup directory for the current process"""
115118
# Get the PID of the current process
116119
pid = os.getpid()
@@ -121,79 +124,62 @@ def get_cgroup_dir():
121124
result = PID_REGEX.search(result).group()
122125
return result
123126
except FileNotFoundError as err:
124-
print(err)
125-
print('/proc/' + str(pid) + '/cgroup not found')
126-
exit()
127+
raise FileNotFoundError(
128+
'/proc/' + str(pid) + '/cgroup not found') from err
129+
127130
except AttributeError as err:
128-
print(err)
129-
print("No cgroup found for process")
130-
exit()
131+
raise AttributeError("No cgroup found for process:", pid) from err
131132

132133

133-
def profile(args):
134-
# Find the cgroup that this process is running in.
135-
# Cylc will put this profiler in the same cgroup
136-
# as the job it is profiling
137-
cgroup_name = get_cgroup_dir()
134+
def get_cgroup_paths(version, location, name):
138135

139-
# HPC uses cgroups v2 and SPICE uses cgroups v1
140-
cgroup_version = None
136+
if version == 1:
137+
return Process(
138+
cgroup_memory_path=location +
139+
name + "/" + "memory.peak",
140+
cgroup_cpu_path=location +
141+
name + "/" + "cpu.stat")
142+
143+
elif version == 2:
144+
return Process(
145+
cgroup_memory_path=location + "/memory" +
146+
name + "/memory.max_usage_in_bytes",
147+
cgroup_cpu_path=location + "/cpu" +
148+
name + "/cpuacct.usage")
141149

142-
if Path.exists(Path(args.cgroup_location + cgroup_name)):
143-
cgroup_version = 1
144-
elif Path.exists(Path(args.cgroup_location + "/memory" + cgroup_name)):
145-
cgroup_version = 2
146-
else:
147-
raise FileNotFoundError("cgroups not found:" + cgroup_name)
148150

151+
def profile(process, version, delay, keep_looping=lambda: True):
152+
# The infinite loop that will constantly poll the cgroup
153+
# The lambda function is used to allow the loop to be stopped in unit tests
149154
peak_memory = 0
150-
processes = []
155+
while keep_looping():
156+
# Write cpu / memory usage data to disk
157+
cpu_time = parse_cpu_file(process.cgroup_cpu_path, version)
158+
write_data(str(cpu_time), "cpu_time")
151159

152-
if cgroup_version == 1:
153-
try:
154-
processes.append(Process(
155-
cgroup_memory_path=args.cgroup_location +
156-
cgroup_name + "/" + "memory.peak",
157-
cgroup_cpu_path=args.cgroup_location +
158-
cgroup_name + "/" + "cpu.stat"))
159-
except FileNotFoundError as err:
160-
print(err)
161-
raise FileNotFoundError("cgroups not found:"
162-
+ args.cgroup_location) from err
163-
elif cgroup_version == 2:
164-
try:
165-
processes.append(Process(
166-
cgroup_memory_path=args.cgroup_location + "/memory" +
167-
cgroup_name + "/memory.max_usage_in_bytes",
168-
cgroup_cpu_path=args.cgroup_location + "/cpu" +
169-
cgroup_name + "/cpuacct.usage"))
170-
except FileNotFoundError as err:
171-
print(err)
172-
raise FileNotFoundError("cgroups not found:" +
173-
args.cgroup_location) from err
174-
175-
while True:
176-
failures = 0
177-
# Write memory usage data
178-
for process in processes:
179-
# Only save Max RSS to disk if it is above the previous value
180-
try:
181-
memory = parse_memory_file(process)
182-
if memory > peak_memory:
183-
peak_memory = memory
184-
write_data(str(peak_memory), "max_rss")
185-
cpu_time = parse_cpu_file(process, cgroup_version)
186-
write_data(str(cpu_time), "cpu_time")
187-
188-
except (OSError, ValueError) as error:
189-
failures += 1
190-
if failures > 5:
191-
raise OSError("cgroup polling failure", error) from error
192-
193-
time.sleep(args.delay)
160+
memory = parse_memory_file(process.cgroup_memory_path)
161+
# Only save Max RSS to disk if it is above the previous value
162+
if memory > peak_memory:
163+
peak_memory = memory
164+
write_data(str(peak_memory), "max_rss")
165+
166+
time.sleep(delay)
167+
168+
169+
def get_config(args):
    """Resolve the cgroup of the current process and start profiling it.

    Args:
        args: Parsed command line options. The ``cgroup_location`` and
            ``delay`` attributes are read.

    Raises:
        FileNotFoundError: If no cgroup layout for this process is found
            under ``args.cgroup_location``.
    """
    # Find the cgroup that this process is running in.
    # Cylc will put this profiler in the same cgroup
    # as the job it is profiling
    cgroup_name = get_cgroup_name()
    cgroup_version = get_cgroup_version(args.cgroup_location, cgroup_name)
    if cgroup_version is None:
        # Fail here with a clear message rather than letting the polling
        # loop trip over a missing set of paths later on.
        raise FileNotFoundError("cgroups not found:" + cgroup_name)

    process = get_cgroup_paths(
        cgroup_version,
        args.cgroup_location,
        cgroup_name,
    )

    profile(process, cgroup_version, args.delay)
194180

195181

196182
if __name__ == "__main__":
197183

198184
arg_parser = get_option_parser()
199-
profile(arg_parser.parse_args([]))
185+
get_config(arg_parser.parse_args([]))

tests/unit/scripts/test_profiler.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,3 +208,73 @@ def test_stop_profiler():
208208
stop_profiler()
209209
assert pytest_wrapped_e.type == SystemExit
210210
assert pytest_wrapped_e.value.code == 0
211+
212+
213+
def test_get_cgroup_version(mocker):
    """get_cgroup_version turns the path existence checks into 1, 2 or None."""
    # Every existence check succeeds
    mocker.patch("pathlib.Path.exists", return_value=True)
    detected = get_cgroup_version('stuff/in/place', 'more_stuff')
    assert detected == 1

    # The first existence check fails and the second one succeeds
    with mock.patch('pathlib.Path.exists', side_effect=[False, True]):
        detected = get_cgroup_version('stuff/in/place', 'more_stuff')
        assert detected == 2

    # No existence check succeeds
    mocker.patch("pathlib.Path.exists", return_value=False)
    detected = get_cgroup_version('stuff/in/other/place', 'things')
    assert detected is None
228+
229+
230+
def test_get_cgroup_paths():
    """Check the memory and cpu file paths built for versions 1 and 2."""
    # Version 1: files sit directly under <location><name>/
    v1 = get_cgroup_paths(1, "test_location/", "test_name")
    expected_memory = "test_location/test_name/memory.peak"
    expected_cpu = "test_location/test_name/cpu.stat"
    assert v1.cgroup_memory_path == expected_memory
    assert v1.cgroup_cpu_path == expected_cpu

    # Version 2: files sit under per-controller "memory" / "cpu" directories
    v2 = get_cgroup_paths(2, "test_location", "/test_name")
    expected_memory = (
        "test_location/memory/test_name/memory.max_usage_in_bytes")
    expected_cpu = "test_location/cpu/test_name/cpuacct.usage"
    assert v2.cgroup_memory_path == expected_memory
    assert v2.cgroup_cpu_path == expected_cpu
243+
244+
245+
def test_profile_cpu(mocker):
    """profile() writes the polled CPU time to the ``cpu_time`` file."""
    process = get_cgroup_paths(1, "test_location/", "test_name")

    mock_file = mocker.mock_open(read_data="")
    mocker.patch("builtins.open", mock_file)
    # Reported memory never exceeds the initial peak of 0, so no max_rss
    # write happens and the cpu_time file is the last one opened.
    mocker.patch("cylc.flow.scripts.profiler.parse_memory_file",
                 return_value=0)
    mocker.patch("cylc.flow.scripts.profiler.parse_cpu_file",
                 return_value=2048)

    # Let the polling loop run exactly once
    run_once = mock.Mock(side_effect=[True, False])
    # A zero delay stops the test from really sleeping between polls
    profile(process, 1, 0, run_once)

    mock_file.assert_called_with("cpu_time", "w")
258+
259+
260+
def test_profile_max_rss(mocker):
    """profile() writes a new memory peak to the ``max_rss`` file."""
    process = get_cgroup_paths(1, "test_location/", "test_name")

    mock_file = mocker.mock_open(read_data="")
    mocker.patch("builtins.open", mock_file)
    # Reported memory is above the initial peak of 0, so max_rss is
    # written after cpu_time and is the last file opened.
    mocker.patch("cylc.flow.scripts.profiler.parse_memory_file",
                 return_value=1024)
    mocker.patch("cylc.flow.scripts.profiler.parse_cpu_file",
                 return_value=2048)

    # Let the polling loop run exactly once
    run_once = mock.Mock(side_effect=[True, False])
    # A zero delay stops the test from really sleeping between polls
    profile(process, 1, 0, run_once)

    mock_file.assert_called_with("max_rss", "w")
274+
275+
276+
def test_stop_profiler():
    """stop_profiler() exits the process with a zero status code."""
    with pytest.raises(SystemExit) as exit_info:
        stop_profiler()

    raised_type = exit_info.type
    exit_code = exit_info.value.code
    assert raised_type == SystemExit
    assert exit_code == 0

0 commit comments

Comments
 (0)