From 736ca3039a2af3340e99d32ce6ae862cc1708b0c Mon Sep 17 00:00:00 2001 From: Hariprasad Kuppuswamy Date: Wed, 9 Sep 2020 17:32:24 +0200 Subject: [PATCH 1/2] Separting the Ability to specify scaling factor for timestamp column used in partitioning by configuration into another PR --- .../storage/common/StorageCommonConfig.java | 19 +++++++++++++++++++ .../partitioner/DefaultPartitioner.java | 8 +++++++- .../partitioner/DailyPartitionerTest.java | 1 + .../partitioner/DefaultPartitionerTest.java | 1 + .../partitioner/HourlyPartitionerTest.java | 3 ++- .../partitioner/TimeBasedPartitionerTest.java | 18 ++++++++++++++++++ 6 files changed, 48 insertions(+), 2 deletions(-) diff --git a/common/src/main/java/io/confluent/connect/storage/common/StorageCommonConfig.java b/common/src/main/java/io/confluent/connect/storage/common/StorageCommonConfig.java index 131fb544e..a45e48943 100644 --- a/common/src/main/java/io/confluent/connect/storage/common/StorageCommonConfig.java +++ b/common/src/main/java/io/confluent/connect/storage/common/StorageCommonConfig.java @@ -46,6 +46,15 @@ public class StorageCommonConfig extends AbstractConfig implements ComposableCon public static final String DIRECTORY_DELIM_DEFAULT = "/"; public static final String DIRECTORY_DELIM_DISPLAY = "Directory Delimiter"; + public static final String PATH_INCLUDE_TOPICNAME_CONFIG = "path.include.topicname"; + public static final String PATH_INCLUDE_TOPICNAME_DOC = "Whether to append the topic" + + " name to the topics.dir parameter. If true the full path will be composed" + + " of topics.dir + delim + topic_name + delim + partitioner_path. If false" + + " topics.dir + delim + partitioner_path"; + public static final Boolean PATH_INCLUDE_TOPICNAME_DEFAULT = true; + public static final String PATH_INCLUDE_TOPICNAME_DISPLAY = + "Whether to append the topic name to the topics.dir parameter"; + public static final String FILE_DELIM_CONFIG = "file.delim"; public static final String FILE_DELIM_DOC = "File delimiter pattern"; public static final String FILE_DELIM_DEFAULT = "+"; @@ -125,6 +134,16 @@ public static ConfigDef newConfigDef(ConfigDef.Recommender storageClassRecommend Width.LONG, FILE_DELIM_DISPLAY ); + + configDef.define(PATH_INCLUDE_TOPICNAME_CONFIG, + Type.BOOLEAN, + PATH_INCLUDE_TOPICNAME_DEFAULT, + Importance.MEDIUM, + PATH_INCLUDE_TOPICNAME_DOC, + group, + ++orderInGroup, + Width.SHORT, + PATH_INCLUDE_TOPICNAME_DISPLAY); } return configDef; } diff --git a/partitioner/src/main/java/io/confluent/connect/storage/partitioner/DefaultPartitioner.java b/partitioner/src/main/java/io/confluent/connect/storage/partitioner/DefaultPartitioner.java index 3c8f23528..e8f4ed6e1 100644 --- a/partitioner/src/main/java/io/confluent/connect/storage/partitioner/DefaultPartitioner.java +++ b/partitioner/src/main/java/io/confluent/connect/storage/partitioner/DefaultPartitioner.java @@ -37,11 +37,13 @@ public class DefaultPartitioner implements Partitioner { protected Map config; protected List partitionFields = null; protected String delim; + protected boolean includeTopicInPath; @Override public void configure(Map config) { this.config = config; delim = (String) config.get(StorageCommonConfig.DIRECTORY_DELIM_CONFIG); + includeTopicInPath = (boolean) config.get(StorageCommonConfig.PATH_INCLUDE_TOPICNAME_CONFIG); } @Override @@ -51,7 +53,11 @@ public String encodePartition(SinkRecord sinkRecord) { @Override public String generatePartitionedPath(String topic, String encodedPartition) { - return topic + delim + encodedPartition; + if (includeTopicInPath) { + return topic + delim + encodedPartition; + } else { + return encodedPartition; + } } @Override diff --git a/partitioner/src/test/java/io/confluent/connect/storage/partitioner/DailyPartitionerTest.java b/partitioner/src/test/java/io/confluent/connect/storage/partitioner/DailyPartitionerTest.java index 7c3025bb0..0cf2a5204 100644 --- a/partitioner/src/test/java/io/confluent/connect/storage/partitioner/DailyPartitionerTest.java +++ b/partitioner/src/test/java/io/confluent/connect/storage/partitioner/DailyPartitionerTest.java @@ -37,6 +37,7 @@ public class DailyPartitionerTest extends StorageSinkTestBase { public void testDailyPartitioner() { Map config = new HashMap<>(); config.put(StorageCommonConfig.DIRECTORY_DELIM_CONFIG, StorageCommonConfig.DIRECTORY_DELIM_DEFAULT); + config.put(StorageCommonConfig.PATH_INCLUDE_TOPICNAME_CONFIG, StorageCommonConfig.PATH_INCLUDE_TOPICNAME_DEFAULT); config.put(PartitionerConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "Record"); config.put(PartitionerConfig.LOCALE_CONFIG, Locale.US.toString()); config.put(PartitionerConfig.TIMEZONE_CONFIG, DATE_TIME_ZONE.toString()); diff --git a/partitioner/src/test/java/io/confluent/connect/storage/partitioner/DefaultPartitionerTest.java b/partitioner/src/test/java/io/confluent/connect/storage/partitioner/DefaultPartitionerTest.java index 39dac993f..b75a655d9 100644 --- a/partitioner/src/test/java/io/confluent/connect/storage/partitioner/DefaultPartitionerTest.java +++ b/partitioner/src/test/java/io/confluent/connect/storage/partitioner/DefaultPartitionerTest.java @@ -35,6 +35,7 @@ public class DefaultPartitionerTest extends StorageSinkTestBase { public void testDefaultPartitioner() { Map config = new HashMap<>(); config.put(StorageCommonConfig.DIRECTORY_DELIM_CONFIG, StorageCommonConfig.DIRECTORY_DELIM_DEFAULT); + config.put(StorageCommonConfig.PATH_INCLUDE_TOPICNAME_CONFIG, StorageCommonConfig.PATH_INCLUDE_TOPICNAME_DEFAULT); DefaultPartitioner partitioner = new DefaultPartitioner<>(); partitioner.configure(config); diff --git a/partitioner/src/test/java/io/confluent/connect/storage/partitioner/HourlyPartitionerTest.java b/partitioner/src/test/java/io/confluent/connect/storage/partitioner/HourlyPartitionerTest.java index 563032238..91f21d3e5 100644 --- a/partitioner/src/test/java/io/confluent/connect/storage/partitioner/HourlyPartitionerTest.java +++ b/partitioner/src/test/java/io/confluent/connect/storage/partitioner/HourlyPartitionerTest.java @@ -37,6 +37,7 @@ public class HourlyPartitionerTest extends StorageSinkTestBase { public void testHourlyPartitioner() { Map config = new HashMap<>(); config.put(StorageCommonConfig.DIRECTORY_DELIM_CONFIG, StorageCommonConfig.DIRECTORY_DELIM_DEFAULT); + config.put(StorageCommonConfig.PATH_INCLUDE_TOPICNAME_CONFIG, StorageCommonConfig.PATH_INCLUDE_TOPICNAME_DEFAULT); config.put(PartitionerConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "Record"); config.put(PartitionerConfig.LOCALE_CONFIG, Locale.US.toString()); config.put(PartitionerConfig.TIMEZONE_CONFIG, DATE_TIME_ZONE.toString()); @@ -61,4 +62,4 @@ public void testHourlyPartitioner() { assertThat(encodedPartition, is(generateEncodedPartitionFromMap(m))); } -} \ No newline at end of file +} diff --git a/partitioner/src/test/java/io/confluent/connect/storage/partitioner/TimeBasedPartitionerTest.java b/partitioner/src/test/java/io/confluent/connect/storage/partitioner/TimeBasedPartitionerTest.java index 9fa9625ec..1c95c6d9e 100644 --- a/partitioner/src/test/java/io/confluent/connect/storage/partitioner/TimeBasedPartitionerTest.java +++ b/partitioner/src/test/java/io/confluent/connect/storage/partitioner/TimeBasedPartitionerTest.java @@ -110,6 +110,21 @@ public void testGeneratePartitionedPath() throws Exception { assertEquals(topic+"/year=2015/month=4/day=2/hour=0/", path); } + @Test + public void testGeneratePartitionedPathWithoutTopicName() throws Exception { + Map config = createConfig(null); + config.put(StorageCommonConfig.PATH_INCLUDE_TOPICNAME_CONFIG, false); + + BiHourlyPartitioner partitioner = (BiHourlyPartitioner) configurePartitioner( + new BiHourlyPartitioner(), null, config); + + SinkRecord sinkRecord = getSinkRecord(); + String encodedPartition = partitioner.encodePartition(sinkRecord); + final String topic = "topic"; + String path = partitioner.generatePartitionedPath(topic, encodedPartition); + assertEquals("year=2015/month=4/day=2/hour=0/", path); + } + @Test public void testInvalidPathFormat() { final String configKey = PartitionerConfig.PATH_FORMAT_CONFIG; @@ -685,8 +700,11 @@ private Map createConfig(String timeFieldName) { config.put(PartitionerConfig.PATH_FORMAT_CONFIG, PATH_FORMAT); config.put(PartitionerConfig.LOCALE_CONFIG, Locale.US.toString()); config.put(PartitionerConfig.TIMEZONE_CONFIG, DATE_TIME_ZONE.toString()); + config.put(StorageCommonConfig.PATH_INCLUDE_TOPICNAME_CONFIG, StorageCommonConfig.PATH_INCLUDE_TOPICNAME_DEFAULT); if (timeFieldName != null) { config.put(PartitionerConfig.TIMESTAMP_FIELD_NAME_CONFIG, timeFieldName); + config.put(PartitionerConfig.TIMESTAMP_SCALING_FACTOR_CONFIG, PartitionerConfig.TIMESTAMP_SCALING_FACTOR_DEFAULT); + config.put(PartitionerConfig.TIMESTAMP_SCALING_OPERATION_CONFIG, PartitionerConfig.TIMESTAMP_SCALING_OPERATION_DEFAULT); } return config; } From 64c56d99eaba0c3e9da0305ebf3c04fa7eb675bf Mon Sep 17 00:00:00 2001 From: Hariprasad Kuppuswamy Date: Wed, 9 Sep 2020 17:46:48 +0200 Subject: [PATCH 2/2] Removed the timestamp scaling factor config references in unit tests --- .../connect/storage/partitioner/TimeBasedPartitionerTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/partitioner/src/test/java/io/confluent/connect/storage/partitioner/TimeBasedPartitionerTest.java b/partitioner/src/test/java/io/confluent/connect/storage/partitioner/TimeBasedPartitionerTest.java index 1c95c6d9e..b24be2678 100644 --- a/partitioner/src/test/java/io/confluent/connect/storage/partitioner/TimeBasedPartitionerTest.java +++ b/partitioner/src/test/java/io/confluent/connect/storage/partitioner/TimeBasedPartitionerTest.java @@ -703,8 +703,6 @@ private Map createConfig(String timeFieldName) { config.put(StorageCommonConfig.PATH_INCLUDE_TOPICNAME_CONFIG, StorageCommonConfig.PATH_INCLUDE_TOPICNAME_DEFAULT); if (timeFieldName != null) { config.put(PartitionerConfig.TIMESTAMP_FIELD_NAME_CONFIG, timeFieldName); - config.put(PartitionerConfig.TIMESTAMP_SCALING_FACTOR_CONFIG, PartitionerConfig.TIMESTAMP_SCALING_FACTOR_DEFAULT); - config.put(PartitionerConfig.TIMESTAMP_SCALING_OPERATION_CONFIG, PartitionerConfig.TIMESTAMP_SCALING_OPERATION_DEFAULT); } return config; }