Skip to content

Commit

Permalink
add operator batch()
Browse files Browse the repository at this point in the history
  • Loading branch information
staltz committed Aug 18, 2021
1 parent a0d465b commit bc56b12
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 4 deletions.
33 changes: 32 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ want to get results in batches, you should use **`toPullStream`**,
**`paginate`**, and optionally `startFrom` and `descending`.

- **toPullStream** creates a [pull-stream] source to stream the results
- **paginate** configures the size of each page stream to the pull-stream source
- **paginate** configures the size of each array sent to the pull-stream source
- **startFrom** configures the beginning seq from where to start streaming
- **descending** configures the pagination stream to order results
from newest to oldest (otherwise the default order is oldest to
Expand Down Expand Up @@ -259,6 +259,36 @@ pull(
)
```

**Batching** with the operator `batch()` is similar to pagination in terms of
performance, but the messages are delivered one-by-one to the final pull-stream,
instead of as an array. Example:

```js
const pull = require('pull-stream')

const source = query(
fromDB(db),
where(
and(
slowEqual('value.content.type', 'contact'),
or(slowEqual('value.author', aliceId), slowEqual('value.author', bobId)),
),
),
batch(10), // Note `batch` instead of `paginate`
descending(),
toPullStream()
)

pull(
source,
// Note that the drain below receives a single `msg`, not an array of `msgs`:
pull.drain((msg) => {
console.log('next message:')
console.log(msg)
})
)
```

#### async/await

There are also operators that support getting the values using
Expand Down Expand Up @@ -372,6 +402,7 @@ const {
offsets,
count,
paginate,
batch,
startFrom,
descending,
asOffsets,
Expand Down
17 changes: 14 additions & 3 deletions operators.js
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,10 @@ function paginate(pageSize) {
return (ops) => updateMeta(ops, 'pageSize', pageSize)
}

// Operator that tags the query with a `batchSize`, so toPullStream()
// fetches results in pages of that size but (unlike paginate) flattens
// them and delivers messages one-by-one to the downstream pull-stream.
function batch(batchSize) {
  return function (ops) {
    return updateMeta(ops, 'batchSize', batchSize)
  }
}

function asOffsets() {
return (ops) => updateMeta(ops, 'asOffsets', true)
}
Expand Down Expand Up @@ -396,9 +400,9 @@ function toPullStream() {
function paginateStream(ops) {
let seq = meta.seq || 0
let total = Infinity
const limit = meta.pageSize || 1
const limit = meta.pageSize || meta.batchSize || 20
let shouldEnd = false
return function readable(end, cb) {
function readable(end, cb) {
if (end) return cb(end)
if (seq >= total || shouldEnd) return cb(true)
if (meta.count) {
Expand All @@ -417,12 +421,18 @@ function toPullStream() {
else {
total = answer.total
seq += limit
cb(null, !meta.pageSize ? answer.results[0] : answer.results)
cb(null, answer.results)
}
}
)
}
}
if (meta.pageSize || meta.count) {
return readable
} else {
// Flatten the "pages" (arrays) into individual messages
return pull(readable, pull.map(pull.values), pull.flatten())
}
}

return pull(
Expand Down Expand Up @@ -480,6 +490,7 @@ module.exports = {
count,
startFrom,
paginate,
batch,
asOffsets,
toCallback,
toPullStream,
Expand Down
2 changes: 2 additions & 0 deletions test/operators.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const {
offsets,
fromDB,
paginate,
batch,
startFrom,
live,
count,
Expand Down Expand Up @@ -1257,6 +1258,7 @@ prepareAndRunTest('support live operations', dir, (t, db, raf) => {
fromDB(db),
where(slowEqual('value.content.type', 'post')),
live({ old: true }),
batch(2),
toPullStream(),
pull.drain((msg) => {
if (i++ == 0) {
Expand Down

0 comments on commit bc56b12

Please sign in to comment.