Report File Lineage on directory #32662

Merged
merged 4 commits into from
Oct 8, 2024

Conversation

@Abacn Abacn commented Oct 4, 2024

Supersedes #32642

  • FileBasedSource reports every file it reads if the number of files is <= 100

  • FileBasedSource reports every unique directory (one level up) if the number of files is > 100 but the number of unique directories is <= 100

  • FileBasedSource reports the bucket otherwise

  • ReadAll reports every file if the number of files is <= 100

  • ReadAll reports the bucket otherwise

  • FileBasedSink reports every file it writes if the number of shards is <= 100

  • FileBasedSink (for each destination) reports the single directory it writes to if the number of shards is > 100

In contrast to read, we are able to report a single directory on write because all shards are in the same directory.
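
The three-tier rule for FileBasedSource can be sketched roughly as follows. This is an illustration only, not the code in this PR: the function names `source_lineage_entries` and `bucket_of` are hypothetical, and plain `gs://` URIs stand in for the lineage entry format.

```python
import posixpath

LIMIT = 100  # threshold taken from the description above


def bucket_of(path):
    """Return 'scheme://bucket' for a path like 'gs://bucket/dir/file'."""
    scheme, _, rest = path.partition('://')
    return scheme + '://' + rest.split('/', 1)[0]


def source_lineage_entries(paths, limit=LIMIT):
    """Pick files, then directories, then buckets, whichever first fits the limit."""
    files = sorted(set(paths))
    if len(files) <= limit:
        return files
    # Too many files: try the unique parent directories (one level up).
    dirs = sorted({posixpath.dirname(p) + '/' for p in files})
    if len(dirs) <= limit:
        return dirs
    # Too many directories as well: fall back to the bucket.
    return sorted({bucket_of(p) for p in files})
```

Under these assumptions, 150 files in one directory collapse to a single directory entry, while 150 files spread over 150 directories collapse to the bucket.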

Abacn commented Oct 4, 2024

Python:

tested with simple pipeline

p | ReadFromText(file_pattern='gs://dataflow-samples/shakespeare/kinghenry*') \
       | WriteToText(file_path_prefix='gs://clouddfe-yihu-test/tmp/fileiolineage')

query lineage on pipeline result:

{'gcs:dataflow-samples.`shakespeare/kinghenryv.txt`', 'gcs:dataflow-samples.`shakespeare/kinghenryviii.txt`'}
{'gcs:clouddfe-yihu-test.tmp/fileiolineage-00000-of-00001'}

Java, same pipeline:

p.apply(TextIO.read().from("gs://dataflow-samples/shakespeare/kinghenry*"))
        .apply(TextIO.write().to("gs://clouddfe-yihu-test/tmp/fileio-lineage"));

query lineage on pipeline result:

[gcs:dataflow-samples.`shakespeare/kinghenryv.txt`, gcs:dataflow-samples.`shakespeare/kinghenryviii.txt`]
[gcs:clouddfe-yihu-test.tmp/fileio-lineage-00002-of-00004, gcs:clouddfe-yihu-test.tmp/fileio-lineage-00003-of-00004, gcs:clouddfe-yihu-test.tmp/fileio-lineage-00000-of-00004, gcs:clouddfe-yihu-test.tmp/fileio-lineage-00001-of-00004]

@Abacn Abacn marked this pull request as ready for review October 4, 2024 16:58

github-actions bot commented Oct 4, 2024

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@Abacn Abacn mentioned this pull request Oct 4, 2024

@rohitsinha54 rohitsinha54 left a comment

Some early comments. Still reviewing the whole PR.

Thanks for the prompt PR.

* report at directory level.
*/
private void reportSinkLineage(List<KV<FileResult<DestinationT>, ResourceId>> outputFilenames) {
if (outputFilenames.size() <= 100) {
Contributor

Any specific reason to pick 100? If it is just a good average number, then that is fine too.

Contributor Author

I admit it's a hardcoded number, which is generally not ideal. The consideration here is that I want to avoid a functionality regression from 2.59.0 (i.e. reporting one level above) for use cases with a small number of files, while in general we have to roll up to mitigate the metrics size limit.

As for the number itself, finalizeDestination is called in two code paths:

  • Without dynamic destinations, finalizeDestination is called once, on a single worker.

  • With dynamic destinations, finalizeDestination is called by finalizeAllDestination, once per destination.

In the first case it is generally safe to set a higher number (although if the file paths are very long, it will still trigger the set size truncation in #32650).

In the second case, when there are many destinations, #32650 may still be triggered even though each destination reports the full list of files only when its shard count is <= 100.

That said, no choice of this number can prevent #32650 from being triggered in all use cases, but I think it is good enough to avoid the functionality regression for the relevant use cases (a small number of shards, where the whole path is most useful).
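
To make the second case concrete, here is a small sketch of why a per-destination threshold does not bound the total number of reported entries. The helper names are hypothetical and the paths are made up for illustration; this is not the Beam implementation.

```python
import posixpath


def sink_entries_for_destination(shard_paths, limit=100):
    """Report every shard if there are few, otherwise their shared directory."""
    if len(shard_paths) <= limit:
        return set(shard_paths)
    # All shards of one destination live in the same directory.
    return {posixpath.dirname(shard_paths[0]) + '/'}


def sink_entries(destinations, limit=100):
    """Union of the entries reported for each destination independently."""
    entries = set()
    for shard_paths in destinations.values():
        entries |= sink_entries_for_destination(shard_paths, limit)
    return entries
```

With 50 destinations of 100 shards each, every destination stays at file level and the union holds 5000 entries, whereas a single destination with 101 shards reports one directory.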

Contributor

Picking a good estimate number sounds totally reasonable. Thank you. For completeness of discussion:

The consideration here is that I wish to not introduce functionality regression (i.e. report one level above) from 2.59.0

Since lineage is not a launched feature, we do not need to worry about regression. It is fine for jobs that run on the 2.59.0 SDK and currently emit full paths to switch to reporting some aggregate, because these metrics are not consumed by anyone as of now. The Dataflow service is not reporting these as lineage and cannot until the job is restarted with the lineage-enabled flag.

100 sounds ok to me for now.

In general I am wondering if we can make it configurable, i.e. customers who use FileIOs can specify the n at which we roll up to report at a higher level: if the number of files is >= n, report the directory; if the number of directories is >= n, report the bucket. We do not need to do this now but can do it in future releases with more consideration.

}
} else {
for (Metadata metadata : expandedFiles) {
FileSystems.reportSourceLineage(metadata.resourceId().getCurrentDirectory());
Contributor

Is it possible that the number of one-level-up directories is above 100?

say for example
a/b/**/*.avro
will match all avro files under a/b/ at any level.
a/b/1/1.avro
a/b/2/2.avro
..
a/b/100/100.avro
a/b/101/101.avro

will lead to more than 100 dirs.

Contributor Author

Yes, it is absolutely possible; in that case it relies on #32650 to guard the StringSet size.

Contributor

Should we add the <= check here too? If the count is > 100, roll up one level and see whether we find common ancestors in the directory paths such that the number of reported paths stays under 100, checking recursively until we reach the root of the shortest path in the list. (Just a suggestion; keeping it as is is also fine.)
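
One possible reading of this suggestion, sketched under assumptions: directories are bucket-relative paths such as `a/b/1`, the empty string stands for the bucket root, and the deepest entries are lifted one level at a time until the set fits. The name `roll_up` is hypothetical; the PR does not implement this.

```python
import posixpath


def depth(directory):
    """Number of path components; the bucket root ('') has depth 0."""
    return 0 if directory == '' else directory.count('/') + 1


def roll_up(directories, limit=100):
    """Lift the deepest directories to their parents until at most `limit` remain."""
    current = {d.strip('/') for d in directories}
    while len(current) > limit:
        deepest = max(depth(d) for d in current)
        if deepest == 0:
            break  # only the bucket root is left
        # Replace every deepest entry with its parent; shallower ones stay.
        current = {
            posixpath.dirname(d) if depth(d) == deepest else d
            for d in current
        }
    return current
```

For the `a/b/1` through `a/b/101` example above, this sketch would collapse the 101 directories into the single entry `a/b`.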

Contributor Author

@Abacn Abacn Oct 7, 2024

See https://github.com/apache/beam/pull/32662/files#r1790385413: in readAll we do not know the number of files in advance, so a "<= check" is not applicable there. Sorry, this one is for the source.

Yes, the reporting could be handled more smartly here, and in general reportSourceLineage could be made more accurate, since we have the full list of files here. Added a TODO.

@@ -88,6 +88,7 @@ public SplitIntoRangesFn(long desiredBundleSizeBytes) {
@ProcessElement
public void process(ProcessContext c) {
MatchResult.Metadata metadata = c.element().getMetadata();
FileSystems.reportSourceLineage(metadata.resourceId().getCurrentDirectory());
Contributor

This can also hit the limit of 100 or more directories, like the file source above. If this is best-effort reporting, that is fine; combined with the limit on lineage (#32650 (comment)) it will be OK. But in that case, should we leave it at file-level reporting?

Contributor Author

This is for ReadAll, and the challenge is that we do not know how many files there will be. It can even be used in a streaming pipeline, where file metadata arrives over time. So, unlike in other places, we have no hint as to whether reporting one level above or the full file path is preferable.

  • Pro of report one level above: result still accurate (no entry dropped) when the upstream filepattern is matching, or watching a directory (which is a common use case)

  • Con of report one level above: less granularity, obviously

  • Pro of report whole path: more granularity

  • Con of report whole path: more likely to hit metrics limit, and report incomplete list of files

I'm open to both. Personally I prefer the former, for the same reason as the concern in the comments of #32650: given the limitation on metrics size, I consider the preference to be

full information > info with less granularity > has to truncate and report incomplete results
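
The trade-off can be illustrated with a toy simulation. Everything here is an assumption made for illustration: `CappedSet` is a hypothetical stand-in for a size-limited string set (the real limit discussed in #32650 may be defined differently), and the cap of 100 entries is arbitrary.

```python
import posixpath


class CappedSet:
    """A set that stops accepting new items once `cap` entries are stored."""

    def __init__(self, cap):
        self.cap = cap
        self.items = set()
        self.truncated = False

    def add(self, item):
        if item in self.items:
            return
        if len(self.items) >= self.cap:
            self.truncated = True  # entry dropped: the result is now incomplete
            return
        self.items.add(item)


def simulate(paths, cap=100):
    """Feed paths one at a time, as in ReadAll, at file and directory level."""
    by_file, by_dir = CappedSet(cap), CappedSet(cap)
    for path in paths:
        by_file.add(path)
        by_dir.add(posixpath.dirname(path) + '/')
    return by_file, by_dir
```

Streaming 250 files from two directories through this sketch truncates the file-level set at 100 entries, while the directory-level set stays complete with two entries.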

Contributor

Makes sense. +1 to what you currently have, then. Thank you.

FileSystems.report_sink_lineage(dst)
else:
dst = dst_glob
sep = dst_glob.find('*')
Contributor

why *?

Contributor Author

it's a file pattern to match sharded file that contains a wildcard like "prefix-*-of-10000". The wildcard was introduced here:

shard_name_format = shard_name_template.replace(match.group(0), '*')

indeed it's not obvious, added code comment
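
As a rough illustration of why the position of `*` matters, the following sketch cuts a sharded-file glob at its first wildcard and then back to the last path separator. The function name `directory_of_glob` is hypothetical and the exact logic in the PR may differ.

```python
def directory_of_glob(dst_glob):
    """Return the directory part of a glob such as 'gs://b/tmp/out-*-of-10000'."""
    star = dst_glob.find('*')
    # Keep only the literal prefix before the first wildcard, if there is one.
    prefix = dst_glob if star < 0 else dst_glob[:star]
    slash = prefix.rfind('/')
    # Cut back to the last separator so that a directory is reported.
    return prefix if slash < 0 else prefix[:slash + 1]
```

Cutting at the wildcard first keeps the result correct even when the shard suffix itself contains characters that follow the `*`.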

Contributor

Got it, the Java representation above hides this.

Lineage.query(result.metrics(), Lineage.Type.SINK));
} else {
assertEquals(
Lineage.query(result.metrics(), Lineage.Type.SOURCE).isEmpty(),
Contributor

In the case of V1, wouldn't this just test the equality F == F, and the same for V2 later, after the service-side change?

Contributor Author

Thanks for pointing this out. After review, I think we can keep TextIOIT untouched. Let me test with the TextIOIT-manyfiles test.

Contributor Author

Confirmed that this does not need to change. Ran TextIOIT-manyfiles and the test passed the original assert.

rohitsinha54 commented Oct 8, 2024

I think we should make the lineage reported by FileIO fit within the limit by itself, without depending on the StringSet metric type to enforce a limit, as there is no clean and short way to do so. See comment: #32650 (comment)

We need to handle these cases:

  • Write: sharded files. This is already handled in this PR. When the number of sharded files is greater than 100, we report the directory above (guaranteed to be common).
  • Read: wildcards, and a large number of files under a directory. For both of these cases we can take a simple approach for now: if there are > 100 files, we look one level up. We can take either of these approaches, based on your preference:
    a) If the number of unique directories is > 100, we report the bucket only (we avoid traversing further up to find a common ancestor under the limit of 100); if it is < 100, we report the directory paths.
    b) We report the one-level-up directory only if it is a common ancestor of all paths (a common case when reading from a specific directory that has many files, e.g. <mypath>/*txt); otherwise we report the bucket.

This gives us accurate visibility in lineage in most cases, except when files are spread across many folders at different levels, in which case we only get to know the bucket, which is fine. On customer demand we can improve this in later releases.
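
Option (b) could look roughly like the sketch below. It is a hypothetical illustration, not the merged code: the names `read_entries_option_b` and `bucket_of` are made up, and `gs://` URIs stand in for lineage entries.

```python
import posixpath


def bucket_of(path):
    """Return 'scheme://bucket' for a path like 'gs://bucket/dir/file'."""
    scheme, _, rest = path.partition('://')
    return scheme + '://' + rest.split('/', 1)[0]


def read_entries_option_b(paths, limit=100):
    """Files if few; the parent directory if all files share it; else bucket."""
    files = set(paths)
    if len(files) <= limit:
        return files
    parents = {posixpath.dirname(p) + '/' for p in files}
    if len(parents) == 1:
        return parents  # one common directory, e.g. reading <mypath>/*txt
    return {bucket_of(p) for p in files}
```

Compared with option (a), this variant never reports more than one directory, so files split across just two directories already fall back to the bucket.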

(cc: @robertwb to keep in loop.)

@github-actions github-actions bot added the tests label Oct 8, 2024
FileSystems.reportSourceLineage(dir, LineageLevel.TOP_LEVEL);
break;
}
FileSystems.reportSourceLineage(dir);
Contributor

Doesn't this mean that when the number of unique directories is > 100, you end up reporting the first 100 directory paths and then, when you see the 101st, you report the top level, leading to a mix of directory-path reporting and bucket reporting, which can be confusing?
I think a consistent approach would be to report directories only if the number of unique directories is < 100 across all known files, and otherwise report the bucket only.

Contributor Author

Still working on the branch actively. Now fixed. Will ask for PTAL when test passes.

@Abacn Abacn force-pushed the dirlineage branch 4 times, most recently from 993526a to bedc919 Compare October 8, 2024 21:14
@rohitsinha54 rohitsinha54 left a comment

Thank you. LGTM.

@Abacn Abacn merged commit 1479362 into apache:master Oct 8, 2024
116 checks passed
@Abacn Abacn deleted the dirlineage branch October 8, 2024 23:51
Abacn added a commit to Abacn/beam that referenced this pull request Oct 8, 2024
* Report File Lineage on directory

* added comments, restore lineage assert in TextIOIT

* Report bucket level Lineage for files larger than 100

* fix lint
damccorm pushed a commit that referenced this pull request Oct 9, 2024
* Report File Lineage on directory

* added comments, restore lineage assert in TextIOIT

* Report bucket level Lineage for files larger than 100

* fix lint
reeba212 pushed a commit to reeba212/beam that referenced this pull request Dec 4, 2024
* Report File Lineage on directory

* added comments, restore lineage assert in TextIOIT

* Report bucket level Lineage for files larger than 100

* fix lint
2 participants