diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index 9021df83..654ac55d 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -72,7 +72,7 @@ type Client struct { storage storage.ExternalStorage backend *backuppb.StorageBackend - gcTTL int64 + gcTTL time.Duration } // NewBackupClient returns a new backup client. @@ -135,6 +135,36 @@ func (bc *Client) GetCurAPIVersion() kvrpcpb.APIVersion { return bc.curAPIVer } +// UpdateBRGCSafePoint obtains a backup-ts from GetTS using safeInterval and passes it, +// together with the client's gcTTL, to utils.UpdateServiceSafePoint on the PD client. +// It returns the backup-ts, or (0, nil) without doing anything when the current API version is not V2. +func (bc *Client) UpdateBRGCSafePoint(ctx context.Context, safeInterval time.Duration) (uint64, error) { + if bc.GetCurAPIVersion() != kvrpcpb.APIVersion_V2 { + return 0, nil + } + backupTS, err := bc.GetTS(ctx, safeInterval, 0) + if err != nil { + return 0, errors.Trace(err) + } + // Round the TTL up to whole seconds; plain truncation turns a sub-second gcTTL into 0. + // NOTE(review): PD is believed to drop a service safe point whose TTL is <= 0 -- confirm. + ttl := bc.GetGCTTL() + ttlSeconds := int64(ttl / time.Second) + if ttl%time.Second > 0 { + ttlSeconds++ + } + sp := utils.BRServiceSafePoint{ + BackupTS: backupTS, + TTL: ttlSeconds, + ID: utils.MakeSafePointID(), + } + err = utils.UpdateServiceSafePoint(ctx, bc.mgr.GetPDClient(), sp) + if err != nil { + return 0, errors.Trace(err) + } + return backupTS, nil +} + // SetLockFile set write lock file. func (bc *Client) SetLockFile(ctx context.Context) error { return bc.storage.WriteFile(ctx, metautil.LockFile, @@ -143,7 +173,7 @@ func (bc *Client) SetLockFile(ctx context.Context) error { } // SetGCTTL set gcTTL for client. -func (bc *Client) SetGCTTL(ttl int64) { +func (bc *Client) SetGCTTL(ttl time.Duration) { if ttl <= 0 { ttl = utils.DefaultBRGCSafePointTTL } @@ -151,7 +181,7 @@ func (bc *Client) SetGCTTL(ttl int64) { } // GetGCTTL get gcTTL for this backup. -func (bc *Client) GetGCTTL() int64 { +func (bc *Client) GetGCTTL() time.Duration { return bc.gcTTL } diff --git a/br/pkg/backup/client_test.go b/br/pkg/backup/client_test.go index 77c7f740..3ad94897 100644 --- a/br/pkg/backup/client_test.go +++ b/br/pkg/backup/client_test.go @@ -4,6 +4,7 @@ package backup_test import ( "context" + "math" "testing" "time" @@ -11,6 +12,7 @@ import ( . 
"github.com/pingcap/check" backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/kvproto/pkg/errorpb" + "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/testutils" "github.com/tikv/client-go/v2/tikv" @@ -52,7 +54,7 @@ func (r *testBackup) SetUpSuite(c *C) { defer httpmock.DeactivateAndReset() // Exact URL match httpmock.RegisterResponder("GET", `=~^/config`, - httpmock.NewStringResponder(200, `{"storage":{"api-version":1, "enable-ttl":false}}`)) + httpmock.NewStringResponder(200, `{"storage":{"api-version":2, "enable-ttl":true}}`)) r.backupClient, err = backup.NewBackupClient(r.ctx, mockMgr, nil) c.Assert(err, IsNil) @@ -225,3 +227,24 @@ func (r *testBackup) TestCheckBackupIsLocked(c *C) { err = backup.CheckBackupStorageIsLocked(ctx, r.storage) c.Assert(err, ErrorMatches, "backup lock file and sst file exist in(.+)") } + +func (r *testBackup) TestUpdateBRGCSafePoint(c *C) { + r.SetUpSuite(c) + duration := time.Duration(1) * time.Minute + physical, logical, err := r.mockPDClient.GetTS(r.ctx) + c.Assert(err, IsNil) + tso := oracle.ComposeTS(physical, logical) + curTime := oracle.GetTimeFromTS(tso) + backupAgo := curTime.Add(-duration) + expectbackupTS := oracle.ComposeTS(oracle.GetPhysical(backupAgo), logical) + r.backupClient.SetGCTTL(time.Minute) + c.Assert(r.backupClient.GetCurAPIVersion(), Equals, kvrpcpb.APIVersion_V2) + + backupTs, err := r.backupClient.UpdateBRGCSafePoint(r.ctx, duration) + c.Assert(err, IsNil) + c.Assert(backupTs-expectbackupTS, Equals, uint64(1)) // UpdateBRGCSafePoint will get ts again, so there is a gap. 
+ + curSafePoint, err := r.mockPDClient.UpdateServiceGCSafePoint(r.ctx, "test", 100, math.MaxUint64) + c.Assert(err, IsNil) + c.Assert(curSafePoint, Equals, backupTs-1) +} diff --git a/br/pkg/conn/conn.go b/br/pkg/conn/conn.go index 1e7f87d1..75042e08 100755 --- a/br/pkg/conn/conn.go +++ b/br/pkg/conn/conn.go @@ -251,8 +251,7 @@ func NewMgr( if checkRequirements { err = version.CheckClusterVersion(ctx, controller.GetPDClient(), version.CheckVersionForBR) if err != nil { - return nil, errors.Annotate(err, "running BR in incompatible version of cluster, "+ - "if you believe it's OK, use --check-requirements=false to skip.") + return nil, errors.Annotate(err, "running BR in incompatible version of cluster.") } } diff --git a/br/pkg/task/backup.go b/br/pkg/task/backup.go index b41ab2f9..025c7b80 100644 --- a/br/pkg/task/backup.go +++ b/br/pkg/task/backup.go @@ -5,7 +5,6 @@ package task import ( backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/spf13/pflag" - "github.com/tikv/migration/br/pkg/utils" ) const ( @@ -17,8 +16,6 @@ const ( flagRemoveSchedulers = "remove-schedulers" flagIgnoreStats = "ignore-stats" flagUseBackupMetaV2 = "use-backupmeta-v2" - - flagGCTTL = "gcttl" ) // CompressionConfig is the configuration for sst file compression. @@ -38,7 +35,7 @@ func DefineBackupFlags(flags *pflag.FlagSet) { " use for incremental backup, support TSO only") flags.String(flagBackupTS, "", "the backup ts support TSO or datetime,"+ " e.g. 
'400036290571534337', '2018-05-11 01:42:23'") - flags.Int64(flagGCTTL, utils.DefaultBRGCSafePointTTL, "the TTL (in seconds) that PD holds for BR's GC safepoint") + flags.String(flagCompressionType, "zstd", "backup sst file compression algorithm, value can be one of 'lz4|zstd|snappy'") flags.Int32(flagCompressionLevel, 0, "compression level used for sst file compression") @@ -51,7 +48,6 @@ func DefineBackupFlags(flags *pflag.FlagSet) { _ = flags.MarkHidden(flagBackupTimeago) _ = flags.MarkHidden(flagLastBackupTS) _ = flags.MarkHidden(flagBackupTS) - _ = flags.MarkHidden(flagGCTTL) // Disable stats by default. because of // 1. DumpStatsToJson is not stable diff --git a/br/pkg/task/backup_raw.go b/br/pkg/task/backup_raw.go index a0ab7316..3c6fd547 100644 --- a/br/pkg/task/backup_raw.go +++ b/br/pkg/task/backup_raw.go @@ -27,6 +27,8 @@ const ( flagStartKey = "start" flagEndKey = "end" flagDstAPIVersion = "dst-api-version" + flagSafeInterval = "safe-interval" + flagGCTTL = "gcttl" ) // DefineRawBackupFlags defines common flags for the backup command. @@ -49,6 +51,12 @@ func DefineRawBackupFlags(command *cobra.Command) { command.Flags().Bool(flagRemoveSchedulers, false, "disable the balance, shuffle and region-merge schedulers in PD to speed up backup.") + command.Flags().Duration(flagSafeInterval, utils.DefaultBRSafeInterval, + "The interval between backup-ts and current tso.") + command.Flags().Duration(flagGCTTL, utils.DefaultBRGCSafePointTTL, "The TTL of BR's GC safepoint") + + // safe-interval is difficult for common users to set one suitable value. Hide it. + _ = command.Flags().MarkHidden(flagSafeInterval) // This flag can impact the online cluster, so hide it in case of abuse. 
_ = command.Flags().MarkHidden(flagCompressionType) _ = command.Flags().MarkHidden(flagRemoveSchedulers) @@ -114,6 +122,15 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf if err = client.SetStorage(ctx, u, &opts); err != nil { return errors.Trace(err) } + client.SetGCTTL(cfg.GCTTL) + if curAPIVersion == kvrpcpb.APIVersion_V2 { + // set safepoint to avoid the logical deletion data to gc. + backupTs, err := client.UpdateBRGCSafePoint(ctx, cfg.SafeInterval) + if err != nil { + return errors.Trace(err) + } + g.Record("BackupTS", backupTs) + } backupRange := rtree.Range{StartKey: cfg.StartKey, EndKey: cfg.EndKey} diff --git a/br/pkg/task/common.go b/br/pkg/task/common.go index e6bb1c6f..bc453c92 100644 --- a/br/pkg/task/common.go +++ b/br/pkg/task/common.go @@ -108,6 +108,7 @@ func DefineCommonFlags(flags *pflag.FlagSet) { flags.Bool(flagCheckRequirement, true, "Whether start version check before execute command") + _ = flags.MarkHidden(flagCheckRequirement) flags.Duration(flagSwitchModeInterval, defaultSwitchInterval, "maintain import mode on TiKV during restore") _ = flags.MarkHidden(flagSwitchModeInterval) flags.Duration(flagGrpcKeepaliveTime, defaultGRPCKeepaliveTime, diff --git a/br/pkg/task/rawkv_config.go b/br/pkg/task/rawkv_config.go index 3ffcda6f..423b4e67 100644 --- a/br/pkg/task/rawkv_config.go +++ b/br/pkg/task/rawkv_config.go @@ -17,6 +17,7 @@ package task import ( "bytes" "strings" + "time" "github.com/pingcap/errors" backuppb "github.com/pingcap/kvproto/pkg/brpb" @@ -34,7 +35,9 @@ type RawKvConfig struct { EndKey []byte `json:"end-key" toml:"end-key"` DstAPIVersion string `json:"dst-api-version" toml:"dst-api-version"` CompressionConfig - RemoveSchedulers bool `json:"remove-schedulers" toml:"remove-schedulers"` + RemoveSchedulers bool `json:"remove-schedulers" toml:"remove-schedulers"` + SafeInterval time.Duration `json:"safe-interval" toml:"safe-interval"` + GCTTL time.Duration `json:"gc-ttl" toml:"gc-ttl"` } // 
ParseBackupConfigFromFlags parses the backup-related flags from the flag set. @@ -47,6 +50,16 @@ func (cfg *RawKvConfig) ParseBackupConfigFromFlags(flags *pflag.FlagSet) error { if err = cfg.parseDstAPIVersion(flags); err != nil { return errors.Trace(err) } + safeInterval, err := flags.GetDuration(flagSafeInterval) + if err != nil { + return errors.Trace(err) + } + cfg.SafeInterval = safeInterval + gcTTL, err := flags.GetDuration(flagGCTTL) + if err != nil { + return errors.Trace(err) + } + cfg.GCTTL = gcTTL compressionCfg, err := cfg.parseCompressionFlags(flags) if err != nil { diff --git a/br/pkg/utils/safe_point.go b/br/pkg/utils/safe_point.go index 37522889..3dfbd2a1 100644 --- a/br/pkg/utils/safe_point.go +++ b/br/pkg/utils/safe_point.go @@ -22,7 +22,8 @@ const ( preUpdateServiceSafePointFactor = 3 checkGCSafePointGapTime = 5 * time.Second // DefaultBRGCSafePointTTL means PD keep safePoint limit at least 5min. - DefaultBRGCSafePointTTL = 5 * 60 + DefaultBRGCSafePointTTL = time.Duration(5) * time.Minute + DefaultBRSafeInterval = time.Minute // safe interval is used to calc the backup-ts. ) // BRServiceSafePoint is metadata of service safe point from a BR 'instance'. @@ -74,7 +75,7 @@ func CheckGCSafePoint(ctx context.Context, pdClient pd.Client, ts uint64) error } -// updateServiceSafePoint register BackupTS to PD, to lock down BackupTS as safePoint with TTL seconds. -func updateServiceSafePoint(ctx context.Context, pdClient pd.Client, sp BRServiceSafePoint) error { +// UpdateServiceSafePoint registers BackupTS to PD, to lock down BackupTS as safePoint with TTL seconds. +func UpdateServiceSafePoint(ctx context.Context, pdClient pd.Client, sp BRServiceSafePoint) error { log.Debug("update PD safePoint limit with TTL", zap.Object("safePoint", sp)) lastSafePoint, err := pdClient.UpdateServiceGCSafePoint(ctx, sp.ID, sp.TTL, sp.BackupTS-1) @@ -102,7 +103,7 @@ func StartServiceSafePointKeeper( } // Update service safe point immediately to cover the gap between starting // update goroutine and updating service safe point. 
- if err := updateServiceSafePoint(ctx, pdClient, sp); err != nil { + if err := UpdateServiceSafePoint(ctx, pdClient, sp); err != nil { return errors.Trace(err) } @@ -119,7 +120,7 @@ func StartServiceSafePointKeeper( log.Debug("service safe point keeper exited") return case <-updateTick.C: - if err := updateServiceSafePoint(ctx, pdClient, sp); err != nil { + if err := UpdateServiceSafePoint(ctx, pdClient, sp); err != nil { log.Warn("failed to update service safe point, backup may fail if gc triggered", zap.Error(err), )