Merge pull request #123 from ssb-ngi-pointer/faster-sort
Faster sort
staltz authored Feb 20, 2021
2 parents e6ccc22 + f5fc614 commit 0f475d3
Showing 5 changed files with 118 additions and 37 deletions.
48 changes: 43 additions & 5 deletions benchmark/index.js
@@ -334,30 +334,68 @@ test('load two indexes concurrently', (t) => {
})
})

test('paginate one huge index', (t) => {
test('paginate big index with small pageSize', (t) => {
const TOTAL = 20000
const PAGESIZE = 5
const NUMPAGES = TOTAL / PAGESIZE
db.onReady(() => {
const start = Date.now()
let i = 0
pull(
query(
fromDB(db),
and(equal(seekType, 'post', { indexType: 'type' })),
paginate(5),
paginate(PAGESIZE),
toPullStream()
),
pull.take(4000),
pull.take(NUMPAGES),
pull.drain(
(msgs) => {
i++
},
(err) => {
if (err) t.fail(err)
const duration = Date.now() - start
if (i !== 4000) t.fail('wrong number of pages read: ' + i)
if (i !== NUMPAGES) t.fail('wrong number of pages read: ' + i)
t.pass(`duration: ${duration}ms`)
fs.appendFileSync(
reportPath,
`| Paginate 1 big index | ${duration}ms |\n`
`| Paginate ${TOTAL} msgs with pageSize=${PAGESIZE} | ${duration}ms |\n`
)
t.end()
}
)
)
})
})

test('paginate big index with big pageSize', (t) => {
const TOTAL = 20000
const PAGESIZE = 500
const NUMPAGES = TOTAL / PAGESIZE
db.onReady(() => {
const start = Date.now()
let i = 0
pull(
query(
fromDB(db),
and(equal(seekType, 'post', { indexType: 'type' })),
paginate(PAGESIZE),
toPullStream()
),
pull.take(NUMPAGES),
pull.drain(
(msgs) => {
i++
},
(err) => {
if (err) t.fail(err)
const duration = Date.now() - start
if (i !== NUMPAGES) t.fail('wrong number of pages read: ' + i)
t.pass(`duration: ${duration}ms`)
fs.appendFileSync(
reportPath,
`| Paginate ${TOTAL} msgs with pageSize=${PAGESIZE} | ${duration}ms |\n`
)
t.end()
}
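The two benchmarks above differ only in PAGESIZE (20000 / 5 = 4000 small pages versus 20000 / 500 = 40 large pages), so together they exercise the many-small-pages and few-large-pages cases of the new sort path. As a hypothetical consolidation (not part of this commit), they could share one parameterized helper; every name used below (db, pull, query, fromDB, and, equal, seekType, paginate, toPullStream, fs, reportPath) already exists in benchmark/index.js:

```js
function benchmarkPaginate(t, total, pageSize) {
  const numPages = total / pageSize
  db.onReady(() => {
    const start = Date.now()
    let i = 0
    pull(
      query(
        fromDB(db),
        and(equal(seekType, 'post', { indexType: 'type' })),
        paginate(pageSize),
        toPullStream()
      ),
      pull.take(numPages),
      pull.drain(
        () => i++, // count pages as they arrive
        (err) => {
          if (err) t.fail(err)
          const duration = Date.now() - start
          if (i !== numPages) t.fail('wrong number of pages read: ' + i)
          t.pass(`duration: ${duration}ms`)
          fs.appendFileSync(
            reportPath,
            `| Paginate ${total} msgs with pageSize=${pageSize} | ${duration}ms |\n`
          )
          t.end()
        }
      )
    )
  })
}

// test('paginate big index with small pageSize', (t) => benchmarkPaginate(t, 20000, 5))
// test('paginate big index with big pageSize', (t) => benchmarkPaginate(t, 20000, 500))
```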
51 changes: 33 additions & 18 deletions index.js
@@ -7,6 +7,7 @@ const pullAsync = require('pull-async')
const TypedFastBitSet = require('typedfastbitset')
const bsb = require('binary-search-bounds')
const multicb = require('multicb')
const FastPriorityQueue = require('fastpriorityqueue')
const debug = require('debug')('jitdb')
const debugQuery = debug.extend('query')
const Status = require('./status')
@@ -1021,22 +1022,30 @@ module.exports = function (log, indexesPath) {
})
}

function compareAscending(a, b) {
return b.timestamp > a.timestamp
}

function compareDescending(a, b) {
return a.timestamp > b.timestamp
}

function sortedByTimestamp(bitset, descending) {
updateCacheWithLog()
const order = descending ? 'descending' : 'ascending'
if (sortedCache[order].has(bitset)) return sortedCache[order].get(bitset)
const timestamped = bitset.array().map((seq) => {
return {
const fpq = new FastPriorityQueue(
descending ? compareDescending : compareAscending
)
bitset.array().forEach((seq) => {
fpq.add({
seq,
timestamp: indexes['timestamp'].tarr[seq],
}
})
const sorted = timestamped.sort((a, b) => {
if (descending) return b.timestamp - a.timestamp
else return a.timestamp - b.timestamp
})
})
sortedCache[order].set(bitset, sorted)
return sorted
fpq.trim()
sortedCache[order].set(bitset, fpq)
return fpq
}

function getMessagesFromBitsetSlice(
@@ -1049,13 +1058,19 @@
) {
seq = seq || 0

const sorted = sortedByTimestamp(bitset, descending)
const sliced =
limit != null
? sorted.slice(seq, seq + limit)
: seq > 0
? sorted.slice(seq)
: sorted
let sorted = sortedByTimestamp(bitset, descending)
const resultSize = sorted.size

let sliced
if (seq === 0 && limit === 1) {
sliced = [sorted.peek()]
} else {
if (seq > 0) {
sorted = sorted.clone()
sorted.removeMany(() => true, seq)
}
sliced = sorted.kSmallest(limit || Infinity)
}

push(
push.values(sliced),
@@ -1067,15 +1082,15 @@
push.collect((err, results) => {
cb(err, {
results: results,
total: sorted.length,
total: resultSize,
})
})
)
}

function countBitsetSlice(bitset, seq, descending) {
if (!seq) return bitset.size()
else return sortedByTimestamp(bitset, descending).slice(seq).length
else return bitset.size() - seq
}

function paginate(operation, seq, limit, descending, onlyOffset, cb) {
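To summarize the change above in isolation: sortedByTimestamp used to materialize and fully sort an array of {seq, timestamp} records before the first page could be served; it now loads them into a fastpriorityqueue heap, which is built in linear time, and getMessagesFromBitsetSlice pulls out only the entries a page needs via peek() and kSmallest(). The following standalone sketch (not part of this commit; the seqs and timestamps are made up) shows the queue operations the new code relies on:

```js
const FastPriorityQueue = require('fastpriorityqueue')

// Same comparator convention as compareAscending above: return true when
// `a` should come out of the queue before `b`.
const ascending = new FastPriorityQueue((a, b) => b.timestamp > a.timestamp)

// Stand-ins for the {seq, timestamp} records built from the bitset.
const records = [
  { seq: 0, timestamp: 1613000003000 },
  { seq: 1, timestamp: 1613000001000 },
  { seq: 2, timestamp: 1613000002000 },
]
records.forEach((r) => ascending.add(r))

ascending.trim() // shrink internal storage, as sortedByTimestamp does before caching

console.log(ascending.peek().seq) // 1, the oldest record (the seq===0 && limit===1 fast path)
console.log(ascending.kSmallest(2).map((r) => r.seq)) // [ 1, 2 ], a "page" of two in timestamp order
console.log(ascending.size) // 3, since kSmallest leaves the queue intact and it can stay cached
```

For deeper pages (seq > 0), the new getMessagesFromBitsetSlice clones the cached queue and discards the first seq entries with removeMany(() => true, seq) before calling kSmallest, so the heap stored in sortedCache is never mutated.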
1 change: 1 addition & 0 deletions package.json
@@ -12,6 +12,7 @@
"binary-search-bounds": "^2.0.4",
"bipf": "^1.4.0",
"debug": "^4.2.0",
"fastpriorityqueue": "^0.6.3",
"idb-kv-store": "^4.5.0",
"jsesc": "^3.0.2",
"mkdirp": "^1.0.4",
24 changes: 23 additions & 1 deletion test/operators.js
@@ -614,6 +614,29 @@ prepareAndRunTest('count operator toCallback', dir, (t, db, raf) => {
})
})

prepareAndRunTest('count with seq operator toCallback', dir, (t, db, raf) => {
const msg = { type: 'food', text: 'Lunch' }
let state = validate.initial()
state = validate.appendNew(state, null, alice, msg, Date.now())
state = validate.appendNew(state, null, bob, msg, Date.now() + 1)

addMsg(state.queue[0].value, raf, (e1, msg1) => {
addMsg(state.queue[1].value, raf, (e2, msg2) => {
query(
fromDB(db),
and(slowEqual('value.content.type', 'food')),
startFrom(1),
count(),
toCallback((err, total) => {
t.error(err, 'no error')
t.equal(total, 1)
t.end()
})
)
})
})
})

prepareAndRunTest('count operator toPullStream', dir, (t, db, raf) => {
const msg = { type: 'drink', text: 'Juice' }
let state = validate.initial()
@@ -631,7 +654,6 @@ prepareAndRunTest('count operator toPullStream', dir, (t, db, raf) => {
),
pull.collect((err, results) => {
t.error(err, 'no error')
console.log(results)
t.equal(results.length, 1)
t.equal(results[0], 2)
t.end()
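The new 'count with seq operator toCallback' test added above pins down the behavior of the reworked countBitsetSlice in index.js: with a non-zero seq, the count no longer sorts anything, it is simply the bitset size minus seq. A minimal sketch of that rule with the test's numbers (two matching 'food' messages, startFrom(1)); the plain number here is a stand-in for bitset.size():

```js
// Mirrors the new countBitsetSlice logic shown in the index.js diff above.
function countSlice(size, seq) {
  if (!seq) return size
  else return size - seq
}

console.log(countSlice(2, 0)) // 2, count() with no startFrom
console.log(countSlice(2, 1)) // 1, which is what t.equal(total, 1) expects
```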
31 changes: 18 additions & 13 deletions test/query.js
@@ -709,10 +709,12 @@ prepareAndRunTest('Timestamp discontinuity', dir, (t, db, raf) => {
const msg2 = { type: 'post', text: '2nd' }
const msg3 = { type: 'post', text: '3rd' }

const start = Date.now()

let state = validate.initial()
state = validate.appendNew(state, null, keys, msg1, Date.now() + 3000)
state = validate.appendNew(state, null, keys, msg2, Date.now() + 2000)
state = validate.appendNew(state, null, keys, msg3, Date.now() + 1000)
state = validate.appendNew(state, null, keys, msg1, start + 3000)
state = validate.appendNew(state, null, keys, msg2, start + 2000)
state = validate.appendNew(state, null, keys, msg3, start + 1000)

const authorQuery = {
type: 'EQUAL',
Expand All @@ -724,17 +726,20 @@ prepareAndRunTest('Timestamp discontinuity', dir, (t, db, raf) => {
},
}

addMsg(state.queue[0].value, raf, (err, m1) => {
addMsg(state.queue[1].value, raf, (err, m2) => {
addMsg(state.queue[2].value, raf, (err, m3) => {
db.all(authorQuery, 0, false, false, (err, results) => {
t.equal(results.length, 3)
t.equal(results[0].value.content.text, '1st', '1st ok')
t.equal(results[1].value.content.text, '2nd', '2nd ok')
t.equal(results[2].value.content.text, '3rd', '3rd ok')
t.end()
// we need to wait for the declared timestamps to win over arrival
setTimeout(() => {
addMsg(state.queue[0].value, raf, (err, m1) => {
addMsg(state.queue[1].value, raf, (err, m2) => {
addMsg(state.queue[2].value, raf, (err, m3) => {
db.all(authorQuery, 0, false, false, (err, results) => {
t.equal(results.length, 3)
t.equal(results[0].value.content.text, '3rd', '3rd ok')
t.equal(results[1].value.content.text, '2nd', '2nd ok')
t.equal(results[2].value.content.text, '1st', '1st ok')
t.end()
})
})
})
})
})
}, 3000)
})
