Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Manifest upload #184

Merged
merged 5 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 71 additions & 53 deletions announcements/announcements.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,70 +136,84 @@ func (an *FxAnnouncements) processAnnouncement(ctx context.Context, from peer.ID
}

func (an *FxAnnouncements) HandleAnnouncements(ctx context.Context) {
log.Debug("called wg.Done in HandleAnnouncements")
defer an.wg.Done()
log.Debug("Starting to handle announcements")
if an.wg != nil {
log.Debug("called wg.Done in handle announcements")
defer an.wg.Done()
}
defer log.Debug("HandleAnnouncements go routine is ending")

for {
msg, err := an.sub.Next(ctx)
if msg != nil {
log.Debugw("HandleAnnouncements", "msg.Topic", msg.Topic)
if err != nil {
log.Debugw("HandleAnnouncements Error", err)
}
switch {
case ctx.Err() != nil || err == pubsub.ErrSubscriptionCancelled || err == pubsub.ErrTopicClosed:
log.Info("stopped handling announcements")
return
case err != nil:
log.Errorw("failed to get the next announcement", "err", err)
continue
}
from, err := peer.IDFromBytes(msg.From)
select {
case <-ctx.Done():
log.Info("Context cancelled, stopping handle announcements")
return
default:
// Directly retrieve the next message
msg, err := an.sub.Next(ctx)
if err != nil {
log.Errorw("failed to decode announcement sender", "err", err)
continue
}
if from == an.h.ID() {
log.Debug("ignoring announcement from self")
continue
}
a := &Announcement{}
if err = a.UnmarshalBinary(msg.Data); err != nil {
log.Errorw("failed to decode announcement data", "err", err)
continue
if err == pubsub.ErrSubscriptionCancelled || err == pubsub.ErrTopicClosed {
log.Info("Subscription cancelled or topic closed, stopping handle announcements")
} else {
log.Errorw("Error while getting next announcement", "err", err)
}
return // Exit the loop in case of error
}

addrs, err := a.GetAddrs()
if err != nil {
log.Errorw("failed to decode announcement addrs", "err", err)
continue
}
// Process the message
if msg != nil {
from, err := peer.IDFromBytes(msg.From)
if err != nil {
log.Errorw("Failed to decode announcement sender", "err", err)
continue
}
if from == an.h.ID() {
log.Debug("Ignoring announcement from self")
continue
}
a := &Announcement{}
if err = a.UnmarshalBinary(msg.Data); err != nil {
log.Errorw("Failed to decode announcement data", "err", err)
continue
}

log.Debugw("received announcement", "from", from, "self", an.h.ID(), "announcement", a)
log.Debug("processAnnouncement call")
if msg.Topic != nil {
err = an.processAnnouncement(ctx, from, a.Type, addrs, *msg.Topic)
addrs, err := a.GetAddrs()
if err != nil {
log.Errorw("failed to process announcement", "err", err)
log.Errorw("Failed to decode announcement addrs", "err", err)
continue
}

log.Debugw("Received announcement", "from", from, "self", an.h.ID(), "announcement", a)
log.Debug("processAnnouncement call")
if msg.Topic != nil {
err = an.processAnnouncement(ctx, from, a.Type, addrs, *msg.Topic)
if err != nil {
log.Errorw("failed to process announcement", "err", err)
continue
}
} else {
log.Debug("Topic is nil")
continue
}
} else {
log.Debug("Topic is nil")
continue
}
} else {
continue
}
}
}

func (an *FxAnnouncements) AnnounceIExistPeriodically(ctx context.Context) {
log.Debug("called wg.Done in AnnounceIExistPeriodically")
defer an.wg.Done()
if an.wg != nil {
log.Debug("called wg.Done in AnnounceIExistPeriodically")
defer an.wg.Done()
}
defer log.Debug("AnnounceIExistPeriodically go routine is ending")

ticker := time.NewTicker(an.announceInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
log.Info("stopped making periodic iexist announcements")
log.Info("Context cancelled, stopped making periodic iexist announcements")
return
case t := <-ticker.C:
a := &Announcement{
Expand All @@ -209,32 +223,35 @@ func (an *FxAnnouncements) AnnounceIExistPeriodically(ctx context.Context) {
a.SetAddrs(an.h.Addrs()...)
b, err := a.MarshalBinary()
if err != nil {
log.Errorw("failed to encode iexist announcement", "err", err)
log.Errorw("Failed to encode iexist announcement", "err", err)
continue
}
if err := an.topic.Publish(ctx, b); err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
log.Info("stopped making periodic iexist announcements")
log.Info("Context cancelled or deadline exceeded, stopped making periodic iexist announcements")
return
}
if errors.Is(err, pubsub.ErrTopicClosed) || errors.Is(err, pubsub.ErrSubscriptionCancelled) {
log.Info("stopped making periodic iexist announcements as topic is closed or subscription cancelled")
log.Info("Topic closed or subscription cancelled, stopped making periodic iexist announcements")
return
}
log.Errorw("failed to publish iexist announcement", "err", err)
log.Errorw("Failed to publish iexist announcement", "err", err)
continue
}
log.Infow("announced iexist message", "from", an.h.ID(), "announcement", a, "time", t)
log.Infow("Announced iexist message", "from", an.h.ID(), "announcement", a, "time", t)
}
}
}

func (an *FxAnnouncements) AnnounceJoinPoolRequestPeriodically(ctx context.Context) {
log.Debugw("called wg.Done in AnnounceJoinPoolRequestPeriodically pool join request", "peer", an.h.ID())
if an.wg != nil {
log.Debugw("called wg.Done in AnnounceJoinPoolRequestPeriodically pool join request", "peer", an.h.ID())
defer an.wg.Done()
}
defer log.Debug("AnnounceJoinPoolRequestPeriodically go routine is ending")
log.Debugw("Starting AnnounceJoinPoolRequestPeriodically pool join request", "peer", an.h.ID())
log.Debugw("peerlist before AnnounceJoinPoolRequestPeriodically pool join request", "on", an.h.ID(), "peerlist", an.topic.ListPeers())

defer an.wg.Done()
an.announcingJoinPoolMutex.Lock()
if an.announcingJoinPoolRequest {
an.announcingJoinPoolMutex.Unlock()
Expand All @@ -249,6 +266,7 @@ func (an *FxAnnouncements) AnnounceJoinPoolRequestPeriodically(ctx context.Conte
an.announcingJoinPoolMutex.Unlock()
}()
ticker := time.NewTicker(an.announceInterval)
defer ticker.Stop()
for {
log.Debugw("inside ticker for join pool request", "peer", an.h.ID())
select {
Expand Down
8 changes: 7 additions & 1 deletion announcements/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@ type (
)

func newOptions(o ...Option) (*options, error) {
var opts options
opts := options{
announceInterval: 5 * time.Minute, // Example: 5 minutes
timeout: 30, // Example: 30 seconds
topicName: "0", // Example: default topic name
wg: nil,
relays: []string{}, // Example: empty slice by default
}
for _, apply := range o {
if err := apply(&opts); err != nil {
return nil, err
Expand Down
20 changes: 14 additions & 6 deletions blockchain/bl_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,17 @@ func (bl *FxBlockchain) PoolJoin(ctx context.Context, to peer.ID, r PoolJoinRequ
}
bl.stopFetchUsersAfterJoinChan = make(chan struct{})
ticker := time.NewTicker(bl.fetchInterval * time.Minute)
log.Debug("called wg.Add in PoolJoin ticker")
bl.wg.Add(1) // Increment the wait group counter
defer ticker.Stop()
if bl.wg != nil {
log.Debug("called wg.Add in PoolJoin ticker")
bl.wg.Add(1) // Increment the wait group counter
}
go func() {
log.Debug("called wg.Done in PoolJoin ticker")
defer bl.wg.Done() // Decrement the wait group counter when the goroutine completes
if bl.wg != nil {
log.Debug("called wg.Done in PoolJoin ticker")
defer bl.wg.Done() // Decrement the wait group counter when the goroutine completes
}
defer log.Debug("PoolJoin go routine is ending")
defer ticker.Stop() // Ensure the ticker is stopped when the goroutine exits

for {
Expand All @@ -134,8 +140,10 @@ func (bl *FxBlockchain) PoolJoin(ctx context.Context, to peer.ID, r PoolJoinRequ
}
}()
if bl.a != nil {
log.Debug("called wg.Add in PoolJoin ticker2")
bl.wg.Add(1)
if bl.wg != nil {
log.Debug("called wg.Add in PoolJoin ticker2")
bl.wg.Add(1)
}
go bl.a.AnnounceJoinPoolRequestPeriodically(ctx)
}
return b, nil
Expand Down
33 changes: 23 additions & 10 deletions blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,18 @@ func NewFxBlockchain(h host.Host, p *ping.FxPing, a *announcements.FxAnnouncemen
func (bl *FxBlockchain) startFetchCheck() {
bl.fetchCheckTicker = time.NewTicker(1 * time.Hour) // check every hour, adjust as needed

// Increment the WaitGroup counter before starting the goroutine
log.Debug("called wg.Add in blockchain startFetchCheck")
bl.wg.Add(1)
if bl.wg != nil {
// Increment the WaitGroup counter before starting the goroutine
log.Debug("called wg.Add in blockchain startFetchCheck")
bl.wg.Add(1)
}

go func() {
log.Debug("called wg.Done in startFetchCheck ticker")
defer bl.wg.Done() // Decrement the counter when the goroutine completes
if bl.wg != nil {
log.Debug("called wg.Done in startFetchCheck ticker")
defer bl.wg.Done() // Decrement the counter when the goroutine completes
}
defer log.Debug("startFetchCheck go routine is ending")

for {
select {
Expand All @@ -169,11 +174,16 @@ func (bl *FxBlockchain) Start(ctx context.Context) error {
return err
}
bl.s.Handler = http.HandlerFunc(bl.serve)
log.Debug("called wg.Add in blockchain start")
bl.wg.Add(1)
if bl.wg != nil {
log.Debug("called wg.Add in blockchain start")
bl.wg.Add(1)
}
go func() {
log.Debug("called wg.Done in Start blockchain")
defer bl.wg.Done()
if bl.wg != nil {
log.Debug("called wg.Done in Start blockchain")
defer bl.wg.Done()
}
defer log.Debug("Start blockchain go routine is ending")
bl.s.Serve(listen)
}()
return nil
Expand Down Expand Up @@ -730,7 +740,10 @@ func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicStri
log.Errorw("Error when starting the Ping Server", "PeerID", user.PeerID, "err", err)
} else {
log.Debugw("Found self peerID and ran Ping Server and announcing pooljoinrequest now", "peer", user.PeerID)
bl.wg.Add(1)
if bl.wg != nil {
log.Debug("Called wg.Add in somewhere before AnnounceJoinPoolRequestPeriodically")
bl.wg.Add(1)
}
go bl.a.AnnounceJoinPoolRequestPeriodically(ctx)
}
} else {
Expand Down
35 changes: 24 additions & 11 deletions blockchain/blox_test.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,39 @@
package blockchain

import (
"context"
"math/rand"
"testing"
"time"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/crypto"
)

func TestBloxFreeSpaceSanity(t *testing.T) {
/*bl, err := NewFxBlockchain(nil, NewSimpleKeyStorer())
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
rng := rand.New(rand.NewSource(42))

// Instantiate the first node in the pool
pid1, _, err := crypto.GenerateECDSAKeyPair(rng)
if err != nil {
t.Errorf("creating blockchain instance: %v", err)
panic(err)
}
resp, err := bl.BloxFreeSpace(context.Background(), "")
h1, err := libp2p.New(libp2p.Identity(pid1))
if err != nil {
t.Errorf("calling blockchain bloxFreeSpace api: %v", err)
panic(err)
}
out := &BloxFreeSpaceResponse{}
err = json.Unmarshal(resp, out)
bl, err := NewFxBlockchain(h1, nil, nil,
NewSimpleKeyStorer(""),
WithAuthorizer(h1.ID()),
WithAllowTransientConnection(true),
WithBlockchainEndPoint("127.0.0.1:4000"),
WithTimeout(30),
)
if err != nil {
t.Errorf("unmarshal bloxFreeSpace api response: %v", err)
t.Errorf("creating blockchain instance: %v", err)
}
bl.Start(ctx)

// Sanity check
if out.Avail+out.Used-out.Size > 0.1 {
t.Error("insane result from the bloxFreeSpace api")
}*/
}
18 changes: 17 additions & 1 deletion blockchain/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,24 @@ type (
}
)

func defaultUpdatePoolName(newPoolName string) error {
return nil
}
func newOptions(o ...Option) (*options, error) {
var opts options
opts := options{
authorizer: "", // replace with an appropriate default peer.ID
authorizedPeers: []peer.ID{}, // default to an empty slice
allowTransientConnection: true, // or false, as per your default
blockchainEndPoint: "127.0.0.1:4000", // default endpoint
timeout: 30, // default timeout in seconds
wg: nil, // initialized WaitGroup
minPingSuccessCount: 3, // default minimum success count
maxPingTime: 10, // default maximum ping time in seconds
topicName: "0", // default topic name
relays: []string{}, // default to an empty slice
updatePoolName: defaultUpdatePoolName, // set a default function or leave nil
fetchFrequency: time.Hour * 1, // default frequency, e.g., 1 hour
}
for _, apply := range o {
if err := apply(&opts); err != nil {
return nil, err
Expand Down
Loading