diff --git a/docs/src/main/asciidoc/se/webclient.adoc b/docs/src/main/asciidoc/se/webclient.adoc index 59e5c9d614f..2d0f61acf68 100644 --- a/docs/src/main/asciidoc/se/webclient.adoc +++ b/docs/src/main/asciidoc/se/webclient.adoc @@ -519,6 +519,55 @@ include::{sourcedir}/se/WebClientSnippets.java[tag=snippet_8,indent=0] <1> `application.yaml` is a default configuration source loaded when YAML support is on classpath, so we can just use `Config.create()` <2> Passing the client configuration node +=== Setting connection limits +It is possible to limit connection numbers for specific hosts, proxies or even the total number of connections the client is allowed to create. None of these limits is mandatory to set. + +In the examples below we are setting fixed limit implementations, but it is possible to use any implementation of the interface `io.helidon.common.concurrency.limits.Limit`. + +Note: Connection limiting is currently supported only for the HTTP1 client connections. + +==== Setting connection limit in your code +Below is an example of how to set connection limit to your client programmatically. + +[source,java] +---- +include::{sourcedir}/se/WebClientSnippets.java[tag=snippet_13,indent=0] +---- + +<1> Overall connection limit set to 100. The client will not create more active connections than that. +<2> Setting a per-host limit. Every host will have their connection count limited to 5. +<3> This is how we can set limit only to the specific host. This overrides the per-host limit set above. This client will not create more than 2 connections to the host with the name `some-host`. +<4> Disable shared cache so your cache configuration can take effect. + +==== Setting connection limit via configuration +It is also possible to set different limits via configuration. + +[source,yaml] +.Setting up Http1Client configuration into the `application.yaml` file. +---- +client: + share-connection-cache: false + connection-cache-config: + connection-limit: + fixed: + permits: 100 + connection-per-host-limit: + fixed: + permits: 5 + host-limits: + - host: "some-host" + limit: + fixed: + permits: 2 +---- +Then, in your application code, load the configuration from that file. + +[source,java] +.Http1Client initialization using the `application.yaml` file located on the classpath +---- +include::{sourcedir}/se/WebClientSnippets.java[tag=snippet_14,indent=0] +---- + == Reference * link:{webclient-javadoc-base-url}.api/module-summary.html[Helidon Webclient API] diff --git a/docs/src/main/java/io/helidon/docs/se/WebClientSnippets.java b/docs/src/main/java/io/helidon/docs/se/WebClientSnippets.java index 65c9b8ed1b7..3292964c3b4 100644 --- a/docs/src/main/java/io/helidon/docs/se/WebClientSnippets.java +++ b/docs/src/main/java/io/helidon/docs/se/WebClientSnippets.java @@ -15,16 +15,18 @@ */ package io.helidon.docs.se; +import io.helidon.common.concurrency.limits.FixedLimit; import io.helidon.common.media.type.MediaTypes; import io.helidon.config.Config; import io.helidon.http.Method; import io.helidon.http.media.MediaSupport; import io.helidon.webclient.api.ClientResponseTyped; -import io.helidon.webclient.api.HttpClientRequest; import io.helidon.webclient.api.HttpClientResponse; import io.helidon.webclient.api.Proxy; import io.helidon.webclient.api.WebClient; +import io.helidon.webclient.http1.Http1Client; import io.helidon.webclient.http1.Http1ClientProtocolConfig; +import io.helidon.webclient.http1.Http1ConnectionCacheConfig; import io.helidon.webclient.metrics.WebClientMetrics; import io.helidon.webclient.spi.WebClientService; @@ -176,4 +178,28 @@ void snippet_12() { // end::snippet_12[] } + void snippet_13() { + // tag::snippet_13[] + Http1ConnectionCacheConfig cacheConfig = Http1ConnectionCacheConfig.builder() + .connectionLimit(FixedLimit.builder().permits(100).build()) // <1> + .connectionPerHostLimit(FixedLimit.builder().permits(5).build()) // <2> + .addHostLimit(builder -> builder.host("some-host") // <3> + .limit(FixedLimit.builder().permits(2).build())) + .build(); + + Http1Client client = Http1Client.builder() + .shareConnectionCache(false) // <4> + .connectionCacheConfig(cacheConfig) + .build(); + // end::snippet_13[] + } + + void snippet_14() { + // tag::snippet_14[] + Config config = Config.create(); + + Http1Client client = Http1Client.create(config.get("client")); + // end::snippet_14[] + } + } diff --git a/webclient/api/src/main/java/io/helidon/webclient/api/TcpClientConnection.java b/webclient/api/src/main/java/io/helidon/webclient/api/TcpClientConnection.java index 54c4ae19ae5..f54a5c2e8ed 100644 --- a/webclient/api/src/main/java/io/helidon/webclient/api/TcpClientConnection.java +++ b/webclient/api/src/main/java/io/helidon/webclient/api/TcpClientConnection.java @@ -192,10 +192,12 @@ public void closeResource() { if (closed) { return; } - try { - this.socket.close(); - } catch (IOException e) { - LOGGER.log(TRACE, "Failed to close a client socket", e); + if (this.socket != null) { + try { + this.socket.close(); + } catch (IOException e) { + LOGGER.log(TRACE, "Failed to close a client socket", e); + } } this.closed = true; closeConsumer.accept(this); diff --git a/webclient/api/src/main/java/module-info.java b/webclient/api/src/main/java/module-info.java index 70636be3d5a..7ded6ba7cc7 100644 --- a/webclient/api/src/main/java/module-info.java +++ b/webclient/api/src/main/java/module-info.java @@ -51,5 +51,5 @@ uses io.helidon.webclient.spi.WebClientServiceProvider; uses io.helidon.webclient.spi.ProtocolConfigProvider; uses io.helidon.webclient.spi.HttpClientSpiProvider; - + } diff --git a/webclient/http1/pom.xml b/webclient/http1/pom.xml index 4460015f3df..b4dd902c7d5 100644 --- a/webclient/http1/pom.xml +++ b/webclient/http1/pom.xml @@ -68,6 +68,10 @@ io.helidon.common helidon-common-context + + io.helidon.common.concurrency + helidon-common-concurrency-limits + io.helidon.common.features helidon-common-features-api @@ -102,6 +106,16 @@ helidon-common-testing-http-junit5 test + + io.helidon.config + helidon-config + test + + + io.helidon.config + helidon-config-yaml + test + diff --git a/webclient/http1/src/main/java/io/helidon/webclient/http1/Http1Client.java b/webclient/http1/src/main/java/io/helidon/webclient/http1/Http1Client.java index 3a663fb12e8..8b080d48d7b 100644 --- a/webclient/http1/src/main/java/io/helidon/webclient/http1/Http1Client.java +++ b/webclient/http1/src/main/java/io/helidon/webclient/http1/Http1Client.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022, 2023 Oracle and/or its affiliates. + * Copyright (c) 2022, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -45,7 +45,8 @@ public interface Http1Client extends HttpClient, RuntimeType * @return fluent API builder */ static Http1ClientConfig.Builder builder() { - return Http1ClientConfig.builder(); + return Http1ClientConfig.builder() + .update(it -> it.from(Http1ClientImpl.globalConfig())); } /** @@ -65,8 +66,7 @@ static Http1Client create(Http1ClientConfig clientConfig) { * @return a new client */ static Http1Client create(Consumer consumer) { - return Http1ClientConfig.builder() - .update(consumer) + return builder().update(consumer) .build(); } @@ -88,4 +88,15 @@ static Http1Client create() { static Http1Client create(Config config) { return create(it -> it.config(config)); } + + /** + * Configure the default Http1 client configuration. + * Note: This method needs to be used before Helidon is started to have the full effect. + * + * @param clientConfig global client config + */ + static void configureDefaults(Http1ClientConfig clientConfig) { + Http1ClientImpl.GLOBAL_CONFIG.compareAndSet(null, clientConfig); + } + } diff --git a/webclient/http1/src/main/java/io/helidon/webclient/http1/Http1ClientConfigBlueprint.java b/webclient/http1/src/main/java/io/helidon/webclient/http1/Http1ClientConfigBlueprint.java index 0338e389eab..2f7e861f641 100644 --- a/webclient/http1/src/main/java/io/helidon/webclient/http1/Http1ClientConfigBlueprint.java +++ b/webclient/http1/src/main/java/io/helidon/webclient/http1/Http1ClientConfigBlueprint.java @@ -23,6 +23,7 @@ /** * HTTP/1.1. full webclient configuration. */ +@Prototype.Configured @Prototype.Blueprint interface Http1ClientConfigBlueprint extends HttpClientConfig, Prototype.Factory { /** @@ -32,4 +33,14 @@ interface Http1ClientConfigBlueprint extends HttpClientConfig, Prototype.Factory */ @Option.Default("create()") Http1ClientProtocolConfig protocolConfig(); + + /** + * Client connection cache configuration. + * + * @return cache configuration + */ + @Option.Default("create()") + @Option.Configured + Http1ConnectionCacheConfig connectionCacheConfig(); + } diff --git a/webclient/http1/src/main/java/io/helidon/webclient/http1/Http1ClientImpl.java b/webclient/http1/src/main/java/io/helidon/webclient/http1/Http1ClientImpl.java index f06c4300837..3e88c2726f4 100644 --- a/webclient/http1/src/main/java/io/helidon/webclient/http1/Http1ClientImpl.java +++ b/webclient/http1/src/main/java/io/helidon/webclient/http1/Http1ClientImpl.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022, 2023 Oracle and/or its affiliates. + * Copyright (c) 2022, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,12 @@ package io.helidon.webclient.http1; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; + +import io.helidon.common.LazyValue; +import io.helidon.common.config.Config; +import io.helidon.common.config.GlobalConfig; import io.helidon.http.Method; import io.helidon.webclient.api.ClientRequest; import io.helidon.webclient.api.ClientUri; @@ -24,6 +30,15 @@ import io.helidon.webclient.spi.HttpClientSpi; class Http1ClientImpl implements Http1Client, HttpClientSpi { + static final AtomicReference GLOBAL_CONFIG = new AtomicReference<>(); + private static final LazyValue LAZY_GLOBAL_CONFIG = LazyValue.create(() -> { + Config config = GlobalConfig.config(); + return Http1ClientConfig.builder() + .servicesDiscoverServices(false) + .config(config.get("client")) + .buildPrototype(); + }); + private final WebClient webClient; private final Http1ClientConfig clientConfig; private final Http1ClientProtocolConfig protocolConfig; @@ -38,11 +53,16 @@ class Http1ClientImpl implements Http1Client, HttpClientSpi { this.connectionCache = Http1ConnectionCache.shared(); this.clientCache = null; } else { - this.connectionCache = Http1ConnectionCache.create(); + this.connectionCache = Http1ConnectionCache.create(clientConfig.connectionCacheConfig()); this.clientCache = connectionCache; } } + static Http1ClientConfig globalConfig() { + return Optional.ofNullable(Http1ClientImpl.GLOBAL_CONFIG.get()) + .orElseGet(Http1ClientImpl.LAZY_GLOBAL_CONFIG); + } + @Override public Http1ClientRequest method(Method method) { ClientUri clientUri = clientConfig.baseUri() diff --git a/webclient/http1/src/main/java/io/helidon/webclient/http1/Http1ConnectionCache.java b/webclient/http1/src/main/java/io/helidon/webclient/http1/Http1ConnectionCache.java index 8f862cbe3a0..2df0cd84efc 100644 --- a/webclient/http1/src/main/java/io/helidon/webclient/http1/Http1ConnectionCache.java +++ b/webclient/http1/src/main/java/io/helidon/webclient/http1/Http1ConnectionCache.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 Oracle and/or its affiliates. + * Copyright (c) 2023, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,15 +16,28 @@ package io.helidon.webclient.http1; +import java.net.InetSocketAddress; +import java.net.ProxySelector; +import java.net.SocketAddress; +import java.net.URI; import java.time.Duration; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Function; +import java.util.stream.Collectors; +import io.helidon.common.concurrency.limits.FixedLimit; +import io.helidon.common.concurrency.limits.Limit; +import io.helidon.common.concurrency.limits.LimitAlgorithm; import io.helidon.common.tls.Tls; import io.helidon.http.ClientRequestHeaders; import io.helidon.http.HeaderValues; @@ -34,7 +47,6 @@ import io.helidon.webclient.api.ConnectionKey; import io.helidon.webclient.api.Proxy; import io.helidon.webclient.api.TcpClientConnection; -import io.helidon.webclient.api.WebClient; import io.helidon.webclient.spi.ClientConnectionCache; import static java.lang.System.Logger.Level.DEBUG; @@ -46,14 +58,37 @@ class Http1ConnectionCache extends ClientConnectionCache { private static final System.Logger LOGGER = System.getLogger(Http1ConnectionCache.class.getName()); private static final Tls NO_TLS = Tls.builder().enabled(false).build(); private static final String HTTPS = "https"; - private static final Http1ConnectionCache SHARED = new Http1ConnectionCache(true); + private static final ConnectionCreationStrategy UNLIMITED_STRATEGY = new UnlimitedConnectionStrategy(); private static final List ALPN_ID = List.of(Http1Client.PROTOCOL_ID); private static final Duration QUEUE_TIMEOUT = Duration.ofMillis(10); + private static final Http1ConnectionCacheConfig EMPTY_CONFIG = Http1ConnectionCacheConfig.create(); + private static final Http1ConnectionCache SHARED = new Http1ConnectionCache(true, + Http1ClientImpl.globalConfig() + .connectionCacheConfig()); + private final ConnectionCreationStrategy connectionCreationStrategy; + private final Duration keepAliveWaiting; private final Map> cache = new ConcurrentHashMap<>(); private final AtomicBoolean closed = new AtomicBoolean(); - protected Http1ConnectionCache(boolean shared) { + protected Http1ConnectionCache(boolean shared, Http1ConnectionCacheConfig cacheConfig) { super(shared); + if (cacheConfig.enableConnectionLimits()) { + if (cacheConfig.connectionLimit().isPresent() + || cacheConfig.connectionPerHostLimit().isPresent() + || !cacheConfig.hostLimits().isEmpty() + || !cacheConfig.proxyLimits().isEmpty()) { + connectionCreationStrategy = new LimitedConnectionStrategy(cacheConfig); + } else { + connectionCreationStrategy = UNLIMITED_STRATEGY; + } + } else { + connectionCreationStrategy = UNLIMITED_STRATEGY; + } + keepAliveWaiting = cacheConfig.keepAliveTimeout(); + } + + private Http1ConnectionCache(Http1ConnectionCacheConfig clientConfig) { + this(false, clientConfig); } static Http1ConnectionCache shared() { @@ -61,7 +96,11 @@ static Http1ConnectionCache shared() { } static Http1ConnectionCache create() { - return new Http1ConnectionCache(false); + return new Http1ConnectionCache(EMPTY_CONFIG); + } + + static Http1ConnectionCache create(Http1ConnectionCacheConfig cacheConfig) { + return new Http1ConnectionCache(cacheConfig); } ClientConnection connection(Http1ClientImpl http1Client, @@ -135,13 +174,31 @@ private ClientConnection keepAliveConnection(Http1ClientImpl http1Client, } if (connection == null) { - connection = TcpClientConnection.create(http1Client.webClient(), - connectionKey, - ALPN_ID, - conn -> finishRequest(connectionQueue, conn), - conn -> { - }) - .connect(); + connection = connectionCreationStrategy.createConnection(connectionKey, + http1Client, + conn -> finishRequest(connectionQueue, conn), + true); + if (connection == null) { + try { + while ((connection = connectionQueue.poll(keepAliveWaiting.toMillis(), TimeUnit.MILLISECONDS)) != null + && !connection.isConnected()) { + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + if (connection == null) { + throw new IllegalStateException("Could not make a new HTTP connection. " + + "Maximum number of connections reached."); + } else { + if (LOGGER.isLoggable(DEBUG)) { + LOGGER.log(DEBUG, String.format("[%s] client connection obtained %s", + connection.channelId(), + Thread.currentThread().getName())); + } + } + } else { + connection.connect(); + } } else { if (LOGGER.isLoggable(DEBUG)) { LOGGER.log(DEBUG, String.format("[%s] client connection obtained %s", @@ -156,32 +213,35 @@ private ClientConnection oneOffConnection(Http1ClientImpl http1Client, Tls tls, ClientUri uri, Proxy proxy) { - - WebClient webClient = http1Client.webClient(); Http1ClientConfig clientConfig = http1Client.clientConfig(); + ConnectionKey connectionKey = new ConnectionKey(uri.scheme(), + uri.host(), + uri.port(), + clientConfig.readTimeout().orElse(Duration.ZERO), + tls, + clientConfig.dnsResolver(), + clientConfig.dnsAddressLookup(), + proxy); + + TcpClientConnection connection = connectionCreationStrategy.createConnection(connectionKey, + http1Client, + conn -> false, + false); - return TcpClientConnection.create(webClient, - new ConnectionKey(uri.scheme(), - uri.host(), - uri.port(), - clientConfig.readTimeout().orElse(Duration.ZERO), - tls, - clientConfig.dnsResolver(), - clientConfig.dnsAddressLookup(), - proxy), - ALPN_ID, - conn -> false, // always close connection - conn -> { - }) - - .connect(); + if (connection == null) { + throw new IllegalStateException("Could not make a new HTTP connection. Maximum number of connections reached."); + } + return connection.connect(); } private boolean finishRequest(LinkedBlockingDeque connectionQueue, TcpClientConnection conn) { if (conn.isConnected()) { try { + //Connection needs to be marked as idle here. + //This prevents race condition where another thread takes it out of the connection before setting it to + //idle state. + conn.helidonSocket().idle(); // mark it as idle to stay blocked at read for closed conn detection if (connectionQueue.offer(conn, QUEUE_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)) { - conn.helidonSocket().idle(); // mark it as idle to stay blocked at read for closed conn detection if (LOGGER.isLoggable(DEBUG)) { LOGGER.log(DEBUG, "[%s] client connection returned %s", conn.channelId(), @@ -204,7 +264,208 @@ private boolean finishRequest(LinkedBlockingDeque connectio } } } - return false; } + + ConnectionCreationStrategy strategy() { + return connectionCreationStrategy; + } + + sealed interface ConnectionCreationStrategy permits UnlimitedConnectionStrategy, LimitedConnectionStrategy { + + TcpClientConnection createConnection(ConnectionKey connectionKey, + Http1ClientImpl http1Client, + Function releaseFunction, + boolean keepAlive); + + } + + static final class UnlimitedConnectionStrategy implements ConnectionCreationStrategy { + + UnlimitedConnectionStrategy() { + } + + @Override + public TcpClientConnection createConnection(ConnectionKey connectionKey, + Http1ClientImpl http1Client, + Function releaseFunction, + boolean keepAlive) { + return TcpClientConnection.create(http1Client.webClient(), + connectionKey, + ALPN_ID, + releaseFunction, + conn -> {}); + } + + } + + static final class LimitedConnectionStrategy implements ConnectionCreationStrategy { + + private static final Limit NOOP = FixedLimit.create(); + + private final Lock hostsConnectionLimitLock = new ReentrantLock(); + private final Limit connectionLimit; + private final Limit nonProxyConnectionLimit; + private final Limit connectionPerHostLimit; + private final Map proxyConfigs; + private final Map proxyConnectionLimits; + private final Map connectionLimitsPerHost = new HashMap<>(); + + LimitedConnectionStrategy(Http1ConnectionCacheConfig cacheConfig) { + connectionLimit = cacheConfig.connectionLimit().orElse(NOOP).copy(); + nonProxyConnectionLimit = cacheConfig.nonProxyConnectionLimit().orElse(NOOP).copy(); + connectionPerHostLimit = cacheConfig.connectionPerHostLimit().orElse(NOOP).copy(); + for (Http1HostLimitConfig hostLimit : cacheConfig.hostLimits()) { + String key = hostLimit.host(); + connectionLimitsPerHost.put(key, hostLimit.limit().copy()); + } + Map proxyConnectionLimits = new HashMap<>(); + for (Http1ProxyLimitConfig proxyLimit : cacheConfig.proxyLimits()) { + proxyLimit.connectionLimit().ifPresent(it -> proxyConnectionLimits.put(proxyLimit.authority(), it.copy())); + for (Http1HostLimitConfig hostLimit : proxyLimit.hostLimits()) { + String key = hostAndProxyKey(hostLimit.host(), proxyLimit.authority()); + connectionLimitsPerHost.put(key, hostLimit.limit().copy()); + } + } + this.proxyConnectionLimits = Map.copyOf(proxyConnectionLimits); + this.proxyConfigs = cacheConfig.proxyLimits().stream().collect(Collectors.toMap(Http1ProxyLimitConfig::authority, + Function.identity())); + } + + private static String hostAndProxyKey(String host, String proxyAuthority) { + return host + "|" + proxyAuthority; + } + + @Override + public TcpClientConnection createConnection(ConnectionKey connectionKey, + Http1ClientImpl http1Client, + Function releaseFunction, + boolean keepAlive) { + //Maximum connections was not reached + //New connection should be created + Optional maxConnectionToken = connectionLimit.tryAcquire(!keepAlive); + if (maxConnectionToken.isPresent()) { + //Maximum connections was not reached + return checkProxyConnectionLimits(maxConnectionToken.get(), + connectionKey, + http1Client, + releaseFunction, + keepAlive); + } + return null; + } + + private TcpClientConnection checkProxyConnectionLimits(LimitAlgorithm.Token maxConnectionToken, + ConnectionKey connectionKey, + Http1ClientImpl http1Client, + Function releaseFunction, + boolean keepAlive) { + Proxy proxy = connectionKey.proxy(); + Optional maxProxyConnectionToken; + String proxyIdent; + if (proxy.type() == Proxy.ProxyType.NONE) { + maxProxyConnectionToken = nonProxyConnectionLimit.tryAcquire(!keepAlive); + proxyIdent = ""; + } else if (proxy.type() == Proxy.ProxyType.SYSTEM) { + String scheme = connectionKey.tls().enabled() ? "https" : "http"; + ProxySelector proxySelector = ProxySelector.getDefault(); + if (proxySelector == null) { + maxProxyConnectionToken = nonProxyConnectionLimit.tryAcquire(!keepAlive); + proxyIdent = ""; + } else { + List proxies = proxySelector + .select(URI.create(scheme + "://" + connectionKey.host() + ":" + connectionKey.port())); + if (proxies.isEmpty()) { + maxProxyConnectionToken = nonProxyConnectionLimit.tryAcquire(!keepAlive); + proxyIdent = ""; + } else { + java.net.Proxy jnProxy = proxies.getFirst(); + if (jnProxy.type() == java.net.Proxy.Type.DIRECT) { + maxProxyConnectionToken = nonProxyConnectionLimit.tryAcquire(!keepAlive); + proxyIdent = ""; + } else { + SocketAddress proxyAddress = jnProxy.address(); + if (proxyAddress instanceof InetSocketAddress inetSocketAddress) { + proxyIdent = inetSocketAddress.getHostName() + ":" + inetSocketAddress.getPort(); + } else { + proxyIdent = proxyAddress.toString(); + } + Limit proxyConnectionLimit = proxyConnectionLimits.getOrDefault(proxyIdent, NOOP); + maxProxyConnectionToken = proxyConnectionLimit.tryAcquire(!keepAlive); + } + } + } + } else { + proxyIdent = proxy.host() + ":" + proxy.port(); + Limit proxyConnectionLimit = proxyConnectionLimits.getOrDefault(proxyIdent, NOOP); + maxProxyConnectionToken = proxyConnectionLimit.tryAcquire(!keepAlive); + } + if (maxProxyConnectionToken.isPresent()) { + //Maximum proxy/non-proxy connections was not reached + return checkHostLimit(maxConnectionToken, + maxProxyConnectionToken.get(), + connectionKey, + http1Client, + releaseFunction, + keepAlive, + proxyIdent); + } else { + maxConnectionToken.ignore(); + } + return null; + } + + private TcpClientConnection checkHostLimit(LimitAlgorithm.Token maxConnectionToken, + LimitAlgorithm.Token maxProxyConnectionToken, + ConnectionKey connectionKey, + Http1ClientImpl http1Client, + Function releaseFunction, + boolean keepAlive, + String proxyIdent) { + String hostKey = connectionKey.host(); + if (!proxyIdent.isEmpty()) { + hostKey = hostAndProxyKey(hostKey, proxyIdent); + } + Limit hostLimit; + try { + hostsConnectionLimitLock.lock(); + hostLimit = connectionLimitsPerHost.computeIfAbsent(hostKey, + key -> Optional.ofNullable(proxyConfigs.get(proxyIdent)) + .flatMap(Http1ProxyLimitConfigBlueprint::connectionPerHostLimit) + .orElse(connectionPerHostLimit) + .copy()); + } finally { + hostsConnectionLimitLock.unlock(); + } + Optional maxConnectionPerRouteLimitToken = hostLimit.tryAcquire(!keepAlive); + if (maxConnectionPerRouteLimitToken.isPresent()) { + //Maximum host connections was not reached + //Create new connection + return TcpClientConnection.create(http1Client.webClient(), + connectionKey, + ALPN_ID, + releaseFunction, + conn -> { + //We need to free all the tokens when this connection is closed. + maxConnectionToken.success(); + maxProxyConnectionToken.success(); + maxConnectionPerRouteLimitToken.get().success(); + }); + } else { + maxConnectionToken.ignore(); + maxProxyConnectionToken.ignore(); + } + return null; + } + + //Getters are here only for testing purposes + Limit maxConnectionLimit() { + return connectionLimit; + } + + Map connectionLimitsPerHost() { + return Map.copyOf(connectionLimitsPerHost); + } + } + } diff --git a/webclient/http1/src/main/java/io/helidon/webclient/http1/Http1ConnectionCacheConfigBlueprint.java b/webclient/http1/src/main/java/io/helidon/webclient/http1/Http1ConnectionCacheConfigBlueprint.java new file mode 100644 index 00000000000..743b9c5cc37 --- /dev/null +++ b/webclient/http1/src/main/java/io/helidon/webclient/http1/Http1ConnectionCacheConfigBlueprint.java @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.webclient.http1; + +import java.time.Duration; +import java.util.List; +import java.util.Optional; + +import io.helidon.builder.api.Option; +import io.helidon.builder.api.Prototype; +import io.helidon.common.concurrency.limits.Limit; +import io.helidon.common.concurrency.limits.spi.LimitProvider; + +/** + * Configuration of the HTTP/1.1 client cache. + */ +@Prototype.Configured +@Prototype.Blueprint +interface Http1ConnectionCacheConfigBlueprint { + + /** + * Whether to enable connection limits, if they are set. + * Default value is {@code true}. + * + * @return whether to use configured connection limits + */ + @Option.DefaultBoolean(true) + @Option.Configured + boolean enableConnectionLimits(); + + /** + * Total connection limit of the client. + * This limit cannot be overridden on any underling level. + * Set as unlimited if not configured. + * + * @return configured connection limit + */ + @Option.Provider(value = LimitProvider.class, discoverServices = false) + @Option.Configured + Optional connectionLimit(); + + /** + * Limit of how many connections can be created per each host. + * This limit can be adjusted via specific host configuration. + * + * @return configured connection limit + */ + @Option.Provider(value = LimitProvider.class, discoverServices = false) + @Option.Configured + Optional connectionPerHostLimit(); + + /** + * Limit of how many connections can be created without proxy. + * + * @return configured non-proxy connection limit + */ + @Option.Provider(value = LimitProvider.class, discoverServices = false) + @Option.Configured + Optional nonProxyConnectionLimit(); + + /** + * Specific host connection limit configuration. + * Limit specified for each host will override the one defined by {@link #connectionPerHostLimit()}. + * + * @return specific host limits + */ + @Option.Singular + @Option.Configured + List hostLimits(); + + /** + * Specific proxy limit configurations. + * + * @return proxy limit configurations + */ + @Option.Singular + @Option.Configured + List proxyLimits(); + + /** + * Keep alive timeout, how long should the client wait for the connection, when all the connections are taken. + * + * @return keep alive connection timeout + */ + @Option.Configured + @Option.Default("PT5S") + Duration keepAliveTimeout(); + +} diff --git a/webclient/http1/src/main/java/io/helidon/webclient/http1/Http1HostLimitConfigBlueprint.java b/webclient/http1/src/main/java/io/helidon/webclient/http1/Http1HostLimitConfigBlueprint.java new file mode 100644 index 00000000000..f57a8d21301 --- /dev/null +++ b/webclient/http1/src/main/java/io/helidon/webclient/http1/Http1HostLimitConfigBlueprint.java @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.webclient.http1; + +import io.helidon.builder.api.Option; +import io.helidon.builder.api.Prototype; +import io.helidon.common.concurrency.limits.Limit; +import io.helidon.common.concurrency.limits.spi.LimitProvider; + +/** + * Configuration of the host connection limit. + */ +@Prototype.Configured +@Prototype.Blueprint +interface Http1HostLimitConfigBlueprint { + + /** + * Hostname this limit applies to.. + * Example: {@code localhost}. + * + * @return configured authority + */ + @Option.Configured + String host(); + + /** + * Limit of how many connections can be created for this host. + * + * @return configured host connection limit + */ + @Option.Provider(value = LimitProvider.class, discoverServices = false) + @Option.Configured + Limit limit(); + +} diff --git a/webclient/http1/src/main/java/io/helidon/webclient/http1/Http1ProxyLimitConfigBlueprint.java b/webclient/http1/src/main/java/io/helidon/webclient/http1/Http1ProxyLimitConfigBlueprint.java new file mode 100644 index 00000000000..eaba18b69d8 --- /dev/null +++ b/webclient/http1/src/main/java/io/helidon/webclient/http1/Http1ProxyLimitConfigBlueprint.java @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.webclient.http1; + +import java.util.List; +import java.util.Optional; + +import io.helidon.builder.api.Option; +import io.helidon.builder.api.Prototype; +import io.helidon.common.concurrency.limits.Limit; +import io.helidon.common.concurrency.limits.spi.LimitProvider; + +/** + * Configuration of the HTTP/1.1 client proxy limit. + */ +@Prototype.Configured +@Prototype.Blueprint +interface Http1ProxyLimitConfigBlueprint { + + /** + * Authority of the proxy. + * Example: {@code example-host:80}. + * + * @return configured authority + */ + @Option.Configured + String authority(); + + /** + * Total connection limit of the proxy. + * This limit does not override {@link Http1ConnectionCacheConfig#connectionLimit()}. + * + * @return configured connection limit + */ + @Option.Provider(value = LimitProvider.class, discoverServices = false) + @Option.Configured + Optional connectionLimit(); + + /** + * Limit of how many connections can be created per each host. + * This limit can be adjusted via specific host configuration. + * + * @return configured connection limit + */ + @Option.Provider(value = LimitProvider.class, discoverServices = false) + @Option.Configured + Optional connectionPerHostLimit(); + + /** + * Specific host connection limit configuration. + * Limit specified for each host will override the one defined by {@link #connectionPerHostLimit()}. + * + * @return specific host limits + */ + @Option.Singular + @Option.Configured + List hostLimits(); + +} diff --git a/webclient/http1/src/main/java/module-info.java b/webclient/http1/src/main/java/module-info.java index 87802215cd8..10259469625 100644 --- a/webclient/http1/src/main/java/module-info.java +++ b/webclient/http1/src/main/java/module-info.java @@ -28,6 +28,7 @@ module io.helidon.webclient.http1 { requires io.helidon.builder.api; // @Builder - interfaces are a runtime dependency + requires io.helidon.common.concurrency.limits; requires static io.helidon.common.features.api; @@ -40,6 +41,7 @@ provides io.helidon.webclient.spi.ProtocolConfigProvider with io.helidon.webclient.http1.Http1ProtocolConfigProvider; + uses io.helidon.common.concurrency.limits.spi.LimitProvider; uses io.helidon.webclient.spi.SourceHandlerProvider; } \ No newline at end of file diff --git a/webclient/http1/src/test/java/io/helidon/webclient/http1/Http1ConnectionCacheTest.java b/webclient/http1/src/test/java/io/helidon/webclient/http1/Http1ConnectionCacheTest.java new file mode 100644 index 00000000000..0caddd4727e --- /dev/null +++ b/webclient/http1/src/test/java/io/helidon/webclient/http1/Http1ConnectionCacheTest.java @@ -0,0 +1,272 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.webclient.http1; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +import io.helidon.common.concurrency.limits.FixedLimit; +import io.helidon.common.concurrency.limits.Limit; +import io.helidon.common.concurrency.limits.LimitAlgorithm; +import io.helidon.common.uri.UriInfo; +import io.helidon.config.Config; +import io.helidon.webclient.api.ConnectionKey; +import io.helidon.webclient.api.Proxy; +import io.helidon.webclient.api.TcpClientConnection; +import io.helidon.webclient.api.WebClient; + +import org.junit.jupiter.api.Test; + +import static io.helidon.webclient.http1.Http1ConnectionCache.ConnectionCreationStrategy; +import static io.helidon.webclient.http1.Http1ConnectionCache.UnlimitedConnectionStrategy; +import static io.helidon.webclient.http1.Http1ConnectionCache.LimitedConnectionStrategy; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +class Http1ConnectionCacheTest { + + private static final Http1ClientImpl DEFAULT_CLIENT; + + static { + WebClient webClient = WebClient.create(); + Http1ClientConfig clientConfig = Http1ClientConfig.create(); + DEFAULT_CLIENT = new Http1ClientImpl(webClient, clientConfig); + } + + @Test + void testCacheFromConfig() { + Config config = Config.create(); + Http1ConnectionCacheConfig cacheConfig = Http1ConnectionCacheConfig.create(config.get("missing")); + Http1ConnectionCache cache = Http1ConnectionCache.create(cacheConfig); + ConnectionCreationStrategy strategy = cache.strategy(); + assertThat(strategy, instanceOf(UnlimitedConnectionStrategy.class)); + + cacheConfig = Http1ConnectionCacheConfig.create( + config.get("client-cache-tests.connection-cache-config")); + cache = Http1ConnectionCache.create(cacheConfig); + strategy = cache.strategy(); + assertThat(strategy, instanceOf(LimitedConnectionStrategy.class)); + } + + @Test + void testCacheFromClientConfig() { + Config config = Config.create(); + Http1ClientImpl client = (Http1ClientImpl) Http1Client.create(config.get("missing")); + Http1ConnectionCache cache = client.connectionCache(); + ConnectionCreationStrategy strategy = cache.strategy(); + assertThat(strategy, instanceOf(UnlimitedConnectionStrategy.class)); + + client = (Http1ClientImpl) Http1Client.create(config.get("client-cache-tests")); + cache = client.connectionCache(); + strategy = cache.strategy(); + assertThat(strategy, instanceOf(LimitedConnectionStrategy.class)); + } + + @Test + void testDefaultConnectionCreationStrategyCreation() { + Http1ConnectionCache cache = Http1ConnectionCache.create(); + assertThat(cache.strategy(), instanceOf(UnlimitedConnectionStrategy.class)); + } + + @Test + void testLimitedConnectionCreationStrategyCreation() { + Http1ConnectionCacheConfig connectionCacheConfig = Http1ConnectionCacheConfig.builder() + .connectionLimit(FixedLimit.create()) + .build(); + Http1ConnectionCache cache = Http1ConnectionCache.create(connectionCacheConfig); + assertThat(cache.strategy(), instanceOf(LimitedConnectionStrategy.class)); + } + + @Test + void testDisabledLimits() { + Http1ConnectionCacheConfig connectionCacheConfig = Http1ConnectionCacheConfig.builder() + .connectionLimit(FixedLimit.create()) //This would indicate usage of the limited strategy + .enableConnectionLimits(false) //This line enforces usage of unlimited strategy + .build(); + Http1ConnectionCache cache = Http1ConnectionCache.create(connectionCacheConfig); + assertThat(cache.strategy(), instanceOf(UnlimitedConnectionStrategy.class)); + } + + @Test + void testUnlimitedConnectionStrategy() { + Http1ClientRequest request = DEFAULT_CLIENT.get().uri("http://localhost:8080"); + Http1ClientConfig clientConfig = DEFAULT_CLIENT.prototype(); + UriInfo uri = request.resolvedUri(); + ConnectionKey connectionKey = new ConnectionKey(uri.scheme(), + uri.host(), + uri.port(), + clientConfig.readTimeout().orElse(Duration.ZERO), + clientConfig.tls(), + clientConfig.dnsResolver(), + clientConfig.dnsAddressLookup(), + clientConfig.proxy()); + ConnectionCreationStrategy strategy = new UnlimitedConnectionStrategy(); + + for (int i = 0; i < 100; i++) { + TcpClientConnection connection = strategy.createConnection(connectionKey, + DEFAULT_CLIENT, + tcpClientConnection -> false, + true); + assertThat(connection, notNullValue()); + } + } + + @Test + void testConnectionLimit() { + Http1ConnectionCacheConfig cacheConfig = Http1ConnectionCacheConfig.builder() + .connectionLimit(FixedLimit.builder().permits(5).build()) + .build(); + LimitedConnectionStrategy strategy = new LimitedConnectionStrategy(cacheConfig); + testStrategyLimit("http://localhost:8080", strategy, 5); + assertThat(strategy.maxConnectionLimit().tryAcquire(false), is(Optional.empty())); + + assertThat(strategy.connectionLimitsPerHost().size(), is(1)); + Limit localhostLimit = strategy.connectionLimitsPerHost().get("localhost"); + assertThat(localhostLimit, notNullValue()); + assertThat(localhostLimit.tryAcquire(false), is(not((Optional.empty())))); + } + + @Test + void testPerHostConnectionLimit() { + Http1ConnectionCacheConfig cacheConfig = Http1ConnectionCacheConfig.builder() + .connectionPerHostLimit(FixedLimit.builder().permits(5).build()) + .build(); + LimitedConnectionStrategy strategy = new LimitedConnectionStrategy(cacheConfig); + testStrategyLimit("http://localhost:8080", strategy, 5); + assertThat(strategy.maxConnectionLimit().tryAcquire(false), is(not((Optional.empty())))); + + assertThat(strategy.connectionLimitsPerHost().size(), is(1)); + Limit localhostLimit = strategy.connectionLimitsPerHost().get("localhost"); + assertThat(localhostLimit, notNullValue()); + assertThat(localhostLimit.tryAcquire(false), is(Optional.empty())); + + testStrategyLimit("http://localhost2:8080", strategy, 5); + assertThat(strategy.connectionLimitsPerHost().size(), is(2)); + } + + @Test + void testSpecificHostConnectionLimit() { + Http1ConnectionCacheConfig cacheConfig = Http1ConnectionCacheConfig.builder() + .connectionPerHostLimit(FixedLimit.builder().permits(5).build()) + .addHostLimit(builder -> builder.host("localhost").limit(FixedLimit.create(fb -> fb.permits(2))).build()) + .build(); + LimitedConnectionStrategy strategy = new LimitedConnectionStrategy(cacheConfig); + testStrategyLimit("http://localhost:8080", strategy, 2); + assertThat(strategy.maxConnectionLimit().tryAcquire(false), is(not((Optional.empty())))); + + Limit localhostLimit = strategy.connectionLimitsPerHost().get("localhost"); + assertThat(localhostLimit, notNullValue()); + assertThat(localhostLimit.tryAcquire(false), is(Optional.empty())); + + testStrategyLimit("http://localhost2:8080", strategy, 5); + assertThat(strategy.connectionLimitsPerHost().size(), is(2)); + } + + @Test + void testNonProxyConnectionLimit() { + Http1ConnectionCacheConfig cacheConfig = Http1ConnectionCacheConfig.builder() + .nonProxyConnectionLimit(FixedLimit.builder().permits(5).build()) + .addProxyLimit(builder -> builder.authority("localhost:1234") + .connectionLimit(FixedLimit.builder().permits(3).build())) + .build(); + LimitedConnectionStrategy strategy = new LimitedConnectionStrategy(cacheConfig); + testStrategyLimit("http://localhost:8080", strategy, 5); + + Proxy proxy = Proxy.builder().type(Proxy.ProxyType.HTTP).host("localhost").port(1234).build(); + testStrategyLimit("http://localhost:8080", strategy, 3, proxy); + } + + @Test + void testProxyConnectionConfigurationLimit() { + Http1ConnectionCacheConfig cacheConfig = Http1ConnectionCacheConfig.builder() + .addProxyLimit(builder -> builder.authority("localhost:1234") + .connectionLimit(FixedLimit.builder().permits(4).build()) + .connectionPerHostLimit(FixedLimit.builder().permits(2).build()) + .addHostLimit(hb -> hb.host("localhost2").limit(FixedLimit.create(fb -> fb.permits(3))).build())) + .build(); + LimitedConnectionStrategy strategy = new LimitedConnectionStrategy(cacheConfig); + Proxy proxy = Proxy.builder().type(Proxy.ProxyType.HTTP).host("localhost").port(1234).build(); + List connections = testStrategyLimit("http://localhost:8080", strategy, 2, proxy); + + //Only two free connections should be remaining in the max connection size by this proxy + testStrategyLimit("http://localhost2:8080", strategy, 2, proxy); + + connections.forEach(TcpClientConnection::releaseResource); + //1 more connections should be remaining to get the full host limit + testStrategyLimit("http://localhost2:8080", strategy, 1, proxy); + + //1 remaining connection of the total amount of 4 connections permitted by this proxy + testStrategyLimit("http://localhost:8080", strategy, 1, proxy); + } + + @Test + void testNonProxyConnectionReleasing() { + Http1ConnectionCacheConfig cacheConfig = Http1ConnectionCacheConfig.builder() + .connectionLimit(FixedLimit.builder().permits(5).build()) + .build(); + LimitedConnectionStrategy strategy = new LimitedConnectionStrategy(cacheConfig); + List connections = testStrategyLimit("http://localhost:8080", strategy, 5); + assertThat(strategy.maxConnectionLimit().tryAcquire(false), is(Optional.empty())); + + connections.forEach(TcpClientConnection::releaseResource); + Optional token = strategy.maxConnectionLimit().tryAcquire(false); + assertThat(token, is(not(Optional.empty()))); + token.get().ignore(); + + testStrategyLimit("http://localhost:8080", strategy, 5); + } + + private List testStrategyLimit(String uriString, LimitedConnectionStrategy strategy, int expectedLimit) { + return testStrategyLimit(uriString, strategy, expectedLimit, null); + } + + private List testStrategyLimit(String uriString, LimitedConnectionStrategy strategy, int expectedLimit, Proxy proxy) { + Http1ClientRequest request = DEFAULT_CLIENT.get().uri(uriString); + Http1ClientConfig clientConfig = DEFAULT_CLIENT.prototype(); + UriInfo uri = request.resolvedUri(); + ConnectionKey connectionKey = new ConnectionKey(uri.scheme(), + uri.host(), + uri.port(), + clientConfig.readTimeout().orElse(Duration.ZERO), + clientConfig.tls(), + clientConfig.dnsResolver(), + clientConfig.dnsAddressLookup(), + proxy == null ? clientConfig.proxy() : proxy); + List connections = new ArrayList<>(); + for (int i = 1; i <= expectedLimit + 1; i++) { + TcpClientConnection connection = strategy.createConnection(connectionKey, + DEFAULT_CLIENT, + tcpClientConnection -> false, + //Keep-alive true ensures Limit not to wait + true); + if (i <= expectedLimit) { + assertThat(connection, notNullValue()); + connections.add(connection); + } else { + assertThat(connection, nullValue()); + } + } + return connections; + } + +} diff --git a/webclient/http1/src/test/resources/application.yaml b/webclient/http1/src/test/resources/application.yaml new file mode 100644 index 00000000000..cdaf1e9303d --- /dev/null +++ b/webclient/http1/src/test/resources/application.yaml @@ -0,0 +1,30 @@ +# +# Copyright (c) 2024 Oracle and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +client-cache-tests: + share-connection-cache: false + connection-cache-config: + connection-limit: + fixed: + permits: 100 + connection-per-host-limit: + fixed: + permits: 5 + host-limits: + - host: "some-host" + limit: + fixed: + permits: 2 \ No newline at end of file