Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[close #138] br return backup ts #156

Merged
merged 6 commits into from
Jul 16, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,26 @@ func (bc *Client) GetCurAPIVersion() kvrpcpb.APIVersion {
return bc.curAPIVer
}

// UpdateBRGCSafePoint obtains a backup ts through GetTS using safeInterval and
// registers it with PD as a BR service safe point via utils.UpdateServiceSafePoint.
//
// When the current API version is not V2 nothing is registered and (0, nil)
// is returned. On success the backup ts that was registered is returned; any
// error from GetTS or from the safe point update is returned traced, with a
// zero ts.
func (bc *Client) UpdateBRGCSafePoint(ctx context.Context, safeInterval time.Duration) (uint64, error) {
	// Only API V2 goes through the safe point registration below.
	if bc.GetCurAPIVersion() != kvrpcpb.APIVersion_V2 {
		return 0, nil
	}

	// NOTE(review): GetTS is defined elsewhere; presumably it yields a ts that
	// is safeInterval behind the current TSO — confirm against GetTS.
	ts, err := bc.GetTS(ctx, safeInterval, 0)
	if err != nil {
		return 0, errors.Trace(err)
	}

	safePoint := utils.BRServiceSafePoint{
		BackupTS: ts,
		TTL:      bc.GetGCTTL(),
		ID:       utils.MakeSafePointID(),
	}
	if err := utils.UpdateServiceSafePoint(ctx, bc.mgr.GetPDClient(), safePoint); err != nil {
		return 0, errors.Trace(err)
	}
	return ts, nil
}

// SetLockFile set write lock file.
func (bc *Client) SetLockFile(ctx context.Context) error {
return bc.storage.WriteFile(ctx, metautil.LockFile,
Expand Down
25 changes: 24 additions & 1 deletion br/pkg/backup/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ package backup_test

import (
"context"
"math"
"testing"
"time"

"github.com/jarcoal/httpmock"
. "github.com/pingcap/check"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/testutils"
"github.com/tikv/client-go/v2/tikv"
Expand Down Expand Up @@ -52,7 +54,7 @@ func (r *testBackup) SetUpSuite(c *C) {
defer httpmock.DeactivateAndReset()
// Exact URL match
httpmock.RegisterResponder("GET", `=~^/config`,
httpmock.NewStringResponder(200, `{"storage":{"api-version":1, "enable-ttl":false}}`))
httpmock.NewStringResponder(200, `{"storage":{"api-version":2, "enable-ttl":false}}`))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"enable-ttl: true" would be reasonable when api-version is 2.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


r.backupClient, err = backup.NewBackupClient(r.ctx, mockMgr, nil)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -225,3 +227,24 @@ func (r *testBackup) TestCheckBackupIsLocked(c *C) {
err = backup.CheckBackupStorageIsLocked(ctx, r.storage)
c.Assert(err, ErrorMatches, "backup lock file and sst file exist in(.+)")
}

// TestUpdateBRGCSafePoint verifies the backup ts returned by
// UpdateBRGCSafePoint and the safe point subsequently reported by the mock PD
// client.
func (r *testBackup) TestUpdateBRGCSafePoint(c *C) {
	r.SetUpSuite(c)
	safeInterval := time.Minute

	// Take the current ts from the mock PD and build the ts expected for a
	// point safeInterval earlier, reusing the same logical part.
	physical, logical, err := r.mockPDClient.GetTS(r.ctx)
	c.Assert(err, IsNil)
	now := oracle.GetTimeFromTS(oracle.ComposeTS(physical, logical))
	wantBackupTS := oracle.ComposeTS(oracle.GetPhysical(now.Add(-safeInterval)), logical)

	r.backupClient.SetGCTTL(10000)
	c.Assert(r.backupClient.GetCurAPIVersion(), Equals, kvrpcpb.APIVersion_V2)

	gotBackupTS, err := r.backupClient.UpdateBRGCSafePoint(r.ctx, safeInterval)
	c.Assert(err, IsNil)
	// UpdateBRGCSafePoint will get ts again, so there is a gap.
	c.Assert(gotBackupTS-wantBackupTS, Equals, uint64(1))

	// Update an unrelated "test" service with the maximum safe point and check
	// the value handed back by the mock PD client against backup ts - 1.
	lastSafePoint, err := r.mockPDClient.UpdateServiceGCSafePoint(r.ctx, "test", 100, math.MaxUint64)
	c.Assert(err, IsNil)
	c.Assert(lastSafePoint, Equals, gotBackupTS-1)
}
3 changes: 1 addition & 2 deletions br/pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,7 @@ func NewMgr(
if checkRequirements {
err = version.CheckClusterVersion(ctx, controller.GetPDClient(), version.CheckVersionForBR)
if err != nil {
return nil, errors.Annotate(err, "running BR in incompatible version of cluster, "+
"if you believe it's OK, use --check-requirements=false to skip.")
return nil, errors.Annotate(err, "running BR in incompatible version of cluster.")
}
}

Expand Down
6 changes: 1 addition & 5 deletions br/pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package task
import (
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/spf13/pflag"
"github.com/tikv/migration/br/pkg/utils"
)

const (
Expand All @@ -17,8 +16,6 @@ const (
flagRemoveSchedulers = "remove-schedulers"
flagIgnoreStats = "ignore-stats"
flagUseBackupMetaV2 = "use-backupmeta-v2"

flagGCTTL = "gcttl"
)

// CompressionConfig is the configuration for sst file compression.
Expand All @@ -38,7 +35,7 @@ func DefineBackupFlags(flags *pflag.FlagSet) {
" use for incremental backup, support TSO only")
flags.String(flagBackupTS, "", "the backup ts support TSO or datetime,"+
" e.g. '400036290571534337', '2018-05-11 01:42:23'")
flags.Int64(flagGCTTL, utils.DefaultBRGCSafePointTTL, "the TTL (in seconds) that PD holds for BR's GC safepoint")

flags.String(flagCompressionType, "zstd",
"backup sst file compression algorithm, value can be one of 'lz4|zstd|snappy'")
flags.Int32(flagCompressionLevel, 0, "compression level used for sst file compression")
Expand All @@ -51,7 +48,6 @@ func DefineBackupFlags(flags *pflag.FlagSet) {
_ = flags.MarkHidden(flagBackupTimeago)
_ = flags.MarkHidden(flagLastBackupTS)
_ = flags.MarkHidden(flagBackupTS)
_ = flags.MarkHidden(flagGCTTL)

// Disable stats by default. because of
// 1. DumpStatsToJson is not stable
Expand Down
15 changes: 15 additions & 0 deletions br/pkg/task/backup_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ const (
flagStartKey = "start"
flagEndKey = "end"
flagDstAPIVersion = "dst-api-version"
flagSafeInterval = "safe-interval"
flagGCTTL = "gcttl"
)

// DefineRawBackupFlags defines common flags for the backup command.
Expand All @@ -49,6 +51,10 @@ func DefineRawBackupFlags(command *cobra.Command) {
command.Flags().Bool(flagRemoveSchedulers, false,
"disable the balance, shuffle and region-merge schedulers in PD to speed up backup.")

command.Flags().Duration(flagSafeInterval, utils.DefaultBRSafeInterval,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest hiding the safe-interval flag. It is difficult for ordinary users to choose a value for this argument.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

"The interval between backup-ts and current tso.")
command.Flags().Int64(flagGCTTL, utils.DefaultBRGCSafePointTTL, "the TTL (in seconds) that PD holds for BR's GC safepoint")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest making this a duration argument.
A TTL would be expected to be, e.g., 30 minutes or 3 days, which is too large a value to express in seconds.

Suggested change
command.Flags().Int64(flagGCTTL, utils.DefaultBRGCSafePointTTL, "the TTL (in seconds) that PD holds for BR's GC safepoint")
command.Flags().Int64(flagGCTTL, utils.DefaultBRGCSafePointTTL, "the TTL of BR's GC safepoint")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


// This flag can impact the online cluster, so hide it in case of abuse.
_ = command.Flags().MarkHidden(flagCompressionType)
_ = command.Flags().MarkHidden(flagRemoveSchedulers)
Expand Down Expand Up @@ -114,6 +120,15 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf
if err = client.SetStorage(ctx, u, &opts); err != nil {
return errors.Trace(err)
}
client.SetGCTTL(cfg.GCTTL)
if curAPIVersion == kvrpcpb.APIVersion_V2 {
// Set the safepoint so that logically deleted data is not removed by GC.
backupTs, err := client.UpdateBRGCSafePoint(ctx, cfg.SafeInterval)
if err != nil {
return errors.Trace(err)
}
g.Record("BackupTS", backupTs)
}

backupRange := rtree.Range{StartKey: cfg.StartKey, EndKey: cfg.EndKey}

Expand Down
1 change: 1 addition & 0 deletions br/pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func DefineCommonFlags(flags *pflag.FlagSet) {

flags.Bool(flagCheckRequirement, true,
"Whether start version check before execute command")
_ = flags.MarkHidden(flagCheckRequirement)
flags.Duration(flagSwitchModeInterval, defaultSwitchInterval, "maintain import mode on TiKV during restore")
_ = flags.MarkHidden(flagSwitchModeInterval)
flags.Duration(flagGrpcKeepaliveTime, defaultGRPCKeepaliveTime,
Expand Down
15 changes: 14 additions & 1 deletion br/pkg/task/rawkv_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package task
import (
"bytes"
"strings"
"time"

"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
Expand All @@ -34,7 +35,9 @@ type RawKvConfig struct {
EndKey []byte `json:"end-key" toml:"end-key"`
DstAPIVersion string `json:"dst-api-version" toml:"dst-api-version"`
CompressionConfig
RemoveSchedulers bool `json:"remove-schedulers" toml:"remove-schedulers"`
RemoveSchedulers bool `json:"remove-schedulers" toml:"remove-schedulers"`
SafeInterval time.Duration `json:"safe-interval" toml:"safe-interval"`
GCTTL int64 `json:"gc-ttl" toml:"gc-ttl"`
}

// ParseBackupConfigFromFlags parses the backup-related flags from the flag set.
Expand All @@ -47,6 +50,16 @@ func (cfg *RawKvConfig) ParseBackupConfigFromFlags(flags *pflag.FlagSet) error {
if err = cfg.parseDstAPIVersion(flags); err != nil {
return errors.Trace(err)
}
safeInterval, err := flags.GetDuration(flagSafeInterval)
if err != nil {
return errors.Trace(err)
}
cfg.SafeInterval = safeInterval
gcTTL, err := flags.GetInt64(flagGCTTL)
if err != nil {
return errors.Trace(err)
}
cfg.GCTTL = gcTTL

compressionCfg, err := cfg.parseCompressionFlags(flags)
if err != nil {
Expand Down
11 changes: 6 additions & 5 deletions br/pkg/utils/safe_point.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ const (
brServiceSafePointIDFormat = "br-%s"
preUpdateServiceSafePointFactor = 3
checkGCSafePointGapTime = 5 * time.Second
// DefaultBRGCSafePointTTL means PD keep safePoint limit at least 5min.
DefaultBRGCSafePointTTL = 5 * 60
// DefaultBRGCSafePointTTL means PD keep safePoint limit at least 72h.
DefaultBRGCSafePointTTL = 72 * 60 * 60 // 72h
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A default of 72 hours may be too long, and it wastes disk space if no CDC follows the backup.
Maybe we can keep 5 minutes, but tell CDC users to set a suitable value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

DefaultBRSafeInterval = time.Minute // safe interval is used to calc the backup-ts.
)

// BRServiceSafePoint is metadata of service safe point from a BR 'instance'.
Expand Down Expand Up @@ -74,7 +75,7 @@ func CheckGCSafePoint(ctx context.Context, pdClient pd.Client, ts uint64) error
}

// UpdateServiceSafePoint registers BackupTS to PD, to lock down BackupTS as safePoint with TTL seconds.
func updateServiceSafePoint(ctx context.Context, pdClient pd.Client, sp BRServiceSafePoint) error {
func UpdateServiceSafePoint(ctx context.Context, pdClient pd.Client, sp BRServiceSafePoint) error {
log.Debug("update PD safePoint limit with TTL", zap.Object("safePoint", sp))

lastSafePoint, err := pdClient.UpdateServiceGCSafePoint(ctx, sp.ID, sp.TTL, sp.BackupTS-1)
Expand Down Expand Up @@ -102,7 +103,7 @@ func StartServiceSafePointKeeper(
}
// Update service safe point immediately to cover the gap between starting
// update goroutine and updating service safe point.
if err := updateServiceSafePoint(ctx, pdClient, sp); err != nil {
if err := UpdateServiceSafePoint(ctx, pdClient, sp); err != nil {
return errors.Trace(err)
}

Expand All @@ -119,7 +120,7 @@ func StartServiceSafePointKeeper(
log.Debug("service safe point keeper exited")
return
case <-updateTick.C:
if err := updateServiceSafePoint(ctx, pdClient, sp); err != nil {
if err := UpdateServiceSafePoint(ctx, pdClient, sp); err != nil {
log.Warn("failed to update service safe point, backup may fail if gc triggered",
zap.Error(err),
)
Expand Down