From 9509a6e1a8a90f0773545f668dc9f368a09e4abb Mon Sep 17 00:00:00 2001 From: shiyiyue1102 Date: Mon, 6 May 2024 19:04:12 +0800 Subject: [PATCH 1/4] =?UTF-8?q?go=20sdk=E6=94=AF=E6=8C=81=E6=A0=87?= =?UTF-8?q?=E7=AD=BE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- clients/config_client/config_client_test.go | 4 + clients/config_client/config_proxy.go | 2 +- clients/naming_client/naming_client_test.go | 2 +- .../naming_grpc/naming_grpc_proxy.go | 2 +- common/constant/client_config_options.go | 6 + common/constant/config.go | 1 + common/remote/rpc/rpc_client.go | 104 +++++++++++++++++- 7 files changed, 117 insertions(+), 4 deletions(-) diff --git a/clients/config_client/config_client_test.go b/clients/config_client/config_client_test.go index c73c983d..c23cb70d 100644 --- a/clients/config_client/config_client_test.go +++ b/clients/config_client/config_client_test.go @@ -19,6 +19,7 @@ package config_client import ( "context" "errors" + "fmt" "github.com/nacos-group/nacos-sdk-go/v2/util" "testing" @@ -45,6 +46,7 @@ var clientConfigWithOptions = constant.NewClientConfig( constant.WithOpenKMS(true), constant.WithKMSVersion(constant.KMSv1), constant.WithRegionId("cn-hangzhou"), + constant.WithAppConnLabels(map[string]string{"key1": "value1", "key2": "value2", "key3": "value3"}), ) var clientTLsConfigWithOptions = constant.NewClientConfig( @@ -310,6 +312,8 @@ func TestListen(t *testing.T) { DataId: localConfigTest.DataId, Group: localConfigTest.Group, OnChange: func(namespace, group, dataId, data string) { + fmt.Printf("receive content : %s\n", data) + }, }) assert.Nil(t, err) diff --git a/clients/config_client/config_proxy.go b/clients/config_client/config_proxy.go index b3038840..f21a1394 100644 --- a/clients/config_client/config_proxy.go +++ b/clients/config_client/config_proxy.go @@ -166,7 +166,7 @@ func (cp *ConfigProxy) createRpcClient(ctx context.Context, taskId string, clien "taskId": taskId, } - iRpcClient, _ := rpc.CreateClient(ctx, "config-"+taskId+"-"+client.uid, rpc.GRPC, labels, cp.nacosServer, &cp.clientConfig.TLSCfg) + iRpcClient, _ := rpc.CreateClient(ctx, "config-"+taskId+"-"+client.uid, rpc.GRPC, labels, cp.nacosServer, &cp.clientConfig.TLSCfg, cp.clientConfig.AppConnLabels) rpcClient := iRpcClient.GetRpcClient() if rpcClient.IsInitialized() { rpcClient.RegisterServerRequestHandler(func() rpc_request.IRequest { diff --git a/clients/naming_client/naming_client_test.go b/clients/naming_client/naming_client_test.go index 58ac40e6..35656dd7 100644 --- a/clients/naming_client/naming_client_test.go +++ b/clients/naming_client/naming_client_test.go @@ -34,7 +34,7 @@ var clientConfigTest = *constant.NewClientConfig( constant.WithNotLoadCacheAtStart(true), ) -var serverConfigTest = *constant.NewServerConfig("127.0.0.1", 80, constant.WithContextPath("/nacos")) +var serverConfigTest = *constant.NewServerConfig("mse-xxx-p.nacos-ans.mse.aliyuncs.com", 8848, constant.WithContextPath("/nacos")) type MockNamingProxy struct { } diff --git a/clients/naming_client/naming_grpc/naming_grpc_proxy.go b/clients/naming_client/naming_grpc/naming_grpc_proxy.go index 2bf017a7..39fd85f5 100644 --- a/clients/naming_client/naming_grpc/naming_grpc_proxy.go +++ b/clients/naming_client/naming_grpc/naming_grpc_proxy.go @@ -61,7 +61,7 @@ func NewNamingGrpcProxy(ctx context.Context, clientCfg constant.ClientConfig, na constant.LABEL_MODULE: constant.LABEL_MODULE_NAMING, } - iRpcClient, err := rpc.CreateClient(ctx, uid.String(), rpc.GRPC, labels, srvProxy.nacosServer, &clientCfg.TLSCfg) + iRpcClient, err := rpc.CreateClient(ctx, uid.String(), rpc.GRPC, labels, srvProxy.nacosServer, &clientCfg.TLSCfg, clientCfg.AppConnLabels) if err != nil { return nil, err } diff --git a/common/constant/client_config_options.go b/common/constant/client_config_options.go index 50e8cd60..7befcff3 100644 --- a/common/constant/client_config_options.go +++ b/common/constant/client_config_options.go @@ -226,3 +226,9 @@ func WithTLS(tlsCfg TLSConfig) ClientOption { config.TLSCfg = tlsCfg } } + +func WithAppConnLabels(appConnLabels map[string]string) ClientOption { + return func(config *ClientConfig) { + config.AppConnLabels = appConnLabels + } +} diff --git a/common/constant/config.go b/common/constant/config.go index 202234b7..0d5998ed 100644 --- a/common/constant/config.go +++ b/common/constant/config.go @@ -58,6 +58,7 @@ type ClientConfig struct { EndpointContextPath string // the address server endpoint contextPath EndpointQueryParams string // the address server endpoint query params ClusterName string // the address server clusterName + AppConnLabels map[string]string // app conn labels } type ClientLogSamplingConfig struct { diff --git a/common/remote/rpc/rpc_client.go b/common/remote/rpc/rpc_client.go index a1502fc4..cb5e6547 100644 --- a/common/remote/rpc/rpc_client.go +++ b/common/remote/rpc/rpc_client.go @@ -18,8 +18,11 @@ package rpc import ( "context" + "fmt" "math" + "os" "reflect" + "strings" "sync" "sync/atomic" "time" @@ -147,10 +150,11 @@ func getClient(clientName string) IRpcClient { return clientMap[clientName] } -func CreateClient(ctx context.Context, clientName string, connectionType ConnectionType, labels map[string]string, nacosServer *nacos_server.NacosServer, tlsConfig *constant.TLSConfig) (IRpcClient, error) { +func CreateClient(ctx context.Context, clientName string, connectionType ConnectionType, labels map[string]string, nacosServer *nacos_server.NacosServer, tlsConfig *constant.TLSConfig, appConnLabels map[string]string) (IRpcClient, error) { cMux.Lock() defer cMux.Unlock() if _, ok := clientMap[clientName]; !ok { + logger.Infof("init rpc client for name ", clientName) var rpcClient IRpcClient if GRPC == connectionType { rpcClient = NewGrpcClient(ctx, clientName, nacosServer, tlsConfig) @@ -158,6 +162,19 @@ func CreateClient(ctx context.Context, clientName string, connectionType Connect if rpcClient == nil { return nil, errors.New("unsupported connection type") } + + logger.Infof("get app conn labels from client config ", appConnLabels) + appConnLabelsEnv := getAppLabelsFromEnv() + logger.Infof("get app conn labels from env ", appConnLabelsEnv) + + appConnLabelsFinal := mergerAppLabels(appConnLabels, appConnLabelsEnv) + logger.Infof("final app conn labels : ", appConnLabelsFinal) + + appConnLabelsFinal = addPrefixForEachKey(appConnLabelsFinal, "app_") + if appConnLabelsFinal != nil && len(appConnLabelsFinal) != 0 { + rpcClient.putAllLabels(appConnLabelsFinal) + } + rpcClient.putAllLabels(labels) clientMap[clientName] = rpcClient return rpcClient, nil @@ -165,6 +182,91 @@ func CreateClient(ctx context.Context, clientName string, connectionType Connect return clientMap[clientName], nil } +func mergerAppLabels(appLabelsAppointed map[string]string, appLabelsEnv map[string]string) map[string]string { + preferred := strings.ToLower(os.Getenv("nacos_app_conn_labels_preferred")) + + var preferFirst bool + if preferred != "env" { + preferFirst = true + } else { + preferFirst = false + } + return mergeMaps(appLabelsAppointed, appLabelsEnv, preferFirst) +} + +func mergeMaps(map1, map2 map[string]string, preferFirst bool) map[string]string { + result := make(map[string]string) + + for k, v := range map1 { + result[k] = v + } + + for k, v := range map2 { + if preferFirst && map1[k] != "" { + continue + } + result[k] = v + } + + return result +} + +func getAppLabelsFromEnv() map[string]string { + configMap := make(map[string]string) + + // nacos_config_gray_label + grayLabel := os.Getenv("nacos_config_gray_label") + if grayLabel != "" { + configMap["nacos_config_gray_label"] = grayLabel + } + + // nacos_app_conn_labels + connLabels := os.Getenv("nacos_app_conn_labels") + if connLabels != "" { + labelsMap := parseLabels(connLabels) + for k, v := range labelsMap { + configMap[k] = v + } + } + + return configMap +} + +func parseLabels(rawLabels string) map[string]string { + if strings.TrimSpace(rawLabels) == "" { + return make(map[string]string, 2) + } + + resultMap := make(map[string]string, 2) + labels := strings.Split(rawLabels, ",") + for _, label := range labels { + if strings.TrimSpace(label) != "" { + kv := strings.Split(label, "=") + if len(kv) == 2 { + resultMap[strings.TrimSpace(kv[0])] = strings.TrimSpace(kv[1]) + } else { + fmt.Println("unknown label format:", label) + } + } + } + return resultMap +} + +func addPrefixForEachKey(m map[string]string, prefix string) map[string]string { + if m == nil || len(m) == 0 { + return m + } + + newMap := make(map[string]string, len(m)) + for k, v := range m { + if strings.TrimSpace(k) != "" { + newKey := prefix + k + newMap[newKey] = v + } + } + return newMap +} + func (r *RpcClient) Start() { if ok := atomic.CompareAndSwapInt32((*int32)(&r.rpcClientStatus), (int32)(INITIALIZED), (int32)(STARTING)); !ok { return From 5b64179190ef4f3eb9859d8b4c07a873702a6dbe Mon Sep 17 00:00:00 2001 From: "zunfei.lzf" Date: Fri, 10 May 2024 17:30:46 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E6=B5=8B=E8=AF=95=E7=94=A8=E4=BE=8B?= =?UTF-8?q?=E7=B1=BB=E5=9B=9E=E9=80=80=E5=88=B0=E4=B9=8B=E5=89=8D=E7=89=88?= =?UTF-8?q?=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- clients/config_client/config_client_test.go | 2 +- clients/naming_client/naming_client_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/config_client/config_client_test.go b/clients/config_client/config_client_test.go index c23cb70d..9c57909a 100644 --- a/clients/config_client/config_client_test.go +++ b/clients/config_client/config_client_test.go @@ -35,7 +35,7 @@ import ( "github.com/stretchr/testify/assert" ) -var serverConfigWithOptions = constant.NewServerConfig("mse-xxx-p.nacos-ans.mse.aliyuncs.com", 8848) +var serverConfigWithOptions = constant.NewServerConfig("127.0.0.1", 80, constant.WithContextPath("/nacos")) var clientConfigWithOptions = constant.NewClientConfig( constant.WithTimeoutMs(10*1000), diff --git a/clients/naming_client/naming_client_test.go b/clients/naming_client/naming_client_test.go index 35656dd7..58ac40e6 100644 --- a/clients/naming_client/naming_client_test.go +++ b/clients/naming_client/naming_client_test.go @@ -34,7 +34,7 @@ var clientConfigTest = *constant.NewClientConfig( constant.WithNotLoadCacheAtStart(true), ) -var serverConfigTest = *constant.NewServerConfig("mse-xxx-p.nacos-ans.mse.aliyuncs.com", 8848, constant.WithContextPath("/nacos")) +var serverConfigTest = *constant.NewServerConfig("127.0.0.1", 80, constant.WithContextPath("/nacos")) type MockNamingProxy struct { } From f0cb0dd42eeac777c7c4489587744aa57707a99f Mon Sep 17 00:00:00 2001 From: shiyiyue1102 Date: Fri, 10 May 2024 17:45:28 +0800 Subject: [PATCH 3/4] fix cr problem --- clients/config_client/config_client_test.go | 2 +- clients/naming_client/naming_client_test.go | 2 +- common/remote/rpc/rpc_client.go | 17 +++++++++-------- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/clients/config_client/config_client_test.go b/clients/config_client/config_client_test.go index c23cb70d..c7cd37a4 100644 --- a/clients/config_client/config_client_test.go +++ b/clients/config_client/config_client_test.go @@ -35,7 +35,7 @@ import ( "github.com/stretchr/testify/assert" ) -var serverConfigWithOptions = constant.NewServerConfig("mse-xxx-p.nacos-ans.mse.aliyuncs.com", 8848) +var serverConfigWithOptions = constant.NewServerConfig("127.0.0.1", 80) var clientConfigWithOptions = constant.NewClientConfig( constant.WithTimeoutMs(10*1000), diff --git a/clients/naming_client/naming_client_test.go b/clients/naming_client/naming_client_test.go index 35656dd7..58ac40e6 100644 --- a/clients/naming_client/naming_client_test.go +++ b/clients/naming_client/naming_client_test.go @@ -34,7 +34,7 @@ var clientConfigTest = *constant.NewClientConfig( constant.WithNotLoadCacheAtStart(true), ) -var serverConfigTest = *constant.NewServerConfig("mse-xxx-p.nacos-ans.mse.aliyuncs.com", 8848, constant.WithContextPath("/nacos")) +var serverConfigTest = *constant.NewServerConfig("127.0.0.1", 80, constant.WithContextPath("/nacos")) type MockNamingProxy struct { } diff --git a/common/remote/rpc/rpc_client.go b/common/remote/rpc/rpc_client.go index cb5e6547..4e7cae45 100644 --- a/common/remote/rpc/rpc_client.go +++ b/common/remote/rpc/rpc_client.go @@ -163,15 +163,15 @@ func CreateClient(ctx context.Context, clientName string, connectionType Connect return nil, errors.New("unsupported connection type") } - logger.Infof("get app conn labels from client config ", appConnLabels) + logger.Infof("get app conn labels from client config %s ", appConnLabels) appConnLabelsEnv := getAppLabelsFromEnv() - logger.Infof("get app conn labels from env ", appConnLabelsEnv) + logger.Infof("get app conn labels from env %s ", appConnLabelsEnv) appConnLabelsFinal := mergerAppLabels(appConnLabels, appConnLabelsEnv) - logger.Infof("final app conn labels : ", appConnLabelsFinal) + logger.Infof("final app conn labels : %s ", appConnLabelsFinal) appConnLabelsFinal = addPrefixForEachKey(appConnLabelsFinal, "app_") - if appConnLabelsFinal != nil && len(appConnLabelsFinal) != 0 { + if len(appConnLabelsFinal) != 0 { rpcClient.putAllLabels(appConnLabelsFinal) } @@ -195,14 +195,15 @@ func mergerAppLabels(appLabelsAppointed map[string]string, appLabelsEnv map[stri } func mergeMaps(map1, map2 map[string]string, preferFirst bool) map[string]string { - result := make(map[string]string) + result := make(map[string]string, 8) for k, v := range map1 { result[k] = v } for k, v := range map2 { - if preferFirst && map1[k] != "" { + _, ok := map1[k] + if preferFirst && ok { continue } result[k] = v @@ -212,7 +213,7 @@ func mergeMaps(map1, map2 map[string]string, preferFirst bool) map[string]string } func getAppLabelsFromEnv() map[string]string { - configMap := make(map[string]string) + configMap := make(map[string]string, 8) // nacos_config_gray_label grayLabel := os.Getenv("nacos_config_gray_label") @@ -253,7 +254,7 @@ func parseLabels(rawLabels string) map[string]string { } func addPrefixForEachKey(m map[string]string, prefix string) map[string]string { - if m == nil || len(m) == 0 { + if len(m) == 0 { return m } From a8200637e2e6f32e43642d9a2110e1cace17e28e Mon Sep 17 00:00:00 2001 From: shiyiyue1102 Date: Fri, 10 May 2024 17:51:24 +0800 Subject: [PATCH 4/4] =?UTF-8?q?Revert=20"=E6=B5=8B=E8=AF=95=E7=94=A8?= =?UTF-8?q?=E4=BE=8B=E7=B1=BB=E5=9B=9E=E9=80=80=E5=88=B0=E4=B9=8B=E5=89=8D?= =?UTF-8?q?=E7=89=88=E6=9C=AC"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit 5b641791 --- clients/naming_client/naming_client_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/naming_client/naming_client_test.go b/clients/naming_client/naming_client_test.go index 58ac40e6..35656dd7 100644 --- a/clients/naming_client/naming_client_test.go +++ b/clients/naming_client/naming_client_test.go @@ -34,7 +34,7 @@ var clientConfigTest = *constant.NewClientConfig( constant.WithNotLoadCacheAtStart(true), ) -var serverConfigTest = *constant.NewServerConfig("127.0.0.1", 80, constant.WithContextPath("/nacos")) +var serverConfigTest = *constant.NewServerConfig("mse-xxx-p.nacos-ans.mse.aliyuncs.com", 8848, constant.WithContextPath("/nacos")) type MockNamingProxy struct { }