Skip to content

Commit 208a0d2

Browse files
Hitless upgrades (#3021)
* feat(errors): Add specialized timeout error types for maintenance scenarios - Added `SocketTimeoutDuringMaintananceError`, a subclass of `TimeoutError`, to handle socket timeouts during maintenance. - Added `CommandTimeoutDuringMaintenanceError`, another subclass of `TimeoutError`, to address command write timeouts during maintenance. * feat(linked-list): Add EmptyAwareSinglyLinkedList and enhance DoublyLinkedList functionality - Introduced `EmptyAwareSinglyLinkedList`, a subclass of `SinglyLinkedList` that emits an `empty` event when the list becomes empty due to `reset`, `shift`, or `remove` operations. - Added `nodes()` iterator method to `DoublyLinkedList` for iterating over nodes directly. - Enhanced unit tests for `DoublyLinkedList` and `SinglyLinkedList` to cover edge cases and new functionality. - Added comprehensive tests for `EmptyAwareSinglyLinkedList` to validate `empty` event emission under various scenarios. - Improved code formatting and consistency. * refactor(commands-queue): Improve push notification handling - Replaced `setInvalidateCallback` with a more flexible `addPushHandler` method, allowing multiple handlers for push notifications. - Introduced the `PushHandler` type to standardize push notification processing. - Refactored `RedisCommandsQueue` to use a `#pushHandlers` array, enabling dynamic and modular handling of push notifications. - Updated `RedisClient` to leverage the new handler mechanism for `invalidate` push notifications, simplifying and decoupling logic. * feat(commands-queue): Add method to wait for in-flight commands to complete - Introduced `waitForInflightCommandsToComplete` method to asynchronously wait for all in-flight commands to finish processing. - Utilized the `empty` event from `#waitingForReply` to signal when all commands have been completed. * feat(commands-queue): Introduce maintenance mode support for commands-queue - Added `#maintenanceCommandTimeout` and `setMaintenanceCommandTimeout` method to dynamically adjust command timeouts during maintenance * refator(client): Extract socket event listener setup into helper method * refactor(socket): Add maintenance mode support and dynamic timeout handling - Added `#maintenanceTimeout` and `setMaintenanceTimeout` method to dynamically adjust socket timeouts during maintenance. * feat(client): Add Redis Enterprise maintenance configuration options - Added `maintPushNotifications` option to control how the client handles Redis Enterprise maintenance push notifications (`disabled`, `enabled`, `au to`). - Added `maintMovingEndpointType` option to specify the endpoint type for reconnecting during a MOVING notification (`auto`, `internal-ip`, `external-ip`, etc.). - Added `maintRelaxedCommandTimeout` option to define a relaxed timeout for commands during maintenance. - Added `maintRelaxedSocketTimeout` option to define a relaxed timeout for the socket during maintenance. - Enforced RESP3 requirement for maintenance-related features (`maintPushNotifications`). * feat(client): Add socket helpers and pause mechanism - Introduced `#paused` flag with corresponding `_pause` and `_unpause` methods to temporarily halt writing commands to the socket during maintenance windows. - Updated `#write` method to respect the `#paused` flag, preventing new commands from being written during maintenance. - Added `_ejectSocket` method to safely detach from and return the current socket - Added `_insertSocket` method to receive and start using a new socket * feat(client): Add Redis Enterprise maintenance handling capabilities - Introduced `EnterpriseMaintenanceManager` to manage Redis Enterprise maintenance events and push notifications. - Integrated `EnterpriseMaintenanceManager` into `RedisClient` to handle maintenance push notifications and manage socket transitions. - Implemented graceful handling of MOVING, MIGRATING, and FAILOVER push notifications, including socket replacement and timeout adjustments. * test: add E2E test infrastructure for Redis maintenance scenarios * test: add E2E tests for Redis Enterprise maintenance timeout handling (#3) * test: add connection handoff test --------- Co-authored-by: Pavel Pashov <[email protected]> Co-authored-by: Pavel Pashov <[email protected]>
1 parent 6ad4c68 commit 208a0d2

20 files changed

+1635
-117
lines changed

package-lock.json

Lines changed: 0 additions & 16 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/bloom/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
"release": "release-it"
1414
},
1515
"peerDependencies": {
16-
"@redis/client": "^5.8.2"
16+
"@redis/client": "^5.8.2 || ^5.9.0-0"
1717
},
1818
"devDependencies": {
1919
"@redis/test-utils": "*"

packages/client/lib/client/commands-queue.ts

Lines changed: 84 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
1-
import { SinglyLinkedList, DoublyLinkedNode, DoublyLinkedList } from './linked-list';
1+
import { DoublyLinkedNode, DoublyLinkedList, EmptyAwareSinglyLinkedList } from './linked-list';
22
import encodeCommand from '../RESP/encoder';
33
import { Decoder, PUSH_TYPE_MAPPING, RESP_TYPES } from '../RESP/decoder';
44
import { TypeMapping, ReplyUnion, RespVersions, RedisArgument } from '../RESP/types';
55
import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub';
6-
import { AbortError, ErrorReply, TimeoutError } from '../errors';
6+
import { AbortError, ErrorReply, CommandTimeoutDuringMaintenanceError, TimeoutError } from '../errors';
77
import { MonitorCallback } from '.';
8+
import { dbgMaintenance } from './enterprise-maintenance-manager';
89

910
export interface CommandOptions<T = TypeMapping> {
1011
chainId?: symbol;
@@ -30,6 +31,7 @@ export interface CommandToWrite extends CommandWaitingForReply {
3031
timeout: {
3132
signal: AbortSignal;
3233
listener: () => unknown;
34+
originalTimeout: number | undefined;
3335
} | undefined;
3436
}
3537

@@ -50,22 +52,74 @@ const RESP2_PUSH_TYPE_MAPPING = {
5052
[RESP_TYPES.SIMPLE_STRING]: Buffer
5153
};
5254

55+
// Try to handle a push notification. Return whether you
56+
// successfully consumed the notification or not. This is
57+
// important in order for the queue to be able to pass the
58+
// notification to another handler if the current one did not
59+
// succeed.
60+
type PushHandler = (pushItems: Array<any>) => boolean;
61+
5362
export default class RedisCommandsQueue {
5463
readonly #respVersion;
5564
readonly #maxLength;
5665
readonly #toWrite = new DoublyLinkedList<CommandToWrite>();
57-
readonly #waitingForReply = new SinglyLinkedList<CommandWaitingForReply>();
66+
readonly #waitingForReply = new EmptyAwareSinglyLinkedList<CommandWaitingForReply>();
5867
readonly #onShardedChannelMoved;
5968
#chainInExecution: symbol | undefined;
6069
readonly decoder;
6170
readonly #pubSub = new PubSub();
6271

72+
#pushHandlers: PushHandler[] = [this.#onPush.bind(this)];
73+
74+
#maintenanceCommandTimeout: number | undefined
75+
76+
setMaintenanceCommandTimeout(ms: number | undefined) {
77+
// Prevent possible api misuse
78+
if (this.#maintenanceCommandTimeout === ms) {
79+
dbgMaintenance(`Queue already set maintenanceCommandTimeout to ${ms}, skipping`);
80+
return;
81+
};
82+
83+
dbgMaintenance(`Setting maintenance command timeout to ${ms}`);
84+
this.#maintenanceCommandTimeout = ms;
85+
86+
if(this.#maintenanceCommandTimeout === undefined) {
87+
dbgMaintenance(`Queue will keep maintenanceCommandTimeout for exisitng commands, just to be on the safe side. New commands will receive normal timeouts`);
88+
return;
89+
}
90+
91+
let counter = 0;
92+
const total = this.#toWrite.length;
93+
94+
// Overwrite timeouts of all eligible toWrite commands
95+
for(const node of this.#toWrite.nodes()) {
96+
const command = node.value;
97+
98+
// Remove timeout listener if it exists
99+
RedisCommandsQueue.#removeTimeoutListener(command)
100+
101+
counter++;
102+
const newTimeout = this.#maintenanceCommandTimeout;
103+
104+
// Overwrite the command's timeout
105+
const signal = AbortSignal.timeout(newTimeout);
106+
command.timeout = {
107+
signal,
108+
listener: () => {
109+
this.#toWrite.remove(node);
110+
command.reject(new CommandTimeoutDuringMaintenanceError(newTimeout));
111+
},
112+
originalTimeout: command.timeout?.originalTimeout
113+
};
114+
signal.addEventListener('abort', command.timeout.listener, { once: true });
115+
};
116+
dbgMaintenance(`Total of ${counter} of ${total} timeouts reset to ${ms}`);
117+
}
118+
63119
get isPubSubActive() {
64120
return this.#pubSub.isActive;
65121
}
66122

67-
#invalidateCallback?: (key: RedisArgument | null) => unknown;
68-
69123
constructor(
70124
respVersion: RespVersions,
71125
maxLength: number | null | undefined,
@@ -107,6 +161,7 @@ export default class RedisCommandsQueue {
107161
}
108162
return true;
109163
}
164+
return false
110165
}
111166

112167
#getTypeMapping() {
@@ -119,30 +174,27 @@ export default class RedisCommandsQueue {
119174
onErrorReply: err => this.#onErrorReply(err),
120175
//TODO: we can shave off a few cycles by not adding onPush handler at all if CSC is not used
121176
onPush: push => {
122-
if (!this.#onPush(push)) {
123-
// currently only supporting "invalidate" over RESP3 push messages
124-
switch (push[0].toString()) {
125-
case "invalidate": {
126-
if (this.#invalidateCallback) {
127-
if (push[1] !== null) {
128-
for (const key of push[1]) {
129-
this.#invalidateCallback(key);
130-
}
131-
} else {
132-
this.#invalidateCallback(null);
133-
}
134-
}
135-
break;
136-
}
137-
}
177+
for(const pushHandler of this.#pushHandlers) {
178+
if(pushHandler(push)) return
138179
}
139180
},
140181
getTypeMapping: () => this.#getTypeMapping()
141182
});
142183
}
143184

144-
setInvalidateCallback(callback?: (key: RedisArgument | null) => unknown) {
145-
this.#invalidateCallback = callback;
185+
addPushHandler(handler: PushHandler): void {
186+
this.#pushHandlers.push(handler);
187+
}
188+
189+
async waitForInflightCommandsToComplete(): Promise<void> {
190+
// In-flight commands already completed
191+
if(this.#waitingForReply.length === 0) {
192+
return
193+
};
194+
// Otherwise wait for in-flight commands to fire `empty` event
195+
return new Promise(resolve => {
196+
this.#waitingForReply.events.on('empty', resolve)
197+
});
146198
}
147199

148200
addCommand<T>(
@@ -168,15 +220,20 @@ export default class RedisCommandsQueue {
168220
typeMapping: options?.typeMapping
169221
};
170222

171-
const timeout = options?.timeout;
223+
// If #maintenanceCommandTimeout was explicitly set, we should
224+
// use it instead of the timeout provided by the command
225+
const timeout = this.#maintenanceCommandTimeout ?? options?.timeout;
226+
const wasInMaintenance = this.#maintenanceCommandTimeout !== undefined;
172227
if (timeout) {
228+
173229
const signal = AbortSignal.timeout(timeout);
174230
value.timeout = {
175231
signal,
176232
listener: () => {
177233
this.#toWrite.remove(node);
178-
value.reject(new TimeoutError());
179-
}
234+
value.reject(wasInMaintenance ? new CommandTimeoutDuringMaintenanceError(timeout) : new TimeoutError());
235+
},
236+
originalTimeout: options?.timeout
180237
};
181238
signal.addEventListener('abort', value.timeout.listener, { once: true });
182239
}
@@ -432,7 +489,7 @@ export default class RedisCommandsQueue {
432489
}
433490

434491
static #removeTimeoutListener(command: CommandToWrite) {
435-
command.timeout!.signal.removeEventListener('abort', command.timeout!.listener);
492+
command.timeout?.signal.removeEventListener('abort', command.timeout!.listener);
436493
}
437494

438495
static #flushToWrite(toBeSent: CommandToWrite, err: Error) {

0 commit comments

Comments
 (0)