diff --git a/pkg/compactor/block_visit_marker.go b/pkg/compactor/block_visit_marker.go index 7ddcd158c8..74034936c3 100644 --- a/pkg/compactor/block_visit_marker.go +++ b/pkg/compactor/block_visit_marker.go @@ -141,7 +141,7 @@ func markBlocksVisited( } reader.Reset(visitMarkerFileContent) } - level.Debug(logger).Log("msg", "marked block visited", "partition_id", marker.PartitionID, "blocks", generateBlocksInfo(blocks)) + level.Debug(logger).Log("msg", "marked blocks visited", "partition_id", marker.PartitionID, "blocks", generateBlocksInfo(blocks)) } func markBlocksVisitedHeartBeat( diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 77d4d13f51..e66ad5ea0c 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -2,6 +2,7 @@ package compactor import ( "context" + crypto_rand "crypto/rand" "flag" "fmt" "hash/fnv" @@ -810,6 +811,7 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error { defer c.syncerMetrics.gatherThanosSyncerMetrics(reg) ulogger := util_log.WithUserID(userID, c.logger) + ulogger = util_log.WithExecutionID(ulid.MustNew(ulid.Now(), crypto_rand.Reader).String(), ulogger) // Filters out duplicate blocks that can be formed from two or more overlapping // blocks that fully submatches the source blocks of the older blocks. diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index 691373ca68..81a4edb070 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -1634,10 +1634,15 @@ func removeIgnoredLogs(input []string) []string { } out := make([]string, 0, len(input)) - durationRe := regexp.MustCompile(`\s?duration=\S+`) + executionIDRe := regexp.MustCompile(`\s?execution_id=\S+`) + durationRe := regexp.MustCompile(`\s?duration=\S+`) for i := 0; i < len(input); i++ { log := input[i] + + // Remove any execution_id from logs. 
+ log = executionIDRe.ReplaceAllString(log, "") + if strings.Contains(log, "block.MetaFetcher") || strings.Contains(log, "block.BaseFetcher") { continue } diff --git a/pkg/compactor/shuffle_sharding_grouper.go b/pkg/compactor/shuffle_sharding_grouper.go index 2acabe2540..44045dcab2 100644 --- a/pkg/compactor/shuffle_sharding_grouper.go +++ b/pkg/compactor/shuffle_sharding_grouper.go @@ -210,6 +210,7 @@ func (g *ShuffleShardingGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (re return iGroupKey < jGroupKey }) + var blockGroups []*blocksGroup for _, group := range groups { var blockIds []string for _, block := range group.blocks { @@ -234,6 +235,7 @@ func (g *ShuffleShardingGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (re partitionedGroupID := partitionedGroupInfo.PartitionedGroupID partitionCount := partitionedGroupInfo.PartitionCount + partitionAdded := 0 for _, partition := range partitionedGroupInfo.Partitions { partitionID := partition.PartitionID partitionedGroup, err := createBlocksGroup(blocks, partition.Blocks, partitionedGroupInfo.RangeStart, partitionedGroupInfo.RangeEnd) @@ -241,72 +243,90 @@ func (g *ShuffleShardingGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (re level.Error(g.logger).Log("msg", "unable to create partitioned group", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "err", err) continue } - if isVisited, err := g.isGroupVisited(partitionedGroup.blocks, partitionID, g.ringLifecyclerID); err != nil { - level.Warn(g.logger).Log("msg", "unable to check if blocks in partition are visited", "group hash", groupHash, "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "err", err, "group", group.String()) - continue - } else if isVisited { - level.Info(g.logger).Log("msg", "skipping group because at least one block in partition is visited", "group_hash", groupHash, "partitioned_group_id", partitionedGroupID, "partition_id", partitionID) - continue - } + partitionedGroup.groupHash = 
groupHash + partitionedGroup.partitionedGroupInfo = partitionedGroupInfo + partitionedGroup.partition = partition + blockGroups = append(blockGroups, partitionedGroup) + partitionAdded++ + level.Debug(g.logger).Log("msg", "found available partition", "group_hash", groupHash, "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "partition_count", partitionCount) remainingCompactions++ + } + level.Info(g.logger).Log("msg", fmt.Sprintf("found available partitions: %d", partitionAdded)) + } + level.Info(g.logger).Log("msg", fmt.Sprintf("total possible group for compaction: %d", len(blockGroups))) + + for _, partitionedGroup := range blockGroups { + groupHash := partitionedGroup.groupHash + partitionedGroupID := partitionedGroup.partitionedGroupInfo.PartitionedGroupID + partitionCount := partitionedGroup.partitionedGroupInfo.PartitionCount + partitionID := partitionedGroup.partition.PartitionID + if isVisited, err := g.isGroupVisited(partitionedGroup.blocks, partitionID, g.ringLifecyclerID); err != nil { + level.Warn(g.logger).Log("msg", "unable to check if blocks in partition are visited", "group_hash", groupHash, "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "err", err, "group", partitionedGroup.String()) + continue + } else if isVisited { + level.Info(g.logger).Log("msg", "skipping group because at least one block in partition is visited", "group_hash", groupHash, "partitioned_group_id", partitionedGroupID, "partition_id", partitionID) + remainingCompactions-- + continue + } + partitionedGroupKey := createGroupKeyWithPartitionID(groupHash, partitionID, *partitionedGroup) + + level.Info(g.logger).Log("msg", "found compactable group for user", "group_hash", groupHash, "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "partition_count", partitionCount, "group", partitionedGroup.String()) + blockVisitMarker := BlockVisitMarker{ + VisitTime: time.Now().Unix(), + CompactorID: g.ringLifecyclerID, + 
Status: Pending, + PartitionedGroupID: partitionedGroupID, + PartitionID: partitionID, + Version: VisitMarkerVersion1, + } + markBlocksVisited(g.ctx, g.bkt, g.logger, partitionedGroup.blocks, blockVisitMarker, g.blockVisitMarkerWriteFailed) + level.Info(g.logger).Log("msg", "marked blocks visited in group", "group_hash", groupHash, "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "partition_count", partitionCount, "group", partitionedGroup.String()) + + resolution := partitionedGroup.blocks[0].Thanos.Downsample.Resolution + externalLabels := labels.FromMap(partitionedGroup.blocks[0].Thanos.Labels) + thanosGroup, err := compact.NewGroup( + log.With(g.logger, "groupKey", partitionedGroupKey, "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "partition_count", partitionCount, "rangeStart", partitionedGroup.rangeStartTime().String(), "rangeEnd", partitionedGroup.rangeEndTime().String(), "externalLabels", externalLabels, "downsampleResolution", resolution), + g.bkt, + partitionedGroupKey, + externalLabels, + resolution, + g.acceptMalformedIndex, + true, // Enable vertical compaction. 
+ g.compactions.WithLabelValues(partitionedGroupKey), + g.compactionRunsStarted.WithLabelValues(partitionedGroupKey), + g.compactionRunsCompleted.WithLabelValues(partitionedGroupKey), + g.compactionFailures.WithLabelValues(partitionedGroupKey), + g.verticalCompactions.WithLabelValues(partitionedGroupKey), + g.garbageCollectedBlocks, + g.blocksMarkedForDeletion, + g.blocksMarkedForNoCompact, + g.hashFunc, + g.blockFilesConcurrency, + g.blocksFetchConcurrency, + ) + if err != nil { + level.Error(g.logger).Log("msg", "failed to create partitioned group", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "partition_count", partitionCount, "blocks", partitionedGroup.partition.Blocks) + } - if len(outGroups) < g.compactionConcurrency { - partitionedGroupKey := createGroupKeyWithPartitionID(groupHash, partitionID, *partitionedGroup) - - level.Info(g.logger).Log("msg", "found compactable group for user", "group_hash", groupHash, "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "partition_count", partitionCount, "group", partitionedGroup.String()) - blockVisitMarker := BlockVisitMarker{ - VisitTime: time.Now().Unix(), - CompactorID: g.ringLifecyclerID, - Status: Pending, - PartitionedGroupID: partitionedGroupID, - PartitionID: partitionID, - Version: VisitMarkerVersion1, - } - markBlocksVisited(g.ctx, g.bkt, g.logger, partitionedGroup.blocks, blockVisitMarker, g.blockVisitMarkerWriteFailed) - - resolution := partitionedGroup.blocks[0].Thanos.Downsample.Resolution - externalLabels := labels.FromMap(partitionedGroup.blocks[0].Thanos.Labels) - thanosGroup, err := compact.NewGroup( - log.With(g.logger, "groupKey", partitionedGroupKey, "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "partition_count", partitionCount, "rangeStart", partitionedGroup.rangeStartTime().String(), "rangeEnd", partitionedGroup.rangeEndTime().String(), "externalLabels", externalLabels, "downsampleResolution", resolution), - g.bkt, - 
partitionedGroupKey, - externalLabels, - resolution, - g.acceptMalformedIndex, - true, // Enable vertical compaction. - g.compactions.WithLabelValues(partitionedGroupKey), - g.compactionRunsStarted.WithLabelValues(partitionedGroupKey), - g.compactionRunsCompleted.WithLabelValues(partitionedGroupKey), - g.compactionFailures.WithLabelValues(partitionedGroupKey), - g.verticalCompactions.WithLabelValues(partitionedGroupKey), - g.garbageCollectedBlocks, - g.blocksMarkedForDeletion, - g.blocksMarkedForNoCompact, - g.hashFunc, - g.blockFilesConcurrency, - g.blocksFetchConcurrency, - ) - if err != nil { - level.Error(g.logger).Log("msg", "failed to create partitioned group", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "partition_count", partitionCount, "blocks", partition.Blocks) - } - - for _, m := range partitionedGroup.blocks { - if err := thanosGroup.AppendMeta(m); err != nil { - level.Error(g.logger).Log("msg", "failed to add block to partitioned group", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "partition_count", partitionCount, "block", m.ULID) - } - } - thanosGroup.SetExtensions(&CortexMetaExtensions{ - PartitionInfo: &PartitionInfo{ - PartitionedGroupID: partitionedGroupID, - PartitionCount: partitionCount, - PartitionID: partitionID, - }, - }) - - outGroups = append(outGroups, thanosGroup) + for _, m := range partitionedGroup.blocks { + if err := thanosGroup.AppendMeta(m); err != nil { + level.Error(g.logger).Log("msg", "failed to add block to partitioned group", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "partition_count", partitionCount, "block", m.ULID) } } + thanosGroup.SetExtensions(&CortexMetaExtensions{ + PartitionInfo: &PartitionInfo{ + PartitionedGroupID: partitionedGroupID, + PartitionCount: partitionCount, + PartitionID: partitionID, + }, + }) + + outGroups = append(outGroups, thanosGroup) + level.Debug(g.logger).Log("msg", "added partition to compaction groups", 
"group_hash", groupHash, "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "partition_count", partitionCount) + if len(outGroups) >= g.compactionConcurrency { + break + } } level.Info(g.logger).Log("msg", fmt.Sprintf("total groups for compaction: %d", len(outGroups))) @@ -321,10 +341,8 @@ func (g *ShuffleShardingGrouper) generatePartitionBlockGroup(group blocksGroup, } updatedPartitionedGroupInfo, err := UpdatePartitionedGroupInfo(g.ctx, g.bkt, g.logger, *partitionedGroupInfo, g.partitionedGroupInfoReadFailed, g.partitionedGroupInfoWriteFailed) if err != nil { - level.Warn(g.logger).Log("msg", "unable to update partitioned group info", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "err", err) return nil, err } - level.Debug(g.logger).Log("msg", "generated partitioned groups", "groups", updatedPartitionedGroupInfo) return updatedPartitionedGroupInfo, nil } @@ -539,10 +557,13 @@ func createBlocksGroup(blocks map[ulid.ULID]*metadata.Meta, blockIDs []ulid.ULID // blocksGroup struct and functions copied and adjusted from https://github.com/cortexproject/cortex/pull/2616 type blocksGroup struct { - rangeStart int64 // Included. - rangeEnd int64 // Excluded. - blocks []*metadata.Meta - key string + rangeStart int64 // Included. + rangeEnd int64 // Excluded. + blocks []*metadata.Meta + key string + groupHash uint32 + partitionedGroupInfo *PartitionedGroupInfo + partition Partition } // overlaps returns whether the group range overlaps with the input group. 
diff --git a/pkg/compactor/shuffle_sharding_planner.go b/pkg/compactor/shuffle_sharding_planner.go index 9942d43f1a..ca409167bf 100644 --- a/pkg/compactor/shuffle_sharding_planner.go +++ b/pkg/compactor/shuffle_sharding_planner.go @@ -95,7 +95,8 @@ func (p *ShuffleShardingPlanner) PlanWithPartition(_ context.Context, metasByMin return nil, fmt.Errorf("block %s with partition ID %d is in completed status", blockID, partitionID) } if !blockVisitMarker.isVisitedByCompactor(p.blockVisitMarkerTimeout, partitionID, p.ringLifecyclerID) { - level.Warn(p.logger).Log("msg", "block is not visited by current compactor", "block_id", blockID, "partition_id", partitionID, "compactor_id", p.ringLifecyclerID) + level.Warn(p.logger).Log("msg", "block is not visited by current compactor", "block_id", blockID, "partition_id", partitionID, "compactor_id", p.ringLifecyclerID, + "marker_partitioned_group_id", blockVisitMarker.PartitionedGroupID, "marker_partition_id", blockVisitMarker.PartitionID, "marker_compactor_id", blockVisitMarker.CompactorID, "marker_visit_time", blockVisitMarker.VisitTime) return nil, nil } diff --git a/pkg/util/log/wrappers.go b/pkg/util/log/wrappers.go index 50567f6144..f30103bec3 100644 --- a/pkg/util/log/wrappers.go +++ b/pkg/util/log/wrappers.go @@ -17,6 +17,12 @@ func WithUserID(userID string, l kitlog.Logger) kitlog.Logger { return kitlog.With(l, "org_id", userID) } +// WithExecutionID returns a Logger that has information about the execution id in +// its details. +func WithExecutionID(executionID string, l kitlog.Logger) kitlog.Logger { + return kitlog.With(l, "execution_id", executionID) +} + // WithTraceID returns a Logger that has information about the traceID in // its details. func WithTraceID(traceID string, l kitlog.Logger) kitlog.Logger {