Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consul service discovery #126

Open
wants to merge 4 commits into
base: 2.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
rootProject.name='turbine'
include 'turbine-core', 'turbine-ext:turbine-discovery-eureka1', 'turbine-ext:turbine-discovery-eureka2'
include 'turbine-core', 'turbine-ext:turbine-discovery-eureka1', 'turbine-ext:turbine-discovery-eureka2', 'turbine-ext:turbine-discovery-consul'
33 changes: 33 additions & 0 deletions turbine-ext/turbine-discovery-consul/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'com.github.johnrengelman.shadow'
apply plugin: 'maven-publish'


sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8

dependencies {
compile project(":turbine-core")

compile 'org.slf4j:slf4j-log4j12:1.6.1'
compile 'com.orbitz.consul:consul-client:1.2.0'
testCompile 'junit:junit-dep:4.10'
testCompile 'org.json:json:20140107'
}

mainClassName = "com.netflix.turbine.discovery.consul.StartConsulTurbine"

shadowJar {
baseName = 'turbine-discovery-consul-executable'
classifier = ''
}

publishing {
publications {
shadow(MavenPublication) {
from components.shadow
artifactId = 'turbine-discovery-consul-executable'
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.turbine.discovery.consul;

public class ConsulInstance {

public static enum Status {
UP, DOWN
}

private final String cluster;
private final Status status;
private final String hostname;
private final int port;

public ConsulInstance(String cluster, Status status, String hostname, int port) {
this.cluster = cluster;
this.status = status;
this.hostname = hostname;
this.port = port;
}

public Status getStatus() {
return status;
}

public String getCluster() {
return cluster;
}

public String getHost() {
return hostname;
}

public boolean isUp() {
return Status.UP == status;
}

public int getPort() {
return port;
}

public String getId() {
return hostname + port;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof ConsulInstance)) return false;

ConsulInstance that = (ConsulInstance) o;

if (port != that.port) return false;
if (cluster != null ? !cluster.equals(that.cluster) : that.cluster != null) return false;
if (hostname != null ? !hostname.equals(that.hostname) : that.hostname != null) return false;
if (status != that.status) return false;

return true;
}

@Override
public int hashCode() {
int result = cluster != null ? cluster.hashCode() : 0;
result = 31 * result + (status != null ? status.hashCode() : 0);
result = 31 * result + (hostname != null ? hostname.hashCode() : 0);
result = 31 * result + port;
return result;
}

@Override
public String toString() {
return "ConsulInstance{" +
"cluster='" + cluster + '\'' +
", status=" + status +
", hostname='" + hostname + '\'' +
", port=" + port +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Copyright 2013 Netflix
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.turbine.discovery.consul;

import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;

import com.google.common.collect.Collections2;
import com.google.common.net.HostAndPort;
import com.orbitz.consul.*;
import com.orbitz.consul.cache.*;
import com.orbitz.consul.cache.ConsulCache.*;
import com.orbitz.consul.model.health.*;

import joptsimple.OptionParser;
import joptsimple.OptionSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import rx.Observable;
import rx.Subscriber;

/**
* Class that encapsulates an {@link InstanceDicovery} implementation that uses
* Consul (see https://www.consul.io/) The plugin requires a list of
* applications configured. It then queries the set of instances for each
* application. Instance information retrieved from Consul must be translated to
* something that Turbine can understand i.e the {@link ConsulInstance} class.
*
* All the logic to perform this translation can be overridden here, so that you
* can provide your own implementation if needed.
*/
public class ConsulInstanceDiscovery {

private static final Logger logger = LoggerFactory.getLogger(ConsulInstanceDiscovery.class);
static CopyOnWriteArrayList<ConsulInstance> instances=new CopyOnWriteArrayList<>();
private final Consul consul;

public static void main(String[] args) {
OptionParser optionParser = new OptionParser();
optionParser.accepts("consulPort").withRequiredArg();
optionParser.accepts("consulHostname").withRequiredArg();
optionParser.accepts("app").withRequiredArg();

OptionSet options = optionParser.parse(args);

int consulPort = -1;
if (!options.has("consulPort")) {
System.err.println("Argument --consulPort required: port of Consul Service Discovery");
System.exit(-1);
} else {
try {
consulPort = Integer.parseInt(String.valueOf(options.valueOf("consulPort")));
} catch (NumberFormatException e) {
System.err.println("Value of consulPort must be an integer but was: " + options.valueOf("consulPort"));
}
}

String consulHostname = null;
if (!options.has("consulHostname")) {
System.err.println("Argument --consulHostname required: hostname of Consul Service Discovery");
System.exit(-1);
} else {
consulHostname = String.valueOf(options.valueOf("consulHostname"));
}

String app = null;
if (!options.has("app")) {
System.err.println("Argument --app required for Consul Instsance Discovery. Eg. -app api");
System.exit(-1);
} else {
app = String.valueOf(options.valueOf("app"));
}

HostAndPort consulHostAndPort = HostAndPort.fromParts(consulHostname, consulPort);

Consul consul = Consul.builder().withHostAndPort(consulHostAndPort).build();

new ConsulInstanceDiscovery(consul).getInstanceEvents(app).toBlocking().forEach(i -> System.out.println(i));
}

public ConsulInstanceDiscovery(Consul consul) {
this.consul = consul;
}

public Observable<ConsulInstance> getInstanceEvents(String appName) {
HealthClient healthClient = consul.healthClient();
ServiceHealthCache svHealth = ServiceHealthCache.newCache(healthClient, appName);
ArrayList<Subscriber<ConsulInstance>> subscribers=new ArrayList<>();
Observable<ConsulInstance> observableConsulInstances = Observable.create((x) ->{
subscribers.add((Subscriber<ConsulInstance>)x);
instances.forEach((i) ->{
x.onNext(i);
});
});

svHealth.addListener(new Listener<ServiceHealthKey, ServiceHealth>() {
@Override
public void notify(Map<ServiceHealthKey, ServiceHealth> newValues) {
try {
newValues.forEach((x, y) -> {
Service service = y.getService();
Optional<String> dataCenter = y.getNode().getDatacenter();

String cluster = dataCenter.isPresent() ? dataCenter.get() : service.getService();
ConsulInstance.Status status = ConsulInstance.Status.UP;
String hostName = service.getAddress();
int port = service.getPort();

for (HealthCheck check : y.getChecks()) {
if (check.getStatus().equals("passing") == false) {
status = ConsulInstance.Status.DOWN;
}
}
ConsulInstance instance = new ConsulInstance(cluster, status, hostName, port);
Collection<ConsulInstance> toRemove = Collections2.filter(instances, (c) ->{return c.getId().equals(instance.getId());});
for(ConsulInstance item : toRemove) { instances.remove(item);}
instances.add(instance);
});
} catch (Exception e) {
logger.warn("Error parsing notification from consul {}", newValues);
}
}
});
svHealth.start();

return observableConsulInstances;
}

public static Observable<ConsulInstance> delta(List<List<ConsulInstance>> listOfLists) {
if (listOfLists.size() == 1) {
return Observable.from(listOfLists.get(0));
} else {
// diff the two
List<ConsulInstance> newList = listOfLists.get(1);
List<ConsulInstance> oldList = new ArrayList<>(listOfLists.get(0));

Set<ConsulInstance> delta = new LinkedHashSet<>();
delta.addAll(newList);
// remove all that match in old
delta.removeAll(oldList);

// filter oldList to those that aren't in the newList
oldList.removeAll(newList);

// for all left in the oldList we'll create DROP events
for (ConsulInstance old : oldList) {
delta.add(new ConsulInstance(old.getCluster(), ConsulInstance.Status.DOWN, old.getHost(), old.getPort()));
}

return Observable.from(delta);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.turbine.discovery.consul;

import java.net.URI;

import com.netflix.turbine.discovery.StreamAction;
import com.netflix.turbine.discovery.StreamDiscovery;
import com.orbitz.consul.Consul;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;

public class ConsulStreamDiscovery implements StreamDiscovery {

private static final Logger logger = LoggerFactory.getLogger(ConsulStreamDiscovery.class);

public static ConsulStreamDiscovery create(String appName, String uriTemplate, Consul consul) {
return new ConsulStreamDiscovery(appName, uriTemplate, consul);
}

public final static String HOSTNAME = "{HOSTNAME}";
private final String uriTemplate;
private final String appName;
private final Consul consul;

private ConsulStreamDiscovery(String appName, String uriTemplate, Consul consul) {
this.appName = appName;
this.uriTemplate = uriTemplate;
this.consul = consul;
}

@Override
public Observable<StreamAction> getInstanceList() {
return new ConsulInstanceDiscovery(consul)
.getInstanceEvents(appName)
.map(ei -> {
URI uri;
String uriString = uriTemplate.replace(HOSTNAME, ei.getHost() + ":" + ei.getPort());
try {
uri = new URI(uriString);
} catch (Exception e) {
throw new RuntimeException("Invalid URI: " + uriString, e);
}
if (ei.getStatus() == ConsulInstance.Status.UP) {
logger.info("StreamAction ADD");
return StreamAction.create(StreamAction.ActionType.ADD, uri);
} else {
logger.info("StreamAction REMOVE");
return StreamAction.create(StreamAction.ActionType.REMOVE, uri);
}

});
}

}
Loading