check for cachetoken representing a retry before activating and completing work #29082
needs a rebase also
@@ -83,6 +84,29 @@ static ActiveWorkState forTesting(
return new ActiveWorkState(activeWork, computationStateCache);
}

private static Stream<KeyedGetDataRequest> toKeyedGetDataRequestStream(
Entry<ShardedKey, Deque<Work>> shardedKeyAndWorkQueue, Instant refreshDeadline) {
have the ShardedKey and the queue be separate parameters?
also could take just an Iterable instead of Deque
done.
@@ -94,6 +94,7 @@
import org.apache.beam.runners.dataflow.worker.streaming.WeightedBoundedQueue;
import org.apache.beam.runners.dataflow.worker.streaming.Work;
import org.apache.beam.runners.dataflow.worker.streaming.Work.State;
import org.apache.beam.runners.dataflow.worker.streaming.WorkDedupeKey;
maybe name WorkIdentifiers? WorkTokens?
done, changed to WorkId
if (queuedWork.getWorkItem().getWorkToken() == work.getWorkItem().getWorkToken()) {
return ActivateWorkResult.DUPLICATE;
// Work is a duplicate.
if (queuedWork.getWorkItem().getCacheToken() == work.getWorkItem().getCacheToken()) {
could be cleaner to use WorkDedupeKey (or whatever it is renamed to) and call methods like equals or maybe something like knownEarlierThan
if the cache token is the same, we'd prefer to keep the work item with the later number, since they are increasing from windmill within a worker. Otherwise we can just take the last observed.
should we add the startTime to the WorkDedupeKey/WorkId then?
realized we don't need to do that since Work has the startTime
if (completedWork.getWorkItem().getWorkToken() != workToken) {
if (completedWork.getWorkItem().getWorkToken() != workDedupeKey.workToken()
use the new class and an equals method?
done.
shardedKey,
workToken,
completedWork.getWorkItem().getWorkToken());
workDedupeKey.workToken(),
add toString for new class and use it?
done, AutoValue provides toString out of the box, so just printing the object
getStuckCommitsAt(stuckCommitDeadline).entrySet()) {
ShardedKey shardedKey = shardedKeyAndWorkToken.getKey();
long workToken = shardedKeyAndWorkToken.getValue();
WorkDedupeKey workDedupeKey = shardedKeyAndWorkToken.getValue();
update variable names as type changed
done
@@ -125,6 +125,11 @@ boolean isStuckCommittingAt(Instant stuckCommitDeadline) {
&& currentState.startTime().isBefore(stuckCommitDeadline);
}

boolean isRetryOf(WorkDedupeKey workDedupeKey) {
maybe instead put this method on WorkDedupeKey and have a method vending WorkDedupeKey from Work?
done
.build();
}

public static WorkDedupeKey of(Work work) {
remove if Work changes to have a method that returns WorkDedupeKey
done
ShardedKey shardedKey = shardedKey("someKey", 1L);

activeWorkState.activateWorkForKey(shardedKey, activeWork);
activeWorkState.activateWorkForKey(shardedKey, nextWork);
activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey, workTokenToComplete);
activeWorkState.completeWorkAndGetNextWorkForKey(
shardedKey, WorkDedupeKey.of(activeWork.getWorkItem()));
if we add method to get dedup key on Work this could be simplified, ditto below.
done
Latest changes don't appear to have been pushed
47dc32a to bce2b3d
// Work tokens and cache tokens are equal.
if (queuedWork.id().equals(work.id())) {
if (work.newerThan(queuedWork)) {
If cache token + work token are equal, the queuedWork and new work should be equivalent. Let's just return DUPLICATE in that case.
What I meant with the newer than check is the case where we have equal cache tokens but different work tokens. Since the equal cache tokens implies it was from the same worker, and workers give out increasing work tokens we can just keep the work item with the higher work token.
just to clarify:
existingWorkToken == newWorkToken && existingCacheToken == newCacheToken: return DUPLICATE
existingWorkToken > newWorkToken && existingCacheToken == newCacheToken: ignore
existingWorkToken < newWorkToken && existingCacheToken == newCacheToken: return QUEUED (queue newWorkToken, removeExistingWorkToken if it is not active)
existingWorkToken == newWorkToken && existingCacheToken != newCacheToken: return QUEUED (queue newWorkToken, removeExistingWorkToken if it is not active)
is this the behavior we want? If it is equal cache tokens, but different work tokens what does that mean? Same worker with different work?
yes, the listed are correct (maybe return STALE in the ignore case?).
If it is equal cache tokens, but different work tokens what does that mean? Same worker with different work?
It means that the user worker may be observing a stale retry of work previously sent by the windmill worker (if the new work token is less than or equal to the existing work token), or it might mean the existing item was a retry of a work item that has since committed and the newly arriving work item for the key is the now-active item.
In general, if we get different cache tokens we don't know how the previous work relates to the existing work. We could guess based upon observed ordering, but since the requests could be received out of order if multiple windmill workers sent work, it seems safer to queue in that case.
There is separate work ongoing to use heartbeats to identify work items that are no longer valid and stop processing them.
ack done
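The cases agreed on in this thread can be written down as a small decision function. This is only a sketch: `DedupeSketch`, `Outcome`, and `classify` are hypothetical names rather than Beam code, the "ignore" case is mapped to STALE as suggested above, and eviction of the existing queued item is deliberately not modelled.

```java
// Hypothetical illustration of the cases discussed above; not the Beam implementation.
final class DedupeSketch {
  enum Outcome {
    DUPLICATE,
    STALE,
    QUEUED
  }

  /** Compares the tokens of an already-queued work item with those of a newly arriving one. */
  static Outcome classify(
      long existingWorkToken, long existingCacheToken, long newWorkToken, long newCacheToken) {
    if (existingCacheToken != newCacheToken) {
      // Different cache tokens: the relationship is unknown, so queue the new work.
      return Outcome.QUEUED;
    }
    if (existingWorkToken == newWorkToken) {
      // Same work token and same cache token.
      return Outcome.DUPLICATE;
    }
    // Same cache token: work tokens increase, so the higher token is the current one.
    return newWorkToken > existingWorkToken ? Outcome.QUEUED : Outcome.STALE;
  }
}
```

For example, `classify(5, 9, 4, 9)` yields STALE because the arriving item has the same cache token but a lower work token.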
// Retries have the same work token, but different cache tokens.
if (work.isRetryOf(queuedWork)) {
workQueueIterator.remove();
for these cases we're removing, I'm not sure we should remove if the iterator is the first in the queue. In that case it is the active item, removing it isn't actually terminating the active work (we may add support for this in the future in which case we can improve handling here). And removing it will just lead to confusing logs below about tokens mismatching when the active finishes.
added check done
@Nullable Queue<Work> workQueue = activeWork.get(shardedKey);
if (workQueue == null) {
// Work may have been completed due to clearing of stuck commits.
LOG.warn("Unable to complete inactive work for key {} and token {}.", shardedKey, workToken);
LOG.warn("Unable to complete inactive work for key {} and token {}.", shardedKey, workId);
nit: replace token with ids
done.
// avoid Preconditions.checkState here to prevent eagerly evaluating the
// format string parameters for the error message.
Work completedWork =
Work workToComplete =
nit: I think the previous name was better since the work has already completed user processing.
done
if (completedWork.getWorkItem().getWorkToken() != workToken) {
if (!workToComplete.id().isForSameWork(workId) || workToComplete.id().isRetryOf(workId)) {
just use !equals?
done.
Instant stuckCommitDeadline, BiConsumer<ShardedKey, Long> shardedKeyAndWorkTokenConsumer) {
for (Entry<ShardedKey, Long> shardedKeyAndWorkToken :
Instant stuckCommitDeadline, BiConsumer<ShardedKey, WorkId> shardedKeyAndWorkIdConsumer) {
for (Entry<ShardedKey, WorkId> shardedKeyAndWorkToken :
nit: update variable name to id
done.
return id.isRetryOf(other.id);
}

boolean newerThan(Work other) {
see other comment, I was thinking of this as a WorkId method
boolean knownObsoletedBy(WorkId other) {
return other.cacheToken == this.cacheToken && workToken < other.cacheToken;
}
in that case I think the forwarding Work.isRetryOf methods could be removed and just use id().isRetryOf
workToken < other.workToken?
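Taking the suggestion together with the comparison proposed in the reply (`workToken < other.workToken`), a WorkId-style value class might look roughly like the sketch below. `WorkIdSketch` is a hypothetical hand-written stand-in; the thread indicates the real class is generated with AutoValue, which would supply `equals`, `hashCode`, and `toString`.

```java
import java.util.Objects;

/** Hypothetical stand-in for the WorkId value class discussed in this thread. */
final class WorkIdSketch {
  private final long workToken;
  private final long cacheToken;

  WorkIdSketch(long workToken, long cacheToken) {
    this.workToken = workToken;
    this.cacheToken = cacheToken;
  }

  /** Same work token but a different cache token: a retry. */
  boolean isRetryOf(WorkIdSketch other) {
    return other.workToken == workToken && other.cacheToken != cacheToken;
  }

  /** Same cache token and a strictly lower work token than {@code other}: known obsolete. */
  boolean knownObsoletedBy(WorkIdSketch other) {
    return other.cacheToken == cacheToken && workToken < other.workToken;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof WorkIdSketch)) {
      return false;
    }
    WorkIdSketch that = (WorkIdSketch) o;
    return workToken == that.workToken && cacheToken == that.cacheToken;
  }

  @Override
  public int hashCode() {
    return Objects.hash(workToken, cacheToken);
  }

  @Override
  public String toString() {
    return "WorkIdSketch{workToken=" + workToken + ", cacheToken=" + cacheToken + "}";
  }
}
```

With the checks on the id, call sites can compare ids directly instead of going through forwarding methods on Work.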
/**
 * A composite key used to identify a unit of {@link Work}. If multiple units of {@link Work} have
 * the same workToken AND cacheToken, the {@link Work} is a duplicate. If multiple units of {@link
 * Work} have the same workToken, but different cacheTokens, the {@link Work} is a retry.
can document the cacheToken same, workToken different as known obsolete in comment
done.
return other.workToken() == workToken() && other.cacheToken() != cacheToken();
}

boolean isForSameWork(WorkId other) {
remove? seems like accessors can just be called
also we have some weird cases (backend truncation of work item) where the work token can be the same but the work is not entirely the same if the cache token is changed so the method name is perhaps confusing.
done.
7847c40 to 3af9e61
sgtm i think i got confused about the recommendations :) will keep them at separate commits from now on
3af9e61 to 9665150
.map(
work ->
Windmill.KeyedGetDataRequest.newBuilder()
.setKey(shardedKey.key())
can rm the key, sharding key is sufficient and cheaper
could do separately (and possibly in other places where we make reads or commits) since this is just moved
@@ -83,6 +85,33 @@ static ActiveWorkState forTesting(
return new ActiveWorkState(activeWork, computationStateCache);
}

private static Stream<KeyedGetDataRequest> toKeyedGetDataRequests(
nit: this is just moved, but since further from context now how about a better name capturing that this is for heartbeats
makeHeartbeatKeyedGetDataRequests?
done
are you missing a push? not seeing this change and others below
return ActivateWorkResult.STALE;
}
} else if (queuedWork.id().workToken() == work.id().workToken()) {
if (queuedWork.id().cacheToken() != work.id().cacheToken()) {
this is known true from previous statement in else if chain
I think the case
queuedWork.id().workToken() == work.id().workToken()
could just be removed though and fall through to the bottom. If the cache tokens are different we're not actually sure which is the valid item in the backend. And in that case it is safer to keep both since then we will eventually complete the right one.
done
} else if (queuedWork.id().cacheToken() == work.id().cacheToken()) {
if (work.id().workToken() > queuedWork.id().workToken()) {
removeIfNotActive(queuedWork, workIterator, workQueue);
workQueue.addLast(work);
could instead continue, it might be able to remove other queued items
for example, if all have the same cache token and the queue is [1 active] [2 queued], then if [3] arrives it would currently not remove [1] since it's active, but by returning we aren't removing [2]
done
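The [1 active] [2 queued] example can be traced with a simplified model of the activation loop. This is a sketch under assumptions, not the Beam code: `ActivateSketch`, `Item`, and `Result` are hypothetical, and the EXECUTE value for an empty queue is assumed rather than taken from this thread. The loop keeps scanning after an eviction instead of returning early, and it never removes the head of the queue because that item is the active one.

```java
import java.util.Deque;
import java.util.Iterator;

// Hypothetical, simplified model of the activation loop discussed above.
final class ActivateSketch {
  enum Result {
    EXECUTE,
    QUEUED,
    DUPLICATE,
    STALE
  }

  static final class Item {
    final long workToken;
    final long cacheToken;

    Item(long workToken, long cacheToken) {
      this.workToken = workToken;
      this.cacheToken = cacheToken;
    }
  }

  static Result activate(Deque<Item> queue, Item incoming) {
    if (queue.isEmpty()) {
      queue.addLast(incoming);
      return Result.EXECUTE; // Assumed result name for "nothing queued, start now".
    }
    Item active = queue.peekFirst();
    Iterator<Item> it = queue.iterator();
    while (it.hasNext()) {
      Item queued = it.next();
      if (queued.cacheToken != incoming.cacheToken) {
        continue; // Relationship unknown: keep both.
      }
      if (queued.workToken == incoming.workToken) {
        return Result.DUPLICATE;
      }
      if (queued.workToken > incoming.workToken) {
        return Result.STALE;
      }
      // The queued item is obsolete. Evict it unless it is the active head, then keep
      // scanning so that later obsolete items are evicted too.
      if (queued != active) {
        it.remove();
      }
    }
    queue.addLast(incoming);
    return Result.QUEUED;
  }
}
```

Starting from [1 active] [2 queued] with one shared cache token, activating [3] leaves the queue as [1 active] [3 queued].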
@@ -147,16 +199,19 @@ private synchronized void removeCompletedWorkFromQueue(
() ->
new IllegalStateException(
String.format(
"Active key %s without work, expected token %d",
shardedKey, workToken)));
"Active key %s without work, expected work_token %d, expected cache_token %d",
log workId instead of separate fields?
done
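The fragment above builds its exception inside a supplier, which matches the code comment elsewhere in this PR about avoiding an eagerly evaluated format string. A minimal sketch of that pattern follows; `LazyMessageSketch`, `describe`, `headOrThrow`, and the `formatCalls` counter are hypothetical and exist only to make the laziness observable.

```java
import java.util.Optional;

// Hypothetical illustration: the message is formatted only when the value is missing.
final class LazyMessageSketch {
  static int formatCalls = 0;

  static String describe(String shardedKey, long workToken, long cacheToken) {
    formatCalls++;
    return String.format(
        "Active key %s without work, expected work_token %d, expected cache_token %d",
        shardedKey, workToken, cacheToken);
  }

  static <T> T headOrThrow(Optional<T> head, String shardedKey, long workToken, long cacheToken) {
    // The lambda body runs only if head is empty, so the common path pays no formatting cost.
    return head.orElseThrow(
        () -> new IllegalStateException(describe(shardedKey, workToken, cacheToken)));
  }
}
```

Passing an already formatted string to a precondition check would instead run `String.format` on every call, including the successful ones.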
@@ -2725,14 +2725,14 @@ public void testActiveWorkForShardedKeys() throws Exception {

// Verify a different shard of key is a separate queue.
Work m4 = createMockWork(3);
assertFalse(computationState.activateWork(key1Shard1, m4));
assertTrue(computationState.activateWork(key1Shard1, m4));
why did this change? it seems like it should be duplicate of m3. Think this might be related to early return I commented on in code.
.build();
}

private static WorkId workDedupeToken(long workToken, long cacheToken) {
nit: name workId?
done
waiting on author
This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.
@m-trieu Can we rebase and get this in? I still think it would be beneficial, it just fell off the radar.
76a6e20 to 91b3e99
@scwhittle ready for another look. Thanks!
}

workQueue.removeIf(
this isn't updating the activeworkbudget for the removals
see/share removal logic with the failing for heartbeats
done
workQueue.removeIf(
queuedWork ->
queuedWorkToRemove.contains(queuedWork.id()) && !queuedWork.equals(workQueue.peek()));
I can't add a comment on the right line because github is stupid, but can you replace FailedTokens below with the new WorkId class?
done
// This set is for adding remove-able WorkItems if they exist in the workQueue. We add them to
// this set since a ConcurrentModificationException will be thrown if we modify the workQueue
// and then resume iteration.
Set<WorkId> queuedWorkToRemove = new HashSet<>();
instead of a set and then removeif, how about doing in a single pass with an iterator and Iterator.remove()
done
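A single pass with `Iterator.remove()` could look like the sketch below, which also leaves a hook for the bookkeeping raised earlier (updating the active work budget for each removal). `QueueRemovalSketch`, `removeQueued`, and the `onRemoved` callback are hypothetical names; how the budget is actually updated is not shown in this thread.

```java
import java.util.Deque;
import java.util.Iterator;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical helper: evicts matching queued items in one pass, never touching the head.
final class QueueRemovalSketch {
  static <T> int removeQueued(Deque<T> queue, Predicate<T> shouldRemove, Consumer<T> onRemoved) {
    Iterator<T> it = queue.iterator();
    if (!it.hasNext()) {
      return 0;
    }
    it.next(); // Skip the head: it is the active item and is not removed here.
    int removed = 0;
    while (it.hasNext()) {
      T item = it.next();
      if (shouldRemove.test(item)) {
        it.remove(); // Removing through the iterator keeps the iteration valid.
        onRemoved.accept(item); // Hook for bookkeeping such as budget updates.
        removed++;
      }
    }
    return removed;
  }
}
```

Because the removal goes through the iterator that is doing the traversal, no intermediate set of ids is needed.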
Instead of just checking a Work/WorkItem's workToken to detect duplicates, also check the cacheToken.
Change behavior of ActiveWorkState#activateWork to only return ActivateWorkResult.DUPLICATE if both work and cache tokens are the same. Else if the work tokens match but the cache tokens do not, evict the currently queued work and add the newer retry of the work to the work queue.
more context:
r: @scwhittle