Skip to content

Commit f9a40be

Browse files
committed
Avoid thread pinning in SseEmitter, ResponseBodyEmitter
Closes gh-35422 Signed-off-by: Taeik Lim <[email protected]>
1 parent e5b58ef commit f9a40be

File tree

2 files changed

+103
-55
lines changed

2 files changed

+103
-55
lines changed

spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.java

Lines changed: 102 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@
2121
import java.util.LinkedHashSet;
2222
import java.util.List;
2323
import java.util.Set;
24+
import java.util.concurrent.locks.Lock;
25+
import java.util.concurrent.locks.ReentrantLock;
2426
import java.util.function.Consumer;
27+
import java.util.logging.Handler;
2528

2629
import org.jspecify.annotations.Nullable;
2730

@@ -63,6 +66,7 @@
6366
* @author Rossen Stoyanchev
6467
* @author Juergen Hoeller
6568
* @author Brian Clozel
69+
* @author Taeik Lim
6670
* @since 4.2
6771
*/
6872
public class ResponseBodyEmitter {
@@ -86,6 +90,8 @@ public class ResponseBodyEmitter {
8690

8791
private final DefaultCallback completionCallback = new DefaultCallback();
8892

93+
/** Guards access to write operations on the response. */
94+
protected final Lock writeLock = new ReentrantLock();
8995

9096
/**
9197
* Create a new ResponseBodyEmitter instance.
@@ -114,36 +120,46 @@ public ResponseBodyEmitter(Long timeout) {
114120
}
115121

116122

117-
synchronized void initialize(Handler handler) throws IOException {
118-
this.handler = handler;
119-
123+
void initialize(Handler handler) throws IOException {
124+
this.writeLock.lock();
120125
try {
121-
sendInternal(this.earlySendAttempts);
122-
}
123-
finally {
124-
this.earlySendAttempts.clear();
125-
}
126+
this.handler = handler;
127+
128+
try {
129+
sendInternal(this.earlySendAttempts);
130+
}
131+
finally {
132+
this.earlySendAttempts.clear();
133+
}
126134

127-
if (this.complete) {
128-
if (this.failure != null) {
129-
this.handler.completeWithError(this.failure);
135+
if (this.complete) {
136+
if (this.failure != null) {
137+
this.handler.completeWithError(this.failure);
138+
}
139+
else {
140+
this.handler.complete();
141+
}
130142
}
131143
else {
132-
this.handler.complete();
144+
this.handler.onTimeout(this.timeoutCallback);
145+
this.handler.onError(this.errorCallback);
146+
this.handler.onCompletion(this.completionCallback);
133147
}
134-
}
135-
else {
136-
this.handler.onTimeout(this.timeoutCallback);
137-
this.handler.onError(this.errorCallback);
138-
this.handler.onCompletion(this.completionCallback);
148+
} finally {
149+
this.writeLock.unlock();
139150
}
140151
}
141152

142-
synchronized void initializeWithError(Throwable ex) {
143-
this.complete = true;
144-
this.failure = ex;
145-
this.earlySendAttempts.clear();
146-
this.errorCallback.accept(ex);
153+
void initializeWithError(Throwable ex) {
154+
this.writeLock.lock();
155+
try {
156+
this.complete = true;
157+
this.failure = ex;
158+
this.earlySendAttempts.clear();
159+
this.errorCallback.accept(ex);
160+
} finally {
161+
this.writeLock.unlock();
162+
}
147163
}
148164

149165
/**
@@ -180,22 +196,27 @@ public void send(Object object) throws IOException {
180196
* @throws IOException raised when an I/O error occurs
181197
* @throws java.lang.IllegalStateException wraps any other errors
182198
*/
183-
public synchronized void send(Object object, @Nullable MediaType mediaType) throws IOException {
199+
public void send(Object object, @Nullable MediaType mediaType) throws IOException {
184200
Assert.state(!this.complete, () -> "ResponseBodyEmitter has already completed" +
185201
(this.failure != null ? " with error: " + this.failure : ""));
186-
if (this.handler != null) {
187-
try {
188-
this.handler.send(object, mediaType);
189-
}
190-
catch (IOException ex) {
191-
throw ex;
202+
this.writeLock.lock();
203+
try {
204+
if (this.handler != null) {
205+
try {
206+
this.handler.send(object, mediaType);
207+
}
208+
catch (IOException ex) {
209+
throw ex;
210+
}
211+
catch (Throwable ex) {
212+
throw new IllegalStateException("Failed to send " + object, ex);
213+
}
192214
}
193-
catch (Throwable ex) {
194-
throw new IllegalStateException("Failed to send " + object, ex);
215+
else {
216+
this.earlySendAttempts.add(new DataWithMediaType(object, mediaType));
195217
}
196-
}
197-
else {
198-
this.earlySendAttempts.add(new DataWithMediaType(object, mediaType));
218+
} finally {
219+
this.writeLock.unlock();
199220
}
200221
}
201222

@@ -208,10 +229,15 @@ public synchronized void send(Object object, @Nullable MediaType mediaType) thro
208229
* @throws java.lang.IllegalStateException wraps any other errors
209230
* @since 6.0.12
210231
*/
211-
public synchronized void send(Set<DataWithMediaType> items) throws IOException {
232+
public void send(Set<DataWithMediaType> items) throws IOException {
212233
Assert.state(!this.complete, () -> "ResponseBodyEmitter has already completed" +
213234
(this.failure != null ? " with error: " + this.failure : ""));
214-
sendInternal(items);
235+
this.writeLock.lock();
236+
try {
237+
sendInternal(items);
238+
} finally {
239+
this.writeLock.unlock();
240+
}
215241
}
216242

217243
private void sendInternal(Set<DataWithMediaType> items) throws IOException {
@@ -242,10 +268,15 @@ private void sendInternal(Set<DataWithMediaType> items) throws IOException {
242268
* to complete request processing. It should not be used after container
243269
* related events such as an error while {@link #send(Object) sending}.
244270
*/
245-
public synchronized void complete() {
246-
this.complete = true;
247-
if (this.handler != null) {
248-
this.handler.complete();
271+
public void complete() {
272+
this.writeLock.lock();
273+
try {
274+
this.complete = true;
275+
if (this.handler != null) {
276+
this.handler.complete();
277+
}
278+
} finally {
279+
this.writeLock.unlock();
249280
}
250281
}
251282

@@ -260,11 +291,16 @@ public synchronized void complete() {
260291
* container related events such as an error while
261292
* {@link #send(Object) sending}.
262293
*/
263-
public synchronized void completeWithError(Throwable ex) {
264-
this.complete = true;
265-
this.failure = ex;
266-
if (this.handler != null) {
267-
this.handler.completeWithError(ex);
294+
public void completeWithError(Throwable ex) {
295+
this.writeLock.lock();
296+
try {
297+
this.complete = true;
298+
this.failure = ex;
299+
if (this.handler != null) {
300+
this.handler.completeWithError(ex);
301+
}
302+
} finally {
303+
this.writeLock.unlock();
268304
}
269305
}
270306

@@ -273,8 +309,13 @@ public synchronized void completeWithError(Throwable ex) {
273309
* called from a container thread when an async request times out.
274310
* <p>As of 6.2, one can register multiple callbacks for this event.
275311
*/
276-
public synchronized void onTimeout(Runnable callback) {
277-
this.timeoutCallback.addDelegate(callback);
312+
public void onTimeout(Runnable callback) {
313+
this.writeLock.lock();
314+
try {
315+
this.timeoutCallback.addDelegate(callback);
316+
} finally {
317+
this.writeLock.unlock();
318+
}
278319
}
279320

280321
/**
@@ -284,8 +325,13 @@ public synchronized void onTimeout(Runnable callback) {
284325
* <p>As of 6.2, one can register multiple callbacks for this event.
285326
* @since 5.0
286327
*/
287-
public synchronized void onError(Consumer<Throwable> callback) {
288-
this.errorCallback.addDelegate(callback);
328+
public void onError(Consumer<Throwable> callback) {
329+
this.writeLock.lock();
330+
try {
331+
this.errorCallback.addDelegate(callback);
332+
} finally {
333+
this.writeLock.unlock();
334+
}
289335
}
290336

291337
/**
@@ -295,8 +341,13 @@ public synchronized void onError(Consumer<Throwable> callback) {
295341
* detecting that a {@code ResponseBodyEmitter} instance is no longer usable.
296342
* <p>As of 6.2, one can register multiple callbacks for this event.
297343
*/
298-
public synchronized void onCompletion(Runnable callback) {
299-
this.completionCallback.addDelegate(callback);
344+
public void onCompletion(Runnable callback) {
345+
this.writeLock.lock();
346+
try {
347+
this.completionCallback.addDelegate(callback);
348+
} finally {
349+
this.writeLock.unlock();
350+
}
300351
}
301352

302353

spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/SseEmitter.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,16 +41,13 @@
4141
* @author Juergen Hoeller
4242
* @author Sam Brannen
4343
* @author Brian Clozel
44+
* @author Taeik Lim
4445
* @since 4.2
4546
*/
4647
public class SseEmitter extends ResponseBodyEmitter {
4748

4849
private static final MediaType TEXT_PLAIN = new MediaType("text", "plain", StandardCharsets.UTF_8);
4950

50-
/** Guards access to write operations on the response. */
51-
private final Lock writeLock = new ReentrantLock();
52-
53-
5451
/**
5552
* Create a new SseEmitter instance.
5653
*/

0 commit comments

Comments
 (0)