Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add OrderedListState support for SparkRunner #33212

Open
wants to merge 5 commits into
base: master
Choose a base branch
from

Conversation

twosom
Copy link
Contributor

@twosom twosom commented Nov 25, 2024

Please add a meaningful description for your change here

fixes #33211
fixes #31724
fixes #31723

This PR contains these changes

  • added OrderedList in SparkStateInternals
  • implementation equals and hashcode in FlinkOrderedListState
  • added OrderedListState test
  • fixed FlinkRunner state test

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

Copy link
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

Copy link

codecov bot commented Nov 25, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 59.01%. Comparing base (e5bd69d) to head (74033fa).

Additional details and impacted files
@@             Coverage Diff              @@
##             master   #33212      +/-   ##
============================================
+ Coverage     57.42%   59.01%   +1.58%     
- Complexity     1475     3162    +1687     
============================================
  Files           970     1136     +166     
  Lines        154525   175078   +20553     
  Branches       1076     3354    +2278     
============================================
+ Hits          88743   103324   +14581     
- Misses        63578    68406    +4828     
- Partials       2204     3348    +1144     
Flag Coverage Δ
java 70.27% <ø> (+1.69%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.


🚨 Try these New Features:

@twosom
Copy link
Contributor Author

twosom commented Dec 3, 2024

Run Flink Container PreCommit

@liferoad
Copy link
Collaborator

liferoad commented Dec 4, 2024

R: @shunping

Copy link
Contributor

github-actions bot commented Dec 4, 2024

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment assign set of reviewers

@liferoad
Copy link
Collaborator

liferoad commented Dec 4, 2024

Did you rebase your branch?

@twosom twosom force-pushed the spark-ordered-list-state branch from f9ec51d to 2c06551 Compare December 5, 2024 03:52
@twosom
Copy link
Contributor Author

twosom commented Dec 5, 2024

Did you rebase your branch?

Thanks!

i rebased now!

@twosom
Copy link
Contributor Author

twosom commented Dec 5, 2024

Run Java_Spark3_Versions PreCommit

@liferoad
Copy link
Collaborator

liferoad commented Dec 5, 2024

Please check this:
:runners:spark:3:analyzeClassesDependencies FAILED

@twosom
Copy link
Contributor Author

twosom commented Dec 5, 2024

@liferoad
Sorry, my bad. Fixed the import statement to use Beam's vendored Guava Lists instead of clearspring's.
Thanks

@kennknowles kennknowles self-requested a review December 5, 2024 14:55
@twosom
Copy link
Contributor Author

twosom commented Dec 5, 2024

Run Java PreCommit

1 similar comment
@twosom
Copy link
Contributor Author

twosom commented Dec 15, 2024

Run Java PreCommit

@twosom
Copy link
Contributor Author

twosom commented Dec 19, 2024

Run Java_GCP_IO_Direct PreCommit

@twosom
Copy link
Contributor Author

twosom commented Dec 19, 2024

Run Java PreCommit

CHANGES.md Outdated Show resolved Hide resolved

@Override
public int hashCode() {
int result = namespace.hashCode();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can use Objects.hashCode

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kennknowles
The current implementation follows the same hashCode semantics used in FlinkStateInternals. I'm a bit unclear whether suggesting Objects.hashCode means we should replace the current implementation, or if you're suggesting that overriding hashCode is unnecessary altogether.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overriding equality and hashcode should always occur together, so it is necessary to override hashCode.

I was just suggesting this pattern instead of doing your own math:

return Objects.hashCode(isSuccess, site, throwable);

}

private SortedMap<Instant, TimestampedValue<T>> readAsMap() {
final List<TimestampedValue<T>> listValues =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason for each additional kind of state is to efficiently offer a novel form of a state access. The state access here as the same performance characteristics as ValueState. It is actually better for the runner to reject a pipeline than to run it with performance characteristics that don't match the expected performance contract.

Is there some underlying mechanism in Spark that could implement OrderedListState efficiently and scalably?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with your point. Let me share my thoughts on why I chose this implementation.

I've noticed that ListState/OrderedListState is mostly used in situations where writes happen much more frequently than reads. That's why I went with ArrayList instead of SortedMap - it's simply better at handling these frequent writes.

When it comes to reading data, it usually happens in just a couple of scenarios - either during OnTimer execution or when the list hits a certain size. So even if the read performance takes a small hit, it's not really going to affect the overall performance much.

It's also worth mentioning that FlinkOrderedListState uses the same approach, which gives me confidence in this design choice.

That's why I think the current implementation makes more sense for real-world usage patterns.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. If Flink is implemented then it is OK with me to follow that precedent. My point was that this does not actually add capability that is more than ValueState provides. It is just a minor API wrapper adjustment - still useful but not the main purpose.

So we can merge with this design. But if you think about following up, here is how we would really like this to behave:

  • add should call some native Spark API that writes the element without reading the list
  • readRange should only read the requested range, ideally seeking in near-constant time (aka without a scan or sort)
  • clearRange should also seek in near-constant time
  • isEmpty should not read the list

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kennknowles

Hello. I've been reviewing the Spark documentation over the weekend.

Unfortunately, as far as I can tell, Spark doesn't have the native state API that we're looking for. Moreover, since the current SparkStateInternals is a class implemented independently of the Spark API, it's also difficult to apply RDD-based APIs.

The performance requirements you mentioned seem to be well satisfied by WindmillOrderedList. Given that the current implementation cannot guarantee the same performance, I completely understand if the PR doesn't get merged.

However, regardless of whether this PR gets merged or not, I will continue researching to meet the requirements you've outlined. I plan to explore better implementation approaches, taking inspiration from concepts like pendingAdds and pendingDeletes as used in WindmillOrderedList.

Belated Merry Christmas.

@twosom twosom force-pushed the spark-ordered-list-state branch from bfea9e1 to 9c5d9ba Compare December 20, 2024 14:18
@twosom
Copy link
Contributor Author

twosom commented Dec 20, 2024

Run Java PreCommit

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
3 participants