From 849300d6d8593f92c3ad48d2aa255b41400d9721 Mon Sep 17 00:00:00 2001 From: aoiasd <45024769+aoiasd@users.noreply.github.com> Date: Mon, 11 Sep 2023 10:23:12 +0800 Subject: [PATCH] add delete by expression method (#578) Signed-off-by: aoiasd --- client/client.go | 2 ++ client/data_test.go | 66 ++++++++++++++++++++++++++++++++++++++ client/insert.go | 38 ++++++++++++++++++++++ test/base/milvus_client.go | 8 +++++ 4 files changed, 114 insertions(+) diff --git a/client/client.go b/client/client.go index 5f3cd730..dda234d7 100644 --- a/client/client.go +++ b/client/client.go @@ -130,6 +130,8 @@ type Client interface { FlushV2(ctx context.Context, collName string, async bool) ([]int64, []int64, int64, error) // DeleteByPks deletes entries related to provided primary keys DeleteByPks(ctx context.Context, collName string, partitionName string, ids entity.Column) error + // Delete deletes entries match expression + Delete(ctx context.Context, collName string, partitionName string, expr string) error // Upsert column-based data of collection, returns id column values Upsert(ctx context.Context, collName string, partitionName string, columns ...entity.Column) (entity.Column, error) // Search search with bool expression diff --git a/client/data_test.go b/client/data_test.go index e580ee9a..0e787b68 100644 --- a/client/data_test.go +++ b/client/data_test.go @@ -267,6 +267,72 @@ func TestGrpcDeleteByPks(t *testing.T) { }) } +func TestGrpcDelete(t *testing.T) { + ctx := context.Background() + + c := testClient(ctx, t) + defer c.Close() + + mockServer.SetInjection(MDescribeCollection, describeCollectionInjection(t, 1, testCollectionName, defaultSchema())) + defer mockServer.DelInjection(MDescribeCollection) + + t.Run("normal delete by pks", func(t *testing.T) { + partName := "testPart" + mockServer.SetInjection(MHasPartition, hasPartitionInjection(t, testCollectionName, true, partName)) + defer mockServer.DelInjection(MHasPartition) + mockServer.SetInjection(MDelete, func(_ context.Context, raw proto.Message) (proto.Message, error) { + req, ok := raw.(*milvuspb.DeleteRequest) + if !ok { + t.FailNow() + } + assert.Equal(t, testCollectionName, req.GetCollectionName()) + assert.Equal(t, partName, req.GetPartitionName()) + + resp := &milvuspb.MutationResult{} + s, err := SuccessStatus() + resp.Status = s + return resp, err + }) + defer mockServer.DelInjection(MDelete) + + err := c.Delete(ctx, testCollectionName, partName, "") + assert.NoError(t, err) + }) + + t.Run("Bad request deletes", func(t *testing.T) { + partName := "testPart" + mockServer.SetInjection(MHasPartition, hasPartitionInjection(t, testCollectionName, false, partName)) + defer mockServer.DelInjection(MHasPartition) + + // non-exist collection + err := c.Delete(ctx, "non-exists-collection", "", "") + assert.Error(t, err) + + // non-exist parition + err = c.Delete(ctx, testCollectionName, "non-exists-part", "") + assert.Error(t, err) + }) + t.Run("delete services fail", func(t *testing.T) { + mockServer.SetInjection(MDelete, func(_ context.Context, raw proto.Message) (proto.Message, error) { + resp := &milvuspb.MutationResult{} + return resp, errors.New("mockServer.d error") + }) + + err := c.Delete(ctx, testCollectionName, "", "") + assert.Error(t, err) + + mockServer.SetInjection(MDelete, func(_ context.Context, raw proto.Message) (proto.Message, error) { + resp := &milvuspb.MutationResult{} + resp.Status = &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + } + return resp, nil + }) + err = c.Delete(ctx, testCollectionName, "", "") + assert.Error(t, err) + }) +} + type SearchSuite struct { MockSuiteBase sch *entity.Schema diff --git a/client/insert.go b/client/insert.go index 24fb6828..43ccfb88 100644 --- a/client/insert.go +++ b/client/insert.go @@ -299,6 +299,44 @@ func (c *GrpcClient) DeleteByPks(ctx context.Context, collName string, partition return nil } +// Delete deletes entries match expression +func (c *GrpcClient) Delete(ctx context.Context, collName string, partitionName string, expr string) error { + if c.Service == nil { + return ErrClientNotReady + } + + // check collection name + if err := c.checkCollectionExists(ctx, collName); err != nil { + return err + } + + // check partition name + if partitionName != "" { + err := c.checkPartitionExists(ctx, collName, partitionName) + if err != nil { + return err + } + } + + req := &milvuspb.DeleteRequest{ + DbName: "", + CollectionName: collName, + PartitionName: partitionName, + Expr: expr, + } + + resp, err := c.Service.Delete(ctx, req) + if err != nil { + return err + } + err = handleRespStatus(resp.GetStatus()) + if err != nil { + return err + } + MetaCache.setSessionTs(collName, resp.Timestamp) + return nil +} + // Upsert Index into collection with column-based format // collName is the collection name // partitionName is the partition to upsert, if not specified(empty), default partition will be used diff --git a/test/base/milvus_client.go b/test/base/milvus_client.go index 61da33ca..39921a5c 100644 --- a/test/base/milvus_client.go +++ b/test/base/milvus_client.go @@ -382,6 +382,14 @@ func (mc *MilvusClient) DeleteByPks(ctx context.Context, collName string, partit return err } +// Delete deletes entries match expression +func (mc *MilvusClient) Delete(ctx context.Context, collName string, partitionName string, expr string) error { + preRequest("DeleteByPks", ctx, collName, partitionName, expr) + err := mc.mClient.Delete(ctx, collName, partitionName, expr) + postResponse("DeleteByPks", err) + return err +} + // Search search from collection func (mc *MilvusClient) Search(ctx context.Context, collName string, partitions []string, expr string, outputFields []string, vectors []entity.Vector, vectorField string, metricType entity.MetricType, topK int, sp entity.SearchParam, opts ...client.SearchQueryOptionFunc,