Allow null values to be written as null for JsonFormat
Description:

Using the JsonFormat to write data from Debezium to Kafka and then using
the S3 sink connector to read from Kafka and save to S3 causes null
values to always be stored as their default values.

This change therefore adds a new config property (kept backwards compatible
by defaulting to the old behavior) that allows the value converter inside
the S3 sink connector to be configured correctly.

Tests for the configuration and integration have been added as well.

This addresses confluentinc#716, but for JSON instead of Avro.
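As a rough illustration of what the new property controls, here is a minimal sketch (not part of this commit; the class, topic, and field names are made up) that configures a plain Kafka Connect JsonConverter directly. The underlying converter setting, replace.null.with.default, is what the S3 sink's new json.replace.null.with.default property is forwarded to inside JsonFormat.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;

public class ReplaceNullWithDefaultDemo {
  public static void main(String[] args) {
    // Optional boolean field with a schema default of true (illustrative schema).
    Schema schema = SchemaBuilder.struct()
        .field("withDefault", SchemaBuilder.bool().optional().defaultValue(true).build())
        .build();
    Struct value = new Struct(schema).put("withDefault", null);

    Map<String, Object> config = new HashMap<>();
    config.put("schemas.enable", "false");
    // With the converter default of true the output is {"withDefault":true};
    // with false the null is preserved and the output is {"withDefault":null}.
    config.put("replace.null.with.default", "false");

    JsonConverter converter = new JsonConverter();
    converter.configure(config, false);
    System.out.println(new String(converter.fromConnectData("example-topic", schema, value)));
  }
}

This assumes a Kafka Connect runtime whose JsonConverter already supports replace.null.with.default; the commit below only exposes that switch through the S3 sink's own configuration.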
bjoernhaeuser committed May 16, 2024
1 parent b1efca8 commit 26c7356
Showing 6 changed files with 171 additions and 100 deletions.
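For orientation before the diff, here is a hypothetical sketch of sink properties that would exercise the new setting, assembled the way this repository's tests build connector properties. Everything except json.replace.null.with.default is an existing Connect or S3 sink setting, and all values are illustrative.

import java.util.HashMap;
import java.util.Map;

final class NullAsNullSinkProps {
  static Map<String, String> sinkProps() {
    Map<String, String> props = new HashMap<>();
    props.put("connector.class", "io.confluent.connect.s3.S3SinkConnector");
    props.put("topics", "example-topic");
    props.put("storage.class", "io.confluent.connect.s3.storage.S3Storage");
    props.put("format.class", "io.confluent.connect.s3.format.json.JsonFormat");
    props.put("s3.bucket.name", "example-bucket");
    props.put("flush.size", "3");
    // New in this commit: write null optional fields as null instead of their schema defaults.
    props.put("json.replace.null.with.default", "false");
    return props;
  }
}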
@@ -24,7 +24,6 @@
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.SSEAlgorithm;
import io.confluent.connect.storage.common.util.StringUtils;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
@@ -67,6 +66,7 @@
import io.confluent.connect.storage.common.GenericRecommender;
import io.confluent.connect.storage.common.ParentValueRecommender;
import io.confluent.connect.storage.common.StorageCommonConfig;
import io.confluent.connect.storage.common.util.StringUtils;
import io.confluent.connect.storage.format.Format;
import io.confluent.connect.storage.partitioner.DailyPartitioner;
import io.confluent.connect.storage.partitioner.DefaultPartitioner;
@@ -184,6 +184,13 @@ public class S3SinkConnectorConfig extends StorageSinkConnectorConfig {
+ " This value is case insensitive and can be either 'BASE64' (default) or 'NUMERIC'";
private static final String DECIMAL_FORMAT_DISPLAY = "Decimal Format";

public static final String REPLACE_NULL_WITH_DEFAULT_CONFIG = "json.replace.null.with.default";
public static final boolean REPLACE_NULL_WITH_DEFAULT_DEFAULT = true;
private static final String REPLACE_NULL_WITH_DEFAULT_DOC = "Whether to replace fields that"
+ " are null and have a default value with the default value."
+ " When set to true, the default value is used, otherwise null is used.";
private static final String REPLACE_NULL_WITH_DEFAULT_DISPLAY = "Replace null with default";

public static final String STORE_KAFKA_KEYS_CONFIG = "store.kafka.keys";
public static final String STORE_KAFKA_HEADERS_CONFIG = "store.kafka.headers";
public static final String KEYS_FORMAT_CLASS_CONFIG = "keys.format.class";
@@ -300,11 +307,11 @@ public static ConfigDef newConfigDef() {
configDef.define(
S3_BUCKET_CONFIG,
Type.STRING,
Importance.HIGH,
ConfigDef.Importance.HIGH,
"The S3 Bucket.",
group,
++orderInGroup,
Width.LONG,
ConfigDef.Width.LONG,
"S3 Bucket"
);

@@ -537,6 +544,18 @@ public static ConfigDef newConfigDef() {
DECIMAL_FORMAT_DISPLAY
);

configDef.define(
REPLACE_NULL_WITH_DEFAULT_CONFIG,
Type.BOOLEAN,
REPLACE_NULL_WITH_DEFAULT_DEFAULT,
Importance.LOW,
REPLACE_NULL_WITH_DEFAULT_DOC,
group,
++orderInGroup,
Width.SHORT,
REPLACE_NULL_WITH_DEFAULT_DISPLAY
);

configDef.define(
S3_PART_RETRIES_CONFIG,
Type.INT,
@@ -16,11 +16,11 @@
package io.confluent.connect.s3.format.json;

import org.apache.kafka.connect.json.JsonConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.confluent.connect.s3.S3SinkConnectorConfig;
import io.confluent.connect.s3.storage.S3Storage;
@@ -46,6 +46,11 @@ public JsonFormat(S3Storage storage) {
"decimal.format",
String.valueOf(storage.conf().getJsonDecimalFormat())
);
converterConfig.put(
"replace.null.with.default",
storage.conf().getBoolean(S3SinkConnectorConfig.REPLACE_NULL_WITH_DEFAULT_CONFIG)
);

this.converter.configure(converterConfig, false);
}

@@ -17,16 +17,12 @@

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;

import io.confluent.connect.s3.format.bytearray.ByteArrayFormat;
import io.confluent.connect.s3.format.parquet.ParquetFormat;

import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.json.DecimalFormat;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.After;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

@@ -38,9 +34,12 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import io.confluent.connect.avro.AvroDataConfig;
import io.confluent.connect.s3.auth.AwsAssumeRoleCredentialsProvider;
import io.confluent.connect.s3.format.avro.AvroFormat;
import io.confluent.connect.s3.format.bytearray.ByteArrayFormat;
import io.confluent.connect.s3.format.json.JsonFormat;
import io.confluent.connect.s3.format.parquet.ParquetFormat;
import io.confluent.connect.s3.storage.S3Storage;
import io.confluent.connect.storage.common.StorageCommonConfig;
import io.confluent.connect.storage.partitioner.DailyPartitioner;
@@ -50,20 +49,16 @@
import io.confluent.connect.storage.partitioner.Partitioner;
import io.confluent.connect.storage.partitioner.PartitionerConfig;
import io.confluent.connect.storage.partitioner.TimeBasedPartitioner;
import io.confluent.connect.avro.AvroDataConfig;

import static io.confluent.connect.s3.S3SinkConnectorConfig.AffixType;
import static io.confluent.connect.s3.S3SinkConnectorConfig.DECIMAL_FORMAT_CONFIG;
import static io.confluent.connect.s3.S3SinkConnectorConfig.DECIMAL_FORMAT_DEFAULT;
import static io.confluent.connect.s3.S3SinkConnectorConfig.HEADERS_FORMAT_CLASS_CONFIG;
import static io.confluent.connect.s3.S3SinkConnectorConfig.KEYS_FORMAT_CLASS_CONFIG;
import static io.confluent.connect.s3.S3SinkConnectorConfig.SCHEMA_PARTITION_AFFIX_TYPE_CONFIG;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;

public class S3SinkConnectorConfigTest extends S3SinkConnectorTestBase {

@@ -458,6 +453,16 @@ public void testJsonDecimalFormat() {
assertEquals(DecimalFormat.NUMERIC.name(), connectorConfig.getJsonDecimalFormat());
}

@Test
public void testJsonReplaceNullWithDefault() {
connectorConfig = new S3SinkConnectorConfig(properties);
assertTrue(connectorConfig.getBoolean("json.replace.null.with.default"));

properties.put(S3SinkConnectorConfig.REPLACE_NULL_WITH_DEFAULT_CONFIG, "false");
connectorConfig = new S3SinkConnectorConfig(properties);
assertFalse(connectorConfig.getBoolean("json.replace.null.with.default"));
}

@Test
public void testValidCompressionLevels() {
IntStream.range(-1, 9).boxed().forEach(i -> {
@@ -15,10 +15,6 @@

package io.confluent.connect.s3.integration;

import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_ACCESS_KEY_ID_CONFIG;
import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_SECRET_ACCESS_KEY_CONFIG;
import static io.confluent.kafka.schemaregistry.ClusterTestHarness.KAFKASTORE_TOPIC;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3;
@@ -29,27 +25,8 @@
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.NullNode;
import com.google.common.collect.ImmutableMap;
import io.confluent.common.utils.IntegrationTest;
import io.confluent.kafka.schemaregistry.CompatibilityLevel;
import io.confluent.kafka.schemaregistry.RestApp;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import io.confluent.connect.s3.util.S3Utils;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
@@ -79,6 +56,30 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

import io.confluent.common.utils.IntegrationTest;
import io.confluent.connect.s3.util.S3Utils;
import io.confluent.kafka.schemaregistry.CompatibilityLevel;
import io.confluent.kafka.schemaregistry.RestApp;

import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_ACCESS_KEY_ID_CONFIG;
import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_SECRET_ACCESS_KEY_CONFIG;
import static io.confluent.kafka.schemaregistry.ClusterTestHarness.KAFKASTORE_TOPIC;
import static org.assertj.core.api.Assertions.assertThat;

@Category(IntegrationTest.class)
@@ -332,15 +333,15 @@ protected Schema getSampleStructSchema() {
.field("myFloat32", Schema.FLOAT32_SCHEMA)
.field("myFloat64", Schema.FLOAT64_SCHEMA)
.field("myString", Schema.STRING_SCHEMA)
.field("withDefault", SchemaBuilder.bool().optional().defaultValue(true).build())
.build();
}

protected Struct getSampleStructVal(Schema structSchema) {
Date sampleDate = new Date(1111111);
sampleDate.setTime(0);
return new Struct(structSchema)
.put("ID", (long) 1)
.put("myBool", true)
.put("withDefault", null)
.put("myInt32", 32)
.put("myFloat32", 3.2f)
.put("myFloat64", 64.64)
@@ -409,12 +410,15 @@ protected static void clearBucket(String bucketName) {
* @param bucketName the name of the s3 test bucket
* @param expectedRowsPerFile the number of rows a file should have
* @param expectedRow the expected row data in each file
* @param useDefaultValues whether null fields are expected to have been written as their schema default values
*
* @return whether every row of the files read equals the expected row
*/
protected boolean fileContentsAsExpected(
String bucketName,
int expectedRowsPerFile,
Struct expectedRow
Struct expectedRow,
boolean useDefaultValues
) {
log.info("expectedRow: {}", expectedRow);
for (String fileName :
@@ -427,7 +431,7 @@
String fileExtension = getExtensionFromKey(fileName);
List<JsonNode> downloadedFileContents = contentGetters.get(fileExtension)
.apply(destinationPath);
if (!fileContentsMatchExpected(downloadedFileContents, expectedRowsPerFile, expectedRow)) {
if (!fileContentsMatchExpected(downloadedFileContents, expectedRowsPerFile, expectedRow, useDefaultValues)) {
return false;
}
downloadedFile.delete();
@@ -481,20 +485,23 @@ protected boolean keyfileContentsAsExpected(
* @param fileContents the file contents as a list of JsonNodes
* @param expectedRowsPerFile the number of rows expected in the file
* @param expectedRow the expected values of each row
* @param useDefaultValues use default values from struct
*
* @return whether the file contents match the expected row
*/
protected boolean fileContentsMatchExpected(
List<JsonNode> fileContents,
int expectedRowsPerFile,
Struct expectedRow
Struct expectedRow,
boolean useDefaultValues
) {
if (fileContents.size() != expectedRowsPerFile) {
log.error("Number of rows in file do not match the expected count, actual: {}, expected: {}",
fileContents.size(), expectedRowsPerFile);
return false;
}
for (JsonNode row : fileContents) {
if (!fileRowMatchesExpectedRow(row, expectedRow)) {
if (!fileRowMatchesExpectedRow(row, expectedRow, useDefaultValues)) {
return false;
}
}
Expand All @@ -512,18 +519,34 @@ private List<String> getS3KeyFileList(List<S3ObjectSummary> summaries) {
/**
* Compare the row in the file and its values to the expected row's values.
*
* @param fileRow the row read from the file as a JsonNode
* @param expectedRow the expected contents of the row
* @param fileRow the row read from the file as a JsonNode
* @param expectedRow the expected contents of the row
* @param useDefaultValues whether null fields are expected to have been written as their schema default values
*
* @return whether the file row matches the expected row
*/
private boolean fileRowMatchesExpectedRow(JsonNode fileRow, Struct expectedRow) {
log.debug("Comparing rows: file: {}, expected: {}", fileRow, expectedRow);
private boolean fileRowMatchesExpectedRow(JsonNode fileRow, Struct expectedRow, boolean useDefaultValues) {
log.info("Comparing rows: file: {}, expected: {}", fileRow, expectedRow);
// compare the field values
for (Field key : expectedRow.schema().fields()) {
String expectedValue = expectedRow.get(key).toString();
String rowValue = fileRow.get(key.name()).toString().replaceAll("^\"|\"$", "");
log.debug("Comparing values: {}, {}", expectedValue, rowValue);
if (!rowValue.equals(expectedValue)) {
String expectedValue = null;
if (useDefaultValues) {
expectedValue = expectedRow.get(key).toString();
} else {
Object withoutDefault = expectedRow.getWithoutDefault(key.name());
if (withoutDefault != null) {
expectedValue = withoutDefault.toString();
}
}

JsonNode jsonValue = fileRow.get(key.name());
String rowValue = null;
if (!(jsonValue instanceof NullNode)) {
rowValue = jsonValue.toString().replaceAll("^\"|\"$", "");
}

log.info("Comparing values: {}, {}, {}, {}", key.name(), expectedValue, rowValue, Objects.equals(rowValue, expectedValue));
if (!Objects.equals(rowValue, expectedValue)) {
return false;
}
}
