
add a new IO named DataLakeIO (#23074) #23075

Closed
wants to merge 2 commits

Conversation

zhangt-nhlab

We developed a new IO named DataLakeIO, which enables Beam to read data from and write data to data lakes (Delta Lake, Iceberg, Hudi).

Because Delta Lake, Iceberg, and Hudi do not provide sufficient Java APIs for reading and writing, DataLakeIO uses the Spark DataSource API under the hood. The Spark dependencies are therefore required.

The tests BeamDeltaTest, BeamIcebergTest, and BeamHudiTest show how to use these features.

@github-actions

github-actions bot commented Sep 8, 2022

Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:

R: @kileys for label java.
R: @Abacn for label build.
R: @johnjcasey for label io.

Available commands:

  • `stop reviewer notifications` - opt out of the automated review tooling
  • `remind me after tests pass` - tag the comment author after tests pass
  • `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@codecov

codecov bot commented Sep 8, 2022

Codecov Report

Merging #23075 (ac21df5) into master (e3ba8d8) will increase coverage by 0.00%.
The diff coverage is n/a.

@@           Coverage Diff           @@
##           master   #23075   +/-   ##
=======================================
  Coverage   73.58%   73.58%           
=======================================
  Files         716      716           
  Lines       95301    95301           
=======================================
+ Hits        70124    70125    +1     
+ Misses      23881    23880    -1     
  Partials     1296     1296           
Flag Coverage Δ
python 83.40% <ø> (+<0.01%) ⬆️

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ
sdks/python/apache_beam/utils/interactive_utils.py 95.12% <0.00%> (-2.44%) ⬇️
...hon/apache_beam/runners/worker/bundle_processor.py 93.30% <0.00%> (-0.25%) ⬇️
sdks/go/pkg/beam/util/gcsx/gcs.go 27.41% <0.00%> (ø)
sdks/go/pkg/beam/artifact/stage.go 61.87% <0.00%> (ø)
sdks/go/pkg/beam/io/filesystem/util.go 96.29% <0.00%> (ø)
sdks/go/pkg/beam/io/filesystem/memfs/memory.go 96.15% <0.00%> (ø)
...ks/python/apache_beam/runners/worker/sdk_worker.py 89.09% <0.00%> (+0.15%) ⬆️
sdks/python/apache_beam/runners/direct/executor.py 97.01% <0.00%> (+0.54%) ⬆️
.../python/apache_beam/transforms/periodicsequence.py 100.00% <0.00%> (+1.61%) ⬆️


@zhangt-nhlab

@kileys
@Abacn
@johnjcasey

@johnjcasey (Contributor) left a comment

As a general comment, it may also be worth looking at the in-progress SparkReceiverIO to see if a more generic Spark connection can be used.

}
}

private static class ReadFn<ParameterT, OutputT> extends DoFn<ParameterT, OutputT> {
Contributor:

Currently, we are trying to have all sources implemented using the SplittableDoFn pattern to enable scalability, and are doing our best to not include new sources that are not implemented as SDFs. Can this be re-implemented as an SDF instead?
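For context, the core idea behind the SplittableDoFn pattern is that a source's work is described by a "restriction" the runner can split while processing is in flight, which is what enables a single large read to scale out. A toy sketch of that idea (this is deliberately NOT the Beam API; class and method names here are illustrative only):

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for an SDF restriction: a half-open offset range that a
// runner could split dynamically to hand part of the work to another worker.
class OffsetRange {
    final long from; // inclusive
    final long to;   // exclusive

    OffsetRange(long from, long to) {
        this.from = from;
        this.to = to;
    }

    long size() {
        return to - from;
    }

    // Split the range at the given fraction: the first part (the "primary")
    // stays with the current worker, the second part (the "residual") is
    // handed back to the runner. Returns null if nothing can be split off.
    OffsetRange[] trySplit(double fraction) {
        long splitPos = from + Math.max(1L, (long) (size() * fraction));
        if (splitPos >= to) {
            return null;
        }
        return new OffsetRange[] {
            new OffsetRange(from, splitPos),
            new OffsetRange(splitPos, to)
        };
    }

    // Stand-in for an SDF's processing loop: emit every offset in the range.
    List<Long> read() {
        List<Long> out = new ArrayList<>();
        for (long pos = from; pos < to; pos++) {
            out.add(pos);
        }
        return out;
    }
}
```

In the real API, `OffsetRange` and its `RestrictionTracker` already exist in the Beam Java SDK, so a re-implementation would build on those rather than hand-rolling the splitting logic.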

Author:

Thanks, I'll learn about SplittableDoFn and then re-implement it as an SDF.

@@ -0,0 +1,54 @@
/*
Contributor:

Can you re-work these dependencies to match the pattern used for other IOs? See io/google-cloud-platform/build.gradle for an example.

The new dependencies themselves are declared in buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy.
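For illustration only, the pattern the reviewer describes looks roughly like this: the version string is declared once in the shared map in BeamModulePlugin.groovy, and each IO's build.gradle references the shared entry instead of a literal coordinate. The `spark_sql` entry name below is hypothetical; the real map contains the actual entries.

```groovy
// buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
// (sketch: version declared once, in the shared library map)
def spark_version = "3.1.2"
project.ext.library = [
    java: [
        spark_sql: "org.apache.spark:spark-sql_2.12:$spark_version",
    ]
]

// sdks/java/io/datalake/build.gradle
// (sketch: the IO's build script references the shared entry)
dependencies {
    provided library.java.spark_sql
}
```

Using `provided` rather than `implementation` would also match how the Spark runner treats Spark jars as supplied by the cluster.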

Author:

Okay, I'll modify it accordingly.

@github-actions

Reminder, please take a look at this PR: @kileys @Abacn @johnjcasey

@Abacn

Abacn commented Sep 16, 2022

waiting on author

@github-actions

Assigning a new set of reviewers because the PR has gone too long without review. If you would like to opt out of this review, comment `assign to next reviewer`:

R: @robertwb for label java.
R: @damccorm for label build.
R: @pabloem for label io.

Available commands:

  • `stop reviewer notifications` - opt out of the automated review tooling
  • `remind me after tests pass` - tag the comment author after tests pass
  • `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

Comment on lines +31 to +33
implementation "org.apache.spark:spark-sql_2.12:3.1.2"
implementation "org.apache.spark:spark-core_2.12:3.1.2"
implementation "org.apache.spark:spark-streaming_2.12:3.1.2"
Member:

This IO would be really neat and I understand the motivation of using Spark underneath.
Nevertheless, the spark dependency is rather problematic and I'm very concerned about the consequences ...

There's also a Spark runner, which supports both Spark 2.4 and Spark >= 3.1. This IO would certainly conflict with the Spark 2.4 runner. The Spark 3 runner is built in a way that supports various versions of Spark 3 (the path from 3.1 to 3.3 is full of breaking changes), and Spark dependencies are typically provided (as available on the cluster). Even further, Spark comes with a massive tail of dependencies prone to causing conflicts with the versions used in Beam.

The one common candidate to mention here is Avro. Spark 3.1 still uses Avro 1.8, matching Beam's version; Spark 3.2 bumps Avro to 1.10, which is incompatible with Beam :/ This kind of exemplifies the maintenance headache ahead.

Have you evaluated any alternative to using Spark underneath?


Author:

Your advice is great! I'll consider it and think about other alternatives.

@aromanenko-dev (Contributor) left a comment

First of all - thanks for your contribution!

Before proceeding to review from my side, I'd like to know if there is a design doc or similar for this IO connector? It would be very helpful to understand the goals and the implementation of this connector in advance.

Also, several notes worth mentioning:

  1. Please, create a new github issue for this feature.
  2. Please, avoid merging a master branch into your feature branch. Use git rebase instead.
  3. Run ./gradlew :sdks:java:io:datalake:check locally before pushing your changes to origin.

You can find a Beam contribution guide here:
https://beam.apache.org/contribute/get-started-contributing/


@zhangt-nhlab


Thank you for your reply! I will make the changes and create a new GitHub issue later.

@aaltay

aaltay commented Apr 25, 2024

Was there any progress on getting this IO into Beam?
