Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: background worker routines to shutdown client for migration #2538

Merged
merged 26 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
0bc8952
add background threads
kingpinXD Jul 19, 2024
3338f44
add cancel for child context
kingpinXD Jul 24, 2024
625259b
add comments
kingpinXD Jul 24, 2024
ee16242
remove commented code
kingpinXD Jul 24, 2024
a0e0df5
add default constant backoff
kingpinXD Jul 24, 2024
731b03e
generate files
kingpinXD Jul 24, 2024
b62938a
resolve comments 1
kingpinXD Jul 24, 2024
2cce32c
resolve comments 1
kingpinXD Jul 24, 2024
31e854c
resolve comments 2
kingpinXD Jul 24, 2024
cae80f2
rename to callback to clarify terminiology
kingpinXD Jul 24, 2024
6834fa8
remove cancel cause
kingpinXD Jul 24, 2024
476ee0a
rebase develop
kingpinXD Jul 25, 2024
55a3c82
Merge branch 'develop' into restart-thread-zetaclient
kingpinXD Jul 27, 2024
6b058e2
generate files
kingpinXD Jul 28, 2024
0c30b3d
move changelog to unreleased
kingpinXD Jul 31, 2024
f993da5
Add bg.OnComplete
swift1337 Jul 31, 2024
7c113a5
Add maintenance package. Move TSS listener to maintenance
swift1337 Jul 31, 2024
8e93dee
Merge branch 'develop' into restart-thread-zetaclient
swift1337 Aug 2, 2024
a665e67
Fix merge conflicts
swift1337 Aug 2, 2024
b93f067
fix tss migration test
kingpinXD Aug 2, 2024
4d8c689
Merge remote-tracking branch 'origin/restart-thread-zetaclient' into …
kingpinXD Aug 2, 2024
42a57a7
add structured logging
kingpinXD Aug 2, 2024
1c132de
remove check fro nonce 0
kingpinXD Aug 6, 2024
b5c0d77
fix OutboundID generation to make the identifier more unique
ws4charlie Aug 6, 2024
c5da6e1
rebase develop
kingpinXD Aug 6, 2024
3895d4f
Merge branch 'develop' into restart-thread-zetaclient
kingpinXD Aug 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 5 additions & 128 deletions cmd/zetaclientd-supervisor/lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,15 @@ import (
"runtime"
"strings"
"sync"
"syscall"
"time"

"github.com/cosmos/cosmos-sdk/client/grpc/tmservice"
upgradetypes "github.com/cosmos/cosmos-sdk/x/upgrade/types"
"github.com/hashicorp/go-getter"
"github.com/rs/zerolog"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

observertypes "github.com/zeta-chain/zetacore/x/observer/types"
"github.com/zeta-chain/zetacore/zetaclient/config"
)

Expand Down Expand Up @@ -79,7 +78,7 @@ func newZetaclientdSupervisor(
logger = logger.With().Str("module", "zetaclientdSupervisor").Logger()
conn, err := grpc.Dial(
fmt.Sprintf("%s:9090", zetaCoreURL),
grpc.WithInsecure(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
return nil, fmt.Errorf("grpc dial: %w", err)
Expand All @@ -99,9 +98,6 @@ func newZetaclientdSupervisor(
func (s *zetaclientdSupervisor) Start(ctx context.Context) {
go s.watchForVersionChanges(ctx)
go s.handleCoreUpgradePlan(ctx)
go s.handleNewKeygen(ctx)
go s.handleNewTSSKeyGeneration(ctx)
go s.handleTSSUpdate(ctx)
}

func (s *zetaclientdSupervisor) WaitForReloadSignal(ctx context.Context) {
Expand Down Expand Up @@ -177,125 +173,6 @@ func (s *zetaclientdSupervisor) watchForVersionChanges(ctx context.Context) {
}
}

func (s *zetaclientdSupervisor) handleTSSUpdate(ctx context.Context) {
maxRetries := 11
retryInterval := 5 * time.Second

// TODO : use retry library under pkg/retry
// https://github.com/zeta-chain/node/issues/2492
for i := 0; i < maxRetries; i++ {
client := observertypes.NewQueryClient(s.zetacoredConn)
tss, err := client.TSS(ctx, &observertypes.QueryGetTSSRequest{})
if err != nil {
s.logger.Warn().Err(err).Msg("unable to get original tss")
time.Sleep(retryInterval)
continue
}
i = 0
for {
select {
case <-time.After(time.Second):
case <-ctx.Done():
return
}
tssNew, err := client.TSS(ctx, &observertypes.QueryGetTSSRequest{})
if err != nil {
s.logger.Warn().Err(err).Msg("unable to get tss")
continue
}

if tssNew.TSS.TssPubkey == tss.TSS.TssPubkey {
continue
}

tss = tssNew
s.logger.Info().
Msgf("tss address is updated from %s to %s", tss.TSS.TssPubkey, tssNew.TSS.TssPubkey)
time.Sleep(6 * time.Second)
s.logger.Info().Msg("restarting zetaclientd to update tss address")
s.restartChan <- syscall.SIGHUP
}
}
s.logger.Warn().Msg("handleTSSUpdate exiting without success")
}

func (s *zetaclientdSupervisor) handleNewTSSKeyGeneration(ctx context.Context) {
maxRetries := 11
retryInterval := 5 * time.Second

// TODO : use retry library under pkg/retry
for i := 0; i < maxRetries; i++ {
client := observertypes.NewQueryClient(s.zetacoredConn)
alltss, err := client.TssHistory(ctx, &observertypes.QueryTssHistoryRequest{})
if err != nil {
s.logger.Warn().Err(err).Msg("unable to get tss original history")
time.Sleep(retryInterval)
continue
}
i = 0
tssLenCurrent := len(alltss.TssList)
for {
select {
case <-time.After(time.Second):
case <-ctx.Done():
return
}
tssListNew, err := client.TssHistory(ctx, &observertypes.QueryTssHistoryRequest{})
if err != nil {
s.logger.Warn().Err(err).Msg("unable to get tss new history")
continue
}
tssLenUpdated := len(tssListNew.TssList)

if tssLenUpdated == tssLenCurrent {
continue
}
if tssLenUpdated < tssLenCurrent {
tssLenCurrent = len(tssListNew.TssList)
continue
}

tssLenCurrent = tssLenUpdated
s.logger.Info().Msgf("tss list updated from %d to %d", tssLenCurrent, tssLenUpdated)
time.Sleep(5 * time.Second)
s.logger.Info().Msg("restarting zetaclientd to update tss list")
s.restartChan <- syscall.SIGHUP
}
}
s.logger.Warn().Msg("handleNewTSSKeyGeneration exiting without success")
}

func (s *zetaclientdSupervisor) handleNewKeygen(ctx context.Context) {
client := observertypes.NewQueryClient(s.zetacoredConn)
prevKeygenBlock := int64(0)
for {
select {
case <-time.After(time.Second):
case <-ctx.Done():
return
}
resp, err := client.Keygen(ctx, &observertypes.QueryGetKeygenRequest{})
if err != nil {
s.logger.Warn().Err(err).Msg("unable to get keygen")
continue
}
if resp.Keygen == nil {
s.logger.Warn().Err(err).Msg("keygen is nil")
continue
}

if resp.Keygen.Status != observertypes.KeygenStatus_PendingKeygen {
continue
}
keygenBlock := resp.Keygen.BlockNumber
if prevKeygenBlock == keygenBlock {
continue
}
prevKeygenBlock = keygenBlock
s.logger.Info().Msgf("got new keygen at block %d", keygenBlock)
s.restartChan <- syscall.SIGHUP
}
}
func (s *zetaclientdSupervisor) handleCoreUpgradePlan(ctx context.Context) {
client := upgradetypes.NewQueryClient(s.zetacoredConn)

Expand Down Expand Up @@ -345,16 +222,16 @@ func (s *zetaclientdSupervisor) downloadZetaclientd(ctx context.Context, plan *u
if plan.Info == "" {
return errors.New("upgrade info empty")
}
var config upgradeConfig
err := json.Unmarshal([]byte(plan.Info), &config)
var cfg upgradeConfig
err := json.Unmarshal([]byte(plan.Info), &cfg)
if err != nil {
return fmt.Errorf("unmarshal upgrade config: %w", err)
}

s.logger.Info().Msg("downloading zetaclientd")

binKey := fmt.Sprintf("%s-%s/%s", zetaclientdBinaryName, runtime.GOOS, runtime.GOARCH)
binURL, ok := config.Binaries[binKey]
binURL, ok := cfg.Binaries[binKey]
if !ok {
return fmt.Errorf("no binary found for: %s", binKey)
}
Expand Down
12 changes: 10 additions & 2 deletions cmd/zetaclientd-supervisor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,18 @@ func main() {
cmd.Stdin = &passwordInputBuffer

eg, ctx := errgroup.WithContext(ctx)
eg.Go(cmd.Run)
eg.Go(func() error {
defer cancel()
if err := cmd.Run(); err != nil {
logger.Error().Err(err).Msg("zetaclient process exited with error")
return err
}

logger.Info().Msg("zetaclient process exited")
return nil
})
eg.Go(func() error {
supervisor.WaitForReloadSignal(ctx)
cancel()
return nil
})
eg.Go(func() error {
Expand Down
29 changes: 22 additions & 7 deletions cmd/zetaclientd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,14 @@ func start(_ *cobra.Command, _ []string) error {
// Set P2P ID for telemetry
telemetryServer.SetP2PID(server.GetLocalPeerID())

// Create a notification context for background threads.
// These threads are responsible for sending shutdown signals to the main thread.
notifyCtx, cancel := context.WithCancelCause(ctx)
fbac marked this conversation as resolved.
Show resolved Hide resolved

// Start background threads which monitors zeta core for state changes related to TSS migration
cancelBackgroundThreads := zetacoreClient.StartTssMigrationRoutines(notifyCtx, cancel, masterLogger)
defer cancelBackgroundThreads()

// Generate a new TSS if keygen is set and add it into the tss server
// If TSS has already been generated, and keygen was successful ; we use the existing TSS
err = GenerateTSS(ctx, masterLogger, zetacoreClient, server)
Expand Down Expand Up @@ -318,15 +326,16 @@ func start(_ *cobra.Command, _ []string) error {
}

// Orchestrator wraps the zetacore client and adds the observers and signer maps to it . This is the high level object used for CCTX interactions
orchestrator := orchestrator.NewOrchestrator(
cctxOrchestrator := orchestrator.NewOrchestrator(
kingpinXD marked this conversation as resolved.
Show resolved Hide resolved
ctx,
zetacoreClient,
signerMap,
observerMap,
masterLogger,
telemetryServer,
)
err = orchestrator.MonitorCore(ctx)

err = cctxOrchestrator.MonitorCore(ctx)
if err != nil {
startLogger.Error().Err(err).Msg("Orchestrator failed to start")
return err
Expand All @@ -346,18 +355,24 @@ func start(_ *cobra.Command, _ []string) error {
// defer zetaSupplyChecker.Stop()
//}

startLogger.Info().Msgf("awaiting the os.Interrupt, syscall.SIGTERM signals...")
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
sig := <-ch
startLogger.Info().Msgf("stop signal received: %s", sig)

// stop chain observers
startLogger.Info().Msgf("awaiting shutdown signals")
select {
case <-notifyCtx.Done():
cause := context.Cause(notifyCtx)
startLogger.Info().Msgf("shutdown signal received , cause : %s", cause)

case sig := <-ch:
startLogger.Info().Msgf("stop signal received: %s", sig)
}

//stop chain observers
for _, observer := range observerMap {
observer.Stop()
}
zetacoreClient.Stop()
kingpinXD marked this conversation as resolved.
Show resolved Hide resolved

return nil
}

Expand Down
2 changes: 2 additions & 0 deletions contrib/localnet/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ services:
- ETHDEV_ENDPOINT=http://eth:8545
- HOTKEY_BACKEND=file
- HOTKEY_PASSWORD=password # test purposes only
restart: always
volumes:
- ssh:/root/.ssh
- preparams:/root/preparams
Expand All @@ -140,6 +141,7 @@ services:
- ETHDEV_ENDPOINT=http://eth:8545
- HOTKEY_BACKEND=file
- HOTKEY_PASSWORD=password # test purposes only
restart: always
volumes:
- ssh:/root/.ssh
- preparams:/root/preparams
Expand Down
14 changes: 13 additions & 1 deletion pkg/bg/bg.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
type config struct {
name string
logger zerolog.Logger
cancel context.CancelCauseFunc
}

type Opt func(*config)
Expand All @@ -19,6 +20,10 @@ func WithName(name string) Opt {
return func(cfg *config) { cfg.name = name }
}

func WithCancel(cancel context.CancelCauseFunc) Opt {
return func(cfg *config) { cfg.cancel = cancel }
}

func WithLogger(logger zerolog.Logger) Opt {
return func(cfg *config) { cfg.logger = logger }
}
Expand All @@ -28,6 +33,7 @@ func Work(ctx context.Context, f func(context.Context) error, opts ...Opt) {
cfg := config{
name: "",
logger: zerolog.Nop(),
cancel: nil,
}

for _, opt := range opts {
Expand All @@ -42,9 +48,15 @@ func Work(ctx context.Context, f func(context.Context) error, opts ...Opt) {
}
}()

if err := f(ctx); err != nil {
err := f(ctx)
if err != nil {
logError(err, cfg)
}
// Use cancel function if it is provided.
// This is used for stopping the main thread based on the outcome of the background task.
swift1337 marked this conversation as resolved.
Show resolved Hide resolved
if cfg.cancel != nil {
cfg.cancel(fmt.Errorf("cancel function triggered by %s", cfg.name))
}
}()
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/retry/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ func DefaultBackoff() Backoff {
return backoff.WithMaxRetries(bo, 5)
}

func DefaultConstantBackoff() Backoff {
bo := backoff.NewConstantBackOff(5 * time.Second)
return backoff.WithMaxRetries(bo, 10)
}

// Do executes the callback function with the default backoff config.
// It will retry a callback ONLY if error is retryable.
func Do(cb Callback) error {
Expand Down
4 changes: 4 additions & 0 deletions zetaclient/context/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ func (a *AppContext) Config() config.Config {
return a.config
}

func (a *AppContext) Logger() zerolog.Logger {
kingpinXD marked this conversation as resolved.
Show resolved Hide resolved
return a.logger
}

// GetBTCChainAndConfig returns btc chain and config if enabled
func (a *AppContext) GetBTCChainAndConfig() (chains.Chain, config.BTCConfig, bool) {
btcConfig, configEnabled := a.Config().GetBTCConfig()
Expand Down
3 changes: 0 additions & 3 deletions zetaclient/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,6 @@ func (oc *Orchestrator) MonitorCore(ctx context.Context) error {
shutdownOrchestrator := func() {
// now stop orchestrator and all observers
close(oc.stop)
for _, c := range oc.observerMap {
c.Stop()
kingpinXD marked this conversation as resolved.
Show resolved Hide resolved
}
}

oc.zetacoreClient.OnBeforeStop(shutdownOrchestrator)
Expand Down
8 changes: 8 additions & 0 deletions zetaclient/zetacore/client_query_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,14 @@ func (c *Client) GetBTCTSSAddress(ctx context.Context, chainID int64) (string, e
return resp.Btc, nil
}

func (c *Client) GetTSS(ctx context.Context) (types.TSS, error) {
kingpinXD marked this conversation as resolved.
Show resolved Hide resolved
kingpinXD marked this conversation as resolved.
Show resolved Hide resolved
resp, err := c.client.observer.TSS(ctx, &types.QueryGetTSSRequest{})
if err != nil {
return types.TSS{}, errors.Wrap(err, "failed to get tss")
}
return resp.TSS, nil
}

// GetTSSHistory returns the TSS history
func (c *Client) GetTSSHistory(ctx context.Context) ([]types.TSS, error) {
resp, err := c.client.observer.TssHistory(ctx, &types.QueryTssHistoryRequest{})
Expand Down
Loading
Loading