Skip to content

Commit

Permalink
Add header removal for TextIO (#29202)
Browse files Browse the repository at this point in the history
* add header removal

* add multiple header lines removal
  • Loading branch information
stankiewicz authored Nov 8, 2023
1 parent 70093ce commit bf47289
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 28 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
## I/Os

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* TextIO now supports skipping multiple header lines (Java) ([#17990](https://github.com/apache/beam/issues/17990)).

## New Features / Improvements

Expand Down Expand Up @@ -107,6 +108,7 @@ should handle this. ([#25252](https://github.com/apache/beam/issues/25252)).
* State and side input cache has been enabled to a default of 100 MB. Use `--max_cache_memory_usage_mb=X` to provide cache size for the user state API and side inputs. (Python) ([#28770](https://github.com/apache/beam/issues/28770)).
* Beam YAML stable release. Beam pipelines can now be written using YAML and leverage the Beam YAML framework which includes a preliminary set of IO's and turnkey transforms. More information can be found in the YAML root folder and in the [README](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/README.md).


## Breaking Changes

* `org.apache.beam.sdk.io.CountingSource.CounterMark` uses custom `CounterMarkCoder` as a default coder since all Avro-dependent
Expand Down
41 changes: 36 additions & 5 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ public static Read read() {
return new AutoValue_TextIO_Read.Builder()
.setCompression(Compression.AUTO)
.setHintMatchesManyFiles(false)
.setSkipHeaderLines(0)
.setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
.build();
}
Expand All @@ -214,6 +215,7 @@ public static Read read() {
public static ReadAll readAll() {
return new AutoValue_TextIO_ReadAll.Builder()
.setCompression(Compression.AUTO)
// Default to skipping no header lines.
.setSkipHeaderLines(0)
.setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
.build();
}
Expand All @@ -228,6 +230,7 @@ public static ReadFiles readFiles() {
// but is not so large as to exhaust a typical runner's maximum amount of output per
// ProcessElement call.
.setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
.setSkipHeaderLines(0)
.build();
}

Expand Down Expand Up @@ -286,6 +289,8 @@ public abstract static class Read extends PTransform<PBegin, PCollection<String>
@SuppressWarnings("mutable") // this returns an array that can be mutated by the caller
abstract byte @Nullable [] getDelimiter();

abstract int getSkipHeaderLines();

abstract Builder toBuilder();

@AutoValue.Builder
Expand All @@ -300,6 +305,8 @@ abstract static class Builder {

abstract Builder setDelimiter(byte @Nullable [] delimiter);

abstract Builder setSkipHeaderLines(int skipHeaderLines);

abstract Read build();
}

Expand Down Expand Up @@ -396,6 +403,10 @@ public Read withDelimiter(byte[] delimiter) {
return toBuilder().setDelimiter(delimiter).build();
}

/**
 * Returns a copy of this {@link Read} configured with the number of header lines to skip.
 *
 * <p>The value is forwarded unchanged to the underlying {@code ReadFiles} transform and {@code
 * TextSource}. Negative values are rejected here because the reader only handles counts of
 * zero, one, or more than one; a negative count would match none of those cases.
 *
 * @param skipHeaderLines number of header lines to skip; must be {@code >= 0}
 * @return a new {@link Read} carrying the given setting
 * @throws IllegalArgumentException if {@code skipHeaderLines} is negative
 */
public Read withSkipHeaderLines(int skipHeaderLines) {
  if (skipHeaderLines < 0) {
    throw new IllegalArgumentException(
        "skipHeaderLines must be non-negative, but was " + skipHeaderLines);
  }
  return toBuilder().setSkipHeaderLines(skipHeaderLines).build();
}

static boolean isSelfOverlapping(byte[] s) {
// s self-overlaps if v exists such as s = vu = wv with u and w non empty
for (int i = 1; i < s.length - 1; ++i) {
Expand All @@ -422,7 +433,9 @@ public PCollection<String> expand(PBegin input) {
FileIO.readMatches()
.withCompression(getCompression())
.withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
.apply("Via ReadFiles", readFiles().withDelimiter(getDelimiter()));
.apply(
"Via ReadFiles",
readFiles().withDelimiter(getDelimiter()).withSkipHeaderLines(getSkipHeaderLines()));
}

// Helper to create a source specific to the requested compression type.
Expand All @@ -431,7 +444,8 @@ protected FileBasedSource<String> getSource() {
new TextSource(
getFilepattern(),
getMatchConfiguration().getEmptyMatchTreatment(),
getDelimiter()))
getDelimiter(),
getSkipHeaderLines()))
.withCompression(getCompression());
}

Expand Down Expand Up @@ -468,6 +482,8 @@ public abstract static class ReadAll
@SuppressWarnings("mutable") // this returns an array that can be mutated by the caller
abstract byte @Nullable [] getDelimiter();

abstract int getSkipHeaderLines();

abstract Builder toBuilder();

@AutoValue.Builder
Expand All @@ -478,6 +494,8 @@ abstract static class Builder {

abstract Builder setDelimiter(byte @Nullable [] delimiter);

abstract Builder setSkipHeaderLines(int skipHeaderLines);

abstract ReadAll build();
}

Expand Down Expand Up @@ -560,6 +578,8 @@ public abstract static class ReadFiles
@SuppressWarnings("mutable") // this returns an array that can be mutated by the caller
abstract byte @Nullable [] getDelimiter();

abstract int getSkipHeaderLines();

abstract Builder toBuilder();

@AutoValue.Builder
Expand All @@ -568,6 +588,8 @@ abstract static class Builder {

abstract Builder setDelimiter(byte @Nullable [] delimiter);

abstract Builder setSkipHeaderLines(int skipHeaderLines);

abstract ReadFiles build();
}

Expand All @@ -581,13 +603,17 @@ public ReadFiles withDelimiter(byte[] delimiter) {
return toBuilder().setDelimiter(delimiter).build();
}

/**
 * Returns a copy of this {@link ReadFiles} configured with the number of header lines to skip.
 *
 * <p>The value is handed to the {@code TextSource} created for each input. Negative values are
 * rejected here because the reader only handles counts of zero, one, or more than one; a
 * negative count would match none of those cases.
 *
 * @param skipHeaderLines number of header lines to skip; must be {@code >= 0}
 * @return a new {@link ReadFiles} carrying the given setting
 * @throws IllegalArgumentException if {@code skipHeaderLines} is negative
 */
public ReadFiles withSkipHeaderLines(int skipHeaderLines) {
  if (skipHeaderLines < 0) {
    throw new IllegalArgumentException(
        "skipHeaderLines must be non-negative, but was " + skipHeaderLines);
  }
  return toBuilder().setSkipHeaderLines(skipHeaderLines).build();
}

@Override
public PCollection<String> expand(PCollection<FileIO.ReadableFile> input) {
return input.apply(
"Read all via FileBasedSource",
new ReadAllViaFileBasedSource<>(
getDesiredBundleSizeBytes(),
new CreateTextSourceFn(getDelimiter()),
new CreateTextSourceFn(getDelimiter(), getSkipHeaderLines()),
StringUtf8Coder.of()));
}

Expand All @@ -602,15 +628,20 @@ public void populateDisplayData(DisplayData.Builder builder) {
private static class CreateTextSourceFn
implements SerializableFunction<String, FileBasedSource<String>> {
private byte[] delimiter;
private int skipHeaderLines;

private CreateTextSourceFn(byte[] delimiter) {
private CreateTextSourceFn(byte[] delimiter, int skipHeaderLines) {
this.delimiter = delimiter;
this.skipHeaderLines = skipHeaderLines;
}

@Override
public FileBasedSource<String> apply(String input) {
return new TextSource(
StaticValueProvider.of(input), EmptyMatchTreatment.DISALLOW, delimiter);
StaticValueProvider.of(input),
EmptyMatchTreatment.DISALLOW,
delimiter,
skipHeaderLines);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public abstract class TextRowCountEstimator {
@SuppressWarnings("mutable")
public abstract byte @Nullable [] getDelimiters();

public abstract int getSkipHeaderLines();

public abstract String getFilePattern();

public abstract Compression getCompression();
Expand All @@ -62,7 +64,8 @@ public static TextRowCountEstimator.Builder builder() {
.setNumSampledBytesPerFile(DEFAULT_NUM_BYTES_PER_FILE)
.setCompression(DEFAULT_COMPRESSION)
.setDirectoryTreatment(DEFAULT_DIRECTORY_TREATMENT)
.setEmptyMatchTreatment(DEFAULT_EMPTY_MATCH_TREATMENT);
.setEmptyMatchTreatment(DEFAULT_EMPTY_MATCH_TREATMENT)
.setSkipHeaderLines(0);
}

/**
Expand Down Expand Up @@ -114,7 +117,8 @@ public Double estimateRowCount(PipelineOptions pipelineOptions)
new TextSource(
ValueProvider.StaticValueProvider.of(file.getMetadata().resourceId().toString()),
getEmptyMatchTreatment(),
getDelimiters());
getDelimiters(),
getSkipHeaderLines());
FileBasedSource<String> source =
CompressedSource.from(textSource).withCompression(file.getCompression());
try (BoundedSource.BoundedReader<String> reader =
Expand Down Expand Up @@ -160,6 +164,8 @@ public abstract Builder setDirectoryTreatment(

public abstract Builder setDelimiters(byte @Nullable [] delimiters);

public abstract Builder setSkipHeaderLines(int skipHeaderLines);

public abstract Builder setFilePattern(String filePattern);

public abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment);
Expand Down
64 changes: 54 additions & 10 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,26 +56,43 @@
public class TextSource extends FileBasedSource<String> {
byte[] delimiter;

int skipHeaderLines;

public TextSource(
ValueProvider<String> fileSpec, EmptyMatchTreatment emptyMatchTreatment, byte[] delimiter) {
ValueProvider<String> fileSpec,
EmptyMatchTreatment emptyMatchTreatment,
byte[] delimiter,
int skipHeaderLines) {
super(fileSpec, emptyMatchTreatment, 1L);
this.delimiter = delimiter;
this.skipHeaderLines = skipHeaderLines;
}

public TextSource(MatchResult.Metadata metadata, long start, long end, byte[] delimiter) {
/**
 * Creates a source for the given file spec that skips no header lines.
 *
 * <p>Delegates to the four-argument constructor with a header-line count of {@code 0}.
 */
public TextSource(
ValueProvider<String> fileSpec, EmptyMatchTreatment emptyMatchTreatment, byte[] delimiter) {
this(fileSpec, emptyMatchTreatment, delimiter, 0);
}

/**
 * Creates a source for a single file described by {@code metadata}, bounded by {@code start}
 * and {@code end}, remembering the record delimiter and the number of header lines to skip.
 *
 * @param metadata metadata of the file to read
 * @param start start offset passed to the superclass
 * @param end end offset passed to the superclass
 * @param delimiter record delimiter bytes stored on this source
 * @param skipHeaderLines number of header lines the reader should skip
 */
public TextSource(
    MatchResult.Metadata metadata, long start, long end, byte[] delimiter, int skipHeaderLines) {
  super(metadata, 1L, start, end);
  this.skipHeaderLines = skipHeaderLines;
  this.delimiter = delimiter;
}

/**
 * Creates a source for a single file range that skips no header lines.
 *
 * <p>Delegates to the five-argument constructor with a header-line count of {@code 0}.
 */
public TextSource(MatchResult.Metadata metadata, long start, long end, byte[] delimiter) {
this(metadata, start, end, delimiter, 0);
}

@Override
protected FileBasedSource<String> createForSubrangeOfFile(
MatchResult.Metadata metadata, long start, long end) {
return new TextSource(metadata, start, end, delimiter);
return new TextSource(metadata, start, end, delimiter, skipHeaderLines);
}

@Override
protected FileBasedReader<String> createSingleFileReader(PipelineOptions options) {
return new TextBasedReader(this, delimiter);
return new TextBasedReader(this, delimiter, skipHeaderLines);
}

@Override
Expand All @@ -98,6 +115,7 @@ static class TextBasedReader extends FileBasedReader<String> {
private static final byte LF = '\n';

private final byte @Nullable [] delimiter;
private final int skipHeaderLines;
private final ByteArrayOutputStream str;
private final byte[] buffer;
private final ByteBuffer byteBuffer;
Expand All @@ -112,11 +130,16 @@ static class TextBasedReader extends FileBasedReader<String> {
private boolean skipLineFeedAtStart; // skip an LF if at the start of the next buffer

// Convenience constructor: delegates to the three-argument form with zero header lines to skip.
private TextBasedReader(TextSource source, byte[] delimiter) {
this(source, delimiter, 0);
}

private TextBasedReader(TextSource source, byte[] delimiter, int skipHeaderLines) {
super(source);
this.buffer = new byte[READ_BUFFER_SIZE];
this.str = new ByteArrayOutputStream();
this.byteBuffer = ByteBuffer.wrap(buffer);
this.delimiter = delimiter;
this.skipHeaderLines = skipHeaderLines;
}

@Override
Expand Down Expand Up @@ -171,21 +194,42 @@ protected void startReading(ReadableByteChannel channel) throws IOException {
} else {
startOfNextRecord = bufferPosn = (int) requiredPosition;
}
skipHeader(skipHeaderLines, true);
} else {
((SeekableByteChannel) channel).position(requiredPosition);
startOfNextRecord = requiredPosition;
skipHeader(skipHeaderLines, false);
if (requiredPosition > startOfNextRecord) {
((SeekableByteChannel) channel).position(requiredPosition);
startOfNextRecord = requiredPosition;
bufferLength = bufferPosn = 0;
}
// Read and discard the next record ensuring that startOfNextRecord and bufferPosn point
// to the beginning of the next record.
readNextRecord();
currentValue = null;
}

// Read and discard the next record ensuring that startOfNextRecord and bufferPosn point
// to the beginning of the next record.
readNextRecord();
currentValue = null;
} else {
// Check to see if we start with the UTF_BOM bytes skipping them if present.
if (fileStartsWithBom()) {
startOfNextRecord = bufferPosn = UTF8_BOM.size();
}
skipHeader(skipHeaderLines, false);
}
}

/**
 * Reads and discards leading records, then clears {@code currentValue}.
 *
 * <ul>
 *   <li>{@code headerLines == 1}: one record is read and discarded, regardless of {@code
 *       skipFirstLine}.
 *   <li>{@code headerLines > 1}: the channel is repositioned to offset 0 and {@code headerLines}
 *       records are read and discarded.
 *   <li>{@code headerLines == 0} and {@code skipFirstLine}: one record is read and discarded.
 * </ul>
 *
 * <p>NOTE(review): a negative {@code headerLines} matches none of the branches, so nothing is
 * read even when {@code skipFirstLine} is true. Confirm callers never pass a negative value.
 *
 * @param headerLines number of header lines to skip
 * @param skipFirstLine whether one record should be discarded when there are no header lines
 * @throws IOException if reading a record or repositioning the channel fails
 */
private void skipHeader(int headerLines, boolean skipFirstLine) throws IOException {
if (headerLines == 1) {
readNextRecord();
} else if (headerLines > 1) {
// this will be expensive
// NOTE(review): the cast assumes inChannel is a SeekableByteChannel; verify that this holds for
// every channel this reader can be started with (e.g. non-seekable inputs), otherwise this
// throws ClassCastException.
// NOTE(review): only the channel position is reset here; presumably any bytes already buffered
// by this reader are still valid after the seek. TODO confirm.
((SeekableByteChannel) inChannel).position(0);
for (int line = 0; line < headerLines; ++line) {
readNextRecord();
}
} else if (headerLines == 0 && skipFirstLine) {
readNextRecord();
}
// Discard whatever value the skipped records may have left behind.
currentValue = null;
}

private boolean fileStartsWithBom() throws IOException {
Expand Down
Loading

0 comments on commit bf47289

Please sign in to comment.