Skip to content

Commit

Permalink
Switched Ping to IPFS
Browse files Browse the repository at this point in the history
  • Loading branch information
ehsan6sha committed Mar 6, 2024
1 parent 11cfcf7 commit 4227664
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 13 deletions.
73 changes: 62 additions & 11 deletions blockchain/bl_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
)

type PingResponse struct {
Success bool `json:"Success"`
Time int64 `json:"Time"`
Text string `json:"Text"`
}

func (bl *FxBlockchain) PoolCreate(ctx context.Context, to peer.ID, r PoolCreateRequest) ([]byte, error) {

if bl.allowTransientConnection {
Expand Down Expand Up @@ -624,27 +630,72 @@ func (bl *FxBlockchain) HandlePoolJoinRequest(ctx context.Context, from peer.ID,
return err
}
}
averageDuration := float64(2000)
successCount := 0
status, exists := bl.GetMemberStatus(from)
if !exists {
return fmt.Errorf("peerID does not exist in the list of pool requests or pool members: %s", from)
}
if status == common.Pending {
// Ping
log.Debugw("****** Pinging pending node", "from", bl.h.ID(), "to", from)
averageDuration, successCount, err := bl.p.Ping(ctx, from)
/*
// Use IPFS Ping
log.Debugw("****** Pinging pending node", "from", bl.h.ID(), "to", from)
averageDuration, successCount, err := bl.p.Ping(ctx, from)
if err != nil {
log.Errorw("An error occurred during ping", "error", err)
return err
}
if bl.maxPingTime == 0 {
//TODO: This should not happen but is happening!
bl.maxPingTime = 200
}
if bl.minPingSuccessCount == 0 {
//TODO: This should not happen but is happening!
bl.minPingSuccessCount = 3
}
*/
// Set up the request with context for timeout
ctxPing, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

// Send the ping request
res, err := bl.rpc.Request("ping", from.String()).Send(ctxPing)
if err != nil {
log.Errorw("An error occurred during ping", "error", err)
return err
return fmt.Errorf("ping was unsuccessful: %s", err)
}
if res.Error != nil {
return fmt.Errorf("ping was unsuccessful: %s", res.Error)
}
if bl.maxPingTime == 0 {
//TODO: This should not happen but is happening!
bl.maxPingTime = 200

// Process multiple responses using a decoder
decoder := json.NewDecoder(res.Output)

var totalTime int64
var count int

for decoder.More() {
var pingResp PingResponse
err := decoder.Decode(&pingResp)
if err != nil {
log.Errorf("error decoding JSON response: %s", err)
continue
}

if pingResp.Text == "" && pingResp.Time > 0 { // Check for empty Text field and Time
totalTime += pingResp.Time
count++
}
}
if bl.minPingSuccessCount == 0 {
//TODO: This should not happen but is happening!
bl.minPingSuccessCount = 3

if count > 0 {
averageDuration = float64(totalTime) / float64(count) / 1e6 // Convert nanoseconds to milliseconds
successCount = count
} else {
fmt.Println("No valid ping responses received")
}
vote := averageDuration <= bl.maxPingTime && successCount >= bl.minPingSuccessCount

vote := int(averageDuration) <= bl.maxPingTime && successCount >= bl.minPingSuccessCount

log.Debugw("Ping result", "averageDuration", averageDuration, "successCount", successCount, "vote", vote, "bl.maxPingTime", bl.maxPingTime, "bl.minPingSuccessCount", bl.minPingSuccessCount)

Expand Down
4 changes: 2 additions & 2 deletions blockchain/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ func newOptions(o ...Option) (*options, error) {
secretsPath: "", //path to secrets dir
timeout: 30, // default timeout in seconds
wg: nil, // initialized WaitGroup
minPingSuccessCount: 3, // default minimum success count
maxPingTime: 200, // default maximum ping time in miliseconds
minPingSuccessCount: 7, // default minimum success count
maxPingTime: 400, // default maximum ping time in miliseconds
topicName: "0", // default topic name
relays: []string{}, // default to an empty slice
updatePoolName: defaultUpdatePoolName, // set a default function or leave nil
Expand Down
6 changes: 6 additions & 0 deletions blox/blox.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/ipfs/boxo/path"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/kubo/client/rpc"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/datamodel"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
Expand Down Expand Up @@ -533,6 +534,11 @@ func (p *Blox) GetBlMembers() map[peer.ID]common.MemberStatus {
return p.bl.GetMembers()
}

func (p *Blox) GetIPFSRPC() *rpc.HttpApi {
//This is for unit testing and no need to call directly
return p.rpc
}

func (p *Blox) BloxFreeSpace(ctx context.Context, to peer.ID) ([]byte, error) {
//This is for unit testing and no need to call directly
return p.bl.BloxFreeSpace(ctx, to)
Expand Down
115 changes: 115 additions & 0 deletions blox/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/functionland/go-fula/blox"
"github.com/functionland/go-fula/exchange"
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/kubo/client/rpc"
"github.com/ipld/go-ipld-prime/codec/dagjson"
"github.com/ipld/go-ipld-prime/fluent"
"github.com/ipld/go-ipld-prime/node/basicnode"
Expand All @@ -25,6 +26,7 @@ import (
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/host/autorelay"
"github.com/multiformats/go-multiaddr"

Check failure on line 28 in blox/example_test.go

View workflow job for this annotation

GitHub Actions / All

package "github.com/multiformats/go-multiaddr" is being imported more than once (ST1019)

Check failure on line 28 in blox/example_test.go

View workflow job for this annotation

GitHub Actions / All

package "github.com/multiformats/go-multiaddr" is being imported more than once (ST1019)
ma "github.com/multiformats/go-multiaddr"
)

var log = logging.Logger("fula/mockserver")
Expand Down Expand Up @@ -1508,3 +1510,116 @@ func Example_blserver() {
// Voted on 12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q {"pool_id":1,"account":"12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q","vote_value":true}
// Voted on 12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q {"pool_id":1,"account":"12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q","vote_value":true}
}

type PingResponse struct {
Success bool `json:"Success"`
Time int64 `json:"Time"`
Text string `json:"Text"`
}

func Example_pingtest() {
averageDuration := float64(2000)
successCount := 0
server := startMockServer("127.0.0.1:4000")
defer func() {
// Shutdown the server after test
if err := server.Shutdown(context.Background()); err != nil {
panic(err) // Handle the error as you see fit
}
}()

const poolName = "1"
ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second)
defer cancel()

// Elevate log level to show internal communications.
if err := logging.SetLogLevel("*", "debug"); err != nil {
panic(err)
}

nodeMultiAddr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5001")
if err != nil {
panic(fmt.Errorf("invalid multiaddress: %w", err))
}
node, err := rpc.NewApi(nodeMultiAddr)

// Use a deterministic random generator to generate deterministic
// output for the example.
h1, err := libp2p.New(libp2p.Identity(generateIdentity(1)))
if err != nil {
panic(err)
}
n1, err := blox.New(
blox.WithPoolName(poolName),
blox.WithTopicName(poolName),
blox.WithHost(h1),
blox.WithUpdatePoolName(updatePoolName),
blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}),
blox.WithPingCount(5),
blox.WithIpfsClient(node),
blox.WithExchangeOpts(
exchange.WithDhtProviderOptions(
dht.ProtocolExtension(protocol.ID("/"+poolName)),
dht.ProtocolPrefix("/fula"),
dht.Resiliency(1),
dht.Mode(dht.ModeAutoServer),
),
),
)
if err != nil {
panic(err)
}
if err := n1.Start(ctx); err != nil {
panic(err)
}
defer n1.Shutdown(ctx)
fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h1.ID().String())
PingCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

// Send the ping request
n1.GetBlMembers()
rpc := n1.GetIPFSRPC()
res, err := rpc.Request("ping", "12D3KooWHb38UxY8akVGWZBuFtS3NJ7rJUwd36t3cfkoY7EbgNt9").Send(PingCtx)
if err != nil {
fmt.Printf("ping was unsuccessful: %s", err)
return
}
if res.Error != nil {
fmt.Printf("ping was unsuccessful: %s", res.Error)
return
}

// Process multiple responses using a decoder
decoder := json.NewDecoder(res.Output)

var totalTime int64
var count int

for decoder.More() {
var pingResp PingResponse
err := decoder.Decode(&pingResp)
if err != nil {
log.Errorf("error decoding JSON response: %s", err)
continue
}

if pingResp.Text == "" && pingResp.Time > 0 { // Check for empty Text field and Time
totalTime += pingResp.Time
count++
}
}

if count > 0 {
averageDuration = float64(totalTime) / float64(count) / 1e6 // Convert nanoseconds to milliseconds
successCount = count
} else {
fmt.Println("No valid ping responses received")
return
}
if int(averageDuration) > 1 {
fmt.Printf("Final response: successCount=%d", successCount)
}
// Unordered output:
// Final response: successCount=10
}

0 comments on commit 4227664

Please sign in to comment.