Skip to content

Commit a7125e1

Browse files
feat: Implement Redis storage (#4438)
Signed-off-by: Konstantina Blazhukova <[email protected]>
1 parent 7a24cc8 commit a7125e1

File tree

2 files changed

+225
-0
lines changed

2 files changed

+225
-0
lines changed
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
import { RedisClientType } from 'redis';
3+
4+
import { AddToListResult, PendingTransactionStorage } from '../../types/transactionPool';
5+
6+
export class RedisPendingTransactionStorage implements PendingTransactionStorage {
7+
/**
8+
* Number of elements to scan per step.
9+
*/
10+
private elementsPerStep = 500;
11+
12+
/**
13+
* Prefix used to namespace all keys managed by this storage.
14+
*
15+
* @remarks
16+
* Using a prefix allows efficient scanning and cleanup of related keys
17+
*/
18+
private readonly keyPrefix = 'pending:';
19+
20+
/**
21+
* Creates a new Redis-backed pending transaction storage.
22+
*
23+
* @param redisClient - A connected {@link RedisClientType} instance.
24+
*/
25+
constructor(private readonly redisClient: RedisClientType) {}
26+
27+
/**
28+
* Resolves the Redis key for a given address.
29+
*
30+
* @param addr - Account address whose pending list key should be derived.
31+
* @returns The Redis key (e.g., `pending:<address>`).
32+
*/
33+
private keyFor(address: string): string {
34+
return `${this.keyPrefix}${address}`;
35+
}
36+
37+
/**
38+
* Appends a transaction hash to the pending list for the provided address.
39+
*
40+
* @remarks
41+
* This uses Redis `RPUSH`, which is atomic. The integer result from Redis is
42+
* the new length of the list after the append.
43+
*
44+
* Duplicate values are not prevented by this method.
45+
*
46+
* @param addr - Account address whose pending list will be appended to.
47+
* @param txHash - Transaction hash to append.
48+
* @returns Result indicating success and the new list length.
49+
*/
50+
async addToList(address: string, txHash: string): Promise<AddToListResult> {
51+
const key = this.keyFor(address);
52+
await this.redisClient.sAdd(key, txHash);
53+
const newLen = await this.redisClient.sCard(key);
54+
55+
return { ok: true, newValue: newLen };
56+
}
57+
58+
/**
59+
* Removes a transaction hash from the pending list of the given address.
60+
*
61+
* @param address - Account address whose pending list should be modified.
62+
* @param txHash - Transaction hash to remove from the list.
63+
* @returns The updated number of pending transactions for the address.
64+
*/
65+
async removeFromList(address: string, txHash: string): Promise<number> {
66+
const key = this.keyFor(address);
67+
await this.redisClient.sRem(key, txHash);
68+
69+
return await this.redisClient.sCard(key);
70+
}
71+
72+
/**
73+
* Removes all keys managed by this storage (all `pending:*`).
74+
*
75+
* @remarks
76+
* Iterates keys using `SCAN` via `scanIterator` to avoid blocking Redis, and
77+
* batches deletions using a pipeline for efficiency.
78+
*/
79+
async removeAll(): Promise<void> {
80+
let pipeline = this.redisClient.multi();
81+
let batched = 0;
82+
for await (const key of this.redisClient.scanIterator({
83+
MATCH: `${this.keyPrefix}*`,
84+
COUNT: this.elementsPerStep,
85+
})) {
86+
pipeline.del(key);
87+
batched++;
88+
if (batched % 100 === 0) {
89+
await pipeline.execAsPipeline();
90+
pipeline = this.redisClient.multi();
91+
}
92+
}
93+
await pipeline.execAsPipeline();
94+
}
95+
96+
/**
97+
* Retrieves the number of pending transactions for a given address.
98+
*
99+
* @param addr - Account address to query.
100+
* @returns The current pending count (0 if the list does not exist).
101+
*/
102+
async getList(address: string): Promise<number> {
103+
const key = this.keyFor(address);
104+
105+
return await this.redisClient.sCard(key);
106+
}
107+
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
3+
import chai, { expect } from 'chai';
4+
import chaiAsPromised from 'chai-as-promised';
5+
import { pino } from 'pino';
6+
import { createClient, RedisClientType } from 'redis';
7+
8+
import { RedisPendingTransactionStorage } from '../../../../src/lib/services/transactionPoolService/RedisPendingTransactionStorage';
9+
import { useInMemoryRedisServer } from '../../../helpers';
10+
11+
chai.use(chaiAsPromised);
12+
13+
describe('RedisPendingTransactionStorage Test Suite', function () {
14+
this.timeout(10000);
15+
16+
const logger = pino({ level: 'silent' });
17+
18+
let redisClient: RedisClientType;
19+
let storage: RedisPendingTransactionStorage;
20+
21+
useInMemoryRedisServer(logger, 6390);
22+
23+
before(async () => {
24+
redisClient = createClient({ url: 'redis://127.0.0.1:6390' });
25+
await redisClient.connect();
26+
storage = new RedisPendingTransactionStorage(redisClient);
27+
});
28+
29+
beforeEach(async () => {
30+
await redisClient.flushAll();
31+
});
32+
33+
const addr1 = '0x1111111111111111111111111111111111111111';
34+
const addr2 = '0x2222222222222222222222222222222222222222';
35+
const tx1 = '0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
36+
const tx2 = '0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb';
37+
const tx3 = '0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc';
38+
39+
describe('addToList (Set-based)', () => {
40+
it('adds first transaction and returns size 1', async () => {
41+
const res = await storage.addToList(addr1, tx1);
42+
expect(res.ok).to.equal(true);
43+
if (res.ok) expect(res.newValue).to.equal(1);
44+
const count = await storage.getList(addr1);
45+
expect(count).to.equal(1);
46+
});
47+
48+
it('deduplicates the same transaction hash', async () => {
49+
await storage.addToList(addr1, tx1);
50+
const res = await storage.addToList(addr1, tx1);
51+
expect(res.ok).to.equal(true);
52+
if (res.ok) expect(res.newValue).to.equal(1);
53+
const count = await storage.getList(addr1);
54+
expect(count).to.equal(1);
55+
});
56+
57+
it('adds multiple distinct tx hashes and returns correct size', async () => {
58+
await storage.addToList(addr1, tx1);
59+
const r2 = await storage.addToList(addr1, tx2);
60+
expect(r2.ok).to.equal(true);
61+
if (r2.ok) expect(r2.newValue).to.equal(2);
62+
const count = await storage.getList(addr1);
63+
expect(count).to.equal(2);
64+
});
65+
});
66+
67+
describe('getList (Set-based)', () => {
68+
it('returns 0 for empty/non-existent key', async () => {
69+
const count = await storage.getList(addr2);
70+
expect(count).to.equal(0);
71+
});
72+
73+
it('returns size after multiple adds', async () => {
74+
await storage.addToList(addr1, tx1);
75+
await storage.addToList(addr1, tx2);
76+
const count = await storage.getList(addr1);
77+
expect(count).to.equal(2);
78+
});
79+
});
80+
81+
describe('removeFromList (Set-based)', () => {
82+
it('removes existing tx and returns new size', async () => {
83+
await storage.addToList(addr1, tx1);
84+
await storage.addToList(addr1, tx2);
85+
const remaining = await storage.removeFromList(addr1, tx1);
86+
expect(remaining).to.equal(1);
87+
const count = await storage.getList(addr1);
88+
expect(count).to.equal(1);
89+
});
90+
91+
it('is idempotent when removing non-existent tx', async () => {
92+
await storage.addToList(addr1, tx1);
93+
const remaining = await storage.removeFromList(addr1, tx2);
94+
expect(remaining).to.equal(1);
95+
const count = await storage.getList(addr1);
96+
expect(count).to.equal(1);
97+
});
98+
});
99+
100+
describe('removeAll', () => {
101+
after(async () => {
102+
await redisClient.quit();
103+
});
104+
105+
it('deletes all pending:* keys', async () => {
106+
await storage.addToList(addr1, tx1);
107+
await storage.addToList(addr1, tx2);
108+
await storage.addToList(addr2, tx3);
109+
110+
await storage.removeAll();
111+
112+
const c1 = await storage.getList(addr1);
113+
const c2 = await storage.getList(addr2);
114+
expect(c1).to.equal(0);
115+
expect(c2).to.equal(0);
116+
});
117+
});
118+
});

0 commit comments

Comments
 (0)