Skip to content

Commit 929e947

Browse files
authored
Merge pull request #20541 from apullo777/watcher-respch
cache: change watcher.eventQueue to respCh (clientv3.WatchResponse)
2 parents 915ca5a + 20232ac commit 929e947

File tree

3 files changed

+25
-13
lines changed

3 files changed

+25
-13
lines changed

cache/cache.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ func (c *Cache) Watch(ctx context.Context, key string, opts ...clientv3.OpOption
119119
return
120120
case <-c.internalCtx.Done():
121121
return
122-
case events, ok := <-w.eventQueue:
122+
case resp, ok := <-w.respCh:
123123
if !ok {
124124
if w.cancelResp != nil {
125125
select {
@@ -135,7 +135,7 @@ func (c *Cache) Watch(ctx context.Context, key string, opts ...clientv3.OpOption
135135
return
136136
case <-c.internalCtx.Done():
137137
return
138-
case responseChan <- clientv3.WatchResponse{Events: events}:
138+
case responseChan <- resp:
139139
}
140140
}
141141
}
@@ -272,11 +272,17 @@ func (c *Cache) applyStorage(storeW *watcher) error {
272272
select {
273273
case <-c.internalCtx.Done():
274274
return nil
275-
case events, ok := <-storeW.eventQueue:
275+
case resp, ok := <-storeW.respCh:
276276
if !ok {
277277
return nil
278278
}
279-
if err := c.store.Apply(events); err != nil {
279+
if resp.Canceled {
280+
return nil
281+
}
282+
if len(resp.Events) == 0 {
283+
continue
284+
}
285+
if err := c.store.Apply(resp.Events); err != nil {
280286
return err
281287
}
282288
}

cache/demux_test.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -154,9 +154,15 @@ func readBatches(t *testing.T, w *watcher, n int) (revs []int64, sizes []int) {
154154
timeout := time.After(2 * time.Second)
155155
for len(revs) < n {
156156
select {
157-
case batch := <-w.eventQueue:
158-
revs = append(revs, batch[0].Kv.ModRevision)
159-
sizes = append(sizes, len(batch))
157+
case resp := <-w.respCh:
158+
if resp.Canceled {
159+
t.Fatalf("unexpected canceled response in test: %v", resp.CancelReason)
160+
}
161+
if len(resp.Events) == 0 {
162+
continue
163+
}
164+
revs = append(revs, resp.Events[0].Kv.ModRevision)
165+
sizes = append(sizes, len(resp.Events))
160166
case <-timeout:
161167
t.Fatalf("timed out waiting for %d batches; got %d", n, len(revs))
162168
}

cache/watcher.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,16 @@ import (
2323

2424
// watcher holds one client’s buffered stream of events.
2525
type watcher struct {
26-
eventQueue chan []*clientv3.Event
26+
respCh chan clientv3.WatchResponse
2727
cancelResp *clientv3.WatchResponse
2828
keyPred KeyPredicate
2929
stopOnce sync.Once
3030
}
3131

3232
func newWatcher(bufSize int, pred KeyPredicate) *watcher {
3333
return &watcher{
34-
eventQueue: make(chan []*clientv3.Event, bufSize),
35-
keyPred: pred,
34+
respCh: make(chan clientv3.WatchResponse, bufSize),
35+
keyPred: pred,
3636
}
3737
}
3838

@@ -52,7 +52,7 @@ func (w *watcher) enqueueEvent(eventBatch []*clientv3.Event) bool {
5252
eventBatch = filtered
5353
}
5454
select {
55-
case w.eventQueue <- eventBatch:
55+
case w.respCh <- clientv3.WatchResponse{Events: eventBatch}:
5656
return true
5757
default:
5858
return false
@@ -67,13 +67,13 @@ func (w *watcher) Compact(compactRev int64) {
6767
}
6868
w.stopOnce.Do(func() {
6969
w.cancelResp = resp
70-
close(w.eventQueue)
70+
close(w.respCh)
7171
})
7272
}
7373

7474
// Stop closes the event channel atomically.
7575
func (w *watcher) Stop() {
7676
w.stopOnce.Do(func() {
77-
close(w.eventQueue)
77+
close(w.respCh)
7878
})
7979
}

0 commit comments

Comments
 (0)