Skip to content

Commit b7d3d32

Browse files
Handle graceful shutdown: wait for open worker message channels to close before disconnecting
1 parent ecc5c9c commit b7d3d32

File tree

2 files changed

+134
-27
lines changed

2 files changed

+134
-27
lines changed

packages/react-on-rails-pro-node-renderer/src/worker/handleGracefulShutdown.ts

Lines changed: 94 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,48 +2,121 @@ import cluster from 'cluster';
22
import { FastifyInstance } from './types.js';
33
import { SHUTDOWN_WORKER_MESSAGE } from '../shared/utils.js';
44
import log from '../shared/log.js';
5+
import { onMessageEnded, onMessageInitiated } from '../workerMessagesRouter.js';
6+
7+
/**
 * Read-only view of the worker's shutdown bookkeeping plus the callbacks
 * used to update it.
 */
type GracefulShutdownController = {
8+
// Current value of the active-requests counter.
readonly activeRequestsCount: number;
9+
// Current value of the active-message-channels counter.
readonly activeMessageChannelsCount: number;
10+
// Whether the shutdown message has been received by this worker.
readonly isShuttingDown: boolean;
11+
// Increments the active-requests counter.
markNewRequestReceived: () => void;
12+
// Decrements the active-requests counter, then re-evaluates whether the worker can stop.
markRequestHandled: () => void;
13+
// Increments the active-message-channels counter.
markNewMessageChannelOpened: () => void;
14+
// Decrements the active-message-channels counter, then re-evaluates whether the worker can stop.
markMessageChannelClosed: () => void;
15+
};
16+
17+
// Module-level controller instance; it is assigned inside setupGracefulShutdownHandler and reused by later calls.
// NOTE(review): declared without `| undefined`, yet it is truthiness-checked before any assignment — confirm the declared type.
let gracefulShutdownController: GracefulShutdownController;
18+
19+
/**
 * Creates the graceful-shutdown controller on first use and returns it.
 *
 * - If a controller already exists, it is returned unchanged.
 * - If `cluster.worker` is falsy, an error is logged and `undefined` is returned.
 * - Otherwise the counters are initialised, the controller is built, and a
 *   `process` 'message' listener is registered that reacts to
 *   SHUTDOWN_WORKER_MESSAGE by setting the shutting-down flag and
 *   re-evaluating whether the worker can stop.
 *
 * NOTE(review): this span appears to be a rendered diff — lines that follow an
 * `N-` marker look like the removed previous version; confirm against the
 * committed file.
 *
 * @returns the controller, or `undefined` when `cluster.worker` is not set.
 */
const setupGracefulShutdownHandler = () => {
20+
// Reuse the controller created by an earlier call, if any.
if (gracefulShutdownController) {
21+
return gracefulShutdownController;
22+
}
523

6-
const handleGracefulShutdown = (app: FastifyInstance) => {
724
const { worker } = cluster;
825
if (!worker) {
9-
log.error('handleGracefulShutdown is called on master, expected to call it on worker only');
10-
return;
26+
log.error('setupGracefulShutdownHandler is called on master, expected to call it on worker only');
27+
return undefined;
1128
}
1229

1330
let activeRequestsCount = 0;
31+
let activeMessageChannelsCount = 0;
1432
let isShuttingDown = false;
1533

34+
// Decides what to do with the worker; does nothing until the shutdown message has been received.
// Order of checks: open message channels (log only) > active requests (disconnect) > otherwise destroy.
// NOTE(review): the counters are decremented without a lower bound; a negative value fails both `> 0`
// checks and therefore falls through to worker.destroy() — confirm that open/close calls are always paired.
const handleCloseEvent = () => {
35+
if (!isShuttingDown) {
36+
return;
37+
}
38+
39+
if (activeMessageChannelsCount > 0) {
40+
log.info(
41+
'Worker #%d has "%d" active message channels, keep the worker connected',
42+
worker.id,
43+
activeMessageChannelsCount,
44+
);
45+
} else if (activeRequestsCount > 0) {
46+
log.info(
47+
'Worker #%d has "%d" active requests, disconnecting the worker',
48+
worker.id,
49+
activeRequestsCount,
50+
);
51+
worker.disconnect();
52+
} else {
53+
log.info('Worker #%d has no active requests, killing the worker', worker.id);
54+
worker.destroy();
55+
}
56+
};
57+
58+
// Getters expose the closure variables read-only; the mark* callbacks mutate them.
gracefulShutdownController = {
59+
get activeRequestsCount() {
60+
return activeRequestsCount;
61+
},
62+
get activeMessageChannelsCount() {
63+
return activeMessageChannelsCount;
64+
},
65+
get isShuttingDown() {
66+
return isShuttingDown;
67+
},
68+
markNewRequestReceived: () => {
69+
activeRequestsCount += 1;
70+
},
71+
// Decrement, then re-check whether the worker can stop.
markRequestHandled: () => {
72+
activeRequestsCount -= 1;
73+
handleCloseEvent();
74+
},
75+
markNewMessageChannelOpened: () => {
76+
activeMessageChannelsCount += 1;
77+
},
78+
// Decrement, then re-check whether the worker can stop.
markMessageChannelClosed: () => {
79+
activeMessageChannelsCount -= 1;
80+
handleCloseEvent();
81+
},
82+
};
83+
1684
// Only the SHUTDOWN_WORKER_MESSAGE value is handled here; any other message is ignored by this listener.
process.on('message', (msg) => {
1785
if (msg === SHUTDOWN_WORKER_MESSAGE) {
18-
log.debug('Worker #%d received graceful shutdown message', worker.id);
86+
log.info('Worker #%d received graceful shutdown message', worker.id);
1987
isShuttingDown = true;
20-
if (activeRequestsCount === 0) {
21-
log.debug('Worker #%d has no active requests, killing the worker', worker.id);
22-
worker.destroy();
23-
} else {
24-
log.debug(
25-
'Worker #%d has "%d" active requests, disconnecting the worker',
26-
worker.id,
27-
activeRequestsCount,
28-
);
29-
worker.disconnect();
30-
}
88+
handleCloseEvent();
3189
}
3290
});
3391

92+
return gracefulShutdownController;
93+
};
94+
95+
/**
 * Hooks request tracking into the given app.
 *
 * Obtains the controller from setupGracefulShutdownHandler and returns early
 * when none is available. Otherwise it registers an 'onRequest' hook that
 * marks a request as received and an 'onResponse' hook that marks it as
 * handled; both hooks then call `done()`.
 *
 * NOTE(review): this span appears to be a rendered diff — lines that follow an
 * `N-` marker look like the removed previous version; confirm against the
 * committed file.
 *
 * @param app - the instance the two hooks are added to.
 */
const handleGracefulShutdown = (app: FastifyInstance) => {
96+
const controller = setupGracefulShutdownHandler();
97+
// No controller is returned when setup ran outside a cluster worker; skip hook registration.
if (!controller) {
98+
return;
99+
}
100+
34101
app.addHook('onRequest', (_req, _reply, done) => {
35-
activeRequestsCount += 1;
102+
controller.markNewRequestReceived();
36103
done();
37104
});
38105

39106
app.addHook('onResponse', (_req, _reply, done) => {
40-
activeRequestsCount -= 1;
41-
if (isShuttingDown && activeRequestsCount === 0) {
42-
log.debug('Worker #%d served all active requests and going to be killed', worker.id);
43-
worker.destroy();
44-
}
107+
// markRequestHandled also re-evaluates whether the worker can stop.
controller.markRequestHandled();
45108
done();
46109
});
47110
};
48111

112+
// Registered at module load: count a message channel as opened whenever the router reports an initiated message.
// The controller is looked up inside the callback; optional chaining skips the update when no controller is returned.
onMessageInitiated(() => {
113+
const controller = setupGracefulShutdownHandler();
114+
controller?.markNewMessageChannelOpened();
115+
});
116+
117+
// Registered at module load: count a message channel as closed whenever the router reports an ended message.
onMessageEnded(() => {
118+
const controller = setupGracefulShutdownHandler();
119+
controller?.markMessageChannelClosed();
120+
});
121+
49122
export default handleGracefulShutdown;

packages/react-on-rails-pro-node-renderer/src/workerMessagesRouter.ts

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,21 @@ export const routeMessagesFromWorker = (worker: Worker) => {
2525
if (!isInterWorkerMessage(msg)) {
2626
return;
2727
}
28-
const { workers } = cluster;
29-
if (!workers) {
28+
const workers = Object.values(cluster.workers ?? {}).filter((w) => {
29+
return worker.id === w?.id || (w?.isConnected() && !w.isScheduledRestart);
30+
});
31+
if (workers.length === 0) {
3032
return;
3133
}
3234

3335
const { requestId, messageType } = msg;
3436
if (messageType === 'initiate') {
35-
const workerIds = Object.keys(workers);
36-
const workersCount = workerIds.length;
37-
const workerIndex = workerIds.indexOf(worker.id.toString());
37+
const workersCount = workers.length;
38+
const workerIndex = workers.findIndex((w) => w?.id === worker.id);
3839
const otherWorker = workers[workersCount - 1 - workerIndex];
40+
if (!otherWorker) {
41+
throw new Error("Can't find a worker to forward the message to");
42+
}
3943

4044
if (!otherWorker) {
4145
return;
@@ -65,6 +69,17 @@ type ReceivedMessage = {
6569
reply: (payload: unknown, close?: boolean) => void;
6670
};
6771

72+
// Listeners registered through onMessageInitiated; elsewhere in this module each one is scheduled with setTimeout(cb, 0).
const onMessageInitiatedCallbacks: (() => void)[] = [];
73+
// Listeners registered through onMessageEnded; scheduled the same way.
const onMessageEndedCallbacks: (() => void)[] = [];
74+
75+
/**
 * Registers a listener by appending it to onMessageInitiatedCallbacks.
 * No removal function is returned, and duplicates are not filtered.
 *
 * @param callback - zero-argument function to add to the list.
 */
export const onMessageInitiated = (callback: () => void) => {
76+
onMessageInitiatedCallbacks.push(callback);
77+
};
78+
79+
/**
 * Registers a listener by appending it to onMessageEndedCallbacks.
 * No removal function is returned, and duplicates are not filtered.
 *
 * @param callback - zero-argument function to add to the list.
 */
export const onMessageEnded = (callback: () => void) => {
80+
onMessageEndedCallbacks.push(callback);
81+
};
82+
6883
export const onMessageReceived = (callback: (receivedMessage: ReceivedMessage) => void) => {
6984
process.on('message', (msg) => {
7085
if (!isInterWorkerMessage(msg)) {
@@ -83,8 +98,18 @@ export const onMessageReceived = (callback: (receivedMessage: ReceivedMessage) =
8398
requestId,
8499
};
85100
process.send?.(replyMsg);
101+
102+
if (close) {
103+
onMessageEndedCallbacks.forEach((onMessageEndedCallback) => {
104+
setTimeout(onMessageEndedCallback, 0);
105+
});
106+
}
86107
},
87108
});
109+
110+
onMessageInitiatedCallbacks.forEach((onMessageInitiatedCallback) => {
111+
setTimeout(onMessageInitiatedCallback, 0);
112+
});
88113
}
89114
});
90115
};
@@ -93,8 +118,14 @@ export const sendMessage = (initialPayload: unknown): Readable => {
93118
const requestId = randomUUID();
94119
const reveivedStream = new PassThrough();
95120

121+
onMessageInitiatedCallbacks.forEach((onMessageInitiatedCallback) => {
122+
setTimeout(onMessageInitiatedCallback, 0);
123+
});
124+
96125
process.on('message', function messageReceivedCallback(msg) {
97-
if (!isInterWorkerMessage(msg) || msg.requestId !== requestId) {
126+
// If the number of workers is small (1 or 2), the request can be redirected to the same worker.
127+
// So ignore the initiate message here; it should be handled by onMessageReceived, not by the sendMessage function.
128+
if (!isInterWorkerMessage(msg) || msg.requestId !== requestId || msg.messageType === 'initiate') {
98129
return;
99130
}
100131

@@ -105,6 +136,9 @@ export const sendMessage = (initialPayload: unknown): Readable => {
105136
if (messageType === 'end') {
106137
reveivedStream.push(null);
107138
process.off('message', messageReceivedCallback);
139+
onMessageEndedCallbacks.forEach((onMessageEndedCallback) => {
140+
setTimeout(onMessageEndedCallback, 0);
141+
});
108142
}
109143
});
110144

0 commit comments

Comments
 (0)