Issue 183 (#195)
* Removed shaded components. Fixes #183

* Corrected description. Fixes #184.

* Corrected description. Fixes #159
jcustenborder authored Dec 16, 2021
1 parent a790464 commit ec283f8
Showing 47 changed files with 528 additions and 93 deletions.
pom.xml (5 additions, 1 deletion)
@@ -24,7 +24,7 @@
<parent>
<groupId>com.github.jcustenborder.kafka.connect</groupId>
<artifactId>kafka-connect-parent</artifactId>
-<version>2.6.1-1</version>
+<version>2.8.0-1</version>
</parent>
<artifactId>kafka-connect-spooldir</artifactId>
<version>2.0-SNAPSHOT</version>
@@ -61,6 +61,10 @@
<url>https://github.com/jcustenborder/kafka-connect-spooldir/issues</url>
</issueManagement>
<dependencies>
+<dependency>
+  <groupId>com.github.jcustenborder.kafka.connect</groupId>
+  <artifactId>connect-utils-parser</artifactId>
+</dependency>
<dependency>
<groupId>net.sourceforge.argparse4j</groupId>
<artifactId>argparse4j</artifactId>
@@ -15,11 +15,11 @@
*/
package com.github.jcustenborder.kafka.connect.spooldir;

-import shaded.com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.github.jcustenborder.kafka.connect.utils.jackson.ObjectMapperFactory;
-import shaded.com.google.common.base.Strings;
-import shaded.com.google.common.collect.ImmutableList;
-import shaded.com.google.common.collect.ImmutableMap;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
@@ -22,8 +22,8 @@
import com.github.jcustenborder.kafka.connect.utils.config.recommenders.Recommenders;
import com.github.jcustenborder.kafka.connect.utils.config.validators.Validators;
import com.github.jcustenborder.kafka.connect.utils.config.validators.filesystem.ValidDirectoryWritable;
-import shaded.com.google.common.collect.ImmutableList;
-import shaded.com.google.common.io.PatternFilenameFilter;
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.PatternFilenameFilter;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

@@ -40,6 +40,14 @@ public abstract class AbstractSourceConnectorConfig extends AbstractConfig {
public static final String FILE_MINIMUM_AGE_MS_CONF = "file.minimum.age.ms";
public static final String FILE_SORT_ATTRIBUTES_CONF = "files.sort.attributes";

+public static final String INPUT_PATH_WALK_RECURSIVELY = "input.path.walk.recursively";
+public static final boolean INPUT_PATH_WALK_RECURSIVELY_DEFAULT = false;
+static final String INPUT_PATH_WALK_RECURSIVELY_DOC = "If enabled, any sub-directories dropped under `input.path` will be recursively walked looking for files matching the configured `input.file.pattern`. After processing is complete, the discovered sub-directory structure (as well as the files within it) will be handled according to the configured `cleanup.policy` (i.e. moved or deleted). For each discovered file, the walked sub-directory path will be set as a header named `file.relative.path`.";
+
+public static final String CLEANUP_POLICY_MAINTAIN_RELATIVE_PATH = "cleanup.policy.maintain.relative.path";
+static final boolean CLEANUP_POLICY_MAINTAIN_RELATIVE_PATH_DEFAULT = false;
+static final String CLEANUP_POLICY_MAINTAIN_RELATIVE_PATH_DOC = "If `" + INPUT_PATH_WALK_RECURSIVELY + "` is enabled in combination with this flag being `true`, the walked sub-directories which contained files will be retained as-is under `input.path`. The files within those sub-directories will be moved (with a copy of the sub-directory structure) or deleted as per the configured `cleanup.policy`, but the parent sub-directory structure will remain.";

public static final String PROCESSING_FILE_EXTENSION_CONF = "processing.file.extension";
//RecordProcessorConfig
public static final String BATCH_SIZE_CONF = "batch.size";
@@ -103,6 +111,7 @@ public abstract class AbstractSourceConnectorConfig extends AbstractConfig {
static final String FILE_BUFFER_SIZE_DOC = "The size of buffer for the BufferedInputStream that will be used to " +
"interact with the file system.";


public final File inputPath;
public final File finishedPath;
public final File errorPath;
@@ -121,6 +130,8 @@ public abstract class AbstractSourceConnectorConfig extends AbstractConfig {
public final TaskPartitioner taskPartitioner;
public final boolean bufferedInputStream;
public final int fileBufferSizeBytes;
+public final boolean inputPathWalkRecursively;
+public final boolean inputPathWalkRecursivelyRetainSubDirs;

public final boolean finishedPathRequired() {
boolean result;
@@ -165,6 +176,8 @@ public AbstractSourceConnectorConfig(ConfigDef definition, Map<?, ?> originals,
this.taskIndex = getInt(TASK_INDEX_CONF);
this.taskCount = getInt(TASK_COUNT_CONF);
this.taskPartitioner = ConfigUtils.getEnum(TaskPartitioner.class, this, TASK_PARTITIONER_CONF);
+this.inputPathWalkRecursively = this.getBoolean(INPUT_PATH_WALK_RECURSIVELY);
+this.inputPathWalkRecursivelyRetainSubDirs = this.getBoolean(CLEANUP_POLICY_MAINTAIN_RELATIVE_PATH);

if (bufferedInputStream) {
this.fileBufferSizeBytes = getInt(FILE_BUFFER_SIZE_CONF);
@@ -301,6 +314,20 @@ protected static ConfigDef config(boolean bufferedInputStream) {
.defaultValue(TaskPartitioner.ByName.toString())
.group(GROUP_FILESYSTEM)
.build()
+).define(
+    ConfigKeyBuilder.of(INPUT_PATH_WALK_RECURSIVELY, ConfigDef.Type.BOOLEAN)
+        .documentation(INPUT_PATH_WALK_RECURSIVELY_DOC)
+        .importance(ConfigDef.Importance.LOW)
+        .defaultValue(INPUT_PATH_WALK_RECURSIVELY_DEFAULT)
+        .group(GROUP_FILESYSTEM)
+        .build()
+).define(
+    ConfigKeyBuilder.of(CLEANUP_POLICY_MAINTAIN_RELATIVE_PATH, ConfigDef.Type.BOOLEAN)
+        .documentation(CLEANUP_POLICY_MAINTAIN_RELATIVE_PATH_DOC)
+        .importance(ConfigDef.Importance.LOW)
+        .defaultValue(CLEANUP_POLICY_MAINTAIN_RELATIVE_PATH_DEFAULT)
+        .group(GROUP_FILESYSTEM)
+        .build()
);

if (bufferedInputStream) {
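For context, the two new options slot into a connector configuration alongside the existing spooldir settings. A minimal sketch in Java, where every key other than the two introduced here, and every value, is an illustrative assumption rather than part of this commit:

import java.util.HashMap;
import java.util.Map;

public class RecursiveSpoolDirConfigExample {
  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put("input.path", "/data/spooldir/input");        // existing setting, example value
    props.put("finished.path", "/data/spooldir/finished");  // existing setting, example value
    props.put("input.file.pattern", "^.*\\.csv$");          // existing setting, example value
    // New in this commit: walk sub-directories of input.path for matching files.
    props.put("input.path.walk.recursively", "true");
    // New in this commit: keep the emptied sub-directory tree under input.path.
    props.put("cleanup.policy.maintain.relative.path", "true");
    props.forEach((k, v) -> System.out.println(k + "=" + v));
  }
}

Records produced from files found in walked sub-directories also carry the sub-directory path in a `file.relative.path` header, per the documentation string above.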
@@ -22,9 +22,9 @@
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import shaded.com.google.common.base.Preconditions;
-import shaded.com.google.common.base.Stopwatch;
-import shaded.com.google.common.collect.ImmutableMap;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableMap;

import java.io.File;
import java.io.FileNotFoundException;
@@ -17,9 +17,9 @@

import com.github.jcustenborder.kafka.connect.utils.VersionUtil;
import com.github.jcustenborder.kafka.connect.utils.jackson.ObjectMapperFactory;
-import shaded.com.google.common.base.Preconditions;
-import shaded.com.google.common.collect.HashMultimap;
-import shaded.com.google.common.collect.Multimap;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
@@ -15,13 +15,13 @@
*/
package com.github.jcustenborder.kafka.connect.spooldir;

-import shaded.com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.github.jcustenborder.kafka.connect.utils.config.ConfigKeyBuilder;
import com.github.jcustenborder.kafka.connect.utils.config.recommenders.Recommenders;
import com.github.jcustenborder.kafka.connect.utils.jackson.ObjectMapperFactory;
-import shaded.com.google.common.base.Preconditions;
-import shaded.com.google.common.base.Strings;
-import shaded.com.google.common.collect.ImmutableList;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.connect.data.Field;
@@ -20,7 +20,7 @@
import com.github.jcustenborder.kafka.connect.utils.data.type.TimeTypeParser;
import com.github.jcustenborder.kafka.connect.utils.data.type.TimestampTypeParser;
import com.github.jcustenborder.kafka.connect.utils.data.type.TypeParser;
-import shaded.com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
@@ -15,7 +15,7 @@
*/
package com.github.jcustenborder.kafka.connect.spooldir;

-import shaded.com.google.common.hash.Hashing;
+import com.google.common.hash.Hashing;
import org.apache.kafka.common.config.ConfigException;

import java.io.File;
@@ -15,7 +15,7 @@
*/
package com.github.jcustenborder.kafka.connect.spooldir;

-import shaded.com.google.common.collect.ComparisonChain;
+import com.google.common.collect.ComparisonChain;

import java.io.File;
import java.util.Comparator;
@@ -15,8 +15,8 @@
*/
package com.github.jcustenborder.kafka.connect.spooldir;

-import shaded.com.google.common.collect.ImmutableMap;
-import shaded.com.google.common.io.Files;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Files;
import org.apache.commons.compress.compressors.CompressorException;
import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.slf4j.Logger;
@@ -31,6 +31,10 @@
import java.io.InputStreamReader;
import java.io.LineNumberReader;
import java.nio.charset.Charset;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;

public class InputFile implements Closeable {
@@ -43,6 +47,7 @@ public class InputFile implements Closeable {
private final long lastModified;
private final Metadata metadata;
private final AbstractSourceConnectorConfig config;
+private final String inputPathSubDir;
InputStreamReader inputStreamReader;
LineNumberReader lineNumberReader;
InputStream inputStream;
@@ -56,7 +61,8 @@ public class InputFile implements Closeable {
this.length = this.file.length();
String processingFileName = file.getName() + config.processingFileExtension;
this.processingFlag = new File(file.getParentFile(), processingFileName);
-this.metadata = new Metadata(file);
+this.inputPathSubDir = determineRelativePath(file, config.inputPath);
+this.metadata = new Metadata(file, this.inputPathSubDir);
}

static final Map<String, String> SUPPORTED_COMPRESSION_TYPES = ImmutableMap.of(
@@ -67,6 +73,20 @@ public class InputFile implements Closeable {
"z", CompressorStreamFactory.Z
);


+private static String determineRelativePath(File inputFile, File inputPath) {
+  // The file's parent directory, expressed relative to the configured input.path.
+  Path relative = inputPath.toPath().relativize(inputFile.getParentFile().toPath());
+  String subDir = relative.toString();
+  if ("".equals(subDir)) {
+    return null;
+  }
+  return subDir;
+}
+
+public String inputPathSubDir() {
+  return this.inputPathSubDir;
+}

public File file() {
return this.file;
}
@@ -80,7 +100,6 @@ public Metadata metadata() {
}



public InputStream inputStream() {
return this.inputStream;
}
@@ -197,7 +216,51 @@ public long lastModified() {
return this.lastModified;
}

+private List<File> getInputPathSubDirsToCleanup() {
+  List<File> inputPathSubDirsToCleanup = null;
+  if (this.inputPathSubDir != null && !config.inputPathWalkRecursivelyRetainSubDirs) {
+    inputPathSubDirsToCleanup = new ArrayList<>();
+    File lastSubDir = this.config.inputPath;
+    for (String subDirName : this.inputPathSubDir.split(File.separator)) {
+      lastSubDir = new File(lastSubDir, subDirName);
+      inputPathSubDirsToCleanup.add(lastSubDir);
+    }
+    Collections.reverse(inputPathSubDirsToCleanup);
+  }
+  return inputPathSubDirsToCleanup;
+}

+private void cleanupInputDirSubDirs() {
+  List<File> inputPathSubDirsToCleanup = this.getInputPathSubDirsToCleanup();
+  if (inputPathSubDirsToCleanup != null) {
+    for (File subDir : inputPathSubDirsToCleanup) {
+      try {
+        File[] children = subDir.listFiles(); // list once, not twice
+        if (children == null || children.length == 0) {
+          if (!subDir.delete()) {
+            log.error("Failed to delete input.path sub-directory: {}", subDir);
+          } else {
+            log.info("Cleaned up input.path sub-directory: {}", subDir);
+          }
+        } else {
+          log.info("Cannot clean up input.path sub-directory as it is not empty: {}", subDir);
+        }
+      } catch (SecurityException e) {
+        log.error("SecurityException thrown while trying to delete input.path sub-directory: {}", subDir, e);
+      }
+    }
+  }
+}
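The Collections.reverse above is what makes this cleanup safe: candidate directories are visited deepest-first, so a child directory is always attempted before its parent. A standalone sketch of that ordering, with assumed example paths:

import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class CleanupOrderDemo {
  public static void main(String[] args) {
    // Rebuild the candidate list the way getInputPathSubDirsToCleanup() does,
    // for a file that was discovered under input/2021/12 (example values).
    File inputPath = new File("/data/spooldir/input");
    List<File> dirs = new ArrayList<>();
    File last = inputPath;
    for (String name : "2021/12".split("/")) { // the real code splits on File.separator
      last = new File(last, name);
      dirs.add(last);
    }
    Collections.reverse(dirs); // deepest first: delete children before parents
    System.out.println(dirs);  // [/data/spooldir/input/2021/12, /data/spooldir/input/2021]
  }
}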


public void moveToDirectory(File outputDirectory) {

+if (this.inputPathSubDir != null) {
+  outputDirectory = new File(outputDirectory, this.inputPathSubDir);
+  if (!outputDirectory.isDirectory()) {
+    outputDirectory.mkdirs();
+  }
+}

File outputFile = new File(outputDirectory, this.file.getName());
try {
if (this.file.exists()) {
Expand All @@ -207,13 +270,18 @@ public void moveToDirectory(File outputDirectory) {
} catch (IOException e) {
log.error("Exception thrown while trying to move {} to {}", this.file, outputFile, e);
}

+this.cleanupInputDirSubDirs();

}

public void delete() {
log.info("Deleting {}", this.file);
if (!this.file.delete()) {
log.warn("Could not delete {}", this.file);
}

+this.cleanupInputDirSubDirs();
}

public boolean exists() {
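Taken together, determineRelativePath() records where under `input.path` a file was found, and moveToDirectory() recreates that sub-path under the destination before moving the file. A self-contained sketch of the path arithmetic, using assumed example paths:

import java.io.File;
import java.nio.file.Path;

public class RelativePathDemo {
  public static void main(String[] args) {
    File inputPath = new File("/data/spooldir/input");
    File inputFile = new File("/data/spooldir/input/2021/12/orders.csv");

    // Same relativization as determineRelativePath(): the file's parent
    // directory relative to input.path.
    Path relative = inputPath.toPath().relativize(inputFile.getParentFile().toPath());
    System.out.println(relative); // 2021/12

    // moveToDirectory() then rebuilds the sub-path under the destination.
    File finishedPath = new File("/data/spooldir/finished");
    File target = new File(new File(finishedPath, relative.toString()), inputFile.getName());
    System.out.println(target); // /data/spooldir/finished/2021/12/orders.csv
  }
}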
@@ -15,17 +15,22 @@
*/
package com.github.jcustenborder.kafka.connect.spooldir;

-import shaded.com.google.common.collect.ForwardingDeque;
+import com.google.common.collect.ForwardingDeque;
+import com.google.common.io.PatternFilenameFilter;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
+import java.util.stream.Stream;

public class InputFileDequeue extends ForwardingDeque<InputFile> {
private static final Logger log = LoggerFactory.getLogger(InputFileDequeue.class);
@@ -58,7 +63,26 @@ protected Deque<InputFile> delegate() {
}

log.trace("delegate() - Searching for file(s) in {}", this.config.inputPath);
-File[] input = this.config.inputPath.listFiles(this.config.inputFilenameFilter);
+
+final File[] input;
+
+if (this.config.inputPathWalkRecursively) {
+  final PatternFilenameFilter walkerFilenameFilter = this.config.inputFilenameFilter;
+  Predicate<File> filenameFilterPredicate = file -> walkerFilenameFilter.accept(file.getParentFile(), file.getName());
+
+  try (Stream<Path> filesWalk = Files.walk(this.config.inputPath.toPath())) {
+    input = filesWalk.map(Path::toFile)
+        .filter(File::isFile)
+        .filter(filenameFilterPredicate)
+        .toArray(File[]::new);
+  } catch (IOException e) {
+    log.error("Unexpected error walking {}: {}", this.config.inputPath.toPath(), e.getMessage(), e);
+    return new ArrayDeque<>();
+  }
+} else {
+  input = this.config.inputPath.listFiles(this.config.inputFilenameFilter);
+}
+

if (null == input || input.length == 0) {
log.info("No files matching {} were found in {}", AbstractSourceConnectorConfig.INPUT_FILE_PATTERN_CONF, this.config.inputPath);
return new ArrayDeque<>();
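The recursive branch relies on java.nio's Files.walk, while the default path keeps the original single-directory listFiles behavior. A minimal standalone sketch of the same traversal, assuming an example directory and pattern:

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.regex.Pattern;
import java.util.stream.Stream;

public class WalkExample {
  public static void main(String[] args) throws IOException {
    File inputPath = new File("/data/spooldir/input"); // example directory
    Pattern pattern = Pattern.compile("^.*\\.csv$");   // stands in for input.file.pattern

    // Depth-first walk, regular files only, names matched against the pattern,
    // the same shape as the new code path in delegate().
    try (Stream<Path> walk = Files.walk(inputPath.toPath())) {
      walk.map(Path::toFile)
          .filter(File::isFile)
          .filter(f -> pattern.matcher(f.getName()).matches())
          .forEach(System.out::println);
    }
  }
}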
@@ -15,9 +15,9 @@
*/
package com.github.jcustenborder.kafka.connect.spooldir;

-import shaded.com.fasterxml.jackson.core.JsonFactory;
-import shaded.com.fasterxml.jackson.core.JsonParser;
-import shaded.com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.JsonNode;
import com.github.jcustenborder.kafka.connect.utils.jackson.ObjectMapperFactory;
import org.apache.kafka.connect.data.Schema;
