Skip to content

Commit

Permalink
feat: add configurable signer latency correction
Browse files Browse the repository at this point in the history
  • Loading branch information
gartnera committed Dec 18, 2024
1 parent 7d93d5a commit b444563
Show file tree
Hide file tree
Showing 16 changed files with 399 additions and 123 deletions.
3 changes: 2 additions & 1 deletion cmd/zetae2e/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,8 @@ func localE2ETest(cmd *cobra.Command, _ []string) {

if testAdmin {
eg.Go(adminTestRoutine(conf, deployerRunner, verbose,
e2etests.TestOperationalFlagsName,
e2etests.TestZetaclientSignerOffsetName,
e2etests.TestZetaclientRestartHeightName,
e2etests.TestWhitelistERC20Name,
e2etests.TestPauseZRC20Name,
e2etests.TestUpdateBytecodeZRC20Name,
Expand Down
15 changes: 11 additions & 4 deletions e2e/e2etests/e2etests.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ const (
TestMigrateERC20CustodyFundsName = "migrate_erc20_custody_funds"
TestMigrateTSSName = "migrate_tss"
TestSolanaWhitelistSPLName = "solana_whitelist_spl"
TestOperationalFlagsName = "operational_flags"
TestZetaclientRestartHeightName = "zetaclient_restart_height"
TestZetaclientSignerOffsetName = "zetaclient_signer_offset"

/*
Operational tests
Expand Down Expand Up @@ -880,10 +881,16 @@ var AllE2ETests = []runner.E2ETest{
TestMigrateERC20CustodyFunds,
),
runner.NewE2ETest(
TestOperationalFlagsName,
"operational flags functionality",
TestZetaclientRestartHeightName,
"zetaclient scheduled restart height",
[]runner.ArgDefinition{},
TestOperationalFlags,
TestZetaclientRestartHeight,
),
runner.NewE2ETest(
TestZetaclientSignerOffsetName,
"zetaclient signer offset",
[]runner.ArgDefinition{},
TestZetaclientSignerOffset,
),
/*
Special tests
Expand Down
45 changes: 42 additions & 3 deletions e2e/e2etests/test_operational_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ import (
)

const (
startTimestampMetricName = "zetaclient_last_start_timestamp_seconds"
startTimestampMetricName = "zetaclient_last_start_timestamp_seconds"
blockTimeLatencyMetricName = "zetaclient_core_block_latency"
blockTimeLatencySleepMetricName = "zetaclient_core_block_latency_sleep"
)

// TestOperationalFlags tests the functionality of operations flags.
func TestOperationalFlags(r *runner.E2ERunner, _ []string) {
// TestZetaclientRestartHeight tests scheduling a zetaclient restart via operational flags
func TestZetaclientRestartHeight(r *runner.E2ERunner, _ []string) {
_, err := r.Clients.Zetacore.Observer.OperationalFlags(
r.Ctx,
&observertypes.QueryOperationalFlagsRequest{},
Expand Down Expand Up @@ -60,3 +62,40 @@ func TestOperationalFlags(r *runner.E2ERunner, _ []string) {

require.Greater(r, currentStartTime, originalStartTime+1)
}

// TestZetaclientSignerOffset tests scheduling a zetaclient restart via operational flags
func TestZetaclientSignerOffset(r *runner.E2ERunner, _ []string) {
startBlockTimeLatencySleep, err := r.Clients.ZetaclientMetrics.FetchGauge(blockTimeLatencySleepMetricName)
require.NoError(r, err)
require.InDelta(r, 0, startBlockTimeLatencySleep, .01, "start block time latency should be 0")

startBlockTimeLatency, err := r.Clients.ZetaclientMetrics.FetchGauge(blockTimeLatencyMetricName)
require.NoError(r, err)

desiredSignerBlockTimeOffset := time.Duration(startBlockTimeLatency*float64(time.Second)) + time.Millisecond*200

updateMsg := observertypes.NewMsgUpdateOperationalFlags(
r.ZetaTxServer.MustGetAccountAddressFromName(utils.OperationalPolicyName),
observertypes.OperationalFlags{
SignerBlockTimeOffset: &desiredSignerBlockTimeOffset,
},
)

_, err = r.ZetaTxServer.BroadcastTx(utils.OperationalPolicyName, updateMsg)
require.NoError(r, err)

operationalFlagsRes, err := r.Clients.Zetacore.Observer.OperationalFlags(
r.Ctx,
&observertypes.QueryOperationalFlagsRequest{},
)
require.NoError(r, err)
require.InDelta(r, desiredSignerBlockTimeOffset, *(operationalFlagsRes.OperationalFlags.SignerBlockTimeOffset), .01)

require.Eventually(r, func() bool {
blockTimeLatencySleep, err := r.Clients.ZetaclientMetrics.FetchGauge(blockTimeLatencySleepMetricName)
if err != nil {
return false
}
return blockTimeLatencySleep > .1
}, time.Second*15, time.Second*1)
}
8 changes: 8 additions & 0 deletions proto/zetachain/zetacore/observer/operational.proto
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
syntax = "proto3";
package zetachain.zetacore.observer;

import "gogoproto/gogo.proto";
import "google/protobuf/duration.proto";

option go_package = "github.com/zeta-chain/node/x/observer/types";

// Flags for the top-level operation of zetaclient.
message OperationalFlags {
// Height for a coordinated zetaclient restart.
// Will be ignored if missed.
int64 restart_height = 1;

// Offset from the zetacore block time to initiate signing.
// Should be calculated and set based on max(zetaclient_core_block_latency).
google.protobuf.Duration signer_block_time_offset = 2
[ (gogoproto.stdduration) = true ];
}
3 changes: 2 additions & 1 deletion proto/zetachain/zetacore/observer/tx.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ syntax = "proto3";
package zetachain.zetacore.observer;

import "gogoproto/gogo.proto";
import "google/protobuf/field_mask.proto";
import "zetachain/zetacore/observer/blame.proto";
import "zetachain/zetacore/observer/crosschain_flags.proto";
import "zetachain/zetacore/observer/observer.proto";
Expand Down Expand Up @@ -143,7 +144,7 @@ message MsgUpdateGasPriceIncreaseFlagsResponse {}

message MsgUpdateOperationalFlags {
string creator = 1;
OperationalFlags operationalFlags = 2 [ (gogoproto.nullable) = false ];
OperationalFlags operational_flags = 2 [ (gogoproto.nullable) = false ];
}

message MsgUpdateOperationalFlagsResponse {}
5 changes: 4 additions & 1 deletion testutil/sample/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"math/rand"
"testing"
"time"

"github.com/cosmos/cosmos-sdk/crypto/keys/ed25519"
"github.com/cosmos/cosmos-sdk/testutil/testdata"
Expand All @@ -14,6 +15,7 @@ import (
"github.com/zeta-chain/node/pkg/chains"
"github.com/zeta-chain/node/pkg/cosmos"
zetacrypto "github.com/zeta-chain/node/pkg/crypto"
"github.com/zeta-chain/node/pkg/ptr"
"github.com/zeta-chain/node/x/observer/types"
)

Expand Down Expand Up @@ -287,6 +289,7 @@ func GasPriceIncreaseFlags() types.GasPriceIncreaseFlags {

func OperationalFlags() types.OperationalFlags {
return types.OperationalFlags{
RestartHeight: 1,
RestartHeight: 1,
SignerBlockTimeOffset: ptr.Ptr(time.Second),
}
}
8 changes: 6 additions & 2 deletions x/observer/client/cli/tx_update_operational_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ import (
)

const (
fileFlag = "file"
restartHeightFlag = "restart-height"
fileFlag = "file"
restartHeightFlag = "restart-height"
signerBlockTimeOffsetFlag = "signer-block-time-offset"
)

func CmdUpdateOperationalFlags() *cobra.Command {
Expand All @@ -33,6 +34,7 @@ func CmdUpdateOperationalFlags() *cobra.Command {
flagSet := cmd.Flags()
file, _ := flagSet.GetString(fileFlag)
restartHeight, _ := flagSet.GetInt64(restartHeightFlag)
signerBlockTimeOffset, _ := flagSet.GetDuration(signerBlockTimeOffsetFlag)

if file != "" {
input, err := os.ReadFile(file) // #nosec G304
Expand All @@ -45,6 +47,7 @@ func CmdUpdateOperationalFlags() *cobra.Command {
}
} else {
operationalFlags.RestartHeight = restartHeight
operationalFlags.SignerBlockTimeOffset = &signerBlockTimeOffset
}

msg := types.NewMsgUpdateOperationalFlags(
Expand All @@ -58,6 +61,7 @@ func CmdUpdateOperationalFlags() *cobra.Command {

cmd.Flags().String(fileFlag, "", "Path to a JSON file containing OperationalFlags")
cmd.Flags().Int64(restartHeightFlag, 0, "Height for a coordinated zetaclient restart")
cmd.Flags().Duration(signerBlockTimeOffsetFlag, 0, "Offset from the zetacore block time to initiate signing")
flags.AddTxFlagsToCmd(cmd)

return cmd
Expand Down
12 changes: 10 additions & 2 deletions x/observer/keeper/msg_server_update_operational_flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package keeper_test

import (
"testing"
"time"

sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/stretchr/testify/require"

"github.com/zeta-chain/node/pkg/ptr"
keepertest "github.com/zeta-chain/node/testutil/keeper"
"github.com/zeta-chain/node/testutil/sample"
authoritytypes "github.com/zeta-chain/node/x/authority/types"
Expand All @@ -26,10 +28,12 @@ func TestMsgServer_UpdateOperationalFlags(t *testing.T) {

// test initial set
restartHeight := int64(100)
signerBlockTimeOffset := ptr.Ptr(time.Second)
msg := types.MsgUpdateOperationalFlags{
Creator: admin,
OperationalFlags: types.OperationalFlags{
RestartHeight: restartHeight,
RestartHeight: restartHeight,
SignerBlockTimeOffset: signerBlockTimeOffset,
},
}
keepertest.MockCheckAuthorization(&authorityMock.Mock, &msg, nil)
Expand All @@ -39,13 +43,16 @@ func TestMsgServer_UpdateOperationalFlags(t *testing.T) {
operationalFlags, found := k.GetOperationalFlags(ctx)
require.True(t, found)
require.Equal(t, restartHeight, operationalFlags.RestartHeight)
require.Equal(t, signerBlockTimeOffset, operationalFlags.SignerBlockTimeOffset)

// verify that we can set it again
restartHeight = 101
signerBlockTimeOffset = ptr.Ptr(time.Second * 2)
msg = types.MsgUpdateOperationalFlags{
Creator: admin,
OperationalFlags: types.OperationalFlags{
RestartHeight: restartHeight,
RestartHeight: restartHeight,
SignerBlockTimeOffset: signerBlockTimeOffset,
},
}
keepertest.MockCheckAuthorization(&authorityMock.Mock, &msg, nil)
Expand All @@ -55,6 +62,7 @@ func TestMsgServer_UpdateOperationalFlags(t *testing.T) {
operationalFlags, found = k.GetOperationalFlags(ctx)
require.True(t, found)
require.Equal(t, restartHeight, operationalFlags.RestartHeight)
require.Equal(t, signerBlockTimeOffset, operationalFlags.SignerBlockTimeOffset)
})

t.Run("cannot update operational flags if not authorized", func(t *testing.T) {
Expand Down
16 changes: 9 additions & 7 deletions x/observer/types/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,13 @@ var (
ErrObserverSetNotFound = errorsmod.Register(ModuleName, 1130, "observer set not found")
ErrTssNotFound = errorsmod.Register(ModuleName, 1131, "tss not found")

ErrInboundDisabled = errorsmod.Register(ModuleName, 1132, "inbound tx processing is disabled")
ErrInvalidZetaCoinTypes = errorsmod.Register(ModuleName, 1133, "invalid zeta coin types")
ErrNotObserver = errorsmod.Register(ModuleName, 1134, "sender is not an observer")
ErrDuplicateObserver = errorsmod.Register(ModuleName, 1135, "observer already exists")
ErrObserverNotFound = errorsmod.Register(ModuleName, 1136, "observer not found")
ErrInvalidObserverAddress = errorsmod.Register(ModuleName, 1137, "invalid observer address")
ErrOperationalFlagsRestartHeightNegative = errorsmod.Register(ModuleName, 1138, "restart height cannot be negative")
ErrInboundDisabled = errorsmod.Register(ModuleName, 1132, "inbound tx processing is disabled")
ErrInvalidZetaCoinTypes = errorsmod.Register(ModuleName, 1133, "invalid zeta coin types")
ErrNotObserver = errorsmod.Register(ModuleName, 1134, "sender is not an observer")
ErrDuplicateObserver = errorsmod.Register(ModuleName, 1135, "observer already exists")
ErrObserverNotFound = errorsmod.Register(ModuleName, 1136, "observer not found")
ErrInvalidObserverAddress = errorsmod.Register(ModuleName, 1137, "invalid observer address")
ErrOperationalFlagsRestartHeightNegative = errorsmod.Register(ModuleName, 1138, "restart height cannot be negative")
ErrOperationalFlagsSignerBlockTimeOffsetNegative = errorsmod.Register(ModuleName, 1139, "signer block time offset cannot be negative")
ErrOperationalFlagsSignerBlockTimeOffsetLimit = errorsmod.Register(ModuleName, 1140, "signer block time offset exceeds limit")
)
16 changes: 15 additions & 1 deletion x/observer/types/message_update_operational_flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package types_test

import (
"testing"
"time"

sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/stretchr/testify/require"

"github.com/zeta-chain/node/pkg/ptr"
"github.com/zeta-chain/node/testutil/sample"
"github.com/zeta-chain/node/x/observer/types"
)
Expand All @@ -24,7 +26,7 @@ func TestMsgUpdateOperationalFlags_ValidateBasic(t *testing.T) {
},
},
{
name: "invalid operational flags",
name: "invalid restart height",
msg: types.NewMsgUpdateOperationalFlags(
sample.AccAddress(),
types.OperationalFlags{
Expand All @@ -35,6 +37,18 @@ func TestMsgUpdateOperationalFlags_ValidateBasic(t *testing.T) {
require.ErrorContains(t, err, "invalid request")
},
},
{
name: "invalid signer block time offset",
msg: types.NewMsgUpdateOperationalFlags(
sample.AccAddress(),
types.OperationalFlags{
SignerBlockTimeOffset: ptr.Ptr(-time.Second),
},
),
err: func(t require.TestingT, err error, i ...interface{}) {
require.ErrorContains(t, err, "invalid request")
},
},
{
name: "valid",
msg: types.NewMsgUpdateOperationalFlags(sample.AccAddress(), sample.OperationalFlags()),
Expand Down
19 changes: 19 additions & 0 deletions x/observer/types/operational.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,27 @@
package types

import (
"time"

cosmoserrors "cosmossdk.io/errors"
)

const (
signerBlockTimeOffsetLimit = time.Second * 10
)

func (f *OperationalFlags) Validate() error {
if f.RestartHeight < 0 {
return ErrOperationalFlagsRestartHeightNegative
}
if f.SignerBlockTimeOffset != nil {
signerBlockTimeOffset := *f.SignerBlockTimeOffset
if signerBlockTimeOffset < 0 {
return ErrOperationalFlagsRestartHeightNegative
}
if signerBlockTimeOffset > signerBlockTimeOffsetLimit {
return cosmoserrors.Wrapf(ErrOperationalFlagsSignerBlockTimeOffsetLimit, "(%s)", signerBlockTimeOffset)
}
}
return nil
}
Loading

0 comments on commit b444563

Please sign in to comment.