Skip to content

Commit

Permalink
Refactoring: introduce Distributor.sendWriteRequestToBackends() (#7877)
Browse files Browse the repository at this point in the history
* Refactoring: introduce Distributor.sendWriteRequestToBackends()

Signed-off-by: Marco Pracucci <[email protected]>

* Fixed typo in a comment

Signed-off-by: Marco Pracucci <[email protected]>

* Add function comment

Signed-off-by: Marco Pracucci <[email protected]>

---------

Signed-off-by: Marco Pracucci <[email protected]>
  • Loading branch information
pracucci authored Apr 11, 2024
1 parent 6f5018c commit b53ab29
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 26 deletions.
69 changes: 43 additions & 26 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1324,13 +1324,6 @@ func (d *Distributor) push(ctx context.Context, pushReq *Request) error {
ctx = ingester_client.WithSlabPool(ctx, slabPool)
}

// Use an independent context to make sure all ingesters get samples even if we return early.
// It will still take a while to calculate which ingester gets which series,
// so we'll start the remote timeout once the first callback is called.
remoteRequestContext := sync.OnceValues(func() (context.Context, context.CancelFunc) {
return context.WithTimeout(context.WithoutCancel(ctx), d.cfg.RemoteTimeout)
})

// Get both series and metadata keys in one slice.
keys, initialMetadataIndex := getSeriesAndMetadataTokens(userID, req)

Expand All @@ -1347,41 +1340,65 @@ func (d *Distributor) push(ctx context.Context, pushReq *Request) error {
tenantRing = d.ingestersRing.ShuffleShard(userID, d.limits.IngestionTenantShardSize(userID))
}

// we must not re-use buffers now until all DoBatch goroutines have finished,
// so set this flag false and pass cleanup() to DoBatch.
// we must not re-use buffers now until all writes to backends (e.g. ingesters) have completed, which can happen
// even after this function returns. For this reason, it's unsafe to cleanup in the defer and we'll do the cleanup
// once all backend requests have completed (see cleanup function passed to sendWriteRequestToBackends()).
cleanupInDefer = false

err = ring.DoBatchWithOptions(ctx, ring.WriteNoExtend, tenantRing, keys,
return d.sendWriteRequestToBackends(ctx, userID, req, keys, initialMetadataIndex, tenantRing, pushReq.CleanUp)
}

// sendWriteRequestToBackends sends the input req data to backends (i.e. ingesters or partitions).
//
// The input cleanup function is guaranteed to be called after all requests to all backends have completed.
func (d *Distributor) sendWriteRequestToBackends(ctx context.Context, tenantID string, req *mimirpb.WriteRequest, keys []uint32, initialMetadataIndex int, tenantRing ring.DoBatchRing, cleanup func()) error {
// Use an independent context to make sure all backend instances (e.g. ingesters) get samples even if we return early.
// It will still take a while to lookup the ring and calculate which instance gets which series,
// so we'll start the remote timeout once the first callback is called.
remoteRequestContextAndCancel := sync.OnceValues(func() (context.Context, context.CancelFunc) {
return context.WithTimeout(context.WithoutCancel(ctx), d.cfg.RemoteTimeout)
})

remoteRequestContext := func() context.Context {
ctx, _ := remoteRequestContextAndCancel()
return ctx
}

batchCleanup := func() {
// Notify the provided callback that it's now safe to run the cleanup because there are no
// more in-flight requests to the backend.
cleanup()

// All requests have completed, so it's now safe to cancel the requests context to release resources.
_, cancel := remoteRequestContextAndCancel()
cancel()
}

batchOptions := ring.DoBatchOptions{
Cleanup: batchCleanup,
IsClientError: isIngesterClientError,
Go: d.doBatchPushWorkers,
}

return ring.DoBatchWithOptions(ctx, ring.WriteNoExtend, tenantRing, keys,
func(instance ring.InstanceDesc, indexes []int) error {
req := req.ForIndexes(indexes, initialMetadataIndex)

// Do not cancel the remoteRequestContext in this callback:
// there are more callbacks using it at the same time.
localCtx, _ := remoteRequestContext()
localCtx := remoteRequestContext()
var err error
if d.cfg.IngestStorageConfig.Enabled {
err = d.sendToStorage(localCtx, userID, instance, req)
err = d.sendToStorage(localCtx, tenantID, instance, req)
} else {
err = d.sendToIngester(localCtx, instance, req)
}

if errors.Is(err, context.DeadlineExceeded) {
return errors.Wrap(err, deadlineExceededWrapMessage)
}
err = wrapDeadlineExceededPushError(err)
return err
},
ring.DoBatchOptions{
Cleanup: func() {
pushReq.CleanUp()
_, cancel := remoteRequestContext()
cancel()
},
IsClientError: isIngesterClientError,
Go: d.doBatchPushWorkers,
},
batchOptions,
)

return err
}

// getSeriesAndMetadataTokens returns a slice of tokens for the series and metadata from the request in this specific order.
Expand Down
9 changes: 9 additions & 0 deletions pkg/distributor/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package distributor

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -295,6 +296,14 @@ func wrapPartitionPushError(err error, partitionID int32) error {
return errors.Wrap(err, fmt.Sprintf("%s %d", failedPushingToPartitionMessage, partitionID))
}

// wrapDeadlineExceededPushError returns err unchanged unless its chain
// contains context.DeadlineExceeded, in which case err is wrapped with
// deadlineExceededWrapMessage to give push callers extra context.
func wrapDeadlineExceededPushError(err error) error {
	if err == nil || !errors.Is(err, context.DeadlineExceeded) {
		return err
	}

	return errors.Wrap(err, deadlineExceededWrapMessage)
}

func isIngesterClientError(err error) bool {
var ingesterPushErr ingesterPushError
if errors.As(err, &ingesterPushErr) {
Expand Down

0 comments on commit b53ab29

Please sign in to comment.