Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Problem: node can't quit by signal #543

Merged
merged 4 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
* (rpc) [#534](https://github.com/crypto-org-chain/ethermint/pull/534), [#540](https://github.com/crypto-org-chain/ethermint/pull/540) Fix opBlockhash when no block header in abci request.
* (rpc) [#536](https://github.com/crypto-org-chain/ethermint/pull/536) Fix validate basic after transaction conversion with raw field.
* (cli) [#537](https://github.com/crypto-org-chain/ethermint/pull/537) Fix unsuppored sign mode SIGN_MODE_TEXTUAL for bank transfer.
* (cli) [#]() Fix graceful shutdown.
yihuang marked this conversation as resolved.
Show resolved Hide resolved

### Improvements

Expand Down
34 changes: 26 additions & 8 deletions server/json_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package server

import (
"context"
"fmt"
"net/http"
"time"
Expand Down Expand Up @@ -47,18 +48,20 @@ type AppWithPendingTxStream interface {
}

// StartJSONRPC starts the JSON-RPC server
func StartJSONRPC(srvCtx *server.Context,
func StartJSONRPC(
ctx context.Context,
srvCtx *server.Context,
clientCtx client.Context,
g *errgroup.Group,
config *config.Config,
indexer ethermint.EVMTxIndexer,
app AppWithPendingTxStream,
) (*http.Server, chan struct{}, error) {
) (*http.Server, error) {
logger := srvCtx.Logger.With("module", "geth")

evtClient, ok := clientCtx.Client.(rpcclient.EventsClient)
if !ok {
return nil, nil, fmt.Errorf("client %T does not implement EventsClient", clientCtx.Client)
return nil, fmt.Errorf("client %T does not implement EventsClient", clientCtx.Client)
}

var rpcStream *stream.RPCStream
Expand All @@ -73,7 +76,7 @@ func StartJSONRPC(srvCtx *server.Context,
}

if err != nil {
return nil, nil, fmt.Errorf("failed to create rpc streams after %d attempts: %w", MaxRetry, err)
return nil, fmt.Errorf("failed to create rpc streams after %d attempts: %w", MaxRetry, err)
}

app.RegisterPendingTxListener(rpcStream.ListenPendingTx)
Expand Down Expand Up @@ -104,7 +107,7 @@ func StartJSONRPC(srvCtx *server.Context,
"namespace", api.Namespace,
"service", api.Service,
)
return nil, nil, err
return nil, err
}
}

Expand All @@ -128,12 +131,27 @@ func StartJSONRPC(srvCtx *server.Context,

ln, err := Listen(httpSrv.Addr, config)
if err != nil {
return nil, nil, err
return nil, err
}

g.Go(func() error {
srvCtx.Logger.Info("Starting JSON-RPC server", "address", config.JSONRPC.Address)
if err := httpSrv.Serve(ln); err != nil {
errCh := make(chan error)
go func() {
errCh <- httpSrv.Serve(ln)
}()

// Start a blocking select to wait for an indication to stop the server or that
// the server failed to start properly.
select {
case <-ctx.Done():
mmsqe marked this conversation as resolved.
Show resolved Hide resolved
// The calling process canceled or closed the provided context, so we must
// gracefully stop the gRPC server.
logger.Info("stopping JSON-RPC server...", "address", config.JSONRPC.Address)
shutdownCtx, _ := context.WithTimeout(context.Background(), 10*time.Second)
httpSrv.Shutdown(shutdownCtx)

case err := <-errCh:
if err == http.ErrServerClosed {
close(httpSrvDone)
}
Expand All @@ -148,5 +166,5 @@ func StartJSONRPC(srvCtx *server.Context,

wsSrv := rpc.NewWebsocketsServer(clientCtx, srvCtx.Logger, rpcStream, config)
wsSrv.Start()
return httpSrv, httpSrvDone, nil
return httpSrv, nil
}
50 changes: 12 additions & 38 deletions server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@ import (
"fmt"
"io"
"net"
"net/http"
"os"
"path/filepath"
"runtime/pprof"
"time"

"github.com/cosmos/cosmos-sdk/codec"
"github.com/cosmos/cosmos-sdk/crypto/keyring"
Expand Down Expand Up @@ -419,10 +417,7 @@ func startInProcess(svrCtx *server.Context, clientCtx client.Context, opts Start
idxer = indexer.NewKVIndexer(idxDB, idxLogger, clientCtx)
indexerService := NewEVMIndexerService(idxer, clientCtx.Client.(rpcclient.Client), config.JSONRPC.AllowIndexerGap)
indexerService.SetLogger(servercmtlog.CometLoggerWrapper{Logger: idxLogger})

g.Go(func() error {
return indexerService.Start()
})
go indexerService.Start()
}

if config.API.Enable || config.JSONRPC.Enable {
Expand All @@ -443,30 +438,12 @@ func startInProcess(svrCtx *server.Context, clientCtx client.Context, opts Start
if err != nil {
return err
}
if grpcSrv != nil {
defer grpcSrv.GracefulStop()
}

apiSrv := startAPIServer(ctx, svrCtx, clientCtx, g, config.Config, app, grpcSrv, metrics)
if apiSrv != nil {
defer apiSrv.Close()
}
startAPIServer(ctx, svrCtx, clientCtx, g, config.Config, app, grpcSrv, metrics)

clientCtx, httpSrv, httpSrvDone, err := startJSONRPCServer(svrCtx, clientCtx, g, config, genDocProvider, idxer, app)
if httpSrv != nil {
defer func() {
shutdownCtx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second)
defer cancelFn()
if err := httpSrv.Shutdown(shutdownCtx); err != nil {
logger.Error("HTTP server shutdown produced a warning", "error", err.Error())
} else {
logger.Info("HTTP server shut down, waiting 5 sec")
select {
case <-time.Tick(5 * time.Second):
case <-httpSrvDone:
}
}
}()
clientCtx, err = startJSONRPCServer(ctx, svrCtx, clientCtx, g, config, genDocProvider, idxer, app)
if err != nil {
return err
}

// At this point it is safe to block the process if we're in query only mode as
Expand Down Expand Up @@ -619,9 +596,9 @@ func startAPIServer(
app types.Application,
grpcSrv *grpc.Server,
metrics *telemetry.Metrics,
) *api.Server {
) {
if !svrCfg.API.Enable {
return nil
return
}

apiSrv := api.New(clientCtx, svrCtx.Logger.With("server", "api"), grpcSrv)
Expand All @@ -634,38 +611,35 @@ func startAPIServer(
g.Go(func() error {
return apiSrv.Start(ctx, svrCfg)
})
return apiSrv
}

func startJSONRPCServer(
stdCtx context.Context,
svrCtx *server.Context,
clientCtx client.Context,
g *errgroup.Group,
config config.Config,
genDocProvider node.GenesisDocProvider,
idxer ethermint.EVMTxIndexer,
app types.Application,
) (ctx client.Context, httpSrv *http.Server, httpSrvDone chan struct{}, err error) {
) (ctx client.Context, err error) {
ctx = clientCtx
if !config.JSONRPC.Enable {
return
}

txApp, ok := app.(AppWithPendingTxStream)
if !ok {
return ctx, httpSrv, httpSrvDone, fmt.Errorf("json-rpc server requires AppWithPendingTxStream")
return ctx, fmt.Errorf("json-rpc server requires AppWithPendingTxStream")
}

genDoc, err := genDocProvider()
if err != nil {
return ctx, httpSrv, httpSrvDone, err
return ctx, err
}

ctx = clientCtx.WithChainID(genDoc.ChainID)
g.Go(func() error {
httpSrv, httpSrvDone, err = StartJSONRPC(svrCtx, clientCtx, g, &config, idxer, txApp)
return err
})
_, err = StartJSONRPC(stdCtx, svrCtx, clientCtx, g, &config, idxer, txApp)
return
}

Expand Down
21 changes: 7 additions & 14 deletions testutil/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,14 +196,13 @@ type (
RPCClient tmrpcclient.Client
JSONRPCClient *ethclient.Client

tmNode *node.Node
api *api.Server
grpc *grpc.Server
grpcWeb *http.Server
jsonrpc *http.Server
jsonrpcDone chan struct{}
errGroup *errgroup.Group
cancelFn context.CancelFunc
tmNode *node.Node
api *api.Server
grpc *grpc.Server
grpcWeb *http.Server
jsonrpc *http.Server
errGroup *errgroup.Group
cancelFn context.CancelFunc
}
)

Expand Down Expand Up @@ -653,12 +652,6 @@ func (n *Network) Cleanup() {

if err := v.jsonrpc.Shutdown(shutdownCtx); err != nil {
v.tmNode.Logger.Error("HTTP server shutdown produced a warning", "error", err.Error())
} else {
v.tmNode.Logger.Info("HTTP server shut down, waiting 5 sec")
select {
case <-time.Tick(5 * time.Second):
case <-v.jsonrpcDone:
}
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions testutil/network/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ func startInProcess(cfg Config, val *Validator) error {
return fmt.Errorf("validator %s context is nil", val.Moniker)
}

val.jsonrpc, val.jsonrpcDone, err = server.StartJSONRPC(
val.Ctx, val.ClientCtx, val.errGroup, val.AppConfig,
val.jsonrpc, err = server.StartJSONRPC(
ctx, val.Ctx, val.ClientCtx, val.errGroup, val.AppConfig,
nil, app.(server.AppWithPendingTxStream),
)
if err != nil {
Expand Down
Loading