diff --git a/CHANGELOG.md b/CHANGELOG.md index 617f9c97ec..9e7c45320d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#7907](https://github.com/thanos-io/thanos/pull/7907) Receive: Add `--receive.grpc-service-config` flag to configure gRPC service config for the receivers. - [#7961](https://github.com/thanos-io/thanos/pull/7961) Store Gateway: Add `--store.posting-group-max-keys` flag to mark posting group as lazy if it exceeds number of keys limit. Added `thanos_bucket_store_lazy_expanded_posting_groups_total` for total number of lazy posting groups and corresponding reasons. +- [#7997](https://github.com/thanos-io/thanos/pull/7997) Tools: Add `--concurrency` flag to the `replicate` command to allow controlling the concurrency for copying blocks. - [#8000](https://github.com/thanos-io/thanos/pull/8000) Query: Bump promql-engine, pass partial response through options ### Changed diff --git a/cmd/thanos/tools_bucket.go b/cmd/thanos/tools_bucket.go index e0391af15b..25266135a7 100644 --- a/cmd/thanos/tools_bucket.go +++ b/cmd/thanos/tools_bucket.go @@ -129,6 +129,7 @@ type bucketReplicateConfig struct { compactMin int compactMax int compactions []int + concurrency int matcherStrs string singleRun bool } @@ -225,6 +226,8 @@ func (tbc *bucketReplicateConfig) registerBucketReplicateFlag(cmd extkingpin.Fla cmd.Flag("compaction", "Only blocks with these compaction levels will be replicated. Repeated flag. Overrides compaction-min and compaction-max if set.").Default().IntsVar(&tbc.compactions) + cmd.Flag("concurrency", "The concurrency with which to replicate blocks from the source to the destination bucket.").Default("1").IntVar(&tbc.concurrency) + cmd.Flag("matcher", "blocks whose external labels match this matcher will be replicated. All Prometheus matchers are supported, including =, !=, =~ and !~.").StringVar(&tbc.matcherStrs) cmd.Flag("single-run", "Run replication only one time, then exit.").Default("false").BoolVar(&tbc.singleRun) @@ -779,6 +782,7 @@ func registerBucketReplicate(app extkingpin.AppClause, objStoreConfig *extflag.P matchers, resolutionLevels, tbc.compactions, + tbc.concurrency, objStoreConfig, toObjStoreConfig, tbc.singleRun, diff --git a/docs/components/tools.md b/docs/components/tools.md index 7e34ab4c27..07ddfea2f4 100644 --- a/docs/components/tools.md +++ b/docs/components/tools.md @@ -588,6 +588,8 @@ Flags: level will be replicated. --compaction-min=1 Only blocks with at least this compaction level will be replicated. + --concurrency=1 The concurrency with which to replicate blocks + from the source to the destination bucket. --enable-auto-gomemlimit Enable go runtime to automatically limit memory consumption. -h, --help Show context-sensitive help (also try diff --git a/pkg/replicate/replicator.go b/pkg/replicate/replicator.go index dc9e804f56..6ce259ecc7 100644 --- a/pkg/replicate/replicator.go +++ b/pkg/replicate/replicator.go @@ -70,6 +70,7 @@ func RunReplicate( labelSelector labels.Selector, resolutions []compact.ResolutionLevel, compactions []int, + concurrency int, fromObjStoreConfig *extflag.PathOrContent, toObjStoreConfig *extflag.PathOrContent, singleRun bool, @@ -188,7 +189,7 @@ func RunReplicate( logger := log.With(logger, "replication-run-id", runID.String()) level.Info(logger).Log("msg", "running replication attempt") - if err := newReplicationScheme(logger, metrics, blockFilter, fetcher, fromBkt, toBkt, reg).execute(ctx); err != nil { + if err := newReplicationScheme(logger, metrics, blockFilter, fetcher, fromBkt, toBkt, concurrency, reg).execute(ctx); err != nil { return errors.Wrap(err, "replication execute") } diff --git a/pkg/replicate/scheme.go b/pkg/replicate/scheme.go index 31249fbf77..cdd6103752 100644 --- a/pkg/replicate/scheme.go +++ b/pkg/replicate/scheme.go @@ -11,6 +11,8 @@ import ( "path" "sort" + "golang.org/x/sync/errgroup" + "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/oklog/ulid" @@ -109,8 +111,9 @@ type blockFilterFunc func(b *metadata.Meta) bool // TODO: Add filters field. type replicationScheme struct { - fromBkt objstore.InstrumentedBucketReader - toBkt objstore.Bucket + fromBkt objstore.InstrumentedBucketReader + toBkt objstore.Bucket + concurrency int blockFilter blockFilterFunc fetcher thanosblock.MetadataFetcher @@ -152,6 +155,7 @@ func newReplicationScheme( fetcher thanosblock.MetadataFetcher, from objstore.InstrumentedBucketReader, to objstore.Bucket, + concurrency int, reg prometheus.Registerer, ) *replicationScheme { if logger == nil { @@ -164,6 +168,7 @@ func newReplicationScheme( fetcher: fetcher, fromBkt: from, toBkt: to, + concurrency: concurrency, metrics: metrics, reg: reg, } @@ -194,13 +199,21 @@ func (rs *replicationScheme) execute(ctx context.Context) error { return availableBlocks[i].BlockMeta.MinTime < availableBlocks[j].BlockMeta.MinTime }) - for _, b := range availableBlocks { - if err := rs.ensureBlockIsReplicated(ctx, b.BlockMeta.ULID); err != nil { - return errors.Wrapf(err, "ensure block %v is replicated", b.BlockMeta.ULID.String()) - } + var g errgroup.Group + if rs.concurrency > 0 { + g.SetLimit(rs.concurrency) + } + for i := range availableBlocks { + b := availableBlocks[i] + g.Go(func() error { + if err := rs.ensureBlockIsReplicated(ctx, b.BlockMeta.ULID); err != nil { + return errors.Wrapf(err, "ensure block %v is replicated", b.BlockMeta.ULID.String()) + } + return nil + }) } - return nil + return g.Wait() } // ensureBlockIsReplicated ensures that a block present in the origin bucket is diff --git a/pkg/replicate/scheme_test.go b/pkg/replicate/scheme_test.go index 6f7f3c4e12..d0767554a3 100644 --- a/pkg/replicate/scheme_test.go +++ b/pkg/replicate/scheme_test.go @@ -393,7 +393,7 @@ func TestReplicationSchemeAll(t *testing.T) { ) testutil.Ok(t, err) - r := newReplicationScheme(logger, newReplicationMetrics(nil), filter, fetcher, objstore.WithNoopInstr(originBucket), targetBucket, nil) + r := newReplicationScheme(logger, newReplicationMetrics(nil), filter, fetcher, objstore.WithNoopInstr(originBucket), targetBucket, 1, nil) err = r.execute(ctx) testutil.Ok(t, err)