Skip to content

Commit

Permalink
Update synthetic data publisher to aws v2
Browse files Browse the repository at this point in the history
  • Loading branch information
damccorm committed Dec 17, 2024
1 parent e73aa88 commit aebd91c
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 10 deletions.
10 changes: 8 additions & 2 deletions sdks/java/testing/load-tests/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ configurations {
gradleRun
}

def excludeNetty = {
exclude group: "io.netty", module: "*" // exclude more recent Netty version
}

dependencies {
implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom)

Expand All @@ -73,8 +77,10 @@ dependencies {
implementation project(":sdks:java:testing:test-utils")
implementation project(":sdks:java:io:google-cloud-platform")
implementation project(":sdks:java:io:kafka")
implementation project(":sdks:java:io:kinesis")
implementation library.java.aws_java_sdk_core
implementation project(":sdks:java:io:amazon-web-services2")
implementation library.java.aws_java_sdk2_kinesis, excludeNetty
implementation library.java.aws_java_sdk2_auth, excludeNetty
implementation library.java.aws_java_sdk2_regions, excludeNetty
implementation library.java.google_cloud_core
implementation library.java.joda_time
implementation library.java.vendored_guava_32_1_2_jre
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;

import com.amazonaws.regions.Regions;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
Expand All @@ -30,10 +29,11 @@
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.aws2.common.ClientConfiguration;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kinesis.KinesisIO;
import org.apache.beam.sdk.io.synthetic.SyntheticBoundedSource;
import org.apache.beam.sdk.io.synthetic.SyntheticOptions;
import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
Expand All @@ -47,6 +47,9 @@
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringSerializer;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;

/**
* Pipeline that generates synthetic data and publishes it in a PubSub or Kafka topic or in a
Expand Down Expand Up @@ -180,17 +183,27 @@ private static void writeToKafka(PCollection<KV<byte[], byte[]>> collection) {
}

private static void writeToKinesis(PCollection<KV<byte[], byte[]>> collection) {
AwsBasicCredentials creds =
AwsBasicCredentials.create(options.getKinesisAwsKey(), options.getKinesisAwsSecret());
StaticCredentialsProvider provider = StaticCredentialsProvider.create(creds);
collection
.apply("Map to byte array for Kinesis", MapElements.via(new MapKVToByteArray()))
.apply(
"Write to Kinesis",
KinesisIO.write()
KinesisIO.<byte[]>write()
.withStreamName(options.getKinesisStreamName())
.withPartitionKey(options.getKinesisPartitionKey())
.withAWSClientsProvider(
options.getKinesisAwsKey(),
options.getKinesisAwsSecret(),
Regions.fromName(options.getKinesisAwsRegion())));
.withPartitioner(p -> options.getKinesisPartitionKey())
// .withPartitionKey(options.getKinesisPartitionKey())
// .withAWSClientsProvider(
// options.getKinesisAwsKey(),
// options.getKinesisAwsSecret(),
// Regions.fromName(options.getKinesisAwsRegion())
// ));
.withClientConfiguration(
ClientConfiguration.builder()
.credentialsProvider(provider)
.region(Region.of(options.getKinesisAwsRegion()))
.build()));
}

private static class MapKVToString extends SimpleFunction<KV<byte[], byte[]>, String> {
Expand Down

0 comments on commit aebd91c

Please sign in to comment.