98 changes: 66 additions & 32 deletions pkg/scheduler/scheduler.go
@@ -70,7 +70,7 @@
}
s.nodeManager = newNodeManager()
s.podManager = newPodManager()
klog.V(2).InfoS("Scheduler initialized successfully")
klog.InfoS("Scheduler initialized successfully")
return s
}

@@ -87,7 +87,7 @@
klog.ErrorS(fmt.Errorf("invalid pod object"), "Failed to process pod addition")
return
}
klog.V(5).InfoS("Pod added", "pod", pod.Name, "namespace", pod.Namespace)
klog.V(4).InfoS("Pod added", "pod", pod.Name, "namespace", pod.Namespace)
nodeID, ok := pod.Annotations[util.AssignedNodeAnnotations]
if !ok {
return
@@ -107,7 +107,7 @@
func (s *Scheduler) onDelPod(obj any) {
pod, ok := obj.(*corev1.Pod)
if !ok {
klog.Errorf("unknown add object type")
klog.ErrorS(fmt.Errorf("unknown object type"), "Failed to process pod deletion")

Codecov warning (pkg/scheduler/scheduler.go#L110): added line not covered by tests
return
}
_, ok = pod.Annotations[util.AssignedNodeAnnotations]
@@ -156,9 +156,9 @@
for {
select {
case <-s.nodeNotify:
klog.V(5).InfoS("Received node notification")
klog.V(4).InfoS("Received node notification")
case <-ticker.C:
klog.InfoS("Ticker triggered")
klog.V(5).InfoS("Ticker triggered")

Codecov warning (pkg/scheduler/scheduler.go#L161): added line not covered by tests
case <-s.stopCh:
klog.InfoS("Received stop signal, exiting RegisterFromNodeAnnotations")
return
@@ -168,26 +168,36 @@
klog.ErrorS(err, "Failed to list nodes with selector", "selector", labelSelector.String())
continue
}
klog.V(5).InfoS("Listed nodes", "nodeCount", len(rawNodes))

klog.V(4).InfoS("Listed nodes", "nodeCount", len(rawNodes))
var nodeNames []string
for _, val := range rawNodes {
nodeNames = append(nodeNames, val.Name)
klog.V(5).InfoS("Processing node", "nodeName", val.Name)

klog.V(4).InfoS("Processing node", "nodeName", val.Name)

for devhandsk, devInstance := range device.GetDevices() {
klog.V(5).InfoS("Checking device health", "nodeName", val.Name, "deviceVendor", devhandsk)

klog.V(4).InfoS("Checking device health", "nodeName", val.Name, "deviceVendor", devhandsk)

nodedevices, err := devInstance.GetNodeDevices(*val)
if err != nil {
klog.V(5).InfoS("Failed to get node devices", "nodeName", val.Name, "deviceVendor", devhandsk)
continue
klog.V(3).InfoS("Failed to update node device status", "nodeName", val.Name, "deviceVendor", devhandsk, "error", err)
}

health, needUpdate := devInstance.CheckHealth(devhandsk, val)
klog.V(5).InfoS("Device health check result", "nodeName", val.Name, "deviceVendor", devhandsk, "health", health, "needUpdate", needUpdate)

klog.V(4).InfoS("Device health check result",
"nodeName", val.Name,
"deviceVendor", devhandsk,
"health", health,
"needUpdate", needUpdate)

if !health {
klog.Warning("Device is unhealthy, cleaning up node", "nodeName", val.Name, "deviceVendor", devhandsk)

klog.Warning("Device is unhealthy, cleaning up node",
"nodeName", val.Name,
"deviceVendor", devhandsk)
err := devInstance.NodeCleanUp(val.Name)
if err != nil {
klog.ErrorS(err, "Node cleanup failed", "nodeName", val.Name, "deviceVendor", devhandsk)
Expand All @@ -197,40 +207,47 @@
continue
}
if !needUpdate {
klog.V(5).InfoS("No update needed for device", "nodeName", val.Name, "deviceVendor", devhandsk)
klog.V(4).InfoS("No update needed for device", "nodeName", val.Name, "deviceVendor", devhandsk)

Codecov warning (pkg/scheduler/scheduler.go#L210): added line not covered by tests
continue
}
_, ok := util.HandshakeAnnos[devhandsk]
if ok {
tmppat := make(map[string]string)
tmppat[util.HandshakeAnnos[devhandsk]] = "Requesting_" + time.Now().Format(time.DateTime)
klog.InfoS("New timestamp for annotation", "nodeName", val.Name, "annotationKey", util.HandshakeAnnos[devhandsk], "annotationValue", tmppat[util.HandshakeAnnos[devhandsk]])
klog.V(3).InfoS("New timestamp for annotation",
"nodeName", val.Name,
"annotationKey", util.HandshakeAnnos[devhandsk],
"annotationValue", tmppat[util.HandshakeAnnos[devhandsk]])
n, err := util.GetNode(val.Name)
if err != nil {
klog.ErrorS(err, "Failed to get node", "nodeName", val.Name)
continue
}
klog.V(5).InfoS("Patching node annotations", "nodeName", val.Name, "annotations", tmppat)
klog.V(4).InfoS("Patching node annotations", "nodeName", val.Name, "annotations", tmppat)
if err := util.PatchNodeAnnotations(n, tmppat); err != nil {
klog.ErrorS(err, "Failed to patch node annotations", "nodeName", val.Name)
}
}
nodeInfo := &util.NodeInfo{}
nodeInfo.ID = val.Name
nodeInfo.Node = val
klog.V(5).InfoS("Fetching node devices", "nodeName", val.Name, "deviceVendor", devhandsk)
klog.V(4).InfoS("Fetching node devices", "nodeName", val.Name, "deviceVendor", devhandsk)
nodeInfo.Devices = make([]util.DeviceInfo, 0)
for _, deviceinfo := range nodedevices {
nodeInfo.Devices = append(nodeInfo.Devices, *deviceinfo)
}
s.addNode(val.Name, nodeInfo)
if s.nodes[val.Name] != nil && len(nodeInfo.Devices) > 0 {
if printedLog[val.Name] {
klog.V(5).InfoS("Node device updated", "nodeName", val.Name, "deviceVendor", devhandsk, "nodeInfo", nodeInfo, "totalDevices", s.nodes[val.Name].Devices)
klog.InfoS("Node device updated", "nodeName", val.Name, "deviceVendor", devhandsk, "nodeInfo", nodeInfo, "totalDevices", s.nodes[val.Name].Devices)

Codecov warning (pkg/scheduler/scheduler.go#L242): added line not covered by tests
} else {
klog.InfoS("Node device added", "nodeName", val.Name, "deviceVendor", devhandsk, "nodeInfo", nodeInfo, "totalDevices", s.nodes[val.Name].Devices)
printedLog[val.Name] = true
}
klog.InfoS("Node device inventory changed",
"nodeName", val.Name,
"deviceVendor", devhandsk,
"deviceCount", len(nodeInfo.Devices))

Codecov warning (pkg/scheduler/scheduler.go#L247-L250): added lines not covered by tests
}
}
}
@@ -319,7 +336,10 @@
d.Device.Usedcores += udevice.Usedcores
if strings.Contains(udevice.UUID, "[") {
if strings.Compare(d.Device.Mode, "hami-core") == 0 {
klog.Errorf("found a mig task running on a hami-core GPU\n")
klog.ErrorS(fmt.Errorf("invalid configuration"), "MIG task assigned to non-MIG GPU",
"deviceID", d.Device.ID,
"deviceMode", d.Device.Mode,
"taskUUID", udevice.UUID)

Codecov warning (pkg/scheduler/scheduler.go#L339-L342): added lines not covered by tests
d.Device.Health = false
continue
}
@@ -328,21 +348,28 @@
util.PlatternMIG(&d.Device.MigUsage, d.Device.MigTemplate, tmpIdx)
}
d.Device.MigUsage.UsageList[Instance].InUse = true
klog.V(5).Infoln("add mig usage", d.Device.MigUsage, "template=", d.Device.MigTemplate, "uuid=", d.Device.ID)
klog.V(4).InfoS("MIG device allocated",
"deviceID", d.Device.ID,
"instanceID", Instance,
"template", d.Device.MigTemplate)

Codecov warning (pkg/scheduler/scheduler.go#L351-L354): added lines not covered by tests
}
}
}
}
}
}
klog.V(5).Infof("usage: pod %v assigned %v %v", p.Name, p.NodeID, p.Devices)
klog.V(4).InfoS("Pod resource assignment",
"podName", p.Name,
"nodeName", p.NodeID,
"deviceCount", len(p.Devices))
}
s.overviewstatus = overallnodeMap
for _, nodeID := range *nodes {
node, err := s.GetNode(nodeID)
if err != nil {
// The identified node does not have a gpu device, so the log here has no practical meaning; increase log priority.
klog.V(5).InfoS("node unregistered", "node", nodeID, "error", err)
klog.V(3).InfoS("Node unregistered or no GPU devices found",
"nodeName", nodeID,
"error", err)
failedNodes[nodeID] = "node unregistered"
continue
}
@@ -396,7 +423,7 @@
klog.ErrorS(err, "Failed to get pod", "pod", args.PodName, "namespace", args.PodNamespace)
return &extenderv1.ExtenderBindingResult{Error: err.Error()}, err
}
klog.InfoS("Trying to get the target node for pod", "pod", args.PodName, "namespace", args.PodNamespace, "node", args.Node)
klog.V(3).InfoS("Retrieving target node for binding", "pod", args.PodName, "namespace", args.PodNamespace, "node", args.Node)

Codecov warning (pkg/scheduler/scheduler.go#L426): added line not covered by tests
node, err := s.kubeClient.CoreV1().Nodes().Get(context.Background(), args.Node, metav1.GetOptions{})
if err != nil {
klog.ErrorS(err, "Failed to get node", "node", args.Node)
@@ -413,14 +440,14 @@
for _, val := range device.GetDevices() {
err = val.LockNode(node, current)
if err != nil {
klog.ErrorS(err, "Failed to lock node", "node", args.Node, "device", val)
klog.ErrorS(err, "Failed to lock node", "node", args.Node, "error", err)

Codecov warning (pkg/scheduler/scheduler.go#L443): added line not covered by tests
goto ReleaseNodeLocks
}
}

err = util.PatchPodAnnotations(current, tmppatch)
if err != nil {
klog.ErrorS(err, "Failed to patch pod annotations", "pod", klog.KObj(current))
klog.ErrorS(err, "Failed to patch pod annotations", "pod", klog.KObj(current), "annotations", tmppatch)

Codecov warning (pkg/scheduler/scheduler.go#L450): added line not covered by tests
return &extenderv1.ExtenderBindingResult{Error: err.Error()}, err
}

@@ -453,8 +480,10 @@
}
}
if total == 0 {
klog.V(1).InfoS("Pod does not request any resources",
"pod", args.Pod.Name)
klog.InfoS("Pod does not request any resources",
"podName", args.Pod.Name,
"podNamespace", args.Pod.Namespace,
"podUID", args.Pod.UID)

Codecov warning (pkg/scheduler/scheduler.go#L483-L486): added lines not covered by tests
s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, "", fmt.Errorf("does not request any resource"))
return &extenderv1.ExtenderFilterResult{
NodeNames: args.NodeNames,
@@ -470,8 +499,9 @@
return nil, err
}
if len(failedNodes) != 0 {
klog.V(5).InfoS("Nodes failed during usage retrieval",
"nodes", failedNodes)
klog.InfoS("Failed to retrieve usage for some nodes",
"failedNodeCount", len(failedNodes),
"failedNodes", failedNodes)

Codecov warning (pkg/scheduler/scheduler.go#L502-L504): added lines not covered by tests
}
nodeScores, err := s.calcScore(nodeUsage, nums, annos, args.Pod, failedNodes)
if err != nil {
@@ -480,14 +510,18 @@
return nil, err
}
if len((*nodeScores).NodeList) == 0 {
klog.V(4).InfoS("No available nodes meet the required scores",
"pod", args.Pod.Name)
klog.InfoS("No available nodes meet the required scores",
"podName", args.Pod.Name,
"podNamespace", args.Pod.Namespace,
"requestedResources", k8sutil.Resourcereqs(args.Pod),
"totalNodesChecked", len(*args.NodeNames),
"failedNodesCount", len(failedNodes))

Codecov warning (pkg/scheduler/scheduler.go#L513-L518): added lines not covered by tests
s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, "", fmt.Errorf("no available node, %d nodes do not meet", len(*args.NodeNames)))
return &extenderv1.ExtenderFilterResult{
FailedNodes: failedNodes,
}, nil
}
klog.V(4).Infoln("nodeScores_len=", len((*nodeScores).NodeList))
klog.InfoS("Calculated node scores", "pod", args.Pod.Name, "nodeScoresLen", len(nodeScores.NodeList))
sort.Sort(nodeScores)
m := (*nodeScores).NodeList[len((*nodeScores).NodeList)-1]
klog.InfoS("Scheduling pod to node",
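For context, the sketch below shows the structured-logging convention this diff standardizes on: `klog.InfoS`/`klog.ErrorS` with key/value pairs and `klog.V(n)` gating for verbose messages (k8s.io/klog/v2). The messages, keys, and verbosity levels here are illustrative only and are not taken verbatim from the scheduler code.

```go
package main

import (
	"errors"
	"flag"

	"k8s.io/klog/v2"
)

func main() {
	// Register klog's flags (-v, -logtostderr, ...) and enable V(4) messages.
	klog.InitFlags(flag.CommandLine)
	_ = flag.Set("v", "4")
	flag.Parse()
	defer klog.Flush()

	// Always-on, operator-facing message (no V() gate).
	klog.InfoS("Scheduler initialized successfully")

	// High-volume debugging detail, emitted only when -v >= 4.
	klog.V(4).InfoS("Pod added", "pod", "demo-pod", "namespace", "default")

	// Failures go through ErrorS, with the error as the first argument
	// followed by structured context.
	klog.ErrorS(errors.New("invalid pod object"), "Failed to process pod addition",
		"pod", "demo-pod", "namespace", "default")
}
```

At a high level this mirrors the pattern applied throughout the diff: routine lifecycle events via InfoS, per-pod and per-node detail behind V() gates, and failures via ErrorS with the error value first.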
18 changes: 14 additions & 4 deletions pkg/scheduler/scheduler_test.go
@@ -670,8 +670,8 @@ func Test_RegisterFromNodeAnnotations(t *testing.T) {
t.Errorf("missing annotation: hami.io/node-handshake-dcu")
return false
}
_, errHami := time.Parse(time.DateTime, strings.TrimPrefix(handshakeTimeStr, "Requesting_"))
_, errDcu := time.Parse(time.DateTime, strings.TrimPrefix(dcuTimeStr, "Requesting_"))
_, errHami := time.Parse(time.DateTime, trimHandshakePrefix(handshakeTimeStr))
_, errDcu := time.Parse(time.DateTime, trimHandshakePrefix(dcuTimeStr))
if errHami != nil {
t.Errorf("invalid time format in annotation 'hami.io/node-handshake': %v", errHami)
return false
@@ -786,8 +786,8 @@ func Test_RegisterFromNodeAnnotations_NIL(t *testing.T) {
}

// Verify time format in annotations if they exist
_, errHami := time.Parse(time.DateTime, strings.TrimPrefix(handshakeTimeStr, "Requesting_"))
_, errDcu := time.Parse(time.DateTime, strings.TrimPrefix(dcuTimeStr, "Requesting_"))
_, errHami := time.Parse(time.DateTime, trimHandshakePrefix(handshakeTimeStr))
_, errDcu := time.Parse(time.DateTime, trimHandshakePrefix(dcuTimeStr))

if errHami != nil {
t.Errorf("invalid time format in annotation 'hami.io/node-handshake': %v", errHami)
@@ -837,3 +837,13 @@ func Test_RegisterFromNodeAnnotations_NIL(t *testing.T) {
})
}
}

func trimHandshakePrefix(s string) string {
prefixes := []string{"Requesting_", "Deleted_"}
for _, p := range prefixes {
if strings.HasPrefix(s, p) {
return strings.TrimPrefix(s, p)
}
}
return s
}
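As a usage note, the new helper mirrors the handshake-annotation format the scheduler writes: a state prefix such as `Requesting_` (or `Deleted_`) followed by a `time.DateTime` timestamp. Below is a minimal, self-contained sketch of that round trip; the values are illustrative, not taken from the tests.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// trimHandshakePrefix strips a known handshake state prefix, if present.
func trimHandshakePrefix(s string) string {
	for _, p := range []string{"Requesting_", "Deleted_"} {
		if strings.HasPrefix(s, p) {
			return strings.TrimPrefix(s, p)
		}
	}
	return s
}

func main() {
	// The scheduler patches "Requesting_" + a time.DateTime timestamp into the
	// handshake annotation; the tests strip the prefix before parsing it back.
	annotation := "Requesting_" + time.Now().Format(time.DateTime)
	ts, err := time.Parse(time.DateTime, trimHandshakePrefix(annotation))
	fmt.Println(ts, err)
}
```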