Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion src/util/subscription-set/expiring-sorted-set.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,23 @@ export class ExpiringSortedSet<T> implements SubscriptionSet<T> {
add(value: T, ttl: number, key: string) {
let node = this.map.get(key)
if (node) {
if (JSON.stringify(value) !== JSON.stringify(node.data.value)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stringifies are expensive, we should not be performing these here

logger.warn(
`Subscription set received a value for key "${key}" that differs from the stored value. ` +
`Keeping the original value to avoid unnecessary subscription churn. ` +
`This indicates requests are using inconsistent parameter casing - ` +
`stored: ${JSON.stringify(node.data.value)}, incoming: ${JSON.stringify(value)}`,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

even more stringifies haha

)
}
node.data = {
value,
// Preserve the existing value rather than overwriting it. The key is the
// normalised cache key (e.g. lowercased), so two entries that share a key
// represent the same logical subscription. Overwriting the value with a
// differently-cased variant would cause the streaming transport's
// JSON.stringify-based diff to see a change, triggering an
// unnecessary unsubscribe + resubscribe cycle that can permanently kill
// the provider feed. Only the TTL needs refreshing here.
value: node.data.value,
expirationTimestamp: Date.now() + ttl,
}
this.moveToTail(node)
Expand Down
186 changes: 186 additions & 0 deletions test/transports/websocket.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1711,3 +1711,189 @@ test.serial(
await t.context.clock.runToLastAsync()
},
)

test.serial(
'does not unnecessarily unsubscribe when two requests differ only in casing for the same pair',
async (t) => {
// Regression test for the case-sensitivity mismatch between the subscription set
// (keyed by lowercased cache key) and StreamingTransport's local subscription diff
// (which uses JSON.stringify, preserving original casing).
//
// Scenario:
// 1. Request A { base: 'USDe', quote: 'USD' } → subscribes; localSubscriptions=['USDe/USD']
// 2. Request B { base: 'usde', quote: 'usd' } → same cache key, hits cache,
// but overwrites the subscription-set value with the lowercase variant
// 3. Next background execute: desiredSubs=['usde/usd'] vs localSubscriptions=['USDe/USD']
// → JSON.stringify mismatch → unnecessary unsubscribe + resubscribe

mockWebSocketProvider(WebSocketClassProvider)
const mockWsServer = new Server(ENDPOINT_URL, { mock: false })
let subscribeCount = 0
let unsubscribeCount = 0

mockWsServer.on('connection', (socket) => {
socket.on('message', (rawMsg) => {
const msg = rawMsg.toString()
if (msg.startsWith('S:')) {
subscribeCount++
const pair = msg.slice(2)
socket.send(JSON.stringify({ pair, value: price }))
} else {
try {
const parsed = JSON.parse(msg)
if (parsed.request === 'unsubscribe') {
unsubscribeCount++
}
} catch {
// Ignore non-JSON messages
}
}
})
})

const adapter = createAdapter({})

const testAdapter = await TestAdapter.startWithMockedCache(adapter, t.context)

// First request with mixed-case base — triggers subscribe and populates cache
await testAdapter.startBackgroundExecuteThenGetResponse(t, {
requestData: { base: 'USDe', quote: 'USD' },
expectedResponse: {
data: { result: price },
result: price,
statusCode: 200,
},
})

// Second request with all-lowercase — same cache key, should be a cache hit,
// but overwrites the subscription set value with the lowercase variant
const response = await testAdapter.request({ base: 'usde', quote: 'usd' })
t.is(response.statusCode, 200)

// Advance clock to trigger another background execute cycle
await runAllUntilTime(t.context.clock, BACKGROUND_EXECUTE_MS_WS + 100)

// Capture the counters before cleanup so assertions run after cleanup.
// Closing the API first (without await) signals the background executor to shut
// down via fake-timer-driven setImmediate; runToLastAsync then fires all pending
// fake timers (Fastify's close, bg executor sleep) so the executor exits cleanly.
const capturedSubscribeCount = subscribeCount
const capturedUnsubscribeCount = unsubscribeCount

testAdapter.api.close()
mockWsServer.close()
await t.context.clock.runToLastAsync()

// With the bug: subscribeCount === 2, unsubscribeCount === 1 (unneccesary unsub+resub
// caused by case mismatch between desiredSubs and localSubscriptions)
// After the fix: subscribeCount === 1, unsubscribeCount === 0
t.is(capturedSubscribeCount, 1)
t.is(capturedUnsubscribeCount, 0)
},
)

test.serial(
'both request variants continue receiving data with case-insensitive provider',
async (t) => {
// Regression test (user-visible impact). With a case-insensitive streaming provider,
// two requests that differ only in casing should both continue receiving data.
//
// The bug:
// 1. Request A { base: 'USDe' } subscribes; localSubscriptions=['USDe/USD']
// 2. Request B { base: 'usde' } overwrites the subscription-set value with lowercase
// 3. Next bg execute: desiredSubs=['usde/usd'] ≠ localSubscriptions=['USDe/USD']
// → sendMessages sends subscribes first, then unsubscribes:
// subscribe usde/usd → provider (case-insensitive) starts/restarts feed
// unsubscribe USDe/USD → provider treats as the same feed and kills it
// 4. After the cycle: localSubscriptions=desiredSubs=['usde/usd'] → no diff on
// the next execute → feed is permanently dead, cache expires → 504
//
// CACHE_MAX_AGE is reduced so the test can observe the expiry without waiting the
// full default 90 s. After the fix: no unnecessary sub/unsub, feed stays alive,
// both variants return 200.

mockWebSocketProvider(WebSocketClassProvider)
const mockWsServer = new Server(ENDPOINT_URL, { mock: false })

// Simulate a case-insensitive streaming provider: sends data on subscribe and
// pushes periodic updates; unsubscribe kills the feed.
let feedActive = false
let activePair = ''
let intervalTimer: ReturnType<typeof setInterval> | null = null

mockWsServer.on('connection', (socket) => {
socket.on('message', (rawMsg) => {
const msg = rawMsg.toString()
if (msg.startsWith('S:')) {
feedActive = true
activePair = msg.slice(2)
socket.send(JSON.stringify({ pair: activePair, value: price }))
// Periodic pushes simulate a streaming provider keeping the cache warm.
if (intervalTimer) {
clearInterval(intervalTimer)
}
const sendPeriodic = () => {
if (feedActive) {
socket.send(JSON.stringify({ pair: activePair, value: price }))
}
}
intervalTimer = setInterval(sendPeriodic, BACKGROUND_EXECUTE_MS_WS)
} else {
try {
const parsed = JSON.parse(msg)
if (parsed.request === 'unsubscribe') {
feedActive = false
if (intervalTimer) {
clearInterval(intervalTimer)
intervalTimer = null
}
}
} catch {
// Ignore non-JSON messages
}
}
})
socket.on('close', () => {
if (intervalTimer) {
clearInterval(intervalTimer)
}
})
})

// Reduced CACHE_MAX_AGE so expiry is observable within the test without waiting
// the full default 90 s. Must be > BACKGROUND_EXECUTE_MS_WS so periodic pushes
// keep the cache warm when the feed is healthy (no bug).
const cacheMaxAge = Math.round(1.5 * BACKGROUND_EXECUTE_MS_WS) // 7500ms
const adapter = createAdapter({
CACHE_MAX_AGE: cacheMaxAge,
})

const testAdapter = await TestAdapter.startWithMockedCache(adapter, t.context)

// First request (mixed case) — subscribes to provider, starts periodic pushes.
await testAdapter.startBackgroundExecuteThenGetResponse(t, {
requestData: { base: 'USDe', quote: 'USD' },
expectedResponse: { data: { result: price }, result: price, statusCode: 200 },
})

// Second request (lowercase) — same cache key, gets a hit. But it also overwrites
// the subscription set value, setting up the unnecessary unsub/resub cycle.
const hit = await testAdapter.request({ base: 'usde', quote: 'usd' })
t.is(hit.statusCode, 200)

// Advance past two bg-execute cycles and one full cacheMaxAge window.
// With bug: feed is permanently dead after cycle 1 (~5000ms); cache expires at
// ~5000ms + 7500ms = ~12500ms → both variants return 504 by assertion time.
// Without bug: periodic pushes keep refreshing the cache → both variants return 200.
await runAllUntilTime(t.context.clock, 2 * BACKGROUND_EXECUTE_MS_WS + cacheMaxAge + 100)
const response1 = await testAdapter.request({ base: 'USDe', quote: 'USD' })
t.is(response1.statusCode, 200)

const response2 = await testAdapter.request({ base: 'usde', quote: 'usd' })
t.is(response2.statusCode, 200)

testAdapter.api.close()
mockWsServer.close()
await t.context.clock.runToLastAsync()
},
)
Loading