From e7c09a2cb2b0951f2c2c1907b11f732f00257e44 Mon Sep 17 00:00:00 2001 From: joerg1985 <16140691+joerg1985@users.noreply.github.com> Date: Tue, 29 Oct 2024 23:36:32 +0100 Subject: [PATCH] [grid] enable the httpclient to perform async requests #14403 (#14409) Co-authored-by: Viet Nguyen Duc --- .../selenium/remote/http/HttpClient.java | 5 + .../remote/http/jdk/JdkHttpClient.java | 91 +++++++++++++------ .../remote/internal/HttpClientTestBase.java | 30 ++++++ 3 files changed, 99 insertions(+), 27 deletions(-) diff --git a/java/src/org/openqa/selenium/remote/http/HttpClient.java b/java/src/org/openqa/selenium/remote/http/HttpClient.java index ee7f8613a64ea..6cc86a6418a2f 100644 --- a/java/src/org/openqa/selenium/remote/http/HttpClient.java +++ b/java/src/org/openqa/selenium/remote/http/HttpClient.java @@ -23,6 +23,7 @@ import java.net.URL; import java.util.ServiceLoader; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import java.util.stream.StreamSupport; import org.openqa.selenium.internal.Require; @@ -32,6 +33,10 @@ public interface HttpClient extends Closeable, HttpHandler { WebSocket openSocket(HttpRequest request, WebSocket.Listener listener); + default CompletableFuture executeAsync(HttpRequest req) { + return CompletableFuture.supplyAsync(() -> execute(req)); + } + default void close() {} interface Factory { diff --git a/java/src/org/openqa/selenium/remote/http/jdk/JdkHttpClient.java b/java/src/org/openqa/selenium/remote/http/jdk/JdkHttpClient.java index 51e4ede37d563..0fee3531ce6c0 100644 --- a/java/src/org/openqa/selenium/remote/http/jdk/JdkHttpClient.java +++ b/java/src/org/openqa/selenium/remote/http/jdk/JdkHttpClient.java @@ -44,6 +44,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; @@ -369,9 +370,64 @@ private URI getWebSocketUri(HttpRequest request) throws URISyntaxException { return uri; } + @Override + public CompletableFuture executeAsync(HttpRequest request) { + // the facade for this http request + CompletableFuture cf = new CompletableFuture<>(); + + // the actual http request + Future future = + executorService.submit( + () -> { + try { + HttpResponse response = handler.execute(request); + + cf.complete(response); + } catch (Throwable t) { + cf.completeExceptionally(t); + } + }); + + // try to interrupt the http request in case of a timeout, to avoid + // https://bugs.openjdk.org/browse/JDK-8258397 + cf.exceptionally( + (throwable) -> { + if (throwable instanceof java.util.concurrent.TimeoutException) { + // interrupts the thread + future.cancel(true); + } + + // nobody will read this result + return null; + }); + + // will complete exceptionally with a java.util.concurrent.TimeoutException + return cf.orTimeout(readTimeout.toMillis(), TimeUnit.MILLISECONDS); + } + @Override public HttpResponse execute(HttpRequest req) throws UncheckedIOException { - return handler.execute(req); + try { + // executeAsync does define a timeout, no need to use a timeout here + return executeAsync(req).get(); + } catch (CancellationException e) { + throw new WebDriverException(e.getMessage(), e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new WebDriverException(e.getMessage(), e); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + + if (cause instanceof java.util.concurrent.TimeoutException) { + throw new TimeoutException(cause); + } else if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } else if (cause instanceof Error) { + throw (Error) cause; + } + + throw new WebDriverException((cause != null) ? cause : e); + } } private HttpResponse execute0(HttpRequest req) throws UncheckedIOException { @@ -390,34 +446,13 @@ private HttpResponse execute0(HttpRequest req) throws UncheckedIOException { // - avoid a downgrade of POST requests, see the javadoc of j.n.h.HttpClient.Redirect // - not run into https://bugs.openjdk.org/browse/JDK-8304701 for (int i = 0; i < 100; i++) { - java.net.http.HttpRequest request = messages.createRequest(req, method, rawUri); - java.net.http.HttpResponse response; - - // use sendAsync to not run into https://bugs.openjdk.org/browse/JDK-8258397 - CompletableFuture> future = - client.sendAsync(request, byteHandler); - - try { - response = future.get(readTimeout.toMillis(), TimeUnit.MILLISECONDS); - } catch (CancellationException e) { - throw new WebDriverException(e.getMessage(), e); - } catch (ExecutionException e) { - Throwable cause = e.getCause(); - - if (cause instanceof HttpTimeoutException) { - throw new TimeoutException(cause); - } else if (cause instanceof IOException) { - throw (IOException) cause; - } else if (cause instanceof RuntimeException) { - throw (RuntimeException) cause; - } - - throw new WebDriverException((cause != null) ? cause : e); - } catch (java.util.concurrent.TimeoutException e) { - future.cancel(true); - throw new TimeoutException(e); + if (Thread.interrupted()) { + throw new InterruptedException("http request has been interrupted"); } + java.net.http.HttpRequest request = messages.createRequest(req, method, rawUri); + java.net.http.HttpResponse response = client.send(request, byteHandler); + switch (response.statusCode()) { case 303: method = HttpMethod.GET; @@ -454,6 +489,8 @@ private HttpResponse execute0(HttpRequest req) throws UncheckedIOException { } throw new ProtocolException("Too many redirects: 101"); + } catch (HttpTimeoutException e) { + throw new TimeoutException(e); } catch (IOException e) { throw new UncheckedIOException(e); } catch (InterruptedException e) { diff --git a/java/test/org/openqa/selenium/remote/internal/HttpClientTestBase.java b/java/test/org/openqa/selenium/remote/internal/HttpClientTestBase.java index a50b4c11f75e3..3379772a8db1e 100644 --- a/java/test/org/openqa/selenium/remote/internal/HttpClientTestBase.java +++ b/java/test/org/openqa/selenium/remote/internal/HttpClientTestBase.java @@ -35,6 +35,7 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; import java.util.stream.StreamSupport; import org.junit.jupiter.api.AfterAll; @@ -233,6 +234,35 @@ public void shouldAllowConfigurationFromSystemProperties() { } } + @Test + public void shouldStopRequestAfterTimeout() throws InterruptedException { + AtomicInteger counter = new AtomicInteger(); + + delegate = + req -> { + counter.incrementAndGet(); + try { + Thread.sleep(1600); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + HttpResponse response = new HttpResponse(); + response.setStatus(302); + response.addHeader("Location", "/"); + return response; + }; + ClientConfig clientConfig = ClientConfig.defaultConfig().readTimeout(Duration.ofMillis(800)); + + try (HttpClient client = + createFactory().createClient(clientConfig.baseUri(URI.create(server.whereIs("/"))))) { + HttpRequest request = new HttpRequest(GET, "/delayed"); + assertThatExceptionOfType(TimeoutException.class).isThrownBy(() -> client.execute(request)); + Thread.sleep(4200); + + assertThat(counter.get()).isEqualTo(1); + } + } + private HttpResponse getResponseWithHeaders(final Multimap headers) { return executeWithinServer( new HttpRequest(GET, "/foo"),