Skip to content

Commit ce9fa44

Browse files
committed
refactor: improve force connect pipeline
1 parent 40d235c commit ce9fa44

20 files changed

+120
-190
lines changed

common/lib/plugin_manager.ts

Lines changed: 58 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -37,28 +37,32 @@ type PluginFunc<T> = (plugin: ConnectionPlugin, targetFunc: () => Promise<T>) =>
3737

3838
class PluginChain<T> {
3939
private readonly targetFunc: () => Promise<T>;
40-
private chain?: (pluginFunc: PluginFunc<T>, targetFunc: () => Promise<T>) => Promise<T>;
40+
private chain?: (pluginFunc: PluginFunc<T>, targetFunc: () => Promise<T>, pluginToSkip: ConnectionPlugin) => Promise<T>;
4141

4242
constructor(targetFunc: () => Promise<T>) {
4343
this.targetFunc = targetFunc;
4444
}
4545

46-
addToHead(plugin: ConnectionPlugin) {
46+
addToHead(plugin: ConnectionPlugin, pluginToSkip: ConnectionPlugin) {
4747
if (this.chain === undefined) {
48-
this.chain = (pluginFunc, targetFunc) => pluginFunc(plugin, targetFunc);
48+
this.chain = (pluginFunc, targetFunc, pluginToSkip) => pluginFunc(plugin, targetFunc);
4949
} else {
5050
const pipelineSoFar = this.chain;
5151
// @ts-ignore
52-
this.chain = (pluginFunc, targetFunc) => pluginFunc(plugin, () => pipelineSoFar(pluginFunc, targetFunc));
52+
if (plugin !== pluginToSkip) {
53+
this.chain = (pluginFunc, targetFunc, pluginToSkip) => {
54+
return pluginFunc(plugin, () => pipelineSoFar(pluginFunc, targetFunc, pluginToSkip));
55+
};
56+
}
5357
}
5458
return this;
5559
}
5660

57-
execute(pluginFunc: PluginFunc<T>): Promise<T> {
61+
execute(pluginFunc: PluginFunc<T>, pluginToSkip: ConnectionPlugin): Promise<T> {
5862
if (this.chain === undefined) {
5963
throw new AwsWrapperError(Messages.get("PluginManager.pipelineNone"));
6064
}
61-
return this.chain(pluginFunc, this.targetFunc);
65+
return this.chain(pluginFunc, this.targetFunc, pluginToSkip);
6266
}
6367
}
6468

@@ -133,15 +137,28 @@ export class PluginManager {
133137
props,
134138
methodName,
135139
(plugin, nextPluginFunc) => this.runMethodFuncWithTelemetry(() => plugin.execute(methodName, nextPluginFunc, options), plugin.name),
136-
methodFunc
140+
methodFunc,
141+
null
137142
);
138143
});
139144
} finally {
140145
this.pluginServiceManagerContainer.pluginService.attachErrorListener(currentClient);
141146
}
142147
}
143148

144-
async connect(hostInfo: HostInfo | null, props: Map<string, any>, isInitialConnection: boolean): Promise<ClientWrapper> {
149+
async connect(hostInfo: HostInfo | null, props: Map<string, any>, isInitialConnection: boolean): Promise<ClientWrapper>;
150+
async connect(
151+
hostInfo: HostInfo | null,
152+
props: Map<string, any>,
153+
isInitialConnection: boolean,
154+
pluginToSkip: ConnectionPlugin | null
155+
): Promise<ClientWrapper>;
156+
async connect(
157+
hostInfo: HostInfo | null,
158+
props: Map<string, any>,
159+
isInitialConnection: boolean,
160+
pluginToSkip?: ConnectionPlugin | null
161+
): Promise<ClientWrapper> {
145162
if (hostInfo == null) {
146163
throw new AwsWrapperError(Messages.get("HostInfo.noHostParameter"));
147164
}
@@ -156,12 +173,25 @@ export class PluginManager {
156173
this.runMethodFuncWithTelemetry(() => plugin.connect(hostInfo, props, isInitialConnection, nextPluginFunc), plugin.name),
157174
async () => {
158175
throw new AwsWrapperError("Shouldn't be called.");
159-
}
176+
},
177+
pluginToSkip
160178
);
161179
});
162180
}
163181

164-
async forceConnect(hostInfo: HostInfo | null, props: Map<string, any>, isInitialConnection: boolean): Promise<ClientWrapper> {
182+
async forceConnect(hostInfo: HostInfo | null, props: Map<string, any>, isInitialConnection: boolean): Promise<ClientWrapper>;
183+
async forceConnect(
184+
hostInfo: HostInfo | null,
185+
props: Map<string, any>,
186+
isInitialConnection: boolean,
187+
pluginToSkip: ConnectionPlugin
188+
): Promise<ClientWrapper>;
189+
async forceConnect(
190+
hostInfo: HostInfo | null,
191+
props: Map<string, any>,
192+
isInitialConnection: boolean,
193+
pluginToSkip?: ConnectionPlugin
194+
): Promise<ClientWrapper> {
165195
if (hostInfo == null) {
166196
throw new AwsWrapperError(Messages.get("HostInfo.noHostParameter"));
167197
}
@@ -176,7 +206,8 @@ export class PluginManager {
176206
this.runMethodFuncWithTelemetry(() => plugin.forceConnect(hostInfo, props, isInitialConnection, nextPluginFunc), plugin.name),
177207
async () => {
178208
throw new AwsWrapperError("Shouldn't be called.");
179-
}
209+
},
210+
pluginToSkip
180211
);
181212
});
182213
}
@@ -186,19 +217,28 @@ export class PluginManager {
186217
props: Map<string, any>,
187218
methodName: string,
188219
pluginFunc: PluginFunc<T>,
189-
methodFunc: () => Promise<T>
220+
methodFunc: () => Promise<T>,
221+
pluginToSkip: ConnectionPlugin | null
190222
): Promise<T> {
191-
const chain = this.makeExecutePipeline(hostInfo, props, methodName, methodFunc);
192-
return chain.execute(pluginFunc);
223+
const chain = this.makeExecutePipeline(hostInfo, props, methodName, methodFunc, pluginToSkip);
224+
return chain.execute(pluginFunc, pluginToSkip);
193225
}
194226

195-
makeExecutePipeline<T>(hostInfo: HostInfo, props: Map<string, any>, name: string, methodFunc: () => Promise<T>): PluginChain<T> {
227+
pluginToSkip: ConnectionPlugin | null;
228+
229+
makeExecutePipeline<T>(
230+
hostInfo: HostInfo,
231+
props: Map<string, any>,
232+
name: string,
233+
methodFunc: () => Promise<T>,
234+
pluginToSkip: ConnectionPlugin
235+
): PluginChain<T> {
196236
const chain = new PluginChain(methodFunc);
197237

198238
for (let i = this._plugins.length - 1; i >= 0; i--) {
199239
const p = this._plugins[i];
200240
if (p.getSubscribedMethods().has("*") || p.getSubscribedMethods().has(name)) {
201-
chain.addToHead(p);
241+
chain.addToHead(p, pluginToSkip);
202242
}
203243
}
204244

@@ -219,7 +259,8 @@ export class PluginManager {
219259
),
220260
() => {
221261
throw new AwsWrapperError("Shouldn't be called");
222-
}
262+
},
263+
null
223264
);
224265
});
225266
}

common/lib/plugin_service.ts

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import { getWriter, logTopology } from "./utils/utils";
4444
import { TelemetryFactory } from "./utils/telemetry/telemetry_factory";
4545
import { DriverDialect } from "./driver_dialect/driver_dialect";
4646
import { AllowedAndBlockedHosts } from "./AllowedAndBlockedHosts";
47+
import { ConnectionPlugin } from "./connection_plugin";
4748

4849
export interface PluginService extends ErrorHandler {
4950
isInTransaction(): boolean;
@@ -108,8 +109,16 @@ export interface PluginService extends ErrorHandler {
108109

109110
connect(hostInfo: HostInfo, props: Map<string, any>): Promise<ClientWrapper>;
110111

112+
connect(hostInfo: HostInfo, props: Map<string, any>, pluginToSkip: ConnectionPlugin): Promise<ClientWrapper>;
113+
114+
connect(hostInfo: HostInfo, props: Map<string, any>, pluginToSkip?: ConnectionPlugin): Promise<ClientWrapper>;
115+
111116
forceConnect(hostInfo: HostInfo, props: Map<string, any>): Promise<ClientWrapper>;
112117

118+
forceConnect(hostInfo: HostInfo, props: Map<string, any>, pluginToSkip: ConnectionPlugin): Promise<ClientWrapper>;
119+
120+
forceConnect(hostInfo: HostInfo, props: Map<string, any>, pluginToSkip?: ConnectionPlugin): Promise<ClientWrapper>;
121+
113122
setCurrentClient(newClient: ClientWrapper, hostInfo: HostInfo): Promise<Set<HostChangeOptions>>;
114123

115124
isClientValid(targetClient: ClientWrapper): Promise<boolean>;
@@ -495,12 +504,16 @@ export class PluginServiceImpl implements PluginService, HostListProviderService
495504
return provider.identifyConnection(targetClient, this.dialect);
496505
}
497506

498-
connect(hostInfo: HostInfo, props: Map<string, any>): Promise<ClientWrapper> {
499-
return this.pluginServiceManagerContainer.pluginManager!.connect(hostInfo, props, false);
507+
connect(hostInfo: HostInfo, props: Map<string, any>): Promise<ClientWrapper>;
508+
connect(hostInfo: HostInfo, props: Map<string, any>, pluginToSkip: ConnectionPlugin): Promise<ClientWrapper>;
509+
connect(hostInfo: HostInfo, props: Map<string, any>, pluginToSkip?: ConnectionPlugin): Promise<ClientWrapper> {
510+
return this.pluginServiceManagerContainer.pluginManager!.connect(hostInfo, props, false, pluginToSkip);
500511
}
501512

502-
forceConnect(hostInfo: HostInfo, props: Map<string, any>): Promise<ClientWrapper> {
503-
return this.pluginServiceManagerContainer.pluginManager!.forceConnect(hostInfo, props, false);
513+
forceConnect(hostInfo: HostInfo, props: Map<string, any>): Promise<ClientWrapper>;
514+
forceConnect(hostInfo: HostInfo, props: Map<string, any>, pluginToSkip: ConnectionPlugin): Promise<ClientWrapper>;
515+
forceConnect(hostInfo: HostInfo, props: Map<string, any>, pluginToSkip?: ConnectionPlugin): Promise<ClientWrapper> {
516+
return this.pluginServiceManagerContainer.pluginManager!.forceConnect(hostInfo, props, false, pluginToSkip);
504517
}
505518

506519
async setCurrentClient(newClient: ClientWrapper, hostInfo: HostInfo): Promise<Set<HostChangeOptions>> {

common/lib/plugins/aurora_initial_connection_strategy_plugin.ts

Lines changed: 3 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import { logger } from "../../logutils";
3030
import { ClientWrapper } from "../client_wrapper";
3131

3232
export class AuroraInitialConnectionStrategyPlugin extends AbstractConnectionPlugin {
33-
private static readonly subscribedMethods = new Set<string>(["initHostProvider", "connect", "forceConnect"]);
33+
private static readonly subscribedMethods = new Set<string>(["initHostProvider", "connect"]);
3434
private pluginService: PluginService;
3535
private hostListProviderService?: HostListProviderService;
3636
private rdsUtils = new RdsUtils();
@@ -62,24 +62,6 @@ export class AuroraInitialConnectionStrategyPlugin extends AbstractConnectionPlu
6262
props: Map<string, any>,
6363
isInitialConnection: boolean,
6464
connectFunc: () => Promise<ClientWrapper>
65-
): Promise<ClientWrapper> {
66-
return this.connectInternal(hostInfo, props, isInitialConnection, connectFunc);
67-
}
68-
69-
async forceConnect(
70-
hostInfo: HostInfo,
71-
props: Map<string, any>,
72-
isInitialConnection: boolean,
73-
forceConnectFunc: () => Promise<ClientWrapper>
74-
): Promise<ClientWrapper> {
75-
return this.connectInternal(hostInfo, props, isInitialConnection, forceConnectFunc);
76-
}
77-
78-
async connectInternal(
79-
hostInfo: HostInfo,
80-
props: Map<string, any>,
81-
isInitialConnection: boolean,
82-
connectFunc: () => Promise<ClientWrapper>
8365
): Promise<ClientWrapper> {
8466
const type = this.rdsUtils.identifyRdsType(hostInfo.host);
8567

@@ -148,7 +130,7 @@ export class AuroraInitialConnectionStrategyPlugin extends AbstractConnectionPlu
148130
}
149131
return writerCandidateClient;
150132
}
151-
writerCandidateClient = await this.pluginService.connect(writerCandidate, props);
133+
writerCandidateClient = await this.pluginService.connect(writerCandidate, props, this);
152134

153135
if ((await this.pluginService.getHostRole(writerCandidateClient)) !== HostRole.WRITER) {
154136
// If the new connection resolves to a reader instance, this means the topology is outdated.
@@ -225,7 +207,7 @@ export class AuroraInitialConnectionStrategyPlugin extends AbstractConnectionPlu
225207
}
226208
return readerCandidateClient;
227209
}
228-
readerCandidateClient = await this.pluginService.connect(readerCandidate, props);
210+
readerCandidateClient = await this.pluginService.connect(readerCandidate, props, this);
229211

230212
if ((await this.pluginService.getHostRole(readerCandidateClient)) !== HostRole.READER) {
231213
// If the new connection resolves to a writer instance, this means the topology is outdated.

common/lib/plugins/connection_tracker/aurora_connection_tracker_plugin.ts

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import { HostRole } from "../../host_role";
2828
import { OpenedConnectionTracker } from "./opened_connection_tracker";
2929

3030
export class AuroraConnectionTrackerPlugin extends AbstractConnectionPlugin implements CanReleaseResources {
31-
private static readonly subscribedMethods = new Set<string>(["notifyHostListChanged"].concat(SubscribedMethodHelper.NETWORK_BOUND_METHODS));
31+
private static readonly subscribedMethods = new Set<string>(["notifyHostListChanged", "connect", "query", "rollback"]);
3232

3333
private readonly pluginService: PluginService;
3434
private readonly rdsUtils: RdsUtils;
@@ -55,19 +55,6 @@ export class AuroraConnectionTrackerPlugin extends AbstractConnectionPlugin impl
5555
isInitialConnection: boolean,
5656
connectFunc: () => Promise<ClientWrapper>
5757
): Promise<ClientWrapper> {
58-
return this.connectInternal(hostInfo, connectFunc);
59-
}
60-
61-
override async forceConnect(
62-
hostInfo: HostInfo,
63-
props: Map<string, any>,
64-
isInitialConnection: boolean,
65-
forceConnectFunc: () => Promise<ClientWrapper>
66-
): Promise<ClientWrapper> {
67-
return this.connectInternal(hostInfo, forceConnectFunc);
68-
}
69-
70-
async connectInternal(hostInfo: HostInfo, connectFunc: () => Promise<ClientWrapper>): Promise<ClientWrapper> {
7158
const targetClient = await connectFunc();
7259

7360
if (targetClient) {

common/lib/plugins/efm/host_monitoring_connection_plugin.ts

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -53,25 +53,12 @@ export class HostMonitoringConnectionPlugin extends AbstractConnectionPlugin imp
5353
return new Set<string>(["*"]);
5454
}
5555

56-
connect(
56+
async connect(
5757
hostInfo: HostInfo,
5858
props: Map<string, any>,
5959
isInitialConnection: boolean,
6060
connectFunc: () => Promise<ClientWrapper>
6161
): Promise<ClientWrapper> {
62-
return this.connectInternal(hostInfo, connectFunc);
63-
}
64-
65-
forceConnect(
66-
hostInfo: HostInfo,
67-
props: Map<string, any>,
68-
isInitialConnection: boolean,
69-
forceConnectFunc: () => Promise<ClientWrapper>
70-
): Promise<ClientWrapper> {
71-
return this.connectInternal(hostInfo, forceConnectFunc);
72-
}
73-
74-
private async connectInternal(hostInfo: HostInfo, connectFunc: () => Promise<ClientWrapper>): Promise<ClientWrapper> {
7562
const targetClient = await connectFunc();
7663
if (targetClient != null) {
7764
const type: RdsUrlType = this.rdsUtils.identifyRdsType(hostInfo.host);

common/lib/plugins/efm2/host_monitoring2_connection_plugin.ts

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -52,25 +52,12 @@ export class HostMonitoring2ConnectionPlugin extends AbstractConnectionPlugin im
5252
return new Set<string>(["*"]);
5353
}
5454

55-
connect(
55+
async connect(
5656
hostInfo: HostInfo,
5757
props: Map<string, any>,
5858
isInitialConnection: boolean,
5959
connectFunc: () => Promise<ClientWrapper>
6060
): Promise<ClientWrapper> {
61-
return this.connectInternal(hostInfo, connectFunc);
62-
}
63-
64-
forceConnect(
65-
hostInfo: HostInfo,
66-
props: Map<string, any>,
67-
isInitialConnection: boolean,
68-
forceConnectFunc: () => Promise<ClientWrapper>
69-
): Promise<ClientWrapper> {
70-
return this.connectInternal(hostInfo, forceConnectFunc);
71-
}
72-
73-
private async connectInternal(hostInfo: HostInfo, connectFunc: () => Promise<ClientWrapper>): Promise<ClientWrapper> {
7461
const targetClient = await connectFunc();
7562
if (targetClient != null) {
7663
const type: RdsUrlType = this.rdsUtils.identifyRdsType(hostInfo.host);

common/lib/plugins/failover/failover_plugin.ts

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ export class FailoverPlugin extends AbstractConnectionPlugin {
5252
private static readonly subscribedMethods: Set<string> = new Set([
5353
"initHostProvider",
5454
"connect",
55-
"forceConnect",
5655
"query",
5756
"notifyConnectionChanged",
5857
"notifyHostListChanged"
@@ -246,34 +245,6 @@ export class FailoverPlugin extends AbstractConnectionPlugin {
246245
props: Map<string, any>,
247246
isInitialConnection: boolean,
248247
connectFunc: () => Promise<ClientWrapper>
249-
): Promise<ClientWrapper> {
250-
try {
251-
return await this.connectInternal(hostInfo, props, isInitialConnection, connectFunc);
252-
} catch (e: any) {
253-
logger.debug(`Connect to ${hostInfo.host} failed with message: ${e.message}`);
254-
throw e;
255-
}
256-
}
257-
258-
override async forceConnect(
259-
hostInfo: HostInfo,
260-
props: Map<string, any>,
261-
isInitialConnection: boolean,
262-
forceConnectFunc: () => Promise<ClientWrapper>
263-
): Promise<ClientWrapper> {
264-
try {
265-
return await this.connectInternal(hostInfo, props, isInitialConnection, forceConnectFunc);
266-
} catch (e: any) {
267-
logger.debug(`Force connect to ${hostInfo.host} failed with message: ${e.message}`);
268-
throw e;
269-
}
270-
}
271-
272-
async connectInternal(
273-
hostInfo: HostInfo,
274-
props: Map<string, any>,
275-
isInitialConnection: boolean,
276-
connectFunc: () => Promise<ClientWrapper>
277248
): Promise<ClientWrapper> {
278249
this.initFailoverMode();
279250
return await this._staleDnsHelper.getVerifiedConnection(hostInfo.host, isInitialConnection, this.hostListProviderService!, props, connectFunc);

0 commit comments

Comments
 (0)