diff --git a/internal/datacoord/compaction_task_clustering.go b/internal/datacoord/compaction_task_clustering.go index d6a5b88a25f6a..0efdbf5fecadf 100644 --- a/internal/datacoord/compaction_task_clustering.go +++ b/internal/datacoord/compaction_task_clustering.go @@ -485,6 +485,12 @@ func (t *clusteringCompactionTask) completeTask() error { return err } + err = t.meta.GetPartitionStatsMeta().SaveCurrentPartitionStatsVersion(t.GetTaskProto().GetCollectionID(), + t.GetTaskProto().GetPartitionID(), t.GetTaskProto().GetChannel(), t.GetTaskProto().GetPlanID()) + if err != nil { + return merr.WrapErrClusteringCompactionMetaError("SaveCurrentPartitionStatsVersion", err) + } + return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed)) } diff --git a/internal/datacoord/partition_stats_meta.go b/internal/datacoord/partition_stats_meta.go index 27009992cecd8..f379afe67ff22 100644 --- a/internal/datacoord/partition_stats_meta.go +++ b/internal/datacoord/partition_stats_meta.go @@ -121,8 +121,6 @@ func (psm *partitionStatsMeta) SavePartitionStatsInfo(info *datapb.PartitionStat } psm.partitionStatsInfos[info.GetVChannel()][info.GetPartitionID()].infos[info.GetVersion()] = info - // after v2.5.0, the current version will be updated when updating the partition stats info, so there’s no need to split it into two steps - psm.partitionStatsInfos[info.GetVChannel()][info.GetPartitionID()].currentVersion = info.Version return nil } diff --git a/internal/datacoord/partition_stats_meta_test.go b/internal/datacoord/partition_stats_meta_test.go index 2a124a6471c65..0c67f2d4424b9 100644 --- a/internal/datacoord/partition_stats_meta_test.go +++ b/internal/datacoord/partition_stats_meta_test.go @@ -42,6 +42,7 @@ func (s *PartitionStatsMetaSuite) SetupTest() { catalog := mocks.NewDataCoordCatalog(s.T()) catalog.EXPECT().SavePartitionStatsInfo(mock.Anything, mock.Anything).Return(nil).Maybe() catalog.EXPECT().ListPartitionStatsInfos(mock.Anything).Return(nil, nil).Maybe() + catalog.EXPECT().SaveCurrentPartitionStatsVersion(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() s.catalog = catalog } @@ -74,6 +75,9 @@ func (s *PartitionStatsMetaSuite) TestGetPartitionStats() { ps := partitionStatsMeta.GetPartitionStats(1, 2, "ch-1", 100) s.NotNil(ps) + err = partitionStatsMeta.SaveCurrentPartitionStatsVersion(1, 2, "ch-1", 100) + s.NoError(err) + currentVersion := partitionStatsMeta.GetCurrentPartitionStatsVersion(1, 2, "ch-1") s.Equal(int64(100), currentVersion) diff --git a/internal/metastore/kv/datacoord/kv_catalog.go b/internal/metastore/kv/datacoord/kv_catalog.go index a512cf67beff5..4f6fbe875876d 100644 --- a/internal/metastore/kv/datacoord/kv_catalog.go +++ b/internal/metastore/kv/datacoord/kv_catalog.go @@ -23,6 +23,7 @@ import ( "strconv" "strings" + "github.com/cockroachdb/errors" "go.uber.org/zap" "golang.org/x/exp/maps" "golang.org/x/sync/errgroup" @@ -43,6 +44,7 @@ import ( "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util" "github.com/milvus-io/milvus/pkg/util/etcd" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -930,6 +932,9 @@ func (kc *Catalog) GetCurrentPartitionStatsVersion(ctx context.Context, collID, key := buildCurrentPartitionStatsVersionPath(collID, partID, vChannel) valueStr, err := kc.MetaKv.Load(key) if err != nil { + if errors.Is(err, merr.ErrIoKeyNotFound) { + return 0, nil + } return 0, err }