
Commit

fix conflicts
Signed-off-by: bigsheeper <[email protected]>
bigsheeper committed Dec 4, 2024
2 parents fd44d61 + 319f549 commit 571abc1
Showing 81 changed files with 2,354 additions and 887 deletions.
8 changes: 4 additions & 4 deletions configs/milvus.yaml
@@ -371,7 +371,7 @@ queryCoord:
channelExclusiveNodeFactor: 4 # the least node number for enable channel's exclusive mode
collectionObserverInterval: 200 # the interval of collection observer
checkExecutedFlagInterval: 100 # the interval of check executed flag to force to pull dist
updateCollectionLoadStatusInterval: 5 # 5m, max interval for updating collection loaded status
updateCollectionLoadStatusInterval: 300 # 300s, max interval of updating collection loaded status for check health
cleanExcludeSegmentInterval: 60 # the time duration of clean pipeline exclude segment which used for filter invalid data, in seconds
ip: # TCP/IP address of queryCoord. If not specified, use the first unicastable address
port: 19531 # TCP port of queryCoord
@@ -806,7 +806,7 @@ common:
readwrite:
privileges: ListDatabases,SelectOwnership,SelectUser,DescribeResourceGroup,ListResourceGroups,FlushAll,TransferNode,TransferReplica,UpdateResourceGroups # Cluster level readwrite privileges
admin:
privileges: ListDatabases,SelectOwnership,SelectUser,DescribeResourceGroup,ListResourceGroups,FlushAll,TransferNode,TransferReplica,UpdateResourceGroups,BackupRBAC,RestoreRBAC,CreateDatabase,DropDatabase,CreateOwnership,DropOwnership,ManageOwnership,CreateResourceGroup,DropResourceGroup,UpdateUser # Cluster level admin privileges
privileges: ListDatabases,SelectOwnership,SelectUser,DescribeResourceGroup,ListResourceGroups,FlushAll,TransferNode,TransferReplica,UpdateResourceGroups,BackupRBAC,RestoreRBAC,CreateDatabase,DropDatabase,CreateOwnership,DropOwnership,ManageOwnership,CreateResourceGroup,DropResourceGroup,UpdateUser,RenameCollection # Cluster level admin privileges
database:
readonly:
privileges: ShowCollections,DescribeDatabase # Database level readonly privileges
@@ -818,9 +818,9 @@ common:
readonly:
privileges: Query,Search,IndexDetail,GetFlushState,GetLoadState,GetLoadingProgress,HasPartition,ShowPartitions,DescribeCollection,DescribeAlias,GetStatistics,ListAliases # Collection level readonly privileges
readwrite:
privileges: Query,Search,IndexDetail,GetFlushState,GetLoadState,GetLoadingProgress,HasPartition,ShowPartitions,DescribeCollection,DescribeAlias,GetStatistics,ListAliases,Load,Release,Insert,Delete,Upsert,Import,Flush,Compaction,LoadBalance,RenameCollection,CreateIndex,DropIndex,CreatePartition,DropPartition # Collection level readwrite privileges
privileges: Query,Search,IndexDetail,GetFlushState,GetLoadState,GetLoadingProgress,HasPartition,ShowPartitions,DescribeCollection,DescribeAlias,GetStatistics,ListAliases,Load,Release,Insert,Delete,Upsert,Import,Flush,Compaction,LoadBalance,CreateIndex,DropIndex,CreatePartition,DropPartition # Collection level readwrite privileges
admin:
privileges: Query,Search,IndexDetail,GetFlushState,GetLoadState,GetLoadingProgress,HasPartition,ShowPartitions,DescribeCollection,DescribeAlias,GetStatistics,ListAliases,Load,Release,Insert,Delete,Upsert,Import,Flush,Compaction,LoadBalance,RenameCollection,CreateIndex,DropIndex,CreatePartition,DropPartition,CreateAlias,DropAlias # Collection level admin privileges
privileges: Query,Search,IndexDetail,GetFlushState,GetLoadState,GetLoadingProgress,HasPartition,ShowPartitions,DescribeCollection,DescribeAlias,GetStatistics,ListAliases,Load,Release,Insert,Delete,Upsert,Import,Flush,Compaction,LoadBalance,CreateIndex,DropIndex,CreatePartition,DropPartition,CreateAlias,DropAlias # Collection level admin privileges
tlsMode: 0
session:
ttl: 30 # ttl value when session granting a lease to register service
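Aside: reading the paired lines in the configs/milvus.yaml hunks above, updateCollectionLoadStatusInterval is now expressed in seconds (300, matching the previous 5-minute value) and is used for the health check, while RenameCollection moves out of the collection-level readwrite/admin privilege groups and into the cluster-level admin group. Below is a minimal Go sketch, using only the standard library, showing that the new seconds value keeps the old five-minute cadence; the constant name is illustrative and not a Milvus identifier, and this is not code from this commit.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Value copied from milvus.yaml above; now interpreted as seconds.
	const updateCollectionLoadStatusIntervalSec = 300

	interval := time.Duration(updateCollectionLoadStatusIntervalSec) * time.Second
	fmt.Println(interval) // prints 5m0s, i.e. the former 5-minute default

	// A checker loop would range over ticker.C at this cadence.
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
}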
110 changes: 70 additions & 40 deletions internal/datacoord/compaction_task_clustering.go
@@ -193,16 +193,6 @@ func (t *clusteringCompactionTask) processPipelining() error {
log.Debug("wait for the node to be assigned before proceeding with the subsequent steps")
return nil
}
var operators []UpdateOperator
for _, segID := range t.InputSegments {
operators = append(operators, UpdateSegmentLevelOperator(segID, datapb.SegmentLevel_L2))
}
err := t.meta.UpdateSegmentsInfo(operators...)
if err != nil {
log.Warn("fail to set segment level to L2", zap.Error(err))
return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentsInfo before compaction executing", err)
}

if typeutil.IsVectorType(t.GetClusteringKeyField().DataType) {
err := t.doAnalyze()
if err != nil {
@@ -309,34 +299,49 @@ func (t *clusteringCompactionTask) processIndexing() error {
return nil
}

func (t *clusteringCompactionTask) markInputSegmentsDropped() error {
var operators []UpdateOperator
// mark
for _, segID := range t.GetInputSegments() {
operators = append(operators, UpdateStatusOperator(segID, commonpb.SegmentState_Dropped))
}
err := t.meta.UpdateSegmentsInfo(operators...)
if err != nil {
log.Warn("markInputSegmentsDropped UpdateSegmentsInfo fail", zap.Error(err))
return merr.WrapErrClusteringCompactionMetaError("markInputSegmentsDropped UpdateSegmentsInfo", err)
}
return nil
}

// indexed is the final state of a clustering compaction task
// one task should only run this once
func (t *clusteringCompactionTask) completeTask() error {
err := t.meta.GetPartitionStatsMeta().SavePartitionStatsInfo(&datapb.PartitionStatsInfo{
var err error
// update current partition stats version
// at this point, the segment view includes both the input segments and the result segments.
if err = t.meta.GetPartitionStatsMeta().SavePartitionStatsInfo(&datapb.PartitionStatsInfo{
CollectionID: t.GetCollectionID(),
PartitionID: t.GetPartitionID(),
VChannel: t.GetChannel(),
Version: t.GetPlanID(),
SegmentIDs: t.GetResultSegments(),
CommitTime: time.Now().Unix(),
})
if err != nil {
}); err != nil {
return merr.WrapErrClusteringCompactionMetaError("SavePartitionStatsInfo", err)
}

var operators []UpdateOperator
for _, segID := range t.GetResultSegments() {
operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, t.GetPlanID()))
}
err = t.meta.UpdateSegmentsInfo(operators...)
if err != nil {
return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentPartitionStatsVersion", err)
}

err = t.meta.GetPartitionStatsMeta().SaveCurrentPartitionStatsVersion(t.GetCollectionID(), t.GetPartitionID(), t.GetChannel(), t.GetPlanID())
if err != nil {
return merr.WrapErrClusteringCompactionMetaError("SaveCurrentPartitionStatsVersion", err)
}

// mark input segments as dropped
// now, the segment view only includes the result segments.
if err = t.markInputSegmentsDropped(); err != nil {
log.Warn("mark input segments as Dropped failed, skip it and wait retry",
zap.Int64("planID", t.GetPlanID()), zap.Error(err))
}

return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed))
}
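
Aside: the comments in completeTask above describe a deliberate ordering — the result segments are stamped with the new partition-stats version and the current version is switched before the input segments are marked Dropped, so the apparent intent is that a reader never observes a window in which neither set is registered. The following self-contained toy sketches that hand-off; it is illustrative only, not the datacoord meta API, and the segMeta type and segment IDs are invented.

package main

import (
	"fmt"
	"sort"
)

// segMeta is a toy stand-in for segment metadata.
type segMeta struct {
	state        string // "Flushed" or "Dropped"
	statsVersion int64
}

// visible lists the segments a reader would still consider servable.
func visible(segs map[int64]*segMeta) []int64 {
	ids := []int64{}
	for id, s := range segs {
		if s.state != "Dropped" {
			ids = append(ids, id)
		}
	}
	sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
	return ids
}

func main() {
	inputs, results, planID := []int64{101, 102}, []int64{103, 104}, int64(10001)
	segs := map[int64]*segMeta{
		101: {state: "Flushed"}, 102: {state: "Flushed"},
		103: {state: "Flushed"}, 104: {state: "Flushed"},
	}

	// Step 1: stamp result segments with the new partition-stats version,
	// then switch the current version to planID (the switch itself is elided).
	for _, id := range results {
		segs[id].statsVersion = planID
	}
	fmt.Println("inputs and results both visible:", visible(segs))

	// Step 2: only now are the input segments marked Dropped.
	for _, id := range inputs {
		segs[id].state = "Dropped"
	}
	fmt.Println("results only:", visible(segs))
}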

@@ -376,25 +381,50 @@ func (t *clusteringCompactionTask) processFailedOrTimeout() error {
}); err != nil {
log.Warn("clusteringCompactionTask processFailedOrTimeout unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
}

// revert segments meta
var operators []UpdateOperator
// revert level of input segments
// L1 : L1 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L1
// L2 : L2 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L2
for _, segID := range t.InputSegments {
operators = append(operators, RevertSegmentLevelOperator(segID))
}
// if result segments are generated but task fail in the other steps, mark them as L1 segments without partitions stats
for _, segID := range t.ResultSegments {
operators = append(operators, UpdateSegmentLevelOperator(segID, datapb.SegmentLevel_L1))
operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, 0))
isInputDropped := false
for _, segID := range t.GetInputSegments() {
if t.meta.GetHealthySegment(segID) == nil {
isInputDropped = true
break
}
}
err := t.meta.UpdateSegmentsInfo(operators...)
if err != nil {
log.Warn("UpdateSegmentsInfo fail", zap.Error(err))
return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentsInfo", err)
if isInputDropped {
log.Info("input segments dropped, doing for compatibility",
zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("planID", t.GetPlanID()))
// this task must be generated by v2.4, just for compatibility
// revert segments meta
var operators []UpdateOperator
// revert level of input segments
// L1 : L1 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L1
// L2 : L2 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L2
for _, segID := range t.GetInputSegments() {
operators = append(operators, RevertSegmentLevelOperator(segID))
}
// if result segments are generated but task fail in the other steps, mark them as L1 segments without partitions stats
for _, segID := range t.GetResultSegments() {
operators = append(operators, UpdateSegmentLevelOperator(segID, datapb.SegmentLevel_L1))
operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, 0))
}
err := t.meta.UpdateSegmentsInfo(operators...)
if err != nil {
log.Warn("UpdateSegmentsInfo fail", zap.Error(err))
return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentsInfo", err)
}
} else {
// after v2.4.16, mark the results segment as dropped
var operators []UpdateOperator
for _, segID := range t.GetResultSegments() {
// Don't worry about them being loaded; they are all invisible.
operators = append(operators, UpdateStatusOperator(segID, commonpb.SegmentState_Dropped))
}

err := t.meta.UpdateSegmentsInfo(operators...)
if err != nil {
log.Warn("UpdateSegmentsInfo fail", zap.Error(err))
return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentsInfo", err)
}
}

t.resetSegmentCompacting()

// drop partition stats if uploaded
Expand All @@ -405,7 +435,7 @@ func (t *clusteringCompactionTask) processFailedOrTimeout() error {
Version: t.GetPlanID(),
SegmentIDs: t.GetResultSegments(),
}
err = t.meta.CleanPartitionStatsInfo(partitionStatsInfo)
err := t.meta.CleanPartitionStatsInfo(partitionStatsInfo)
if err != nil {
log.Warn("gcPartitionStatsInfo fail", zap.Error(err))
}
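Aside: the reworked processFailedOrTimeout above first checks whether any input segment is already gone from meta; if so, the diff's comments attribute the task to v2.4 and the segment levels are reverted for compatibility, otherwise only the result segments are marked Dropped. The toy below condenses that decision; it is illustrative only, the helper name and operator strings are invented, and it is not the DataCoord API.

package main

import "fmt"

// cleanupPlan mirrors the branch in processFailedOrTimeout: if any input
// segment is no longer healthy in meta, treat the task as a legacy one and
// revert the segment view; otherwise simply discard the result segments.
func cleanupPlan(healthy map[int64]bool, inputs, results []int64) []string {
	ops := []string{}
	inputDropped := false
	for _, id := range inputs {
		if !healthy[id] {
			inputDropped = true
			break
		}
	}
	if inputDropped {
		// legacy task (per the diff comments, generated by v2.4): restore inputs
		for _, id := range inputs {
			ops = append(ops, fmt.Sprintf("revert level of %d", id))
		}
		// demote result segments and clear their partition-stats version
		for _, id := range results {
			ops = append(ops, fmt.Sprintf("set %d to L1, clear stats version", id))
		}
	} else {
		// current flow: inputs are still visible, so just drop the results
		for _, id := range results {
			ops = append(ops, fmt.Sprintf("mark %d Dropped", id))
		}
	}
	return ops
}

func main() {
	healthy := map[int64]bool{101: true, 102: true}
	fmt.Println(cleanupPlan(healthy, []int64{101, 102}, []int64{103, 104}))
}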
12 changes: 7 additions & 5 deletions internal/datacoord/compaction_task_clustering_test.go
@@ -112,7 +112,7 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang
task.processPipelining()

seg11 := s.meta.GetSegment(101)
s.Equal(datapb.SegmentLevel_L2, seg11.Level)
s.Equal(datapb.SegmentLevel_L1, seg11.Level)
seg21 := s.meta.GetSegment(102)
s.Equal(datapb.SegmentLevel_L2, seg21.Level)
s.Equal(int64(10000), seg21.PartitionStatsVersion)
@@ -147,11 +147,13 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang
s.Equal(int64(10000), seg22.PartitionStatsVersion)

seg32 := s.meta.GetSegment(103)
s.Equal(datapb.SegmentLevel_L1, seg32.Level)
s.Equal(int64(0), seg32.PartitionStatsVersion)
s.Equal(datapb.SegmentLevel_L2, seg32.Level)
s.Equal(int64(10001), seg32.PartitionStatsVersion)
s.Equal(commonpb.SegmentState_Dropped, seg32.GetState())
seg42 := s.meta.GetSegment(104)
s.Equal(datapb.SegmentLevel_L1, seg42.Level)
s.Equal(int64(0), seg42.PartitionStatsVersion)
s.Equal(datapb.SegmentLevel_L2, seg42.Level)
s.Equal(int64(10001), seg42.PartitionStatsVersion)
s.Equal(commonpb.SegmentState_Dropped, seg42.GetState())
}

func (s *ClusteringCompactionTaskSuite) generateBasicTask(vectorClusteringKey bool) *clusteringCompactionTask {
92 changes: 46 additions & 46 deletions internal/datacoord/handler.go
@@ -119,10 +119,11 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
}

var (
flushedIDs = make(typeutil.UniqueSet)
droppedIDs = make(typeutil.UniqueSet)
growingIDs = make(typeutil.UniqueSet)
levelZeroIDs = make(typeutil.UniqueSet)
flushedIDs = make(typeutil.UniqueSet)
droppedIDs = make(typeutil.UniqueSet)
growingIDs = make(typeutil.UniqueSet)
levelZeroIDs = make(typeutil.UniqueSet)
newFlushedIDs = make(typeutil.UniqueSet)
)

// cannot use GetSegmentsByChannel since dropped segments are needed here
@@ -132,7 +133,6 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
indexedSegments := FilterInIndexedSegments(h, h.s.meta, false, segments...)
indexed := typeutil.NewUniqueSet(lo.Map(indexedSegments, func(segment *SegmentInfo, _ int) int64 { return segment.GetID() })...)

unIndexedIDs := make(typeutil.UniqueSet)
for _, s := range segments {
if s.GetStartPosition() == nil && s.GetDmlPosition() == nil {
continue
@@ -142,36 +142,17 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
continue
}

currentPartitionStatsVersion := h.s.meta.partitionStatsMeta.GetCurrentPartitionStatsVersion(channel.GetCollectionID(), s.GetPartitionID(), channel.GetName())
if s.GetLevel() == datapb.SegmentLevel_L2 && s.GetPartitionStatsVersion() != currentPartitionStatsVersion {
// in the process of L2 compaction, newly generated segment may be visible before the whole L2 compaction Plan
// is finished, we have to skip these fast-finished segment because all segments in one L2 Batch must be
// seen atomically, otherwise users will see intermediate result
continue
}

validSegmentInfos[s.GetID()] = s
switch {
case s.GetState() == commonpb.SegmentState_Dropped:
if s.GetLevel() == datapb.SegmentLevel_L2 && s.GetPartitionStatsVersion() == currentPartitionStatsVersion {
// if segment.partStatsVersion is equal to currentPartitionStatsVersion,
// it must have been indexed, this is guaranteed by clustering compaction process
// this is to ensure that the current valid L2 compaction produce is available to search/query
// to avoid insufficient data
flushedIDs.Insert(s.GetID())
continue
}
droppedIDs.Insert(s.GetID())
case !isFlushState(s.GetState()):
growingIDs.Insert(s.GetID())
case s.GetLevel() == datapb.SegmentLevel_L0:
levelZeroIDs.Insert(s.GetID())
case indexed.Contain(s.GetID()) || s.GetNumOfRows() < Params.DataCoordCfg.MinSegmentNumRowsToEnableIndex.GetAsInt64():
// fill in indexed segments into flushed directly
flushedIDs.Insert(s.GetID())

default:
// unIndexed segments will be checked if it's parents are all indexed
unIndexedIDs.Insert(s.GetID())
flushedIDs.Insert(s.GetID())
}
}

@@ -203,36 +184,55 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
return true
}

retrieveUnIndexed := func() bool {
var compactionFromExist func(segID UniqueID) bool

compactionFromExist = func(segID UniqueID) bool {
compactionFrom := validSegmentInfos[segID].GetCompactionFrom()
if len(compactionFrom) == 0 || !isValid(compactionFrom...) {
return false
}
for _, fromID := range compactionFrom {
if flushedIDs.Contain(fromID) || newFlushedIDs.Contain(fromID) {
return true
}
if compactionFromExist(fromID) {
return true
}
}
return false
}

segmentIndexed := func(segID UniqueID) bool {
return indexed.Contain(segID) || validSegmentInfos[segID].GetNumOfRows() < Params.DataCoordCfg.MinSegmentNumRowsToEnableIndex.GetAsInt64()
}

retrieve := func() bool {
continueRetrieve := false
for id := range unIndexedIDs {
for id := range flushedIDs {
compactionFrom := validSegmentInfos[id].GetCompactionFrom()
compactTos := []UniqueID{} // neighbors and itself
if len(compactionFrom) > 0 && isValid(compactionFrom...) {
if len(compactionFrom) == 0 || !isValid(compactionFrom...) {
newFlushedIDs.Insert(id)
continue
}
if segmentIndexed(id) && !compactionFromExist(id) {
newFlushedIDs.Insert(id)
} else {
for _, fromID := range compactionFrom {
if len(compactTos) == 0 {
compactToInfo, _ := h.s.meta.GetCompactionTo(fromID)
compactTos = lo.Map(compactToInfo, func(s *SegmentInfo, _ int) UniqueID { return s.GetID() })
}
if indexed.Contain(fromID) {
flushedIDs.Insert(fromID)
} else {
unIndexedIDs.Insert(fromID)
continueRetrieve = true
}
newFlushedIDs.Insert(fromID)
continueRetrieve = true
droppedIDs.Remove(fromID)
}
unIndexedIDs.Remove(compactTos...)
flushedIDs.Remove(compactTos...)
droppedIDs.Remove(compactionFrom...)
}
}
return continueRetrieve
}
for retrieveUnIndexed() {

for retrieve() {
flushedIDs = newFlushedIDs
newFlushedIDs = make(typeutil.UniqueSet)
}

// unindexed is flushed segments as well
flushedIDs.Insert(unIndexedIDs.Collect()...)
flushedIDs = newFlushedIDs

log.Info("GetQueryVChanPositions",
zap.Int64("collectionID", channel.GetCollectionID()),
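Aside: the new retrieval logic in GetQueryVChanPositions above rewrites flushedIDs into newFlushedIDs until it reaches a fixed point, and the recursive compactionFromExist check decides whether a compacted segment can be served directly or whether its compaction ancestors should be served instead. The toy below sketches that ancestor walk; the lineage data and function name are invented for illustration and are not DataCoord code.

package main

import "fmt"

// compactionFrom maps a segment to the segments it was compacted from.
var compactionFrom = map[int64][]int64{
	5: {3, 4}, // 5 is the result of compacting 3 and 4
	3: {1},    // 3 was itself produced from 1
}

// ancestorStillFlushed reports whether any direct or transitive compaction
// ancestor of segID is still in the flushed set, mirroring the recursive
// compactionFromExist check: while an ancestor is still being served, the
// descendant is not added to the new flushed view.
func ancestorStillFlushed(segID int64, flushed map[int64]bool) bool {
	for _, from := range compactionFrom[segID] {
		if flushed[from] || ancestorStillFlushed(from, flushed) {
			return true
		}
	}
	return false
}

func main() {
	flushed := map[int64]bool{1: true, 5: true}
	// Segment 5 cannot be served directly yet: its ancestor 1 is still flushed.
	fmt.Println(ancestorStillFlushed(5, flushed)) // true

	delete(flushed, 1)
	fmt.Println(ancestorStillFlushed(5, flushed)) // false
}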
(Diffs for the remaining 77 changed files are not shown.)

