Skip to content

Commit

Permalink
fix: fix lint and ut and error handling
Browse files Browse the repository at this point in the history
Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh committed Aug 20, 2024
1 parent e2b29a4 commit e0a1d1c
Show file tree
Hide file tree
Showing 19 changed files with 91 additions and 58 deletions.
6 changes: 3 additions & 3 deletions internal/distributed/streaming/internal/errs/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

// All error in streamingservice package should be marked by streamingservice/errs package.
var (
ErrClosed = errors.New("closed")
ErrCanceled = errors.New("canceled")
ErrTxnUnavailable = errors.New("transaction unavailable")
ErrClosed = errors.New("closed")
ErrCanceledOrDeadlineExceed = errors.New("canceled or deadline exceed")
ErrUnrecoverable = errors.New("unrecoverable")
)
9 changes: 5 additions & 4 deletions internal/distributed/streaming/internal/producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,14 @@ func (p *ResumableProducer) Produce(ctx context.Context, msg message.MutableMess
}
// It's ok to stop retry if the error is canceled or deadline exceed.
if status.IsCanceled(err) {
return nil, errors.Mark(err, errs.ErrCanceled)
return nil, errors.Mark(err, errs.ErrCanceledOrDeadlineExceed)
}
if sErr := status.AsStreamingError(err); sErr != nil {
// if the error is txn unavailable, it cannot be retried forever.
// if the error is txn unavailable or unrecoverable error,
// it cannot be retried forever.
// we should mark it and return.
if sErr.IsTxnUnavilable() {
return nil, errors.Mark(err, errs.ErrTxnUnavailable)
if sErr.IsUnrecoverable() {
return nil, errors.Mark(err, errs.ErrUnrecoverable)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (p *producerWithResumingError) GetProducerAfterAvailable(ctx context.Contex
p.cond.L.Lock()
for p.err == nil && (p.producer == nil || !p.producer.IsAvailable()) {
if err := p.cond.Wait(ctx); err != nil {
return nil, errors.Mark(err, errs.ErrCanceled)
return nil, errors.Mark(err, errs.ErrCanceledOrDeadlineExceed)
}
}
err := p.err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestResumableProducer(t *testing.T) {
id, err = rp.Produce(ctx, msg)
assert.Nil(t, id)
assert.Error(t, err)
assert.True(t, errors.Is(err, errs.ErrCanceled))
assert.True(t, errors.Is(err, errs.ErrCanceledOrDeadlineExceed))

// Test the underlying handler close.
close(ch2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,12 @@ func (s *FlusherSuite) SetupTest() {
scanner := mock_wal.NewMockScanner(s.T())

w := mock_wal.NewMockWAL(s.T())
w.EXPECT().WALName().Return("rocksmq")
w.EXPECT().WALName().Return("rocksmq").Maybe()
w.EXPECT().Read(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, option wal.ReadOption) (wal.Scanner, error) {
handlers.Insert(option.MesasgeHandler)
return scanner, nil
})
}).Maybe()

once := sync.Once{}
scanner.EXPECT().Close().RunAndReturn(func() error {
Expand All @@ -126,7 +126,7 @@ func (s *FlusherSuite) SetupTest() {
})
})
return nil
})
}).Maybe()

s.wal = w
m := mocks.NewChunkManager(s.T())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,16 @@ import (
"go.uber.org/atomic"

"github.com/milvus-io/milvus/internal/mocks/streamingnode/server/wal/interceptors/segment/mock_inspector"
"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/stats"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)

func TestSealedInspector(t *testing.T) {
paramtable.Init()
resource.InitForTest(t)

notifier := stats.NewSealSignalNotifier()
inspector := NewSealedInspector(notifier)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/policy"
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
Expand Down Expand Up @@ -284,5 +285,5 @@ func (m *partitionSegmentManager) assignSegment(ctx context.Context, req *Assign
if inserted, ack := newGrowingSegment.AllocRows(ctx, req); inserted {
return &AssignSegmentResult{SegmentID: newGrowingSegment.GetSegmentID(), Acknowledge: ack}, nil
}
return nil, errors.Errorf("too large insert message, cannot hold in empty growing segment, stats: %+v", req.InsertMetrics)
return nil, status.NewUnrecoverableError("too large insert message, cannot hold in empty growing segment, stats: %+v", req.InsertMetrics)
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
Expand Down Expand Up @@ -151,7 +152,7 @@ func (m *partitionSegmentManagers) NewPartition(collectionID int64, partitionID
func (m *partitionSegmentManagers) Get(collectionID int64, partitionID int64) (*partitionSegmentManager, error) {
pm, ok := m.managers.Get(partitionID)
if !ok {
return nil, errors.Errorf("partition %d in collection %d not found in segment assignment service", partitionID, collectionID)
return nil, status.NewUnrecoverableError("partition %d in collection %d not found in segment assignment service", partitionID, collectionID)
}
return pm, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (impl *segmentInterceptor) handleInsertMessage(ctx context.Context, msg mes
TxnSession: txn.GetTxnSessionFromContext(ctx),
})
if err != nil {
return nil, status.NewInner("segment assignment failure with error: %s", err.Error())
return nil, err
}
// once the segment assignment is done, we need to ack the result,
// if other partitions failed to assign segment or wal write failure,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ func TestDetail(t *testing.T) {
assert.Panics(t, func() {
newAckDetail(0, mock_message.NewMockMessageID(t))
})
msgID := mock_message.NewMockMessageID(t)
msgID.EXPECT().EQ(msgID).Return(true)
msgID := walimplstest.NewTestMessageID(1)

ackDetail := newAckDetail(1, msgID)
assert.Equal(t, uint64(1), ackDetail.BeginTimestamp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,15 @@ func (impl *timeTickAppendInterceptor) DoAppend(ctx context.Context, msg message

// GracefulClose implements InterceptorWithGracefulClose.
func (impl *timeTickAppendInterceptor) GracefulClose() {
log.Warn("timeTickAppendInterceptor is closing", zap.String("pchannel", impl.operator.pchannel.Name))
impl.txnManager.GracefulClose()
log.Warn("timeTickAppendInterceptor is closed", zap.String("pchannel", impl.operator.pchannel.Name))
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
logger := log.With(zap.Any("pchannel", impl.operator.pchannel))
logger.Info("timeTickAppendInterceptor is closing, try to perform a txn manager graceful shutdown")
if err := impl.txnManager.GracefulClose(ctx); err != nil {
logger.Warn("timeTickAppendInterceptor is closed", zap.Error(err))
return
}
logger.Info("txnManager of timeTickAppendInterceptor is graceful closed")
}

// Close implements AppendInterceptor.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func TestManager(t *testing.T) {

closed := make(chan struct{})
go func() {
m.GracefulClose()
m.GracefulClose(context.Background())
close(closed)
}()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (m *TxnManager) GetSessionOfTxn(id message.TxnID) (*TxnSession, error) {
}

// GracefulClose waits for all transactions to be cleaned up.
func (m *TxnManager) GracefulClose() {
func (m *TxnManager) GracefulClose(ctx context.Context) error {
m.mu.Lock()
if m.closed == nil {
m.closed = make(chan struct{})
Expand All @@ -103,7 +103,9 @@ func (m *TxnManager) GracefulClose() {
m.mu.Unlock()

select {
case <-ctx.Done():
return ctx.Err()
case <-m.closed:
case <-time.After(5 * time.Second):
return nil
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,6 @@ func IsStreamingServiceEnabled() bool {
return os.Getenv(MilvusStreamingServiceEnabled) == "1"
}

// SetStreamingServiceEnabled set the env that indicates whether the streaming service is enabled.
func SetStreamingServiceEnabled() {
err := os.Setenv(MilvusStreamingServiceEnabled, "1")
if err != nil {
panic(err)
}
}

// UnsetStreamingServiceEnabled unsets the env that indicates whether the streaming service is enabled.
func UnsetStreamingServiceEnabled() {
err := os.Setenv(MilvusStreamingServiceEnabled, "0")
if err != nil {
panic(err)
}
}

// MustEnableStreamingService panics if the streaming service is not enabled.
func MustEnableStreamingService() {
if !IsStreamingServiceEnabled() {
Expand Down
11 changes: 11 additions & 0 deletions internal/util/streamingutil/status/streaming_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ func (e *StreamingError) IsSkippedOperation() bool {
e.Code == streamingpb.StreamingCode_STREAMING_CODE_UNMATCHED_CHANNEL_TERM
}

// IsUnrecoverable returns true if the error is unrecoverable.
// Stop resuming retry and report to user.
func (e *StreamingError) IsUnrecoverable() bool {
return e.Code == streamingpb.StreamingCode_STREAMING_CODE_UNRECOVERABLE || e.IsTxnUnavilable()
}

// IsTxnUnavilable returns true if the transaction is unavailable.
func (e *StreamingError) IsTxnUnavilable() bool {
return e.Code == streamingpb.StreamingCode_STREAMING_CODE_TRANSACTION_EXPIRED ||
Expand Down Expand Up @@ -105,6 +111,11 @@ func NewInvalidTransactionState(operation string, expectState message.TxnState,
return New(streamingpb.StreamingCode_STREAMING_CODE_INVALID_TRANSACTION_STATE, "invalid transaction state for operation %s, expect %s, current %s", operation, expectState, currentState)
}

// NewUnrecoverableError creates a new StreamingError with code STREAMING_CODE_UNRECOVERABLE.
func NewUnrecoverableError(format string, args ...interface{}) *StreamingError {
return New(streamingpb.StreamingCode_STREAMING_CODE_UNRECOVERABLE, format, args...)
}

// New creates a new StreamingError with the given code and cause.
func New(code streamingpb.StreamingCode, format string, args ...interface{}) *StreamingError {
if len(args) == 0 {
Expand Down
22 changes: 22 additions & 0 deletions internal/util/streamingutil/test_env.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
//go:build test
// +build test

package streamingutil

import "os"

// SetStreamingServiceEnabled set the env that indicates whether the streaming service is enabled.
func SetStreamingServiceEnabled() {
err := os.Setenv(MilvusStreamingServiceEnabled, "1")
if err != nil {
panic(err)
}
}

// UnsetStreamingServiceEnabled unsets the env that indicates whether the streaming service is enabled.
func UnsetStreamingServiceEnabled() {
err := os.Setenv(MilvusStreamingServiceEnabled, "0")
if err != nil {
panic(err)
}
}
1 change: 1 addition & 0 deletions pkg/streaming/proto/streaming.proto
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ enum StreamingCode {
STREAMING_CODE_INVAILD_ARGUMENT = 8; // invalid argument
STREAMING_CODE_TRANSACTION_EXPIRED = 9; // transaction expired
STREAMING_CODE_INVALID_TRANSACTION_STATE = 10; // invalid transaction state
STREAMING_CODE_UNRECOVERABLE = 11; // unrecoverable error
STREAMING_CODE_UNKNOWN = 999; // unknown error
}

Expand Down
16 changes: 14 additions & 2 deletions pkg/streaming/util/message/adaptor/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package adaptor

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
Expand Down Expand Up @@ -43,6 +45,9 @@ func TestMsgPackAdaptorHandler(t *testing.T) {
CollectionId: 1,
}).
WithBody(&msgpb.DeleteRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Delete,
},
CollectionID: 1,
PartitionID: 1,
Timestamps: []uint64{10},
Expand Down Expand Up @@ -78,7 +83,13 @@ func TestMsgPackAdaptorHandler(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, msg)

txnCtx := message.TxnContext{
TxnID: 1,
Keepalive: time.Second,
}

beginImmutableMsg, err := message.AsImmutableBeginTxnMessageV2(msg.WithTimeTick(9).
WithTxnContext(txnCtx).
WithLastConfirmedUseMessageID().
IntoImmutableMessage(rmq.NewRmqID(2)))
assert.NoError(t, err)
Expand All @@ -90,14 +101,15 @@ func TestMsgPackAdaptorHandler(t *testing.T) {
BuildMutable()

commitImmutableMsg, err := message.AsImmutableCommitTxnMessageV2(msg.WithTimeTick(12).
WithTxnContext(txnCtx).
WithTxnContext(message.TxnContext{}).
WithLastConfirmedUseMessageID().
IntoImmutableMessage(rmq.NewRmqID(3)))
assert.NoError(t, err)

txn, err := message.NewImmutableTxnMessageBuilder(beginImmutableMsg).
Add(insertImmutableMessage).
Add(deleteImmutableMsg).
Add(insertMsg.WithTxnContext(txnCtx).IntoImmutableMessage(id)).
Add(deleteMsg.WithTxnContext(txnCtx).IntoImmutableMessage(id)).
Build(commitImmutableMsg)
assert.NoError(t, err)

Expand Down
22 changes: 5 additions & 17 deletions pkg/streaming/util/message/message_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ package message_test

import (
"bytes"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/proto"

"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/pkg/mocks/streaming/util/mock_message"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/walimplstest"
)
Expand Down Expand Up @@ -45,36 +43,26 @@ func TestMessage(t *testing.T) {
assert.Equal(t, uint64(123), mutableMessage.TimeTick())
assert.Equal(t, uint64(456), mutableMessage.BarrierTimeTick())

lcMsgID := mock_message.NewMockMessageID(t)
lcMsgID.EXPECT().Marshal().Return("lcMsgID")
lcMsgID := walimplstest.NewTestMessageID(1)
mutableMessage.WithLastConfirmed(lcMsgID)
v, ok = mutableMessage.Properties().Get("_lc")
assert.True(t, ok)
assert.Equal(t, v, "lcMsgID")
assert.Equal(t, v, "1")

v, ok = mutableMessage.Properties().Get("_vc")
assert.True(t, ok)
assert.Equal(t, "v1", v)
assert.Equal(t, "v1", mutableMessage.VChannel())

msgID := mock_message.NewMockMessageID(t)
msgID.EXPECT().EQ(msgID).Return(true)
msgID.EXPECT().WALName().Return("testMsgID")
message.RegisterMessageIDUnmsarshaler("testMsgID", func(data string) (message.MessageID, error) {
if data == "lcMsgID" {
return msgID, nil
}
panic(fmt.Sprintf("unexpected data: %s", data))
})

msgID := walimplstest.NewTestMessageID(1)
immutableMessage := message.NewImmutableMesasge(msgID,
[]byte("payload"),
map[string]string{
"key": "value",
"_t": "1",
"_tt": message.EncodeUint64(456),
"_v": "1",
"_lc": "lcMsgID",
"_lc": "1",
})

assert.True(t, immutableMessage.MessageID().EQ(msgID))
Expand All @@ -84,7 +72,7 @@ func TestMessage(t *testing.T) {
assert.Equal(t, "value", v)
assert.True(t, ok)
assert.Equal(t, message.MessageTypeTimeTick, immutableMessage.MessageType())
assert.Equal(t, 36, immutableMessage.EstimateSize())
assert.Equal(t, 30, immutableMessage.EstimateSize())
assert.Equal(t, message.Version(1), immutableMessage.Version())
assert.Equal(t, uint64(456), immutableMessage.TimeTick())
assert.NotNil(t, immutableMessage.LastConfirmedMessageID())
Expand Down

0 comments on commit e0a1d1c

Please sign in to comment.