Skip to content

Commit 5228307

Browse files
fix(lib-storage): improve concurrency and abortion management
1 parent 756643e commit 5228307

File tree

3 files changed

+101
-37
lines changed

3 files changed

+101
-37
lines changed

lib/lib-storage/src/Upload.ts

Lines changed: 38 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,19 @@ import {
1313
Tag,
1414
UploadPartCommand,
1515
} from "@aws-sdk/client-s3";
16-
import { AbortController } from "@smithy/abort-controller";
1716
import {
1817
EndpointParameterInstructionsSupplier,
1918
getEndpointFromInstructions,
2019
toEndpointV1,
2120
} from "@smithy/middleware-endpoint";
2221
import { HttpRequest } from "@smithy/protocol-http";
2322
import { extendedEncodeURIComponent } from "@smithy/smithy-client";
24-
import type { AbortController as IAbortController, AbortSignal as IAbortSignal, Endpoint } from "@smithy/types";
23+
import type { Endpoint } from "@smithy/types";
2524
import { EventEmitter } from "events";
2625

2726
import { byteLength } from "./bytelength";
2827
import { getChunk } from "./chunker";
28+
import { wireSignal } from "./signal";
2929
import { BodyDataTypes, Options, Progress } from "./types";
3030

3131
export interface RawDataPart {
@@ -59,8 +59,7 @@ export class Upload extends EventEmitter {
5959
private bytesUploadedSoFar: number;
6060

6161
// used in the upload.
62-
private abortController: IAbortController;
63-
private concurrentUploaders: Promise<void>[] = [];
62+
private abortController = new AbortController();
6463
private createMultiPartPromise?: Promise<CreateMultipartUploadCommandOutput>;
6564
private abortMultipartUploadCommand: AbortMultipartUploadCommand | null = null;
6665

@@ -93,7 +92,9 @@ export class Upload extends EventEmitter {
9392
// set progress defaults
9493
this.totalBytes = byteLength(this.params.Body);
9594
this.bytesUploadedSoFar = 0;
96-
this.abortController = options.abortController ?? new AbortController();
95+
96+
wireSignal(this.abortController, options.abortSignal);
97+
wireSignal(this.abortController, options.abortController?.signal);
9798
}
9899

99100
async abort(): Promise<void> {
@@ -111,7 +112,12 @@ export class Upload extends EventEmitter {
111112
);
112113
}
113114
this.sent = true;
114-
return await Promise.race([this.__doMultipartUpload(), this.__abortTimeout(this.abortController.signal)]);
115+
116+
try {
117+
return await this.__doMultipartUpload();
118+
} finally {
119+
this.abortController.abort();
120+
}
115121
}
116122

117123
public on(event: "httpUploadProgress", listener: (progress: Progress) => void): this {
@@ -143,7 +149,12 @@ export class Upload extends EventEmitter {
143149
eventEmitter.on("xhr.upload.progress", uploadEventListener);
144150
}
145151

146-
const resolved = await Promise.all([this.client.send(new PutObjectCommand(params)), clientConfig?.endpoint?.()]);
152+
const resolved = await Promise.all([
153+
this.client.send(new PutObjectCommand(params), {
154+
abortSignal: this.abortController.signal,
155+
}),
156+
clientConfig?.endpoint?.(),
157+
]);
147158
const putResult = resolved[0];
148159
let endpoint: Endpoint | undefined = resolved[1];
149160

@@ -291,7 +302,10 @@ export class Upload extends EventEmitter {
291302
UploadId: this.uploadId,
292303
Body: dataPart.data,
293304
PartNumber: dataPart.partNumber,
294-
})
305+
}),
306+
{
307+
abortSignal: this.abortController.signal,
308+
}
295309
);
296310

297311
if (eventEmitter !== null) {
@@ -333,28 +347,27 @@ export class Upload extends EventEmitter {
333347

334348
private async __doMultipartUpload(): Promise<CompleteMultipartUploadCommandOutput> {
335349
const dataFeeder = getChunk(this.params.Body, this.partSize);
336-
const concurrentUploaderFailures: Error[] = [];
350+
const concurrentUploads: Promise<void>[] = [];
337351

338352
for (let index = 0; index < this.queueSize; index++) {
339-
const currentUpload = this.__doConcurrentUpload(dataFeeder).catch((err) => {
340-
concurrentUploaderFailures.push(err);
341-
});
342-
this.concurrentUploaders.push(currentUpload);
353+
const currentUpload = this.__doConcurrentUpload(dataFeeder);
354+
concurrentUploads.push(currentUpload);
343355
}
344356

345-
await Promise.all(this.concurrentUploaders);
346-
if (concurrentUploaderFailures.length >= 1) {
357+
/**
358+
* Previously, each promise in concurrentUploads could potentially throw
359+
* and immediately return control to user code. However, we want to wait for
360+
* all uploaders to finish before calling AbortMultipartUpload to avoid
361+
* stranding uploaded parts.
362+
*
363+
* We throw only the first error to be consistent with prior behavior,
364+
* but may consider combining the errors into a report in the future.
365+
*/
366+
const results = await Promise.allSettled(concurrentUploads);
367+
const firstFailure = results.find((result) => result.status === "rejected");
368+
if (firstFailure) {
347369
await this.markUploadAsAborted();
348-
/**
349-
* Previously, each promise in concurrentUploaders could potentially throw
350-
* and immediately return control to user code. However, we want to wait for
351-
* all uploaders to finish before calling AbortMultipartUpload to avoid
352-
* stranding uploaded parts.
353-
*
354-
* We throw only the first error to be consistent with prior behavior,
355-
* but may consider combining the errors into a report in the future.
356-
*/
357-
throw concurrentUploaderFailures[0];
370+
throw firstFailure.reason;
358371
}
359372

360373
if (this.abortController.signal.aborted) {
@@ -417,16 +430,6 @@ export class Upload extends EventEmitter {
417430
}
418431
}
419432

420-
private async __abortTimeout(abortSignal: IAbortSignal): Promise<never> {
421-
return new Promise((resolve, reject) => {
422-
abortSignal.onabort = () => {
423-
const abortError = new Error("Upload aborted.");
424-
abortError.name = "AbortError";
425-
reject(abortError);
426-
};
427-
});
428-
}
429-
430433
private __validateInput(): void {
431434
if (!this.params) {
432435
throw new Error(`InputError: Upload requires params to be passed to upload.`);

lib/lib-storage/src/signal.ts

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import { IAbortSignal } from "./types";
2+
3+
/**
4+
* This function wires an external abort signal to an internal abort controller.
5+
* The internal abort controller will be aborted when the external signal is
6+
* aborted.
7+
*
8+
* Every callback created will be removed as soon as either the internal or
9+
* external signal is aborted. This allows to avoid memory leaks, especially if
10+
* the external signal has a (significantly) longer lifespan than the internal
11+
* one.
12+
*
13+
* In order to ensure that any references are removed, make sure to always
14+
* `abort()` the internal controller when you are done with it.
15+
*/
16+
export function wireSignal(internalController: AbortController, externalSignal?: IAbortSignal): void {
17+
if (!externalSignal || internalController.signal.aborted) {
18+
return;
19+
}
20+
if (externalSignal.aborted) {
21+
internalController.abort();
22+
return;
23+
}
24+
25+
if (isNativeSignal(externalSignal)) {
26+
externalSignal.addEventListener("abort", () => internalController.abort(), {
27+
once: true,
28+
signal: internalController.signal,
29+
});
30+
} else {
31+
// backwards compatibility
32+
const origOnabort = externalSignal.onabort;
33+
const restore = () => {
34+
externalSignal.onabort = origOnabort;
35+
};
36+
37+
externalSignal.onabort = function () {
38+
internalController.abort();
39+
restore();
40+
origOnabort?.call(this);
41+
};
42+
43+
// Let's clear any reference to the internal controller when it is aborted,
44+
// avoiding potential memory leaks.
45+
internalController.signal.addEventListener("abort", restore, { once: true });
46+
}
47+
}
48+
49+
export function isNativeSignal(signal: IAbortSignal): signal is globalThis.AbortSignal {
50+
return "addEventListener" in signal && typeof signal.addEventListener === "function";
51+
}

lib/lib-storage/src/types.ts

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import {
66
Tag,
77
UploadPartCommandInput,
88
} from "@aws-sdk/client-s3";
9-
import type { AbortController } from "@smithy/types";
9+
import type { AbortController, AbortSignal } from "@smithy/types";
1010

1111
export interface Progress {
1212
loaded?: number;
@@ -19,6 +19,9 @@ export interface Progress {
1919
// string | Uint8Array | Buffer | Readable | ReadableStream | Blob.
2020
export type BodyDataTypes = PutObjectCommandInput["Body"];
2121

22+
export type IAbortController = AbortController | globalThis.AbortController;
23+
export type IAbortSignal = AbortSignal | globalThis.AbortSignal;
24+
2225
/**
2326
* @deprecated redundant, use {@link S3Client} directly.
2427
*/
@@ -51,8 +54,15 @@ export interface Configuration {
5154

5255
/**
5356
* Optional abort controller for controlling this upload's abort signal externally.
57+
*
58+
* @deprecated use `abortSignal` instead.
59+
*/
60+
abortController?: IAbortController;
61+
62+
/**
63+
* Optional abort signal to communicate aborting operation, must be wired up by caller.
5464
*/
55-
abortController?: AbortController;
65+
abortSignal?: IAbortSignal;
5666
}
5767

5868
export interface Options extends Partial<Configuration> {

0 commit comments

Comments
 (0)