Skip to content

Commit bb74374

Browse files
committed
feat(jsapi): add support for message scheduling
- Introduced `allow_msg_schedules` in stream configurations to enable scheduling messages. - Added `ScheduleOptions` type to define scheduling parameters (e.g., specification, target, source, ttl). - Updated headers to include scheduling fields (`Nats-Schedule`, `Nats-Schedule-Target`, etc.). - Added tests to validate message scheduling functionality. Signed-off-by: Alberto Ricart <[email protected]>
1 parent 68f9fd6 commit bb74374

File tree

4 files changed

+149
-0
lines changed

4 files changed

+149
-0
lines changed

jetstream/src/jsapi_types.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,11 @@ export type StreamConfig = StreamUpdateConfig & {
183183
* Enables a NATS stream implementation of CRDT operations
184184
*/
185185
"allow_msg_counter": boolean;
186+
187+
/**
188+
* Enables the scheduling of messages in a stream.
189+
*/
190+
"allow_msg_schedules": boolean;
186191
};
187192

188193
/**
@@ -1270,5 +1275,9 @@ export const PubHeaders = {
12701275
* enable {@link StreamConfig#allow_msg_ttl}.
12711276
*/
12721277
MessageTTL: "Nats-TTL",
1278+
Schedule: "Nats-Schedule",
1279+
ScheduleTarget: "Nats-Schedule-Target",
1280+
ScheduleSource: "Nats-Schedule-Source",
1281+
ScheduleTTL: "Nats-Schedule-TTL",
12731282
} as const;
12741283
export type PubHeaders = typeof PubHeaders[keyof typeof PubHeaders];

jetstream/src/jsclient.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,29 @@ export class JetStreamClientImpl extends BaseApiClientImpl
212212
`${opts.ttl}`,
213213
);
214214
}
215+
216+
if (opts.schedule) {
217+
const so = opts.schedule;
218+
if (so.specification) {
219+
if (typeof so.specification === "string") {
220+
mh.set(PubHeaders.Schedule, so.specification);
221+
} else if (so.specification instanceof Date) {
222+
mh.set(
223+
PubHeaders.Schedule,
224+
"@at " + so.specification.toISOString(),
225+
);
226+
}
227+
}
228+
if (so.target) {
229+
mh.set(PubHeaders.ScheduleTarget, so.target);
230+
}
231+
if (so.source) {
232+
mh.set(PubHeaders.ScheduleSource, so.source);
233+
}
234+
if (so.ttl) {
235+
mh.set(PubHeaders.ScheduleTTL, so.ttl);
236+
}
237+
}
215238
}
216239

217240
const to = opts.timeout || this.timeout;

jetstream/src/types.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,31 @@ export type PubAck = {
9898
duplicate: boolean;
9999
};
100100

101+
export type ScheduleOptions = {
102+
/**
103+
* The schedule specification format.
104+
* Currently only supports:
105+
* "@at <date in isostring format>" - e.g., `new Date(Date.now() + 60_000).toISOString()`
106+
*
107+
* Note: The ADR-51 specification defines additional formats that may be supported in future
108+
* server versions.
109+
*/
110+
specification: string | Date;
111+
112+
/**
113+
* The subject the message will be delivered to
114+
*/
115+
target: string;
116+
/**
117+
* Instructs the schedule to read the last message on the given subject and publish. If the subject is empty, nothing is published. Wildcards are NOT supported.
118+
*/
119+
source?: string;
120+
/**
121+
* Sets a message TTL if the stream supports per-message TTL.
122+
*/
123+
ttl?: string;
124+
};
125+
101126
/**
102127
* Options for messages published to JetStream
103128
*/
@@ -170,6 +195,12 @@ export type JetStreamPublishOptions = {
170195
* └────────────────────┴────────┘
171196
*/
172197
lastSubjectSequenceSubject: string;
198+
199+
/**
200+
* The expected last sequence on the stream for a message with this subject
201+
* and this value.
202+
*/
203+
lastSubjectSequenceValue: number;
173204
}>;
174205

175206
/**
@@ -184,6 +215,11 @@ export type JetStreamPublishOptions = {
184215
* Default is 1.
185216
*/
186217
retries?: number;
218+
219+
/**
220+
* Specifies the schedule for the message.
221+
*/
222+
schedule?: ScheduleOptions;
187223
};
188224

189225
/**

jetstream/tests/schedules_test.ts

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright 2025 The NATS Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
import { jetstreamManager } from "../src/jsclient.ts";
16+
import { deferred, nanos } from "@nats-io/nats-core";
17+
import {
18+
cleanup,
19+
jetstreamServerConf,
20+
notCompatible,
21+
setup,
22+
} from "test_helpers";
23+
24+
Deno.test("schedules - basics", async () => {
25+
const { ns, nc } = await setup(jetstreamServerConf({}));
26+
27+
if (await notCompatible(ns, nc, "2.11.0")) {
28+
return;
29+
}
30+
31+
const jsm = await jetstreamManager(nc);
32+
await jsm.streams.add({
33+
name: "schedules",
34+
allow_msg_schedules: true,
35+
subjects: ["schedules.>", "target.>"],
36+
allow_msg_ttl: true,
37+
});
38+
39+
await jsm.consumers.add("schedules", {
40+
flow_control: true,
41+
idle_heartbeat: nanos(60_000),
42+
deliver_subject: "cron",
43+
filter_subject: "target.>",
44+
});
45+
46+
const d3000 = deferred();
47+
const d5000 = deferred();
48+
49+
nc.subscribe("cron", {
50+
callback: (err, m) => {
51+
if (m.subject.endsWith("5000")) {
52+
d5000.resolve();
53+
} else if (m.subject.endsWith("3000")) {
54+
d3000.resolve();
55+
}
56+
},
57+
});
58+
59+
const js = jsm.jetstream();
60+
61+
const specification = "@at " + new Date(Date.now() + 5000).toISOString();
62+
await js.publish("schedules.a", "5000", {
63+
schedule: {
64+
specification,
65+
target: "target.5000",
66+
ttl: "5m",
67+
},
68+
});
69+
70+
await js.publish("schedules.b", "3000", {
71+
schedule: {
72+
specification: new Date(Date.now() + 3000),
73+
target: "target.3000",
74+
ttl: "5m",
75+
},
76+
});
77+
78+
await Promise.all([d3000, d5000]);
79+
80+
await cleanup(ns, nc);
81+
});

0 commit comments

Comments
 (0)