From 5510a19b3aab522fd02f6d8dc1d4bc299a1efa18 Mon Sep 17 00:00:00 2001
From: ehsan shariati
Date: Wed, 15 Nov 2023 19:58:11 -0500
Subject: [PATCH] first draft of full testcase

---
 announcements/announcements.go |  82 ++++---
 announcements/options.go       |   8 +
 blockchain/blockchain.go       |  52 +++--
 blockchain/options.go          |   3 +
 blox/blox.go                   |   7 +-
 blox/example_test.go           | 381 +++++++++++++++++++++++++++++++++
 blox/options.go                |  37 ++--
 cmd/blox/main.go               |  18 +-
 cmd/test/main.go               | 294 +++++++++++++++++++------
 9 files changed, 756 insertions(+), 126 deletions(-)

diff --git a/announcements/announcements.go b/announcements/announcements.go
index f1a1320e..93409fd8 100644
--- a/announcements/announcements.go
+++ b/announcements/announcements.go
@@ -59,8 +59,8 @@ func NewFxAnnouncements(h host.Host, o ...Option) (*FxAnnouncements, error) {
 }
 
 func (an *FxAnnouncements) Start(ctx context.Context, validator pubsub.Validator) error {
-	if an.topicName == "" {
-		log.Warn("Announcement do not have any topic to subscribe to")
+	if an.topicName == "" || an.topicName == "0" {
+		log.Warnw("Announcement does not have any topic to subscribe to", "on peer", an.h.ID())
 		return errors.New("Announcement do not have any topic to subscribe to")
 	}
 	typeSystem, err := ipld.LoadSchemaBytes(schemaBytes)
@@ -69,41 +69,68 @@ func (an *FxAnnouncements) Start(ctx context.Context, validator pubsub.Validator
 	}
 	PubSubPrototypes.Announcement = bindnode.Prototype((*Announcement)(nil), typeSystem.TypeByName("Announcement"))
 
-	gsub, err := pubsub.NewGossipSub(ctx, an.h,
+	gr := pubsub.DefaultGossipSubRouter(an.h)
+
+	var addrInfos []peer.AddrInfo
+	for _, relay := range an.relays {
+		// Parse the multiaddr
+		ma, err := multiaddr.NewMultiaddr(relay)
+		if err != nil {
+			fmt.Println("Error parsing multiaddr:", err)
+			continue
+		}
+
+		// Extract the peer ID
+		addrInfo, err := peer.AddrInfoFromP2pAddr(ma)
+		if err != nil {
+			fmt.Println("Error extracting peer ID:", err)
+			continue
+		}
+		if addrInfo != nil {
+			addrInfos = append(addrInfos, *addrInfo)
+		}
+	}
+
+	gsub, err := pubsub.NewGossipSubWithRouter(ctx, an.h, gr,
 		pubsub.WithPeerExchange(true),
 		pubsub.WithFloodPublish(true),
 		pubsub.WithMessageSigning(true),
 		pubsub.WithDefaultValidator(validator),
 	)
+
 	if err != nil {
+		log.Errorw("Error happened while creating pubsub", "peer", an.h.ID(), "err", err)
 		return err
 	}
+	log.Debugw("Creating topic", "on peer", an.h.ID(), "topic", an.topicName)
 	an.topic, err = gsub.Join(an.topicName)
 	if err != nil {
+		log.Errorw("Error happened while joining the topic", "peer", an.h.ID(), "err", err)
 		return err
 	}
 	an.sub, err = an.topic.Subscribe()
 	if err != nil {
+		log.Errorw("Error happened while subscribing to the topic", "peer", an.h.ID(), "err", err)
 		return err
 	}
 	return nil
 }
 
 func (an *FxAnnouncements) processAnnouncement(ctx context.Context, from peer.ID, atype AnnouncementType, addrs []multiaddr.Multiaddr, topicString string) error {
-	log.Info("processing announcement")
+	log.Infow("processing announcement", "on", an.h.ID(), "from", from)
 	switch atype {
 	case IExistAnnouncementType:
-		log.Info("IExist request")
+		log.Infow("IExist request", "on", an.h.ID(), "from", from)
 		an.h.Peerstore().AddAddrs(from, addrs, peerstore.ConnectedAddrTTL)
 	case PoolJoinRequestAnnouncementType:
-		log.Info("PoolJoin request")
+		log.Infow("PoolJoin request", "on", an.h.ID(), "from", from)
 		if err := an.PoolJoinRequestHandler.HandlePoolJoinRequest(ctx, from, topicString, true); err != nil {
-			log.Errorw("An error occurred in handling pool join request announcement", err)
+			log.Errorw("An error occurred in handling pool join request announcement", "on", an.h.ID(), "from", from, "err", err)
 			return err
 		}
 	default:
-		log.Info("Unknown request")
+		log.Infow("Unknown request", "on", an.h.ID(), "from", from)
 	}
 	return nil
 }
@@ -203,13 +230,15 @@ func (an *FxAnnouncements) AnnounceIExistPeriodically(ctx context.Context) {
 }
 
 func (an *FxAnnouncements) AnnounceJoinPoolRequestPeriodically(ctx context.Context) {
-	log.Debug("called wg.Done in AnnounceJoinPoolRequestPeriodically")
-	log.Debug("Starting AnnounceJoinPoolRequestPeriodically")
+	log.Debugw("deferring wg.Done in AnnounceJoinPoolRequestPeriodically pool join request", "peer", an.h.ID())
+	log.Debugw("Starting AnnounceJoinPoolRequestPeriodically pool join request", "peer", an.h.ID())
+	log.Debugw("peerlist before AnnounceJoinPoolRequestPeriodically pool join request", "on", an.h.ID(), "peerlist", an.topic.ListPeers())
+
 	defer an.wg.Done()
 	an.announcingJoinPoolMutex.Lock()
 	if an.announcingJoinPoolRequest {
 		an.announcingJoinPoolMutex.Unlock()
-		log.Info("Join pool request announcements are already in progress.")
+		log.Infow("pool join request announcements are already in progress.", "peer", an.h.ID())
 		return
 	}
 	an.announcingJoinPoolRequest = true
@@ -221,12 +250,13 @@ func (an *FxAnnouncements) AnnounceJoinPoolRequestPeriodically(ctx context.Conte
 	}()
 	ticker := time.NewTicker(an.announceInterval)
 	for {
+		log.Debugw("inside ticker for join pool request", "peer", an.h.ID())
 		select {
 		case <-ctx.Done():
-			log.Info("stopped making periodic announcements")
+			log.Infow("stopped making periodic pool join request announcements", "peer", an.h.ID())
 			return
 		case <-an.stopJoinPoolRequestChan: // Assume an.stopChan is a `chan struct{}` used to signal stopping the ticker.
-			log.Info("stopped making periodic joinpoolrequest announcements due to stop signal")
+			log.Infow("stopped making periodic pool join request announcements due to stop signal", "peer", an.h.ID())
 			return
 		case t := <-ticker.C:
 			a := &Announcement{
@@ -236,19 +266,20 @@ func (an *FxAnnouncements) AnnounceJoinPoolRequestPeriodically(ctx context.Conte
 			a.SetAddrs(an.h.Addrs()...)
 			b, err := a.MarshalBinary()
 			if err != nil {
-				log.Errorw("failed to encode pool join request announcement", "err", err)
+				log.Errorw("failed to encode pool join request announcement", "peer", an.h.ID(), "err", err)
 				continue
 			}
+			log.Debugw("inside ticker for join pool request now publishing", "peer", an.h.ID())
 			if err := an.topic.Publish(ctx, b); err != nil {
 				if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
-					log.Info("stopped making periodic announcements")
+					log.Infow("stopped making periodic pool join request announcements", "peer", an.h.ID(), "err", err)
 					return
 				}
 				if errors.Is(err, pubsub.ErrTopicClosed) || errors.Is(err, pubsub.ErrSubscriptionCancelled) {
-					log.Info("stopped making periodic iexist announcements as topic is closed or subscription cancelled")
+					log.Infow("stopped making periodic pool join request announcements as topic is closed or subscription cancelled", "peer", an.h.ID(), "err", err)
 					return
 				}
-				log.Errorw("failed to publish pool join request announcement", "err", err)
+				log.Errorw("failed to publish pool join request announcement", "peer", an.h.ID(), "err", err)
 				continue
 			}
 			log.Debugw("announced pool join request message", "from", an.h.ID(), "announcement", a, "time", t)
@@ -257,36 +288,36 @@ func (an *FxAnnouncements) AnnounceJoinPoolRequestPeriodically(ctx context.Conte
 }
 
 func (an *FxAnnouncements) ValidateAnnouncement(ctx context.Context, id peer.ID, msg *pubsub.Message, status common.MemberStatus, exists bool) bool {
-	log.Debug("ValidateAnnouncement")
+	log.Debugw("ValidateAnnouncement", "on peer", an.h.ID(), "from peerID", id)
 	a := &Announcement{}
 	if err := a.UnmarshalBinary(msg.Data); err != nil {
 		log.Errorw("failed to unmarshal announcement data", "err", err)
 		return false
 	}
 
-	log.Debugw("ValidateAnnouncement", "peerID", id, "type", a.Type)
+	log.Debugw("ValidateAnnouncement", "on peer", an.h.ID(), "from peerID", id, "type", a.Type)
 	switch a.Type {
 	case NewManifestAnnouncementType:
 		// Check if sender is approved
 		if !exists {
-			log.Errorw("peer is not recognized", "peer", id)
+			log.Debugw("peer is not recognized", "on peer", an.h.ID(), "from peer", id)
 			return false
 		}
 		if status != common.Approved {
-			log.Errorw("peer is not an approved member", "peer", id)
+			log.Debugw("peer is not an approved member", "on peer", an.h.ID(), "from peer", id)
 			return false
 		}
 	case PoolJoinRequestAnnouncementType:
-		if status != common.Unknown {
-			log.Errorw("peer is no longer permitted to send this message type", "peer", id)
+		if status == common.Unknown {
+			log.Debugw("peer is no longer permitted to send this message type", "on peer", an.h.ID(), "from peer", id, "status", status)
 			return false
 		} else {
-			log.Debugw("PoolJoinRequestAnnouncementType status is Unknown and ok")
+			log.Debugw("PoolJoinRequestAnnouncementType status is not Unknown and ok")
 		}
 	case PoolJoinApproveAnnouncementType, IExistAnnouncementType:
 		// Any member status is valid for a pool join announcement
 	default:
-		log.Errorw("The Type is not set ", a.Type)
+		log.Debugw("The Type is not set", "type", a.Type)
 		return false
 	}
 
@@ -304,6 +335,7 @@ func (an *FxAnnouncements) StopJoinPoolRequestAnnouncements() {
 }
 
 func (an *FxAnnouncements) Shutdown(ctx context.Context) error {
+	log.Debugw("closing topic", "peer", an.h.ID())
 	an.sub.Cancel()
 	tErr := an.topic.Close()
 	return tErr
diff --git a/announcements/options.go b/announcements/options.go
index 932a54c5..271e61f3 100644
--- a/announcements/options.go
+++ b/announcements/options.go
@@ -12,6 +12,7 @@ type (
 		timeout           int
 		topicName         string
 		wg                *sync.WaitGroup
+		relays            []string
 	}
 )
 
@@ -52,3 +53,10 @@ func WithWg(wg *sync.WaitGroup) Option {
 		return nil
 	}
 }
+
+func WithRelays(r []string) Option {
+	return func(o *options) error {
+		o.relays = r
+		return nil
+	}
+}
diff --git a/blockchain/blockchain.go b/blockchain/blockchain.go
index 282125ca..7d57eb85 100644
--- a/blockchain/blockchain.go
+++ b/blockchain/blockchain.go
@@ -639,6 +639,7 @@ func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicStri
 	//If members list is empty we should check what peerIDs we already voted on and update to avoid re-voting
 	isMembersEmpty := bl.IsMembersEmpty()
 	if isMembersEmpty {
+		log.Debugw("Members list is empty", "peer", bl.h.ID())
 		// Call the bl.PoolRequests and get the list of requests
 		req := PoolRequestsRequest{
 			PoolID: topic, // assuming 'topic' is your pool id
@@ -656,6 +657,7 @@ func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicStri
 
 		// For each one check if voted field in the response, contains the bl.h.ID().String() and if so it means we already voted
 		// Move it to members with status UnKnown.
+		log.Debugw("Empty members for", "peer", bl.h.ID(), "response from blockchain", poolRequestsResponse.PoolRequests)
 		for _, request := range poolRequestsResponse.PoolRequests {
 			if contains(request.Voted, bl.h.ID().String()) {
 				pid, err := peer.Decode(request.PeerID)
@@ -671,7 +673,7 @@ func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicStri
 
 				// Loop through the static relays and convert them to multiaddr
 				for _, relay := range bl.relays {
-					ma, err := multiaddr.NewMultiaddr(relay)
+					ma, err := multiaddr.NewMultiaddr(relay + "/p2p-circuit/p2p/" + pid.String())
 					if err != nil {
 						bl.membersLock.Unlock()
 						return err
@@ -685,10 +687,10 @@ func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicStri
 				}
 			}
 		}
-		log.Debugw("stored members are", bl.members)
+		log.Debugw("stored members after empty member list", "peer", bl.h.ID(), "members", bl.members)
 	}
 
-	//Get hte list of both join requests and joined members for the pool
+	//Get the list of both join requests and joined members for the pool
 	// Create a struct for the POST req
 	req := PoolUserListRequest{
 		PoolID: topic,
@@ -707,7 +709,7 @@ func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicStri
 	}
 
 	// Now iterate through the users and populate the member map
-	log.Debugw("Now iterate through the users and populate the member map", "response", response.Users)
+	log.Debugw("Now iterate through the users and populate the member map", "peer", bl.h.ID(), "response", response.Users)
 	bl.membersLock.Lock()
 	for _, user := range response.Users {
 		pid, err := peer.Decode(user.PeerID)
@@ -722,11 +724,14 @@ func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicStri
 			log.Debugw("Found self peerID", user.PeerID)
 			if user.RequestPoolID != nil {
 				if !bl.p.Status() {
+					log.Debugw("Found self peerID and running Ping Server now", "peer", user.PeerID)
 					err = bl.p.Start(ctx)
 					if err != nil {
-						log.Errorw("Error when starting hte Ping Server", "PeerID", user.PeerID, "err", err)
+						log.Errorw("Error when starting the Ping Server", "PeerID", user.PeerID, "err", err)
 					} else {
-						bl.a.AnnounceJoinPoolRequestPeriodically(ctx)
+						log.Debugw("Found self peerID and ran Ping Server and announcing pooljoinrequest now", "peer", user.PeerID)
+						bl.wg.Add(1)
+						go bl.a.AnnounceJoinPoolRequestPeriodically(ctx)
 					}
 				} else {
 					log.Debugw("Ping Server is already running for self peerID", user.PeerID)
@@ -751,22 +756,29 @@ func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicStri
 		}
 
 		if initiate {
-			//Vote for any peer that has not votd already
-			if exists && existingStatus != common.Unknown {
+			//Vote for any peer that has not voted already
+			if exists && existingStatus == common.Pending {
+				log.Debugw("Voting for peers", "pool", topicString, "from", bl.h.ID(), "for", pid)
 				err = bl.HandlePoolJoinRequest(ctx, pid, topicString, false)
 				if err == nil {
 					status = common.Unknown
+				} else {
+					log.Errorw("Error happened while voting", "pool", topicString, "from", bl.h.ID(), "for", pid, "err", err)
 				}
 			}
 		}
 
 		if exists {
+			log.Debugw("peer already exists in members", "h.ID", bl.h.ID(), "pid", pid, "existingStatus", existingStatus, "status", status)
-			if existingStatus != common.Approved && status == common.Approved {
+			if existingStatus != status && (existingStatus != common.Approved) {
 				// If the user is already pending and now approved, update to ApprovedOrPending
-				bl.members[pid] = common.Approved
+				bl.members[pid] = status
+			} else {
+				// If the user status is the same as before, there's no need to update
+				log.Debugw("member already has this status or is approved, so no need to change status", "h.ID", bl.h.ID(), "pid", pid, "Status", status, "existingStatus", existingStatus)
 			}
-			// If the user status is the same as before, there's no need to update
 		} else {
+			log.Debugw("member does not exist", "h.ID", bl.h.ID(), "pid", pid)
 			// If the user does not exist in the map, add them
 			bl.members[pid] = status
 			// Create a slice to hold the multiaddresses for the peer
@@ -774,7 +786,7 @@ func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicStri
 
 			// Loop through the static relays and convert them to multiaddr
 			for _, relay := range bl.relays {
-				ma, err := multiaddr.NewMultiaddr(relay)
+				ma, err := multiaddr.NewMultiaddr(relay + "/p2p-circuit/p2p/" + pid.String())
 				if err != nil {
 					bl.membersLock.Unlock()
 					return err
@@ -783,10 +795,26 @@ func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicStri
 			}
 
 			// Add the relay addresses to the peerstore for the peer ID
+			bl.h.Peerstore().AddAddrs(pid, addrs, peerstore.ConnectedAddrTTL)
+			log.Debugw("Added peer to peerstore", "h.ID", bl.h.ID(), "pid", pid)
+		}
+		if pid != bl.h.ID() {
+			//bl.h.Connect(ctx, peer.AddrInfo{ID: pid, Addrs: addrs})
+			log.Debugw("Connecting to other peer", "from", bl.h.ID(), "to", pid)
+			conn, err := bl.h.Network().DialPeer(ctx, pid)
+			if err == nil {
+				maddr := conn.RemoteMultiaddr()
+				log.Debugw("Connected to peer", "from", bl.h.ID(), "to", pid, "maddr", maddr)
+				conn.Close()
+			} else {
+				log.Debugw("Not Connected to peer", "from", bl.h.ID(), "to", pid, "err", err)
+			}
+		}
 	}
 	bl.membersLock.Unlock()
+	log.Debugw("peerstore for", "id", bl.h.ID(), "peers", bl.h.Peerstore().Peers())
 	if initiate {
 		bl.cleanUnwantedPeers(keepPeers)
 	}
diff --git a/blockchain/options.go b/blockchain/options.go
index 5eb7c651..a2347707 100644
--- a/blockchain/options.go
+++ b/blockchain/options.go
@@ -58,6 +58,9 @@ func WithAllowTransientConnection(t bool) Option {
 
 func WithBlockchainEndPoint(b string) Option {
 	return func(o *options) error {
+		if b == "" {
+			b = "127.0.0.1:4000"
+		}
 		o.blockchainEndPoint = b
 		return nil
 	}
diff --git a/blox/blox.go b/blox/blox.go
index a3f59826..c12da583 100644
--- a/blox/blox.go
+++ b/blox/blox.go
@@ -71,7 +71,9 @@ func New(o ...Option) (*Blox, error) {
 		announcements.WithAnnounceInterval(5),
 		announcements.WithTimeout(3),
 		announcements.WithTopicName(p.topicName),
-		announcements.WithWg(&p.wg))
+		announcements.WithWg(&p.wg),
+		announcements.WithRelays(p.relays),
+	)
 	if err != nil {
 		return nil, err
 	}
@@ -80,8 +82,7 @@ func New(o ...Option) (*Blox, error) {
 		blockchain.NewSimpleKeyStorer(""),
 		blockchain.WithAuthorizer(authorizer),
 		blockchain.WithAuthorizedPeers(authorizedPeers),
-		//blockchain.WithBlockchainEndPoint("127.0.0.1:4000"),
-		blockchain.WithBlockchainEndPoint("api.node3.functionyard.fula.network"),
+		blockchain.WithBlockchainEndPoint(p.blockchainEndpoint),
 		blockchain.WithTimeout(30),
 		blockchain.WithWg(&p.wg),
 		blockchain.WithFetchFrequency(3),
diff --git a/blox/example_test.go b/blox/example_test.go
index 159cabc6..4fa473df 100644
--- a/blox/example_test.go
+++ b/blox/example_test.go
@@ -3,8 +3,10 @@ package blox_test
 import (
 	"bytes"
 	"context"
+	"encoding/json"
 	"fmt"
 	"math/rand"
+	"net/http"
 	"time"
 
 	"github.com/functionland/go-fula/blox"
@@ -44,6 +46,7 @@ func Example_poolDiscoverPeersViaPubSub() {
 	if err != nil {
 		panic(err)
 	}
+
 	n1, err := blox.New(blox.WithPoolName(poolName), blox.WithHost(h1))
 	if err != nil {
 		panic(err)
 	}
@@ -421,6 +424,7 @@ func Example_ping() {
 		blox.WithHost(h1),
 		blox.WithUpdatePoolName(updatePoolName),
 		blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}),
+		blox.WithBlockchainEndPoint("api.node3.functionyard.fula.network"),
 	)
 	if err != nil {
 		panic(err)
 	}
@@ -446,6 +450,7 @@ func Example_ping() {
 		blox.WithHost(h2),
 		blox.WithUpdatePoolName(updatePoolName),
 		blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}),
+		blox.WithBlockchainEndPoint("api.node3.functionyard.fula.network"),
 	)
 	if err != nil {
 		panic(err)
 	}
@@ -471,6 +476,7 @@ func Example_ping() {
 		blox.WithHost(h3),
 		blox.WithUpdatePoolName(updatePoolName),
 		blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}),
+		blox.WithBlockchainEndPoint("api.node3.functionyard.fula.network"),
 	)
 	if err != nil {
 		panic(err)
 	}
@@ -496,6 +502,7 @@ func Example_ping() {
 		blox.WithHost(h4),
 		blox.WithUpdatePoolName(updatePoolName),
 		blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}),
+		blox.WithBlockchainEndPoint("api.node3.functionyard.fula.network"),
 	)
 	if err != nil {
 		panic(err)
 	}
@@ -651,6 +658,380 @@ func Example_announcements() {
 	// output for the example.
 	rng := rand.New(rand.NewSource(42))
 
+	// Instantiate the first node in the pool
+	pid1, _, err := crypto.GenerateECDSAKeyPair(rng)
+	if err != nil {
+		panic(err)
+	}
+	h1, err := libp2p.New(libp2p.Identity(pid1))
+	if err != nil {
+		panic(err)
+	}
+	n1, err := blox.New(
+		blox.WithPoolName(poolName),
+		blox.WithTopicName(poolName),
+		blox.WithHost(h1),
+		blox.WithUpdatePoolName(updatePoolName),
+		blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}),
+		blox.WithPingCount(5),
+		blox.WithBlockchainEndPoint("api.node3.functionyard.fula.network"),
+	)
+	if err != nil {
+		panic(err)
+	}
+	if err := n1.Start(ctx); err != nil {
+		panic(err)
+	}
+	defer n1.Shutdown(ctx)
+	fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h1.ID().String())
+
+	// Instantiate the second node in the pool
+	pid2, _, err := crypto.GenerateECDSAKeyPair(rng)
+	if err != nil {
+		panic(err)
+	}
+	h2, err := libp2p.New(libp2p.Identity(pid2))
+	if err != nil {
+		panic(err)
+	}
+	n2, err := blox.New(
+		blox.WithPoolName(poolName),
+		blox.WithTopicName(poolName),
+		blox.WithHost(h2),
+		blox.WithUpdatePoolName(updatePoolName),
+		blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}),
+		blox.WithPingCount(5),
+		blox.WithBlockchainEndPoint("api.node3.functionyard.fula.network"),
+	)
+	if err != nil {
+		panic(err)
+	}
+	if err := n2.Start(ctx); err != nil {
+		panic(err)
+	}
+	defer n2.Shutdown(ctx)
+	fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h2.ID().String())
+
+	// Instantiate the third node in the pool
+	pid3, _, err := crypto.GenerateECDSAKeyPair(rng)
+	if err != nil {
+		panic(err)
+	}
+	h3, err := libp2p.New(libp2p.Identity(pid3))
+	if err != nil {
+		panic(err)
+	}
+	n3, err := blox.New(
+		blox.WithPoolName(poolName),
+		blox.WithTopicName(poolName),
+		blox.WithHost(h3),
+		blox.WithUpdatePoolName(updatePoolName),
+		blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}),
+		blox.WithPingCount(5),
+		blox.WithBlockchainEndPoint("api.node3.functionyard.fula.network"),
+	)
+	if err != nil {
+		panic(err)
+	}
+	if err := n3.Start(ctx); err != nil {
+		panic(err)
+	}
+	defer n3.Shutdown(ctx)
+	fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h3.ID().String())
+
+	// Connect n1 to n2 and n3 so that there is a path for gossip propagation.
+	// Note that we are not connecting n2 to n3 as they should discover
+	// each other via pool's iexist announcements.
+	h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL)
+	if err = h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}); err != nil {
+		panic(err)
+	}
+	h1.Peerstore().AddAddrs(h3.ID(), h3.Addrs(), peerstore.PermanentAddrTTL)
+	if err = h1.Connect(ctx, peer.AddrInfo{ID: h3.ID(), Addrs: h3.Addrs()}); err != nil {
+		panic(err)
+	}
+
+	// Wait until the nodes discover each other
+	for {
+		if len(h1.Peerstore().Peers()) == 3 &&
+			len(h2.Peerstore().Peers()) == 3 &&
+			len(h3.Peerstore().Peers()) == 3 {
+			break
+		}
+		select {
+		case <-ctx.Done():
+			panic(ctx.Err())
+		default:
+			time.Sleep(time.Second)
+		}
+	}
+
+	h1Peers := h1.Peerstore().Peers()
+	fmt.Printf("Finally %s peerstore contains %d nodes:\n", h1.ID(), len(h1Peers))
+	for _, id := range h1Peers {
+		fmt.Printf("- %s\n", id)
+	}
+
+	h2Peers := h2.Peerstore().Peers()
+	fmt.Printf("Finally %s peerstore contains %d nodes:\n", h2.ID(), len(h2Peers))
+	for _, id := range h2Peers {
+		fmt.Printf("- %s\n", id)
+	}
+
+	h3Peers := h3.Peerstore().Peers()
+	fmt.Printf("Finally %s peerstore contains %d nodes:\n", h3.ID(), len(h3Peers))
+	for _, id := range h3Peers {
+		fmt.Printf("- %s\n", id)
+	}
+
+	// Instantiate the fourth node not in the pool
+	pid4, _, err := crypto.GenerateECDSAKeyPair(rng)
+	if err != nil {
+		panic(err)
+	}
+	h4, err := libp2p.New(libp2p.Identity(pid4))
+	if err != nil {
+		panic(err)
+	}
+	n4, err := blox.New(
+		blox.WithPoolName(poolName),
+		blox.WithTopicName(poolName),
+		blox.WithHost(h4),
+		blox.WithUpdatePoolName(updatePoolName),
+		blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}),
+		blox.WithPingCount(5),
+		blox.WithBlockchainEndPoint("api.node3.functionyard.fula.network"),
+	)
+	if err != nil {
+		panic(err)
+	}
+	if err := n4.Start(ctx); err != nil {
+		panic(err)
+	}
+	defer n4.Shutdown(ctx)
+	fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h4.ID().String())
+
+	// Manually adding h4 as it is not in the same pool
+	h4.Peerstore().AddAddrs(h1.ID(), h1.Addrs(), peerstore.PermanentAddrTTL)
+	if err = h4.Connect(ctx, peer.AddrInfo{ID: h1.ID(), Addrs: h1.Addrs()}); err != nil {
+		panic(err)
+	}
+	h4.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL)
+	if err = h4.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}); err != nil {
+		panic(err)
+	}
+	h4.Peerstore().AddAddrs(h3.ID(), h3.Addrs(), peerstore.PermanentAddrTTL)
+	if err = h4.Connect(ctx, peer.AddrInfo{ID: h3.ID(), Addrs: h3.Addrs()}); err != nil {
+		panic(err)
+	}
+
+	// Wait until the fourth node discovers the others
+	for {
+		if len(h4.Peerstore().Peers()) >= 3 {
+			break
+		}
+		select {
+		case <-ctx.Done():
+			panic(ctx.Err())
+		default:
+			time.Sleep(time.Second)
+		}
+	}
+
+	h4Peers := h4.Peerstore().Peers()
+	fmt.Printf("Finally %s peerstore contains %d nodes:\n", h4.ID(), len(h4Peers))
+	for _, id := range h4Peers {
+		fmt.Printf("- %s\n", id)
+	}
+
+	n4.AnnounceJoinPoolRequestPeriodically(ctx)
+
+	// Wait until the fourth node discovers the others
+	for {
+		members := n4.GetBlMembers()
+		if len(members) >= 2 {
+			for id, status := range members {
+				memberInfo := fmt.Sprintf("Member ID: %s, Status: %v", id.String(), status)
+				fmt.Println(memberInfo)
+			}
+			break
+		}
+		select {
+		case <-ctx.Done():
+			panic(ctx.Err())
+		default:
+			time.Sleep(3 * time.Second)
+		}
+	}
+
+	// Unordered output:
+	// Instantiated node in pool 1 with ID: QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT
+	// Instantiated node in pool 1 with ID: QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF
+	// Instantiated node in pool 1 with ID: QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA
+	// Finally QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT peerstore contains 3 nodes:
+	// - QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA
+	// - QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT
+	// - QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF
+	// Finally QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF peerstore contains 3 nodes:
+	// - QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF
+	// - QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT
+	// - QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA
+	// Finally QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA peerstore contains 3 nodes:
+	// - QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA
+	// - QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT
+	// - QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF
+	// Instantiated node in pool 1 with ID: QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe
+	// Finally QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe peerstore contains 4 nodes:
+	// - QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA
+	// - QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe
+	// - QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT
+	// - QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF
+	// Member ID: 12D3KooWACVcVsQh18jM9UudRQzeYEjxCJQJgFUaAgs41tayjxC4, Status: 1
+	// Member ID: , Status: 2
+}
+
+func startMockServer(addr string) *http.Server {
+	handler := http.NewServeMux()
+
+	handler.HandleFunc("/fula/pool/join", func(w http.ResponseWriter, r *http.Request) {
+		response := map[string]interface{}{
+			"pool_id": 1,
+			"account": "QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe",
+		}
+		json.NewEncoder(w).Encode(response)
+	})
+
+	handler.HandleFunc("/fula/pool/cancel_join", func(w http.ResponseWriter, r *http.Request) {
+		response := map[string]interface{}{
+			"pool_id": 1,
+			"account": "QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe",
+		}
+		json.NewEncoder(w).Encode(response)
+	})
+
+	handler.HandleFunc("/fula/pool/poolrequests", func(w http.ResponseWriter, r *http.Request) {
+		response := map[string]interface{}{
+			"poolrequests": []map[string]interface{}{
+				{
+					"pool_id":        1,
+					"account":        "QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe",
+					"voted":          []string{},
+					"positive_votes": 0,
+					"peer_id":        "QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe",
+				},
+			},
+		}
+		json.NewEncoder(w).Encode(response)
+	})
+
+	handler.HandleFunc("/fula/pool/all", func(w http.ResponseWriter, r *http.Request) {
+		response := map[string]interface{}{
+			"pools": []map[string]interface{}{
+				{
+					"pool_id":   1,
+					"creator":   "5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY",
+					"pool_name": "PoolTest1",
+					"region":    "Ontario",
+					"parent":    nil,
+					"participants": []string{
+						"QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT",
+						"QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF",
+						"QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA",
+					},
+				},
+			},
+		}
+		json.NewEncoder(w).Encode(response)
+	})
+
+	handler.HandleFunc("/fula/pool/users", func(w http.ResponseWriter, r *http.Request) {
+		response := map[string]interface{}{
+			"users": []map[string]interface{}{
+				{
+					"account":         "QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe",
+					"pool_id":         nil,
+					"request_pool_id": 1,
+					"peer_id":         "QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe",
+				},
+				{
+					"account":         "QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT",
+					"pool_id":         1,
+					"request_pool_id": nil,
+					"peer_id":         "QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT",
+				},
+				{
+					"account":         "QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF",
+					"pool_id":         1,
+					"request_pool_id": nil,
+					"peer_id":         "QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF",
+				},
+				{
+					"account":         "QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA",
+					"pool_id":         1,
+					"request_pool_id": nil,
+					"peer_id":         "QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA",
+				},
+			},
+		}
+		json.NewEncoder(w).Encode(response)
+	})
+
+	handler.HandleFunc("/fula/pool/vote", func(w http.ResponseWriter, r *http.Request) {
+		response := map[string]interface{}{
+			"pool_id": 1,
+			"account": "QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe",
+		}
+		json.NewEncoder(w).Encode(response)
+	})
+
+	handler.HandleFunc("/fula/pool/leave", func(w http.ResponseWriter, r *http.Request) {
+		response := map[string]interface{}{
+			"pool_id": 1,
+			"account": "QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe",
+		}
+		json.NewEncoder(w).Encode(response)
+	})
+
+	// Create an HTTP server
+	server := &http.Server{
+		Addr:    addr,
+		Handler: handler,
+	}
+	// Start the server in a new goroutine
+	go func() {
+		if err := server.ListenAndServe(); err != http.ErrServerClosed {
+			panic(err) // Handle the error as you see fit
+		}
+	}()
+
+	// Give the server a moment to start
+	time.Sleep(time.Millisecond * 100)
+
+	return server
+}
+
+func Example_withServer() {
+	server := startMockServer("127.0.0.1:4000")
+	defer func() {
+		// Shutdown the server after test
+		if err := server.Shutdown(context.Background()); err != nil {
+			panic(err) // Handle the error as you see fit
+		}
+	}()
+
+	const poolName = "1"
+	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
+	defer cancel()
+
+	// Elevate log level to show internal communications.
+	if err := logging.SetLogLevel("*", "info"); err != nil {
+		panic(err)
+	}
+
+	// Use a deterministic random generator to generate deterministic
+	// output for the example.
+	rng := rand.New(rand.NewSource(42))
+
 	// Instantiate the first node in the pool
 	pid1, _, err := crypto.GenerateECDSAKeyPair(rng)
 	if err != nil {
 		panic(err)
 	}
diff --git a/blox/options.go b/blox/options.go
index e4ef234e..3afe4135 100644
--- a/blox/options.go
+++ b/blox/options.go
@@ -21,19 +21,20 @@ type (
 	Option          func(*options) error
 	PoolNameUpdater func(string) error
 	options         struct {
-		h                host.Host
-		name             string
-		topicName        string
-		storeDir         string
-		announceInterval time.Duration
-		ds               datastore.Batching
-		ls               *ipld.LinkSystem
-		authorizer       peer.ID
-		authorizedPeers  []peer.ID
-		exchangeOpts     []exchange.Option
-		relays           []string
-		updatePoolName   PoolNameUpdater
-		pingCount        int
+		h                  host.Host
+		name               string
+		topicName          string
+		storeDir           string
+		announceInterval   time.Duration
+		ds                 datastore.Batching
+		ls                 *ipld.LinkSystem
+		authorizer         peer.ID
+		authorizedPeers    []peer.ID
+		exchangeOpts       []exchange.Option
+		relays             []string
+		updatePoolName     PoolNameUpdater
+		pingCount          int
+		blockchainEndpoint string
 	}
 )
 
@@ -181,3 +182,13 @@ func WithPingCount(pc int) Option {
 		return nil
 	}
 }
+
+func WithBlockchainEndPoint(b string) Option {
+	return func(o *options) error {
+		if b == "" {
+			b = "127.0.0.1:4000"
+		}
+		o.blockchainEndpoint = b
+		return nil
+	}
+}
diff --git a/cmd/blox/main.go b/cmd/blox/main.go
index dbcd68c0..2ea92d44 100644
--- a/cmd/blox/main.go
+++ b/cmd/blox/main.go
@@ -37,11 +37,12 @@ var (
 	logger = logging.Logger("fula/cmd/blox")
 	app    struct {
 		cli.App
-		initOnly        bool
-		generateNodeKey bool
-		wireless        bool
-		configPath      string
-		config          struct {
+		initOnly           bool
+		blockchainEndpoint string
+		generateNodeKey    bool
+		wireless           bool
+		configPath         string
+		config             struct {
 			Identity  string `yaml:"identity"`
 			StoreDir  string `yaml:"storeDir"`
 			PoolName  string `yaml:"poolName"`
@@ -162,6 +163,12 @@ func init() {
 			Usage:       "Generate node key from identity",
 			Destination: &app.generateNodeKey,
 		},
+		&cli.StringFlag{
+			Name:        "blockchainEndpoint",
+			Usage:       "Change the blockchain APIs endpoint",
+			Destination: &app.blockchainEndpoint,
+			Value:       "127.0.0.1:4000",
+		},
 	},
 	Before: before,
 	Action: action,
@@ -467,6 +474,7 @@ func action(ctx *cli.Context) error {
 		blox.WithStoreDir(app.config.StoreDir),
 		blox.WithRelays(app.config.StaticRelays),
 		blox.WithUpdatePoolName(updatePoolName),
+		blox.WithBlockchainEndPoint(app.blockchainEndpoint),
 		blox.WithPingCount(5),
 		blox.WithExchangeOpts(
 			exchange.WithUpdateConfig(updateConfig),
diff --git a/cmd/test/main.go b/cmd/test/main.go
index 432eac50..6ba7bb58 100644
--- a/cmd/test/main.go
+++ b/cmd/test/main.go
@@ -1,9 +1,13 @@
 package main
 
 import (
+	"bytes"
 	"context"
+	"encoding/json"
 	"fmt"
+	"io"
 	"math/rand"
+	"net/http"
 	"time"
 
 	"github.com/functionland/go-fula/blox"
@@ -11,20 +15,167 @@ import (
 	"github.com/libp2p/go-libp2p"
 	"github.com/libp2p/go-libp2p/core/crypto"
 	"github.com/libp2p/go-libp2p/core/peer"
-	"github.com/libp2p/go-libp2p/core/peerstore"
+	"github.com/multiformats/go-multiaddr"
 )
 
 func updatePoolName(newPoolName string) error {
 	return nil
 }
 
+var log = logging.Logger("fula/mockserver")
+
+// requestLoggerMiddleware logs the details of each request
+func requestLoggerMiddleware(next http.Handler) http.Handler {
+	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		// Log the request
+		body, _ := io.ReadAll(r.Body)
+		log.Debugw("Received request", "url", r.URL.Path, "method", r.Method, "body", string(body))
+
+		// Create a new io.Reader from the read body as the original body is now drained
+		r.Body = io.NopCloser(bytes.NewBuffer(body))
+
+		// Call the next handler
+		next.ServeHTTP(w, r)
+	})
+}
+func startMockServer(addr string) *http.Server {
+	handler := http.NewServeMux()
+
+	handler.HandleFunc("/fula/pool/join", func(w http.ResponseWriter, r *http.Request) {
+		response := map[string]interface{}{
+			"pool_id": 1,
+			"account": "QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe",
+		}
+		json.NewEncoder(w).Encode(response)
+	})
+
+	handler.HandleFunc("/fula/pool/cancel_join", func(w http.ResponseWriter, r *http.Request) {
+		response := map[string]interface{}{
+			"pool_id": 1,
+			"account": "QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe",
+		}
+		json.NewEncoder(w).Encode(response)
+	})
+
+	handler.HandleFunc("/fula/pool/poolrequests", func(w http.ResponseWriter, r *http.Request) {
+		response := map[string]interface{}{
+			"poolrequests": []map[string]interface{}{
+				{
+					"pool_id":        1,
+					"account":        "QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe",
+					"voted":          []string{},
+					"positive_votes": 0,
+					"peer_id":        "QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe",
+				},
+			},
+		}
+		json.NewEncoder(w).Encode(response)
+	})
+
+	handler.HandleFunc("/fula/pool/all", func(w http.ResponseWriter, r *http.Request) {
+		response := map[string]interface{}{
+			"pools": []map[string]interface{}{
+				{
+					"pool_id":   1,
+					"creator":   "5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY",
+					"pool_name": "PoolTest1",
+					"region":    "Ontario",
+					"parent":    nil,
+					"participants": []string{
+						"QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT",
+						"QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF",
+						"QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA",
+					},
+				},
+			},
+		}
+		json.NewEncoder(w).Encode(response)
+	})
+
+	handler.HandleFunc("/fula/pool/users", func(w http.ResponseWriter, r *http.Request) {
+		response := map[string]interface{}{
+			"users": []map[string]interface{}{
+				{
+					"account":         "QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe",
+					"pool_id":         nil,
+					"request_pool_id": 1,
+					"peer_id":         "QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe",
+				},
+				{
+					"account":         "QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT",
+					"pool_id":         1,
+					"request_pool_id": nil,
+					"peer_id":         "QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT",
+				},
+				{
+					"account":         "QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF",
+					"pool_id":         1,
+					"request_pool_id": nil,
+					"peer_id":         "QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF",
+				},
+				{
+					"account":         "QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA",
+					"pool_id":         1,
+					"request_pool_id": nil,
+					"peer_id":         "QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA",
+				},
+			},
+		}
+		json.NewEncoder(w).Encode(response)
+	})
+
+	handler.HandleFunc("/fula/pool/vote", func(w http.ResponseWriter, r *http.Request) {
+		response := map[string]interface{}{
+			"pool_id": 1,
+			"account": "QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe",
+		}
+		json.NewEncoder(w).Encode(response)
+	})
+
+	handler.HandleFunc("/fula/pool/leave", func(w http.ResponseWriter, r *http.Request) {
+		response := map[string]interface{}{
+			"pool_id": 1,
+			"account": "QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe",
+		}
+		json.NewEncoder(w).Encode(response)
+	})
+
+	// Wrap the handlers with the logging middleware
+	loggedHandler := requestLoggerMiddleware(handler)
+
+	// Create an HTTP server
+	server := &http.Server{
+		Addr:    addr,
+		Handler: loggedHandler,
+	}
+	// Start the server in a new goroutine
+	go func() {
+		if err := server.ListenAndServe(); err != http.ErrServerClosed {
+			panic(err) // Handle the error as you see fit
+		}
+	}()
+
+	// Give the server a moment to start
+	time.Sleep(time.Millisecond * 100)
+
+	return server
+}
+
 func main() {
+	server := startMockServer("127.0.0.1:4000")
+	defer func() {
+		// Shutdown the server after test
+		if err := server.Shutdown(context.Background()); err != nil {
+			panic(err) // Handle the error as you see fit
+		}
+	}()
+
 	const poolName = "1"
-	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
+	ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second)
 	defer cancel()
 
 	// Elevate log level to show internal communications.
-	if err := logging.SetLogLevel("*", "error"); err != nil {
+	if err := logging.SetLogLevel("*", "debug"); err != nil {
 		panic(err)
 	}
@@ -32,12 +183,33 @@ func main() {
 	// output for the example.
 	rng := rand.New(rand.NewSource(42))
 
+	relays := []string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}
+	var addrInfos []peer.AddrInfo
+	for _, relay := range relays {
+		// Parse the multiaddr
+		ma, err := multiaddr.NewMultiaddr(relay)
+		if err != nil {
+			fmt.Println("Error parsing multiaddr:", err)
+			continue
+		}
+
+		// Extract the peer ID
+		addrInfo, err := peer.AddrInfoFromP2pAddr(ma)
+		if err != nil {
+			fmt.Println("Error extracting peer ID:", err)
+			continue
+		}
+		if addrInfo != nil {
+			addrInfos = append(addrInfos, *addrInfo)
+		}
+	}
+
 	// Instantiate the first node in the pool
 	pid1, _, err := crypto.GenerateECDSAKeyPair(rng)
 	if err != nil {
 		panic(err)
 	}
-	h1, err := libp2p.New(libp2p.Identity(pid1))
+	h1, err := libp2p.New(libp2p.Identity(pid1), libp2p.EnableRelay(), libp2p.EnableNATService(), libp2p.EnableHolePunching(), libp2p.EnableAutoRelayWithStaticRelays(addrInfos))
 	if err != nil {
 		panic(err)
 	}
@@ -56,14 +228,14 @@ func main() {
 		panic(err)
 	}
 	defer n1.Shutdown(ctx)
-	fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h1.ID().String())
+	fmt.Printf("n1 Instantiated node in pool %s with ID: %s\n", poolName, h1.ID().String())
 
 	// Instantiate the second node in the pool
 	pid2, _, err := crypto.GenerateECDSAKeyPair(rng)
 	if err != nil {
 		panic(err)
 	}
-	h2, err := libp2p.New(libp2p.Identity(pid2))
+	h2, err := libp2p.New(libp2p.Identity(pid2), libp2p.EnableRelay(), libp2p.EnableNATService(), libp2p.EnableHolePunching(), libp2p.EnableAutoRelayWithStaticRelays(addrInfos))
 	if err != nil {
 		panic(err)
 	}
@@ -82,14 +254,14 @@ func main() {
 		panic(err)
 	}
 	defer n2.Shutdown(ctx)
-	fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h2.ID().String())
+	fmt.Printf("n2 Instantiated node in pool %s with ID: %s\n", poolName, h2.ID().String())
 
 	// Instantiate the third node in the pool
 	pid3, _, err := crypto.GenerateECDSAKeyPair(rng)
 	if err != nil {
 		panic(err)
 	}
-	h3, err := libp2p.New(libp2p.Identity(pid3))
+	h3, err := libp2p.New(libp2p.Identity(pid3), libp2p.EnableRelay(), libp2p.EnableNATService(), libp2p.EnableHolePunching(), libp2p.EnableAutoRelayWithStaticRelays(addrInfos))
 	if err != nil {
 		panic(err)
 	}
@@ -108,25 +280,13 @@ func main() {
 		panic(err)
 	}
 	defer n3.Shutdown(ctx)
-	fmt.Printf("Instantiated node in pool %s with ID: %s\n", h3.ID().String())
-
-	// Connect n1 to n2 and n3 so that there is a path for gossip propagation.
-	// Note that we are not connecting n2 to n3 as they should discover
-	// each other via pool's iexist announcements.
-	h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL)
-	if err = h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}); err != nil {
-		panic(err)
-	}
-	h1.Peerstore().AddAddrs(h3.ID(), h3.Addrs(), peerstore.PermanentAddrTTL)
-	if err = h1.Connect(ctx, peer.AddrInfo{ID: h3.ID(), Addrs: h3.Addrs()}); err != nil {
-		panic(err)
-	}
+	fmt.Printf("n3 Instantiated node in pool %s with ID: %s\n", poolName, h3.ID().String())
 
 	// Wait until the nodes discover each other
 	for {
-		if len(h1.Peerstore().Peers()) == 3 &&
-			len(h2.Peerstore().Peers()) == 3 &&
-			len(h3.Peerstore().Peers()) == 3 {
+		if len(h1.Peerstore().Peers()) >= 3 &&
+			len(h2.Peerstore().Peers()) >= 3 &&
+			len(h3.Peerstore().Peers()) >= 3 {
 			break
 		}
 		select {
@@ -138,32 +298,37 @@ func main() {
 	}
 
 	h1Peers := h1.Peerstore().Peers()
-	fmt.Printf("Finally %s peerstore contains %d nodes:\n", h1.ID(), len(h1Peers))
+	fmt.Printf("n1 Finally %s peerstore contains %d nodes:\n", h1.ID(), len(h1Peers))
 	for _, id := range h1Peers {
 		fmt.Printf("- %s\n", id)
 	}
 
 	h2Peers := h2.Peerstore().Peers()
-	fmt.Printf("Finally %s peerstore contains %d nodes:\n", h2.ID(), len(h2Peers))
+	fmt.Printf("n2 Finally %s peerstore contains %d nodes:\n", h2.ID(), len(h2Peers))
 	for _, id := range h2Peers {
 		fmt.Printf("- %s\n", id)
 	}
 
 	h3Peers := h3.Peerstore().Peers()
-	fmt.Printf("Finally %s peerstore contains %d nodes:\n", h3.ID(), len(h3Peers))
+	fmt.Printf("n3 Finally %s peerstore contains %d nodes:\n", h3.ID(), len(h3Peers))
 	for _, id := range h3Peers {
 		fmt.Printf("- %s\n", id)
 	}
 
 	// Instantiate the fourth node not in the pool
+	log.Debug("Now creating pid of n4")
 	pid4, _, err := crypto.GenerateECDSAKeyPair(rng)
 	if err != nil {
+		log.Errorw("An error happened in creating keypair of n4", "err", err)
 		panic(err)
 	}
+	log.Debug("Now creating host of n4")
 	h4, err := libp2p.New(libp2p.Identity(pid4))
 	if err != nil {
+		log.Errorw("An error happened in creating libp2p instance of n4", "err", err)
 		panic(err)
 	}
+	log.Debug("Now creating blox for n4")
 	n4, err := blox.New(
 		blox.WithPoolName(poolName),
 		blox.WithTopicName(poolName),
@@ -173,89 +338,82 @@ func main() {
 		blox.WithPingCount(5),
 	)
 	if err != nil {
+		log.Errorw("An error happened in creating blox instance of n4", "err", err)
 		panic(err)
 	}
+	log.Debug("Now starting n4")
 	if err := n4.Start(ctx); err != nil {
+		log.Errorw("An error happened in starting instance of n4", "err", err)
 		panic(err)
 	}
 	defer n4.Shutdown(ctx)
-	fmt.Printf("Instantiated node in pool %s with ID: %s\n", "", h4.ID().String())
+	fmt.Printf("n4 Instantiated node in pool %s with ID: %s\n", poolName, h4.ID().String())
 
-	//Manually adding h4 as it is not in the same pool
-	h4.Peerstore().AddAddrs(h1.ID(), h1.Addrs(), peerstore.PermanentAddrTTL)
-	if err = h4.Connect(ctx, peer.AddrInfo{ID: h1.ID(), Addrs: h1.Addrs()}); err != nil {
-		panic(err)
-	}
-	h4.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL)
-	if err = h4.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}); err != nil {
-		panic(err)
-	}
-	h4.Peerstore().AddAddrs(h3.ID(), h3.Addrs(), peerstore.PermanentAddrTTL)
-	if err = h4.Connect(ctx, peer.AddrInfo{ID: h3.ID(), Addrs: h3.Addrs()}); err != nil {
-		panic(err)
+	h4Peers := h4.Peerstore().Peers()
+	fmt.Printf("n4 Finally %s peerstore contains %d nodes:\n", h4.ID(), len(h4Peers))
+	for _, id := range h4Peers {
+		fmt.Printf("- %s\n", id)
 	}
 
+	n4.AnnounceJoinPoolRequestPeriodically(ctx)
+
 	// Wait until the fourth node discover others
 	for {
-		if len(h4.Peerstore().Peers()) >= 3 {
+		members := n4.GetBlMembers()
+
+		for id, status := range members {
+			memberInfo := fmt.Sprintf("Member ID: %s, Status: %v", id.String(), status)
+			fmt.Println(memberInfo)
+		}
+		if len(members) >= 2 {
 			break
 		}
 		select {
 		case <-ctx.Done():
 			panic(ctx.Err())
 		default:
-			time.Sleep(time.Second)
+			time.Sleep(3 * time.Second)
 		}
 	}
 
-	h4Peers := h4.Peerstore().Peers()
-	fmt.Printf("Finally %s peerstore contains %d nodes:\n", h4.ID(), len(h4Peers))
-	for _, id := range h4Peers {
-		fmt.Printf("- %s\n", id)
-	}
-
-	n4.AnnounceJoinPoolRequestPeriodically(ctx)
-
-	// Wait until the fourth node discover others
+	// wait for 60 seconds
+	count := 1
 	for {
-		members := n4.GetBlMembers()
-		if len(members) >= 2 {
-			for id, status := range members {
-				memberInfo := fmt.Sprintf("Member ID: %s, Status: %v", id.String(), status)
-				fmt.Println(memberInfo)
-			}
+		count = count + 1
+		if count > 60 {
 			break
 		}
 		select {
 		case <-ctx.Done():
 			panic(ctx.Err())
 		default:
-			time.Sleep(3 * time.Second)
+			time.Sleep(1 * time.Second)
 		}
 	}
+	panic("end")
 
 	// Unordered output:
-	// Instantiated node in pool 1 with ID: QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT
+	// Instantiated node in pool 1 with ID: QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT
 	// Instantiated node in pool 1 with ID: QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF
 	// Instantiated node in pool 1 with ID: QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA
-	// Instantiated node in pool 1 with ID: QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe
-	// QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT peerstore contains 3 nodes:
+	// Finally QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT peerstore contains 3 nodes:
+	// - QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA
 	// - QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT
 	// - QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF
-	// - QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA
-	// QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF peerstore contains 3 nodes:
+	// Finally QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF peerstore contains 3 nodes:
 	// - QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF
 	// - QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT
 	// - QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA
-	// QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA peerstore contains 3 nodes:
-	// - QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF
-	// - QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT
+	// Finally QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA peerstore contains 3 nodes:
 	// - QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA
-	// Finally QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe peerstore contains 4 nodes:
 	// - QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT
 	// - QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF
+	// Instantiated node in pool 1 with ID: QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe
+	// Finally QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe peerstore contains 4 nodes:
 	// - QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA
 	// - QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe
-	// Member ID: , Status: 2
+	// - QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT
+	// - QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF
 	// Member ID: 12D3KooWACVcVsQh18jM9UudRQzeYEjxCJQJgFUaAgs41tayjxC4, Status: 1
+	// Member ID: , Status: 2
 }
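
Note on the mock blockchain server used by both blox/example_test.go and cmd/test/main.go above: every /fula/pool/* route returns a fixed JSON document, so the server can be smoke-tested on its own before the blox nodes are pointed at it. The sketch below is illustrative and not part of the patch; it assumes the startMockServer helper defined above is in the same package, and the request body is a placeholder since the mock handlers never read it.

	package main

	import (
		"bytes"
		"context"
		"encoding/json"
		"fmt"
		"net/http"
	)

	func main() {
		// Start the mock API on the same default endpoint the options fall back to.
		server := startMockServer("127.0.0.1:4000")
		defer server.Shutdown(context.Background())

		// Placeholder body: the mock /fula/pool/join handler above ignores it.
		body := bytes.NewBufferString(`{"pool_id": 1}`)
		resp, err := http.Post("http://127.0.0.1:4000/fula/pool/join", "application/json", body)
		if err != nil {
			panic(err)
		}
		defer resp.Body.Close()

		// Decode the canned response and confirm the fixed fields came back.
		var out map[string]interface{}
		if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
			panic(err)
		}
		fmt.Println(out["pool_id"], out["account"]) // 1 QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe
	}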
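
A related note on the relay handling: FetchUsersAndPopulateSets now reaches pool members through the static relays by appending "/p2p-circuit/p2p/<peer ID>" to each relay multiaddr before storing it in the peerstore. A standalone sketch of what that composition parses to, using the relay and peer IDs that appear throughout the patch:

	package main

	import (
		"fmt"

		"github.com/libp2p/go-libp2p/core/peer"
		"github.com/multiformats/go-multiaddr"
	)

	func main() {
		relay := "/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"
		target := "QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe"

		// Same composition as in blockchain.go: relay + "/p2p-circuit/p2p/" + pid.String()
		ma, err := multiaddr.NewMultiaddr(relay + "/p2p-circuit/p2p/" + target)
		if err != nil {
			panic(err)
		}

		// AddrInfoFromP2pAddr splits off the trailing /p2p component, so the
		// resulting AddrInfo carries the target peer ID together with the
		// relayed transport address, which is what ends up in the peerstore.
		ai, err := peer.AddrInfoFromP2pAddr(ma)
		if err != nil {
			panic(err)
		}
		fmt.Println("peer:", ai.ID)
		fmt.Println("addr:", ai.Addrs[0])
	}

This is why peerstore entries for not-yet-connected members point at the relay's circuit address rather than a direct address, letting DialPeer fall back to the relay when no direct path exists.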