Skip to content

Commit

Permalink
Pushrate fix (#218)
Browse files Browse the repository at this point in the history
* corrected ipfs requirements for testing

* removed systemresourcemanager

* Update main.go

* gobreaker
  • Loading branch information
ehsan6sha authored Mar 10, 2024
1 parent 07f4dcb commit 4d86d1b
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 52 deletions.
4 changes: 3 additions & 1 deletion blockchain/bl_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,9 @@ func (bl *FxBlockchain) HandlePoolJoinRequest(ctx context.Context, from peer.ID,
// Set up the request with context for timeout
ctxPing, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

if bl.rpc == nil {
return fmt.Errorf("IPFS rpc is not defined")
}
// Send the ping request
res, err := bl.rpc.Request("ping", from.String()).Send(ctxPing)
if err != nil {
Expand Down
20 changes: 10 additions & 10 deletions blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -862,17 +862,16 @@ func (bl *FxBlockchain) SetAuth(ctx context.Context, on peer.ID, subject peer.ID

func (bl *FxBlockchain) authorized(pid peer.ID, action string) bool {
log.Debugw("Checking authorization", "action", action, "pid", pid, "bl.authorizer", bl.authorizer, "h.ID", bl.h.ID())
if bl.authorizer == bl.h.ID() || bl.authorizer == "" { //to cover the cases where in poolHost mode
return action == actionReplicateInPool
}
switch action {
case actionReplicateInPool:
return (bl.authorizer == bl.h.ID() || bl.authorizer == "")
case actionBloxFreeSpace, actionAccountFund, actionManifestBatchUpload, actionAssetsBalance, actionGetDatastoreSize, actionGetFolderSize, actionFetchContainerLogs, actionEraseBlData, actionWifiRemoveall, actionReboot, actionPartition, actionDeleteWifi, actionDisconnectWifi, actionDeleteFulaConfig, actionGetAccount, actionSeeded, actionAccountExists, actionPoolCreate, actionPoolJoin, actionPoolCancelJoin, actionPoolRequests, actionPoolList, actionPoolVote, actionPoolLeave, actionManifestUpload, actionManifestStore, actionManifestAvailable, actionManifestRemove, actionManifestRemoveStorer, actionManifestRemoveStored:
bl.authorizedPeersLock.RLock()
_, ok := bl.authorizedPeers[pid]
bl.authorizedPeersLock.RUnlock()
return ok
case actionAuth:
return pid == bl.authorizer
return pid == bl.authorizer && bl.authorizer != ""
default:
return false
}
Expand Down Expand Up @@ -1113,15 +1112,16 @@ func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicStri
reqCtx, cancelReqCtx := context.WithTimeout(ctx, 2*time.Second)
defer cancelReqCtx() // Ensures resources are cleaned up after the Stat call
poolHostAddrString := "/dns4/" + clusterEndpoint + "/tcp/4001/p2p/" + poolHostPeerID
bl.rpc.Request("bootstrap/add", poolHostAddrString).Send(reqCtx)
poolHostAddr, err := ma.NewMultiaddr(poolHostAddrString)
if err == nil {
poolHostAddrInfos, err := peer.AddrInfosFromP2pAddrs(poolHostAddr)
if bl.rpc != nil {
bl.rpc.Request("bootstrap/add", poolHostAddrString).Send(reqCtx)
poolHostAddr, err := ma.NewMultiaddr(poolHostAddrString)
if err == nil {
bl.rpc.Swarm().Connect(reqCtx, poolHostAddrInfos[0])
poolHostAddrInfos, err := peer.AddrInfosFromP2pAddrs(poolHostAddr)
if err == nil {
bl.rpc.Swarm().Connect(reqCtx, poolHostAddrInfos[0])
}
}
}

} else {
// Handle the error: Endpoint didn't match the pattern
fmt.Println("Error: Could not extract peerID from endpoint")
Expand Down
15 changes: 11 additions & 4 deletions blox/blox.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/kubo/client/rpc"
iface "github.com/ipfs/kubo/core/coreiface"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/datamodel"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
Expand Down Expand Up @@ -126,6 +127,9 @@ func (p *Blox) PubsubValidator(ctx context.Context, id peer.ID, msg *pubsub.Mess
func (p *Blox) storeCidIPFS(ctx context.Context, c path.Path) error {
getCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
if p.rpc == nil {
return fmt.Errorf("IPFS rpc is undefined")
}
_, err := p.rpc.Block().Get(getCtx, c)
if err != nil {
log.Errorw("It seems that the link is not found", "c", c, "err", err)
Expand Down Expand Up @@ -185,10 +189,13 @@ func (p *Blox) StoreCid(ctx context.Context, l ipld.Link, limit int) error {
}
statCtx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel() // Ensures resources are cleaned up after the Stat call
stat, err := p.rpc.Block().Stat(statCtx, cidPath)
if err != nil {
log.Errorw("It seems that the link is not stored", "l", l, "err", err)
continue
var stat iface.BlockStat
if p.rpc != nil {
stat, err = p.rpc.Block().Stat(statCtx, cidPath)
if err != nil {
log.Errorw("It seems that the link is not stored", "l", l, "err", err)
continue
}
}
p.ex.IpniNotifyLink(l)
log.Debugw("link might be successfully stored", "l", l, "from", provider.ID, "size", stat.Size())
Expand Down
31 changes: 28 additions & 3 deletions cmd/blox/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"path"
"path/filepath"
"strconv"
"strings"
"sync"
"syscall"
"time"
Expand Down Expand Up @@ -1043,7 +1044,6 @@ func action(ctx *cli.Context) error {
listenAddrs = append(listenAddrs, relayAddr2)

hopts := []libp2p.Option{
libp2p.ListenAddrs(listenAddrs...),
libp2p.EnableNATService(),
libp2p.NATPortMap(),
libp2p.EnableRelay(),
Expand Down Expand Up @@ -1078,12 +1078,33 @@ func action(ctx *cli.Context) error {
))
}

bopts := append(hopts, libp2p.Identity(k))
bopts := append(hopts, libp2p.Identity(k), libp2p.ListenAddrs(listenAddrs...))
h, err := libp2p.New(bopts...)
if err != nil {
return err
}
iopts := append(hopts, libp2p.Identity(ipnik))

// Create a new slice for updated listen addresses
ipniListenAddrs := make([]ma.Multiaddr, len(listenAddrs))
// Update each address in the slice
for i, addr := range listenAddrs {
// Convert the multiaddr to a string for manipulation
addrStr := addr.String()

// Replace "40001" with "40002" in the string representation
updatedAddrStr := strings.Replace(addrStr, "40001", "40002", -1)

// Parse the updated string back into a multiaddr.Multiaddr
updatedAddr, err := ma.NewMultiaddr(updatedAddrStr)
if err != nil {
log.Fatalf("Failed to parse updated multiaddr '%s': %v", updatedAddrStr, err)
}

// Store the updated multiaddr in the slice
ipniListenAddrs[i] = updatedAddr
}

iopts := append(hopts, libp2p.Identity(ipnik), libp2p.ListenAddrs(ipniListenAddrs...))
ipnih, err := libp2p.New(iopts...)
if err != nil {
return err
Expand All @@ -1102,6 +1123,10 @@ func action(ctx *cli.Context) error {
panic(fmt.Errorf("invalid multiaddress: %w", err))
}
node, err := rpc.NewApi(nodeMultiAddr)
if err != nil {
logger.Fatal(err)
return err
}

const useIPFSServer = "none" //internal: runs local ipfs instance, none requires an external one and fula runs the mock server on 5001
if useIPFSServer == "internal" {
Expand Down
119 changes: 103 additions & 16 deletions exchange/fx_exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"errors"
"fmt"
"io"
"math"
"math/rand"
"net/http"
"strings"
"sync"
Expand All @@ -34,6 +36,7 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/sony/gobreaker"
"golang.org/x/sync/errgroup"
)

Expand All @@ -51,13 +54,24 @@ var (
log = logging.Logger("fula/exchange")
errUnauthorized = errors.New("not authorized")
exploreAllRecursivelySelector selector.Selector
breakTime = 10 * time.Second
)

type tempAuthEntry struct {
peerID peer.ID
timestamp time.Time
}

var cbSettings = gobreaker.Settings{
Name: "Push Circuit Breaker",
MaxRequests: 3, // Open circuit after 3 consecutive failures
Interval: breakTime, // Remain open for 10 seconds
OnStateChange: func(name string, from gobreaker.State, to gobreaker.State) {
log.Errorf("Circuit breaker '%s' changed state from '%s' to '%s'", name, from, to)
},
}
var pushCircuitBreaker = gobreaker.NewCircuitBreaker(cbSettings)

func init() {
var err error
ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
Expand Down Expand Up @@ -101,7 +115,13 @@ func NewFxExchange(h host.Host, ls ipld.LinkSystem, o ...Option) (*FxExchange, e
if err != nil {
return nil, err
}
tr := &http.Transport{}
tr := &http.Transport{
DisableKeepAlives: true, // Ensure connections are not reused
MaxIdleConns: 500,
MaxConnsPerHost: 2000,
IdleConnTimeout: 20 * time.Second,
// Include any other necessary transport configuration here
}
tr.RegisterProtocol("libp2p", p2phttp.NewTransport(h, p2phttp.ProtocolOption(FxExchangeProtocolID)))
client := &http.Client{Transport: tr}

Expand Down Expand Up @@ -407,6 +427,7 @@ func (e *FxExchange) Push(ctx context.Context, to peer.ID, l ipld.Link) error {
// Recursively traverse the node and push all its leaves.
err = progress.WalkMatching(node, exploreAllRecursivelySelector, func(progress traversal.Progress, node datamodel.Node) error {
eg.Go(func() error {
// Create a new context for this specific push operation
e.pushRateLimiter.Take()
link, err := e.ls.ComputeLink(l.Prototype(), node)
if err != nil {
Expand All @@ -424,7 +445,10 @@ func (e *FxExchange) Push(ctx context.Context, to peer.ID, l ipld.Link) error {
}

func (e *FxExchange) pushOneNode(ctx context.Context, node ipld.Node, to peer.ID, link datamodel.Link) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var buf bytes.Buffer

c := link.(cidlink.Link).Cid
encoder, err := ipldmc.LookupEncoder(c.Prefix().Codec)
if err != nil {
Expand All @@ -441,24 +465,84 @@ func (e *FxExchange) pushOneNode(ctx context.Context, node ipld.Node, to peer.ID
log.Errorw("Failed to instantiate push request", "err", err)
return err
}
resp, err := e.c.Do(req)
_, err = pushCircuitBreaker.Execute(func() (interface{}, error) {
resp, err := e.c.Do(req)
if err != nil {
log.Errorw("Failed to do push request1", "err", err)
return nil, err
}
defer resp.Body.Close()

// Handle successful response with your existing logic
b, err := io.ReadAll(resp.Body)
switch {
case err != nil:
log.Errorw("Failed to read the response from push", "err", err)
return nil, err // Signal an error to the circuit breaker
case resp.StatusCode != http.StatusOK:
log.Errorw("Received non-OK response from push", "err", err)
return nil, fmt.Errorf("unexpected response: %d %s", resp.StatusCode, string(b))
default:
log.Debug("Successfully pushed traversed node")
return nil, nil // Signal success to the circuit breaker
}
})
// Handle potential errors and circuit breaker state
if err != nil {
log.Errorw("Failed to send push request", "err", err)
return err
switch {
case errors.Is(err, gobreaker.ErrOpenState):
// Circuit breaker is open (likely due to previous errors)
return fmt.Errorf("circuit breaker open: %w", err)
default:
// Potentially retry or return the error based on its nature
return retryWithBackoff(ctx, func() error {
// Attempt to execute the request within the circuit breaker again
_, err := pushCircuitBreaker.Execute(func() (interface{}, error) {
resp, err := e.c.Do(req)
if err != nil {
log.Errorw("Failed to do push request on retry", "err", err)
return nil, err // Signal failure to the circuit breaker
}
defer resp.Body.Close()

// Handle successful response with your existing logic
b, err := io.ReadAll(resp.Body)
switch {
case err != nil:
log.Errorw("Failed to read the response from push", "err", err)
return nil, err // Signal an error to the circuit breaker
case resp.StatusCode != http.StatusOK:
log.Errorw("Received non-OK response from push", "err", err)
return nil, fmt.Errorf("unexpected response: %d %s", resp.StatusCode, string(b))
default:
log.Debug("Successfully pushed traversed node")
return nil, nil // Signal success to the circuit breaker
}
})
return err // Return potential errors after circuit breaker execution attempt
})
}
}
defer resp.Body.Close()
b, err := io.ReadAll(resp.Body)
switch {
case err != nil:
log.Errorw("Failed to read the response from push", "err", err)
return err
case resp.StatusCode != http.StatusOK:
log.Errorw("Received non-OK response from push", "err", err)
return fmt.Errorf("unexpected response: %d %s", resp.StatusCode, string(b))
default:
log.Debug("Successfully pushed traversed node")
return nil
return nil
}
func retryWithBackoff(ctx context.Context, fn func() error) error {
maxRetries := 5
baseDelay := 1 * time.Second

for attempt := 0; attempt < maxRetries; attempt++ {
err := fn()
if err == nil {
return nil
}
delay := baseDelay * time.Duration(math.Pow(2, float64(attempt)))
jitter := time.Duration(rand.Float64() * float64(delay))
select {
case <-time.After(delay + jitter):
case <-ctx.Done():
return ctx.Err()
}
}
return fmt.Errorf("failed after %d retries", maxRetries)
}

func (e *FxExchange) serve(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -510,7 +594,10 @@ func (e *FxExchange) handlePush(from peer.ID, w http.ResponseWriter, r *http.Req
}
if err := e.decodeAndStoreBlock(r.Context(), r.Body, cidlink.Link{Cid: cidInPath}); err != nil {
http.Error(w, "invalid cid", http.StatusBadRequest)
return
}

w.WriteHeader(http.StatusOK)
}

func (e *FxExchange) decodeAndStoreBlock(ctx context.Context, r io.ReadCloser, link ipld.Link) error {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ require (
github.com/rs/cors v1.10.1 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/samber/lo v1.39.0 // indirect
github.com/sony/gobreaker v0.5.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/stretchr/testify v1.8.4 // indirect
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -966,6 +966,8 @@ github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9
github.com/smartystreets/goconvey v1.7.2 h1:9RBaZCeXEQ3UselpuwUQHltGVXvdwm6cv1hgR6gDIPg=
github.com/smartystreets/goconvey v1.7.2/go.mod h1:Vw0tHAZW6lzCRk3xgdin6fKYcG+G3Pg9vgXWeJpQFMM=
github.com/smola/gocompat v0.2.0/go.mod h1:1B0MlxbmoZNo3h8guHp8HztB3BSYR5itql9qtVc0ypY=
github.com/sony/gobreaker v0.5.0 h1:dRCvqm0P490vZPmy7ppEk2qCnCieBooFJ+YoXGYB+yg=
github.com/sony/gobreaker v0.5.0/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY=
github.com/sourcegraph/annotate v0.0.0-20160123013949-f4cad6c6324d/go.mod h1:UdhH50NIW0fCiwBSr0co2m7BnFLdv4fQTgdqdJTHFeE=
github.com/sourcegraph/syntaxhighlight v0.0.0-20170531221838-bd320f5d308e/go.mod h1:HuIsMU8RRBOtsCgI77wP899iHVBQpCmg4ErYMZB+2IA=
github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572/go.mod h1:w0SWMsp6j9O/dk4/ZpIhL+3CkG8ofA2vuv7k+ltqUMc=
Expand Down
6 changes: 3 additions & 3 deletions mobile/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func NewConfig() *Config {
AllowTransientConnection: true,
PoolName: "0",
BlockchainEndpoint: "api.node3.functionyard.fula.network",
DisableResourceManger: true,
}
}

Expand All @@ -96,6 +97,7 @@ func (cfg *Config) init(mc *Client) error {
libp2p.EnableRelay(),
libp2p.EnableHolePunching(),
}
cfg.DisableResourceManger = true
if cfg.DisableResourceManger {
hopts = append(hopts, libp2p.ResourceManager(&network.NullResourceManager{}))
}
Expand Down Expand Up @@ -196,6 +198,7 @@ func (cfg *Config) init(mc *Client) error {
exchange.WithAuthorizer(mc.h.ID()),
exchange.WithAllowTransientConnection(cfg.AllowTransientConnection),
exchange.WithIpniPublishDisabled(true),
exchange.WithMaxPushRate(50),
exchange.WithDhtProviderOptions(
dht.Datastore(namespace.Wrap(mc.ds, datastore.NewKey("dht"))),
dht.ProtocolExtension(protocol.ID("/"+cfg.PoolName)),
Expand Down Expand Up @@ -223,9 +226,6 @@ func (cfg *Config) init(mc *Client) error {
return err
}
}
if err != nil {
return err
}
}
return mc.ex.Start(context.TODO())
}
Loading

0 comments on commit 4d86d1b

Please sign in to comment.