Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add test cases for mmap #715

Merged
merged 1 commit into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions test/base/milvus_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,14 @@ func (mc *MilvusClient) HasCollection(ctx context.Context, collName string) (boo
return has, err
}

// AlterCollection changes collection attributes
func (mc *MilvusClient) AlterCollection(ctx context.Context, collName string, attrs ...entity.CollectionAttribute) error {
preRequest("AlterCollection", ctx, collName)
err := mc.mClient.AlterCollection(ctx, collName, attrs...)
postResponse("AlterCollection", err)
return err
}

// -- alias --

// CreateAlias Create Alias
Expand Down Expand Up @@ -356,6 +364,14 @@ func (mc *MilvusClient) GetIndexState(ctx context.Context, collName string, fiel
return indexState, err
}

// AlterIndex modifies the index params.
func (mc *MilvusClient) AlterIndex(ctx context.Context, collName string, indexName string, opts ...client.IndexOption) error {
preRequest("AlterIndex", ctx, collName, indexName, opts)
err := mc.mClient.AlterIndex(ctx, collName, indexName, opts...)
postResponse("AlterIndex", err)
return err
}

// -- basic operation --

// Insert insert data
Expand Down
305 changes: 305 additions & 0 deletions test/testcases/load_release_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ package testcases
import (
"fmt"
"log"
"strconv"
"testing"
"time"

"github.com/milvus-io/milvus-sdk-go/v2/client"

"github.com/milvus-io/milvus-sdk-go/v2/entity"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -512,3 +515,305 @@ func TestReleaseMultiPartitions(t *testing.T) {
[]string{common.DefaultIntFieldName})
common.CheckErr(t, errQuery, false, "collection not loaded")
}

// test mmap collection raw data and index
// create -> insert -> flush -> index with mmap -> load -> alter collection with mmap -> reload -> read op
func TestMmapCollectionIndexDefault(t *testing.T) {
ctx := createContext(t, time.Second*common.DefaultTimeout*2)
// connect
mc := createMilvusClient(ctx, t)

// create -> insert [0, 3000) -> flush
cp := CollectionParams{CollectionFieldsType: AllFields, AutoID: false, EnableDynamicField: true,
ShardsNum: common.DefaultShards, Dim: common.DefaultDim}
collName := createCollection(ctx, t, mc, cp)

dp := DataParams{DoInsert: true, CollectionName: collName, CollectionFieldsType: AllFields, start: 0,
nb: common.DefaultNb, dim: common.DefaultDim, EnableDynamicField: true}
insertData(ctx, t, mc, dp)
_ = mc.Flush(ctx, collName, false)

// create vector index with mmap
GenDefaultIndexParamsForAllVectors()
indexHnsw, _ := entity.NewIndexHNSW(entity.L2, 8, 96)
indexBinary, _ := entity.NewIndexBinIvfFlat(entity.JACCARD, 64)
for _, fieldName := range common.AllVectorsFieldsName {
if fieldName == common.DefaultBinaryVecFieldName {
err := mc.CreateIndex(ctx, collName, fieldName, indexBinary, false, client.WithMmap(true))
common.CheckErr(t, err, true)
} else if fieldName == common.DefaultFloatVecFieldName {
err := mc.CreateIndex(ctx, collName, fieldName, indexHnsw, false, client.WithMmap(true))
common.CheckErr(t, err, true)
} else {
err := mc.CreateIndex(ctx, collName, fieldName, indexHnsw, false)
common.CheckErr(t, err, true)
}
}

// describe index and check mmap
for _, fieldName := range common.AllVectorsFieldsName {
if fieldName == common.DefaultFloatVecFieldName || fieldName == common.DefaultBinaryVecFieldName {
indexes, _ := mc.DescribeIndex(ctx, collName, fieldName)
require.Len(t, indexes, 1)
// check index mmap
require.Equal(t, "true", indexes[0].Params()["mmap.enabled"])
}
}

// load collection -> describe collection and check mmap false
_ = mc.LoadCollection(ctx, collName, false)
coll, _ := mc.DescribeCollection(ctx, collName)
require.Equal(t, "", coll.Properties["mmap.enabled"])

// alter collection and check collection mmap
_ = mc.ReleaseCollection(ctx, collName)
err := mc.AlterCollection(ctx, collName, entity.Mmap(true))
common.CheckErr(t, err, true)

//describe collection
mc.LoadCollection(ctx, collName, false)
coll, _ = mc.DescribeCollection(ctx, collName)
require.Equal(t, "true", coll.Properties["mmap.enabled"])

// query
queryRes, err := mc.Query(ctx, collName, []string{}, fmt.Sprintf("%s < 10", common.DefaultIntFieldName), []string{"*"})
common.CheckErr(t, err, true)
require.Equal(t, queryRes.GetColumn(common.DefaultIntFieldName).Len(), 10)
common.CheckOutputFields(t, queryRes, common.GetAllFieldsName(true, false))

// search
queryVec1 := common.GenSearchVectors(common.DefaultNq, common.DefaultDim, entity.FieldTypeFloatVector)
sp, _ := entity.NewIndexHNSWSearchParam(74)
expr := fmt.Sprintf("%s > 10", common.DefaultIntFieldName)
searchRes, _ := mc.Search(ctx, collName, []string{}, expr, []string{"*"}, queryVec1, common.DefaultFloatVecFieldName,
entity.L2, common.DefaultTopK, sp)
common.CheckOutputFields(t, searchRes[0].Fields, common.GetAllFieldsName(true, false))

// hybrid search
queryVec2 := common.GenSearchVectors(common.DefaultNq, common.DefaultDim, entity.FieldTypeBinaryVector)
sReqs := []*client.ANNSearchRequest{
client.NewANNSearchRequest(common.DefaultFloatVecFieldName, entity.L2, "", queryVec1, sp, common.DefaultTopK, client.WithOffset(3)),
client.NewANNSearchRequest(common.DefaultBinaryVecFieldName, entity.JACCARD, "", queryVec2, sp, common.DefaultTopK),
}
_, errSearch := mc.HybridSearch(ctx, collName, []string{}, common.DefaultTopK, []string{}, client.NewRRFReranker(), sReqs)
common.CheckErr(t, errSearch, true)
}

// test mmap collection raw data and index
// create -> insert -> flush -> index -> load -> alter collection with mmap -> alter index with mmap -> reload -> read op
func TestMmapAlterCollectionIndexDefault(t *testing.T) {
ctx := createContext(t, time.Second*common.DefaultTimeout*2)
// connect
mc := createMilvusClient(ctx, t)

// create -> insert [0, 3000) -> flush
cp := CollectionParams{CollectionFieldsType: AllFields, AutoID: false, EnableDynamicField: true,
ShardsNum: common.DefaultShards, Dim: common.DefaultDim}

dp := DataParams{DoInsert: true, CollectionFieldsType: AllFields, start: 0,
nb: common.DefaultNb, dim: common.DefaultDim, EnableDynamicField: true}

ips := GenDefaultIndexParamsForAllVectors()
collName := prepareCollection(ctx, t, mc, cp, WithDataParams(dp), WithIndexParams(ips), WithCreateOption(client.WithConsistencyLevel(entity.ClStrong)))

// describe index and check mmap
for _, fieldName := range common.AllVectorsFieldsName {
indexes, _ := mc.DescribeIndex(ctx, collName, fieldName)
// check index mmap
require.Equal(t, "", indexes[0].Params()["mmap.enabled"])
}

//describe collection
mc.LoadCollection(ctx, collName, false)
coll, _ := mc.DescribeCollection(ctx, collName)
require.Equal(t, "", coll.Properties["mmap.enabled"])

// alter mmap: collection and index
_ = mc.ReleaseCollection(ctx, collName)
err := mc.AlterCollection(ctx, collName, entity.Mmap(true))
common.CheckErr(t, err, true)
for _, fieldName := range common.AllVectorsFieldsName {
err := mc.AlterIndex(ctx, collName, fieldName, client.WithMmap(true))
common.CheckErr(t, err, true)
}

// load collection -> describe collection and check mmap false
//describe collection
mc.LoadCollection(ctx, collName, false)
coll, _ = mc.DescribeCollection(ctx, collName)
require.Equal(t, "true", coll.Properties["mmap.enabled"])
for _, fieldName := range common.AllVectorsFieldsName {
idx, _ := mc.DescribeIndex(ctx, collName, fieldName)
require.Equal(t, "true", idx[0].Params()["mmap.enabled"])
}

// query
queryRes, err := mc.Query(ctx, collName, []string{}, fmt.Sprintf("%s < 10", common.DefaultIntFieldName), []string{"*"})
common.CheckErr(t, err, true)
require.Equal(t, queryRes.GetColumn(common.DefaultIntFieldName).Len(), 10)
common.CheckOutputFields(t, queryRes, common.GetAllFieldsName(true, false))

// search
queryVec1 := common.GenSearchVectors(common.DefaultNq, common.DefaultDim, entity.FieldTypeFloatVector)
sp, _ := entity.NewIndexHNSWSearchParam(74)
expr := fmt.Sprintf("%s > 10", common.DefaultIntFieldName)
searchRes, _ := mc.Search(ctx, collName, []string{}, expr, []string{"*"}, queryVec1, common.DefaultFloatVecFieldName,
entity.L2, common.DefaultTopK, sp)
common.CheckOutputFields(t, searchRes[0].Fields, common.GetAllFieldsName(true, false))

// hybrid search
queryVec2 := common.GenSearchVectors(common.DefaultNq, common.DefaultDim, entity.FieldTypeBinaryVector)
sReqs := []*client.ANNSearchRequest{
client.NewANNSearchRequest(common.DefaultFloatVecFieldName, entity.L2, "", queryVec1, sp, common.DefaultTopK, client.WithOffset(3)),
client.NewANNSearchRequest(common.DefaultBinaryVecFieldName, entity.JACCARD, "", queryVec2, sp, common.DefaultTopK),
}
_, errSearch := mc.HybridSearch(ctx, collName, []string{}, common.DefaultTopK, []string{}, client.NewRRFReranker(), sReqs)
common.CheckErr(t, errSearch, true)
}

// test mmap collection loaded
func TestMmapCollectionLoaded(t *testing.T) {
ctx := createContext(t, time.Second*common.DefaultTimeout*2)
// connect
mc := createMilvusClient(ctx, t)

// create -> insert -> flush -> index -> load
cp := CollectionParams{CollectionFieldsType: Int64FloatVec, AutoID: false, EnableDynamicField: false,
ShardsNum: common.DefaultShards, Dim: common.DefaultDim}

dp := DataParams{DoInsert: true, CollectionFieldsType: Int64FloatVec, start: 0, nb: common.DefaultNb,
dim: common.DefaultDim, EnableDynamicField: false}

collName := prepareCollection(ctx, t, mc, cp, WithDataParams(dp), WithCreateOption(client.WithConsistencyLevel(entity.ClStrong)))

// mmap collection raw data
err := mc.AlterCollection(ctx, collName, entity.Mmap(true))
common.CheckErr(t, err, false, "can not alter mmap properties if collection loaded")

// mmap index
err = mc.AlterIndex(ctx, collName, common.DefaultFloatVecFieldName, client.WithMmap(true))
common.CheckErr(t, err, false, "can't alter index on loaded collection, please release the collection first")
}

// test mmap collection which scalar field indexed
func TestMmapCollectionScalarIndexed(t *testing.T) {
ctx := createContext(t, time.Second*common.DefaultTimeout*2)
// connect
mc := createMilvusClient(ctx, t)

// create -> insert -> flush -> index -> load
cp := CollectionParams{CollectionFieldsType: Int64FloatVec, AutoID: false, EnableDynamicField: false,
ShardsNum: common.DefaultShards, Dim: common.DefaultDim}

dp := DataParams{DoInsert: true, CollectionFieldsType: Int64FloatVec, start: 0, nb: common.DefaultNb,
dim: common.DefaultDim, EnableDynamicField: false}

collName := prepareCollection(ctx, t, mc, cp, WithDataParams(dp), WithCreateOption(client.WithConsistencyLevel(entity.ClStrong)))

// create scalar index
for _, fName := range []string{common.DefaultIntFieldName, common.DefaultFloatFieldName} {
err := mc.CreateIndex(ctx, collName, fName, entity.NewScalarIndexWithType(entity.Inverted), false)
common.CheckErr(t, err, true)
}

// mmap collection
mc.ReleaseCollection(ctx, collName)
err := mc.AlterCollection(ctx, collName, entity.Mmap(true))
common.CheckErr(t, err, true)
err = mc.LoadCollection(ctx, collName, false)
common.CheckErr(t, err, true)
}

// test mmap unsupported index: DiskANN, GPU-class
func TestMmapIndexUnsupported(t *testing.T) {
t.Skip("https://github.com/milvus-io/milvus-sdk-go/issues/714")
ctx := createContext(t, time.Second*common.DefaultTimeout*2)
// connect
mc := createMilvusClient(ctx, t)

// create -> insert -> flush -> index -> load
cp := CollectionParams{CollectionFieldsType: Int64FloatVec, AutoID: false, EnableDynamicField: false,
ShardsNum: common.DefaultShards, Dim: common.DefaultDim}
collName := createCollection(ctx, t, mc, cp)

dp := DataParams{DoInsert: true, CollectionName: collName, CollectionFieldsType: Int64FloatVec, start: 0,
nb: common.DefaultNb, dim: common.DefaultDim, EnableDynamicField: false}
insertData(ctx, t, mc, dp)
mc.Flush(ctx, collName, false)

//create index with mmap
idx, _ := entity.NewIndexDISKANN(entity.COSINE)
err := mc.CreateIndex(ctx, collName, common.DefaultFloatVecFieldName, idx, false, client.WithMmap(true))
common.CheckErr(t, err, false, "index type DISKANN does not support mmap")

//create scalar index with mmap
for _, idx := range []entity.Index{entity.NewScalarIndex(), entity.NewScalarIndexWithType(entity.Inverted)} {
err := mc.CreateIndex(ctx, collName, common.DefaultFloatFieldName, idx, false, client.WithMmap(true))
common.CheckErr(t, err, false, "does not support mmap")
}
}

func TestAlterIndexMmapUnsupportedIndex(t *testing.T) {
ctx := createContext(t, time.Second*common.DefaultTimeout*2)
// connect
mc := createMilvusClient(ctx, t)

// create -> insert -> flush -> index -> load
cp := CollectionParams{CollectionFieldsType: Int64FloatVec, AutoID: false, EnableDynamicField: false,
ShardsNum: common.DefaultShards, Dim: common.DefaultDim}
collName := createCollection(ctx, t, mc, cp)

dp := DataParams{DoInsert: true, CollectionName: collName, CollectionFieldsType: Int64FloatVec, start: 0,
nb: common.DefaultNb, dim: common.DefaultDim, EnableDynamicField: false}
insertData(ctx, t, mc, dp)
mc.Flush(ctx, collName, false)

idxHnsw, _ := entity.NewIndexDISKANN(entity.IP)
err := mc.CreateIndex(ctx, collName, common.DefaultFloatVecFieldName, idxHnsw, false)
common.CheckErr(t, err, true)
for _, mmap := range []bool{true, false} {
err = mc.AlterIndex(ctx, collName, common.DefaultFloatVecFieldName, client.WithMmap(mmap))
common.CheckErr(t, err, false, "index type DISKANN does not support mmap: invalid parameter")
}
}

func TestMmapAlterIndex(t *testing.T) {
t.Parallel()
for _, mmap := range []bool{true, false} {
ctx := createContext(t, time.Second*common.DefaultTimeout*2)
// connect
mc := createMilvusClient(ctx, t)

// create -> insert -> flush -> index -> load
cp := CollectionParams{CollectionFieldsType: Int64FloatVec, AutoID: false, EnableDynamicField: false,
ShardsNum: common.DefaultShards, Dim: common.DefaultDim}
collName := createCollection(ctx, t, mc, cp)

dp := DataParams{DoInsert: true, CollectionName: collName, CollectionFieldsType: Int64FloatVec, start: 0,
nb: common.DefaultNb, dim: common.DefaultDim, EnableDynamicField: false}
insertData(ctx, t, mc, dp)
mc.Flush(ctx, collName, false)

// create index and enable mmap
idxHnsw, _ := entity.NewIndexHNSW(entity.COSINE, 8, 96)
err := mc.CreateIndex(ctx, collName, common.DefaultFloatVecFieldName, idxHnsw, false, client.WithMmap(!mmap))
common.CheckErr(t, err, true)

// alter index and enable mmap
err = mc.AlterIndex(ctx, collName, common.DefaultFloatVecFieldName, client.WithMmap(mmap))
common.CheckErr(t, err, true)

idx, _ := mc.DescribeIndex(ctx, collName, common.DefaultFloatVecFieldName)
require.Equal(t, strconv.FormatBool(mmap), idx[0].Params()["mmap.enabled"])

err = mc.LoadCollection(ctx, collName, false)
common.CheckErr(t, err, true)

queryVec1 := common.GenSearchVectors(common.DefaultNq, common.DefaultDim, entity.FieldTypeFloatVector)
sp, _ := entity.NewIndexHNSWSearchParam(74)
expr := fmt.Sprintf("%s > 10", common.DefaultIntFieldName)
searchRes, _ := mc.Search(ctx, collName, []string{}, expr, []string{"*"}, queryVec1, common.DefaultFloatVecFieldName,
entity.COSINE, common.DefaultTopK, sp)
common.CheckOutputFields(t, searchRes[0].Fields, []string{common.DefaultIntFieldName, common.DefaultFloatFieldName, common.DefaultFloatVecFieldName})
}
}
Loading