Skip to content
This repository has been archived by the owner on Oct 22, 2024. It is now read-only.

Commit

Permalink
Rate limit backfilling execution headers (paritytech#688)
Browse files Browse the repository at this point in the history
* Use parachain outgoing extrinsic pool to send messages to the parachain

* Only rate limit execution headers.

* Increase beacon max watched extrinisics.

* Adds maxWatchedExtrinsics config.

* Move config to parachain.

* Fix incorrect config mapping
  • Loading branch information
claravanstaden authored Sep 19, 2022
1 parent 68806eb commit 0276a3f
Show file tree
Hide file tree
Showing 11 changed files with 92 additions and 88 deletions.
6 changes: 2 additions & 4 deletions relayer/chain/parachain/outgoing_extrinsics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ import (
"golang.org/x/sync/semaphore"
)

const MaxWatchedExtrinsics = 10

type ExtrinsicPool struct {
conn *Connection
eg *errgroup.Group
Expand All @@ -24,11 +22,11 @@ type ExtrinsicPool struct {

type OnFinalized func(types.Hash) error

func NewExtrinsicPool(eg *errgroup.Group, conn *Connection) *ExtrinsicPool {
func NewExtrinsicPool(eg *errgroup.Group, conn *Connection, maxWatchedExtrinsics int64) *ExtrinsicPool {
ep := ExtrinsicPool{
conn: conn,
eg: eg,
sem: semaphore.NewWeighted(MaxWatchedExtrinsics),
sem: semaphore.NewWeighted(maxWatchedExtrinsics),
}
return &ep
}
Expand Down
3 changes: 2 additions & 1 deletion relayer/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ type PolkadotConfig struct {
}

type ParachainConfig struct {
Endpoint string `mapstructure:"endpoint"`
Endpoint string `mapstructure:"endpoint"`
MaxWatchedExtrinsics int64 `mapstructure:"maxWatchedExtrinsics"`
}

type EthereumConfig struct {
Expand Down
2 changes: 1 addition & 1 deletion relayer/relays/beacon/header/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func (h *Header) SyncHeader(ctx context.Context, blockRoot common.Hash, syncAggr

headerUpdate.SyncAggregate = syncAggregate

_, err = h.writer.WriteToParachain(ctx, "EthereumBeaconClient.import_execution_header", headerUpdate)
err = h.writer.WriteToParachainAndRateLimit(ctx, "EthereumBeaconClient.import_execution_header", headerUpdate)
if err != nil {
return syncer.HeaderUpdate{}, fmt.Errorf("write to parachain: %w", err)
}
Expand Down
31 changes: 0 additions & 31 deletions relayer/relays/beacon/header/syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,37 +6,6 @@ import (
"github.com/stretchr/testify/assert"
)

func TestComputeEpochAtSlot(t *testing.T) {
values := []struct {
name string
slot uint64
expected uint64
}{
{
name: "valid",
slot: 3433200,
expected: 107287,
},
{
name: "valid",
slot: 400,
expected: 12,
},
{
name: "0",
slot: 0,
expected: 0,
},
}

for _, tt := range values {
total := computeEpochAtSlot(tt.slot)
if total != tt.expected {
t.Errorf("ComputeEpochAtSlot of slot (%d) was incorrect, got: %d, want: %d.", tt.slot, total, tt.expected)
}
}
}

func TestHexToBinaryString(t *testing.T) {
values := []struct {
name string
Expand Down
1 change: 1 addition & 0 deletions relayer/relays/beacon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func (r *Relay) Start(ctx context.Context, eg *errgroup.Group) error {

writer := writer.NewParachainWriter(
paraconn,
r.config.Sink.Parachain.MaxWatchedExtrinsics,
)

err = writer.Start(ctx, eg)
Expand Down
109 changes: 69 additions & 40 deletions relayer/relays/beacon/writer/parachain-writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,20 @@ import (
)

type ParachainWriter struct {
conn *parachain.Connection
nonce uint32
pool *parachain.ExtrinsicPool
genesisHash types.Hash
conn *parachain.Connection
nonce uint32
pool *parachain.ExtrinsicPool
genesisHash types.Hash
maxWatchedExtrinsics int64
}

func NewParachainWriter(
conn *parachain.Connection,
maxWatchedExtrinsics int64,
) *ParachainWriter {
return &ParachainWriter{
conn: conn,
conn: conn,
maxWatchedExtrinsics: maxWatchedExtrinsics,
}
}

Expand All @@ -39,7 +42,7 @@ func (wr *ParachainWriter) Start(ctx context.Context, eg *errgroup.Group) error
}
wr.genesisHash = genesisHash

wr.pool = parachain.NewExtrinsicPool(eg, wr.conn)
wr.pool = parachain.NewExtrinsicPool(eg, wr.conn, wr.maxWatchedExtrinsics)

return nil
}
Expand All @@ -63,6 +66,65 @@ func (wr *ParachainWriter) queryAccountNonce() (uint32, error) {
}

func (wr *ParachainWriter) WriteToParachain(ctx context.Context, extrinsicName string, payload ...interface{}) (*author.ExtrinsicStatusSubscription, error) {
extI, err := wr.prepExtrinstic(ctx, extrinsicName, payload...)
if err != nil {
return nil, err
}

sub, err := wr.conn.API().RPC.Author.SubmitAndWatchExtrinsic(*extI)
if err != nil {
return nil, err
}

wr.nonce = wr.nonce + 1

return sub, nil
}

func (wr *ParachainWriter) WriteToParachainAndRateLimit(ctx context.Context, extrinsicName string, payload ...interface{}) error {
extI, err := wr.prepExtrinstic(ctx, extrinsicName, payload...)
if err != nil {
return err
}

callback := func(h types.Hash) error { return nil }

err = wr.pool.WaitForSubmitAndWatch(ctx, extI, callback)
if err != nil {
return err
}

wr.nonce = wr.nonce + 1

return nil
}

func (wr *ParachainWriter) WriteToParachainAndWatch(ctx context.Context, extrinsicName string, payload ...interface{}) error {
sub, err := wr.WriteToParachain(ctx, extrinsicName, payload...)
if err != nil {
return err
}

defer sub.Unsubscribe()

for {
select {
case status := <-sub.Chan():
if status.IsDropped || status.IsInvalid || status.IsUsurped {
return fmt.Errorf("parachain write status was dropped, invalid or usurped")
}
if status.IsInBlock {
return nil
}
case err = <-sub.Err():
return err
case <-ctx.Done():
return nil
}
}
}

func (wr ParachainWriter) prepExtrinstic(ctx context.Context, extrinsicName string, payload ...interface{}) (*types.Extrinsic, error) {
meta, err := wr.conn.API().RPC.State.GetMetadataLatest()
if err != nil {
return nil, err
Expand Down Expand Up @@ -113,40 +175,7 @@ func (wr *ParachainWriter) WriteToParachain(ctx context.Context, extrinsicName s
return nil, err
}

sub, err := wr.conn.API().RPC.Author.SubmitAndWatchExtrinsic(extI)
if err != nil {
return nil, err
}

wr.nonce = wr.nonce + 1

return sub, nil
}

func (wr *ParachainWriter) WriteToParachainAndWatch(ctx context.Context, extrinsicName string, payload ...interface{}) error {
sub, err := wr.WriteToParachain(ctx, extrinsicName, payload...)
if err != nil {
return err
}


defer sub.Unsubscribe()

for {
select {
case status := <-sub.Chan():
if status.IsDropped || status.IsInvalid || status.IsUsurped {
return fmt.Errorf("parachain write status was dropped, invalid or usurped")
}
if status.IsInBlock {
return nil
}
case err = <-sub.Err():
return err
case <-ctx.Done():
return nil
}
}
return &extI, nil
}

func (wr *ParachainWriter) GetLastSyncedSyncCommitteePeriod() (uint64, error) {
Expand Down
1 change: 1 addition & 0 deletions relayer/relays/ethereum/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func (r *Relay) Start(ctx context.Context, eg *errgroup.Group) error {
writer := NewParachainWriter(
r.paraconn,
payloads,
r.config.Sink.Parachain.MaxWatchedExtrinsics,
)

err = writer.Start(ctx, eg)
Expand Down
19 changes: 11 additions & 8 deletions relayer/relays/ethereum/parachain-writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,23 @@ type ParachainPayload struct {
}

type ParachainWriter struct {
conn *parachain.Connection
payloads <-chan ParachainPayload
nonce uint32
pool *parachain.ExtrinsicPool
genesisHash types.Hash
conn *parachain.Connection
payloads <-chan ParachainPayload
nonce uint32
pool *parachain.ExtrinsicPool
genesisHash types.Hash
maxWatchedExtrinsics int64
}

func NewParachainWriter(
conn *parachain.Connection,
payloads <-chan ParachainPayload,
maxWatchedExtrinsics int64,
) *ParachainWriter {
return &ParachainWriter{
conn: conn,
payloads: payloads,
conn: conn,
payloads: payloads,
maxWatchedExtrinsics: maxWatchedExtrinsics,
}
}

Expand All @@ -53,7 +56,7 @@ func (wr *ParachainWriter) Start(ctx context.Context, eg *errgroup.Group) error
}
wr.genesisHash = genesisHash

wr.pool = parachain.NewExtrinsicPool(eg, wr.conn)
wr.pool = parachain.NewExtrinsicPool(eg, wr.conn, wr.maxWatchedExtrinsics)

eg.Go(func() error {
err := wr.writeLoop(ctx)
Expand Down
2 changes: 1 addition & 1 deletion relayer/relays/ethereum/parachain-writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestWrite(t *testing.T) {
eg, ctx := errgroup.WithContext(ctx)
defer cancel()

writer := ethereumRelay.NewParachainWriter(conn, payloads)
writer := ethereumRelay.NewParachainWriter(conn, payloads, 10)

err := conn.Connect(ctx)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion test/config/beacon-relay.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
},
"sink": {
"parachain": {
"endpoint": "ws://localhost:11144"
"endpoint": "ws://localhost:11144",
"maxWatchedExtrinsics": 32
}
}
}
3 changes: 2 additions & 1 deletion test/config/ethereum-relay.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
},
"sink": {
"parachain": {
"endpoint": "ws://localhost:11144"
"endpoint": "ws://localhost:11144",
"maxWatchedExtrinsics": 10
}
}
}

0 comments on commit 0276a3f

Please sign in to comment.