Refactor Distributor.push() (#6978)
* Refactor Distributor.push()

Extracted methods, removed needless receivers, moved code to mimirpb.

This should not change any logic.

Signed-off-by: Oleg Zaytsev <[email protected]>
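For illustration, the "needless receivers" part turns methods that never use their *Distributor receiver into plain functions. A before/after sketch, with the bodies exactly as they appear in the diff below:

// Before: a method whose receiver d is never used.
func (d *Distributor) tokenForLabels(userID string, labels []mimirpb.LabelAdapter) uint32 {
	return mimirpb.ShardByAllLabelAdapters(userID, labels)
}

// After: a free function; callers no longer need a *Distributor.
func tokenForLabels(userID string, labels []mimirpb.LabelAdapter) uint32 {
	return mimirpb.ShardByAllLabelAdapters(userID, labels)
}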

* Make ingester.Writer.WriteSync() accept a WriteRequest

We already have a WriteRequest allocated, so there's no need to create
another one. The signature is also shorter now.

Signed-off-by: Oleg Zaytsev <[email protected]>
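A sketch of the call-site change (signatures taken from the diff below; the variable names are illustrative):

// Before: fields passed separately, and WriteSync rebuilt a WriteRequest
// internally just to marshal it.
err := writer.WriteSync(ctx, partitionID, userID, timeseries, metadata, source)

// After: the already-allocated request is passed through and marshalled directly.
err := writer.WriteSync(ctx, partitionID, userID, req)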

* make lint, hopefully

Signed-off-by: Oleg Zaytsev <[email protected]>

* Apply @bboreham's naming feedback

Signed-off-by: Oleg Zaytsev <[email protected]>

---------

Signed-off-by: Oleg Zaytsev <[email protected]>
colega authored Dec 21, 2023
1 parent be1f450 commit 297e905
Showing 5 changed files with 101 additions and 91 deletions.
112 changes: 46 additions & 66 deletions pkg/distributor/distributor.go
@@ -12,6 +12,7 @@ import (
"io"
"math"
"net/http"
"strings"
"sync"
"time"

@@ -593,14 +594,6 @@ func (d *Distributor) stopping(_ error) error {
return services.StopManagerAndAwaitStopped(context.Background(), d.subservices)
}

func (d *Distributor) tokenForLabels(userID string, labels []mimirpb.LabelAdapter) uint32 {
return mimirpb.ShardByAllLabelAdapters(userID, labels)
}

func (d *Distributor) tokenForMetadata(userID string, metricName string) uint32 {
return mimirpb.ShardByMetricName(userID, metricName)
}

// Returns a boolean that indicates whether or not we want to remove the replica label going forward,
// and an error that indicates whether we want to accept samples based on the cluster/replica found in ts.
// nil for the error means accept the sample.
@@ -730,7 +723,7 @@ func (d *Distributor) prePushHaDedupeMiddleware(next PushFunc) PushFunc {
haReplicaLabel := d.limits.HAReplicaLabel(userID)
cluster, replica := findHALabels(haReplicaLabel, d.limits.HAClusterLabel(userID), req.Timeseries[0].Labels)
// Make a copy of these, since they may be retained as labels on our metrics, e.g. dedupedSamples.
cluster, replica = copyString(cluster), copyString(replica)
cluster, replica = strings.Clone(cluster), strings.Clone(replica)

span := opentracing.SpanFromContext(ctx)
if span != nil {
@@ -1295,16 +1288,6 @@ func (d *Distributor) push(ctx context.Context, pushReq *Request) error {
span.SetTag("organization", userID)
}

seriesKeys := d.getTokensForSeries(userID, req.Timeseries)
metadataKeys := make([]uint32, 0, len(req.Metadata))

for _, m := range req.Metadata {
metadataKeys = append(metadataKeys, d.tokenForMetadata(userID, m.MetricFamilyName))
}

// Get a subring if tenant has shuffle shard size configured.
subRing := d.ingestersRing.ShuffleShard(userID, d.limits.IngestionTenantShardSize(userID))

if d.cfg.WriteRequestsBufferPoolingEnabled {
slabPool := pool.NewFastReleasingSlabPool[byte](&d.writeRequestBytePool, writeRequestSlabPoolSize)
ctx = ingester_client.WithSlabPool(ctx, slabPool)
@@ -1317,46 +1300,26 @@ func (d *Distributor) push(ctx context.Context, pushReq *Request) error {
return context.WithTimeout(context.WithoutCancel(ctx), d.cfg.RemoteTimeout)
})

// All tokens, stored in order: series, metadata.
keys := make([]uint32, len(seriesKeys)+len(metadataKeys))
initialMetadataIndex := len(seriesKeys)
copy(keys, seriesKeys)
copy(keys[initialMetadataIndex:], metadataKeys)
// Get both series and metadata keys in one slice.
keys, initialMetadataIndex := getSeriesAndMetadataTokens(userID, req)
// Get a subring if tenant has shuffle shard size configured.
subRing := d.ingestersRing.ShuffleShard(userID, d.limits.IngestionTenantShardSize(userID))

// we must not re-use buffers now until all DoBatch goroutines have finished,
// so set this flag false and pass cleanup() to DoBatch.
cleanupInDefer = false

err = ring.DoBatchWithOptions(ctx, ring.WriteNoExtend, subRing, keys,
func(ingester ring.InstanceDesc, indexes []int) error {
var timeseriesCount, metadataCount int
for _, i := range indexes {
if i >= initialMetadataIndex {
metadataCount++
} else {
timeseriesCount++
}
}

timeseries := preallocSliceIfNeeded[mimirpb.PreallocTimeseries](timeseriesCount)
metadata := preallocSliceIfNeeded[*mimirpb.MetricMetadata](metadataCount)

for _, i := range indexes {
if i >= initialMetadataIndex {
metadata = append(metadata, req.Metadata[i-initialMetadataIndex])
} else {
timeseries = append(timeseries, req.Timeseries[i])
}
}
req := req.ForIndexes(indexes, initialMetadataIndex)

// Do not cancel the remoteRequestContext in this callback:
// there are more callbacks using it at the same time.
localCtx, _ := remoteRequestContext()
var err error
if d.cfg.IngestStorageConfig.Enabled {
err = d.sendToStorage(localCtx, userID, ingester, timeseries, metadata, req.Source)
err = d.sendToStorage(localCtx, userID, ingester, req)
} else {
err = d.sendToIngester(localCtx, ingester, timeseries, metadata, req.Source)
err = d.sendToIngester(localCtx, ingester, req)
}

if errors.Is(err, context.DeadlineExceeded) {
@@ -1378,25 +1341,52 @@ func (d *Distributor) push(ctx context.Context, pushReq *Request) error {
return err
}

func preallocSliceIfNeeded[T any](size int) []T {
if size > 0 {
return make([]T, 0, size)
}
return nil
// getSeriesAndMetadataTokens returns a slice of tokens for the series and metadata from the request in this specific order.
// Metadata tokens start at initialMetadataIndex.
func getSeriesAndMetadataTokens(userID string, req *mimirpb.WriteRequest) (keys []uint32, initialMetadataIndex int) {
seriesKeys := getTokensForSeries(userID, req.Timeseries)
metadataKeys := getTokensForMetadata(userID, req.Metadata)

// All tokens, stored in order: series, metadata.
keys = make([]uint32, len(seriesKeys)+len(metadataKeys))
initialMetadataIndex = len(seriesKeys)
copy(keys, seriesKeys)
copy(keys[initialMetadataIndex:], metadataKeys)
return keys, initialMetadataIndex
}

func (d *Distributor) getTokensForSeries(userID string, series []mimirpb.PreallocTimeseries) []uint32 {
func getTokensForSeries(userID string, series []mimirpb.PreallocTimeseries) []uint32 {
if len(series) == 0 {
return nil
}

result := make([]uint32, 0, len(series))
for _, ts := range series {
result = append(result, d.tokenForLabels(userID, ts.Labels))
result = append(result, tokenForLabels(userID, ts.Labels))
}
return result
}

func getTokensForMetadata(userID string, metadata []*mimirpb.MetricMetadata) []uint32 {
if len(metadata) == 0 {
return nil
}
metadataKeys := make([]uint32, 0, len(metadata))

for _, m := range metadata {
metadataKeys = append(metadataKeys, tokenForMetadata(userID, m.MetricFamilyName))
}
return metadataKeys
}

func tokenForLabels(userID string, labels []mimirpb.LabelAdapter) uint32 {
return mimirpb.ShardByAllLabelAdapters(userID, labels)
}

func tokenForMetadata(userID string, metricName string) uint32 {
return mimirpb.ShardByMetricName(userID, metricName)
}

func (d *Distributor) updateReceivedMetrics(req *mimirpb.WriteRequest, userID string) {
var receivedSamples, receivedExemplars, receivedMetadata int
for _, ts := range req.Timeseries {
@@ -1410,38 +1400,28 @@ func (d *Distributor) updateReceivedMetrics(req *mimirpb.WriteRequest, userID st
d.receivedMetadata.WithLabelValues(userID).Add(float64(receivedMetadata))
}

func copyString(s string) string {
return string([]byte(s))
}

// sendToIngester sends received data to a specific ingester. This function is used when ingest storage is disabled.
func (d *Distributor) sendToIngester(ctx context.Context, ingester ring.InstanceDesc, timeseries []mimirpb.PreallocTimeseries, metadata []*mimirpb.MetricMetadata, source mimirpb.WriteRequest_SourceEnum) error {
func (d *Distributor) sendToIngester(ctx context.Context, ingester ring.InstanceDesc, req *mimirpb.WriteRequest) error {
h, err := d.ingesterPool.GetClientForInstance(ingester)
if err != nil {
return err
}
c := h.(ingester_client.IngesterClient)

req := &mimirpb.WriteRequest{
Timeseries: timeseries,
Metadata: metadata,
Source: source,
}

ctx = grpcutil.AppendMessageSizeToOutgoingContext(ctx, req) // Let ingester know the size of the message, without needing to read the message first.
_, err = c.Push(ctx, req)
return wrapIngesterPushError(err, ingester.Id)
}

// sendToStorage sends received data to the object storage, computing the partition based on the input ingester.
// This function is used when ingest storage is enabled.
func (d *Distributor) sendToStorage(ctx context.Context, userID string, ingester ring.InstanceDesc, timeseries []mimirpb.PreallocTimeseries, metadata []*mimirpb.MetricMetadata, source mimirpb.WriteRequest_SourceEnum) error {
func (d *Distributor) sendToStorage(ctx context.Context, userID string, ingester ring.InstanceDesc, req *mimirpb.WriteRequest) error {
partitionID, err := ingest.IngesterPartition(ingester.Id)
if err != nil {
return err
}

return d.ingestStorageWriter.WriteSync(ctx, partitionID, userID, timeseries, metadata, source)
return d.ingestStorageWriter.WriteSync(ctx, partitionID, userID, req)
}

// forReplicationSet runs f, in parallel, for all ingesters in the input replication set.
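To make the combined key layout concrete, a hypothetical worked example (the counts are invented for illustration). With three series and two metadata entries:

// keys                 = [tok(series0), tok(series1), tok(series2), tok(meta0), tok(meta1)]
// initialMetadataIndex = 3
keys, initialMetadataIndex := getSeriesAndMetadataTokens(userID, req)
// ring.DoBatchWithOptions later hands each ingester callback a subset of
// indexes into this combined space: index i refers to a series when
// i < initialMetadataIndex, and to metadata entry i-initialMetadataIndex otherwise.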
8 changes: 4 additions & 4 deletions pkg/distributor/distributor_test.go
@@ -5194,14 +5194,14 @@ func TestSeriesAreShardedToCorrectIngesters(t *testing.T) {
totalMetadata += len(ing[ix].metadata)

for _, ts := range ing[ix].timeseries {
token := distrib.tokenForLabels(userName, ts.Labels)
token := tokenForLabels(userName, ts.Labels)
ingIx := getIngesterIndexForToken(token, ing)
assert.Equal(t, ix, ingIx)
}

for _, metadataMap := range ing[ix].metadata {
for m := range metadataMap {
token := distrib.tokenForMetadata(userName, m.MetricFamilyName)
token := tokenForMetadata(userName, m.MetricFamilyName)
ingIx := getIngesterIndexForToken(token, ing)
assert.Equal(t, ix, ingIx)
}
@@ -5210,7 +5210,7 @@

// Verify that all timeseries were forwarded to ingesters.
for _, ts := range req.Timeseries {
token := distrib.tokenForLabels(userName, ts.Labels)
token := tokenForLabels(userName, ts.Labels)
ingIx := getIngesterIndexForToken(token, ing)

assert.Equal(t, ts.Labels, ing[ingIx].timeseries[token].Labels)
@@ -5578,7 +5578,7 @@ func TestSendMessageMetadata(t *testing.T) {
Source: mimirpb.API,
}

err = d.sendToIngester(ctx, ring.InstanceDesc{Addr: "1.2.3.4:5555", Id: "test"}, req.Timeseries, nil, req.Source)
err = d.sendToIngester(ctx, ring.InstanceDesc{Addr: "1.2.3.4:5555", Id: "test"}, req)
require.NoError(t, err)

// Verify that d.sendToIngester added message size to metadata.
37 changes: 37 additions & 0 deletions pkg/mimirpb/timeseries.go
@@ -590,3 +590,40 @@ func copyToYoloString(buf []byte, src string) (string, []byte) {
copy(buf, *((*[]byte)(unsafe.Pointer(&src))))
return yoloString(buf), buf[len(buf):]
}

// ForIndexes builds a new WriteRequest from the given WriteRequest, containing only the timeseries and metadata for the given indexes.
// It assumes the indexes before the initialMetadataIndex are timeseries, and the rest are metadata.
func (p *WriteRequest) ForIndexes(indexes []int, initialMetadataIndex int) *WriteRequest {
var timeseriesCount, metadataCount int
for _, i := range indexes {
if i >= initialMetadataIndex {
metadataCount++
} else {
timeseriesCount++
}
}

timeseries := preallocSliceIfNeeded[PreallocTimeseries](timeseriesCount)
metadata := preallocSliceIfNeeded[*MetricMetadata](metadataCount)

for _, i := range indexes {
if i >= initialMetadataIndex {
metadata = append(metadata, p.Metadata[i-initialMetadataIndex])
} else {
timeseries = append(timeseries, p.Timeseries[i])
}
}

return &WriteRequest{
Timeseries: timeseries,
Metadata: metadata,
Source: p.Source,
}
}

func preallocSliceIfNeeded[T any](size int) []T {
if size > 0 {
return make([]T, 0, size)
}
return nil
}
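A usage sketch of ForIndexes, continuing the hypothetical split above (the indexes are invented for illustration):

// This batch owns series 1 plus metadata 0, i.e. combined index 3 when
// initialMetadataIndex is 3.
sub := req.ForIndexes([]int{1, 3}, 3)
// sub.Timeseries holds req.Timeseries[1], sub.Metadata holds req.Metadata[0],
// and sub.Source is copied from req.Source. A side with zero entries stays
// nil, since preallocSliceIfNeeded only allocates for a positive size.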
13 changes: 3 additions & 10 deletions pkg/storage/ingest/writer.go
@@ -94,22 +94,15 @@ func (w *Writer) stopping(_ error) error {

// WriteSync the input data to the ingest storage. The function blocks until the data has been successfully committed,
// or an error occurred.
func (w *Writer) WriteSync(ctx context.Context, partitionID int32, userID string, timeseries []mimirpb.PreallocTimeseries, metadata []*mimirpb.MetricMetadata, source mimirpb.WriteRequest_SourceEnum) error {
func (w *Writer) WriteSync(ctx context.Context, partitionID int32, userID string, req *mimirpb.WriteRequest) error {
startTime := time.Now()

// Nothing to do if the input data is empty.
if len(timeseries) == 0 && len(metadata) == 0 {
if len(req.Timeseries) == 0 && len(req.Metadata) == 0 {
return nil
}

// Serialise the input data.
entry := &mimirpb.WriteRequest{
Timeseries: timeseries,
Metadata: metadata,
Source: source,
}

data, err := entry.Marshal()
data, err := req.Marshal()
if err != nil {
return errors.Wrap(err, "failed to serialise data")
}
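Call sites now build the WriteRequest once, as the updated tests below show. One consequence of the early return above, sketched here:

// No series and no metadata: WriteSync returns nil immediately,
// before anything is serialised or produced.
err := writer.WriteSync(ctx, partitionID, userID, &mimirpb.WriteRequest{})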
22 changes: 11 additions & 11 deletions pkg/storage/ingest/writer_test.go
@@ -62,7 +62,7 @@ func TestWriter_WriteSync(t *testing.T) {
return nil, nil, false
})

err := writer.WriteSync(ctx, partitionID, tenantID, multiSeries, nil, mimirpb.API)
err := writer.WriteSync(ctx, partitionID, tenantID, &mimirpb.WriteRequest{Timeseries: multiSeries, Metadata: nil, Source: mimirpb.API})
require.NoError(t, err)

// Ensure it was processed before returning.
@@ -135,7 +135,7 @@

// Write the first record, which is expected to be sent immediately.
runAsync(&wg, func() {
require.NoError(t, writer.WriteSync(ctx, partitionID, tenantID, series1, nil, mimirpb.API))
require.NoError(t, writer.WriteSync(ctx, partitionID, tenantID, &mimirpb.WriteRequest{Timeseries: series1, Metadata: nil, Source: mimirpb.API}))
})

// Once the 1st Produce request is received by the server but still processing (there's a 1s sleep),
@@ -145,11 +145,11 @@
ctxWithTimeout, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
defer cancel()

require.Equal(t, context.DeadlineExceeded, writer.WriteSync(ctxWithTimeout, partitionID, tenantID, series2, nil, mimirpb.API))
require.Equal(t, context.DeadlineExceeded, writer.WriteSync(ctxWithTimeout, partitionID, tenantID, &mimirpb.WriteRequest{Timeseries: series2, Metadata: nil, Source: mimirpb.API}))
})

runAsyncAfter(&wg, firstRequestReceived, func() {
require.NoError(t, writer.WriteSync(ctx, partitionID, tenantID, series3, nil, mimirpb.API))
require.NoError(t, writer.WriteSync(ctx, partitionID, tenantID, &mimirpb.WriteRequest{Timeseries: series3, Metadata: nil, Source: mimirpb.API}))
})

wg.Wait()
@@ -199,11 +199,11 @@
wg.Add(3)

runAsync(&wg, func() {
require.NoError(t, writer.WriteSync(ctx, partitionID, tenantID, series1, nil, mimirpb.API))
require.NoError(t, writer.WriteSync(ctx, partitionID, tenantID, &mimirpb.WriteRequest{Timeseries: series1, Metadata: nil, Source: mimirpb.API}))
})

runAsyncAfter(&wg, firstRequestReceived, func() {
require.NoError(t, writer.WriteSync(ctx, partitionID, tenantID, series2, nil, mimirpb.API))
require.NoError(t, writer.WriteSync(ctx, partitionID, tenantID, &mimirpb.WriteRequest{Timeseries: series2, Metadata: nil, Source: mimirpb.API}))
})

runAsyncAfter(&wg, firstRequestReceived, func() {
@@ -212,7 +212,7 @@
// and not because it's waiting for the 1st call to complete.
time.Sleep(100 * time.Millisecond)

require.NoError(t, writer.WriteSync(ctx, partitionID, tenantID, series3, nil, mimirpb.API))
require.NoError(t, writer.WriteSync(ctx, partitionID, tenantID, &mimirpb.WriteRequest{Timeseries: series3, Metadata: nil, Source: mimirpb.API}))
})

wg.Wait()
@@ -228,7 +228,7 @@
writer, _ := createTestWriter(t, createTestKafkaConfig(clusterAddr, topicName))

// Write to a non-existing partition.
err := writer.WriteSync(ctx, 100, tenantID, multiSeries, nil, mimirpb.API)
err := writer.WriteSync(ctx, 100, tenantID, &mimirpb.WriteRequest{Timeseries: multiSeries, Metadata: nil, Source: mimirpb.API})
require.Error(t, err)
})

@@ -246,7 +246,7 @@
})

startTime := time.Now()
require.Equal(t, kgo.ErrRecordTimeout, writer.WriteSync(ctx, partitionID, tenantID, series1, nil, mimirpb.API))
require.Equal(t, kgo.ErrRecordTimeout, writer.WriteSync(ctx, partitionID, tenantID, &mimirpb.WriteRequest{Timeseries: series1, Metadata: nil, Source: mimirpb.API}))
elapsedTime := time.Since(startTime)

assert.Greater(t, elapsedTime, kafkaCfg.WriteTimeout/2)
@@ -290,7 +290,7 @@
// The 1st request is expected to fail because Kafka will take longer than the configured timeout.
runAsync(&wg, func() {
startTime := time.Now()
require.Equal(t, kgo.ErrRecordTimeout, writer.WriteSync(ctx, partitionID, tenantID, series1, nil, mimirpb.API))
require.Equal(t, kgo.ErrRecordTimeout, writer.WriteSync(ctx, partitionID, tenantID, &mimirpb.WriteRequest{Timeseries: series1, Metadata: nil, Source: mimirpb.API}))
elapsedTime := time.Since(startTime)

// It should take nearly the client's write timeout.
@@ -308,7 +308,7 @@
time.Sleep(kafkaCfg.WriteTimeout + writerRequestTimeoutOverhead - delay)

startTime := time.Now()
require.Equal(t, kgo.ErrRecordTimeout, writer.WriteSync(ctx, partitionID, tenantID, series2, nil, mimirpb.API))
require.Equal(t, kgo.ErrRecordTimeout, writer.WriteSync(ctx, partitionID, tenantID, &mimirpb.WriteRequest{Timeseries: series2, Metadata: nil, Source: mimirpb.API}))
elapsedTime := time.Since(startTime)

// We expect to fail once the previous request fails, so it should take nearly the client's write timeout
