From 3397d8554b09ebc76739d8d1c38bbe6556370114 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Thu, 19 Dec 2024 10:14:03 -0500 Subject: [PATCH] fix --- .../kinesis/KinesisTransformRegistrar.java | 88 +++++++++++++------ 1 file changed, 59 insertions(+), 29 deletions(-) diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java index 5983f9a665ec..5f11c9d21461 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java @@ -17,29 +17,31 @@ */ package org.apache.beam.sdk.io.aws2.kinesis; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; -import software.amazon.awssdk.regions.Region; -import software.amazon.kinesis.common.InitialPositionInStream; import com.google.auto.service.AutoService; import java.net.URI; +import java.net.URISyntaxException; import java.util.Map; -import java.util.Properties; import org.apache.beam.sdk.expansion.ExternalTransformRegistrar; import org.apache.beam.sdk.io.aws2.common.ClientConfiguration; -import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ExternalTransformBuilder; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.joda.time.Instant; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.kinesis.common.InitialPositionInStream; /** - * Exposes {@link KinesisIO.Write} and {@link KinesisIO.Read} as an external transform for - * cross-language usage. + * Exposes {@link org.apache.beam.sdk.io.aws2.kinesis.KinesisIO.Write} and {@link + * org.apache.beam.sdk.io.aws2.kinesis.KinesisIO.Read} as an external transform for cross-language + * usage. */ @AutoService(ExternalTransformRegistrar.class) @SuppressWarnings({ @@ -79,13 +81,19 @@ public void setRegion(String region) { public void setServiceEndpoint(@Nullable String serviceEndpoint) { if (serviceEndpoint != null) { - this.serviceEndpoint = new URI(serviceEndpoint); + try { + this.serviceEndpoint = new URI(serviceEndpoint); + } catch (URISyntaxException ex) { + throw new RuntimeException( + String.format("Service endpoint must be URI format, got: %s", serviceEndpoint)); + } } } } public static class WriteBuilder - implements ExternalTransformBuilder, KinesisIO.Write.Result> { + implements ExternalTransformBuilder< + WriteBuilder.Configuration, PCollection, KinesisIO.Write.Result> { public static class Configuration extends CrossLanguageConfiguration { private String partitionKey; @@ -96,18 +104,20 @@ public void setPartitionKey(String partitionKey) { } @Override - public PTransform, KinesisIO.Write.Result> buildExternal(Configuration configuration) { - AwsBasicCredentials creds = AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey); + public PTransform, KinesisIO.Write.Result> buildExternal( + Configuration configuration) { + AwsBasicCredentials creds = + AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey); StaticCredentialsProvider provider = StaticCredentialsProvider.create(creds); KinesisIO.Write writeTransform = KinesisIO.write() .withStreamName(configuration.streamName) .withClientConfiguration( ClientConfiguration.builder() - .credentialsProvider(provider) - .region(configuration.region) - .endpoint(configuration.serviceEndpoint) - .build()) + .credentialsProvider(provider) + .region(configuration.region) + .endpoint(configuration.serviceEndpoint) + .build()) .withPartitioner(p -> configuration.partitionKey); return writeTransform; @@ -195,17 +205,18 @@ private enum WatermarkPolicy { @Override public PTransform> buildExternal( ReadDataBuilder.Configuration configuration) { - AwsBasicCredentials creds = AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey); + AwsBasicCredentials creds = + AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey); StaticCredentialsProvider provider = StaticCredentialsProvider.create(creds); KinesisIO.Read readTransform = KinesisIO.read() .withStreamName(configuration.streamName) .withClientConfiguration( ClientConfiguration.builder() - .credentialsProvider(provider) - .region(configuration.region) - .endpoint(configuration.serviceEndpoint) - .build()); + .credentialsProvider(provider) + .region(configuration.region) + .endpoint(configuration.serviceEndpoint) + .build()); if (configuration.maxNumRecords != null) { readTransform = readTransform.withMaxNumRecords(configuration.maxNumRecords); @@ -252,15 +263,34 @@ public PTransform> buildExternal( readTransform = readTransform.withInitialTimestampInStream(configuration.initialTimestampInStream); } + + return new KinesisReadToBytes(readTransform); + } + } + + public static class KinesisReadToBytes extends PTransform> { + private KinesisIO.Read readTransform; + + private KinesisReadToBytes(KinesisIO.Read readTransform) { + this.readTransform = readTransform; + } + + @Override + public PCollection expand(PBegin input) { // Convert back to bytes to keep consistency with previous verison: // https://github.com/apache/beam/blob/5eed396caf9e0065d8ed82edcc236bad5b71ba22/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java - return readTransform.apply("Convert to bytes", ParDo.of(new DoFn() { - @ProcessElement - public void processElement(ProcessContext c) { - KinesisRecord record = c.element(); - return record.getDataAsBytes(); - } - })); + return input + .apply(this.readTransform) + .apply( + "Convert to bytes", + ParDo.of( + new DoFn() { + @ProcessElement + public byte[] processElement(ProcessContext c) { + KinesisRecord record = c.element(); + return record.getDataAsBytes(); + } + })); } } -} \ No newline at end of file +}