enhance: Use partitionID when delete by partitionKey
When deleting by partition_key, Milvus generates L0 segments
globally. During L0 compaction, those L0 segments touch every
partition in the collection. Due to the false-positive rate of segment
bloom filters, L0 compaction appends spurious deltalogs to completely
irrelevant partitions, which causes partition deletion amplification.
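
As a rough back-of-the-envelope illustration (every number below is an
assumption, not a measurement), the expected amplification per compaction
scales with how many irrelevant segments have to be probed:

package main

import "fmt"

func main() {
	// Assumed figures, for illustration only.
	const (
		irrelevantSegments = 63 * 100 // sealed segments in partitions the delete never targeted
		deletedPKs         = 10000    // primary keys carried by one collection-wide L0 segment
		bloomFPRate        = 0.001    // per-probe false-positive rate of a segment bloom filter
	)
	// Each deleted PK is probed against each irrelevant segment's bloom filter;
	// every false positive appends a spurious delete record to that segment's deltalog.
	expected := float64(deletedPKs) * float64(irrelevantSegments) * bloomFPRate
	fmt.Printf("expected spurious delete records per L0 compaction: %.0f\n", expected)
}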

This PR uses the partition_key to set a targeted partitionID when producing
deleteMsgs into MsgStreams. This narrows the scope of L0 segments
down to the partition level and removes the collection-wide
influence of false positives.

However, due to the DeleteMsg structure, we can only label one partition per deleteMsg,
so this enhancement does not apply when a user deletes by more than one partition_key
value in a single deletion.
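
For reference, a minimal sketch of the labeling rule (it mirrors the
getDeleteMsgPartitionID helper in the diff below; the helper name and sentinel
value here are illustrative, not the exact Milvus code): a delete that resolves
to exactly one partition, via an explicit partition name or a single
partition_key value, gets that partitionID; everything else stays collection-wide.

package main

import "fmt"

// allPartitionsID stands in for the collection-wide sentinel (common.AllPartitionsID
// in Milvus); the concrete value used here is only for illustration.
const allPartitionsID int64 = -1

// deleteMsgPartitionID mirrors the rule introduced by this PR: when the delete
// resolves to exactly one partition, label the DeleteMsg with it; otherwise keep
// the delete collection-wide, as before.
func deleteMsgPartitionID(resolved []int64) int64 {
	if len(resolved) == 1 {
		return resolved[0]
	}
	return allPartitionsID
}

func main() {
	fmt.Println(deleteMsgPartitionID([]int64{4411}))       // single partition-key value -> partition-scoped
	fmt.Println(deleteMsgPartitionID([]int64{4411, 4412})) // more than one value -> collection-wide
	fmt.Println(deleteMsgPartitionID(nil))                 // no partition info in the expression -> collection-wide
}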

Signed-off-by: yangxuan <[email protected]>
XuanYang-cn committed Dec 5, 2024
1 parent 618f0cb commit f027d1f
Showing 8 changed files with 686 additions and 326 deletions.
1 change: 0 additions & 1 deletion internal/datacoord/services.go
@@ -207,7 +207,6 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
zap.Int64("partitionID", r.GetPartitionID()),
zap.String("channelName", r.GetChannelName()),
zap.Uint32("count", r.GetCount()),
zap.String("segment level", r.GetLevel().String()),
)

// Load the collection info from Root Coordinator, if it is not found in server meta.
2 changes: 1 addition & 1 deletion internal/proto/data_coord.proto
@@ -171,7 +171,7 @@ message SegmentIDRequest {
int64 partitionID = 4;
bool isImport = 5; // deprecated
int64 importTaskID = 6; // deprecated
SegmentLevel level = 7;
SegmentLevel level = 7; // deprecated
}

message AllocSegmentRequest {
6 changes: 2 additions & 4 deletions internal/proxy/search_util.go
@@ -267,7 +267,7 @@ func getPartitionIDs(ctx context.Context, dbName string, collectionName string,

useRegexp := Params.ProxyCfg.PartitionNameRegexp.GetAsBool()

partitionsSet := typeutil.NewSet[int64]()
partitionsSet := typeutil.NewUniqueSet()
for _, partitionName := range partitionNames {
if useRegexp {
// Legacy feature, use partition name as regexp
@@ -292,9 +292,7 @@ func getPartitionIDs(ctx context.Context, dbName string, collectionName string,
// TODO change after testcase updated: return nil, merr.WrapErrPartitionNotFound(partitionName)
return nil, fmt.Errorf("partition name %s not found", partitionName)
}
if !partitionsSet.Contain(partitionID) {
partitionsSet.Insert(partitionID)
}
partitionsSet.Insert(partitionID)
}
}
return partitionsSet.Collect(), nil
174 changes: 93 additions & 81 deletions internal/proxy/task_delete.go
@@ -53,11 +53,10 @@ type deleteTask struct {
idAllocator allocator.Interface

// delete info
primaryKeys *schemapb.IDs
collectionID UniqueID
partitionID UniqueID
dbID UniqueID
partitionKeyMode bool
primaryKeys *schemapb.IDs
collectionID UniqueID
partitionID UniqueID
dbID UniqueID

// set by scheduler
ts Timestamp
@@ -135,7 +134,6 @@ func (dt *deleteTask) PreExecute(ctx context.Context) error {
func (dt *deleteTask) Execute(ctx context.Context) (err error) {
ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Delete-Execute")
defer sp.End()
// log := log.Ctx(ctx)

if len(dt.req.GetExpr()) == 0 {
return merr.WrapErrParameterInvalid("valid expr", "empty expr", "invalid expression")
@@ -228,13 +226,13 @@ func repackDeleteMsgByHash(
commonpbutil.WithTimeStamp(ts),
commonpbutil.WithSourceID(paramtable.GetNodeID()),
),
CollectionID: collectionID,
PartitionID: partitionID,
ShardName: vchannel,
CollectionName: collectionName,
PartitionName: partitionName,
DbName: dbName,
CollectionID: collectionID,
PartitionID: partitionID,
PrimaryKeys: &schemapb.IDs{},
ShardName: vchannel,
},
}
}
@@ -295,8 +293,9 @@ type deleteRunner struct {
schema *schemaInfo
dbID UniqueID
collectionID UniqueID
partitionID UniqueID
partitionKeyMode bool
partitionIDs []UniqueID
plan *planpb.PlanNode

// for query
msgID int64
@@ -336,29 +335,53 @@ func (dr *deleteRunner) Init(ctx context.Context) error {
return ErrWithLog(log, "Failed to get collection schema", err)
}

dr.plan, err = planparserv2.CreateRetrievePlan(dr.schema.schemaHelper, dr.req.GetExpr(), dr.req.GetExprTemplateValues())
if err != nil {
return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("failed to create delete plan: %v", err))
}

if planparserv2.IsAlwaysTruePlan(dr.plan) {
return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("delete plan can't be empty or always true : %s", dr.req.GetExpr()))
}

// Set partitionIDs
dr.partitionKeyMode = dr.schema.IsPartitionKeyCollection()
// get partitionIDs of delete
dr.partitionID = common.AllPartitionsID
if len(dr.req.PartitionName) > 0 {
if dr.partitionKeyMode {
partName := dr.req.GetPartitionName()
if dr.partitionKeyMode {
if len(partName) > 0 {
return errors.New("not support manually specifying the partition names if partition key mode is used")
}

partName := dr.req.GetPartitionName()
expr, err := exprutil.ParseExprFromPlan(dr.plan)
if err != nil {
return err
}
partitionKeys := exprutil.ParseKeys(expr, exprutil.PartitionKey)
hashedPartitionNames, err := assignPartitionKeys(ctx, dr.req.GetDbName(), dr.req.GetCollectionName(), partitionKeys)
if err != nil {
return err
}
dr.partitionIDs, err = getPartitionIDs(ctx, dr.req.GetDbName(), dr.req.GetCollectionName(), hashedPartitionNames)
if err != nil {
return err
}
} else if len(partName) > 0 {
// static validation
if err := validatePartitionTag(partName, true); err != nil {
return ErrWithLog(log, "Invalid partition name", err)
}

// dynamic validation
partID, err := globalMetaCache.GetPartitionID(ctx, dr.req.GetDbName(), collName, partName)
if err != nil {
return ErrWithLog(log, "Failed to get partition id", err)
}
dr.partitionID = partID
dr.partitionIDs = []UniqueID{partID} // only one partID
}

// hash primary keys to channels
// set vchannels
channelNames, err := dr.chMgr.getVChannels(dr.collectionID)
if err != nil {
return ErrWithLog(log, "Failed to get primary keys from expr", err)
return ErrWithLog(log, "Failed to get vchannels from collection", err)
}
dr.vChannels = channelNames

@@ -372,16 +395,7 @@ func (dr *deleteRunner) Init(ctx context.Context) error {
}

func (dr *deleteRunner) Run(ctx context.Context) error {
plan, err := planparserv2.CreateRetrievePlan(dr.schema.schemaHelper, dr.req.GetExpr(), dr.req.GetExprTemplateValues())
if err != nil {
return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("failed to create delete plan: %v", err))
}

if planparserv2.IsAlwaysTruePlan(plan) {
return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("delete plan can't be empty or always true : %s", dr.req.GetExpr()))
}

isSimple, pk, numRow := getPrimaryKeysFromPlan(dr.schema.CollectionSchema, plan)
isSimple, pk, numRow := getPrimaryKeysFromPlan(dr.schema.CollectionSchema, dr.plan)
if isSimple {
// if could get delete.primaryKeys from delete expr
err := dr.simpleDelete(ctx, pk, numRow)
@@ -391,7 +405,7 @@ func (dr *deleteRunner) Run(ctx context.Context) error {
} else {
// if get complex delete expr
// need query from querynode before delete
err = dr.complexDelete(ctx, plan)
err := dr.complexDelete(ctx, dr.plan)
if err != nil {
log.Warn("complex delete failed,but delete some data", zap.Int64("count", dr.result.DeleteCnt), zap.String("expr", dr.req.GetExpr()))
return err
@@ -400,21 +414,33 @@ func (dr *deleteRunner) Run(ctx context.Context) error {
return nil
}

func (dr *deleteRunner) getDeleteMsgPartitionID() int64 {
// If a complex delete tries to delete multiple partitions in the filter, use AllPartitionID
// otherwise use the target partitionID, which can come from partition name(UDF) or a partition key expression
deletePartitionID := common.AllPartitionsID
if len(dr.partitionIDs) == 1 {
deletePartitionID = dr.partitionIDs[0]
}

return deletePartitionID
}

func (dr *deleteRunner) produce(ctx context.Context, primaryKeys *schemapb.IDs) (*deleteTask, error) {
targetPartID := dr.getDeleteMsgPartitionID()
dt := &deleteTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
req: dr.req,
idAllocator: dr.idAllocator,
chMgr: dr.chMgr,
chTicker: dr.chTicker,
collectionID: dr.collectionID,
partitionID: dr.partitionID,
partitionKeyMode: dr.partitionKeyMode,
vChannels: dr.vChannels,
primaryKeys: primaryKeys,
dbID: dr.dbID,
ctx: ctx,
Condition: NewTaskCondition(ctx),
req: dr.req,
idAllocator: dr.idAllocator,
chMgr: dr.chMgr,
chTicker: dr.chTicker,
collectionID: dr.collectionID,
partitionID: targetPartID,
vChannels: dr.vChannels,
primaryKeys: primaryKeys,
dbID: dr.dbID,
}

var enqueuedTask task = dt
if streamingutil.IsStreamingServiceEnabled() {
enqueuedTask = &deleteTaskByStreamingService{deleteTask: dt}
@@ -433,24 +459,8 @@ func (dr *deleteRunner) produce(ctx context.Context, primaryKeys *schemapb.IDs)
func (dr *deleteRunner) getStreamingQueryAndDelteFunc(plan *planpb.PlanNode) executeFunc {
return func(ctx context.Context, nodeID int64, qn types.QueryNodeClient, channel string) error {
var partitionIDs []int64

// optimize query when partitionKey on
if dr.partitionKeyMode {
expr, err := exprutil.ParseExprFromPlan(plan)
if err != nil {
return err
}
partitionKeys := exprutil.ParseKeys(expr, exprutil.PartitionKey)
hashedPartitionNames, err := assignPartitionKeys(ctx, dr.req.GetDbName(), dr.req.GetCollectionName(), partitionKeys)
if err != nil {
return err
}
partitionIDs, err = getPartitionIDs(ctx, dr.req.GetDbName(), dr.req.GetCollectionName(), hashedPartitionNames)
if err != nil {
return err
}
} else if dr.partitionID != common.InvalidFieldID {
partitionIDs = []int64{dr.partitionID}
if len(dr.partitionIDs) > 0 {
partitionIDs = dr.partitionIDs
}

log := log.Ctx(ctx).With(
@@ -602,10 +612,11 @@ func (dr *deleteRunner) complexDelete(ctx context.Context, plan *planpb.PlanNode
}

func (dr *deleteRunner) simpleDelete(ctx context.Context, pk *schemapb.IDs, numRow int64) error {
deletePartID := dr.getDeleteMsgPartitionID()
log.Debug("get primary keys from expr",
zap.Int64("len of primary keys", numRow),
zap.Int64("collectionID", dr.collectionID),
zap.Int64("partitionID", dr.partitionID))
zap.Int64("partitionID", deletePartID))

task, err := dr.produce(ctx, pk)
if err != nil {
@@ -621,70 +632,71 @@ func (dr *deleteRunner) simpleDelete(ctx context.Context, pk *schemapb.IDs, numR
return err
}

func getPrimaryKeysFromPlan(schema *schemapb.CollectionSchema, plan *planpb.PlanNode) (bool, *schemapb.IDs, int64) {
// simple delete request need expr with "pk in [a, b]"
func getPrimaryKeysFromPlan(schema *schemapb.CollectionSchema, plan *planpb.PlanNode) (isSimpleDelete bool, pks *schemapb.IDs, pkCount int64) {
var err error
// simple delete request with "pk in [a, b]"
termExpr, ok := plan.Node.(*planpb.PlanNode_Query).Query.Predicates.Expr.(*planpb.Expr_TermExpr)
if ok {
if !termExpr.TermExpr.GetColumnInfo().GetIsPrimaryKey() {
return false, nil, 0
}

ids, rowNum, err := getPrimaryKeysFromTermExpr(schema, termExpr)
pks, pkCount, err = getPrimaryKeysFromTermExpr(schema, termExpr)
if err != nil {
return false, nil, 0
}
return true, ids, rowNum
return true, pks, pkCount
}

// simple delete if expr with "pk == a"
// simple delete with "pk == a"
unaryRangeExpr, ok := plan.Node.(*planpb.PlanNode_Query).Query.Predicates.Expr.(*planpb.Expr_UnaryRangeExpr)
if ok {
if unaryRangeExpr.UnaryRangeExpr.GetOp() != planpb.OpType_Equal || !unaryRangeExpr.UnaryRangeExpr.GetColumnInfo().GetIsPrimaryKey() {
return false, nil, 0
}

ids, err := getPrimaryKeysFromUnaryRangeExpr(schema, unaryRangeExpr)
pks, err = getPrimaryKeysFromUnaryRangeExpr(schema, unaryRangeExpr)
if err != nil {
return false, nil, 0
}
return true, ids, 1
return true, pks, 1
}

return false, nil, 0
}

func getPrimaryKeysFromUnaryRangeExpr(schema *schemapb.CollectionSchema, unaryRangeExpr *planpb.Expr_UnaryRangeExpr) (res *schemapb.IDs, err error) {
res = &schemapb.IDs{}
func getPrimaryKeysFromUnaryRangeExpr(schema *schemapb.CollectionSchema, unaryRangeExpr *planpb.Expr_UnaryRangeExpr) (pks *schemapb.IDs, err error) {
pks = &schemapb.IDs{}
switch unaryRangeExpr.UnaryRangeExpr.GetColumnInfo().GetDataType() {
case schemapb.DataType_Int64:
res.IdField = &schemapb.IDs_IntId{
pks.IdField = &schemapb.IDs_IntId{
IntId: &schemapb.LongArray{
Data: []int64{unaryRangeExpr.UnaryRangeExpr.GetValue().GetInt64Val()},
},
}
case schemapb.DataType_VarChar:
res.IdField = &schemapb.IDs_StrId{
pks.IdField = &schemapb.IDs_StrId{
StrId: &schemapb.StringArray{
Data: []string{unaryRangeExpr.UnaryRangeExpr.GetValue().GetStringVal()},
},
}
default:
return res, fmt.Errorf("invalid field data type specifyed in simple delete expr")
return pks, fmt.Errorf("invalid field data type specifyed in simple delete expr")
}

return res, nil
return pks, nil
}

func getPrimaryKeysFromTermExpr(schema *schemapb.CollectionSchema, termExpr *planpb.Expr_TermExpr) (res *schemapb.IDs, rowNum int64, err error) {
res = &schemapb.IDs{}
rowNum = int64(len(termExpr.TermExpr.Values))
func getPrimaryKeysFromTermExpr(schema *schemapb.CollectionSchema, termExpr *planpb.Expr_TermExpr) (pks *schemapb.IDs, pkCount int64, err error) {
pks = &schemapb.IDs{}
pkCount = int64(len(termExpr.TermExpr.Values))
switch termExpr.TermExpr.ColumnInfo.GetDataType() {
case schemapb.DataType_Int64:
ids := make([]int64, 0)
for _, v := range termExpr.TermExpr.Values {
ids = append(ids, v.GetInt64Val())
}
res.IdField = &schemapb.IDs_IntId{
pks.IdField = &schemapb.IDs_IntId{
IntId: &schemapb.LongArray{
Data: ids,
},
Expand All @@ -694,14 +706,14 @@ func getPrimaryKeysFromTermExpr(schema *schemapb.CollectionSchema, termExpr *pla
for _, v := range termExpr.TermExpr.Values {
ids = append(ids, v.GetStringVal())
}
res.IdField = &schemapb.IDs_StrId{
pks.IdField = &schemapb.IDs_StrId{
StrId: &schemapb.StringArray{
Data: ids,
},
}
default:
return res, 0, fmt.Errorf("invalid field data type specifyed in simple delete expr")
return pks, 0, fmt.Errorf("invalid field data type specifyed in simple delete expr")
}

return res, rowNum, nil
return pks, pkCount, nil
}