diff --git a/CHANGELOG.md b/CHANGELOG.md index 114eef55d7..9260a8f43a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Fixed +- [#7933](https://github.com/thanos-io/thanos/pull/7933) *: Fix channel deadlock in meta sync fetcher + ### Added - [#7907](https://github.com/thanos-io/thanos/pull/7907) Receive: Add `--receive.grpc-service-config` flag to configure gRPC service config for the receivers. diff --git a/go.mod b/go.mod index 9a725cb167..baa29f27ff 100644 --- a/go.mod +++ b/go.mod @@ -115,6 +115,7 @@ require ( capnproto.org/go/capnp/v3 v3.0.0-alpha.30 github.com/cortexproject/promqlsmith v0.0.0-20240506042652-6cfdd9739a5e github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1 + github.com/hashicorp/go-multierror v1.1.1 github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/mitchellh/go-ps v1.0.0 github.com/onsi/gomega v1.34.0 @@ -142,6 +143,7 @@ require ( github.com/godbus/dbus/v5 v5.0.4 // indirect github.com/golang-jwt/jwt/v5 v5.2.1 // indirect github.com/google/s2a-go v0.1.8 // indirect + github.com/hashicorp/errwrap v1.1.0 // indirect github.com/huaweicloud/huaweicloud-sdk-go-obs v3.23.3+incompatible // indirect github.com/jcchavezs/porto v0.1.0 // indirect github.com/leesper/go_rng v0.0.0-20190531154944-a612b043e353 // indirect diff --git a/go.sum b/go.sum index 9933abda40..fe6e01980d 100644 --- a/go.sum +++ b/go.sum @@ -1871,6 +1871,7 @@ github.com/hashicorp/consul/api v1.29.4 h1:P6slzxDLBOxUSj3fWo2o65VuKtbtOXFi7TSSg github.com/hashicorp/consul/api v1.29.4/go.mod h1:HUlfw+l2Zy68ceJavv2zAyArl2fqhGWnMycyt56sBgg= github.com/hashicorp/cronexpr v1.1.2 h1:wG/ZYIKT+RT3QkOdgYc+xsKWVRgnxJ1OJtjjy84fJ9A= github.com/hashicorp/cronexpr v1.1.2/go.mod h1:P4wA0KBl9C5q2hABiMO7cp6jcIg96CDh1Efb3g1PWA4= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go index 02e9dec513..1420301fc7 100644 --- a/pkg/block/fetcher.go +++ b/pkg/block/fetcher.go @@ -15,6 +15,8 @@ import ( "sync" "time" + "github.com/hashicorp/go-multierror" + "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/golang/groupcache/singleflight" @@ -236,9 +238,10 @@ func (f *ConcurrentLister) GetActiveAndPartialBlockIDs(ctx context.Context, ch c partialBlocks = make(map[ulid.ULID]bool) var ( - metaChan = make(chan ulid.ULID, concurrency) - eg, gCtx = errgroup.WithContext(ctx) - mu sync.Mutex + metaChan = make(chan ulid.ULID, concurrency) + eg, gCtx = errgroup.WithContext(ctx) + mu, memu sync.Mutex + multiError error ) for i := 0; i < concurrency; i++ { eg.Go(func() error { @@ -247,9 +250,14 @@ func (f *ConcurrentLister) GetActiveAndPartialBlockIDs(ctx context.Context, ch c // For 1y and 100 block sources this generates ~1.5-3k HEAD RPM. AWS handles 330k RPM per prefix. // TODO(bwplotka): Consider filtering by consistency delay here (can't do until compactor healthyOverride work). metaFile := path.Join(uid.String(), MetaFilename) + ok, err := f.bkt.Exists(gCtx, metaFile) + if err != nil { - return errors.Wrapf(err, "meta.json file exists: %v", uid) + memu.Lock() + multiError = multierror.Append(multiError, errors.Wrapf(err, "meta.json file exists: %v", uid)) + memu.Unlock() + continue } if !ok { mu.Lock() @@ -283,9 +291,22 @@ func (f *ConcurrentLister) GetActiveAndPartialBlockIDs(ctx context.Context, ch c } close(metaChan) - if err := eg.Wait(); err != nil { - return nil, err + multiError = multierror.Append(eg.Wait(), multiError) + + if multiError != nil { + if multiErrorUnwrap, ok := err.(interface{ Unwrap() []error }); ok { + errs := multiErrorUnwrap.Unwrap() + // return the multierror if there are multiple errors wrapped + if len(errs) > 1 { + return nil, multiError + } + // return singular unwrapped error + if len(errs) > 0 { + return nil, errs[0] + } + } } + return partialBlocks, nil } diff --git a/pkg/block/fetcher_test.go b/pkg/block/fetcher_test.go index 5e24e26538..72e62b8da1 100644 --- a/pkg/block/fetcher_test.go +++ b/pkg/block/fetcher_test.go @@ -8,6 +8,7 @@ import ( "context" "encoding/json" "fmt" + "io" "math/rand" "os" "path" @@ -17,6 +18,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/go-kit/log" "github.com/oklog/ulid" "github.com/pkg/errors" @@ -1211,3 +1214,82 @@ func Test_ParseRelabelConfig(t *testing.T) { testutil.NotOk(t, err) testutil.Equals(t, "unsupported relabel action: labelmap", err.Error()) } + +func Test_ConcurrentLister_channel_deadlock(t *testing.T) { + lister := ConcurrentLister{ + bkt: InstrumentedBucketReaderMock{}, + logger: nil, + } + + outputChannel := make(chan ulid.ULID) + defer close(outputChannel) + + timeout, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + _, err := lister.GetActiveAndPartialBlockIDs(timeout, outputChannel) + + assert.NoError(t, err) +} + +type InstrumentedBucketReaderMock struct{} + +func (InstrumentedBucketReaderMock) Iter(ctx context.Context, dir string, f func(name string) error, options ...objstore.IterOption) error { + // Concurrency is 64 and the queue has capacity of 64 + // Sending 64+64+1 ulids is enough + // 64 to terminate all 64 workers + // 64 more to fill the 64 capacity channel + // 1 extra for the channel writer to block on + for i := 1; i <= 129; i++ { + err := f(ULID(i).String()) + if err != nil { + return err + } + } + return nil +} + +func (InstrumentedBucketReaderMock) Exists(ctx context.Context, name string) (bool, error) { + // simulating an objstore error + return false, errors.New("Simulated") +} + +func (InstrumentedBucketReaderMock) ReaderWithExpectedErrs(objstore.IsOpFailureExpectedFunc) objstore.BucketReader { + panic("not required") +} + +func (InstrumentedBucketReaderMock) Close() error { panic("not required") } + +func (InstrumentedBucketReaderMock) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { + panic("not required") +} + +func (InstrumentedBucketReaderMock) SupportedIterOptions() []objstore.IterOptionType { + panic("not required") +} + +func (InstrumentedBucketReaderMock) Get(ctx context.Context, name string) (io.ReadCloser, error) { + panic("not required") +} + +func (InstrumentedBucketReaderMock) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { + panic("not required") +} + +func (InstrumentedBucketReaderMock) IsObjNotFoundErr(err error) bool { panic("not required") } + +func (InstrumentedBucketReaderMock) IsAccessDeniedErr(err error) bool { panic("not required") } + +func (InstrumentedBucketReaderMock) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) { + panic("not required") +} + +func (InstrumentedBucketReaderMock) Upload(ctx context.Context, name string, r io.Reader) error { + panic("not required") +} + +func (InstrumentedBucketReaderMock) Delete(ctx context.Context, name string) error { + panic("not required") +} + +func (InstrumentedBucketReaderMock) Name() string { panic("not required") }