Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/discov/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type EtcdConf struct {
CertKeyFile string `json:",optional=CertFile"`
CACertFile string `json:",optional=CertFile"`
InsecureSkipVerify bool `json:",optional"`
RegisterTimeout int `json:",default=60"`
}

// HasAccount returns if account provided.
Expand Down
5 changes: 5 additions & 0 deletions internal/health/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ func AddProbe(probe Probe) {
defaultHealthManager.addProbe(probe)
}

// IsReady return global comboHealthManager status.
func IsReady() bool {
return defaultHealthManager.IsReady()
}

// CreateHttpHandler create health http handler base on given probe.
func CreateHttpHandler(healthResponse string) http.HandlerFunc {
return func(w http.ResponseWriter, _ *http.Request) {
Expand Down
53 changes: 48 additions & 5 deletions zrpc/internal/rpcpubserver.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
package internal

import (
"context"
"errors"
"os"
"strings"
"time"

"github.com/zeromicro/go-zero/core/discov"
"github.com/zeromicro/go-zero/core/netx"
"github.com/zeromicro/go-zero/internal/health"
)

const (
allEths = "0.0.0.0"
envPodIp = "POD_IP"
)

var errNotReady = errors.New("service is not ready for a limited time")

// NewRpcPubServer returns a Server.
func NewRpcPubServer(etcd discov.EtcdConf, listenOn string,
opts ...ServerOption) (Server, error) {
Expand All @@ -33,8 +39,9 @@ func NewRpcPubServer(etcd discov.EtcdConf, listenOn string,
return pubClient.KeepAlive()
}
server := keepAliveServer{
registerEtcd: registerEtcd,
Server: NewRpcServer(listenOn, opts...),
registerEtcd: registerEtcd,
Server: NewRpcServer(listenOn, opts...),
registerTimeout: time.Duration(etcd.RegisterTimeout) * time.Second,
}

return server, nil
Expand All @@ -43,14 +50,50 @@ func NewRpcPubServer(etcd discov.EtcdConf, listenOn string,
type keepAliveServer struct {
registerEtcd func() error
Server
registerTimeout time.Duration
}

func (s keepAliveServer) Start(fn RegisterFn) error {
if err := s.registerEtcd(); err != nil {
return err
errCh := make(chan error)
stopCh := make(chan struct{})
defer close(stopCh)

go func() {
defer close(errCh)
select {
case errCh <- s.Server.Start(fn):
case <-stopCh:
// prevent goroutine leak
}
}()

// Wait for the service to start successfully, otherwise the registration service will fail.
ctx, cancel := context.WithTimeout(context.Background(), s.registerTimeout)
defer cancel()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

l:
for {
select {
case <-ticker.C:
if health.IsReady() {
err := s.registerEtcd()
if err != nil {
return err
}
// break for loop
break l
}
case <-ctx.Done():
return errNotReady
case err := <-errCh:
return err
}
}
ticker.Stop()

return s.Server.Start(fn)
return <-errCh
}

func figureOutListenOn(listenOn string) string {
Expand Down
5 changes: 4 additions & 1 deletion zrpc/internal/rpcpubserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"google.golang.org/grpc"

"github.com/zeromicro/go-zero/core/discov"
"github.com/zeromicro/go-zero/core/netx"
)
Expand All @@ -16,7 +18,8 @@ func TestNewRpcPubServer(t *testing.T) {
}, "")
assert.NoError(t, err)
assert.NotPanics(t, func() {
s.Start(nil)
s.Start(func(server *grpc.Server) {
})
})
}

Expand Down
5 changes: 3 additions & 2 deletions zrpc/internal/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import (
"fmt"
"net"

"github.com/zeromicro/go-zero/core/proc"
"github.com/zeromicro/go-zero/internal/health"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/zeromicro/go-zero/core/proc"
"github.com/zeromicro/go-zero/internal/health"
)

const probeNamePrefix = "zrpc"
Expand Down
3 changes: 2 additions & 1 deletion zrpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package zrpc
import (
"time"

"google.golang.org/grpc"

"github.com/zeromicro/go-zero/core/load"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stat"
"github.com/zeromicro/go-zero/core/stores/redis"
"github.com/zeromicro/go-zero/zrpc/internal"
"github.com/zeromicro/go-zero/zrpc/internal/auth"
"github.com/zeromicro/go-zero/zrpc/internal/serverinterceptors"
"google.golang.org/grpc"
)

// A RpcServer is a rpc server.
Expand Down
Loading