Skip to content

Commit

Permalink
enhance: wal adaptor implementation (#34122)
Browse files Browse the repository at this point in the history
issue: #33285

- add adaptor to implement walimpls into wal interface.
- implement timetick sorted and filtering scanner.
- add test for wal.

---------

Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh authored Jul 4, 2024
1 parent e4cece8 commit 7611128
Show file tree
Hide file tree
Showing 33 changed files with 2,341 additions and 30 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ require (
require github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d70

require (
github.com/cockroachdb/redact v1.1.3
github.com/greatroar/blobloom v0.0.0-00010101000000-000000000000
github.com/jolestar/go-commons-pool/v2 v2.1.2
github.com/milvus-io/milvus/pkg v0.0.0-00010101000000-000000000000
Expand Down Expand Up @@ -102,7 +103,6 @@ require (
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
github.com/cilium/ebpf v0.11.0 // indirect
github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f // indirect
github.com/cockroachdb/redact v1.1.3 // indirect
github.com/confluentinc/confluent-kafka-go v1.9.1 // indirect
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd/v22 v22.3.2 // indirect
Expand Down
302 changes: 302 additions & 0 deletions internal/mocks/google.golang.org/mock_grpc/mock_ClientStream.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 46 additions & 0 deletions internal/proto/streaming.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ message VChannelInfo {
string name = 1;
}

// DeliverPolicy is the policy to deliver message.
message DeliverPolicy {
oneof policy {
google.protobuf.Empty all = 1; // deliver all messages.
Expand All @@ -42,3 +43,48 @@ message DeliverPolicy {
MessageID start_after = 4; // deliver message after this message id. (startAfter, ...]
}
}

// DeliverFilter is the filter to deliver message.
message DeliverFilter {
oneof filter {
DeliverFilterTimeTickGT time_tick_gt = 1;
DeliverFilterTimeTickGTE time_tick_gte = 2;
DeliverFilterVChannel vchannel = 3;
}
}

// DeliverFilterTimeTickGT is the filter to deliver message with time tick greater than this value.
message DeliverFilterTimeTickGT {
uint64 time_tick = 1; // deliver message with time tick greater than this value.
}

// DeliverFilterTimeTickGTE is the filter to deliver message with time tick greater than or equal to this value.
message DeliverFilterTimeTickGTE {
uint64 time_tick = 1; // deliver message with time tick greater than or equal to this value.
}

// DeliverFilterVChannel is the filter to deliver message with vchannel name.
message DeliverFilterVChannel {
string vchannel = 1; // deliver message with vchannel name.
}

// StreamingCode is the error code for log internal component.
enum StreamingCode {
STREAMING_CODE_OK = 0;
STREAMING_CODE_CHANNEL_EXIST = 1; // channel already exist
STREAMING_CODE_CHANNEL_NOT_EXIST = 2; // channel not exist
STREAMING_CODE_CHANNEL_FENCED = 3; // channel is fenced
STREAMING_CODE_ON_SHUTDOWN = 4; // component is on shutdown
STREAMING_CODE_INVALID_REQUEST_SEQ = 5; // invalid request sequence
STREAMING_CODE_UNMATCHED_CHANNEL_TERM = 6; // unmatched channel term
STREAMING_CODE_IGNORED_OPERATION = 7; // ignored operation
STREAMING_CODE_INNER = 8; // underlying service failure.
STREAMING_CODE_EOF = 9; // end of stream, generated by grpc status.
STREAMING_CODE_UNKNOWN = 999; // unknown error
}

// StreamingError is the error type for log internal component.
message StreamingError {
StreamingCode code = 1;
string cause = 2;
}
Loading

0 comments on commit 7611128

Please sign in to comment.