From dd7182d09680efdff1e9b6cc35a5b2a3e099f6de Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 21 May 2024 13:11:54 -0400 Subject: [PATCH 01/11] kafka schematransform translation and tests --- .../workflows/IO_Kafka_Integration_Tests.yml | 0 .../kafka_IO_integration.txt | 0 .../KafkaReadSchemaTransformProvider.java | 250 ++++++++++-------- .../KafkaSchemaTransformTranslation.java | 93 +++++++ .../KafkaWriteSchemaTransformProvider.java | 15 ++ .../KafkaSchemaTransformTranslationTest.java | 241 +++++++++++++++++ 6 files changed, 492 insertions(+), 107 deletions(-) create mode 100644 .github/workflows/IO_Kafka_Integration_Tests.yml create mode 100644 .github/workflows/performance-tests-pipeline-options/kafka_IO_integration.txt create mode 100644 sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformTranslation.java create mode 100644 sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformTranslationTest.java diff --git a/.github/workflows/IO_Kafka_Integration_Tests.yml b/.github/workflows/IO_Kafka_Integration_Tests.yml new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/.github/workflows/performance-tests-pipeline-options/kafka_IO_integration.txt b/.github/workflows/performance-tests-pipeline-options/kafka_IO_integration.txt new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java index 13240ea9dc40..f1eb281def7f 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.io.kafka; +import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; + import 
com.google.auto.service.AutoService; import java.io.FileOutputStream; import java.io.IOException; @@ -38,7 +40,9 @@ import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.schemas.NoSuchSchemaException; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaRegistry; import org.apache.beam.sdk.schemas.transforms.Convert; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; @@ -99,113 +103,7 @@ protected Class configurationClass() { }) @Override protected SchemaTransform from(KafkaReadSchemaTransformConfiguration configuration) { - configuration.validate(); - - final String inputSchema = configuration.getSchema(); - final int groupId = configuration.hashCode() % Integer.MAX_VALUE; - final String autoOffsetReset = - MoreObjects.firstNonNull(configuration.getAutoOffsetResetConfig(), "latest"); - - Map consumerConfigs = - new HashMap<>( - MoreObjects.firstNonNull(configuration.getConsumerConfigUpdates(), new HashMap<>())); - consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-read-provider-" + groupId); - consumerConfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); - consumerConfigs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100); - consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); - - String format = configuration.getFormat(); - boolean handleErrors = ErrorHandling.hasOutput(configuration.getErrorHandling()); - - SerializableFunction valueMapper; - Schema beamSchema; - - String confluentSchemaRegUrl = configuration.getConfluentSchemaRegistryUrl(); - if (confluentSchemaRegUrl != null) { - return new SchemaTransform() { - @Override - public PCollectionRowTuple expand(PCollectionRowTuple input) { - final String confluentSchemaRegSubject = - configuration.getConfluentSchemaRegistrySubject(); - KafkaIO.Read 
kafkaRead = - KafkaIO.read() - .withTopic(configuration.getTopic()) - .withConsumerFactoryFn(new ConsumerFactoryWithGcsTrustStores()) - .withBootstrapServers(configuration.getBootstrapServers()) - .withConsumerConfigUpdates(consumerConfigs) - .withKeyDeserializer(ByteArrayDeserializer.class) - .withValueDeserializer( - ConfluentSchemaRegistryDeserializerProvider.of( - confluentSchemaRegUrl, confluentSchemaRegSubject)); - if (isTest) { - kafkaRead = kafkaRead.withMaxReadTime(Duration.standardSeconds(testTimeoutSecs)); - } - - PCollection kafkaValues = - input.getPipeline().apply(kafkaRead.withoutMetadata()).apply(Values.create()); - - assert kafkaValues.getCoder().getClass() == AvroCoder.class; - AvroCoder coder = (AvroCoder) kafkaValues.getCoder(); - kafkaValues = kafkaValues.setCoder(AvroUtils.schemaCoder(coder.getSchema())); - return PCollectionRowTuple.of("output", kafkaValues.apply(Convert.toRows())); - } - }; - } - if ("RAW".equals(format)) { - beamSchema = Schema.builder().addField("payload", Schema.FieldType.BYTES).build(); - valueMapper = getRawBytesToRowFunction(beamSchema); - } else if ("PROTO".equals(format)) { - String fileDescriptorPath = configuration.getFileDescriptorPath(); - String messageName = configuration.getMessageName(); - if (fileDescriptorPath != null) { - beamSchema = ProtoByteUtils.getBeamSchemaFromProto(fileDescriptorPath, messageName); - valueMapper = ProtoByteUtils.getProtoBytesToRowFunction(fileDescriptorPath, messageName); - } else { - beamSchema = ProtoByteUtils.getBeamSchemaFromProtoSchema(inputSchema, messageName); - valueMapper = ProtoByteUtils.getProtoBytesToRowFromSchemaFunction(inputSchema, messageName); - } - } else if ("JSON".equals(format)) { - beamSchema = JsonUtils.beamSchemaFromJsonSchema(inputSchema); - valueMapper = JsonUtils.getJsonBytesToRowFunction(beamSchema); - } else { - beamSchema = AvroUtils.toBeamSchema(new org.apache.avro.Schema.Parser().parse(inputSchema)); - valueMapper = 
AvroUtils.getAvroBytesToRowFunction(beamSchema); - } - - return new SchemaTransform() { - @Override - public PCollectionRowTuple expand(PCollectionRowTuple input) { - KafkaIO.Read kafkaRead = - KafkaIO.readBytes() - .withConsumerConfigUpdates(consumerConfigs) - .withConsumerFactoryFn(new ConsumerFactoryWithGcsTrustStores()) - .withTopic(configuration.getTopic()) - .withBootstrapServers(configuration.getBootstrapServers()); - if (isTest) { - kafkaRead = kafkaRead.withMaxReadTime(Duration.standardSeconds(testTimeoutSecs)); - } - - PCollection kafkaValues = - input.getPipeline().apply(kafkaRead.withoutMetadata()).apply(Values.create()); - - Schema errorSchema = ErrorHandling.errorSchemaBytes(); - PCollectionTuple outputTuple = - kafkaValues.apply( - ParDo.of( - new ErrorFn( - "Kafka-read-error-counter", valueMapper, errorSchema, handleErrors)) - .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG))); - - PCollectionRowTuple outputRows = - PCollectionRowTuple.of("output", outputTuple.get(OUTPUT_TAG).setRowSchema(beamSchema)); - - PCollection errorOutput = outputTuple.get(ERROR_TAG).setRowSchema(errorSchema); - if (handleErrors) { - outputRows = outputRows.and(configuration.getErrorHandling().getOutput(), errorOutput); - } - return outputRows; - } - }; + return new KafkaReadSchemaTransform(configuration, isTest, testTimeoutSecs); } public static SerializableFunction getRawBytesToRowFunction(Schema rawSchema) { @@ -232,6 +130,144 @@ public List outputCollectionNames() { return Arrays.asList("output", "errors"); } + static class KafkaReadSchemaTransform extends SchemaTransform { + private final KafkaReadSchemaTransformConfiguration configuration; + final Boolean isTest; + final Integer testTimeoutSecs; + + KafkaReadSchemaTransform( + KafkaReadSchemaTransformConfiguration configuration, + Boolean isTest, + Integer testTimeoutSecs) { + this.configuration = configuration; + this.isTest = isTest; + this.testTimeoutSecs = testTimeoutSecs; + } + + Row getConfigurationRow() { 
+ try { + // To stay consistent with our SchemaTransform configuration naming conventions, + // we sort lexicographically + return SchemaRegistry.createDefault() + .getToRowFunction(KafkaReadSchemaTransformConfiguration.class) + .apply(configuration) + .sorted(); + } catch (NoSuchSchemaException e) { + throw new RuntimeException(e); + } + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + configuration.validate(); + + final String inputSchema = configuration.getSchema(); + final int groupId = configuration.hashCode() % Integer.MAX_VALUE; + final String autoOffsetReset = + MoreObjects.firstNonNull(configuration.getAutoOffsetResetConfig(), "latest"); + + Map consumerConfigs = + new HashMap<>( + MoreObjects.firstNonNull(configuration.getConsumerConfigUpdates(), new HashMap<>())); + consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-read-provider-" + groupId); + consumerConfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); + consumerConfigs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100); + consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); + + String format = configuration.getFormat(); + boolean handleErrors = ErrorHandling.hasOutput(configuration.getErrorHandling()); + + SerializableFunction valueMapper; + Schema beamSchema; + + String confluentSchemaRegUrl = configuration.getConfluentSchemaRegistryUrl(); + if (confluentSchemaRegUrl != null) { + final String confluentSchemaRegSubject = + checkArgumentNotNull(configuration.getConfluentSchemaRegistrySubject()); + KafkaIO.Read kafkaRead = + KafkaIO.read() + .withTopic(configuration.getTopic()) + .withConsumerFactoryFn(new ConsumerFactoryWithGcsTrustStores()) + .withBootstrapServers(configuration.getBootstrapServers()) + .withConsumerConfigUpdates(consumerConfigs) + .withKeyDeserializer(ByteArrayDeserializer.class) + .withValueDeserializer( + ConfluentSchemaRegistryDeserializerProvider.of( + confluentSchemaRegUrl, 
confluentSchemaRegSubject)); + if (isTest) { + kafkaRead = kafkaRead.withMaxReadTime(Duration.standardSeconds(testTimeoutSecs)); + } + + PCollection kafkaValues = + input.getPipeline().apply(kafkaRead.withoutMetadata()).apply(Values.create()); + + assert kafkaValues.getCoder().getClass() == AvroCoder.class; + AvroCoder coder = (AvroCoder) kafkaValues.getCoder(); + kafkaValues = kafkaValues.setCoder(AvroUtils.schemaCoder(coder.getSchema())); + return PCollectionRowTuple.of("output", kafkaValues.apply(Convert.toRows())); + } + + if ("RAW".equals(format)) { + beamSchema = Schema.builder().addField("payload", Schema.FieldType.BYTES).build(); + valueMapper = getRawBytesToRowFunction(beamSchema); + } else if ("PROTO".equals(format)) { + String fileDescriptorPath = checkArgumentNotNull(configuration.getFileDescriptorPath()); + String messageName = checkArgumentNotNull(configuration.getMessageName()); + if (fileDescriptorPath != null) { + beamSchema = ProtoByteUtils.getBeamSchemaFromProto(fileDescriptorPath, messageName); + valueMapper = ProtoByteUtils.getProtoBytesToRowFunction(fileDescriptorPath, messageName); + } else { + beamSchema = + ProtoByteUtils.getBeamSchemaFromProtoSchema( + checkArgumentNotNull(inputSchema), messageName); + valueMapper = + ProtoByteUtils.getProtoBytesToRowFromSchemaFunction( + checkArgumentNotNull(inputSchema), messageName); + } + } else if ("JSON".equals(format)) { + beamSchema = JsonUtils.beamSchemaFromJsonSchema(checkArgumentNotNull(inputSchema)); + valueMapper = JsonUtils.getJsonBytesToRowFunction(beamSchema); + } else { + beamSchema = + AvroUtils.toBeamSchema( + new org.apache.avro.Schema.Parser().parse(checkArgumentNotNull(inputSchema))); + valueMapper = AvroUtils.getAvroBytesToRowFunction(beamSchema); + } + + KafkaIO.Read kafkaRead = + KafkaIO.readBytes() + .withConsumerConfigUpdates(consumerConfigs) + .withConsumerFactoryFn(new ConsumerFactoryWithGcsTrustStores()) + .withTopic(configuration.getTopic()) + 
.withBootstrapServers(configuration.getBootstrapServers()); + if (isTest) { + kafkaRead = kafkaRead.withMaxReadTime(Duration.standardSeconds(testTimeoutSecs)); + } + + PCollection kafkaValues = + input.getPipeline().apply(kafkaRead.withoutMetadata()).apply(Values.create()); + + Schema errorSchema = ErrorHandling.errorSchemaBytes(); + PCollectionTuple outputTuple = + kafkaValues.apply( + ParDo.of( + new ErrorFn( + "Kafka-read-error-counter", valueMapper, errorSchema, handleErrors)) + .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG))); + + PCollectionRowTuple outputRows = + PCollectionRowTuple.of("output", outputTuple.get(OUTPUT_TAG).setRowSchema(beamSchema)); + + PCollection errorOutput = outputTuple.get(ERROR_TAG).setRowSchema(errorSchema); + if (handleErrors) { + outputRows = + outputRows.and( + checkArgumentNotNull(configuration.getErrorHandling()).getOutput(), errorOutput); + } + return outputRows; + } + } + public static class ErrorFn extends DoFn { private final SerializableFunction valueMapper; private final Counter errorCounter; diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformTranslation.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformTranslation.java new file mode 100644 index 000000000000..4b83e2b6f558 --- /dev/null +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformTranslation.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.kafka; + +import static org.apache.beam.sdk.io.kafka.KafkaReadSchemaTransformProvider.KafkaReadSchemaTransform; +import static org.apache.beam.sdk.io.kafka.KafkaWriteSchemaTransformProvider.KafkaWriteSchemaTransform; +import static org.apache.beam.sdk.schemas.transforms.SchemaTransformTranslation.SchemaTransformPayloadTranslator; + +import com.google.auto.service.AutoService; +import java.util.Map; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.construction.PTransformTranslation; +import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; + +public class KafkaSchemaTransformTranslation { + static class KafkaReadSchemaTransformTranslator + extends SchemaTransformPayloadTranslator { + @Override + public SchemaTransformProvider provider() { + return new KafkaReadSchemaTransformProvider(); + } + + @Override + public Row toConfigRow(KafkaReadSchemaTransform transform) { + return transform.getConfigurationRow(); + } + } + + @AutoService(TransformPayloadTranslatorRegistrar.class) + public static class ReadRegistrar implements TransformPayloadTranslatorRegistrar { + @Override + @SuppressWarnings({ + "rawtypes", + }) + public Map< + ? extends Class, + ? 
extends PTransformTranslation.TransformPayloadTranslator> + getTransformPayloadTranslators() { + return ImmutableMap + ., PTransformTranslation.TransformPayloadTranslator>builder() + .put(KafkaReadSchemaTransform.class, new KafkaReadSchemaTransformTranslator()) + .build(); + } + } + + static class KafkaWriteSchemaTransformTranslator + extends SchemaTransformPayloadTranslator { + @Override + public SchemaTransformProvider provider() { + return new KafkaWriteSchemaTransformProvider(); + } + + @Override + public Row toConfigRow(KafkaWriteSchemaTransform transform) { + return transform.getConfigurationRow(); + } + } + + @AutoService(TransformPayloadTranslatorRegistrar.class) + public static class WriteRegistrar implements TransformPayloadTranslatorRegistrar { + @Override + @SuppressWarnings({ + "rawtypes", + }) + public Map< + ? extends Class, + ? extends PTransformTranslation.TransformPayloadTranslator> + getTransformPayloadTranslators() { + return ImmutableMap + ., PTransformTranslation.TransformPayloadTranslator>builder() + .put(KafkaWriteSchemaTransform.class, new KafkaWriteSchemaTransformTranslator()) + .build(); + } + } +} diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java index 26f37b790ef8..68d7b1952ee4 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java @@ -31,7 +31,9 @@ import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.NoSuchSchemaException; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaRegistry; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import 
org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; @@ -99,6 +101,19 @@ static final class KafkaWriteSchemaTransform extends SchemaTransform implements this.configuration = configuration; } + Row getConfigurationRow() { + try { + // To stay consistent with our SchemaTransform configuration naming conventions, + // we sort lexicographically + return SchemaRegistry.createDefault() + .getToRowFunction(KafkaWriteSchemaTransformConfiguration.class) + .apply(configuration) + .sorted(); + } catch (NoSuchSchemaException e) { + throw new RuntimeException(e); + } + } + public static class ErrorCounterFn extends DoFn> { private final SerializableFunction toBytesFn; private final Counter errorCounter; diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformTranslationTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformTranslationTest.java new file mode 100644 index 000000000000..0a2c052612f2 --- /dev/null +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformTranslationTest.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.kafka; + +import static org.apache.beam.model.pipeline.v1.ExternalTransforms.ExpansionMethods.Enum.SCHEMA_TRANSFORM; +import static org.apache.beam.sdk.io.kafka.KafkaReadSchemaTransformProvider.KafkaReadSchemaTransform; +import static org.apache.beam.sdk.io.kafka.KafkaSchemaTransformTranslation.KafkaReadSchemaTransformTranslator; +import static org.apache.beam.sdk.io.kafka.KafkaSchemaTransformTranslation.KafkaWriteSchemaTransformTranslator; +import static org.apache.beam.sdk.io.kafka.KafkaWriteSchemaTransformProvider.KafkaWriteSchemaTransform; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.beam.model.pipeline.v1.ExternalTransforms.SchemaTransformPayload; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaTranslation; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.util.construction.BeamUrns; +import org.apache.beam.sdk.util.construction.PipelineTranslation; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.InvalidProtocolBufferException; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; +import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; + +public class KafkaSchemaTransformTranslationTest { + @ClassRule public static final TemporaryFolder 
TEMPORARY_FOLDER = new TemporaryFolder(); + + @Rule public transient ExpectedException thrown = ExpectedException.none(); + + static final KafkaWriteSchemaTransformProvider WRITE_PROVIDER = + new KafkaWriteSchemaTransformProvider(); + static final KafkaReadSchemaTransformProvider READ_PROVIDER = + new KafkaReadSchemaTransformProvider(); + + @Test + public void testRecreateWriteTransformFromRow() { + Row transformConfigRow = + Row.withSchema(WRITE_PROVIDER.configurationSchema()) + .withFieldValue("format", "RAW") + .withFieldValue("topic", "test_topic") + .withFieldValue("bootstrapServers", "host:port") + .withFieldValue("producerConfigUpdates", ImmutableMap.builder().build()) + .withFieldValue("errorHandling", null) + .withFieldValue("fileDescriptorPath", "testPath") + .withFieldValue("messageName", "test_message") + .withFieldValue("schema", "test_schema") + .build(); + KafkaWriteSchemaTransform writeTransform = + (KafkaWriteSchemaTransform) WRITE_PROVIDER.from(transformConfigRow); + + KafkaWriteSchemaTransformTranslator translator = new KafkaWriteSchemaTransformTranslator(); + Row translatedRow = translator.toConfigRow(writeTransform); + + KafkaWriteSchemaTransform writeTransformFromRow = + translator.fromConfigRow(translatedRow, PipelineOptionsFactory.create()); + + assertEquals(transformConfigRow, writeTransformFromRow.getConfigurationRow()); + } + + @Test + public void testWriteTransformProtoTranslation() + throws InvalidProtocolBufferException, IOException { + // First build a pipeline + Pipeline p = Pipeline.create(); + Schema inputSchema = Schema.builder().addByteArrayField("b").build(); + PCollection input = + p.apply( + Create.of( + Collections.singletonList( + Row.withSchema(inputSchema).addValue(new byte[] {1, 2, 3}).build()))) + .setRowSchema(inputSchema); + + Row transformConfigRow = + Row.withSchema(WRITE_PROVIDER.configurationSchema()) + .withFieldValue("format", "RAW") + .withFieldValue("topic", "test_topic") + .withFieldValue("bootstrapServers", 
"host:port") + .withFieldValue("producerConfigUpdates", ImmutableMap.builder().build()) + .withFieldValue("errorHandling", null) + .withFieldValue("fileDescriptorPath", "testPath") + .withFieldValue("messageName", "test_message") + .withFieldValue("schema", "test_schema") + .build(); + + KafkaWriteSchemaTransform writeTransform = + (KafkaWriteSchemaTransform) WRITE_PROVIDER.from(transformConfigRow); + PCollectionRowTuple.of("input", input).apply(writeTransform); + + // Then translate the pipeline to a proto and extract the KafkaWriteSchemaTransform proto + RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p); + List writeTransformProto = + pipelineProto.getComponents().getTransformsMap().values().stream() + .filter( + tr -> { + RunnerApi.FunctionSpec spec = tr.getSpec(); + try { + return spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM)) + && SchemaTransformPayload.parseFrom(spec.getPayload()) + .getIdentifier() + .equals(WRITE_PROVIDER.identifier()); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toList()); + assertEquals(1, writeTransformProto.size()); + RunnerApi.FunctionSpec spec = writeTransformProto.get(0).getSpec(); + + // Check that the proto contains correct values + SchemaTransformPayload payload = SchemaTransformPayload.parseFrom(spec.getPayload()); + Schema schemaFromSpec = SchemaTranslation.schemaFromProto(payload.getConfigurationSchema()); + assertEquals(WRITE_PROVIDER.configurationSchema(), schemaFromSpec); + Row rowFromSpec = RowCoder.of(schemaFromSpec).decode(payload.getConfigurationRow().newInput()); + + assertEquals(transformConfigRow, rowFromSpec); + + // Use the information in the proto to recreate the KafkaWriteSchemaTransform + KafkaWriteSchemaTransformTranslator translator = new KafkaWriteSchemaTransformTranslator(); + KafkaWriteSchemaTransform writeTransformFromSpec = + translator.fromConfigRow(rowFromSpec, PipelineOptionsFactory.create()); + +
assertEquals(transformConfigRow, writeTransformFromSpec.getConfigurationRow()); + } + + @Test + public void testReCreateReadTransformFromRow() { + // setting a subset of fields here. + Row transformConfigRow = + Row.withSchema(READ_PROVIDER.configurationSchema()) + .withFieldValue("format", "RAW") + .withFieldValue("topic", "test_topic") + .withFieldValue("bootstrapServers", "host:port") + .withFieldValue("confluentSchemaRegistryUrl", "test_url") + .withFieldValue("confluentSchemaRegistrySubject", "test_subject") + .withFieldValue("schema", "test_schema") + .withFieldValue("fileDescriptorPath", "testPath") + .withFieldValue("messageName", "test_message") + .withFieldValue("autoOffsetResetConfig", "earliest") + .withFieldValue("consumerConfigUpdates", ImmutableMap.builder().build()) + .withFieldValue("errorHandling", null) + .build(); + + KafkaReadSchemaTransform readTransform = + (KafkaReadSchemaTransform) READ_PROVIDER.from(transformConfigRow); + + KafkaReadSchemaTransformTranslator translator = new KafkaReadSchemaTransformTranslator(); + Row row = translator.toConfigRow(readTransform); + + KafkaReadSchemaTransform readTransformFromRow = + translator.fromConfigRow(row, PipelineOptionsFactory.create()); + + assertEquals(transformConfigRow, readTransformFromRow.getConfigurationRow()); + } + + @Test + public void testReadTransformProtoTranslation() + throws InvalidProtocolBufferException, IOException { + // First build a pipeline + Pipeline p = Pipeline.create(); + Row transformConfigRow = + Row.withSchema(READ_PROVIDER.configurationSchema()) + .withFieldValue("format", "RAW") + .withFieldValue("topic", "test_topic") + .withFieldValue("bootstrapServers", "host:port") + .withFieldValue("confluentSchemaRegistryUrl", null) + .withFieldValue("confluentSchemaRegistrySubject", null) + .withFieldValue("schema", null) + .withFieldValue("fileDescriptorPath", "testPath") + .withFieldValue("messageName", "test_message") + .withFieldValue("autoOffsetResetConfig", "earliest") + 
.withFieldValue("consumerConfigUpdates", ImmutableMap.builder().build()) + .withFieldValue("errorHandling", null) + .build(); + + KafkaReadSchemaTransform readTransform = + (KafkaReadSchemaTransform) READ_PROVIDER.from(transformConfigRow); + + PCollectionRowTuple.empty(p).apply(readTransform); + + // Then translate the pipeline to a proto and extract the KafkaReadSchemaTransform proto + RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p); + List readTransformProto = + pipelineProto.getComponents().getTransformsMap().values().stream() + .filter( + tr -> { + RunnerApi.FunctionSpec spec = tr.getSpec(); + try { + return spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM)) + && SchemaTransformPayload.parseFrom(spec.getPayload()) + .getIdentifier() + .equals(READ_PROVIDER.identifier()); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toList()); + assertEquals(1, readTransformProto.size()); + RunnerApi.FunctionSpec spec = readTransformProto.get(0).getSpec(); + + // Check that the proto contains correct values + SchemaTransformPayload payload = SchemaTransformPayload.parseFrom(spec.getPayload()); + Schema schemaFromSpec = SchemaTranslation.schemaFromProto(payload.getConfigurationSchema()); + assertEquals(READ_PROVIDER.configurationSchema(), schemaFromSpec); + Row rowFromSpec = RowCoder.of(schemaFromSpec).decode(payload.getConfigurationRow().newInput()); + assertEquals(transformConfigRow, rowFromSpec); + + // Use the information in the proto to recreate the KafkaReadSchemaTransform + KafkaReadSchemaTransformTranslator translator = new KafkaReadSchemaTransformTranslator(); + KafkaReadSchemaTransform readTransformFromSpec = + translator.fromConfigRow(rowFromSpec, PipelineOptionsFactory.create()); + + assertEquals(transformConfigRow, readTransformFromSpec.getConfigurationRow()); + } +} From 2dd733b29aded25c4959ab89e4f4d9f6a096250c Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 21 May
2024 16:32:11 -0400 Subject: [PATCH 02/11] cleanup --- .github/workflows/IO_Kafka_Integration_Tests.yml | 0 .../performance-tests-pipeline-options/kafka_IO_integration.txt | 0 2 files changed, 0 insertions(+), 0 deletions(-) delete mode 100644 .github/workflows/IO_Kafka_Integration_Tests.yml delete mode 100644 .github/workflows/performance-tests-pipeline-options/kafka_IO_integration.txt diff --git a/.github/workflows/IO_Kafka_Integration_Tests.yml b/.github/workflows/IO_Kafka_Integration_Tests.yml deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/.github/workflows/performance-tests-pipeline-options/kafka_IO_integration.txt b/.github/workflows/performance-tests-pipeline-options/kafka_IO_integration.txt deleted file mode 100644 index e69de29bb2d1..000000000000 From 0c9f914e898f488bf07061b53c4b018841454bb8 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 21 May 2024 16:51:26 -0400 Subject: [PATCH 03/11] spotless --- .../beam/sdk/io/kafka/KafkaSchemaTransformTranslationTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformTranslationTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformTranslationTest.java index 0a2c052612f2..b899408ceeee 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformTranslationTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformTranslationTest.java @@ -42,12 +42,12 @@ import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.InvalidProtocolBufferException; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; import 
org.junit.rules.TemporaryFolder; -import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; public class KafkaSchemaTransformTranslationTest { @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); From 9180d86ce459e3db6b3df5135fe92ad8b7e4df16 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 21 May 2024 18:22:31 -0400 Subject: [PATCH 04/11] address failing tests --- .../KafkaReadSchemaTransformProvider.java | 2 +- .../KafkaReadSchemaTransformProviderTest.java | 45 ++++++++++--------- 2 files changed, 25 insertions(+), 22 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java index f1eb281def7f..d80248cb509c 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java @@ -211,7 +211,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { beamSchema = Schema.builder().addField("payload", Schema.FieldType.BYTES).build(); valueMapper = getRawBytesToRowFunction(beamSchema); } else if ("PROTO".equals(format)) { - String fileDescriptorPath = checkArgumentNotNull(configuration.getFileDescriptorPath()); + String fileDescriptorPath = configuration.getFileDescriptorPath(); String messageName = checkArgumentNotNull(configuration.getMessageName()); if (fileDescriptorPath != null) { beamSchema = ProtoByteUtils.getBeamSchemaFromProto(fileDescriptorPath, messageName); diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java index d5962a737baf..40291d84e1f8 100644 --- 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java @@ -34,6 +34,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.managed.Managed; import org.apache.beam.sdk.managed.ManagedTransformConstants; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.utils.YamlUtils; import org.apache.beam.sdk.values.PCollectionRowTuple; @@ -232,22 +233,23 @@ public void testBuildTransformWithProtoFormatWrongMessageName() { .collect(Collectors.toList()); KafkaReadSchemaTransformProvider kafkaProvider = (KafkaReadSchemaTransformProvider) providers.get(0); + SchemaTransform transform = + kafkaProvider.from( + KafkaReadSchemaTransformConfiguration.builder() + .setTopic("anytopic") + .setBootstrapServers("anybootstrap") + .setFormat("PROTO") + .setMessageName("MyOtherMessage") + .setFileDescriptorPath( + Objects.requireNonNull( + getClass() + .getResource("/proto_byte/file_descriptor/proto_byte_utils.pb")) + .getPath()) + .build()); assertThrows( NullPointerException.class, - () -> - kafkaProvider.from( - KafkaReadSchemaTransformConfiguration.builder() - .setTopic("anytopic") - .setBootstrapServers("anybootstrap") - .setFormat("PROTO") - .setMessageName("MyOtherMessage") - .setFileDescriptorPath( - Objects.requireNonNull( - getClass() - .getResource("/proto_byte/file_descriptor/proto_byte_utils.pb")) - .getPath()) - .build())); + () -> transform.expand(PCollectionRowTuple.empty(Pipeline.create()))); } @Test @@ -281,17 +283,18 @@ public void testBuildTransformWithoutProtoSchemaFormat() { .collect(Collectors.toList()); KafkaReadSchemaTransformProvider kafkaProvider = (KafkaReadSchemaTransformProvider) providers.get(0); + SchemaTransform transform = + kafkaProvider.from( + 
KafkaReadSchemaTransformConfiguration.builder() + .setTopic("anytopic") + .setBootstrapServers("anybootstrap") + .setFormat("PROTO") + .setMessageName("MyMessage") + .build()); assertThrows( IllegalArgumentException.class, - () -> - kafkaProvider.from( - KafkaReadSchemaTransformConfiguration.builder() - .setTopic("anytopic") - .setBootstrapServers("anybootstrap") - .setFormat("PROTO") - .setMessageName("MyMessage") - .build())); + () -> transform.expand(PCollectionRowTuple.empty(Pipeline.create()))); } @Test From 7f572c80c6f717c46a30878806a4c9c70868ede2 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 31 May 2024 23:35:54 -0400 Subject: [PATCH 05/11] switch existing schematransform tests to use Managed API --- ...KafkaReadSchemaTransformConfiguration.java | 6 +++ .../KafkaReadSchemaTransformProvider.java | 37 +++++-------------- .../apache/beam/sdk/io/kafka/KafkaIOIT.java | 35 ++++++++++-------- .../KafkaReadSchemaTransformProviderTest.java | 3 +- .../managed/ManagedTransformConstants.java | 1 + 5 files changed, 38 insertions(+), 44 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java index 13f5249a6c3b..693c1371f78c 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java @@ -149,6 +149,10 @@ public static Builder builder() { /** Sets the topic from which to read. 
*/ public abstract String getTopic(); + @SchemaFieldDescription("Upper bound of how long to read from Kafka.") + @Nullable + public abstract Integer getMaxReadTimeSeconds(); + @SchemaFieldDescription("This option specifies whether and where to output unwritable rows.") @Nullable public abstract ErrorHandling getErrorHandling(); @@ -179,6 +183,8 @@ public abstract static class Builder { /** Sets the topic from which to read. */ public abstract Builder setTopic(String value); + public abstract Builder setMaxReadTimeSeconds(Integer maxReadTimeSeconds); + public abstract Builder setErrorHandling(ErrorHandling errorHandling); /** Builds a {@link KafkaReadSchemaTransformConfiguration} instance. */ diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java index d80248cb509c..b0dba73e56d0 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java @@ -60,7 +60,6 @@ import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; @@ -80,19 +79,6 @@ public class KafkaReadSchemaTransformProvider public static final TupleTag OUTPUT_TAG = new TupleTag() {}; public static final TupleTag ERROR_TAG = new TupleTag() {}; - final Boolean isTest; - final Integer testTimeoutSecs; - - public KafkaReadSchemaTransformProvider() { - this(false, 0); - } - - @VisibleForTesting - 
KafkaReadSchemaTransformProvider(Boolean isTest, Integer testTimeoutSecs) { - this.isTest = isTest; - this.testTimeoutSecs = testTimeoutSecs; - } - @Override protected Class configurationClass() { return KafkaReadSchemaTransformConfiguration.class; @@ -103,7 +89,7 @@ protected Class configurationClass() { }) @Override protected SchemaTransform from(KafkaReadSchemaTransformConfiguration configuration) { - return new KafkaReadSchemaTransform(configuration, isTest, testTimeoutSecs); + return new KafkaReadSchemaTransform(configuration); } public static SerializableFunction getRawBytesToRowFunction(Schema rawSchema) { @@ -132,16 +118,9 @@ public List outputCollectionNames() { static class KafkaReadSchemaTransform extends SchemaTransform { private final KafkaReadSchemaTransformConfiguration configuration; - final Boolean isTest; - final Integer testTimeoutSecs; - KafkaReadSchemaTransform( - KafkaReadSchemaTransformConfiguration configuration, - Boolean isTest, - Integer testTimeoutSecs) { + KafkaReadSchemaTransform(KafkaReadSchemaTransformConfiguration configuration) { this.configuration = configuration; - this.isTest = isTest; - this.testTimeoutSecs = testTimeoutSecs; } Row getConfigurationRow() { @@ -194,8 +173,10 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { .withValueDeserializer( ConfluentSchemaRegistryDeserializerProvider.of( confluentSchemaRegUrl, confluentSchemaRegSubject)); - if (isTest) { - kafkaRead = kafkaRead.withMaxReadTime(Duration.standardSeconds(testTimeoutSecs)); + if (configuration.getMaxReadTimeSeconds() != null) { + kafkaRead = + kafkaRead.withMaxReadTime( + Duration.standardSeconds(configuration.getMaxReadTimeSeconds())); } PCollection kafkaValues = @@ -240,8 +221,10 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { .withConsumerFactoryFn(new ConsumerFactoryWithGcsTrustStores()) .withTopic(configuration.getTopic()) .withBootstrapServers(configuration.getBootstrapServers()); - if (isTest) { - kafkaRead = 
kafkaRead.withMaxReadTime(Duration.standardSeconds(testTimeoutSecs)); + if (configuration.getMaxReadTimeSeconds() != null) { + kafkaRead = + kafkaRead.withMaxReadTime( + Duration.standardSeconds(configuration.getMaxReadTimeSeconds())); } PCollection kafkaValues = diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java index ab6ac52e318d..2189a3ffab0c 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java @@ -48,6 +48,7 @@ import org.apache.beam.sdk.io.kafka.ReadFromKafkaDoFnTest.FailingDeserializer; import org.apache.beam.sdk.io.synthetic.SyntheticBoundedSource; import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions; +import org.apache.beam.sdk.managed.Managed; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.ExperimentalOptions; @@ -646,13 +647,12 @@ public void runReadWriteKafkaViaSchemaTransforms( .setRowSchema(beamSchema)) .apply( "Write to Kafka", - new KafkaWriteSchemaTransformProvider() - .from( - KafkaWriteSchemaTransformProvider.KafkaWriteSchemaTransformConfiguration - .builder() - .setTopic(topicName) - .setBootstrapServers(options.getKafkaBootstrapServerAddresses()) - .setFormat(format) + Managed.write(Managed.KAFKA) + .withConfig( + ImmutableMap.builder() + .put("topic", topicName) + .put("bootstrap_servers", options.getKafkaBootstrapServerAddresses()) + .put("format", format) .build())); PAssert.that( @@ -661,15 +661,18 @@ public void runReadWriteKafkaViaSchemaTransforms( "Read from unbounded Kafka", // A timeout of 30s for local, container-based tests, and 2 minutes for // real-kafka tests. - new KafkaReadSchemaTransformProvider( - true, options.isWithTestcontainers() ? 
30 : 120) - .from( - KafkaReadSchemaTransformConfiguration.builder() - .setFormat(format) - .setAutoOffsetResetConfig("earliest") - .setSchema(schemaDefinition) - .setTopic(topicName) - .setBootstrapServers(options.getKafkaBootstrapServerAddresses()) + Managed.read(Managed.KAFKA) + .withConfig( + ImmutableMap.builder() + .put("format", format) + .put("auto_offset_reset_config", "earliest") + .put("schema", schemaDefinition) + .put("topic", topicName) + .put( + "bootstrap_servers", options.getKafkaBootstrapServerAddresses()) + .put( + "max_read_time_seconds", + options.isWithTestcontainers() ? 30 : 120) .build())) .get("output")) .containsInAnyOrder( diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java index 40291d84e1f8..acf6f4177f9c 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java @@ -132,7 +132,8 @@ public void testFindTransformAndMakeItWork() { "confluentSchemaRegistryUrl", "errorHandling", "fileDescriptorPath", - "messageName"), + "messageName", + "maxReadTimeSeconds"), kafkaProvider.configurationSchema().getFields().stream() .map(field -> field.getName()) .collect(Collectors.toSet())); diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java index 8165633cf15e..bd6d1650d7c0 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java @@ -55,6 +55,7 @@ public class ManagedTransformConstants { .put("schema", "schema") .put("file_descriptor_path", 
"fileDescriptorPath") .put("message_name", "messageName") + .put("max_read_time_seconds", "maxReadTimeSeconds") .build(); private static final Map KAFKA_WRITE_MAPPINGS = From 081524912fa04bbe68c238c0f7561c53882f8670 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 31 May 2024 23:43:47 -0400 Subject: [PATCH 06/11] fix nullness --- .../io/kafka/KafkaReadSchemaTransformProvider.java | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java index b0dba73e56d0..bcf211e08c4a 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java @@ -173,10 +173,9 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { .withValueDeserializer( ConfluentSchemaRegistryDeserializerProvider.of( confluentSchemaRegUrl, confluentSchemaRegSubject)); - if (configuration.getMaxReadTimeSeconds() != null) { - kafkaRead = - kafkaRead.withMaxReadTime( - Duration.standardSeconds(configuration.getMaxReadTimeSeconds())); + Integer maxReadTimeSeconds = configuration.getMaxReadTimeSeconds(); + if (maxReadTimeSeconds != null) { + kafkaRead = kafkaRead.withMaxReadTime(Duration.standardSeconds(maxReadTimeSeconds)); } PCollection kafkaValues = @@ -221,10 +220,9 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { .withConsumerFactoryFn(new ConsumerFactoryWithGcsTrustStores()) .withTopic(configuration.getTopic()) .withBootstrapServers(configuration.getBootstrapServers()); - if (configuration.getMaxReadTimeSeconds() != null) { - kafkaRead = - kafkaRead.withMaxReadTime( - Duration.standardSeconds(configuration.getMaxReadTimeSeconds())); + Integer maxReadTimeSeconds = 
configuration.getMaxReadTimeSeconds(); + if (maxReadTimeSeconds != null) { + kafkaRead = kafkaRead.withMaxReadTime(Duration.standardSeconds(maxReadTimeSeconds)); } PCollection kafkaValues = From e10e0adaee322cb77b225c6b5013487eba1f1ddd Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Sun, 2 Jun 2024 12:52:08 -0400 Subject: [PATCH 07/11] add some more mappings --- .../apache/beam/sdk/managed/ManagedTransformConstants.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java index bd6d1650d7c0..a81c659ecc8a 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java @@ -56,6 +56,8 @@ public class ManagedTransformConstants { .put("file_descriptor_path", "fileDescriptorPath") .put("message_name", "messageName") .put("max_read_time_seconds", "maxReadTimeSeconds") + .put("auto_offset_reset_config", "autoOffsetResetConfig") + .put("error_handling", "ErrorHandling") .build(); private static final Map KAFKA_WRITE_MAPPINGS = @@ -66,6 +68,8 @@ public class ManagedTransformConstants { .put("data_format", "format") .put("file_descriptor_path", "fileDescriptorPath") .put("message_name", "messageName") + .put("schema", "schema") + .put("error_handling", "ErrorHandling") .build(); public static final Map> MAPPINGS = From 8cecfe926fa594b0238f3c6bbe1285d8a1e3831a Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Sun, 2 Jun 2024 13:00:00 -0400 Subject: [PATCH 08/11] fix mapping --- .../apache/beam/sdk/managed/ManagedTransformConstants.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java 
b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java index a81c659ecc8a..7233914ab1a9 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java @@ -57,7 +57,7 @@ public class ManagedTransformConstants { .put("message_name", "messageName") .put("max_read_time_seconds", "maxReadTimeSeconds") .put("auto_offset_reset_config", "autoOffsetResetConfig") - .put("error_handling", "ErrorHandling") + .put("error_handling", "errorHandling") .build(); private static final Map KAFKA_WRITE_MAPPINGS = @@ -69,7 +69,7 @@ public class ManagedTransformConstants { .put("file_descriptor_path", "fileDescriptorPath") .put("message_name", "messageName") .put("schema", "schema") - .put("error_handling", "ErrorHandling") + .put("error_handling", "errorHandling") .build(); public static final Map> MAPPINGS = From 7ea495291a342e1a2755f79403367510eccabdf1 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 3 Jun 2024 12:40:04 -0400 Subject: [PATCH 09/11] typo --- .../sdk/io/kafka/KafkaSchemaTransformTranslationTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformTranslationTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformTranslationTest.java index b899408ceeee..2ef4fabaa250 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformTranslationTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformTranslationTest.java @@ -113,7 +113,7 @@ public void testWriteTransformProtoTranslation() (KafkaWriteSchemaTransform) WRITE_PROVIDER.from(transformConfigRow); PCollectionRowTuple.of("input", input).apply(writeTransform); - // Then translate the pipeline to a proto and extract 
IcebergWriteSchemaTransform proto + // Then translate the pipeline to a proto and extract KafkaWriteSchemaTransform proto RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p); List writeTransformProto = pipelineProto.getComponents().getTransformsMap().values().stream() @@ -141,7 +141,7 @@ public void testWriteTransformProtoTranslation() assertEquals(transformConfigRow, rowFromSpec); - // Use the information in the proto to recreate the IcebergWriteSchemaTransform + // Use the information in the proto to recreate the KafkaWriteSchemaTransform KafkaWriteSchemaTransformTranslator translator = new KafkaWriteSchemaTransformTranslator(); KafkaWriteSchemaTransform writeTransformFromSpec = translator.fromConfigRow(rowFromSpec, PipelineOptionsFactory.create()); @@ -204,7 +204,7 @@ public void testReadTransformProtoTranslation() PCollectionRowTuple.empty(p).apply(readTransform); - // Then translate the pipeline to a proto and extract IcebergReadSchemaTransform proto + // Then translate the pipeline to a proto and extract KafkaReadSchemaTransform proto RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p); List readTransformProto = pipelineProto.getComponents().getTransformsMap().values().stream() @@ -231,7 +231,7 @@ public void testReadTransformProtoTranslation() Row rowFromSpec = RowCoder.of(schemaFromSpec).decode(payload.getConfigurationRow().newInput()); assertEquals(transformConfigRow, rowFromSpec); - // Use the information in the proto to recreate the IcebergReadSchemaTransform + // Use the information in the proto to recreate the KafkaReadSchemaTransform KafkaReadSchemaTransformTranslator translator = new KafkaReadSchemaTransformTranslator(); KafkaReadSchemaTransform readTransformFromSpec = translator.fromConfigRow(rowFromSpec, PipelineOptionsFactory.create()); From 8604c76af8ff48fa567278bc6f305b7b9ea5545c Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 3 Jun 2024 12:55:04 -0400 Subject: [PATCH 10/11] more accurate test name --- 
.../java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java index 2189a3ffab0c..4d38636892c2 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java @@ -608,18 +608,18 @@ public void testKafkaWithDelayedStopReadingFunction() { private static final int FIVE_MINUTES_IN_MS = 5 * 60 * 1000; @Test(timeout = FIVE_MINUTES_IN_MS) - public void testKafkaViaSchemaTransformJson() { - runReadWriteKafkaViaSchemaTransforms( + public void testKafkaViaManagedSchemaTransformJson() { + runReadWriteKafkaViaManagedSchemaTransforms( "JSON", SCHEMA_IN_JSON, JsonUtils.beamSchemaFromJsonSchema(SCHEMA_IN_JSON)); } @Test(timeout = FIVE_MINUTES_IN_MS) - public void testKafkaViaSchemaTransformAvro() { - runReadWriteKafkaViaSchemaTransforms( + public void testKafkaViaManagedSchemaTransformAvro() { + runReadWriteKafkaViaManagedSchemaTransforms( "AVRO", AvroUtils.toAvroSchema(KAFKA_TOPIC_SCHEMA).toString(), KAFKA_TOPIC_SCHEMA); } - public void runReadWriteKafkaViaSchemaTransforms( + public void runReadWriteKafkaViaManagedSchemaTransforms( String format, String schemaDefinition, Schema beamSchema) { String topicName = options.getKafkaTopic() + "-schema-transform" + UUID.randomUUID(); PCollectionRowTuple.of( From 4d41aef6270eaea4c2d84af7c1af83b10326ebac Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 4 Jun 2024 17:02:45 -0400 Subject: [PATCH 11/11] cleanup after merging snake_case PR --- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 2 +- .../KafkaReadSchemaTransformProvider.java | 3 +- .../KafkaWriteSchemaTransformProvider.java | 3 +- .../KafkaSchemaTransformTranslationTest.java | 99 +++++++------------ 4 files changed, 42 insertions(+), 65 
deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index e897ed439cd1..ea65ad794df9 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -2665,7 +2665,7 @@ abstract static class Builder { abstract Builder setProducerFactoryFn( SerializableFunction, Producer> fn); - abstract Builder setKeySerializer(Class> serializer); + abstract Builder setKeySerializer(@Nullable Class> serializer); abstract Builder setValueSerializer(Class> serializer); diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java index bcf211e08c4a..b2eeb1a54d1d 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java @@ -130,7 +130,8 @@ Row getConfigurationRow() { return SchemaRegistry.createDefault() .getToRowFunction(KafkaReadSchemaTransformConfiguration.class) .apply(configuration) - .sorted(); + .sorted() + .toSnakeCase(); } catch (NoSuchSchemaException e) { throw new RuntimeException(e); } diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java index 68d7b1952ee4..09b338492b47 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java @@ -108,7 +108,8 @@ Row getConfigurationRow() { return SchemaRegistry.createDefault() 
.getToRowFunction(KafkaWriteSchemaTransformConfiguration.class) .apply(configuration) - .sorted(); + .sorted() + .toSnakeCase(); } catch (NoSuchSchemaException e) { throw new RuntimeException(e); } diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformTranslationTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformTranslationTest.java index 2ef4fabaa250..b297227bb7aa 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformTranslationTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformTranslationTest.java @@ -59,21 +59,37 @@ public class KafkaSchemaTransformTranslationTest { static final KafkaReadSchemaTransformProvider READ_PROVIDER = new KafkaReadSchemaTransformProvider(); + static final Row READ_CONFIG = + Row.withSchema(READ_PROVIDER.configurationSchema()) + .withFieldValue("format", "RAW") + .withFieldValue("topic", "test_topic") + .withFieldValue("bootstrap_servers", "host:port") + .withFieldValue("confluent_schema_registry_url", null) + .withFieldValue("confluent_schema_registry_subject", null) + .withFieldValue("schema", null) + .withFieldValue("file_descriptor_path", "testPath") + .withFieldValue("message_name", "test_message") + .withFieldValue("auto_offset_reset_config", "earliest") + .withFieldValue("consumer_config_updates", ImmutableMap.builder().build()) + .withFieldValue("error_handling", null) + .build(); + + static final Row WRITE_CONFIG = + Row.withSchema(WRITE_PROVIDER.configurationSchema()) + .withFieldValue("format", "RAW") + .withFieldValue("topic", "test_topic") + .withFieldValue("bootstrap_servers", "host:port") + .withFieldValue("producer_config_updates", ImmutableMap.builder().build()) + .withFieldValue("error_handling", null) + .withFieldValue("file_descriptor_path", "testPath") + .withFieldValue("message_name", "test_message") + .withFieldValue("schema", "test_schema") + 
.build(); + @Test public void testRecreateWriteTransformFromRow() { - Row transformConfigRow = - Row.withSchema(WRITE_PROVIDER.configurationSchema()) - .withFieldValue("format", "RAW") - .withFieldValue("topic", "test_topic") - .withFieldValue("bootstrapServers", "host:port") - .withFieldValue("producerConfigUpdates", ImmutableMap.builder().build()) - .withFieldValue("errorHandling", null) - .withFieldValue("fileDescriptorPath", "testPath") - .withFieldValue("messageName", "test_message") - .withFieldValue("schema", "test_schema") - .build(); KafkaWriteSchemaTransform writeTransform = - (KafkaWriteSchemaTransform) WRITE_PROVIDER.from(transformConfigRow); + (KafkaWriteSchemaTransform) WRITE_PROVIDER.from(WRITE_CONFIG); KafkaWriteSchemaTransformTranslator translator = new KafkaWriteSchemaTransformTranslator(); Row translatedRow = translator.toConfigRow(writeTransform); @@ -81,7 +97,7 @@ public void testRecreateWriteTransformFromRow() { KafkaWriteSchemaTransform writeTransformFromRow = translator.fromConfigRow(translatedRow, PipelineOptionsFactory.create()); - assertEquals(transformConfigRow, writeTransformFromRow.getConfigurationRow()); + assertEquals(WRITE_CONFIG, writeTransformFromRow.getConfigurationRow()); } @Test @@ -97,20 +113,8 @@ public void testWriteTransformProtoTranslation() Row.withSchema(inputSchema).addValue(new byte[] {1, 2, 3}).build()))) .setRowSchema(inputSchema); - Row transformConfigRow = - Row.withSchema(WRITE_PROVIDER.configurationSchema()) - .withFieldValue("format", "RAW") - .withFieldValue("topic", "test_topic") - .withFieldValue("bootstrapServers", "host:port") - .withFieldValue("producerConfigUpdates", ImmutableMap.builder().build()) - .withFieldValue("errorHandling", null) - .withFieldValue("fileDescriptorPath", "testPath") - .withFieldValue("messageName", "test_message") - .withFieldValue("schema", "test_schema") - .build(); - KafkaWriteSchemaTransform writeTransform = - (KafkaWriteSchemaTransform) WRITE_PROVIDER.from(transformConfigRow); 
+ (KafkaWriteSchemaTransform) WRITE_PROVIDER.from(WRITE_CONFIG); PCollectionRowTuple.of("input", input).apply(writeTransform); // Then translate the pipeline to a proto and extract KafkaWriteSchemaTransform proto @@ -139,36 +143,21 @@ public void testWriteTransformProtoTranslation() assertEquals(WRITE_PROVIDER.configurationSchema(), schemaFromSpec); Row rowFromSpec = RowCoder.of(schemaFromSpec).decode(payload.getConfigurationRow().newInput()); - assertEquals(transformConfigRow, rowFromSpec); + assertEquals(WRITE_CONFIG, rowFromSpec); // Use the information in the proto to recreate the KafkaWriteSchemaTransform KafkaWriteSchemaTransformTranslator translator = new KafkaWriteSchemaTransformTranslator(); KafkaWriteSchemaTransform writeTransformFromSpec = translator.fromConfigRow(rowFromSpec, PipelineOptionsFactory.create()); - assertEquals(transformConfigRow, writeTransformFromSpec.getConfigurationRow()); + assertEquals(WRITE_CONFIG, writeTransformFromSpec.getConfigurationRow()); } @Test public void testReCreateReadTransformFromRow() { // setting a subset of fields here. 
- Row transformConfigRow = - Row.withSchema(READ_PROVIDER.configurationSchema()) - .withFieldValue("format", "RAW") - .withFieldValue("topic", "test_topic") - .withFieldValue("bootstrapServers", "host:port") - .withFieldValue("confluentSchemaRegistryUrl", "test_url") - .withFieldValue("confluentSchemaRegistrySubject", "test_subject") - .withFieldValue("schema", "test_schema") - .withFieldValue("fileDescriptorPath", "testPath") - .withFieldValue("messageName", "test_message") - .withFieldValue("autoOffsetResetConfig", "earliest") - .withFieldValue("consumerConfigUpdates", ImmutableMap.builder().build()) - .withFieldValue("errorHandling", null) - .build(); - KafkaReadSchemaTransform readTransform = - (KafkaReadSchemaTransform) READ_PROVIDER.from(transformConfigRow); + (KafkaReadSchemaTransform) READ_PROVIDER.from(READ_CONFIG); KafkaReadSchemaTransformTranslator translator = new KafkaReadSchemaTransformTranslator(); Row row = translator.toConfigRow(readTransform); @@ -176,7 +165,7 @@ public void testReCreateReadTransformFromRow() { KafkaReadSchemaTransform readTransformFromRow = translator.fromConfigRow(row, PipelineOptionsFactory.create()); - assertEquals(transformConfigRow, readTransformFromRow.getConfigurationRow()); + assertEquals(READ_CONFIG, readTransformFromRow.getConfigurationRow()); } @Test @@ -184,23 +173,9 @@ public void testReadTransformProtoTranslation() throws InvalidProtocolBufferException, IOException { // First build a pipeline Pipeline p = Pipeline.create(); - Row transformConfigRow = - Row.withSchema(READ_PROVIDER.configurationSchema()) - .withFieldValue("format", "RAW") - .withFieldValue("topic", "test_topic") - .withFieldValue("bootstrapServers", "host:port") - .withFieldValue("confluentSchemaRegistryUrl", null) - .withFieldValue("confluentSchemaRegistrySubject", null) - .withFieldValue("schema", null) - .withFieldValue("fileDescriptorPath", "testPath") - .withFieldValue("messageName", "test_message") - .withFieldValue("autoOffsetResetConfig", 
"earliest") - .withFieldValue("consumerConfigUpdates", ImmutableMap.builder().build()) - .withFieldValue("errorHandling", null) - .build(); KafkaReadSchemaTransform readTransform = - (KafkaReadSchemaTransform) READ_PROVIDER.from(transformConfigRow); + (KafkaReadSchemaTransform) READ_PROVIDER.from(READ_CONFIG); PCollectionRowTuple.empty(p).apply(readTransform); @@ -229,13 +204,13 @@ public void testReadTransformProtoTranslation() Schema schemaFromSpec = SchemaTranslation.schemaFromProto(payload.getConfigurationSchema()); assertEquals(READ_PROVIDER.configurationSchema(), schemaFromSpec); Row rowFromSpec = RowCoder.of(schemaFromSpec).decode(payload.getConfigurationRow().newInput()); - assertEquals(transformConfigRow, rowFromSpec); + assertEquals(READ_CONFIG, rowFromSpec); // Use the information in the proto to recreate the KafkaReadSchemaTransform KafkaReadSchemaTransformTranslator translator = new KafkaReadSchemaTransformTranslator(); KafkaReadSchemaTransform readTransformFromSpec = translator.fromConfigRow(rowFromSpec, PipelineOptionsFactory.create()); - assertEquals(transformConfigRow, readTransformFromSpec.getConfigurationRow()); + assertEquals(READ_CONFIG, readTransformFromSpec.getConfigurationRow()); } }