Skip to content

Commit 112c7d0

Browse files
committed
cache: make cache progress-aware
Signed-off-by: Peter Chang <[email protected]>
1 parent 21d04d7 commit 112c7d0

File tree

7 files changed

+459
-11
lines changed

7 files changed

+459
-11
lines changed

cache/cache.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ func (c *Cache) Watch(ctx context.Context, key string, opts ...clientv3.OpOption
147147
}
148148

149149
func (c *Cache) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
150-
if c.store.LatestRev() == 0 {
150+
if c.store.AppliedRev() == 0 {
151151
if err := c.WaitReady(ctx); err != nil {
152152
return nil, err
153153
}
@@ -186,7 +186,10 @@ func (c *Cache) WaitReady(ctx context.Context) error {
186186

187187
func (c *Cache) WaitForRevision(ctx context.Context, rev int64) error {
188188
for {
189-
if c.store.LatestRev() >= rev {
189+
if c.store.AppliedRev() >= rev {
190+
return nil
191+
}
192+
if c.demux.ObservedRev() >= rev {
190193
return nil
191194
}
192195
select {
@@ -313,7 +316,7 @@ func (c *Cache) watchEvents(watchCh clientv3.WatchChan, applyErr <-chan error, r
313316
}
314317
return err
315318
}
316-
c.demux.Broadcast(resp.Events)
319+
c.demux.Broadcast(resp)
317320
case err := <-applyErr:
318321
c.ready.Reset()
319322
c.demux.Purge()

0 commit comments

Comments
 (0)