From 2b11dc919b6360aef02e14cc228c795efe0a454c Mon Sep 17 00:00:00 2001 From: Roman Szturc Date: Mon, 14 Oct 2024 13:11:42 +0200 Subject: [PATCH 1/3] Orizaba is default Eiffel protocol --- pom.xml | 4 ++-- .../publish/service/EventTemplateHandler.java | 23 +++++++++++-------- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/pom.xml b/pom.xml index 3f7f473c..166bf550 100644 --- a/pom.xml +++ b/pom.xml @@ -9,8 +9,8 @@ 2.0.12 - 2.1.4 - 2.3.0 + 2.1.5 + 2.4.0 eiffel-remrem-publish ${eiffel-remrem-publish.version} diff --git a/publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/service/EventTemplateHandler.java b/publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/service/EventTemplateHandler.java index 6bc3006d..cac44a0d 100644 --- a/publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/service/EventTemplateHandler.java +++ b/publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/service/EventTemplateHandler.java @@ -18,9 +18,13 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.ObjectCodec; import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.cfg.JsonNodeFeature; +import com.fasterxml.jackson.databind.json.JsonMapper; import com.jayway.jsonpath.Configuration; import com.jayway.jsonpath.JsonPath; import com.jayway.jsonpath.spi.json.JacksonJsonNodeJsonProvider; @@ -44,19 +48,20 @@ public class EventTemplateHandler { private static final String REGEXP_END_DIGITS = "\\[\\d+\\]$"; - private final Configuration configuration = Configuration.builder() - .jsonProvider(new JacksonJsonNodeJsonProvider()) - .mappingProvider(new JacksonMappingProvider()) - .build(); + private ObjectMapper mapper = JsonMapper.builder() + .disable(JsonNodeFeature.READ_NULL_PROPERTIES) + .build() + .setSerializationInclusion(JsonInclude.Include.NON_EMPTY); + private Configuration configuration = Configuration.builder() + .jsonProvider(new JacksonJsonNodeJsonProvider(mapper)) + .mappingProvider(new JacksonMappingProvider(mapper)) + .build(); // eventTemplateParser + @JsonInclude(JsonInclude.Include.NON_NULL) public JsonNode eventTemplateParser(String jsonData , String eventName){ JsonNode updatedJson = null; - JsonFactory factory = new JsonFactory(); - ObjectMapper mapper = new ObjectMapper(factory); - mapper.setSerializationInclusion(JsonInclude.Include.NON_ABSENT); - mapper.setSerializationInclusion(JsonInclude.Include.NON_EMPTY); - mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); + JsonNode rootNode = null; try { String eventTemplate = accessFileInSemanticJar(EVENT_TEMPLATE_PATH + eventName.toLowerCase() + ".json"); From 44aee91b268a65b82865a34924dbecc6941d89ff Mon Sep 17 00:00:00 2001 From: Roman Szturc Date: Thu, 14 Nov 2024 15:08:11 +0100 Subject: [PATCH 2/3] Events can be published in parallel --- .../exception/RemRemPublishException.java | 11 ++ .../publish/helper/RabbitMqProperties.java | 2 +- .../publish/service/MessageService.java | 6 - .../service/MessageServiceRMQImpl.java | 159 +++++++----------- .../controller/ProducerController.java | 40 ++++- .../EiffelRemremCommonControllerUnitTest.java | 2 +- 6 files changed, 111 insertions(+), 109 deletions(-) diff --git a/publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/exception/RemRemPublishException.java b/publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/exception/RemRemPublishException.java index 02b3a723..cfa5c3fd 100644 --- a/publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/exception/RemRemPublishException.java +++ b/publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/exception/RemRemPublishException.java @@ -32,4 +32,15 @@ public RemRemPublishException(String message, RMQBeanConnectionFactory factory, Throwable cause) { super(message + factory.getHost() + ":" + factory.getPort(), cause); } + + @Override + public String getMessage() { + String message = super.getMessage(); + Throwable cause = getCause(); + if (cause != null) { + message += "; root cause: '" + cause.getMessage() + "'"; + } + + return message; + } } diff --git a/publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/helper/RabbitMqProperties.java b/publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/helper/RabbitMqProperties.java index a44d4341..63ddbbdf 100644 --- a/publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/helper/RabbitMqProperties.java +++ b/publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/helper/RabbitMqProperties.java @@ -318,7 +318,7 @@ public void init() { //The exception can be safely handled here as there is a check for existence of exchange is done before each publish. checkAndCreateExchangeIfNeeded(); } catch (RemRemPublishException e) { - log.error("Error occured while setting up the RabbitMq Connection. "+e.getMessage()); + log.error("Error occurred while setting up the RabbitMq Connection. "+e.getMessage()); e.printStackTrace(); } diff --git a/publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/service/MessageService.java b/publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/service/MessageService.java index ff1e139a..7dca58c0 100644 --- a/publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/service/MessageService.java +++ b/publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/service/MessageService.java @@ -60,10 +60,4 @@ public interface MessageService { * Does the cleanup like closing open connections */ public void cleanUp(); - - /** - * Implemented Status code for the response - */ - public HttpStatus getHttpStatus(); - } diff --git a/publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/service/MessageServiceRMQImpl.java b/publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/service/MessageServiceRMQImpl.java index 44f905e4..39e85b56 100644 --- a/publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/service/MessageServiceRMQImpl.java +++ b/publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/service/MessageServiceRMQImpl.java @@ -15,6 +15,7 @@ package com.ericsson.eiffel.remrem.publish.service; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -49,11 +50,6 @@ Logger log = (Logger) LoggerFactory.getLogger(MessageServiceRMQImpl.class); - /*Variables handles status codes*/ - List statusCodes; - List errorItems; - List resultList; - boolean checkEventStatus; /* * (non-Javadoc) * @see com.ericsson.eiffel.remrem.publish.service.MessageService#send(java.util.Map, java.util.Map) @@ -74,7 +70,6 @@ public SendResult send(Map routingKeyMap, Map ms } else { event = new PublishResultItem(entryKey, HttpStatus.INTERNAL_SERVER_ERROR.value(), PropertiesConfig.SERVER_DOWN, PropertiesConfig.SERVER_DOWN_MESSAGE); - checkEventStatus = false; } } catch (NackException e) { event = new PublishResultItem(entryKey, HttpStatus.INTERNAL_SERVER_ERROR.value(), PropertiesConfig.SERVER_DOWN, @@ -105,45 +100,43 @@ public SendResult send(Map routingKeyMap, Map ms */ @Override public SendResult send(String jsonContent, MsgService msgService, String userDomainSuffix, String tag, String routingKey) { - - JsonParser parser = new JsonParser(); try { - JsonElement json = parser.parse(jsonContent); + JsonElement json = JsonParser.parseString(jsonContent); if (json.isJsonArray()) { return send(json, msgService, userDomainSuffix, tag, routingKey); - } else { - Map map = new HashMap<>(); - Map routingKeyMap = new HashMap<>(); - String eventId = msgService.getEventId(json.getAsJsonObject()); - if (StringUtils.isNotBlank(eventId)) { - String routing_key = PublishUtils.getRoutingKey(msgService, json.getAsJsonObject(), rmqHelper, userDomainSuffix, tag, routingKey); - if (StringUtils.isNotBlank(routing_key)) { - map.put(eventId, json.toString()); - routingKeyMap.put(eventId, routing_key); - } else if (routing_key == null) { - List resultItemList = new CopyOnWriteArrayList<>(); - routingKeyGenerationFailure(resultItemList); - return new SendResult(resultItemList); - } else { - List resultItemList = new CopyOnWriteArrayList<>(); - PublishResultItem resultItem = rabbitmqConfigurationNotFound(msgService); - resultItemList.add(resultItem); - return new SendResult(resultItemList); - } + } + + Map map = new HashMap<>(); + Map routingKeyMap = new HashMap<>(); + String eventId = msgService.getEventId(json.getAsJsonObject()); + if (StringUtils.isNotBlank(eventId)) { + String routing_key = PublishUtils.getRoutingKey(msgService, json.getAsJsonObject(), rmqHelper, userDomainSuffix, tag, routingKey); + if (StringUtils.isNotBlank(routing_key)) { + map.put(eventId, json.toString()); + routingKeyMap.put(eventId, routing_key); + } else if (routing_key == null) { + List resultItemList = new CopyOnWriteArrayList<>(); + routingKeyGenerationFailure(resultItemList); + return new SendResult(resultItemList); } else { List resultItemList = new CopyOnWriteArrayList<>(); - createFailureResult(resultItemList); + PublishResultItem resultItem = rabbitmqConfigurationNotFound(msgService); + resultItemList.add(resultItem); return new SendResult(resultItemList); } - return send(routingKeyMap, map, msgService); + } else { + List resultItemList = new CopyOnWriteArrayList<>(); + createFailureResult(resultItemList); + return new SendResult(resultItemList); } + return send(routingKeyMap, map, msgService); } catch (final JsonSyntaxException e) { String resultMsg = "Could not parse JSON."; if (e.getCause() != null) { resultMsg = resultMsg + " Cause: " + e.getCause().getMessage(); } log.error(resultMsg, e.getMessage()); - List resultItemList = new CopyOnWriteArrayList<>(); + List resultItemList = new ArrayList<>(); createFailureResult(resultItemList); return new SendResult(resultItemList); } @@ -155,76 +148,64 @@ public SendResult send(String jsonContent, MsgService msgService, String userDom */ @Override public SendResult send(JsonElement json, MsgService msgService, String userDomainSuffix, String tag, String routingKey) { + + List resultList; + boolean checkEventStatus; + Map map = new HashMap<>(); Map routingKeyMap = new HashMap<>(); + SendResult result; - resultList = new CopyOnWriteArrayList(); + resultList = new ArrayList<>(); if (json == null) { createFailureResult(resultList); } - if (json.isJsonArray()) { - statusCodes = new CopyOnWriteArrayList(); - checkEventStatus = true; - JsonArray bodyJson = json.getAsJsonArray(); - for (JsonElement obj : bodyJson) { - String eventId = msgService.getEventId(obj.getAsJsonObject()); - if (StringUtils.isNotEmpty(eventId) && checkEventStatus) { - String routing_key = getAndCheckEvent(msgService, map, resultList, obj, routingKeyMap, - userDomainSuffix, tag, routingKey); - if (StringUtils.isNotBlank(routing_key)) { - result = send(obj.toString(), msgService, userDomainSuffix, tag, routing_key); - resultList.addAll(result.getEvents()); - int statusCode = result.getEvents().get(0).getStatusCode(); - if (!statusCodes.contains(statusCode)) - statusCodes.add(statusCode); - } else if (routing_key == null) { - routingKeyGenerationFailure(resultList); - errorItems = new CopyOnWriteArrayList(); - int statusCode = resultList.get(0).getStatusCode(); - statusCodes.add(statusCode); - errorItems.add(obj); - checkEventStatus = false; - } else { - PublishResultItem resultItem = rabbitmqConfigurationNotFound(msgService); - resultList.add(resultItem); - int statusCode = resultItem.getStatusCode(); - statusCodes.add(statusCode); - break; - } - } else { - if (!checkEventStatus) { - addUnsuccessfulResultItem(obj); - int statusCode = resultList.get(0).getStatusCode(); - statusCodes.add(statusCode); + else { + if (json.isJsonArray()) { + checkEventStatus = true; + JsonArray bodyJson = json.getAsJsonArray(); + for (JsonElement obj : bodyJson) { + String eventId = msgService.getEventId(obj.getAsJsonObject()); + if (StringUtils.isNotEmpty(eventId) && checkEventStatus) { + String routing_key = getAndCheckEvent(msgService, map, resultList, obj, routingKeyMap, + userDomainSuffix, tag, routingKey); + if (StringUtils.isNotBlank(routing_key)) { + result = send(obj.toString(), msgService, userDomainSuffix, tag, routing_key); + resultList.addAll(result.getEvents()); + } else if (routing_key == null) { + routingKeyGenerationFailure(resultList); + checkEventStatus = false; + } else { + PublishResultItem resultItem = rabbitmqConfigurationNotFound(msgService); + resultList.add(resultItem); + break; + } } else { - createFailureResult(resultList); - errorItems = new CopyOnWriteArrayList(); - int statusCode = resultList.get(0).getStatusCode(); - statusCodes.add(statusCode); - errorItems.add(obj); - checkEventStatus = false; + if (!checkEventStatus) { + addUnsuccessfulResultItem(resultList, obj); + } else { + createFailureResult(resultList); + checkEventStatus = false; + } } } + } else { + result = send(json.toString(), msgService, userDomainSuffix, tag, routingKey); + resultList.addAll(result.getEvents()); } - } else { - statusCodes = new CopyOnWriteArrayList(); - result = send(json.toString(), msgService, userDomainSuffix, tag, routingKey); - resultList.addAll(result.getEvents()); - int statusCode = result.getEvents().get(0).getStatusCode(); - if (!statusCodes.contains(statusCode)) - statusCodes.add(statusCode); } + result = new SendResult(); result.setEvents(resultList); return result; } - private String sendMessage(String routingKey, String msg, MsgService msgService) throws IOException,NackException, TimeoutException, RemRemPublishException { + private String sendMessage(String routingKey, String msg, MsgService msgService) throws IOException,TimeoutException, RemRemPublishException { String resultMsg = PropertiesConfig.SUCCESS; try { instantiateRmqHelper(); } catch (RemRemPublishException e) { - log.error("RemRemPublishException occurred::" + e.getMessage()); + log.error("RemRemPublishException occurred::{}", e.getMessage()); } rmqHelper.send(routingKey, msg, msgService); return resultMsg; @@ -268,7 +249,7 @@ private String getAndCheckEvent(MsgService msgService, Map map, /** * Method returns result for the failure event. - * @param events for list the eiffel events results + * @param resultItemList for list the eiffel events results */ private void createFailureResult(List resultItemList) { PublishResultItem resultItem = new PublishResultItem(null, 400, PropertiesConfig.INVALID_MESSAGE, @@ -293,21 +274,9 @@ private void routingKeyGenerationFailure(List resultItemList) resultItemList.add(resultItem); } - private void addUnsuccessfulResultItem(JsonElement obj) { + private void addUnsuccessfulResultItem(List resultList, JsonElement obj) { PublishResultItem event = new PublishResultItem(null, 503, PropertiesConfig.SERVICE_UNAVAILABLE, PropertiesConfig.UNSUCCESSFUL_EVENT_CONTENT); resultList.add(event); } - - /** - * Method returns the Http response code for the events. - */ - public HttpStatus getHttpStatus() { - if (statusCodes.size() > 1) { - return HttpStatus.MULTI_STATUS; - } else { - return HttpStatus.valueOf(statusCodes.get(0)); - - } - } } diff --git a/publish-service/src/main/java/com/ericsson/eiffel/remrem/publish/controller/ProducerController.java b/publish-service/src/main/java/com/ericsson/eiffel/remrem/publish/controller/ProducerController.java index 7c70e70e..da06b825 100644 --- a/publish-service/src/main/java/com/ericsson/eiffel/remrem/publish/controller/ProducerController.java +++ b/publish-service/src/main/java/com/ericsson/eiffel/remrem/publish/controller/ProducerController.java @@ -16,6 +16,7 @@ import java.util.*; +import com.ericsson.eiffel.remrem.publish.service.*; import com.google.gson.*; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -27,6 +28,7 @@ import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; +import org.springframework.lang.NonNull; import org.springframework.security.core.Authentication; import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.security.core.userdetails.UserDetails; @@ -43,10 +45,6 @@ import com.ericsson.eiffel.remrem.publish.exception.RemRemPublishException; import com.ericsson.eiffel.remrem.publish.helper.PublishUtils; import com.ericsson.eiffel.remrem.publish.helper.RMQHelper; -import com.ericsson.eiffel.remrem.publish.service.EventTemplateHandler; -import com.ericsson.eiffel.remrem.publish.service.GenerateURLTemplate; -import com.ericsson.eiffel.remrem.publish.service.MessageService; -import com.ericsson.eiffel.remrem.publish.service.SendResult; import com.fasterxml.jackson.databind.JsonNode; import ch.qos.logback.classic.Logger; @@ -111,6 +109,7 @@ public void setRestTemplate(RestTemplate restTemplate) { this.restTemplate = restTemplate; } + private int callsInSend = 0; public void logUserName() { Authentication authentication = SecurityContextHolder.getContext().getAuthentication(); // Check if the user is authenticated @@ -165,12 +164,31 @@ public ResponseEntity send(final String msgProtocol, final String userDomain, fi } synchronized (this) { + callsInSend++; + log.info("callsInSend (before): " + callsInSend); SendResult result = messageService.send(body, msgService, userDomain, tag, routingKey); - log.info("HTTP Status: {}", messageService.getHttpStatus().value()); - return new ResponseEntity(result, messageService.getHttpStatus()); + callsInSend--; + HttpStatus status = getHttpStatus(result); + log.info("callsInSend (after): " + callsInSend); + log.info("HTTP Status: {}", status.value()); + return new ResponseEntity(result, status); } } + private HttpStatus getHttpStatus(SendResult result) { + List events = result.getEvents(); + HttpStatus status; + int nevents = events.size(); + if (nevents == 0) { + return HttpStatus.BAD_REQUEST; + } + else if (events.size() == 1) { + return HttpStatus.valueOf(events.get(0).getStatusCode()); + } + else { + return HttpStatus.MULTI_STATUS; + } + } /** * This controller used as producer to send messages or event * @param msgProtocol @@ -275,6 +293,11 @@ public ResponseEntity generateAndPublish(@ApiParam(value = "message protocol", r } } + private boolean eventTypeExists(@NonNull MsgService msgService, String eventType) { + Collection supportedEventTypes = msgService.getSupportedEventTypes(); + return supportedEventTypes != null && supportedEventTypes.contains(eventType); + } + /** * This controller provides single RemRem REST API End Point for both RemRem * Generate and Publish. @@ -345,6 +368,11 @@ public ResponseEntity generateAndPublish(final String msgProtocol, final String parsedTemplates.append("["); for (JsonElement eventJson : events) { // -- parse params in incoming request -> body ------------- + if (!eventTypeExists(msgService, msgType)) { + return createResponseEntity(HttpStatus.BAD_REQUEST, JSON_ERROR_STATUS, + "Unknown event type '" + msgType + "'"); + } + JsonNode parsedTemplate = eventTemplateHandler.eventTemplateParser(eventJson.toString(), msgType); if (parsedTemplates.length() > 1) { parsedTemplates.append(","); diff --git a/publish-service/src/test/java/com/ericsson/eiffel/remrem/publish/service/EiffelRemremCommonControllerUnitTest.java b/publish-service/src/test/java/com/ericsson/eiffel/remrem/publish/service/EiffelRemremCommonControllerUnitTest.java index 72d6aa9e..f687ab7c 100644 --- a/publish-service/src/test/java/com/ericsson/eiffel/remrem/publish/service/EiffelRemremCommonControllerUnitTest.java +++ b/publish-service/src/test/java/com/ericsson/eiffel/remrem/publish/service/EiffelRemremCommonControllerUnitTest.java @@ -110,7 +110,7 @@ public void setUp() throws Exception { when(service.getServiceName()).thenReturn("eiffelsemantics"); when(service2.getServiceName()).thenReturn("eiffelsemantics"); - when(messageService.getHttpStatus()).thenReturn(HttpStatus.OK); +// when(messageService.getHttpStatus()).thenReturn(HttpStatus.OK); when(messageService.send(ArgumentMatchers.anyString(), ArgumentMatchers.any(MsgService.class), ArgumentMatchers.anyString(), ArgumentMatchers.anyString(), ArgumentMatchers.anyString())).thenReturn(res); From 2581e89d71d72f7e64b5799b2b829caf4366f1f7 Mon Sep 17 00:00:00 2001 From: Roman Szturc Date: Thu, 14 Nov 2024 15:20:18 +0100 Subject: [PATCH 3/3] Change log altered --- CHANGELOG.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 37ccdc8d..4343f7d9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,9 @@ +## 2.1.5 +- Events can be published in parallel. + ## 2.1.2 - Added external parameters to send username, password and/or uri to connect to Messagebus. - + ## 2.1.1 - Implemented the changes to log the eventId and HTTPStatus while the level is INFO. - Implemented the changes to print the user information while the log level is INFO.