2 changes: 1 addition & 1 deletion pkg/image/controller/factory.go
@@ -92,7 +92,7 @@ func NewScheduledImageStreamController(client imageclient.Interface, informer im
listerSynced: informer.Informer().HasSynced,
}

-controller.scheduler = NewScheduler(opts.Buckets(), bucketLimiter, controller.syncTimed)
+controller.scheduler = newScheduler(opts.Buckets(), bucketLimiter, controller.syncTimed)

informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.addImageStream,
2 changes: 1 addition & 1 deletion pkg/image/controller/scheduled_image_controller.go
@@ -36,7 +36,7 @@ type ScheduledImageStreamController struct {
rateLimiter flowcontrol.RateLimiter

// scheduler for timely image re-imports
-scheduler *Scheduler
+scheduler *scheduler
}

// Importing is invoked when the controller decides to import a stream in order to push back
15 changes: 9 additions & 6 deletions pkg/image/controller/scheduled_image_controller_test.go
@@ -57,8 +57,9 @@ func TestScheduledImport(t *testing.T) {
}

// encountering a not found error for image streams should drop the stream
-sched.scheduler.RunOnce() // we need to run it twice since we have 2 buckets
-sched.scheduler.RunOnce()
+for i := 0; i < 3; i++ { // loop over all the buckets (2 + the additional internal one)
+	sched.scheduler.RunOnce()
+}
if sched.scheduler.Len() != 0 {
t.Fatalf("should have removed item in scheduler: %#v", sched.scheduler)
}
@@ -72,8 +73,9 @@ func TestScheduledImport(t *testing.T) {
isInformer.Informer().GetIndexer().Add(stream)

// run a background import
-sched.scheduler.RunOnce() // we need to run it twice since we have 2 buckets
-sched.scheduler.RunOnce()
+for i := 0; i < 3; i++ { // loop over all the buckets (2 + the additional internal one)
+	sched.scheduler.RunOnce()
+}
if sched.scheduler.Len() != 1 {
t.Fatalf("should have left item in scheduler: %#v", sched.scheduler)
}
@@ -85,8 +87,9 @@ func TestScheduledImport(t *testing.T) {
sched.enabled = false
fake.ClearActions()

-sched.scheduler.RunOnce() // we need to run it twice since we have 2 buckets
-sched.scheduler.RunOnce()
+for i := 0; i < 3; i++ { // loop over all the buckets (2 + the additional internal one)
+	sched.scheduler.RunOnce()
+}
if sched.scheduler.Len() != 0 {
t.Fatalf("should have removed item from scheduler: %#v", sched.scheduler)
}
43 changes: 24 additions & 19 deletions pkg/image/controller/scheduler.go
@@ -7,12 +7,19 @@ import (
"k8s.io/client-go/util/flowcontrol"
)

-// Scheduler is a self-balancing, rate-limited, bucketed queue that can periodically invoke
+// NOTE: scheduler's semantics do not lend themselves to reuse elsewhere, and
+// its use in this package quite probably has some odd corner cases/race
+// conditions. If these cause problems in the future, this implementation
+// should be replaced with a new and simpler one based on container/heap. End
+// users looking for a component like this: see if
+// k8s.io/client-go/util/workqueue.NewDelayingQueue suits your needs.
+
+// scheduler is a self-balancing, rate-limited, bucketed queue that can periodically invoke
// an action on all items in a bucket before moving to the next bucket. A ratelimiter sets
// an upper bound on the number of buckets processed per unit time. The queue has a key and a
// value, so both uniqueness and equality can be tested (key must be unique, value can carry
// info for the next processing). Items remain in the queue until removed by a call to Remove().
-type Scheduler struct {
+type scheduler struct {
handle func(key, value interface{})
position int
limiter flowcontrol.RateLimiter
@@ -23,32 +30,32 @@ type Scheduler struct {

type bucket map[interface{}]interface{}

-// NewScheduler creates a scheduler with bucketCount buckets, a rate limiter for restricting
+// newScheduler creates a scheduler with bucketCount buckets, a rate limiter for restricting
// the rate at which buckets are processed, and a function to invoke when items are scanned in
// a bucket.
// TODO: remove DEBUG statements from this file once this logic has been adequately validated.
-func NewScheduler(bucketCount int, bucketLimiter flowcontrol.RateLimiter, fn func(key, value interface{})) *Scheduler {
+func newScheduler(bucketCount int, bucketLimiter flowcontrol.RateLimiter, fn func(key, value interface{})) *scheduler {
// Add one more bucket to serve as the "current" bucket
bucketCount++
buckets := make([]bucket, bucketCount)
for i := range buckets {
buckets[i] = make(bucket)
}
-return &Scheduler{
+return &scheduler{
handle: fn,
buckets: buckets,
limiter: bucketLimiter,
}
}

// RunUntil launches the scheduler until ch is closed.
-func (s *Scheduler) RunUntil(ch <-chan struct{}) {
+func (s *scheduler) RunUntil(ch <-chan struct{}) {
go utilwait.Until(s.RunOnce, 0, ch)
}

// RunOnce takes a single item out of the current bucket and processes it. If
// the bucket is empty, we wait for the rate limiter before returning.
-func (s *Scheduler) RunOnce() {
+func (s *scheduler) RunOnce() {
key, value, last := s.next()
if last {
s.limiter.Accept()
@@ -58,27 +65,25 @@ }
}

// at returns the bucket index relative to the current bucket.
-func (s *Scheduler) at(inc int) int {
+func (s *scheduler) at(inc int) int {
return (s.position + inc + len(s.buckets)) % len(s.buckets)
}

// next takes a key from the current bucket, places it in the last bucket, and
// returns the removed key and value. It returns true if the current bucket was
// empty and no key or value was returned.
-func (s *Scheduler) next() (interface{}, interface{}, bool) {
+func (s *scheduler) next() (interface{}, interface{}, bool) {
s.mu.Lock()
defer s.mu.Unlock()

last := s.buckets[s.position]
-if len(last) == 0 {
-	s.position = s.at(1)
-	last = s.buckets[s.position]
-}

// Grab the first item in the bucket, move it to the end and return it.
for k, v := range last {
delete(last, k)
s.buckets[s.at(-1)][k] = v
return k, v, false
}
+// The bucket was empty. Advance to the next bucket.
+s.position = s.at(1)
return nil, nil, true
}

@@ -87,7 +92,7 @@ func (s *Scheduler) next() (interface{}, interface{}, bool) {
// removes the previous key and value and will place the item in a new bucket. This allows callers to ensure
// that Add'ing a new item to the queue purges old versions of the item, while Remove can be conditional on
// removing only the known old version.
-func (s *Scheduler) Add(key, value interface{}) {
+func (s *scheduler) Add(key, value interface{}) {
s.mu.Lock()
defer s.mu.Unlock()

@@ -116,7 +121,7 @@ func (s *Scheduler) Add(key, value interface{}) {

// Remove takes the key out of all buckets. If value is non-nil, the key will only be removed if it has
// the same value. Returns true if the key was removed.
-func (s *Scheduler) Remove(key, value interface{}) bool {
+func (s *scheduler) Remove(key, value interface{}) bool {
s.mu.Lock()
defer s.mu.Unlock()

@@ -134,7 +139,7 @@ }
}

// Delay moves the key to the end of the chain if it exists.
-func (s *Scheduler) Delay(key interface{}) {
+func (s *scheduler) Delay(key interface{}) {
s.mu.Lock()
defer s.mu.Unlock()

@@ -151,7 +156,7 @@ }
}

// Len returns the number of scheduled items.
-func (s *Scheduler) Len() int {
+func (s *scheduler) Len() int {
s.mu.Lock()
defer s.mu.Unlock()

@@ -164,7 +169,7 @@ }

// Map returns a copy of the scheduler contents, but does not copy the keys or values themselves.
// If values and keys are not immutable, changing the value will affect the value in the queue.
-func (s *Scheduler) Map() map[interface{}]interface{} {
+func (s *scheduler) Map() map[interface{}]interface{} {
s.mu.Lock()
defer s.mu.Unlock()

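Aside, not part of this PR: the NOTE added to scheduler.go points end users at k8s.io/client-go/util/workqueue.NewDelayingQueue. A minimal sketch of that alternative, assuming the interface{}-based workqueue API of this client-go vintage (the queue key "ns/stream-a" is only an illustrative placeholder):

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// A delaying queue hands an item back out of Get only once its delay
	// has elapsed, which covers the "re-import this stream later" use case.
	q := workqueue.NewDelayingQueue()
	defer q.ShutDown()

	// Illustrative key; real callers would use a namespace/name string.
	q.AddAfter("ns/stream-a", 50*time.Millisecond)

	// Get blocks until an item is ready or the queue has been shut down.
	item, shutdown := q.Get()
	if !shutdown {
		fmt.Println("ready:", item)
		q.Done(item) // mark processing of the item as finished
	}
}

Unlike the bucketed scheduler, a delaying queue does not re-queue anything on its own; a caller that wants periodic re-imports would call AddAfter again after finishing each item.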
123 changes: 113 additions & 10 deletions pkg/image/controller/scheduler_test.go
@@ -1,15 +1,20 @@
package controller

import (
"container/heap"
"fmt"
"reflect"
"sync"
"testing"
"time"

"github.com/juju/ratelimit"
flowcontrol "k8s.io/client-go/util/flowcontrol"
)

func TestScheduler(t *testing.T) {
keys := []string{}
-s := NewScheduler(2, flowcontrol.NewFakeAlwaysRateLimiter(), func(key, value interface{}) {
+s := newScheduler(2, flowcontrol.NewFakeAlwaysRateLimiter(), func(key, value interface{}) {
keys = append(keys, key.(string))
})

@@ -39,14 +44,11 @@ func TestScheduler(t *testing.T) {
t.Fatal("expected to find key in a bucket")
}

-for i := 0; i < 10; i++ {
-	s.Delay("first")
-	if _, ok := s.buckets[(s.position-1+len(s.buckets))%len(s.buckets)]["first"]; !ok {
-		t.Fatal("key was not in the last bucket")
-	}
-}
+s.Delay("first")

-s.RunOnce()
+for i := 0; i < 2; i++ { // Delay shouldn't have put the item in the current bucket
+	s.RunOnce()
+}
if len(keys) != 0 {
t.Fatal(keys)
}
@@ -57,7 +59,7 @@ }
}

func TestSchedulerAddAndDelay(t *testing.T) {
-s := NewScheduler(3, flowcontrol.NewFakeAlwaysRateLimiter(), func(key, value interface{}) {})
+s := newScheduler(3, flowcontrol.NewFakeAlwaysRateLimiter(), func(key, value interface{}) {})
// 3 is the last bucket, 0 is the current bucket
s.Add("first", "other")
if s.buckets[3]["first"] != "other" {
@@ -103,7 +105,7 @@ func TestSchedulerAddAndDelay(t *testing.T) {
}

func TestSchedulerRemove(t *testing.T) {
-s := NewScheduler(2, flowcontrol.NewFakeAlwaysRateLimiter(), func(key, value interface{}) {})
+s := newScheduler(2, flowcontrol.NewFakeAlwaysRateLimiter(), func(key, value interface{}) {})
s.Add("test", "other")
if s.Remove("test", "value") {
t.Fatal(s)
@@ -126,3 +128,104 @@ func TestSchedulerRemove(t *testing.T) {
t.Fatal(s)
}
}

type int64Heap []int64

var _ heap.Interface = &int64Heap{}

func (h int64Heap) Len() int { return len(h) }
func (h int64Heap) Less(i, j int) bool { return h[i] < h[j] }
func (h int64Heap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *int64Heap) Push(x interface{}) { *h = append(*h, x.(int64)) }
func (h *int64Heap) Pop() interface{} {
x := (*h)[len(*h)-1]
*h = (*h)[:len(*h)-1]
return x
}

type wallClock struct{}
[Contributor review comment on wallClock: "does not appear to be used"]

var _ ratelimit.Clock = &wallClock{}

func (c wallClock) Now() time.Time { return time.Now() }
func (c wallClock) Sleep(d time.Duration) { time.Sleep(d) }

// fakeClock implements ratelimit.Clock. Its time starts at the UNIX epoch.
// When all known threads are in Sleep(), its time advances just enough to wake
// the thread (or threads) that are due to wake first.
type fakeClock struct {
c sync.Cond
threads int
now int64
wake int64Heap
}

var _ ratelimit.Clock = &fakeClock{}

func newFakeClock(threads int) ratelimit.Clock {
return &fakeClock{threads: threads, c: sync.Cond{L: &sync.Mutex{}}}
}

func (c *fakeClock) Now() time.Time {
c.c.L.Lock()
defer c.c.L.Unlock()
return time.Unix(0, c.now)
}

func (c *fakeClock) Sleep(d time.Duration) {
c.c.L.Lock()
defer c.c.L.Unlock()
wake := c.now + int64(d)
heap.Push(&c.wake, wake)
if len(c.wake) == c.threads {
// everyone is asleep, advance the clock.
c.now = heap.Pop(&c.wake).(int64)
for len(c.wake) > 0 && c.wake[0] == c.now {
// pop any additional threads waiting on the same time.
heap.Pop(&c.wake)
}
c.c.Broadcast()
}
for c.now < wake {
c.c.Wait()
}
}

func TestSchedulerSanity(t *testing.T) {
const (
buckets = 4
items = 10
)

// if needed for testing, you can revert to using the wall clock via
// clock := &wallClock{}
clock := newFakeClock(2) // 2 threads: us and the scheduler.

// 1 token per second => one bucket's worth of items should get scheduled
// per second.
limiter := flowcontrol.NewTokenBucketRateLimiterWithClock(1, 1, clock)

m := map[int]int{}
s := newScheduler(buckets, limiter, func(key, value interface{}) {
fmt.Printf("%v: %v\n", clock.Now().UTC(), key)
m[key.(int)]++
})
for i := 0; i < items; i++ {
s.Add(i, nil)
}

go s.RunUntil(make(chan struct{}))

// run the clock just long enough to expect to have scheduled each item
// exactly twice.
clock.Sleep((2*buckets-1)*time.Second + 1)

expected := map[int]int{}
for i := 0; i < items; i++ {
expected[i] = 2
}

if !reflect.DeepEqual(m, expected) {
t.Errorf("m did not match expected: %#v\n", m)
}
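A rough walkthrough of the timing above, assuming flowcontrol's token bucket starts full: newScheduler adds one internal bucket, so the 10 items all start in the last of 5 buckets. The first bucket advance consumes the free initial token, and every further advance past an empty bucket costs one simulated second in Accept, so the first drain of all items lands at t = buckets-1 = 3s; one more lap of four advances puts the second drain at t = 2*buckets-1 = 7s. The extra nanosecond in clock.Sleep lets the main goroutine wake only after both passes have finished.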
}