Skip to content

Commit b57635a

Browse files
authored
Adjust stream creation in nats bench (#1493)
Adds a new flag to enable asynchronous persist mode on R1 streams Signed-off-by: Jean-Noël Moyne <[email protected]>
1 parent f5d94e4 commit b57635a

File tree

1 file changed

+18
-3
lines changed

1 file changed

+18
-3
lines changed

cli/bench_command.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ type benchCmd struct {
6262
doubleAck bool
6363
batchSize int
6464
replicas int
65+
persistModeAsync bool
6566
purge bool
6667
sleep time.Duration
6768
consumerName string
@@ -124,6 +125,7 @@ func configureBenchCommand(app commandHost) {
124125
f.Flag("dedup", "Sets a message id in the header to use JS Publish de-duplication").Default("false").UnNegatableBoolVar(&c.deDuplication)
125126
f.Flag("dedupwindow", "Sets the duration of the stream's deduplication functionality").Default("2m").DurationVar(&c.deDuplicationWindow)
126127
f.Flag("purge", "Purge the stream before running").UnNegatableBoolVar(&c.purge)
128+
f.Flag("persistasync", "Set the persistence mode for the steam to asynchronous (only for R1 streams)").UnNegatableBoolVar(&c.persistModeAsync)
127129
}
128130

129131
addKVPutFlags := func(f *fisk.CmdClause) {
@@ -837,6 +839,10 @@ func (c *benchCmd) jspubActions(_ *fisk.ParseContext, jsPubType string) error {
837839
return err
838840
}
839841

842+
if c.persistModeAsync && c.replicas != 1 && jsPubType == bench.TypeJSPubBatch {
843+
return fmt.Errorf("async persist mode is only supported for streams with 1 replica and incompatible with atomic batch publishing")
844+
}
845+
840846
banner := c.generateBanner(jsPubType)
841847
log.Println(banner)
842848
bm := bench.NewBenchmark("NATS", jsPubType, c.numClients)
@@ -874,12 +880,21 @@ func (c *benchCmd) jspubActions(_ *fisk.ParseContext, jsPubType string) error {
874880

875881
if c.createStream {
876882
// create the stream with our attributes, will create it if it doesn't exist or make sure the existing one has the same attributes
883+
// uses the jsm library since it's updated with the new 2.12+ stream features and nats.go doesn't yet support them
884+
atomicBatch := true
885+
persistMode := api.DefaultPersistMode
886+
storage := api.MemoryStorage
877887

878888
if c.storageType() == jetstream.FileStorage {
879-
_, err = myjsm.NewStreamFromDefault(c.streamOrBucketName, api.StreamConfig{Name: c.streamOrBucketName, Subjects: []string{c.getSubscribeSubject()}, Retention: api.LimitsPolicy, Discard: api.DiscardNew, Storage: 0, Replicas: c.replicas, MaxBytes: c.streamMaxBytes, Duplicates: c.deDuplicationWindow, AllowDirect: true, AllowAtomicPublish: true})
880-
} else {
881-
_, err = myjsm.NewStreamFromDefault(c.streamOrBucketName, api.StreamConfig{Name: c.streamOrBucketName, Subjects: []string{c.getSubscribeSubject()}, Retention: api.LimitsPolicy, Discard: api.DiscardNew, Storage: 1, Replicas: c.replicas, MaxBytes: c.streamMaxBytes, Duplicates: c.deDuplicationWindow, AllowDirect: true, AllowAtomicPublish: true})
889+
storage = api.FileStorage
882890
}
891+
892+
if c.replicas == 1 && jsPubType != bench.TypeJSPubBatch && c.persistModeAsync {
893+
persistMode = api.AsyncPersistMode
894+
atomicBatch = false
895+
}
896+
897+
_, err = myjsm.NewStreamFromDefault(c.streamOrBucketName, api.StreamConfig{Name: c.streamOrBucketName, Subjects: []string{c.getSubscribeSubject()}, Retention: api.LimitsPolicy, Discard: api.DiscardNew, Storage: storage, Replicas: c.replicas, MaxBytes: c.streamMaxBytes, Duplicates: c.deDuplicationWindow, AllowDirect: true, AllowAtomicPublish: atomicBatch, PersistMode: persistMode})
883898
if err != nil {
884899
return fmt.Errorf("could not create the stream. If you want to delete and re-define the stream use `nats stream delete %s`: %w", c.streamOrBucketName, err)
885900
}

0 commit comments

Comments
 (0)