Skip to content

Commit

Permalink
enhance: binlog primary key turn off dict encoding (#34358)
Browse files Browse the repository at this point in the history
issue: #34357 

Go Parquet uses dictionary encoding by default, and it will fall back to
plain encoding if the dictionary size exceeds the dictionary size page
limit. Users can specify custom fallback encoding by using
`parquet.WithEncoding(ENCODING_METHOD)` in writer properties. However,
Go Parquet [fallbacks to plain
encoding](https://github.com/apache/arrow/blob/e65c1e295d82c7076df484089a63fa3ba2bd55d1/go/parquet/file/column_writer_types.gen.go.tmpl#L238)
rather than custom encoding method users provide. Therefore, this patch
only turns off dictionary encoding for the primary key.

With a 5 million auto ID primary key benchmark, the parquet file size
improves from 13.93 MB to 8.36 MB when dictionary encoding is turned
off, reducing primary key storage space by 40%.

Signed-off-by: shaoting-huang <[email protected]>
  • Loading branch information
shaoting-huang authored Jul 17, 2024
1 parent 67324eb commit 88b373b
Show file tree
Hide file tree
Showing 18 changed files with 329 additions and 286 deletions.
2 changes: 1 addition & 1 deletion internal/storage/binlog_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func generateTestSchema() *schemapb.CollectionSchema {
{FieldID: 13, Name: "int64", DataType: schemapb.DataType_Int64},
{FieldID: 14, Name: "float", DataType: schemapb.DataType_Float},
{FieldID: 15, Name: "double", DataType: schemapb.DataType_Double},
{FieldID: 16, Name: "varchar", DataType: schemapb.DataType_VarChar},
{FieldID: 16, Name: "varchar", DataType: schemapb.DataType_VarChar, IsPrimaryKey: true},
{FieldID: 17, Name: "string", DataType: schemapb.DataType_String},
{FieldID: 18, Name: "array", DataType: schemapb.DataType_Array},
{FieldID: 19, Name: "string", DataType: schemapb.DataType_JSON},
Expand Down
10 changes: 5 additions & 5 deletions internal/storage/binlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
func TestInsertBinlog(t *testing.T) {
w := NewInsertBinlogWriter(schemapb.DataType_Int64, 10, 20, 30, 40, false)

e1, err := w.NextInsertEventWriter(false)
e1, err := w.NextInsertEventWriter()
assert.NoError(t, err)
err = e1.AddDataToPayload([]int64{1, 2, 3}, nil)
assert.NoError(t, err)
Expand All @@ -49,7 +49,7 @@ func TestInsertBinlog(t *testing.T) {
assert.NoError(t, err)
e1.SetEventTimestamp(100, 200)

e2, err := w.NextInsertEventWriter(false)
e2, err := w.NextInsertEventWriter()
assert.NoError(t, err)
err = e2.AddDataToPayload([]int64{7, 8, 9}, nil)
assert.NoError(t, err)
Expand Down Expand Up @@ -1329,7 +1329,7 @@ func TestNewBinlogReaderError(t *testing.T) {

w.SetEventTimeStamp(1000, 2000)

e1, err := w.NextInsertEventWriter(false)
e1, err := w.NextInsertEventWriter()
assert.NoError(t, err)
err = e1.AddDataToPayload([]int64{1, 2, 3}, nil)
assert.NoError(t, err)
Expand Down Expand Up @@ -1393,7 +1393,7 @@ func TestNewBinlogWriterTsError(t *testing.T) {

func TestInsertBinlogWriterCloseError(t *testing.T) {
insertWriter := NewInsertBinlogWriter(schemapb.DataType_Int64, 10, 20, 30, 40, false)
e1, err := insertWriter.NextInsertEventWriter(false)
e1, err := insertWriter.NextInsertEventWriter()
assert.NoError(t, err)

sizeTotal := 2000000
Expand All @@ -1406,7 +1406,7 @@ func TestInsertBinlogWriterCloseError(t *testing.T) {
err = insertWriter.Finish()
assert.NoError(t, err)
assert.NotNil(t, insertWriter.buffer)
insertEventWriter, err := insertWriter.NextInsertEventWriter(false)
insertEventWriter, err := insertWriter.NextInsertEventWriter()
assert.Nil(t, insertEventWriter)
assert.Error(t, err)
insertWriter.Close()
Expand Down
18 changes: 4 additions & 14 deletions internal/storage/binlog_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

// BinlogType is to distinguish different files saving different data.
Expand Down Expand Up @@ -150,21 +149,12 @@ type InsertBinlogWriter struct {
}

// NextInsertEventWriter returns an event writer to write insert data to an event.
func (writer *InsertBinlogWriter) NextInsertEventWriter(nullable bool, dim ...int) (*insertEventWriter, error) {
func (writer *InsertBinlogWriter) NextInsertEventWriter(opts ...PayloadWriterOptions) (*insertEventWriter, error) {
if writer.isClosed() {
return nil, fmt.Errorf("binlog has closed")
}

var event *insertEventWriter
var err error
if typeutil.IsVectorType(writer.PayloadDataType) && !typeutil.IsSparseFloatVectorType(writer.PayloadDataType) {
if len(dim) != 1 {
return nil, fmt.Errorf("incorrect input numbers")
}
event, err = newInsertEventWriter(writer.PayloadDataType, nullable, dim[0])
} else {
event, err = newInsertEventWriter(writer.PayloadDataType, nullable)
}
event, err := newInsertEventWriter(writer.PayloadDataType, opts...)
if err != nil {
return nil, err
}
Expand All @@ -179,11 +169,11 @@ type DeleteBinlogWriter struct {
}

// NextDeleteEventWriter returns an event writer to write delete data to an event.
func (writer *DeleteBinlogWriter) NextDeleteEventWriter() (*deleteEventWriter, error) {
func (writer *DeleteBinlogWriter) NextDeleteEventWriter(opts ...PayloadWriterOptions) (*deleteEventWriter, error) {
if writer.isClosed() {
return nil, fmt.Errorf("binlog has closed")
}
event, err := newDeleteEventWriter(writer.PayloadDataType)
event, err := newDeleteEventWriter(writer.PayloadDataType, opts...)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/storage/binlog_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestBinlogWriterReader(t *testing.T) {

binlogWriter.SetEventTimeStamp(1000, 2000)
defer binlogWriter.Close()
eventWriter, err := binlogWriter.NextInsertEventWriter(false)
eventWriter, err := binlogWriter.NextInsertEventWriter()
assert.NoError(t, err)
err = eventWriter.AddInt32ToPayload([]int32{1, 2, 3}, nil)
assert.NoError(t, err)
Expand Down
37 changes: 13 additions & 24 deletions internal/storage/data_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,31 +243,18 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
for _, field := range insertCodec.Schema.Schema.Fields {
// encode fields
writer = NewInsertBinlogWriter(field.DataType, insertCodec.Schema.ID, partitionID, segmentID, field.FieldID, field.GetNullable())
var eventWriter *insertEventWriter
var err error
var dim int64
if typeutil.IsVectorType(field.DataType) {
if field.GetNullable() {
return nil, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("vectorType not support null, fieldName: %s", field.GetName()))
}
switch field.DataType {
case schemapb.DataType_FloatVector,
schemapb.DataType_BinaryVector,
schemapb.DataType_Float16Vector,
schemapb.DataType_BFloat16Vector:
dim, err = typeutil.GetDim(field)
if err != nil {
return nil, err
}
eventWriter, err = writer.NextInsertEventWriter(field.GetNullable(), int(dim))
case schemapb.DataType_SparseFloatVector:
eventWriter, err = writer.NextInsertEventWriter(field.GetNullable())
default:
return nil, fmt.Errorf("undefined data type %d", field.DataType)

// get payload writing configs, including nullable and fallback encoding method
opts := []PayloadWriterOptions{WithNullable(field.GetNullable()), WithWriterProps(getFieldWriterProps(field))}

if typeutil.IsVectorType(field.DataType) && !typeutil.IsSparseFloatVectorType(field.DataType) {
dim, err := typeutil.GetDim(field)
if err != nil {
return nil, err
}
} else {
eventWriter, err = writer.NextInsertEventWriter(field.GetNullable())
opts = append(opts, WithDim(int(dim)))
}
eventWriter, err := writer.NextInsertEventWriter(opts...)
if err != nil {
writer.Close()
return nil, err
Expand Down Expand Up @@ -711,7 +698,9 @@ func NewDeleteCodec() *DeleteCodec {
// For each delete message, it will save "pk,ts" string to binlog.
func (deleteCodec *DeleteCodec) Serialize(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, data *DeleteData) (*Blob, error) {
binlogWriter := NewDeleteBinlogWriter(schemapb.DataType_String, collectionID, partitionID, segmentID)
eventWriter, err := binlogWriter.NextDeleteEventWriter()
field := &schemapb.FieldSchema{IsPrimaryKey: true, DataType: schemapb.DataType_String}
opts := []PayloadWriterOptions{WithWriterProps(getFieldWriterProps(field))}
eventWriter, err := binlogWriter.NextDeleteEventWriter(opts...)
if err != nil {
binlogWriter.Close()
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion internal/storage/data_codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -977,7 +977,7 @@ func TestDeleteData(t *testing.T) {

func TestAddFieldDataToPayload(t *testing.T) {
w := NewInsertBinlogWriter(schemapb.DataType_Int64, 10, 20, 30, 40, false)
e, _ := w.NextInsertEventWriter(false)
e, _ := w.NextInsertEventWriter()
var err error
err = AddFieldDataToPayload(e, schemapb.DataType_Bool, &BoolFieldData{[]bool{}, nil})
assert.Error(t, err)
Expand Down
22 changes: 11 additions & 11 deletions internal/storage/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func TestInsertEvent(t *testing.T) {
}

t.Run("insert_bool", func(t *testing.T) {
w, err := newInsertEventWriter(schemapb.DataType_Bool, false)
w, err := newInsertEventWriter(schemapb.DataType_Bool)
assert.NoError(t, err)
insertT(t, schemapb.DataType_Bool, w,
func(w *insertEventWriter) error {
Expand All @@ -211,7 +211,7 @@ func TestInsertEvent(t *testing.T) {
})

t.Run("insert_int8", func(t *testing.T) {
w, err := newInsertEventWriter(schemapb.DataType_Int8, false)
w, err := newInsertEventWriter(schemapb.DataType_Int8)
assert.NoError(t, err)
insertT(t, schemapb.DataType_Int8, w,
func(w *insertEventWriter) error {
Expand All @@ -227,7 +227,7 @@ func TestInsertEvent(t *testing.T) {
})

t.Run("insert_int16", func(t *testing.T) {
w, err := newInsertEventWriter(schemapb.DataType_Int16, false)
w, err := newInsertEventWriter(schemapb.DataType_Int16)
assert.NoError(t, err)
insertT(t, schemapb.DataType_Int16, w,
func(w *insertEventWriter) error {
Expand All @@ -243,7 +243,7 @@ func TestInsertEvent(t *testing.T) {
})

t.Run("insert_int32", func(t *testing.T) {
w, err := newInsertEventWriter(schemapb.DataType_Int32, false)
w, err := newInsertEventWriter(schemapb.DataType_Int32)
assert.NoError(t, err)
insertT(t, schemapb.DataType_Int32, w,
func(w *insertEventWriter) error {
Expand All @@ -259,7 +259,7 @@ func TestInsertEvent(t *testing.T) {
})

t.Run("insert_int64", func(t *testing.T) {
w, err := newInsertEventWriter(schemapb.DataType_Int64, false)
w, err := newInsertEventWriter(schemapb.DataType_Int64)
assert.NoError(t, err)
insertT(t, schemapb.DataType_Int64, w,
func(w *insertEventWriter) error {
Expand All @@ -275,7 +275,7 @@ func TestInsertEvent(t *testing.T) {
})

t.Run("insert_float32", func(t *testing.T) {
w, err := newInsertEventWriter(schemapb.DataType_Float, false)
w, err := newInsertEventWriter(schemapb.DataType_Float)
assert.NoError(t, err)
insertT(t, schemapb.DataType_Float, w,
func(w *insertEventWriter) error {
Expand All @@ -291,7 +291,7 @@ func TestInsertEvent(t *testing.T) {
})

t.Run("insert_float64", func(t *testing.T) {
w, err := newInsertEventWriter(schemapb.DataType_Double, false)
w, err := newInsertEventWriter(schemapb.DataType_Double)
assert.NoError(t, err)
insertT(t, schemapb.DataType_Double, w,
func(w *insertEventWriter) error {
Expand All @@ -307,7 +307,7 @@ func TestInsertEvent(t *testing.T) {
})

t.Run("insert_binary_vector", func(t *testing.T) {
w, err := newInsertEventWriter(schemapb.DataType_BinaryVector, false, 16)
w, err := newInsertEventWriter(schemapb.DataType_BinaryVector, WithDim(16))
assert.NoError(t, err)
insertT(t, schemapb.DataType_BinaryVector, w,
func(w *insertEventWriter) error {
Expand All @@ -323,7 +323,7 @@ func TestInsertEvent(t *testing.T) {
})

t.Run("insert_float_vector", func(t *testing.T) {
w, err := newInsertEventWriter(schemapb.DataType_FloatVector, false, 2)
w, err := newInsertEventWriter(schemapb.DataType_FloatVector, WithDim(2))
assert.NoError(t, err)
insertT(t, schemapb.DataType_FloatVector, w,
func(w *insertEventWriter) error {
Expand All @@ -339,7 +339,7 @@ func TestInsertEvent(t *testing.T) {
})

t.Run("insert_string", func(t *testing.T) {
w, err := newInsertEventWriter(schemapb.DataType_String, false)
w, err := newInsertEventWriter(schemapb.DataType_String)
assert.NoError(t, err)
w.SetEventTimestamp(tsoutil.ComposeTS(10, 0), tsoutil.ComposeTS(100, 0))
err = w.AddDataToPayload("1234", nil)
Expand Down Expand Up @@ -1101,7 +1101,7 @@ func TestEventReaderError(t *testing.T) {
}

func TestEventClose(t *testing.T) {
w, err := newInsertEventWriter(schemapb.DataType_String, false)
w, err := newInsertEventWriter(schemapb.DataType_String)
assert.NoError(t, err)
w.SetEventTimestamp(tsoutil.ComposeTS(10, 0), tsoutil.ComposeTS(100, 0))
err = w.AddDataToPayload("1234", nil)
Expand Down
29 changes: 9 additions & 20 deletions internal/storage/event_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@ package storage
import (
"bytes"
"encoding/binary"
"fmt"
"io"

"github.com/cockroachdb/errors"

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

// EventTypeCode represents event type by code
Expand Down Expand Up @@ -222,17 +220,8 @@ func NewBaseDescriptorEvent(collectionID int64, partitionID int64, segmentID int
return de
}

func newInsertEventWriter(dataType schemapb.DataType, nullable bool, dim ...int) (*insertEventWriter, error) {
var payloadWriter PayloadWriterInterface
var err error
if typeutil.IsVectorType(dataType) && !typeutil.IsSparseFloatVectorType(dataType) {
if len(dim) != 1 {
return nil, fmt.Errorf("incorrect input numbers")
}
payloadWriter, err = NewPayloadWriter(dataType, nullable, dim[0])
} else {
payloadWriter, err = NewPayloadWriter(dataType, nullable)
}
func newInsertEventWriter(dataType schemapb.DataType, opts ...PayloadWriterOptions) (*insertEventWriter, error) {
payloadWriter, err := NewPayloadWriter(dataType, opts...)
if err != nil {
return nil, err
}
Expand All @@ -253,8 +242,8 @@ func newInsertEventWriter(dataType schemapb.DataType, nullable bool, dim ...int)
return writer, nil
}

func newDeleteEventWriter(dataType schemapb.DataType) (*deleteEventWriter, error) {
payloadWriter, err := NewPayloadWriter(dataType, false)
func newDeleteEventWriter(dataType schemapb.DataType, opts ...PayloadWriterOptions) (*deleteEventWriter, error) {
payloadWriter, err := NewPayloadWriter(dataType, opts...)
if err != nil {
return nil, err
}
Expand All @@ -280,7 +269,7 @@ func newCreateCollectionEventWriter(dataType schemapb.DataType) (*createCollecti
return nil, errors.New("incorrect data type")
}

payloadWriter, err := NewPayloadWriter(dataType, false)
payloadWriter, err := NewPayloadWriter(dataType)
if err != nil {
return nil, err
}
Expand All @@ -306,7 +295,7 @@ func newDropCollectionEventWriter(dataType schemapb.DataType) (*dropCollectionEv
return nil, errors.New("incorrect data type")
}

payloadWriter, err := NewPayloadWriter(dataType, false)
payloadWriter, err := NewPayloadWriter(dataType)
if err != nil {
return nil, err
}
Expand All @@ -332,7 +321,7 @@ func newCreatePartitionEventWriter(dataType schemapb.DataType) (*createPartition
return nil, errors.New("incorrect data type")
}

payloadWriter, err := NewPayloadWriter(dataType, false)
payloadWriter, err := NewPayloadWriter(dataType)
if err != nil {
return nil, err
}
Expand All @@ -358,7 +347,7 @@ func newDropPartitionEventWriter(dataType schemapb.DataType) (*dropPartitionEven
return nil, errors.New("incorrect data type")
}

payloadWriter, err := NewPayloadWriter(dataType, false)
payloadWriter, err := NewPayloadWriter(dataType)
if err != nil {
return nil, err
}
Expand All @@ -380,7 +369,7 @@ func newDropPartitionEventWriter(dataType schemapb.DataType) (*dropPartitionEven
}

func newIndexFileEventWriter(dataType schemapb.DataType) (*indexFileEventWriter, error) {
payloadWriter, err := NewPayloadWriter(dataType, false)
payloadWriter, err := NewPayloadWriter(dataType)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions internal/storage/event_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ func TestSizeofStruct(t *testing.T) {
}

func TestEventWriter(t *testing.T) {
insertEvent, err := newInsertEventWriter(schemapb.DataType_Int32, false)
insertEvent, err := newInsertEventWriter(schemapb.DataType_Int32)
assert.NoError(t, err)
insertEvent.Close()

insertEvent, err = newInsertEventWriter(schemapb.DataType_Int32, false)
insertEvent, err = newInsertEventWriter(schemapb.DataType_Int32)
assert.NoError(t, err)
defer insertEvent.Close()

Expand Down
Loading

0 comments on commit 88b373b

Please sign in to comment.