improved performance going through all possible compaction groups
Signed-off-by: Alex Le <[email protected]>
alexqyle committed Sep 8, 2023
1 parent dc3d3af commit 4f7d9c0
Showing 6 changed files with 105 additions and 70 deletions.
2 changes: 1 addition & 1 deletion pkg/compactor/block_visit_marker.go
@@ -141,7 +141,7 @@ func markBlocksVisited(
}
reader.Reset(visitMarkerFileContent)
}
level.Debug(logger).Log("msg", "marked block visited", "partition_id", marker.PartitionID, "blocks", generateBlocksInfo(blocks))
level.Debug(logger).Log("msg", "marked blocks visited", "partition_id", marker.PartitionID, "blocks", generateBlocksInfo(blocks))
}

func markBlocksVisitedHeartBeat(
2 changes: 2 additions & 0 deletions pkg/compactor/compactor.go
@@ -2,6 +2,7 @@ package compactor

import (
"context"
crypto_rand "crypto/rand"
"flag"
"fmt"
"hash/fnv"
@@ -810,6 +811,7 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
defer c.syncerMetrics.gatherThanosSyncerMetrics(reg)

ulogger := util_log.WithUserID(userID, c.logger)
ulogger = util_log.WithExecutionID(ulid.MustNew(ulid.Now(), crypto_rand.Reader).String(), ulogger)

// Filters out duplicate blocks that can be formed from two or more overlapping
// blocks that fully submatches the source blocks of the older blocks.
7 changes: 6 additions & 1 deletion pkg/compactor/compactor_test.go
@@ -1634,10 +1634,15 @@ func removeIgnoredLogs(input []string) []string {
}

out := make([]string, 0, len(input))
durationRe := regexp.MustCompile(`\s?duration=\S+`)
executionIDRe := regexp.MustCompile(`\s?execution_id=\S+`)
durationRe := regexp.MustCompile(`\s?duration=\S+`)

for i := 0; i < len(input); i++ {
log := input[i]

// Remove any execution_id from logs.
log = executionIDRe.ReplaceAllString(log, "")

if strings.Contains(log, "block.MetaFetcher") || strings.Contains(log, "block.BaseFetcher") {
continue
}
155 changes: 88 additions & 67 deletions pkg/compactor/shuffle_sharding_grouper.go
@@ -210,6 +210,7 @@ func (g *ShuffleShardingGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (re
return iGroupKey < jGroupKey
})

var blockGroups []*blocksGroup
for _, group := range groups {
var blockIds []string
for _, block := range group.blocks {
@@ -234,79 +235,98 @@

partitionedGroupID := partitionedGroupInfo.PartitionedGroupID
partitionCount := partitionedGroupInfo.PartitionCount
partitionAdded := 0
for _, partition := range partitionedGroupInfo.Partitions {
partitionID := partition.PartitionID
partitionedGroup, err := createBlocksGroup(blocks, partition.Blocks, partitionedGroupInfo.RangeStart, partitionedGroupInfo.RangeEnd)
if err != nil {
level.Error(g.logger).Log("msg", "unable to create partitioned group", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "err", err)
continue
}
if isVisited, err := g.isGroupVisited(partitionedGroup.blocks, partitionID, g.ringLifecyclerID); err != nil {
level.Warn(g.logger).Log("msg", "unable to check if blocks in partition are visited", "group hash", groupHash, "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "err", err, "group", group.String())
continue
} else if isVisited {
level.Info(g.logger).Log("msg", "skipping group because at least one block in partition is visited", "group_hash", groupHash, "partitioned_group_id", partitionedGroupID, "partition_id", partitionID)
continue
}
partitionedGroup.groupHash = groupHash
partitionedGroup.partitionedGroupInfo = partitionedGroupInfo
partitionedGroup.partition = partition
blockGroups = append(blockGroups, partitionedGroup)
partitionAdded++

level.Debug(g.logger).Log("msg", "found available partition", "group_hash", groupHash, "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "partition_count", partitionCount)
remainingCompactions++
}
level.Info(g.logger).Log("msg", fmt.Sprintf("found available partitions: %d", partitionAdded))
}
level.Info(g.logger).Log("msg", fmt.Sprintf("total possible group for compaction: %d", len(blockGroups)))

for _, partitionedGroup := range blockGroups {
groupHash := partitionedGroup.groupHash
partitionedGroupID := partitionedGroup.partitionedGroupInfo.PartitionedGroupID
partitionCount := partitionedGroup.partitionedGroupInfo.PartitionCount
partitionID := partitionedGroup.partition.PartitionID
if isVisited, err := g.isGroupVisited(partitionedGroup.blocks, partitionID, g.ringLifecyclerID); err != nil {
level.Warn(g.logger).Log("msg", "unable to check if blocks in partition are visited", "group hash", groupHash, "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "err", err, "group", partitionedGroup.String())
continue
} else if isVisited {
level.Info(g.logger).Log("msg", "skipping group because at least one block in partition is visited", "group_hash", groupHash, "partitioned_group_id", partitionedGroupID, "partition_id", partitionID)
remainingCompactions--
continue
}
partitionedGroupKey := createGroupKeyWithPartitionID(groupHash, partitionID, *partitionedGroup)

level.Info(g.logger).Log("msg", "found compactable group for user", "group_hash", groupHash, "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "partition_count", partitionCount, "group", partitionedGroup.String())
blockVisitMarker := BlockVisitMarker{
VisitTime: time.Now().Unix(),
CompactorID: g.ringLifecyclerID,
Status: Pending,
PartitionedGroupID: partitionedGroupID,
PartitionID: partitionID,
Version: VisitMarkerVersion1,
}
markBlocksVisited(g.ctx, g.bkt, g.logger, partitionedGroup.blocks, blockVisitMarker, g.blockVisitMarkerWriteFailed)
level.Info(g.logger).Log("msg", "marked blocks visited in group", "group_hash", groupHash, "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "partition_count", partitionCount, "group", partitionedGroup.String())

resolution := partitionedGroup.blocks[0].Thanos.Downsample.Resolution
externalLabels := labels.FromMap(partitionedGroup.blocks[0].Thanos.Labels)
thanosGroup, err := compact.NewGroup(
log.With(g.logger, "groupKey", partitionedGroupKey, "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "partition_count", partitionCount, "rangeStart", partitionedGroup.rangeStartTime().String(), "rangeEnd", partitionedGroup.rangeEndTime().String(), "externalLabels", externalLabels, "downsampleResolution", resolution),
g.bkt,
partitionedGroupKey,
externalLabels,
resolution,
g.acceptMalformedIndex,
true, // Enable vertical compaction.
g.compactions.WithLabelValues(partitionedGroupKey),
g.compactionRunsStarted.WithLabelValues(partitionedGroupKey),
g.compactionRunsCompleted.WithLabelValues(partitionedGroupKey),
g.compactionFailures.WithLabelValues(partitionedGroupKey),
g.verticalCompactions.WithLabelValues(partitionedGroupKey),
g.garbageCollectedBlocks,
g.blocksMarkedForDeletion,
g.blocksMarkedForNoCompact,
g.hashFunc,
g.blockFilesConcurrency,
g.blocksFetchConcurrency,
)
if err != nil {
level.Error(g.logger).Log("msg", "failed to create partitioned group", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "partition_count", partitionCount, "blocks", partitionedGroup.partition.Blocks)
}

if len(outGroups) < g.compactionConcurrency {
partitionedGroupKey := createGroupKeyWithPartitionID(groupHash, partitionID, *partitionedGroup)

level.Info(g.logger).Log("msg", "found compactable group for user", "group_hash", groupHash, "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "partition_count", partitionCount, "group", partitionedGroup.String())
blockVisitMarker := BlockVisitMarker{
VisitTime: time.Now().Unix(),
CompactorID: g.ringLifecyclerID,
Status: Pending,
PartitionedGroupID: partitionedGroupID,
PartitionID: partitionID,
Version: VisitMarkerVersion1,
}
markBlocksVisited(g.ctx, g.bkt, g.logger, partitionedGroup.blocks, blockVisitMarker, g.blockVisitMarkerWriteFailed)

resolution := partitionedGroup.blocks[0].Thanos.Downsample.Resolution
externalLabels := labels.FromMap(partitionedGroup.blocks[0].Thanos.Labels)
thanosGroup, err := compact.NewGroup(
log.With(g.logger, "groupKey", partitionedGroupKey, "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "partition_count", partitionCount, "rangeStart", partitionedGroup.rangeStartTime().String(), "rangeEnd", partitionedGroup.rangeEndTime().String(), "externalLabels", externalLabels, "downsampleResolution", resolution),
g.bkt,
partitionedGroupKey,
externalLabels,
resolution,
g.acceptMalformedIndex,
true, // Enable vertical compaction.
g.compactions.WithLabelValues(partitionedGroupKey),
g.compactionRunsStarted.WithLabelValues(partitionedGroupKey),
g.compactionRunsCompleted.WithLabelValues(partitionedGroupKey),
g.compactionFailures.WithLabelValues(partitionedGroupKey),
g.verticalCompactions.WithLabelValues(partitionedGroupKey),
g.garbageCollectedBlocks,
g.blocksMarkedForDeletion,
g.blocksMarkedForNoCompact,
g.hashFunc,
g.blockFilesConcurrency,
g.blocksFetchConcurrency,
)
if err != nil {
level.Error(g.logger).Log("msg", "failed to create partitioned group", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "partition_count", partitionCount, "blocks", partition.Blocks)
}

for _, m := range partitionedGroup.blocks {
if err := thanosGroup.AppendMeta(m); err != nil {
level.Error(g.logger).Log("msg", "failed to add block to partitioned group", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "partition_count", partitionCount, "block", m.ULID)
}
}
thanosGroup.SetExtensions(&CortexMetaExtensions{
PartitionInfo: &PartitionInfo{
PartitionedGroupID: partitionedGroupID,
PartitionCount: partitionCount,
PartitionID: partitionID,
},
})

outGroups = append(outGroups, thanosGroup)
for _, m := range partitionedGroup.blocks {
if err := thanosGroup.AppendMeta(m); err != nil {
level.Error(g.logger).Log("msg", "failed to add block to partitioned group", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "partition_count", partitionCount, "block", m.ULID)
}
}
thanosGroup.SetExtensions(&CortexMetaExtensions{
PartitionInfo: &PartitionInfo{
PartitionedGroupID: partitionedGroupID,
PartitionCount: partitionCount,
PartitionID: partitionID,
},
})

outGroups = append(outGroups, thanosGroup)
level.Debug(g.logger).Log("msg", "added partition to compaction groups", "group_hash", groupHash, "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "partition_count", partitionCount)
if len(outGroups) >= g.compactionConcurrency {
break
}
}

level.Info(g.logger).Log("msg", fmt.Sprintf("total groups for compaction: %d", len(outGroups)))
Expand All @@ -321,10 +341,8 @@ func (g *ShuffleShardingGrouper) generatePartitionBlockGroup(group blocksGroup,
}
updatedPartitionedGroupInfo, err := UpdatePartitionedGroupInfo(g.ctx, g.bkt, g.logger, *partitionedGroupInfo, g.partitionedGroupInfoReadFailed, g.partitionedGroupInfoWriteFailed)
if err != nil {
level.Warn(g.logger).Log("msg", "unable to update partitioned group info", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "err", err)
return nil, err
}
level.Debug(g.logger).Log("msg", "generated partitioned groups", "groups", updatedPartitionedGroupInfo)
return updatedPartitionedGroupInfo, nil
}

@@ -539,10 +557,13 @@ func createBlocksGroup(blocks map[ulid.ULID]*metadata.Meta, blockIDs []ulid.ULID

// blocksGroup struct and functions copied and adjusted from https://github.com/cortexproject/cortex/pull/2616
type blocksGroup struct {
rangeStart int64 // Included.
rangeEnd int64 // Excluded.
blocks []*metadata.Meta
key string
rangeStart int64 // Included.
rangeEnd int64 // Excluded.
blocks []*metadata.Meta
key string
groupHash uint32
partitionedGroupInfo *PartitionedGroupInfo
partition Partition
}

// overlaps returns whether the group range overlaps with the input group.
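As a reading aid only (not part of the commit), the following minimal sketch shows the restructuring made in Groups() above: partitions are first collected cheaply into a flat candidate list, and the expensive per-group work (visit checks, marker writes, Thanos group construction) happens in a second loop that stops once the concurrency limit is reached. The `candidate`, `groupCandidates`, `visited`, and `build` names are hypothetical stand-ins for the real types and helpers in shuffle_sharding_grouper.go.

```go
package main

import "fmt"

// candidate stands in for *blocksGroup plus its partition metadata;
// the real fields live in shuffle_sharding_grouper.go (hypothetical type).
type candidate struct {
	partitionedGroupID uint32
	partitionID        int
}

// groupCandidates mirrors the shape of the new Groups() logic: a cheap pass
// that gathers every possible partition, then a bounded pass that does the
// expensive work only until compactionConcurrency groups have been built.
func groupCandidates(all []candidate, compactionConcurrency int, visited func(candidate) bool, build func(candidate) string) []string {
	var blockGroups []candidate
	blockGroups = append(blockGroups, all...) // phase 1: collect everything cheaply

	var outGroups []string
	for _, c := range blockGroups { // phase 2: expensive work, bounded
		if visited(c) {
			continue
		}
		outGroups = append(outGroups, build(c))
		if len(outGroups) >= compactionConcurrency {
			break
		}
	}
	return outGroups
}

func main() {
	cands := []candidate{{1, 0}, {1, 1}, {2, 0}, {2, 1}}
	out := groupCandidates(cands, 2,
		func(c candidate) bool { return c.partitionedGroupID == 1 && c.partitionID == 0 }, // pretend this partition is already visited
		func(c candidate) string { return fmt.Sprintf("group-%d-partition-%d", c.partitionedGroupID, c.partitionID) },
	)
	fmt.Println(out) // [group-1-partition-1 group-2-partition-0]
}
```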
3 changes: 2 additions & 1 deletion pkg/compactor/shuffle_sharding_planner.go
@@ -95,7 +95,8 @@ func (p *ShuffleShardingPlanner) PlanWithPartition(_ context.Context, metasByMin
return nil, fmt.Errorf("block %s with partition ID %d is in completed status", blockID, partitionID)
}
if !blockVisitMarker.isVisitedByCompactor(p.blockVisitMarkerTimeout, partitionID, p.ringLifecyclerID) {
level.Warn(p.logger).Log("msg", "block is not visited by current compactor", "block_id", blockID, "partition_id", partitionID, "compactor_id", p.ringLifecyclerID)
level.Warn(p.logger).Log("msg", "block is not visited by current compactor", "block_id", blockID, "partition_id", partitionID, "compactor_id", p.ringLifecyclerID,
"marker_partitioned_group_id", blockVisitMarker.PartitionedGroupID, "marker_partition_id", blockVisitMarker.PartitionID, "marker_compactor_id", blockVisitMarker.CompactorID, "marker_visit_time", blockVisitMarker.VisitTime)
return nil, nil
}

6 changes: 6 additions & 0 deletions pkg/util/log/wrappers.go
@@ -17,6 +17,12 @@ func WithUserID(userID string, l kitlog.Logger) kitlog.Logger {
return kitlog.With(l, "org_id", userID)
}

// WithExecutionID returns a Logger that has information about the execution id in
// its details.
func WithExecutionID(executionID string, l kitlog.Logger) kitlog.Logger {
return kitlog.With(l, "execution_id", executionID)
}

// WithTraceID returns a Logger that has information about the traceID in
// its details.
func WithTraceID(traceID string, l kitlog.Logger) kitlog.Logger {

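For illustration only, a minimal sketch of how the new execution-ID wrapper is meant to be used, mirroring the WithUserID/WithExecutionID helpers and the compactUser change in this commit. The local helper names, user ID, and log message are assumptions, not code from the commit; the ULID construction follows the compactor.go diff above.

```go
package main

import (
	crypto_rand "crypto/rand"
	"os"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/oklog/ulid"
)

// withUserID and withExecutionID mirror the helpers in pkg/util/log/wrappers.go.
func withUserID(userID string, l log.Logger) log.Logger {
	return log.With(l, "org_id", userID)
}

func withExecutionID(executionID string, l log.Logger) log.Logger {
	return log.With(l, "execution_id", executionID)
}

func main() {
	logger := log.NewLogfmtLogger(os.Stdout)

	// Tag every log line of one compaction pass with the tenant and a unique
	// execution ID, as Compactor.compactUser does in this commit.
	ulogger := withUserID("user-1", logger)
	ulogger = withExecutionID(ulid.MustNew(ulid.Now(), crypto_rand.Reader).String(), ulogger)

	level.Info(ulogger).Log("msg", "start compaction pass")
	// Example output: level=info org_id=user-1 execution_id=01H... msg="start compaction pass"
}
```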