Skip to content

Commit d154d71

Browse files
don't sync to write to altDs
1 parent 5088386 commit d154d71

File tree

1 file changed

+12
-52
lines changed

1 file changed

+12
-52
lines changed

provider/datastore/resettable_keystore.go

Lines changed: 12 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,8 @@ import (
1515
var ErrResetInProgress = errors.New("reset already in progress")
1616

1717
const (
18-
opStart opType = iota + lastOp + 1
18+
opStart opType = iota
1919
opCleanup
20-
opAltPut
2120
)
2221

2322
type resetOp struct {
@@ -57,8 +56,7 @@ type ResettableKeyStore struct {
5756

5857
altDs ds.Batching
5958
resetInProgress bool
60-
resetSync chan []mh.Multihash // passes keys from worker to reset go routine
61-
resetOps chan resetOp // reset operations that must be run in main go routine
59+
resetOps chan resetOp // reset operations that must be run in main go routine
6260
}
6361

6462
var _ KeyStore = (*ResettableKeyStore)(nil)
@@ -85,9 +83,8 @@ func NewResettableKeyStore(d ds.Batching, opts ...KeyStoreOption) (*ResettableKe
8583
close: make(chan struct{}),
8684
done: make(chan struct{}),
8785
},
88-
altDs: namespace.Wrap(d, ds.NewKey(cfg.base+"/1")),
89-
resetOps: make(chan resetOp),
90-
resetSync: make(chan []mh.Multihash, 128), // buffered to avoid blocking
86+
altDs: namespace.Wrap(d, ds.NewKey(cfg.base+"/1")),
87+
resetOps: make(chan resetOp),
9188
}
9289

9390
// start worker goroutine
@@ -129,11 +126,6 @@ func (s *ResettableKeyStore) worker() {
129126
case opSize:
130127
size, err := s.size(op.ctx)
131128
op.response <- operationResponse{size: size, err: err}
132-
133-
case opAltPut:
134-
err := s.altPut(op.ctx, op.keys)
135-
op.response <- operationResponse{err: err}
136-
137129
}
138130
case op := <-s.resetOps:
139131
s.handleResetOp(op)
@@ -147,7 +139,7 @@ func (s *ResettableKeyStore) put(ctx context.Context, keys []mh.Multihash) ([]mh
147139
if s.resetInProgress {
148140
// Reset is in progress, write to alternate datastore in addition to
149141
// current datastore
150-
s.resetSync <- keys
142+
s.altPut(ctx, keys)
151143
}
152144
return s.keyStore.put(ctx, keys)
153145
}
@@ -190,15 +182,6 @@ func (s *ResettableKeyStore) handleResetOp(op resetOp) {
190182
s.ds = s.altDs
191183
s.altDs = oldDs
192184
}
193-
// Drain resetSync
194-
drain:
195-
for {
196-
select {
197-
case <-s.resetSync:
198-
default:
199-
break drain
200-
}
201-
}
202185
// Empty the unused datastore.
203186
s.resetInProgress = false
204187
op.response <- empty(context.Background(), s.altDs, s.batchSize)
@@ -254,31 +237,8 @@ func (s *ResettableKeyStore) ResetCids(ctx context.Context, keysChan <-chan cid.
254237
}
255238
}()
256239

257-
rsp := make(chan operationResponse)
258-
batchPut := func(ctx context.Context, keys []mh.Multihash) error {
259-
select {
260-
case <-s.done:
261-
return ErrKeyStoreClosed
262-
case <-ctx.Done():
263-
return ctx.Err()
264-
case s.requests <- operation{op: opAltPut, ctx: ctx, keys: keys, response: rsp}:
265-
return (<-rsp).err
266-
}
267-
}
268-
269240
keys := make([]mh.Multihash, 0)
270241

271-
processNewKeys := func(newKeys ...mh.Multihash) error {
272-
keys = append(keys, newKeys...)
273-
if len(keys) >= s.batchSize {
274-
if err := batchPut(ctx, keys); err != nil {
275-
return err
276-
}
277-
keys = keys[:0]
278-
}
279-
return nil
280-
}
281-
282242
// Read all the keys from the channel and write them to the altDs
283243
loop:
284244
for {
@@ -287,21 +247,21 @@ loop:
287247
return ctx.Err()
288248
case <-s.done:
289249
return ErrKeyStoreClosed
290-
case mhs := <-s.resetSync:
291-
if err := processNewKeys(mhs...); err != nil {
292-
return err
293-
}
294250
case c, ok := <-keysChan:
295251
if !ok {
296252
break loop
297253
}
298-
if err := processNewKeys(c.Hash()); err != nil {
299-
return err
254+
keys = append(keys, c.Hash())
255+
if len(keys) >= s.batchSize {
256+
if err := s.altPut(ctx, keys); err != nil {
257+
return err
258+
}
259+
keys = keys[:0]
300260
}
301261
}
302262
}
303263
// Put final batch
304-
if err := batchPut(ctx, keys); err != nil {
264+
if err := s.altPut(ctx, keys); err != nil {
305265
return err
306266
}
307267
success = true

0 commit comments

Comments
 (0)