Skip to content

Commit

Permalink
merge deleted records
Browse files Browse the repository at this point in the history
  • Loading branch information
staltz committed Apr 19, 2022
1 parent 89a164d commit 35c5b3b
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 5 deletions.
39 changes: 39 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -268,12 +268,51 @@ module.exports = function AsyncAppendOnlyLog(filename, opts) {
if (err) return cb(err)
const actualBlockBuf = blocksWithDeletables.get(blockIndex) || blockBuf
Record.overwriteWithZeroes(actualBlockBuf, getOffsetInBlock(offset))
mergeDeletedRecords(actualBlockBuf)
blocksWithDeletables.set(blockIndex, actualBlockBuf)
scheduleFlushDelete()
cb()
}
}

function mergeDeletedRecords(blockBuf) {
let offsetInBlock = 0
let eobReached = false
while (offsetInBlock < blockSize) {
const [dataBuf, recSize] = Record.read(blockBuf, offsetInBlock)
const length = Record.readDataLength(blockBuf, offsetInBlock)
if (length === EOB.asNumber) return
if (isBufferZero(dataBuf)) {
let newLength = length
let newRecSize = recSize
let nextOffsetInBlock = offsetInBlock + recSize
while (true) {
const nextLength = Record.readDataLength(blockBuf, nextOffsetInBlock)
if (nextLength === EOB.asNumber) {
eobReached = true
break
}
const [nextDataBuf, nextRecSize] = Record.read(
blockBuf,
nextOffsetInBlock
)
if (isBufferZero(nextDataBuf)) {
newLength += nextRecSize
newRecSize += nextRecSize
nextOffsetInBlock += nextRecSize
} else break
}
if (newLength !== length) {
Record.overwriteWithZeroes(blockBuf, offsetInBlock, newLength)
}
if (eobReached) return
else offsetInBlock += newRecSize
} else {
offsetInBlock += recSize
}
}
}

function hasNoSpaceFor(dataBuf, offsetInBlock) {
return offsetInBlock + Record.size(dataBuf) + EOB.SIZE > blockSize
}
Expand Down
17 changes: 12 additions & 5 deletions record.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,18 @@ function write(blockBuf, offsetInBlock, dataBuf) {
dataBuf.copy(blockBuf, offsetInBlock + HEADER_SIZE) // write dataBuf
}

function overwriteWithZeroes(blockBuf, offsetInBlock) {
const dataLength = readDataLength(blockBuf, offsetInBlock)
const dataStart = offsetInBlock + HEADER_SIZE
const dataEnd = dataStart + dataLength
blockBuf.fill(0, dataStart, dataEnd)
function overwriteWithZeroes(blockBuf, offsetInBlock, newLength = 0) {
if (newLength) {
blockBuf.writeUInt16LE(newLength, offsetInBlock) // overwrite dataLength
const dataStart = offsetInBlock + HEADER_SIZE
const dataEnd = dataStart + newLength
blockBuf.fill(0, dataStart, dataEnd) // overwrite dataBuf
} else {
const dataLength = readDataLength(blockBuf, offsetInBlock)
const dataStart = offsetInBlock + HEADER_SIZE
const dataEnd = dataStart + dataLength
blockBuf.fill(0, dataStart, dataEnd) // overwrite dataBuf
}
}

module.exports = {
Expand Down
44 changes: 44 additions & 0 deletions test/delete.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
var tape = require('tape')
var fs = require('fs')
var pify = require('util').promisify
var run = require('promisify-tuple')
var push = require('push-stream')
var Log = require('../')

Expand Down Expand Up @@ -199,3 +200,46 @@ tape('delete many', async (t) => {
await pify(log.close)()
t.end()
})

tape('merge consecutive deletes', async function (t) {
var file = '/tmp/test_del_merge_consecutives.log'
try {
fs.unlinkSync(file)
} catch (_) {}
var log = Log(file, { blockSize: 2 * 1024 })

function b(str) {
return Buffer.from(str.replace(/ +/g, ''), 'hex')
}

const [, offset1] = await run(log.append)(Buffer.from('abc')) //msg1)
const [, offset2] = await run(log.append)(Buffer.from('def')) //msg2)
const [, offset3] = await run(log.append)(Buffer.from('ghi')) //msg3)
await run(log.onDrain)()
const [errB1, blockBefore] = await run(log.getBlock)(0)
t.error(errB1)
const expectedBefore = b('0300 61 62 63 0300 64 65 66 0300 67 68 69')
const actualBefore = blockBefore.slice(0, expectedBefore.length)
console.log(expectedBefore)
console.log(actualBefore)
t.equals(actualBefore.compare(expectedBefore), 0, 'block buf looks okay')

const [err1] = await run(log.del)(offset1)
t.error(err1)
const [err2] = await run(log.del)(offset2)
t.error(err2)
const [err3] = await run(log.del)(offset3)
t.error(err3)
await run(log.onDrain)()

const [errB2, blockAfter] = await run(log.getBlock)(0)
t.error(errB2)
const expectedAfter = b('0d00 00 00 00 00 00 00 00 00 00 00 00 00 00')
const actualAfter = blockAfter.slice(0, expectedAfter.length)
console.log(expectedAfter)
console.log(actualAfter)
t.equals(actualAfter.compare(expectedAfter), 0, 'block buf looks okay')

await run(log.close)()
t.end()
})

0 comments on commit 35c5b3b

Please sign in to comment.