@@ -77,7 +77,7 @@ func New(client *clientv3.Client, prefix string, opts ...Option) (*Cache, error)
77
77
cache .waitGroup .Add (1 )
78
78
go func () {
79
79
defer cache .waitGroup .Done ()
80
- cache .getWatchLoop (internalCtx )
80
+ cache .getWatchLoop ()
81
81
}()
82
82
83
83
return cache , nil
@@ -202,14 +202,15 @@ func (c *Cache) Close() {
202
202
c .waitGroup .Wait ()
203
203
}
204
204
205
- func (c * Cache ) getWatchLoop (ctx context. Context ) {
205
+ func (c * Cache ) getWatchLoop () {
206
206
cfg := defaultConfig ()
207
+ ctx := c .internalCtx
207
208
backoff := cfg .InitialBackoff
208
209
for {
209
210
if err := ctx .Err (); err != nil {
210
211
return
211
212
}
212
- if err := c .getWatch (ctx ); err != nil {
213
+ if err := c .getWatch (); err != nil {
213
214
fmt .Printf ("getWatch failed, will retry after %v: %v\n " , backoff , err )
214
215
}
215
216
select {
@@ -220,12 +221,12 @@ func (c *Cache) getWatchLoop(ctx context.Context) {
220
221
}
221
222
}
222
223
223
- func (c * Cache ) getWatch (ctx context. Context ) error {
224
- getResp , err := c .get (ctx )
224
+ func (c * Cache ) getWatch () error {
225
+ getResp , err := c .get (c . internalCtx )
225
226
if err != nil {
226
227
return err
227
228
}
228
- return c .watch (ctx , getResp .Header .Revision + 1 )
229
+ return c .watch (getResp .Header .Revision + 1 )
229
230
}
230
231
231
232
func (c * Cache ) get (ctx context.Context ) (* clientv3.GetResponse , error ) {
@@ -237,36 +238,75 @@ func (c *Cache) get(ctx context.Context) (*clientv3.GetResponse, error) {
237
238
return resp , nil
238
239
}
239
240
240
- func (c * Cache ) watch (ctx context. Context , rev int64 ) error {
241
+ func (c * Cache ) watch (rev int64 ) error {
241
242
readyOnce := sync.Once {}
242
243
for {
244
+ storeW := newWatcher (c .cfg .PerWatcherBufferSize , nil )
245
+ c .demux .Register (storeW , rev )
246
+ applyErr := make (chan error , 1 )
247
+ c .waitGroup .Add (1 )
248
+ go func () {
249
+ defer c .waitGroup .Done ()
250
+ if err := c .applyStorage (storeW ); err != nil {
251
+ applyErr <- err
252
+ }
253
+ close (applyErr )
254
+ }()
255
+
243
256
watchCh := c .watcher .Watch (
244
- ctx ,
257
+ c . internalCtx ,
245
258
c .prefix ,
246
259
clientv3 .WithPrefix (),
247
260
clientv3 .WithRev (rev ),
248
261
clientv3 .WithProgressNotify (),
249
262
clientv3 .WithCreatedNotify (),
250
263
)
251
264
252
- for resp := range watchCh {
253
- readyOnce .Do (func () { c .ready .Set () })
254
- if err := resp .Err (); err != nil {
255
- c .ready .Reset ()
256
- c .demux .Purge ()
265
+ err := c .watchEvents (watchCh , applyErr , & readyOnce )
266
+ c .demux .Unregister (storeW )
267
+
268
+ if err != nil {
269
+ return err
270
+ }
271
+ }
272
+ }
273
+
274
+ func (c * Cache ) applyStorage (storeW * watcher ) error {
275
+ for {
276
+ select {
277
+ case <- c .internalCtx .Done ():
278
+ return nil
279
+ case events , ok := <- storeW .eventQueue :
280
+ if ! ok {
281
+ return nil
282
+ }
283
+ if err := c .store .Apply (events ); err != nil {
257
284
return err
258
285
}
286
+ }
287
+ }
288
+ }
259
289
260
- if err := c .store .Apply (resp .Events ); err != nil {
290
+ func (c * Cache ) watchEvents (watchCh clientv3.WatchChan , applyErr <- chan error , readyOnce * sync.Once ) error {
291
+ for {
292
+ select {
293
+ case <- c .internalCtx .Done ():
294
+ return c .internalCtx .Err ()
295
+ case resp , ok := <- watchCh :
296
+ if ! ok {
297
+ return nil
298
+ }
299
+ readyOnce .Do (func () { c .ready .Set () })
300
+ if err := resp .Err (); err != nil {
261
301
c .ready .Reset ()
262
302
c .demux .Purge ()
263
303
return err
264
304
}
265
305
c .demux .Broadcast (resp .Events )
266
- }
267
-
268
- if ctx . Err () != nil {
269
- return ctx . Err ()
306
+ case err := <- applyErr :
307
+ c . ready . Reset ()
308
+ c . demux . Purge ()
309
+ return err
270
310
}
271
311
}
272
312
}
0 commit comments