Skip to content

Commit

Permalink
Add test cases for mmap
Browse files Browse the repository at this point in the history
Signed-off-by: ThreadDao <[email protected]>
  • Loading branch information
ThreadDao committed Apr 17, 2024
1 parent c711641 commit 4e5d2e6
Show file tree
Hide file tree
Showing 2 changed files with 321 additions and 0 deletions.
16 changes: 16 additions & 0 deletions test/base/milvus_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,14 @@ func (mc *MilvusClient) HasCollection(ctx context.Context, collName string) (boo
return has, err
}

// AlterCollection changes collection attributes
func (mc *MilvusClient) AlterCollection(ctx context.Context, collName string, attrs ...entity.CollectionAttribute) error {
preRequest("AlterCollection", ctx, collName)
err := mc.mClient.AlterCollection(ctx, collName, attrs...)
postResponse("AlterCollection", err)
return err
}

// -- alias --

// CreateAlias Create Alias
Expand Down Expand Up @@ -356,6 +364,14 @@ func (mc *MilvusClient) GetIndexState(ctx context.Context, collName string, fiel
return indexState, err
}

// AlterIndex modifies the index params.
func (mc *MilvusClient) AlterIndex(ctx context.Context, collName string, indexName string, opts ...client.IndexOption) error {
preRequest("AlterIndex", ctx, collName, indexName, opts)
err := mc.mClient.AlterIndex(ctx, collName, indexName, opts...)
postResponse("AlterIndex", err)
return err
}

// -- basic operation --

// Insert insert data
Expand Down
305 changes: 305 additions & 0 deletions test/testcases/load_release_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ package testcases
import (
"fmt"
"log"
"strconv"
"testing"
"time"

"github.com/milvus-io/milvus-sdk-go/v2/client"

"github.com/milvus-io/milvus-sdk-go/v2/entity"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -512,3 +515,305 @@ func TestReleaseMultiPartitions(t *testing.T) {
[]string{common.DefaultIntFieldName})
common.CheckErr(t, errQuery, false, "collection not loaded")
}

// test mmap collection raw data and index
// create -> insert -> flush -> index with mmap -> load -> alter collection with mmap -> reload -> read op
func TestMmapCollectionIndexDefault(t *testing.T) {
ctx := createContext(t, time.Second*common.DefaultTimeout*2)
// connect
mc := createMilvusClient(ctx, t)

// create -> insert [0, 3000) -> flush
cp := CollectionParams{CollectionFieldsType: AllFields, AutoID: false, EnableDynamicField: true,
ShardsNum: common.DefaultShards, Dim: common.DefaultDim}
collName := createCollection(ctx, t, mc, cp)

dp := DataParams{DoInsert: true, CollectionName: collName, CollectionFieldsType: AllFields, start: 0,
nb: common.DefaultNb, dim: common.DefaultDim, EnableDynamicField: true}
insertData(ctx, t, mc, dp)
_ = mc.Flush(ctx, collName, false)

// create vector index with mmap
GenDefaultIndexParamsForAllVectors()
indexHnsw, _ := entity.NewIndexHNSW(entity.L2, 8, 96)
indexBinary, _ := entity.NewIndexBinIvfFlat(entity.JACCARD, 64)
for _, fieldName := range common.AllVectorsFieldsName {
if fieldName == common.DefaultBinaryVecFieldName {
err := mc.CreateIndex(ctx, collName, fieldName, indexBinary, false, client.WithMmap(true))
common.CheckErr(t, err, true)
} else if fieldName == common.DefaultFloatVecFieldName {
err := mc.CreateIndex(ctx, collName, fieldName, indexHnsw, false, client.WithMmap(true))
common.CheckErr(t, err, true)
} else {
err := mc.CreateIndex(ctx, collName, fieldName, indexHnsw, false)
common.CheckErr(t, err, true)
}
}

// describe index and check mmap
for _, fieldName := range common.AllVectorsFieldsName {
if fieldName == common.DefaultFloatVecFieldName || fieldName == common.DefaultBinaryVecFieldName {
indexes, _ := mc.DescribeIndex(ctx, collName, fieldName)
require.Len(t, indexes, 1)
// check index mmap
require.Equal(t, "true", indexes[0].Params()["mmap.enabled"])
}
}

// load collection -> describe collection and check mmap false
_ = mc.LoadCollection(ctx, collName, false)
coll, _ := mc.DescribeCollection(ctx, collName)
require.Equal(t, "", coll.Properties["mmap.enabled"])

// alter collection and check collection mmap
_ = mc.ReleaseCollection(ctx, collName)
err := mc.AlterCollection(ctx, collName, entity.Mmap(true))
common.CheckErr(t, err, true)

//describe collection
mc.LoadCollection(ctx, collName, false)
coll, _ = mc.DescribeCollection(ctx, collName)
require.Equal(t, "true", coll.Properties["mmap.enabled"])

// query
queryRes, err := mc.Query(ctx, collName, []string{}, fmt.Sprintf("%s < 10", common.DefaultIntFieldName), []string{"*"})
common.CheckErr(t, err, true)
require.Equal(t, queryRes.GetColumn(common.DefaultIntFieldName).Len(), 10)
common.CheckOutputFields(t, queryRes, common.GetAllFieldsName(true, false))

// search
queryVec1 := common.GenSearchVectors(common.DefaultNq, common.DefaultDim, entity.FieldTypeFloatVector)
sp, _ := entity.NewIndexHNSWSearchParam(74)
expr := fmt.Sprintf("%s > 10", common.DefaultIntFieldName)
searchRes, _ := mc.Search(ctx, collName, []string{}, expr, []string{"*"}, queryVec1, common.DefaultFloatVecFieldName,
entity.L2, common.DefaultTopK, sp)
common.CheckOutputFields(t, searchRes[0].Fields, common.GetAllFieldsName(true, false))

// hybrid search
queryVec2 := common.GenSearchVectors(common.DefaultNq, common.DefaultDim, entity.FieldTypeBinaryVector)
sReqs := []*client.ANNSearchRequest{
client.NewANNSearchRequest(common.DefaultFloatVecFieldName, entity.L2, "", queryVec1, sp, common.DefaultTopK, client.WithOffset(3)),
client.NewANNSearchRequest(common.DefaultBinaryVecFieldName, entity.JACCARD, "", queryVec2, sp, common.DefaultTopK),
}
_, errSearch := mc.HybridSearch(ctx, collName, []string{}, common.DefaultTopK, []string{}, client.NewRRFReranker(), sReqs)
common.CheckErr(t, errSearch, true)
}

// test mmap collection raw data and index
// create -> insert -> flush -> index -> load -> alter collection with mmap -> alter index with mmap -> reload -> read op
func TestMmapAlterCollectionIndexDefault(t *testing.T) {
ctx := createContext(t, time.Second*common.DefaultTimeout*2)
// connect
mc := createMilvusClient(ctx, t)

// create -> insert [0, 3000) -> flush
cp := CollectionParams{CollectionFieldsType: AllFields, AutoID: false, EnableDynamicField: true,
ShardsNum: common.DefaultShards, Dim: common.DefaultDim}

dp := DataParams{DoInsert: true, CollectionFieldsType: AllFields, start: 0,
nb: common.DefaultNb, dim: common.DefaultDim, EnableDynamicField: true}

ips := GenDefaultIndexParamsForAllVectors()
collName := prepareCollection(ctx, t, mc, cp, WithDataParams(dp), WithIndexParams(ips), WithCreateOption(client.WithConsistencyLevel(entity.ClStrong)))

// describe index and check mmap
for _, fieldName := range common.AllVectorsFieldsName {
indexes, _ := mc.DescribeIndex(ctx, collName, fieldName)
// check index mmap
require.Equal(t, "", indexes[0].Params()["mmap.enabled"])
}

//describe collection
mc.LoadCollection(ctx, collName, false)
coll, _ := mc.DescribeCollection(ctx, collName)
require.Equal(t, "", coll.Properties["mmap.enabled"])

// alter mmap: collection and index
_ = mc.ReleaseCollection(ctx, collName)
err := mc.AlterCollection(ctx, collName, entity.Mmap(true))
common.CheckErr(t, err, true)
for _, fieldName := range common.AllVectorsFieldsName {
err := mc.AlterIndex(ctx, collName, fieldName, client.WithMmap(true))
common.CheckErr(t, err, true)
}

// load collection -> describe collection and check mmap false
//describe collection
mc.LoadCollection(ctx, collName, false)
coll, _ = mc.DescribeCollection(ctx, collName)
require.Equal(t, "true", coll.Properties["mmap.enabled"])
for _, fieldName := range common.AllVectorsFieldsName {
idx, _ := mc.DescribeIndex(ctx, collName, fieldName)
require.Equal(t, "true", idx[0].Params()["mmap.enabled"])
}

// query
queryRes, err := mc.Query(ctx, collName, []string{}, fmt.Sprintf("%s < 10", common.DefaultIntFieldName), []string{"*"})
common.CheckErr(t, err, true)
require.Equal(t, queryRes.GetColumn(common.DefaultIntFieldName).Len(), 10)
common.CheckOutputFields(t, queryRes, common.GetAllFieldsName(true, false))

// search
queryVec1 := common.GenSearchVectors(common.DefaultNq, common.DefaultDim, entity.FieldTypeFloatVector)
sp, _ := entity.NewIndexHNSWSearchParam(74)
expr := fmt.Sprintf("%s > 10", common.DefaultIntFieldName)
searchRes, _ := mc.Search(ctx, collName, []string{}, expr, []string{"*"}, queryVec1, common.DefaultFloatVecFieldName,
entity.L2, common.DefaultTopK, sp)
common.CheckOutputFields(t, searchRes[0].Fields, common.GetAllFieldsName(true, false))

// hybrid search
queryVec2 := common.GenSearchVectors(common.DefaultNq, common.DefaultDim, entity.FieldTypeBinaryVector)
sReqs := []*client.ANNSearchRequest{
client.NewANNSearchRequest(common.DefaultFloatVecFieldName, entity.L2, "", queryVec1, sp, common.DefaultTopK, client.WithOffset(3)),
client.NewANNSearchRequest(common.DefaultBinaryVecFieldName, entity.JACCARD, "", queryVec2, sp, common.DefaultTopK),
}
_, errSearch := mc.HybridSearch(ctx, collName, []string{}, common.DefaultTopK, []string{}, client.NewRRFReranker(), sReqs)
common.CheckErr(t, errSearch, true)
}

// test mmap collection loaded
func TestMmapCollectionLoaded(t *testing.T) {
ctx := createContext(t, time.Second*common.DefaultTimeout*2)
// connect
mc := createMilvusClient(ctx, t)

// create -> insert -> flush -> index -> load
cp := CollectionParams{CollectionFieldsType: Int64FloatVec, AutoID: false, EnableDynamicField: false,
ShardsNum: common.DefaultShards, Dim: common.DefaultDim}

dp := DataParams{DoInsert: true, CollectionFieldsType: Int64FloatVec, start: 0, nb: common.DefaultNb,
dim: common.DefaultDim, EnableDynamicField: false}

collName := prepareCollection(ctx, t, mc, cp, WithDataParams(dp), WithCreateOption(client.WithConsistencyLevel(entity.ClStrong)))

// mmap collection raw data
err := mc.AlterCollection(ctx, collName, entity.Mmap(true))
common.CheckErr(t, err, false, "can not alter mmap properties if collection loaded")

// mmap index
err = mc.AlterIndex(ctx, collName, common.DefaultFloatVecFieldName, client.WithMmap(true))
common.CheckErr(t, err, false, "can't alter index on loaded collection, please release the collection first")
}

// test mmap collection which scalar field indexed
func TestMmapCollectionScalarIndexed(t *testing.T) {
ctx := createContext(t, time.Second*common.DefaultTimeout*2)
// connect
mc := createMilvusClient(ctx, t)

// create -> insert -> flush -> index -> load
cp := CollectionParams{CollectionFieldsType: Int64FloatVec, AutoID: false, EnableDynamicField: false,
ShardsNum: common.DefaultShards, Dim: common.DefaultDim}

dp := DataParams{DoInsert: true, CollectionFieldsType: Int64FloatVec, start: 0, nb: common.DefaultNb,
dim: common.DefaultDim, EnableDynamicField: false}

collName := prepareCollection(ctx, t, mc, cp, WithDataParams(dp), WithCreateOption(client.WithConsistencyLevel(entity.ClStrong)))

// create scalar index
for _, fName := range []string{common.DefaultIntFieldName, common.DefaultFloatFieldName} {
err := mc.CreateIndex(ctx, collName, fName, entity.NewScalarIndexWithType(entity.Inverted), false)
common.CheckErr(t, err, true)
}

// mmap collection
mc.ReleaseCollection(ctx, collName)
err := mc.AlterCollection(ctx, collName, entity.Mmap(true))
common.CheckErr(t, err, true)
err = mc.LoadCollection(ctx, collName, false)
common.CheckErr(t, err, true)
}

// test mmap unsupported index: DiskANN, GPU-class
func TestMmapIndexUnsupported(t *testing.T) {
t.Skip("https://github.com/milvus-io/milvus-sdk-go/issues/714")
ctx := createContext(t, time.Second*common.DefaultTimeout*2)
// connect
mc := createMilvusClient(ctx, t)

// create -> insert -> flush -> index -> load
cp := CollectionParams{CollectionFieldsType: Int64FloatVec, AutoID: false, EnableDynamicField: false,
ShardsNum: common.DefaultShards, Dim: common.DefaultDim}
collName := createCollection(ctx, t, mc, cp)

dp := DataParams{DoInsert: true, CollectionName: collName, CollectionFieldsType: Int64FloatVec, start: 0,
nb: common.DefaultNb, dim: common.DefaultDim, EnableDynamicField: false}
insertData(ctx, t, mc, dp)
mc.Flush(ctx, collName, false)

//create index with mmap
idx, _ := entity.NewIndexDISKANN(entity.COSINE)
err := mc.CreateIndex(ctx, collName, common.DefaultFloatVecFieldName, idx, false, client.WithMmap(true))
common.CheckErr(t, err, false, "index type DISKANN does not support mmap")

//create scalar index with mmap
for _, idx := range []entity.Index{entity.NewScalarIndex(), entity.NewScalarIndexWithType(entity.Inverted)} {
err := mc.CreateIndex(ctx, collName, common.DefaultFloatFieldName, idx, false, client.WithMmap(true))
common.CheckErr(t, err, false, "does not support mmap")
}
}

func TestAlterIndexMmapUnsupportedIndex(t *testing.T) {
ctx := createContext(t, time.Second*common.DefaultTimeout*2)
// connect
mc := createMilvusClient(ctx, t)

// create -> insert -> flush -> index -> load
cp := CollectionParams{CollectionFieldsType: Int64FloatVec, AutoID: false, EnableDynamicField: false,
ShardsNum: common.DefaultShards, Dim: common.DefaultDim}
collName := createCollection(ctx, t, mc, cp)

dp := DataParams{DoInsert: true, CollectionName: collName, CollectionFieldsType: Int64FloatVec, start: 0,
nb: common.DefaultNb, dim: common.DefaultDim, EnableDynamicField: false}
insertData(ctx, t, mc, dp)
mc.Flush(ctx, collName, false)

idxHnsw, _ := entity.NewIndexDISKANN(entity.IP)
err := mc.CreateIndex(ctx, collName, common.DefaultFloatVecFieldName, idxHnsw, false)
common.CheckErr(t, err, true)
for _, mmap := range []bool{true, false} {
err = mc.AlterIndex(ctx, collName, common.DefaultFloatVecFieldName, client.WithMmap(mmap))
common.CheckErr(t, err, false, "index type DISKANN does not support mmap: invalid parameter")
}
}

func TestMmapAlterIndex(t *testing.T) {
t.Parallel()
for _, mmap := range []bool{true, false} {
ctx := createContext(t, time.Second*common.DefaultTimeout*2)
// connect
mc := createMilvusClient(ctx, t)

// create -> insert -> flush -> index -> load
cp := CollectionParams{CollectionFieldsType: Int64FloatVec, AutoID: false, EnableDynamicField: false,
ShardsNum: common.DefaultShards, Dim: common.DefaultDim}
collName := createCollection(ctx, t, mc, cp)

dp := DataParams{DoInsert: true, CollectionName: collName, CollectionFieldsType: Int64FloatVec, start: 0,
nb: common.DefaultNb, dim: common.DefaultDim, EnableDynamicField: false}
insertData(ctx, t, mc, dp)
mc.Flush(ctx, collName, false)

// create index and enable mmap
idxHnsw, _ := entity.NewIndexHNSW(entity.COSINE, 8, 96)
err := mc.CreateIndex(ctx, collName, common.DefaultFloatVecFieldName, idxHnsw, false, client.WithMmap(!mmap))
common.CheckErr(t, err, true)

// alter index and enable mmap
err = mc.AlterIndex(ctx, collName, common.DefaultFloatVecFieldName, client.WithMmap(mmap))
common.CheckErr(t, err, true)

idx, _ := mc.DescribeIndex(ctx, collName, common.DefaultFloatVecFieldName)
require.Equal(t, strconv.FormatBool(mmap), idx[0].Params()["mmap.enabled"])

err = mc.LoadCollection(ctx, collName, false)
common.CheckErr(t, err, true)

queryVec1 := common.GenSearchVectors(common.DefaultNq, common.DefaultDim, entity.FieldTypeFloatVector)
sp, _ := entity.NewIndexHNSWSearchParam(74)
expr := fmt.Sprintf("%s > 10", common.DefaultIntFieldName)
searchRes, _ := mc.Search(ctx, collName, []string{}, expr, []string{"*"}, queryVec1, common.DefaultFloatVecFieldName,
entity.COSINE, common.DefaultTopK, sp)
common.CheckOutputFields(t, searchRes[0].Fields, []string{common.DefaultIntFieldName, common.DefaultFloatFieldName, common.DefaultFloatVecFieldName})
}
}

0 comments on commit 4e5d2e6

Please sign in to comment.