Skip to content

Commit 57f5dde

Browse files
committed
runExclusive instead of manual
Signed-off-by: Quiet Node <[email protected]>
1 parent 6870011 commit 57f5dde

File tree

5 files changed

+56
-85
lines changed

5 files changed

+56
-85
lines changed

packages/relay/src/lib/clients/sdkClient.ts

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import { Logger } from 'pino';
2727

2828
import { weibarHexToTinyBarInt } from '../../formatters';
2929
import { Utils } from '../../utils';
30-
import { CommonService, RawTxSynchronizeService } from '../services';
30+
import { CommonService } from '../services';
3131
import { HbarLimitService } from '../services/hbarLimitService';
3232
import { ITransactionRecordMetric, RequestDetails, TypedEvents } from '../types';
3333
import constants from './../constants';
@@ -145,7 +145,6 @@ export class SDKClient {
145145
originalCallerAddress: string,
146146
networkGasPriceInWeiBars: number,
147147
currentNetworkExchangeRateInCents: number,
148-
rawTxSynchronizeService: RawTxSynchronizeService,
149148
): Promise<{ txResponse: TransactionResponse; fileId: FileId | null }> {
150149
const jumboTxEnabled = ConfigService.get('JUMBO_TX_ENABLED');
151150
const ethereumTransactionData: EthereumTransactionData = EthereumTransactionData.fromBytes(transactionBuffer);
@@ -191,7 +190,6 @@ export class SDKClient {
191190
requestDetails,
192191
true,
193192
originalCallerAddress,
194-
rawTxSynchronizeService,
195193
),
196194
};
197195
}
@@ -278,7 +276,6 @@ export class SDKClient {
278276
requestDetails: RequestDetails,
279277
shouldThrowHbarLimit: boolean,
280278
originalCallerAddress: string,
281-
rawTxSynchronizeService?: RawTxSynchronizeService,
282279
estimatedTxFee?: number,
283280
): Promise<TransactionResponse> {
284281
const txConstructorName = transaction.constructor.name;
@@ -336,11 +333,6 @@ export class SDKClient {
336333
}
337334
return transactionResponse;
338335
} finally {
339-
// Eventually release the transaction mutex lock if it was acquired by the sender
340-
if (rawTxSynchronizeService) {
341-
await rawTxSynchronizeService.releaseLock(originalCallerAddress);
342-
}
343-
344336
if (transactionId?.length) {
345337
this.eventEmitter.emit('execute_transaction', {
346338
transactionId,

packages/relay/src/lib/eth.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,6 @@ export class EthImpl implements Eth {
151151
hapiService,
152152
logger,
153153
mirrorNodeClient,
154-
this.rawTxSynchronizeService,
155154
);
156155
}
157156

@@ -960,7 +959,9 @@ export class EthImpl implements Eth {
960959
0: { type: 'hex', required: true },
961960
})
962961
async sendRawTransaction(transaction: string, requestDetails: RequestDetails): Promise<string | JsonRpcError> {
963-
return await this.transactionService.sendRawTransaction(transaction, requestDetails);
962+
return await this.rawTxSynchronizeService.runExclusive(transaction, async () => {
963+
return await this.transactionService.sendRawTransaction(transaction, requestDetails);
964+
});
964965
}
965966

966967
/**

packages/relay/src/lib/services/ethService/transactionService/TransactionService.ts

Lines changed: 2 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import { Precheck } from '../../../precheck';
2121
import { ITransactionReceipt, RequestDetails, TypedEvents } from '../../../types';
2222
import { CacheService } from '../../cacheService/cacheService';
2323
import HAPIService from '../../hapiService/hapiService';
24-
import { ICommonService, RawTxSynchronizeService } from '../../index';
24+
import { ICommonService } from '../../index';
2525
import { ITransactionService } from './ITransactionService';
2626

2727
export class TransactionService implements ITransactionService {
@@ -72,13 +72,6 @@ export class TransactionService implements ITransactionService {
7272
*/
7373
private readonly chain: string;
7474

75-
/**
76-
* The service responsible for synchronizing raw transaction submissions to ensure nonce ordering.
77-
* @private
78-
* @readonly
79-
*/
80-
private readonly rawTxSynchronizeService: RawTxSynchronizeService;
81-
8275
/**
8376
* Constructor for the TransactionService class.
8477
*/
@@ -90,7 +83,6 @@ export class TransactionService implements ITransactionService {
9083
hapiService: HAPIService,
9184
logger: Logger,
9285
mirrorNodeClient: MirrorNodeClient,
93-
rawTxSynchronizeService: RawTxSynchronizeService,
9486
) {
9587
this.cacheService = cacheService;
9688
this.chain = chain;
@@ -99,7 +91,6 @@ export class TransactionService implements ITransactionService {
9991
this.hapiService = hapiService;
10092
this.logger = logger;
10193
this.mirrorNodeClient = mirrorNodeClient;
102-
this.rawTxSynchronizeService = rawTxSynchronizeService;
10394
this.precheck = new Precheck(mirrorNodeClient, logger, chain);
10495
}
10596

@@ -256,12 +247,6 @@ export class TransactionService implements ITransactionService {
256247

257248
const transactionBuffer = Buffer.from(this.prune0x(transaction), 'hex');
258249
const parsedTx = Precheck.parseRawTransaction(transaction);
259-
260-
// Acquire a lock for the sender before any side effects or asynchronous calls to ensure proper nonce ordering
261-
if (parsedTx.from) {
262-
await this.rawTxSynchronizeService.acquireLock(parsedTx.from);
263-
}
264-
265250
const networkGasPriceInWeiBars = Utils.addPercentageBufferToGasPrice(
266251
await this.common.getGasPriceInWeibars(requestDetails),
267252
);
@@ -275,13 +260,7 @@ export class TransactionService implements ITransactionService {
275260
*/
276261
const useAsyncTxProcessing = ConfigService.get('USE_ASYNC_TX_PROCESSING');
277262
if (useAsyncTxProcessing) {
278-
this.sendRawTransactionProcessor(
279-
transactionBuffer,
280-
parsedTx,
281-
networkGasPriceInWeiBars,
282-
this.rawTxSynchronizeService,
283-
requestDetails,
284-
);
263+
this.sendRawTransactionProcessor(transactionBuffer, parsedTx, networkGasPriceInWeiBars, requestDetails);
285264
return Utils.computeTransactionHash(transactionBuffer);
286265
}
287266

@@ -293,7 +272,6 @@ export class TransactionService implements ITransactionService {
293272
transactionBuffer,
294273
parsedTx,
295274
networkGasPriceInWeiBars,
296-
this.rawTxSynchronizeService,
297275
requestDetails,
298276
);
299277
}
@@ -491,7 +469,6 @@ export class TransactionService implements ITransactionService {
491469
transactionBuffer: Buffer,
492470
parsedTx: EthersTransaction,
493471
networkGasPriceInWeiBars: number,
494-
rawTxSynchronizeService: RawTxSynchronizeService,
495472
requestDetails: RequestDetails,
496473
): Promise<string | JsonRpcError> {
497474
let sendRawTransactionError: any;
@@ -506,7 +483,6 @@ export class TransactionService implements ITransactionService {
506483
transactionBuffer,
507484
originalCallerAddress,
508485
networkGasPriceInWeiBars,
509-
rawTxSynchronizeService,
510486
requestDetails,
511487
);
512488

@@ -660,7 +636,6 @@ export class TransactionService implements ITransactionService {
660636
transactionBuffer: Buffer,
661637
originalCallerAddress: string,
662638
networkGasPriceInWeiBars: number,
663-
rawTxSynchronizeService: RawTxSynchronizeService,
664639
requestDetails: RequestDetails,
665640
): Promise<{
666641
txSubmitted: boolean;
@@ -680,7 +655,6 @@ export class TransactionService implements ITransactionService {
680655
originalCallerAddress,
681656
networkGasPriceInWeiBars,
682657
await this.getCurrentNetworkExchangeRateInCents(requestDetails),
683-
rawTxSynchronizeService,
684658
);
685659

686660
txSubmitted = true;

packages/relay/src/lib/services/hapiService/hapiService.ts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import { Counter, Registry } from 'prom-client';
99
import { SDKClient } from '../../clients';
1010
import { ITransactionRecordMetric, RequestDetails, TypedEvents } from '../../types';
1111
import { HbarLimitService } from '../hbarLimitService';
12-
import { RawTxSynchronizeService } from '../rawTxSynchronizeService/RawTxSynchronizeService';
1312

1413
export default class HAPIService {
1514
/**
@@ -243,7 +242,6 @@ export default class HAPIService {
243242
originalCallerAddress: string,
244243
networkGasPriceInWeiBars: number,
245244
currentNetworkExchangeRateInCents: number,
246-
rawTxSynchronizeService: RawTxSynchronizeService,
247245
): Promise<{ txResponse: TransactionResponse; fileId: FileId | null }> {
248246
return this.getSDKClient().submitEthereumTransaction(
249247
transactionBuffer,
@@ -252,7 +250,6 @@ export default class HAPIService {
252250
originalCallerAddress,
253251
networkGasPriceInWeiBars,
254252
currentNetworkExchangeRateInCents,
255-
rawTxSynchronizeService,
256253
);
257254
}
258255

packages/relay/src/lib/services/rawTxSynchronizeService/RawTxSynchronizeService.ts

Lines changed: 50 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// SPDX-License-Identifier: Apache-2.0
22

33
import { Mutex, withTimeout } from 'async-mutex';
4+
import { Transaction } from 'ethers';
45
import { LRUCache } from 'lru-cache';
56
import { Logger } from 'pino';
67

@@ -49,62 +50,54 @@ export class RawTxSynchronizeService {
4950
}
5051

5152
/**
52-
* Acquires a mutex lock for the specified sender address with timeout.
53+
* Executes a callback under an exclusive per-sender mutex with timeout protection.
5354
*
54-
* This method implements timeout-based lock acquisition. The call will wait for the mutex
55-
* to become available but will timeout after the configured duration to prevent
56-
* deadlocks and ensure resource availability.
55+
* The lock is automatically released once the callback settles (resolve or reject). If the lock cannot be
56+
* acquired within the configured timeout, an error is thrown mirroring the previous behaviour.
5757
*
58-
* @param sender - The sender address (wallet address) to acquire the lock for
59-
* @returns Promise that resolves when the lock is successfully acquired
60-
* @throws Error if the lock acquisition fails due to timeout or internal errors
58+
* @param rawTransaction - The raw transaction payload used to derive the sender address
59+
* @param callback - The critical section to execute while the sender lock is held
60+
* @returns The callback result
6161
*/
62-
async acquireLock(sender: string): Promise<void> {
63-
const mutex = this.getOrCreateMutex(sender);
62+
async runExclusive<T>(rawTransaction: string, callback: () => Promise<T> | T): Promise<T> {
63+
const senderAddress = this.extractSender(rawTransaction);
64+
65+
if (!senderAddress) {
66+
this.logger.warn('Unable to derive sender from raw transaction. Executing callback without synchronization.');
67+
return await callback();
68+
}
69+
70+
const mutex = this.getOrCreateMutex(senderAddress);
6471
const timeoutMutex = withTimeout(mutex, RawTxSynchronizeService.DEFAULT_LOCK_TIMEOUT_MS);
72+
const waitStartedAt = Date.now();
73+
let lockAcquired = false;
6574

6675
try {
67-
// Acquire the mutex lock with timeout protection
68-
await timeoutMutex.acquire();
69-
this.logger.debug(
70-
`Lock acquired for sender: ${sender}, timeout: ${RawTxSynchronizeService.DEFAULT_LOCK_TIMEOUT_MS}ms`,
71-
);
76+
return await timeoutMutex.runExclusive(async () => {
77+
const waitDurationMs = Date.now() - waitStartedAt;
78+
this.logger.debug(
79+
`Lock acquired for sender: ${senderAddress}, waited ${waitDurationMs}ms (timeout: ${RawTxSynchronizeService.DEFAULT_LOCK_TIMEOUT_MS}ms)`,
80+
);
81+
lockAcquired = true;
82+
83+
return await callback();
84+
});
7285
} catch (error) {
7386
if (error instanceof Error && error.message.includes('timeout')) {
7487
throw new Error(
75-
`Failed to acquire lock for sender ${sender}: timeout after ${RawTxSynchronizeService.DEFAULT_LOCK_TIMEOUT_MS}ms`,
88+
`Failed to acquire lock for sender ${senderAddress}: timeout after ${RawTxSynchronizeService.DEFAULT_LOCK_TIMEOUT_MS}ms`,
7689
);
7790
}
78-
this.logger.error(`Failed to acquire lock for ${sender}:`, error);
79-
throw error;
80-
}
81-
}
82-
83-
/**
84-
* Releases the mutex lock for the specified sender address.
85-
*
86-
* This method safely releases a previously acquired lock using the mutex instance and performs cleanup
87-
* to prevent resource leaks. It's safe to call even if no lock is held for the sender.
88-
*
89-
* @param sender - The sender address to release the lock for
90-
* @returns Promise that resolves when the lock is successfully released
91-
*/
92-
async releaseLock(sender: string): Promise<void> {
93-
const mutex = this.localLockStates.get(sender);
94-
if (!mutex || !mutex.isLocked()) {
95-
this.logger.debug(`No active lock to release for sender: ${sender}`);
96-
return;
97-
}
9891

99-
try {
100-
mutex.release();
101-
this.logger.debug(`Lock released for sender: ${sender}`);
102-
} catch (error) {
103-
this.logger.error(`Error releasing lock for ${sender}:`, error);
92+
this.logger.error(`Failed to execute exclusive section for ${senderAddress}:`, error);
93+
throw error;
94+
} finally {
95+
// only log release event if lock was actually acquired
96+
if (lockAcquired) {
97+
this.logger.debug(`Lock released for sender: ${senderAddress}`);
98+
}
10499
}
105-
}
106-
107-
/**
100+
} /**
108101
* Retrieves an existing mutex for the sender or creates a new one if needed.
109102
*
110103
* @private
@@ -119,4 +112,18 @@ export class RawTxSynchronizeService {
119112
}
120113
return mutex;
121114
}
115+
116+
/**
117+
* Parses the raw transaction and returns the normalized sender address if available.
118+
*
119+
* @param rawTransaction - The serialized transaction payload (with or without 0x prefix)
120+
* @returns The lowercase sender address, or null when parsing fails or sender is absent
121+
*/
122+
private extractSender(rawTransaction: string): string | null | undefined {
123+
try {
124+
return Transaction.from(rawTransaction).from?.toLowerCase();
125+
} catch {
126+
return null;
127+
}
128+
}
122129
}

0 commit comments

Comments
 (0)