Skip to content

Commit

Permalink
Merge branch 'awslabs:master' into yang/double-quote-escape
Browse files Browse the repository at this point in the history
  • Loading branch information
xiazcy authored Nov 22, 2022
2 parents 32677c4 + b03fcc1 commit de2168d
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
import com.evanlennick.retry4j.Status;
import com.evanlennick.retry4j.config.RetryConfig;
import com.evanlennick.retry4j.config.RetryConfigBuilder;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.evanlennick.retry4j.exception.UnexpectedException;
import com.fasterxml.jackson.databind.exc.MismatchedInputException;
import org.apache.commons.lang3.StringUtils;
import org.apache.tinkerpop.gremlin.driver.IamAuthConfig;
import software.amazon.utils.RegionUtils;
Expand Down Expand Up @@ -128,58 +129,82 @@ private AWSLambda createLambdaClient(String region, String iamProfile, AWSCreden
@Override
public Map<EndpointsSelector, Collection<String>> getAddresses() {

if (endpointsSelector.getClass().equals(EndpointsType.class)) {
Callable<Map<EndpointsSelector, Collection<String>>> query = () -> {
if (EndpointsType.class.isAssignableFrom(endpointsSelector.getClass())) {
return getAddressesForEndpointsType();
} else {
return getAddressesForCustomEndpointsSelector();
}
}

EndpointsType endpointsType = (EndpointsType) endpointsSelector;
private Map<EndpointsSelector, Collection<String>> getAddressesForCustomEndpointsSelector() {
Callable<NeptuneClusterMetadata> query = () -> {

InvokeRequest invokeRequest = new InvokeRequest()
.withFunctionName(lambdaName)
.withPayload(String.format("\"%s\"", endpointsType.name()));
InvokeRequest invokeRequest = new InvokeRequest()
.withFunctionName(lambdaName)
.withPayload("\"\"");
InvokeResult result = lambdaClient.invoke(invokeRequest);

InvokeResult result = lambdaClient.invoke(invokeRequest);
String payload = new String(result.getPayload().array());
return NeptuneClusterMetadata.fromByeArray(result.getPayload().array());
};

Map<EndpointsSelector, Collection<String>> results = new HashMap<>();
@SuppressWarnings("unchecked")
CallExecutor<NeptuneClusterMetadata> executor =
new CallExecutorBuilder<Map<EndpointsSelector, Collection<String>>>().config(retryConfig).build();

results.put(endpointsType, Arrays.asList(payload.split(",")));
Status<NeptuneClusterMetadata> status;

return results;
};
try {
status = executor.execute(query);
} catch (UnexpectedException e){
if (e.getCause() instanceof MismatchedInputException){
throw new IllegalStateException(String.format("The AWS Lambda proxy (%s) isn't returning a NeptuneClusterMetadata JSON document. Check that the function supports returning a NeptuneClusterMetadata JSON document.", lambdaName), e.getCause());
} else{
throw new IllegalStateException(String.format("There was an unexpected error while attempting to get a NeptuneClusterMetadata JSON document from the AWS Lambda proxy (%s). Check that the function supports returning a NeptuneClusterMetadata JSON document.", lambdaName), e.getCause());
}
}

@SuppressWarnings("unchecked")
CallExecutor<Map<EndpointsSelector, Collection<String>>> executor =
new CallExecutorBuilder<Map<EndpointsSelector, Collection<String>>>().config(retryConfig).build();
NeptuneClusterMetadata neptuneClusterMetadata = status.getResult();
Map<EndpointsSelector, Collection<String>> results = new HashMap<>();

Status<Map<EndpointsSelector, Collection<String>>> status = executor.execute(query);
results.put(endpointsSelector, endpointsSelector.getEndpoints(
neptuneClusterMetadata.getClusterEndpoint(),
neptuneClusterMetadata.getReaderEndpoint(),
neptuneClusterMetadata.getInstances()));

return status.getResult();
} else {
Callable<NeptuneClusterMetadata> query = () -> {
return results;
}

InvokeRequest invokeRequest = new InvokeRequest()
.withFunctionName(lambdaName)
.withPayload("\"\"");
InvokeResult result = lambdaClient.invoke(invokeRequest);
private Map<EndpointsSelector, Collection<String>> getAddressesForEndpointsType() {
Callable<Map<EndpointsSelector, Collection<String>>> query = () -> {

return NeptuneClusterMetadata.fromByeArray(result.getPayload().array());
};
EndpointsType endpointsType = (EndpointsType) endpointsSelector;

@SuppressWarnings("unchecked")
CallExecutor<NeptuneClusterMetadata> executor =
new CallExecutorBuilder<Map<EndpointsSelector, Collection<String>>>().config(retryConfig).build();
InvokeRequest invokeRequest = new InvokeRequest()
.withFunctionName(lambdaName)
.withPayload(String.format("\"%s\"", endpointsType.name()));

Status<NeptuneClusterMetadata> status = executor.execute(query);
NeptuneClusterMetadata neptuneClusterMetadata = status.getResult();
InvokeResult result = lambdaClient.invoke(invokeRequest);
String payload = new String(result.getPayload().array());

Map<EndpointsSelector, Collection<String>> results = new HashMap<>();

results.put(endpointsSelector, endpointsSelector.getEndpoints(
neptuneClusterMetadata.getClusterEndpoint(),
neptuneClusterMetadata.getReaderEndpoint(),
neptuneClusterMetadata.getInstances()));
results.put(endpointsType, Arrays.asList(payload.split(",")));

return results;
};

@SuppressWarnings("unchecked")
CallExecutor<Map<EndpointsSelector, Collection<String>>> executor =
new CallExecutorBuilder<Map<EndpointsSelector, Collection<String>>>().config(retryConfig).build();

Status<Map<EndpointsSelector, Collection<String>>> status;

try{
status = executor.execute(query);
} catch (UnexpectedException e){
throw new IllegalStateException(String.format("There was an unexpected error while attempting to get a list of endpoints from the AWS Lambda proxy (%s). Check that the function supports returning a list of endpoints.", lambdaName), e.getCause());
}

return status.getResult();
}
}
40 changes: 40 additions & 0 deletions neptune-python-utils/build-lambda-layer.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#!/bin/bash -ex

pushd .
pip install virtualenv
rm -rf target
rm -rf temp
mkdir target
virtualenv temp --python=python3.8
source temp/bin/activate
cd temp
pip install gremlinpython==3.5.1
pip install requests
pip install backoff
pip install cchardet
pip install aiodns
pip install idna-ssl
pushd lib/python3.8/site-packages
#rm -rf certifi-*
rm -rf easy_install.py
rm -rf six.py
cp -r ../../../../neptune_python_utils .
popd
mkdir python
mv lib python/lib
zip -r neptune_python_utils_lambda_layer.zip python \
-x "*pycache*" \
-x "*.so" \
-x "*dist-info*" \
-x "*.virtualenv" \
-x "*/pip*" \
-x "*/pkg_resources*" \
-x "*/setuptools*" \
-x "*/wheel*" \
-x "*distutils*" \
-x "*/_virtualenv.*" \
#-x "*/certifi*"
deactivate
popd
mv temp/neptune_python_utils_lambda_layer.zip target/neptune_python_utils_lambda_layer.zip
rm -rf temp

0 comments on commit de2168d

Please sign in to comment.