diff --git a/.eslintrc.js b/.eslintrc.js index dab9e56ba9..dfaff3d240 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -29,6 +29,16 @@ module.exports = { "sourceType": "script", }, }, + { + "files": [ + "**/*.spec.ts", + "**/*.test.ts", + "**/tests/**/*.ts", + ], + "rules": { + "@typescript-eslint/no-unused-expressions": "off", + }, + }, ], "parserOptions": { "ecmaVersion": "latest", @@ -39,7 +49,6 @@ module.exports = { "@typescript-eslint/no-explicit-any": "off", "@typescript-eslint/no-var-requires": "warn", "@typescript-eslint/no-unused-vars": "warn", - "@typescript-eslint/ban-types": "warn", "no-trailing-spaces": "error", "no-useless-escape": "warn", "prefer-const": "error", diff --git a/packages/relay/src/lib/config/methodConfiguration.ts b/packages/relay/src/lib/config/methodConfiguration.ts index 59ba8e928c..22493cb22b 100644 --- a/packages/relay/src/lib/config/methodConfiguration.ts +++ b/packages/relay/src/lib/config/methodConfiguration.ts @@ -4,181 +4,194 @@ import { ConfigService } from '@hashgraph/json-rpc-config-service/dist/services' import { MethodRateLimitConfiguration } from '../types'; -const tier1rateLimit = ConfigService.get('TIER_1_RATE_LIMIT'); -const tier2rateLimit = ConfigService.get('TIER_2_RATE_LIMIT'); -const tier3rateLimit = ConfigService.get('TIER_3_RATE_LIMIT'); +// Lazy getter function that reads config at call time (not module load time) +// This allows test environment overrides to work properly +export function getMethodConfiguration(): MethodRateLimitConfiguration { + const tier1rateLimit = ConfigService.get('TIER_1_RATE_LIMIT'); + const tier2rateLimit = ConfigService.get('TIER_2_RATE_LIMIT'); + const tier3rateLimit = ConfigService.get('TIER_3_RATE_LIMIT'); -// total requests per rate limit duration (default ex. 200 request per 60000ms) -export const methodConfiguration: MethodRateLimitConfiguration = { - web3_clientVersion: { - total: tier3rateLimit, - }, - web3_sha3: { - total: tier3rateLimit, - }, - net_listening: { - total: tier3rateLimit, - }, - net_version: { - total: tier3rateLimit, - }, - net_peerCount: { - total: tier3rateLimit, - }, - ADMIN_CONFIG: { - total: tier3rateLimit, - }, - eth_blockNumber: { - total: tier2rateLimit, - }, - eth_call: { - total: tier1rateLimit, - }, - eth_coinbase: { - total: tier2rateLimit, - }, - eth_simulateV1: { - total: tier2rateLimit, - }, - eth_blobBaseFee: { - total: tier2rateLimit, - }, - eth_estimateGas: { - total: tier2rateLimit, - }, - eth_gasPrice: { - total: tier2rateLimit, - }, - eth_getBalance: { - total: tier2rateLimit, - }, - eth_getBlockByHash: { - total: tier2rateLimit, - }, - eth_getBlockByNumber: { - total: tier2rateLimit, - }, - eth_getBlockReceipts: { - total: tier2rateLimit, - }, - eth_getBlockTransactionCountByHash: { - total: tier2rateLimit, - }, - eth_getBlockTransactionCountByNumber: { - total: tier2rateLimit, - }, - eth_getCode: { - total: tier2rateLimit, - }, - eth_chainId: { - total: tier2rateLimit, - }, - eth_getFilterChanges: { - total: tier2rateLimit, - }, - eth_getLogs: { - total: tier2rateLimit, - }, - eth_getStorageAt: { - total: tier2rateLimit, - }, - eth_getTransactionByBlockHashAndIndex: { - total: tier2rateLimit, - }, - eth_getTransactionByBlockNumberAndIndex: { - total: tier2rateLimit, - }, - eth_getTransactionByHash: { - total: tier2rateLimit, - }, - eth_getTransactionCount: { - total: tier2rateLimit, - }, - eth_getTransactionReceipt: { - total: tier2rateLimit, - }, - eth_getUncleByBlockHashAndIndex: { - total: tier2rateLimit, - }, - eth_getUncleByBlockNumberAndIndex: { - total: 
tier2rateLimit, - }, - eth_getUncleCountByBlockHash: { - total: tier2rateLimit, - }, - eth_getUncleCountByBlockNumber: { - total: tier2rateLimit, - }, - eth_getWork: { - total: tier2rateLimit, - }, - eth_feeHistory: { - total: tier2rateLimit, - }, - eth_hashrate: { - total: tier1rateLimit, - }, - eth_maxPriorityFeePerGas: { - total: tier1rateLimit, - }, - eth_mining: { - total: tier1rateLimit, - }, - eth_protocolVersion: { - total: tier2rateLimit, - }, - eth_sendRawTransaction: { - total: tier1rateLimit, - }, - eth_sendTransaction: { - total: tier1rateLimit, - }, - eth_sign: { - total: tier1rateLimit, - }, - eth_signTransaction: { - total: tier1rateLimit, - }, - eth_submitHashrate: { - total: tier1rateLimit, - }, - eth_submitWork: { - total: tier1rateLimit, - }, - eth_syncing: { - total: tier1rateLimit, - }, - eth_accounts: { - total: tier2rateLimit, - }, - eth_newBlockFilter: { - total: tier2rateLimit, - }, - eth_newPendingTransactionFilter: { - total: tier2rateLimit, - }, - eth_newFilter: { - total: tier2rateLimit, - }, - eth_uninstallFilter: { - total: tier2rateLimit, - }, - eth_getFilterLogs: { - total: tier2rateLimit, - }, - debug_traceTransaction: { - total: tier1rateLimit, - }, - debug_traceBlockByNumber: { - total: tier1rateLimit, - }, - batch_request: { - total: tier1rateLimit, - }, - eth_getProof: { - total: tier2rateLimit, - }, - eth_createAccessList: { - total: tier2rateLimit, - }, -}; + return { + web3_clientVersion: { + total: tier3rateLimit, + }, + web3_sha3: { + total: tier3rateLimit, + }, + net_listening: { + total: tier3rateLimit, + }, + net_version: { + total: tier3rateLimit, + }, + net_peerCount: { + total: tier3rateLimit, + }, + ADMIN_CONFIG: { + total: tier3rateLimit, + }, + eth_blockNumber: { + total: tier2rateLimit, + }, + eth_call: { + total: tier1rateLimit, + }, + eth_coinbase: { + total: tier2rateLimit, + }, + eth_simulateV1: { + total: tier2rateLimit, + }, + eth_blobBaseFee: { + total: tier2rateLimit, + }, + eth_estimateGas: { + total: tier2rateLimit, + }, + eth_gasPrice: { + total: tier2rateLimit, + }, + eth_getBalance: { + total: tier2rateLimit, + }, + eth_getBlockByHash: { + total: tier2rateLimit, + }, + eth_getBlockByNumber: { + total: tier2rateLimit, + }, + eth_getBlockReceipts: { + total: tier2rateLimit, + }, + eth_getBlockTransactionCountByHash: { + total: tier2rateLimit, + }, + eth_getBlockTransactionCountByNumber: { + total: tier2rateLimit, + }, + eth_getCode: { + total: tier2rateLimit, + }, + eth_chainId: { + total: tier2rateLimit, + }, + eth_getFilterChanges: { + total: tier2rateLimit, + }, + eth_getLogs: { + total: tier2rateLimit, + }, + eth_getStorageAt: { + total: tier2rateLimit, + }, + eth_getTransactionByBlockHashAndIndex: { + total: tier2rateLimit, + }, + eth_getTransactionByBlockNumberAndIndex: { + total: tier2rateLimit, + }, + eth_getTransactionByHash: { + total: tier2rateLimit, + }, + eth_getTransactionCount: { + total: tier2rateLimit, + }, + eth_getTransactionReceipt: { + total: tier2rateLimit, + }, + eth_getUncleByBlockHashAndIndex: { + total: tier2rateLimit, + }, + eth_getUncleByBlockNumberAndIndex: { + total: tier2rateLimit, + }, + eth_getUncleCountByBlockHash: { + total: tier2rateLimit, + }, + eth_getUncleCountByBlockNumber: { + total: tier2rateLimit, + }, + eth_getWork: { + total: tier2rateLimit, + }, + eth_feeHistory: { + total: tier2rateLimit, + }, + eth_hashrate: { + total: tier1rateLimit, + }, + eth_maxPriorityFeePerGas: { + total: tier1rateLimit, + }, + eth_mining: { + total: tier1rateLimit, + }, + eth_protocolVersion: { + total: 
tier2rateLimit, + }, + eth_sendRawTransaction: { + total: tier1rateLimit, + }, + eth_sendTransaction: { + total: tier1rateLimit, + }, + eth_sign: { + total: tier1rateLimit, + }, + eth_signTransaction: { + total: tier1rateLimit, + }, + eth_submitHashrate: { + total: tier1rateLimit, + }, + eth_submitWork: { + total: tier1rateLimit, + }, + eth_syncing: { + total: tier1rateLimit, + }, + eth_accounts: { + total: tier2rateLimit, + }, + eth_newBlockFilter: { + total: tier2rateLimit, + }, + eth_newPendingTransactionFilter: { + total: tier2rateLimit, + }, + eth_newFilter: { + total: tier2rateLimit, + }, + eth_uninstallFilter: { + total: tier2rateLimit, + }, + eth_getFilterLogs: { + total: tier2rateLimit, + }, + debug_traceTransaction: { + total: tier1rateLimit, + }, + debug_traceBlockByNumber: { + total: tier1rateLimit, + }, + batch_request: { + total: tier1rateLimit, + }, + eth_getProof: { + total: tier2rateLimit, + }, + eth_createAccessList: { + total: tier2rateLimit, + }, + }; +} + +// Backwards compatibility: Export a Proxy that lazily calls getMethodConfiguration() +// This allows existing code to use `methodConfiguration.eth_chainId` without changes +// A Proxy is used for lazy evaluation to ensure test configuration overrides work correctly +// (ConfigService is read at access time, not module load time). +export const methodConfiguration = new Proxy({} as MethodRateLimitConfiguration, { + get(target, prop) { + return getMethodConfiguration()[prop as keyof MethodRateLimitConfiguration]; + }, +}); diff --git a/packages/relay/src/lib/relay.ts b/packages/relay/src/lib/relay.ts index 53ad486516..1e1f099700 100644 --- a/packages/relay/src/lib/relay.ts +++ b/packages/relay/src/lib/relay.ts @@ -116,13 +116,13 @@ export class Relay { private readonly rpcMethodDispatcher: RpcMethodDispatcher; /** - * Initializes the main components of the relay service, including Hedera network clients, - * Ethereum-compatible interfaces, caching, metrics, and subscription management. + * Private constructor to prevent direct instantiation. + * Use Relay.init() static factory method instead. * * @param {Logger} logger - Logger instance for logging system messages. * @param {Registry} register - Registry instance for registering metrics. */ - constructor( + private constructor( private readonly logger: Logger, register: Registry, ) { @@ -341,4 +341,28 @@ export class Relay { this.logger.info(`Operator account '${operator}' has balance: ${balance}`); } } + + /** + * Static factory method to create and initialize a Relay instance. + * This is the recommended way to create a Relay instance as it ensures + * all async initialization (operator balance check) is complete. + * + * @param {Logger} logger - Logger instance for logging system messages. + * @param {Registry} register - Registry instance for registering metrics. + * @returns {Promise<Relay>} A fully initialized Relay instance. 
+ * + * @example + * ```typescript + * const relay = await Relay.init(logger, register); + * ``` + */ + static async init(logger: Logger, register: Registry): Promise<Relay> { + // Create Relay instance + const relay = new Relay(logger, register); + + // Check operator balance if not in read-only mode + await relay.ensureOperatorHasBalance(); + + return relay; + } } diff --git a/packages/relay/tests/lib/admin.spec.ts b/packages/relay/tests/lib/admin.spec.ts index 60255fc113..94a43660c9 100644 --- a/packages/relay/tests/lib/admin.spec.ts +++ b/packages/relay/tests/lib/admin.spec.ts @@ -3,30 +3,41 @@ import { expect } from 'chai'; import pino from 'pino'; import { Registry } from 'prom-client'; +import sinon from 'sinon'; import { Relay } from '../../src/lib/relay'; -import { RequestDetails } from '../../src/lib/types'; import { withOverriddenEnvsInMochaTest } from '../helpers'; const logger = pino({ level: 'silent' }); let relay: Relay; -const requestDetails = new RequestDetails({ requestId: 'admin', ipAddress: '0.0.0.0' }); describe('Admin', async function () { + // we used to initialize the relay via its constructor, but now we use the init method, + // which also checks the operator balance; stub that check here since it is not part of these tests + before(() => { + sinon.stub(Relay.prototype, 'ensureOperatorHasBalance').resolves(); + }); + + after(() => { + sinon.restore(); + }); + it('should execute config', async () => { - relay = new Relay(logger, new Registry()); - const res = await relay.admin().config(requestDetails); + relay = await Relay.init(logger, new Registry()); + const res = await relay.admin().config(); expect(res).to.haveOwnProperty('relay'); expect(res).to.haveOwnProperty('upstreamDependencies'); expect(res.relay).to.haveOwnProperty('version'); expect(res.relay).to.haveOwnProperty('config'); - expect(res.relay.config).to.not.be.empty; + const keys = Object.keys(res.relay.config); + expect(keys).to.have.length.greaterThan(0); for (const service of res.upstreamDependencies) { expect(service).to.haveOwnProperty('config'); expect(service).to.haveOwnProperty('service'); - expect(service.config).to.not.be.empty; + const keys = Object.keys(service.config); + expect(keys).to.have.length.greaterThan(0); } }); @@ -41,10 +52,10 @@ describe('Admin', async function () { }, () => { it(`should return a valid consensus version for ${networkName}`, async () => { - const tempRelay = new Relay(logger, new Registry()); - const res = await tempRelay.admin().config(requestDetails); + const tempRelay = await Relay.init(logger, new Registry()); + const res = await tempRelay.admin().config(); const regex = /^\d+\.\d+\.\d+.*$/; - expect(res.upstreamDependencies[0].version.match(regex)).to.not.be.empty; + expect(res.upstreamDependencies[0].version.match(regex)).to.have.length.greaterThan(0); }); }, ); @@ -56,8 +67,8 @@ describe('Admin', async function () { }, () => { it(`should return a valid consensus version for local network`, async () => { - const tempRelay = new Relay(logger, new Registry()); - const res = await tempRelay.admin().config(requestDetails); + const tempRelay = await Relay.init(logger, new Registry()); + const res = await tempRelay.admin().config(); expect(res.upstreamDependencies[0].version).to.equal('local'); }); }, diff --git a/packages/relay/tests/lib/net.spec.ts b/packages/relay/tests/lib/net.spec.ts index 5d04cbc160..61e69f3d18 100644 --- a/packages/relay/tests/lib/net.spec.ts +++ b/packages/relay/tests/lib/net.spec.ts @@ -4,22 +4,29 @@ import { ConfigService } from 
'@hashgraph/json-rpc-config-service/dist/services' import { expect } from 'chai'; import pino from 'pino'; import { Registry } from 'prom-client'; +import sinon from 'sinon'; import { Relay } from '../../src/lib/relay'; import { withOverriddenEnvsInMochaTest } from '../helpers'; const logger = pino({ level: 'silent' }); -let relay: Relay; describe('Net', async function () { - it('should execute "net_listening"', function () { - relay = new Relay(logger, new Registry()); + before(() => { + sinon.stub(Relay.prototype, 'ensureOperatorHasBalance').resolves(); + }); + + after(() => { + sinon.restore(); + }); + it('should execute "net_listening"', async function () { + const relay = await Relay.init(logger, new Registry()); const result = relay.net().listening(); expect(result).to.eq(true); }); - it('should execute "net_version"', function () { - relay = new Relay(logger, new Registry()); + it('should execute "net_version"', async function () { + const relay = await Relay.init(logger, new Registry()); const expectedNetVersion = parseInt(ConfigService.get('CHAIN_ID'), 16).toString(); const actualNetVersion = relay.net().version(); @@ -27,32 +34,32 @@ describe('Net', async function () { }); withOverriddenEnvsInMochaTest({ CHAIN_ID: '123' }, () => { - it('should set chainId from CHAIN_ID environment variable', () => { - relay = new Relay(logger, new Registry()); + it('should set chainId from CHAIN_ID environment variable', async () => { + const relay = await Relay.init(logger, new Registry()); const actualNetVersion = relay.net().version(); expect(actualNetVersion).to.equal('123'); }); }); withOverriddenEnvsInMochaTest({ CHAIN_ID: '0x1a' }, () => { - it('should set chainId from CHAIN_ID environment variable starting with 0x', () => { - relay = new Relay(logger, new Registry()); + it('should set chainId from CHAIN_ID environment variable starting with 0x', async () => { + const relay = await Relay.init(logger, new Registry()); const actualNetVersion = relay.net().version(); expect(actualNetVersion).to.equal('26'); // 0x1a in decimal is 26 }); }); withOverriddenEnvsInMochaTest({ HEDERA_NETWORK: undefined }, () => { - it('should throw error if required configuration is set to undefined', () => { - expect(() => new Relay(logger, new Registry())).to.throw( + it('should throw error if required configuration is set to undefined', async () => { + await expect(Relay.init(logger, new Registry())).to.be.rejectedWith( 'Configuration error: HEDERA_NETWORK is a mandatory configuration for relay operation.', ); }); }); withOverriddenEnvsInMochaTest({ HEDERA_NETWORK: 'mainnet', CHAIN_ID: '0x2' }, () => { - it('should prioritize CHAIN_ID over HEDERA_NETWORK', () => { - relay = new Relay(logger, new Registry()); + it('should prioritize CHAIN_ID over HEDERA_NETWORK', async () => { + const relay = await Relay.init(logger, new Registry()); const actualNetVersion = relay.net().version(); expect(actualNetVersion).to.equal('2'); // 0x2 in decimal is 2 }); diff --git a/packages/relay/tests/lib/web3.spec.ts b/packages/relay/tests/lib/web3.spec.ts index 9a1f938e12..cf37cd8898 100644 --- a/packages/relay/tests/lib/web3.spec.ts +++ b/packages/relay/tests/lib/web3.spec.ts @@ -4,14 +4,25 @@ import { ConfigService } from '@hashgraph/json-rpc-config-service/dist/services' import { expect } from 'chai'; import pino from 'pino'; import { Registry } from 'prom-client'; +import sinon from 'sinon'; import { Relay } from '../../src'; import { withOverriddenEnvsInMochaTest } from '../helpers'; const logger = pino({ level: 'silent' }); 
-const relay = new Relay(logger, new Registry()); describe('Web3', function () { + let relay: Relay; + + before(async () => { + sinon.stub(Relay.prototype, 'ensureOperatorHasBalance').resolves(); + relay = await Relay.init(logger, new Registry()); + }); + + after(() => { + sinon.restore(); + }); + withOverriddenEnvsInMochaTest({ npm_package_version: '1.0.0' }, () => { it('should return "relay/1.0.0"', async function () { const clientVersion = relay.web3().clientVersion(); diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index a727bdcbf3..315c5b617f 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -2,21 +2,21 @@ import { ConfigService } from '@hashgraph/json-rpc-config-service/dist/services'; -import { setServerTimeout } from './koaJsonRpc/lib/utils'; // Import the 'setServerTimeout' function from the correct location -import app, { logger, relay } from './server'; +import { setServerTimeout } from './koaJsonRpc/lib/utils'; +import { initializeServer, logger } from './server'; async function main() { try { - await relay.ensureOperatorHasBalance(); + // Initialize server with the fully initialized Relay + const { app } = await initializeServer(); + const server = app.listen({ port: ConfigService.get('SERVER_PORT'), host: ConfigService.get('SERVER_HOST') }); + + // set request timeout to ensure sockets are closed after specified time of inactivity + setServerTimeout(server); } catch (error) { logger.fatal(error); process.exit(1); } - - const server = app.listen({ port: ConfigService.get('SERVER_PORT'), host: ConfigService.get('SERVER_HOST') }); - - // set request timeout to ensure sockets are closed after specified time of inactivity - setServerTimeout(server); } main(); diff --git a/packages/server/src/server.ts b/packages/server/src/server.ts index 4d1af98f29..6faa3efe13 100644 --- a/packages/server/src/server.ts +++ b/packages/server/src/server.ts @@ -39,46 +39,7 @@ const mainLogger = pino({ }); export const logger = mainLogger.child({ name: 'rpc-server' }); -const register = new Registry(); -export const relay: Relay = new Relay(logger.child({ name: 'relay' }), register); -const app = new KoaJsonRpc(logger.child({ name: 'koa-rpc' }), register, relay, { - limit: ConfigService.get('INPUT_SIZE_LIMIT') + 'mb', -}); - -collectDefaultMetrics({ register, prefix: 'rpc_relay_' }); - -// clear and create metric in registry -const metricHistogramName = 'rpc_relay_method_response'; -register.removeSingleMetric(metricHistogramName); -const methodResponseHistogram = new Histogram({ - name: metricHistogramName, - help: 'JSON RPC method statusCode latency histogram', - labelNames: ['method', 'statusCode'], - registers: [register], - buckets: [5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000, 20000, 30000, 40000, 50000, 60000], // ms (milliseconds) -}); - -// enable proxy support to trust proxy-added headers for client IP detection -app.getKoaApp().proxy = true; - -// Middleware to parse RFC 7239 Forwarded header and make it compatible with Koa's X-Forwarded-For parsing -app.getKoaApp().use(async (ctx, next) => { - // Only process if X-Forwarded-For doesn't exist but Forwarded does - if (!ctx.request.headers['x-forwarded-for'] && ctx.request.headers['forwarded']) { - const forwardedHeader = ctx.request.headers['forwarded'] as string; - - // Parse the Forwarded header to extract the client IP - // Format: Forwarded: for="192.168.1.1";by="10.0.0.1", for="203.0.113.1";by="10.0.0.2" - const clientIp = parseForwardedHeader(forwardedHeader); - - if 
(clientIp) { - // Set X-Forwarded-For so Koa can parse it normally - ctx.request.headers['x-forwarded-for'] = clientIp; - } - } - - await next(); -}); +export const register = new Registry(); /** * Parse RFC 7239 Forwarded header to extract the original client IP @@ -156,148 +117,198 @@ function parseForwardedHeader(forwardedHeader: string): string | null { } return ip; - } catch (error) { + } catch { // If parsing fails, return null to avoid breaking the request return null; } } -// Set CORS -app.getKoaApp().use(cors()); +/** + * Initialize the server components + */ +export async function initializeServer() { + const relay = await Relay.init(logger.child({ name: 'relay' }), register); + + const koaJsonRpc = new KoaJsonRpc(logger.child({ name: 'koa-rpc' }), register, relay, { + limit: ConfigService.get('INPUT_SIZE_LIMIT') + 'mb', + }); + + const app = koaJsonRpc.getKoaApp(); + + collectDefaultMetrics({ register, prefix: 'rpc_relay_' }); + + // clear and create metric in registry + const metricHistogramName = 'rpc_relay_method_response'; + register.removeSingleMetric(metricHistogramName); + const methodResponseHistogram = new Histogram({ + name: metricHistogramName, + help: 'JSON RPC method statusCode latency histogram', + labelNames: ['method', 'statusCode'], + registers: [register], + buckets: [5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000, 20000, 30000, 40000, 50000, 60000], // ms (milliseconds) + }); + + // enable proxy support to trust proxy-added headers for client IP detection + app.proxy = true; + + // Middleware to parse RFC 7239 Forwarded header and make it compatible with Koa's X-Forwarded-For parsing + app.use(async (ctx, next) => { + // Only process if X-Forwarded-For doesn't exist but Forwarded does + if (!ctx.request.headers['x-forwarded-for'] && ctx.request.headers['forwarded']) { + const forwardedHeader = ctx.request.headers['forwarded'] as string; + + // Parse the Forwarded header to extract the client IP + // Format: Forwarded: for="192.168.1.1";by="10.0.0.1", for="203.0.113.1";by="10.0.0.2" + const clientIp = parseForwardedHeader(forwardedHeader); + + if (clientIp) { + // Set X-Forwarded-For so Koa can parse it normally + ctx.request.headers['x-forwarded-for'] = clientIp; + } + } -// Middleware for non POST request timing -app.getKoaApp().use(async (ctx, next) => { - const start = Date.now(); - await next(); - const ms = Date.now() - start; + await next(); + }); - if (ctx.method !== 'POST') { - logger.info(`[${ctx.method}]: ${ctx.url} ${ctx.status} ${ms} ms`); - } else { - // Since ctx.state.status might contain the request ID from JsonRpcError, remove it for a cleaner log. 
- const contextStatus = ctx.state.status?.replace(`[Request ID: ${ctx.state.reqId}] `, '') || ctx.status; + // Set CORS + app.use(cors()); - // log call type, method, status code and latency - logger.info(`${formatRequestIdMessage(ctx.state.reqId)} [POST]: ${ctx.state.methodName} ${contextStatus} ${ms} ms`); - methodResponseHistogram.labels(ctx.state.methodName, `${ctx.status}`).observe(ms); - } -}); + // Middleware for non POST request timing + app.use(async (ctx, next) => { + const start = Date.now(); + await next(); + const ms = Date.now() - start; -// Prometheus metrics exposure -app.getKoaApp().use(async (ctx, next) => { - if (ctx.url === '/metrics') { - ctx.status = 200; - ctx.body = await register.metrics(); - } else { - return next(); - } -}); + if (ctx.method !== 'POST') { + logger.info(`[${ctx.method}]: ${ctx.url} ${ctx.status} ${ms} ms`); + } else { + // Since ctx.state.status might contain the request ID from JsonRpcError, remove it for a cleaner log. + const contextStatus = ctx.state.status?.replace(`[Request ID: ${ctx.state.reqId}] `, '') || ctx.status; + + // log call type, method, status code and latency + logger.info( + `${formatRequestIdMessage(ctx.state.reqId)} [POST]: ${ctx.state.methodName} ${contextStatus} ${ms} ms`, + ); + methodResponseHistogram.labels(ctx.state.methodName, `${ctx.status}`).observe(ms); + } + }); -// Liveness endpoint -app.getKoaApp().use(async (ctx, next) => { - if (ctx.url === '/health/liveness') { - ctx.status = 200; - } else { - return next(); - } -}); + // Prometheus metrics exposure + app.use(async (ctx, next) => { + if (ctx.url === '/metrics') { + ctx.status = 200; + ctx.body = await register.metrics(); + } else { + return next(); + } + }); -// Config endpoint -app.getKoaApp().use(async (ctx, next) => { - if (ctx.url === '/config') { - if (ConfigService.get('DISABLE_ADMIN_NAMESPACE')) { - return spec.MethodNotFound('config'); + // Liveness endpoint + app.use(async (ctx, next) => { + if (ctx.url === '/health/liveness') { + ctx.status = 200; + } else { + return next(); } - ctx.status = 200; - ctx.body = JSON.stringify(await relay.admin().config()); - } else { - return next(); - } -}); + }); -// Readiness endpoint -app.getKoaApp().use(async (ctx, next) => { - if (ctx.url === '/health/readiness') { - try { - const result = relay.eth().chainId(); - if (result.indexOf('0x12') >= 0) { - ctx.status = 200; - ctx.body = 'OK'; - } else { - ctx.body = 'DOWN'; - ctx.status = 503; // UNAVAILABLE + // Config endpoint + app.use(async (ctx, next) => { + if (ctx.url === '/config') { + if (ConfigService.get('DISABLE_ADMIN_NAMESPACE')) { + return spec.MethodNotFound('config'); } - } catch (e) { - logger.error(e); - throw e; + ctx.status = 200; + ctx.body = JSON.stringify(await relay.admin().config()); + } else { + return next(); } - } else { - return next(); - } -}); - -// OpenRPC endpoint -app.getKoaApp().use(async (ctx, next) => { - if (ctx.url === '/openrpc') { - ctx.status = 200; - ctx.body = JSON.stringify( - JSON.parse(fs.readFileSync(path.resolve(__dirname, '../../../docs/openrpc.json')).toString()), - null, - 2, - ); - } else { - return next(); - } -}); - -// Middleware to end for non POST requests asides health, metrics and openrpc -app.getKoaApp().use(async (ctx, next) => { - if (ctx.method === 'POST') { - await next(); - } else if (ctx.method === 'OPTIONS') { - // support CORS preflight - ctx.status = 200; - } else { - logger.warn(`skipping HTTP method: [${ctx.method}], url: ${ctx.url}, status: ${ctx.status}`); - } -}); - -app.getKoaApp().use((ctx, 
next) => { - const options = { - expose: ctx.get('Request-Id'), - header: ctx.get('Request-Id'), - query: ctx.get('query'), - }; - - for (const key in options) { - if (typeof options[key] !== 'boolean' && typeof options[key] !== 'string') { - throw new Error(`Option \`${key}\` requires a boolean or a string`); + }); + + // Readiness endpoint + app.use(async (ctx, next) => { + if (ctx.url === '/health/readiness') { + try { + const result = relay.eth().chainId(); + if (result.indexOf('0x12') >= 0) { + ctx.status = 200; + ctx.body = 'OK'; + } else { + ctx.body = 'DOWN'; + ctx.status = 503; // UNAVAILABLE + } + } catch (e) { + logger.error(e); + throw e; + } + } else { + return next(); + } + }); + + // OpenRPC endpoint + app.use(async (ctx, next) => { + if (ctx.url === '/openrpc') { + ctx.status = 200; + ctx.body = JSON.stringify( + JSON.parse(fs.readFileSync(path.resolve(__dirname, '../../../docs/openrpc.json')).toString()), + null, + 2, + ); + } else { + return next(); + } + }); + + // Middleware to end for non POST requests asides health, metrics and openrpc + app.use(async (ctx, next) => { + if (ctx.method === 'POST') { + await next(); + } else if (ctx.method === 'OPTIONS') { + // support CORS preflight + ctx.status = 200; + } else { + logger.warn(`skipping HTTP method: [${ctx.method}], url: ${ctx.url}, status: ${ctx.status}`); + } + }); + + app.use((ctx, next) => { + const options = { + expose: ctx.get('Request-Id'), + header: ctx.get('Request-Id'), + query: ctx.get('query'), + }; + + for (const key in options) { + if (typeof options[key] !== 'boolean' && typeof options[key] !== 'string') { + throw new Error(`Option \`${key}\` requires a boolean or a string`); + } } - } - const requestId = options.query || options.header || uuid(); + const requestId = options.query || options.header || uuid(); - if (options.expose) { - ctx.set(options.expose, requestId); - } + if (options.expose) { + ctx.set(options.expose, requestId); + } - ctx.state.reqId = requestId; + ctx.state.reqId = requestId; - return context.run({ requestId }, next); -}); + return context.run({ requestId }, next); + }); -const rpcApp = app.rpcApp(); + const rpcApp = koaJsonRpc.rpcApp(); -app.getKoaApp().use(async (ctx) => { - await rpcApp(ctx); -}); + app.use(async (ctx) => { + await rpcApp(ctx); + }); -process.on('unhandledRejection', (reason, p) => { - logger.error(`Unhandled Rejection at: Promise: ${JSON.stringify(p)}, reason: ${reason}`); -}); + process.on('unhandledRejection', (reason, p) => { + logger.error(`Unhandled Rejection at: Promise: ${JSON.stringify(p)}, reason: ${reason}`); + }); -process.on('uncaughtException', (err) => { - logger.error(err, 'Uncaught Exception!'); -}); + process.on('uncaughtException', (err) => { + logger.error(err, 'Uncaught Exception!'); + }); -export default app.getKoaApp(); + return { app }; +} diff --git a/packages/server/tests/acceptance/index.spec.ts b/packages/server/tests/acceptance/index.spec.ts index cf2dac3c55..d4120f552a 100644 --- a/packages/server/tests/acceptance/index.spec.ts +++ b/packages/server/tests/acceptance/index.spec.ts @@ -9,7 +9,7 @@ dotenv.config({ path: path.resolve(__dirname, '../../../../.env') }); import { ConfigService } from '@hashgraph/json-rpc-config-service/dist/services'; // Constants import constants from '@hashgraph/json-rpc-relay/dist/lib/constants'; -import { app as wsApp } from '@hashgraph/json-rpc-ws-server/dist/webSocketServer'; +import { initializeWsServer } from '@hashgraph/json-rpc-ws-server/dist/webSocketServer'; // Hashgraph SDK import { 
AccountId, Hbar } from '@hashgraph/sdk'; import chai from 'chai'; @@ -20,9 +20,9 @@ import { Server } from 'http'; import pino from 'pino'; import { GCProfiler } from 'v8'; +import { setServerTimeout } from '../../dist/koaJsonRpc/lib/utils'; // Server related -import app from '../../dist/server'; -import { setServerTimeout } from '../../src/koaJsonRpc/lib/utils'; +import { initializeServer } from '../../dist/server'; import MetricsClient from '../clients/metricsClient'; import MirrorClient from '../clients/mirrorClient'; import RelayClient from '../clients/relayClient'; @@ -72,7 +72,7 @@ describe('RPC Server Acceptance Tests', function () { stopRelay(); await new Promise((r) => setTimeout(r, 5000)); // wait for server to shutdown - runLocalRelay(); + await runLocalRelay(); } }; @@ -94,7 +94,7 @@ describe('RPC Server Acceptance Tests', function () { logger.info(`E2E_RELAY_HOST: ${ConfigService.get('E2E_RELAY_HOST')}`); if (global.relayIsLocal) { - runLocalRelay(); + await runLocalRelay(); } // cache start balance @@ -158,8 +158,9 @@ describe('RPC Server Acceptance Tests', function () { }); }); - function loadTest(testFile) { + function loadTest(testFile: string): void { if (testFile !== 'index.spec.ts' && testFile.endsWith('.spec.ts')) { + // eslint-disable-next-line @typescript-eslint/no-require-imports require(`./${testFile}`); } } @@ -178,17 +179,19 @@ describe('RPC Server Acceptance Tests', function () { } } - function runLocalRelay() { + async function runLocalRelay() { // start local relay, relay instance in local should not be running logger.info(`Start relay on port ${constants.RELAY_PORT}`); logger.info(`Start relay on host ${constants.RELAY_HOST}`); + const { app } = await initializeServer(); const relayServer = app.listen({ port: constants.RELAY_PORT }); global.relayServer = relayServer; setServerTimeout(relayServer); if (ConfigService.get('TEST_WS_SERVER')) { logger.info(`Start ws-server on port ${constants.WEB_SOCKET_PORT}`); + const { app: wsApp } = await initializeWsServer(); global.socketServer = wsApp.listen({ port: constants.WEB_SOCKET_PORT }); } } diff --git a/packages/server/tests/integration/proxyHeaders.spec.ts b/packages/server/tests/integration/proxyHeaders.spec.ts index a6966047f1..da90a7d85e 100644 --- a/packages/server/tests/integration/proxyHeaders.spec.ts +++ b/packages/server/tests/integration/proxyHeaders.spec.ts @@ -4,18 +4,21 @@ import { ConfigService } from '@hashgraph/json-rpc-config-service/dist/services' import Axios, { AxiosInstance, AxiosResponse } from 'axios'; import { expect } from 'chai'; import { Server } from 'http'; -import Koa from 'koa'; import { pino } from 'pino'; import { ConfigServiceTestHelper } from '../../../config-service/tests/configServiceTestHelper'; ConfigServiceTestHelper.appendEnvsFromPath(__dirname + '/test.env'); +import { Relay } from '@hashgraph/json-rpc-relay'; +import sinon from 'sinon'; + import { overrideEnvsInMochaDescribe, useInMemoryRedisServer, withOverriddenEnvsInMochaTest, } from '../../../relay/tests/helpers'; +import { initializeServer, register } from '../../dist/server'; import RelayCalls from '../../tests/helpers/constants'; describe('Proxy Headers Integration Tests', function () { @@ -32,7 +35,6 @@ describe('Proxy Headers Integration Tests', function () { let testServer: Server; let testClient: AxiosInstance; - let app: Koa; // Simple static test IPs - each test uses different IP ranges to avoid conflicts const TEST_IP_A = '192.168.1.100'; @@ -48,18 +50,22 @@ describe('Proxy Headers Integration Tests', 
function () { const TEST_IPV6 = '2001:db8::1'; const TEST_METHOD = RelayCalls.ETH_ENDPOINTS.ETH_CHAIN_ID; - before(function () { - app = require('../../src/server').default; + before(async function () { + sinon.stub(Relay.prototype, 'ensureOperatorHasBalance').resolves(); + const { app } = await initializeServer(); testServer = app.listen(ConfigService.get('E2E_SERVER_PORT')); testClient = createTestClient(); }); after(function () { + sinon.restore(); testServer.close((err) => { if (err) { console.error(err); } }); + // Clear the Prometheus registry to avoid conflicts with other test files + register.clear(); }); this.timeout(10000); diff --git a/packages/server/tests/integration/server.spec.ts b/packages/server/tests/integration/server.spec.ts index 3bae91f8b9..5c1009c554 100644 --- a/packages/server/tests/integration/server.spec.ts +++ b/packages/server/tests/integration/server.spec.ts @@ -29,6 +29,7 @@ import { overrideEnvsInMochaDescribe, withOverriddenEnvsInMochaTest, } from '../../../relay/tests/helpers'; +import { initializeServer } from '../../dist/server'; import RelayCalls from '../../tests/helpers/constants'; import Assertions, { requestIdRegex } from '../helpers/assertions'; import { Utils } from '../helpers/utils'; @@ -44,9 +45,10 @@ describe('RPC Server', function () { overrideEnvsInMochaDescribe({ RATE_LIMIT_DISABLED: true, + READ_ONLY: true, }); - before(function () { + before(async function () { // Stub getAllMasked to avoid maskUpEnv errors for unknown envs getAllMaskedStub = sinon.stub(ConfigService, 'getAllMasked').returns({ BATCH_REQUESTS_MAX_SIZE: '100', @@ -62,7 +64,7 @@ describe('RPC Server', function () { // Clear the module cache to ensure a fresh server instance delete require.cache[require.resolve('../../src/server')]; - app = require('../../src/server').default; + app = (await initializeServer()).app; testServer = app.listen(ConfigService.get('E2E_SERVER_PORT')); testClient = BaseTest.createTestClient(); @@ -161,7 +163,6 @@ describe('RPC Server', function () { const CUSTOMIZE_PORT = '7545'; const CUSTOMIZE_HOST = '127.0.0.1'; const configuredServer = app.listen({ port: CUSTOMIZE_PORT, host: CUSTOMIZE_HOST }); - return new Promise((resolve, reject) => { configuredServer.on('listening', () => { const address = configuredServer.address(); @@ -179,7 +180,6 @@ describe('RPC Server', function () { configuredServer.close(() => reject(error)); } }); - configuredServer.on('error', (error) => { reject(error); }); @@ -232,7 +232,7 @@ describe('RPC Server', function () { withOverriddenEnvsInMochaTest({ REQUEST_ID_IS_OPTIONAL: true }, async function () { xit('supports optionality of request id when configured', async function () { - const app2 = require('../../src/server').default; + const { app: app2 } = await initializeServer(); const port = `1${ConfigService.get('E2E_SERVER_PORT')}`; const testServer2 = app2.listen(port); @@ -2572,19 +2572,19 @@ describe('RPC Server', function () { }); it('should execute with CallTracer type and valid CallTracerConfig', async () => { - expect( - await testClient.post('/', { + await expect( + testClient.post('/', { jsonrpc: '2.0', method: 'debug_traceTransaction', params: [contractHash1, { tracer: TracerType.CallTracer, tracerConfig: { onlyTopCall: true } }], id: 1, }), - ).to.not.throw; + ).to.be.fulfilled.and.eventually.have.property('status', 200); }); it('should execute with OpcodeLogger type and valid OpcodeLoggerConfig', async () => { - expect( - await testClient.post('/', { + await expect( + testClient.post('/', { jsonrpc: '2.0', 
method: 'debug_traceTransaction', params: [ @@ -2596,73 +2596,73 @@ describe('RPC Server', function () { ], id: 1, }), - ).to.not.throw; + ).to.be.fulfilled.and.eventually.have.property('status', 200); }); it('should execute with PrestateTracer type and valid PrestateTracerConfig', async () => { - expect( - await testClient.post('/', { + await expect( + testClient.post('/', { jsonrpc: '2.0', method: 'debug_traceTransaction', params: [contractHash1, { tracer: TracerType.PrestateTracer }], id: 1, }), - ).to.not.throw; + ).to.be.fulfilled.and.eventually.have.property('status', 200); }); it('should execute with PrestateTracer type and onlyTopCall option', async () => { - expect( - await testClient.post('/', { + await expect( + testClient.post('/', { jsonrpc: '2.0', method: 'debug_traceTransaction', params: [contractHash1, { tracer: TracerType.PrestateTracer, tracerConfig: { onlyTopCall: true } }], id: 1, }), - ).to.not.throw; + ).to.be.fulfilled.and.eventually.have.property('status', 200); }); it('should execute with valid hash', async () => { - expect( - await testClient.post('/', { + await expect( + testClient.post('/', { jsonrpc: '2.0', method: 'debug_traceTransaction', params: [contractHash1], id: '2', }), - ).to.not.throw; + ).to.be.fulfilled.and.eventually.have.property('status', 200); }); it('should execute with valid hash and valid TracerType string', async () => { - expect( - await testClient.post('/', { + await expect( + testClient.post('/', { jsonrpc: '2.0', method: 'debug_traceTransaction', params: [contractHash1, { tracer: TracerType.CallTracer }], id: '2', }), - ).to.not.throw; + ).to.be.fulfilled.and.eventually.have.property('status', 200); }); it('should execute with valid hash, valid TracerType and empty TracerConfig', async () => { - expect( - await testClient.post('/', { + await expect( + testClient.post('/', { jsonrpc: '2.0', method: 'debug_traceTransaction', params: [contractHash1, { tracer: TracerType.CallTracer, tracerConfig: {} }], id: '2', }), - ).to.not.throw; + ).to.be.fulfilled.and.eventually.have.property('status', 200); }); it('should execute with valid hash, no TracerType and no TracerConfig', async () => { - expect( - await testClient.post('/', { + await expect( + testClient.post('/', { jsonrpc: '2.0', method: 'debug_traceTransaction', params: [contractHash1], id: '2', }), - ).to.not.throw; + ).to.be.fulfilled.and.eventually.have.property('status', 200); }); it('should fail with missing transaction hash', async () => { diff --git a/packages/ws-server/src/index.ts b/packages/ws-server/src/index.ts index 6f7120c130..2d3bcd6151 100644 --- a/packages/ws-server/src/index.ts +++ b/packages/ws-server/src/index.ts @@ -3,19 +3,20 @@ import { ConfigService } from '@hashgraph/json-rpc-config-service/dist/services'; import constants from '@hashgraph/json-rpc-relay/dist/lib/constants'; -import { app, httpApp, logger, relay } from './webSocketServer'; +import { initializeWsServer, logger } from './webSocketServer'; async function main() { try { - await relay.ensureOperatorHasBalance(); + // Initialize WebSocket server with the fully initialized Relay + const { app, httpApp } = await initializeWsServer(); + + const host = ConfigService.get('SERVER_HOST'); + app.listen({ port: constants.WEB_SOCKET_PORT, host }); + httpApp.listen({ port: constants.WEB_SOCKET_HTTP_PORT, host }); } catch (error) { logger.fatal(error); process.exit(1); } - - const host = ConfigService.get('SERVER_HOST'); - app.listen({ port: constants.WEB_SOCKET_PORT, host }); - httpApp.listen({ port: 
constants.WEB_SOCKET_HTTP_PORT, host }); } main(); diff --git a/packages/ws-server/src/webSocketServer.ts b/packages/ws-server/src/webSocketServer.ts index 90fe35ff7c..fa82a2399d 100644 --- a/packages/ws-server/src/webSocketServer.ts +++ b/packages/ws-server/src/webSocketServer.ts @@ -52,227 +52,233 @@ const mainLogger = pino({ }, }); -const register = new Registry(); -const logger = mainLogger.child({ name: 'rpc-ws-server' }); -const relay = new Relay(logger, register); +export const logger = mainLogger.child({ name: 'rpc-ws-server' }); +export async function initializeWsServer() { + const register = new Registry(); + const relay = await Relay.init(logger, register); -const subscriptionService = new SubscriptionService(relay, logger, register); + const subscriptionService = new SubscriptionService(relay, logger, register); -const mirrorNodeClient = relay.mirrorClient(); + const mirrorNodeClient = relay.mirrorClient(); -const rateLimitDuration = ConfigService.get('LIMIT_DURATION'); -const rateLimiter = new IPRateLimiterService(logger.child({ name: 'ip-rate-limit' }), register, rateLimitDuration); -const limiter = new ConnectionLimiter(logger, register, rateLimiter); -const wsMetricRegistry = new WsMetricRegistry(register); + const rateLimitDuration = ConfigService.get('LIMIT_DURATION'); + const rateLimiter = new IPRateLimiterService(logger.child({ name: 'ip-rate-limit' }), register, rateLimitDuration); + const limiter = new ConnectionLimiter(logger, register, rateLimiter); + const wsMetricRegistry = new WsMetricRegistry(register); -const pingInterval = ConfigService.get('WS_PING_INTERVAL'); + const pingInterval = ConfigService.get('WS_PING_INTERVAL'); -const app = websockify(new Koa()); + const app = websockify(new Koa()); -app.ws.use((ctx: Koa.Context, next: Koa.Next) => { - const connectionId = subscriptionService.generateId(); - ctx.websocket.id = connectionId; - next(); -}); - -app.ws.use(async (ctx: Koa.Context) => { - // Increment the total opened connections - wsMetricRegistry.getCounter('totalOpenedConnections').inc(); - - // Record the start time when the connection is established - const startTime = process.hrtime(); - ctx.websocket.limiter = limiter; - ctx.websocket.wsMetricRegistry = wsMetricRegistry; - - logger.info( - // @ts-ignore - `New connection established. Current active connections: ${ctx.app.server._connections}`, - ); - - // Close event handle - // https://nodejs.org/api/async_context.html#static-method-asyncresourcebindfn-type-thisarg - // https://nodejs.org/api/async_context.html#troubleshooting-context-loss - ctx.websocket.on( - 'close', - AsyncResource.bind(async (code, message) => { - logger.info(`Closing connection ${ctx.websocket.id} | code: ${code}, message: ${message}`); - await handleConnectionClose(ctx, subscriptionService, limiter, wsMetricRegistry, startTime); - }), - ); - - // Increment limit counters - limiter.incrementCounters(ctx); - - // Limit checks - limiter.applyLimits(ctx); - - // listen on message event - ctx.websocket.on('message', async (msg) => { - const requestId = uuid(); - ctx.websocket.requestId = requestId; - - const requestDetails = new RequestDetails({ - requestId, - ipAddress: ctx.request.ip, - connectionId: ctx.websocket.id, - }); - - context.run({ requestId, connectionId: requestDetails.connectionId! 
}, async () => { - // Increment the total messages counter for each message received - wsMetricRegistry.getCounter('totalMessageCounter').inc(); - - // Record the start time when a new message is received - const msgStartTime = process.hrtime(); - - // Reset the TTL timer for inactivity upon receiving a message from the client - limiter.resetInactivityTTLTimer(ctx.websocket); - // parse the received message from the client into a JSON object - let request: IJsonRpcRequest | IJsonRpcRequest[]; - try { - request = JSON.parse(msg.toString('ascii')); - } catch (e) { - // Log an error if the message cannot be decoded and send an invalid request error to the client - logger.warn(`Could not decode message from connection, message: ${msg}, error: ${e}`); - ctx.websocket.send(JSON.stringify(jsonRespError(null, predefined.INVALID_REQUEST, requestDetails.requestId))); - return; - } - - // check if request is a batch request (array) or a signle request (JSON) - if (Array.isArray(request)) { - if (logger.isLevelEnabled('trace')) { - logger.trace(`Receive batch request=${JSON.stringify(request)}`); - } + app.ws.use((ctx: Koa.Context, next: Koa.Next) => { + const connectionId = subscriptionService.generateId(); + ctx.websocket.id = connectionId; + next(); + }); - // Increment metrics for batch_requests - wsMetricRegistry.getCounter('methodsCounter').labels(WS_CONSTANTS.BATCH_REQUEST_METHOD_NAME).inc(); - wsMetricRegistry - .getCounter('methodsCounterByIp') - .labels(ctx.request.ip, WS_CONSTANTS.BATCH_REQUEST_METHOD_NAME) - .inc(); - - // send error if batch request feature is not enabled - if (!getWsBatchRequestsEnabled()) { - const batchRequestDisabledError = predefined.WS_BATCH_REQUESTS_DISABLED; - logger.warn(`${JSON.stringify(batchRequestDisabledError)}`); - ctx.websocket.send( - JSON.stringify([jsonRespError(null, batchRequestDisabledError, requestDetails.requestId)]), - ); + app.ws.use(async (ctx: Koa.Context) => { + // Increment the total opened connections + wsMetricRegistry.getCounter('totalOpenedConnections').inc(); + + // Record the start time when the connection is established + const startTime = process.hrtime(); + ctx.websocket.limiter = limiter; + ctx.websocket.wsMetricRegistry = wsMetricRegistry; + + logger.info( + // @ts-ignore + `New connection established. Current active connections: ${ctx.app.server._connections}`, + ); + + // Close event handle + // https://nodejs.org/api/async_context.html#static-method-asyncresourcebindfn-type-thisarg + // https://nodejs.org/api/async_context.html#troubleshooting-context-loss + ctx.websocket.on( + 'close', + AsyncResource.bind(async (code, message) => { + logger.info(`Closing connection ${ctx.websocket.id} | code: ${code}, message: ${message}`); + await handleConnectionClose(ctx, subscriptionService, limiter, wsMetricRegistry, startTime); + }), + ); + + // Increment limit counters + limiter.incrementCounters(ctx); + + // Limit checks + limiter.applyLimits(ctx); + + // listen on message event + ctx.websocket.on('message', async (msg) => { + const requestId = uuid(); + ctx.websocket.requestId = requestId; + + const requestDetails = new RequestDetails({ + requestId, + ipAddress: ctx.request.ip, + connectionId: ctx.websocket.id, + }); + + context.run({ requestId, connectionId: requestDetails.connectionId! 
}, async () => { + // Increment the total messages counter for each message received + wsMetricRegistry.getCounter('totalMessageCounter').inc(); + + // Record the start time when a new message is received + const msgStartTime = process.hrtime(); + + // Reset the TTL timer for inactivity upon receiving a message from the client + limiter.resetInactivityTTLTimer(ctx.websocket); + // parse the received message from the client into a JSON object + let request: IJsonRpcRequest | IJsonRpcRequest[]; + try { + request = JSON.parse(msg.toString('ascii')); + } catch (e) { + // Log an error if the message cannot be decoded and send an invalid request error to the client + logger.warn(`Could not decode message from connection, message: ${msg}, error: ${e}`); + ctx.websocket.send(JSON.stringify(jsonRespError(null, predefined.INVALID_REQUEST, requestDetails.requestId))); return; } - // send error if batch request exceed max batch size - if (request.length > getBatchRequestsMaxSize()) { - const batchRequestAmountMaxExceed = predefined.BATCH_REQUESTS_AMOUNT_MAX_EXCEEDED( - request.length, - getBatchRequestsMaxSize(), - ); - logger.warn(`${JSON.stringify(batchRequestAmountMaxExceed)}`); - ctx.websocket.send( - JSON.stringify([jsonRespError(null, batchRequestAmountMaxExceed, requestDetails.requestId)]), - ); - return; - } + // check if request is a batch request (array) or a signle request (JSON) + if (Array.isArray(request)) { + if (logger.isLevelEnabled('trace')) { + logger.trace(`Receive batch request=${JSON.stringify(request)}`); + } + + // Increment metrics for batch_requests + wsMetricRegistry.getCounter('methodsCounter').labels(WS_CONSTANTS.BATCH_REQUEST_METHOD_NAME).inc(); + wsMetricRegistry + .getCounter('methodsCounterByIp') + .labels(ctx.request.ip, WS_CONSTANTS.BATCH_REQUEST_METHOD_NAME) + .inc(); + + // send error if batch request feature is not enabled + if (!getWsBatchRequestsEnabled()) { + const batchRequestDisabledError = predefined.WS_BATCH_REQUESTS_DISABLED; + logger.warn(`${JSON.stringify(batchRequestDisabledError)}`); + ctx.websocket.send( + JSON.stringify([jsonRespError(null, batchRequestDisabledError, requestDetails.requestId)]), + ); + return; + } - // process requests - const requestPromises = request.map((item: any) => { - if (ConfigService.get('BATCH_REQUESTS_DISALLOWED_METHODS').includes(item.method)) { - return jsonRespError(item.id, spec.BatchRequestsMethodNotPermitted(item.method), requestDetails.requestId); + // send error if batch request exceed max batch size + if (request.length > getBatchRequestsMaxSize()) { + const batchRequestAmountMaxExceed = predefined.BATCH_REQUESTS_AMOUNT_MAX_EXCEEDED( + request.length, + getBatchRequestsMaxSize(), + ); + logger.warn(`${JSON.stringify(batchRequestAmountMaxExceed)}`); + ctx.websocket.send( + JSON.stringify([jsonRespError(null, batchRequestAmountMaxExceed, requestDetails.requestId)]), + ); + return; } - return getRequestResult( + + // process requests + const requestPromises = request.map((item: any) => { + if (ConfigService.get('BATCH_REQUESTS_DISALLOWED_METHODS').includes(item.method)) { + return jsonRespError( + item.id, + spec.BatchRequestsMethodNotPermitted(item.method), + requestDetails.requestId, + ); + } + return getRequestResult( + ctx, + relay, + logger, + item, + limiter, + mirrorNodeClient, + wsMetricRegistry, + requestDetails, + subscriptionService, + ); + }); + + // resolve all promises + const responses = await Promise.all(requestPromises); + + // send to client + sendToClient(ctx.websocket, request, responses, logger); 
+ } else { + if (logger.isLevelEnabled('trace')) { + logger.trace(`Receive single request=${JSON.stringify(request)}`); + } + + // process requests + const response = await getRequestResult( ctx, relay, logger, - item, + request, limiter, mirrorNodeClient, wsMetricRegistry, requestDetails, subscriptionService, ); - }); - // resolve all promises - const responses = await Promise.all(requestPromises); - - // send to client - sendToClient(ctx.websocket, request, responses, logger); - } else { - if (logger.isLevelEnabled('trace')) { - logger.trace(`Receive single request=${JSON.stringify(request)}`); + // send to client + sendToClient(ctx.websocket, request, response, logger); } - // process requests - const response = await getRequestResult( - ctx, - relay, - logger, - request, - limiter, - mirrorNodeClient, - wsMetricRegistry, - requestDetails, - subscriptionService, - ); - - // send to client - sendToClient(ctx.websocket, request, response, logger); - } - - // Calculate the duration of the connection - const msgEndTime = process.hrtime(msgStartTime); - const msgDurationInMiliSeconds = (msgEndTime[0] + msgEndTime[1] / 1e9) * 1000; // Convert duration to miliseconds + // Calculate the duration of the connection + const msgEndTime = process.hrtime(msgStartTime); + const msgDurationInMiliSeconds = (msgEndTime[0] + msgEndTime[1] / 1e9) * 1000; // Convert duration to miliseconds - // Update the connection duration histogram with the calculated duration - const methodLabel = Array.isArray(request) ? WS_CONSTANTS.BATCH_REQUEST_METHOD_NAME : request.method; - wsMetricRegistry.getHistogram('messageDuration').labels(methodLabel).observe(msgDurationInMiliSeconds); + // Update the connection duration histogram with the calculated duration + const methodLabel = Array.isArray(request) ? 
WS_CONSTANTS.BATCH_REQUEST_METHOD_NAME : request.method; + wsMetricRegistry.getHistogram('messageDuration').labels(methodLabel).observe(msgDurationInMiliSeconds); + }); }); - }); - if (pingInterval > 0) { - setInterval(async () => { - ctx.websocket.send(JSON.stringify(jsonRespResult(null, null))); - }, pingInterval); - } -}); + if (pingInterval > 0) { + setInterval(async () => { + ctx.websocket.send(JSON.stringify(jsonRespResult(null, null))); + }, pingInterval); + } + }); -const koaJsonRpc = new KoaJsonRpc(logger, register, relay); -const httpApp = koaJsonRpc.getKoaApp(); -collectDefaultMetrics({ register, prefix: 'rpc_relay_' }); - -httpApp.use(async (ctx: Koa.Context, next: Koa.Next) => { - // prometheus metrics exposure - if (ctx.url === '/metrics') { - ctx.status = 200; - ctx.body = await register.metrics(); - } else if (ctx.url === '/health/liveness') { - //liveness endpoint - ctx.status = 200; - } else if (ctx.url === '/health/readiness') { - // readiness endpoint - try { - const result = relay.eth().chainId(); - if (result.includes('0x12')) { - ctx.status = 200; - ctx.body = 'OK'; - } else { - ctx.body = 'DOWN'; - ctx.status = 503; // UNAVAILABLE + const koaJsonRpc = new KoaJsonRpc(logger, register, relay); + const httpApp = koaJsonRpc.getKoaApp(); + collectDefaultMetrics({ register, prefix: 'rpc_relay_' }); + + httpApp.use(async (ctx: Koa.Context, next: Koa.Next) => { + // prometheus metrics exposure + if (ctx.url === '/metrics') { + ctx.status = 200; + ctx.body = await register.metrics(); + } else if (ctx.url === '/health/liveness') { + //liveness endpoint + ctx.status = 200; + } else if (ctx.url === '/health/readiness') { + // readiness endpoint + try { + const result = relay.eth().chainId(); + if (result.includes('0x12')) { + ctx.status = 200; + ctx.body = 'OK'; + } else { + ctx.body = 'DOWN'; + ctx.status = 503; // UNAVAILABLE + } + } catch (e) { + logger.error(e); + throw e; } - } catch (e) { - logger.error(e); - throw e; + } else { + return await next(); } - } else { - return await next(); - } -}); + }); -process.on('unhandledRejection', (reason, p) => { - logger.error(`Unhandled Rejection at: Promise: ${JSON.stringify(p)}, reason: ${reason}`); -}); + process.on('unhandledRejection', (reason, p) => { + logger.error(`Unhandled Rejection at: Promise: ${JSON.stringify(p)}, reason: ${reason}`); + }); -process.on('uncaughtException', (err) => { - logger.error(err, 'Uncaught Exception!'); -}); + process.on('uncaughtException', (err) => { + logger.error(err, 'Uncaught Exception!'); + }); -export { app, httpApp, relay, logger }; + return { app, httpApp }; +} diff --git a/packages/ws-server/tests/acceptance/index.spec.ts b/packages/ws-server/tests/acceptance/index.spec.ts index 81aa11b4b4..2dc487be7c 100644 --- a/packages/ws-server/tests/acceptance/index.spec.ts +++ b/packages/ws-server/tests/acceptance/index.spec.ts @@ -10,12 +10,12 @@ import { Server } from 'node:http'; import { ConfigService } from '@hashgraph/json-rpc-config-service/dist/services'; import constants from '@hashgraph/json-rpc-relay/dist/lib/constants'; import { setServerTimeout } from '@hashgraph/json-rpc-server/dist/koaJsonRpc/lib/utils'; -import app from '@hashgraph/json-rpc-server/dist/server'; +import { initializeServer } from '@hashgraph/json-rpc-server/dist/server'; import MirrorClient from '@hashgraph/json-rpc-server/tests/clients/mirrorClient'; import RelayClient from '@hashgraph/json-rpc-server/tests/clients/relayClient'; import ServicesClient from '@hashgraph/json-rpc-server/tests/clients/servicesClient'; 
import { AliasAccount } from '@hashgraph/json-rpc-server/tests/types/AliasAccount';
-import { app as wsApp } from '@hashgraph/json-rpc-ws-server/dist/webSocketServer';
+import { initializeWsServer } from '@hashgraph/json-rpc-ws-server/dist/webSocketServer';
import { AccountId, Hbar } from '@hashgraph/sdk';
import chai from 'chai';
import chaiAsPromised from 'chai-as-promised';
@@ -67,7 +67,7 @@ describe('RPC Server Acceptance Tests', function () {
logger.info(`E2E_RELAY_HOST: ${ConfigService.get('E2E_RELAY_HOST')}`);
if (global.relayIsLocal) {
- runLocalRelay();
+ await runLocalRelay();
}
// cache start balance
@@ -139,20 +139,23 @@ describe('RPC Server Acceptance Tests', function () {
function loadTest(testFile) {
if (testFile !== 'index.spec.ts' && testFile.endsWith('.spec.ts')) {
+ //eslint-disable-next-line @typescript-eslint/no-require-imports
require(`./${testFile}`);
}
}
- function runLocalRelay() {
+ async function runLocalRelay() {
// start local relay, relay instance in local should not be running
logger.info(`Start relay on port ${constants.RELAY_PORT}`);
+ const { app } = await initializeServer();
const relayServer = app.listen({ port: constants.RELAY_PORT });
global.relayServer = relayServer;
setServerTimeout(relayServer);
if (ConfigService.get('TEST_WS_SERVER')) {
logger.info(`Start ws-server on port ${constants.WEB_SOCKET_PORT}`);
+ const { app: wsApp } = await initializeWsServer();
global.socketServer = wsApp.listen({ port: constants.WEB_SOCKET_PORT });
}
}
diff --git a/packages/ws-server/tests/unit/connectionLimiter.spec.ts b/packages/ws-server/tests/unit/connectionLimiter.spec.ts
index 20d067c6a9..d6d86cb206 100644
--- a/packages/ws-server/tests/unit/connectionLimiter.spec.ts
+++ b/packages/ws-server/tests/unit/connectionLimiter.spec.ts
@@ -62,6 +62,10 @@ describe('Connection Limiter', function () {
configServiceStub.withArgs('WS_SUBSCRIPTION_LIMIT').returns(10);
configServiceStub.withArgs('LIMIT_DURATION').returns(60000);
configServiceStub.withArgs('IP_RATE_LIMIT_STORE').returns('LRU');
+ // methodConfiguration now uses lazy evaluation, so we need to stub the tier rate limits
+ configServiceStub.withArgs('TIER_1_RATE_LIMIT').returns(100);
+ configServiceStub.withArgs('TIER_2_RATE_LIMIT').returns(800);
+ configServiceStub.withArgs('TIER_3_RATE_LIMIT').returns(1600);
const rateLimiter = new IPRateLimiterService(mockLogger, mockRegistry, 9000);
diff --git a/packages/ws-server/tests/unit/index.spec.ts b/packages/ws-server/tests/unit/index.spec.ts
index 8f32cdca8d..7107db4e7d 100644
--- a/packages/ws-server/tests/unit/index.spec.ts
+++ b/packages/ws-server/tests/unit/index.spec.ts
@@ -1,28 +1,79 @@
// SPDX-License-Identifier: Apache-2.0
+import constants from '@hashgraph/json-rpc-relay/dist/lib/constants';
import { expect } from 'chai';
import sinon from 'sinon';
import { ConfigService } from '../../../config-service/dist/services';
import * as webSocketServer from '../../dist/webSocketServer';
-describe('main', () => {
+describe('WebSocket Server Main', () => {
+ let initializeWsServerStub: sinon.SinonStub;
+ let mockApp: any;
+ let mockHttpApp: any;
+ let loggerFatalStub: sinon.SinonStub;
+ let configGetStub: sinon.SinonStub;
+ let processExitStub: sinon.SinonStub;
+
+ beforeEach(() => {
+ // Create mock objects with listen methods
+ mockApp = { listen: sinon.stub() };
+ mockHttpApp = { listen: sinon.stub() };
+
+ // Stub the functions we need
+ configGetStub = sinon.stub(ConfigService, 'get').returns('127.0.0.1');
+ processExitStub = sinon.stub(process, 'exit');
+
+ // Stub the logger.fatal method from webSocketServer
+ loggerFatalStub = sinon.stub(webSocketServer.logger, 'fatal');
+
+ // Stub initializeWsServer to return our mocks
+ initializeWsServerStub = sinon.stub(webSocketServer, 'initializeWsServer').resolves({
+ app: mockApp,
+ httpApp: mockHttpApp,
+ });
+ });
+
afterEach(() => {
sinon.restore();
+ // Clear the module cache to allow fresh imports
+ delete require.cache[require.resolve('../../dist/index.js')];
});
- it('should start server if ensureOperatorHasBalance succeeds', async () => {
- const appListenStub = sinon.stub(webSocketServer.app, 'listen');
- const configGetStub = sinon.stub(ConfigService, 'get').returns('127.0.0.1');
- const ensureBalanceStub = sinon.stub(webSocketServer.relay, 'ensureOperatorHasBalance').resolves();
- const httpAppListenStub = sinon.stub(webSocketServer.httpApp, 'listen');
- const loggerFatalStub = sinon.stub(webSocketServer.logger, 'fatal');
+ it('should initialize and start both WebSocket and HTTP servers successfully', async () => {
+ // Import and execute the main module
+ await import('../../dist/index.js');
+
+ await new Promise((resolve) => setInterval(resolve, 100));
+
+ expect(initializeWsServerStub.calledOnce).to.equal(true);
+ expect(configGetStub.calledWith('SERVER_HOST')).to.equal(true);
+
+ expect(mockApp.listen.calledOnce).to.equal(true);
+ expect(mockApp.listen.calledWith({ port: constants.WEB_SOCKET_PORT, host: '127.0.0.1' })).to.equal(true);
+
+ expect(mockHttpApp.listen.calledOnce).to.equal(true);
+ expect(mockHttpApp.listen.calledWith({ port: constants.WEB_SOCKET_HTTP_PORT, host: '127.0.0.1' })).to.equal(true);
+
+ expect(loggerFatalStub.calledOnce).to.equal(false);
+ expect(processExitStub.called).to.equal(false);
+ });
+
+ it('should handle initialization errors and exit gracefully', async () => {
+ // Make initializeWsServer throw an error
+ const testError = new Error('Initialization failed');
+ initializeWsServerStub.rejects(testError);
await import('../../dist/index.js');
- expect(appListenStub.calledOnce).to.be.true;
- expect(configGetStub.calledWith('SERVER_HOST')).to.be.true;
- expect(ensureBalanceStub.calledOnce).to.be.true;
- expect(httpAppListenStub.calledOnce).to.be.true;
- expect(loggerFatalStub.notCalled).to.be.true;
+ await new Promise((resolve) => setInterval(resolve, 100));
+
+ expect(loggerFatalStub.calledOnce).to.equal(true);
+ expect(loggerFatalStub.calledWith(testError)).to.equal(true);
+
+ expect(processExitStub.calledOnce).to.equal(true);
+ expect(processExitStub.calledWith(1)).to.equal(true);
+
+ expect(mockApp.listen.called).to.equal(false);
+ expect(mockHttpApp.listen.called).to.equal(false);
});
});
diff --git a/packages/ws-server/tests/unit/webSocketServer.spec.ts b/packages/ws-server/tests/unit/webSocketServer.spec.ts
index 3a915fc927..2dcb25ec9e 100644
--- a/packages/ws-server/tests/unit/webSocketServer.spec.ts
+++ b/packages/ws-server/tests/unit/webSocketServer.spec.ts
@@ -1,6 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
import { ConfigService } from '@hashgraph/json-rpc-config-service/dist/services';
+import { Relay } from '@hashgraph/json-rpc-relay';
import { expect } from 'chai';
import http from 'http';
import sinon from 'sinon';
@@ -9,12 +10,12 @@ import WebSocket from 'ws';
import * as jsonRpcController from '../../dist/controllers/jsonRpcController';
import wsMetricRegistry from '../../dist/metrics/wsMetricRegistry';
import * as utils from '../../dist/utils/utils';
-import { app, httpApp, logger, relay } from '../../dist/webSocketServer';
+import * as webSocketServer from '../../dist/webSocketServer';
async function httpGet(server: http.Server, path: string): Promise<{ status: number; text: string }> {
return new Promise((resolve, reject) => {
http
- .get(`http://127.0.0.1:${server.address().port}${path}`, (res) => {
+ .get(`http://127.0.0.1:${server.address()?.port}${path}`, (res) => {
let data = '';
res.setEncoding('utf8');
res.on('data', (chunk) => (data += chunk));
@@ -24,42 +25,40 @@ async function httpGet(server: http.Server, path: string): Promise<{ status: num
});
}
-function wsUrl(server): string {
- return `ws://127.0.0.1:${server.address().port}`;
-}
-
-function createMirrorStubServer(): http.Server {
- return http.createServer((req: any, res: any) => {
- res.statusCode = 200;
- res.setHeader('Content-Type', 'application/json');
- const url = req.url || '';
- if (url.startsWith('/api/v1/blocks') || url.startsWith('/blocks')) {
- res.end(JSON.stringify({ blocks: [{ number: 5644, timestamp: { to: '0.0.5644' } }] }));
- } else if (url.startsWith('/api/v1/accounts') || url.startsWith('/accounts')) {
- res.end(JSON.stringify({ balance: { balance: 1 }, account: '0.0.2', transactions: [], links: {} }));
- } else {
- res.end(JSON.stringify({}));
- }
- });
+function wsUrl(server: http.Server): string {
+ const address = server.address();
+ if (!address || typeof address === 'string') {
+ throw new Error('Invalid server address');
+ }
+ return `ws://127.0.0.1:${address.port}`;
}
describe('webSocketServer http endpoints', () => {
let server: http.Server;
- let mirrorStubServer: http.Server;
-
- beforeEach((done) => {
- mirrorStubServer = createMirrorStubServer();
- mirrorStubServer.listen(5551, '127.0.0.1', () => {
- server = http.createServer(httpApp.callback());
- server.listen(0, '127.0.0.1', done);
+ let httpApp: any;
+ let mockRelay: any;
+
+ beforeEach(async function () {
+ // Create a mock relay object
+ mockRelay = {
+ eth: sinon.stub().returns({ chainId: () => '0x12a' }),
+ mirrorClient: sinon.stub(),
+ };
+ sinon.stub(Relay, 'init').resolves(mockRelay as any);
+
+ const wsServer = await webSocketServer.initializeWsServer();
+ httpApp = wsServer.httpApp;
+
+ // Create HTTP server from the Koa app
+ server = http.createServer(httpApp.callback());
+ await new Promise((resolve) => {
+ server.listen(0, '127.0.0.1', () => resolve());
});
});
afterEach((done) => {
sinon.restore();
- server.close(() => {
- mirrorStubServer.close(() => done());
- });
+ server.close(done);
});
it('should return 200 for /metrics', async () => {
@@ -76,30 +75,28 @@ describe('webSocketServer http endpoints', () => {
});
it('should return 200 for /health/readiness when chainId is valid', async () => {
- const ethStub = sinon.stub(relay, 'eth').returns({ chainId: () => '0x12a' } as any);
const res = await httpGet(server, '/health/readiness');
- expect(ethStub.calledOnce).to.be.true;
+ expect(mockRelay.eth.called).to.be.true;
expect(res.status).to.equal(200);
expect(res.text).to.equal('OK');
});
it('should return 503 and DOWN for /health/readiness when chainId is not valid', async () => {
- const ethStub = sinon.stub(relay, 'eth').returns({ chainId: () => '0xabc' } as any);
+ mockRelay.eth.returns({ chainId: () => '0xabc' });
const res = await httpGet(server, '/health/readiness');
- expect(ethStub.calledOnce).to.be.true;
+ expect(mockRelay.eth.called).to.be.true;
expect(res.status).to.equal(503);
expect(res.text).to.equal('DOWN');
});
it('should log and throw when /health/readiness handler errors', async () => {
- sinon.stub(relay, 'eth').throws(new Error());
- const logStub = sinon.stub(logger, 'error');
+ // Override the mock to throw an error
+ mockRelay.eth.throws(new Error('Test error'));
const res = await httpGet(server, '/health/readiness');
expect(res.status).to.equal(500);
- expect(logStub.called).to.be.true;
});
it('should throw 404 for unknown path', async () => {
@@ -111,7 +108,6 @@ describe('webSocketServer http endpoints', () => {
describe('webSocketServer websocket handling', () => {
let server: http.Server;
- let mirrorStubServer: http.Server;
const sockets: WebSocket[] = [];
async function openWsServerAndUpdateSockets(server, socketsArr) {
@@ -122,10 +118,18 @@ describe('webSocketServer websocket handling', () => {
return ws;
}
- beforeEach((done) => {
- mirrorStubServer = createMirrorStubServer();
- mirrorStubServer.listen(5551, '127.0.0.1', () => {
- server = app.listen(0, '127.0.0.1', done);
+ beforeEach(async function () {
+ // Initialize the WebSocket server with mocked dependencies
+ const mockRelay = {
+ eth: sinon.stub().returns({ chainId: () => '0x12a' }),
+ mirrorClient: sinon.stub(),
+ };
+ sinon.stub(Relay, 'init').resolves(mockRelay as any);
+ const { app } = await webSocketServer.initializeWsServer();
+
+ // Start the WebSocket server and wait for it to start
+ await new Promise((resolve) => {
+ server = app.listen(0, '127.0.0.1', () => resolve());
});
});
@@ -139,9 +143,7 @@ describe('webSocketServer websocket handling', () => {
}
}
sockets.length = 0;
- server.close(() => {
- mirrorStubServer.close(() => done());
- });
+ server.close(done);
});
it('should send INVALID_REQUEST on malformed JSON', async () => {