Skip to content

Commit

Permalink
fix: [2.4] execute method should return an error when result is a f…
Browse files Browse the repository at this point in the history
…ailure (#34872)

- issue: #34812
- pr: #34813

Signed-off-by: SimFG <[email protected]>
  • Loading branch information
SimFG authored Jul 22, 2024
1 parent 7540714 commit 4e95f38
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 124 deletions.
5 changes: 1 addition & 4 deletions internal/proxy/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -3993,12 +3993,9 @@ func (node *Proxy) FlushAll(ctx context.Context, req *milvuspb.FlushAllRequest)
DbName: dbName,
CollectionNames: []string{collection},
})
if err != nil {
if err = merr.CheckRPCCall(flushRsp, err); err != nil {
return err
}
if flushRsp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return merr.Error(flushRsp.GetStatus())
}
return nil
})
}
Expand Down
96 changes: 21 additions & 75 deletions internal/proxy/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ func (t *createCollectionTask) PreExecute(ctx context.Context) error {
func (t *createCollectionTask) Execute(ctx context.Context) error {
var err error
t.result, err = t.rootCoord.CreateCollection(ctx, t.CreateCollectionRequest)
return err
return merr.CheckRPCCall(t.result, err)
}

func (t *createCollectionTask) PostExecute(ctx context.Context) error {
Expand Down Expand Up @@ -469,7 +469,7 @@ func (t *dropCollectionTask) PreExecute(ctx context.Context) error {
func (t *dropCollectionTask) Execute(ctx context.Context) error {
var err error
t.result, err = t.rootCoord.DropCollection(ctx, t.DropCollectionRequest)
return err
return merr.CheckRPCCall(t.result, err)
}

func (t *dropCollectionTask) PostExecute(ctx context.Context) error {
Expand Down Expand Up @@ -537,16 +537,7 @@ func (t *hasCollectionTask) PreExecute(ctx context.Context) error {
func (t *hasCollectionTask) Execute(ctx context.Context) error {
var err error
t.result, err = t.rootCoord.HasCollection(ctx, t.HasCollectionRequest)
if err != nil {
return err
}
if t.result == nil {
return errors.New("has collection resp is nil")
}
if t.result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return merr.Error(t.result.GetStatus())
}
return nil
return merr.CheckRPCCall(t.result, err)
}

func (t *hasCollectionTask) PostExecute(ctx context.Context) error {
Expand Down Expand Up @@ -757,18 +748,10 @@ func (t *showCollectionsTask) PreExecute(ctx context.Context) error {
func (t *showCollectionsTask) Execute(ctx context.Context) error {
ctx = AppendUserInfoForRPC(ctx)
respFromRootCoord, err := t.rootCoord.ShowCollections(ctx, t.ShowCollectionsRequest)
if err != nil {
if err = merr.CheckRPCCall(respFromRootCoord, err); err != nil {
return err
}

if respFromRootCoord == nil {
return errors.New("failed to show collections")
}

if respFromRootCoord.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return merr.Error(respFromRootCoord.GetStatus())
}

if t.GetType() == milvuspb.ShowType_InMemory {
IDs2Names := make(map[UniqueID]string)
for offset, collectionName := range respFromRootCoord.CollectionNames {
Expand Down Expand Up @@ -1030,7 +1013,7 @@ func (t *alterCollectionTask) PreExecute(ctx context.Context) error {
func (t *alterCollectionTask) Execute(ctx context.Context) error {
var err error
t.result, err = t.rootCoord.AlterCollection(ctx, t.AlterCollectionRequest)
return err
return merr.CheckRPCCall(t.result, err)
}

func (t *alterCollectionTask) PostExecute(ctx context.Context) error {
Expand Down Expand Up @@ -1112,13 +1095,7 @@ func (t *createPartitionTask) PreExecute(ctx context.Context) error {

func (t *createPartitionTask) Execute(ctx context.Context) (err error) {
t.result, err = t.rootCoord.CreatePartition(ctx, t.CreatePartitionRequest)
if err != nil {
return err
}
if t.result.ErrorCode != commonpb.ErrorCode_Success {
return errors.New(t.result.Reason)
}
return err
return merr.CheckRPCCall(t.result, err)
}

func (t *createPartitionTask) PostExecute(ctx context.Context) error {
Expand Down Expand Up @@ -1227,13 +1204,7 @@ func (t *dropPartitionTask) PreExecute(ctx context.Context) error {

func (t *dropPartitionTask) Execute(ctx context.Context) (err error) {
t.result, err = t.rootCoord.DropPartition(ctx, t.DropPartitionRequest)
if err != nil {
return err
}
if t.result.ErrorCode != commonpb.ErrorCode_Success {
return errors.New(t.result.Reason)
}
return err
return merr.CheckRPCCall(t.result, err)
}

func (t *dropPartitionTask) PostExecute(ctx context.Context) error {
Expand Down Expand Up @@ -1306,13 +1277,7 @@ func (t *hasPartitionTask) PreExecute(ctx context.Context) error {

func (t *hasPartitionTask) Execute(ctx context.Context) (err error) {
t.result, err = t.rootCoord.HasPartition(ctx, t.HasPartitionRequest)
if err != nil {
return err
}
if t.result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return merr.Error(t.result.GetStatus())
}
return err
return merr.CheckRPCCall(t.result, err)
}

func (t *hasPartitionTask) PostExecute(ctx context.Context) error {
Expand Down Expand Up @@ -1389,18 +1354,10 @@ func (t *showPartitionsTask) PreExecute(ctx context.Context) error {

func (t *showPartitionsTask) Execute(ctx context.Context) error {
respFromRootCoord, err := t.rootCoord.ShowPartitions(ctx, t.ShowPartitionsRequest)
if err != nil {
if err = merr.CheckRPCCall(respFromRootCoord, err); err != nil {
return err
}

if respFromRootCoord == nil {
return errors.New("failed to show partitions")
}

if respFromRootCoord.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return merr.Error(respFromRootCoord.GetStatus())
}

if t.GetType() == milvuspb.ShowType_InMemory {
collectionName := t.CollectionName
collectionID, err := globalMetaCache.GetCollectionID(ctx, t.GetDbName(), collectionName)
Expand Down Expand Up @@ -1433,18 +1390,10 @@ func (t *showPartitionsTask) Execute(ctx context.Context) error {
CollectionID: collectionID,
PartitionIDs: partitionIDs,
})
if err != nil {
if err = merr.CheckRPCCall(resp, err); err != nil {
return err
}

if resp == nil {
return errors.New("failed to show partitions")
}

if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return merr.Error(resp.GetStatus())
}

t.result = &milvuspb.ShowPartitionsResponse{
Status: resp.Status,
PartitionNames: make([]string, 0, len(resp.PartitionIDs)),
Expand Down Expand Up @@ -1559,12 +1508,9 @@ func (t *flushTask) Execute(ctx context.Context) error {
CollectionID: collID,
}
resp, err := t.dataCoord.Flush(ctx, flushReq)
if err != nil {
if err = merr.CheckRPCCall(resp, err); err != nil {
return fmt.Errorf("failed to call flush to data coordinator: %s", err.Error())
}
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return merr.Error(resp.GetStatus())
}
coll2Segments[collName] = &schemapb.LongArray{Data: resp.GetSegmentIDs()}
flushColl2Segments[collName] = &schemapb.LongArray{Data: resp.GetFlushSegmentIDs()}
coll2SealTimes[collName] = resp.GetTimeOfSeal()
Expand Down Expand Up @@ -1723,7 +1669,7 @@ func (t *loadCollectionTask) Execute(ctx context.Context) (err error) {
log.Debug("send LoadCollectionRequest to query coordinator",
zap.Any("schema", request.Schema))
t.result, err = t.queryCoord.LoadCollection(ctx, request)
if err != nil {
if err = merr.CheckRPCCall(t.result, err); err != nil {
return fmt.Errorf("call query coordinator LoadCollection: %s", err)
}
SendReplicateMessagePack(ctx, t.replicateMsgStream, t.LoadCollectionRequest)
Expand Down Expand Up @@ -1821,7 +1767,7 @@ func (t *releaseCollectionTask) Execute(ctx context.Context) (err error) {
}

t.result, err = t.queryCoord.ReleaseCollection(ctx, request)
if err != nil {
if err = merr.CheckRPCCall(t.result, err); err != nil {
return err
}

Expand Down Expand Up @@ -1973,7 +1919,7 @@ func (t *loadPartitionsTask) Execute(ctx context.Context) error {
ResourceGroups: t.ResourceGroups,
}
t.result, err = t.queryCoord.LoadPartitions(ctx, request)
if err != nil {
if err = merr.CheckRPCCall(t.result, err); err != nil {
return err
}
SendReplicateMessagePack(ctx, t.replicateMsgStream, t.LoadPartitionsRequest)
Expand Down Expand Up @@ -2081,7 +2027,7 @@ func (t *releasePartitionsTask) Execute(ctx context.Context) (err error) {
PartitionIDs: partitionIDs,
}
t.result, err = t.queryCoord.ReleasePartitions(ctx, request)
if err != nil {
if err = merr.CheckRPCCall(t.result, err); err != nil {
return err
}
SendReplicateMessagePack(ctx, t.replicateMsgStream, t.ReleasePartitionsRequest)
Expand Down Expand Up @@ -2151,7 +2097,7 @@ func (t *CreateResourceGroupTask) PreExecute(ctx context.Context) error {
func (t *CreateResourceGroupTask) Execute(ctx context.Context) error {
var err error
t.result, err = t.queryCoord.CreateResourceGroup(ctx, t.CreateResourceGroupRequest)
return err
return merr.CheckRPCCall(t.result, err)
}

func (t *CreateResourceGroupTask) PostExecute(ctx context.Context) error {
Expand Down Expand Up @@ -2219,7 +2165,7 @@ func (t *UpdateResourceGroupsTask) Execute(ctx context.Context) error {
Base: t.UpdateResourceGroupsRequest.GetBase(),
ResourceGroups: t.UpdateResourceGroupsRequest.GetResourceGroups(),
})
return err
return merr.CheckRPCCall(t.result, err)
}

func (t *UpdateResourceGroupsTask) PostExecute(ctx context.Context) error {
Expand Down Expand Up @@ -2284,7 +2230,7 @@ func (t *DropResourceGroupTask) PreExecute(ctx context.Context) error {
func (t *DropResourceGroupTask) Execute(ctx context.Context) error {
var err error
t.result, err = t.queryCoord.DropResourceGroup(ctx, t.DropResourceGroupRequest)
return err
return merr.CheckRPCCall(t.result, err)
}

func (t *DropResourceGroupTask) PostExecute(ctx context.Context) error {
Expand Down Expand Up @@ -2475,7 +2421,7 @@ func (t *TransferNodeTask) PreExecute(ctx context.Context) error {
func (t *TransferNodeTask) Execute(ctx context.Context) error {
var err error
t.result, err = t.queryCoord.TransferNode(ctx, t.TransferNodeRequest)
return err
return merr.CheckRPCCall(t.result, err)
}

func (t *TransferNodeTask) PostExecute(ctx context.Context) error {
Expand Down Expand Up @@ -2549,7 +2495,7 @@ func (t *TransferReplicaTask) Execute(ctx context.Context) error {
CollectionID: collID,
NumReplica: t.NumReplica,
})
return err
return merr.CheckRPCCall(t.result, err)
}

func (t *TransferReplicaTask) PostExecute(ctx context.Context) error {
Expand Down Expand Up @@ -2614,7 +2560,7 @@ func (t *ListResourceGroupsTask) PreExecute(ctx context.Context) error {
func (t *ListResourceGroupsTask) Execute(ctx context.Context) error {
var err error
t.result, err = t.queryCoord.ListResourceGroups(ctx, t.ListResourceGroupsRequest)
return err
return merr.CheckRPCCall(t.result, err)
}

func (t *ListResourceGroupsTask) PostExecute(ctx context.Context) error {
Expand Down
11 changes: 6 additions & 5 deletions internal/proxy/task_alias.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)

Expand Down Expand Up @@ -106,7 +107,7 @@ func (t *CreateAliasTask) PreExecute(ctx context.Context) error {
func (t *CreateAliasTask) Execute(ctx context.Context) error {
var err error
t.result, err = t.rootCoord.CreateAlias(ctx, t.CreateAliasRequest)
return err
return merr.CheckRPCCall(t.result, err)
}

// PostExecute defines the post execution, do nothing for create alias
Expand Down Expand Up @@ -180,7 +181,7 @@ func (t *DropAliasTask) PreExecute(ctx context.Context) error {
func (t *DropAliasTask) Execute(ctx context.Context) error {
var err error
t.result, err = t.rootCoord.DropAlias(ctx, t.DropAliasRequest)
return err
return merr.CheckRPCCall(t.result, err)
}

func (t *DropAliasTask) PostExecute(ctx context.Context) error {
Expand Down Expand Up @@ -257,7 +258,7 @@ func (t *AlterAliasTask) PreExecute(ctx context.Context) error {
func (t *AlterAliasTask) Execute(ctx context.Context) error {
var err error
t.result, err = t.rootCoord.AlterAlias(ctx, t.AlterAliasRequest)
return err
return merr.CheckRPCCall(t.result, err)
}

func (t *AlterAliasTask) PostExecute(ctx context.Context) error {
Expand Down Expand Up @@ -325,7 +326,7 @@ func (a *DescribeAliasTask) PreExecute(ctx context.Context) error {
func (a *DescribeAliasTask) Execute(ctx context.Context) error {
var err error
a.result, err = a.rootCoord.DescribeAlias(ctx, a.DescribeAliasRequest)
return err
return merr.CheckRPCCall(a.result, err)
}

func (a *DescribeAliasTask) PostExecute(ctx context.Context) error {
Expand Down Expand Up @@ -395,7 +396,7 @@ func (a *ListAliasesTask) PreExecute(ctx context.Context) error {
func (a *ListAliasesTask) Execute(ctx context.Context) error {
var err error
a.result, err = a.rootCoord.ListAliases(ctx, a.ListAliasesRequest)
return err
return merr.CheckRPCCall(a.result, err)
}

func (a *ListAliasesTask) PostExecute(ctx context.Context) error {
Expand Down
8 changes: 4 additions & 4 deletions internal/proxy/task_alias_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestCreateAlias_all(t *testing.T) {
task.CreateAliasRequest.CollectionName = collectionName

assert.NoError(t, task.PreExecute(ctx))
assert.NoError(t, task.Execute(ctx))
assert.Error(t, task.Execute(ctx))
assert.NoError(t, task.PostExecute(ctx))
}

Expand Down Expand Up @@ -107,7 +107,7 @@ func TestDropAlias_all(t *testing.T) {
assert.Equal(t, ts, task.EndTs())

assert.NoError(t, task.PreExecute(ctx))
assert.NoError(t, task.Execute(ctx))
assert.Error(t, task.Execute(ctx))
assert.NoError(t, task.PostExecute(ctx))
}

Expand Down Expand Up @@ -153,7 +153,7 @@ func TestAlterAlias_all(t *testing.T) {
task.AlterAliasRequest.CollectionName = collectionName

assert.NoError(t, task.PreExecute(ctx))
assert.NoError(t, task.Execute(ctx))
assert.Error(t, task.Execute(ctx))
assert.NoError(t, task.PostExecute(ctx))
}

Expand Down Expand Up @@ -193,7 +193,7 @@ func TestDescribeAlias_all(t *testing.T) {
assert.Equal(t, ts, task.EndTs())

assert.NoError(t, task.PreExecute(ctx))
assert.NoError(t, task.Execute(ctx))
assert.Error(t, task.Execute(ctx))
assert.NoError(t, task.PostExecute(ctx))
}

Expand Down
Loading

0 comments on commit 4e95f38

Please sign in to comment.