Skip to content

Commit

Permalink
Deal with trailing slash in tempRoot (apache#29478)
Browse files Browse the repository at this point in the history
* Deal with trailing slash in tempRoot

* Add FileSystems.matchNewDirectory and replace the raw string concatenation throughout the code base

* Fix test assert

* Use File.separator per suggestion
  • Loading branch information
Abacn authored Dec 12, 2023
1 parent 475a0c7 commit efe7e6a
Show file tree
Hide file tree
Showing 15 changed files with 88 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.api.services.dataflow.model.JobMetrics;
import com.google.api.services.dataflow.model.MetricUpdate;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.List;
Expand All @@ -33,12 +34,12 @@
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand Down Expand Up @@ -74,8 +75,15 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
public static TestDataflowRunner fromOptions(PipelineOptions options) {
TestDataflowPipelineOptions dataflowOptions = options.as(TestDataflowPipelineOptions.class);
String tempLocation =
Joiner.on("/")
.join(dataflowOptions.getTempRoot(), dataflowOptions.getJobName(), "output", "results");
FileSystems.matchNewDirectory(
dataflowOptions.getTempRoot(), dataflowOptions.getJobName(), "output", "results")
.toString();
// keep the exact same behavior as before matchNewDirectory was introduced
if (tempLocation.endsWith("/")) {
tempLocation = tempLocation.substring(0, tempLocation.length() - 1);
} else if (tempLocation.endsWith(File.separator)) {
tempLocation = tempLocation.substring(0, tempLocation.length() - File.separator.length());
}
dataflowOptions.setTempLocation(tempLocation);

return new TestDataflowRunner(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.beam.sdk.io.fs.MatchResult.Status;
import org.apache.beam.sdk.io.fs.MoveOptions;
import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.common.ReflectHelpers;
Expand Down Expand Up @@ -603,4 +604,20 @@ public static ResourceId matchNewResource(String singleResourceSpec, boolean isD
return getFileSystemInternal(parseScheme(singleResourceSpec))
.matchNewResource(singleResourceSpec, isDirectory);
}

/**
 * Returns a new {@link ResourceId} that represents the named directory resource.
 *
 * @param singleResourceSpec the root directory, for example "/abc"
 * @param baseNames the subdirectory names to resolve in order, for example ["d", "e", "f"]
 * @return the {@link ResourceId} of the resolved directory. For the example above, it corresponds
 *     to "/abc/d/e/f".
 */
public static ResourceId matchNewDirectory(String singleResourceSpec, String... baseNames) {
  // Start at the root directory, then resolve each name as a nested directory.
  ResourceId result = matchNewResource(singleResourceSpec, true /* isDirectory */);
  for (int i = 0; i < baseNames.length; ++i) {
    result = result.resolve(baseNames[i], StandardResolveOptions.RESOLVE_DIRECTORY);
  }
  return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
Expand Down Expand Up @@ -316,6 +317,25 @@ public void testInvalidSchemaMatchNewResource() {
assertEquals("file", FileSystems.matchNewResource("c:/tmp/f1", false));
}

@Test
public void testMatchNewDirectory() {
  // Each assertion compares the directory resolved via matchNewDirectory against the
  // ResourceId obtained by matching the fully spelled-out directory spec directly.
  assertEquals(
      FileSystems.matchNewResource("/abc/d/", true),
      FileSystems.matchNewDirectory("/abc", "d"));
  // A trailing slash on the root spec must not produce a duplicated separator.
  assertEquals(
      FileSystems.matchNewResource("/abc/d/", true),
      FileSystems.matchNewDirectory("/abc/", "d"));
  // A trailing slash on a base name is likewise tolerated.
  assertEquals(
      FileSystems.matchNewResource("/abc/d/", true),
      FileSystems.matchNewDirectory("/abc", "d/"));
  // Multiple base names resolve as successive nested directories.
  assertEquals(
      FileSystems.matchNewResource("/abc/d/e/f/", true),
      FileSystems.matchNewDirectory("/abc", "d", "e", "f"));
  // With no base names, the result is just the root directory itself.
  assertEquals(
      FileSystems.matchNewResource("/abc/", true),
      FileSystems.matchNewDirectory("/abc"));
}

private static List<ResourceId> toResourceIds(List<Path> paths, final boolean isDirectory) {
return FluentIterable.from(paths)
.transform(path -> (ResourceId) LocalResourceId.fromPath(path, isDirectory))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.testing.UsesKms;
Expand Down Expand Up @@ -53,7 +54,8 @@ public void testRewriteMultiPart() throws IOException {
// Using a KMS key is necessary to trigger multi-part rewrites (bucket is created
// with a bucket default key).
assertNotNull(options.getTempRoot());
options.setTempLocation(options.getTempRoot() + "/testRewriteMultiPart");
options.setTempLocation(
FileSystems.matchNewDirectory(options.getTempRoot(), "testRewriteMultiPart").toString());

GcsOptions gcsOptions = options.as(GcsOptions.class);
GcsUtil gcsUtil = gcsOptions.getGcsUtil();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
Expand Down Expand Up @@ -68,7 +69,8 @@ public class BigQueryClusteringIT {
public void setUp() {
PipelineOptionsFactory.register(BigQueryClusteringITOptions.class);
options = TestPipeline.testingPipelineOptions().as(BigQueryClusteringITOptions.class);
options.setTempLocation(options.getTempRoot() + "/temp-it/");
options.setTempLocation(
FileSystems.matchNewDirectory(options.getTempRoot(), "temp-it").toString());
bqClient = BigqueryClient.getNewBigqueryClient(options.getAppName());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
Expand Down Expand Up @@ -70,7 +71,8 @@ public class BigQueryIOJsonIT {

static {
TestPipelineOptions opt = TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class);
testOptions.setTempLocation(opt.getTempRoot() + "/java-tmp");
testOptions.setTempLocation(
FileSystems.matchNewDirectory(opt.getTempRoot(), "java-tmp").toString());
}

@Rule public final transient TestPipeline p = TestPipeline.fromOptions(testOptions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
Expand Down Expand Up @@ -71,7 +72,8 @@ private void setupTestEnvironment(String recordSize) {
PipelineOptionsFactory.register(BigQueryIOReadOptions.class);
options = TestPipeline.testingPipelineOptions().as(BigQueryIOReadOptions.class);
options.setNumRecords(numOfRecords.get(recordSize));
options.setTempLocation(options.getTempRoot() + "/temp-it/");
options.setTempLocation(
FileSystems.matchNewDirectory(options.getTempRoot(), "temp-it").toString());
project = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
options.setInputTable(project + ":" + datasetId + "." + tablePrefix + recordSize);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Set;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TableRowParser;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.options.Description;
Expand Down Expand Up @@ -88,7 +89,8 @@ private void setUpTestEnvironment(String tableName) {
options = TestPipeline.testingPipelineOptions().as(BigQueryIOStorageReadTableRowOptions.class);
String project = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
options.setInputTable(project + ":" + DATASET_ID + "." + TABLE_PREFIX + tableName);
options.setTempLocation(options.getTempRoot() + "/temp-it/");
options.setTempLocation(
FileSystems.matchNewDirectory(options.getTempRoot(), "temp-it").toString());
}

private static void runPipeline(BigQueryIOStorageReadTableRowOptions pipelineOptions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.security.SecureRandom;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
import org.apache.beam.sdk.testing.TestPipeline;
Expand Down Expand Up @@ -87,7 +88,8 @@ private void testQueryAndWrite(Method method) throws Exception {
String outputTableId = "testQueryAndWrite_" + method.name();
String outputTableSpec = project + ":" + BIG_QUERY_DATASET_ID + "." + outputTableId;

options.setTempLocation(options.getTempRoot() + "/bq_it_temp");
options.setTempLocation(
FileSystems.matchNewDirectory(options.getTempRoot(), "bq_it_temp").toString());
Pipeline p = Pipeline.create(options);
// Reading triggers BQ query and extract jobs. Writing triggers either a load job or performs a
// streaming insert (depending on method).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
Expand Down Expand Up @@ -63,7 +64,8 @@ public void testNestedRecords() throws Exception {
TestPipelineOptions testOptions =
TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class);
Options options = testOptions.as(Options.class);
options.setTempLocation(testOptions.getTempRoot() + "/temp-it/");
options.setTempLocation(
FileSystems.matchNewDirectory(testOptions.getTempRoot(), "temp-it").toString());
runPipeline(options);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.SchemaUpdateOption;
Expand Down Expand Up @@ -150,7 +151,8 @@ private void runWriteTest(
List<List<String>> expectedResult)
throws Exception {
Options options = TestPipeline.testingPipelineOptions().as(Options.class);
options.setTempLocation(options.getTempRoot() + "/bq_it_temp");
options.setTempLocation(
FileSystems.matchNewDirectory(options.getTempRoot(), "bq_it_temp").toString());

Pipeline p = Pipeline.create(options);
Create.Values<TableRow> input = Create.<TableRow>of(rowToInsert);
Expand Down Expand Up @@ -264,7 +266,8 @@ public void runWriteTestTempTableAndDynamicDestination() throws Exception {
}

Options options = TestPipeline.testingPipelineOptions().as(Options.class);
options.setTempLocation(options.getTempRoot() + "/bq_it_temp");
options.setTempLocation(
FileSystems.matchNewDirectory(options.getTempRoot(), "bq_it_temp").toString());

Pipeline p = Pipeline.create(options);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
Expand Down Expand Up @@ -90,7 +91,8 @@ public static void setupTestEnvironment() throws Exception {
public void setUp() {
PipelineOptionsFactory.register(BigQueryClusteringITOptions.class);
options = TestPipeline.testingPipelineOptions().as(BigQueryClusteringITOptions.class);
options.setTempLocation(options.getTempRoot() + "/temp-it/");
options.setTempLocation(
FileSystems.matchNewDirectory(options.getTempRoot(), "temp-it").toString());
bqClient = BigqueryClient.getNewBigqueryClient(options.getAppName());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.extensions.gcp.util.BackOffAdapter;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
Expand Down Expand Up @@ -118,7 +119,8 @@ private void runBigQueryToTablePipeline(BigQueryToTableOptions options) {
private BigQueryToTableOptions setupLegacyQueryTest(String outputTable) {
BigQueryToTableOptions options =
TestPipeline.testingPipelineOptions().as(BigQueryToTableOptions.class);
options.setTempLocation(options.getTempRoot() + "/bq_it_temp");
options.setTempLocation(
FileSystems.matchNewDirectory(options.getTempRoot(), "bq_it_temp").toString());
options.setQuery("SELECT * FROM (SELECT \"apple\" as fruit), (SELECT \"orange\" as fruit),");
options.setOutput(outputTable);
options.setOutputSchema(BigQueryToTableIT.LEGACY_QUERY_TABLE_SCHEMA);
Expand All @@ -128,7 +130,8 @@ private BigQueryToTableOptions setupLegacyQueryTest(String outputTable) {
private BigQueryToTableOptions setupNewTypesQueryTest(String outputTable) {
BigQueryToTableOptions options =
TestPipeline.testingPipelineOptions().as(BigQueryToTableOptions.class);
options.setTempLocation(options.getTempRoot() + "/bq_it_temp");
options.setTempLocation(
FileSystems.matchNewDirectory(options.getTempRoot(), "bq_it_temp").toString());
options.setQuery(
String.format(
"SELECT bytes, date, time FROM [%s:%s.%s]",
Expand All @@ -140,7 +143,8 @@ private BigQueryToTableOptions setupNewTypesQueryTest(String outputTable) {

private BigQueryToTableOptions setupStandardQueryTest(String outputTable) {
BigQueryToTableOptions options = this.setupLegacyQueryTest(outputTable);
options.setTempLocation(options.getTempRoot() + "/bq_it_temp");
options.setTempLocation(
FileSystems.matchNewDirectory(options.getTempRoot(), "bq_it_temp").toString());
options.setQuery(
"SELECT * FROM (SELECT \"apple\" as fruit) UNION ALL (SELECT \"orange\" as fruit)");
options.setUsingStandardSql(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ public void testGcsWriteWithKmsKey() {
TestPipelineOptions options =
TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class);
assertNotNull(options.getTempRoot());
options.setTempLocation(options.getTempRoot() + "/testGcsWriteWithKmsKey");
options.setTempLocation(
FileSystems.matchNewDirectory(options.getTempRoot(), "testGcsWriteWithKmsKey").toString());
GcsOptions gcsOptions = options.as(GcsOptions.class);

ResourceId filenamePrefix =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ public Void apply(Iterable<Metadata> input) {
public static void setUp() throws Exception {
options = TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class);
assertNotNull(options.getTempRoot());
options.setTempLocation(options.getTempRoot() + "/GcsMatchIT");
options.setTempLocation(
FileSystems.matchNewDirectory(options.getTempRoot(), "GcsMatchIT").toString());
GcsOptions gcsOptions = options.as(GcsOptions.class);
String dstFolderName =
gcsOptions.getGcpTempLocation()
Expand Down

0 comments on commit efe7e6a

Please sign in to comment.