From 14aef7beda8c3daf321c84ef2c39776a93c5c4f1 Mon Sep 17 00:00:00 2001
From: Michal-Leszczynski <74614433+Michal-Leszczynski@users.noreply.github.com>
Date: Thu, 3 Oct 2024 11:58:47 +0200
Subject: [PATCH] Restore improvement: index (#4044)

* feat(schema): drop sequential restore run tracking

Right now SM restores location by location, manifest by manifest, and table by table. That's why it tracks restore progress by keeping the currently processed location/manifest/table in the DB. We are moving away from the sequential restore approach in favor of restoring from all locations/manifests/tables at the same time.

* feat(restore): adjust model to dropped sequential restore run tracking

* refactor(backupspec): include the newest file version in ListVersionedFiles

There is no need to iterate versioned files (ListVersionedFiles) and not-versioned files (buildFilesSizesCache) separately. Doing it in a single iteration is faster and allows storing all size information in a single place.

* feat(restore): add workload indexing

This commit introduces the structure of the restore workload. The workload is divided per location->table->remote sstable directory. This changes the hierarchy established by manifests (location->node->table->remote sstable dir). It also aggregates files into actual sstables, extracts their IDs, sums their sizes, and keeps track of sstable versioning.

* feat(restore): index, support resume

The indexed workload won't contain sstables that were already restored during the previous restore run.

* feat(restore): index, support metrics init

* feat(restore): add primitive batching using indexed workload

This is a temporary implementation used for integrating workload indexing with the rest of the code. It will be improved as a part of #3979.

* feat(restore): integrate new indexing and batching with codebase

This commit makes use of the new indexing and batching approaches in the restore tables codebase.

* fix(restore): handle download of fully versioned batch

Recent commits changed versioned batch download so that if any sstable component is versioned, then all sstable components are downloaded as versioned files. It was done that way to allow easier versioned progress calculation (we don't store per-file size, only the whole sstable size). This brought to light a bug (that existed before, but was more difficult to hit) in which restoring a batch failed when the whole batch was versioned, as calling RcloneSyncCopyPaths with an empty paths parameter resulted in a broken download. We could just skip the RcloneSyncCopyPaths call when the whole batch is versioned, but this would leave us without the agentJobID, which is a part of the sort key in RestoreRunProgress. Without it, we could potentially overwrite one restore run progress with another - if both of them happened on the same RemoteSSTableDir, by the same Host, and were fully versioned. It would also introduce a different path for restoring a regular batch and a fully versioned batch, which is not desirable. That's why I decided to modify the rclone server to allow an empty paths parameter, so that it still generates the agentJobID but doesn't do anything beyond that.

* feat(restore): index, log workload info

Workload info contains, per location/table/remote sstable dir: sstable count, total size, and max and average sstable size.
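
For reviewers who want the shape of the change without reading the whole diff, below is a minimal, self-contained Go sketch of the indexed-workload batching loop described above. The type and function names (sstable, remoteDirWorkload, nextBatch) and the sample paths are simplified placeholders, not the production structs; the real implementation is in pkg/service/restore/index.go and batch.go in this patch.

    package main

    import "fmt"

    // Simplified placeholders for the real LocationWorkload/RemoteDirWorkload types.
    type sstable struct {
        ID   string
        Size int64
    }

    type remoteDirWorkload struct {
        Dir      string
        Size     int64
        SSTables []sstable
    }

    // nextBatch pops up to batchSize sstables from the first remote dir that
    // still has data and decreases its remaining size, so hosts can keep
    // pulling work until every dir reports Size == 0.
    func nextBatch(dirs []remoteDirWorkload, batchSize int) (dir string, batch []sstable, ok bool) {
        for i := range dirs {
            d := &dirs[i]
            if d.Size == 0 {
                continue
            }
            n := batchSize
            if n > len(d.SSTables) {
                n = len(d.SSTables)
            }
            batch, d.SSTables = d.SSTables[:n], d.SSTables[n:]
            for _, s := range batch {
                d.Size -= s.Size
            }
            return d.Dir, batch, true
        }
        return "", nil, false
    }

    func main() {
        dirs := []remoteDirWorkload{
            {Dir: "backup/ks/t1", Size: 30, SSTables: []sstable{{"1", 10}, {"2", 10}, {"3", 10}}},
            {Dir: "backup/ks/t2", Size: 5, SSTables: []sstable{{"7", 5}}},
        }
        for {
            dir, b, ok := nextBatch(dirs, 2)
            if !ok {
                break
            }
            fmt.Println(dir, b)
        }
    }

In the actual patch the dispatcher is additionally guarded by a mutex, keyed by the location->hosts mapping, and followed by a ValidateAllDispatched pass that asserts every remote dir drained to zero.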
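
The backupspec change ("include the newest file version in ListVersionedFiles") relies on the naming convention that only older copies of an SSTable component carry a snapshot-tag suffix, while the newest copy keeps its plain name and is modelled with an empty Version. The sketch below only illustrates that convention; the crude "sm_" prefix check stands in for the real IsSnapshotTag helper and is an assumption for the example.

    package main

    import (
        "fmt"
        "strings"
    )

    // splitNameAndVersion mirrors the convention used in versioning.go: the last
    // extension counts as a version only if it looks like a snapshot tag.
    func splitNameAndVersion(file string) (name, version string) {
        i := strings.LastIndex(file, ".")
        if i < 0 || !strings.HasPrefix(file[i+1:], "sm_") { // stand-in for IsSnapshotTag
            return file, ""
        }
        return file[:i], file[i+1:]
    }

    // fullName is the inverse: an empty version means the file is stored under
    // its plain (newest) name.
    func fullName(name, version string) string {
        if version == "" {
            return name
        }
        return name + "." + version
    }

    func main() {
        for _, f := range []string{
            "md-2-big-Data.db",
            "md-2-big-Data.db.sm_20230114183231UTC",
        } {
            n, v := splitNameAndVersion(f)
            fmt.Printf("%s -> name=%s version=%q roundtrip=%s\n", f, n, v, fullName(n, v))
        }
    }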
--- pkg/rclone/rcserver/rc.go | 4 +- pkg/schema/table/table.go | 4 - pkg/scyllaclient/client_rclone.go | 3 + pkg/service/backup/backupspec/versioning.go | 37 +- pkg/service/restore/batch.go | 175 ++++++ pkg/service/restore/index.go | 339 ++++++++++++ pkg/service/restore/model.go | 40 +- pkg/service/restore/schema_worker.go | 24 +- pkg/service/restore/service.go | 8 +- .../service_restore_integration_test.go | 11 +- pkg/service/restore/tables_worker.go | 158 +++--- pkg/service/restore/tablesdir_worker.go | 520 +++--------------- pkg/service/restore/worker.go | 35 +- schema/v3.4.0.cql | 4 + 14 files changed, 746 insertions(+), 616 deletions(-) create mode 100644 pkg/service/restore/batch.go create mode 100644 pkg/service/restore/index.go create mode 100644 schema/v3.4.0.cql diff --git a/pkg/rclone/rcserver/rc.go b/pkg/rclone/rcserver/rc.go index 639d0848e5..0e0537d07d 100644 --- a/pkg/rclone/rcserver/rc.go +++ b/pkg/rclone/rcserver/rc.go @@ -554,7 +554,9 @@ func rcCopyPaths() func(ctx context.Context, in rc.Params) (rc.Params, error) { if err != nil { return nil, err } - + if len(paths) == 0 { + return nil, nil + } return nil, sync.CopyPaths(ctx, dstFs, dstRemote, srcFs, srcRemote, paths, false) } } diff --git a/pkg/schema/table/table.go b/pkg/schema/table/table.go index 1e5bf63515..5dcbeefcc6 100644 --- a/pkg/schema/table/table.go +++ b/pkg/schema/table/table.go @@ -188,14 +188,10 @@ var ( Columns: []string{ "cluster_id", "id", - "keyspace_name", - "location", - "manifest_path", "prev_id", "repair_task_id", "snapshot_tag", "stage", - "table_name", "task_id", "units", "views", diff --git a/pkg/scyllaclient/client_rclone.go b/pkg/scyllaclient/client_rclone.go index 4db3e27822..b4f415353e 100644 --- a/pkg/scyllaclient/client_rclone.go +++ b/pkg/scyllaclient/client_rclone.go @@ -280,6 +280,9 @@ func (c *Client) RcloneCopyPaths(ctx context.Context, host, dstRemoteDir, srcRem if err != nil { return 0, err } + if paths == nil { + paths = make([]string, 0) + } p := operations.SyncCopyPathsParams{ Context: forceHost(ctx, host), diff --git a/pkg/service/backup/backupspec/versioning.go b/pkg/service/backup/backupspec/versioning.go index 103502d130..af33203943 100644 --- a/pkg/service/backup/backupspec/versioning.go +++ b/pkg/service/backup/backupspec/versioning.go @@ -21,13 +21,18 @@ import ( // (e.g. older version of 'md-2-big-Data.db' could be 'md-2-big-Data.db.sm_20230114183231UTC') // Note, that the newest version of SSTable does not have snapshot tag extension. type VersionedSSTable struct { - Name string // Original SSTable name (e.g. md-2-big-Data.db) - Version string // Snapshot tag extension representing backup that introduced newer version of this SSTable (e.g. sm_20230114183231UTC) + Name string // Original SSTable name (e.g. md-2-big-Data.db) + // Snapshot tag extension representing backup that introduced newer version of this SSTable (e.g. sm_20230114183231UTC). + // Empty version describes not versioned (newest version) of sstable without the snapshot tag suffix. + Version string Size int64 } // FullName returns versioned file name. func (vt VersionedSSTable) FullName() string { + if vt.Version == "" { + return vt.Name + } return vt.Name + "." + vt.Version } @@ -66,6 +71,9 @@ func IsVersionedFileRemovable(oldest time.Time, versioned string) (bool, error) // SplitNameAndVersion splits versioned file name into its original name and its version. 
func SplitNameAndVersion(versioned string) (name, version string) { versionExt := path.Ext(versioned) + if versionExt == "" || !IsSnapshotTag(versionExt[1:]) { + return versioned, "" + } baseName := strings.TrimSuffix(versioned, versionExt) return baseName, versionExt[1:] } @@ -74,11 +82,7 @@ func SplitNameAndVersion(versioned string) (name, version string) { func ListVersionedFiles(ctx context.Context, client *scyllaclient.Client, snapshotTag, host, dir string) (VersionedMap, error) { versionedFiles := make(VersionedMap) allVersions := make(map[string][]VersionedSSTable) - - opts := &scyllaclient.RcloneListDirOpts{ - FilesOnly: true, - VersionedOnly: true, - } + opts := &scyllaclient.RcloneListDirOpts{FilesOnly: true} f := func(item *scyllaclient.RcloneListDirItem) { name, version := SplitNameAndVersion(item.Name) allVersions[name] = append(allVersions[name], VersionedSSTable{ @@ -87,7 +91,6 @@ func ListVersionedFiles(ctx context.Context, client *scyllaclient.Client, snapsh Size: item.Size, }) } - if err := client.RcloneListDirIter(ctx, host, dir, opts, f); err != nil { return nil, errors.Wrapf(err, "host %s: listing versioned files", host) } @@ -97,9 +100,17 @@ func ListVersionedFiles(ctx context.Context, client *scyllaclient.Client, snapsh return nil, err } // Chose correct version with respect to currently restored snapshot tag - for _, versions := range allVersions { - var candidate VersionedSSTable + for name, versions := range allVersions { + var ( + candidate VersionedSSTable + newest VersionedSSTable + ) for _, v := range versions { + if v.Version == "" { + newest = v + continue + } + tagT, err := SnapshotTagTime(v.Version) if err != nil { return nil, err @@ -111,8 +122,10 @@ func ListVersionedFiles(ctx context.Context, client *scyllaclient.Client, snapsh } } - if candidate.Version != "" { - versionedFiles[candidate.Name] = candidate + if candidate.Version == "" { + versionedFiles[name] = newest + } else { + versionedFiles[name] = candidate } } diff --git a/pkg/service/restore/batch.go b/pkg/service/restore/batch.go new file mode 100644 index 0000000000..b432e653d7 --- /dev/null +++ b/pkg/service/restore/batch.go @@ -0,0 +1,175 @@ +// Copyright (C) 2024 ScyllaDB + +package restore + +import ( + "slices" + "sync" + + "github.com/pkg/errors" + . 
"github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" +) + +type batchDispatcher struct { + mu sync.Mutex + workload []LocationWorkload + batchSize int + locationHosts map[Location][]string +} + +func newBatchDispatcher(workload []LocationWorkload, batchSize int, locationHosts map[Location][]string) *batchDispatcher { + return &batchDispatcher{ + mu: sync.Mutex{}, + workload: workload, + batchSize: batchSize, + locationHosts: locationHosts, + } +} + +type batch struct { + TableName + *ManifestInfo + + RemoteSSTableDir string + Size int64 + SSTables []RemoteSSTable +} + +func (b batch) NotVersionedSSTables() []RemoteSSTable { + var ssts []RemoteSSTable + for _, sst := range b.SSTables { + if !sst.Versioned { + ssts = append(ssts, sst) + } + } + return ssts +} + +func (b batch) VersionedSSTables() []RemoteSSTable { + var ssts []RemoteSSTable + for _, sst := range b.SSTables { + if sst.Versioned { + ssts = append(ssts, sst) + } + } + return ssts +} + +func (b batch) VersionedSize() int64 { + var size int64 + for _, sst := range b.SSTables { + if sst.Versioned { + size += sst.Size + } + } + return size +} + +func (b batch) IDs() []string { + var ids []string + for _, sst := range b.SSTables { + ids = append(ids, sst.ID) + } + return ids +} + +// ValidateAllDispatched returns error if not all sstables were dispatched. +func (b *batchDispatcher) ValidateAllDispatched() error { + for _, lw := range b.workload { + if lw.Size != 0 { + for _, tw := range lw.Tables { + if tw.Size != 0 { + for _, dw := range tw.RemoteDirs { + if dw.Size != 0 || len(dw.SSTables) != 0 { + return errors.Errorf("expected all data to be restored, missing sstable ids from location %s table %s.%s: %v (%d bytes)", + dw.Location, dw.Keyspace, dw.Table, dw.SSTables, dw.Size) + } + } + return errors.Errorf("expected all data to be restored, missinng table from location %s: %s.%s (%d bytes)", + tw.Location, tw.Keyspace, tw.Table, tw.Size) + } + } + return errors.Errorf("expected all data to be restored, missinng location: %s (%d bytes)", + lw.Location, lw.Size) + } + } + return nil +} + +// DispatchBatch batch to be restored or false when there is no more work to do. +func (b *batchDispatcher) DispatchBatch(host string) (batch, bool) { + b.mu.Lock() + defer b.mu.Unlock() + + l := b.chooseLocation(host) + if l == nil { + return batch{}, false + } + t := b.chooseTable(l) + if t == nil { + return batch{}, false + } + dir := b.chooseRemoteDir(t) + if dir == nil { + return batch{}, false + } + out := b.createBatch(l, t, dir) + return out, true +} + +// Returns location for which batch should be created. +func (b *batchDispatcher) chooseLocation(host string) *LocationWorkload { + for i := range b.workload { + if b.workload[i].Size == 0 { + continue + } + if slices.Contains(b.locationHosts[b.workload[i].Location], host) { + return &b.workload[i] + } + } + return nil +} + +// Returns table for which batch should be created. +func (b *batchDispatcher) chooseTable(location *LocationWorkload) *TableWorkload { + for i := range location.Tables { + if location.Tables[i].Size == 0 { + continue + } + return &location.Tables[i] + } + return nil +} + +// Return remote dir for which batch should be created. +func (b *batchDispatcher) chooseRemoteDir(table *TableWorkload) *RemoteDirWorkload { + for i := range table.RemoteDirs { + if table.RemoteDirs[i].Size == 0 { + continue + } + return &table.RemoteDirs[i] + } + return nil +} + +// Returns batch and updates RemoteDirWorkload and its parents. 
+func (b *batchDispatcher) createBatch(l *LocationWorkload, t *TableWorkload, dir *RemoteDirWorkload) batch { + i := min(b.batchSize, len(dir.SSTables)) + sstables := dir.SSTables[:i] + dir.SSTables = dir.SSTables[i:] + + var size int64 + for _, sst := range sstables { + size += sst.Size + } + dir.Size -= size + t.Size -= size + l.Size -= size + return batch{ + TableName: dir.TableName, + ManifestInfo: dir.ManifestInfo, + RemoteSSTableDir: dir.RemoteSSTableDir, + Size: size, + SSTables: sstables, + } +} diff --git a/pkg/service/restore/index.go b/pkg/service/restore/index.go new file mode 100644 index 0000000000..dd7b7b72cd --- /dev/null +++ b/pkg/service/restore/index.go @@ -0,0 +1,339 @@ +// Copyright (C) 2024 ScyllaDB + +package restore + +import ( + "context" + "slices" + + "github.com/pkg/errors" + "github.com/scylladb/scylla-manager/v3/pkg/metrics" + . "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" + "github.com/scylladb/scylla-manager/v3/pkg/sstable" +) + +// LocationWorkload represents aggregated restore workload +// in given backup location. +type LocationWorkload struct { + Location + + Size int64 + Tables []TableWorkload +} + +// TableWorkload represents restore workload +// from many manifests for given table in given backup location. +type TableWorkload struct { + Location + TableName + + Size int64 + RemoteDirs []RemoteDirWorkload +} + +// RemoteDirWorkload represents restore workload +// for given table and manifest in given backup location. +type RemoteDirWorkload struct { + TableName + *ManifestInfo + + RemoteSSTableDir string + Size int64 + SSTables []RemoteSSTable +} + +// RemoteSSTable represents SSTable updated with size and version info from remote. +type RemoteSSTable struct { + SSTable // File names might contain versioned snapshot tag extension + Size int64 + Versioned bool +} + +// SSTable represents files creating a single sstable. +type SSTable struct { + ID string + Files []string +} + +// IndexWorkload returns sstables to be restored aggregated by location, table and remote sstable dir. 
+func (w *tablesWorker) IndexWorkload(ctx context.Context, locations []Location) ([]LocationWorkload, error) { + var workload []LocationWorkload + for _, l := range locations { + lw, err := w.indexLocationWorkload(ctx, l) + if err != nil { + return nil, errors.Wrapf(err, "index workload in %s", l) + } + workload = append(workload, lw) + } + return workload, nil +} + +func (w *tablesWorker) indexLocationWorkload(ctx context.Context, location Location) (LocationWorkload, error) { + rawWorkload, err := w.createRemoteDirWorkloads(ctx, location) + if err != nil { + return LocationWorkload{}, errors.Wrap(err, "create remote dir workloads") + } + if w.target.Continue { + rawWorkload, err = w.filterPreviouslyRestoredSStables(ctx, rawWorkload) + if err != nil { + return LocationWorkload{}, errors.Wrap(err, "filter already restored sstables") + } + } + workload := aggregateLocationWorkload(rawWorkload) + w.logWorkloadInfo(ctx, workload) + return workload, nil +} + +func (w *tablesWorker) createRemoteDirWorkloads(ctx context.Context, location Location) ([]RemoteDirWorkload, error) { + var rawWorkload []RemoteDirWorkload + err := w.forEachManifest(ctx, location, func(m ManifestInfoWithContent) error { + return m.ForEachIndexIterWithError(nil, func(fm FilesMeta) error { + if !unitsContainTable(w.run.Units, fm.Keyspace, fm.Table) { + return nil + } + + sstables, err := filesMetaToSSTables(fm) + if err != nil { + return errors.Wrapf(err, "convert files meta to sstables") + } + sstDir := m.LocationSSTableVersionDir(fm.Keyspace, fm.Table, fm.Version) + remoteSSTables, err := w.adjustSSTablesWithRemote(ctx, w.randomHostFromLocation(location), sstDir, sstables) + if err != nil { + return errors.Wrap(err, "fetch sstables sizes") + } + + var size int64 + for _, sst := range remoteSSTables { + size += sst.Size + } + t := TableName{ + Keyspace: fm.Keyspace, + Table: fm.Table, + } + workload := RemoteDirWorkload{ + TableName: t, + ManifestInfo: m.ManifestInfo, + RemoteSSTableDir: sstDir, + Size: size, + SSTables: remoteSSTables, + } + if size > 0 { + rawWorkload = append(rawWorkload, workload) + } + return nil + }) + }) + if err != nil { + return nil, errors.Wrap(err, "iterate over manifests") + } + return rawWorkload, nil +} + +func (w *tablesWorker) filterPreviouslyRestoredSStables(ctx context.Context, rawWorkload []RemoteDirWorkload) ([]RemoteDirWorkload, error) { + w.logger.Info(ctx, "Filter out previously restored sstables") + + remoteSSTableDirToRestoredIDs := make(map[string][]string) + err := forEachProgress(w.session, w.run.ClusterID, w.run.TaskID, w.run.ID, func(pr *RunProgress) { + if validateTimeIsSet(pr.RestoreCompletedAt) { + remoteSSTableDirToRestoredIDs[pr.RemoteSSTableDir] = append(remoteSSTableDirToRestoredIDs[pr.RemoteSSTableDir], pr.SSTableID...) 
+ } + }) + if err != nil { + return nil, errors.Wrap(err, "iterate over prev run progress") + } + if len(remoteSSTableDirToRestoredIDs) == 0 { + return rawWorkload, nil + } + + var ( + filtered []RemoteDirWorkload + skippedCount int + skippedSize int64 + ) + for _, rw := range rawWorkload { + var filteredSSTables []RemoteSSTable + var size int64 + for _, sst := range rw.SSTables { + if !slices.Contains(remoteSSTableDirToRestoredIDs[rw.RemoteSSTableDir], sst.ID) { + filteredSSTables = append(filteredSSTables, sst) + size += sst.Size + } else { + skippedCount++ + skippedSize += sst.Size + } + } + if len(filteredSSTables) > 0 { + filtered = append(filtered, RemoteDirWorkload{ + TableName: rw.TableName, + ManifestInfo: rw.ManifestInfo, + RemoteSSTableDir: rw.RemoteSSTableDir, + Size: size, + SSTables: filteredSSTables, + }) + } else { + w.logger.Info(ctx, "Completely filtered out remote sstable dir", "remote dir", rw.RemoteSSTableDir) + } + } + + w.logger.Info(ctx, "Filtered out sstables info", "count", skippedCount, "size", skippedSize) + return filtered, nil +} + +func (w *tablesWorker) initMetrics(workload []LocationWorkload) { + // For now, the only persistent across task runs metrics are progress and remaining_bytes. + // The rest: state, view_build_status, batch_size are calculated from scratch. + w.metrics.ResetClusterMetrics(w.run.ClusterID) + + // Init remaining bytes + for _, wl := range workload { + for _, twl := range wl.Tables { + for _, rdwl := range twl.RemoteDirs { + w.metrics.SetRemainingBytes(metrics.RestoreBytesLabels{ + ClusterID: rdwl.ClusterID.String(), + SnapshotTag: rdwl.SnapshotTag, + Location: rdwl.Location.String(), + DC: rdwl.DC, + Node: rdwl.NodeID, + Keyspace: rdwl.Keyspace, + Table: rdwl.Table, + }, rdwl.Size) + } + } + } + + // Init progress + var totalSize int64 + for _, u := range w.run.Units { + totalSize += u.Size + } + var workloadSize int64 + for _, wl := range workload { + workloadSize += wl.Size + } + w.metrics.SetProgress(metrics.RestoreProgressLabels{ + ClusterID: w.run.ClusterID.String(), + SnapshotTag: w.run.SnapshotTag, + }, float64(totalSize-workloadSize)/float64(totalSize)*100) +} + +func (w *tablesWorker) logWorkloadInfo(ctx context.Context, workload LocationWorkload) { + if workload.Size == 0 { + return + } + var locMax, locCnt int64 + for _, twl := range workload.Tables { + if twl.Size == 0 { + continue + } + var tabMax, tabCnt int64 + for _, rdwl := range twl.RemoteDirs { + if rdwl.Size == 0 { + continue + } + var dirMax int64 + for _, sst := range rdwl.SSTables { + dirMax = max(dirMax, sst.Size) + } + dirCnt := int64(len(rdwl.SSTables)) + w.logger.Info(ctx, "Remote sstable dir workload info", + "path", rdwl.RemoteSSTableDir, + "max size", dirMax, + "average size", rdwl.Size/dirCnt, + "count", dirCnt) + tabCnt += dirCnt + tabMax = max(tabMax, dirMax) + } + w.logger.Info(ctx, "Table workload info", + "keyspace", twl.Keyspace, + "table", twl.Table, + "max size", tabMax, + "average size", twl.Size/tabCnt, + "count", tabCnt) + locCnt += tabCnt + locMax = max(locMax, tabMax) + } + w.logger.Info(ctx, "Location workload info", + "location", workload.Location.String(), + "max size", locMax, + "average size", workload.Size/locCnt, + "count", locCnt) +} + +func aggregateLocationWorkload(rawWorkload []RemoteDirWorkload) LocationWorkload { + remoteDirWorkloads := make(map[TableName][]RemoteDirWorkload) + for _, rw := range rawWorkload { + remoteDirWorkloads[rw.TableName] = append(remoteDirWorkloads[rw.TableName], rw) + } + + var tableWorkloads 
[]TableWorkload + for _, tw := range remoteDirWorkloads { + var size int64 + for _, rdw := range tw { + size += rdw.Size + } + tableWorkloads = append(tableWorkloads, TableWorkload{ + Location: tw[0].Location, + TableName: tw[0].TableName, + Size: size, + RemoteDirs: tw, + }) + } + + var size int64 + for _, tw := range tableWorkloads { + size += tw.Size + } + return LocationWorkload{ + Location: tableWorkloads[0].Location, + Size: size, + Tables: tableWorkloads, + } +} + +func (w *tablesWorker) adjustSSTablesWithRemote(ctx context.Context, host, remoteDir string, sstables map[string]SSTable) ([]RemoteSSTable, error) { + versioned, err := ListVersionedFiles(ctx, w.client, w.run.SnapshotTag, host, remoteDir) + if err != nil { + return nil, errors.Wrap(err, "list versioned files") + } + + remoteSSTables := make([]RemoteSSTable, 0, len(sstables)) + for id, sst := range sstables { + rsst := RemoteSSTable{SSTable: SSTable{ID: id}} + for _, f := range sst.Files { + v, ok := versioned[f] + if !ok { + return nil, errors.Errorf("file %s is not present in listed versioned files", f) + } + + rsst.Files = append(rsst.Files, v.FullName()) + rsst.Size += v.Size + rsst.Versioned = rsst.Versioned || v.Version != "" + } + remoteSSTables = append(remoteSSTables, rsst) + } + + return remoteSSTables, nil +} + +func filesMetaToSSTables(fm FilesMeta) (map[string]SSTable, error) { + const expectedSSTableFileCnt = 9 + sstables := make(map[string]SSTable, len(fm.Files)/expectedSSTableFileCnt) + + for _, f := range fm.Files { + id, err := sstable.ExtractID(f) + if err != nil { + return nil, errors.Wrapf(err, "extract sstable component %s generation ID", f) + } + + if sst, ok := sstables[id]; ok { + sst.Files = append(sst.Files, f) + sstables[id] = sst + } else { + sstables[id] = SSTable{ + ID: id, + Files: []string{f}, + } + } + } + return sstables, nil +} diff --git a/pkg/service/restore/model.go b/pkg/service/restore/model.go index 39a7d4a282..c4d4a26b80 100644 --- a/pkg/service/restore/model.go +++ b/pkg/service/restore/model.go @@ -80,16 +80,11 @@ type Run struct { TaskID uuid.UUID ID uuid.UUID - PrevID uuid.UUID - Location string // marks currently processed location - ManifestPath string // marks currently processed manifest - Keyspace string `db:"keyspace_name"` // marks currently processed keyspace - Table string `db:"table_name"` // marks currently processed table - SnapshotTag string - Stage Stage + PrevID uuid.UUID + SnapshotTag string + Stage Stage RepairTaskID uuid.UUID // task ID of the automated post-restore repair - // Cache that's initialized once for entire task Units []Unit Views []View @@ -165,20 +160,21 @@ func (t *View) UnmarshalUDT(name string, info gocql.TypeInfo, data []byte) error return gocql.Unmarshal(info, data, f.Addr().Interface()) } -// RunProgress describes restore progress (like in RunProgress) of -// already started download of SSTables with specified IDs to host. +// RunProgress describes progress of restoring a single batch. type RunProgress struct { ClusterID uuid.UUID TaskID uuid.UUID RunID uuid.UUID - ManifestPath string - Keyspace string `db:"keyspace_name"` - Table string `db:"table_name"` - Host string // IP of the node to which SSTables are downloaded. 
- AgentJobID int64 + // Different DB name because of historical reasons and because we can't drop/alter clustering column + RemoteSSTableDir string `db:"manifest_path"` + Keyspace string `db:"keyspace_name"` + Table string `db:"table_name"` + SSTableID []string `db:"sstable_id"` + + Host string // IP of the node to which SSTables are downloaded. + AgentJobID int64 - SSTableID []string `db:"sstable_id"` DownloadStartedAt *time.Time DownloadCompletedAt *time.Time RestoreStartedAt *time.Time @@ -206,7 +202,7 @@ func (pr *RunProgress) ForEachTableProgress(session gocqlx.Session, cb func(*Run "cluster_id": pr.ClusterID, "task_id": pr.TaskID, "run_id": pr.RunID, - "manifest_path": pr.ManifestPath, + "manifest_path": pr.RemoteSSTableDir, "keyspace_name": pr.Keyspace, "table_name": pr.Table, }).Iter() @@ -218,10 +214,6 @@ func (pr *RunProgress) ForEachTableProgress(session gocqlx.Session, cb func(*Run return iter.Close() } -func (pr *RunProgress) idCnt() int64 { - return int64(len(pr.SSTableID)) -} - func (pr *RunProgress) setRestoreStartedAt() { t := timeutc.Now() pr.RestoreStartedAt = &t @@ -279,3 +271,9 @@ type ViewProgress struct { Status scyllaclient.ViewBuildStatus `json:"status"` } + +// TableName represents full table name. +type TableName struct { + Keyspace string + Table string +} diff --git a/pkg/service/restore/schema_worker.go b/pkg/service/restore/schema_worker.go index bf1048f7f3..8f7bea05f9 100644 --- a/pkg/service/restore/schema_worker.go +++ b/pkg/service/restore/schema_worker.go @@ -167,7 +167,6 @@ func (w *schemaWorker) restoreFromSchemaFile(ctx context.Context) error { ClusterID: w.run.ClusterID, TaskID: w.run.TaskID, RunID: w.run.ID, - ManifestPath: "DESCRIBE SCHEMA WITH INTERNALS", Keyspace: u.Keyspace, Table: t.Table, DownloadStartedAt: &start, @@ -188,8 +187,6 @@ func (w *schemaWorker) locationDownloadHandler(ctx context.Context, location Loc w.logger.Info(ctx, "Downloading schema from location", "location", location) defer w.logger.Info(ctx, "Downloading schema from location finished", "location", location) - w.run.Location = location.String() - tableDownloadHandler := func(fm FilesMeta) error { if !unitsContainTable(w.run.Units, fm.Keyspace, fm.Table) { return nil @@ -198,9 +195,6 @@ func (w *schemaWorker) locationDownloadHandler(ctx context.Context, location Loc w.logger.Info(ctx, "Downloading schema table", "keyspace", fm.Keyspace, "table", fm.Table) defer w.logger.Info(ctx, "Downloading schema table finished", "keyspace", fm.Keyspace, "table", fm.Table) - w.run.Table = fm.Table - w.run.Keyspace = fm.Keyspace - return w.workFunc(ctx, fm) } @@ -209,7 +203,6 @@ func (w *schemaWorker) locationDownloadHandler(ctx context.Context, location Loc defer w.logger.Info(ctx, "Downloading schema from manifest finished", "manifest", miwc.ManifestInfo) w.miwc = miwc - w.run.ManifestPath = miwc.Path() w.insertRun(ctx) return miwc.ForEachIndexIterWithError(nil, tableDownloadHandler) @@ -242,12 +235,6 @@ func (w *schemaWorker) workFunc(ctx context.Context, fm FilesMeta) error { if err != nil { return errors.Wrap(err, "initialize versioned SSTables") } - if len(w.versionedFiles) > 0 { - w.logger.Info(ctx, "Chosen versioned SSTables", - "dir", srcDir, - "versioned_files", w.versionedFiles, - ) - } idMapping := w.getFileNamesMapping(fm.Files, false) uuidMapping := w.getFileNamesMapping(fm.Files, true) @@ -275,10 +262,7 @@ func (w *schemaWorker) workFunc(ctx context.Context, fm FilesMeta) error { // Rename SSTable in the destination in order to avoid name conflicts dstFile := 
renamedSSTables[file] // Take the correct version of restored file - srcFile := file - if v, ok := w.versionedFiles[file]; ok { - srcFile = v.FullName() - } + srcFile := w.versionedFiles[file].FullName() srcPath := path.Join(srcDir, srcFile) dstPath := path.Join(dstDir, dstFile) @@ -308,9 +292,9 @@ func (w *schemaWorker) workFunc(ctx context.Context, fm FilesMeta) error { ClusterID: w.run.ClusterID, TaskID: w.run.TaskID, RunID: w.run.ID, - ManifestPath: w.run.ManifestPath, - Keyspace: w.run.Keyspace, - Table: w.run.Table, + RemoteSSTableDir: srcDir, + Keyspace: fm.Keyspace, + Table: fm.Table, Host: host, DownloadStartedAt: &start, DownloadCompletedAt: &end, diff --git a/pkg/service/restore/service.go b/pkg/service/restore/service.go index 523bf01f23..dfe1fbc7a6 100644 --- a/pkg/service/restore/service.go +++ b/pkg/service/restore/service.go @@ -115,8 +115,12 @@ func (s *Service) Restore(ctx context.Context, clusterID, taskID, runID uuid.UUI for _, unit := range w.run.Units { totalBytesToRestore += unit.Size } - tw := newTablesWorker(w, s.repairSvc, totalBytesToRestore) - err = tw.restore(ctx) + tw, workerErr := newTablesWorker(w, s.repairSvc, totalBytesToRestore) + if workerErr != nil { + err = workerErr + } else { + err = tw.restore(ctx) + } } else { sw := &schemaWorker{worker: w} err = sw.restore(ctx) diff --git a/pkg/service/restore/service_restore_integration_test.go b/pkg/service/restore/service_restore_integration_test.go index a2e784c945..5ff1d7c86a 100644 --- a/pkg/service/restore/service_restore_integration_test.go +++ b/pkg/service/restore/service_restore_integration_test.go @@ -852,15 +852,6 @@ func restoreWithResume(t *testing.T, target Target, keyspace string, loadCnt, lo t.Fatalf("Expected context error but got: %+v", err) } - pr, err := dstH.service.GetProgress(context.Background(), dstH.ClusterID, dstH.TaskID, dstH.RunID) - if err != nil { - t.Fatal(err) - } - Printf("And: restore progress: %+#v\n", pr) - if pr.Downloaded == 0 { - t.Fatal("Expected partial restore progress") - } - Print("When: resume restore and stop in during repair") dstH.RunID = uuid.MustRandom() err = dstH.service.Restore(ctx2, dstH.ClusterID, dstH.TaskID, dstH.RunID, dstH.targetToProperties(target)) @@ -872,7 +863,7 @@ func restoreWithResume(t *testing.T, target Target, keyspace string, loadCnt, lo t.Fatalf("Expected context error but got: %+v", err) } - pr, err = dstH.service.GetProgress(context.Background(), dstH.ClusterID, dstH.TaskID, dstH.RunID) + pr, err := dstH.service.GetProgress(context.Background(), dstH.ClusterID, dstH.TaskID, dstH.RunID) if err != nil { t.Fatal(err) } diff --git a/pkg/service/restore/tables_worker.go b/pkg/service/restore/tables_worker.go index 774b581de7..5d77c52bb8 100644 --- a/pkg/service/restore/tables_worker.go +++ b/pkg/service/restore/tables_worker.go @@ -9,20 +9,21 @@ import ( "sync" "github.com/pkg/errors" + "github.com/scylladb/go-set/strset" "github.com/scylladb/scylla-manager/v3/pkg/metrics" . "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" "github.com/scylladb/scylla-manager/v3/pkg/service/repair" + "github.com/scylladb/scylla-manager/v3/pkg/util/parallel" + "github.com/scylladb/scylla-manager/v3/pkg/util/query" "github.com/scylladb/scylla-manager/v3/pkg/util/uuid" ) type tablesWorker struct { worker - repairSvc *repair.Service - progress *TotalRestoreProgress - // When set to false, tablesWorker will skip restoration of location/manifest/table - // until it encounters the one present in run. 
- alreadyResumed bool + tableVersion map[TableName]string + repairSvc *repair.Service + progress *TotalRestoreProgress } // TotalRestoreProgress is a struct that holds information about the total progress of the restore job. @@ -64,22 +65,33 @@ func (p *TotalRestoreProgress) Update(bytesRestored int64) { p.restoredBytes += bytesRestored } -func newTablesWorker(w worker, repairSvc *repair.Service, totalBytes int64) *tablesWorker { - return &tablesWorker{ - worker: w, - repairSvc: repairSvc, - alreadyResumed: true, - progress: NewTotalRestoreProgress(totalBytes), +func newTablesWorker(w worker, repairSvc *repair.Service, totalBytes int64) (*tablesWorker, error) { + versions := make(map[TableName]string) + for _, u := range w.run.Units { + for _, t := range u.Tables { + v, err := query.GetTableVersion(w.clusterSession, u.Keyspace, t.Table) + if err != nil { + return nil, errors.Wrapf(err, "get %s.%s version", u.Keyspace, t.Table) + } + versions[TableName{ + Keyspace: u.Keyspace, + Table: t.Table, + }] = v + } } + + return &tablesWorker{ + worker: w, + tableVersion: versions, + repairSvc: repairSvc, + progress: NewTotalRestoreProgress(totalBytes), + }, nil } // restore files from every location specified in restore target. func (w *tablesWorker) restore(ctx context.Context) error { - if w.target.Continue && w.run.PrevID != uuid.Nil && w.run.Table != "" { - w.alreadyResumed = false - } // Init metrics only on fresh start - if w.alreadyResumed { + if w.run.PrevID == uuid.Nil { w.initRestoreMetrics(ctx) } @@ -156,90 +168,60 @@ func (w *tablesWorker) stageRestoreData(ctx context.Context) error { w.logger.Info(ctx, "Started restoring tables") defer w.logger.Info(ctx, "Restoring tables finished") - // Restore locations in deterministic order - for _, l := range w.target.Location { - if !w.alreadyResumed && w.run.Location != l.String() { - w.logger.Info(ctx, "Skipping location", "location", l) - continue - } - if err := w.restoreLocation(ctx, l); err != nil { - return err - } + workload, err := w.IndexWorkload(ctx, w.target.Location) + if err != nil { + return err } - return nil -} - -func (w *tablesWorker) restoreLocation(ctx context.Context, location Location) error { - w.logger.Info(ctx, "Restoring location", "location", location) - defer w.logger.Info(ctx, "Restoring location finished", "location", location) + w.initMetrics(workload) - restoreManifest := func(miwc ManifestInfoWithContent) error { - if !w.alreadyResumed && w.run.ManifestPath != miwc.Path() { - w.logger.Info(ctx, "Skipping manifest", "manifest", miwc.ManifestInfo) - return nil - } - - w.logger.Info(ctx, "Restoring manifest", "manifest", miwc.ManifestInfo) - defer w.logger.Info(ctx, "Restoring manifest finished", "manifest", miwc.ManifestInfo) - - return miwc.ForEachIndexIterWithError(nil, w.restoreDir(ctx, miwc)) + bd := newBatchDispatcher(workload, w.target.BatchSize, w.target.locationHosts) + hostsS := strset.New() + for _, h := range w.target.locationHosts { + hostsS.Add(h...) 
} - - return w.forEachManifest(ctx, location, restoreManifest) -} - -func (w *tablesWorker) restoreDir(ctx context.Context, miwc ManifestInfoWithContent) func(fm FilesMeta) error { - return func(fm FilesMeta) error { - if !unitsContainTable(w.run.Units, fm.Keyspace, fm.Table) { - return nil - } - - if !w.alreadyResumed { - if w.run.Keyspace != fm.Keyspace || w.run.Table != fm.Table { - w.logger.Info(ctx, "Skipping table", "keyspace", fm.Keyspace, "table", fm.Table) + hosts := hostsS.List() + + f := func(n int) (err error) { + h := hosts[n] + for { + // Download and stream in parallel + b, ok := bd.DispatchBatch(h) + if !ok { + w.logger.Info(ctx, "No more batches to restore", "host", h) return nil } - } - - w.logger.Info(ctx, "Restoring table", "keyspace", fm.Keyspace, "table", fm.Table) - defer w.logger.Info(ctx, "Restoring table finished", "keyspace", fm.Keyspace, "table", fm.Table) - - w.run.Location = miwc.Location.String() - w.run.ManifestPath = miwc.Path() - w.run.Table = fm.Table - w.run.Keyspace = fm.Keyspace - w.insertRun(ctx) - - dw, err := newTablesDirWorker(ctx, w.worker, miwc, fm, w.progress) - if err != nil { - return errors.Wrap(err, "create dir worker") - } - if !w.alreadyResumed { - if err := dw.resumePrevProgress(); err != nil { - return errors.Wrap(err, "resume prev run progress") - } - } - w.alreadyResumed = true + w.metrics.IncreaseBatchSize(w.run.ClusterID, h, b.Size) + w.logger.Info(ctx, "Got batch to restore", + "host", h, + "keyspace", b.Keyspace, + "table", b.Table, + "size", b.Size, + "sstable count", len(b.SSTables), + ) - if err := dw.restore(ctx); err != nil { - if ctx.Err() != nil { - return ctx.Err() + pr, err := w.newRunProgress(ctx, h, b) + if err != nil { + return errors.Wrap(err, "create new run progress") } - // In case all SSTables have been restored, restore can proceed even - // with errors from some hosts. - if len(dw.bundleIDPool) > 0 { - return errors.Wrapf(err, "not restored bundles %v", dw.bundleIDPool.drain()) + if err := w.restoreBatch(ctx, b, pr); err != nil { + return errors.Wrap(err, "restore batch") } - - w.logger.Error(ctx, "Restore table failed on some hosts but restore will proceed", - "keyspace", w.run.Keyspace, - "table", w.run.Table, - "error", err, - ) + w.decreaseRemainingBytesMetric(b) } + } + + notify := func(n int, err error) { + w.logger.Error(ctx, "Failed to restore files on host", + "host", hosts[n], + "error", err, + ) + } - return nil + err = parallel.Run(len(hosts), w.target.Parallel, f, notify) + if err == nil { + return bd.ValidateAllDispatched() } + return err } func (w *tablesWorker) stageRepair(ctx context.Context) error { diff --git a/pkg/service/restore/tablesdir_worker.go b/pkg/service/restore/tablesdir_worker.go index b2942a9eb7..990efc9ad2 100644 --- a/pkg/service/restore/tablesdir_worker.go +++ b/pkg/service/restore/tablesdir_worker.go @@ -8,190 +8,43 @@ import ( "time" "github.com/pkg/errors" - "github.com/scylladb/go-set/strset" "github.com/scylladb/scylla-manager/v3/pkg/metrics" "github.com/scylladb/scylla-manager/v3/pkg/scyllaclient" . "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" - "github.com/scylladb/scylla-manager/v3/pkg/sstable" "github.com/scylladb/scylla-manager/v3/pkg/util/parallel" - "github.com/scylladb/scylla-manager/v3/pkg/util/query" - "github.com/scylladb/scylla-manager/v3/pkg/util/uuid" - "go.uber.org/atomic" ) -// tablesDirWorker is responsible for restoring table files from manifest in parallel. 
-type tablesDirWorker struct { - worker - - bundles map[string]bundle // Maps bundle to it's ID - bundleIDPool idPool // SSTable IDs yet to be restored - - dstDir string // "data:" prefixed path to upload dir (common for every host) - srcDir string // Full path to remote directory with backed-up files - miwc ManifestInfoWithContent // Manifest containing fm - fm FilesMeta // Describes table and it's files located in srcDir - - ongoingPr []*RunProgress // Unfinished RunProgress from previous run of each host - - // Maps original SSTable name to its existing older version - // (with respect to currently restored snapshot tag) - // that should be used during the restore procedure. - versionedFiles VersionedMap - fileSizesCache map[string]int64 - progress *TotalRestoreProgress -} - -func newTablesDirWorker(ctx context.Context, w worker, miwc ManifestInfoWithContent, fm FilesMeta, progress *TotalRestoreProgress) (tablesDirWorker, error) { - bundles := newBundles(fm) - bundleIDPool := newIDPool(bundles) - - version, err := query.GetTableVersion(w.clusterSession, fm.Keyspace, fm.Table) - if err != nil { - return tablesDirWorker{}, errors.Wrap(err, "get table version") - } - - srcDir := miwc.LocationSSTableVersionDir(fm.Keyspace, fm.Table, fm.Version) - dstDir := UploadTableDir(fm.Keyspace, fm.Table, version) - w.logger.Info(ctx, "Found table's src and dst dirs", - "keyspace", fm.Keyspace, - "table", fm.Table, - "src_dir", srcDir, - "dst_dir", dstDir, - ) - - hosts := w.target.locationHosts[miwc.Location] - versionedFiles, err := ListVersionedFiles(ctx, w.client, w.run.SnapshotTag, hosts[0], srcDir) - if err != nil { - return tablesDirWorker{}, errors.Wrap(err, "initialize versioned SSTables") - } - if len(versionedFiles) > 0 { - w.logger.Info(ctx, "Chosen versioned SSTables", - "dir", srcDir, - "versioned_files", versionedFiles, - ) - } - - fileSizesCache, err := buildFilesSizesCache(ctx, w.client, hosts[0], srcDir, versionedFiles) - if err != nil { - return tablesDirWorker{}, errors.Wrap(err, "build files sizes cache") - } - - return tablesDirWorker{ - worker: w, - bundles: bundles, - bundleIDPool: bundleIDPool, - dstDir: dstDir, - srcDir: srcDir, - miwc: miwc, - fm: fm, - ongoingPr: make([]*RunProgress, len(hosts)), - versionedFiles: versionedFiles, - fileSizesCache: fileSizesCache, - progress: progress, - }, nil -} - -// restore SSTables of receivers manifest/table in parallel. 
-func (w *tablesDirWorker) restore(ctx context.Context) error { - // Count of SSTable IDs yet to be successfully restored - ctr := atomic.NewInt64(w.bundleIDPool.size()) - for _, pr := range w.ongoingPr { - if pr != nil { - ctr.Add(pr.idCnt()) - } - } - - if ctr.Load() == 0 { - w.logger.Info(ctx, "Table does not have any more SSTables to restore", - "keyspace", w.fm.Keyspace, - "table", w.fm.Table, - ) - return nil - } - - hosts := w.target.locationHosts[w.miwc.Location] - f := func(n int) (err error) { - h := hosts[n] - ongoingPr := w.ongoingPr[n] - - // First handle ongoing restore - if ongoingPr != nil { - if err := w.reactivateRunProgress(ctx, ongoingPr); err != nil { - return errors.Wrap(err, "reactivate run progress") - } - - if err := w.restoreBatch(ctx, ongoingPr); err != nil { - return errors.Wrap(err, "restore reactivated batch") - } - - if ctr.Sub(ongoingPr.idCnt()) <= 0 { - close(w.bundleIDPool) - } - } - - for { - pr, err := w.newRunProgress(ctx, h) - if err != nil { - return errors.Wrap(err, "create run progress") - } - if pr == nil { - w.logger.Info(ctx, "No more batches to restore", "host", h) - return nil - } - - if err := w.restoreBatch(ctx, pr); err != nil { - return errors.Wrap(err, "restore batch") - } - - if ctr.Sub(pr.idCnt()) <= 0 { - close(w.bundleIDPool) - } - } - } - - notify := func(n int, err error) { - w.logger.Error(ctx, "Failed to restore files on host", - "host", hosts[n], - "error", err, - ) - } - - return parallel.Run(len(hosts), w.target.Parallel, f, notify) -} - -func (w *tablesDirWorker) restoreBatch(ctx context.Context, pr *RunProgress) (err error) { +func (w *tablesWorker) restoreBatch(ctx context.Context, b batch, pr *RunProgress) (err error) { defer func() { // Run cleanup on non-pause error - if err != nil && ctx.Err() == nil { - w.metrics.SetRestoreState(w.run.ClusterID, w.miwc.Location, w.run.SnapshotTag, pr.Host, metrics.RestoreStateError) + if err != nil { + w.metrics.SetRestoreState(w.run.ClusterID, b.Location, w.run.SnapshotTag, pr.Host, metrics.RestoreStateError) w.cleanupRunProgress(context.Background(), pr) } else { - w.metrics.SetRestoreState(w.run.ClusterID, w.miwc.Location, w.run.SnapshotTag, pr.Host, metrics.RestoreStateIdle) + w.metrics.SetRestoreState(w.run.ClusterID, b.Location, w.run.SnapshotTag, pr.Host, metrics.RestoreStateIdle) } }() // Download has already been started on RunProgress creation. // Skip steps already done in the previous run. if !validateTimeIsSet(pr.DownloadCompletedAt) { - if err := w.waitJob(ctx, pr); err != nil { + if err := w.waitJob(ctx, b, pr); err != nil { return errors.Wrap(err, "wait for job") } } if !validateTimeIsSet(pr.RestoreCompletedAt) { - if err := w.restoreSSTables(ctx, pr); err != nil { + if err := w.restoreSSTables(ctx, b, pr); err != nil { return errors.Wrap(err, "call load and stream") } } - - w.decreaseRemainingBytesMetric(pr.Downloaded + pr.Skipped + pr.VersionedProgress) w.logger.Info(ctx, "Restored batch", "host", pr.Host, "sstable_id", pr.SSTableID) return nil } // waitJob waits for rclone job to finish while updating its progress. 
-func (w *tablesDirWorker) waitJob(ctx context.Context, pr *RunProgress) (err error) { - w.metrics.SetRestoreState(w.run.ClusterID, w.miwc.Location, w.miwc.SnapshotTag, pr.Host, metrics.RestoreStateDownloading) +func (w *tablesWorker) waitJob(ctx context.Context, b batch, pr *RunProgress) (err error) { + w.metrics.SetRestoreState(w.run.ClusterID, b.Location, w.run.SnapshotTag, pr.Host, metrics.RestoreStateDownloading) w.logger.Info(ctx, "Waiting for job", "host", pr.Host, "job_id", pr.AgentJobID) defer func() { @@ -229,7 +82,7 @@ func (w *tablesDirWorker) waitJob(ctx context.Context, pr *RunProgress) (err err } } -func (w *tablesDirWorker) updateDownloadProgress(ctx context.Context, pr *RunProgress, job *scyllaclient.RcloneJobProgress) { +func (w *tablesWorker) updateDownloadProgress(ctx context.Context, pr *RunProgress, job *scyllaclient.RcloneJobProgress) { // Set StartedAt and CompletedAt based on Job if t := time.Time(job.StartedAt); !t.IsZero() { pr.DownloadStartedAt = &t @@ -245,8 +98,8 @@ func (w *tablesDirWorker) updateDownloadProgress(ctx context.Context, pr *RunPro w.insertRunProgress(ctx, pr) } -func (w *tablesDirWorker) restoreSSTables(ctx context.Context, pr *RunProgress) error { - w.metrics.SetRestoreState(w.run.ClusterID, w.miwc.Location, w.run.SnapshotTag, pr.Host, metrics.RestoreStateLoading) +func (w *tablesWorker) restoreSSTables(ctx context.Context, b batch, pr *RunProgress) error { + w.metrics.SetRestoreState(w.run.ClusterID, b.Location, w.run.SnapshotTag, pr.Host, metrics.RestoreStateLoading) if !validateTimeIsSet(pr.RestoreStartedAt) { pr.setRestoreStartedAt() w.insertRunProgress(ctx, pr) @@ -260,110 +113,19 @@ func (w *tablesDirWorker) restoreSSTables(ctx context.Context, pr *RunProgress) return err } -func (w *tablesDirWorker) resumePrevProgress() error { - bind := &RunProgress{ - ClusterID: w.run.ClusterID, - TaskID: w.run.TaskID, - RunID: w.run.ID, - ManifestPath: w.miwc.Path(), - Keyspace: w.fm.Keyspace, - Table: w.fm.Table, - } - - // All bundles IDs started in the previous run - startedID := strset.New() - // All unfinished RunProgress started in the previous run - ongoingPr := make(map[string]*RunProgress) - err := bind.ForEachTableProgress(w.session, func(pr *RunProgress) { - startedID.Add(pr.SSTableID...) - if !validateTimeIsSet(pr.RestoreCompletedAt) { - cp := *pr - ongoingPr[pr.Host] = &cp - } - }) - if err != nil { - return err - } - - // Remove already started ID from the pool - ids := w.bundleIDPool.drain() - for _, id := range ids { - if !startedID.Has(id) { - w.bundleIDPool <- id - } - } - - hosts := w.target.locationHosts[w.miwc.Location] - // Set ongoing RunProgress so that they can be resumed - for i, h := range hosts { - w.ongoingPr[i] = ongoingPr[h] - } - return nil -} - -// reactivateRunProgress preserves batch assembled in the previous run and tries to reuse its unfinished rclone job. 
-func (w *tablesDirWorker) reactivateRunProgress(ctx context.Context, pr *RunProgress) error { - // Nothing to do if download has already finished - if validateTimeIsSet(pr.DownloadCompletedAt) { - return nil - } - - job, err := w.client.RcloneJobProgress(ctx, pr.Host, pr.AgentJobID, w.config.LongPollingTimeoutSeconds) - if err != nil { - return errors.Wrapf(err, "get progress of rclone job %d", pr.AgentJobID) - } - // Nothing to do if rclone job is still running - if scyllaclient.WorthWaitingForJob(job.Status) { - return nil - } - - // Recreate rclone job - batch := w.batchFromIDs(pr.SSTableID) - if err := w.cleanUploadDir(ctx, pr.Host, w.dstDir, batch); err != nil { - return errors.Wrapf(err, "clean upload dir of host %s", pr.Host) - } - - jobID, versionedPr, err := w.startDownload(ctx, pr.Host, batch) - if err != nil { - w.deleteRunProgress(ctx, pr) - w.returnBatchToPool(pr.SSTableID, pr.Host) - return err - } - - pr.AgentJobID = jobID - pr.VersionedProgress = versionedPr - w.insertRunProgress(ctx, pr) - return nil -} - -// newRunProgress creates RunProgress by assembling batch and starting download to host's upload dir. -func (w *tablesDirWorker) newRunProgress(ctx context.Context, host string) (*RunProgress, error) { +// newRunProgress creates RunProgress by starting download to host's upload dir. +func (w *tablesWorker) newRunProgress(ctx context.Context, host string, b batch) (*RunProgress, error) { if err := w.checkAvailableDiskSpace(ctx, host); err != nil { return nil, errors.Wrap(err, "validate free disk space") } - takenIDs := w.chooseIDsForBatch(ctx, w.target.BatchSize, host) - if ctx.Err() != nil { - w.returnBatchToPool(takenIDs, host) - return nil, ctx.Err() - } - if takenIDs == nil { - return nil, nil //nolint: nilnil - } - - w.logger.Info(ctx, "Created new batch", - "host", host, - "sstable_id", takenIDs, - ) - - batch := w.batchFromIDs(takenIDs) - if err := w.cleanUploadDir(ctx, host, w.dstDir, nil); err != nil { + uploadDir := UploadTableDir(b.Keyspace, b.Table, w.tableVersion[b.TableName]) + if err := w.cleanUploadDir(ctx, host, uploadDir, nil); err != nil { return nil, errors.Wrapf(err, "clean upload dir of host %s", host) } - jobID, versionedPr, err := w.startDownload(ctx, host, batch) + jobID, versionedPr, err := w.startDownload(ctx, host, b) if err != nil { - w.returnBatchToPool(takenIDs, host) return nil, err } @@ -371,12 +133,12 @@ func (w *tablesDirWorker) newRunProgress(ctx context.Context, host string) (*Run ClusterID: w.run.ClusterID, TaskID: w.run.TaskID, RunID: w.run.ID, - ManifestPath: w.miwc.Path(), - Keyspace: w.fm.Keyspace, - Table: w.fm.Table, + RemoteSSTableDir: b.RemoteSSTableDir, + Keyspace: b.Keyspace, + Table: b.Table, Host: host, AgentJobID: jobID, - SSTableID: takenIDs, + SSTableID: b.IDs(), VersionedProgress: versionedPr, } @@ -388,135 +150,86 @@ func (w *tablesDirWorker) newRunProgress(ctx context.Context, host string) (*Run // Downloading of versioned files happens first in a synchronous way. // It returns jobID for asynchronous download of the newest versions of files // alongside with the size of the already downloaded versioned files. 
-func (w *tablesDirWorker) startDownload(ctx context.Context, host string, batch []string) (jobID, versionedPr int64, err error) { - var ( - regularBatch = make([]string, 0) - versionedBatch = make([]VersionedSSTable, 0) - ) - // Decide which files require to be downloaded in their older version - for _, file := range batch { - if v, ok := w.versionedFiles[file]; ok { - versionedBatch = append(versionedBatch, v) - versionedPr += v.Size - } else { - regularBatch = append(regularBatch, file) - } - } - // Downloading versioned files requires us to rename them (strip version extension) - // and function RcloneCopyPaths lacks this option. In order to achieve that, we copy - // all versioned files one by one with RcloneCopyFile (which supports renaming files). - // The assumption is that the existence of versioned files is low and that they - // are rather small, so we can do it in a synchronous way. - // Copying files can be done in full parallel because of rclone ability to limit transfers. - f := func(i int) error { - file := versionedBatch[i] - // Restore file without its version extension - dst := path.Join(w.dstDir, file.Name) - src := path.Join(w.srcDir, file.FullName()) - - if err := w.client.RcloneCopyFile(ctx, host, dst, src); err != nil { - return parallel.Abort(errors.Wrapf(err, "host %s: download versioned file %s into %s", host, src, dst)) +func (w *tablesWorker) startDownload(ctx context.Context, host string, b batch) (jobID, versionedPr int64, err error) { + uploadDir := UploadTableDir(b.Keyspace, b.Table, w.tableVersion[b.TableName]) + sstables := b.NotVersionedSSTables() + versioned := b.VersionedSSTables() + versionedSize := b.VersionedSize() + if len(versioned) > 0 { + if err := w.downloadVersioned(ctx, host, b.RemoteSSTableDir, uploadDir, versioned); err != nil { + return 0, 0, errors.Wrapf(err, "download versioned sstabled on host %s", host) } - - w.logger.Info(ctx, "Downloaded versioned file", - "host", host, - "src", src, - "dst", dst, - "size", file.Size, - ) - - return nil - } - - notify := func(i int, err error) { - file := versionedBatch[i] - dst := path.Join(w.dstDir, file.Name) - src := path.Join(w.srcDir, file.FullName()) - w.logger.Error(ctx, "Failed to download versioned SSTable", - "file", file, - "dst", dst, - "src", src, - "error", err, - ) } - if err := parallel.Run(len(versionedBatch), parallel.NoLimit, f, notify); err != nil { - return 0, 0, err - } // Start asynchronous job for downloading the newest versions of remaining files - jobID, err = w.client.RcloneCopyPaths(ctx, host, w.dstDir, w.srcDir, regularBatch) + files := make([]string, 0) + for _, sst := range sstables { + files = append(files, sst.Files...) + } + jobID, err = w.client.RcloneCopyPaths(ctx, host, uploadDir, b.RemoteSSTableDir, files) if err != nil { return 0, 0, errors.Wrap(err, "download batch to upload dir") } - w.logger.Info(ctx, "Started downloading files", "host", host, "job_id", jobID, - "batch", regularBatch, ) - - return jobID, versionedPr, nil + return jobID, versionedSize, nil } -// chooseIDsForBatch returns slice of IDs of SSTables that the batch consists of. -func (w *tablesDirWorker) chooseIDsForBatch(ctx context.Context, size int, host string) (takenIDs []string) { - defer func() { - w.increaseBatchSizeMetric(w.run.ClusterID, w.batchFromIDs(takenIDs), host) - }() - - // All hosts are trying to get IDs for batch from the pool. - // Pool is closed after the whole table has been restored. 
- - // Take at most size IDs - for i := 0; i < size; i++ { - select { - case <-ctx.Done(): - return takenIDs - default: - } - - select { - case id, ok := <-w.bundleIDPool: - if !ok { - return takenIDs - } - takenIDs = append(takenIDs, id) - default: - // Don't wait for more IDs if the pool is empty - // and host already has something to restore. - if len(takenIDs) > 0 { - return takenIDs - } - // Here host hasn't taken any IDs and pool is empty, - // so it waits for the whole table to be restored or - // for IDs that might return to the pool in case of error on the other hosts. - select { - case id, ok := <-w.bundleIDPool: - if !ok { - return takenIDs - } - takenIDs = append(takenIDs, id) - case <-ctx.Done(): - return takenIDs +// Downloading versioned files requires us to rename them (strip version extension) +// and function RcloneCopyPaths lacks this option. In order to achieve that, we copy +// all versioned files one by one with RcloneCopyFile (which supports renaming files). +// The assumption is that the existence of versioned files is low and that they +// are rather small, so we can do it in a synchronous way. +// Copying files can be done in full parallel because of rclone ability to limit transfers. +func (w *tablesWorker) downloadVersioned(ctx context.Context, host, srcDir, dstDir string, versioned []RemoteSSTable) error { + f := func(i int) error { + sst := versioned[i] + for _, file := range sst.Files { + name, _ := SplitNameAndVersion(file) + // Restore file without its version extension + dst := path.Join(dstDir, name) + src := path.Join(srcDir, file) + if err := w.client.RcloneCopyFile(ctx, host, dst, src); err != nil { + return parallel.Abort(errors.Wrapf(err, "host %s: download versioned file %s into %s", host, src, dst)) } } + w.logger.Info(ctx, "Downloaded versioned sstable", + "host", host, + "sstable ID", sst.ID, + "src dir", srcDir, + "dst dir", dstDir, + ) + return nil + } + + notify := func(i int, err error) { + sst := versioned[i] + w.logger.Error(ctx, "Failed to download versioned sstable", + "host", host, + "sstable ID", sst.ID, + "src dir", srcDir, + "dst dir", dstDir, + "error", err, + ) } - return takenIDs + return parallel.Run(len(versioned), parallel.NoLimit, f, notify) } -func (w *tablesDirWorker) decreaseRemainingBytesMetric(bytes int64) { +func (w *tablesWorker) decreaseRemainingBytesMetric(b batch) { labels := metrics.RestoreBytesLabels{ ClusterID: w.run.ClusterID.String(), SnapshotTag: w.run.SnapshotTag, - Location: w.miwc.Location.String(), - DC: w.miwc.DC, - Node: w.miwc.NodeID, - Keyspace: w.fm.Keyspace, - Table: w.fm.Table, + Location: b.Location.String(), + DC: b.DC, + Node: b.NodeID, + Keyspace: b.Keyspace, + Table: b.Table, } - w.metrics.DecreaseRemainingBytes(labels, bytes) - w.progress.Update(bytes) + w.metrics.DecreaseRemainingBytes(labels, b.Size) + w.progress.Update(b.Size) progressLabels := metrics.RestoreProgressLabels{ ClusterID: w.run.ClusterID.String(), @@ -525,80 +238,13 @@ func (w *tablesDirWorker) decreaseRemainingBytesMetric(bytes int64) { w.metrics.SetProgress(progressLabels, w.progress.CurrentProgress()) } -func (w *tablesDirWorker) increaseBatchSizeMetric(clusterID uuid.UUID, batch []string, host string) { - w.metrics.IncreaseBatchSize(clusterID, host, w.countBatchSize(batch)) -} - -func (w *tablesDirWorker) decreaseBatchSizeMetric(clusterID uuid.UUID, batch []string, host string) { - w.metrics.DecreaseBatchSize(clusterID, host, w.countBatchSize(batch)) -} - -func (w *tablesDirWorker) cleanupRunProgress(ctx context.Context, pr 
*RunProgress) { +func (w *tablesWorker) cleanupRunProgress(ctx context.Context, pr *RunProgress) { w.deleteRunProgress(ctx, pr) - w.returnBatchToPool(pr.SSTableID, pr.Host) - - if cleanErr := w.cleanUploadDir(ctx, pr.Host, w.dstDir, nil); cleanErr != nil { - w.logger.Error(ctx, "Couldn't clear destination directory", "host", pr.Host, "error", cleanErr) + tn := TableName{ + Keyspace: pr.Keyspace, + Table: pr.Table, } -} - -func (w *tablesDirWorker) returnBatchToPool(ids []string, host string) { - w.decreaseBatchSizeMetric(w.run.ClusterID, w.batchFromIDs(ids), host) - for _, id := range ids { - w.bundleIDPool <- id - } -} - -func (w *tablesDirWorker) batchFromIDs(ids []string) []string { - var batch []string - for _, id := range ids { - batch = append(batch, w.bundles[id]...) - } - return batch -} - -func (w *tablesDirWorker) countBatchSize(batch []string) int64 { - var batchSize int64 - for _, file := range batch { - batchSize += w.fileSizesCache[file] - } - return batchSize -} - -// bundle represents SSTables with the same ID. -type bundle []string - -func newBundles(fm FilesMeta) map[string]bundle { - bundles := make(map[string]bundle) - for _, f := range fm.Files { - id, err := sstable.ExtractID(f) - if err != nil { - panic(err) - } - bundles[id] = append(bundles[id], f) - } - return bundles -} - -// idPool represents pool of SSTableIDs yet to be restored. -type idPool chan string - -func (p idPool) drain() []string { - var out []string - for len(p) > 0 { - out = append(out, <-p) - } - return out -} - -func (p idPool) size() int64 { - return int64(len(p)) -} - -func newIDPool(bundles map[string]bundle) chan string { - bundleIDPool := make(chan string, len(bundles)) - for id := range bundles { - bundleIDPool <- id + if cleanErr := w.cleanUploadDir(ctx, pr.Host, UploadTableDir(pr.Keyspace, pr.Table, w.tableVersion[tn]), nil); cleanErr != nil { + w.logger.Error(ctx, "Couldn't clear destination directory", "host", pr.Host, "error", cleanErr) } - return bundleIDPool } diff --git a/pkg/service/restore/worker.go b/pkg/service/restore/worker.go index 00f96c6dca..8984a934a2 100644 --- a/pkg/service/restore/worker.go +++ b/pkg/service/restore/worker.go @@ -6,6 +6,7 @@ import ( "context" "encoding/json" "fmt" + "math/rand" "path" "regexp" "slices" @@ -43,6 +44,14 @@ type worker struct { clusterSession gocqlx.Session } +func (w *worker) randomHostFromLocation(loc Location) string { + hosts, ok := w.target.locationHosts[loc] + if !ok { + panic("no hosts for location: " + loc.String()) + } + return hosts[rand.Intn(len(hosts))] +} + func (w *worker) init(ctx context.Context, properties json.RawMessage) error { if err := w.initTarget(ctx, properties); err != nil { return errors.Wrap(err, "init target") @@ -666,10 +675,6 @@ func (w *worker) decorateWithPrevRun(ctx context.Context) error { if w.target.Continue { w.run.PrevID = prev.ID - w.run.Location = prev.Location - w.run.ManifestPath = prev.ManifestPath - w.run.Keyspace = prev.Keyspace - w.run.Table = prev.Table w.run.Stage = prev.Stage w.run.RepairTaskID = prev.RepairTaskID } @@ -684,6 +689,11 @@ func (w *worker) clonePrevProgress(ctx context.Context) { defer q.Release() err := forEachProgress(w.session, w.run.ClusterID, w.run.TaskID, w.run.PrevID, func(pr *RunProgress) { + // We don't support interrupted run progresses resume, + // so only finished run progresses should be copied. 
+ if !validateTimeIsSet(pr.RestoreCompletedAt) { + return + } pr.RunID = w.run.ID if err := q.BindStruct(pr).Exec(); err != nil { w.logger.Error(ctx, "Couldn't clone run progress", @@ -794,20 +804,3 @@ func (w *worker) stopJob(ctx context.Context, jobID int64, host string) { ) } } - -func buildFilesSizesCache(ctx context.Context, client *scyllaclient.Client, host, dir string, versioned VersionedMap) (map[string]int64, error) { - filesSizesCache := make(map[string]int64) - opts := &scyllaclient.RcloneListDirOpts{ - FilesOnly: true, - } - f := func(item *scyllaclient.RcloneListDirItem) { - filesSizesCache[item.Name] = item.Size - } - if err := client.RcloneListDirIter(ctx, host, dir, opts, f); err != nil { - return nil, errors.Wrapf(err, "host %s: listing all files from %s", host, dir) - } - for k, v := range versioned { - filesSizesCache[k] = v.Size - } - return filesSizesCache, nil -} diff --git a/schema/v3.4.0.cql b/schema/v3.4.0.cql new file mode 100644 index 0000000000..4fe13883cd --- /dev/null +++ b/schema/v3.4.0.cql @@ -0,0 +1,4 @@ +ALTER TABLE restore_run DROP location; +ALTER TABLE restore_run DROP manifest_path; +ALTER TABLE restore_run DROP keyspace_name; +ALTER TABLE restore_run DROP table_name; \ No newline at end of file