Skip to content

Commit 74864cb

Browse files
authored
Revert "Using atomic instead of mutex and delete scratch slice" (#1846)
* Remove a redundant field and clarify the comments. * Revert "Using atomic instead of mutex and delete scratch slice (#1833)" This reverts commit 19c50cd.
1 parent 1d8ad87 commit 74864cb

File tree

1 file changed

+91
-74
lines changed

1 file changed

+91
-74
lines changed

workerpool.go

Lines changed: 91 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"runtime"
77
"strings"
88
"sync"
9-
"sync/atomic"
109
"time"
1110
)
1211

@@ -22,57 +21,29 @@ type workerPool struct {
2221

2322
// Function for serving server connections.
2423
// It must leave c unclosed.
25-
ready workerChanStack
2624
WorkerFunc ServeHandler
2725

2826
stopCh chan struct{}
2927

3028
connState func(net.Conn, ConnState)
3129

30+
ready []*workerChan
31+
3232
MaxWorkersCount int
3333

3434
MaxIdleWorkerDuration time.Duration
3535

36-
workersCount int32
36+
workersCount int
3737

38-
mustStop atomic.Bool
38+
lock sync.Mutex
3939

4040
LogAllErrors bool
41+
mustStop bool
4142
}
4243

4344
type workerChan struct {
44-
next *workerChan
45-
46-
ch chan net.Conn
47-
48-
lastUseTime int64
49-
}
50-
51-
type workerChanStack struct {
52-
head atomic.Pointer[workerChan]
53-
}
54-
55-
func (s *workerChanStack) push(ch *workerChan) {
56-
for {
57-
oldHead := s.head.Load()
58-
ch.next = oldHead
59-
if s.head.CompareAndSwap(oldHead, ch) {
60-
break
61-
}
62-
}
63-
}
64-
65-
func (s *workerChanStack) pop() *workerChan {
66-
for {
67-
oldHead := s.head.Load()
68-
if oldHead == nil {
69-
return nil
70-
}
71-
72-
if s.head.CompareAndSwap(oldHead, oldHead.next) {
73-
return oldHead
74-
}
75-
}
45+
lastUseTime time.Time
46+
ch chan net.Conn
7647
}
7748

7849
func (wp *workerPool) Start() {
@@ -87,8 +58,9 @@ func (wp *workerPool) Start() {
8758
}
8859
}
8960
go func() {
61+
var scratch []*workerChan
9062
for {
91-
wp.clean()
63+
wp.clean(&scratch)
9264
select {
9365
case <-stopCh:
9466
return
@@ -109,15 +81,15 @@ func (wp *workerPool) Stop() {
10981
// Stop all the workers waiting for incoming connections.
11082
// Do not wait for busy workers - they will stop after
11183
// serving the connection and noticing wp.mustStop = true.
112-
113-
for {
114-
ch := wp.ready.pop()
115-
if ch == nil {
116-
break
117-
}
118-
ch.ch <- nil
84+
wp.lock.Lock()
85+
ready := wp.ready
86+
for i := range ready {
87+
ready[i].ch <- nil
88+
ready[i] = nil
11989
}
120-
wp.mustStop.Store(true)
90+
wp.ready = ready[:0]
91+
wp.mustStop = true
92+
wp.lock.Unlock()
12193
}
12294

12395
func (wp *workerPool) getMaxIdleWorkerDuration() time.Duration {
@@ -127,22 +99,50 @@ func (wp *workerPool) getMaxIdleWorkerDuration() time.Duration {
12799
return wp.MaxIdleWorkerDuration
128100
}
129101

130-
func (wp *workerPool) clean() {
102+
func (wp *workerPool) clean(scratch *[]*workerChan) {
131103
maxIdleWorkerDuration := wp.getMaxIdleWorkerDuration()
132-
criticalTime := time.Now().Add(-maxIdleWorkerDuration).UnixNano()
133104

134-
for {
135-
current := wp.ready.head.Load()
136-
if current == nil || atomic.LoadInt64(&current.lastUseTime) >= criticalTime {
137-
break
138-
}
105+
// Clean least recently used workers if they didn't serve connections
106+
// for more than maxIdleWorkerDuration.
107+
criticalTime := time.Now().Add(-maxIdleWorkerDuration)
108+
109+
wp.lock.Lock()
110+
ready := wp.ready
111+
n := len(ready)
139112

140-
next := current.next
141-
if wp.ready.head.CompareAndSwap(current, next) {
142-
current.ch <- nil
143-
wp.workerChanPool.Put(current)
113+
// Use binary-search algorithm to find out the index of the least recently worker which can be cleaned up.
114+
l, r := 0, n-1
115+
for l <= r {
116+
mid := (l + r) / 2
117+
if criticalTime.After(wp.ready[mid].lastUseTime) {
118+
l = mid + 1
119+
} else {
120+
r = mid - 1
144121
}
145122
}
123+
i := r
124+
if i == -1 {
125+
wp.lock.Unlock()
126+
return
127+
}
128+
129+
*scratch = append((*scratch)[:0], ready[:i+1]...)
130+
m := copy(ready, ready[i+1:])
131+
for i = m; i < n; i++ {
132+
ready[i] = nil
133+
}
134+
wp.ready = ready[:m]
135+
wp.lock.Unlock()
136+
137+
// Notify obsolete workers to stop.
138+
// This notification must be outside the wp.lock, since ch.ch
139+
// may be blocking and may consume a lot of time if many workers
140+
// are located on non-local CPUs.
141+
tmp := *scratch
142+
for i := range tmp {
143+
tmp[i].ch <- nil
144+
tmp[i] = nil
145+
}
146146
}
147147

148148
func (wp *workerPool) Serve(c net.Conn) bool {
@@ -169,32 +169,47 @@ var workerChanCap = func() int {
169169
}()
170170

171171
func (wp *workerPool) getCh() *workerChan {
172-
for {
173-
ch := wp.ready.pop()
174-
if ch != nil {
175-
return ch
172+
var ch *workerChan
173+
createWorker := false
174+
175+
wp.lock.Lock()
176+
ready := wp.ready
177+
n := len(ready) - 1
178+
if n < 0 {
179+
if wp.workersCount < wp.MaxWorkersCount {
180+
createWorker = true
181+
wp.workersCount++
176182
}
183+
} else {
184+
ch = ready[n]
185+
ready[n] = nil
186+
wp.ready = ready[:n]
187+
}
188+
wp.lock.Unlock()
177189

178-
currentWorkers := atomic.LoadInt32(&wp.workersCount)
179-
if int(currentWorkers) < wp.MaxWorkersCount {
180-
if atomic.CompareAndSwapInt32(&wp.workersCount, currentWorkers, currentWorkers+1) {
181-
ch = wp.workerChanPool.Get().(*workerChan)
182-
go wp.workerFunc(ch)
183-
return ch
184-
}
185-
} else {
186-
break
190+
if ch == nil {
191+
if !createWorker {
192+
return nil
187193
}
194+
vch := wp.workerChanPool.Get()
195+
ch = vch.(*workerChan)
196+
go func() {
197+
wp.workerFunc(ch)
198+
wp.workerChanPool.Put(vch)
199+
}()
188200
}
189-
return nil
201+
return ch
190202
}
191203

192204
func (wp *workerPool) release(ch *workerChan) bool {
193-
atomic.StoreInt64(&ch.lastUseTime, time.Now().UnixNano())
194-
if wp.mustStop.Load() {
205+
ch.lastUseTime = time.Now()
206+
wp.lock.Lock()
207+
if wp.mustStop {
208+
wp.lock.Unlock()
195209
return false
196210
}
197-
wp.ready.push(ch)
211+
wp.ready = append(wp.ready, ch)
212+
wp.lock.Unlock()
198213
return true
199214
}
200215

@@ -230,5 +245,7 @@ func (wp *workerPool) workerFunc(ch *workerChan) {
230245
}
231246
}
232247

233-
atomic.AddInt32(&wp.workersCount, -1)
248+
wp.lock.Lock()
249+
wp.workersCount--
250+
wp.lock.Unlock()
234251
}

0 commit comments

Comments
 (0)