diff --git a/index.js b/index.js index daedbe5..dc118f4 100644 --- a/index.js +++ b/index.js @@ -12,6 +12,7 @@ const TypedFastBitSet = require('typedfastbitset') const bsb = require('binary-search-bounds') const multicb = require('multicb') const FastPriorityQueue = require('fastpriorityqueue') +const Obv = require('obz') const debug = require('debug')('jitdb') const debugQuery = debug.extend('query') const Status = require('./status') @@ -40,7 +41,10 @@ module.exports = function (log, indexesPath) { const indexes = {} let isReady = false let waiting = [] + const waitingCompaction = [] const coreIndexNames = ['seq', 'timestamp', 'sequence'] + const indexingActive = Obv().set(0) + const queriesActive = Obv().set(0) loadIndexes(() => { debug('loaded indexes', Object.keys(indexes)) @@ -82,6 +86,13 @@ module.exports = function (log, indexesPath) { else waiting.push(cb) } + log.compactionProgress((stats) => { + if (stats.done && waitingCompaction.length > 0) { + for (const cb of waitingCompaction) cb() + waitingCompaction.length = 0 + } + }) + const B_TIMESTAMP = Buffer.from('timestamp') const B_SEQUENCE = Buffer.from('sequence') const B_VALUE = Buffer.from('value') @@ -524,6 +535,7 @@ module.exports = function (log, indexesPath) { const logstreamId = Math.ceil(Math.random() * 1000) debug(`log.stream #${logstreamId} started, to update index ${waitingKey}`) status.update(indexes, indexNamesForStatus) + indexingActive.set(indexingActive.value + 1) log.stream({ gt: index.offset }).pipe({ paused: false, @@ -578,6 +590,7 @@ module.exports = function (log, indexesPath) { status.update(indexes, indexNamesForStatus) status.done(indexNamesForStatus) + indexingActive.set(indexingActive.value - 1) runWaitingIndexLoadCbs(waitingIndexUpdate, waitingKey) @@ -658,6 +671,7 @@ module.exports = function (log, indexesPath) { debug(`log.stream #${logstreamId} started, to create indexes ${waitingKey}`) status.update(indexes, coreIndexNames) status.update(newIndexes, newIndexNames) + indexingActive.set(indexingActive.value + 1) log.stream({}).pipe({ paused: false, @@ -738,6 +752,7 @@ module.exports = function (log, indexesPath) { status.update(newIndexes, newIndexNames) status.done(coreIndexNames) status.done(newIndexNames) + indexingActive.set(indexingActive.value - 1) runWaitingIndexLoadCbs(waitingIndexCreate, waitingKey) @@ -1365,6 +1380,14 @@ module.exports = function (log, indexesPath) { } function paginate(operation, seq, limit, descending, onlyOffset, sortBy, cb) { + if (!log.compactionProgress.value.done) { + waitingCompaction.push(() => + paginate(operation, seq, limit, descending, onlyOffset, sortBy, cb) + ) + return + } + + queriesActive.set(queriesActive.value + 1) onReady(() => { const start = Date.now() executeOperation(operation, (err0, result) => { @@ -1379,19 +1402,18 @@ module.exports = function (log, indexesPath) { onlyOffset, sortBy, (err1, answer) => { - if (err1) cb(err1) - else { - answer.duration = Date.now() - start - if (debugQuery.enabled) - debugQuery( - `paginate(${getNameFromOperation( - operation - )}), seq: ${seq}, limit: ${limit}: ${ - answer.duration - }ms, total messages: ${answer.total}`.replace(/%/g, '%% ') - ) - cb(null, answer) - } + queriesActive.set(queriesActive.value - 1) + if (err1) return cb(err1) + answer.duration = Date.now() - start + if (debugQuery.enabled) + debugQuery( + `paginate(${getNameFromOperation( + operation + )}), seq: ${seq}, limit: ${limit}: ${ + answer.duration + }ms, total messages: ${answer.total}`.replace(/%/g, '%% ') + ) + cb(null, answer) } ) }) @@ -1399,6 +1421,14 @@ module.exports = function (log, indexesPath) { } function all(operation, seq, descending, onlyOffset, sortBy, cb) { + if (!log.compactionProgress.value.done) { + waitingCompaction.push(() => + all(operation, seq, descending, onlyOffset, sortBy, cb) + ) + return + } + + queriesActive.set(queriesActive.value + 1) onReady(() => { const start = Date.now() executeOperation(operation, (err0, result) => { @@ -1413,17 +1443,16 @@ module.exports = function (log, indexesPath) { onlyOffset, sortBy, (err1, answer) => { - if (err1) cb(err1) - else { - answer.duration = Date.now() - start - if (debugQuery.enabled) - debugQuery( - `all(${getNameFromOperation(operation)}): ${ - answer.duration - }ms, total messages: ${answer.total}`.replace(/%/g, '%% ') - ) - cb(null, answer.results) - } + queriesActive.set(queriesActive.value - 1) + if (err1) return cb(err1) + answer.duration = Date.now() - start + if (debugQuery.enabled) + debugQuery( + `all(${getNameFromOperation(operation)}): ${ + answer.duration + }ms, total messages: ${answer.total}`.replace(/%/g, '%% ') + ) + cb(null, answer.results) } ) }) @@ -1431,9 +1460,16 @@ module.exports = function (log, indexesPath) { } function count(operation, seq, descending, cb) { + if (!log.compactionProgress.value.done) { + waitingCompaction.push(() => count(operation, seq, descending, cb)) + return + } + + queriesActive.set(queriesActive.value + 1) onReady(() => { const start = Date.now() executeOperation(operation, (err0, result) => { + queriesActive.set(queriesActive.value - 1) if (err0) return cb(err0) const [bitset] = result const total = countBitsetSlice(bitset, seq, descending) @@ -1450,6 +1486,12 @@ module.exports = function (log, indexesPath) { } function prepare(operation, cb) { + if (!log.compactionProgress.value.done) { + waitingCompaction.push(() => prepare(operation, cb)) + return + } + + queriesActive.set(queriesActive.value + 1) onReady(() => { const start = Date.now() // Update status at the beginning: @@ -1465,6 +1507,7 @@ module.exports = function (log, indexesPath) { status.update(indexesToReportStatus, indexNamesToReportStatus) // Build indexes: executeOperation(operation, (err) => { + queriesActive.set(queriesActive.value - 1) if (err) return cb(err) const duration = Date.now() - start cb(null, duration) @@ -1605,9 +1648,11 @@ module.exports = function (log, indexesPath) { push( push.values(Object.entries(indexes)), push.asyncMap(([indexName, index], cb) => { - if (coreIndexNames.includes(indexName)) return cb() - - if (index.lazy) { + if (coreIndexNames.includes(indexName)) { + resetIndex(index) + saveCoreIndex(indexName, index, seq) + cb() + } else if (index.lazy) { loadLazyIndex(indexName, (err) => { if (err) return cb(err) @@ -1640,6 +1685,8 @@ module.exports = function (log, indexesPath) { live, status: status.obv, reindex, + indexingActive, + queriesActive, // testing indexes, diff --git a/package.json b/package.json index 5c62aaf..8a3520b 100644 --- a/package.json +++ b/package.json @@ -38,10 +38,10 @@ "typedfastbitset": "~0.2.1" }, "peerDependencies": { - "async-append-only-log": "^4.0.0" + "async-append-only-log": "^4.2.1" }, "devDependencies": { - "async-append-only-log": "^4.0.0", + "async-append-only-log": "^4.2.1", "expose-gc": "^1.0.0", "flumecodec": "0.0.1", "flumelog-offset": "3.4.4", diff --git a/test/add.js b/test/add.js index 82eba33..fdf620e 100644 --- a/test/add.js +++ b/test/add.js @@ -165,7 +165,7 @@ prepareAndRunTest('Base', dir, (t, db, raf) => { }) prepareAndRunTest('Update index', dir, (t, db, raf) => { - t.plan(4) + t.plan(7) t.timeoutAfter(5000) const msg = { type: 'post', text: 'Testing!' } let state = validate.initial() @@ -182,19 +182,19 @@ prepareAndRunTest('Update index', dir, (t, db, raf) => { }, } - const expectedStatus = [{ seq: -1, timestamp: -1, sequence: -1 }, {}] - db.status((stats) => { - t.deepEqual(stats, expectedStatus.shift()) - if (expectedStatus.length === 0) t.end() - }) + const expectedIndexingActive = [0, 1 /* seq */, 0, 1 /* type_post */, 0] addMsg(state.queue[0].value, raf, (err, msg1) => { db.all(typeQuery, 0, false, false, 'declared', (err, results) => { - t.equal(results.length, 1) + t.equal(results.length, 1, '1 message') + + db.indexingActive((x) => { + t.equals(x, expectedIndexingActive.shift(), 'indexingActive matches') + }) addMsg(state.queue[1].value, raf, (err, msg1) => { db.all(typeQuery, 0, false, false, 'declared', (err, results) => { - t.equal(results.length, 2) + t.equal(results.length, 2, '2 messages') }) }) }) diff --git a/test/operators.js b/test/operators.js index f5074a1..f336162 100644 --- a/test/operators.js +++ b/test/operators.js @@ -625,6 +625,11 @@ prepareAndRunTest('not operator', dir, (t, db, raf) => { addMsg(state.queue[0].value, raf, (e1, msg1) => { addMsg(state.queue[1].value, raf, (e2, msg2) => { + const expectedQueriesActive = [0, 1, 0] + db.queriesActive((x) => { + t.equals(x, expectedQueriesActive.shift(), 'queriesActive matches') + }) + pull( query( fromDB(db), @@ -639,6 +644,7 @@ prepareAndRunTest('not operator', dir, (t, db, raf) => { t.equal(msgs.length, 1, 'page has one messages') t.equal(msgs[0].value.author, bob.id) t.equal(msgs[0].value.content.type, 'post') + t.equal(expectedQueriesActive.length, 0) t.end() }) )