diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml index a25156614..e48b5fbb5 100644 --- a/.github/workflows/main.yaml +++ b/.github/workflows/main.yaml @@ -53,7 +53,7 @@ jobs: - 6379:6379 env: AMQP_URL: amqp://guest:guest@localhost:5672 - DATABASE_URL: postgres://postgres:postgres@localhost:5432/postgres + POSTGRES_URL: postgres://postgres:postgres@localhost:5432/postgres REDIS_URL: redis://localhost:6379 steps: - uses: actions/checkout@v4 @@ -116,7 +116,7 @@ jobs: - 6379:6379 env: AMQP_URL: amqp://guest:guest@localhost:5672 - DATABASE_URL: postgres://postgres:postgres@localhost:5432/postgres + POSTGRES_URL: postgres://postgres:postgres@localhost:5432/postgres REDIS_URL: redis://localhost:6379 steps: - uses: actions/checkout@v4 @@ -157,7 +157,7 @@ jobs: - 6379:6379 env: AMQP_URL: amqp://guest:guest@localhost:5672 - DATABASE_URL: postgres://postgres:postgres@localhost:5432/postgres + POSTGRES_URL: postgres://postgres:postgres@localhost:5432/postgres REDIS_URL: redis://localhost:6379 steps: - uses: actions/checkout@v4 diff --git a/CHANGES.md b/CHANGES.md index feaf99a0f..99e98dcea 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -266,6 +266,31 @@ To be released. [#437]: https://github.com/fedify-dev/fedify/issues/437 +### @fedify/sqlite + + - Added `SqliteMessageQueue` class implementing `MessageQueue` interface + using SQLite as the backing store. This implementation uses polling to + check for new messages and is suitable for single-node deployments and + development environments. [[#477], [#526] by ChanHaeng Lee] + + - Added `SqliteMessageQueue` class. + - Added `SqliteMessageQueueOptions` interface. + +[#477]: https://github.com/fedify-dev/fedify/issues/477 +[#526]: https://github.com/fedify-dev/fedify/pull/526 + +### @fedify/testing + + - Added `testMessageQueue()` utility function for standardized testing of + `MessageQueue` implementations. This function provides a reusable test + harness that covers common message queue operations including `enqueue()`, + `enqueue()` with delay, `enqueueMany()`, and multiple listener scenarios. + [[#477], [#526] by ChanHaeng Lee] + + - Added `testMessageQueue()` function. + - Added `waitFor()` helper function. + - Added `getRandomKey()` helper function. + Version 1.10.0 -------------- diff --git a/docs/manual/mq.md b/docs/manual/mq.md index ed9c3af31..e6fcc928f 100644 --- a/docs/manual/mq.md +++ b/docs/manual/mq.md @@ -293,6 +293,115 @@ const federation = createFederation({ [`AmqpMessageQueue`]: https://jsr.io/@fedify/amqp/doc/mq/~/AmqpMessageQueue [RabbitMQ]: https://www.rabbitmq.com/ +### `SqliteMessageQueue` + +*This API is available since Fedify 2.0.0.* + +To use [`SqliteMessageQueue`], you need to install the *@fedify/sqlite* package +first: + +::: code-group + +~~~~ bash [Deno] +deno add jsr:@fedify/sqlite +~~~~ + +~~~~ bash [npm] +npm add @fedify/sqlite +~~~~ + +~~~~ bash [pnpm] +pnpm add @fedify/sqlite +~~~~ + +~~~~ bash [Yarn] +yarn add @fedify/sqlite +~~~~ + +~~~~ bash [Bun] +bun add @fedify/sqlite +~~~~ + +::: + +[`SqliteMessageQueue`] is a message queue implementation that uses SQLite as +the backend. It uses polling to check for new messages and is designed for +single-node deployments. It's suitable for development, testing, and +small-scale production use where simplicity is preferred over high throughput. +It uses native sqlite modules, [`node:sqlite`] for Node.js and Deno, +[`bun:sqlite`] for Bun. + +Best for +: Development and testing. + +Pros +: Simple, persistent with minimal configuration. + +Cons +: Limited scalability, not suitable for high-traffic production. + +> [!NOTE] +> `SqliteMessageQueue` uses `DELETE ... RETURNING` to atomically fetch and +> delete the oldest message that is ready to be processed. This requires +> SQLite 3.35.0 or later. + +::: code-group + +~~~~ typescript twoslash [Deno] +import type { KvStore } from "@fedify/fedify"; +// ---cut-before--- +import { DatabaseSync } from "node:sqlite"; +import { createFederation } from "@fedify/fedify"; +import { SqliteMessageQueue } from "@fedify/sqlite"; + +const db = new DatabaseSync(":memory:"); +const federation = createFederation({ + // ... + // ---cut-start--- + kv: null as unknown as KvStore, + // ---cut-end--- + queue: new SqliteMessageQueue(db), // [!code highlight] +}); +~~~~ + +~~~~ typescript twoslash [Node.js] +import type { KvStore } from "@fedify/fedify"; +// ---cut-before--- +import { DatabaseSync } from "node:sqlite"; +import { createFederation } from "@fedify/fedify"; +import { SqliteMessageQueue } from "@fedify/sqlite"; + +const db = new DatabaseSync(":memory:"); +const federation = createFederation({ + // ... + // ---cut-start--- + kv: null as unknown as KvStore, + // ---cut-end--- + queue: new SqliteMessageQueue(db), // [!code highlight] +}); +~~~~ + +~~~~ typescript [Bun] +import type { KvStore } from "@fedify/fedify"; +// ---cut-before--- +import { Database } from "bun:sqlite"; +import { createFederation } from "@fedify/fedify"; +import { SqliteMessageQueue } from "@fedify/sqlite"; + +const db = new Database(":memory:"); +const federation = createFederation({ + // ... + // ---cut-start--- + kv: null as unknown as KvStore, + // ---cut-end--- + queue: new SqliteMessageQueue(db), // [!code highlight] +}); +~~~~ + +::: + +[`SqliteMessageQueue`]: https://jsr.io/@fedify/sqlite/doc/mq/~/SqliteMessageQueue + ### `WorkersMessageQueue` (Cloudflare Workers only) *This API is available since Fedify 1.6.0.* @@ -659,6 +768,9 @@ The following implementations do not yet support native retry: [`AmqpMessageQueue`] : Native retry support planned for future release. +[`SqliteMessageQueue`] +: No native retry support (`~MessageQueue.nativeRetrial` is `false`). + `ParallelMessageQueue` inherits the `~MessageQueue.nativeRetrial` value from the wrapped queue. diff --git a/packages/amqp/package.json b/packages/amqp/package.json index f12a2f2ee..41dbd00d9 100644 --- a/packages/amqp/package.json +++ b/packages/amqp/package.json @@ -59,6 +59,7 @@ }, "devDependencies": { "@alinea/suite": "^0.6.3", + "@fedify/testing": "workspace:^", "@js-temporal/polyfill": "catalog:", "@std/assert": "catalog:", "@std/async": "catalog:", diff --git a/packages/amqp/src/mq.test.ts b/packages/amqp/src/mq.test.ts index bfc24987e..a32bd146e 100644 --- a/packages/amqp/src/mq.test.ts +++ b/packages/amqp/src/mq.test.ts @@ -1,19 +1,12 @@ import { suite } from "@alinea/suite"; import { AmqpMessageQueue } from "@fedify/amqp/mq"; -import * as temporal from "@js-temporal/polyfill"; +import { getRandomKey, testMessageQueue, waitFor } from "@fedify/testing"; import { assert, assertEquals, assertFalse, assertGreater } from "@std/assert"; import { delay } from "@std/async/delay"; // @deno-types="npm:@types/amqplib" import { type ChannelModel, connect } from "amqplib"; import process from "node:process"; -let Temporal: typeof temporal.Temporal; -if ("Temporal" in globalThis) { - Temporal = globalThis.Temporal; -} else { - Temporal = temporal.Temporal; -} - const AMQP_URL = process.env.AMQP_URL; const test = AMQP_URL ? suite(import.meta) : suite(import.meta).skip; @@ -21,86 +14,28 @@ function getConnection(): Promise { return connect(AMQP_URL!); } -test("AmqpMessageQueue", { - sanitizeOps: false, - sanitizeExit: false, - sanitizeResources: false, -}, async () => { - const conn = await getConnection(); - const conn2 = await getConnection(); - const randomSuffix = Math.random().toString(36).substring(2); - const queue = `fedify_queue_${randomSuffix}`; - const delayedQueuePrefix = `fedify_delayed_${randomSuffix}_`; - const mq = new AmqpMessageQueue(conn, { queue, delayedQueuePrefix }); - const mq2 = new AmqpMessageQueue(conn2, { queue, delayedQueuePrefix }); - - const messages1: string[] = []; - const messages2: string[] = []; - const allMessages: string[] = []; - const controller = new AbortController(); - const listening = mq.listen((message: string) => { - messages1.push(message); - allMessages.push(message); - }, { signal: controller.signal }); - const listening2 = mq2.listen((message: string) => { - messages2.push(message); - allMessages.push(message); - }, { signal: controller.signal }); - - await mq.enqueue("Hello, world!"); - - await waitFor(() => allMessages.length > 0, 15_000); - - assertEquals(allMessages.includes("Hello, world!"), true); - - // enqueue() with delay - const started = Date.now(); - await mq.enqueue( - "Delayed message", - { delay: Temporal.Duration.from({ seconds: 3 }) }, - ); - - await waitFor(() => allMessages.includes("Delayed message"), 15_000); - - // listen() with delay - assertEquals(allMessages.includes("Delayed message"), true); - assertGreater(Date.now() - started, 3_000); - - await mq.enqueueMany(["Message 1", "Message 2", "Message 3"]); - - await waitFor(() => - allMessages.includes("Message 1") && - allMessages.includes("Message 2") && - allMessages.includes("Message 3"), 15_000); - - // listen() after enqueueMany() - assertEquals(allMessages.includes("Message 1"), true); - assertEquals(allMessages.includes("Message 2"), true); - assertEquals(allMessages.includes("Message 3"), true); - - // enqueueMany() with delay - const manyStarted = Date.now(); - await mq.enqueueMany( - ["Delayed batch 1", "Delayed batch 2"], - { delay: Temporal.Duration.from({ seconds: 3 }) }, - ); - - await waitFor(() => - allMessages.includes("Delayed batch 1") && - allMessages.includes("Delayed batch 2"), 15_000); - - // listen() after enqueueMany() with delay - assertEquals(allMessages.includes("Delayed batch 1"), true); - assertEquals(allMessages.includes("Delayed batch 2"), true); - assertGreater(Date.now() - manyStarted, 3_000); - - controller.abort(); - await listening; - await listening2; - - await conn.close(); - await conn2.close(); -}); +const connections: ChannelModel[] = []; +const queue = getRandomKey("queue"); +const delayedQueuePrefix = getRandomKey("delayed") + "_"; + +test( + "AmqpMessageQueue", + { sanitizeOps: false, sanitizeExit: false, sanitizeResources: false }, + () => + testMessageQueue( + async () => { + const conn = await getConnection(); + connections.push(conn); + return new AmqpMessageQueue(conn, { queue, delayedQueuePrefix }); + }, + async ({ controller }) => { + controller.abort(); + for (const conn of connections) { + await conn.close(); + } + }, + ), +); test( "AmqpMessageQueue [nativeRetrial: false]", @@ -171,16 +106,3 @@ test( assertGreater(i, 1); }, ); - -async function waitFor( - predicate: () => boolean, - timeoutMs: number, -): Promise { - const started = Date.now(); - while (!predicate()) { - await delay(500); - if (Date.now() - started > timeoutMs) { - throw new Error("Timeout"); - } - } -} diff --git a/packages/cli/package.json b/packages/cli/package.json index d264dc734..757a63342 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -50,6 +50,7 @@ "@fedify/vocab": "workspace:*", "@fedify/vocab-runtime": "workspace:*", "@fedify/vocab-tools": "workspace:*", + "@fedify/webfinger": "workspace:*", "@fxts/core": "catalog:", "@hongminhee/localtunnel": "^0.3.0", "@inquirer/prompts": "^7.8.4", diff --git a/packages/cli/src/init/json/kv.json b/packages/cli/src/init/json/kv.json index 00d844754..fbbb79230 100644 --- a/packages/cli/src/init/json/kv.json +++ b/packages/cli/src/init/json/kv.json @@ -24,9 +24,9 @@ "@fedify/postgres": { "PostgresKvStore": "PostgresKvStore" }, "postgres": { "default": "postgres" } }, - "object": "new PostgresKvStore(postgres(process.env.DATABASE_URL))", + "object": "new PostgresKvStore(postgres(process.env.POSTGRES_URL))", "env": { - "DATABASE_URL": "postgres://postgres@localhost:5432/postgres" + "POSTGRES_URL": "postgres://postgres@localhost:5432/postgres" } }, "denokv": { diff --git a/packages/cli/src/init/json/mq.json b/packages/cli/src/init/json/mq.json index 4ec3ff9c3..2dd6143e8 100644 --- a/packages/cli/src/init/json/mq.json +++ b/packages/cli/src/init/json/mq.json @@ -44,9 +44,9 @@ "default": "postgres" } }, - "object": "new PostgresMessageQueue(postgres(process.env.DATABASE_URL))", + "object": "new PostgresMessageQueue(postgres(process.env.POSTGRES_URL))", "env": { - "DATABASE_URL": "postgres://postgres@localhost:5432/postgres" + "POSTGRES_URL": "postgres://postgres@localhost:5432/postgres" } }, "amqp": { diff --git a/packages/denokv/src/mod.test.ts b/packages/denokv/src/mod.test.ts index 6de322a86..77df71c64 100644 --- a/packages/denokv/src/mod.test.ts +++ b/packages/denokv/src/mod.test.ts @@ -1,5 +1,5 @@ -import { assertEquals, assertGreater } from "@std/assert"; -import { delay } from "es-toolkit"; +import { testMessageQueue } from "@fedify/testing"; +import { assertEquals } from "@std/assert"; import { DenoKvMessageQueue, DenoKvStore } from "./mod.ts"; Deno.test("DenoKvStore", async (t) => { @@ -133,56 +133,23 @@ Deno.test("DenoKvStore", async (t) => { kv.close(); }); -Deno.test("DenoKvMessageQueue", async (t) => { - const kv = await Deno.openKv(":memory:"); - const mq = new DenoKvMessageQueue(kv); - - const messages: string[] = []; - const controller = new AbortController(); - const listening = mq.listen((message: string) => { - messages.push(message); - }, { signal: controller.signal }); - - await t.step("enqueue()", async () => { - await mq.enqueue("Hello, world!"); - }); - - await waitFor(() => messages.length > 0, 15_000); - - await t.step("listen()", () => { - assertEquals(messages, ["Hello, world!"]); - }); - - let started = 0; - await t.step("enqueue() with delay", async () => { - started = Date.now(); - await mq.enqueue( - "Delayed message", - { delay: Temporal.Duration.from({ seconds: 3 }) }, - ); - }); +const kvs: Deno.Kv[] = []; - await waitFor(() => messages.length > 1, 15_000); - - await t.step("listen() with delay", () => { - assertEquals(messages, ["Hello, world!", "Delayed message"]); - assertGreater(Date.now() - started, 3_000); - }); - - controller.abort(); - await listening; - mq[Symbol.dispose](); -}); - -async function waitFor( - predicate: () => boolean, - timeoutMs: number, -): Promise { - const started = Date.now(); - while (!predicate()) { - await delay(500); - if (Date.now() - started > timeoutMs) { - throw new Error("Timeout"); - } - } -} +Deno.test("DenoKvMessageQueue", () => + testMessageQueue( + async () => { + const kv = await Deno.openKv(":memory:"); + kvs.push(kv); + return new DenoKvMessageQueue(kv); + }, + ({ controller }) => { + controller.abort(); + for (const kv of kvs) { + try { + kv.close(); + } catch { + // Ignore errors on close + } + } + }, + )); diff --git a/packages/postgres/deno.json b/packages/postgres/deno.json index 08cc9d35f..3b1c3199e 100644 --- a/packages/postgres/deno.json +++ b/packages/postgres/deno.json @@ -12,7 +12,9 @@ "node_modules" ], "publish": { - "exclude": ["**/*.test.ts"] + "exclude": [ + "**/*.test.ts" + ] }, "tasks": { "check": "deno fmt --check && deno lint && deno check src/*.ts", diff --git a/packages/postgres/package.json b/packages/postgres/package.json index 84487f481..e362caa79 100644 --- a/packages/postgres/package.json +++ b/packages/postgres/package.json @@ -77,6 +77,8 @@ "postgres": "catalog:" }, "devDependencies": { + "@fedify/fixture": "workspace:^", + "@fedify/testing": "workspace:^", "@std/async": "catalog:", "tsdown": "catalog:", "typescript": "catalog:" diff --git a/packages/postgres/src/kv.test.ts b/packages/postgres/src/kv.test.ts index 0047c252f..52a305153 100644 --- a/packages/postgres/src/kv.test.ts +++ b/packages/postgres/src/kv.test.ts @@ -13,7 +13,7 @@ if ("Temporal" in globalThis) { Temporal = temporal.Temporal; } -const dbUrl = process.env.DATABASE_URL; +const dbUrl = process.env.POSTGRES_URL; function getStore(): { // deno-lint-ignore no-explicit-any diff --git a/packages/postgres/src/mq.test.ts b/packages/postgres/src/mq.test.ts index 715b1b983..b5faf61e2 100644 --- a/packages/postgres/src/mq.test.ts +++ b/packages/postgres/src/mq.test.ts @@ -1,116 +1,33 @@ +import { test } from "@fedify/fixture"; import { PostgresMessageQueue } from "@fedify/postgres/mq"; -import * as temporal from "@js-temporal/polyfill"; -import { delay } from "@std/async/delay"; -import assert from "node:assert/strict"; +import { getRandomKey, testMessageQueue } from "@fedify/testing"; import process from "node:process"; -import { test } from "node:test"; import postgres from "postgres"; -let Temporal: typeof temporal.Temporal; -if ("Temporal" in globalThis) { - Temporal = globalThis.Temporal; -} else { - Temporal = temporal.Temporal; -} - -const dbUrl = process.env.DATABASE_URL; +const dbUrl = process.env.POSTGRES_URL; +const sqls: postgres.Sql[] = []; -test("PostgresMessageQueue", { skip: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option +function createSql() { const sql = postgres(dbUrl!); - const sql2 = postgres(dbUrl!); - const tableName = `fedify_message_test_${ - Math.random().toString(36).slice(5) - }`; - const channelName = `fedify_channel_test_${ - Math.random().toString(36).slice(5) - }`; - const mq = new PostgresMessageQueue(sql, { tableName, channelName }); - const mq2 = new PostgresMessageQueue(sql2, { tableName, channelName }); - - try { - const messages: string[] = []; - const controller = new AbortController(); - await mq.initialize(); - const listening = mq.listen((message: string) => { - messages.push(message); - }, { signal: controller.signal }); - const listening2 = mq2.listen((message: string) => { - messages.push(message); - }, { signal: controller.signal }); - - // enqueue() - await mq.enqueue("Hello, world!"); - - await waitFor(() => messages.length > 0, 15_000); - - // listen() - assert.deepStrictEqual(messages, ["Hello, world!"]); - - // enqueue() with delay - let started = 0; - started = Date.now(); - await mq.enqueue( - { msg: "Delayed message" }, - { delay: Temporal.Duration.from({ seconds: 3 }) }, - ); - - await waitFor(() => messages.length > 1, 15_000); - - // listen() with delay - assert.deepStrictEqual(messages, ["Hello, world!", { - msg: "Delayed message", - }]); - assert.ok(Date.now() - started > 3_000); - - // enqueueMany() - while (messages.length > 0) messages.pop(); - const batchMessages = [ - "First batch message", - { text: "Second batch message" }, - { text: "Third batch message", priority: "high" }, - ]; - await mq.enqueueMany(batchMessages); - await waitFor(() => messages.length === batchMessages.length, 15_000); - assert.deepStrictEqual(messages, batchMessages); - - // enqueueMany() with delay - while (messages.length > 0) messages.pop(); - started = Date.now(); - const delayedBatchMessages = [ - "Delayed batch 1", - "Delayed batch 2", - ]; - await mq.enqueueMany( - delayedBatchMessages, - { delay: Temporal.Duration.from({ seconds: 2 }) }, - ); - await waitFor( - () => messages.length === delayedBatchMessages.length, - 15_000, - ); - assert.deepStrictEqual(messages, delayedBatchMessages); - assert.ok(Date.now() - started > 2_000); - - controller.abort(); - await listening; - await listening2; - } finally { - await mq.drop(); - await sql.end(); - await sql2.end(); - } -}); - -async function waitFor( - predicate: () => boolean, - timeoutMs: number, -): Promise { - const started = Date.now(); - while (!predicate()) { - await delay(500); - if (Date.now() - started > timeoutMs) { - throw new Error("Timeout"); - } - } + sqls.push(sql); + return sql; } + +test("PostgresMessageQueue", { ignore: dbUrl == null }, () => + testMessageQueue( + () => + new PostgresMessageQueue(createSql(), { + tableName: getRandomKey("message"), + channelName: getRandomKey("channel"), + }), + async ({ mq1, mq2, controller }) => { + controller.abort(); + await mq1.drop(); + await mq2.drop(); + for (const sql of sqls) { + await sql.end(); + } + }, + )); + +// cspell: ignore sqls diff --git a/packages/postgres/src/mq.ts b/packages/postgres/src/mq.ts index 53ca0b044..67d7436bf 100644 --- a/packages/postgres/src/mq.ts +++ b/packages/postgres/src/mq.ts @@ -258,4 +258,4 @@ export class PostgresMessageQueue implements MessageQueue { } } -// cSpell: ignore typname +// cSpell: ignore typname unlisten diff --git a/packages/redis/deno.json b/packages/redis/deno.json index 9063255ce..62597bce9 100644 --- a/packages/redis/deno.json +++ b/packages/redis/deno.json @@ -13,7 +13,9 @@ "node_modules" ], "publish": { - "exclude": ["**/*.test.ts"] + "exclude": [ + "**/*.test.ts" + ] }, "tasks": { "check": "deno fmt --check && deno lint && deno check src/*.ts", diff --git a/packages/redis/package.json b/packages/redis/package.json index f68071ec9..c8d047790 100644 --- a/packages/redis/package.json +++ b/packages/redis/package.json @@ -87,6 +87,7 @@ "devDependencies": { "@std/async": "catalog:", "@fedify/fixture": "workspace:^", + "@fedify/testing": "workspace:^", "@types/node": "catalog:", "tsdown": "catalog:", "typescript": "catalog:" diff --git a/packages/redis/src/mq.test.ts b/packages/redis/src/mq.test.ts index cb37fa391..c07029025 100644 --- a/packages/redis/src/mq.test.ts +++ b/packages/redis/src/mq.test.ts @@ -1,111 +1,24 @@ import { test } from "@fedify/fixture"; import { RedisMessageQueue } from "@fedify/redis/mq"; -import * as temporal from "@js-temporal/polyfill"; -import { delay } from "@std/async/delay"; -import { Redis } from "ioredis"; -import assert from "node:assert/strict"; +import { getRandomKey, testMessageQueue } from "@fedify/testing"; import process from "node:process"; -let Temporal: typeof temporal.Temporal; -if ("Temporal" in globalThis) { - Temporal = globalThis.Temporal; -} else { - Temporal = temporal.Temporal; -} - -const redisUrl = process.env.REDIS_URL; - -test("RedisMessageQueue", { ignore: redisUrl == null }, async () => { - const channelKey = `fedify_test_channel_${crypto.randomUUID()}`; - const queueKey = `fedify_test_queue_${crypto.randomUUID()}`; - const lockKey = `fedify_test_lock_${crypto.randomUUID()}`; - const mq = new RedisMessageQueue(() => new Redis(redisUrl!), { - pollInterval: { seconds: 1 }, - channelKey, - queueKey, - lockKey, - }); - const mq2 = new RedisMessageQueue(() => new Redis(redisUrl!), { - pollInterval: { seconds: 1 }, - channelKey, - queueKey, - lockKey, - }); - - const messages: (string | number)[] = []; - const controller = new AbortController(); - const listening = mq.listen((message: string | number) => { - messages.push(message); - }, controller); - const listening2 = mq2.listen((message: string | number) => { - messages.push(message); - }, controller); - - try { - // enqueue() - await mq.enqueue("Hello, world!"); - - await waitFor(() => messages.length > 0, 15_000); - - // listen() - assert.deepStrictEqual(messages, ["Hello, world!"]); - - // enqueue() with delay - let started = 0; - started = Date.now(); - await mq.enqueue( - "Delayed message", - { delay: Temporal.Duration.from({ seconds: 3 }) }, - ); - - await waitFor(() => messages.length > 1, 15_000); - - // listen() with delay - assert.deepStrictEqual(messages, ["Hello, world!", "Delayed message"]); - assert.ok(Date.now() - started > 3_000); - - // enqueue() [bulk] - for (let i = 0; i < 1_000; i++) await mq.enqueue(i); - - await waitFor(() => messages.length > 1_001, 30_000); - - // listen() [bulk] - const numbers: Set = new Set(); - for (let i = 0; i < 1_000; i++) numbers.add(i); - assert.deepStrictEqual(new Set(messages.slice(2)), numbers); - - // Reset messages array for the next test: - while (messages.length > 0) messages.pop(); - - // enqueueMany() - const bulkMessages = Array.from({ length: 500 }, (_, i) => `bulk-${i}`); - await mq.enqueueMany(bulkMessages); - - await waitFor(() => messages.length >= 500, 30_000); - - // listen() after enqueueMany() - const expectedMessages = new Set( - Array.from({ length: 500 }, (_, i) => `bulk-${i}`), - ); - assert.deepStrictEqual(new Set(messages), expectedMessages); - } finally { - controller.abort(); - await listening; - await listening2; - mq[Symbol.dispose](); - mq2[Symbol.dispose](); - } -}); +import { Redis } from "ioredis"; -async function waitFor( - predicate: () => boolean, - timeoutMs: number, -): Promise { - const started = Date.now(); - while (!predicate()) { - await delay(500); - if (Date.now() - started > timeoutMs) { - throw new Error("Timeout"); - } - } -} +const dbUrl = process.env.REDIS_URL; + +test("RedisMessageQueue", { ignore: dbUrl == null }, () => + testMessageQueue( + () => + new RedisMessageQueue(() => new Redis(dbUrl!), { + pollInterval: { seconds: 1 }, + channelKey: getRandomKey("channel"), + queueKey: getRandomKey("queue"), + lockKey: getRandomKey("lock"), + }), + ({ mq1, mq2, controller }) => { + controller.abort(); + mq1[Symbol.dispose](); + mq2[Symbol.dispose](); + }, + )); diff --git a/packages/sqlite/deno.json b/packages/sqlite/deno.json index 83f9a6f3f..837c689db 100644 --- a/packages/sqlite/deno.json +++ b/packages/sqlite/deno.json @@ -4,7 +4,8 @@ "license": "MIT", "exports": { ".": "./src/mod.ts", - "./kv": "./src/kv.ts" + "./kv": "./src/kv.ts", + "./mq": "./src/mq.ts" }, "imports": { "#sqlite": "./src/sqlite.node.ts" @@ -22,6 +23,6 @@ }, "tasks": { "check": "deno fmt --check && deno lint && deno check", - "test": "deno test --allow-net --allow-env --doc --no-check=leaks" + "test": "deno test --allow-net --allow-env --allow-read --allow-write --doc --no-check=leaks" } } diff --git a/packages/sqlite/package.json b/packages/sqlite/package.json index dc6660961..da7e927b4 100644 --- a/packages/sqlite/package.json +++ b/packages/sqlite/package.json @@ -46,6 +46,16 @@ "require": "./dist/kv.cjs", "default": "./dist/kv.js" }, + "./mq": { + "types": { + "import": "./dist/mq.d.ts", + "require": "./dist/mq.d.cts", + "default": "./dist/mq.d.ts" + }, + "import": "./dist/mq.js", + "require": "./dist/mq.cjs", + "default": "./dist/mq.js" + }, "./package.json": "./package.json" }, "imports": { @@ -65,6 +75,7 @@ "@fedify/fedify": "workspace:^" }, "devDependencies": { + "@fedify/testing": "workspace:^", "@std/async": "catalog:", "tsdown": "catalog:", "typescript": "catalog:" diff --git a/packages/sqlite/src/mod.ts b/packages/sqlite/src/mod.ts index d43125f71..5794ec2b3 100644 --- a/packages/sqlite/src/mod.ts +++ b/packages/sqlite/src/mod.ts @@ -3,3 +3,4 @@ * @module */ export { SqliteKvStore } from "./kv.ts"; +export { SqliteMessageQueue } from "./mq.ts"; diff --git a/packages/sqlite/src/mq.test.ts b/packages/sqlite/src/mq.test.ts new file mode 100644 index 000000000..b14718401 --- /dev/null +++ b/packages/sqlite/src/mq.test.ts @@ -0,0 +1,23 @@ +import { PlatformDatabase } from "#sqlite"; +import { test } from "@fedify/fixture"; +import { SqliteMessageQueue } from "@fedify/sqlite/mq"; +import { getRandomKey, testMessageQueue } from "@fedify/testing"; +import { mkdtemp } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; + +const dbDir = await mkdtemp(join(tmpdir(), "fedify-sqlite-")); +const dbPath = join(dbDir, `${getRandomKey("sqlite")}.db`); +const db = new PlatformDatabase(dbPath); +const tableName = getRandomKey("message").replaceAll("-", "_"); + +test("SqliteMessageQueue", () => + testMessageQueue( + () => new SqliteMessageQueue(db, { tableName }), + ({ mq1, mq2, controller }) => { + controller.abort(); + mq1.drop(); + mq2.drop(); + mq1[Symbol.dispose](); + }, + )); diff --git a/packages/sqlite/src/mq.ts b/packages/sqlite/src/mq.ts new file mode 100644 index 000000000..137e031ca --- /dev/null +++ b/packages/sqlite/src/mq.ts @@ -0,0 +1,361 @@ +import { type PlatformDatabase, SqliteDatabase } from "#sqlite"; +import type { + MessageQueue, + MessageQueueEnqueueOptions, + MessageQueueListenOptions, +} from "@fedify/fedify"; +import { getLogger } from "@logtape/logtape"; +import type { SqliteDatabaseAdapter } from "./adapter.ts"; + +const logger = getLogger(["fedify", "sqlite", "mq"]); + +class EnqueueEvent extends Event { + readonly delayMs: number; + constructor(delayMs: number) { + super("enqueue"); + this.delayMs = delayMs; + } +} + +/** + * Options for the SQLite message queue. + */ +export interface SqliteMessageQueueOptions { + /** + * The table name to use for the message queue. + * Only letters, digits, and underscores are allowed. + * `"fedify_message"` by default. + * @default `"fedify_message"` + */ + tableName?: string; + + /** + * Whether the table has been initialized. `false` by default. + * @default `false` + */ + initialized?: boolean; + + /** + * The poll interval for the message queue. + * @default `{ seconds: 5 }` + */ + pollInterval?: Temporal.Duration | Temporal.DurationLike; +} + +/** + * A message queue that uses SQLite as the underlying storage. + * + * This implementation is designed for single-node deployments and uses + * polling to check for new messages. It is not suitable for high-throughput + * scenarios or distributed environments. + * + * @example + * ```ts ignore + * import { createFederation } from "@fedify/fedify"; + * import { SqliteMessageQueue } from "@fedify/sqlite"; + * import { DatabaseSync } from "node:sqlite"; + * + * const db = new DatabaseSync(":memory:"); + * const federation = createFederation({ + * // ... + * queue: new SqliteMessageQueue(db), + * }); + * ``` + */ +export class SqliteMessageQueue implements MessageQueue, Disposable { + static readonly #defaultTableName = "fedify_message"; + static readonly #tableNameRegex = /^[A-Za-z_][A-Za-z0-9_]{0,63}$/; + // In-memory event emitter for notifying listeners when messages are enqueued. + // Scoped per table name to allow multiple queues to coexist. + static readonly #notifyChannels = new Map(); + + static #getNotifyChannel(tableName: string): EventTarget { + let channel = SqliteMessageQueue.#notifyChannels.get(tableName); + if (channel == null) { + channel = new EventTarget(); + SqliteMessageQueue.#notifyChannels.set(tableName, channel); + } + return channel; + } + + readonly #db: SqliteDatabaseAdapter; + readonly #tableName: string; + readonly #pollIntervalMs: number; + #initialized: boolean; + + /** + * SQLite message queue does not provide native retry mechanisms. + */ + readonly nativeRetrial = false; + + /** + * Creates a new SQLite message queue. + * @param db The SQLite database to use. Supports `node:sqlite`, `bun:sqlite`. + * @param options The options for the message queue. + */ + constructor( + readonly db: PlatformDatabase, + readonly options: SqliteMessageQueueOptions = {}, + ) { + this.#db = new SqliteDatabase(db); + this.#initialized = options.initialized ?? false; + this.#tableName = options.tableName ?? SqliteMessageQueue.#defaultTableName; + this.#pollIntervalMs = Temporal.Duration.from( + options.pollInterval ?? { seconds: 5 }, + ).total("millisecond"); + + if (!SqliteMessageQueue.#tableNameRegex.test(this.#tableName)) { + throw new Error( + `Invalid table name for the message queue: ${this.#tableName}`, + ); + } + } + + /** + * {@inheritDoc MessageQueue.enqueue} + */ + enqueue( + // deno-lint-ignore no-explicit-any + message: any, + options?: MessageQueueEnqueueOptions, + ): Promise { + this.initialize(); + + const id = crypto.randomUUID(); + const encodedMessage = this.#encodeMessage(message); + const now = Temporal.Now.instant().epochMilliseconds; + const delay = options?.delay ?? Temporal.Duration.from({ seconds: 0 }); + const scheduled = now + delay.total({ unit: "milliseconds" }); + + if (options?.delay) { + logger.debug("Enqueuing a message with a delay of {delay}...", { + delay, + message, + }); + } else { + logger.debug("Enqueuing a message...", { message }); + } + + this.#db + .prepare( + `INSERT INTO "${this.#tableName}" (id, message, created, scheduled) + VALUES (?, ?, ?, ?)`, + ) + .run(id, encodedMessage, now, scheduled); + + logger.debug("Enqueued a message.", { message }); + + // Notify listeners that a message has been enqueued + const delayMs = delay.total("millisecond"); + SqliteMessageQueue.#getNotifyChannel(this.#tableName).dispatchEvent( + new EnqueueEvent(delayMs), + ); + + return Promise.resolve(); + } + + /** + * {@inheritDoc MessageQueue.enqueueMany} + */ + enqueueMany( + // deno-lint-ignore no-explicit-any + messages: readonly any[], + options?: MessageQueueEnqueueOptions, + ): Promise { + if (messages.length === 0) return Promise.resolve(); + + this.initialize(); + + const now = Temporal.Now.instant().epochMilliseconds; + const delay = options?.delay ?? Temporal.Duration.from({ seconds: 0 }); + const scheduled = now + delay.total({ unit: "milliseconds" }); + + if (options?.delay) { + logger.debug("Enqueuing messages with a delay of {delay}...", { + delay, + messages, + }); + } else { + logger.debug("Enqueuing messages...", { messages }); + } + + try { + this.#db.exec("BEGIN IMMEDIATE"); + + const stmt = this.#db.prepare( + `INSERT INTO "${this.#tableName}" (id, message, created, scheduled) + VALUES (?, ?, ?, ?)`, + ); + + for (const message of messages) { + const id = crypto.randomUUID(); + const encodedMessage = this.#encodeMessage(message); + stmt.run(id, encodedMessage, now, scheduled); + } + + this.#db.exec("COMMIT"); + logger.debug("Enqueued messages.", { messages }); + + // Notify listeners that messages have been enqueued + const delayMs = delay.total("millisecond"); + SqliteMessageQueue.#getNotifyChannel(this.#tableName).dispatchEvent( + new EnqueueEvent(delayMs), + ); + } catch (error) { + this.#db.exec("ROLLBACK"); + throw error; + } + return Promise.resolve(); + } + + /** + * {@inheritDoc MessageQueue.listen} + */ + async listen( + // deno-lint-ignore no-explicit-any + handler: (message: any) => Promise | void, + options?: MessageQueueListenOptions, + ): Promise { + this.initialize(); + + const { signal } = options ?? {}; + logger.debug( + "Starting to listen for messages on table {tableName}...", + { tableName: this.#tableName }, + ); + + const channel = SqliteMessageQueue.#getNotifyChannel(this.#tableName); + const timeouts = new Set>(); + + const poll = async () => { + while (signal == null || !signal.aborted) { + const now = Temporal.Now.instant().epochMilliseconds; + + // Atomically fetch and delete the oldest message that is ready to be + // processed using DELETE ... RETURNING (SQLite >= 3.35.0) + const result = this.#db + .prepare( + `DELETE FROM "${this.#tableName}" + WHERE id = ( + SELECT id FROM "${this.#tableName}" + WHERE scheduled <= ? + ORDER BY scheduled + LIMIT 1 + ) + RETURNING id, message`, + ) + .get(now) as { id: string; message: string } | undefined; + + if (result) { + const message = this.#decodeMessage(result.message); + logger.debug("Processing message {id}...", { + id: result.id, + message, + }); + await handler(message); + logger.debug("Processed message {id}.", { id: result.id }); + + // Check for next message immediately + continue; + } + + // No more messages ready to process + break; + } + }; + + const onEnqueue = (event: Event) => { + const delayMs = (event as EnqueueEvent).delayMs; + if (delayMs < 1) { + poll(); + } else { + timeouts.add(setTimeout(poll, delayMs)); + } + }; + + channel.addEventListener("enqueue", onEnqueue); + signal?.addEventListener("abort", () => { + channel.removeEventListener("enqueue", onEnqueue); + for (const timeout of timeouts) clearTimeout(timeout); + }); + + // Initial poll + await poll(); + + // Periodic polling as fallback + while (signal == null || !signal.aborted) { + let timeout: ReturnType | undefined; + await new Promise((resolve) => { + signal?.addEventListener("abort", resolve); + timeout = setTimeout(() => { + signal?.removeEventListener("abort", resolve); + resolve(0); + }, this.#pollIntervalMs); + timeouts.add(timeout); + }); + if (timeout != null) timeouts.delete(timeout); + await poll(); + } + + logger.debug("Stopped listening for messages on table {tableName}.", { + tableName: this.#tableName, + }); + } + + /** + * Creates the message queue table if it does not already exist. + * Does nothing if the table already exists. + */ + initialize(): void { + if (this.#initialized) { + return; + } + + logger.debug("Initializing the message queue table {tableName}...", { + tableName: this.#tableName, + }); + + this.#db.exec(` + CREATE TABLE IF NOT EXISTS "${this.#tableName}" ( + id TEXT PRIMARY KEY, + message TEXT NOT NULL, + created INTEGER NOT NULL, + scheduled INTEGER NOT NULL + ) + `); + + this.#db.exec(` + CREATE INDEX IF NOT EXISTS "idx_${this.#tableName}_scheduled" + ON "${this.#tableName}" (scheduled) + `); + + this.#initialized = true; + logger.debug("Initialized the message queue table {tableName}.", { + tableName: this.#tableName, + }); + } + + /** + * Drops the table used by the message queue. Does nothing if the table + * does not exist. + */ + drop(): void { + this.#db.exec(`DROP TABLE IF EXISTS "${this.#tableName}"`); + this.#initialized = false; + } + + /** + * Closes the database connection. + */ + [Symbol.dispose](): void { + this.#db.close(); + } + + #encodeMessage(message: unknown): string { + return JSON.stringify(message); + } + + #decodeMessage(message: string): unknown { + return JSON.parse(message); + } +} diff --git a/packages/sqlite/tsdown.config.ts b/packages/sqlite/tsdown.config.ts index 978459047..7f006fdf9 100644 --- a/packages/sqlite/tsdown.config.ts +++ b/packages/sqlite/tsdown.config.ts @@ -1,7 +1,13 @@ import { defineConfig } from "tsdown"; export default defineConfig({ - entry: ["src/mod.ts", "src/kv.ts", "src/sqlite.node.ts", "src/sqlite.bun.ts"], + entry: [ + "src/mod.ts", + "src/kv.ts", + "src/mq.ts", + "src/sqlite.node.ts", + "src/sqlite.bun.ts", + ], dts: true, unbundle: true, format: ["esm", "cjs"], diff --git a/packages/testing/deno.json b/packages/testing/deno.json index dc5fcb7e7..b618dcd1d 100644 --- a/packages/testing/deno.json +++ b/packages/testing/deno.json @@ -12,7 +12,9 @@ "pnpm-lock.yaml" ], "publish": { - "exclude": ["**/*.test.ts"] + "exclude": [ + "**/*.test.ts" + ] }, "tasks": { "build": "pnpm build", diff --git a/packages/testing/package.json b/packages/testing/package.json index be59f806f..b4ffe09d7 100644 --- a/packages/testing/package.json +++ b/packages/testing/package.json @@ -52,7 +52,11 @@ "peerDependencies": { "@fedify/fedify": "workspace:^" }, + "dependencies": { + "es-toolkit": "catalog:" + }, "devDependencies": { + "@fedify/fixture": "workspace:^", "@js-temporal/polyfill": "catalog:", "@std/assert": "catalog:", "@std/async": "catalog:", diff --git a/packages/testing/src/mod.ts b/packages/testing/src/mod.ts index eeef79f3c..dcd9a917f 100644 --- a/packages/testing/src/mod.ts +++ b/packages/testing/src/mod.ts @@ -29,3 +29,8 @@ export { createInboxContext, createRequestContext, } from "./mock.ts"; +export { + default as testMessageQueue, + getRandomKey, + waitFor, +} from "./mq-tester.ts"; diff --git a/packages/testing/src/mq-tester.ts b/packages/testing/src/mq-tester.ts new file mode 100644 index 000000000..2c4434a98 --- /dev/null +++ b/packages/testing/src/mq-tester.ts @@ -0,0 +1,153 @@ +import type { MessageQueue } from "@fedify/fedify"; +import { delay } from "es-toolkit"; +import { deepStrictEqual, ok } from "node:assert/strict"; + +/** + * Tests a {@link MessageQueue} implementation with a standard set of tests. + * + * This function runs tests for: + * - `enqueue()`: Basic message enqueueing + * - `enqueue()` with delay: Delayed message enqueueing + * - `enqueueMany()`: Bulk message enqueueing + * - `enqueueMany()` with delay: Delayed bulk message enqueueing + * - Multiple listeners: Ensures messages are processed by only one listener + * + * @example + * ```typescript ignore + * import { test } from "@fedify/fixture"; + * import { testMessageQueue } from "@fedify/testing"; + * import { MyMessageQueue } from "./my-mq.ts"; + * + * test("MyMessageQueue", () => + * testMessageQueue( + * () => new MyMessageQueue(), + * async ({ mq1, mq2, controller }) => { + * controller.abort(); + * await mq1.close(); + * await mq2.close(); + * }, + * ) + * ); + * ``` + * + * @param getMessageQueue A factory function that creates a new message queue + * instance. It should return a new instance each time + * to ensure test isolation, but both instances should + * share the same underlying storage/channel. + * @param onFinally A cleanup function called after all tests complete. + * It receives both message queue instances and the abort + * controller used for the listeners. + * @returns A promise that resolves when all tests pass. + */ +export default async function testMessageQueue< + MQ extends MessageQueue, +>( + getMessageQueue: () => MQ | Promise, + onFinally: ({ + mq1, + mq2, + controller, + }: { + mq1: MQ; + mq2: MQ; + controller: AbortController; + }) => Promise | void, +): Promise { + const mq1 = await getMessageQueue(); + const mq2 = await getMessageQueue(); + const controller = new AbortController(); + try { + // Set up message collection and listeners + const messages: string[] = []; + const listening1 = mq1.listen((message: string) => { + messages.push(message); + }, controller); + const listening2 = mq2.listen((message: string) => { + messages.push(message); + }, controller); + + // Test: enqueue() + await mq1.enqueue("Hello, world!"); + await waitFor(() => messages.length > 0, 15_000); + deepStrictEqual(messages, ["Hello, world!"]); + + let started = Date.now(); + await mq1.enqueue( + "Delayed message", + { delay: Temporal.Duration.from({ seconds: 3 }) }, + ); + await waitFor(() => messages.length > 1, 15_000); + deepStrictEqual(messages, ["Hello, world!", "Delayed message"]); + ok( + Date.now() - started >= 3_000, + "Delayed message should be delivered after at least 3 seconds", + ); + + // Test: enqueueMany() (skip if not supported) + if (mq1.enqueueMany != null) { + while (messages.length > 0) messages.pop(); + const batchMessages: string[] = [ + "First batch message", + "Second batch message", + "Third batch message", + ]; + await mq1.enqueueMany(batchMessages); + await waitFor(() => messages.length >= batchMessages.length, 15_000); + deepStrictEqual(new Set(messages), new Set(batchMessages)); + + // Test: enqueueMany() with delay + while (messages.length > 0) messages.pop(); + started = Date.now(); + const delayedBatchMessages: string[] = [ + "Delayed batch 1", + "Delayed batch 2", + ]; + await mq1.enqueueMany( + delayedBatchMessages, + { delay: Temporal.Duration.from({ seconds: 2 }) }, + ); + await waitFor( + () => messages.length >= delayedBatchMessages.length, + 15_000, + ); + deepStrictEqual(new Set(messages), new Set(delayedBatchMessages)); + ok( + Date.now() - started >= 2_000, + "Delayed batch messages should be delivered after at least 2 seconds", + ); + } + + // Test: bulk enqueue (stress test) + while (messages.length > 0) messages.pop(); + const bulkCount = 100; + for (let i = 0; i < bulkCount; i++) await mq1.enqueue(`message-${i}`); + await waitFor(() => messages.length >= bulkCount, 30_000); + const expectedMessages = new Set( + Array.from({ length: bulkCount }, (_, i) => `message-${i}`), + ); + deepStrictEqual(new Set(messages), expectedMessages); + + // Cleanup listeners + controller.abort(); + await listening1; + await listening2; + } finally { + await onFinally({ mq1, mq2, controller }); + } +} + +export async function waitFor( + predicate: () => boolean, + timeoutMs: number, +): Promise { + const started = Date.now(); + while (!predicate()) { + await delay(500); + if (Date.now() - started > timeoutMs) { + throw new Error("Timeout"); + } + } +} + +export const getRandomKey = (prefix: string): string => + `fedify_test_${prefix}_${crypto.randomUUID()}`; diff --git a/packages/testing/tsdown.config.ts b/packages/testing/tsdown.config.ts index 516eb89d6..b96b3814e 100644 --- a/packages/testing/tsdown.config.ts +++ b/packages/testing/tsdown.config.ts @@ -12,5 +12,18 @@ export default defineConfig({ "@fedify/fedify/utils", "@fedify/vocab", "@fedify/fedify/webfinger", + "@fedify/fixture", ], + outputOptions(outputOptions, format) { + if (format === "cjs") { + outputOptions.intro = ` + const { Temporal } = require("@js-temporal/polyfill"); + `; + } else { + outputOptions.intro = ` + import { Temporal } from "@js-temporal/polyfill"; + `; + } + return outputOptions; + }, }); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index b3660c1c9..7c84413a7 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -668,6 +668,9 @@ importers: '@alinea/suite': specifier: ^0.6.3 version: 0.6.3 + '@fedify/testing': + specifier: workspace:^ + version: link:../testing '@js-temporal/polyfill': specifier: 'catalog:' version: 0.5.1 @@ -732,6 +735,9 @@ importers: '@fedify/vocab-tools': specifier: workspace:* version: link:../vocab-tools + '@fedify/webfinger': + specifier: workspace:* + version: link:../webfinger '@fxts/core': specifier: 'catalog:' version: 1.20.0 @@ -1150,6 +1156,12 @@ importers: specifier: 'catalog:' version: 3.4.7 devDependencies: + '@fedify/fixture': + specifier: workspace:^ + version: link:../fixture + '@fedify/testing': + specifier: workspace:^ + version: link:../testing '@std/async': specifier: 'catalog:' version: '@jsr/std__async@1.0.13' @@ -1178,6 +1190,9 @@ importers: '@fedify/fixture': specifier: workspace:^ version: link:../fixture + '@fedify/testing': + specifier: workspace:^ + version: link:../testing '@std/async': specifier: 'catalog:' version: '@jsr/std__async@1.0.13' @@ -1234,6 +1249,9 @@ importers: specifier: ^1.31.0 version: 1.39.5 devDependencies: + '@fedify/testing': + specifier: workspace:^ + version: link:../testing '@std/async': specifier: 'catalog:' version: '@jsr/std__async@1.0.13' @@ -1265,7 +1283,13 @@ importers: '@fedify/fedify': specifier: workspace:^ version: link:../fedify + es-toolkit: + specifier: 'catalog:' + version: 1.43.0 devDependencies: + '@fedify/fixture': + specifier: workspace:^ + version: link:../fixture '@js-temporal/polyfill': specifier: 'catalog:' version: 0.5.1