Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions packages/client/lib/client/enterprise-maintenance-manager.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import assert from "node:assert";
import { createClient } from "../../";

describe("EnterpriseMaintenanceManager does not prevent proper options parsing", () => {
it("should not throw when initializing without options", async () => {
const client = createClient();
assert.doesNotThrow(async () => {
//Expected to reject because there is no url or socket provided and there is no running server on localhost
await assert.rejects(client.connect);
});
});

it("should not throw when initializing without url/socket and with maint", async () => {
const client = createClient({
maintNotifications: "enabled",
RESP: 3,
});
assert.doesNotThrow(async () => {
//Expected to reject because there is no url or socket provided and there is no running server on localhost
await assert.rejects(client.connect);
});
});
it("should not throw when initializing with url and with maint", async () => {
const client = createClient({
maintNotifications: "enabled",
RESP: 3,
url: "redis://localhost:6379",
});
assert.doesNotThrow(async () => {
//Expected to reject because there is no url or socket provided and there is no running server on localhost
await assert.rejects(client.connect);
});
});

it("should not throw when initializing with socket and with maint", async () => {
const client = createClient({
maintNotifications: "enabled",
RESP: 3,
socket: {
host: "localhost",
port: 6379,
},
});
assert.doesNotThrow(async () => {
//Expected to reject because there is no url or socket provided and there is no running server on localhost
await assert.rejects(client.connect);
});
});
});
34 changes: 20 additions & 14 deletions packages/client/lib/client/enterprise-maintenance-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { isIP } from "net";
import { lookup } from "dns/promises";
import assert from "node:assert";
import { setTimeout } from "node:timers/promises";
import RedisSocket from "./socket";
import RedisSocket, { RedisTcpSocketOptions } from "./socket";
import diagnostics_channel from "node:diagnostics_channel";

export const MAINTENANCE_EVENTS = {
Expand Down Expand Up @@ -64,12 +64,12 @@ export default class EnterpriseMaintenanceManager {
#client: Client;

static setupDefaultMaintOptions(options: RedisClientOptions) {
if (options.maintPushNotifications === undefined) {
options.maintPushNotifications =
if (options.maintNotifications === undefined) {
options.maintNotifications =
options?.RESP === 3 ? "auto" : "disabled";
}
if (options.maintMovingEndpointType === undefined) {
options.maintMovingEndpointType = "auto";
if (options.maintEndpointType === undefined) {
options.maintEndpointType = "auto";
}
if (options.maintRelaxedSocketTimeout === undefined) {
options.maintRelaxedSocketTimeout = 10000;
Expand All @@ -80,14 +80,20 @@ export default class EnterpriseMaintenanceManager {
}

static async getHandshakeCommand(
tls: boolean,
host: string,
options: RedisClientOptions,
): Promise<
| { cmd: Array<RedisArgument>; errorHandler: (error: Error) => void }
| undefined
> {
if (options.maintPushNotifications === "disabled") return;
if (options.maintNotifications === "disabled") return;

const host = options.url
? new URL(options.url).hostname
: (options.socket as RedisTcpSocketOptions | undefined)?.host;

if (!host) return;

const tls = options.socket?.tls ?? false

const movingEndpointType = await determineEndpoint(tls, host, options);
return {
Expand All @@ -100,7 +106,7 @@ export default class EnterpriseMaintenanceManager {
],
errorHandler: (error: Error) => {
dbgMaintenance("handshake failed:", error);
if (options.maintPushNotifications === "enabled") {
if (options.maintNotifications === "enabled") {
throw error;
}
},
Expand Down Expand Up @@ -189,7 +195,7 @@ export default class EnterpriseMaintenanceManager {
// reconnect to its currently configured endpoint after half of the grace
// period that was communicated by the server is over.
if (url === null) {
assert(this.#options.maintMovingEndpointType === "none");
assert(this.#options.maintEndpointType === "none");
assert(this.#options.socket !== undefined);
assert("host" in this.#options.socket);
assert(typeof this.#options.socket.host === "string");
Expand Down Expand Up @@ -329,12 +335,12 @@ async function determineEndpoint(
host: string,
options: RedisClientOptions,
): Promise<MovingEndpointType> {
assert(options.maintMovingEndpointType !== undefined);
if (options.maintMovingEndpointType !== "auto") {
assert(options.maintEndpointType !== undefined);
if (options.maintEndpointType !== "auto") {
dbgMaintenance(
`Determine endpoint type: ${options.maintMovingEndpointType}`,
`Determine endpoint type: ${options.maintEndpointType}`,
);
return options.maintMovingEndpointType;
return options.maintEndpointType;
}

const ip = isIP(host) ? host : (await lookup(host, { family: 0 })).address;
Expand Down
2 changes: 1 addition & 1 deletion packages/client/lib/client/index.spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { strict as assert } from 'node:assert';
import testUtils, { GLOBAL, waitTillBeenCalled } from '../test-utils';
import RedisClient, { RedisClientOptions, RedisClientType } from '.';
import { AbortError, ClientClosedError, ClientOfflineError, ConnectionTimeoutError, DisconnectsClientError, ErrorReply, MultiErrorReply, SocketClosedUnexpectedlyError, TimeoutError, WatchError } from '../errors';
import { AbortError, ClientClosedError, ClientOfflineError, ConnectionTimeoutError, DisconnectsClientError, ErrorReply, MultiErrorReply, TimeoutError, WatchError } from '../errors';
import { defineScript } from '../lua-script';
import { spy, stub } from 'sinon';
import { once } from 'node:events';
Expand Down
77 changes: 38 additions & 39 deletions packages/client/lib/client/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import COMMANDS from '../commands';
import RedisSocket, { RedisSocketOptions, RedisTcpSocketOptions } from './socket';
import RedisSocket, { RedisSocketOptions } from './socket';
import { BasicAuth, CredentialsError, CredentialsProvider, StreamingCredentialsProvider, UnableToObtainNewCredentialsError, Disposable } from '../authx';
import RedisCommandsQueue, { CommandOptions } from './commands-queue';
import { EventEmitter } from 'node:events';
Expand Down Expand Up @@ -154,7 +154,7 @@ export interface RedisClientOptions<
*
* The default is `auto`.
*/
maintPushNotifications?: 'disabled' | 'enabled' | 'auto';
maintNotifications?: 'disabled' | 'enabled' | 'auto';
/**
* Controls how the client requests the endpoint to reconnect to during a MOVING notification in Redis Enterprise maintenance.
*
Expand All @@ -167,19 +167,19 @@ export interface RedisClientOptions<
* The default is `auto`.
*/
maintMovingEndpointType?: MovingEndpointType;
maintEndpointType?: MovingEndpointType;
/**
* Specifies a more relaxed timeout (in milliseconds) for commands during a maintenance window.
* This helps minimize command timeouts during maintenance. If not provided, the `commandOptions.timeout`
* will be used instead. Timeouts during maintenance period result in a `CommandTimeoutDuringMaintenance` error.
* This helps minimize command timeouts during maintenance. Timeouts during maintenance period result
* in a `CommandTimeoutDuringMaintenance` error.
*
* The default is 10000
*/
maintRelaxedCommandTimeout?: number;
/**
* Specifies a more relaxed timeout (in milliseconds) for the socket during a maintenance window.
* This helps minimize socket timeouts during maintenance. If not provided, the `socket.timeout`
* will be used instead. Timeouts during maintenance period result in a `SocketTimeoutDuringMaintenance` error.
* This helps minimize socket timeouts during maintenance. Timeouts during maintenance period result
* in a `SocketTimeoutDuringMaintenance` error.
*
* The default is 10000
*/
Expand Down Expand Up @@ -429,7 +429,7 @@ export default class RedisClient<
return parsed;
}

readonly #options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>;
readonly #options: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>;
#socket: RedisSocket;
readonly #queue: RedisCommandsQueue;
#selectedDB = 0;
Expand All @@ -453,7 +453,7 @@ export default class RedisClient<
return this._self.#clientSideCache;
}

get options(): RedisClientOptions<M, F, S, RESP> | undefined {
get options(): RedisClientOptions<M, F, S, RESP> {
return this._self.#options;
}

Expand Down Expand Up @@ -503,15 +503,15 @@ export default class RedisClient<
this.#socket = this.#initiateSocket();


if(options?.maintPushNotifications !== 'disabled') {
new EnterpriseMaintenanceManager(this.#queue, this, this.#options!);
if(this.#options.maintNotifications !== 'disabled') {
new EnterpriseMaintenanceManager(this.#queue, this, this.#options);
};

if (options?.clientSideCache) {
if (options.clientSideCache instanceof ClientSideCacheProvider) {
this.#clientSideCache = options.clientSideCache;
if (this.#options.clientSideCache) {
if (this.#options.clientSideCache instanceof ClientSideCacheProvider) {
this.#clientSideCache = this.#options.clientSideCache;
} else {
const cscConfig = options.clientSideCache;
const cscConfig = this.#options.clientSideCache;
this.#clientSideCache = new BasicClientSideCache(cscConfig);
}
this.#queue.addPushHandler((push: Array<any>): boolean => {
Expand All @@ -535,16 +535,16 @@ export default class RedisClient<
throw new Error('Client Side Caching is only supported with RESP3');
}

if (options?.maintPushNotifications && options?.maintPushNotifications !== 'disabled' && options?.RESP !== 3) {
if (options?.maintNotifications && options?.maintNotifications !== 'disabled' && options?.RESP !== 3) {
throw new Error('Graceful Maintenance is only supported with RESP3');
}

}

#initiateOptions(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>): RedisClientOptions<M, F, S, RESP, TYPE_MAPPING> | undefined {
#initiateOptions(options: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING> = {}): RedisClientOptions<M, F, S, RESP, TYPE_MAPPING> {

// Convert username/password to credentialsProvider if no credentialsProvider is already in place
if (!options?.credentialsProvider && (options?.username || options?.password)) {
if (!options.credentialsProvider && (options.username || options.password)) {

options.credentialsProvider = {
type: 'async-credentials-provider',
Expand All @@ -555,19 +555,19 @@ export default class RedisClient<
};
}

if (options?.database) {
if (options.database) {
this._self.#selectedDB = options.database;
}

if (options?.commandOptions) {
if (options.commandOptions) {
this._commandOptions = options.commandOptions;
}

if(options?.maintPushNotifications !== 'disabled') {
EnterpriseMaintenanceManager.setupDefaultMaintOptions(options!);
if(options.maintNotifications !== 'disabled') {
EnterpriseMaintenanceManager.setupDefaultMaintOptions(options);
}

if (options?.url) {
if (options.url) {
const parsedOptions = RedisClient.parseOptions(options);
if (parsedOptions?.database) {
this._self.#selectedDB = parsedOptions.database;
Expand All @@ -580,8 +580,8 @@ export default class RedisClient<

#initiateQueue(): RedisCommandsQueue {
return new RedisCommandsQueue(
this.#options?.RESP ?? 2,
this.#options?.commandsQueueMaxLength,
this.#options.RESP ?? 2,
this.#options.commandsQueueMaxLength,
(channel, listeners) => this.emit('sharded-channel-moved', channel, listeners)
);
}
Expand All @@ -591,7 +591,7 @@ export default class RedisClient<
*/
private reAuthenticate = async (credentials: BasicAuth) => {
// Re-authentication is not supported on RESP2 with PubSub active
if (!(this.isPubSubActive && !this.#options?.RESP)) {
if (!(this.isPubSubActive && !this.#options.RESP)) {
await this.sendCommand(
parseArgs(COMMANDS.AUTH, {
username: credentials.username,
Expand Down Expand Up @@ -640,9 +640,9 @@ export default class RedisClient<
Array<{ cmd: CommandArguments } & { errorHandler?: (err: Error) => void }>
> {
const commands = [];
const cp = this.#options?.credentialsProvider;
const cp = this.#options.credentialsProvider;

if (this.#options?.RESP) {
if (this.#options.RESP) {
const hello: HelloOptions = {};

if (cp && cp.type === 'async-credentials-provider') {
Expand Down Expand Up @@ -702,7 +702,7 @@ export default class RedisClient<
}
}

if (this.#options?.name) {
if (this.#options.name) {
commands.push({
cmd: parseArgs(COMMANDS.CLIENT_SETNAME, this.#options.name)
});
Expand All @@ -713,11 +713,11 @@ export default class RedisClient<
commands.push({ cmd: ['SELECT', this.#selectedDB.toString()] });
}

if (this.#options?.readonly) {
if (this.#options.readonly) {
commands.push({ cmd: parseArgs(COMMANDS.READONLY) });
}

if (!this.#options?.disableClientInfo) {
if (!this.#options.disableClientInfo) {
commands.push({
cmd: ['CLIENT', 'SETINFO', 'LIB-VER', version],
errorHandler: () => {
Expand All @@ -732,7 +732,7 @@ export default class RedisClient<
'CLIENT',
'SETINFO',
'LIB-NAME',
this.#options?.clientInfoTag
this.#options.clientInfoTag
? `node-redis(${this.#options.clientInfoTag})`
: 'node-redis'
],
Expand All @@ -748,8 +748,7 @@ export default class RedisClient<
commands.push({cmd: this.#clientSideCache.trackingOn()});
}

const { tls, host } = this.#options!.socket as RedisTcpSocketOptions;
const maintenanceHandshakeCmd = await EnterpriseMaintenanceManager.getHandshakeCommand(!!tls, host!, this.#options!);
const maintenanceHandshakeCmd = await EnterpriseMaintenanceManager.getHandshakeCommand(this.#options);
if(maintenanceHandshakeCmd) {
commands.push(maintenanceHandshakeCmd);
};
Expand All @@ -769,7 +768,7 @@ export default class RedisClient<
.on('error', err => {
this.emit('error', err);
this.#clientSideCache?.onError();
if (this.#socket.isOpen && !this.#options?.disableOfflineQueue) {
if (this.#socket.isOpen && !this.#options.disableOfflineQueue) {
this.#queue.flushWaitingForReply(err);
} else {
this.#queue.flushAll(err);
Expand Down Expand Up @@ -817,15 +816,15 @@ export default class RedisClient<
}
};

const socket = new RedisSocket(socketInitiator, this.#options?.socket);
const socket = new RedisSocket(socketInitiator, this.#options.socket);
this.#attachListeners(socket);
return socket;
}

#pingTimer?: NodeJS.Timeout;

#setPingTimer(): void {
if (!this.#options?.pingInterval || !this.#socket.isReady) return;
if (!this.#options.pingInterval || !this.#socket.isReady) return;
clearTimeout(this.#pingTimer);

this.#pingTimer = setTimeout(() => {
Expand Down Expand Up @@ -986,7 +985,7 @@ export default class RedisClient<
transformReply: TransformReply | undefined,
) {
const csc = this._self.#clientSideCache;
const defaultTypeMapping = this._self.#options?.commandOptions === commandOptions;
const defaultTypeMapping = this._self.#options.commandOptions === commandOptions;

const fn = () => { return this.sendCommand(parser.redisArgs, commandOptions) };

Expand Down Expand Up @@ -1035,7 +1034,7 @@ export default class RedisClient<
): Promise<T> {
if (!this._self.#socket.isOpen) {
return Promise.reject(new ClientClosedError());
} else if (!this._self.#socket.isReady && this._self.#options?.disableOfflineQueue) {
} else if (!this._self.#socket.isReady && this._self.#options.disableOfflineQueue) {
return Promise.reject(new ClientOfflineError());
}

Expand Down
Loading