diff --git a/blockchain/bl_pool.go b/blockchain/bl_pool.go index 64c419a..22f381f 100644 --- a/blockchain/bl_pool.go +++ b/blockchain/bl_pool.go @@ -16,6 +16,12 @@ import ( "github.com/libp2p/go-libp2p/core/peer" ) +type PingResponse struct { + Success bool `json:"Success"` + Time int64 `json:"Time"` + Text string `json:"Text"` +} + func (bl *FxBlockchain) PoolCreate(ctx context.Context, to peer.ID, r PoolCreateRequest) ([]byte, error) { if bl.allowTransientConnection { @@ -624,27 +630,72 @@ func (bl *FxBlockchain) HandlePoolJoinRequest(ctx context.Context, from peer.ID, return err } } + averageDuration := float64(2000) + successCount := 0 status, exists := bl.GetMemberStatus(from) if !exists { return fmt.Errorf("peerID does not exist in the list of pool requests or pool members: %s", from) } if status == common.Pending { // Ping - log.Debugw("****** Pinging pending node", "from", bl.h.ID(), "to", from) - averageDuration, successCount, err := bl.p.Ping(ctx, from) + /* + // Use IPFS Ping + log.Debugw("****** Pinging pending node", "from", bl.h.ID(), "to", from) + averageDuration, successCount, err := bl.p.Ping(ctx, from) + if err != nil { + log.Errorw("An error occurred during ping", "error", err) + return err + } + if bl.maxPingTime == 0 { + //TODO: This should not happen but is happening! + bl.maxPingTime = 200 + } + if bl.minPingSuccessCount == 0 { + //TODO: This should not happen but is happening! + bl.minPingSuccessCount = 3 + } + */ + // Set up the request with context for timeout + ctxPing, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + // Send the ping request + res, err := bl.rpc.Request("ping", from.String()).Send(ctxPing) if err != nil { - log.Errorw("An error occurred during ping", "error", err) - return err + return fmt.Errorf("ping was unsuccessful: %s", err) + } + if res.Error != nil { + return fmt.Errorf("ping was unsuccessful: %s", res.Error) } - if bl.maxPingTime == 0 { - //TODO: This should not happen but is happening! - bl.maxPingTime = 200 + + // Process multiple responses using a decoder + decoder := json.NewDecoder(res.Output) + + var totalTime int64 + var count int + + for decoder.More() { + var pingResp PingResponse + err := decoder.Decode(&pingResp) + if err != nil { + log.Errorf("error decoding JSON response: %s", err) + continue + } + + if pingResp.Text == "" && pingResp.Time > 0 { // Check for empty Text field and Time + totalTime += pingResp.Time + count++ + } } - if bl.minPingSuccessCount == 0 { - //TODO: This should not happen but is happening! - bl.minPingSuccessCount = 3 + + if count > 0 { + averageDuration = float64(totalTime) / float64(count) / 1e6 // Convert nanoseconds to milliseconds + successCount = count + } else { + fmt.Println("No valid ping responses received") } - vote := averageDuration <= bl.maxPingTime && successCount >= bl.minPingSuccessCount + + vote := int(averageDuration) <= bl.maxPingTime && successCount >= bl.minPingSuccessCount log.Debugw("Ping result", "averageDuration", averageDuration, "successCount", successCount, "vote", vote, "bl.maxPingTime", bl.maxPingTime, "bl.minPingSuccessCount", bl.minPingSuccessCount) diff --git a/blockchain/options.go b/blockchain/options.go index af8a647..9fe2fb7 100644 --- a/blockchain/options.go +++ b/blockchain/options.go @@ -42,8 +42,8 @@ func newOptions(o ...Option) (*options, error) { secretsPath: "", //path to secrets dir timeout: 30, // default timeout in seconds wg: nil, // initialized WaitGroup - minPingSuccessCount: 3, // default minimum success count - maxPingTime: 200, // default maximum ping time in miliseconds + minPingSuccessCount: 7, // default minimum success count + maxPingTime: 400, // default maximum ping time in miliseconds topicName: "0", // default topic name relays: []string{}, // default to an empty slice updatePoolName: defaultUpdatePoolName, // set a default function or leave nil diff --git a/blox/blox.go b/blox/blox.go index b928956..6e283aa 100644 --- a/blox/blox.go +++ b/blox/blox.go @@ -18,6 +18,7 @@ import ( "github.com/ipfs/boxo/path" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" + "github.com/ipfs/kubo/client/rpc" "github.com/ipld/go-ipld-prime" "github.com/ipld/go-ipld-prime/datamodel" cidlink "github.com/ipld/go-ipld-prime/linking/cid" @@ -533,6 +534,11 @@ func (p *Blox) GetBlMembers() map[peer.ID]common.MemberStatus { return p.bl.GetMembers() } +func (p *Blox) GetIPFSRPC() *rpc.HttpApi { + //This is for unit testing and no need to call directly + return p.rpc +} + func (p *Blox) BloxFreeSpace(ctx context.Context, to peer.ID) ([]byte, error) { //This is for unit testing and no need to call directly return p.bl.BloxFreeSpace(ctx, to) diff --git a/blox/example_test.go b/blox/example_test.go index 6ad4d5e..6a9a16e 100644 --- a/blox/example_test.go +++ b/blox/example_test.go @@ -14,6 +14,7 @@ import ( "github.com/functionland/go-fula/blox" "github.com/functionland/go-fula/exchange" logging "github.com/ipfs/go-log/v2" + "github.com/ipfs/kubo/client/rpc" "github.com/ipld/go-ipld-prime/codec/dagjson" "github.com/ipld/go-ipld-prime/fluent" "github.com/ipld/go-ipld-prime/node/basicnode" @@ -25,6 +26,7 @@ import ( "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-libp2p/p2p/host/autorelay" "github.com/multiformats/go-multiaddr" + ma "github.com/multiformats/go-multiaddr" ) var log = logging.Logger("fula/mockserver") @@ -1508,3 +1510,116 @@ func Example_blserver() { // Voted on 12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q {"pool_id":1,"account":"12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q","vote_value":true} // Voted on 12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q {"pool_id":1,"account":"12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q","vote_value":true} } + +type PingResponse struct { + Success bool `json:"Success"` + Time int64 `json:"Time"` + Text string `json:"Text"` +} + +func Example_pingtest() { + averageDuration := float64(2000) + successCount := 0 + server := startMockServer("127.0.0.1:4000") + defer func() { + // Shutdown the server after test + if err := server.Shutdown(context.Background()); err != nil { + panic(err) // Handle the error as you see fit + } + }() + + const poolName = "1" + ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second) + defer cancel() + + // Elevate log level to show internal communications. + if err := logging.SetLogLevel("*", "debug"); err != nil { + panic(err) + } + + nodeMultiAddr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5001") + if err != nil { + panic(fmt.Errorf("invalid multiaddress: %w", err)) + } + node, err := rpc.NewApi(nodeMultiAddr) + + // Use a deterministic random generator to generate deterministic + // output for the example. + h1, err := libp2p.New(libp2p.Identity(generateIdentity(1))) + if err != nil { + panic(err) + } + n1, err := blox.New( + blox.WithPoolName(poolName), + blox.WithTopicName(poolName), + blox.WithHost(h1), + blox.WithUpdatePoolName(updatePoolName), + blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}), + blox.WithPingCount(5), + blox.WithIpfsClient(node), + blox.WithExchangeOpts( + exchange.WithDhtProviderOptions( + dht.ProtocolExtension(protocol.ID("/"+poolName)), + dht.ProtocolPrefix("/fula"), + dht.Resiliency(1), + dht.Mode(dht.ModeAutoServer), + ), + ), + ) + if err != nil { + panic(err) + } + if err := n1.Start(ctx); err != nil { + panic(err) + } + defer n1.Shutdown(ctx) + fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h1.ID().String()) + PingCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + // Send the ping request + n1.GetBlMembers() + rpc := n1.GetIPFSRPC() + res, err := rpc.Request("ping", "12D3KooWHb38UxY8akVGWZBuFtS3NJ7rJUwd36t3cfkoY7EbgNt9").Send(PingCtx) + if err != nil { + fmt.Printf("ping was unsuccessful: %s", err) + return + } + if res.Error != nil { + fmt.Printf("ping was unsuccessful: %s", res.Error) + return + } + + // Process multiple responses using a decoder + decoder := json.NewDecoder(res.Output) + + var totalTime int64 + var count int + + for decoder.More() { + var pingResp PingResponse + err := decoder.Decode(&pingResp) + if err != nil { + log.Errorf("error decoding JSON response: %s", err) + continue + } + + if pingResp.Text == "" && pingResp.Time > 0 { // Check for empty Text field and Time + totalTime += pingResp.Time + count++ + } + } + + if count > 0 { + averageDuration = float64(totalTime) / float64(count) / 1e6 // Convert nanoseconds to milliseconds + successCount = count + } else { + fmt.Println("No valid ping responses received") + return + } + if int(averageDuration) > 1 { + fmt.Printf("Final response: successCount=%d", successCount) + } + // Unordered output: + // Final response: successCount=10 +}