diff --git a/diskqueue.go b/diskqueue.go index f01ca71..19f43fd 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -279,12 +279,44 @@ func (d *diskQueue) skipToNextRWFile() error { return err } +func (d *diskQueue) readRollFile() { + d.nextReadFileNum++ + d.nextReadPos = 0 + + oldReadFileNum := d.readFileNum + d.readFileNum = d.nextReadFileNum + d.readPos = d.nextReadPos + + // sync every time we start reading from a new file + d.needSync = true + + fn := d.fileName(oldReadFileNum) + err := os.Remove(fn) + if err != nil { + d.logf(ERROR, "DISKQUEUE(%s) failed to Remove(%s) - %s", d.name, fn, err) + } + + d.checkTailCorruption(d.depth) +} + // readOne performs a low level filesystem read for a single []byte // while advancing read positions and rolling files, if necessary func (d *diskQueue) readOne() ([]byte, error) { var err error var msgSize int32 + // we only consider rotating if we're reading a "complete" file + // and since we cannot know the size at which it was rotated, we + // rely on maxBytesPerFileRead rather than maxBytesPerFile + if d.readFileNum < d.writeFileNum && d.nextReadPos >= d.maxBytesPerFileRead && d.readFile != nil { + if d.readFile != nil { + d.readFile.Close() + d.readFile = nil + } + + d.readRollFile() + } + if d.readFile == nil { curFileName := d.fileName(d.readFileNum) d.readFile, err = os.OpenFile(curFileName, os.O_RDONLY, 0600) @@ -346,19 +378,6 @@ func (d *diskQueue) readOne() ([]byte, error) { d.nextReadPos = d.readPos + totalBytes d.nextReadFileNum = d.readFileNum - // we only consider rotating if we're reading a "complete" file - // and since we cannot know the size at which it was rotated, we - // rely on maxBytesPerFileRead rather than maxBytesPerFile - if d.readFileNum < d.writeFileNum && d.nextReadPos >= d.maxBytesPerFileRead { - if d.readFile != nil { - d.readFile.Close() - d.readFile = nil - } - - d.nextReadFileNum++ - d.nextReadPos = 0 - } - return readBuf, nil } diff --git a/diskqueue_test.go b/diskqueue_test.go index 9d9ae0b..8480c26 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -77,6 +77,17 @@ func NewTestLogger(tbl tbLog) AppLogFunc { } } +func NewTestLoggerFatalAtError(tbl tbLog, fatalChan chan error) AppLogFunc { + return func(lvl LogLevel, f string, args ...interface{}) { + if lvl == ERROR || lvl == FATAL { + // tbl.Fatal(fmt.Sprintf(lvl.String()+": "+f, args...)) + fatalChan <- fmt.Errorf(lvl.String()+": "+f, args...) + } + + tbl.Log(fmt.Sprintf(lvl.String()+": "+f, args...)) + } +} + func TestDiskQueue(t *testing.T) { l := NewTestLogger(t) @@ -573,6 +584,37 @@ func TestDiskQueueResize(t *testing.T) { } } +func TestWriteRollReadEOF(t *testing.T) { + fatalChan := make(chan error, 1) + l := NewTestLoggerFatalAtError(t, fatalChan) + + dqName := "test_disk_queue" + strconv.Itoa(int(time.Now().Unix())) + tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + if err != nil { + panic(err) + } + defer os.RemoveAll(tmpDir) + dq := New(dqName, tmpDir, 1024, 4, 1<<10, 2500, 2*time.Second, l) + defer dq.Close() + NotNil(t, dq) + Equal(t, int64(0), dq.Depth()) + + for i := 0; i < 205; i++ { // 204 messages fit, but message 205 will be too big + msg := []byte(fmt.Sprintf("%05d", i)) // 5 bytes + err = dq.Put(msg) + + msgOut := <-dq.ReadChan() + Equal(t, msg, msgOut) + } + + select { + case err = <-fatalChan: + t.Fatal(err) + default: + } + +} + func BenchmarkDiskQueuePut16(b *testing.B) { benchmarkDiskQueuePut(16, b) }