diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 66698e3..e9fd972 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -27,7 +27,7 @@ jobs: - uses: actions/checkout@v5 - uses: actions/setup-node@v6 with: - node-version: 20 # EOL: Apr 30, 2026 + node-version: 22 # (Active LTS + previous LTS) - run: npm ci - run: npm run format - run: npm run lint diff --git a/package-lock.json b/package-lock.json index a0e9c4c..37088b6 100644 --- a/package-lock.json +++ b/package-lock.json @@ -868,6 +868,10 @@ "resolved": "packages/backend-postgres", "link": true }, + "node_modules/@openworkflow/backend-sqlite": { + "resolved": "packages/backend-sqlite", + "link": true + }, "node_modules/@pkgr/core": { "version": "0.2.9", "dev": true, @@ -4081,6 +4085,19 @@ "openworkflow": "^0.3.0" } }, + "packages/backend-sqlite": { + "name": "@openworkflow/backend-sqlite", + "version": "0.1.0", + "devDependencies": { + "openworkflow": "file:../openworkflow" + }, + "engines": { + "node": ">=22.5.0" + }, + "peerDependencies": { + "openworkflow": "^0.2.0" + } + }, "packages/openworkflow": { "version": "0.3.0", "license": "Apache-2.0", diff --git a/packages/backend-sqlite/backend.test.ts b/packages/backend-sqlite/backend.test.ts new file mode 100644 index 0000000..1a783f9 --- /dev/null +++ b/packages/backend-sqlite/backend.test.ts @@ -0,0 +1,983 @@ +import { BackendSqlite } from "./backend.js"; +import { randomUUID } from "node:crypto"; +import { StepAttempt, WorkflowRun } from "openworkflow"; +import { afterAll, beforeAll, describe, expect, test } from "vitest"; + +describe("BackendSqlite", () => { + let backend: BackendSqlite; + + beforeAll(() => { + backend = BackendSqlite.connect(":memory:", { + namespaceId: randomUUID(), + }); + }); + + afterAll(() => { + backend.stop(); + }); + + describe("createWorkflowRun()", () => { + test("creates a workflow run", async () => { + const expected: WorkflowRun = { + namespaceId: "", + id: "", + workflowName: randomUUID(), + version: randomUUID(), + status: "pending", + idempotencyKey: randomUUID(), + config: { key: "val" }, + context: { key: "val" }, + input: { key: "val" }, + output: null, + error: null, + attempts: 0, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, + workerId: null, + availableAt: newDateInOneYear(), + deadlineAt: newDateInOneYear(), + startedAt: null, + finishedAt: null, + createdAt: new Date(), + updatedAt: new Date(), + }; + + // Create with all fields + const created = await backend.createWorkflowRun({ + workflowName: expected.workflowName, + version: expected.version, + idempotencyKey: expected.idempotencyKey, + input: expected.input, + config: expected.config, + context: expected.context, + availableAt: expected.availableAt, + deadlineAt: expected.deadlineAt, + }); + expect(created.namespaceId).toHaveLength(36); + expect(created.id).toHaveLength(36); + expect(deltaSeconds(created.availableAt)).toBeGreaterThan(1); + expect(deltaSeconds(created.createdAt)).toBeLessThan(1); + expect(deltaSeconds(created.updatedAt)).toBeLessThan(1); + + expected.namespaceId = created.namespaceId; + expected.id = created.id; + expected.availableAt = created.availableAt; + expected.createdAt = created.createdAt; + expected.updatedAt = created.updatedAt; + expect(created).toEqual(expected); + + // Create with minimal fields + const createdMin = await backend.createWorkflowRun({ + workflowName: expected.workflowName, + version: null, + idempotencyKey: null, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + }); + expect(createdMin.version).toBeNull(); + expect(createdMin.idempotencyKey).toBeNull(); + expect(createdMin.input).toBeNull(); + expect(createdMin.context).toBeNull(); + expect(deltaSeconds(createdMin.availableAt)).toBeLessThan(1); // defaults to NOW() + expect(createdMin.deadlineAt).toBeNull(); + }); + }); + + describe("claimWorkflowRun()", () => { + // because claims involve timing and leases, we create and teardown a new + // namespaced backend instance for each test + + test("claims workflow runs and respects leases, reclaiming if lease expires", async () => { + const backend = BackendSqlite.connect(":memory:", { + namespaceId: randomUUID(), + }); + + await createPendingWorkflowRun(backend); + + const firstLeaseMs = 30; + const firstWorker = randomUUID(); + const claimed = await backend.claimWorkflowRun({ + workerId: firstWorker, + leaseDurationMs: firstLeaseMs, + }); + expect(claimed?.status).toBe("running"); + expect(claimed?.workerId).toBe(firstWorker); + expect(claimed?.attempts).toBe(1); + expect(claimed?.startedAt).not.toBeNull(); + + const secondWorker = randomUUID(); + const blocked = await backend.claimWorkflowRun({ + workerId: secondWorker, + leaseDurationMs: 10, + }); + expect(blocked).toBeNull(); + + sleep(firstLeaseMs); + + const reclaimed = await backend.claimWorkflowRun({ + workerId: secondWorker, + leaseDurationMs: 10, + }); + expect(reclaimed?.id).toBe(claimed?.id); + expect(reclaimed?.attempts).toBe(2); + expect(reclaimed?.workerId).toBe(secondWorker); + expect(reclaimed?.startedAt?.getTime()).toBe( + claimed?.startedAt?.getTime(), + ); + + backend.stop(); + }); + + test("prioritizes pending workflow runs over expired running ones", async () => { + const backend = BackendSqlite.connect(":memory:", { + namespaceId: randomUUID(), + }); + + const running = await createPendingWorkflowRun(backend); + const runningClaim = await backend.claimWorkflowRun({ + workerId: "worker-running", + leaseDurationMs: 5, + }); + if (!runningClaim) throw new Error("expected claim"); + expect(runningClaim.id).toBe(running.id); + + sleep(10); // wait for running's lease to expire + + // pending claimed first, even though running expired + const pending = await createPendingWorkflowRun(backend); + const claimedFirst = await backend.claimWorkflowRun({ + workerId: "worker-second", + leaseDurationMs: 100, + }); + expect(claimedFirst?.id).toBe(pending.id); + + // running claimed second + const claimedSecond = await backend.claimWorkflowRun({ + workerId: "worker-third", + leaseDurationMs: 100, + }); + expect(claimedSecond?.id).toBe(running.id); + + backend.stop(); + }); + + test("returns null when no workflow runs are available", async () => { + const backend = BackendSqlite.connect(":memory:", { + namespaceId: randomUUID(), + }); + + const claimed = await backend.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 10, + }); + expect(claimed).toBeNull(); + + backend.stop(); + }); + }); + + describe("heartbeatWorkflowRun()", () => { + test("extends the lease for running workflow runs", async () => { + const workerId = randomUUID(); + await createPendingWorkflowRun(backend); + + const claimed = await backend.claimWorkflowRun({ + workerId, + leaseDurationMs: 20, + }); + if (!claimed) throw new Error("Expected workflow run to be claimed"); // for type narrowing + + const previousExpiry = claimed.availableAt; + const heartbeated = await backend.heartbeatWorkflowRun({ + workflowRunId: claimed.id, + workerId, + leaseDurationMs: 200, + }); + + expect(heartbeated.availableAt?.getTime()).toBeGreaterThan( + previousExpiry?.getTime() ?? Infinity, + ); + }); + }); + + describe("sleepWorkflowRun()", () => { + test("sets a running workflow to sleeping status until a future time", async () => { + const workerId = randomUUID(); + await createPendingWorkflowRun(backend); + + const claimed = await backend.claimWorkflowRun({ + workerId, + leaseDurationMs: 100, + }); + if (!claimed) throw new Error("Expected workflow run to be claimed"); + + const sleepUntil = new Date(Date.now() + 5000); // 5 seconds from now + + await backend.sleepWorkflowRun({ + workflowRunId: claimed.id, + workerId, + availableAt: sleepUntil, + }); + + const fetched = await backend.getWorkflowRun({ + workflowRunId: claimed.id, + }); + + expect(fetched).not.toBeNull(); + expect(fetched?.availableAt?.getTime()).toBe(sleepUntil.getTime()); + expect(fetched?.workerId).toBeNull(); + expect(fetched?.status).toBe("sleeping"); + }); + + test("fails when trying to sleep a canceled workflow", async () => { + const backend = BackendSqlite.connect(":memory:", { + namespaceId: randomUUID(), + }); + + // succeeded run + let claimed = await createClaimedWorkflowRun(backend); + await backend.markWorkflowRunSucceeded({ + workflowRunId: claimed.id, + workerId: claimed.workerId ?? "", + output: null, + }); + await expect(() => + backend.sleepWorkflowRun({ + workflowRunId: claimed.id, + workerId: claimed.workerId ?? "", + availableAt: new Date(Date.now() + 60_000), + }), + ).rejects.toThrow("Failed to sleep workflow run"); + + // failed run + claimed = await createClaimedWorkflowRun(backend); + await backend.markWorkflowRunFailed({ + workflowRunId: claimed.id, + workerId: claimed.workerId ?? "", + error: null, + }); + await expect(() => + backend.sleepWorkflowRun({ + workflowRunId: claimed.id, + workerId: claimed.workerId ?? "", + availableAt: new Date(Date.now() + 60_000), + }), + ).rejects.toThrow("Failed to sleep workflow run"); + + // canceled run + claimed = await createClaimedWorkflowRun(backend); + await backend.cancelWorkflowRun({ + workflowRunId: claimed.id, + }); + await expect(() => + backend.sleepWorkflowRun({ + workflowRunId: claimed.id, + workerId: claimed.workerId ?? "", + availableAt: new Date(Date.now() + 60_000), + }), + ).rejects.toThrow("Failed to sleep workflow run"); + + backend.stop(); + }); + }); + + describe("markWorkflowRunSucceeded()", () => { + test("marks running workflow runs as succeeded", async () => { + const workerId = randomUUID(); + await createPendingWorkflowRun(backend); + + const claimed = await backend.claimWorkflowRun({ + workerId, + leaseDurationMs: 20, + }); + if (!claimed) throw new Error("Expected workflow run to be claimed"); // for type narrowing + + const output = { ok: true }; + const succeeded = await backend.markWorkflowRunSucceeded({ + workflowRunId: claimed.id, + workerId, + output, + }); + + expect(succeeded.status).toBe("succeeded"); + expect(succeeded.output).toEqual(output); + expect(succeeded.error).toBeNull(); + expect(succeeded.finishedAt).not.toBeNull(); + expect(succeeded.availableAt).toBeNull(); + }); + }); + + describe("markWorkflowRunFailed()", () => { + test("reschedules workflow runs with exponential backoff on first failure", async () => { + const workerId = randomUUID(); + await createPendingWorkflowRun(backend); + + const claimed = await backend.claimWorkflowRun({ + workerId, + leaseDurationMs: 20, + }); + if (!claimed) throw new Error("Expected workflow run to be claimed"); + + const beforeFailTime = Date.now(); + + const error = { message: "boom" }; + const failed = await backend.markWorkflowRunFailed({ + workflowRunId: claimed.id, + workerId, + error, + }); + + // rescheduled, not permanently failed + expect(failed.status).toBe("pending"); + expect(failed.error).toEqual(error); + expect(failed.output).toBeNull(); + expect(failed.finishedAt).toBeNull(); + expect(failed.workerId).toBeNull(); + + expect(failed.availableAt).not.toBeNull(); + if (!failed.availableAt) throw new Error("Expected availableAt"); + const delayMs = failed.availableAt.getTime() - beforeFailTime; + expect(delayMs).toBeGreaterThanOrEqual(900); // ~1s with some tolerance + expect(delayMs).toBeLessThan(1500); + }); + + test("reschedules with increasing backoff on multiple failures (known slow test)", async () => { + // this test needs isolated namespace + const backend = BackendSqlite.connect(":memory:", { + namespaceId: randomUUID(), + }); + + await createPendingWorkflowRun(backend); + + // fail first attempt + let workerId = randomUUID(); + let claimed = await backend.claimWorkflowRun({ + workerId, + leaseDurationMs: 20, + }); + if (!claimed) throw new Error("Expected workflow run to be claimed"); + expect(claimed.attempts).toBe(1); + + const firstFailed = await backend.markWorkflowRunFailed({ + workflowRunId: claimed.id, + workerId, + error: { message: "first failure" }, + }); + + expect(firstFailed.status).toBe("pending"); + + sleep(1100); // wait for first backoff (~1s) + + // fail second attempt + workerId = randomUUID(); + claimed = await backend.claimWorkflowRun({ + workerId, + leaseDurationMs: 20, + }); + if (!claimed) throw new Error("Expected workflow run to be claimed"); + expect(claimed.attempts).toBe(2); + + const beforeSecondFail = Date.now(); + const secondFailed = await backend.markWorkflowRunFailed({ + workflowRunId: claimed.id, + workerId, + error: { message: "second failure" }, + }); + + expect(secondFailed.status).toBe("pending"); + + // second attempt should have ~2s backoff (1s * 2^1) + if (!secondFailed.availableAt) throw new Error("Expected availableAt"); + const delayMs = secondFailed.availableAt.getTime() - beforeSecondFail; + expect(delayMs).toBeGreaterThanOrEqual(1900); // ~2s with some tolerance + expect(delayMs).toBeLessThan(2500); + + backend.stop(); + }); + }); + + describe("createStepAttempt()", () => { + test("creates a step attempt", async () => { + const workflowRun = await createClaimedWorkflowRun(backend); + + const expected: StepAttempt = { + namespaceId: workflowRun.namespaceId, + id: "", + workflowRunId: workflowRun.id, + stepName: randomUUID(), + kind: "function", + status: "running", + config: { key: "val" }, + context: null, + output: null, + error: null, + childWorkflowRunNamespaceId: null, + childWorkflowRunId: null, + startedAt: null, + finishedAt: null, + createdAt: new Date(), + updatedAt: new Date(), + }; + + const created = await backend.createStepAttempt({ + workflowRunId: expected.workflowRunId, + workerId: workflowRun.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + stepName: expected.stepName, + kind: expected.kind, + config: expected.config, + context: expected.context, + }); + expect(created.id).toHaveLength(36); + expect(deltaSeconds(created.startedAt)).toBeLessThan(1); + expect(deltaSeconds(created.createdAt)).toBeLessThan(1); + expect(deltaSeconds(created.updatedAt)).toBeLessThan(1); + + expected.id = created.id; + expected.startedAt = created.startedAt; + expected.createdAt = created.createdAt; + expected.updatedAt = created.updatedAt; + expect(created).toEqual(expected); + }); + }); + + describe("listStepAttempts()", () => { + test("lists step attempts ordered by creation time", async () => { + const claimed = await createClaimedWorkflowRun(backend); + + const first = await backend.createStepAttempt({ + workflowRunId: claimed.id, + workerId: claimed.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + stepName: randomUUID(), + kind: "function", + config: {}, + context: null, + }); + await backend.markStepAttemptSucceeded({ + workflowRunId: claimed.id, + stepAttemptId: first.id, + workerId: claimed.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion, + output: { ok: true }, + }); + + const second = await backend.createStepAttempt({ + workflowRunId: claimed.id, + workerId: claimed.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + stepName: randomUUID(), + kind: "function", + config: {}, + context: null, + }); + + const listed = await backend.listStepAttempts({ + workflowRunId: claimed.id, + }); + expect(listed.map((step) => step.stepName)).toEqual([ + first.stepName, + second.stepName, + ]); + }); + }); + + describe("getStepAttempt()", () => { + test("returns a persisted step attempt", async () => { + const claimed = await createClaimedWorkflowRun(backend); + + const created = await backend.createStepAttempt({ + workflowRunId: claimed.id, + workerId: claimed.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + stepName: randomUUID(), + kind: "function", + config: {}, + context: null, + }); + + const got = await backend.getStepAttempt({ + stepAttemptId: created.id, + }); + expect(got).toEqual(created); + }); + }); + + describe("markStepAttemptSucceeded()", () => { + test("marks running step attempts as succeeded", async () => { + const claimed = await createClaimedWorkflowRun(backend); + + const created = await backend.createStepAttempt({ + workflowRunId: claimed.id, + workerId: claimed.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + stepName: randomUUID(), + kind: "function", + config: {}, + context: null, + }); + const output = { foo: "bar" }; + + const succeeded = await backend.markStepAttemptSucceeded({ + workflowRunId: claimed.id, + stepAttemptId: created.id, + workerId: claimed.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + output, + }); + + expect(succeeded.status).toBe("succeeded"); + expect(succeeded.output).toEqual(output); + expect(succeeded.error).toBeNull(); + expect(succeeded.finishedAt).not.toBeNull(); + + const fetched = await backend.getStepAttempt({ + stepAttemptId: created.id, + }); + expect(fetched?.status).toBe("succeeded"); + expect(fetched?.output).toEqual(output); + expect(fetched?.error).toBeNull(); + expect(fetched?.finishedAt).not.toBeNull(); + }); + }); + + describe("markStepAttemptFailed()", () => { + test("marks running step attempts as failed", async () => { + const claimed = await createClaimedWorkflowRun(backend); + + const created = await backend.createStepAttempt({ + workflowRunId: claimed.id, + workerId: claimed.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + stepName: randomUUID(), + kind: "function", + config: {}, + context: null, + }); + const error = { message: "nope" }; + + const failed = await backend.markStepAttemptFailed({ + workflowRunId: claimed.id, + stepAttemptId: created.id, + workerId: claimed.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + error, + }); + + expect(failed.status).toBe("failed"); + expect(failed.error).toEqual(error); + expect(failed.output).toBeNull(); + expect(failed.finishedAt).not.toBeNull(); + + const fetched = await backend.getStepAttempt({ + stepAttemptId: created.id, + }); + expect(fetched?.status).toBe("failed"); + expect(fetched?.error).toEqual(error); + expect(fetched?.output).toBeNull(); + expect(fetched?.finishedAt).not.toBeNull(); + }); + }); + + describe("deadline_at", () => { + test("creates a workflow run with a deadline", async () => { + const deadline = new Date(Date.now() + 60_000); // in 1 minute + const created = await backend.createWorkflowRun({ + workflowName: randomUUID(), + version: null, + idempotencyKey: null, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: deadline, + }); + + expect(created.deadlineAt).not.toBeNull(); + expect(created.deadlineAt?.getTime()).toBe(deadline.getTime()); + }); + + test("does not claim workflow runs past their deadline", async () => { + const backend = BackendSqlite.connect(":memory:", { + namespaceId: randomUUID(), + }); + + const pastDeadline = new Date(Date.now() - 1000); + await backend.createWorkflowRun({ + workflowName: randomUUID(), + version: null, + idempotencyKey: null, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: pastDeadline, + }); + + const claimed = await backend.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 1000, + }); + + expect(claimed).toBeNull(); + + backend.stop(); + }); + + test("marks deadline-expired workflow runs as failed when claiming", async () => { + const backend = BackendSqlite.connect(":memory:", { + namespaceId: randomUUID(), + }); + + const pastDeadline = new Date(Date.now() - 1000); + const created = await backend.createWorkflowRun({ + workflowName: randomUUID(), + version: null, + idempotencyKey: null, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: pastDeadline, + }); + + // attempt to claim triggers deadline check + const claimed = await backend.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 1000, + }); + expect(claimed).toBeNull(); + + // verify it was marked as failed + const failed = await backend.getWorkflowRun({ + workflowRunId: created.id, + }); + expect(failed?.status).toBe("failed"); + expect(failed?.error).toEqual({ + message: "Workflow run deadline exceeded", + }); + expect(failed?.finishedAt).not.toBeNull(); + expect(failed?.availableAt).toBeNull(); + + backend.stop(); + }); + + test("does not reschedule failed workflow runs if next retry would exceed deadline", async () => { + const backend = BackendSqlite.connect(":memory:", { + namespaceId: randomUUID(), + }); + + const deadline = new Date(Date.now() + 500); // 500ms from now + const created = await backend.createWorkflowRun({ + workflowName: randomUUID(), + version: null, + idempotencyKey: null, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: deadline, + }); + + const workerId = randomUUID(); + const claimed = await backend.claimWorkflowRun({ + workerId, + leaseDurationMs: 100, + }); + expect(claimed).not.toBeNull(); + + // should mark as permanently failed since retry backoff (1s) would exceed deadline (500ms) + const failed = await backend.markWorkflowRunFailed({ + workflowRunId: created.id, + workerId, + error: { message: "test error" }, + }); + + expect(failed.status).toBe("failed"); + expect(failed.availableAt).toBeNull(); + expect(failed.finishedAt).not.toBeNull(); + + backend.stop(); + }); + + test("reschedules failed workflow runs if retry would complete before deadline", async () => { + const backend = BackendSqlite.connect(":memory:", { + namespaceId: randomUUID(), + }); + + const deadline = new Date(Date.now() + 5000); // in 5 seconds + const created = await backend.createWorkflowRun({ + workflowName: randomUUID(), + version: null, + idempotencyKey: null, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: deadline, + }); + + const workerId = randomUUID(); + const claimed = await backend.claimWorkflowRun({ + workerId, + leaseDurationMs: 100, + }); + expect(claimed).not.toBeNull(); + + // should reschedule since retry backoff (1s) is before deadline (5s) + const failed = await backend.markWorkflowRunFailed({ + workflowRunId: created.id, + workerId, + error: { message: "test error" }, + }); + + expect(failed.status).toBe("pending"); + expect(failed.availableAt).not.toBeNull(); + expect(failed.finishedAt).toBeNull(); + + backend.stop(); + }); + }); + + describe("cancelWorkflowRun()", () => { + test("cancels a pending workflow run", async () => { + const backend = BackendSqlite.connect(":memory:", { + namespaceId: randomUUID(), + }); + + const created = await createPendingWorkflowRun(backend); + expect(created.status).toBe("pending"); + + const canceled = await backend.cancelWorkflowRun({ + workflowRunId: created.id, + }); + + expect(canceled.status).toBe("canceled"); + expect(canceled.workerId).toBeNull(); + expect(canceled.availableAt).toBeNull(); + expect(canceled.finishedAt).not.toBeNull(); + expect(deltaSeconds(canceled.finishedAt)).toBeLessThan(1); + + backend.stop(); + }); + + test("cancels a running workflow run", async () => { + const backend = BackendSqlite.connect(":memory:", { + namespaceId: randomUUID(), + }); + + const created = await createClaimedWorkflowRun(backend); + expect(created.status).toBe("running"); + expect(created.workerId).not.toBeNull(); + + const canceled = await backend.cancelWorkflowRun({ + workflowRunId: created.id, + }); + + expect(canceled.status).toBe("canceled"); + expect(canceled.workerId).toBeNull(); + expect(canceled.availableAt).toBeNull(); + expect(canceled.finishedAt).not.toBeNull(); + + backend.stop(); + }); + + test("cancels a sleeping workflow run", async () => { + const backend = BackendSqlite.connect(":memory:", { + namespaceId: randomUUID(), + }); + + const claimed = await createClaimedWorkflowRun(backend); + + // put workflow to sleep + const sleepUntil = new Date(Date.now() + 60_000); // 1 minute from now + const sleeping = await backend.sleepWorkflowRun({ + workflowRunId: claimed.id, + workerId: claimed.workerId ?? "", + availableAt: sleepUntil, + }); + expect(sleeping.status).toBe("sleeping"); + + const canceled = await backend.cancelWorkflowRun({ + workflowRunId: sleeping.id, + }); + + expect(canceled.status).toBe("canceled"); + expect(canceled.workerId).toBeNull(); + expect(canceled.availableAt).toBeNull(); + expect(canceled.finishedAt).not.toBeNull(); + + backend.stop(); + }); + + test("throws error when canceling a succeeded workflow run", async () => { + const backend = BackendSqlite.connect(":memory:", { + namespaceId: randomUUID(), + }); + + const claimed = await createClaimedWorkflowRun(backend); + + // mark as succeeded + await backend.markWorkflowRunSucceeded({ + workflowRunId: claimed.id, + workerId: claimed.workerId ?? "", + output: { result: "success" }, + }); + + await expect(() => + backend.cancelWorkflowRun({ + workflowRunId: claimed.id, + }), + ).rejects.toThrow(/Cannot cancel workflow run .* with status succeeded/); + + backend.stop(); + }); + + test("throws error when canceling a failed workflow run", async () => { + const backend = BackendSqlite.connect(":memory:", { + namespaceId: randomUUID(), + }); + + // create with deadline that's already passed to make it fail + const workflowWithDeadline = await backend.createWorkflowRun({ + workflowName: randomUUID(), + version: null, + idempotencyKey: null, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: new Date(Date.now() - 1000), // deadline in the past + }); + + // try to claim it, which should mark it as failed due to deadline + const claimed = await backend.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 100, + }); + + // if claim succeeds, manually fail it + if (claimed?.workerId) { + await backend.markWorkflowRunFailed({ + workflowRunId: claimed.id, + workerId: claimed.workerId, + error: { message: "test error" }, + }); + } + + // get a workflow that's definitely failed + const failedRun = await backend.getWorkflowRun({ + workflowRunId: workflowWithDeadline.id, + }); + + if (failedRun?.status === "failed") { + await expect(() => + backend.cancelWorkflowRun({ + workflowRunId: failedRun.id, + }), + ).rejects.toThrow(/Cannot cancel workflow run .* with status failed/); + } + + backend.stop(); + }); + + test("is idempotent when canceling an already canceled workflow run", async () => { + const backend = BackendSqlite.connect(":memory:", { + namespaceId: randomUUID(), + }); + + const created = await createPendingWorkflowRun(backend); + + const firstCancel = await backend.cancelWorkflowRun({ + workflowRunId: created.id, + }); + expect(firstCancel.status).toBe("canceled"); + + const secondCancel = await backend.cancelWorkflowRun({ + workflowRunId: created.id, + }); + expect(secondCancel.status).toBe("canceled"); + expect(secondCancel.id).toBe(firstCancel.id); + + backend.stop(); + }); + + test("throws error when canceling a non-existent workflow run", async () => { + const backend = BackendSqlite.connect(":memory:", { + namespaceId: randomUUID(), + }); + + const nonExistentId = randomUUID(); + + await expect(() => + backend.cancelWorkflowRun({ + workflowRunId: nonExistentId, + }), + ).rejects.toThrow(`Workflow run ${nonExistentId} does not exist`); + + backend.stop(); + }); + + test("canceled workflow is not claimed by workers", async () => { + const backend = BackendSqlite.connect(":memory:", { + namespaceId: randomUUID(), + }); + + const created = await createPendingWorkflowRun(backend); + + // cancel the workflow + await backend.cancelWorkflowRun({ + workflowRunId: created.id, + }); + + // try to claim work + const claimed = await backend.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 100, + }); + + // should not claim the canceled workflow + expect(claimed).toBeNull(); + + backend.stop(); + }); + }); +}); + +function deltaSeconds(date: Date | null | undefined): number { + if (!date) return Infinity; + return Math.abs((Date.now() - date.getTime()) / 1000); +} + +function newDateInOneYear() { + const d = new Date(); + d.setFullYear(d.getFullYear() + 1); + return d; +} + +function sleep(ms: number) { + const start = Date.now(); + while (Date.now() - start < ms) { + // busy wait for synchronous sleep + } +} + +async function createPendingWorkflowRun(backend: BackendSqlite) { + return await backend.createWorkflowRun({ + workflowName: randomUUID(), + version: null, + idempotencyKey: null, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + }); +} + +async function createClaimedWorkflowRun(backend: BackendSqlite) { + await createPendingWorkflowRun(backend); + + const claimed = await backend.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 100, + }); + + if (!claimed) throw new Error("Failed to claim workflow run"); + + return claimed; +} diff --git a/packages/backend-sqlite/backend.ts b/packages/backend-sqlite/backend.ts new file mode 100644 index 0000000..9a2b8b7 --- /dev/null +++ b/packages/backend-sqlite/backend.ts @@ -0,0 +1,777 @@ +import { + newDatabase, + Database, + migrate, + generateUUID, + now, + addMilliseconds, + toJSON, + fromJSON, + toISO, + fromISO, + DEFAULT_DATABASE_PATH, +} from "./sqlite.js"; +import { + DEFAULT_NAMESPACE_ID, + Backend, + CancelWorkflowRunParams, + ClaimWorkflowRunParams, + CreateStepAttemptParams, + CreateWorkflowRunParams, + GetStepAttemptParams, + GetWorkflowRunParams, + HeartbeatWorkflowRunParams, + ListStepAttemptsParams, + MarkStepAttemptFailedParams, + MarkStepAttemptSucceededParams, + MarkWorkflowRunFailedParams, + MarkWorkflowRunSucceededParams, + SleepWorkflowRunParams, + StepAttempt, + WorkflowRun, + DEFAULT_RETRY_POLICY, + JsonValue, +} from "openworkflow"; + +interface BackendSqliteOptions { + namespaceId?: string; + runMigrations?: boolean; +} + +/** + * Manages a connection to a SQLite database for workflow operations. + */ +export class BackendSqlite implements Backend { + private db: Database; + private namespaceId: string; + + private constructor(db: Database, namespaceId: string) { + this.db = db; + this.namespaceId = namespaceId; + } + + /** + * Create and initialize a new BackendSqlite instance. This will + * automatically run migrations on startup unless `runMigrations` is set to + * false. + */ + static connect( + path: string = DEFAULT_DATABASE_PATH, + options?: BackendSqliteOptions, + ): BackendSqlite { + const { namespaceId, runMigrations } = { + namespaceId: DEFAULT_NAMESPACE_ID, + runMigrations: true, + ...options, + }; + + const db = newDatabase(path); + + if (runMigrations) { + migrate(db); + } + + return new BackendSqlite(db, namespaceId); + } + + stop(): void { + this.db.close(); + } + + async createWorkflowRun( + params: CreateWorkflowRunParams, + ): Promise { + const id = generateUUID(); + const currentTime = now(); + const availableAt = params.availableAt + ? toISO(params.availableAt) + : currentTime; + + const stmt = this.db.prepare(` + INSERT INTO "workflow_runs" ( + "namespace_id", + "id", + "workflow_name", + "version", + "status", + "idempotency_key", + "config", + "context", + "input", + "attempts", + "available_at", + "deadline_at", + "created_at", + "updated_at" + ) + VALUES (?, ?, ?, ?, 'pending', ?, ?, ?, ?, 0, ?, ?, ?, ?) + `); + + stmt.run( + this.namespaceId, + id, + params.workflowName, + params.version, + params.idempotencyKey, + toJSON(params.config), + toJSON(params.context), + toJSON(params.input), + availableAt, + toISO(params.deadlineAt), + currentTime, + currentTime, + ); + + const workflowRun = await this.getWorkflowRun({ workflowRunId: id }); + if (!workflowRun) throw new Error("Failed to create workflow run"); + + return workflowRun; + } + + getWorkflowRun(params: GetWorkflowRunParams): Promise { + const stmt = this.db.prepare(` + SELECT * + FROM "workflow_runs" + WHERE "namespace_id" = ? AND "id" = ? + LIMIT 1 + `); + + const row = stmt.get(this.namespaceId, params.workflowRunId) as + | WorkflowRunRow + | undefined; + + return Promise.resolve(row ? rowToWorkflowRun(row) : null); + } + + async claimWorkflowRun( + params: ClaimWorkflowRunParams, + ): Promise { + const currentTime = now(); + const newAvailableAt = addMilliseconds(currentTime, params.leaseDurationMs); + + // SQLite doesn't have SKIP LOCKED, so we need to handle claims differently + this.db.exec("BEGIN IMMEDIATE"); + + try { + // 1. mark any deadline-expired workflow runs as failed + const expireStmt = this.db.prepare(` + UPDATE "workflow_runs" + SET + "status" = 'failed', + "error" = ?, + "worker_id" = NULL, + "available_at" = NULL, + "finished_at" = ?, + "updated_at" = ? + WHERE "namespace_id" = ? + AND "status" IN ('pending', 'running', 'sleeping') + AND "deadline_at" IS NOT NULL + AND "deadline_at" <= ? + `); + + expireStmt.run( + toJSON({ message: "Workflow run deadline exceeded" }), + currentTime, + currentTime, + this.namespaceId, + currentTime, + ); + + // 2. find an available workflow run to claim + const findStmt = this.db.prepare(` + SELECT "id" + FROM "workflow_runs" + WHERE "namespace_id" = ? + AND "status" IN ('pending', 'running', 'sleeping') + AND "available_at" <= ? + AND ("deadline_at" IS NULL OR "deadline_at" > ?) + ORDER BY + CASE WHEN "status" = 'pending' THEN 0 ELSE 1 END, + "available_at", + "created_at" + LIMIT 1 + `); + + const candidate = findStmt.get( + this.namespaceId, + currentTime, + currentTime, + ) as { id: string } | undefined; + + if (!candidate) { + this.db.exec("COMMIT"); + return null; + } + + // 3. claim the workflow run + const claimStmt = this.db.prepare(` + UPDATE "workflow_runs" + SET + "status" = 'running', + "attempts" = "attempts" + 1, + "worker_id" = ?, + "available_at" = ?, + "started_at" = COALESCE("started_at", ?), + "updated_at" = ? + WHERE "id" = ? + AND "namespace_id" = ? + `); + + claimStmt.run( + params.workerId, + newAvailableAt, + currentTime, + currentTime, + candidate.id, + this.namespaceId, + ); + + this.db.exec("COMMIT"); + + return await this.getWorkflowRun({ workflowRunId: candidate.id }); + } catch (error) { + this.db.exec("ROLLBACK"); + throw error; + } + } + + async heartbeatWorkflowRun( + params: HeartbeatWorkflowRunParams, + ): Promise { + const currentTime = now(); + const newAvailableAt = addMilliseconds(currentTime, params.leaseDurationMs); + + const stmt = this.db.prepare(` + UPDATE "workflow_runs" + SET + "available_at" = ?, + "updated_at" = ? + WHERE "namespace_id" = ? + AND "id" = ? + AND "status" = 'running' + AND "worker_id" = ? + `); + + const result = stmt.run( + newAvailableAt, + currentTime, + this.namespaceId, + params.workflowRunId, + params.workerId, + ); + + if (result.changes === 0) { + throw new Error("Failed to heartbeat workflow run"); + } + + const updated = await this.getWorkflowRun({ + workflowRunId: params.workflowRunId, + }); + if (!updated) throw new Error("Failed to heartbeat workflow run"); + + return updated; + } + + async sleepWorkflowRun(params: SleepWorkflowRunParams): Promise { + const currentTime = now(); + + const stmt = this.db.prepare(` + UPDATE "workflow_runs" + SET + "status" = 'sleeping', + "available_at" = ?, + "worker_id" = NULL, + "updated_at" = ? + WHERE "namespace_id" = ? + AND "id" = ? + AND "status" NOT IN ('succeeded', 'failed', 'canceled') + AND "worker_id" = ? + `); + + const result = stmt.run( + toISO(params.availableAt), + currentTime, + this.namespaceId, + params.workflowRunId, + params.workerId, + ); + + if (result.changes === 0) { + throw new Error("Failed to sleep workflow run"); + } + + const updated = await this.getWorkflowRun({ + workflowRunId: params.workflowRunId, + }); + if (!updated) throw new Error("Failed to sleep workflow run"); + + return updated; + } + + async markWorkflowRunSucceeded( + params: MarkWorkflowRunSucceededParams, + ): Promise { + const currentTime = now(); + + const stmt = this.db.prepare(` + UPDATE "workflow_runs" + SET + "status" = 'succeeded', + "output" = ?, + "error" = NULL, + "worker_id" = ?, + "available_at" = NULL, + "finished_at" = ?, + "updated_at" = ? + WHERE "namespace_id" = ? + AND "id" = ? + AND "status" = 'running' + AND "worker_id" = ? + `); + + const result = stmt.run( + toJSON(params.output), + params.workerId, + currentTime, + currentTime, + this.namespaceId, + params.workflowRunId, + params.workerId, + ); + + if (result.changes === 0) { + throw new Error("Failed to mark workflow run succeeded"); + } + + const updated = await this.getWorkflowRun({ + workflowRunId: params.workflowRunId, + }); + if (!updated) throw new Error("Failed to mark workflow run succeeded"); + + return updated; + } + + async markWorkflowRunFailed( + params: MarkWorkflowRunFailedParams, + ): Promise { + const { workflowRunId, error } = params; + const { initialIntervalMs, backoffCoefficient, maximumIntervalMs } = + DEFAULT_RETRY_POLICY; + + const currentTime = now(); + + // Get the current workflow run to access attempts + const workflowRun = await this.getWorkflowRun({ workflowRunId }); + if (!workflowRun) throw new Error("Workflow run not found"); + + // Calculate retry delay + const backoffMs = + initialIntervalMs * + Math.pow(backoffCoefficient, workflowRun.attempts - 1); + const retryDelayMs = Math.min(backoffMs, maximumIntervalMs); + + // Determine if we should reschedule or permanently fail + const nextRetryTime = new Date(Date.now() + retryDelayMs); + const shouldRetry = + !workflowRun.deadlineAt || nextRetryTime < workflowRun.deadlineAt; + + const status = shouldRetry ? "pending" : "failed"; + const availableAt = shouldRetry ? nextRetryTime.toISOString() : null; + const finishedAt = shouldRetry ? null : currentTime; + + const stmt = this.db.prepare(` + UPDATE "workflow_runs" + SET + "status" = ?, + "available_at" = ?, + "finished_at" = ?, + "error" = ?, + "worker_id" = NULL, + "started_at" = NULL, + "updated_at" = ? + WHERE "namespace_id" = ? + AND "id" = ? + AND "status" = 'running' + AND "worker_id" = ? + `); + + const result = stmt.run( + status, + availableAt, + finishedAt, + toJSON(error), + currentTime, + this.namespaceId, + workflowRunId, + params.workerId, + ); + + if (result.changes === 0) { + throw new Error("Failed to mark workflow run failed"); + } + + const updated = await this.getWorkflowRun({ workflowRunId }); + if (!updated) throw new Error("Failed to mark workflow run failed"); + + return updated; + } + + async cancelWorkflowRun( + params: CancelWorkflowRunParams, + ): Promise { + const currentTime = now(); + + const stmt = this.db.prepare(` + UPDATE "workflow_runs" + SET + "status" = 'canceled', + "worker_id" = NULL, + "available_at" = NULL, + "finished_at" = ?, + "updated_at" = ? + WHERE "namespace_id" = ? + AND "id" = ? + AND "status" IN ('pending', 'running', 'sleeping') + `); + + const result = stmt.run( + currentTime, + currentTime, + this.namespaceId, + params.workflowRunId, + ); + + if (result.changes === 0) { + // workflow may already be in a terminal state + const existing = await this.getWorkflowRun({ + workflowRunId: params.workflowRunId, + }); + if (!existing) { + throw new Error(`Workflow run ${params.workflowRunId} does not exist`); + } + + // if already canceled, just return it + if (existing.status === "canceled") { + return existing; + } + + // throw error for succeeded/failed workflows + if (["succeeded", "failed"].includes(existing.status)) { + throw new Error( + `Cannot cancel workflow run ${params.workflowRunId} with status ${existing.status}`, + ); + } + + throw new Error("Failed to cancel workflow run"); + } + + const updated = await this.getWorkflowRun({ + workflowRunId: params.workflowRunId, + }); + if (!updated) throw new Error("Failed to cancel workflow run"); + + return updated; + } + + listStepAttempts(params: ListStepAttemptsParams): Promise { + const stmt = this.db.prepare(` + SELECT * + FROM "step_attempts" + WHERE "namespace_id" = ? AND "workflow_run_id" = ? + ORDER BY "created_at" + `); + + const rows = stmt.all(this.namespaceId, params.workflowRunId); + + if (!Array.isArray(rows)) return Promise.resolve([]); + + return Promise.resolve( + rows.map((row) => rowToStepAttempt(row as unknown as StepAttemptRow)), + ); + } + + async createStepAttempt( + params: CreateStepAttemptParams, + ): Promise { + const id = generateUUID(); + const currentTime = now(); + + const stmt = this.db.prepare(` + INSERT INTO "step_attempts" ( + "namespace_id", + "id", + "workflow_run_id", + "step_name", + "kind", + "status", + "config", + "context", + "started_at", + "created_at", + "updated_at" + ) + VALUES (?, ?, ?, ?, ?, 'running', ?, ?, ?, ?, ?) + `); + + stmt.run( + this.namespaceId, + id, + params.workflowRunId, + params.stepName, + params.kind, + toJSON(params.config), + toJSON(params.context as JsonValue), + currentTime, + currentTime, + currentTime, + ); + + const stepAttempt = await this.getStepAttempt({ stepAttemptId: id }); + if (!stepAttempt) throw new Error("Failed to create step attempt"); + + return stepAttempt; + } + + getStepAttempt(params: GetStepAttemptParams): Promise { + const stmt = this.db.prepare(` + SELECT * + FROM "step_attempts" + WHERE "namespace_id" = ? AND "id" = ? + LIMIT 1 + `); + + const row = stmt.get(this.namespaceId, params.stepAttemptId) as + | StepAttemptRow + | undefined; + + return Promise.resolve(row ? rowToStepAttempt(row) : null); + } + + async markStepAttemptSucceeded( + params: MarkStepAttemptSucceededParams, + ): Promise { + const currentTime = now(); + + // Check that the workflow is running and owned by the worker + const workflowStmt = this.db.prepare(` + SELECT "id" + FROM "workflow_runs" + WHERE "namespace_id" = ? + AND "id" = ? + AND "status" = 'running' + AND "worker_id" = ? + `); + + const workflowRow = workflowStmt.get( + this.namespaceId, + params.workflowRunId, + params.workerId, + ) as { id: string } | undefined; + + if (!workflowRow) { + throw new Error("Failed to mark step attempt succeeded"); + } + + const stmt = this.db.prepare(` + UPDATE "step_attempts" + SET + "status" = 'succeeded', + "output" = ?, + "error" = NULL, + "finished_at" = ?, + "updated_at" = ? + WHERE "namespace_id" = ? + AND "workflow_run_id" = ? + AND "id" = ? + AND "status" = 'running' + `); + + const result = stmt.run( + toJSON(params.output), + currentTime, + currentTime, + this.namespaceId, + params.workflowRunId, + params.stepAttemptId, + ); + + if (result.changes === 0) { + throw new Error("Failed to mark step attempt succeeded"); + } + + const updated = await this.getStepAttempt({ + stepAttemptId: params.stepAttemptId, + }); + if (!updated) throw new Error("Failed to mark step attempt succeeded"); + + return updated; + } + + async markStepAttemptFailed( + params: MarkStepAttemptFailedParams, + ): Promise { + const currentTime = now(); + + // Check that the workflow is running and owned by the worker + const workflowStmt = this.db.prepare(` + SELECT "id" + FROM "workflow_runs" + WHERE "namespace_id" = ? + AND "id" = ? + AND "status" = 'running' + AND "worker_id" = ? + `); + + const workflowRow = workflowStmt.get( + this.namespaceId, + params.workflowRunId, + params.workerId, + ) as { id: string } | undefined; + + if (!workflowRow) { + throw new Error("Failed to mark step attempt failed"); + } + + const stmt = this.db.prepare(` + UPDATE "step_attempts" + SET + "status" = 'failed', + "output" = NULL, + "error" = ?, + "finished_at" = ?, + "updated_at" = ? + WHERE "namespace_id" = ? + AND "workflow_run_id" = ? + AND "id" = ? + AND "status" = 'running' + `); + + const result = stmt.run( + toJSON(params.error), + currentTime, + currentTime, + this.namespaceId, + params.workflowRunId, + params.stepAttemptId, + ); + + if (result.changes === 0) { + throw new Error("Failed to mark step attempt failed"); + } + + const updated = await this.getStepAttempt({ + stepAttemptId: params.stepAttemptId, + }); + if (!updated) throw new Error("Failed to mark step attempt failed"); + + return updated; + } +} + +// Row types for SQLite results +interface WorkflowRunRow { + namespace_id: string; + id: string; + workflow_name: string; + version: string | null; + status: string; + idempotency_key: string | null; + config: string; + context: string | null; + input: string | null; + output: string | null; + error: string | null; + attempts: number; + parent_step_attempt_namespace_id: string | null; + parent_step_attempt_id: string | null; + worker_id: string | null; + available_at: string | null; + deadline_at: string | null; + started_at: string | null; + finished_at: string | null; + created_at: string; + updated_at: string; +} + +interface StepAttemptRow { + namespace_id: string; + id: string; + workflow_run_id: string; + step_name: string; + kind: string; + status: string; + config: string; + context: string | null; + output: string | null; + error: string | null; + child_workflow_run_namespace_id: string | null; + child_workflow_run_id: string | null; + started_at: string | null; + finished_at: string | null; + created_at: string; + updated_at: string; +} + +// Conversion functions +function rowToWorkflowRun(row: WorkflowRunRow): WorkflowRun { + const createdAt = fromISO(row.created_at); + const updatedAt = fromISO(row.updated_at); + const config = fromJSON(row.config); + + if (!createdAt) throw new Error("createdAt is required"); + if (!updatedAt) throw new Error("updatedAt is required"); + if (config === null) throw new Error("config is required"); + + return { + namespaceId: row.namespace_id, + id: row.id, + workflowName: row.workflow_name, + version: row.version, + status: row.status as WorkflowRun["status"], + idempotencyKey: row.idempotency_key, + config: config as WorkflowRun["config"], + context: fromJSON(row.context) as WorkflowRun["context"], + input: fromJSON(row.input) as WorkflowRun["input"], + output: fromJSON(row.output) as WorkflowRun["output"], + error: fromJSON(row.error) as WorkflowRun["error"], + attempts: row.attempts, + parentStepAttemptNamespaceId: row.parent_step_attempt_namespace_id, + parentStepAttemptId: row.parent_step_attempt_id, + workerId: row.worker_id, + availableAt: fromISO(row.available_at), + deadlineAt: fromISO(row.deadline_at), + startedAt: fromISO(row.started_at), + finishedAt: fromISO(row.finished_at), + createdAt, + updatedAt, + }; +} + +function rowToStepAttempt(row: StepAttemptRow): StepAttempt { + const createdAt = fromISO(row.created_at); + const updatedAt = fromISO(row.updated_at); + const config = fromJSON(row.config); + + if (!createdAt) throw new Error("createdAt is required"); + if (!updatedAt) throw new Error("updatedAt is required"); + if (config === null) throw new Error("config is required"); + + return { + namespaceId: row.namespace_id, + id: row.id, + workflowRunId: row.workflow_run_id, + stepName: row.step_name, + kind: row.kind as StepAttempt["kind"], + status: row.status as StepAttempt["status"], + config: config as StepAttempt["config"], + context: fromJSON(row.context) as StepAttempt["context"], + output: fromJSON(row.output) as StepAttempt["output"], + error: fromJSON(row.error) as StepAttempt["error"], + childWorkflowRunNamespaceId: row.child_workflow_run_namespace_id, + childWorkflowRunId: row.child_workflow_run_id, + startedAt: fromISO(row.started_at), + finishedAt: fromISO(row.finished_at), + createdAt, + updatedAt, + }; +} diff --git a/packages/backend-sqlite/index.ts b/packages/backend-sqlite/index.ts new file mode 100644 index 0000000..cd7c692 --- /dev/null +++ b/packages/backend-sqlite/index.ts @@ -0,0 +1 @@ +export { BackendSqlite } from "./backend.js"; diff --git a/packages/backend-sqlite/package.json b/packages/backend-sqlite/package.json new file mode 100644 index 0000000..bcee5c7 --- /dev/null +++ b/packages/backend-sqlite/package.json @@ -0,0 +1,28 @@ +{ + "name": "@openworkflow/backend-sqlite", + "version": "0.1.0", + "type": "module", + "exports": { + ".": { + "development": "./index.ts", + "default": "./dist/index.js", + "types": "./dist/index.d.ts" + } + }, + "files": [ + "dist" + ], + "scripts": { + "build": "tsc", + "prepublishOnly": "npm run build" + }, + "devDependencies": { + "openworkflow": "file:../openworkflow" + }, + "peerDependencies": { + "openworkflow": "^0.2.0" + }, + "engines": { + "node": ">=22.5.0" + } +} diff --git a/packages/backend-sqlite/sqlite.test.ts b/packages/backend-sqlite/sqlite.test.ts new file mode 100644 index 0000000..a0f3cda --- /dev/null +++ b/packages/backend-sqlite/sqlite.test.ts @@ -0,0 +1,212 @@ +import { Database, newDatabase, migrations, migrate } from "./sqlite.js"; +import { randomUUID } from "node:crypto"; +import { unlinkSync } from "node:fs"; +import { afterEach, beforeEach, describe, expect, test } from "vitest"; + +// Helper to get the current migration version (exported for testing) +// Note: This function exists in sqlite.ts but isn't exported, so we'll +// test it indirectly through migrate() and by checking the migrations table +function getMigrationVersion(db: Database): number { + const existsStmt = db.prepare(` + SELECT COUNT(*) as count + FROM sqlite_master + WHERE type = 'table' AND name = 'openworkflow_migrations' + `); + const existsResult = existsStmt.get() as { count: number } | undefined; + if (!existsResult || existsResult.count === 0) return -1; + + const versionStmt = db.prepare( + `SELECT MAX("version") AS "version" FROM "openworkflow_migrations";`, + ); + const versionResult = versionStmt.get() as { version: number } | undefined; + return versionResult?.version ?? -1; +} + +describe("sqlite", () => { + let db: Database; + let dbPath: string; + + beforeEach(() => { + // Use a unique file path for each test to ensure isolation + dbPath = `/tmp/test_${randomUUID()}.db`; + db = newDatabase(dbPath); + }); + + afterEach(() => { + db.close(); + // Clean up the test database file + try { + unlinkSync(dbPath); + } catch { + // Ignore cleanup errors + } + }); + + describe("migrations()", () => { + test("returns migration SQL statements with correct table names", () => { + const migs = migrations(); + expect(migs.length).toBeGreaterThan(0); + + // Check that migrations reference the openworkflow_migrations table + for (const mig of migs) { + expect(mig).toContain("openworkflow_migrations"); + } + + // Verify first migration creates the migrations table + expect(migs[0]).toContain( + 'CREATE TABLE IF NOT EXISTS "openworkflow_migrations"', + ); + expect(migs[0]).toContain('"version"'); + }); + + test("migrations create workflow_runs and step_attempts tables", () => { + const migs = migrations(); + + // Migration 1 should create workflow_runs and step_attempts + const migration1 = migs[1]; + expect(migration1).toContain( + 'CREATE TABLE IF NOT EXISTS "workflow_runs"', + ); + expect(migration1).toContain( + 'CREATE TABLE IF NOT EXISTS "step_attempts"', + ); + }); + }); + + describe("migrate()", () => { + test("runs database migrations idempotently", () => { + // First migration + migrate(db); + const version1 = getMigrationVersion(db); + expect(version1).toBeGreaterThanOrEqual(0); + + // Second migration - should not cause errors + migrate(db); + const version2 = getMigrationVersion(db); + expect(version2).toBe(version1); // Version should not change + + // Third migration - should still work + migrate(db); + const version3 = getMigrationVersion(db); + expect(version3).toBe(version1); + }); + + test("tracks migration versions correctly", () => { + // Before migration, version should be -1 (table doesn't exist) + let version = getMigrationVersion(db); + expect(version).toBe(-1); + + // After migration, version should be the latest migration version + migrate(db); + version = getMigrationVersion(db); + + const allMigrations = migrations(); + const expectedLatestVersion = allMigrations.length - 1; + expect(version).toBe(expectedLatestVersion); + }); + + test("applies migrations incrementally", () => { + // Create the migrations table manually with version 0 + db.exec(` + CREATE TABLE IF NOT EXISTS "openworkflow_migrations" ( + "version" INTEGER NOT NULL PRIMARY KEY + ); + INSERT OR IGNORE INTO "openworkflow_migrations" ("version") + VALUES (0); + `); + + let version = getMigrationVersion(db); + expect(version).toBe(0); + + // Run migrate - should apply remaining migrations + migrate(db); + version = getMigrationVersion(db); + + const allMigrations = migrations(); + const expectedLatestVersion = allMigrations.length - 1; + expect(version).toBe(expectedLatestVersion); + }); + + test("creates all required tables after migration", () => { + migrate(db); + + // Check that migrations table exists + const migrationsCheck = db + .prepare( + ` + SELECT COUNT(*) as count + FROM sqlite_master + WHERE type = 'table' AND name = 'openworkflow_migrations' + `, + ) + .get() as { count: number }; + expect(migrationsCheck.count).toBe(1); + + // Check that workflow_runs table exists + const workflowRunsCheck = db + .prepare( + ` + SELECT COUNT(*) as count + FROM sqlite_master + WHERE type = 'table' AND name = 'workflow_runs' + `, + ) + .get() as { count: number }; + expect(workflowRunsCheck.count).toBe(1); + + // Check that step_attempts table exists + const stepAttemptsCheck = db + .prepare( + ` + SELECT COUNT(*) as count + FROM sqlite_master + WHERE type = 'table' AND name = 'step_attempts' + `, + ) + .get() as { count: number }; + expect(stepAttemptsCheck.count).toBe(1); + }); + }); + + describe("migration version tracking", () => { + test("migrations table stores version numbers correctly", () => { + migrate(db); + + const versionStmt = db.prepare( + `SELECT "version" FROM "openworkflow_migrations" ORDER BY "version";`, + ); + const versions = versionStmt.all() as { version: number }[]; + + // Should have all migration versions from 0 to latest + const allMigrations = migrations(); + const expectedLatestVersion = allMigrations.length - 1; + + expect(versions.length).toBe(expectedLatestVersion + 1); + for (let i = 0; i <= expectedLatestVersion; i++) { + const version = versions[i]; + expect(version).toBeDefined(); + expect(version?.version).toBe(i); + } + }); + + test("migrations can be run multiple times safely with INSERT OR IGNORE", () => { + migrate(db); + const versionAfterFirst = getMigrationVersion(db); + + // Run migrations again + migrate(db); + const versionAfterSecond = getMigrationVersion(db); + + expect(versionAfterSecond).toBe(versionAfterFirst); + + // Check that version entries aren't duplicated + const versionStmt = db.prepare( + `SELECT COUNT(*) as count FROM "openworkflow_migrations";`, + ); + const countResult = versionStmt.get() as { count: number }; + const allMigrations = migrations(); + const expectedCount = allMigrations.length; + expect(countResult.count).toBe(expectedCount); + }); + }); +}); diff --git a/packages/backend-sqlite/sqlite.ts b/packages/backend-sqlite/sqlite.ts new file mode 100644 index 0000000..899f90f --- /dev/null +++ b/packages/backend-sqlite/sqlite.ts @@ -0,0 +1,249 @@ +import { randomUUID } from "node:crypto"; +import { DatabaseSync } from "node:sqlite"; + +export type Database = DatabaseSync; + +export const DEFAULT_DATABASE_PATH = ":memory:"; + +/** + * newDatabase creates a new SQLite database connection. + */ +export function newDatabase(path: string = DEFAULT_DATABASE_PATH): Database { + const db = new DatabaseSync(path); + // Only enable WAL mode for file-based databases + if (path !== ":memory:") { + db.exec("PRAGMA journal_mode = WAL;"); + } + db.exec("PRAGMA foreign_keys = ON;"); + return db; +} + +/** + * migrations returns the list of migration SQL statements. + */ +export function migrations(): string[] { + return [ + // 0 - init + `BEGIN; + + CREATE TABLE IF NOT EXISTS "openworkflow_migrations" ( + "version" INTEGER NOT NULL PRIMARY KEY + ); + + INSERT OR IGNORE INTO "openworkflow_migrations" ("version") + VALUES (0); + + COMMIT;`, + + // 1 - add workflow_runs and step_attempts tables + `BEGIN; + + PRAGMA defer_foreign_keys = ON; + + CREATE TABLE IF NOT EXISTS "workflow_runs" ( + "namespace_id" TEXT NOT NULL, + "id" TEXT NOT NULL, + -- + "workflow_name" TEXT NOT NULL, + "version" TEXT, + "status" TEXT NOT NULL, + "idempotency_key" TEXT, + "config" TEXT NOT NULL, + "context" TEXT, + "input" TEXT, + "output" TEXT, + "error" TEXT, + "attempts" INTEGER NOT NULL, + "parent_step_attempt_namespace_id" TEXT, + "parent_step_attempt_id" TEXT, + "worker_id" TEXT, + "available_at" TEXT, + "deadline_at" TEXT, + "started_at" TEXT, + "finished_at" TEXT, + "created_at" TEXT NOT NULL, + "updated_at" TEXT NOT NULL, + PRIMARY KEY ("namespace_id", "id"), + FOREIGN KEY ("parent_step_attempt_namespace_id", "parent_step_attempt_id") + REFERENCES "step_attempts" ("namespace_id", "id") + ON DELETE SET NULL + ); + + CREATE TABLE IF NOT EXISTS "step_attempts" ( + "namespace_id" TEXT NOT NULL, + "id" TEXT NOT NULL, + -- + "workflow_run_id" TEXT NOT NULL, + "step_name" TEXT NOT NULL, + "kind" TEXT NOT NULL, + "status" TEXT NOT NULL, + "config" TEXT NOT NULL, + "context" TEXT, + "output" TEXT, + "error" TEXT, + "child_workflow_run_namespace_id" TEXT, + "child_workflow_run_id" TEXT, + "started_at" TEXT, + "finished_at" TEXT, + "created_at" TEXT NOT NULL, + "updated_at" TEXT NOT NULL, + PRIMARY KEY ("namespace_id", "id"), + FOREIGN KEY ("namespace_id", "workflow_run_id") + REFERENCES "workflow_runs" ("namespace_id", "id") + ON DELETE CASCADE, + FOREIGN KEY ("child_workflow_run_namespace_id", "child_workflow_run_id") + REFERENCES "workflow_runs" ("namespace_id", "id") + ON DELETE SET NULL + ); + + INSERT OR IGNORE INTO "openworkflow_migrations" ("version") + VALUES (1); + + COMMIT;`, + + // 2 - foreign keys + `BEGIN; + + -- Foreign keys are defined in migration 1 since SQLite requires them during table creation + -- This migration exists for version parity with PostgreSQL backend + + INSERT OR IGNORE INTO "openworkflow_migrations" ("version") + VALUES (2); + + COMMIT;`, + + // 3 - validate foreign keys + `BEGIN; + + -- Foreign key validation happens automatically in SQLite when PRAGMA foreign_keys = ON + -- This migration exists for version parity with PostgreSQL backend + + INSERT OR IGNORE INTO "openworkflow_migrations" ("version") + VALUES (3); + + COMMIT;`, + + // 4 - indexes + `BEGIN; + + CREATE INDEX IF NOT EXISTS "workflow_runs_status_available_at_created_at_idx" + ON "workflow_runs" ("namespace_id", "status", "available_at", "created_at"); + + CREATE INDEX IF NOT EXISTS "workflow_runs_workflow_name_idempotency_key_created_at_idx" + ON "workflow_runs" ("namespace_id", "workflow_name", "idempotency_key", "created_at"); + + CREATE INDEX IF NOT EXISTS "workflow_runs_parent_step_idx" + ON "workflow_runs" ("parent_step_attempt_namespace_id", "parent_step_attempt_id") + WHERE parent_step_attempt_namespace_id IS NOT NULL AND parent_step_attempt_id IS NOT NULL; + + CREATE INDEX IF NOT EXISTS "workflow_runs_created_at_desc_idx" + ON "workflow_runs" ("namespace_id", "created_at" DESC); + + CREATE INDEX IF NOT EXISTS "workflow_runs_status_created_at_desc_idx" + ON "workflow_runs" ("namespace_id", "status", "created_at" DESC); + + CREATE INDEX IF NOT EXISTS "workflow_runs_workflow_name_status_created_at_desc_idx" + ON "workflow_runs" ("namespace_id", "workflow_name", "status", "created_at" DESC); + + CREATE INDEX IF NOT EXISTS "step_attempts_workflow_run_created_at_idx" + ON "step_attempts" ("namespace_id", "workflow_run_id", "created_at"); + + CREATE INDEX IF NOT EXISTS "step_attempts_workflow_run_step_name_created_at_idx" + ON "step_attempts" ("namespace_id", "workflow_run_id", "step_name", "created_at"); + + CREATE INDEX IF NOT EXISTS "step_attempts_child_workflow_run_idx" + ON "step_attempts" ("child_workflow_run_namespace_id", "child_workflow_run_id") + WHERE child_workflow_run_namespace_id IS NOT NULL AND child_workflow_run_id IS NOT NULL; + + INSERT OR IGNORE INTO "openworkflow_migrations"("version") + VALUES (4); + + COMMIT;`, + ]; +} + +/** + * migrate applies pending migrations to the database. Does nothing if the + * database is already up to date. + */ +export function migrate(db: Database): void { + const currentMigrationVersion = getCurrentMigrationVersion(db); + + for (const [i, migrationSql] of migrations().entries()) { + if (i <= currentMigrationVersion) continue; // already applied + + db.exec(migrationSql); + } +} + +/** + * getCurrentMigrationVersion returns the current migration version of the database. + */ +function getCurrentMigrationVersion(db: Database): number { + // check if migrations table exists + const existsStmt = db.prepare(` + SELECT COUNT(*) as count + FROM sqlite_master + WHERE type = 'table' AND name = 'openworkflow_migrations' + `); + const existsResult = existsStmt.get() as { count: number } | undefined; + if (!existsResult || existsResult.count === 0) return -1; + + // get current version + const versionStmt = db.prepare( + `SELECT MAX("version") AS "version" FROM "openworkflow_migrations";`, + ); + const versionResult = versionStmt.get() as { version: number } | undefined; + return versionResult?.version ?? -1; +} + +/** + * Helper to generate UUIDs (SQLite doesn't have built-in UUID generation) + */ +export function generateUUID(): string { + return randomUUID(); +} + +/** + * Helper to get current timestamp in ISO8601 format + */ +export function now(): string { + return new Date().toISOString(); +} + +/** + * Helper to add milliseconds to a date and return ISO8601 string + */ +export function addMilliseconds(date: string, ms: number): string { + const d = new Date(date); + d.setMilliseconds(d.getMilliseconds() + ms); + return d.toISOString(); +} + +/** + * Helper to serialize JSON for SQLite storage + */ +export function toJSON(value: unknown): string | null { + return value === null || value === undefined ? null : JSON.stringify(value); +} + +/** + * Helper to deserialize JSON from SQLite storage + */ +export function fromJSON(value: string | null): unknown { + return value === null ? null : JSON.parse(value); +} + +/** + * Helper to convert Date to ISO8601 string for SQLite + */ +export function toISO(date: Date | null): string | null { + return date ? date.toISOString() : null; +} + +/** + * Helper to convert ISO8601 string from SQLite to Date + */ +export function fromISO(dateStr: string | null): Date | null { + return dateStr ? new Date(dateStr) : null; +} diff --git a/packages/backend-sqlite/tsconfig.json b/packages/backend-sqlite/tsconfig.json new file mode 100644 index 0000000..20f3425 --- /dev/null +++ b/packages/backend-sqlite/tsconfig.json @@ -0,0 +1,8 @@ +{ + "extends": ["../../tsconfig.json"], + "compilerOptions": { + "rootDir": ".", + "outDir": "dist" + }, + "include": ["**/*.ts"] +}