Skip to content

Commit

Permalink
first draft of full testcase
Browse files Browse the repository at this point in the history
  • Loading branch information
ehsan6sha committed Nov 16, 2023
1 parent e9ced6d commit 5510a19
Show file tree
Hide file tree
Showing 9 changed files with 756 additions and 126 deletions.
82 changes: 57 additions & 25 deletions announcements/announcements.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ func NewFxAnnouncements(h host.Host, o ...Option) (*FxAnnouncements, error) {
}

func (an *FxAnnouncements) Start(ctx context.Context, validator pubsub.Validator) error {
if an.topicName == "" {
log.Warn("Announcement do not have any topic to subscribe to")
if an.topicName == "" || an.topicName == "0" {
log.Warnw("Announcement do not have any topic to subscribe to", "on peer", an.h.ID())
return errors.New("Announcement do not have any topic to subscribe to")
}
typeSystem, err := ipld.LoadSchemaBytes(schemaBytes)
Expand All @@ -69,41 +69,68 @@ func (an *FxAnnouncements) Start(ctx context.Context, validator pubsub.Validator
}
PubSubPrototypes.Announcement = bindnode.Prototype((*Announcement)(nil), typeSystem.TypeByName("Announcement"))

gsub, err := pubsub.NewGossipSub(ctx, an.h,
gr := pubsub.DefaultGossipSubRouter(an.h)

var addrInfos []peer.AddrInfo
for _, relay := range an.relays {
// Parse the multiaddr
ma, err := multiaddr.NewMultiaddr(relay)
if err != nil {
fmt.Println("Error parsing multiaddr:", err)
continue
}

// Extract the peer ID
addrInfo, err := peer.AddrInfoFromP2pAddr(ma)
if err != nil {
fmt.Println("Error extracting peer ID:", err)
continue
}
if addrInfo != nil {
addrInfos = append(addrInfos, *addrInfo)

Check failure on line 90 in announcements/announcements.go

View workflow job for this annotation

GitHub Actions / All

this result of append is never used, except maybe in other appends (SA4010)

Check failure on line 90 in announcements/announcements.go

View workflow job for this annotation

GitHub Actions / All

this result of append is never used, except maybe in other appends (SA4010)
}
}

gsub, err := pubsub.NewGossipSubWithRouter(ctx, an.h, gr,
pubsub.WithPeerExchange(true),
pubsub.WithFloodPublish(true),
pubsub.WithMessageSigning(true),
pubsub.WithDefaultValidator(validator),
)

if err != nil {
log.Errorw("Error happened while creating pubsub", "peer", an.h.ID())
return err
}

log.Debugw("Created topic", "on peer", an.h.ID(), "topic", an.topicName)
an.topic, err = gsub.Join(an.topicName)
if err != nil {
log.Errorw("Error happened while joining the topic", "peer", an.h.ID())
return err
}
an.sub, err = an.topic.Subscribe()
if err != nil {
log.Errorw("Error happened while subscribing the topic", "peer", an.h.ID())
return err
}
return nil
}

func (an *FxAnnouncements) processAnnouncement(ctx context.Context, from peer.ID, atype AnnouncementType, addrs []multiaddr.Multiaddr, topicString string) error {
log.Info("processing announcement")
log.Infow("processing announcement", "on", an.h.ID(), "from", from)
switch atype {
case IExistAnnouncementType:
log.Info("IExist request")
log.Info("IExist request", "on", an.h.ID(), "from", from)
an.h.Peerstore().AddAddrs(from, addrs, peerstore.ConnectedAddrTTL)
case PoolJoinRequestAnnouncementType:
log.Info("PoolJoin request")
log.Info("PoolJoin request", "on", an.h.ID(), "from", from)
if err := an.PoolJoinRequestHandler.HandlePoolJoinRequest(ctx, from, topicString, true); err != nil {
log.Errorw("An error occurred in handling pool join request announcement", err)
log.Errorw("An error occurred in handling pool join request announcement", "on", an.h.ID(), "from", from, err)
return err
}
default:
log.Info("Unknown request")
log.Info("Unknown request", "on", an.h.ID(), "from", from)
}
return nil
}
Expand Down Expand Up @@ -203,13 +230,15 @@ func (an *FxAnnouncements) AnnounceIExistPeriodically(ctx context.Context) {
}

func (an *FxAnnouncements) AnnounceJoinPoolRequestPeriodically(ctx context.Context) {
log.Debug("called wg.Done in AnnounceJoinPoolRequestPeriodically")
log.Debug("Starting AnnounceJoinPoolRequestPeriodically")
log.Debugw("called wg.Done in AnnounceJoinPoolRequestPeriodically pool join request", "peer", an.h.ID())
log.Debugw("Starting AnnounceJoinPoolRequestPeriodically pool join request", "peer", an.h.ID())
log.Debugw("peerlist before AnnounceJoinPoolRequestPeriodically pool join request", "on", an.h.ID(), "peerlist", an.topic.ListPeers())

defer an.wg.Done()
an.announcingJoinPoolMutex.Lock()
if an.announcingJoinPoolRequest {
an.announcingJoinPoolMutex.Unlock()
log.Info("Join pool request announcements are already in progress.")
log.Info("pool join request announcements are already in progress.", "peer", an.h.ID())
return
}
an.announcingJoinPoolRequest = true
Expand All @@ -221,12 +250,13 @@ func (an *FxAnnouncements) AnnounceJoinPoolRequestPeriodically(ctx context.Conte
}()
ticker := time.NewTicker(an.announceInterval)
for {
log.Debugw("inside ticker for join pool request", "peer", an.h.ID())
select {
case <-ctx.Done():
log.Info("stopped making periodic announcements")
log.Info("stopped making periodic pool join request announcements", "peer", an.h.ID())
return
case <-an.stopJoinPoolRequestChan: // Assume an.stopChan is a `chan struct{}` used to signal stopping the ticker.
log.Info("stopped making periodic joinpoolrequest announcements due to stop signal")
log.Info("stopped making periodic pool join request announcements due to stop signal", "peer", an.h.ID())
return
case t := <-ticker.C:
a := &Announcement{
Expand All @@ -236,19 +266,20 @@ func (an *FxAnnouncements) AnnounceJoinPoolRequestPeriodically(ctx context.Conte
a.SetAddrs(an.h.Addrs()...)
b, err := a.MarshalBinary()
if err != nil {
log.Errorw("failed to encode pool join request announcement", "err", err)
log.Errorw("failed to encode pool join request announcement", "peer", an.h.ID(), "err", err)
continue
}
log.Debugw("inside ticker for join pool request now publishing", "peer", an.h.ID())
if err := an.topic.Publish(ctx, b); err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
log.Info("stopped making periodic announcements")
log.Info("stopped making periodic pool join request announcements", "peer", an.h.ID(), "err", err)
return
}
if errors.Is(err, pubsub.ErrTopicClosed) || errors.Is(err, pubsub.ErrSubscriptionCancelled) {
log.Info("stopped making periodic iexist announcements as topic is closed or subscription cancelled")
log.Info("stopped making periodic pool join request announcements as topic is closed or subscription cancelled", "peer", an.h.ID(), "err", err)
return
}
log.Errorw("failed to publish pool join request announcement", "err", err)
log.Errorw("failed to publish pool join request announcement", "peer", an.h.ID(), "err", err)
continue
}
log.Debugw("announced pool join request message", "from", an.h.ID(), "announcement", a, "time", t)
Expand All @@ -257,36 +288,36 @@ func (an *FxAnnouncements) AnnounceJoinPoolRequestPeriodically(ctx context.Conte
}

func (an *FxAnnouncements) ValidateAnnouncement(ctx context.Context, id peer.ID, msg *pubsub.Message, status common.MemberStatus, exists bool) bool {
log.Debug("ValidateAnnouncement")
log.Debugw("ValidateAnnouncement", "on peer", an.h.ID(), "from peerID", id)
a := &Announcement{}
if err := a.UnmarshalBinary(msg.Data); err != nil {
log.Errorw("failed to unmarshal announcement data", "err", err)
return false
}
log.Debugw("ValidateAnnouncement", "peerID", id, "type", a.Type)
log.Debugw("ValidateAnnouncement", "on peer", an.h.ID(), "from peerID", id, "type", a.Type)

switch a.Type {
case NewManifestAnnouncementType:
// Check if sender is approved
if !exists {
log.Errorw("peer is not recognized", "peer", id)
log.Debugw("peer is not recognized", "on peer", an.h.ID(), "from peer", id)
return false
}
if status != common.Approved {
log.Errorw("peer is not an approved member", "peer", id)
log.Debugw("peer is not an approved member", "on peer", an.h.ID(), "from peer", id)
return false
}
case PoolJoinRequestAnnouncementType:
if status != common.Unknown {
log.Errorw("peer is no longer permitted to send this message type", "peer", id)
if status == common.Unknown {
log.Debugw("peer is no longer permitted to send this message type", "on peer", an.h.ID(), "from peer", id, "status", status)
return false
} else {
log.Debugw("PoolJoinRequestAnnouncementType status is Unknown and ok")
log.Debugw("PoolJoinRequestAnnouncementType status is not Unknown and ok")
}
case PoolJoinApproveAnnouncementType, IExistAnnouncementType:
// Any member status is valid for a pool join announcement
default:
log.Errorw("The Type is not set ", a.Type)
log.Debugw("The Type is not set ", a.Type)
return false
}

Expand All @@ -304,6 +335,7 @@ func (an *FxAnnouncements) StopJoinPoolRequestAnnouncements() {
}

func (an *FxAnnouncements) Shutdown(ctx context.Context) error {
log.Debugw("closed topic", "peer", an.h.ID())
an.sub.Cancel()
tErr := an.topic.Close()
return tErr
Expand Down
8 changes: 8 additions & 0 deletions announcements/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type (
timeout int
topicName string
wg *sync.WaitGroup
relays []string
}
)

Expand Down Expand Up @@ -52,3 +53,10 @@ func WithWg(wg *sync.WaitGroup) Option {
return nil
}
}

func WithRelays(r []string) Option {
return func(o *options) error {
o.relays = r
return nil
}
}
52 changes: 40 additions & 12 deletions blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,7 @@ func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicStri
//If members list is empty we should check what peerIDs we already voted on and update to avoid re-voting
isMembersEmpty := bl.IsMembersEmpty()
if isMembersEmpty {
log.Debugw("Members list is empty", "peer", bl.h.ID())
// Call the bl.PoolRequests and get the list of requests
req := PoolRequestsRequest{
PoolID: topic, // assuming 'topic' is your pool id
Expand All @@ -656,6 +657,7 @@ func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicStri

// For each one check if voted field in the response, contains the bl.h.ID().String() and if so it means we already voted
// Move it to members with status UnKnown.
log.Debugw("Empty members for ", "peer", bl.h.ID(), "Received response from blockchian", poolRequestsResponse.PoolRequests)
for _, request := range poolRequestsResponse.PoolRequests {
if contains(request.Voted, bl.h.ID().String()) {
pid, err := peer.Decode(request.PeerID)
Expand All @@ -671,7 +673,7 @@ func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicStri

// Loop through the static relays and convert them to multiaddr
for _, relay := range bl.relays {
ma, err := multiaddr.NewMultiaddr(relay)
ma, err := multiaddr.NewMultiaddr(relay + "/p2p-circuit/p2p/" + pid.String())
if err != nil {
bl.membersLock.Unlock()
return err
Expand All @@ -685,10 +687,10 @@ func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicStri
}
}
}
log.Debugw("stored members are", bl.members)
log.Debugw("stored members after empty member list", "peer", bl.h.ID(), "members", bl.members)
}

//Get hte list of both join requests and joined members for the pool
//Get the list of both join requests and joined members for the pool
// Create a struct for the POST req
req := PoolUserListRequest{
PoolID: topic,
Expand All @@ -707,7 +709,7 @@ func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicStri
}

// Now iterate through the users and populate the member map
log.Debugw("Now iterate through the users and populate the member map", "response", response.Users)
log.Debugw("Now iterate through the users and populate the member map", "peer", bl.h.ID(), "response", response.Users)
bl.membersLock.Lock()
for _, user := range response.Users {
pid, err := peer.Decode(user.PeerID)
Expand All @@ -722,11 +724,14 @@ func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicStri
log.Debugw("Found self peerID", user.PeerID)
if user.RequestPoolID != nil {
if !bl.p.Status() {
log.Debugw("Found self peerID and running Ping Server now", "peer", user.PeerID)
err = bl.p.Start(ctx)
if err != nil {
log.Errorw("Error when starting hte Ping Server", "PeerID", user.PeerID, "err", err)
log.Errorw("Error when starting the Ping Server", "PeerID", user.PeerID, "err", err)
} else {
bl.a.AnnounceJoinPoolRequestPeriodically(ctx)
log.Debugw("Found self peerID and ran Ping Server and announcing pooljoinrequest now", "peer", user.PeerID)
bl.wg.Add(1)
go bl.a.AnnounceJoinPoolRequestPeriodically(ctx)
}
} else {
log.Debugw("Ping Server is already running for self peerID", user.PeerID)
Expand All @@ -751,30 +756,37 @@ func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicStri
}

if initiate {
//Vote for any peer that has not votd already
if exists && existingStatus != common.Unknown {
//Vote for any peer that has not voted already
if exists && existingStatus == common.Pending {
log.Debugw("Voting for peers", "pool", topicString, "from", bl.h.ID(), "for", pid)
err = bl.HandlePoolJoinRequest(ctx, pid, topicString, false)
if err == nil {
status = common.Unknown
} else {
log.Errorw("Error happened while voting", "pool", topicString, "from", bl.h.ID(), "for", pid)
}
}
}

if exists {
if existingStatus != common.Approved && status == common.Approved {
log.Debugw("peer already exists in members", "h.ID", bl.h.ID(), "pid", pid, "existingStatus", existingStatus, "status", status)
if existingStatus != status && (existingStatus != common.Approved) {
// If the user is already pending and now approved, update to ApprovedOrPending
bl.members[pid] = common.Approved
bl.members[pid] = status
} else {
// If the user status is the same as before, there's no need to update
log.Debugw("member exists but is not approved so no need to change status", "h.ID", bl.h.ID(), "pid", pid, "Status", status, "existingStatus", existingStatus)
}
// If the user status is the same as before, there's no need to update
} else {
log.Debugw("member does not exists", "h.ID", bl.h.ID(), "pid", pid)
// If the user does not exist in the map, add them
bl.members[pid] = status
// Create a slice to hold the multiaddresses for the peer
var addrs []multiaddr.Multiaddr

// Loop through the static relays and convert them to multiaddr
for _, relay := range bl.relays {
ma, err := multiaddr.NewMultiaddr(relay)
ma, err := multiaddr.NewMultiaddr(relay + "/p2p-circuit/p2p/" + pid.String())
if err != nil {
bl.membersLock.Unlock()
return err
Expand All @@ -783,10 +795,26 @@ func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicStri
}

// Add the relay addresses to the peerstore for the peer ID

bl.h.Peerstore().AddAddrs(pid, addrs, peerstore.ConnectedAddrTTL)
log.Debugw("Added peer to peerstore", "h.ID", bl.h.ID(), "pid", pid)
}
if pid != bl.h.ID() {
//bl.h.Connect(ctx, peer.AddrInfo{ID: pid, Addrs: addrs})
log.Debugw("Connecting to other peer", "from", bl.h.ID(), "to", pid)
conn, err := bl.h.Network().DialPeer(ctx, pid)
if err == nil {
maddr := conn.RemoteMultiaddr()
log.Debugw("Connected to peer", "from", bl.h.ID(), "to", pid, "maddr", maddr)
conn.Close()
} else {
log.Debugw("Not Connected to peer", "from", bl.h.ID(), "to", pid, "err", err)
}

}
}
bl.membersLock.Unlock()
log.Debugw("peerstore for ", "id", bl.h.ID(), "peers", bl.h.Peerstore().Peers())
if initiate {
bl.cleanUnwantedPeers(keepPeers)
}
Expand Down
3 changes: 3 additions & 0 deletions blockchain/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ func WithAllowTransientConnection(t bool) Option {

func WithBlockchainEndPoint(b string) Option {
return func(o *options) error {
if b == "" {
b = "127.0.0.1:4000"
}
o.blockchainEndPoint = b
return nil
}
Expand Down
7 changes: 4 additions & 3 deletions blox/blox.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ func New(o ...Option) (*Blox, error) {
announcements.WithAnnounceInterval(5),
announcements.WithTimeout(3),
announcements.WithTopicName(p.topicName),
announcements.WithWg(&p.wg))
announcements.WithWg(&p.wg),
announcements.WithRelays(p.relays),
)
if err != nil {
return nil, err
}
Expand All @@ -80,8 +82,7 @@ func New(o ...Option) (*Blox, error) {
blockchain.NewSimpleKeyStorer(""),
blockchain.WithAuthorizer(authorizer),
blockchain.WithAuthorizedPeers(authorizedPeers),
//blockchain.WithBlockchainEndPoint("127.0.0.1:4000"),
blockchain.WithBlockchainEndPoint("api.node3.functionyard.fula.network"),
blockchain.WithBlockchainEndPoint(p.blockchainEndpoint),
blockchain.WithTimeout(30),
blockchain.WithWg(&p.wg),
blockchain.WithFetchFrequency(3),
Expand Down
Loading

0 comments on commit 5510a19

Please sign in to comment.