Skip to content

Commit

Permalink
enhance: add commit time in partitionStats proto (#35125)
Browse files Browse the repository at this point in the history
fix: #35110

Signed-off-by: wayblink <[email protected]>
  • Loading branch information
wayblink authored Aug 2, 2024
1 parent 475c333 commit 81773bf
Show file tree
Hide file tree
Showing 7 changed files with 249 additions and 19 deletions.
20 changes: 9 additions & 11 deletions internal/datacoord/compaction_policy_clustering.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package datacoord
import (
"context"
"fmt"
"sort"
"time"

"github.com/samber/lo"
Expand All @@ -31,7 +30,6 @@ import (
"github.com/milvus-io/milvus/internal/util/clustering"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)

type clusteringCompactionPolicy struct {
Expand Down Expand Up @@ -216,12 +214,8 @@ func calculateClusteringCompactionConfig(coll *collectionInfo, view CompactionVi

func triggerClusteringCompactionPolicy(ctx context.Context, meta *meta, collectionID int64, partitionID int64, channel string, segments []*SegmentInfo) (bool, error) {
log := log.With(zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID))
partitionStatsInfos := meta.partitionStatsMeta.ListPartitionStatsInfos(collectionID, partitionID, channel)
sort.Slice(partitionStatsInfos, func(i, j int) bool {
return partitionStatsInfos[i].Version > partitionStatsInfos[j].Version
})

if len(partitionStatsInfos) == 0 {
currentVersion := meta.partitionStatsMeta.GetCurrentPartitionStatsVersion(collectionID, partitionID, channel)
if currentVersion == 0 {
var newDataSize int64 = 0
for _, seg := range segments {
newDataSize += seg.getSegmentSize()
Expand All @@ -234,9 +228,13 @@ func triggerClusteringCompactionPolicy(ctx context.Context, meta *meta, collecti
return false, nil
}

partitionStats := partitionStatsInfos[0]
version := partitionStats.Version
pTime, _ := tsoutil.ParseTS(uint64(version))
partitionStats := meta.GetPartitionStatsMeta().GetPartitionStats(collectionID, partitionID, channel, currentVersion)
if partitionStats == nil {
log.Info("partition stats not found")
return false, nil
}
timestampSeconds := partitionStats.GetCommitTime()
pTime := time.Unix(timestampSeconds, 0)
if time.Since(pTime) < Params.DataCoordCfg.ClusteringCompactionMinInterval.GetAsDuration(time.Second) {
log.Info("Too short time before last clustering compaction, skip compaction")
return false, nil
Expand Down
130 changes: 124 additions & 6 deletions internal/datacoord/compaction_policy_clustering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@ package datacoord
import (
"context"
"testing"
"time"

"github.com/cockroachdb/errors"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"go.uber.org/atomic"

"github.com/milvus-io/milvus/internal/metastore/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb"
)

Expand All @@ -33,12 +36,12 @@ func TestClusteringCompactionPolicySuite(t *testing.T) {
type ClusteringCompactionPolicySuite struct {
suite.Suite

mockAlloc *NMockAllocator
mockTriggerManager *MockTriggerManager
testLabel *CompactionGroupLabel
handler *NMockHandler
mockPlanContext *MockCompactionPlanContext

mockAlloc *NMockAllocator
mockTriggerManager *MockTriggerManager
testLabel *CompactionGroupLabel
handler *NMockHandler
mockPlanContext *MockCompactionPlanContext
catalog *mocks.DataCoordCatalog
clusteringCompactionPolicy *clusteringCompactionPolicy
}

Expand All @@ -49,6 +52,11 @@ func (s *ClusteringCompactionPolicySuite) SetupTest() {
Channel: "ch-1",
}

catalog := mocks.NewDataCoordCatalog(s.T())
catalog.EXPECT().SavePartitionStatsInfo(mock.Anything, mock.Anything).Return(nil).Maybe()
catalog.EXPECT().ListPartitionStatsInfos(mock.Anything).Return(nil, nil).Maybe()
s.catalog = catalog

segments := genSegmentsForMeta(s.testLabel)
meta := &meta{segments: NewSegmentsInfo()}
for id, segment := range segments {
Expand Down Expand Up @@ -184,5 +192,115 @@ func (s *ClusteringCompactionPolicySuite) TestCollectionIsClusteringCompacting()
}

func (s *ClusteringCompactionPolicySuite) TestGetExpectedSegmentSize() {
}

// TestTimeIntervalLogic is a table-driven check of triggerClusteringCompactionPolicy.
// Every case seeds a fresh partitionStatsMeta with the listed stats records,
// optionally forces the current stats version, and compares the returned trigger
// decision with the expected one.
func (s *ClusteringCompactionPolicySuite) TestTimeIntervalLogic() {
	const (
		statsVersion = int64(100)
		// Segment sizes used by the cases labelled "not enough" / "enough" new data.
		smallSize = 1024
		largeSize = 1024 * 1024 * 1024 * 10
	)

	ctx := context.TODO()
	collectionID := int64(100)
	partitionID := int64(101)
	channel := "ch1"

	// statsWith builds the single stats record a case starts from; only the commit
	// time and the segment ID list differ between cases.
	statsWith := func(commitTime int64, segmentIDs []int64) []*datapb.PartitionStatsInfo {
		return []*datapb.PartitionStatsInfo{{
			CollectionID: collectionID,
			PartitionID:  partitionID,
			VChannel:     channel,
			CommitTime:   commitTime,
			SegmentIDs:   segmentIDs,
			Version:      statsVersion,
		}}
	}

	cases := []struct {
		name     string
		stats    []*datapb.PartitionStatsInfo
		version  int64
		segments []*SegmentInfo
		want     bool
	}{
		{
			name:     "no partition stats and not enough new data",
			stats:    []*datapb.PartitionStatsInfo{},
			version:  emptyPartitionStatsVersion,
			segments: []*SegmentInfo{},
			want:     false,
		},
		{
			name:    "no partition stats and enough new data",
			stats:   []*datapb.PartitionStatsInfo{},
			version: emptyPartitionStatsVersion,
			segments: []*SegmentInfo{
				{size: *atomic.NewInt64(largeSize)},
			},
			want: true,
		},
		{
			name:    "very recent partition stats and enough new data",
			stats:   statsWith(time.Now().Unix(), nil),
			version: statsVersion,
			segments: []*SegmentInfo{
				{size: *atomic.NewInt64(largeSize)},
			},
			want: false,
		},
		{
			name: "very old partition stats and not enough new data",
			// Fixed Unix timestamp far in the past.
			stats:   statsWith(int64(1704038400), nil),
			version: statsVersion,
			segments: []*SegmentInfo{
				{size: *atomic.NewInt64(smallSize)},
			},
			want: true,
		},
		{
			name:    "partition stats and enough new data",
			stats:   statsWith(time.Now().Add(-3*time.Hour).Unix(), []int64{100000}),
			version: statsVersion,
			segments: []*SegmentInfo{
				{
					SegmentInfo: &datapb.SegmentInfo{ID: 9999},
					size:        *atomic.NewInt64(largeSize),
				},
			},
			want: true,
		},
		{
			name:    "partition stats and not enough new data",
			stats:   statsWith(time.Now().Add(-3*time.Hour).Unix(), []int64{100000}),
			version: statsVersion,
			segments: []*SegmentInfo{
				{
					SegmentInfo: &datapb.SegmentInfo{ID: 9999},
					size:        *atomic.NewInt64(smallSize),
				},
			},
			want: false,
		},
	}

	for _, tc := range cases {
		s.Run(tc.name, func() {
			statsMeta, err := newPartitionStatsMeta(ctx, s.catalog)
			s.NoError(err)
			for _, info := range tc.stats {
				statsMeta.SavePartitionStatsInfo(info)
			}
			// The current version is set directly on the in-memory entry; this is
			// only done for cases that saved a record for this channel/partition.
			if tc.version != 0 {
				statsMeta.partitionStatsInfos[channel][partitionID].currentVersion = tc.version
			}

			triggered, err := triggerClusteringCompactionPolicy(
				ctx,
				&meta{partitionStatsMeta: statsMeta},
				collectionID, partitionID, channel,
				tc.segments,
			)
			s.NoError(err)
			s.Equal(tc.want, triggered)
		})
	}
}
1 change: 1 addition & 0 deletions internal/datacoord/compaction_task_clustering.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ func (t *clusteringCompactionTask) completeTask() error {
VChannel: t.GetChannel(),
Version: t.GetPlanID(),
SegmentIDs: t.GetResultSegments(),
CommitTime: time.Now().Unix(),
})
if err != nil {
return merr.WrapErrClusteringCompactionMetaError("SavePartitionStatsInfo", err)
Expand Down
8 changes: 8 additions & 0 deletions internal/datacoord/compaction_task_clustering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,14 @@ func (s *ClusteringCompactionTaskSuite) TestProcessExecutingState() {
s.Equal(datapb.CompactionTaskState_failed, task.GetState())
}

// TestCompleteTask is the regression test for
// https://github.com/milvus-io/milvus/issues/35110: after completeTask runs, the
// partition stats stored under the task's plan ID must carry a commit time that
// lies within the last two seconds.
func (s *ClusteringCompactionTaskSuite) TestCompleteTask() {
	task := s.generateBasicTask()
	// The return value of completeTask is not inspected; only the saved stats are.
	task.completeTask()

	statsMeta := s.meta.GetPartitionStatsMeta()
	stats := statsMeta.GetPartitionStats(
		task.GetCollectionID(),
		task.GetPartitionID(),
		task.GetChannel(),
		task.GetPlanID(),
	)

	lowerBound := time.Now().Add(-2 * time.Second).Unix()
	s.True(stats.GetCommitTime() > lowerBound)
}

const (
Int64Field = "int64Field"
FloatVecField = "floatVecField"
Expand Down
19 changes: 17 additions & 2 deletions internal/datacoord/partition_stats_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/milvus-io/milvus/pkg/util/timerecord"
)

// emptyPartitionStatsVersion is the version value reported by
// GetCurrentPartitionStatsVersion when no entry exists for the requested
// vChannel or partition.
const emptyPartitionStatsVersion = int64(0)

type partitionStatsMeta struct {
sync.RWMutex
ctx context.Context
Expand Down Expand Up @@ -180,10 +182,23 @@ func (psm *partitionStatsMeta) GetCurrentPartitionStatsVersion(collectionID, par
defer psm.RUnlock()

if _, ok := psm.partitionStatsInfos[vChannel]; !ok {
return 0
return emptyPartitionStatsVersion
}
if _, ok := psm.partitionStatsInfos[vChannel][partitionID]; !ok {
return 0
return emptyPartitionStatsVersion
}
return psm.partitionStatsInfos[vChannel][partitionID].currentVersion
}

// GetPartitionStats returns the stats record stored for the given vChannel,
// partition and version while holding the read lock. It returns nil when the
// vChannel or the partition has no entry; for a known partition the result is
// whatever the per-version map yields for version. collectionID is accepted
// but not consulted by the lookup.
func (psm *partitionStatsMeta) GetPartitionStats(collectionID, partitionID int64, vChannel string, version int64) *datapb.PartitionStatsInfo {
	psm.RLock()
	defer psm.RUnlock()

	byPartition, found := psm.partitionStatsInfos[vChannel]
	if !found {
		return nil
	}
	entry, found := byPartition[partitionID]
	if !found {
		return nil
	}
	return entry.infos[version]
}
89 changes: 89 additions & 0 deletions internal/datacoord/partition_stats_meta_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datacoord

import (
"context"
"testing"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"

"github.com/milvus-io/milvus/internal/metastore/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb"
)

// PartitionStatsMetaSuite is the testify suite covering partitionStatsMeta.
type PartitionStatsMetaSuite struct {
	suite.Suite

	// catalog is the mocked DataCoordCatalog that SetupTest recreates before each test.
	catalog *mocks.DataCoordCatalog
	// meta is not assigned by the suite code visible here; tests build their own
	// partitionStatsMeta — NOTE(review): confirm whether this field is still needed.
	meta *partitionStatsMeta
}

func TestPartitionStatsMetaSuite(t *testing.T) {
suite.Run(t, new(PartitionStatsMetaSuite))
}

// SetupTest installs a fresh mock catalog on the suite. Both
// SavePartitionStatsInfo and ListPartitionStatsInfos are stubbed to succeed
// and are marked optional, so a test may call them any number of times or not
// at all.
func (s *PartitionStatsMetaSuite) SetupTest() {
	s.catalog = mocks.NewDataCoordCatalog(s.T())
	s.catalog.EXPECT().SavePartitionStatsInfo(mock.Anything, mock.Anything).Return(nil).Maybe()
	s.catalog.EXPECT().ListPartitionStatsInfos(mock.Anything).Return(nil, nil).Maybe()
}

// TestGetPartitionStats saves one stats record (collection 1, partition 2,
// channel "ch-1", version 100) and checks GetPartitionStats and
// GetCurrentPartitionStatsVersion against both matching and non-matching keys.
func (s *PartitionStatsMetaSuite) TestGetPartitionStats() {
	ctx := context.Background()
	statsMeta, err := newPartitionStatsMeta(ctx, s.catalog)
	s.NoError(err)

	statsMeta.SavePartitionStatsInfo(&datapb.PartitionStatsInfo{
		CollectionID: 1,
		PartitionID:  2,
		VChannel:     "ch-1",
		SegmentIDs:   []int64{100000},
		Version:      100,
	})

	// Lookups that differ from the saved record in channel, partition or version
	// must all come back nil.
	misses := []struct {
		partitionID int64
		vChannel    string
		version     int64
	}{
		{partitionID: 2, vChannel: "ch-2", version: 100},
		{partitionID: 3, vChannel: "ch-1", version: 100},
		{partitionID: 2, vChannel: "ch-1", version: 101},
	}
	for _, miss := range misses {
		s.Nil(statsMeta.GetPartitionStats(1, miss.partitionID, miss.vChannel, miss.version))
	}

	// The exact key of the saved record must hit.
	s.NotNil(statsMeta.GetPartitionStats(1, 2, "ch-1", 100))

	// The current version is expected to be the empty value for the saved
	// partition as well as for an unknown channel and an unknown partition.
	unset := []struct {
		partitionID int64
		vChannel    string
	}{
		{partitionID: 2, vChannel: "ch-1"},
		{partitionID: 2, vChannel: "ch-2"},
		{partitionID: 3, vChannel: "ch-1"},
	}
	for _, key := range unset {
		s.Equal(emptyPartitionStatsVersion, statsMeta.GetCurrentPartitionStatsVersion(1, key.partitionID, key.vChannel))
	}

	// After the current version is set directly on the in-memory entry, the
	// getter must report it.
	statsMeta.partitionStatsInfos["ch-1"][2].currentVersion = 100
	s.Equal(int64(100), statsMeta.GetCurrentPartitionStatsVersion(1, 2, "ch-1"))
}
1 change: 1 addition & 0 deletions internal/proto/data_coord.proto
Original file line number Diff line number Diff line change
Expand Up @@ -943,6 +943,7 @@ message PartitionStatsInfo {
int64 version = 4;
repeated int64 segmentIDs = 5;
int64 analyzeTaskID = 6;
int64 commitTime = 7;
}

message DropCompactionPlanRequest {
Expand Down

0 comments on commit 81773bf

Please sign in to comment.