Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
enhancewd start of FetchUsersAndPopulateSets
Browse files Browse the repository at this point in the history
1- If the members list is empty also call pool/requests endpoint to see who voted and it self voted move status to UnKnown
2- used bl_pool methods instead of directly calling blockchain
3- corrected some types
4- added methods to FetchUsersAndPopulateSets for start
ehsan6sha committed Nov 9, 2023
1 parent c03e3f7 commit a6d61ae
Showing 3 changed files with 83 additions and 48 deletions.
4 changes: 2 additions & 2 deletions blockchain/bl_pool.go
Original file line number Diff line number Diff line change
@@ -345,7 +345,7 @@ func (bl *FxBlockchain) PoolLeave(ctx context.Context, to peer.ID, r PoolLeaveRe

func (bl *FxBlockchain) HandlePoolJoinRequest(ctx context.Context, from peer.ID, topicString string, withMemberListUpdate bool) error {
if withMemberListUpdate {
err := bl.FetchUsersAndPopulateSets(ctx, topicString)
err := bl.FetchUsersAndPopulateSets(ctx, topicString, false)
if err != nil {
return err
}
@@ -380,7 +380,7 @@ func (bl *FxBlockchain) HandlePoolJoinRequest(ctx context.Context, from peer.ID,
}

// Call PoolVote method
responseBytes, err := bl.PoolVote(ctx, from, voteRequest)
responseBytes, err := bl.PoolVote(ctx, bl.h.ID(), voteRequest)
if err != nil {
return fmt.Errorf("failed to cast vote: %w", err)
}
125 changes: 80 additions & 45 deletions blockchain/blockchain.go
Original file line number Diff line number Diff line change
@@ -145,7 +145,7 @@ func (bl *FxBlockchain) startFetchCheck() {
select {
case <-bl.fetchCheckTicker.C:
if time.Since(bl.lastFetchTime) >= bl.fetchInterval {
bl.FetchUsersAndPopulateSets(context.Background(), "someTopicString")
bl.FetchUsersAndPopulateSets(context.Background(), bl.topicName, false)
bl.lastFetchTime = time.Now() // update last fetch time
}
case <-bl.fetchCheckStop:
@@ -167,48 +167,6 @@ func (bl *FxBlockchain) Start(ctx context.Context) error {
defer bl.wg.Done()
bl.s.Serve(listen)
}()
/*
//This should be done in start not here
//If members list is empty we should check what peerIDs we already voted on and update to avoid re-voting
isMembersEmpty := bl.IsMembersEmpty()
if isMembersEmpty {
//Call the bl.PoolRequests and get the list of requests
//For each one check if voted field in the response, contains the bl.h.ID().String() and if so it means we already voted
//Move it to members with status UnKnown.
}
*/
/*
//This should be done in Start not here
//Check if self status is in pool request, start ping server and announce join request
if user.PeerID == bl.h.ID().String() {
log.Debugw("Found self peerID", user.PeerID)
if user.RequestPoolID != nil {
if !bl.p.Status() {
err = bl.p.Start(ctx)
if err != nil {
log.Errorw("Error when starting hte Ping Server", "PeerID", user.PeerID, "err", err)
} else {
bl.a.AnnounceJoinPoolRequestPeriodically(ctx)
}
} else {
log.Debugw("Ping Server is already running for self peerID", user.PeerID)
}
} else {
log.Debugw("PeerID is already a member of the pool", user.PeerID)
}
}
*/
/*
//This should be done in Start in auto-runs not here
//Vote for any peer that has not votd already
if (exists && existingStatus != common.Unknown) || (!exists) {
err = bl.HandlePoolJoinRequest(ctx, pid, topicString, false)
if err == nil {
status = common.Unknown
}
}
*/
return nil
}

@@ -612,7 +570,17 @@ func (bl *FxBlockchain) IsMembersEmpty() bool {
return len(bl.members) == 0
}

func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicString string) error {
// contains is a helper function to check if the slice contains a string
func contains(slice []string, str string) bool {
for _, v := range slice {
if v == str {
return true
}
}
return false
}

func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicString string, initiate bool) error {
// Update last fetch time on successful fetch
bl.lastFetchTime = time.Now()

@@ -627,14 +595,49 @@ func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicStri
return nil
}

if initiate {
//If members list is empty we should check what peerIDs we already voted on and update to avoid re-voting
isMembersEmpty := bl.IsMembersEmpty()
if isMembersEmpty {
// Call the bl.PoolRequests and get the list of requests
req := PoolRequestsRequest{
PoolID: topic, // assuming 'topic' is your pool id
}
responseBody, err := bl.PoolRequests(ctx, bl.h.ID(), req)
if err != nil {
return err
}
var poolRequestsResponse PoolRequestsResponse

// Unmarshal the response body into the poolRequestsResponse struct
if err := json.Unmarshal(responseBody, &poolRequestsResponse); err != nil {
return err
}

// For each one check if voted field in the response, contains the bl.h.ID().String() and if so it means we already voted
// Move it to members with status UnKnown.
for _, request := range poolRequestsResponse.PoolRequests {
if contains(request.Voted, bl.h.ID().String()) {
pid, err := peer.Decode(request.PeerID)
if err != nil {
return err
}
bl.membersLock.Lock()
bl.members[pid] = common.Unknown
bl.membersLock.Unlock()
}
}
}
}

//Get hte list of both join requests and joined members for the pool
// Create a struct for the POST payload
payload := PoolUserListRequest{
PoolID: topic,
}

// Call the existing function to make the request
responseBody, err := bl.callBlockchain(ctx, "POST", actionPoolUserList, payload)
responseBody, err := bl.PoolUserList(ctx, bl.h.ID(), payload) // 'to' should be the peer ID you want to send requests to
if err != nil {
return err
}
@@ -653,8 +656,30 @@ func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicStri
return err
}

if initiate {
//Check if self status is in pool request, start ping server and announce join request
if user.PeerID == bl.h.ID().String() {
log.Debugw("Found self peerID", user.PeerID)
if user.RequestPoolID != nil {
if !bl.p.Status() {
err = bl.p.Start(ctx)
if err != nil {
log.Errorw("Error when starting hte Ping Server", "PeerID", user.PeerID, "err", err)
} else {
bl.a.AnnounceJoinPoolRequestPeriodically(ctx)
}
} else {
log.Debugw("Ping Server is already running for self peerID", user.PeerID)
}
} else {
log.Debugw("PeerID is already a member of the pool", user.PeerID)
}
}
}

// Determine the status based on pool_id and request_pool_id
existingStatus, exists := bl.members[pid]

var status common.MemberStatus
if user.PoolID != nil && *user.PoolID == topic {
status = common.Approved
@@ -665,6 +690,16 @@ func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicStri
continue
}

if initiate {
//Vote for any peer that has not votd already
if (exists && existingStatus != common.Unknown) || (!exists) {
err = bl.HandlePoolJoinRequest(ctx, pid, topicString, false)
if err == nil {
status = common.Unknown
}
}
}

if exists {
if existingStatus != common.Approved && status == common.Approved {
// If the user is already pending and now approved, update to ApprovedOrPending
2 changes: 1 addition & 1 deletion blox/blox.go
Original file line number Diff line number Diff line change
@@ -108,7 +108,7 @@ func (p *Blox) Start(ctx context.Context) error {
if err := p.bl.Start(ctx); err != nil {
return err
}
p.bl.FetchUsersAndPopulateSets(ctx, p.topicName)
p.bl.FetchUsersAndPopulateSets(ctx, p.topicName, true)
go func() {
log.Infow("IPFS RPC server started on address http://localhost:5001")
switch err := http.ListenAndServe("localhost:5001", p.ServeIpfsRpc()); {

0 comments on commit a6d61ae

Please sign in to comment.