Skip to content

Commit

Permalink
add event subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
nkitlabs committed Jun 6, 2024
1 parent 4cb031e commit f60c0c8
Show file tree
Hide file tree
Showing 13 changed files with 2,334 additions and 16 deletions.
2 changes: 2 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,6 @@ type Client interface {
GetBalance(account sdk.AccAddress) (uint64, error)
SendRequest(msg sdk.Msg, gasPrice float64, key keyring.Record) (*sdk.TxResponse, error)
GetRequestProofByID(reqID uint64) ([]byte, error)
Subscribe(name, query string) (*SubscriptionInfo, error)
Unsubscribe(name string) error
}
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/bandprotocol/go-band-sdk/utils v1.0.1
github.com/cometbft/cometbft v0.37.5
github.com/cosmos/cosmos-sdk v0.47.11
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.9.0
go.uber.org/mock v0.4.0
)
Expand Down Expand Up @@ -137,7 +138,6 @@ require (
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
github.com/petermattis/goid v0.0.0-20230904192822-1876fd5063bc // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.17.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
Expand Down
29 changes: 29 additions & 0 deletions client/mock/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

143 changes: 129 additions & 14 deletions client/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"strconv"
"sync"
"time"

bandtsstypes "github.com/bandprotocol/chain/v2/x/bandtss/types"
Expand All @@ -25,12 +26,12 @@ var _ Client = &RPC{}

// RPC implements Clients by using multiple RPC nodes
type RPC struct {
ctx client.Context
txFactory tx.Factory
nodes []*rpchttp.HTTP
keyring keyring.Keyring

logger logging.Logger
ctx client.Context
txFactory tx.Factory
nodes []*rpchttp.HTTP
keyring keyring.Keyring
subscriptionInfos *sync.Map
logger logging.Logger
}

// NewRPC creates new RPC client
Expand All @@ -52,11 +53,12 @@ func NewRPC(
}

return &RPC{
ctx: NewClientCtx(chainID),
txFactory: createTxFactory(chainID, gasPrice, keyring),
nodes: nodes,
keyring: keyring,
logger: logger,
ctx: NewClientCtx(chainID),
txFactory: createTxFactory(chainID, gasPrice, keyring),
nodes: nodes,
keyring: keyring,
subscriptionInfos: &sync.Map{},
logger: logger,
}, nil
}

Expand Down Expand Up @@ -300,14 +302,24 @@ func (c RPC) GetSignature(signingID uint64) (*SigningResult, error) {
}(node)
}

// check if the signature is ready to be used. If every node returns waiting status, return one.
// If every node fails to query a result return an error.
var res *SigningResult
for range c.nodes {
select {
case res := <-resultCh:
return res, nil
case res = <-resultCh:
if res.IsReady() {
return res, nil
}
case <-failCh:
}
}
return nil, fmt.Errorf("failed to get result from all endpoints")

if res == nil {
return nil, fmt.Errorf("failed to get result from all endpoints")
}

return res, nil
}

func (c RPC) GetBalance(_ sdk.AccAddress) (uint64, error) {
Expand Down Expand Up @@ -496,3 +508,106 @@ func (c RPC) blockSearch(query string, page *int, perPage *int, orderBy string)

return nil, fmt.Errorf("failed to block search from all endpoints")
}

// Subscribe subscribes to the given event name and query from multiple clients. If it cannot
// subscribe to a node within the given timeout, it will drop that node from the result.
// If no nodes can be subscribed, it will return an error.
func (c RPC) Subscribe(name, query string) (*SubscriptionInfo, error) {
chInfos := make([]ChannelInfo, 0, len(c.nodes))
queue := make(chan ChannelInfo, 1)
var wg sync.WaitGroup

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// subscribe to all nodes within a given timeout
wg.Add(len(c.nodes))
for _, node := range c.nodes {
go func(node *rpchttp.HTTP) {
if !node.IsRunning() {
c.logger.Debug("Subscribe", "start the node %s", node.Remote())
if err := node.Start(); err != nil {
c.logger.Warning("Subscribe", "Failed to start %s with error %s", node.Remote(), err)
wg.Done()
return
}
}

eventCh, err := node.Subscribe(ctx, name, query, 1000)
if err != nil {
c.logger.Warning("Subscribe", "Failed to subscribe to %s with error %s", node.Remote(), err)
wg.Done()
return
}

queue <- ChannelInfo{
RemoteAddr: node.Remote(),
EventCh: eventCh,
}
}(node)
}

// add result into the list.
go func() {
for info := range queue {
chInfos = append(chInfos, info)
wg.Done()
}
}()

wg.Wait()
if len(chInfos) == 0 {
return nil, fmt.Errorf("failed to subscribe to all endpoints")
}

subInfo := SubscriptionInfo{
Name: name,
Query: query,
ChannelInfos: chInfos,
}

c.subscriptionInfos.Store(name, subInfo)
return &subInfo, nil
}

// Unsubscribe unsubscribes from the given event name from multiple clients.
func (c RPC) Unsubscribe(name string) error {
v, found := c.subscriptionInfos.Load(name)
if !found {
return fmt.Errorf("event is not subscribed; name %s", name)
}

info, ok := v.(SubscriptionInfo)
if !ok {
return fmt.Errorf("failed to cast to SubscriptionInfo; name %s", name)
}

remoteAddrs := make(map[string]struct{})
for _, chInfo := range info.ChannelInfos {
remoteAddrs[chInfo.RemoteAddr] = struct{}{}
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var wg sync.WaitGroup
wg.Add(len(info.ChannelInfos))
for _, node := range c.nodes {
if _, found := remoteAddrs[node.Remote()]; !found {
continue
}

go func(node *rpchttp.HTTP, name, query string) {
defer wg.Done()
// if failed to unsubscribe, it means that the node object is not running or timeout.
// Log and continue the process.
if err := node.Unsubscribe(ctx, name, query); err != nil {
c.logger.Warning("Unsubscribe", "Failed to unsubscribe from %s with error %s", node.Remote(), err)
}
}(node, name, info.Query)
}

wg.Wait()
c.subscriptionInfos.Delete(name)
return nil
}
20 changes: 20 additions & 0 deletions client/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
bandtsstypes "github.com/bandprotocol/chain/v2/x/bandtss/types"
oracletypes "github.com/bandprotocol/chain/v2/x/oracle/types"
tsstypes "github.com/bandprotocol/chain/v2/x/tss/types"
ctypes "github.com/cometbft/cometbft/rpc/core/types"
)

// OracleResult stores the necessary information for an oracle query result.
Expand All @@ -18,6 +19,12 @@ type SigningResult struct {
ReplacingGroup SigningInfo
}

func (s SigningResult) IsReady() bool {
return s.CurrentGroup.Status == tsstypes.SIGNING_STATUS_SUCCESS &&
(s.ReplacingGroup.Status == tsstypes.SIGNING_STATUS_UNSPECIFIED ||
s.ReplacingGroup.Status == tsstypes.SIGNING_STATUS_SUCCESS)
}

// SigningInfo contains signing information.
type SigningInfo struct {
Message []byte
Expand All @@ -26,3 +33,16 @@ type SigningInfo struct {
PubKey []byte
PubNonce []byte
}

// ChannelInfo stores the necessary information for a channel receiving an event from a chain.
type ChannelInfo struct {
RemoteAddr string
EventCh <-chan ctypes.ResultEvent
}

// SubscriptionInfo stores the necessary information for a subscription channel for a specific events.
type SubscriptionInfo struct {
Name string
Query string
ChannelInfos []ChannelInfo
}
5 changes: 4 additions & 1 deletion examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ require (
github.com/bandprotocol/chain/v2 v2.5.5-0.20240503145406-b6ed5a969335
github.com/bandprotocol/go-band-sdk/client v1.0.1
github.com/bandprotocol/go-band-sdk/requester v1.0.1
github.com/bandprotocol/go-band-sdk/subscriber v1.0.1
github.com/bandprotocol/go-band-sdk/utils v1.0.1
github.com/cometbft/cometbft v0.37.5
github.com/cosmos/cosmos-sdk v0.47.11
github.com/spf13/viper v1.18.2
)
Expand Down Expand Up @@ -48,7 +50,6 @@ require (
github.com/cockroachdb/redact v1.1.5 // indirect
github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect
github.com/coinbase/rosetta-sdk-go/types v1.0.0 // indirect
github.com/cometbft/cometbft v0.37.5 // indirect
github.com/cometbft/cometbft-db v0.10.0 // indirect
github.com/confio/ics23/go v0.9.0 // indirect
github.com/cosmos/btcutil v1.0.5 // indirect
Expand Down Expand Up @@ -137,6 +138,7 @@ require (
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mtibben/percent v0.2.1 // indirect
github.com/oasisprotocol/oasis-core/go v0.2202.7 // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
github.com/petermattis/goid v0.0.0-20231207134359-e60b3f734c67 // indirect
Expand Down Expand Up @@ -206,6 +208,7 @@ replace (

github.com/bandprotocol/go-band-sdk/client => ../client
github.com/bandprotocol/go-band-sdk/requester => ../requester
github.com/bandprotocol/go-band-sdk/subscriber => ../subscriber
github.com/bandprotocol/go-band-sdk/utils => ../utils
github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1
// cosmos-sdk v0.47.11 is incompatible with gogoproto 1.4.10
Expand Down
2 changes: 2 additions & 0 deletions examples/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -849,6 +849,8 @@ github.com/pact-foundation/pact-go v1.0.4/go.mod h1:uExwJY4kCzNPcHRj+hCR/HBbOOIw
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
Expand Down
Loading

0 comments on commit f60c0c8

Please sign in to comment.