|
1 | 1 | import { Server } from "socket.io"; |
| 2 | +import { createAdapter } from "@socket.io/redis-adapter"; |
| 3 | +import { createClient } from "redis"; |
2 | 4 | import * as Y from "yjs"; |
3 | 5 | import SessionService from "../services/session.service.js"; |
4 | 6 | import { persistSessionHistory } from "../services/sessionHistory.service.js"; |
@@ -124,6 +126,39 @@ const getSessionSnapshot = (sessionId, socket) => { |
124 | 126 | }; |
125 | 127 | }; |
126 | 128 |
|
| 129 | +const initialiseRedisAdapter = async () => { |
| 130 | + const redisUrl = |
| 131 | + process.env.COLLAB_REDIS_URL ?? process.env.REDIS_URL ?? null; |
| 132 | + const redisHost = |
| 133 | + process.env.COLLAB_REDIS_HOST ?? process.env.REDIS_HOST ?? null; |
| 134 | + |
| 135 | + if (!redisUrl && !redisHost) { |
| 136 | + console.log( |
| 137 | + "[collab.socket][redis] Adapter disabled: no COLLAB_REDIS_URL or COLLAB_REDIS_HOST configured.", |
| 138 | + ); |
| 139 | + return null; |
| 140 | + } |
| 141 | + |
| 142 | + const pubClient = createClient({ url: redisUrl }); |
| 143 | + const subClient = pubClient.duplicate(); |
| 144 | + |
| 145 | + pubClient.on("error", (error) => { |
| 146 | + console.error("[collab.socket][redis] Publisher error:", error); |
| 147 | + }); |
| 148 | + subClient.on("error", (error) => { |
| 149 | + console.error("[collab.socket][redis] Subscriber error:", error); |
| 150 | + }); |
| 151 | + |
| 152 | + await Promise.all([pubClient.connect(), subClient.connect()]); |
| 153 | + |
| 154 | + console.log("[collab.socket] Redis adapter connected."); |
| 155 | + |
| 156 | + return { |
| 157 | + adapter: createAdapter(pubClient, subClient), |
| 158 | + clients: [pubClient, subClient], |
| 159 | + }; |
| 160 | +}; |
| 161 | + |
127 | 162 | const normaliseLanguage = (language) => { |
128 | 163 | if (typeof language !== "string") { |
129 | 164 | return null; |
@@ -221,6 +256,28 @@ export const initSocket = (server) => { |
221 | 256 | methods: ["GET", "POST"], |
222 | 257 | }, |
223 | 258 | }); |
| 259 | + const redisClients = []; |
| 260 | + |
| 261 | + (async () => { |
| 262 | + try { |
| 263 | + const redisResources = await initialiseRedisAdapter(); |
| 264 | + if (redisResources?.adapter) { |
| 265 | + io.adapter(redisResources.adapter); |
| 266 | + redisClients.push(...(redisResources.clients ?? [])); |
| 267 | + console.log("[collab.socket] Redis adapter registered with Socket.IO."); |
| 268 | + } |
| 269 | + } catch (error) { |
| 270 | + console.error( |
| 271 | + "[collab.socket][redis] Failed to initialise adapter. Falling back to default adapter.", |
| 272 | + error, |
| 273 | + ); |
| 274 | + } |
| 275 | + })().catch((error) => { |
| 276 | + console.error( |
| 277 | + "[collab.socket][redis] Unexpected error during adapter bootstrap:", |
| 278 | + error, |
| 279 | + ); |
| 280 | + }); |
224 | 281 |
|
225 | 282 | const runInactivitySweep = async () => { |
226 | 283 | const now = Date.now(); |
@@ -693,6 +750,14 @@ export const initSocket = (server) => { |
693 | 750 |
|
694 | 751 | io.engine.on("close", () => { |
695 | 752 | clearInterval(inactivityInterval); |
| 753 | + redisClients.forEach((client) => { |
| 754 | + client.quit?.().catch((error) => { |
| 755 | + console.warn( |
| 756 | + "[collab.socket][redis] Failed to close Redis client cleanly:", |
| 757 | + error, |
| 758 | + ); |
| 759 | + }); |
| 760 | + }); |
696 | 761 | }); |
697 | 762 |
|
698 | 763 | return io; |
|
0 commit comments