diff --git a/.gitignore b/.gitignore index 900c8104..df9b61e1 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,4 @@ producer.iml +/bin/ diff --git a/build.gradle b/build.gradle index af7f5ca9..762fedc0 100644 --- a/build.gradle +++ b/build.gradle @@ -21,10 +21,12 @@ plugins { war { baseName = 'remrem-publish' - version = '0.1.0' + version = '0.1.5' } apply plugin: 'spring-boot' +apply plugin: 'java' +apply plugin: 'eclipse' jacocoTestReport { reports { @@ -89,6 +91,8 @@ task integrationTest(type: Test) { // In this section you declare the dependencies for your production and test code dependencies { + compile('com.jayway.restassured:rest-assured:2.3.2') + compile('org.yaml:snakeyaml:1.8') // The production code uses the SLF4J logging API at compile time compile 'org.slf4j:slf4j-api:1.7.18' compile("org.springframework.boot:spring-boot-starter-web:$sprintBootVersion") { @@ -99,6 +103,9 @@ dependencies { compile 'com.google.code.gson:gson:2.6.2' compile 'org.projectlombok:lombok:1.16.8' + //commons CLI + compile 'commons-cli:commons-cli:1.3.1' + //ServletException requires compile time servlet dependency but it causes problems //when deployed if exist on war run time.. hence provided but also compileOnly compileOnly("org.springframework.boot:spring-boot-starter-tomcat") diff --git a/src/integration-test/resources/application.yml b/src/integration-test/resources/application.yml index 3fda33d7..d7c0c3fc 100644 --- a/src/integration-test/resources/application.yml +++ b/src/integration-test/resources/application.yml @@ -1,5 +1,5 @@ rabbitmq: - host: 127.0.0.1 + host: 150.132.35.5 exchange: name: eiffel.test spring: diff --git a/src/main/java/com/ericsson/eiffel/remrem/publish/App.java b/src/main/java/com/ericsson/eiffel/remrem/publish/App.java index 4a618c10..14b9d525 100644 --- a/src/main/java/com/ericsson/eiffel/remrem/publish/App.java +++ b/src/main/java/com/ericsson/eiffel/remrem/publish/App.java @@ -6,10 +6,28 @@ import org.springframework.boot.context.web.SpringBootServletInitializer; import org.springframework.context.ConfigurableApplicationContext; +import com.ericsson.eiffel.remrem.publish.cli.CLI; + import java.util.Arrays; -@SpringBootApplication @Slf4j public class App extends SpringBootServletInitializer { +@SpringBootApplication +@Slf4j +public class App extends SpringBootServletInitializer { public static void main(String[] args) { + + // CLI class checks if arguments are passed to application + // and if so we do not start the service but act based on + // passed arguments. If no arguments are passed the server + // will be started + CLI cli = new CLI(); + boolean needsStartService = cli.parse(args); + + if (needsStartService) { + startService(args); + } + } + + private static void startService(String[] args) { ConfigurableApplicationContext ctx = SpringApplication.run(App.class, args); log.info("Let's inspect the beans provided by Spring Boot:"); @@ -20,4 +38,4 @@ public static void main(String[] args) { log.info(beanName); } } -} +} \ No newline at end of file diff --git a/src/main/java/com/ericsson/eiffel/remrem/publish/cli/CLI.java b/src/main/java/com/ericsson/eiffel/remrem/publish/cli/CLI.java new file mode 100644 index 00000000..6ac93130 --- /dev/null +++ b/src/main/java/com/ericsson/eiffel/remrem/publish/cli/CLI.java @@ -0,0 +1,179 @@ +package com.ericsson.eiffel.remrem.publish.cli; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; + +import com.ericsson.eiffel.remrem.publish.config.PropertiesConfig; +import com.ericsson.eiffel.remrem.publish.helper.RMQHelper; +import com.ericsson.eiffel.remrem.publish.service.MessageService; +import com.ericsson.eiffel.remrem.publish.service.MessageServiceRMQImpl; +import com.ericsson.eiffel.remrem.publish.service.SendResult; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonParser; + +/** + * Class for interpreting the passed arguments from command line. + * Parse method returns true, meaning we need to start the service afterwards, if no argument + * is given. The same method returns false, meaning we do not start the service afterwards, if any + * argument is given. If an argument is given that it is not recognized we print help + * + * This class also uses System Properties to pass some arguments to underlying service. It is important to + * choose properties names that are difficult to be matched by the system + * + * @author evasiba + * + */ +public class CLI { + private Options options=null; + + public CLI() { + options = createCLIOptions(); + } + + /** + * Creates the options needed by command line interface + * @return the options this CLI can handle + */ + private static Options createCLIOptions() { + Options options = new Options(); + options.addOption("h", "help", false, "show help."); + options.addOption("f", "content_file", true, "event content file, mandatory"); + options.addOption("mb", "message_bus", true, "host of message bus to use, default is 127.0.0.1"); + options.addOption("en", "exchange_name", true, "exchange name, default is eiffel.poc"); + options.addOption("rk", "routing_key", true, "routing key, mandatory"); + options.addOption("np", "non_persistent", false, "remove persistence from message sending"); + + return options; + } + + /** + * Prints the help for this application and exits. + * @param options the options to print usage help for + */ + private static void help(Options options) { + // This prints out some help + HelpFormatter formater = new HelpFormatter(); + formater.printHelp("java -jar", options); + System.exit(1); + } + + /** + * Parse the given arguments and act on them + * @param args command line arguments + * @return if the service should start or not + */ + public boolean parse(String[] args) { + CommandLineParser parser = new DefaultParser(); + boolean startService = true; + try { + CommandLine commandLine = parser.parse(options, args); + Option[] existingOptions = commandLine.getOptions(); + if (existingOptions.length > 0) { + startService = false; + handleOptions(commandLine); + } + } catch (Exception e) { + e.printStackTrace(); + help(options); + } + return startService; + } + + /** + * Delegates actions depending on the passed arguments + * @param commandLine command line arguments + */ + private void handleOptions(CommandLine commandLine) { + handleMessageBusOptions(commandLine); + if (commandLine.hasOption("h")) { + System.out.println("You passed help flag."); + clearSystemProperties(); + help(options); + } else if (commandLine.hasOption("f") && commandLine.hasOption("rk")) { + String filePath = commandLine.getOptionValue("f"); + String routingKey = commandLine.getOptionValue("rk"); + handleContentFile(filePath, routingKey); + } else { + System.out.println("Missing arguments, please review your arguments" + + " and check if any mandatory argument is missing"); + clearSystemProperties(); + help(options); + } + } + + /** + * Sets the system properties with values passed for + * message bus host and exchange name + * @param commandLine command line arguments + */ + private void handleMessageBusOptions(CommandLine commandLine){ + if (commandLine.hasOption("mb")) { + String messageBusHost = commandLine.getOptionValue("mb"); + String key = PropertiesConfig.MESSAGE_BUSS_HOST; + System.setProperty(key, messageBusHost); + } + + if (commandLine.hasOption("en")) { + String exchangeName = commandLine.getOptionValue("en"); + String key = PropertiesConfig.EXCHANGE_NAME; + System.setProperty(key, exchangeName); + } + + String usePersistance = "true"; + if (commandLine.hasOption("np")) { + usePersistance = "false"; + } + String key = PropertiesConfig.USE_PERSISTENCE; + System.setProperty(key, usePersistance); + } + + /** + * Handle event from file + * @param filePath the path of the file where the messages reside + */ + public void handleContentFile(String filePath, String routingKey) { + JsonParser parser = new JsonParser(); + try { + byte[] fileBytes = Files.readAllBytes(Paths.get(filePath)); + String fileContent = new String(fileBytes); + JsonArray bodyJson = parser.parse(fileContent).getAsJsonArray(); + List msgs = new ArrayList<>(); + for (JsonElement obj : bodyJson) { + msgs.add(obj.toString()); + } + MessageService msgService = new MessageServiceRMQImpl(); + List results = msgService.send(routingKey, msgs); + msgService.cleanUp(); + clearSystemProperties(); + for(SendResult result : results) + System.out.println(result.getMsg()); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + System.exit(-1); + } + } + + /** + * Remove the system properties add by this application + */ + private void clearSystemProperties() { + String key = PropertiesConfig.MESSAGE_BUSS_HOST; + System.clearProperty(key); + key = PropertiesConfig.EXCHANGE_NAME; + System.clearProperty(key); + } + +} \ No newline at end of file diff --git a/src/main/java/com/ericsson/eiffel/remrem/publish/config/PropertiesConfig.java b/src/main/java/com/ericsson/eiffel/remrem/publish/config/PropertiesConfig.java new file mode 100644 index 00000000..20660f42 --- /dev/null +++ b/src/main/java/com/ericsson/eiffel/remrem/publish/config/PropertiesConfig.java @@ -0,0 +1,7 @@ +package com.ericsson.eiffel.remrem.publish.config; + +public class PropertiesConfig { + public static final String MESSAGE_BUSS_HOST = "com.ericsson.eiffel.remrem.publish.messagebus.host"; + public static final String EXCHANGE_NAME = "com.ericsson.eiffel.remrem.publish.exchange.name"; + public static final String USE_PERSISTENCE = "com.ericsson.eiffel.remrem.publish.use.persistence"; +} diff --git a/src/main/java/com/ericsson/eiffel/remrem/publish/helper/RMQHelper.java b/src/main/java/com/ericsson/eiffel/remrem/publish/helper/RMQHelper.java index 5001b0e0..d4ad69bf 100644 --- a/src/main/java/com/ericsson/eiffel/remrem/publish/helper/RMQHelper.java +++ b/src/main/java/com/ericsson/eiffel/remrem/publish/helper/RMQHelper.java @@ -1,19 +1,29 @@ package com.ericsson.eiffel.remrem.publish.helper; +import com.ericsson.eiffel.remrem.publish.config.PropertiesConfig; +import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; +import com.rabbitmq.client.ShutdownListener; +import com.rabbitmq.client.ShutdownSignalException; + import lombok.extern.slf4j.Slf4j; + import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; +import org.yaml.snakeyaml.Yaml; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; + import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.concurrent.TimeoutException; @@ -23,14 +33,32 @@ private static final Random random = new Random(); @Value("${rabbitmq.host}") private String host; @Value("${rabbitmq.exchange.name}") private String exchangeName; + private boolean usePersitance = false; private Connection rabbitConnection; private List rabbitChannels; - @PostConstruct public void init() { + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public String getExchangeName() { + return exchangeName; + } + + public void setExchangeName(String exchangeName) { + this.exchangeName = exchangeName; + } + + @PostConstruct public void init() { log.info("RMQHelper init ..."); + initCli(); try { ConnectionFactory factory = new ConnectionFactory(); - factory.setHost(host); + factory.setHost(host); rabbitConnection = factory.newConnection(); rabbitChannels = new ArrayList<>(); @@ -42,6 +70,32 @@ } } + private void initCli() { + host = System.getProperty(PropertiesConfig.MESSAGE_BUSS_HOST); + exchangeName = System.getProperty(PropertiesConfig.EXCHANGE_NAME); + usePersitance = Boolean.getBoolean(PropertiesConfig.USE_PERSISTENCE); + if (host == null || exchangeName == null) { + Yaml yaml = new Yaml(); + try { + String fileName = "application.yml"; + ClassLoader classLoader = getClass().getClassLoader(); + InputStream ios = classLoader.getResourceAsStream(fileName); + + // Parse the YAML file and return the output as a series of Maps and Lists + Map result = (Map)yaml.load(ios); + Map rmq = (Map)result.get("rabbitmq"); + if (host == null) + host = (String)rmq.get("host"); + Map rmqExchange = (Map)rmq.get("exchange"); + if (exchangeName == null) + exchangeName = (String)rmqExchange.get("name"); + int i = 2; + } catch (Exception e) { + e.printStackTrace(); + } + } + } + @PreDestroy public void cleanUp() throws IOException { log.info("RMQHelper cleanUp ..."); @@ -54,8 +108,29 @@ public void cleanUp() throws IOException { } public void send(String routingKey, String msg) throws IOException { - giveMeRandomChannel() - .basicPublish(exchangeName, routingKey, MessageProperties.BASIC, msg.getBytes()); + try { + Channel channel = giveMeRandomChannel(); + channel.addShutdownListener(new ShutdownListener() { + public void shutdownCompleted(ShutdownSignalException cause) { + // Beware that proper synchronization is needed here + if (cause.isInitiatedByApplication()) { + log.debug("Shutdown is initiated by application. Ignoring it."); + } else { + log.error("Shutdown is NOT initiated by application."); + log.error(cause.getMessage()); + System.exit(-3); + } + } + }); + + BasicProperties msgProps = MessageProperties.BASIC; + if (usePersitance) + msgProps = MessageProperties.PERSISTENT_BASIC; + + channel.basicPublish(exchangeName, routingKey, msgProps, msg.getBytes()); + } catch (Exception e) { + // TODO: handle exception + } } diff --git a/src/main/java/com/ericsson/eiffel/remrem/publish/service/MessageService.java b/src/main/java/com/ericsson/eiffel/remrem/publish/service/MessageService.java index a70cc27d..5fe97f12 100644 --- a/src/main/java/com/ericsson/eiffel/remrem/publish/service/MessageService.java +++ b/src/main/java/com/ericsson/eiffel/remrem/publish/service/MessageService.java @@ -5,4 +5,5 @@ public interface MessageService { List send(String routingKey, List msgs); + public void cleanUp(); } diff --git a/src/main/java/com/ericsson/eiffel/remrem/publish/service/MessageServiceRMQImpl.java b/src/main/java/com/ericsson/eiffel/remrem/publish/service/MessageServiceRMQImpl.java index 10ab7d42..c04ae893 100644 --- a/src/main/java/com/ericsson/eiffel/remrem/publish/service/MessageServiceRMQImpl.java +++ b/src/main/java/com/ericsson/eiffel/remrem/publish/service/MessageServiceRMQImpl.java @@ -7,6 +7,7 @@ import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; +import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -28,12 +29,30 @@ private SendResult send(String routingKey, String msg) { String resultMsg = SUCCEED; + instantiateRmqHelper(); try { rmqHelper.send(routingKey, msg); } catch (Exception e) { log.error(e.getMessage(), e); - resultMsg = e.getStackTrace().toString(); + resultMsg = "Failed to send message:" + msg; } return new SendResult(resultMsg); } + + private void instantiateRmqHelper() { + if (rmqHelper == null) { + rmqHelper = new RMQHelper(); + rmqHelper.init(); + } + } + + public void cleanUp() { + if (rmqHelper != null) + try { + rmqHelper.cleanUp(); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 56575591..a68d8709 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -6,10 +6,16 @@ logging: com.ericsson.eiffel.remrem.producer: ERROR # need to be updated according to the test env. -#rabbitmq.host=127.0.0.1 +#rabbitmq.host=mb101-eiffel010.lmera.ericsson.se # must exist #rabbitmq.exchange.name=eiffel.poc +rabbitmq: + host: 127.0.0.1 + #host: mb101-eiffel010.lmera.ericsson.se + exchange: + name: eiffel.poc + spring: http.converters: preferred-json-mapper: gson