Skip to content

Commit

Permalink
add api for dynamically setting and resetting topic score parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
vyzo committed Sep 9, 2020
1 parent a3445b7 commit 5d06aa2
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 0 deletions.
44 changes: 44 additions & 0 deletions score.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,50 @@ func newPeerScore(params *PeerScoreParams) *peerScore {
}
}

// update interface
func (ps *peerScore) SetTopicScoreParams(topic string, p *TopicScoreParams) error {
// Note: assumes that the topic score parameters have already been validated
ps.Lock()
defer ps.Unlock()

old, exist := ps.params.Topics[topic]
ps.params.Topics[topic] = p

if !exist {
return nil
}

// check to see if the counter Caps are being lowered; if that's the case we need to recap them
recap := false
if p.FirstMessageDeliveriesCap < old.FirstMessageDeliveriesCap {
recap = true
}
if p.MeshMessageDeliveriesCap < old.MeshMessageDeliveriesCap {
recap = true
}
if !recap {
return nil
}

// recap counters for topic
for _, pstats := range ps.peerStats {
tstats, ok := pstats.topics[topic]
if !ok {
continue
}

if tstats.firstMessageDeliveries > p.FirstMessageDeliveriesCap {
tstats.firstMessageDeliveries = p.FirstMessageDeliveriesCap
}

if tstats.meshMessageDeliveries > p.MeshMessageDeliveriesCap {
tstats.meshMessageDeliveries = p.MeshMessageDeliveriesCap
}
}

return nil
}

// router interface
func (ps *peerScore) Start(gs *GossipSubRouter) {
if ps == nil {
Expand Down
39 changes: 39 additions & 0 deletions topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,45 @@ func (t *Topic) String() string {
return t.topic
}

// SetScoreParams sets the topic score parameters if the pubsub router supports peer
// scoring
func (t *Topic) SetScoreParams(p *TopicScoreParams) error {
t.mux.Lock()
defer t.mux.Unlock()

if t.closed {
return ErrTopicClosed
}

err := p.validate()
if err != nil {
return fmt.Errorf("invalid topic score parameters: %w", err)
}

result := make(chan error, 1)
select {
case t.p.eval <- func() {
gs, ok := t.p.rt.(*GossipSubRouter)
if !ok {
result <- fmt.Errorf("pubsub router is not gossipsub")
return
}

if gs.score == nil {
result <- fmt.Errorf("peer scoring is not enabled in router")
return
}

result <- gs.score.SetTopicScoreParams(t.topic, p)
}:
err = <-result
return err

case <-t.p.ctx.Done():
return t.p.ctx.Err()
}
}

// EventHandler creates a handle for topic specific events
// Multiple event handlers may be created and will operate independently of each other
func (t *Topic) EventHandler(opts ...TopicEventHandlerOpt) (*TopicEventHandler, error) {
Expand Down

0 comments on commit 5d06aa2

Please sign in to comment.