Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: turn off dict encoding for binlog primary key #34358

Merged
merged 1 commit into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/storage/binlog_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func generateTestSchema() *schemapb.CollectionSchema {
{FieldID: 13, Name: "int64", DataType: schemapb.DataType_Int64},
{FieldID: 14, Name: "float", DataType: schemapb.DataType_Float},
{FieldID: 15, Name: "double", DataType: schemapb.DataType_Double},
{FieldID: 16, Name: "varchar", DataType: schemapb.DataType_VarChar},
{FieldID: 16, Name: "varchar", DataType: schemapb.DataType_VarChar, IsPrimaryKey: true},
{FieldID: 17, Name: "string", DataType: schemapb.DataType_String},
{FieldID: 18, Name: "array", DataType: schemapb.DataType_Array},
{FieldID: 19, Name: "string", DataType: schemapb.DataType_JSON},
Expand Down
10 changes: 5 additions & 5 deletions internal/storage/binlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
func TestInsertBinlog(t *testing.T) {
w := NewInsertBinlogWriter(schemapb.DataType_Int64, 10, 20, 30, 40, false)

e1, err := w.NextInsertEventWriter(false)
e1, err := w.NextInsertEventWriter()
assert.NoError(t, err)
err = e1.AddDataToPayload([]int64{1, 2, 3}, nil)
assert.NoError(t, err)
Expand All @@ -49,7 +49,7 @@ func TestInsertBinlog(t *testing.T) {
assert.NoError(t, err)
e1.SetEventTimestamp(100, 200)

e2, err := w.NextInsertEventWriter(false)
e2, err := w.NextInsertEventWriter()
assert.NoError(t, err)
err = e2.AddDataToPayload([]int64{7, 8, 9}, nil)
assert.NoError(t, err)
Expand Down Expand Up @@ -1329,7 +1329,7 @@ func TestNewBinlogReaderError(t *testing.T) {

w.SetEventTimeStamp(1000, 2000)

e1, err := w.NextInsertEventWriter(false)
e1, err := w.NextInsertEventWriter()
assert.NoError(t, err)
err = e1.AddDataToPayload([]int64{1, 2, 3}, nil)
assert.NoError(t, err)
Expand Down Expand Up @@ -1393,7 +1393,7 @@ func TestNewBinlogWriterTsError(t *testing.T) {

func TestInsertBinlogWriterCloseError(t *testing.T) {
insertWriter := NewInsertBinlogWriter(schemapb.DataType_Int64, 10, 20, 30, 40, false)
e1, err := insertWriter.NextInsertEventWriter(false)
e1, err := insertWriter.NextInsertEventWriter()
assert.NoError(t, err)

sizeTotal := 2000000
Expand All @@ -1406,7 +1406,7 @@ func TestInsertBinlogWriterCloseError(t *testing.T) {
err = insertWriter.Finish()
assert.NoError(t, err)
assert.NotNil(t, insertWriter.buffer)
insertEventWriter, err := insertWriter.NextInsertEventWriter(false)
insertEventWriter, err := insertWriter.NextInsertEventWriter()
assert.Nil(t, insertEventWriter)
assert.Error(t, err)
insertWriter.Close()
Expand Down
18 changes: 4 additions & 14 deletions internal/storage/binlog_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

// BinlogType is to distinguish different files saving different data.
Expand Down Expand Up @@ -150,21 +149,12 @@ type InsertBinlogWriter struct {
}

// NextInsertEventWriter returns an event writer to write insert data to an event.
func (writer *InsertBinlogWriter) NextInsertEventWriter(nullable bool, dim ...int) (*insertEventWriter, error) {
func (writer *InsertBinlogWriter) NextInsertEventWriter(opts ...PayloadWriterOptions) (*insertEventWriter, error) {
if writer.isClosed() {
return nil, fmt.Errorf("binlog has closed")
}

var event *insertEventWriter
var err error
if typeutil.IsVectorType(writer.PayloadDataType) && !typeutil.IsSparseFloatVectorType(writer.PayloadDataType) {
if len(dim) != 1 {
return nil, fmt.Errorf("incorrect input numbers")
}
event, err = newInsertEventWriter(writer.PayloadDataType, nullable, dim[0])
} else {
event, err = newInsertEventWriter(writer.PayloadDataType, nullable)
}
event, err := newInsertEventWriter(writer.PayloadDataType, opts...)
if err != nil {
return nil, err
}
Expand All @@ -179,11 +169,11 @@ type DeleteBinlogWriter struct {
}

// NextDeleteEventWriter returns an event writer to write delete data to an event.
func (writer *DeleteBinlogWriter) NextDeleteEventWriter() (*deleteEventWriter, error) {
func (writer *DeleteBinlogWriter) NextDeleteEventWriter(opts ...PayloadWriterOptions) (*deleteEventWriter, error) {
if writer.isClosed() {
return nil, fmt.Errorf("binlog has closed")
}
event, err := newDeleteEventWriter(writer.PayloadDataType)
event, err := newDeleteEventWriter(writer.PayloadDataType, opts...)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/storage/binlog_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestBinlogWriterReader(t *testing.T) {

binlogWriter.SetEventTimeStamp(1000, 2000)
defer binlogWriter.Close()
eventWriter, err := binlogWriter.NextInsertEventWriter(false)
eventWriter, err := binlogWriter.NextInsertEventWriter()
assert.NoError(t, err)
err = eventWriter.AddInt32ToPayload([]int32{1, 2, 3}, nil)
assert.NoError(t, err)
Expand Down
37 changes: 13 additions & 24 deletions internal/storage/data_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,31 +243,18 @@
for _, field := range insertCodec.Schema.Schema.Fields {
// encode fields
writer = NewInsertBinlogWriter(field.DataType, insertCodec.Schema.ID, partitionID, segmentID, field.FieldID, field.GetNullable())
var eventWriter *insertEventWriter
var err error
var dim int64
if typeutil.IsVectorType(field.DataType) {
if field.GetNullable() {
return nil, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("vectorType not support null, fieldName: %s", field.GetName()))
}
switch field.DataType {
case schemapb.DataType_FloatVector,
schemapb.DataType_BinaryVector,
schemapb.DataType_Float16Vector,
schemapb.DataType_BFloat16Vector:
dim, err = typeutil.GetDim(field)
if err != nil {
return nil, err
}
eventWriter, err = writer.NextInsertEventWriter(field.GetNullable(), int(dim))
case schemapb.DataType_SparseFloatVector:
eventWriter, err = writer.NextInsertEventWriter(field.GetNullable())
default:
return nil, fmt.Errorf("undefined data type %d", field.DataType)

// Collect the payload writer options for this field: its nullability and its writer properties (which carry the fallback encoding method).
opts := []PayloadWriterOptions{WithNullable(field.GetNullable()), WithWriterProps(getFieldWriterProps(field))}

if typeutil.IsVectorType(field.DataType) && !typeutil.IsSparseFloatVectorType(field.DataType) {
dim, err := typeutil.GetDim(field)
if err != nil {
return nil, err

Check warning on line 253 in internal/storage/data_codec.go

View check run for this annotation

Codecov / codecov/patch

internal/storage/data_codec.go#L253

Added line #L253 was not covered by tests
}
} else {
eventWriter, err = writer.NextInsertEventWriter(field.GetNullable())
opts = append(opts, WithDim(int(dim)))
}
eventWriter, err := writer.NextInsertEventWriter(opts...)
if err != nil {
writer.Close()
return nil, err
Expand Down Expand Up @@ -711,7 +698,9 @@
// For each delete message, it saves a "pk,ts" string to the binlog.
func (deleteCodec *DeleteCodec) Serialize(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, data *DeleteData) (*Blob, error) {
binlogWriter := NewDeleteBinlogWriter(schemapb.DataType_String, collectionID, partitionID, segmentID)
eventWriter, err := binlogWriter.NextDeleteEventWriter()
field := &schemapb.FieldSchema{IsPrimaryKey: true, DataType: schemapb.DataType_String}
opts := []PayloadWriterOptions{WithWriterProps(getFieldWriterProps(field))}
eventWriter, err := binlogWriter.NextDeleteEventWriter(opts...)
if err != nil {
binlogWriter.Close()
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion internal/storage/data_codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -977,7 +977,7 @@ func TestDeleteData(t *testing.T) {

func TestAddFieldDataToPayload(t *testing.T) {
w := NewInsertBinlogWriter(schemapb.DataType_Int64, 10, 20, 30, 40, false)
e, _ := w.NextInsertEventWriter(false)
e, _ := w.NextInsertEventWriter()
var err error
err = AddFieldDataToPayload(e, schemapb.DataType_Bool, &BoolFieldData{[]bool{}, nil})
assert.Error(t, err)
Expand Down
22 changes: 11 additions & 11 deletions internal/storage/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func TestInsertEvent(t *testing.T) {
}

t.Run("insert_bool", func(t *testing.T) {
w, err := newInsertEventWriter(schemapb.DataType_Bool, false)
w, err := newInsertEventWriter(schemapb.DataType_Bool)
assert.NoError(t, err)
insertT(t, schemapb.DataType_Bool, w,
func(w *insertEventWriter) error {
Expand All @@ -211,7 +211,7 @@ func TestInsertEvent(t *testing.T) {
})

t.Run("insert_int8", func(t *testing.T) {
w, err := newInsertEventWriter(schemapb.DataType_Int8, false)
w, err := newInsertEventWriter(schemapb.DataType_Int8)
assert.NoError(t, err)
insertT(t, schemapb.DataType_Int8, w,
func(w *insertEventWriter) error {
Expand All @@ -227,7 +227,7 @@ func TestInsertEvent(t *testing.T) {
})

t.Run("insert_int16", func(t *testing.T) {
w, err := newInsertEventWriter(schemapb.DataType_Int16, false)
w, err := newInsertEventWriter(schemapb.DataType_Int16)
assert.NoError(t, err)
insertT(t, schemapb.DataType_Int16, w,
func(w *insertEventWriter) error {
Expand All @@ -243,7 +243,7 @@ func TestInsertEvent(t *testing.T) {
})

t.Run("insert_int32", func(t *testing.T) {
w, err := newInsertEventWriter(schemapb.DataType_Int32, false)
w, err := newInsertEventWriter(schemapb.DataType_Int32)
assert.NoError(t, err)
insertT(t, schemapb.DataType_Int32, w,
func(w *insertEventWriter) error {
Expand All @@ -259,7 +259,7 @@ func TestInsertEvent(t *testing.T) {
})

t.Run("insert_int64", func(t *testing.T) {
w, err := newInsertEventWriter(schemapb.DataType_Int64, false)
w, err := newInsertEventWriter(schemapb.DataType_Int64)
assert.NoError(t, err)
insertT(t, schemapb.DataType_Int64, w,
func(w *insertEventWriter) error {
Expand All @@ -275,7 +275,7 @@ func TestInsertEvent(t *testing.T) {
})

t.Run("insert_float32", func(t *testing.T) {
w, err := newInsertEventWriter(schemapb.DataType_Float, false)
w, err := newInsertEventWriter(schemapb.DataType_Float)
assert.NoError(t, err)
insertT(t, schemapb.DataType_Float, w,
func(w *insertEventWriter) error {
Expand All @@ -291,7 +291,7 @@ func TestInsertEvent(t *testing.T) {
})

t.Run("insert_float64", func(t *testing.T) {
w, err := newInsertEventWriter(schemapb.DataType_Double, false)
w, err := newInsertEventWriter(schemapb.DataType_Double)
assert.NoError(t, err)
insertT(t, schemapb.DataType_Double, w,
func(w *insertEventWriter) error {
Expand All @@ -307,7 +307,7 @@ func TestInsertEvent(t *testing.T) {
})

t.Run("insert_binary_vector", func(t *testing.T) {
w, err := newInsertEventWriter(schemapb.DataType_BinaryVector, false, 16)
w, err := newInsertEventWriter(schemapb.DataType_BinaryVector, WithDim(16))
assert.NoError(t, err)
insertT(t, schemapb.DataType_BinaryVector, w,
func(w *insertEventWriter) error {
Expand All @@ -323,7 +323,7 @@ func TestInsertEvent(t *testing.T) {
})

t.Run("insert_float_vector", func(t *testing.T) {
w, err := newInsertEventWriter(schemapb.DataType_FloatVector, false, 2)
w, err := newInsertEventWriter(schemapb.DataType_FloatVector, WithDim(2))
assert.NoError(t, err)
insertT(t, schemapb.DataType_FloatVector, w,
func(w *insertEventWriter) error {
Expand All @@ -339,7 +339,7 @@ func TestInsertEvent(t *testing.T) {
})

t.Run("insert_string", func(t *testing.T) {
w, err := newInsertEventWriter(schemapb.DataType_String, false)
w, err := newInsertEventWriter(schemapb.DataType_String)
assert.NoError(t, err)
w.SetEventTimestamp(tsoutil.ComposeTS(10, 0), tsoutil.ComposeTS(100, 0))
err = w.AddDataToPayload("1234", nil)
Expand Down Expand Up @@ -1101,7 +1101,7 @@ func TestEventReaderError(t *testing.T) {
}

func TestEventClose(t *testing.T) {
w, err := newInsertEventWriter(schemapb.DataType_String, false)
w, err := newInsertEventWriter(schemapb.DataType_String)
assert.NoError(t, err)
w.SetEventTimestamp(tsoutil.ComposeTS(10, 0), tsoutil.ComposeTS(100, 0))
err = w.AddDataToPayload("1234", nil)
Expand Down
29 changes: 9 additions & 20 deletions internal/storage/event_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@ package storage
import (
"bytes"
"encoding/binary"
"fmt"
"io"

"github.com/cockroachdb/errors"

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

// EventTypeCode represents event type by code
Expand Down Expand Up @@ -222,17 +220,8 @@ func NewBaseDescriptorEvent(collectionID int64, partitionID int64, segmentID int
return de
}

func newInsertEventWriter(dataType schemapb.DataType, nullable bool, dim ...int) (*insertEventWriter, error) {
var payloadWriter PayloadWriterInterface
var err error
if typeutil.IsVectorType(dataType) && !typeutil.IsSparseFloatVectorType(dataType) {
if len(dim) != 1 {
return nil, fmt.Errorf("incorrect input numbers")
}
payloadWriter, err = NewPayloadWriter(dataType, nullable, dim[0])
} else {
payloadWriter, err = NewPayloadWriter(dataType, nullable)
}
func newInsertEventWriter(dataType schemapb.DataType, opts ...PayloadWriterOptions) (*insertEventWriter, error) {
payloadWriter, err := NewPayloadWriter(dataType, opts...)
if err != nil {
return nil, err
}
Expand All @@ -253,8 +242,8 @@ func newInsertEventWriter(dataType schemapb.DataType, nullable bool, dim ...int)
return writer, nil
}

func newDeleteEventWriter(dataType schemapb.DataType) (*deleteEventWriter, error) {
payloadWriter, err := NewPayloadWriter(dataType, false)
func newDeleteEventWriter(dataType schemapb.DataType, opts ...PayloadWriterOptions) (*deleteEventWriter, error) {
payloadWriter, err := NewPayloadWriter(dataType, opts...)
if err != nil {
return nil, err
}
Expand All @@ -280,7 +269,7 @@ func newCreateCollectionEventWriter(dataType schemapb.DataType) (*createCollecti
return nil, errors.New("incorrect data type")
}

payloadWriter, err := NewPayloadWriter(dataType, false)
payloadWriter, err := NewPayloadWriter(dataType)
if err != nil {
return nil, err
}
Expand All @@ -306,7 +295,7 @@ func newDropCollectionEventWriter(dataType schemapb.DataType) (*dropCollectionEv
return nil, errors.New("incorrect data type")
}

payloadWriter, err := NewPayloadWriter(dataType, false)
payloadWriter, err := NewPayloadWriter(dataType)
if err != nil {
return nil, err
}
Expand All @@ -332,7 +321,7 @@ func newCreatePartitionEventWriter(dataType schemapb.DataType) (*createPartition
return nil, errors.New("incorrect data type")
}

payloadWriter, err := NewPayloadWriter(dataType, false)
payloadWriter, err := NewPayloadWriter(dataType)
if err != nil {
return nil, err
}
Expand All @@ -358,7 +347,7 @@ func newDropPartitionEventWriter(dataType schemapb.DataType) (*dropPartitionEven
return nil, errors.New("incorrect data type")
}

payloadWriter, err := NewPayloadWriter(dataType, false)
payloadWriter, err := NewPayloadWriter(dataType)
if err != nil {
return nil, err
}
Expand All @@ -380,7 +369,7 @@ func newDropPartitionEventWriter(dataType schemapb.DataType) (*dropPartitionEven
}

func newIndexFileEventWriter(dataType schemapb.DataType) (*indexFileEventWriter, error) {
payloadWriter, err := NewPayloadWriter(dataType, false)
payloadWriter, err := NewPayloadWriter(dataType)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions internal/storage/event_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ func TestSizeofStruct(t *testing.T) {
}

func TestEventWriter(t *testing.T) {
insertEvent, err := newInsertEventWriter(schemapb.DataType_Int32, false)
insertEvent, err := newInsertEventWriter(schemapb.DataType_Int32)
assert.NoError(t, err)
insertEvent.Close()

insertEvent, err = newInsertEventWriter(schemapb.DataType_Int32, false)
insertEvent, err = newInsertEventWriter(schemapb.DataType_Int32)
assert.NoError(t, err)
defer insertEvent.Close()

Expand Down
Loading
Loading