Skip to content

Commit

Permalink
Merge pull request #71 from ssb-ngi-pointer/holes-filled
Browse files Browse the repository at this point in the history
use sizeDiff=1 to represent holes filled
  • Loading branch information
staltz authored Apr 24, 2022
2 parents e3b19e1 + ea3c6e4 commit 1c07825
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 12 deletions.
16 changes: 11 additions & 5 deletions compaction.js
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ function Compaction(log, onDone) {
const progress = Obv() // for the unshifted offset
let startOffset = 0
let version = 0
let holesFound = true // assume true

let compactedBlockIndex = -1
let compactedBlockBuf = null
Expand Down Expand Up @@ -170,13 +171,11 @@ function Compaction(log, onDone) {
if (state.initial) {
findStateFromLog(function foundStateFromLog(err, state) {
if (err) return cb(err)
startOffset = state.compactedBlockIndex * log.blockSize
compactedBlockIndex = state.compactedBlockIndex
startOffset = compactedBlockIndex * log.blockSize
unshiftedOffset = state.unshiftedOffset
unshiftedBlockBuf = state.unshiftedBlockBuf
unshiftedBlockIndex = Math.floor(
state.unshiftedOffset / log.blockSize
)
unshiftedBlockIndex = Math.floor(unshiftedOffset / log.blockSize)
savePersistentState(cb)
})
} else {
Expand Down Expand Up @@ -212,6 +211,7 @@ function Compaction(log, onDone) {
if (err) return cb(err)
if (holeOffset === -1) {
compactedBlockIndex = Math.floor(log.since.value / log.blockSize)
holesFound = false
stop()
return
}
Expand Down Expand Up @@ -414,7 +414,13 @@ function Compaction(log, onDone) {
if (err) return onDone(err)
persistentState.destroy(function onStateDestroyed(err) {
if (err) return onDone(err)
onDone(null, sizeDiff)
if (sizeDiff === 0 && holesFound) {
// Truncation did not make the log smaller but it did rewrite the log.
// So report 1 byte as a way of saying that compaction filled holes.
onDone(null, { sizeDiff: 1 })
} else {
onDone(null, { sizeDiff })
}
})
})
}
Expand Down
4 changes: 2 additions & 2 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -485,10 +485,10 @@ module.exports = function AsyncAppendOnlyLog(filename, opts) {
}
onDrain(function startCompactAfterDrain() {
onDeletesFlushed(function startCompactAfterDeletes() {
compaction = new Compaction(self, (err, sizeDiff) => {
compaction = new Compaction(self, (err, stats) => {
compaction = null
if (err) return cb(err)
compactionProgress.set({ sizeDiff, percent: 1, done: true })
compactionProgress.set({ ...stats, percent: 1, done: true })
for (const callback of waitingCompaction) callback()
waitingCompaction.length = 0
cb()
Expand Down
32 changes: 27 additions & 5 deletions test/compaction.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,32 @@ tape('compact a log that does not have holes', async (t) => {
await run(log.onDrain)()
t.pass('append two records')

const progressArr = []
log.compactionProgress((stats) => {
progressArr.push(stats)
})

const [err] = await run(log.compact)()
await run(log.onDrain)()
t.error(err, 'no error when compacting')

t.deepEquals(
progressArr,
[
{
sizeDiff: 0,
percent: 1,
done: true,
},
{
sizeDiff: 0,
percent: 1,
done: true,
},
],
'progress events'
)

await new Promise((resolve) => {
log.stream({ offsets: false }).pipe(
push.collect((err, ary) => {
Expand Down Expand Up @@ -519,7 +541,7 @@ tape('startOffset is correct', async (t) => {
done: false,
},
{
sizeDiff: 0,
sizeDiff: 1,
percent: 1,
done: true,
},
Expand Down Expand Up @@ -616,7 +638,7 @@ tape('recovers from crash just after persisting state', async (t) => {
done: false,
},
{
sizeDiff: 0,
sizeDiff: 1,
percent: 1,
done: true,
},
Expand Down Expand Up @@ -668,8 +690,8 @@ tape('recovers from crash just after persisting block', async (t) => {
t.pass('suppose compaction was in progress: [0x22, 0x33] and [0x33, 0x44]')

const version = [1, 0, 0, 0] // uint32LE
const startOffset = [0,0,0,0] // uint32LE
const truncateBlockIndex = [255, 255, 255, 255] //uint32LE
const startOffset = [0, 0, 0, 0] // uint32LE
const truncateBlockIndex = [255, 255, 255, 255] // uint32LE
const compactingBlockIndex = [0, 0, 0, 0] // uint32LE
const unshiftedOffset = [0, 0, 0, 0] // uint32LE
const unshiftedBlock = [
Expand Down Expand Up @@ -739,7 +761,7 @@ tape('restarts from crash just before truncating log', async (t) => {
t.pass('suppose compaction ready: [0x22, 0x44], [0x55, 0x66], [0x55, 0x66]')

const version = [1, 0, 0, 0] // uint32LE
const startOffset = [0,0,0,0] // uint32LE
const startOffset = [0, 0, 0, 0] // uint32LE
const truncateBlockIndex = [1, 0, 0, 0] //uint32LE
const compactingBlockIndex = [0, 0, 0, 0] // uint32LE
const unshiftedOffset = [0, 0, 0, 0] // uint32LE
Expand Down

0 comments on commit 1c07825

Please sign in to comment.