@@ -30,6 +30,8 @@ const (
3030)
3131
3232var totalMessages uint64
33+ var totalSubscribedChannels int64
34+ var totalConnects uint64
3335var clusterSlicesMu sync.Mutex
3436
3537type testResult struct {
@@ -48,41 +50,58 @@ type testResult struct {
4850 Addresses []string `json:"Addresses"`
4951}
5052
51- func subscriberRoutine (mode string , channels []string , printMessages bool , connectionReconnectInterval int , ctx context.Context , wg * sync.WaitGroup , client * redis.Client ) {
53+ func subscriberRoutine (clientName , mode string , channels []string , printMessages bool , connectionReconnectInterval int , ctx context.Context , wg * sync.WaitGroup , client * redis.Client ) {
5254 // Tell the caller we've stopped
5355 defer wg .Done ()
5456 var reconnectTicker * time.Ticker
5557 if connectionReconnectInterval > 0 {
56- reconnectTicker = time .NewTicker (time .Duration (connectionReconnectInterval ) * time .Second )
58+ reconnectTicker = time .NewTicker (time .Duration (connectionReconnectInterval ) * time .Millisecond )
5759 defer reconnectTicker .Stop ()
5860 } else {
5961 reconnectTicker = time .NewTicker (1 * time .Second )
6062 reconnectTicker .Stop ()
6163 }
6264
6365 var pubsub * redis.PubSub
66+ nChannels := len (channels )
6467
6568 // Helper function to handle subscription based on mode
6669 subscribe := func () {
6770 if pubsub != nil {
68- // Unsubscribe based on mode before re-subscribing
69- if mode == "ssubscribe" {
70- if err := pubsub .SUnsubscribe (ctx , channels ... ); err != nil {
71- fmt .Printf ("Error during SUnsubscribe: %v\n " , err )
71+ if nChannels > 1 {
72+ // Unsubscribe based on mode before re-subscribing
73+ if mode == "ssubscribe" {
74+ if err := pubsub .SUnsubscribe (ctx , channels [1 :]... ); err != nil {
75+ fmt .Printf ("Error during SUnsubscribe: %v\n " , err )
76+ }
77+ pubsub .Close ()
78+ atomic .AddInt64 (& totalSubscribedChannels , int64 (- len (channels [1 :])))
79+ pubsub = client .SSubscribe (ctx , channels [1 :]... )
80+ atomic .AddInt64 (& totalSubscribedChannels , int64 (len (channels [1 :])))
81+ } else {
82+ if err := pubsub .Unsubscribe (ctx , channels [1 :]... ); err != nil {
83+ fmt .Printf ("Error during Unsubscribe: %v\n " , err )
84+ pubsub .Close ()
85+ atomic .AddInt64 (& totalSubscribedChannels , int64 (- len (channels [1 :])))
86+ pubsub = client .Subscribe (ctx , channels [1 :]... )
87+ atomic .AddInt64 (& totalSubscribedChannels , int64 (len (channels [1 :])))
88+ }
7289 }
90+ atomic .AddUint64 (& totalConnects , 1 )
7391 } else {
74- if err := pubsub .Unsubscribe (ctx , channels ... ); err != nil {
75- fmt .Printf ("Error during Unsubscribe: %v\n " , err )
76- }
92+ log .Println (fmt .Sprintf ("Skipping (S)UNSUBSCRIBE given client %s had only one channel subscribed in this connection: %v." , clientName , channels ))
7793 }
78- pubsub .Close ()
79- }
80- switch mode {
81- case "ssubscribe" :
82- pubsub = client .SSubscribe (ctx , channels ... )
83- default :
84- pubsub = client .Subscribe (ctx , channels ... )
94+ } else {
95+ switch mode {
96+ case "ssubscribe" :
97+ pubsub = client .SSubscribe (ctx , channels ... )
98+ default :
99+ pubsub = client .Subscribe (ctx , channels ... )
100+ }
101+ atomic .AddInt64 (& totalSubscribedChannels , int64 (len (channels )))
102+ atomic .AddUint64 (& totalConnects , 1 )
85103 }
104+
86105 }
87106
88107 subscribe ()
@@ -142,6 +161,7 @@ func main() {
142161 json_out_file := flag .String ("json-out-file" , "" , "Name of json output file, if not set, will not print to json." )
143162 client_update_tick := flag .Int ("client-update-tick" , 1 , "client update tick." )
144163 test_time := flag .Int ("test-time" , 0 , "Number of seconds to run the test, after receiving the first message." )
164+ randSeed := flag .Int64 ("rand-seed" , 12345 , "Random deterministic seed." )
145165 subscribe_prefix := flag .String ("subscriber-prefix" , "channel-" , "prefix for subscribing to channel, used in conjunction with key-minimum and key-maximum." )
146166 client_output_buffer_limit_pubsub := flag .String ("client-output-buffer-limit-pubsub" , "" , "Specify client output buffer limits for clients subscribed to at least one pubsub channel or pattern. If the value specified is different that the one present on the DB, this setting will apply." )
147167 distributeSubscribers := flag .Bool ("oss-cluster-api-distribute-subscribers" , false , "read cluster slots and distribute subscribers among them." )
@@ -167,6 +187,8 @@ func main() {
167187 log .Fatal (fmt .Errorf ("--messages and --test-time are mutially exclusive ( please specify one or the other )" ))
168188 }
169189 log .Println (fmt .Sprintf ("pubsub-sub-bench (git_sha1:%s%s)" , git_sha , git_dirty_str ))
190+ log .Println (fmt .Sprintf ("using random seed:%d" , * randSeed ))
191+ rand .Seed (* randSeed )
170192
171193 ctx := context .Background ()
172194 nodeCount := 0
@@ -292,7 +314,7 @@ func main() {
292314 if * max_channels_per_subscriber == * min_channels_per_subscriber {
293315 n_channels_this_conn = * max_channels_per_subscriber
294316 } else {
295- n_channels_this_conn = rand .Intn (* max_channels_per_subscriber - * min_channels_per_subscriber )
317+ n_channels_this_conn = rand .Intn (* max_channels_per_subscriber - * min_channels_per_subscriber ) + * min_channels_per_subscriber
296318 }
297319 for channel_this_conn := 1 ; channel_this_conn < n_channels_this_conn ; channel_this_conn ++ {
298320 new_channel_id := rand .Intn (* channel_maximum ) + * channel_minimum
@@ -330,12 +352,13 @@ func main() {
330352 if * max_reconnect_interval == * min_reconnect_interval {
331353 connectionReconnectInterval = * max_reconnect_interval
332354 } else {
333- connectionReconnectInterval = rand .Intn (* max_reconnect_interval - * min_reconnect_interval ) + * max_reconnect_interval
355+ connectionReconnectInterval = rand .Intn (* max_reconnect_interval - * min_reconnect_interval ) + * min_reconnect_interval
334356 }
335357 if connectionReconnectInterval > 0 {
336- log .Println (fmt .Sprintf ("Using reconnection interval of %d for subscriber: %s" , connectionReconnectInterval , subscriberName ))
358+ log .Println (fmt .Sprintf ("Using reconnection interval of %d milliseconds for subscriber: %s" , connectionReconnectInterval , subscriberName ))
337359 }
338- go subscriberRoutine (* mode , channels , * printMessages , connectionReconnectInterval , ctx , & wg , client )
360+ log .Println (fmt .Sprintf ("subscriber: %s. Total channels %d: %v" , subscriberName , len (channels ), channels ))
361+ go subscriberRoutine (subscriberName , * mode , channels , * printMessages , connectionReconnectInterval , ctx , & wg , client )
339362 }
340363 }
341364 }
@@ -431,10 +454,11 @@ func updateCLI(tick *time.Ticker, c chan os.Signal, message_limit int64, w *tabw
431454 start := time .Now ()
432455 prevTime := time .Now ()
433456 prevMessageCount := uint64 (0 )
457+ prevConnectCount := uint64 (0 )
434458 messageRateTs := []float64 {}
435459
436460 w .Init (os .Stdout , 25 , 0 , 1 , ' ' , tabwriter .AlignRight )
437- fmt .Fprint (w , fmt .Sprintf ("Test Time\t Total Messages\t Message Rate \t " ))
461+ fmt .Fprint (w , fmt .Sprintf ("Test Time\t Total Messages\t Message Rate \t Connect Rate \t Active subscriptions \ t " ))
438462 fmt .Fprint (w , "\n " )
439463 w .Flush ()
440464 for {
@@ -444,16 +468,19 @@ func updateCLI(tick *time.Ticker, c chan os.Signal, message_limit int64, w *tabw
444468 now := time .Now ()
445469 took := now .Sub (prevTime )
446470 messageRate := float64 (totalMessages - prevMessageCount ) / float64 (took .Seconds ())
471+ connectRate := float64 (totalConnects - prevConnectCount ) / float64 (took .Seconds ())
472+
447473 if prevMessageCount == 0 && totalMessages != 0 {
448474 start = time .Now ()
449475 }
450476 if totalMessages != 0 {
451477 messageRateTs = append (messageRateTs , messageRate )
452478 }
453479 prevMessageCount = totalMessages
480+ prevConnectCount = totalConnects
454481 prevTime = now
455482
456- fmt .Fprint (w , fmt .Sprintf ("%.0f\t %d\t %.2f\t " , time .Since (start ).Seconds (), totalMessages , messageRate ))
483+ fmt .Fprint (w , fmt .Sprintf ("%.0f\t %d\t %.2f\t %.2f \t %d \t " , time .Since (start ).Seconds (), totalMessages , messageRate , connectRate , totalSubscribedChannels ))
457484 fmt .Fprint (w , "\r \n " )
458485 w .Flush ()
459486 if message_limit > 0 && totalMessages >= uint64 (message_limit ) {
0 commit comments