Commit 58b7e64

provider: batch queue
1 parent 5088386 commit 58b7e64

File tree: 4 files changed, +435 -293 lines


dual/provider/provider.go

Lines changed: 60 additions & 50 deletions
@@ -3,23 +3,27 @@ package provider
 import (
 	"context"
 	"errors"
-	"fmt"

 	"github.com/ipfs/go-cid"
+	logging "github.com/ipfs/go-log/v2"
 	dht "github.com/libp2p/go-libp2p-kad-dht"
 	"github.com/libp2p/go-libp2p-kad-dht/dual"
+	"github.com/libp2p/go-libp2p-kad-dht/internal/batch"
 	"github.com/libp2p/go-libp2p-kad-dht/provider"
 	"github.com/libp2p/go-libp2p-kad-dht/provider/datastore"
 	mh "github.com/multiformats/go-multihash"
 )

+var logger = logging.Logger(provider.LoggerName)
+
 // SweepingProvider manages provides and reprovides for both DHT swarms (LAN
 // and WAN) in the dual DHT setup.
 type SweepingProvider struct {
-	dht      *dual.DHT
-	LAN      *provider.SweepingProvider
-	WAN      *provider.SweepingProvider
-	keyStore datastore.KeyStore
+	dht           *dual.DHT
+	lan           *provider.SweepingProvider
+	wan           *provider.SweepingProvider
+	keyStore      datastore.KeyStore
+	keyStoreQueue *batch.Processor
 }

 // New creates a new SweepingProvider that manages provides and reprovides for
@@ -70,32 +74,44 @@ func New(d *dual.DHT, opts ...Option) (*SweepingProvider, error) {
 		}
 	}

-	return &SweepingProvider{
+	s := &SweepingProvider{
 		dht:      d,
-		LAN:      sweepingProviders[0],
-		WAN:      sweepingProviders[1],
+		lan:      sweepingProviders[0],
+		wan:      sweepingProviders[1],
 		keyStore: cfg.keyStore,
-	}, nil
+	}
+	s.keyStoreQueue = batch.NewProcessor(func(keys []mh.Multihash, force bool) {
+		newKeys, err := s.keyStore.Put(context.Background(), keys...)
+		if err != nil {
+			logger.Errorf("failed to store multihashes: %v", err)
+			return
+		}
+		if !force {
+			if len(newKeys) == 0 {
+				return
+			}
+			keys = newKeys
+		}
+
+		// Add to schedule and to provide queue of both systems
+		s.runOnBoth(func(p *provider.SweepingProvider) {
+			p.AddToScheduleAndProvide(newKeys...)
+		})
+	})
+
+	return s, nil
 }

 // runOnBoth runs the provided function on both the LAN and WAN providers in
 // parallel and waits for both to complete.
-func (s *SweepingProvider) runOnBoth(f func(*provider.SweepingProvider) error) error {
-	var errs [2]error
+func (s *SweepingProvider) runOnBoth(f func(*provider.SweepingProvider)) {
 	done := make(chan struct{})
 	go func() {
 		defer close(done)
-		err := f(s.LAN)
-		if err != nil {
-			errs[0] = fmt.Errorf("LAN provider: %w", err)
-		}
+		f(s.lan)
 	}()
-	err := f(s.WAN)
-	if err != nil {
-		errs[1] = fmt.Errorf("WAN provider: %w", err)
-	}
+	f(s.wan)
 	<-done
-	return errors.Join(errs[:]...)
 }

 // ProvideOnce sends provider records for the specified keys to both DHT swarms
@@ -109,9 +125,9 @@ func (s *SweepingProvider) runOnBoth(f func(*provider.SweepingProvider) error) error {
 // (either never bootstrapped, or disconnected since more than `OfflineDelay`).
 // The schedule and provide queue depend on the network size, hence recent
 // network connectivity is essential.
-func (s *SweepingProvider) ProvideOnce(keys ...mh.Multihash) error {
-	return s.runOnBoth(func(p *provider.SweepingProvider) error {
-		return p.ProvideOnce(keys...)
+func (s *SweepingProvider) ProvideOnce(keys ...mh.Multihash) {
+	s.runOnBoth(func(p *provider.SweepingProvider) {
+		p.ProvideOnce(keys...)
 	})
 }

@@ -132,22 +148,8 @@ func (s *SweepingProvider) ProvideOnce(keys ...mh.Multihash) error {
 // (either never bootstrapped, or disconnected since more than `OfflineDelay`).
 // The schedule and provide queue depend on the network size, hence recent
 // network connectivity is essential.
-func (s *SweepingProvider) StartProviding(force bool, keys ...mh.Multihash) error {
-	ctx := context.Background()
-	newKeys, err := s.keyStore.Put(ctx, keys...)
-	if err != nil {
-		return fmt.Errorf("failed to store multihashes: %w", err)
-	}
-
-	s.runOnBoth(func(p *provider.SweepingProvider) error {
-		return p.AddToSchedule(newKeys...)
-	})
-
-	if !force {
-		keys = newKeys
-	}
-
-	return s.ProvideOnce(keys...)
+func (s *SweepingProvider) StartProviding(force bool, keys ...mh.Multihash) {
+	s.keyStoreQueue.Enqueue(force, keys...)
 }

 // StopProviding stops reproviding the given keys to both DHT swarms. The node
@@ -157,12 +159,12 @@ func (s *SweepingProvider) StartProviding(force bool, keys ...mh.Multihash) error {
 // Remove the `keys` from the schedule and return immediately. Valid records
 // can remain in the DHT swarms up to the provider record TTL after calling
 // `StopProviding`.
-func (s *SweepingProvider) StopProviding(keys ...mh.Multihash) error {
+func (s *SweepingProvider) StopProviding(keys ...mh.Multihash) {
+	// TODO: batch deletes
 	err := s.keyStore.Delete(context.Background(), keys...)
 	if err != nil {
-		return fmt.Errorf("failed to stop providing keys: %w", err)
+		logger.Errorf("failed to stop providing keys: %w", err)
 	}
-	return nil
 }

 // Clear clears the all the keys from the provide queues of both DHTs and
@@ -171,7 +173,7 @@ func (s *SweepingProvider) StopProviding(keys ...mh.Multihash) error {
 // The keys are not deleted from the keystore, so they will continue to be
 // reprovided as scheduled.
 func (s *SweepingProvider) Clear() int {
-	return s.LAN.Clear() + s.WAN.Clear()
+	return s.lan.Clear() + s.wan.Clear()
 }

 // RefreshSchedule scans the KeyStore for any keys that are not currently
@@ -186,10 +188,17 @@ func (s *SweepingProvider) Clear() int {
 // Offline (either never bootstrapped, or disconnected since more than
 // `OfflineDelay`). The schedule depends on the network size, hence recent
 // network connectivity is essential.
-func (s *SweepingProvider) RefreshSchedule() error {
-	return s.runOnBoth(func(p *provider.SweepingProvider) error {
-		return p.RefreshSchedule()
+func (s *SweepingProvider) RefreshSchedule() {
+	s.runOnBoth(func(p *provider.SweepingProvider) {
+		p.RefreshSchedule()
+	})
+}
+
+func (s *SweepingProvider) Close() error {
+	s.runOnBoth(func(p *provider.SweepingProvider) {
+		p.Close()
 	})
+	return nil
 }

 var (
@@ -200,9 +209,10 @@ var (
 // dhtProvider is the interface to ensure that SweepingProvider and
 // provider.SweepingProvider share the same interface.
 type dhtProvider interface {
-	StartProviding(force bool, keys ...mh.Multihash) error
-	StopProviding(keys ...mh.Multihash) error
-	ProvideOnce(keys ...mh.Multihash) error
+	StartProviding(force bool, keys ...mh.Multihash)
+	StopProviding(keys ...mh.Multihash)
+	ProvideOnce(keys ...mh.Multihash)
 	Clear() int
-	RefreshSchedule() error
+	RefreshSchedule()
+	Close() error
 }
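
Taken together, the provider.go changes make the dual provider's write path asynchronous: StartProviding hands its keys to the new keyStoreQueue batch processor, which stores them in the KeyStore and then schedules and provides them on both DHTs via runOnBoth, while failures are logged through the package logger instead of being returned. The LAN and WAN fields also become unexported, so callers interact with the combined API only. Below is a minimal usage sketch under those assumptions; the `dualprovider` import alias, the `example` package name and the `provideSketch` helper are illustrative, not part of the repository.

```go
package example

import (
	"github.com/libp2p/go-libp2p-kad-dht/dual"
	dualprovider "github.com/libp2p/go-libp2p-kad-dht/dual/provider"
	mh "github.com/multiformats/go-multihash"
)

// provideSketch shows the fire-and-forget shape of the API after this commit:
// StartProviding, ProvideOnce and RefreshSchedule no longer return errors.
func provideSketch(d *dual.DHT, keys []mh.Multihash) error {
	p, err := dualprovider.New(d)
	if err != nil {
		return err
	}
	defer p.Close()

	// Keys are handed to the internal batch processor; the KeyStore write and
	// the schedule/provide work on both DHT swarms happen asynchronously.
	p.StartProviding(false, keys...)

	// RefreshSchedule is likewise non-blocking; errors are logged, not returned.
	p.RefreshSchedule()
	return nil
}
```

The dhtProvider interface at the bottom of the file is what keeps this wrapper aligned with provider.SweepingProvider, which is why the error returns were dropped there as well and Close() error was added.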

internal/batch/processor.go

Lines changed: 132 additions & 0 deletions
@@ -0,0 +1,132 @@
+package batch
+
+import (
+	"fmt"
+	"sync"
+
+	mh "github.com/multiformats/go-multihash"
+)
+
+type BatchHandler func(keys []mh.Multihash, special bool)
+
+const (
+	normalQueueID uint8 = iota
+	specialQueueID
+)
+
+type enqueueOp struct {
+	keys []mh.Multihash
+	id   uint8
+}
+
+type drainOp struct {
+	id uint8
+	ch chan []mh.Multihash
+}
+
+type Processor struct {
+	close     chan struct{}
+	closeOnce sync.Once
+	wg        sync.WaitGroup
+	enqueue   chan enqueueOp
+	drain     chan drainOp
+
+	signals [2]chan struct{}
+	bufs    [2][]mh.Multihash
+
+	f BatchHandler
+}
+
+func NewProcessor(f BatchHandler) *Processor {
+	q := &Processor{
+		close:   make(chan struct{}),
+		wg:      sync.WaitGroup{},
+		enqueue: make(chan enqueueOp),
+		drain:   make(chan drainOp),
+
+		signals: [2]chan struct{}{make(chan struct{}, 1), make(chan struct{}, 1)},
+		bufs:    [2][]mh.Multihash{make([]mh.Multihash, 0), make([]mh.Multihash, 0)},
+		f:       f,
+	}
+	q.wg.Add(2)
+	go q.in()
+	go q.out()
+	return q
+}
+
+func (q *Processor) in() {
+	defer q.wg.Done()
+
+	for {
+		select {
+		case <-q.close:
+			return
+		case op := <-q.enqueue:
+			q.bufs[op.id] = append(q.bufs[op.id], op.keys...)
+			// Signal there are keys in the queue.
+			select {
+			case q.signals[op.id] <- struct{}{}:
+			default:
+			}
+		case op := <-q.drain:
+			op.ch <- q.bufs[op.id]
+			q.bufs[op.id] = make([]mh.Multihash, 0)
+		}
+	}
+}
+
+func (q *Processor) out() {
+	defer q.wg.Done()
+
+	ch := make(chan []mh.Multihash)
+	handle := func(id uint8) {
+		select {
+		case <-q.close:
+			return
+		case q.drain <- drainOp{id: id, ch: ch}:
+		}
+
+		var keys []mh.Multihash
+		select {
+		case <-q.close:
+			return
+		case keys = <-ch:
+		}
+
+		// Run callback
+		q.f(keys, id == specialQueueID)
+	}
+
+	for {
+		select {
+		case <-q.close:
+			return
+		case <-q.signals[normalQueueID]:
+			handle(normalQueueID)
+		case <-q.signals[specialQueueID]:
+			handle(specialQueueID)
+		}
+	}
+}
+
+func (q *Processor) Close() error {
+	q.closeOnce.Do(func() {
+		close(q.enqueue)
+		close(q.close)
+	})
+	return nil
+}
+
+func (q *Processor) Enqueue(special bool, keys ...mh.Multihash) (err error) {
+	id := normalQueueID
+	if special {
+		id = specialQueueID
+	}
+	defer func() {
+		if r := recover(); r != nil {
+			err = fmt.Errorf("failed to enqueue item: %s", r)
+		}
+	}()
+	q.enqueue <- enqueueOp{keys: keys, id: id}
+	return
+}
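
The new batch.Processor keeps two in-memory buffers, one for normal keys and one for special (forced) keys. The in goroutine owns both buffers: it appends enqueued keys and fires a capacity-1 signal channel, so repeated Enqueue calls between drains collapse into a single wake-up. The out goroutine drains whichever buffer was signalled and runs the handler, so keys that arrive while a batch is being handled are simply delivered with the next batch. A minimal sketch of driving the Processor directly is below; it assumes code living inside the go-libp2p-kad-dht module (internal/batch is not importable from outside), and the `example` package and `demoProcessor` name are illustrative.

```go
package example

import (
	"fmt"
	"time"

	"github.com/libp2p/go-libp2p-kad-dht/internal/batch"
	mh "github.com/multiformats/go-multihash"
)

func demoProcessor() {
	proc := batch.NewProcessor(func(keys []mh.Multihash, special bool) {
		// Runs on the processor's out goroutine with everything accumulated
		// in the selected queue since the last drain.
		fmt.Printf("batch of %d keys (special=%t)\n", len(keys), special)
	})
	defer proc.Close()

	h, _ := mh.Sum([]byte("hello"), mh.SHA2_256, -1)

	// Normal and special keys are buffered separately; the special batch is
	// delivered to the handler with special=true.
	_ = proc.Enqueue(false, h)
	_ = proc.Enqueue(true, h)

	// The handler runs asynchronously; wait briefly before Close, which as
	// written does not flush buffers that have not been drained yet.
	time.Sleep(100 * time.Millisecond)
}
```

Because Close closes the enqueue channel, an Enqueue issued after Close panics on the send; the deferred recover inside Enqueue converts that panic into a returned error rather than crashing the caller.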
