From ea3c6e4887c400553b8a7dd7a114583cda1df14e Mon Sep 17 00:00:00 2001 From: Andre Staltz Date: Wed, 20 Apr 2022 19:06:54 +0300 Subject: [PATCH] use sizeDiff=1 to represent holes filled --- compaction.js | 16 +++++++++++----- index.js | 4 ++-- test/compaction.js | 32 +++++++++++++++++++++++++++----- 3 files changed, 40 insertions(+), 12 deletions(-) diff --git a/compaction.js b/compaction.js index bf63260..53202bf 100644 --- a/compaction.js +++ b/compaction.js @@ -134,6 +134,7 @@ function Compaction(log, onDone) { const progress = Obv() // for the unshifted offset let startOffset = 0 let version = 0 + let holesFound = true // assume true let compactedBlockIndex = -1 let compactedBlockBuf = null @@ -170,13 +171,11 @@ function Compaction(log, onDone) { if (state.initial) { findStateFromLog(function foundStateFromLog(err, state) { if (err) return cb(err) - startOffset = state.compactedBlockIndex * log.blockSize compactedBlockIndex = state.compactedBlockIndex + startOffset = compactedBlockIndex * log.blockSize unshiftedOffset = state.unshiftedOffset unshiftedBlockBuf = state.unshiftedBlockBuf - unshiftedBlockIndex = Math.floor( - state.unshiftedOffset / log.blockSize - ) + unshiftedBlockIndex = Math.floor(unshiftedOffset / log.blockSize) savePersistentState(cb) }) } else { @@ -212,6 +211,7 @@ function Compaction(log, onDone) { if (err) return cb(err) if (holeOffset === -1) { compactedBlockIndex = Math.floor(log.since.value / log.blockSize) + holesFound = false stop() return } @@ -414,7 +414,13 @@ function Compaction(log, onDone) { if (err) return onDone(err) persistentState.destroy(function onStateDestroyed(err) { if (err) return onDone(err) - onDone(null, sizeDiff) + if (sizeDiff === 0 && holesFound) { + // Truncation did not make the log smaller but it did rewrite the log. + // So report 1 byte as a way of saying that compaction filled holes. + onDone(null, { sizeDiff: 1 }) + } else { + onDone(null, { sizeDiff }) + } }) }) } diff --git a/index.js b/index.js index 57f0f36..0b2b1d4 100644 --- a/index.js +++ b/index.js @@ -485,10 +485,10 @@ module.exports = function AsyncAppendOnlyLog(filename, opts) { } onDrain(function startCompactAfterDrain() { onDeletesFlushed(function startCompactAfterDeletes() { - compaction = new Compaction(self, (err, sizeDiff) => { + compaction = new Compaction(self, (err, stats) => { compaction = null if (err) return cb(err) - compactionProgress.set({ sizeDiff, percent: 1, done: true }) + compactionProgress.set({ ...stats, percent: 1, done: true }) for (const callback of waitingCompaction) callback() waitingCompaction.length = 0 cb() diff --git a/test/compaction.js b/test/compaction.js index 9070e3f..4c02c91 100644 --- a/test/compaction.js +++ b/test/compaction.js @@ -26,10 +26,32 @@ tape('compact a log that does not have holes', async (t) => { await run(log.onDrain)() t.pass('append two records') + const progressArr = [] + log.compactionProgress((stats) => { + progressArr.push(stats) + }) + const [err] = await run(log.compact)() await run(log.onDrain)() t.error(err, 'no error when compacting') + t.deepEquals( + progressArr, + [ + { + sizeDiff: 0, + percent: 1, + done: true, + }, + { + sizeDiff: 0, + percent: 1, + done: true, + }, + ], + 'progress events' + ) + await new Promise((resolve) => { log.stream({ offsets: false }).pipe( push.collect((err, ary) => { @@ -519,7 +541,7 @@ tape('startOffset is correct', async (t) => { done: false, }, { - sizeDiff: 0, + sizeDiff: 1, percent: 1, done: true, }, @@ -616,7 +638,7 @@ tape('recovers from crash just after persisting state', async (t) => { done: false, }, { - sizeDiff: 0, + sizeDiff: 1, percent: 1, done: true, }, @@ -668,8 +690,8 @@ tape('recovers from crash just after persisting block', async (t) => { t.pass('suppose compaction was in progress: [0x22, 0x33] and [0x33, 0x44]') const version = [1, 0, 0, 0] // uint32LE - const startOffset = [0,0,0,0] // uint32LE - const truncateBlockIndex = [255, 255, 255, 255] //uint32LE + const startOffset = [0, 0, 0, 0] // uint32LE + const truncateBlockIndex = [255, 255, 255, 255] // uint32LE const compactingBlockIndex = [0, 0, 0, 0] // uint32LE const unshiftedOffset = [0, 0, 0, 0] // uint32LE const unshiftedBlock = [ @@ -739,7 +761,7 @@ tape('restarts from crash just before truncating log', async (t) => { t.pass('suppose compaction ready: [0x22, 0x44], [0x55, 0x66], [0x55, 0x66]') const version = [1, 0, 0, 0] // uint32LE - const startOffset = [0,0,0,0] // uint32LE + const startOffset = [0, 0, 0, 0] // uint32LE const truncateBlockIndex = [1, 0, 0, 0] //uint32LE const compactingBlockIndex = [0, 0, 0, 0] // uint32LE const unshiftedOffset = [0, 0, 0, 0] // uint32LE