Skip to content

Commit

Permalink
switch to next file before maxBytesPerFile is reached
Browse files Browse the repository at this point in the history
rather than switching *after* maxBytesPerFile is reached

fixes nsqio#30
  • Loading branch information
ploxiln committed Sep 12, 2021
1 parent bc05aaf commit eefc786
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 39 deletions.
54 changes: 27 additions & 27 deletions diskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,33 @@ func (d *diskQueue) readOne() ([]byte, error) {
func (d *diskQueue) writeOne(data []byte) error {
var err error

dataLen := int32(len(data))
totalBytes := int64(4 + dataLen)

if dataLen < d.minMsgSize || dataLen > d.maxMsgSize {
return fmt.Errorf("invalid message write size (%d) minMsgSize=%d maxMsgSize=%d", dataLen, d.minMsgSize, d.maxMsgSize)
}

// will not wrap-around if maxBytesPerFile + maxMsgSize < Int64Max
if d.writePos > 0 && d.writePos+totalBytes > d.maxBytesPerFile {
if d.readFileNum == d.writeFileNum {
d.maxBytesPerFileRead = d.writePos
}

d.writeFileNum++
d.writePos = 0

// sync every time we start writing to a new file
err = d.sync()
if err != nil {
d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err)
}

if d.writeFile != nil {
d.writeFile.Close()
d.writeFile = nil
}
}
if d.writeFile == nil {
curFileName := d.fileName(d.writeFileNum)
d.writeFile, err = os.OpenFile(curFileName, os.O_RDWR|os.O_CREATE, 0600)
Expand All @@ -377,12 +404,6 @@ func (d *diskQueue) writeOne(data []byte) error {
}
}

dataLen := int32(len(data))

if dataLen < d.minMsgSize || dataLen > d.maxMsgSize {
return fmt.Errorf("invalid message write size (%d) minMsgSize=%d maxMsgSize=%d", dataLen, d.minMsgSize, d.maxMsgSize)
}

d.writeBuf.Reset()
err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen)
if err != nil {
Expand All @@ -402,30 +423,9 @@ func (d *diskQueue) writeOne(data []byte) error {
return err
}

totalBytes := int64(4 + dataLen)
d.writePos += totalBytes
d.depth += 1

if d.writePos >= d.maxBytesPerFile {
if d.readFileNum == d.writeFileNum {
d.maxBytesPerFileRead = d.writePos
}

d.writeFileNum++
d.writePos = 0

// sync every time we start writing to a new file
err = d.sync()
if err != nil {
d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err)
}

if d.writeFile != nil {
d.writeFile.Close()
d.writeFile = nil
}
}

return err
}

Expand Down
28 changes: 16 additions & 12 deletions diskqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,23 +108,23 @@ func TestDiskQueueRoll(t *testing.T) {
panic(err)
}
defer os.RemoveAll(tmpDir)
msg := bytes.Repeat([]byte{0}, 10)
msg := []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}
ml := int64(len(msg))
dq := New(dqName, tmpDir, 10*(ml+4), int32(ml), 1<<10, 2500, 2*time.Second, l)
defer dq.Close()
NotNil(t, dq)
Equal(t, int64(0), dq.Depth())

for i := 0; i < 10; i++ {
for i := 0; i < 11; i++ {
err := dq.Put(msg)
Nil(t, err)
Equal(t, int64(i+1), dq.Depth())
}

Equal(t, int64(1), dq.(*diskQueue).writeFileNum)
Equal(t, int64(0), dq.(*diskQueue).writePos)
Equal(t, int64(ml+4), dq.(*diskQueue).writePos)

for i := 10; i > 0; i-- {
for i := 11; i > 0; i-- {
Equal(t, msg, <-dq.ReadChan())
Equal(t, int64(i-1), dq.Depth())
}
Expand Down Expand Up @@ -216,7 +216,11 @@ func TestDiskQueueCorruption(t *testing.T) {
dq := New(dqName, tmpDir, 1000, 10, 1<<10, 5, 2*time.Second, l)
defer dq.Close()

msg := make([]byte, 123) // 127 bytes per message, 8 (1016 bytes) messages per file
msg := make([]byte, 120) // 124 bytes per message, 8 messages (992 bytes) per file
msg[0] = 91
msg[62] = 4
msg[119] = 211

for i := 0; i < 25; i++ {
dq.Put(msg)
}
Expand All @@ -225,7 +229,7 @@ func TestDiskQueueCorruption(t *testing.T) {

// corrupt the 2nd file
dqFn := dq.(*diskQueue).fileName(1)
os.Truncate(dqFn, 500) // 3 valid messages, 5 corrupted
os.Truncate(dqFn, 400) // 3 valid messages, 5 corrupted

for i := 0; i < 19; i++ { // 1 message leftover in 4th file
Equal(t, msg, <-dq.ReadChan())
Expand Down Expand Up @@ -451,14 +455,14 @@ func TestDiskQueueResize(t *testing.T) {
NotNil(t, dq)
Equal(t, int64(0), dq.Depth())

for i := 0; i < 8; i++ {
for i := 0; i < 9; i++ {
msg[0] = byte(i)
err := dq.Put(msg)
Nil(t, err)
}
Equal(t, int64(1), dq.(*diskQueue).writeFileNum)
Equal(t, int64(0), dq.(*diskQueue).writePos)
Equal(t, int64(8), dq.Depth())
Equal(t, int64(ml+4), dq.(*diskQueue).writePos)
Equal(t, int64(9), dq.Depth())

dq.Close()
dq = New(dqName, tmpDir, 10*(ml+4), int32(ml), 1<<10, 2500, time.Second, l)
Expand All @@ -469,10 +473,10 @@ func TestDiskQueueResize(t *testing.T) {
Nil(t, err)
}
Equal(t, int64(2), dq.(*diskQueue).writeFileNum)
Equal(t, int64(0), dq.(*diskQueue).writePos)
Equal(t, int64(18), dq.Depth())
Equal(t, int64(ml+4), dq.(*diskQueue).writePos)
Equal(t, int64(19), dq.Depth())

for i := 0; i < 8; i++ {
for i := 0; i < 9; i++ {
msg[0] = byte(i)
Equal(t, msg, <-dq.ReadChan())
}
Expand Down

0 comments on commit eefc786

Please sign in to comment.