Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions packages/relay/src/lib/eth.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import { EventEmitter } from 'events';
import { Logger } from 'pino';
import { RedisClientType } from 'redis';

import { Eth } from '../index';
import { MirrorNodeClient } from './clients';
Expand All @@ -19,13 +20,16 @@ import {
IBlockService,
ICommonService,
IContractService,
LocalPendingTransactionStorage,
TransactionPoolService,
TransactionService,
} from './services';
import type { CacheService } from './services/cacheService/cacheService';
import { FeeService } from './services/ethService/feeService/FeeService';
import { IFeeService } from './services/ethService/feeService/IFeeService';
import { ITransactionService } from './services/ethService/transactionService/ITransactionService';
import HAPIService from './services/hapiService/hapiService';
import { RedisPendingTransactionStorage } from './services/transactionPoolService/RedisPendingTransactionStorage';
import {
IContractCallRequest,
IFeeHistory,
Expand Down Expand Up @@ -123,6 +127,7 @@ export class EthImpl implements Eth {
logger: Logger,
chain: string,
public readonly cacheService: CacheService,
public readonly redisClient: RedisClientType | undefined,
) {
this.chain = chain;
this.logger = logger;
Expand All @@ -134,6 +139,10 @@ export class EthImpl implements Eth {
this.contractService = new ContractService(cacheService, this.common, hapiService, logger, mirrorNodeClient);
this.accountService = new AccountService(cacheService, this.common, logger, mirrorNodeClient);
this.blockService = new BlockService(cacheService, chain, this.common, mirrorNodeClient, logger);
const storage = this.redisClient
? new RedisPendingTransactionStorage(this.redisClient)
: new LocalPendingTransactionStorage();
const transactionPoolService = new TransactionPoolService(storage, logger);
this.transactionService = new TransactionService(
cacheService,
chain,
Expand All @@ -142,6 +151,7 @@ export class EthImpl implements Eth {
hapiService,
logger,
mirrorNodeClient,
transactionPoolService,
);
}

Expand Down
2 changes: 1 addition & 1 deletion packages/relay/src/lib/relay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ export class Relay {
logger.child({ name: 'relay-eth' }),
chainId,
this.cacheService,
this.redisClient,
);

(this.ethImpl as EthImpl).eventEmitter.on('eth_execution', (args: IEthExecutionEventPayload) => {
Expand All @@ -200,7 +201,6 @@ export class Relay {

hapiService.eventEmitter.on('execute_transaction', (args: IExecuteTransactionEventPayload) => {
this.metricService.captureTransactionMetrics(args).then();
// TODO: Call transactionPoolService.onConsensusResult(args) when service is available
});

hapiService.eventEmitter.on('execute_query', (args: IExecuteQueryEventPayload) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import { Precheck } from '../../../precheck';
import { ITransactionReceipt, RequestDetails, TypedEvents } from '../../../types';
import { CacheService } from '../../cacheService/cacheService';
import HAPIService from '../../hapiService/hapiService';
import { ICommonService } from '../../index';
import { ICommonService, TransactionPoolService } from '../../index';
import { ITransactionService } from './ITransactionService';

export class TransactionService implements ITransactionService {
Expand Down Expand Up @@ -66,6 +66,8 @@ export class TransactionService implements ITransactionService {
*/
private readonly precheck: Precheck;

private readonly transactionPoolService: TransactionPoolService;

/**
* The ID of the chain, as a hex string, as it would be returned in a JSON-RPC call.
* @private
Expand All @@ -83,6 +85,7 @@ export class TransactionService implements ITransactionService {
hapiService: HAPIService,
logger: Logger,
mirrorNodeClient: MirrorNodeClient,
transactionPoolService: TransactionPoolService,
) {
this.cacheService = cacheService;
this.chain = chain;
Expand All @@ -92,6 +95,7 @@ export class TransactionService implements ITransactionService {
this.logger = logger;
this.mirrorNodeClient = mirrorNodeClient;
this.precheck = new Precheck(mirrorNodeClient, logger, chain);
this.transactionPoolService = transactionPoolService;
}

/**
Expand Down Expand Up @@ -455,6 +459,15 @@ export class TransactionService implements ITransactionService {
return input.startsWith(constants.EMPTY_HEX) ? input.substring(2) : input;
}

/**
* Narrows an ethers Transaction to one that definitely has a non-null hash.
*/
private assertSignedTransaction(tx: EthersTransaction): asserts tx is EthersTransaction & { hash: string } {
if (tx.hash == null) {
throw predefined.INVALID_ARGUMENTS('Expected a signed transaction with a non-null hash');
}
}

/**
* Asynchronously processes a raw transaction by submitting it to the network, managing HFS, polling the MN, handling errors, and returning the transaction hash.
*
Expand All @@ -471,10 +484,16 @@ export class TransactionService implements ITransactionService {
networkGasPriceInWeiBars: number,
requestDetails: RequestDetails,
): Promise<string | JsonRpcError> {
// although we validate in earlier stages that we have
// a signed transaction, we need to assert it again here in order to satisfy the type checker
this.assertSignedTransaction(parsedTx);
let sendRawTransactionError: any;

const originalCallerAddress = parsedTx.from?.toString() || '';

// Save the transaction to the transaction pool before submitting it to the network
await this.transactionPoolService.saveTransaction(originalCallerAddress, parsedTx);

this.eventEmitter.emit('eth_execution', {
method: constants.ETH_SEND_RAW_TRANSACTION,
});
Expand Down Expand Up @@ -530,12 +549,18 @@ export class TransactionService implements ITransactionService {
);
}

// Remove the transaction from the transaction pool after successful submission
await this.transactionPoolService.removeTransaction(originalCallerAddress, contractResult.hash);

return contractResult.hash;
} catch (e: any) {
sendRawTransactionError = e;
}
}

// Remove the transaction from the transaction pool after unsuccessful submission
await this.transactionPoolService.removeTransaction(originalCallerAddress, parsedTx.hash);

// If this point is reached, it means that no valid transaction hash was returned. Therefore, an error must have occurred.
return await this.sendRawTransactionErrorHandler(
sendRawTransactionError,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,13 @@ export class TransactionPoolService implements ITransactionPoolService {
*/
async saveTransaction(address: string, tx: Transaction): Promise<void> {
const txHash = tx.hash;
const addressLowerCased = address.toLowerCase();

if (!txHash) {
throw new Error('Transaction hash is required for storage');
}

const result = await this.storage.addToList(address, txHash);
const result = await this.storage.addToList(addressLowerCased, txHash);

if (!result.ok) {
throw new Error('Failed to add transaction to list');
Expand All @@ -62,33 +63,6 @@ export class TransactionPoolService implements ITransactionPoolService {
this.logger.debug({ address, txHash, pendingCount: result.newValue }, 'Transaction saved to pool');
}

/**
* Handles consensus results and updates the pool state accordingly.
*
* @param payload - The transaction execution event payload containing transaction details.
* @returns A promise that resolves when the consensus result has been processed.
*/
async onConsensusResult(payload: IExecuteTransactionEventPayload): Promise<void> {
const { transactionHash, originalCallerAddress, transactionId } = payload;

if (!transactionHash) {
this.logger.warn({ transactionId }, 'Transaction hash not available in execution event');
return;
}

const remainingCount = await this.removeTransaction(originalCallerAddress, transactionHash);

this.logger.debug(
{
transactionHash,
address: originalCallerAddress,
transactionId,
remainingCount,
},
'Transaction removed from pool after consensus',
);
}

/**
* Removes a specific transaction from the pending pool.
* This is typically called when a transaction is confirmed or fails on the consensus layer.
Expand All @@ -98,7 +72,8 @@ export class TransactionPoolService implements ITransactionPoolService {
* @returns A promise that resolves to the new pending transaction count for the address.
*/
async removeTransaction(address: string, txHash: string): Promise<number> {
return await this.storage.removeFromList(address, txHash);
const addressLowerCased = address.toLowerCase();
return await this.storage.removeFromList(addressLowerCased, txHash);
}

/**
Expand All @@ -108,7 +83,8 @@ export class TransactionPoolService implements ITransactionPoolService {
* @returns A promise that resolves to the number of pending transactions.
*/
async getPendingCount(address: string): Promise<number> {
return await this.storage.getList(address);
const addressLowerCased = address.toLowerCase();
return await this.storage.getList(addressLowerCased);
}

/**
Expand Down
10 changes: 0 additions & 10 deletions packages/relay/src/lib/types/transactionPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import { Transaction } from 'ethers';

import { IExecuteTransactionEventPayload } from './events';

/**
* Result of attempting to add a transaction to the pending list.
*
Expand All @@ -26,14 +24,6 @@ export interface TransactionPoolService {
*/
saveTransaction(address: string, tx: Transaction): Promise<void>;

/**
* Handles consensus results and updates the pool state accordingly.
*
* @param payload - The transaction execution event payload containing transaction details.
* @returns A promise that resolves when the consensus result has been received.
*/
onConsensusResult(payload: IExecuteTransactionEventPayload): Promise<void>;

/**
* Retrieves the number of pending transactions for a given address.
*
Expand Down
57 changes: 57 additions & 0 deletions packages/relay/tests/lib/eth/eth_sendRawTransaction.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,34 @@ describe('@ethSendRawTransaction eth_sendRawTransaction spec', async function ()
);
});

it('should save and remove transaction from transaction pool on success path', async function () {
const signed = await signTransaction(transaction);
const txPool = ethImpl['transactionService']['transactionPoolService'] as any;

const saveStub = sinon.stub(txPool, 'saveTransaction').resolves();
const removeStub = sinon.stub(txPool, 'removeTransaction').resolves();

restMock.onGet(contractResultEndpoint).reply(200, JSON.stringify({ hash: ethereumHash }));
sdkClientStub.submitEthereumTransaction.resolves({
txResponse: {
transactionId: TransactionId.fromString(transactionIdServicesFormat),
} as unknown as TransactionResponse,
fileId: null,
});

const result = await ethImpl.sendRawTransaction(signed, requestDetails);

expect(result).to.equal(ethereumHash);
sinon.assert.calledOnce(saveStub);
sinon.assert.calledWithMatch(saveStub, accountAddress, sinon.match.object);

sinon.assert.calledOnce(removeStub);
sinon.assert.calledWith(removeStub, accountAddress, ethereumHash);

saveStub.restore();
removeStub.restore();
});

withOverriddenEnvsInMochaTest({ USE_ASYNC_TX_PROCESSING: false }, () => {
it('[USE_ASYNC_TX_PROCESSING=true] should throw internal error when transaction returned from mirror node is null', async function () {
const signed = await signTransaction(transaction);
Expand All @@ -307,6 +335,35 @@ describe('@ethSendRawTransaction eth_sendRawTransaction spec', async function ()
expect(`Error invoking RPC: ${response.message}`).to.equal(predefined.INTERNAL_ERROR(response.message).message);
});

it('should save and remove transaction (fallback path uses parsedTx.hash)', async function () {
const signed = await signTransaction(transaction);
const expectedTxHash = Utils.computeTransactionHash(Buffer.from(signed.replace('0x', ''), 'hex'));
const txPool = ethImpl['transactionService']['transactionPoolService'] as any;

const saveStub = sinon.stub(txPool, 'saveTransaction').resolves();
const removeStub = sinon.stub(txPool, 'removeTransaction').resolves();

// Cause MN polling to fail, forcing fallback
restMock.onGet(contractResultEndpoint).reply(404, JSON.stringify(mockData.notFound));
sdkClientStub.submitEthereumTransaction.resolves({
txResponse: {
transactionId: TransactionId.fromString(transactionIdServicesFormat),
} as unknown as TransactionResponse,
fileId: null,
});

await ethImpl.sendRawTransaction(signed, requestDetails);

sinon.assert.calledOnce(saveStub);
sinon.assert.calledWithMatch(saveStub, accountAddress, sinon.match.object);

sinon.assert.calledOnce(removeStub);
sinon.assert.calledWith(removeStub, accountAddress, expectedTxHash);

saveStub.restore();
removeStub.restore();
});

it('[USE_ASYNC_TX_PROCESSING=false] should throw internal error when transactionID is invalid', async function () {
const signed = await signTransaction(transaction);

Expand Down
Loading