enhance: Add missing delete metrics
Add two counter metrics:
- Total delete entries read from deltalogs: milvus_datanode_compaction_delete_count
- Total missing deletes (delete entries whose primary key matched no row): milvus_datanode_compaction_missing_delete_count

See also: #34665

Signed-off-by: yangxuan <[email protected]>
XuanYang-cn committed Dec 20, 2024
1 parent 90de37e commit 4ec4e2b
Showing 7 changed files with 292 additions and 155 deletions.
53 changes: 16 additions & 37 deletions internal/datanode/compaction/clustering_compactor.go
@@ -56,7 +56,6 @@ import (
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

@@ -93,7 +92,7 @@ type clusteringCompactionTask struct {
// inner field
collectionID int64
partitionID int64
currentTs typeutil.Timestamp // for TTL
currentTime time.Time // for TTL
isVectorClusteringKey bool
clusteringKeyField *schemapb.FieldSchema
primaryKeyField *schemapb.FieldSchema
@@ -223,7 +222,7 @@ func (t *clusteringCompactionTask) init() error {

t.primaryKeyField = pkField
t.isVectorClusteringKey = typeutil.IsVectorType(t.clusteringKeyField.DataType)
t.currentTs = tsoutil.GetCurrentTime()
t.currentTime = time.Now()
t.memoryBufferSize = t.getMemoryBufferSize()
workerPoolSize := t.getWorkerPoolSize()
t.mappingPool = conc.NewPool[any](workerPoolSize)
@@ -563,11 +562,7 @@ func (t *clusteringCompactionTask) mappingSegment(
log.Info("mapping segment start")
processStart := time.Now()
fieldBinlogPaths := make([][]string, 0)
var (
expired int64 = 0
deleted int64 = 0
remained int64 = 0
)
var remained int64 = 0

deltaPaths := make([]string, 0)
for _, d := range segment.GetDeltalogs() {
@@ -579,17 +574,7 @@
if err != nil {
return err
}

isDeletedValue := func(v *storage.Value) bool {
ts, ok := delta[v.PK.GetValue()]
// insert task and delete task has the same ts when upsert
// here should be < instead of <=
// to avoid the upsert data to be deleted after compact
if ok && uint64(v.Timestamp) < ts {
return true
}
return false
}
entityFilter := newEntityFilter(delta, t.plan.GetCollectionTtl(), t.currentTime)

mappingStats := &clusteringpb.ClusteringCentroidIdMappingStats{}
if t.isVectorClusteringKey {
@@ -656,15 +641,7 @@
v := pkIter.Value()
offset++

// Filtering deleted entity
if isDeletedValue(v) {
deleted++
continue
}
// Filtering expired entity
ts := typeutil.Timestamp(v.Timestamp)
if isExpiredEntity(t.plan.GetCollectionTtl(), t.currentTs, ts) {
expired++
if entityFilter.Filtered(v.PK.GetValue(), uint64(v.Timestamp)) {
continue
}

@@ -753,13 +730,19 @@
}
}
}
missing := entityFilter.GetMissingDeleteCount()

log.Info("mapping segment end",
zap.Int64("remained_entities", remained),
zap.Int64("deleted_entities", deleted),
zap.Int64("expired_entities", expired),
zap.Int("deleted_entities", entityFilter.GetDeletedCount()),
zap.Int("expired_entities", entityFilter.GetExpiredCount()),
zap.Int("deltalog deletes", entityFilter.GetDeltalogDeleteCount()),
zap.Int("missing deletes", missing),
zap.Int64("written_row_num", t.writtenRowNum.Load()),
zap.Duration("elapse", time.Since(processStart)))

metrics.DataNodeCompactionDeleteCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Add(float64(entityFilter.GetDeltalogDeleteCount()))
metrics.DataNodeCompactionMissingDeleteCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Add(float64(missing))
return nil
}

@@ -1175,8 +1158,6 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
var (
timestampTo int64 = -1
timestampFrom int64 = -1
expired int64 = 0
deleted int64 = 0
remained int64 = 0
analyzeResult map[interface{}]int64 = make(map[interface{}]int64, 0)
)
@@ -1203,6 +1184,7 @@
fieldBinlogPaths = append(fieldBinlogPaths, ps)
}

expiredFilter := newEntityFilter(nil, t.plan.GetCollectionTtl(), t.currentTime)
for _, paths := range fieldBinlogPaths {
allValues, err := t.binlogIO.Download(ctx, paths)
if err != nil {
@@ -1233,9 +1215,7 @@
v := pkIter.Value()

// Filtering expired entity
ts := typeutil.Timestamp(v.Timestamp)
if isExpiredEntity(t.plan.GetCollectionTtl(), t.currentTs, ts) {
expired++
if expiredFilter.Filtered(v.PK.GetValue(), uint64(v.Timestamp)) {
continue
}

@@ -1264,8 +1244,7 @@

log.Info("analyze segment end",
zap.Int64("remained entities", remained),
zap.Int64("deleted entities", deleted),
zap.Int64("expired entities", expired),
zap.Int("expired entities", expiredFilter.GetExpiredCount()),
zap.Duration("map elapse", time.Since(processStart)))
return analyzeResult, nil
}
125 changes: 110 additions & 15 deletions internal/datanode/compaction/compactor_common.go
@@ -19,6 +19,7 @@ package compaction
import (
"context"
sio "io"
"math"
"strconv"
"time"

@@ -33,29 +34,123 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
"github.com/samber/lo"
)

const compactionBatchSize = 100

func isExpiredEntity(ttl int64, now, ts typeutil.Timestamp) bool {
type EntityFilter struct {
deletedPkTs map[interface{}]*tsHit // pk2ts
ttl int64
currentTime time.Time

expiredCount int
deletedCount int
}

func newEntityFilter(deletedPkTs map[interface{}]*tsHit, ttl int64, currTime time.Time) *EntityFilter {
if deletedPkTs == nil {
deletedPkTs = make(map[interface{}]*tsHit)
}
return &EntityFilter{
deletedPkTs: deletedPkTs,
ttl: ttl,
currentTime: currTime,
}
}

func (filter *EntityFilter) Filtered(pk any, ts typeutil.Timestamp) bool {
if filter.isEntityDeleted(pk, ts) {
filter.deletedCount++
return true
}

// Filtering expired entity
if filter.isEntityExpired(ts) {
filter.expiredCount++
return true
}
return false
}

func (filter *EntityFilter) GetExpiredCount() int {
return filter.expiredCount
}

func (filter *EntityFilter) GetDeletedCount() int {
return filter.deletedCount
}

func (filter *EntityFilter) GetDeltalogDeleteCount() int {
return len(filter.deletedPkTs)
}

func (filter *EntityFilter) GetMissingDeleteCount() int {
return len(lo.PickBy(filter.deletedPkTs, func(_ any, v *tsHit) bool {
return !v.hit
}))
}

func (filter *EntityFilter) isEntityDeleted(pk interface{}, ts typeutil.Timestamp) bool {
if pkTsHit, ok := filter.deletedPkTs[pk]; ok {
pkTsHit.hit = true

// An insert and its delete carry the same ts on upsert;
// the comparison must be < rather than <=
// so that upserted data is not dropped by compaction
if ts < pkTsHit.ts {
return true
}
}
return false
}

// Largest TTL is math.MaxInt64
func (filter *EntityFilter) isEntityExpired(entityTs typeutil.Timestamp) bool {
// entity expiry is not enabled if ttl <= 0
if ttl <= 0 {
if filter.ttl <= 0 {
return false
}

entityTime, _ := tsoutil.ParseTS(entityTs)

// Unlikely to happen, but avoid following negative duration
if entityTime.After(filter.currentTime) {
return false
}

gotDur := filter.currentTime.Sub(entityTime)
if gotDur == time.Duration(math.MaxInt64) {
// Avoid relying on time.Duration when the gap between
// entityTime and currentTime exceeds the max duration (~292 years)
if filter.ttl == math.MaxInt64 {
return true
}
return false
}

pts, _ := tsoutil.ParseTS(ts)
pnow, _ := tsoutil.ParseTS(now)
expireTime := pts.Add(time.Duration(ttl))
return expireTime.Before(pnow)
return int64(gotDur.Seconds()) >= filter.ttl
}

type tsHit struct {
ts typeutil.Timestamp
hit bool
}

func mergeDeltalogs(ctx context.Context, io io.BinlogIO, paths []string) (map[interface{}]typeutil.Timestamp, error) {
pk2ts := make(map[interface{}]typeutil.Timestamp)
// update keeps the later (larger) delete timestamp for a pk seen more than once.
func (h *tsHit) update(ts typeutil.Timestamp) {
if ts > h.ts {
h.ts = ts
}
}

func mergeDeltalogs(ctx context.Context, io io.BinlogIO, paths []string) (map[interface{}]*tsHit, error) {
pk2TsHit := make(map[interface{}]*tsHit)

log := log.Ctx(ctx)
if len(paths) == 0 {
log.Debug("compact with no deltalogs, skip merge deltalogs")
return pk2ts, nil
return pk2TsHit, nil
}

blobs := make([]*storage.Blob, 0)
@@ -88,17 +183,17 @@ func mergeDeltalogs(ctx context.Context, io io.BinlogIO, paths []string) (map[in
}

dl := reader.Value()
// If pk already exists in pk2ts, record the later one.
if ts, ok := pk2ts[dl.Pk.GetValue()]; ok && ts > dl.Ts {
continue
if _, ok := pk2TsHit[dl.Pk.GetValue()]; !ok {
pk2TsHit[dl.Pk.GetValue()] = &tsHit{ts: dl.Ts}
}
pk2ts[dl.Pk.GetValue()] = dl.Ts
pk2TsHit[dl.Pk.GetValue()].update(dl.Ts)

}

log.Info("compact mergeDeltalogs end",
zap.Int("deleted pk counts", len(pk2ts)))
zap.Int("deleted pk counts", len(pk2TsHit)))

return pk2ts, nil
return pk2TsHit, nil
}

func composePaths(segments []*datapb.CompactionSegmentBinlogs) (
84 changes: 84 additions & 0 deletions internal/datanode/compaction/compactor_common_test.go
@@ -0,0 +1,84 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package compaction

import (
"math"
"testing"
"time"

"github.com/stretchr/testify/suite"

"github.com/milvus-io/milvus/pkg/util/tsoutil"
)

func TestCompactorCommonTaskSuite(t *testing.T) {
suite.Run(t, new(CompactorCommonSuite))
}

type CompactorCommonSuite struct {
suite.Suite
}

func (s *CompactorCommonSuite) TestEntityFilterByTTL() {
milvusBirthday := getMilvusBirthday()

tests := []struct {
description string
collTTL int64
nowTime time.Time
entityTime time.Time

expect bool
}{
// ttl == maxInt64, no entities should expire
{"ttl=maxInt64, now<entity", math.MaxInt64, milvusBirthday, milvusBirthday.Add(time.Hour), false},
{"ttl=maxInt64, now>entity", math.MaxInt64, milvusBirthday, milvusBirthday.Add(-time.Hour), false},
{"ttl=maxInt64, now==entity", math.MaxInt64, milvusBirthday, milvusBirthday, false},
// ttl == 0, no entities should expire
{"ttl=0, now==entity", 0, milvusBirthday, milvusBirthday, false},
{"ttl=0, now>entity", 0, milvusBirthday, milvusBirthday.Add(-time.Hour), false},
{"ttl=0, now<entity", 0, milvusBirthday, milvusBirthday.Add(time.Hour), false},
// ttl == 10days
{"ttl=10days, nowTs-entityTs>10days", 864000, milvusBirthday.AddDate(0, 0, 11), milvusBirthday, true},
{"ttl=10days, nowTs-entityTs==10days", 864000, milvusBirthday.AddDate(0, 0, 10), milvusBirthday, true},
{"ttl=10days, nowTs-entityTs<10days", 864000, milvusBirthday.AddDate(0, 0, 9), milvusBirthday, false},
// ttl is maxInt64
{"ttl=maxInt64, nowTs-entityTs>1000years", math.MaxInt64, milvusBirthday.AddDate(1000, 0, 11), milvusBirthday, true},
{"ttl=maxInt64, nowTs-entityTs==1000years", math.MaxInt64, milvusBirthday.AddDate(1000, 0, 0), milvusBirthday, true},
{"ttl<maxInt64, nowTs-entityTs==1000years", math.MaxInt64 - 1, milvusBirthday.AddDate(1000, 0, 0), milvusBirthday, false},
{"ttl=maxInt64, nowTs-entityTs==240year", math.MaxInt64, milvusBirthday.AddDate(240, 0, 0), milvusBirthday, false},
{"ttl=maxInt64, nowTs-entityTs==maxDur", math.MaxInt64, milvusBirthday.Add(time.Duration(math.MaxInt64)), milvusBirthday, true},
}
for _, test := range tests {
s.Run(test.description, func() {
filter := newEntityFilter(nil, test.collTTL, test.nowTime)

entityTs := tsoutil.ComposeTSByTime(test.entityTime, 0)
got := filter.Filtered("mockpk", entityTs)
s.Equal(test.expect, got)

if got {
s.Equal(1, filter.GetExpiredCount())
s.Equal(0, filter.GetDeletedCount())
} else {
s.Equal(0, filter.GetExpiredCount())
s.Equal(0, filter.GetDeletedCount())
}
})
}
}
(Diff for the remaining 4 changed files not shown.)
