Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Allow setting concurrency for bucket replicate
Browse files Browse the repository at this point in the history
This commit allows setting the block copy concurrency for the bucket replicate command.

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
fpetkovski committed Dec 17, 2024
1 parent 683cf17 commit 834a09c
Showing 4 changed files with 27 additions and 8 deletions.
4 changes: 4 additions & 0 deletions cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
@@ -129,6 +129,7 @@ type bucketReplicateConfig struct {
compactMin int
compactMax int
compactions []int
concurrency int
matcherStrs string
singleRun bool
}
@@ -225,6 +226,8 @@ func (tbc *bucketReplicateConfig) registerBucketReplicateFlag(cmd extkingpin.Fla

cmd.Flag("compaction", "Only blocks with these compaction levels will be replicated. Repeated flag. Overrides compaction-min and compaction-max if set.").Default().IntsVar(&tbc.compactions)

cmd.Flag("concurrency", "The concurrency with which to replicate blocks from the source to the destination bucket.").Default("1").IntVar(&tbc.concurrency)

cmd.Flag("matcher", "blocks whose external labels match this matcher will be replicated. All Prometheus matchers are supported, including =, !=, =~ and !~.").StringVar(&tbc.matcherStrs)

cmd.Flag("single-run", "Run replication only one time, then exit.").Default("false").BoolVar(&tbc.singleRun)
@@ -779,6 +782,7 @@ func registerBucketReplicate(app extkingpin.AppClause, objStoreConfig *extflag.P
matchers,
resolutionLevels,
tbc.compactions,
tbc.concurrency,
objStoreConfig,
toObjStoreConfig,
tbc.singleRun,
2 changes: 2 additions & 0 deletions docs/components/tools.md
Original file line number Diff line number Diff line change
@@ -588,6 +588,8 @@ Flags:
level will be replicated.
--compaction-min=1 Only blocks with at least this compaction level
will be replicated.
--concurrency=1 The concurrency with which to replicate blocks
from the source to the destination bucket.
--enable-auto-gomemlimit Enable go runtime to automatically limit memory
consumption.
-h, --help Show context-sensitive help (also try
3 changes: 2 additions & 1 deletion pkg/replicate/replicator.go
Original file line number Diff line number Diff line change
@@ -70,6 +70,7 @@ func RunReplicate(
labelSelector labels.Selector,
resolutions []compact.ResolutionLevel,
compactions []int,
concurrency int,
fromObjStoreConfig *extflag.PathOrContent,
toObjStoreConfig *extflag.PathOrContent,
singleRun bool,
@@ -188,7 +189,7 @@ func RunReplicate(
logger := log.With(logger, "replication-run-id", runID.String())
level.Info(logger).Log("msg", "running replication attempt")

if err := newReplicationScheme(logger, metrics, blockFilter, fetcher, fromBkt, toBkt, reg).execute(ctx); err != nil {
if err := newReplicationScheme(logger, metrics, blockFilter, fetcher, fromBkt, toBkt, concurrency, reg).execute(ctx); err != nil {
return errors.Wrap(err, "replication execute")
}

26 changes: 19 additions & 7 deletions pkg/replicate/scheme.go
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@ import (
"bytes"
"context"
"fmt"
"golang.org/x/sync/errgroup"
"io"
"path"
"sort"
@@ -109,8 +110,9 @@ type blockFilterFunc func(b *metadata.Meta) bool

// TODO: Add filters field.
type replicationScheme struct {
fromBkt objstore.InstrumentedBucketReader
toBkt objstore.Bucket
fromBkt objstore.InstrumentedBucketReader
toBkt objstore.Bucket
concurrency int

blockFilter blockFilterFunc
fetcher thanosblock.MetadataFetcher
@@ -152,6 +154,7 @@ func newReplicationScheme(
fetcher thanosblock.MetadataFetcher,
from objstore.InstrumentedBucketReader,
to objstore.Bucket,
concurrency int,
reg prometheus.Registerer,
) *replicationScheme {
if logger == nil {
@@ -164,6 +167,7 @@ func newReplicationScheme(
fetcher: fetcher,
fromBkt: from,
toBkt: to,
concurrency: concurrency,
metrics: metrics,
reg: reg,
}
@@ -194,13 +198,21 @@ func (rs *replicationScheme) execute(ctx context.Context) error {
return availableBlocks[i].BlockMeta.MinTime < availableBlocks[j].BlockMeta.MinTime
})

for _, b := range availableBlocks {
if err := rs.ensureBlockIsReplicated(ctx, b.BlockMeta.ULID); err != nil {
return errors.Wrapf(err, "ensure block %v is replicated", b.BlockMeta.ULID.String())
}
var g errgroup.Group
if rs.concurrency > 0 {
g.SetLimit(rs.concurrency)
}
for i := range availableBlocks {
b := availableBlocks[i]
g.Go(func() error {
if err := rs.ensureBlockIsReplicated(ctx, b.BlockMeta.ULID); err != nil {
return errors.Wrapf(err, "ensure block %v is replicated", b.BlockMeta.ULID.String())
}
return nil
})
}

return nil
return g.Wait()
}

// ensureBlockIsReplicated ensures that a block present in the origin bucket is

0 comments on commit 834a09c

Please sign in to comment.