Skip to content

Commit

Permalink
fix upsert/load cases
Browse files Browse the repository at this point in the history
Signed-off-by: ThreadDao <[email protected]>
  • Loading branch information
ThreadDao committed Jul 2, 2024
1 parent b62fdb2 commit 6a92a2f
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 30 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/test-ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ jobs:
working-directory: ci/scripts
run: |
url="https://registry.hub.docker.com/v2/repositories/$IMAGE_REPO/tags?page=1&name=$TAG_PREFIX"
echo "IMAGE_TAG=$(curl -s $url | jq -r '."results"[]["name"] | select(test("amd64$"))' | head -n 1)" >> $GITHUB_ENV
echo "IMAGE_TAG=$(curl -s $url | jq -r '."results"[]["name"]' | head -n 1)" >> $GITHUB_ENV
# echo "IMAGE_TAG=$(curl -s $url | jq -r '."results"[]["name"] | select(test("amd64$"))' | head -n 1)" >> $GITHUB_ENV
# echo "IMAGE_TAG=$(./docker_image_find_tag.sh -n ${IMAGE_REPO} -t ${TAG_PREFIX}latest -f ${TAG_PREFIX} -F -L -q)" >> $GITHUB_ENV
# export IMAGE_TAG=$IMAGE_TAG
# export IMAGE_REPO=$IMAGE_REPO
Expand Down
1 change: 0 additions & 1 deletion test/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -1320,7 +1320,6 @@ func GenAllFloatIndex(metricTypes ...entity.MetricType) []entity.Index {
nlist := 128
var allFloatIndex []entity.Index
var allMetricTypes []entity.MetricType
log.Println(metricTypes)
if len(metricTypes) == 0 {
allMetricTypes = SupportFloatMetricType
} else {
Expand Down
4 changes: 2 additions & 2 deletions test/testcases/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func TestCreateIndexMultiVectors(t *testing.T) {
common.CheckErr(t, err, true)
for _, idx := range common.GenAllFloatIndex() {
for _, fieldName := range []string{common.DefaultFloat16VecFieldName, common.DefaultBFloat16VecFieldName} {
log.Printf("index name=%s, index type=%v, index params=%v", idx.Name(), idx.IndexType(), idx.Params())
err := mc.CreateIndex(ctx, collName, fieldName, idx, false, client.WithIndexName(fieldName))
common.CheckErr(t, err, true)

Expand Down Expand Up @@ -360,8 +361,7 @@ func TestCreateScalarIndexVectorField(t *testing.T) {
idx := entity.NewScalarIndexWithType(ip)
for _, fieldName := range common.AllVectorsFieldsName {
err := mc.CreateIndex(ctx, collName, fieldName, idx, false)
common.CheckErr(t, err, false, "STL_SORT are only supported on numeric field",
"TRIE are only supported on varchar field", "INVERTED are not supported on")
common.CheckErr(t, err, false, "metric type not set for vector index")
}
}
for _, fieldName := range common.AllFloatVectorsFieldNames {
Expand Down
22 changes: 14 additions & 8 deletions test/testcases/load_release_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,16 +247,22 @@ func TestLoadMultiPartitions(t *testing.T) {
idx, _ := entity.NewIndexHNSW(entity.L2, 8, 96)
mc.CreateIndex(ctx, collName, common.DefaultFloatVecFieldName, idx, false)

// load partition
errLoad := mc.LoadPartitions(ctx, collName, []string{partitionName, common.DefaultPartition}, false)
// load default partition
errLoad := mc.LoadPartitions(ctx, collName, []string{common.DefaultPartition}, false)
common.CheckErr(t, errLoad, true)

//query nb from partition
queryIds := entity.NewColumnInt64(common.DefaultIntFieldName, []int64{0, common.DefaultNb})
queryResultPartition, _ := mc.QueryByPks(ctx, collName, []string{}, queryIds, []string{common.DefaultIntFieldName})
common.CheckQueryResult(t, queryResultPartition, []entity.Column{
entity.NewColumnInt64(common.DefaultIntFieldName, []int64{0, common.DefaultNb}),
})
//query nb from default partition
resDef, _ := mc.Query(ctx, collName, []string{common.DefaultPartition}, "", []string{common.QueryCountFieldName})
require.EqualValues(t, common.DefaultNb, resDef.GetColumn(common.QueryCountFieldName).(*entity.ColumnInt64).Data()[0])

// load partition and query -> actually not loaded
errLoad = mc.LoadPartitions(ctx, collName, []string{partitionName}, false)
common.CheckErr(t, errLoad, true)
resPar, _ := mc.Query(ctx, collName, []string{partitionName}, "", []string{common.QueryCountFieldName})
require.EqualValues(t, common.DefaultNb, resPar.GetColumn(common.QueryCountFieldName).(*entity.ColumnInt64).Data()[0])

res, _ := mc.Query(ctx, collName, []string{}, "", []string{common.QueryCountFieldName})
require.EqualValues(t, common.DefaultNb*2, res.GetColumn(common.QueryCountFieldName).(*entity.ColumnInt64).Data()[0])
}

// test load partitions repeatedly
Expand Down
46 changes: 28 additions & 18 deletions test/testcases/upsert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ func TestUpsert(t *testing.T) {

// delete some pk
mc.Delete(ctx, collName, "", "int64 < 10")
resSet, err = mc.Query(ctx, collName, []string{}, "int64 < 10", []string{})
require.Zero(t, resSet[0].Len())
resSet1, _ := mc.Query(ctx, collName, []string{}, "int64 < 10", []string{})
require.Zero(t, resSet1[0].Len())

// upsert part deleted(not exist) pk and part existed pk [5, 15)
data = common.GenAllFieldsData(5, upsertNb, common.DefaultDim)
Expand Down Expand Up @@ -219,12 +219,15 @@ func TestUpsertNotExistCollectionPartition(t *testing.T) {
mc := createMilvusClient(ctx, t)

// create default collection with autoID true
collName := createDefaultCollection(ctx, t, mc, true, common.DefaultShards)
collName := createDefaultCollection(ctx, t, mc, false, common.DefaultShards)

// upsert not exist partition
_, floatColumn, vecColumn := common.GenDefaultColumnData(0, common.DefaultNb, common.DefaultDim)
pkColumn, floatColumn, vecColumn := common.GenDefaultColumnData(0, common.DefaultNb, common.DefaultDim)
_, errUpsert := mc.Upsert(ctx, collName, "aaa", floatColumn, vecColumn)
common.CheckErr(t, errUpsert, false, "can not assign primary field data when auto id enabled")
common.CheckErr(t, errUpsert, false, "field int64 not passed")

_, errUpsert = mc.Upsert(ctx, collName, "aaa", pkColumn, floatColumn, vecColumn)
common.CheckErr(t, errUpsert, false, "partition not found[partition=aaa]")

// upsert not exist collection
_, errUpsert = mc.Upsert(ctx, "aaa", "", floatColumn, vecColumn)
Expand Down Expand Up @@ -323,18 +326,18 @@ func TestUpsertDynamicField(t *testing.T) {

// verify that dynamic field exists
upsertNb := 10
resSet, err := mc.Query(ctx, collName, []string{}, fmt.Sprintf("%s < %d", common.DefaultDynamicNumberField, upsertNb),
resSet1, _ := mc.Query(ctx, collName, []string{}, fmt.Sprintf("%s < %d", common.DefaultDynamicNumberField, upsertNb),
[]string{common.DefaultDynamicNumberField})
require.Equal(t, upsertNb, resSet[0].Len())
require.Equal(t, upsertNb, resSet1[0].Len())

// 1. upsert exist pk without dynamic column
intColumn, floatColumn, vecColumn := common.GenDefaultColumnData(0, upsertNb, common.DefaultDim)
_, err = mc.Upsert(ctx, collName, "", intColumn, floatColumn, vecColumn)
_, err := mc.Upsert(ctx, collName, "", intColumn, floatColumn, vecColumn)
common.CheckErr(t, err, true)

// query and gets empty
resSet, err = mc.Query(ctx, collName, []string{}, fmt.Sprintf("%s < %d", common.DefaultDynamicNumberField, upsertNb), []string{common.DefaultDynamicNumberField})
require.Zero(t, resSet[0].Len())
resSet2, _ := mc.Query(ctx, collName, []string{}, fmt.Sprintf("%s < %d", common.DefaultDynamicNumberField, upsertNb), []string{common.DefaultDynamicNumberField})
require.Zero(t, resSet2[0].Len())

// 2. upsert not exist pk with dynamic column -> field dynamicNumber does not exist in collection
intColumn2, floatColumn2, vecColumn2 := common.GenDefaultColumnData(common.DefaultNb, upsertNb, common.DefaultDim)
Expand All @@ -343,8 +346,8 @@ func TestUpsertDynamicField(t *testing.T) {
common.CheckErr(t, err, true)

// query and gets empty dynamic field
resSet, err = mc.Query(ctx, collName, []string{}, fmt.Sprintf("%s >= %d", common.DefaultDynamicNumberField, common.DefaultNb), []string{common.QueryCountFieldName})
require.Equal(t, int64(upsertNb), resSet.GetColumn(common.QueryCountFieldName).(*entity.ColumnInt64).Data()[0])
resSet3, _ := mc.Query(ctx, collName, []string{}, fmt.Sprintf("%s >= %d", common.DefaultDynamicNumberField, common.DefaultNb), []string{common.QueryCountFieldName})
require.Equal(t, int64(upsertNb), resSet3.GetColumn(common.QueryCountFieldName).(*entity.ColumnInt64).Data()[0])
}

func TestUpsertPartitionKeyCollection(t *testing.T) {
Expand Down Expand Up @@ -382,16 +385,22 @@ func TestUpsertPartitionKeyCollection(t *testing.T) {

// upsert data partition key field [nb, nb*2)
partitionKeyColumn2 := common.GenColumnData(common.DefaultNb, common.DefaultNb, entity.FieldTypeInt64, partitionKeyFieldName)
mc.Upsert(ctx, schema.CollectionName, "", pkColumn, floatColumn, vecColumn, partitionKeyColumn2)
_, _, vecColumn2 := common.GenDefaultColumnData(0, common.DefaultNb, common.DefaultDim)
_, err = mc.Upsert(ctx, schema.CollectionName, "", pkColumn, floatColumn, vecColumn2, partitionKeyColumn2)
common.CheckErr(t, err, true)

// verify upsert
resSet, err := mc.Query(ctx, schema.CollectionName, []string{}, fmt.Sprintf("%s >= %d", partitionKeyFieldName, common.DefaultNb),
resSet, _ := mc.Query(ctx, schema.CollectionName, []string{}, fmt.Sprintf("%s >= %d", partitionKeyFieldName, common.DefaultNb),
[]string{common.QueryCountFieldName})
require.Equal(t, int64(common.DefaultNb), resSet.GetColumn(common.QueryCountFieldName).(*entity.ColumnInt64).Data()[0])

resSet, err = mc.Query(ctx, schema.CollectionName, []string{}, fmt.Sprintf("%s < %d", partitionKeyFieldName, common.DefaultNb),
resSet1, _ := mc.Query(ctx, schema.CollectionName, []string{}, fmt.Sprintf("%s < %d", partitionKeyFieldName, common.DefaultNb),
[]string{common.QueryCountFieldName})
require.Equal(t, int64(0), resSet.GetColumn(common.QueryCountFieldName).(*entity.ColumnInt64).Data()[0])
require.Equal(t, int64(0), resSet1.GetColumn(common.QueryCountFieldName).(*entity.ColumnInt64).Data()[0])

resSet2, _ := mc.Query(ctx, schema.CollectionName, []string{}, fmt.Sprintf("%s < %d", common.DefaultIntFieldName, 10),
[]string{"*"})
common.CheckQueryResult(t, resSet2, []entity.Column{pkColumn.Slice(0, 10), floatColumn.Slice(0, 10), partitionKeyColumn2.Slice(0, 10), vecColumn2.Slice(0, 10)})
}

func TestUpsertWithoutLoading(t *testing.T) {
Expand Down Expand Up @@ -423,10 +432,11 @@ func TestUpsertWithoutLoading(t *testing.T) {
mc.CreateIndex(ctx, collName, common.DefaultFloatVecFieldName, idx, false)
mc.LoadCollection(ctx, collName, false)
err = mc.CreateIndex(ctx, collName, common.DefaultFloatVecFieldName, idx, false)
common.CheckErr(t, err, true)

// query and verify
resSet, err := mc.QueryByPks(ctx, collName, []string{}, intColumn,
resSet, err1 := mc.QueryByPks(ctx, collName, []string{}, intColumn,
[]string{common.DefaultFloatVecFieldName})
common.CheckErr(t, err, true)
common.CheckErr(t, err1, true)
require.ElementsMatch(t, vecColumn.(*entity.ColumnFloatVector).Data()[:upsertNb], resSet.GetColumn(common.DefaultFloatVecFieldName).(*entity.ColumnFloatVector).Data())
}

0 comments on commit 6a92a2f

Please sign in to comment.