Skip to content

File tree

3 files changed

+92
-44
lines changed

3 files changed

+92
-44
lines changed

pkg/util/writerlease/writerlease.go

Lines changed: 51 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,11 @@ const (
6666

6767
var nowFn = time.Now
6868

69+
type work struct {
70+
id int
71+
fn WorkFunc
72+
}
73+
6974
type WriterLease struct {
7075
name string
7176
backoff wait.Backoff
@@ -74,7 +79,8 @@ type WriterLease struct {
7479
once chan struct{}
7580

7681
lock sync.Mutex
77-
queued map[string]WorkFunc
82+
id int
83+
queued map[string]*work
7884
queue workqueue.DelayingInterface
7985
state State
8086
expires time.Time
@@ -95,7 +101,7 @@ func New(leaseDuration, retryInterval time.Duration) *WriterLease {
95101
maxBackoff: leaseDuration,
96102
retryInterval: retryInterval,
97103

98-
queued: make(map[string]WorkFunc),
104+
queued: make(map[string]*work),
99105
queue: workqueue.NewDelayingQueue(),
100106
once: make(chan struct{}),
101107
}
@@ -110,7 +116,7 @@ func NewWithBackoff(name string, leaseDuration, retryInterval time.Duration, bac
110116
maxBackoff: leaseDuration,
111117
retryInterval: retryInterval,
112118

113-
queued: make(map[string]WorkFunc),
119+
queued: make(map[string]*work),
114120
queue: workqueue.NewNamedDelayingQueue(name),
115121
once: make(chan struct{}),
116122
}
@@ -122,7 +128,8 @@ func (l *WriterLease) Run(stopCh <-chan struct{}) {
122128

123129
go func() {
124130
defer utilruntime.HandleCrash()
125-
l.work()
131+
for l.work() {
132+
}
126133
glog.V(4).Infof("[%s] Worker stopped", l.name)
127134
}()
128135

@@ -154,7 +161,8 @@ func (l *WriterLease) WaitUntil(t time.Duration) (bool, bool) {
154161
func (l *WriterLease) Try(key string, fn WorkFunc) {
155162
l.lock.Lock()
156163
defer l.lock.Unlock()
157-
l.queued[key] = fn
164+
l.id++
165+
l.queued[key] = &work{fn: fn, id: l.id}
158166
if l.state == Follower {
159167
delay := l.expires.Sub(nowFn())
160168
// no matter what, always wait at least some amount of time as a follower to give the nominal
@@ -195,7 +203,7 @@ func (l *WriterLease) Remove(key string) {
195203
delete(l.queued, key)
196204
}
197205

198-
func (l *WriterLease) get(key string) WorkFunc {
206+
func (l *WriterLease) get(key string) *work {
199207
l.lock.Lock()
200208
defer l.lock.Unlock()
201209
return l.queued[key]
@@ -207,49 +215,48 @@ func (l *WriterLease) leaseState() (State, time.Time, int) {
207215
return l.state, l.expires, l.tick
208216
}
209217

210-
func (l *WriterLease) work() {
211-
for {
212-
item, shutdown := l.queue.Get()
213-
if shutdown {
214-
return
215-
}
216-
key := item.(string)
217-
218-
fn := l.get(key)
219-
if fn == nil {
220-
glog.V(4).Infof("[%s] Work item %s was cleared, done", l.name, key)
221-
l.queue.Done(key)
222-
continue
223-
}
218+
func (l *WriterLease) work() bool {
219+
item, shutdown := l.queue.Get()
220+
if shutdown {
221+
return false
222+
}
223+
key := item.(string)
224224

225-
leaseState, leaseExpires, _ := l.leaseState()
226-
if leaseState == Follower {
227-
// if we are following, continue to defer work until the lease expires
228-
if remaining := leaseExpires.Sub(nowFn()); remaining > 0 {
229-
glog.V(4).Infof("[%s] Follower, %s remaining in lease", l.name, remaining)
230-
l.queue.AddAfter(key, remaining)
231-
l.queue.Done(key)
232-
continue
233-
}
234-
glog.V(4).Infof("[%s] Lease expired, running %s", l.name, key)
235-
} else {
236-
glog.V(4).Infof("[%s] Lease owner or electing, running %s", l.name, key)
237-
}
225+
work := l.get(key)
226+
if work == nil {
227+
glog.V(4).Infof("[%s] Work item %s was cleared, done", l.name, key)
228+
l.queue.Done(key)
229+
return true
230+
}
238231

239-
isLeader, retry := fn()
240-
if retry {
241-
// come back in a bit
242-
glog.V(4).Infof("[%s] Retrying %s", l.name, key)
243-
l.queue.AddAfter(key, l.retryInterval)
232+
leaseState, leaseExpires, _ := l.leaseState()
233+
if leaseState == Follower {
234+
// if we are following, continue to defer work until the lease expires
235+
if remaining := leaseExpires.Sub(nowFn()); remaining > 0 {
236+
glog.V(4).Infof("[%s] Follower, %s remaining in lease", l.name, remaining)
237+
l.queue.AddAfter(key, remaining)
244238
l.queue.Done(key)
245-
continue
239+
return true
246240
}
241+
glog.V(4).Infof("[%s] Lease expired, running %s", l.name, key)
242+
} else {
243+
glog.V(4).Infof("[%s] Lease owner or electing, running %s", l.name, key)
244+
}
247245

248-
l.finishKey(key, isLeader)
246+
isLeader, retry := work.fn()
247+
if retry {
248+
// come back in a bit
249+
glog.V(4).Infof("[%s] Retrying %s", l.name, key)
250+
l.queue.AddAfter(key, l.retryInterval)
251+
l.queue.Done(key)
252+
return true
249253
}
254+
255+
l.finishKey(key, isLeader, work.id)
256+
return true
250257
}
251258

252-
func (l *WriterLease) finishKey(key string, isLeader bool) {
259+
func (l *WriterLease) finishKey(key string, isLeader bool, id int) {
253260
l.lock.Lock()
254261
defer l.lock.Unlock()
255262

@@ -271,7 +278,9 @@ func (l *WriterLease) finishKey(key string, isLeader bool) {
271278
}
272279
l.expires = nowFn().Add(l.nextBackoff())
273280
}
274-
delete(l.queued, key)
281+
if work, ok := l.queued[key]; ok && work.id == id {
282+
delete(l.queued, key)
283+
}
275284
// close the channel before we remove the key from the queue to prevent races in Wait
276285
if resolvedElection {
277286
close(l.once)

pkg/util/writerlease/writerlease_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,46 @@ func TestBecomeFollowerAfterRetry(t *testing.T) {
8181
}
8282
}
8383

84+
func TestRunOverlappingWork(t *testing.T) {
85+
l := New(0, 0)
86+
l.backoff.Steps = 0
87+
l.backoff.Duration = 0
88+
done := make(chan struct{})
89+
defer func() {
90+
<-done
91+
if len(l.queued) > 0 {
92+
t.Fatalf("queue was not empty on shutdown: %#v", l.queued)
93+
}
94+
}()
95+
96+
go func() {
97+
t.Logf("processing first")
98+
l.work()
99+
t.Logf("processing second")
100+
l.work()
101+
t.Logf("processing done")
102+
close(done)
103+
}()
104+
105+
first := make(chan struct{})
106+
l.Try("test", func() (bool, bool) {
107+
first <- struct{}{}
108+
t.Logf("waiting for second item to be added")
109+
first <- struct{}{}
110+
return true, false
111+
})
112+
<-first
113+
second := make(chan struct{}, 1)
114+
l.Try("test", func() (bool, bool) {
115+
second <- struct{}{}
116+
return true, false
117+
})
118+
t.Logf("second item added")
119+
<-first
120+
<-second
121+
<-done
122+
}
123+
84124
func TestExtend(t *testing.T) {
85125
nowFn = func() time.Time { return time.Unix(0, 0) }
86126
defer func() { nowFn = time.Now }()

test/extended/router/stress.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -275,12 +275,11 @@ var _ = g.Describe("[Conformance][Area:Networking][Feature:Router]", func() {
275275
case _, ok := <-ch:
276276
writes++
277277
o.Expect(ok).To(o.BeTrue())
278-
o.Expect(i).To(o.BeNumerically("<", 3))
279278
case <-timer.C:
280279
break Wait
281280
}
282281
}
283-
e2e.Logf("Recorded %d writes total", writes)
282+
o.Expect(writes).To(o.BeNumerically("<", 5))
284283
}()
285284
})
286285
})

0 commit comments

Comments
 (0)