From c4742ff864db5d71321ceab130cebf022e02b7fc Mon Sep 17 00:00:00 2001 From: Nischay Date: Tue, 29 Oct 2024 02:57:45 -0700 Subject: [PATCH] separated InternalTls configs from grpc configs Signed-off-by: Nischay --- configs/milvus.yaml | 73 ++++++------ .../distributed/datacoord/client/client.go | 4 +- internal/distributed/datacoord/service.go | 5 +- .../distributed/datanode/client/client.go | 4 +- internal/distributed/datanode/service.go | 5 +- .../distributed/indexnode/client/client.go | 4 +- internal/distributed/indexnode/service.go | 8 +- internal/distributed/proxy/client/client.go | 4 +- internal/distributed/proxy/service.go | 4 +- .../distributed/querycoord/client/client.go | 4 +- internal/distributed/querycoord/service.go | 5 +- .../distributed/querynode/client/client.go | 4 +- internal/distributed/querynode/service.go | 5 +- .../distributed/rootcoord/client/client.go | 4 +- internal/distributed/rootcoord/service.go | 5 +- internal/distributed/utils/util.go | 32 +---- pkg/util/paramtable/component_param.go | 4 + pkg/util/paramtable/grpc_param.go | 109 ++++++++---------- pkg/util/paramtable/grpc_param_test.go | 17 ++- 19 files changed, 148 insertions(+), 152 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 8ffd65f482b87..b46c375bc7708 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -136,11 +136,17 @@ minio: # aliyun (ecs): https://www.alibabacloud.com/help/en/elastic-compute-service/latest/attach-an-instance-ram-role useIAM: false # Cloud Provider of S3. Supports: "aws", "gcp", "aliyun". + # Cloud Provider of Google Cloud Storage. Supports: "gcpnative". # You can use "aws" for other cloud provider supports S3 API with signature v4, e.g.: minio # You can use "gcp" for other cloud provider supports S3 API with signature v2 # You can use "aliyun" for other cloud provider uses virtual host style bucket + # You can use "gcpnative" for the Google Cloud Platform provider. Uses service account credentials + # for authentication. 
# When useIAM enabled, only "aws", "gcp", "aliyun" is supported for now cloudProvider: aws + # The JSON content contains the gcs service account credentials. + # Used only for the "gcpnative" cloud provider. + gcpCredentialJSON: # Custom endpoint for fetch IAM role credentials. when useIAM is true & cloudProvider is "aws". # Leave it empty if you want to use AWS default endpoint iamEndpoint: @@ -164,6 +170,7 @@ mq: enablePursuitMode: true # Default value: "true" pursuitLag: 10 # time tick lag threshold to enter pursuit mode, in seconds pursuitBufferSize: 8388608 # pursuit mode buffer size in bytes + pursuitBufferTime: 60 # pursuit mode buffer time in seconds mqBufSize: 16 # MQ client consumer buffer length dispatcher: mergeCheckInterval: 1 # the interval time(in seconds) for dispatcher to check whether to merge @@ -181,9 +188,9 @@ pulsar: port: 6650 # Port of Pulsar service. webport: 80 # Web port of of Pulsar service. If you connect direcly without proxy, should use 8080. # The maximum size of each message in Pulsar. Unit: Byte. - # By default, Pulsar can transmit at most 5 MB of data in a single message. When the size of inserted data is greater than this value, proxy fragments the data into multiple messages to ensure that they can be transmitted correctly. + # By default, Pulsar can transmit at most 2MB of data in a single message. When the size of inserted data is greater than this value, proxy fragments the data into multiple messages to ensure that they can be transmitted correctly. # If the corresponding parameter in Pulsar remains unchanged, increasing this configuration will cause Milvus to fail, and reducing it produces no advantage. - maxMessageSize: 5242880 + maxMessageSize: 2097152 # Pulsar can be provisioned for specific tenants with appropriate capacity allocated to the tenant. 
# To share a Pulsar instance among multiple Milvus instances, you can change this to an Pulsar tenant rather than the default one for each Milvus instance before you start them. However, if you do not want Pulsar multi-tenancy, you are advised to change msgChannel.chanNamePrefix.cluster to the different value. tenant: public @@ -396,6 +403,7 @@ queryNode: nprobe: 16 # nprobe to search small index, based on your accuracy requirement, must smaller than nlist memExpansionRate: 1.15 # extra memory needed by building interim index buildParallelRate: 0.5 # the ratio of building interim index parallel matched with cpu num + multipleChunkedEnable: false # Enable multiple chunked search knowhereScoreConsistency: false # Enable knowhere strong consistency score computation logic loadMemoryUsageFactor: 1 # The multiply factor of calculating the memory usage while loading segments enableDisk: false # enable querynode load disk index, and search on disk index @@ -415,7 +423,11 @@ queryNode: vectorIndex: false # Enable mmap for loading vector index scalarField: false # Enable mmap for loading scalar data scalarIndex: false # Enable mmap for loading scalar index - growingMmapEnabled: false # Enable mmap for using in growing raw data + chunkCache: true # Enable mmap for chunk cache (raw vector retrieving). + # Enable memory mapping (mmap) to optimize the handling of growing raw data. + # By activating this feature, the memory overhead associated with newly added or modified data will be significantly minimized. + # However, this optimization may come at the cost of a slight decrease in query latency for the affected data segments. 
+ growingMmapEnabled: false fixedFileSizeForMmapAlloc: 1 # tmp file size for mmap chunk manager maxDiskUsagePercentageForMmapAlloc: 50 # disk percentage used in mmap chunk manager lazyload: @@ -425,6 +437,7 @@ queryNode: requestResourceRetryInterval: 2000 # retry interval in milliseconds for waiting request resource for lazy load, 2s by default maxRetryTimes: 1 # max retry times for lazy load, 1 by default maxEvictPerRetry: 1 # max evict count for lazy load, 1 by default + indexOffsetCacheEnabled: false # enable index offset cache for some scalar indexes, now is just for bitmap index, enable this param can improve performance for retrieving raw data from index grouping: enabled: true maxNQ: 1000 @@ -452,12 +465,15 @@ queryNode: taskQueueExpire: 60 # Control how long (many seconds) that queue retains since queue is empty enableCrossUserGrouping: false # Enable Cross user grouping when using user-task-polling policy. (Disable it if user's task can not merge each other) maxPendingTaskPerUser: 1024 # Max pending task per user in scheduler + levelZeroForwardPolicy: FilterByBF # delegator level zero deletion forward policy, possible option["FilterByBF", "RemoteLoad"] + streamingDeltaForwardPolicy: FilterByBF # delegator streaming deletion forward policy, possible option["FilterByBF", "Direct"] dataSync: flowGraph: maxQueueLength: 16 # The maximum size of task queue cache in flow graph in query node. 
maxParallelism: 1024 # Maximum number of tasks executed in parallel in the flowgraph enableSegmentPrune: false # use partition stats to prune data in search/query on shard delegator - queryStreamBatchSize: 4194304 # return batch size of stream query + queryStreamBatchSize: 4194304 # return min batch size of stream query + queryStreamMaxBatchSize: 134217728 # return max batch size of stream query bloomFilterApplyParallelFactor: 4 # parallel factor when to apply pk to bloom filter, default to 4*CPU_CORE_NUM workerPooling: size: 10 # the size for worker querynode client pool @@ -644,8 +660,9 @@ dataNode: maxConcurrentTaskNum: 16 # The maximum number of import/pre-import tasks allowed to run concurrently on a datanode. maxImportFileSizeInGB: 16 # The maximum file size (in GB) for an import file, where an import file refers to either a Row-Based file or a set of Column-Based files. readBufferSizeInMB: 16 # The data block size (in MB) read from chunk manager by the datanode during import. + maxTaskSlotNum: 16 # The maximum number of slots occupied by each import/pre-import task. compaction: - levelZeroBatchMemoryRatio: 0.05 # The minimal memory ratio of free memory for level zero compaction executing in batch mode + levelZeroBatchMemoryRatio: 0.5 # The minimal memory ratio of free memory for level zero compaction executing in batch mode levelZeroMaxBatchSize: -1 # Max batch size refers to the max number of L1/L2 segments in a batch when executing L0 compaction. Default to -1, any value that is less than 1 means no limit. Valid range: >= 1. gracefulStopTimeout: 1800 # seconds. 
force stop node without graceful stop slot: @@ -753,10 +770,9 @@ tls: caPemPath: configs/cert/ca.pem internaltls: - serverPemPath: configs/cert1/server.pem - serverKeyPath: configs/cert1/server.key - caPemPath: configs/cert1/ca.pem - internalAdd: #pod internal address + serverPemPath: #path to server.pem + serverKeyPath: #path to server.key + caPemPath: #path to ca.pem common: defaultPartitionName: _default # Name of the default partition when a collection is created @@ -778,7 +794,6 @@ common: BeamWidthRatio: 4 gracefulTime: 5000 # milliseconds. it represents the interval (in ms) by which the request arrival time needs to be subtracted in the case of Bounded Consistency. gracefulStopTimeout: 1800 # seconds. it will force quit the server if the graceful stop process is not completed during this time. - bitmapIndexCardinalityBound: 500 storageType: remote # please adjust in embedded Milvus: local, available values are [local, remote, opendal], value minio is deprecated, use remote instead # Default value: auto # Valid values: [auto, avx512, avx2, avx, sse4_2] @@ -842,6 +857,7 @@ quotaAndLimits: maxCollectionNumPerDB: 65536 # Maximum number of collections per database. maxInsertSize: -1 # maximum size of a single insert request, in bytes, -1 means no limit maxResourceGroupNumOfQueryNode: 1024 # maximum number of resource groups of query nodes + maxGroupSize: 10 # maximum size for one single group when doing search group by ddl: enabled: false # Whether DDL request throttling is enabled. # Maximum number of collection-related DDL requests per second.
@@ -993,37 +1009,20 @@ quotaAndLimits: diskQuotaPerPartition: -1 # MB, (0, +inf), default no limit l0SegmentsRowCountProtection: enabled: false # switch to enable l0 segment row count quota - lowWaterLevel: 32768 # l0 segment row count quota, low water level - highWaterLevel: 65536 # l0 segment row count quota, low water level + lowWaterLevel: 30000000 # l0 segment row count quota, low water level + highWaterLevel: 50000000 # l0 segment row count quota, high water level + deleteBufferRowCountProtection: + enabled: false # switch to enable delete buffer row count quota + lowWaterLevel: 32768 # delete buffer row count quota, low water level + highWaterLevel: 65536 # delete buffer row count quota, high water level + deleteBufferSizeProtection: + enabled: false # switch to enable delete buffer size quota + lowWaterLevel: 134217728 # delete buffer size quota, low water level + highWaterLevel: 268435456 # delete buffer size quota, high water level limitReading: # forceDeny false means dql requests are allowed (except for some # specific conditions, such as collection has been dropped), true means always reject all dql requests. forceDeny: false - queueProtection: - enabled: false - # nqInQueueThreshold indicated that the system was under backpressure for Search/Query path. - # If NQ in any QueryNode's queue is greater than nqInQueueThreshold, search&query rates would gradually cool off - # until the NQ in queue no longer exceeds nqInQueueThreshold. We think of the NQ of query request as 1. - # int, default no limit - nqInQueueThreshold: -1 - # queueLatencyThreshold indicated that the system was under backpressure for Search/Query path. - # If dql latency of queuing is greater than queueLatencyThreshold, search&query rates would gradually cool off - # until the latency of queuing no longer exceeds queueLatencyThreshold. - # The latency here refers to the averaged latency over a period of time. 
- # milliseconds, default no limit - queueLatencyThreshold: -1 - resultProtection: - enabled: false - # maxReadResultRate indicated that the system was under backpressure for Search/Query path. - # If dql result rate is greater than maxReadResultRate, search&query rates would gradually cool off - # until the read result rate no longer exceeds maxReadResultRate. - # MB/s, default no limit - maxReadResultRate: -1 - maxReadResultRatePerDB: -1 - maxReadResultRatePerCollection: -1 - # colOffSpeed is the speed of search&query rates cool off. - # (0, 1] - coolOffSpeed: 0.9 trace: # trace exporter type, default is stdout, diff --git a/internal/distributed/datacoord/client/client.go b/internal/distributed/datacoord/client/client.go index 92d0e91193268..732aa7a541d22 100644 --- a/internal/distributed/datacoord/client/client.go +++ b/internal/distributed/datacoord/client/client.go @@ -71,9 +71,9 @@ func NewClient(ctx context.Context) (types.DataCoordClient, error) { client.grpcClient.SetGetAddrFunc(client.getDataCoordAddr) client.grpcClient.SetNewGrpcClientFunc(client.newGrpcClient) client.grpcClient.SetSession(sess) - if config.InternalTLSEnabled.GetAsBool() { + if Params.InternalTLSCfg.InternalTLSEnabled.GetAsBool() { client.grpcClient.EnableEncryption() - cp, err := utils.CreateCertPoolforClient(Params.DataCoordGrpcClientCfg.InternalTLSCaPemPath.GetValue(), "Datacoord") + cp, err := utils.CreateCertPoolforClient(Params.InternalTLSCfg.InternalTLSCaPemPath.GetValue(), "Datacoord") if err != nil { log.Error("Failed to create cert pool for Datacoord client") return nil, err diff --git a/internal/distributed/datacoord/service.go b/internal/distributed/datacoord/service.go index 7748de14c1369..ee17f8c0d3a03 100644 --- a/internal/distributed/datacoord/service.go +++ b/internal/distributed/datacoord/service.go @@ -43,6 +43,7 @@ import ( "github.com/milvus-io/milvus/internal/util/streamingutil" streamingserviceinterceptor 
"github.com/milvus-io/milvus/internal/util/streamingutil/service/interceptor" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/tracer" "github.com/milvus-io/milvus/pkg/util" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/funcutil" @@ -199,7 +200,9 @@ func (s *Server) startGrpcLoop() { return s.serverID.Load() }), streamingserviceinterceptor.NewStreamingServiceStreamServerInterceptor(), - ))} + )), + grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()), + } grpcOpts = append(grpcOpts, utils.EnableInternalTLS("DataCoord")) s.grpcServer = grpc.NewServer(grpcOpts...) diff --git a/internal/distributed/datanode/client/client.go b/internal/distributed/datanode/client/client.go index 98b1ad7a0e6ea..8606f0097fb8c 100644 --- a/internal/distributed/datanode/client/client.go +++ b/internal/distributed/datanode/client/client.go @@ -72,9 +72,9 @@ func NewClient(ctx context.Context, addr string, serverID int64) (types.DataNode client.grpcClient.SetNewGrpcClientFunc(client.newGrpcClient) client.grpcClient.SetNodeID(serverID) client.grpcClient.SetSession(sess) - if config.InternalTLSEnabled.GetAsBool() { + if Params.InternalTLSCfg.InternalTLSEnabled.GetAsBool() { client.grpcClient.EnableEncryption() - cp, err := utils.CreateCertPoolforClient(Params.DataNodeGrpcClientCfg.InternalTLSCaPemPath.GetValue(), "DataNode") + cp, err := utils.CreateCertPoolforClient(Params.InternalTLSCfg.InternalTLSCaPemPath.GetValue(), "DataNode") if err != nil { log.Error("Failed to create cert pool for DataNode client") return nil, err diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go index 1b73f8784db9f..df4dcb0eaf358 100644 --- a/internal/distributed/datanode/service.go +++ b/internal/distributed/datanode/service.go @@ -42,6 +42,7 @@ import ( "github.com/milvus-io/milvus/internal/util/dependency" _ "github.com/milvus-io/milvus/internal/util/grpcclient" 
"github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/tracer" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/interceptor" @@ -151,7 +152,9 @@ func (s *Server) startGrpcLoop() { } return s.serverID.Load() }), - ))} + )), + grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()), + } grpcOpts = append(grpcOpts, utils.EnableInternalTLS("DataNode")) s.grpcServer = grpc.NewServer(grpcOpts...) diff --git a/internal/distributed/indexnode/client/client.go b/internal/distributed/indexnode/client/client.go index 35e53b8e2cf06..7387bdb1385e3 100644 --- a/internal/distributed/indexnode/client/client.go +++ b/internal/distributed/indexnode/client/client.go @@ -73,9 +73,9 @@ func NewClient(ctx context.Context, addr string, nodeID int64, encryption bool) if encryption { client.grpcClient.EnableEncryption() } - if config.InternalTLSEnabled.GetAsBool() { + if Params.InternalTLSCfg.InternalTLSEnabled.GetAsBool() { client.grpcClient.EnableEncryption() - cp, err := utils.CreateCertPoolforClient(Params.IndexNodeGrpcClientCfg.InternalTLSCaPemPath.GetValue(), "IndexNode") + cp, err := utils.CreateCertPoolforClient(Params.InternalTLSCfg.InternalTLSCaPemPath.GetValue(), "IndexNode") if err != nil { log.Error("Failed to create cert pool for IndexNode client") return nil, err diff --git a/internal/distributed/indexnode/service.go b/internal/distributed/indexnode/service.go index dfd25e83e769d..8108615450441 100644 --- a/internal/distributed/indexnode/service.go +++ b/internal/distributed/indexnode/service.go @@ -33,13 +33,13 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus/internal/distributed/utils" "github.com/milvus-io/milvus/internal/indexnode" - "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/workerpb" 
"github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/dependency" _ "github.com/milvus-io/milvus/internal/util/grpcclient" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/tracer" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/interceptor" @@ -138,11 +138,13 @@ func (s *Server) startGrpcLoop() { } return s.serverID.Load() }), - ))} + )), + grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()), + } grpcOpts = append(grpcOpts, utils.EnableInternalTLS("IndexNode")) s.grpcServer = grpc.NewServer(grpcOpts...) - indexpb.RegisterIndexNodeServer(s.grpcServer, s) + workerpb.RegisterIndexNodeServer(s.grpcServer, s) go funcutil.CheckGrpcReady(ctx, s.grpcErrChan) if err := s.grpcServer.Serve(s.listener); err != nil { s.grpcErrChan <- err diff --git a/internal/distributed/proxy/client/client.go b/internal/distributed/proxy/client/client.go index 2b8844abd9abf..ffbc91ab20ca3 100644 --- a/internal/distributed/proxy/client/client.go +++ b/internal/distributed/proxy/client/client.go @@ -70,9 +70,9 @@ func NewClient(ctx context.Context, addr string, nodeID int64) (types.ProxyClien client.grpcClient.SetNewGrpcClientFunc(client.newGrpcClient) client.grpcClient.SetNodeID(nodeID) client.grpcClient.SetSession(sess) - if config.InternalTLSEnabled.GetAsBool() { + if Params.InternalTLSCfg.InternalTLSEnabled.GetAsBool() { client.grpcClient.EnableEncryption() - cp, err := utils.CreateCertPoolforClient(Params.ProxyGrpcServerCfg.InternalTLSCaPemPath.GetValue(), "Proxy") + cp, err := utils.CreateCertPoolforClient(Params.InternalTLSCfg.InternalTLSCaPemPath.GetValue(), "Proxy") if err != nil { log.Error("Failed to create cert pool for Proxy client") return nil, err diff --git a/internal/distributed/proxy/service.go b/internal/distributed/proxy/service.go index f3c20eea2b50a..32f5748a5796d 100644 --- a/internal/distributed/proxy/service.go +++ 
b/internal/distributed/proxy/service.go @@ -366,7 +366,9 @@ func (s *Server) startInternalGrpc(errChan chan error) { } return s.serverID.Load() }), - ))} + )), + grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()), + } grpcOpts = append(grpcOpts, utils.EnableInternalTLS("Proxy")) s.grpcInternalServer = grpc.NewServer(grpcOpts...) diff --git a/internal/distributed/querycoord/client/client.go b/internal/distributed/querycoord/client/client.go index 1eb62fe13621b..1e7a4e390b73c 100644 --- a/internal/distributed/querycoord/client/client.go +++ b/internal/distributed/querycoord/client/client.go @@ -63,9 +63,9 @@ func NewClient(ctx context.Context) (types.QueryCoordClient, error) { client.grpcClient.SetGetAddrFunc(client.getQueryCoordAddr) client.grpcClient.SetNewGrpcClientFunc(client.newGrpcClient) client.grpcClient.SetSession(sess) - if config.InternalTLSEnabled.GetAsBool() { + if Params.InternalTLSCfg.InternalTLSEnabled.GetAsBool() { client.grpcClient.EnableEncryption() - cp, err := utils.CreateCertPoolforClient(Params.QueryCoordGrpcServerCfg.InternalTLSCaPemPath.GetValue(), "QueryCoord") + cp, err := utils.CreateCertPoolforClient(Params.InternalTLSCfg.InternalTLSCaPemPath.GetValue(), "QueryCoord") if err != nil { log.Error("Failed to create cert pool for QueryCoord client") return nil, err diff --git a/internal/distributed/querycoord/service.go b/internal/distributed/querycoord/service.go index 333ed591e9694..3dbca5bc69363 100644 --- a/internal/distributed/querycoord/service.go +++ b/internal/distributed/querycoord/service.go @@ -42,6 +42,7 @@ import ( "github.com/milvus-io/milvus/internal/util/dependency" _ "github.com/milvus-io/milvus/internal/util/grpcclient" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/tracer" "github.com/milvus-io/milvus/pkg/util" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/funcutil" @@ -253,7 +254,9 @@ func (s *Server) startGrpcLoop() { } return s.serverID.Load() }), 
- ))} + )), + grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()), + } grpcOpts = append(grpcOpts, utils.EnableInternalTLS("QueryCoord")) s.grpcServer = grpc.NewServer(grpcOpts...) diff --git a/internal/distributed/querynode/client/client.go b/internal/distributed/querynode/client/client.go index 709fb6aed5029..ad05792f6c8f1 100644 --- a/internal/distributed/querynode/client/client.go +++ b/internal/distributed/querynode/client/client.go @@ -72,9 +72,9 @@ func NewClient(ctx context.Context, addr string, nodeID int64) (types.QueryNodeC client.grpcClient.SetNewGrpcClientFunc(client.newGrpcClient) client.grpcClient.SetNodeID(nodeID) client.grpcClient.SetSession(sess) - if config.InternalTLSEnabled.GetAsBool() { + if Params.InternalTLSCfg.InternalTLSEnabled.GetAsBool() { client.grpcClient.EnableEncryption() - cp, err := utils.CreateCertPoolforClient(Params.QueryNodeGrpcClientCfg.InternalTLSCaPemPath.GetValue(), "QueryNode") + cp, err := utils.CreateCertPoolforClient(Params.InternalTLSCfg.InternalTLSCaPemPath.GetValue(), "QueryNode") if err != nil { log.Error("Failed to create cert pool for QueryNode client") return nil, err diff --git a/internal/distributed/querynode/service.go b/internal/distributed/querynode/service.go index dc84de1830232..1f0a502ac6aef 100644 --- a/internal/distributed/querynode/service.go +++ b/internal/distributed/querynode/service.go @@ -39,6 +39,7 @@ import ( "github.com/milvus-io/milvus/internal/util/dependency" _ "github.com/milvus-io/milvus/internal/util/grpcclient" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/tracer" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/interceptor" @@ -200,7 +201,9 @@ func (s *Server) startGrpcLoop() { } return s.serverID.Load() }), - ))} + )), + grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()), + } grpcOpts = append(grpcOpts, utils.EnableInternalTLS("QueryNode")) s.grpcServer 
= grpc.NewServer(grpcOpts...) diff --git a/internal/distributed/rootcoord/client/client.go b/internal/distributed/rootcoord/client/client.go index 7beb10fa40e5c..ee6a7e6ae6194 100644 --- a/internal/distributed/rootcoord/client/client.go +++ b/internal/distributed/rootcoord/client/client.go @@ -70,9 +70,9 @@ func NewClient(ctx context.Context) (types.RootCoordClient, error) { client.grpcClient.SetGetAddrFunc(client.getRootCoordAddr) client.grpcClient.SetNewGrpcClientFunc(client.newGrpcClient) client.grpcClient.SetSession(sess) - if config.InternalTLSEnabled.GetAsBool() { + if Params.InternalTLSCfg.InternalTLSEnabled.GetAsBool() { client.grpcClient.EnableEncryption() - cp, err := utils.CreateCertPoolforClient(Params.RootCoordGrpcClientCfg.InternalTLSCaPemPath.GetValue(), "RootCoord") + cp, err := utils.CreateCertPoolforClient(Params.InternalTLSCfg.InternalTLSCaPemPath.GetValue(), "RootCoord") if err != nil { log.Error("Failed to create cert pool for RootCoord client") return nil, err diff --git a/internal/distributed/rootcoord/service.go b/internal/distributed/rootcoord/service.go index efc0944946133..7ee1279722a81 100644 --- a/internal/distributed/rootcoord/service.go +++ b/internal/distributed/rootcoord/service.go @@ -42,6 +42,7 @@ import ( "github.com/milvus-io/milvus/internal/util/dependency" _ "github.com/milvus-io/milvus/internal/util/grpcclient" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/tracer" "github.com/milvus-io/milvus/pkg/util" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/funcutil" @@ -301,7 +302,9 @@ func (s *Server) startGrpcLoop() { } return s.serverID.Load() }), - ))} + )), + grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()), + } grpcOpts = append(grpcOpts, utils.EnableInternalTLS("RootCoord")) s.grpcServer = grpc.NewServer(grpcOpts...) 
diff --git a/internal/distributed/utils/util.go b/internal/distributed/utils/util.go index 14435ca2f5972..d95f34ae9e00e 100644 --- a/internal/distributed/utils/util.go +++ b/internal/distributed/utils/util.go @@ -2,8 +2,8 @@ package utils import ( "crypto/x509" + "errors" "os" - "strings" "time" "go.uber.org/zap" @@ -49,31 +49,11 @@ func getTLSCreds(certFile string, keyFile string, nodeType string) credentials.T func EnableInternalTLS(NodeType string) grpc.ServerOption { var Params *paramtable.ComponentParam = paramtable.Get() - var serverCfg *paramtable.GrpcServerConfig - switch strings.ToLower(NodeType) { - case "datacoord": - serverCfg = &Params.DataCoordGrpcServerCfg - case "datanode": - serverCfg = &Params.DataNodeGrpcServerCfg - case "indexnode": - serverCfg = &Params.IndexNodeGrpcServerCfg - case "proxy": - serverCfg = &Params.ProxyGrpcServerCfg - case "querycoord": - serverCfg = &Params.QueryCoordGrpcServerCfg - case "querynode": - serverCfg = &Params.QueryNodeGrpcServerCfg - case "rootcoord": - serverCfg = &Params.RootCoordGrpcServerCfg - default: - log.Error("Unknown NodeType") - return grpc.Creds(nil) - } - certFile := serverCfg.InternalTLSServerPemPath.GetValue() - keyFile := serverCfg.InternalTLSServerKeyPath.GetValue() - internaltlsEnabled := serverCfg.InternalTLSEnabled.GetAsBool() + certFile := Params.InternalTLSCfg.InternalTLSServerPemPath.GetValue() + keyFile := Params.InternalTLSCfg.InternalTLSServerKeyPath.GetValue() + internaltlsEnabled := Params.InternalTLSCfg.InternalTLSEnabled.GetAsBool() - log.Info("internal TLS Enabled", zap.Bool("value", internaltlsEnabled)) + log.Info("Internal TLS Enabled", zap.Bool("value", internaltlsEnabled)) if internaltlsEnabled { creds := getTLSCreds(certFile, keyFile, NodeType) @@ -95,7 +75,7 @@ func CreateCertPoolforClient(caFile string, nodeType string) (*x509.CertPool, er if !certPool.AppendCertsFromPEM(b) { log.Error("credentials: failed to append certificates") - return nil, err // Cert pool is invalid, 
return nil and the error + return nil, errors.New("failed to append certificates") // Cert pool is invalid, return nil and the error } return certPool, err } diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 9e0ff8bdc30a4..82afeb4359389 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -82,6 +82,8 @@ type ComponentParam struct { RoleCfg roleConfig StreamingCfg streamingConfig + InternalTLSCfg InternalTLSConfig + RootCoordGrpcServerCfg GrpcServerConfig ProxyGrpcServerCfg GrpcServerConfig QueryCoordGrpcServerCfg GrpcServerConfig @@ -137,6 +139,8 @@ func (p *ComponentParam) init(bt *BaseTable) { p.GpuConfig.init(bt) p.KnowhereConfig.init(bt) + p.InternalTLSCfg.Init(bt) + p.RootCoordGrpcServerCfg.Init("rootCoord", bt) p.ProxyGrpcServerCfg.Init("proxy", bt) p.ProxyGrpcServerCfg.InternalPort.Export = true diff --git a/pkg/util/paramtable/grpc_param.go b/pkg/util/paramtable/grpc_param.go index c1e41874398b7..eb53070d19ea2 100644 --- a/pkg/util/paramtable/grpc_param.go +++ b/pkg/util/paramtable/grpc_param.go @@ -64,20 +64,15 @@ const ( // ///////////////////////////////////////////////////////////////////////////// // --- grpc --- type grpcConfig struct { - Domain string `refreshable:"false"` - IP string `refreshable:"false"` - InternalTLSEnabled ParamItem `refreshable:"false"` - Address ParamItem `refreshable:"false"` - TLSMode ParamItem `refreshable:"false"` - IPItem ParamItem `refreshable:"false"` - Port ParamItem `refreshable:"false"` - InternalPort ParamItem `refreshable:"false"` - ServerPemPath ParamItem `refreshable:"false"` - ServerKeyPath ParamItem `refreshable:"false"` - CaPemPath ParamItem `refreshable:"false"` - InternalTLSServerPemPath ParamItem `refreshable:"false"` - InternalTLSServerKeyPath ParamItem `refreshable:"false"` - InternalTLSCaPemPath ParamItem `refreshable:"false"` + Domain string `refreshable:"false"` + IP string `refreshable:"false"` + TLSMode 
ParamItem `refreshable:"false"` + IPItem ParamItem `refreshable:"false"` + Port ParamItem `refreshable:"false"` + InternalPort ParamItem `refreshable:"false"` + ServerPemPath ParamItem `refreshable:"false"` + ServerKeyPath ParamItem `refreshable:"false"` + CaPemPath ParamItem `refreshable:"false"` } func (p *grpcConfig) init(domain string, base *BaseTable) { @@ -91,13 +86,6 @@ func (p *grpcConfig) init(domain string, base *BaseTable) { p.IPItem.Init(base.mgr) p.IP = funcutil.GetIP(p.IPItem.GetValue()) - p.Address = ParamItem{ - Key: p.Domain + ".internalAdd", - Version: "2.0.0", - Export: true, - } - p.Address.Init(base.mgr) - p.Port = ParamItem{ Key: p.Domain + ".port", Version: "2.0.0", @@ -142,52 +130,15 @@ func (p *grpcConfig) init(domain string, base *BaseTable) { Export: true, } p.CaPemPath.Init(base.mgr) - - p.InternalTLSEnabled = ParamItem{ - Key: "common.security.internaltlsEnabled", - Version: "2.0.0", - DefaultValue: "0", - Export: true, - } - p.InternalTLSEnabled.Init(base.mgr) - - p.InternalTLSServerPemPath = ParamItem{ - Key: "internaltls.serverPemPath", - Version: "2.0.0", - Export: true, - } - p.InternalTLSServerPemPath.Init(base.mgr) - - p.InternalTLSServerKeyPath = ParamItem{ - Key: "internaltls.serverKeyPath", - Version: "2.0.0", - Export: true, - } - p.InternalTLSServerKeyPath.Init(base.mgr) - - p.InternalTLSCaPemPath = ParamItem{ - Key: "internaltls.caPemPath", - Version: "2.0.0", - Export: true, - } - p.InternalTLSCaPemPath.Init(base.mgr) } // GetAddress return grpc address func (p *grpcConfig) GetAddress() string { - if !p.InternalTLSEnabled.GetAsBool() { - return p.IP + ":" + p.Port.GetValue() - } - fmt.Println("address: ", p.Address.GetValue()) - return p.Address.GetValue() + ":" + p.Port.GetValue() + return p.IP + ":" + p.Port.GetValue() } func (p *grpcConfig) GetInternalAddress() string { - if !p.InternalTLSEnabled.GetAsBool() { - return p.IP + ":" + p.InternalPort.GetValue() - } - fmt.Println("address: ", p.Address.GetValue()) - return 
p.Address.GetValue() + ":" + p.InternalPort.GetValue() + return p.IP + ":" + p.InternalPort.GetValue() } // GrpcServerConfig is configuration for grpc server. @@ -584,3 +535,41 @@ func (p *GrpcClientConfig) GetDefaultRetryPolicy() map[string]interface{} { "backoffMultiplier": p.BackoffMultiplier.GetAsFloat(), } } + +type InternalTLSConfig struct { + InternalTLSEnabled ParamItem `refreshable:"false"` + InternalTLSServerPemPath ParamItem `refreshable:"false"` + InternalTLSServerKeyPath ParamItem `refreshable:"false"` + InternalTLSCaPemPath ParamItem `refreshable:"false"` +} + +func (p *InternalTLSConfig) Init(base *BaseTable) { + p.InternalTLSEnabled = ParamItem{ + Key: "common.security.internaltlsEnabled", + Version: "2.0.0", + DefaultValue: "0", + Export: true, + } + p.InternalTLSEnabled.Init(base.mgr) + + p.InternalTLSServerPemPath = ParamItem{ + Key: "internaltls.serverPemPath", + Version: "2.0.0", + Export: true, + } + p.InternalTLSServerPemPath.Init(base.mgr) + + p.InternalTLSServerKeyPath = ParamItem{ + Key: "internaltls.serverKeyPath", + Version: "2.0.0", + Export: true, + } + p.InternalTLSServerKeyPath.Init(base.mgr) + + p.InternalTLSCaPemPath = ParamItem{ + Key: "internaltls.caPemPath", + Version: "2.0.0", + Export: true, + } + p.InternalTLSCaPemPath.Init(base.mgr) +} diff --git a/pkg/util/paramtable/grpc_param_test.go b/pkg/util/paramtable/grpc_param_test.go index d8d12a521bf1a..5e598d5fd16c6 100644 --- a/pkg/util/paramtable/grpc_param_test.go +++ b/pkg/util/paramtable/grpc_param_test.go @@ -177,15 +177,20 @@ func TestGrpcClientParams(t *testing.T) { assert.Equal(t, clientConfig.ServerPemPath.GetValue(), "/pem") assert.Equal(t, clientConfig.ServerKeyPath.GetValue(), "/key") assert.Equal(t, clientConfig.CaPemPath.GetValue(), "/ca") +} + +func TestInternalTLSParams(t *testing.T) { + base := ComponentParam{} + base.Init(NewBaseTable(SkipRemote(true))) + var internalTlsCfg InternalTLSConfig + internalTlsCfg.Init(base.baseTable) 
base.Save("common.security.internalTlsEnabled", "True") base.Save("internaltls.serverPemPath", "/pem") base.Save("internaltls.serverKeyPath", "/key") base.Save("internaltls.caPemPath", "/ca") - base.Save("internaltls.Address","/datanode") - assert.Equal(t, clientConfig.TLSMode.GetAsBool(), "True") - assert.Equal(t, clientConfig.ServerPemPath.GetValue(), "/pem") - assert.Equal(t, clientConfig.ServerKeyPath.GetValue(), "/key") - assert.Equal(t, clientConfig.CaPemPath.GetValue(), "/ca") - assert.Equal(t, clientConfig.Address.GetValue(), "/datanode") + assert.Equal(t, internalTlsCfg.InternalTLSEnabled.GetAsBool(), true) + assert.Equal(t, internalTlsCfg.InternalTLSServerPemPath.GetValue(), "/pem") + assert.Equal(t, internalTlsCfg.InternalTLSServerKeyPath.GetValue(), "/key") + assert.Equal(t, internalTlsCfg.InternalTLSCaPemPath.GetValue(), "/ca") }