diff --git a/internal/streamingnode/client/handler/handler_client_impl.go b/internal/streamingnode/client/handler/handler_client_impl.go index 23f1f0f047568..ab8382d6bfe45 100644 --- a/internal/streamingnode/client/handler/handler_client_impl.go +++ b/internal/streamingnode/client/handler/handler_client_impl.go @@ -4,10 +4,10 @@ import ( "context" "time" - "go.uber.org/zap" - "github.com/cenkalti/backoff/v4" "github.com/cockroachdb/errors" + "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/proto/streamingpb" "github.com/milvus-io/milvus/internal/streamingnode/client/handler/assignment" "github.com/milvus-io/milvus/internal/streamingnode/client/handler/consumer" diff --git a/internal/streamingnode/server/resource/idalloc/test_mock_root_coord_client.go b/internal/streamingnode/server/resource/idalloc/test_mock_root_coord_client.go index aac552af2ca72..72d75d9db76e2 100644 --- a/internal/streamingnode/server/resource/idalloc/test_mock_root_coord_client.go +++ b/internal/streamingnode/server/resource/idalloc/test_mock_root_coord_client.go @@ -12,9 +12,9 @@ import ( "go.uber.org/atomic" "google.golang.org/grpc" - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/proto/rootcoordpb" + "github.com/milvus-io/milvus/pkg/util/merr" ) func NewMockRootCoordClient(t *testing.T) *mocks.MockRootCoordClient { @@ -27,9 +27,7 @@ func NewMockRootCoordClient(t *testing.T) *mocks.MockRootCoordClient { } c := counter.Add(uint64(atr.Count)) return &rootcoordpb.AllocTimestampResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, + Status: merr.Success(), Timestamp: c - uint64(atr.Count), Count: atr.Count, }, nil @@ -42,11 +40,9 @@ func NewMockRootCoordClient(t *testing.T) *mocks.MockRootCoordClient { } c := counter.Add(uint64(atr.Count)) return &rootcoordpb.AllocIDResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, - ID: int64(c - uint64(atr.Count)), - Count: atr.Count, + Status: merr.Success(), + ID: int64(c - uint64(atr.Count)), + Count: atr.Count, }, nil }, ).Maybe() diff --git a/internal/util/streamingutil/service/attributes/attributes_test.go b/internal/util/streamingutil/service/attributes/attributes_test.go index 033af6f450062..0f629b1433b5c 100644 --- a/internal/util/streamingutil/service/attributes/attributes_test.go +++ b/internal/util/streamingutil/service/attributes/attributes_test.go @@ -3,9 +3,10 @@ package attributes import ( "testing" + "github.com/stretchr/testify/assert" + "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/streaming/util/types" - "github.com/stretchr/testify/assert" ) func TestAttributes(t *testing.T) { @@ -38,6 +39,5 @@ func TestAttributes(t *testing.T) { attr = new(Attributes) attr = WithServerID(attr, 1) - serverID = GetServerID(attr) assert.Equal(t, int64(1), *GetServerID(attr)) } diff --git a/internal/util/streamingutil/service/interceptor/server.go b/internal/util/streamingutil/service/interceptor/server.go index 1b6e5c76c9065..5a10a01f633a3 100644 --- a/internal/util/streamingutil/service/interceptor/server.go +++ b/internal/util/streamingutil/service/interceptor/server.go @@ -15,7 +15,7 @@ func NewStreamingServiceUnaryServerInterceptor() grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { resp, err := handler(ctx, req) if err == nil { - return resp, err + return resp, nil } // Streaming Service Method should be overwrite the response error code. if strings.HasPrefix(info.FullMethod, streamingpb.ServiceMethodPrefix) { @@ -35,7 +35,7 @@ func NewStreamingServiceStreamServerInterceptor() grpc.StreamServerInterceptor { return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { err := handler(srv, ss) if err == nil { - return err + return nil } // Streaming Service Method should be overwrite the response error code. diff --git a/internal/util/streamingutil/service/resolver/builder.go b/internal/util/streamingutil/service/resolver/builder.go index ed4c3b4b99d5f..165a5381c256a 100644 --- a/internal/util/streamingutil/service/resolver/builder.go +++ b/internal/util/streamingutil/service/resolver/builder.go @@ -1,9 +1,9 @@ package resolver import ( - "errors" "time" + "github.com/cockroachdb/errors" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" "google.golang.org/grpc/resolver" diff --git a/internal/util/streamingutil/service/resolver/builder_test.go b/internal/util/streamingutil/service/resolver/builder_test.go index 9f58762a71269..423232d6f1a5f 100644 --- a/internal/util/streamingutil/service/resolver/builder_test.go +++ b/internal/util/streamingutil/service/resolver/builder_test.go @@ -4,13 +4,14 @@ import ( "context" "testing" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "google.golang.org/grpc/resolver" + "github.com/milvus-io/milvus/internal/mocks/google.golang.org/grpc/mock_resolver" "github.com/milvus-io/milvus/internal/mocks/util/streamingutil/service/mock_discoverer" "github.com/milvus-io/milvus/internal/util/streamingutil/service/discoverer" "github.com/milvus-io/milvus/pkg/util/typeutil" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "google.golang.org/grpc/resolver" ) func TestNewBuilder(t *testing.T) { diff --git a/internal/util/streamingutil/service/resolver/resolver.go b/internal/util/streamingutil/service/resolver/resolver.go index 10113d7baf25c..3200c82846dc1 100644 --- a/internal/util/streamingutil/service/resolver/resolver.go +++ b/internal/util/streamingutil/service/resolver/resolver.go @@ -2,8 +2,8 @@ package resolver import ( "context" - "errors" + "github.com/cockroachdb/errors" "google.golang.org/grpc/resolver" "github.com/milvus-io/milvus/internal/util/streamingutil/service/discoverer"