Skip to content

Commit

Permalink
Add managed subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
DylanTinianov committed Dec 6, 2024
1 parent b45500a commit d2c881d
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 23 deletions.
33 changes: 10 additions & 23 deletions pkg/solana/client/multinode/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,6 @@ type MultiNodeClient[RPC any, HEAD Head] struct {
latestChainInfo ChainInfo
}

// ManagedSubscription is used to ensure that the subscription is removed from the client when unsubscribed
type ManagedSubscription struct {
Subscription
removeSub func(sub Subscription)
}

func (w *ManagedSubscription) Unsubscribe() {
w.Subscription.Unsubscribe()
if w.removeSub != nil {
w.removeSub(w)
}
}

func NewMultiNodeClient[RPC any, HEAD Head](
cfg *mnCfg.MultiNodeConfig, rpc *RPC, ctxTimeout time.Duration, log logger.Logger,
latestBlock func(ctx context.Context, rpc *RPC) (HEAD, error),
Expand All @@ -73,12 +60,6 @@ func (m *MultiNodeClient[RPC, HEAD]) LenSubs() int {
return len(m.subs)
}

func (m *MultiNodeClient[RPC, HEAD]) removeSubscription(sub Subscription) {
m.subsSliceMu.Lock()
defer m.subsSliceMu.Unlock()
delete(m.subs, sub)
}

// registerSub adds the sub to the rpcClient list
func (m *MultiNodeClient[RPC, HEAD]) registerSub(sub Subscription, stopInFLightCh chan struct{}) error {
m.subsSliceMu.Lock()
Expand All @@ -95,6 +76,12 @@ func (m *MultiNodeClient[RPC, HEAD]) registerSub(sub Subscription, stopInFLightC
return nil
}

func (m *MultiNodeClient[RPC, HEAD]) removeSub(sub Subscription) {
m.subsSliceMu.Lock()
defer m.subsSliceMu.Unlock()
delete(m.subs, sub)
}

func (m *MultiNodeClient[RPC, HEAD]) LatestBlock(ctx context.Context) (HEAD, error) {
// capture chStopInFlight to ensure we are not updating chainInfo with observations related to previous life cycle
ctx, cancel, chStopInFlight, rpc := m.AcquireQueryCtx(ctx, m.ctxTimeout)
Expand Down Expand Up @@ -152,8 +139,8 @@ func (m *MultiNodeClient[RPC, HEAD]) SubscribeToHeads(ctx context.Context) (<-ch
}

sub := &ManagedSubscription{
Subscription: &poller,
removeSub: m.removeSubscription,
Subscription: &poller,
onUnsubscribe: m.removeSub,
}

err := m.registerSub(sub, chStopInFlight)
Expand Down Expand Up @@ -185,8 +172,8 @@ func (m *MultiNodeClient[RPC, HEAD]) SubscribeToFinalizedHeads(ctx context.Conte
}

sub := &ManagedSubscription{
Subscription: &poller,
removeSub: m.removeSubscription,
Subscription: &poller,
onUnsubscribe: m.removeSub,
}

err := m.registerSub(sub, chStopInFlight)
Expand Down
13 changes: 13 additions & 0 deletions pkg/solana/client/multinode/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,19 @@ type Subscription interface {
Err() <-chan error
}

// ManagedSubscription is a Subscription which contains an onUnsubscribe callback
type ManagedSubscription struct {
Subscription
onUnsubscribe func(sub Subscription)
}

func (w *ManagedSubscription) Unsubscribe() {
w.Subscription.Unsubscribe()
if w.onUnsubscribe != nil {
w.onUnsubscribe(w)
}
}

// RPCClient includes all the necessary generalized RPC methods used by Node to perform health checks
type RPCClient[
CHAIN_ID ID,
Expand Down

0 comments on commit d2c881d

Please sign in to comment.