Skip to content

Commit

Permalink
Merge pull request #52 from ssbc/partial-replication-changes
Browse files Browse the repository at this point in the history
Partial replication changes
  • Loading branch information
arj03 authored Oct 12, 2021
2 parents b9a7f8b + e03caff commit 66f47aa
Show file tree
Hide file tree
Showing 17 changed files with 1,069 additions and 178 deletions.
4 changes: 4 additions & 0 deletions .prettierrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"semi": false,
"singleQuote": true
}
92 changes: 90 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@ installed, instead, you need to call its API methods yourself (primarily
`request` or `block`), or use a scheduler module such as
[ssb-replication-scheduler](https://github.com/ssb-ngi-pointer/ssb-replication-scheduler).

### `ssb.ebt.request(destination, replicating)` ("sync" muxrpc API)
### `ssb.ebt.request(destination, replicating, formatName)` ("sync" muxrpc API)

Request that the SSB feed ID `destination` be replicated. `replication` is a
boolean, where `true` indicates we want to replicate the destination. If set to
`false`, replication is stopped.
`false`, replication is stopped. `formatName` is optional and used to specify
the specific EBT instance, otherwise the first where isFeed is `true` for
`destination` is used.

Returns undefined, always.

Expand All @@ -56,6 +58,9 @@ them.
the SSB feed ID of the peer being blocked, and `blocking` is a boolean that
indicates whether to enable the block (`true`) or to unblock (`false`).

`formatName` is optional and used to specify the specific EBT instance,
otherwise the first where isFeed is `true` for `origin` is used.

Returns undefined, always.

### `ssb.ebt.peerStatus(id)` ("sync" muxrpc API)
Expand Down Expand Up @@ -96,16 +101,99 @@ The output looks like this:
}
}
```
</details>

### `ssb.ebt.registerFormat(methods)` ("sync" muxrpc API)

Register a new format for replication. Note this does not have to be a
new feed format, it could also be indexed replication or sliced
replication. See `formats` folder for examples.

By registering a format you create a new EBT instance used for
replicating feeds using that format. This means its own clock. Message
will be replicated using the `replicateFormat` API. The `methods`
argument must implement the following functions. The example below
shows the implementation for 'classic' ed25519 SSB feeds.

<details>
<summary>CLICK HERE</summary>

```js
{
name: 'classic',
// In case `isFeed` needs to load some state asynchronously
prepareForIsFeed(sbot, feedId, cb) {
cb()
},
// used in request, block, cleanClock, sbot.post, vectorClock
isFeed(sbot, feedId) {
return ref.isFeed(feedId)
},
getAtSequence(sbot, pair, cb) {
sbot.getAtSequence([pair.id, pair.sequence], (err, msg) => {
cb(err, msg ? msg.value : null)
})
},
appendMsg(sbot, msgVal, cb) {
sbot.add(msgVal, (err, msg) => {
cb(err && err.fatal ? err : null, msg)
})
},
// used in onAppend
convertMsg(msgVal) {
return msgVal
},
// used in vectorClock
isReady(sbot) {
return Promise.resolve(true)
},

// used in ebt:stream to distinguish between messages and notes
isMsg(msgVal) {
return Number.isInteger(msgVal.sequence) && msgVal.sequence > 0 &&
ref.isFeed(msgVal.author) && msgVal.content
},
// used in ebt:events
getMsgAuthor(msgVal) {
return msgVal.author
},
// used in ebt:events
getMsgSequence(msgVal) {
return msgVal.sequence
}
}
```
</details>

### `ssb.ebt.setClockForSlicedReplication(feedId, sequence, formatName)` ("sync" muxrpc API)

Sets the internal clock of a feed to a specific sequence. Note this
does not start replicating the feed, it only updates the clock. By
combining this with `clock` it is possible do to sliced replication
with a remote peer where say only the latest 100 messages of a feed is
replicated.

### (Internal) `ssb.ebt.replicate(opts)` ("duplex" muxrpc API)

Creates a duplex replication stream to the remote peer. When two peers connect,
the peer who initiated the call (the client) should call this. You do not need
to call this method, it is called automatically in ssb-ebt whenever our peer
connects to a remote peer. `opts` is an object with one field: `version`.

### (Internal) `ssb.ebt.replicateFormat(opts)` ("duplex" muxrpc API)

Creates a duplex replication stream to the remote peer. This behaves
similar to `replicate` except it takes an extra field `format`
specifying what is transferred over this EBT stream. Classic feeds are
still replicated using `replicate` while this will be used to
replicate other feed formats.

### (Internal) `ssb.ebt.clock(opts, cb)` ("async" muxrpc API)

Gets the current vector clock of a remote peer. `opts` is an object
with one field: `format` specifying what format to get the vector
clock for. Defaults to 'classic'.

## Testing and debugging

There are several scripts in `./debug` which can be used for testing EBT
Expand Down
48 changes: 48 additions & 0 deletions formats/bendy-butt.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
const SSBURI = require('ssb-uri2')
const bendyButt = require('ssb-bendy-butt')

module.exports = {
name: 'bendybutt-v1',
prepareForIsFeed(sbot, feedId, cb) {
cb()
},
// used in request, block, cleanClock, sbot.post, vectorClock
isFeed (sbot, feedId) {
return SSBURI.isBendyButtV1FeedSSBURI(feedId)
},
getAtSequence (sbot, pair, cb) {
sbot.getAtSequence([pair.id, pair.sequence], (err, msg) => {
cb(err, msg ? bendyButt.encode(msg.value) : null)
})
},
appendMsg (sbot, msgVal, cb) {
sbot.add(bendyButt.decode(msgVal), (err, msg) => {
cb(err && err.fatal ? err : null, msg)
})
},
convertMsg (sbot, msgVal, cb) {
cb(null, bendyButt.encode(msgVal))
},
// used in vectorClock
isReady (sbot) {
return Promise.resolve(true)
},

// used in ebt:stream to distinguish between messages and notes
isMsg (bbVal) {
if (Buffer.isBuffer(bbVal)) {
const msgVal = bendyButt.decode(bbVal)
return msgVal && SSBURI.isBendyButtV1FeedSSBURI(msgVal.author)
} else {
return bbVal && SSBURI.isBendyButtV1FeedSSBURI(bbVal.author)
}
},
// used in ebt:events
getMsgAuthor (bbVal) {
if (Buffer.isBuffer(bbVal)) { return bendyButt.decode(bbVal).author } else { return bbVal.author }
},
// used in ebt:events
getMsgSequence (bbVal) {
if (Buffer.isBuffer(bbVal)) { return bendyButt.decode(bbVal).sequence } else { return bbVal.sequence }
}
}
44 changes: 44 additions & 0 deletions formats/classic.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
const ref = require('ssb-ref')

module.exports = {
name: 'classic',
prepareForIsFeed(sbot, feedId, cb) {
cb()
},
// used in request, block, cleanClock, sbot.post, vectorClock
isFeed (sbot, feedId) {
return ref.isFeed(feedId)
},
getAtSequence (sbot, pair, cb) {
sbot.getAtSequence([pair.id, pair.sequence], (err, msg) => {
cb(err, msg ? msg.value : null)
})
},
appendMsg (sbot, msgVal, cb) {
sbot.add(msgVal, (err, msg) => {
cb(err && err.fatal ? err : null, msg)
})
},
// used in onAppend
convertMsg (sbot, msgVal, cb) {
cb(null, msgVal)
},
// used in vectorClock
isReady (sbot) {
return Promise.resolve(true)
},

// used in ebt:stream to distinguish between messages and notes
isMsg (msgVal) {
return Number.isInteger(msgVal.sequence) && msgVal.sequence > 0 &&
ref.isFeed(msgVal.author) && msgVal.content
},
// used in ebt:events
getMsgAuthor (msgVal) {
return msgVal.author
},
// used in ebt:events
getMsgSequence (msgVal) {
return msgVal.sequence
}
}
53 changes: 53 additions & 0 deletions formats/indexed.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
const pify = require('promisify-4loc')
const ref = require('ssb-ref')
const { QL0 } = require('ssb-subset-ql')

module.exports = {
name: 'indexed',
prepareForIsFeed(sbot, feedId, cb) {
sbot.metafeeds.ensureLoaded(feedId, cb)
},
isFeed (sbot, author) {
const info = sbot.metafeeds.findByIdSync(author)
return info && info.feedpurpose === 'index'
},
appendMsg (sbot, msgTuple, cb) {
const [msgVal, payload] = msgTuple
sbot.db.addTransaction([msgVal], [payload], cb)
},
getAtSequence (sbot, pair, cb) {
sbot.getAtSequence([pair.id, pair.sequence], (err, msg) => {
if (err) return cb(err)

module.exports.convertMsg(sbot, msg.value, cb)
})
},
convertMsg (sbot, msgVal, cb) {
const { sequence } = msgVal.content.indexed
const authorInfo = sbot.metafeeds.findByIdSync(msgVal.author)
if (!authorInfo) return cb(new Error('Unknown author:' + msgVal.author))
const { author } = QL0.parse(authorInfo.metadata.query)
sbot.getAtSequence([author, sequence], (err, indexedMsg) => {
if (err) return cb(err)

cb(null, [msgVal, indexedMsg.value])
})
},
isReady (sbot) {
return pify(sbot.metafeeds.loadState)()
},
isMsg (msgTuple) {
if (Array.isArray(msgTuple) && msgTuple.length === 2) {
const [msgVal, payload] = msgTuple
return Number.isInteger(msgVal.sequence) && msgVal.sequence > 0 &&
ref.isFeed(msgVal.author) && msgVal.content
} else
return false
},
getMsgAuthor (msgTuple) {
return msgTuple[0].author
},
getMsgSequence (msgTuple) {
return msgTuple[0].sequence
}
}
Loading

0 comments on commit 66f47aa

Please sign in to comment.