From afea20bb9ddbe4206fb22b138425b5169d01fa37 Mon Sep 17 00:00:00 2001 From: Andre Staltz Date: Fri, 1 Apr 2022 18:27:44 +0300 Subject: [PATCH 1/8] queries are queued until compaction is done --- index.js | 34 ++++++++++++++++++++++++++++++++++ package.json | 4 ++-- 2 files changed, 36 insertions(+), 2 deletions(-) diff --git a/index.js b/index.js index daedbe5..1b808e3 100644 --- a/index.js +++ b/index.js @@ -40,6 +40,7 @@ module.exports = function (log, indexesPath) { const indexes = {} let isReady = false let waiting = [] + const waitingCompaction = [] const coreIndexNames = ['seq', 'timestamp', 'sequence'] loadIndexes(() => { @@ -82,6 +83,15 @@ module.exports = function (log, indexesPath) { else waiting.push(cb) } + log.compactionProgress((stats) => { + if (stats.done && waitingCompaction.length > 0) { + for (let i = 0, n = waitingCompaction.length; i < n; ++i) { + waitingCompaction[i]() + } + waitingCompaction.length = 0 + } + }) + const B_TIMESTAMP = Buffer.from('timestamp') const B_SEQUENCE = Buffer.from('sequence') const B_VALUE = Buffer.from('value') @@ -1365,6 +1375,13 @@ module.exports = function (log, indexesPath) { } function paginate(operation, seq, limit, descending, onlyOffset, sortBy, cb) { + if (!log.compactionProgress.value.done) { + waitingCompaction.push(() => + paginate(operation, seq, limit, descending, onlyOffset, sortBy, cb) + ) + return + } + onReady(() => { const start = Date.now() executeOperation(operation, (err0, result) => { @@ -1399,6 +1416,13 @@ module.exports = function (log, indexesPath) { } function all(operation, seq, descending, onlyOffset, sortBy, cb) { + if (!log.compactionProgress.value.done) { + waitingCompaction.push(() => + all(operation, seq, descending, onlyOffset, sortBy, cb) + ) + return + } + onReady(() => { const start = Date.now() executeOperation(operation, (err0, result) => { @@ -1431,6 +1455,11 @@ module.exports = function (log, indexesPath) { } function count(operation, seq, descending, cb) { + if (!log.compactionProgress.value.done) { + waitingCompaction.push(() => count(operation, seq, descending, cb)) + return + } + onReady(() => { const start = Date.now() executeOperation(operation, (err0, result) => { @@ -1450,6 +1479,11 @@ module.exports = function (log, indexesPath) { } function prepare(operation, cb) { + if (!log.compactionProgress.value.done) { + waitingCompaction.push(() => prepare(operation, cb)) + return + } + onReady(() => { const start = Date.now() // Update status at the beginning: diff --git a/package.json b/package.json index 5c62aaf..aec308e 100644 --- a/package.json +++ b/package.json @@ -38,10 +38,10 @@ "typedfastbitset": "~0.2.1" }, "peerDependencies": { - "async-append-only-log": "^4.0.0" + "async-append-only-log": "ssb-ngi-pointer/async-append-only-log#compact-reindex" }, "devDependencies": { - "async-append-only-log": "^4.0.0", + "async-append-only-log": "ssb-ngi-pointer/async-append-only-log#compact-reindex", "expose-gc": "^1.0.0", "flumecodec": "0.0.1", "flumelog-offset": "3.4.4", From 8d53a08ff0d6b0c9972516c86d3e901c19caa78a Mon Sep 17 00:00:00 2001 From: Andre Staltz Date: Thu, 7 Apr 2022 15:33:37 +0300 Subject: [PATCH 2/8] reindex() affects also core indexes --- index.js | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/index.js b/index.js index 1b808e3..96f26a0 100644 --- a/index.js +++ b/index.js @@ -1639,9 +1639,11 @@ module.exports = function (log, indexesPath) { push( push.values(Object.entries(indexes)), push.asyncMap(([indexName, index], cb) => { - if (coreIndexNames.includes(indexName)) return cb() - - if (index.lazy) { + if (coreIndexNames.includes(indexName)) { + resetIndex(index) + saveCoreIndex(indexName, index, seq) + cb() + } else if (index.lazy) { loadLazyIndex(indexName, (err) => { if (err) return cb(err) From 28a46f664ca2b33d20bc022ddd9b1bba205bfcb2 Mon Sep 17 00:00:00 2001 From: Andre Staltz Date: Fri, 8 Apr 2022 10:23:18 +0300 Subject: [PATCH 3/8] add indexingActive() obz API --- index.js | 8 ++++++++ test/add.js | 16 +++++++++++----- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/index.js b/index.js index 96f26a0..108bb73 100644 --- a/index.js +++ b/index.js @@ -12,6 +12,7 @@ const TypedFastBitSet = require('typedfastbitset') const bsb = require('binary-search-bounds') const multicb = require('multicb') const FastPriorityQueue = require('fastpriorityqueue') +const Obv = require('obz') const debug = require('debug')('jitdb') const debugQuery = debug.extend('query') const Status = require('./status') @@ -42,6 +43,8 @@ module.exports = function (log, indexesPath) { let waiting = [] const waitingCompaction = [] const coreIndexNames = ['seq', 'timestamp', 'sequence'] + const indexingActive = Obv() + indexingActive.set(0) loadIndexes(() => { debug('loaded indexes', Object.keys(indexes)) @@ -534,6 +537,7 @@ module.exports = function (log, indexesPath) { const logstreamId = Math.ceil(Math.random() * 1000) debug(`log.stream #${logstreamId} started, to update index ${waitingKey}`) status.update(indexes, indexNamesForStatus) + indexingActive.set(indexingActive.value + 1) log.stream({ gt: index.offset }).pipe({ paused: false, @@ -588,6 +592,7 @@ module.exports = function (log, indexesPath) { status.update(indexes, indexNamesForStatus) status.done(indexNamesForStatus) + indexingActive.set(indexingActive.value - 1) runWaitingIndexLoadCbs(waitingIndexUpdate, waitingKey) @@ -668,6 +673,7 @@ module.exports = function (log, indexesPath) { debug(`log.stream #${logstreamId} started, to create indexes ${waitingKey}`) status.update(indexes, coreIndexNames) status.update(newIndexes, newIndexNames) + indexingActive.set(indexingActive.value + 1) log.stream({}).pipe({ paused: false, @@ -748,6 +754,7 @@ module.exports = function (log, indexesPath) { status.update(newIndexes, newIndexNames) status.done(coreIndexNames) status.done(newIndexNames) + indexingActive.set(indexingActive.value - 1) runWaitingIndexLoadCbs(waitingIndexCreate, waitingKey) @@ -1676,6 +1683,7 @@ module.exports = function (log, indexesPath) { live, status: status.obv, reindex, + indexingActive, // testing indexes, diff --git a/test/add.js b/test/add.js index 82eba33..a4f482d 100644 --- a/test/add.js +++ b/test/add.js @@ -165,7 +165,7 @@ prepareAndRunTest('Base', dir, (t, db, raf) => { }) prepareAndRunTest('Update index', dir, (t, db, raf) => { - t.plan(4) + t.plan(9) t.timeoutAfter(5000) const msg = { type: 'post', text: 'Testing!' } let state = validate.initial() @@ -182,19 +182,25 @@ prepareAndRunTest('Update index', dir, (t, db, raf) => { }, } + const expectedIndexingActive = [0, 1 /* seq */, 0, 1 /* type_post */, 0] const expectedStatus = [{ seq: -1, timestamp: -1, sequence: -1 }, {}] db.status((stats) => { - t.deepEqual(stats, expectedStatus.shift()) - if (expectedStatus.length === 0) t.end() + t.deepEqual(stats, expectedStatus.shift(), 'status matches') + if (!expectedStatus.length && !expectedIndexingActive.length) t.end() }) addMsg(state.queue[0].value, raf, (err, msg1) => { db.all(typeQuery, 0, false, false, 'declared', (err, results) => { - t.equal(results.length, 1) + t.equal(results.length, 1, '1 message') + + db.indexingActive((x) => { + t.equals(x, expectedIndexingActive.shift(), 'indexingActive matches') + if (!expectedStatus.length && !expectedIndexingActive.length) t.end() + }) addMsg(state.queue[1].value, raf, (err, msg1) => { db.all(typeQuery, 0, false, false, 'declared', (err, results) => { - t.equal(results.length, 2) + t.equal(results.length, 2, '2 messages') }) }) }) From 05491babb8ca332cdcf8b3dcb90826179a919a06 Mon Sep 17 00:00:00 2001 From: Andre Staltz Date: Fri, 8 Apr 2022 10:32:32 +0300 Subject: [PATCH 4/8] add queriesActive() obz API --- index.js | 55 ++++++++++++++++++++++++++--------------------- test/operators.js | 6 ++++++ 2 files changed, 37 insertions(+), 24 deletions(-) diff --git a/index.js b/index.js index 108bb73..ead8c66 100644 --- a/index.js +++ b/index.js @@ -44,7 +44,9 @@ module.exports = function (log, indexesPath) { const waitingCompaction = [] const coreIndexNames = ['seq', 'timestamp', 'sequence'] const indexingActive = Obv() + const queriesActive = Obv() indexingActive.set(0) + queriesActive.set(0) loadIndexes(() => { debug('loaded indexes', Object.keys(indexes)) @@ -1389,6 +1391,7 @@ module.exports = function (log, indexesPath) { return } + queriesActive.set(queriesActive.value + 1) onReady(() => { const start = Date.now() executeOperation(operation, (err0, result) => { @@ -1403,19 +1406,18 @@ module.exports = function (log, indexesPath) { onlyOffset, sortBy, (err1, answer) => { - if (err1) cb(err1) - else { - answer.duration = Date.now() - start - if (debugQuery.enabled) - debugQuery( - `paginate(${getNameFromOperation( - operation - )}), seq: ${seq}, limit: ${limit}: ${ - answer.duration - }ms, total messages: ${answer.total}`.replace(/%/g, '%% ') - ) - cb(null, answer) - } + queriesActive.set(queriesActive.value - 1) + if (err1) return cb(err1) + answer.duration = Date.now() - start + if (debugQuery.enabled) + debugQuery( + `paginate(${getNameFromOperation( + operation + )}), seq: ${seq}, limit: ${limit}: ${ + answer.duration + }ms, total messages: ${answer.total}`.replace(/%/g, '%% ') + ) + cb(null, answer) } ) }) @@ -1430,6 +1432,7 @@ module.exports = function (log, indexesPath) { return } + queriesActive.set(queriesActive.value + 1) onReady(() => { const start = Date.now() executeOperation(operation, (err0, result) => { @@ -1444,17 +1447,16 @@ module.exports = function (log, indexesPath) { onlyOffset, sortBy, (err1, answer) => { - if (err1) cb(err1) - else { - answer.duration = Date.now() - start - if (debugQuery.enabled) - debugQuery( - `all(${getNameFromOperation(operation)}): ${ - answer.duration - }ms, total messages: ${answer.total}`.replace(/%/g, '%% ') - ) - cb(null, answer.results) - } + queriesActive.set(queriesActive.value - 1) + if (err1) return cb(err1) + answer.duration = Date.now() - start + if (debugQuery.enabled) + debugQuery( + `all(${getNameFromOperation(operation)}): ${ + answer.duration + }ms, total messages: ${answer.total}`.replace(/%/g, '%% ') + ) + cb(null, answer.results) } ) }) @@ -1467,9 +1469,11 @@ module.exports = function (log, indexesPath) { return } + queriesActive.set(queriesActive.value + 1) onReady(() => { const start = Date.now() executeOperation(operation, (err0, result) => { + queriesActive.set(queriesActive.value - 1) if (err0) return cb(err0) const [bitset] = result const total = countBitsetSlice(bitset, seq, descending) @@ -1491,6 +1495,7 @@ module.exports = function (log, indexesPath) { return } + queriesActive.set(queriesActive.value + 1) onReady(() => { const start = Date.now() // Update status at the beginning: @@ -1506,6 +1511,7 @@ module.exports = function (log, indexesPath) { status.update(indexesToReportStatus, indexNamesToReportStatus) // Build indexes: executeOperation(operation, (err) => { + queriesActive.set(queriesActive.value - 1) if (err) return cb(err) const duration = Date.now() - start cb(null, duration) @@ -1684,6 +1690,7 @@ module.exports = function (log, indexesPath) { status: status.obv, reindex, indexingActive, + queriesActive, // testing indexes, diff --git a/test/operators.js b/test/operators.js index f5074a1..f336162 100644 --- a/test/operators.js +++ b/test/operators.js @@ -625,6 +625,11 @@ prepareAndRunTest('not operator', dir, (t, db, raf) => { addMsg(state.queue[0].value, raf, (e1, msg1) => { addMsg(state.queue[1].value, raf, (e2, msg2) => { + const expectedQueriesActive = [0, 1, 0] + db.queriesActive((x) => { + t.equals(x, expectedQueriesActive.shift(), 'queriesActive matches') + }) + pull( query( fromDB(db), @@ -639,6 +644,7 @@ prepareAndRunTest('not operator', dir, (t, db, raf) => { t.equal(msgs.length, 1, 'page has one messages') t.equal(msgs[0].value.author, bob.id) t.equal(msgs[0].value.content.type, 'post') + t.equal(expectedQueriesActive.length, 0) t.end() }) ) From 5c78892f6a196333fb3d9154b6fa4c757eec3657 Mon Sep 17 00:00:00 2001 From: Andre Staltz Date: Fri, 8 Apr 2022 16:18:58 +0300 Subject: [PATCH 5/8] tiny refactor --- index.js | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/index.js b/index.js index ead8c66..38115b8 100644 --- a/index.js +++ b/index.js @@ -43,10 +43,8 @@ module.exports = function (log, indexesPath) { let waiting = [] const waitingCompaction = [] const coreIndexNames = ['seq', 'timestamp', 'sequence'] - const indexingActive = Obv() - const queriesActive = Obv() - indexingActive.set(0) - queriesActive.set(0) + const indexingActive = Obv().set(0) + const queriesActive = Obv().set(0) loadIndexes(() => { debug('loaded indexes', Object.keys(indexes)) From 3fe625ded891f058ac410958c02fc9f12ac18a56 Mon Sep 17 00:00:00 2001 From: Andre Staltz Date: Thu, 14 Apr 2022 18:40:58 +0300 Subject: [PATCH 6/8] refactor a for-loop --- index.js | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/index.js b/index.js index 38115b8..dc118f4 100644 --- a/index.js +++ b/index.js @@ -88,9 +88,7 @@ module.exports = function (log, indexesPath) { log.compactionProgress((stats) => { if (stats.done && waitingCompaction.length > 0) { - for (let i = 0, n = waitingCompaction.length; i < n; ++i) { - waitingCompaction[i]() - } + for (const cb of waitingCompaction) cb() waitingCompaction.length = 0 } }) From 824d40fe86825a3fb95a784a2d4ef8115cc3c457 Mon Sep 17 00:00:00 2001 From: Andre Staltz Date: Wed, 20 Apr 2022 14:16:05 +0300 Subject: [PATCH 7/8] use AAOL 4.2.1 --- package.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/package.json b/package.json index aec308e..8a3520b 100644 --- a/package.json +++ b/package.json @@ -38,10 +38,10 @@ "typedfastbitset": "~0.2.1" }, "peerDependencies": { - "async-append-only-log": "ssb-ngi-pointer/async-append-only-log#compact-reindex" + "async-append-only-log": "^4.2.1" }, "devDependencies": { - "async-append-only-log": "ssb-ngi-pointer/async-append-only-log#compact-reindex", + "async-append-only-log": "^4.2.1", "expose-gc": "^1.0.0", "flumecodec": "0.0.1", "flumelog-offset": "3.4.4", From be2f51ab45d729d4d548a023ef5bb5b4566eca98 Mon Sep 17 00:00:00 2001 From: Andre Staltz Date: Wed, 20 Apr 2022 15:02:50 +0300 Subject: [PATCH 8/8] remove tests for jitdb.status that are flaky --- test/add.js | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/test/add.js b/test/add.js index a4f482d..fdf620e 100644 --- a/test/add.js +++ b/test/add.js @@ -165,7 +165,7 @@ prepareAndRunTest('Base', dir, (t, db, raf) => { }) prepareAndRunTest('Update index', dir, (t, db, raf) => { - t.plan(9) + t.plan(7) t.timeoutAfter(5000) const msg = { type: 'post', text: 'Testing!' } let state = validate.initial() @@ -183,11 +183,6 @@ prepareAndRunTest('Update index', dir, (t, db, raf) => { } const expectedIndexingActive = [0, 1 /* seq */, 0, 1 /* type_post */, 0] - const expectedStatus = [{ seq: -1, timestamp: -1, sequence: -1 }, {}] - db.status((stats) => { - t.deepEqual(stats, expectedStatus.shift(), 'status matches') - if (!expectedStatus.length && !expectedIndexingActive.length) t.end() - }) addMsg(state.queue[0].value, raf, (err, msg1) => { db.all(typeQuery, 0, false, false, 'declared', (err, results) => { @@ -195,7 +190,6 @@ prepareAndRunTest('Update index', dir, (t, db, raf) => { db.indexingActive((x) => { t.equals(x, expectedIndexingActive.shift(), 'indexingActive matches') - if (!expectedStatus.length && !expectedIndexingActive.length) t.end() }) addMsg(state.queue[1].value, raf, (err, msg1) => {