separated InternalTls configs from grpc configs
Signed-off-by: Nischay <[email protected]>
nish112022 committed Nov 4, 2024
1 parent b1a62fa commit c4742ff
Showing 19 changed files with 148 additions and 152 deletions.
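At its core, the commit moves the internal-TLS switches out of the per-component gRPC config groups (DataCoordGrpcClientCfg, QueryNodeGrpcClientCfg, and so on) into one shared group, so every client below reads Params.InternalTLSCfg instead. As a rough sketch, the consolidated group implied by those call sites might look like this; the real struct lives in Milvus's paramtable package, and the field layout here is an assumption inferred from this diff:

package paramtable

// InternalTLSConfig is a sketch of the shared group this commit introduces;
// field names are inferred from the call sites in this diff, and ParamItem is
// paramtable's config-value type.
type InternalTLSConfig struct {
	InternalTLSEnabled       ParamItem // gates encryption for every internal client below
	InternalTLSServerPemPath ParamItem // internaltls.serverPemPath
	InternalTLSServerKeyPath ParamItem // internaltls.serverKeyPath
	InternalTLSCaPemPath     ParamItem // internaltls.caPemPath, read when building client cert pools
}

With one group, a path change applies to every component at once instead of being duplicated across five client configs.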
73 changes: 36 additions & 37 deletions configs/milvus.yaml
@@ -136,11 +136,17 @@ minio:
# aliyun (ecs): https://www.alibabacloud.com/help/en/elastic-compute-service/latest/attach-an-instance-ram-role
useIAM: false
# Cloud Provider of S3. Supports: "aws", "gcp", "aliyun".
# Cloud Provider of Google Cloud Storage. Supports: "gcpnative".
# You can use "aws" for other cloud provider supports S3 API with signature v4, e.g.: minio
# You can use "gcp" for other cloud provider supports S3 API with signature v2
# You can use "aliyun" for other cloud provider uses virtual host style bucket
# You can use "gcpnative" for the Google Cloud Platform provider. Uses service account credentials
# for authentication.
# When useIAM enabled, only "aws", "gcp", "aliyun" is supported for now
cloudProvider: aws
# The JSON content contains the gcs service account credentials.
# Used only for the "gcpnative" cloud provider.
gcpCredentialJSON:
# Custom endpoint for fetching IAM role credentials, when useIAM is true and cloudProvider is "aws".
# Leave it empty if you want to use AWS default endpoint
iamEndpoint:
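The new gcpnative provider and gcpCredentialJSON option imply authenticating to GCS with inline service-account JSON. The wiring is not part of this diff; a minimal sketch of that pattern with the official Go client, assuming the JSON string is passed through from the config as-is:

package storageutil

import (
	"context"

	"cloud.google.com/go/storage"
	"google.golang.org/api/option"
)

// newGCSClient builds a native GCS client from the configured credential JSON;
// with an empty string it falls back to Application Default Credentials.
func newGCSClient(ctx context.Context, gcpCredentialJSON string) (*storage.Client, error) {
	if gcpCredentialJSON == "" {
		return storage.NewClient(ctx)
	}
	return storage.NewClient(ctx, option.WithCredentialsJSON([]byte(gcpCredentialJSON)))
}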
@@ -164,6 +170,7 @@ mq:
enablePursuitMode: true # Default value: "true"
pursuitLag: 10 # time tick lag threshold to enter pursuit mode, in seconds
pursuitBufferSize: 8388608 # pursuit mode buffer size in bytes
pursuitBufferTime: 60 # pursuit mode buffer time in seconds
mqBufSize: 16 # MQ client consumer buffer length
dispatcher:
mergeCheckInterval: 1 # the interval time(in seconds) for dispatcher to check whether to merge
@@ -181,9 +188,9 @@ pulsar:
port: 6650 # Port of Pulsar service.
webport: 80 # Web port of Pulsar service. If you connect directly without a proxy, you should use 8080.
# The maximum size of each message in Pulsar. Unit: Byte.
# By default, Pulsar can transmit at most 5 MB of data in a single message. When the size of inserted data is greater than this value, proxy fragments the data into multiple messages to ensure that they can be transmitted correctly.
# By default, Pulsar can transmit at most 2MB of data in a single message. When the size of inserted data is greater than this value, proxy fragments the data into multiple messages to ensure that they can be transmitted correctly.
# If the corresponding parameter in Pulsar remains unchanged, increasing this configuration will cause Milvus to fail, and reducing it produces no advantage.
maxMessageSize: 5242880
maxMessageSize: 2097152
# Pulsar can be provisioned for specific tenants with appropriate capacity allocated to the tenant.
# To share a Pulsar instance among multiple Milvus instances, you can change this to a Pulsar tenant rather than the default one for each Milvus instance before you start them. However, if you do not want Pulsar multi-tenancy, you are advised to change msgChannel.chanNamePrefix.cluster to a different value.
tenant: public
@@ -396,6 +403,7 @@ queryNode:
nprobe: 16 # nprobe to search small index, based on your accuracy requirement, must be smaller than nlist
memExpansionRate: 1.15 # extra memory needed by building interim index
buildParallelRate: 0.5 # the ratio of building interim index parallel matched with cpu num
multipleChunkedEnable: false # Enable multiple chunked search
knowhereScoreConsistency: false # Enable knowhere strong consistency score computation logic
loadMemoryUsageFactor: 1 # The multiply factor of calculating the memory usage while loading segments
enableDisk: false # enable querynode load disk index, and search on disk index
@@ -415,7 +423,11 @@ queryNode:
vectorIndex: false # Enable mmap for loading vector index
scalarField: false # Enable mmap for loading scalar data
scalarIndex: false # Enable mmap for loading scalar index
growingMmapEnabled: false # Enable mmap for using in growing raw data
chunkCache: true # Enable mmap for chunk cache (raw vector retrieving).
# Enable memory mapping (mmap) to optimize the handling of growing raw data.
# By activating this feature, the memory overhead associated with newly added or modified data will be significantly minimized.
# However, this optimization may come at the cost of a slight decrease in query latency for the affected data segments.
growingMmapEnabled: false
fixedFileSizeForMmapAlloc: 1 # tmp file size for mmap chunk manager
maxDiskUsagePercentageForMmapAlloc: 50 # disk percentage used in mmap chunk manager
lazyload:
@@ -425,6 +437,7 @@ queryNode:
requestResourceRetryInterval: 2000 # retry interval in milliseconds for waiting request resource for lazy load, 2s by default
maxRetryTimes: 1 # max retry times for lazy load, 1 by default
maxEvictPerRetry: 1 # max evict count for lazy load, 1 by default
indexOffsetCacheEnabled: false # enable index offset cache for some scalar indexes, currently only for bitmap index; enabling this param can improve performance for retrieving raw data from index
grouping:
enabled: true
maxNQ: 1000
@@ -452,12 +465,15 @@ queryNode:
taskQueueExpire: 60 # Controls how long (in seconds) a task queue is retained after it becomes empty
enableCrossUserGrouping: false # Enable cross-user grouping when using the user-task-polling policy. (Disable it if users' tasks cannot be merged with each other)
maxPendingTaskPerUser: 1024 # Max pending task per user in scheduler
levelZeroForwardPolicy: FilterByBF # delegator level zero deletion forward policy, possible option["FilterByBF", "RemoteLoad"]
streamingDeltaForwardPolicy: FilterByBF # delegator streaming deletion forward policy, possible option["FilterByBF", "Direct"]
dataSync:
flowGraph:
maxQueueLength: 16 # The maximum size of task queue cache in flow graph in query node.
maxParallelism: 1024 # Maximum number of tasks executed in parallel in the flowgraph
enableSegmentPrune: false # use partition stats to prune data in search/query on shard delegator
queryStreamBatchSize: 4194304 # return batch size of stream query
queryStreamBatchSize: 4194304 # return min batch size of stream query
queryStreamMaxBatchSize: 134217728 # return max batch size of stream query
bloomFilterApplyParallelFactor: 4 # parallel factor when applying pk to bloom filter, default to 4*CPU_CORE_NUM
workerPooling:
size: 10 # the size for worker querynode client pool
@@ -644,8 +660,9 @@ dataNode:
maxConcurrentTaskNum: 16 # The maximum number of import/pre-import tasks allowed to run concurrently on a datanode.
maxImportFileSizeInGB: 16 # The maximum file size (in GB) for an import file, where an import file refers to either a Row-Based file or a set of Column-Based files.
readBufferSizeInMB: 16 # The data block size (in MB) read from chunk manager by the datanode during import.
maxTaskSlotNum: 16 # The maximum number of slots occupied by each import/pre-import task.
compaction:
levelZeroBatchMemoryRatio: 0.05 # The minimal memory ratio of free memory for level zero compaction executing in batch mode
levelZeroBatchMemoryRatio: 0.5 # The minimal memory ratio of free memory for level zero compaction executing in batch mode
levelZeroMaxBatchSize: -1 # Max batch size refers to the max number of L1/L2 segments in a batch when executing L0 compaction. Default to -1, any value that is less than 1 means no limit. Valid range: >= 1.
gracefulStopTimeout: 1800 # seconds. force stop node without graceful stop
slot:
@@ -753,10 +770,9 @@ tls:
caPemPath: configs/cert/ca.pem

internaltls:
serverPemPath: configs/cert1/server.pem
serverKeyPath: configs/cert1/server.key
caPemPath: configs/cert1/ca.pem
internalAdd: #pod internal address
serverPemPath: #path to server.pem
serverKeyPath: #path to server.key
caPemPath: #path to ca.pem
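These paths feed the server side of internal TLS: in the Go changes below every component appends utils.EnableInternalTLS(...) to its gRPC options, though that helper itself is not shown in this diff. A minimal sketch of what building the server credential option from the two key-pair paths could look like:

package utils

import (
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
)

// serverTLSOption loads the internal key pair into a gRPC server option;
// the two paths correspond to internaltls.serverPemPath and internaltls.serverKeyPath.
func serverTLSOption(serverPemPath, serverKeyPath string) (grpc.ServerOption, error) {
	creds, err := credentials.NewServerTLSFromFile(serverPemPath, serverKeyPath)
	if err != nil {
		return nil, err
	}
	return grpc.Creds(creds), nil
}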

common:
defaultPartitionName: _default # Name of the default partition when a collection is created
@@ -778,7 +794,6 @@ common:
BeamWidthRatio: 4
gracefulTime: 5000 # milliseconds. it represents the interval (in ms) by which the request arrival time needs to be subtracted in the case of Bounded Consistency.
gracefulStopTimeout: 1800 # seconds. it will force quit the server if the graceful stop process is not completed during this time.
bitmapIndexCardinalityBound: 500
storageType: remote # adjust to local in embedded Milvus; available values are [local, remote, opendal]; the value minio is deprecated, use remote instead
# Default value: auto
# Valid values: [auto, avx512, avx2, avx, sse4_2]
@@ -842,6 +857,7 @@ quotaAndLimits:
maxCollectionNumPerDB: 65536 # Maximum number of collections per database.
maxInsertSize: -1 # maximum size of a single insert request, in bytes, -1 means no limit
maxResourceGroupNumOfQueryNode: 1024 # maximum number of resource groups of query nodes
maxGroupSize: 10 # maximum size for one single group when doing search group by
ddl:
enabled: false # Whether DDL request throttling is enabled.
# Maximum number of collection-related DDL requests per second.
@@ -993,37 +1009,20 @@ quotaAndLimits:
diskQuotaPerPartition: -1 # MB, (0, +inf), default no limit
l0SegmentsRowCountProtection:
enabled: false # switch to enable l0 segment row count quota
lowWaterLevel: 32768 # l0 segment row count quota, low water level
highWaterLevel: 65536 # l0 segment row count quota, low water level
lowWaterLevel: 30000000 # l0 segment row count quota, low water level
highWaterLevel: 50000000 # l0 segment row count quota, high water level
deleteBufferRowCountProtection:
enabled: false # switch to enable delete buffer row count quota
lowWaterLevel: 32768 # delete buffer row count quota, low water level
highWaterLevel: 65536 # delete buffer row count quota, high water level
deleteBufferSizeProtection:
enabled: false # switch to enable delete buffer size quota
lowWaterLevel: 134217728 # delete buffer size quota, low water level
highWaterLevel: 268435456 # delete buffer size quota, high water level
limitReading:
# forceDeny false means dql requests are allowed (except under some
# specific conditions, such as when the collection has been dropped); true means always reject all dql requests.
forceDeny: false
queueProtection:
enabled: false
# nqInQueueThreshold indicates that the system is under backpressure for the Search/Query path.
# If NQ in any QueryNode's queue is greater than nqInQueueThreshold, search&query rates would gradually cool off
# until the NQ in queue no longer exceeds nqInQueueThreshold. The NQ of a query request is counted as 1.
# int, default no limit
nqInQueueThreshold: -1
# queueLatencyThreshold indicates that the system is under backpressure for the Search/Query path.
# If dql latency of queuing is greater than queueLatencyThreshold, search&query rates would gradually cool off
# until the latency of queuing no longer exceeds queueLatencyThreshold.
# The latency here refers to the averaged latency over a period of time.
# milliseconds, default no limit
queueLatencyThreshold: -1
resultProtection:
enabled: false
# maxReadResultRate indicates that the system is under backpressure for the Search/Query path.
# If dql result rate is greater than maxReadResultRate, search&query rates would gradually cool off
# until the read result rate no longer exceeds maxReadResultRate.
# MB/s, default no limit
maxReadResultRate: -1
maxReadResultRatePerDB: -1
maxReadResultRatePerCollection: -1
# coolOffSpeed is the speed at which search&query rates cool off.
# (0, 1]
coolOffSpeed: 0.9

trace:
# trace exporter type, default is stdout,
4 changes: 2 additions & 2 deletions internal/distributed/datacoord/client/client.go
@@ -71,9 +71,9 @@ func NewClient(ctx context.Context) (types.DataCoordClient, error) {
client.grpcClient.SetGetAddrFunc(client.getDataCoordAddr)
client.grpcClient.SetNewGrpcClientFunc(client.newGrpcClient)
client.grpcClient.SetSession(sess)
if config.InternalTLSEnabled.GetAsBool() {
if Params.InternalTLSCfg.InternalTLSEnabled.GetAsBool() {
client.grpcClient.EnableEncryption()
cp, err := utils.CreateCertPoolforClient(Params.DataCoordGrpcClientCfg.InternalTLSCaPemPath.GetValue(), "Datacoord")
cp, err := utils.CreateCertPoolforClient(Params.InternalTLSCfg.InternalTLSCaPemPath.GetValue(), "Datacoord")
if err != nil {
log.Error("Failed to create cert pool for Datacoord client")
return nil, err
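CreateCertPoolforClient is used in every client below but is not itself part of this diff; a plausible sketch of such a helper, loading the shared CA pem into an x509.CertPool (the second argument is assumed to be used only for log context):

package utils

import (
	"crypto/x509"
	"fmt"
	"os"
)

// CreateCertPoolforClient (sketch): load the CA pem configured at
// internaltls.caPemPath into a pool used to verify internal servers.
func CreateCertPoolforClient(caPemPath, scope string) (*x509.CertPool, error) {
	pem, err := os.ReadFile(caPemPath)
	if err != nil {
		return nil, fmt.Errorf("%s: read CA pem %q: %w", scope, caPemPath, err)
	}
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(pem) {
		return nil, fmt.Errorf("%s: no valid certificates in %q", scope, caPemPath)
	}
	return pool, nil
}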
5 changes: 4 additions & 1 deletion internal/distributed/datacoord/service.go
@@ -43,6 +43,7 @@ import (
"github.com/milvus-io/milvus/internal/util/streamingutil"
streamingserviceinterceptor "github.com/milvus-io/milvus/internal/util/streamingutil/service/interceptor"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/tracer"
"github.com/milvus-io/milvus/pkg/util"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/funcutil"
@@ -199,7 +200,9 @@ func (s *Server) startGrpcLoop() {
return s.serverID.Load()
}),
streamingserviceinterceptor.NewStreamingServiceStreamServerInterceptor(),
))}
)),
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()),
}

grpcOpts = append(grpcOpts, utils.EnableInternalTLS("DataCoord"))
s.grpcServer = grpc.NewServer(grpcOpts...)
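The other change threaded through every service file here is a grpc.StatsHandler for tracing. tracer.GetDynamicOtelGrpcServerStatsHandler is a Milvus helper not shown in this diff; the underlying pattern is gRPC's stats-handler hook for OpenTelemetry, sketched below with the stock otelgrpc handler (the "dynamic" variant presumably allows reconfiguring the exporter at runtime):

package main

import (
	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
	"google.golang.org/grpc"
)

// newTracedServer attaches an OpenTelemetry stats handler so that every RPC
// served emits trace spans, alongside whatever other options are passed in.
func newTracedServer(extra ...grpc.ServerOption) *grpc.Server {
	opts := append([]grpc.ServerOption{
		grpc.StatsHandler(otelgrpc.NewServerHandler()),
	}, extra...)
	return grpc.NewServer(opts...)
}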
4 changes: 2 additions & 2 deletions internal/distributed/datanode/client/client.go
@@ -72,9 +72,9 @@ func NewClient(ctx context.Context, addr string, serverID int64) (types.DataNode
client.grpcClient.SetNewGrpcClientFunc(client.newGrpcClient)
client.grpcClient.SetNodeID(serverID)
client.grpcClient.SetSession(sess)
if config.InternalTLSEnabled.GetAsBool() {
if Params.InternalTLSCfg.InternalTLSEnabled.GetAsBool() {
client.grpcClient.EnableEncryption()
cp, err := utils.CreateCertPoolforClient(Params.DataNodeGrpcClientCfg.InternalTLSCaPemPath.GetValue(), "DataNode")
cp, err := utils.CreateCertPoolforClient(Params.InternalTLSCfg.InternalTLSCaPemPath.GetValue(), "DataNode")
if err != nil {
log.Error("Failed to create cert pool for DataNode client")
return nil, err
5 changes: 4 additions & 1 deletion internal/distributed/datanode/service.go
@@ -42,6 +42,7 @@ import (
"github.com/milvus-io/milvus/internal/util/dependency"
_ "github.com/milvus-io/milvus/internal/util/grpcclient"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/tracer"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/interceptor"
@@ -151,7 +152,9 @@ func (s *Server) startGrpcLoop() {
}
return s.serverID.Load()
}),
))}
)),
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()),
}

grpcOpts = append(grpcOpts, utils.EnableInternalTLS("DataNode"))
s.grpcServer = grpc.NewServer(grpcOpts...)
4 changes: 2 additions & 2 deletions internal/distributed/indexnode/client/client.go
@@ -73,9 +73,9 @@ func NewClient(ctx context.Context, addr string, nodeID int64, encryption bool)
if encryption {
client.grpcClient.EnableEncryption()
}
if config.InternalTLSEnabled.GetAsBool() {
if Params.InternalTLSCfg.InternalTLSEnabled.GetAsBool() {
client.grpcClient.EnableEncryption()
cp, err := utils.CreateCertPoolforClient(Params.IndexNodeGrpcClientCfg.InternalTLSCaPemPath.GetValue(), "IndexNode")
cp, err := utils.CreateCertPoolforClient(Params.InternalTLSCfg.InternalTLSCaPemPath.GetValue(), "IndexNode")
if err != nil {
log.Error("Failed to create cert pool for IndexNode client")
return nil, err
8 changes: 5 additions & 3 deletions internal/distributed/indexnode/service.go
@@ -33,13 +33,13 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/distributed/utils"
"github.com/milvus-io/milvus/internal/indexnode"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/workerpb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency"
_ "github.com/milvus-io/milvus/internal/util/grpcclient"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/tracer"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/interceptor"
@@ -138,11 +138,13 @@ func (s *Server) startGrpcLoop() {
}
return s.serverID.Load()
}),
))}
)),
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()),
}

grpcOpts = append(grpcOpts, utils.EnableInternalTLS("IndexNode"))
s.grpcServer = grpc.NewServer(grpcOpts...)
indexpb.RegisterIndexNodeServer(s.grpcServer, s)
workerpb.RegisterIndexNodeServer(s.grpcServer, s)
go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
if err := s.grpcServer.Serve(s.listener); err != nil {
s.grpcErrChan <- err
4 changes: 2 additions & 2 deletions internal/distributed/proxy/client/client.go
@@ -70,9 +70,9 @@ func NewClient(ctx context.Context, addr string, nodeID int64) (types.ProxyClien
client.grpcClient.SetNewGrpcClientFunc(client.newGrpcClient)
client.grpcClient.SetNodeID(nodeID)
client.grpcClient.SetSession(sess)
if config.InternalTLSEnabled.GetAsBool() {
if Params.InternalTLSCfg.InternalTLSEnabled.GetAsBool() {
client.grpcClient.EnableEncryption()
cp, err := utils.CreateCertPoolforClient(Params.ProxyGrpcServerCfg.InternalTLSCaPemPath.GetValue(), "Proxy")
cp, err := utils.CreateCertPoolforClient(Params.InternalTLSCfg.InternalTLSCaPemPath.GetValue(), "Proxy")
if err != nil {
log.Error("Failed to create cert pool for Proxy client")
return nil, err
4 changes: 3 additions & 1 deletion internal/distributed/proxy/service.go
@@ -366,7 +366,9 @@ func (s *Server) startInternalGrpc(errChan chan error) {
}
return s.serverID.Load()
}),
))}
)),
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()),
}

grpcOpts = append(grpcOpts, utils.EnableInternalTLS("Proxy"))
s.grpcInternalServer = grpc.NewServer(grpcOpts...)
4 changes: 2 additions & 2 deletions internal/distributed/querycoord/client/client.go
@@ -63,9 +63,9 @@ func NewClient(ctx context.Context) (types.QueryCoordClient, error) {
client.grpcClient.SetGetAddrFunc(client.getQueryCoordAddr)
client.grpcClient.SetNewGrpcClientFunc(client.newGrpcClient)
client.grpcClient.SetSession(sess)
if config.InternalTLSEnabled.GetAsBool() {
if Params.InternalTLSCfg.InternalTLSEnabled.GetAsBool() {
client.grpcClient.EnableEncryption()
cp, err := utils.CreateCertPoolforClient(Params.QueryCoordGrpcServerCfg.InternalTLSCaPemPath.GetValue(), "QueryCoord")
cp, err := utils.CreateCertPoolforClient(Params.InternalTLSCfg.InternalTLSCaPemPath.GetValue(), "QueryCoord")
if err != nil {
log.Error("Failed to create cert pool for QueryCoord client")
return nil, err
5 changes: 4 additions & 1 deletion internal/distributed/querycoord/service.go
@@ -42,6 +42,7 @@ import (
"github.com/milvus-io/milvus/internal/util/dependency"
_ "github.com/milvus-io/milvus/internal/util/grpcclient"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/tracer"
"github.com/milvus-io/milvus/pkg/util"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/funcutil"
@@ -253,7 +254,9 @@ func (s *Server) startGrpcLoop() {
}
return s.serverID.Load()
}),
))}
)),
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()),
}

grpcOpts = append(grpcOpts, utils.EnableInternalTLS("QueryCoord"))
s.grpcServer = grpc.NewServer(grpcOpts...)
4 changes: 2 additions & 2 deletions internal/distributed/querynode/client/client.go
@@ -72,9 +72,9 @@ func NewClient(ctx context.Context, addr string, nodeID int64) (types.QueryNodeC
client.grpcClient.SetNewGrpcClientFunc(client.newGrpcClient)
client.grpcClient.SetNodeID(nodeID)
client.grpcClient.SetSession(sess)
if config.InternalTLSEnabled.GetAsBool() {
if Params.InternalTLSCfg.InternalTLSEnabled.GetAsBool() {
client.grpcClient.EnableEncryption()
cp, err := utils.CreateCertPoolforClient(Params.QueryNodeGrpcClientCfg.InternalTLSCaPemPath.GetValue(), "QueryNode")
cp, err := utils.CreateCertPoolforClient(Params.InternalTLSCfg.InternalTLSCaPemPath.GetValue(), "QueryNode")
if err != nil {
log.Error("Failed to create cert pool for QueryNode client")
return nil, err
(The remaining changed files are not shown.)
