From 495f882cd092ce51f3e720f2a213358d615c91e6 Mon Sep 17 00:00:00 2001 From: chenquan Date: Sun, 3 Aug 2025 16:17:38 +0800 Subject: [PATCH 1/3] feat: readiness check --- internal/health/health.go | 5 ++++ zrpc/internal/rpcpubserver.go | 46 ++++++++++++++++++++++++++++-- zrpc/internal/rpcpubserver_test.go | 5 +++- zrpc/server.go | 3 +- 4 files changed, 54 insertions(+), 5 deletions(-) diff --git a/internal/health/health.go b/internal/health/health.go index afa299d63584..9942fc4a5ff2 100644 --- a/internal/health/health.go +++ b/internal/health/health.go @@ -43,6 +43,11 @@ func AddProbe(probe Probe) { defaultHealthManager.addProbe(probe) } +// IsReady return global comboHealthManager status. +func IsReady() bool { + return defaultHealthManager.IsReady() +} + // CreateHttpHandler create health http handler base on given probe. func CreateHttpHandler(healthResponse string) http.HandlerFunc { return func(w http.ResponseWriter, _ *http.Request) { diff --git a/zrpc/internal/rpcpubserver.go b/zrpc/internal/rpcpubserver.go index 70b481323d92..d2b4b0b1422d 100644 --- a/zrpc/internal/rpcpubserver.go +++ b/zrpc/internal/rpcpubserver.go @@ -1,11 +1,15 @@ package internal import ( + "context" + "errors" "os" "strings" + "time" "github.com/zeromicro/go-zero/core/discov" "github.com/zeromicro/go-zero/core/netx" + "github.com/zeromicro/go-zero/internal/health" ) const ( @@ -13,6 +17,8 @@ const ( envPodIp = "POD_IP" ) +var errNotReady = errors.New("service is not ready for a limited time") + // NewRpcPubServer returns a Server. func NewRpcPubServer(etcd discov.EtcdConf, listenOn string, opts ...ServerOption) (Server, error) { @@ -46,11 +52,45 @@ type keepAliveServer struct { } func (s keepAliveServer) Start(fn RegisterFn) error { - if err := s.registerEtcd(); err != nil { - return err + errCh := make(chan error) + stopCh := make(chan struct{}) + defer close(stopCh) + + go func() { + defer close(errCh) + select { + case errCh <- s.Server.Start(fn): + case <-stopCh: + // prevent goroutine leak + } + }() + + // Wait for the service to start successfully, otherwise the registration service will fail. + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + ticker := time.NewTicker(time.Second) + +l: + for { + select { + case <-ticker.C: + if health.IsReady() { + err := s.registerEtcd() + if err != nil { + return err + } + // break for loop + break l + } + case <-ctx.Done(): + return errNotReady + case err := <-errCh: + return err + } } + ticker.Stop() - return s.Server.Start(fn) + return <-errCh } func figureOutListenOn(listenOn string) string { diff --git a/zrpc/internal/rpcpubserver_test.go b/zrpc/internal/rpcpubserver_test.go index cc36e4653357..1bda8f252d64 100644 --- a/zrpc/internal/rpcpubserver_test.go +++ b/zrpc/internal/rpcpubserver_test.go @@ -4,6 +4,8 @@ import ( "testing" "github.com/stretchr/testify/assert" + "google.golang.org/grpc" + "github.com/zeromicro/go-zero/core/discov" "github.com/zeromicro/go-zero/core/netx" ) @@ -16,7 +18,8 @@ func TestNewRpcPubServer(t *testing.T) { }, "") assert.NoError(t, err) assert.NotPanics(t, func() { - s.Start(nil) + s.Start(func(server *grpc.Server) { + }) }) } diff --git a/zrpc/server.go b/zrpc/server.go index 813fc358d298..61a3f32c7495 100644 --- a/zrpc/server.go +++ b/zrpc/server.go @@ -3,6 +3,8 @@ package zrpc import ( "time" + "google.golang.org/grpc" + "github.com/zeromicro/go-zero/core/load" "github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/stat" @@ -10,7 +12,6 @@ import ( "github.com/zeromicro/go-zero/zrpc/internal" "github.com/zeromicro/go-zero/zrpc/internal/auth" "github.com/zeromicro/go-zero/zrpc/internal/serverinterceptors" - "google.golang.org/grpc" ) // A RpcServer is a rpc server. From afcb4f3a524cc7af5db29fccb83f3ef002976f11 Mon Sep 17 00:00:00 2001 From: chenquan Date: Sat, 9 Aug 2025 00:33:34 +0800 Subject: [PATCH 2/3] x --- core/discov/config.go | 1 + zrpc/internal/rpcpubserver.go | 8 +++++--- zrpc/internal/rpcserver.go | 5 +++-- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/core/discov/config.go b/core/discov/config.go index f6361e814c1e..f3dd58123ecc 100644 --- a/core/discov/config.go +++ b/core/discov/config.go @@ -20,6 +20,7 @@ type EtcdConf struct { CertKeyFile string `json:",optional=CertFile"` CACertFile string `json:",optional=CertFile"` InsecureSkipVerify bool `json:",optional"` + RegisterTimeout int `json:",default=60"` } // HasAccount returns if account provided. diff --git a/zrpc/internal/rpcpubserver.go b/zrpc/internal/rpcpubserver.go index d2b4b0b1422d..62d216a09cad 100644 --- a/zrpc/internal/rpcpubserver.go +++ b/zrpc/internal/rpcpubserver.go @@ -39,8 +39,9 @@ func NewRpcPubServer(etcd discov.EtcdConf, listenOn string, return pubClient.KeepAlive() } server := keepAliveServer{ - registerEtcd: registerEtcd, - Server: NewRpcServer(listenOn, opts...), + registerEtcd: registerEtcd, + Server: NewRpcServer(listenOn, opts...), + registerTimeout: time.Duration(etcd.RegisterTimeout) * time.Second, } return server, nil @@ -49,6 +50,7 @@ func NewRpcPubServer(etcd discov.EtcdConf, listenOn string, type keepAliveServer struct { registerEtcd func() error Server + registerTimeout time.Duration } func (s keepAliveServer) Start(fn RegisterFn) error { @@ -66,7 +68,7 @@ func (s keepAliveServer) Start(fn RegisterFn) error { }() // Wait for the service to start successfully, otherwise the registration service will fail. - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + ctx, cancel := context.WithTimeout(context.Background(), s.registerTimeout) defer cancel() ticker := time.NewTicker(time.Second) diff --git a/zrpc/internal/rpcserver.go b/zrpc/internal/rpcserver.go index c1302f03ae52..926994948da5 100644 --- a/zrpc/internal/rpcserver.go +++ b/zrpc/internal/rpcserver.go @@ -4,10 +4,11 @@ import ( "fmt" "net" - "github.com/zeromicro/go-zero/core/proc" - "github.com/zeromicro/go-zero/internal/health" "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" + + "github.com/zeromicro/go-zero/core/proc" + "github.com/zeromicro/go-zero/internal/health" ) const probeNamePrefix = "zrpc" From 5ddb6295dd9907fd860a9a39620a915fc3398ccb Mon Sep 17 00:00:00 2001 From: chenquan Date: Sat, 9 Aug 2025 00:34:49 +0800 Subject: [PATCH 3/3] x --- zrpc/internal/rpcpubserver.go | 1 + 1 file changed, 1 insertion(+) diff --git a/zrpc/internal/rpcpubserver.go b/zrpc/internal/rpcpubserver.go index 62d216a09cad..e45aa352f8d0 100644 --- a/zrpc/internal/rpcpubserver.go +++ b/zrpc/internal/rpcpubserver.go @@ -71,6 +71,7 @@ func (s keepAliveServer) Start(fn RegisterFn) error { ctx, cancel := context.WithTimeout(context.Background(), s.registerTimeout) defer cancel() ticker := time.NewTicker(time.Second) + defer ticker.Stop() l: for {