Skip to content

Commit

Permalink
Move initialization to supervisor (#5187)
Browse files Browse the repository at this point in the history
## Motivation
Part of #5149; merge after #5186 and #5189.

This moves the initialization code into the PoST supervisor. The ATX builder should not concern itself with initialization:
- A remote PoST service that connects is expected to already be fully initialized 
- In supervised mode the supervisor takes care of initialization

This allows the ATX builder to be simplified, which has already been partially done in this PR and will continue in future PRs.

## Changes
- Removed dependency on `postSetupManager` from `atxBuilder`
- Simplified `atxBuilder`
  - Initial proofs are no longer verified (the PoST service is trusted)
  - Information about the PoST is fetched from the PoST service rather than accessed via the `postSetupManager`
- Simplified `postSetupManager`
  - Since meta information is now fetched via the client, everything besides `StartSession`, `StopSession` and `Reset` has been removed from the interface
  - `postSetupManager` isn't used by the gRPC `SmeshingService` any more; instead, it's now used by `PostSupervisor` to initialize a supervised node before starting it

For the next PR:
- change `atxBuilder` from `Start`/`Stop` to `Run`
  - a post service connecting triggers the builder loop (one per connected service)
  - simplify `atxBuilder` further and consider merging `nipostBuilder` into it.

## Test Plan
- added new tests to PoST supervisor to cover new functionality
- updated existing tests

## TODO
<!-- This section should be removed when all items are complete -->
- [x] Explain motivation or link existing issue(s)
- [x] Test changes and document test plan
- [x] Update documentation as needed
- [x] Update [changelog](../CHANGELOG.md) as needed
  • Loading branch information
fasmat committed Oct 25, 2023
1 parent 512d2d2 commit ba3b4b5
Show file tree
Hide file tree
Showing 26 changed files with 835 additions and 1,012 deletions.
201 changes: 82 additions & 119 deletions activation/activation.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,20 +73,19 @@ type Builder struct {

eg errgroup.Group

signer *signing.EdSigner
accountLock sync.RWMutex
nodeID types.NodeID
coinbaseAccount types.Address
goldenATXID types.ATXID
layersPerEpoch uint32
regossipInterval time.Duration
cdb *datastore.CachedDB
publisher pubsub.Publisher
postService postService
nipostBuilder nipostBuilder
postSetupProvider postSetupProvider
initialPost *types.Post
validator nipostValidator
signer *signing.EdSigner
accountLock sync.RWMutex
nodeID types.NodeID
coinbaseAccount types.Address
goldenATXID types.ATXID
regossipInterval time.Duration
cdb *datastore.CachedDB
publisher pubsub.Publisher
postService postService
nipostBuilder nipostBuilder
initialPost *types.Post
initialPostInfo *types.PostInfo
validator nipostValidator

// smeshingMutex protects `StartSmeshing` and `StopSmeshing` from concurrent access
smeshingMutex sync.Mutex
Expand Down Expand Up @@ -153,7 +152,6 @@ func NewBuilder(
publisher pubsub.Publisher,
postService postService,
nipostBuilder nipostBuilder,
postSetupProvider postSetupProvider,
layerClock layerClock,
syncer syncer,
log log.Log,
Expand All @@ -165,13 +163,11 @@ func NewBuilder(
nodeID: nodeID,
coinbaseAccount: conf.CoinbaseAccount,
goldenATXID: conf.GoldenATXID,
layersPerEpoch: conf.LayersPerEpoch,
regossipInterval: conf.RegossipInterval,
cdb: cdb,
publisher: publisher,
postService: postService,
nipostBuilder: nipostBuilder,
postSetupProvider: postSetupProvider,
layerClock: layerClock,
syncer: syncer,
log: log,
Expand All @@ -184,13 +180,25 @@ func NewBuilder(
return b
}

func (b *Builder) proof(ctx context.Context, challenge []byte) (*types.Post, *types.PostMetadata, error) {
client, err := b.postService.Client(b.nodeID)
if err != nil {
return nil, nil, err
func (b *Builder) proof(ctx context.Context, challenge []byte) (*types.Post, *types.PostInfo, error) {
for {
client, err := b.postService.Client(b.nodeID)
if err == nil {
events.EmitPostStart(challenge)
post, postInfo, err := client.Proof(ctx, challenge)
if err != nil {
events.EmitPostFailure()
return nil, nil, err
}
events.EmitPostComplete(challenge)
return post, postInfo, err
}
select {
case <-ctx.Done():
return nil, nil, ctx.Err()
case <-time.After(2 * time.Second):
}
}

return client.Proof(ctx, challenge)
}

// Smeshing returns true iff atx builder is smeshing.
Expand All @@ -204,7 +212,7 @@ func (b *Builder) Smeshing() bool {
// or missing, data creation session will be preceded. Changing of the post
// options (e.g., number of labels), after initial setup, is supported. If data
// creation fails for any reason then the go-routine will panic.
func (b *Builder) StartSmeshing(coinbase types.Address, opts PostSetupOpts) error {
func (b *Builder) StartSmeshing(coinbase types.Address) error {
b.smeshingMutex.Lock()
defer b.smeshingMutex.Unlock()

Expand All @@ -216,32 +224,8 @@ func (b *Builder) StartSmeshing(coinbase types.Address, opts PostSetupOpts) erro
ctx, stop := context.WithCancel(b.parentCtx)
b.stop = stop

err := b.postSetupProvider.PrepareInitializer(b.parentCtx, opts)
if err != nil {
return fmt.Errorf("failed to prepare post initializer: %w", err)
}

b.eg.Go(func() error {
defer b.started.Store(false)

select {
case <-ctx.Done():
return nil
case <-b.syncer.RegisterForATXSynced():
// ensure we are ATX synced before starting the PoST Session
}

// If start session returns any error other than context.Canceled
// (which is how we signal it to stop) then we panic.
err := b.postSetupProvider.StartSession(ctx)
switch {
case errors.Is(err, context.Canceled):
return nil
case err != nil:
b.log.Panic("initialization failed: %v", err)
return err
}

b.run(ctx)
return nil
})
Expand All @@ -255,7 +239,7 @@ func (b *Builder) StartSmeshing(coinbase types.Address, opts PostSetupOpts) erro
return ctx.Err()
case <-ticker.C:
if err := b.Regossip(ctx); err != nil {
b.log.With().Warning("failed to regossip", log.Context(ctx), log.Err(err))
b.log.With().Warning("failed to re-gossip", log.Context(ctx), log.Err(err))
}
}
}
Expand All @@ -282,10 +266,6 @@ func (b *Builder) StopSmeshing(deleteFiles bool) error {
return nil
}

if err := b.postSetupProvider.Reset(); err != nil {
b.log.With().Error("failed to delete post files", log.Err(err))
return err
}
if err := discardBuilderState(b.nipostBuilder.DataDir()); err != nil && !errors.Is(err, fs.ErrNotExist) {
b.log.With().Error("failed to delete builder state", log.Err(err))
return err
Expand All @@ -301,7 +281,7 @@ func (b *Builder) StopSmeshing(deleteFiles bool) error {

return nil
default:
return fmt.Errorf("failed to stop post data creation session: %w", err)
return fmt.Errorf("failed to stop smeshing: %w", err)
}
}

Expand All @@ -310,21 +290,6 @@ func (b *Builder) SmesherID() types.NodeID {
return b.nodeID
}

// run generates the initial post, waits for layer 0 to be signalled by the
// layer clock and then hands control to the builder loop.
//
// It returns without entering the loop when the initial post cannot be
// generated (the error is logged) or when ctx is done before layer 0.
func (b *Builder) run(ctx context.Context) {
	if err := b.generateInitialPost(ctx); err != nil {
		b.log.Error("Failed to generate proof: %s", err)
		return
	}

	select {
	case <-ctx.Done():
		// Cancelled while waiting; nothing left to do.
	case <-b.layerClock.AwaitLayer(types.LayerID(0)):
		b.loop(ctx)
	}
}

func (b *Builder) generateInitialPost(ctx context.Context) error {
// Generate the initial POST if we don't have an ATX...
if _, err := b.cdb.GetLastAtx(b.nodeID); err == nil {
Expand All @@ -333,71 +298,50 @@ func (b *Builder) generateInitialPost(ctx context.Context) error {
// ...and if we don't have an initial POST persisted already.
if post, err := loadPost(b.nipostBuilder.DataDir()); err == nil {
b.log.Info("loaded the initial post from disk")
return b.verifyInitialPost(ctx, post, &types.PostMetadata{
Challenge: shared.ZeroChallenge,
LabelsPerUnit: b.postSetupProvider.Config().LabelsPerUnit,
})
b.initialPost = post
// TODO(mafa): initial post info?
return nil
}

// Create the initial post and save it.
startTime := time.Now()
var err error
events.EmitPostStart(shared.ZeroChallenge)
post, metadata, err := b.proof(ctx, shared.ZeroChallenge)
post, postInfo, err := b.proof(ctx, shared.ZeroChallenge)
if err != nil {
events.EmitPostFailure()
return fmt.Errorf("post execution: %w", err)
}
events.EmitPostComplete(shared.ZeroChallenge)
b.initialPost = post
b.initialPostInfo = postInfo
metrics.PostDuration.Set(float64(time.Since(startTime).Nanoseconds()))
public.PostSeconds.Set(float64(time.Since(startTime)))
b.log.Info("created the initial post")
if b.verifyInitialPost(ctx, post, metadata) != nil {
return err
}

if err := savePost(b.nipostBuilder.DataDir(), post); err != nil {
b.log.With().Warning("failed to save initial post: %w", log.Err(err))
}
return nil
}

// verifyInitialPost validates post and metadata through b.validator.Post, using
// the commitment ATX and the NumUnits of the last options reported by the post
// setup provider. On success the post is stored as b.initialPost.
//
// A context.Canceled error is returned as-is without emitting an event or
// logging. Any other validation error emits an invalid-post-proof event and is
// logged at Fatal level before being returned. Failing to fetch the commitment
// ATX is logged at Panic level.
func (b *Builder) verifyInitialPost(ctx context.Context, post *types.Post, metadata *types.PostMetadata) error {
	b.log.With().Info("verifying the initial post", log.Object("post", post), log.Object("metadata", metadata))

	commitmentATX, err := b.postSetupProvider.CommitmentAtx()
	if err != nil {
		b.log.With().Panic("failed to fetch commitment ATX ID.", log.Err(err))
	}

	numUnits := b.postSetupProvider.LastOpts().NumUnits
	err = b.validator.Post(ctx, b.nodeID, commitmentATX, post, metadata, numUnits)
	if err == nil {
		b.initialPost = post
		return nil
	}
	if errors.Is(err, context.Canceled) {
		// If the context was canceled, we don't want to emit or log errors just propagate the cancellation signal.
		return err
	}

	events.EmitInvalidPostProof()
	b.log.With().Fatal("initial POST proof is invalid. Probably the initialized POST data is corrupted. Please verify the data with postcli and regenerate the corrupted files.", log.Err(err))
	return err
}

// receivePendingPoetClients takes the currently pending PoET client list,
// leaving nil in its place, and returns what was stored before (nil when
// nothing was pending).
// NOTE(review): presumably pendingPoetClients is an atomic pointer, so the
// exchange is atomic — confirm against the field declaration.
func (b *Builder) receivePendingPoetClients() *[]poetClient {
	pending := b.pendingPoetClients.Swap(nil)
	return pending
}

// loop is the main loop that tries to create an atx per tick received from the global clock.
func (b *Builder) loop(ctx context.Context) {
func (b *Builder) run(ctx context.Context) {
defer b.log.Info("atx builder stopped")

for {
err := b.generateInitialPost(ctx)
if err == nil {
break
}
b.log.Error("Failed to generate initial proof: %s", err)
currentLayer := b.layerClock.CurrentLayer()
select {
case <-ctx.Done():
return
case <-b.layerClock.AwaitLayer(currentLayer.Add(1)):
}
}

for {
if poetClients := b.receivePendingPoetClients(); poetClients != nil {
b.nipostBuilder.UpdatePoETProvers(*poetClients)
Expand Down Expand Up @@ -498,11 +442,24 @@ func (b *Builder) buildNIPostChallenge(ctx context.Context) (*types.NIPostChalle
}

if prevAtx, err := b.cdb.GetLastAtx(b.nodeID); err != nil {
commitmentAtx, err := b.postSetupProvider.CommitmentAtx()
client, err := b.postService.Client(b.nodeID)
if err != nil {
return nil, fmt.Errorf("failed to get commitment ATX: %w", err)
return nil, fmt.Errorf("failed to fetch commitment ATX: %w", err)
}
if b.initialPostInfo == nil {
// This is a temporary workaround for the case where an initial post has been generated,
// persisted to and loaded from disk. In this case we don't have a post info object
// and need to fetch it from the post service
//
// In a future PR all data that is persisted to disk will instead be persisted to db and
// the initial post data will be extended with post info to not require this any more
info, err := client.Info(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get commitment ATX: %w", err)
}
b.initialPostInfo = info
}
challenge.CommitmentATX = &commitmentAtx
challenge.CommitmentATX = &b.initialPostInfo.CommitmentATX
challenge.InitialPost = b.initialPost
} else {
challenge.PrevATXID = prevAtx.ID
Expand Down Expand Up @@ -675,21 +632,27 @@ func (b *Builder) createAtx(ctx context.Context, challenge *types.NIPostChalleng
case <-b.syncer.RegisterForATXSynced():
}

client, err := b.postService.Client(b.nodeID)
if err != nil {
return nil, fmt.Errorf("get post client: %w", err)
}
info, err := client.Info(ctx)
if err != nil {
return nil, fmt.Errorf("get post client info: %w", err)
}

var nonce *types.VRFPostIndex
var nodeID *types.NodeID
if challenge.PrevATXID == types.EmptyATXID {
nodeID = &b.nodeID
nonce, err = b.postSetupProvider.VRFNonce()
if err != nil {
return nil, fmt.Errorf("build atx: %w", err)
}
nonce = info.Nonce
}

atx := types.NewActivationTx(
*challenge,
b.Coinbase(),
nipost,
b.postSetupProvider.LastOpts().NumUnits,
info.NumUnits,
nonce,
)
atx.InnerActivationTx.NodeID = nodeID
Expand Down
Loading

0 comments on commit ba3b4b5

Please sign in to comment.