Skip to content

Commit b0f65c2

Browse files
add reconnection example
1 parent bd35703 commit b0f65c2

File tree

10 files changed

+340
-6
lines changed

10 files changed

+340
-6
lines changed

README.md

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -245,15 +245,18 @@ To enable the feature, set the `wsReconnect` option to an object with the follow
245245
- `reconnectOnClose`: Whether to reconnect on close, as long as the connection from the related client to the proxy is active (default: `false`).
246246
- `logs`: Whether to log the reconnection process (default: `false`).
247247

248+
See the example in [examples/reconnection](examples/reconnection).
249+
248250
## wsHooks
249251

250252
On websocket events, the following hooks are available, note **the hooks are all synchronous**.
251253

252-
- `onConnect`: A hook function that is called when the connection is established `onConnect(source, target)` (default: `undefined`).
253-
- `onDisconnect`: A hook function that is called when the connection is closed `onDisconnect(source)` (default: `undefined`).
254-
- `onReconnect`: A hook function that is called when the connection is reconnected `onReconnect(source, target)` (default: `undefined`).
255254
- `onIncomingMessage`: A hook function that is called when the request is received from the client `onIncomingMessage({ data, binary })` (default: `undefined`).
256255
- `onOutgoingMessage`: A hook function that is called when the response is received from the target `onOutgoingMessage({ data, binary })` (default: `undefined`).
256+
- `onConnect`: A hook function that is called when the connection is established `onConnect(source, target)` (default: `undefined`).
257+
- `onDisconnect`: A hook function that is called when the connection is closed `onDisconnect(source)` (default: `undefined`).
258+
- `onReconnect`: A hook function that is called when the connection is reconnected `onReconnect(source, target)` (default: `undefined`). The function is called if reconnection feature is enabled.
259+
- `onPong`: A hook function that is called when the target responds to the ping `onPong(source, target)` (default: `undefined`). The function is called if reconnection feature is enabled.
257260

258261
## Benchmarks
259262

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
# Reconnection Example
2+
3+
This example demonstrates how to use the reconnection feature of the proxy.
4+
5+
It simulates an unstable target service: slow to start, unresponsive due to block of the event loop, crash and restart.
6+
7+
The goal is to ensures a more resilient and customizable integration, minimizing disruptions caused by connection instability.
8+
9+
10+
## How to run
11+
12+
Run the unstable target
13+
14+
```
15+
cd examples/reconnection/unstable-target
16+
npm run unstable
17+
```
18+
19+
Run the proxy
20+
21+
```
22+
cd examples/reconnection/proxy
23+
npm run start
24+
```
25+
26+
Then run the client
27+
28+
```
29+
cd examples/reconnection/client
30+
npm run start
31+
```
32+
33+
---
34+
35+
## How it works
36+
37+
### Proxy Connection Monitoring and Recovery
38+
39+
The proxy monitors the target connection using a ping/pong mechanism. If a pong response does not arrive on time, the connection is closed, and the proxy attempts to reconnect.
40+
41+
If the target service crashes, the connection may close either gracefully or abruptly. Regardless of how the disconnection occurs, the proxy detects the connection loss and initiates a reconnection attempt.
42+
43+
### Connection Stability
44+
45+
- The connection between the client and the proxy remains unaffected by an unstable target.
46+
- The connection between the proxy and the target may be closed due to:
47+
- The target failing to respond to ping messages, even if the connection is still technically open (e.g., due to a freeze or blockage).
48+
- The target crashing and restarting.
49+
50+
### Handling Data Loss During Reconnection
51+
52+
The proxy supports hooks to manage potential data loss during reconnection. These hooks allow for custom logic to ensure message integrity when resending data from the client to the target.
53+
54+
Examples of how hooks can be used based on the target service type:
55+
56+
- GraphQL subscriptions: Resend the subscription from the last received message.
57+
- Message brokers: Resend messages starting from the last successfully processed message.
58+
59+
In this example, the proxy re-sends the messages from the last ping to ensure all the messages are sent to the target, without any additional logic.
60+
Resending messages from the last pong ensures that the target does not miss any messages, but it may send messages more than once.
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
'use strict'
2+
3+
const WebSocket = require('ws')
4+
5+
const port = process.env.PORT || 3001
6+
7+
// connect to proxy
8+
9+
const url = `ws://localhost:${port}/`
10+
const ws = new WebSocket(url)
11+
const client = WebSocket.createWebSocketStream(ws, { encoding: 'utf8', objectMode: true })
12+
13+
client.setEncoding('utf8')
14+
15+
let i = 1
16+
setInterval(() => {
17+
client.write(JSON.stringify({
18+
message: i
19+
}))
20+
i++
21+
}, 1000).unref()
22+
const responses = {}
23+
24+
client.on('data', message => {
25+
const data = JSON.parse(message)
26+
console.log('Received', data)
27+
responses[data.response] = responses[data.response] ? responses[data.response] + 1 : 1
28+
})
29+
30+
client.on('error', error => {
31+
console.log('Error')
32+
console.error(error)
33+
})
34+
35+
client.on('close', () => {
36+
console.log('\n\n\nConnection closed')
37+
38+
console.log('\n\n\nResponses')
39+
for (const key in responses) {
40+
if (!responses[key]) {
41+
console.log('missing', key)
42+
} else if (responses[key] !== 1) {
43+
console.log('extra messages', key, responses[key])
44+
}
45+
}
46+
})
47+
48+
client.on('unexpected-response', (error) => {
49+
console.log('Unexpected response')
50+
console.error(error)
51+
})
52+
53+
client.on('redirect', (error) => {
54+
console.log('Redirect')
55+
console.error(error)
56+
})
57+
58+
client.on('upgrade', (error) => {
59+
console.log('Upgrade')
60+
console.error(error)
61+
})
62+
63+
client.on('ping', (error) => {
64+
console.log('Ping')
65+
console.error(error)
66+
})
67+
68+
client.on('pong', (error) => {
69+
console.log('Pong')
70+
console.error(error)
71+
})
72+
73+
process.on('SIGINT', () => {
74+
client.end()
75+
})
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
{
2+
"name": "client",
3+
"version": "1.0.0",
4+
"main": "index.js",
5+
"scripts": {
6+
"start": "node index.js",
7+
"dev": "node --watch index.js"
8+
},
9+
"dependencies": {
10+
"ws": "^8.18.0"
11+
}
12+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
'use strict'
2+
3+
const { setTimeout: wait } = require('node:timers/promises')
4+
const fastify = require('fastify')
5+
const fastifyHttpProxy = require('../../../')
6+
7+
async function main () {
8+
const port = process.env.PORT || 3001
9+
10+
const wsReconnect = {
11+
logs: true,
12+
pingInterval: 3_000,
13+
reconnectOnClose: true,
14+
}
15+
16+
let backup = []
17+
let lastPong = Date.now()
18+
19+
// resend messages from last ping
20+
// it may send messages more than once
21+
// in case the target already received messages between last ping and the reconnection
22+
async function resendMessages (target) {
23+
const now = Date.now()
24+
25+
for (const m of backup) {
26+
if (m.timestamp < lastPong || m.timestamp > now) {
27+
continue
28+
}
29+
console.log(' >>> resending message #', m)
30+
target.send(m.message)
31+
// introduce a small delay to avoid to flood the target
32+
await wait(250)
33+
}
34+
};
35+
36+
const wsHooks = {
37+
onPong: () => {
38+
console.log('onPong')
39+
lastPong = Date.now()
40+
// clean backup from the last ping
41+
backup = backup.filter(message => message.timestamp > lastPong)
42+
},
43+
onIncomingMessage: (message) => {
44+
const m = message.data.toString()
45+
console.log('onIncomingMessage backup', m)
46+
backup.push({ message: m, timestamp: Date.now() })
47+
},
48+
// onOutgoingMessage: (message) => {
49+
// console.log('onOutgoingMessage', message.data.toString())
50+
// },
51+
onDisconnect: () => {
52+
console.log('onDisconnect')
53+
backup.length = 0
54+
},
55+
onReconnect: (source, target) => {
56+
console.log('onReconnect')
57+
resendMessages(target)
58+
},
59+
}
60+
61+
const proxy = fastify({ logger: true })
62+
proxy.register(fastifyHttpProxy, {
63+
upstream: 'http://localhost:3000/',
64+
websocket: true,
65+
wsUpstream: 'ws://localhost:3000/',
66+
wsReconnect,
67+
wsHooks,
68+
})
69+
70+
await proxy.listen({ port })
71+
}
72+
73+
main()
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
{
2+
"name": "proxy",
3+
"version": "1.0.0",
4+
"main": "index.js",
5+
"scripts": {
6+
"start": "node index.js",
7+
"dev": "node --watch index.js"
8+
},
9+
"dependencies": {
10+
"fastify": "^5.2.1"
11+
}
12+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
'use strict'
2+
3+
const { setTimeout: wait } = require('node:timers/promises')
4+
const fastify = require('fastify')
5+
6+
// unstable service
7+
8+
async function main () {
9+
const SLOW_START = process.env.SLOW_START || 2_000
10+
const UNSTABLE_MIN = process.env.UNSTABLE_MIN || 1_000
11+
const UNSTABLE_MAX = process.env.UNSTABLE_MAX || 10_000
12+
const BLOCK_TIME = process.env.BLOCK_TIME || 5_000
13+
14+
const app = fastify({ logger: true })
15+
16+
// slow start
17+
18+
await wait(SLOW_START)
19+
20+
app.register(require('@fastify/websocket'))
21+
app.register(async function (app) {
22+
app.get('/', { websocket: true }, (socket) => {
23+
socket.on('message', message => {
24+
let m = message.toString()
25+
console.log('incoming message', m)
26+
m = JSON.parse(m)
27+
28+
socket.send(JSON.stringify({
29+
response: m.message
30+
}))
31+
})
32+
})
33+
})
34+
35+
try {
36+
const port = process.env.PORT || 3000
37+
await app.listen({ port })
38+
} catch (err) {
39+
app.log.error(err)
40+
process.exit(1)
41+
}
42+
43+
if (process.env.STABLE) {
44+
return
45+
}
46+
47+
function runProblem () {
48+
const problem = process.env.PROBLEM || (Math.random() < 0.5 ? 'crash' : 'block')
49+
const unstabilityTimeout = process.env.UNSTABLE_TIMEOUT || Math.round(UNSTABLE_MIN + Math.random() * (UNSTABLE_MAX - UNSTABLE_MIN))
50+
51+
if (problem === 'crash') {
52+
console.log(`Restarting (crash and restart) in ${unstabilityTimeout}ms`)
53+
setTimeout(() => {
54+
console.log('UNHANDLED EXCEPTION')
55+
throw new Error('UNHANDLED EXCEPTION')
56+
}, unstabilityTimeout).unref()
57+
} else {
58+
console.log(`Blocking EL in ${unstabilityTimeout}ms for ${BLOCK_TIME}ms`)
59+
60+
setTimeout(() => {
61+
console.log('Block EL ...')
62+
const start = performance.now()
63+
while (performance.now() - start < BLOCK_TIME) {
64+
// just block
65+
}
66+
console.log('Block ends')
67+
runProblem()
68+
}, unstabilityTimeout).unref()
69+
}
70+
}
71+
72+
runProblem()
73+
}
74+
75+
main()
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
{
2+
"name": "unstable-target",
3+
"version": "1.0.0",
4+
"main": "index.js",
5+
"scripts": {
6+
"stable": "STABLE=1 node index.js",
7+
"unstable": "forever index.js",
8+
"dev": "node --watch index.js"
9+
},
10+
"dependencies": {
11+
"fastify": "^5.2.1",
12+
"forever": "^4.0.3"
13+
}
14+
}

examples/ws-reconnection.js

Lines changed: 0 additions & 1 deletion
This file was deleted.

0 commit comments

Comments
 (0)