Skip to content

Commit

Permalink
enhance: timetick interceptor optimization (#35287)
Browse files Browse the repository at this point in the history
issue: #33285

- remove redundant goroutine by using insepctor.
- remove the coutinous non-message timetick persistence
- periodically push the time tick forward without persistent timetick
message.
- add 'message type filter' deliver filter.

Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh authored Aug 12, 2024
1 parent 06f9ba2 commit 16b0aee
Show file tree
Hide file tree
Showing 58 changed files with 1,671 additions and 455 deletions.
4 changes: 3 additions & 1 deletion internal/.mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ packages:
github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/inspector:
interfaces:
SealOperator:
github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/inspector:
interfaces:
TimeTickSyncOperator:
google.golang.org/grpc:
interfaces:
ClientStream:
Expand Down Expand Up @@ -67,4 +70,3 @@ packages:
google.golang.org/grpc/balancer:
interfaces:
SubConn:

30 changes: 21 additions & 9 deletions internal/distributed/streaming/append.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/milvus-io/milvus/internal/distributed/streaming/internal/producer"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/funcutil"
)

Expand All @@ -18,17 +19,17 @@ func newAppendResponseN(n int) AppendResponses {

// AppendResponse is the response of one append operation.
type AppendResponse struct {
MessageID message.MessageID
Error error
AppendResult *types.AppendResult
Error error
}

// AppendResponses is the response of append operation.
type AppendResponses struct {
Responses []AppendResponse
}

// IsAnyError returns the first error in the responses.
func (a AppendResponses) IsAnyError() error {
// UnwrapFirstError returns the first error in the responses.
func (a AppendResponses) UnwrapFirstError() error {
for _, r := range a.Responses {
if r.Error != nil {
return r.Error
Expand All @@ -37,6 +38,17 @@ func (a AppendResponses) IsAnyError() error {
return nil
}

// MaxTimeTick returns the max time tick in the responses.
func (a AppendResponses) MaxTimeTick() uint64 {
maxTimeTick := uint64(0)
for _, r := range a.Responses {
if r.AppendResult.TimeTick > maxTimeTick {
maxTimeTick = r.AppendResult.TimeTick
}
}
return maxTimeTick
}

// fillAllError fills all the responses with the same error.
func (a *AppendResponses) fillAllError(err error) {
for i := range a.Responses {
Expand Down Expand Up @@ -122,10 +134,10 @@ func (w *walAccesserImpl) appendToPChannel(ctx context.Context, pchannel string,
// TODO: only the partition-key with high partition will generate many message in one time on the same pchannel,
// we should optimize the message-format, make it into one; but not the goroutine count.
if len(msgs) == 1 {
msgID, err := p.Produce(ctx, msgs[0])
produceResult, err := p.Produce(ctx, msgs[0])
resp.fillResponseAtIdx(AppendResponse{
MessageID: msgID,
Error: err,
AppendResult: produceResult,
Error: err,
}, 0)
return resp
}
Expand All @@ -144,8 +156,8 @@ func (w *walAccesserImpl) appendToPChannel(ctx context.Context, pchannel string,

mu.Lock()
resp.fillResponseAtIdx(AppendResponse{
MessageID: msgID,
Error: err,
AppendResult: msgID,
Error: err,
}, i)
mu.Unlock()
return struct{}{}, nil
Expand Down
6 changes: 3 additions & 3 deletions internal/distributed/streaming/internal/producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type ResumableProducer struct {
}

// Produce produce a new message to log service.
func (p *ResumableProducer) Produce(ctx context.Context, msg message.MutableMessage) (msgID message.MessageID, err error) {
func (p *ResumableProducer) Produce(ctx context.Context, msg message.MutableMessage) (*producer.ProduceResult, error) {
if p.lifetime.Add(lifetime.IsWorking) != nil {
return nil, errors.Wrapf(errs.ErrClosed, "produce on closed producer")
}
Expand All @@ -89,9 +89,9 @@ func (p *ResumableProducer) Produce(ctx context.Context, msg message.MutableMess
return nil, err
}

msgID, err := producerHandler.Produce(ctx, msg)
produceResult, err := producerHandler.Produce(ctx, msg)
if err == nil {
return msgID, nil
return produceResult, nil
}
// It's ok to stop retry if the error is canceled or deadline exceed.
if status.IsCanceled(err) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ import (
func TestResumableProducer(t *testing.T) {
p := mock_producer.NewMockProducer(t)
msgID := mock_message.NewMockMessageID(t)
p.EXPECT().Produce(mock.Anything, mock.Anything).Return(msgID, nil)
p.EXPECT().Produce(mock.Anything, mock.Anything).Return(&producer.ProduceResult{
MessageID: msgID,
TimeTick: 100,
}, nil)
p.EXPECT().Close().Return()
ch := make(chan struct{})
p.EXPECT().Available().Return(ch)
Expand All @@ -44,11 +47,14 @@ func TestResumableProducer(t *testing.T) {
} else if i == 2 {
p := mock_producer.NewMockProducer(t)
msgID := mock_message.NewMockMessageID(t)
p.EXPECT().Produce(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, mm message.MutableMessage) (message.MessageID, error) {
p.EXPECT().Produce(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, mm message.MutableMessage) (*producer.ProduceResult, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}
return msgID, nil
return &producer.ProduceResult{
MessageID: msgID,
TimeTick: 100,
}, nil
})
p.EXPECT().Close().Return()
p.EXPECT().Available().Return(ch2)
Expand Down
6 changes: 3 additions & 3 deletions internal/distributed/streaming/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ package streaming
import (
"context"

clientv3 "go.etcd.io/etcd/client/v3"

kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/options"
)
Expand All @@ -17,7 +16,8 @@ func SetWAL(w WALAccesser) {

// Init initializes the wal accesser with the given etcd client.
// should be called before any other operations.
func Init(c *clientv3.Client) {
func Init() {
c, _ := kvfactory.GetEtcdAndPath()
singleton = newWALAccesser(c)
}

Expand Down
4 changes: 1 addition & 3 deletions internal/distributed/streaming/streaming_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/distributed/streaming"
kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/options"
_ "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/pulsar"
Expand All @@ -19,8 +18,7 @@ const vChannel = "by-dev-rootcoord-dml_4"

func TestMain(m *testing.M) {
paramtable.Init()
etcd, _ := kvfactory.GetEtcdAndPath()
streaming.Init(etcd)
streaming.Init()
defer streaming.Release()
m.Run()
}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 12 additions & 12 deletions internal/mocks/streamingnode/server/mock_wal/mock_WAL.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 16b0aee

Please sign in to comment.