diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java index c56a89c35..8f5552742 100644 --- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java @@ -24,7 +24,6 @@ import com.amazonaws.regions.Regions; import com.amazonaws.services.s3.model.CannedAccessControlList; import com.amazonaws.services.s3.model.SSEAlgorithm; -import io.confluent.connect.storage.common.util.StringUtils; import org.apache.kafka.common.Configurable; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; @@ -67,6 +66,7 @@ import io.confluent.connect.storage.common.GenericRecommender; import io.confluent.connect.storage.common.ParentValueRecommender; import io.confluent.connect.storage.common.StorageCommonConfig; +import io.confluent.connect.storage.common.util.StringUtils; import io.confluent.connect.storage.format.Format; import io.confluent.connect.storage.partitioner.DailyPartitioner; import io.confluent.connect.storage.partitioner.DefaultPartitioner; @@ -184,6 +184,13 @@ public class S3SinkConnectorConfig extends StorageSinkConnectorConfig { + " This value is case insensitive and can be either 'BASE64' (default) or 'NUMERIC'"; private static final String DECIMAL_FORMAT_DISPLAY = "Decimal Format"; + public static final String REPLACE_NULL_WITH_DEFAULT_CONFIG = "json.replace.null.with.default"; + public static final boolean REPLACE_NULL_WITH_DEFAULT_DEFAULT = true; + private static final String REPLACE_NULL_WITH_DEFAULT_DOC = "Whether to replace fields that" + " have a default value and are null with that default value."
+ + " When set to true, the default value is used, otherwise null is used."; + private static final String REPLACE_NULL_WITH_DEFAULT_DISPLAY = "Replace null with default"; + public static final String STORE_KAFKA_KEYS_CONFIG = "store.kafka.keys"; public static final String STORE_KAFKA_HEADERS_CONFIG = "store.kafka.headers"; public static final String KEYS_FORMAT_CLASS_CONFIG = "keys.format.class"; @@ -300,11 +307,11 @@ public static ConfigDef newConfigDef() { configDef.define( S3_BUCKET_CONFIG, Type.STRING, - Importance.HIGH, + ConfigDef.Importance.HIGH, "The S3 Bucket.", group, ++orderInGroup, - Width.LONG, + ConfigDef.Width.LONG, "S3 Bucket" ); @@ -537,6 +544,18 @@ public static ConfigDef newConfigDef() { DECIMAL_FORMAT_DISPLAY ); + configDef.define( + REPLACE_NULL_WITH_DEFAULT_CONFIG, + Type.BOOLEAN, + REPLACE_NULL_WITH_DEFAULT_DEFAULT, + Importance.LOW, + REPLACE_NULL_WITH_DEFAULT_DOC, + group, + ++orderInGroup, + Width.SHORT, + REPLACE_NULL_WITH_DEFAULT_DISPLAY + ); + configDef.define( S3_PART_RETRIES_CONFIG, Type.INT, diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/json/JsonFormat.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/json/JsonFormat.java index e4175c634..f5eaff890 100644 --- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/json/JsonFormat.java +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/json/JsonFormat.java @@ -16,11 +16,11 @@ package io.confluent.connect.s3.format.json; import org.apache.kafka.connect.json.JsonConverter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import io.confluent.connect.s3.S3SinkConnectorConfig; import io.confluent.connect.s3.storage.S3Storage; @@ -46,6 +46,11 @@ public JsonFormat(S3Storage storage) { "decimal.format", String.valueOf(storage.conf().getJsonDecimalFormat()) ); + converterConfig.put( + "replace.null.with.default", + storage.conf().getBoolean(S3SinkConnectorConfig.REPLACE_NULL_WITH_DEFAULT_CONFIG) + ); + this.converter.configure(converterConfig, false); } diff --git a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/S3SinkConnectorConfigTest.java b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/S3SinkConnectorConfigTest.java index 1d346b644..7471aa04d 100644 --- a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/S3SinkConnectorConfigTest.java +++ b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/S3SinkConnectorConfigTest.java @@ -17,16 +17,27 @@ import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSCredentialsProvider; - +import io.confluent.connect.avro.AvroDataConfig; +import io.confluent.connect.s3.auth.AwsAssumeRoleCredentialsProvider; +import io.confluent.connect.s3.format.avro.AvroFormat; import io.confluent.connect.s3.format.bytearray.ByteArrayFormat; +import io.confluent.connect.s3.format.json.JsonFormat; import io.confluent.connect.s3.format.parquet.ParquetFormat; - +import io.confluent.connect.s3.storage.S3Storage; +import io.confluent.connect.storage.common.StorageCommonConfig; +import io.confluent.connect.storage.partitioner.DailyPartitioner; +import io.confluent.connect.storage.partitioner.DefaultPartitioner; +import io.confluent.connect.storage.partitioner.FieldPartitioner; +import io.confluent.connect.storage.partitioner.HourlyPartitioner; +import io.confluent.connect.storage.partitioner.Partitioner; +import io.confluent.connect.storage.partitioner.PartitionerConfig; +import 
io.confluent.connect.storage.partitioner.TimeBasedPartitioner; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.ConfigValue; import org.apache.kafka.connect.json.DecimalFormat; import org.apache.kafka.connect.sink.SinkRecord; -import org.junit.After; import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -38,32 +49,15 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; -import io.confluent.connect.s3.auth.AwsAssumeRoleCredentialsProvider; -import io.confluent.connect.s3.format.avro.AvroFormat; -import io.confluent.connect.s3.format.json.JsonFormat; -import io.confluent.connect.s3.storage.S3Storage; -import io.confluent.connect.storage.common.StorageCommonConfig; -import io.confluent.connect.storage.partitioner.DailyPartitioner; -import io.confluent.connect.storage.partitioner.DefaultPartitioner; -import io.confluent.connect.storage.partitioner.FieldPartitioner; -import io.confluent.connect.storage.partitioner.HourlyPartitioner; -import io.confluent.connect.storage.partitioner.Partitioner; -import io.confluent.connect.storage.partitioner.PartitionerConfig; -import io.confluent.connect.storage.partitioner.TimeBasedPartitioner; -import io.confluent.connect.avro.AvroDataConfig; - import static io.confluent.connect.s3.S3SinkConnectorConfig.AffixType; -import static io.confluent.connect.s3.S3SinkConnectorConfig.DECIMAL_FORMAT_CONFIG; -import static io.confluent.connect.s3.S3SinkConnectorConfig.DECIMAL_FORMAT_DEFAULT; import static io.confluent.connect.s3.S3SinkConnectorConfig.HEADERS_FORMAT_CLASS_CONFIG; import static io.confluent.connect.s3.S3SinkConnectorConfig.KEYS_FORMAT_CLASS_CONFIG; import static io.confluent.connect.s3.S3SinkConnectorConfig.SCHEMA_PARTITION_AFFIX_TYPE_CONFIG; - import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertFalse; public class S3SinkConnectorConfigTest extends S3SinkConnectorTestBase { @@ -458,6 +452,16 @@ public void testJsonDecimalFormat() { assertEquals(DecimalFormat.NUMERIC.name(), connectorConfig.getJsonDecimalFormat()); } + @Test + public void testJsonReplaceNullWithDefault() { + connectorConfig = new S3SinkConnectorConfig(properties); + assertTrue(connectorConfig.getBoolean("json.replace.null.with.default")); + + properties.put(S3SinkConnectorConfig.REPLACE_NULL_WITH_DEFAULT_CONFIG, "false"); + connectorConfig = new S3SinkConnectorConfig(properties); + assertFalse(connectorConfig.getBoolean("json.replace.null.with.default")); + } + @Test public void testValidCompressionLevels() { IntStream.range(-1, 9).boxed().forEach(i -> { diff --git a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/integration/BaseConnectorIT.java b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/integration/BaseConnectorIT.java index 4351ddff3..c1d7243d0 100644 --- a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/integration/BaseConnectorIT.java +++ b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/integration/BaseConnectorIT.java @@ -15,10 +15,6 @@ package io.confluent.connect.s3.integration; -import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_ACCESS_KEY_ID_CONFIG; -import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_SECRET_ACCESS_KEY_CONFIG; -import static 
io.confluent.kafka.schemaregistry.ClusterTestHarness.KAFKASTORE_TOPIC; - import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.services.s3.AmazonS3; @@ -29,27 +25,12 @@ import com.amazonaws.services.s3.model.S3ObjectSummary; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.NullNode; import com.google.common.collect.ImmutableMap; import io.confluent.common.utils.IntegrationTest; +import io.confluent.connect.s3.util.S3Utils; import io.confluent.kafka.schemaregistry.CompatibilityLevel; import io.confluent.kafka.schemaregistry.RestApp; -import java.io.BufferedReader; -import java.io.File; -import java.io.FileReader; -import java.io.IOException; -import java.net.ServerSocket; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.TimeUnit; - -import io.confluent.connect.s3.util.S3Utils; -import java.util.function.Function; -import java.util.stream.Collectors; import org.apache.avro.file.DataFileReader; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; @@ -79,6 +60,25 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.net.ServerSocket; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_ACCESS_KEY_ID_CONFIG; +import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_SECRET_ACCESS_KEY_CONFIG; +import static io.confluent.kafka.schemaregistry.ClusterTestHarness.KAFKASTORE_TOPIC; import static org.assertj.core.api.Assertions.assertThat; @Category(IntegrationTest.class) @@ -332,15 +332,15 @@ protected Schema getSampleStructSchema() { .field("myFloat32", Schema.FLOAT32_SCHEMA) .field("myFloat64", Schema.FLOAT64_SCHEMA) .field("myString", Schema.STRING_SCHEMA) + .field("withDefault", SchemaBuilder.bool().optional().defaultValue(true).build()) .build(); } protected Struct getSampleStructVal(Schema structSchema) { - Date sampleDate = new Date(1111111); - sampleDate.setTime(0); return new Struct(structSchema) .put("ID", (long) 1) .put("myBool", true) + .put("withDefault", null) .put("myInt32", 32) .put("myFloat32", 3.2f) .put("myFloat64", 64.64) @@ -409,12 +409,15 @@ protected static void clearBucket(String bucketName) { * @param bucketName the name of the s3 test bucket * @param expectedRowsPerFile the number of rows a file should have * @param expectedRow the expected row data in each file + * @param useDefaultValues whether null optional fields are expected to be replaced by their schema default values + * + * @return whether every row of the files read equals the expected row */ protected boolean fileContentsAsExpected( String bucketName, int expectedRowsPerFile, - Struct expectedRow + Struct expectedRow, + boolean useDefaultValues ) { log.info("expectedRow: {}", expectedRow); for (String fileName : @@ -427,7 +430,7 @@ protected boolean fileContentsAsExpected( String fileExtension = getExtensionFromKey(fileName); List<JsonNode> downloadedFileContents = contentGetters.get(fileExtension)
.apply(destinationPath); - if (!fileContentsMatchExpected(downloadedFileContents, expectedRowsPerFile, expectedRow)) { + if (!fileContentsMatchExpected(downloadedFileContents, expectedRowsPerFile, expectedRow, useDefaultValues)) { return false; } downloadedFile.delete(); @@ -481,12 +484,15 @@ protected boolean keyfileContentsAsExpected( * @param fileContents the file contents as a list of JsonNodes * @param expectedRowsPerFile the number of rows expected in the file * @param expectedRow the expected values of each row + * @param useDefaultValues whether null optional fields are expected to be replaced by their schema default values + * + * @return whether the file contents match the expected row */ protected boolean fileContentsMatchExpected( List<JsonNode> fileContents, int expectedRowsPerFile, - Struct expectedRow + Struct expectedRow, + boolean useDefaultValues ) { if (fileContents.size() != expectedRowsPerFile) { log.error("Number of rows in file do not match the expected count, actual: {}, expected: {}", @@ -494,7 +500,7 @@ return false; } for (JsonNode row : fileContents) { - if (!fileRowMatchesExpectedRow(row, expectedRow)) { + if (!fileRowMatchesExpectedRow(row, expectedRow, useDefaultValues)) { return false; } } @@ -512,18 +518,34 @@ private List getS3KeyFileList(List summaries) { /** * Compare the row in the file and its values to the expected row's values. * - * @param fileRow the row read from the file as a JsonNode - * @param expectedRow the expected contents of the row + * @param fileRow the row read from the file as a JsonNode + * @param expectedRow the expected contents of the row + * @param useDefaultValues whether null optional fields are expected to be replaced by their schema default values + * + * @return whether the file row matches the expected row */ - private boolean fileRowMatchesExpectedRow(JsonNode fileRow, Struct expectedRow) { - log.debug("Comparing rows: file: {}, expected: {}", fileRow, expectedRow); + private boolean fileRowMatchesExpectedRow(JsonNode fileRow, Struct expectedRow, boolean useDefaultValues) { + log.info("Comparing rows: file: {}, expected: {}", fileRow, expectedRow); // compare the field values for (Field key : expectedRow.schema().fields()) { - String expectedValue = expectedRow.get(key).toString(); - String rowValue = fileRow.get(key.name()).toString().replaceAll("^\"|\"$", ""); - log.debug("Comparing values: {}, {}", expectedValue, rowValue); - if (!rowValue.equals(expectedValue)) { + String expectedValue = null; + if (useDefaultValues) { + expectedValue = expectedRow.get(key).toString(); + } else { + Object withoutDefault = expectedRow.getWithoutDefault(key.name()); + if (withoutDefault != null) { + expectedValue = withoutDefault.toString(); + } + } + + JsonNode jsonValue = fileRow.get(key.name()); + String rowValue = null; + if (!(jsonValue instanceof NullNode)) { + rowValue = jsonValue.toString().replaceAll("^\"|\"$", ""); + } + + log.info("Comparing values: {}, {}, {}, {}", key.name(), expectedValue, rowValue, Objects.equals(rowValue, expectedValue)); + if (!Objects.equals(rowValue, expectedValue)) { return false; } } diff --git a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/integration/S3SinkConnectorIT.java b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/integration/S3SinkConnectorIT.java index 24b6296b4..22b6062f9 100644 --- a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/integration/S3SinkConnectorIT.java +++ b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/integration/S3SinkConnectorIT.java @@ -17,11 +17,10 @@ import static io.confluent.connect.s3.S3SinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG; import 
static io.confluent.connect.s3.S3SinkConnectorConfig.KEYS_FORMAT_CLASS_CONFIG; +import static io.confluent.connect.s3.S3SinkConnectorConfig.REPLACE_NULL_WITH_DEFAULT_CONFIG; import static io.confluent.connect.s3.S3SinkConnectorConfig.S3_BUCKET_CONFIG; import static io.confluent.connect.s3.S3SinkConnectorConfig.STORE_KAFKA_HEADERS_CONFIG; import static io.confluent.connect.s3.S3SinkConnectorConfig.STORE_KAFKA_KEYS_CONFIG; -import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_ACCESS_KEY_ID_CONFIG; -import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_SECRET_ACCESS_KEY_CONFIG; import static io.confluent.connect.s3.S3SinkConnectorConfig.TOMBSTONE_ENCODED_PARTITION; import static io.confluent.connect.storage.StorageSinkConnectorConfig.FLUSH_SIZE_CONFIG; import static io.confluent.connect.storage.StorageSinkConnectorConfig.FORMAT_CLASS_CONFIG; @@ -32,7 +31,6 @@ import static org.hamcrest.core.StringStartsWith.startsWith; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; - import io.confluent.connect.s3.S3SinkConnector; import io.confluent.connect.s3.S3SinkConnectorConfig.IgnoreOrFailBehavior; import io.confluent.connect.s3.S3SinkConnectorConfig.OutputWriteBehavior; @@ -40,6 +38,7 @@ import io.confluent.connect.s3.format.json.JsonFormat; import io.confluent.connect.s3.format.parquet.ParquetFormat; import io.confluent.connect.s3.storage.S3Storage; +import io.confluent.connect.s3.util.EmbeddedConnectUtils; import java.io.File; import java.util.ArrayList; import java.util.Arrays; @@ -51,8 +50,6 @@ import java.util.TreeSet; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; - -import io.confluent.connect.s3.util.EmbeddedConnectUtils; import org.apache.commons.io.FileUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -60,10 +57,10 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Header; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; -import org.apache.kafka.common.header.Header; import org.apache.kafka.connect.json.JsonConverter; import org.apache.kafka.connect.runtime.SinkConnectorConfig; import org.apache.kafka.connect.sink.SinkRecord; @@ -99,7 +96,7 @@ public class S3SinkConnectorIT extends BaseConnectorIT { @Before public void before() throws InterruptedException { - initializeJsonConverter(); + initializeJsonConverter(true); initializeCustomProducer(); setupProperties(); waitForSchemaRegistryToStart(); @@ -127,21 +124,33 @@ public void after() throws Exception { public void testBasicRecordsWrittenAvro() throws Throwable { //add test specific props props.put(FORMAT_CLASS_CONFIG, AvroFormat.class.getName()); - testBasicRecordsWritten(AVRO_EXTENSION, false); + testBasicRecordsWritten(AVRO_EXTENSION, false, true); } @Test public void testBasicRecordsWrittenParquet() throws Throwable { //add test specific props props.put(FORMAT_CLASS_CONFIG, ParquetFormat.class.getName()); - testBasicRecordsWritten(PARQUET_EXTENSION, false); + testBasicRecordsWritten(PARQUET_EXTENSION, false, true); } @Test public void testBasicRecordsWrittenJson() throws Throwable { //add test specific props props.put(FORMAT_CLASS_CONFIG, JsonFormat.class.getName()); - 
testBasicRecordsWritten(JSON_EXTENSION, false); + testBasicRecordsWritten(JSON_EXTENSION, false, true); + } + + @Test + public void testBasicRecordsWrittenJsonWithoutDefaults() throws Throwable { + initializeJsonConverter(false); + + //add test specific props + props.put(FORMAT_CLASS_CONFIG, JsonFormat.class.getName()); + props.put(REPLACE_NULL_WITH_DEFAULT_CONFIG, "false"); + props.put("value.converter.replace.null.with.default", "false"); + props.put("value.converter", "org.apache.kafka.connect.json.JsonConverter"); + testBasicRecordsWritten(JSON_EXTENSION, false, false); } @Test @@ -155,38 +164,42 @@ public void testTombstoneRecordsWrittenJson() throws Throwable { testTombstoneRecordsWritten(JSON_EXTENSION, false); } - + @Test public void testFilesWrittenToBucketAvroWithExtInTopic() throws Throwable { //add test specific props props.put(FORMAT_CLASS_CONFIG, AvroFormat.class.getName()); - testBasicRecordsWritten(AVRO_EXTENSION, true); + testBasicRecordsWritten(AVRO_EXTENSION, true, true); } @Test public void testFilesWrittenToBucketParquetWithExtInTopic() throws Throwable { //add test specific props props.put(FORMAT_CLASS_CONFIG, ParquetFormat.class.getName()); - testBasicRecordsWritten(PARQUET_EXTENSION, true); + testBasicRecordsWritten(PARQUET_EXTENSION, true, true); } @Test public void testFilesWrittenToBucketJsonWithExtInTopic() throws Throwable { //add test specific props props.put(FORMAT_CLASS_CONFIG, JsonFormat.class.getName()); - testBasicRecordsWritten(JSON_EXTENSION, true); + testBasicRecordsWritten(JSON_EXTENSION, true, true); } /** * Test that the expected records are written for a given file extension * Optionally, test that topics which have "*.{expectedFileExtension}*" in them are processed * and written. + * * @param expectedFileExtension The file extension to test against - * @param addExtensionInTopic Add a topic to to the test which contains the extension + * @param addExtensionInTopic Add a topic to the test which contains the extension + * @param useDefaultValues whether null optional fields are expected to be written with their schema default values + * + * @throws Throwable */ private void testBasicRecordsWritten( String expectedFileExtension, - boolean addExtensionInTopic + boolean addExtensionInTopic, + boolean useDefaultValues ) throws Throwable { final String topicNameWithExt = "other." + expectedFileExtension + ".topic." 
+ expectedFileExtension; @@ -241,7 +254,7 @@ private void testBasicRecordsWritten( assertFileNamesValid(TEST_BUCKET_NAME, new ArrayList<>(expectedTopicFilenames)); // Now check that all files created by the sink have the contents that were sent // to the producer (they're all the same content) - assertTrue(fileContentsAsExpected(TEST_BUCKET_NAME, FLUSH_SIZE_STANDARD, recordValueStruct)); + assertTrue(fileContentsAsExpected(TEST_BUCKET_NAME, FLUSH_SIZE_STANDARD, recordValueStruct, useDefaultValues)); } private void testTombstoneRecordsWritten( @@ -350,7 +363,7 @@ public void testFaultyRecordsReportedToDLQ() throws Throwable { assertEquals(expectedDLQRecordCount, dlqRecords.count()); assertDLQRecordMessages(expectedErrors, dlqRecords); - assertTrue(fileContentsAsExpected(TEST_BUCKET_NAME, FLUSH_SIZE_STANDARD, recordValueStruct)); + assertTrue(fileContentsAsExpected(TEST_BUCKET_NAME, FLUSH_SIZE_STANDARD, recordValueStruct, true)); } /** @@ -424,10 +437,14 @@ private void produceRecords( } } - private void initializeJsonConverter() { + private void initializeJsonConverter(boolean replaceNullWithDefaults) { Map jsonConverterProps = new HashMap<>(); jsonConverterProps.put("schemas.enable", "true"); jsonConverterProps.put("converter.type", "value"); + if (!replaceNullWithDefaults) { + jsonConverterProps.put("replace.null.with.default", "false"); + } + jsonConverter = new JsonConverter(); jsonConverter.configure(jsonConverterProps); } diff --git a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/integration/S3SinkDataFormatIT.java b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/integration/S3SinkDataFormatIT.java index e5c64882b..343768a00 100644 --- a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/integration/S3SinkDataFormatIT.java +++ b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/integration/S3SinkDataFormatIT.java @@ -15,33 +15,6 @@ package io.confluent.connect.s3.integration; -import static io.confluent.connect.s3.S3SinkConnectorConfig.S3_BUCKET_CONFIG; -import static io.confluent.connect.storage.StorageSinkConnectorConfig.FLUSH_SIZE_CONFIG; -import static io.confluent.connect.storage.StorageSinkConnectorConfig.FORMAT_CLASS_CONFIG; -import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; -import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG; -import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; -import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import io.confluent.connect.avro.AvroConverter; -import io.confluent.connect.json.JsonSchemaConverter; -import io.confluent.connect.protobuf.ProtobufConverter; -import io.confluent.connect.s3.S3SinkConnector; -import io.confluent.connect.s3.format.avro.AvroFormat; -import io.confluent.connect.s3.storage.S3Storage; -import io.confluent.connect.s3.util.EmbeddedConnectUtils; -import java.io.File; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; -import java.util.concurrent.TimeUnit; import org.apache.commons.io.FileUtils; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; @@ -65,6 +38,35 @@ import org.slf4j.LoggerFactory; import 
org.testcontainers.shaded.com.google.common.collect.ImmutableMap; +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; + +import io.confluent.connect.avro.AvroConverter; +import io.confluent.connect.json.JsonSchemaConverter; +import io.confluent.connect.protobuf.ProtobufConverter; +import io.confluent.connect.s3.S3SinkConnector; +import io.confluent.connect.s3.format.avro.AvroFormat; +import io.confluent.connect.s3.storage.S3Storage; +import io.confluent.connect.s3.util.EmbeddedConnectUtils; + +import static io.confluent.connect.s3.S3SinkConnectorConfig.S3_BUCKET_CONFIG; +import static io.confluent.connect.storage.StorageSinkConnectorConfig.FLUSH_SIZE_CONFIG; +import static io.confluent.connect.storage.StorageSinkConnectorConfig.FORMAT_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + @RunWith(Parameterized.class) @Category(IntegrationTest.class) public class S3SinkDataFormatIT extends BaseConnectorIT { @@ -196,7 +198,7 @@ public void testBasicRecordsWrittenAvro() throws Throwable { assertFileNamesValid(TEST_BUCKET_NAME, new ArrayList<>(expectedTopicFilenames)); // Now check that all files created by the sink have the contents that were sent // to the producer (they're all the same content) - assertTrue(fileContentsAsExpected(TEST_BUCKET_NAME, FLUSH_SIZE_STANDARD, recordValueStruct)); + assertTrue(fileContentsAsExpected(TEST_BUCKET_NAME, FLUSH_SIZE_STANDARD, recordValueStruct, true)); } protected void produceRecords(
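
The sketch below is not part of the patch; it is a minimal, self-contained illustration of the JsonConverter behavior that the new json.replace.null.with.default option forwards, assuming a Connect runtime whose JsonConverter already supports replace.null.with.default. The class name, topic name, and field name are made up for the example, mirroring the "withDefault" field used by the integration test.

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;

public class ReplaceNullWithDefaultSketch {
  public static void main(String[] args) {
    // Same shape as the integration test: an optional boolean field with a default of true.
    Schema schema = SchemaBuilder.struct()
        .field("withDefault", SchemaBuilder.bool().optional().defaultValue(true).build())
        .build();
    // The field is explicitly written as null.
    Struct value = new Struct(schema).put("withDefault", null);

    for (boolean replaceNullWithDefault : new boolean[] {true, false}) {
      Map<String, String> config = new HashMap<>();
      config.put("schemas.enable", "false");
      config.put("replace.null.with.default", String.valueOf(replaceNullWithDefault));

      JsonConverter converter = new JsonConverter();
      converter.configure(config, false);

      // true  -> {"withDefault":true}  (null is replaced by the schema default)
      // false -> {"withDefault":null}  (null is written as-is)
      byte[] serialized = converter.fromConnectData("example-topic", schema, value);
      System.out.println(replaceNullWithDefault + " -> "
          + new String(serialized, StandardCharsets.UTF_8));
    }
  }
}

In a connector configuration the same switch is json.replace.null.with.default=false; the new integration test additionally sets value.converter.replace.null.with.default=false so that the worker's JsonConverter emits nulls consistently with the JsonFormat writer.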