Skip to content

Commit

Permalink
Create MultiNodeAdaptor
Browse files Browse the repository at this point in the history
  • Loading branch information
DylanTinianov committed Dec 11, 2024
1 parent 825bb5f commit 262890e
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 136 deletions.
9 changes: 2 additions & 7 deletions pkg/solana/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (

"github.com/smartcontractkit/chainlink-solana/pkg/solana/client"
mn "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode"
mnCfg "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode/config"
solcfg "github.com/smartcontractkit/chainlink-solana/pkg/solana/config"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/fees"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/txm/mocks"
Expand Down Expand Up @@ -336,12 +335,8 @@ func TestSolanaChain_MultiNode_GetClient(t *testing.T) {

ch := solcfg.Chain{}
ch.SetDefaults()
mnCfg := mnCfg.MultiNodeConfig{
MultiNode: mnCfg.MultiNode{
Enabled: ptr(true),
},
}
mnCfg.SetDefaults()
mnCfg := solcfg.NewDefaultMultiNodeConfig()
mnCfg.MultiNode.Enabled = ptr(true)

cfg := &solcfg.TOMLConfig{
ChainID: ptr("devnet"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import (
mnCfg "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode/config"
)

type MultiNodeClient[RPC any, HEAD Head] struct {
// MultiNodeAdapter is used to integrate multinode into chain-specific clients
type MultiNodeAdapter[RPC any, HEAD Head] struct {
cfg *mnCfg.MultiNodeConfig
log logger.Logger
rpc *RPC
Expand All @@ -26,23 +27,23 @@ type MultiNodeClient[RPC any, HEAD Head] struct {
latestFinalizedBlock func(ctx context.Context, rpc *RPC) (HEAD, error)

// chStopInFlight can be closed to immediately cancel all in-flight requests on
// this RpcClient. Closing and replacing should be serialized through
// stateMu since it can happen on state transitions as well as RpcClient Close.
// this RpcMultiNodeAdapter. Closing and replacing should be serialized through
// stateMu since it can happen on state transitions as well as RpcMultiNodeAdapter Close.
chStopInFlight chan struct{}

chainInfoLock sync.RWMutex
// intercepted values seen by callers of the rpcClient excluding health check calls. Need to ensure MultiNode provides repeatable read guarantee
// intercepted values seen by callers of the rpcMultiNodeAdapter excluding health check calls. Need to ensure MultiNode provides repeatable read guarantee
highestUserObservations ChainInfo
// most recent chain info observed during current lifecycle (reseted on DisconnectAll)
latestChainInfo ChainInfo
}

func NewMultiNodeClient[RPC any, HEAD Head](
func NewMultiNodeAdapter[RPC any, HEAD Head](
cfg *mnCfg.MultiNodeConfig, rpc *RPC, ctxTimeout time.Duration, log logger.Logger,
latestBlock func(ctx context.Context, rpc *RPC) (HEAD, error),
latestFinalizedBlock func(ctx context.Context, rpc *RPC) (HEAD, error),
) (*MultiNodeClient[RPC, HEAD], error) {
return &MultiNodeClient[RPC, HEAD]{
) (*MultiNodeAdapter[RPC, HEAD], error) {
return &MultiNodeAdapter[RPC, HEAD]{
cfg: cfg,
rpc: rpc,
log: log,
Expand All @@ -54,17 +55,17 @@ func NewMultiNodeClient[RPC any, HEAD Head](
}, nil
}

func (m *MultiNodeClient[RPC, HEAD]) LenSubs() int {
func (m *MultiNodeAdapter[RPC, HEAD]) LenSubs() int {
m.subsSliceMu.RLock()
defer m.subsSliceMu.RUnlock()
return len(m.subs)
}

// RegisterSub adds the sub to the rpcClient list
func (m *MultiNodeClient[RPC, HEAD]) RegisterSub(sub Subscription, stopInFLightCh chan struct{}) error {
// registerSub adds the sub to the rpcMultiNodeAdapter list
func (m *MultiNodeAdapter[RPC, HEAD]) registerSub(sub Subscription, stopInFLightCh chan struct{}) error {
m.subsSliceMu.Lock()
defer m.subsSliceMu.Unlock()
// ensure that the `sub` belongs to current life cycle of the `rpcClient` and it should not be killed due to
// ensure that the `sub` belongs to current life cycle of the `rpcMultiNodeAdapter` and it should not be killed due to
// previous `DisconnectAll` call.
select {
case <-stopInFLightCh:
Expand All @@ -77,7 +78,7 @@ func (m *MultiNodeClient[RPC, HEAD]) RegisterSub(sub Subscription, stopInFLightC
return nil
}

func (m *MultiNodeClient[RPC, HEAD]) LatestBlock(ctx context.Context) (HEAD, error) {
func (m *MultiNodeAdapter[RPC, HEAD]) LatestBlock(ctx context.Context) (HEAD, error) {
// capture chStopInFlight to ensure we are not updating chainInfo with observations related to previous life cycle
ctx, cancel, chStopInFlight, rpc := m.AcquireQueryCtx(ctx, m.ctxTimeout)
defer cancel()
Expand All @@ -91,11 +92,11 @@ func (m *MultiNodeClient[RPC, HEAD]) LatestBlock(ctx context.Context) (HEAD, err
return head, errors.New("invalid head")
}

m.OnNewHead(ctx, chStopInFlight, head)
m.onNewHead(ctx, chStopInFlight, head)
return head, nil
}

func (m *MultiNodeClient[RPC, HEAD]) LatestFinalizedBlock(ctx context.Context) (HEAD, error) {
func (m *MultiNodeAdapter[RPC, HEAD]) LatestFinalizedBlock(ctx context.Context) (HEAD, error) {
ctx, cancel, chStopInFlight, rpc := m.AcquireQueryCtx(ctx, m.ctxTimeout)
defer cancel()

Expand All @@ -112,7 +113,7 @@ func (m *MultiNodeClient[RPC, HEAD]) LatestFinalizedBlock(ctx context.Context) (
return head, nil
}

func (m *MultiNodeClient[RPC, HEAD]) SubscribeToHeads(ctx context.Context) (<-chan HEAD, Subscription, error) {
func (m *MultiNodeAdapter[RPC, HEAD]) SubscribeToHeads(ctx context.Context) (<-chan HEAD, Subscription, error) {
ctx, cancel, chStopInFlight, _ := m.AcquireQueryCtx(ctx, m.ctxTimeout)
defer cancel()

Expand All @@ -133,7 +134,7 @@ func (m *MultiNodeClient[RPC, HEAD]) SubscribeToHeads(ctx context.Context) (<-ch
return nil, nil, err
}

err := m.RegisterSub(&poller, chStopInFlight)
err := m.registerSub(&poller, chStopInFlight)
if err != nil {
poller.Unsubscribe()
return nil, nil, err
Expand All @@ -142,7 +143,7 @@ func (m *MultiNodeClient[RPC, HEAD]) SubscribeToHeads(ctx context.Context) (<-ch
return channel, &poller, nil
}

func (m *MultiNodeClient[RPC, HEAD]) SubscribeToFinalizedHeads(ctx context.Context) (<-chan HEAD, Subscription, error) {
func (m *MultiNodeAdapter[RPC, HEAD]) SubscribeToFinalizedHeads(ctx context.Context) (<-chan HEAD, Subscription, error) {
ctx, cancel, chStopInFlight, _ := m.AcquireQueryCtx(ctx, m.ctxTimeout)
defer cancel()

Expand All @@ -161,7 +162,7 @@ func (m *MultiNodeClient[RPC, HEAD]) SubscribeToFinalizedHeads(ctx context.Conte
return nil, nil, err
}

err := m.RegisterSub(&poller, chStopInFlight)
err := m.registerSub(&poller, chStopInFlight)
if err != nil {
poller.Unsubscribe()
return nil, nil, err
Expand All @@ -170,7 +171,7 @@ func (m *MultiNodeClient[RPC, HEAD]) SubscribeToFinalizedHeads(ctx context.Conte
return channel, &poller, nil
}

func (m *MultiNodeClient[RPC, HEAD]) OnNewHead(ctx context.Context, requestCh <-chan struct{}, head HEAD) {
func (m *MultiNodeAdapter[RPC, HEAD]) onNewHead(ctx context.Context, requestCh <-chan struct{}, head HEAD) {
if !head.IsValid() {
return
}
Expand All @@ -181,14 +182,14 @@ func (m *MultiNodeClient[RPC, HEAD]) OnNewHead(ctx context.Context, requestCh <-
m.highestUserObservations.BlockNumber = max(m.highestUserObservations.BlockNumber, head.BlockNumber())
}
select {
case <-requestCh: // no need to update latestChainInfo, as rpcClient already started new life cycle
case <-requestCh: // no need to update latestChainInfo, as rpcMultiNodeAdapter already started new life cycle
return
default:
m.latestChainInfo.BlockNumber = head.BlockNumber()
}
}

func (m *MultiNodeClient[RPC, HEAD]) OnNewFinalizedHead(ctx context.Context, requestCh <-chan struct{}, head HEAD) {
func (m *MultiNodeAdapter[RPC, HEAD]) OnNewFinalizedHead(ctx context.Context, requestCh <-chan struct{}, head HEAD) {
if !head.IsValid() {
return
}
Expand All @@ -199,7 +200,7 @@ func (m *MultiNodeClient[RPC, HEAD]) OnNewFinalizedHead(ctx context.Context, req
m.highestUserObservations.FinalizedBlockNumber = max(m.highestUserObservations.FinalizedBlockNumber, head.BlockNumber())
}
select {
case <-requestCh: // no need to update latestChainInfo, as rpcClient already started new life cycle
case <-requestCh: // no need to update latestChainInfo, as rpcMultiNodeAdapter already started new life cycle
return
default:
m.latestChainInfo.FinalizedBlockNumber = head.BlockNumber()
Expand All @@ -221,7 +222,7 @@ func MakeQueryCtx(ctx context.Context, ch services.StopChan, timeout time.Durati
return ctx, cancel
}

func (m *MultiNodeClient[RPC, HEAD]) AcquireQueryCtx(parentCtx context.Context, timeout time.Duration) (ctx context.Context, cancel context.CancelFunc,
func (m *MultiNodeAdapter[RPC, HEAD]) AcquireQueryCtx(parentCtx context.Context, timeout time.Duration) (ctx context.Context, cancel context.CancelFunc,
chStopInFlight chan struct{}, raw *RPC) {
// Need to wrap in mutex because state transition can cancel and replace context
m.stateMu.RLock()
Expand All @@ -233,7 +234,7 @@ func (m *MultiNodeClient[RPC, HEAD]) AcquireQueryCtx(parentCtx context.Context,
return
}

func (m *MultiNodeClient[RPC, HEAD]) UnsubscribeAllExcept(subs ...Subscription) {
func (m *MultiNodeAdapter[RPC, HEAD]) UnsubscribeAllExcept(subs ...Subscription) {
m.subsSliceMu.Lock()
defer m.subsSliceMu.Unlock()

Expand All @@ -250,23 +251,23 @@ func (m *MultiNodeClient[RPC, HEAD]) UnsubscribeAllExcept(subs ...Subscription)
}
}

// CancelInflightRequests closes and replaces the chStopInFlight
func (m *MultiNodeClient[RPC, HEAD]) CancelInflightRequests() {
// cancelInflightRequests closes and replaces the chStopInFlight
func (m *MultiNodeAdapter[RPC, HEAD]) cancelInflightRequests() {
m.stateMu.Lock()
defer m.stateMu.Unlock()
close(m.chStopInFlight)
m.chStopInFlight = make(chan struct{})
}

func (m *MultiNodeClient[RPC, HEAD]) Close() {
m.CancelInflightRequests()
func (m *MultiNodeAdapter[RPC, HEAD]) Close() {
m.cancelInflightRequests()
m.UnsubscribeAllExcept()
m.chainInfoLock.Lock()
m.latestChainInfo = ChainInfo{}
m.chainInfoLock.Unlock()
}

func (m *MultiNodeClient[RPC, HEAD]) GetInterceptedChainInfo() (latest, highestUserObservations ChainInfo) {
func (m *MultiNodeAdapter[RPC, HEAD]) GetInterceptedChainInfo() (latest, highestUserObservations ChainInfo) {
m.chainInfoLock.Lock()
defer m.chainInfoLock.Unlock()
return m.latestChainInfo, m.highestUserObservations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,62 +8,81 @@ import (

"github.com/stretchr/testify/require"

common "github.com/smartcontractkit/chainlink-common/pkg/config"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"

"github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode/config"
)

type TestRPC struct {
type testRPC struct {
latestBlock int64
}

type TestHead struct {
type testHead struct {
blockNumber int64
}

func (t *TestHead) BlockNumber() int64 { return t.blockNumber }
func (t *TestHead) BlockDifficulty() *big.Int { return nil }
func (t *TestHead) IsValid() bool { return true }
func (t *testHead) BlockNumber() int64 { return t.blockNumber }
func (t *testHead) BlockDifficulty() *big.Int { return nil }
func (t *testHead) IsValid() bool { return true }

func LatestBlock(ctx context.Context, rpc *TestRPC) (*TestHead, error) {
func LatestBlock(ctx context.Context, rpc *testRPC) (*testHead, error) {
rpc.latestBlock++
return &TestHead{rpc.latestBlock}, nil
return &testHead{rpc.latestBlock}, nil
}

func initializeMultiNodeClient(t *testing.T) *MultiNodeClient[TestRPC, *TestHead] {
func ptr[T any](t T) *T {
return &t
}

func newTestClient(t *testing.T) *MultiNodeAdapter[testRPC, *testHead] {
requestTimeout := 5 * time.Second
lggr := logger.Test(t)
cfg := &config.MultiNodeConfig{}
cfg.SetDefaults()
enabled := true
cfg.MultiNode.Enabled = &enabled

c, err := NewMultiNodeClient[TestRPC, *TestHead](cfg, &TestRPC{}, requestTimeout, lggr, LatestBlock, LatestBlock)
cfg := &config.MultiNodeConfig{
MultiNode: config.MultiNode{
Enabled: ptr(true),
PollFailureThreshold: ptr(uint32(5)),
PollInterval: common.MustNewDuration(15 * time.Second),
SelectionMode: ptr(NodeSelectionModePriorityLevel),
SyncThreshold: ptr(uint32(10)),
LeaseDuration: common.MustNewDuration(time.Minute),
NodeIsSyncingEnabled: ptr(false),
FinalizedBlockPollInterval: common.MustNewDuration(5 * time.Second),
EnforceRepeatableRead: ptr(true),
DeathDeclarationDelay: common.MustNewDuration(20 * time.Second),
NodeNoNewHeadsThreshold: common.MustNewDuration(20 * time.Second),
NoNewFinalizedHeadsThreshold: common.MustNewDuration(20 * time.Second),
FinalityTagEnabled: ptr(true),
FinalityDepth: ptr(uint32(0)),
FinalizedBlockOffset: ptr(uint32(50)),
},
}
c, err := NewMultiNodeAdapter[testRPC, *testHead](cfg, &testRPC{}, requestTimeout, lggr, LatestBlock, LatestBlock)
require.NoError(t, err)
t.Cleanup(c.Close)
return c
}

func TestMultiNodeClient_LatestBlock(t *testing.T) {
c := initializeMultiNodeClient(t)

t.Run("LatestBlock", func(t *testing.T) {
c := newTestClient(t)
head, err := c.LatestBlock(tests.Context(t))
require.NoError(t, err)
require.Equal(t, true, head.IsValid())
})

t.Run("LatestFinalizedBlock", func(t *testing.T) {
c := newTestClient(t)
finalizedHead, err := c.LatestFinalizedBlock(tests.Context(t))
require.NoError(t, err)
require.Equal(t, true, finalizedHead.IsValid())
})
}

func TestMultiNodeClient_HeadSubscriptions(t *testing.T) {
c := initializeMultiNodeClient(t)

t.Run("SubscribeToHeads", func(t *testing.T) {
c := newTestClient(t)
ch, sub, err := c.SubscribeToHeads(tests.Context(t))
require.NoError(t, err)
defer sub.Unsubscribe()
Expand All @@ -80,6 +99,7 @@ func TestMultiNodeClient_HeadSubscriptions(t *testing.T) {
})

t.Run("SubscribeToFinalizedHeads", func(t *testing.T) {
c := newTestClient(t)
finalizedCh, finalizedSub, err := c.SubscribeToFinalizedHeads(tests.Context(t))
require.NoError(t, err)
defer finalizedSub.Unsubscribe()
Expand Down Expand Up @@ -112,38 +132,40 @@ func (s *mockSub) Err() <-chan error {
}

func TestMultiNodeClient_RegisterSubs(t *testing.T) {
c := initializeMultiNodeClient(t)

t.Run("registerSub", func(t *testing.T) {
c := newTestClient(t)
sub := newMockSub()
err := c.RegisterSub(sub, make(chan struct{}))
err := c.registerSub(sub, make(chan struct{}))
require.NoError(t, err)
require.Equal(t, 1, c.LenSubs())
c.UnsubscribeAllExcept()
})

t.Run("chStopInFlight returns error and unsubscribes", func(t *testing.T) {
c := newTestClient(t)
chStopInFlight := make(chan struct{})
close(chStopInFlight)
sub := newMockSub()
err := c.RegisterSub(sub, chStopInFlight)
err := c.registerSub(sub, chStopInFlight)
require.Error(t, err)
require.Equal(t, true, sub.unsubscribed)
})

t.Run("UnsubscribeAllExcept", func(t *testing.T) {
c := newTestClient(t)
chStopInFlight := make(chan struct{})
sub1 := newMockSub()
sub2 := newMockSub()
err := c.RegisterSub(sub1, chStopInFlight)
err := c.registerSub(sub1, chStopInFlight)
require.NoError(t, err)
err = c.RegisterSub(sub2, chStopInFlight)
err = c.registerSub(sub2, chStopInFlight)
require.NoError(t, err)
require.Equal(t, 2, c.LenSubs())

c.UnsubscribeAllExcept(sub1)
require.Equal(t, 1, c.LenSubs())
require.Equal(t, true, sub2.unsubscribed)
require.Equal(t, false, sub1.unsubscribed)

c.UnsubscribeAllExcept()
require.Equal(t, 0, c.LenSubs())
Expand Down
Loading

0 comments on commit 262890e

Please sign in to comment.