Skip to content

Commit

Permalink
Restore improvement: index (#4044)
Browse files Browse the repository at this point in the history
* feat(schema): drop sequential restore run tracking

Right now SM restores location by location, manifest by manifest, table by table.
That's why it tracks restore progress by keeping location/manifest/table in the DB.
We are moving away from sequential restore approach in favor of
restoring from all locations/manifests/tables at the same time.

* feat(restore): adjust model to dropped sequential restore run tracking

* refactor(backupspec): include the newest file version in ListVersionedFiles

There is no need to iterate over versioned files (ListVersionedFiles) and non-versioned files (buildFilesSizesCache) separately.
Doing it in a single iteration is faster, and it allows storing all size information in a single place.

* feat(restore): add workload indexing

This commit introduces the structure of restore workload.
Workload is divided per location->table->remote sstable directory.
This changes the hierarchy established by manifests (location->node->table->remote sstable dir).
It also aggregates files into actual sstables, extracts their IDs, and aggregates their sizes,
and keeps track of sstable versioning.

* feat(restore): index, support resume

Indexed workload won't contain sstables that were already
restored during previous restore run.

* feat(restore): index, support metrics init

* feat(restore): add primitive batching using indexed workload

This is a temporary implementation used for integrating workload
indexing with the rest of the code. It will be improved as part
of #3979.

* feat(restore): integrate new indexing and batching with codebase

This commit makes use of the new indexing and batching
approaches and uses them in the restore tables codebase.

* fix(restore): handle download of fully versioned batch

Recent commits changed versioned batch download so that
if any sstable component is versioned, then all sstable components
are downloaded as versioned files.
It was done that way to make versioned progress calculation easier
(we don't store per-file sizes, only the size of the whole sstable).

This brought to light a bug (that existed before, but was more difficult to hit),
in which restoring batch failed when the whole batch was versioned,
as calling RcloneSyncCopyPaths on empty paths parameter resulted in broken download.

We could just skip the RcloneSyncCopyPaths call when the whole batch is versioned,
but this would leave us without the agentJobID which is a part of sort key
in RestoreRunProgress. Without it, we could potentially overwrite one
restore run progress with another - if both of them happened on the same
RemoteSSTableDir, by the same Host, and were fully versioned.
It would also introduce a different path for restoring regular batch and fully versioned batch,
which is not desirable.

That's why I decided to modify rclone server to allow empty path parameter,
so that it still generates agentJobID, but it doesn't do anything except for that.

* feat(restore): index, log workload info

Workload info contains the sstable count, total size, and max and average
sstable size per location/table/remote sstable dir.
  • Loading branch information
Michal-Leszczynski authored Oct 3, 2024
1 parent 5048128 commit 14aef7b
Show file tree
Hide file tree
Showing 14 changed files with 746 additions and 616 deletions.
4 changes: 3 additions & 1 deletion pkg/rclone/rcserver/rc.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,9 @@ func rcCopyPaths() func(ctx context.Context, in rc.Params) (rc.Params, error) {
if err != nil {
return nil, err
}

if len(paths) == 0 {
return nil, nil
}
return nil, sync.CopyPaths(ctx, dstFs, dstRemote, srcFs, srcRemote, paths, false)
}
}
Expand Down
4 changes: 0 additions & 4 deletions pkg/schema/table/table.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions pkg/scyllaclient/client_rclone.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,9 @@ func (c *Client) RcloneCopyPaths(ctx context.Context, host, dstRemoteDir, srcRem
if err != nil {
return 0, err
}
if paths == nil {
paths = make([]string, 0)
}

p := operations.SyncCopyPathsParams{
Context: forceHost(ctx, host),
Expand Down
37 changes: 25 additions & 12 deletions pkg/service/backup/backupspec/versioning.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,18 @@ import (
// (e.g. older version of 'md-2-big-Data.db' could be 'md-2-big-Data.db.sm_20230114183231UTC')
// Note, that the newest version of SSTable does not have snapshot tag extension.
type VersionedSSTable struct {
Name string // Original SSTable name (e.g. md-2-big-Data.db)
Version string // Snapshot tag extension representing backup that introduced newer version of this SSTable (e.g. sm_20230114183231UTC)
Name string // Original SSTable name (e.g. md-2-big-Data.db)
// Snapshot tag extension representing backup that introduced newer version of this SSTable (e.g. sm_20230114183231UTC).
// Empty version describes not versioned (newest version) of sstable without the snapshot tag suffix.
Version string
Size int64
}

// FullName returns the file name of this sstable version.
// A non-empty Version is appended to Name after a "." separator,
// while an empty Version yields Name unchanged.
func (vt VersionedSSTable) FullName() string {
	if vt.Version != "" {
		return vt.Name + "." + vt.Version
	}
	return vt.Name
}

Expand Down Expand Up @@ -66,6 +71,9 @@ func IsVersionedFileRemovable(oldest time.Time, versioned string) (bool, error)
// SplitNameAndVersion splits a possibly versioned file name into its original name and its version.
// When the name has no extension, or IsSnapshotTag does not accept the extension
// (taken without the leading dot), the whole input is returned as name with an empty version.
func SplitNameAndVersion(versioned string) (name, version string) {
	ext := path.Ext(versioned)
	if ext != "" {
		// Drop the leading dot before checking whether the extension is a snapshot tag.
		if tag := ext[1:]; IsSnapshotTag(tag) {
			return strings.TrimSuffix(versioned, ext), tag
		}
	}
	return versioned, ""
}
Expand All @@ -74,11 +82,7 @@ func SplitNameAndVersion(versioned string) (name, version string) {
func ListVersionedFiles(ctx context.Context, client *scyllaclient.Client, snapshotTag, host, dir string) (VersionedMap, error) {
versionedFiles := make(VersionedMap)
allVersions := make(map[string][]VersionedSSTable)

opts := &scyllaclient.RcloneListDirOpts{
FilesOnly: true,
VersionedOnly: true,
}
opts := &scyllaclient.RcloneListDirOpts{FilesOnly: true}
f := func(item *scyllaclient.RcloneListDirItem) {
name, version := SplitNameAndVersion(item.Name)
allVersions[name] = append(allVersions[name], VersionedSSTable{
Expand All @@ -87,7 +91,6 @@ func ListVersionedFiles(ctx context.Context, client *scyllaclient.Client, snapsh
Size: item.Size,
})
}

if err := client.RcloneListDirIter(ctx, host, dir, opts, f); err != nil {
return nil, errors.Wrapf(err, "host %s: listing versioned files", host)
}
Expand All @@ -97,9 +100,17 @@ func ListVersionedFiles(ctx context.Context, client *scyllaclient.Client, snapsh
return nil, err
}
// Choose the correct version with respect to the currently restored snapshot tag
for _, versions := range allVersions {
var candidate VersionedSSTable
for name, versions := range allVersions {
var (
candidate VersionedSSTable
newest VersionedSSTable
)
for _, v := range versions {
if v.Version == "" {
newest = v
continue
}

tagT, err := SnapshotTagTime(v.Version)
if err != nil {
return nil, err
Expand All @@ -111,8 +122,10 @@ func ListVersionedFiles(ctx context.Context, client *scyllaclient.Client, snapsh
}
}

if candidate.Version != "" {
versionedFiles[candidate.Name] = candidate
if candidate.Version == "" {
versionedFiles[name] = newest
} else {
versionedFiles[name] = candidate
}
}

Expand Down
175 changes: 175 additions & 0 deletions pkg/service/restore/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
// Copyright (C) 2024 ScyllaDB

package restore

import (
"slices"
"sync"

"github.com/pkg/errors"
. "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
)

// batchDispatcher hands out batches of sstables taken from the indexed workload.
// DispatchBatch holds mu for the whole time it chooses and mutates workload entries.
type batchDispatcher struct {
mu sync.Mutex
// workload is consumed in place: createBatch removes dispatched sstables
// from it and decreases the Size of the affected location, table and remote dir.
workload []LocationWorkload
// batchSize is the upper bound on the number of sstables put into a single batch.
batchSize int
// locationHosts lists, per location, the hosts for which chooseLocation may pick that location.
locationHosts map[Location][]string
}

// newBatchDispatcher returns a dispatcher working on the given workload.
// batchSize bounds the number of sstables in a single batch, and locationHosts
// describes which hosts can be given batches from which location.
func newBatchDispatcher(workload []LocationWorkload, batchSize int, locationHosts map[Location][]string) *batchDispatcher {
	// The zero value of the embedded mutex is an unlocked mutex, so it needs no explicit initialization.
	bd := new(batchDispatcher)
	bd.workload = workload
	bd.batchSize = batchSize
	bd.locationHosts = locationHosts
	return bd
}

// batch is a group of sstables cut from a single remote sstable dir by createBatch.
type batch struct {
TableName
*ManifestInfo

RemoteSSTableDir string
// Size is the sum of sizes of all SSTables in the batch.
Size int64
SSTables []RemoteSSTable
}

// NotVersionedSSTables returns batch sstables that are not marked as versioned.
// The result is nil when the batch has no such sstables.
func (b batch) NotVersionedSSTables() []RemoteSSTable {
	var out []RemoteSSTable
	for i := range b.SSTables {
		if b.SSTables[i].Versioned {
			continue
		}
		out = append(out, b.SSTables[i])
	}
	return out
}

// VersionedSSTables returns batch sstables that are marked as versioned.
// The result is nil when the batch has no such sstables.
func (b batch) VersionedSSTables() []RemoteSSTable {
	var out []RemoteSSTable
	for i := range b.SSTables {
		if !b.SSTables[i].Versioned {
			continue
		}
		out = append(out, b.SSTables[i])
	}
	return out
}

// VersionedSize returns the cumulative size of batch sstables that are marked as versioned.
func (b batch) VersionedSize() int64 {
	total := int64(0)
	for i := range b.SSTables {
		if sst := &b.SSTables[i]; sst.Versioned {
			total += sst.Size
		}
	}
	return total
}

// IDs returns the IDs of all batch sstables in batch order.
// The result is nil when the batch is empty.
func (b batch) IDs() []string {
	var out []string
	for i := range b.SSTables {
		out = append(out, b.SSTables[i].ID)
	}
	return out
}

// ValidateAllDispatched returns error if not all sstables were dispatched.
//
// The workload is walked top-down and the first location with a non-zero size
// is reported with the most specific message available: the first leftover
// remote dir of its first non-zero table, otherwise that table, otherwise
// the location itself. Nil is returned when every location has size 0.
//
// The workload is read under the same mutex that DispatchBatch holds while
// mutating it, so the method must not be called with b.mu already held.
func (b *batchDispatcher) ValidateAllDispatched() error {
	b.mu.Lock()
	defer b.mu.Unlock()

	for _, lw := range b.workload {
		if lw.Size == 0 {
			continue
		}
		for _, tw := range lw.Tables {
			if tw.Size == 0 {
				continue
			}
			for _, dw := range tw.RemoteDirs {
				if dw.Size != 0 || len(dw.SSTables) != 0 {
					return errors.Errorf("expected all data to be restored, missing sstable ids from location %s table %s.%s: %v (%d bytes)",
						dw.Location, dw.Keyspace, dw.Table, dw.SSTables, dw.Size)
				}
			}
			// Table size is non-zero, but none of its remote dirs has anything left.
			return errors.Errorf("expected all data to be restored, missing table from location %s: %s.%s (%d bytes)",
				tw.Location, tw.Keyspace, tw.Table, tw.Size)
		}
		// Location size is non-zero, but none of its tables has a non-zero size.
		return errors.Errorf("expected all data to be restored, missing location: %s (%d bytes)",
			lw.Location, lw.Size)
	}
	return nil
}

// DispatchBatch returns a batch to be restored by the given host,
// or false when there is no more work to do for it.
// The whole selection and the workload update happen under b.mu.
func (b *batchDispatcher) DispatchBatch(host string) (batch, bool) {
	b.mu.Lock()
	defer b.mu.Unlock()

	var none batch

	location := b.chooseLocation(host)
	if location == nil {
		return none, false
	}

	table := b.chooseTable(location)
	if table == nil {
		return none, false
	}

	remoteDir := b.chooseRemoteDir(table)
	if remoteDir == nil {
		return none, false
	}

	return b.createBatch(location, table, remoteDir), true
}

// chooseLocation returns the first location with a non-zero size whose
// locationHosts entry contains host, or nil when there is no such location.
func (b *batchDispatcher) chooseLocation(host string) *LocationWorkload {
	for i := range b.workload {
		lw := &b.workload[i]
		if lw.Size != 0 && slices.Contains(b.locationHosts[lw.Location], host) {
			return lw
		}
	}
	return nil
}

// chooseTable returns the first table of the location with a non-zero size,
// or nil when all of its tables have size 0.
func (b *batchDispatcher) chooseTable(location *LocationWorkload) *TableWorkload {
	for i := range location.Tables {
		if tw := &location.Tables[i]; tw.Size != 0 {
			return tw
		}
	}
	return nil
}

// chooseRemoteDir returns the first remote dir of the table with a non-zero size,
// or nil when all of its remote dirs have size 0.
func (b *batchDispatcher) chooseRemoteDir(table *TableWorkload) *RemoteDirWorkload {
	for i := range table.RemoteDirs {
		if dw := &table.RemoteDirs[i]; dw.Size != 0 {
			return dw
		}
	}
	return nil
}

// createBatch cuts up to batchSize sstables off the front of dir.SSTables
// and returns them as a batch.
// The cumulative size of the cut sstables is subtracted from dir, t and l,
// which is what the choose* helpers look at when picking the next workload.
func (b *batchDispatcher) createBatch(l *LocationWorkload, t *TableWorkload, dir *RemoteDirWorkload) batch {
	i := min(b.batchSize, len(dir.SSTables))
	// Cap the batch slice at its own length: it shares the backing array with
	// dir.SSTables, so without the cap an append to the batch would overwrite
	// the sstables that are still waiting to be dispatched.
	sstables := dir.SSTables[:i:i]
	dir.SSTables = dir.SSTables[i:]

	var size int64
	for _, sst := range sstables {
		size += sst.Size
	}
	dir.Size -= size
	t.Size -= size
	l.Size -= size
	return batch{
		TableName:        dir.TableName,
		ManifestInfo:     dir.ManifestInfo,
		RemoteSSTableDir: dir.RemoteSSTableDir,
		Size:             size,
		SSTables:         sstables,
	}
}
Loading

0 comments on commit 14aef7b

Please sign in to comment.