merge deleted records #67

Closed · wants to merge 2 commits
39 changes: 39 additions & 0 deletions index.js
@@ -274,6 +274,44 @@ module.exports = function AsyncAppendOnlyLog(filename, opts) {
}
}

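// Coalesce each run of consecutive deleted (zeroed) records in this block into a
// single zeroed record, by widening the first record's length header to cover the
// whole run, so the run can be skipped in one jump when scanning the block.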
function mergeDeletedRecords(blockBuf) {
let offsetInBlock = 0
let eobReached = false
while (offsetInBlock < blockSize) {
const [dataBuf, recSize] = Record.read(blockBuf, offsetInBlock)
const length = Record.readDataLength(blockBuf, offsetInBlock)
if (length === EOB.asNumber) return
if (isBufferZero(dataBuf)) {
let newLength = length
let newRecSize = recSize
let nextOffsetInBlock = offsetInBlock + recSize
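// scan forward while the records that immediately follow are also zeroed (deleted)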
while (true) {
const nextLength = Record.readDataLength(blockBuf, nextOffsetInBlock)
if (nextLength === EOB.asNumber) {
eobReached = true
break
}
const [nextDataBuf, nextRecSize] = Record.read(
blockBuf,
nextOffsetInBlock
)
if (isBufferZero(nextDataBuf)) {
newLength += nextRecSize
newRecSize += nextRecSize
nextOffsetInBlock += nextRecSize
} else break
}
if (newLength !== length) {
Record.overwriteWithZeroes(blockBuf, offsetInBlock, newLength)
}
if (eobReached) return
else offsetInBlock += newRecSize
} else {
offsetInBlock += recSize
}
}
}

function hasNoSpaceFor(dataBuf, offsetInBlock) {
return offsetInBlock + Record.size(dataBuf) + EOB.SIZE > blockSize
}
@@ -292,6 +330,7 @@ module.exports = function AsyncAppendOnlyLog(filename, opts) {
blocksWithDeletables.delete(blockIndex)
blocksWithDeletables.set(-1, null) // indicate that flush is active

mergeDeletedRecords(blockBuf)
writeWithFSync(blockStart, blockBuf, null, function flushedDelete(err) {
blocksWithDeletables.delete(-1) // indicate that flush is not active
if (err) {
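
For context on the layout that mergeDeletedRecords manipulates: per record.js, a record is a 2-byte little-endian data length followed by the payload, and deleting a record zeroes its payload while keeping the header. Below is a minimal sketch of the coalescing step on two adjacent deleted 3-byte records; the HEADER_SIZE constant and the helper functions are local to this sketch, not the library's API.

const HEADER_SIZE = 2 // assumed header size, matching the writeUInt16LE calls in record.js

function readDataLength(blockBuf, offset) {
  return blockBuf.readUInt16LE(offset)
}

function isZeroed(blockBuf, offset, length) {
  for (let i = 0; i < length; i++) {
    if (blockBuf[offset + HEADER_SIZE + i] !== 0) return false
  }
  return true
}

// two deleted 3-byte records back to back: 0300 000000 0300 000000
const block = Buffer.from('03000000000300000000', 'hex')

const first = readDataLength(block, 0) // 3
const second = readDataLength(block, HEADER_SIZE + first) // 3
if (isZeroed(block, 0, first) && isZeroed(block, HEADER_SIZE + first, second)) {
  // newLength = 3 (own payload) + 2 (next header) + 3 (next payload) = 8
  block.writeUInt16LE(first + HEADER_SIZE + second, 0)
  block.fill(0, HEADER_SIZE, HEADER_SIZE + first + HEADER_SIZE + second)
}
console.log(block.toString('hex')) // 08000000000000000000 (now a single 8-byte deleted record)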
17 changes: 12 additions & 5 deletions record.js
@@ -41,11 +41,18 @@ function write(blockBuf, offsetInBlock, dataBuf) {
dataBuf.copy(blockBuf, offsetInBlock + HEADER_SIZE) // write dataBuf
}

function overwriteWithZeroes(blockBuf, offsetInBlock) {
const dataLength = readDataLength(blockBuf, offsetInBlock)
const dataStart = offsetInBlock + HEADER_SIZE
const dataEnd = dataStart + dataLength
blockBuf.fill(0, dataStart, dataEnd)
function overwriteWithZeroes(blockBuf, offsetInBlock, newLength = 0) {
if (newLength) {
blockBuf.writeUInt16LE(newLength, offsetInBlock) // overwrite dataLength
const dataStart = offsetInBlock + HEADER_SIZE
const dataEnd = dataStart + newLength
blockBuf.fill(0, dataStart, dataEnd) // overwrite dataBuf
} else {
const dataLength = readDataLength(blockBuf, offsetInBlock)
const dataStart = offsetInBlock + HEADER_SIZE
const dataEnd = dataStart + dataLength
blockBuf.fill(0, dataStart, dataEnd) // overwrite dataBuf
}
}

module.exports = {
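
A quick note on the widened overwriteWithZeroes signature (illustration only; the 10-byte buffer and the offsets below are made up for the example, and Record stands for what record.js exports): called without newLength it behaves as before and only zeroes the record's own payload, while a truthy newLength also rewrites the length header so that one record spans the whole merged run.

// a live 3-byte record ('abc') followed by a deleted 3-byte record
const blockBuf = Buffer.from('0300616263' + '0300000000', 'hex')

Record.overwriteWithZeroes(blockBuf, 0) // old behaviour: header stays 0x0003, only 'abc' is zeroed
Record.overwriteWithZeroes(blockBuf, 0, 8) // new: header becomes 0x0008 and 8 bytes are zeroed,
                                           // swallowing the deleted record that follows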
10 changes: 5 additions & 5 deletions test/compaction.js
@@ -184,13 +184,13 @@ tape('shift many blocks', async (t) => {
// block 0
[0x11, 0x22, 0x33],
// block 1
[0x44, null, null],
[0x44, null, 0x77],
// block 2
[0x77, 0x88, 0x99],
[0x88, 0x99, 0xaa],
// block 3
[0xaa, null, 0xcc],
[null, 0xcc, 0xdd],
// block 4
[0xdd, 0xee, 0xff],
[0xee, 0xff],
].flat(),
'log has 5 blocks and some holes'
)
@@ -304,7 +304,7 @@ tape('cannot read truncated regions of the log', async (t) => {
// block 0
[0x11, 0x22, 0x33],
// block 1
[0x44, null, null],
[0x44, null],
// block 2
[null, 0x88, 0x99],
].flat(),
44 changes: 44 additions & 0 deletions test/delete.js
@@ -5,6 +5,7 @@
var tape = require('tape')
var fs = require('fs')
var pify = require('util').promisify
var run = require('promisify-tuple')
var push = require('push-stream')
var Log = require('../')

@@ -199,3 +200,46 @@ tape('delete many', async (t) => {
await pify(log.close)()
t.end()
})

tape('merge consecutive deletes', async function (t) {
var file = '/tmp/test_del_merge_consecutives.log'
try {
fs.unlinkSync(file)
} catch (_) {}
var log = Log(file, { blockSize: 2 * 1024 })

function b(str) {
return Buffer.from(str.replace(/ +/g, ''), 'hex')
}

const [, offset1] = await run(log.append)(Buffer.from('abc')) // msg1
const [, offset2] = await run(log.append)(Buffer.from('def')) // msg2
const [, offset3] = await run(log.append)(Buffer.from('ghi')) // msg3
await run(log.onDrain)()
const [errB1, blockBefore] = await run(log.getBlock)(0)
t.error(errB1)
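// each record is a 2-byte little-endian length (0x0003) followed by its 3 payload bytes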
const expectedBefore = b('0300 61 62 63 0300 64 65 66 0300 67 68 69')
const actualBefore = blockBefore.slice(0, expectedBefore.length)
console.log(expectedBefore)
console.log(actualBefore)
t.equals(actualBefore.compare(expectedBefore), 0, 'block buf looks okay')

const [err1] = await run(log.del)(offset1)
t.error(err1)
const [err2] = await run(log.del)(offset2)
t.error(err2)
const [err3] = await run(log.del)(offset3)
t.error(err3)
await run(log.onDeletesFlushed)()

const [errB2, blockAfter] = await run(log.getBlock)(0)
t.error(errB2)
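// merged header 0x000d = 13 = 3 + 2 + 3 + 2 + 3: the three zeroed payloads plus the
// two headers of the records that were merged into the first one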
const expectedAfter = b('0d00 00 00 00 00 00 00 00 00 00 00 00 00 00')
const actualAfter = blockAfter.slice(0, expectedAfter.length)
console.log(expectedAfter)
console.log(actualAfter)
t.equals(actualAfter.compare(expectedAfter), 0, 'block buf looks okay')

await run(log.close)()
t.end()
})