From 508e0fea774174f9fedca229f6c91d214a52f1f3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Bj=C3=B6rn=20H=C3=A4user?=
Date: Thu, 16 May 2024 22:00:57 +0200
Subject: [PATCH] Allow null values to be written as null for JsonFormat

Description: When the JsonFormat is used to write data from Debezium to
Kafka and the S3 sink connector then reads from Kafka and saves to S3,
null values are always stored with their default values. This change
therefore adds a new config property (defaulting to the old behaviour
for backwards compatibility) that allows the value converter inside the
S3 sink connector to be configured to write null values as null. Tests
for the configuration and the integration have been added as well.

This addresses #716, but for JSON instead of Avro.
---
 .../connect/s3/S3SinkConnectorConfig.java     | 25 ++++-
 .../connect/s3/format/json/JsonFormat.java    |  9 +-
 .../connect/s3/S3SinkConnectorConfigTest.java | 46 +++++-----
 .../s3/integration/BaseConnectorIT.java       | 92 ++++++++++++-------
 .../s3/integration/S3SinkConnectorIT.java     | 55 +++++++----
 .../s3/integration/S3SinkDataFormatIT.java    | 58 ++++++------
 6 files changed, 177 insertions(+), 108 deletions(-)

diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java
index c56a89c35..8f5552742 100644
--- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java
+++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java
@@ -24,7 +24,6 @@
 import com.amazonaws.regions.Regions;
 import com.amazonaws.services.s3.model.CannedAccessControlList;
 import com.amazonaws.services.s3.model.SSEAlgorithm;
-import io.confluent.connect.storage.common.util.StringUtils;
 import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
@@ -67,6 +66,7 @@
 import io.confluent.connect.storage.common.GenericRecommender;
 import io.confluent.connect.storage.common.ParentValueRecommender;
 import io.confluent.connect.storage.common.StorageCommonConfig;
+import io.confluent.connect.storage.common.util.StringUtils;
 import io.confluent.connect.storage.format.Format;
 import io.confluent.connect.storage.partitioner.DailyPartitioner;
 import io.confluent.connect.storage.partitioner.DefaultPartitioner;
@@ -184,6 +184,13 @@ public class S3SinkConnectorConfig extends StorageSinkConnectorConfig {
       + " This value is case insensitive and can be either 'BASE64' (default) or 'NUMERIC'";
   private static final String DECIMAL_FORMAT_DISPLAY = "Decimal Format";
 
+  public static final String REPLACE_NULL_WITH_DEFAULT_CONFIG = "json.replace.null.with.default";
+  public static final boolean REPLACE_NULL_WITH_DEFAULT_DEFAULT = true;
+  private static final String REPLACE_NULL_WITH_DEFAULT_DOC = "Whether to replace fields that"
+      + " are null with the default value defined for the field in the schema."
+ + " When set to true, the default value is used, otherwise null is used."; + private static final String REPLACE_NULL_WITH_DEFAULT_DISPLAY = "Replace null with default"; + public static final String STORE_KAFKA_KEYS_CONFIG = "store.kafka.keys"; public static final String STORE_KAFKA_HEADERS_CONFIG = "store.kafka.headers"; public static final String KEYS_FORMAT_CLASS_CONFIG = "keys.format.class"; @@ -300,11 +307,11 @@ public static ConfigDef newConfigDef() { configDef.define( S3_BUCKET_CONFIG, Type.STRING, - Importance.HIGH, + ConfigDef.Importance.HIGH, "The S3 Bucket.", group, ++orderInGroup, - Width.LONG, + ConfigDef.Width.LONG, "S3 Bucket" ); @@ -537,6 +544,18 @@ public static ConfigDef newConfigDef() { DECIMAL_FORMAT_DISPLAY ); + configDef.define( + REPLACE_NULL_WITH_DEFAULT_CONFIG, + Type.BOOLEAN, + REPLACE_NULL_WITH_DEFAULT_DEFAULT, + Importance.LOW, + REPLACE_NULL_WITH_DEFAULT_DOC, + group, + ++orderInGroup, + Width.SHORT, + REPLACE_NULL_WITH_DEFAULT_DISPLAY + ); + configDef.define( S3_PART_RETRIES_CONFIG, Type.INT, diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/json/JsonFormat.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/json/JsonFormat.java index e4175c634..f5eaff890 100644 --- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/json/JsonFormat.java +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/json/JsonFormat.java @@ -16,11 +16,11 @@ package io.confluent.connect.s3.format.json; import org.apache.kafka.connect.json.JsonConverter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import io.confluent.connect.s3.S3SinkConnectorConfig; import io.confluent.connect.s3.storage.S3Storage; @@ -46,6 +46,11 @@ public JsonFormat(S3Storage storage) { "decimal.format", String.valueOf(storage.conf().getJsonDecimalFormat()) ); + converterConfig.put( + "replace.null.with.default", + storage.conf().getBoolean(S3SinkConnectorConfig.REPLACE_NULL_WITH_DEFAULT_CONFIG) + ); + this.converter.configure(converterConfig, false); } diff --git a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/S3SinkConnectorConfigTest.java b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/S3SinkConnectorConfigTest.java index 1d346b644..7471aa04d 100644 --- a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/S3SinkConnectorConfigTest.java +++ b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/S3SinkConnectorConfigTest.java @@ -17,16 +17,27 @@ import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSCredentialsProvider; - +import io.confluent.connect.avro.AvroDataConfig; +import io.confluent.connect.s3.auth.AwsAssumeRoleCredentialsProvider; +import io.confluent.connect.s3.format.avro.AvroFormat; import io.confluent.connect.s3.format.bytearray.ByteArrayFormat; +import io.confluent.connect.s3.format.json.JsonFormat; import io.confluent.connect.s3.format.parquet.ParquetFormat; - +import io.confluent.connect.s3.storage.S3Storage; +import io.confluent.connect.storage.common.StorageCommonConfig; +import io.confluent.connect.storage.partitioner.DailyPartitioner; +import io.confluent.connect.storage.partitioner.DefaultPartitioner; +import io.confluent.connect.storage.partitioner.FieldPartitioner; +import io.confluent.connect.storage.partitioner.HourlyPartitioner; +import io.confluent.connect.storage.partitioner.Partitioner; +import io.confluent.connect.storage.partitioner.PartitionerConfig; +import 
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.connect.json.DecimalFormat;
 import org.apache.kafka.connect.sink.SinkRecord;
-import org.junit.After;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -38,32 +49,15 @@
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import io.confluent.connect.s3.auth.AwsAssumeRoleCredentialsProvider;
-import io.confluent.connect.s3.format.avro.AvroFormat;
-import io.confluent.connect.s3.format.json.JsonFormat;
-import io.confluent.connect.s3.storage.S3Storage;
-import io.confluent.connect.storage.common.StorageCommonConfig;
-import io.confluent.connect.storage.partitioner.DailyPartitioner;
-import io.confluent.connect.storage.partitioner.DefaultPartitioner;
-import io.confluent.connect.storage.partitioner.FieldPartitioner;
-import io.confluent.connect.storage.partitioner.HourlyPartitioner;
-import io.confluent.connect.storage.partitioner.Partitioner;
-import io.confluent.connect.storage.partitioner.PartitionerConfig;
-import io.confluent.connect.storage.partitioner.TimeBasedPartitioner;
-import io.confluent.connect.avro.AvroDataConfig;
-
 import static io.confluent.connect.s3.S3SinkConnectorConfig.AffixType;
-import static io.confluent.connect.s3.S3SinkConnectorConfig.DECIMAL_FORMAT_CONFIG;
-import static io.confluent.connect.s3.S3SinkConnectorConfig.DECIMAL_FORMAT_DEFAULT;
 import static io.confluent.connect.s3.S3SinkConnectorConfig.HEADERS_FORMAT_CLASS_CONFIG;
 import static io.confluent.connect.s3.S3SinkConnectorConfig.KEYS_FORMAT_CLASS_CONFIG;
 import static io.confluent.connect.s3.S3SinkConnectorConfig.SCHEMA_PARTITION_AFFIX_TYPE_CONFIG;
-
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
 
 public class S3SinkConnectorConfigTest extends S3SinkConnectorTestBase {
 
@@ -458,6 +452,16 @@ public void testJsonDecimalFormat() {
     assertEquals(DecimalFormat.NUMERIC.name(), connectorConfig.getJsonDecimalFormat());
   }
 
+  @Test
+  public void testJsonReplaceNullWithDefault() {
+    connectorConfig = new S3SinkConnectorConfig(properties);
+    assertTrue(connectorConfig.getBoolean("json.replace.null.with.default"));
+
+    properties.put(S3SinkConnectorConfig.REPLACE_NULL_WITH_DEFAULT_CONFIG, "false");
+    connectorConfig = new S3SinkConnectorConfig(properties);
+    assertFalse(connectorConfig.getBoolean("json.replace.null.with.default"));
+  }
+
   @Test
   public void testValidCompressionLevels() {
     IntStream.range(-1, 9).boxed().forEach(i -> {
diff --git a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/integration/BaseConnectorIT.java b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/integration/BaseConnectorIT.java
index 4351ddff3..c1d7243d0 100644
--- a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/integration/BaseConnectorIT.java
+++ b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/integration/BaseConnectorIT.java
@@ -15,10 +15,6 @@
 
 package io.confluent.connect.s3.integration;
 
-import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_ACCESS_KEY_ID_CONFIG;
-import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_SECRET_ACCESS_KEY_CONFIG;
-import static io.confluent.kafka.schemaregistry.ClusterTestHarness.KAFKASTORE_TOPIC;
-
 import com.amazonaws.auth.AWSStaticCredentialsProvider;
 import com.amazonaws.auth.BasicAWSCredentials;
 import com.amazonaws.services.s3.AmazonS3;
@@ -29,27 +25,12 @@
 import com.amazonaws.services.s3.model.S3ObjectSummary;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.NullNode;
 import com.google.common.collect.ImmutableMap;
 import io.confluent.common.utils.IntegrationTest;
+import io.confluent.connect.s3.util.S3Utils;
 import io.confluent.kafka.schemaregistry.CompatibilityLevel;
 import io.confluent.kafka.schemaregistry.RestApp;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
-import io.confluent.connect.s3.util.S3Utils;
-import java.util.function.Function;
-import java.util.stream.Collectors;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
@@ -79,6 +60,25 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_ACCESS_KEY_ID_CONFIG;
+import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_SECRET_ACCESS_KEY_CONFIG;
+import static io.confluent.kafka.schemaregistry.ClusterTestHarness.KAFKASTORE_TOPIC;
 import static org.assertj.core.api.Assertions.assertThat;
 
 @Category(IntegrationTest.class)
@@ -332,15 +332,15 @@ protected Schema getSampleStructSchema() {
         .field("myFloat32", Schema.FLOAT32_SCHEMA)
         .field("myFloat64", Schema.FLOAT64_SCHEMA)
         .field("myString", Schema.STRING_SCHEMA)
+        .field("withDefault", SchemaBuilder.bool().optional().defaultValue(true).build())
         .build();
   }
 
   protected Struct getSampleStructVal(Schema structSchema) {
-    Date sampleDate = new Date(1111111);
-    sampleDate.setTime(0);
     return new Struct(structSchema)
         .put("ID", (long) 1)
         .put("myBool", true)
+        .put("withDefault", null)
         .put("myInt32", 32)
         .put("myFloat32", 3.2f)
         .put("myFloat64", 64.64)
@@ -409,12 +409,15 @@ protected static void clearBucket(String bucketName) {
    * @param bucketName the name of the s3 test bucket
    * @param expectedRowsPerFile the number of rows a file should have
    * @param expectedRow the expected row data in each file
+   * @param useDefaultValues whether null fields are expected to be replaced by their schema defaults
+   *
    * @return whether every row of the files read equals the expected row
    */
   protected boolean fileContentsAsExpected(
       String bucketName,
       int expectedRowsPerFile,
-      Struct expectedRow
+      Struct expectedRow,
+      boolean useDefaultValues
   ) {
     log.info("expectedRow: {}", expectedRow);
     for (String fileName :
@@ -427,7 +430,7 @@ protected boolean fileContentsAsExpected(
       String fileExtension = getExtensionFromKey(fileName);
       List<JsonNode> downloadedFileContents = contentGetters.get(fileExtension)
          .apply(destinationPath);
-      if (!fileContentsMatchExpected(downloadedFileContents, expectedRowsPerFile, expectedRow)) {
+      if (!fileContentsMatchExpected(downloadedFileContents, expectedRowsPerFile, expectedRow, useDefaultValues)) {
         return false;
       }
       downloadedFile.delete();
@@ -481,12 +484,15 @@ protected boolean keyfileContentsAsExpected(
    * @param fileContents the file contents as a list of JsonNodes
    * @param expectedRowsPerFile the number of rows expected in the file
    * @param expectedRow the expected values of each row
+   * @param useDefaultValues whether to use default values from the struct for null fields
+   *
    * @return whether the file contents match the expected row
    */
   protected boolean fileContentsMatchExpected(
       List<JsonNode> fileContents,
       int expectedRowsPerFile,
-      Struct expectedRow
+      Struct expectedRow,
+      boolean useDefaultValues
   ) {
     if (fileContents.size() != expectedRowsPerFile) {
       log.error("Number of rows in file do not match the expected count, actual: {}, expected: {}",
@@ -494,7 +500,7 @@ protected boolean fileContentsMatchExpected(
       return false;
     }
     for (JsonNode row : fileContents) {
-      if (!fileRowMatchesExpectedRow(row, expectedRow)) {
+      if (!fileRowMatchesExpectedRow(row, expectedRow, useDefaultValues)) {
         return false;
       }
     }
@@ -512,18 +518,34 @@ private List<String> getS3KeyFileList(List<S3ObjectSummary> summaries) {
   /**
    * Compare the row in the file and its values to the expected row's values.
    *
-   * @param fileRow the row read from the file as a JsonNode
-   * @param expectedRow the expected contents of the row
+   * @param fileRow          the row read from the file as a JsonNode
+   * @param expectedRow      the expected contents of the row
+   * @param useDefaultValues whether null fields should be compared against their schema default values
+   *
    * @return whether the file row matches the expected row
    */
-  private boolean fileRowMatchesExpectedRow(JsonNode fileRow, Struct expectedRow) {
-    log.debug("Comparing rows: file: {}, expected: {}", fileRow, expectedRow);
+  private boolean fileRowMatchesExpectedRow(JsonNode fileRow, Struct expectedRow, boolean useDefaultValues) {
+    log.info("Comparing rows: file: {}, expected: {}", fileRow, expectedRow);
     // compare the field values
     for (Field key : expectedRow.schema().fields()) {
-      String expectedValue = expectedRow.get(key).toString();
-      String rowValue = fileRow.get(key.name()).toString().replaceAll("^\"|\"$", "");
-      log.debug("Comparing values: {}, {}", expectedValue, rowValue);
-      if (!rowValue.equals(expectedValue)) {
+      String expectedValue = null;
+      if (useDefaultValues) {
+        expectedValue = expectedRow.get(key).toString();
+      } else {
+        Object withoutDefault = expectedRow.getWithoutDefault(key.name());
+        if (withoutDefault != null) {
+          expectedValue = withoutDefault.toString();
+        }
+      }
+
+      JsonNode jsonValue = fileRow.get(key.name());
+      String rowValue = null;
+      if (!(jsonValue instanceof NullNode)) {
+        rowValue = jsonValue.toString().replaceAll("^\"|\"$", "");
+      }
+
+      log.info("Comparing values: {}, {}, {}, {}", key.name(), expectedValue, rowValue, Objects.equals(rowValue, expectedValue));
+      if (!Objects.equals(rowValue, expectedValue)) {
         return false;
       }
     }
diff --git a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/integration/S3SinkConnectorIT.java b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/integration/S3SinkConnectorIT.java
index 24b6296b4..22b6062f9 100644
--- a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/integration/S3SinkConnectorIT.java
+++ b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/integration/S3SinkConnectorIT.java
@@ -17,11 +17,10 @@
 import static io.confluent.connect.s3.S3SinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG;
 import static io.confluent.connect.s3.S3SinkConnectorConfig.KEYS_FORMAT_CLASS_CONFIG;
+import static io.confluent.connect.s3.S3SinkConnectorConfig.REPLACE_NULL_WITH_DEFAULT_CONFIG;
 import static io.confluent.connect.s3.S3SinkConnectorConfig.S3_BUCKET_CONFIG;
 import static io.confluent.connect.s3.S3SinkConnectorConfig.STORE_KAFKA_HEADERS_CONFIG;
 import static io.confluent.connect.s3.S3SinkConnectorConfig.STORE_KAFKA_KEYS_CONFIG;
-import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_ACCESS_KEY_ID_CONFIG;
-import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_SECRET_ACCESS_KEY_CONFIG;
 import static io.confluent.connect.s3.S3SinkConnectorConfig.TOMBSTONE_ENCODED_PARTITION;
 import static io.confluent.connect.storage.StorageSinkConnectorConfig.FLUSH_SIZE_CONFIG;
 import static io.confluent.connect.storage.StorageSinkConnectorConfig.FORMAT_CLASS_CONFIG;
@@ -32,7 +31,6 @@
 import static org.hamcrest.core.StringStartsWith.startsWith;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-
 import io.confluent.connect.s3.S3SinkConnector;
 import io.confluent.connect.s3.S3SinkConnectorConfig.IgnoreOrFailBehavior;
 import io.confluent.connect.s3.S3SinkConnectorConfig.OutputWriteBehavior;
@@ -40,6 +38,7 @@
 import io.confluent.connect.s3.format.json.JsonFormat;
 import io.confluent.connect.s3.format.parquet.ParquetFormat;
 import io.confluent.connect.s3.storage.S3Storage;
+import io.confluent.connect.s3.util.EmbeddedConnectUtils;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -51,8 +50,6 @@
 import java.util.TreeSet;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-
-import io.confluent.connect.s3.util.EmbeddedConnectUtils;
 import org.apache.commons.io.FileUtils;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -60,10 +57,10 @@
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
-import org.apache.kafka.common.header.Header;
 import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.runtime.SinkConnectorConfig;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -99,7 +96,7 @@ public class S3SinkConnectorIT extends BaseConnectorIT {
 
   @Before
   public void before() throws InterruptedException {
-    initializeJsonConverter();
+    initializeJsonConverter(true);
     initializeCustomProducer();
     setupProperties();
     waitForSchemaRegistryToStart();
@@ -127,21 +124,33 @@ public void after() throws Exception {
   public void testBasicRecordsWrittenAvro() throws Throwable {
     //add test specific props
     props.put(FORMAT_CLASS_CONFIG, AvroFormat.class.getName());
-    testBasicRecordsWritten(AVRO_EXTENSION, false);
+    testBasicRecordsWritten(AVRO_EXTENSION, false, true);
   }
 
   @Test
   public void testBasicRecordsWrittenParquet() throws Throwable {
     //add test specific props
     props.put(FORMAT_CLASS_CONFIG, ParquetFormat.class.getName());
-    testBasicRecordsWritten(PARQUET_EXTENSION, false);
+    testBasicRecordsWritten(PARQUET_EXTENSION, false, true);
   }
 
   @Test
   public void testBasicRecordsWrittenJson() throws Throwable {
     //add test specific props
     props.put(FORMAT_CLASS_CONFIG, JsonFormat.class.getName());
-    testBasicRecordsWritten(JSON_EXTENSION, false);
+    testBasicRecordsWritten(JSON_EXTENSION, false, true);
+  }
+
+  @Test
+  public void testBasicRecordsWrittenJsonWithoutDefaults() throws Throwable {
+    initializeJsonConverter(false);
+
+    //add test specific props
+    props.put(FORMAT_CLASS_CONFIG, JsonFormat.class.getName());
+    props.put(REPLACE_NULL_WITH_DEFAULT_CONFIG, "false");
+    props.put("value.converter.replace.null.with.default", "false");
+    props.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
+    testBasicRecordsWritten(JSON_EXTENSION, false, false);
   }
 
   @Test
@@ -155,38 +164,42 @@ public void testTombstoneRecordsWrittenJson() throws Throwable {
     testTombstoneRecordsWritten(JSON_EXTENSION, false);
   }
 
-  
+
   @Test
   public void testFilesWrittenToBucketAvroWithExtInTopic() throws Throwable {
     //add test specific props
     props.put(FORMAT_CLASS_CONFIG, AvroFormat.class.getName());
-    testBasicRecordsWritten(AVRO_EXTENSION, true);
+    testBasicRecordsWritten(AVRO_EXTENSION, true, true);
   }
 
   @Test
  public void testFilesWrittenToBucketParquetWithExtInTopic() throws Throwable {
     //add test specific props
     props.put(FORMAT_CLASS_CONFIG, ParquetFormat.class.getName());
-    testBasicRecordsWritten(PARQUET_EXTENSION, true);
+    testBasicRecordsWritten(PARQUET_EXTENSION, true, true);
   }
 
   @Test
   public void testFilesWrittenToBucketJsonWithExtInTopic() throws Throwable {
     //add test specific props
     props.put(FORMAT_CLASS_CONFIG, JsonFormat.class.getName());
-    testBasicRecordsWritten(JSON_EXTENSION, true);
+    testBasicRecordsWritten(JSON_EXTENSION, true, true);
   }
 
   /**
    * Test that the expected records are written for a given file extension
    * Optionally, test that topics which have "*.{expectedFileExtension}*" in them are processed
    * and written.
+   *
   * @param expectedFileExtension The file extension to test against
-   * @param addExtensionInTopic Add a topic to to the test which contains the extension
+   * @param addExtensionInTopic   Add a topic to the test which contains the extension
+   * @param useDefaultValues      Whether null fields should be expected as their default values or as null
+   *
   * @throws Throwable
   */
  private void testBasicRecordsWritten(
      String expectedFileExtension,
-      boolean addExtensionInTopic
+      boolean addExtensionInTopic,
+      boolean useDefaultValues
  ) throws Throwable {
    final String topicNameWithExt = "other." + expectedFileExtension + ".topic."
        + expectedFileExtension;
@@ -241,7 +254,7 @@ private void testBasicRecordsWritten(
     assertFileNamesValid(TEST_BUCKET_NAME, new ArrayList<>(expectedTopicFilenames));
     // Now check that all files created by the sink have the contents that were sent
     // to the producer (they're all the same content)
-    assertTrue(fileContentsAsExpected(TEST_BUCKET_NAME, FLUSH_SIZE_STANDARD, recordValueStruct));
+    assertTrue(fileContentsAsExpected(TEST_BUCKET_NAME, FLUSH_SIZE_STANDARD, recordValueStruct, useDefaultValues));
   }
 
   private void testTombstoneRecordsWritten(
@@ -350,7 +363,7 @@ public void testFaultyRecordsReportedToDLQ() throws Throwable {
     assertEquals(expectedDLQRecordCount, dlqRecords.count());
     assertDLQRecordMessages(expectedErrors, dlqRecords);
 
-    assertTrue(fileContentsAsExpected(TEST_BUCKET_NAME, FLUSH_SIZE_STANDARD, recordValueStruct));
+    assertTrue(fileContentsAsExpected(TEST_BUCKET_NAME, FLUSH_SIZE_STANDARD, recordValueStruct, true));
   }
 
   /**
@@ -424,10 +437,14 @@ private void produceRecords(
     }
   }
 
-  private void initializeJsonConverter() {
+  private void initializeJsonConverter(boolean replaceNullWithDefaults) {
     Map<String, String> jsonConverterProps = new HashMap<>();
     jsonConverterProps.put("schemas.enable", "true");
     jsonConverterProps.put("converter.type", "value");
+    if (!replaceNullWithDefaults) {
+      jsonConverterProps.put("replace.null.with.default", "false");
+    }
+
     jsonConverter = new JsonConverter();
     jsonConverter.configure(jsonConverterProps);
   }
diff --git a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/integration/S3SinkDataFormatIT.java b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/integration/S3SinkDataFormatIT.java
index e5c64882b..343768a00 100644
--- a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/integration/S3SinkDataFormatIT.java
+++ b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/integration/S3SinkDataFormatIT.java
@@ -15,33 +15,6 @@
 
 package io.confluent.connect.s3.integration;
 
-import static io.confluent.connect.s3.S3SinkConnectorConfig.S3_BUCKET_CONFIG;
-import static io.confluent.connect.storage.StorageSinkConnectorConfig.FLUSH_SIZE_CONFIG;
-import static io.confluent.connect.storage.StorageSinkConnectorConfig.FORMAT_CLASS_CONFIG;
-import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
-import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
-import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
-import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import io.confluent.connect.avro.AvroConverter;
-import io.confluent.connect.json.JsonSchemaConverter;
-import io.confluent.connect.protobuf.ProtobufConverter;
-import io.confluent.connect.s3.S3SinkConnector;
-import io.confluent.connect.s3.format.avro.AvroFormat;
-import io.confluent.connect.s3.storage.S3Storage;
-import io.confluent.connect.s3.util.EmbeddedConnectUtils;
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -65,6 +38,35 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
 
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+
+import io.confluent.connect.avro.AvroConverter;
+import io.confluent.connect.json.JsonSchemaConverter;
+import io.confluent.connect.protobuf.ProtobufConverter;
+import io.confluent.connect.s3.S3SinkConnector;
+import io.confluent.connect.s3.format.avro.AvroFormat;
+import io.confluent.connect.s3.storage.S3Storage;
+import io.confluent.connect.s3.util.EmbeddedConnectUtils;
+
+import static io.confluent.connect.s3.S3SinkConnectorConfig.S3_BUCKET_CONFIG;
+import static io.confluent.connect.storage.StorageSinkConnectorConfig.FLUSH_SIZE_CONFIG;
+import static io.confluent.connect.storage.StorageSinkConnectorConfig.FORMAT_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 @RunWith(Parameterized.class)
 @Category(IntegrationTest.class)
 public class S3SinkDataFormatIT extends BaseConnectorIT {
@@ -196,7 +198,7 @@ public void testBasicRecordsWrittenAvro() throws Throwable {
     assertFileNamesValid(TEST_BUCKET_NAME, new ArrayList<>(expectedTopicFilenames));
     // Now check that all files created by the sink have the contents that were sent
     // to the producer (they're all the same content)
-    assertTrue(fileContentsAsExpected(TEST_BUCKET_NAME, FLUSH_SIZE_STANDARD, recordValueStruct));
+    assertTrue(fileContentsAsExpected(TEST_BUCKET_NAME, FLUSH_SIZE_STANDARD, recordValueStruct, true));
   }
 
   protected void produceRecords(
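
For reference, a sink connector configuration that exercises the new property could look like
the following minimal sketch. The connector name, topic, and bucket below are placeholders,
and only the settings relevant to this change plus a few of the usual required ones are shown:

    name=s3-sink-json-nulls
    connector.class=io.confluent.connect.s3.S3SinkConnector
    topics=example-topic
    storage.class=io.confluent.connect.s3.storage.S3Storage
    s3.bucket.name=example-bucket
    format.class=io.confluent.connect.s3.format.json.JsonFormat
    flush.size=3
    # new in this patch: write null fields as null instead of their schema default
    json.replace.null.with.default=false
    # the JsonConverter used for the value accepts the equivalent converter-level property
    value.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable=true
    value.converter.replace.null.with.default=false

With json.replace.null.with.default=false, a field that is null but has a schema default (like
the withDefault field added to the integration tests) is written to S3 as null rather than as
the default value; leaving the property at its default of true keeps the previous behaviour.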