1
1
package balancer
2
2
3
3
import (
4
- "context"
5
4
"fmt"
6
- "net/url"
7
5
"os"
8
- "path/filepath"
9
6
"testing"
10
7
"time"
11
8
12
9
"github.com/nats-io/jsm.go"
13
10
"github.com/nats-io/jsm.go/api"
14
- "github.com/nats-io/nats-server/v2/server"
11
+ testapi "github.com/nats-io/jsm.go/test/testing_client/api"
12
+ "github.com/nats-io/jsm.go/test/testing_client/srvtest"
15
13
"github.com/nats-io/nats.go"
16
14
)
17
15
18
16
func TestBalancer (t * testing.T ) {
19
- withJSCluster (t , func (t * testing.T , servers [] * server. Server , nc * nats. Conn , mgr * jsm. Manager ) error {
17
+ withTesterJetStreamCluster (t , func (t * testing.T , mgr * jsm. Manager , _ * srvtest. Client , servers [] * testapi. ManagedServer ) {
20
18
var err error
19
+
20
+ nc := mgr .NatsConn ()
21
+ firstServer := servers [0 ].Name
22
+
21
23
waitTime := 100 * time .Millisecond
22
24
streams := []* jsm.Stream {}
23
25
for i := 1 ; i <= 3 ; i ++ {
@@ -28,8 +30,8 @@ func TestBalancer(t *testing.T) {
28
30
t .Fatalf ("could not create stream %s" , err )
29
31
}
30
32
info , _ := s .ClusterInfo ()
31
- if info .Leader != "s1" {
32
- placement := api.Placement {Preferred : "s1" }
33
+ if info .Leader != firstServer {
34
+ placement := api.Placement {Preferred : firstServer }
33
35
err = s .LeaderStepDown (& placement )
34
36
if err != nil {
35
37
t .Fatalf ("could not move stream %s" , err )
@@ -59,29 +61,29 @@ func TestBalancer(t *testing.T) {
59
61
60
62
b , err := New (nc , api .NewDefaultLogger (api .DebugLevel ))
61
63
if err != nil {
62
- return err
64
+ t . Fatalf ( "create failed: %v" , err . Error ())
63
65
}
64
66
65
67
count , err := b .BalanceStreams (streams )
66
68
if err != nil {
67
- return err
69
+ t . Fatalf ( "Balance failed: %v" , err . Error ())
68
70
}
69
71
70
72
if count == 0 {
71
- return err
73
+ t . Fatal ( "Balanceed 0 streams" )
72
74
}
73
75
74
76
consumers := []* jsm.Consumer {}
75
77
for i := 1 ; i <= 3 ; i ++ {
76
78
consumerName := fmt .Sprintf ("testc%d" , i )
77
79
c , err := mgr .NewConsumer ("tests1" , jsm .DurableName (consumerName ), jsm .ConsumerOverrideReplicas (3 ))
78
80
if err != nil {
79
- return err
81
+ t . Fatalf ( "could not create consumer %s" , err )
80
82
}
81
83
82
84
info , _ := c .ClusterInfo ()
83
- if info .Leader != "s1" {
84
- placement := api.Placement {Preferred : "s1" }
85
+ if info .Leader != firstServer {
86
+ placement := api.Placement {Preferred : firstServer }
85
87
err = c .LeaderStepDown (& placement )
86
88
if err != nil {
87
89
t .Fatalf ("could not move consumer %s" , err )
@@ -111,102 +113,34 @@ func TestBalancer(t *testing.T) {
111
113
112
114
count , err = b .BalanceConsumers (consumers )
113
115
if err != nil {
114
- return err
116
+ t . Fatalf ( "Balance failed: %v" , err )
115
117
}
116
118
117
119
if count == 0 {
118
- return err
120
+ t . Fatal ( "Balanced 0 consumers" )
119
121
}
120
-
121
- return nil
122
122
})
123
123
}
124
124
125
- func withJSCluster (t * testing.T , cb func (* testing.T , [] * server. Server , * nats. Conn , * jsm. Manager ) error ) {
125
+ func withTesterJetStreamCluster (t * testing.T , fn func (* testing.T , * jsm. Manager , * srvtest. Client , [] * testapi. ManagedServer ) ) {
126
126
t .Helper ()
127
127
128
- d , err := os .MkdirTemp ( "" , "jstest " )
129
- if err != nil {
130
- t . Fatalf ( "temp dir could not be made: %s" , err )
128
+ url := os .Getenv ( "TESTER_URL " )
129
+ if url == "" {
130
+ url = "nats://localhost:4222"
131
131
}
132
- defer os .RemoveAll (d )
133
-
134
- var (
135
- servers []* server.Server
136
- )
137
-
138
- for i := 1 ; i <= 3 ; i ++ {
139
- opts := & server.Options {
140
- JetStream : true ,
141
- StoreDir : filepath .Join (d , fmt .Sprintf ("s%d" , i )),
142
- Port : - 1 ,
143
- Host : "localhost" ,
144
- ServerName : fmt .Sprintf ("s%d" , i ),
145
- LogFile : "/dev/null" ,
146
- Cluster : server.ClusterOpts {
147
- Name : "TEST" ,
148
- Port : 12000 + i ,
149
- },
150
- Routes : []* url.URL {
151
- {Host : "localhost:12001" },
152
- {Host : "localhost:12002" },
153
- {Host : "localhost:12003" },
154
- },
155
- }
156
132
157
- s , err := server .NewServer (opts )
133
+ client := srvtest .New (t , url )
134
+ t .Cleanup (func () {
135
+ client .Reset (t )
136
+ })
137
+
138
+ client .WithJetStreamCluster (t , 3 , func (t * testing.T , nc * nats.Conn , servers []* testapi.ManagedServer ) {
139
+ mgr , err := jsm .New (nc )
158
140
if err != nil {
159
- t .Fatalf ("server %d start failed: %v" , i , err )
160
- }
161
- s .ConfigureLogger ()
162
- go s .Start ()
163
- if ! s .ReadyForConnections (10 * time .Second ) {
164
- t .Errorf ("nats server %d did not start" , i )
141
+ t .Fatal (err )
165
142
}
166
- defer func () {
167
- s .Shutdown ()
168
- }()
169
-
170
- servers = append (servers , s )
171
- }
172
-
173
- if len (servers ) != 3 {
174
- t .Fatalf ("servers did not start" )
175
- }
176
-
177
- nc , err := nats .Connect (servers [0 ].ClientURL (), nats .UseOldRequestStyle ())
178
- if err != nil {
179
- t .Fatalf ("client start failed: %s" , err )
180
- }
181
- defer nc .Close ()
182
143
183
- mgr , err := jsm .New (nc , jsm .WithTimeout (5 * time .Second ))
184
- if err != nil {
185
- t .Fatalf ("manager creation failed: %s" , err )
186
- }
187
-
188
- ctx , cancel := context .WithTimeout (context .Background (), 30 * time .Second )
189
- defer cancel ()
190
- ticker := time .NewTicker (250 * time .Millisecond )
191
- defer ticker .Stop ()
192
-
193
- for {
194
- select {
195
- case <- ticker .C :
196
- _ , err := mgr .JetStreamAccountInfo ()
197
- if err != nil {
198
- continue
199
- }
200
-
201
- err = cb (t , servers , nc , mgr )
202
-
203
- if err != nil {
204
- t .Fatal (err )
205
- }
206
-
207
- return
208
- case <- ctx .Done ():
209
- t .Fatalf ("jetstream did not become available" )
210
- }
211
- }
144
+ fn (t , mgr , client , servers )
145
+ })
212
146
}
0 commit comments