Skip to content

Commit 31702d6

Browse files
committed
feat: added transactionPoolService
Signed-off-by: Simeon Nakov <[email protected]>
1 parent ba268d5 commit 31702d6

File tree

7 files changed

+585
-4
lines changed

7 files changed

+585
-4
lines changed

packages/relay/src/lib/clients/sdkClient.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,8 +334,14 @@ export class SDKClient {
334334
return transactionResponse;
335335
} finally {
336336
if (transactionId?.length) {
337+
// Convert transaction hash from Uint8Array to hex string
338+
const transactionHash = transactionResponse?.transactionHash
339+
? '0x' + Buffer.from(transactionResponse.transactionHash).toString('hex')
340+
: undefined;
341+
337342
this.eventEmitter.emit('execute_transaction', {
338343
transactionId,
344+
transactionHash,
339345
txConstructorName,
340346
operatorAccountId: this.clientMain.operatorAccountId!.toString(),
341347
requestDetails,

packages/relay/src/lib/relay.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ export class Relay {
189189

190190
hapiService.eventEmitter.on('execute_transaction', (args: IExecuteTransactionEventPayload) => {
191191
this.metricService.captureTransactionMetrics(args).then();
192+
// TODO: Call transactionPoolService.onConsensusResult(args) when service is available
192193
});
193194

194195
hapiService.eventEmitter.on('execute_query', (args: IExecuteQueryEventPayload) => {

packages/relay/src/lib/services/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,4 @@ export * from '../types/rateLimiter';
1515
export * from './rateLimiterService/LruRateLimitStore';
1616
export * from './rateLimiterService/RedisRateLimitStore';
1717
export * from './rateLimiterService/rateLimiterService';
18+
export * from './transactionPoolService/transactionPoolService';
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
3+
import { Transaction } from 'ethers';
4+
import { Logger } from 'pino';
5+
6+
import { IExecuteTransactionEventPayload } from '../../types/events';
7+
import {
8+
PendingTransactionStorage,
9+
TransactionPoolService as ITransactionPoolService,
10+
} from '../../types/transactionPool';
11+
12+
/**
13+
* Service implementation that orchestrates pending transaction management.
14+
* Acts as a facade for the underlying storage layer and coordinates transaction lifecycle.
15+
*/
16+
export class TransactionPoolService implements ITransactionPoolService {
17+
/**
18+
* The logger used for logging transaction pool operations.
19+
*
20+
* @private
21+
*/
22+
private readonly logger: Logger;
23+
24+
/**
25+
* The storage implementation for managing pending transactions.
26+
*
27+
* @private
28+
*/
29+
private readonly storage: PendingTransactionStorage;
30+
31+
/**
32+
* Creates a new TransactionPoolService instance.
33+
*
34+
* @param storage - The storage backend for pending transactions.
35+
* @param logger - The logger instance for transaction pool operations.
36+
*/
37+
constructor(storage: PendingTransactionStorage, logger: Logger) {
38+
this.storage = storage;
39+
this.logger = logger.child({ name: 'transaction-pool-service' });
40+
}
41+
42+
/**
43+
* Saves a transaction into the transaction pool for the given address.
44+
*
45+
* @param address - The account address that submits the transaction.
46+
* @param tx - The transaction object to be stored.
47+
* @returns A promise that resolves once the transaction is stored.
48+
*/
49+
async saveTransaction(address: string, tx: Transaction): Promise<void> {
50+
const txHash = tx.hash;
51+
52+
if (!txHash) {
53+
throw new Error('Transaction hash is required for storage');
54+
}
55+
56+
// Get current pending count to determine expected value for atomic operation
57+
const currentPending = await this.storage.getList(address);
58+
59+
// Attempt atomic addition to the pending list
60+
const result = await this.storage.addToList(address, txHash, currentPending);
61+
62+
if (!result.ok) {
63+
throw new Error('Failed to add transaction due to concurrent modifications');
64+
}
65+
66+
this.logger.debug({ address, txHash, pendingCount: result.newValue }, 'Transaction saved to pool');
67+
}
68+
69+
/**
70+
* Handles consensus results and updates the pool state accordingly.
71+
*
72+
* @param payload - The transaction execution event payload containing transaction details.
73+
* @returns A promise that resolves when the consensus result has been processed.
74+
*/
75+
async onConsensusResult(payload: IExecuteTransactionEventPayload): Promise<void> {
76+
const { transactionHash, originalCallerAddress, transactionId } = payload;
77+
78+
if (!transactionHash) {
79+
this.logger.warn({ transactionId }, 'Transaction hash not available in execution event');
80+
return;
81+
}
82+
83+
// Remove the transaction from the pool
84+
const remainingCount = await this.removeTransaction(originalCallerAddress, transactionHash);
85+
86+
this.logger.debug(
87+
{
88+
transactionHash,
89+
address: originalCallerAddress,
90+
transactionId,
91+
remainingCount,
92+
},
93+
'Transaction removed from pool after consensus',
94+
);
95+
}
96+
97+
/**
98+
* Removes a specific transaction from the pending pool.
99+
* This is typically called when a transaction is confirmed or fails on the consensus layer.
100+
*
101+
* @param address - The account address of the transaction sender.
102+
* @param txHash - The hash of the transaction to remove.
103+
* @returns A promise that resolves to the new pending transaction count for the address.
104+
*/
105+
async removeTransaction(address: string, txHash: string): Promise<number> {
106+
const newCount = await this.storage.removeFromList(address, txHash);
107+
return newCount;
108+
}
109+
110+
/**
111+
* Retrieves the number of pending transactions for a given address.
112+
*
113+
* @param address - The account address to query.
114+
* @returns A promise that resolves to the number of pending transactions.
115+
*/
116+
async getPendingCount(address: string): Promise<number> {
117+
return await this.storage.getList(address);
118+
}
119+
120+
/**
121+
* Clears the transaction pool state (typically called on application restart).
122+
*
123+
* @returns A promise that resolves once the state has been reset.
124+
*/
125+
async resetState(): Promise<void> {
126+
this.logger.info('Resetting transaction pool state');
127+
128+
await this.storage.removeAll();
129+
130+
this.logger.info('Transaction pool state successfully reset');
131+
}
132+
}

packages/relay/src/lib/types/events.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ export interface IExecuteQueryEventPayload {
1919

2020
export interface IExecuteTransactionEventPayload {
2121
transactionId: string;
22+
transactionHash?: string;
2223
txConstructorName: string;
2324
operatorAccountId: string;
2425
requestDetails: RequestDetails;

packages/relay/src/lib/types/transactionPool.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import { Transaction } from 'ethers';
44

5+
import { IExecuteTransactionEventPayload } from './events';
6+
57
/**
68
* Result of attempting to add a transaction to the pending list.
79
*
@@ -27,9 +29,10 @@ export interface TransactionPoolService {
2729
/**
2830
* Handles consensus results and updates the pool state accordingly.
2931
*
32+
* @param payload - The transaction execution event payload containing transaction details.
3033
* @returns A promise that resolves when the consensus result has been received.
3134
*/
32-
onConsensusResult(): Promise<void>;
35+
onConsensusResult(payload: IExecuteTransactionEventPayload): Promise<void>;
3336

3437
/**
3538
* Retrieves the number of pending transactions for a given address.
@@ -63,19 +66,20 @@ export interface PendingTransactionStorage {
6366
* Attempts to add a pending transaction entry for the given address.
6467
*
6568
* @param addr - The account address.
69+
* @param txHash - The transaction hash to add to the pending list.
6670
* @param expectedPending - The expected number of pending transactions.
6771
* @returns A promise that resolves to an {@link AddToListResult}.
6872
*/
69-
addToList(addr: string, expectedPending: number): Promise<AddToListResult>;
73+
addToList(addr: string, txHash: string, expectedPending: number): Promise<AddToListResult>;
7074

7175
/**
7276
* Removes a transaction from the pending list of the given address.
7377
*
7478
* @param address - The account address whose transaction should be removed.
75-
* @param transaction - The transaction identifier (e.g., hash).
79+
* @param txHash - The transaction hash to remove.
7680
* @returns A promise that resolves to the updated pending count.
7781
*/
78-
removeFromList(address: string, transaction: string): Promise<number>;
82+
removeFromList(address: string, txHash: string): Promise<number>;
7983

8084
/**
8185
* Removes all pending transactions across all addresses.

0 commit comments

Comments
 (0)