Skip to content

Commit

Permalink
enhance: implement rmq and pulsar as wal (#34046)
Browse files Browse the repository at this point in the history
issue: #33285

- use reader but not consumer for pulsar
- advanced test framework
- move some streaming related package into pkg

---------

Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh authored Jun 27, 2024
1 parent be23495 commit d2bc4a5
Show file tree
Hide file tree
Showing 76 changed files with 1,231 additions and 288 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion internal/mocks/streamingnode/server/mock_wal/mock_WAL.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions internal/proto/streaming.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ message Message {
message PChannelInfo {
string name = 1; // channel name
int64 term = 2; // A monotonic increasing term, every time the channel is recovered or moved to another streamingnode, the term will increase by meta server.
int64 serverID = 3; // The log node id address of the channel.
repeated VChannelInfo vChannelInfos = 4; // PChannel related vchannels.
int64 server_id = 3; // The log node id address of the channel.
}

// VChannelInfo is the information of a vchannel info.
Expand All @@ -39,7 +38,7 @@ message DeliverPolicy {
oneof policy {
google.protobuf.Empty all = 1; // deliver all messages.
google.protobuf.Empty latest = 2; // deliver the latest message.
MessageID startFrom = 3; // deliver message from this message id. [startFrom, ...]
MessageID startAfter = 4; // deliver message after this message id. (startAfter, ...]
MessageID start_from = 3; // deliver message from this message id. [startFrom, ...]
MessageID start_after = 4; // deliver message after this message id. (startAfter, ...]
}
}
26 changes: 17 additions & 9 deletions internal/streamingnode/server/wal/RAEDME.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
# WAL

`wal` package is the basic defination of wal interface of milvus streamingnode.
`wal` use `github.com/milvus-io/milvus/pkg/streaming/walimpls` to implement the final wal service.

## Project arrangement

- `/`: only define exposed interfaces.
- `/walimpls/`: define the underlying message system interfaces need to be implemented.
- `/registry/`: A static lifetime registry to regsiter new implementation for inverting dependency.
- `/adaptor/`: adaptors to implement `wal` interface from `walimpls` interface
- `/helper/`: A utility used to help developer to implement `walimpls` conveniently.
- `/utility/`: A utility code for common logic or data structure.
- `wal`
- `/`: only define exposed interfaces.
- `/adaptor/`: adaptors to implement `wal` interface from `walimpls` interface
- `/utility/`: A utility code for common logic or data structure.
- `github.com/milvus-io/milvus/pkg/streaming/walimpls`
- `/`: define the underlying message system interfaces need to be implemented.
- `/registry/`: A static lifetime registry to regsiter new implementation for inverting dependency.
- `/helper/`: A utility used to help developer to implement `walimpls` conveniently.
- `/impls/`: A official implemented walimpls sets.

## Lifetime Of Interfaces

Expand All @@ -20,17 +24,19 @@

## Add New Implemetation Of WAL

developper who want to add a new implementation of `wal` should implements the `walimpls` package interfaces. following interfaces is required:
developper who want to add a new implementation of `wal` should implements the `github.com/milvus-io/milvus/pkg/streaming/walimpls` package interfaces. following interfaces is required:

- `walimpls.OpenerBuilderImpls`
- `walimpls.OpenerImpls`
- `walimpls.ScannerImpls`
- `walimpls.WALImpls`

`OpenerBuilderImpls` create `OpenerImpls`; `OpenerImpls` creates `WALImpls`; `WALImpls` create `ScannerImpls`.
Then register the implmentation of `walimpls.OpenerBuilderImpls` into `registry` package.
Then register the implmentation of `walimpls.OpenerBuilderImpls` into `github.com/milvus-io/milvus/pkg/streaming/walimpls/registry` package.

```
import "github.com/milvus-io/milvus/pkg/streaming/walimpls/registry"
var _ OpenerBuilderImpls = b{};
registry.RegisterBuilder(b{})
```
Expand All @@ -40,8 +46,10 @@ All things have been done.
## Use WAL

```
import "github.com/milvus-io/milvus/internal/streamingnode/server/wal/registry"
name := "your builder name"
var yourCh *streamingpb.PChannelInfo
var yourCh *options.PChannelInfo
opener, err := registry.MustGetBuilder(name).Build()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/streamingnode/server/wal/adaptor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package adaptor

import (
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls"
"github.com/milvus-io/milvus/pkg/streaming/walimpls"
)

var _ wal.OpenerBuilder = (*builderAdaptorImpl)(nil)
Expand Down
2 changes: 1 addition & 1 deletion internal/streamingnode/server/wal/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"

"github.com/milvus-io/milvus/internal/proto/streamingpb"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls"
"github.com/milvus-io/milvus/pkg/streaming/walimpls"
)

// OpenerBuilder is the interface for build wal opener.
Expand Down
24 changes: 0 additions & 24 deletions internal/streamingnode/server/wal/helper/wal_helper_test.go

This file was deleted.

26 changes: 3 additions & 23 deletions internal/streamingnode/server/wal/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,11 @@ package registry
import (
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/adaptor"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls"
"github.com/milvus-io/milvus/pkg/util/typeutil"
"github.com/milvus-io/milvus/pkg/streaming/walimpls/registry"
)

// builders is a map of registered wal builders.
var builders typeutil.ConcurrentMap[string, wal.OpenerBuilder]

// Register registers the wal builder.
//
// NOTE: this function must only be called during initialization time (i.e. in
// an init() function), name of builder is lowercase. If multiple Builder are
// registered with the same name, panic will occur.
func RegisterBuilder(b walimpls.OpenerBuilderImpls) {
bb := adaptor.AdaptImplsToBuilder(b)
_, loaded := builders.GetOrInsert(bb.Name(), bb)
if loaded {
panic("wal builder already registered: " + b.Name())
}
}

// MustGetBuilder returns the wal builder by name.
func MustGetBuilder(name string) wal.OpenerBuilder {
b, ok := builders.Get(name)
if !ok {
panic("wal builder not found: " + name)
}
return b
b := registry.MustGetBuilder(name)
return adaptor.AdaptImplsToBuilder(b)
}
4 changes: 2 additions & 2 deletions internal/streamingnode/server/wal/scanner.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package wal

import (
"github.com/milvus-io/milvus/internal/util/streamingutil/message"
"github.com/milvus-io/milvus/internal/util/streamingutil/options"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/options"
)

// ReadOption is the option for reading records from the wal.
Expand Down
7 changes: 3 additions & 4 deletions internal/streamingnode/server/wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,15 @@ package wal
import (
"context"

"github.com/milvus-io/milvus/internal/proto/streamingpb"
"github.com/milvus-io/milvus/internal/util/streamingutil/message"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
)

// WAL is the WAL framework interface.
// !!! Don't implement it directly, implement walimpls.WAL instead.
type WAL interface {
// Channel returns the channel assignment info of the wal.
// Should be read-only.
Channel() *streamingpb.PChannelInfo
Channel() types.PChannelInfo

// Append writes a record to the log.
Append(ctx context.Context, msg message.MutableMessage) (message.MessageID, error)
Expand Down
52 changes: 0 additions & 52 deletions internal/streamingnode/server/wal/walimplstest/wal.go

This file was deleted.

17 changes: 1 addition & 16 deletions internal/streamingservice/.mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,9 @@ dir: "internal/mocks/{{trimPrefix .PackagePath \"github.com/milvus-io/milvus/int
mockname: "Mock{{.InterfaceName}}"
outpkg: "mock_{{.PackageName}}"
packages:
github.com/milvus-io/milvus/internal/util/streamingutil/message:
interfaces:
MessageID:
ImmutableMessage:
MutableMessage:
RProperties:
github.com/milvus-io/milvus/internal/streamingnode/server/wal:
interfaces:
OpenerBuilder:
Opener:
Scanner:
WAL:
github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls:
interfaces:
OpenerBuilderImpls:
OpenerImpls:
ScannerImpls:
WALImpls:
Interceptor:
InterceptorWithReady:
InterceptorBuilder:
WAL:
45 changes: 0 additions & 45 deletions internal/util/streamingutil/options/deliver.go

This file was deleted.

5 changes: 4 additions & 1 deletion pkg/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@ INSTALL_PATH := $(ROOTPATH)/bin
getdeps:
$(MAKE) -C $(ROOTPATH) getdeps

generate-mockery: getdeps
generate-mockery: getdeps generate-mockery-streaming
$(INSTALL_PATH)/mockery --name=MsgStream --dir=$(PWD)/mq/msgstream --output=$(PWD)/mq/msgstream --filename=mock_msgstream.go --with-expecter --structname=MockMsgStream --outpkg=msgstream --inpackage
$(INSTALL_PATH)/mockery --name=Factory --dir=$(PWD)/mq/msgstream --output=$(PWD)/mq/msgstream --filename=mock_msgstream_factory.go --with-expecter --structname=MockFactory --outpkg=msgstream --inpackage
$(INSTALL_PATH)/mockery --name=Client --dir=$(PWD)/mq/msgdispatcher --output=$(PWD)/mq/msgsdispatcher --filename=mock_client.go --with-expecter --structname=MockClient --outpkg=msgdispatcher --inpackage
$(INSTALL_PATH)/mockery --name=Logger --dir=$(PWD)/eventlog --output=$(PWD)/eventlog --filename=mock_logger.go --with-expecter --structname=MockLogger --outpkg=eventlog --inpackage
$(INSTALL_PATH)/mockery --name=MessageID --dir=$(PWD)/mq/msgstream/mqwrapper --output=$(PWD)/mq/msgstream/mqwrapper --filename=mock_id.go --with-expecter --structname=MockMessageID --outpkg=mqwrapper --inpackage

generate-mockery-streaming: getdeps
$(INSTALL_PATH)/mockery --config $(PWD)/streaming/.mockery.yaml
1 change: 1 addition & 0 deletions pkg/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ require (
github.com/panjf2000/ants/v2 v2.7.2
github.com/prometheus/client_golang v1.14.0
github.com/quasilyte/go-ruleguard/dsl v0.3.22
github.com/remeh/sizedwaitgroup v1.0.0
github.com/samber/lo v1.27.0
github.com/sasha-s/go-deadlock v0.3.1
github.com/shirou/gopsutil/v3 v3.22.9
Expand Down
2 changes: 2 additions & 0 deletions pkg/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,8 @@ github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/quasilyte/go-ruleguard/dsl v0.3.22 h1:wd8zkOhSNr+I+8Qeciml08ivDt1pSXe60+5DqOpCjPE=
github.com/quasilyte/go-ruleguard/dsl v0.3.22/go.mod h1:KeCP03KrjuSO0H1kTuZQCWlQPulDV6YMIXmpQss17rU=
github.com/remeh/sizedwaitgroup v1.0.0 h1:VNGGFwNo/R5+MJBf6yrsr110p0m4/OX4S3DCy7Kyl5E=
github.com/remeh/sizedwaitgroup v1.0.0/go.mod h1:3j2R4OIe/SeS6YDhICBy22RWjJC5eNCJ1V+9+NVNYlo=
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 h1:OdAsTTz6OkFY5QxjkYwrChwuRruF69c169dPK26NUlk=
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/rogpeppe/clock v0.0.0-20190514195947-2896927a307a/go.mod h1:4r5QyqhjIWCcK8DO4KMclc5Iknq5qVBAlbYYzAbUScQ=
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit d2bc4a5

Please sign in to comment.