Skip to content

Commit

Permalink
feat: add replicate.enable property to collection and related config
Browse files Browse the repository at this point in the history
Signed-off-by: SimFG <[email protected]>
  • Loading branch information
SimFG committed Oct 29, 2024
1 parent 9d16b97 commit 178dc60
Show file tree
Hide file tree
Showing 16 changed files with 607 additions and 7 deletions.
1 change: 1 addition & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,7 @@ common:
bloomFilterType: BlockedBloomFilter # bloom filter type, support BasicBloomFilter and BlockedBloomFilter
maxBloomFalsePositive: 0.001 # max false positive rate for bloom filter
bloomFilterApplyBatchSize: 1000 # batch size when to apply pk to bloom filter
collectionReplicateEnable: false # Whether to enable collection replication.
usePartitionKeyAsClusteringKey: false # if true, do clustering compaction and segment prune on partition key field
useVectorAsClusteringKey: false # if true, do clustering compaction and segment prune on vector field
enableVectorClusteringKey: false # if true, enable vector clustering key and vector clustering compaction
Expand Down
3 changes: 2 additions & 1 deletion internal/datanode/importv2/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"

"github.com/milvus-io/milvus/pkg/config"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/stretchr/testify/assert"
)

func TestResizePools(t *testing.T) {
Expand Down
45 changes: 44 additions & 1 deletion internal/proxy/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1238,6 +1238,8 @@ func (node *Proxy) AlterCollection(ctx context.Context, request *milvuspb.AlterC
ctx: ctx,
Condition: NewTaskCondition(ctx),
AlterCollectionRequest: request,
replicateTargetTSMap: node.replicateTargetTSMap,
replicateCurrentTSMap: node.replicateCurrentTSMap,
rootCoord: node.rootCoord,
queryCoord: node.queryCoord,
dataCoord: node.dataCoord,
Expand Down Expand Up @@ -6088,16 +6090,37 @@ func (node *Proxy) Connect(ctx context.Context, request *milvuspb.ConnectRequest
}, nil
}

func (node *Proxy) storeCurrentReplicateTS(channelName string, ts uint64) {
current, ok := node.replicateCurrentTSMap.Get(channelName)
if !ok || ts > current {
node.replicateCurrentTSMap.Insert(channelName, ts)
}
}

func (node *Proxy) ReplicateMessage(ctx context.Context, req *milvuspb.ReplicateMessageRequest) (*milvuspb.ReplicateMessageResponse, error) {
if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
return &milvuspb.ReplicateMessageResponse{Status: merr.Status(err)}, nil
}

if paramtable.Get().CommonCfg.TTMsgEnabled.GetAsBool() {
collectionReplicateEnable := paramtable.Get().CommonCfg.CollectionReplicateEnable.GetAsBool()
ttMsgEnabled := paramtable.Get().CommonCfg.TTMsgEnabled.GetAsBool()

// replicate message can be use in two ways, otherwise return error
// 1. collectionReplicateEnable is false and ttMsgEnabled is false, active/standby mode
// 2. collectionReplicateEnable is true and ttMsgEnabled is true, data migration mode
if (!collectionReplicateEnable && ttMsgEnabled) || (collectionReplicateEnable && !ttMsgEnabled) {
return &milvuspb.ReplicateMessageResponse{
Status: merr.Status(merr.ErrDenyReplicateMessage),
}, nil
}

if _, ok := node.replicateTargetTSMap.Get(req.GetChannelName()); ok {
log.Ctx(ctx).Warn("the related collection is altering properties, deny to replicate message")
return &milvuspb.ReplicateMessageResponse{
Status: merr.Status(merr.ErrDenyReplicateMessage),
}, nil
}

var err error

if req.GetChannelName() == "" {
Expand Down Expand Up @@ -6137,6 +6160,18 @@ func (node *Proxy) ReplicateMessage(ctx context.Context, req *milvuspb.Replicate
StartPositions: req.StartPositions,
EndPositions: req.EndPositions,
}
checkCollectionReplicateProperty := func(dbName, collectionName string) bool {
if !collectionReplicateEnable {
return true
}
info, err := globalMetaCache.GetCollectionInfo(ctx, dbName, collectionName, 0)
if err != nil {
log.Warn("get collection info failed", zap.String("collectionName", collectionName), zap.Error(err))
return false
}
return info.replicateMode
}

// getTsMsgFromConsumerMsg
for i, msgBytes := range req.Msgs {
header := commonpb.MsgHeader{}
Expand All @@ -6154,8 +6189,12 @@ func (node *Proxy) ReplicateMessage(ctx context.Context, req *milvuspb.Replicate
log.Ctx(ctx).Warn("failed to unmarshal msg", zap.Int("index", i), zap.Error(err))
return &milvuspb.ReplicateMessageResponse{Status: merr.Status(merr.ErrInvalidMsgBytes)}, nil
}
node.storeCurrentReplicateTS(req.GetChannelName(), tsMsg.EndTs())
switch realMsg := tsMsg.(type) {
case *msgstream.InsertMsg:
if !checkCollectionReplicateProperty(realMsg.GetDbName(), realMsg.GetCollectionName()) {
return &milvuspb.ReplicateMessageResponse{Status: merr.Status(merr.WrapErrCollectionReplicateMode("replicate"))}, nil
}
assignedSegmentInfos, err := node.segAssigner.GetSegmentID(realMsg.GetCollectionID(), realMsg.GetPartitionID(),
realMsg.GetShardName(), uint32(realMsg.NumRows), req.EndTs)
if err != nil {
Expand All @@ -6170,6 +6209,10 @@ func (node *Proxy) ReplicateMessage(ctx context.Context, req *milvuspb.Replicate
realMsg.SegmentID = assignSegmentID
break
}
case *msgstream.DeleteMsg:
if !checkCollectionReplicateProperty(realMsg.GetDbName(), realMsg.GetCollectionName()) {
return &milvuspb.ReplicateMessageResponse{Status: merr.Status(merr.WrapErrCollectionReplicateMode("replicate"))}, nil
}
}
msgPack.Msgs = append(msgPack.Msgs, tsMsg)
}
Expand Down
Loading

0 comments on commit 178dc60

Please sign in to comment.