Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring: introduce Distributor.sendWriteRequestToPartitions() and sendWriteRequestToIngesters() #7891

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 39 additions & 36 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1380,24 +1380,52 @@ func (d *Distributor) sendWriteRequestToBackends(ctx context.Context, tenantID s
Go: d.doBatchPushWorkers,
}

if d.cfg.IngestStorageConfig.Enabled {
return d.sendWriteRequestToPartitions(ctx, tenantID, tenantRing, req, keys, initialMetadataIndex, remoteRequestContext, batchOptions)
}
return d.sendWriteRequestToIngesters(ctx, tenantRing, req, keys, initialMetadataIndex, remoteRequestContext, batchOptions)
}

func (d *Distributor) sendWriteRequestToIngesters(ctx context.Context, tenantRing ring.DoBatchRing, req *mimirpb.WriteRequest, keys []uint32, initialMetadataIndex int, remoteRequestContext func() context.Context, batchOptions ring.DoBatchOptions) error {
return ring.DoBatchWithOptions(ctx, ring.WriteNoExtend, tenantRing, keys,
func(instance ring.InstanceDesc, indexes []int) error {
func(ingester ring.InstanceDesc, indexes []int) error {
req := req.ForIndexes(indexes, initialMetadataIndex)

// Do not cancel the remoteRequestContext in this callback:
// there are more callbacks using it at the same time.
localCtx := remoteRequestContext()
var err error
if d.cfg.IngestStorageConfig.Enabled {
err = d.sendToStorage(localCtx, tenantID, instance, req)
} else {
err = d.sendToIngester(localCtx, instance, req)
h, err := d.ingesterPool.GetClientForInstance(ingester)
if err != nil {
return err
}
c := h.(ingester_client.IngesterClient)

ctx := remoteRequestContext()
ctx = grpcutil.AppendMessageSizeToOutgoingContext(ctx, req) // Let ingester know the size of the message, without needing to read the message first.

_, err = c.Push(ctx, req)
err = wrapIngesterPushError(err, ingester.Id)
err = wrapDeadlineExceededPushError(err)

return err
},
batchOptions,
}, batchOptions)
}

// sendWriteRequestToPartitions sends the write request to the ingest storage
// partitions owning the given keys, batching series/metadata per partition via
// the ring's DoBatch. This function is used when the ingest storage is enabled.
func (d *Distributor) sendWriteRequestToPartitions(ctx context.Context, tenantID string, tenantRing ring.DoBatchRing, req *mimirpb.WriteRequest, keys []uint32, initialMetadataIndex int, remoteRequestContext func() context.Context, batchOptions ring.DoBatchOptions) error {
	sendToPartition := func(partition ring.InstanceDesc, indexes []int) error {
		// Build the sub-request containing only the series/metadata owned by this partition.
		partitionReq := req.ForIndexes(indexes, initialMetadataIndex)

		// The partition ID is stored in the ring.InstanceDesc Id.
		parsedID, err := strconv.ParseUint(partition.Id, 10, 31)
		if err != nil {
			return err
		}
		partitionID := int32(parsedID)

		writeCtx := remoteRequestContext()
		err = d.ingestStorageWriter.WriteSync(writeCtx, partitionID, tenantID, partitionReq)
		err = wrapPartitionPushError(err, partitionID)
		return wrapDeadlineExceededPushError(err)
	}

	return ring.DoBatchWithOptions(ctx, ring.WriteNoExtend, tenantRing, keys, sendToPartition, batchOptions)
}

Expand Down Expand Up @@ -1460,31 +1488,6 @@ func (d *Distributor) updateReceivedMetrics(req *mimirpb.WriteRequest, userID st
d.receivedMetadata.WithLabelValues(userID).Add(float64(receivedMetadata))
}

// sendToIngester sends received data to a specific ingester. This function is
// used when ingest storage is disabled.
func (d *Distributor) sendToIngester(ctx context.Context, ingester ring.InstanceDesc, req *mimirpb.WriteRequest) error {
	poolClient, err := d.ingesterPool.GetClientForInstance(ingester)
	if err != nil {
		return err
	}

	// Let ingester know the size of the message, without needing to read the message first.
	ctx = grpcutil.AppendMessageSizeToOutgoingContext(ctx, req)

	_, err = poolClient.(ingester_client.IngesterClient).Push(ctx, req)
	return wrapIngesterPushError(err, ingester.Id)
}

// sendToStorage sends received data to the configured ingest storage. This
// function is used when ingest storage is enabled.
func (d *Distributor) sendToStorage(ctx context.Context, userID string, partition ring.InstanceDesc, req *mimirpb.WriteRequest) error {
	// The partition ID is stored in the ring.InstanceDesc Id.
	rawID, err := strconv.ParseUint(partition.Id, 10, 31)
	if err != nil {
		return err
	}
	partitionID := int32(rawID)

	writeErr := d.ingestStorageWriter.WriteSync(ctx, partitionID, userID, req)
	return wrapPartitionPushError(writeErr, partitionID)
}

// forReplicationSets runs f, in parallel, for all ingesters in the input replicationSets.
// Return an error if any f fails for any of the input replicationSets.
func forReplicationSets[R any](ctx context.Context, d *Distributor, replicationSets []ring.ReplicationSet, f func(context.Context, ingester_client.IngesterClient) (R, error)) ([]R, error) {
Expand Down
86 changes: 45 additions & 41 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5575,6 +5575,22 @@ type mockIngester struct {
// partitionReader is responsible to consume a partition from Kafka when the
// ingest storage is enabled. This field is nil if the ingest storage is disabled.
partitionReader *ingest.PartitionReader

// Hooks.
hooksMx sync.Mutex
beforePushHook func(ctx context.Context, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error, bool)
}

func (i *mockIngester) registerBeforePushHook(fn func(ctx context.Context, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error, bool)) {
Copy link
Collaborator Author

@pracucci pracucci Apr 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to reviewers: this will be also used by a new test that will be added by #7871.

i.hooksMx.Lock()
defer i.hooksMx.Unlock()
i.beforePushHook = fn
}

// getBeforePushHook returns the currently registered before-push hook, or nil
// if none has been registered. Safe for concurrent use.
func (i *mockIngester) getBeforePushHook() func(ctx context.Context, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error, bool) {
	i.hooksMx.Lock()
	hook := i.beforePushHook
	i.hooksMx.Unlock()
	return hook
}

func (i *mockIngester) instanceID() string {
Expand Down Expand Up @@ -5621,6 +5637,12 @@ func (i *mockIngester) Push(ctx context.Context, req *mimirpb.WriteRequest, _ ..

time.Sleep(i.pushDelay)

if hook := i.getBeforePushHook(); hook != nil {
if res, err, handled := hook(ctx, req); handled {
return res, err
}
}

i.Lock()
defer i.Unlock()

Expand Down Expand Up @@ -7416,46 +7438,43 @@ func TestStartFinishRequest(t *testing.T) {
}
}

func TestSendMessageMetadata(t *testing.T) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to reviewers: this was was directly calling Distributor.sendToIngester() which doesn't exist anymore. I've refactored the test to run like other tests, and then assert on the gRPC context when the mockIngester.Push() request is received.

var distributorCfg Config
var clientConfig client.Config
var ringConfig ring.Config
flagext.DefaultValues(&distributorCfg, &clientConfig, &ringConfig)

kvStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
func TestDistributor_Push_SendMessageMetadata(t *testing.T) {
const userID = "test"

ringConfig.KVStore.Mock = kvStore
ingestersRing, err := ring.New(ringConfig, ingester.IngesterRingKey, ingester.IngesterRingKey, log.NewNopLogger(), nil)
require.NoError(t, err)

mock := &mockInstanceClient{}
distributorCfg.IngesterClientFactory = ring_client.PoolInstFunc(func(ring.InstanceDesc) (ring_client.PoolClient, error) {
return mock, nil
distributors, ingesters, _, _ := prepare(t, prepConfig{
numIngesters: 1,
happyIngesters: 1,
numDistributors: 1,
replicationFactor: 1,
})

d, err := New(distributorCfg, clientConfig, validation.MockDefaultOverrides(), nil, ingestersRing, nil, false, nil, log.NewNopLogger())
require.NoError(t, err)
require.NotNil(t, d)
require.Len(t, distributors, 1)
require.Len(t, ingesters, 1)

ctx := context.Background()
ctx = user.InjectOrgID(ctx, "test")
ctx = user.InjectOrgID(ctx, userID)

req := &mimirpb.WriteRequest{
Timeseries: []mimirpb.PreallocTimeseries{
{TimeSeries: &mimirpb.TimeSeries{
Labels: []mimirpb.LabelAdapter{{Name: model.MetricNameLabel, Value: "test1"}},
Exemplars: []mimirpb.Exemplar{},
}},
makeTimeseries([]string{model.MetricNameLabel, "test1"}, makeSamples(time.Now().UnixMilli(), 1), nil),
},
Source: mimirpb.API,
}

err = d.sendToIngester(ctx, ring.InstanceDesc{Addr: "1.2.3.4:5555", Id: "test"}, req)
// Register a hook in the ingester's Push() to check whether the context contains the expected gRPC metadata.
ingesters[0].registerBeforePushHook(func(ctx context.Context, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error, bool) {
md, ok := metadata.FromOutgoingContext(ctx)
require.True(t, ok)
require.Equal(t, []string{strconv.Itoa(req.Size())}, md[grpcutil.MetadataMessageSize])

return nil, nil, false
})

_, err := distributors[0].Push(ctx, req)
require.NoError(t, err)

// Verify that d.sendToIngester added message size to metadata.
require.Equal(t, []string{strconv.Itoa(req.Size())}, mock.md[grpcutil.MetadataMessageSize])
// Ensure the ingester's Push() has been called.
require.Equal(t, 1, ingesters[0].countCalls("Push"))
}

func TestQueryIngestersRingZoneSorter(t *testing.T) {
Expand Down Expand Up @@ -7657,21 +7676,6 @@ func uniqueZones(instances []ring.InstanceDesc) []string {
return zones
}

// mockInstanceClient is a test double for an ingester client: it reports itself
// as healthy and records the outgoing gRPC metadata it receives on Push().
type mockInstanceClient struct {
	client.HealthAndIngesterClient

	// md holds the outgoing gRPC metadata captured by the last Push() call.
	md metadata.MD
}

// Check always reports the instance as serving.
func (m *mockInstanceClient) Check(_ context.Context, _ *grpc_health_v1.HealthCheckRequest, _ ...grpc.CallOption) (*grpc_health_v1.HealthCheckResponse, error) {
	resp := &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}
	return resp, nil
}

// Push captures the outgoing metadata attached to ctx and succeeds without
// ingesting anything.
func (m *mockInstanceClient) Push(ctx context.Context, _ *mimirpb.WriteRequest, _ ...grpc.CallOption) (*mimirpb.WriteResponse, error) {
	md, _ := metadata.FromOutgoingContext(ctx)
	m.md = md
	return nil, nil
}

func cloneTimeseries(orig *mimirpb.TimeSeries) (*mimirpb.TimeSeries, error) {
data, err := orig.Marshal()
if err != nil {
Expand Down
Loading