Skip to content

Commit

Permalink
[close #138] br return backup ts (#156)
Browse files Browse the repository at this point in the history
* br return backupts

Signed-off-by: haojinming <[email protected]>

* add unit test and disable check-reqirements

Signed-off-by: haojinming <[email protected]>

* adjust parse place

Signed-off-by: haojinming <[email protected]>

* fix review comments

Signed-off-by: haojinming <[email protected]>

* change gcttl to duration

Signed-off-by: haojinming <[email protected]>
  • Loading branch information
haojinming authored Jul 16, 2022
1 parent 3a3d747 commit 7d32868
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 16 deletions.
26 changes: 23 additions & 3 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type Client struct {
storage storage.ExternalStorage
backend *backuppb.StorageBackend

gcTTL int64
gcTTL time.Duration
}

// NewBackupClient returns a new backup client.
Expand Down Expand Up @@ -135,6 +135,26 @@ func (bc *Client) GetCurAPIVersion() kvrpcpb.APIVersion {
return bc.curAPIVer
}

func (bc *Client) UpdateBRGCSafePoint(ctx context.Context, safeInterval time.Duration) (uint64, error) {
if bc.GetCurAPIVersion() != kvrpcpb.APIVersion_V2 {
return 0, nil
}
backupTS, err := bc.GetTS(ctx, safeInterval, 0)
if err != nil {
return 0, errors.Trace(err)
}
sp := utils.BRServiceSafePoint{
BackupTS: backupTS,
TTL: int64(bc.GetGCTTL().Seconds()),
ID: utils.MakeSafePointID(),
}
err = utils.UpdateServiceSafePoint(ctx, bc.mgr.GetPDClient(), sp)
if err != nil {
return 0, errors.Trace(err)
}
return backupTS, nil
}

// SetLockFile set write lock file.
func (bc *Client) SetLockFile(ctx context.Context) error {
return bc.storage.WriteFile(ctx, metautil.LockFile,
Expand All @@ -143,15 +163,15 @@ func (bc *Client) SetLockFile(ctx context.Context) error {
}

// SetGCTTL set gcTTL for client.
func (bc *Client) SetGCTTL(ttl int64) {
func (bc *Client) SetGCTTL(ttl time.Duration) {
if ttl <= 0 {
ttl = utils.DefaultBRGCSafePointTTL
}
bc.gcTTL = ttl
}

// GetGCTTL get gcTTL for this backup.
func (bc *Client) GetGCTTL() int64 {
func (bc *Client) GetGCTTL() time.Duration {
return bc.gcTTL
}

Expand Down
25 changes: 24 additions & 1 deletion br/pkg/backup/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ package backup_test

import (
"context"
"math"
"testing"
"time"

"github.com/jarcoal/httpmock"
. "github.com/pingcap/check"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/testutils"
"github.com/tikv/client-go/v2/tikv"
Expand Down Expand Up @@ -52,7 +54,7 @@ func (r *testBackup) SetUpSuite(c *C) {
defer httpmock.DeactivateAndReset()
// Exact URL match
httpmock.RegisterResponder("GET", `=~^/config`,
httpmock.NewStringResponder(200, `{"storage":{"api-version":1, "enable-ttl":false}}`))
httpmock.NewStringResponder(200, `{"storage":{"api-version":2, "enable-ttl":true}}`))

r.backupClient, err = backup.NewBackupClient(r.ctx, mockMgr, nil)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -225,3 +227,24 @@ func (r *testBackup) TestCheckBackupIsLocked(c *C) {
err = backup.CheckBackupStorageIsLocked(ctx, r.storage)
c.Assert(err, ErrorMatches, "backup lock file and sst file exist in(.+)")
}

func (r *testBackup) TestUpdateBRGCSafePoint(c *C) {
r.SetUpSuite(c)
duration := time.Duration(1) * time.Minute
physical, logical, err := r.mockPDClient.GetTS(r.ctx)
c.Assert(err, IsNil)
tso := oracle.ComposeTS(physical, logical)
curTime := oracle.GetTimeFromTS(tso)
backupAgo := curTime.Add(-duration)
expectbackupTS := oracle.ComposeTS(oracle.GetPhysical(backupAgo), logical)
r.backupClient.SetGCTTL(time.Minute)
c.Assert(r.backupClient.GetCurAPIVersion(), Equals, kvrpcpb.APIVersion_V2)

backupTs, err := r.backupClient.UpdateBRGCSafePoint(r.ctx, duration)
c.Assert(err, IsNil)
c.Assert(backupTs-expectbackupTS, Equals, uint64(1)) // UpdateBRGCSafePoint will get ts again, so there is a gap.

curSafePoint, err := r.mockPDClient.UpdateServiceGCSafePoint(r.ctx, "test", 100, math.MaxUint64)
c.Assert(err, IsNil)
c.Assert(curSafePoint, Equals, backupTs-1)
}
3 changes: 1 addition & 2 deletions br/pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,7 @@ func NewMgr(
if checkRequirements {
err = version.CheckClusterVersion(ctx, controller.GetPDClient(), version.CheckVersionForBR)
if err != nil {
return nil, errors.Annotate(err, "running BR in incompatible version of cluster, "+
"if you believe it's OK, use --check-requirements=false to skip.")
return nil, errors.Annotate(err, "running BR in incompatible version of cluster.")
}
}

Expand Down
6 changes: 1 addition & 5 deletions br/pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package task
import (
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/spf13/pflag"
"github.com/tikv/migration/br/pkg/utils"
)

const (
Expand All @@ -17,8 +16,6 @@ const (
flagRemoveSchedulers = "remove-schedulers"
flagIgnoreStats = "ignore-stats"
flagUseBackupMetaV2 = "use-backupmeta-v2"

flagGCTTL = "gcttl"
)

// CompressionConfig is the configuration for sst file compression.
Expand All @@ -38,7 +35,7 @@ func DefineBackupFlags(flags *pflag.FlagSet) {
" use for incremental backup, support TSO only")
flags.String(flagBackupTS, "", "the backup ts support TSO or datetime,"+
" e.g. '400036290571534337', '2018-05-11 01:42:23'")
flags.Int64(flagGCTTL, utils.DefaultBRGCSafePointTTL, "the TTL (in seconds) that PD holds for BR's GC safepoint")

flags.String(flagCompressionType, "zstd",
"backup sst file compression algorithm, value can be one of 'lz4|zstd|snappy'")
flags.Int32(flagCompressionLevel, 0, "compression level used for sst file compression")
Expand All @@ -51,7 +48,6 @@ func DefineBackupFlags(flags *pflag.FlagSet) {
_ = flags.MarkHidden(flagBackupTimeago)
_ = flags.MarkHidden(flagLastBackupTS)
_ = flags.MarkHidden(flagBackupTS)
_ = flags.MarkHidden(flagGCTTL)

// Disable stats by default. because of
// 1. DumpStatsToJson is not stable
Expand Down
17 changes: 17 additions & 0 deletions br/pkg/task/backup_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ const (
flagStartKey = "start"
flagEndKey = "end"
flagDstAPIVersion = "dst-api-version"
flagSafeInterval = "safe-interval"
flagGCTTL = "gcttl"
)

// DefineRawBackupFlags defines common flags for the backup command.
Expand All @@ -49,6 +51,12 @@ func DefineRawBackupFlags(command *cobra.Command) {
command.Flags().Bool(flagRemoveSchedulers, false,
"disable the balance, shuffle and region-merge schedulers in PD to speed up backup.")

command.Flags().Duration(flagSafeInterval, utils.DefaultBRSafeInterval,
"The interval between backup-ts and current tso.")
command.Flags().Duration(flagGCTTL, utils.DefaultBRGCSafePointTTL, "The TTL of BR's GC safepoint")

// safe-interval is difficult for common users to set one suitable value. Hide it.
_ = command.Flags().MarkHidden(flagSafeInterval)
// This flag can impact the online cluster, so hide it in case of abuse.
_ = command.Flags().MarkHidden(flagCompressionType)
_ = command.Flags().MarkHidden(flagRemoveSchedulers)
Expand Down Expand Up @@ -114,6 +122,15 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf
if err = client.SetStorage(ctx, u, &opts); err != nil {
return errors.Trace(err)
}
client.SetGCTTL(cfg.GCTTL)
if curAPIVersion == kvrpcpb.APIVersion_V2 {
// set safepoint to avoid the logical deletion data to gc.
backupTs, err := client.UpdateBRGCSafePoint(ctx, cfg.SafeInterval)
if err != nil {
return errors.Trace(err)
}
g.Record("BackupTS", backupTs)
}

backupRange := rtree.Range{StartKey: cfg.StartKey, EndKey: cfg.EndKey}

Expand Down
1 change: 1 addition & 0 deletions br/pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func DefineCommonFlags(flags *pflag.FlagSet) {

flags.Bool(flagCheckRequirement, true,
"Whether start version check before execute command")
_ = flags.MarkHidden(flagCheckRequirement)
flags.Duration(flagSwitchModeInterval, defaultSwitchInterval, "maintain import mode on TiKV during restore")
_ = flags.MarkHidden(flagSwitchModeInterval)
flags.Duration(flagGrpcKeepaliveTime, defaultGRPCKeepaliveTime,
Expand Down
15 changes: 14 additions & 1 deletion br/pkg/task/rawkv_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package task
import (
"bytes"
"strings"
"time"

"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
Expand All @@ -34,7 +35,9 @@ type RawKvConfig struct {
EndKey []byte `json:"end-key" toml:"end-key"`
DstAPIVersion string `json:"dst-api-version" toml:"dst-api-version"`
CompressionConfig
RemoveSchedulers bool `json:"remove-schedulers" toml:"remove-schedulers"`
RemoveSchedulers bool `json:"remove-schedulers" toml:"remove-schedulers"`
SafeInterval time.Duration `json:"safe-interval" toml:"safe-interval"`
GCTTL time.Duration `json:"gc-ttl" toml:"gc-ttl"`
}

// ParseBackupConfigFromFlags parses the backup-related flags from the flag set.
Expand All @@ -47,6 +50,16 @@ func (cfg *RawKvConfig) ParseBackupConfigFromFlags(flags *pflag.FlagSet) error {
if err = cfg.parseDstAPIVersion(flags); err != nil {
return errors.Trace(err)
}
safeInterval, err := flags.GetDuration(flagSafeInterval)
if err != nil {
return errors.Trace(err)
}
cfg.SafeInterval = safeInterval
gcTTL, err := flags.GetDuration(flagGCTTL)
if err != nil {
return errors.Trace(err)
}
cfg.GCTTL = gcTTL

compressionCfg, err := cfg.parseCompressionFlags(flags)
if err != nil {
Expand Down
9 changes: 5 additions & 4 deletions br/pkg/utils/safe_point.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ const (
preUpdateServiceSafePointFactor = 3
checkGCSafePointGapTime = 5 * time.Second
// DefaultBRGCSafePointTTL means PD keep safePoint limit at least 5min.
DefaultBRGCSafePointTTL = 5 * 60
DefaultBRGCSafePointTTL = time.Duration(5) * time.Minute
DefaultBRSafeInterval = time.Minute // safe interval is used to calc the backup-ts.
)

// BRServiceSafePoint is metadata of service safe point from a BR 'instance'.
Expand Down Expand Up @@ -74,7 +75,7 @@ func CheckGCSafePoint(ctx context.Context, pdClient pd.Client, ts uint64) error
}

// updateServiceSafePoint register BackupTS to PD, to lock down BackupTS as safePoint with TTL seconds.
func updateServiceSafePoint(ctx context.Context, pdClient pd.Client, sp BRServiceSafePoint) error {
func UpdateServiceSafePoint(ctx context.Context, pdClient pd.Client, sp BRServiceSafePoint) error {
log.Debug("update PD safePoint limit with TTL", zap.Object("safePoint", sp))

lastSafePoint, err := pdClient.UpdateServiceGCSafePoint(ctx, sp.ID, sp.TTL, sp.BackupTS-1)
Expand Down Expand Up @@ -102,7 +103,7 @@ func StartServiceSafePointKeeper(
}
// Update service safe point immediately to cover the gap between starting
// update goroutine and updating service safe point.
if err := updateServiceSafePoint(ctx, pdClient, sp); err != nil {
if err := UpdateServiceSafePoint(ctx, pdClient, sp); err != nil {
return errors.Trace(err)
}

Expand All @@ -119,7 +120,7 @@ func StartServiceSafePointKeeper(
log.Debug("service safe point keeper exited")
return
case <-updateTick.C:
if err := updateServiceSafePoint(ctx, pdClient, sp); err != nil {
if err := UpdateServiceSafePoint(ctx, pdClient, sp); err != nil {
log.Warn("failed to update service safe point, backup may fail if gc triggered",
zap.Error(err),
)
Expand Down

0 comments on commit 7d32868

Please sign in to comment.