Skip to content

Commit

Permalink
[updateCandidateStatus] Reduce rpc calls in updateSignerPenAndStatus (r…
Browse files Browse the repository at this point in the history
…esolve #801)
  • Loading branch information
bobcoin98 committed Dec 26, 2023
1 parent a2b3c92 commit 645c380
Showing 1 changed file with 161 additions and 92 deletions.
253 changes: 161 additions & 92 deletions crawl.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ let cpValidator = 0

let tweetedMN = ''

async function watchValidator () {
async function watchValidator() {
var blockNumber = cpValidator || await web3.eth.getBlockNumber()
try {
blockNumber = blockNumber || await web3.eth.getBlockNumber()
Expand Down Expand Up @@ -160,7 +160,7 @@ async function watchValidator () {
}
}

async function updateCandidateInfo (candidate) {
async function updateCandidateInfo(candidate) {
try {
let capacity = await validator.methods.getCandidateCap(candidate).call()
let owner = (await validator.methods.getCandidateOwner(candidate).call() || '').toLowerCase()
Expand Down Expand Up @@ -206,7 +206,7 @@ async function updateCandidateInfo (candidate) {
}
}

async function updateVoterCap (candidate, voter) {
async function updateVoterCap(candidate, voter) {
try {
let capacity = await validator.methods.getVoterCap(candidate, voter).call()
logger.debug('Update voter %s for candidate %s capacity %s', voter, candidate, String(capacity))
Expand All @@ -229,7 +229,7 @@ async function updateVoterCap (candidate, voter) {
}

// Get current candates
async function getCurrentCandidates () {
async function getCurrentCandidates() {
try {
let candidates = await validator.methods.getCandidates().call()
let candidatesInDb = await db.Candidate.find({
Expand Down Expand Up @@ -257,108 +257,177 @@ async function getCurrentCandidates () {
}
}

async function updateSignerPenAndStatus () {
async function updateSignerPenAndStatus() {
try {
const latestBlockNumber = await web3.eth.getBlockNumber()
const latestCheckpoint = latestBlockNumber - (latestBlockNumber % parseInt(config.get('blockchain.epoch')))
const currentEpoch = (parseInt(latestCheckpoint / config.get('blockchain.epoch')) + 1).toString()
const blk = await web3.eth.getBlock(latestCheckpoint)
const signers = []
const penalties = []
let candidateBulkOps = []
let statusBulkOps = []

// get candidate list
const candidates = await db.Candidate.find({
smartContractAddress: config.get('blockchain.validatorAddress'),
candidate: {
$ne: 'RESIGNED'
}
})
// loop and get status
await Promise.all(candidates.map(async (c) => {
const data = {
'jsonrpc': '2.0',
'method': 'eth_getCandidateStatus',
'params': [c.candidate.toLowerCase(), 'latest'],
'id': config.get('blockchain.networkId')
},
{
candidate: 1,
status: 1
}
const response = await axios.post(config.get('blockchain.rpc'), data)

if (response.data) {
const result = (response.data.result || {}).status
switch (result) {
case 'MASTERNODE':
signers.push(c.candidate)
await db.Candidate.findOneAndUpdate({
smartContractAddress: config.get('blockchain.validatorAddress'),
candidate: c.candidate.toLowerCase()
}, {
$set: {
status: 'MASTERNODE'
}
}, { upsert: true })
await db.Status.findOneAndUpdate({ epoch: currentEpoch, candidate: c.candidate }, {
epoch: currentEpoch,
candidate: c.candidate,
status: 'MASTERNODE',
epochCreatedAt: moment.unix(blk.timestamp).utc()
}, { upsert: true })
break
case 'SLASHED':
logger.info('Update candidate %s slashed at blockNumber %s', c.candidate, String(blk.number))
// fireNotification
if (result.toLowerCase() !== c.status.toLowerCase()) {
// get all voters who have capacity > 0
const voters = await db.Voter.find({
candidate: c.candidate,
smartContractAddress: config.get('blockchain.validatorAddress'),
capacityNumber: { $gt: 0 }
)
const onchainCandidateParams = {
'jsonrpc': '2.0',
'method': 'eth_getCandidates',
'params': ['latest'],
'id': config.get('blockchain.networkId')
}
const response = await axios.post(config.get('blockchain.rpc'), onchainCandidateParams)
if (response.data && response.data.result) {
const allCandidates = response.data.result.candidates
if (!allCandidates || allCandidates.length == 0) {
logger.error('no onchain candidate found')
return
}
for (const c of candidates) {
const thisCandidate = allCandidates[c.candidate]
switch (thisCandidate.status) {
case 'MASTERNODE':
signers.push(c.candidate)
candidateBulkOps.push({
updateOne: {
filter: {
smartContractAddress: config.get('blockchain.validatorAddress'),
candidate: c.candidate.toLowerCase()
},
update: {
$set: {
status: 'MASTERNODE'
}
},
upsert: true
}
})
if (voters && voters.length > 0) {
await Promise.all(voters.map(async (v) => {
await fireNotification(v.voter, c.candidate, c.name, 'Slash', latestBlockNumber)
}))
statusBulkOps.push({
updateOne: {
filter: {
epoch: currentEpoch, candidate: c.candidate
},
update: {
$set: {
epoch: currentEpoch,
candidate: c.candidate,
status: 'MASTERNODE',
epochCreatedAt: moment.unix(blk.timestamp).utc()
}
},
upsert: true
}
})
break
case 'SLASHED':
logger.info('Update candidate %s slashed at blockNumber %s', c.candidate, String(blk.number))
// fireNotification
if (thisCandidate.status.toLowerCase() !== c.status.toLowerCase()) {
// get all voters who have capacity > 0
const voters = await db.Voter.find({
candidate: c.candidate,
smartContractAddress: config.get('blockchain.validatorAddress'),
capacityNumber: { $gt: 0 }
})
if (voters && voters.length > 0) {
await Promise.all(voters.map(async (v) => {
await fireNotification(v.voter, c.candidate, c.name, 'Slash', latestBlockNumber)
}))
}
}
}
penalties.push(c.candidate)

candidateBulkOps.push({
updateOne: {
filter: {
smartContractAddress: config.get('blockchain.validatorAddress'),
candidate: c.candidate.toLowerCase()
},
update: {
$set: {
status: 'SLASHED'
}
},
upsert: true
}
})

db.Candidate.findOneAndUpdate({
smartContractAddress: config.get('blockchain.validatorAddress'),
candidate: c.candidate.toLowerCase()
}, {
$set: {
status: 'SLASHED'
}
}, { upsert: true }).then(() => true)
.catch(error => console.log(error))
statusBulkOps.push({
updateOne: {
filter: {
epoch: currentEpoch, candidate: c.candidate
},
update: {
$set: {
epoch: currentEpoch,
candidate: c.candidate,
status: 'SLASHED',
epochCreatedAt: moment.unix(blk.timestamp).utc()
}
},
upsert: true
}
})
break
case 'PROPOSED':
candidateBulkOps.push({
updateOne: {
filter: {
smartContractAddress: config.get('blockchain.validatorAddress'),
candidate: c.candidate.toLowerCase()
},
update: {
$set: {
status: 'PROPOSED'
}
},
upsert: true
}
})

statusBulkOps.push({
updateOne: {
filter: {
epoch: currentEpoch, candidate: c.candidate
},
update: {
$set: {
epoch: currentEpoch,
candidate: c.candidate,
status: 'PROPOSED',
epochCreatedAt: moment.unix(blk.timestamp).utc()
}
},
upsert: true
}
})
default:
break

db.Status.findOneAndUpdate({ epoch: currentEpoch, candidate: c.candidate }, {
epoch: currentEpoch,
candidate: c.candidate,
status: 'SLASHED',
epochCreatedAt: moment.unix(blk.timestamp).utc()
}, { upsert: true }).then(() => true)
.catch(error => console.log(error))
penalties.push(c.candidate)
break
case 'PROPOSED':
await db.Candidate.findOneAndUpdate({
smartContractAddress: config.get('blockchain.validatorAddress'),
candidate: c.candidate.toLowerCase()
}, {
$set: {
status: 'PROPOSED'
}
}, { upsert: true })
await db.Status.findOneAndUpdate({ epoch: currentEpoch, candidate: c.candidate }, {
epoch: currentEpoch,
candidate: c.candidate,
status: 'PROPOSED',
epochCreatedAt: moment.unix(blk.timestamp).utc()
}, { upsert: true })
break
default:
break
}
}
}))

}

if (candidateBulkOps.length > 0) {
const res = await db.Candidate.collection.bulkWrite(candidateBulkOps)
logger.debug(`Update candidates at block ${blk.number}, result ${res}`)
}
if (statusBulkOps.length > 0) {
const res = await db.Status.collection.bulkWrite(statusBulkOps)
logger.debug(`Update statuses at block ${blk.number}, result ${res}`)
}

await db.Signer.findOneAndUpdate({ blockNumber: blk.number }, {
networkId: config.get('blockchain.networkId'),
blockNumber: blk.number,
Expand All @@ -381,7 +450,7 @@ async function updateSignerPenAndStatus () {
}

let sleep = (time) => new Promise((resolve) => setTimeout(resolve, time))
async function watchNewBlock (n) {
async function watchNewBlock(n) {
try {
let blockNumber = await web3.eth.getBlockNumber()
n = n || blockNumber
Expand Down Expand Up @@ -505,7 +574,7 @@ async function watchNewBlock (n) {
return watchNewBlock(n)
}

async function fireNotification (voter, candidate, name, event, blockNumber, amount = '') {
async function fireNotification(voter, candidate, name, event, blockNumber, amount = '') {
try {
const isRead = false
await db.Notification.findOneAndUpdate({
Expand All @@ -526,7 +595,7 @@ async function fireNotification (voter, candidate, name, event, blockNumber, amo
}
}

function diff (a, b) {
function diff(a, b) {
return a.filter((i) => {
return b.indexOf(i) < 0
})
Expand Down Expand Up @@ -561,11 +630,11 @@ const getBlockSigners = async (number) => {
return []
}

async function updateLatestSignedBlock (blk) {
async function updateLatestSignedBlock(blk) {
try {
if (!blk ||
blk.number % parseInt(config.get('blockchain.blockSignerGap')) !==
parseInt(config.get('blockchain.blockSignerDelay'))
parseInt(config.get('blockchain.blockSignerDelay'))
) {
return
}
Expand Down Expand Up @@ -597,7 +666,7 @@ async function updateLatestSignedBlock (blk) {
}
}

async function getPastEvent () {
async function getPastEvent() {
let blockNumber = await web3.eth.getBlockNumber()
let lastBlockTx = await db.Transaction.findOne().sort({ blockNumber: -1 })
let lb = (lastBlockTx && lastBlockTx.blockNumber) ? lastBlockTx.blockNumber : 0
Expand Down

0 comments on commit 645c380

Please sign in to comment.