diff --git a/registry/registry-nacos/src/main/java/com/alipay/sofa/rpc/registry/nacos/NacosRegistry.java b/registry/registry-nacos/src/main/java/com/alipay/sofa/rpc/registry/nacos/NacosRegistry.java index 7161a8702..98dec21bc 100644 --- a/registry/registry-nacos/src/main/java/com/alipay/sofa/rpc/registry/nacos/NacosRegistry.java +++ b/registry/registry-nacos/src/main/java/com/alipay/sofa/rpc/registry/nacos/NacosRegistry.java @@ -20,12 +20,12 @@ import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.naming.NamingFactory; import com.alibaba.nacos.api.naming.NamingService; -import com.alibaba.nacos.api.naming.listener.Event; import com.alibaba.nacos.api.naming.listener.EventListener; import com.alibaba.nacos.api.naming.listener.NamingEvent; import com.alibaba.nacos.api.naming.pojo.Instance; import com.alipay.sofa.rpc.client.ProviderGroup; import com.alipay.sofa.rpc.client.ProviderInfo; +import com.alipay.sofa.rpc.common.annotation.VisibleForTesting; import com.alipay.sofa.rpc.common.utils.CommonUtils; import com.alipay.sofa.rpc.common.utils.StringUtils; import com.alipay.sofa.rpc.config.ConsumerConfig; @@ -146,6 +146,10 @@ public synchronized void init() { nacosConfig.putAll(parameters); } + if (providerObserver == null) { + providerObserver = new NacosRegistryProviderObserver(); + } + try { namingService = NamingFactory.createNamingService(nacosConfig); } catch (NacosException e) { @@ -272,26 +276,19 @@ public List subscribe(final ConsumerConfig config) { } try { - if (providerObserver == null) { - providerObserver = new NacosRegistryProviderObserver(); - } - ProviderInfoListener providerInfoListener = config.getProviderInfoListener(); providerObserver.addProviderListener(config, providerInfoListener); - EventListener eventListener = new EventListener() { - @Override - public void onEvent(Event event) { - if (event instanceof NamingEvent) { - NamingEvent namingEvent = (NamingEvent) event; - List instances = namingEvent.getInstances(); - // avoid npe - if (null == instances) { - instances = new ArrayList(); - } - instances.removeIf(i -> !i.isEnabled()); - providerObserver.updateProviders(config, instances); + EventListener eventListener = event -> { + if (event instanceof NamingEvent) { + NamingEvent namingEvent = (NamingEvent) event; + List instances = namingEvent.getInstances(); + // avoid npe + if (null == instances) { + instances = new ArrayList(); } + instances.removeIf(i -> !i.isEnabled()); + providerObserver.updateProviders(config, instances); } }; namingService.subscribe(serviceName, defaultCluster, eventListener); @@ -359,4 +356,14 @@ public void destroy() { public Properties getNacosConfig() { return nacosConfig; } + + /** + * UT only + * + * @return + */ + @VisibleForTesting + public NacosRegistryProviderObserver getProviderObserver() { + return providerObserver; + } } diff --git a/registry/registry-nacos/src/test/java/com/alipay/sofa/rpc/registry/nacos/NacosRegistryTest.java b/registry/registry-nacos/src/test/java/com/alipay/sofa/rpc/registry/nacos/NacosRegistryTest.java index 373af0eb6..dc6e7f114 100644 --- a/registry/registry-nacos/src/test/java/com/alipay/sofa/rpc/registry/nacos/NacosRegistryTest.java +++ b/registry/registry-nacos/src/test/java/com/alipay/sofa/rpc/registry/nacos/NacosRegistryTest.java @@ -18,6 +18,7 @@ import com.alipay.sofa.rpc.client.ProviderGroup; import com.alipay.sofa.rpc.client.ProviderInfo; +import com.alipay.sofa.rpc.common.struct.ConcurrentHashSet; import com.alipay.sofa.rpc.config.ApplicationConfig; import com.alipay.sofa.rpc.config.ConsumerConfig; import com.alipay.sofa.rpc.config.ProviderConfig; @@ -36,9 +37,12 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** @@ -66,8 +70,6 @@ public void setUp() { .setRegister(true); registry = (NacosRegistry) RegistryFactory.getRegistry(registryConfig); - registry.init(); - Assert.assertTrue(registry.start()); } /** @@ -77,7 +79,30 @@ public void setUp() { public void tearDown() { registry.destroy(); registry = null; - serverConfig.destroy(); + } + + @Test + public void testMuiltInit() throws InterruptedException { + ExecutorService executorService = Executors.newFixedThreadPool(10); + final CountDownLatch latch = new CountDownLatch(10); + Set sets = new ConcurrentHashSet<>(); + + for (int i = 0; i < 10; i++) { + executorService.submit(() -> { + try { + registry.init(); + NacosRegistryProviderObserver providerObserver = registry.getProviderObserver(); + sets.add(providerObserver); + } finally { + latch.countDown(); + } + }); + } + + latch.await(); + executorService.shutdown(); + + Assert.assertEquals(1, sets.size()); } /** @@ -87,6 +112,8 @@ public void tearDown() { */ @Test public void testProviderObserver() throws Exception { + registry.init(); + Assert.assertTrue(registry.start()); int timeoutPerSub = 2000; //wait nacos startup ok @@ -227,6 +254,7 @@ public void testProviderObserver() throws Exception { List consumerConfigList = new ArrayList<>(); consumerConfigList.add(consumer2); registry.batchUnSubscribe(consumerConfigList); + serverConfig.destroy(); } /** @@ -236,6 +264,8 @@ public void testProviderObserver() throws Exception { */ @Test public void testVirtualHostAndVirtualPort() throws Exception { + registry.init(); + Assert.assertTrue(registry.start()); //wait nacos startup ok TimeUnit.SECONDS.sleep(10); // 模拟的场景 client -> proxy:127.7.7.7:8888 -> netty:0.0.0.0:12200 @@ -297,6 +327,7 @@ public void testVirtualHostAndVirtualPort() throws Exception { virtualHost + ":" + virtualPort); Assert.assertEquals("The provider's host should be virtualHost", virtualHost, pri.getHost()); Assert.assertEquals("The provider's port should be virtualPort", virtualPort, pri.getPort()); + serverConfig.destroy(); } private static class MockProviderInfoListener implements ProviderInfoListener {