diff --git a/src/main/java/io/confluent/connect/hdfs/FileUtils.java b/src/main/java/io/confluent/connect/hdfs/FileUtils.java index 184236562..f72eaf11f 100644 --- a/src/main/java/io/confluent/connect/hdfs/FileUtils.java +++ b/src/main/java/io/confluent/connect/hdfs/FileUtils.java @@ -31,6 +31,7 @@ import io.confluent.connect.hdfs.filter.CommittedFileFilter; import io.confluent.connect.hdfs.storage.Storage; +import io.confluent.connect.storage.common.StorageCommonConfig; public class FileUtils { private static final Logger log = LoggerFactory.getLogger(FileUtils.class); @@ -101,7 +102,12 @@ public static String committedFileName( } public static String topicDirectory(String url, String topicsDir, String topic) { - return url + "/" + topicsDir + "/" + topic; + boolean topicInPath = Boolean.valueOf(StorageCommonConfig.PATH_INCLUDE_TOPICNAME_CONFIG); + if (topicInPath) { + return url + "/" + topicsDir + "/" + topic; + } else { + return url + "/" + topicsDir; + } } public static FileStatus fileStatusWithMaxOffset( diff --git a/src/test/java/io/confluent/connect/hdfs/partitioner/TimeBasedPartitionerTest.java b/src/test/java/io/confluent/connect/hdfs/partitioner/TimeBasedPartitionerTest.java index f26248fed..688876bb1 100644 --- a/src/test/java/io/confluent/connect/hdfs/partitioner/TimeBasedPartitionerTest.java +++ b/src/test/java/io/confluent/connect/hdfs/partitioner/TimeBasedPartitionerTest.java @@ -66,6 +66,7 @@ private static class BiHourlyPartitioner extends TimeBasedPartitioner { @Override public void configure(Map config) { + super.configure(config); init(partitionDurationMs, pathFormat, Locale.FRENCH, DATE_TIME_ZONE, config); }