Skip to content

Commit

Permalink
Optimize dynamic config: integrate Zookeeper & Nacos, support interfa…
Browse files Browse the repository at this point in the history
…ce-level dynamic config (#1430)

* integrate Zookeeper and Nacos as configuration centers

* support dynamic config at the interface level

* Optimize interface-level dynamic config

* Modify dynamic config test

* Add DynamicUrl

* Optimize config update process

* Modify config update process

* Modify ApolloDynamicConfigManagerTest
  • Loading branch information
Narzisss authored Oct 31, 2024
1 parent 8b8fc7d commit 3e317ff
Show file tree
Hide file tree
Showing 31 changed files with 1,793 additions and 40 deletions.
12 changes: 12 additions & 0 deletions all/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,16 @@
<artifactId>sofa-rpc-config-apollo</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.alipay.sofa</groupId>
<artifactId>sofa-rpc-config-zk</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.alipay.sofa</groupId>
<artifactId>sofa-rpc-config-nacos</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.alipay.sofa</groupId>
<artifactId>bolt</artifactId>
Expand Down Expand Up @@ -553,6 +563,8 @@
<include>com.alipay.sofa:sofa-rpc-tracer-opentracing-resteasy</include>
<include>com.alipay.sofa:sofa-rpc-tracer-opentracing-triple</include>
<include>com.alipay.sofa:sofa-rpc-config-apollo</include>
<include>com.alipay.sofa:sofa-rpc-config-zk</include>
<include>com.alipay.sofa:sofa-rpc-config-nacos</include>
<include>com.alipay.sofa:sofa-rpc-doc-swagger</include>
<!-- TODO -->
</includes>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.alipay.sofa.rpc.client.Cluster;
import com.alipay.sofa.rpc.client.ClusterFactory;
import com.alipay.sofa.rpc.client.ProviderGroup;
import com.alipay.sofa.rpc.common.RpcConstants;
import com.alipay.sofa.rpc.common.SofaConfigs;
import com.alipay.sofa.rpc.common.SofaOptions;
import com.alipay.sofa.rpc.common.utils.CommonUtils;
Expand All @@ -28,9 +29,12 @@
import com.alipay.sofa.rpc.config.RegistryConfig;
import com.alipay.sofa.rpc.context.RpcRuntimeContext;
import com.alipay.sofa.rpc.core.exception.SofaRpcRuntimeException;
import com.alipay.sofa.rpc.dynamic.ConfigChangedEvent;
import com.alipay.sofa.rpc.dynamic.ConfigChangeType;
import com.alipay.sofa.rpc.dynamic.DynamicConfigKeys;
import com.alipay.sofa.rpc.dynamic.DynamicConfigManager;
import com.alipay.sofa.rpc.dynamic.DynamicConfigManagerFactory;
import com.alipay.sofa.rpc.dynamic.DynamicUrl;
import com.alipay.sofa.rpc.ext.Extension;
import com.alipay.sofa.rpc.invoke.Invoker;
import com.alipay.sofa.rpc.listener.ConfigListener;
Expand All @@ -44,8 +48,10 @@

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
Expand All @@ -54,6 +60,7 @@
import java.util.concurrent.atomic.AtomicInteger;

import static com.alipay.sofa.rpc.common.RpcConstants.REGISTRY_PROTOCOL_DOMAIN;
import static com.alipay.sofa.common.config.SofaConfigs.getOrDefault;

/**
* Default consumer bootstrap.
Expand Down Expand Up @@ -146,7 +153,8 @@ public T refer() {
// build cluster
cluster = ClusterFactory.getCluster(this);
// build listeners
consumerConfig.setConfigListener(buildConfigListener(this));
ConfigListener configListener = buildConfigListener(this);
consumerConfig.setConfigListener(configListener);
consumerConfig.setProviderInfoListener(buildProviderInfoListener(this));
// init cluster
cluster.init();
Expand All @@ -156,13 +164,25 @@ public T refer() {
proxyIns = (T) ProxyFactory.buildProxy(consumerConfig.getProxy(), consumerConfig.getProxyClass(),
proxyInvoker);

//动态配置
//请求级别动态配置参数
final String dynamicAlias = consumerConfig.getParameter(DynamicConfigKeys.DYNAMIC_ALIAS);
if (StringUtils.isNotBlank(dynamicAlias)) {
final DynamicConfigManager dynamicManager = DynamicConfigManagerFactory.getDynamicManager(
consumerConfig.getAppName(), dynamicAlias);
consumerConfig.getAppName(), dynamicAlias);
dynamicManager.initServiceConfiguration(consumerConfig.getInterfaceId());
}

//接口级别动态配置参数
Boolean dynamicConfigRefreshEnable = getOrDefault(DynamicConfigKeys.DYNAMIC_REFRESH_ENABLE);
String configCenterAddress = getOrDefault(DynamicConfigKeys.CONFIG_CENTER_ADDRESS);
if (dynamicConfigRefreshEnable && StringUtils.isNotBlank(configCenterAddress)) {
DynamicUrl dynamicUrl = new DynamicUrl(configCenterAddress);
//启用接口级别动态配置
final DynamicConfigManager dynamicManager = DynamicConfigManagerFactory.getDynamicManager(
consumerConfig.getAppName(), dynamicUrl.getProtocol());
dynamicManager.addListener(consumerConfig.getInterfaceId(), configListener);
dynamicManager.initServiceConfiguration(consumerConfig.getInterfaceId(), configListener);
}
} catch (Exception e) {
if (cluster != null) {
cluster.destroy();
Expand Down Expand Up @@ -438,8 +458,47 @@ public void updateAllProviders(List<ProviderGroup> groups) {
*/
private class ConsumerAttributeListener implements ConfigListener {

// 可以动态配置的选项
private final Set<String> supportDynamicConfigKeys = new HashSet<>();
private final Map<String, String> newValueMap = new HashMap<>();

ConsumerAttributeListener() {
supportDynamicConfigKeys.add(RpcConstants.CONFIG_KEY_TIMEOUT);
supportDynamicConfigKeys.add(RpcConstants.CONFIG_KEY_RETRIES);
supportDynamicConfigKeys.add(RpcConstants.CONFIG_KEY_LOADBALANCER);
}

@Override
public void process(ConfigChangedEvent event) {
// 清除上次的动态配置值缓存
consumerConfig.getDynamicConfigValueCache().clear();
// 获取对应配置项的默认值
for (String key : newValueMap.keySet()) {
if (consumerConfig.getConfigValueCache().get(key) != null) {
newValueMap.put(key, String.valueOf(consumerConfig.getConfigValueCache().get(key)));
} else {
newValueMap.put(key, null);
}
}
if (!event.getChangeType().equals(ConfigChangeType.DELETED)) {
// ADDED or MODIFIED
Map<String, String> dynamicValueMap = event.getDynamicValueMap();
for (String key : dynamicValueMap.keySet()) {
String tempKey = key.lastIndexOf(".") == -1 ? key : key.substring(key.lastIndexOf(".") + 1);
if (supportDynamicConfigKeys.contains(tempKey)) {
String value = dynamicValueMap.get(key);
if (StringUtils.isNotBlank(value)) {
consumerConfig.getDynamicConfigValueCache().put(key, value);
newValueMap.put(key, value);
}
}
}
}
attrUpdated(newValueMap);
}

@Override
public void configChanged(Map newValue) {
public void configChanged(Map newValueMap) {

}

Expand All @@ -452,7 +511,7 @@ public synchronized void attrUpdated(Map newValueMap) {
Map<String, String> oldValues = new HashMap<String, String>();
boolean rerefer = false;
try { // 检查是否有变化
// 是否过滤map?
// 是否过滤map?
for (Map.Entry<String, String> entry : newValues.entrySet()) {
String newValue = entry.getValue();
String oldValue = consumerConfig.queryAttribute(entry.getKey());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,30 @@
*/
package com.alipay.sofa.rpc.dynamic.apollo;

import com.alipay.sofa.common.config.SofaConfigs;
import com.alipay.sofa.rpc.auth.AuthRuleGroup;
import com.alipay.sofa.rpc.common.utils.StringUtils;
import com.alipay.sofa.rpc.dynamic.ConfigChangeType;
import com.alipay.sofa.rpc.dynamic.ConfigChangedEvent;
import com.alipay.sofa.rpc.dynamic.DynamicConfigKeyHelper;
import com.alipay.sofa.rpc.dynamic.DynamicConfigKeys;
import com.alipay.sofa.rpc.dynamic.DynamicConfigManager;
import com.alipay.sofa.rpc.dynamic.DynamicHelper;
import com.alipay.sofa.rpc.ext.Extension;
import com.alipay.sofa.rpc.listener.ConfigListener;
import com.alipay.sofa.rpc.log.Logger;
import com.alipay.sofa.rpc.log.LoggerFactory;
import com.ctrip.framework.apollo.Config;
import com.ctrip.framework.apollo.ConfigChangeListener;
import com.ctrip.framework.apollo.ConfigService;
import com.ctrip.framework.apollo.enums.PropertyChangeType;
import com.ctrip.framework.apollo.model.ConfigChange;

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;

/**
* @author bystander
Expand All @@ -34,41 +51,88 @@
@Extension(value = "apollo", override = true)
public class ApolloDynamicConfigManager extends DynamicConfigManager {

private Config config;
private final static Logger LOGGER = LoggerFactory.getLogger(ApolloDynamicConfigManager.class);

private static final String APOLLO_APPID_KEY = "app.id";

private static final String APOLLO_ADDR_KEY = "apollo.meta";

private static final String APOLLO_CLUSTER_KEY = "apollo.cluster";

private static final String APOLLO_PARAM_APPID_KEY = "appId";

private static final String APOLLO_PARAM_CLUSTER_KEY = "cluster";

private static final String APOLLO_PARAM_NAMESPACE_KEY = "namespace";

private static final String APOLLO_PROTOCOL_PREFIX = "http://";

private final ConcurrentMap<String, ApolloListener> watchListenerMap = new ConcurrentHashMap<>();

private final Config config;

protected ApolloDynamicConfigManager(String appName) {
super(appName);
config = ConfigService.getAppConfig();
super(appName, SofaConfigs.getOrCustomDefault(DynamicConfigKeys.CONFIG_CENTER_ADDRESS, ""));
if (getDynamicUrl() != null) {
if (StringUtils.isNotBlank(getDynamicUrl().getParam(APOLLO_PARAM_APPID_KEY))) {
System.setProperty(APOLLO_APPID_KEY, getDynamicUrl().getParam(APOLLO_PARAM_APPID_KEY));
}
if (StringUtils.isNotBlank(getDynamicUrl().getAddress())) {
System.setProperty(APOLLO_ADDR_KEY, APOLLO_PROTOCOL_PREFIX + getDynamicUrl().getAddress());
}
if (StringUtils.isNotBlank(getDynamicUrl().getParam(APOLLO_PARAM_CLUSTER_KEY))) {
System.setProperty(APOLLO_CLUSTER_KEY, getDynamicUrl().getParam(APOLLO_PARAM_CLUSTER_KEY));
}
if (StringUtils.isNotBlank(getDynamicUrl().getParam(APOLLO_PARAM_NAMESPACE_KEY))) {
config = ConfigService.getConfig(getDynamicUrl().getParam(APOLLO_PARAM_NAMESPACE_KEY));
} else {
config = ConfigService.getAppConfig();
}
} else {
config = ConfigService.getAppConfig();
}
}

@Override
public void initServiceConfiguration(String service) {
//TODO not now
// TODO 暂不支持
}

@Override
public void initServiceConfiguration(String service, ConfigListener listener) {
try {
String rawConfig = config.getProperty(service, "");
if (StringUtils.isNotBlank(rawConfig)) {
listener.process(new ConfigChangedEvent(service, rawConfig));
}
} catch (Exception e) {
LOGGER.error("Init service configuration error", e);
}
}

@Override
public String getProviderServiceProperty(String service, String key) {
return config.getProperty(DynamicConfigKeyHelper.buildProviderServiceProKey(service, key),
DynamicHelper.DEFAULT_DYNAMIC_VALUE);
DynamicHelper.DEFAULT_DYNAMIC_VALUE);
}

@Override
public String getConsumerServiceProperty(String service, String key) {
return config.getProperty(DynamicConfigKeyHelper.buildConsumerServiceProKey(service, key),
DynamicHelper.DEFAULT_DYNAMIC_VALUE);
DynamicHelper.DEFAULT_DYNAMIC_VALUE);

}

@Override
public String getProviderMethodProperty(String service, String method, String key) {
return config.getProperty(DynamicConfigKeyHelper.buildProviderMethodProKey(service, method, key),
DynamicHelper.DEFAULT_DYNAMIC_VALUE);
DynamicHelper.DEFAULT_DYNAMIC_VALUE);
}

@Override
public String getConsumerMethodProperty(String service, String method, String key) {
return config.getProperty(DynamicConfigKeyHelper.buildConsumerMethodProKey(service, method, key),
DynamicHelper.DEFAULT_DYNAMIC_VALUE);
DynamicHelper.DEFAULT_DYNAMIC_VALUE);

}

Expand All @@ -77,4 +141,40 @@ public AuthRuleGroup getServiceAuthRule(String service) {
//TODO 暂不支持
return null;
}

@Override
public void addListener(String key, ConfigListener listener) {
ApolloListener apolloListener = watchListenerMap.computeIfAbsent(key, k -> new ApolloListener());
apolloListener.addListener(listener);
config.addChangeListener(apolloListener, Collections.singleton(key));
}

public static class ApolloListener implements ConfigChangeListener {

private final Set<ConfigListener> listeners = new CopyOnWriteArraySet<>();

@Override
public void onChange(com.ctrip.framework.apollo.model.ConfigChangeEvent changeEvent) {
for (String key : changeEvent.changedKeys()) {
ConfigChange change = changeEvent.getChange(key);
ConfigChangedEvent event =
new ConfigChangedEvent(key, change.getNewValue(), getChangeType(change));
listeners.forEach(listener -> listener.process(event));
}
}

private ConfigChangeType getChangeType(ConfigChange change) {
if (change.getChangeType() == PropertyChangeType.DELETED) {
return ConfigChangeType.DELETED;
}
if (change.getChangeType() == PropertyChangeType.ADDED) {
return ConfigChangeType.ADDED;
}
return ConfigChangeType.MODIFIED;
}

void addListener(ConfigListener configListener) {
this.listeners.add(configListener);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package com.alipay.sofa.rpc.dynamic.apollo;

import com.alipay.sofa.rpc.dynamic.DynamicConfigManager;
import com.alipay.sofa.rpc.dynamic.DynamicConfigManagerFactory;
import com.alipay.sofa.rpc.dynamic.DynamicHelper;
import com.alipay.sofa.rpc.log.Logger;
import com.alipay.sofa.rpc.log.LoggerFactory;
Expand All @@ -24,10 +26,11 @@

public class ApolloDynamicConfigManagerTest {

private final static Logger logger = LoggerFactory
.getLogger(ApolloDynamicConfigManagerTest.class);
private final static Logger logger = LoggerFactory
.getLogger(ApolloDynamicConfigManagerTest.class);

private ApolloDynamicConfigManager apolloDynamicConfigManager = new ApolloDynamicConfigManager("test");
private DynamicConfigManager apolloDynamicConfigManager = DynamicConfigManagerFactory.getDynamicManager("test",
"apollo");

@Test
public void getProviderServiceProperty() {
Expand All @@ -37,17 +40,19 @@ public void getProviderServiceProperty() {

@Test
public void getConsumerServiceProperty() {
String result = apolloDynamicConfigManager.getConsumerServiceProperty("serviceName", "timeout");
Assert.assertEquals(DynamicHelper.DEFAULT_DYNAMIC_VALUE, result);
}

@Test
public void getProviderMethodProperty() {
String result = apolloDynamicConfigManager.getProviderMethodProperty("serviceName", "methodName", "timeout");
Assert.assertEquals(DynamicHelper.DEFAULT_DYNAMIC_VALUE, result);
}

@Test
public void getConsumerMethodProperty() {
}

@Test
public void getServiceAuthRule() {
String result = apolloDynamicConfigManager.getConsumerMethodProperty("serviceName", "methodName", "timeout");
Assert.assertEquals(DynamicHelper.DEFAULT_DYNAMIC_VALUE, result);
}
}
Loading

0 comments on commit 3e317ff

Please sign in to comment.