Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Events can be published in parallel #302

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
## 2.1.5
- Events can be published in parallel.

## 2.1.2
- Added external parameters to send username, password and/or uri to connect to Messagebus.

## 2.1.1
- Implemented the changes to log the eventId and HTTPStatus while the level is INFO.
- Implemented the changes to print the user information while the log level is INFO.
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
<version>2.0.12</version>
</parent>
<properties>
<eiffel-remrem-publish.version>2.1.4</eiffel-remrem-publish.version>
<eiffel-remrem-semantics.version>2.3.0</eiffel-remrem-semantics.version>
<eiffel-remrem-publish.version>2.1.5</eiffel-remrem-publish.version>
<eiffel-remrem-semantics.version>2.4.0</eiffel-remrem-semantics.version>
</properties>
<artifactId>eiffel-remrem-publish</artifactId>
<version>${eiffel-remrem-publish.version}</version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,15 @@ public RemRemPublishException(String message, RMQBeanConnectionFactory factory,
Throwable cause) {
super(message + factory.getHost() + ":" + factory.getPort(), cause);
}

@Override
public String getMessage() {
String message = super.getMessage();
Throwable cause = getCause();
if (cause != null) {
message += "; root cause: '" + cause.getMessage() + "'";
}

return message;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ public void init() {
//The exception can be safely handled here as there is a check for existence of exchange is done before each publish.
checkAndCreateExchangeIfNeeded();
} catch (RemRemPublishException e) {
log.error("Error occured while setting up the RabbitMq Connection. "+e.getMessage());
log.error("Error occurred while setting up the RabbitMq Connection. "+e.getMessage());
e.printStackTrace();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.ObjectCodec;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.cfg.JsonNodeFeature;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.spi.json.JacksonJsonNodeJsonProvider;
Expand All @@ -44,19 +48,20 @@ public class EventTemplateHandler {

private static final String REGEXP_END_DIGITS = "\\[\\d+\\]$";

private final Configuration configuration = Configuration.builder()
.jsonProvider(new JacksonJsonNodeJsonProvider())
.mappingProvider(new JacksonMappingProvider())
.build();
private ObjectMapper mapper = JsonMapper.builder()
.disable(JsonNodeFeature.READ_NULL_PROPERTIES)
.build()
.setSerializationInclusion(JsonInclude.Include.NON_EMPTY);
private Configuration configuration = Configuration.builder()
.jsonProvider(new JacksonJsonNodeJsonProvider(mapper))
.mappingProvider(new JacksonMappingProvider(mapper))
.build();

// eventTemplateParser
@JsonInclude(JsonInclude.Include.NON_NULL)
public JsonNode eventTemplateParser(String jsonData , String eventName){
JsonNode updatedJson = null;
JsonFactory factory = new JsonFactory();
ObjectMapper mapper = new ObjectMapper(factory);
mapper.setSerializationInclusion(JsonInclude.Include.NON_ABSENT);
mapper.setSerializationInclusion(JsonInclude.Include.NON_EMPTY);
mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);

JsonNode rootNode = null;
try {
String eventTemplate = accessFileInSemanticJar(EVENT_TEMPLATE_PATH + eventName.toLowerCase() + ".json");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,4 @@ public interface MessageService {
* Does the cleanup like closing open connections
*/
public void cleanUp();

/**
* Implemented Status code for the response
*/
public HttpStatus getHttpStatus();

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package com.ericsson.eiffel.remrem.publish.service;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -49,11 +50,6 @@

Logger log = (Logger) LoggerFactory.getLogger(MessageServiceRMQImpl.class);

/*Variables handles status codes*/
List<Integer> statusCodes;
List<JsonElement> errorItems;
List<PublishResultItem> resultList;
boolean checkEventStatus;
/*
* (non-Javadoc)
* @see com.ericsson.eiffel.remrem.publish.service.MessageService#send(java.util.Map, java.util.Map)
Expand All @@ -74,7 +70,6 @@ public SendResult send(Map<String, String> routingKeyMap, Map<String, String> ms
} else {
event = new PublishResultItem(entryKey, HttpStatus.INTERNAL_SERVER_ERROR.value(), PropertiesConfig.SERVER_DOWN,
PropertiesConfig.SERVER_DOWN_MESSAGE);
checkEventStatus = false;
}
} catch (NackException e) {
event = new PublishResultItem(entryKey, HttpStatus.INTERNAL_SERVER_ERROR.value(), PropertiesConfig.SERVER_DOWN,
Expand Down Expand Up @@ -105,45 +100,43 @@ public SendResult send(Map<String, String> routingKeyMap, Map<String, String> ms
*/
@Override
public SendResult send(String jsonContent, MsgService msgService, String userDomainSuffix, String tag, String routingKey) {

JsonParser parser = new JsonParser();
try {
JsonElement json = parser.parse(jsonContent);
JsonElement json = JsonParser.parseString(jsonContent);
if (json.isJsonArray()) {
return send(json, msgService, userDomainSuffix, tag, routingKey);
} else {
Map<String, String> map = new HashMap<>();
Map<String, String> routingKeyMap = new HashMap<>();
String eventId = msgService.getEventId(json.getAsJsonObject());
if (StringUtils.isNotBlank(eventId)) {
String routing_key = PublishUtils.getRoutingKey(msgService, json.getAsJsonObject(), rmqHelper, userDomainSuffix, tag, routingKey);
if (StringUtils.isNotBlank(routing_key)) {
map.put(eventId, json.toString());
routingKeyMap.put(eventId, routing_key);
} else if (routing_key == null) {
List<PublishResultItem> resultItemList = new CopyOnWriteArrayList<>();
routingKeyGenerationFailure(resultItemList);
return new SendResult(resultItemList);
} else {
List<PublishResultItem> resultItemList = new CopyOnWriteArrayList<>();
PublishResultItem resultItem = rabbitmqConfigurationNotFound(msgService);
resultItemList.add(resultItem);
return new SendResult(resultItemList);
}
}

Map<String, String> map = new HashMap<>();
Map<String, String> routingKeyMap = new HashMap<>();
String eventId = msgService.getEventId(json.getAsJsonObject());
if (StringUtils.isNotBlank(eventId)) {
String routing_key = PublishUtils.getRoutingKey(msgService, json.getAsJsonObject(), rmqHelper, userDomainSuffix, tag, routingKey);
if (StringUtils.isNotBlank(routing_key)) {
map.put(eventId, json.toString());
routingKeyMap.put(eventId, routing_key);
} else if (routing_key == null) {
List<PublishResultItem> resultItemList = new CopyOnWriteArrayList<>();
routingKeyGenerationFailure(resultItemList);
return new SendResult(resultItemList);
} else {
List<PublishResultItem> resultItemList = new CopyOnWriteArrayList<>();
createFailureResult(resultItemList);
PublishResultItem resultItem = rabbitmqConfigurationNotFound(msgService);
resultItemList.add(resultItem);
return new SendResult(resultItemList);
}
return send(routingKeyMap, map, msgService);
} else {
List<PublishResultItem> resultItemList = new CopyOnWriteArrayList<>();
createFailureResult(resultItemList);
return new SendResult(resultItemList);
}
return send(routingKeyMap, map, msgService);
} catch (final JsonSyntaxException e) {
String resultMsg = "Could not parse JSON.";
if (e.getCause() != null) {
resultMsg = resultMsg + " Cause: " + e.getCause().getMessage();
}
log.error(resultMsg, e.getMessage());
List<PublishResultItem> resultItemList = new CopyOnWriteArrayList<>();
List<PublishResultItem> resultItemList = new ArrayList<>();
createFailureResult(resultItemList);
return new SendResult(resultItemList);
}
Expand All @@ -155,76 +148,64 @@ public SendResult send(String jsonContent, MsgService msgService, String userDom
*/
@Override
public SendResult send(JsonElement json, MsgService msgService, String userDomainSuffix, String tag, String routingKey) {

List<PublishResultItem> resultList;
boolean checkEventStatus;

Map<String, String> map = new HashMap<>();
Map<String, String> routingKeyMap = new HashMap<>();

SendResult result;
resultList = new CopyOnWriteArrayList<PublishResultItem>();
resultList = new ArrayList<>();
if (json == null) {
createFailureResult(resultList);
}
if (json.isJsonArray()) {
statusCodes = new CopyOnWriteArrayList<Integer>();
checkEventStatus = true;
JsonArray bodyJson = json.getAsJsonArray();
for (JsonElement obj : bodyJson) {
String eventId = msgService.getEventId(obj.getAsJsonObject());
if (StringUtils.isNotEmpty(eventId) && checkEventStatus) {
String routing_key = getAndCheckEvent(msgService, map, resultList, obj, routingKeyMap,
userDomainSuffix, tag, routingKey);
if (StringUtils.isNotBlank(routing_key)) {
result = send(obj.toString(), msgService, userDomainSuffix, tag, routing_key);
resultList.addAll(result.getEvents());
int statusCode = result.getEvents().get(0).getStatusCode();
if (!statusCodes.contains(statusCode))
statusCodes.add(statusCode);
} else if (routing_key == null) {
routingKeyGenerationFailure(resultList);
errorItems = new CopyOnWriteArrayList<JsonElement>();
int statusCode = resultList.get(0).getStatusCode();
statusCodes.add(statusCode);
errorItems.add(obj);
checkEventStatus = false;
} else {
PublishResultItem resultItem = rabbitmqConfigurationNotFound(msgService);
resultList.add(resultItem);
int statusCode = resultItem.getStatusCode();
statusCodes.add(statusCode);
break;
}
} else {
if (!checkEventStatus) {
addUnsuccessfulResultItem(obj);
int statusCode = resultList.get(0).getStatusCode();
statusCodes.add(statusCode);
else {
if (json.isJsonArray()) {
checkEventStatus = true;
JsonArray bodyJson = json.getAsJsonArray();
for (JsonElement obj : bodyJson) {
String eventId = msgService.getEventId(obj.getAsJsonObject());
if (StringUtils.isNotEmpty(eventId) && checkEventStatus) {
String routing_key = getAndCheckEvent(msgService, map, resultList, obj, routingKeyMap,
userDomainSuffix, tag, routingKey);
if (StringUtils.isNotBlank(routing_key)) {
result = send(obj.toString(), msgService, userDomainSuffix, tag, routing_key);
resultList.addAll(result.getEvents());
} else if (routing_key == null) {
routingKeyGenerationFailure(resultList);
checkEventStatus = false;
} else {
PublishResultItem resultItem = rabbitmqConfigurationNotFound(msgService);
resultList.add(resultItem);
break;
}
} else {
createFailureResult(resultList);
errorItems = new CopyOnWriteArrayList<JsonElement>();
int statusCode = resultList.get(0).getStatusCode();
statusCodes.add(statusCode);
errorItems.add(obj);
checkEventStatus = false;
if (!checkEventStatus) {
addUnsuccessfulResultItem(resultList, obj);
} else {
createFailureResult(resultList);
checkEventStatus = false;
}
}
}
} else {
result = send(json.toString(), msgService, userDomainSuffix, tag, routingKey);
resultList.addAll(result.getEvents());
}
} else {
statusCodes = new CopyOnWriteArrayList<Integer>();
result = send(json.toString(), msgService, userDomainSuffix, tag, routingKey);
resultList.addAll(result.getEvents());
int statusCode = result.getEvents().get(0).getStatusCode();
if (!statusCodes.contains(statusCode))
statusCodes.add(statusCode);
}

result = new SendResult();
result.setEvents(resultList);
return result;
}

private String sendMessage(String routingKey, String msg, MsgService msgService) throws IOException,NackException, TimeoutException, RemRemPublishException {
private String sendMessage(String routingKey, String msg, MsgService msgService) throws IOException,TimeoutException, RemRemPublishException {
String resultMsg = PropertiesConfig.SUCCESS;
try {
instantiateRmqHelper();
} catch (RemRemPublishException e) {
log.error("RemRemPublishException occurred::" + e.getMessage());
log.error("RemRemPublishException occurred::{}", e.getMessage());
}
rmqHelper.send(routingKey, msg, msgService);
return resultMsg;
Expand Down Expand Up @@ -268,7 +249,7 @@ private String getAndCheckEvent(MsgService msgService, Map<String, String> map,

/**
* Method returns result for the failure event.
* @param events for list the eiffel events results
* @param resultItemList for list the eiffel events results
*/
private void createFailureResult(List<PublishResultItem> resultItemList) {
PublishResultItem resultItem = new PublishResultItem(null, 400, PropertiesConfig.INVALID_MESSAGE,
Expand All @@ -293,21 +274,9 @@ private void routingKeyGenerationFailure(List<PublishResultItem> resultItemList)
resultItemList.add(resultItem);
}

private void addUnsuccessfulResultItem(JsonElement obj) {
private void addUnsuccessfulResultItem(List<PublishResultItem> resultList, JsonElement obj) {
PublishResultItem event = new PublishResultItem(null, 503, PropertiesConfig.SERVICE_UNAVAILABLE,
PropertiesConfig.UNSUCCESSFUL_EVENT_CONTENT);
resultList.add(event);
}

/**
* Method returns the Http response code for the events.
*/
public HttpStatus getHttpStatus() {
if (statusCodes.size() > 1) {
return HttpStatus.MULTI_STATUS;
} else {
return HttpStatus.valueOf(statusCodes.get(0));

}
}
}
Loading