Skip to content

Commit 7f3ffe2

Browse files
authored
[ISSUE #989] consumer and producer client add unit support (WithUnitName)
[ISSUE #989] consumer and producer client add unit support (WithUnitName)
1 parent 7ae788d commit 7f3ffe2

File tree

6 files changed

+213
-9
lines changed

6 files changed

+213
-9
lines changed

consumer/option.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@ limitations under the License.
1818
package consumer
1919

2020
import (
21-
"github.com/apache/rocketmq-client-go/v2/hooks"
21+
"strings"
2222
"time"
2323

24+
"github.com/apache/rocketmq-client-go/v2/hooks"
2425
"github.com/apache/rocketmq-client-go/v2/internal"
2526
"github.com/apache/rocketmq-client-go/v2/primitive"
2627
)
@@ -327,7 +328,21 @@ func WithNameServer(nameServers primitive.NamesrvAddr) Option {
327328
// WithNameServerDomain set NameServer domain
328329
func WithNameServerDomain(nameServerUrl string) Option {
329330
return func(opts *consumerOptions) {
330-
opts.Resolver = primitive.NewHttpResolver("DEFAULT", nameServerUrl)
331+
h := primitive.NewHttpResolver("DEFAULT", nameServerUrl)
332+
if opts.UnitName != "" {
333+
h.DomainWithUnit(opts.UnitName)
334+
}
335+
opts.Resolver = h
336+
}
337+
}
338+
339+
// WithUnitName set the name of specified unit
340+
func WithUnitName(unitName string) Option {
341+
return func(opts *consumerOptions) {
342+
opts.UnitName = strings.TrimSpace(unitName)
343+
if ns, ok := opts.Resolver.(*primitive.HttpResolver); ok {
344+
ns.DomainWithUnit(opts.UnitName)
345+
}
331346
}
332347
}
333348

consumer/option_test.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package consumer
2+
3+
import (
4+
"fmt"
5+
"reflect"
6+
"strings"
7+
"testing"
8+
)
9+
10+
func getFieldString(obj interface{}, field string) string {
11+
v := reflect.Indirect(reflect.ValueOf(obj))
12+
return v.FieldByNameFunc(func(n string) bool {
13+
return n == field
14+
}).String()
15+
}
16+
17+
func TestWithUnitName(t *testing.T) {
18+
opt := defaultPullConsumerOptions()
19+
unitName := "unsh"
20+
WithUnitName(unitName)(&opt)
21+
if opt.UnitName != unitName {
22+
t.Errorf("consumer option WithUnitName. want:%s, got=%s", unitName, opt.UnitName)
23+
}
24+
}
25+
26+
func TestWithNameServerDomain(t *testing.T) {
27+
opt := defaultPullConsumerOptions()
28+
nameServerAddr := "http://127.0.0.1:8080/nameserver/addr"
29+
WithNameServerDomain(nameServerAddr)(&opt)
30+
domainStr := getFieldString(opt.Resolver, "domain")
31+
if domainStr != nameServerAddr {
32+
t.Errorf("consumer option WithUnitName. want:%s, got=%s", nameServerAddr, domainStr)
33+
}
34+
}
35+
36+
func TestWithNameServerDomainAndUnitName(t *testing.T) {
37+
nameServerAddr := "http://127.0.0.1:8080/nameserver/addr"
38+
unitName := "unsh"
39+
suffix := fmt.Sprintf("-%s?nofix=1", unitName)
40+
41+
// test with two different orders
42+
t.Run("WithNameServerDomain & WithUnitName", func(t *testing.T) {
43+
opt := defaultPullConsumerOptions()
44+
WithNameServerDomain(nameServerAddr)(&opt)
45+
WithUnitName(unitName)(&opt)
46+
47+
domainStr := getFieldString(opt.Resolver, "domain")
48+
if !strings.Contains(domainStr, nameServerAddr) || !strings.Contains(domainStr, suffix) {
49+
t.Errorf("consumer option should contains %s and %s", nameServerAddr, suffix)
50+
}
51+
})
52+
53+
t.Run("WithUnitName & WithNameServerDomain", func(t *testing.T) {
54+
opt := defaultPullConsumerOptions()
55+
WithNameServerDomain(nameServerAddr)(&opt)
56+
WithUnitName(unitName)(&opt)
57+
58+
domainStr := getFieldString(opt.Resolver, "domain")
59+
if !strings.Contains(domainStr, nameServerAddr) || !strings.Contains(domainStr, suffix) {
60+
t.Errorf("consumer option should contains %s and %s", nameServerAddr, suffix)
61+
}
62+
})
63+
}

primitive/nsresolver.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ The ASF licenses this file to You under the Apache License, Version 2.0
66
(the "License"); you may not use this file except in compliance with
77
the License. You may obtain a copy of the License at
88
9-
http://www.apache.org/licenses/LICENSE-2.0
9+
http://www.apache.org/licenses/LICENSE-2.0
1010
1111
Unless required by applicable law or agreed to in writing, software
1212
distributed under the License is distributed on an "AS IS" BASIS,
@@ -111,6 +111,16 @@ func NewHttpResolver(instance string, domain ...string) *HttpResolver {
111111
return h
112112
}
113113

114+
func (h *HttpResolver) DomainWithUnit(unitName string) {
115+
if unitName == "" {
116+
return
117+
}
118+
if strings.Contains(h.domain, "?nofix=1") {
119+
return
120+
}
121+
h.domain = fmt.Sprintf("%s-%s?nofix=1", h.domain, unitName)
122+
}
123+
114124
func (h *HttpResolver) Resolve() []string {
115125
addrs := h.get()
116126
if len(addrs) > 0 {
@@ -152,14 +162,14 @@ func (h *HttpResolver) get() []string {
152162
return nil
153163
}
154164

155-
bodyStr := string(body)
165+
bodyStr := strings.TrimSpace(string(body))
156166
if bodyStr == "" {
157167
return nil
158168
}
159169

160-
h.saveSnapshot(body)
170+
_ = h.saveSnapshot([]byte(bodyStr))
161171

162-
return strings.Split(string(body), ";")
172+
return strings.Split(bodyStr, ";")
163173
}
164174

165175
func (h *HttpResolver) saveSnapshot(body []byte) error {

primitive/nsresolver_test.go

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ The ASF licenses this file to You under the Apache License, Version 2.0
66
(the "License"); you may not use this file except in compliance with
77
the License. You may obtain a copy of the License at
88
9-
http://www.apache.org/licenses/LICENSE-2.0
9+
http://www.apache.org/licenses/LICENSE-2.0
1010
1111
Unless required by applicable law or agreed to in writing, software
1212
distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,14 +18,15 @@ package primitive
1818

1919
import (
2020
"fmt"
21-
"github.com/apache/rocketmq-client-go/v2/rlog"
2221
"io/ioutil"
2322
"net"
2423
"net/http"
2524
"os"
2625
"strings"
2726
"testing"
2827

28+
"github.com/apache/rocketmq-client-go/v2/rlog"
29+
2930
. "github.com/smartystreets/goconvey/convey"
3031
)
3132

@@ -81,6 +82,43 @@ func TestHttpResolverWithGet(t *testing.T) {
8182
})
8283
}
8384

85+
func TestHttpResolverWithGetUnitName(t *testing.T) {
86+
Convey("Test UpdateNameServerAddress Save Local Snapshot", t, func() {
87+
srvs := []string{
88+
"192.168.100.1",
89+
"192.168.100.2",
90+
"192.168.100.3",
91+
"192.168.100.4",
92+
"192.168.100.5",
93+
}
94+
http.HandleFunc("/nameserver/addrs3-unsh", func(w http.ResponseWriter, r *http.Request) {
95+
if r.URL.Query().Get("nofix") == "1" {
96+
fmt.Fprintf(w, strings.Join(srvs, ";"))
97+
}
98+
fmt.Fprintf(w, "")
99+
})
100+
server := &http.Server{Addr: ":0", Handler: nil}
101+
listener, _ := net.Listen("tcp", ":0")
102+
go server.Serve(listener)
103+
104+
port := listener.Addr().(*net.TCPAddr).Port
105+
nameServerDommain := fmt.Sprintf("http://127.0.0.1:%d/nameserver/addrs3", port)
106+
rlog.Info("Temporary Nameserver", map[string]interface{}{
107+
"domain": nameServerDommain,
108+
})
109+
110+
resolver := NewHttpResolver("DEFAULT", nameServerDommain)
111+
resolver.DomainWithUnit("unsh")
112+
resolver.Resolve()
113+
114+
// check snapshot saved
115+
filePath := resolver.getSnapshotFilePath("DEFAULT")
116+
body := strings.Join(srvs, ";")
117+
bs, _ := ioutil.ReadFile(filePath)
118+
So(string(bs), ShouldEqual, body)
119+
})
120+
}
121+
84122
func TestHttpResolverWithSnapshotFile(t *testing.T) {
85123
Convey("Test UpdateNameServerAddress Use Local Snapshot", t, func() {
86124
srvs := []string{

producer/option.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ limitations under the License.
1818
package producer
1919

2020
import (
21+
"strings"
2122
"time"
2223

2324
"github.com/apache/rocketmq-client-go/v2/internal"
@@ -144,7 +145,21 @@ func WithNameServer(nameServers primitive.NamesrvAddr) Option {
144145
// WithNameServerDomain set NameServer domain
145146
func WithNameServerDomain(nameServerUrl string) Option {
146147
return func(opts *producerOptions) {
147-
opts.Resolver = primitive.NewHttpResolver("DEFAULT", nameServerUrl)
148+
h := primitive.NewHttpResolver("DEFAULT", nameServerUrl)
149+
if opts.UnitName != "" {
150+
h.DomainWithUnit(opts.UnitName)
151+
}
152+
opts.Resolver = h
153+
}
154+
}
155+
156+
// WithUnitName set the name of specified unit
157+
func WithUnitName(unitName string) Option {
158+
return func(opts *producerOptions) {
159+
opts.UnitName = strings.TrimSpace(unitName)
160+
if ns, ok := opts.Resolver.(*primitive.HttpResolver); ok {
161+
ns.DomainWithUnit(opts.UnitName)
162+
}
148163
}
149164
}
150165

producer/option_test.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package producer
2+
3+
import (
4+
"fmt"
5+
"reflect"
6+
"strings"
7+
"testing"
8+
)
9+
10+
func getFieldString(obj interface{}, field string) string {
11+
v := reflect.Indirect(reflect.ValueOf(obj))
12+
return v.FieldByNameFunc(func(n string) bool {
13+
return n == field
14+
}).String()
15+
}
16+
17+
func TestWithUnitName(t *testing.T) {
18+
opt := defaultProducerOptions()
19+
unitName := "unsh"
20+
WithUnitName(unitName)(&opt)
21+
if opt.UnitName != unitName {
22+
t.Errorf("consumer option WithUnitName. want:%s, got=%s", unitName, opt.UnitName)
23+
}
24+
}
25+
26+
func TestWithNameServerDomain(t *testing.T) {
27+
opt := defaultProducerOptions()
28+
nameServerAddr := "http://127.0.0.1:8080/nameserver/addr"
29+
WithNameServerDomain(nameServerAddr)(&opt)
30+
domainStr := getFieldString(opt.Resolver, "domain")
31+
if domainStr != nameServerAddr {
32+
t.Errorf("consumer option WithUnitName. want:%s, got=%s", nameServerAddr, domainStr)
33+
}
34+
}
35+
36+
func TestWithNameServerDomainAndUnitName(t *testing.T) {
37+
nameServerAddr := "http://127.0.0.1:8080/nameserver/addr"
38+
unitName := "unsh"
39+
suffix := fmt.Sprintf("-%s?nofix=1", unitName)
40+
41+
// test with two different orders
42+
t.Run("WithNameServerDomain & WithUnitName", func(t *testing.T) {
43+
opt := defaultProducerOptions()
44+
WithNameServerDomain(nameServerAddr)(&opt)
45+
WithUnitName(unitName)(&opt)
46+
47+
domainStr := getFieldString(opt.Resolver, "domain")
48+
if !strings.Contains(domainStr, nameServerAddr) || !strings.Contains(domainStr, suffix) {
49+
t.Errorf("consumer option should contains %s and %s", nameServerAddr, suffix)
50+
}
51+
})
52+
53+
t.Run("WithUnitName & WithNameServerDomain", func(t *testing.T) {
54+
opt := defaultProducerOptions()
55+
WithNameServerDomain(nameServerAddr)(&opt)
56+
WithUnitName(unitName)(&opt)
57+
58+
domainStr := getFieldString(opt.Resolver, "domain")
59+
if !strings.Contains(domainStr, nameServerAddr) || !strings.Contains(domainStr, suffix) {
60+
t.Errorf("consumer option should contains %s and %s", nameServerAddr, suffix)
61+
}
62+
})
63+
}

0 commit comments

Comments
 (0)