Issue 100 (#172)
* Initial commit adding support for reading Avro files. Fixes #100.

* Updated introduction.

* Added test cases and added a method to start processing a file when it is not opened in the context of the InputFile.

* Added a check to the test case to call out when a resource file is not found.

* Added testing data for Avro files.

* Code cleanup.

* Updated documentation about installation.

* Updated to the new parent pom to correct JDK 11 issues. Fixes #170.
jcustenborder authored Feb 3, 2021
1 parent 10dcc69 commit 011238a
Showing 13 changed files with 7,308 additions and 67 deletions.
2,113 changes: 2,099 additions & 14 deletions README.md

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions bin/debug.sh
@@ -19,8 +19,8 @@
: ${ERROR_PATH:='/tmp/spooldir/error'}
: ${FINISHED_PATH:='/tmp/spooldir/finished'}
: ${DEBUG_SUSPEND_FLAG:='y'}
-export KAFKA_DEBUG='y'
-export DEBUG_SUSPEND_FLAG='y'
+export KAFKA_DEBUG='n'
+export DEBUG_SUSPEND_FLAG='n'
# export KAFKA_OPTS='-agentpath:/Applications/YourKit-Java-Profiler-2017.02.app/Contents/Resources/bin/mac/libyjpagent.jnilib=disablestacktelemetry,exceptions=disable,delay=10000'
set -e

@@ -39,4 +39,5 @@ if [ ! -d "${FINISHED_PATH}" ]; then
fi

cp src/test/resources/com/github/jcustenborder/kafka/connect/spooldir/csv/FieldsMatch.data "${INPUT_PATH}/FieldsMatch.csv"
-connect-standalone config/connect-avro-docker.properties config/CSVSchemaGenerator.properties
+connect-standalone config/connect-avro-docker.properties config/CSVSchemaGenerator.properties
+# connect-standalone config/connect-avro-docker.properties config/AvroExample.properties
29 changes: 29 additions & 0 deletions config/AvroExample.properties
@@ -0,0 +1,29 @@
#
# Copyright © 2016 Jeremy Custenborder ([email protected])
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

name=AvroSpoolDir
tasks.max=1
connector.class=com.github.jcustenborder.kafka.connect.spooldir.SpoolDirAvroSourceConnector
input.file.pattern=^.*\.avro$

halt.on.error=false
topic=testing

input.path=/Users/jeremy/data/stackoverflow
finished.path=/tmp/spooldir/finished
error.path=/tmp/spooldir/error
batch.size = 5000
cleanup.policy = NONE
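
Note that input.file.pattern is interpreted as a regular expression, which is why the dot before "avro" is escaped. A quick standalone sanity check of the pattern, with invented file names for illustration:

import java.util.regex.Pattern;

public class InputFilePatternCheck {
  public static void main(String[] args) {
    // Same pattern as input.file.pattern above.
    Pattern pattern = Pattern.compile("^.*\\.avro$");
    System.out.println(pattern.matcher("users.avro").matches());     // true
    System.out.println(pattern.matcher("users.avro.tmp").matches()); // false: .avro is not the final extension
  }
}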
7 changes: 1 addition & 6 deletions pom.xml
@@ -24,7 +24,7 @@
<parent>
<groupId>com.github.jcustenborder.kafka.connect</groupId>
<artifactId>kafka-connect-parent</artifactId>
-<version>2.6.0</version>
+<version>2.6.1</version>
</parent>
<artifactId>kafka-connect-spooldir</artifactId>
<version>2.0-SNAPSHOT</version>
@@ -61,11 +61,6 @@
<url>https://github.com/jcustenborder/kafka-connect-spooldir/issues</url>
</issueManagement>
<dependencies>
-<dependency>
-  <groupId>com.fasterxml.jackson.core</groupId>
-  <artifactId>jackson-core</artifactId>
-  <version>${jackson.version}</version>
-</dependency>
<dependency>
<groupId>net.sourceforge.argparse4j</groupId>
<artifactId>argparse4j</artifactId>
src/main/java/com/github/jcustenborder/kafka/connect/spooldir/InputFile.java
@@ -35,7 +35,7 @@

public class InputFile implements Closeable {
private static final Logger log = LoggerFactory.getLogger(InputFile.class);
-private final File inputFile;
+private final File file;
private final File processingFlag;
private final String name;
private final String path;
@@ -45,17 +45,18 @@ public class InputFile implements Closeable {
private final AbstractSourceConnectorConfig config;
InputStreamReader inputStreamReader;
LineNumberReader lineNumberReader;
+InputStream inputStream;

-InputFile(AbstractSourceConnectorConfig config, File inputFile) {
+InputFile(AbstractSourceConnectorConfig config, File file) {
this.config = config;
-this.inputFile = inputFile;
-this.name = this.inputFile.getName();
-this.path = this.inputFile.getPath();
-this.lastModified = this.inputFile.lastModified();
-this.length = this.inputFile.length();
-String processingFileName = inputFile.getName() + config.processingFileExtension;
-this.processingFlag = new File(inputFile.getParentFile(), processingFileName);
-this.metadata = new Metadata(inputFile);
+this.file = file;
+this.name = this.file.getName();
+this.path = this.file.getPath();
+this.lastModified = this.file.lastModified();
+this.length = this.file.length();
+String processingFileName = file.getName() + config.processingFileExtension;
+this.processingFlag = new File(file.getParentFile(), processingFileName);
+this.metadata = new Metadata(file);
}

static final Map<String, String> SUPPORTED_COMPRESSION_TYPES = ImmutableMap.of(
@@ -66,11 +67,19 @@ public class InputFile implements Closeable {
"z", CompressorStreamFactory.Z
);

+public File file() {
+  return this.file;
+}
+
+public File processingFlag() {
+  return this.processingFlag;
+}

public Metadata metadata() {
return this.metadata;
}

-private InputStream inputStream;


public InputStream inputStream() {
return this.inputStream;
@@ -79,26 +88,26 @@ public InputStream inputStream() {
public InputStream openStream() throws IOException {
if (null != this.inputStream) {
throw new IOException(
String.format("File %s is already open", this.inputFile)
String.format("File %s is already open", this.file)
);
}

-final String extension = Files.getFileExtension(inputFile.getName());
-log.trace("openStream() - fileName = '{}' extension = '{}'", inputFile, extension);
-this.inputStream = new FileInputStream(this.inputFile);
+final String extension = Files.getFileExtension(file.getName());
+log.trace("openStream() - fileName = '{}' extension = '{}'", file, extension);
+this.inputStream = new FileInputStream(this.file);

if (this.config.bufferedInputStream) {
log.trace(
"openStream() - Wrapping '{}' in a BufferedInputStream with bufferSize = {}",
-this.inputFile,
+this.file,
this.config.fileBufferSizeBytes
);
this.inputStream = new BufferedInputStream(this.inputStream, this.config.fileBufferSizeBytes);
}

if (SUPPORTED_COMPRESSION_TYPES.containsKey(extension)) {
final String compressor = SUPPORTED_COMPRESSION_TYPES.get(extension);
log.info("Decompressing {} as {}", inputFile, compressor);
log.info("Decompressing {} as {}", file, compressor);
final CompressorStreamFactory compressorStreamFactory = new CompressorStreamFactory();
try {
this.inputStream = compressorStreamFactory.createCompressorInputStream(
@@ -110,12 +119,16 @@ public InputStream openStream() throws IOException {
}
}

log.info("Creating processing flag {}", this.processingFlag);
Files.touch(this.processingFlag);
startProcessing();

return inputStream;
}

+public void startProcessing() throws IOException {
+  log.info("Creating processing flag {}", this.processingFlag);
+  Files.touch(this.processingFlag);
+}

public InputStreamReader openInputStreamReader(Charset charset) throws IOException {
if (null == this.inputStreamReader) {
InputStream inputStream = null != this.inputStream ? this.inputStream : openStream();
@@ -145,7 +158,7 @@ public LineNumberReader lineNumberReader() {

@Override
public String toString() {
-return this.inputFile.toString();
+return this.file.toString();
}

@Override
@@ -157,7 +170,7 @@ public void close() throws IOException {
this.inputStreamReader.close();
}
if (null != this.inputStream) {
log.info("Closing {}", this.inputFile);
log.info("Closing {}", this.file);
this.inputStream.close();
}
if (this.processingFlag.exists()) {
@@ -185,25 +198,25 @@ public long lastModified() {
}

public void moveToDirectory(File outputDirectory) {
-File outputFile = new File(outputDirectory, this.inputFile.getName());
+File outputFile = new File(outputDirectory, this.file.getName());
try {
-if (this.inputFile.exists()) {
-  log.info("Moving {} to {}", this.inputFile, outputFile);
-  Files.move(this.inputFile, outputFile);
+if (this.file.exists()) {
+  log.info("Moving {} to {}", this.file, outputFile);
+  Files.move(this.file, outputFile);
}
} catch (IOException e) {
log.error("Exception thrown while trying to move {} to {}", this.inputFile, outputFile, e);
log.error("Exception thrown while trying to move {} to {}", this.file, outputFile, e);
}
}

public void delete() {
log.info("Deleting {}", this.inputFile);
if (!this.inputFile.delete()) {
log.warn("Could not delete {}", this.inputFile);
log.info("Deleting {}", this.file);
if (!this.file.delete()) {
log.warn("Could not delete {}", this.file);
}
}

public boolean exists() {
-return this.inputFile.exists();
+return this.file.exists();
}
}
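
Taken together, the rename and the new accessors give callers two ways to consume an InputFile: stream-oriented formats keep calling openStream(), which creates the .PROCESSING flag as a side effect, while readers that need the underlying java.io.File (such as Avro's DataFileReader) fetch it via file() and call startProcessing() themselves. A hypothetical usage sketch follows; the helper class and its wiring are invented for illustration, and it sits in the connector's package because InputFile and its constructor are package-private:

package com.github.jcustenborder.kafka.connect.spooldir;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericContainer;
import org.apache.avro.generic.GenericDatumReader;

import java.io.File;
import java.io.IOException;

class InputFileUsageSketch {
  static void consumeAvro(AbstractSourceConnectorConfig config, File avroFile) throws IOException {
    InputFile input = new InputFile(config, avroFile);

    // File-based readers bypass openStream(), so the processing flag
    // must be created explicitly before reading.
    input.startProcessing();
    try (DataFileReader<GenericContainer> reader =
             new DataFileReader<>(input.file(), new GenericDatumReader<>())) {
      while (reader.hasNext()) {
        GenericContainer record = reader.next();
        // convert and forward the record ...
      }
    }

    // Stream-based formats (CSV, JSON, ...) would instead call
    // input.openStream(), which touches the flag itself.
  }
}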
47 changes: 47 additions & 0 deletions src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirAvroSourceConnector.java
@@ -0,0 +1,47 @@
/**
* Copyright © 2016 Jeremy Custenborder ([email protected])
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.jcustenborder.kafka.connect.spooldir;

import com.github.jcustenborder.kafka.connect.utils.config.Description;
import com.github.jcustenborder.kafka.connect.utils.config.DocumentationImportant;
import com.github.jcustenborder.kafka.connect.utils.config.Title;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;

import java.util.Map;

@Title("Avro Source Connector")
@Description("This connector is used to read avro data files from the file system and write their contents " +
"to Kafka. The schema of the file is used to read the data and produce it to Kafka")
@DocumentationImportant("This connector has a dependency on the Confluent Schema Registry specifically kafka-connect-avro-converter. " +
"This dependency is not shipped along with the connector to ensure that there are not potential version mismatch issues. " +
"The easiest way to ensure this component is available is to use one of the Confluent packages or containers for deployment.")
public class SpoolDirAvroSourceConnector extends AbstractSourceConnector<SpoolDirAvroSourceConnectorConfig> {
@Override
protected SpoolDirAvroSourceConnectorConfig config(Map<String, ?> settings) {
return new SpoolDirAvroSourceConnectorConfig(settings);
}

@Override
public Class<? extends Task> taskClass() {
return SpoolDirAvroSourceTask.class;
}

@Override
public ConfigDef config() {
return SpoolDirAvroSourceConnectorConfig.config();
}
}
32 changes: 32 additions & 0 deletions src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirAvroSourceConnectorConfig.java
@@ -0,0 +1,32 @@
/**
* Copyright © 2016 Jeremy Custenborder ([email protected])
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.jcustenborder.kafka.connect.spooldir;

import org.apache.kafka.common.config.ConfigDef;

import java.util.Map;

public class SpoolDirAvroSourceConnectorConfig extends AbstractSourceConnectorConfig {


public SpoolDirAvroSourceConnectorConfig(Map<?, ?> originals) {
super(config(), originals, true);
}

public static ConfigDef config() {
return AbstractSourceConnectorConfig.config(true);
}
}
84 changes: 84 additions & 0 deletions src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirAvroSourceTask.java
@@ -0,0 +1,84 @@
/**
* Copyright © 2016 Jeremy Custenborder ([email protected])
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.jcustenborder.kafka.connect.spooldir;

import io.confluent.connect.avro.AvroData;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericContainer;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.DatumReader;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class SpoolDirAvroSourceTask extends AbstractSourceTask<SpoolDirAvroSourceConnectorConfig> {
private static final Logger log = LoggerFactory.getLogger(SpoolDirAvroSourceTask.class);
long recordOffset;
AvroData avroData = new AvroData(1024);
DataFileReader<GenericContainer> dataFileReader;
DatumReader<GenericContainer> datumReader = new GenericDatumReader<>();


@Override
protected SpoolDirAvroSourceConnectorConfig config(Map<String, ?> settings) {
return new SpoolDirAvroSourceConnectorConfig(settings);
}

@Override
protected void configure(InputFile inputFile, Long lastOffset) throws IOException {
if (null != this.dataFileReader) {
this.dataFileReader.close();
}
inputFile.startProcessing();
this.dataFileReader = new DataFileReader<>(inputFile.file(), datumReader);
this.recordOffset = 0;

if (null != lastOffset) {
while (recordOffset < lastOffset && this.dataFileReader.hasNext()) {
this.dataFileReader.next();
recordOffset++;
}
}

}

@Override
protected List<SourceRecord> process() throws IOException {
int recordCount = 0;
List<SourceRecord> records = new ArrayList<>(this.config.batchSize);
GenericContainer container = null;
while (recordCount < this.config.batchSize && dataFileReader.hasNext()) {
container = dataFileReader.next(container);
SchemaAndValue value = avroData.toConnectData(this.dataFileReader.getSchema(), container);
SourceRecord sourceRecord = record(null, value, null);
records.add(sourceRecord);
recordCount++;
recordOffset++;
}
return records;
}

@Override
protected long recordOffset() {
return recordOffset;
}
}
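
For context, here is a minimal sketch of producing an .avro data file that this task could ingest; the schema, field names, and output path are invented for illustration:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

import java.io.File;
import java.io.IOException;

public class WriteSampleAvro {
  public static void main(String[] args) throws IOException {
    // Invented schema; the connector reads whatever schema the file carries.
    Schema schema = SchemaBuilder.record("User").fields()
        .requiredString("name")
        .requiredInt("age")
        .endRecord();

    GenericRecord user = new GenericData.Record(schema);
    user.put("name", "alice");
    user.put("age", 30);

    File out = new File("/tmp/spooldir/input/users.avro");
    try (DataFileWriter<GenericRecord> writer =
             new DataFileWriter<>(new GenericDatumWriter<>(schema))) {
      writer.create(schema, out); // embeds the schema in the file header
      writer.append(user);
    }
  }
}

Dropping such a file into input.path with a name matching input.file.pattern lets the task read each record with the embedded schema, convert it through AvroData.toConnectData(), and produce it to the configured topic.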