Skip to content

Commit

Permalink
Mesh Registry support custom group for sofa registry. (#1454)
Browse files Browse the repository at this point in the history
Co-authored-by: liujianjun.ljj <[email protected]>
  • Loading branch information
EvenLjj and liujianjun.ljj authored Oct 14, 2024
1 parent 60433bc commit ffc3b93
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.alipay.sofa.rpc.client.ProviderInfo;
import com.alipay.sofa.rpc.common.struct.NamedThreadFactory;
import com.alipay.sofa.rpc.common.utils.CommonUtils;
import com.alipay.sofa.rpc.common.utils.StringUtils;
import com.alipay.sofa.rpc.config.ConsumerConfig;
import com.alipay.sofa.rpc.config.ProviderConfig;
import com.alipay.sofa.rpc.config.RegistryConfig;
Expand Down Expand Up @@ -184,6 +185,10 @@ protected PublishServiceRequest buildPublishServiceRequest(String serviceName, S
providerMetaInfo.setVersion(VERSION);
providerMetaInfo.setProperties(providerInfo.getStaticAttrs());
publishServiceRequest.setProviderMetaInfo(providerMetaInfo);
String group = providerInfo.getStaticAttrs().get(SofaRegistryConstants.SOFA_GROUP_KEY);
if (StringUtils.isNotBlank(group)) {
publishServiceRequest.setGroup(group);
}
return publishServiceRequest;
}

Expand Down Expand Up @@ -233,6 +238,10 @@ protected UnPublishServiceRequest buildUnPublishServiceRequest(String serviceNam
UnPublishServiceRequest unPublishServiceRequest = new UnPublishServiceRequest();
unPublishServiceRequest.setServiceName(serviceName);
unPublishServiceRequest.setProtocolType(providerInfo.getProtocolType());
String group = providerInfo.getStaticAttr(SofaRegistryConstants.SOFA_GROUP_KEY);
if (StringUtils.isNotBlank(group)) {
unPublishServiceRequest.setGroup(group);
}
return unPublishServiceRequest;
}

Expand Down Expand Up @@ -303,6 +312,11 @@ protected SubscribeServiceRequest buildSubscribeServiceRequest(ConsumerConfig co
SubscribeServiceRequest subscribeRequest = new SubscribeServiceRequest();
subscribeRequest.setServiceName(key);
subscribeRequest.setProtocolType(consumerConfig.getProtocol());
subscribeRequest.setProperties(consumerConfig.getParameters());
String group = consumerConfig.getParameter(SofaRegistryConstants.SOFA_GROUP_KEY);
if (StringUtils.isNotBlank(group)) {
subscribeRequest.setGroup(group);
}
return subscribeRequest;
}

Expand Down Expand Up @@ -368,6 +382,10 @@ protected UnSubscribeServiceRequest buildUnSubscribeServiceRequest(ConsumerConfi
String key = MeshRegistryHelper.buildMeshKey(config, config.getProtocol());
unsubscribeRequest.setServiceName(key);
unsubscribeRequest.setProtocolType(config.getProtocol());
String group = config.getParameter(SofaRegistryConstants.SOFA_GROUP_KEY);
if (StringUtils.isNotBlank(group)) {
unsubscribeRequest.setGroup(group);
}
return unsubscribeRequest;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public class PublishServiceRequest {

private boolean onlyPublishInCloud;

private String group;

public String getServiceName() {
return serviceName;
}
Expand Down Expand Up @@ -64,13 +66,22 @@ public void setOnlyPublishInCloud(boolean onlyPublishInCloud) {
this.onlyPublishInCloud = onlyPublishInCloud;
}

public String getGroup() {
return group;
}

public void setGroup(String group) {
this.group = group;
}

@Override
public String toString() {
final StringBuffer sb = new StringBuffer("PublishServiceRequest{");
sb.append("serviceName='").append(serviceName).append('\'');
sb.append(", protocolType='").append(protocolType).append('\'');
sb.append(", providerMetaInfo=").append(providerMetaInfo);
sb.append(", onlyPublishInCloud=").append(onlyPublishInCloud);
sb.append(", group='").append(group).append('\'');
sb.append('}');
return sb.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,31 @@
*/
package com.alipay.sofa.rpc.registry.mesh.model;

import java.util.Map;

/**
* @author bystander
* @version $Id: PublishServiceRequest.java, v 0.1 2018年04月03日 11:27 AM bystander Exp $
*/
public class SubscribeServiceRequest {

private String serviceName;
private String serviceName;

//这个值是类似DEFAULT/XFIRE这种,也有可能是tr
private String protocolType;
private String protocolType;

//this should be xxx-pool.alipay.com or xxx.alipay.com,can be null
private String targetAppAddress;
private String targetAppAddress;

private boolean vipEnforce;

private boolean vipOnly;

private boolean vipEnforce;
private boolean localCloudFirst;

private boolean vipOnly;
private String group;

private boolean localCloudFirst;
private Map<String, String> properties;

public String getServiceName() {
return serviceName;
Expand Down Expand Up @@ -84,6 +90,22 @@ public void setLocalCloudFirst(boolean localCloudFirst) {
this.localCloudFirst = localCloudFirst;
}

public String getGroup() {
return group;
}

public void setGroup(String group) {
this.group = group;
}

public Map<String, String> getProperties() {
return properties;
}

public void setProperties(Map<String, String> properties) {
this.properties = properties;
}

@Override
public String toString() {
final StringBuffer sb = new StringBuffer("SubscribeServiceRequest{");
Expand All @@ -93,6 +115,7 @@ public String toString() {
sb.append(", vipEnforce=").append(vipEnforce);
sb.append(", vipOnly=").append(vipOnly);
sb.append(", localCloudFirst=").append(localCloudFirst);
sb.append(", group='").append(group).append('\'');
sb.append('}');
return sb.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public class UnPublishServiceRequest {
//这个值是类似DEFAULT/XFIRE这种,也有可能是tr
private String protocolType;

private String group;

public String getServiceName() {
return serviceName;
}
Expand All @@ -43,6 +45,14 @@ public void setProtocolType(String protocolType) {
this.protocolType = protocolType;
}

public String getGroup() {
return group;
}

public void setGroup(String group) {
this.group = group;
}

@Override
public String toString() {
final StringBuffer sb = new StringBuffer("UnPublishServiceRequest{");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class UnSubscribeServiceRequest {
//这个值是类似DEFAULT/XFIRE这种,也有可能是tr
private String protocolType;

private String group;

public String getServiceName() {
return serviceName;
}
Expand All @@ -54,12 +56,21 @@ public void setProtocolType(String protocolType) {
this.protocolType = protocolType;
}

public String getGroup() {
return group;
}

public void setGroup(String group) {
this.group = group;
}

@Override
public String toString() {
final StringBuffer sb = new StringBuffer("UnSubscribeServiceRequest{");
sb.append("serviceName='").append(serviceName).append('\'');
sb.append(", targetAppAddress='").append(targetAppAddress).append('\'');
sb.append(", protocolType='").append(protocolType).append('\'');
sb.append(", group='").append(group).append('\'');
sb.append('}');
return sb.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ public void testOnlyPublish() throws InterruptedException {
public void testAll() throws Exception {

int timeoutPerSub = 1000;
Map<String, String> parameter = new HashMap<>();
parameter.put(SofaRegistryConstants.SOFA_GROUP_KEY, "SOFA_TEST");

ServerConfig serverConfig = new ServerConfig()
.setProtocol("bolt")
Expand All @@ -183,7 +185,8 @@ public void testAll() throws Exception {
.setSerialization("hessian2")
.setServer(serverConfig)
.setWeight(222)
.setTimeout(3000);
.setTimeout(3000)
.setParameters(parameter);

// 注册
registry.register(provider);
Expand All @@ -196,7 +199,8 @@ public void testAll() throws Exception {
.setSubscribe(true)
.setSerialization("java")
.setInvokeType("sync")
.setTimeout(4444);
.setTimeout(4444)
.setParameters(parameter);

String tag0 = MeshRegistryHelper.buildMeshKey(provider, serverConfig.getProtocol());
String tag1 = MeshRegistryHelper.buildMeshKey(consumer, consumer.getProtocol());
Expand All @@ -205,6 +209,7 @@ public void testAll() throws Exception {
PublishServiceRequest publishServiceRequest = registry.buildPublishServiceRequest(tag0,
serverConfig.getProtocol(), providerInfo, "test-server");
Assert.assertEquals(serverConfig.getProtocol(), publishServiceRequest.getProtocolType());
Assert.assertEquals("SOFA_TEST", publishServiceRequest.getGroup());

// 订阅
MeshRegistryTest.MockProviderInfoListener providerInfoListener = new MeshRegistryTest.MockProviderInfoListener();
Expand All @@ -216,6 +221,8 @@ public void testAll() throws Exception {
Assert.assertTrue(ps.toString(), ps.size() == 1);
SubscribeServiceRequest subscribeServiceRequest = registry.buildSubscribeServiceRequest(consumer);
Assert.assertEquals(consumer.getProtocol(), subscribeServiceRequest.getProtocolType());
Assert.assertEquals("SOFA_TEST", subscribeServiceRequest.getGroup());
Assert.assertNotNull(subscribeServiceRequest.getProperties());

// 反注册
CountDownLatch latch = new CountDownLatch(1);
Expand All @@ -226,6 +233,7 @@ public void testAll() throws Exception {
Assert.assertTrue(ps.size() == 1);
UnPublishServiceRequest unPublishServiceRequest = registry.buildUnPublishServiceRequest(tag0, providerInfo);
Assert.assertEquals(serverConfig.getProtocol(), unPublishServiceRequest.getProtocolType());
Assert.assertEquals("SOFA_TEST", unPublishServiceRequest.getGroup());

// 一次发2个端口的再次注册
latch = new CountDownLatch(1);
Expand All @@ -246,7 +254,8 @@ public void testAll() throws Exception {
.setSubscribe(true)
.setSerialization("java")
.setInvokeType("sync")
.setTimeout(4444);
.setTimeout(4444)
.setParameters(parameter);
CountDownLatch latch2 = new CountDownLatch(1);
MeshRegistryTest.MockProviderInfoListener providerInfoListener2 = new MeshRegistryTest.MockProviderInfoListener();
providerInfoListener2.setCountDownLatch(latch2);
Expand All @@ -261,6 +270,7 @@ public void testAll() throws Exception {
registry.unSubscribe(consumer);
UnSubscribeServiceRequest unSubscribeServiceRequest = registry.buildUnSubscribeServiceRequest(consumer);
Assert.assertEquals(consumer.getProtocol(), unSubscribeServiceRequest.getProtocolType());
Assert.assertEquals("SOFA_TEST", unSubscribeServiceRequest.getGroup());

// 批量反注册,判断订阅者2的数据
latch = new CountDownLatch(1);
Expand Down

0 comments on commit ffc3b93

Please sign in to comment.