From e2a93105430c4291714ccaaae248d174fd74a651 Mon Sep 17 00:00:00 2001
From: congqixia
Date: Thu, 26 Oct 2023 10:30:02 +0800
Subject: [PATCH] Support Array datatype (#607)

Signed-off-by: Congqi Xia
---
 entity/columns.go              |  83 ++++
 entity/columns_array_gen.go    | 805 +++++++++++++++++++++++++++++++++
 entity/columns_varchar_test.go |   6 +-
 entity/genarray/gen_array.go   | 171 +++++++
 entity/schema.go               |  30 +-
 entity/schema_test.go          |   3 +
 examples/array/hello_array.go  | 216 +++++
 7 files changed, 1308 insertions(+), 6 deletions(-)
 create mode 100755 entity/columns_array_gen.go
 create mode 100644 entity/genarray/gen_array.go
 create mode 100644 examples/array/hello_array.go

diff --git a/entity/columns.go b/entity/columns.go
index 9c1ebf72..b51a053d 100644
--- a/entity/columns.go
+++ b/entity/columns.go
@@ -239,6 +239,13 @@ func FieldDataColumn(fd *schema.FieldData, begin, end int) (Column, error) {
         }
         return NewColumnVarChar(fd.GetFieldName(), data.StringData.GetData()[begin:end]), nil
 
+    case schema.DataType_Array:
+        data := fd.GetScalars().GetArrayData()
+        if data == nil {
+            return nil, errFieldDataTypeNotMatch
+        }
+        return parseArrayData(fd.GetFieldName(), data)
+
     case schema.DataType_JSON:
         data, ok := fd.GetScalars().GetData().(*schema.ScalarField_JsonData)
         isDynamic := fd.GetIsDynamic()
@@ -297,6 +304,82 @@ func FieldDataColumn(fd *schema.FieldData, begin, end int) (Column, error) {
     }
 }
 
+func parseArrayData(fieldName string, array *schema.ArrayArray) (Column, error) {
+    fieldDataList := array.Data
+    elementType := array.ElementType
+
+    switch elementType {
+    case schema.DataType_Bool:
+        var data [][]bool
+        for _, fd := range fieldDataList {
+            data = append(data, fd.GetBoolData().GetData())
+        }
+        return NewColumnBoolArray(fieldName, data), nil
+
+    case schema.DataType_Int8:
+        var data [][]int8
+        for _, fd := range fieldDataList {
+            raw := fd.GetIntData().Data
+            row := make([]int8, 0, len(raw))
+            for _, item := range raw {
+                row = append(row, int8(item))
+            }
+            data = append(data, row)
+        }
+        return NewColumnInt8Array(fieldName, data), nil
+
+    case schema.DataType_Int16:
+        var data [][]int16
+        for _, fd := range fieldDataList {
+            raw := fd.GetIntData().Data
+            row := make([]int16, 0, len(raw))
+            for _, item := range raw {
+                row = append(row, int16(item))
+            }
+            data = append(data, row)
+        }
+        return NewColumnInt16Array(fieldName, data), nil
+
+    case schema.DataType_Int32:
+        var data [][]int32
+        for _, fd := range fieldDataList {
+            data = append(data, fd.GetIntData().GetData())
+        }
+        return NewColumnInt32Array(fieldName, data), nil
+
+    case schema.DataType_Int64:
+        var data [][]int64
+        for _, fd := range fieldDataList {
+            data = append(data, fd.GetLongData().GetData())
+        }
+        return NewColumnInt64Array(fieldName, data), nil
+
+    case schema.DataType_Float:
+        var data [][]float32
+        for _, fd := range fieldDataList {
+            data = append(data, fd.GetFloatData().GetData())
+        }
+        return NewColumnFloatArray(fieldName, data), nil
+
+    case schema.DataType_Double:
+        var data [][]float64
+        for _, fd := range fieldDataList {
+            data = append(data, fd.GetDoubleData().GetData())
+        }
+        return NewColumnDoubleArray(fieldName, data), nil
+
+    case schema.DataType_VarChar:
+        var data [][][]byte
+        for _, fd := range fieldDataList {
+            data = append(data, fd.GetBytesData().GetData())
+        }
+        return NewColumnVarCharArray(fieldName, data), nil
+
+    default:
+        return nil, fmt.Errorf("unsupported element type %s", elementType)
+    }
+}
+
 // getIntData get int32 slice from result field data
 // also handles LongData bug (see also
https://github.com/milvus-io/milvus/issues/23850) func getIntData(fd *schema.FieldData) (*schema.ScalarField_IntData, bool) { diff --git a/entity/columns_array_gen.go b/entity/columns_array_gen.go new file mode 100755 index 00000000..60eb25ea --- /dev/null +++ b/entity/columns_array_gen.go @@ -0,0 +1,805 @@ +// Code generated by go generate; DO NOT EDIT +// This file is generated by go generate + +package entity + +import ( + "fmt" + + "github.com/cockroachdb/errors" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" +) + + +// ColumnBoolArray generated columns type for Bool +type ColumnBoolArray struct { + ColumnBase + name string + values [][]bool +} + +// Name returns column name +func (c *ColumnBoolArray) Name() string { + return c.name +} + +// Type returns column FieldType +func (c *ColumnBoolArray) Type() FieldType { + return FieldTypeArray +} + +// Len returns column values length +func (c *ColumnBoolArray) Len() int { + return len(c.values) +} + +// Get returns value at index as interface{}. +func (c *ColumnBoolArray) Get(idx int) (interface{}, error) { + var r []bool // use default value + if idx < 0 || idx >= c.Len() { + return r, errors.New("index out of range") + } + return c.values[idx], nil +} + +// FieldData return column data mapped to schemapb.FieldData +func (c *ColumnBoolArray) FieldData() *schemapb.FieldData { + fd := &schemapb.FieldData{ + Type: schemapb.DataType_Array, + FieldName: c.name, + } + + data := make([]*schemapb.ScalarField, 0, c.Len()) + for _, arr := range c.values { + converted := make([]bool, 0, c.Len()) + for i := 0; i < len(arr); i++ { + converted = append(converted, bool(arr[i])) + } + data = append(data, &schemapb.ScalarField{ + Data: &schemapb.ScalarField_BoolData{ + BoolData: &schemapb.BoolArray{ + Data: converted, + }, + }, + }) + } + fd.Field = &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_ArrayData{ + ArrayData: &schemapb.ArrayArray{ + Data: data, + ElementType: schemapb.DataType_Bool, + }, + }, + }, + } + return fd +} + +// ValueByIdx returns value of the provided index +// error occurs when index out of range +func (c *ColumnBoolArray) ValueByIdx(idx int) ([]bool, error) { + var r []bool // use default value + if idx < 0 || idx >= c.Len() { + return r, errors.New("index out of range") + } + return c.values[idx], nil +} + +// AppendValue append value into column +func(c *ColumnBoolArray) AppendValue(i interface{}) error { + v, ok := i.([]bool) + if !ok { + return fmt.Errorf("invalid type, expected []bool, got %T", i) + } + c.values = append(c.values, v) + + return nil +} + +// Data returns column data +func (c *ColumnBoolArray) Data() [][]bool { + return c.values +} + +// NewColumnBool auto generated constructor +func NewColumnBoolArray(name string, values [][]bool) *ColumnBoolArray { + return &ColumnBoolArray { + name: name, + values: values, + } +} + +// ColumnInt8Array generated columns type for Int8 +type ColumnInt8Array struct { + ColumnBase + name string + values [][]int8 +} + +// Name returns column name +func (c *ColumnInt8Array) Name() string { + return c.name +} + +// Type returns column FieldType +func (c *ColumnInt8Array) Type() FieldType { + return FieldTypeArray +} + +// Len returns column values length +func (c *ColumnInt8Array) Len() int { + return len(c.values) +} + +// Get returns value at index as interface{}. 
+func (c *ColumnInt8Array) Get(idx int) (interface{}, error) { + var r []int8 // use default value + if idx < 0 || idx >= c.Len() { + return r, errors.New("index out of range") + } + return c.values[idx], nil +} + +// FieldData return column data mapped to schemapb.FieldData +func (c *ColumnInt8Array) FieldData() *schemapb.FieldData { + fd := &schemapb.FieldData{ + Type: schemapb.DataType_Array, + FieldName: c.name, + } + + data := make([]*schemapb.ScalarField, 0, c.Len()) + for _, arr := range c.values { + converted := make([]int32, 0, c.Len()) + for i := 0; i < len(arr); i++ { + converted = append(converted, int32(arr[i])) + } + data = append(data, &schemapb.ScalarField{ + Data: &schemapb.ScalarField_IntData{ + IntData: &schemapb.IntArray{ + Data: converted, + }, + }, + }) + } + fd.Field = &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_ArrayData{ + ArrayData: &schemapb.ArrayArray{ + Data: data, + ElementType: schemapb.DataType_Int8, + }, + }, + }, + } + return fd +} + +// ValueByIdx returns value of the provided index +// error occurs when index out of range +func (c *ColumnInt8Array) ValueByIdx(idx int) ([]int8, error) { + var r []int8 // use default value + if idx < 0 || idx >= c.Len() { + return r, errors.New("index out of range") + } + return c.values[idx], nil +} + +// AppendValue append value into column +func(c *ColumnInt8Array) AppendValue(i interface{}) error { + v, ok := i.([]int8) + if !ok { + return fmt.Errorf("invalid type, expected []int8, got %T", i) + } + c.values = append(c.values, v) + + return nil +} + +// Data returns column data +func (c *ColumnInt8Array) Data() [][]int8 { + return c.values +} + +// NewColumnInt8 auto generated constructor +func NewColumnInt8Array(name string, values [][]int8) *ColumnInt8Array { + return &ColumnInt8Array { + name: name, + values: values, + } +} + +// ColumnInt16Array generated columns type for Int16 +type ColumnInt16Array struct { + ColumnBase + name string + values [][]int16 +} + +// Name returns column name +func (c *ColumnInt16Array) Name() string { + return c.name +} + +// Type returns column FieldType +func (c *ColumnInt16Array) Type() FieldType { + return FieldTypeArray +} + +// Len returns column values length +func (c *ColumnInt16Array) Len() int { + return len(c.values) +} + +// Get returns value at index as interface{}. 
+func (c *ColumnInt16Array) Get(idx int) (interface{}, error) { + var r []int16 // use default value + if idx < 0 || idx >= c.Len() { + return r, errors.New("index out of range") + } + return c.values[idx], nil +} + +// FieldData return column data mapped to schemapb.FieldData +func (c *ColumnInt16Array) FieldData() *schemapb.FieldData { + fd := &schemapb.FieldData{ + Type: schemapb.DataType_Array, + FieldName: c.name, + } + + data := make([]*schemapb.ScalarField, 0, c.Len()) + for _, arr := range c.values { + converted := make([]int32, 0, c.Len()) + for i := 0; i < len(arr); i++ { + converted = append(converted, int32(arr[i])) + } + data = append(data, &schemapb.ScalarField{ + Data: &schemapb.ScalarField_IntData{ + IntData: &schemapb.IntArray{ + Data: converted, + }, + }, + }) + } + fd.Field = &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_ArrayData{ + ArrayData: &schemapb.ArrayArray{ + Data: data, + ElementType: schemapb.DataType_Int16, + }, + }, + }, + } + return fd +} + +// ValueByIdx returns value of the provided index +// error occurs when index out of range +func (c *ColumnInt16Array) ValueByIdx(idx int) ([]int16, error) { + var r []int16 // use default value + if idx < 0 || idx >= c.Len() { + return r, errors.New("index out of range") + } + return c.values[idx], nil +} + +// AppendValue append value into column +func(c *ColumnInt16Array) AppendValue(i interface{}) error { + v, ok := i.([]int16) + if !ok { + return fmt.Errorf("invalid type, expected []int16, got %T", i) + } + c.values = append(c.values, v) + + return nil +} + +// Data returns column data +func (c *ColumnInt16Array) Data() [][]int16 { + return c.values +} + +// NewColumnInt16 auto generated constructor +func NewColumnInt16Array(name string, values [][]int16) *ColumnInt16Array { + return &ColumnInt16Array { + name: name, + values: values, + } +} + +// ColumnInt32Array generated columns type for Int32 +type ColumnInt32Array struct { + ColumnBase + name string + values [][]int32 +} + +// Name returns column name +func (c *ColumnInt32Array) Name() string { + return c.name +} + +// Type returns column FieldType +func (c *ColumnInt32Array) Type() FieldType { + return FieldTypeArray +} + +// Len returns column values length +func (c *ColumnInt32Array) Len() int { + return len(c.values) +} + +// Get returns value at index as interface{}. 
+func (c *ColumnInt32Array) Get(idx int) (interface{}, error) { + var r []int32 // use default value + if idx < 0 || idx >= c.Len() { + return r, errors.New("index out of range") + } + return c.values[idx], nil +} + +// FieldData return column data mapped to schemapb.FieldData +func (c *ColumnInt32Array) FieldData() *schemapb.FieldData { + fd := &schemapb.FieldData{ + Type: schemapb.DataType_Array, + FieldName: c.name, + } + + data := make([]*schemapb.ScalarField, 0, c.Len()) + for _, arr := range c.values { + converted := make([]int32, 0, c.Len()) + for i := 0; i < len(arr); i++ { + converted = append(converted, int32(arr[i])) + } + data = append(data, &schemapb.ScalarField{ + Data: &schemapb.ScalarField_IntData{ + IntData: &schemapb.IntArray{ + Data: converted, + }, + }, + }) + } + fd.Field = &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_ArrayData{ + ArrayData: &schemapb.ArrayArray{ + Data: data, + ElementType: schemapb.DataType_Int32, + }, + }, + }, + } + return fd +} + +// ValueByIdx returns value of the provided index +// error occurs when index out of range +func (c *ColumnInt32Array) ValueByIdx(idx int) ([]int32, error) { + var r []int32 // use default value + if idx < 0 || idx >= c.Len() { + return r, errors.New("index out of range") + } + return c.values[idx], nil +} + +// AppendValue append value into column +func(c *ColumnInt32Array) AppendValue(i interface{}) error { + v, ok := i.([]int32) + if !ok { + return fmt.Errorf("invalid type, expected []int32, got %T", i) + } + c.values = append(c.values, v) + + return nil +} + +// Data returns column data +func (c *ColumnInt32Array) Data() [][]int32 { + return c.values +} + +// NewColumnInt32 auto generated constructor +func NewColumnInt32Array(name string, values [][]int32) *ColumnInt32Array { + return &ColumnInt32Array { + name: name, + values: values, + } +} + +// ColumnInt64Array generated columns type for Int64 +type ColumnInt64Array struct { + ColumnBase + name string + values [][]int64 +} + +// Name returns column name +func (c *ColumnInt64Array) Name() string { + return c.name +} + +// Type returns column FieldType +func (c *ColumnInt64Array) Type() FieldType { + return FieldTypeArray +} + +// Len returns column values length +func (c *ColumnInt64Array) Len() int { + return len(c.values) +} + +// Get returns value at index as interface{}. 
+func (c *ColumnInt64Array) Get(idx int) (interface{}, error) { + var r []int64 // use default value + if idx < 0 || idx >= c.Len() { + return r, errors.New("index out of range") + } + return c.values[idx], nil +} + +// FieldData return column data mapped to schemapb.FieldData +func (c *ColumnInt64Array) FieldData() *schemapb.FieldData { + fd := &schemapb.FieldData{ + Type: schemapb.DataType_Array, + FieldName: c.name, + } + + data := make([]*schemapb.ScalarField, 0, c.Len()) + for _, arr := range c.values { + converted := make([]int64, 0, c.Len()) + for i := 0; i < len(arr); i++ { + converted = append(converted, int64(arr[i])) + } + data = append(data, &schemapb.ScalarField{ + Data: &schemapb.ScalarField_LongData{ + LongData: &schemapb.LongArray{ + Data: converted, + }, + }, + }) + } + fd.Field = &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_ArrayData{ + ArrayData: &schemapb.ArrayArray{ + Data: data, + ElementType: schemapb.DataType_Int64, + }, + }, + }, + } + return fd +} + +// ValueByIdx returns value of the provided index +// error occurs when index out of range +func (c *ColumnInt64Array) ValueByIdx(idx int) ([]int64, error) { + var r []int64 // use default value + if idx < 0 || idx >= c.Len() { + return r, errors.New("index out of range") + } + return c.values[idx], nil +} + +// AppendValue append value into column +func(c *ColumnInt64Array) AppendValue(i interface{}) error { + v, ok := i.([]int64) + if !ok { + return fmt.Errorf("invalid type, expected []int64, got %T", i) + } + c.values = append(c.values, v) + + return nil +} + +// Data returns column data +func (c *ColumnInt64Array) Data() [][]int64 { + return c.values +} + +// NewColumnInt64 auto generated constructor +func NewColumnInt64Array(name string, values [][]int64) *ColumnInt64Array { + return &ColumnInt64Array { + name: name, + values: values, + } +} + +// ColumnFloatArray generated columns type for Float +type ColumnFloatArray struct { + ColumnBase + name string + values [][]float32 +} + +// Name returns column name +func (c *ColumnFloatArray) Name() string { + return c.name +} + +// Type returns column FieldType +func (c *ColumnFloatArray) Type() FieldType { + return FieldTypeArray +} + +// Len returns column values length +func (c *ColumnFloatArray) Len() int { + return len(c.values) +} + +// Get returns value at index as interface{}. 
+func (c *ColumnFloatArray) Get(idx int) (interface{}, error) { + var r []float32 // use default value + if idx < 0 || idx >= c.Len() { + return r, errors.New("index out of range") + } + return c.values[idx], nil +} + +// FieldData return column data mapped to schemapb.FieldData +func (c *ColumnFloatArray) FieldData() *schemapb.FieldData { + fd := &schemapb.FieldData{ + Type: schemapb.DataType_Array, + FieldName: c.name, + } + + data := make([]*schemapb.ScalarField, 0, c.Len()) + for _, arr := range c.values { + converted := make([]float32, 0, c.Len()) + for i := 0; i < len(arr); i++ { + converted = append(converted, float32(arr[i])) + } + data = append(data, &schemapb.ScalarField{ + Data: &schemapb.ScalarField_FloatData{ + FloatData: &schemapb.FloatArray{ + Data: converted, + }, + }, + }) + } + fd.Field = &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_ArrayData{ + ArrayData: &schemapb.ArrayArray{ + Data: data, + ElementType: schemapb.DataType_Float, + }, + }, + }, + } + return fd +} + +// ValueByIdx returns value of the provided index +// error occurs when index out of range +func (c *ColumnFloatArray) ValueByIdx(idx int) ([]float32, error) { + var r []float32 // use default value + if idx < 0 || idx >= c.Len() { + return r, errors.New("index out of range") + } + return c.values[idx], nil +} + +// AppendValue append value into column +func(c *ColumnFloatArray) AppendValue(i interface{}) error { + v, ok := i.([]float32) + if !ok { + return fmt.Errorf("invalid type, expected []float32, got %T", i) + } + c.values = append(c.values, v) + + return nil +} + +// Data returns column data +func (c *ColumnFloatArray) Data() [][]float32 { + return c.values +} + +// NewColumnFloat auto generated constructor +func NewColumnFloatArray(name string, values [][]float32) *ColumnFloatArray { + return &ColumnFloatArray { + name: name, + values: values, + } +} + +// ColumnDoubleArray generated columns type for Double +type ColumnDoubleArray struct { + ColumnBase + name string + values [][]float64 +} + +// Name returns column name +func (c *ColumnDoubleArray) Name() string { + return c.name +} + +// Type returns column FieldType +func (c *ColumnDoubleArray) Type() FieldType { + return FieldTypeArray +} + +// Len returns column values length +func (c *ColumnDoubleArray) Len() int { + return len(c.values) +} + +// Get returns value at index as interface{}. 
+func (c *ColumnDoubleArray) Get(idx int) (interface{}, error) { + var r []float64 // use default value + if idx < 0 || idx >= c.Len() { + return r, errors.New("index out of range") + } + return c.values[idx], nil +} + +// FieldData return column data mapped to schemapb.FieldData +func (c *ColumnDoubleArray) FieldData() *schemapb.FieldData { + fd := &schemapb.FieldData{ + Type: schemapb.DataType_Array, + FieldName: c.name, + } + + data := make([]*schemapb.ScalarField, 0, c.Len()) + for _, arr := range c.values { + converted := make([]float64, 0, c.Len()) + for i := 0; i < len(arr); i++ { + converted = append(converted, float64(arr[i])) + } + data = append(data, &schemapb.ScalarField{ + Data: &schemapb.ScalarField_DoubleData{ + DoubleData: &schemapb.DoubleArray{ + Data: converted, + }, + }, + }) + } + fd.Field = &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_ArrayData{ + ArrayData: &schemapb.ArrayArray{ + Data: data, + ElementType: schemapb.DataType_Double, + }, + }, + }, + } + return fd +} + +// ValueByIdx returns value of the provided index +// error occurs when index out of range +func (c *ColumnDoubleArray) ValueByIdx(idx int) ([]float64, error) { + var r []float64 // use default value + if idx < 0 || idx >= c.Len() { + return r, errors.New("index out of range") + } + return c.values[idx], nil +} + +// AppendValue append value into column +func(c *ColumnDoubleArray) AppendValue(i interface{}) error { + v, ok := i.([]float64) + if !ok { + return fmt.Errorf("invalid type, expected []float64, got %T", i) + } + c.values = append(c.values, v) + + return nil +} + +// Data returns column data +func (c *ColumnDoubleArray) Data() [][]float64 { + return c.values +} + +// NewColumnDouble auto generated constructor +func NewColumnDoubleArray(name string, values [][]float64) *ColumnDoubleArray { + return &ColumnDoubleArray { + name: name, + values: values, + } +} + +// ColumnVarCharArray generated columns type for VarChar +type ColumnVarCharArray struct { + ColumnBase + name string + values [][][]byte +} + +// Name returns column name +func (c *ColumnVarCharArray) Name() string { + return c.name +} + +// Type returns column FieldType +func (c *ColumnVarCharArray) Type() FieldType { + return FieldTypeArray +} + +// Len returns column values length +func (c *ColumnVarCharArray) Len() int { + return len(c.values) +} + +// Get returns value at index as interface{}. 
+func (c *ColumnVarCharArray) Get(idx int) (interface{}, error) { + var r [][]byte // use default value + if idx < 0 || idx >= c.Len() { + return r, errors.New("index out of range") + } + return c.values[idx], nil +} + +// FieldData return column data mapped to schemapb.FieldData +func (c *ColumnVarCharArray) FieldData() *schemapb.FieldData { + fd := &schemapb.FieldData{ + Type: schemapb.DataType_Array, + FieldName: c.name, + } + + data := make([]*schemapb.ScalarField, 0, c.Len()) + for _, arr := range c.values { + converted := make([][]byte, 0, c.Len()) + for i := 0; i < len(arr); i++ { + converted = append(converted, []byte(arr[i])) + } + data = append(data, &schemapb.ScalarField{ + Data: &schemapb.ScalarField_BytesData{ + BytesData: &schemapb.BytesArray{ + Data: converted, + }, + }, + }) + } + fd.Field = &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_ArrayData{ + ArrayData: &schemapb.ArrayArray{ + Data: data, + ElementType: schemapb.DataType_VarChar, + }, + }, + }, + } + return fd +} + +// ValueByIdx returns value of the provided index +// error occurs when index out of range +func (c *ColumnVarCharArray) ValueByIdx(idx int) ([][]byte, error) { + var r [][]byte // use default value + if idx < 0 || idx >= c.Len() { + return r, errors.New("index out of range") + } + return c.values[idx], nil +} + +// AppendValue append value into column +func(c *ColumnVarCharArray) AppendValue(i interface{}) error { + v, ok := i.([][]byte) + if !ok { + return fmt.Errorf("invalid type, expected [][]byte, got %T", i) + } + c.values = append(c.values, v) + + return nil +} + +// Data returns column data +func (c *ColumnVarCharArray) Data() [][][]byte { + return c.values +} + +// NewColumnVarChar auto generated constructor +func NewColumnVarCharArray(name string, values [][][]byte) *ColumnVarCharArray { + return &ColumnVarCharArray { + name: name, + values: values, + } +} + diff --git a/entity/columns_varchar_test.go b/entity/columns_varchar_test.go index 46a88358..4913554b 100644 --- a/entity/columns_varchar_test.go +++ b/entity/columns_varchar_test.go @@ -21,10 +21,10 @@ func TestColumnVarChar(t *testing.T) { t.Run("test meta", func(t *testing.T) { ft := FieldTypeVarChar assert.Equal(t, "VarChar", ft.Name()) - assert.Equal(t, "string", ft.String()) + assert.Equal(t, "[]byte", ft.String()) pbName, pbType := ft.PbFieldType() - assert.Equal(t, "VarChar", pbName) - assert.Equal(t, "string", pbType) + assert.Equal(t, "Bytes", pbName) + assert.Equal(t, "[]byte", pbType) }) t.Run("test column attribute", func(t *testing.T) { diff --git a/entity/genarray/gen_array.go b/entity/genarray/gen_array.go new file mode 100644 index 00000000..bd011ff6 --- /dev/null +++ b/entity/genarray/gen_array.go @@ -0,0 +1,171 @@ +package main + +import ( + "fmt" + "os" + "text/template" + + "github.com/milvus-io/milvus-sdk-go/v2/entity" +) + +var arrayColumnTmpl = template.Must(template.New("").Parse(`// Code generated by go generate; DO NOT EDIT +// This file is generated by go generate + +package entity + +import ( + "fmt" + + "github.com/cockroachdb/errors" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" +) + +{{ range .Types }}{{with .}} +// Column{{.TypeName}}Array generated columns type for {{.TypeName}} +type Column{{.TypeName}}Array struct { + ColumnBase + name string + values [][]{{.TypeDef}} +} + +// Name returns column name +func (c *Column{{.TypeName}}Array) Name() string { + return c.name +} + +// Type returns column FieldType +func (c *Column{{.TypeName}}Array) Type() 
FieldType {
+    return FieldTypeArray
+}
+
+// Len returns column values length
+func (c *Column{{.TypeName}}Array) Len() int {
+    return len(c.values)
+}
+
+// Get returns value at index as interface{}.
+func (c *Column{{.TypeName}}Array) Get(idx int) (interface{}, error) {
+    var r []{{.TypeDef}} // use default value
+    if idx < 0 || idx >= c.Len() {
+        return r, errors.New("index out of range")
+    }
+    return c.values[idx], nil
+}
+
+// FieldData return column data mapped to schemapb.FieldData
+func (c *Column{{.TypeName}}Array) FieldData() *schemapb.FieldData {
+    fd := &schemapb.FieldData{
+        Type:      schemapb.DataType_Array,
+        FieldName: c.name,
+    }
+
+    data := make([]*schemapb.ScalarField, 0, c.Len())
+    for _, arr := range c.values {
+        converted := make([]{{.PbType}}, 0, c.Len())
+        for i := 0; i < len(arr); i++ {
+            converted = append(converted, {{.PbType}}(arr[i]))
+        }
+        data = append(data, &schemapb.ScalarField{
+            Data: &schemapb.ScalarField_{{.PbName}}Data{
+                {{.PbName}}Data: &schemapb.{{.PbName}}Array{
+                    Data: converted,
+                },
+            },
+        })
+    }
+    fd.Field = &schemapb.FieldData_Scalars{
+        Scalars: &schemapb.ScalarField{
+            Data: &schemapb.ScalarField_ArrayData{
+                ArrayData: &schemapb.ArrayArray{
+                    Data:        data,
+                    ElementType: schemapb.DataType_{{.TypeName}},
+                },
+            },
+        },
+    }
+    return fd
+}
+
+// ValueByIdx returns value of the provided index
+// error occurs when index out of range
+func (c *Column{{.TypeName}}Array) ValueByIdx(idx int) ([]{{.TypeDef}}, error) {
+    var r []{{.TypeDef}} // use default value
+    if idx < 0 || idx >= c.Len() {
+        return r, errors.New("index out of range")
+    }
+    return c.values[idx], nil
+}
+
+// AppendValue append value into column
+func(c *Column{{.TypeName}}Array) AppendValue(i interface{}) error {
+    v, ok := i.([]{{.TypeDef}})
+    if !ok {
+        return fmt.Errorf("invalid type, expected []{{.TypeDef}}, got %T", i)
+    }
+    c.values = append(c.values, v)
+
+    return nil
+}
+
+// Data returns column data
+func (c *Column{{.TypeName}}Array) Data() [][]{{.TypeDef}} {
+    return c.values
+}
+
+// NewColumn{{.TypeName}} auto generated constructor
+func NewColumn{{.TypeName}}Array(name string, values [][]{{.TypeDef}}) *Column{{.TypeName}}Array {
+    return &Column{{.TypeName}}Array {
+        name:   name,
+        values: values,
+    }
+}
+{{end}}{{end}}
+`))
+
+func main() {
+    arrElementTypes := []entity.FieldType{
+        entity.FieldTypeBool,
+        entity.FieldTypeInt8,
+        entity.FieldTypeInt16,
+        entity.FieldTypeInt32,
+        entity.FieldTypeInt64,
+        entity.FieldTypeFloat,
+        entity.FieldTypeDouble,
+        entity.FieldTypeVarChar,
+    }
+
+    pf := func(ft entity.FieldType) interface{} {
+        pbName, pbType := ft.PbFieldType()
+        return struct {
+            TypeName string
+            TypeDef  string
+            PbName   string
+            PbType   string
+        }{
+            TypeName: ft.Name(),
+            TypeDef:  ft.String(),
+            PbName:   pbName,
+            PbType:   pbType,
+        }
+    }
+
+    fn := func(fn string, types []entity.FieldType, tmpl *template.Template, pf func(entity.FieldType) interface{}) {
+        params := struct {
+            Types []interface{}
+        }{
+            Types: make([]interface{}, 0, len(types)),
+        }
+        for _, ft := range types {
+            params.Types = append(params.Types, pf(ft))
+        }
+        f, err := os.OpenFile(fn, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755)
+        if err != nil {
+            fmt.Println(err.Error())
+            return
+        }
+        defer f.Close()
+
+        tmpl.Execute(f, params)
+    }
+    fn("columns_array_gen.go", arrElementTypes, arrayColumnTmpl, pf)
+}
diff --git a/entity/schema.go b/entity/schema.go
index d94371b1..f9dfd850 100644
--- a/entity/schema.go
+++ b/entity/schema.go
@@ -25,6 +25,9 @@ const (
     // TypeParamMaxLength is the const for varchar type maximal length
     TypeParamMaxLength = "max_length"
 
+    // TypeParamMaxCapacity is the const for array type max capacity
+    TypeParamMaxCapacity = `max_capacity`
+
     // ClStrong strong consistency level
     ClStrong ConsistencyLevel = ConsistencyLevel(common.ConsistencyLevel_Strong)
     // ClBounded bounded consistency level with default tolerance of 5 seconds
@@ -137,6 +140,7 @@ type Field struct {
     IndexParams    map[string]string
     IsDynamic      bool
     IsPartitionKey bool
+    ElementType    FieldType
 }
 
 // ProtoMessage generates corresponding FieldSchema
@@ -152,6 +156,7 @@
         IndexParams:    MapKvPairs(f.IndexParams),
         IsDynamic:      f.IsDynamic,
         IsPartitionKey: f.IsPartitionKey,
+        ElementType:    schema.DataType(f.ElementType),
     }
 }
 
@@ -277,6 +282,19 @@ func (f *Field) WithMaxLength(maxLen int64) *Field {
     return f
 }
 
+func (f *Field) WithElementType(eleType FieldType) *Field {
+    f.ElementType = eleType
+    return f
+}
+
+func (f *Field) WithMaxCapacity(maxCap int64) *Field {
+    if f.TypeParams == nil {
+        f.TypeParams = make(map[string]string)
+    }
+    f.TypeParams[TypeParamMaxCapacity] = strconv.FormatInt(maxCap, 10)
+    return f
+}
+
 // ReadProto parses FieldSchema
 func (f *Field) ReadProto(p *schema.FieldSchema) *Field {
     f.ID = p.GetFieldID()
@@ -289,6 +307,7 @@
     f.IndexParams = KvPairsMap(p.GetIndexParams())
     f.IsDynamic = p.GetIsDynamic()
     f.IsPartitionKey = p.GetIsPartitionKey()
+    f.ElementType = FieldType(p.GetElementType())
     return f
 }
 
@@ -339,6 +358,8 @@ func (t FieldType) Name() string {
         return "String"
     case FieldTypeVarChar:
         return "VarChar"
+    case FieldTypeArray:
+        return "Array"
     case FieldTypeJSON:
         return "JSON"
     case FieldTypeBinaryVector:
@@ -370,7 +391,9 @@ func (t FieldType) String() string {
     case FieldTypeString:
         return "string"
     case FieldTypeVarChar:
-        return "string"
+        return "[]byte"
+    case FieldTypeArray:
+        return "Array"
     case FieldTypeJSON:
         return "JSON"
     case FieldTypeBinaryVector:
@@ -402,7 +425,7 @@ func (t FieldType) PbFieldType() (string, string) {
     case FieldTypeString:
         return "String", "string"
     case FieldTypeVarChar:
-        return "VarChar", "string"
+        return "Bytes", "[]byte"
     case FieldTypeJSON:
         return "JSON", "JSON"
     case FieldTypeBinaryVector:
@@ -436,7 +459,8 @@ const (
     FieldTypeString FieldType = 20
     // FieldTypeVarChar field type varchar
     FieldTypeVarChar FieldType = 21 // variable-length strings with a specified maximum length
-    // FieldTypeArray FieldType = 22
+    // FieldTypeArray field type Array
+    FieldTypeArray FieldType = 22
     // FieldTypeJSON field type JSON
     FieldTypeJSON FieldType = 23
     // FieldTypeBinaryVector field type binary vector
diff --git a/entity/schema_test.go b/entity/schema_test.go
index a73b5d5b..07d07de6 100644
--- a/entity/schema_test.go
+++ b/entity/schema_test.go
@@ -25,6 +25,7 @@ func TestFieldSchema(t *testing.T) {
         NewField().WithName("int_field").WithDataType(FieldTypeInt64).WithIsAutoID(true).WithIsPrimaryKey(true).WithDescription("int_field desc"),
         NewField().WithName("string_field").WithDataType(FieldTypeString).WithIsAutoID(false).WithIsPrimaryKey(true).WithIsDynamic(false).WithTypeParams("max_len", "32").WithDescription("string_field desc"),
         NewField().WithName("partition_key").WithDataType(FieldTypeInt32).WithIsPartitionKey(true),
+        NewField().WithName("array_field").WithDataType(FieldTypeArray).WithElementType(FieldTypeBool).WithMaxCapacity(128),
         /*
             NewField().WithName("default_value_bool").WithDataType(FieldTypeBool).WithDefaultValueBool(true),
             NewField().WithName("default_value_int").WithDataType(FieldTypeInt32).WithDefaultValueInt(1),
@@ -45,6 +46,7 @@ func TestFieldSchema(t *testing.T) {
         assert.Equal(t, field.IsDynamic, fieldSchema.GetIsDynamic())
         assert.Equal(t, field.Description, fieldSchema.GetDescription())
         assert.Equal(t, field.TypeParams, KvPairsMap(fieldSchema.GetTypeParams()))
+        assert.EqualValues(t, field.ElementType, fieldSchema.GetElementType())
         // marshal & unmarshal, still equals
         nf := &Field{}
         nf = nf.ReadProto(fieldSchema)
@@ -57,6 +59,7 @@ func TestFieldSchema(t *testing.T) {
         assert.Equal(t, field.IsDynamic, nf.IsDynamic)
         assert.Equal(t, field.IsPartitionKey, nf.IsPartitionKey)
         assert.EqualValues(t, field.TypeParams, nf.TypeParams)
+        assert.EqualValues(t, field.ElementType, nf.ElementType)
     }
 
     assert.NotPanics(t, func() {
diff --git a/examples/array/hello_array.go b/examples/array/hello_array.go
new file mode 100644
index 00000000..a89b511e
--- /dev/null
+++ b/examples/array/hello_array.go
@@ -0,0 +1,216 @@
+package main
+
+import (
+    "context"
+    "fmt"
+    "log"
+    "math/rand"
+    "time"
+
+    "github.com/milvus-io/milvus-sdk-go/v2/client"
+    "github.com/milvus-io/milvus-sdk-go/v2/entity"
+)
+
+const (
+    milvusAddr     = `127.0.0.1:19530`
+    nEntities, dim = 3000, 128
+    collectionName = "hello_array"
+
+    msgFmt                         = "==== %s ====\n"
+    idCol, randomCol, embeddingCol = "ID", "random", "embeddings"
+    topK                           = 3
+)
+
+func main() {
+    ctx := context.Background()
+
+    log.Printf(msgFmt, "start connecting to Milvus")
+    c, err := client.NewClient(ctx, client.Config{
+        Address: milvusAddr,
+    })
+    if err != nil {
+        log.Fatal("failed to connect to milvus, err: ", err.Error())
+    }
+    defer c.Close()
+
+    // delete collection if exists
+    has, err := c.HasCollection(ctx, collectionName)
+    if err != nil {
+        log.Fatalf("failed to check collection exists, err: %v", err)
+    }
+    if has {
+        c.DropCollection(ctx, collectionName)
+    }
+
+    // create collection
+    log.Printf(msgFmt, fmt.Sprintf("create collection, `%s`", collectionName))
+    schema := entity.NewSchema().WithName(collectionName).WithDescription("hello_array is the simplest demo to introduce the APIs").
+        WithField(entity.NewField().WithName(idCol).WithDataType(entity.FieldTypeInt64).WithIsPrimaryKey(true).WithIsAutoID(false)).
+        WithField(entity.NewField().WithName(randomCol).WithDataType(entity.FieldTypeArray).WithElementType(entity.FieldTypeDouble).WithMaxCapacity(10)).
+        WithField(entity.NewField().WithName(embeddingCol).WithDataType(entity.FieldTypeFloatVector).WithDim(dim))
+
+    if err := c.CreateCollection(ctx, schema, entity.DefaultShardNumber); err != nil { // use default shard number
+        log.Fatalf("create collection failed, err: %v", err)
+    }
+
+    // insert data
+    log.Printf(msgFmt, "start inserting random entities")
+    idList, randomList := make([]int64, 0, nEntities), make([][]float64, 0, nEntities)
+    embeddingList := make([][]float32, 0, nEntities)
+
+    rand.Seed(time.Now().UnixNano())
+
+    // generate data
+    for i := 0; i < nEntities; i++ {
+        idList = append(idList, int64(i))
+    }
+    for i := 0; i < nEntities; i++ {
+        data := make([]float64, 0, 10)
+        for j := 0; j < 10; j++ {
+            data = append(data, rand.Float64())
+        }
+        randomList = append(randomList, data)
+    }
+    for i := 0; i < nEntities; i++ {
+        vec := make([]float32, 0, dim)
+        for j := 0; j < dim; j++ {
+            vec = append(vec, rand.Float32())
+        }
+        embeddingList = append(embeddingList, vec)
+    }
+    idColData := entity.NewColumnInt64(idCol, idList)
+    randomColData := entity.NewColumnDoubleArray(randomCol, randomList)
+    embeddingColData := entity.NewColumnFloatVector(embeddingCol, dim, embeddingList)
+
+    if _, err := c.Insert(ctx, collectionName, "", idColData, randomColData, embeddingColData); err != nil {
+        log.Fatalf("failed to insert random data into `hello_array`, err: %v", err)
+    }
+
+    if err := c.Flush(ctx, collectionName, false); err != nil {
+        log.Fatalf("failed to flush data, err: %v", err)
+    }
+
+    // build index
+    log.Printf(msgFmt, "start creating index IVF_FLAT")
+    idx, err := entity.NewIndexIvfFlat(entity.L2, 128)
+    if err != nil {
+        log.Fatalf("failed to create ivf flat index, err: %v", err)
+    }
+    if err := c.CreateIndex(ctx, collectionName, embeddingCol, idx, false); err != nil {
+        log.Fatalf("failed to create index, err: %v", err)
+    }
+
+    log.Printf(msgFmt, "start loading collection")
+    err = c.LoadCollection(ctx, collectionName, false)
+    if err != nil {
+        log.Fatalf("failed to load collection, err: %v", err)
+    }
+
+    log.Printf(msgFmt, "start searching based on vector similarity")
+    vec2search := []entity.Vector{
+        entity.FloatVector(embeddingList[len(embeddingList)-2]),
+        entity.FloatVector(embeddingList[len(embeddingList)-1]),
+    }
+    begin := time.Now()
+    sp, _ := entity.NewIndexIvfFlatSearchParam(16)
+    sRet, err := c.Search(ctx, collectionName, nil, "", []string{randomCol}, vec2search,
+        embeddingCol, entity.L2, topK, sp)
+    end := time.Now()
+    if err != nil {
+        log.Fatalf("failed to search collection, err: %v", err)
+    }
+
+    log.Println("results:")
+    for _, res := range sRet {
+        printResult(&res)
+    }
+    log.Printf("\tsearch latency: %dms\n", end.Sub(begin)/time.Millisecond)
+
+    // hybrid search
+    log.Printf(msgFmt, "start hybrid searching with `random[0] > 0.5`")
+    begin = time.Now()
+    sRet2, err := c.Search(ctx, collectionName, nil, "random[0] > 0.5",
+        []string{randomCol}, vec2search, embeddingCol, entity.L2, topK, sp)
+    end = time.Now()
+    if err != nil {
+        log.Fatalf("failed to search collection, err: %v", err)
+    }
+    log.Println("results:")
+    for _, res := range sRet2 {
+        printResult(&res)
+    }
+    log.Printf("\tsearch latency: %dms\n", end.Sub(begin)/time.Millisecond)
+
+    // delete data
+    log.Printf(msgFmt, "start deleting by primary keys [0, 1]")
+    pks := entity.NewColumnInt64(idCol, []int64{0, 1})
+    sRet3, err := c.QueryByPks(ctx, collectionName, nil, pks, []string{randomCol})
+    if err != nil {
+        log.Fatalf("failed to query result, err: %v", err)
+    }
+    log.Println("results:")
+    idlist := make([]int64, 0)
+    randList := make([][]float64, 0)
+
+    for _, col := range sRet3 {
+        if col.Name() == idCol {
+            idColumn := col.(*entity.ColumnInt64)
+            for i := 0; i < col.Len(); i++ {
+                val, err := idColumn.ValueByIdx(i)
+                if err != nil {
+                    log.Fatal(err)
+                }
+                idlist = append(idlist, val)
+            }
+        } else {
+            randColumn := col.(*entity.ColumnDoubleArray)
+            for i := 0; i < col.Len(); i++ {
+                val, err := randColumn.ValueByIdx(i)
+                if err != nil {
+                    log.Fatal(err)
+                }
+                randList = append(randList, val)
+            }
+        }
+    }
+    log.Printf("\tids: %#v, randoms: %#v\n", idlist, randList)
+
+    if err := c.DeleteByPks(ctx, collectionName, "", pks); err != nil {
+        log.Fatalf("failed to delete by pks, err: %v", err)
+    }
+    _, err = c.QueryByPks(ctx, collectionName, nil, pks, []string{randomCol}, client.WithSearchQueryConsistencyLevel(entity.ClStrong))
+    if err != nil {
+        log.Printf("failed to query result, err: %v", err)
+    }
+
+    // drop collection
+    log.Printf(msgFmt, "drop collection `hello_array`")
+    if err := c.DropCollection(ctx, collectionName); err != nil {
+        log.Fatalf("failed to drop collection, err: %v", err)
+    }
+}
+
+func printResult(sRet *client.SearchResult) {
+    randoms := make([][]float64, 0, sRet.ResultCount)
+    scores := make([]float32, 0, sRet.ResultCount)
+
+    var randCol *entity.ColumnDoubleArray
+    for _, field := range sRet.Fields {
+        fmt.Println(field.Name())
+        if field.Name() == randomCol {
+            c, ok := field.(*entity.ColumnDoubleArray)
+            if ok {
+                randCol = c
+            }
+        }
+    }
+    for i := 0; i < sRet.ResultCount; i++ {
+        val, err := randCol.ValueByIdx(i)
+        if err != nil {
+            log.Fatal(err)
+        }
+        randoms = append(randoms, val)
+        scores = append(scores, sRet.Scores[i])
+    }
+    log.Printf("\trandoms: %v, scores: %v\n", randoms, scores)
+}
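
A minimal usage sketch of the Array API this patch introduces, separate from the example file above: it declares an Int64 array field with the new WithElementType/WithMaxCapacity helpers and inserts rows through the generated ColumnInt64Array type. The address, collection name, field names, capacity, and dimension are illustrative assumptions only.

package main

import (
    "context"
    "log"

    "github.com/milvus-io/milvus-sdk-go/v2/client"
    "github.com/milvus-io/milvus-sdk-go/v2/entity"
)

func main() {
    ctx := context.Background()
    // assumed local Milvus instance
    c, err := client.NewClient(ctx, client.Config{Address: "127.0.0.1:19530"})
    if err != nil {
        log.Fatal(err)
    }
    defer c.Close()

    // Array field: element type and max capacity accompany DataType_Array.
    schema := entity.NewSchema().WithName("array_demo").
        WithField(entity.NewField().WithName("id").WithDataType(entity.FieldTypeInt64).WithIsPrimaryKey(true)).
        WithField(entity.NewField().WithName("tags").WithDataType(entity.FieldTypeArray).
            WithElementType(entity.FieldTypeInt64).WithMaxCapacity(8)).
        WithField(entity.NewField().WithName("vec").WithDataType(entity.FieldTypeFloatVector).WithDim(4))

    if err := c.CreateCollection(ctx, schema, entity.DefaultShardNumber); err != nil {
        log.Fatal(err)
    }

    // One row per outer slice element; inner slices stay within max_capacity.
    ids := entity.NewColumnInt64("id", []int64{1, 2})
    tags := entity.NewColumnInt64Array("tags", [][]int64{{1, 2, 3}, {4, 5}})
    vecs := entity.NewColumnFloatVector("vec", 4, [][]float32{{0.1, 0.2, 0.3, 0.4}, {0.5, 0.6, 0.7, 0.8}})
    if _, err := c.Insert(ctx, "array_demo", "", ids, tags, vecs); err != nil {
        log.Fatal(err)
    }
}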