From d4569b906e861049286f129ab464164c09b84e09 Mon Sep 17 00:00:00 2001 From: wangwq10 Date: Wed, 26 Mar 2025 17:13:21 +0800 Subject: [PATCH 1/2] fix: skip Alive =false backend before add job. --- pkg/ccr/base/backend.go | 1 + pkg/ccr/meta.go | 10 +++++++++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/pkg/ccr/base/backend.go b/pkg/ccr/base/backend.go index fcfef559..7a4c2ce3 100644 --- a/pkg/ccr/base/backend.go +++ b/pkg/ccr/base/backend.go @@ -24,6 +24,7 @@ type Backend struct { BePort uint16 HttpPort uint16 BrpcPort uint16 + Alive bool } // Backend Stringer diff --git a/pkg/ccr/meta.go b/pkg/ccr/meta.go index 07ee6f7b..1bce86bd 100644 --- a/pkg/ccr/meta.go +++ b/pkg/ccr/meta.go @@ -532,7 +532,11 @@ func (m *Meta) UpdateBackends() error { return xerror.Wrapf(err, xerror.Normal, query) } backend.BrpcPort = uint16(port) - + alive, err := rowParser.GetBool("Alive") + if err != nil { + return xerror.Wrapf(err, xerror.Normal, query) + } + backend.Alive = alive log.Debugf("backend: %v", &backend) backends = append(backends, &backend) } @@ -1212,6 +1216,10 @@ func (m *Meta) checkBEsBinlogFeature() error { var disabledBinlogBEs []string for _, backend := range backends { + if !backend.Alive { + log.Warnf("backend %v:%v is not alive, skip check", backend.Host, backend.HttpPort) + continue + } url := fmt.Sprintf("http://%v:%v/api/show_config?conf_item=enable_feature_binlog", backend.Host, backend.HttpPort) resp, err := http.Get(url) From aa160e55a049ecf32f85981e4b49794d37bc72b2 Mon Sep 17 00:00:00 2001 From: wangwq10 <125749333@qq.com> Date: Fri, 23 May 2025 15:52:53 +0800 Subject: [PATCH 2/2] Update job_manager.go fix: after pause update successfully, status/state should align to job.State and return "paused" as excepted. --- pkg/ccr/job_manager.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/ccr/job_manager.go b/pkg/ccr/job_manager.go index 174e4ee4..31abd4f2 100644 --- a/pkg/ccr/job_manager.go +++ b/pkg/ccr/job_manager.go @@ -217,6 +217,8 @@ func (jm *JobManager) GetJobStatus(jobName string) (*JobStatus, error) { defer jm.lock.RUnlock() if job, ok := jm.jobs[jobName]; ok { + log.Debugf("job rawStatue: %s, job state:%s", job.rawStatus.state, job.State) + atomic.StoreInt32(&job.rawStatus.state, int32(job.State)) return job.Status(), nil } else { return nil, xerror.Errorf(xerror.Normal, "job not exist: %s", jobName)