|
12 | 12 | // See the License for the specific language governing permissions and |
13 | 13 | // limitations under the License. |
14 | 14 |
|
15 | | -use std::collections::HashMap; |
16 | | -use std::sync::{Arc, LazyLock}; |
| 15 | +use std::sync::LazyLock; |
17 | 16 |
|
18 | | -use itertools::Itertools; |
19 | | -use parking_lot::Mutex; |
20 | | -use prometheus::Registry; |
21 | | -use prometheus::core::{Collector, Desc}; |
22 | | -use prometheus::proto::{Gauge, LabelPair, Metric, MetricFamily}; |
23 | | -use rw_iter_util::ZipEqFast; |
| 17 | +use prometheus::{IntCounterVec, Registry, register_int_counter_vec_with_registry}; |
24 | 18 |
|
25 | 19 | use crate::monitor::GLOBAL_METRICS_REGISTRY; |
26 | 20 |
|
| 21 | +#[derive(Clone)] |
27 | 22 | pub struct ErrorMetric<const N: usize> { |
28 | | - payload: Arc<Mutex<HashMap<[String; N], u32>>>, |
29 | | - desc: Desc, |
| 23 | + inner: IntCounterVec, |
30 | 24 | } |
31 | 25 |
|
32 | 26 | impl<const N: usize> ErrorMetric<N> { |
33 | | - pub fn new(name: &str, help: &str, label_names: &[&str; N]) -> Self { |
| 27 | + pub fn new(name: &str, help: &str, label_names: &[&str; N], registry: &Registry) -> Self { |
34 | 28 | Self { |
35 | | - payload: Default::default(), |
36 | | - desc: Desc::new( |
37 | | - name.to_owned(), |
38 | | - help.to_owned(), |
39 | | - label_names.iter().map(|l| l.to_string()).collect_vec(), |
40 | | - Default::default(), |
41 | | - ) |
42 | | - .unwrap(), |
| 29 | + inner: register_int_counter_vec_with_registry!(name, help, label_names, registry) |
| 30 | + .unwrap(), |
43 | 31 | } |
44 | 32 | } |
45 | 33 |
|
46 | 34 | pub fn report(&self, labels: [String; N]) { |
47 | | - let mut m = self.payload.lock(); |
48 | | - let v = m.entry(labels).or_default(); |
49 | | - *v += 1; |
50 | | - } |
51 | | - |
52 | | - fn collect(&self) -> MetricFamily { |
53 | | - let mut m = MetricFamily::default(); |
54 | | - m.set_name(self.desc.fq_name.clone()); |
55 | | - m.set_help(self.desc.help.clone()); |
56 | | - m.set_field_type(prometheus::proto::MetricType::GAUGE); |
57 | | - |
58 | | - let payload = self.payload.lock().drain().collect_vec(); |
59 | | - let mut metrics = Vec::with_capacity(payload.len()); |
60 | | - for (labels, count) in payload { |
61 | | - let mut label_pairs = Vec::with_capacity(self.desc.variable_labels.len()); |
62 | | - for (name, label) in self.desc.variable_labels.iter().zip_eq_fast(labels) { |
63 | | - let mut label_pair = LabelPair::default(); |
64 | | - label_pair.set_name(name.clone()); |
65 | | - label_pair.set_value(label); |
66 | | - label_pairs.push(label_pair); |
67 | | - } |
68 | | - |
69 | | - let mut metric = Metric::new(); |
70 | | - metric.set_label(label_pairs); |
71 | | - let mut gauge = Gauge::default(); |
72 | | - gauge.set_value(count as f64); |
73 | | - metric.set_gauge(gauge); |
74 | | - metrics.push(metric); |
75 | | - } |
76 | | - m.set_metric(metrics); |
77 | | - m |
| 35 | + self.inner.with_label_values(&labels).inc(); |
78 | 36 | } |
79 | 37 | } |
80 | 38 |
|
81 | | -pub type ErrorMetricRef<const N: usize> = Arc<ErrorMetric<N>>; |
82 | | - |
83 | 39 | /// Metrics for counting errors in the system. |
84 | | -/// The detailed error messages are not supposed to be stored in the metrics, but in the logs. |
85 | 40 | /// |
86 | 41 | /// Please avoid adding new error metrics here. Instead, introduce new `error_type` for new errors. |
87 | 42 | #[derive(Clone)] |
88 | 43 | pub struct ErrorMetrics { |
89 | | - pub user_sink_error: ErrorMetricRef<4>, |
90 | | - pub user_compute_error: ErrorMetricRef<3>, |
91 | | - pub user_source_error: ErrorMetricRef<4>, |
| 44 | + pub user_sink_error: ErrorMetric<4>, |
| 45 | + pub user_compute_error: ErrorMetric<3>, |
| 46 | + pub user_source_error: ErrorMetric<4>, |
92 | 47 | } |
93 | 48 |
|
94 | 49 | impl ErrorMetrics { |
95 | | - pub fn new() -> Self { |
| 50 | + pub fn new(registry: &Registry) -> Self { |
96 | 51 | Self { |
97 | | - user_sink_error: Arc::new(ErrorMetric::new( |
98 | | - "user_sink_error", |
| 52 | + user_sink_error: ErrorMetric::new( |
| 53 | + "user_sink_error_cnt", |
99 | 54 | "Sink errors in the system, queryable by tags", |
100 | 55 | &["error_type", "sink_id", "sink_name", "fragment_id"], |
101 | | - )), |
102 | | - user_compute_error: Arc::new(ErrorMetric::new( |
103 | | - "user_compute_error", |
| 56 | + registry, |
| 57 | + ), |
| 58 | + user_compute_error: ErrorMetric::new( |
| 59 | + "user_compute_error_cnt", |
104 | 60 | "Compute errors in the system, queryable by tags", |
105 | 61 | &["error_type", "executor_name", "fragment_id"], |
106 | | - )), |
107 | | - user_source_error: Arc::new(ErrorMetric::new( |
108 | | - "user_source_error", |
| 62 | + registry, |
| 63 | + ), |
| 64 | + user_source_error: ErrorMetric::new( |
| 65 | + "user_source_error_cnt", |
109 | 66 | "Source errors in the system, queryable by tags", |
110 | 67 | &["error_type", "source_id", "source_name", "fragment_id"], |
111 | | - )), |
| 68 | + registry, |
| 69 | + ), |
112 | 70 | } |
113 | 71 | } |
114 | | - |
115 | | - fn desc(&self) -> Vec<&Desc> { |
116 | | - vec![ |
117 | | - &self.user_sink_error.desc, |
118 | | - &self.user_compute_error.desc, |
119 | | - &self.user_source_error.desc, |
120 | | - ] |
121 | | - } |
122 | | - |
123 | | - fn collect(&self) -> Vec<prometheus::proto::MetricFamily> { |
124 | | - vec![ |
125 | | - self.user_sink_error.collect(), |
126 | | - self.user_compute_error.collect(), |
127 | | - self.user_source_error.collect(), |
128 | | - ] |
129 | | - } |
130 | | -} |
131 | | - |
132 | | -impl Default for ErrorMetrics { |
133 | | - fn default() -> Self { |
134 | | - ErrorMetrics::new() |
135 | | - } |
136 | | -} |
137 | | - |
138 | | -pub struct ErrorMetricsCollector { |
139 | | - metrics: ErrorMetrics, |
140 | | -} |
141 | | - |
142 | | -impl Collector for ErrorMetricsCollector { |
143 | | - fn desc(&self) -> Vec<&Desc> { |
144 | | - self.metrics.desc() |
145 | | - } |
146 | | - |
147 | | - fn collect(&self) -> Vec<prometheus::proto::MetricFamily> { |
148 | | - self.metrics.collect() |
149 | | - } |
150 | | -} |
151 | | - |
152 | | -pub fn monitor_errors(registry: &Registry, metrics: ErrorMetrics) { |
153 | | - let ec = ErrorMetricsCollector { metrics }; |
154 | | - registry.register(Box::new(ec)).unwrap() |
155 | 72 | } |
156 | 73 |
|
157 | | -pub static GLOBAL_ERROR_METRICS: LazyLock<ErrorMetrics> = LazyLock::new(|| { |
158 | | - let e = ErrorMetrics::new(); |
159 | | - monitor_errors(&GLOBAL_METRICS_REGISTRY, e.clone()); |
160 | | - e |
161 | | -}); |
| 74 | +pub static GLOBAL_ERROR_METRICS: LazyLock<ErrorMetrics> = |
| 75 | + LazyLock::new(|| ErrorMetrics::new(&GLOBAL_METRICS_REGISTRY)); |
0 commit comments