Skip to content

Commit

Permalink
Refactoring: introduce Distributor.sendWriteRequestToPartitions() and…
Browse files Browse the repository at this point in the history
… sendWriteRequestToIngesters()

Signed-off-by: Marco Pracucci <[email protected]>
  • Loading branch information
pracucci committed Apr 12, 2024
1 parent 2d363d0 commit 3ed4082
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 62 deletions.
75 changes: 39 additions & 36 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1380,24 +1380,52 @@ func (d *Distributor) sendWriteRequestToBackends(ctx context.Context, tenantID s
Go: d.doBatchPushWorkers,
}

if d.cfg.IngestStorageConfig.Enabled {
return d.sendWriteRequestToPartitions(ctx, tenantID, tenantRing, req, keys, initialMetadataIndex, remoteRequestContext, batchOptions)
}
return d.sendWriteRequestToIngesters(ctx, tenantRing, req, keys, initialMetadataIndex, remoteRequestContext, batchOptions)
}

func (d *Distributor) sendWriteRequestToIngesters(ctx context.Context, tenantRing ring.DoBatchRing, req *mimirpb.WriteRequest, keys []uint32, initialMetadataIndex int, remoteRequestContext func() context.Context, batchOptions ring.DoBatchOptions) error {
return ring.DoBatchWithOptions(ctx, ring.WriteNoExtend, tenantRing, keys,
func(instance ring.InstanceDesc, indexes []int) error {
func(ingester ring.InstanceDesc, indexes []int) error {
req := req.ForIndexes(indexes, initialMetadataIndex)

// Do not cancel the remoteRequestContext in this callback:
// there are more callbacks using it at the same time.
localCtx := remoteRequestContext()
var err error
if d.cfg.IngestStorageConfig.Enabled {
err = d.sendToStorage(localCtx, tenantID, instance, req)
} else {
err = d.sendToIngester(localCtx, instance, req)
h, err := d.ingesterPool.GetClientForInstance(ingester)
if err != nil {
return err
}
c := h.(ingester_client.IngesterClient)

ctx := remoteRequestContext()
ctx = grpcutil.AppendMessageSizeToOutgoingContext(ctx, req) // Let ingester know the size of the message, without needing to read the message first.

_, err = c.Push(ctx, req)
err = wrapIngesterPushError(err, ingester.Id)
err = wrapDeadlineExceededPushError(err)

return err
},
batchOptions,
}, batchOptions)
}

func (d *Distributor) sendWriteRequestToPartitions(ctx context.Context, tenantID string, tenantRing ring.DoBatchRing, req *mimirpb.WriteRequest, keys []uint32, initialMetadataIndex int, remoteRequestContext func() context.Context, batchOptions ring.DoBatchOptions) error {
return ring.DoBatchWithOptions(ctx, ring.WriteNoExtend, tenantRing, keys,
func(partition ring.InstanceDesc, indexes []int) error {
req := req.ForIndexes(indexes, initialMetadataIndex)

// The partition ID is stored in the ring.InstanceDesc Id.
partitionID, err := strconv.ParseUint(partition.Id, 10, 31)
if err != nil {
return err
}

ctx := remoteRequestContext()
err = d.ingestStorageWriter.WriteSync(ctx, int32(partitionID), tenantID, req)
err = wrapPartitionPushError(err, int32(partitionID))
err = wrapDeadlineExceededPushError(err)

return err
}, batchOptions,
)
}

Expand Down Expand Up @@ -1460,31 +1488,6 @@ func (d *Distributor) updateReceivedMetrics(req *mimirpb.WriteRequest, userID st
d.receivedMetadata.WithLabelValues(userID).Add(float64(receivedMetadata))
}

// sendToIngester sends received data to a specific ingester. This function is used when ingest storage is disabled.
func (d *Distributor) sendToIngester(ctx context.Context, ingester ring.InstanceDesc, req *mimirpb.WriteRequest) error {
h, err := d.ingesterPool.GetClientForInstance(ingester)
if err != nil {
return err
}
c := h.(ingester_client.IngesterClient)

ctx = grpcutil.AppendMessageSizeToOutgoingContext(ctx, req) // Let ingester know the size of the message, without needing to read the message first.
_, err = c.Push(ctx, req)
return wrapIngesterPushError(err, ingester.Id)
}

// sendToStorage sends received data to the configured ingest storage. This function is used when ingest storage is enabled.
func (d *Distributor) sendToStorage(ctx context.Context, userID string, partition ring.InstanceDesc, req *mimirpb.WriteRequest) error {
// The partition ID is stored in the ring.InstanceDesc Id.
partitionID, err := strconv.ParseUint(partition.Id, 10, 31)
if err != nil {
return err
}

err = d.ingestStorageWriter.WriteSync(ctx, int32(partitionID), userID, req)
return wrapPartitionPushError(err, int32(partitionID))
}

// forReplicationSets runs f, in parallel, for all ingesters in the input replicationSets.
// Return an error if any f fails for any of the input replicationSets.
func forReplicationSets[R any](ctx context.Context, d *Distributor, replicationSets []ring.ReplicationSet, f func(context.Context, ingester_client.IngesterClient) (R, error)) ([]R, error) {
Expand Down
71 changes: 45 additions & 26 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5575,6 +5575,22 @@ type mockIngester struct {
// partitionReader is responsible to consume a partition from Kafka when the
// ingest storage is enabled. This field is nil if the ingest storage is disabled.
partitionReader *ingest.PartitionReader

// Hooks.
hooksMx sync.Mutex
beforePushHook func(ctx context.Context, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error, bool)
}

func (i *mockIngester) registerBeforePushHook(fn func(ctx context.Context, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error, bool)) {
i.hooksMx.Lock()
defer i.hooksMx.Unlock()
i.beforePushHook = fn
}

func (i *mockIngester) getBeforePushHook() func(ctx context.Context, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error, bool) {
i.hooksMx.Lock()
defer i.hooksMx.Unlock()
return i.beforePushHook
}

func (i *mockIngester) instanceID() string {
Expand Down Expand Up @@ -5621,6 +5637,12 @@ func (i *mockIngester) Push(ctx context.Context, req *mimirpb.WriteRequest, _ ..

time.Sleep(i.pushDelay)

if hook := i.getBeforePushHook(); hook != nil {
if res, err, handled := hook(ctx, req); handled {
return res, err
}
}

i.Lock()
defer i.Unlock()

Expand Down Expand Up @@ -7416,46 +7438,43 @@ func TestStartFinishRequest(t *testing.T) {
}
}

func TestSendMessageMetadata(t *testing.T) {
var distributorCfg Config
var clientConfig client.Config
var ringConfig ring.Config
flagext.DefaultValues(&distributorCfg, &clientConfig, &ringConfig)

kvStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

ringConfig.KVStore.Mock = kvStore
ingestersRing, err := ring.New(ringConfig, ingester.IngesterRingKey, ingester.IngesterRingKey, log.NewNopLogger(), nil)
require.NoError(t, err)
func TestDistributor_Push_SendMessageMetadata(t *testing.T) {
const userID = "test"

mock := &mockInstanceClient{}
distributorCfg.IngesterClientFactory = ring_client.PoolInstFunc(func(ring.InstanceDesc) (ring_client.PoolClient, error) {
return mock, nil
distributors, ingesters, _, _ := prepare(t, prepConfig{
numIngesters: 1,
happyIngesters: 1,
numDistributors: 1,
replicationFactor: 1,
})

d, err := New(distributorCfg, clientConfig, validation.MockDefaultOverrides(), nil, ingestersRing, nil, false, nil, log.NewNopLogger())
require.NoError(t, err)
require.NotNil(t, d)
require.Len(t, distributors, 1)
require.Len(t, ingesters, 1)

ctx := context.Background()
ctx = user.InjectOrgID(ctx, "test")
ctx = user.InjectOrgID(ctx, userID)

req := &mimirpb.WriteRequest{
Timeseries: []mimirpb.PreallocTimeseries{
{TimeSeries: &mimirpb.TimeSeries{
Labels: []mimirpb.LabelAdapter{{Name: model.MetricNameLabel, Value: "test1"}},
Exemplars: []mimirpb.Exemplar{},
}},
makeTimeseries([]string{model.MetricNameLabel, "test1"}, makeSamples(time.Now().UnixMilli(), 1), nil),
},
Source: mimirpb.API,
}

err = d.sendToIngester(ctx, ring.InstanceDesc{Addr: "1.2.3.4:5555", Id: "test"}, req)
// Register a hook in the ingester's Push() to check whether the context contains the expected gRPC metadata.
ingesters[0].registerBeforePushHook(func(ctx context.Context, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error, bool) {
md, ok := metadata.FromOutgoingContext(ctx)
require.True(t, ok)
require.Equal(t, []string{strconv.Itoa(req.Size())}, md[grpcutil.MetadataMessageSize])

return nil, nil, false
})

_, err := distributors[0].Push(ctx, req)
require.NoError(t, err)

// Verify that d.sendToIngester added message size to metadata.
require.Equal(t, []string{strconv.Itoa(req.Size())}, mock.md[grpcutil.MetadataMessageSize])
// Ensure the ingester's Push() has been called.
require.Equal(t, 1, ingesters[0].countCalls("Push"))
}

func TestQueryIngestersRingZoneSorter(t *testing.T) {
Expand Down

0 comments on commit 3ed4082

Please sign in to comment.