Skip to content

Commit 504d58f

Browse files
committed
feat: introduced session key
Signed-off-by: Logan Nguyen <[email protected]>
1 parent 2d5aec8 commit 504d58f

File tree

4 files changed

+121
-65
lines changed

4 files changed

+121
-65
lines changed

packages/relay/src/lib/clients/sdkClient.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,8 @@ export class SDKClient {
135135
* @param {string} originalCallerAddress - The address of the original caller making the request.
136136
* @param {number} networkGasPriceInWeiBars - The predefined gas price of the network in weibar.
137137
* @param {number} currentNetworkExchangeRateInCents - The exchange rate in cents of the current network.
138+
* @param {RawTxSynchronizeService} rawTxSynchronizeService - The service for managing transaction locks.
139+
* @param {string | null} lockSessionKey - The session key for the acquired lock, null if no lock was acquired.
138140
* @returns {Promise<{ txResponse: TransactionResponse; fileId: FileId | null }>}
139141
* @throws {SDKClientError} Throws an error if no file ID is created or if the preemptive fee check fails.
140142
*/
@@ -146,6 +148,7 @@ export class SDKClient {
146148
networkGasPriceInWeiBars: number,
147149
currentNetworkExchangeRateInCents: number,
148150
rawTxSynchronizeService: RawTxSynchronizeService,
151+
lockSessionKey: string | null,
149152
): Promise<{ txResponse: TransactionResponse; fileId: FileId | null }> {
150153
const jumboTxEnabled = ConfigService.get('JUMBO_TX_ENABLED');
151154
const ethereumTransactionData: EthereumTransactionData = EthereumTransactionData.fromBytes(transactionBuffer);
@@ -192,6 +195,7 @@ export class SDKClient {
192195
true,
193196
originalCallerAddress,
194197
rawTxSynchronizeService,
198+
lockSessionKey,
195199
),
196200
};
197201
}
@@ -268,7 +272,9 @@ export class SDKClient {
268272
* @param requestDetails - The request details for logging and tracking.
269273
* @param shouldThrowHbarLimit - Flag to indicate whether to check HBAR limits.
270274
* @param originalCallerAddress - The address of the original caller making the request.
275+
* @param rawTxSynchronizeService - The service for managing transaction locks.
271276
* @param estimatedTxFee - The optional total estimated transaction fee.
277+
* @param lockSessionKey - The session key for the acquired lock, null if no lock was acquired.
272278
* @returns - A promise that resolves to the transaction response.
273279
* @throws {SDKClientError} - Throws if an error occurs during transaction execution.
274280
*/
@@ -279,6 +285,7 @@ export class SDKClient {
279285
shouldThrowHbarLimit: boolean,
280286
originalCallerAddress: string,
281287
rawTxSynchronizeService?: RawTxSynchronizeService,
288+
lockSessionKey?: string | null,
282289
estimatedTxFee?: number,
283290
): Promise<TransactionResponse> {
284291
const txConstructorName = transaction.constructor.name;
@@ -337,8 +344,8 @@ export class SDKClient {
337344
return transactionResponse;
338345
} finally {
339346
// Eventually release the transaction mutex lock if it was acquired by the sender
340-
if (rawTxSynchronizeService) {
341-
await rawTxSynchronizeService.releaseLock(originalCallerAddress);
347+
if (rawTxSynchronizeService && lockSessionKey) {
348+
await rawTxSynchronizeService.releaseLock(originalCallerAddress, lockSessionKey);
342349
}
343350

344351
if (transactionId?.length) {

packages/relay/src/lib/services/ethService/transactionService/TransactionService.ts

Lines changed: 45 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -256,46 +256,57 @@ export class TransactionService implements ITransactionService {
256256

257257
const transactionBuffer = Buffer.from(this.prune0x(transaction), 'hex');
258258
const parsedTx = Precheck.parseRawTransaction(transaction);
259+
let lockSessionKey: string | null = null;
259260

260261
// Acquire a lock for the sender before any side effects or asynchronous calls to ensure proper nonce ordering
261262
if (parsedTx.from) {
262-
await this.rawTxSynchronizeService.acquireLock(parsedTx.from);
263+
lockSessionKey = await this.rawTxSynchronizeService.acquireLock(parsedTx.from);
263264
}
264265

265-
const networkGasPriceInWeiBars = Utils.addPercentageBufferToGasPrice(
266-
await this.common.getGasPriceInWeibars(requestDetails),
267-
);
266+
try {
267+
const networkGasPriceInWeiBars = Utils.addPercentageBufferToGasPrice(
268+
await this.common.getGasPriceInWeibars(requestDetails),
269+
);
268270

269-
await this.validateRawTransaction(parsedTx, networkGasPriceInWeiBars, requestDetails);
271+
await this.validateRawTransaction(parsedTx, networkGasPriceInWeiBars, requestDetails);
270272

271-
/**
272-
* Note: If the USE_ASYNC_TX_PROCESSING feature flag is enabled,
273-
* the transaction hash is calculated and returned immediately after passing all prechecks.
274-
* All transaction processing logic is then handled asynchronously in the background.
275-
*/
276-
const useAsyncTxProcessing = ConfigService.get('USE_ASYNC_TX_PROCESSING');
277-
if (useAsyncTxProcessing) {
278-
this.sendRawTransactionProcessor(
273+
/**
274+
* Note: If the USE_ASYNC_TX_PROCESSING feature flag is enabled,
275+
* the transaction hash is calculated and returned immediately after passing all prechecks.
276+
* All transaction processing logic is then handled asynchronously in the background.
277+
*/
278+
const useAsyncTxProcessing = ConfigService.get('USE_ASYNC_TX_PROCESSING');
279+
if (useAsyncTxProcessing) {
280+
this.sendRawTransactionProcessor(
281+
transactionBuffer,
282+
parsedTx,
283+
networkGasPriceInWeiBars,
284+
this.rawTxSynchronizeService,
285+
lockSessionKey,
286+
requestDetails,
287+
);
288+
return Utils.computeTransactionHash(transactionBuffer);
289+
}
290+
291+
/**
292+
* Note: If the USE_ASYNC_TX_PROCESSING feature flag is disabled,
293+
* wait for all transaction processing logic to complete before returning the transaction hash.
294+
*/
295+
return await this.sendRawTransactionProcessor(
279296
transactionBuffer,
280297
parsedTx,
281298
networkGasPriceInWeiBars,
282299
this.rawTxSynchronizeService,
300+
lockSessionKey,
283301
requestDetails,
284302
);
285-
return Utils.computeTransactionHash(transactionBuffer);
303+
} catch (error) {
304+
// Release the lock on any error to prevent lock starvation
305+
if (lockSessionKey && parsedTx.from) {
306+
await this.rawTxSynchronizeService.releaseLock(parsedTx.from, lockSessionKey);
307+
}
308+
throw error;
286309
}
287-
288-
/**
289-
* Note: If the USE_ASYNC_TX_PROCESSING feature flag is disabled,
290-
* wait for all transaction processing logic to complete before returning the transaction hash.
291-
*/
292-
return await this.sendRawTransactionProcessor(
293-
transactionBuffer,
294-
parsedTx,
295-
networkGasPriceInWeiBars,
296-
this.rawTxSynchronizeService,
297-
requestDetails,
298-
);
299310
}
300311

301312
/**
@@ -484,6 +495,8 @@ export class TransactionService implements ITransactionService {
484495
* @param {Buffer} transactionBuffer - The raw transaction data as a buffer.
485496
* @param {EthersTransaction} parsedTx - The parsed Ethereum transaction object.
486497
* @param {number} networkGasPriceInWeiBars - The current network gas price in wei bars.
498+
* @param {RawTxSynchronizeService} rawTxSynchronizeService - The service for managing transaction locks.
499+
* @param {string | null} lockSessionKey - The session key for the acquired lock, null if no lock was acquired.
487500
* @param {RequestDetails} requestDetails - Details of the request for logging and tracking purposes.
488501
* @returns {Promise<string | JsonRpcError>} A promise that resolves to the transaction hash if successful, or a JsonRpcError if an error occurs.
489502
*/
@@ -492,6 +505,7 @@ export class TransactionService implements ITransactionService {
492505
parsedTx: EthersTransaction,
493506
networkGasPriceInWeiBars: number,
494507
rawTxSynchronizeService: RawTxSynchronizeService,
508+
lockSessionKey: string | null,
495509
requestDetails: RequestDetails,
496510
): Promise<string | JsonRpcError> {
497511
let sendRawTransactionError: any;
@@ -507,6 +521,7 @@ export class TransactionService implements ITransactionService {
507521
originalCallerAddress,
508522
networkGasPriceInWeiBars,
509523
rawTxSynchronizeService,
524+
lockSessionKey,
510525
requestDetails,
511526
);
512527

@@ -653,6 +668,8 @@ export class TransactionService implements ITransactionService {
653668
* @param transactionBuffer The raw transaction buffer
654669
* @param originalCallerAddress The address of the original caller
655670
* @param networkGasPriceInWeiBars The current network gas price in wei bars
671+
* @param rawTxSynchronizeService The service for managing transaction locks
672+
* @param lockSessionKey The session key for the acquired lock, null if no lock was acquired
656673
* @param requestDetails The request details for logging and tracking
657674
* @returns {Promise<{txSubmitted: boolean, submittedTransactionId: string, error: any}>} A promise that resolves to an object containing transaction submission details
658675
*/
@@ -661,6 +678,7 @@ export class TransactionService implements ITransactionService {
661678
originalCallerAddress: string,
662679
networkGasPriceInWeiBars: number,
663680
rawTxSynchronizeService: RawTxSynchronizeService,
681+
lockSessionKey: string | null,
664682
requestDetails: RequestDetails,
665683
): Promise<{
666684
txSubmitted: boolean;
@@ -681,6 +699,7 @@ export class TransactionService implements ITransactionService {
681699
networkGasPriceInWeiBars,
682700
await this.getCurrentNetworkExchangeRateInCents(requestDetails),
683701
rawTxSynchronizeService,
702+
lockSessionKey,
684703
);
685704

686705
txSubmitted = true;

packages/relay/src/lib/services/hapiService/hapiService.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,7 @@ export default class HAPIService {
244244
networkGasPriceInWeiBars: number,
245245
currentNetworkExchangeRateInCents: number,
246246
rawTxSynchronizeService: RawTxSynchronizeService,
247+
lockSessionKey: string | null,
247248
): Promise<{ txResponse: TransactionResponse; fileId: FileId | null }> {
248249
return this.getSDKClient().submitEthereumTransaction(
249250
transactionBuffer,
@@ -253,6 +254,7 @@ export default class HAPIService {
253254
networkGasPriceInWeiBars,
254255
currentNetworkExchangeRateInCents,
255256
rawTxSynchronizeService,
257+
lockSessionKey,
256258
);
257259
}
258260

Lines changed: 65 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,33 @@
11
// SPDX-License-Identifier: Apache-2.0
22

33
import { Mutex, withTimeout } from 'async-mutex';
4+
import { randomUUID } from 'crypto';
45
import { LRUCache } from 'lru-cache';
56
import { Logger } from 'pino';
67

8+
/**
9+
* Represents the state of a lock for a specific sender address.
10+
* Encapsulates both the mutex and the active session keys for that lock.
11+
*/
12+
interface LockState {
13+
/** The mutex used for synchronization */
14+
mutex: Mutex;
15+
/** Set of active session keys that can release this lock */
16+
activeSessionKeys: Set<string>;
17+
}
18+
719
export class RawTxSynchronizeService {
8-
/** Default timeout for lock acquisition in milliseconds (30 seconds) */
9-
private static readonly DEFAULT_LOCK_TIMEOUT_MS = 30_000;
20+
/** Default timeout for lock acquisition in milliseconds (300 seconds) */
21+
private static readonly DEFAULT_LOCK_TIMEOUT_MS = 300_000;
1022

1123
/** Default TTL for inactive sender mutex entries in milliseconds (15 minutes) */
1224
private static readonly DEFAULT_LOCK_STATE_TTL_MS = 15 * 60 * 1000;
1325

1426
/** Maximum number of sender mutex entries to maintain concurrently */
1527
private static readonly MAX_LOCKS = 1000;
1628

17-
/** LRU cache with TTL for managing sender mutexes with automatic cleanup */
18-
private readonly localLockStates: LRUCache<string, Mutex>;
29+
/** LRU cache with TTL for managing sender lock states with automatic cleanup */
30+
private readonly localLockStates: LRUCache<string, LockState>;
1931

2032
/** Logger instance for debugging and monitoring lock operations */
2133
private readonly logger: Logger;
@@ -30,19 +42,21 @@ export class RawTxSynchronizeService {
3042
*/
3143
constructor(logger: Logger) {
3244
this.logger = logger;
33-
this.localLockStates = new LRUCache<string, Mutex>({
45+
this.localLockStates = new LRUCache<string, LockState>({
3446
max: RawTxSynchronizeService.MAX_LOCKS,
3547
ttl: RawTxSynchronizeService.DEFAULT_LOCK_STATE_TTL_MS,
36-
dispose: (mutex: Mutex, sender: string) => {
48+
dispose: (lockState: LockState, sender: string) => {
3749
// Clean up any active locks during eviction/expiration
38-
if (mutex.isLocked()) {
50+
if (lockState.mutex.isLocked()) {
3951
try {
40-
mutex.release();
52+
lockState.mutex.release();
4153
this.logger.debug(`Active lock auto-released during cleanup for sender: ${sender}`);
4254
} catch (error) {
4355
this.logger.warn(`Error auto-releasing lock during cleanup for sender: ${sender}`, error);
4456
}
4557
}
58+
// Clear all active sessions since the lock state is being disposed
59+
lockState.activeSessionKeys.clear();
4660
this.logger.debug(`Lock state evicted/expired for sender: ${sender}`);
4761
},
4862
});
@@ -51,26 +65,32 @@ export class RawTxSynchronizeService {
5165
/**
5266
* Acquires a mutex lock for the specified sender address with timeout.
5367
*
54-
* This method implements timeout-based lock acquisition. The call will wait for the mutex
55-
* to become available but will timeout after the configured duration to prevent
56-
* deadlocks and ensure resource availability.
68+
* This method implements timeout-based lock acquisition and generates a unique session key
69+
* to track the lock instance. The session key can be used to release the specific lock
70+
* and prevent double-release scenarios.
5771
*
5872
* @param sender - The sender address (wallet address) to acquire the lock for
59-
* @returns Promise that resolves when the lock is successfully acquired
73+
* @returns Promise that resolves to a unique session key when the lock is successfully acquired
6074
* @throws Error if the lock acquisition fails due to timeout or internal errors
6175
*/
62-
async acquireLock(sender: string): Promise<void> {
63-
const mutex = this.getOrCreateMutex(sender);
64-
const timeoutMutex = withTimeout(mutex, RawTxSynchronizeService.DEFAULT_LOCK_TIMEOUT_MS);
76+
async acquireLock(sender: string): Promise<string> {
77+
const lockState = this.getOrCreateLockState(sender);
78+
const timeoutMutex = withTimeout(lockState.mutex, RawTxSynchronizeService.DEFAULT_LOCK_TIMEOUT_MS);
6579
const waitStartedAt = Date.now();
80+
const sessionKey = randomUUID();
6681

6782
try {
6883
// Acquire the mutex lock with timeout protection
6984
await timeoutMutex.acquire();
85+
86+
// Add the session key to the active sessions set
87+
lockState.activeSessionKeys.add(sessionKey);
88+
7089
const waitDurationMs = Date.now() - waitStartedAt;
7190
this.logger.debug(
72-
`Lock acquired for sender: ${sender}, waited ${waitDurationMs}ms (timeout: ${RawTxSynchronizeService.DEFAULT_LOCK_TIMEOUT_MS}ms)`,
91+
`Lock acquired for sender: ${sender}, sessionKey: ${sessionKey}, waited ${waitDurationMs}ms (timeout: ${RawTxSynchronizeService.DEFAULT_LOCK_TIMEOUT_MS}ms)`,
7392
);
93+
return sessionKey;
7494
} catch (error) {
7595
if (error instanceof Error && error.message.includes('timeout')) {
7696
throw new Error(
@@ -83,42 +103,50 @@ export class RawTxSynchronizeService {
83103
}
84104

85105
/**
86-
* Releases the mutex lock for the specified sender address.
106+
* Releases the mutex lock for the specified sender address using a session key.
87107
*
88-
* This method safely releases a previously acquired lock using the mutex instance and performs cleanup
89-
* to prevent resource leaks. It's safe to call even if no lock is held for the sender.
108+
* This method safely releases a previously acquired lock by checking if the session key
109+
* is in the active sessions set. If the session key exists, it releases the mutex and
110+
* removes the session key. If not, it means the lock for that session was already released (double-release protection).
90111
*
91112
* @param sender - The sender address to release the lock for
92-
* @returns Promise that resolves when the lock is successfully released
113+
* @param sessionKey - The unique session key returned from acquireLock()
114+
* @returns Promise that resolves when the lock is successfully released or if already released
93115
*/
94-
async releaseLock(sender: string): Promise<void> {
95-
const mutex = this.localLockStates.get(sender);
96-
if (!mutex || !mutex.isLocked()) {
97-
this.logger.debug(`No active lock to release for sender: ${sender}`);
116+
async releaseLock(sender: string, sessionKey: string): Promise<void> {
117+
const lockState = this.localLockStates.get(sender);
118+
119+
if (!lockState || !lockState.activeSessionKeys.has(sessionKey)) {
120+
this.logger.debug(`Lock already released or not found for sender: ${sender}, session: ${sessionKey}`);
98121
return;
99122
}
100123

101124
try {
102-
mutex.release();
103-
this.logger.debug(`Lock released for sender: ${sender}`);
125+
lockState.mutex.release();
126+
this.logger.debug(`Lock released for sender: ${sender}, session: ${sessionKey}`);
104127
} catch (error) {
105-
this.logger.error(`Error releasing lock for ${sender}:`, error);
128+
this.logger.error(`Error releasing lock for ${sender}, session: ${sessionKey}:`, error);
129+
} finally {
130+
// Always remove the session key to prevent stale state, regardless of success or error
131+
lockState.activeSessionKeys.delete(sessionKey);
106132
}
107133
}
108-
109134
/**
110-
* Retrieves an existing mutex for the sender or creates a new one if needed.
135+
* Retrieves an existing lock state for the sender or creates a new one if needed.
111136
*
112137
* @private
113-
* @param sender - The sender address to get or create a mutex for
114-
* @returns The mutex associated with the specified sender
138+
* @param sender - The sender address to get or create a lock state for
139+
* @returns The lock state associated with the specified sender
115140
*/
116-
private getOrCreateMutex(sender: string): Mutex {
117-
let mutex = this.localLockStates.get(sender);
118-
if (!mutex) {
119-
mutex = new Mutex();
120-
this.localLockStates.set(sender, mutex);
141+
private getOrCreateLockState(sender: string): LockState {
142+
let lockState = this.localLockStates.get(sender);
143+
if (!lockState) {
144+
lockState = {
145+
mutex: new Mutex(),
146+
activeSessionKeys: new Set<string>(),
147+
};
148+
this.localLockStates.set(sender, lockState);
121149
}
122-
return mutex;
150+
return lockState;
123151
}
124152
}

0 commit comments

Comments
 (0)