From c03e3f7925b6544cdaeac4c8e261cd413f126079 Mon Sep 17 00:00:00 2001 From: ehsan shariati Date: Wed, 8 Nov 2023 20:16:24 -0500 Subject: [PATCH] Corrected shutdown error --- announcements/announcements.go | 2 +- announcements/interface.go | 2 +- blockchain/bl_pool.go | 10 +++-- blockchain/blockchain.go | 82 +++++++++++++++++++++++++--------- blox/blox.go | 10 +++++ 5 files changed, 78 insertions(+), 28 deletions(-) diff --git a/announcements/announcements.go b/announcements/announcements.go index 619dd63..ad198ab 100644 --- a/announcements/announcements.go +++ b/announcements/announcements.go @@ -92,7 +92,7 @@ func (an *FxAnnouncements) processAnnouncement(ctx context.Context, from peer.ID log.Debug("IExist request") case PoolJoinRequestAnnouncementType: log.Debug("PoolJoin request") - if err := an.PoolJoinRequestHandler.HandlePoolJoinRequest(ctx, from, strconv.Itoa(int(atype))); err != nil { + if err := an.PoolJoinRequestHandler.HandlePoolJoinRequest(ctx, from, strconv.Itoa(int(atype)), true); err != nil { log.Errorw("An error occurred in handling pool join request announcement", err) return err } diff --git a/announcements/interface.go b/announcements/interface.go index decbd25..bb4a4f4 100644 --- a/announcements/interface.go +++ b/announcements/interface.go @@ -25,7 +25,7 @@ type Announcements interface { // PoolJoinRequestHandler is the interface that will be called by the blockchain package. type PoolJoinRequestHandler interface { - HandlePoolJoinRequest(context.Context, peer.ID, string) error + HandlePoolJoinRequest(context.Context, peer.ID, string, bool) error } var ( diff --git a/blockchain/bl_pool.go b/blockchain/bl_pool.go index 1f9cac6..8349296 100644 --- a/blockchain/bl_pool.go +++ b/blockchain/bl_pool.go @@ -343,10 +343,12 @@ func (bl *FxBlockchain) PoolLeave(ctx context.Context, to peer.ID, r PoolLeaveRe } } -func (bl *FxBlockchain) HandlePoolJoinRequest(ctx context.Context, from peer.ID, topicString string) error { - err := bl.FetchUsersAndPopulateSets(ctx, topicString) - if err != nil { - return err +func (bl *FxBlockchain) HandlePoolJoinRequest(ctx context.Context, from peer.ID, topicString string, withMemberListUpdate bool) error { + if withMemberListUpdate { + err := bl.FetchUsersAndPopulateSets(ctx, topicString) + if err != nil { + return err + } } status, exists := bl.GetMemberStatus(from) if !exists { diff --git a/blockchain/blockchain.go b/blockchain/blockchain.go index ecfecaa..ec43a96 100644 --- a/blockchain/blockchain.go +++ b/blockchain/blockchain.go @@ -162,7 +162,53 @@ func (bl *FxBlockchain) Start(ctx context.Context) error { return err } bl.s.Handler = http.HandlerFunc(bl.serve) - go func() { bl.s.Serve(listen) }() + bl.wg.Add(1) + go func() { + defer bl.wg.Done() + bl.s.Serve(listen) + }() + /* + //This should be done in start not here + //If members list is empty we should check what peerIDs we already voted on and update to avoid re-voting + isMembersEmpty := bl.IsMembersEmpty() + if isMembersEmpty { + //Call the bl.PoolRequests and get the list of requests + + //For each one check if voted field in the response, contains the bl.h.ID().String() and if so it means we already voted + //Move it to members with status UnKnown. + } + */ + /* + //This should be done in Start not here + //Check if self status is in pool request, start ping server and announce join request + if user.PeerID == bl.h.ID().String() { + log.Debugw("Found self peerID", user.PeerID) + if user.RequestPoolID != nil { + if !bl.p.Status() { + err = bl.p.Start(ctx) + if err != nil { + log.Errorw("Error when starting hte Ping Server", "PeerID", user.PeerID, "err", err) + } else { + bl.a.AnnounceJoinPoolRequestPeriodically(ctx) + } + } else { + log.Debugw("Ping Server is already running for self peerID", user.PeerID) + } + } else { + log.Debugw("PeerID is already a member of the pool", user.PeerID) + } + } + */ + /* + //This should be done in Start in auto-runs not here + //Vote for any peer that has not votd already + if (exists && existingStatus != common.Unknown) || (!exists) { + err = bl.HandlePoolJoinRequest(ctx, pid, topicString, false) + if err == nil { + status = common.Unknown + } + } + */ return nil } @@ -557,10 +603,15 @@ func (bl *FxBlockchain) Shutdown(ctx context.Context) error { bl.c.CloseIdleConnections() bl.ch.CloseIdleConnections() close(bl.fetchCheckStop) - bl.wg.Wait() return bl.s.Shutdown(ctx) } +func (bl *FxBlockchain) IsMembersEmpty() bool { + bl.membersLock.RLock() // Use RLock for read-only access + defer bl.membersLock.RUnlock() // Defer the unlock operation + return len(bl.members) == 0 +} + func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicString string) error { // Update last fetch time on successful fetch bl.lastFetchTime = time.Now() @@ -571,7 +622,12 @@ func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicStri // Handle the error if the conversion fails return fmt.Errorf("invalid topic, not an integer: %s", err) } + if topic <= 0 { + log.Info("Not a member of any pool at the moment") + return nil + } + //Get hte list of both join requests and joined members for the pool // Create a struct for the POST payload payload := PoolUserListRequest{ PoolID: topic, @@ -597,25 +653,8 @@ func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicStri return err } - //Check if self status is in pool request, start ping server and announce join request - if user.PeerID == bl.h.ID().String() { - log.Debugw("Found self peerID", user.PeerID) - if user.RequestPoolID != nil { - if !bl.p.Status() { - err = bl.p.Start(ctx) - if err != nil { - log.Errorw("Error when starting hte Ping Server", "PeerID", user.PeerID, "err", err) - } else { - bl.a.AnnounceJoinPoolRequestPeriodically(ctx) - } - } else { - log.Debugw("Ping Server is already running for self peerID", user.PeerID) - } - } else { - log.Debugw("PeerID is already a member of the pool", user.PeerID) - } - } // Determine the status based on pool_id and request_pool_id + existingStatus, exists := bl.members[pid] var status common.MemberStatus if user.PoolID != nil && *user.PoolID == topic { status = common.Approved @@ -626,9 +665,8 @@ func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicStri continue } - existingStatus, exists := bl.members[pid] if exists { - if existingStatus == common.Pending && status == common.Approved { + if existingStatus != common.Approved && status == common.Approved { // If the user is already pending and now approved, update to ApprovedOrPending bl.members[pid] = common.Approved } diff --git a/blox/blox.go b/blox/blox.go index c1b98c8..ce12e15 100644 --- a/blox/blox.go +++ b/blox/blox.go @@ -141,12 +141,18 @@ func (p *Blox) SetAuth(ctx context.Context, on peer.ID, subject peer.ID, allow b } func (p *Blox) Shutdown(ctx context.Context) error { + log.Info("Shutdown in progress") bErr := p.bl.Shutdown(ctx) + log.Info("blockchain Shutdown in progress") xErr := p.ex.Shutdown(ctx) + log.Info("exchange Shutdown in progress") pErr := p.pn.Shutdown(ctx) + log.Info("ping Shutdown in progress") tErr := p.an.Shutdown(ctx) + log.Info("announcement Shutdown in progress") p.cancel() dsErr := p.ds.Close() + log.Info("datastore Shutdown in progress") done := make(chan struct{}, 1) go func() { defer close(done) @@ -156,15 +162,19 @@ func (p *Blox) Shutdown(ctx context.Context) error { select { case <-done: if tErr != nil { + log.Errorw("Error occurred in announcement shutdown", "tErr", tErr) return tErr } if dsErr != nil { + log.Errorw("Error occurred in ds shutdown", "dsErr", dsErr) return dsErr } if bErr != nil { + log.Errorw("Error occurred in blockchain shutdown", "bErr", bErr) return bErr } if pErr != nil { + log.Errorw("Error occurred in Ping shutdown", "pErr", pErr) return pErr } return xErr