diff --git a/pom.xml b/pom.xml
index dd7c21b..63c21a3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -24,7 +24,7 @@
     <groupId>com.github.jcustenborder.kafka.connect</groupId>
     <artifactId>kafka-connect-parent</artifactId>
-    <version>2.6.1-1</version>
+    <version>2.8.0-1</version>
kafka-connect-spooldir
2.0-SNAPSHOT
@@ -61,6 +61,10 @@
https://github.com/jcustenborder/kafka-connect-spooldir/issues
+    <dependency>
+      <groupId>com.github.jcustenborder.kafka.connect</groupId>
+      <artifactId>connect-utils-parser</artifactId>
+    </dependency>
net.sourceforge.argparse4j
argparse4j
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSchemaGenerator.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSchemaGenerator.java
index cbba312..04b253c 100644
--- a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSchemaGenerator.java
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSchemaGenerator.java
@@ -15,11 +15,11 @@
*/
package com.github.jcustenborder.kafka.connect.spooldir;
-import shaded.com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.github.jcustenborder.kafka.connect.utils.jackson.ObjectMapperFactory;
-import shaded.com.google.common.base.Strings;
-import shaded.com.google.common.collect.ImmutableList;
-import shaded.com.google.common.collect.ImmutableMap;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSourceConnectorConfig.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSourceConnectorConfig.java
index 7446893..39c2d38 100644
--- a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSourceConnectorConfig.java
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSourceConnectorConfig.java
@@ -22,8 +22,8 @@
import com.github.jcustenborder.kafka.connect.utils.config.recommenders.Recommenders;
import com.github.jcustenborder.kafka.connect.utils.config.validators.Validators;
import com.github.jcustenborder.kafka.connect.utils.config.validators.filesystem.ValidDirectoryWritable;
-import shaded.com.google.common.collect.ImmutableList;
-import shaded.com.google.common.io.PatternFilenameFilter;
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.PatternFilenameFilter;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
@@ -40,6 +40,14 @@ public abstract class AbstractSourceConnectorConfig extends AbstractConfig {
public static final String FILE_MINIMUM_AGE_MS_CONF = "file.minimum.age.ms";
public static final String FILE_SORT_ATTRIBUTES_CONF = "files.sort.attributes";
+ public static final String INPUT_PATH_WALK_RECURSIVELY = "input.path.walk.recursively";
+ public static final boolean INPUT_PATH_WALK_RECURSIVELY_DEFAULT = false;
+  static final String INPUT_PATH_WALK_RECURSIVELY_DOC = "If enabled, any sub-directories dropped under `input.path` will be recursively walked looking for files matching the configured `input.file.pattern`. After processing is complete the discovered sub-directory structure (as well as files within them) will be handled according to the configured `cleanup.policy` (i.e. moved or deleted etc). For each discovered file, the walked sub-directory path will be set as a header named `file.relative.path`";
+
+ public static final String CLEANUP_POLICY_MAINTAIN_RELATIVE_PATH = "cleanup.policy.maintain.relative.path";
+ static final boolean CLEANUP_POLICY_MAINTAIN_RELATIVE_PATH_DEFAULT = false;
+  static final String CLEANUP_POLICY_MAINTAIN_RELATIVE_PATH_DOC = "If `" + INPUT_PATH_WALK_RECURSIVELY + "` is enabled in combination with this flag being `true`, the walked sub-directories which contained files will be retained as-is under the `input.path`. The actual files within the sub-directories will be moved (with a copy of the sub-dir structure) or deleted as per the `cleanup.policy` defined, but the parent sub-directory structure will remain.";
+
public static final String PROCESSING_FILE_EXTENSION_CONF = "processing.file.extension";
//RecordProcessorConfig
public static final String BATCH_SIZE_CONF = "batch.size";
@@ -103,6 +111,7 @@ public abstract class AbstractSourceConnectorConfig extends AbstractConfig {
static final String FILE_BUFFER_SIZE_DOC = "The size of buffer for the BufferedInputStream that will be used to " +
"interact with the file system.";
+
public final File inputPath;
public final File finishedPath;
public final File errorPath;
@@ -121,6 +130,8 @@ public abstract class AbstractSourceConnectorConfig extends AbstractConfig {
public final TaskPartitioner taskPartitioner;
public final boolean bufferedInputStream;
public final int fileBufferSizeBytes;
+ public final boolean inputPathWalkRecursively;
+ public final boolean inputPathWalkRecursivelyRetainSubDirs;
public final boolean finishedPathRequired() {
boolean result;
@@ -165,6 +176,8 @@ public AbstractSourceConnectorConfig(ConfigDef definition, Map<String, ?> originals,
this.taskIndex = getInt(TASK_INDEX_CONF);
this.taskCount = getInt(TASK_COUNT_CONF);
this.taskPartitioner = ConfigUtils.getEnum(TaskPartitioner.class, this, TASK_PARTITIONER_CONF);
+ this.inputPathWalkRecursively = this.getBoolean(INPUT_PATH_WALK_RECURSIVELY);
+ this.inputPathWalkRecursivelyRetainSubDirs = this.getBoolean(CLEANUP_POLICY_MAINTAIN_RELATIVE_PATH);
if (bufferedInputStream) {
this.fileBufferSizeBytes = getInt(FILE_BUFFER_SIZE_CONF);
@@ -301,6 +314,20 @@ protected static ConfigDef config(boolean bufferedInputStream) {
.defaultValue(TaskPartitioner.ByName.toString())
.group(GROUP_FILESYSTEM)
.build()
+ ).define(
+ ConfigKeyBuilder.of(INPUT_PATH_WALK_RECURSIVELY, ConfigDef.Type.BOOLEAN)
+ .documentation(INPUT_PATH_WALK_RECURSIVELY_DOC)
+ .importance(ConfigDef.Importance.LOW)
+ .defaultValue(INPUT_PATH_WALK_RECURSIVELY_DEFAULT)
+ .group(GROUP_FILESYSTEM)
+ .build()
+ ).define(
+ ConfigKeyBuilder.of(CLEANUP_POLICY_MAINTAIN_RELATIVE_PATH, ConfigDef.Type.BOOLEAN)
+ .documentation(CLEANUP_POLICY_MAINTAIN_RELATIVE_PATH_DOC)
+ .importance(ConfigDef.Importance.LOW)
+ .defaultValue(CLEANUP_POLICY_MAINTAIN_RELATIVE_PATH_DEFAULT)
+ .group(GROUP_FILESYSTEM)
+ .build()
);
if (bufferedInputStream) {
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSourceTask.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSourceTask.java
index 179c3d6..fbc2308 100644
--- a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSourceTask.java
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSourceTask.java
@@ -22,9 +22,9 @@
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import shaded.com.google.common.base.Preconditions;
-import shaded.com.google.common.base.Stopwatch;
-import shaded.com.google.common.collect.ImmutableMap;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.io.FileNotFoundException;
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSpoolDirSourceConnector.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSpoolDirSourceConnector.java
index 286be17..63b06bd 100644
--- a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSpoolDirSourceConnector.java
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSpoolDirSourceConnector.java
@@ -17,9 +17,9 @@
import com.github.jcustenborder.kafka.connect.utils.VersionUtil;
import com.github.jcustenborder.kafka.connect.utils.jackson.ObjectMapperFactory;
-import shaded.com.google.common.base.Preconditions;
-import shaded.com.google.common.collect.HashMultimap;
-import shaded.com.google.common.collect.Multimap;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSpoolDirSourceConnectorConfig.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSpoolDirSourceConnectorConfig.java
index 9bd7422..7ffa2bb 100644
--- a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSpoolDirSourceConnectorConfig.java
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSpoolDirSourceConnectorConfig.java
@@ -15,13 +15,13 @@
*/
package com.github.jcustenborder.kafka.connect.spooldir;
-import shaded.com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.github.jcustenborder.kafka.connect.utils.config.ConfigKeyBuilder;
import com.github.jcustenborder.kafka.connect.utils.config.recommenders.Recommenders;
import com.github.jcustenborder.kafka.connect.utils.jackson.ObjectMapperFactory;
-import shaded.com.google.common.base.Preconditions;
-import shaded.com.google.common.base.Strings;
-import shaded.com.google.common.collect.ImmutableList;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.connect.data.Field;
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSpoolDirSourceTask.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSpoolDirSourceTask.java
index da6b305..c4f68d4 100644
--- a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSpoolDirSourceTask.java
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSpoolDirSourceTask.java
@@ -20,7 +20,7 @@
import com.github.jcustenborder.kafka.connect.utils.data.type.TimeTypeParser;
import com.github.jcustenborder.kafka.connect.utils.data.type.TimestampTypeParser;
import com.github.jcustenborder.kafka.connect.utils.data.type.TypeParser;
-import shaded.com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractTaskPartitionerPredicate.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractTaskPartitionerPredicate.java
index 6c852b4..e3751e6 100644
--- a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractTaskPartitionerPredicate.java
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractTaskPartitionerPredicate.java
@@ -15,7 +15,7 @@
*/
package com.github.jcustenborder.kafka.connect.spooldir;
-import shaded.com.google.common.hash.Hashing;
+import com.google.common.hash.Hashing;
import org.apache.kafka.common.config.ConfigException;
import java.io.File;
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/FileComparator.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/FileComparator.java
index 5ec31c2..2d6295c 100644
--- a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/FileComparator.java
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/FileComparator.java
@@ -15,7 +15,7 @@
*/
package com.github.jcustenborder.kafka.connect.spooldir;
-import shaded.com.google.common.collect.ComparisonChain;
+import com.google.common.collect.ComparisonChain;
import java.io.File;
import java.util.Comparator;
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/InputFile.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/InputFile.java
index ca7b8a5..7808bf5 100644
--- a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/InputFile.java
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/InputFile.java
@@ -15,8 +15,8 @@
*/
package com.github.jcustenborder.kafka.connect.spooldir;
-import shaded.com.google.common.collect.ImmutableMap;
-import shaded.com.google.common.io.Files;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Files;
import org.apache.commons.compress.compressors.CompressorException;
import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.slf4j.Logger;
@@ -31,6 +31,10 @@
import java.io.InputStreamReader;
import java.io.LineNumberReader;
import java.nio.charset.Charset;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
public class InputFile implements Closeable {
@@ -43,6 +47,7 @@ public class InputFile implements Closeable {
private final long lastModified;
private final Metadata metadata;
private final AbstractSourceConnectorConfig config;
+ private final String inputPathSubDir;
InputStreamReader inputStreamReader;
LineNumberReader lineNumberReader;
InputStream inputStream;
@@ -56,7 +61,8 @@ public class InputFile implements Closeable {
this.length = this.file.length();
String processingFileName = file.getName() + config.processingFileExtension;
this.processingFlag = new File(file.getParentFile(), processingFileName);
- this.metadata = new Metadata(file);
+ this.inputPathSubDir = determineRelativePath(file, config.inputPath);
+ this.metadata = new Metadata(file, this.inputPathSubDir);
}
   static final Map<String, String> SUPPORTED_COMPRESSION_TYPES = ImmutableMap.of(
@@ -67,6 +73,20 @@ public class InputFile implements Closeable {
"z", CompressorStreamFactory.Z
);
+
+ private static String determineRelativePath(File inputPath, File inputFile) {
+    Path relative = inputFile.toPath().relativize(inputPath.toPath()); // NOTE(review): the call site passes (file, config.inputPath), so here 'inputPath' receives the file and 'inputFile' receives the input directory — the parameter names appear swapped, and the result includes the file name itself; confirm whether relativizing file.getParentFile() (sub-dir only) was intended
+ String subDir = relative.toString();
+ if ("".equals(subDir)) {
+ return null;
+ }
+ return subDir;
+ }
+
+ public String inputPathSubDir() {
+ return this.inputPathSubDir;
+ }
+
public File file() {
return this.file;
}
@@ -80,7 +100,6 @@ public Metadata metadata() {
}
-
public InputStream inputStream() {
return this.inputStream;
}
@@ -197,7 +216,51 @@ public long lastModified() {
return this.lastModified;
}
+  private List<File> getInputPathSubDirsToCleanup() {
+    List<File> inputPathSubDirsToCleanup = null;
+    if (this.inputPathSubDir != null && !config.inputPathWalkRecursivelyRetainSubDirs) {
+      inputPathSubDirsToCleanup = new ArrayList<>();
+ File lastSubDir = this.config.inputPath;
+      for (String subDirName : this.inputPathSubDir.split(File.separator)) { // NOTE(review): String.split takes a regex — on Windows File.separator is "\" and this throws PatternSyntaxException; consider Pattern.quote(File.separator)
+ lastSubDir = new File(lastSubDir, subDirName);
+ inputPathSubDirsToCleanup.add(lastSubDir);
+ }
+ Collections.reverse(inputPathSubDirsToCleanup);
+ }
+ return inputPathSubDirsToCleanup;
+ }
+
+ private void cleanupInputDirSubDirs() {
+    List<File> inputPathSubDirsToCleanup = this.getInputPathSubDirsToCleanup();
+ if (inputPathSubDirsToCleanup != null) {
+ for (File subDir : inputPathSubDirsToCleanup) {
+ try {
+ if (subDir.listFiles() == null || subDir.listFiles().length == 0) {
+ if (!subDir.delete()) {
+ log.error("Failed to delete input.path sub-directory: {}", subDir);
+ } else {
+ log.info("Cleaned up input.path sub-directory: {}", subDir);
+ }
+ } else {
+ log.info("Cannot clean up input.path sub-directory as it is not empty: {}", subDir);
+ }
+ } catch (SecurityException e) {
+ log.error("SecurityException thrown while trying to delete input.path sub-directory: {}", subDir, e);
+ }
+ }
+ }
+ }
+
+
public void moveToDirectory(File outputDirectory) {
+
+ if (this.inputPathSubDir != null) {
+ outputDirectory = new File(outputDirectory, this.inputPathSubDir);
+ if (!outputDirectory.isDirectory()) {
+ outputDirectory.mkdirs();
+ }
+ }
+
File outputFile = new File(outputDirectory, this.file.getName());
try {
if (this.file.exists()) {
@@ -207,6 +270,9 @@ public void moveToDirectory(File outputDirectory) {
} catch (IOException e) {
log.error("Exception thrown while trying to move {} to {}", this.file, outputFile, e);
}
+
+ this.cleanupInputDirSubDirs();
+
}
public void delete() {
@@ -214,6 +280,8 @@ public void delete() {
if (!this.file.delete()) {
log.warn("Could not delete {}", this.file);
}
+
+ this.cleanupInputDirSubDirs();
}
public boolean exists() {
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/InputFileDequeue.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/InputFileDequeue.java
index 495e72c..bf4b159 100644
--- a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/InputFileDequeue.java
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/InputFileDequeue.java
@@ -15,17 +15,22 @@
*/
package com.github.jcustenborder.kafka.connect.spooldir;
-import shaded.com.google.common.collect.ForwardingDeque;
+import com.google.common.collect.ForwardingDeque;
+import com.google.common.io.PatternFilenameFilter;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.function.Predicate;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
 public class InputFileDequeue extends ForwardingDeque<InputFile> {
private static final Logger log = LoggerFactory.getLogger(InputFileDequeue.class);
@@ -58,7 +63,26 @@ protected Deque delegate() {
}
log.trace("delegate() - Searching for file(s) in {}", this.config.inputPath);
- File[] input = this.config.inputPath.listFiles(this.config.inputFilenameFilter);
+
+ final File[] input;
+
+    if (this.config.inputPathWalkRecursively) {
+      final PatternFilenameFilter walkerFilenameFilter = this.config.inputFilenameFilter;
+      Predicate<File> filenameFilterPredicate = file -> walkerFilenameFilter.accept(file.getParentFile(), file.getName());
+
+      try (Stream<Path> filesWalk = Files.walk(this.config.inputPath.toPath())) {
+        input = filesWalk.map(Path::toFile)
+            .filter(File::isFile)
+            .filter(filenameFilterPredicate)
+            .toArray(File[]::new);
+      } catch (IOException e) {
+        log.error("Unexpected error walking {}: {}", this.config.inputPath.toPath(), e.getMessage(), e);
+        return new ArrayDeque<>();
+      }
+ } else {
+ input = this.config.inputPath.listFiles(this.config.inputFilenameFilter);
+ }
+
if (null == input || input.length == 0) {
log.info("No files matching {} were found in {}", AbstractSourceConnectorConfig.INPUT_FILE_PATTERN_CONF, this.config.inputPath);
return new ArrayDeque<>();
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/JsonSchemaGenerator.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/JsonSchemaGenerator.java
index 139e93e..20d7dfe 100644
--- a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/JsonSchemaGenerator.java
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/JsonSchemaGenerator.java
@@ -15,9 +15,9 @@
*/
package com.github.jcustenborder.kafka.connect.spooldir;
-import shaded.com.fasterxml.jackson.core.JsonFactory;
-import shaded.com.fasterxml.jackson.core.JsonParser;
-import shaded.com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.JsonNode;
import com.github.jcustenborder.kafka.connect.utils.jackson.ObjectMapperFactory;
import org.apache.kafka.connect.data.Schema;
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/Metadata.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/Metadata.java
index 47de9b8..cc55c51 100644
--- a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/Metadata.java
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/Metadata.java
@@ -15,8 +15,8 @@
*/
package com.github.jcustenborder.kafka.connect.spooldir;
-import shaded.com.google.common.collect.ImmutableMap;
-import shaded.com.google.common.io.Files;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Files;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Headers;
@@ -36,14 +36,17 @@ class Metadata {
static final String HEADER_LAST_MODIFIED = "file.last.modified";
static final String HEADER_LENGTH = "file.length";
static final String HEADER_OFFSET = "file.offset";
+ static final String HEADER_FILE_RELATIVE_PATH = "file.relative.path";
final String path;
final String name;
final String nameWithoutExtension;
final Date lastModified;
final long length;
+ final String relativePath;
String parentDirName = null;
+
   public static final Map<String, String> HEADER_DESCRIPTIONS;
static {
@@ -55,6 +58,7 @@ class Metadata {
result.put(HEADER_LAST_MODIFIED, "The last modified date of the file.");
result.put(HEADER_LENGTH, "The size of the file in bytes.");
result.put(HEADER_OFFSET, "The offset for this piece of data within the file.");
+ result.put(HEADER_FILE_RELATIVE_PATH, "The file's parent sub-directory relative from the input.path.");
HEADER_DESCRIPTIONS = ImmutableMap.copyOf(result);
}
@@ -75,7 +79,7 @@ class Metadata {
- public Metadata(File file) {
+ public Metadata(File file, String relativePath) {
this.path = file.getAbsolutePath();
this.name = file.getName();
this.lastModified = new Date(file.lastModified());
@@ -85,6 +89,8 @@ public Metadata(File file) {
if (file.getParentFile() != null) {
this.parentDirName = file.getParentFile().getName();
}
+
+ this.relativePath = relativePath;
}
/**
@@ -101,6 +107,11 @@ public Headers headers(long offset) {
headers.addLong(HEADER_LENGTH, this.length);
headers.addLong(HEADER_OFFSET, offset);
headers.addTimestamp(HEADER_LAST_MODIFIED, this.lastModified);
+
+ if (this.relativePath != null) {
+ headers.addString(HEADER_FILE_RELATIVE_PATH, this.relativePath);
+ }
+
return headers;
}
}
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirBinaryFileSourceTask.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirBinaryFileSourceTask.java
index e235a96..119a673 100644
--- a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirBinaryFileSourceTask.java
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirBinaryFileSourceTask.java
@@ -15,7 +15,7 @@
*/
package com.github.jcustenborder.kafka.connect.spooldir;
-import shaded.com.google.common.io.ByteStreams;
+import com.google.common.io.ByteStreams;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.source.SourceRecord;
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceConnectorConfig.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceConnectorConfig.java
index dc92a05..1b6ffdd 100644
--- a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceConnectorConfig.java
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceConnectorConfig.java
@@ -18,8 +18,8 @@
import com.github.jcustenborder.kafka.connect.utils.config.ConfigKeyBuilder;
import com.github.jcustenborder.kafka.connect.utils.config.ConfigUtils;
import com.github.jcustenborder.kafka.connect.utils.config.ValidEnum;
-import shaded.com.google.common.base.Joiner;
-import shaded.com.google.common.base.Preconditions;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
import com.opencsv.CSVParser;
import com.opencsv.CSVParserBuilder;
import com.opencsv.CSVReader;
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceTask.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceTask.java
index 0106c32..90e5071 100644
--- a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceTask.java
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceTask.java
@@ -15,7 +15,7 @@
*/
package com.github.jcustenborder.kafka.connect.spooldir;
-import shaded.com.google.common.base.Joiner;
+import com.google.common.base.Joiner;
import com.opencsv.CSVReader;
import com.opencsv.CSVReaderBuilder;
import com.opencsv.ICSVParser;
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirJsonSourceTask.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirJsonSourceTask.java
index bdc7f62..77b40ca 100644
--- a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirJsonSourceTask.java
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirJsonSourceTask.java
@@ -15,9 +15,9 @@
*/
package com.github.jcustenborder.kafka.connect.spooldir;
-import shaded.com.fasterxml.jackson.core.JsonFactory;
-import shaded.com.fasterxml.jackson.core.JsonParser;
-import shaded.com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.JsonNode;
import com.github.jcustenborder.kafka.connect.utils.jackson.ObjectMapperFactory;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.SchemaAndValue;
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirSchemaLessJsonSourceConnector.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirSchemaLessJsonSourceConnector.java
index 3f276b7..f62ca8d 100644
--- a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirSchemaLessJsonSourceConnector.java
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirSchemaLessJsonSourceConnector.java
@@ -24,9 +24,9 @@
import java.util.Map;
@Title("Schema Less Json Source Connector")
-@Description("This connector is used to `stream _` JSON files from a directory " +
- "while converting the data based on the schema supplied in the configuration. This connector will read each file node by node " +
- "writing the result to Kafka. For example if your data file contains several json objects the connector will read from { to } " +
+@Description("This connector is used to `stream _` JSON files from a directory. " +
+ "This connector will read each file node by node writing each node as a record in Kafka." +
+ "For example if your data file contains several json objects the connector will read from { to } " +
"for each object and write each object to Kafka.")
@DocumentationImportant("This connector does not try to convert the json records to a schema. " +
"The recommended converter to use is the StringConverter. " +
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirSchemaLessJsonSourceTask.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirSchemaLessJsonSourceTask.java
index 4f40c5f..d9cf349 100644
--- a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirSchemaLessJsonSourceTask.java
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirSchemaLessJsonSourceTask.java
@@ -15,9 +15,9 @@
*/
package com.github.jcustenborder.kafka.connect.spooldir;
-import shaded.com.fasterxml.jackson.core.JsonParser;
-import shaded.com.fasterxml.jackson.databind.JsonNode;
-import shaded.com.fasterxml.jackson.databind.MappingIterator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.MappingIterator;
import com.github.jcustenborder.kafka.connect.utils.jackson.ObjectMapperFactory;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/elf/SchemaConversionBuilder.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/elf/SchemaConversionBuilder.java
index 2d41be0..3a5b8d7 100644
--- a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/elf/SchemaConversionBuilder.java
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/elf/SchemaConversionBuilder.java
@@ -18,7 +18,7 @@
import com.github.jcustenborder.kafka.connect.spooldir.elf.converters.LogFieldConverter;
import com.github.jcustenborder.kafka.connect.spooldir.elf.converters.LogFieldConverterFactory;
import com.github.jcustenborder.parsers.elf.ElfParser;
-import shaded.com.google.common.base.Preconditions;
+import com.google.common.base.Preconditions;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.slf4j.Logger;
diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractCleanUpPolicyTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractCleanUpPolicyTest.java
index 6574e2b..d183f5e 100644
--- a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractCleanUpPolicyTest.java
+++ b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractCleanUpPolicyTest.java
@@ -1,7 +1,7 @@
package com.github.jcustenborder.kafka.connect.spooldir;
-import shaded.com.google.common.collect.ImmutableMap;
-import shaded.com.google.common.io.Files;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Files;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -24,29 +24,43 @@ public abstract class AbstractCleanUpPolicyTest
File inputPath;
File finishedPath;
File errorPath;
+ String inputPathSubDir;
protected T cleanupPolicy;
protected abstract T create(
InputFile inputFile, File errorPath, File finishedPath
);
+ protected String defineInputPathSubDir() {
+ return null;
+ }
+
+  protected ImmutableMap.Builder<String, String> getConnectorConfigMap() {
+    return new ImmutableMap.Builder<String, String>()
+ .put(SpoolDirBinaryFileSourceConnectorConfig.TOPIC_CONF, "foo")
+ .put(SpoolDirBinaryFileSourceConnectorConfig.INPUT_PATH_CONFIG, this.inputPath.toString())
+ .put(SpoolDirBinaryFileSourceConnectorConfig.INPUT_FILE_PATTERN_CONF, "^.$")
+ .put(SpoolDirBinaryFileSourceConnectorConfig.ERROR_PATH_CONFIG, this.errorPath.toString())
+ .put(SpoolDirBinaryFileSourceConnectorConfig.FINISHED_PATH_CONFIG, this.finishedPath.toString());
+ }
+
@BeforeEach
public void before() throws IOException {
this.errorPath = Files.createTempDir();
this.finishedPath = Files.createTempDir();
this.inputPath = Files.createTempDir();
+ this.inputPathSubDir = defineInputPathSubDir();
- File inputFile = File.createTempFile("input", "file", this.inputPath);
+ File tempFileParentPathDir = this.inputPath;
+ if (this.inputPathSubDir != null) {
+ tempFileParentPathDir = new File(this.inputPath, this.inputPathSubDir);
+ tempFileParentPathDir.mkdirs();
+ }
+
+ File inputFile = File.createTempFile("input", "file", tempFileParentPathDir);
- SpoolDirBinaryFileSourceConnectorConfig config = new SpoolDirBinaryFileSourceConnectorConfig(
- ImmutableMap.of(
- SpoolDirBinaryFileSourceConnectorConfig.TOPIC_CONF, "foo",
- SpoolDirBinaryFileSourceConnectorConfig.INPUT_PATH_CONFIG, this.inputPath.toString(),
- SpoolDirBinaryFileSourceConnectorConfig.INPUT_FILE_PATTERN_CONF, "^.$",
- SpoolDirBinaryFileSourceConnectorConfig.ERROR_PATH_CONFIG, this.errorPath.toString(),
- SpoolDirBinaryFileSourceConnectorConfig.FINISHED_PATH_CONFIG, this.finishedPath.toString()
- )
- );
+ SpoolDirBinaryFileSourceConnectorConfig config =
+ new SpoolDirBinaryFileSourceConnectorConfig(getConnectorConfigMap().build());
this.inputFile = new InputFile(config, inputFile);
this.inputFile.inputStreamReader = mock(InputStreamReader.class);
@@ -54,12 +68,17 @@ public void before() throws IOException {
this.cleanupPolicy = create(this.inputFile, this.errorPath, this.finishedPath);
}
+ protected File getTargetFilePath(File containerPath, InputFile inputFile) {
+ String subDir = (this.defineInputPathSubDir() != null ? this.defineInputPathSubDir() : "");
+ return new File(new File(containerPath, subDir), inputFile.getName());
+ }
+
@Test
public void error() throws IOException {
assertTrue(this.inputFile.exists(), "Input file should exist");
this.cleanupPolicy.error();
assertFalse(this.inputFile.exists(), "input file should not exist");
- File erroredFile = new File(this.errorPath, this.inputFile.getName());
+ File erroredFile = this.getTargetFilePath(this.errorPath, this.inputFile);
assertTrue(erroredFile.exists(), "errored file should exist.");
}
diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSchemaGeneratorTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSchemaGeneratorTest.java
index 0b27b9a..ec27613 100644
--- a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSchemaGeneratorTest.java
+++ b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSchemaGeneratorTest.java
@@ -15,7 +15,7 @@
*/
package com.github.jcustenborder.kafka.connect.spooldir;
-import shaded.com.google.common.io.Files;
+import com.google.common.io.Files;
import org.junit.jupiter.api.BeforeEach;
import java.io.File;
diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSpoolDirSourceConnectorTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSpoolDirSourceConnectorTest.java
index b524874..866c581 100644
--- a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSpoolDirSourceConnectorTest.java
+++ b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSpoolDirSourceConnectorTest.java
@@ -15,7 +15,7 @@
*/
package com.github.jcustenborder.kafka.connect.spooldir;
-import shaded.com.google.common.io.Files;
+import com.google.common.io.Files;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSpoolDirSourceTaskTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSpoolDirSourceTaskTest.java
index 48078b2..d7d5d4a 100644
--- a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSpoolDirSourceTaskTest.java
+++ b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSpoolDirSourceTaskTest.java
@@ -15,11 +15,11 @@
*/
package com.github.jcustenborder.kafka.connect.spooldir;
-import shaded.com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.SerializationFeature;
import com.github.jcustenborder.kafka.connect.utils.jackson.ObjectMapperFactory;
-import shaded.com.google.common.collect.Maps;
-import shaded.com.google.common.io.ByteStreams;
-import shaded.com.google.common.io.Files;
+import com.google.common.collect.Maps;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.OffsetStorageReader;
@@ -34,7 +34,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.util.Arrays;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -85,6 +85,17 @@ protected Map settings() {
return settings;
}
+ protected String defineInputPathSubDir() {
+ return null;
+ }
+
+ protected File getTargetFilePath(File containerPath, String inputFileName) {
+ String subDir = (this.defineInputPathSubDir() != null ? this.defineInputPathSubDir() : "");
+ File targetDir = new File(containerPath, subDir);
+ targetDir.mkdirs();
+ return new File(targetDir, inputFileName);
+ }
+
protected void poll(final String packageName, TestCase testCase) throws InterruptedException, IOException {
String keySchemaConfig = ObjectMapperFactory.INSTANCE.writeValueAsString(testCase.keySchema);
String valueSchemaConfig = ObjectMapperFactory.INSTANCE.writeValueAsString(testCase.valueSchema);
@@ -119,7 +130,7 @@ protected void poll(final String packageName, TestCase testCase) throws Interrup
//Use this config because it's the simplest.
SpoolDirBinaryFileSourceConnectorConfig config = new SpoolDirBinaryFileSourceConnectorConfig(settings);
- final File p = new File(this.inputPath, inputFileName);
+ final File p = this.getTargetFilePath(this.inputPath, inputFileName);
try (InputStream inputStream = this.getClass().getResourceAsStream(dataFile)) {
assertNotNull(
inputStream,
@@ -147,13 +158,13 @@ protected void poll(final String packageName, TestCase testCase) throws Interrup
The following headers will change. Lets ensure they are there but we don't care about their
values since they are driven by things that will change such as lastModified dates and paths.
*/
- List headersToRemove = Arrays.asList(
- Metadata.HEADER_LAST_MODIFIED,
- Metadata.HEADER_PATH,
- Metadata.HEADER_LENGTH,
- Metadata.HEADER_NAME_WITHOUT_EXTENSION,
- Metadata.HEADER_PARENT_DIR_NAME
- );
+ List<String> headersToRemove = new ArrayList<>();
+ headersToRemove.add(Metadata.HEADER_LAST_MODIFIED);
+ headersToRemove.add(Metadata.HEADER_PATH);
+ headersToRemove.add(Metadata.HEADER_LENGTH);
+ headersToRemove.add(Metadata.HEADER_NAME_WITHOUT_EXTENSION);
+ headersToRemove.add(Metadata.HEADER_PARENT_DIR_NAME);
+ headersToRemove.add(Metadata.HEADER_FILE_RELATIVE_PATH);
for (int i = 0; i < testCase.expected.size(); i++) {
SourceRecord expectedRecord = testCase.expected.get(i);
@@ -176,7 +187,7 @@ protected void poll(final String packageName, TestCase testCase) throws Interrup
assertNull(records, "records should be null after first poll.");
assertFalse(inputFile.exists(), String.format("inputFile %s should not exist.", inputFile));
assertFalse(inputFile.processingFlag().exists(), String.format("processingFile %s should not exist.", inputFile.processingFlag()));
- final File finishedFile = new File(this.finishedPath, inputFileName);
+ final File finishedFile = this.getTargetFilePath(this.finishedPath, inputFileName);
assertTrue(finishedFile.exists(), String.format("finishedFile %s should exist.", finishedFile));
}
diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/DeleteCleanupPolicySubDirsNoRetainTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/DeleteCleanupPolicySubDirsNoRetainTest.java
new file mode 100644
index 0000000..d3f3d5b
--- /dev/null
+++ b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/DeleteCleanupPolicySubDirsNoRetainTest.java
@@ -0,0 +1,31 @@
+package com.github.jcustenborder.kafka.connect.spooldir;
+
+import org.junit.jupiter.api.Test;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+public class DeleteCleanupPolicySubDirsNoRetainTest extends DeleteCleanupPolicyTest {
+ @Override
+ protected String defineInputPathSubDir() {
+ return "test/01/02/03";
+ }
+
+ protected ImmutableMap.Builder<String, String> getConnectorConfigMap() {
+ return super.getConnectorConfigMap()
+ .put(SpoolDirBinaryFileSourceConnectorConfig.INPUT_PATH_WALK_RECURSIVELY, "true")
+ .put(SpoolDirBinaryFileSourceConnectorConfig.CLEANUP_POLICY_MAINTAIN_RELATIVE_PATH, "false");
+ }
+
+ @Test
+ public void success() throws IOException {
+ super.success();
+
+ assertFalse(new File(this.inputPath, this.defineInputPathSubDir()).exists(),
+ "The input.path sub-directory " + this.defineInputPathSubDir() + " should not exist");
+ }
+}
diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/DeleteCleanupPolicySubDirsRetainTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/DeleteCleanupPolicySubDirsRetainTest.java
new file mode 100644
index 0000000..c308e58
--- /dev/null
+++ b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/DeleteCleanupPolicySubDirsRetainTest.java
@@ -0,0 +1,31 @@
+package com.github.jcustenborder.kafka.connect.spooldir;
+
+import org.junit.jupiter.api.Test;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class DeleteCleanupPolicySubDirsRetainTest extends DeleteCleanupPolicyTest {
+ @Override
+ protected String defineInputPathSubDir() {
+ return "test/01/02/03";
+ }
+
+ protected ImmutableMap.Builder<String, String> getConnectorConfigMap() {
+ return super.getConnectorConfigMap()
+ .put(SpoolDirBinaryFileSourceConnectorConfig.INPUT_PATH_WALK_RECURSIVELY, "true")
+ .put(SpoolDirBinaryFileSourceConnectorConfig.CLEANUP_POLICY_MAINTAIN_RELATIVE_PATH, "true");
+ }
+
+ @Test
+ public void success() throws IOException {
+ super.success();
+
+ assertTrue(new File(this.inputPath, this.defineInputPathSubDir()).exists(),
+ "The input.path sub-directory " + this.defineInputPathSubDir() + " should exist");
+ }
+}
diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/FileComparatorTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/FileComparatorTest.java
index 207673b..f499594 100644
--- a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/FileComparatorTest.java
+++ b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/FileComparatorTest.java
@@ -1,7 +1,7 @@
package com.github.jcustenborder.kafka.connect.spooldir;
-import shaded.com.google.common.collect.ImmutableList;
-import shaded.com.google.common.io.Files;
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Files;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/MoveByDateCleanupPolicySubDirsNoRetainTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/MoveByDateCleanupPolicySubDirsNoRetainTest.java
new file mode 100644
index 0000000..d85f5f9
--- /dev/null
+++ b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/MoveByDateCleanupPolicySubDirsNoRetainTest.java
@@ -0,0 +1,32 @@
+package com.github.jcustenborder.kafka.connect.spooldir;
+
+import org.junit.jupiter.api.Test;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+public class MoveByDateCleanupPolicySubDirsNoRetainTest extends MoveByDateCleanupPolicyTest {
+ @Override
+ protected String defineInputPathSubDir() {
+ return "test/01/02/03";
+ }
+
+ protected ImmutableMap.Builder<String, String> getConnectorConfigMap() {
+ return super.getConnectorConfigMap()
+ .put(SpoolDirBinaryFileSourceConnectorConfig.INPUT_PATH_WALK_RECURSIVELY, "true")
+ .put(SpoolDirBinaryFileSourceConnectorConfig.CLEANUP_POLICY_MAINTAIN_RELATIVE_PATH, "false");
+ }
+
+ @Test
+ public void success() throws IOException {
+ super.success();
+
+ assertFalse(new File(this.inputPath, this.defineInputPathSubDir()).exists(),
+ "The input.path sub-directory " + this.defineInputPathSubDir() + " should not exist");
+
+ }
+}
diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/MoveByDateCleanupPolicySubDirsRetainTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/MoveByDateCleanupPolicySubDirsRetainTest.java
new file mode 100644
index 0000000..ef38450
--- /dev/null
+++ b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/MoveByDateCleanupPolicySubDirsRetainTest.java
@@ -0,0 +1,31 @@
+package com.github.jcustenborder.kafka.connect.spooldir;
+
+import org.junit.jupiter.api.Test;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.io.File;
+import java.io.IOException;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class MoveByDateCleanupPolicySubDirsRetainTest extends MoveByDateCleanupPolicyTest {
+ @Override
+ protected String defineInputPathSubDir() {
+ return "test/01/02/03";
+ }
+
+ protected ImmutableMap.Builder<String, String> getConnectorConfigMap() {
+ return super.getConnectorConfigMap()
+ .put(SpoolDirBinaryFileSourceConnectorConfig.INPUT_PATH_WALK_RECURSIVELY, "true")
+ .put(SpoolDirBinaryFileSourceConnectorConfig.CLEANUP_POLICY_MAINTAIN_RELATIVE_PATH, "true");
+ }
+
+ @Test
+ public void success() throws IOException {
+ super.success();
+
+ assertTrue(new File(this.inputPath, this.defineInputPathSubDir()).exists(),
+ "The input.path sub-directory " + this.defineInputPathSubDir() + " should exist");
+
+ }
+}
diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/MoveByDateCleanupPolicyTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/MoveByDateCleanupPolicyTest.java
index 819e730..814beff 100644
--- a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/MoveByDateCleanupPolicyTest.java
+++ b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/MoveByDateCleanupPolicyTest.java
@@ -21,7 +21,7 @@ protected AbstractCleanUpPolicy.MoveByDate create(InputFile inputFile, File erro
public void success() throws IOException {
SimpleDateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd");
Path subDirectory = Paths.get(this.finishedPath.getAbsolutePath(), dateFormatter.format(this.inputFile.lastModified()));
- File finishedFile = new File(subDirectory.toFile(), this.inputFile.getName());
+ File finishedFile = this.getTargetFilePath(subDirectory.toFile(), this.inputFile);
assertTrue(this.inputFile.exists(), "Input file should exist");
assertFalse(finishedFile.exists(), "Finished file should not exist");
diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/MoveCleanupPolicySubDirsNoRetainTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/MoveCleanupPolicySubDirsNoRetainTest.java
new file mode 100644
index 0000000..7afa40f
--- /dev/null
+++ b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/MoveCleanupPolicySubDirsNoRetainTest.java
@@ -0,0 +1,33 @@
+package com.github.jcustenborder.kafka.connect.spooldir;
+
+import org.junit.jupiter.api.Test;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+public class MoveCleanupPolicySubDirsNoRetainTest extends MoveCleanupPolicyTest {
+
+ @Override
+ protected String defineInputPathSubDir() {
+ return "test/01/02/03";
+ }
+
+ protected ImmutableMap.Builder<String, String> getConnectorConfigMap() {
+ return super.getConnectorConfigMap()
+ .put(SpoolDirBinaryFileSourceConnectorConfig.INPUT_PATH_WALK_RECURSIVELY, "true")
+ .put(SpoolDirBinaryFileSourceConnectorConfig.CLEANUP_POLICY_MAINTAIN_RELATIVE_PATH, "false");
+ }
+
+ @Test
+ public void success() throws IOException {
+ super.success();
+
+ assertFalse(new File(this.inputPath, this.defineInputPathSubDir()).exists(),
+ "The input.path sub-directory " + this.defineInputPathSubDir() + " should not exist");
+
+ }
+}
diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/MoveCleanupPolicySubDirsRetainTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/MoveCleanupPolicySubDirsRetainTest.java
new file mode 100644
index 0000000..e70a497
--- /dev/null
+++ b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/MoveCleanupPolicySubDirsRetainTest.java
@@ -0,0 +1,33 @@
+package com.github.jcustenborder.kafka.connect.spooldir;
+
+import org.junit.jupiter.api.Test;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class MoveCleanupPolicySubDirsRetainTest extends MoveCleanupPolicyTest {
+
+ @Override
+ protected String defineInputPathSubDir() {
+ return "test/01/02/03";
+ }
+
+ protected ImmutableMap.Builder<String, String> getConnectorConfigMap() {
+ return super.getConnectorConfigMap()
+ .put(SpoolDirBinaryFileSourceConnectorConfig.INPUT_PATH_WALK_RECURSIVELY, "true")
+ .put(SpoolDirBinaryFileSourceConnectorConfig.CLEANUP_POLICY_MAINTAIN_RELATIVE_PATH, "true");
+ }
+
+ @Test
+ public void success() throws IOException {
+ super.success();
+
+ assertTrue(new File(this.inputPath, this.defineInputPathSubDir()).exists(),
+ "The input.path sub-directory " + this.defineInputPathSubDir() + " should exist");
+
+ }
+}
diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/MoveCleanupPolicyTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/MoveCleanupPolicyTest.java
index edf243b..4e65c18 100644
--- a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/MoveCleanupPolicyTest.java
+++ b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/MoveCleanupPolicyTest.java
@@ -16,7 +16,7 @@ protected AbstractCleanUpPolicy.Move create(InputFile inputFile, File errorPath,
@Test
public void success() throws IOException {
- File finishedFile = new File(this.finishedPath, this.inputFile.getName());
+ File finishedFile = this.getTargetFilePath(this.finishedPath, this.inputFile);
assertTrue(this.inputFile.exists(), "Input file should exist");
assertFalse(finishedFile.exists(), "Finished file should not exist");
this.cleanupPolicy.success();
diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/ProcessingFileExistsPredicateTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/ProcessingFileExistsPredicateTest.java
index 8e3eae1..1984f33 100644
--- a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/ProcessingFileExistsPredicateTest.java
+++ b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/ProcessingFileExistsPredicateTest.java
@@ -1,6 +1,6 @@
package com.github.jcustenborder.kafka.connect.spooldir;
-import shaded.com.google.common.io.Files;
+import com.google.common.io.Files;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirAvroSourceTaskTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirAvroSourceTaskTest.java
index 06ace13..26b6537 100644
--- a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirAvroSourceTaskTest.java
+++ b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirAvroSourceTaskTest.java
@@ -1,6 +1,6 @@
package com.github.jcustenborder.kafka.connect.spooldir;
-import shaded.com.google.common.io.Files;
+import com.google.common.io.Files;
import io.confluent.connect.avro.AvroData;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirBinaryFileSourceTaskTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirBinaryFileSourceTaskTest.java
index 415c312..359c91b 100644
--- a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirBinaryFileSourceTaskTest.java
+++ b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirBinaryFileSourceTaskTest.java
@@ -1,6 +1,6 @@
package com.github.jcustenborder.kafka.connect.spooldir;
-import shaded.com.google.common.io.Files;
+import com.google.common.io.Files;
import org.junit.jupiter.api.DynamicTest;
import org.junit.jupiter.api.TestFactory;
import org.slf4j.Logger;
diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceConnectorTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceConnectorTest.java
index 9591f83..e8f63af 100644
--- a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceConnectorTest.java
+++ b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceConnectorTest.java
@@ -15,7 +15,7 @@
*/
package com.github.jcustenborder.kafka.connect.spooldir;
-import shaded.com.google.common.io.ByteStreams;
+import com.google.common.io.ByteStreams;
import org.apache.kafka.connect.errors.DataException;
import org.junit.jupiter.api.Test;
diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceTaskSubDirsNoRetainTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceTaskSubDirsNoRetainTest.java
new file mode 100644
index 0000000..112ebec
--- /dev/null
+++ b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceTaskSubDirsNoRetainTest.java
@@ -0,0 +1,40 @@
+/**
+ * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.github.jcustenborder.kafka.connect.spooldir;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+public class SpoolDirCsvSourceTaskSubDirsNoRetainTest extends SpoolDirCsvSourceTaskTest {
+ private static final Logger log = LoggerFactory.getLogger(SpoolDirCsvSourceTaskSubDirsNoRetainTest.class);
+
+ @Override
+ protected Map<String, String> settings() {
+ Map<String, String> settings = super.settings();
+
+ settings.put(AbstractSourceConnectorConfig.INPUT_PATH_WALK_RECURSIVELY, "true");
+ settings.put(AbstractSourceConnectorConfig.CLEANUP_POLICY_MAINTAIN_RELATIVE_PATH, "false");
+
+ return settings;
+ }
+
+ @Override
+ protected String defineInputPathSubDir() {
+ return "test/01/02/03";
+ }
+
+}
diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceTaskSubDirsRetainTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceTaskSubDirsRetainTest.java
new file mode 100644
index 0000000..26e68d3
--- /dev/null
+++ b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceTaskSubDirsRetainTest.java
@@ -0,0 +1,40 @@
+/**
+ * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.github.jcustenborder.kafka.connect.spooldir;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+public class SpoolDirCsvSourceTaskSubDirsRetainTest extends SpoolDirCsvSourceTaskTest {
+ private static final Logger log = LoggerFactory.getLogger(SpoolDirCsvSourceTaskSubDirsRetainTest.class);
+
+ @Override
+ protected Map<String, String> settings() {
+ Map<String, String> settings = super.settings();
+
+ settings.put(AbstractSourceConnectorConfig.INPUT_PATH_WALK_RECURSIVELY, "true");
+ settings.put(AbstractSourceConnectorConfig.CLEANUP_POLICY_MAINTAIN_RELATIVE_PATH, "true");
+
+ return settings;
+ }
+
+ @Override
+ protected String defineInputPathSubDir() {
+ return "test/01/02/03";
+ }
+
+}
diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceTaskTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceTaskTest.java
index c2cc9d4..1fc6bf0 100644
--- a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceTaskTest.java
+++ b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceTaskTest.java
@@ -16,7 +16,7 @@
package com.github.jcustenborder.kafka.connect.spooldir;
import com.github.jcustenborder.kafka.connect.utils.jackson.ObjectMapperFactory;
-import shaded.com.google.common.io.Files;
+import com.google.common.io.Files;
import com.opencsv.CSVWriterBuilder;
import com.opencsv.ICSVWriter;
import org.apache.kafka.connect.data.Field;
@@ -112,8 +112,8 @@ public void rebalance() throws IOException, InterruptedException {
.put("id", i)
);
}
-
- File inputFile = new File(this.inputPath, "input.csv");
+
+ File inputFile = this.getTargetFilePath(this.inputPath, "input.csv");
writeCSV(inputFile, schema, values);
Map settings = settings();
settings.put(SpoolDirCsvSourceConnectorConfig.KEY_SCHEMA_CONF, ObjectMapperFactory.INSTANCE.writeValueAsString(schema));
diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirJsonSourceConnectorTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirJsonSourceConnectorTest.java
index 1028140..b17258d 100644
--- a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirJsonSourceConnectorTest.java
+++ b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirJsonSourceConnectorTest.java
@@ -15,7 +15,7 @@
*/
package com.github.jcustenborder.kafka.connect.spooldir;
-import shaded.com.google.common.io.ByteStreams;
+import com.google.common.io.ByteStreams;
import org.apache.kafka.connect.errors.DataException;
import org.junit.jupiter.api.Test;
diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirJsonSourceTaskTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirJsonSourceTaskTest.java
index 4b6b7c2..5cecc44 100644
--- a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirJsonSourceTaskTest.java
+++ b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirJsonSourceTaskTest.java
@@ -15,7 +15,7 @@
*/
package com.github.jcustenborder.kafka.connect.spooldir;
-import shaded.com.google.common.io.Files;
+import com.google.common.io.Files;
import org.junit.jupiter.api.DynamicTest;
import org.junit.jupiter.api.TestFactory;
import org.slf4j.Logger;
diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirSchemaLessJsonSourceTaskTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirSchemaLessJsonSourceTaskTest.java
index 2ca6c53..19c6278 100644
--- a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirSchemaLessJsonSourceTaskTest.java
+++ b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirSchemaLessJsonSourceTaskTest.java
@@ -1,6 +1,6 @@
package com.github.jcustenborder.kafka.connect.spooldir;
-import shaded.com.google.common.io.Files;
+import com.google.common.io.Files;
import org.junit.jupiter.api.DynamicTest;
import org.junit.jupiter.api.TestFactory;
import org.slf4j.Logger;
diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/TestCase.java b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/TestCase.java
index 0b7d050..dce2a24 100644
--- a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/TestCase.java
+++ b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/TestCase.java
@@ -15,7 +15,7 @@
*/
package com.github.jcustenborder.kafka.connect.spooldir;
-import shaded.com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/TestDataUtils.java b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/TestDataUtils.java
index 1c497fc..ca6790b 100644
--- a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/TestDataUtils.java
+++ b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/TestDataUtils.java
@@ -16,7 +16,7 @@
package com.github.jcustenborder.kafka.connect.spooldir;
import com.github.jcustenborder.kafka.connect.utils.jackson.ObjectMapperFactory;
-import shaded.com.google.common.base.Preconditions;
+import com.google.common.base.Preconditions;
import org.junit.jupiter.api.Test;
import org.reflections.Reflections;
import org.reflections.scanners.ResourcesScanner;
diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/elf/SchemaConversionBuilderTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/elf/SchemaConversionBuilderTest.java
index e80178a..4077d6f 100644
--- a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/elf/SchemaConversionBuilderTest.java
+++ b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/elf/SchemaConversionBuilderTest.java
@@ -17,7 +17,7 @@
import com.github.jcustenborder.parsers.elf.ElfParser;
import com.github.jcustenborder.parsers.elf.LogEntry;
-import shaded.com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/elf/SpoolDirELFSourceTaskTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/elf/SpoolDirELFSourceTaskTest.java
index 5534525..bdbed96 100644
--- a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/elf/SpoolDirELFSourceTaskTest.java
+++ b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/elf/SpoolDirELFSourceTaskTest.java
@@ -17,7 +17,7 @@
import com.github.jcustenborder.kafka.connect.spooldir.AbstractSpoolDirSourceTaskTest;
import com.github.jcustenborder.kafka.connect.spooldir.TestCase;
-import shaded.com.google.common.io.Files;
+import com.google.common.io.Files;
import org.junit.jupiter.api.DynamicTest;
import org.junit.jupiter.api.TestFactory;
import org.slf4j.Logger;