diff --git a/common/src/main/java/io/confluent/connect/storage/common/StorageCommonConfig.java b/common/src/main/java/io/confluent/connect/storage/common/StorageCommonConfig.java index 131fb544e..a45e48943 100644 --- a/common/src/main/java/io/confluent/connect/storage/common/StorageCommonConfig.java +++ b/common/src/main/java/io/confluent/connect/storage/common/StorageCommonConfig.java @@ -46,6 +46,15 @@ public class StorageCommonConfig extends AbstractConfig implements ComposableCon public static final String DIRECTORY_DELIM_DEFAULT = "/"; public static final String DIRECTORY_DELIM_DISPLAY = "Directory Delimiter"; + public static final String PATH_INCLUDE_TOPICNAME_CONFIG = "path.include.topicname"; + public static final String PATH_INCLUDE_TOPICNAME_DOC = "Whether to append the topic" + + " name to the topics.dir parameter. If true the full path will be composed" + + " of topics.dir + delim + topic_name + delim + partitioner_path. If false" + + " topics.dir + delim + partitioner_path"; + public static final Boolean PATH_INCLUDE_TOPICNAME_DEFAULT = true; + public static final String PATH_INCLUDE_TOPICNAME_DISPLAY = + "Whether to append the topic name to the topics.dir parameter"; + public static final String FILE_DELIM_CONFIG = "file.delim"; public static final String FILE_DELIM_DOC = "File delimiter pattern"; public static final String FILE_DELIM_DEFAULT = "+"; @@ -125,6 +134,16 @@ public static ConfigDef newConfigDef(ConfigDef.Recommender storageClassRecommend Width.LONG, FILE_DELIM_DISPLAY ); + + configDef.define(PATH_INCLUDE_TOPICNAME_CONFIG, + Type.BOOLEAN, + PATH_INCLUDE_TOPICNAME_DEFAULT, + Importance.MEDIUM, + PATH_INCLUDE_TOPICNAME_DOC, + group, + ++orderInGroup, + Width.SHORT, + PATH_INCLUDE_TOPICNAME_DISPLAY); } return configDef; } diff --git a/partitioner/src/main/java/io/confluent/connect/storage/partitioner/DefaultPartitioner.java b/partitioner/src/main/java/io/confluent/connect/storage/partitioner/DefaultPartitioner.java index 3c8f23528..e8f4ed6e1 100644 --- a/partitioner/src/main/java/io/confluent/connect/storage/partitioner/DefaultPartitioner.java +++ b/partitioner/src/main/java/io/confluent/connect/storage/partitioner/DefaultPartitioner.java @@ -37,11 +37,13 @@ public class DefaultPartitioner implements Partitioner { protected Map config; protected List partitionFields = null; protected String delim; + protected boolean includeTopicInPath; @Override public void configure(Map config) { this.config = config; delim = (String) config.get(StorageCommonConfig.DIRECTORY_DELIM_CONFIG); + includeTopicInPath = (boolean) config.get(StorageCommonConfig.PATH_INCLUDE_TOPICNAME_CONFIG); } @Override @@ -51,7 +53,11 @@ public String encodePartition(SinkRecord sinkRecord) { @Override public String generatePartitionedPath(String topic, String encodedPartition) { - return topic + delim + encodedPartition; + if (includeTopicInPath) { + return topic + delim + encodedPartition; + } else { + return encodedPartition; + } } @Override diff --git a/partitioner/src/main/java/io/confluent/connect/storage/partitioner/PartitionerConfig.java b/partitioner/src/main/java/io/confluent/connect/storage/partitioner/PartitionerConfig.java index ba592b27a..fb39c5600 100644 --- a/partitioner/src/main/java/io/confluent/connect/storage/partitioner/PartitionerConfig.java +++ b/partitioner/src/main/java/io/confluent/connect/storage/partitioner/PartitionerConfig.java @@ -101,6 +101,20 @@ public class PartitionerConfig extends AbstractConfig implements ComposableConfi public static final String TIMESTAMP_FIELD_NAME_DEFAULT = "timestamp"; public static final String TIMESTAMP_FIELD_NAME_DISPLAY = "Record Field for Timestamp Extractor"; + public static final String TIMESTAMP_SCALING_FACTOR_CONFIG = "timestamp.scaling.factor"; + public static final String TIMESTAMP_SCALING_FACTOR_DOC = + "The scaling factor to be applied to the timestamp by the timestamp extractor."; + public static final long TIMESTAMP_SCALING_FACTOR_DEFAULT = 1L; + public static final String TIMESTAMP_SCALING_FACTOR_DISPLAY = + "Timestamp scaling factor for Timestamp Extractor"; + + public static final String TIMESTAMP_SCALING_OPERATION_CONFIG = "timestamp.scaling.operation"; + public static final String TIMESTAMP_SCALING_OPERATION_DOC = + "The scaling operation to be applied to the timestamp by the timestamp extractor."; + public static final String TIMESTAMP_SCALING_OPERATION_DEFAULT = "Division"; + public static final String TIMESTAMP_SCALING_OPERATION_DISPLAY = + "Timestamp scaling operation for Timestamp Extractor"; + /** * Create a new configuration definition. * @@ -208,6 +222,26 @@ public static ConfigDef newConfigDef(ConfigDef.Recommender partitionerClassRecom ++orderInGroup, Width.LONG, TIMESTAMP_FIELD_NAME_DISPLAY); + + configDef.define(TIMESTAMP_SCALING_FACTOR_CONFIG, + Type.LONG, + TIMESTAMP_SCALING_FACTOR_DEFAULT, + Importance.MEDIUM, + TIMESTAMP_SCALING_FACTOR_DOC, + group, + ++orderInGroup, + Width.LONG, + TIMESTAMP_SCALING_FACTOR_DISPLAY); + + configDef.define(TIMESTAMP_SCALING_OPERATION_CONFIG, + Type.STRING, + TIMESTAMP_SCALING_OPERATION_DEFAULT, + Importance.MEDIUM, + TIMESTAMP_SCALING_OPERATION_DOC, + group, + ++orderInGroup, + Width.LONG, + TIMESTAMP_SCALING_OPERATION_DISPLAY); } return configDef; @@ -215,11 +249,11 @@ public static ConfigDef newConfigDef(ConfigDef.Recommender partitionerClassRecom public static class BooleanParentRecommender implements ConfigDef.Recommender { protected final String parentConfigName; - + public BooleanParentRecommender(String parentConfigName) { this.parentConfigName = parentConfigName; } - + @Override public List validValues(String name, Map connectorConfigs) { return new LinkedList<>(); diff --git a/partitioner/src/main/java/io/confluent/connect/storage/partitioner/TimeBasedPartitioner.java b/partitioner/src/main/java/io/confluent/connect/storage/partitioner/TimeBasedPartitioner.java index e1bba1cfe..25481c489 100644 --- a/partitioner/src/main/java/io/confluent/connect/storage/partitioner/TimeBasedPartitioner.java +++ b/partitioner/src/main/java/io/confluent/connect/storage/partitioner/TimeBasedPartitioner.java @@ -266,11 +266,31 @@ public Long extract(ConnectRecord record) { public static class RecordFieldTimestampExtractor implements TimestampExtractor { private String fieldName; private DateTimeFormatter dateTime; + private long timestampScalingFactor; + private String timestampScalingOperation; @Override public void configure(Map config) { fieldName = (String) config.get(PartitionerConfig.TIMESTAMP_FIELD_NAME_CONFIG); dateTime = ISODateTimeFormat.dateTimeParser(); + timestampScalingFactor = (long) config.get(PartitionerConfig.TIMESTAMP_SCALING_FACTOR_CONFIG); + timestampScalingOperation = + (String) config.get(PartitionerConfig.TIMESTAMP_SCALING_OPERATION_CONFIG); + } + + public Long scalingTimestamp( + long timestamp, String timestampScalingOperation, long timestampScalingFactor) { + switch (timestampScalingOperation) { + case "Division": + return timestamp / timestampScalingFactor; + case "Multiplication": + return timestamp * timestampScalingFactor; + default: + log.error( + "Timestamp scaling operation '{}' is not recognized (timestamp remains unscaled).", + timestampScalingOperation); + return timestamp; + } } @Override @@ -288,7 +308,8 @@ public Long extract(ConnectRecord record) { switch (fieldSchema.type()) { case INT32: case INT64: - return ((Number) timestampValue).longValue(); + return scalingTimestamp(((Number) timestampValue).longValue(), + timestampScalingOperation, timestampScalingFactor); case STRING: return dateTime.parseMillis((String) timestampValue); default: @@ -304,7 +325,8 @@ public Long extract(ConnectRecord record) { Map map = (Map) value; Object timestampValue = DataUtils.getNestedFieldValue(map, fieldName); if (timestampValue instanceof Number) { - return ((Number) timestampValue).longValue(); + return scalingTimestamp(((Number) timestampValue).longValue(), + timestampScalingOperation, timestampScalingFactor); } else if (timestampValue instanceof String) { return dateTime.parseMillis((String) timestampValue); } else if (timestampValue instanceof Date) { diff --git a/partitioner/src/test/java/io/confluent/connect/storage/partitioner/DailyPartitionerTest.java b/partitioner/src/test/java/io/confluent/connect/storage/partitioner/DailyPartitionerTest.java index 7c3025bb0..0cf2a5204 100644 --- a/partitioner/src/test/java/io/confluent/connect/storage/partitioner/DailyPartitionerTest.java +++ b/partitioner/src/test/java/io/confluent/connect/storage/partitioner/DailyPartitionerTest.java @@ -37,6 +37,7 @@ public class DailyPartitionerTest extends StorageSinkTestBase { public void testDailyPartitioner() { Map config = new HashMap<>(); config.put(StorageCommonConfig.DIRECTORY_DELIM_CONFIG, StorageCommonConfig.DIRECTORY_DELIM_DEFAULT); + config.put(StorageCommonConfig.PATH_INCLUDE_TOPICNAME_CONFIG, StorageCommonConfig.PATH_INCLUDE_TOPICNAME_DEFAULT); config.put(PartitionerConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "Record"); config.put(PartitionerConfig.LOCALE_CONFIG, Locale.US.toString()); config.put(PartitionerConfig.TIMEZONE_CONFIG, DATE_TIME_ZONE.toString()); diff --git a/partitioner/src/test/java/io/confluent/connect/storage/partitioner/DefaultPartitionerTest.java b/partitioner/src/test/java/io/confluent/connect/storage/partitioner/DefaultPartitionerTest.java index 39dac993f..b75a655d9 100644 --- a/partitioner/src/test/java/io/confluent/connect/storage/partitioner/DefaultPartitionerTest.java +++ b/partitioner/src/test/java/io/confluent/connect/storage/partitioner/DefaultPartitionerTest.java @@ -35,6 +35,7 @@ public class DefaultPartitionerTest extends StorageSinkTestBase { public void testDefaultPartitioner() { Map config = new HashMap<>(); config.put(StorageCommonConfig.DIRECTORY_DELIM_CONFIG, StorageCommonConfig.DIRECTORY_DELIM_DEFAULT); + config.put(StorageCommonConfig.PATH_INCLUDE_TOPICNAME_CONFIG, StorageCommonConfig.PATH_INCLUDE_TOPICNAME_DEFAULT); DefaultPartitioner partitioner = new DefaultPartitioner<>(); partitioner.configure(config); diff --git a/partitioner/src/test/java/io/confluent/connect/storage/partitioner/HourlyPartitionerTest.java b/partitioner/src/test/java/io/confluent/connect/storage/partitioner/HourlyPartitionerTest.java index 563032238..91f21d3e5 100644 --- a/partitioner/src/test/java/io/confluent/connect/storage/partitioner/HourlyPartitionerTest.java +++ b/partitioner/src/test/java/io/confluent/connect/storage/partitioner/HourlyPartitionerTest.java @@ -37,6 +37,7 @@ public class HourlyPartitionerTest extends StorageSinkTestBase { public void testHourlyPartitioner() { Map config = new HashMap<>(); config.put(StorageCommonConfig.DIRECTORY_DELIM_CONFIG, StorageCommonConfig.DIRECTORY_DELIM_DEFAULT); + config.put(StorageCommonConfig.PATH_INCLUDE_TOPICNAME_CONFIG, StorageCommonConfig.PATH_INCLUDE_TOPICNAME_DEFAULT); config.put(PartitionerConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "Record"); config.put(PartitionerConfig.LOCALE_CONFIG, Locale.US.toString()); config.put(PartitionerConfig.TIMEZONE_CONFIG, DATE_TIME_ZONE.toString()); @@ -61,4 +62,4 @@ public void testHourlyPartitioner() { assertThat(encodedPartition, is(generateEncodedPartitionFromMap(m))); } -} \ No newline at end of file +} diff --git a/partitioner/src/test/java/io/confluent/connect/storage/partitioner/TimeBasedPartitionerTest.java b/partitioner/src/test/java/io/confluent/connect/storage/partitioner/TimeBasedPartitionerTest.java index 2868aba10..a55a1faae 100644 --- a/partitioner/src/test/java/io/confluent/connect/storage/partitioner/TimeBasedPartitionerTest.java +++ b/partitioner/src/test/java/io/confluent/connect/storage/partitioner/TimeBasedPartitionerTest.java @@ -111,6 +111,21 @@ public void testGeneratePartitionedPath() throws Exception { assertEquals(topic+"/year=2015/month=4/day=2/hour=0/", path); } + @Test + public void testGeneratePartitionedPathWithoutTopicName() throws Exception { + Map config = createConfig(null); + config.put(StorageCommonConfig.PATH_INCLUDE_TOPICNAME_CONFIG, false); + + BiHourlyPartitioner partitioner = (BiHourlyPartitioner) configurePartitioner( + new BiHourlyPartitioner(), null, config); + + SinkRecord sinkRecord = getSinkRecord(); + String encodedPartition = partitioner.encodePartition(sinkRecord); + final String topic = "topic"; + String path = partitioner.generatePartitionedPath(topic, encodedPartition); + assertEquals("year=2015/month=4/day=2/hour=0/", path); + } + @Test public void testInvalidPathFormat() { final String configKey = PartitionerConfig.PATH_FORMAT_CONFIG; @@ -671,8 +686,11 @@ private Map createConfig(String timeFieldName) { config.put(PartitionerConfig.PATH_FORMAT_CONFIG, PATH_FORMAT); config.put(PartitionerConfig.LOCALE_CONFIG, Locale.US.toString()); config.put(PartitionerConfig.TIMEZONE_CONFIG, DATE_TIME_ZONE.toString()); + config.put(StorageCommonConfig.PATH_INCLUDE_TOPICNAME_CONFIG, StorageCommonConfig.PATH_INCLUDE_TOPICNAME_DEFAULT); if (timeFieldName != null) { config.put(PartitionerConfig.TIMESTAMP_FIELD_NAME_CONFIG, timeFieldName); + config.put(PartitionerConfig.TIMESTAMP_SCALING_FACTOR_CONFIG, PartitionerConfig.TIMESTAMP_SCALING_FACTOR_DEFAULT); + config.put(PartitionerConfig.TIMESTAMP_SCALING_OPERATION_CONFIG, PartitionerConfig.TIMESTAMP_SCALING_OPERATION_DEFAULT); } return config; }