Skip to content

Commit 2df23d5

Browse files
shawkinsmanusa
authored andcommitted
fix #5036: addressing the handling of non-connection errors
1 parent 60b018e commit 2df23d5

File tree

17 files changed

+46
-18
lines changed

17 files changed

+46
-18
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
* Fix #5015: executing resync as a locking operation to ensure resync event ordering
1616
* Fix #5020: updating the resourceVersion on a delete with finalizers
1717
* Fix #5033: port forwarding for clients other than okhttp needs to specify the subprotocol
18+
* fix #5036: Better websocket error handling for protocol / client enforced errors, also update frame/message limits
1819
* Fix #5044: disable Vert.x instance file caching
1920
* Fix #5059: Vert.x InputStreamReader uses an empty Buffer sentinel to avoid NPE
2021

httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkWebSocketImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public CompletionStage<?> onClose(java.net.http.WebSocket webSocket, int statusC
8282

8383
@Override
8484
public void onError(java.net.http.WebSocket webSocket, Throwable error) {
85-
listener.onError(new JdkWebSocketImpl(queueSize, webSocket), error);
85+
listener.onError(new JdkWebSocketImpl(queueSize, webSocket), error, false);
8686
}
8787

8888
@Override

httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyHttpClientBuilder.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ public class JettyHttpClientBuilder
3939
extends StandardHttpClientBuilder<JettyHttpClient, JettyHttpClientFactory, JettyHttpClientBuilder> {
4040

4141
private static final int MAX_CONNECTIONS = Integer.MAX_VALUE;
42+
// the default for etcd seems to be 3 MB, but we'll default to unlimited to have the same behavior across clients
43+
private static final int MAX_WS_MESSAGE_SIZE = Integer.MAX_VALUE;
4244

4345
public JettyHttpClientBuilder(JettyHttpClientFactory clientFactory) {
4446
super(clientFactory);
@@ -62,6 +64,10 @@ public JettyHttpClient build() {
6264
}
6365
HttpClient sharedHttpClient = new HttpClient(newTransport(sslContextFactory, preferHttp11));
6466
WebSocketClient sharedWebSocketClient = new WebSocketClient(new HttpClient(newTransport(sslContextFactory, preferHttp11)));
67+
sharedWebSocketClient.setMaxBinaryMessageSize(MAX_WS_MESSAGE_SIZE);
68+
// the api-server does not seem to fragment messages, so the frames can be very large
69+
sharedWebSocketClient.setMaxFrameSize(MAX_WS_MESSAGE_SIZE);
70+
sharedWebSocketClient.setMaxTextMessageSize(MAX_WS_MESSAGE_SIZE);
6571
sharedWebSocketClient.setIdleTimeout(Duration.ZERO);
6672
if (connectTimeout != null) {
6773
sharedHttpClient.setConnectTimeout(connectTimeout.toMillis());

httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocket.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.eclipse.jetty.websocket.api.WriteCallback;
2727
import org.eclipse.jetty.websocket.api.exceptions.UpgradeException;
2828

29+
import java.io.IOException;
2930
import java.nio.ByteBuffer;
3031
import java.nio.channels.ClosedChannelException;
3132
import java.util.Collections;
@@ -147,7 +148,7 @@ public void onWebSocketError(Throwable cause) {
147148
// - Jetty throws a ClosedChannelException
148149
return;
149150
}
150-
listener.onError(this, cause);
151+
listener.onError(this, cause, cause instanceof IOException);
151152
}
152153

153154
private void backPressure() {

httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -258,8 +258,8 @@ public void onClose(WebSocket webSocket, int code, String reason) {
258258
}
259259

260260
@Override
261-
public void onError(WebSocket webSocket, Throwable error) {
262-
events.put("onError", new Object[] { error });
261+
public void onError(WebSocket webSocket, Throwable error, boolean connectionError) {
262+
events.put("onError", new Object[] { error, connectionError });
263263
}
264264
}
265265
}

httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpWebSocketImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ public void onFailure(okhttp3.WebSocket webSocket, Throwable t, Response respons
9090
future.completeExceptionally(t);
9191
}
9292
} else {
93-
listener.onError(new OkHttpWebSocketImpl(webSocket, this::request), t);
93+
listener.onError(new OkHttpWebSocketImpl(webSocket, this::request), t, true);
9494
}
9595
}
9696

httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxHttpClientBuilder.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ public class VertxHttpClientBuilder<F extends HttpClient.Factory>
3636
extends StandardHttpClientBuilder<VertxHttpClient<F>, F, VertxHttpClientBuilder<F>> {
3737

3838
private static final int MAX_CONNECTIONS = 8192;
39+
// the default for etcd seems to be 3 MB, but we'll default to unlimited to have the same behavior across clients
40+
private static final int MAX_WS_MESSAGE_SIZE = Integer.MAX_VALUE;
3941

4042
final Vertx vertx;
4143

@@ -51,6 +53,9 @@ public VertxHttpClient<F> build() {
5153
options.setMaxPoolSize(MAX_CONNECTIONS);
5254
options.setMaxWebSockets(MAX_CONNECTIONS);
5355
options.setIdleTimeoutUnit(TimeUnit.SECONDS);
56+
// the api-server does not seem to fragment messages, so the frames can be very large
57+
options.setMaxWebSocketFrameSize(MAX_WS_MESSAGE_SIZE);
58+
options.setMaxWebSocketMessageSize(MAX_WS_MESSAGE_SIZE);
5459

5560
if (this.connectTimeout != null) {
5661
options.setConnectTimeout((int) this.connectTimeout.toMillis());

httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxWebSocket.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.netty.buffer.Unpooled;
2121
import io.vertx.core.Future;
2222
import io.vertx.core.buffer.Buffer;
23+
import io.vertx.core.http.HttpClosedException;
2324

2425
import java.nio.ByteBuffer;
2526
import java.util.concurrent.atomic.AtomicInteger;
@@ -52,7 +53,16 @@ void init() {
5253
ws.pongHandler(b -> ws.fetch(1));
5354
// use end, not close, because close is processed immediately vs. end is in frame order
5455
ws.endHandler(v -> listener.onClose(this, ws.closeStatusCode(), ws.closeReason()));
55-
ws.exceptionHandler(err -> listener.onError(this, err));
56+
ws.exceptionHandler(err -> {
57+
try {
58+
listener.onError(this, err, err instanceof HttpClosedException);
59+
} finally {
60+
// onError should be terminal
61+
if (!ws.isClosed()) {
62+
ws.close();
63+
}
64+
}
65+
});
5666
listener.onOpen(this);
5767
}
5868

kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/WebSocket.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public interface WebSocket {
2424

2525
/**
2626
* Callback methods for websocket events. The methods are
27-
* guaranteed to be called serially - except for {@link #onError(WebSocket, Throwable)}
27+
* guaranteed to be called serially - except for {@link Listener#onError(WebSocket, Throwable, boolean)}
2828
*/
2929
interface Listener {
3030

@@ -59,7 +59,7 @@ default void onClose(WebSocket webSocket, int code, String reason) {
5959
* Called when an error has occurred. It's a terminal event, calls to {@link WebSocket#request()}
6060
* do nothing after this.
6161
*/
62-
default void onError(WebSocket webSocket, Throwable error) {
62+
default void onError(WebSocket webSocket, Throwable error, boolean connectionError) {
6363
}
6464

6565
}

kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ public void onOpen(WebSocket webSocket) {
261261
}
262262

263263
@Override
264-
public void onError(WebSocket webSocket, Throwable t) {
264+
public void onError(WebSocket webSocket, Throwable t, boolean connectionError) {
265265
closed.set(true);
266266
HttpResponse<?> response = null;
267267

0 commit comments

Comments
 (0)