Skip to content

Commit

Permalink
Implement TON cctx scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
swift1337 committed Oct 24, 2024
1 parent cede35f commit f107ccc
Show file tree
Hide file tree
Showing 4 changed files with 256 additions and 129 deletions.
98 changes: 97 additions & 1 deletion zetaclient/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
btcobserver "github.com/zeta-chain/node/zetaclient/chains/bitcoin/observer"
"github.com/zeta-chain/node/zetaclient/chains/interfaces"
solanaobserver "github.com/zeta-chain/node/zetaclient/chains/solana/observer"
tonobserver "github.com/zeta-chain/node/zetaclient/chains/ton/observer"
tonsigner "github.com/zeta-chain/node/zetaclient/chains/ton/signer"
zctx "github.com/zeta-chain/node/zetaclient/context"
"github.com/zeta-chain/node/zetaclient/logs"
"github.com/zeta-chain/node/zetaclient/metrics"
Expand Down Expand Up @@ -193,6 +195,16 @@ func (oc *Orchestrator) resolveSigner(app *zctx.AppContext, chainID int64) (inte
Str("signer.gateway_address", params.GatewayAddress).
Msgf("updated gateway address for chain %d", chainID)
}
case chain.IsTON():
newAddress := chain.Params().GatewayAddress

if newAddress != signer.GetGatewayAddress() {
signer.SetGatewayAddress(newAddress)
oc.logger.Info().
Str("signer.new_gateway_address", newAddress).
Int64("signer.chain_id", chainID).
Msgf("set gateway address")
}
}

return signer, nil
Expand Down Expand Up @@ -235,8 +247,9 @@ func (oc *Orchestrator) resolveObserver(app *zctx.AppContext, chainID int64) (in
if !observertypes.ChainParamsEqual(curParams, *freshParams) {
observer.SetChainParams(*freshParams)
oc.logger.Info().
Int64("observer.chain_id", chainID).
Interface("observer.chain_params", *freshParams).
Msgf("updated chain params for chainID %d", chainID)
Msg("updated chain params")
}

return observer, nil
Expand Down Expand Up @@ -416,6 +429,8 @@ func (oc *Orchestrator) runScheduler(ctx context.Context) error {
oc.ScheduleCctxBTC(ctx, zetaHeight, chainID, cctxList, ob, signer)
case chain.IsSolana():
oc.ScheduleCctxSolana(ctx, zetaHeight, chainID, cctxList, ob, signer)
case chain.IsTON():
oc.ScheduleCCTXTON(ctx, zetaHeight, chainID, cctxList, ob, signer)

Check warning on line 433 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L432-L433

Added lines #L432 - L433 were not covered by tests
default:
oc.logger.Error().Msgf("runScheduler: no scheduler found chain %d", chainID)
continue
Expand Down Expand Up @@ -665,6 +680,87 @@ func (oc *Orchestrator) ScheduleCctxSolana(
}
}

// ScheduleCCTXTON schedules TON outbound keySign on each ZetaChain block
func (oc *Orchestrator) ScheduleCCTXTON(
ctx context.Context,
zetaHeight uint64,
chainID int64,
cctxList []*types.CrossChainTx,
observer interfaces.ChainObserver,
signer interfaces.ChainSigner,
) {
// should never happen
if _, ok := observer.(*tonobserver.Observer); !ok {
oc.logger.Error().Msgf("ScheduleCCTXTON: observer is not TON")
return
}

Check warning on line 696 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L691-L696

Added lines #L691 - L696 were not covered by tests

if _, ok := signer.(*tonsigner.Signer); !ok {
oc.logger.Error().Msgf("ScheduleCCTXTON: signer is not TON")
return
}

Check warning on line 701 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L698-L701

Added lines #L698 - L701 were not covered by tests

// Scheduler interval measured in zeta blocks.
// runScheduler() guarantees that this function is called every zeta block.
// Note that TON blockchain is async and doesn't have a concept of confirmations
// i.e. tx is finalized as soon as it's included in the next block (less than 6 seconds)
// #nosec G701 positive
interval := uint64(observer.ChainParams().OutboundScheduleInterval)

shouldProcessOutbounds := zetaHeight%interval == 0

for i := range cctxList {
var (
cctx = cctxList[i]
params = cctx.GetCurrentOutboundParam()
nonce = params.TssNonce
outboundID = outboundprocessor.ToOutboundID(cctx.Index, params.ReceiverChainId, nonce)
)

if params.ReceiverChainId != chainID {
// should not happen
oc.logger.Error().Msgf("ScheduleCCTXTON: outbound chain id mismatch (got %d)", params.ReceiverChainId)
continue

Check warning on line 723 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L708-L723

Added lines #L708 - L723 were not covered by tests
}

// vote outbound if it's already confirmed
continueKeySign, err := observer.VoteOutboundIfConfirmed(ctx, cctx)

switch {
case err != nil:
oc.logger.Error().Err(err).Uint64("outbound.nonce", nonce).
Msg("ScheduleCCTXTON: VoteOutboundIfConfirmed failed")
continue
case !continueKeySign:
oc.logger.Info().Uint64("outbound.nonce", nonce).
Msg("ScheduleCCTXTON: outbound already processed")
continue
case !shouldProcessOutbounds:
// well, let's wait for another block to (probably) trigger the processing
continue

Check warning on line 740 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L727-L740

Added lines #L727 - L740 were not covered by tests
}

// try to sign and broadcast cctx to TON
task := func(ctx context.Context) error {
signer.TryProcessOutbound(
ctx,
cctx,
oc.outboundProc,
outboundID,
observer,
oc.zetacoreClient,
zetaHeight,
)

return nil
}

Check warning on line 756 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L744-L756

Added lines #L744 - L756 were not covered by tests

// fire async task
taskLogger := oc.logger.Logger.With().Str("outbound.id", outboundID).Logger()
bg.Work(ctx, task, bg.WithName("TryProcessOutbound"), bg.WithLogger(taskLogger))

Check warning on line 760 in zetaclient/orchestrator/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/orchestrator.go#L759-L760

Added lines #L759 - L760 were not covered by tests
}
}

// runObserverSignerSync runs a blocking ticker that observes chain changes from zetacore
// and optionally (de)provisions respective observers and signers.
func (oc *Orchestrator) runObserverSignerSync(ctx context.Context) error {
Expand Down
83 changes: 83 additions & 0 deletions zetaclient/orchestrator/orchestrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@ func Test_GetUpdatedSigner(t *testing.T) {
evmChain = chains.Ethereum
btcChain = chains.BitcoinMainnet
solChain = chains.SolanaMainnet
tonChain = chains.TONMainnet
)

var (
evmChainParams = mocks.MockChainParams(evmChain.ChainId, 100)
btcChainParams = mocks.MockChainParams(btcChain.ChainId, 100)
solChainParams = mocks.MockChainParams(solChain.ChainId, 100)
tonChainParams = mocks.MockChainParams(tonChain.ChainId, 100)
)

solChainParams.GatewayAddress = solanacontracts.SolanaGatewayProgramID
Expand All @@ -50,6 +52,9 @@ func Test_GetUpdatedSigner(t *testing.T) {
solChainParamsNew := mocks.MockChainParams(solChain.ChainId, 100)
solChainParamsNew.GatewayAddress = sample.SolanaAddress(t)

tonChainParamsNew := mocks.MockChainParams(tonChain.ChainId, 100)
tonChainParamsNew.GatewayAddress = sample.GenerateTONAccountID().ToRaw()

t.Run("signer should not be found", func(t *testing.T) {
orchestrator := mockOrchestrator(t, nil, evmChain, btcChain, evmChainParams, btcChainParams)
appContext := createAppContext(t, evmChain, btcChain, evmChainParamsNew, btcChainParams)
Expand Down Expand Up @@ -86,6 +91,23 @@ func Test_GetUpdatedSigner(t *testing.T) {
require.NoError(t, err)
require.Equal(t, solChainParamsNew.GatewayAddress, signer.GetGatewayAddress())
})

t.Run("should be able to update ton gateway address", func(t *testing.T) {
orchestrator := mockOrchestrator(t, nil,
evmChain, btcChain, solChain, tonChain,
evmChainParams, btcChainParams, solChainParams, tonChainParams,
)

appContext := createAppContext(t,
evmChain, btcChain, solChain, tonChain,
evmChainParams, btcChainParams, solChainParamsNew, tonChainParamsNew,
)

// update signer with new gateway address
signer, err := orchestrator.resolveSigner(appContext, tonChain.ChainId)
require.NoError(t, err)
require.Equal(t, tonChainParamsNew.GatewayAddress, signer.GetGatewayAddress())
})
}

func Test_GetUpdatedChainObserver(t *testing.T) {
Expand All @@ -94,15 +116,18 @@ func Test_GetUpdatedChainObserver(t *testing.T) {
evmChain = chains.Ethereum
btcChain = chains.BitcoinMainnet
solChain = chains.SolanaMainnet
tonChain = chains.TONMainnet
)

var (
evmChainParams = mocks.MockChainParams(evmChain.ChainId, 100)
btcChainParams = mocks.MockChainParams(btcChain.ChainId, 100)
solChainParams = mocks.MockChainParams(solChain.ChainId, 100)
tonChainParams = mocks.MockChainParams(tonChain.ChainId, 100)
)

solChainParams.GatewayAddress = solanacontracts.SolanaGatewayProgramID
tonChainParams.GatewayAddress = sample.GenerateTONAccountID().ToRaw()

// new chain params in AppContext
evmChainParamsNew := &observertypes.ChainParams{
Expand Down Expand Up @@ -153,6 +178,22 @@ func Test_GetUpdatedChainObserver(t *testing.T) {
MinObserverDelegation: sdk.OneDec(),
IsSupported: true,
}
tonChainParamsNew := &observertypes.ChainParams{
ChainId: tonChain.ChainId,
ConfirmationCount: 10,
GasPriceTicker: 5,
InboundTicker: 6,
OutboundTicker: 6,
WatchUtxoTicker: 1,
ZetaTokenContractAddress: "",
ConnectorContractAddress: "",
Erc20CustodyContractAddress: "",
OutboundScheduleInterval: 10,
OutboundScheduleLookahead: 10,
BallotThreshold: sdk.OneDec(),
MinObserverDelegation: sdk.OneDec(),
IsSupported: true,
}

t.Run("evm chain observer should not be found", func(t *testing.T) {
orchestrator := mockOrchestrator(
Expand Down Expand Up @@ -284,6 +325,43 @@ func Test_GetUpdatedChainObserver(t *testing.T) {
require.NotNil(t, chainOb)
require.True(t, observertypes.ChainParamsEqual(*solChainParamsNew, chainOb.ChainParams()))
})
t.Run("ton chain observer should not be found", func(t *testing.T) {
orchestrator := mockOrchestrator(
t,
nil,
evmChain, btcChain, solChain,
evmChainParams, btcChainParams, solChainParams,
)

appContext := createAppContext(
t,
evmChain,
btcChain,
solChain,
evmChainParams,
btcChainParams,
solChainParamsNew,
)

_, err := orchestrator.resolveObserver(appContext, tonChain.ChainId)
require.ErrorContains(t, err, "observer not found")
})
t.Run("chain params in ton chain observer should be updated successfully", func(t *testing.T) {
orchestrator := mockOrchestrator(t, nil,
evmChain, btcChain, tonChain,
evmChainParams, btcChainParams, tonChainParams,
)
appContext := createAppContext(t,
evmChain, btcChain, tonChain,
evmChainParams, btcChainParams, tonChainParamsNew,
)

// update solana chain observer with new chain params
chainOb, err := orchestrator.resolveObserver(appContext, tonChain.ChainId)
require.NoError(t, err)
require.NotNil(t, chainOb)
require.True(t, observertypes.ChainParamsEqual(*tonChainParamsNew, chainOb.ChainParams()))
})
}

func Test_GetPendingCctxsWithinRateLimit(t *testing.T) {
Expand Down Expand Up @@ -500,6 +578,9 @@ func mockOrchestrator(t *testing.T, zetaClient interfaces.ZetacoreClient, chains
case chains.IsSolanaChain(cp.ChainId, nil):
observers[cp.ChainId] = mocks.NewSolanaObserver(cp)
signers[cp.ChainId] = mocks.NewSolanaSigner()
case chains.IsTONChain(cp.ChainId, nil):
observers[cp.ChainId] = mocks.NewTONObserver(cp)
signers[cp.ChainId] = mocks.NewTONSigner()
default:
t.Fatalf("mock orcestrator: unsupported chain %d", cp.ChainId)
}
Expand All @@ -526,6 +607,8 @@ func createAppContext(t *testing.T, chainsOrParams ...any) *zctx.AppContext {
cfg.BTCChainConfigs[c.ChainId] = config.BTCConfig{RPCHost: "localhost"}
case chains.IsSolanaChain(c.ChainId, nil):
cfg.SolanaConfig = config.SolanaConfig{Endpoint: "localhost"}
case chains.IsTONChain(c.ChainId, nil):
cfg.TONConfig = config.TONConfig{LiteClientConfigURL: "localhost"}
default:
t.Fatalf("create app context: unsupported chain %d", c.ChainId)
}
Expand Down
Loading

0 comments on commit f107ccc

Please sign in to comment.