diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json index 9c7a70ceed74..dd3d3e011a0c 100644 --- a/.github/trigger_files/beam_PostCommit_Python.json +++ b/.github/trigger_files/beam_PostCommit_Python.json @@ -1,5 +1,5 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 7 + "modification": 8 } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java index 54f2a6572f6f..4f01248cf2fc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java @@ -21,6 +21,9 @@ import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -56,7 +59,13 @@ public static byte[] serializeToByteArray(Serializable value) { } return buffer.toByteArray(); } catch (IOException exn) { - throw new IllegalArgumentException("unable to serialize " + value, exn); + ObjectWriter ow = new ObjectMapper().writer().withDefaultPrettyPrinter(); + try { + throw new IllegalArgumentException( + "unable to serialize " + ow.writeValueAsString(value), exn); + } catch (JsonProcessingException ex) { + throw new IllegalArgumentException("unable to jsonify " + value, exn); + } } } diff --git a/sdks/java/io/amazon-web-services2/expansion-service/build.gradle b/sdks/java/io/amazon-web-services2/expansion-service/build.gradle new file mode 100644 index 000000000000..fd712737f53c --- /dev/null +++ b/sdks/java/io/amazon-web-services2/expansion-service/build.gradle @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +apply plugin: 'org.apache.beam.module' +apply plugin: 'application' +mainClassName = "org.apache.beam.sdk.expansion.service.ExpansionService" + +applyJavaNature( + automaticModuleName: 'org.apache.beam.sdk.io.amazon-web-services2.expansion.service', + exportJavadoc: false, + validateShadowJar: false, + shadowClosure: {}, +) + +description = "Apache Beam :: SDKs :: Java :: IO :: Amazon Web Services 2 :: Expansion Service" +ext.summary = "Expansion service serving AWS2" + +dependencies { + implementation project(":sdks:java:expansion-service") + permitUnusedDeclared project(":sdks:java:expansion-service") + implementation project(":sdks:java:io:amazon-web-services2") + permitUnusedDeclared project(":sdks:java:io:amazon-web-services2") + runtimeOnly library.java.slf4j_jdk14 +} \ No newline at end of file diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java new file mode 100644 index 000000000000..b7e71fe18f4d --- /dev/null +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.aws2.kinesis; + +import com.google.auto.service.AutoService; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Map; +import org.apache.beam.sdk.expansion.ExternalTransformRegistrar; +import org.apache.beam.sdk.io.aws2.common.ClientConfiguration; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ExternalTransformBuilder; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; +import org.joda.time.Instant; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.kinesis.common.InitialPositionInStream; + +/** + * Exposes {@link org.apache.beam.sdk.io.aws2.kinesis.KinesisIO.Write} and {@link + * org.apache.beam.sdk.io.aws2.kinesis.KinesisIO.Read} as an external transform for cross-language + * usage. + */ +@AutoService(ExternalTransformRegistrar.class) +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +public class KinesisTransformRegistrar implements ExternalTransformRegistrar { + public static final String WRITE_URN = "beam:transform:org.apache.beam:kinesis_write:v2"; + public static final String READ_DATA_URN = "beam:transform:org.apache.beam:kinesis_read_data:v2"; + + @Override + public Map> knownBuilderInstances() { + return ImmutableMap.of(WRITE_URN, new WriteBuilder(), READ_DATA_URN, new ReadDataBuilder()); + } + + private abstract static class CrossLanguageConfiguration { + String streamName; + String awsAccessKey; + String awsSecretKey; + String region; + @Nullable String serviceEndpoint; + + public void setStreamName(String streamName) { + this.streamName = streamName; + } + + public void setAwsAccessKey(String awsAccessKey) { + this.awsAccessKey = awsAccessKey; + } + + public void setAwsSecretKey(String awsSecretKey) { + this.awsSecretKey = awsSecretKey; + } + + public void setRegion(String region) { + this.region = region; + } + + public void setServiceEndpoint(@Nullable String serviceEndpoint) { + this.serviceEndpoint = serviceEndpoint; + } + } + + public static class WriteBuilder + implements ExternalTransformBuilder< + WriteBuilder.Configuration, PCollection, KinesisIO.Write.Result> { + + public static class Configuration extends CrossLanguageConfiguration { + private String partitionKey; + + public void setPartitionKey(String partitionKey) { + this.partitionKey = partitionKey; + } + } + + @Override + public PTransform, KinesisIO.Write.Result> buildExternal( + Configuration configuration) { + AwsBasicCredentials creds = + AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey); + StaticCredentialsProvider provider = StaticCredentialsProvider.create(creds); + SerializableFunction serializer = v -> v; + @Nullable URI endpoint = null; + if (configuration.serviceEndpoint != null) { + try { + endpoint = new URI(configuration.serviceEndpoint); + } catch (URISyntaxException ex) { + throw new RuntimeException( + String.format( + "Service endpoint must be URI format, got: %s", configuration.serviceEndpoint)); + } + } + KinesisIO.Write writeTransform = + KinesisIO.write() + .withStreamName(configuration.streamName) + .withClientConfiguration( + ClientConfiguration.builder() + .credentialsProvider(provider) + .region(Region.of(configuration.region)) + .endpoint(endpoint) + .build()) + .withPartitioner(p -> configuration.partitionKey) + .withSerializer(serializer); + + return writeTransform; + } + } + + public static class ReadDataBuilder + implements ExternalTransformBuilder< + ReadDataBuilder.Configuration, PBegin, PCollection> { + + public static class Configuration extends CrossLanguageConfiguration { + private @Nullable Long maxNumRecords; + private @Nullable Duration maxReadTime; + private @Nullable InitialPositionInStream initialPositionInStream; + private @Nullable Instant initialTimestampInStream; + private @Nullable Integer requestRecordsLimit; + private @Nullable Duration upToDateThreshold; + private @Nullable Long maxCapacityPerShard; + private @Nullable WatermarkPolicy watermarkPolicy; + private @Nullable Duration watermarkIdleDurationThreshold; + private @Nullable Duration rateLimit; + + public void setMaxNumRecords(@Nullable Long maxNumRecords) { + this.maxNumRecords = maxNumRecords; + } + + public void setMaxReadTime(@Nullable Long maxReadTime) { + if (maxReadTime != null) { + this.maxReadTime = Duration.millis(maxReadTime); + } + } + + public void setInitialPositionInStream(@Nullable String initialPositionInStream) { + if (initialPositionInStream != null) { + this.initialPositionInStream = InitialPositionInStream.valueOf(initialPositionInStream); + } + } + + public void setInitialTimestampInStream(@Nullable Long initialTimestampInStream) { + if (initialTimestampInStream != null) { + this.initialTimestampInStream = Instant.ofEpochMilli(initialTimestampInStream); + } + } + + public void setRequestRecordsLimit(@Nullable Long requestRecordsLimit) { + if (requestRecordsLimit != null) { + this.requestRecordsLimit = requestRecordsLimit.intValue(); + } + } + + public void setUpToDateThreshold(@Nullable Long upToDateThreshold) { + if (upToDateThreshold != null) { + this.upToDateThreshold = Duration.millis(upToDateThreshold); + } + } + + public void setMaxCapacityPerShard(@Nullable Long maxCapacityPerShard) { + this.maxCapacityPerShard = maxCapacityPerShard; + } + + public void setWatermarkPolicy(@Nullable String watermarkPolicy) { + if (watermarkPolicy != null) { + this.watermarkPolicy = WatermarkPolicy.valueOf(watermarkPolicy); + } + } + + public void setWatermarkIdleDurationThreshold(@Nullable Long watermarkIdleDurationThreshold) { + if (watermarkIdleDurationThreshold != null) { + this.watermarkIdleDurationThreshold = Duration.millis(watermarkIdleDurationThreshold); + } + } + + public void setRateLimit(@Nullable Long rateLimit) { + if (rateLimit != null) { + this.rateLimit = Duration.millis(rateLimit); + } + } + } + + private enum WatermarkPolicy { + ARRIVAL_TIME, + PROCESSING_TIME + } + + @Override + public PTransform> buildExternal( + ReadDataBuilder.Configuration configuration) { + AwsBasicCredentials creds = + AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey); + StaticCredentialsProvider provider = StaticCredentialsProvider.create(creds); + @Nullable URI endpoint = null; + if (configuration.serviceEndpoint != null) { + try { + endpoint = new URI(configuration.serviceEndpoint); + } catch (URISyntaxException ex) { + throw new RuntimeException( + String.format( + "Service endpoint must be URI format, got: %s", configuration.serviceEndpoint)); + } + } + KinesisIO.Read readTransform = + KinesisIO.read() + .withStreamName(configuration.streamName) + .withClientConfiguration( + ClientConfiguration.builder() + .credentialsProvider(provider) + .region(Region.of(configuration.region)) + .endpoint(endpoint) + .build()); + + if (configuration.maxNumRecords != null) { + readTransform = readTransform.withMaxNumRecords(configuration.maxNumRecords); + } + if (configuration.upToDateThreshold != null) { + readTransform = readTransform.withUpToDateThreshold(configuration.upToDateThreshold); + } + if (configuration.maxCapacityPerShard != null) { + readTransform = + readTransform.withMaxCapacityPerShard(configuration.maxCapacityPerShard.intValue()); + } + if (configuration.watermarkPolicy != null) { + switch (configuration.watermarkPolicy) { + case ARRIVAL_TIME: + readTransform = + configuration.watermarkIdleDurationThreshold != null + ? readTransform.withArrivalTimeWatermarkPolicy( + configuration.watermarkIdleDurationThreshold) + : readTransform.withArrivalTimeWatermarkPolicy(); + break; + case PROCESSING_TIME: + readTransform = readTransform.withProcessingTimeWatermarkPolicy(); + break; + default: + throw new RuntimeException( + String.format( + "Unsupported watermark policy type: %s", configuration.watermarkPolicy)); + } + } + if (configuration.rateLimit != null) { + readTransform = readTransform.withFixedDelayRateLimitPolicy(configuration.rateLimit); + } + if (configuration.maxReadTime != null) { + readTransform = readTransform.withMaxReadTime(configuration.maxReadTime); + } + if (configuration.initialPositionInStream != null) { + readTransform = + readTransform.withInitialPositionInStream(configuration.initialPositionInStream); + } + if (configuration.requestRecordsLimit != null) { + readTransform = readTransform.withRequestRecordsLimit(configuration.requestRecordsLimit); + } + if (configuration.initialTimestampInStream != null) { + readTransform = + readTransform.withInitialTimestampInStream(configuration.initialTimestampInStream); + } + + return new KinesisReadToBytes(readTransform); + } + } + + public static class KinesisReadToBytes extends PTransform> { + private KinesisIO.Read readTransform; + + private KinesisReadToBytes(KinesisIO.Read readTransform) { + this.readTransform = readTransform; + } + + @Override + public PCollection expand(PBegin input) { + // Convert back to bytes to keep consistency with previous verison: + // https://github.com/apache/beam/blob/5eed396caf9e0065d8ed82edcc236bad5b71ba22/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java + return input + .apply(this.readTransform) + .apply( + "Convert to bytes", + ParDo.of( + new DoFn() { + @ProcessElement + public byte[] processElement(ProcessContext c) { + KinesisRecord record = c.element(); + return record.getDataAsBytes(); + } + })); + } + } +} diff --git a/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py b/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py index 151d63d84684..d37ce1f87fe3 100644 --- a/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py @@ -115,10 +115,7 @@ def run_kinesis_write(self): aws_secret_key=self.aws_secret_key, region=self.aws_region, service_endpoint=self.aws_service_endpoint, - verify_certificate=(not self.use_localstack), - partition_key='1', - producer_properties=self.producer_properties, - )) + partition_key='1')) def run_kinesis_read(self): records = [RECORD + str(i).encode() for i in range(NUM_RECORDS)] @@ -132,7 +129,6 @@ def run_kinesis_read(self): aws_secret_key=self.aws_secret_key, region=self.aws_region, service_endpoint=self.aws_service_endpoint, - verify_certificate=not self.use_localstack, max_num_records=NUM_RECORDS, max_read_time=MAX_READ_TIME, request_records_limit=REQUEST_RECORDS_LIMIT, @@ -219,10 +215,6 @@ def setUp(self): self.aws_service_endpoint = known_args.aws_service_endpoint self.use_localstack = not known_args.use_real_aws self.expansion_service = known_args.expansion_service - self.producer_properties = { - 'CollectionMaxCount': str(NUM_RECORDS), - 'ConnectTimeout': str(MAX_READ_TIME), - } if self.use_localstack: self.set_localstack() diff --git a/sdks/python/apache_beam/io/kinesis.py b/sdks/python/apache_beam/io/kinesis.py index bc5e1fa787b4..cdae53e71c5a 100644 --- a/sdks/python/apache_beam/io/kinesis.py +++ b/sdks/python/apache_beam/io/kinesis.py @@ -49,7 +49,8 @@ In this option, Python SDK will either download (for released Beam version) or build (when running from a Beam Git clone) a expansion service jar and use that to expand transforms. Currently Kinesis transforms use the - 'beam-sdks-java-io-kinesis-expansion-service' jar for this purpose. + 'beam-sdks-java-io-amazon-web-services2-expansion-service' jar for this + purpose. *Option 2: specify a custom expansion service* @@ -81,7 +82,6 @@ import logging import time -from typing import Mapping from typing import NamedTuple from typing import Optional @@ -99,7 +99,7 @@ def default_io_expansion_service(): return BeamJarExpansionService( - 'sdks:java:io:kinesis:expansion-service:shadowJar') + 'sdks:java:io:amazon-web-services2:expansion-service:shadowJar') WriteToKinesisSchema = NamedTuple( @@ -111,8 +111,6 @@ def default_io_expansion_service(): ('region', str), ('partition_key', str), ('service_endpoint', Optional[str]), - ('verify_certificate', Optional[bool]), - ('producer_properties', Optional[Mapping[str, str]]), ], ) @@ -123,7 +121,7 @@ class WriteToKinesis(ExternalTransform): Experimental; no backwards compatibility guarantees. """ - URN = 'beam:transform:org.apache.beam:kinesis_write:v1' + URN = 'beam:transform:org.apache.beam:kinesis_write:v2' def __init__( self, @@ -145,14 +143,30 @@ def __init__( :param aws_secret_key: Kinesis access key secret. :param region: AWS region. Example: 'us-east-1'. :param service_endpoint: Kinesis service endpoint - :param verify_certificate: Enable or disable certificate verification. - Never set to False on production. True by default. + :param verify_certificate: Deprecated - certificates will always be + verified. :param partition_key: Specify default partition key. :param producer_properties: Specify the configuration properties for Kinesis Producer Library (KPL) as dictionary. Example: {'CollectionMaxCount': '1000', 'ConnectTimeout': '10000'} :param expansion_service: The address (host:port) of the ExpansionService. """ + if verify_certificate is False: + # Previously, we supported this via + # https://javadoc.io/doc/com.amazonaws/amazon-kinesis-producer/0.14.0/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.html#isVerifyCertificate-- + # With the new AWS client, we no longer support it and it is always True + raise ValueError( + 'verify_certificate set to False. This option is no longer ' + + 'supported and certificate verification will still happen.') + if verify_certificate is True: + logging.warning( + 'verify_certificate set to True. This option is no longer ' + + 'supported and certificate verification will automatically ' + + 'happen. This option may be removed in a future release') + if producer_properties is not None: + raise ValueError( + 'producer_properties is no longer supported and will be removed ' + + 'in a future release.') super().__init__( self.URN, NamedTupleBasedPayloadBuilder( @@ -163,8 +177,6 @@ def __init__( region=region, partition_key=partition_key, service_endpoint=service_endpoint, - verify_certificate=verify_certificate, - producer_properties=producer_properties, )), expansion_service or default_io_expansion_service(), ) @@ -178,7 +190,6 @@ def __init__( ('aws_secret_key', str), ('region', str), ('service_endpoint', Optional[str]), - ('verify_certificate', Optional[bool]), ('max_num_records', Optional[int]), ('max_read_time', Optional[int]), ('initial_position_in_stream', Optional[str]), @@ -199,7 +210,7 @@ class ReadDataFromKinesis(ExternalTransform): Experimental; no backwards compatibility guarantees. """ - URN = 'beam:transform:org.apache.beam:kinesis_read_data:v1' + URN = 'beam:transform:org.apache.beam:kinesis_read_data:v2' def __init__( self, @@ -229,8 +240,8 @@ def __init__( :param aws_secret_key: Kinesis access key secret. :param region: AWS region. Example: 'us-east-1'. :param service_endpoint: Kinesis service endpoint - :param verify_certificate: Enable or disable certificate verification. - Never set to False on production. True by default. + :param verify_certificate: Deprecated - certificates will always be + verified. :param max_num_records: Specifies to read at most a given number of records. Must be greater than 0. :param max_read_time: Specifies to read records during x milliseconds. @@ -277,6 +288,19 @@ def __init__( ): logging.warning('Provided timestamp emplaced not in the past.') + if verify_certificate is False: + # Previously, we supported this via + # https://javadoc.io/doc/com.amazonaws/amazon-kinesis-producer/0.14.0/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.html#isVerifyCertificate-- + # With the new AWS client, we no longer support it and it is always True + raise ValueError( + 'verify_certificate set to False. This option is no longer ' + + 'supported and certificate verification will still happen.') + if verify_certificate is True: + logging.warning( + 'verify_certificate set to True. This option is no longer ' + + 'supported and certificate verification will automatically ' + + 'happen. This option may be removed in a future release') + super().__init__( self.URN, NamedTupleBasedPayloadBuilder( @@ -286,7 +310,6 @@ def __init__( aws_secret_key=aws_secret_key, region=region, service_endpoint=service_endpoint, - verify_certificate=verify_certificate, max_num_records=max_num_records, max_read_time=max_read_time, initial_position_in_stream=initial_position_in_stream, diff --git a/sdks/python/test-suites/portable/common.gradle b/sdks/python/test-suites/portable/common.gradle index be87be749862..99b477b2c7db 100644 --- a/sdks/python/test-suites/portable/common.gradle +++ b/sdks/python/test-suites/portable/common.gradle @@ -376,7 +376,7 @@ project.tasks.register("postCommitPy${pythonVersionSuffix}IT") { ':sdks:java:testing:kafka-service:buildTestKafkaServiceJar', ':sdks:java:io:expansion-service:shadowJar', ':sdks:java:io:google-cloud-platform:expansion-service:shadowJar', - ':sdks:java:io:kinesis:expansion-service:shadowJar', + ':sdks:java:io:amazon-web-services2:expansion-service:shadowJar', ':sdks:java:extensions:schemaio-expansion-service:shadowJar', ':sdks:java:io:debezium:expansion-service:shadowJar' ] @@ -426,7 +426,7 @@ project.tasks.register("xlangSpannerIOIT") { ":sdks:java:container:${currentJavaVersion}:docker", ':sdks:java:io:expansion-service:shadowJar', ':sdks:java:io:google-cloud-platform:expansion-service:shadowJar', - ':sdks:java:io:kinesis:expansion-service:shadowJar', + ':sdks:java:io:amazon-web-services2:expansion-service:shadowJar', ':sdks:java:extensions:schemaio-expansion-service:shadowJar', ':sdks:java:io:debezium:expansion-service:shadowJar' ] diff --git a/settings.gradle.kts b/settings.gradle.kts index a8bee45a05ac..624e9f970d9d 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -206,6 +206,7 @@ include(":sdks:java:harness") include(":sdks:java:harness:jmh") include(":sdks:java:io:amazon-web-services") include(":sdks:java:io:amazon-web-services2") +include(":sdks:java:io:amazon-web-services2:expansion-service") include(":sdks:java:io:amqp") include(":sdks:java:io:azure") include(":sdks:java:io:azure-cosmos")