Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add APIs necessary for log compaction algorithms #217

Merged
merged 8 commits into from
Apr 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 74 additions & 27 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const TypedFastBitSet = require('typedfastbitset')
const bsb = require('binary-search-bounds')
const multicb = require('multicb')
const FastPriorityQueue = require('fastpriorityqueue')
const Obv = require('obz')
const debug = require('debug')('jitdb')
const debugQuery = debug.extend('query')
const Status = require('./status')
Expand Down Expand Up @@ -40,7 +41,10 @@ module.exports = function (log, indexesPath) {
const indexes = {}
let isReady = false
let waiting = []
const waitingCompaction = []
const coreIndexNames = ['seq', 'timestamp', 'sequence']
const indexingActive = Obv().set(0)
const queriesActive = Obv().set(0)

loadIndexes(() => {
debug('loaded indexes', Object.keys(indexes))
Expand Down Expand Up @@ -82,6 +86,13 @@ module.exports = function (log, indexesPath) {
else waiting.push(cb)
}

log.compactionProgress((stats) => {
if (stats.done && waitingCompaction.length > 0) {
for (const cb of waitingCompaction) cb()
waitingCompaction.length = 0
}
})

const B_TIMESTAMP = Buffer.from('timestamp')
const B_SEQUENCE = Buffer.from('sequence')
const B_VALUE = Buffer.from('value')
Expand Down Expand Up @@ -524,6 +535,7 @@ module.exports = function (log, indexesPath) {
const logstreamId = Math.ceil(Math.random() * 1000)
debug(`log.stream #${logstreamId} started, to update index ${waitingKey}`)
status.update(indexes, indexNamesForStatus)
indexingActive.set(indexingActive.value + 1)

log.stream({ gt: index.offset }).pipe({
paused: false,
Expand Down Expand Up @@ -578,6 +590,7 @@ module.exports = function (log, indexesPath) {

status.update(indexes, indexNamesForStatus)
status.done(indexNamesForStatus)
indexingActive.set(indexingActive.value - 1)

runWaitingIndexLoadCbs(waitingIndexUpdate, waitingKey)

Expand Down Expand Up @@ -658,6 +671,7 @@ module.exports = function (log, indexesPath) {
debug(`log.stream #${logstreamId} started, to create indexes ${waitingKey}`)
status.update(indexes, coreIndexNames)
status.update(newIndexes, newIndexNames)
indexingActive.set(indexingActive.value + 1)

log.stream({}).pipe({
paused: false,
Expand Down Expand Up @@ -738,6 +752,7 @@ module.exports = function (log, indexesPath) {
status.update(newIndexes, newIndexNames)
status.done(coreIndexNames)
status.done(newIndexNames)
indexingActive.set(indexingActive.value - 1)

runWaitingIndexLoadCbs(waitingIndexCreate, waitingKey)

Expand Down Expand Up @@ -1365,6 +1380,14 @@ module.exports = function (log, indexesPath) {
}

function paginate(operation, seq, limit, descending, onlyOffset, sortBy, cb) {
if (!log.compactionProgress.value.done) {
waitingCompaction.push(() =>
paginate(operation, seq, limit, descending, onlyOffset, sortBy, cb)
)
return
}

queriesActive.set(queriesActive.value + 1)
onReady(() => {
const start = Date.now()
executeOperation(operation, (err0, result) => {
Expand All @@ -1379,26 +1402,33 @@ module.exports = function (log, indexesPath) {
onlyOffset,
sortBy,
(err1, answer) => {
if (err1) cb(err1)
else {
answer.duration = Date.now() - start
if (debugQuery.enabled)
debugQuery(
`paginate(${getNameFromOperation(
operation
)}), seq: ${seq}, limit: ${limit}: ${
answer.duration
}ms, total messages: ${answer.total}`.replace(/%/g, '%% ')
)
cb(null, answer)
}
queriesActive.set(queriesActive.value - 1)
if (err1) return cb(err1)
answer.duration = Date.now() - start
if (debugQuery.enabled)
debugQuery(
`paginate(${getNameFromOperation(
operation
)}), seq: ${seq}, limit: ${limit}: ${
answer.duration
}ms, total messages: ${answer.total}`.replace(/%/g, '%% ')
)
cb(null, answer)
}
)
})
})
}

function all(operation, seq, descending, onlyOffset, sortBy, cb) {
if (!log.compactionProgress.value.done) {
waitingCompaction.push(() =>
all(operation, seq, descending, onlyOffset, sortBy, cb)
)
return
}

queriesActive.set(queriesActive.value + 1)
onReady(() => {
const start = Date.now()
executeOperation(operation, (err0, result) => {
Expand All @@ -1413,27 +1443,33 @@ module.exports = function (log, indexesPath) {
onlyOffset,
sortBy,
(err1, answer) => {
if (err1) cb(err1)
else {
answer.duration = Date.now() - start
if (debugQuery.enabled)
debugQuery(
`all(${getNameFromOperation(operation)}): ${
answer.duration
}ms, total messages: ${answer.total}`.replace(/%/g, '%% ')
)
cb(null, answer.results)
}
queriesActive.set(queriesActive.value - 1)
if (err1) return cb(err1)
answer.duration = Date.now() - start
if (debugQuery.enabled)
debugQuery(
`all(${getNameFromOperation(operation)}): ${
answer.duration
}ms, total messages: ${answer.total}`.replace(/%/g, '%% ')
)
cb(null, answer.results)
}
)
})
})
}

function count(operation, seq, descending, cb) {
if (!log.compactionProgress.value.done) {
waitingCompaction.push(() => count(operation, seq, descending, cb))
return
}

queriesActive.set(queriesActive.value + 1)
onReady(() => {
const start = Date.now()
executeOperation(operation, (err0, result) => {
queriesActive.set(queriesActive.value - 1)
if (err0) return cb(err0)
const [bitset] = result
const total = countBitsetSlice(bitset, seq, descending)
Expand All @@ -1450,6 +1486,12 @@ module.exports = function (log, indexesPath) {
}

function prepare(operation, cb) {
if (!log.compactionProgress.value.done) {
waitingCompaction.push(() => prepare(operation, cb))
return
}

queriesActive.set(queriesActive.value + 1)
onReady(() => {
const start = Date.now()
// Update status at the beginning:
Expand All @@ -1465,6 +1507,7 @@ module.exports = function (log, indexesPath) {
status.update(indexesToReportStatus, indexNamesToReportStatus)
// Build indexes:
executeOperation(operation, (err) => {
queriesActive.set(queriesActive.value - 1)
if (err) return cb(err)
const duration = Date.now() - start
cb(null, duration)
Expand Down Expand Up @@ -1605,9 +1648,11 @@ module.exports = function (log, indexesPath) {
push(
push.values(Object.entries(indexes)),
push.asyncMap(([indexName, index], cb) => {
if (coreIndexNames.includes(indexName)) return cb()

if (index.lazy) {
if (coreIndexNames.includes(indexName)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right, reindex can now be done when things are removed, so this changes the core indexes. Before, when it was only for encrypted messages, these core indexes did not change.

resetIndex(index)
saveCoreIndex(indexName, index, seq)
cb()
} else if (index.lazy) {
loadLazyIndex(indexName, (err) => {
if (err) return cb(err)

Expand Down Expand Up @@ -1640,6 +1685,8 @@ module.exports = function (log, indexesPath) {
live,
status: status.obv,
reindex,
indexingActive,
queriesActive,

// testing
indexes,
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@
"typedfastbitset": "~0.2.1"
},
"peerDependencies": {
"async-append-only-log": "^4.0.0"
"async-append-only-log": "^4.2.1"
},
"devDependencies": {
"async-append-only-log": "^4.0.0",
"async-append-only-log": "^4.2.1",
"expose-gc": "^1.0.0",
"flumecodec": "0.0.1",
"flumelog-offset": "3.4.4",
Expand Down
16 changes: 8 additions & 8 deletions test/add.js
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ prepareAndRunTest('Base', dir, (t, db, raf) => {
})

prepareAndRunTest('Update index', dir, (t, db, raf) => {
t.plan(4)
t.plan(7)
t.timeoutAfter(5000)
const msg = { type: 'post', text: 'Testing!' }
let state = validate.initial()
Expand All @@ -182,19 +182,19 @@ prepareAndRunTest('Update index', dir, (t, db, raf) => {
},
}

const expectedStatus = [{ seq: -1, timestamp: -1, sequence: -1 }, {}]
db.status((stats) => {
t.deepEqual(stats, expectedStatus.shift())
if (expectedStatus.length === 0) t.end()
})
const expectedIndexingActive = [0, 1 /* seq */, 0, 1 /* type_post */, 0]

addMsg(state.queue[0].value, raf, (err, msg1) => {
db.all(typeQuery, 0, false, false, 'declared', (err, results) => {
t.equal(results.length, 1)
t.equal(results.length, 1, '1 message')

db.indexingActive((x) => {
t.equals(x, expectedIndexingActive.shift(), 'indexingActive matches')
})

addMsg(state.queue[1].value, raf, (err, msg1) => {
db.all(typeQuery, 0, false, false, 'declared', (err, results) => {
t.equal(results.length, 2)
t.equal(results.length, 2, '2 messages')
})
})
})
Expand Down
6 changes: 6 additions & 0 deletions test/operators.js
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,11 @@ prepareAndRunTest('not operator', dir, (t, db, raf) => {

addMsg(state.queue[0].value, raf, (e1, msg1) => {
addMsg(state.queue[1].value, raf, (e2, msg2) => {
const expectedQueriesActive = [0, 1, 0]
db.queriesActive((x) => {
t.equals(x, expectedQueriesActive.shift(), 'queriesActive matches')
})

pull(
query(
fromDB(db),
Expand All @@ -639,6 +644,7 @@ prepareAndRunTest('not operator', dir, (t, db, raf) => {
t.equal(msgs.length, 1, 'page has one messages')
t.equal(msgs[0].value.author, bob.id)
t.equal(msgs[0].value.content.type, 'post')
t.equal(expectedQueriesActive.length, 0)
t.end()
})
)
Expand Down