Skip to content

Commit

Permalink
Merge pull request #56 from arj03/validate2
Browse files Browse the repository at this point in the history
Validate2
  • Loading branch information
arj03 authored Aug 18, 2021
2 parents 3267110 + 03c46e0 commit c1469d4
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 909 deletions.
2 changes: 1 addition & 1 deletion core.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ exports.init = function (dir, config, extraModules) {

// Also listen for DHT connections.
SSB.net.dhtInvite.start((err, success) => { })
}, 2000)
}, 2500)
}

SSB.events.emit("SSB: loaded")
Expand Down
58 changes: 32 additions & 26 deletions feed-replication.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,37 +30,43 @@ exports.init = function (sbot, config) {

function syncMessages(feed, key, rpcCall, cb) {
if (!partialState[feed] || !partialState[feed][key]) {
// FIXME: this will be much better with rusty validation
let adder = sbot.db.addOOO // this should be default, but is too slow
if (key == 'syncedMessages') { // false for go!
const oooState = validate.initial()
adder = (msg, cb) => sbot.db.addOOOStrictOrder(msg, oooState, cb)
} else { // hack, FIXME: creates duplicate messages
adder = (msg, cb) => {
const oooState = validate.initial()
sbot.db.addOOOStrictOrder(msg, oooState, cb)
}
}

pull(
rpcCall(),
pull.asyncMap(adder),
pull.collect((err, msgs) => {
if (err) {
console.error(err.message)
return cb(err)
}

var newState = {}
newState[key] = true
partial.updateState(feed, newState, (err) => { cb(err, feed) })
if (key === 'syncedMessages') {
const lastMsgValue = msgs[msgs.length - 1]
msgs = msgs.filter(m => m.content.type !== 'contact' &&
m.content.type !== 'about')
const kvt = validate.toKeyValueTimestamp(lastMsgValue)
sbot.db.setPost(kvt)
}

sbot.db.addOOOBatch(msgs, (err) => {
if (err) return cb(err)
var newState = {}
newState[key] = true
partial.updateState(feed, newState, (err) => { cb(err, feed) })
})
})
)
} else
cb(null, feed)
}

let synced = {}

function getLatestSequence(feed, cb) {
sbot.db.getLatest(feed, (err, latest) => {
if (err) return cb(err)

cb(null, latest ? latest.sequence + 1 : 0)
})
}

function syncFeed(rpc, feed, hops, cb) {
// idempotent
Expand All @@ -73,16 +79,16 @@ exports.init = function (sbot, config) {
} else if (hops === 1) {
if (!partialState[feed] || !partialState[feed]['full']) {
console.log("full replication of", feed)
sbot.db.getAllLatest((err, latest) => {
const latestSeq = latest[feed] ? latest[feed].sequence + 1 : 0
getLatestSequence(feed, (err, latestSeq) => {
pull(
rpc.partialReplication.getFeed({ id: feed, seq: latestSeq, keys: false }),
pull.asyncMap(sbot.db.add),
pull.collect((err) => {
pull.collect((err, messages) => {
if (err) return cb(err)

waitingEBTRequests.set(feed, true)
partial.updateState(feed, { full: true }, cb)
sbot.db.addBatch(messages, () => {
waitingEBTRequests.set(feed, true)
partial.updateState(feed, { full: true }, cb)
})
})
)
})
Expand All @@ -92,10 +98,6 @@ exports.init = function (sbot, config) {
//console.log("partial replication of", feed)
pull(
pull.values([feed]),
pull.asyncMap((feed, cb) => {
syncMessages(feed, 'syncedMessages',
() => rpc.partialReplication.getFeedReverse({ id: feed, keys: false, limit: 25 }), cb)
}),
pull.asyncMap((feed, cb) => {
syncMessages(feed, 'syncedProfile',
() => rpc.partialReplication.getMessagesOfType({ id: feed, type: 'about' }), cb)
Expand All @@ -104,6 +106,10 @@ exports.init = function (sbot, config) {
syncMessages(feed, 'syncedContacts',
() => rpc.partialReplication.getMessagesOfType({ id: feed, type: 'contact' }), cb)
}),
pull.asyncMap((feed, cb) => {
syncMessages(feed, 'syncedMessages',
() => rpc.partialReplication.getFeedReverse({ id: feed, keys: false, limit: 25 }), cb)
}),
pull.collect((err) => {
if (err) return cb(err)

Expand Down
Loading

0 comments on commit c1469d4

Please sign in to comment.