From 9347caaf6254e981ae746ea5571dacd117772c38 Mon Sep 17 00:00:00 2001 From: Rafael Cardenas Date: Mon, 23 Jun 2025 11:20:09 -0600 Subject: [PATCH 1/2] feat: support sql transactions for multiple dbs --- src/helpers/values.ts | 16 ++++++++++++++++ src/postgres/base-pg-store.ts | 19 ++++++++++--------- 2 files changed, 26 insertions(+), 9 deletions(-) diff --git a/src/helpers/values.ts b/src/helpers/values.ts index 7b8a22e..f9bd74c 100644 --- a/src/helpers/values.ts +++ b/src/helpers/values.ts @@ -142,3 +142,19 @@ export function toEnumValue(enm: { [s: string]: T }, value: string): T | unde ? (value as unknown as T) : undefined; } + +/** + * Unwraps a value that may be undefined or null. + * @param val - The value to unwrap + * @param onNullish - Callback to throw an error if the value is null or undefined + * @returns The unwrapped value + */ +export function unwrap(val: T | null, onNullish?: () => string): Exclude { + if (val === undefined) { + throw new Error(onNullish?.() ?? 'value is undefined'); + } + if (val === null) { + throw new Error(onNullish?.() ?? 'value is null'); + } + return val as Exclude; +} diff --git a/src/postgres/base-pg-store.ts b/src/postgres/base-pg-store.ts index 05d4585..5ce3623 100644 --- a/src/postgres/base-pg-store.ts +++ b/src/postgres/base-pg-store.ts @@ -8,8 +8,7 @@ import { isProdEnv } from '../helpers/values'; */ export const sqlTransactionContext = new AsyncLocalStorage(); type SqlTransactionContext = { - usageName: string; - sql: PgSqlClient; + [dbName: string]: PgSqlClient; }; type UnwrapPromiseArray = T extends any[] ? { @@ -26,8 +25,9 @@ export abstract class BasePgStore { * async context will be returned to guarantee transaction consistency. */ get sql(): PgSqlClient { + const dbName = this._sql.options.database?.toString() ?? 'default'; const sqlContext = sqlTransactionContext.getStore(); - return sqlContext ? sqlContext.sql : this._sql; + return sqlContext ? sqlContext[dbName] : this._sql; } private readonly _sql: PgSqlClient; @@ -52,15 +52,16 @@ export abstract class BasePgStore { callback: (sql: PgSqlClient) => T | Promise, readOnly = true ): Promise> { - // Do we have a scoped client already? Use it directly. - const sqlContext = sqlTransactionContext.getStore(); - if (sqlContext) { - return callback(sqlContext.sql) as UnwrapPromiseArray; + // Do we have a scoped client already? Use it directly. Key is the database name. + const dbName = this._sql.options.database?.toString() ?? 'default'; + const sql = sqlTransactionContext.getStore()?.[dbName]; + if (sql) { + return callback(sql) as UnwrapPromiseArray; } // Otherwise, start a transaction and store the scoped connection in the current async context. - const usageName = this._sql.options.connection.application_name ?? ''; return this._sql.begin(readOnly ? 'read only' : 'read write', sql => { - return sqlTransactionContext.run({ usageName, sql }, () => callback(sql)); + const currentStore = sqlTransactionContext.getStore() ?? {}; + return sqlTransactionContext.run({ ...currentStore, [dbName]: sql }, () => callback(sql)); }); } From e953707915645ee0a4acea8b7069d8e303953d0f Mon Sep 17 00:00:00 2001 From: Rafael Cardenas Date: Mon, 23 Jun 2025 11:27:36 -0600 Subject: [PATCH 2/2] fix: test --- src/postgres/__tests__/base-pg-store.test.ts | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/postgres/__tests__/base-pg-store.test.ts b/src/postgres/__tests__/base-pg-store.test.ts index aa92041..a1661d6 100644 --- a/src/postgres/__tests__/base-pg-store.test.ts +++ b/src/postgres/__tests__/base-pg-store.test.ts @@ -92,21 +92,18 @@ describe('BasePgStore', () => { }); test('postgres transaction connection integrity', async () => { - const usageName = 'postgres:test;datastore-crud'; const obj = db.sql; + const dbName = obj.options.database; expect(sqlTransactionContext.getStore()).toBeUndefined(); await db.sqlTransaction(async sql => { - // Transaction flag is open. - expect(sqlTransactionContext.getStore()?.usageName).toBe(usageName); // New connection object. const newObj = sql; expect(obj).not.toEqual(newObj); - expect(sqlTransactionContext.getStore()?.sql).toEqual(newObj); + expect(sqlTransactionContext.getStore()?.[dbName]).toEqual(newObj); // Nested tx uses the same connection object. await db.sqlTransaction(sql => { - expect(sqlTransactionContext.getStore()?.usageName).toBe(usageName); expect(newObj).toEqual(sql); });