Skip to content

Commit ebce252

Browse files
committed
Avoid thread pinning in SseEmitter, ResponseBodyEmitter
Closes gh-35422 Signed-off-by: Taeik Lim <[email protected]>
1 parent e5b58ef commit ebce252

File tree

2 files changed

+109
-55
lines changed

2 files changed

+109
-55
lines changed

spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.java

Lines changed: 108 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import java.util.LinkedHashSet;
2222
import java.util.List;
2323
import java.util.Set;
24+
import java.util.concurrent.locks.Lock;
25+
import java.util.concurrent.locks.ReentrantLock;
2426
import java.util.function.Consumer;
2527

2628
import org.jspecify.annotations.Nullable;
@@ -63,6 +65,7 @@
6365
* @author Rossen Stoyanchev
6466
* @author Juergen Hoeller
6567
* @author Brian Clozel
68+
* @author Taeik Lim
6669
* @since 4.2
6770
*/
6871
public class ResponseBodyEmitter {
@@ -86,6 +89,8 @@ public class ResponseBodyEmitter {
8689

8790
private final DefaultCallback completionCallback = new DefaultCallback();
8891

92+
/** Guards access to write operations on the response. */
93+
protected final Lock writeLock = new ReentrantLock();
8994

9095
/**
9196
* Create a new ResponseBodyEmitter instance.
@@ -114,36 +119,48 @@ public ResponseBodyEmitter(Long timeout) {
114119
}
115120

116121

117-
synchronized void initialize(Handler handler) throws IOException {
118-
this.handler = handler;
119-
122+
void initialize(Handler handler) throws IOException {
123+
this.writeLock.lock();
120124
try {
121-
sendInternal(this.earlySendAttempts);
122-
}
123-
finally {
124-
this.earlySendAttempts.clear();
125-
}
125+
this.handler = handler;
126+
127+
try {
128+
sendInternal(this.earlySendAttempts);
129+
}
130+
finally {
131+
this.earlySendAttempts.clear();
132+
}
126133

127-
if (this.complete) {
128-
if (this.failure != null) {
129-
this.handler.completeWithError(this.failure);
134+
if (this.complete) {
135+
if (this.failure != null) {
136+
this.handler.completeWithError(this.failure);
137+
}
138+
else {
139+
this.handler.complete();
140+
}
130141
}
131142
else {
132-
this.handler.complete();
143+
this.handler.onTimeout(this.timeoutCallback);
144+
this.handler.onError(this.errorCallback);
145+
this.handler.onCompletion(this.completionCallback);
133146
}
134147
}
135-
else {
136-
this.handler.onTimeout(this.timeoutCallback);
137-
this.handler.onError(this.errorCallback);
138-
this.handler.onCompletion(this.completionCallback);
148+
finally {
149+
this.writeLock.unlock();
139150
}
140151
}
141152

142-
synchronized void initializeWithError(Throwable ex) {
143-
this.complete = true;
144-
this.failure = ex;
145-
this.earlySendAttempts.clear();
146-
this.errorCallback.accept(ex);
153+
void initializeWithError(Throwable ex) {
154+
this.writeLock.lock();
155+
try {
156+
this.complete = true;
157+
this.failure = ex;
158+
this.earlySendAttempts.clear();
159+
this.errorCallback.accept(ex);
160+
}
161+
finally {
162+
this.writeLock.unlock();
163+
}
147164
}
148165

149166
/**
@@ -180,22 +197,28 @@ public void send(Object object) throws IOException {
180197
* @throws IOException raised when an I/O error occurs
181198
* @throws java.lang.IllegalStateException wraps any other errors
182199
*/
183-
public synchronized void send(Object object, @Nullable MediaType mediaType) throws IOException {
200+
public void send(Object object, @Nullable MediaType mediaType) throws IOException {
184201
Assert.state(!this.complete, () -> "ResponseBodyEmitter has already completed" +
185202
(this.failure != null ? " with error: " + this.failure : ""));
186-
if (this.handler != null) {
187-
try {
188-
this.handler.send(object, mediaType);
189-
}
190-
catch (IOException ex) {
191-
throw ex;
203+
this.writeLock.lock();
204+
try {
205+
if (this.handler != null) {
206+
try {
207+
this.handler.send(object, mediaType);
208+
}
209+
catch (IOException ex) {
210+
throw ex;
211+
}
212+
catch (Throwable ex) {
213+
throw new IllegalStateException("Failed to send " + object, ex);
214+
}
192215
}
193-
catch (Throwable ex) {
194-
throw new IllegalStateException("Failed to send " + object, ex);
216+
else {
217+
this.earlySendAttempts.add(new DataWithMediaType(object, mediaType));
195218
}
196219
}
197-
else {
198-
this.earlySendAttempts.add(new DataWithMediaType(object, mediaType));
220+
finally {
221+
this.writeLock.unlock();
199222
}
200223
}
201224

@@ -208,10 +231,16 @@ public synchronized void send(Object object, @Nullable MediaType mediaType) thro
208231
* @throws java.lang.IllegalStateException wraps any other errors
209232
* @since 6.0.12
210233
*/
211-
public synchronized void send(Set<DataWithMediaType> items) throws IOException {
234+
public void send(Set<DataWithMediaType> items) throws IOException {
212235
Assert.state(!this.complete, () -> "ResponseBodyEmitter has already completed" +
213236
(this.failure != null ? " with error: " + this.failure : ""));
214-
sendInternal(items);
237+
this.writeLock.lock();
238+
try {
239+
sendInternal(items);
240+
}
241+
finally {
242+
this.writeLock.unlock();
243+
}
215244
}
216245

217246
private void sendInternal(Set<DataWithMediaType> items) throws IOException {
@@ -242,10 +271,16 @@ private void sendInternal(Set<DataWithMediaType> items) throws IOException {
242271
* to complete request processing. It should not be used after container
243272
* related events such as an error while {@link #send(Object) sending}.
244273
*/
245-
public synchronized void complete() {
246-
this.complete = true;
247-
if (this.handler != null) {
248-
this.handler.complete();
274+
public void complete() {
275+
this.writeLock.lock();
276+
try {
277+
this.complete = true;
278+
if (this.handler != null) {
279+
this.handler.complete();
280+
}
281+
}
282+
finally {
283+
this.writeLock.unlock();
249284
}
250285
}
251286

@@ -260,11 +295,17 @@ public synchronized void complete() {
260295
* container related events such as an error while
261296
* {@link #send(Object) sending}.
262297
*/
263-
public synchronized void completeWithError(Throwable ex) {
264-
this.complete = true;
265-
this.failure = ex;
266-
if (this.handler != null) {
267-
this.handler.completeWithError(ex);
298+
public void completeWithError(Throwable ex) {
299+
this.writeLock.lock();
300+
try {
301+
this.complete = true;
302+
this.failure = ex;
303+
if (this.handler != null) {
304+
this.handler.completeWithError(ex);
305+
}
306+
}
307+
finally {
308+
this.writeLock.unlock();
268309
}
269310
}
270311

@@ -273,8 +314,14 @@ public synchronized void completeWithError(Throwable ex) {
273314
* called from a container thread when an async request times out.
274315
* <p>As of 6.2, one can register multiple callbacks for this event.
275316
*/
276-
public synchronized void onTimeout(Runnable callback) {
277-
this.timeoutCallback.addDelegate(callback);
317+
public void onTimeout(Runnable callback) {
318+
this.writeLock.lock();
319+
try {
320+
this.timeoutCallback.addDelegate(callback);
321+
}
322+
finally {
323+
this.writeLock.unlock();
324+
}
278325
}
279326

280327
/**
@@ -284,8 +331,14 @@ public synchronized void onTimeout(Runnable callback) {
284331
* <p>As of 6.2, one can register multiple callbacks for this event.
285332
* @since 5.0
286333
*/
287-
public synchronized void onError(Consumer<Throwable> callback) {
288-
this.errorCallback.addDelegate(callback);
334+
public void onError(Consumer<Throwable> callback) {
335+
this.writeLock.lock();
336+
try {
337+
this.errorCallback.addDelegate(callback);
338+
}
339+
finally {
340+
this.writeLock.unlock();
341+
}
289342
}
290343

291344
/**
@@ -295,8 +348,14 @@ public synchronized void onError(Consumer<Throwable> callback) {
295348
* detecting that a {@code ResponseBodyEmitter} instance is no longer usable.
296349
* <p>As of 6.2, one can register multiple callbacks for this event.
297350
*/
298-
public synchronized void onCompletion(Runnable callback) {
299-
this.completionCallback.addDelegate(callback);
351+
public void onCompletion(Runnable callback) {
352+
this.writeLock.lock();
353+
try {
354+
this.completionCallback.addDelegate(callback);
355+
}
356+
finally {
357+
this.writeLock.unlock();
358+
}
300359
}
301360

302361

spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/SseEmitter.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121
import java.util.Collections;
2222
import java.util.LinkedHashSet;
2323
import java.util.Set;
24-
import java.util.concurrent.locks.Lock;
25-
import java.util.concurrent.locks.ReentrantLock;
2624

2725
import org.jspecify.annotations.Nullable;
2826

@@ -41,16 +39,13 @@
4139
* @author Juergen Hoeller
4240
* @author Sam Brannen
4341
* @author Brian Clozel
42+
* @author Taeik Lim
4443
* @since 4.2
4544
*/
4645
public class SseEmitter extends ResponseBodyEmitter {
4746

4847
private static final MediaType TEXT_PLAIN = new MediaType("text", "plain", StandardCharsets.UTF_8);
4948

50-
/** Guards access to write operations on the response. */
51-
private final Lock writeLock = new ReentrantLock();
52-
53-
5449
/**
5550
* Create a new SseEmitter instance.
5651
*/

0 commit comments

Comments
 (0)