Skip to content

Commit 4917c6b

Browse files
author
Jim Minter
committed
stop scheduler from advancing through empty buckets without accept from ratelimiter
1 parent 56efdb0 commit 4917c6b

File tree

3 files changed

+118
-18
lines changed

3 files changed

+118
-18
lines changed

pkg/image/controller/scheduled_image_controller_test.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,9 @@ func TestScheduledImport(t *testing.T) {
5757
}
5858

5959
// encountering a not found error for image streams should drop the stream
60-
sched.scheduler.RunOnce() // we need to run it twice since we have 2 buckets
61-
sched.scheduler.RunOnce()
60+
for i := 0; i < 3; i++ { // loop all the buckets (2 + the additional internal one)
61+
sched.scheduler.RunOnce()
62+
}
6263
if sched.scheduler.Len() != 0 {
6364
t.Fatalf("should have removed item in scheduler: %#v", sched.scheduler)
6465
}
@@ -72,8 +73,9 @@ func TestScheduledImport(t *testing.T) {
7273
isInformer.Informer().GetIndexer().Add(stream)
7374

7475
// run a background import
75-
sched.scheduler.RunOnce() // we need to run it twice since we have 2 buckets
76-
sched.scheduler.RunOnce()
76+
for i := 0; i < 3; i++ { // loop all the buckets (2 + the additional internal one)
77+
sched.scheduler.RunOnce()
78+
}
7779
if sched.scheduler.Len() != 1 {
7880
t.Fatalf("should have left item in scheduler: %#v", sched.scheduler)
7981
}
@@ -85,8 +87,9 @@ func TestScheduledImport(t *testing.T) {
8587
sched.enabled = false
8688
fake.ClearActions()
8789

88-
sched.scheduler.RunOnce() // we need to run it twice since we have 2 buckets
89-
sched.scheduler.RunOnce()
90+
for i := 0; i < 3; i++ { // loop all the buckets (2 + the additional internal one)
91+
sched.scheduler.RunOnce()
92+
}
9093
if sched.scheduler.Len() != 0 {
9194
t.Fatalf("should have removed item from scheduler: %#v", sched.scheduler)
9295
}

pkg/image/controller/scheduler.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,16 +69,12 @@ func (s *Scheduler) next() (interface{}, interface{}, bool) {
6969
defer s.mu.Unlock()
7070

7171
last := s.buckets[s.position]
72-
if len(last) == 0 {
73-
s.position = s.at(1)
74-
last = s.buckets[s.position]
75-
}
76-
7772
for k, v := range last {
7873
delete(last, k)
7974
s.buckets[s.at(-1)][k] = v
8075
return k, v, false
8176
}
77+
s.position = s.at(1)
8278
return nil, nil, true
8379
}
8480

pkg/image/controller/scheduler_test.go

Lines changed: 108 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
package controller
22

33
import (
4+
"container/heap"
5+
"fmt"
46
"reflect"
7+
"sync"
58
"testing"
9+
"time"
610

11+
"github.com/juju/ratelimit"
712
flowcontrol "k8s.io/client-go/util/flowcontrol"
813
)
914

@@ -39,14 +44,11 @@ func TestScheduler(t *testing.T) {
3944
t.Fatal("expected to find key in a bucket")
4045
}
4146

42-
for i := 0; i < 10; i++ {
43-
s.Delay("first")
44-
if _, ok := s.buckets[(s.position-1+len(s.buckets))%len(s.buckets)]["first"]; !ok {
45-
t.Fatal("key was not in the last bucket")
46-
}
47-
}
47+
s.Delay("first")
4848

49-
s.RunOnce()
49+
for i := 0; i < 2; i++ { // Delay shouldn't have put the item in the current bucket
50+
s.RunOnce()
51+
}
5052
if len(keys) != 0 {
5153
t.Fatal(keys)
5254
}
@@ -126,3 +128,102 @@ func TestSchedulerRemove(t *testing.T) {
126128
t.Fatal(s)
127129
}
128130
}
131+
132+
type int64Heap []int64
133+
134+
var _ heap.Interface = &int64Heap{}
135+
136+
func (h int64Heap) Len() int { return len(h) }
137+
func (h int64Heap) Less(i, j int) bool { return h[i] < h[j] }
138+
func (h int64Heap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
139+
func (h *int64Heap) Push(x interface{}) { *h = append(*h, x.(int64)) }
140+
func (h *int64Heap) Pop() interface{} {
141+
x := (*h)[len(*h)-1]
142+
*h = (*h)[:len(*h)-1]
143+
return x
144+
}
145+
146+
type wallClock struct{}
147+
148+
var _ ratelimit.Clock = &wallClock{}
149+
150+
func (c wallClock) Now() time.Time { return time.Now() }
151+
func (c wallClock) Sleep(d time.Duration) { time.Sleep(d) }
152+
153+
// fakeClock implements ratelimit.Clock. Its time starts at the UNIX epoch.
154+
// When all known threads are in Sleep(), its time advances just enough to wake
155+
// the first thread (or threads) due to wake.
156+
type fakeClock struct {
157+
c sync.Cond
158+
threads int
159+
now int64
160+
wake int64Heap
161+
}
162+
163+
var _ ratelimit.Clock = &fakeClock{}
164+
165+
func newFakeClock(threads int) ratelimit.Clock {
166+
return &fakeClock{threads: threads, c: sync.Cond{L: &sync.Mutex{}}}
167+
}
168+
169+
func (c *fakeClock) Now() time.Time {
170+
c.c.L.Lock()
171+
defer c.c.L.Unlock()
172+
return time.Unix(0, c.now)
173+
}
174+
175+
func (c *fakeClock) Sleep(d time.Duration) {
176+
c.c.L.Lock()
177+
defer c.c.L.Unlock()
178+
wake := c.now + int64(d)
179+
heap.Push(&c.wake, wake)
180+
if len(c.wake) == c.threads {
181+
// everyone is asleep, advance the clock.
182+
c.now = heap.Pop(&c.wake).(int64)
183+
for len(c.wake) > 0 && c.wake[0] == c.now {
184+
// pop any additional threads waiting on the same time.
185+
heap.Pop(&c.wake)
186+
}
187+
c.c.Broadcast()
188+
}
189+
for c.now < wake {
190+
c.c.Wait()
191+
}
192+
}
193+
194+
func TestSchedulerSanity(t *testing.T) {
195+
const (
196+
buckets = 4
197+
items = 10
198+
)
199+
200+
clock := newFakeClock(2) // 2 threads: us and the scheduler.
201+
202+
// 1 token per second => one bucket's worth of items should get scheduled
203+
// per second.
204+
limiter := flowcontrol.NewTokenBucketRateLimiterWithClock(1, 1, clock)
205+
206+
m := map[int]int{}
207+
s := NewScheduler(buckets, limiter, func(key, value interface{}) {
208+
fmt.Printf("%v: %v\n", clock.Now().UTC(), key)
209+
m[key.(int)]++
210+
})
211+
for i := 0; i < items; i++ {
212+
s.Add(i, nil)
213+
}
214+
215+
go s.RunUntil(make(chan struct{}))
216+
217+
// run the clock just long enough to expect to have scheduled each item
218+
// exactly twice.
219+
clock.Sleep((2*buckets-1)*time.Second + 1)
220+
221+
expected := map[int]int{}
222+
for i := 0; i < items; i++ {
223+
expected[i] = 2
224+
}
225+
226+
if !reflect.DeepEqual(m, expected) {
227+
t.Errorf("m did not match expected: %#v\n", m)
228+
}
229+
}

0 commit comments

Comments
 (0)