Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions jetstream/src/internal_mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ export {
AckPolicy,
DeliverPolicy,
DiscardPolicy,
PersistMode,
PubHeaders,
ReplayPolicy,
RetentionPolicy,
Expand Down
26 changes: 26 additions & 0 deletions jetstream/src/jsapi_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,12 @@ export type StreamConfig = StreamUpdateConfig & {
* Enables the ability to send atomic batches to the stream
*/
"allow_atomic": boolean;

/**
* Sets the persistence model for the stream - the default is PersistMode.Default.
* This is a 2.12 feature.
*/
"persist_mode": PersistMode;
};

/**
Expand Down Expand Up @@ -504,6 +510,26 @@ export const StoreCompression = {
export type StoreCompression =
typeof StoreCompression[keyof typeof StoreCompression];

export const PersistMode = {
/**
* All writes are committed and stream data is synced to disk before the publish
* acknowledgement is sent.
* This is the default mode, and provides the strongest data durability guarantee.
*/
Default: "default",
/**
* Writes to the stream are committed, but writes to the disk are asynchronously synced.
* The publish acknowledgement is sent before the sync to the disk is complete.
* This could result in data-loss if the server crashes before the sync is completed, however
* with an R3+ stream, the replication provides in-flight redundancy to reduce the likelihood of
* this occurring with distinct fault domains.
* This can significantly increase the publish throughput.
*/
Async: "async",
};

export type PersistMode = typeof PersistMode[keyof typeof PersistMode];

/**
* Options for StreamAPI info requests
*/
Expand Down
1 change: 1 addition & 0 deletions jetstream/src/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export {
JetStreamApiError,
JetStreamError,
JsHeaders,
PersistMode,
PubHeaders,
ReplayPolicy,
RepublishHeaders,
Expand Down
45 changes: 44 additions & 1 deletion jetstream/tests/streams_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@ import {
notCompatible,
setup,
} from "test_helpers";
import { AckPolicy, jetstream, jetstreamManager } from "../src/mod.ts";
import {
AckPolicy,
jetstream,
jetstreamManager,
PersistMode,
} from "../src/mod.ts";

import { assertEquals, assertExists, assertRejects } from "@std/assert";
import { initStream } from "./jstest_util.ts";
Expand Down Expand Up @@ -167,3 +172,41 @@ Deno.test("streams - first_seq fails if wrong server", async () => {

await cleanup(ns, nc);
});

Deno.test("streams - persist mode", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}));
if (await notCompatible(ns, nc, "2.12.0")) {
return;
}

const jsm = await jetstreamManager(nc);

let si = await jsm.streams.add({
name: "A",
subjects: ["a"],
persist_mode: PersistMode.Default,
});
assertEquals(si.config.persist_mode, undefined);
let md = si.config.metadata || {};
assertEquals(md["_nats.req.level"], "0");

si = await jsm.streams.add({
name: "B",
subjects: ["b"],
persist_mode: PersistMode.Async,
});
assertEquals(si.config.persist_mode, PersistMode.Async);
md = si.config.metadata || {};
assertEquals(md["_nats.req.level"], "2");

await assertRejects(
() => {
// @ts-expect-error: testing server rejection
return jsm.streams.update("B", { persist_mode: PersistMode.Default });
},
Error,
"can not change persist mode",
);

await cleanup(ns, nc);
});
Loading