Skip to content

Commit

Permalink
Corrected shutdown error
Browse files Browse the repository at this point in the history
  • Loading branch information
ehsan6sha committed Nov 9, 2023
1 parent 049577d commit c03e3f7
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 28 deletions.
2 changes: 1 addition & 1 deletion announcements/announcements.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (an *FxAnnouncements) processAnnouncement(ctx context.Context, from peer.ID
log.Debug("IExist request")
case PoolJoinRequestAnnouncementType:
log.Debug("PoolJoin request")
if err := an.PoolJoinRequestHandler.HandlePoolJoinRequest(ctx, from, strconv.Itoa(int(atype))); err != nil {
if err := an.PoolJoinRequestHandler.HandlePoolJoinRequest(ctx, from, strconv.Itoa(int(atype)), true); err != nil {
log.Errorw("An error occurred in handling pool join request announcement", err)
return err
}
Expand Down
2 changes: 1 addition & 1 deletion announcements/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type Announcements interface {

// PoolJoinRequestHandler is the interface that will be called by the blockchain package.
type PoolJoinRequestHandler interface {
HandlePoolJoinRequest(context.Context, peer.ID, string) error
HandlePoolJoinRequest(context.Context, peer.ID, string, bool) error
}

var (
Expand Down
10 changes: 6 additions & 4 deletions blockchain/bl_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,10 +343,12 @@ func (bl *FxBlockchain) PoolLeave(ctx context.Context, to peer.ID, r PoolLeaveRe
}
}

func (bl *FxBlockchain) HandlePoolJoinRequest(ctx context.Context, from peer.ID, topicString string) error {
err := bl.FetchUsersAndPopulateSets(ctx, topicString)
if err != nil {
return err
func (bl *FxBlockchain) HandlePoolJoinRequest(ctx context.Context, from peer.ID, topicString string, withMemberListUpdate bool) error {
if withMemberListUpdate {
err := bl.FetchUsersAndPopulateSets(ctx, topicString)
if err != nil {
return err
}
}
status, exists := bl.GetMemberStatus(from)
if !exists {
Expand Down
82 changes: 60 additions & 22 deletions blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,53 @@ func (bl *FxBlockchain) Start(ctx context.Context) error {
return err
}
bl.s.Handler = http.HandlerFunc(bl.serve)
go func() { bl.s.Serve(listen) }()
bl.wg.Add(1)
go func() {
defer bl.wg.Done()
bl.s.Serve(listen)
}()
/*
//This should be done in start not here
//If members list is empty we should check what peerIDs we already voted on and update to avoid re-voting
isMembersEmpty := bl.IsMembersEmpty()
if isMembersEmpty {
//Call the bl.PoolRequests and get the list of requests
//For each one check if voted field in the response, contains the bl.h.ID().String() and if so it means we already voted
//Move it to members with status UnKnown.
}
*/
/*
//This should be done in Start not here
//Check if self status is in pool request, start ping server and announce join request
if user.PeerID == bl.h.ID().String() {
log.Debugw("Found self peerID", user.PeerID)
if user.RequestPoolID != nil {
if !bl.p.Status() {
err = bl.p.Start(ctx)
if err != nil {
log.Errorw("Error when starting hte Ping Server", "PeerID", user.PeerID, "err", err)
} else {
bl.a.AnnounceJoinPoolRequestPeriodically(ctx)
}
} else {
log.Debugw("Ping Server is already running for self peerID", user.PeerID)
}
} else {
log.Debugw("PeerID is already a member of the pool", user.PeerID)
}
}
*/
/*
//This should be done in Start in auto-runs not here
//Vote for any peer that has not votd already
if (exists && existingStatus != common.Unknown) || (!exists) {
err = bl.HandlePoolJoinRequest(ctx, pid, topicString, false)
if err == nil {
status = common.Unknown
}
}
*/
return nil
}

Expand Down Expand Up @@ -557,10 +603,15 @@ func (bl *FxBlockchain) Shutdown(ctx context.Context) error {
bl.c.CloseIdleConnections()
bl.ch.CloseIdleConnections()
close(bl.fetchCheckStop)
bl.wg.Wait()
return bl.s.Shutdown(ctx)
}

func (bl *FxBlockchain) IsMembersEmpty() bool {
bl.membersLock.RLock() // Use RLock for read-only access
defer bl.membersLock.RUnlock() // Defer the unlock operation
return len(bl.members) == 0
}

func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicString string) error {
// Update last fetch time on successful fetch
bl.lastFetchTime = time.Now()
Expand All @@ -571,7 +622,12 @@ func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicStri
// Handle the error if the conversion fails
return fmt.Errorf("invalid topic, not an integer: %s", err)
}
if topic <= 0 {
log.Info("Not a member of any pool at the moment")
return nil
}

//Get hte list of both join requests and joined members for the pool
// Create a struct for the POST payload
payload := PoolUserListRequest{
PoolID: topic,
Expand All @@ -597,25 +653,8 @@ func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicStri
return err
}

//Check if self status is in pool request, start ping server and announce join request
if user.PeerID == bl.h.ID().String() {
log.Debugw("Found self peerID", user.PeerID)
if user.RequestPoolID != nil {
if !bl.p.Status() {
err = bl.p.Start(ctx)
if err != nil {
log.Errorw("Error when starting hte Ping Server", "PeerID", user.PeerID, "err", err)
} else {
bl.a.AnnounceJoinPoolRequestPeriodically(ctx)
}
} else {
log.Debugw("Ping Server is already running for self peerID", user.PeerID)
}
} else {
log.Debugw("PeerID is already a member of the pool", user.PeerID)
}
}
// Determine the status based on pool_id and request_pool_id
existingStatus, exists := bl.members[pid]
var status common.MemberStatus
if user.PoolID != nil && *user.PoolID == topic {
status = common.Approved
Expand All @@ -626,9 +665,8 @@ func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicStri
continue
}

existingStatus, exists := bl.members[pid]
if exists {
if existingStatus == common.Pending && status == common.Approved {
if existingStatus != common.Approved && status == common.Approved {
// If the user is already pending and now approved, update to ApprovedOrPending
bl.members[pid] = common.Approved
}
Expand Down
10 changes: 10 additions & 0 deletions blox/blox.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,18 @@ func (p *Blox) SetAuth(ctx context.Context, on peer.ID, subject peer.ID, allow b
}

func (p *Blox) Shutdown(ctx context.Context) error {
log.Info("Shutdown in progress")
bErr := p.bl.Shutdown(ctx)
log.Info("blockchain Shutdown in progress")
xErr := p.ex.Shutdown(ctx)
log.Info("exchange Shutdown in progress")
pErr := p.pn.Shutdown(ctx)
log.Info("ping Shutdown in progress")
tErr := p.an.Shutdown(ctx)
log.Info("announcement Shutdown in progress")
p.cancel()
dsErr := p.ds.Close()
log.Info("datastore Shutdown in progress")
done := make(chan struct{}, 1)
go func() {
defer close(done)
Expand All @@ -156,15 +162,19 @@ func (p *Blox) Shutdown(ctx context.Context) error {
select {
case <-done:
if tErr != nil {
log.Errorw("Error occurred in announcement shutdown", "tErr", tErr)
return tErr
}
if dsErr != nil {
log.Errorw("Error occurred in ds shutdown", "dsErr", dsErr)
return dsErr
}
if bErr != nil {
log.Errorw("Error occurred in blockchain shutdown", "bErr", bErr)
return bErr
}
if pErr != nil {
log.Errorw("Error occurred in Ping shutdown", "pErr", pErr)
return pErr
}
return xErr
Expand Down

0 comments on commit c03e3f7

Please sign in to comment.