From 262890e402d40f1e8c7559f25065a850c37a6262 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Wed, 11 Dec 2024 10:28:50 -0500 Subject: [PATCH] Create MultiNodeAdaptor --- pkg/solana/chain_test.go | 9 +-- .../multinode/{client.go => adaptor.go} | 59 ++++++++-------- .../{client_test.go => adaptor_test.go} | 70 ++++++++++++------- pkg/solana/client/multinode/config/config.go | 67 ------------------ pkg/solana/client/multinode_client.go | 14 ++-- pkg/solana/config/toml.go | 48 ++++++++++++- 6 files changed, 131 insertions(+), 136 deletions(-) rename pkg/solana/client/multinode/{client.go => adaptor.go} (72%) rename pkg/solana/client/multinode/{client_test.go => adaptor_test.go} (60%) diff --git a/pkg/solana/chain_test.go b/pkg/solana/chain_test.go index eed7dbbc5..9196419dd 100644 --- a/pkg/solana/chain_test.go +++ b/pkg/solana/chain_test.go @@ -29,7 +29,6 @@ import ( "github.com/smartcontractkit/chainlink-solana/pkg/solana/client" mn "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode" - mnCfg "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode/config" solcfg "github.com/smartcontractkit/chainlink-solana/pkg/solana/config" "github.com/smartcontractkit/chainlink-solana/pkg/solana/fees" "github.com/smartcontractkit/chainlink-solana/pkg/solana/txm/mocks" @@ -336,12 +335,8 @@ func TestSolanaChain_MultiNode_GetClient(t *testing.T) { ch := solcfg.Chain{} ch.SetDefaults() - mnCfg := mnCfg.MultiNodeConfig{ - MultiNode: mnCfg.MultiNode{ - Enabled: ptr(true), - }, - } - mnCfg.SetDefaults() + mnCfg := solcfg.NewDefaultMultiNodeConfig() + mnCfg.MultiNode.Enabled = ptr(true) cfg := &solcfg.TOMLConfig{ ChainID: ptr("devnet"), diff --git a/pkg/solana/client/multinode/client.go b/pkg/solana/client/multinode/adaptor.go similarity index 72% rename from pkg/solana/client/multinode/client.go rename to pkg/solana/client/multinode/adaptor.go index 9753c26da..c01bb7880 100644 --- a/pkg/solana/client/multinode/client.go +++ b/pkg/solana/client/multinode/adaptor.go @@ -13,7 +13,8 @@ import ( mnCfg "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode/config" ) -type MultiNodeClient[RPC any, HEAD Head] struct { +// MultiNodeAdapter is used to integrate multinode into chain-specific clients +type MultiNodeAdapter[RPC any, HEAD Head] struct { cfg *mnCfg.MultiNodeConfig log logger.Logger rpc *RPC @@ -26,23 +27,23 @@ type MultiNodeClient[RPC any, HEAD Head] struct { latestFinalizedBlock func(ctx context.Context, rpc *RPC) (HEAD, error) // chStopInFlight can be closed to immediately cancel all in-flight requests on - // this RpcClient. Closing and replacing should be serialized through - // stateMu since it can happen on state transitions as well as RpcClient Close. + // this RpcMultiNodeAdapter. Closing and replacing should be serialized through + // stateMu since it can happen on state transitions as well as RpcMultiNodeAdapter Close. chStopInFlight chan struct{} chainInfoLock sync.RWMutex - // intercepted values seen by callers of the rpcClient excluding health check calls. Need to ensure MultiNode provides repeatable read guarantee + // intercepted values seen by callers of the rpcMultiNodeAdapter excluding health check calls. Need to ensure MultiNode provides repeatable read guarantee highestUserObservations ChainInfo // most recent chain info observed during current lifecycle (reseted on DisconnectAll) latestChainInfo ChainInfo } -func NewMultiNodeClient[RPC any, HEAD Head]( +func NewMultiNodeAdapter[RPC any, HEAD Head]( cfg *mnCfg.MultiNodeConfig, rpc *RPC, ctxTimeout time.Duration, log logger.Logger, latestBlock func(ctx context.Context, rpc *RPC) (HEAD, error), latestFinalizedBlock func(ctx context.Context, rpc *RPC) (HEAD, error), -) (*MultiNodeClient[RPC, HEAD], error) { - return &MultiNodeClient[RPC, HEAD]{ +) (*MultiNodeAdapter[RPC, HEAD], error) { + return &MultiNodeAdapter[RPC, HEAD]{ cfg: cfg, rpc: rpc, log: log, @@ -54,17 +55,17 @@ func NewMultiNodeClient[RPC any, HEAD Head]( }, nil } -func (m *MultiNodeClient[RPC, HEAD]) LenSubs() int { +func (m *MultiNodeAdapter[RPC, HEAD]) LenSubs() int { m.subsSliceMu.RLock() defer m.subsSliceMu.RUnlock() return len(m.subs) } -// RegisterSub adds the sub to the rpcClient list -func (m *MultiNodeClient[RPC, HEAD]) RegisterSub(sub Subscription, stopInFLightCh chan struct{}) error { +// registerSub adds the sub to the rpcMultiNodeAdapter list +func (m *MultiNodeAdapter[RPC, HEAD]) registerSub(sub Subscription, stopInFLightCh chan struct{}) error { m.subsSliceMu.Lock() defer m.subsSliceMu.Unlock() - // ensure that the `sub` belongs to current life cycle of the `rpcClient` and it should not be killed due to + // ensure that the `sub` belongs to current life cycle of the `rpcMultiNodeAdapter` and it should not be killed due to // previous `DisconnectAll` call. select { case <-stopInFLightCh: @@ -77,7 +78,7 @@ func (m *MultiNodeClient[RPC, HEAD]) RegisterSub(sub Subscription, stopInFLightC return nil } -func (m *MultiNodeClient[RPC, HEAD]) LatestBlock(ctx context.Context) (HEAD, error) { +func (m *MultiNodeAdapter[RPC, HEAD]) LatestBlock(ctx context.Context) (HEAD, error) { // capture chStopInFlight to ensure we are not updating chainInfo with observations related to previous life cycle ctx, cancel, chStopInFlight, rpc := m.AcquireQueryCtx(ctx, m.ctxTimeout) defer cancel() @@ -91,11 +92,11 @@ func (m *MultiNodeClient[RPC, HEAD]) LatestBlock(ctx context.Context) (HEAD, err return head, errors.New("invalid head") } - m.OnNewHead(ctx, chStopInFlight, head) + m.onNewHead(ctx, chStopInFlight, head) return head, nil } -func (m *MultiNodeClient[RPC, HEAD]) LatestFinalizedBlock(ctx context.Context) (HEAD, error) { +func (m *MultiNodeAdapter[RPC, HEAD]) LatestFinalizedBlock(ctx context.Context) (HEAD, error) { ctx, cancel, chStopInFlight, rpc := m.AcquireQueryCtx(ctx, m.ctxTimeout) defer cancel() @@ -112,7 +113,7 @@ func (m *MultiNodeClient[RPC, HEAD]) LatestFinalizedBlock(ctx context.Context) ( return head, nil } -func (m *MultiNodeClient[RPC, HEAD]) SubscribeToHeads(ctx context.Context) (<-chan HEAD, Subscription, error) { +func (m *MultiNodeAdapter[RPC, HEAD]) SubscribeToHeads(ctx context.Context) (<-chan HEAD, Subscription, error) { ctx, cancel, chStopInFlight, _ := m.AcquireQueryCtx(ctx, m.ctxTimeout) defer cancel() @@ -133,7 +134,7 @@ func (m *MultiNodeClient[RPC, HEAD]) SubscribeToHeads(ctx context.Context) (<-ch return nil, nil, err } - err := m.RegisterSub(&poller, chStopInFlight) + err := m.registerSub(&poller, chStopInFlight) if err != nil { poller.Unsubscribe() return nil, nil, err @@ -142,7 +143,7 @@ func (m *MultiNodeClient[RPC, HEAD]) SubscribeToHeads(ctx context.Context) (<-ch return channel, &poller, nil } -func (m *MultiNodeClient[RPC, HEAD]) SubscribeToFinalizedHeads(ctx context.Context) (<-chan HEAD, Subscription, error) { +func (m *MultiNodeAdapter[RPC, HEAD]) SubscribeToFinalizedHeads(ctx context.Context) (<-chan HEAD, Subscription, error) { ctx, cancel, chStopInFlight, _ := m.AcquireQueryCtx(ctx, m.ctxTimeout) defer cancel() @@ -161,7 +162,7 @@ func (m *MultiNodeClient[RPC, HEAD]) SubscribeToFinalizedHeads(ctx context.Conte return nil, nil, err } - err := m.RegisterSub(&poller, chStopInFlight) + err := m.registerSub(&poller, chStopInFlight) if err != nil { poller.Unsubscribe() return nil, nil, err @@ -170,7 +171,7 @@ func (m *MultiNodeClient[RPC, HEAD]) SubscribeToFinalizedHeads(ctx context.Conte return channel, &poller, nil } -func (m *MultiNodeClient[RPC, HEAD]) OnNewHead(ctx context.Context, requestCh <-chan struct{}, head HEAD) { +func (m *MultiNodeAdapter[RPC, HEAD]) onNewHead(ctx context.Context, requestCh <-chan struct{}, head HEAD) { if !head.IsValid() { return } @@ -181,14 +182,14 @@ func (m *MultiNodeClient[RPC, HEAD]) OnNewHead(ctx context.Context, requestCh <- m.highestUserObservations.BlockNumber = max(m.highestUserObservations.BlockNumber, head.BlockNumber()) } select { - case <-requestCh: // no need to update latestChainInfo, as rpcClient already started new life cycle + case <-requestCh: // no need to update latestChainInfo, as rpcMultiNodeAdapter already started new life cycle return default: m.latestChainInfo.BlockNumber = head.BlockNumber() } } -func (m *MultiNodeClient[RPC, HEAD]) OnNewFinalizedHead(ctx context.Context, requestCh <-chan struct{}, head HEAD) { +func (m *MultiNodeAdapter[RPC, HEAD]) OnNewFinalizedHead(ctx context.Context, requestCh <-chan struct{}, head HEAD) { if !head.IsValid() { return } @@ -199,7 +200,7 @@ func (m *MultiNodeClient[RPC, HEAD]) OnNewFinalizedHead(ctx context.Context, req m.highestUserObservations.FinalizedBlockNumber = max(m.highestUserObservations.FinalizedBlockNumber, head.BlockNumber()) } select { - case <-requestCh: // no need to update latestChainInfo, as rpcClient already started new life cycle + case <-requestCh: // no need to update latestChainInfo, as rpcMultiNodeAdapter already started new life cycle return default: m.latestChainInfo.FinalizedBlockNumber = head.BlockNumber() @@ -221,7 +222,7 @@ func MakeQueryCtx(ctx context.Context, ch services.StopChan, timeout time.Durati return ctx, cancel } -func (m *MultiNodeClient[RPC, HEAD]) AcquireQueryCtx(parentCtx context.Context, timeout time.Duration) (ctx context.Context, cancel context.CancelFunc, +func (m *MultiNodeAdapter[RPC, HEAD]) AcquireQueryCtx(parentCtx context.Context, timeout time.Duration) (ctx context.Context, cancel context.CancelFunc, chStopInFlight chan struct{}, raw *RPC) { // Need to wrap in mutex because state transition can cancel and replace context m.stateMu.RLock() @@ -233,7 +234,7 @@ func (m *MultiNodeClient[RPC, HEAD]) AcquireQueryCtx(parentCtx context.Context, return } -func (m *MultiNodeClient[RPC, HEAD]) UnsubscribeAllExcept(subs ...Subscription) { +func (m *MultiNodeAdapter[RPC, HEAD]) UnsubscribeAllExcept(subs ...Subscription) { m.subsSliceMu.Lock() defer m.subsSliceMu.Unlock() @@ -250,23 +251,23 @@ func (m *MultiNodeClient[RPC, HEAD]) UnsubscribeAllExcept(subs ...Subscription) } } -// CancelInflightRequests closes and replaces the chStopInFlight -func (m *MultiNodeClient[RPC, HEAD]) CancelInflightRequests() { +// cancelInflightRequests closes and replaces the chStopInFlight +func (m *MultiNodeAdapter[RPC, HEAD]) cancelInflightRequests() { m.stateMu.Lock() defer m.stateMu.Unlock() close(m.chStopInFlight) m.chStopInFlight = make(chan struct{}) } -func (m *MultiNodeClient[RPC, HEAD]) Close() { - m.CancelInflightRequests() +func (m *MultiNodeAdapter[RPC, HEAD]) Close() { + m.cancelInflightRequests() m.UnsubscribeAllExcept() m.chainInfoLock.Lock() m.latestChainInfo = ChainInfo{} m.chainInfoLock.Unlock() } -func (m *MultiNodeClient[RPC, HEAD]) GetInterceptedChainInfo() (latest, highestUserObservations ChainInfo) { +func (m *MultiNodeAdapter[RPC, HEAD]) GetInterceptedChainInfo() (latest, highestUserObservations ChainInfo) { m.chainInfoLock.Lock() defer m.chainInfoLock.Unlock() return m.latestChainInfo, m.highestUserObservations diff --git a/pkg/solana/client/multinode/client_test.go b/pkg/solana/client/multinode/adaptor_test.go similarity index 60% rename from pkg/solana/client/multinode/client_test.go rename to pkg/solana/client/multinode/adaptor_test.go index 5cb59f797..3c47ecab7 100644 --- a/pkg/solana/client/multinode/client_test.go +++ b/pkg/solana/client/multinode/adaptor_test.go @@ -8,52 +8,72 @@ import ( "github.com/stretchr/testify/require" + common "github.com/smartcontractkit/chainlink-common/pkg/config" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode/config" ) -type TestRPC struct { +type testRPC struct { latestBlock int64 } -type TestHead struct { +type testHead struct { blockNumber int64 } -func (t *TestHead) BlockNumber() int64 { return t.blockNumber } -func (t *TestHead) BlockDifficulty() *big.Int { return nil } -func (t *TestHead) IsValid() bool { return true } +func (t *testHead) BlockNumber() int64 { return t.blockNumber } +func (t *testHead) BlockDifficulty() *big.Int { return nil } +func (t *testHead) IsValid() bool { return true } -func LatestBlock(ctx context.Context, rpc *TestRPC) (*TestHead, error) { +func LatestBlock(ctx context.Context, rpc *testRPC) (*testHead, error) { rpc.latestBlock++ - return &TestHead{rpc.latestBlock}, nil + return &testHead{rpc.latestBlock}, nil } -func initializeMultiNodeClient(t *testing.T) *MultiNodeClient[TestRPC, *TestHead] { +func ptr[T any](t T) *T { + return &t +} + +func newTestClient(t *testing.T) *MultiNodeAdapter[testRPC, *testHead] { requestTimeout := 5 * time.Second lggr := logger.Test(t) - cfg := &config.MultiNodeConfig{} - cfg.SetDefaults() - enabled := true - cfg.MultiNode.Enabled = &enabled - - c, err := NewMultiNodeClient[TestRPC, *TestHead](cfg, &TestRPC{}, requestTimeout, lggr, LatestBlock, LatestBlock) + cfg := &config.MultiNodeConfig{ + MultiNode: config.MultiNode{ + Enabled: ptr(true), + PollFailureThreshold: ptr(uint32(5)), + PollInterval: common.MustNewDuration(15 * time.Second), + SelectionMode: ptr(NodeSelectionModePriorityLevel), + SyncThreshold: ptr(uint32(10)), + LeaseDuration: common.MustNewDuration(time.Minute), + NodeIsSyncingEnabled: ptr(false), + FinalizedBlockPollInterval: common.MustNewDuration(5 * time.Second), + EnforceRepeatableRead: ptr(true), + DeathDeclarationDelay: common.MustNewDuration(20 * time.Second), + NodeNoNewHeadsThreshold: common.MustNewDuration(20 * time.Second), + NoNewFinalizedHeadsThreshold: common.MustNewDuration(20 * time.Second), + FinalityTagEnabled: ptr(true), + FinalityDepth: ptr(uint32(0)), + FinalizedBlockOffset: ptr(uint32(50)), + }, + } + c, err := NewMultiNodeAdapter[testRPC, *testHead](cfg, &testRPC{}, requestTimeout, lggr, LatestBlock, LatestBlock) require.NoError(t, err) + t.Cleanup(c.Close) return c } func TestMultiNodeClient_LatestBlock(t *testing.T) { - c := initializeMultiNodeClient(t) - t.Run("LatestBlock", func(t *testing.T) { + c := newTestClient(t) head, err := c.LatestBlock(tests.Context(t)) require.NoError(t, err) require.Equal(t, true, head.IsValid()) }) t.Run("LatestFinalizedBlock", func(t *testing.T) { + c := newTestClient(t) finalizedHead, err := c.LatestFinalizedBlock(tests.Context(t)) require.NoError(t, err) require.Equal(t, true, finalizedHead.IsValid()) @@ -61,9 +81,8 @@ func TestMultiNodeClient_LatestBlock(t *testing.T) { } func TestMultiNodeClient_HeadSubscriptions(t *testing.T) { - c := initializeMultiNodeClient(t) - t.Run("SubscribeToHeads", func(t *testing.T) { + c := newTestClient(t) ch, sub, err := c.SubscribeToHeads(tests.Context(t)) require.NoError(t, err) defer sub.Unsubscribe() @@ -80,6 +99,7 @@ func TestMultiNodeClient_HeadSubscriptions(t *testing.T) { }) t.Run("SubscribeToFinalizedHeads", func(t *testing.T) { + c := newTestClient(t) finalizedCh, finalizedSub, err := c.SubscribeToFinalizedHeads(tests.Context(t)) require.NoError(t, err) defer finalizedSub.Unsubscribe() @@ -112,38 +132,40 @@ func (s *mockSub) Err() <-chan error { } func TestMultiNodeClient_RegisterSubs(t *testing.T) { - c := initializeMultiNodeClient(t) - t.Run("registerSub", func(t *testing.T) { + c := newTestClient(t) sub := newMockSub() - err := c.RegisterSub(sub, make(chan struct{})) + err := c.registerSub(sub, make(chan struct{})) require.NoError(t, err) require.Equal(t, 1, c.LenSubs()) c.UnsubscribeAllExcept() }) t.Run("chStopInFlight returns error and unsubscribes", func(t *testing.T) { + c := newTestClient(t) chStopInFlight := make(chan struct{}) close(chStopInFlight) sub := newMockSub() - err := c.RegisterSub(sub, chStopInFlight) + err := c.registerSub(sub, chStopInFlight) require.Error(t, err) require.Equal(t, true, sub.unsubscribed) }) t.Run("UnsubscribeAllExcept", func(t *testing.T) { + c := newTestClient(t) chStopInFlight := make(chan struct{}) sub1 := newMockSub() sub2 := newMockSub() - err := c.RegisterSub(sub1, chStopInFlight) + err := c.registerSub(sub1, chStopInFlight) require.NoError(t, err) - err = c.RegisterSub(sub2, chStopInFlight) + err = c.registerSub(sub2, chStopInFlight) require.NoError(t, err) require.Equal(t, 2, c.LenSubs()) c.UnsubscribeAllExcept(sub1) require.Equal(t, 1, c.LenSubs()) require.Equal(t, true, sub2.unsubscribed) + require.Equal(t, false, sub1.unsubscribed) c.UnsubscribeAllExcept() require.Equal(t, 0, c.LenSubs()) diff --git a/pkg/solana/client/multinode/config/config.go b/pkg/solana/client/multinode/config/config.go index a8c12075e..bd7995531 100644 --- a/pkg/solana/client/multinode/config/config.go +++ b/pkg/solana/client/multinode/config/config.go @@ -88,73 +88,6 @@ func (c *MultiNodeConfig) FinalityTagEnabled() bool { return *c.MultiNode.Finali func (c *MultiNodeConfig) FinalizedBlockOffset() uint32 { return *c.MultiNode.FinalizedBlockOffset } -func (c *MultiNodeConfig) SetDefaults() { - // MultiNode is disabled as it's not fully implemented yet: BCFR-122 - if c.MultiNode.Enabled == nil { - c.MultiNode.Enabled = ptr(false) - } - - /* Node Configs */ - // Failure threshold for polling set to 5 to tolerate some polling failures before taking action. - if c.MultiNode.PollFailureThreshold == nil { - c.MultiNode.PollFailureThreshold = ptr(uint32(5)) - } - // Poll interval is set to 15 seconds to ensure timely updates while minimizing resource usage. - if c.MultiNode.PollInterval == nil { - c.MultiNode.PollInterval = config.MustNewDuration(15 * time.Second) - } - // Selection mode defaults to priority level to enable using node priorities - if c.MultiNode.SelectionMode == nil { - c.MultiNode.SelectionMode = ptr("PriorityLevel") - } - // The sync threshold is set to 10 to allow for some flexibility in node synchronization before considering it out of sync. - if c.MultiNode.SyncThreshold == nil { - c.MultiNode.SyncThreshold = ptr(uint32(10)) - } - // Lease duration is set to 1 minute by default to allow node locks for a reasonable amount of time. - if c.MultiNode.LeaseDuration == nil { - c.MultiNode.LeaseDuration = config.MustNewDuration(time.Minute) - } - // Node syncing is not relevant for Solana and is disabled by default. - if c.MultiNode.NodeIsSyncingEnabled == nil { - c.MultiNode.NodeIsSyncingEnabled = ptr(false) - } - // The finalized block polling interval is set to 5 seconds to ensure timely updates while minimizing resource usage. - if c.MultiNode.FinalizedBlockPollInterval == nil { - c.MultiNode.FinalizedBlockPollInterval = config.MustNewDuration(5 * time.Second) - } - // Repeatable read guarantee should be enforced by default. - if c.MultiNode.EnforceRepeatableRead == nil { - c.MultiNode.EnforceRepeatableRead = ptr(true) - } - // The delay before declaring a node dead is set to 20 seconds to give nodes time to recover from temporary issues. - if c.MultiNode.DeathDeclarationDelay == nil { - c.MultiNode.DeathDeclarationDelay = config.MustNewDuration(20 * time.Second) - } - - /* Chain Configs */ - // Threshold for no new heads is set to 20 seconds, assuming that heads should update at a reasonable pace. - if c.MultiNode.NodeNoNewHeadsThreshold == nil { - c.MultiNode.NodeNoNewHeadsThreshold = config.MustNewDuration(20 * time.Second) - } - // Similar to heads, finalized heads should be updated within 20 seconds. - if c.MultiNode.NoNewFinalizedHeadsThreshold == nil { - c.MultiNode.NoNewFinalizedHeadsThreshold = config.MustNewDuration(20 * time.Second) - } - // Finality tags are used in Solana and enabled by default. - if c.MultiNode.FinalityTagEnabled == nil { - c.MultiNode.FinalityTagEnabled = ptr(true) - } - // Finality depth will not be used since finality tags are enabled. - if c.MultiNode.FinalityDepth == nil { - c.MultiNode.FinalityDepth = ptr(uint32(0)) - } - // Finalized block offset allows for RPCs to be slightly behind the finalized block. - if c.MultiNode.FinalizedBlockOffset == nil { - c.MultiNode.FinalizedBlockOffset = ptr(uint32(50)) - } -} - func (c *MultiNodeConfig) SetFrom(f *MultiNodeConfig) { if f.MultiNode.Enabled != nil { c.MultiNode.Enabled = f.MultiNode.Enabled diff --git a/pkg/solana/client/multinode_client.go b/pkg/solana/client/multinode_client.go index 700fbd5d3..610ef20c8 100644 --- a/pkg/solana/client/multinode_client.go +++ b/pkg/solana/client/multinode_client.go @@ -42,8 +42,8 @@ var _ mn.RPCClient[mn.StringID, *Head] = (*MultiNodeClient)(nil) var _ mn.SendTxRPCClient[*solana.Transaction, *SendTxResult] = (*MultiNodeClient)(nil) type MultiNodeClient struct { - Client - *mn.MultiNodeClient[rpc.Client, *Head] + *Client + *mn.MultiNodeAdapter[rpc.Client, *Head] cfg *config.TOMLConfig } @@ -52,15 +52,15 @@ func NewMultiNodeClient(endpoint string, cfg *config.TOMLConfig, requestTimeout if err != nil { return nil, err } - multiNodeClient, err := mn.NewMultiNodeClient[rpc.Client, *Head]( + multiNodeClient, err := mn.NewMultiNodeAdapter[rpc.Client, *Head]( &cfg.MultiNode, client.rpc, client.contextDuration, client.log, LatestBlock, LatestFinalizedBlock) if err != nil { return nil, err } return &MultiNodeClient{ - Client: *client, - MultiNodeClient: multiNodeClient, - cfg: cfg, + Client: client, + MultiNodeAdapter: multiNodeClient, + cfg: cfg, }, nil } @@ -112,7 +112,7 @@ func (m *MultiNodeClient) Close() { m.Client.log.Errorf("error closing rpc: %v", err) } }() - m.MultiNodeClient.Close() + m.MultiNodeAdapter.Close() } type SendTxResult struct { diff --git a/pkg/solana/config/toml.go b/pkg/solana/config/toml.go index 1a30608f1..76b7d43d9 100644 --- a/pkg/solana/config/toml.go +++ b/pkg/solana/config/toml.go @@ -13,6 +13,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/config" relaytypes "github.com/smartcontractkit/chainlink-common/pkg/types" + mn "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode" mnCfg "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode/config" ) @@ -307,12 +308,55 @@ func (c *TOMLConfig) ListNodes() Nodes { func (c *TOMLConfig) SetDefaults() { c.Chain.SetDefaults() - c.MultiNode.SetDefaults() + c.MultiNode.SetFrom(defaultMultiNodeConfig) } func NewDefault() *TOMLConfig { cfg := &TOMLConfig{} cfg.Chain.SetDefaults() - cfg.MultiNode.SetDefaults() + cfg.MultiNode.SetFrom(defaultMultiNodeConfig) + return cfg +} + +var defaultMultiNodeConfig = &mnCfg.MultiNodeConfig{ + MultiNode: mnCfg.MultiNode{ + // Have multinode disabled by default + Enabled: ptr(false), + /* Node Configs */ + // Failure threshold for polling set to 5 to tolerate some polling failures before taking action. + PollFailureThreshold: ptr(uint32(5)), + // Poll interval is set to 15 seconds to ensure timely updates while minimizing resource usage. + PollInterval: config.MustNewDuration(15 * time.Second), + // Selection mode defaults to priority level to enable using node priorities + SelectionMode: ptr(mn.NodeSelectionModePriorityLevel), + // The sync threshold is set to 10 to allow for some flexibility in node synchronization before considering it out of sync. + SyncThreshold: ptr(uint32(10)), + // Lease duration is set to 1 minute by default to allow node locks for a reasonable amount of time. + LeaseDuration: config.MustNewDuration(time.Minute), + // Node syncing is not relevant for Solana and is disabled by default. + NodeIsSyncingEnabled: ptr(false), + // The finalized block polling interval is set to 5 seconds to ensure timely updates while minimizing resource usage. + FinalizedBlockPollInterval: config.MustNewDuration(5 * time.Second), + // Repeatable read guarantee should be enforced by default. + EnforceRepeatableRead: ptr(true), + // The delay before declaring a node dead is set to 20 seconds to give nodes time to recover from temporary issues. + DeathDeclarationDelay: config.MustNewDuration(20 * time.Second), + /* Chain Configs */ + // Threshold for no new heads is set to 20 seconds, assuming that heads should update at a reasonable pace. + NodeNoNewHeadsThreshold: config.MustNewDuration(20 * time.Second), + // Similar to heads, finalized heads should be updated within 20 seconds. + NoNewFinalizedHeadsThreshold: config.MustNewDuration(20 * time.Second), + // Finality tags are used in Solana and enabled by default. + FinalityTagEnabled: ptr(true), + // Finality depth will not be used since finality tags are enabled. + FinalityDepth: ptr(uint32(0)), + // Finalized block offset allows for RPCs to be slightly behind the finalized block. + FinalizedBlockOffset: ptr(uint32(50)), + }, +} + +func NewDefaultMultiNodeConfig() mnCfg.MultiNodeConfig { + cfg := mnCfg.MultiNodeConfig{} + cfg.SetFrom(defaultMultiNodeConfig) return cfg }