diff --git a/blockchain/bl_pool.go b/blockchain/bl_pool.go index bdd83679..a532b0c6 100644 --- a/blockchain/bl_pool.go +++ b/blockchain/bl_pool.go @@ -53,6 +53,93 @@ func (bl *FxBlockchain) PoolCreate(ctx context.Context, to peer.ID, r PoolCreate } } +func (bl *FxBlockchain) HandlePoolJoin(method string, action string, from peer.ID, w http.ResponseWriter, r *http.Request) { + // This handles the join request sent from client for a blox which is not part of the pool yet + log := log.With("action", action, "from", from) + var req PoolJoinRequest + var res PoolJoinResponse + + defer r.Body.Close() + + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + log.Debug("cannot parse request body: %v", err) + http.Error(w, "", http.StatusBadRequest) + return + } + + //TODO: Ensure it is optimized for long-running calls + ctx, cancel := context.WithTimeout(r.Context(), time.Second*time.Duration(bl.timeout)) + defer cancel() + response, statusCode, err := bl.callBlockchain(ctx, method, action, &req) + if err != nil { + poolID := req.PoolID + poolIDStr := strconv.Itoa(poolID) + requestSubmitted, err := bl.checkIfUserHasOpenPoolRequests(ctx, poolIDStr) + if err == nil && requestSubmitted { + errUpdateConfig := bl.updatePoolName(poolIDStr) + errPingServer := bl.StartPingServer(ctx) + if errUpdateConfig != nil && errPingServer != nil { + errMsg := map[string]interface{}{ + "message": "Pool Join is submitted but Error in Starting Ping Server and updating Config", + "description": fmt.Sprintf("Error in Ping server: %s , Error in updateConfig: %s", errPingServer.Error(), errUpdateConfig.Error()), + } + w.WriteHeader(http.StatusExpectationFailed) + json.NewEncoder(w).Encode(errMsg) + return + } else if errUpdateConfig != nil { + errMsg := map[string]interface{}{ + "message": "Pool Join is submitted but Error in updating Config", + "description": fmt.Sprintf("Error in updateConfig: %s", errUpdateConfig.Error()), + } + w.WriteHeader(http.StatusExpectationFailed) + json.NewEncoder(w).Encode(errMsg) + return + } else if errPingServer != nil { + errMsg := map[string]interface{}{ + "message": "Pool Join is submitted but Error in Ping Server Start", + "description": fmt.Sprintf("Error in PingServer: %s", errPingServer.Error()), + } + w.WriteHeader(http.StatusExpectationFailed) + json.NewEncoder(w).Encode(errMsg) + return + } + statusCode = http.StatusAccepted + err = nil + response = []byte(fmt.Sprintf("{\"account\":\"\",\"pool_id\":%d}", req.PoolID)) + } + } + if statusCode == http.StatusOK { + statusCode = http.StatusAccepted + } + log.Debugw("callblockchain response in JoinPool", "statusCode", statusCode, "response", response, "err", err) + // If status code is not 200, attempt to format the response as JSON + if statusCode != http.StatusAccepted || err != nil { + w.WriteHeader(statusCode) + // Try to parse the error and format it as JSON + var errMsg map[string]interface{} + if jsonErr := json.Unmarshal(response, &errMsg); jsonErr != nil { + // If the response isn't JSON or can't be parsed, use a generic message + errMsg = map[string]interface{}{ + "message": "An error occurred", + "description": err.Error(), + } + } + json.NewEncoder(w).Encode(errMsg) + return + } + poolIDStr := strconv.Itoa(req.PoolID) + bl.processSuccessfulPoolJoinRequest(ctx, poolIDStr, bl.h.ID()) + w.WriteHeader(statusCode) + err1 := json.Unmarshal(response, &res) + if err1 != nil { + log.Error("failed to format response: %v", err1) + } + + if err := json.NewEncoder(w).Encode(res); err != nil { + log.Error("failed to write response: %v", err) + } +} + func (bl *FxBlockchain) PoolJoin(ctx context.Context, to peer.ID, r PoolJoinRequest) ([]byte, error) { if bl.allowTransientConnection { @@ -76,38 +163,8 @@ func (bl *FxBlockchain) PoolJoin(ctx context.Context, to peer.ID, r PoolJoinRequ b, err := io.ReadAll(resp.Body) switch { case err != nil: - poolID := r.PoolID - poolIDStr := strconv.Itoa(poolID) - requestSubmitted, err := bl.checkIfUserHasOpenPoolRequests(ctx, poolIDStr) - if err == nil && requestSubmitted { - err := bl.updatePoolName(poolIDStr) - if err != nil { - return []byte("{}"), err - } - err = bl.StartPingServer(ctx) - if err != nil { - return []byte("{}"), err - } - bl.processSuccessfulPoolJoinRequest(ctx, poolIDStr, to) - return []byte("{}"), nil - } return nil, err case resp.StatusCode != http.StatusAccepted: - poolID := r.PoolID - poolIDStr := strconv.Itoa(poolID) - requestSubmitted, err := bl.checkIfUserHasOpenPoolRequests(ctx, poolIDStr) - if err == nil && requestSubmitted { - err := bl.updatePoolName(poolIDStr) - if err != nil { - return []byte("{}"), err - } - err = bl.StartPingServer(ctx) - if err != nil { - return []byte("{}"), err - } - bl.processSuccessfulPoolJoinRequest(ctx, poolIDStr, to) - return []byte("{}"), nil - } // Attempt to parse the body as JSON. if jsonErr := json.Unmarshal(b, &apiError); jsonErr != nil { // If we can't parse the JSON, return the original body in the error. @@ -116,17 +173,6 @@ func (bl *FxBlockchain) PoolJoin(ctx context.Context, to peer.ID, r PoolJoinRequ // Return the parsed error message and description. return nil, fmt.Errorf("unexpected response: %d %s - %s", resp.StatusCode, apiError.Message, apiError.Description) default: - poolID := r.PoolID - poolIDStr := strconv.Itoa(poolID) - err := bl.updatePoolName(poolIDStr) - if err != nil { - return b, err - } - err = bl.StartPingServer(ctx) - if err != nil { - return b, err - } - bl.processSuccessfulPoolJoinRequest(ctx, poolIDStr, to) return b, nil } } @@ -138,7 +184,7 @@ func (bl *FxBlockchain) processSuccessfulPoolJoinRequest(ctx context.Context, po log.Errorw("Error fetching and populating users", "err", err) } bl.stopFetchUsersAfterJoinChan = make(chan struct{}) - ticker := time.NewTicker(bl.fetchInterval * time.Minute) + ticker := time.NewTicker(bl.fetchInterval) defer ticker.Stop() if bl.wg != nil { log.Debug("called wg.Add in PoolJoin ticker") @@ -174,20 +220,24 @@ func (bl *FxBlockchain) processSuccessfulPoolJoinRequest(ctx context.Context, po } } }() - if bl.a != nil { - if bl.wg != nil { - log.Debug("called wg.Add in PoolJoin ticker2") - bl.wg.Add(1) - } - go func() { + // TODO: THIS METHOD BELOW NEEDS TO RE_INITIALIZE ANNONCEMENTS WITH NEW TOPIC ND START IT FIRST + /* + if bl.a != nil { if bl.wg != nil { - log.Debug("Called wg.Done in PoolJoin ticker2") - defer bl.wg.Done() // Decrement the counter when the goroutine completes + log.Debug("called wg.Add in PoolJoin ticker2") + bl.wg.Add(1) } - defer log.Debug("PoolJoin ticker2 go routine is ending") - bl.a.AnnounceJoinPoolRequestPeriodically(ctx) - }() - } + go func() { + if bl.wg != nil { + log.Debug("Called wg.Done in PoolJoin ticker2") + defer bl.wg.Done() // Decrement the counter when the goroutine completes + } + defer log.Debug("PoolJoin ticker2 go routine is ending") + + bl.a.AnnounceJoinPoolRequestPeriodically(ctx) + }() + } + */ } func (bl *FxBlockchain) StartPingServer(ctx context.Context) error { @@ -229,10 +279,8 @@ func (bl *FxBlockchain) PoolCancelJoin(ctx context.Context, to peer.ID, r PoolCa b, err := io.ReadAll(resp.Body) switch { case err != nil: - bl.cleanLeaveJoinPool(ctx, r.PoolID) return nil, err case resp.StatusCode != http.StatusAccepted: - bl.cleanLeaveJoinPool(ctx, r.PoolID) // Attempt to parse the body as JSON. if jsonErr := json.Unmarshal(b, &apiError); jsonErr != nil { // If we can't parse the JSON, return the original body in the error. @@ -241,11 +289,96 @@ func (bl *FxBlockchain) PoolCancelJoin(ctx context.Context, to peer.ID, r PoolCa // Return the parsed error message and description. return nil, fmt.Errorf("unexpected response: %d %s - %s", resp.StatusCode, apiError.Message, apiError.Description) default: - bl.cleanLeaveJoinPool(ctx, r.PoolID) return b, nil } } +func (bl *FxBlockchain) HandlePoolCancelJoin(method string, action string, from peer.ID, w http.ResponseWriter, r *http.Request) { + // This handles the join request sent from client for a blox which is not part of the pool yet + log := log.With("action", action, "from", from) + var req PoolCancelJoinRequest + var res PoolCancelJoinResponse + + defer r.Body.Close() + + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + log.Debug("cannot parse request body: %v", err) + http.Error(w, "", http.StatusBadRequest) + return + } + + ctx, cancel := context.WithTimeout(r.Context(), time.Second*time.Duration(bl.timeout)) + defer cancel() + response, statusCode, err := bl.callBlockchain(ctx, method, action, &req) + if err != nil { + poolID := req.PoolID + poolIDStr := strconv.Itoa(poolID) + requestSubmitted, err := bl.checkIfUserHasOpenPoolRequests(ctx, poolIDStr) + if err == nil && !requestSubmitted { + errUpdateConfig := bl.updatePoolName("0") + errPingServer := bl.StopPingServer(ctx) + if errUpdateConfig != nil && errPingServer != nil { + errMsg := map[string]interface{}{ + "message": "Pool Cancel Join is submitted but Error in Stopping Ping Server and updating Config", + "description": fmt.Sprintf("Error in Ping server: %s , Error in updateConfig: %s", errPingServer.Error(), errUpdateConfig.Error()), + } + w.WriteHeader(http.StatusExpectationFailed) + json.NewEncoder(w).Encode(errMsg) + return + } else if errUpdateConfig != nil { + errMsg := map[string]interface{}{ + "message": "Pool Cancel Join is submitted but Error in updating Config", + "description": fmt.Sprintf("Error in updateConfig: %s", errUpdateConfig.Error()), + } + w.WriteHeader(http.StatusExpectationFailed) + json.NewEncoder(w).Encode(errMsg) + return + } else if errPingServer != nil { + errMsg := map[string]interface{}{ + "message": "Pool Cancel Join is submitted but Error in Ping Server Stop", + "description": fmt.Sprintf("Error in PingServer: %s", errPingServer.Error()), + } + w.WriteHeader(http.StatusExpectationFailed) + json.NewEncoder(w).Encode(errMsg) + return + } + statusCode = http.StatusAccepted + err = nil + response = []byte(fmt.Sprintf("{\"account\":\"\",\"pool_id\":%d}", req.PoolID)) + } + } + if statusCode == http.StatusOK { + statusCode = http.StatusAccepted + } + log.Debugw("callblockchain response in PoolCancelJoin", "statusCode", statusCode, "response", response, "err", err) + // If status code is not 200, attempt to format the response as JSON + if statusCode != http.StatusAccepted || err != nil { + w.WriteHeader(statusCode) + // Try to parse the error and format it as JSON + var errMsg map[string]interface{} + if jsonErr := json.Unmarshal(response, &errMsg); jsonErr != nil { + // If the response isn't JSON or can't be parsed, use a generic message + errMsg = map[string]interface{}{ + "message": "An error occurred", + "description": err.Error(), + } + } + json.NewEncoder(w).Encode(errMsg) + return + } + + bl.cleanLeaveJoinPool(ctx, req.PoolID) + w.WriteHeader(statusCode) + err1 := json.Unmarshal(response, &res) + if err1 != nil { + log.Error("failed to format response: %v", err1) + } + + if err := json.NewEncoder(w).Encode(res); err != nil { + log.Error("failed to write response: %v", err) + } +} + func (bl *FxBlockchain) cleanLeaveJoinPool(ctx context.Context, PoolID int) { bl.updatePoolName("0") bl.StopPingServer(ctx) @@ -447,6 +580,7 @@ func (bl *FxBlockchain) PoolLeave(ctx context.Context, to peer.ID, r PoolLeaveRe } func (bl *FxBlockchain) HandlePoolJoinRequest(ctx context.Context, from peer.ID, account string, topicString string, withMemberListUpdate bool) error { + // This handles the pending pool requests on a member that is already joined the pool if withMemberListUpdate { err := bl.FetchUsersAndPopulateSets(ctx, topicString, false) if err != nil { @@ -459,14 +593,23 @@ func (bl *FxBlockchain) HandlePoolJoinRequest(ctx context.Context, from peer.ID, } if status == common.Pending { // Ping + log.Debugw("****** Pinging pending node", "from", bl.h.ID(), "to", from) averageDuration, successCount, err := bl.p.Ping(ctx, from) if err != nil { log.Errorw("An error occurred during ping", "error", err) return err } + if bl.maxPingTime == 0 { + //TODO: This should not happen but is happening! + bl.maxPingTime = 200 + } + if bl.minPingSuccessCount == 0 { + //TODO: This should not happen but is happening! + bl.minPingSuccessCount = 3 + } vote := averageDuration <= bl.maxPingTime && successCount >= bl.minPingSuccessCount - log.Debugw("Ping result", "averageDuration", averageDuration, "successCount", successCount, "vote", vote) + log.Debugw("Ping result", "averageDuration", averageDuration, "successCount", successCount, "vote", vote, "bl.maxPingTime", bl.maxPingTime, "bl.minPingSuccessCount", bl.minPingSuccessCount) // Convert topic from string to int poolID, err := strconv.Atoi(topicString) @@ -481,13 +624,13 @@ func (bl *FxBlockchain) HandlePoolJoinRequest(ctx context.Context, from peer.ID, } // Call PoolVote method - responseBody, statusCode, err := bl.callBlockchain(ctx, "POST", actionPoolVote, voteRequest) + responseBody, statusCode, err := bl.callBlockchain(ctx, "POST", actionPoolVote, &voteRequest) if err != nil { return fmt.Errorf("blockchain call error: %w, status code: %d", err, statusCode) } // Check if the status code is OK; if not, handle it as an error - if statusCode != http.StatusOK { + if statusCode != http.StatusOK && statusCode != http.StatusAccepted { var errMsg map[string]interface{} if jsonErr := json.Unmarshal(responseBody, &errMsg); jsonErr == nil { return fmt.Errorf("unexpected response status: %d, message: %s, description: %s", @@ -504,7 +647,7 @@ func (bl *FxBlockchain) HandlePoolJoinRequest(ctx context.Context, from peer.ID, } // Handle the response as needed - log.Infow("Vote cast successfully", "response", voteResponse, "on", from, "by", bl.h.ID()) + log.Infow("Vote cast successfully", "statusCode", statusCode, "voteResponse", voteResponse, "on", from, "by", bl.h.ID()) // Update member status to unknown bl.membersLock.Lock() bl.members[from] = common.Unknown diff --git a/blockchain/blockchain.go b/blockchain/blockchain.go index 3d9eb2c1..bfa7de75 100644 --- a/blockchain/blockchain.go +++ b/blockchain/blockchain.go @@ -126,7 +126,7 @@ func NewFxBlockchain(h host.Host, p *ping.FxPing, a *announcements.FxAnnouncemen }, keyStorer: keyStorer, lastFetchTime: time.Now(), - fetchInterval: opts.fetchFrequency * time.Hour, + fetchInterval: opts.fetchFrequency, fetchCheckStop: make(chan struct{}), } if bl.authorizer != "" { @@ -139,7 +139,7 @@ func NewFxBlockchain(h host.Host, p *ping.FxPing, a *announcements.FxAnnouncemen } func (bl *FxBlockchain) startFetchCheck() { - bl.fetchCheckTicker = time.NewTicker(1 * time.Hour) // check every hour, adjust as needed + bl.fetchCheckTicker = time.NewTicker(1 * time.Minute) // check every hour, adjust as needed if bl.wg != nil { // Increment the WaitGroup counter before starting the goroutine @@ -331,10 +331,10 @@ func (bl *FxBlockchain) serve(w http.ResponseWriter, r *http.Request) { bl.handleAction(http.MethodPost, actionPoolCreate, from, w, r) }, actionPoolJoin: func(from peer.ID, w http.ResponseWriter, r *http.Request) { - bl.handleAction(http.MethodPost, actionPoolJoin, from, w, r) + bl.HandlePoolJoin(http.MethodPost, actionPoolJoin, from, w, r) }, actionPoolCancelJoin: func(from peer.ID, w http.ResponseWriter, r *http.Request) { - bl.handleAction(http.MethodPost, actionPoolCancelJoin, from, w, r) + bl.HandlePoolCancelJoin(http.MethodPost, actionPoolCancelJoin, from, w, r) }, actionPoolRequests: func(from peer.ID, w http.ResponseWriter, r *http.Request) { bl.handleAction(http.MethodGet, actionPoolRequests, from, w, r) @@ -774,9 +774,8 @@ func (bl *FxBlockchain) checkIfUserHasOpenPoolRequests(ctx context.Context, topi for _, user := range response.Users { //Check if self status is in pool request, start ping server and announce join request if user.PeerID == localPeerIDStr { - userRequestPoolIDStr := strconv.Itoa(*user.RequestPoolID) log.Debugw("Found self peerID", user.PeerID) - if user.RequestPoolID != nil && userRequestPoolIDStr == topicString { + if user.RequestPoolID != nil && strconv.Itoa(*user.RequestPoolID) == topicString { log.Debugw("Found self peerID in pool", "peer", user.PeerID, "pool", topicString) return true, nil } else if user.PoolID != nil { @@ -852,6 +851,7 @@ func (bl *FxBlockchain) clearPoolPeersFromPeerAddr(ctx context.Context, topic in } func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicString string, initiate bool) error { + log.Debugw("FetchUsersAndPopulateSets executed", "initiate", initiate) // Initialize the map if it's nil if bl.members == nil { bl.membersLock.Lock() @@ -992,6 +992,7 @@ func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicStri // Now iterate through the users and populate the member map log.Debugw("Now iterate through the users and populate the member map", "peer", localPeerID, "response", response.Users) + foundSelfInPool := false for _, user := range response.Users { pid, err := peer.Decode(user.PeerID) if err != nil { @@ -1004,6 +1005,7 @@ func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicStri //Check if self status is in pool request, start ping server and announce join request if user.PeerID == localPeerIDStr { log.Debugw("Found self peerID", user.PeerID) + foundSelfInPool = true if user.RequestPoolID != nil { userRequestPoolIDStr := strconv.Itoa(*user.RequestPoolID) bl.updatePoolName(userRequestPoolIDStr) @@ -1013,19 +1015,22 @@ func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicStri if err != nil { log.Errorw("Error when starting the Ping Server", "PeerID", user.PeerID, "err", err) } else { - log.Debugw("Found self peerID and ran Ping Server and announcing pooljoinrequest now", "peer", user.PeerID) - if bl.wg != nil { - log.Debug("Called wg.Add in somewhere before AnnounceJoinPoolRequestPeriodically") - bl.wg.Add(1) - } - go func() { + // TODO: THIS METHOD BELOW NEEDS TO RE_INITIALIZE ANNONCEMENTS WITH NEW TOPIC ND START IT FIRST + /* + log.Debugw("Found self peerID and ran Ping Server and announcing pooljoinrequest now", "peer", user.PeerID) if bl.wg != nil { - log.Debug("called wg.Done in somewhere before AnnounceJoinPoolRequestPeriodically") - defer bl.wg.Done() // Decrement the counter when the goroutine completes + log.Debug("Called wg.Add in somewhere before AnnounceJoinPoolRequestPeriodically") + bl.wg.Add(1) } - defer log.Debug("somewhere before AnnounceJoinPoolRequestPeriodically go routine is ending") - bl.a.AnnounceJoinPoolRequestPeriodically(ctx) - }() + go func() { + if bl.wg != nil { + log.Debug("called wg.Done in somewhere before AnnounceJoinPoolRequestPeriodically") + defer bl.wg.Done() // Decrement the counter when the goroutine completes + } + defer log.Debug("somewhere before AnnounceJoinPoolRequestPeriodically go routine is ending") + bl.a.AnnounceJoinPoolRequestPeriodically(ctx) + }() + */ } } else { userPoolIDStr := strconv.Itoa(*user.PoolID) @@ -1041,6 +1046,10 @@ func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicStri } } + if initiate && !foundSelfInPool { + bl.updatePoolName("0") + } + // Determine the status based on pool_id and request_pool_id bl.membersLock.RLock() existingStatus, exists := bl.members[pid] @@ -1066,7 +1075,10 @@ func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicStri if err == nil { status = common.Unknown } else { - log.Errorw("Error happened while voting", "pool", topicString, "from", localPeerID, "for", pid) + log.Errorw("Error happened while voting", "pool", topicString, "from", localPeerID, "for", pid, "err", err) + if strings.Contains(err.Error(), "AlreadyVoted") { + status = common.Unknown + } } } //} diff --git a/blockchain/options.go b/blockchain/options.go index 4fddbec5..d45603a9 100644 --- a/blockchain/options.go +++ b/blockchain/options.go @@ -14,6 +14,7 @@ type ( authorizedPeers []peer.ID allowTransientConnection bool blockchainEndPoint string + secretsPath string timeout int wg *sync.WaitGroup minPingSuccessCount int @@ -34,10 +35,11 @@ func newOptions(o ...Option) (*options, error) { authorizedPeers: []peer.ID{}, // default to an empty slice allowTransientConnection: true, // or false, as per your default blockchainEndPoint: "127.0.0.1:4000", // default endpoint + secretsPath: "", //path to secrets dir timeout: 30, // default timeout in seconds wg: nil, // initialized WaitGroup minPingSuccessCount: 3, // default minimum success count - maxPingTime: 10, // default maximum ping time in seconds + maxPingTime: 200, // default maximum ping time in miliseconds topicName: "0", // default topic name relays: []string{}, // default to an empty slice updatePoolName: defaultUpdatePoolName, // set a default function or leave nil @@ -82,6 +84,13 @@ func WithBlockchainEndPoint(b string) Option { } } +func WithSecretsPath(b string) Option { + return func(o *options) error { + o.secretsPath = b + return nil + } +} + func WithTimeout(to int) Option { return func(o *options) error { o.timeout = to diff --git a/blox/blox.go b/blox/blox.go index 77a74aa4..b629b4bb 100644 --- a/blox/blox.go +++ b/blox/blox.go @@ -70,7 +70,7 @@ func New(o ...Option) (*Blox, error) { if err != nil { return nil, err } - + log.Debug("Ping server started successfully") p.an, err = announcements.NewFxAnnouncements(p.h, announcements.WithAnnounceInterval(5), announcements.WithTimeout(3), @@ -83,13 +83,14 @@ func New(o ...Option) (*Blox, error) { } p.bl, err = blockchain.NewFxBlockchain(p.h, p.pn, p.an, - blockchain.NewSimpleKeyStorer(""), + blockchain.NewSimpleKeyStorer(p.secretsPath), blockchain.WithAuthorizer(authorizer), blockchain.WithAuthorizedPeers(authorizedPeers), blockchain.WithBlockchainEndPoint(p.blockchainEndpoint), + blockchain.WithSecretsPath(p.secretsPath), blockchain.WithTimeout(65), blockchain.WithWg(p.wg), - blockchain.WithFetchFrequency(3), + blockchain.WithFetchFrequency(1*time.Minute), blockchain.WithTopicName(p.topicName), blockchain.WithUpdatePoolName(p.updatePoolName), blockchain.WithRelays(p.relays), diff --git a/blox/options.go b/blox/options.go index 25d43093..63c81bb5 100644 --- a/blox/options.go +++ b/blox/options.go @@ -39,6 +39,7 @@ type ( maxPingTime int minSuccessRate int blockchainEndpoint string + secretsPath string IPFShttpServer *http.Server wg *sync.WaitGroup } @@ -216,6 +217,13 @@ func WithBlockchainEndPoint(b string) Option { } } +func WithSecretsPath(b string) Option { + return func(o *options) error { + o.secretsPath = b + return nil + } +} + func WithWg(wg *sync.WaitGroup) Option { return func(o *options) error { o.wg = wg diff --git a/cmd/blox/main.go b/cmd/blox/main.go index 69049841..d6f67ba6 100644 --- a/cmd/blox/main.go +++ b/cmd/blox/main.go @@ -74,6 +74,7 @@ var ( cli.App initOnly bool blockchainEndpoint string + secretsPath string generateNodeKey bool wireless bool configPath string @@ -382,6 +383,12 @@ func init() { Destination: &app.blockchainEndpoint, Value: "127.0.0.1:4000", }, + &cli.StringFlag{ + Name: "secretsPath", + Usage: "Change the path for storing secret words", + Destination: &app.secretsPath, + Value: "", + }, }, Before: before, Action: action, @@ -785,6 +792,7 @@ func action(ctx *cli.Context) error { blox.WithRelays(app.config.StaticRelays), blox.WithUpdatePoolName(updatePoolName), blox.WithBlockchainEndPoint(app.blockchainEndpoint), + blox.WithSecretsPath(app.secretsPath), blox.WithPingCount(5), blox.WithExchangeOpts( exchange.WithUpdateConfig(updateConfig), diff --git a/ping/ping.go b/ping/ping.go index 33f02ff4..536561a2 100644 --- a/ping/ping.go +++ b/ping/ping.go @@ -225,8 +225,9 @@ func (pn *FxPing) authorized(pid peer.ID, action string) bool { // If the peer has pinged before, increment the count pn.pingedPeers[pid] = count + 1 // Check if the count exceeds the limit - if count >= pn.count { - log.Errorw("Rejecting ping request; count limit exceeded", "pid", pid, "count", count, "limit", pn.count) + // We allow 3 times the required ping to allow 3 retries for a peer t ping this one + if count >= pn.count*3 { + log.Errorw("Rejecting ping request; count limit exceeded", "pid", pid, "count", count, "limit", pn.count*3) return false } log.Infow("Authorizing peer for additional ping", "pid", pid, "count", count)