|
4 | 4 | # SPDX-License-Identifier: (MIT) |
5 | 5 |
|
6 | 6 | import collections |
| 7 | +import importlib.util |
| 8 | +import inspect |
| 9 | +import os |
| 10 | +import shutil |
| 11 | +import sys |
7 | 12 |
|
| 13 | +import flux_metrics_api.utils as utils |
8 | 14 | from flux_metrics_api.logger import logger |
9 | 15 |
|
10 | 16 | try: |
@@ -57,29 +63,125 @@ def node_free_count(): |
57 | 63 | return len(listing.free.nodelist) |
58 | 64 |
|
59 | 65 |
|
def get_queue_metrics():
    """
    Return counts of jobs in the queue, keyed by state name.

    See https://github.com/flux-framework/flux-core/blob/master/src/common/libjob/job.h#L45-L53
    for identifiers.

    Returns a dict mapping each known state name ("new", "depend", "priority",
    "sched", "run", "cleanup", "inactive") to the number of jobs in that
    state; every known state is present even when its count is zero.
    """
    jobs = flux.job.job_list(handle)
    listing = jobs.get()

    # Known job states. The flux-core identifiers are new=1, depend=2,
    # priority=4, sched=8, run=16, cleanup=32, inactive=64, but only the
    # names are needed here — statetostr does the integer -> name mapping.
    known_states = ("new", "depend", "priority", "sched", "run", "cleanup", "inactive")

    # Count jobs per state integer
    counter = collections.Counter(x["state"] for x in listing["jobs"])

    # Start every known state at zero so callers can index unconditionally,
    # then overlay the observed counts by translated state name.
    counts = {state: 0 for state in known_states}
    for stateint, count in counter.items():
        counts[flux.job.info.statetostr(stateint)] = count
    return counts
| 100 | + |
| 101 | + |
| 102 | +# Queue states |
| 103 | + |
| 104 | + |
def job_queue_state_new_count():
    """Return the number of queued jobs in the "new" state."""
    queue_counts = get_queue_metrics()
    return queue_counts["new"]
| 107 | + |
| 108 | + |
def job_queue_state_depend_count():
    """Return the number of queued jobs in the "depend" state."""
    queue_counts = get_queue_metrics()
    return queue_counts["depend"]
| 111 | + |
| 112 | + |
def job_queue_state_priority_count():
    """Return the number of queued jobs in the "priority" state."""
    queue_counts = get_queue_metrics()
    return queue_counts["priority"]
| 115 | + |
76 | 116 |
|
def job_queue_state_sched_count():
    """Return the number of queued jobs in the "sched" state."""
    queue_counts = get_queue_metrics()
    return queue_counts["sched"]
77 | 119 |
|
78 | | -# Organize metrics by name so we can eventually support export of custom set (if needed) |
| 120 | + |
def job_queue_state_run_count():
    """Return the number of queued jobs in the "run" state."""
    queue_counts = get_queue_metrics()
    return queue_counts["run"]
| 123 | + |
| 124 | + |
def job_queue_state_cleanup_count():
    """Return the number of queued jobs in the "cleanup" state."""
    queue_counts = get_queue_metrics()
    return queue_counts["cleanup"]
| 127 | + |
| 128 | + |
def job_queue_state_inactive_count():
    """Return the number of queued jobs in the "inactive" state."""
    queue_counts = get_queue_metrics()
    return queue_counts["inactive"]
| 131 | + |
| 132 | + |
def add_custom_metrics(metric_file):
    """
    Register user-provided metric functions from a Python file.

    The file is copied into a temporary directory, imported as a module,
    and every public (non-underscore) function it defines is added to the
    module-level ``custom_metrics`` dict keyed by function name.

    Exits the process (sys.exit) when a discovered function takes no
    arguments — each custom metric must accept at least the flux handle.
    """
    tmpdir = utils.get_tmpdir()
    try:
        # Copy our metrics file there and import it from that location
        custom_metrics_file = os.path.join(tmpdir, "custom_metrics.py")
        shutil.copyfile(metric_file, custom_metrics_file)
        spec = importlib.util.spec_from_file_location("custom_metrics", custom_metrics_file)
        cm = importlib.util.module_from_spec(spec)
        # Register under the spec name (not an arbitrary alias) so the
        # module is reachable in sys.modules by its actual name.
        sys.modules[spec.name] = cm
        spec.loader.exec_module(cm)

        # Discover the names, and add the functions!
        for contender in dir(cm):
            if contender.startswith("_"):
                continue
            func = getattr(cm, contender)

            # We only care about functions
            if not inspect.isfunction(func):
                continue
            args = inspect.signature(func)

            # Must have at least one argument (the handle)
            # We could be more strict here, but this is probably OK
            if len(args.parameters) == 0:
                sys.exit(f"{contender} is not a valid function - has no arguments")
            logger.info(f"Adding custom function {contender} to metrics.")
            custom_metrics[contender] = func
    finally:
        # Cleanup — also runs when sys.exit fires above, so the temporary
        # directory is never leaked.
        shutil.rmtree(tmpdir)
| 167 | + |
| 168 | + |
# Organize metrics by name
metrics = dict(
    # Node resources
    node_cores_free_count=node_core_free_count,
    node_cores_up_count=node_core_up_count,
    node_free_count=node_free_count,
    node_up_count=node_up_count,
    # Queue states
    job_queue_state_new_count=job_queue_state_new_count,
    job_queue_state_depend_count=job_queue_state_depend_count,
    job_queue_state_priority_count=job_queue_state_priority_count,
    job_queue_state_sched_count=job_queue_state_sched_count,
    job_queue_state_run_count=job_queue_state_run_count,
    job_queue_state_cleanup_count=job_queue_state_cleanup_count,
    job_queue_state_inactive_count=job_queue_state_inactive_count,
)

# Custom metrics defined by the user (have the handle provided)
custom_metrics = {}
0 commit comments