diff --git a/server/pom.xml b/server/pom.xml old mode 100644 new mode 100755 index 2c61224..a46d24b --- a/server/pom.xml +++ b/server/pom.xml @@ -176,5 +176,12 @@ testing test + + + org.mockito + mockito-core + test + + diff --git a/server/src/main/java/io/airlift/discovery/DiscoveryConfig.java b/server/src/main/java/io/airlift/discovery/DiscoveryConfig.java old mode 100644 new mode 100755 index ed70a3f..98e50ca --- a/server/src/main/java/io/airlift/discovery/DiscoveryConfig.java +++ b/server/src/main/java/io/airlift/discovery/DiscoveryConfig.java @@ -15,15 +15,24 @@ */ package io.airlift.discovery; +import com.google.common.collect.ForwardingSet; +import com.google.common.collect.ImmutableSet; import io.airlift.configuration.Config; +import io.airlift.configuration.ConfigDescription; import io.airlift.units.Duration; +import javax.validation.constraints.AssertTrue; import javax.validation.constraints.NotNull; +import java.net.URI; +import java.util.Set; import java.util.concurrent.TimeUnit; public class DiscoveryConfig { private Duration maxAge = new Duration(30, TimeUnit.SECONDS); + private StringSet proxyProxiedTypes = StringSet.of(); + private String proxyEnvironment = null; + private URI proxyUri = null; @NotNull public Duration getMaxAge() @@ -32,9 +41,87 @@ public Duration getMaxAge() } @Config("discovery.max-age") + @ConfigDescription("Dynamic announcement expiration") public DiscoveryConfig setMaxAge(Duration maxAge) { this.maxAge = maxAge; return this; } + + public StringSet getProxyProxiedTypes() + { + return proxyProxiedTypes; + } + + @Config("discovery.proxy.proxied-types") + @ConfigDescription("Service types to proxy (test environments only)") + public DiscoveryConfig setProxyProxiedTypes(StringSet proxyProxiedTypes) + { + this.proxyProxiedTypes = proxyProxiedTypes; + return this; + } + + public String getProxyEnvironment() + { + return proxyEnvironment; + } + + @Config("discovery.proxy.environment") + @ConfigDescription("Environment to proxy to (test environments only)") + public DiscoveryConfig setProxyEnvironment(String proxyEnvironment) + { + this.proxyEnvironment = proxyEnvironment; + return this; + } + + public URI getProxyUri() + { + return proxyUri; + } + + @Config("discovery.proxy.uri") + @ConfigDescription("Discovery server to proxy to (test environments only)") + public DiscoveryConfig setProxyUri(URI proxyUri) + { + this.proxyUri = proxyUri; + return this; + } + + @AssertTrue(message = "discovery.proxy.environment specified if and only if any proxy types") + public boolean isProxyTypeAndEnvironment() + { + return proxyProxiedTypes.isEmpty() == (proxyEnvironment == null); + } + + @AssertTrue(message = "discovery.proxy.uri specified if and only if any proxy types") + public boolean isProxyTypeAndUri() + { + return proxyProxiedTypes.isEmpty() == (proxyUri == null); + } + + public static final class StringSet extends ForwardingSet + { + private final Set delegate; + + private StringSet(Set delegate) + { + this.delegate = ImmutableSet.copyOf(delegate); + } + + public static StringSet of(String... strings) + { + return new StringSet(ImmutableSet.copyOf(strings)); + } + + public static StringSet valueOf(String string) + { + return of(string.split("\\s*,\\s*")); + } + + @Override + protected Set delegate() + { + return delegate; + } + } } diff --git a/server/src/main/java/io/airlift/discovery/DiscoveryServerModule.java b/server/src/main/java/io/airlift/discovery/DiscoveryServerModule.java old mode 100644 new mode 100755 index f616d96..e182960 --- a/server/src/main/java/io/airlift/discovery/DiscoveryServerModule.java +++ b/server/src/main/java/io/airlift/discovery/DiscoveryServerModule.java @@ -34,6 +34,7 @@ import static io.airlift.configuration.ConfigurationModule.bindConfig; import static io.airlift.discovery.client.DiscoveryBinder.discoveryBinder; +import static io.airlift.http.client.HttpClientBinder.httpClientBinder; public class DiscoveryServerModule implements Module @@ -55,6 +56,10 @@ public void configure(Binder binder) binder.bind(StaticStore.class).to(ReplicatedStaticStore.class).in(Scopes.SINGLETON); binder.install(new ReplicatedStoreModule("static", ForStaticStore.class, PersistentStore.class)); bindConfig(binder).prefixedWith("static").to(PersistentStoreConfig.class); + + // proxy announcements + httpClientBinder(binder).bindAsyncHttpClient("discovery.proxy", ForProxyStore.class); + binder.bind(ProxyStore.class).in(Scopes.SINGLETON); } @Singleton diff --git a/server/src/main/java/io/airlift/discovery/DynamicAnnouncementResource.java b/server/src/main/java/io/airlift/discovery/DynamicAnnouncementResource.java old mode 100644 new mode 100755 index ac73956..40b3762 --- a/server/src/main/java/io/airlift/discovery/DynamicAnnouncementResource.java +++ b/server/src/main/java/io/airlift/discovery/DynamicAnnouncementResource.java @@ -15,6 +15,7 @@ */ package io.airlift.discovery; +import com.google.common.base.Joiner; import com.google.common.base.Objects; import io.airlift.node.NodeInfo; @@ -24,14 +25,15 @@ import javax.ws.rs.PUT; import javax.ws.rs.Path; import javax.ws.rs.PathParam; -import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; -import javax.ws.rs.core.UriInfo; +import java.util.HashSet; +import java.util.Set; import static java.lang.String.format; import static javax.ws.rs.core.Response.Status.ACCEPTED; import static javax.ws.rs.core.Response.Status.BAD_REQUEST; +import static javax.ws.rs.core.Response.Status.FORBIDDEN; import static javax.ws.rs.core.Response.Status.NOT_FOUND; @Path("/v1/announcement/{node_id}") @@ -39,17 +41,19 @@ public class DynamicAnnouncementResource { private final NodeInfo nodeInfo; private final DynamicStore dynamicStore; + private Set proxyTypes; @Inject - public DynamicAnnouncementResource(DynamicStore dynamicStore, NodeInfo nodeInfo) + public DynamicAnnouncementResource(DynamicStore dynamicStore, NodeInfo nodeInfo, DiscoveryConfig discoveryConfig) { this.dynamicStore = dynamicStore; this.nodeInfo = nodeInfo; + proxyTypes = discoveryConfig.getProxyProxiedTypes(); } @PUT @Consumes(MediaType.APPLICATION_JSON) - public Response put(@PathParam("node_id") Id nodeId, @Context UriInfo uriInfo, DynamicAnnouncement announcement) + public Response put(@PathParam("node_id") Id nodeId, DynamicAnnouncement announcement) { if (!nodeInfo.getEnvironment().equals(announcement.getEnvironment())) { return Response.status(BAD_REQUEST) @@ -57,6 +61,21 @@ public Response put(@PathParam("node_id") Id nodeId, @Context UriInfo uriI .build(); } + if (!proxyTypes.isEmpty()) { + Set forbiddenTypes = new HashSet<>(); + for (DynamicServiceAnnouncement serviceAnnouncement : announcement.getServiceAnnouncements()) { + String type = serviceAnnouncement.getType(); + if (proxyTypes.contains(type)) { + forbiddenTypes.add(type); + } + } + if (!forbiddenTypes.isEmpty()) { + return Response.status(FORBIDDEN) + .entity(format("Cannot announce proxied type%s %s", forbiddenTypes.size() == 1 ? "" : "s", Joiner.on(',').join(forbiddenTypes))) + .build(); + } + } + String location = Objects.firstNonNull(announcement.getLocation(), "/somewhere/" + nodeId.toString()); DynamicAnnouncement announcementWithLocation = DynamicAnnouncement.copyOf(announcement) diff --git a/server/src/main/java/io/airlift/discovery/ForProxyStore.java b/server/src/main/java/io/airlift/discovery/ForProxyStore.java new file mode 100755 index 0000000..c52bc36 --- /dev/null +++ b/server/src/main/java/io/airlift/discovery/ForProxyStore.java @@ -0,0 +1,31 @@ +/* + * Copyright 2013 Proofpoint, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.airlift.discovery; + +import com.google.inject.BindingAnnotation; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +@Retention(RUNTIME) +@Target(PARAMETER) +@BindingAnnotation +public @interface ForProxyStore +{ +} diff --git a/server/src/main/java/io/airlift/discovery/Main.java b/server/src/main/java/io/airlift/discovery/Main.java old mode 100644 new mode 100755 index 234f5e7..0c01781 --- a/server/src/main/java/io/airlift/discovery/Main.java +++ b/server/src/main/java/io/airlift/discovery/Main.java @@ -58,7 +58,6 @@ public static void main(String[] args) } catch (Exception e) { log.error(e); - // Cassandra prevents the vm from shutting down on its own System.exit(1); } } diff --git a/server/src/main/java/io/airlift/discovery/ProxyStore.java b/server/src/main/java/io/airlift/discovery/ProxyStore.java new file mode 100755 index 0000000..90f4bb8 --- /dev/null +++ b/server/src/main/java/io/airlift/discovery/ProxyStore.java @@ -0,0 +1,218 @@ +/* + * Copyright 2013 Proofpoint, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.airlift.discovery; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.ImmutableSet.Builder; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.Provider; +import io.airlift.discovery.client.DiscoveryException; +import io.airlift.discovery.client.DiscoveryLookupClient; +import io.airlift.discovery.client.HttpDiscoveryLookupClient; +import io.airlift.discovery.client.ServiceDescriptor; +import io.airlift.discovery.client.ServiceDescriptors; +import io.airlift.discovery.client.ServiceDescriptorsRepresentation; +import io.airlift.http.client.AsyncHttpClient; +import io.airlift.log.Logger; +import io.airlift.node.NodeInfo; +import io.airlift.units.Duration; + +import javax.annotation.Nullable; +import javax.inject.Inject; +import java.net.URI; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; + +import static io.airlift.discovery.client.HttpDiscoveryAnnouncementClient.DEFAULT_DELAY; +import static io.airlift.json.JsonCodec.jsonCodec; + +public class ProxyStore +{ + private final Set proxyTypes; + private final Map> map; + + private static final Logger log = Logger.get(ProxyStore.class); + + @Inject + public ProxyStore(final DiscoveryConfig discoveryConfig, Injector injector) + { + this.proxyTypes = discoveryConfig.getProxyProxiedTypes(); + + if (!proxyTypes.isEmpty()) { + map = new ConcurrentHashMap<>(); + AsyncHttpClient httpClient = injector.getInstance( + Key.get(AsyncHttpClient.class, ForProxyStore.class)); + DiscoveryLookupClient lookupClient = new HttpDiscoveryLookupClient( + new ProxyURIProvider(discoveryConfig), + new NodeInfo(discoveryConfig.getProxyEnvironment()), + jsonCodec(ServiceDescriptorsRepresentation.class), + httpClient); + ScheduledThreadPoolExecutor poolExecutor = new ScheduledThreadPoolExecutor(5, new ThreadFactoryBuilder() + .setNameFormat("Proxy-Discovery-%s") + .setDaemon(true) + .build()); + + for (String type : proxyTypes) { + new ServiceUpdater(type, lookupClient, poolExecutor).start(); + } + } + else { + map = null; + } + } + + public Set filterAndGetAll(Set services) + { + if (proxyTypes.isEmpty()) { + return services; + } + + Builder builder = ImmutableSet.builder(); + for (Service service : services) { + if (!proxyTypes.contains(service.getType())) { + builder.add(service); + } + } + for (Set set : map.values()) { + builder.addAll(set); + } + return builder.build(); + } + + @Nullable + public Set get(String type) + { + if (!proxyTypes.contains(type)) { + return null; + } + return map.get(type); + } + + @Nullable + public Set get(String type, final String pool) + { + if (!proxyTypes.contains(type)) { + return null; + } + Builder builder = ImmutableSet.builder(); + for (Service service : map.get(type)) { + if (pool.equals(service.getPool())) { + builder.add(service); + } + } + return builder.build(); + } + + private static class ProxyURIProvider implements Provider + { + private final URI uri; + + public ProxyURIProvider(DiscoveryConfig discoveryConfig) + { + uri = discoveryConfig.getProxyUri(); + } + + @Override + public URI get() + { + return uri; + } + } + + private class ServiceUpdater + { + private final String type; + private final DiscoveryLookupClient lookupClient; + private final ScheduledThreadPoolExecutor poolExecutor; + private final AtomicBoolean serverUp = new AtomicBoolean(true); + + public ServiceUpdater(String type, DiscoveryLookupClient lookupClient, ScheduledThreadPoolExecutor poolExecutor) + { + this.type = type; + this.lookupClient = lookupClient; + this.poolExecutor = poolExecutor; + } + + public void start() + { + try { + refresh().checkedGet(30, TimeUnit.SECONDS); + } + catch (TimeoutException ignored) { + } + } + + private CheckedFuture refresh() + { + final CheckedFuture future = lookupClient.getServices(type); + + future.addListener(new Runnable() + { + @Override + public void run() + { + Duration delay = DEFAULT_DELAY; + try { + ServiceDescriptors descriptors = future.checkedGet(); + delay = descriptors.getMaxAge(); + Builder builder = ImmutableSet.builder(); + for (ServiceDescriptor descriptor : descriptors.getServiceDescriptors()) { + builder.add(new Service( + Id.valueOf(descriptor.getId()), + Id.valueOf(descriptor.getNodeId()), + descriptor.getType(), + descriptor.getPool(), + descriptor.getLocation(), + descriptor.getProperties())); + } + map.put(type, builder.build()); + if (serverUp.compareAndSet(false, true)) { + log.info("Proxied discovery server connect succeeded for refresh (%s)", type); + } + } + catch (DiscoveryException e) { + if (serverUp.compareAndSet(true, false)) { + log.error("Cannot connect to proxy discovery server for refresh (%s): %s", type, e.getMessage()); + } + log.debug(e, "Cannot connect to proxy discovery server for refresh (%s)", type); + } + finally { + if (!poolExecutor.isShutdown()) { + poolExecutor.schedule(new Runnable() + { + @Override + public void run() + { + refresh(); + } + }, (long) delay.toMillis(), TimeUnit.MILLISECONDS); + } + } + } + }, poolExecutor); + + return future; + } + } +} diff --git a/server/src/main/java/io/airlift/discovery/ServiceResource.java b/server/src/main/java/io/airlift/discovery/ServiceResource.java old mode 100644 new mode 100755 index 5e6b7fd..17aa167 --- a/server/src/main/java/io/airlift/discovery/ServiceResource.java +++ b/server/src/main/java/io/airlift/discovery/ServiceResource.java @@ -23,7 +23,9 @@ import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.core.MediaType; +import java.util.Set; +import static com.google.common.base.Objects.firstNonNull; import static com.google.common.collect.Sets.union; @@ -32,13 +34,15 @@ public class ServiceResource { private final DynamicStore dynamicStore; private final StaticStore staticStore; + private final ProxyStore proxyStore; private final NodeInfo node; @Inject - public ServiceResource(DynamicStore dynamicStore, StaticStore staticStore, NodeInfo node) + public ServiceResource(DynamicStore dynamicStore, StaticStore staticStore, ProxyStore proxyStore, NodeInfo node) { this.dynamicStore = dynamicStore; this.staticStore = staticStore; + this.proxyStore = proxyStore; this.node = node; } @@ -47,7 +51,8 @@ public ServiceResource(DynamicStore dynamicStore, StaticStore staticStore, NodeI @Produces(MediaType.APPLICATION_JSON) public Services getServices(@PathParam("type") String type, @PathParam("pool") String pool) { - return new Services(node.getEnvironment(), union(dynamicStore.get(type, pool), staticStore.get(type, pool))); + return new Services(node.getEnvironment(), firstNonNull(proxyStore.get(type, pool), + union(dynamicStore.get(type, pool), staticStore.get(type, pool)))); } @GET @@ -55,13 +60,15 @@ public Services getServices(@PathParam("type") String type, @PathParam("pool") S @Produces(MediaType.APPLICATION_JSON) public Services getServices(@PathParam("type") String type) { - return new Services(node.getEnvironment(), union(dynamicStore.get(type), staticStore.get(type))); + return new Services(node.getEnvironment(), firstNonNull(proxyStore.get(type), + union(dynamicStore.get(type), staticStore.get(type)))); } @GET @Produces(MediaType.APPLICATION_JSON) public Services getServices() { - return new Services(node.getEnvironment(), union(dynamicStore.getAll(), staticStore.getAll())); + Set services = union(dynamicStore.getAll(), staticStore.getAll()); + return new Services(node.getEnvironment(), proxyStore.filterAndGetAll(services)); } } diff --git a/server/src/main/java/io/airlift/discovery/StaticAnnouncementResource.java b/server/src/main/java/io/airlift/discovery/StaticAnnouncementResource.java old mode 100644 new mode 100755 index fb96c06..57030f1 --- a/server/src/main/java/io/airlift/discovery/StaticAnnouncementResource.java +++ b/server/src/main/java/io/airlift/discovery/StaticAnnouncementResource.java @@ -31,21 +31,25 @@ import javax.ws.rs.core.UriBuilder; import javax.ws.rs.core.UriInfo; import java.net.URI; +import java.util.Set; import static java.lang.String.format; import static javax.ws.rs.core.Response.Status.BAD_REQUEST; +import static javax.ws.rs.core.Response.Status.FORBIDDEN; @Path("/v1/announcement/static") public class StaticAnnouncementResource { private final StaticStore store; private final NodeInfo nodeInfo; + private final Set proxyTypes; @Inject - public StaticAnnouncementResource(StaticStore store, NodeInfo nodeInfo) + public StaticAnnouncementResource(StaticStore store, NodeInfo nodeInfo, DiscoveryConfig discoveryConfig) { this.store = store; this.nodeInfo = nodeInfo; + proxyTypes = discoveryConfig.getProxyProxiedTypes(); } @POST @@ -58,6 +62,12 @@ public Response post(StaticAnnouncement announcement, @Context UriInfo uriInfo) .build(); } + if (proxyTypes.contains(announcement.getType())) { + return Response.status(FORBIDDEN) + .entity(format("Cannot announce proxied type %s", announcement.getType())) + .build(); + } + Id id = Id.random(); String location = Objects.firstNonNull(announcement.getLocation(), "/somewhere/" + id); diff --git a/server/src/test/java/io/airlift/discovery/TestDiscoveryConfig.java b/server/src/test/java/io/airlift/discovery/TestDiscoveryConfig.java old mode 100644 new mode 100755 index 657399a..d5cec87 --- a/server/src/test/java/io/airlift/discovery/TestDiscoveryConfig.java +++ b/server/src/test/java/io/airlift/discovery/TestDiscoveryConfig.java @@ -17,10 +17,13 @@ import com.google.common.collect.ImmutableMap; import io.airlift.configuration.testing.ConfigAssertions; +import io.airlift.discovery.DiscoveryConfig.StringSet; import io.airlift.units.Duration; import org.testng.annotations.Test; +import javax.validation.constraints.AssertTrue; import javax.validation.constraints.NotNull; +import java.net.URI; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -32,7 +35,10 @@ public class TestDiscoveryConfig public void testDefaults() { ConfigAssertions.assertRecordedDefaults(ConfigAssertions.recordDefaults(DiscoveryConfig.class) - .setMaxAge(new Duration(30, TimeUnit.SECONDS))); + .setMaxAge(new Duration(30, TimeUnit.SECONDS)) + .setProxyProxiedTypes(DiscoveryConfig.StringSet.of()) + .setProxyEnvironment(null) + .setProxyUri(null)); } @Test @@ -40,14 +46,26 @@ public void testExplicitPropertyMappings() { Map properties = ImmutableMap.builder() .put("discovery.max-age", "1m") + .put("discovery.proxy.proxied-types", "foo , bar") + .put("discovery.proxy.environment", "pre-release") + .put("discovery.proxy.uri", "http://10.20.30.40:4111") .build(); DiscoveryConfig expected = new DiscoveryConfig() - .setMaxAge(new Duration(1, TimeUnit.MINUTES)); + .setMaxAge(new Duration(1, TimeUnit.MINUTES)) + .setProxyProxiedTypes(DiscoveryConfig.StringSet.of("foo", "bar")) + .setProxyEnvironment("pre-release") + .setProxyUri(URI.create("http://10.20.30.40:4111")); ConfigAssertions.assertFullMapping(properties, expected); } + @Test + public void testDeprecatedProperties() + { + ConfigAssertions.assertDeprecatedEquivalence(DiscoveryConfig.class, + ImmutableMap.of()); + } @Test public void testValidatesNotNullDuration() @@ -56,4 +74,36 @@ public void testValidatesNotNullDuration() assertFailsValidation(config, "maxAge", "may not be null", NotNull.class); } + + @Test + public void testProxyMissingEnvironment() + { + DiscoveryConfig config = new DiscoveryConfig().setProxyProxiedTypes(StringSet.of("foo")).setProxyUri(URI.create("http://10.20.30.40:4111")); + assertFailsValidation(config, "proxyTypeAndEnvironment", "discovery.proxy.environment specified if and only if any proxy types", + AssertTrue.class); + } + + @Test + public void testProxyEnvironment() + { + DiscoveryConfig config = new DiscoveryConfig().setProxyEnvironment("pre-release"); + assertFailsValidation(config, "proxyTypeAndEnvironment", "discovery.proxy.environment specified if and only if any proxy types", + AssertTrue.class); + } + + @Test + public void testProxyMissingUri() + { + DiscoveryConfig config = new DiscoveryConfig().setProxyProxiedTypes(StringSet.of("foo")).setProxyEnvironment("pre-release"); + assertFailsValidation(config, "proxyTypeAndUri", "discovery.proxy.uri specified if and only if any proxy types", + AssertTrue.class); + } + + @Test + public void testProxyUri() + { + DiscoveryConfig config = new DiscoveryConfig().setProxyUri(URI.create("http://10.20.30.40:4111")); + assertFailsValidation(config, "proxyTypeAndUri", "discovery.proxy.uri specified if and only if any proxy types", + AssertTrue.class); + } } diff --git a/server/src/test/java/io/airlift/discovery/TestDynamicAnnouncementResource.java b/server/src/test/java/io/airlift/discovery/TestDynamicAnnouncementResource.java old mode 100644 new mode 100755 index de05766..f9d9755 --- a/server/src/test/java/io/airlift/discovery/TestDynamicAnnouncementResource.java +++ b/server/src/test/java/io/airlift/discovery/TestDynamicAnnouncementResource.java @@ -17,14 +17,14 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import io.airlift.discovery.DiscoveryConfig.StringSet; import io.airlift.discovery.store.RealTimeProvider; -import io.airlift.jaxrs.testing.MockUriInfo; import io.airlift.node.NodeInfo; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import javax.ws.rs.core.Response; -import java.net.URI; +import javax.ws.rs.core.Response.Status; import static com.google.common.collect.Iterables.transform; import static io.airlift.discovery.DynamicServiceAnnouncement.toServiceWith; @@ -42,7 +42,7 @@ public class TestDynamicAnnouncementResource public void setup() { store = new InMemoryDynamicStore(new DiscoveryConfig(), new RealTimeProvider()); - resource = new DynamicAnnouncementResource(store, new NodeInfo("testing")); + resource = new DynamicAnnouncementResource(store, new NodeInfo("testing"), new DiscoveryConfig()); } @Test @@ -53,7 +53,7 @@ public void testPutNew() ); Id nodeId = Id.random(); - Response response = resource.put(nodeId, new MockUriInfo(URI.create("http://localhost:8080/v1/announcement/" + nodeId.toString())), announcement); + Response response = resource.put(nodeId, announcement); assertNotNull(response); assertEquals(response.getStatus(), Response.Status.ACCEPTED.getStatusCode()); @@ -75,7 +75,7 @@ public void testReplace() new DynamicServiceAnnouncement(Id.random(), "storage", ImmutableMap.of("key", "new"))) ); - Response response = resource.put(nodeId, new MockUriInfo(URI.create("http://localhost:8080/v1/announcement/" + nodeId.toString())), announcement); + Response response = resource.put(nodeId, announcement); assertNotNull(response); assertEquals(response.getStatus(), Response.Status.ACCEPTED.getStatusCode()); @@ -91,7 +91,7 @@ public void testEnvironmentConflict() ); Id nodeId = Id.random(); - Response response = resource.put(nodeId, new MockUriInfo(URI.create("http://localhost:8080/v1/announcement/" + nodeId.toString())), announcement); + Response response = resource.put(nodeId, announcement); assertNotNull(response); assertEquals(response.getStatus(), Response.Status.BAD_REQUEST.getStatusCode()); @@ -99,6 +99,25 @@ public void testEnvironmentConflict() assertTrue(store.getAll().isEmpty()); } + @Test + public void testPutProxied() + { + resource = new DynamicAnnouncementResource(store, new NodeInfo("testing"), + new DiscoveryConfig().setProxyProxiedTypes(StringSet.of("storage"))); + + DynamicAnnouncement announcement = new DynamicAnnouncement("testing", "alpha", "/a/b/c", ImmutableSet.of( + new DynamicServiceAnnouncement(Id.random(), "storage", ImmutableMap.of("http", "http://localhost:1111"))) + ); + + Id nodeId = Id.random(); + Response response = resource.put(nodeId, announcement); + + assertNotNull(response); + assertEquals(response.getStatus(), Status.FORBIDDEN.getStatusCode()); + + assertTrue(store.getAll().isEmpty()); + } + @Test public void testDeleteExisting() { @@ -142,7 +161,7 @@ public void testMakesUpLocation() ); Id nodeId = Id.random(); - Response response = resource.put(nodeId, new MockUriInfo(URI.create("http://localhost:8080/v1/announcement/" + nodeId.toString())), announcement); + Response response = resource.put(nodeId, announcement); assertNotNull(response); assertEquals(response.getStatus(), Response.Status.ACCEPTED.getStatusCode()); diff --git a/server/src/test/java/io/airlift/discovery/TestProxyStore.java b/server/src/test/java/io/airlift/discovery/TestProxyStore.java new file mode 100755 index 0000000..a850f9f --- /dev/null +++ b/server/src/test/java/io/airlift/discovery/TestProxyStore.java @@ -0,0 +1,243 @@ +package io.airlift.discovery; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.ImmutableSet.Builder; +import com.google.common.collect.ListMultimap; +import com.google.common.util.concurrent.AbstractFuture; +import com.google.inject.Injector; +import com.google.inject.Key; +import io.airlift.discovery.DiscoveryConfig.StringSet; +import io.airlift.discovery.client.DiscoveryException; +import io.airlift.http.client.AsyncHttpClient; +import io.airlift.http.client.Request; +import io.airlift.http.client.RequestStats; +import io.airlift.http.client.Response; +import io.airlift.http.client.ResponseHandler; +import org.testng.annotations.Test; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static io.airlift.json.JsonCodec.jsonCodec; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +public class TestProxyStore +{ + @Test + public void testNoProxy() + { + Injector injector = mock(Injector.class); + ProxyStore proxyStore = new ProxyStore(new DiscoveryConfig(), injector); + Set services = ImmutableSet.of(new Service(Id.random(), Id.random(), "type", "pool", "/location", ImmutableMap.of("key", "value"))); + + assertEquals(proxyStore.filterAndGetAll(services), services); + assertEquals(proxyStore.get("foo"), null); + assertEquals(proxyStore.get("foo", "bar"), null); + verifyNoMoreInteractions(injector); + } + + @Test + public void testProxy() + throws InterruptedException + { + Service service1 = new Service(Id.random(), Id.random(), "storage", "pool1", "/location/1", ImmutableMap.of("key", "value")); + Service service2 = new Service(Id.random(), Id.random(), "storage", "pool2", "/location/2", ImmutableMap.of("key2", "value2")); + Service service3 = new Service(Id.random(), Id.random(), "customer", "general", "/location/3", ImmutableMap.of("key3", "value3")); + + DiscoveryConfig config = new DiscoveryConfig() + .setProxyProxiedTypes(StringSet.of("storage", "customer", "auth")) + .setProxyEnvironment("upstream") + .setProxyUri(URI.create("http://discovery.example.com")); + Injector injector = mock(Injector.class); + AsyncHttpClient httpClient = new TestingDiscoveryHttpClient(config, new Service[]{service1, service2, service3}); + when(injector.getInstance(Key.get(AsyncHttpClient.class, ForProxyStore.class))).thenReturn(httpClient); + ProxyStore proxyStore = new ProxyStore(config, injector); + Thread.sleep(100); + + Service service4 = new Service(Id.random(), Id.random(), "storage", "pool1", "/location/4", ImmutableMap.of("key4", "value4")); + Service service5 = new Service(Id.random(), Id.random(), "auth", "pool3", "/location/5", ImmutableMap.of("key5", "value5")); + Service service6 = new Service(Id.random(), Id.random(), "event", "general", "/location/6", ImmutableMap.of("key6", "value6")); + + assertEquals(proxyStore.filterAndGetAll(ImmutableSet.of(service4, service5, service6)), + ImmutableSet.of(service1, service2, service3, service6)); + + assertEquals(proxyStore.get("storage"), ImmutableSet.of(service1, service2)); + assertEquals(proxyStore.get("customer"), ImmutableSet.of(service3)); + assertEquals(proxyStore.get("auth"), ImmutableSet.of()); + assertEquals(proxyStore.get("event"), null); + + assertEquals(proxyStore.get("storage", "pool1"), ImmutableSet.of(service1)); + assertEquals(proxyStore.get("storage", "pool2"), ImmutableSet.of(service2)); + assertEquals(proxyStore.get("customer", "general"), ImmutableSet.of(service3)); + assertEquals(proxyStore.get("customer", "pool3"), ImmutableSet.of()); + assertEquals(proxyStore.get("auth", "pool3"), ImmutableSet.of()); + assertEquals(proxyStore.get("event", "general"), null); + } + + @Test(expectedExceptions = DiscoveryException.class, + expectedExceptionsMessageRegExp = "Expected environment to be upstream, but was mismatch") + public void testEnvironmentMismatch() + { + Service service1 = new Service(Id.random(), Id.random(), "storage", "pool1", "/location/1", ImmutableMap.of("key", "value")); + Service service2 = new Service(Id.random(), Id.random(), "storage", "pool2", "/location/2", ImmutableMap.of("key2", "value2")); + Service service3 = new Service(Id.random(), Id.random(), "customer", "general", "/location/3", ImmutableMap.of("key3", "value3")); + + Injector injector = mock(Injector.class); + AsyncHttpClient httpClient = new TestingDiscoveryHttpClient(new DiscoveryConfig() + .setProxyProxiedTypes(StringSet.of("storage", "customer", "auth")) + .setProxyEnvironment("mismatch") + .setProxyUri(URI.create("http://discovery.example.com")), new Service[]{service1, service2, service3}); + when(injector.getInstance(Key.get(AsyncHttpClient.class, ForProxyStore.class))).thenReturn(httpClient); + new ProxyStore(new DiscoveryConfig() + .setProxyProxiedTypes(StringSet.of("storage", "customer", "auth")) + .setProxyEnvironment("upstream") + .setProxyUri(URI.create("http://discovery.example.com")), injector); + } + + private static class TestingDiscoveryHttpClient implements AsyncHttpClient + { + private final DiscoveryConfig config; + private final ImmutableSet services; + + public TestingDiscoveryHttpClient(DiscoveryConfig config, Service[] services) + { + this.config = config; + this.services = ImmutableSet.copyOf(services); + } + + @Override + public AsyncHttpResponseFuture executeAsync(Request request, ResponseHandler responseHandler) + { + assertEquals(request.getMethod(), "GET"); + URI uri = request.getUri(); + assertTrue(uri.toString().startsWith("http://discovery.example.com/v1/service/"), "uri " + uri.toString() + " starts with expected prefix"); + String type = uri.toASCIIString().substring(40); + if (type.endsWith("/")) { + type = type.substring(0, type.length() - 1); + } + assertTrue(config.getProxyProxiedTypes().contains(type), "type " + type + " in configured proxy types"); + + Builder builder = ImmutableSet.builder(); + for (Service service : services) { + if (type.equals(service.getType())) { + builder.add(service); + } + } + final Services filteredServices = new Services(config.getProxyEnvironment(), builder.build()); + + return new TestingResponseFuture(request, responseHandler, filteredServices); + } + + @Override + public T execute(Request request, ResponseHandler responseHandler) + throws E + { + throw new UnsupportedOperationException(); + } + + @Override + public RequestStats getStats() + { + throw new UnsupportedOperationException(); + } + + @Override + public void close() + { + } + + private static class TestingResponseFuture + extends AbstractFuture + implements AsyncHttpResponseFuture + { + public TestingResponseFuture(Request request, ResponseHandler responseHandler, final Services filteredServices) + { + try { + T result = responseHandler.handle(request, new Response() + { + @Override + public int getStatusCode() + { + return 200; + } + + @Override + public String getStatusMessage() + { + throw new UnsupportedOperationException(); + } + + @Override + public String getHeader(String name) + { + return null; + } + + @Override + public ListMultimap getHeaders() + { + throw new UnsupportedOperationException(); + } + + @Override + public long getBytesRead() + { + throw new UnsupportedOperationException(); + } + + @Override + public InputStream getInputStream() + throws IOException + { + return new ByteArrayInputStream(jsonCodec(Services.class).toJson(filteredServices).getBytes("UTF-8")); + } + }); + set(result); + } + catch (Exception e) { + setException(e); + } + } + + @Override + public String getState() + { + return "done"; + } + + @Override + public T checkedGet() + throws E + { + try { + return get(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } + catch (ExecutionException e) { + throw (E) e.getCause(); + } + } + + @Override + public T checkedGet(long l, TimeUnit timeUnit) + throws TimeoutException, E + { + return checkedGet(); + } + } + } +} diff --git a/server/src/test/java/io/airlift/discovery/TestServiceResource.java b/server/src/test/java/io/airlift/discovery/TestServiceResource.java old mode 100644 new mode 100755 index 6d43f81..0f12b05 --- a/server/src/test/java/io/airlift/discovery/TestServiceResource.java +++ b/server/src/test/java/io/airlift/discovery/TestServiceResource.java @@ -18,13 +18,23 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import io.airlift.node.NodeInfo; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import java.util.Collections; +import java.util.Set; import static com.google.common.collect.ImmutableSet.of; +import static com.google.common.collect.Sets.union; import static io.airlift.discovery.DynamicServiceAnnouncement.toServiceWith; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; public class TestServiceResource @@ -32,13 +42,15 @@ public class TestServiceResource private InMemoryDynamicStore dynamicStore; private InMemoryStaticStore staticStore; private ServiceResource resource; + private ProxyStore proxyStore; @BeforeMethod protected void setUp() { dynamicStore = new InMemoryDynamicStore(new DiscoveryConfig(), new TestingTimeProvider()); staticStore = new InMemoryStaticStore(); - resource = new ServiceResource(dynamicStore, staticStore, new NodeInfo("testing")); + proxyStore = mock(ProxyStore.class); + resource = new ServiceResource(dynamicStore, staticStore, proxyStore, new NodeInfo("testing")); } @Test @@ -61,6 +73,8 @@ public void testGetByType() dynamicStore.put(greenNodeId, green); dynamicStore.put(blueNodeId, blue); + when(proxyStore.get(any(String.class))).thenReturn(null); + assertEquals(resource.getServices("storage"), new Services("testing", of( toServiceWith(redNodeId, red.getLocation(), red.getPool()).apply(redStorage), toServiceWith(greenNodeId, green.getLocation(), green.getPool()).apply(greenStorage), @@ -70,6 +84,9 @@ public void testGetByType() toServiceWith(redNodeId, red.getLocation(), red.getPool()).apply(redWeb)))); assertEquals(resource.getServices("unknown"), new Services("testing", Collections.emptySet())); + + verify(proxyStore, times(3)).get(any(String.class)); + verifyNoMoreInteractions(proxyStore); } @Test @@ -88,6 +105,8 @@ public void testGetByTypeAndPool() DynamicServiceAnnouncement blueStorage = new DynamicServiceAnnouncement(Id.random(), "storage", ImmutableMap.of("key", "4")); DynamicAnnouncement blue = new DynamicAnnouncement("testing", "beta", "/a/b/c", of(blueStorage)); + when(proxyStore.get(any(String.class), any(String.class))).thenReturn(null); + dynamicStore.put(redNodeId, red); dynamicStore.put(greenNodeId, green); dynamicStore.put(blueNodeId, blue); @@ -99,6 +118,9 @@ public void testGetByTypeAndPool() assertEquals(resource.getServices("storage", "beta"), new Services("testing", ImmutableSet.of(toServiceWith(blueNodeId, blue.getLocation(), blue.getPool()).apply(blueStorage)))); assertEquals(resource.getServices("storage", "unknown"), new Services("testing", Collections.emptySet())); + + verify(proxyStore, times(3)).get(any(String.class), any(String.class)); + verifyNoMoreInteractions(proxyStore); } @Test @@ -121,7 +143,117 @@ public void testGetAll() dynamicStore.put(greenNodeId, green); dynamicStore.put(blueNodeId, blue); + when(proxyStore.filterAndGetAll(any(Set.class))).thenAnswer(new Answer>() + { + @Override + public Set answer(InvocationOnMock invocationOnMock) + throws Throwable + { + return (Set) invocationOnMock.getArguments()[0]; + } + }); + + assertEquals(resource.getServices(), new Services("testing", ImmutableSet.of( + toServiceWith(redNodeId, red.getLocation(), red.getPool()).apply(redStorage), + toServiceWith(redNodeId, red.getLocation(), red.getPool()).apply(redWeb), + toServiceWith(greenNodeId, green.getLocation(), green.getPool()).apply(greenStorage), + toServiceWith(blueNodeId, blue.getLocation(), blue.getPool()).apply(blueStorage)))); + + verify(proxyStore).filterAndGetAll(any(Set.class)); + verifyNoMoreInteractions(proxyStore); + } + + @Test + public void testProxyGetByType() + { + Id redNodeId = Id.random(); + DynamicServiceAnnouncement redStorage = new DynamicServiceAnnouncement(Id.random() , "storage", ImmutableMap.of("key", "1")); + DynamicServiceAnnouncement redWeb = new DynamicServiceAnnouncement(Id.random(), "web", ImmutableMap.of("key", "2")); + DynamicAnnouncement red = new DynamicAnnouncement("testing", "alpha", "/a/b/c", of(redStorage, redWeb)); + + Id greenNodeId = Id.random(); + DynamicServiceAnnouncement greenStorage = new DynamicServiceAnnouncement(Id.random(), "storage", ImmutableMap.of("key", "3")); + DynamicAnnouncement green = new DynamicAnnouncement("testing", "alpha", "/x/y/z", of(greenStorage)); + + Id blueNodeId = Id.random(); + DynamicServiceAnnouncement blueStorage = new DynamicServiceAnnouncement(Id.random(), "storage", ImmutableMap.of("key", "4")); + DynamicAnnouncement blue = new DynamicAnnouncement("testing", "beta", "/a/b/c", of(blueStorage)); + + dynamicStore.put(redNodeId, red); + dynamicStore.put(greenNodeId, green); + dynamicStore.put(blueNodeId, blue); + + Service proxyStorageService = new Service(Id.random(), Id.random(), "storage", "general", "loc", ImmutableMap.of("key", "5")); + when(proxyStore.get("storage")).thenReturn(of(proxyStorageService)); + + assertEquals(resource.getServices("storage"), new Services("testing", of(proxyStorageService))); + + assertEquals(resource.getServices("web"), new Services("testing", ImmutableSet.of())); + } + + @Test + public void testProxyGetByTypeAndPool() + { + Id redNodeId = Id.random(); + DynamicServiceAnnouncement redStorage = new DynamicServiceAnnouncement(Id.random(), "storage", ImmutableMap.of("key", "1")); + DynamicServiceAnnouncement redWeb = new DynamicServiceAnnouncement(Id.random(), "web", ImmutableMap.of("key", "2")); + DynamicAnnouncement red = new DynamicAnnouncement("testing", "alpha", "/a/b/c", of(redStorage, redWeb)); + + Id greenNodeId = Id.random(); + DynamicServiceAnnouncement greenStorage = new DynamicServiceAnnouncement(Id.random(), "storage", ImmutableMap.of("key", "3")); + DynamicAnnouncement green = new DynamicAnnouncement("testing", "alpha", "/x/y/z", of(greenStorage)); + + Id blueNodeId = Id.random(); + DynamicServiceAnnouncement blueStorage = new DynamicServiceAnnouncement(Id.random(), "storage", ImmutableMap.of("key", "4")); + DynamicAnnouncement blue = new DynamicAnnouncement("testing", "beta", "/a/b/c", of(blueStorage)); + + dynamicStore.put(redNodeId, red); + dynamicStore.put(greenNodeId, green); + dynamicStore.put(blueNodeId, blue); + + Service proxyStorageService = new Service(Id.random(), Id.random(), "storage", "alpha", "loc", ImmutableMap.of("key", "5")); + when(proxyStore.get("storage", "alpha")).thenReturn(of(proxyStorageService)); + + assertEquals(resource.getServices("storage", "alpha"), new Services("testing", ImmutableSet.of(proxyStorageService))); + + assertEquals(resource.getServices("storage", "beta"), new Services("testing", ImmutableSet.of())); + + assertEquals(resource.getServices("storage", "unknown"), new Services("testing", ImmutableSet.of())); + } + + @Test + public void testProxyGetAll() + { + Id redNodeId = Id.random(); + DynamicServiceAnnouncement redStorage = new DynamicServiceAnnouncement(Id.random(), "storage", ImmutableMap.of("key", "1")); + DynamicServiceAnnouncement redWeb = new DynamicServiceAnnouncement(Id.random(), "web", ImmutableMap.of("key", "2")); + DynamicAnnouncement red = new DynamicAnnouncement("testing", "alpha", "/a/b/c", of(redStorage, redWeb)); + + Id greenNodeId = Id.random(); + DynamicServiceAnnouncement greenStorage = new DynamicServiceAnnouncement(Id.random(), "storage", ImmutableMap.of("key", "3")); + DynamicAnnouncement green = new DynamicAnnouncement("testing", "alpha", "/x/y/z", of(greenStorage)); + + Id blueNodeId = Id.random(); + DynamicServiceAnnouncement blueStorage = new DynamicServiceAnnouncement(Id.random(), "storage", ImmutableMap.of("key", "4")); + DynamicAnnouncement blue = new DynamicAnnouncement("testing", "beta", "/a/b/c", of(blueStorage)); + + dynamicStore.put(redNodeId, red); + dynamicStore.put(greenNodeId, green); + dynamicStore.put(blueNodeId, blue); + + final Service proxyStorageService = new Service(Id.random(), Id.random(), "storage", "alpha", "loc", ImmutableMap.of("key", "5")); + when(proxyStore.filterAndGetAll(any(Set.class))).thenAnswer(new Answer>() + { + @Override + public Set answer(InvocationOnMock invocationOnMock) + throws Throwable + { + return union(of(proxyStorageService), + (Set) invocationOnMock.getArguments()[0]); + } + }); assertEquals(resource.getServices(), new Services("testing", ImmutableSet.of( + proxyStorageService, toServiceWith(redNodeId, red.getLocation(), red.getPool()).apply(redStorage), toServiceWith(redNodeId, red.getLocation(), red.getPool()).apply(redWeb), toServiceWith(greenNodeId, green.getLocation(), green.getPool()).apply(greenStorage), diff --git a/server/src/test/java/io/airlift/discovery/TestStaticAnnouncementResource.java b/server/src/test/java/io/airlift/discovery/TestStaticAnnouncementResource.java old mode 100644 new mode 100755 index 970ce08..0094506 --- a/server/src/test/java/io/airlift/discovery/TestStaticAnnouncementResource.java +++ b/server/src/test/java/io/airlift/discovery/TestStaticAnnouncementResource.java @@ -17,12 +17,14 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import io.airlift.discovery.DiscoveryConfig.StringSet; import io.airlift.jaxrs.testing.MockUriInfo; import io.airlift.node.NodeInfo; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; import java.net.URI; import static org.testng.Assert.assertEquals; @@ -39,7 +41,7 @@ public class TestStaticAnnouncementResource public void setup() { store = new InMemoryStaticStore(); - resource = new StaticAnnouncementResource(store, new NodeInfo("testing")); + resource = new StaticAnnouncementResource(store, new NodeInfo("testing"), new DiscoveryConfig()); } @Test @@ -76,6 +78,20 @@ public void testEnvironmentConflict() assertTrue(store.getAll().isEmpty()); } + @Test + public void testPostProxied() + { + resource = new StaticAnnouncementResource(store, new NodeInfo("testing"), new DiscoveryConfig().setProxyProxiedTypes(StringSet.of("storage"))); + StaticAnnouncement announcement = new StaticAnnouncement("testing", "storage", "alpha", "/a/b/c", ImmutableMap.of("http", "http://localhost:1111")); + + Response response = resource.post(announcement, new MockUriInfo(URI.create("http://localhost:8080/v1/announcement/static"))); + + assertNotNull(response); + assertEquals(response.getStatus(), Status.FORBIDDEN.getStatusCode()); + + assertTrue(store.getAll().isEmpty()); + } + @Test public void testDelete() {