From eb971843600596703de4440492156349c1932155 Mon Sep 17 00:00:00 2001 From: Arjun Date: Wed, 18 Oct 2023 00:14:04 -0700 Subject: [PATCH] add dataset root in some common type of datasets --- .../data/management/copy/RecursiveCopyableDataset.java | 5 +++++ .../data/management/copy/iceberg/IcebergDataset.java | 9 +++++++++ .../data/management/copy/iceberg/IcebergTable.java | 6 ++---- 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java index 1a7e0d069ff..6b181011a88 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java @@ -237,4 +237,9 @@ private static boolean sameFile(FileStatus fileInSource, FileStatus fileInTarget return fileInTarget.getLen() == fileInSource.getLen() && fileInSource.getModificationTime() <= fileInTarget .getModificationTime(); } + + @Override + public String getDatasetPath() { + return Path.getPathWithoutSchemeAndAuthority(this.rootPath).toString(); + } } diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java index 05f1e265d2a..a59fc368812 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java @@ -89,6 +89,15 @@ public String datasetURN() { return this.getFileSetId(); } + @Override + public String getDatasetPath() { + try { + return this.destIcebergTable.accessTableMetadata().location(); + } catch (IcebergTable.TableNotFoundException e) { + throw new RuntimeException(e); + } + } + /** * Finds all files read by the table and generates CopyableFiles. * For the specific semantics see {@link #createFileSets}. diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java index 529f53a45e1..4fe1840fd00 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java @@ -178,11 +178,8 @@ protected static IcebergSnapshotInfo.ManifestFileInfo calcManifestFileInfo(Manif } protected static List discoverDataFilePaths(ManifestFile manifest, FileIO io) throws IOException { - CloseableIterable manifestPathsIterable = ManifestFiles.readPaths(manifest, io); - try { + try (CloseableIterable manifestPathsIterable = ManifestFiles.readPaths(manifest, io)) { return Lists.newArrayList(manifestPathsIterable); - } finally { - manifestPathsIterable.close(); } } protected DatasetDescriptor getDatasetDescriptor(FileSystem fs) { @@ -194,6 +191,7 @@ protected DatasetDescriptor getDatasetDescriptor(FileSystem fs) { descriptor.addMetadata(DatasetConstants.FS_URI, fs.getUri().toString()); return descriptor; } + /** Registers {@link IcebergTable} after publishing data. * @param dstMetadata is null if destination {@link IcebergTable} is absent, in which case registration is skipped */ protected void registerIcebergTable(TableMetadata srcMetadata, TableMetadata dstMetadata) {