Skip to content

Commit 71c84de

Browse files
committed
feat(jetstream): handle "no results" status in direct APIs and add relevant tests
- Added `isNoResults` method to identify "no results" error in JetStream errors. - Updated logic in `jsm_direct.ts` to stop iterators on "no results" status. - Enhanced tests with scenarios for handling batches and direct message retrieval when no messages are available. Signed-off-by: Alberto Ricart <[email protected]>
1 parent 7d3f8aa commit 71c84de

File tree

3 files changed

+57
-7
lines changed

3 files changed

+57
-7
lines changed

jetstream/src/jserrors.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,10 @@ export class JetStreamStatus {
176176
return this.code === 404 && this.description === "message not found";
177177
}
178178

179+
isNoResults(): boolean {
180+
return this.code === 404 && this.description === "no results";
181+
}
182+
179183
isMessageSizeExceedsMaxBytes(): boolean {
180184
return this.code === 409 &&
181185
this.description === "message size exceeds maxbytes";

jetstream/src/jsm_direct.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,11 @@ export class DirectStreamAPIImpl extends BaseApiClientImpl
183183
}
184184
const status = JetStreamStatus.maybeParseStatus(msg);
185185
if (status) {
186+
if (status.isNoResults()) {
187+
push({}, () => {
188+
iter.stop();
189+
});
190+
}
186191
if (status.isEndOfBatch()) {
187192
push({}, () => {
188193
iter.stop();

jetstream/tests/jsm_direct_test.ts

Lines changed: 48 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -492,13 +492,8 @@ Deno.test("direct - last message for", async (t) => {
492492
}
493493

494494
await t.step("not matched filter", async () => {
495-
await assertRejects(
496-
async () => {
497-
await assertBatch({ opts: { multi_last: ["c"] }, expect: [] });
498-
},
499-
JetStreamError,
500-
"no results",
501-
);
495+
await assertBatch({ opts: { multi_last: ["c"] }, expect: [] });
496+
502497
});
503498

504499
await t.step("single filter", async () => {
@@ -580,3 +575,49 @@ Deno.test("direct - batch next_by_subj", async () => {
580575

581576
await cleanup(ns, nc);
582577
});
578+
579+
Deno.test("direct - batch no messages", async () => {
580+
const { ns, nc } = await setup(jetstreamServerConf(), { debug: true });
581+
if (await notCompatible(ns, nc, "2.11.0")) {
582+
return;
583+
}
584+
const nci = nc as NatsConnectionImpl;
585+
const jsm = await jetstreamManager(nci) as JetStreamManagerImpl;
586+
await jsm.streams.add({
587+
name: "A",
588+
subjects: ["a", "b", "z"],
589+
storage: StorageType.Memory,
590+
allow_direct: true,
591+
});
592+
593+
const iter = await jsm.direct.getLastMessagesFor("A", {
594+
multi_last: ["a", "b", "z"],
595+
batch: 100,
596+
});
597+
for await (const m of iter) {
598+
console.log(m.seq);
599+
}
600+
assertEquals(iter.getProcessed(), 0);
601+
602+
await cleanup(ns, nc);
603+
});
604+
605+
Deno.test("direct - get no messages", async () => {
606+
const { ns, nc } = await setup(jetstreamServerConf(), { debug: true });
607+
if (await notCompatible(ns, nc, "2.11.0")) {
608+
return;
609+
}
610+
const nci = nc as NatsConnectionImpl;
611+
const jsm = await jetstreamManager(nci) as JetStreamManagerImpl;
612+
await jsm.streams.add({
613+
name: "A",
614+
subjects: ["a", "b", "z"],
615+
storage: StorageType.Memory,
616+
allow_direct: true,
617+
});
618+
619+
const m = await jsm.direct.getMessage("A", { last_by_subj: "a" });
620+
assertEquals(m, null);
621+
622+
await cleanup(ns, nc);
623+
});

0 commit comments

Comments
 (0)