-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix flaky test #30322
fix flaky test #30322
Conversation
Assigning reviewers. If you would like to opt out of this review, comment R: @damccorm added as fallback since no labels match configuration Available commands:
The PR bot will only process comments in the main thread (not review comments). |
TestGetWorkBudgetDistributor getWorkBudgetDistributor) throws InterruptedException { | ||
getWorkBudgetDistributor.waitForBudgetDistribution(); | ||
while (isBudgetRefreshPaused.get()) { | ||
// wait for budget refresh til budget refresh is unpaused. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add a little sleep to not burn cpu and slow test down
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also can you improve comment on why this is necessary?
It seems we pause during updating the endpoints. But we set that to false before we trigger the refresher.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Friendly ping in case this got lost
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done, removed and added sleep.
@Override | ||
public void distributeBudget( | ||
ImmutableCollection<WindmillStreamSender> streams, GetWorkBudget getWorkBudget) { | ||
streams.forEach(stream -> stream.adjustBudget(getWorkBudget.items(), getWorkBudget.bytes())); | ||
getWorkBudgetDistributorTriggered.countDown(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this throw if we decrement too much? if so with the schedled test it seems racy if it happens too often. Maybe could check before decrementing here since it is single-threaded distributing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it doesn't
from javadoc:
countDown
public void countDown()
Decrements the count of the latch, releasing all waiting threads if the count reaches zero.
If the current count is greater than zero then it is decremented. If the new count is zero then all waiting threads are re-enabled for thread scheduling purposes.
If the current count equals zero then nothing happens.
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CountDownLatch.html#countDown--
Do you know how to run a test 100x to verify it isn't flaky? I'm not sure if there is a gradle invocation to do that or not. Maybe @Abacn knows |
It's unit test and the command can be this first in https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/build.gradle add a line
(ref: https://stackoverflow.com/questions/29427020/how-to-run-gradle-test-when-all-tests-are-up-to-date); then use a script like
and see the result. The problem is the tests with racing condition tends to be more flaky on GitHub Action than on local machines. This may be because the GHA runner has different load each time run, which affects timing, but the local machine do not |
Thanks! I'll run it and circle back @Abacn is there anyway I can run it in the same environment as GitHub actions? Or simulate the different loads? |
7d9911a
to
70ee9dd
Compare
running this command locally
|
was successful @Abacn |
TestGetWorkBudgetDistributor getWorkBudgetDistributor) throws InterruptedException { | ||
getWorkBudgetDistributor.waitForBudgetDistribution(); | ||
while (isBudgetRefreshPaused.get()) { | ||
// wait for budget refresh til budget refresh is unpaused. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Friendly ping in case this got lost
|
||
// Make sure we are injecting the metadata from smallest to largest. | ||
workerMetadataResponses.stream() | ||
.sorted(Comparator.comparingLong(WorkerMetadataResponse::getMetadataVersion).reversed()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the reversed doesn't make sense with the comment. Is it wrong or can comment be improved to make it clearer what reversing is needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed
private void waitForWorkerMetadataToBeConsumed( | ||
TestGetWorkBudgetDistributor getWorkBudgetDistributor) throws InterruptedException { | ||
getWorkBudgetDistributor.waitForBudgetDistribution(); | ||
Thread.sleep(100); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just meant to have a sleep in the loop checking the atomic value.
Since we removed that, the sleep is unnecessary
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
gotcha removed
c4f6d2c
to
572e106
Compare
Removed flaky logic around waiting in tests.
Removed thread.sleep and replaced with triggers via CountDownLatch
R: @scwhittle @Abacn
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.