Skip to content

Commit

Permalink
Moved routing key generation to protocol (#83)
Browse files Browse the repository at this point in the history
* Moved routing key generation to corresponding protocol

* Added testcase for generating routing key

* Added testcase for routing key

* added space in changelog.md

* Removed validation

* minor changes

* minor changes
  • Loading branch information
Umadevi-Kapu authored and xvinosi-github committed Oct 31, 2017
1 parent 7938297 commit 6dca696
Show file tree
Hide file tree
Showing 15 changed files with 252 additions and 226 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 0.4.7
- Added rk(Routing key) and tag option in CLI.
- Added routingKey and tag parameter in Service.
- Moved Routing key generation to corresponding protocol.

## 0.4.6
- Uplifted remrem-semantics version to 0.2.9

Expand Down
7 changes: 3 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ subprojects {
apply plugin: 'java'

//Latest version for publish
version = "0.4.6"
version = "0.4.7"

//Declare where to find the dependencies of project here
repositories {
Expand Down Expand Up @@ -110,12 +110,11 @@ subprojects {

//Injectable Message Library and its Implementation
compile 'com.github.Ericsson:eiffel-remrem-shared:0.3.4'
compile 'com.github.Ericsson:eiffel-remrem-protocol-interface:0.0.1'
compile 'com.github.Ericsson:eiffel-remrem-protocol-interface:0.0.5'
//For publishing eiffel2.0 events
compile("com.github.Ericsson:eiffel-remrem-semantics:0.2.9"){
compile("com.github.Ericsson:eiffel-remrem-semantics:0.3.0"){
exclude group: 'org.apache.commons'
}

//Declare the dependency for your favourite test framework you want to use in your tests.
//TestNG is also supported by the Gradle Test task. Just change the
//testCompile dependency to testCompile 'org.testng:testng:6.8.1' and add
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ public static void createCLIOptions() {
options.addOption("domain", "domainId", true, "identifies the domain that produces the event");
options.addOption("ud", "user_domain_suffix", true, "user domain suffix");
options.addOption("v", "lists the versions of publish and all loaded protocols");
options.addOption("tag", "tag", true, "tag needs to put in routing key");
options.addOption("rk", "routing_key", true, "routing key of the eiffel message");
contentGroup = createContentGroup();
options.addOptionGroup(contentGroup);
}
Expand Down Expand Up @@ -209,16 +211,28 @@ public static void handleMessageBusOptions() throws HandleMessageBusException {
if (commandLine.hasOption("tls")) {
String tls_ver = commandLine.getOptionValue("tls");
if (tls_ver == null) {
tls_ver = "NULL";
tls_ver = "NULL";
}
String[] validTlsVersions = new String[]{"1", "1.1", "1.2", "default"};
if (!ArrayUtils.contains(validTlsVersions, tls_ver)) {
throw new HandleMessageBusException("Specified TLS version is not valid! Specify a valid TLS version!");
throw new HandleMessageBusException("Specified TLS version is not valid! Specify a valid TLS version!");
}
String key = PropertiesConfig.TLS;
System.setProperty(key, tls_ver);
}

}

if (commandLine.hasOption("tag")) {
String tag = commandLine.getOptionValue("tag");
String key = PropertiesConfig.TAG;
System.setProperty(key, tag);
}

if (commandLine.hasOption("rk")) {
String routingKey = commandLine.getOptionValue("rk");
String key = PropertiesConfig.ROUTING_KEY;
System.setProperty(key, routingKey);
}

String usePersistance = "true";
if (commandLine.hasOption("np")) {
usePersistance = "false";
Expand Down Expand Up @@ -256,6 +270,12 @@ public static void clearSystemProperties() {
System.clearProperty(key);
key = PropertiesConfig.TLS;
System.clearProperty(key);
key = PropertiesConfig.DOMAIN_ID;
System.clearProperty(key);
key = PropertiesConfig.TAG;
System.clearProperty(key);
key = PropertiesConfig.ROUTING_KEY;
System.clearProperty(key);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ public class PropertiesConfig {
public static final String TEST_MODE = "com.ericsson.eiffel.remrem.publish.cli.test.mode";
public static final String DEBUG = "Debug";
public static final String DOMAIN_ID = "com.ericsson.eiffel.remrem.publish.domain";

public static final String TAG = "com.ericsson.eiffel.remrem.publish.tag";
public static final String ROUTING_KEY = "com.ericsson.eiffel.remrem.publish.routingkey";

public static final String INVALID_EVENT_CONTENT = "Invalid event content, client need to fix problem in event before submitting again";
public static final String INVALID_MESSAGE = "Bad Request";
Expand All @@ -40,6 +41,7 @@ public class PropertiesConfig {

public static final String SERVER_DOWN = "Internal Server Error";
public static final String SERVER_DOWN_MESSAGE = "Possible to try again later when server is up";
public static final String ROUTING_KEY_NULL_CONTENT = "Could not prepare Routing key";
public static final String UNSUCCESSFUL_EVENT_CONTENT = "Please check previous event and try again later";
public static final String RABBITMQ_PROPERTIES_NOT_FOUND = "RabbitMq properties not found";
public static final String RABBITMQ_PROPERTIES_NOT_FOUND_CONTENT = "RabbitMq properties not configured for the protocol ";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ public Map<String, RabbitMqProperties> getRabbitMqProperties() {
rabbitMqPropertiesMap.get(protocol).setExchangeName(entry.getValue().toString());
} else if (key.contains("rabbitmq.domainId")) {
rabbitMqPropertiesMap.get(protocol).setDomainId(entry.getValue().toString());
} else if (key.contains("rabbitmq.routingKey")) {
rabbitMqPropertiesMap.get(protocol).setRoutingKey(entry.getValue().toString());
} else if (key.contains("rabbitmq.tag")) {
rabbitMqPropertiesMap.get(protocol).setTag(entry.getValue().toString());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@

import com.ericsson.eiffel.remrem.protocol.MsgService;
import com.ericsson.eiffel.remrem.publish.config.PropertiesConfig;
import com.ericsson.eiffel.remrem.semantics.SemanticsService;
import com.google.gson.JsonObject;

import ch.qos.logback.classic.Logger;

public class PublishUtils {

private static final String DOT = ".";
static Logger log = (Logger) LoggerFactory.getLogger(PublishUtils.class);
/**
* Method returns the MsgService based on the mp(message protocol) from the list of MsgService beans.
Expand Down Expand Up @@ -58,33 +56,17 @@ public static MsgService getMessageService(String mp, MsgService msgServices[])
* @param msgService the Messaging service.
* @param json the eiffel event
* @param userDomainSuffix is optional parameter, If user provide this it will add to the domainId.
* @return routing key or returns null if family and type not available.
*/
public static String prepareRoutingKey(MsgService msgService, JsonObject json, RMQHelper rmqHelper,
String userDomainSuffix) {
* @return routing key or returns "" if host, exchange and domainId not available.
*/
public static String getRoutingKey(MsgService msgService, JsonObject json, RMQHelper rmqHelper, String userDomainSuffix) {
String protocol = msgService.getServiceName();
Boolean cliMode = Boolean.getBoolean(PropertiesConfig.CLI_MODE);
RabbitMqProperties rabbitMqProperties = rmqHelper.rabbitMqPropertiesMap.get(protocol);
if (rabbitMqProperties != null && rabbitMqProperties.getDomainId() != null && rabbitMqProperties.getExchangeName() != null && rabbitMqProperties.getHost() != null) {
String domainId = rabbitMqProperties.getDomainId();
String family = msgService.getFamily(json);
String type = msgService.getType(json);
if (StringUtils.isNotEmpty(userDomainSuffix)) {
domainId = domainId + DOT + userDomainSuffix;
}
if (msgService != null && StringUtils.isNotEmpty(family) && StringUtils.isNotEmpty(type)) {
if (msgService instanceof SemanticsService)
return PropertiesConfig.PROTOCOL + DOT + family + DOT + type + DOT + "notag" + DOT + domainId;
else
return family + DOT + type + DOT + "notag" + DOT + domainId;
}
} else {
if (Boolean.getBoolean(PropertiesConfig.CLI_MODE)) {
log.error("RabbitMq properties not configured for protocol " + protocol);
System.exit(-1);
}
return "";
String domainId = rabbitMqProperties.getDomainId();
if (rabbitMqProperties != null && rabbitMqProperties.getExchangeName() != null && rabbitMqProperties.getHost() != null
&& (cliMode || (!cliMode && StringUtils.isNotBlank(domainId)))) {
return StringUtils.defaultIfBlank(rabbitMqProperties.getRoutingKey(), msgService.generateRoutingKey(json, rabbitMqProperties.getTag(), domainId, userDomainSuffix));
}
return null;
return "";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ public class RabbitMqProperties {
private String username;
private String password;
private String domainId;
private String tag;
private String routingKey;


private Connection rabbitConnection;
private String protocol;

Expand Down Expand Up @@ -115,6 +119,14 @@ public void setProtocol(String protocol) {
this.protocol = protocol;
}

public String getTag() {
return tag;
}

public void setTag(String tag) {
this.tag = tag;
}

public Connection getRabbitConnection() {
return rabbitConnection;
}
Expand All @@ -123,6 +135,14 @@ public void setRabbitConnection(Connection rabbitConnection) {
this.rabbitConnection = rabbitConnection;
}

public String getRoutingKey() {
return routingKey;
}

public void setRoutingKey(String routingKey) {
this.routingKey = routingKey;
}

public void init() {
log.info("RabbitMqProperties init ...");
if (Boolean.getBoolean(PropertiesConfig.CLI_MODE)) {
Expand Down Expand Up @@ -219,6 +239,14 @@ private void initService() {
if (password == null) {
password = getValuesFromSystemProperties(protocol + ".rabbitmq.password");
}

if (tag == null) {
tag = getValuesFromSystemProperties(protocol + ".rabbitmq.tag");
}

if (routingKey == null) {
routingKey = getValuesFromSystemProperties(protocol + ".rabbitmq.routingKey");
}
}

private void setValues() {
Expand All @@ -227,6 +255,8 @@ private void setValues() {
domainId = getValuesFromSystemProperties(PropertiesConfig.DOMAIN_ID);
tlsVer = getValuesFromSystemProperties(PropertiesConfig.TLS);
exchangeName = getValuesFromSystemProperties(PropertiesConfig.EXCHANGE_NAME);
tag = getValuesFromSystemProperties(PropertiesConfig.TAG);
routingKey = getValuesFromSystemProperties(PropertiesConfig.ROUTING_KEY);
usePersitance = Boolean.getBoolean(PropertiesConfig.USE_PERSISTENCE);
}

Expand All @@ -238,7 +268,7 @@ private String getValuesFromSystemProperties(String propertyName) {
* This method is used to check mandatory RabbitMQ properties.
*/
private void madatoryParametersCheck() {
if(host == null || exchangeName == null || domainId == null) {
if(host == null || exchangeName == null) {
if (Boolean.getBoolean(PropertiesConfig.CLI_MODE)) {
System.err.println("Mandatory RabbitMq properties missing");
System.exit(-1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.List;
import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
Expand All @@ -28,6 +29,7 @@
import org.springframework.util.CollectionUtils;

import com.ericsson.eiffel.remrem.protocol.MsgService;
import com.ericsson.eiffel.remrem.protocol.ValidationResult;
import com.ericsson.eiffel.remrem.publish.config.PropertiesConfig;
import com.ericsson.eiffel.remrem.publish.helper.PublishUtils;
import com.ericsson.eiffel.remrem.publish.helper.RMQHelper;
Expand Down Expand Up @@ -91,16 +93,21 @@ public SendResult send(String jsonContent, MsgService msgService, String userDom
Map<String, String> map = new HashMap<>();
Map<String, String> routingKeyMap = new HashMap<>();
String eventId = msgService.getEventId(json.getAsJsonObject());
String routingKey = (eventId != null) ? (PublishUtils.prepareRoutingKey(msgService,
json.getAsJsonObject(), rmqHelper, userDomainSuffix)) : null;
if (eventId != null && routingKey != null && !routingKey.isEmpty()) {
map.put(eventId, json.toString());
routingKeyMap.put(eventId, routingKey);
} else if(routingKey != null && routingKey.isEmpty()) {
List<PublishResultItem> events = new ArrayList<>();
PublishResultItem resultItem = rabbitmqConfigurationNotFound(msgService);
events.add(resultItem);
return new SendResult(events);
if (StringUtils.isNotBlank(eventId)) {
String routingKey = PublishUtils.getRoutingKey(msgService, json.getAsJsonObject(), rmqHelper, userDomainSuffix);
if (StringUtils.isNotBlank(routingKey)) {
map.put(eventId, json.toString());
routingKeyMap.put(eventId, routingKey);
} else if (routingKey == null) {
List<PublishResultItem> events = new ArrayList<>();
routingKey(events);
return new SendResult(events);
} else {
List<PublishResultItem> events = new ArrayList<>();
PublishResultItem resultItem = rabbitmqConfigurationNotFound(msgService);
events.add(resultItem);
return new SendResult(events);
}
} else {
List<PublishResultItem> events = new ArrayList<>();
createFailureResult(events);
Expand All @@ -119,11 +126,11 @@ public SendResult send(String jsonContent, MsgService msgService, String userDom
return new SendResult(events);
}
}

/*
* (non-Javadoc)
* @see com.ericsson.eiffel.remrem.publish.service.MessageService#send(com.google.gson.JsonElement, com.ericsson.eiffel.remrem.protocol.MsgService, java.lang.String)
*/
*/
@Override
public SendResult send(JsonElement json, MsgService msgService, String userDomainSuffix) {
Map<String, String> map = new HashMap<>();
Expand All @@ -138,25 +145,35 @@ public SendResult send(JsonElement json, MsgService msgService, String userDomai
checkEventStatus = true;
JsonArray bodyJson = json.getAsJsonArray();
for (JsonElement obj : bodyJson) {
String routingKey = getAndCheckEvent(msgService, map, resultList, obj, routingKeyMap, userDomainSuffix);
String eventId = msgService.getEventId(obj.getAsJsonObject());
if (eventId != null && checkEventStatus && (routingKey != null && !routingKey.isEmpty())) {
result = send(obj.toString(), msgService, userDomainSuffix);
resultList.addAll(result.getEvents());
int statusCode = result.getEvents().get(0).getStatusCode();
if (!statusCodes.contains(statusCode))
statusCodes.add(statusCode);
} else {
if (!checkEventStatus) {
addUnsuccessfulResultItem(obj);
if (StringUtils.isNotEmpty(eventId) && checkEventStatus) {
String routingKey = getAndCheckEvent(msgService, map, resultList, obj, routingKeyMap,
userDomainSuffix);
if (StringUtils.isNotBlank(routingKey)) {
result = send(obj.toString(), msgService, userDomainSuffix);
resultList.addAll(result.getEvents());
int statusCode = result.getEvents().get(0).getStatusCode();
if (!statusCodes.contains(statusCode))
statusCodes.add(statusCode);
} else if (routingKey == null) {
routingKey(resultList);
errorItems = new ArrayList<JsonElement>();
int statusCode = resultList.get(0).getStatusCode();
statusCodes.add(statusCode);
} else if(routingKey != null && routingKey.isEmpty()) {
errorItems.add(obj);
checkEventStatus = false;
} else {
PublishResultItem resultItem = rabbitmqConfigurationNotFound(msgService);
resultList.add(resultItem);
int statusCode = resultItem.getStatusCode();
statusCodes.add(statusCode);
break;
}
} else {
if (!checkEventStatus) {
addUnsuccessfulResultItem(obj);
int statusCode = resultList.get(0).getStatusCode();
statusCodes.add(statusCode);
} else {
createFailureResult(resultList);
errorItems = new ArrayList<JsonElement>();
Expand All @@ -179,7 +196,7 @@ public SendResult send(JsonElement json, MsgService msgService, String userDomai
result.setEvents(resultList);
return result;
}

private String sendMessage(String routingKey, String msg, MsgService msgService) {
String resultMsg = PropertiesConfig.SUCCESS;
instantiateRmqHelper();
Expand Down Expand Up @@ -207,7 +224,7 @@ private void instantiateRmqHelper() {
log.error(e.getMessage(), e);
}
}

/**
* Method get the eventId from the messaging service. And checks the eventId.
* @param msgService Messaging service.
Expand All @@ -220,7 +237,7 @@ private String getAndCheckEvent(MsgService msgService, Map<String, String> map,
JsonElement obj, Map<String, String> routingKeyMap, String userDomainSuffix) {
String eventId = msgService.getEventId(obj.getAsJsonObject());
String routingKey = (eventId != null)
? (PublishUtils.prepareRoutingKey(msgService, obj.getAsJsonObject(), rmqHelper, userDomainSuffix))
? (PublishUtils.getRoutingKey(msgService, obj.getAsJsonObject(), rmqHelper, userDomainSuffix))
: null;
if (eventId != null && routingKey != null && !routingKey.isEmpty()) {
routingKeyMap.put(eventId, routingKey);
Expand Down Expand Up @@ -250,6 +267,12 @@ private PublishResultItem rabbitmqConfigurationNotFound(MsgService msgService) {
return event;
}

private void routingKey(List<PublishResultItem> events) {
PublishResultItem event = new PublishResultItem(null, 500, PropertiesConfig.SERVER_DOWN,
PropertiesConfig.ROUTING_KEY_NULL_CONTENT);
events.add(event);
}

private void addUnsuccessfulResultItem(JsonElement obj) {
PublishResultItem event = new PublishResultItem(null, 503, PropertiesConfig.SERVICE_UNAVAILABLE,
PropertiesConfig.UNSUCCESSFUL_EVENT_CONTENT);
Expand Down
Loading

0 comments on commit 6dca696

Please sign in to comment.