Report File Lineage on directory #32662

Merged · 4 commits · Oct 8, 2024
@@ -687,11 +687,25 @@ protected final List<KV<FileResult<DestinationT>, ResourceId>> finalizeDestination
distinctFilenames.get(finalFilename));
distinctFilenames.put(finalFilename, result);
outputFilenames.add(KV.of(result, finalFilename));
- FileSystems.reportSinkLineage(finalFilename);
}
+ reportSinkLineage(outputFilenames);
return outputFilenames;
}

+ /**
+  * Report sink Lineage. Report every file if the number of files is no more than 100;
+  * otherwise report only at the directory level.
+  */
+ private void reportSinkLineage(List<KV<FileResult<DestinationT>, ResourceId>> outputFilenames) {
+   if (outputFilenames.size() <= 100) {
Contributor:
Any specific reason to pick 100? If it is just a good average number then that is fine too.

Contributor (Author):
I admit it's a hardcoded number, which is generally not ideal. The consideration here is that I want to avoid a functionality regression (i.e. reporting one level above) from 2.59.0 for use cases with a small number of files, while we have to roll up in general to mitigate the metrics size limit.

Speaking about the number: finalizeDestination is called in two code paths.

  • Without dynamic destinations, finalizeDestination is called once, on a single worker.

  • With dynamic destinations, finalizeDestination is called by finalizeAllDestination, per destination.

In the first case it is indeed generally safe to set a higher number (though if the file paths are very long, it will still trigger the set size truncation of #32650).

In the second case, when there are lots of destinations, it may still trigger #32650 even though a single destination only reports the full list of files when its shard count is <= 100.

That being said, no choice of this number can prevent #32650 from being triggered in all use cases, but I think it is good enough to avoid the functionality regression for the relevant use cases (small shard counts, where the whole path is most useful).

Contributor:
Picking a good estimated number sounds totally reasonable. Thank you. For completeness of discussion:

> The consideration here is that I want to avoid a functionality regression (i.e. reporting one level above) from 2.59.0

Since lineage is not a launched feature, we do not need to worry about regression. For jobs running on the 2.59.0 SDK that currently emit full paths, switching to reporting some aggregate is fine, because these metrics are not consumed by anyone as of now. The Dataflow service is not reporting them as lineage, and cannot until the job is restarted with the lineage-enabled flag.

100 sounds ok to me for now.

In general I am wondering if we can make it configurable, i.e. customers who use FileIOs could specify at what n we should roll up to report at a higher level: if the number of files is >= n, report the dir; if the number of dirs is >= n, report the bucket. We do not need to do this now, but we can in future releases with more consideration.

+     for (KV<FileResult<DestinationT>, ResourceId> kv : outputFilenames) {
+       FileSystems.reportSinkLineage(kv.getValue());
+     }
+   } else {
+     FileSystems.reportSinkLineage(outputFilenames.get(0).getValue().getCurrentDirectory());
+   }
+ }
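A minimal sketch of the configurable rollup suggested in the thread above (hypothetical, not part of this PR): the method name and the rollupThreshold parameter are made up here, the code assumes the surrounding FileBasedSink context, and it needs java.util.Set and java.util.HashSet imports.

  // Hypothetical sketch only: a user-facing option would supply rollupThreshold,
  // with the hardcoded 100 above as its default.
  private void reportSinkLineageConfigurable(
      List<KV<FileResult<DestinationT>, ResourceId>> outputFilenames, int rollupThreshold) {
    if (outputFilenames.size() <= rollupThreshold) {
      // Few files: report each one in full, as the merged code does.
      for (KV<FileResult<DestinationT>, ResourceId> kv : outputFilenames) {
        FileSystems.reportSinkLineage(kv.getValue());
      }
      return;
    }
    // Many files: roll up to the distinct parent directories.
    Set<ResourceId> directories = new HashSet<>();
    for (KV<FileResult<DestinationT>, ResourceId> kv : outputFilenames) {
      directories.add(kv.getValue().getCurrentDirectory());
    }
    // Rolling up further (directory -> bucket when directories.size() > rollupThreshold)
    // would need a parent-directory helper; ResourceId#getCurrentDirectory returns the
    // directory itself for a directory resource, so it cannot walk upward.
    for (ResourceId directory : directories) {
      FileSystems.reportSinkLineage(directory);
    }
  }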

private Collection<FileResult<DestinationT>> createMissingEmptyShards(
@Nullable DestinationT dest,
@Nullable Integer numShards,
@@ -297,9 +297,10 @@ public final List<? extends FileBasedSource<T>> split(
System.currentTimeMillis() - startTime,
expandedFiles.size(),
splitResults.size());

+ reportSourceLineage(expandedFiles);
return splitResults;
} else {
FileSystems.reportSourceLineage(getSingleFileMetadata().resourceId());
if (isSplittable()) {
@SuppressWarnings("unchecked")
List<FileBasedSource<T>> splits =
@@ -315,6 +316,22 @@
}
}

+ /** Report source Lineage. Depending on the number of files, report the full file names or only the directory. */
+ private void reportSourceLineage(List<Metadata> expandedFiles) {
+   if (expandedFiles.size() <= 100) {
+     for (Metadata metadata : expandedFiles) {
+       FileSystems.reportSourceLineage(metadata.resourceId());
+     }
+   } else {
+     for (Metadata metadata : expandedFiles) {
+       // TODO(yathu) Currently this simply reports one level up when the number of files
+       // exceeds 100. Consider a more dedicated strategy (e.g. resolve the common ancestor)
+       // for accuracy, and work with the metrics size limit.
+       FileSystems.reportSourceLineage(metadata.resourceId().getCurrentDirectory());
Contributor:
Is it possible that the number of one-level-up dirs is above 100?

Say, for example,
a/b/**/*.avro
will match all avro files under a/b/ at any level:
a/b/1/1.avro
a/b/2/2.avro
..
a/b/100/100.avro
a/b/101/101.avro

will lead to more than 100 dirs.

Contributor (Author):
Yes, it is absolutely possible; then it relies on #32650 to guard the string set size.

Contributor:
Should we add the <= check here too, and if it is exceeded, roll up one level and see if we can find common ancestors in the dir paths such that the number of paths reported stays below 100, recursively checking until we reach the root of the shortest path in the list? (Just a suggestion; keeping it as is is also fine.)

Contributor (Author) (@Abacn, Oct 7, 2024):
See https://github.com/apache/beam/pull/32662/files#r1790385413: in readAll we do not know the number of files in advance, so the "<= check" is not applicable there. (Sorry, that applies to ReadAll; this thread is for the source.)

Yes, one can handle the reporting smarter here, and in general make this reportSourceLineage more accurate, since we have the full list of files available. Added a TODO.

+     }
+   }
+ }
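For reference, the "resolve common ancestor" strategy mentioned in the TODO could look roughly like the sketch below. This is hypothetical and not part of the PR: it works on plain path strings for brevity (real code would stay in terms of ResourceId) and assumes a non-empty input list.

  /** Hypothetical sketch of the "resolve common ancestor" rollup from the TODO above. */
  final class CommonAncestorExample {
    /**
     * Returns the deepest directory prefix shared by all paths, e.g.
     * ["a/b/1/1.avro", "a/b/2/2.avro"] yields "a/b/"; returns "" when there is none.
     */
    static String commonAncestor(java.util.List<String> paths) {
      String prefix = paths.get(0);
      for (String path : paths) {
        // Shrink the candidate until it is a prefix of the current path; the -2 skips
        // a trailing '/' so each step drops one whole path segment.
        while (!path.startsWith(prefix)) {
          int slash = prefix.lastIndexOf('/', prefix.length() - 2);
          if (slash < 0) {
            return ""; // No common directory at all.
          }
          prefix = prefix.substring(0, slash + 1);
        }
      }
      // Cut back to a directory boundary so "a/b/foo" vs "a/b/foobar" yields "a/b/"
      // rather than the partial segment "a/b/foo".
      return prefix.substring(0, prefix.lastIndexOf('/') + 1);
    }
  }

Reporting the common ancestor once, rather than one directory per file, would keep the reported set small even when the files span more than 100 directories.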

/**
* Determines whether a file represented by this source can be split into bundles.
*
@@ -88,6 +88,7 @@ public SplitIntoRangesFn(long desiredBundleSizeBytes) {
@ProcessElement
public void process(ProcessContext c) {
MatchResult.Metadata metadata = c.element().getMetadata();
+ FileSystems.reportSourceLineage(metadata.resourceId().getCurrentDirectory());
Contributor:
This can also hit the 100-or-more dir limit, like the file source above. But if this is best-effort reporting then that is fine; when combined with the limit on lineage (#32650 (comment)) it will be ok. But in that case, should we leave it at file-level reporting itself?

Contributor (Author):
This is for ReadAll, and the challenge here is that we do not know how many files there are going to be; it can actually be used in a streaming pipeline where file metadata comes in over time. So we have no hint whether reporting one level above or the full file path is preferred (unlike the other places).

  • Pro of reporting one level above: the result is still accurate (no entry dropped) when the upstream filepattern is matching, or watching, a directory (which is a common use case).

  • Con of reporting one level above: less granularity, obviously.

  • Pro of reporting the whole path: more granularity.

  • Con of reporting the whole path: more likely to hit the metrics limit and report an incomplete list of files.

I'm open to both. Personally I prefer the former, for the same reason as the concern in the comments of #32650: given the limitation of metrics size, I consider the preference to be

full information > info with less granularity > has to truncate and report incomplete results

Contributor:
Makes sense. +1 to what you have currently, then. Thank you.

if (!metadata.isReadSeekEfficient()) {
c.output(KV.of(c.element(), new OffsetRange(0, metadata.sizeBytes())));
return;
@@ -140,7 +141,6 @@ public void process(ProcessContext c) throws IOException {
throw e;
}
}
- FileSystems.reportSourceLineage(resourceId);
}
}
}
@@ -219,7 +219,12 @@ protected String getScheme() {
protected void reportLineage(GcsResourceId resourceId, Lineage lineage) {
GcsPath path = resourceId.getGcsPath();
if (!path.getBucket().isEmpty()) {
lineage.add("gcs", ImmutableList.of(path.getBucket(), path.getObject()));
+   ImmutableList.Builder<String> segments =
+       ImmutableList.<String>builder().add(path.getBucket());
+   if (!path.getObject().isEmpty()) {
+     segments.add(path.getObject());
+   }
+   lineage.add("gcs", segments.build());
} else {
LOG.warn("Report Lineage on relative path {} is unsupported", path.getObject());
}
@@ -23,6 +23,9 @@
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isNull;
+ import static org.mockito.Mockito.mock;
+ import static org.mockito.Mockito.times;
+ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.api.services.storage.model.Objects;
@@ -38,6 +41,7 @@
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MatchResult.Status;
+ import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
@@ -235,6 +239,20 @@ public void testMatchNonGlobs() throws Exception {
contains(toFilenames(matchResults.get(4)).toArray()));
}

+ @Test
+ public void testReportLineageOnBucket() {
+   verifyLineage("gs://testbucket", ImmutableList.of("testbucket"));
+   verifyLineage("gs://testbucket/", ImmutableList.of("testbucket"));
+   verifyLineage("gs://testbucket/foo/bar.txt", ImmutableList.of("testbucket", "foo/bar.txt"));
+ }
+
+ private void verifyLineage(String uri, List<String> expected) {
+   GcsResourceId path = GcsResourceId.fromGcsPath(GcsPath.fromUri(uri));
+   Lineage mockLineage = mock(Lineage.class);
+   gcsFileSystem.reportLineage(path, mockLineage);
+   verify(mockLineage, times(1)).add("gcs", expected);
+ }

private StorageObject createStorageObject(String gcsFilename, long fileSize) {
GcsPath gcsPath = GcsPath.fromUri(gcsFilename);
// Google APIs will use null for empty files.
@@ -627,7 +627,12 @@ protected S3ResourceId matchNewResource(String singleResourceSpec, boolean isDirectory)

@Override
protected void reportLineage(S3ResourceId resourceId, Lineage lineage) {
lineage.add("s3", ImmutableList.of(resourceId.getBucket(), resourceId.getKey()));
+   ImmutableList.Builder<String> segments =
+       ImmutableList.<String>builder().add(resourceId.getBucket());
+   if (!resourceId.getKey().isEmpty()) {
+     segments.add(resourceId.getKey());
+   }
+   lineage.add("s3", segments.build());
}

/**
@@ -34,6 +34,7 @@
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.notNull;
+ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -74,6 +75,7 @@
import org.apache.beam.sdk.io.aws.options.S3Options;
import org.apache.beam.sdk.io.fs.CreateOptions;
import org.apache.beam.sdk.io.fs.MatchResult;
+ import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -1209,6 +1211,21 @@ public void testWriteAndReadWithS3Options() throws IOException {
open.close();
}

+ @Test
+ public void testReportLineageOnBucket() {
+   verifyLineage("s3://testbucket", ImmutableList.of("testbucket"));
+   verifyLineage("s3://testbucket/", ImmutableList.of("testbucket"));
+   verifyLineage("s3://testbucket/foo/bar.txt", ImmutableList.of("testbucket", "foo/bar.txt"));
+ }
+
+ private void verifyLineage(String uri, List<String> expected) {
+   S3FileSystem s3FileSystem = buildMockedS3FileSystem(s3Config("mys3"), client);
+   S3ResourceId path = S3ResourceId.fromUri(uri);
+   Lineage mockLineage = mock(Lineage.class);
+   s3FileSystem.reportLineage(path, mockLineage);
+   verify(mockLineage, times(1)).add("s3", expected);
+ }

/** A mockito argument matcher to implement equality on GetObjectMetadataRequest. */
private static class GetObjectMetadataRequestMatcher
implements ArgumentMatcher<GetObjectMetadataRequest> {
@@ -658,7 +658,12 @@ protected S3ResourceId matchNewResource(String singleResourceSpec, boolean isDirectory)

@Override
protected void reportLineage(S3ResourceId resourceId, Lineage lineage) {
lineage.add("s3", ImmutableList.of(resourceId.getBucket(), resourceId.getKey()));
+   ImmutableList.Builder<String> segments =
+       ImmutableList.<String>builder().add(resourceId.getBucket());
+   if (!resourceId.getKey().isEmpty()) {
+     segments.add(resourceId.getKey());
+   }
+   lineage.add("s3", segments.build());
}

/**
@@ -34,6 +34,7 @@
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.notNull;
+ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -55,6 +56,7 @@
import org.apache.beam.sdk.io.aws2.options.S3Options;
import org.apache.beam.sdk.io.fs.CreateOptions;
import org.apache.beam.sdk.io.fs.MatchResult;
+ import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -1068,6 +1070,21 @@ public void testWriteAndRead() throws IOException {
open.close();
}

+ @Test
+ public void testReportLineageOnBucket() {
+   verifyLineage("s3://testbucket", ImmutableList.of("testbucket"));
+   verifyLineage("s3://testbucket/", ImmutableList.of("testbucket"));
+   verifyLineage("s3://testbucket/foo/bar.txt", ImmutableList.of("testbucket", "foo/bar.txt"));
+ }
+
+ private void verifyLineage(String uri, List<String> expected) {
+   S3FileSystem s3FileSystem = buildMockedS3FileSystem(s3Config("mys3"), client);
+   S3ResourceId path = S3ResourceId.fromUri(uri);
+   Lineage mockLineage = mock(Lineage.class);
+   s3FileSystem.reportLineage(path, mockLineage);
+   verify(mockLineage, times(1)).add("s3", expected);
+ }

/** A mockito argument matcher to implement equality on GetHeadObjectRequest. */
private static class GetHeadObjectRequestMatcher implements ArgumentMatcher<HeadObjectRequest> {

@@ -25,6 +25,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
+ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -51,6 +52,7 @@
import org.apache.beam.sdk.io.azure.options.BlobstoreOptions;
import org.apache.beam.sdk.io.fs.CreateOptions;
import org.apache.beam.sdk.io.fs.MatchResult;
+ import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
@@ -338,4 +340,20 @@ public void testMatchNonGlobs() throws Exception {

blobContainerClient.delete();
}

+ @Test
+ public void testReportLineageOnBucket() {
+   verifyLineage("azfs://account/container", ImmutableList.of("account", "container"));
+   verifyLineage("azfs://account/container/", ImmutableList.of("account", "container"));
+   verifyLineage(
+       "azfs://account/container/foo/bar.txt",
+       ImmutableList.of("account", "container", "foo/bar.txt"));
+ }
+
+ private void verifyLineage(String uri, List<String> expected) {
+   AzfsResourceId path = AzfsResourceId.fromUri(uri);
+   Lineage mockLineage = mock(Lineage.class);
+   azureBlobStoreFileSystem.reportLineage(path, mockLineage);
+   verify(mockLineage, times(1)).add("abs", expected);
+ }
}
sdks/python/apache_beam/io/aws/s3filesystem.py (4 additions, 1 deletion)
@@ -317,8 +317,11 @@ def delete(self, paths):

def report_lineage(self, path, lineage):
try:
- components = s3io.parse_s3_path(path, get_account=True)
+ components = s3io.parse_s3_path(path, object_optional=True)
except ValueError:
# report lineage is fail-safe
return
+ if len(components) > 1 and components[-1] == '':
+   # bucket only
+   components = components[:-1]
lineage.add('s3', *components)
sdks/python/apache_beam/io/aws/s3filesystem_test.py (9 additions)
@@ -265,6 +265,15 @@ def test_rename(self, unused_mock_arg):
src_dest_pairs = list(zip(sources, destinations))
s3io_mock.rename_files.assert_called_once_with(src_dest_pairs)

+  def test_lineage(self):
+    self._verify_lineage("s3://bucket/", ("bucket", ))
+    self._verify_lineage("s3://bucket/foo/bar.txt", ("bucket", "foo/bar.txt"))
+
+  def _verify_lineage(self, uri, expected_segments):
+    lineage_mock = mock.MagicMock()
+    self.fs.report_lineage(uri, lineage_mock)
+    lineage_mock.add.assert_called_once_with("s3", *expected_segments)


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
sdks/python/apache_beam/io/azure/blobstoragefilesystem.py (5 additions, 1 deletion)
@@ -319,8 +319,12 @@ def delete(self, paths):

def report_lineage(self, path, lineage):
try:
- components = blobstorageio.parse_azfs_path(path, get_account=True)
+ components = blobstorageio.parse_azfs_path(
+     path, blob_optional=True, get_account=True)
except ValueError:
# report lineage is fail-safe
return
+ if len(components) > 1 and components[-1] == '':
+   # bucket only
+   components = components[:-1]
lineage.add('abs', *components)
sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py (12 additions)
@@ -320,6 +320,18 @@ def test_rename(self, unused_mock_blobstorageio):
src_dest_pairs = list(zip(sources, destinations))
blobstorageio_mock.rename_files.assert_called_once_with(src_dest_pairs)

+  def test_lineage(self):
+    self._verify_lineage(
+        "azfs://storageaccount/container/", ("storageaccount", "container"))
+    self._verify_lineage(
+        "azfs://storageaccount/container/foo/bar.txt",
+        ("storageaccount", "container", "foo/bar.txt"))
+
+  def _verify_lineage(self, uri, expected_segments):
+    lineage_mock = mock.MagicMock()
+    self.fs.report_lineage(uri, lineage_mock)
+    lineage_mock.add.assert_called_once_with("abs", *expected_segments)


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
sdks/python/apache_beam/io/filebasedsink.py (23 additions, 1 deletion)
@@ -280,9 +280,31 @@ def _check_state_for_finalize_write(self, writer_results, num_shards):

src_files.append(src)
dst_files.append(dst)
- FileSystems.report_sink_lineage(dst)

+ self._report_sink_lineage(dst_glob, dst_files)
return src_files, dst_files, delete_files, num_skipped

+ def _report_sink_lineage(self, dst_glob, dst_files):
+   """
+   Report sink Lineage. Report every file if the number of files is no more
+   than 100; otherwise report only at the directory level.
+   """
+   if len(dst_files) <= 100:
+     for dst in dst_files:
+       FileSystems.report_sink_lineage(dst)
+   else:
+     dst = dst_glob
+     # dst_glob has a wildcard for shard number (see _shard_name_template)
+     sep = dst_glob.find('*')
Contributor:
why *?

Contributor (Author):
It's a file pattern to match sharded files; it contains a wildcard like "prefix-*-of-10000". The wildcard was introduced here:

shard_name_format = shard_name_template.replace(match.group(0), '*')

Indeed it's not obvious; added a code comment.

Contributor:
Got it, the Java representation above hides this.

+     if sep > 0:
+       dst = dst[:sep]
+     try:
+       dst, _ = FileSystems.split(dst)
+     except ValueError:
+       return  # lineage report is fail-safe
+
+     FileSystems.report_sink_lineage(dst)
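As a worked illustration (assumed file names, not from the thread): with dst_glob = "gs://bucket/output/part-*-of-00100", find('*') truncates the pattern to "gs://bucket/output/part-", and FileSystems.split() then drops the basename, so the directory "gs://bucket/output" is what gets reported.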

@check_accessible(['file_path_prefix'])
def finalize_write(
self, init_result, writer_results, unused_pre_finalize_results):