Skip to content

Commit

Permalink
Add config validation to kafka read schema transform (#30625)
Browse files Browse the repository at this point in the history
* Add config validation to kafka read schema transform

Signed-off-by: Jeffrey Kinard <[email protected]>
  • Loading branch information
Polber authored Apr 11, 2024
1 parent 67b3e11 commit e33dec6
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/
package org.apache.beam.sdk.io.kafka;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

import com.google.auto.value.AutoValue;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -46,11 +49,13 @@ public abstract class KafkaReadSchemaTransformConfiguration {

public void validate() {
final String startOffset = this.getAutoOffsetResetConfig();
assert startOffset == null || VALID_START_OFFSET_VALUES.contains(startOffset)
: "Valid Kafka Start offset values are " + VALID_START_OFFSET_VALUES;
checkArgument(
startOffset == null || VALID_START_OFFSET_VALUES.contains(startOffset),
"Valid Kafka Start offset values are " + VALID_START_OFFSET_VALUES);
final String dataFormat = this.getFormat();
assert dataFormat == null || VALID_DATA_FORMATS.contains(dataFormat)
: "Valid data formats are " + VALID_DATA_FORMATS;
checkArgument(
dataFormat == null || VALID_DATA_FORMATS.contains(dataFormat),
"Valid data formats are " + VALID_DATA_FORMATS);

final String inputSchema = this.getSchema();
final String messageName = this.getMessageName();
Expand All @@ -59,20 +64,23 @@ public void validate() {
final String confluentSchemaRegSubject = this.getConfluentSchemaRegistrySubject();

if (confluentSchemaRegUrl != null) {
assert confluentSchemaRegSubject != null
: "To read from Kafka, a schema must be provided directly or though Confluent "
+ "Schema Registry. Make sure you are providing one of these parameters.";
checkNotNull(
confluentSchemaRegSubject,
"To read from Kafka, a schema must be provided directly or though Confluent "
+ "Schema Registry. Make sure you are providing one of these parameters.");
} else if (dataFormat != null && dataFormat.equals("RAW")) {
assert inputSchema == null : "To read from Kafka in RAW format, you can't provide a schema.";
checkArgument(
inputSchema == null, "To read from Kafka in RAW format, you can't provide a schema.");
} else if (dataFormat != null && dataFormat.equals("JSON")) {
assert inputSchema != null : "To read from Kafka in JSON format, you must provide a schema.";
checkNotNull(inputSchema, "To read from Kafka in JSON format, you must provide a schema.");
} else if (dataFormat != null && dataFormat.equals("PROTO")) {
assert messageName != null
: "To read from Kafka in PROTO format, messageName must be provided.";
assert fileDescriptorPath != null || inputSchema != null
: "To read from Kafka in PROTO format, fileDescriptorPath or schema must be provided.";
checkNotNull(
messageName, "To read from Kafka in PROTO format, messageName must be provided.");
checkArgument(
fileDescriptorPath != null || inputSchema != null,
"To read from Kafka in PROTO format, fileDescriptorPath or schema must be provided.");
} else {
assert inputSchema != null : "To read from Kafka in AVRO format, you must provide a schema.";
checkNotNull(inputSchema, "To read from Kafka in AVRO format, you must provide a schema.");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ protected Class<KafkaReadSchemaTransformConfiguration> configurationClass() {
})
@Override
protected SchemaTransform from(KafkaReadSchemaTransformConfiguration configuration) {
configuration.validate();

final String inputSchema = configuration.getSchema();
final int groupId = configuration.hashCode() % Integer.MAX_VALUE;
final String autoOffsetReset =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public class KafkaReadSchemaTransformProviderTest {
@Test
public void testValidConfigurations() {
assertThrows(
AssertionError.class,
IllegalArgumentException.class,
() -> {
KafkaReadSchemaTransformConfiguration.builder()
.setFormat("UNUSUAL_FORMAT")
Expand Down Expand Up @@ -274,7 +274,7 @@ public void testBuildTransformWithoutProtoSchemaFormat() {
(KafkaReadSchemaTransformProvider) providers.get(0);

assertThrows(
NullPointerException.class,
IllegalArgumentException.class,
() ->
kafkaProvider.from(
KafkaReadSchemaTransformConfiguration.builder()
Expand Down

0 comments on commit e33dec6

Please sign in to comment.