Skip to content

Commit

Permalink
enhance: Add missing delete metrics
Browse files Browse the repository at this point in the history
Add 2 counter metrics:
- Total delete entries from deltalog: milvus_datanode_compaction_delete_count
- Total missing deletes: milvus_datanode_compaction_missing_delete_count

See also: #34665

Signed-off-by: yangxuan <[email protected]>
  • Loading branch information
XuanYang-cn committed Dec 23, 2024
1 parent 82d85e8 commit 36ddc05
Show file tree
Hide file tree
Showing 8 changed files with 272 additions and 160 deletions.
53 changes: 16 additions & 37 deletions internal/datanode/compaction/clustering_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ import (
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

Expand Down Expand Up @@ -93,7 +92,7 @@ type clusteringCompactionTask struct {
// inner field
collectionID int64
partitionID int64
currentTs typeutil.Timestamp // for TTL
currentTime time.Time // for TTL
isVectorClusteringKey bool
clusteringKeyField *schemapb.FieldSchema
primaryKeyField *schemapb.FieldSchema
Expand Down Expand Up @@ -223,7 +222,7 @@ func (t *clusteringCompactionTask) init() error {

t.primaryKeyField = pkField
t.isVectorClusteringKey = typeutil.IsVectorType(t.clusteringKeyField.DataType)
t.currentTs = tsoutil.GetCurrentTime()
t.currentTime = time.Now()
t.memoryBufferSize = t.getMemoryBufferSize()
workerPoolSize := t.getWorkerPoolSize()
t.mappingPool = conc.NewPool[any](workerPoolSize)
Expand Down Expand Up @@ -563,11 +562,7 @@ func (t *clusteringCompactionTask) mappingSegment(
log.Info("mapping segment start")
processStart := time.Now()
fieldBinlogPaths := make([][]string, 0)
var (
expired int64 = 0
deleted int64 = 0
remained int64 = 0
)
var remained int64 = 0

deltaPaths := make([]string, 0)
for _, d := range segment.GetDeltalogs() {
Expand All @@ -579,17 +574,7 @@ func (t *clusteringCompactionTask) mappingSegment(
if err != nil {
return err
}

isDeletedValue := func(v *storage.Value) bool {
ts, ok := delta[v.PK.GetValue()]
// insert task and delete task has the same ts when upsert
// here should be < instead of <=
// to avoid the upsert data to be deleted after compact
if ok && uint64(v.Timestamp) < ts {
return true
}
return false
}
entityFilter := newEntityFilter(delta, t.plan.GetCollectionTtl(), t.currentTime)

mappingStats := &clusteringpb.ClusteringCentroidIdMappingStats{}
if t.isVectorClusteringKey {
Expand Down Expand Up @@ -656,15 +641,7 @@ func (t *clusteringCompactionTask) mappingSegment(
v := pkIter.Value()
offset++

// Filtering deleted entity
if isDeletedValue(v) {
deleted++
continue
}
// Filtering expired entity
ts := typeutil.Timestamp(v.Timestamp)
if isExpiredEntity(t.plan.GetCollectionTtl(), t.currentTs, ts) {
expired++
if entityFilter.Filtered(v.PK.GetValue(), uint64(v.Timestamp)) {
continue
}

Expand Down Expand Up @@ -753,13 +730,19 @@ func (t *clusteringCompactionTask) mappingSegment(
}
}
}
missing := entityFilter.GetMissingDeleteCount()

log.Info("mapping segment end",
zap.Int64("remained_entities", remained),
zap.Int64("deleted_entities", deleted),
zap.Int64("expired_entities", expired),
zap.Int("deleted_entities", entityFilter.GetDeletedCount()),
zap.Int("expired_entities", entityFilter.GetExpiredCount()),
zap.Int("deltalog deletes", entityFilter.GetDeltalogDeleteCount()),
zap.Int("missing deletes", missing),
zap.Int64("written_row_num", t.writtenRowNum.Load()),
zap.Duration("elapse", time.Since(processStart)))

metrics.DataNodeCompactionDeleteCount.WithLabelValues(fmt.Sprint(t.collectionID)).Add(float64(entityFilter.GetDeltalogDeleteCount()))
metrics.DataNodeCompactionMissingDeleteCount.WithLabelValues(fmt.Sprint(t.collectionID)).Add(float64(missing))
return nil
}

Expand Down Expand Up @@ -1175,8 +1158,6 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
var (
timestampTo int64 = -1
timestampFrom int64 = -1
expired int64 = 0
deleted int64 = 0
remained int64 = 0
analyzeResult map[interface{}]int64 = make(map[interface{}]int64, 0)
)
Expand All @@ -1203,6 +1184,7 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
fieldBinlogPaths = append(fieldBinlogPaths, ps)
}

expiredFilter := newEntityFilter(nil, t.plan.GetCollectionTtl(), t.currentTime)
for _, paths := range fieldBinlogPaths {
allValues, err := t.binlogIO.Download(ctx, paths)
if err != nil {
Expand Down Expand Up @@ -1233,9 +1215,7 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
v := pkIter.Value()

// Filtering expired entity
ts := typeutil.Timestamp(v.Timestamp)
if isExpiredEntity(t.plan.GetCollectionTtl(), t.currentTs, ts) {
expired++
if expiredFilter.Filtered(v.PK.GetValue(), uint64(v.Timestamp)) {
continue
}

Expand Down Expand Up @@ -1264,8 +1244,7 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(

log.Info("analyze segment end",
zap.Int64("remained entities", remained),
zap.Int64("deleted entities", deleted),
zap.Int64("expired entities", expired),
zap.Int("expired entities", expiredFilter.GetExpiredCount()),
zap.Duration("map elapse", time.Since(processStart)))
return analyzeResult, nil
}
Expand Down
91 changes: 77 additions & 14 deletions internal/datanode/compaction/compactor_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,25 +37,90 @@ import (

const compactionBatchSize = 100

func isExpiredEntity(ttl int64, now, ts typeutil.Timestamp) bool {
type EntityFilter struct {
deletedPkTs map[interface{}]typeutil.Timestamp // pk2ts
ttl int64 // nanoseconds
currentTime time.Time

expiredCount int
deletedCount int
}

func newEntityFilter(deletedPkTs map[interface{}]typeutil.Timestamp, ttl int64, currTime time.Time) *EntityFilter {
if deletedPkTs == nil {
deletedPkTs = make(map[interface{}]typeutil.Timestamp)
}
return &EntityFilter{
deletedPkTs: deletedPkTs,
ttl: ttl,
currentTime: currTime,
}
}

func (filter *EntityFilter) Filtered(pk any, ts typeutil.Timestamp) bool {
if filter.isEntityDeleted(pk, ts) {
filter.deletedCount++
return true
}

// Filtering expired entity
if filter.isEntityExpired(ts) {
filter.expiredCount++
return true
}
return false
}

func (filter *EntityFilter) GetExpiredCount() int {
return filter.expiredCount
}

func (filter *EntityFilter) GetDeletedCount() int {
return filter.deletedCount
}

func (filter *EntityFilter) GetDeltalogDeleteCount() int {
return len(filter.deletedPkTs)
}

func (filter *EntityFilter) GetMissingDeleteCount() int {
return filter.GetDeltalogDeleteCount() - filter.GetDeletedCount()
}

func (filter *EntityFilter) isEntityDeleted(pk interface{}, pkTs typeutil.Timestamp) bool {
if deleteTs, ok := filter.deletedPkTs[pk]; ok {
// insert task and delete task has the same ts when upsert
// here should be < instead of <=
// to avoid the upsert data to be deleted after compact
if pkTs < deleteTs {
return true
}
}
return false
}

func (filter *EntityFilter) isEntityExpired(entityTs typeutil.Timestamp) bool {
// entity expire is not enabled if duration <= 0
if ttl <= 0 {
if filter.ttl <= 0 {
return false
}
entityTime, _ := tsoutil.ParseTS(entityTs)

// this dur can represents 292 million years before or after 1970, enough for milvus
// ttl calculation
dur := filter.currentTime.UnixMilli() - entityTime.UnixMilli()

pts, _ := tsoutil.ParseTS(ts)
pnow, _ := tsoutil.ParseTS(now)
expireTime := pts.Add(time.Duration(ttl))
return expireTime.Before(pnow)
// filter.ttl is nanoseconds
return filter.ttl/int64(time.Millisecond) <= dur
}

func mergeDeltalogs(ctx context.Context, io io.BinlogIO, paths []string) (map[interface{}]typeutil.Timestamp, error) {
pk2ts := make(map[interface{}]typeutil.Timestamp)
pk2Ts := make(map[interface{}]typeutil.Timestamp)

log := log.Ctx(ctx)
if len(paths) == 0 {
log.Debug("compact with no deltalogs, skip merge deltalogs")
return pk2ts, nil
return pk2Ts, nil
}

blobs := make([]*storage.Blob, 0)
Expand Down Expand Up @@ -88,17 +153,15 @@ func mergeDeltalogs(ctx context.Context, io io.BinlogIO, paths []string) (map[in
}

dl := reader.Value()
// If pk already exists in pk2ts, record the later one.
if ts, ok := pk2ts[dl.Pk.GetValue()]; ok && ts > dl.Ts {
if ts, ok := pk2Ts[dl.Pk.GetValue()]; ok && ts > dl.Ts {
continue
}
pk2ts[dl.Pk.GetValue()] = dl.Ts
pk2Ts[dl.Pk.GetValue()] = dl.Ts
}

log.Info("compact mergeDeltalogs end",
zap.Int("deleted pk counts", len(pk2ts)))
log.Info("compact mergeDeltalogs end", zap.Int("delete entries counts", len(pk2Ts)))

return pk2ts, nil
return pk2Ts, nil
}

func composePaths(segments []*datapb.CompactionSegmentBinlogs) (
Expand Down
84 changes: 84 additions & 0 deletions internal/datanode/compaction/compactor_common_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package compaction

import (
"math"
"testing"
"time"

"github.com/stretchr/testify/suite"

"github.com/milvus-io/milvus/pkg/util/tsoutil"
)

func TestCompactorCommonTaskSuite(t *testing.T) {
suite.Run(t, new(CompactorCommonSuite))
}

type CompactorCommonSuite struct {
suite.Suite
}

func (s *CompactorCommonSuite) TestEntityFilterByTTL() {
milvusBirthday := getMilvusBirthday()

tests := []struct {
description string
collTTL int64
nowTime time.Time
entityTime time.Time

expect bool
}{
// ttl == maxInt64, dur is 1hour, no entities should expire
{"ttl=maxInt64, now<entity", math.MaxInt64, milvusBirthday, milvusBirthday.Add(time.Hour), false},
{"ttl=maxInt64, now>entity", math.MaxInt64, milvusBirthday, milvusBirthday.Add(-time.Hour), false},
{"ttl=maxInt64, now==entity", math.MaxInt64, milvusBirthday, milvusBirthday, false},
// ttl == 0, no entities should expire
{"ttl=0, now==entity", 0, milvusBirthday, milvusBirthday, false},
{"ttl=0, now>entity", 0, milvusBirthday, milvusBirthday.Add(-time.Hour), false},
{"ttl=0, now<entity", 0, milvusBirthday, milvusBirthday.Add(time.Hour), false},
// ttl == 10days
{"ttl=10days, nowTs-entityTs>10days", 864000000000000, milvusBirthday.AddDate(0, 0, 11), milvusBirthday, true},
{"ttl=10days, nowTs-entityTs==10days", 864000000000000, milvusBirthday.AddDate(0, 0, 10), milvusBirthday, true},
{"ttl=10days, nowTs-entityTs<10days", 864000000000000, milvusBirthday.AddDate(0, 0, 9), milvusBirthday, false},
// ttl is maxInt64
{"ttl=maxInt64, nowTs-entityTs>1000years", math.MaxInt64, milvusBirthday.AddDate(1000, 0, 11), milvusBirthday, true},
{"ttl=maxInt64, nowTs-entityTs==1000years", math.MaxInt64, milvusBirthday.AddDate(1000, 0, 0), milvusBirthday, true},
{"ttl=maxInt64, nowTs-entityTs==240year", math.MaxInt64, milvusBirthday.AddDate(240, 0, 0), milvusBirthday, false},
{"ttl=maxInt64, nowTs-entityTs==maxDur", math.MaxInt64, milvusBirthday.Add(time.Duration(math.MaxInt64)), milvusBirthday, true},
{"ttl<maxInt64, nowTs-entityTs==1000years", math.MaxInt64 - 1, milvusBirthday.AddDate(1000, 0, 0), milvusBirthday, true},
}
for _, test := range tests {
s.Run(test.description, func() {
filter := newEntityFilter(nil, test.collTTL, test.nowTime)

entityTs := tsoutil.ComposeTSByTime(test.entityTime, 0)
got := filter.Filtered("mockpk", entityTs)
s.Equal(test.expect, got)

if got {
s.Equal(1, filter.GetExpiredCount())
s.Equal(0, filter.GetDeletedCount())
} else {
s.Equal(0, filter.GetExpiredCount())
s.Equal(0, filter.GetDeletedCount())
}
})
}
}
Loading

0 comments on commit 36ddc05

Please sign in to comment.