Skip to content

Commit bcaf9bf

Browse files
authored
Merge pull request #11 from ploxiln/sync_depth
synchronized Depth call
2 parents de2db96 + 5e00766 commit bcaf9bf

File tree

3 files changed

+19
-10
lines changed

3 files changed

+19
-10
lines changed

.travis.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
language: go
22
go:
33
- 1.x
4+
script:
5+
- GOMAXPROCS=1 go test -v
6+
- GOMAXPROCS=4 go test -v -race
47
notifications:
58
email: false
6-

diskqueue.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"os"
1212
"path"
1313
"sync"
14-
"sync/atomic"
1514
"time"
1615
)
1716

@@ -92,6 +91,7 @@ type diskQueue struct {
9291
readChan chan []byte
9392

9493
// internal channels
94+
depthChan chan int64
9595
writeChan chan []byte
9696
writeResponseChan chan error
9797
emptyChan chan int
@@ -114,6 +114,7 @@ func New(name string, dataPath string, maxBytesPerFile int64,
114114
minMsgSize: minMsgSize,
115115
maxMsgSize: maxMsgSize,
116116
readChan: make(chan []byte),
117+
depthChan: make(chan int64),
117118
writeChan: make(chan []byte),
118119
writeResponseChan: make(chan error),
119120
emptyChan: make(chan int),
@@ -137,7 +138,7 @@ func New(name string, dataPath string, maxBytesPerFile int64,
137138

138139
// Depth returns the depth of the queue
139140
func (d *diskQueue) Depth() int64 {
140-
return atomic.LoadInt64(&d.depth)
141+
return <-d.depthChan
141142
}
142143

143144
// ReadChan returns the receive-only []byte channel for reading data
@@ -256,7 +257,7 @@ func (d *diskQueue) skipToNextRWFile() error {
256257
d.readPos = 0
257258
d.nextReadFileNum = d.writeFileNum
258259
d.nextReadPos = 0
259-
atomic.StoreInt64(&d.depth, 0)
260+
d.depth = 0
260261

261262
return err
262263
}
@@ -385,7 +386,7 @@ func (d *diskQueue) writeOne(data []byte) error {
385386

386387
totalBytes := int64(4 + dataLen)
387388
d.writePos += totalBytes
388-
atomic.AddInt64(&d.depth, 1)
389+
d.depth += 1
389390

390391
if d.writePos >= d.maxBytesPerFile {
391392
d.writeFileNum++
@@ -446,7 +447,7 @@ func (d *diskQueue) retrieveMetaData() error {
446447
if err != nil {
447448
return err
448449
}
449-
atomic.StoreInt64(&d.depth, depth)
450+
d.depth = depth
450451
d.nextReadFileNum = d.readFileNum
451452
d.nextReadPos = d.readPos
452453

@@ -468,7 +469,7 @@ func (d *diskQueue) persistMetaData() error {
468469
}
469470

470471
_, err = fmt.Fprintf(f, "%d\n%d,%d\n%d,%d\n",
471-
atomic.LoadInt64(&d.depth),
472+
d.depth,
472473
d.readFileNum, d.readPos,
473474
d.writeFileNum, d.writePos)
474475
if err != nil {
@@ -508,7 +509,7 @@ func (d *diskQueue) checkTailCorruption(depth int64) {
508509
d.name, depth)
509510
}
510511
// force set depth 0
511-
atomic.StoreInt64(&d.depth, 0)
512+
d.depth = 0
512513
d.needSync = true
513514
}
514515

@@ -534,7 +535,7 @@ func (d *diskQueue) moveForward() {
534535
oldReadFileNum := d.readFileNum
535536
d.readFileNum = d.nextReadFileNum
536537
d.readPos = d.nextReadPos
537-
depth := atomic.AddInt64(&d.depth, -1)
538+
d.depth -= 1
538539

539540
// see if we need to clean up the old file
540541
if oldReadFileNum != d.nextReadFileNum {
@@ -548,7 +549,7 @@ func (d *diskQueue) moveForward() {
548549
}
549550
}
550551

551-
d.checkTailCorruption(depth)
552+
d.checkTailCorruption(d.depth)
552553
}
553554

554555
func (d *diskQueue) handleReadError() {
@@ -639,6 +640,7 @@ func (d *diskQueue) ioLoop() {
639640
count++
640641
// moveForward sets needSync flag if a file is removed
641642
d.moveForward()
643+
case d.depthChan <- d.depth:
642644
case <-d.emptyChan:
643645
d.emptyResponseChan <- d.deleteAllFiles()
644646
count = 0

diskqueue_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,11 @@ func TestDiskQueueRoll(t *testing.T) {
123123

124124
Equal(t, int64(1), dq.(*diskQueue).writeFileNum)
125125
Equal(t, int64(0), dq.(*diskQueue).writePos)
126+
127+
for i := 10; i > 0; i-- {
128+
Equal(t, msg, <-dq.ReadChan())
129+
Equal(t, int64(i-1), dq.Depth())
130+
}
126131
}
127132

128133
func assertFileNotExist(t *testing.T, fn string) {

0 commit comments

Comments
 (0)