diff --git a/settings.gradle b/settings.gradle index 48f223b..b3afe51 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1,2 +1,2 @@ rootProject.name='turbine' -include 'turbine-core', 'turbine-ext:turbine-discovery-eureka1', 'turbine-ext:turbine-discovery-eureka2' +include 'turbine-core', 'turbine-ext:turbine-discovery-eureka1', 'turbine-ext:turbine-discovery-eureka2', 'turbine-ext:turbine-discovery-consul' diff --git a/turbine-ext/turbine-discovery-consul/build.gradle b/turbine-ext/turbine-discovery-consul/build.gradle new file mode 100644 index 0000000..723ab97 --- /dev/null +++ b/turbine-ext/turbine-discovery-consul/build.gradle @@ -0,0 +1,33 @@ +apply plugin: 'java' +apply plugin: 'eclipse' +apply plugin: 'com.github.johnrengelman.shadow' +apply plugin: 'maven-publish' + + +sourceCompatibility = JavaVersion.VERSION_1_8 +targetCompatibility = JavaVersion.VERSION_1_8 + +dependencies { + compile project(":turbine-core") + + compile 'org.slf4j:slf4j-log4j12:1.6.1' + compile 'com.orbitz.consul:consul-client:1.2.0' + testCompile 'junit:junit-dep:4.10' + testCompile 'org.json:json:20140107' +} + +mainClassName = "com.netflix.turbine.discovery.consul.StartConsulTurbine" + +shadowJar { + baseName = 'turbine-discovery-consul-executable' + classifier = '' +} + +publishing { + publications { + shadow(MavenPublication) { + from components.shadow + artifactId = 'turbine-discovery-consul-executable' + } + } +} diff --git a/turbine-ext/turbine-discovery-consul/src/main/java/com/netflix/turbine/discovery/consul/ConsulInstance.java b/turbine-ext/turbine-discovery-consul/src/main/java/com/netflix/turbine/discovery/consul/ConsulInstance.java new file mode 100644 index 0000000..5785bbd --- /dev/null +++ b/turbine-ext/turbine-discovery-consul/src/main/java/com/netflix/turbine/discovery/consul/ConsulInstance.java @@ -0,0 +1,93 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.turbine.discovery.consul; + +public class ConsulInstance { + + public static enum Status { + UP, DOWN + } + + private final String cluster; + private final Status status; + private final String hostname; + private final int port; + + public ConsulInstance(String cluster, Status status, String hostname, int port) { + this.cluster = cluster; + this.status = status; + this.hostname = hostname; + this.port = port; + } + + public Status getStatus() { + return status; + } + + public String getCluster() { + return cluster; + } + + public String getHost() { + return hostname; + } + + public boolean isUp() { + return Status.UP == status; + } + + public int getPort() { + return port; + } + + public String getId() { + return hostname + port; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof ConsulInstance)) return false; + + ConsulInstance that = (ConsulInstance) o; + + if (port != that.port) return false; + if (cluster != null ? !cluster.equals(that.cluster) : that.cluster != null) return false; + if (hostname != null ? !hostname.equals(that.hostname) : that.hostname != null) return false; + if (status != that.status) return false; + + return true; + } + + @Override + public int hashCode() { + int result = cluster != null ? cluster.hashCode() : 0; + result = 31 * result + (status != null ? status.hashCode() : 0); + result = 31 * result + (hostname != null ? hostname.hashCode() : 0); + result = 31 * result + port; + return result; + } + + @Override + public String toString() { + return "ConsulInstance{" + + "cluster='" + cluster + '\'' + + ", status=" + status + + ", hostname='" + hostname + '\'' + + ", port=" + port + + '}'; + } +} diff --git a/turbine-ext/turbine-discovery-consul/src/main/java/com/netflix/turbine/discovery/consul/ConsulInstanceDiscovery.java b/turbine-ext/turbine-discovery-consul/src/main/java/com/netflix/turbine/discovery/consul/ConsulInstanceDiscovery.java new file mode 100644 index 0000000..0e4dabe --- /dev/null +++ b/turbine-ext/turbine-discovery-consul/src/main/java/com/netflix/turbine/discovery/consul/ConsulInstanceDiscovery.java @@ -0,0 +1,167 @@ +/* + * Copyright 2013 Netflix + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.turbine.discovery.consul; + +import java.util.*; +import java.util.concurrent.CopyOnWriteArrayList; + +import com.google.common.collect.Collections2; +import com.google.common.net.HostAndPort; +import com.orbitz.consul.*; +import com.orbitz.consul.cache.*; +import com.orbitz.consul.cache.ConsulCache.*; +import com.orbitz.consul.model.health.*; + +import joptsimple.OptionParser; +import joptsimple.OptionSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import rx.Observable; +import rx.Subscriber; + +/** + * Class that encapsulates an {@link InstanceDicovery} implementation that uses + * Consul (see https://www.consul.io/) The plugin requires a list of + * applications configured. It then queries the set of instances for each + * application. Instance information retrieved from Consul must be translated to + * something that Turbine can understand i.e the {@link ConsulInstance} class. + * + * All the logic to perform this translation can be overridden here, so that you + * can provide your own implementation if needed. + */ +public class ConsulInstanceDiscovery { + + private static final Logger logger = LoggerFactory.getLogger(ConsulInstanceDiscovery.class); + static CopyOnWriteArrayList instances=new CopyOnWriteArrayList<>(); + private final Consul consul; + + public static void main(String[] args) { + OptionParser optionParser = new OptionParser(); + optionParser.accepts("consulPort").withRequiredArg(); + optionParser.accepts("consulHostname").withRequiredArg(); + optionParser.accepts("app").withRequiredArg(); + + OptionSet options = optionParser.parse(args); + + int consulPort = -1; + if (!options.has("consulPort")) { + System.err.println("Argument --consulPort required: port of Consul Service Discovery"); + System.exit(-1); + } else { + try { + consulPort = Integer.parseInt(String.valueOf(options.valueOf("consulPort"))); + } catch (NumberFormatException e) { + System.err.println("Value of consulPort must be an integer but was: " + options.valueOf("consulPort")); + } + } + + String consulHostname = null; + if (!options.has("consulHostname")) { + System.err.println("Argument --consulHostname required: hostname of Consul Service Discovery"); + System.exit(-1); + } else { + consulHostname = String.valueOf(options.valueOf("consulHostname")); + } + + String app = null; + if (!options.has("app")) { + System.err.println("Argument --app required for Consul Instsance Discovery. Eg. -app api"); + System.exit(-1); + } else { + app = String.valueOf(options.valueOf("app")); + } + + HostAndPort consulHostAndPort = HostAndPort.fromParts(consulHostname, consulPort); + + Consul consul = Consul.builder().withHostAndPort(consulHostAndPort).build(); + + new ConsulInstanceDiscovery(consul).getInstanceEvents(app).toBlocking().forEach(i -> System.out.println(i)); + } + + public ConsulInstanceDiscovery(Consul consul) { + this.consul = consul; + } + + public Observable getInstanceEvents(String appName) { + HealthClient healthClient = consul.healthClient(); + ServiceHealthCache svHealth = ServiceHealthCache.newCache(healthClient, appName); + ArrayList> subscribers=new ArrayList<>(); + Observable observableConsulInstances = Observable.create((x) ->{ + subscribers.add((Subscriber)x); + instances.forEach((i) ->{ + x.onNext(i); + }); + }); + + svHealth.addListener(new Listener() { + @Override + public void notify(Map newValues) { + try { + newValues.forEach((x, y) -> { + Service service = y.getService(); + Optional dataCenter = y.getNode().getDatacenter(); + + String cluster = dataCenter.isPresent() ? dataCenter.get() : service.getService(); + ConsulInstance.Status status = ConsulInstance.Status.UP; + String hostName = service.getAddress(); + int port = service.getPort(); + + for (HealthCheck check : y.getChecks()) { + if (check.getStatus().equals("passing") == false) { + status = ConsulInstance.Status.DOWN; + } + } + ConsulInstance instance = new ConsulInstance(cluster, status, hostName, port); + Collection toRemove = Collections2.filter(instances, (c) ->{return c.getId().equals(instance.getId());}); + for(ConsulInstance item : toRemove) { instances.remove(item);} + instances.add(instance); + }); + } catch (Exception e) { + logger.warn("Error parsing notification from consul {}", newValues); + } + } + }); + svHealth.start(); + + return observableConsulInstances; + } + + public static Observable delta(List> listOfLists) { + if (listOfLists.size() == 1) { + return Observable.from(listOfLists.get(0)); + } else { + // diff the two + List newList = listOfLists.get(1); + List oldList = new ArrayList<>(listOfLists.get(0)); + + Set delta = new LinkedHashSet<>(); + delta.addAll(newList); + // remove all that match in old + delta.removeAll(oldList); + + // filter oldList to those that aren't in the newList + oldList.removeAll(newList); + + // for all left in the oldList we'll create DROP events + for (ConsulInstance old : oldList) { + delta.add(new ConsulInstance(old.getCluster(), ConsulInstance.Status.DOWN, old.getHost(), old.getPort())); + } + + return Observable.from(delta); + } + } +} \ No newline at end of file diff --git a/turbine-ext/turbine-discovery-consul/src/main/java/com/netflix/turbine/discovery/consul/ConsulStreamDiscovery.java b/turbine-ext/turbine-discovery-consul/src/main/java/com/netflix/turbine/discovery/consul/ConsulStreamDiscovery.java new file mode 100644 index 0000000..8deab64 --- /dev/null +++ b/turbine-ext/turbine-discovery-consul/src/main/java/com/netflix/turbine/discovery/consul/ConsulStreamDiscovery.java @@ -0,0 +1,70 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.turbine.discovery.consul; + +import java.net.URI; + +import com.netflix.turbine.discovery.StreamAction; +import com.netflix.turbine.discovery.StreamDiscovery; +import com.orbitz.consul.Consul; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import rx.Observable; + +public class ConsulStreamDiscovery implements StreamDiscovery { + + private static final Logger logger = LoggerFactory.getLogger(ConsulStreamDiscovery.class); + + public static ConsulStreamDiscovery create(String appName, String uriTemplate, Consul consul) { + return new ConsulStreamDiscovery(appName, uriTemplate, consul); + } + + public final static String HOSTNAME = "{HOSTNAME}"; + private final String uriTemplate; + private final String appName; + private final Consul consul; + + private ConsulStreamDiscovery(String appName, String uriTemplate, Consul consul) { + this.appName = appName; + this.uriTemplate = uriTemplate; + this.consul = consul; + } + + @Override + public Observable getInstanceList() { + return new ConsulInstanceDiscovery(consul) + .getInstanceEvents(appName) + .map(ei -> { + URI uri; + String uriString = uriTemplate.replace(HOSTNAME, ei.getHost() + ":" + ei.getPort()); + try { + uri = new URI(uriString); + } catch (Exception e) { + throw new RuntimeException("Invalid URI: " + uriString, e); + } + if (ei.getStatus() == ConsulInstance.Status.UP) { + logger.info("StreamAction ADD"); + return StreamAction.create(StreamAction.ActionType.ADD, uri); + } else { + logger.info("StreamAction REMOVE"); + return StreamAction.create(StreamAction.ActionType.REMOVE, uri); + } + + }); + } + +} diff --git a/turbine-ext/turbine-discovery-consul/src/main/java/com/netflix/turbine/discovery/consul/StartConsulTurbine.java b/turbine-ext/turbine-discovery-consul/src/main/java/com/netflix/turbine/discovery/consul/StartConsulTurbine.java new file mode 100644 index 0000000..8689dc7 --- /dev/null +++ b/turbine-ext/turbine-discovery-consul/src/main/java/com/netflix/turbine/discovery/consul/StartConsulTurbine.java @@ -0,0 +1,116 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.turbine.discovery.consul; + +import com.orbitz.consul.Consul; +import com.google.common.net.HostAndPort; +import joptsimple.OptionParser; +import joptsimple.OptionSet; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.netflix.turbine.Turbine; + +public class StartConsulTurbine { + private static final Logger logger = LoggerFactory.getLogger(StartConsulTurbine.class); + + public static void main(String[] args) { + OptionParser optionParser = new OptionParser(); + optionParser.accepts("port").withRequiredArg(); + optionParser.accepts("app").withRequiredArg(); + optionParser.accepts("urlTemplate").withRequiredArg(); + optionParser.accepts("consulPort").withRequiredArg().defaultsTo("8500"); + optionParser.accepts("consulHostname").withRequiredArg(); + + OptionSet options = null; + + try { + options = optionParser.parse(args); + } catch (Exception e) { + System.err.println("Faild to parse input parameters. Expected args: '-port 10901 -app app1 -urlTemplate http://{HOSTNAME}/stream -consulPort 8500 -consulHostname localhost'"); + System.exit(-1); + } + + int port = -1; + if (!options.has("port")) { + System.err.println("Argument -port required for SSE HTTP server to start on. Eg. -port 8888"); + System.exit(-1); + } else { + try { + port = Integer.parseInt(String.valueOf(options.valueOf("port"))); + } catch (NumberFormatException e) { + System.err.println("Value of port must be an integer but was: " + options.valueOf("port")); + } + } + + String app = null; + if (!options.has("app")) { + System.err.println("Argument -app required for Consul Service Discovery. Eg. -app api"); + System.exit(-1); + } else { + app = String.valueOf(options.valueOf("app")); + } + + String template = null; + if (!options.has("urlTemplate")) { + System.err.println("Argument -urlTemplate required. Eg. http://" + ConsulStreamDiscovery.HOSTNAME + "/metrics.stream"); + System.exit(-1); + } else { + template = String.valueOf(options.valueOf("urlTemplate")); + if (!template.contains(ConsulStreamDiscovery.HOSTNAME)) { + System.err.println("Argument -urlTemplate must contain " + ConsulStreamDiscovery.HOSTNAME + " marker. Eg. http://" + ConsulStreamDiscovery.HOSTNAME + "/metrics.stream"); + System.exit(-1); + } + } + + int consulPort = -1; + if (!options.has("consulPort")) { + System.err.println("Argument --consulPort required: port of Consul Service Discovery"); + System.exit(-1); + } else { + try { + consulPort = Integer.parseInt(String.valueOf(options.valueOf("consulPort"))); + } catch (NumberFormatException e) { + System.err.println("Value of consulPort must be an integer but was: " + options.valueOf("consulPort")); + } + } + + String consulHostname = null; + if (!options.has("consulHostname")) { + System.err.println("Argument --consulHostname required: hostname of Consul Service Discovery"); + System.exit(-1); + } else { + consulHostname = String.valueOf(options.valueOf("consulHostname")); + } + + logger.info("Turbine => Consul App: " + app); + logger.info("Turbine => Consul URL Template: " + template); + + try { + HostAndPort consulHostAndPort = HostAndPort.fromParts(consulHostname, consulPort); + + Consul consul = Consul.builder() + .withHostAndPort(consulHostAndPort) + .build(); + + Turbine.startServerSentEventServer(port, ConsulStreamDiscovery.create(app, template, consul)); + } catch (Throwable e) { + e.printStackTrace(); + } + } + +} diff --git a/turbine-ext/turbine-discovery-consul/src/main/resources/log4j.properties b/turbine-ext/turbine-discovery-consul/src/main/resources/log4j.properties new file mode 100644 index 0000000..ff93bc0 --- /dev/null +++ b/turbine-ext/turbine-discovery-consul/src/main/resources/log4j.properties @@ -0,0 +1,8 @@ +# Root logger option +log4j.rootLogger=INFO, stdout + +# Direct log messages to stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n \ No newline at end of file diff --git a/turbine-ext/turbine-discovery-consul/src/test/java/com/netflix/turbine/discovery/consul/ConsulInstanceDiscoveryTest.java b/turbine-ext/turbine-discovery-consul/src/test/java/com/netflix/turbine/discovery/consul/ConsulInstanceDiscoveryTest.java new file mode 100644 index 0000000..e5bcb1c --- /dev/null +++ b/turbine-ext/turbine-discovery-consul/src/test/java/com/netflix/turbine/discovery/consul/ConsulInstanceDiscoveryTest.java @@ -0,0 +1,109 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.turbine.discovery.consul; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.junit.Test; + +import rx.Observable; +import rx.observers.TestSubscriber; + +import com.netflix.turbine.discovery.consul.ConsulInstance; +import com.netflix.turbine.discovery.consul.ConsulInstanceDiscovery; + +public class ConsulInstanceDiscoveryTest { + + @Test + public void testDeltaRemoveDuplicateAddSecond() { + ConsulInstance a = new ConsulInstance("", ConsulInstance.Status.UP, "hostname1", 1234); + Observable> first = Observable.just(a).toList(); + + ConsulInstance b = new ConsulInstance("", ConsulInstance.Status.DOWN, "hostname1", 1234); + Observable> second = Observable.just(a, b).toList(); + + TestSubscriber ts = new TestSubscriber(); + + Observable + .just(first, second) + .flatMap(o -> o) + .startWith(new ArrayList()) + .buffer(2, 1) + .filter(l -> l.size() == 2) + .flatMap(ConsulInstanceDiscovery::delta) + .subscribe(ts); + + ts.assertReceivedOnNext(Arrays.asList(a, b)); + } + + @Test + public void testDrop() { + ConsulInstance a = new ConsulInstance("", ConsulInstance.Status.UP, "hostname1", 1234); + Observable> first = Observable.just(a).toList(); + + ConsulInstance b = new ConsulInstance("", ConsulInstance.Status.DOWN, "hostname1", 1234); + Observable> second = Observable.just(b).toList(); + + TestSubscriber ts = new TestSubscriber(); + + Observable + .just(first, second) + .flatMap(o -> o) + .startWith(new ArrayList()) + .buffer(2, 1) + .filter(l -> l.size() == 2) + .flatMap(ConsulInstanceDiscovery::delta) + .subscribe(ts); + + ts.assertReceivedOnNext(Arrays.asList(a, b)); + } + + @Test + public void testAddRemoveAddRemove() { + // start with 4 + ConsulInstance a1 = new ConsulInstance("", ConsulInstance.Status.UP, "hostname1", 1234); + ConsulInstance a2 = new ConsulInstance("", ConsulInstance.Status.UP, "hostname2", 1234); + ConsulInstance a3 = new ConsulInstance("", ConsulInstance.Status.UP, "hostname3", 1234); + ConsulInstance a4 = new ConsulInstance("", ConsulInstance.Status.UP, "hostname4", 1234); + Observable> first = Observable.just(a1, a2, a3, a4).toList(); + + // mark one of them as DOWN + ConsulInstance b4 = new ConsulInstance("", ConsulInstance.Status.DOWN, "hostname4", 1234); + Observable> second = Observable.just(a1, a2, a3, b4).toList(); + + // then completely drop 2 of them + Observable> third = Observable.just(a1, a2).toList(); + + TestSubscriber ts = new TestSubscriber(); + + Observable + .just(first, second, third) + .flatMap(o -> o) + .startWith(new ArrayList()) + .buffer(2, 1) + .filter(l -> l.size() == 2) + .flatMap(ConsulInstanceDiscovery::delta) + .subscribe(ts); + + // expected ... + // UP a1, UP a2, UP a3, UP a4 + // DOWN b4 + // DOWN a3 + ts.assertReceivedOnNext(Arrays.asList(a1, a2, a3, a4, b4, new ConsulInstance(a3.getCluster(), ConsulInstance.Status.DOWN, a3.getHost(), a3.getPort()), b4)); + } +}