Skip to content

Commit

Permalink
enhance: Unify DeleteLog parsing code (#34009)
Browse files Browse the repository at this point in the history
See also #33787

The parsing delete log is distributed in lots of places, which is not
recommended and hard to maintain.

This PR abstract common parsing logic into `DeleteLog.Parse` method to
unify implementation and make it easier to replace json parsing lib.

Signed-off-by: Congqi Xia <[email protected]>

---------

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored Jun 21, 2024
1 parent 622be36 commit 2f691f1
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 156 deletions.
134 changes: 2 additions & 132 deletions internal/storage/data_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,6 @@ import (
"fmt"
"math"
"sort"
"strconv"
"strings"

"github.com/samber/lo"
"github.com/valyala/fastjson"

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
Expand Down Expand Up @@ -704,101 +699,6 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
return partitionID, segmentID, data, err
}

type DeleteLog struct {
Pk PrimaryKey `json:"pk"`
Ts uint64 `json:"ts"`
PkType int64 `json:"pkType"`
}

func NewDeleteLog(pk PrimaryKey, ts Timestamp) *DeleteLog {
pkType := pk.Type()

return &DeleteLog{
Pk: pk,
Ts: ts,
PkType: int64(pkType),
}
}

func (dl *DeleteLog) UnmarshalJSON(data []byte) error {
var messageMap map[string]*json.RawMessage
var err error
if err = json.Unmarshal(data, &messageMap); err != nil {
return err
}

if err = json.Unmarshal(*messageMap["pkType"], &dl.PkType); err != nil {
return err
}

switch schemapb.DataType(dl.PkType) {
case schemapb.DataType_Int64:
dl.Pk = &Int64PrimaryKey{}
case schemapb.DataType_VarChar:
dl.Pk = &VarCharPrimaryKey{}
}

if err = json.Unmarshal(*messageMap["pk"], dl.Pk); err != nil {
return err
}

if err = json.Unmarshal(*messageMap["ts"], &dl.Ts); err != nil {
return err
}

return nil
}

// DeleteData saves each entity delete message represented as <primarykey,timestamp> map.
// timestamp represents the time when this instance was deleted
type DeleteData struct {
Pks []PrimaryKey // primary keys
Tss []Timestamp // timestamps
RowCount int64
memSize int64
}

func NewDeleteData(pks []PrimaryKey, tss []Timestamp) *DeleteData {
return &DeleteData{
Pks: pks,
Tss: tss,
RowCount: int64(len(pks)),
memSize: lo.SumBy(pks, func(pk PrimaryKey) int64 { return pk.Size() }) + int64(len(tss)*8),
}
}

// Append append 1 pk&ts pair to DeleteData
func (data *DeleteData) Append(pk PrimaryKey, ts Timestamp) {
data.Pks = append(data.Pks, pk)
data.Tss = append(data.Tss, ts)
data.RowCount++
data.memSize += pk.Size() + int64(8)
}

// Append append 1 pk&ts pair to DeleteData
func (data *DeleteData) AppendBatch(pks []PrimaryKey, tss []Timestamp) {
data.Pks = append(data.Pks, pks...)
data.Tss = append(data.Tss, tss...)
data.RowCount += int64(len(pks))
data.memSize += lo.SumBy(pks, func(pk PrimaryKey) int64 { return pk.Size() }) + int64(len(tss)*8)
}

func (data *DeleteData) Merge(other *DeleteData) {
data.Pks = append(data.Pks, other.Pks...)
data.Tss = append(data.Tss, other.Tss...)
data.RowCount += other.RowCount
data.memSize += other.Size()

other.Pks = nil
other.Tss = nil
other.RowCount = 0
other.memSize = 0
}

func (data *DeleteData) Size() int64 {
return data.memSize
}

// DeleteCodec serializes and deserializes the delete data
type DeleteCodec struct{}

Expand Down Expand Up @@ -898,8 +798,6 @@ func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
return err
}
defer rr.Release()

var p fastjson.Parser
deleteLog := &DeleteLog{}

for rr.Next() {
Expand All @@ -909,38 +807,10 @@ func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
for i := 0; i < column.Len(); i++ {
strVal := column.ValueStr(i)

v, err := p.Parse(strVal)
err := deleteLog.Parse(strVal)
if err != nil {
// compatible with versions that only support int64 type primary keys
// compatible with fmt.Sprintf("%d,%d", pk, ts)
// compatible error info (unmarshal err invalid character ',' after top-level value)
splits := strings.Split(strVal, ",")
if len(splits) != 2 {
return fmt.Errorf("the format of delta log is incorrect, %v can not be split", strVal)
}
pk, err := strconv.ParseInt(splits[0], 10, 64)
if err != nil {
return err
}
deleteLog.Pk = &Int64PrimaryKey{
Value: pk,
}
deleteLog.PkType = int64(schemapb.DataType_Int64)
deleteLog.Ts, err = strconv.ParseUint(splits[1], 10, 64)
if err != nil {
return err
}
} else {
deleteLog.Ts = v.GetUint64("ts")
deleteLog.PkType = v.GetInt64("pkType")
switch deleteLog.PkType {
case int64(schemapb.DataType_Int64):
deleteLog.Pk = &Int64PrimaryKey{Value: v.GetInt64("pk")}
case int64(schemapb.DataType_VarChar):
deleteLog.Pk = &VarCharPrimaryKey{Value: string(v.GetStringBytes("pk"))}
}
return err
}

result.Append(deleteLog.Pk, deleteLog.Ts)
}
}
Expand Down
180 changes: 180 additions & 0 deletions internal/storage/delta_data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package storage

import (
"encoding/json"
"fmt"
"strconv"
"strings"

"github.com/samber/lo"
"github.com/valyala/fastjson"

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
)

// parserPool use object pooling to reduce fastjson.Parser allocation.
var parserPool = &fastjson.ParserPool{}

// DeltaData stores delta data
// currently only delete tuples are stored
type DeltaData struct {
pkType schemapb.DataType
// delete tuples
delPks PrimaryKeys
delTss []Timestamp

// stats
delRowCount int64
memSize int64
}

type DeleteLog struct {
Pk PrimaryKey `json:"pk"`
Ts uint64 `json:"ts"`
PkType int64 `json:"pkType"`
}

func NewDeleteLog(pk PrimaryKey, ts Timestamp) *DeleteLog {
pkType := pk.Type()

return &DeleteLog{
Pk: pk,
Ts: ts,
PkType: int64(pkType),
}
}

// Parse tries to parse string format delete log
// it try json first then use "," split int,ts format
func (dl *DeleteLog) Parse(val string) error {
p := parserPool.Get()
defer parserPool.Put(p)
v, err := p.Parse(val)
if err != nil {
// compatible with versions that only support int64 type primary keys
// compatible with fmt.Sprintf("%d,%d", pk, ts)
// compatible error info (unmarshal err invalid character ',' after top-level value)
splits := strings.Split(val, ",")
if len(splits) != 2 {
return fmt.Errorf("the format of delta log is incorrect, %v can not be split", val)
}
pk, err := strconv.ParseInt(splits[0], 10, 64)
if err != nil {
return err
}
dl.Pk = &Int64PrimaryKey{
Value: pk,
}
dl.PkType = int64(schemapb.DataType_Int64)
dl.Ts, err = strconv.ParseUint(splits[1], 10, 64)
if err != nil {
return err
}
return nil
}

dl.Ts = v.GetUint64("ts")
dl.PkType = v.GetInt64("pkType")
switch dl.PkType {
case int64(schemapb.DataType_Int64):
dl.Pk = &Int64PrimaryKey{Value: v.GetInt64("pk")}
case int64(schemapb.DataType_VarChar):
dl.Pk = &VarCharPrimaryKey{Value: string(v.GetStringBytes("pk"))}
}
return nil
}

func (dl *DeleteLog) UnmarshalJSON(data []byte) error {
var messageMap map[string]*json.RawMessage
var err error
if err = json.Unmarshal(data, &messageMap); err != nil {
return err
}

if err = json.Unmarshal(*messageMap["pkType"], &dl.PkType); err != nil {
return err
}

switch schemapb.DataType(dl.PkType) {
case schemapb.DataType_Int64:
dl.Pk = &Int64PrimaryKey{}
case schemapb.DataType_VarChar:
dl.Pk = &VarCharPrimaryKey{}
}

if err = json.Unmarshal(*messageMap["pk"], dl.Pk); err != nil {
return err
}

if err = json.Unmarshal(*messageMap["ts"], &dl.Ts); err != nil {
return err
}

return nil
}

// DeleteData saves each entity delete message represented as <primarykey,timestamp> map.
// timestamp represents the time when this instance was deleted
type DeleteData struct {
Pks []PrimaryKey // primary keys
Tss []Timestamp // timestamps
RowCount int64
memSize int64
}

func NewDeleteData(pks []PrimaryKey, tss []Timestamp) *DeleteData {
return &DeleteData{
Pks: pks,
Tss: tss,
RowCount: int64(len(pks)),
memSize: lo.SumBy(pks, func(pk PrimaryKey) int64 { return pk.Size() }) + int64(len(tss)*8),
}
}

// Append append 1 pk&ts pair to DeleteData
func (data *DeleteData) Append(pk PrimaryKey, ts Timestamp) {
data.Pks = append(data.Pks, pk)
data.Tss = append(data.Tss, ts)
data.RowCount++
data.memSize += pk.Size() + int64(8)
}

// Append append 1 pk&ts pair to DeleteData
func (data *DeleteData) AppendBatch(pks []PrimaryKey, tss []Timestamp) {
data.Pks = append(data.Pks, pks...)
data.Tss = append(data.Tss, tss...)
data.RowCount += int64(len(pks))
data.memSize += lo.SumBy(pks, func(pk PrimaryKey) int64 { return pk.Size() }) + int64(len(tss)*8)
}

func (data *DeleteData) Merge(other *DeleteData) {
data.Pks = append(data.Pks, other.Pks...)
data.Tss = append(data.Tss, other.Tss...)
data.RowCount += other.RowCount
data.memSize += other.Size()

other.Pks = nil
other.Tss = nil
other.RowCount = 0
other.memSize = 0
}

func (data *DeleteData) Size() int64 {
return data.memSize
}
22 changes: 2 additions & 20 deletions internal/storage/serde_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"io"
"sort"
"strconv"
"strings"

"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/array"
Expand Down Expand Up @@ -249,25 +248,8 @@ func NewDeltalogDeserializeReader(blobs []*Blob) (*DeserializeReader[*DeleteLog]
}
a := r.Column(fid).(*array.String)
strVal := a.Value(i)
if err = json.Unmarshal([]byte(strVal), v[i]); err != nil {
// compatible with versions that only support int64 type primary keys
// compatible with fmt.Sprintf("%d,%d", pk, ts)
// compatible error info (unmarshal err invalid character ',' after top-level value)
splits := strings.Split(strVal, ",")
if len(splits) != 2 {
return fmt.Errorf("the format of delta log is incorrect, %v can not be split", strVal)
}
pk, err := strconv.ParseInt(splits[0], 10, 64)
if err != nil {
return err
}
v[i].Pk = &Int64PrimaryKey{
Value: pk,
}
v[i].Ts, err = strconv.ParseUint(splits[1], 10, 64)
if err != nil {
return err
}
if err := v[i].Parse(strVal); err != nil {
return err
}
}
return nil
Expand Down
3 changes: 1 addition & 2 deletions internal/util/importutilv2/binlog/l0_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package binlog

import (
"context"
"encoding/json"
"fmt"
"io"

Expand Down Expand Up @@ -94,7 +93,7 @@ func (r *l0Reader) Read() (*storage.DeleteData, error) {
for _, rows := range rowsSet {
for _, row := range rows.([]string) {
dl := &storage.DeleteLog{}
err = json.Unmarshal([]byte(row), dl)
err = dl.Parse(row)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 2f691f1

Please sign in to comment.