Skip to content

Commit

Permalink
Put flexible waitForServer logic in AbstractRpcClient, use in Bitcoin…
Browse files Browse the repository at this point in the history
…Client

AbstractRpcClient:

* add pollOnce() method that takes
   1. JsonRpcRequest supplier
   2. JavaType for expected result
   3. Error mapper to ignore non-fatal errors in an application specific way

* add flexible waitForServer() method that takes
   1. timeout duration
   2. retry duration
   3. JsonRpcRequest supplier
   4. JavaType for expected result
   5. Error mapper to ignore non-fatal errors in an application specific way

* add TransientErrorMapper functional interface

* addition support methods

BitcoinClient:

* re-implement existing synchronous waitForServer() using async methods
* add mapBitcoinTransientErrors function that implements existing logic
  to ignore "transient" errors and extract status messages from them. It
  also adds initial handling of java.net.ConnectException which is thrown
  by java.net.http when trying to connect to an unopened port.
  • Loading branch information
msgilligan committed Sep 25, 2023
1 parent 18a904e commit e103b0a
Show file tree
Hide file tree
Showing 2 changed files with 233 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,22 @@
import javax.net.ssl.SSLContext;
import java.io.EOFException;
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -345,80 +350,59 @@ private Address getTestAddress() {
}
}

// TODO: Make this an async method that returns CompletableFuture.
// TODO: Consider renaming this method "connect"
// TODO: Consider having this method automatically get the Bitcoin Network, Server Version, RegTest mining address, etc once connection is made
// TODO: This method should be deprecated in favor of the version that takes a Duration
/**
* Wait until the server is available.
*
* <p>
* Keep trying, ignoring (and logging) a known list of exception conditions that may occur while waiting for
* a `bitcoind` server to start up. This is similar to the behavior enabled by the `-rpcwait`
* option to the `bitcoin-cli` command-line tool.
* a {@code bitcoind} server to start up. This is similar to the behavior enabled by the {@code -rpcwait}
* option to the {@code bitcoin-cli} command-line tool.
*
* @param timeout Timeout in seconds
* @param timeoutSeconds Timeout in seconds
* @return true if ready, false if timeout or interrupted
* @throws JsonRpcException if an "unexpected" exception happens (i.e. an error other than what happens during normal server startup)
*/
public Boolean waitForServer(int timeout) throws JsonRpcException {

log.debug("Waiting for server RPC ready...");

String status; // Status message for logging
String statusLast = null;
long seconds = 0;
while (seconds < timeout) {
try {
Integer block = this.getBlockCount();
if (block != null) {
log.debug("RPC Ready.");
return true;
}
status = "getBlock returned null";
} catch (SocketException se) {
// These are expected exceptions while waiting for a server
if (se.getMessage().equals("Unexpected end of file from server") ||
se.getMessage().equals("Connection reset") ||
se.getMessage().contains("Connection refused") ||
se.getMessage().equals("recvfrom failed: ECONNRESET (Connection reset by peer)")) {
status = se.getMessage();
} else {
throw new JsonRpcException("Unexpected exception in waitForServer", se) ;
}
public boolean waitForServer(int timeoutSeconds) throws JsonRpcException {
return waitForServer(Duration.ofSeconds(timeoutSeconds));
}

} catch (EOFException e) {
/* Android exception, ignore */
// Expected exceptions on Android, RoboVM
status = e.getMessage();
} catch (JsonRpcStatusException e) {
// If server is in "warm-up" mode, e.g. validating/parsing the blockchain...
if (e.jsonRpcCode == -28) {
// ...then grab text message for status logging
status = e.getMessage();
} else {
log.error("Rethrowing JsonRpcStatusException: ", e);
throw e;
}
} catch (IOException e) {
// Ignore all IOExceptions
status = e.getMessage();
}
// Log status messages only once, if new or updated
if (!status.equals(statusLast)) {
log.info("Waiting for server: RPC Status: " + status);
statusLast = status;
}
try {
Thread.sleep(RETRY_DELAY.toMillis());
seconds += RETRY_DELAY.toSeconds();
} catch (InterruptedException e) {
log.error(e.toString());
Thread.currentThread().interrupt();
return false;
}
/**
* Poll the server regularly until it responds.
* @param timeout how long to wait for server
* @return Completes with {@code true} if server responded, {@code false} for timeout
* @throws JsonRpcException if an unexpected JsonRpcException occurs
*/
public boolean waitForServer(Duration timeout) throws JsonRpcException {
try {
return syncGet(waitForServerAsync(timeout, RETRY_DELAY));
} catch (JsonRpcException je) {
throw je;
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}

log.error("waitForServer() timed out after {} seconds.", timeout);
return false;
// TODO: Consider renaming this method "connect"
// TODO: Consider having this method automatically get the Bitcoin Network, Server Version, RegTest mining address, etc once connection is made (the generic waitForServer() method is capable of doing that)
/**
* Wait for a connection to the server. Uses {@code getblockcount} to poll server.
* @param timeout how long to wait for server
* @param retry time to wait between retries
* @return Completes with {@code true} if server responded, {@code false} for timeout, exceptionally for unexpected errors.
*/
public CompletableFuture<Boolean> waitForServerAsync(Duration timeout, Duration retry) {
Supplier<JsonRpcRequest> requestSupplier = () -> new JsonRpcRequest("getblockcount");
return waitForServer(timeout, retry, requestSupplier, typeForClass(Integer.class), this::mapBitcoinTransientErrors )
.handle((i, t) -> {
if (i != null) {
return CompletableFuture.completedFuture(Boolean.TRUE);
} else if (t instanceof TimeoutException) {
return CompletableFuture.completedFuture(Boolean.FALSE);
} else {
return CompletableFuture.<Boolean>failedFuture(t);
}
})
.thenCompose(Function.identity());
}

/**
Expand Down Expand Up @@ -459,6 +443,59 @@ public Boolean waitForBlock(int blockHeight, int timeout) throws JsonRpcStatusEx
return false;
}

/**
* TransientErrorMapper suitable for waiting for a temporarily down or restarting Bitcoin JSON-RPC server. This
* can be used to implement the {@code -rpcwait} command-line option, for instance.
*/
protected <T> CompletableFuture<JsonRpcResponse<T>> mapBitcoinTransientErrors(JsonRpcRequest request, JsonRpcResponse<T> response, Throwable t) {
if (response != null) {
return CompletableFuture.completedFuture(response);
} else {
if (t instanceof CompletionException) {
// Java.net.http driver
if (t.getCause() != null) {
if (t.getCause() instanceof ConnectException) {
return CompletableFuture.completedFuture(temporarilyUnavailableResponse(request, t));
} else {
return CompletableFuture.failedFuture(t);
}
}
}
if (t instanceof SocketException) {
// Java HttpUrlConnection driver
SocketException se = (SocketException) t;
// These are expected exceptions while waiting for a server
if (se.getMessage().equals("Unexpected end of file from server") ||
se.getMessage().equals("Connection reset") ||
se.getMessage().contains("Connection refused") ||
se.getMessage().equals("recvfrom failed: ECONNRESET (Connection reset by peer)")) {
// Transient error, generate synthetic JSON-RPC error response with appropriate values
return CompletableFuture.completedFuture(temporarilyUnavailableResponse(request, t));
} else {
return CompletableFuture.failedFuture(se);
}
} else if (t instanceof EOFException) {
/* Android exception, ignore */
// Expected exceptions on Android, RoboVM
return CompletableFuture.completedFuture(temporarilyUnavailableResponse(request, t));
} else if (t instanceof JsonRpcStatusException) {
JsonRpcStatusException e = (JsonRpcStatusException) t;
// If server is in "warm-up" mode, e.g. validating/parsing the blockchain...
if (e.jsonRpcCode == -28) {
// ...then grab text message for status logging
return CompletableFuture.completedFuture(temporarilyUnavailableResponse(request, t));
} else {
return CompletableFuture.failedFuture(e);
}
} else if (t instanceof IOException) {
// Ignore all IOExceptions
return CompletableFuture.completedFuture(temporarilyUnavailableResponse(request, t));
} else {
return CompletableFuture.failedFuture(t);
}
}
}

/**
* Tell the server to stop
* @return A status string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,16 @@
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;

// TODO: Rather than implementing transport (HttpUrlConnection vs. java.net.http) with subclasses use composition
// In other words, the constructor should take a transport implementation object.
Expand Down Expand Up @@ -50,6 +57,7 @@
* allowing implementation with alternative HTTP client libraries.
*/
public abstract class AbstractRpcClient implements JsonRpcClient<JavaType> {
private static final Logger log = LoggerFactory.getLogger(AbstractRpcClient.class);
/**
* The default JSON-RPC version in JsonRpcRequest is now '2.0', but since most
* requests are created inside {@code RpcClient} subclasses, we'll continue to default
Expand Down Expand Up @@ -109,4 +117,127 @@ public JavaType typeForClass(Class<?> clazz) {
return getMapper().constructType(clazz);
}

public <T> CompletableFuture<JsonRpcResponse<T>> pollOnce(JsonRpcRequest request, JavaType resultType, TransientErrorMapper<T> errorMapper) {
CompletableFuture<JsonRpcResponse<T>> f = sendRequestForResponseAsync(request, responseTypeFor(resultType));
// In Java 12+ this can be replaced with exceptionallyCompose()
return f.handle((r, t) -> errorMapper.map(request, r, t))
.thenCompose(Function.identity());
}

/**
* A wait-for-server routine that is agnostic about which RPC methods the server supports. In addition to two {@link Duration}
* parameters, there are 3 parameters (2 functions and a generic type specifier) to enable this method to work with any JSON-RPC server.
* @param timeout how long to wait
* @param retry delay between retries
* @param requestSupplier supplier of requests (needs to increment request ID at the very least)
* @param resultType the result type for the response
* @param errorMapper function that maps non-fatal errors (i.e. cases to keep polling)
* @return A future that returns a successful
* @param <T> The desired result type to be returned when the server is running
*/
public <T> CompletableFuture<T> waitForServer(Duration timeout, Duration retry, Supplier<JsonRpcRequest> requestSupplier, JavaType resultType, TransientErrorMapper<T> errorMapper) {
CompletableFuture<T> future = new CompletableFuture<>();
getDefaultAsyncExecutor().execute(() -> {
log.debug("Waiting for server RPC ready...");
String status; // Status message for logging
String statusLast = null;
long seconds = 0;
while (seconds < timeout.toSeconds()) {
JsonRpcResponse<T> r = null;
try {
// All non-fatal exceptions will be mapped to a JsonRpcError with code -20000
r = this.pollOnce(requestSupplier.get(), resultType, errorMapper).get();
} catch (InterruptedException | ExecutionException e) {
// If a fatal error occurred, fail our future and abort this thread
log.error("Fatal exception: ", e);
future.completeExceptionally(e);
return;
}
if (r.getResult() != null) {
// We received a response with a result, server is ready and has returned a usable result
log.debug("RPC Ready.");
future.complete(r.getResult());
return;
}
// We received a response with a non-fatal error, log it and wait to retry.
status = statusFromErrorResponse(r);
// Log status messages only once, if new or updated
if (!status.equals(statusLast)) {
log.info("Waiting for server: RPC Status: " + status);
statusLast = status;
}
try {
// Damnit, IntelliJ we're not busy-waiting we're polling!
Thread.sleep(retry.toMillis());
seconds += retry.toSeconds();
} catch (InterruptedException e) {
log.error(e.toString());
Thread.currentThread().interrupt();
future.completeExceptionally(e);
return;
}
}
String timeoutMessage = String.format("waitForServer() timed out after %d seconds", timeout.toSeconds());
log.error(timeoutMessage);
future.completeExceptionally(new TimeoutException(timeoutMessage));
});
return future;
}

/**
* Functional interface for ignoring what are considered "transient" errors. The definition of what is transient
* may vary depending upon the application. Different implementations of this function can be created for
* different applications.
* <p>
* The {@code JsonRpcResponse} returned may be a "synthetic" response, that is generated by the client,
* not by the server. The synthetic response will look like this:
* <ul>
* <li>error.code: -20000</li>
* <li>error.message: "Server temporarily unavailable"</li>
* <li>error.data: Detailed string message, e.g. "Connection refused"</li>
* </ul>
* @param <T> The expected result type
*/
@FunctionalInterface
public interface TransientErrorMapper<T> {
/**
* @param request The request we're handling completions for
* @param response response if one was successfully returned (or null)
* @param throwable exception if the call failed (or null)
* @return A completed or failed future than can replace the input (response, throwable) pair
*/
CompletableFuture<JsonRpcResponse<T>> map(JsonRpcRequest request, JsonRpcResponse<T> response, Throwable throwable);
}

/**
* Transient error mapper that is a no-op, i.e. it passes all errors through unchanged.
*/
protected <T> CompletableFuture<JsonRpcResponse<T>> identityTransientErrorMapper(JsonRpcRequest request, JsonRpcResponse<T> response, Throwable t) {
return response != null
? CompletableFuture.completedFuture(response)
: CompletableFuture.failedFuture(t);
}

protected <T> JsonRpcResponse<T> temporarilyUnavailableResponse(JsonRpcRequest request, Throwable t) {
return new JsonRpcResponse<T>(request, new JsonRpcError(-2000, "Server temporarily unavailable", t.getMessage()));
}

/**
* @param response A response where {@code getResult() == null}
* @return An error status string suitable for log messages
*/
protected String statusFromErrorResponse(JsonRpcResponse<?> response) {
Objects.requireNonNull(response);
if (response.getResult() != null) {
throw new IllegalStateException("This should only be called for responses with null result");
}
if (response.getError() == null) {
return "Invalid response both result and error were null";
} else if (response.getError().getData() != null) {
// Has option data, possibly the -2000 special case
return response.getError().getData().toString();
} else {
return response.getError().getMessage();
}
}
}

0 comments on commit e103b0a

Please sign in to comment.