diff --git a/relayer/chain/parachain/outgoing_extrinsics.go b/relayer/chain/parachain/outgoing_extrinsics.go index c99cd8cdaa68a..9b85ddb1309ab 100644 --- a/relayer/chain/parachain/outgoing_extrinsics.go +++ b/relayer/chain/parachain/outgoing_extrinsics.go @@ -14,8 +14,6 @@ import ( "golang.org/x/sync/semaphore" ) -const MaxWatchedExtrinsics = 10 - type ExtrinsicPool struct { conn *Connection eg *errgroup.Group @@ -24,11 +22,11 @@ type ExtrinsicPool struct { type OnFinalized func(types.Hash) error -func NewExtrinsicPool(eg *errgroup.Group, conn *Connection) *ExtrinsicPool { +func NewExtrinsicPool(eg *errgroup.Group, conn *Connection, maxWatchedExtrinsics int64) *ExtrinsicPool { ep := ExtrinsicPool{ conn: conn, eg: eg, - sem: semaphore.NewWeighted(MaxWatchedExtrinsics), + sem: semaphore.NewWeighted(maxWatchedExtrinsics), } return &ep } diff --git a/relayer/config/config.go b/relayer/config/config.go index f3d3e78f7f63c..53d90f17aefb4 100644 --- a/relayer/config/config.go +++ b/relayer/config/config.go @@ -5,7 +5,8 @@ type PolkadotConfig struct { } type ParachainConfig struct { - Endpoint string `mapstructure:"endpoint"` + Endpoint string `mapstructure:"endpoint"` + MaxWatchedExtrinsics int64 `mapstructure:"maxWatchedExtrinsics"` } type EthereumConfig struct { diff --git a/relayer/relays/beacon/header/header.go b/relayer/relays/beacon/header/header.go index b71b8ef5e90d9..4434fd4967460 100644 --- a/relayer/relays/beacon/header/header.go +++ b/relayer/relays/beacon/header/header.go @@ -236,7 +236,7 @@ func (h *Header) SyncHeader(ctx context.Context, blockRoot common.Hash, syncAggr headerUpdate.SyncAggregate = syncAggregate - _, err = h.writer.WriteToParachain(ctx, "EthereumBeaconClient.import_execution_header", headerUpdate) + err = h.writer.WriteToParachainAndRateLimit(ctx, "EthereumBeaconClient.import_execution_header", headerUpdate) if err != nil { return syncer.HeaderUpdate{}, fmt.Errorf("write to parachain: %w", err) } diff --git a/relayer/relays/beacon/header/syncer/syncer_test.go b/relayer/relays/beacon/header/syncer/syncer_test.go index 191fddf6976ab..9166748810687 100644 --- a/relayer/relays/beacon/header/syncer/syncer_test.go +++ b/relayer/relays/beacon/header/syncer/syncer_test.go @@ -6,37 +6,6 @@ import ( "github.com/stretchr/testify/assert" ) -func TestComputeEpochAtSlot(t *testing.T) { - values := []struct { - name string - slot uint64 - expected uint64 - }{ - { - name: "valid", - slot: 3433200, - expected: 107287, - }, - { - name: "valid", - slot: 400, - expected: 12, - }, - { - name: "0", - slot: 0, - expected: 0, - }, - } - - for _, tt := range values { - total := computeEpochAtSlot(tt.slot) - if total != tt.expected { - t.Errorf("ComputeEpochAtSlot of slot (%d) was incorrect, got: %d, want: %d.", tt.slot, total, tt.expected) - } - } -} - func TestHexToBinaryString(t *testing.T) { values := []struct { name string diff --git a/relayer/relays/beacon/main.go b/relayer/relays/beacon/main.go index f80b1cbf7f7ba..95253b6a7d859 100644 --- a/relayer/relays/beacon/main.go +++ b/relayer/relays/beacon/main.go @@ -46,6 +46,7 @@ func (r *Relay) Start(ctx context.Context, eg *errgroup.Group) error { writer := writer.NewParachainWriter( paraconn, + r.config.Sink.Parachain.MaxWatchedExtrinsics, ) err = writer.Start(ctx, eg) diff --git a/relayer/relays/beacon/writer/parachain-writer.go b/relayer/relays/beacon/writer/parachain-writer.go index 9a15fdd52297e..433463336d82e 100644 --- a/relayer/relays/beacon/writer/parachain-writer.go +++ b/relayer/relays/beacon/writer/parachain-writer.go @@ -12,17 +12,20 @@ import ( ) type ParachainWriter struct { - conn *parachain.Connection - nonce uint32 - pool *parachain.ExtrinsicPool - genesisHash types.Hash + conn *parachain.Connection + nonce uint32 + pool *parachain.ExtrinsicPool + genesisHash types.Hash + maxWatchedExtrinsics int64 } func NewParachainWriter( conn *parachain.Connection, + maxWatchedExtrinsics int64, ) *ParachainWriter { return &ParachainWriter{ - conn: conn, + conn: conn, + maxWatchedExtrinsics: maxWatchedExtrinsics, } } @@ -39,7 +42,7 @@ func (wr *ParachainWriter) Start(ctx context.Context, eg *errgroup.Group) error } wr.genesisHash = genesisHash - wr.pool = parachain.NewExtrinsicPool(eg, wr.conn) + wr.pool = parachain.NewExtrinsicPool(eg, wr.conn, wr.maxWatchedExtrinsics) return nil } @@ -63,6 +66,65 @@ func (wr *ParachainWriter) queryAccountNonce() (uint32, error) { } func (wr *ParachainWriter) WriteToParachain(ctx context.Context, extrinsicName string, payload ...interface{}) (*author.ExtrinsicStatusSubscription, error) { + extI, err := wr.prepExtrinstic(ctx, extrinsicName, payload...) + if err != nil { + return nil, err + } + + sub, err := wr.conn.API().RPC.Author.SubmitAndWatchExtrinsic(*extI) + if err != nil { + return nil, err + } + + wr.nonce = wr.nonce + 1 + + return sub, nil +} + +func (wr *ParachainWriter) WriteToParachainAndRateLimit(ctx context.Context, extrinsicName string, payload ...interface{}) error { + extI, err := wr.prepExtrinstic(ctx, extrinsicName, payload...) + if err != nil { + return err + } + + callback := func(h types.Hash) error { return nil } + + err = wr.pool.WaitForSubmitAndWatch(ctx, extI, callback) + if err != nil { + return err + } + + wr.nonce = wr.nonce + 1 + + return nil +} + +func (wr *ParachainWriter) WriteToParachainAndWatch(ctx context.Context, extrinsicName string, payload ...interface{}) error { + sub, err := wr.WriteToParachain(ctx, extrinsicName, payload...) + if err != nil { + return err + } + + defer sub.Unsubscribe() + + for { + select { + case status := <-sub.Chan(): + if status.IsDropped || status.IsInvalid || status.IsUsurped { + return fmt.Errorf("parachain write status was dropped, invalid or usurped") + } + if status.IsInBlock { + return nil + } + case err = <-sub.Err(): + return err + case <-ctx.Done(): + return nil + } + } +} + +func (wr ParachainWriter) prepExtrinstic(ctx context.Context, extrinsicName string, payload ...interface{}) (*types.Extrinsic, error) { meta, err := wr.conn.API().RPC.State.GetMetadataLatest() if err != nil { return nil, err @@ -113,40 +175,7 @@ func (wr *ParachainWriter) WriteToParachain(ctx context.Context, extrinsicName s return nil, err } - sub, err := wr.conn.API().RPC.Author.SubmitAndWatchExtrinsic(extI) - if err != nil { - return nil, err - } - - wr.nonce = wr.nonce + 1 - - return sub, nil -} - -func (wr *ParachainWriter) WriteToParachainAndWatch(ctx context.Context, extrinsicName string, payload ...interface{}) error { - sub, err := wr.WriteToParachain(ctx, extrinsicName, payload...) - if err != nil { - return err - } - - - defer sub.Unsubscribe() - - for { - select { - case status := <-sub.Chan(): - if status.IsDropped || status.IsInvalid || status.IsUsurped { - return fmt.Errorf("parachain write status was dropped, invalid or usurped") - } - if status.IsInBlock { - return nil - } - case err = <-sub.Err(): - return err - case <-ctx.Done(): - return nil - } - } + return &extI, nil } func (wr *ParachainWriter) GetLastSyncedSyncCommitteePeriod() (uint64, error) { diff --git a/relayer/relays/ethereum/main.go b/relayer/relays/ethereum/main.go index 6a205c143e050..ffc95014cd3b4 100644 --- a/relayer/relays/ethereum/main.go +++ b/relayer/relays/ethereum/main.go @@ -68,6 +68,7 @@ func (r *Relay) Start(ctx context.Context, eg *errgroup.Group) error { writer := NewParachainWriter( r.paraconn, payloads, + r.config.Sink.Parachain.MaxWatchedExtrinsics, ) err = writer.Start(ctx, eg) diff --git a/relayer/relays/ethereum/parachain-writer.go b/relayer/relays/ethereum/parachain-writer.go index ebeac8c428ea9..2adb9b9aba1b7 100644 --- a/relayer/relays/ethereum/parachain-writer.go +++ b/relayer/relays/ethereum/parachain-writer.go @@ -23,20 +23,23 @@ type ParachainPayload struct { } type ParachainWriter struct { - conn *parachain.Connection - payloads <-chan ParachainPayload - nonce uint32 - pool *parachain.ExtrinsicPool - genesisHash types.Hash + conn *parachain.Connection + payloads <-chan ParachainPayload + nonce uint32 + pool *parachain.ExtrinsicPool + genesisHash types.Hash + maxWatchedExtrinsics int64 } func NewParachainWriter( conn *parachain.Connection, payloads <-chan ParachainPayload, + maxWatchedExtrinsics int64, ) *ParachainWriter { return &ParachainWriter{ - conn: conn, - payloads: payloads, + conn: conn, + payloads: payloads, + maxWatchedExtrinsics: maxWatchedExtrinsics, } } @@ -53,7 +56,7 @@ func (wr *ParachainWriter) Start(ctx context.Context, eg *errgroup.Group) error } wr.genesisHash = genesisHash - wr.pool = parachain.NewExtrinsicPool(eg, wr.conn) + wr.pool = parachain.NewExtrinsicPool(eg, wr.conn, wr.maxWatchedExtrinsics) eg.Go(func() error { err := wr.writeLoop(ctx) diff --git a/relayer/relays/ethereum/parachain-writer_test.go b/relayer/relays/ethereum/parachain-writer_test.go index 8e0f90091cbbf..7eafc6115734b 100644 --- a/relayer/relays/ethereum/parachain-writer_test.go +++ b/relayer/relays/ethereum/parachain-writer_test.go @@ -24,7 +24,7 @@ func TestWrite(t *testing.T) { eg, ctx := errgroup.WithContext(ctx) defer cancel() - writer := ethereumRelay.NewParachainWriter(conn, payloads) + writer := ethereumRelay.NewParachainWriter(conn, payloads, 10) err := conn.Connect(ctx) if err != nil { diff --git a/test/config/beacon-relay.json b/test/config/beacon-relay.json index 7d2d4ffd8057b..0c981de209529 100644 --- a/test/config/beacon-relay.json +++ b/test/config/beacon-relay.json @@ -24,7 +24,8 @@ }, "sink": { "parachain": { - "endpoint": "ws://localhost:11144" + "endpoint": "ws://localhost:11144", + "maxWatchedExtrinsics": 32 } } } diff --git a/test/config/ethereum-relay.json b/test/config/ethereum-relay.json index 5d17d4a1ebf67..c68ce31d382ee 100644 --- a/test/config/ethereum-relay.json +++ b/test/config/ethereum-relay.json @@ -12,7 +12,8 @@ }, "sink": { "parachain": { - "endpoint": "ws://localhost:11144" + "endpoint": "ws://localhost:11144", + "maxWatchedExtrinsics": 10 } } }