diff --git a/clients/config_client/config_client_test.go b/clients/config_client/config_client_test.go index c73c983d..c7cd37a4 100644 --- a/clients/config_client/config_client_test.go +++ b/clients/config_client/config_client_test.go @@ -19,6 +19,7 @@ package config_client import ( "context" "errors" + "fmt" "github.com/nacos-group/nacos-sdk-go/v2/util" "testing" @@ -34,7 +35,7 @@ import ( "github.com/stretchr/testify/assert" ) -var serverConfigWithOptions = constant.NewServerConfig("mse-xxx-p.nacos-ans.mse.aliyuncs.com", 8848) +var serverConfigWithOptions = constant.NewServerConfig("127.0.0.1", 80) var clientConfigWithOptions = constant.NewClientConfig( constant.WithTimeoutMs(10*1000), @@ -45,6 +46,7 @@ var clientConfigWithOptions = constant.NewClientConfig( constant.WithOpenKMS(true), constant.WithKMSVersion(constant.KMSv1), constant.WithRegionId("cn-hangzhou"), + constant.WithAppConnLabels(map[string]string{"key1": "value1", "key2": "value2", "key3": "value3"}), ) var clientTLsConfigWithOptions = constant.NewClientConfig( @@ -310,6 +312,8 @@ func TestListen(t *testing.T) { DataId: localConfigTest.DataId, Group: localConfigTest.Group, OnChange: func(namespace, group, dataId, data string) { + fmt.Printf("receive content : %s\n", data) + }, }) assert.Nil(t, err) diff --git a/clients/config_client/config_proxy.go b/clients/config_client/config_proxy.go index b3038840..f21a1394 100644 --- a/clients/config_client/config_proxy.go +++ b/clients/config_client/config_proxy.go @@ -166,7 +166,7 @@ func (cp *ConfigProxy) createRpcClient(ctx context.Context, taskId string, clien "taskId": taskId, } - iRpcClient, _ := rpc.CreateClient(ctx, "config-"+taskId+"-"+client.uid, rpc.GRPC, labels, cp.nacosServer, &cp.clientConfig.TLSCfg) + iRpcClient, _ := rpc.CreateClient(ctx, "config-"+taskId+"-"+client.uid, rpc.GRPC, labels, cp.nacosServer, &cp.clientConfig.TLSCfg, cp.clientConfig.AppConnLabels) rpcClient := iRpcClient.GetRpcClient() if rpcClient.IsInitialized() { rpcClient.RegisterServerRequestHandler(func() rpc_request.IRequest { diff --git a/clients/naming_client/naming_client_test.go b/clients/naming_client/naming_client_test.go index 58ac40e6..35656dd7 100644 --- a/clients/naming_client/naming_client_test.go +++ b/clients/naming_client/naming_client_test.go @@ -34,7 +34,7 @@ var clientConfigTest = *constant.NewClientConfig( constant.WithNotLoadCacheAtStart(true), ) -var serverConfigTest = *constant.NewServerConfig("127.0.0.1", 80, constant.WithContextPath("/nacos")) +var serverConfigTest = *constant.NewServerConfig("mse-xxx-p.nacos-ans.mse.aliyuncs.com", 8848, constant.WithContextPath("/nacos")) type MockNamingProxy struct { } diff --git a/clients/naming_client/naming_grpc/naming_grpc_proxy.go b/clients/naming_client/naming_grpc/naming_grpc_proxy.go index 2bf017a7..39fd85f5 100644 --- a/clients/naming_client/naming_grpc/naming_grpc_proxy.go +++ b/clients/naming_client/naming_grpc/naming_grpc_proxy.go @@ -61,7 +61,7 @@ func NewNamingGrpcProxy(ctx context.Context, clientCfg constant.ClientConfig, na constant.LABEL_MODULE: constant.LABEL_MODULE_NAMING, } - iRpcClient, err := rpc.CreateClient(ctx, uid.String(), rpc.GRPC, labels, srvProxy.nacosServer, &clientCfg.TLSCfg) + iRpcClient, err := rpc.CreateClient(ctx, uid.String(), rpc.GRPC, labels, srvProxy.nacosServer, &clientCfg.TLSCfg, clientCfg.AppConnLabels) if err != nil { return nil, err } diff --git a/common/constant/client_config_options.go b/common/constant/client_config_options.go index 50e8cd60..7befcff3 100644 --- a/common/constant/client_config_options.go +++ b/common/constant/client_config_options.go @@ -226,3 +226,9 @@ func WithTLS(tlsCfg TLSConfig) ClientOption { config.TLSCfg = tlsCfg } } + +func WithAppConnLabels(appConnLabels map[string]string) ClientOption { + return func(config *ClientConfig) { + config.AppConnLabels = appConnLabels + } +} diff --git a/common/constant/config.go b/common/constant/config.go index 202234b7..0d5998ed 100644 --- a/common/constant/config.go +++ b/common/constant/config.go @@ -58,6 +58,7 @@ type ClientConfig struct { EndpointContextPath string // the address server endpoint contextPath EndpointQueryParams string // the address server endpoint query params ClusterName string // the address server clusterName + AppConnLabels map[string]string // app conn labels } type ClientLogSamplingConfig struct { diff --git a/common/remote/rpc/rpc_client.go b/common/remote/rpc/rpc_client.go index a1502fc4..4e7cae45 100644 --- a/common/remote/rpc/rpc_client.go +++ b/common/remote/rpc/rpc_client.go @@ -18,8 +18,11 @@ package rpc import ( "context" + "fmt" "math" + "os" "reflect" + "strings" "sync" "sync/atomic" "time" @@ -147,10 +150,11 @@ func getClient(clientName string) IRpcClient { return clientMap[clientName] } -func CreateClient(ctx context.Context, clientName string, connectionType ConnectionType, labels map[string]string, nacosServer *nacos_server.NacosServer, tlsConfig *constant.TLSConfig) (IRpcClient, error) { +func CreateClient(ctx context.Context, clientName string, connectionType ConnectionType, labels map[string]string, nacosServer *nacos_server.NacosServer, tlsConfig *constant.TLSConfig, appConnLabels map[string]string) (IRpcClient, error) { cMux.Lock() defer cMux.Unlock() if _, ok := clientMap[clientName]; !ok { + logger.Infof("init rpc client for name ", clientName) var rpcClient IRpcClient if GRPC == connectionType { rpcClient = NewGrpcClient(ctx, clientName, nacosServer, tlsConfig) @@ -158,6 +162,19 @@ func CreateClient(ctx context.Context, clientName string, connectionType Connect if rpcClient == nil { return nil, errors.New("unsupported connection type") } + + logger.Infof("get app conn labels from client config %s ", appConnLabels) + appConnLabelsEnv := getAppLabelsFromEnv() + logger.Infof("get app conn labels from env %s ", appConnLabelsEnv) + + appConnLabelsFinal := mergerAppLabels(appConnLabels, appConnLabelsEnv) + logger.Infof("final app conn labels : %s ", appConnLabelsFinal) + + appConnLabelsFinal = addPrefixForEachKey(appConnLabelsFinal, "app_") + if len(appConnLabelsFinal) != 0 { + rpcClient.putAllLabels(appConnLabelsFinal) + } + rpcClient.putAllLabels(labels) clientMap[clientName] = rpcClient return rpcClient, nil @@ -165,6 +182,92 @@ func CreateClient(ctx context.Context, clientName string, connectionType Connect return clientMap[clientName], nil } +func mergerAppLabels(appLabelsAppointed map[string]string, appLabelsEnv map[string]string) map[string]string { + preferred := strings.ToLower(os.Getenv("nacos_app_conn_labels_preferred")) + + var preferFirst bool + if preferred != "env" { + preferFirst = true + } else { + preferFirst = false + } + return mergeMaps(appLabelsAppointed, appLabelsEnv, preferFirst) +} + +func mergeMaps(map1, map2 map[string]string, preferFirst bool) map[string]string { + result := make(map[string]string, 8) + + for k, v := range map1 { + result[k] = v + } + + for k, v := range map2 { + _, ok := map1[k] + if preferFirst && ok { + continue + } + result[k] = v + } + + return result +} + +func getAppLabelsFromEnv() map[string]string { + configMap := make(map[string]string, 8) + + // nacos_config_gray_label + grayLabel := os.Getenv("nacos_config_gray_label") + if grayLabel != "" { + configMap["nacos_config_gray_label"] = grayLabel + } + + // nacos_app_conn_labels + connLabels := os.Getenv("nacos_app_conn_labels") + if connLabels != "" { + labelsMap := parseLabels(connLabels) + for k, v := range labelsMap { + configMap[k] = v + } + } + + return configMap +} + +func parseLabels(rawLabels string) map[string]string { + if strings.TrimSpace(rawLabels) == "" { + return make(map[string]string, 2) + } + + resultMap := make(map[string]string, 2) + labels := strings.Split(rawLabels, ",") + for _, label := range labels { + if strings.TrimSpace(label) != "" { + kv := strings.Split(label, "=") + if len(kv) == 2 { + resultMap[strings.TrimSpace(kv[0])] = strings.TrimSpace(kv[1]) + } else { + fmt.Println("unknown label format:", label) + } + } + } + return resultMap +} + +func addPrefixForEachKey(m map[string]string, prefix string) map[string]string { + if len(m) == 0 { + return m + } + + newMap := make(map[string]string, len(m)) + for k, v := range m { + if strings.TrimSpace(k) != "" { + newKey := prefix + k + newMap[newKey] = v + } + } + return newMap +} + func (r *RpcClient) Start() { if ok := atomic.CompareAndSwapInt32((*int32)(&r.rpcClientStatus), (int32)(INITIALIZED), (int32)(STARTING)); !ok { return