add compact() API and auto-rebuild indexes #339

Merged: 9 commits, May 2, 2022
26 changes: 16 additions & 10 deletions README.md
@@ -224,16 +224,17 @@ sbot.db.getIndex('myindex').myOwnMethodToGetStuff()
Or you can wrap that in a secret-stack plugin (in the example above,
`exports.init` should return an object with the API functions).

There are other special methods you can implement in order to add
"hooks" in the `Plugin` subclass:
There are other optional methods you can implement in the `Plugin` subclass:

- `onLoaded(cb)`: called once, at startup, when the index is
successfully loaded from disk and is ready to receive queries
- `onFlush(cb)`: called when the leveldb index is about to be saved to
- `onLoaded(cb)`: a hook called once, at startup, when the index is successfully
loaded from disk and is ready to receive queries
- `onFlush(cb)`: a hook called when the leveldb index is about to be saved to
disk
- `indexesContent()`: method used when reindexing private group
messages to determine if the leveldb index needs to be updated for
decrypted messages. The default method returns true.
- `indexesContent()`: method used when reindexing private group messages to
determine if the leveldb index needs to be updated for decrypted messages. The
default method returns true.
- `reset()`: a method that you can use to reset in-memory state that you might
have in your plugin, when the leveldb index is about to be rebuilt.
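
For illustration only (not part of this diff), a minimal sketch of a `Plugin` subclass using these optional methods might look like the following; the class name and the `profilesCache` field are hypothetical:

```js
// Sketch only: the constructor and onRecord() processing are omitted;
// see the bundled indexes (e.g. indexes/about-self.js) for a full example.
class MyIndex extends Plugin {
  onLoaded(cb) {
    // the index has been loaded from disk and can answer queries
    cb()
  }

  onFlush(cb) {
    // the leveldb index is about to be written to disk
    cb()
  }

  indexesContent() {
    // return false if decrypted private messages never affect this index
    return false
  }

  reset() {
    // the leveldb index is about to be rebuilt; drop in-memory state
    this.profilesCache = {}
  }
}
```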

### Compatibility plugins

@@ -268,7 +269,7 @@ const sbot = SecretStack({ caps })
The following is a list of modules that works well with ssb-db2:

- [ssb-threads] for working with post messages as threads
- [ssb-suggest-lite] for fetching profiles of authors
- [ssb-friends] for working with the social graph
- [ssb-search2] for full-text searching
- [ssb-crut] for working with records that can be modified
@@ -463,11 +464,16 @@ Use [JITDB's prepare](https://github.com/ssb-ngi-pointer/jitdb/#prepareoperation

Waits for the index with name `indexName` to be in sync with the main
log and then call `cb` with no arguments. If `indexName` is not
provided, the base index will be used.

The reason we do it this way is that indexes are updated
asynchronously in order to not block message writing.
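
A usage sketch, assuming this section documents the `onDrain` method that `db.js` exposes in its manifest (the index name `'keys'` is only an example):

```js
// Wait for the 'keys' leveldb index to catch up with the main log
sbot.db.onDrain('keys', () => {
  // safe to read from the keys index now
})

// With no index name, the base index is used
sbot.db.onDrain(() => {
  // base index is in sync with the log
})
```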

### compact(cb)

Compacts the log (filling in the blanks left by deleted messages and optimizing
space) and then rebuilds indexes.
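
For example, a caller might use the new API like this (the error handling shown is only illustrative):

```js
// Compact the log (e.g. after deleting many messages) and let ssb-db2
// rebuild its leveldb, private and jitdb indexes afterwards.
sbot.db.compact((err) => {
  if (err) console.error('compaction failed', err)
  else console.log('log compacted, indexes rebuilt')
})
```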

## Configuration

You can use ssb-config parameters to configure some aspects of ssb-db2:
122 changes: 113 additions & 9 deletions db.js
@@ -3,6 +3,7 @@
// SPDX-License-Identifier: LGPL-3.0-only

const os = require('os')
const fs = require('fs')
const path = require('path')
const rimraf = require('rimraf')
const mkdirp = require('mkdirp')
@@ -21,7 +22,6 @@ const pull = require('pull-stream')
const paramap = require('pull-paramap')
const Ref = require('ssb-ref')
const Obv = require('obz')
const promisify = require('promisify-4loc')
const jitdbOperators = require('jitdb/operators')
const bendyButt = require('ssb-bendy-butt')
const JITDB = require('jitdb')
@@ -30,7 +30,12 @@ const multicb = require('multicb')
const mutexify = require('mutexify')

const operators = require('./operators')
const { jitIndexesPath } = require('./defaults')
const {
jitIndexesPath,
resetLevelPath,
resetPrivatePath,
reindexJitPath,
} = require('./defaults')
const { onceWhen, ReadyGate } = require('./utils')
const DebouncingBatchAdd = require('./debounce-batch')
const Log = require('./log')
@@ -62,6 +67,7 @@ exports.manifest = {
addBatch: 'async',
addOOOBatch: 'async',
getStatus: 'sync',
compact: 'async',
indexingProgress: 'source',

// `query` should be `sync`, but secret-stack is automagically converting it
@@ -92,6 +98,9 @@ exports.init = function (sbot, config) {
const debug = Debug('ssb:db2')
const post = Obv()
const indexingProgress = Notify()
const indexingActive = Obv().set(0)
let abortLogStreamForIndexes = null
const compacting = Obv().set(false)
const hmacKey = null
const stateFeedsReady = Obv().set(false)
const state = {}
@@ -526,8 +535,27 @@
)
}

function clearIndexes() {
for (const indexName in indexes) indexes[indexName].remove(() => {})
function resetAllIndexes(cb) {
const done = multicb({ pluck: 1 })
if (abortLogStreamForIndexes) {
abortLogStreamForIndexes()
abortLogStreamForIndexes = null
}
for (const indexName in indexes) {
indexes[indexName].reset(done())
}
done(function onResetAllIndexesDone() {
cb()
updateIndexes()
})
}

function restartUpdateIndexes() {
if (abortLogStreamForIndexes) {
abortLogStreamForIndexes()
abortLogStreamForIndexes = null
}
indexesStateLoaded.onReady(updateIndexes)
}

function registerIndex(Index) {
@@ -541,6 +569,7 @@
}

function updateIndexes() {
if (!log.compactionProgress.value.done) return
const start = Date.now()

const indexesArr = Object.values(indexes)
@@ -551,7 +580,8 @@
)
debug(`lowest offset for all indexes is ${lowestOffset}`)

log.stream({ gt: lowestOffset }).pipe({
indexingActive.set(indexingActive.value + 1)
const sink = log.stream({ gt: lowestOffset }).pipe({
paused: false,
write(record) {
const buf = record.value
@@ -560,23 +590,28 @@
},
end() {
debug(`updateIndexes() scan time: ${Date.now() - start}ms`)
abortLogStreamForIndexes = null
const doneFlushing = multicb({ pluck: 1 })
for (const idx of indexesArr) idx.flush(doneFlushing())
doneFlushing((err) => {
// prettier-ignore
if (err) console.error(clarify(err, 'updateIndexes() failed to flush indexes'))
indexingActive.set(indexingActive.value - 1)
debug('updateIndexes() live streaming')
log.stream({ gt: indexes['base'].offset.value, live: true }).pipe({
const gt = indexes['base'].offset.value
const sink = log.stream({ gt, live: true }).pipe({
paused: false,
write(record) {
const buf = record.value
const pValue = buf ? bipf.seekKey2(buf, 0, BIPF_VALUE, 0) : -1
for (const idx of indexesArr) idx.onRecord(record, true, pValue)
},
})
abortLogStreamForIndexes = sink.source.abort.bind(sink.source)
})
},
})
abortLogStreamForIndexes = sink.source.abort.bind(sink.source)
}

function onDrain(indexName, cb) {
@@ -638,10 +673,18 @@
onceWhen(
sbot.db2migrate.synchronized,
(isSynced) => isSynced === true,
() => onDrain(cb)
next
)
} else {
onDrain(cb)
next()
}

function next() {
onceWhen(
compacting,
(isCompacting) => isCompacting === false,
() => onDrain(cb)
)
}
})

@@ -686,6 +729,7 @@
const reindexingLock = mutexify()

function reindexEncrypted(cb) {
indexingActive.set(indexingActive.value + 1)
reindexingLock((unlock) => {
const offsets = privateIndex.missingDecrypt()
const keysIndex = indexes['keys']
@@ -731,13 +775,73 @@
done((err) => {
// prettier-ignore
if (err) return unlock(cb, clarify(err, 'reindexEncrypted() failed to force-flush indexes'))
indexingActive.set(indexingActive.value - 1)
unlock(cb)
})
})
)
})
}

function notYetZero(obz, fn, ...args) {
if (obz.value > 0) {
onceWhen(obz, (x) => x === 0, fn.bind(null, ...args))
return true
} else {
return false
}
}

function compact(cb) {
if (notYetZero(jitdb.indexingActive, compact, cb)) return
if (notYetZero(jitdb.queriesActive, compact, cb)) return
if (notYetZero(indexingActive, compact, cb)) return

fs.closeSync(fs.openSync(resetLevelPath(dir), 'w'))
fs.closeSync(fs.openSync(resetPrivatePath(dir), 'w'))
fs.closeSync(fs.openSync(reindexJitPath(dir), 'w'))
log.compact(function onLogCompacted(err) {
if (err) cb(clarify(err, 'ssb-db2 compact() failed with the log'))
else cb()
})
}

let compactStartOffset = null
log.compactionProgress((stats) => {
if (typeof stats.startOffset === 'number' && compactStartOffset === null) {
compactStartOffset = stats.startOffset
}

if (compacting.value !== !stats.done) compacting.set(!stats.done)

if (stats.done) {
if (stats.sizeDiff > 0) {
if (fs.existsSync(resetLevelPath(dir))) {
resetAllIndexes(() => {
rimraf.sync(resetLevelPath(dir))
})
}
if (fs.existsSync(resetPrivatePath(dir))) {
privateIndex.reset(() => {
rimraf.sync(resetPrivatePath(dir))
})
}
if (fs.existsSync(reindexJitPath(dir))) {
jitdb.reindex(compactStartOffset || 0, (err) => {
if (err) console.error('ssb-db2 reindex jitdb after compact', err)
rimraf.sync(reindexJitPath(dir))
})
}
compactStartOffset = null
} else {
rimraf.sync(resetLevelPath(dir))
rimraf.sync(resetPrivatePath(dir))
rimraf.sync(reindexJitPath(dir))
restartUpdateIndexes()
}
}
})

return (self = {
// Public API:
get,
@@ -755,6 +859,7 @@
getStatus: () => status.obv,
operators,
post,
compact,
reindexEncrypted,
indexingProgress: () => indexingProgress.listen(),

@@ -774,7 +879,6 @@
getState: () => state,
getIndexes: () => indexes,
getIndex: (index) => indexes[index],
clearIndexes,
Member: This was something I used in browser core for testing. I don't mind removing this. It's a thin wrapper around indexes reset anyway.

Member (Author): Do you want to bring it back?

onDrain,
getJITDB: () => jitdb,
})
6 changes: 6 additions & 0 deletions defaults.js
@@ -9,6 +9,12 @@ exports.flumePath = (dir) => path.join(dir, 'flume')
exports.oldLogPath = (dir) => path.join(dir, 'flume', 'log.offset')
exports.newLogPath = (dir) => path.join(dir, 'db2', 'log.bipf')
exports.indexesPath = (dir) => path.join(dir, 'db2', 'indexes')
exports.resetLevelPath = (dir) =>
path.join(dir, 'db2', 'post-compact-reset-level')
exports.resetPrivatePath = (dir) =>
path.join(dir, 'db2', 'post-compact-reset-private')
exports.reindexJitPath = (dir) =>
path.join(dir, 'db2', 'post-compact-reindex-jit')
exports.jitIndexesPath = (dir) => path.join(dir, 'db2', 'jit')
exports.tooHotOpts = (config) =>
config.db2
4 changes: 4 additions & 0 deletions indexes/about-self.js
@@ -68,6 +68,10 @@ module.exports = class AboutSelf extends Plugin {
return true
}

reset() {
this.profiles = {}
}

updateProfileData(author, content) {
let profile = this.profiles[author] || {}

4 changes: 4 additions & 0 deletions indexes/base.js
@@ -64,6 +64,10 @@ module.exports = function makeBaseIndex(privateIndex) {
this.privateIndex.saveIndexes(cb)
}

reset() {
this.authorLatest.clear()
}

// pull-stream where each item is { key, value }
// where key is the authorId and value is { offset, sequence }
getAllLatest() {
16 changes: 12 additions & 4 deletions indexes/plugin.js
@@ -139,6 +139,18 @@ module.exports = class Plugin {
})
}
})

const subClassReset = this.reset
this.reset = (cb) => {
if (subClassReset) subClassReset.call(this)
this.level.clear(() => {
processedSeq = 0
processedOffset = -1
this.batch = []
this.offset.set(-1)
cb()
})
}
}

get stateLoaded() {
@@ -160,10 +172,6 @@
else return undefined
}

remove(...args) {
this.level.clear(...args)
}

close(cb) {
this.level.close(cb)
}
8 changes: 8 additions & 0 deletions indexes/private.js
@@ -224,11 +224,19 @@ module.exports = function (dir, sbot, config) {
return encrypted.filter((x) => !canDecryptSet.has(x))
}

function reset(cb) {
encrypted = []
canDecrypt = []
latestOffset.set(-1)
saveIndexes(cb)
}

return {
latestOffset,
decrypt,
missingDecrypt,
saveIndexes,
reset,
stateLoaded: stateLoaded.promise,
}
}