Skip to content

Commit

Permalink
bootstrapper: wipe disk and reboot on non-recoverable error (#2971)
Browse files Browse the repository at this point in the history
* Let JoinClient return fatal errors
* Mark disk for wiping if JoinClient or InitServer return errors
* Reboot system if bootstrapper detects an error
* Refactor joinClient start/stop implementation
* Fix joining nodes retrying kubeadm 3 times in all cases
* Write non-recoverable failures to syslog before rebooting

---------

Signed-off-by: Daniel Weiße <[email protected]>
  • Loading branch information
daniel-weisse authored Mar 12, 2024
1 parent 1b973bf commit 1077b7a
Show file tree
Hide file tree
Showing 10 changed files with 200 additions and 221 deletions.
77 changes: 63 additions & 14 deletions bootstrapper/cmd/bootstrapper/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@ import (
"context"
"fmt"
"log/slog"
"log/syslog"
"net"
"os"
"sync"
"syscall"
"time"

"github.com/edgelesssys/constellation/v2/bootstrapper/internal/clean"
"github.com/edgelesssys/constellation/v2/bootstrapper/internal/diskencryption"
Expand All @@ -32,7 +35,8 @@ func run(issuer atls.Issuer, openDevice vtpm.TPMOpenFunc, fileHandler file.Handl
) {
log.With(slog.String("version", constants.BinaryVersion().String())).Info("Starting bootstrapper")

uuid, err := getDiskUUID()
disk := diskencryption.New()
uuid, err := getDiskUUID(disk)
if err != nil {
log.With(slog.Any("error", err)).Error("Failed to get disk UUID")
} else {
Expand All @@ -42,43 +46,58 @@ func run(issuer atls.Issuer, openDevice vtpm.TPMOpenFunc, fileHandler file.Handl
nodeBootstrapped, err := initialize.IsNodeBootstrapped(openDevice)
if err != nil {
log.With(slog.Any("error", err)).Error("Failed to check if node was previously bootstrapped")
os.Exit(1)
reboot(fmt.Errorf("checking if node was previously bootstrapped: %w", err))
}

if nodeBootstrapped {
if err := kube.StartKubelet(); err != nil {
log.With(slog.Any("error", err)).Error("Failed to restart kubelet")
os.Exit(1)
reboot(fmt.Errorf("restarting kubelet: %w", err))
}
return
}

nodeLock := nodelock.New(openDevice)
initServer, err := initserver.New(context.Background(), nodeLock, kube, issuer, fileHandler, metadata, log)
initServer, err := initserver.New(context.Background(), nodeLock, kube, issuer, disk, fileHandler, metadata, log)
if err != nil {
log.With(slog.Any("error", err)).Error("Failed to create init server")
os.Exit(1)
reboot(fmt.Errorf("creating init server: %w", err))
}

dialer := dialer.New(issuer, nil, &net.Dialer{})
joinClient := joinclient.New(nodeLock, dialer, kube, metadata, log)
joinClient := joinclient.New(nodeLock, dialer, kube, metadata, disk, log)

cleaner := clean.New().With(initServer).With(joinClient)
go cleaner.Start()
defer cleaner.Done()

joinClient.Start(cleaner)
var wg sync.WaitGroup

if err := initServer.Serve(bindIP, bindPort, cleaner); err != nil {
log.With(slog.Any("error", err)).Error("Failed to serve init server")
os.Exit(1)
}
wg.Add(1)
go func() {
defer wg.Done()
if err := joinClient.Start(cleaner); err != nil {
log.With(slog.Any("error", err)).Error("Failed to join cluster")
markDiskForReset(disk)
reboot(fmt.Errorf("joining cluster: %w", err))
}
}()

wg.Add(1)
go func() {
defer wg.Done()
if err := initServer.Serve(bindIP, bindPort, cleaner); err != nil {
log.With(slog.Any("error", err)).Error("Failed to serve init server")
markDiskForReset(disk)
reboot(fmt.Errorf("serving init server: %w", err))
}
}()
wg.Wait()

log.Info("bootstrapper done")
}

func getDiskUUID() (string, error) {
disk := diskencryption.New()
func getDiskUUID(disk *diskencryption.DiskEncryption) (string, error) {
free, err := disk.Open()
if err != nil {
return "", err
Expand All @@ -87,6 +106,36 @@ func getDiskUUID() (string, error) {
return disk.UUID()
}

// markDiskForReset sets a token in the cryptsetup header of the disk to indicate the disk should be reset on next boot.
// This is used to reset all state of a node in case the bootstrapper encountered a non recoverable error
// after the node successfully retrieved a join ticket from the JoinService.
// As setting this token is safe as long as we are certain we don't need the data on the disk anymore, we call this
// unconditionally when either the JoinClient or the InitServer encounter an error.
// We don't call it before that, as the node may be restarting after a previous, successful bootstrapping,
// and now encountered a transient error on rejoining the cluster. Wiping the disk now would delete existing data.
func markDiskForReset(disk *diskencryption.DiskEncryption) {
free, err := disk.Open()
if err != nil {
return
}
defer free()
_ = disk.MarkDiskForReset()
}

// reboot writes an error message to the system log and reboots the system.
// We call this instead of os.Exit() since failures in the bootstrapper usually require a node reset.
func reboot(e error) {
syslogWriter, err := syslog.New(syslog.LOG_EMERG|syslog.LOG_KERN, "bootstrapper")
if err != nil {
_ = syscall.Reboot(syscall.LINUX_REBOOT_CMD_RESTART)
}
_ = syslogWriter.Err(e.Error())
_ = syslogWriter.Emerg("bootstrapper has encountered a non recoverable error. Rebooting...")
time.Sleep(time.Minute) // sleep to allow the message to be written to syslog and seen by the user

_ = syscall.Reboot(syscall.LINUX_REBOOT_CMD_RESTART)
}

type clusterInitJoiner interface {
joinclient.ClusterJoiner
initserver.ClusterInitializer
Expand Down
5 changes: 5 additions & 0 deletions bootstrapper/internal/diskencryption/diskencryption.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ func (c *DiskEncryption) UpdatePassphrase(passphrase string) error {
return c.device.SetConstellationStateDiskToken(cryptsetup.SetDiskInitialized)
}

// MarkDiskForReset marks the state disk as not initialized so it may be wiped (reset) on reboot.
func (c *DiskEncryption) MarkDiskForReset() error {
return c.device.SetConstellationStateDiskToken(cryptsetup.SetDiskNotInitialized)
}

// getInitialPassphrase retrieves the initial passphrase used on first boot.
func (c *DiskEncryption) getInitialPassphrase() (string, error) {
passphrase, err := afero.ReadFile(c.fs, initialKeyPath)
Expand Down
1 change: 0 additions & 1 deletion bootstrapper/internal/initserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ go_library(
visibility = ["//bootstrapper:__subpackages__"],
deps = [
"//bootstrapper/initproto",
"//bootstrapper/internal/diskencryption",
"//bootstrapper/internal/journald",
"//internal/atls",
"//internal/attestation",
Expand Down
25 changes: 20 additions & 5 deletions bootstrapper/internal/initserver/initserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"time"

"github.com/edgelesssys/constellation/v2/bootstrapper/initproto"
"github.com/edgelesssys/constellation/v2/bootstrapper/internal/diskencryption"
"github.com/edgelesssys/constellation/v2/bootstrapper/internal/journald"
"github.com/edgelesssys/constellation/v2/internal/atls"
"github.com/edgelesssys/constellation/v2/internal/attestation"
Expand Down Expand Up @@ -65,6 +64,7 @@ type Server struct {
shutdownLock sync.RWMutex

initSecretHash []byte
initFailure error

kmsURI string

Expand All @@ -76,7 +76,10 @@ type Server struct {
}

// New creates a new initialization server.
func New(ctx context.Context, lock locker, kube ClusterInitializer, issuer atls.Issuer, fh file.Handler, metadata MetadataAPI, log *slog.Logger) (*Server, error) {
func New(
ctx context.Context, lock locker, kube ClusterInitializer, issuer atls.Issuer,
disk encryptedDisk, fh file.Handler, metadata MetadataAPI, log *slog.Logger,
) (*Server, error) {
log = log.WithGroup("initServer")

initSecretHash, err := metadata.InitSecretHash(ctx)
Expand All @@ -94,7 +97,7 @@ func New(ctx context.Context, lock locker, kube ClusterInitializer, issuer atls.

server := &Server{
nodeLock: lock,
disk: diskencryption.New(),
disk: disk,
initializer: kube,
fileHandler: fh,
issuer: issuer,
Expand Down Expand Up @@ -123,11 +126,20 @@ func (s *Server) Serve(ip, port string, cleaner cleaner) error {
}

s.log.Info("Starting")
return s.grpcServer.Serve(lis)
err = s.grpcServer.Serve(lis)

// If Init failed, we mark the disk for reset, so the node can restart the process
// In this case we don't care about any potential errors from the grpc server
if s.initFailure != nil {
s.log.Error("Fatal error during Init request", "error", s.initFailure)
return err
}

return err
}

// Init initializes the cluster.
func (s *Server) Init(req *initproto.InitRequest, stream initproto.API_InitServer) (err error) {
func (s *Server) Init(req *initproto.InitRequest, stream initproto.API_InitServer) (retErr error) {
// Acquire lock to prevent shutdown while Init is still running
s.shutdownLock.RLock()
defer s.shutdownLock.RUnlock()
Expand Down Expand Up @@ -188,6 +200,9 @@ func (s *Server) Init(req *initproto.InitRequest, stream initproto.API_InitServe
// since we are bootstrapping a new one.
// Any errors following this call will result in a failed node that may not join any cluster.
s.cleaner.Clean()
defer func() {
s.initFailure = retErr
}()

if err := s.setupDisk(stream.Context(), cloudKms); err != nil {
if e := s.sendLogsWithMessage(stream, status.Errorf(codes.Internal, "setting up disk: %s", err)); e != nil {
Expand Down
13 changes: 12 additions & 1 deletion bootstrapper/internal/initserver/initserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ func TestNew(t *testing.T) {
t.Run(name, func(t *testing.T) {
assert := assert.New(t)

server, err := New(context.TODO(), newFakeLock(), &stubClusterInitializer{}, atls.NewFakeIssuer(variant.Dummy{}), fh, &tc.metadata, logger.NewTest(t))
server, err := New(
context.TODO(), newFakeLock(), &stubClusterInitializer{}, atls.NewFakeIssuer(variant.Dummy{}),
&stubDisk{}, fh, &tc.metadata, logger.NewTest(t),
)
if tc.wantErr {
assert.Error(err)
return
Expand Down Expand Up @@ -381,6 +384,10 @@ func (d *fakeDisk) UpdatePassphrase(passphrase string) error {
return nil
}

func (d *fakeDisk) MarkDiskForReset() error {
return nil
}

type stubDisk struct {
openErr error
uuid string
Expand All @@ -402,6 +409,10 @@ func (d *stubDisk) UpdatePassphrase(string) error {
return d.updatePassphraseErr
}

func (d *stubDisk) MarkDiskForReset() error {
return nil
}

type stubClusterInitializer struct {
initClusterKubeconfig []byte
initClusterErr error
Expand Down
1 change: 0 additions & 1 deletion bootstrapper/internal/joinclient/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ go_library(
visibility = ["//bootstrapper:__subpackages__"],
deps = [
"//bootstrapper/internal/certificate",
"//bootstrapper/internal/diskencryption",
"//internal/attestation",
"//internal/cloud/metadata",
"//internal/constants",
Expand Down
Loading

0 comments on commit 1077b7a

Please sign in to comment.