From 5d06aa2d4fd4d966e976b3a310f038fc898d626b Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 12 Aug 2020 12:29:49 +0300 Subject: [PATCH] add api for dynamically setting and resetting topic score parameters --- score.go | 44 ++++++++++++++++++++++++++++++++++++++++++++ topic.go | 39 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+) diff --git a/score.go b/score.go index c5f97d67..c9688eec 100644 --- a/score.go +++ b/score.go @@ -182,6 +182,50 @@ func newPeerScore(params *PeerScoreParams) *peerScore { } } +// update interface +func (ps *peerScore) SetTopicScoreParams(topic string, p *TopicScoreParams) error { + // Note: assumes that the topic score parameters have already been validated + ps.Lock() + defer ps.Unlock() + + old, exist := ps.params.Topics[topic] + ps.params.Topics[topic] = p + + if !exist { + return nil + } + + // check to see if the counter Caps are being lowered; if that's the case we need to recap them + recap := false + if p.FirstMessageDeliveriesCap < old.FirstMessageDeliveriesCap { + recap = true + } + if p.MeshMessageDeliveriesCap < old.MeshMessageDeliveriesCap { + recap = true + } + if !recap { + return nil + } + + // recap counters for topic + for _, pstats := range ps.peerStats { + tstats, ok := pstats.topics[topic] + if !ok { + continue + } + + if tstats.firstMessageDeliveries > p.FirstMessageDeliveriesCap { + tstats.firstMessageDeliveries = p.FirstMessageDeliveriesCap + } + + if tstats.meshMessageDeliveries > p.MeshMessageDeliveriesCap { + tstats.meshMessageDeliveries = p.MeshMessageDeliveriesCap + } + } + + return nil +} + // router interface func (ps *peerScore) Start(gs *GossipSubRouter) { if ps == nil { diff --git a/topic.go b/topic.go index f4b670be..cfdf088f 100644 --- a/topic.go +++ b/topic.go @@ -31,6 +31,45 @@ func (t *Topic) String() string { return t.topic } +// SetScoreParams sets the topic score parameters if the pubsub router supports peer +// scoring +func (t *Topic) SetScoreParams(p *TopicScoreParams) error { + t.mux.Lock() + defer t.mux.Unlock() + + if t.closed { + return ErrTopicClosed + } + + err := p.validate() + if err != nil { + return fmt.Errorf("invalid topic score parameters: %w", err) + } + + result := make(chan error, 1) + select { + case t.p.eval <- func() { + gs, ok := t.p.rt.(*GossipSubRouter) + if !ok { + result <- fmt.Errorf("pubsub router is not gossipsub") + return + } + + if gs.score == nil { + result <- fmt.Errorf("peer scoring is not enabled in router") + return + } + + result <- gs.score.SetTopicScoreParams(t.topic, p) + }: + err = <-result + return err + + case <-t.p.ctx.Done(): + return t.p.ctx.Err() + } +} + // EventHandler creates a handle for topic specific events // Multiple event handlers may be created and will operate independently of each other func (t *Topic) EventHandler(opts ...TopicEventHandlerOpt) (*TopicEventHandler, error) {