diff --git a/pkg/block/fetcher_test.go b/pkg/block/fetcher_test.go index 5e24e26538..6616f9589e 100644 --- a/pkg/block/fetcher_test.go +++ b/pkg/block/fetcher_test.go @@ -8,6 +8,8 @@ import ( "context" "encoding/json" "fmt" + "github.com/stretchr/testify/assert" + "io" "math/rand" "os" "path" @@ -1211,3 +1213,86 @@ func Test_ParseRelabelConfig(t *testing.T) { testutil.NotOk(t, err) testutil.Equals(t, "unsupported relabel action: labelmap", err.Error()) } + +func feedUlids(ulidChannel chan ulid.ULID) { + //for i := 1; i <= 100; i++ { + // ulidChannel <- ULID(i) + //} +} + +func Test_ConcurrentLister_channel_deadlock(t *testing.T) { + lister := ConcurrentLister{bkt: InstrumentedBucketReaderMock{}, + logger: nil, + } + + outputChannel := make(chan ulid.ULID) + defer close(outputChannel) + + timeout, _ := context.WithTimeout(context.Background(), time.Second*5) + + _, err := lister.GetActiveAndPartialBlockIDs(timeout, outputChannel) + + assert.NoError(t, err) +} + +type InstrumentedBucketReaderMock struct{} + +func (InstrumentedBucketReaderMock) Iter(ctx context.Context, dir string, f func(name string) error, options ...objstore.IterOption) error { + // Concurrency is 64 and the queue has capacity of 64 + // Sending 64+64+1 ulids is enough + // 64 to terminate all 64 workers + // 64 more to fill the 64 capacity channel + // 1 extra for the channel writer to block on + for i := 1; i <= 129; i++ { + err := f(ULID(i).String()) + if err != nil { + return err + } + } + return nil +} + +func (InstrumentedBucketReaderMock) Exists(ctx context.Context, name string) (bool, error) { + // simulating an objstore error + return false, errors.New("Simulated") +} + +func (InstrumentedBucketReaderMock) ReaderWithExpectedErrs(objstore.IsOpFailureExpectedFunc) objstore.BucketReader { + panic("not required") +} + +func (InstrumentedBucketReaderMock) Close() error { panic("not required") } + +func (InstrumentedBucketReaderMock) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { + panic("not required") +} + +func (InstrumentedBucketReaderMock) SupportedIterOptions() []objstore.IterOptionType { + panic("not required") +} + +func (InstrumentedBucketReaderMock) Get(ctx context.Context, name string) (io.ReadCloser, error) { + panic("not required") +} + +func (InstrumentedBucketReaderMock) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { + panic("not required") +} + +func (InstrumentedBucketReaderMock) IsObjNotFoundErr(err error) bool { panic("not required") } + +func (InstrumentedBucketReaderMock) IsAccessDeniedErr(err error) bool { panic("not required") } + +func (InstrumentedBucketReaderMock) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) { + panic("not required") +} + +func (InstrumentedBucketReaderMock) Upload(ctx context.Context, name string, r io.Reader) error { + panic("not required") +} + +func (InstrumentedBucketReaderMock) Delete(ctx context.Context, name string) error { + panic("not required") +} + +func (InstrumentedBucketReaderMock) Name() string { panic("not required") }