Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pubsub #182

Merged
merged 28 commits into from
Nov 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
a09b636
Added Handling of Ping and pool join request process
ehsan6sha Nov 7, 2023
5c4d12f
ensure the JoinPoolRequest announcement can be initiated once only
ehsan6sha Nov 7, 2023
98a7cb8
add the peerID of announcer to a list and no longer accepts the pubsub
ehsan6sha Nov 7, 2023
049577d
Auto-run FetchUsersAndPopulateSets if not run for the past x hours fr…
ehsan6sha Nov 7, 2023
c03e3f7
Corrected shutdown error
ehsan6sha Nov 9, 2023
a6d61ae
enhancewd start of FetchUsersAndPopulateSets
ehsan6sha Nov 9, 2023
a1aeca7
Updating Peerstore with pool nodes
ehsan6sha Nov 9, 2023
aa67f9e
joinpool final draft
ehsan6sha Nov 9, 2023
c7791bd
first issue correction after testing
ehsan6sha Nov 10, 2023
64cd448
switched to callBlockchain
ehsan6sha Nov 10, 2023
511e648
corrected some bugs in reading invalid peerid
ehsan6sha Nov 10, 2023
300c290
Added a parameter --generateNodeKey to generate node key from identit…
ehsan6sha Nov 10, 2023
f85fd17
corrected lint error
ehsan6sha Nov 10, 2023
08e3dc6
corrected test files
ehsan6sha Nov 10, 2023
68a90d8
Update example_test.go
ehsan6sha Nov 10, 2023
a393eab
reoved t *testing.T
ehsan6sha Nov 10, 2023
bcdc3e8
corrected announcement tests
ehsan6sha Nov 11, 2023
45b2675
adding to peerstore moved to process
ehsan6sha Nov 11, 2023
49288f3
Added tests for ping
ehsan6sha Nov 12, 2023
e9ced6d
Added test for announcements
ehsan6sha Nov 13, 2023
5510a19
first draft of full testcase
ehsan6sha Nov 16, 2023
b46b507
Update announcements.go
ehsan6sha Nov 16, 2023
5efbc4c
added example
ehsan6sha Nov 16, 2023
9cc2879
Corrected vote method
ehsan6sha Nov 16, 2023
58832e6
added some logs
ehsan6sha Nov 16, 2023
a5e2601
commented unused code
ehsan6sha Nov 16, 2023
d1b75fd
updated tests
ehsan6sha Nov 18, 2023
05a6e06
Update example_test.go
ehsan6sha Nov 18, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
203 changes: 152 additions & 51 deletions announcements/announcements.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/multiformats/go-multiaddr"
)

var (
Expand All @@ -28,11 +29,14 @@ var (
type (
FxAnnouncements struct {
*options
h host.Host
sub *pubsub.Subscription
topic *pubsub.Topic
stopJoinPoolRequestChan chan struct{} // add this line
closeJoinPoolRequestOnce sync.Once
h host.Host
sub *pubsub.Subscription
topic *pubsub.Topic
stopJoinPoolRequestChan chan struct{} // add this line
closeJoinPoolRequestOnce sync.Once
PoolJoinRequestHandler PoolJoinRequestHandler
announcingJoinPoolRequest bool
announcingJoinPoolMutex sync.Mutex
}
)

Expand All @@ -55,82 +59,141 @@ func NewFxAnnouncements(h host.Host, o ...Option) (*FxAnnouncements, error) {
}

func (an *FxAnnouncements) Start(ctx context.Context, validator pubsub.Validator) error {
if an.topicName == "" || an.topicName == "0" {
log.Warnw("Announcement do not have any topic to subscribe to", "on peer", an.h.ID())
return errors.New("Announcement do not have any topic to subscribe to")
}
typeSystem, err := ipld.LoadSchemaBytes(schemaBytes)
if err != nil {
panic(fmt.Errorf("cannot load schema: %w", err))
}
PubSubPrototypes.Announcement = bindnode.Prototype((*Announcement)(nil), typeSystem.TypeByName("Announcement"))

gsub, err := pubsub.NewGossipSub(ctx, an.h,
gr := pubsub.DefaultGossipSubRouter(an.h)

/*var addrInfos []peer.AddrInfo
for _, relay := range an.relays {
// Parse the multiaddr
ma, err := multiaddr.NewMultiaddr(relay)
if err != nil {
log.Warnln("Error parsing multiaddr:", err)
continue
}

// Extract the peer ID
addrInfo, err := peer.AddrInfoFromP2pAddr(ma)
if err != nil {
log.Warnln("Error extracting peer ID:", err)
continue
}
if addrInfo != nil {
addrInfos = append(addrInfos, *addrInfo)
}
}*/

gsub, err := pubsub.NewGossipSubWithRouter(ctx, an.h, gr,
pubsub.WithPeerExchange(true),
pubsub.WithFloodPublish(true),
pubsub.WithMessageSigning(true),
pubsub.WithDefaultValidator(validator),
)

if err != nil {
log.Errorw("Error happened while creating pubsub", "peer", an.h.ID())
return err
}

log.Debugw("Created topic", "on peer", an.h.ID(), "topic", an.topicName)
an.topic, err = gsub.Join(an.topicName)
if err != nil {
log.Errorw("Error happened while joining the topic", "peer", an.h.ID())
return err
}
an.sub, err = an.topic.Subscribe()
if err != nil {
log.Errorw("Error happened while subscribing the topic", "peer", an.h.ID())
return err
}
return nil
}

func (an *FxAnnouncements) processAnnouncement(ctx context.Context, from peer.ID, atype AnnouncementType) error {
func (an *FxAnnouncements) processAnnouncement(ctx context.Context, from peer.ID, atype AnnouncementType, addrs []multiaddr.Multiaddr, topicString string) error {
log.Infow("processing announcement", "on", an.h.ID(), "from", from)
switch atype {
case IExistAnnouncementType:
log.Debug("IExist request")
log.Info("IExist request", "on", an.h.ID(), "from", from)
an.h.Peerstore().AddAddrs(from, addrs, peerstore.ConnectedAddrTTL)
case PoolJoinRequestAnnouncementType:
log.Debug("PoolJoin request")
log.Info("PoolJoin request", "on", an.h.ID(), "from", from)
if err := an.PoolJoinRequestHandler.HandlePoolJoinRequest(ctx, from, topicString, true); err != nil {
log.Errorw("An error occurred in handling pool join request announcement", "on", an.h.ID(), "from", from, err)
return err
}
default:
log.Debug("Unknown request")
log.Info("Unknown request", "on", an.h.ID(), "from", from)
}
return nil
}

func (an *FxAnnouncements) HandleAnnouncements(ctx context.Context) {
log.Debug("called wg.Done in HandleAnnouncements")
defer an.wg.Done()
for {
msg, err := an.sub.Next(ctx)
switch {
case ctx.Err() != nil || err == pubsub.ErrSubscriptionCancelled || err == pubsub.ErrTopicClosed:
log.Info("stopped handling announcements")
return
case err != nil:
log.Errorw("failed to get the next announcement", "err", err)
continue
}
from, err := peer.IDFromBytes(msg.From)
if err != nil {
log.Errorw("failed to decode announcement sender", "err", err)
continue
}
if from == an.h.ID() {
continue
}
a := &Announcement{}
if err = a.UnmarshalBinary(msg.Data); err != nil {
log.Errorw("failed to decode announcement data", "err", err)
continue
}
addrs, err := a.GetAddrs()
if err != nil {
log.Errorw("failed to decode announcement addrs", "err", err)
if msg != nil {
log.Debugw("HandleAnnouncements", "msg.Topic", msg.Topic)
if err != nil {
log.Debugw("HandleAnnouncements Error", err)
}
switch {
case ctx.Err() != nil || err == pubsub.ErrSubscriptionCancelled || err == pubsub.ErrTopicClosed:
log.Info("stopped handling announcements")
return
case err != nil:
log.Errorw("failed to get the next announcement", "err", err)
continue
}
from, err := peer.IDFromBytes(msg.From)
if err != nil {
log.Errorw("failed to decode announcement sender", "err", err)
continue
}
if from == an.h.ID() {
log.Debug("ignoring announcement from self")
continue
}
a := &Announcement{}
if err = a.UnmarshalBinary(msg.Data); err != nil {
log.Errorw("failed to decode announcement data", "err", err)
continue
}

addrs, err := a.GetAddrs()
if err != nil {
log.Errorw("failed to decode announcement addrs", "err", err)
continue
}

log.Debugw("received announcement", "from", from, "self", an.h.ID(), "announcement", a)
log.Debug("processAnnouncement call")
if msg.Topic != nil {
err = an.processAnnouncement(ctx, from, a.Type, addrs, *msg.Topic)
if err != nil {
log.Errorw("failed to process announcement", "err", err)
continue
}
} else {
log.Debug("Topic is nil")
continue
}
} else {
continue
}
an.h.Peerstore().AddAddrs(from, addrs, peerstore.PermanentAddrTTL)
log.Infow("received announcement", "from", from, "self", an.h.ID(), "announcement", a)
an.processAnnouncement(ctx, from, a.Type)
}
}

func (an *FxAnnouncements) AnnounceIExistPeriodically(ctx context.Context) {
log.Debug("called wg.Done in AnnounceIExistPeriodically")
defer an.wg.Done()
ticker := time.NewTicker(an.announceInterval)
for {
Expand Down Expand Up @@ -167,15 +230,33 @@ func (an *FxAnnouncements) AnnounceIExistPeriodically(ctx context.Context) {
}

func (an *FxAnnouncements) AnnounceJoinPoolRequestPeriodically(ctx context.Context) {
log.Debugw("called wg.Done in AnnounceJoinPoolRequestPeriodically pool join request", "peer", an.h.ID())
log.Debugw("Starting AnnounceJoinPoolRequestPeriodically pool join request", "peer", an.h.ID())
log.Debugw("peerlist before AnnounceJoinPoolRequestPeriodically pool join request", "on", an.h.ID(), "peerlist", an.topic.ListPeers())

defer an.wg.Done()
an.announcingJoinPoolMutex.Lock()
if an.announcingJoinPoolRequest {
an.announcingJoinPoolMutex.Unlock()
log.Info("pool join request announcements are already in progress.", "peer", an.h.ID())
return
}
an.announcingJoinPoolRequest = true
an.announcingJoinPoolMutex.Unlock()
defer func() {
an.announcingJoinPoolMutex.Lock()
an.announcingJoinPoolRequest = false
an.announcingJoinPoolMutex.Unlock()
}()
ticker := time.NewTicker(an.announceInterval)
for {
log.Debugw("inside ticker for join pool request", "peer", an.h.ID())
select {
case <-ctx.Done():
log.Info("stopped making periodic announcements")
log.Info("stopped making periodic pool join request announcements", "peer", an.h.ID())
return
case <-an.stopJoinPoolRequestChan: // Assume an.stopChan is a `chan struct{}` used to signal stopping the ticker.
log.Info("stopped making periodic joinpoolrequest announcements due to stop signal")
log.Info("stopped making periodic pool join request announcements due to stop signal", "peer", an.h.ID())
return
case t := <-ticker.C:
a := &Announcement{
Expand All @@ -185,52 +266,58 @@ func (an *FxAnnouncements) AnnounceJoinPoolRequestPeriodically(ctx context.Conte
a.SetAddrs(an.h.Addrs()...)
b, err := a.MarshalBinary()
if err != nil {
log.Errorw("failed to encode pool join request announcement", "err", err)
log.Errorw("failed to encode pool join request announcement", "peer", an.h.ID(), "err", err)
continue
}
log.Debugw("inside ticker for join pool request now publishing", "peer", an.h.ID())
if err := an.topic.Publish(ctx, b); err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
log.Info("stopped making periodic announcements")
log.Info("stopped making periodic pool join request announcements", "peer", an.h.ID(), "err", err)
return
}
if errors.Is(err, pubsub.ErrTopicClosed) || errors.Is(err, pubsub.ErrSubscriptionCancelled) {
log.Info("stopped making periodic iexist announcements as topic is closed or subscription cancelled")
log.Info("stopped making periodic pool join request announcements as topic is closed or subscription cancelled", "peer", an.h.ID(), "err", err)
return
}
log.Errorw("failed to publish pool join request announcement", "err", err)
log.Errorw("failed to publish pool join request announcement", "peer", an.h.ID(), "err", err)
continue
}
log.Infow("announced pool join request message", "from", an.h.ID(), "announcement", a, "time", t)
log.Debugw("announced pool join request message", "from", an.h.ID(), "announcement", a, "time", t)
}
}
}

func (an *FxAnnouncements) ValidateAnnouncement(ctx context.Context, id peer.ID, msg *pubsub.Message, status common.MemberStatus, exists bool) bool {
log.Debugw("ValidateAnnouncement", "on peer", an.h.ID(), "from peerID", id)
a := &Announcement{}
if err := a.UnmarshalBinary(msg.Data); err != nil {
log.Errorw("failed to unmarshal announcement data", "err", err)
return false
}
log.Debugw("ValidateAnnouncement", "on peer", an.h.ID(), "from peerID", id, "type", a.Type)

switch a.Type {
case NewManifestAnnouncementType:
// Check if sender is approved
if !exists {
log.Errorw("peer is not recognized", "peer", id)
log.Debugw("peer is not recognized", "on peer", an.h.ID(), "from peer", id)
return false
}
if status != common.Approved {
log.Errorw("peer is not an approved member", "peer", id)
log.Debugw("peer is not an approved member", "on peer", an.h.ID(), "from peer", id)
return false
}
case PoolJoinRequestAnnouncementType:
if status != common.Unknown {
log.Errorw("peer is no longer permitted to send this message type", "peer", id)
if status == common.Unknown {
log.Debugw("peer is no longer permitted to send this message type", "on peer", an.h.ID(), "from peer", id, "status", status)
return false
} else {
log.Debugw("PoolJoinRequestAnnouncementType status is not Unknown and ok")
}
case PoolJoinApproveAnnouncementType, IExistAnnouncementType:
// Any member status is valid for a pool join announcement
default:
log.Debugw("The Type is not set ", a.Type)
return false
}

Expand All @@ -240,12 +327,26 @@ func (an *FxAnnouncements) ValidateAnnouncement(ctx context.Context, id peer.ID,

func (an *FxAnnouncements) StopJoinPoolRequestAnnouncements() {
an.closeJoinPoolRequestOnce.Do(func() {
an.announcingJoinPoolMutex.Lock()
an.announcingJoinPoolRequest = false
an.announcingJoinPoolMutex.Unlock()
close(an.stopJoinPoolRequestChan)
})
}

func (an *FxAnnouncements) Shutdown(ctx context.Context) error {
an.sub.Cancel()
tErr := an.topic.Close()
return tErr
log.Debugw("closed topic", "peer", an.h.ID())
if an.sub != nil {
an.sub.Cancel()
tErr := an.topic.Close()
return tErr
}
log.Debug("Announcements are already closed")
return nil
}

// In the announcements package, add this to your concrete type that implements the Announcements interface.
func (an *FxAnnouncements) SetPoolJoinRequestHandler(handler PoolJoinRequestHandler) {
// Set the handler
an.PoolJoinRequestHandler = handler
}
3 changes: 3 additions & 0 deletions announcements/announcements_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package announcements_test

//Test are included in the blox
5 changes: 5 additions & 0 deletions announcements/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ type Announcements interface {
Shutdown(context.Context) error
}

// PoolJoinRequestHandler is the interface that will be called by the blockchain package.
type PoolJoinRequestHandler interface {
HandlePoolJoinRequest(context.Context, peer.ID, string, bool) error
}

var (
PubSubPrototypes struct {
Announcement schema.TypedPrototype
Expand Down
8 changes: 8 additions & 0 deletions announcements/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type (
timeout int
topicName string
wg *sync.WaitGroup
relays []string
}
)

Expand Down Expand Up @@ -52,3 +53,10 @@ func WithWg(wg *sync.WaitGroup) Option {
return nil
}
}

func WithRelays(r []string) Option {
return func(o *options) error {
o.relays = r
return nil
}
}
Loading