-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
break nested classes out of StreamingDataflowWorker #28537
Conversation
Can you separate the independent bits into separate commits? (can still be one PR) |
fc2d91f
to
6a4c788
Compare
@kennknowles I split the commits, will do this ahead next time. Thanks! |
Assigning reviewers. If you would like to opt out of this review, comment R: @AnandInguva added as fallback since no labels match configuration Available commands:
The PR bot will only process comments in the main thread (not review comments). |
return Transport.getJsonFactory().fromString(input, MapTask.class); | ||
} | ||
|
||
public static void main(String[] args) throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kennknowles out of curiosity, do you happen to know where this is referenced? I was looking for it in a gradle file, but couldn't find it and not sure if its even referenced in those.
Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is referenced in dataflow internal launching of the Dataflow v1 Java worker (not in github repo).
With beam portability, the runner specifics like this are behind the FnApi but for the v1 harness the dataflow details leaked some. They were initially separate repositories but we kept having jar compatibility issues so we moved it to the external beam repository and single jar.
r: @scwhittle |
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control |
import org.checkerframework.checker.nullness.qual.Nullable; | ||
|
||
/** Bounded set of queues, with a maximum total weight. */ | ||
public class WeightedBoundedQueue<V> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would be nice to add some tests for this now that it's visible
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
} | ||
|
||
/** Returns the current weight of the queue. */ | ||
public int weight() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO this method name and size() aren't particularly obvious, what do you think?
maybe
queuedElementsWeight()
queuedElementsCount()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
? ImmutableMap.copyOf(transformUserNameToStateFamily) | ||
: ImmutableMap.of(); | ||
this.computationStateCache = computationStateCache; | ||
Preconditions.checkNotNull(mapTask.getStageName()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto (also if kept put the preconditions at the top?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moved, mapTask
is not nullable, but unfortunately systemName and stageName fields in map task are
this.mapTask = mapTask; | ||
this.executor = executor; | ||
this.transformUserNameToStateFamily = | ||
transformUserNameToStateFamily != null |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add @ Nullable annotation or remove
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
return executionStateQueue; | ||
} | ||
|
||
/** Mark the given shardedKey and work as active. */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and schedules execution of work if there is no active work for the shardedKey already processing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
static final int MAX_COMMIT_QUEUE_BYTES = 500 << 20; // 500MB | ||
static final int NUM_COMMIT_STREAMS = 1; | ||
static final int GET_WORK_STREAM_TIMEOUT_MINUTES = 3; | ||
static final Duration COMMIT_STREAM_TIMEOUT = Duration.standardMinutes(1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should the other constants be moved to the top here?
(if not, what is the motivation for moving some of them?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think either the google java linter style or the spotless apply linter organizes constants in public, protected package private, then private order.
this was applied by spotless apply or google java formatter
} | ||
|
||
private static final Random clientIdGenerator = new Random(); | ||
final WindmillStateCache stateCache; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can this be private final? and stay in same location?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is accessed in the StreamingDataflowWorkerTest.testMergeWindowsCaching, i can mark it as @VisibleForTesting
?
I plan to do some breakup/cleanup of StreamingDataflowWorker in a later CL. If we opt to use dependency injection and inject the cache for tests, we can make all of this private.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking we can eventually have either StreamingApplianceDataflowWorker or StreamingEngineDataflowWorker (depending on the StreamingDataflowWorkerOptions). StreamingDataflowWorker can instantiate either one depending on the passed in options, and we can have different tests for each.
We can inject all the dependencies into the constructors and have some static factory class/methods to create the shapes we want (for test or not etc).
Future code changes won't ever touch the explicit code paths/classes for appliance.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also for my own knowledge, why don't we use Guice or Dagger dependency injection (if we want to avoid the runtime overhead)? @scwhittle
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kennknowles
I'm not very aware of Java ecosystem and didn't originally write these files. I'm not sure if either of those frameworks are used in Beam but I don't know if that was a conscious choice.
// is enabled. | ||
private ScheduledExecutorService globalConfigRefreshTimer; | ||
// Periodic sender of debug information to the debug capture service. | ||
private DebugCapture.Manager debugCaptureManager = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can remove = null, mark it nullable
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
workIdBuilder.append('-'); | ||
workIdBuilder.append(Long.toHexString(workItem.getWorkToken())); | ||
DataflowWorkerLoggingMDC.setWorkId(workIdBuilder.toString()); | ||
String workIdBuilder = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no longer a builder. Does Java optimize repeated string additions better now? IIRC we maybe saw cpu or objects on a profile
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe StringBuilder is very useful when concatenating in a loop, for simple concatenation the compiler will optimize for us. https://dzone.com/articles/string-concatenation-performacne-improvement-in-ja
@@ -1393,7 +1367,7 @@ private Commit batchCommitsToStream(CommitWorkStream commitStream) { | |||
Commit commit; | |||
try { | |||
if (commits < 5) { | |||
commit = commitQueue.poll(10 - 2 * commits, TimeUnit.MILLISECONDS); | |||
commit = commitQueue.poll(10 - 2L * commits, TimeUnit.MILLISECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is L required?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to avoid integer multiplication implicitly cast to long (complaint from the static checker)
b8229e0
to
766b440
Compare
} | ||
|
||
private static final Random clientIdGenerator = new Random(); | ||
final WindmillStateCache stateCache; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kennknowles
I'm not very aware of Java ecosystem and didn't originally write these files. I'm not sure if either of those frameworks are used in Beam but I don't know if that was a conscious choice.
assertEquals(MAX_WEIGHT, queue.queuedElementsWeight()); | ||
assertEquals(1, queue.size()); | ||
|
||
// Have another thread poll the queue, pulling off the only value inside and freeing up the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can just poll directly from this thread instead of via a new thread, easier to verify result etc then.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
WeightedBoundedQueue.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i)); | ||
|
||
// Insert value that takes all the capacity into the queue. | ||
Thread thread1 = new Thread(() -> queue.put(MAX_WEIGHT)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can just put directly from the test thread
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
// Try to insert another value into the queue. This will block since there is no capacity in the | ||
// queue. | ||
Thread thread2 = new Thread(() -> queue.put(MAX_WEIGHT)); | ||
thread2.start(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sleep 100ms? just to give some time for thread to get into blocking put (test will pass either way so shouldn't add flakiness)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
WeightedBoundedQueue<Integer> queue = | ||
WeightedBoundedQueue.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i)); | ||
|
||
assertNull(queue.poll()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add test of poll with timeout method
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
throw new RuntimeException(e); | ||
} | ||
}); | ||
takeThread.start(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add a sleep to give chance for it to start blocking
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
} | ||
|
||
@Test | ||
public void testTake() throws InterruptedException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add a variant where you poll with very large timeout and ensure it gets the element
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done used polling at 1 minute, inserting element at 30 seconds, so poll should return the element.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, I just meant something like testPoll_withTimeout with a large timeout we never expected to get hit. You could remove testPoll_withLargeTimeout since I don't think it adds much beyond withTimeout and theses tests don't seem to run with much parallelism and we don't want to slow the build down too much.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sg removed!
* Marks the work for the given shardedKey as complete. Schedules queued work for the key if any. | ||
*/ | ||
public void completeWork(ShardedKey shardedKey, long workToken) { | ||
Work nextWork; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ping on this and some other uncompleted comments
766b440
to
6e2aba8
Compare
@@ -125,23 +126,28 @@ public boolean activateWork(ShardedKey shardedKey, Work work) { | |||
/** | |||
* Marks the work for the given shardedKey as complete. Schedules queued work for the key if any. | |||
*/ | |||
public void completeWork(ShardedKey shardedKey, long workToken) { | |||
Work nextWork; | |||
public void completeWorkAndScheduleNextWork(ShardedKey shardedKey, long workToken) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: maybe CompleteWorkAndScheduleNextWorkForKey
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
synchronized (activeWork) { | ||
Queue<Work> queue = activeWork.get(shardedKey); | ||
if (queue == null) { | ||
Queue<Work> workQueue = activeWork.get(shardedKey); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nullable annotation or optional
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
made optional and moved into ActiveWorkState + tests
} | ||
|
||
@Test | ||
public void testTake() throws InterruptedException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, I just meant something like testPoll_withTimeout with a large timeout we never expected to get hit. You could remove testPoll_withLargeTimeout since I don't think it adds much beyond withTimeout and theses tests don't seem to run with much parallelism and we don't want to slow the build down too much.
}); | ||
takeThread.start(); | ||
|
||
Thread putThread = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove putThread, can just be inlined to test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -31,7 +31,10 @@ public abstract class NameContext { | |||
* systemName} and a {@code userName}. | |||
*/ | |||
public static NameContext create( | |||
String stageName, String originalName, String systemName, String userName) { | |||
String stageName, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ping, I realized this is just method and not members generated by autovalue below. But can stageName be non-null below?
|
||
@Override | ||
public void close() throws Exception { | ||
ExecutionState executionState; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ping
|
||
import org.apache.beam.runners.dataflow.worker.windmill.Windmill; | ||
|
||
public class KeyCommitTooLargeException extends Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ping
if (keyToDisplay.size() > 100) { | ||
keyToDisplay = keyToDisplay.substring(0, 100); | ||
} | ||
return String.format("%016x-%s", shardingKey(), TextFormat.escapeBytes(keyToDisplay)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ping
return counterUpdates; | ||
} | ||
|
||
public String getStageName() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ping
"Active key %s without work, expected token %d", | ||
shardedKey, workToken))); | ||
|
||
if (completedWork.getWorkItem().getWorkToken() != workToken) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should be comparing the workToken and cacheToken
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
discussed offline, will follow up in next cl
|
||
// Ensure we don't already have this work token queued. | ||
for (Work queuedWork : workQueue) { | ||
if (queuedWork.getWorkItem().getWorkToken() == work.getWorkItem().getWorkToken()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if the cache token is different then it isn't a pure duplicate it could be a retry, in that case it might be better to take the more recent observed item (guessing it is more likely the newer one) or keep both.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sgtm, I don't think this is currently done.
do we want to queue the retry then? What happens if we process both, would Windmill make sure only 1 is committed w/ the exactly once processing?
in more detail if we take the most recent observed item, then would we have to remove the older Work(sameWorkToken, existingCacheToken) from the queue, and add the more recent or passed in Work(sameWorkToken, newCacheToken) to the queue?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
discussed offline, will follow up in next cl
.build()); | ||
} | ||
|
||
synchronized CommitsPendingCountAndActiveWorkStatus getPendingCommitsAndPrintActiveWorkAt( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about just changing this to printTo(PrintWriter writer) and doing the printing here?
Or just returning a String that we print?
I don't think we need to expose this for testing (don't see test anyway yet) and the current logic is a bit spread out with MAX_PRINTABLE_COMMIT_PENDING_KEYS used here when building and in printing. The header for the table is also separated from the population of the table here. Just having all of that in a single method instead of introducing CommitsPendingCountAndActiveWorkStatus seems cleaner.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
execute(work); | ||
return true; | ||
} | ||
// This will never happen, the switch is exhaustive. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move inside default case
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
@@ -120,6 +120,11 @@ public Collection<Windmill.LatencyAttribution> getLatencyAttributions() { | |||
return list; | |||
} | |||
|
|||
boolean isStuckAt(Instant stuckCommitDeadline) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isStuckCommittingAt
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
} | ||
|
||
@Test | ||
public void testActivateWorkForKey_EXECUTE_emptyWorkQueueForKey() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems odd to me because I don't think this is a legal state for the class. If somethig is active it is in the queue, if it becomes active and there is nothing it should clean itself up.
It seems like it would be better to test through the interface of the class instead of injecting internal state members, maybe verifying internal state but not directly manipulating it from the test.
For example here you coudl do the above, add an item, activate it, complete it, verify that the key doesn't exist in the internal map.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done im also going to remove all of the tests that we can't "simulate" with the external API. changes the test map to readonly as well.
ShardedKey shardedKey = shardedKey("someKey", 1L); | ||
Deque<Work> workQueue = new ArrayDeque<>(); | ||
workQueue.addLast(workInQueue); | ||
activeWork.put(shardedKey, workQueue); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto, this seems like it should just be combined with the above QUEUED test which sets up the right state, instead of hard-coding the state here which might not match the actual implementation.
If there was a bug and queueing the work returned QUEUED but didn't actually put it in the internal state, then the above test would pass and this test would pass.
Ditto for the below
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
175900c
to
ce0ada3
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, minor test improvement but otherwise feel free to ask someone else for review/merge.
shardedKey, workToBeCompleted.getWorkItem().getWorkToken()); | ||
|
||
assertTrue(nextWorkOpt.isPresent()); | ||
assertSame(nextWork, nextWorkOpt.get()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would be good to then complete this returned work and verify that works
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
Use ActiveWorkState class instead of an activeWorkMap in ComputationState
ce0ada3
to
89b946a
Compare
Thanks for reviewing @scwhittle @kennknowles ! Thank you! |
StreamingDataflowWorker file is quite large. Opting to break out the inner static classes to a top level and introduce /streaming directory under the worker/ directory.
There will be future changes to these classes to support direct path, and this change will make the files easier to work with.
includes some formatting changes required by spotless
Slowly switching to java.util.Optional instead of the google Optional.
r: @kennknowles
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.