Distributor: add support to tee writes to ingesters and partitions
Remove indirection.
Rename functions
Move function
Clarified push error wrapping
Added unit tests
Fixed TestDistributor_Push_SendMessageMetadata
Remove unused mockInstanceClient
Fix multi error handling
Re-ordering
Fix typo

Signed-off-by: Marco Pracucci <[email protected]>
Signed-off-by: Peter Štibraný <[email protected]>
pracucci committed Apr 12, 2024
1 parent 9930b29 commit 501b691
Showing 4 changed files with 410 additions and 37 deletions.
89 changes: 77 additions & 12 deletions pkg/distributor/distributor.go
@@ -1327,31 +1327,51 @@ func (d *Distributor) push(ctx context.Context, pushReq *Request) error {
 	// Get both series and metadata keys in one slice.
 	keys, initialMetadataIndex := getSeriesAndMetadataTokens(userID, req)

+	var (
+		ingestersSubring  ring.DoBatchRing
+		partitionsSubring ring.DoBatchRing
+	)
+
 	// Get the tenant's subring to use to either write to ingesters or partitions.
-	var tenantRing ring.DoBatchRing
 	if d.cfg.IngestStorageConfig.Enabled {
 		subring, err := d.partitionsRing.ShuffleShard(userID, d.limits.IngestionPartitionsTenantShardSize(userID))
 		if err != nil {
 			return err
 		}

-		tenantRing = ring.NewActivePartitionBatchRing(subring.PartitionRing())
-	} else {
-		tenantRing = d.ingestersRing.ShuffleShard(userID, d.limits.IngestionTenantShardSize(userID))
+		partitionsSubring = ring.NewActivePartitionBatchRing(subring.PartitionRing())
 	}

+	if !d.cfg.IngestStorageConfig.Enabled || d.cfg.IngestStorageConfig.Migration.DistributorSendToIngestersEnabled {
+		ingestersSubring = d.ingestersRing.ShuffleShard(userID, d.limits.IngestionTenantShardSize(userID))
+	}

 	// We must not reuse buffers until all writes to backends (e.g. ingesters) have completed, which can happen
 	// even after this function returns. For this reason, it's unsafe to clean up in a defer; we'll do the cleanup
 	// once all backend requests have completed (see the cleanup function passed to sendWriteRequestToBackends()).
 	cleanupInDefer = false

-	return d.sendWriteRequestToBackends(ctx, userID, req, keys, initialMetadataIndex, tenantRing, pushReq.CleanUp)
+	return d.sendWriteRequestToBackends(ctx, userID, req, keys, initialMetadataIndex, ingestersSubring, partitionsSubring, pushReq.CleanUp)
 }
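For readers following the change: the two config flags above yield three effective write modes. Below is a minimal sketch of the selection logic; the `writeModes` helper is hypothetical, for illustration only, and is not part of this commit.

```go
package main

import "fmt"

// writeModes mirrors the subring selection above: enabling ingest storage
// selects partitions, and ingesters are kept either when ingest storage is
// disabled (classic mode) or while the migration flag tees writes to both.
// Hypothetical helper, not part of the commit.
func writeModes(ingestStorageEnabled, sendToIngestersDuringMigration bool) (toIngesters, toPartitions bool) {
	toPartitions = ingestStorageEnabled
	toIngesters = !ingestStorageEnabled || sendToIngestersDuringMigration
	return toIngesters, toPartitions
}

func main() {
	fmt.Println(writeModes(false, false)) // true false  (classic: ingesters only)
	fmt.Println(writeModes(true, true))   // true true   (migration: tee to both)
	fmt.Println(writeModes(true, false))  // false true  (ingest storage only)
}
```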

-// sendWriteRequestToBackends sends the input req data to backends (i.e. ingesters or partitions).
+// sendWriteRequestToBackends sends the input req data to backends. The backends could be:
+// - Ingesters, when ingestersSubring is not nil
+// - Ingest storage partitions, when partitionsSubring is not nil
 //
 // The input cleanup function is guaranteed to be called after all requests to all backends have completed.
-func (d *Distributor) sendWriteRequestToBackends(ctx context.Context, tenantID string, req *mimirpb.WriteRequest, keys []uint32, initialMetadataIndex int, tenantRing ring.DoBatchRing, cleanup func()) error {
+func (d *Distributor) sendWriteRequestToBackends(ctx context.Context, tenantID string, req *mimirpb.WriteRequest, keys []uint32, initialMetadataIndex int, ingestersSubring, partitionsSubring ring.DoBatchRing, cleanup func()) error {
+	var (
+		wg            = sync.WaitGroup{}
+		partitionsErr error
+		ingestersErr  error
+	)
+
+	// Ensure at least one ring has been provided.
+	if ingestersSubring == nil && partitionsSubring == nil {
+		// This should never happen; if it does, it's a logic bug.
+		panic("no tenant subring has been provided to sendWriteRequestToBackends()")
+	}

 	// Use an independent context to make sure all backend instances (e.g. ingesters) get samples even if we return early.
 	// It will still take a while to look up the ring and calculate which instance gets which series,
 	// so we'll start the remote timeout once the first callback is called.
Expand Down Expand Up @@ -1380,14 +1400,53 @@ func (d *Distributor) sendWriteRequestToBackends(ctx context.Context, tenantID s
 		Go:            d.doBatchPushWorkers,
 	}

-	if d.cfg.IngestStorageConfig.Enabled {
-		return d.sendWriteRequestToPartitions(ctx, tenantID, tenantRing, req, keys, initialMetadataIndex, remoteRequestContext, batchOptions)
+	// Keep it simple if there's only one backend to write to.
+	if partitionsSubring == nil {
+		return d.sendWriteRequestToIngesters(ctx, ingestersSubring, req, keys, initialMetadataIndex, remoteRequestContext, batchOptions)
 	}
+	if ingestersSubring == nil {
+		return d.sendWriteRequestToPartitions(ctx, tenantID, partitionsSubring, req, keys, initialMetadataIndex, remoteRequestContext, batchOptions)
+	}

+	// Prepare a callback that calls the input cleanup function only after
+	// the cleanup has been done for all backends.
+	cleanupWaitBackends := atomic.NewInt64(2)
+	batchOptions.Cleanup = func() {
+		if cleanupWaitBackends.Dec() == 0 {
+			batchCleanup()
+		}
+	}

+	// Write both to ingesters and partitions.
+	wg.Add(2)

+	go func() {
+		defer wg.Done()

+		ingestersErr = d.sendWriteRequestToIngesters(ctx, ingestersSubring, req, keys, initialMetadataIndex, remoteRequestContext, batchOptions)
+	}()

+	go func() {
+		defer wg.Done()

+		partitionsErr = d.sendWriteRequestToPartitions(ctx, tenantID, partitionsSubring, req, keys, initialMetadataIndex, remoteRequestContext, batchOptions)
+	}()

+	// Wait until writes to all backends have completed.
+	wg.Wait()

+	// Ingester errors can be soft (e.g. 4xx) or hard (e.g. 5xx), while partition errors are always hard
+	// errors. For this reason, it's important to give precedence to partition errors; otherwise the distributor may return
+	// a 4xx (ingester error) when it should actually be a 5xx (partition error).
+	// TODO unit test
+	if partitionsErr != nil {
+		return partitionsErr
+	}
-	return d.sendWriteRequestToIngesters(ctx, tenantRing, req, keys, initialMetadataIndex, remoteRequestContext, batchOptions)
+	return ingestersErr
 }
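The precedence rule above (and its TODO about a unit test) can be exercised in isolation. Here is a sketch under the assumption that the logic is extracted into a hypothetical `mergeBackendErrors` helper; this is not code from the commit:

```go
package distributor_test

import (
	"testing"

	"github.com/pkg/errors"
	"github.com/stretchr/testify/require"
)

// mergeBackendErrors is a hypothetical extraction of the precedence logic:
// partition errors are always hard (5xx), so they must win over potentially
// soft (4xx) ingester errors.
func mergeBackendErrors(partitionsErr, ingestersErr error) error {
	if partitionsErr != nil {
		return partitionsErr
	}
	return ingestersErr
}

func TestMergeBackendErrors(t *testing.T) {
	ingesterErr := errors.New("ingester: bad request (4xx)")
	partitionErr := errors.New("partition: internal error (5xx)")

	// When both backends fail, the partition (hard) error takes precedence.
	require.Equal(t, partitionErr, mergeBackendErrors(partitionErr, ingesterErr))
	// With no partition error, the ingester error is surfaced.
	require.Equal(t, ingesterErr, mergeBackendErrors(nil, ingesterErr))
	// No errors: the write succeeded on both backends.
	require.NoError(t, mergeBackendErrors(nil, nil))
}
```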

 func (d *Distributor) sendWriteRequestToIngesters(ctx context.Context, tenantRing ring.DoBatchRing, req *mimirpb.WriteRequest, keys []uint32, initialMetadataIndex int, remoteRequestContext func() context.Context, batchOptions ring.DoBatchOptions) error {
-	return ring.DoBatchWithOptions(ctx, ring.WriteNoExtend, tenantRing, keys,
+	err := ring.DoBatchWithOptions(ctx, ring.WriteNoExtend, tenantRing, keys,
 		func(ingester ring.InstanceDesc, indexes []int) error {
 			req := req.ForIndexes(indexes, initialMetadataIndex)

@@ -1406,10 +1465,13 @@ func (d *Distributor) sendWriteRequestToIngesters(ctx context.Context, tenantRin

 			return err
 		}, batchOptions)

+	// Since data may be written to different backends, it may be helpful to clearly identify which backend failed.
+	return errors.Wrap(err, "send data to ingesters")
 }
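Note that `errors.Wrap` from github.com/pkg/errors returns nil when the cause is nil, so the wrapping added here leaves the success path unaffected. A quick standalone sketch of that behavior:

```go
package main

import (
	"fmt"

	"github.com/pkg/errors"
)

func main() {
	// Wrap on a nil cause returns nil, so a successful DoBatchWithOptions
	// still yields a nil error from sendWriteRequestToIngesters.
	fmt.Println(errors.Wrap(nil, "send data to ingesters")) // <nil>

	// On failure, the backend is identified by the message prefix.
	cause := errors.New("context deadline exceeded")
	wrapped := errors.Wrap(cause, "send data to ingesters")
	fmt.Println(wrapped)                        // send data to ingesters: context deadline exceeded
	fmt.Println(errors.Cause(wrapped) == cause) // true
}
```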

 func (d *Distributor) sendWriteRequestToPartitions(ctx context.Context, tenantID string, tenantRing ring.DoBatchRing, req *mimirpb.WriteRequest, keys []uint32, initialMetadataIndex int, remoteRequestContext func() context.Context, batchOptions ring.DoBatchOptions) error {
-	return ring.DoBatchWithOptions(ctx, ring.WriteNoExtend, tenantRing, keys,
+	err := ring.DoBatchWithOptions(ctx, ring.WriteNoExtend, tenantRing, keys,
 		func(partition ring.InstanceDesc, indexes []int) error {
 			req := req.ForIndexes(indexes, initialMetadataIndex)

@@ -1427,6 +1489,9 @@ func (d *Distributor) sendWriteRequestToPartitions(ctx context.Context, tenantID
 			return err
 		}, batchOptions,
 	)

+	// Since data may be written to different backends, it may be helpful to clearly identify which backend failed.
+	return errors.Wrap(err, "send data to partitions")
 }
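The shared cleanup used when teeing (decrement a counter, fire once at zero) can also be shown standalone. A minimal sketch using go.uber.org/atomic, which the diff itself uses; `newSharedCleanup` is a hypothetical name for illustration:

```go
package main

import (
	"fmt"

	"go.uber.org/atomic"
)

// newSharedCleanup returns a function that can be handed to n independent
// backends; the wrapped cleanup runs exactly once, after the last backend
// has finished. This mirrors the cleanupWaitBackends logic above.
func newSharedCleanup(n int64, cleanup func()) func() {
	remaining := atomic.NewInt64(n)
	return func() {
		if remaining.Dec() == 0 {
			cleanup()
		}
	}
}

func main() {
	done := newSharedCleanup(2, func() { fmt.Println("request buffers released") })
	done() // ingesters finished: counter 2 -> 1, nothing happens yet
	done() // partitions finished: counter 1 -> 0, cleanup fires once
}
```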

 // getSeriesAndMetadataTokens returns a slice of tokens for the series and metadata from the request in this specific order.
