Skip to content

Commit 43e3e4a

Browse files
aricartbruth
andauthored
feat(jetstream): PersistMode for stream configurations (#327)
* feat(jsapi): add `PersistMode` and type definitions for stream persistence options - Introduced `PersistMode` enum with `Default` and `Async` options for configuring stream persistence. - Added `persist_mode` property to stream configurations for flexibility in persistence model settings. Signed-off-by: Alberto Ricart <[email protected]> * Update jetstream/src/jsapi_types.ts Co-authored-by: Byron Ruth <[email protected]> * Update jetstream/src/jsapi_types.ts Co-authored-by: Byron Ruth <[email protected]> --------- Signed-off-by: Alberto Ricart <[email protected]> Co-authored-by: Byron Ruth <[email protected]>
1 parent 6787372 commit 43e3e4a

File tree

4 files changed

+72
-1
lines changed

4 files changed

+72
-1
lines changed

jetstream/src/internal_mod.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ export {
179179
AckPolicy,
180180
DeliverPolicy,
181181
DiscardPolicy,
182+
PersistMode,
182183
PubHeaders,
183184
ReplayPolicy,
184185
RetentionPolicy,

jetstream/src/jsapi_types.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,12 @@ export type StreamConfig = StreamUpdateConfig & {
188188
* Enables the ability to send atomic batches to the stream
189189
*/
190190
"allow_atomic": boolean;
191+
192+
/**
193+
* Sets the persistence model for the stream - the default is PersistMode.Default.
194+
* This is a 2.12 feature.
195+
*/
196+
"persist_mode": PersistMode;
191197
};
192198

193199
/**
@@ -504,6 +510,26 @@ export const StoreCompression = {
504510
export type StoreCompression =
505511
typeof StoreCompression[keyof typeof StoreCompression];
506512

513+
export const PersistMode = {
514+
/**
515+
* All writes are committed and stream data is synced to disk before the publish
516+
* acknowledgement is sent.
517+
* This is the default mode, and provides the strongest data durability guarantee.
518+
*/
519+
Default: "default",
520+
/**
521+
* Writes to the stream are committed, but writes to the disk are asynchronously synced.
522+
* The publish acknowledgement is sent before the sync to the disk is complete.
523+
* This could result in data-loss if the server crashes before the sync is completed, however
524+
* with an R3+ stream, the replication provides in-flight redundancy to reduce the likelihood of
525+
* this occurring with distinct fault domains.
526+
* This can significantly increase the publish throughput.
527+
*/
528+
Async: "async",
529+
};
530+
531+
export type PersistMode = typeof PersistMode[keyof typeof PersistMode];
532+
507533
/**
508534
* Options for StreamAPI info requests
509535
*/

jetstream/src/mod.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ export {
2626
JetStreamApiError,
2727
JetStreamError,
2828
JsHeaders,
29+
PersistMode,
2930
PubHeaders,
3031
ReplayPolicy,
3132
RepublishHeaders,

jetstream/tests/streams_test.ts

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,12 @@ import {
1919
notCompatible,
2020
setup,
2121
} from "test_helpers";
22-
import { AckPolicy, jetstream, jetstreamManager } from "../src/mod.ts";
22+
import {
23+
AckPolicy,
24+
jetstream,
25+
jetstreamManager,
26+
PersistMode,
27+
} from "../src/mod.ts";
2328

2429
import { assertEquals, assertExists, assertRejects } from "@std/assert";
2530
import { initStream } from "./jstest_util.ts";
@@ -167,3 +172,41 @@ Deno.test("streams - first_seq fails if wrong server", async () => {
167172

168173
await cleanup(ns, nc);
169174
});
175+
176+
Deno.test("streams - persist mode", async () => {
177+
const { ns, nc } = await setup(jetstreamServerConf({}));
178+
if (await notCompatible(ns, nc, "2.12.0")) {
179+
return;
180+
}
181+
182+
const jsm = await jetstreamManager(nc);
183+
184+
let si = await jsm.streams.add({
185+
name: "A",
186+
subjects: ["a"],
187+
persist_mode: PersistMode.Default,
188+
});
189+
assertEquals(si.config.persist_mode, undefined);
190+
let md = si.config.metadata || {};
191+
assertEquals(md["_nats.req.level"], "0");
192+
193+
si = await jsm.streams.add({
194+
name: "B",
195+
subjects: ["b"],
196+
persist_mode: PersistMode.Async,
197+
});
198+
assertEquals(si.config.persist_mode, PersistMode.Async);
199+
md = si.config.metadata || {};
200+
assertEquals(md["_nats.req.level"], "2");
201+
202+
await assertRejects(
203+
() => {
204+
// @ts-expect-error: testing server rejection
205+
return jsm.streams.update("B", { persist_mode: PersistMode.Default });
206+
},
207+
Error,
208+
"can not change persist mode",
209+
);
210+
211+
await cleanup(ns, nc);
212+
});

0 commit comments

Comments
 (0)