diff --git a/packages/relay/src/lib/eth.ts b/packages/relay/src/lib/eth.ts index 7bdee42d56..ac4fd200ae 100644 --- a/packages/relay/src/lib/eth.ts +++ b/packages/relay/src/lib/eth.ts @@ -2,6 +2,7 @@ import { EventEmitter } from 'events'; import { Logger } from 'pino'; +import { RedisClientType } from 'redis'; import { Eth } from '../index'; import { MirrorNodeClient } from './clients'; @@ -19,6 +20,8 @@ import { IBlockService, ICommonService, IContractService, + LocalPendingTransactionStorage, + TransactionPoolService, TransactionService, } from './services'; import type { CacheService } from './services/cacheService/cacheService'; @@ -26,6 +29,7 @@ import { FeeService } from './services/ethService/feeService/FeeService'; import { IFeeService } from './services/ethService/feeService/IFeeService'; import { ITransactionService } from './services/ethService/transactionService/ITransactionService'; import HAPIService from './services/hapiService/hapiService'; +import { RedisPendingTransactionStorage } from './services/transactionPoolService/RedisPendingTransactionStorage'; import { IContractCallRequest, IFeeHistory, @@ -123,6 +127,7 @@ export class EthImpl implements Eth { logger: Logger, chain: string, public readonly cacheService: CacheService, + public readonly redisClient: RedisClientType | undefined, ) { this.chain = chain; this.logger = logger; @@ -134,6 +139,10 @@ export class EthImpl implements Eth { this.contractService = new ContractService(cacheService, this.common, hapiService, logger, mirrorNodeClient); this.accountService = new AccountService(cacheService, this.common, logger, mirrorNodeClient); this.blockService = new BlockService(cacheService, chain, this.common, mirrorNodeClient, logger); + const storage = this.redisClient + ? new RedisPendingTransactionStorage(this.redisClient) + : new LocalPendingTransactionStorage(); + const transactionPoolService = new TransactionPoolService(storage, logger); this.transactionService = new TransactionService( cacheService, chain, @@ -142,6 +151,7 @@ export class EthImpl implements Eth { hapiService, logger, mirrorNodeClient, + transactionPoolService, ); } diff --git a/packages/relay/src/lib/relay.ts b/packages/relay/src/lib/relay.ts index 3d37fd7ab1..342df7f6f4 100644 --- a/packages/relay/src/lib/relay.ts +++ b/packages/relay/src/lib/relay.ts @@ -192,6 +192,7 @@ export class Relay { logger.child({ name: 'relay-eth' }), chainId, this.cacheService, + this.redisClient, ); (this.ethImpl as EthImpl).eventEmitter.on('eth_execution', (args: IEthExecutionEventPayload) => { @@ -200,7 +201,6 @@ export class Relay { hapiService.eventEmitter.on('execute_transaction', (args: IExecuteTransactionEventPayload) => { this.metricService.captureTransactionMetrics(args).then(); - // TODO: Call transactionPoolService.onConsensusResult(args) when service is available }); hapiService.eventEmitter.on('execute_query', (args: IExecuteQueryEventPayload) => { diff --git a/packages/relay/src/lib/services/ethService/transactionService/TransactionService.ts b/packages/relay/src/lib/services/ethService/transactionService/TransactionService.ts index 6a1b80816f..52e6ae7773 100644 --- a/packages/relay/src/lib/services/ethService/transactionService/TransactionService.ts +++ b/packages/relay/src/lib/services/ethService/transactionService/TransactionService.ts @@ -21,7 +21,7 @@ import { Precheck } from '../../../precheck'; import { ITransactionReceipt, RequestDetails, TypedEvents } from '../../../types'; import { CacheService } from '../../cacheService/cacheService'; import HAPIService from '../../hapiService/hapiService'; -import { ICommonService } from '../../index'; +import { ICommonService, TransactionPoolService } from '../../index'; import { ITransactionService } from './ITransactionService'; export class TransactionService implements ITransactionService { @@ -66,6 +66,8 @@ export class TransactionService implements ITransactionService { */ private readonly precheck: Precheck; + private readonly transactionPoolService: TransactionPoolService; + /** * The ID of the chain, as a hex string, as it would be returned in a JSON-RPC call. * @private @@ -83,6 +85,7 @@ export class TransactionService implements ITransactionService { hapiService: HAPIService, logger: Logger, mirrorNodeClient: MirrorNodeClient, + transactionPoolService: TransactionPoolService, ) { this.cacheService = cacheService; this.chain = chain; @@ -92,6 +95,7 @@ export class TransactionService implements ITransactionService { this.logger = logger; this.mirrorNodeClient = mirrorNodeClient; this.precheck = new Precheck(mirrorNodeClient, logger, chain); + this.transactionPoolService = transactionPoolService; } /** @@ -455,6 +459,15 @@ export class TransactionService implements ITransactionService { return input.startsWith(constants.EMPTY_HEX) ? input.substring(2) : input; } + /** + * Narrows an ethers Transaction to one that definitely has a non-null hash. + */ + private assertSignedTransaction(tx: EthersTransaction): asserts tx is EthersTransaction & { hash: string } { + if (tx.hash == null) { + throw predefined.INVALID_ARGUMENTS('Expected a signed transaction with a non-null hash'); + } + } + /** * Asynchronously processes a raw transaction by submitting it to the network, managing HFS, polling the MN, handling errors, and returning the transaction hash. * @@ -471,10 +484,16 @@ export class TransactionService implements ITransactionService { networkGasPriceInWeiBars: number, requestDetails: RequestDetails, ): Promise { + // although we validate in earlier stages that we have + // a signed transaction, we need to assert it again here in order to satisfy the type checker + this.assertSignedTransaction(parsedTx); let sendRawTransactionError: any; const originalCallerAddress = parsedTx.from?.toString() || ''; + // Save the transaction to the transaction pool before submitting it to the network + await this.transactionPoolService.saveTransaction(originalCallerAddress, parsedTx); + this.eventEmitter.emit('eth_execution', { method: constants.ETH_SEND_RAW_TRANSACTION, }); @@ -530,12 +549,18 @@ export class TransactionService implements ITransactionService { ); } + // Remove the transaction from the transaction pool after successful submission + await this.transactionPoolService.removeTransaction(originalCallerAddress, contractResult.hash); + return contractResult.hash; } catch (e: any) { sendRawTransactionError = e; } } + // Remove the transaction from the transaction pool after unsuccessful submission + await this.transactionPoolService.removeTransaction(originalCallerAddress, parsedTx.hash); + // If this point is reached, it means that no valid transaction hash was returned. Therefore, an error must have occurred. return await this.sendRawTransactionErrorHandler( sendRawTransactionError, diff --git a/packages/relay/src/lib/services/transactionPoolService/transactionPoolService.ts b/packages/relay/src/lib/services/transactionPoolService/transactionPoolService.ts index c87d7a9473..768c124637 100644 --- a/packages/relay/src/lib/services/transactionPoolService/transactionPoolService.ts +++ b/packages/relay/src/lib/services/transactionPoolService/transactionPoolService.ts @@ -48,12 +48,13 @@ export class TransactionPoolService implements ITransactionPoolService { */ async saveTransaction(address: string, tx: Transaction): Promise { const txHash = tx.hash; + const addressLowerCased = address.toLowerCase(); if (!txHash) { throw new Error('Transaction hash is required for storage'); } - const result = await this.storage.addToList(address, txHash); + const result = await this.storage.addToList(addressLowerCased, txHash); if (!result.ok) { throw new Error('Failed to add transaction to list'); @@ -62,33 +63,6 @@ export class TransactionPoolService implements ITransactionPoolService { this.logger.debug({ address, txHash, pendingCount: result.newValue }, 'Transaction saved to pool'); } - /** - * Handles consensus results and updates the pool state accordingly. - * - * @param payload - The transaction execution event payload containing transaction details. - * @returns A promise that resolves when the consensus result has been processed. - */ - async onConsensusResult(payload: IExecuteTransactionEventPayload): Promise { - const { transactionHash, originalCallerAddress, transactionId } = payload; - - if (!transactionHash) { - this.logger.warn({ transactionId }, 'Transaction hash not available in execution event'); - return; - } - - const remainingCount = await this.removeTransaction(originalCallerAddress, transactionHash); - - this.logger.debug( - { - transactionHash, - address: originalCallerAddress, - transactionId, - remainingCount, - }, - 'Transaction removed from pool after consensus', - ); - } - /** * Removes a specific transaction from the pending pool. * This is typically called when a transaction is confirmed or fails on the consensus layer. @@ -98,7 +72,8 @@ export class TransactionPoolService implements ITransactionPoolService { * @returns A promise that resolves to the new pending transaction count for the address. */ async removeTransaction(address: string, txHash: string): Promise { - return await this.storage.removeFromList(address, txHash); + const addressLowerCased = address.toLowerCase(); + return await this.storage.removeFromList(addressLowerCased, txHash); } /** @@ -108,7 +83,8 @@ export class TransactionPoolService implements ITransactionPoolService { * @returns A promise that resolves to the number of pending transactions. */ async getPendingCount(address: string): Promise { - return await this.storage.getList(address); + const addressLowerCased = address.toLowerCase(); + return await this.storage.getList(addressLowerCased); } /** diff --git a/packages/relay/src/lib/types/transactionPool.ts b/packages/relay/src/lib/types/transactionPool.ts index 5096247f29..bffaf9cb5c 100644 --- a/packages/relay/src/lib/types/transactionPool.ts +++ b/packages/relay/src/lib/types/transactionPool.ts @@ -2,8 +2,6 @@ import { Transaction } from 'ethers'; -import { IExecuteTransactionEventPayload } from './events'; - /** * Result of attempting to add a transaction to the pending list. * @@ -26,14 +24,6 @@ export interface TransactionPoolService { */ saveTransaction(address: string, tx: Transaction): Promise; - /** - * Handles consensus results and updates the pool state accordingly. - * - * @param payload - The transaction execution event payload containing transaction details. - * @returns A promise that resolves when the consensus result has been received. - */ - onConsensusResult(payload: IExecuteTransactionEventPayload): Promise; - /** * Retrieves the number of pending transactions for a given address. * diff --git a/packages/relay/tests/lib/eth/eth_sendRawTransaction.spec.ts b/packages/relay/tests/lib/eth/eth_sendRawTransaction.spec.ts index 19ef5fe377..05d1af3c00 100644 --- a/packages/relay/tests/lib/eth/eth_sendRawTransaction.spec.ts +++ b/packages/relay/tests/lib/eth/eth_sendRawTransaction.spec.ts @@ -288,6 +288,34 @@ describe('@ethSendRawTransaction eth_sendRawTransaction spec', async function () ); }); + it('should save and remove transaction from transaction pool on success path', async function () { + const signed = await signTransaction(transaction); + const txPool = ethImpl['transactionService']['transactionPoolService'] as any; + + const saveStub = sinon.stub(txPool, 'saveTransaction').resolves(); + const removeStub = sinon.stub(txPool, 'removeTransaction').resolves(); + + restMock.onGet(contractResultEndpoint).reply(200, JSON.stringify({ hash: ethereumHash })); + sdkClientStub.submitEthereumTransaction.resolves({ + txResponse: { + transactionId: TransactionId.fromString(transactionIdServicesFormat), + } as unknown as TransactionResponse, + fileId: null, + }); + + const result = await ethImpl.sendRawTransaction(signed, requestDetails); + + expect(result).to.equal(ethereumHash); + sinon.assert.calledOnce(saveStub); + sinon.assert.calledWithMatch(saveStub, accountAddress, sinon.match.object); + + sinon.assert.calledOnce(removeStub); + sinon.assert.calledWith(removeStub, accountAddress, ethereumHash); + + saveStub.restore(); + removeStub.restore(); + }); + withOverriddenEnvsInMochaTest({ USE_ASYNC_TX_PROCESSING: false }, () => { it('[USE_ASYNC_TX_PROCESSING=true] should throw internal error when transaction returned from mirror node is null', async function () { const signed = await signTransaction(transaction); @@ -307,6 +335,35 @@ describe('@ethSendRawTransaction eth_sendRawTransaction spec', async function () expect(`Error invoking RPC: ${response.message}`).to.equal(predefined.INTERNAL_ERROR(response.message).message); }); + it('should save and remove transaction (fallback path uses parsedTx.hash)', async function () { + const signed = await signTransaction(transaction); + const expectedTxHash = Utils.computeTransactionHash(Buffer.from(signed.replace('0x', ''), 'hex')); + const txPool = ethImpl['transactionService']['transactionPoolService'] as any; + + const saveStub = sinon.stub(txPool, 'saveTransaction').resolves(); + const removeStub = sinon.stub(txPool, 'removeTransaction').resolves(); + + // Cause MN polling to fail, forcing fallback + restMock.onGet(contractResultEndpoint).reply(404, JSON.stringify(mockData.notFound)); + sdkClientStub.submitEthereumTransaction.resolves({ + txResponse: { + transactionId: TransactionId.fromString(transactionIdServicesFormat), + } as unknown as TransactionResponse, + fileId: null, + }); + + await ethImpl.sendRawTransaction(signed, requestDetails); + + sinon.assert.calledOnce(saveStub); + sinon.assert.calledWithMatch(saveStub, accountAddress, sinon.match.object); + + sinon.assert.calledOnce(removeStub); + sinon.assert.calledWith(removeStub, accountAddress, expectedTxHash); + + saveStub.restore(); + removeStub.restore(); + }); + it('[USE_ASYNC_TX_PROCESSING=false] should throw internal error when transactionID is invalid', async function () { const signed = await signTransaction(transaction); diff --git a/packages/relay/tests/lib/services/transactionPoolService/transactionPoolService.spec.ts b/packages/relay/tests/lib/services/transactionPoolService/transactionPoolService.spec.ts index 078b89e398..aa20618702 100644 --- a/packages/relay/tests/lib/services/transactionPoolService/transactionPoolService.spec.ts +++ b/packages/relay/tests/lib/services/transactionPoolService/transactionPoolService.spec.ts @@ -130,87 +130,6 @@ describe.only('TransactionPoolService Test Suite', function () { }); }); - describe('onConsensusResult', () => { - it('should successfully remove transaction from pool when transaction hash is provided', async () => { - const remainingCount = 1; - const payload = createTestEventPayload(); - - mockStorage.removeFromList.resolves(remainingCount); - - await transactionPoolService.onConsensusResult(payload); - - expect(mockStorage.removeFromList.calledOnceWith(testAddress, testTxHash)).to.be.true; - }); - - it('should handle missing transaction hash gracefully', async () => { - const payload = createTestEventPayload({ transactionHash: undefined }); - - await transactionPoolService.onConsensusResult(payload); - - expect(mockStorage.removeFromList.called).to.be.false; - }); - - it('should handle empty transaction hash gracefully', async () => { - const payload = createTestEventPayload({ transactionHash: '' }); - - await transactionPoolService.onConsensusResult(payload); - - expect(mockStorage.removeFromList.called).to.be.false; - }); - - it('should throw storage errors during transaction removal', async () => { - const payload = createTestEventPayload(); - const storageError = new Error('Storage removal failed'); - mockStorage.removeFromList.rejects(storageError); - - // Should throw - errors are no longer caught - await expect(transactionPoolService.onConsensusResult(payload)).to.be.rejectedWith('Storage removal failed'); - - expect(mockStorage.removeFromList.calledOnceWith(testAddress, testTxHash)).to.be.true; - }); - - it('should throw when removal fails', async () => { - const payload = createTestEventPayload(); - mockStorage.removeFromList.rejects(new Error('Storage error')); - - // Should throw when storage fails - await expect(transactionPoolService.onConsensusResult(payload)).to.be.rejectedWith('Storage error'); - }); - - it('should use originalCallerAddress from payload for transaction removal', async () => { - const customAddress = '0x999999999999999999999999999999999999999'; - const payload = createTestEventPayload({ originalCallerAddress: customAddress }); - - mockStorage.removeFromList.resolves(0); - - await transactionPoolService.onConsensusResult(payload); - - expect(mockStorage.removeFromList.calledOnceWith(customAddress, testTxHash)).to.be.true; - }); - - it('should handle multiple transaction removals correctly', async () => { - const payload1 = createTestEventPayload(); - const payload2 = createTestEventPayload({ - transactionHash: '0xabcdef1234567890', - originalCallerAddress: '0x1111111111111111111111111111111111111111', - }); - - mockStorage.removeFromList.resolves(0); - - await transactionPoolService.onConsensusResult(payload1); - await transactionPoolService.onConsensusResult(payload2); - - expect(mockStorage.removeFromList.calledTwice).to.be.true; - expect(mockStorage.removeFromList.firstCall.calledWith(testAddress, testTxHash)).to.be.true; - expect( - mockStorage.removeFromList.secondCall.calledWith( - '0x1111111111111111111111111111111111111111', - '0xabcdef1234567890', - ), - ).to.be.true; - }); - }); - describe('removeTransaction', () => { it('should successfully remove transaction from pool', async () => { const remainingCount = 1;