From 2062444c7ffa3de6412af953b63f0b6ba0dd6403 Mon Sep 17 00:00:00 2001 From: Hyungrok Ham Date: Tue, 19 Sep 2023 00:10:06 +0900 Subject: [PATCH 1/4] feat: add withSkipHeaderLines --- .../java/org/apache/beam/sdk/io/TextIO.java | 411 ++++++++++++------ .../org/apache/beam/sdk/io/TextSource.java | 52 ++- .../apache/beam/sdk/io/TextIOReadTest.java | 259 +++++++---- 3 files changed, 500 insertions(+), 222 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 33beff23b311..b9465f16a219 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -68,8 +68,9 @@ * *

To read a {@link PCollection} from one or more text files, use {@code TextIO.read()} to * instantiate a transform and use {@link TextIO.Read#from(String)} to specify the path of the - * file(s) to be read. Alternatively, if the filenames to be read are themselves in a {@link - * PCollection} you can use {@link FileIO} to match them and {@link TextIO#readFiles} to read them. + * file(s) to be read. Alternatively, if the filenames to be read are themselves in a + * {@link PCollection} you can use {@link FileIO} to match them and {@link TextIO#readFiles} to read + * them. * *

{@link #read} returns a {@link PCollection} of {@link String Strings}, each corresponding to * one line of an input UTF-8 text file (split into lines delimited by '\n', '\r', or '\r\n', or @@ -78,14 +79,14 @@ *

Filepattern expansion and watching

* *

By default, the filepatterns are expanded only once. {@link Read#watchForNewFiles} or the - * combination of {@link FileIO.Match#continuously(Duration, TerminationCondition)} and {@link - * #readFiles()} allow streaming of new files matching the filepattern(s). + * combination of {@link FileIO.Match#continuously(Duration, TerminationCondition)} and + * {@link #readFiles()} allow streaming of new files matching the filepattern(s). * - *

By default, {@link #read} prohibits filepatterns that match no files, and {@link #readFiles()} - * allows them in case the filepattern contains a glob wildcard character. Use {@link - * Read#withEmptyMatchTreatment} or {@link - * FileIO.Match#withEmptyMatchTreatment(EmptyMatchTreatment)} plus {@link #readFiles()} to configure - * this behavior. + *

By default, {@link #read} prohibits filepatterns that match no files, and + * {@link #readFiles()} allows them in case the filepattern contains a glob wildcard character. Use + * {@link Read#withEmptyMatchTreatment} or + * {@link FileIO.Match#withEmptyMatchTreatment(EmptyMatchTreatment)} plus {@link #readFiles()} to + * configure this behavior. * *

Example 1: reading a file or filepattern. * @@ -178,26 +179,28 @@ * DynamicDestinations} interface for advanced features via {@link Write#to(DynamicDestinations)}. */ @SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) + "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) public class TextIO { + private static final long DEFAULT_BUNDLE_SIZE_BYTES = 64 * 1024 * 1024L; /** - * A {@link PTransform} that reads from one or more text files and returns a bounded {@link - * PCollection} containing one element for each line of the input files. + * A {@link PTransform} that reads from one or more text files and returns a bounded + * {@link PCollection} containing one element for each line of the input files. */ public static Read read() { return new AutoValue_TextIO_Read.Builder() .setCompression(Compression.AUTO) .setHintMatchesManyFiles(false) .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW)) + .setSkipHeaderLines(0) .build(); } /** - * A {@link PTransform} that works like {@link #read}, but reads each file in a {@link - * PCollection} of filepatterns. + * A {@link PTransform} that works like {@link #read}, but reads each file in a + * {@link PCollection} of filepatterns. * *

Can be applied to both bounded and unbounded {@link PCollection PCollections}, so this is * suitable for reading a {@link PCollection} of filepatterns arriving as a stream. However, every @@ -206,9 +209,8 @@ public static Read read() { * new entries. * * @deprecated You can achieve The functionality of {@link #readAll()} using {@link FileIO} - * matching plus {@link #readFiles()}. This is the preferred method to make composition - * explicit. {@link ReadAll} will not receive upgrades and will be removed in a future version - * of Beam. + * matching plus {@link #readFiles()}. This is the preferred method to make composition explicit. + * {@link ReadAll} will not receive upgrades and will be removed in a future version of Beam. */ @Deprecated public static ReadAll readAll() { @@ -219,8 +221,8 @@ public static ReadAll readAll() { } /** - * Like {@link #read}, but reads each file in a {@link PCollection} of {@link - * FileIO.ReadableFile}, returned by {@link FileIO#readMatches}. + * Like {@link #read}, but reads each file in a {@link PCollection} of + * {@link FileIO.ReadableFile}, returned by {@link FileIO#readMatches}. */ public static ReadFiles readFiles() { return new AutoValue_TextIO_ReadFiles.Builder() @@ -248,8 +250,8 @@ public static Write write() { *

This version allows you to apply {@link TextIO} writes to a PCollection of a custom type * {@link UserT}. A format mechanism that converts the input type {@link UserT} to the String that * will be written to the file must be specified. If using a custom {@link DynamicDestinations} - * object this is done using {@link DynamicDestinations#formatRecord}, otherwise the {@link - * TypedWrite#withFormatFunction} can be used to specify a format function. + * object this is done using {@link DynamicDestinations#formatRecord}, otherwise the + * {@link TypedWrite#withFormatFunction} can be used to specify a format function. * *

The advantage of using a custom type is that is it allows a user-provided {@link * DynamicDestinations} object, set via {@link Write#to(DynamicDestinations)} to examine the @@ -263,7 +265,7 @@ public static TypedWrite writeCustomType() { .setFilenameSuffix(null) .setFilenamePolicy(null) .setDynamicDestinations(null) - .setDelimiter(new char[] {'\n'}) + .setDelimiter(new char[]{'\n'}) .setWritableByteChannelFactory(FileBasedSink.CompressionType.UNCOMPRESSED) .setWindowedWrites(false) .setNoSpilling(false) @@ -271,7 +273,9 @@ public static TypedWrite writeCustomType() { .build(); } - /** Implementation of {@link #read}. */ + /** + * Implementation of {@link #read}. + */ @AutoValue public abstract static class Read extends PTransform> { @@ -283,6 +287,8 @@ public abstract static class Read extends PTransform abstract Compression getCompression(); + abstract int getSkipHeaderLines(); + @SuppressWarnings("mutable") // this returns an array that can be mutated by the caller abstract byte @Nullable [] getDelimiter(); @@ -290,6 +296,7 @@ public abstract static class Read extends PTransform @AutoValue.Builder abstract static class Builder { + abstract Builder setFilepattern(ValueProvider filepattern); abstract Builder setMatchConfiguration(MatchConfiguration matchConfiguration); @@ -298,6 +305,8 @@ abstract static class Builder { abstract Builder setCompression(Compression compression); + abstract Builder setSkipHeaderLines(int skipHeaderLines); + abstract Builder setDelimiter(byte @Nullable [] delimiter); abstract Read build(); @@ -313,26 +322,33 @@ abstract static class Builder { *

Standard Java * Filesystem glob patterns ("*", "?", "[..]") are supported. * - *

If it is known that the filepattern will match a very large number of files (at least tens - * of thousands), use {@link #withHintMatchesManyFiles} for better performance and scalability. + *

If it is known that the filepattern will match a very large number of files (at least + * tens of thousands), use {@link #withHintMatchesManyFiles} for better performance and + * scalability. */ public Read from(String filepattern) { checkArgument(filepattern != null, "filepattern can not be null"); return from(StaticValueProvider.of(filepattern)); } - /** Same as {@code from(filepattern)}, but accepting a {@link ValueProvider}. */ + /** + * Same as {@code from(filepattern)}, but accepting a {@link ValueProvider}. + */ public Read from(ValueProvider filepattern) { checkArgument(filepattern != null, "filepattern can not be null"); return toBuilder().setFilepattern(filepattern).build(); } - /** Sets the {@link MatchConfiguration}. */ + /** + * Sets the {@link MatchConfiguration}. + */ public Read withMatchConfiguration(MatchConfiguration matchConfiguration) { return toBuilder().setMatchConfiguration(matchConfiguration).build(); } - /** @deprecated Use {@link #withCompression}. */ + /** + * @deprecated Use {@link #withCompression}. + */ @Deprecated public Read withCompressionType(TextIO.CompressionType compressionType) { return withCompression(compressionType.canonical); @@ -363,8 +379,8 @@ public Read watchForNewFiles( } /** - * Same as {@link Read#watchForNewFiles(Duration, TerminationCondition, boolean)} with {@code - * matchUpdatedFiles=false}. + * Same as {@link Read#watchForNewFiles(Duration, TerminationCondition, boolean)} with + * {@code matchUpdatedFiles=false}. */ public Read watchForNewFiles( Duration pollInterval, TerminationCondition terminationCondition) { @@ -384,12 +400,20 @@ public Read withHintMatchesManyFiles() { return toBuilder().setHintMatchesManyFiles(true).build(); } - /** See {@link MatchConfiguration#withEmptyMatchTreatment}. */ + /** + * See {@link MatchConfiguration#withEmptyMatchTreatment}. + */ public Read withEmptyMatchTreatment(EmptyMatchTreatment treatment) { return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment)); } - /** Set the custom delimiter to be used in place of the default ones ('\r', '\n' or '\r\n'). */ + public Read withSkipHeaderLines(int skipHeaderLines) { + return toBuilder().setSkipHeaderLines(skipHeaderLines).build(); + } + + /** + * Set the custom delimiter to be used in place of the default ones ('\r', '\n' or '\r\n'). + */ public Read withDelimiter(byte[] delimiter) { checkArgument(delimiter != null, "delimiter can not be null"); checkArgument(!isSelfOverlapping(delimiter), "delimiter must not self-overlap"); @@ -431,6 +455,7 @@ protected FileBasedSource getSource() { new TextSource( getFilepattern(), getMatchConfiguration().getEmptyMatchTreatment(), + getSkipHeaderLines(), getDelimiter())) .withCompression(getCompression()); } @@ -444,6 +469,10 @@ public void populateDisplayData(DisplayData.Builder builder) { .withLabel("Compression Type")) .addIfNotNull(DisplayData.item("filePattern", getFilepattern()).withLabel("File Pattern")) .include("matchConfiguration", getMatchConfiguration()) + .addIfNotDefault( + DisplayData.item("skipHeaderLines", getSkipHeaderLines()) + .withLabel("Skip Header Lines"), + 0) .addIfNotNull( DisplayData.item("delimiter", Arrays.toString(getDelimiter())) .withLabel("Custom delimiter to split records")); @@ -461,6 +490,7 @@ public void populateDisplayData(DisplayData.Builder builder) { @AutoValue public abstract static class ReadAll extends PTransform, PCollection> { + abstract MatchConfiguration getMatchConfiguration(); abstract Compression getCompression(); @@ -472,6 +502,7 @@ public abstract static class ReadAll @AutoValue.Builder abstract static class Builder { + abstract Builder setMatchConfiguration(MatchConfiguration matchConfiguration); abstract Builder setCompression(Compression compression); @@ -481,12 +512,16 @@ abstract static class Builder { abstract ReadAll build(); } - /** Sets the {@link MatchConfiguration}. */ + /** + * Sets the {@link MatchConfiguration}. + */ public ReadAll withMatchConfiguration(MatchConfiguration configuration) { return toBuilder().setMatchConfiguration(configuration).build(); } - /** @deprecated Use {@link #withCompression}. */ + /** + * @deprecated Use {@link #withCompression}. + */ @Deprecated public ReadAll withCompressionType(TextIO.CompressionType compressionType) { return withCompression(compressionType.canonical); @@ -501,12 +536,16 @@ public ReadAll withCompression(Compression compression) { return toBuilder().setCompression(compression).build(); } - /** Same as {@link Read#withEmptyMatchTreatment}. */ + /** + * Same as {@link Read#withEmptyMatchTreatment}. + */ public ReadAll withEmptyMatchTreatment(EmptyMatchTreatment treatment) { return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment)); } - /** Same as {@link Read#watchForNewFiles(Duration, TerminationCondition, boolean)}. */ + /** + * Same as {@link Read#watchForNewFiles(Duration, TerminationCondition, boolean)}. + */ public ReadAll watchForNewFiles( Duration pollInterval, TerminationCondition terminationCondition, @@ -516,7 +555,9 @@ public ReadAll watchForNewFiles( .continuously(pollInterval, terminationCondition, matchUpdatedFiles)); } - /** Same as {@link Read#watchForNewFiles(Duration, TerminationCondition)}. */ + /** + * Same as {@link Read#watchForNewFiles(Duration, TerminationCondition)}. + */ public ReadAll watchForNewFiles( Duration pollInterval, TerminationCondition terminationCondition) { return watchForNewFiles(pollInterval, terminationCondition, false); @@ -551,10 +592,13 @@ public void populateDisplayData(DisplayData.Builder builder) { } } - /** Implementation of {@link #readFiles}. */ + /** + * Implementation of {@link #readFiles}. + */ @AutoValue public abstract static class ReadFiles extends PTransform, PCollection> { + abstract long getDesiredBundleSizeBytes(); @SuppressWarnings("mutable") // this returns an array that can be mutated by the caller @@ -564,6 +608,7 @@ public abstract static class ReadFiles @AutoValue.Builder abstract static class Builder { + abstract Builder setDesiredBundleSizeBytes(long desiredBundleSizeBytes); abstract Builder setDelimiter(byte @Nullable [] delimiter); @@ -576,7 +621,9 @@ ReadFiles withDesiredBundleSizeBytes(long desiredBundleSizeBytes) { return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build(); } - /** Like {@link Read#withDelimiter}. */ + /** + * Like {@link Read#withDelimiter}. + */ public ReadFiles withDelimiter(byte[] delimiter) { return toBuilder().setDelimiter(delimiter).build(); } @@ -601,6 +648,7 @@ public void populateDisplayData(DisplayData.Builder builder) { private static class CreateTextSourceFn implements SerializableFunction> { + private byte[] delimiter; private CreateTextSourceFn(byte[] delimiter) { @@ -609,66 +657,100 @@ private CreateTextSourceFn(byte[] delimiter) { @Override public FileBasedSource apply(String input) { - return new TextSource( - StaticValueProvider.of(input), EmptyMatchTreatment.DISALLOW, delimiter); + return new TextSource(StaticValueProvider.of(input), EmptyMatchTreatment.DISALLOW, + delimiter); } } } // /////////////////////////////////////////////////////////////////////////// - /** Implementation of {@link #write}. */ + /** + * Implementation of {@link #write}. + */ @AutoValue public abstract static class TypedWrite extends PTransform, WriteFilesResult> { - /** The prefix of each file written, combined with suffix and shardTemplate. */ + /** + * The prefix of each file written, combined with suffix and shardTemplate. + */ abstract @Nullable ValueProvider getFilenamePrefix(); - /** The suffix of each file written, combined with prefix and shardTemplate. */ + /** + * The suffix of each file written, combined with prefix and shardTemplate. + */ abstract @Nullable String getFilenameSuffix(); - /** The base directory used for generating temporary files. */ + /** + * The base directory used for generating temporary files. + */ abstract @Nullable ValueProvider getTempDirectory(); - /** The delimiter between string records. */ + /** + * The delimiter between string records. + */ @SuppressWarnings("mutable") // this returns an array that can be mutated by the caller abstract char[] getDelimiter(); - /** An optional header to add to each file. */ + /** + * An optional header to add to each file. + */ abstract @Nullable String getHeader(); - /** An optional footer to add to each file. */ + /** + * An optional footer to add to each file. + */ abstract @Nullable String getFooter(); - /** Requested number of shards. 0 for automatic. */ + /** + * Requested number of shards. 0 for automatic. + */ abstract @Nullable ValueProvider getNumShards(); - /** The shard template of each file written, combined with prefix and suffix. */ + /** + * The shard template of each file written, combined with prefix and suffix. + */ abstract @Nullable String getShardTemplate(); - /** A policy for naming output files. */ + /** + * A policy for naming output files. + */ abstract @Nullable FilenamePolicy getFilenamePolicy(); - /** Allows for value-dependent {@link DynamicDestinations} to be vended. */ + /** + * Allows for value-dependent {@link DynamicDestinations} to be vended. + */ abstract @Nullable DynamicDestinations getDynamicDestinations(); - /** A destination function for using {@link DefaultFilenamePolicy}. */ + /** + * A destination function for using {@link DefaultFilenamePolicy}. + */ abstract @Nullable SerializableFunction getDestinationFunction(); - /** A default destination for empty PCollections. */ + /** + * A default destination for empty PCollections. + */ abstract @Nullable Params getEmptyDestination(); - /** A function that converts UserT to a String, for writing to the file. */ + /** + * A function that converts UserT to a String, for writing to the file. + */ abstract @Nullable SerializableFunction getFormatFunction(); - /** Whether to write windowed output files. */ + /** + * Whether to write windowed output files. + */ abstract boolean getWindowedWrites(); - /** Whether to skip the spilling of data caused by having maxNumWritersPerBundle. */ + /** + * Whether to skip the spilling of data caused by having maxNumWritersPerBundle. + */ abstract boolean getNoSpilling(); - /** Whether to skip writing any output files if the PCollection is empty. */ + /** + * Whether to skip writing any output files if the PCollection is empty. + */ abstract boolean getSkipIfEmpty(); /** @@ -681,6 +763,7 @@ public abstract static class TypedWrite @AutoValue.Builder abstract static class Builder { + abstract Builder setFilenamePrefix( @Nullable ValueProvider filenamePrefix); @@ -727,13 +810,14 @@ abstract Builder setWritableByteChannelFactory( } /** - * Writes to text files with the given prefix. The given {@code prefix} can reference any {@link - * FileSystem} on the classpath. This prefix is used by the {@link DefaultFilenamePolicy} to - * generate filenames. + * Writes to text files with the given prefix. The given {@code prefix} can reference any + * {@link FileSystem} on the classpath. This prefix is used by the {@link DefaultFilenamePolicy} + * to generate filenames. * *

By default, a {@link DefaultFilenamePolicy} will be used built using the specified prefix - * to define the base output directory and file prefix, a shard identifier (see {@link - * #withNumShards(int)}), and a common suffix (if supplied using {@link #withSuffix(String)}). + * to define the base output directory and file prefix, a shard identifier (see + * {@link #withNumShards(int)}), and a common suffix (if supplied using + * {@link #withSuffix(String)}). * *

This default policy can be overridden using {@link #to(FilenamePolicy)}, in which case * {@link #withShardNameTemplate(String)} and {@link #withSuffix(String)} should not be set. @@ -747,12 +831,16 @@ public TypedWrite to(String filenamePrefix) { return to(FileBasedSink.convertToFileResourceIfPossible(filenamePrefix)); } - /** Like {@link #to(String)}. */ + /** + * Like {@link #to(String)}. + */ public TypedWrite to(ResourceId filenamePrefix) { return toResource(StaticValueProvider.of(filenamePrefix)); } - /** Like {@link #to(String)}. */ + /** + * Like {@link #to(String)}. + */ public TypedWrite to(ValueProvider outputPrefix) { return toResource( NestedValueProvider.of(outputPrefix, FileBasedSink::convertToFileResourceIfPossible)); @@ -772,7 +860,7 @@ public TypedWrite to(FilenamePolicy filenamePolicy) { * temporary files must be specified using {@link #withTempDirectory}. * * @deprecated Use {@link FileIO#write()} or {@link FileIO#writeDynamic()} with {@link #sink()} - * instead. + * instead. */ @Deprecated public TypedWrite to( @@ -789,7 +877,7 @@ public TypedWrite to( * {@link PCollection} is empty. * * @deprecated Use {@link FileIO#write()} or {@link FileIO#writeDynamic()} with {@link #sink()} - * instead. + * instead. */ @Deprecated public TypedWrite to( @@ -801,18 +889,20 @@ public TypedWrite to( .build(); } - /** Like {@link #to(ResourceId)}. */ + /** + * Like {@link #to(ResourceId)}. + */ public TypedWrite toResource(ValueProvider filenamePrefix) { return toBuilder().setFilenamePrefix(filenamePrefix).build(); } /** - * Specifies a format function to convert {@link UserT} to the output type. If {@link - * #to(DynamicDestinations)} is used, {@link DynamicDestinations#formatRecord(Object)} must be - * used instead. + * Specifies a format function to convert {@link UserT} to the output type. If + * {@link #to(DynamicDestinations)} is used, {@link DynamicDestinations#formatRecord(Object)} + * must be used instead. * * @deprecated Use {@link FileIO#write()} or {@link FileIO#writeDynamic()} with {@link #sink()} - * instead. + * instead. */ @Deprecated public TypedWrite withFormatFunction( @@ -820,13 +910,17 @@ public TypedWrite withFormatFunction( return toBuilder().setFormatFunction(formatFunction).build(); } - /** Set the base directory used to generate temporary files. */ + /** + * Set the base directory used to generate temporary files. + */ public TypedWrite withTempDirectory( ValueProvider tempDirectory) { return toBuilder().setTempDirectory(tempDirectory).build(); } - /** Set the base directory used to generate temporary files. */ + /** + * Set the base directory used to generate temporary files. + */ public TypedWrite withTempDirectory(ResourceId tempDirectory) { return withTempDirectory(StaticValueProvider.of(tempDirectory)); } @@ -845,8 +939,8 @@ public TypedWrite withShardNameTemplate(String shardTemplat /** * Configures the filename suffix for written files. This option may only be used when using one - * of the default filename-prefix to() overrides - i.e. not when using either {@link - * #to(FilenamePolicy)} or {@link #to(DynamicDestinations)}. + * of the default filename-prefix to() overrides - i.e. not when using either + * {@link #to(FilenamePolicy)} or {@link #to(DynamicDestinations)}. * *

See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are * used. @@ -926,8 +1020,8 @@ public TypedWrite withFooter(@Nullable String footer) { } /** - * Returns a transform for writing to text files like this one but that has the given {@link - * WritableByteChannelFactory} to be used by the {@link FileBasedSink} during output. The + * Returns a transform for writing to text files like this one but that has the given + * {@link WritableByteChannelFactory} to be used by the {@link FileBasedSink} during output. The * default is value is {@link Compression#UNCOMPRESSED}. * *

A {@code null} value will reset the value to the default value mentioned above. @@ -948,7 +1042,8 @@ public TypedWrite withCompression(Compression compression) } /** - * Preserves windowing of input elements and writes them to files based on the element's window. + * Preserves windowing of input elements and writes them to files based on the element's + * window. * *

If using {@link #to(FileBasedSink.FilenamePolicy)}. Filenames will be generated using * {@link FilenamePolicy#windowedFilename}. See also {@link WriteFiles#withWindowedWrites()}. @@ -957,12 +1052,16 @@ public TypedWrite withWindowedWrites() { return toBuilder().setWindowedWrites(true).build(); } - /** See {@link WriteFiles#withNoSpilling()}. */ + /** + * See {@link WriteFiles#withNoSpilling()}. + */ public TypedWrite withNoSpilling() { return toBuilder().setNoSpilling(true).build(); } - /** Don't write any output files if the PCollection is empty. */ + /** + * Don't write any output files if the PCollection is empty. + */ public TypedWrite skipIfEmpty() { return toBuilder().setSkipIfEmpty(true).build(); } @@ -1012,9 +1111,9 @@ public WriteFilesResult expand(PCollection input) { checkArgument( 1 == Iterables.size( - allToArgs.stream() - .filter(Predicates.notNull()::apply) - .collect(Collectors.toList())), + allToArgs.stream() + .filter(Predicates.notNull()::apply) + .collect(Collectors.toList())), "Exactly one of filename policy, dynamic destinations, filename prefix, or destination " + "function must be set"); @@ -1085,7 +1184,9 @@ public void populateDisplayData(DisplayData.Builder builder) { * This class exists for backwards compatibility, and will be removed in Beam 3.0. */ public static class Write extends PTransform, PDone> { - @VisibleForTesting TypedWrite inner; + + @VisibleForTesting + TypedWrite inner; Write() { this(TextIO.writeCustomType()); @@ -1095,30 +1196,40 @@ public static class Write extends PTransform, PDone> { this.inner = inner; } - /** See {@link TypedWrite#to(String)}. */ + /** + * See {@link TypedWrite#to(String)}. + */ public Write to(String filenamePrefix) { return new Write( inner.to(filenamePrefix).withFormatFunction(SerializableFunctions.identity())); } - /** See {@link TypedWrite#to(ResourceId)}. */ + /** + * See {@link TypedWrite#to(ResourceId)}. + */ public Write to(ResourceId filenamePrefix) { return new Write( inner.to(filenamePrefix).withFormatFunction(SerializableFunctions.identity())); } - /** See {@link TypedWrite#to(ValueProvider)}. */ + /** + * See {@link TypedWrite#to(ValueProvider)}. + */ public Write to(ValueProvider outputPrefix) { return new Write(inner.to(outputPrefix).withFormatFunction(SerializableFunctions.identity())); } - /** See {@link TypedWrite#toResource(ValueProvider)}. */ + /** + * See {@link TypedWrite#toResource(ValueProvider)}. + */ public Write toResource(ValueProvider filenamePrefix) { return new Write( inner.toResource(filenamePrefix).withFormatFunction(SerializableFunctions.identity())); } - /** See {@link TypedWrite#to(FilenamePolicy)}. */ + /** + * See {@link TypedWrite#to(FilenamePolicy)}. + */ public Write to(FilenamePolicy filenamePolicy) { return new Write( inner.to(filenamePolicy).withFormatFunction(SerializableFunctions.identity())); @@ -1127,8 +1238,8 @@ public Write to(FilenamePolicy filenamePolicy) { /** * See {@link TypedWrite#to(DynamicDestinations)}. * - * @deprecated Use {@link FileIO#write()} or {@link FileIO#writeDynamic()} ()} with {@link - * #sink()} instead. + * @deprecated Use {@link FileIO#write()} or {@link FileIO#writeDynamic()} ()} with + * {@link #sink()} instead. */ @Deprecated public Write to(DynamicDestinations dynamicDestinations) { @@ -1139,8 +1250,8 @@ public Write to(DynamicDestinations dynamicDestinations) { /** * See {@link TypedWrite#to(SerializableFunction, Params)}. * - * @deprecated Use {@link FileIO#write()} or {@link FileIO#writeDynamic()} ()} with {@link - * #sink()} instead. + * @deprecated Use {@link FileIO#write()} or {@link FileIO#writeDynamic()} ()} with + * {@link #sink()} instead. */ @Deprecated public Write to( @@ -1151,73 +1262,101 @@ public Write to( .withFormatFunction(SerializableFunctions.identity())); } - /** See {@link TypedWrite#withTempDirectory(ValueProvider)}. */ + /** + * See {@link TypedWrite#withTempDirectory(ValueProvider)}. + */ public Write withTempDirectory(ValueProvider tempDirectory) { return new Write(inner.withTempDirectory(tempDirectory)); } - /** See {@link TypedWrite#withTempDirectory(ResourceId)}. */ + /** + * See {@link TypedWrite#withTempDirectory(ResourceId)}. + */ public Write withTempDirectory(ResourceId tempDirectory) { return new Write(inner.withTempDirectory(tempDirectory)); } - /** See {@link TypedWrite#withShardNameTemplate(String)}. */ + /** + * See {@link TypedWrite#withShardNameTemplate(String)}. + */ public Write withShardNameTemplate(String shardTemplate) { return new Write(inner.withShardNameTemplate(shardTemplate)); } - /** See {@link TypedWrite#withSuffix(String)}. */ + /** + * See {@link TypedWrite#withSuffix(String)}. + */ public Write withSuffix(String filenameSuffix) { return new Write(inner.withSuffix(filenameSuffix)); } - /** See {@link TypedWrite#withNumShards(int)}. */ + /** + * See {@link TypedWrite#withNumShards(int)}. + */ public Write withNumShards(int numShards) { return new Write(inner.withNumShards(numShards)); } - /** See {@link TypedWrite#withNumShards(ValueProvider)}. */ + /** + * See {@link TypedWrite#withNumShards(ValueProvider)}. + */ public Write withNumShards(@Nullable ValueProvider numShards) { return new Write(inner.withNumShards(numShards)); } - /** See {@link TypedWrite#withoutSharding()}. */ + /** + * See {@link TypedWrite#withoutSharding()}. + */ public Write withoutSharding() { return new Write(inner.withoutSharding()); } - /** See {@link TypedWrite#withDelimiter(char[])}. */ + /** + * See {@link TypedWrite#withDelimiter(char[])}. + */ public Write withDelimiter(char[] delimiter) { return new Write(inner.withDelimiter(delimiter)); } - /** See {@link TypedWrite#withHeader(String)}. */ + /** + * See {@link TypedWrite#withHeader(String)}. + */ public Write withHeader(@Nullable String header) { return new Write(inner.withHeader(header)); } - /** See {@link TypedWrite#withFooter(String)}. */ + /** + * See {@link TypedWrite#withFooter(String)}. + */ public Write withFooter(@Nullable String footer) { return new Write(inner.withFooter(footer)); } - /** See {@link TypedWrite#withWritableByteChannelFactory(WritableByteChannelFactory)}. */ + /** + * See {@link TypedWrite#withWritableByteChannelFactory(WritableByteChannelFactory)}. + */ public Write withWritableByteChannelFactory( WritableByteChannelFactory writableByteChannelFactory) { return new Write(inner.withWritableByteChannelFactory(writableByteChannelFactory)); } - /** See {@link TypedWrite#withCompression(Compression)}. */ + /** + * See {@link TypedWrite#withCompression(Compression)}. + */ public Write withCompression(Compression compression) { return new Write(inner.withCompression(compression)); } - /** See {@link TypedWrite#withWindowedWrites}. */ + /** + * See {@link TypedWrite#withWindowedWrites}. + */ public Write withWindowedWrites() { return new Write(inner.withWindowedWrites()); } - /** See {@link TypedWrite#withNoSpilling}. */ + /** + * See {@link TypedWrite#withNoSpilling}. + */ public Write withNoSpilling() { return new Write(inner.withNoSpilling()); } @@ -1231,8 +1370,8 @@ public Write withNoSpilling() { * output type, allowing access to output files. * *

The supplied {@code DestinationT} type must be: the same as that supplied in {@link - * #to(DynamicDestinations)} if that method was used; {@link Params} if {@link - * #to(SerializableFunction, Params)} was used, or {@code Void} otherwise. + * #to(DynamicDestinations)} if that method was used; {@link Params} if + * {@link #to(SerializableFunction, Params)} was used, or {@code Void} otherwise. */ public TypedWrite withOutputFilenames() { return (TypedWrite) inner; @@ -1250,28 +1389,44 @@ public PDone expand(PCollection input) { } } - /** @deprecated Use {@link Compression}. */ + /** + * @deprecated Use {@link Compression}. + */ @Deprecated public enum CompressionType { - /** @see Compression#AUTO */ + /** + * @see Compression#AUTO + */ AUTO(Compression.AUTO), - /** @see Compression#UNCOMPRESSED */ + /** + * @see Compression#UNCOMPRESSED + */ UNCOMPRESSED(Compression.UNCOMPRESSED), - /** @see Compression#GZIP */ + /** + * @see Compression#GZIP + */ GZIP(Compression.GZIP), - /** @see Compression#BZIP2 */ + /** + * @see Compression#BZIP2 + */ BZIP2(Compression.BZIP2), - /** @see Compression#ZIP */ + /** + * @see Compression#ZIP + */ ZIP(Compression.ZIP), - /** @see Compression#ZSTD */ + /** + * @see Compression#ZSTD + */ ZSTD(Compression.ZSTD), - /** @see Compression#DEFLATE */ + /** + * @see Compression#DEFLATE + */ DEFLATE(Compression.DEFLATE); private final Compression canonical; @@ -1280,7 +1435,9 @@ public enum CompressionType { this.canonical = canonical; } - /** @see Compression#matches */ + /** + * @see Compression#matches + */ public boolean matches(String filename) { return canonical.matches(filename); } @@ -1289,14 +1446,16 @@ public boolean matches(String filename) { ////////////////////////////////////////////////////////////////////////////// /** - * Creates a {@link Sink} that writes newline-delimited strings in UTF-8, for use with {@link - * FileIO#write}. + * Creates a {@link Sink} that writes newline-delimited strings in UTF-8, for use with + * {@link FileIO#write}. */ public static Sink sink() { return new AutoValue_TextIO_Sink.Builder().build(); } - /** Implementation of {@link #sink}. */ + /** + * Implementation of {@link #sink}. + */ @AutoValue public abstract static class Sink implements FileIO.Sink { @@ -1308,6 +1467,7 @@ public abstract static class Sink implements FileIO.Sink { @AutoValue.Builder abstract static class Builder { + abstract Builder setHeader(String header); abstract Builder setFooter(String footer); @@ -1352,6 +1512,9 @@ public void flush() throws IOException { } } - /** Disable construction of utility class. */ - private TextIO() {} + /** + * Disable construction of utility class. + */ + private TextIO() { + } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java index bef30dffa8ac..8e9dacc67294 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java @@ -41,9 +41,9 @@ * *

A {@link FileBasedSource} which can decode records delimited by newline characters. * - *

This source splits the data into records using {@code UTF-8} {@code \n}, {@code \r}, or {@code - * \r\n} as the delimiter. This source is not strict and supports decoding the last record even if - * it is not delimited. Finally, no records are decoded if the stream is empty. + *

This source splits the data into records using {@code UTF-8} {@code \n}, {@code \r}, or + * {@code \r\n} as the delimiter. This source is not strict and supports decoding the last record + * even if it is not delimited. Finally, no records are decoded if the stream is empty. * *

This source supports reading from any arbitrary byte position within the stream. If the * starting position is not {@code 0}, then bytes are skipped until the first delimiter is found @@ -51,31 +51,42 @@ */ @VisibleForTesting @SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) + "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) public class TextSource extends FileBasedSource { + + int skipHeaderLines; byte[] delimiter; - public TextSource( - ValueProvider fileSpec, EmptyMatchTreatment emptyMatchTreatment, byte[] delimiter) { + public TextSource(ValueProvider fileSpec, EmptyMatchTreatment emptyMatchTreatment, + byte[] delimiter) { super(fileSpec, emptyMatchTreatment, 1L); this.delimiter = delimiter; } - public TextSource(MatchResult.Metadata metadata, long start, long end, byte[] delimiter) { + public TextSource(ValueProvider fileSpec, EmptyMatchTreatment emptyMatchTreatment, + int skipHeaderLines, byte[] delimiter) { + super(fileSpec, emptyMatchTreatment, 1L); + this.skipHeaderLines = skipHeaderLines; + this.delimiter = delimiter; + } + + public TextSource(MatchResult.Metadata metadata, long start, long end, int skipHeaderLines, + byte[] delimiter) { super(metadata, 1L, start, end); + this.skipHeaderLines = skipHeaderLines; this.delimiter = delimiter; } @Override protected FileBasedSource createForSubrangeOfFile( MatchResult.Metadata metadata, long start, long end) { - return new TextSource(metadata, start, end, delimiter); + return new TextSource(metadata, start, end, skipHeaderLines, delimiter); } @Override protected FileBasedReader createSingleFileReader(PipelineOptions options) { - return new TextBasedReader(this, delimiter); + return new TextBasedReader(this, skipHeaderLines, delimiter); } @Override @@ -83,6 +94,15 @@ public Coder getOutputCoder() { return StringUtf8Coder.of(); } + @Override + public boolean isSplittable() throws Exception { + if (skipHeaderLines > 0) { + return false; + } + + return super.isSplittable(); + } + /** * A {@link FileBasedReader FileBasedReader} which can decode records delimited by delimiter * characters. @@ -91,12 +111,14 @@ public Coder getOutputCoder() { */ @VisibleForTesting static class TextBasedReader extends FileBasedReader { + private static final int READ_BUFFER_SIZE = 8192; private static final ByteString UTF8_BOM = - ByteString.copyFrom(new byte[] {(byte) 0xEF, (byte) 0xBB, (byte) 0xBF}); + ByteString.copyFrom(new byte[]{(byte) 0xEF, (byte) 0xBB, (byte) 0xBF}); private static final byte CR = '\r'; private static final byte LF = '\n'; + private final int skipHeaderLines; private final byte @Nullable [] delimiter; private final ByteArrayOutputStream str; private final byte[] buffer; @@ -111,11 +133,12 @@ static class TextBasedReader extends FileBasedReader { private int bufferPosn = 0; // the current position in the buffer private boolean skipLineFeedAtStart; // skip an LF if at the start of the next buffer - private TextBasedReader(TextSource source, byte[] delimiter) { + private TextBasedReader(TextSource source, int skipHeaderLines, byte[] delimiter) { super(source); this.buffer = new byte[READ_BUFFER_SIZE]; this.str = new ByteArrayOutputStream(); this.byteBuffer = ByteBuffer.wrap(buffer); + this.skipHeaderLines = skipHeaderLines; this.delimiter = delimiter; } @@ -185,6 +208,10 @@ protected void startReading(ReadableByteChannel channel) throws IOException { if (fileStartsWithBom()) { startOfNextRecord = bufferPosn = UTF8_BOM.size(); } + + for (int i = 0; i < skipHeaderLines; i++) { + readNextRecord(); + } } } @@ -198,7 +225,8 @@ private boolean fileStartsWithBom() throws IOException { } if (bufferLength >= UTF8_BOM.size()) { int i; - for (i = 0; i < UTF8_BOM.size() && buffer[i] == UTF8_BOM.byteAt(i); ++i) {} + for (i = 0; i < UTF8_BOM.size() && buffer[i] == UTF8_BOM.byteAt(i); ++i) { + } if (i == UTF8_BOM.size()) { return true; } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java index 379345b1001e..d8cbe1532c8e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java @@ -108,9 +108,12 @@ import org.junit.runners.JUnit4; import org.junit.runners.Parameterized; -/** Tests for {@link TextIO.Read}. */ +/** + * Tests for {@link TextIO.Read}. + */ @RunWith(Enclosed.class) public class TextIOReadTest { + private static final int LINES_NUMBER_FOR_LARGE = 1000; private static final List EMPTY = Collections.emptyList(); private static final List TINY = @@ -159,7 +162,9 @@ private static void writeToStreamAndClose(List lines, OutputStream outpu } } - /** Helper to make an array of compressible strings. Returns ["word"i] for i in range(0,n). */ + /** + * Helper to make an array of compressible strings. Returns ["word"i] for i in range(0,n). + */ private static List makeLines(int n) { List ret = new ArrayList<>(); for (int i = 0; i < n; ++i) { @@ -212,9 +217,9 @@ private static void assertReadingCompressedFileMatchesExpected( /** * Create a zip file with the given lines. * - * @param expected A list of expected lines, populated in the zip file. - * @param folder A temporary folder used to create files. - * @param filename Optionally zip file name (can be null). + * @param expected A list of expected lines, populated in the zip file. + * @param folder A temporary folder used to create files. + * @param filename Optionally zip file name (can be null). * @param fieldsEntries Fields to write in zip entries. * @return The zip filename. * @throws Exception In case of a failure during zip file creation. @@ -273,30 +278,35 @@ private static String getFileSuffix(Compression compression) { } } - /** Tests for reading from different size of files with various Compression. */ + /** + * Tests for reading from different size of files with various Compression. + */ @RunWith(Parameterized.class) public static class CompressedReadTest { - @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); - @Rule public TestPipeline p = TestPipeline.create(); + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + @Rule + public TestPipeline p = TestPipeline.create(); @Parameterized.Parameters(name = "{index}: {1}") public static Iterable data() { return ImmutableList.builder() - .add(new Object[] {EMPTY, UNCOMPRESSED}) - .add(new Object[] {EMPTY, GZIP}) - .add(new Object[] {EMPTY, BZIP2}) - .add(new Object[] {EMPTY, ZIP}) - .add(new Object[] {EMPTY, DEFLATE}) - .add(new Object[] {TINY, UNCOMPRESSED}) - .add(new Object[] {TINY, GZIP}) - .add(new Object[] {TINY, BZIP2}) - .add(new Object[] {TINY, ZIP}) - .add(new Object[] {TINY, DEFLATE}) - .add(new Object[] {LARGE, UNCOMPRESSED}) - .add(new Object[] {LARGE, GZIP}) - .add(new Object[] {LARGE, BZIP2}) - .add(new Object[] {LARGE, ZIP}) - .add(new Object[] {LARGE, DEFLATE}) + .add(new Object[]{EMPTY, UNCOMPRESSED}) + .add(new Object[]{EMPTY, GZIP}) + .add(new Object[]{EMPTY, BZIP2}) + .add(new Object[]{EMPTY, ZIP}) + .add(new Object[]{EMPTY, DEFLATE}) + .add(new Object[]{TINY, UNCOMPRESSED}) + .add(new Object[]{TINY, GZIP}) + .add(new Object[]{TINY, BZIP2}) + .add(new Object[]{TINY, ZIP}) + .add(new Object[]{TINY, DEFLATE}) + .add(new Object[]{LARGE, UNCOMPRESSED}) + .add(new Object[]{LARGE, GZIP}) + .add(new Object[]{LARGE, BZIP2}) + .add(new Object[]{LARGE, ZIP}) + .add(new Object[]{LARGE, DEFLATE}) .build(); } @@ -306,7 +316,9 @@ public static Iterable data() { @Parameterized.Parameter(1) public Compression compression; - /** Tests reading from a small, compressed file with no extension. */ + /** + * Tests reading from a small, compressed file with no extension. + */ @Test @Category(NeedsRunner.class) public void testCompressedReadWithoutExtension() throws Exception { @@ -345,24 +357,79 @@ public void testReadWithAuto() throws Exception { } } - /** Tests for reading files with various delimiters. */ + @RunWith(Parameterized.class) + public static class ReadWithSkipHeaderLinesTest { + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + @Rule + public TestPipeline p = TestPipeline.create(); + + @Parameterized.Parameters(name = "{index}: {1}") + public static Iterable data() { + return ImmutableList.builder() + .add(new Object[]{TINY, 0, TINY.subList(0, TINY.size())}) + .add(new Object[]{TINY, 1, TINY.subList(1, TINY.size())}) + .add(new Object[]{TINY, 2, TINY.subList(2, TINY.size())}) + .add(new Object[]{TINY, 3, TINY.subList(3, TINY.size())}) + .build(); + } + + @Parameterized.Parameter(0) + public List lines; + + @Parameterized.Parameter(1) + public int skipHeaderLines; + + @Parameterized.Parameter(2) + public List expected; + + @Test + @Category(NeedsRunner.class) + public void testReadWithSkipHeaderLines() throws Exception { + File tmpFile = tempFolder.newFile(); + String filename = tmpFile.getPath(); + + try (PrintStream writer = new PrintStream(new FileOutputStream(tmpFile))) { + for (String elem : lines) { + byte[] encodedElem = CoderUtils.encodeToByteArray(StringUtf8Coder.of(), elem); + String line = new String(encodedElem, Charsets.UTF_8); + writer.println(line); + } + } + + TextIO.Read read = TextIO.read() + .from(filename) + .withSkipHeaderLines(skipHeaderLines); + PCollection output = p.apply(read); + + PAssert.that(output).containsInAnyOrder(expected); + p.run(); + } + } + + /** + * Tests for reading files with various delimiters. + */ @RunWith(Parameterized.class) public static class ReadWithDefaultDelimiterTest { + private static final ImmutableList EXPECTED = ImmutableList.of("asdf", "hjkl", "xyz"); - @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); @Parameterized.Parameters(name = "{index}: {0}") public static Iterable data() { return ImmutableList.builder() - .add(new Object[] {"\n\n\n", ImmutableList.of("", "", "")}) - .add(new Object[] {"asdf\nhjkl\nxyz\n", EXPECTED}) - .add(new Object[] {"asdf\rhjkl\rxyz\r", EXPECTED}) - .add(new Object[] {"asdf\r\nhjkl\r\nxyz\r\n", EXPECTED}) - .add(new Object[] {"asdf\rhjkl\r\nxyz\n", EXPECTED}) - .add(new Object[] {"asdf\nhjkl\nxyz", EXPECTED}) - .add(new Object[] {"asdf\rhjkl\rxyz", EXPECTED}) - .add(new Object[] {"asdf\r\nhjkl\r\nxyz", EXPECTED}) - .add(new Object[] {"asdf\rhjkl\r\nxyz", EXPECTED}) + .add(new Object[]{"\n\n\n", ImmutableList.of("", "", "")}) + .add(new Object[]{"asdf\nhjkl\nxyz\n", EXPECTED}) + .add(new Object[]{"asdf\rhjkl\rxyz\r", EXPECTED}) + .add(new Object[]{"asdf\r\nhjkl\r\nxyz\r\n", EXPECTED}) + .add(new Object[]{"asdf\rhjkl\r\nxyz\n", EXPECTED}) + .add(new Object[]{"asdf\nhjkl\nxyz", EXPECTED}) + .add(new Object[]{"asdf\rhjkl\rxyz", EXPECTED}) + .add(new Object[]{"asdf\r\nhjkl\r\nxyz", EXPECTED}) + .add(new Object[]{"asdf\rhjkl\r\nxyz", EXPECTED}) .build(); } @@ -444,30 +511,34 @@ private void runTestReadWithData(byte[] data, List expectedResults) thro } } - /** Tests for reading files with various delimiters. */ + /** + * Tests for reading files with various delimiters. + */ @RunWith(Parameterized.class) public static class ReadWithCustomDelimiterTest { - @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); @Parameterized.Parameters(name = "{index}: {0}") public static Iterable data() { return ImmutableList.builder() - .add(new Object[] {"first|*second|*|*third"}) - .add(new Object[] {"first|*second|*|*third|"}) - .add(new Object[] {"first|*second|*|*third*"}) - .add(new Object[] {"first|*second|*|*third|*"}) - .add(new Object[] {"|first|*second|*|*third"}) - .add(new Object[] {"|first|*second|*|*third|"}) - .add(new Object[] {"|first|*second|*|*third*"}) - .add(new Object[] {"|first|*second|*|*third|*"}) - .add(new Object[] {"*first|*second|*|*third"}) - .add(new Object[] {"*first|*second|*|*third|"}) - .add(new Object[] {"*first|*second|*|*third*"}) - .add(new Object[] {"*first|*second|*|*third|*"}) - .add(new Object[] {"|*first|*second|*|*third"}) - .add(new Object[] {"|*first|*second|*|*third|"}) - .add(new Object[] {"|*first|*second|*|*third*"}) - .add(new Object[] {"|*first|*second|*|*third|*"}) + .add(new Object[]{"first|*second|*|*third"}) + .add(new Object[]{"first|*second|*|*third|"}) + .add(new Object[]{"first|*second|*|*third*"}) + .add(new Object[]{"first|*second|*|*third|*"}) + .add(new Object[]{"|first|*second|*|*third"}) + .add(new Object[]{"|first|*second|*|*third|"}) + .add(new Object[]{"|first|*second|*|*third*"}) + .add(new Object[]{"|first|*second|*|*third|*"}) + .add(new Object[]{"*first|*second|*|*third"}) + .add(new Object[]{"*first|*second|*|*third|"}) + .add(new Object[]{"*first|*second|*|*third*"}) + .add(new Object[]{"*first|*second|*|*third|*"}) + .add(new Object[]{"|*first|*second|*|*third"}) + .add(new Object[]{"|*first|*second|*|*third|"}) + .add(new Object[]{"|*first|*second|*|*third*"}) + .add(new Object[]{"|*first|*second|*|*third|*"}) .build(); } @@ -477,14 +548,14 @@ public static Iterable data() { @Test public void testReadLinesWithCustomDelimiter() throws Exception { SourceTestUtils.assertSplitAtFractionExhaustive( - TextIOReadTest.prepareSource(tempFolder, testCase.getBytes(UTF_8), new byte[] {'|', '*'}), + TextIOReadTest.prepareSource(tempFolder, testCase.getBytes(UTF_8), new byte[]{'|', '*'}), PipelineOptionsFactory.create()); } @Test public void testReadLinesWithCustomDelimiterAndZeroAndOneLengthReturningChannel() throws Exception { - byte[] delimiter = new byte[] {'|', '*'}; + byte[] delimiter = new byte[]{'|', '*'}; Path path = tempFolder.newFile().toPath(); Files.write(path, testCase.getBytes(UTF_8)); Metadata metadata = FileSystems.matchSingleFileSpec(path.toString()); @@ -534,11 +605,16 @@ public void close() throws IOException { } } - /** Tests for some basic operations in {@link TextIO.Read}. */ + /** + * Tests for some basic operations in {@link TextIO.Read}. + */ @RunWith(JUnit4.class) public static class BasicIOTest { - @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); - @Rule public TestPipeline p = TestPipeline.create(); + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + @Rule + public TestPipeline p = TestPipeline.create(); private void runTestRead(String[] expected) throws Exception { File tmpFile = tempFolder.newFile(); @@ -561,26 +637,26 @@ private void runTestRead(String[] expected) throws Exception { @Test public void testDelimiterSelfOverlaps() { - assertFalse(TextIO.Read.isSelfOverlapping(new byte[] {'a', 'b', 'c'})); - assertFalse(TextIO.Read.isSelfOverlapping(new byte[] {'c', 'a', 'b', 'd', 'a', 'b'})); - assertFalse(TextIO.Read.isSelfOverlapping(new byte[] {'a', 'b', 'c', 'a', 'b', 'd'})); - assertTrue(TextIO.Read.isSelfOverlapping(new byte[] {'a', 'b', 'a'})); - assertTrue(TextIO.Read.isSelfOverlapping(new byte[] {'a', 'b', 'c', 'a', 'b'})); + assertFalse(TextIO.Read.isSelfOverlapping(new byte[]{'a', 'b', 'c'})); + assertFalse(TextIO.Read.isSelfOverlapping(new byte[]{'c', 'a', 'b', 'd', 'a', 'b'})); + assertFalse(TextIO.Read.isSelfOverlapping(new byte[]{'a', 'b', 'c', 'a', 'b', 'd'})); + assertTrue(TextIO.Read.isSelfOverlapping(new byte[]{'a', 'b', 'a'})); + assertTrue(TextIO.Read.isSelfOverlapping(new byte[]{'a', 'b', 'c', 'a', 'b'})); } @Test @Category(NeedsRunner.class) public void testReadStringsWithCustomDelimiter() throws Exception { final String[] inputStrings = - new String[] { - // incomplete delimiter - "To be, or not to be: that |is the question: ", - // incomplete delimiter - "To be, or not to be: that *is the question: ", - // complete delimiter - "Whether 'tis nobler in the mind to suffer |*", - // truncated delimiter - "The slings and arrows of outrageous fortune,|" + new String[]{ + // incomplete delimiter + "To be, or not to be: that |is the question: ", + // incomplete delimiter + "To be, or not to be: that *is the question: ", + // complete delimiter + "Whether 'tis nobler in the mind to suffer |*", + // truncated delimiter + "The slings and arrows of outrageous fortune,|" }; File tmpFile = tempFolder.newFile("tmpfile.txt"); @@ -590,7 +666,7 @@ public void testReadStringsWithCustomDelimiter() throws Exception { writer.write(Joiner.on("").join(inputStrings)); } - PAssert.that(p.apply(TextIO.read().from(filename).withDelimiter(new byte[] {'|', '*'}))) + PAssert.that(p.apply(TextIO.read().from(filename).withDelimiter(new byte[]{'|', '*'}))) .containsInAnyOrder( "To be, or not to be: that |is the question: To be, or not to be: " + "that *is the question: Whether 'tis nobler in the mind to suffer ", @@ -631,8 +707,11 @@ public void testReadDisplayData() { assertThat(displayData, hasDisplayItem("compressionType", BZIP2.toString())); } - /** Options for testing. */ + /** + * Options for testing. + */ public interface RuntimeTestOptions extends PipelineOptions { + ValueProvider getInput(); void setInput(ValueProvider value); @@ -704,9 +783,9 @@ public void testZipCompressedReadWithNoEntries() throws Exception { @Test @Category(NeedsRunner.class) public void testZipCompressedReadWithMultiEntriesFile() throws Exception { - String[] entry0 = new String[] {"first", "second", "three"}; - String[] entry1 = new String[] {"four", "five", "six"}; - String[] entry2 = new String[] {"seven", "eight", "nine"}; + String[] entry0 = new String[]{"first", "second", "three"}; + String[] entry1 = new String[]{"four", "five", "six"}; + String[] entry2 = new String[]{"seven", "eight", "nine"}; List expected = new ArrayList<>(); @@ -727,10 +806,10 @@ public void testZipCompressedReadWithComplexEmptyAndPresentEntries() throws Exce new ArrayList<>(), tempFolder, "complex empty and present entries", - new String[] {"cat"}, - new String[] {}, - new String[] {}, - new String[] {"dog"}); + new String[]{"cat"}, + new String[]{}, + new String[]{}, + new String[]{"dog"}); assertReadingCompressedFileMatchesExpected(file, ZIP, Arrays.asList("cat", "dog"), p); p.run(); @@ -977,7 +1056,7 @@ public void testReadFilesWithFilename() throws IOException { new TextSource( ValueProvider.StaticValueProvider.of(input), EmptyMatchTreatment.DISALLOW, - new byte[] {'\n'}); + new byte[]{'\n'}); PCollection> lines = p.apply( @@ -1032,10 +1111,14 @@ public void testReadWatchForNewFiles() throws IOException, InterruptedException } } - /** Tests for TextSource class. */ + /** + * Tests for TextSource class. + */ @RunWith(JUnit4.class) public static class TextSourceTest { - @Rule public transient TestPipeline pipeline = TestPipeline.create(); + + @Rule + public transient TestPipeline pipeline = TestPipeline.create(); @Test @Category(NeedsRunner.class) @@ -1118,10 +1201,14 @@ public void processElement(ProcessContext c) { } } - /** A transform that reads CSV file records. */ + /** + * A transform that reads CSV file records. + */ private static class TextFileReadTransform extends PTransform, PCollection> { - public TextFileReadTransform() {} + + public TextFileReadTransform() { + } @Override public PCollection expand(PCollection files) { From 7be63530274c739b92876b605a5be35c1047390a Mon Sep 17 00:00:00 2001 From: Hyungrok Ham Date: Tue, 19 Sep 2023 00:16:33 +0900 Subject: [PATCH 2/4] doc: add doc for withSkipHeaderLines --- .../core/src/main/java/org/apache/beam/sdk/io/TextIO.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index b9465f16a219..ccb934cf2608 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -407,6 +407,11 @@ public Read withEmptyMatchTreatment(EmptyMatchTreatment treatment) { return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment)); } + /** + * Sets the number of lines to skip from the beginning of the file. + *

+ * This disables split file reading and may cause performance degradation. + */ public Read withSkipHeaderLines(int skipHeaderLines) { return toBuilder().setSkipHeaderLines(skipHeaderLines).build(); } From d60cea0df6d801d3486a930d6435de7b6d7e4541 Mon Sep 17 00:00:00 2001 From: Hyungrok Ham Date: Tue, 19 Sep 2023 00:18:22 +0900 Subject: [PATCH 3/4] chore: spotless --- .../java/org/apache/beam/sdk/io/TextIO.java | 391 ++++++------------ .../org/apache/beam/sdk/io/TextSource.java | 28 +- .../apache/beam/sdk/io/TextIOReadTest.java | 218 +++++----- 3 files changed, 234 insertions(+), 403 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index ccb934cf2608..44bf1a9438d3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -68,9 +68,8 @@ * *

To read a {@link PCollection} from one or more text files, use {@code TextIO.read()} to * instantiate a transform and use {@link TextIO.Read#from(String)} to specify the path of the - * file(s) to be read. Alternatively, if the filenames to be read are themselves in a - * {@link PCollection} you can use {@link FileIO} to match them and {@link TextIO#readFiles} to read - * them. + * file(s) to be read. Alternatively, if the filenames to be read are themselves in a {@link + * PCollection} you can use {@link FileIO} to match them and {@link TextIO#readFiles} to read them. * *

{@link #read} returns a {@link PCollection} of {@link String Strings}, each corresponding to * one line of an input UTF-8 text file (split into lines delimited by '\n', '\r', or '\r\n', or @@ -79,14 +78,14 @@ *

Filepattern expansion and watching

* *

By default, the filepatterns are expanded only once. {@link Read#watchForNewFiles} or the - * combination of {@link FileIO.Match#continuously(Duration, TerminationCondition)} and - * {@link #readFiles()} allow streaming of new files matching the filepattern(s). + * combination of {@link FileIO.Match#continuously(Duration, TerminationCondition)} and {@link + * #readFiles()} allow streaming of new files matching the filepattern(s). * - *

By default, {@link #read} prohibits filepatterns that match no files, and - * {@link #readFiles()} allows them in case the filepattern contains a glob wildcard character. Use - * {@link Read#withEmptyMatchTreatment} or - * {@link FileIO.Match#withEmptyMatchTreatment(EmptyMatchTreatment)} plus {@link #readFiles()} to - * configure this behavior. + *

By default, {@link #read} prohibits filepatterns that match no files, and {@link #readFiles()} + * allows them in case the filepattern contains a glob wildcard character. Use {@link + * Read#withEmptyMatchTreatment} or {@link + * FileIO.Match#withEmptyMatchTreatment(EmptyMatchTreatment)} plus {@link #readFiles()} to configure + * this behavior. * *

Example 1: reading a file or filepattern. * @@ -179,15 +178,15 @@ * DynamicDestinations} interface for advanced features via {@link Write#to(DynamicDestinations)}. */ @SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) + "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) public class TextIO { private static final long DEFAULT_BUNDLE_SIZE_BYTES = 64 * 1024 * 1024L; /** - * A {@link PTransform} that reads from one or more text files and returns a bounded - * {@link PCollection} containing one element for each line of the input files. + * A {@link PTransform} that reads from one or more text files and returns a bounded {@link + * PCollection} containing one element for each line of the input files. */ public static Read read() { return new AutoValue_TextIO_Read.Builder() @@ -199,8 +198,8 @@ public static Read read() { } /** - * A {@link PTransform} that works like {@link #read}, but reads each file in a - * {@link PCollection} of filepatterns. + * A {@link PTransform} that works like {@link #read}, but reads each file in a {@link + * PCollection} of filepatterns. * *

Can be applied to both bounded and unbounded {@link PCollection PCollections}, so this is * suitable for reading a {@link PCollection} of filepatterns arriving as a stream. However, every @@ -209,8 +208,9 @@ public static Read read() { * new entries. * * @deprecated You can achieve The functionality of {@link #readAll()} using {@link FileIO} - * matching plus {@link #readFiles()}. This is the preferred method to make composition explicit. - * {@link ReadAll} will not receive upgrades and will be removed in a future version of Beam. + * matching plus {@link #readFiles()}. This is the preferred method to make composition + * explicit. {@link ReadAll} will not receive upgrades and will be removed in a future version + * of Beam. */ @Deprecated public static ReadAll readAll() { @@ -221,8 +221,8 @@ public static ReadAll readAll() { } /** - * Like {@link #read}, but reads each file in a {@link PCollection} of - * {@link FileIO.ReadableFile}, returned by {@link FileIO#readMatches}. + * Like {@link #read}, but reads each file in a {@link PCollection} of {@link + * FileIO.ReadableFile}, returned by {@link FileIO#readMatches}. */ public static ReadFiles readFiles() { return new AutoValue_TextIO_ReadFiles.Builder() @@ -250,8 +250,8 @@ public static Write write() { *

This version allows you to apply {@link TextIO} writes to a PCollection of a custom type * {@link UserT}. A format mechanism that converts the input type {@link UserT} to the String that * will be written to the file must be specified. If using a custom {@link DynamicDestinations} - * object this is done using {@link DynamicDestinations#formatRecord}, otherwise the - * {@link TypedWrite#withFormatFunction} can be used to specify a format function. + * object this is done using {@link DynamicDestinations#formatRecord}, otherwise the {@link + * TypedWrite#withFormatFunction} can be used to specify a format function. * *

The advantage of using a custom type is that is it allows a user-provided {@link * DynamicDestinations} object, set via {@link Write#to(DynamicDestinations)} to examine the @@ -265,7 +265,7 @@ public static TypedWrite writeCustomType() { .setFilenameSuffix(null) .setFilenamePolicy(null) .setDynamicDestinations(null) - .setDelimiter(new char[]{'\n'}) + .setDelimiter(new char[] {'\n'}) .setWritableByteChannelFactory(FileBasedSink.CompressionType.UNCOMPRESSED) .setWindowedWrites(false) .setNoSpilling(false) @@ -273,9 +273,7 @@ public static TypedWrite writeCustomType() { .build(); } - /** - * Implementation of {@link #read}. - */ + /** Implementation of {@link #read}. */ @AutoValue public abstract static class Read extends PTransform> { @@ -322,33 +320,26 @@ abstract static class Builder { *

Standard Java * Filesystem glob patterns ("*", "?", "[..]") are supported. * - *

If it is known that the filepattern will match a very large number of files (at least - * tens of thousands), use {@link #withHintMatchesManyFiles} for better performance and - * scalability. + *

If it is known that the filepattern will match a very large number of files (at least tens + * of thousands), use {@link #withHintMatchesManyFiles} for better performance and scalability. */ public Read from(String filepattern) { checkArgument(filepattern != null, "filepattern can not be null"); return from(StaticValueProvider.of(filepattern)); } - /** - * Same as {@code from(filepattern)}, but accepting a {@link ValueProvider}. - */ + /** Same as {@code from(filepattern)}, but accepting a {@link ValueProvider}. */ public Read from(ValueProvider filepattern) { checkArgument(filepattern != null, "filepattern can not be null"); return toBuilder().setFilepattern(filepattern).build(); } - /** - * Sets the {@link MatchConfiguration}. - */ + /** Sets the {@link MatchConfiguration}. */ public Read withMatchConfiguration(MatchConfiguration matchConfiguration) { return toBuilder().setMatchConfiguration(matchConfiguration).build(); } - /** - * @deprecated Use {@link #withCompression}. - */ + /** @deprecated Use {@link #withCompression}. */ @Deprecated public Read withCompressionType(TextIO.CompressionType compressionType) { return withCompression(compressionType.canonical); @@ -379,8 +370,8 @@ public Read watchForNewFiles( } /** - * Same as {@link Read#watchForNewFiles(Duration, TerminationCondition, boolean)} with - * {@code matchUpdatedFiles=false}. + * Same as {@link Read#watchForNewFiles(Duration, TerminationCondition, boolean)} with {@code + * matchUpdatedFiles=false}. */ public Read watchForNewFiles( Duration pollInterval, TerminationCondition terminationCondition) { @@ -400,25 +391,21 @@ public Read withHintMatchesManyFiles() { return toBuilder().setHintMatchesManyFiles(true).build(); } - /** - * See {@link MatchConfiguration#withEmptyMatchTreatment}. - */ + /** See {@link MatchConfiguration#withEmptyMatchTreatment}. */ public Read withEmptyMatchTreatment(EmptyMatchTreatment treatment) { return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment)); } /** * Sets the number of lines to skip from the beginning of the file. - *

- * This disables split file reading and may cause performance degradation. + * + *

This disables split file reading and may cause performance degradation. */ public Read withSkipHeaderLines(int skipHeaderLines) { return toBuilder().setSkipHeaderLines(skipHeaderLines).build(); } - /** - * Set the custom delimiter to be used in place of the default ones ('\r', '\n' or '\r\n'). - */ + /** Set the custom delimiter to be used in place of the default ones ('\r', '\n' or '\r\n'). */ public Read withDelimiter(byte[] delimiter) { checkArgument(delimiter != null, "delimiter can not be null"); checkArgument(!isSelfOverlapping(delimiter), "delimiter must not self-overlap"); @@ -517,16 +504,12 @@ abstract static class Builder { abstract ReadAll build(); } - /** - * Sets the {@link MatchConfiguration}. - */ + /** Sets the {@link MatchConfiguration}. */ public ReadAll withMatchConfiguration(MatchConfiguration configuration) { return toBuilder().setMatchConfiguration(configuration).build(); } - /** - * @deprecated Use {@link #withCompression}. - */ + /** @deprecated Use {@link #withCompression}. */ @Deprecated public ReadAll withCompressionType(TextIO.CompressionType compressionType) { return withCompression(compressionType.canonical); @@ -541,16 +524,12 @@ public ReadAll withCompression(Compression compression) { return toBuilder().setCompression(compression).build(); } - /** - * Same as {@link Read#withEmptyMatchTreatment}. - */ + /** Same as {@link Read#withEmptyMatchTreatment}. */ public ReadAll withEmptyMatchTreatment(EmptyMatchTreatment treatment) { return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment)); } - /** - * Same as {@link Read#watchForNewFiles(Duration, TerminationCondition, boolean)}. - */ + /** Same as {@link Read#watchForNewFiles(Duration, TerminationCondition, boolean)}. */ public ReadAll watchForNewFiles( Duration pollInterval, TerminationCondition terminationCondition, @@ -560,9 +539,7 @@ public ReadAll watchForNewFiles( .continuously(pollInterval, terminationCondition, matchUpdatedFiles)); } - /** - * Same as {@link Read#watchForNewFiles(Duration, TerminationCondition)}. - */ + /** Same as {@link Read#watchForNewFiles(Duration, TerminationCondition)}. */ public ReadAll watchForNewFiles( Duration pollInterval, TerminationCondition terminationCondition) { return watchForNewFiles(pollInterval, terminationCondition, false); @@ -597,9 +574,7 @@ public void populateDisplayData(DisplayData.Builder builder) { } } - /** - * Implementation of {@link #readFiles}. - */ + /** Implementation of {@link #readFiles}. */ @AutoValue public abstract static class ReadFiles extends PTransform, PCollection> { @@ -626,9 +601,7 @@ ReadFiles withDesiredBundleSizeBytes(long desiredBundleSizeBytes) { return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build(); } - /** - * Like {@link Read#withDelimiter}. - */ + /** Like {@link Read#withDelimiter}. */ public ReadFiles withDelimiter(byte[] delimiter) { return toBuilder().setDelimiter(delimiter).build(); } @@ -662,100 +635,66 @@ private CreateTextSourceFn(byte[] delimiter) { @Override public FileBasedSource apply(String input) { - return new TextSource(StaticValueProvider.of(input), EmptyMatchTreatment.DISALLOW, - delimiter); + return new TextSource( + StaticValueProvider.of(input), EmptyMatchTreatment.DISALLOW, delimiter); } } } // /////////////////////////////////////////////////////////////////////////// - /** - * Implementation of {@link #write}. - */ + /** Implementation of {@link #write}. */ @AutoValue public abstract static class TypedWrite extends PTransform, WriteFilesResult> { - /** - * The prefix of each file written, combined with suffix and shardTemplate. - */ + /** The prefix of each file written, combined with suffix and shardTemplate. */ abstract @Nullable ValueProvider getFilenamePrefix(); - /** - * The suffix of each file written, combined with prefix and shardTemplate. - */ + /** The suffix of each file written, combined with prefix and shardTemplate. */ abstract @Nullable String getFilenameSuffix(); - /** - * The base directory used for generating temporary files. - */ + /** The base directory used for generating temporary files. */ abstract @Nullable ValueProvider getTempDirectory(); - /** - * The delimiter between string records. - */ + /** The delimiter between string records. */ @SuppressWarnings("mutable") // this returns an array that can be mutated by the caller abstract char[] getDelimiter(); - /** - * An optional header to add to each file. - */ + /** An optional header to add to each file. */ abstract @Nullable String getHeader(); - /** - * An optional footer to add to each file. - */ + /** An optional footer to add to each file. */ abstract @Nullable String getFooter(); - /** - * Requested number of shards. 0 for automatic. - */ + /** Requested number of shards. 0 for automatic. */ abstract @Nullable ValueProvider getNumShards(); - /** - * The shard template of each file written, combined with prefix and suffix. - */ + /** The shard template of each file written, combined with prefix and suffix. */ abstract @Nullable String getShardTemplate(); - /** - * A policy for naming output files. - */ + /** A policy for naming output files. */ abstract @Nullable FilenamePolicy getFilenamePolicy(); - /** - * Allows for value-dependent {@link DynamicDestinations} to be vended. - */ + /** Allows for value-dependent {@link DynamicDestinations} to be vended. */ abstract @Nullable DynamicDestinations getDynamicDestinations(); - /** - * A destination function for using {@link DefaultFilenamePolicy}. - */ + /** A destination function for using {@link DefaultFilenamePolicy}. */ abstract @Nullable SerializableFunction getDestinationFunction(); - /** - * A default destination for empty PCollections. - */ + /** A default destination for empty PCollections. */ abstract @Nullable Params getEmptyDestination(); - /** - * A function that converts UserT to a String, for writing to the file. - */ + /** A function that converts UserT to a String, for writing to the file. */ abstract @Nullable SerializableFunction getFormatFunction(); - /** - * Whether to write windowed output files. - */ + /** Whether to write windowed output files. */ abstract boolean getWindowedWrites(); - /** - * Whether to skip the spilling of data caused by having maxNumWritersPerBundle. - */ + /** Whether to skip the spilling of data caused by having maxNumWritersPerBundle. */ abstract boolean getNoSpilling(); - /** - * Whether to skip writing any output files if the PCollection is empty. - */ + /** Whether to skip writing any output files if the PCollection is empty. */ abstract boolean getSkipIfEmpty(); /** @@ -815,14 +754,13 @@ abstract Builder setWritableByteChannelFactory( } /** - * Writes to text files with the given prefix. The given {@code prefix} can reference any - * {@link FileSystem} on the classpath. This prefix is used by the {@link DefaultFilenamePolicy} - * to generate filenames. + * Writes to text files with the given prefix. The given {@code prefix} can reference any {@link + * FileSystem} on the classpath. This prefix is used by the {@link DefaultFilenamePolicy} to + * generate filenames. * *

By default, a {@link DefaultFilenamePolicy} will be used built using the specified prefix - * to define the base output directory and file prefix, a shard identifier (see - * {@link #withNumShards(int)}), and a common suffix (if supplied using - * {@link #withSuffix(String)}). + * to define the base output directory and file prefix, a shard identifier (see {@link + * #withNumShards(int)}), and a common suffix (if supplied using {@link #withSuffix(String)}). * *

This default policy can be overridden using {@link #to(FilenamePolicy)}, in which case * {@link #withShardNameTemplate(String)} and {@link #withSuffix(String)} should not be set. @@ -836,16 +774,12 @@ public TypedWrite to(String filenamePrefix) { return to(FileBasedSink.convertToFileResourceIfPossible(filenamePrefix)); } - /** - * Like {@link #to(String)}. - */ + /** Like {@link #to(String)}. */ public TypedWrite to(ResourceId filenamePrefix) { return toResource(StaticValueProvider.of(filenamePrefix)); } - /** - * Like {@link #to(String)}. - */ + /** Like {@link #to(String)}. */ public TypedWrite to(ValueProvider outputPrefix) { return toResource( NestedValueProvider.of(outputPrefix, FileBasedSink::convertToFileResourceIfPossible)); @@ -865,7 +799,7 @@ public TypedWrite to(FilenamePolicy filenamePolicy) { * temporary files must be specified using {@link #withTempDirectory}. * * @deprecated Use {@link FileIO#write()} or {@link FileIO#writeDynamic()} with {@link #sink()} - * instead. + * instead. */ @Deprecated public TypedWrite to( @@ -882,7 +816,7 @@ public TypedWrite to( * {@link PCollection} is empty. * * @deprecated Use {@link FileIO#write()} or {@link FileIO#writeDynamic()} with {@link #sink()} - * instead. + * instead. */ @Deprecated public TypedWrite to( @@ -894,20 +828,18 @@ public TypedWrite to( .build(); } - /** - * Like {@link #to(ResourceId)}. - */ + /** Like {@link #to(ResourceId)}. */ public TypedWrite toResource(ValueProvider filenamePrefix) { return toBuilder().setFilenamePrefix(filenamePrefix).build(); } /** - * Specifies a format function to convert {@link UserT} to the output type. If - * {@link #to(DynamicDestinations)} is used, {@link DynamicDestinations#formatRecord(Object)} - * must be used instead. + * Specifies a format function to convert {@link UserT} to the output type. If {@link + * #to(DynamicDestinations)} is used, {@link DynamicDestinations#formatRecord(Object)} must be + * used instead. * * @deprecated Use {@link FileIO#write()} or {@link FileIO#writeDynamic()} with {@link #sink()} - * instead. + * instead. */ @Deprecated public TypedWrite withFormatFunction( @@ -915,17 +847,13 @@ public TypedWrite withFormatFunction( return toBuilder().setFormatFunction(formatFunction).build(); } - /** - * Set the base directory used to generate temporary files. - */ + /** Set the base directory used to generate temporary files. */ public TypedWrite withTempDirectory( ValueProvider tempDirectory) { return toBuilder().setTempDirectory(tempDirectory).build(); } - /** - * Set the base directory used to generate temporary files. - */ + /** Set the base directory used to generate temporary files. */ public TypedWrite withTempDirectory(ResourceId tempDirectory) { return withTempDirectory(StaticValueProvider.of(tempDirectory)); } @@ -944,8 +872,8 @@ public TypedWrite withShardNameTemplate(String shardTemplat /** * Configures the filename suffix for written files. This option may only be used when using one - * of the default filename-prefix to() overrides - i.e. not when using either - * {@link #to(FilenamePolicy)} or {@link #to(DynamicDestinations)}. + * of the default filename-prefix to() overrides - i.e. not when using either {@link + * #to(FilenamePolicy)} or {@link #to(DynamicDestinations)}. * *

See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are * used. @@ -1025,8 +953,8 @@ public TypedWrite withFooter(@Nullable String footer) { } /** - * Returns a transform for writing to text files like this one but that has the given - * {@link WritableByteChannelFactory} to be used by the {@link FileBasedSink} during output. The + * Returns a transform for writing to text files like this one but that has the given {@link + * WritableByteChannelFactory} to be used by the {@link FileBasedSink} during output. The * default is value is {@link Compression#UNCOMPRESSED}. * *

A {@code null} value will reset the value to the default value mentioned above. @@ -1047,8 +975,7 @@ public TypedWrite withCompression(Compression compression) } /** - * Preserves windowing of input elements and writes them to files based on the element's - * window. + * Preserves windowing of input elements and writes them to files based on the element's window. * *

If using {@link #to(FileBasedSink.FilenamePolicy)}. Filenames will be generated using * {@link FilenamePolicy#windowedFilename}. See also {@link WriteFiles#withWindowedWrites()}. @@ -1057,16 +984,12 @@ public TypedWrite withWindowedWrites() { return toBuilder().setWindowedWrites(true).build(); } - /** - * See {@link WriteFiles#withNoSpilling()}. - */ + /** See {@link WriteFiles#withNoSpilling()}. */ public TypedWrite withNoSpilling() { return toBuilder().setNoSpilling(true).build(); } - /** - * Don't write any output files if the PCollection is empty. - */ + /** Don't write any output files if the PCollection is empty. */ public TypedWrite skipIfEmpty() { return toBuilder().setSkipIfEmpty(true).build(); } @@ -1116,9 +1039,9 @@ public WriteFilesResult expand(PCollection input) { checkArgument( 1 == Iterables.size( - allToArgs.stream() - .filter(Predicates.notNull()::apply) - .collect(Collectors.toList())), + allToArgs.stream() + .filter(Predicates.notNull()::apply) + .collect(Collectors.toList())), "Exactly one of filename policy, dynamic destinations, filename prefix, or destination " + "function must be set"); @@ -1190,8 +1113,7 @@ public void populateDisplayData(DisplayData.Builder builder) { */ public static class Write extends PTransform, PDone> { - @VisibleForTesting - TypedWrite inner; + @VisibleForTesting TypedWrite inner; Write() { this(TextIO.writeCustomType()); @@ -1201,40 +1123,30 @@ public static class Write extends PTransform, PDone> { this.inner = inner; } - /** - * See {@link TypedWrite#to(String)}. - */ + /** See {@link TypedWrite#to(String)}. */ public Write to(String filenamePrefix) { return new Write( inner.to(filenamePrefix).withFormatFunction(SerializableFunctions.identity())); } - /** - * See {@link TypedWrite#to(ResourceId)}. - */ + /** See {@link TypedWrite#to(ResourceId)}. */ public Write to(ResourceId filenamePrefix) { return new Write( inner.to(filenamePrefix).withFormatFunction(SerializableFunctions.identity())); } - /** - * See {@link TypedWrite#to(ValueProvider)}. - */ + /** See {@link TypedWrite#to(ValueProvider)}. */ public Write to(ValueProvider outputPrefix) { return new Write(inner.to(outputPrefix).withFormatFunction(SerializableFunctions.identity())); } - /** - * See {@link TypedWrite#toResource(ValueProvider)}. - */ + /** See {@link TypedWrite#toResource(ValueProvider)}. */ public Write toResource(ValueProvider filenamePrefix) { return new Write( inner.toResource(filenamePrefix).withFormatFunction(SerializableFunctions.identity())); } - /** - * See {@link TypedWrite#to(FilenamePolicy)}. - */ + /** See {@link TypedWrite#to(FilenamePolicy)}. */ public Write to(FilenamePolicy filenamePolicy) { return new Write( inner.to(filenamePolicy).withFormatFunction(SerializableFunctions.identity())); @@ -1243,8 +1155,8 @@ public Write to(FilenamePolicy filenamePolicy) { /** * See {@link TypedWrite#to(DynamicDestinations)}. * - * @deprecated Use {@link FileIO#write()} or {@link FileIO#writeDynamic()} ()} with - * {@link #sink()} instead. + * @deprecated Use {@link FileIO#write()} or {@link FileIO#writeDynamic()} ()} with {@link + * #sink()} instead. */ @Deprecated public Write to(DynamicDestinations dynamicDestinations) { @@ -1255,8 +1167,8 @@ public Write to(DynamicDestinations dynamicDestinations) { /** * See {@link TypedWrite#to(SerializableFunction, Params)}. * - * @deprecated Use {@link FileIO#write()} or {@link FileIO#writeDynamic()} ()} with - * {@link #sink()} instead. + * @deprecated Use {@link FileIO#write()} or {@link FileIO#writeDynamic()} ()} with {@link + * #sink()} instead. */ @Deprecated public Write to( @@ -1267,101 +1179,73 @@ public Write to( .withFormatFunction(SerializableFunctions.identity())); } - /** - * See {@link TypedWrite#withTempDirectory(ValueProvider)}. - */ + /** See {@link TypedWrite#withTempDirectory(ValueProvider)}. */ public Write withTempDirectory(ValueProvider tempDirectory) { return new Write(inner.withTempDirectory(tempDirectory)); } - /** - * See {@link TypedWrite#withTempDirectory(ResourceId)}. - */ + /** See {@link TypedWrite#withTempDirectory(ResourceId)}. */ public Write withTempDirectory(ResourceId tempDirectory) { return new Write(inner.withTempDirectory(tempDirectory)); } - /** - * See {@link TypedWrite#withShardNameTemplate(String)}. - */ + /** See {@link TypedWrite#withShardNameTemplate(String)}. */ public Write withShardNameTemplate(String shardTemplate) { return new Write(inner.withShardNameTemplate(shardTemplate)); } - /** - * See {@link TypedWrite#withSuffix(String)}. - */ + /** See {@link TypedWrite#withSuffix(String)}. */ public Write withSuffix(String filenameSuffix) { return new Write(inner.withSuffix(filenameSuffix)); } - /** - * See {@link TypedWrite#withNumShards(int)}. - */ + /** See {@link TypedWrite#withNumShards(int)}. */ public Write withNumShards(int numShards) { return new Write(inner.withNumShards(numShards)); } - /** - * See {@link TypedWrite#withNumShards(ValueProvider)}. - */ + /** See {@link TypedWrite#withNumShards(ValueProvider)}. */ public Write withNumShards(@Nullable ValueProvider numShards) { return new Write(inner.withNumShards(numShards)); } - /** - * See {@link TypedWrite#withoutSharding()}. - */ + /** See {@link TypedWrite#withoutSharding()}. */ public Write withoutSharding() { return new Write(inner.withoutSharding()); } - /** - * See {@link TypedWrite#withDelimiter(char[])}. - */ + /** See {@link TypedWrite#withDelimiter(char[])}. */ public Write withDelimiter(char[] delimiter) { return new Write(inner.withDelimiter(delimiter)); } - /** - * See {@link TypedWrite#withHeader(String)}. - */ + /** See {@link TypedWrite#withHeader(String)}. */ public Write withHeader(@Nullable String header) { return new Write(inner.withHeader(header)); } - /** - * See {@link TypedWrite#withFooter(String)}. - */ + /** See {@link TypedWrite#withFooter(String)}. */ public Write withFooter(@Nullable String footer) { return new Write(inner.withFooter(footer)); } - /** - * See {@link TypedWrite#withWritableByteChannelFactory(WritableByteChannelFactory)}. - */ + /** See {@link TypedWrite#withWritableByteChannelFactory(WritableByteChannelFactory)}. */ public Write withWritableByteChannelFactory( WritableByteChannelFactory writableByteChannelFactory) { return new Write(inner.withWritableByteChannelFactory(writableByteChannelFactory)); } - /** - * See {@link TypedWrite#withCompression(Compression)}. - */ + /** See {@link TypedWrite#withCompression(Compression)}. */ public Write withCompression(Compression compression) { return new Write(inner.withCompression(compression)); } - /** - * See {@link TypedWrite#withWindowedWrites}. - */ + /** See {@link TypedWrite#withWindowedWrites}. */ public Write withWindowedWrites() { return new Write(inner.withWindowedWrites()); } - /** - * See {@link TypedWrite#withNoSpilling}. - */ + /** See {@link TypedWrite#withNoSpilling}. */ public Write withNoSpilling() { return new Write(inner.withNoSpilling()); } @@ -1375,8 +1259,8 @@ public Write withNoSpilling() { * output type, allowing access to output files. * *

The supplied {@code DestinationT} type must be: the same as that supplied in {@link - * #to(DynamicDestinations)} if that method was used; {@link Params} if - * {@link #to(SerializableFunction, Params)} was used, or {@code Void} otherwise. + * #to(DynamicDestinations)} if that method was used; {@link Params} if {@link + * #to(SerializableFunction, Params)} was used, or {@code Void} otherwise. */ public TypedWrite withOutputFilenames() { return (TypedWrite) inner; @@ -1394,44 +1278,28 @@ public PDone expand(PCollection input) { } } - /** - * @deprecated Use {@link Compression}. - */ + /** @deprecated Use {@link Compression}. */ @Deprecated public enum CompressionType { - /** - * @see Compression#AUTO - */ + /** @see Compression#AUTO */ AUTO(Compression.AUTO), - /** - * @see Compression#UNCOMPRESSED - */ + /** @see Compression#UNCOMPRESSED */ UNCOMPRESSED(Compression.UNCOMPRESSED), - /** - * @see Compression#GZIP - */ + /** @see Compression#GZIP */ GZIP(Compression.GZIP), - /** - * @see Compression#BZIP2 - */ + /** @see Compression#BZIP2 */ BZIP2(Compression.BZIP2), - /** - * @see Compression#ZIP - */ + /** @see Compression#ZIP */ ZIP(Compression.ZIP), - /** - * @see Compression#ZSTD - */ + /** @see Compression#ZSTD */ ZSTD(Compression.ZSTD), - /** - * @see Compression#DEFLATE - */ + /** @see Compression#DEFLATE */ DEFLATE(Compression.DEFLATE); private final Compression canonical; @@ -1440,9 +1308,7 @@ public enum CompressionType { this.canonical = canonical; } - /** - * @see Compression#matches - */ + /** @see Compression#matches */ public boolean matches(String filename) { return canonical.matches(filename); } @@ -1451,16 +1317,14 @@ public boolean matches(String filename) { ////////////////////////////////////////////////////////////////////////////// /** - * Creates a {@link Sink} that writes newline-delimited strings in UTF-8, for use with - * {@link FileIO#write}. + * Creates a {@link Sink} that writes newline-delimited strings in UTF-8, for use with {@link + * FileIO#write}. */ public static Sink sink() { return new AutoValue_TextIO_Sink.Builder().build(); } - /** - * Implementation of {@link #sink}. - */ + /** Implementation of {@link #sink}. */ @AutoValue public abstract static class Sink implements FileIO.Sink { @@ -1517,9 +1381,6 @@ public void flush() throws IOException { } } - /** - * Disable construction of utility class. - */ - private TextIO() { - } + /** Disable construction of utility class. */ + private TextIO() {} } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java index 8e9dacc67294..19b294fe4755 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java @@ -41,9 +41,9 @@ * *

A {@link FileBasedSource} which can decode records delimited by newline characters. * - *

This source splits the data into records using {@code UTF-8} {@code \n}, {@code \r}, or - * {@code \r\n} as the delimiter. This source is not strict and supports decoding the last record - * even if it is not delimited. Finally, no records are decoded if the stream is empty. + *

This source splits the data into records using {@code UTF-8} {@code \n}, {@code \r}, or {@code + * \r\n} as the delimiter. This source is not strict and supports decoding the last record even if + * it is not delimited. Finally, no records are decoded if the stream is empty. * *

This source supports reading from any arbitrary byte position within the stream. If the * starting position is not {@code 0}, then bytes are skipped until the first delimiter is found @@ -51,28 +51,31 @@ */ @VisibleForTesting @SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) + "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) public class TextSource extends FileBasedSource { int skipHeaderLines; byte[] delimiter; - public TextSource(ValueProvider fileSpec, EmptyMatchTreatment emptyMatchTreatment, - byte[] delimiter) { + public TextSource( + ValueProvider fileSpec, EmptyMatchTreatment emptyMatchTreatment, byte[] delimiter) { super(fileSpec, emptyMatchTreatment, 1L); this.delimiter = delimiter; } - public TextSource(ValueProvider fileSpec, EmptyMatchTreatment emptyMatchTreatment, - int skipHeaderLines, byte[] delimiter) { + public TextSource( + ValueProvider fileSpec, + EmptyMatchTreatment emptyMatchTreatment, + int skipHeaderLines, + byte[] delimiter) { super(fileSpec, emptyMatchTreatment, 1L); this.skipHeaderLines = skipHeaderLines; this.delimiter = delimiter; } - public TextSource(MatchResult.Metadata metadata, long start, long end, int skipHeaderLines, - byte[] delimiter) { + public TextSource( + MatchResult.Metadata metadata, long start, long end, int skipHeaderLines, byte[] delimiter) { super(metadata, 1L, start, end); this.skipHeaderLines = skipHeaderLines; this.delimiter = delimiter; @@ -114,7 +117,7 @@ static class TextBasedReader extends FileBasedReader { private static final int READ_BUFFER_SIZE = 8192; private static final ByteString UTF8_BOM = - ByteString.copyFrom(new byte[]{(byte) 0xEF, (byte) 0xBB, (byte) 0xBF}); + ByteString.copyFrom(new byte[] {(byte) 0xEF, (byte) 0xBB, (byte) 0xBF}); private static final byte CR = '\r'; private static final byte LF = '\n'; @@ -225,8 +228,7 @@ private boolean fileStartsWithBom() throws IOException { } if (bufferLength >= UTF8_BOM.size()) { int i; - for (i = 0; i < UTF8_BOM.size() && buffer[i] == UTF8_BOM.byteAt(i); ++i) { - } + for (i = 0; i < UTF8_BOM.size() && buffer[i] == UTF8_BOM.byteAt(i); ++i) {} if (i == UTF8_BOM.size()) { return true; } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java index d8cbe1532c8e..24798e839342 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java @@ -108,9 +108,7 @@ import org.junit.runners.JUnit4; import org.junit.runners.Parameterized; -/** - * Tests for {@link TextIO.Read}. - */ +/** Tests for {@link TextIO.Read}. */ @RunWith(Enclosed.class) public class TextIOReadTest { @@ -162,9 +160,7 @@ private static void writeToStreamAndClose(List lines, OutputStream outpu } } - /** - * Helper to make an array of compressible strings. Returns ["word"i] for i in range(0,n). - */ + /** Helper to make an array of compressible strings. Returns ["word"i] for i in range(0,n). */ private static List makeLines(int n) { List ret = new ArrayList<>(); for (int i = 0; i < n; ++i) { @@ -217,9 +213,9 @@ private static void assertReadingCompressedFileMatchesExpected( /** * Create a zip file with the given lines. * - * @param expected A list of expected lines, populated in the zip file. - * @param folder A temporary folder used to create files. - * @param filename Optionally zip file name (can be null). + * @param expected A list of expected lines, populated in the zip file. + * @param folder A temporary folder used to create files. + * @param filename Optionally zip file name (can be null). * @param fieldsEntries Fields to write in zip entries. * @return The zip filename. * @throws Exception In case of a failure during zip file creation. @@ -278,35 +274,31 @@ private static String getFileSuffix(Compression compression) { } } - /** - * Tests for reading from different size of files with various Compression. - */ + /** Tests for reading from different size of files with various Compression. */ @RunWith(Parameterized.class) public static class CompressedReadTest { - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - @Rule - public TestPipeline p = TestPipeline.create(); + @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); + @Rule public TestPipeline p = TestPipeline.create(); @Parameterized.Parameters(name = "{index}: {1}") public static Iterable data() { return ImmutableList.builder() - .add(new Object[]{EMPTY, UNCOMPRESSED}) - .add(new Object[]{EMPTY, GZIP}) - .add(new Object[]{EMPTY, BZIP2}) - .add(new Object[]{EMPTY, ZIP}) - .add(new Object[]{EMPTY, DEFLATE}) - .add(new Object[]{TINY, UNCOMPRESSED}) - .add(new Object[]{TINY, GZIP}) - .add(new Object[]{TINY, BZIP2}) - .add(new Object[]{TINY, ZIP}) - .add(new Object[]{TINY, DEFLATE}) - .add(new Object[]{LARGE, UNCOMPRESSED}) - .add(new Object[]{LARGE, GZIP}) - .add(new Object[]{LARGE, BZIP2}) - .add(new Object[]{LARGE, ZIP}) - .add(new Object[]{LARGE, DEFLATE}) + .add(new Object[] {EMPTY, UNCOMPRESSED}) + .add(new Object[] {EMPTY, GZIP}) + .add(new Object[] {EMPTY, BZIP2}) + .add(new Object[] {EMPTY, ZIP}) + .add(new Object[] {EMPTY, DEFLATE}) + .add(new Object[] {TINY, UNCOMPRESSED}) + .add(new Object[] {TINY, GZIP}) + .add(new Object[] {TINY, BZIP2}) + .add(new Object[] {TINY, ZIP}) + .add(new Object[] {TINY, DEFLATE}) + .add(new Object[] {LARGE, UNCOMPRESSED}) + .add(new Object[] {LARGE, GZIP}) + .add(new Object[] {LARGE, BZIP2}) + .add(new Object[] {LARGE, ZIP}) + .add(new Object[] {LARGE, DEFLATE}) .build(); } @@ -316,9 +308,7 @@ public static Iterable data() { @Parameterized.Parameter(1) public Compression compression; - /** - * Tests reading from a small, compressed file with no extension. - */ + /** Tests reading from a small, compressed file with no extension. */ @Test @Category(NeedsRunner.class) public void testCompressedReadWithoutExtension() throws Exception { @@ -360,18 +350,16 @@ public void testReadWithAuto() throws Exception { @RunWith(Parameterized.class) public static class ReadWithSkipHeaderLinesTest { - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - @Rule - public TestPipeline p = TestPipeline.create(); + @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); + @Rule public TestPipeline p = TestPipeline.create(); @Parameterized.Parameters(name = "{index}: {1}") public static Iterable data() { return ImmutableList.builder() - .add(new Object[]{TINY, 0, TINY.subList(0, TINY.size())}) - .add(new Object[]{TINY, 1, TINY.subList(1, TINY.size())}) - .add(new Object[]{TINY, 2, TINY.subList(2, TINY.size())}) - .add(new Object[]{TINY, 3, TINY.subList(3, TINY.size())}) + .add(new Object[] {TINY, 0, TINY.subList(0, TINY.size())}) + .add(new Object[] {TINY, 1, TINY.subList(1, TINY.size())}) + .add(new Object[] {TINY, 2, TINY.subList(2, TINY.size())}) + .add(new Object[] {TINY, 3, TINY.subList(3, TINY.size())}) .build(); } @@ -398,9 +386,7 @@ public void testReadWithSkipHeaderLines() throws Exception { } } - TextIO.Read read = TextIO.read() - .from(filename) - .withSkipHeaderLines(skipHeaderLines); + TextIO.Read read = TextIO.read().from(filename).withSkipHeaderLines(skipHeaderLines); PCollection output = p.apply(read); PAssert.that(output).containsInAnyOrder(expected); @@ -408,28 +394,25 @@ public void testReadWithSkipHeaderLines() throws Exception { } } - /** - * Tests for reading files with various delimiters. - */ + /** Tests for reading files with various delimiters. */ @RunWith(Parameterized.class) public static class ReadWithDefaultDelimiterTest { private static final ImmutableList EXPECTED = ImmutableList.of("asdf", "hjkl", "xyz"); - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); + @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); @Parameterized.Parameters(name = "{index}: {0}") public static Iterable data() { return ImmutableList.builder() - .add(new Object[]{"\n\n\n", ImmutableList.of("", "", "")}) - .add(new Object[]{"asdf\nhjkl\nxyz\n", EXPECTED}) - .add(new Object[]{"asdf\rhjkl\rxyz\r", EXPECTED}) - .add(new Object[]{"asdf\r\nhjkl\r\nxyz\r\n", EXPECTED}) - .add(new Object[]{"asdf\rhjkl\r\nxyz\n", EXPECTED}) - .add(new Object[]{"asdf\nhjkl\nxyz", EXPECTED}) - .add(new Object[]{"asdf\rhjkl\rxyz", EXPECTED}) - .add(new Object[]{"asdf\r\nhjkl\r\nxyz", EXPECTED}) - .add(new Object[]{"asdf\rhjkl\r\nxyz", EXPECTED}) + .add(new Object[] {"\n\n\n", ImmutableList.of("", "", "")}) + .add(new Object[] {"asdf\nhjkl\nxyz\n", EXPECTED}) + .add(new Object[] {"asdf\rhjkl\rxyz\r", EXPECTED}) + .add(new Object[] {"asdf\r\nhjkl\r\nxyz\r\n", EXPECTED}) + .add(new Object[] {"asdf\rhjkl\r\nxyz\n", EXPECTED}) + .add(new Object[] {"asdf\nhjkl\nxyz", EXPECTED}) + .add(new Object[] {"asdf\rhjkl\rxyz", EXPECTED}) + .add(new Object[] {"asdf\r\nhjkl\r\nxyz", EXPECTED}) + .add(new Object[] {"asdf\rhjkl\r\nxyz", EXPECTED}) .build(); } @@ -511,34 +494,31 @@ private void runTestReadWithData(byte[] data, List expectedResults) thro } } - /** - * Tests for reading files with various delimiters. - */ + /** Tests for reading files with various delimiters. */ @RunWith(Parameterized.class) public static class ReadWithCustomDelimiterTest { - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); + @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); @Parameterized.Parameters(name = "{index}: {0}") public static Iterable data() { return ImmutableList.builder() - .add(new Object[]{"first|*second|*|*third"}) - .add(new Object[]{"first|*second|*|*third|"}) - .add(new Object[]{"first|*second|*|*third*"}) - .add(new Object[]{"first|*second|*|*third|*"}) - .add(new Object[]{"|first|*second|*|*third"}) - .add(new Object[]{"|first|*second|*|*third|"}) - .add(new Object[]{"|first|*second|*|*third*"}) - .add(new Object[]{"|first|*second|*|*third|*"}) - .add(new Object[]{"*first|*second|*|*third"}) - .add(new Object[]{"*first|*second|*|*third|"}) - .add(new Object[]{"*first|*second|*|*third*"}) - .add(new Object[]{"*first|*second|*|*third|*"}) - .add(new Object[]{"|*first|*second|*|*third"}) - .add(new Object[]{"|*first|*second|*|*third|"}) - .add(new Object[]{"|*first|*second|*|*third*"}) - .add(new Object[]{"|*first|*second|*|*third|*"}) + .add(new Object[] {"first|*second|*|*third"}) + .add(new Object[] {"first|*second|*|*third|"}) + .add(new Object[] {"first|*second|*|*third*"}) + .add(new Object[] {"first|*second|*|*third|*"}) + .add(new Object[] {"|first|*second|*|*third"}) + .add(new Object[] {"|first|*second|*|*third|"}) + .add(new Object[] {"|first|*second|*|*third*"}) + .add(new Object[] {"|first|*second|*|*third|*"}) + .add(new Object[] {"*first|*second|*|*third"}) + .add(new Object[] {"*first|*second|*|*third|"}) + .add(new Object[] {"*first|*second|*|*third*"}) + .add(new Object[] {"*first|*second|*|*third|*"}) + .add(new Object[] {"|*first|*second|*|*third"}) + .add(new Object[] {"|*first|*second|*|*third|"}) + .add(new Object[] {"|*first|*second|*|*third*"}) + .add(new Object[] {"|*first|*second|*|*third|*"}) .build(); } @@ -548,14 +528,14 @@ public static Iterable data() { @Test public void testReadLinesWithCustomDelimiter() throws Exception { SourceTestUtils.assertSplitAtFractionExhaustive( - TextIOReadTest.prepareSource(tempFolder, testCase.getBytes(UTF_8), new byte[]{'|', '*'}), + TextIOReadTest.prepareSource(tempFolder, testCase.getBytes(UTF_8), new byte[] {'|', '*'}), PipelineOptionsFactory.create()); } @Test public void testReadLinesWithCustomDelimiterAndZeroAndOneLengthReturningChannel() throws Exception { - byte[] delimiter = new byte[]{'|', '*'}; + byte[] delimiter = new byte[] {'|', '*'}; Path path = tempFolder.newFile().toPath(); Files.write(path, testCase.getBytes(UTF_8)); Metadata metadata = FileSystems.matchSingleFileSpec(path.toString()); @@ -605,16 +585,12 @@ public void close() throws IOException { } } - /** - * Tests for some basic operations in {@link TextIO.Read}. - */ + /** Tests for some basic operations in {@link TextIO.Read}. */ @RunWith(JUnit4.class) public static class BasicIOTest { - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - @Rule - public TestPipeline p = TestPipeline.create(); + @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); + @Rule public TestPipeline p = TestPipeline.create(); private void runTestRead(String[] expected) throws Exception { File tmpFile = tempFolder.newFile(); @@ -637,26 +613,26 @@ private void runTestRead(String[] expected) throws Exception { @Test public void testDelimiterSelfOverlaps() { - assertFalse(TextIO.Read.isSelfOverlapping(new byte[]{'a', 'b', 'c'})); - assertFalse(TextIO.Read.isSelfOverlapping(new byte[]{'c', 'a', 'b', 'd', 'a', 'b'})); - assertFalse(TextIO.Read.isSelfOverlapping(new byte[]{'a', 'b', 'c', 'a', 'b', 'd'})); - assertTrue(TextIO.Read.isSelfOverlapping(new byte[]{'a', 'b', 'a'})); - assertTrue(TextIO.Read.isSelfOverlapping(new byte[]{'a', 'b', 'c', 'a', 'b'})); + assertFalse(TextIO.Read.isSelfOverlapping(new byte[] {'a', 'b', 'c'})); + assertFalse(TextIO.Read.isSelfOverlapping(new byte[] {'c', 'a', 'b', 'd', 'a', 'b'})); + assertFalse(TextIO.Read.isSelfOverlapping(new byte[] {'a', 'b', 'c', 'a', 'b', 'd'})); + assertTrue(TextIO.Read.isSelfOverlapping(new byte[] {'a', 'b', 'a'})); + assertTrue(TextIO.Read.isSelfOverlapping(new byte[] {'a', 'b', 'c', 'a', 'b'})); } @Test @Category(NeedsRunner.class) public void testReadStringsWithCustomDelimiter() throws Exception { final String[] inputStrings = - new String[]{ - // incomplete delimiter - "To be, or not to be: that |is the question: ", - // incomplete delimiter - "To be, or not to be: that *is the question: ", - // complete delimiter - "Whether 'tis nobler in the mind to suffer |*", - // truncated delimiter - "The slings and arrows of outrageous fortune,|" + new String[] { + // incomplete delimiter + "To be, or not to be: that |is the question: ", + // incomplete delimiter + "To be, or not to be: that *is the question: ", + // complete delimiter + "Whether 'tis nobler in the mind to suffer |*", + // truncated delimiter + "The slings and arrows of outrageous fortune,|" }; File tmpFile = tempFolder.newFile("tmpfile.txt"); @@ -666,7 +642,7 @@ public void testReadStringsWithCustomDelimiter() throws Exception { writer.write(Joiner.on("").join(inputStrings)); } - PAssert.that(p.apply(TextIO.read().from(filename).withDelimiter(new byte[]{'|', '*'}))) + PAssert.that(p.apply(TextIO.read().from(filename).withDelimiter(new byte[] {'|', '*'}))) .containsInAnyOrder( "To be, or not to be: that |is the question: To be, or not to be: " + "that *is the question: Whether 'tis nobler in the mind to suffer ", @@ -707,9 +683,7 @@ public void testReadDisplayData() { assertThat(displayData, hasDisplayItem("compressionType", BZIP2.toString())); } - /** - * Options for testing. - */ + /** Options for testing. */ public interface RuntimeTestOptions extends PipelineOptions { ValueProvider getInput(); @@ -783,9 +757,9 @@ public void testZipCompressedReadWithNoEntries() throws Exception { @Test @Category(NeedsRunner.class) public void testZipCompressedReadWithMultiEntriesFile() throws Exception { - String[] entry0 = new String[]{"first", "second", "three"}; - String[] entry1 = new String[]{"four", "five", "six"}; - String[] entry2 = new String[]{"seven", "eight", "nine"}; + String[] entry0 = new String[] {"first", "second", "three"}; + String[] entry1 = new String[] {"four", "five", "six"}; + String[] entry2 = new String[] {"seven", "eight", "nine"}; List expected = new ArrayList<>(); @@ -806,10 +780,10 @@ public void testZipCompressedReadWithComplexEmptyAndPresentEntries() throws Exce new ArrayList<>(), tempFolder, "complex empty and present entries", - new String[]{"cat"}, - new String[]{}, - new String[]{}, - new String[]{"dog"}); + new String[] {"cat"}, + new String[] {}, + new String[] {}, + new String[] {"dog"}); assertReadingCompressedFileMatchesExpected(file, ZIP, Arrays.asList("cat", "dog"), p); p.run(); @@ -1056,7 +1030,7 @@ public void testReadFilesWithFilename() throws IOException { new TextSource( ValueProvider.StaticValueProvider.of(input), EmptyMatchTreatment.DISALLOW, - new byte[]{'\n'}); + new byte[] {'\n'}); PCollection> lines = p.apply( @@ -1111,14 +1085,11 @@ public void testReadWatchForNewFiles() throws IOException, InterruptedException } } - /** - * Tests for TextSource class. - */ + /** Tests for TextSource class. */ @RunWith(JUnit4.class) public static class TextSourceTest { - @Rule - public transient TestPipeline pipeline = TestPipeline.create(); + @Rule public transient TestPipeline pipeline = TestPipeline.create(); @Test @Category(NeedsRunner.class) @@ -1201,14 +1172,11 @@ public void processElement(ProcessContext c) { } } - /** - * A transform that reads CSV file records. - */ + /** A transform that reads CSV file records. */ private static class TextFileReadTransform extends PTransform, PCollection> { - public TextFileReadTransform() { - } + public TextFileReadTransform() {} @Override public PCollection expand(PCollection files) { From 6d2f1b18ae36e9b39ba959f5161d765edeab6139 Mon Sep 17 00:00:00 2001 From: Hyungrok Ham Date: Tue, 19 Sep 2023 00:31:52 +0900 Subject: [PATCH 4/4] fix: add checkArgument --- sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java | 2 ++ .../src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java | 1 - 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 44bf1a9438d3..9e6172daba9f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -402,6 +402,8 @@ public Read withEmptyMatchTreatment(EmptyMatchTreatment treatment) { *

This disables split file reading and may cause performance degradation. */ public Read withSkipHeaderLines(int skipHeaderLines) { + checkArgument( + skipHeaderLines > 0, "skipHeaderLines should be > 0, but was %s", skipHeaderLines); return toBuilder().setSkipHeaderLines(skipHeaderLines).build(); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java index 24798e839342..935362517747 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java @@ -356,7 +356,6 @@ public static class ReadWithSkipHeaderLinesTest { @Parameterized.Parameters(name = "{index}: {1}") public static Iterable data() { return ImmutableList.builder() - .add(new Object[] {TINY, 0, TINY.subList(0, TINY.size())}) .add(new Object[] {TINY, 1, TINY.subList(1, TINY.size())}) .add(new Object[] {TINY, 2, TINY.subList(2, TINY.size())}) .add(new Object[] {TINY, 3, TINY.subList(3, TINY.size())})