Skip to content

Commit

Permalink
refactor: rewrite IoT cloud helper to use AWS SDK client instead of C…
Browse files Browse the repository at this point in the history
  • Loading branch information
MikeDombo authored Apr 12, 2021
1 parent 1c3bc99 commit 8d788c1
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 238 deletions.
135 changes: 40 additions & 95 deletions src/main/java/com/aws/greengrass/iot/IotCloudHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,45 +7,32 @@

import com.aws.greengrass.deployment.exceptions.AWSIotException;
import com.aws.greengrass.iot.model.IotCloudResponse;
import com.aws.greengrass.logging.api.Logger;
import com.aws.greengrass.logging.impl.LogManager;
import com.aws.greengrass.util.BaseRetryableAccessor;
import com.aws.greengrass.util.CrashableSupplier;
import com.aws.greengrass.util.Utils;
import lombok.NoArgsConstructor;
import software.amazon.awssdk.crt.http.HttpClientConnection;
import software.amazon.awssdk.crt.http.HttpHeader;
import software.amazon.awssdk.crt.http.HttpRequest;
import software.amazon.awssdk.crt.http.HttpRequestBodyStream;
import software.amazon.awssdk.crt.http.HttpStream;
import software.amazon.awssdk.crt.http.HttpStreamResponseHandler;
import software.amazon.awssdk.http.AbortableInputStream;
import software.amazon.awssdk.http.ExecutableHttpRequest;
import software.amazon.awssdk.http.HttpExecuteRequest;
import software.amazon.awssdk.http.HttpExecuteResponse;
import software.amazon.awssdk.http.SdkHttpMethod;
import software.amazon.awssdk.http.SdkHttpRequest;
import software.amazon.awssdk.utils.IoUtils;

import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.inject.Singleton;


@Singleton
@NoArgsConstructor
public class IotCloudHelper {
private static final Logger LOGGER = LogManager.getLogger(IotCloudHelper.class);
private static final String HTTP_HEADER_REQUEST_ID = "x-amzn-RequestId";
private static final String HTTP_HEADER_ERROR_TYPE = "x-amzn-ErrorType";
private static final String HTTP_HEADER_THING_NAME = "x-amzn-iot-thingname";
// TODO: [P41179510]: User configurable network timeout settings
// Max wait time for device to receive HTTP response from IOT CLOUD
private static final long TIMEOUT_FOR_RESPONSE_FROM_IOT_CLOUD_SECONDS = Duration.ofSeconds(30).getSeconds();
private static final int RETRY_COUNT = 3;
private static final int BACKOFF_MILLIS = 200;

Expand All @@ -60,87 +47,45 @@ public class IotCloudHelper {
* @return Http response corresponding to http request for path
* @throws AWSIotException when unable to send the request successfully
*/
public IotCloudResponse sendHttpRequest(final IotConnectionManager connManager, String thingName,
final String path, final String verb, final byte[] body)
throws AWSIotException {
List<HttpHeader> headers = new ArrayList<>();
headers.add(new HttpHeader("host", connManager.getHost()));
if (Utils.isNotEmpty(thingName)) {
headers.add(new HttpHeader(HTTP_HEADER_THING_NAME, thingName));
}
public IotCloudResponse sendHttpRequest(final IotConnectionManager connManager, String thingName, final String path,
final String verb, final byte[] body) throws AWSIotException {
SdkHttpRequest.Builder innerRequestBuilder = SdkHttpRequest.builder().method(SdkHttpMethod.fromValue(verb));

final HttpRequestBodyStream httpRequestBodyStream = body == null ? null : createHttpRequestBodyStream(body);
final HttpRequest request = new HttpRequest(verb, path, headers.toArray(new HttpHeader[0]),
httpRequestBodyStream);

try (HttpClientConnection conn = connManager.getConnection()) {
BaseRetryableAccessor accessor = new BaseRetryableAccessor();
CrashableSupplier<IotCloudResponse, AWSIotException> getHttpResponse = () -> getHttpResponse(conn, request);
return accessor.retry(RETRY_COUNT, BACKOFF_MILLIS, getHttpResponse,
new HashSet<>(Arrays.asList(AWSIotException.class)));
URI uri = connManager.getURI();
// If the path is actually a full URI, then treat it as such
if (path.startsWith("https://")) {
uri = URI.create(path);
innerRequestBuilder.uri(uri);
} else {
innerRequestBuilder.uri(uri).encodedPath(path);
}
}

private HttpRequestBodyStream createHttpRequestBodyStream(byte[] bytes) {
return new HttpRequestBodyStream() {
@Override
public boolean sendRequestBody(ByteBuffer bodyBytesOut) {
bodyBytesOut.put(bytes);
return true;
}

@Override
public boolean resetPosition() {
return true;
}
};
}

private HttpStreamResponseHandler createResponseHandler(CompletableFuture<Integer> reqCompleted,
Map<String, String> responseHeaders,
IotCloudResponse response) {
return new HttpStreamResponseHandler() {
@Override
public void onResponseHeaders(HttpStream httpStream, int i, int i1, HttpHeader[] httpHeaders) {
Arrays.stream(httpHeaders).forEach(header -> responseHeaders.put(header.getName(), header.getValue()));
}
if (Utils.isNotEmpty(thingName)) {
innerRequestBuilder.appendHeader(HTTP_HEADER_THING_NAME, thingName);
}

@Override
public int onResponseBody(HttpStream stream, byte[] bodyBytes) {
ByteArrayOutputStream responseByteArray = new ByteArrayOutputStream();
if (response.getResponseBody() != null) {
responseByteArray.write(response.getResponseBody(), 0, response.getResponseBody().length);
}
responseByteArray.write(bodyBytes, 0, bodyBytes.length);
response.setResponseBody(responseByteArray.toByteArray());
return bodyBytes.length;
}
ExecutableHttpRequest request = connManager.getClient().prepareRequest(HttpExecuteRequest.builder()
.contentStreamProvider(body == null ? null : () -> new ByteArrayInputStream(body))
.request(innerRequestBuilder.build()).build());

@Override
public void onResponseComplete(HttpStream httpStream, int errorCode) {
response.setStatusCode(httpStream.getResponseStatusCode());
httpStream.close();
reqCompleted.complete(errorCode);
}
};
BaseRetryableAccessor accessor = new BaseRetryableAccessor();
CrashableSupplier<IotCloudResponse, AWSIotException> getHttpResponse = () -> getHttpResponse(request);
return accessor.retry(RETRY_COUNT, BACKOFF_MILLIS, getHttpResponse,
new HashSet<>(Collections.singletonList(AWSIotException.class)));
}

private IotCloudResponse getHttpResponse(HttpClientConnection conn, HttpRequest request) throws AWSIotException {
final CompletableFuture<Integer> reqCompleted = new CompletableFuture<>();
final Map<String, String> responseHeaders = new HashMap<>();
private IotCloudResponse getHttpResponse(ExecutableHttpRequest request) throws AWSIotException {
final IotCloudResponse response = new IotCloudResponse();
// Give the request up to N seconds to complete, otherwise throw a TimeoutException
try {
conn.makeRequest(request, createResponseHandler(reqCompleted, responseHeaders, response)).activate();
int error = reqCompleted.get(TIMEOUT_FOR_RESPONSE_FROM_IOT_CLOUD_SECONDS, TimeUnit.SECONDS);
if (error != 0) {
throw new AWSIotException(String.format("Error %s(%d); RequestId: %s", HTTP_HEADER_ERROR_TYPE, error,
HTTP_HEADER_REQUEST_ID));
HttpExecuteResponse httpResponse = request.call();
response.setStatusCode(httpResponse.httpResponse().statusCode());
try (AbortableInputStream bodyStream = httpResponse.responseBody()
.orElseThrow(() -> new AWSIotException("No response body"))) {
response.setResponseBody(IoUtils.toByteArray(bodyStream));
}
return response;
} catch (InterruptedException | ExecutionException | TimeoutException e) {
LOGGER.error("Http request failed with error", e);
throw new AWSIotException(e);
} catch (IOException e) {
throw new AWSIotException("Unable to get response", e);
}
return response;
}
}
117 changes: 25 additions & 92 deletions src/main/java/com/aws/greengrass/iot/IotConnectionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,48 +7,25 @@

import com.aws.greengrass.config.WhatHappened;
import com.aws.greengrass.deployment.DeviceConfiguration;
import com.aws.greengrass.deployment.exceptions.AWSIotException;
import com.aws.greengrass.logging.api.Logger;
import com.aws.greengrass.logging.impl.LogManager;
import com.aws.greengrass.util.Coerce;
import com.aws.greengrass.util.ProxyUtils;
import software.amazon.awssdk.crt.http.HttpClientConnection;
import software.amazon.awssdk.crt.http.HttpClientConnectionManager;
import software.amazon.awssdk.crt.http.HttpClientConnectionManagerOptions;
import software.amazon.awssdk.crt.http.HttpException;
import software.amazon.awssdk.crt.io.ClientBootstrap;
import software.amazon.awssdk.crt.io.EventLoopGroup;
import software.amazon.awssdk.crt.io.HostResolver;
import software.amazon.awssdk.crt.io.SocketOptions;
import software.amazon.awssdk.crt.io.TlsContext;
import software.amazon.awssdk.crt.io.TlsContextOptions;
import software.amazon.awssdk.http.SdkHttpClient;

import java.io.Closeable;
import java.net.URI;
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.inject.Inject;

import static com.aws.greengrass.componentmanager.ClientConfigurationUtils.getConfiguredClientBuilder;
import static com.aws.greengrass.deployment.DeviceConfiguration.DEVICE_MQTT_NAMESPACE;
import static com.aws.greengrass.deployment.DeviceConfiguration.DEVICE_PARAM_AWS_REGION;
import static com.aws.greengrass.deployment.DeviceConfiguration.DEVICE_PARAM_CERTIFICATE_FILE_PATH;
import static com.aws.greengrass.deployment.DeviceConfiguration.DEVICE_PARAM_IOT_DATA_ENDPOINT;
import static com.aws.greengrass.deployment.DeviceConfiguration.DEVICE_PARAM_PRIVATE_KEY_PATH;
import static com.aws.greengrass.deployment.DeviceConfiguration.DEVICE_PARAM_ROOT_CA_PATH;
import static com.aws.greengrass.deployment.DeviceConfiguration.DEVICE_PARAM_THING_NAME;
import static com.aws.greengrass.mqttclient.MqttClient.EVENTLOOP_SHUTDOWN_TIMEOUT_SECONDS;

public class IotConnectionManager implements Closeable {
private static final Logger LOGGER = LogManager.getLogger(IotConnectionManager.class);
// Max wait time for device to establish mTLS connection with IOT core
private static final long TIMEOUT_FOR_CONNECTION_SETUP_SECONDS = Duration.ofMinutes(1).getSeconds();
private HttpClientConnectionManager connManager;

private final EventLoopGroup eventLoopGroup;
private final HostResolver resolver;
private final ClientBootstrap clientBootstrap;
private final DeviceConfiguration deviceConfiguration;
private SdkHttpClient client;

/**
* Constructor.
Expand All @@ -58,89 +35,45 @@ public class IotConnectionManager implements Closeable {
@Inject
@SuppressWarnings("PMD.AvoidCatchingGenericException")
public IotConnectionManager(final DeviceConfiguration deviceConfiguration) {
eventLoopGroup = new EventLoopGroup(1);
resolver = new HostResolver(eventLoopGroup);
clientBootstrap = new ClientBootstrap(eventLoopGroup, resolver);
try {
this.connManager = initConnectionManager(deviceConfiguration);
reconfigureOnConfigChange(deviceConfiguration);
} catch (RuntimeException e) {
// If we couldn't initialize the connection manager, then make sure to shutdown
// everything which was started up
clientBootstrap.close();
resolver.close();
eventLoopGroup.close();
throw e;
}
this.deviceConfiguration = deviceConfiguration;
this.client = initConnectionManager();
reconfigureOnConfigChange();
}

private void reconfigureOnConfigChange(DeviceConfiguration deviceConfiguration) {
public URI getURI() {
return URI.create("https://" + Coerce.toString(deviceConfiguration.getIotCredentialEndpoint()));
}

public synchronized SdkHttpClient getClient() {
return this.client;
}

private void reconfigureOnConfigChange() {
deviceConfiguration.onAnyChange((what, node) -> {
if (WhatHappened.childChanged.equals(what) && node != null && (node.childOf(DEVICE_MQTT_NAMESPACE) || node
.childOf(DEVICE_PARAM_THING_NAME) || node.childOf(DEVICE_PARAM_IOT_DATA_ENDPOINT) || node
.childOf(DEVICE_PARAM_PRIVATE_KEY_PATH) || node.childOf(DEVICE_PARAM_CERTIFICATE_FILE_PATH) || node
.childOf(DEVICE_PARAM_ROOT_CA_PATH) || node.childOf(DEVICE_PARAM_AWS_REGION))) {
this.connManager = initConnectionManager(deviceConfiguration);
synchronized (this) {
this.client.close();
this.client = initConnectionManager();
}
}
});
}

private HttpClientConnectionManager initConnectionManager(DeviceConfiguration deviceConfiguration) {
final String certPath = Coerce.toString(deviceConfiguration.getCertificateFilePath());
final String keyPath = Coerce.toString(deviceConfiguration.getPrivateKeyFilePath());
final String caPath = Coerce.toString(deviceConfiguration.getRootCAFilePath());
try (TlsContextOptions tlsCtxOptions = TlsContextOptions.createWithMtlsFromPath(certPath, keyPath)) {
tlsCtxOptions.overrideDefaultTrustStoreFromPath(null, caPath);
return HttpClientConnectionManager
.create(new HttpClientConnectionManagerOptions().withClientBootstrap(clientBootstrap)
.withProxyOptions(ProxyUtils.getHttpProxyOptions(deviceConfiguration))
.withSocketOptions(new SocketOptions()).withTlsContext(new TlsContext(tlsCtxOptions))
.withUri(URI.create(
"https://" + Coerce.toString(deviceConfiguration.getIotCredentialEndpoint()))));
}
private SdkHttpClient initConnectionManager() {
return getConfiguredClientBuilder(deviceConfiguration).build();
}

/**
* Get a connection object for sending requests.
*
* @return {@link HttpClientConnection}
* @throws AWSIotException when getting a connection from underlying manager fails.
*/
public HttpClientConnection getConnection() throws AWSIotException {
try {
return connManager.acquireConnection().get(TIMEOUT_FOR_CONNECTION_SETUP_SECONDS, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException | HttpException e) {
LOGGER.error("Getting connection failed for endpoint {} with error {} ", connManager.getUri(), e);
throw new AWSIotException(e);
}
}

/**
* Get the host string underlying connection manager.
*
* @return Host string to be used in HTTP Host headers
*/
public String getHost() {
return connManager.getUri().getHost();
}

/**
* Clean up underlying connections and close gracefully.
*/
@Override
public void close() {
connManager.close();
clientBootstrap.close();
resolver.close();
eventLoopGroup.close();
try {
eventLoopGroup.getShutdownCompleteFuture().get(EVENTLOOP_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
LOGGER.atError().log("Error shutting down event loop", e);
} catch (TimeoutException e) {
LOGGER.atError().log("Timed out shutting down event loop");
public synchronized void close() {
if (this.client != null) {
this.client.close();
}
}
}
Loading

0 comments on commit 8d788c1

Please sign in to comment.