Skip to content

Commit b8e62c7

Browse files
author
Marvin Zhang
committed
fix(node/master): add failure counter and grace period to node health checks; increase monitor interval
1 parent ef70312 commit b8e62c7

File tree

1 file changed

+39
-13
lines changed

1 file changed

+39
-13
lines changed

core/node/service/master_service.go

Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ type MasterService struct {
4242
// settings
4343
monitorInterval time.Duration
4444

45+
// node health tracking - grace period before marking offline
46+
nodeFailureCount map[primitive.ObjectID]int
47+
nodeFailureMux sync.RWMutex
48+
4549
// internals
4650
interfaces.Logger
4751
}
@@ -200,21 +204,27 @@ func (svc *MasterService) monitor() (err error) {
200204
go func(n *models.Node) {
201205
defer wg.Done()
202206

203-
// subscribe
204-
ok := svc.subscribeNode(n)
205-
if !ok {
206-
go svc.setWorkerNodeOffline(n)
207-
return
208-
}
209-
210-
// ping client
211-
ok = svc.pingNodeClient(n)
212-
if !ok {
213-
go svc.setWorkerNodeOffline(n)
207+
// Check node health: subscribe and ping
208+
subscribeOk := svc.subscribeNode(n)
209+
pingOk := svc.pingNodeClient(n)
210+
healthCheckPassed := subscribeOk && pingOk
211+
212+
if !healthCheckPassed {
213+
// Health check failed - increment failure counter
214+
failures := svc.incrementNodeFailureCount(n.Id)
215+
svc.Debugf("worker node[%s] health check failed (failures: %d)", n.Key, failures)
216+
217+
// Only mark offline after consecutive failures (grace period)
218+
// This prevents flapping during brief reconnection windows
219+
if failures >= 2 {
220+
svc.Infof("worker node[%s] failed %d consecutive health checks, marking offline", n.Key, failures)
221+
go svc.setWorkerNodeOffline(n)
222+
}
214223
return
215224
}
216225

217-
// if both subscribe and ping succeed, ensure node is marked as online
226+
// Health check passed - reset failure counter and mark online
227+
svc.resetNodeFailureCount(n.Id)
218228
go svc.setWorkerNodeOnline(n)
219229

220230
// handle reconnection - reconcile disconnected tasks
@@ -329,13 +339,29 @@ func (svc *MasterService) monitorGrpcClientHealth() {
329339
}
330340
}
331341

342+
// incrementNodeFailureCount increments the failure counter for a node and returns the new count
343+
func (svc *MasterService) incrementNodeFailureCount(nodeId primitive.ObjectID) int {
344+
svc.nodeFailureMux.Lock()
345+
defer svc.nodeFailureMux.Unlock()
346+
svc.nodeFailureCount[nodeId]++
347+
return svc.nodeFailureCount[nodeId]
348+
}
349+
350+
// resetNodeFailureCount resets the failure counter for a node to zero
351+
func (svc *MasterService) resetNodeFailureCount(nodeId primitive.ObjectID) {
352+
svc.nodeFailureMux.Lock()
353+
defer svc.nodeFailureMux.Unlock()
354+
delete(svc.nodeFailureCount, nodeId)
355+
}
356+
332357
func newMasterService() *MasterService {
333358
cfgSvc := config.GetNodeConfigService()
334359
server := server.GetGrpcServer()
335360

336361
return &MasterService{
337362
cfgSvc: cfgSvc,
338-
monitorInterval: 15 * time.Second,
363+
monitorInterval: 20 * time.Second, // Increased from 15s to give more reconnection time
364+
nodeFailureCount: make(map[primitive.ObjectID]int),
339365
server: server,
340366
taskSchedulerSvc: scheduler.GetTaskSchedulerService(),
341367
taskHandlerSvc: handler.GetTaskHandlerService(),

0 commit comments

Comments (0)