Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,49 @@
this.supportedTypes = [constants.FILTER.TYPE.LOG, constants.FILTER.TYPE.NEW_BLOCK];
}

/**
* Generates cache key for filter ID
* @param filterId
* @private
*/
private getCacheKey(filterId: string): string {
return `${constants.CACHE_KEY.FILTERID}_${filterId}`;
}

/**
* Updates filter cache with new data
* @param filterId
* @param type
* @param params
* @param lastQueried
* @param method
* @param requestDetails
* @private
*/
private async updateFilterCache(
filterId: string,
type: string,
params: any,
lastQueried: number | null,
method: string,
requestDetails: RequestDetails,
): Promise<void> {
const cacheKey = this.getCacheKey(filterId);
await this.cacheService.set(cacheKey, { type, params, lastQueried }, method, requestDetails, constants.FILTER.TTL);
}

/**
* Retrieves filter from cache
* @param filterId
* @param method
* @param requestDetails
* @private
*/
private async getFilterFromCache(filterId: string, method: string, requestDetails: RequestDetails) {
const cacheKey = this.getCacheKey(filterId);
return await this.cacheService.getAsync(cacheKey, method, requestDetails);
}

/**
* Creates a new filter with the specified type and parameters
* @param type
Expand All @@ -70,20 +113,12 @@
*/
async createFilter(type: string, params: any, requestDetails: RequestDetails): Promise<string> {
const filterId = generateRandomHex();
const cacheKey = `${constants.CACHE_KEY.FILTERID}_${filterId}`;
await this.cacheService.set(
cacheKey,
{
type,
params,
lastQueried: null,
},
this.ethNewFilter,
requestDetails,
constants.FILTER.TTL,
);
await this.updateFilterCache(filterId, type, params, null, this.ethNewFilter, requestDetails);

if (this.logger.isLevelEnabled('trace')) {
this.logger.trace(`${requestDetails.formattedRequestId} created filter with TYPE=${type}, params: ${params}`);
this.logger.trace(

Check warning on line 119 in packages/relay/src/lib/services/ethService/ethFilterService/FilterService.ts

View check run for this annotation

Codecov / codecov/patch

packages/relay/src/lib/services/ethService/ethFilterService/FilterService.ts#L119

Added line #L119 was not covered by tests
`${requestDetails.formattedRequestId} created filter with TYPE=${type}, params: ${JSON.stringify(params)}`,
);
}
return filterId;
}
Expand Down Expand Up @@ -130,27 +165,24 @@
}

async newBlockFilter(requestDetails: RequestDetails): Promise<string> {
try {
FilterService.requireFiltersEnabled();
return await this.createFilter(
constants.FILTER.TYPE.NEW_BLOCK,
{
blockAtCreation: await this.common.getLatestBlockNumber(requestDetails),
},
requestDetails,
);
} catch (e) {
throw this.common.genericErrorHandler(e);
}
FilterService.requireFiltersEnabled();

return await this.createFilter(
constants.FILTER.TYPE.NEW_BLOCK,
{
blockAtCreation: await this.common.getLatestBlockNumber(requestDetails),
},
requestDetails,
);
}

public async uninstallFilter(filterId: string, requestDetails: RequestDetails): Promise<boolean> {
FilterService.requireFiltersEnabled();

const cacheKey = `${constants.CACHE_KEY.FILTERID}_${filterId}`;
const filter = await this.cacheService.getAsync(cacheKey, this.ethUninstallFilter, requestDetails);
const filter = await this.getFilterFromCache(filterId, this.ethUninstallFilter, requestDetails);

if (filter) {
const cacheKey = this.getCacheKey(filterId);
await this.cacheService.delete(cacheKey, this.ethUninstallFilter, requestDetails);
return true;
}
Expand All @@ -165,9 +197,8 @@
public async getFilterLogs(filterId: string, requestDetails: RequestDetails): Promise<Log[]> {
FilterService.requireFiltersEnabled();

const cacheKey = `${constants.CACHE_KEY.FILTERID}_${filterId}`;
const filter = await this.cacheService.getAsync(cacheKey, this.ethGetFilterLogs, requestDetails);
if (filter?.type != constants.FILTER.TYPE.LOG) {
const filter = await this.getFilterFromCache(filterId, this.ethGetFilterLogs, requestDetails);
if (filter?.type !== constants.FILTER.TYPE.LOG) {
throw predefined.FILTER_NOT_FOUND;
}

Expand All @@ -181,82 +212,114 @@
);

// update filter to refresh TTL
await this.cacheService.set(
cacheKey,
{
type: filter.type,
params: filter.params,
lastQueried: filter.lastQueried,
},
await this.updateFilterCache(
filterId,
filter.type,
filter.params,
filter.lastQueried,
this.ethGetFilterChanges,
requestDetails,
constants.FILTER.TTL,
);

return logs;
}

/**
* Handles log filter changes
* @param filter
* @param requestDetails
* @private
*/
private async handleLogFilterChanges(
filter: any,
requestDetails: RequestDetails,
): Promise<{ result: Log[]; latestBlockNumber: number }> {
const result = await this.common.getLogs(
null,
filter?.lastQueried || filter?.params.fromBlock,
filter?.params.toBlock,
filter?.params.address,
filter?.params.topics,
requestDetails,
);

// get the latest block number and add 1 to exclude current results from the next response because
// the mirror node query executes "gte" not "gt"
const latestBlockNumber =
Number(
result.length ? result[result.length - 1].blockNumber : await this.common.getLatestBlockNumber(requestDetails),
) + 1;

return { result, latestBlockNumber };
}

/**
* Handles new block filter changes
* @param filter
* @param requestDetails
* @private
*/
private async handleNewBlockFilterChanges(
filter: any,
requestDetails: RequestDetails,
): Promise<{ result: string[]; latestBlockNumber: number }> {
const blockResponse = await this.mirrorNodeClient.getBlocks(
requestDetails,
[`gt:${filter.lastQueried || filter.params.blockAtCreation}`],
undefined,
{
order: 'asc',
},
);

const latestBlockNumber = Number(
blockResponse?.blocks?.length
? blockResponse.blocks[blockResponse.blocks.length - 1].number
: await this.common.getLatestBlockNumber(requestDetails),
);

const result = blockResponse?.blocks?.map((r) => r.hash) || [];

return { result, latestBlockNumber };
}

public async getFilterChanges(filterId: string, requestDetails: RequestDetails): Promise<string[] | Log[]> {
FilterService.requireFiltersEnabled();

const cacheKey = `${constants.CACHE_KEY.FILTERID}_${filterId}`;
const filter = await this.cacheService.getAsync(cacheKey, this.ethGetFilterChanges, requestDetails);
const filter = await this.getFilterFromCache(filterId, this.ethGetFilterChanges, requestDetails);

if (!filter) {
throw predefined.FILTER_NOT_FOUND;
}

let result, latestBlockNumber;
if (filter.type === constants.FILTER.TYPE.LOG) {
result = await this.common.getLogs(
null,
filter?.lastQueried || filter?.params.fromBlock,
filter?.params.toBlock,
filter?.params.address,
filter?.params.topics,
requestDetails,
);
let result: string[] | Log[];
let latestBlockNumber: number;

// get the latest block number and add 1 to exclude current results from the next response because
// the mirror node query executes "gte" not "gt"
latestBlockNumber =
Number(
result.length
? result[result.length - 1].blockNumber
: await this.common.getLatestBlockNumber(requestDetails),
) + 1;
} else if (filter.type === constants.FILTER.TYPE.NEW_BLOCK) {
result = await this.mirrorNodeClient.getBlocks(
requestDetails,
[`gt:${filter.lastQueried || filter.params.blockAtCreation}`],
undefined,
{
order: 'asc',
},
);

latestBlockNumber = Number(
result?.blocks?.length
? result.blocks[result.blocks.length - 1].number
: await this.common.getLatestBlockNumber(requestDetails),
);

result = result?.blocks?.map((r) => r.hash) || [];
} else if (this.supportedTypes.indexOf(filter.type) === -1) {
throw predefined.UNSUPPORTED_METHOD;
switch (filter.type) {
case constants.FILTER.TYPE.LOG: {
const logResult = await this.handleLogFilterChanges(filter, requestDetails);
result = logResult.result;
latestBlockNumber = logResult.latestBlockNumber;
break;
}
case constants.FILTER.TYPE.NEW_BLOCK: {
const blockResult = await this.handleNewBlockFilterChanges(filter, requestDetails);
result = blockResult.result;
latestBlockNumber = blockResult.latestBlockNumber;
break;
}
default:
throw predefined.UNSUPPORTED_METHOD;
}

// update filter to refresh TTL and set lastQueried block number
await this.cacheService.set(
cacheKey,
{
type: filter.type,
params: filter.params,
lastQueried: latestBlockNumber,
},
await this.updateFilterCache(
filterId,
filter.type,
filter.params,
latestBlockNumber,
this.ethGetFilterChanges,
requestDetails,
constants.FILTER.TTL,
);

return result;
Expand Down
Loading
Loading