From 1d917d84747e4e9ce6f19ff130b89c9f0ccbaabe Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Mon, 30 Oct 2023 18:05:48 +0000 Subject: [PATCH 01/19] add header removal --- .../java/org/apache/beam/sdk/io/TextIO.java | 33 ++++++++-- .../beam/sdk/io/TextRowCountEstimator.java | 6 +- .../org/apache/beam/sdk/io/TextSource.java | 20 ++++-- .../apache/beam/sdk/io/TextIOReadTest.java | 61 +++++++++++++++---- 4 files changed, 98 insertions(+), 22 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 33beff23b311..dbe816ec01de 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -191,6 +191,7 @@ public static Read read() { return new AutoValue_TextIO_Read.Builder() .setCompression(Compression.AUTO) .setHintMatchesManyFiles(false) + .setRemoveHeader(false) .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW)) .build(); } @@ -214,6 +215,7 @@ public static Read read() { public static ReadAll readAll() { return new AutoValue_TextIO_ReadAll.Builder() .setCompression(Compression.AUTO) + .setRemoveHeader(false) .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD)) .build(); } @@ -286,6 +288,8 @@ public abstract static class Read extends PTransform @SuppressWarnings("mutable") // this returns an array that can be mutated by the caller abstract byte @Nullable [] getDelimiter(); + abstract boolean getRemoveHeader(); + abstract Builder toBuilder(); @AutoValue.Builder @@ -300,6 +304,8 @@ abstract static class Builder { abstract Builder setDelimiter(byte @Nullable [] delimiter); + abstract Builder setRemoveHeader(boolean removeHeader); + abstract Read build(); } @@ -396,6 +402,10 @@ public Read withDelimiter(byte[] delimiter) { return toBuilder().setDelimiter(delimiter).build(); } + public Read withRemoveHeader(boolean removeHeader){ + return toBuilder().setRemoveHeader(removeHeader).build(); + } + static boolean isSelfOverlapping(byte[] s) { // s self-overlaps if v exists such as s = vu = wv with u and w non empty for (int i = 1; i < s.length - 1; ++i) { @@ -422,7 +432,7 @@ public PCollection expand(PBegin input) { FileIO.readMatches() .withCompression(getCompression()) .withDirectoryTreatment(DirectoryTreatment.PROHIBIT)) - .apply("Via ReadFiles", readFiles().withDelimiter(getDelimiter())); + .apply("Via ReadFiles", readFiles().withDelimiter(getDelimiter()).withRemoveHeader(getRemoveHeader())); } // Helper to create a source specific to the requested compression type. @@ -431,7 +441,7 @@ protected FileBasedSource getSource() { new TextSource( getFilepattern(), getMatchConfiguration().getEmptyMatchTreatment(), - getDelimiter())) + getDelimiter(), getRemoveHeader())) .withCompression(getCompression()); } @@ -468,6 +478,8 @@ public abstract static class ReadAll @SuppressWarnings("mutable") // this returns an array that can be mutated by the caller abstract byte @Nullable [] getDelimiter(); + abstract boolean getRemoveHeader(); + abstract Builder toBuilder(); @AutoValue.Builder @@ -477,6 +489,7 @@ abstract static class Builder { abstract Builder setCompression(Compression compression); abstract Builder setDelimiter(byte @Nullable [] delimiter); + abstract Builder setRemoveHeader(boolean removeHeader); abstract ReadAll build(); } @@ -560,6 +573,8 @@ public abstract static class ReadFiles @SuppressWarnings("mutable") // this returns an array that can be mutated by the caller abstract byte @Nullable [] getDelimiter(); + abstract boolean getRemoveHeader(); + abstract Builder toBuilder(); @AutoValue.Builder @@ -568,6 +583,8 @@ abstract static class Builder { abstract Builder setDelimiter(byte @Nullable [] delimiter); + abstract Builder setRemoveHeader(boolean removeHeader); + abstract ReadFiles build(); } @@ -581,13 +598,17 @@ public ReadFiles withDelimiter(byte[] delimiter) { return toBuilder().setDelimiter(delimiter).build(); } + public ReadFiles withRemoveHeader(boolean removeHeader){ + return toBuilder().setRemoveHeader(removeHeader).build(); + } + @Override public PCollection expand(PCollection input) { return input.apply( "Read all via FileBasedSource", new ReadAllViaFileBasedSource<>( getDesiredBundleSizeBytes(), - new CreateTextSourceFn(getDelimiter()), + new CreateTextSourceFn(getDelimiter(), getRemoveHeader()), StringUtf8Coder.of())); } @@ -602,15 +623,17 @@ public void populateDisplayData(DisplayData.Builder builder) { private static class CreateTextSourceFn implements SerializableFunction> { private byte[] delimiter; + private boolean removeHeader; - private CreateTextSourceFn(byte[] delimiter) { + private CreateTextSourceFn(byte[] delimiter, boolean removeHeader) { this.delimiter = delimiter; + this.removeHeader = removeHeader; } @Override public FileBasedSource apply(String input) { return new TextSource( - StaticValueProvider.of(input), EmptyMatchTreatment.DISALLOW, delimiter); + StaticValueProvider.of(input), EmptyMatchTreatment.DISALLOW, delimiter, removeHeader); } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextRowCountEstimator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextRowCountEstimator.java index 32b7fb12f414..45bd21802eb2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextRowCountEstimator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextRowCountEstimator.java @@ -46,6 +46,8 @@ public abstract class TextRowCountEstimator { @SuppressWarnings("mutable") public abstract byte @Nullable [] getDelimiters(); + public abstract boolean getRemoveHeader(); + public abstract String getFilePattern(); public abstract Compression getCompression(); @@ -114,7 +116,7 @@ public Double estimateRowCount(PipelineOptions pipelineOptions) new TextSource( ValueProvider.StaticValueProvider.of(file.getMetadata().resourceId().toString()), getEmptyMatchTreatment(), - getDelimiters()); + getDelimiters(), getRemoveHeader()); FileBasedSource source = CompressedSource.from(textSource).withCompression(file.getCompression()); try (BoundedSource.BoundedReader reader = @@ -160,6 +162,8 @@ public abstract Builder setDirectoryTreatment( public abstract Builder setDelimiters(byte @Nullable [] delimiters); + public abstract Builder setRemoveHeader(boolean removeHeader); + public abstract Builder setFilePattern(String filePattern); public abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java index bef30dffa8ac..7c22976fd16c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java @@ -56,26 +56,30 @@ public class TextSource extends FileBasedSource { byte[] delimiter; + boolean removeHeader; + public TextSource( - ValueProvider fileSpec, EmptyMatchTreatment emptyMatchTreatment, byte[] delimiter) { + ValueProvider fileSpec, EmptyMatchTreatment emptyMatchTreatment, byte[] delimiter, boolean removeHeader) { super(fileSpec, emptyMatchTreatment, 1L); this.delimiter = delimiter; + this.removeHeader = removeHeader; } - public TextSource(MatchResult.Metadata metadata, long start, long end, byte[] delimiter) { + public TextSource(MatchResult.Metadata metadata, long start, long end, byte[] delimiter, boolean removeHeader) { super(metadata, 1L, start, end); this.delimiter = delimiter; + this.removeHeader = removeHeader; } @Override protected FileBasedSource createForSubrangeOfFile( MatchResult.Metadata metadata, long start, long end) { - return new TextSource(metadata, start, end, delimiter); + return new TextSource(metadata, start, end, delimiter, removeHeader); } @Override protected FileBasedReader createSingleFileReader(PipelineOptions options) { - return new TextBasedReader(this, delimiter); + return new TextBasedReader(this, delimiter, removeHeader); } @Override @@ -98,6 +102,7 @@ static class TextBasedReader extends FileBasedReader { private static final byte LF = '\n'; private final byte @Nullable [] delimiter; + private final boolean removeHeader; private final ByteArrayOutputStream str; private final byte[] buffer; private final ByteBuffer byteBuffer; @@ -111,12 +116,13 @@ static class TextBasedReader extends FileBasedReader { private int bufferPosn = 0; // the current position in the buffer private boolean skipLineFeedAtStart; // skip an LF if at the start of the next buffer - private TextBasedReader(TextSource source, byte[] delimiter) { + private TextBasedReader(TextSource source, byte[] delimiter, boolean removeHeader) { super(source); this.buffer = new byte[READ_BUFFER_SIZE]; this.str = new ByteArrayOutputStream(); this.byteBuffer = ByteBuffer.wrap(buffer); this.delimiter = delimiter; + this.removeHeader = removeHeader; } @Override @@ -185,6 +191,10 @@ protected void startReading(ReadableByteChannel channel) throws IOException { if (fileStartsWithBom()) { startOfNextRecord = bufferPosn = UTF8_BOM.size(); } + if(removeHeader) { + readNextRecord(); + currentValue = null; + } } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java index 379345b1001e..12f9c8745421 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java @@ -245,15 +245,15 @@ private static File createZipFile( } private static TextSource prepareSource( - TemporaryFolder temporaryFolder, byte[] data, @Nullable byte[] delimiter) throws IOException { + TemporaryFolder temporaryFolder, byte[] data, @Nullable byte[] delimiter, boolean removeHeader) throws IOException { Path path = temporaryFolder.newFile().toPath(); Files.write(path, data); - return getTextSource(path.toString(), delimiter); + return getTextSource(path.toString(), delimiter, removeHeader); } - public static TextSource getTextSource(String path, @Nullable byte[] delimiter) { + public static TextSource getTextSource(String path, @Nullable byte[] delimiter, boolean removeHeader) { return new TextSource( - ValueProvider.StaticValueProvider.of(path), EmptyMatchTreatment.DISALLOW, delimiter); + ValueProvider.StaticValueProvider.of(path), EmptyMatchTreatment.DISALLOW, delimiter, removeHeader); } private static String getFileSuffix(Compression compression) { @@ -384,7 +384,7 @@ public void testReadLinesWithDefaultDelimiterAndZeroAndOneLengthReturningChannel Files.write(path, line.getBytes(UTF_8)); Metadata metadata = FileSystems.matchSingleFileSpec(path.toString()); FileBasedSource source = - getTextSource(path.toString(), null) + getTextSource(path.toString(), null, false) .createForSubrangeOfFile(metadata, 0, metadata.sizeBytes()); FileBasedReader reader = source.createSingleFileReader(PipelineOptionsFactory.create()); @@ -433,7 +433,46 @@ public void testSplittingSource() throws Exception { } private TextSource prepareSource(byte[] data) throws IOException { - return TextIOReadTest.prepareSource(tempFolder, data, null); + return TextIOReadTest.prepareSource(tempFolder, data, null, false); + } + + private void runTestReadWithData(byte[] data, List expectedResults) throws Exception { + TextSource source = prepareSource(data); + List actual = SourceTestUtils.readFromSource(source, PipelineOptionsFactory.create()); + assertThat( + actual, containsInAnyOrder(new ArrayList<>(expectedResults).toArray(new String[0]))); + } + } + + + /** Tests for reading files with/without header */ + @RunWith(Parameterized.class) + public static class ReadWithoutHeaderTest { + private static final ImmutableList EXPECTED = ImmutableList.of("asdf", "hjkl", "xyz"); + @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Parameterized.Parameters(name = "{index}: {0}") + public static Iterable data() { + return ImmutableList.builder() + .add(new Object[] {"\n\n\n", ImmutableList.of("", "")}) + .add(new Object[] {"\n", ImmutableList.of()}) + .add(new Object[] {"header\nasdf\nhjkl\nxyz\n", EXPECTED}) + .build(); + } + + @Parameterized.Parameter(0) + public String line; + + @Parameterized.Parameter(1) + public ImmutableList expected; + + @Test + public void testReadLines() throws Exception { + runTestReadWithData(line.getBytes(UTF_8), expected); + } + + private TextSource prepareSource(byte[] data) throws IOException { + return TextIOReadTest.prepareSource(tempFolder, data, null, true); } private void runTestReadWithData(byte[] data, List expectedResults) throws Exception { @@ -477,7 +516,7 @@ public static Iterable data() { @Test public void testReadLinesWithCustomDelimiter() throws Exception { SourceTestUtils.assertSplitAtFractionExhaustive( - TextIOReadTest.prepareSource(tempFolder, testCase.getBytes(UTF_8), new byte[] {'|', '*'}), + TextIOReadTest.prepareSource(tempFolder, testCase.getBytes(UTF_8), new byte[] {'|', '*'}, false), PipelineOptionsFactory.create()); } @@ -489,7 +528,7 @@ public void testReadLinesWithCustomDelimiterAndZeroAndOneLengthReturningChannel( Files.write(path, testCase.getBytes(UTF_8)); Metadata metadata = FileSystems.matchSingleFileSpec(path.toString()); FileBasedSource source = - getTextSource(path.toString(), delimiter) + getTextSource(path.toString(), delimiter, false) .createForSubrangeOfFile(metadata, 0, metadata.sizeBytes()); FileBasedReader reader = source.createSingleFileReader(PipelineOptionsFactory.create()); @@ -743,7 +782,7 @@ public void testTextIOGetName() { } private TextSource prepareSource(byte[] data) throws IOException { - return TextIOReadTest.prepareSource(tempFolder, data, null); + return TextIOReadTest.prepareSource(tempFolder, data, null, false); } @Test @@ -977,7 +1016,7 @@ public void testReadFilesWithFilename() throws IOException { new TextSource( ValueProvider.StaticValueProvider.of(input), EmptyMatchTreatment.DISALLOW, - new byte[] {'\n'}); + new byte[] {'\n'}, false); PCollection> lines = p.apply( @@ -1102,7 +1141,7 @@ public void processElement(ProcessContext c) { ValueProvider.StaticValueProvider.of(file.getMetadata().resourceId().getFilename()); // Create a TextSource, passing null as the delimiter to use the default // delimiters ('\n', '\r', or '\r\n'). - TextSource textSource = new TextSource(filenameProvider, null, null); + TextSource textSource = new TextSource(filenameProvider, null, null, false); try { BoundedSource.BoundedReader reader = textSource From e3e547d7cc422df0ddc4f0e3fc0607ebcc51e97a Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Mon, 30 Oct 2023 19:19:13 +0000 Subject: [PATCH 02/19] spotless --- .../java/org/apache/beam/sdk/io/TextIO.java | 12 +++++++---- .../beam/sdk/io/TextRowCountEstimator.java | 3 ++- .../org/apache/beam/sdk/io/TextSource.java | 10 ++++++--- .../apache/beam/sdk/io/TextIOReadTest.java | 21 +++++++++++++------ 4 files changed, 32 insertions(+), 14 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index dbe816ec01de..68c7cb302008 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -402,7 +402,7 @@ public Read withDelimiter(byte[] delimiter) { return toBuilder().setDelimiter(delimiter).build(); } - public Read withRemoveHeader(boolean removeHeader){ + public Read withRemoveHeader(boolean removeHeader) { return toBuilder().setRemoveHeader(removeHeader).build(); } @@ -432,7 +432,9 @@ public PCollection expand(PBegin input) { FileIO.readMatches() .withCompression(getCompression()) .withDirectoryTreatment(DirectoryTreatment.PROHIBIT)) - .apply("Via ReadFiles", readFiles().withDelimiter(getDelimiter()).withRemoveHeader(getRemoveHeader())); + .apply( + "Via ReadFiles", + readFiles().withDelimiter(getDelimiter()).withRemoveHeader(getRemoveHeader())); } // Helper to create a source specific to the requested compression type. @@ -441,7 +443,8 @@ protected FileBasedSource getSource() { new TextSource( getFilepattern(), getMatchConfiguration().getEmptyMatchTreatment(), - getDelimiter(), getRemoveHeader())) + getDelimiter(), + getRemoveHeader())) .withCompression(getCompression()); } @@ -489,6 +492,7 @@ abstract static class Builder { abstract Builder setCompression(Compression compression); abstract Builder setDelimiter(byte @Nullable [] delimiter); + abstract Builder setRemoveHeader(boolean removeHeader); abstract ReadAll build(); @@ -598,7 +602,7 @@ public ReadFiles withDelimiter(byte[] delimiter) { return toBuilder().setDelimiter(delimiter).build(); } - public ReadFiles withRemoveHeader(boolean removeHeader){ + public ReadFiles withRemoveHeader(boolean removeHeader) { return toBuilder().setRemoveHeader(removeHeader).build(); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextRowCountEstimator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextRowCountEstimator.java index 45bd21802eb2..7ee00ae982cc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextRowCountEstimator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextRowCountEstimator.java @@ -116,7 +116,8 @@ public Double estimateRowCount(PipelineOptions pipelineOptions) new TextSource( ValueProvider.StaticValueProvider.of(file.getMetadata().resourceId().toString()), getEmptyMatchTreatment(), - getDelimiters(), getRemoveHeader()); + getDelimiters(), + getRemoveHeader()); FileBasedSource source = CompressedSource.from(textSource).withCompression(file.getCompression()); try (BoundedSource.BoundedReader reader = diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java index 7c22976fd16c..29d38410bbca 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java @@ -59,13 +59,17 @@ public class TextSource extends FileBasedSource { boolean removeHeader; public TextSource( - ValueProvider fileSpec, EmptyMatchTreatment emptyMatchTreatment, byte[] delimiter, boolean removeHeader) { + ValueProvider fileSpec, + EmptyMatchTreatment emptyMatchTreatment, + byte[] delimiter, + boolean removeHeader) { super(fileSpec, emptyMatchTreatment, 1L); this.delimiter = delimiter; this.removeHeader = removeHeader; } - public TextSource(MatchResult.Metadata metadata, long start, long end, byte[] delimiter, boolean removeHeader) { + public TextSource( + MatchResult.Metadata metadata, long start, long end, byte[] delimiter, boolean removeHeader) { super(metadata, 1L, start, end); this.delimiter = delimiter; this.removeHeader = removeHeader; @@ -191,7 +195,7 @@ protected void startReading(ReadableByteChannel channel) throws IOException { if (fileStartsWithBom()) { startOfNextRecord = bufferPosn = UTF8_BOM.size(); } - if(removeHeader) { + if (removeHeader) { readNextRecord(); currentValue = null; } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java index 12f9c8745421..d7cf05e3737a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java @@ -245,15 +245,23 @@ private static File createZipFile( } private static TextSource prepareSource( - TemporaryFolder temporaryFolder, byte[] data, @Nullable byte[] delimiter, boolean removeHeader) throws IOException { + TemporaryFolder temporaryFolder, + byte[] data, + @Nullable byte[] delimiter, + boolean removeHeader) + throws IOException { Path path = temporaryFolder.newFile().toPath(); Files.write(path, data); return getTextSource(path.toString(), delimiter, removeHeader); } - public static TextSource getTextSource(String path, @Nullable byte[] delimiter, boolean removeHeader) { + public static TextSource getTextSource( + String path, @Nullable byte[] delimiter, boolean removeHeader) { return new TextSource( - ValueProvider.StaticValueProvider.of(path), EmptyMatchTreatment.DISALLOW, delimiter, removeHeader); + ValueProvider.StaticValueProvider.of(path), + EmptyMatchTreatment.DISALLOW, + delimiter, + removeHeader); } private static String getFileSuffix(Compression compression) { @@ -444,7 +452,6 @@ private void runTestReadWithData(byte[] data, List expectedResults) thro } } - /** Tests for reading files with/without header */ @RunWith(Parameterized.class) public static class ReadWithoutHeaderTest { @@ -516,7 +523,8 @@ public static Iterable data() { @Test public void testReadLinesWithCustomDelimiter() throws Exception { SourceTestUtils.assertSplitAtFractionExhaustive( - TextIOReadTest.prepareSource(tempFolder, testCase.getBytes(UTF_8), new byte[] {'|', '*'}, false), + TextIOReadTest.prepareSource( + tempFolder, testCase.getBytes(UTF_8), new byte[] {'|', '*'}, false), PipelineOptionsFactory.create()); } @@ -1016,7 +1024,8 @@ public void testReadFilesWithFilename() throws IOException { new TextSource( ValueProvider.StaticValueProvider.of(input), EmptyMatchTreatment.DISALLOW, - new byte[] {'\n'}, false); + new byte[] {'\n'}, + false); PCollection> lines = p.apply( From 63f3dec7bf08315f87baae4351ea74f9d65759a0 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Mon, 30 Oct 2023 19:31:53 +0000 Subject: [PATCH 03/19] spotless --- .../src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java index d7cf05e3737a..4598e022eb92 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java @@ -452,7 +452,7 @@ private void runTestReadWithData(byte[] data, List expectedResults) thro } } - /** Tests for reading files with/without header */ + /** Tests for reading files with/without header. */ @RunWith(Parameterized.class) public static class ReadWithoutHeaderTest { private static final ImmutableList EXPECTED = ImmutableList.of("asdf", "hjkl", "xyz"); From c4f3d6c0683c5d370f9ea481667669db80745be9 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Mon, 30 Oct 2023 19:45:22 +0000 Subject: [PATCH 04/19] tests --- .../java/org/apache/beam/sdk/jmh/io/TextSourceBenchmark.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/core/jmh/src/main/java/org/apache/beam/sdk/jmh/io/TextSourceBenchmark.java b/sdks/java/core/jmh/src/main/java/org/apache/beam/sdk/jmh/io/TextSourceBenchmark.java index 171f9c01dd83..30ffd5579f80 100644 --- a/sdks/java/core/jmh/src/main/java/org/apache/beam/sdk/jmh/io/TextSourceBenchmark.java +++ b/sdks/java/core/jmh/src/main/java/org/apache/beam/sdk/jmh/io/TextSourceBenchmark.java @@ -75,7 +75,7 @@ public void deleteFile() throws Exception { @Benchmark public void benchmarkTextSource(Data data) throws Exception { Source.Reader reader = - ((FileBasedSource) TextIOReadTest.getTextSource(data.pathString, null)) + ((FileBasedSource) TextIOReadTest.getTextSource(data.pathString, null, false)) .createReader(PipelineOptionsFactory.create()); int length = 0; int linesRead = 0; From 79015dc0b41412f52b757502d295375989a51bd7 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Mon, 30 Oct 2023 19:57:35 +0000 Subject: [PATCH 05/19] tests --- .../java/org/apache/beam/sdk/io/TextRowCountEstimator.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextRowCountEstimator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextRowCountEstimator.java index 7ee00ae982cc..e1f166c1a719 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextRowCountEstimator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextRowCountEstimator.java @@ -64,7 +64,8 @@ public static TextRowCountEstimator.Builder builder() { .setNumSampledBytesPerFile(DEFAULT_NUM_BYTES_PER_FILE) .setCompression(DEFAULT_COMPRESSION) .setDirectoryTreatment(DEFAULT_DIRECTORY_TREATMENT) - .setEmptyMatchTreatment(DEFAULT_EMPTY_MATCH_TREATMENT); + .setEmptyMatchTreatment(DEFAULT_EMPTY_MATCH_TREATMENT) + .setRemoveHeader(false); } /** From ae385f952cb84a0fcdf3a5f6ba55f18eb0323c28 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Mon, 30 Oct 2023 20:52:28 +0000 Subject: [PATCH 06/19] tests --- sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 68c7cb302008..434adc57cab5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -230,6 +230,7 @@ public static ReadFiles readFiles() { // but is not so large as to exhaust a typical runner's maximum amount of output per // ProcessElement call. .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES) + .setRemoveHeader(false) .build(); } From 85034e4aed4fbe5ff2984b468c621c107761538e Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Tue, 31 Oct 2023 13:04:46 +0000 Subject: [PATCH 07/19] changes --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 6561cc2b56df..f4ee01b3d1ec 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -74,7 +74,7 @@ should handle this. ([#25252](https://github.com/apache/beam/issues/25252)). jobs using the DataStream API. By default the option is set to false, so the batch jobs are still executed using the DataSet API. * `upload_graph` as one of the Experiments options for DataflowRunner is no longer required when the graph is larger than 10MB for Java SDK ([PR#28621](https://github.com/apache/beam/pull/28621). - +* `TextIO` supports skipping header. ## Breaking Changes * X behavior was changed ([#X](https://github.com/apache/beam/issues/X)). From a64772431f469fc5420c706ad748136504866c3b Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Tue, 31 Oct 2023 14:32:47 +0000 Subject: [PATCH 08/19] avoiding breaking change --- .../core/src/main/java/org/apache/beam/sdk/io/TextSource.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java index 29d38410bbca..5b0090d62f2d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java @@ -120,6 +120,10 @@ static class TextBasedReader extends FileBasedReader { private int bufferPosn = 0; // the current position in the buffer private boolean skipLineFeedAtStart; // skip an LF if at the start of the next buffer + private TextBasedReader(TextSource source, byte[] delimiter) { + this(source, delimiter, false); + } + private TextBasedReader(TextSource source, byte[] delimiter, boolean removeHeader) { super(source); this.buffer = new byte[READ_BUFFER_SIZE]; From 215340416be6a39baa3608d110d536a0a1189085 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Thu, 2 Nov 2023 22:41:27 +0000 Subject: [PATCH 09/19] add multiple header lines removal --- .../java/org/apache/beam/sdk/io/TextIO.java | 40 ++++++------- .../beam/sdk/io/TextRowCountEstimator.java | 8 +-- .../org/apache/beam/sdk/io/TextSource.java | 56 ++++++++++++------- .../apache/beam/sdk/io/TextIOReadTest.java | 24 ++++---- 4 files changed, 73 insertions(+), 55 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 434adc57cab5..936bfae4bee0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -191,7 +191,7 @@ public static Read read() { return new AutoValue_TextIO_Read.Builder() .setCompression(Compression.AUTO) .setHintMatchesManyFiles(false) - .setRemoveHeader(false) + .setSkipHeaderLines(0) .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW)) .build(); } @@ -215,7 +215,7 @@ public static Read read() { public static ReadAll readAll() { return new AutoValue_TextIO_ReadAll.Builder() .setCompression(Compression.AUTO) - .setRemoveHeader(false) + .setSkipHeaderLines(0) .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD)) .build(); } @@ -230,7 +230,7 @@ public static ReadFiles readFiles() { // but is not so large as to exhaust a typical runner's maximum amount of output per // ProcessElement call. .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES) - .setRemoveHeader(false) + .setSkipHeaderLines(0) .build(); } @@ -289,7 +289,7 @@ public abstract static class Read extends PTransform @SuppressWarnings("mutable") // this returns an array that can be mutated by the caller abstract byte @Nullable [] getDelimiter(); - abstract boolean getRemoveHeader(); + abstract int getSkipHeaderLines(); abstract Builder toBuilder(); @@ -305,7 +305,7 @@ abstract static class Builder { abstract Builder setDelimiter(byte @Nullable [] delimiter); - abstract Builder setRemoveHeader(boolean removeHeader); + abstract Builder setSkipHeaderLines(int skipHeaderLines); abstract Read build(); } @@ -403,8 +403,8 @@ public Read withDelimiter(byte[] delimiter) { return toBuilder().setDelimiter(delimiter).build(); } - public Read withRemoveHeader(boolean removeHeader) { - return toBuilder().setRemoveHeader(removeHeader).build(); + public Read withSkipHeaderLines(int skipHeaderLines) { + return toBuilder().setSkipHeaderLines(skipHeaderLines).build(); } static boolean isSelfOverlapping(byte[] s) { @@ -435,7 +435,7 @@ public PCollection expand(PBegin input) { .withDirectoryTreatment(DirectoryTreatment.PROHIBIT)) .apply( "Via ReadFiles", - readFiles().withDelimiter(getDelimiter()).withRemoveHeader(getRemoveHeader())); + readFiles().withDelimiter(getDelimiter()).withSkipHeaderLines(getSkipHeaderLines())); } // Helper to create a source specific to the requested compression type. @@ -445,7 +445,7 @@ protected FileBasedSource getSource() { getFilepattern(), getMatchConfiguration().getEmptyMatchTreatment(), getDelimiter(), - getRemoveHeader())) + getSkipHeaderLines())) .withCompression(getCompression()); } @@ -482,7 +482,7 @@ public abstract static class ReadAll @SuppressWarnings("mutable") // this returns an array that can be mutated by the caller abstract byte @Nullable [] getDelimiter(); - abstract boolean getRemoveHeader(); + abstract int getSkipHeaderLines(); abstract Builder toBuilder(); @@ -494,7 +494,7 @@ abstract static class Builder { abstract Builder setDelimiter(byte @Nullable [] delimiter); - abstract Builder setRemoveHeader(boolean removeHeader); + abstract Builder setSkipHeaderLines(int skipHeaderLines); abstract ReadAll build(); } @@ -578,7 +578,7 @@ public abstract static class ReadFiles @SuppressWarnings("mutable") // this returns an array that can be mutated by the caller abstract byte @Nullable [] getDelimiter(); - abstract boolean getRemoveHeader(); + abstract int getSkipHeaderLines(); abstract Builder toBuilder(); @@ -588,7 +588,7 @@ abstract static class Builder { abstract Builder setDelimiter(byte @Nullable [] delimiter); - abstract Builder setRemoveHeader(boolean removeHeader); + abstract Builder setSkipHeaderLines(int skipHeaderLines); abstract ReadFiles build(); } @@ -603,8 +603,8 @@ public ReadFiles withDelimiter(byte[] delimiter) { return toBuilder().setDelimiter(delimiter).build(); } - public ReadFiles withRemoveHeader(boolean removeHeader) { - return toBuilder().setRemoveHeader(removeHeader).build(); + public ReadFiles withSkipHeaderLines(int skipHeaderLines) { + return toBuilder().setSkipHeaderLines(skipHeaderLines).build(); } @Override @@ -613,7 +613,7 @@ public PCollection expand(PCollection input) { "Read all via FileBasedSource", new ReadAllViaFileBasedSource<>( getDesiredBundleSizeBytes(), - new CreateTextSourceFn(getDelimiter(), getRemoveHeader()), + new CreateTextSourceFn(getDelimiter(), getSkipHeaderLines()), StringUtf8Coder.of())); } @@ -628,17 +628,17 @@ public void populateDisplayData(DisplayData.Builder builder) { private static class CreateTextSourceFn implements SerializableFunction> { private byte[] delimiter; - private boolean removeHeader; + private int skipHeaderLines; - private CreateTextSourceFn(byte[] delimiter, boolean removeHeader) { + private CreateTextSourceFn(byte[] delimiter, int skipHeaderLines) { this.delimiter = delimiter; - this.removeHeader = removeHeader; + this.skipHeaderLines = skipHeaderLines; } @Override public FileBasedSource apply(String input) { return new TextSource( - StaticValueProvider.of(input), EmptyMatchTreatment.DISALLOW, delimiter, removeHeader); + StaticValueProvider.of(input), EmptyMatchTreatment.DISALLOW, delimiter, skipHeaderLines); } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextRowCountEstimator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextRowCountEstimator.java index e1f166c1a719..8542ce011098 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextRowCountEstimator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextRowCountEstimator.java @@ -46,7 +46,7 @@ public abstract class TextRowCountEstimator { @SuppressWarnings("mutable") public abstract byte @Nullable [] getDelimiters(); - public abstract boolean getRemoveHeader(); + public abstract int getSkipHeaderLines(); public abstract String getFilePattern(); @@ -65,7 +65,7 @@ public static TextRowCountEstimator.Builder builder() { .setCompression(DEFAULT_COMPRESSION) .setDirectoryTreatment(DEFAULT_DIRECTORY_TREATMENT) .setEmptyMatchTreatment(DEFAULT_EMPTY_MATCH_TREATMENT) - .setRemoveHeader(false); + .setSkipHeaderLines(0); } /** @@ -118,7 +118,7 @@ public Double estimateRowCount(PipelineOptions pipelineOptions) ValueProvider.StaticValueProvider.of(file.getMetadata().resourceId().toString()), getEmptyMatchTreatment(), getDelimiters(), - getRemoveHeader()); + getSkipHeaderLines()); FileBasedSource source = CompressedSource.from(textSource).withCompression(file.getCompression()); try (BoundedSource.BoundedReader reader = @@ -164,7 +164,7 @@ public abstract Builder setDirectoryTreatment( public abstract Builder setDelimiters(byte @Nullable [] delimiters); - public abstract Builder setRemoveHeader(boolean removeHeader); + public abstract Builder setSkipHeaderLines(int skipHeaderLines); public abstract Builder setFilePattern(String filePattern); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java index 5b0090d62f2d..64a5cacab997 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java @@ -56,34 +56,34 @@ public class TextSource extends FileBasedSource { byte[] delimiter; - boolean removeHeader; + int skipHeaderLines; public TextSource( ValueProvider fileSpec, EmptyMatchTreatment emptyMatchTreatment, byte[] delimiter, - boolean removeHeader) { + int skipHeaderLines) { super(fileSpec, emptyMatchTreatment, 1L); this.delimiter = delimiter; - this.removeHeader = removeHeader; + this.skipHeaderLines = skipHeaderLines; } public TextSource( - MatchResult.Metadata metadata, long start, long end, byte[] delimiter, boolean removeHeader) { + MatchResult.Metadata metadata, long start, long end, byte[] delimiter, int skipHeaderLines) { super(metadata, 1L, start, end); this.delimiter = delimiter; - this.removeHeader = removeHeader; + this.skipHeaderLines = skipHeaderLines; } @Override protected FileBasedSource createForSubrangeOfFile( MatchResult.Metadata metadata, long start, long end) { - return new TextSource(metadata, start, end, delimiter, removeHeader); + return new TextSource(metadata, start, end, delimiter, skipHeaderLines); } @Override protected FileBasedReader createSingleFileReader(PipelineOptions options) { - return new TextBasedReader(this, delimiter, removeHeader); + return new TextBasedReader(this, delimiter, skipHeaderLines); } @Override @@ -106,7 +106,7 @@ static class TextBasedReader extends FileBasedReader { private static final byte LF = '\n'; private final byte @Nullable [] delimiter; - private final boolean removeHeader; + private final int skipHeaderLines; private final ByteArrayOutputStream str; private final byte[] buffer; private final ByteBuffer byteBuffer; @@ -121,16 +121,16 @@ static class TextBasedReader extends FileBasedReader { private boolean skipLineFeedAtStart; // skip an LF if at the start of the next buffer private TextBasedReader(TextSource source, byte[] delimiter) { - this(source, delimiter, false); + this(source, delimiter, 0); } - private TextBasedReader(TextSource source, byte[] delimiter, boolean removeHeader) { + private TextBasedReader(TextSource source, byte[] delimiter, int skipHeaderLines) { super(source); this.buffer = new byte[READ_BUFFER_SIZE]; this.str = new ByteArrayOutputStream(); this.byteBuffer = ByteBuffer.wrap(buffer); this.delimiter = delimiter; - this.removeHeader = removeHeader; + this.skipHeaderLines = skipHeaderLines; } @Override @@ -185,25 +185,43 @@ protected void startReading(ReadableByteChannel channel) throws IOException { } else { startOfNextRecord = bufferPosn = (int) requiredPosition; } + skipHeader(skipHeaderLines,true); } else { - ((SeekableByteChannel) channel).position(requiredPosition); - startOfNextRecord = requiredPosition; + skipHeader(skipHeaderLines,false); + if(requiredPosition>startOfNextRecord) { + ((SeekableByteChannel) channel).position(requiredPosition); + startOfNextRecord = requiredPosition; + bufferLength=bufferPosn=0; + // Read and discard the next record ensuring that startOfNextRecord and bufferPosn point + // to the beginning of the next record. + readNextRecord(); + currentValue = null; + } } - // Read and discard the next record ensuring that startOfNextRecord and bufferPosn point - // to the beginning of the next record. - readNextRecord(); - currentValue = null; + } else { // Check to see if we start with the UTF_BOM bytes skipping them if present. if (fileStartsWithBom()) { startOfNextRecord = bufferPosn = UTF8_BOM.size(); } - if (removeHeader) { + skipHeader(skipHeaderLines,true); + } + } + + private void skipHeader(int headerLines, boolean skipFirstLine) throws IOException { + if(headerLines == 1){ + readNextRecord(); + }else if(headerLines > 1){ + // this will be expensive + ((SeekableByteChannel) inChannel ).position(0); + for(int line = 0; line < headerLines; ++line){ readNextRecord(); - currentValue = null; } + }else if(headerLines == 0 && skipFirstLine){ + readNextRecord(); } + currentValue = null; } private boolean fileStartsWithBom() throws IOException { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java index 4598e022eb92..3d32536760e7 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java @@ -248,20 +248,20 @@ private static TextSource prepareSource( TemporaryFolder temporaryFolder, byte[] data, @Nullable byte[] delimiter, - boolean removeHeader) + int skipHeaderLines) throws IOException { Path path = temporaryFolder.newFile().toPath(); Files.write(path, data); - return getTextSource(path.toString(), delimiter, removeHeader); + return getTextSource(path.toString(), delimiter, skipHeaderLines); } public static TextSource getTextSource( - String path, @Nullable byte[] delimiter, boolean removeHeader) { + String path, @Nullable byte[] delimiter, int skipHeaderLines) { return new TextSource( ValueProvider.StaticValueProvider.of(path), EmptyMatchTreatment.DISALLOW, delimiter, - removeHeader); + skipHeaderLines); } private static String getFileSuffix(Compression compression) { @@ -392,7 +392,7 @@ public void testReadLinesWithDefaultDelimiterAndZeroAndOneLengthReturningChannel Files.write(path, line.getBytes(UTF_8)); Metadata metadata = FileSystems.matchSingleFileSpec(path.toString()); FileBasedSource source = - getTextSource(path.toString(), null, false) + getTextSource(path.toString(), null, 0) .createForSubrangeOfFile(metadata, 0, metadata.sizeBytes()); FileBasedReader reader = source.createSingleFileReader(PipelineOptionsFactory.create()); @@ -441,7 +441,7 @@ public void testSplittingSource() throws Exception { } private TextSource prepareSource(byte[] data) throws IOException { - return TextIOReadTest.prepareSource(tempFolder, data, null, false); + return TextIOReadTest.prepareSource(tempFolder, data, null, 0); } private void runTestReadWithData(byte[] data, List expectedResults) throws Exception { @@ -479,7 +479,7 @@ public void testReadLines() throws Exception { } private TextSource prepareSource(byte[] data) throws IOException { - return TextIOReadTest.prepareSource(tempFolder, data, null, true); + return TextIOReadTest.prepareSource(tempFolder, data, null, 1); } private void runTestReadWithData(byte[] data, List expectedResults) throws Exception { @@ -524,7 +524,7 @@ public static Iterable data() { public void testReadLinesWithCustomDelimiter() throws Exception { SourceTestUtils.assertSplitAtFractionExhaustive( TextIOReadTest.prepareSource( - tempFolder, testCase.getBytes(UTF_8), new byte[] {'|', '*'}, false), + tempFolder, testCase.getBytes(UTF_8), new byte[] {'|', '*'}, 1), PipelineOptionsFactory.create()); } @@ -536,7 +536,7 @@ public void testReadLinesWithCustomDelimiterAndZeroAndOneLengthReturningChannel( Files.write(path, testCase.getBytes(UTF_8)); Metadata metadata = FileSystems.matchSingleFileSpec(path.toString()); FileBasedSource source = - getTextSource(path.toString(), delimiter, false) + getTextSource(path.toString(), delimiter, 0) .createForSubrangeOfFile(metadata, 0, metadata.sizeBytes()); FileBasedReader reader = source.createSingleFileReader(PipelineOptionsFactory.create()); @@ -790,7 +790,7 @@ public void testTextIOGetName() { } private TextSource prepareSource(byte[] data) throws IOException { - return TextIOReadTest.prepareSource(tempFolder, data, null, false); + return TextIOReadTest.prepareSource(tempFolder, data, null, 0); } @Test @@ -1025,7 +1025,7 @@ public void testReadFilesWithFilename() throws IOException { ValueProvider.StaticValueProvider.of(input), EmptyMatchTreatment.DISALLOW, new byte[] {'\n'}, - false); + 0); PCollection> lines = p.apply( @@ -1150,7 +1150,7 @@ public void processElement(ProcessContext c) { ValueProvider.StaticValueProvider.of(file.getMetadata().resourceId().getFilename()); // Create a TextSource, passing null as the delimiter to use the default // delimiters ('\n', '\r', or '\r\n'). - TextSource textSource = new TextSource(filenameProvider, null, null, false); + TextSource textSource = new TextSource(filenameProvider, null, null, 0); try { BoundedSource.BoundedReader reader = textSource From 20aeb2cd0cd4d8a3a5a9d5d8d15d071379aa21b1 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Thu, 2 Nov 2023 22:44:23 +0000 Subject: [PATCH 10/19] checkstyle --- .../java/org/apache/beam/sdk/io/TextIO.java | 5 ++++- .../org/apache/beam/sdk/io/TextSource.java | 21 +++++++++---------- .../apache/beam/sdk/io/TextIOReadTest.java | 5 +---- 3 files changed, 15 insertions(+), 16 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 936bfae4bee0..2c7a4fc5d4f5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -638,7 +638,10 @@ private CreateTextSourceFn(byte[] delimiter, int skipHeaderLines) { @Override public FileBasedSource apply(String input) { return new TextSource( - StaticValueProvider.of(input), EmptyMatchTreatment.DISALLOW, delimiter, skipHeaderLines); + StaticValueProvider.of(input), + EmptyMatchTreatment.DISALLOW, + delimiter, + skipHeaderLines); } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java index 64a5cacab997..43d936e0893b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java @@ -185,13 +185,13 @@ protected void startReading(ReadableByteChannel channel) throws IOException { } else { startOfNextRecord = bufferPosn = (int) requiredPosition; } - skipHeader(skipHeaderLines,true); + skipHeader(skipHeaderLines, true); } else { - skipHeader(skipHeaderLines,false); - if(requiredPosition>startOfNextRecord) { + skipHeader(skipHeaderLines, false); + if (requiredPosition > startOfNextRecord) { ((SeekableByteChannel) channel).position(requiredPosition); startOfNextRecord = requiredPosition; - bufferLength=bufferPosn=0; + bufferLength = bufferPosn = 0; // Read and discard the next record ensuring that startOfNextRecord and bufferPosn point // to the beginning of the next record. readNextRecord(); @@ -199,26 +199,25 @@ protected void startReading(ReadableByteChannel channel) throws IOException { } } - } else { // Check to see if we start with the UTF_BOM bytes skipping them if present. if (fileStartsWithBom()) { startOfNextRecord = bufferPosn = UTF8_BOM.size(); } - skipHeader(skipHeaderLines,true); + skipHeader(skipHeaderLines, true); } } private void skipHeader(int headerLines, boolean skipFirstLine) throws IOException { - if(headerLines == 1){ + if (headerLines == 1) { readNextRecord(); - }else if(headerLines > 1){ + } else if (headerLines > 1) { // this will be expensive - ((SeekableByteChannel) inChannel ).position(0); - for(int line = 0; line < headerLines; ++line){ + ((SeekableByteChannel) inChannel).position(0); + for (int line = 0; line < headerLines; ++line) { readNextRecord(); } - }else if(headerLines == 0 && skipFirstLine){ + } else if (headerLines == 0 && skipFirstLine) { readNextRecord(); } currentValue = null; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java index 3d32536760e7..1f84e2b64cc5 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java @@ -245,10 +245,7 @@ private static File createZipFile( } private static TextSource prepareSource( - TemporaryFolder temporaryFolder, - byte[] data, - @Nullable byte[] delimiter, - int skipHeaderLines) + TemporaryFolder temporaryFolder, byte[] data, @Nullable byte[] delimiter, int skipHeaderLines) throws IOException { Path path = temporaryFolder.newFile().toPath(); Files.write(path, data); From d4755d66e5fd9b1d9c9b09b35e7d06a73663f613 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Thu, 2 Nov 2023 23:10:25 +0000 Subject: [PATCH 11/19] broken tests --- .../core/src/main/java/org/apache/beam/sdk/io/TextSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java index 43d936e0893b..3086d393b50f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java @@ -204,7 +204,7 @@ protected void startReading(ReadableByteChannel channel) throws IOException { if (fileStartsWithBom()) { startOfNextRecord = bufferPosn = UTF8_BOM.size(); } - skipHeader(skipHeaderLines, true); + skipHeader(skipHeaderLines, false); } } From d9499b2e4c384fe9f93f0435d7e01574f99a5436 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Fri, 3 Nov 2023 09:35:09 +0000 Subject: [PATCH 12/19] broken tests --- .../src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java index 1f84e2b64cc5..b07ca4a8fd01 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java @@ -521,7 +521,7 @@ public static Iterable data() { public void testReadLinesWithCustomDelimiter() throws Exception { SourceTestUtils.assertSplitAtFractionExhaustive( TextIOReadTest.prepareSource( - tempFolder, testCase.getBytes(UTF_8), new byte[] {'|', '*'}, 1), + tempFolder, testCase.getBytes(UTF_8), new byte[] {'|', '*'}, 0), PipelineOptionsFactory.create()); } From 6ef369b49288c0b115eb92d6e37b84292097289e Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Fri, 3 Nov 2023 09:55:49 +0000 Subject: [PATCH 13/19] broken tests --- .../java/org/apache/beam/sdk/jmh/io/TextSourceBenchmark.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/core/jmh/src/main/java/org/apache/beam/sdk/jmh/io/TextSourceBenchmark.java b/sdks/java/core/jmh/src/main/java/org/apache/beam/sdk/jmh/io/TextSourceBenchmark.java index 30ffd5579f80..bb89e428d955 100644 --- a/sdks/java/core/jmh/src/main/java/org/apache/beam/sdk/jmh/io/TextSourceBenchmark.java +++ b/sdks/java/core/jmh/src/main/java/org/apache/beam/sdk/jmh/io/TextSourceBenchmark.java @@ -75,7 +75,7 @@ public void deleteFile() throws Exception { @Benchmark public void benchmarkTextSource(Data data) throws Exception { Source.Reader reader = - ((FileBasedSource) TextIOReadTest.getTextSource(data.pathString, null, false)) + ((FileBasedSource) TextIOReadTest.getTextSource(data.pathString, null, 0)) .createReader(PipelineOptionsFactory.create()); int length = 0; int linesRead = 0; From e8db3e7276aea70e72bbda734d34dac98a75cf43 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Fri, 3 Nov 2023 11:18:00 +0000 Subject: [PATCH 14/19] reverting breaking changes --- .../apache/beam/sdk/jmh/io/TextSourceBenchmark.java | 2 +- .../main/java/org/apache/beam/sdk/io/TextSource.java | 12 ++++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/jmh/src/main/java/org/apache/beam/sdk/jmh/io/TextSourceBenchmark.java b/sdks/java/core/jmh/src/main/java/org/apache/beam/sdk/jmh/io/TextSourceBenchmark.java index bb89e428d955..171f9c01dd83 100644 --- a/sdks/java/core/jmh/src/main/java/org/apache/beam/sdk/jmh/io/TextSourceBenchmark.java +++ b/sdks/java/core/jmh/src/main/java/org/apache/beam/sdk/jmh/io/TextSourceBenchmark.java @@ -75,7 +75,7 @@ public void deleteFile() throws Exception { @Benchmark public void benchmarkTextSource(Data data) throws Exception { Source.Reader reader = - ((FileBasedSource) TextIOReadTest.getTextSource(data.pathString, null, 0)) + ((FileBasedSource) TextIOReadTest.getTextSource(data.pathString, null)) .createReader(PipelineOptionsFactory.create()); int length = 0; int linesRead = 0; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java index 3086d393b50f..78bc1e3f29c5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java @@ -68,6 +68,13 @@ public TextSource( this.skipHeaderLines = skipHeaderLines; } + public TextSource( + ValueProvider fileSpec, + EmptyMatchTreatment emptyMatchTreatment, + byte[] delimiter) { + this(fileSpec, emptyMatchTreatment, delimiter, 0); + } + public TextSource( MatchResult.Metadata metadata, long start, long end, byte[] delimiter, int skipHeaderLines) { super(metadata, 1L, start, end); @@ -75,6 +82,11 @@ public TextSource( this.skipHeaderLines = skipHeaderLines; } + public TextSource( + MatchResult.Metadata metadata, long start, long end, byte[] delimiter) { + this(metadata, start, end, delimiter, 0); + } + @Override protected FileBasedSource createForSubrangeOfFile( MatchResult.Metadata metadata, long start, long end) { From fbe5f5fa6d0547229d826734ea9a7dd2c8ecb412 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Fri, 3 Nov 2023 11:38:44 +0000 Subject: [PATCH 15/19] reverting breaking changes --- .../src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java index b07ca4a8fd01..4a9963ab8235 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java @@ -261,6 +261,11 @@ public static TextSource getTextSource( skipHeaderLines); } + public static TextSource getTextSource( + String path, @Nullable byte[] delimiter) { + return getTextSource(path, delimiter, 0); + } + private static String getFileSuffix(Compression compression) { switch (compression) { case UNCOMPRESSED: From f3b61fe3d6c0306c6cf53f6c5938362f5c247f57 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Fri, 3 Nov 2023 11:51:19 +0000 Subject: [PATCH 16/19] spotless --- .../src/main/java/org/apache/beam/sdk/io/TextSource.java | 7 ++----- .../test/java/org/apache/beam/sdk/io/TextIOReadTest.java | 3 +-- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java index 78bc1e3f29c5..17c3b613d33c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java @@ -69,9 +69,7 @@ public TextSource( } public TextSource( - ValueProvider fileSpec, - EmptyMatchTreatment emptyMatchTreatment, - byte[] delimiter) { + ValueProvider fileSpec, EmptyMatchTreatment emptyMatchTreatment, byte[] delimiter) { this(fileSpec, emptyMatchTreatment, delimiter, 0); } @@ -82,8 +80,7 @@ public TextSource( this.skipHeaderLines = skipHeaderLines; } - public TextSource( - MatchResult.Metadata metadata, long start, long end, byte[] delimiter) { + public TextSource(MatchResult.Metadata metadata, long start, long end, byte[] delimiter) { this(metadata, start, end, delimiter, 0); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java index 4a9963ab8235..e2e3346a52cf 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java @@ -261,8 +261,7 @@ public static TextSource getTextSource( skipHeaderLines); } - public static TextSource getTextSource( - String path, @Nullable byte[] delimiter) { + public static TextSource getTextSource(String path, @Nullable byte[] delimiter) { return getTextSource(path, delimiter, 0); } From 79031690fed51e1a9a7dfac0d514f0b74d9913ed Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Fri, 3 Nov 2023 15:40:46 +0000 Subject: [PATCH 17/19] added new changes, fixed edge cases, added new test --- CHANGES.md | 37 ++++++++++++++++++- .../org/apache/beam/sdk/io/TextSource.java | 9 +++-- .../apache/beam/sdk/io/TextIOReadTest.java | 14 ++++--- 3 files changed, 50 insertions(+), 10 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index efdddafc8773..d2bc5637d14d 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -53,6 +53,41 @@ * ([#X](https://github.com/apache/beam/issues/X)). --> +# [2.53.0] - Unreleased + +## Highlights + +* New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)). +* New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)). + +## I/Os + +* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* TextIO now supports skipping multiple header lines ([#17990](https://github.com/apache/beam/issues/17990)). + +## New Features / Improvements + +* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). + +## Breaking Changes + +* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)). + +## Deprecations + +* X behavior is deprecated and will be removed in X versions ([#X](https://github.com/apache/beam/issues/X)). + +## Bugfixes + +* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). + +## Security Fixes +* Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)). + +## Known Issues + +* ([#X](https://github.com/apache/beam/issues/X)). + # [2.52.0] - Unreleased ## Highlights @@ -76,7 +111,7 @@ should handle this. ([#25252](https://github.com/apache/beam/issues/25252)). * `upload_graph` as one of the Experiments options for DataflowRunner is no longer required when the graph is larger than 10MB for Java SDK ([PR#28621](https://github.com/apache/beam/pull/28621). * state amd side input cache has been enabled to a default of 100 MB. Use `--max_cache_memory_usage_mb=X` to provide cache size for the user state API and side inputs. (Python) ([#28770](https://github.com/apache/beam/issues/28770)). * Beam YAML stable release. Beam pipelines can now be written using YAML and leverage the Beam YAML framework which includes a preliminary set of IO's and turnkey transforms. More information can be found in the YAML root folder and in the [README](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/README.md). -* `TextIO` supports skipping header. + ## Breaking Changes diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java index 17c3b613d33c..3e09712683e7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java @@ -201,11 +201,12 @@ protected void startReading(ReadableByteChannel channel) throws IOException { ((SeekableByteChannel) channel).position(requiredPosition); startOfNextRecord = requiredPosition; bufferLength = bufferPosn = 0; - // Read and discard the next record ensuring that startOfNextRecord and bufferPosn point - // to the beginning of the next record. - readNextRecord(); - currentValue = null; } + // Read and discard the next record ensuring that startOfNextRecord and bufferPosn point + // to the beginning of the next record. + readNextRecord(); + currentValue = null; + } } else { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java index e2e3346a52cf..32211311d85c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java @@ -455,16 +455,17 @@ private void runTestReadWithData(byte[] data, List expectedResults) thro /** Tests for reading files with/without header. */ @RunWith(Parameterized.class) - public static class ReadWithoutHeaderTest { + public static class SkippingHeaderTest { private static final ImmutableList EXPECTED = ImmutableList.of("asdf", "hjkl", "xyz"); @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); @Parameterized.Parameters(name = "{index}: {0}") public static Iterable data() { return ImmutableList.builder() - .add(new Object[] {"\n\n\n", ImmutableList.of("", "")}) - .add(new Object[] {"\n", ImmutableList.of()}) - .add(new Object[] {"header\nasdf\nhjkl\nxyz\n", EXPECTED}) + .add(new Object[] {"\n\n\n", ImmutableList.of("", ""),1}) + .add(new Object[] {"\n", ImmutableList.of(),1}) + .add(new Object[] {"header\nasdf\nhjkl\nxyz\n", EXPECTED,1}) + .add(new Object[] {"header1\nheader2\nasdf\nhjkl\nxyz\n", EXPECTED,2}) .build(); } @@ -474,13 +475,16 @@ public static Iterable data() { @Parameterized.Parameter(1) public ImmutableList expected; + @Parameterized.Parameter(2) + public int skipHeaderLines; + @Test public void testReadLines() throws Exception { runTestReadWithData(line.getBytes(UTF_8), expected); } private TextSource prepareSource(byte[] data) throws IOException { - return TextIOReadTest.prepareSource(tempFolder, data, null, 1); + return TextIOReadTest.prepareSource(tempFolder, data, null, skipHeaderLines); } private void runTestReadWithData(byte[] data, List expectedResults) throws Exception { From 5f99fbfbc15c034d4e501bc958e590024be078bb Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Fri, 3 Nov 2023 15:43:01 +0000 Subject: [PATCH 18/19] added new changes --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index d2bc5637d14d..c302df6b4193 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -63,7 +63,7 @@ ## I/Os * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). -* TextIO now supports skipping multiple header lines ([#17990](https://github.com/apache/beam/issues/17990)). +* TextIO now supports skipping multiple header lines (Java) ([#17990](https://github.com/apache/beam/issues/17990)). ## New Features / Improvements From 62baa3031ac2ae7659eedaab6f4a058012a64ebf Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Fri, 3 Nov 2023 16:09:51 +0000 Subject: [PATCH 19/19] lint --- .../src/main/java/org/apache/beam/sdk/io/TextSource.java | 1 - .../test/java/org/apache/beam/sdk/io/TextIOReadTest.java | 8 ++++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java index 3e09712683e7..3d62c677950a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java @@ -206,7 +206,6 @@ protected void startReading(ReadableByteChannel channel) throws IOException { // to the beginning of the next record. readNextRecord(); currentValue = null; - } } else { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java index 32211311d85c..84c05ee6c906 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java @@ -462,10 +462,10 @@ public static class SkippingHeaderTest { @Parameterized.Parameters(name = "{index}: {0}") public static Iterable data() { return ImmutableList.builder() - .add(new Object[] {"\n\n\n", ImmutableList.of("", ""),1}) - .add(new Object[] {"\n", ImmutableList.of(),1}) - .add(new Object[] {"header\nasdf\nhjkl\nxyz\n", EXPECTED,1}) - .add(new Object[] {"header1\nheader2\nasdf\nhjkl\nxyz\n", EXPECTED,2}) + .add(new Object[] {"\n\n\n", ImmutableList.of("", ""), 1}) + .add(new Object[] {"\n", ImmutableList.of(), 1}) + .add(new Object[] {"header\nasdf\nhjkl\nxyz\n", EXPECTED, 1}) + .add(new Object[] {"header1\nheader2\nasdf\nhjkl\nxyz\n", EXPECTED, 2}) .build(); }