From 7392e22b6b0d7c37ec486e5819307895999204d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=88=E9=93=AD?= Date: Tue, 6 Feb 2024 14:33:09 +0800 Subject: [PATCH] support sofa registry kubernetes --- .../sofa/rpc/config/RegistryConfig.java | 8 +- .../kubernetes/KubernetesRegistry.java | 16 + .../constant/KubernetesClientConstants.java | 6 +- .../util/KubernetesConfigUtils.java | 6 +- .../kubernetes/KubernetesRegistryTest.java | 319 ++++++++++++++++++ .../rpc/registry/kubernetes/TestService.java | 22 ++ .../registry/kubernetes/TestServiceImpl.java | 25 ++ .../org.mockito.plugins.MockMaker | 1 + 8 files changed, 394 insertions(+), 9 deletions(-) create mode 100644 registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryTest.java create mode 100644 registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestService.java create mode 100644 registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestServiceImpl.java create mode 100644 registry/registry-kubernetes/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/config/RegistryConfig.java b/core/api/src/main/java/com/alipay/sofa/rpc/config/RegistryConfig.java index 9f9408077..93637858c 100644 --- a/core/api/src/main/java/com/alipay/sofa/rpc/config/RegistryConfig.java +++ b/core/api/src/main/java/com/alipay/sofa/rpc/config/RegistryConfig.java @@ -405,7 +405,7 @@ public String getParameter(String key) { * @return the value */ public String getParameter(String key, String defaultValue) { - return parameters == null ? defaultValue : parameters.get(key); + return (parameters == null || parameters.get(key) == null) ? defaultValue : parameters.get(key); } /** @@ -415,7 +415,8 @@ public String getParameter(String key, String defaultValue) { * @return the value */ public int getParameter(String key, int defaultValue) { - return parameters == null ? defaultValue : Integer.parseInt(parameters.get(key)); + return (parameters == null || parameters.get(key) == null) ? defaultValue : Integer.parseInt(parameters + .get(key)); } /** @@ -425,7 +426,8 @@ public int getParameter(String key, int defaultValue) { * @return the value */ public boolean getParameter(String key, boolean defaultValue) { - return parameters == null ? defaultValue : Boolean.parseBoolean(parameters.get(key)); + return (parameters == null || parameters.get(key) == null) ? defaultValue : Boolean.parseBoolean(parameters + .get(key)); } @Override diff --git a/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistry.java b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistry.java index 2307c5b1f..fe5d11afa 100644 --- a/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistry.java +++ b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistry.java @@ -256,4 +256,20 @@ private List getPods() { .getItems(); } + /** + * UT used only + */ + @Deprecated + public void setCurrentHostname(String currentHostname) { + this.currentHostname = currentHostname; + } + + /** + * UT used only + */ + @Deprecated + public ConcurrentMap> getConsumerListeners() { + return consumerListeners; + } + } \ No newline at end of file diff --git a/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/constant/KubernetesClientConstants.java b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/constant/KubernetesClientConstants.java index e6c02dcb0..49f251aa6 100644 --- a/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/constant/KubernetesClientConstants.java +++ b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/constant/KubernetesClientConstants.java @@ -20,9 +20,9 @@ public class KubernetesClientConstants { public static final String DEFAULT_MASTER_URL = "https://kubernetes.default.svc"; - public static final String LABEL_KEY = "io.sofa.rpc/label"; + public static final String LABEL_KEY = "com.sofa.rpc/label"; - public static final String ANNOTATION_KEY = "io.sofa.rpc/annotation"; + public static final String ANNOTATION_KEY = "com.sofa.rpc/annotation"; public static final String TRUST_CERTS = "trustCerts"; @@ -64,8 +64,6 @@ public class KubernetesClientConstants { public static final String REQUEST_TIMEOUT = "requestTimeout"; - public static final String ROLLING_TIMEOUT = "rollingTimeout"; - public static final String LOGGING_INTERVAL = "loggingInterval"; public static final String HTTP_PROXY = "httpProxy"; diff --git a/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/util/KubernetesConfigUtils.java b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/util/KubernetesConfigUtils.java index 26a0df072..5a1cf9cf0 100644 --- a/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/util/KubernetesConfigUtils.java +++ b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/util/KubernetesConfigUtils.java @@ -89,8 +89,10 @@ public static Config buildKubernetesConfig(RegistryConfig registryConfig) { private static String buildMasterUrl(RegistryConfig registryConfig) { String address = registryConfig.getAddress(); - if (StringUtils.isNotBlank(address)) { - return (Boolean.parseBoolean(registryConfig.getParameter(USE_HTTPS, "false")) ? "https://" : "http://") + + if (StringUtils.isNotBlank(address) && address.startsWith("http")) { + return address; + } else if (StringUtils.isNotBlank(address) && !address.startsWith("http")) { + return (Boolean.parseBoolean(registryConfig.getParameter(USE_HTTPS, "true")) ? "https://" : "http://") + address; } return DEFAULT_MASTER_URL; diff --git a/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryTest.java b/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryTest.java new file mode 100644 index 000000000..e0c08d82f --- /dev/null +++ b/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryTest.java @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.registry.kubernetes; + +import com.alipay.sofa.rpc.client.ProviderGroup; +import com.alipay.sofa.rpc.common.RpcConstants; +import com.alipay.sofa.rpc.common.utils.JSONUtils; +import com.alipay.sofa.rpc.config.ApplicationConfig; +import com.alipay.sofa.rpc.config.ConsumerConfig; +import com.alipay.sofa.rpc.config.ProviderConfig; +import com.alipay.sofa.rpc.config.RegistryConfig; +import com.alipay.sofa.rpc.config.ServerConfig; +import com.alipay.sofa.rpc.context.RpcInternalContext; +import com.alipay.sofa.rpc.context.RpcInvokeContext; +import com.alipay.sofa.rpc.context.RpcRunningState; +import com.alipay.sofa.rpc.context.RpcRuntimeContext; +import com.alipay.sofa.rpc.listener.ProviderInfoListener; +import io.fabric8.kubernetes.api.model.Endpoints; +import io.fabric8.kubernetes.api.model.EndpointsBuilder; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodBuilder; +import io.fabric8.kubernetes.api.model.Service; +import io.fabric8.kubernetes.api.model.ServiceBuilder; +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.NamespacedKubernetesClient; +import io.fabric8.kubernetes.client.server.mock.KubernetesServer; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.ANNOTATION_KEY; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.LABEL_KEY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class KubernetesRegistryTest { + + private static final String NAMESPACE = "TestNameSpace"; + private static final String POD_NAME = "TestPodName"; + private static final String APP_NAME = "TestAppName"; + private static final String SERVICE_NAME = "TestService"; + + public KubernetesServer mockServer; + + private NamespacedKubernetesClient mockClient; + + private static KubernetesRegistry kubernetesRegistry; + + private static RegistryConfig registryConfig; + + private static ConsumerConfig consumer; + + /** + * Ad before class. + */ + @BeforeClass + public static void adBeforeClass() { + RpcRunningState.setUnitTestMode(true); + } + + /** + * Ad after class. + */ + @AfterClass + public static void adAfterClass() { + RpcRuntimeContext.destroy(); + RpcInternalContext.removeContext(); + RpcInvokeContext.removeContext(); + } + + @Before + public void setup() { + mockServer = new KubernetesServer(false, true); + mockServer.before(); + mockClient = mockServer.getClient().inNamespace(NAMESPACE); + + registryConfig = new RegistryConfig(); + registryConfig.setProtocol("kubernetes"); + registryConfig.setAddress(mockClient.getConfiguration().getMasterUrl()); + // registryConfig.setParameter("trustCerts", "true"); + registryConfig.setParameter("namespace", NAMESPACE); + registryConfig.setParameter("useHttps", "false"); + registryConfig.setParameter("http2Disable", "true"); + + kubernetesRegistry = new KubernetesRegistry(registryConfig); + kubernetesRegistry.init(); + kubernetesRegistry.setCurrentHostname(POD_NAME); + + System.setProperty(Config.KUBERNETES_AUTH_TRYKUBECONFIG_SYSTEM_PROPERTY, "false"); + System.setProperty(Config.KUBERNETES_AUTH_TRYSERVICEACCOUNT_SYSTEM_PROPERTY, "false"); + + Pod pod = new PodBuilder() + .withNewMetadata() + .withName(POD_NAME) + .endMetadata() + .withNewStatus() + .withPodIP("192.168.1.100") + .endStatus() + .build(); + + Service service = new ServiceBuilder() + .withNewMetadata() + .withName(SERVICE_NAME) + .endMetadata() + .withNewSpec() + .endSpec() + .build(); + + Endpoints endPoints = new EndpointsBuilder() + .withNewMetadata() + .withName(SERVICE_NAME) + .endMetadata() + .addNewSubset() + .addNewAddress() + .withIp("ip1") + .withNewTargetRef() + .withUid("uid1") + .withName(POD_NAME) + .endTargetRef() + .endAddress() + .addNewPort("Test", "Test", 12345, "TCP") + .endSubset() + .build(); + + mockClient.pods().inNamespace(NAMESPACE).create(pod); + mockClient.services().inNamespace(NAMESPACE).create(service); + mockClient.endpoints().inNamespace(NAMESPACE).create(endPoints); + + Assert.assertTrue(kubernetesRegistry.start()); + } + + @After + public void cleanup() { + kubernetesRegistry.destroy(); + mockClient.close(); + mockServer.after(); + } + + @Test + public void testAll() throws InterruptedException { + ApplicationConfig applicationConfig = new ApplicationConfig() + .setAppName(APP_NAME); + + ServerConfig serverConfig = new ServerConfig() + .setProtocol("bolt") + .setPort(12200) + .setDaemon(false); + + ProviderConfig providerConfig = new ProviderConfig() + .setApplication(applicationConfig) + .setInterfaceId(TestService.class.getName()) + .setRegistry(registryConfig) + .setRegister(true) + // .setUniqueId("standalone") + .setRef(new TestServiceImpl()) + .setServer(serverConfig); + + // 注册 + kubernetesRegistry.register(providerConfig); + + List items = mockClient.pods().inNamespace(NAMESPACE).withLabel(LABEL_KEY).list().getItems(); + + Assert.assertEquals(1, items.size()); + Pod pod = items.get(0); + String label = pod.getMetadata().getLabels().get(LABEL_KEY); + Assert.assertNotNull(label); + + String annotation = pod.getMetadata().getAnnotations().get(ANNOTATION_KEY); + Assert.assertNotNull(annotation); + ProviderConfig result = JSONUtils.parseObject(annotation, ProviderConfig.class); + RegistryConfig registryConfig = (RegistryConfig) result.getRegistry().get(0); + Assert.assertEquals(registryConfig.getProtocol(), "kubernetes"); + + // 订阅 + consumer = new ConsumerConfig(); + consumer.setInterfaceId("com.alipay.sofa.rpc.registry.kubernetes.TestService") + .setApplication(applicationConfig) + .setProxy("javassist") + .setSubscribe(true) + .setSerialization("java") + .setInvokeType("sync") + .setTimeout(4444); + + CountDownLatch latch = new CountDownLatch(1); + MockProviderInfoListener providerInfoListener = new MockProviderInfoListener(); + providerInfoListener.setCountDownLatch(latch); + consumer.setProviderInfoListener(providerInfoListener); + kubernetesRegistry.subscribe(consumer); + latch.await(5000, TimeUnit.MILLISECONDS); + Map ps = providerInfoListener.getData(); + + Assert.assertTrue(ps.size() > 0); + Assert.assertNotNull(ps.get(RpcConstants.ADDRESS_DEFAULT_GROUP)); + Assert.assertTrue(ps.get(RpcConstants.ADDRESS_DEFAULT_GROUP).size() > 0); + + // 反订阅 + kubernetesRegistry.unSubscribe(consumer); + Assert.assertEquals(0, kubernetesRegistry.getConsumerListeners().size()); + + // 反注册 + kubernetesRegistry.unRegister(providerConfig); + + List unRegisterItems = mockClient.pods().inNamespace(NAMESPACE).withLabel(LABEL_KEY).list().getItems(); + Assert.assertEquals(0, unRegisterItems.size()); + + List unRegisterItems1 = mockClient.pods().inNamespace(NAMESPACE).list().getItems(); + String unRegisterLabel = unRegisterItems1.get(0).getMetadata().getLabels().get(LABEL_KEY); + Assert.assertNull(unRegisterLabel); + String unRegisterAnnotations = unRegisterItems1.get(0).getMetadata().getAnnotations().get(ANNOTATION_KEY); + Assert.assertNull(unRegisterAnnotations); + + } + + private static class MockProviderInfoListener implements ProviderInfoListener { + + Map providerGroupMap = new HashMap<>(); + + private CountDownLatch countDownLatch; + + public void setCountDownLatch(CountDownLatch countDownLatch) { + this.countDownLatch = countDownLatch; + } + + @Override + public void addProvider(ProviderGroup providerGroup) { + + } + + @Override + public void removeProvider(ProviderGroup providerGroup) { + + } + + @Override + public void updateProviders(ProviderGroup providerGroup) { + + providerGroupMap.put(providerGroup.getName(), providerGroup); + if (countDownLatch != null) { + countDownLatch.countDown(); + countDownLatch = null; + } + } + + @Override + public void updateAllProviders(List providerGroups) { + providerGroupMap.clear(); + + if (providerGroups == null || providerGroups.size() == 0) { + } else { + for (ProviderGroup p : providerGroups) { + providerGroupMap.put(p.getName(), p); + } + + } + } + + public Map getData() { + return providerGroupMap; + } + } + + @Test + public void testUpdatePodAnnotations() { + + // 创建一个新的 Pod + String podName = "test-pod"; + String namespace = "test-namespace"; + Pod pod = new PodBuilder() + .withNewMetadata() + .withName(podName) + .withNamespace(namespace) + .endMetadata() + .build(); + + // 在模拟环境中创建 Pod + pod = mockClient.pods().inNamespace(namespace).create(pod); + assertNotNull(pod); + + // 准备要更新的 annotations + Map annotations = new HashMap<>(); + annotations.put("example.com/annotation", "value"); + + // 更新 Pod 的 annotations + pod = new PodBuilder(pod) + .editMetadata() + .addToAnnotations(annotations) + .endMetadata() + .build(); + + // 在模拟环境中更新 Pod + pod = mockClient.pods().inNamespace(namespace).withName(podName).replace(pod); + + // 获取并验证 annotations 是否已更新 + assertEquals("value", pod.getMetadata().getAnnotations().get("example.com/annotation")); + } +} \ No newline at end of file diff --git a/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestService.java b/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestService.java new file mode 100644 index 000000000..8cb791154 --- /dev/null +++ b/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestService.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.registry.kubernetes; + +public interface TestService { + + String sayHello(String str); +} \ No newline at end of file diff --git a/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestServiceImpl.java b/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestServiceImpl.java new file mode 100644 index 000000000..25b7cc7c3 --- /dev/null +++ b/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestServiceImpl.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.registry.kubernetes; + +public class TestServiceImpl implements TestService { + + @Override + public String sayHello(String str) { + return str; + } +} \ No newline at end of file diff --git a/registry/registry-kubernetes/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/registry/registry-kubernetes/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 000000000..ca6ee9cea --- /dev/null +++ b/registry/registry-kubernetes/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1 @@ +mock-maker-inline \ No newline at end of file