diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java
index d95c49894a2c..13f5249a6c3b 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java
@@ -17,6 +17,9 @@
  */
 package org.apache.beam.sdk.io.kafka;
 
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+
 import com.google.auto.value.AutoValue;
 import java.util.Map;
 import java.util.Set;
@@ -46,11 +49,13 @@ public abstract class KafkaReadSchemaTransformConfiguration {
   public void validate() {
     final String startOffset = this.getAutoOffsetResetConfig();
-    assert startOffset == null || VALID_START_OFFSET_VALUES.contains(startOffset)
-        : "Valid Kafka Start offset values are " + VALID_START_OFFSET_VALUES;
+    checkArgument(
+        startOffset == null || VALID_START_OFFSET_VALUES.contains(startOffset),
+        "Valid Kafka Start offset values are " + VALID_START_OFFSET_VALUES);
 
     final String dataFormat = this.getFormat();
-    assert dataFormat == null || VALID_DATA_FORMATS.contains(dataFormat)
-        : "Valid data formats are " + VALID_DATA_FORMATS;
+    checkArgument(
+        dataFormat == null || VALID_DATA_FORMATS.contains(dataFormat),
+        "Valid data formats are " + VALID_DATA_FORMATS);
 
     final String inputSchema = this.getSchema();
     final String messageName = this.getMessageName();
@@ -59,20 +64,23 @@ public void validate() {
     final String confluentSchemaRegSubject = this.getConfluentSchemaRegistrySubject();
 
     if (confluentSchemaRegUrl != null) {
-      assert confluentSchemaRegSubject != null
-          : "To read from Kafka, a schema must be provided directly or though Confluent "
-              + "Schema Registry. Make sure you are providing one of these parameters.";
+      checkNotNull(
+          confluentSchemaRegSubject,
+          "To read from Kafka, a schema must be provided directly or through Confluent "
+              + "Schema Registry. Make sure you are providing one of these parameters.");
     } else if (dataFormat != null && dataFormat.equals("RAW")) {
-      assert inputSchema == null : "To read from Kafka in RAW format, you can't provide a schema.";
+      checkArgument(
+          inputSchema == null, "To read from Kafka in RAW format, you can't provide a schema.");
     } else if (dataFormat != null && dataFormat.equals("JSON")) {
-      assert inputSchema != null : "To read from Kafka in JSON format, you must provide a schema.";
+      checkNotNull(inputSchema, "To read from Kafka in JSON format, you must provide a schema.");
     } else if (dataFormat != null && dataFormat.equals("PROTO")) {
-      assert messageName != null
-          : "To read from Kafka in PROTO format, messageName must be provided.";
-      assert fileDescriptorPath != null || inputSchema != null
-          : "To read from Kafka in PROTO format, fileDescriptorPath or schema must be provided.";
+      checkNotNull(
+          messageName, "To read from Kafka in PROTO format, messageName must be provided.");
+      checkArgument(
+          fileDescriptorPath != null || inputSchema != null,
+          "To read from Kafka in PROTO format, fileDescriptorPath or schema must be provided.");
     } else {
-      assert inputSchema != null : "To read from Kafka in AVRO format, you must provide a schema.";
+      checkNotNull(inputSchema, "To read from Kafka in AVRO format, you must provide a schema.");
     }
   }
 
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
index 10a347929ee0..2776c388f7cc 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
@@ -99,6 +99,8 @@ protected Class<KafkaReadSchemaTransformConfiguration> configurationClass() {
   })
   @Override
   protected SchemaTransform from(KafkaReadSchemaTransformConfiguration configuration) {
+    configuration.validate();
+
     final String inputSchema = configuration.getSchema();
     final int groupId = configuration.hashCode() % Integer.MAX_VALUE;
     final String autoOffsetReset =
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
index 4f133746b535..f6e231c758a5 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
@@ -65,7 +65,7 @@ public class KafkaReadSchemaTransformProviderTest {
   @Test
   public void testValidConfigurations() {
     assertThrows(
-        AssertionError.class,
+        IllegalArgumentException.class,
         () -> {
           KafkaReadSchemaTransformConfiguration.builder()
               .setFormat("UNUSUAL_FORMAT")
@@ -274,7 +274,7 @@ public void testBuildTransformWithoutProtoSchemaFormat() {
         (KafkaReadSchemaTransformProvider) providers.get(0);
 
     assertThrows(
-        NullPointerException.class,
+        IllegalArgumentException.class,
         () ->
             kafkaProvider.from(
                 KafkaReadSchemaTransformConfiguration.builder()
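
Reviewer note, not part of the patch: the motivation for dropping assert is that Java assertions are skipped at runtime unless the JVM is launched with -ea, so none of the old checks in validate() actually ran in a normal pipeline, whereas Guava's checkArgument and checkNotNull always execute. Below is a minimal standalone sketch of the difference; the demo class is hypothetical, and it uses plain Guava rather than Beam's vendored org.apache.beam.vendor.guava.v32_1_2_jre copy so it compiles on its own.

import static com.google.common.base.Preconditions.checkArgument;

import com.google.common.collect.ImmutableSet;
import java.util.Set;

public class PreconditionsDemo {
  private static final Set<String> VALID_DATA_FORMATS =
      ImmutableSet.of("RAW", "JSON", "PROTO", "AVRO");

  public static void main(String[] args) {
    String dataFormat = "UNUSUAL_FORMAT";

    // Old style: a no-op unless the JVM runs with -ea; with -ea it throws
    // AssertionError, which is what the old test expected.
    assert dataFormat == null || VALID_DATA_FORMATS.contains(dataFormat)
        : "Valid data formats are " + VALID_DATA_FORMATS;

    // New style: always evaluated; throws IllegalArgumentException when the
    // condition is false, matching the updated test expectations.
    checkArgument(
        dataFormat == null || VALID_DATA_FORMATS.contains(dataFormat),
        "Valid data formats are " + VALID_DATA_FORMATS);
  }
}

Run plainly, the assert is skipped and checkArgument throws IllegalArgumentException; run with java -ea, the assert fires first. One subtlety: checkNotNull throws NullPointerException rather than IllegalArgumentException, so the updated testBuildTransformWithoutProtoSchemaFormat presumably trips the checkArgument(fileDescriptorPath != null || inputSchema != null, ...) branch rather than one of the checkNotNull calls.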