From 3ed4082ae3fc9f6361a4e9b18702bf41fb97b414 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Fri, 12 Apr 2024 12:27:35 +0200 Subject: [PATCH] Refactoring: introduce Distributor.sendWriteRequestToPartitions() and sendWriteRequestToIngesters() Signed-off-by: Marco Pracucci --- pkg/distributor/distributor.go | 75 +++++++++++++++-------------- pkg/distributor/distributor_test.go | 71 +++++++++++++++++---------- 2 files changed, 84 insertions(+), 62 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 7a61e9a4494..95809daa7cd 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -1380,24 +1380,52 @@ func (d *Distributor) sendWriteRequestToBackends(ctx context.Context, tenantID s Go: d.doBatchPushWorkers, } + if d.cfg.IngestStorageConfig.Enabled { + return d.sendWriteRequestToPartitions(ctx, tenantID, tenantRing, req, keys, initialMetadataIndex, remoteRequestContext, batchOptions) + } + return d.sendWriteRequestToIngesters(ctx, tenantRing, req, keys, initialMetadataIndex, remoteRequestContext, batchOptions) +} + +func (d *Distributor) sendWriteRequestToIngesters(ctx context.Context, tenantRing ring.DoBatchRing, req *mimirpb.WriteRequest, keys []uint32, initialMetadataIndex int, remoteRequestContext func() context.Context, batchOptions ring.DoBatchOptions) error { return ring.DoBatchWithOptions(ctx, ring.WriteNoExtend, tenantRing, keys, - func(instance ring.InstanceDesc, indexes []int) error { + func(ingester ring.InstanceDesc, indexes []int) error { req := req.ForIndexes(indexes, initialMetadataIndex) - // Do not cancel the remoteRequestContext in this callback: - // there are more callbacks using it at the same time. - localCtx := remoteRequestContext() - var err error - if d.cfg.IngestStorageConfig.Enabled { - err = d.sendToStorage(localCtx, tenantID, instance, req) - } else { - err = d.sendToIngester(localCtx, instance, req) + h, err := d.ingesterPool.GetClientForInstance(ingester) + if err != nil { + return err } + c := h.(ingester_client.IngesterClient) + + ctx := remoteRequestContext() + ctx = grpcutil.AppendMessageSizeToOutgoingContext(ctx, req) // Let ingester know the size of the message, without needing to read the message first. + _, err = c.Push(ctx, req) + err = wrapIngesterPushError(err, ingester.Id) err = wrapDeadlineExceededPushError(err) + return err - }, - batchOptions, + }, batchOptions) +} + +func (d *Distributor) sendWriteRequestToPartitions(ctx context.Context, tenantID string, tenantRing ring.DoBatchRing, req *mimirpb.WriteRequest, keys []uint32, initialMetadataIndex int, remoteRequestContext func() context.Context, batchOptions ring.DoBatchOptions) error { + return ring.DoBatchWithOptions(ctx, ring.WriteNoExtend, tenantRing, keys, + func(partition ring.InstanceDesc, indexes []int) error { + req := req.ForIndexes(indexes, initialMetadataIndex) + + // The partition ID is stored in the ring.InstanceDesc Id. + partitionID, err := strconv.ParseUint(partition.Id, 10, 31) + if err != nil { + return err + } + + ctx := remoteRequestContext() + err = d.ingestStorageWriter.WriteSync(ctx, int32(partitionID), tenantID, req) + err = wrapPartitionPushError(err, int32(partitionID)) + err = wrapDeadlineExceededPushError(err) + + return err + }, batchOptions, ) } @@ -1460,31 +1488,6 @@ func (d *Distributor) updateReceivedMetrics(req *mimirpb.WriteRequest, userID st d.receivedMetadata.WithLabelValues(userID).Add(float64(receivedMetadata)) } -// sendToIngester sends received data to a specific ingester. This function is used when ingest storage is disabled. -func (d *Distributor) sendToIngester(ctx context.Context, ingester ring.InstanceDesc, req *mimirpb.WriteRequest) error { - h, err := d.ingesterPool.GetClientForInstance(ingester) - if err != nil { - return err - } - c := h.(ingester_client.IngesterClient) - - ctx = grpcutil.AppendMessageSizeToOutgoingContext(ctx, req) // Let ingester know the size of the message, without needing to read the message first. - _, err = c.Push(ctx, req) - return wrapIngesterPushError(err, ingester.Id) -} - -// sendToStorage sends received data to the configured ingest storage. This function is used when ingest storage is enabled. -func (d *Distributor) sendToStorage(ctx context.Context, userID string, partition ring.InstanceDesc, req *mimirpb.WriteRequest) error { - // The partition ID is stored in the ring.InstanceDesc Id. - partitionID, err := strconv.ParseUint(partition.Id, 10, 31) - if err != nil { - return err - } - - err = d.ingestStorageWriter.WriteSync(ctx, int32(partitionID), userID, req) - return wrapPartitionPushError(err, int32(partitionID)) -} - // forReplicationSets runs f, in parallel, for all ingesters in the input replicationSets. // Return an error if any f fails for any of the input replicationSets. func forReplicationSets[R any](ctx context.Context, d *Distributor, replicationSets []ring.ReplicationSet, f func(context.Context, ingester_client.IngesterClient) (R, error)) ([]R, error) { diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 59505e98479..8d6b9dce0b1 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -5575,6 +5575,22 @@ type mockIngester struct { // partitionReader is responsible to consume a partition from Kafka when the // ingest storage is enabled. This field is nil if the ingest storage is disabled. partitionReader *ingest.PartitionReader + + // Hooks. + hooksMx sync.Mutex + beforePushHook func(ctx context.Context, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error, bool) +} + +func (i *mockIngester) registerBeforePushHook(fn func(ctx context.Context, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error, bool)) { + i.hooksMx.Lock() + defer i.hooksMx.Unlock() + i.beforePushHook = fn +} + +func (i *mockIngester) getBeforePushHook() func(ctx context.Context, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error, bool) { + i.hooksMx.Lock() + defer i.hooksMx.Unlock() + return i.beforePushHook } func (i *mockIngester) instanceID() string { @@ -5621,6 +5637,12 @@ func (i *mockIngester) Push(ctx context.Context, req *mimirpb.WriteRequest, _ .. time.Sleep(i.pushDelay) + if hook := i.getBeforePushHook(); hook != nil { + if res, err, handled := hook(ctx, req); handled { + return res, err + } + } + i.Lock() defer i.Unlock() @@ -7416,46 +7438,43 @@ func TestStartFinishRequest(t *testing.T) { } } -func TestSendMessageMetadata(t *testing.T) { - var distributorCfg Config - var clientConfig client.Config - var ringConfig ring.Config - flagext.DefaultValues(&distributorCfg, &clientConfig, &ringConfig) - - kvStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) - t.Cleanup(func() { assert.NoError(t, closer.Close()) }) - - ringConfig.KVStore.Mock = kvStore - ingestersRing, err := ring.New(ringConfig, ingester.IngesterRingKey, ingester.IngesterRingKey, log.NewNopLogger(), nil) - require.NoError(t, err) +func TestDistributor_Push_SendMessageMetadata(t *testing.T) { + const userID = "test" - mock := &mockInstanceClient{} - distributorCfg.IngesterClientFactory = ring_client.PoolInstFunc(func(ring.InstanceDesc) (ring_client.PoolClient, error) { - return mock, nil + distributors, ingesters, _, _ := prepare(t, prepConfig{ + numIngesters: 1, + happyIngesters: 1, + numDistributors: 1, + replicationFactor: 1, }) - d, err := New(distributorCfg, clientConfig, validation.MockDefaultOverrides(), nil, ingestersRing, nil, false, nil, log.NewNopLogger()) - require.NoError(t, err) - require.NotNil(t, d) + require.Len(t, distributors, 1) + require.Len(t, ingesters, 1) ctx := context.Background() - ctx = user.InjectOrgID(ctx, "test") + ctx = user.InjectOrgID(ctx, userID) req := &mimirpb.WriteRequest{ Timeseries: []mimirpb.PreallocTimeseries{ - {TimeSeries: &mimirpb.TimeSeries{ - Labels: []mimirpb.LabelAdapter{{Name: model.MetricNameLabel, Value: "test1"}}, - Exemplars: []mimirpb.Exemplar{}, - }}, + makeTimeseries([]string{model.MetricNameLabel, "test1"}, makeSamples(time.Now().UnixMilli(), 1), nil), }, Source: mimirpb.API, } - err = d.sendToIngester(ctx, ring.InstanceDesc{Addr: "1.2.3.4:5555", Id: "test"}, req) + // Register a hook in the ingester's Push() to check whether the context contains the expected gRPC metadata. + ingesters[0].registerBeforePushHook(func(ctx context.Context, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error, bool) { + md, ok := metadata.FromOutgoingContext(ctx) + require.True(t, ok) + require.Equal(t, []string{strconv.Itoa(req.Size())}, md[grpcutil.MetadataMessageSize]) + + return nil, nil, false + }) + + _, err := distributors[0].Push(ctx, req) require.NoError(t, err) - // Verify that d.sendToIngester added message size to metadata. - require.Equal(t, []string{strconv.Itoa(req.Size())}, mock.md[grpcutil.MetadataMessageSize]) + // Ensure the ingester's Push() has been called. + require.Equal(t, 1, ingesters[0].countCalls("Push")) } func TestQueryIngestersRingZoneSorter(t *testing.T) {