diff --git a/README.md b/README.md index e8b1de3f..f497d8ab 100644 --- a/README.md +++ b/README.md @@ -224,16 +224,17 @@ sbot.db.getIndex('myindex').myOwnMethodToGetStuff() Or you can wrap that in a secret-stack plugin (in the example above, `exports.init` should return an object with the API functions). -There are other special methods you can implement in order to add -"hooks" in the `Plugin` subclass: +There are other optional methods you can implement in the `Plugin` subclass: -- `onLoaded(cb)`: called once, at startup, when the index is - successfully loaded from disk and is ready to receive queries -- `onFlush(cb)`: called when the leveldb index is about to be saved to +- `onLoaded(cb)`: a hook called once, at startup, when the index is successfully + loaded from disk and is ready to receive queries +- `onFlush(cb)`: a hook called when the leveldb index is about to be saved to disk -- `indexesContent()`: method used when reindexing private group - messages to determine if the leveldb index needs to be updated for - decrypted messages. The default method returns true. +- `indexesContent()`: method used when reindexing private group messages to + determine if the leveldb index needs to be updated for decrypted messages. The + default method returns true. +- `reset()`: a method that you can use to reset in-memory state that you might + have in your plugin, when the leveldb index is about to be rebuilt. ### Compatibility plugins @@ -268,7 +269,7 @@ const sbot = SecretStack({ caps }) The following is a list of modules that works well with ssb-db2: - [ssb-threads] for working with post messages as threads - - [ssb-suggest-lite] for fetching profiles of authors + - [ssb-suggest-lite] for fetching profiles of authors - [ssb-friends] for working with the social graph - [ssb-search2] for full-text searching - [ssb-crut] for working with records that can be modified @@ -463,11 +464,16 @@ Use [JITDB's prepare](https://github.com/ssb-ngi-pointer/jitdb/#prepareoperation Waits for the index with name `indexName` to be in sync with the main log and then call `cb` with no arguments. If `indexName` is not -provided, the base index will be used. +provided, the base index will be used. The reason we do it this way is that indexes are updated asynchronously in order to not block message writing. +### compact(cb) + +Compacts the log (filling in the blanks left by deleted messages and optimizing +space) and then rebuilds indexes. + ## Configuration You can use ssb-config parameters to configure some aspects of ssb-db2: diff --git a/db.js b/db.js index 3f7ff32b..a9ecffb1 100644 --- a/db.js +++ b/db.js @@ -3,6 +3,7 @@ // SPDX-License-Identifier: LGPL-3.0-only const os = require('os') +const fs = require('fs') const path = require('path') const rimraf = require('rimraf') const mkdirp = require('mkdirp') @@ -21,7 +22,6 @@ const pull = require('pull-stream') const paramap = require('pull-paramap') const Ref = require('ssb-ref') const Obv = require('obz') -const promisify = require('promisify-4loc') const jitdbOperators = require('jitdb/operators') const bendyButt = require('ssb-bendy-butt') const JITDB = require('jitdb') @@ -30,7 +30,12 @@ const multicb = require('multicb') const mutexify = require('mutexify') const operators = require('./operators') -const { jitIndexesPath } = require('./defaults') +const { + jitIndexesPath, + resetLevelPath, + resetPrivatePath, + reindexJitPath, +} = require('./defaults') const { onceWhen, ReadyGate } = require('./utils') const DebouncingBatchAdd = require('./debounce-batch') const Log = require('./log') @@ -62,6 +67,7 @@ exports.manifest = { addBatch: 'async', addOOOBatch: 'async', getStatus: 'sync', + compact: 'async', indexingProgress: 'source', // `query` should be `sync`, but secret-stack is automagically converting it @@ -92,6 +98,9 @@ exports.init = function (sbot, config) { const debug = Debug('ssb:db2') const post = Obv() const indexingProgress = Notify() + const indexingActive = Obv().set(0) + let abortLogStreamForIndexes = null + const compacting = Obv().set(false) const hmacKey = null const stateFeedsReady = Obv().set(false) const state = {} @@ -526,8 +535,27 @@ exports.init = function (sbot, config) { ) } - function clearIndexes() { - for (const indexName in indexes) indexes[indexName].remove(() => {}) + function resetAllIndexes(cb) { + const done = multicb({ pluck: 1 }) + if (abortLogStreamForIndexes) { + abortLogStreamForIndexes() + abortLogStreamForIndexes = null + } + for (const indexName in indexes) { + indexes[indexName].reset(done()) + } + done(function onResetAllIndexesDone() { + cb() + updateIndexes() + }) + } + + function restartUpdateIndexes() { + if (abortLogStreamForIndexes) { + abortLogStreamForIndexes() + abortLogStreamForIndexes = null + } + indexesStateLoaded.onReady(updateIndexes) } function registerIndex(Index) { @@ -541,6 +569,7 @@ exports.init = function (sbot, config) { } function updateIndexes() { + if (!log.compactionProgress.value.done) return const start = Date.now() const indexesArr = Object.values(indexes) @@ -551,7 +580,8 @@ exports.init = function (sbot, config) { ) debug(`lowest offset for all indexes is ${lowestOffset}`) - log.stream({ gt: lowestOffset }).pipe({ + indexingActive.set(indexingActive.value + 1) + const sink = log.stream({ gt: lowestOffset }).pipe({ paused: false, write(record) { const buf = record.value @@ -560,13 +590,16 @@ exports.init = function (sbot, config) { }, end() { debug(`updateIndexes() scan time: ${Date.now() - start}ms`) + abortLogStreamForIndexes = null const doneFlushing = multicb({ pluck: 1 }) for (const idx of indexesArr) idx.flush(doneFlushing()) doneFlushing((err) => { // prettier-ignore if (err) console.error(clarify(err, 'updateIndexes() failed to flush indexes')) + indexingActive.set(indexingActive.value - 1) debug('updateIndexes() live streaming') - log.stream({ gt: indexes['base'].offset.value, live: true }).pipe({ + const gt = indexes['base'].offset.value + const sink = log.stream({ gt, live: true }).pipe({ paused: false, write(record) { const buf = record.value @@ -574,9 +607,11 @@ exports.init = function (sbot, config) { for (const idx of indexesArr) idx.onRecord(record, true, pValue) }, }) + abortLogStreamForIndexes = sink.source.abort.bind(sink.source) }) }, }) + abortLogStreamForIndexes = sink.source.abort.bind(sink.source) } function onDrain(indexName, cb) { @@ -638,10 +673,18 @@ exports.init = function (sbot, config) { onceWhen( sbot.db2migrate.synchronized, (isSynced) => isSynced === true, - () => onDrain(cb) + next ) } else { - onDrain(cb) + next() + } + + function next() { + onceWhen( + compacting, + (isCompacting) => isCompacting === false, + () => onDrain(cb) + ) } }) @@ -686,6 +729,7 @@ exports.init = function (sbot, config) { const reindexingLock = mutexify() function reindexEncrypted(cb) { + indexingActive.set(indexingActive.value + 1) reindexingLock((unlock) => { const offsets = privateIndex.missingDecrypt() const keysIndex = indexes['keys'] @@ -731,6 +775,7 @@ exports.init = function (sbot, config) { done((err) => { // prettier-ignore if (err) return unlock(cb, clarify(err, 'reindexEncrypted() failed to force-flush indexes')) + indexingActive.set(indexingActive.value - 1) unlock(cb) }) }) @@ -738,6 +783,65 @@ exports.init = function (sbot, config) { }) } + function notYetZero(obz, fn, ...args) { + if (obz.value > 0) { + onceWhen(obz, (x) => x === 0, fn.bind(null, ...args)) + return true + } else { + return false + } + } + + function compact(cb) { + if (notYetZero(jitdb.indexingActive, compact, cb)) return + if (notYetZero(jitdb.queriesActive, compact, cb)) return + if (notYetZero(indexingActive, compact, cb)) return + + fs.closeSync(fs.openSync(resetLevelPath(dir), 'w')) + fs.closeSync(fs.openSync(resetPrivatePath(dir), 'w')) + fs.closeSync(fs.openSync(reindexJitPath(dir), 'w')) + log.compact(function onLogCompacted(err) { + if (err) cb(clarify(err, 'ssb-db2 compact() failed with the log')) + else cb() + }) + } + + let compactStartOffset = null + log.compactionProgress((stats) => { + if (typeof stats.startOffset === 'number' && compactStartOffset === null) { + compactStartOffset = stats.startOffset + } + + if (compacting.value !== !stats.done) compacting.set(!stats.done) + + if (stats.done) { + if (stats.sizeDiff > 0) { + if (fs.existsSync(resetLevelPath(dir))) { + resetAllIndexes(() => { + rimraf.sync(resetLevelPath(dir)) + }) + } + if (fs.existsSync(resetPrivatePath(dir))) { + privateIndex.reset(() => { + rimraf.sync(resetPrivatePath(dir)) + }) + } + if (fs.existsSync(reindexJitPath(dir))) { + jitdb.reindex(compactStartOffset || 0, (err) => { + if (err) console.error('ssb-db2 reindex jitdb after compact', err) + rimraf.sync(reindexJitPath(dir)) + }) + } + compactStartOffset = null + } else { + rimraf.sync(resetLevelPath(dir)) + rimraf.sync(resetPrivatePath(dir)) + rimraf.sync(reindexJitPath(dir)) + restartUpdateIndexes() + } + } + }) + return (self = { // Public API: get, @@ -755,6 +859,7 @@ exports.init = function (sbot, config) { getStatus: () => status.obv, operators, post, + compact, reindexEncrypted, indexingProgress: () => indexingProgress.listen(), @@ -774,7 +879,6 @@ exports.init = function (sbot, config) { getState: () => state, getIndexes: () => indexes, getIndex: (index) => indexes[index], - clearIndexes, onDrain, getJITDB: () => jitdb, }) diff --git a/defaults.js b/defaults.js index 0809de12..66ebd362 100644 --- a/defaults.js +++ b/defaults.js @@ -9,6 +9,12 @@ exports.flumePath = (dir) => path.join(dir, 'flume') exports.oldLogPath = (dir) => path.join(dir, 'flume', 'log.offset') exports.newLogPath = (dir) => path.join(dir, 'db2', 'log.bipf') exports.indexesPath = (dir) => path.join(dir, 'db2', 'indexes') +exports.resetLevelPath = (dir) => + path.join(dir, 'db2', 'post-compact-reset-level') +exports.resetPrivatePath = (dir) => + path.join(dir, 'db2', 'post-compact-reset-private') +exports.reindexJitPath = (dir) => + path.join(dir, 'db2', 'post-compact-reindex-jit') exports.jitIndexesPath = (dir) => path.join(dir, 'db2', 'jit') exports.tooHotOpts = (config) => config.db2 diff --git a/indexes/about-self.js b/indexes/about-self.js index ef92a465..3461eb52 100644 --- a/indexes/about-self.js +++ b/indexes/about-self.js @@ -68,6 +68,10 @@ module.exports = class AboutSelf extends Plugin { return true } + reset() { + this.profiles = {} + } + updateProfileData(author, content) { let profile = this.profiles[author] || {} diff --git a/indexes/base.js b/indexes/base.js index 6e8c8dbc..7bd06da9 100644 --- a/indexes/base.js +++ b/indexes/base.js @@ -64,6 +64,10 @@ module.exports = function makeBaseIndex(privateIndex) { this.privateIndex.saveIndexes(cb) } + reset() { + this.authorLatest.clear() + } + // pull-stream where each item is { key, value } // where key is the authorId and value is { offset, sequence } getAllLatest() { diff --git a/indexes/plugin.js b/indexes/plugin.js index 47986485..80a31002 100644 --- a/indexes/plugin.js +++ b/indexes/plugin.js @@ -139,6 +139,18 @@ module.exports = class Plugin { }) } }) + + const subClassReset = this.reset + this.reset = (cb) => { + if (subClassReset) subClassReset.call(this) + this.level.clear(() => { + processedSeq = 0 + processedOffset = -1 + this.batch = [] + this.offset.set(-1) + cb() + }) + } } get stateLoaded() { @@ -160,10 +172,6 @@ module.exports = class Plugin { else return undefined } - remove(...args) { - this.level.clear(...args) - } - close(cb) { this.level.close(cb) } diff --git a/indexes/private.js b/indexes/private.js index f418be81..0bd90185 100644 --- a/indexes/private.js +++ b/indexes/private.js @@ -224,11 +224,19 @@ module.exports = function (dir, sbot, config) { return encrypted.filter((x) => !canDecryptSet.has(x)) } + function reset(cb) { + encrypted = [] + canDecrypt = [] + latestOffset.set(-1) + saveIndexes(cb) + } + return { latestOffset, decrypt, missingDecrypt, saveIndexes, + reset, stateLoaded: stateLoaded.promise, } } diff --git a/package.json b/package.json index 26f84282..232fafe5 100644 --- a/package.json +++ b/package.json @@ -16,7 +16,7 @@ "operators/*.js" ], "dependencies": { - "async-append-only-log": "^4.0.0", + "async-append-only-log": "^4.2.2", "atomic-file-rw": "^0.2.1", "binary-search-bounds": "^2.0.4", "bipf": "^1.5.4", @@ -26,7 +26,7 @@ "flumecodec": "0.0.1", "flumelog-offset": "3.4.4", "hoox": "0.0.1", - "jitdb": "^6.5.0", + "jitdb": "^6.6.0", "level": "^6.0.1", "level-codec": "^9.0.2", "lodash.debounce": "^4.0.8", @@ -35,7 +35,6 @@ "mutexify": "^1.3.1", "obz": "^1.1.0", "p-defer": "^3.0.0", - "promisify-4loc": "1.0.0", "pull-cat": "^1.1.11", "pull-cont": "^0.1.1", "pull-drain-gently": "^1.1.0", diff --git a/test/compaction-resume.js b/test/compaction-resume.js new file mode 100644 index 00000000..2f5325e7 --- /dev/null +++ b/test/compaction-resume.js @@ -0,0 +1,102 @@ +// SPDX-FileCopyrightText: 2022 Anders Rune Jensen +// +// SPDX-License-Identifier: Unlicense + +const test = require('tape') +const ssbKeys = require('ssb-keys') +const fs = require('fs') +const path = require('path') +const rimraf = require('rimraf') +const mkdirp = require('mkdirp') +const pify = require('util').promisify +const SecretStack = require('secret-stack') +const caps = require('ssb-caps') +const {resetLevelPath, resetPrivatePath, reindexJitPath} = require('../defaults') + +const dir = '/tmp/ssb-db2-compaction-resume' + +rimraf.sync(dir) +mkdirp.sync(dir) + +test('compaction resumes automatically after a crash', async (t) => { + t.timeoutAfter(20e3) + + const keys = ssbKeys.loadOrCreateSync(path.join(dir, 'secret')) + let sbot = SecretStack({ appKey: caps.shs }).use(require('../')).call(null, { + keys, + path: dir, + }) + + const TOTAL = 1000 + const msgKeys = [] + for (let i = 0; i < TOTAL; i += 1) { + const msg = await pify(sbot.db.publish)({ type: 'post', text: `hi ${i}` }) + msgKeys.push(msg.key) + } + t.pass('published messages') + + await pify(sbot.db.onDrain)('keys') + const oldLogSize = sbot.db.getStatus().value.log + + const keysIndex = sbot.db.getIndex('keys') + const seq3 = await pify(keysIndex.getSeq.bind(keysIndex))(msgKeys[3]) + t.equals(seq3, 3, 'seq 3 for msg #3') + + for (let i = 0; i < TOTAL; i += 2) { + await pify(sbot.db.del)(msgKeys[i]) + } + await pify(sbot.db.getLog().onDeletesFlushed)() + t.pass('deleted messages') + + await pify(sbot.close)(true) + t.pass('closed sbot') + + fs.closeSync(fs.openSync(path.join(dir, 'db2', 'log.bipf.compaction'), 'w')) + fs.closeSync(fs.openSync(resetLevelPath(dir), 'w')) + fs.closeSync(fs.openSync(resetPrivatePath(dir), 'w')) + fs.closeSync(fs.openSync(reindexJitPath(dir), 'w')) + t.pass('pretend that compaction was in progress') + + sbot = SecretStack({ appKey: caps.shs }).use(require('../')).call(null, { + keys, + path: dir, + }) + + let newLogSize = 0 + let done = false + sbot.db.getStatus()((stats) => { + if (!stats.log) return + if (stats.log > oldLogSize * 0.6) return + if (newLogSize) return + if (stats.progress !== 1) return + + newLogSize = stats.log + done = true + }) + + await new Promise((resolve) => { + const interval = setInterval(() => { + if (done) { + clearInterval(interval) + resolve() + } + }, 200) + }) + t.pass('compaction started and ended automatically') + + try { + const keysIndex2 = sbot.db.getIndex('keys') + const seq3after = await pify(keysIndex2.getSeq.bind(keysIndex2))(msgKeys[3]) + t.equals(seq3after, 1, 'seq 1 for msg #3 after reindexing') + } catch (err) { + console.log(err) + } + + t.notEquals(oldLogSize, 0, 'old log size is ' + oldLogSize) + t.notEquals(newLogSize, 0, 'new log size is ' + newLogSize) + t.true(newLogSize < oldLogSize * 0.6, 'at most 0.6x smaller') + t.true(newLogSize > oldLogSize * 0.4, 'at least 0.4x smaller') + + await pify(sbot.close)(true) + t.end() +}) diff --git a/test/compaction.js b/test/compaction.js new file mode 100644 index 00000000..017f41a3 --- /dev/null +++ b/test/compaction.js @@ -0,0 +1,211 @@ +// SPDX-FileCopyrightText: 2022 Anders Rune Jensen +// +// SPDX-License-Identifier: Unlicense + +const test = require('tape') +const ssbKeys = require('ssb-keys') +const path = require('path') +const rimraf = require('rimraf') +const mkdirp = require('mkdirp') +const pify = require('util').promisify +const SecretStack = require('secret-stack') +const caps = require('ssb-caps') +const { where, key, toCallback } = require('../operators') +const { onceWhen } = require('../utils') + +test('compaction fills holes and reindexes', async (t) => { + t.timeoutAfter(20e3) + + const dir = '/tmp/ssb-db2-compaction' + + rimraf.sync(dir) + mkdirp.sync(dir) + + const sbot = SecretStack({ appKey: caps.shs }) + .use(require('../')) + .call(null, { + keys: ssbKeys.loadOrCreateSync(path.join(dir, 'secret')), + path: dir, + }) + + const TOTAL = 1000 + const msgKeys = [] + console.time('publish') + for (let i = 0; i < TOTAL; i += 1) { + const msg = await pify(sbot.db.publish)({ type: 'post', text: `hi ${i}` }) + msgKeys.push(msg.key) + } + t.pass('published messages') + console.timeEnd('publish') + + await pify(sbot.db.onDrain)() + const oldLogSize = sbot.db.getStatus().value.log + + const msg3 = await pify(sbot.db.getMsg)(msgKeys[3]) + t.equals(msg3.value.content.text, 'hi 3') + + const keysIndex = sbot.db.getIndex('keys') + const seq3 = await pify(keysIndex.getSeq.bind(keysIndex))(msgKeys[3]) + t.equals(seq3, 3, 'seq 3 for msg #3') + + console.time('delete') + for (let i = 0; i < TOTAL; i += 2) { + await pify(sbot.db.del)(msgKeys[i]) + } + console.timeEnd('delete') + await pify(sbot.db.getLog().onDeletesFlushed)() + t.pass('deleted messages') + + let newLogSize = 0 + let done = false + sbot.db.getStatus()((stats) => { + if (!stats.log) return + if (stats.log > oldLogSize * 0.6) return + if (newLogSize) return + if (stats.progress !== 1) return + + newLogSize = stats.log + console.timeEnd('reindex') + done = true + }) + + console.time('compact') + await pify(sbot.db.compact)() + console.timeEnd('compact') + console.time('reindex') + + await new Promise((resolve) => { + const interval = setInterval(() => { + if (done) { + clearInterval(interval) + resolve() + } + }, 200) + }) + + const seq3after = await pify(keysIndex.getSeq.bind(keysIndex))(msgKeys[3]) + t.equals(seq3after, 1, 'seq 1 for msg #3 after reindexing') + + t.notEquals(oldLogSize, 0, 'old log size is ' + oldLogSize) + t.notEquals(newLogSize, 0, 'new log size is ' + newLogSize) + t.true(newLogSize < oldLogSize * 0.6, 'at most 0.6x smaller') + t.true(newLogSize > oldLogSize * 0.4, 'at least 0.4x smaller') + + await pify(sbot.close)(true) + t.end() +}) + +test('queries are queued if compaction is in progress', async (t) => { + t.timeoutAfter(20e3) + + const dir = '/tmp/ssb-db2-compaction2' + + rimraf.sync(dir) + mkdirp.sync(dir) + + const sbot = SecretStack({ appKey: caps.shs }) + .use(require('../')) + .call(null, { + keys: ssbKeys.loadOrCreateSync(path.join(dir, 'secret')), + path: dir, + }) + + const TOTAL = 1000 + const msgKeys = [] + for (let i = 0; i < TOTAL; i += 1) { + const msg = await pify(sbot.db.publish)({ type: 'post', text: `hi ${i}` }) + msgKeys.push(msg.key) + } + t.pass('published messages') + + await pify(sbot.db.onDrain)() + + for (let i = 0; i < TOTAL; i += 2) { + await pify(sbot.db.del)(msgKeys[i]) + } + t.pass('deleted messages') + + let compactDoneAt = 0 + let queryDoneAt = 0 + await new Promise((resolve) => { + sbot.db.compact((err) => { + t.error(err, 'no error') + compactDoneAt = Date.now() + if (queryDoneAt > 0) resolve() + }) + + onceWhen( + sbot.db.getLog().compactionProgress, + (stat) => stat.done === false, + () => { + sbot.db.query( + where(key(msgKeys[3])), + toCallback((err, msgs) => { + t.error(err, 'no error') + t.equals(msgs.length, 1) + t.equals(msgs[0].value.content.text, 'hi 3') + queryDoneAt = Date.now() + if (compactDoneAt > 0) resolve() + }) + ) + } + ) + }) + t.true(compactDoneAt < queryDoneAt, 'compaction done before query') + + await pify(sbot.close)(true) + t.end() +}) + +test('post-compaction reindex resets state in memory too', async (t) => { + t.timeoutAfter(20e3) + + const dir = '/tmp/ssb-db2-compaction3' + + rimraf.sync(dir) + mkdirp.sync(dir) + + const author = ssbKeys.loadOrCreateSync(path.join(dir, 'secret')) + + const sbot = SecretStack({ appKey: caps.shs }) + .use(require('../')) + .use(require('../about-self')) + .call(null, { + keys: author, + path: dir, + }) + + const msg1 = await pify(sbot.db.publish)({ + type: 'about', + about: author.id, + name: 'Alice', + }) + t.pass('published name about') + const msg2 = await pify(sbot.db.publish)({ + type: 'about', + about: author.id, + description: 'In Wonderland', + }) + t.pass('published description about') + + await pify(sbot.db.onDrain)('aboutSelf') + + const profileBefore = sbot.db.getIndex('aboutSelf').getProfile(author.id) + t.equal(profileBefore.name, 'Alice') + t.equal(profileBefore.description, 'In Wonderland') + + await pify(sbot.db.del)(msg2.key) + t.pass('deleted description about') + + await pify(sbot.db.compact)() + t.pass('compacted the log') + + await pify(setTimeout)(1000) + + const profileAfter = sbot.db.getIndex('aboutSelf').getProfile(author.id) + t.equal(profileAfter.name, 'Alice') + t.notOk(profileAfter.description) + + await pify(sbot.close)(true) + t.end() +})