Skip to content

Commit 5e00766

Browse files
committed
replace atomic ops for depth with a channel
ioLoop() now feeds depth into the depthChan synchronously, only when Depth() is trying to read depthChan
1 parent af256ae commit 5e00766

File tree

1 file changed

+11
-9
lines changed

1 file changed

+11
-9
lines changed

diskqueue.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"os"
1212
"path"
1313
"sync"
14-
"sync/atomic"
1514
"time"
1615
)
1716

@@ -92,6 +91,7 @@ type diskQueue struct {
9291
readChan chan []byte
9392

9493
// internal channels
94+
depthChan chan int64
9595
writeChan chan []byte
9696
writeResponseChan chan error
9797
emptyChan chan int
@@ -114,6 +114,7 @@ func New(name string, dataPath string, maxBytesPerFile int64,
114114
minMsgSize: minMsgSize,
115115
maxMsgSize: maxMsgSize,
116116
readChan: make(chan []byte),
117+
depthChan: make(chan int64),
117118
writeChan: make(chan []byte),
118119
writeResponseChan: make(chan error),
119120
emptyChan: make(chan int),
@@ -137,7 +138,7 @@ func New(name string, dataPath string, maxBytesPerFile int64,
137138

138139
// Depth returns the depth of the queue
139140
func (d *diskQueue) Depth() int64 {
140-
return atomic.LoadInt64(&d.depth)
141+
return <-d.depthChan
141142
}
142143

143144
// ReadChan returns the receive-only []byte channel for reading data
@@ -256,7 +257,7 @@ func (d *diskQueue) skipToNextRWFile() error {
256257
d.readPos = 0
257258
d.nextReadFileNum = d.writeFileNum
258259
d.nextReadPos = 0
259-
atomic.StoreInt64(&d.depth, 0)
260+
d.depth = 0
260261

261262
return err
262263
}
@@ -385,7 +386,7 @@ func (d *diskQueue) writeOne(data []byte) error {
385386

386387
totalBytes := int64(4 + dataLen)
387388
d.writePos += totalBytes
388-
atomic.AddInt64(&d.depth, 1)
389+
d.depth += 1
389390

390391
if d.writePos >= d.maxBytesPerFile {
391392
d.writeFileNum++
@@ -446,7 +447,7 @@ func (d *diskQueue) retrieveMetaData() error {
446447
if err != nil {
447448
return err
448449
}
449-
atomic.StoreInt64(&d.depth, depth)
450+
d.depth = depth
450451
d.nextReadFileNum = d.readFileNum
451452
d.nextReadPos = d.readPos
452453

@@ -468,7 +469,7 @@ func (d *diskQueue) persistMetaData() error {
468469
}
469470

470471
_, err = fmt.Fprintf(f, "%d\n%d,%d\n%d,%d\n",
471-
atomic.LoadInt64(&d.depth),
472+
d.depth,
472473
d.readFileNum, d.readPos,
473474
d.writeFileNum, d.writePos)
474475
if err != nil {
@@ -508,7 +509,7 @@ func (d *diskQueue) checkTailCorruption(depth int64) {
508509
d.name, depth)
509510
}
510511
// force set depth 0
511-
atomic.StoreInt64(&d.depth, 0)
512+
d.depth = 0
512513
d.needSync = true
513514
}
514515

@@ -534,7 +535,7 @@ func (d *diskQueue) moveForward() {
534535
oldReadFileNum := d.readFileNum
535536
d.readFileNum = d.nextReadFileNum
536537
d.readPos = d.nextReadPos
537-
depth := atomic.AddInt64(&d.depth, -1)
538+
d.depth -= 1
538539

539540
// see if we need to clean up the old file
540541
if oldReadFileNum != d.nextReadFileNum {
@@ -548,7 +549,7 @@ func (d *diskQueue) moveForward() {
548549
}
549550
}
550551

551-
d.checkTailCorruption(depth)
552+
d.checkTailCorruption(d.depth)
552553
}
553554

554555
func (d *diskQueue) handleReadError() {
@@ -639,6 +640,7 @@ func (d *diskQueue) ioLoop() {
639640
count++
640641
// moveForward sets needSync flag if a file is removed
641642
d.moveForward()
643+
case d.depthChan <- d.depth:
642644
case <-d.emptyChan:
643645
d.emptyResponseChan <- d.deleteAllFiles()
644646
count = 0

0 commit comments

Comments
 (0)