diff --git a/internal/datanode/compaction/clustering_compactor.go b/internal/datanode/compaction/clustering_compactor.go
index cd2620692aa2b..147c647f340db 100644
--- a/internal/datanode/compaction/clustering_compactor.go
+++ b/internal/datanode/compaction/clustering_compactor.go
@@ -56,7 +56,6 @@ import (
 	"github.com/milvus-io/milvus/pkg/util/metautil"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/timerecord"
-	"github.com/milvus-io/milvus/pkg/util/tsoutil"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
@@ -93,7 +92,7 @@ type clusteringCompactionTask struct {
 	// inner field
 	collectionID int64
 	partitionID  int64
-	currentTs    typeutil.Timestamp // for TTL
+	currentTime  time.Time // for TTL
 	isVectorClusteringKey bool
 	clusteringKeyField    *schemapb.FieldSchema
 	primaryKeyField       *schemapb.FieldSchema
@@ -223,7 +222,7 @@ func (t *clusteringCompactionTask) init() error {
 	t.primaryKeyField = pkField
 	t.isVectorClusteringKey = typeutil.IsVectorType(t.clusteringKeyField.DataType)
-	t.currentTs = tsoutil.GetCurrentTime()
+	t.currentTime = time.Now()
 	t.memoryBufferSize = t.getMemoryBufferSize()
 	workerPoolSize := t.getWorkerPoolSize()
 	t.mappingPool = conc.NewPool[any](workerPoolSize)
@@ -563,11 +562,7 @@ func (t *clusteringCompactionTask) mappingSegment(
 	log.Info("mapping segment start")
 	processStart := time.Now()
 	fieldBinlogPaths := make([][]string, 0)
-	var (
-		expired  int64 = 0
-		deleted  int64 = 0
-		remained int64 = 0
-	)
+	var remained int64 = 0
 
 	deltaPaths := make([]string, 0)
 	for _, d := range segment.GetDeltalogs() {
@@ -579,17 +574,7 @@ func (t *clusteringCompactionTask) mappingSegment(
 	if err != nil {
 		return err
 	}
-
-	isDeletedValue := func(v *storage.Value) bool {
-		ts, ok := delta[v.PK.GetValue()]
-		// insert task and delete task has the same ts when upsert
-		// here should be < instead of <=
-		// to avoid the upsert data to be deleted after compact
-		if ok && uint64(v.Timestamp) < ts {
-			return true
-		}
-		return false
-	}
+	entityFilter := newEntityFilter(delta, t.plan.GetCollectionTtl(), t.currentTime)
 
 	mappingStats := &clusteringpb.ClusteringCentroidIdMappingStats{}
 	if t.isVectorClusteringKey {
@@ -656,15 +641,7 @@ func (t *clusteringCompactionTask) mappingSegment(
 			v := pkIter.Value()
 			offset++
 
-			// Filtering deleted entity
-			if isDeletedValue(v) {
-				deleted++
-				continue
-			}
-			// Filtering expired entity
-			ts := typeutil.Timestamp(v.Timestamp)
-			if isExpiredEntity(t.plan.GetCollectionTtl(), t.currentTs, ts) {
-				expired++
+			if entityFilter.Filtered(v.PK.GetValue(), uint64(v.Timestamp)) {
 				continue
 			}
 
@@ -753,13 +730,19 @@ func (t *clusteringCompactionTask) mappingSegment(
 			}
 		}
 	}
+	missing := entityFilter.GetMissingDeleteCount()
 	log.Info("mapping segment end",
 		zap.Int64("remained_entities", remained),
-		zap.Int64("deleted_entities", deleted),
-		zap.Int64("expired_entities", expired),
+		zap.Int("deleted_entities", entityFilter.GetDeletedCount()),
+		zap.Int("expired_entities", entityFilter.GetExpiredCount()),
+		zap.Int("deltalog deletes", entityFilter.GetDeltalogDeleteCount()),
+		zap.Int("missing deletes", missing),
 		zap.Int64("written_row_num", t.writtenRowNum.Load()),
 		zap.Duration("elapse", time.Since(processStart)))
+
+	metrics.DataNodeCompactionDeleteCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Add(float64(entityFilter.GetDeltalogDeleteCount()))
+	metrics.DataNodeCompactionMissingDeleteCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Add(float64(missing))
 	return nil
 }
 
@@ -1175,8 +1158,6 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
 	var (
 		timestampTo   int64                 = -1
 		timestampFrom int64                 = -1
-		expired       int64                 = 0
-		deleted       int64                 = 0
 		remained      int64                 = 0
 		analyzeResult map[interface{}]int64 = make(map[interface{}]int64, 0)
 	)
@@ -1203,6 +1184,7 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
 		fieldBinlogPaths = append(fieldBinlogPaths, ps)
 	}
 
+	expiredFilter := newEntityFilter(nil, t.plan.GetCollectionTtl(), t.currentTime)
 	for _, paths := range fieldBinlogPaths {
 		allValues, err := t.binlogIO.Download(ctx, paths)
 		if err != nil {
@@ -1233,9 +1215,7 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
 			v := pkIter.Value()
 
 			// Filtering expired entity
-			ts := typeutil.Timestamp(v.Timestamp)
-			if isExpiredEntity(t.plan.GetCollectionTtl(), t.currentTs, ts) {
-				expired++
+			if expiredFilter.Filtered(v.PK.GetValue(), uint64(v.Timestamp)) {
 				continue
 			}
 
@@ -1264,8 +1244,7 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
 
 	log.Info("analyze segment end",
 		zap.Int64("remained entities", remained),
-		zap.Int64("deleted entities", deleted),
-		zap.Int64("expired entities", expired),
+		zap.Int("expired entities", expiredFilter.GetExpiredCount()),
 		zap.Duration("map elapse", time.Since(processStart)))
 	return analyzeResult, nil
 }
diff --git a/internal/datanode/compaction/compactor_common.go b/internal/datanode/compaction/compactor_common.go
index 8fd26ebf6815a..d577511c05664 100644
--- a/internal/datanode/compaction/compactor_common.go
+++ b/internal/datanode/compaction/compactor_common.go
@@ -19,6 +19,7 @@ package compaction
 import (
 	"context"
 	sio "io"
+	"math"
 	"strconv"
 	"time"
 
@@ -33,29 +34,123 @@ import (
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/util/tsoutil"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
+	"github.com/samber/lo"
 )
 
 const compactionBatchSize = 100
 
-func isExpiredEntity(ttl int64, now, ts typeutil.Timestamp) bool {
+type EntityFilter struct {
+	deletedPkTs map[interface{}]*tsHit // pk2ts
+	ttl         int64
+	currentTime time.Time
+
+	expiredCount int
+	deletedCount int
+}
+
+func newEntityFilter(deletedPkTs map[interface{}]*tsHit, ttl int64, currTime time.Time) *EntityFilter {
+	if deletedPkTs == nil {
+		deletedPkTs = make(map[interface{}]*tsHit)
+	}
+	return &EntityFilter{
+		deletedPkTs: deletedPkTs,
+		ttl:         ttl,
+		currentTime: currTime,
+	}
+}
+
+func (filter *EntityFilter) Filtered(pk any, ts typeutil.Timestamp) bool {
+	if filter.isEntityDeleted(pk, ts) {
+		filter.deletedCount++
+		return true
+	}
+
+	// Filtering expired entity
+	if filter.isEntityExpired(ts) {
+		filter.expiredCount++
+		return true
+	}
+	return false
+}
+
+func (filter *EntityFilter) GetExpiredCount() int {
+	return filter.expiredCount
+}
+
+func (filter *EntityFilter) GetDeletedCount() int {
+	return filter.deletedCount
+}
+
+func (filter *EntityFilter) GetDeltalogDeleteCount() int {
+	return len(filter.deletedPkTs)
+}
+
+func (filter *EntityFilter) GetMissingDeleteCount() int {
+	return len(lo.PickBy(filter.deletedPkTs, func(_ any, v *tsHit) bool {
+		return !v.hit
+	}))
+}
+
+func (filter *EntityFilter) isEntityDeleted(pk interface{}, ts typeutil.Timestamp) bool {
+	if pkTsHit, ok := filter.deletedPkTs[pk]; ok {
+		pkTsHit.hit = true
+
+		// insert task and delete task has the same ts when upsert
+		// here should be < instead of <=
+		// to avoid the upsert data to be deleted after compact
+		if ts < pkTsHit.ts {
+			return true
+		}
+	}
+	return false
+}
+
+// Largest TTL is math.MaxInt64
+func (filter *EntityFilter) isEntityExpired(entityTs typeutil.Timestamp) bool {
 	// entity expire is not enabled if duration <= 0
-	if ttl <= 0 {
+	if filter.ttl <= 0 {
+		return false
+	}
+
+	entityTime, _ := tsoutil.ParseTS(entityTs)
+
+	// Unlikely to happen, but avoid following negative duration
+	if entityTime.After(filter.currentTime) {
+		return false
+	}
+
+	gotDur := filter.currentTime.Sub(entityTime)
+	if gotDur == time.Duration(math.MaxInt64) {
+		// Avoid using time.Duration if duration between
+		// entityTime and currentTime is larger than max dur(~290years)
+		if filter.ttl == math.MaxInt64 {
+			return true
+		}
 		return false
 	}
-	pts, _ := tsoutil.ParseTS(ts)
-	pnow, _ := tsoutil.ParseTS(now)
-	expireTime := pts.Add(time.Duration(ttl))
-	return expireTime.Before(pnow)
+	return int64(gotDur.Seconds()) >= filter.ttl
+}
+
+type tsHit struct {
+	ts  typeutil.Timestamp
+	hit bool
 }
 
-func mergeDeltalogs(ctx context.Context, io io.BinlogIO, paths []string) (map[interface{}]typeutil.Timestamp, error) {
-	pk2ts := make(map[interface{}]typeutil.Timestamp)
+// If pk already exists in pk2ts, record the later one.
+func (h *tsHit) update(ts typeutil.Timestamp) {
+	if ts > h.ts {
+		h.ts = ts
+	}
+}
+
+func mergeDeltalogs(ctx context.Context, io io.BinlogIO, paths []string) (map[interface{}]*tsHit, error) {
+	pk2TsHit := make(map[interface{}]*tsHit)
 	log := log.Ctx(ctx)
 	if len(paths) == 0 {
 		log.Debug("compact with no deltalogs, skip merge deltalogs")
-		return pk2ts, nil
+		return pk2TsHit, nil
 	}
 
 	blobs := make([]*storage.Blob, 0)
@@ -88,17 +183,17 @@ func mergeDeltalogs(ctx context.Context, io io.BinlogIO, paths []string) (map[interface{}]typeutil.Timestamp, error) {
 		}
 
 		dl := reader.Value()
-		// If pk already exists in pk2ts, record the later one.
-		if ts, ok := pk2ts[dl.Pk.GetValue()]; ok && ts > dl.Ts {
-			continue
+		if _, ok := pk2TsHit[dl.Pk.GetValue()]; !ok {
+			pk2TsHit[dl.Pk.GetValue()] = &tsHit{ts: dl.Ts}
 		}
-		pk2ts[dl.Pk.GetValue()] = dl.Ts
+		pk2TsHit[dl.Pk.GetValue()].update(dl.Ts)
+
 	}
 
 	log.Info("compact mergeDeltalogs end",
-		zap.Int("deleted pk counts", len(pk2ts)))
+		zap.Int("deleted pk counts", len(pk2TsHit)))
 
-	return pk2ts, nil
+	return pk2TsHit, nil
 }
 
 func composePaths(segments []*datapb.CompactionSegmentBinlogs) (
diff --git a/internal/datanode/compaction/compactor_common_test.go b/internal/datanode/compaction/compactor_common_test.go
new file mode 100644
index 0000000000000..6a1016ae07e11
--- /dev/null
+++ b/internal/datanode/compaction/compactor_common_test.go
@@ -0,0 +1,84 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package compaction
+
+import (
+	"math"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/suite"
+
+	"github.com/milvus-io/milvus/pkg/util/tsoutil"
+)
+
+func TestCompactorCommonTaskSuite(t *testing.T) {
+	suite.Run(t, new(CompactorCommonSuite))
+}
+
+type CompactorCommonSuite struct {
+	suite.Suite
+}
+
+func (s *CompactorCommonSuite) TestEntityFilterByTTL() {
+	milvusBirthday := getMilvusBirthday()
+
+	tests := []struct {
+		description string
+		collTTL     int64
+		nowTime     time.Time
+		entityTime  time.Time
+
+		expect bool
+	}{
+		// ttl == maxInt64, no entities should expire
+		{"ttl=maxInt64, now>entity", math.MaxInt64, milvusBirthday, milvusBirthday.Add(-time.Hour), false},
+		{"ttl=maxInt64, now==entity", math.MaxInt64, milvusBirthday, milvusBirthday, false},
+		// ttl == 0, no entities should expire
+		{"ttl=0, now==entity", 0, milvusBirthday, milvusBirthday, false},
+		{"ttl=0, now>entity", 0, milvusBirthday, milvusBirthday.Add(-time.Hour), false},
+		{"ttl=0, now<entity", 0, milvusBirthday, milvusBirthday.Add(time.Hour), false},
+		// ttl == 10days
+		{"ttl=10days, nowTs-entityTs>10days", 864000, milvusBirthday.AddDate(0, 0, 11), milvusBirthday, true},
+		{"ttl=10days, nowTs-entityTs==10days", 864000, milvusBirthday.AddDate(0, 0, 10), milvusBirthday, true},
+		{"ttl=10days, nowTs-entityTs<10days", 864000, milvusBirthday.AddDate(0, 0, 9), milvusBirthday, false},
+		// ttl is maxInt64
+		{"ttl=maxInt64, nowTs-entityTs>1000years", math.MaxInt64, milvusBirthday.AddDate(1000, 0, 11), milvusBirthday, true},
+		{"ttl=maxInt64, nowTs-entityTs==1000years", math.MaxInt64, milvusBirthday.AddDate(1000, 0, 0), milvusBirthday, true},
+	}
+	for _, test := range tests {
+		s.Run(test.description, func() {
+			filter := newEntityFilter(nil, test.collTTL, test.nowTime)
+
+			entityTs := tsoutil.ComposeTSByTime(test.entityTime, 0)
+			got := filter.Filtered("mockPK", entityTs)
+			s.Equal(test.expect, got)
+		})
+	}
+}
diff --git a/internal/datanode/compaction/mix_compaction_test.go b/internal/datanode/compaction/mix_compaction_test.go
--- a/internal/datanode/compaction/mix_compaction_test.go
+++ b/internal/datanode/compaction/mix_compaction_test.go
-		{"ttl=maxInt64, nowTs-entityTs<ttl v2", math.MaxInt64, math.MaxInt64, milvusBirthdayTs, true},
-		// entityTs==currTs will never happen
-		// {"ttl=maxInt64, curTs-entityTs=0", math.MaxInt64, milvusBirthdayTs, milvusBirthdayTs, true},
-		{"ttl=0, nowTs>entityTs", 0, milvusBirthdayTs + 1, milvusBirthdayTs, false},
-		{"ttl=0, nowTs==entityTs", 0, milvusBirthdayTs, milvusBirthdayTs, false},
-		{"ttl=0, nowTs<entityTs", 0, milvusBirthdayTs, milvusBirthdayTs + 1, false},
-		{"ttl=10days, nowTs-entityTs>10days", 864000, milvusBirthdayTs + 864001, milvusBirthdayTs, true},
-		{"ttl=10days, nowTs-entityTs==10days", 864000, milvusBirthdayTs + 864000, milvusBirthdayTs, true},
-		{"ttl=10days, nowTs-entityTs<10days", 864000, milvusBirthdayTs + 10, milvusBirthdayTs, false},
-	}
-	for _, test := range tests {
-		s.Run(test.description, func() {
-			t := &mixCompactionTask{
-				plan: &datapb.CompactionPlan{
-					CollectionTtl: test.collTTL,
-				},
-				currentTs: test.nowTs,
-			}
-			got := isExpiredEntity(t.plan.GetCollectionTtl(), t.currentTs, test.entityTs)
-			s.Equal(test.expect, got)
-		})
-	}
-}
-
 func getRow(magic int64) map[int64]interface{} {
 	ts := tsoutil.ComposeTSByTime(getMilvusBirthday(), 0)
 	return map[int64]interface{}{
diff --git a/pkg/metrics/datanode_metrics.go b/pkg/metrics/datanode_metrics.go
index ff982a4bbf50a..761c617e983ac 100644
--- a/pkg/metrics/datanode_metrics.go
+++ b/pkg/metrics/datanode_metrics.go
@@ -237,6 +237,22 @@ var (
 			nodeIDLabelName,
 			channelNameLabelName,
 		})
+
+	DataNodeCompactionDeleteCount = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.DataNodeRole,
+			Name:      "compaction_delete_count",
+			Help:      "Number of delete entries in compaction",
+		}, []string{nodeIDLabelName})
+
+	DataNodeCompactionMissingDeleteCount = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.DataNodeRole,
+			Name:      "compaction_missing_delete_count",
+			Help:      "Number of missing deletes in compaction",
+		}, []string{nodeIDLabelName})
 )
 
 // RegisterDataNode registers DataNode metrics
@@ -261,6 +277,8 @@ func RegisterDataNode(registry *prometheus.Registry) {
 	// compaction related
 	registry.MustRegister(DataNodeCompactionLatency)
 	registry.MustRegister(DataNodeCompactionLatencyInQueue)
+	registry.MustRegister(DataNodeCompactionDeleteCount)
+	registry.MustRegister(DataNodeCompactionMissingDeleteCount)
 	// deprecated metrics
 	registry.MustRegister(DataNodeForwardDeleteMsgTimeTaken)
 	registry.MustRegister(DataNodeNumProducers)
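
Note (editor's addendum, not part of the patch): the new EntityFilter folds the old isDeletedValue closure and isExpiredEntity helper into one object. Deletes come from the merged deltalogs (pk -> latest delete ts plus a hit flag, so deltalog entries that never matched a live row can be reported as "missing deletes"), and expiration now compares the wall-clock gap between an entity's timestamp and the compaction's start time against the collection TTL in seconds. Below is a minimal standalone sketch of that TTL check using only the standard library; the function name expired and the sample times are illustrative assumptions, not the package API.

// expired mirrors the new logic: TTL <= 0 disables expiration, a future
// entity timestamp never expires, a gap that overflows time.Duration
// (~290 years) only expires when TTL is math.MaxInt64, and otherwise the
// gap in seconds is compared against the TTL in seconds.
package main

import (
    "fmt"
    "math"
    "time"
)

func expired(ttlSeconds int64, now, entityTime time.Time) bool {
    if ttlSeconds <= 0 {
        return false
    }
    if entityTime.After(now) {
        return false
    }
    gap := now.Sub(entityTime)
    if gap == time.Duration(math.MaxInt64) {
        return ttlSeconds == math.MaxInt64
    }
    return int64(gap.Seconds()) >= ttlSeconds
}

func main() {
    base := time.Date(2019, 5, 8, 0, 0, 0, 0, time.UTC)           // arbitrary reference time
    fmt.Println(expired(864000, base.AddDate(0, 0, 11), base))     // true: 11-day gap, 10-day TTL
    fmt.Println(expired(864000, base.AddDate(0, 0, 9), base))      // false: 9-day gap, 10-day TTL
    fmt.Println(expired(0, base.AddDate(0, 0, 9), base))           // false: TTL disabled
    fmt.Println(expired(math.MaxInt64, base.Add(time.Hour), base)) // false: finite gap never reaches MaxInt64 TTL
}

Running the sketch prints true, false, false, false, matching the 10-day-TTL and maxInt64 rows in the new test table above.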