Skip to content

Commit

Permalink
Auto-run FetchUsersAndPopulateSets if not run for the past x hours fr…
Browse files Browse the repository at this point in the history
…om pubsub

This is the failsafe to ensure that we at least run this method every x hours even if for any reason pubsubs are not received
  • Loading branch information
ehsan6sha committed Nov 7, 2023
1 parent 98a7cb8 commit 049577d
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 3 deletions.
41 changes: 40 additions & 1 deletion blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ type (

members map[peer.ID]common.MemberStatus
membersLock sync.RWMutex

lastFetchTime time.Time
fetchInterval time.Duration
fetchCheckTicker *time.Ticker
fetchCheckStop chan struct{}
}
authorizationRequest struct {
Subject peer.ID `json:"id"`
Expand Down Expand Up @@ -113,16 +118,44 @@ func NewFxBlockchain(h host.Host, p *ping.FxPing, a *announcements.FxAnnouncemen
return new(http.Request)
},
},
keyStorer: keyStorer,
keyStorer: keyStorer,
lastFetchTime: time.Now(),
fetchInterval: opts.fetchFrequency * time.Hour,
fetchCheckStop: make(chan struct{}),
}
if bl.authorizer != "" {
if err := bl.SetAuth(context.Background(), h.ID(), bl.authorizer, true); err != nil {
return nil, err
}
}
bl.startFetchCheck()
return bl, nil
}

func (bl *FxBlockchain) startFetchCheck() {
bl.fetchCheckTicker = time.NewTicker(1 * time.Hour) // check every hour, adjust as needed

// Increment the WaitGroup counter before starting the goroutine
bl.wg.Add(1)

go func() {
defer bl.wg.Done() // Decrement the counter when the goroutine completes

for {
select {
case <-bl.fetchCheckTicker.C:
if time.Since(bl.lastFetchTime) >= bl.fetchInterval {
bl.FetchUsersAndPopulateSets(context.Background(), "someTopicString")
bl.lastFetchTime = time.Now() // update last fetch time
}
case <-bl.fetchCheckStop:
bl.fetchCheckTicker.Stop()
return
}
}
}()
}

func (bl *FxBlockchain) Start(ctx context.Context) error {
listen, err := gostream.Listen(bl.h, FxBlockchainProtocolID)
if err != nil {
Expand Down Expand Up @@ -523,10 +556,15 @@ func (bl *FxBlockchain) authorized(pid peer.ID, action string) bool {
func (bl *FxBlockchain) Shutdown(ctx context.Context) error {
bl.c.CloseIdleConnections()
bl.ch.CloseIdleConnections()
close(bl.fetchCheckStop)
bl.wg.Wait()
return bl.s.Shutdown(ctx)
}

func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicString string) error {
// Update last fetch time on successful fetch
bl.lastFetchTime = time.Now()

// Convert topic from string to int
topic, err := strconv.Atoi(topicString)
if err != nil {
Expand Down Expand Up @@ -559,6 +597,7 @@ func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicStri
return err
}

//Check if self status is in pool request, start ping server and announce join request
if user.PeerID == bl.h.ID().String() {
log.Debugw("Found self peerID", user.PeerID)
if user.RequestPoolID != nil {
Expand Down
17 changes: 17 additions & 0 deletions blockchain/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package blockchain

import (
"sync"
"time"

"github.com/libp2p/go-libp2p/core/peer"
)
Expand All @@ -17,6 +18,8 @@ type (
wg *sync.WaitGroup
minPingSuccessRate int
maxPingTime int
topicName string
fetchFrequency time.Duration //Hours that it should update the list of pool users and pool requests if not called through pubsub
}
)

Expand Down Expand Up @@ -85,3 +88,17 @@ func WithMaxPingRate(t int) Option {
return nil
}
}

func WithFetchFrequency(t time.Duration) Option {
return func(o *options) error {
o.fetchFrequency = t
return nil
}
}

func WithTopicName(n string) Option {
return func(o *options) error {
o.topicName = n
return nil
}
}
4 changes: 3 additions & 1 deletion blox/blox.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ func New(o ...Option) (*Blox, error) {
blockchain.WithAuthorizedPeers(authorizedPeers),
blockchain.WithBlockchainEndPoint("127.0.0.1:4000"),
blockchain.WithTimeout(30),
blockchain.WithWg(&p.wg))
blockchain.WithWg(&p.wg),
blockchain.WithFetchFrequency(3),
blockchain.WithTopicName(p.topicName))
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/blox/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func init() {
Name: "poolName",
Destination: &app.config.PoolName,
EnvVars: []string{"FULA_BLOX_POOL_NAME"},
Value: "my-pool",
Value: "",
}),
altsrc.NewStringFlag(&cli.StringFlag{
Name: "logLevel",
Expand Down

0 comments on commit 049577d

Please sign in to comment.