diff --git a/diskqueue.go b/diskqueue.go index 9a982a5..ee6c22d 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -358,6 +358,33 @@ func (d *diskQueue) readOne() ([]byte, error) { func (d *diskQueue) writeOne(data []byte) error { var err error + dataLen := int32(len(data)) + totalBytes := int64(4 + dataLen) + + if dataLen < d.minMsgSize || dataLen > d.maxMsgSize { + return fmt.Errorf("invalid message write size (%d) minMsgSize=%d maxMsgSize=%d", dataLen, d.minMsgSize, d.maxMsgSize) + } + + // will not wrap-around if maxBytesPerFile + maxMsgSize < Int64Max + if d.writePos > 0 && d.writePos+totalBytes > d.maxBytesPerFile { + if d.readFileNum == d.writeFileNum { + d.maxBytesPerFileRead = d.writePos + } + + d.writeFileNum++ + d.writePos = 0 + + // sync every time we start writing to a new file + err = d.sync() + if err != nil { + d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err) + } + + if d.writeFile != nil { + d.writeFile.Close() + d.writeFile = nil + } + } if d.writeFile == nil { curFileName := d.fileName(d.writeFileNum) d.writeFile, err = os.OpenFile(curFileName, os.O_RDWR|os.O_CREATE, 0600) @@ -377,12 +404,6 @@ func (d *diskQueue) writeOne(data []byte) error { } } - dataLen := int32(len(data)) - - if dataLen < d.minMsgSize || dataLen > d.maxMsgSize { - return fmt.Errorf("invalid message write size (%d) minMsgSize=%d maxMsgSize=%d", dataLen, d.minMsgSize, d.maxMsgSize) - } - d.writeBuf.Reset() err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen) if err != nil { @@ -402,30 +423,9 @@ func (d *diskQueue) writeOne(data []byte) error { return err } - totalBytes := int64(4 + dataLen) d.writePos += totalBytes d.depth += 1 - if d.writePos >= d.maxBytesPerFile { - if d.readFileNum == d.writeFileNum { - d.maxBytesPerFileRead = d.writePos - } - - d.writeFileNum++ - d.writePos = 0 - - // sync every time we start writing to a new file - err = d.sync() - if err != nil { - d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err) - } - - if d.writeFile != nil { - d.writeFile.Close() - d.writeFile = nil - } - } - return err } diff --git a/diskqueue_test.go b/diskqueue_test.go index c3981d4..fc72406 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -108,23 +108,23 @@ func TestDiskQueueRoll(t *testing.T) { panic(err) } defer os.RemoveAll(tmpDir) - msg := bytes.Repeat([]byte{0}, 10) + msg := []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 0} ml := int64(len(msg)) dq := New(dqName, tmpDir, 10*(ml+4), int32(ml), 1<<10, 2500, 2*time.Second, l) defer dq.Close() NotNil(t, dq) Equal(t, int64(0), dq.Depth()) - for i := 0; i < 10; i++ { + for i := 0; i < 11; i++ { err := dq.Put(msg) Nil(t, err) Equal(t, int64(i+1), dq.Depth()) } Equal(t, int64(1), dq.(*diskQueue).writeFileNum) - Equal(t, int64(0), dq.(*diskQueue).writePos) + Equal(t, int64(ml+4), dq.(*diskQueue).writePos) - for i := 10; i > 0; i-- { + for i := 11; i > 0; i-- { Equal(t, msg, <-dq.ReadChan()) Equal(t, int64(i-1), dq.Depth()) } @@ -216,7 +216,11 @@ func TestDiskQueueCorruption(t *testing.T) { dq := New(dqName, tmpDir, 1000, 10, 1<<10, 5, 2*time.Second, l) defer dq.Close() - msg := make([]byte, 123) // 127 bytes per message, 8 (1016 bytes) messages per file + msg := make([]byte, 120) // 124 bytes per message, 8 messages (992 bytes) per file + msg[0] = 91 + msg[62] = 4 + msg[119] = 211 + for i := 0; i < 25; i++ { dq.Put(msg) } @@ -225,7 +229,7 @@ func TestDiskQueueCorruption(t *testing.T) { // corrupt the 2nd file dqFn := dq.(*diskQueue).fileName(1) - os.Truncate(dqFn, 500) // 3 valid messages, 5 corrupted + os.Truncate(dqFn, 400) // 3 valid messages, 5 corrupted for i := 0; i < 19; i++ { // 1 message leftover in 4th file Equal(t, msg, <-dq.ReadChan()) @@ -451,14 +455,14 @@ func TestDiskQueueResize(t *testing.T) { NotNil(t, dq) Equal(t, int64(0), dq.Depth()) - for i := 0; i < 8; i++ { + for i := 0; i < 9; i++ { msg[0] = byte(i) err := dq.Put(msg) Nil(t, err) } Equal(t, int64(1), dq.(*diskQueue).writeFileNum) - Equal(t, int64(0), dq.(*diskQueue).writePos) - Equal(t, int64(8), dq.Depth()) + Equal(t, int64(ml+4), dq.(*diskQueue).writePos) + Equal(t, int64(9), dq.Depth()) dq.Close() dq = New(dqName, tmpDir, 10*(ml+4), int32(ml), 1<<10, 2500, time.Second, l) @@ -469,10 +473,10 @@ func TestDiskQueueResize(t *testing.T) { Nil(t, err) } Equal(t, int64(2), dq.(*diskQueue).writeFileNum) - Equal(t, int64(0), dq.(*diskQueue).writePos) - Equal(t, int64(18), dq.Depth()) + Equal(t, int64(ml+4), dq.(*diskQueue).writePos) + Equal(t, int64(19), dq.Depth()) - for i := 0; i < 8; i++ { + for i := 0; i < 9; i++ { msg[0] = byte(i) Equal(t, msg, <-dq.ReadChan()) }