From 2f691f1e67487e8c311914cdaa396e9a8a2e1b12 Mon Sep 17 00:00:00 2001 From: congqixia Date: Fri, 21 Jun 2024 16:54:01 +0800 Subject: [PATCH] enhance: Unify DeleteLog parsing code (#34009) See also #33787 The delete log parsing logic is scattered across many places, which is not recommended and is hard to maintain. This PR abstracts the common parsing logic into the `DeleteLog.Parse` method to unify the implementation and make it easier to replace the JSON parsing library. Signed-off-by: Congqi Xia --------- Signed-off-by: Congqi Xia --- internal/storage/data_codec.go | 134 +------------ internal/storage/delta_data.go | 180 ++++++++++++++++++ internal/storage/serde_events.go | 22 +-- .../util/importutilv2/binlog/l0_reader.go | 3 +- internal/util/importutilv2/binlog/reader.go | 3 +- 5 files changed, 186 insertions(+), 156 deletions(-) create mode 100644 internal/storage/delta_data.go diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go index 9f64808a7383a..c8a1babefce90 100644 --- a/internal/storage/data_codec.go +++ b/internal/storage/data_codec.go @@ -22,11 +22,6 @@ import ( "fmt" "math" "sort" - "strconv" - "strings" - - "github.com/samber/lo" - "github.com/valyala/fastjson" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/proto/etcdpb" @@ -704,101 +699,6 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID return partitionID, segmentID, data, err } -type DeleteLog struct { - Pk PrimaryKey `json:"pk"` - Ts uint64 `json:"ts"` - PkType int64 `json:"pkType"` -} - -func NewDeleteLog(pk PrimaryKey, ts Timestamp) *DeleteLog { - pkType := pk.Type() - - return &DeleteLog{ - Pk: pk, - Ts: ts, - PkType: int64(pkType), - } -} - -func (dl *DeleteLog) UnmarshalJSON(data []byte) error { - var messageMap map[string]*json.RawMessage - var err error - if err = json.Unmarshal(data, &messageMap); err != nil { - return err - } - - if err = json.Unmarshal(*messageMap["pkType"], &dl.PkType); err != nil { - return 
err - } - - switch schemapb.DataType(dl.PkType) { - case schemapb.DataType_Int64: - dl.Pk = &Int64PrimaryKey{} - case schemapb.DataType_VarChar: - dl.Pk = &VarCharPrimaryKey{} - } - - if err = json.Unmarshal(*messageMap["pk"], dl.Pk); err != nil { - return err - } - - if err = json.Unmarshal(*messageMap["ts"], &dl.Ts); err != nil { - return err - } - - return nil -} - -// DeleteData saves each entity delete message represented as map. -// timestamp represents the time when this instance was deleted -type DeleteData struct { - Pks []PrimaryKey // primary keys - Tss []Timestamp // timestamps - RowCount int64 - memSize int64 -} - -func NewDeleteData(pks []PrimaryKey, tss []Timestamp) *DeleteData { - return &DeleteData{ - Pks: pks, - Tss: tss, - RowCount: int64(len(pks)), - memSize: lo.SumBy(pks, func(pk PrimaryKey) int64 { return pk.Size() }) + int64(len(tss)*8), - } -} - -// Append append 1 pk&ts pair to DeleteData -func (data *DeleteData) Append(pk PrimaryKey, ts Timestamp) { - data.Pks = append(data.Pks, pk) - data.Tss = append(data.Tss, ts) - data.RowCount++ - data.memSize += pk.Size() + int64(8) -} - -// Append append 1 pk&ts pair to DeleteData -func (data *DeleteData) AppendBatch(pks []PrimaryKey, tss []Timestamp) { - data.Pks = append(data.Pks, pks...) - data.Tss = append(data.Tss, tss...) - data.RowCount += int64(len(pks)) - data.memSize += lo.SumBy(pks, func(pk PrimaryKey) int64 { return pk.Size() }) + int64(len(tss)*8) -} - -func (data *DeleteData) Merge(other *DeleteData) { - data.Pks = append(data.Pks, other.Pks...) - data.Tss = append(data.Tss, other.Tss...) 
- data.RowCount += other.RowCount - data.memSize += other.Size() - - other.Pks = nil - other.Tss = nil - other.RowCount = 0 - other.memSize = 0 -} - -func (data *DeleteData) Size() int64 { - return data.memSize -} - // DeleteCodec serializes and deserializes the delete data type DeleteCodec struct{} @@ -898,8 +798,6 @@ func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID return err } defer rr.Release() - - var p fastjson.Parser deleteLog := &DeleteLog{} for rr.Next() { @@ -909,38 +807,10 @@ func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID for i := 0; i < column.Len(); i++ { strVal := column.ValueStr(i) - v, err := p.Parse(strVal) + err := deleteLog.Parse(strVal) if err != nil { - // compatible with versions that only support int64 type primary keys - // compatible with fmt.Sprintf("%d,%d", pk, ts) - // compatible error info (unmarshal err invalid character ',' after top-level value) - splits := strings.Split(strVal, ",") - if len(splits) != 2 { - return fmt.Errorf("the format of delta log is incorrect, %v can not be split", strVal) - } - pk, err := strconv.ParseInt(splits[0], 10, 64) - if err != nil { - return err - } - deleteLog.Pk = &Int64PrimaryKey{ - Value: pk, - } - deleteLog.PkType = int64(schemapb.DataType_Int64) - deleteLog.Ts, err = strconv.ParseUint(splits[1], 10, 64) - if err != nil { - return err - } - } else { - deleteLog.Ts = v.GetUint64("ts") - deleteLog.PkType = v.GetInt64("pkType") - switch deleteLog.PkType { - case int64(schemapb.DataType_Int64): - deleteLog.Pk = &Int64PrimaryKey{Value: v.GetInt64("pk")} - case int64(schemapb.DataType_VarChar): - deleteLog.Pk = &VarCharPrimaryKey{Value: string(v.GetStringBytes("pk"))} - } + return err } - result.Append(deleteLog.Pk, deleteLog.Ts) } } diff --git a/internal/storage/delta_data.go b/internal/storage/delta_data.go new file mode 100644 index 0000000000000..242ac84152bc1 --- /dev/null +++ b/internal/storage/delta_data.go @@ -0,0 +1,180 @@ +// 
Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "encoding/json" + "fmt" + "strconv" + "strings" + + "github.com/samber/lo" + "github.com/valyala/fastjson" + + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" +) + +// parserPool use object pooling to reduce fastjson.Parser allocation. 
+var parserPool = &fastjson.ParserPool{} + +// DeltaData stores delta data +// currently only delete tuples are stored +type DeltaData struct { + pkType schemapb.DataType + // delete tuples + delPks PrimaryKeys + delTss []Timestamp + + // stats + delRowCount int64 + memSize int64 +} + +type DeleteLog struct { + Pk PrimaryKey `json:"pk"` + Ts uint64 `json:"ts"` + PkType int64 `json:"pkType"` +} + +func NewDeleteLog(pk PrimaryKey, ts Timestamp) *DeleteLog { + pkType := pk.Type() + + return &DeleteLog{ + Pk: pk, + Ts: ts, + PkType: int64(pkType), + } +} + +// Parse parses a delete log from its string representation. +// It tries JSON first, then falls back to the legacy comma-separated "pk,ts" format with an int64 pk. +func (dl *DeleteLog) Parse(val string) error { + p := parserPool.Get() + defer parserPool.Put(p) + v, err := p.Parse(val) + if err != nil { + // compatible with versions that only support int64 type primary keys + // compatible with fmt.Sprintf("%d,%d", pk, ts) + // compatible error info (unmarshal err invalid character ',' after top-level value) + splits := strings.Split(val, ",") + if len(splits) != 2 { + return fmt.Errorf("the format of delta log is incorrect, %v can not be split", val) + } + pk, err := strconv.ParseInt(splits[0], 10, 64) + if err != nil { + return err + } + dl.Pk = &Int64PrimaryKey{ + Value: pk, + } + dl.PkType = int64(schemapb.DataType_Int64) + dl.Ts, err = strconv.ParseUint(splits[1], 10, 64) + if err != nil { + return err + } + return nil + } + + dl.Ts = v.GetUint64("ts") + dl.PkType = v.GetInt64("pkType") + switch dl.PkType { + case int64(schemapb.DataType_Int64): + dl.Pk = &Int64PrimaryKey{Value: v.GetInt64("pk")} + case int64(schemapb.DataType_VarChar): + dl.Pk = &VarCharPrimaryKey{Value: string(v.GetStringBytes("pk"))} + } + return nil +} + +func (dl *DeleteLog) UnmarshalJSON(data []byte) error { + var messageMap map[string]*json.RawMessage + var err error + if err = json.Unmarshal(data, &messageMap); err != nil { + return err + } + + if err = json.Unmarshal(*messageMap["pkType"], 
&dl.PkType); err != nil { + return err + } + + switch schemapb.DataType(dl.PkType) { + case schemapb.DataType_Int64: + dl.Pk = &Int64PrimaryKey{} + case schemapb.DataType_VarChar: + dl.Pk = &VarCharPrimaryKey{} + } + + if err = json.Unmarshal(*messageMap["pk"], dl.Pk); err != nil { + return err + } + + if err = json.Unmarshal(*messageMap["ts"], &dl.Ts); err != nil { + return err + } + + return nil +} + +// DeleteData holds the primary keys of deleted entities together with their delete timestamps. +// Each timestamp represents the time when the corresponding entity was deleted +type DeleteData struct { + Pks []PrimaryKey // primary keys + Tss []Timestamp // timestamps + RowCount int64 + memSize int64 +} + +func NewDeleteData(pks []PrimaryKey, tss []Timestamp) *DeleteData { + return &DeleteData{ + Pks: pks, + Tss: tss, + RowCount: int64(len(pks)), + memSize: lo.SumBy(pks, func(pk PrimaryKey) int64 { return pk.Size() }) + int64(len(tss)*8), + } +} + +// Append appends one pk&ts pair to DeleteData +func (data *DeleteData) Append(pk PrimaryKey, ts Timestamp) { + data.Pks = append(data.Pks, pk) + data.Tss = append(data.Tss, ts) + data.RowCount++ + data.memSize += pk.Size() + int64(8) +} + +// AppendBatch appends a batch of pk&ts pairs to DeleteData +func (data *DeleteData) AppendBatch(pks []PrimaryKey, tss []Timestamp) { + data.Pks = append(data.Pks, pks...) + data.Tss = append(data.Tss, tss...) + data.RowCount += int64(len(pks)) + data.memSize += lo.SumBy(pks, func(pk PrimaryKey) int64 { return pk.Size() }) + int64(len(tss)*8) +} + +func (data *DeleteData) Merge(other *DeleteData) { + data.Pks = append(data.Pks, other.Pks...) + data.Tss = append(data.Tss, other.Tss...) 
+ data.RowCount += other.RowCount + data.memSize += other.Size() + + other.Pks = nil + other.Tss = nil + other.RowCount = 0 + other.memSize = 0 +} + +func (data *DeleteData) Size() int64 { + return data.memSize +} diff --git a/internal/storage/serde_events.go b/internal/storage/serde_events.go index 8858319bc1a9e..7080de02ae850 100644 --- a/internal/storage/serde_events.go +++ b/internal/storage/serde_events.go @@ -24,7 +24,6 @@ import ( "io" "sort" "strconv" - "strings" "github.com/apache/arrow/go/v12/arrow" "github.com/apache/arrow/go/v12/arrow/array" @@ -249,25 +248,8 @@ func NewDeltalogDeserializeReader(blobs []*Blob) (*DeserializeReader[*DeleteLog] } a := r.Column(fid).(*array.String) strVal := a.Value(i) - if err = json.Unmarshal([]byte(strVal), v[i]); err != nil { - // compatible with versions that only support int64 type primary keys - // compatible with fmt.Sprintf("%d,%d", pk, ts) - // compatible error info (unmarshal err invalid character ',' after top-level value) - splits := strings.Split(strVal, ",") - if len(splits) != 2 { - return fmt.Errorf("the format of delta log is incorrect, %v can not be split", strVal) - } - pk, err := strconv.ParseInt(splits[0], 10, 64) - if err != nil { - return err - } - v[i].Pk = &Int64PrimaryKey{ - Value: pk, - } - v[i].Ts, err = strconv.ParseUint(splits[1], 10, 64) - if err != nil { - return err - } + if err := v[i].Parse(strVal); err != nil { + return err } } return nil diff --git a/internal/util/importutilv2/binlog/l0_reader.go b/internal/util/importutilv2/binlog/l0_reader.go index 06d3207361b82..cdf75b064366d 100644 --- a/internal/util/importutilv2/binlog/l0_reader.go +++ b/internal/util/importutilv2/binlog/l0_reader.go @@ -18,7 +18,6 @@ package binlog import ( "context" - "encoding/json" "fmt" "io" @@ -94,7 +93,7 @@ func (r *l0Reader) Read() (*storage.DeleteData, error) { for _, rows := range rowsSet { for _, row := range rows.([]string) { dl := &storage.DeleteLog{} - err = json.Unmarshal([]byte(row), dl) + err = 
dl.Parse(row) if err != nil { return nil, err } diff --git a/internal/util/importutilv2/binlog/reader.go b/internal/util/importutilv2/binlog/reader.go index 31ccc68d4a0ae..ca49b05cfafca 100644 --- a/internal/util/importutilv2/binlog/reader.go +++ b/internal/util/importutilv2/binlog/reader.go @@ -18,7 +18,6 @@ package binlog import ( "context" - "encoding/json" "fmt" "io" "math" @@ -124,7 +123,7 @@ func (r *reader) readDelete(deltaLogs []string, tsStart, tsEnd uint64) (*storage for _, rows := range rowsSet { for _, row := range rows.([]string) { dl := &storage.DeleteLog{} - err = json.Unmarshal([]byte(row), dl) + err = dl.Parse(row) if err != nil { return nil, err }