@@ -51,6 +51,9 @@ import (
51
51
"k8s.io/client-go/rest"
52
52
"k8s.io/client-go/tools/cache"
53
53
"k8s.io/client-go/util/workqueue"
54
+
55
+ "github.com/prometheus/client_golang/prometheus"
56
+ "github.com/prometheus/client_golang/prometheus/promauto"
54
57
)
55
58
56
59
const maxRetries = 5
@@ -93,6 +96,14 @@ func objName(obj interface{}) string {
93
96
func Start (conf * config.Config , eventHandler handlers.Handler ) {
94
97
var kubeClient kubernetes.Interface
95
98
var dynamicClient dynamic.Interface
99
+
100
+ kubewatchEventsMetrics := promauto .NewCounterVec (
101
+ prometheus.CounterOpts {
102
+ Name : "kubewatch_events_total" ,
103
+ Help : "The total number of Kubernetes events observed by Kubewatch, labeled by resource and event type" ,
104
+ },
105
+ []string {"resourceType" , "eventType" },
106
+ )
96
107
97
108
if _ , err := rest .InClusterConfig (); err != nil {
98
109
kubeClient = utils .GetClientOutOfCluster ()
@@ -120,7 +131,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
120
131
cache.Indexers {},
121
132
)
122
133
123
- allCoreEventsController := newResourceController (kubeClient , eventHandler , allCoreEventsInformer , objName (api_v1.Event {}), V1 )
134
+ allCoreEventsController := newResourceController (kubeClient , eventHandler , allCoreEventsInformer , objName (api_v1.Event {}), V1 , kubewatchEventsMetrics )
124
135
stopAllCoreEventsCh := make (chan struct {})
125
136
defer close (stopAllCoreEventsCh )
126
137
@@ -144,7 +155,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
144
155
cache.Indexers {},
145
156
)
146
157
147
- allEventsController := newResourceController (kubeClient , eventHandler , allEventsInformer , objName (events_v1.Event {}), EVENTS_V1 )
158
+ allEventsController := newResourceController (kubeClient , eventHandler , allEventsInformer , objName (events_v1.Event {}), EVENTS_V1 , kubewatchEventsMetrics )
148
159
stopAllEventsCh := make (chan struct {})
149
160
defer close (stopAllEventsCh )
150
161
@@ -166,7 +177,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
166
177
cache.Indexers {},
167
178
)
168
179
169
- c := newResourceController (kubeClient , eventHandler , informer , objName (api_v1.Pod {}), V1 )
180
+ c := newResourceController (kubeClient , eventHandler , informer , objName (api_v1.Pod {}), V1 , kubewatchEventsMetrics )
170
181
stopCh := make (chan struct {})
171
182
defer close (stopCh )
172
183
@@ -188,7 +199,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
188
199
cache.Indexers {},
189
200
)
190
201
191
- c := newResourceController (kubeClient , eventHandler , informer , objName (autoscaling_v1.HorizontalPodAutoscaler {}), AUTOSCALING_V1 )
202
+ c := newResourceController (kubeClient , eventHandler , informer , objName (autoscaling_v1.HorizontalPodAutoscaler {}), AUTOSCALING_V1 , kubewatchEventsMetrics )
192
203
stopCh := make (chan struct {})
193
204
defer close (stopCh )
194
205
@@ -211,7 +222,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
211
222
cache.Indexers {},
212
223
)
213
224
214
- c := newResourceController (kubeClient , eventHandler , informer , objName (apps_v1.DaemonSet {}), APPS_V1 )
225
+ c := newResourceController (kubeClient , eventHandler , informer , objName (apps_v1.DaemonSet {}), APPS_V1 , kubewatchEventsMetrics )
215
226
stopCh := make (chan struct {})
216
227
defer close (stopCh )
217
228
@@ -233,7 +244,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
233
244
cache.Indexers {},
234
245
)
235
246
236
- c := newResourceController (kubeClient , eventHandler , informer , objName (apps_v1.StatefulSet {}), APPS_V1 )
247
+ c := newResourceController (kubeClient , eventHandler , informer , objName (apps_v1.StatefulSet {}), APPS_V1 , kubewatchEventsMetrics )
237
248
stopCh := make (chan struct {})
238
249
defer close (stopCh )
239
250
@@ -255,7 +266,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
255
266
cache.Indexers {},
256
267
)
257
268
258
- c := newResourceController (kubeClient , eventHandler , informer , objName (apps_v1.ReplicaSet {}), APPS_V1 )
269
+ c := newResourceController (kubeClient , eventHandler , informer , objName (apps_v1.ReplicaSet {}), APPS_V1 , kubewatchEventsMetrics )
259
270
stopCh := make (chan struct {})
260
271
defer close (stopCh )
261
272
@@ -277,7 +288,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
277
288
cache.Indexers {},
278
289
)
279
290
280
- c := newResourceController (kubeClient , eventHandler , informer , objName (api_v1.Service {}), V1 )
291
+ c := newResourceController (kubeClient , eventHandler , informer , objName (api_v1.Service {}), V1 , kubewatchEventsMetrics )
281
292
stopCh := make (chan struct {})
282
293
defer close (stopCh )
283
294
@@ -299,7 +310,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
299
310
cache.Indexers {},
300
311
)
301
312
302
- c := newResourceController (kubeClient , eventHandler , informer , objName (apps_v1.Deployment {}), APPS_V1 )
313
+ c := newResourceController (kubeClient , eventHandler , informer , objName (apps_v1.Deployment {}), APPS_V1 , kubewatchEventsMetrics )
303
314
stopCh := make (chan struct {})
304
315
defer close (stopCh )
305
316
@@ -321,7 +332,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
321
332
cache.Indexers {},
322
333
)
323
334
324
- c := newResourceController (kubeClient , eventHandler , informer , objName (api_v1.Namespace {}), V1 )
335
+ c := newResourceController (kubeClient , eventHandler , informer , objName (api_v1.Namespace {}), V1 , kubewatchEventsMetrics )
325
336
stopCh := make (chan struct {})
326
337
defer close (stopCh )
327
338
@@ -343,7 +354,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
343
354
cache.Indexers {},
344
355
)
345
356
346
- c := newResourceController (kubeClient , eventHandler , informer , objName (api_v1.ReplicationController {}), V1 )
357
+ c := newResourceController (kubeClient , eventHandler , informer , objName (api_v1.ReplicationController {}), V1 , kubewatchEventsMetrics )
347
358
stopCh := make (chan struct {})
348
359
defer close (stopCh )
349
360
@@ -365,7 +376,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
365
376
cache.Indexers {},
366
377
)
367
378
368
- c := newResourceController (kubeClient , eventHandler , informer , objName (batch_v1.Job {}), BATCH_V1 )
379
+ c := newResourceController (kubeClient , eventHandler , informer , objName (batch_v1.Job {}), BATCH_V1 , kubewatchEventsMetrics )
369
380
stopCh := make (chan struct {})
370
381
defer close (stopCh )
371
382
@@ -387,7 +398,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
387
398
cache.Indexers {},
388
399
)
389
400
390
- c := newResourceController (kubeClient , eventHandler , informer , objName (api_v1.Node {}), V1 )
401
+ c := newResourceController (kubeClient , eventHandler , informer , objName (api_v1.Node {}), V1 , kubewatchEventsMetrics )
391
402
stopCh := make (chan struct {})
392
403
defer close (stopCh )
393
404
@@ -409,7 +420,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
409
420
cache.Indexers {},
410
421
)
411
422
412
- c := newResourceController (kubeClient , eventHandler , informer , objName (api_v1.ServiceAccount {}), V1 )
423
+ c := newResourceController (kubeClient , eventHandler , informer , objName (api_v1.ServiceAccount {}), V1 , kubewatchEventsMetrics )
413
424
stopCh := make (chan struct {})
414
425
defer close (stopCh )
415
426
@@ -431,7 +442,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
431
442
cache.Indexers {},
432
443
)
433
444
434
- c := newResourceController (kubeClient , eventHandler , informer , objName (rbac_v1.ClusterRole {}), RBAC_V1 )
445
+ c := newResourceController (kubeClient , eventHandler , informer , objName (rbac_v1.ClusterRole {}), RBAC_V1 , kubewatchEventsMetrics )
435
446
stopCh := make (chan struct {})
436
447
defer close (stopCh )
437
448
@@ -453,7 +464,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
453
464
cache.Indexers {},
454
465
)
455
466
456
- c := newResourceController (kubeClient , eventHandler , informer , objName (rbac_v1.ClusterRoleBinding {}), RBAC_V1 )
467
+ c := newResourceController (kubeClient , eventHandler , informer , objName (rbac_v1.ClusterRoleBinding {}), RBAC_V1 , kubewatchEventsMetrics )
457
468
stopCh := make (chan struct {})
458
469
defer close (stopCh )
459
470
@@ -475,7 +486,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
475
486
cache.Indexers {},
476
487
)
477
488
478
- c := newResourceController (kubeClient , eventHandler , informer , objName (api_v1.PersistentVolume {}), V1 )
489
+ c := newResourceController (kubeClient , eventHandler , informer , objName (api_v1.PersistentVolume {}), V1 , kubewatchEventsMetrics )
479
490
stopCh := make (chan struct {})
480
491
defer close (stopCh )
481
492
@@ -497,7 +508,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
497
508
cache.Indexers {},
498
509
)
499
510
500
- c := newResourceController (kubeClient , eventHandler , informer , objName (api_v1.Secret {}), V1 )
511
+ c := newResourceController (kubeClient , eventHandler , informer , objName (api_v1.Secret {}), V1 , kubewatchEventsMetrics )
501
512
stopCh := make (chan struct {})
502
513
defer close (stopCh )
503
514
@@ -519,7 +530,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
519
530
cache.Indexers {},
520
531
)
521
532
522
- c := newResourceController (kubeClient , eventHandler , informer , objName (api_v1.ConfigMap {}), V1 )
533
+ c := newResourceController (kubeClient , eventHandler , informer , objName (api_v1.ConfigMap {}), V1 , kubewatchEventsMetrics )
523
534
stopCh := make (chan struct {})
524
535
defer close (stopCh )
525
536
@@ -541,7 +552,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
541
552
cache.Indexers {},
542
553
)
543
554
544
- c := newResourceController (kubeClient , eventHandler , informer , objName (networking_v1.Ingress {}), NETWORKING_V1 )
555
+ c := newResourceController (kubeClient , eventHandler , informer , objName (networking_v1.Ingress {}), NETWORKING_V1 , kubewatchEventsMetrics )
545
556
stopCh := make (chan struct {})
546
557
defer close (stopCh )
547
558
@@ -572,7 +583,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
572
583
cache.Indexers {},
573
584
)
574
585
575
- c := newResourceController (kubeClient , eventHandler , informer , crd .Resource , fmt .Sprintf ("%s/%s" , crd .Group , crd .Version ))
586
+ c := newResourceController (kubeClient , eventHandler , informer , crd .Resource , fmt .Sprintf ("%s/%s" , crd .Group , crd .Version ), kubewatchEventsMetrics )
576
587
stopCh := make (chan struct {})
577
588
defer close (stopCh )
578
589
@@ -585,7 +596,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
585
596
<- sigterm
586
597
}
587
598
588
- func newResourceController (client kubernetes.Interface , eventHandler handlers.Handler , informer cache.SharedIndexInformer , resourceType string , apiVersion string ) * Controller {
599
+ func newResourceController (client kubernetes.Interface , eventHandler handlers.Handler , informer cache.SharedIndexInformer , resourceType string , apiVersion string , kubewatchEventsMetrics * prometheus. CounterVec ) * Controller {
589
600
queue := workqueue .NewRateLimitingQueue (workqueue .DefaultControllerRateLimiter ())
590
601
var newEvent Event
591
602
var err error
@@ -605,6 +616,8 @@ func newResourceController(client kubernetes.Interface, eventHandler handlers.Ha
605
616
if err == nil {
606
617
queue .Add (newEvent )
607
618
}
619
+
620
+ kubewatchEventsMetrics .WithLabelValues (resourceType , "create" ).Inc ()
608
621
},
609
622
UpdateFunc : func (old , new interface {}) {
610
623
var ok bool
@@ -625,6 +638,8 @@ func newResourceController(client kubernetes.Interface, eventHandler handlers.Ha
625
638
if err == nil {
626
639
queue .Add (newEvent )
627
640
}
641
+
642
+ kubewatchEventsMetrics .WithLabelValues (resourceType , "update" ).Inc ()
628
643
},
629
644
DeleteFunc : func (obj interface {}) {
630
645
var ok bool
@@ -641,6 +656,8 @@ func newResourceController(client kubernetes.Interface, eventHandler handlers.Ha
641
656
if err == nil {
642
657
queue .Add (newEvent )
643
658
}
659
+
660
+ kubewatchEventsMetrics .WithLabelValues (resourceType , "delete" ).Inc ()
644
661
},
645
662
})
646
663
0 commit comments