Skip to content

Commit 016fada

Browse files
committed
enhance: add support for streaming lists
closes: #5081 Signed-off-by: Steve Hawkins <[email protected]>
1 parent d133717 commit 016fada

File tree

10 files changed

+351
-24
lines changed

10 files changed

+351
-24
lines changed

kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/Config.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1057,5 +1057,9 @@ public boolean isOnlyHttpWatches() {
10571057
public void setOnlyHttpWatches(boolean onlyHttpWatches) {
10581058
this.onlyHttpWatches = onlyHttpWatches;
10591059
}
1060+
1061+
public boolean isWatchList() {
1062+
return Optional.ofNullable(watchList).orElse(false);
1063+
}
10601064

10611065
}

kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/SundrioConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ public class SundrioConfig {
113113
protected TlsVersion[] tlsVersions;
114114

115115
protected Boolean onlyHttpWatches;
116+
117+
protected Boolean watchList;
116118

117119
/**
118120
* custom headers

kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/Watchable.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,19 @@
1515
*/
1616
package io.fabric8.kubernetes.client.dsl;
1717

18+
import io.fabric8.kubernetes.api.model.HasMetadata;
1819
import io.fabric8.kubernetes.api.model.ListOptions;
20+
import io.fabric8.kubernetes.api.model.ListOptionsBuilder;
21+
import io.fabric8.kubernetes.api.model.ObjectMeta;
22+
import io.fabric8.kubernetes.api.model.Status;
23+
import io.fabric8.kubernetes.client.KubernetesClientException;
1924
import io.fabric8.kubernetes.client.Watch;
2025
import io.fabric8.kubernetes.client.Watcher;
26+
import io.fabric8.kubernetes.client.WatcherException;
27+
28+
import java.util.Optional;
29+
import java.util.concurrent.CompletableFuture;
30+
import java.util.function.Consumer;
2131

2232
public interface Watchable<T> {
2333

@@ -55,4 +65,59 @@ public interface Watchable<T> {
5565
@Deprecated
5666
Watch watch(String resourceVersion, Watcher<T> watcher);
5767

68+
/**
69+
* Helper method to use the WatchList feature to list resources.
70+
* A watch is used under the covers, but will be terminated after the initial events.
71+
* <br>
72+
* Not specifying a resourceVersion on the context or using 0 will perform a "consistent read"
73+
* from the time at which the request started processing.
74+
*
75+
* @param onItem a consumer to be called for each item
76+
* @return a CompletableFuture that provides the terminal resourceVersion, or any underlying exception during processing. It
77+
* may be
78+
* cancelled to terminate the streamingList operation early
79+
*/
80+
default CompletableFuture<String> streamingList(Consumer<T> onItem) {
81+
CompletableFuture<String> future = new CompletableFuture<>();
82+
Watch watch = this.watch(new ListOptionsBuilder().withSendInitialEvents(true)
83+
.withResourceVersionMatch("NotOlderThan")
84+
.withAllowWatchBookmarks(true)
85+
.build(), new Watcher<T>() {
86+
87+
@Override
88+
public void eventReceived(Action action, T resource) {
89+
switch (action) {
90+
case ADDED:
91+
onItem.accept(resource);
92+
break;
93+
case BOOKMARK:
94+
if (resource instanceof HasMetadata) {
95+
future.complete(Optional.ofNullable(((HasMetadata) resource).getMetadata())
96+
.map(ObjectMeta::getResourceVersion).orElse(null));
97+
} else {
98+
future.complete(null);
99+
}
100+
break;
101+
default:
102+
if (action == Action.ERROR && resource instanceof Status) {
103+
Status status = (Status) resource;
104+
future.completeExceptionally(new KubernetesClientException(status));
105+
} else {
106+
future.completeExceptionally(
107+
new KubernetesClientException("Unexpected event before list ending bookmark: " + action));
108+
}
109+
break;
110+
}
111+
}
112+
113+
@Override
114+
public void onClose(WatcherException cause) {
115+
future.completeExceptionally(cause);
116+
}
117+
118+
});
119+
future.whenComplete((v, t) -> watch.close());
120+
return future;
121+
}
122+
58123
}

kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ public DefaultSharedIndexInformer(Class<T> apiTypeClass, ListerWatcher<T, L> lis
8989

9090
processorStore = new ProcessorStore<>(this.indexer, this.processor);
9191
this.reflector = new Reflector<>(listerWatcher, processorStore, informerExecutor);
92+
this.reflector.setWatchList(listerWatcher.getConfig().isWatchList());
9293
}
9394

9495
/**

kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/ListerWatcher.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import io.fabric8.kubernetes.api.model.HasMetadata;
1919
import io.fabric8.kubernetes.api.model.ListOptions;
20+
import io.fabric8.kubernetes.client.Config;
2021
import io.fabric8.kubernetes.client.Watcher;
2122
import io.fabric8.kubernetes.client.dsl.internal.AbstractWatchManager;
2223

@@ -39,4 +40,6 @@ public interface ListerWatcher<T extends HasMetadata, L> {
3940
int getWatchReconnectInterval();
4041

4142
String getApiEndpointPath();
43+
44+
Config getConfig();
4245
}

kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java

Lines changed: 75 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,14 @@ public class Reflector<T extends HasMetadata, L extends KubernetesResourceList<T
6464

6565
private boolean cachedListing = true;
6666

67+
private static class WatchListState {
68+
Set<String> nextKeys = new ConcurrentSkipListSet<>();
69+
private CompletableFuture<Void> listDone = new CompletableFuture<>();
70+
}
71+
72+
private boolean watchList;
73+
private volatile WatchListState watchListState;
74+
6775
public Reflector(ListerWatcher<T, L> listerWatcher, ProcessorStore<T> store) {
6876
this(listerWatcher, store, Runnable::run);
6977
}
@@ -122,22 +130,45 @@ public CompletableFuture<Void> listSyncAndWatch() {
122130
if (isStopped()) {
123131
return CompletableFuture.completedFuture(null);
124132
}
125-
Set<String> nextKeys = new ConcurrentSkipListSet<>();
126-
CompletableFuture<Void> theFuture = processList(nextKeys, null).thenCompose(result -> {
127-
final String latestResourceVersion = result.getMetadata().getResourceVersion();
128-
log.debug("Listing items ({}) for {} at v{}", nextKeys.size(), this, latestResourceVersion);
129-
CompletableFuture<?> cf = new CompletableFuture<>();
130-
store.retainAll(nextKeys, executor -> {
131-
boolean startWatchImmediately = cachedListing && lastSyncResourceVersion == null;
132-
lastSyncResourceVersion = latestResourceVersion;
133-
if (startWatchImmediately) {
134-
cf.complete(null);
135-
} else {
136-
executor.execute(() -> cf.complete(null));
137-
}
133+
134+
CompletableFuture<Void> theFuture = null;
135+
if (watchList) {
136+
watchListState = new WatchListState();
137+
CompletableFuture<Void> cf = watchListState.listDone;
138+
theFuture = establishWatch(startWatcher(lastSyncResourceVersion)).thenCompose(ignored -> cf);
139+
} else {
140+
Set<String> nextKeys = new ConcurrentSkipListSet<>();
141+
CompletableFuture<? extends Watch> startWatcher = processList(nextKeys, null).thenCompose(result -> {
142+
final String latestResourceVersion = result.getMetadata().getResourceVersion();
143+
log.debug("Listing items ({}) for {} at v{}", nextKeys.size(), this, latestResourceVersion);
144+
CompletableFuture<?> cf = new CompletableFuture<>();
145+
store.retainAll(nextKeys, executor -> {
146+
boolean startWatchImmediately = cachedListing && lastSyncResourceVersion == null;
147+
lastSyncResourceVersion = latestResourceVersion;
148+
if (startWatchImmediately) {
149+
cf.complete(null);
150+
} else {
151+
executor.execute(() -> cf.complete(null));
152+
}
153+
});
154+
return cf.thenCompose(ignored -> startWatcher(latestResourceVersion));
138155
});
139-
return cf.thenCompose(ignored -> startWatcher(latestResourceVersion));
140-
}).thenAccept(w -> {
156+
theFuture = establishWatch(startWatcher);
157+
}
158+
159+
theFuture.whenComplete((v, t) -> {
160+
if (t != null) {
161+
onException("listSyncAndWatch", t);
162+
} else {
163+
startFuture.complete(null);
164+
retryIntervalCalculator.resetReconnectAttempts();
165+
}
166+
});
167+
return theFuture;
168+
}
169+
170+
private CompletableFuture<Void> establishWatch(CompletableFuture<? extends Watch> future) {
171+
return future.thenAccept(w -> {
141172
if (w != null) {
142173
if (!isStopped()) {
143174
if (log.isDebugEnabled()) {
@@ -149,15 +180,6 @@ public CompletableFuture<Void> listSyncAndWatch() {
149180
}
150181
}
151182
});
152-
theFuture.whenComplete((v, t) -> {
153-
if (t != null) {
154-
onException("listSyncAndWatch", t);
155-
} else {
156-
startFuture.complete(null);
157-
retryIntervalCalculator.resetReconnectAttempts();
158-
}
159-
});
160-
return theFuture;
161183
}
162184

163185
private void onException(String operation, Throwable t) {
@@ -225,6 +247,9 @@ private synchronized CompletableFuture<? extends Watch> startWatcher(final Strin
225247
// so instead we'll terminate below and set a fail-safe here
226248
// .withTimeoutSeconds((long) ((Math.random() + 1) * minTimeout))
227249
.withTimeoutSeconds(minTimeout * 2)
250+
.withAllowWatchBookmarks(true) // should always allow bookmarks to process the lastResourceVersion here
251+
.withSendInitialEvents(watchListState != null ? true : null)
252+
.withResourceVersionMatch(watchListState != null ? "NotOlderThan" : null)
228253
.build(),
229254
watcher);
230255

@@ -286,6 +311,28 @@ public void eventReceived(Action action, T resource) {
286311
resource.getKind(),
287312
resource.getMetadata().getResourceVersion(), Reflector.this);
288313
}
314+
315+
if (watchListState != null) {
316+
switch (action) {
317+
case ADDED:
318+
String key = store.getKey(resource);
319+
watchListState.nextKeys.add(key);
320+
break;
321+
case BOOKMARK:
322+
// done with the initial events, trigger that we are ready and switch to regular
323+
// watching
324+
log.debug("Listing items ({}) for {} at v{}", watchListState.nextKeys.size(), this,
325+
resource.getMetadata().getResourceVersion());
326+
store.retainAll(watchListState.nextKeys, ignored -> watchListState.listDone.complete(null));
327+
watchListState = null;
328+
break;
329+
case MODIFIED:
330+
case DELETED:
331+
onClose(new WatcherException("Unexpected event before list ending bookmark: " + action));
332+
return;
333+
}
334+
}
335+
289336
switch (action) {
290337
case ERROR:
291338
throw new KubernetesClientException("ERROR event");
@@ -351,4 +398,8 @@ public void usingInitialState() {
351398
this.cachedListing = false;
352399
}
353400

401+
public void setWatchList(boolean watchList) {
402+
this.watchList = watchList;
403+
}
404+
354405
}

kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/cache/ReflectorTest.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.fabric8.kubernetes.api.model.PodList;
2020
import io.fabric8.kubernetes.api.model.PodListBuilder;
2121
import io.fabric8.kubernetes.client.KubernetesClientException;
22+
import io.fabric8.kubernetes.client.Watcher.Action;
2223
import io.fabric8.kubernetes.client.WatcherException;
2324
import io.fabric8.kubernetes.client.dsl.internal.AbstractWatchManager;
2425
import io.fabric8.kubernetes.client.informers.impl.ListerWatcher;
@@ -169,6 +170,45 @@ void testNonHttpGone() {
169170
assertTrue(reflector.isStopped());
170171
}
171172

173+
@Test
174+
void testWatchListException() {
175+
ListerWatcher<Pod, PodList> mock = Mockito.mock(ListerWatcher.class);
176+
177+
Reflector<Pod, PodList> reflector = new Reflector<>(mock, mockStore);
178+
reflector.setWatchList(true);
179+
180+
Mockito.when(mock.submitWatch(Mockito.any(), Mockito.any()))
181+
.thenReturn(CompletableFuture.completedFuture(Mockito.mock(AbstractWatchManager.class)));
182+
183+
reflector.start();
184+
185+
assertTrue(reflector.isWatching());
186+
assertFalse(reflector.isStopped());
187+
188+
reflector.getWatcher().onClose(new WatcherException(null));
189+
190+
assertFalse(reflector.isWatching());
191+
assertTrue(reflector.isStopped());
192+
}
193+
194+
@Test
195+
void testWatchListEventException() {
196+
ListerWatcher<Pod, PodList> mock = Mockito.mock(ListerWatcher.class);
197+
198+
Reflector<Pod, PodList> reflector = new Reflector<>(mock, mockStore);
199+
reflector.setWatchList(true);
200+
201+
Mockito.when(mock.submitWatch(Mockito.any(), Mockito.any()))
202+
.thenReturn(CompletableFuture.completedFuture(Mockito.mock(AbstractWatchManager.class)));
203+
204+
reflector.start();
205+
206+
reflector.getWatcher().eventReceived(Action.DELETED, new Pod());
207+
208+
assertFalse(reflector.isWatching());
209+
assertTrue(reflector.isStopped());
210+
}
211+
172212
@Test
173213
void testTimeout() {
174214
ListerWatcher<Pod, PodList> mock = Mockito.mock(ListerWatcher.class);

kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/InformTest.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838

3939
import java.net.HttpURLConnection;
4040
import java.util.Arrays;
41+
import java.util.concurrent.CompletableFuture;
4142
import java.util.concurrent.CountDownLatch;
4243
import java.util.concurrent.TimeUnit;
4344
import java.util.stream.Stream;
@@ -109,6 +110,42 @@ public void onUpdate(Pod oldObj, Pod newObj) {
109110
informer.stop();
110111
}
111112

113+
@Test
114+
void testStreamingList() throws Exception {
115+
// Given
116+
Pod pod1 = new PodBuilder().withNewMetadata()
117+
.withNamespace("test")
118+
.withName("pod1")
119+
.withResourceVersion("1")
120+
.endMetadata()
121+
.build();
122+
123+
Pod podBookmark = new PodBuilder().withNewMetadata()
124+
.withResourceVersion("2")
125+
.endMetadata()
126+
.build();
127+
128+
server.expect()
129+
.withPath(
130+
"/api/v1/namespaces/test/pods?allowWatchBookmarks=true&labelSelector=my-label&resourceVersion=3&resourceVersionMatch=NotOlderThan&sendInitialEvents=true&watch=true")
131+
.andUpgradeToWebSocket()
132+
.open()
133+
.waitFor(EVENT_WAIT_PERIOD_MS)
134+
.andEmit(new WatchEvent(pod1, "ADDED"))
135+
.waitFor(EVENT_WAIT_PERIOD_MS)
136+
.andEmit(new WatchEvent(podBookmark, "BOOKMARK"))
137+
.done()
138+
.once();
139+
final CountDownLatch addLatch = new CountDownLatch(1);
140+
141+
// When
142+
CompletableFuture<String> future = client.pods().withLabel("my-label").withResourceVersion("3")
143+
.streamingList(pod -> addLatch.countDown());
144+
145+
assertTrue(addLatch.await(10, TimeUnit.SECONDS));
146+
assertEquals("2", future.get(10, TimeUnit.SECONDS));
147+
}
148+
112149
@Test
113150
void testInformGeneric() throws InterruptedException {
114151
// Given
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright (C) 2015 Red Hat, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.fabric8.kubernetes.client.mock;
18+
19+
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
20+
21+
import java.util.function.Consumer;
22+
23+
public class WatchListCustomizer implements Consumer<KubernetesClientBuilder> {
24+
25+
@Override
26+
public void accept(KubernetesClientBuilder t) {
27+
t.editOrNewConfig().withWatchList(true).endConfig();
28+
}
29+
30+
}

0 commit comments

Comments
 (0)