enhance: enable streaming
- fix up flusher bugs and refactor the flush operation
- enable the streaming service for DML and DDL
- pass the e2e tests when the streaming service is enabled
- pass the integration tests when the streaming service is enabled

Signed-off-by: chyezh <[email protected]>
Signed-off-by: bigsheeper <[email protected]>
chyezh committed Aug 20, 2024
1 parent 22ced01 commit e2b29a4
Showing 84 changed files with 2,554 additions and 537 deletions.
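Everything below hinges on a single feature gate, streamingutil.IsStreamingServiceEnabled(). As a rough sketch of what such a gate can look like — the package body and the environment-variable name here are assumptions for illustration, not the project's actual definition:

	package streamingutil

	import "os"

	// IsStreamingServiceEnabled reports whether the process should run with
	// the new streaming service. Sketch only: the real check lives in
	// internal/util/streamingutil, and the variable name below is a guess.
	func IsStreamingServiceEnabled() bool {
		return os.Getenv("MILVUS_STREAMING_SERVICE_ENABLED") == "1"
	}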
2 changes: 1 addition & 1 deletion Makefile
@@ -541,7 +541,7 @@ generate-mockery-chunk-manager: getdeps
generate-mockery-pkg:
$(MAKE) -C pkg generate-mockery

- generate-mockery-internal:
+ generate-mockery-internal: getdeps
$(INSTALL_PATH)/mockery --config $(PWD)/internal/.mockery.yaml

generate-mockery: generate-mockery-types generate-mockery-kv generate-mockery-rootcoord generate-mockery-proxy generate-mockery-querycoord generate-mockery-querynode generate-mockery-datacoord generate-mockery-pkg generate-mockery-internal
8 changes: 8 additions & 0 deletions cmd/roles/roles.go
@@ -35,12 +35,14 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/cmd/components"
"github.com/milvus-io/milvus/internal/distributed/streaming"
"github.com/milvus-io/milvus/internal/http"
"github.com/milvus-io/milvus/internal/http/healthz"
"github.com/milvus-io/milvus/internal/util/dependency"
kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv"
"github.com/milvus-io/milvus/internal/util/initcore"
internalmetrics "github.com/milvus-io/milvus/internal/util/metrics"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/config"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
@@ -377,6 +379,12 @@ func (mr *MilvusRoles) Run() {
paramtable.SetRole(mr.ServerType)
}

+ // Initialize streaming service if enabled.
+ if streamingutil.IsStreamingServiceEnabled() {
+ 	streaming.Init()
+ 	defer streaming.Release()
+ }

expr.Init()
expr.Register("param", paramtable.Get())
mr.setupLogger()
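The roles.go hunk above wires the gate into process startup: initialize the streaming client once, and release it when Run() returns. A minimal, self-contained sketch of that init/deferred-release shape (the Init/Release pairing mirrors the diff; everything else is illustrative scaffolding):

	package main

	import "log"

	// streamingEnabled stands in for streamingutil.IsStreamingServiceEnabled().
	var streamingEnabled = true

	func initStreaming()    { log.Println("streaming client initialized") }
	func releaseStreaming() { log.Println("streaming client released") }

	func run() {
		// Guarded init with a deferred release, as in MilvusRoles.Run():
		// the defer guarantees cleanup on every exit path of run().
		if streamingEnabled {
			initStreaming()
			defer releaseStreaming()
		}
		log.Println("roles running")
	}

	func main() { run() }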
56 changes: 30 additions & 26 deletions configs/milvus.yaml
@@ -59,11 +59,11 @@ etcd:
dir: default.etcd # Embedded Etcd only. please adjust in embedded Milvus: /tmp/milvus/etcdData/
auth:
enabled: false # Whether to enable authentication
userName: # username for etcd authentication
password: # password for etcd authentication

metastore:
type: etcd # Default value: etcd, Valid values: [etcd, tikv]

# Related configuration of tikv, used to store Milvus metadata.
# Notice that when TiKV is enabled for metastore, you still need to have etcd for service discovery.
@@ -77,9 +77,9 @@ tikv:
snapshotScanSize: 256 # batch size of tikv snapshot scan
ssl:
enabled: false # Whether to support TiKV secure connection mode
tlsCert: # path to your cert file
tlsKey: # path to your key file
tlsCACert: # path to your CACert file

localStorage:
# Local path to where vector data are stored during a search or a query to avoid repetitve access to MinIO or S3 service.
@@ -140,12 +140,12 @@ minio:
cloudProvider: aws
# Custom endpoint for fetch IAM role credentials. when useIAM is true & cloudProvider is "aws".
# Leave it empty if you want to use AWS default endpoint
iamEndpoint:
logLevel: fatal # Log level for aws sdk log. Supported level: off, fatal, error, warn, info, debug, trace
region: # Specify minio storage system location region
useVirtualHost: false # Whether use virtual host mode for bucket
requestTimeoutMs: 10000 # minio timeout for request time in milliseconds
# The maximum number of objects requested per batch in minio ListObjects rpc,
# 0 means using oss client by default, decrease these configration if ListObjects timeout
listObjectsMaxKeys: 0

Expand All @@ -157,7 +157,7 @@ minio:
mq:
# Default value: "default"
# Valid values: [default, pulsar, kafka, rocksmq, natsmq]
- type: default
+ type: pulsar
enablePursuitMode: true # Default value: "true"
pursuitLag: 10 # time tick lag threshold to enter pursuit mode, in seconds
pursuitBufferSize: 8388608 # pursuit mode buffer size in bytes
@@ -190,11 +190,11 @@ pulsar:

# If you want to enable kafka, needs to comment the pulsar configs
# kafka:
# brokerList:
# saslUsername:
# saslPassword:
# saslMechanisms:
# securityProtocol:
# ssl:
# enabled: false # whether to enable ssl mode
# tlsCert: # path to client's public key (PEM) used for authentication
@@ -234,8 +234,8 @@ natsmq:
logSizeLimit: 536870912 # Size in bytes after the log file rolls over to a new one
retention:
maxAge: 4320 # Maximum age of any message in the P-channel
maxBytes: # How many bytes the single P-channel may contain. Removing oldest messages if the P-channel exceeds this size
maxMsgs: # How many message the single P-channel may contain. Removing oldest messages if the P-channel exceeds this limit

# Related configuration of rootCoord, used to handle data definition language (DDL) and data control language (DCL) requests
rootCoord:
@@ -304,7 +304,7 @@ proxy:
http:
enabled: true # Whether to enable the http server
debug_mode: false # Whether to enable http server debug mode
port: # high-level restful api
acceptTypeAllowInt64: true # high-level restful api, whether http client can deal with int64
enablePprof: true # Whether to enable pprof middleware on the metrics port
ip: # TCP/IP address of proxy. If not specified, use the first unicastable address
@@ -401,9 +401,9 @@ queryNode:
enabled: true
memoryLimit: 2147483648 # 2 GB, 2 * 1024 *1024 *1024
readAheadPolicy: willneed # The read ahead policy of chunk cache, options: `normal, random, sequential, willneed, dontneed`
# options: async, sync, disable.
# Specifies the necessity for warming up the chunk cache.
# 1. If set to "sync" or "async" the original vector data will be synchronously/asynchronously loaded into the
# chunk cache during the load process. This approach has the potential to substantially reduce query/search latency
# for a specific duration post-load, albeit accompanied by a concurrent increase in disk usage;
# 2. If set to "disable" original vector data will only be loaded into the chunk cache during search/query.
@@ -516,11 +516,11 @@ dataCoord:
compactableProportion: 0.85
# over (compactableProportion * segment max # of rows) rows.
# MUST BE GREATER THAN OR EQUAL TO <smallProportion>!!!
# During compaction, the size of segment # of rows is able to exceed segment max # of rows by (expansionRate-1) * 100%.
expansionRate: 1.25
sealPolicy:
channel:
# The size threshold in MB, if the total size of growing segments of each shard
# exceeds this threshold, the largest growing segment will be sealed.
growingSegmentsMemSize: 4096
autoUpgradeSegmentIndex: false # whether auto upgrade segment index to index engine's version
@@ -698,11 +698,15 @@ msgChannel:
# It is recommended to change this parameter before starting Milvus for the first time.
dataNodeSubNamePrefix: dataNode

+ streamingNode:
+   ip: # if not specified, use the first unicastable address
+   port: 22222

# Configures the system log output.
log:
# Milvus log level. Option: debug, info, warn, error, panic, and fatal.
# It is recommended to use debug level under test and development environments, and info level in production environment.
- level: info
+ level: debug
file:
# Root path to the log files.
# The default value is set empty, indicating to output log files to standard output (stdout) and standard error (stderr).
@@ -768,7 +772,7 @@ common:
authorizationEnabled: false
# The superusers will ignore some system check processes,
# like the old password verification when updating the credential
superUsers:
defaultRootPassword: Milvus # default password for root user
tlsMode: 0
session:
@@ -1013,7 +1017,7 @@ trace:
# Fractions >= 1 will always sample. Fractions < 0 are treated as zero.
sampleFraction: 0
jaeger:
url: # when exporter is jaeger should set the jaeger's URL
otlp:
endpoint: # example: "127.0.0.1:4317" for grpc, "127.0.0.1:4318" for http
method: # otlp export method, acceptable values: ["grpc", "http"], using "grpc" by default
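streamingNode is the one genuinely new configuration block. A hedged sketch of loading such a block from milvus.yaml — this uses a plain gopkg.in/yaml.v3 struct for illustration, whereas Milvus actually resolves settings through its paramtable:

	package main

	import (
		"fmt"

		"gopkg.in/yaml.v3"
	)

	// StreamingNodeConfig mirrors the new streamingNode block; the struct
	// and field names are illustrative, not the project's actual types.
	type StreamingNodeConfig struct {
		IP   string `yaml:"ip"`   // empty means "use the first unicastable address"
		Port int    `yaml:"port"` // default 22222 per the diff above
	}

	func main() {
		raw := []byte("streamingNode:\n  ip: \"\"\n  port: 22222\n")
		var cfg struct {
			StreamingNode StreamingNodeConfig `yaml:"streamingNode"`
		}
		if err := yaml.Unmarshal(raw, &cfg); err != nil {
			panic(err)
		}
		fmt.Printf("streaming node listens on port %d\n", cfg.StreamingNode.Port)
	}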
2 changes: 1 addition & 1 deletion go.mod
@@ -22,7 +22,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.17.7
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
- github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240815123953-6dab6fcd6454
+ github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240820032106-b34be93a2271
github.com/minio/minio-go/v7 v7.0.61
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81
github.com/prometheus/client_golang v1.14.0
4 changes: 2 additions & 2 deletions go.sum
@@ -598,8 +598,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu
github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
- github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240815123953-6dab6fcd6454 h1:JmZCYjMPpiE4ksZw0AUxXWkDY7wwA4fhS+SO1N211Vw=
- github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240815123953-6dab6fcd6454/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
+ github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240820032106-b34be93a2271 h1:YUWBgtRHmvkxMPTfOrY3FIq0K5XHw02Z18z7cyaMH04=
+ github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240820032106-b34be93a2271/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A=
github.com/milvus-io/pulsar-client-go v0.6.10/go.mod h1:lQqCkgwDF8YFYjKA+zOheTk1tev2B+bKj5j7+nm8M1w=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
9 changes: 8 additions & 1 deletion internal/datacoord/channel_manager.go
@@ -27,6 +27,7 @@ import (
"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/kv"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/conc"
@@ -159,7 +160,7 @@ func (m *ChannelManagerImpl) Startup(ctx context.Context, legacyNodes, allNodes
m.finishRemoveChannel(info.NodeID, lo.Values(info.Channels)...)
}

- if m.balanceCheckLoop != nil {
+ if m.balanceCheckLoop != nil && !streamingutil.IsStreamingServiceEnabled() {
log.Info("starting channel balance loop")
m.wg.Add(1)
go func() {
@@ -328,6 +329,12 @@ func (m *ChannelManagerImpl) Balance() {
}

func (m *ChannelManagerImpl) Match(nodeID UniqueID, channel string) bool {
+ if streamingutil.IsStreamingServiceEnabled() {
+ 	// Skip the channel matching check since the
+ 	// channel manager no longer manages channels in streaming mode.
+ 	return true
+ }

m.mu.RLock()
defer m.mu.RUnlock()

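This Match short-circuit is the recurring shape of the commit: legacy bookkeeping steps aside when the streaming service owns channel assignment. A toy, self-contained version of the same check (types and sample data are invented for illustration):

	package main

	import "fmt"

	type channelManager struct {
		streamingEnabled bool
		assignments      map[int64][]string // nodeID -> assigned channels
	}

	func (m *channelManager) Match(nodeID int64, channel string) bool {
		if m.streamingEnabled {
			// Streaming mode: assignment is handled elsewhere, so any
			// (node, channel) pair is treated as valid here.
			return true
		}
		for _, ch := range m.assignments[nodeID] {
			if ch == channel {
				return true
			}
		}
		return false
	}

	func main() {
		m := &channelManager{streamingEnabled: true}
		fmt.Println(m.Match(1, "by-dev-rootcoord-dml_0")) // true: check bypassed
	}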
5 changes: 4 additions & 1 deletion internal/datacoord/server.go
@@ -725,7 +725,10 @@ func (s *Server) startServerLoop() {
go s.importScheduler.Start()
go s.importChecker.Start()
s.garbageCollector.start()
- s.syncSegmentsScheduler.Start()
+
+ if !streamingutil.IsStreamingServiceEnabled() {
+ 	s.syncSegmentsScheduler.Start()
+ }
}

func (s *Server) updateSegmentStatistics(stats []*commonpb.SegmentStats) {
71 changes: 41 additions & 30 deletions internal/datacoord/services.go
@@ -38,6 +38,7 @@ import (
"github.com/milvus-io/milvus/internal/util/componentutil"
"github.com/milvus-io/milvus/internal/util/importutilv2"
"github.com/milvus-io/milvus/internal/util/segmentutil"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
@@ -111,14 +112,16 @@ func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.F
}
timeOfSeal, _ := tsoutil.ParseTS(ts)

- sealedSegmentIDs, err := s.segmentManager.SealAllSegments(ctx, req.GetCollectionID(), req.GetSegmentIDs())
- if err != nil {
- 	return &datapb.FlushResponse{
- 		Status: merr.Status(errors.Wrapf(err, "failed to flush collection %d",
- 			req.GetCollectionID())),
- 	}, nil
+ sealedSegmentIDs := make([]int64, 0)
+ if !streamingutil.IsStreamingServiceEnabled() {
+ 	var err error
+ 	if sealedSegmentIDs, err = s.segmentManager.SealAllSegments(ctx, req.GetCollectionID(), req.GetSegmentIDs()); err != nil {
+ 		return &datapb.FlushResponse{
+ 			Status: merr.Status(errors.Wrapf(err, "failed to flush collection %d",
+ 				req.GetCollectionID())),
+ 		}, nil
+ 	}
+ }

sealedSegmentsIDDict := make(map[UniqueID]bool)
for _, sealedSegmentID := range sealedSegmentIDs {
sealedSegmentsIDDict[sealedSegmentID] = true
@@ -135,33 +138,35 @@ func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.F
}
}

- var isUnimplemented bool
- err = retry.Do(ctx, func() error {
- 	nodeChannels := s.channelManager.GetNodeChannelsByCollectionID(req.GetCollectionID())
-
- 	for nodeID, channelNames := range nodeChannels {
- 		err = s.cluster.FlushChannels(ctx, nodeID, ts, channelNames)
- 		if err != nil && errors.Is(err, merr.ErrServiceUnimplemented) {
- 			isUnimplemented = true
- 			return nil
- 		}
- 		if err != nil {
- 			return err
- 		}
- 	}
- 	return nil
- }, retry.Attempts(60)) // about 3min
- if err != nil {
- 	return &datapb.FlushResponse{
- 		Status: merr.Status(err),
- 	}, nil
- }
-
- if isUnimplemented {
- 	// For compatible with rolling upgrade from version 2.2.x,
- 	// fall back to the flush logic of version 2.2.x;
- 	log.Warn("DataNode FlushChannels unimplemented", zap.Error(err))
- 	ts = 0
- }
+ if !streamingutil.IsStreamingServiceEnabled() {
+ 	var isUnimplemented bool
+ 	err = retry.Do(ctx, func() error {
+ 		nodeChannels := s.channelManager.GetNodeChannelsByCollectionID(req.GetCollectionID())
+
+ 		for nodeID, channelNames := range nodeChannels {
+ 			err = s.cluster.FlushChannels(ctx, nodeID, ts, channelNames)
+ 			if err != nil && errors.Is(err, merr.ErrServiceUnimplemented) {
+ 				isUnimplemented = true
+ 				return nil
+ 			}
+ 			if err != nil {
+ 				return err
+ 			}
+ 		}
+ 		return nil
+ 	}, retry.Attempts(60)) // about 3min
+ 	if err != nil {
+ 		return &datapb.FlushResponse{
+ 			Status: merr.Status(err),
+ 		}, nil
+ 	}
+
+ 	if isUnimplemented {
+ 		// For compatible with rolling upgrade from version 2.2.x,
+ 		// fall back to the flush logic of version 2.2.x;
+ 		log.Warn("DataNode FlushChannels unimplemented", zap.Error(err))
+ 		ts = 0
+ 	}
+ }

log.Info("flush response with segments",
@@ -255,6 +260,12 @@ func (s *Server) AllocSegment(ctx context.Context, req *datapb.AllocSegmentReque
return &datapb.AllocSegmentResponse{Status: merr.Status(merr.ErrParameterInvalid)}, nil
}

+ // refresh the meta of the collection.
+ _, err := s.handler.GetCollection(ctx, req.GetCollectionId())
+ if err != nil {
+ 	return &datapb.AllocSegmentResponse{Status: merr.Status(err)}, nil
+ }

// Alloc new growing segment and return the segment info.
segmentInfo, err := s.segmentManager.AllocNewGrowingSegment(ctx, req.GetCollectionId(), req.GetPartitionId(), req.GetSegmentId(), req.GetVchannel())
if err != nil {
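The net effect on Flush: sealing segments and pushing FlushChannels to data nodes happen only on the legacy path, while in streaming mode the coordinator reports an empty sealed-segment list (the flush itself presumably being driven through the streaming service instead). A stripped-down sketch of that control flow, with invented signatures:

	package main

	import "fmt"

	// flush mimics the gated shape of DataCoord's Flush handler; sealAll
	// stands in for segmentManager.SealAllSegments. All names illustrative.
	func flush(streamingEnabled bool, sealAll func() ([]int64, error)) ([]int64, error) {
		sealed := make([]int64, 0)
		if !streamingEnabled {
			var err error
			if sealed, err = sealAll(); err != nil {
				return nil, fmt.Errorf("failed to flush collection: %w", err)
			}
		}
		// Streaming mode: sealing is no longer coordinated here, so the
		// sealed list stays empty.
		return sealed, nil
	}

	func main() {
		sealAll := func() ([]int64, error) { return []int64{100, 101}, nil }
		legacy, _ := flush(false, sealAll)
		streaming, _ := flush(true, sealAll)
		fmt.Println(legacy, streaming) // [100 101] []
	}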
21 changes: 12 additions & 9 deletions internal/datanode/data_node.go
@@ -49,6 +49,7 @@ import (
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/kv"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
@@ -308,20 +309,22 @@ func (node *DataNode) Start() error {
return
}

- node.writeBufferManager.Start()
-
- go node.compactionExecutor.Start(node.ctx)
-
- go node.importScheduler.Start()
- node.channelManager = channel.NewChannelManager(getPipelineParams(node), node.flowgraphManager)
- node.channelManager.Start()
-
- node.timeTickSender = util2.NewTimeTickSender(node.broker, node.session.ServerID,
- 	retry.Attempts(20), retry.Sleep(time.Millisecond*100))
- node.timeTickSender.Start()
-
- go node.channelCheckpointUpdater.Start()
+ if !streamingutil.IsStreamingServiceEnabled() {
+ 	node.writeBufferManager.Start()
+
+ 	node.timeTickSender = util2.NewTimeTickSender(node.broker, node.session.ServerID,
+ 		retry.Attempts(20), retry.Sleep(time.Millisecond*100))
+ 	node.timeTickSender.Start()
+
+ 	go node.channelCheckpointUpdater.Start()
+ }
+
+ go node.compactionExecutor.Start(node.ctx)
+
+ node.channelManager = channel.NewChannelManager(getPipelineParams(node), node.flowgraphManager)
+ node.channelManager.Start()
+ go node.importScheduler.Start()

node.UpdateStateCode(commonpb.StateCode_Healthy)
})
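In DataNode.Start() the gate splits the components into two groups: the write-buffer manager, time-tick sender, and channel-checkpoint updater start only in legacy mode, while the compaction executor, channel manager, and import scheduler start either way. A compact sketch of that split (component names are stand-ins for the real objects):

	package main

	import "fmt"

	// startDataNode mirrors the gated startup order introduced above.
	func startDataNode(streamingEnabled bool) []string {
		started := []string{}
		if !streamingEnabled {
			// Legacy-only: superseded by the streaming service's own
			// write path, time ticks, and checkpointing.
			started = append(started, "writeBufferManager", "timeTickSender", "channelCheckpointUpdater")
		}
		// Needed in both modes.
		started = append(started, "compactionExecutor", "channelManager", "importScheduler")
		return started
	}

	func main() {
		fmt.Println(startDataNode(true))
		fmt.Println(startDataNode(false))
	}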
[Diff truncated: the remaining changed files of this commit are not shown on this page.]