Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update xlang kinesis to v2 #33416

Merged
merged 44 commits into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from 42 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
4a18cbe
[WIP] Update xlang kinesis to v2
damccorm Dec 18, 2024
1825557
cleanup
damccorm Dec 18, 2024
adc016f
Add missed file
damccorm Dec 18, 2024
fb95fee
Fix up
damccorm Dec 18, 2024
a0cf363
Fix up
damccorm Dec 18, 2024
93e427a
Fix up
damccorm Dec 18, 2024
3397d85
fix
damccorm Dec 19, 2024
fb462b4
fmt
damccorm Dec 19, 2024
d91a113
Fix test
damccorm Dec 19, 2024
90c13e9
lint
damccorm Dec 19, 2024
a9eed7d
Add serializer
damccorm Dec 19, 2024
3f01f70
Add serializer
damccorm Dec 19, 2024
a4aa6f1
Allow configuration to be serialized
damccorm Dec 20, 2024
7a63a0b
Allow configuration to be serialized
damccorm Dec 20, 2024
d278434
Allow configuration to be serialized
damccorm Dec 20, 2024
1b79da4
Allow configuration to be serialized
damccorm Dec 20, 2024
f0c0f58
debug info
damccorm Dec 20, 2024
5f816a1
debug info
damccorm Dec 20, 2024
0e943df
debug info
damccorm Dec 20, 2024
d137832
debug info
damccorm Dec 20, 2024
c6c736d
debug info
damccorm Dec 20, 2024
4260926
debug info
damccorm Dec 20, 2024
d737e5b
Allow writebuilder to be serialized
damccorm Dec 23, 2024
64d391e
Try skipping certs
damccorm Dec 23, 2024
80daeb5
Make sure it gets set for now
damccorm Dec 23, 2024
4c8992c
put behind flag
damccorm Dec 23, 2024
adc70ad
Doc + debug further
damccorm Dec 23, 2024
68de24c
Merge in master
damccorm Dec 23, 2024
30fd605
Merge in master
damccorm Dec 23, 2024
c8caf0c
Debug info
damccorm Dec 24, 2024
583157b
Pass through param
damccorm Dec 24, 2024
e1f306d
Remove debug
damccorm Dec 26, 2024
4bf4425
Remove debug
damccorm Dec 26, 2024
3822660
override trust manager
damccorm Dec 26, 2024
60c2480
checkstyle
damccorm Dec 26, 2024
1a00a4d
Try disabling aggregation
damccorm Dec 26, 2024
f9e617a
easier debugging
damccorm Dec 26, 2024
ad6ad03
Try upgrading localstack
damccorm Dec 27, 2024
1b1fc38
change how containers are started
damccorm Dec 27, 2024
236dfcb
change how containers are started
damccorm Dec 27, 2024
8822b86
force http1
damccorm Dec 27, 2024
71f9160
Add back all tests
damccorm Dec 27, 2024
8268fb1
Update changes wording
damccorm Dec 27, 2024
f95fa7a
Better change description
damccorm Dec 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_Python.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"modification": 7
"modification": 8
}

2 changes: 1 addition & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@

## Breaking Changes

* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
* AWS V1 I/Os have been removed (Java). As part of this, x-lang Python I/Os no longer support setting producer_properties ([#33430](https://github.com/apache/beam/issues/33430)).

## Deprecations

Expand Down
39 changes: 39 additions & 0 deletions sdks/java/io/amazon-web-services2/expansion-service/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

apply plugin: 'org.apache.beam.module'
apply plugin: 'application'
mainClassName = "org.apache.beam.sdk.expansion.service.ExpansionService"

applyJavaNature(
automaticModuleName: 'org.apache.beam.sdk.io.amazon-web-services2.expansion.service',
exportJavadoc: false,
validateShadowJar: false,
shadowClosure: {},
)

description = "Apache Beam :: SDKs :: Java :: IO :: Amazon Web Services 2 :: Expansion Service"
ext.summary = "Expansion service serving AWS2"

dependencies {
implementation project(":sdks:java:expansion-service")
permitUnusedDeclared project(":sdks:java:expansion-service")
implementation project(":sdks:java:io:amazon-web-services2")
permitUnusedDeclared project(":sdks:java:io:amazon-web-services2")
runtimeOnly library.java.slf4j_jdk14
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@

import java.io.Serializable;
import java.net.URI;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.time.Duration;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.Nullable;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import org.apache.beam.sdk.io.aws2.options.AwsOptions;
import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
Expand All @@ -37,9 +41,12 @@
import software.amazon.awssdk.core.client.builder.SdkSyncClientBuilder;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.TlsTrustManagersProvider;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.http.apache.ProxyConfiguration;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.internal.http.NoneTlsKeyManagersProvider;
import software.amazon.awssdk.regions.Region;

/**
Expand Down Expand Up @@ -113,6 +120,32 @@ static <BuilderT extends AwsClientBuilder<BuilderT, ClientT>, ClientT> ClientT b
return ClientBuilderFactory.getFactory(options).create(builder, config, options).build();
}

/** Trust provider to skip certificate verification. Should only be used for test pipelines. */
class SkipCertificateVerificationTrustManagerProvider implements TlsTrustManagersProvider {
public SkipCertificateVerificationTrustManagerProvider() {}

@Override
public TrustManager[] trustManagers() {
TrustManager tm =
new X509TrustManager() {
@Override
public void checkClientTrusted(X509Certificate[] x509CertificateArr, String str)
throws CertificateException {}

@Override
public void checkServerTrusted(X509Certificate[] x509CertificateArr, String str)
throws CertificateException {}

@Override
public X509Certificate[] getAcceptedIssuers() {
return new X509Certificate[0];
}
};
TrustManager[] tms = {tm};
return tms;
}
}

/**
* Default implementation of {@link ClientBuilderFactory}. This implementation can configure both,
* synchronous clients using {@link ApacheHttpClient} as well as asynchronous clients using {@link
Expand Down Expand Up @@ -161,7 +194,11 @@ public <BuilderT extends AwsClientBuilder<BuilderT, ClientT>, ClientT> BuilderT

HttpClientConfiguration httpConfig = options.getHttpClientConfiguration();
ProxyConfiguration proxyConfig = options.getProxyConfiguration();
if (proxyConfig != null || httpConfig != null) {
boolean skipCertificateVerification = false;
if (config.skipCertificateVerification() != null) {
skipCertificateVerification = config.skipCertificateVerification();
}
if (proxyConfig != null || httpConfig != null || skipCertificateVerification) {
if (builder instanceof SdkSyncClientBuilder) {
ApacheHttpClient.Builder client = syncClientBuilder();

Expand All @@ -177,6 +214,11 @@ public <BuilderT extends AwsClientBuilder<BuilderT, ClientT>, ClientT> BuilderT
setOptional(httpConfig.maxConnections(), client::maxConnections);
}

if (skipCertificateVerification) {
client.tlsKeyManagersProvider(NoneTlsKeyManagersProvider.getInstance());
client.tlsTrustManagersProvider(new SkipCertificateVerificationTrustManagerProvider());
}

// must use builder to make sure client is managed by the SDK
((SdkSyncClientBuilder<?, ?>) builder).httpClientBuilder(client);
} else if (builder instanceof SdkAsyncClientBuilder) {
Expand All @@ -201,6 +243,12 @@ public <BuilderT extends AwsClientBuilder<BuilderT, ClientT>, ClientT> BuilderT
setOptional(httpConfig.maxConnections(), client::maxConcurrency);
}

if (skipCertificateVerification) {
client.tlsKeyManagersProvider(NoneTlsKeyManagersProvider.getInstance());
client.tlsTrustManagersProvider(new SkipCertificateVerificationTrustManagerProvider());
client.protocol(Protocol.HTTP1_1);
}

// must use builder to make sure client is managed by the SDK
((SdkAsyncClientBuilder<?, ?>) builder).httpClientBuilder(client);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ public abstract class ClientConfiguration implements Serializable {
return regionId() != null ? Region.of(regionId()) : null;
}

/**
* Optional flag to skip certificate verification. Should only be overriden for test scenarios. If
* set, this overwrites the default in {@link AwsOptions#skipCertificateVerification()}.
*/
@JsonProperty
public abstract @Nullable @Pure Boolean skipCertificateVerification();

/**
* Optional service endpoint to use AWS compatible services instead, e.g. for testing. If set,
* this overwrites the default in {@link AwsOptions#getEndpoint()}.
Expand Down Expand Up @@ -156,6 +163,13 @@ public Builder retry(Consumer<RetryConfiguration.Builder> retry) {
return retry(builder.build());
}

/**
* Optional flag to skip certificate verification. Should only be overriden for test scenarios.
* If set, this overwrites the default in {@link AwsOptions#skipCertificateVerification()}.
*/
@JsonProperty
public abstract Builder skipCertificateVerification(boolean skipCertificateVerification);

abstract Builder regionId(String region);

abstract Builder credentialsProviderAsJson(String credentialsProvider);
Expand Down
Loading
Loading