Skip to content

Commit

Permalink
fix: broken shutdown for services on fraud proofs receival (celestiao…
Browse files Browse the repository at this point in the history
…rg#1220)

Closes celestiaorg#1205

Co-authored-by: Wondertan <[email protected]>
  • Loading branch information
distractedm1nd and Wondertan authored Oct 17, 2022
1 parent 7fcb0f3 commit d239616
Show file tree
Hide file tree
Showing 9 changed files with 23 additions and 29 deletions.
6 changes: 2 additions & 4 deletions nodebuilder/daser/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/celestiaorg/celestia-node/das"
"github.com/celestiaorg/celestia-node/fraud"
"github.com/celestiaorg/celestia-node/libs/fxutil"
fraudServ "github.com/celestiaorg/celestia-node/nodebuilder/fraud"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
)
Expand All @@ -19,9 +18,8 @@ func ConstructModule(tp node.Type) fx.Option {
"daser",
fx.Provide(fx.Annotate(
NewDASer,
fx.OnStart(func(ctx context.Context, lc fx.Lifecycle, fservice fraudServ.Module, das *das.DASer) error {
lifecycleCtx := fxutil.WithLifecycle(ctx, lc)
return fraudServ.Lifecycle(ctx, lifecycleCtx, fraud.BadEncoding, fservice,
fx.OnStart(func(startCtx, ctx context.Context, fservice fraudServ.Module, das *das.DASer) error {
return fraudServ.Lifecycle(startCtx, ctx, fraud.BadEncoding, fservice,
das.Start, das.Stop)
}),
fx.OnStop(func(ctx context.Context, das *das.DASer) error {
Expand Down
6 changes: 2 additions & 4 deletions nodebuilder/header/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/celestiaorg/celestia-node/header/p2p"
"github.com/celestiaorg/celestia-node/header/store"
"github.com/celestiaorg/celestia-node/header/sync"
"github.com/celestiaorg/celestia-node/libs/fxutil"
fraudServ "github.com/celestiaorg/celestia-node/nodebuilder/fraud"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/params"
Expand Down Expand Up @@ -43,7 +42,7 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
}),
fx.Provide(fx.Annotate(
newSyncer,
fx.OnStart(func(ctx context.Context, lc fx.Lifecycle, fservice fraudServ.Module, syncer *sync.Syncer) error {
fx.OnStart(func(startCtx, ctx context.Context, fservice fraudServ.Module, syncer *sync.Syncer) error {
syncerStartFunc := func(ctx context.Context) error {
err := syncer.Start(ctx)
switch err {
Expand All @@ -55,8 +54,7 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
}
return nil
}
lifecycleCtx := fxutil.WithLifecycle(ctx, lc)
return fraudServ.Lifecycle(ctx, lifecycleCtx, fraud.BadEncoding, fservice,
return fraudServ.Lifecycle(startCtx, ctx, fraud.BadEncoding, fservice,
syncerStartFunc, syncer.Stop)
}),
fx.OnStop(func(ctx context.Context, syncer *sync.Syncer) error {
Expand Down
5 changes: 4 additions & 1 deletion nodebuilder/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"go.uber.org/fx"

"github.com/celestiaorg/celestia-node/libs/fxutil"
"github.com/celestiaorg/celestia-node/nodebuilder/core"
"github.com/celestiaorg/celestia-node/nodebuilder/daser"
"github.com/celestiaorg/celestia-node/nodebuilder/fraud"
Expand All @@ -21,7 +22,9 @@ func ConstructModule(tp node.Type, cfg *Config, store Store) fx.Option {
baseComponents := fx.Options(
fx.Provide(params.DefaultNetwork),
fx.Provide(params.BootstrappersFor),
fx.Provide(context.Background),
fx.Provide(func(lc fx.Lifecycle) context.Context {
return fxutil.WithLifecycle(context.Background(), lc)
}),
fx.Supply(cfg),
fx.Supply(store.Config),
fx.Provide(store.Datastore),
Expand Down
7 changes: 2 additions & 5 deletions nodebuilder/p2p/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
routinghelpers "github.com/libp2p/go-libp2p-routing-helpers"
"go.uber.org/fx"

"github.com/celestiaorg/celestia-node/libs/fxutil"
nparams "github.com/celestiaorg/celestia-node/params"
)

Expand All @@ -29,9 +28,8 @@ const (

// DataExchange provides a constructor for IPFS block's DataExchange over BitSwap.
func DataExchange(params bitSwapParams) (exchange.Interface, blockstore.Blockstore, error) {
ctx := fxutil.WithLifecycle(params.Ctx, params.Lc)
bs, err := blockstore.CachedBlockstore(
ctx,
params.Ctx,
blockstore.NewBlockstore(params.Ds),
blockstore.CacheOpts{
HasBloomFilterSize: defaultBloomFilterSize,
Expand All @@ -44,7 +42,7 @@ func DataExchange(params bitSwapParams) (exchange.Interface, blockstore.Blocksto
}
prefix := protocol.ID(fmt.Sprintf("/celestia/%s", params.Net))
return bitswap.New(
ctx,
params.Ctx,
network.NewFromIpfsHost(params.Host, &routinghelpers.Null{}, network.Prefix(prefix)),
bs,
bitswap.ProvideEnabled(false),
Expand All @@ -60,7 +58,6 @@ type bitSwapParams struct {

Ctx context.Context
Net nparams.Network
Lc fx.Lifecycle
Host host.Host
Ds datastore.Batching
}
5 changes: 1 addition & 4 deletions nodebuilder/p2p/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
"go.uber.org/fx"
"golang.org/x/crypto/blake2b"

"github.com/celestiaorg/celestia-node/libs/fxutil"
)

// PubSub provides a constructor for PubSub protocol with GossipSub routing.
Expand All @@ -34,7 +32,7 @@ func PubSub(cfg Config, params pubSubParams) (*pubsub.PubSub, error) {
}

return pubsub.NewGossipSub(
fxutil.WithLifecycle(params.Ctx, params.Lc),
params.Ctx,
params.Host,
opts...,
)
Expand All @@ -49,6 +47,5 @@ type pubSubParams struct {
fx.In

Ctx context.Context
Lc fx.Lifecycle
Host host.Host
}
3 changes: 1 addition & 2 deletions nodebuilder/p2p/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
dht "github.com/libp2p/go-libp2p-kad-dht"
"go.uber.org/fx"

"github.com/celestiaorg/celestia-node/libs/fxutil"
nparams "github.com/celestiaorg/celestia-node/params"
)

Expand Down Expand Up @@ -42,7 +41,7 @@ func PeerRouting(cfg Config, params routingParams) (routing.PeerRouting, error)
)
}

d, err := dht.New(fxutil.WithLifecycle(params.Ctx, params.Lc), params.Host, opts...)
d, err := dht.New(params.Ctx, params.Host, opts...)
if err != nil {
return nil, err
}
Expand Down
6 changes: 2 additions & 4 deletions nodebuilder/state/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"go.uber.org/fx"

"github.com/celestiaorg/celestia-node/fraud"
"github.com/celestiaorg/celestia-node/libs/fxutil"
fraudServ "github.com/celestiaorg/celestia-node/nodebuilder/fraud"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/state"
Expand All @@ -27,9 +26,8 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
fx.Provide(Keyring),
fx.Provide(fx.Annotate(
CoreAccessor,
fx.OnStart(func(ctx context.Context, lc fx.Lifecycle, fservice fraudServ.Module, ca *state.CoreAccessor) error {
lifecycleCtx := fxutil.WithLifecycle(ctx, lc)
return fraudServ.Lifecycle(ctx, lifecycleCtx, fraud.BadEncoding, fservice, ca.Start, ca.Stop)
fx.OnStart(func(startCtx, ctx context.Context, fservice fraudServ.Module, ca *state.CoreAccessor) error {
return fraudServ.Lifecycle(startCtx, ctx, fraud.BadEncoding, fservice, ca.Start, ca.Stop)
}),
fx.OnStop(func(ctx context.Context, ca *state.CoreAccessor) error {
return ca.Stop(ctx)
Expand Down
11 changes: 7 additions & 4 deletions nodebuilder/tests/fraud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func TestFraudProofSyncing(t *testing.T) {
store := nodebuilder.MockStore(t, cfg)
bridge := sw.NewNodeWithStore(node.Bridge, store, core.WithHeaderConstructFn(header.FraudMaker(t, 10)))

ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
ctx, cancel := context.WithTimeout(context.Background(), swamp.DefaultTestTimeout)
t.Cleanup(cancel)

err := bridge.Start(ctx)
Expand All @@ -123,21 +123,24 @@ func TestFraudProofSyncing(t *testing.T) {
lightCfg := nodebuilder.DefaultConfig(node.Light)
lightCfg.P2P.RoutingTableRefreshPeriod = defaultTimeInterval
lightCfg.Share.DiscoveryInterval = defaultTimeInterval
ln := sw.NewNodeWithStore(node.Light, nodebuilder.MockStore(t, lightCfg),
nodebuilder.WithBootstrappers([]peer.AddrInfo{*addr}))
lightCfg.Header.TrustedPeers = append(lightCfg.Header.TrustedPeers, addrs[0].String())
ln := sw.NewNodeWithStore(node.Light, nodebuilder.MockStore(t, lightCfg))

require.NoError(t, full.Start(ctx))
require.NoError(t, ln.Start(ctx))
subsFn, err := full.FraudServ.Subscribe(fraud.BadEncoding)
require.NoError(t, err)
defer subsFn.Cancel()
_, err = subsFn.Proof(ctx)
require.NoError(t, err)

require.NoError(t, ln.Start(ctx))
// internal subscription for the fraud proof is done in order to ensure that light node
// receives the BEFP.
subsLn, err := ln.FraudServ.Subscribe(fraud.BadEncoding)
require.NoError(t, err)

err = ln.Host.Connect(ctx, *host.InfoFromHost(full.Host))
require.NoError(t, err)
_, err = subsLn.Proof(ctx)
require.NoError(t, err)
subsLn.Cancel()
Expand Down
3 changes: 2 additions & 1 deletion state/core_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ func (ca *CoreAccessor) Start(ctx context.Context) error {
func (ca *CoreAccessor) Stop(context.Context) error {
defer ca.cancel()
if ca.coreConn == nil {
return fmt.Errorf("core-access: no connection found to close")
log.Warn("no connection found to close")
return nil
}
// close out core connection
err := ca.coreConn.Close()
Expand Down

0 comments on commit d239616

Please sign in to comment.