Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New test case catching the ConcurrentLister channel deadlock #7966

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions pkg/block/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/stretchr/testify/assert"
"io"
"math/rand"
"os"
"path"
Expand Down Expand Up @@ -1211,3 +1213,81 @@ func Test_ParseRelabelConfig(t *testing.T) {
testutil.NotOk(t, err)
testutil.Equals(t, "unsupported relabel action: labelmap", err.Error())
}

func Test_ConcurrentLister_channel_deadlock(t *testing.T) {
lister := ConcurrentLister{
bkt: InstrumentedBucketReaderMock{},
logger: nil,
}

outputChannel := make(chan ulid.ULID)
defer close(outputChannel)

timeout, _ := context.WithTimeout(context.Background(), time.Second*5)

_, err := lister.GetActiveAndPartialBlockIDs(timeout, outputChannel)

assert.NoError(t, err)
}

type InstrumentedBucketReaderMock struct{}

func (InstrumentedBucketReaderMock) Iter(ctx context.Context, dir string, f func(name string) error, options ...objstore.IterOption) error {
// Concurrency is 64 and the queue has capacity of 64
// Sending 64+64+1 ulids is enough
// 64 to terminate all 64 workers
// 64 more to fill the 64 capacity channel
// 1 extra for the channel writer to block on
for i := 1; i <= 129; i++ {
err := f(ULID(i).String())
if err != nil {
return err
}
}
return nil
}

func (InstrumentedBucketReaderMock) Exists(ctx context.Context, name string) (bool, error) {
// simulating an objstore error
return false, errors.New("Simulated")
}

func (InstrumentedBucketReaderMock) ReaderWithExpectedErrs(objstore.IsOpFailureExpectedFunc) objstore.BucketReader {
panic("not required")
}

func (InstrumentedBucketReaderMock) Close() error { panic("not required") }

func (InstrumentedBucketReaderMock) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error {
panic("not required")
}

func (InstrumentedBucketReaderMock) SupportedIterOptions() []objstore.IterOptionType {
panic("not required")
}

func (InstrumentedBucketReaderMock) Get(ctx context.Context, name string) (io.ReadCloser, error) {
panic("not required")
}

func (InstrumentedBucketReaderMock) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
panic("not required")
}

func (InstrumentedBucketReaderMock) IsObjNotFoundErr(err error) bool { panic("not required") }

func (InstrumentedBucketReaderMock) IsAccessDeniedErr(err error) bool { panic("not required") }

func (InstrumentedBucketReaderMock) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
panic("not required")
}

func (InstrumentedBucketReaderMock) Upload(ctx context.Context, name string, r io.Reader) error {
panic("not required")
}

func (InstrumentedBucketReaderMock) Delete(ctx context.Context, name string) error {
panic("not required")
}

func (InstrumentedBucketReaderMock) Name() string { panic("not required") }
Loading