Skip to content

Commit

Permalink
added announec to joinpool
Browse files Browse the repository at this point in the history
  • Loading branch information
ehsan6sha committed Nov 6, 2023
1 parent fa2a6cb commit 6d81b64
Show file tree
Hide file tree
Showing 11 changed files with 73 additions and 45 deletions.
12 changes: 6 additions & 6 deletions announcements/announcements.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

_ "embed"

"github.com/functionland/go-fula/blockchain"
"github.com/functionland/go-fula/common"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/node/bindnode"
Expand Down Expand Up @@ -123,7 +123,7 @@ func (an *FxAnnouncements) AnnounceIExistPeriodically(ctx context.Context) {
return
case t := <-ticker.C:
a := &Announcement{
Version: an.version,
Version: common.Version0,
Type: IExistAnnouncementType,
}
a.SetAddrs(an.h.Addrs()...)
Expand Down Expand Up @@ -159,7 +159,7 @@ func (an *FxAnnouncements) AnnounceJoinPoolRequestPeriodically(ctx context.Conte
return
case t := <-ticker.C:
a := &Announcement{
Version: an.version,
Version: common.Version0,
Type: PoolJoinRequestAnnouncementType,
}
a.SetAddrs(an.h.Addrs()...)
Expand All @@ -185,7 +185,7 @@ func (an *FxAnnouncements) AnnounceJoinPoolRequestPeriodically(ctx context.Conte
}
}

func (an *FxAnnouncements) ValidateAnnouncement(ctx context.Context, id peer.ID, msg *pubsub.Message, status blockchain.MemberStatus, exists bool) bool {
func (an *FxAnnouncements) ValidateAnnouncement(ctx context.Context, id peer.ID, msg *pubsub.Message, status common.MemberStatus, exists bool) bool {
a := &Announcement{}
if err := a.UnmarshalBinary(msg.Data); err != nil {
log.Errorw("failed to unmarshal announcement data", "err", err)
Expand All @@ -199,12 +199,12 @@ func (an *FxAnnouncements) ValidateAnnouncement(ctx context.Context, id peer.ID,
log.Errorw("peer is not recognized", "peer", id)
return false
}
if status != blockchain.Approved {
if status != common.Approved {
log.Errorw("peer is not an approved member", "peer", id)
return false
}
case PoolJoinRequestAnnouncementType:
if status != blockchain.Unknown {
if status != common.Unknown {
log.Errorw("peer is no longer permitted to send this message type", "peer", id)
return false
}
Expand Down
4 changes: 2 additions & 2 deletions announcements/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"context"
_ "embed"

"github.com/functionland/go-fula/blockchain"
"github.com/functionland/go-fula/common"
"github.com/ipld/go-ipld-prime/codec/dagcbor"
"github.com/ipld/go-ipld-prime/node/bindnode"
"github.com/ipld/go-ipld-prime/schema"
Expand All @@ -18,7 +18,7 @@ type Announcements interface {
HandleAnnouncements(context.Context)
AnnounceIExistPeriodically(context.Context)
AnnounceJoinPoolRequestPeriodically(context.Context)
ValidateAnnouncement(context.Context, peer.ID, *pubsub.Message, blockchain.MemberStatus, bool) bool
ValidateAnnouncement(context.Context, peer.ID, *pubsub.Message, common.MemberStatus, bool) bool
Shutdown(context.Context) error
}

Expand Down
8 changes: 0 additions & 8 deletions announcements/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ type (
announceInterval time.Duration
timeout int
topicName string
version string
wg *sync.WaitGroup
}
)
Expand Down Expand Up @@ -47,13 +46,6 @@ func WithTopicName(n string) Option {
}
}

func WithVersion(v string) Option {
return func(o *options) error {
o.version = v
return nil
}
}

func WithWg(wg *sync.WaitGroup) Option {
return func(o *options) error {
o.wg = wg
Expand Down
2 changes: 2 additions & 0 deletions blockchain/bl_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ func (bl *FxBlockchain) PoolJoin(ctx context.Context, to peer.ID, r PoolJoinRequ
if err != nil {
return b, err
}
bl.wg.Add(1)
go bl.a.AnnounceJoinPoolRequestPeriodically(ctx)
return b, nil
}
}
Expand Down
31 changes: 13 additions & 18 deletions blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"sync"
"time"

"github.com/functionland/go-fula/announcements"
"github.com/functionland/go-fula/common"
"github.com/functionland/go-fula/ping"
wifi "github.com/functionland/go-fula/wap/pkg/wifi"
logging "github.com/ipfs/go-log/v2"
Expand All @@ -40,15 +42,6 @@ var (
log = logging.Logger("fula/blockchain")
)

// MemberStatus represents the approval status of a peer
type MemberStatus int

const (
Unknown MemberStatus = iota // iota provides automatic enumeration. Here, Pending = 0
Pending // Pending = 1
Approved // Approved = 2
)

type Config struct {
StoreDir string `yaml:"storeDir"`
// other fields
Expand All @@ -70,16 +63,17 @@ type (
keyStorer KeyStorer

p *ping.FxPing
a *announcements.FxAnnouncements

members map[peer.ID]MemberStatus
members map[peer.ID]common.MemberStatus
}
authorizationRequest struct {
Subject peer.ID `json:"id"`
Allow bool `json:"allow"`
}
)

func NewFxBlockchain(h host.Host, p *ping.FxPing, keyStorer KeyStorer, o ...Option) (*FxBlockchain, error) {
func NewFxBlockchain(h host.Host, p *ping.FxPing, a *announcements.FxAnnouncements, keyStorer KeyStorer, o ...Option) (*FxBlockchain, error) {
opts, err := newOptions(o...)
if err != nil {
return nil, err
Expand All @@ -88,6 +82,7 @@ func NewFxBlockchain(h host.Host, p *ping.FxPing, keyStorer KeyStorer, o ...Opti
options: opts,
h: h,
p: p,
a: a,
s: &http.Server{},
c: &http.Client{
Transport: &http.Transport{
Expand Down Expand Up @@ -563,21 +558,21 @@ func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicStri
}

// Determine the status based on pool_id and request_pool_id
var status MemberStatus
var status common.MemberStatus
if user.PoolID != nil && *user.PoolID == topic {
status = Approved
status = common.Approved
} else if user.RequestPoolID != nil && *user.RequestPoolID == topic {
status = Pending
status = common.Pending
} else {
// Skip users that do not match the topic criteria
continue
}

existingStatus, exists := bl.members[pid]
if exists {
if existingStatus == Pending && status == Approved {
if existingStatus == common.Pending && status == common.Approved {
// If the user is already pending and now approved, update to ApprovedOrPending
bl.members[pid] = Approved
bl.members[pid] = common.Approved
}
// If the user status is the same as before, there's no need to update
} else {
Expand All @@ -589,11 +584,11 @@ func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicStri
return nil
}

func (bl *FxBlockchain) GetMemberStatus(id peer.ID) (MemberStatus, bool) {
func (bl *FxBlockchain) GetMemberStatus(id peer.ID) (common.MemberStatus, bool) {
status, exists := bl.members[id]
if !exists {
// If the peer.ID doesn't exist in the members map, we treat it as an error case.
return MemberStatus(0), false
return common.MemberStatus(0), false
}
return status, true
}
10 changes: 10 additions & 0 deletions blockchain/options.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package blockchain

import (
"sync"

"github.com/libp2p/go-libp2p/core/peer"
)

Expand All @@ -12,6 +14,7 @@ type (
allowTransientConnection bool
blockchainEndPoint string
timeout int
wg *sync.WaitGroup
}
)

Expand Down Expand Up @@ -59,3 +62,10 @@ func WithTimeout(to int) Option {
return nil
}
}

func WithWg(wg *sync.WaitGroup) Option {
return func(o *options) error {
o.wg = wg
return nil
}
}
9 changes: 4 additions & 5 deletions blox/blox.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import (

var log = logging.Logger("fula/blox")

const Version0 = "0"

type (
Blox struct {
ctx context.Context
Expand Down Expand Up @@ -61,6 +59,7 @@ func New(o ...Option) (*Blox, error) {
}
p.pn, err = ping.NewFxPing(p.h,
ping.WithAllowTransientConnection(true),
ping.WithWg(&p.wg),
ping.WithTimeout(3))
if err != nil {
return nil, err
Expand All @@ -70,18 +69,18 @@ func New(o ...Option) (*Blox, error) {
announcements.WithAnnounceInterval(5),
announcements.WithTimeout(3),
announcements.WithTopicName(p.topicName),
announcements.WithVersion(Version0),
announcements.WithWg(&p.wg))
if err != nil {
return nil, err
}

p.bl, err = blockchain.NewFxBlockchain(p.h, p.pn,
p.bl, err = blockchain.NewFxBlockchain(p.h, p.pn, p.an,
blockchain.NewSimpleKeyStorer(""),
blockchain.WithAuthorizer(authorizer),
blockchain.WithAuthorizedPeers(authorizedPeers),
blockchain.WithBlockchainEndPoint("127.0.0.1:4000"),
blockchain.WithTimeout(30))
blockchain.WithTimeout(30),
blockchain.WithWg(&p.wg))
if err != nil {
return nil, err
}
Expand Down
7 changes: 4 additions & 3 deletions blox/ipfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"net/http"

"github.com/functionland/go-fula/common"
wifi "github.com/functionland/go-fula/wap/pkg/wifi"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
Expand Down Expand Up @@ -186,9 +187,9 @@ func (p *Blox) ServeIpfsRpc() http.Handler {
PublicKey string `json:"PublicKey"`
}{
Addresses: addressStrings,
AgentVersion: Version0,
AgentVersion: common.Version0,
ID: p.h.ID().String(),
ProtocolVersion: "fx_exchange/" + Version0,
ProtocolVersion: "fx_exchange/" + common.Version0,
Protocols: []string{"fx_exchange"},
PublicKey: pubKeyBase64,
}
Expand Down Expand Up @@ -250,7 +251,7 @@ func (p *Blox) ServeIpfsRpc() http.Handler {
RepoSize: uint64(repoSize),
StorageMax: uint64(storage.Size),
},
Version: "fx-repo@" + Version0,
Version: "fx-repo@" + common.Version0,
}
if err := json.NewEncoder(w).Encode(resp); err != nil {
log.Errorw("failed to encode response to stats repo", "err", err)
Expand Down
12 changes: 12 additions & 0 deletions common/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package common

const Version0 = "0"

// MemberStatus represents the approval status of a peer
type MemberStatus int

const (
Unknown MemberStatus = iota // iota provides automatic enumeration. Here, Pending = 0
Pending // Pending = 1
Approved // Approved = 2
)
10 changes: 10 additions & 0 deletions ping/options.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package ping

import "sync"

type (
Option func(*options) error
options struct {
allowTransientConnection bool
timeout int
wg *sync.WaitGroup
}
)

Expand All @@ -31,3 +34,10 @@ func WithTimeout(to int) Option {
return nil
}
}

func WithWg(wg *sync.WaitGroup) Option {
return func(o *options) error {
o.wg = wg
return nil
}
}
13 changes: 10 additions & 3 deletions ping/ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,17 +91,24 @@ func NewFxPing(h host.Host, o ...Option) (*FxPing, error) {

func (pn *FxPing) Start(ctx context.Context) error {
pn.mu.Lock()
defer pn.mu.Unlock()
if pn.started {
pn.mu.Unlock()
return errors.New("ping already started")
}
pn.started = true
pn.mu.Unlock()

listen, err := gostream.Listen(pn.h, FxPingProtocolID)
if err != nil {
return err
}
pn.s.Handler = http.HandlerFunc(pn.serve)
go func() { pn.s.Serve(listen) }()
pn.started = true
pn.wg.Add(1)
go func() {
defer pn.wg.Done()
pn.s.Serve(listen)
}()

return nil
}

Expand Down

0 comments on commit 6d81b64

Please sign in to comment.