From d8dcb6612074e30837d5df95bd4a231b8b652a14 Mon Sep 17 00:00:00 2001 From: Andre Staltz Date: Wed, 13 Apr 2022 16:48:15 +0300 Subject: [PATCH] merge deleted records --- index.js | 39 +++++++++++++++++++++++++++++++++++++++ record.js | 17 ++++++++++++----- test/delete.js | 44 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 95 insertions(+), 5 deletions(-) diff --git a/index.js b/index.js index 57f0f36..949784c 100644 --- a/index.js +++ b/index.js @@ -268,12 +268,51 @@ module.exports = function AsyncAppendOnlyLog(filename, opts) { if (err) return cb(err) const actualBlockBuf = blocksWithDeletables.get(blockIndex) || blockBuf Record.overwriteWithZeroes(actualBlockBuf, getOffsetInBlock(offset)) + mergeDeletedRecords(actualBlockBuf) blocksWithDeletables.set(blockIndex, actualBlockBuf) scheduleFlushDelete() cb() } } + function mergeDeletedRecords(blockBuf) { + let offsetInBlock = 0 + let eobReached = false + while (offsetInBlock < blockSize) { + const [dataBuf, recSize] = Record.read(blockBuf, offsetInBlock) + const length = Record.readDataLength(blockBuf, offsetInBlock) + if (length === EOB.asNumber) return + if (isBufferZero(dataBuf)) { + let newLength = length + let newRecSize = recSize + let nextOffsetInBlock = offsetInBlock + recSize + while (true) { + const nextLength = Record.readDataLength(blockBuf, nextOffsetInBlock) + if (nextLength === EOB.asNumber) { + eobReached = true + break + } + const [nextDataBuf, nextRecSize] = Record.read( + blockBuf, + nextOffsetInBlock + ) + if (isBufferZero(nextDataBuf)) { + newLength += nextRecSize + newRecSize += nextRecSize + nextOffsetInBlock += nextRecSize + } else break + } + if (newLength !== length) { + Record.overwriteWithZeroes(blockBuf, offsetInBlock, newLength) + } + if (eobReached) return + else offsetInBlock += newRecSize + } else { + offsetInBlock += recSize + } + } + } + function hasNoSpaceFor(dataBuf, offsetInBlock) { return offsetInBlock + Record.size(dataBuf) + EOB.SIZE > blockSize } diff --git a/record.js b/record.js index 5e1da89..d1f69a1 100644 --- a/record.js +++ b/record.js @@ -41,11 +41,18 @@ function write(blockBuf, offsetInBlock, dataBuf) { dataBuf.copy(blockBuf, offsetInBlock + HEADER_SIZE) // write dataBuf } -function overwriteWithZeroes(blockBuf, offsetInBlock) { - const dataLength = readDataLength(blockBuf, offsetInBlock) - const dataStart = offsetInBlock + HEADER_SIZE - const dataEnd = dataStart + dataLength - blockBuf.fill(0, dataStart, dataEnd) +function overwriteWithZeroes(blockBuf, offsetInBlock, newLength = 0) { + if (newLength) { + blockBuf.writeUInt16LE(newLength, offsetInBlock) // overwrite dataLength + const dataStart = offsetInBlock + HEADER_SIZE + const dataEnd = dataStart + newLength + blockBuf.fill(0, dataStart, dataEnd) // overwrite dataBuf + } else { + const dataLength = readDataLength(blockBuf, offsetInBlock) + const dataStart = offsetInBlock + HEADER_SIZE + const dataEnd = dataStart + dataLength + blockBuf.fill(0, dataStart, dataEnd) // overwrite dataBuf + } } module.exports = { diff --git a/test/delete.js b/test/delete.js index 58a548e..7436a52 100644 --- a/test/delete.js +++ b/test/delete.js @@ -5,6 +5,7 @@ var tape = require('tape') var fs = require('fs') var pify = require('util').promisify +var run = require('promisify-tuple') var push = require('push-stream') var Log = require('../') @@ -199,3 +200,46 @@ tape('delete many', async (t) => { await pify(log.close)() t.end() }) + +tape('merge consecutive deletes', async function (t) { + var file = '/tmp/test_del_merge_consecutives.log' + try { + fs.unlinkSync(file) + } catch (_) {} + var log = Log(file, { blockSize: 2 * 1024 }) + + function b(str) { + return Buffer.from(str.replace(/ +/g, ''), 'hex') + } + + const [, offset1] = await run(log.append)(Buffer.from('abc')) //msg1) + const [, offset2] = await run(log.append)(Buffer.from('def')) //msg2) + const [, offset3] = await run(log.append)(Buffer.from('ghi')) //msg3) + await run(log.onDrain)() + const [errB1, blockBefore] = await run(log.getBlock)(0) + t.error(errB1) + const expectedBefore = b('0300 61 62 63 0300 64 65 66 0300 67 68 69') + const actualBefore = blockBefore.slice(0, expectedBefore.length) + console.log(expectedBefore) + console.log(actualBefore) + t.equals(actualBefore.compare(expectedBefore), 0, 'block buf looks okay') + + const [err1] = await run(log.del)(offset1) + t.error(err1) + const [err2] = await run(log.del)(offset2) + t.error(err2) + const [err3] = await run(log.del)(offset3) + t.error(err3) + await run(log.onDrain)() + + const [errB2, blockAfter] = await run(log.getBlock)(0) + t.error(errB2) + const expectedAfter = b('0d00 00 00 00 00 00 00 00 00 00 00 00 00 00') + const actualAfter = blockAfter.slice(0, expectedAfter.length) + console.log(expectedAfter) + console.log(actualAfter) + t.equals(actualAfter.compare(expectedAfter), 0, 'block buf looks okay') + + await run(log.close)() + t.end() +})