From 4a18cbe3675447043b939daa1dd7d6e2e73cc864 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Wed, 18 Dec 2024 15:14:40 -0500 Subject: [PATCH 01/22] [WIP] Update xlang kinesis to v2 --- .../trigger_files/beam_PostCommit_Python.json | 2 +- .../expansion-service/build.gradle | 39 +++ .../kinesis/KinesisTransformRegistrar.java | 267 ++++++++++++++++++ sdks/python/apache_beam/io/kinesis.py | 46 ++- .../python/test-suites/portable/common.gradle | 4 +- settings.gradle.kts | 1 + 6 files changed, 344 insertions(+), 15 deletions(-) create mode 100644 sdks/java/io/amazon-web-services2/expansion-service/build.gradle create mode 100644 sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json index 9c7a70ceed74..dd3d3e011a0c 100644 --- a/.github/trigger_files/beam_PostCommit_Python.json +++ b/.github/trigger_files/beam_PostCommit_Python.json @@ -1,5 +1,5 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 7 + "modification": 8 } diff --git a/sdks/java/io/amazon-web-services2/expansion-service/build.gradle b/sdks/java/io/amazon-web-services2/expansion-service/build.gradle new file mode 100644 index 000000000000..fd712737f53c --- /dev/null +++ b/sdks/java/io/amazon-web-services2/expansion-service/build.gradle @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +apply plugin: 'org.apache.beam.module' +apply plugin: 'application' +mainClassName = "org.apache.beam.sdk.expansion.service.ExpansionService" + +applyJavaNature( + automaticModuleName: 'org.apache.beam.sdk.io.amazon-web-services2.expansion.service', + exportJavadoc: false, + validateShadowJar: false, + shadowClosure: {}, +) + +description = "Apache Beam :: SDKs :: Java :: IO :: Amazon Web Services 2 :: Expansion Service" +ext.summary = "Expansion service serving AWS2" + +dependencies { + implementation project(":sdks:java:expansion-service") + permitUnusedDeclared project(":sdks:java:expansion-service") + implementation project(":sdks:java:io:amazon-web-services2") + permitUnusedDeclared project(":sdks:java:io:amazon-web-services2") + runtimeOnly library.java.slf4j_jdk14 +} \ No newline at end of file diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java new file mode 100644 index 000000000000..85f5c9f2db96 --- /dev/null +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.aws2.kinesis; + +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.kinesis.common.InitialPositionInStream; +import com.google.auto.service.AutoService; +import java.util.Map; +import java.util.Properties; +import org.apache.beam.sdk.expansion.ExternalTransformRegistrar; +import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO; +import org.apache.beam.sdk.transforms.ExternalTransformBuilder; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** + * Exposes {@link KinesisIO.Write} and {@link KinesisIO.Read} as an external transform for + * cross-language usage. + */ +@AutoService(ExternalTransformRegistrar.class) +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +public class KinesisTransformRegistrar implements ExternalTransformRegistrar { + public static final String WRITE_URN = "beam:transform:org.apache.beam:kinesis_write:v2"; + public static final String READ_DATA_URN = "beam:transform:org.apache.beam:kinesis_read_data:v2"; + + @Override + public Map> knownBuilderInstances() { + return ImmutableMap.of(WRITE_URN, new WriteBuilder(), READ_DATA_URN, new ReadDataBuilder()); + } + + private abstract static class CrossLanguageConfiguration { + String streamName; + String awsAccessKey; + String awsSecretKey; + Region region; + @Nullable String serviceEndpoint; + + public void setStreamName(String streamName) { + this.streamName = streamName; + } + + public void setAwsAccessKey(String awsAccessKey) { + this.awsAccessKey = awsAccessKey; + } + + public void setAwsSecretKey(String awsSecretKey) { + this.awsSecretKey = awsSecretKey; + } + + public void setRegion(String region) { + this.region = Region.of(region); + } + + public void setServiceEndpoint(@Nullable String serviceEndpoint) { + this.serviceEndpoint = serviceEndpoint; + } + } + + public static class WriteBuilder + implements ExternalTransformBuilder, PDone> { + + public static class Configuration extends CrossLanguageConfiguration { + private Properties producerProperties; + private String partitionKey; + + public void setProducerProperties(Map producerProperties) { + if (producerProperties != null) { + Properties properties = new Properties(); + producerProperties.forEach(properties::setProperty); + this.producerProperties = properties; + } + } + + public void setPartitionKey(String partitionKey) { + this.partitionKey = partitionKey; + } + } + + @Override + public PTransform, PDone> buildExternal(Configuration configuration) { + AwsBasicCredentials creds = + AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey); + StaticCredentialsProvider provider = StaticCredentialsProvider.create(creds); + KinesisIO.Write writeTransform = + KinesisIO.write() + .withStreamName(configuration.streamName) + .withClientConfiguration( + ClientConfiguration.builder() + .credentialsProvider(provider) + .region(Region.of(configuration.region)) + .endpoint(configuration.serviceEndpoint) + .build()) + .withPartitioner(p -> configuration.partitionKey); + + if (configuration.producerProperties != null) { + writeTransform = writeTransform.withProducerProperties(configuration.producerProperties); + } + + return writeTransform; + } + } + + public static class ReadDataBuilder + implements ExternalTransformBuilder< + ReadDataBuilder.Configuration, PBegin, PCollection> { + + public static class Configuration extends CrossLanguageConfiguration { + private @Nullable Long maxNumRecords; + private @Nullable Duration maxReadTime; + private @Nullable InitialPositionInStream initialPositionInStream; + private @Nullable Instant initialTimestampInStream; + private @Nullable Integer requestRecordsLimit; + private @Nullable Duration upToDateThreshold; + private @Nullable Long maxCapacityPerShard; + private @Nullable WatermarkPolicy watermarkPolicy; + private @Nullable Duration watermarkIdleDurationThreshold; + private @Nullable Duration rateLimit; + + public void setMaxNumRecords(@Nullable Long maxNumRecords) { + this.maxNumRecords = maxNumRecords; + } + + public void setMaxReadTime(@Nullable Long maxReadTime) { + if (maxReadTime != null) { + this.maxReadTime = Duration.millis(maxReadTime); + } + } + + public void setInitialPositionInStream(@Nullable String initialPositionInStream) { + if (initialPositionInStream != null) { + this.initialPositionInStream = InitialPositionInStream.valueOf(initialPositionInStream); + } + } + + public void setInitialTimestampInStream(@Nullable Long initialTimestampInStream) { + if (initialTimestampInStream != null) { + this.initialTimestampInStream = Instant.ofEpochMilli(initialTimestampInStream); + } + } + + public void setRequestRecordsLimit(@Nullable Long requestRecordsLimit) { + if (requestRecordsLimit != null) { + this.requestRecordsLimit = requestRecordsLimit.intValue(); + } + } + + public void setUpToDateThreshold(@Nullable Long upToDateThreshold) { + if (upToDateThreshold != null) { + this.upToDateThreshold = Duration.millis(upToDateThreshold); + } + } + + public void setMaxCapacityPerShard(@Nullable Long maxCapacityPerShard) { + this.maxCapacityPerShard = maxCapacityPerShard; + } + + public void setWatermarkPolicy(@Nullable String watermarkPolicy) { + if (watermarkPolicy != null) { + this.watermarkPolicy = WatermarkPolicy.valueOf(watermarkPolicy); + } + } + + public void setWatermarkIdleDurationThreshold(@Nullable Long watermarkIdleDurationThreshold) { + if (watermarkIdleDurationThreshold != null) { + this.watermarkIdleDurationThreshold = Duration.millis(watermarkIdleDurationThreshold); + } + } + + public void setRateLimit(@Nullable Long rateLimit) { + if (rateLimit != null) { + this.rateLimit = Duration.millis(rateLimit); + } + } + } + + private enum WatermarkPolicy { + ARRIVAL_TIME, + PROCESSING_TIME + } + + @Override + public PTransform> buildExternal( + ReadDataBuilder.Configuration configuration) { + KinesisIO.Read readTransform = + KinesisIO.readData() + .withStreamName(configuration.streamName) + .withClientConfiguration( + ClientConfiguration.builder() + .credentialsProvider(provider) + .region(Region.of(configuration.region)) + .endpoint(configuration.serviceEndpoint) + .build()); + + if (configuration.maxNumRecords != null) { + readTransform = readTransform.withMaxNumRecords(configuration.maxNumRecords); + } + if (configuration.upToDateThreshold != null) { + readTransform = readTransform.withUpToDateThreshold(configuration.upToDateThreshold); + } + if (configuration.maxCapacityPerShard != null) { + readTransform = + readTransform.withMaxCapacityPerShard(configuration.maxCapacityPerShard.intValue()); + } + if (configuration.watermarkPolicy != null) { + switch (configuration.watermarkPolicy) { + case ARRIVAL_TIME: + readTransform = + configuration.watermarkIdleDurationThreshold != null + ? readTransform.withArrivalTimeWatermarkPolicy( + configuration.watermarkIdleDurationThreshold) + : readTransform.withArrivalTimeWatermarkPolicy(); + break; + case PROCESSING_TIME: + readTransform = readTransform.withProcessingTimeWatermarkPolicy(); + break; + default: + throw new RuntimeException( + String.format( + "Unsupported watermark policy type: %s", configuration.watermarkPolicy)); + } + } + if (configuration.rateLimit != null) { + readTransform = readTransform.withFixedDelayRateLimitPolicy(configuration.rateLimit); + } + if (configuration.maxReadTime != null) { + readTransform = readTransform.withMaxReadTime(configuration.maxReadTime); + } + if (configuration.initialPositionInStream != null) { + readTransform = + readTransform.withInitialPositionInStream(configuration.initialPositionInStream); + } + if (configuration.requestRecordsLimit != null) { + readTransform = readTransform.withRequestRecordsLimit(configuration.requestRecordsLimit); + } + if (configuration.initialTimestampInStream != null) { + readTransform = + readTransform.withInitialTimestampInStream(configuration.initialTimestampInStream); + } + return readTransform; + } + } +} \ No newline at end of file diff --git a/sdks/python/apache_beam/io/kinesis.py b/sdks/python/apache_beam/io/kinesis.py index bc5e1fa787b4..0be98c122f6e 100644 --- a/sdks/python/apache_beam/io/kinesis.py +++ b/sdks/python/apache_beam/io/kinesis.py @@ -49,7 +49,8 @@ In this option, Python SDK will either download (for released Beam version) or build (when running from a Beam Git clone) a expansion service jar and use that to expand transforms. Currently Kinesis transforms use the - 'beam-sdks-java-io-kinesis-expansion-service' jar for this purpose. + 'beam-sdks-java-io-amazon-web-services2-expansion-service' jar for this + purpose. *Option 2: specify a custom expansion service* @@ -99,7 +100,7 @@ def default_io_expansion_service(): return BeamJarExpansionService( - 'sdks:java:io:kinesis:expansion-service:shadowJar') + 'sdks:java:io:amazon-web-services2:expansion-service:shadowJar') WriteToKinesisSchema = NamedTuple( @@ -111,7 +112,6 @@ def default_io_expansion_service(): ('region', str), ('partition_key', str), ('service_endpoint', Optional[str]), - ('verify_certificate', Optional[bool]), ('producer_properties', Optional[Mapping[str, str]]), ], ) @@ -123,7 +123,7 @@ class WriteToKinesis(ExternalTransform): Experimental; no backwards compatibility guarantees. """ - URN = 'beam:transform:org.apache.beam:kinesis_write:v1' + URN = 'beam:transform:org.apache.beam:kinesis_write:v2' def __init__( self, @@ -145,14 +145,26 @@ def __init__( :param aws_secret_key: Kinesis access key secret. :param region: AWS region. Example: 'us-east-1'. :param service_endpoint: Kinesis service endpoint - :param verify_certificate: Enable or disable certificate verification. - Never set to False on production. True by default. + :param verify_certificate: Deprecated - certificates will always be + verified. :param partition_key: Specify default partition key. :param producer_properties: Specify the configuration properties for Kinesis Producer Library (KPL) as dictionary. Example: {'CollectionMaxCount': '1000', 'ConnectTimeout': '10000'} :param expansion_service: The address (host:port) of the ExpansionService. """ + if verify_certificate is False: + # Previously, we supported this via + # https://javadoc.io/doc/com.amazonaws/amazon-kinesis-producer/0.14.0/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.html#isVerifyCertificate-- + # With the new AWS client, we no longer support it and it is always True + raise ValueError( + 'verify_certificate set to False. This option is no longer ' + + 'supported and certificate verification will still happen.') + if verify_certificate is True: + logging.warning( + 'verify_certificate set to True. This option is no longer ' + + 'supported and certificate verification will automatically happen. ' + + 'This option may be removed in a future release') super().__init__( self.URN, NamedTupleBasedPayloadBuilder( @@ -163,7 +175,6 @@ def __init__( region=region, partition_key=partition_key, service_endpoint=service_endpoint, - verify_certificate=verify_certificate, producer_properties=producer_properties, )), expansion_service or default_io_expansion_service(), @@ -178,7 +189,6 @@ def __init__( ('aws_secret_key', str), ('region', str), ('service_endpoint', Optional[str]), - ('verify_certificate', Optional[bool]), ('max_num_records', Optional[int]), ('max_read_time', Optional[int]), ('initial_position_in_stream', Optional[str]), @@ -199,7 +209,7 @@ class ReadDataFromKinesis(ExternalTransform): Experimental; no backwards compatibility guarantees. """ - URN = 'beam:transform:org.apache.beam:kinesis_read_data:v1' + URN = 'beam:transform:org.apache.beam:kinesis_read_data:v2' def __init__( self, @@ -229,8 +239,8 @@ def __init__( :param aws_secret_key: Kinesis access key secret. :param region: AWS region. Example: 'us-east-1'. :param service_endpoint: Kinesis service endpoint - :param verify_certificate: Enable or disable certificate verification. - Never set to False on production. True by default. + :param verify_certificate: Deprecated - certificates will always be + verified. :param max_num_records: Specifies to read at most a given number of records. Must be greater than 0. :param max_read_time: Specifies to read records during x milliseconds. @@ -277,6 +287,19 @@ def __init__( ): logging.warning('Provided timestamp emplaced not in the past.') + if verify_certificate is False: + # Previously, we supported this via + # https://javadoc.io/doc/com.amazonaws/amazon-kinesis-producer/0.14.0/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.html#isVerifyCertificate-- + # With the new AWS client, we no longer support it and it is always True + raise ValueError( + 'verify_certificate set to False. This option is no longer ' + + 'supported and certificate verification will still happen.') + if verify_certificate is True: + logging.warning( + 'verify_certificate set to True. This option is no longer ' + + 'supported and certificate verification will automatically happen. ' + + 'This option may be removed in a future release') + super().__init__( self.URN, NamedTupleBasedPayloadBuilder( @@ -286,7 +309,6 @@ def __init__( aws_secret_key=aws_secret_key, region=region, service_endpoint=service_endpoint, - verify_certificate=verify_certificate, max_num_records=max_num_records, max_read_time=max_read_time, initial_position_in_stream=initial_position_in_stream, diff --git a/sdks/python/test-suites/portable/common.gradle b/sdks/python/test-suites/portable/common.gradle index be87be749862..99b477b2c7db 100644 --- a/sdks/python/test-suites/portable/common.gradle +++ b/sdks/python/test-suites/portable/common.gradle @@ -376,7 +376,7 @@ project.tasks.register("postCommitPy${pythonVersionSuffix}IT") { ':sdks:java:testing:kafka-service:buildTestKafkaServiceJar', ':sdks:java:io:expansion-service:shadowJar', ':sdks:java:io:google-cloud-platform:expansion-service:shadowJar', - ':sdks:java:io:kinesis:expansion-service:shadowJar', + ':sdks:java:io:amazon-web-services2:expansion-service:shadowJar', ':sdks:java:extensions:schemaio-expansion-service:shadowJar', ':sdks:java:io:debezium:expansion-service:shadowJar' ] @@ -426,7 +426,7 @@ project.tasks.register("xlangSpannerIOIT") { ":sdks:java:container:${currentJavaVersion}:docker", ':sdks:java:io:expansion-service:shadowJar', ':sdks:java:io:google-cloud-platform:expansion-service:shadowJar', - ':sdks:java:io:kinesis:expansion-service:shadowJar', + ':sdks:java:io:amazon-web-services2:expansion-service:shadowJar', ':sdks:java:extensions:schemaio-expansion-service:shadowJar', ':sdks:java:io:debezium:expansion-service:shadowJar' ] diff --git a/settings.gradle.kts b/settings.gradle.kts index a8bee45a05ac..624e9f970d9d 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -206,6 +206,7 @@ include(":sdks:java:harness") include(":sdks:java:harness:jmh") include(":sdks:java:io:amazon-web-services") include(":sdks:java:io:amazon-web-services2") +include(":sdks:java:io:amazon-web-services2:expansion-service") include(":sdks:java:io:amqp") include(":sdks:java:io:azure") include(":sdks:java:io:azure-cosmos") From 18255573b8d9e5d5300a5e09751f27d01debed41 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Wed, 18 Dec 2024 16:05:08 -0500 Subject: [PATCH 02/22] cleanup --- .../apache_beam/io/external/xlang_kinesisio_it_test.py | 7 +------ sdks/python/apache_beam/io/kinesis.py | 6 ++++-- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py b/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py index 151d63d84684..44e6b78f4e93 100644 --- a/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py @@ -116,8 +116,7 @@ def run_kinesis_write(self): region=self.aws_region, service_endpoint=self.aws_service_endpoint, verify_certificate=(not self.use_localstack), - partition_key='1', - producer_properties=self.producer_properties, + partition_key='1' )) def run_kinesis_read(self): @@ -219,10 +218,6 @@ def setUp(self): self.aws_service_endpoint = known_args.aws_service_endpoint self.use_localstack = not known_args.use_real_aws self.expansion_service = known_args.expansion_service - self.producer_properties = { - 'CollectionMaxCount': str(NUM_RECORDS), - 'ConnectTimeout': str(MAX_READ_TIME), - } if self.use_localstack: self.set_localstack() diff --git a/sdks/python/apache_beam/io/kinesis.py b/sdks/python/apache_beam/io/kinesis.py index 0be98c122f6e..1640b25830b4 100644 --- a/sdks/python/apache_beam/io/kinesis.py +++ b/sdks/python/apache_beam/io/kinesis.py @@ -112,7 +112,6 @@ def default_io_expansion_service(): ('region', str), ('partition_key', str), ('service_endpoint', Optional[str]), - ('producer_properties', Optional[Mapping[str, str]]), ], ) @@ -165,6 +164,10 @@ def __init__( 'verify_certificate set to True. This option is no longer ' + 'supported and certificate verification will automatically happen. ' + 'This option may be removed in a future release') + if producer_properties is not None: + raise ValueError( + 'producer_properties is no longer supported and will be removed in ' + + 'a future release.') super().__init__( self.URN, NamedTupleBasedPayloadBuilder( @@ -175,7 +178,6 @@ def __init__( region=region, partition_key=partition_key, service_endpoint=service_endpoint, - producer_properties=producer_properties, )), expansion_service or default_io_expansion_service(), ) From adc016f0f30db265597c0e24653f68c8238fe74d Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Wed, 18 Dec 2024 16:15:16 -0500 Subject: [PATCH 03/22] Add missed file --- .../kinesis/KinesisTransformRegistrar.java | 29 ++++++------------- 1 file changed, 9 insertions(+), 20 deletions(-) diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java index 85f5c9f2db96..7ad4c4f01973 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.Properties; import org.apache.beam.sdk.expansion.ExternalTransformRegistrar; +import org.apache.beam.sdk.io.aws2.common.ClientConfiguration; import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO; import org.apache.beam.sdk.transforms.ExternalTransformBuilder; import org.apache.beam.sdk.transforms.PTransform; @@ -85,17 +86,8 @@ public static class WriteBuilder implements ExternalTransformBuilder, PDone> { public static class Configuration extends CrossLanguageConfiguration { - private Properties producerProperties; private String partitionKey; - public void setProducerProperties(Map producerProperties) { - if (producerProperties != null) { - Properties properties = new Properties(); - producerProperties.forEach(properties::setProperty); - this.producerProperties = properties; - } - } - public void setPartitionKey(String partitionKey) { this.partitionKey = partitionKey; } @@ -103,24 +95,19 @@ public void setPartitionKey(String partitionKey) { @Override public PTransform, PDone> buildExternal(Configuration configuration) { - AwsBasicCredentials creds = - AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey); + AwsBasicCredentials creds = AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey); StaticCredentialsProvider provider = StaticCredentialsProvider.create(creds); KinesisIO.Write writeTransform = - KinesisIO.write() + KinesisIO.write() .withStreamName(configuration.streamName) .withClientConfiguration( ClientConfiguration.builder() .credentialsProvider(provider) - .region(Region.of(configuration.region)) + .region(configuration.region) .endpoint(configuration.serviceEndpoint) .build()) .withPartitioner(p -> configuration.partitionKey); - if (configuration.producerProperties != null) { - writeTransform = writeTransform.withProducerProperties(configuration.producerProperties); - } - return writeTransform; } } @@ -206,13 +193,15 @@ private enum WatermarkPolicy { @Override public PTransform> buildExternal( ReadDataBuilder.Configuration configuration) { - KinesisIO.Read readTransform = - KinesisIO.readData() + AwsBasicCredentials creds = AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey); + StaticCredentialsProvider provider = StaticCredentialsProvider.create(creds); + KinesisIO.Read readTransform = + KinesisIO.read() .withStreamName(configuration.streamName) .withClientConfiguration( ClientConfiguration.builder() .credentialsProvider(provider) - .region(Region.of(configuration.region)) + .region(configuration.region) .endpoint(configuration.serviceEndpoint) .build()); From fb95fee846dae1952b9144da347ceab0751c1a5f Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Wed, 18 Dec 2024 16:50:35 -0500 Subject: [PATCH 04/22] Fix up --- .../io/aws2/kinesis/KinesisTransformRegistrar.java | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java index 7ad4c4f01973..8bbe9968024c 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java @@ -22,6 +22,7 @@ import software.amazon.awssdk.regions.Region; import software.amazon.kinesis.common.InitialPositionInStream; import com.google.auto.service.AutoService; +import java.net.URI; import java.util.Map; import java.util.Properties; import org.apache.beam.sdk.expansion.ExternalTransformRegistrar; @@ -59,7 +60,7 @@ private abstract static class CrossLanguageConfiguration { String awsAccessKey; String awsSecretKey; Region region; - @Nullable String serviceEndpoint; + @Nullable URI serviceEndpoint; public void setStreamName(String streamName) { this.streamName = streamName; @@ -78,7 +79,9 @@ public void setRegion(String region) { } public void setServiceEndpoint(@Nullable String serviceEndpoint) { - this.serviceEndpoint = serviceEndpoint; + if (serviceEndpoint != null) { + this.serviceEndpoint = URI(serviceEndpoint); + } } } @@ -97,7 +100,7 @@ public void setPartitionKey(String partitionKey) { public PTransform, PDone> buildExternal(Configuration configuration) { AwsBasicCredentials creds = AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey); StaticCredentialsProvider provider = StaticCredentialsProvider.create(creds); - KinesisIO.Write writeTransform = + KinesisIO.Write writeTransform = KinesisIO.write() .withStreamName(configuration.streamName) .withClientConfiguration( @@ -250,7 +253,9 @@ public PTransform> buildExternal( readTransform = readTransform.withInitialTimestampInStream(configuration.initialTimestampInStream); } - return readTransform; + // Convert back to bytes to keep consistency with previous verison: + // https://github.com/apache/beam/blob/5eed396caf9e0065d8ed82edcc236bad5b71ba22/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java + return readTransform.Map(kr -> kr.getDataAsBytes()); } } } \ No newline at end of file From a0cf3637cd712ecefd3b58a1ae53e1638fb35d3a Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Wed, 18 Dec 2024 16:57:21 -0500 Subject: [PATCH 05/22] Fix up --- .../beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java index 8bbe9968024c..8df97f6d5304 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java @@ -32,7 +32,6 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; @@ -86,7 +85,7 @@ public void setServiceEndpoint(@Nullable String serviceEndpoint) { } public static class WriteBuilder - implements ExternalTransformBuilder, PDone> { + implements ExternalTransformBuilder, KinesisIO.Write.Result> { public static class Configuration extends CrossLanguageConfiguration { private String partitionKey; @@ -97,7 +96,7 @@ public void setPartitionKey(String partitionKey) { } @Override - public PTransform, PDone> buildExternal(Configuration configuration) { + public PTransform, KinesisIO.Write.Result> buildExternal(Configuration configuration) { AwsBasicCredentials creds = AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey); StaticCredentialsProvider provider = StaticCredentialsProvider.create(creds); KinesisIO.Write writeTransform = From 93e427ac01010233f114b8cba2e6b0b54d6db5d0 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Wed, 18 Dec 2024 17:02:33 -0500 Subject: [PATCH 06/22] Fix up --- .../sdk/io/aws2/kinesis/KinesisTransformRegistrar.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java index 8df97f6d5304..5983f9a665ec 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java @@ -79,7 +79,7 @@ public void setRegion(String region) { public void setServiceEndpoint(@Nullable String serviceEndpoint) { if (serviceEndpoint != null) { - this.serviceEndpoint = URI(serviceEndpoint); + this.serviceEndpoint = new URI(serviceEndpoint); } } } @@ -254,7 +254,13 @@ public PTransform> buildExternal( } // Convert back to bytes to keep consistency with previous verison: // https://github.com/apache/beam/blob/5eed396caf9e0065d8ed82edcc236bad5b71ba22/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java - return readTransform.Map(kr -> kr.getDataAsBytes()); + return readTransform.apply("Convert to bytes", ParDo.of(new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) { + KinesisRecord record = c.element(); + return record.getDataAsBytes(); + } + })); } } } \ No newline at end of file From 3397d8554b09ebc76739d8d1c38bbe6556370114 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Thu, 19 Dec 2024 10:14:03 -0500 Subject: [PATCH 07/22] fix --- .../kinesis/KinesisTransformRegistrar.java | 88 +++++++++++++------ 1 file changed, 59 insertions(+), 29 deletions(-) diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java index 5983f9a665ec..5f11c9d21461 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java @@ -17,29 +17,31 @@ */ package org.apache.beam.sdk.io.aws2.kinesis; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; -import software.amazon.awssdk.regions.Region; -import software.amazon.kinesis.common.InitialPositionInStream; import com.google.auto.service.AutoService; import java.net.URI; +import java.net.URISyntaxException; import java.util.Map; -import java.util.Properties; import org.apache.beam.sdk.expansion.ExternalTransformRegistrar; import org.apache.beam.sdk.io.aws2.common.ClientConfiguration; -import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ExternalTransformBuilder; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.joda.time.Instant; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.kinesis.common.InitialPositionInStream; /** - * Exposes {@link KinesisIO.Write} and {@link KinesisIO.Read} as an external transform for - * cross-language usage. + * Exposes {@link org.apache.beam.sdk.io.aws2.kinesis.KinesisIO.Write} and {@link + * org.apache.beam.sdk.io.aws2.kinesis.KinesisIO.Read} as an external transform for cross-language + * usage. */ @AutoService(ExternalTransformRegistrar.class) @SuppressWarnings({ @@ -79,13 +81,19 @@ public void setRegion(String region) { public void setServiceEndpoint(@Nullable String serviceEndpoint) { if (serviceEndpoint != null) { - this.serviceEndpoint = new URI(serviceEndpoint); + try { + this.serviceEndpoint = new URI(serviceEndpoint); + } catch (URISyntaxException ex) { + throw new RuntimeException( + String.format("Service endpoint must be URI format, got: %s", serviceEndpoint)); + } } } } public static class WriteBuilder - implements ExternalTransformBuilder, KinesisIO.Write.Result> { + implements ExternalTransformBuilder< + WriteBuilder.Configuration, PCollection, KinesisIO.Write.Result> { public static class Configuration extends CrossLanguageConfiguration { private String partitionKey; @@ -96,18 +104,20 @@ public void setPartitionKey(String partitionKey) { } @Override - public PTransform, KinesisIO.Write.Result> buildExternal(Configuration configuration) { - AwsBasicCredentials creds = AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey); + public PTransform, KinesisIO.Write.Result> buildExternal( + Configuration configuration) { + AwsBasicCredentials creds = + AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey); StaticCredentialsProvider provider = StaticCredentialsProvider.create(creds); KinesisIO.Write writeTransform = KinesisIO.write() .withStreamName(configuration.streamName) .withClientConfiguration( ClientConfiguration.builder() - .credentialsProvider(provider) - .region(configuration.region) - .endpoint(configuration.serviceEndpoint) - .build()) + .credentialsProvider(provider) + .region(configuration.region) + .endpoint(configuration.serviceEndpoint) + .build()) .withPartitioner(p -> configuration.partitionKey); return writeTransform; @@ -195,17 +205,18 @@ private enum WatermarkPolicy { @Override public PTransform> buildExternal( ReadDataBuilder.Configuration configuration) { - AwsBasicCredentials creds = AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey); + AwsBasicCredentials creds = + AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey); StaticCredentialsProvider provider = StaticCredentialsProvider.create(creds); KinesisIO.Read readTransform = KinesisIO.read() .withStreamName(configuration.streamName) .withClientConfiguration( ClientConfiguration.builder() - .credentialsProvider(provider) - .region(configuration.region) - .endpoint(configuration.serviceEndpoint) - .build()); + .credentialsProvider(provider) + .region(configuration.region) + .endpoint(configuration.serviceEndpoint) + .build()); if (configuration.maxNumRecords != null) { readTransform = readTransform.withMaxNumRecords(configuration.maxNumRecords); @@ -252,15 +263,34 @@ public PTransform> buildExternal( readTransform = readTransform.withInitialTimestampInStream(configuration.initialTimestampInStream); } + + return new KinesisReadToBytes(readTransform); + } + } + + public static class KinesisReadToBytes extends PTransform> { + private KinesisIO.Read readTransform; + + private KinesisReadToBytes(KinesisIO.Read readTransform) { + this.readTransform = readTransform; + } + + @Override + public PCollection expand(PBegin input) { // Convert back to bytes to keep consistency with previous verison: // https://github.com/apache/beam/blob/5eed396caf9e0065d8ed82edcc236bad5b71ba22/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java - return readTransform.apply("Convert to bytes", ParDo.of(new DoFn() { - @ProcessElement - public void processElement(ProcessContext c) { - KinesisRecord record = c.element(); - return record.getDataAsBytes(); - } - })); + return input + .apply(this.readTransform) + .apply( + "Convert to bytes", + ParDo.of( + new DoFn() { + @ProcessElement + public byte[] processElement(ProcessContext c) { + KinesisRecord record = c.element(); + return record.getDataAsBytes(); + } + })); } } -} \ No newline at end of file +} From fb462b4fc1fa55fad63e460744dd4a2aae633bd8 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 19 Dec 2024 15:40:10 +0000 Subject: [PATCH 08/22] fmt --- sdks/python/apache_beam/io/kinesis.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/sdks/python/apache_beam/io/kinesis.py b/sdks/python/apache_beam/io/kinesis.py index 1640b25830b4..6d924e1992b8 100644 --- a/sdks/python/apache_beam/io/kinesis.py +++ b/sdks/python/apache_beam/io/kinesis.py @@ -157,17 +157,17 @@ def __init__( # https://javadoc.io/doc/com.amazonaws/amazon-kinesis-producer/0.14.0/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.html#isVerifyCertificate-- # With the new AWS client, we no longer support it and it is always True raise ValueError( - 'verify_certificate set to False. This option is no longer ' + - 'supported and certificate verification will still happen.') + 'verify_certificate set to False. This option is no longer ' + + 'supported and certificate verification will still happen.') if verify_certificate is True: logging.warning( - 'verify_certificate set to True. This option is no longer ' + - 'supported and certificate verification will automatically happen. ' + - 'This option may be removed in a future release') + 'verify_certificate set to True. This option is no longer ' + + 'supported and certificate verification will automatically ' + + 'happen. This option may be removed in a future release') if producer_properties is not None: raise ValueError( - 'producer_properties is no longer supported and will be removed in ' + - 'a future release.') + 'producer_properties is no longer supported and will be removed ' + + 'in a future release.') super().__init__( self.URN, NamedTupleBasedPayloadBuilder( @@ -294,13 +294,13 @@ def __init__( # https://javadoc.io/doc/com.amazonaws/amazon-kinesis-producer/0.14.0/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.html#isVerifyCertificate-- # With the new AWS client, we no longer support it and it is always True raise ValueError( - 'verify_certificate set to False. This option is no longer ' + - 'supported and certificate verification will still happen.') + 'verify_certificate set to False. This option is no longer ' + + 'supported and certificate verification will still happen.') if verify_certificate is True: logging.warning( - 'verify_certificate set to True. This option is no longer ' + - 'supported and certificate verification will automatically happen. ' + - 'This option may be removed in a future release') + 'verify_certificate set to True. This option is no longer ' + + 'supported and certificate verification will automatically ' + + 'happen. This option may be removed in a future release') super().__init__( self.URN, From d91a113dfa1a8f6cb9f52c3d272de16739d36f69 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 19 Dec 2024 17:44:07 +0000 Subject: [PATCH 09/22] Fix test --- .../apache_beam/io/external/xlang_kinesisio_it_test.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py b/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py index 44e6b78f4e93..d37ce1f87fe3 100644 --- a/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py @@ -115,9 +115,7 @@ def run_kinesis_write(self): aws_secret_key=self.aws_secret_key, region=self.aws_region, service_endpoint=self.aws_service_endpoint, - verify_certificate=(not self.use_localstack), - partition_key='1' - )) + partition_key='1')) def run_kinesis_read(self): records = [RECORD + str(i).encode() for i in range(NUM_RECORDS)] @@ -131,7 +129,6 @@ def run_kinesis_read(self): aws_secret_key=self.aws_secret_key, region=self.aws_region, service_endpoint=self.aws_service_endpoint, - verify_certificate=not self.use_localstack, max_num_records=NUM_RECORDS, max_read_time=MAX_READ_TIME, request_records_limit=REQUEST_RECORDS_LIMIT, From 90c13e9948c669b8f65d9be9119948ab18a4da37 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 19 Dec 2024 17:45:05 +0000 Subject: [PATCH 10/22] lint --- sdks/python/apache_beam/io/kinesis.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/python/apache_beam/io/kinesis.py b/sdks/python/apache_beam/io/kinesis.py index 6d924e1992b8..cdae53e71c5a 100644 --- a/sdks/python/apache_beam/io/kinesis.py +++ b/sdks/python/apache_beam/io/kinesis.py @@ -82,7 +82,6 @@ import logging import time -from typing import Mapping from typing import NamedTuple from typing import Optional From a9eed7d7c2b7950d818b7d507b163b53071b45f8 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 19 Dec 2024 20:57:04 +0000 Subject: [PATCH 11/22] Add serializer --- .../beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java index 5f11c9d21461..f3a5e261ff7f 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java @@ -109,6 +109,7 @@ public PTransform, KinesisIO.Write.Result> buildExternal( AwsBasicCredentials creds = AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey); StaticCredentialsProvider provider = StaticCredentialsProvider.create(creds); + SerializableFunction serializer = v -> v; KinesisIO.Write writeTransform = KinesisIO.write() .withStreamName(configuration.streamName) @@ -118,7 +119,8 @@ public PTransform, KinesisIO.Write.Result> buildExternal( .region(configuration.region) .endpoint(configuration.serviceEndpoint) .build()) - .withPartitioner(p -> configuration.partitionKey); + .withPartitioner(p -> configuration.partitionKey) + .withSerializer(serializer); return writeTransform; } From 3f01f70d9f1fc32c6d695025d2a4c9f5b1be60c9 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 19 Dec 2024 21:22:06 +0000 Subject: [PATCH 12/22] Add serializer --- .../beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java index f3a5e261ff7f..8dfe1f07729e 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java @@ -27,6 +27,7 @@ import org.apache.beam.sdk.transforms.ExternalTransformBuilder; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; From a4aa6f1944ea03548b237379f86c0278be3c539b Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Fri, 20 Dec 2024 14:44:23 +0000 Subject: [PATCH 13/22] Allow configuration to be serialized --- .../kinesis/KinesisTransformRegistrar.java | 35 +++++++++++++------ 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java index 8dfe1f07729e..ddb80131f6b7 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java @@ -62,7 +62,7 @@ private abstract static class CrossLanguageConfiguration { String awsAccessKey; String awsSecretKey; Region region; - @Nullable URI serviceEndpoint; + @Nullable String serviceEndpoint; public void setStreamName(String streamName) { this.streamName = streamName; @@ -81,14 +81,7 @@ public void setRegion(String region) { } public void setServiceEndpoint(@Nullable String serviceEndpoint) { - if (serviceEndpoint != null) { - try { - this.serviceEndpoint = new URI(serviceEndpoint); - } catch (URISyntaxException ex) { - throw new RuntimeException( - String.format("Service endpoint must be URI format, got: %s", serviceEndpoint)); - } - } + this.serviceEndpoint = serviceEndpoint; } } @@ -111,6 +104,16 @@ public PTransform, KinesisIO.Write.Result> buildExternal( AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey); StaticCredentialsProvider provider = StaticCredentialsProvider.create(creds); SerializableFunction serializer = v -> v; + @Nullable URI endpoint; + if (configuration.serviceEndpoint != null) { + try { + endpoint = configuration.serviceEndpoint; + } + catch (URISyntaxException ex) { + throw new RuntimeException( + String.format("Service endpoint must be URI format, got: %s", serviceEndpoint)); + } + } KinesisIO.Write writeTransform = KinesisIO.write() .withStreamName(configuration.streamName) @@ -118,7 +121,7 @@ public PTransform, KinesisIO.Write.Result> buildExternal( ClientConfiguration.builder() .credentialsProvider(provider) .region(configuration.region) - .endpoint(configuration.serviceEndpoint) + .endpoint(endpoint) .build()) .withPartitioner(p -> configuration.partitionKey) .withSerializer(serializer); @@ -211,6 +214,16 @@ public PTransform> buildExternal( AwsBasicCredentials creds = AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey); StaticCredentialsProvider provider = StaticCredentialsProvider.create(creds); + @Nullable URI endpoint; + if (configuration.serviceEndpoint != null) { + try { + endpoint = configuration.serviceEndpoint; + } + catch (URISyntaxException ex) { + throw new RuntimeException( + String.format("Service endpoint must be URI format, got: %s", serviceEndpoint)); + } + } KinesisIO.Read readTransform = KinesisIO.read() .withStreamName(configuration.streamName) @@ -218,7 +231,7 @@ public PTransform> buildExternal( ClientConfiguration.builder() .credentialsProvider(provider) .region(configuration.region) - .endpoint(configuration.serviceEndpoint) + .endpoint(endpoint) .build()); if (configuration.maxNumRecords != null) { From 7a63a0b17e5de6fe087f2c0ea871af2de23014e3 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Fri, 20 Dec 2024 15:18:20 +0000 Subject: [PATCH 14/22] Allow configuration to be serialized --- .../io/aws2/kinesis/KinesisTransformRegistrar.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java index ddb80131f6b7..42d06e81f66e 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java @@ -104,14 +104,14 @@ public PTransform, KinesisIO.Write.Result> buildExternal( AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey); StaticCredentialsProvider provider = StaticCredentialsProvider.create(creds); SerializableFunction serializer = v -> v; - @Nullable URI endpoint; + @Nullable URI endpoint = null; if (configuration.serviceEndpoint != null) { try { - endpoint = configuration.serviceEndpoint; + endpoint = new URI(configuration.serviceEndpoint); } catch (URISyntaxException ex) { throw new RuntimeException( - String.format("Service endpoint must be URI format, got: %s", serviceEndpoint)); + String.format("Service endpoint must be URI format, got: %s", configuration.serviceEndpoint)); } } KinesisIO.Write writeTransform = @@ -214,14 +214,14 @@ public PTransform> buildExternal( AwsBasicCredentials creds = AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey); StaticCredentialsProvider provider = StaticCredentialsProvider.create(creds); - @Nullable URI endpoint; + @Nullable URI endpoint = null; if (configuration.serviceEndpoint != null) { try { - endpoint = configuration.serviceEndpoint; + endpoint = new URI(configuration.serviceEndpoint); } catch (URISyntaxException ex) { throw new RuntimeException( - String.format("Service endpoint must be URI format, got: %s", serviceEndpoint)); + String.format("Service endpoint must be URI format, got: %s", configuration.serviceEndpoint)); } } KinesisIO.Read readTransform = From d27843403d3a8eb7d5151e692efce392dd370abf Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Fri, 20 Dec 2024 17:27:51 +0000 Subject: [PATCH 15/22] Allow configuration to be serialized --- .../sdk/io/aws2/kinesis/KinesisTransformRegistrar.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java index 42d06e81f66e..944aa7d7a202 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java @@ -61,7 +61,7 @@ private abstract static class CrossLanguageConfiguration { String streamName; String awsAccessKey; String awsSecretKey; - Region region; + String region; @Nullable String serviceEndpoint; public void setStreamName(String streamName) { @@ -77,7 +77,7 @@ public void setAwsSecretKey(String awsSecretKey) { } public void setRegion(String region) { - this.region = Region.of(region); + this.region = region; } public void setServiceEndpoint(@Nullable String serviceEndpoint) { @@ -120,7 +120,7 @@ public PTransform, KinesisIO.Write.Result> buildExternal( .withClientConfiguration( ClientConfiguration.builder() .credentialsProvider(provider) - .region(configuration.region) + .region(Regions.fromName(configuration.region)) .endpoint(endpoint) .build()) .withPartitioner(p -> configuration.partitionKey) @@ -230,7 +230,7 @@ public PTransform> buildExternal( .withClientConfiguration( ClientConfiguration.builder() .credentialsProvider(provider) - .region(configuration.region) + .region(Regions.fromName(configuration.region)) .endpoint(endpoint) .build()); From 1b79da4fe7010d87aae5e911d34c0cb0a7b22f4f Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Fri, 20 Dec 2024 17:54:42 +0000 Subject: [PATCH 16/22] Allow configuration to be serialized --- .../beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java index 944aa7d7a202..0908071646a9 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java @@ -120,7 +120,7 @@ public PTransform, KinesisIO.Write.Result> buildExternal( .withClientConfiguration( ClientConfiguration.builder() .credentialsProvider(provider) - .region(Regions.fromName(configuration.region)) + .region(Region.of(configuration.region)) .endpoint(endpoint) .build()) .withPartitioner(p -> configuration.partitionKey) @@ -230,7 +230,7 @@ public PTransform> buildExternal( .withClientConfiguration( ClientConfiguration.builder() .credentialsProvider(provider) - .region(Regions.fromName(configuration.region)) + .region(Region.of(configuration.region)) .endpoint(endpoint) .build()); From f0c0f5875c3030effa721b28ef73640b6350d386 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Fri, 20 Dec 2024 20:58:38 +0000 Subject: [PATCH 17/22] debug info --- .../java/org/apache/beam/sdk/util/SerializableUtils.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java index 54f2a6572f6f..c4941e22c4a1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java @@ -21,6 +21,8 @@ import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -56,7 +58,8 @@ public static byte[] serializeToByteArray(Serializable value) { } return buffer.toByteArray(); } catch (IOException exn) { - throw new IllegalArgumentException("unable to serialize " + value, exn); + ObjectWriter ow = new ObjectMapper().writer().withDefaultPrettyPrinter(); + throw new IllegalArgumentException("unable to serialize " + ow.writeValueAsString(object), exn); } } From 5f816a16681a348aa4c39ea4269181913c913441 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Fri, 20 Dec 2024 21:13:10 +0000 Subject: [PATCH 18/22] debug info --- .../main/java/org/apache/beam/sdk/util/SerializableUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java index c4941e22c4a1..82f109d1ad43 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java @@ -59,7 +59,7 @@ public static byte[] serializeToByteArray(Serializable value) { return buffer.toByteArray(); } catch (IOException exn) { ObjectWriter ow = new ObjectMapper().writer().withDefaultPrettyPrinter(); - throw new IllegalArgumentException("unable to serialize " + ow.writeValueAsString(object), exn); + throw new IllegalArgumentException("unable to serialize " + ow.writeValueAsString(value), exn); } } From 0e943df8a139f95a8e89654b7151f88d6f0659ee Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Fri, 20 Dec 2024 21:25:43 +0000 Subject: [PATCH 19/22] debug info --- .../java/org/apache/beam/sdk/util/SerializableUtils.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java index 82f109d1ad43..fe148d917825 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; +import com.fasterxml.jackson.databind.JsonMappingException; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -59,7 +60,11 @@ public static byte[] serializeToByteArray(Serializable value) { return buffer.toByteArray(); } catch (IOException exn) { ObjectWriter ow = new ObjectMapper().writer().withDefaultPrettyPrinter(); - throw new IllegalArgumentException("unable to serialize " + ow.writeValueAsString(value), exn); + try { + throw new IllegalArgumentException("unable to serialize " + ow.writeValueAsString(value), exn); + } catch (JsonProcessingException ex) { + IllegalArgumentException("unable to jsonify " + value, exn); + } } } From d13783243d9e260610c50d09b4986609b38d74b9 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Fri, 20 Dec 2024 21:28:26 +0000 Subject: [PATCH 20/22] debug info --- .../main/java/org/apache/beam/sdk/util/SerializableUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java index fe148d917825..20dfcf75cc41 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java @@ -63,7 +63,7 @@ public static byte[] serializeToByteArray(Serializable value) { try { throw new IllegalArgumentException("unable to serialize " + ow.writeValueAsString(value), exn); } catch (JsonProcessingException ex) { - IllegalArgumentException("unable to jsonify " + value, exn); + throw new IllegalArgumentException("unable to jsonify " + value, exn); } } } From c6c736d94ea9f5bc6e9332316d115822740e1763 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Fri, 20 Dec 2024 21:30:20 +0000 Subject: [PATCH 21/22] debug info --- .../org/apache/beam/sdk/util/SerializableUtils.java | 7 ++++--- .../io/aws2/kinesis/KinesisTransformRegistrar.java | 12 ++++++------ 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java index 20dfcf75cc41..43c0b343c3bf 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java @@ -21,8 +21,8 @@ import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.ObjectWriter; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; import com.fasterxml.jackson.databind.JsonMappingException; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -61,7 +61,8 @@ public static byte[] serializeToByteArray(Serializable value) { } catch (IOException exn) { ObjectWriter ow = new ObjectMapper().writer().withDefaultPrettyPrinter(); try { - throw new IllegalArgumentException("unable to serialize " + ow.writeValueAsString(value), exn); + throw new IllegalArgumentException( + "unable to serialize " + ow.writeValueAsString(value), exn); } catch (JsonProcessingException ex) { throw new IllegalArgumentException("unable to jsonify " + value, exn); } diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java index 0908071646a9..b7e71fe18f4d 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java @@ -108,10 +108,10 @@ public PTransform, KinesisIO.Write.Result> buildExternal( if (configuration.serviceEndpoint != null) { try { endpoint = new URI(configuration.serviceEndpoint); - } - catch (URISyntaxException ex) { + } catch (URISyntaxException ex) { throw new RuntimeException( - String.format("Service endpoint must be URI format, got: %s", configuration.serviceEndpoint)); + String.format( + "Service endpoint must be URI format, got: %s", configuration.serviceEndpoint)); } } KinesisIO.Write writeTransform = @@ -218,10 +218,10 @@ public PTransform> buildExternal( if (configuration.serviceEndpoint != null) { try { endpoint = new URI(configuration.serviceEndpoint); - } - catch (URISyntaxException ex) { + } catch (URISyntaxException ex) { throw new RuntimeException( - String.format("Service endpoint must be URI format, got: %s", configuration.serviceEndpoint)); + String.format( + "Service endpoint must be URI format, got: %s", configuration.serviceEndpoint)); } } KinesisIO.Read readTransform = From 4260926cb82e24929e3fe8e83c5309f5d48c426b Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Fri, 20 Dec 2024 21:34:12 +0000 Subject: [PATCH 22/22] debug info --- .../main/java/org/apache/beam/sdk/util/SerializableUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java index 43c0b343c3bf..4f01248cf2fc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java @@ -21,9 +21,9 @@ import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; -import com.fasterxml.jackson.databind.JsonMappingException; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException;