Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/main.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ jobs:
- 6379:6379
env:
AMQP_URL: amqp://guest:guest@localhost:5672
DATABASE_URL: postgres://postgres:postgres@localhost:5432/postgres
POSTGRES_URL: postgres://postgres:postgres@localhost:5432/postgres
REDIS_URL: redis://localhost:6379
steps:
- uses: actions/checkout@v4
Expand Down Expand Up @@ -116,7 +116,7 @@ jobs:
- 6379:6379
env:
AMQP_URL: amqp://guest:guest@localhost:5672
DATABASE_URL: postgres://postgres:postgres@localhost:5432/postgres
POSTGRES_URL: postgres://postgres:postgres@localhost:5432/postgres
REDIS_URL: redis://localhost:6379
steps:
- uses: actions/checkout@v4
Expand Down Expand Up @@ -157,7 +157,7 @@ jobs:
- 6379:6379
env:
AMQP_URL: amqp://guest:guest@localhost:5672
DATABASE_URL: postgres://postgres:postgres@localhost:5432/postgres
POSTGRES_URL: postgres://postgres:postgres@localhost:5432/postgres
REDIS_URL: redis://localhost:6379
steps:
- uses: actions/checkout@v4
Expand Down
26 changes: 26 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,32 @@ To be released.

[#437]: https://github.com/fedify-dev/fedify/issues/437

### @fedify/sqlite

- Added `SqliteMessageQueue` class implementing `MessageQueue` interface
using SQLite as the backing store. This implementation uses polling to
check for new messages and is suitable for single-node deployments and
development environments. [[#477], [#526] by ChanHaeng Lee]

- Added `SqliteMessageQueue` class.
- Added `SqliteMessageQueueOptions` interface.

[#477]: https://github.com/fedify-dev/fedify/issues/477
[#526]: https://github.com/fedify-dev/fedify/pull/526

### @fedify/testing

- Added `testMessageQueue()` utility function for standardized testing of
`MessageQueue` implementations. This function provides a reusable test
harness that covers common message queue operations including `enqueue()`,
`enqueue()` with delay, `enqueueMany()`, and multiple listener scenarios.
[[#477], [#526] by ChanHaeng Lee]

- Added `testMessageQueue()` function.
- Added `TestMessageQueueOptions` interface.
- Added `waitFor()` helper function.
- Added `getRandomKey()` helper function.


Version 1.10.0
--------------
Expand Down
112 changes: 112 additions & 0 deletions docs/manual/mq.md
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,115 @@ const federation = createFederation({
[`AmqpMessageQueue`]: https://jsr.io/@fedify/amqp/doc/mq/~/AmqpMessageQueue
[RabbitMQ]: https://www.rabbitmq.com/

### `SqliteMessageQueue`

*This API is available since Fedify 2.0.0.*

To use [`SqliteMessageQueue`], you need to install the *@fedify/sqlite* package
first:

::: code-group

~~~~ bash [Deno]
deno add jsr:@fedify/sqlite
~~~~

~~~~ bash [npm]
npm add @fedify/sqlite
~~~~

~~~~ bash [pnpm]
pnpm add @fedify/sqlite
~~~~

~~~~ bash [Yarn]
yarn add @fedify/sqlite
~~~~

~~~~ bash [Bun]
bun add @fedify/sqlite
~~~~

:::

[`SqliteMessageQueue`] is a message queue implementation that uses SQLite as
the backend. It uses polling to check for new messages and is designed for
single-node deployments. It's suitable for development, testing, and
small-scale production use where simplicity is preferred over high throughput.
It uses native sqlite modules, [`node:sqlite`] for Node.js and Deno,
[`bun:sqlite`] for Bun.

Best for
: Development and testing.

Pros
: Simple, persistent with minimal configuration.

Cons
: Limited scalability, not suitable for high-traffic production.

> [!NOTE]
> `SqliteMessageQueue` uses `DELETE ... RETURNING` to atomically fetch and
> delete the oldest message that is ready to be processed. This requires
> SQLite 3.35.0 or later.

::: code-group

~~~~ typescript twoslash [Deno]
import type { KvStore } from "@fedify/fedify";
// ---cut-before---
import { DatabaseSync } from "node:sqlite";
import { createFederation } from "@fedify/fedify";
import { SqliteMessageQueue } from "@fedify/sqlite";

const db = new DatabaseSync(":memory:");
const federation = createFederation<void>({
// ...
// ---cut-start---
kv: null as unknown as KvStore,
// ---cut-end---
queue: new SqliteMessageQueue(db), // [!code highlight]
});
~~~~

~~~~ typescript twoslash [Node.js]
import type { KvStore } from "@fedify/fedify";
// ---cut-before---
import { DatabaseSync } from "node:sqlite";
import { createFederation } from "@fedify/fedify";
import { SqliteMessageQueue } from "@fedify/sqlite";

const db = new DatabaseSync(":memory:");
const federation = createFederation<void>({
// ...
// ---cut-start---
kv: null as unknown as KvStore,
// ---cut-end---
queue: new SqliteMessageQueue(db), // [!code highlight]
});
~~~~

~~~~ typescript [Bun]
import type { KvStore } from "@fedify/fedify";
// ---cut-before---
import { Database } from "bun:sqlite";
import { createFederation } from "@fedify/fedify";
import { SqliteMessageQueue } from "@fedify/sqlite";

const db = new Database(":memory:");
const federation = createFederation<void>({
// ...
// ---cut-start---
kv: null as unknown as KvStore,
// ---cut-end---
queue: new SqliteMessageQueue(db), // [!code highlight]
});
~~~~

:::

[`SqliteMessageQueue`]: https://jsr.io/@fedify/sqlite/doc/mq/~/SqliteMessageQueue

### `WorkersMessageQueue` (Cloudflare Workers only)

*This API is available since Fedify 1.6.0.*
Expand Down Expand Up @@ -659,6 +768,9 @@ The following implementations do not yet support native retry:
[`AmqpMessageQueue`]
: Native retry support planned for future release.

[`SqliteMessageQueue`]
: No native retry support (`~MessageQueue.nativeRetrial` is `false`).

`ParallelMessageQueue` inherits the `~MessageQueue.nativeRetrial` value from
the wrapped queue.

Expand Down
1 change: 1 addition & 0 deletions packages/amqp/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
},
"devDependencies": {
"@alinea/suite": "^0.6.3",
"@fedify/testing": "workspace:^",
"@js-temporal/polyfill": "catalog:",
"@std/assert": "catalog:",
"@std/async": "catalog:",
Expand Down
128 changes: 27 additions & 101 deletions packages/amqp/src/mq.test.ts
Original file line number Diff line number Diff line change
@@ -1,106 +1,45 @@
import { suite } from "@alinea/suite";
import { AmqpMessageQueue } from "@fedify/amqp/mq";
import * as temporal from "@js-temporal/polyfill";
import { getRandomKey, testMessageQueue, waitFor } from "@fedify/testing";
import { assert, assertEquals, assertFalse, assertGreater } from "@std/assert";
import { delay } from "@std/async/delay";
// @deno-types="npm:@types/amqplib"
import { type ChannelModel, connect } from "amqplib";
import process from "node:process";

let Temporal: typeof temporal.Temporal;
if ("Temporal" in globalThis) {
Temporal = globalThis.Temporal;
} else {
Temporal = temporal.Temporal;
}

const AMQP_URL = process.env.AMQP_URL;
const test = AMQP_URL ? suite(import.meta) : suite(import.meta).skip;

function getConnection(): Promise<ChannelModel> {
return connect(AMQP_URL!);
}

test("AmqpMessageQueue", {
sanitizeOps: false,
sanitizeExit: false,
sanitizeResources: false,
}, async () => {
const conn = await getConnection();
const conn2 = await getConnection();
const randomSuffix = Math.random().toString(36).substring(2);
const queue = `fedify_queue_${randomSuffix}`;
const delayedQueuePrefix = `fedify_delayed_${randomSuffix}_`;
const mq = new AmqpMessageQueue(conn, { queue, delayedQueuePrefix });
const mq2 = new AmqpMessageQueue(conn2, { queue, delayedQueuePrefix });

const messages1: string[] = [];
const messages2: string[] = [];
const allMessages: string[] = [];
const controller = new AbortController();
const listening = mq.listen((message: string) => {
messages1.push(message);
allMessages.push(message);
}, { signal: controller.signal });
const listening2 = mq2.listen((message: string) => {
messages2.push(message);
allMessages.push(message);
}, { signal: controller.signal });

await mq.enqueue("Hello, world!");

await waitFor(() => allMessages.length > 0, 15_000);

assertEquals(allMessages.includes("Hello, world!"), true);

// enqueue() with delay
const started = Date.now();
await mq.enqueue(
"Delayed message",
{ delay: Temporal.Duration.from({ seconds: 3 }) },
);

await waitFor(() => allMessages.includes("Delayed message"), 15_000);

// listen() with delay
assertEquals(allMessages.includes("Delayed message"), true);
assertGreater(Date.now() - started, 3_000);

await mq.enqueueMany(["Message 1", "Message 2", "Message 3"]);

await waitFor(() =>
allMessages.includes("Message 1") &&
allMessages.includes("Message 2") &&
allMessages.includes("Message 3"), 15_000);

// listen() after enqueueMany()
assertEquals(allMessages.includes("Message 1"), true);
assertEquals(allMessages.includes("Message 2"), true);
assertEquals(allMessages.includes("Message 3"), true);

// enqueueMany() with delay
const manyStarted = Date.now();
await mq.enqueueMany(
["Delayed batch 1", "Delayed batch 2"],
{ delay: Temporal.Duration.from({ seconds: 3 }) },
);

await waitFor(() =>
allMessages.includes("Delayed batch 1") &&
allMessages.includes("Delayed batch 2"), 15_000);

// listen() after enqueueMany() with delay
assertEquals(allMessages.includes("Delayed batch 1"), true);
assertEquals(allMessages.includes("Delayed batch 2"), true);
assertGreater(Date.now() - manyStarted, 3_000);

controller.abort();
await listening;
await listening2;

await conn.close();
await conn2.close();
});
const connections: ChannelModel[] = [];
const queue = getRandomKey("queue");
const delayedQueuePrefix = getRandomKey("delayed") + "_";

testMessageQueue(
"AmqpMessageQueue",
async () => {
const conn = await getConnection();
connections.push(conn);
return new AmqpMessageQueue(conn, { queue, delayedQueuePrefix });
},
async ({ controller }) => {
controller.abort();
for (const conn of connections) {
await conn.close();
}
},
{
test: (name, fn) =>
test(name, {
sanitizeOps: false,
sanitizeExit: false,
sanitizeResources: false,
}, fn),
},
);

test(
"AmqpMessageQueue [nativeRetrial: false]",
Expand Down Expand Up @@ -171,16 +110,3 @@ test(
assertGreater(i, 1);
},
);

async function waitFor(
predicate: () => boolean,
timeoutMs: number,
): Promise<void> {
const started = Date.now();
while (!predicate()) {
await delay(500);
if (Date.now() - started > timeoutMs) {
throw new Error("Timeout");
}
}
}
1 change: 1 addition & 0 deletions packages/cli/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
"@fedify/vocab": "workspace:*",
"@fedify/vocab-runtime": "workspace:*",
"@fedify/vocab-tools": "workspace:*",
"@fedify/webfinger": "workspace:*",
"@fxts/core": "catalog:",
"@hongminhee/localtunnel": "^0.3.0",
"@inquirer/prompts": "^7.8.4",
Expand Down
4 changes: 2 additions & 2 deletions packages/cli/src/init/json/kv.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
"@fedify/postgres": { "PostgresKvStore": "PostgresKvStore" },
"postgres": { "default": "postgres" }
},
"object": "new PostgresKvStore(postgres(process.env.DATABASE_URL))",
"object": "new PostgresKvStore(postgres(process.env.POSTGRES_URL))",
"env": {
"DATABASE_URL": "postgres://postgres@localhost:5432/postgres"
"POSTGRES_URL": "postgres://postgres@localhost:5432/postgres"
}
},
"denokv": {
Expand Down
4 changes: 2 additions & 2 deletions packages/cli/src/init/json/mq.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@
"default": "postgres"
}
},
"object": "new PostgresMessageQueue(postgres(process.env.DATABASE_URL))",
"object": "new PostgresMessageQueue(postgres(process.env.POSTGRES_URL))",
"env": {
"DATABASE_URL": "postgres://postgres@localhost:5432/postgres"
"POSTGRES_URL": "postgres://postgres@localhost:5432/postgres"
}
},
"amqp": {
Expand Down
Loading
Loading