diff --git a/jetstream/src/internal_mod.ts b/jetstream/src/internal_mod.ts index 9171c0c7..655eaa2f 100644 --- a/jetstream/src/internal_mod.ts +++ b/jetstream/src/internal_mod.ts @@ -179,6 +179,7 @@ export { AckPolicy, DeliverPolicy, DiscardPolicy, + PersistMode, PubHeaders, ReplayPolicy, RetentionPolicy, diff --git a/jetstream/src/jsapi_types.ts b/jetstream/src/jsapi_types.ts index 8b55eaa2..8af72dc6 100644 --- a/jetstream/src/jsapi_types.ts +++ b/jetstream/src/jsapi_types.ts @@ -188,6 +188,12 @@ export type StreamConfig = StreamUpdateConfig & { * Enables the ability to send atomic batches to the stream */ "allow_atomic": boolean; + + /** + * Sets the persistence model for the stream - the default is PersistMode.Default. + * This is a 2.12 feature. + */ + "persist_mode": PersistMode; }; /** @@ -504,6 +510,26 @@ export const StoreCompression = { export type StoreCompression = typeof StoreCompression[keyof typeof StoreCompression]; +export const PersistMode = { + /** + * All writes are committed and stream data is synced to disk before the publish + * acknowledgement is sent. + * This is the default mode, and provides the strongest data durability guarantee. + */ + Default: "default", + /** + * Writes to the stream are committed, but writes to the disk are asynchronously synced. + * The publish acknowledgement is sent before the sync to the disk is complete. + * This could result in data-loss if the server crashes before the sync is completed, however + * with an R3+ stream, the replication provides in-flight redundancy to reduce the likelihood of + * this occurring with distinct fault domains. + * This can significantly increase the publish throughput. + */ + Async: "async", +}; + +export type PersistMode = typeof PersistMode[keyof typeof PersistMode]; + /** * Options for StreamAPI info requests */ diff --git a/jetstream/src/mod.ts b/jetstream/src/mod.ts index 3d53b759..83d06163 100644 --- a/jetstream/src/mod.ts +++ b/jetstream/src/mod.ts @@ -26,6 +26,7 @@ export { JetStreamApiError, JetStreamError, JsHeaders, + PersistMode, PubHeaders, ReplayPolicy, RepublishHeaders, diff --git a/jetstream/tests/streams_test.ts b/jetstream/tests/streams_test.ts index 6e8e3be7..f3e24567 100644 --- a/jetstream/tests/streams_test.ts +++ b/jetstream/tests/streams_test.ts @@ -19,7 +19,12 @@ import { notCompatible, setup, } from "test_helpers"; -import { AckPolicy, jetstream, jetstreamManager } from "../src/mod.ts"; +import { + AckPolicy, + jetstream, + jetstreamManager, + PersistMode, +} from "../src/mod.ts"; import { assertEquals, assertExists, assertRejects } from "@std/assert"; import { initStream } from "./jstest_util.ts"; @@ -167,3 +172,41 @@ Deno.test("streams - first_seq fails if wrong server", async () => { await cleanup(ns, nc); }); + +Deno.test("streams - persist mode", async () => { + const { ns, nc } = await setup(jetstreamServerConf({})); + if (await notCompatible(ns, nc, "2.12.0")) { + return; + } + + const jsm = await jetstreamManager(nc); + + let si = await jsm.streams.add({ + name: "A", + subjects: ["a"], + persist_mode: PersistMode.Default, + }); + assertEquals(si.config.persist_mode, undefined); + let md = si.config.metadata || {}; + assertEquals(md["_nats.req.level"], "0"); + + si = await jsm.streams.add({ + name: "B", + subjects: ["b"], + persist_mode: PersistMode.Async, + }); + assertEquals(si.config.persist_mode, PersistMode.Async); + md = si.config.metadata || {}; + assertEquals(md["_nats.req.level"], "2"); + + await assertRejects( + () => { + // @ts-expect-error: testing server rejection + return jsm.streams.update("B", { persist_mode: PersistMode.Default }); + }, + Error, + "can not change persist mode", + ); + + await cleanup(ns, nc); +});