Skip to content

Commit 519b94b

Browse files
author
Jim Minter
committed
unexport scheduler and add health warning
1 parent aeddc60 commit 519b94b

File tree

4 files changed

+27
-20
lines changed

4 files changed

+27
-20
lines changed

pkg/image/controller/factory.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func NewScheduledImageStreamController(client imageclient.Interface, informer im
9292
listerSynced: informer.Informer().HasSynced,
9393
}
9494

95-
controller.scheduler = NewScheduler(opts.Buckets(), bucketLimiter, controller.syncTimed)
95+
controller.scheduler = newScheduler(opts.Buckets(), bucketLimiter, controller.syncTimed)
9696

9797
informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
9898
AddFunc: controller.addImageStream,

pkg/image/controller/scheduled_image_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ type ScheduledImageStreamController struct {
3636
rateLimiter flowcontrol.RateLimiter
3737

3838
// scheduler for timely image re-imports
39-
scheduler *Scheduler
39+
scheduler *scheduler
4040
}
4141

4242
// Importing is invoked when the controller decides to import a stream in order to push back

pkg/image/controller/scheduler.go

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,19 @@ import (
77
"k8s.io/client-go/util/flowcontrol"
88
)
99

10-
// Scheduler is a self-balancing, rate-limited, bucketed queue that can periodically invoke
10+
// NOTE: scheduler's semantics do not lend it for reuse elsewhere and its use in
11+
// this package quite probably has some odd corner cases/race conditions. If
12+
// these cause problems in the future, this implementation should be replaced
13+
// with a new and simpler one based on container/heap. End users looking for a
14+
// component like this: see if k8s.io/client-go/util/workqueue.NewDelayingQueue
15+
// suits your needs.
16+
17+
// scheduler is a self-balancing, rate-limited, bucketed queue that can periodically invoke
1118
// an action on all items in a bucket before moving to the next bucket. A ratelimiter sets
1219
// an upper bound on the number of buckets processed per unit time. The queue has a key and a
1320
// value, so both uniqueness and equality can be tested (key must be unique, value can carry
1421
// info for the next processing). Items remain in the queue until removed by a call to Remove().
15-
type Scheduler struct {
22+
type scheduler struct {
1623
handle func(key, value interface{})
1724
position int
1825
limiter flowcontrol.RateLimiter
@@ -23,32 +30,32 @@ type Scheduler struct {
2330

2431
type bucket map[interface{}]interface{}
2532

26-
// NewScheduler creates a scheduler with bucketCount buckets, a rate limiter for restricting
33+
// newScheduler creates a scheduler with bucketCount buckets, a rate limiter for restricting
2734
// the rate at which buckets are processed, and a function to invoke when items are scanned in
2835
// a bucket.
2936
// TODO: remove DEBUG statements from this file once this logic has been adequately validated.
30-
func NewScheduler(bucketCount int, bucketLimiter flowcontrol.RateLimiter, fn func(key, value interface{})) *Scheduler {
37+
func newScheduler(bucketCount int, bucketLimiter flowcontrol.RateLimiter, fn func(key, value interface{})) *scheduler {
3138
// Add one more bucket to serve as the "current" bucket
3239
bucketCount++
3340
buckets := make([]bucket, bucketCount)
3441
for i := range buckets {
3542
buckets[i] = make(bucket)
3643
}
37-
return &Scheduler{
44+
return &scheduler{
3845
handle: fn,
3946
buckets: buckets,
4047
limiter: bucketLimiter,
4148
}
4249
}
4350

4451
// RunUntil launches the scheduler until ch is closed.
45-
func (s *Scheduler) RunUntil(ch <-chan struct{}) {
52+
func (s *scheduler) RunUntil(ch <-chan struct{}) {
4653
go utilwait.Until(s.RunOnce, 0, ch)
4754
}
4855

4956
// RunOnce takes a single item out of the current bucket and processes it. If
5057
// the bucket is empty, we wait for the rate limiter before returning.
51-
func (s *Scheduler) RunOnce() {
58+
func (s *scheduler) RunOnce() {
5259
key, value, last := s.next()
5360
if last {
5461
s.limiter.Accept()
@@ -58,13 +65,13 @@ func (s *Scheduler) RunOnce() {
5865
}
5966

6067
// at returns the bucket index relative to the current bucket.
61-
func (s *Scheduler) at(inc int) int {
68+
func (s *scheduler) at(inc int) int {
6269
return (s.position + inc + len(s.buckets)) % len(s.buckets)
6370
}
6471

6572
// next takes a key from the current bucket and places it in the last bucket, returns the
6673
// removed key. Returns true if the current bucket is empty and no key and value were returned.
67-
func (s *Scheduler) next() (interface{}, interface{}, bool) {
74+
func (s *scheduler) next() (interface{}, interface{}, bool) {
6875
s.mu.Lock()
6976
defer s.mu.Unlock()
7077

@@ -85,7 +92,7 @@ func (s *Scheduler) next() (interface{}, interface{}, bool) {
8592
// removes the previous key and value and will place the item in a new bucket. This allows callers to ensure
8693
// that Add'ing a new item to the queue purges old versions of the item, while Remove can be conditional on
8794
// removing only the known old version.
88-
func (s *Scheduler) Add(key, value interface{}) {
95+
func (s *scheduler) Add(key, value interface{}) {
8996
s.mu.Lock()
9097
defer s.mu.Unlock()
9198

@@ -114,7 +121,7 @@ func (s *Scheduler) Add(key, value interface{}) {
114121

115122
// Remove takes the key out of all buckets. If value is non-nil, the key will only be removed if it has
116123
// the same value. Returns true if the key was removed.
117-
func (s *Scheduler) Remove(key, value interface{}) bool {
124+
func (s *scheduler) Remove(key, value interface{}) bool {
118125
s.mu.Lock()
119126
defer s.mu.Unlock()
120127

@@ -132,7 +139,7 @@ func (s *Scheduler) Remove(key, value interface{}) bool {
132139
}
133140

134141
// Delay moves the key to the end of the chain if it exists.
135-
func (s *Scheduler) Delay(key interface{}) {
142+
func (s *scheduler) Delay(key interface{}) {
136143
s.mu.Lock()
137144
defer s.mu.Unlock()
138145

@@ -149,7 +156,7 @@ func (s *Scheduler) Delay(key interface{}) {
149156
}
150157

151158
// Len returns the number of scheduled items.
152-
func (s *Scheduler) Len() int {
159+
func (s *scheduler) Len() int {
153160
s.mu.Lock()
154161
defer s.mu.Unlock()
155162

@@ -162,7 +169,7 @@ func (s *Scheduler) Len() int {
162169

163170
// Map returns a copy of the scheduler contents, but does not copy the keys or values themselves.
164171
// If values and keys are not immutable, changing the value will affect the value in the queue.
165-
func (s *Scheduler) Map() map[interface{}]interface{} {
172+
func (s *scheduler) Map() map[interface{}]interface{} {
166173
s.mu.Lock()
167174
defer s.mu.Unlock()
168175

pkg/image/controller/scheduler_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414

1515
func TestScheduler(t *testing.T) {
1616
keys := []string{}
17-
s := NewScheduler(2, flowcontrol.NewFakeAlwaysRateLimiter(), func(key, value interface{}) {
17+
s := newScheduler(2, flowcontrol.NewFakeAlwaysRateLimiter(), func(key, value interface{}) {
1818
keys = append(keys, key.(string))
1919
})
2020

@@ -59,7 +59,7 @@ func TestScheduler(t *testing.T) {
5959
}
6060

6161
func TestSchedulerAddAndDelay(t *testing.T) {
62-
s := NewScheduler(3, flowcontrol.NewFakeAlwaysRateLimiter(), func(key, value interface{}) {})
62+
s := newScheduler(3, flowcontrol.NewFakeAlwaysRateLimiter(), func(key, value interface{}) {})
6363
// 3 is the last bucket, 0 is the current bucket
6464
s.Add("first", "other")
6565
if s.buckets[3]["first"] != "other" {
@@ -105,7 +105,7 @@ func TestSchedulerAddAndDelay(t *testing.T) {
105105
}
106106

107107
func TestSchedulerRemove(t *testing.T) {
108-
s := NewScheduler(2, flowcontrol.NewFakeAlwaysRateLimiter(), func(key, value interface{}) {})
108+
s := newScheduler(2, flowcontrol.NewFakeAlwaysRateLimiter(), func(key, value interface{}) {})
109109
s.Add("test", "other")
110110
if s.Remove("test", "value") {
111111
t.Fatal(s)
@@ -204,7 +204,7 @@ func TestSchedulerSanity(t *testing.T) {
204204
limiter := flowcontrol.NewTokenBucketRateLimiterWithClock(1, 1, clock)
205205

206206
m := map[int]int{}
207-
s := NewScheduler(buckets, limiter, func(key, value interface{}) {
207+
s := newScheduler(buckets, limiter, func(key, value interface{}) {
208208
fmt.Printf("%v: %v\n", clock.Now().UTC(), key)
209209
m[key.(int)]++
210210
})

0 commit comments

Comments
 (0)