From a9eed7d7c2b7950d818b7d507b163b53071b45f8 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Thu, 19 Dec 2024 20:57:04 +0000 Subject: [PATCH] Add serializer --- .../beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java index 5f11c9d2146..f3a5e261ff7 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java @@ -109,6 +109,7 @@ public PTransform, KinesisIO.Write.Result> buildExternal( AwsBasicCredentials creds = AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey); StaticCredentialsProvider provider = StaticCredentialsProvider.create(creds); + SerializableFunction serializer = v -> v; KinesisIO.Write writeTransform = KinesisIO.write() .withStreamName(configuration.streamName) @@ -118,7 +119,8 @@ public PTransform, KinesisIO.Write.Result> buildExternal( .region(configuration.region) .endpoint(configuration.serviceEndpoint) .build()) - .withPartitioner(p -> configuration.partitionKey); + .withPartitioner(p -> configuration.partitionKey) + .withSerializer(serializer); return writeTransform; }