Skip to content

Commit

Permalink
feat: Remove DumpFile operations
Browse files Browse the repository at this point in the history
  • Loading branch information
fappy1234567 committed Oct 11, 2024
1 parent 8fa319b commit d6c16a2
Show file tree
Hide file tree
Showing 11 changed files with 182 additions and 64 deletions.
32 changes: 21 additions & 11 deletions config/daemonconfig/daemonconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"os"
"reflect"
"strings"
"sync"

"github.com/pkg/errors"

Expand All @@ -36,7 +37,7 @@ type DaemonConfig interface {
StorageBackend() (StorageBackendType, *BackendConfig)
UpdateMirrors(mirrorsConfigDir, registryHost string) error
DumpString() (string, error)
DumpFile(path string) error
// DumpFile(path string) error
}

// Daemon configurations factory
Expand Down Expand Up @@ -122,9 +123,6 @@ type DeviceConfig struct {
} `json:"cache"`
}

// For nydusd as FUSE daemon. Serialize Daemon info and persist to a json file
// We don't have to persist configuration file for fscache since its configuration
// is passed through HTTP API.
func DumpConfigFile(c interface{}, path string) error {
if config.IsBackendSourceEnabled() {
c = serializeWithSecretFilter(c)
Expand All @@ -137,26 +135,38 @@ func DumpConfigFile(c interface{}, path string) error {
return os.WriteFile(path, b, 0600)
}

var configRWMutex sync.RWMutex

type SupplementInfoInterface interface {
GetImageID() string
GetSnapshotID() string
IsVPCRegistry() bool
GetLabels() map[string]string
GetParams() map[string]string
}

func DumpConfigString(c interface{}) (string, error) {
b, err := json.Marshal(c)
return string(b), err
}

// Achieve a daemon configuration from template or snapshotter's configuration
func SupplementDaemonConfig(c DaemonConfig, imageID, snapshotID string,
vpcRegistry bool, labels map[string]string, params map[string]string) error {
func SupplementDaemonConfig(c DaemonConfig, info SupplementInfoInterface) error {

configRWMutex.Lock()
defer configRWMutex.Unlock()

image, err := registry.ParseImage(imageID)
image, err := registry.ParseImage(info.GetImageID())
if err != nil {
return errors.Wrapf(err, "parse image %s", imageID)
return errors.Wrapf(err, "parse image %s", info.GetImageID())
}

backendType, _ := c.StorageBackend()

switch backendType {
case backendTypeRegistry:
registryHost := image.Host
if vpcRegistry {
if info.IsVPCRegistry() {
registryHost = registry.ConvertToVPCHost(registryHost)
} else if registryHost == "docker.io" {
// For docker.io images, we should use index.docker.io
Expand All @@ -170,8 +180,8 @@ func SupplementDaemonConfig(c DaemonConfig, imageID, snapshotID string,
// If no auth is provided, don't touch auth from provided nydusd configuration file.
// We don't validate the original nydusd auth from configuration file since it can be empty
// when repository is public.
keyChain := auth.GetRegistryKeyChain(registryHost, imageID, labels)
c.Supplement(registryHost, image.Repo, snapshotID, params)
keyChain := auth.GetRegistryKeyChain(registryHost, info.GetImageID(), info.GetLabels())
c.Supplement(registryHost, image.Repo, info.GetSnapshotID(), info.GetParams())
c.FillAuth(keyChain)

// Localfs and OSS backends don't need any update,
Expand Down
2 changes: 2 additions & 0 deletions config/daemonconfig/fuse.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ func (c *FuseDaemonConfig) StorageBackend() (string, *BackendConfig) {
}

func (c *FuseDaemonConfig) DumpString() (string, error) {
configRWMutex.Lock()
defer configRWMutex.Unlock()
return DumpConfigString(c)
}

Expand Down
2 changes: 1 addition & 1 deletion misc/snapshotter/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ FROM base AS kubectl-sourcer
ARG TARGETARCH

RUN apk add -q --no-cache curl && \
curl -fsSL -o /usr/bin/kubectl https://storage.googleapis.com/kubernetes-release/release/"$(curl -L -s https://dl.k8s.io/release/stable.txt)"/bin/linux/"$TARGETARCH"/kubectl && \
curl -fsSL -o /usr/bin/kubectl https://dl.k8s.io/release/"$(curl -L -s https://dl.k8s.io/release/stable.txt)"/bin/linux/"$TARGETARCH"/kubectl && \
chmod +x /usr/bin/kubectl

FROM base
Expand Down
55 changes: 43 additions & 12 deletions pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,21 @@ type Daemon struct {
state types.DaemonState
}

type NydusdSupplementInfo struct {
DaemonState ConfigState
ImageID string
SnapshotID string
Vpc bool
Labels map[string]string
Params map[string]string
}

func (s *NydusdSupplementInfo) GetImageID() string { return s.ImageID }
func (s *NydusdSupplementInfo) GetSnapshotID() string { return s.SnapshotID }
func (s *NydusdSupplementInfo) IsVPCRegistry() bool { return s.Vpc }
func (s *NydusdSupplementInfo) GetLabels() map[string]string { return s.Labels }
func (s *NydusdSupplementInfo) GetParams() map[string]string { return s.Params }

func (d *Daemon) Lock() {
d.mu.Lock()
}
Expand Down Expand Up @@ -250,12 +265,7 @@ func (d *Daemon) sharedFusedevMount(rafs *rafs.Rafs) error {
return err
}

c, err := daemonconfig.NewDaemonConfig(d.States.FsDriver, d.ConfigFile(rafs.SnapshotID))
if err != nil {
return errors.Wrapf(err, "Failed to reload instance configuration %s",
d.ConfigFile(rafs.SnapshotID))
}

c := d.Config
cfg, err := c.DumpString()
if err != nil {
return errors.Wrap(err, "dump instance configuration")
Expand All @@ -280,12 +290,7 @@ func (d *Daemon) sharedErofsMount(ra *rafs.Rafs) error {
return errors.Wrapf(err, "failed to create fscache work dir %s", ra.FscacheWorkDir())
}

c, err := daemonconfig.NewDaemonConfig(d.States.FsDriver, d.ConfigFile(ra.SnapshotID))
if err != nil {
log.L.Errorf("Failed to reload daemon configuration %s, %s", d.ConfigFile(ra.SnapshotID), err)
return err
}

c := d.Config
cfgStr, err := c.DumpString()
if err != nil {
return err
Expand Down Expand Up @@ -650,3 +655,29 @@ func NewDaemon(opt ...NewDaemonOpt) (*Daemon, error) {

return d, nil
}

func (d *Daemon) MountByAPI() error {
rafs := d.RafsCache.Head()
if rafs == nil {
return errors.Wrapf(errdefs.ErrNotFound, "daemon %s no rafs instance associated", d.ID())
}
client, err := d.GetClient()
if err != nil {
return errors.Wrapf(err, "mount instance %s", rafs.SnapshotID)
}
bootstrap, err := rafs.BootstrapFile()
if err != nil {
return err
}
c := d.Config
cfg, err := c.DumpString()
if err != nil {
return errors.Wrap(err, "dump instance configuration")
}
err = client.Mount("/", bootstrap, cfg)
if err != nil {
return errors.Wrapf(err, "mount rafs instance MountByAPI()")
}
return nil

}
53 changes: 26 additions & 27 deletions pkg/filesystem/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,19 @@ func NewFileSystem(ctx context.Context, opt ...NewFSOpt) (*Filesystem, error) {
if err != nil {
return errors.Wrapf(err, "get filesystem manager for daemon %s", d.States.ID)
}

supplementInfo, err := fsManager.GetInfo(d.ID())
if err != nil {
return errors.Wrap(err, "GetInfo failed")
}

cfg := d.Config
err = daemonconfig.SupplementDaemonConfig(cfg, supplementInfo)
if err != nil {
return errors.Wrap(err, "supplement configuration")
}
d.Config = cfg

if err := fsManager.StartDaemon(d); err != nil {
return errors.Wrapf(err, "start daemon %s", d.ID())
}
Expand Down Expand Up @@ -232,7 +245,6 @@ func (fs *Filesystem) Mount(ctx context.Context, snapshotID string, labels map[s
// Instance already exists, how could this happen? Can containerd handle this case?
return nil
}

fsDriver := config.GetFsDriver()
if label.IsTarfsDataLayer(labels) {
fsDriver = config.FsDriverBlockdev
Expand Down Expand Up @@ -302,34 +314,25 @@ func (fs *Filesystem) Mount(ctx context.Context, snapshotID string, labels map[s
daemonconfig.WorkDir: workDir,
daemonconfig.CacheDir: cacheDir,
}
supplementInfo := &daemon.NydusdSupplementInfo{
DaemonState: d.States,
ImageID: imageID,
SnapshotID: snapshotID,
Vpc: false,
Labels: labels,
Params: params,
}
cfg := deepcopy.Copy(*fsManager.DaemonConfig).(daemonconfig.DaemonConfig)
err = daemonconfig.SupplementDaemonConfig(cfg, imageID, snapshotID, false, labels, params)
err = daemonconfig.SupplementDaemonConfig(cfg, supplementInfo)
if err != nil {
return errors.Wrap(err, "supplement configuration")
}

if errs := fsManager.AddSupplementInfo(supplementInfo); errs != nil {
return errors.Wrapf(err, "AddSupplementInfo failed %s", d.States.ID)
}
// TODO: How to manage rafs configurations on-disk? separated json config file or DB record?
// In order to recover erofs mount, the configuration file has to be persisted.
var configSubDir string
if useSharedDaemon {
configSubDir = snapshotID
} else {
// Associate daemon config object when creating a new daemon object to avoid
// reading disk file again and again.
// For shared daemon, each rafs instance has its own configuration, so we don't
// attach a config interface to daemon in this case.
d.Config = cfg
}

err = cfg.DumpFile(d.ConfigFile(configSubDir))
if err != nil {
if errors.Is(err, errdefs.ErrAlreadyExists) {
log.L.Debugf("Configuration file %s already exits", d.ConfigFile(configSubDir))
} else {
return errors.Wrap(err, "dump daemon configuration file")
}
}

d.Config = cfg
d.AddRafsInstance(rafs)

// if publicKey is not empty we should verify bootstrap file of image
Expand Down Expand Up @@ -596,10 +599,6 @@ func (fs *Filesystem) initSharedDaemon(fsManager *manager.Manager) (err error) {
// it is loaded when requesting mount api
// Dump the configuration file since it is reloaded when recovering the nydusd
d.Config = *fsManager.DaemonConfig
err = d.Config.DumpFile(d.ConfigFile(""))
if err != nil && !errors.Is(err, errdefs.ErrAlreadyExists) {
return errors.Wrapf(err, "dump configuration file %s", d.ConfigFile(""))
}

if err := fsManager.StartDaemon(d); err != nil {
return errors.Wrap(err, "start shared daemon")
Expand Down
14 changes: 10 additions & 4 deletions pkg/manager/daemon_adaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ func (m *Manager) StartDaemon(d *daemon.Daemon) error {
if err := cmd.Start(); err != nil {
return err
}
fsDriver := config.GetFsDriver()
isSharedFusedev := fsDriver == config.FsDriverFusedev && config.GetDaemonMode() == config.DaemonModeShared
useSharedDaemon := fsDriver == config.FsDriverFscache || isSharedFusedev

if !useSharedDaemon {
errs := d.MountByAPI()
if errs != nil {
return errors.Wrapf(err, "failed to mount")
}
}

d.Lock()
defer d.Unlock()
Expand Down Expand Up @@ -155,10 +165,6 @@ func (m *Manager) BuildDaemonCommand(d *daemon.Daemon, bin string, upgrade bool)
return nil, errors.Wrapf(err, "locate bootstrap %s", bootstrap)
}

cmdOpts = append(cmdOpts,
command.WithConfig(d.ConfigFile("")),
command.WithBootstrap(bootstrap),
)
if config.IsBackendSourceEnabled() {
configAPIPath := fmt.Sprintf(endpointGetBackend, d.States.ID)
cmdOpts = append(cmdOpts,
Expand Down
25 changes: 18 additions & 7 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,23 @@ func (m *Manager) AddDaemon(daemon *daemon.Daemon) error {
return nil
}

func (m *Manager) AddSupplementInfo(supplementInfo *daemon.NydusdSupplementInfo) error {
m.mu.Lock()
defer m.mu.Unlock()
if err := m.store.AddInfo(supplementInfo); err != nil {
return errors.Wrapf(err, "add supplementInfo %s", supplementInfo.DaemonState.ID)
}
return nil
}

func (m *Manager) GetInfo(daemonID string) (*daemon.NydusdSupplementInfo, error) {
info, err := m.store.GetInfo(daemonID)
if err != nil {
return nil, errors.Wrapf(err, "add supplementInfo %s", daemonID)
}
return info, nil
}

func (m *Manager) UpdateDaemon(daemon *daemon.Daemon) error {
m.mu.Lock()
defer m.mu.Unlock()
Expand Down Expand Up @@ -322,13 +339,7 @@ func (m *Manager) recoverDaemons(ctx context.Context,
}

if d.States.FsDriver == config.FsDriverFusedev {
cfg, err := daemonconfig.NewDaemonConfig(d.States.FsDriver, d.ConfigFile(""))
if err != nil {
log.L.Errorf("Failed to reload daemon configuration %s, %s", d.ConfigFile(""), err)
return err
}

d.Config = cfg
d.Config = *m.DaemonConfig
}

state, err := d.GetState()
Expand Down
3 changes: 3 additions & 0 deletions pkg/manager/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ type Store interface {
WalkRafsInstances(ctx context.Context, cb func(*rafs.Rafs) error) error

NextInstanceSeq() (uint64, error)

AddInfo(supplementInfo *daemon.NydusdSupplementInfo) error
GetInfo(daemonID string) (*daemon.NydusdSupplementInfo, error)
}

var _ Store = &store.DaemonRafsStore{}
8 changes: 8 additions & 0 deletions pkg/store/daemonstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ func (s *DaemonRafsStore) AddDaemon(d *daemon.Daemon) error {
return s.db.SaveDaemon(context.TODO(), d)
}

func (s *DaemonRafsStore) AddInfo(supplementInfo *daemon.NydusdSupplementInfo) error {
return s.db.SaveInfo(context.TODO(), supplementInfo)
}

func (s *DaemonRafsStore) GetInfo(imageID string) (*daemon.NydusdSupplementInfo, error) {
return s.db.GetSupplementInfo(context.TODO(), imageID)
}

func (s *DaemonRafsStore) UpdateDaemon(d *daemon.Daemon) error {
return s.db.UpdateDaemon(context.TODO(), d)
}
Expand Down
Loading

0 comments on commit d6c16a2

Please sign in to comment.