Skip to content

Commit 7fe6183

Browse files
enhance error handling and logging in node management functions (#817)
Signed-off-by: haitwang-cloud <[email protected]>
1 parent 759be1c commit 7fe6183

File tree

9 files changed

+525
-92
lines changed

9 files changed

+525
-92
lines changed

cmd/scheduler/metrics.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,9 +189,17 @@ func (cc ClusterManagerCollector) Collect(ch chan<- prometheus.Metric) {
189189
for _, podSingleDevice := range val.Devices {
190190
for ctridx, ctrdevs := range podSingleDevice {
191191
for _, ctrdevval := range ctrdevs {
192-
klog.Infoln("Collecting", val.Namespace, val.NodeID, val.Name, ctrdevval.UUID, ctrdevval.Usedcores, ctrdevval.Usedmem)
192+
klog.V(4).InfoS("Collecting metrics",
193+
"namespace", val.Namespace,
194+
"podName", val.Name,
195+
"deviceUUID", ctrdevval.UUID,
196+
"usedCores", ctrdevval.Usedcores,
197+
"usedMem", ctrdevval.Usedmem,
198+
"nodeID", val.NodeID,
199+
)
193200
if len(ctrdevval.UUID) == 0 {
194-
klog.Infof("UUID empty, omitted")
201+
klog.Warningf("Device UUID is empty, omitting metric collection for namespace=%s, podName=%s, ctridx=%d, nodeID=%s",
202+
val.Namespace, val.Name, ctridx, val.NodeID)
195203
continue
196204
}
197205
ch <- prometheus.MustNewConstMetric(
@@ -214,6 +222,11 @@ func (cc ClusterManagerCollector) Collect(ch chan<- prometheus.Metric) {
214222
break
215223
}
216224
}
225+
klog.V(4).InfoS("Total memory for device",
226+
"deviceUUID", ctrdevval.UUID,
227+
"totalMemory", totaldev,
228+
"nodeID", val.NodeID,
229+
)
217230
if totaldev > 0 {
218231
ch <- prometheus.MustNewConstMetric(
219232
ctrvGPUdeviceAllocatedMemoryPercentageDesc,

pkg/scheduler/pods.go

Lines changed: 66 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,10 @@ type podInfo struct {
3636
CtrIDs []string
3737
}
3838

39-
// PodUseDeviceStat count pod use device info.
39+
// PodUseDeviceStat counts pod use device info.
4040
type PodUseDeviceStat struct {
41-
// count current node all running success pod num
42-
TotalPod int
43-
// only running success pod and use device pod can count.
44-
UseDevicePod int
41+
TotalPod int // Count of all running pods on the current node
42+
UseDevicePod int // Count of running pods that use devices
4543
}
4644

4745
type podManager struct {
@@ -51,59 +49,109 @@ type podManager struct {
5149

5250
func (m *podManager) init() {
5351
m.pods = make(map[k8stypes.UID]*podInfo)
52+
klog.InfoS("Pod manager initialized", "podCount", len(m.pods))
5453
}
5554

5655
func (m *podManager) addPod(pod *corev1.Pod, nodeID string, devices util.PodDevices) {
5756
m.mutex.Lock()
5857
defer m.mutex.Unlock()
59-
_, ok := m.pods[pod.UID]
60-
if !ok {
61-
pi := &podInfo{Name: pod.Name, UID: pod.UID, Namespace: pod.Namespace, NodeID: nodeID, Devices: devices}
58+
59+
_, exists := m.pods[pod.UID]
60+
if !exists {
61+
pi := &podInfo{
62+
Name: pod.Name,
63+
UID: pod.UID,
64+
Namespace: pod.Namespace,
65+
NodeID: nodeID,
66+
Devices: devices,
67+
}
6268
m.pods[pod.UID] = pi
63-
klog.Infof("Pod added: Name: %s, UID: %s, Namespace: %s, NodeID: %s", pod.Name, pod.UID, pod.Namespace, nodeID)
69+
klog.InfoS("Pod added",
70+
"pod", klog.KRef(pod.Namespace, pod.Name),
71+
"namespace", pod.Namespace,
72+
"name", pod.Name,
73+
"nodeID", nodeID,
74+
"devices", devices,
75+
)
6476
} else {
6577
m.pods[pod.UID].Devices = devices
78+
klog.InfoS("Pod devices updated",
79+
"pod", klog.KRef(pod.Namespace, pod.Name),
80+
"namespace", pod.Namespace,
81+
"name", pod.Name,
82+
"devices", devices,
83+
)
6684
}
6785
}
6886

6987
func (m *podManager) delPod(pod *corev1.Pod) {
7088
m.mutex.Lock()
7189
defer m.mutex.Unlock()
72-
pi, ok := m.pods[pod.UID]
73-
if ok {
74-
klog.Infof("Deleted pod %s with node ID %s", pi.Name, pi.NodeID)
90+
91+
pi, exists := m.pods[pod.UID]
92+
if exists {
93+
klog.InfoS("Pod deleted",
94+
"pod", klog.KRef(pod.Namespace, pod.Name),
95+
"namespace", pod.Namespace,
96+
"name", pod.Name,
97+
"nodeID", pi.NodeID,
98+
)
7599
delete(m.pods, pod.UID)
100+
} else {
101+
klog.InfoS("Pod not found for deletion",
102+
"pod", klog.KRef(pod.Namespace, pod.Name),
103+
"namespace", pod.Namespace,
104+
"name", pod.Name,
105+
)
76106
}
77107
}
78108

79109
func (m *podManager) ListPodsUID() ([]*corev1.Pod, error) {
80110
m.mutex.RLock()
81111
defer m.mutex.RUnlock()
82-
pods := make([]*corev1.Pod, 0)
112+
113+
pods := make([]*corev1.Pod, 0, len(m.pods))
83114
for uid := range m.pods {
84115
pods = append(pods, &corev1.Pod{
85116
ObjectMeta: metav1.ObjectMeta{
86117
UID: uid,
87118
},
88119
})
89120
}
121+
klog.InfoS("Listed pod UIDs",
122+
"podCount", len(pods),
123+
)
90124
return pods, nil
91125
}
92126

93127
func (m *podManager) ListPodsInfo() []*podInfo {
94128
m.mutex.RLock()
95129
defer m.mutex.RUnlock()
96-
pods := make([]*podInfo, 0)
97-
for key := range m.pods {
98-
values := m.pods[key]
99-
pods = append(pods, values)
130+
131+
pods := make([]*podInfo, 0, len(m.pods))
132+
for _, pi := range m.pods {
133+
pods = append(pods, pi)
134+
klog.InfoS("Pod info",
135+
"pod", klog.KRef(pi.Namespace, pi.Name),
136+
"namespace", pi.Namespace,
137+
"name", pi.Name,
138+
"nodeID", pi.NodeID,
139+
"devices", pi.Devices,
140+
)
100141
}
142+
klog.InfoS("Listed pod infos",
143+
"podCount", len(pods),
144+
)
101145
return pods
102146
}
103147

104148
func (m *podManager) GetScheduledPods() (map[k8stypes.UID]*podInfo, error) {
105149
m.mutex.RLock()
106150
defer m.mutex.RUnlock()
107-
klog.Infof("Getting all scheduled pods with %d nums", len(m.pods))
151+
152+
podCount := len(m.pods)
153+
klog.InfoS("Retrieved scheduled pods",
154+
"podCount", podCount,
155+
)
108156
return m.pods, nil
109157
}

pkg/scheduler/scheduler.go

Lines changed: 61 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -156,66 +156,83 @@ func (s *Scheduler) Stop() {
156156
}
157157

158158
func (s *Scheduler) RegisterFromNodeAnnotations() {
159-
klog.V(5).Infoln("Scheduler into RegisterFromNodeAnnotations")
159+
klog.InfoS("Entering RegisterFromNodeAnnotations")
160+
defer klog.InfoS("Exiting RegisterFromNodeAnnotations")
160161
ticker := time.NewTicker(time.Second * 15)
162+
defer ticker.Stop()
161163
printedLog := map[string]bool{}
162164
for {
163165
select {
164166
case <-s.nodeNotify:
167+
klog.InfoS("Received node notification")
165168
case <-ticker.C:
169+
klog.InfoS("Ticker triggered")
166170
case <-s.stopCh:
171+
klog.InfoS("Received stop signal, exiting RegisterFromNodeAnnotations")
167172
return
168173
}
169174
labelSelector := labels.Everything()
170175
if len(config.NodeLabelSelector) > 0 {
171176
labelSelector = (labels.Set)(config.NodeLabelSelector).AsSelector()
177+
klog.InfoS("Using label selector", "selector", labelSelector.String())
172178
}
173179
rawNodes, err := s.nodeLister.List(labelSelector)
174180
if err != nil {
175-
klog.Errorln("nodes list failed", err.Error())
181+
klog.ErrorS(err, "Failed to list nodes with selector", "selector", labelSelector.String())
176182
continue
177183
}
184+
klog.InfoS("Listed nodes", "nodeCount", len(rawNodes))
178185
var nodeNames []string
179186
for _, val := range rawNodes {
180187
nodeNames = append(nodeNames, val.Name)
188+
klog.InfoS("Processing node", "nodeName", val.Name)
189+
181190
for devhandsk, devInstance := range device.GetDevices() {
191+
klog.InfoS("Checking device health", "nodeName", val.Name, "deviceVendor", devhandsk)
192+
182193
health, needUpdate := devInstance.CheckHealth(devhandsk, val)
183-
klog.V(5).InfoS("device check health", "node", val.Name, "deviceVendor", devhandsk, "health", health, "needUpdate", needUpdate)
194+
klog.InfoS("Device health check result", "nodeName", val.Name, "deviceVendor", devhandsk, "health", health, "needUpdate", needUpdate)
195+
184196
if !health {
197+
klog.Warning("Device is unhealthy, cleaning up node", "nodeName", val.Name, "deviceVendor", devhandsk)
185198
err := devInstance.NodeCleanUp(val.Name)
186-
// If the device is not healthy, the device is removed from the node.
187-
// At the same time, this node needs to be removed from the cache.
188199
if err != nil {
189-
klog.Errorln("node cleanup failed", err.Error())
200+
klog.ErrorS(err, "Node cleanup failed", "nodeName", val.Name, "deviceVendor", devhandsk)
190201
}
202+
191203
info, ok := s.nodes[val.Name]
192204
if ok {
193-
klog.Infof("node %v device %s:%v leave, %v remaining devices:%v", val.Name, devhandsk, info.ID, err, s.nodes[val.Name].Devices)
205+
klog.InfoS("Removing device from node", "nodeName", val.Name, "deviceVendor", devhandsk, "remainingDevices", s.nodes[val.Name].Devices)
194206
s.rmNodeDevice(val.Name, info, devhandsk)
195-
continue
196207
}
208+
continue
197209
}
198210
if !needUpdate {
211+
klog.InfoS("No update needed for device", "nodeName", val.Name, "deviceVendor", devhandsk)
199212
continue
200213
}
201214
_, ok := util.HandshakeAnnos[devhandsk]
202215
if ok {
203216
tmppat := make(map[string]string)
204217
tmppat[util.HandshakeAnnos[devhandsk]] = "Requesting_" + time.Now().Format(time.DateTime)
205-
klog.V(5).InfoS("New timestamp", util.HandshakeAnnos[devhandsk], tmppat[util.HandshakeAnnos[devhandsk]], "nodeName", val.Name)
218+
klog.InfoS("New timestamp for annotation", "nodeName", val.Name, "annotationKey", util.HandshakeAnnos[devhandsk], "annotationValue", tmppat[util.HandshakeAnnos[devhandsk]])
206219
n, err := util.GetNode(val.Name)
207220
if err != nil {
208-
klog.Errorln("get node failed", err.Error())
221+
klog.ErrorS(err, "Failed to get node", "nodeName", val.Name)
209222
continue
210223
}
211-
util.PatchNodeAnnotations(n, tmppat)
224+
klog.InfoS("Patching node annotations", "nodeName", val.Name, "annotations", tmppat)
225+
if err := util.PatchNodeAnnotations(n, tmppat); err != nil {
226+
klog.ErrorS(err, "Failed to patch node annotations", "nodeName", val.Name)
227+
}
212228
}
213-
214229
nodeInfo := &util.NodeInfo{}
215230
nodeInfo.ID = val.Name
216231
nodeInfo.Node = val
232+
klog.InfoS("Fetching node devices", "nodeName", val.Name, "deviceVendor", devhandsk)
217233
nodedevices, err := devInstance.GetNodeDevices(*val)
218234
if err != nil {
235+
klog.ErrorS(err, "Failed to get node devices", "nodeName", val.Name, "deviceVendor", devhandsk)
219236
continue
220237
}
221238
nodeInfo.Devices = make([]util.DeviceInfo, 0)
@@ -225,17 +242,17 @@ func (s *Scheduler) RegisterFromNodeAnnotations() {
225242
s.addNode(val.Name, nodeInfo)
226243
if s.nodes[val.Name] != nil && len(nodeInfo.Devices) > 0 {
227244
if printedLog[val.Name] {
228-
klog.Infof("node %v device %s come node info=%s,%v total=%v", val.Name, devhandsk, nodeInfo.ID, nodeInfo.Devices, s.nodes[val.Name].Devices)
229-
printedLog[val.Name] = true
245+
klog.InfoS("Node device updated", "nodeName", val.Name, "deviceVendor", devhandsk, "nodeInfo", nodeInfo, "totalDevices", s.nodes[val.Name].Devices)
230246
} else {
231-
klog.V(5).Infof("node %v device %s come node info=%s,%v total=%v", val.Name, devhandsk, nodeInfo.ID, nodeInfo.Devices, s.nodes[val.Name].Devices)
247+
klog.InfoS("Node device added", "nodeName", val.Name, "deviceVendor", devhandsk, "nodeInfo", nodeInfo, "totalDevices", s.nodes[val.Name].Devices)
248+
printedLog[val.Name] = true
232249
}
233250
}
234251
}
235252
}
236253
_, _, err = s.getNodesUsage(&nodeNames, nil)
237254
if err != nil {
238-
klog.Errorln("get node usage failed", err.Error())
255+
klog.ErrorS(err, "Failed to get node usage", "nodeNames", nodeNames)
239256
}
240257
}
241258
}
@@ -377,62 +394,63 @@ func (s *Scheduler) getPodUsage() (map[string]PodUseDeviceStat, error) {
377394
}
378395

379396
func (s *Scheduler) Bind(args extenderv1.ExtenderBindingArgs) (*extenderv1.ExtenderBindingResult, error) {
380-
klog.InfoS("Bind", "pod", args.PodName, "namespace", args.PodNamespace, "podUID", args.PodUID, "node", args.Node)
381-
var err error
397+
klog.InfoS("Attempting to bind pod to node", "pod", args.PodName, "namespace", args.PodNamespace, "node", args.Node)
382398
var res *extenderv1.ExtenderBindingResult
399+
383400
binding := &corev1.Binding{
384401
ObjectMeta: metav1.ObjectMeta{Name: args.PodName, UID: args.PodUID},
385402
Target: corev1.ObjectReference{Kind: "Node", Name: args.Node},
386403
}
387404
current, err := s.kubeClient.CoreV1().Pods(args.PodNamespace).Get(context.Background(), args.PodName, metav1.GetOptions{})
388405
if err != nil {
389-
klog.ErrorS(err, "Get pod failed")
406+
klog.ErrorS(err, "Failed to get pod", "pod", args.PodName, "namespace", args.PodNamespace)
407+
return &extenderv1.ExtenderBindingResult{Error: err.Error()}, err
390408
}
391-
409+
klog.InfoS("Trying to get the target node for pod", "pod", args.PodName, "namespace", args.PodNamespace, "node", args.Node)
392410
node, err := s.kubeClient.CoreV1().Nodes().Get(context.Background(), args.Node, metav1.GetOptions{})
393411
if err != nil {
394412
klog.ErrorS(err, "Failed to get node", "node", args.Node)
395-
s.recordScheduleBindingResultEvent(current, EventReasonBindingFailed, []string{}, fmt.Errorf("failed to get node %v", args.Node))
396-
res = &extenderv1.ExtenderBindingResult{
397-
Error: err.Error(),
398-
}
413+
s.recordScheduleBindingResultEvent(current, EventReasonBindingFailed, []string{}, fmt.Errorf("failed to get node %s", args.Node))
414+
res = &extenderv1.ExtenderBindingResult{Error: err.Error()}
399415
return res, nil
400416
}
401-
tmppatch := make(map[string]string)
417+
418+
tmppatch := map[string]string{
419+
util.DeviceBindPhase: "allocating",
420+
util.BindTimeAnnotations: strconv.FormatInt(time.Now().Unix(), 10),
421+
}
422+
402423
for _, val := range device.GetDevices() {
403424
err = val.LockNode(node, current)
404425
if err != nil {
426+
klog.ErrorS(err, "Failed to lock node", "node", args.Node, "device", val)
405427
goto ReleaseNodeLocks
406428
}
407429
}
408430

409-
tmppatch[util.DeviceBindPhase] = "allocating"
410-
tmppatch[util.BindTimeAnnotations] = strconv.FormatInt(time.Now().Unix(), 10)
411-
412431
err = util.PatchPodAnnotations(current, tmppatch)
413432
if err != nil {
414-
klog.ErrorS(err, "patch pod annotation failed")
415-
}
416-
if err = s.kubeClient.CoreV1().Pods(args.PodNamespace).Bind(context.Background(), binding, metav1.CreateOptions{}); err != nil {
417-
klog.ErrorS(err, "Failed to bind pod", "pod", args.PodName, "namespace", args.PodNamespace, "podUID", args.PodUID, "node", args.Node)
433+
klog.ErrorS(err, "Failed to patch pod annotations", "pod", klog.KObj(current))
434+
return &extenderv1.ExtenderBindingResult{Error: err.Error()}, err
418435
}
419-
if err == nil {
420-
s.recordScheduleBindingResultEvent(current, EventReasonBindingSucceed, []string{args.Node}, nil)
421-
res = &extenderv1.ExtenderBindingResult{
422-
Error: "",
423-
}
424-
klog.InfoS("bind success", "pod", args.PodName, "namespace", args.PodNamespace, "podUID", args.PodUID, "node", args.Node)
425-
return res, nil
436+
437+
err = s.kubeClient.CoreV1().Pods(args.PodNamespace).Bind(context.Background(), binding, metav1.CreateOptions{})
438+
if err != nil {
439+
klog.ErrorS(err, "Failed to bind pod", "pod", args.PodName, "namespace", args.PodNamespace, "node", args.Node)
440+
goto ReleaseNodeLocks
426441
}
442+
443+
s.recordScheduleBindingResultEvent(current, EventReasonBindingSucceed, []string{args.Node}, nil)
444+
klog.InfoS("Successfully bound pod to node", "pod", args.PodName, "namespace", args.PodNamespace, "node", args.Node)
445+
return &extenderv1.ExtenderBindingResult{Error: ""}, nil
446+
427447
ReleaseNodeLocks:
428-
klog.InfoS("bind failed", "err", err.Error())
448+
klog.InfoS("Release node locks", "node", args.Node)
429449
for _, val := range device.GetDevices() {
430450
val.ReleaseNodeLock(node, current)
431451
}
432452
s.recordScheduleBindingResultEvent(current, EventReasonBindingFailed, []string{}, err)
433-
return &extenderv1.ExtenderBindingResult{
434-
Error: err.Error(),
435-
}, nil
453+
return &extenderv1.ExtenderBindingResult{Error: err.Error()}, nil
436454
}
437455

438456
func (s *Scheduler) Filter(args extenderv1.ExtenderArgs) (*extenderv1.ExtenderFilterResult, error) {

0 commit comments

Comments (0)