Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Partial replication changes #52

Merged
merged 51 commits into from
Oct 12, 2021
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
66af62c
Expose the clock over a RPC
arj03 Sep 3, 2021
a20e9d8
Refactor to allow multiple ebts
arj03 Sep 4, 2021
d96de1d
Update test/clock.js
arj03 Sep 6, 2021
9dfd146
Update test/clock.js
arj03 Sep 6, 2021
67991e8
Refactor formats a bit more
arj03 Sep 6, 2021
8f3bb44
Refactor formats a bit and add test
arj03 Sep 6, 2021
a3534bf
More format tests, fix some bugs
arj03 Sep 7, 2021
6ada5fe
Use replicateFormat for newer feed formats, fix clock manifest
arj03 Sep 7, 2021
2f609a4
Index test
arj03 Sep 8, 2021
c171049
Add sliced index replication test
arj03 Sep 8, 2021
db0eccf
Use released ssb-db2
arj03 Sep 8, 2021
877bcc1
Tweak replication timeout
arj03 Sep 8, 2021
e703ae0
Update index.js
arj03 Sep 9, 2021
f8f4cee
Refactor peerStatus to deduce the format
arj03 Sep 9, 2021
cd7194e
Use opts in clock to make it consistent with other APIs and update RE…
arj03 Sep 9, 2021
011be45
Refactor request + block to deduce format
arj03 Sep 9, 2021
0f1b189
Document registerFormat
arj03 Sep 9, 2021
0d09d6c
Document setClockForSlicedReplication and simplify api
arj03 Sep 9, 2021
0bca49e
Missing </details>
arj03 Sep 9, 2021
7423142
Bendy butt fixes from 8k testing
arj03 Sep 11, 2021
6762348
Rewrite tests to use index writer + fix bendy butt name
arj03 Sep 14, 2021
bb992d3
Bump ssb-index-feed-writer to fix tests
arj03 Sep 14, 2021
5fe8343
Try increasing the wait for replication for tests
arj03 Sep 14, 2021
2208d43
Gargh, wrong test result
arj03 Sep 14, 2021
532f787
add missing ssb-caps dev dep
staltz Sep 14, 2021
ae757fa
add missing mkdirp dev dep
staltz Sep 14, 2021
3c823eb
Split formats into a folder
arj03 Sep 14, 2021
5de319d
Use spread operator instead of object.assign
arj03 Sep 15, 2021
38f96db
Minor review fixes
arj03 Sep 15, 2021
213fc0b
Fix "API" typo in README
staltz Sep 15, 2021
71fdda6
Tweak README.md text about format example
staltz Sep 15, 2021
ebc87a3
Update index.js
arj03 Sep 15, 2021
ce78437
Review fixes
arj03 Sep 15, 2021
8cb3b9c
Add formats to package.json
arj03 Sep 15, 2021
dee03d0
Refactor to not use formats
arj03 Sep 15, 2021
7c520b8
Allow one to specify the format in request/block
arj03 Sep 15, 2021
a28bce9
Better sliced example
arj03 Sep 15, 2021
e5f7c1e
Refactor to simplify formats
arj03 Sep 15, 2021
55c2372
Update index.js
arj03 Sep 16, 2021
8a0e3bb
Review fixes
arj03 Sep 16, 2021
6e64d0d
lint it
arj03 Sep 16, 2021
417f523
setup prettier like in ssb-db2
staltz Sep 16, 2021
14e04e7
introduce format.prepareForIsFeed()
staltz Sep 16, 2021
2fe8791
update ssb-meta-feeds and fix usage
staltz Sep 16, 2021
0e398b8
improve test for index feeds
staltz Sep 22, 2021
b4e06a3
Use transactions for indexed and fix tests
arj03 Sep 22, 2021
39e40d8
Send indexes as tuples instead
arj03 Sep 23, 2021
c488bd4
Change p-defer to a simple waiting queue system
arj03 Sep 23, 2021
70c4660
Merge pull request #55 from ssbc/fix-async-blocking
arj03 Sep 23, 2021
a8fd938
Bump EBT
arj03 Sep 24, 2021
e03caff
Better error handling
arj03 Oct 8, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 171 additions & 55 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ const path = require('path')
const pull = require('pull-stream')
const toPull = require('push-stream-to-pull-stream')
const EBT = require('epidemic-broadcast-trees')
const isFeed = require('ssb-ref').isFeed
const ref = require('ssb-ref')
const Store = require('lossy-store')
const toUrlFriendly = require('base64-url').escape
const getSeverity = require('ssb-network-errors')
Expand All @@ -21,21 +21,23 @@ exports.version = '1.0.0'

exports.manifest = {
replicate: 'duplex',
replicateFormat: 'duplex',
request: 'sync',
block: 'sync',
peerStatus: 'sync'
peerStatus: 'sync',
clock: 'async'
}

exports.permissions = {
anonymous: {
allow: ['replicate']
allow: ['replicate', 'replicateFormat', 'clock']
}
}

// there was a bug that caused some peers
// to request things that weren't feeds.
// this is fixed, so just ignore anything that isn't a feed.
function cleanClock (clock) {
function cleanClock (clock, isFeed) {
for (const k in clock) {
if (!isFeed(k)) {
delete clock[k]
Expand All @@ -44,48 +46,105 @@ function cleanClock (clock) {
}

exports.init = function (sbot, config) {
const dir = config.path ? path.join(config.path, 'ebt') : null
const store = Store(dir, null, toUrlFriendly)

const ebt = EBT({
logging: config.ebt && config.ebt.logging,
id: sbot.id,
getClock (id, cb) {
store.ensure(id, function () {
const clock = store.get(id) || {}
cleanClock(clock)
cb(null, clock)
})
},
setClock (id, clock) {
cleanClock(clock, 'non-feed key when saving clock')
store.set(id, clock)
},
getAt (pair, cb) {
sbot.getAtSequence([pair.id, pair.sequence], (err, data) => {
cb(err, data ? data.value : null)
})
},
append (msg, cb) {
sbot.add(msg, (err, msg) => {
cb(err && err.fatal ? err : null, msg)
})
},
isFeed: isFeed
})
const formats = {
'classic': {
// used in request, block, cleanClock, sbot.post
isFeed: ref.isFeed,
getAtSequence(sbot, pair, cb) {
sbot.getAtSequence([pair.id, pair.sequence], (err, msg) => {
cb(err, msg ? msg.value : null)
})
},
appendMsg(sbot, msgVal, cb) {
sbot.add(msgVal, (err, msg) => {
cb(err && err.fatal ? err : null, msg)
})
},

// used in ebt:stream to distinguish between messages and notes
isMsg(msgVal) {
return Number.isInteger(msgVal.sequence) && msgVal.sequence > 0 &&
ref.isFeed(msgVal.author) && msgVal.content
},
// used in ebt:events
getMsgAuthor(msgVal) {
return msgVal.author
},
// used in ebt:events
getMsgSequence(msgVal) {
return msgVal.sequence
},
}
}

const ebts = {}
function addEBT(formatName) {
arj03 marked this conversation as resolved.
Show resolved Hide resolved
const dirName = 'ebt' + (formatName === 'classic' ? '' : formatName)
const dir = config.path ? path.join(config.path, dirName) : null
const store = Store(dir, null, toUrlFriendly)

const format = formats[formatName]

const ebt = EBT(Object.assign({
arj03 marked this conversation as resolved.
Show resolved Hide resolved
logging: config.ebt && config.ebt.logging,
id: sbot.id,
getClock (id, cb) {
store.ensure(id, function () {
const clock = store.get(id) || {}
cleanClock(clock, format.isFeed)
cb(null, clock)
})
},
setClock (id, clock) {
cleanClock(clock, format.isFeed)
store.set(id, clock)
},
getAt (pair, cb) {
format.getAtSequence(sbot, pair, cb)
},
append (msgVal, cb) {
format.appendMsg(sbot, msgVal, cb)
}
}, format))

ebts[formatName] = ebt
}

function getEBT(formatName) {
const ebt = ebts[formatName]
if (!ebt)
throw new Error('Unknown format' + formatName)
arj03 marked this conversation as resolved.
Show resolved Hide resolved

return ebt
}

addEBT('classic')

const initialized = DeferredPromise()

sbot.getVectorClock((err, clock) => {
if (err) console.warn('Failed to getVectorClock in ssb-ebt because:', err)
ebt.state.clock = clock || {}
ebt.update()
for (let formatName in ebts) {
const format = formats[formatName]
const ebt = ebts[formatName]

validClock = {}
for (let k in clock)
if (format.isFeed(k))
validClock[k] = clock[k]

ebt.state.clock = validClock
ebt.update()
}
initialized.resolve()
})

sbot.post((msg) => {
initialized.promise.then(() => {
ebt.onAppend(msg.value)
for (let format in ebts) {
if (formats[format].isFeed(msg.value.author))
ebts[format].onAppend(msg.value)
}
})
})

Expand All @@ -94,6 +153,7 @@ exports.init = function (sbot, config) {
if (sbot.progress) {
hook(sbot.progress, function (fn) {
const _progress = fn()
const ebt = ebts['classic']
const ebtProg = ebt.progress()
if (ebtProg.target) _progress.ebt = ebtProg
return _progress
Expand All @@ -104,29 +164,48 @@ exports.init = function (sbot, config) {
if (rpc.id === sbot.id) return // ssb-client connecting to ssb-server
if (isClient) {
initialized.promise.then(() => {
const opts = { version: 3 }
const local = toPull.duplex(ebt.createStream(rpc.id, opts.version, true))
const remote = rpc.ebt.replicate(opts, (networkError) => {
if (networkError && getSeverity(networkError) >= 3) {
console.error('rpc.ebt.replicate exception:', networkError)
}
})
pull(local, remote, local)
for (let format in ebts) {
const ebt = ebts[format]
const opts = { version: 3, format }
const local = toPull.duplex(ebt.createStream(rpc.id, opts.version, true))

// for backwards compatibility we always replicate classic
// feeds using existing replicate RPC
const replicate = (format === 'classic' ? rpc.ebt.replicate : rpc.ebt.replicateFormat)

const remote = replicate(opts, (networkError) => {
if (networkError && getSeverity(networkError) >= 3) {
console.error('rpc.ebt.replicate exception:', networkError)
}
})
pull(local, remote, local)
}
})
}
})

function request (destFeedId, requesting) {
function request(destFeedId, requesting, formatName) {
initialized.promise.then(() => {
if (!isFeed(destFeedId)) return
ebt.request(destFeedId, requesting)
formatName = formatName || 'classic'
const format = formats[formatName]

if (!(format && format.isFeed(destFeedId))) return
arj03 marked this conversation as resolved.
Show resolved Hide resolved

ebts[formatName].request(destFeedId, requesting)
})
}

function block (origFeedId, destFeedId, blocking) {
function block(origFeedId, destFeedId, blocking, formatName) {
initialized.promise.then(() => {
if (!isFeed(origFeedId)) return
if (!isFeed(destFeedId)) return
formatName = formatName || 'classic'
const format = formats[formatName]

if (!format) return
if (!format.isFeed(origFeedId)) return
if (!format.isFeed(destFeedId)) return

const ebt = ebts[formatName]

if (blocking) {
ebt.block(origFeedId, destFeedId, true)
} else if (
Expand All @@ -139,11 +218,14 @@ exports.init = function (sbot, config) {
})
}

function replicate (opts) {
function replicateFormat(opts) {
if (opts.version !== 3) {
throw new Error('expected ebt.replicate({version: 3})')
}

let formatName = opts.format || 'classic'
arj03 marked this conversation as resolved.
Show resolved Hide resolved
const ebt = getEBT(formatName)

var deferred = pullDefer.duplex()
initialized.promise.then(() => {
// `this` refers to the remote peer who called this muxrpc API
Expand All @@ -152,14 +234,18 @@ exports.init = function (sbot, config) {
return deferred
}

// get replication status for feeds for this id.
function peerStatus (id) {
// get replication status for feeds for this id
function peerStatus(id, formatName) {
id = id || sbot.id
formatName = formatName || 'classic'
const ebt = getEBT(formatName)

const data = {
id: id,
seq: ebt.state.clock[id],
peers: {}
}

for (const k in ebt.state.peers) {
const peer = ebt.state.peers[k]
if (peer.clock[id] != null ||
Expand All @@ -171,13 +257,43 @@ exports.init = function (sbot, config) {
}
}
}

return data
}

function clock(formatName, cb) {
if (!cb) {
cb = formatName
formatName = 'classic'
}

initialized.promise.then(() => {
const ebt = getEBT(formatName)
cb(null, ebt.state.clock)
})
}

function setClockForSlicedReplication(format, feed, sequence) {
initialized.promise.then(() => {
const ebt = getEBT(format)
ebt.state.clock[feed] = sequence
})
}

function registerFormat(formatName, methods) {
formats[formatName] = methods
addEBT(formatName)
}

return {
request,
block,
replicate,
peerStatus
replicate: replicateFormat,
replicateFormat,
peerStatus,
clock,
setClockForSlicedReplication,
registerFormat,
formats
}
}
3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,13 @@
"rimraf": "^2.7.1",
"rng": "^0.2.2",
"secret-stack": "^6.4.0",
"ssb-bendy-butt": "^0.12.2",
"ssb-client": "^4.9.0",
"ssb-db": "^19.2.0",
"ssb-db2": "^2.4.0",
"ssb-generate": "^1.0.1",
"ssb-keys": "^8.1.0",
"ssb-uri2": "^1.5.2",
"ssb-validate": "^4.1.4",
"standardx": "^7.0.0",
"tap-spec": "^5.0.0",
Expand Down
Loading