Skip to content

Commit

Permalink
joinpool final draft
Browse files Browse the repository at this point in the history
1- added clean peerstore
2- udpate poolName in config file after joinpool
3- check the pool members periodically to see if the join happened
  • Loading branch information
ehsan6sha committed Nov 9, 2023
1 parent a1aeca7 commit aa67f9e
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 13 deletions.
15 changes: 8 additions & 7 deletions announcements/announcements.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
)

var (
Expand Down Expand Up @@ -127,12 +126,14 @@ func (an *FxAnnouncements) HandleAnnouncements(ctx context.Context) {
log.Errorw("failed to decode announcement data", "err", err)
continue
}
addrs, err := a.GetAddrs()
if err != nil {
log.Errorw("failed to decode announcement addrs", "err", err)
continue
}
an.h.Peerstore().AddAddrs(from, addrs, peerstore.PermanentAddrTTL)
/*
addrs, err := a.GetAddrs()
if err != nil {
log.Errorw("failed to decode announcement addrs", "err", err)
continue
}
an.h.Peerstore().AddAddrs(from, addrs, peerstore.ConnectedAddrTTL)
*/
log.Infow("received announcement", "from", from, "self", an.h.ID(), "announcement", a)
err = an.processAnnouncement(ctx, from, a.Type)
if err != nil {
Expand Down
49 changes: 48 additions & 1 deletion blockchain/bl_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"net/http"
"strconv"
"time"

"github.com/functionland/go-fula/common"
"github.com/libp2p/go-libp2p/core/network"
Expand Down Expand Up @@ -86,10 +87,50 @@ func (bl *FxBlockchain) PoolJoin(ctx context.Context, to peer.ID, r PoolJoinRequ
// Return the parsed error message and description.
return nil, fmt.Errorf("unexpected response: %d %s - %s", resp.StatusCode, apiError.Message, apiError.Description)
default:
err := bl.StartPingServer(ctx)
poolID := r.PoolID
poolIDStr := strconv.Itoa(poolID)
err := bl.updatePoolName(poolIDStr)
if err != nil {
return b, err
}
err = bl.StartPingServer(ctx)
if err != nil {
return b, err
}
// Create a ticker that triggers every 10 minutes
err = bl.FetchUsersAndPopulateSets(ctx, poolIDStr, true)
if err != nil {
log.Errorw("Error fetching and populating users", "err", err)
}
bl.stopFetchUsersAfterJoinChan = make(chan struct{})
ticker := time.NewTicker(bl.fetchInterval * time.Minute)
bl.wg.Add(1) // Increment the wait group counter
go func() {
defer bl.wg.Done() // Decrement the wait group counter when the goroutine completes
defer ticker.Stop() // Ensure the ticker is stopped when the goroutine exits

for {
select {
case <-ticker.C:
// Call FetchUsersAndPopulateSets at every tick (10 minutes interval)
if err := bl.FetchUsersAndPopulateSets(ctx, strconv.Itoa(r.PoolID), false); err != nil {
log.Errorw("Error fetching and populating users", "err", err)
}
status, exists := bl.GetMemberStatus(to)
if exists && status == common.Approved {
ticker.Stop()
bl.StopPingServer(ctx)
if bl.a != nil {
bl.a.StopJoinPoolRequestAnnouncements()
}
}
case <-bl.stopFetchUsersAfterJoinChan:
// Stop the ticker when receiving a stop signal
ticker.Stop()
return
}
}
}()
if bl.a != nil {
bl.wg.Add(1)
go bl.a.AnnounceJoinPoolRequestPeriodically(ctx)
Expand Down Expand Up @@ -154,6 +195,12 @@ func (bl *FxBlockchain) PoolCancelJoin(ctx context.Context, to peer.ID, r PoolCa
if bl.a != nil {
bl.a.StopJoinPoolRequestAnnouncements()
}
// Send a stop signal if the channel is not nil
if bl.stopFetchUsersAfterJoinChan != nil {
close(bl.stopFetchUsersAfterJoinChan)
// Reset the channel to nil to avoid closing a closed channel
bl.stopFetchUsersAfterJoinChan = nil
}
return b, nil
}
}
Expand Down
29 changes: 27 additions & 2 deletions blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ type (
fetchInterval time.Duration
fetchCheckTicker *time.Ticker
fetchCheckStop chan struct{}

stopFetchUsersAfterJoinChan chan struct{}
}
authorizationRequest struct {
Subject peer.ID `json:"id"`
Expand Down Expand Up @@ -583,8 +585,26 @@ func contains(slice []string, str string) bool {
return false
}

func (bl *FxBlockchain) cleanUnwantedPeers(keepPeers []peer.ID) {
// Convert the keepPeers slice to a map for efficient existence checks
keepMap := make(map[peer.ID]bool)
for _, p := range keepPeers {
keepMap[p] = true
}

// Retrieve all peers from the AddrBook
allPeers := bl.h.Peerstore().PeersWithAddrs()

// Iterate over all peers and clear addresses for those not in keepMap
for _, peerID := range allPeers {
if _, found := keepMap[peerID]; !found {
bl.h.Peerstore().ClearAddrs(peerID)
}
}
}
func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicString string, initiate bool) error {
// Update last fetch time on successful fetch
var keepPeers []peer.ID
bl.lastFetchTime = time.Now()

// Convert topic from string to int
Expand Down Expand Up @@ -643,7 +663,8 @@ func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicStri
}

// Add the relay addresses to the peerstore for the peer ID
bl.h.Peerstore().AddAddrs(pid, addrs, peerstore.PermanentAddrTTL)
bl.h.Peerstore().AddAddrs(pid, addrs, peerstore.ConnectedAddrTTL)
keepPeers = append(keepPeers, pid)
}
}
}
Expand Down Expand Up @@ -676,6 +697,7 @@ func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicStri
}

if initiate {
keepPeers = append(keepPeers, pid)
//Check if self status is in pool request, start ping server and announce join request
if user.PeerID == bl.h.ID().String() {
log.Debugw("Found self peerID", user.PeerID)
Expand Down Expand Up @@ -742,10 +764,13 @@ func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicStri
}

// Add the relay addresses to the peerstore for the peer ID
bl.h.Peerstore().AddAddrs(pid, addrs, peerstore.PermanentAddrTTL)
bl.h.Peerstore().AddAddrs(pid, addrs, peerstore.ConnectedAddrTTL)
}
}
bl.membersLock.Unlock()
if initiate {
bl.cleanUnwantedPeers(keepPeers)
}

return nil
}
Expand Down
8 changes: 8 additions & 0 deletions blockchain/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type (
maxPingTime int
topicName string
relays []string
updatePoolName func(string) error
fetchFrequency time.Duration //Hours that it should update the list of pool users and pool requests if not called through pubsub
}
)
Expand Down Expand Up @@ -104,6 +105,13 @@ func WithTopicName(n string) Option {
}
}

func WithUpdatePoolName(updatePoolName func(string) error) Option {
return func(o *options) error {
o.updatePoolName = updatePoolName
return nil
}
}

// WithStoreDir sets a the store directory we are using for datastore
// Required.
func WithRelays(r []string) Option {
Expand Down
1 change: 1 addition & 0 deletions blox/blox.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func New(o ...Option) (*Blox, error) {
blockchain.WithWg(&p.wg),
blockchain.WithFetchFrequency(3),
blockchain.WithTopicName(p.topicName),
blockchain.WithUpdatePoolName(p.updatePoolName),
blockchain.WithRelays(p.relays))
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion blox/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func Example_poolDiscoverPeersViaPubSub() {
panic(err)
}
defer n2.Shutdown(ctx)
fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h2.ID().String())
fmt.Printf("Instantiated node in pool %s with ID: %s and addr %v\n", poolName, h2.ID().String(), h2.Addrs())

// Instantiate the third node in the pool
pid3, _, err := crypto.GenerateECDSAKeyPair(rng)
Expand Down
13 changes: 11 additions & 2 deletions blox/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ import (
)

type (
Option func(*options) error
options struct {
Option func(*options) error
PoolNameUpdater func(string) error
options struct {
h host.Host
name string
topicName string
Expand All @@ -32,6 +33,7 @@ type (
authorizedPeers []peer.ID
exchangeOpts []exchange.Option
relays []string
updatePoolName PoolNameUpdater
}
)

Expand Down Expand Up @@ -161,3 +163,10 @@ func WithRelays(r []string) Option {
return nil
}
}

func WithUpdatePoolName(updatePoolName PoolNameUpdater) Option {
return func(o *options) error {
o.updatePoolName = updatePoolName
return nil
}
}
32 changes: 32 additions & 0 deletions cmd/blox/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,37 @@ func updateConfig(p []peer.ID) error {
return nil
}

func updatePoolName(newPoolName string) error {
// Load existing config file
configData, err := os.ReadFile(app.configPath)
if err != nil {
return err
}

// Parse the existing config file
if err := yaml.Unmarshal(configData, &app.config); err != nil {
return err
}

// Update the pool name
app.config.PoolName = newPoolName

logger.Infof("Updated pool name to: %s", app.config.PoolName)

// Marshal the updated config back to YAML
configData, err = yaml.Marshal(app.config)
if err != nil {
return err
}

// Write the updated config back to the file
if err := os.WriteFile(app.configPath, configData, 0700); err != nil {
return err
}

return nil
}

func action(ctx *cli.Context) error {
authorizer, err := peer.Decode(app.config.Authorizer)
if err != nil {
Expand Down Expand Up @@ -392,6 +423,7 @@ func action(ctx *cli.Context) error {
blox.WithPoolName(app.config.PoolName),
blox.WithStoreDir(app.config.StoreDir),
blox.WithRelays(app.config.StaticRelays),
blox.WithUpdatePoolName(updatePoolName),
blox.WithExchangeOpts(
exchange.WithUpdateConfig(updateConfig),
exchange.WithAuthorizer(authorizer),
Expand Down

0 comments on commit aa67f9e

Please sign in to comment.