3
3
const Readable = require('./readable')
4
4
const {
5
5
InvalidArgumentError,
6
- RequestAbortedError,
7
- ResponseStatusCodeError
6
+ RequestAbortedError
8
7
} = require('../core/errors')
9
8
const util = require('../core/util')
9
+ const { getResolveErrorBodyCallback } = require('./util')
10
10
const { AsyncResource } = require('async_hooks')
11
11
const { addSignal, removeSignal } = require('./abort-signal')
12
12
@@ -16,13 +16,17 @@ class RequestHandler extends AsyncResource {
16
16
throw new InvalidArgumentError('invalid opts')
17
17
}
18
18
19
- const { signal, method, opaque, body, onInfo, responseHeaders, throwOnError } = opts
19
+ const { signal, method, opaque, body, onInfo, responseHeaders, throwOnError, highWaterMark } = opts
20
20
21
21
try {
22
22
if (typeof callback !== 'function') {
23
23
throw new InvalidArgumentError('invalid callback')
24
24
}
25
25
26
+ if (highWaterMark && (typeof highWaterMark !== 'number' || highWaterMark < 0)) {
27
+ throw new InvalidArgumentError('invalid highWaterMark')
28
+ }
29
+
26
30
if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') {
27
31
throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget')
28
32
}
@@ -53,6 +57,7 @@ class RequestHandler extends AsyncResource {
53
57
this.context = null
54
58
this.onInfo = onInfo || null
55
59
this.throwOnError = throwOnError
60
+ this.highWaterMark = highWaterMark
56
61
57
62
if (util.isStream(body)) {
58
63
body.on('error', (err) => {
@@ -73,40 +78,39 @@ class RequestHandler extends AsyncResource {
73
78
}
74
79
75
80
onHeaders (statusCode, rawHeaders, resume, statusMessage) {
76
- const { callback, opaque, abort, context } = this
81
+ const { callback, opaque, abort, context, responseHeaders, highWaterMark } = this
82
+
83
+ const headers = responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)
77
84
78
85
if (statusCode < 200) {
79
86
if (this.onInfo) {
80
- const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)
81
87
this.onInfo({ statusCode, headers })
82
88
}
83
89
return
84
90
}
85
91
86
- const parsedHeaders = util.parseHeaders(rawHeaders)
92
+ const parsedHeaders = responseHeaders === 'raw' ? util.parseHeaders(rawHeaders) : headers
87
93
const contentType = parsedHeaders['content-type']
88
- const body = new Readable(resume, abort, contentType)
94
+ const body = new Readable({ resume, abort, contentType, highWaterMark } )
89
95
90
96
this.callback = null
91
97
this.res = body
92
- const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)
93
98
94
99
if (callback !== null) {
95
100
if (this.throwOnError && statusCode >= 400) {
96
101
this.runInAsyncScope(getResolveErrorBodyCallback, null,
97
102
{ callback, body, contentType, statusCode, statusMessage, headers }
98
103
)
99
- return
104
+ } else {
105
+ this.runInAsyncScope(callback, null, null, {
106
+ statusCode,
107
+ headers,
108
+ trailers: this.trailers,
109
+ opaque,
110
+ body,
111
+ context
112
+ })
100
113
}
101
-
102
- this.runInAsyncScope(callback, null, null, {
103
- statusCode,
104
- headers,
105
- trailers: this.trailers,
106
- opaque,
107
- body,
108
- context
109
- })
110
114
}
111
115
}
112
116
@@ -153,33 +157,6 @@ class RequestHandler extends AsyncResource {
153
157
}
154
158
}
155
159
156
- async function getResolveErrorBodyCallback ({ callback, body, contentType, statusCode, statusMessage, headers }) {
157
- if (statusCode === 204 || !contentType) {
158
- body.dump()
159
- process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers))
160
- return
161
- }
162
-
163
- try {
164
- if (contentType.startsWith('application/json')) {
165
- const payload = await body.json()
166
- process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers, payload))
167
- return
168
- }
169
-
170
- if (contentType.startsWith('text/')) {
171
- const payload = await body.text()
172
- process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers, payload))
173
- return
174
- }
175
- } catch (err) {
176
- // Process in a fallback if error
177
- }
178
-
179
- body.dump()
180
- process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers))
181
- }
182
-
183
160
function request (opts, callback) {
184
161
if (callback === undefined) {
185
162
return new Promise((resolve, reject) => {
0 commit comments