Skip to content

Commit

Permalink
fix: split delete task msg to MaxMessageSize to avoid mq message too …
Browse files Browse the repository at this point in the history
…large error (#36197)

relate: #36089

---------

Signed-off-by: aoiasd <[email protected]>
  • Loading branch information
aoiasd authored Sep 27, 2024
1 parent b1ac3f0 commit ffc12fb
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 78 deletions.
115 changes: 59 additions & 56 deletions internal/proxy/task_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,8 @@ func (dt *deleteTask) Execute(ctx context.Context) (err error) {
EndTs: dt.EndTs(),
}

for _, msg := range result {
if msg != nil {
for _, msgs := range result {
for _, msg := range msgs {
msgPack.Msgs = append(msgPack.Msgs, msg)
}
}
Expand Down Expand Up @@ -202,75 +202,78 @@ func repackDeleteMsgByHash(
collectionName string,
partitionID int64,
partitionName string,
) (map[uint32]*msgstream.DeleteMsg, int64, error) {
) (map[uint32][]*msgstream.DeleteMsg, int64, error) {
maxSize := Params.PulsarCfg.MaxMessageSize.GetAsInt()
hashValues := typeutil.HashPK2Channels(primaryKeys, vChannels)
// repack delete msg by dmChannel
result := make(map[uint32]*msgstream.DeleteMsg)
result := make(map[uint32][]*msgstream.DeleteMsg)
lastMessageSize := map[uint32]int{}

numRows := int64(0)
numMessage := 0

createMessage := func(key uint32, vchannel string) *msgstream.DeleteMsg {
numMessage++
lastMessageSize[key] = 0
return &msgstream.DeleteMsg{
BaseMsg: msgstream.BaseMsg{
Ctx: ctx,
},
DeleteRequest: &msgpb.DeleteRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_Delete),
// msgid of delete msg must be set later
// or it will be seen as duplicated msg in mq
commonpbutil.WithTimeStamp(ts),
commonpbutil.WithSourceID(paramtable.GetNodeID()),
),
CollectionID: collectionID,
PartitionID: partitionID,
CollectionName: collectionName,
PartitionName: partitionName,
PrimaryKeys: &schemapb.IDs{},
ShardName: vchannel,
},
}
}

for index, key := range hashValues {
vchannel := vChannels[key]
_, ok := result[key]
msgs, ok := result[key]
if !ok {
deleteMsg, err := newDeleteMsg(
ctx,
idAllocator,
ts,
collectionID,
collectionName,
partitionID,
partitionName,
)
if err != nil {
return nil, 0, err
}
deleteMsg.ShardName = vchannel
result[key] = deleteMsg
result[key] = make([]*msgstream.DeleteMsg, 1)
msgs = result[key]
result[key][0] = createMessage(key, vchannel)
}
curMsg := msgs[len(msgs)-1]
size, id := typeutil.GetId(primaryKeys, index)
if lastMessageSize[key]+16+size > maxSize {
curMsg = createMessage(key, vchannel)
result[key] = append(result[key], curMsg)
}
curMsg := result[key]
curMsg.HashValues = append(curMsg.HashValues, hashValues[index])
curMsg.Timestamps = append(curMsg.Timestamps, ts)

typeutil.AppendIDs(curMsg.PrimaryKeys, primaryKeys, index)
typeutil.AppendID(curMsg.PrimaryKeys, id)
lastMessageSize[key] += 16 + size
curMsg.NumRows++
numRows++
}
return result, numRows, nil
}

func newDeleteMsg(
ctx context.Context,
idAllocator allocator.Interface,
ts uint64,
collectionID int64,
collectionName string,
partitionID int64,
partitionName string,
) (*msgstream.DeleteMsg, error) {
msgid, err := idAllocator.AllocOne()
// alloc messageID
start, _, err := idAllocator.Alloc(uint32(numMessage))
if err != nil {
return nil, errors.Wrap(err, "failed to allocate MsgID of delete")
}
sliceRequest := &msgpb.DeleteRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_Delete),
// msgid of delete msg must be set
// or it will be seen as duplicated msg in mq
commonpbutil.WithMsgID(msgid),
commonpbutil.WithTimeStamp(ts),
commonpbutil.WithSourceID(paramtable.GetNodeID()),
),
CollectionID: collectionID,
PartitionID: partitionID,
CollectionName: collectionName,
PartitionName: partitionName,
PrimaryKeys: &schemapb.IDs{},
}
return &msgstream.DeleteMsg{
BaseMsg: msgstream.BaseMsg{
Ctx: ctx,
},
DeleteRequest: sliceRequest,
}, nil
return nil, 0, err
}

cnt := int64(0)
for _, msgs := range result {
for _, msg := range msgs {
msg.Base.MsgID = start + cnt
cnt++
}
}
return result, numRows, nil
}

type deleteRunner struct {
Expand Down
24 changes: 13 additions & 11 deletions internal/proxy/task_delete_streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,21 @@ func (dt *deleteTaskByStreamingService) Execute(ctx context.Context) (err error)
}

var msgs []message.MutableMessage
for hashKey, deleteMsg := range result {
for hashKey, deleteMsgs := range result {
vchannel := dt.vChannels[hashKey]
msg, err := message.NewDeleteMessageBuilderV1().
WithHeader(&message.DeleteMessageHeader{
CollectionId: dt.collectionID,
}).
WithBody(deleteMsg.DeleteRequest).
WithVChannel(vchannel).
BuildMutable()
if err != nil {
return err
for _, deleteMsg := range deleteMsgs {
msg, err := message.NewDeleteMessageBuilderV1().
WithHeader(&message.DeleteMessageHeader{
CollectionId: dt.collectionID,
}).
WithBody(deleteMsg.DeleteRequest).
WithVChannel(vchannel).
BuildMutable()
if err != nil {
return err
}
msgs = append(msgs, msg)
}
msgs = append(msgs, msg)
}

log.Debug("send delete request to virtual channels",
Expand Down
24 changes: 13 additions & 11 deletions internal/proxy/task_upsert_streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,19 +122,21 @@ func (it *upsertTaskByStreamingService) packDeleteMessage(ctx context.Context) (
}

var msgs []message.MutableMessage
for hashKey, deleteMsg := range result {
for hashKey, deleteMsgs := range result {
vchannel := vChannels[hashKey]
msg, err := message.NewDeleteMessageBuilderV1().
WithHeader(&message.DeleteMessageHeader{
CollectionId: it.upsertMsg.DeleteMsg.CollectionID,
}).
WithBody(deleteMsg.DeleteRequest).
WithVChannel(vchannel).
BuildMutable()
if err != nil {
return nil, err
for _, deleteMsg := range deleteMsgs {
msg, err := message.NewDeleteMessageBuilderV1().
WithHeader(&message.DeleteMessageHeader{
CollectionId: it.upsertMsg.DeleteMsg.CollectionID,
}).
WithBody(deleteMsg.DeleteRequest).
WithVChannel(vchannel).
BuildMutable()
if err != nil {
return nil, err
}
msgs = append(msgs, msg)
}
msgs = append(msgs, msg)
}

log.Debug("Proxy Upsert deleteExecute done",
Expand Down
38 changes: 38 additions & 0 deletions pkg/util/typeutil/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -1269,6 +1269,44 @@ func AppendSystemFields(schema *schemapb.CollectionSchema) *schemapb.CollectionS
return newSchema
}

func GetId(src *schemapb.IDs, idx int) (int, any) {
switch src.IdField.(type) {
case *schemapb.IDs_IntId:
return 8, src.GetIntId().Data[idx]
case *schemapb.IDs_StrId:
return len(src.GetStrId().Data[idx]), src.GetStrId().Data[idx]
default:
panic("unknown pk type")
}
}

func AppendID(dst *schemapb.IDs, src any) {
switch value := src.(type) {
case int64:
if dst.GetIdField() == nil {
dst.IdField = &schemapb.IDs_IntId{
IntId: &schemapb.LongArray{
Data: []int64{value},
},
}
} else {
dst.GetIntId().Data = append(dst.GetIntId().Data, value)
}
case string:
if dst.GetIdField() == nil {
dst.IdField = &schemapb.IDs_StrId{
StrId: &schemapb.StringArray{
Data: []string{value},
},
}
} else {
dst.GetStrId().Data = append(dst.GetStrId().Data, value)
}
default:
// TODO
}
}

func AppendIDs(dst *schemapb.IDs, src *schemapb.IDs, idx int) {
switch src.IdField.(type) {
case *schemapb.IDs_IntId:
Expand Down

0 comments on commit ffc12fb

Please sign in to comment.