add interfaces for direct path, and StreamingEngineClient #28835

Merged (8 commits) on Nov 30, 2023

m-trieu commented Oct 4, 2023

Add interfaces/classes for direct path:

ProcessWorkItemClient
Exposed to WorkItemProcessor so that GetData and CommitWork stream RPCs can be routed to the same workers where GetWork was called (currently StreamingDataflowWorker#process).

WorkItemProcessor
Replaces WorkItemReceiver; it is the same except that it takes and exposes a ProcessWorkItemClient instead of a WorkItem. Since ProcessWorkItemClient needs a way to get data for work, refresh work, get side input data, and commit work, the place where it is created (GrpcGetWorkStream) needs to be modified to accept a GetDataStream (for keyed/state data), a GetDataStream (for global side input data), and a CommitWorkStream.

GetWorkBudget
A struct to model item and byte budgets for how much work a user worker can handle. This is passed in GetWorkRequest(s) to Windmill to control how many items/bytes of work are returned.

GetWorkBudgetDistributor
Given a set of WindmillStreamSender(s) and GetWorkBudget, distributes the budgets to the WindmillStreamSender(s) in some manner.

EvenGetWorkBudgetDistributor
GetWorkBudgetDistributor implementation that distributes the budget evenly
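
As a rough illustration of what "evenly" could mean for an (items, bytes) budget, here is a minimal sketch. BudgetSketch and EvenSplitSketch are hypothetical stand-ins, not the classes in this PR, and handing the division remainder to the first receivers is an assumption of this sketch rather than the behavior of EvenGetWorkBudgetDistributor.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for GetWorkBudget: an immutable (items, bytes) pair. */
final class BudgetSketch {
  final long items;
  final long bytes;

  BudgetSketch(long items, long bytes) {
    this.items = items;
    this.bytes = bytes;
  }
}

final class EvenSplitSketch {
  /**
   * Splits total across n receivers. Assumption of this sketch: the first
   * (total % n) receivers get one extra unit so nothing is lost to rounding.
   */
  static List<BudgetSketch> splitEvenly(BudgetSketch total, int n) {
    List<BudgetSketch> shares = new ArrayList<>();
    if (n <= 0) {
      return shares; // no receivers, nothing to distribute
    }
    for (int i = 0; i < n; i++) {
      long items = total.items / n + (i < total.items % n ? 1 : 0);
      long bytes = total.bytes / n + (i < total.bytes % n ? 1 : 0);
      shares.add(new BudgetSketch(items, bytes));
    }
    return shares;
  }

  public static void main(String[] args) {
    for (BudgetSketch share : splitEvenly(new BudgetSketch(10, 100), 3)) {
      System.out.println(share.items + " items, " + share.bytes + " bytes");
    }
  }
}
```

Spreading the remainder keeps the sum of the shares equal to the total; a plain integer division would be simpler but could strand a few units of budget on every redistribution.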

WindmillStreamSender
When the Grpc*Stream(s) are created, they immediately start the underlying gRPC stream (startStream is called, and it has protected access). To be able to assign budgets and get the streams ready to be started (similar to GetWorkClientSender), WindmillStreamSender wraps the 3 WorkItem API RPC streams and exposes startStream and closeAllStreams to manage the underlying streams. Once the streams are started, they are cached (via thread-safe memoization). Once certain endpoints are stale, closeAllStreams gives a way to close all of the underlying streams in a WindmillStreamSender instance. This will also be an interface to adjust the budget in the GetWorkStream, as well as to track Windmill/backend worker backlog in the future.

DispatcherClient
Manages/vends out stubs and the dispatcher
Thread safe via synchronization on reads and writes.

Add StreamingEngineClient

  • Manages the available backend Windmill workers via GetWorkerMetadata. We never close this stream. WorkerMetadata updates are submitted to a single-threaded executor, which consumes them and updates StreamingEngineClient's internal connection state.
  • Given a total budget, divides it amongst the available backend Windmill workers (represented as Endpoints, Connections, and WindmillStreamSenders) and starts the GetWorkStream(s). Closes streams via WindmillStreamSender#closeAllStreams when the endpoint for the stream is no longer present in the updated worker metadata.
  • Contains a single-threaded executor for triggered budget refreshes. Budget refreshes are triggered when new worker metadata is consumed (implemented) or when work has completed processing (either committed back to Windmill or put in an inactive state). Uses a SynchronousQueue to implement a publish/subscribe pattern: put blocks until another thread take(s) from the queue.
  • Contains a single-threaded executor for periodic budget refreshes.
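
To make the SynchronousQueue handoff behavior concrete, here is a small standalone sketch. The class and method names are hypothetical and are not part of StreamingEngineClient; it only demonstrates that an element can be handed off solely when a consumer is waiting.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

final class SynchronousQueueHandoffSketch {
  /** With no thread waiting in take(), a non-blocking offer() cannot hand off the element. */
  static boolean offerWithoutConsumer() {
    return new SynchronousQueue<Boolean>().offer(true);
  }

  /** With a consumer blocked in take(), the producer's element is handed off directly. */
  static boolean handoffWithConsumer() {
    SynchronousQueue<Boolean> trigger = new SynchronousQueue<>();
    Thread consumer =
        new Thread(
            () -> {
              try {
                trigger.take(); // blocks until a producer arrives
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
              }
            });
    consumer.start();
    try {
      // Waits up to 5 seconds for the consumer to reach take().
      boolean handedOff = trigger.offer(true, 5, TimeUnit.SECONDS);
      if (!handedOff) {
        consumer.interrupt(); // do not leave the consumer blocked forever
      }
      consumer.join();
      return handedOff;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println("offer without consumer: " + offerWithoutConsumer());
    System.out.println("handoff with consumer: " + handoffWithConsumer());
  }
}
```

The queue has no capacity, so a plain put(...) from a triggering thread would wait for as long as the refresh thread is busy.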

Future changes still needed:

  • Have GrpcGetWorkStream accept a GrpcGetDataStream and GrpcCommitWorkStream so that it can construct a ProcessWorkItemClient and pass it on to the ProcessWorkItem (replaces the current behavior, where a WorkItem is passed to the WorkItemReceiver).
  • Integrate with StreamingDataflowWorker, might be worth having 2 different implementations of StreamingDataflowWorker since current MetricTrackingWindmillServer is used for GetData (keyed and global) fetches.
  • Need to figure out a way to batch commits since they need to go to the same origin worker

R: @scwhittle



github-actions bot commented Oct 4, 2023

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@m-trieu m-trieu force-pushed the mtrieu-process-work branch from 5b6d748 to d686918 Compare October 4, 2023 23:33

github-actions bot commented Oct 5, 2023

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @chamikaramj added as fallback since no labels match configuration

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

WorkItem workItem();

/** Refreshes the active work for each computation owned by the backend Windmill worker. */
void refreshActiveWork(Map<String, List<KeyedGetDataRequest>> activeWorkPerComputation);
Contributor

I don't think that this should be part of the per-work item client.

The information related to this particular workItem for refreshing is the computation+work_token. The batching is across many work items and done at a higher level.

Since KeyedGetDataRequest/Commit contain key+work_token, they are not necessarily tied to the workItem by the interface. An alternative would be to change this to expose the streams instead of wrapping the stream methods.

WorkItem workItem();
GetDataStream getDataStream();
CommitWorkStream commitWorkStream();

Then the streams can be used directly for getting data or committing.
For heartbeats, we can iterate over all the clients building batches by their corresponding GetDataStream.
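
A minimal sketch of that batching idea, assuming each per-work-item client exposes the stream it came from. DataStream, Client, and batchByStream are hypothetical stand-ins for illustration, not the interfaces in this PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

final class HeartbeatBatchingSketch {
  /** Hypothetical stand-in for GetDataStream. */
  interface DataStream {}

  /** Hypothetical stand-in for a per-work-item client that exposes its stream. */
  static final class Client {
    final long workToken;
    final DataStream getDataStream;

    Client(long workToken, DataStream getDataStream) {
      this.workToken = workToken;
      this.getDataStream = getDataStream;
    }
  }

  /** Groups work tokens by the stream instance (identity) they must be refreshed on. */
  static Map<DataStream, List<Long>> batchByStream(List<Client> clients) {
    Map<DataStream, List<Long>> batches = new IdentityHashMap<>();
    for (Client client : clients) {
      batches
          .computeIfAbsent(client.getDataStream, unused -> new ArrayList<>())
          .add(client.workToken);
    }
    return batches;
  }

  public static void main(String[] args) {
    DataStream a = new DataStream() {};
    DataStream b = new DataStream() {};
    List<Client> clients = Arrays.asList(new Client(1, a), new Client(2, a), new Client(3, b));
    System.out.println(batchByStream(clients).size() + " batches");
  }
}
```

Each resulting batch can then be sent as one refresh on its own stream, which keeps the batching logic outside the per-work-item client.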

Contributor Author

done

import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;

/** Utilities for creating {@link Channel} for gRPC stubs. */
public final class WindmillChannelFactory {
Contributor

move refactoring to separate PR?

Contributor Author

this is needed for the GrpcStubFactory, is it ok to keep in this PR?
i haven't replaced similar usage in other files, will do that in a refactor PR.

Contributor

Would it be possible to submit first the refactor+update of existing files? I think it would help make the size of this PR smaller and it would also make it clearer which code is being moved/refactored versus the new code.

In particular this and StreamingEngineStreamFactory both seem to just be extracted from GrpcWindmillServer. A single PR adding these and updating existing usage will be easier to review and ensures we don't have duplicate code sitting around that could possibly drift.

Contributor

Ping on this, this PR is very big and it would be helpful for reviewing to have this separated. With force pushes the diffs to latest reviewed files seem to not work making it difficult to review especially with so many files.

Contributor Author

sure i can split it up

* <p>TODO: Replace this with an auto generated proxy which calls the underlying implementation
* delegate to reduce maintenance burden.
*/
class VendoredCredentialsAdapter extends Credentials {
Contributor

move to refactoring PR?

Contributor Author

done, reverted

* GetWorkBudget}.
*/
public GetWorkBudget subtract(long items, long bytes) {
return GetWorkBudget.builder().setBytes(bytes() - bytes).setItems(items() - items).build();
Contributor

do we want to go negative?

Contributor Author

good call, if it drops below zero we can clamp it to zero.
done.
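
A minimal sketch of a subtract that floors at zero. ClampedBudgetSketch is a hypothetical plain class used only for illustration, not the AutoValue GetWorkBudget from this PR.

```java
/** Hypothetical stand-in for GetWorkBudget with a subtract that never goes negative. */
final class ClampedBudgetSketch {
  final long items;
  final long bytes;

  ClampedBudgetSketch(long items, long bytes) {
    this.items = items;
    this.bytes = bytes;
  }

  /** Subtracts the given amounts, flooring each dimension at zero independently. */
  ClampedBudgetSketch subtract(long itemsDelta, long bytesDelta) {
    return new ClampedBudgetSketch(
        Math.max(0, items - itemsDelta), Math.max(0, bytes - bytesDelta));
  }

  public static void main(String[] args) {
    ClampedBudgetSketch remaining = new ClampedBudgetSketch(5, 10).subtract(7, 3);
    System.out.println(remaining.items + " items, " + remaining.bytes + " bytes");
  }
}
```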

@@ -67,12 +70,33 @@ public abstract class WindmillServerStub implements StatusDataProvider {
public abstract GetWorkStream getWorkStream(
Windmill.GetWorkRequest request, WorkItemReceiver receiver);

public GetWorkStream getWorkStream(
CloudWindmillServiceV1Alpha1Stub stub,
Contributor

these don't make sense to me on this interface.
IsReady/setWindmillServiceEndpoints etc don't have effect if we are passing in the stub.

It seems like instead perhaps the WindmillStreamSender could just create streams itself instead of going through this class?

Contributor Author

great idea, done!

import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub;

@AutoValue
public abstract class WindmillConnection {
Contributor

Do we need this class? It seems like we can just have a map from Endpoint to streams based upon the endpoint with a shared stub without needing to keep this object around.

Contributor Author

we will need it if we want to use the backend worker tokens approach.


m-trieu commented Oct 10, 2023

still working on changes will ping when done @scwhittle
Changes are:

  • Also adding a unit test for StreamingEngineClient.

  • Instead of having raw threads in StreamingEngineClient I am opting for using 3 ExecutorService(s) that are single threaded to handle the GetWorkerMetadata and budget refreshes

  1. ExecutorService for starting the GetWorkerMetadataStream
  2. ScheduledExecutorService for recurring budget refreshes
  3. ExecutorService that has a runnable listening to a SynchronousQueue, for triggered budget refreshes.
  • triggered when work is completed
  • triggered when WorkerMetadata is consumed

Thank you!

@m-trieu m-trieu force-pushed the mtrieu-process-work branch from 759b9f3 to 8ef675c Compare October 16, 2023 23:54

m-trieu commented Oct 16, 2023

ready for another look thanks ! @scwhittle

@m-trieu m-trieu force-pushed the mtrieu-process-work branch 2 times, most recently from 082d14f to c38eeb5 Compare October 18, 2023 22:01
scwhittle left a comment

just some initial comments as I started to go through. only through a couple files so far but figure you might be able to parallelize some with me reviewing more.

abstract class StreamEngineConnectionState {
static final StreamEngineConnectionState EMPTY =
builder()
.setWindmillConnections(ImmutableMap.of())
Contributor

would it make sense for empty maps to be the default for all of these fields? could modify builder() to set them or perhaps a default annotation would work?

Contributor Author

done, set to default empty immutable maps in the builder() method

private final SynchronousQueue<Boolean> budgetRefreshTrigger;

/** Redistributes the budget on a timed cadence. */
private final ScheduledExecutorService scheduledBudgetRefreshExecutor;
Contributor

I'm wondering if we can get rid of the schedule. It seems like budget distribution can be just based upon events:

  • getting new set of endpoints
  • budget returned

Benefits without a scheduled component is that we don't have possible latency waiting for the scheduling to occur. If there are things that we do want to evaluate periodically (for example if we want to reclaim budget we think is misplaced) perhaps we could do that as a followup.

Contributor Author

sgtm, is it ok that this deviates from current behavior? based on what i saw SF uses this scheduling mechanic.

Contributor Author

what triggers a budget return?
after the user worker commits a work item?

Contributor

It does appear there is a backup time-based trigger in the existing internal implementation to recover from cases where budget is lost due to rounding etc. However, it is implemented via a single thread waiting for an explicit trigger with a timeout, instead of separate threads that may concurrently try to refresh the budget.

So perhaps we should keep the timeout too for the same reason but let's also move to the single thread doing refreshing to simplify (as previous comment below was also suggesting).

this.getWorkerMetadataReady = new CountDownLatch(1);
}

public static StreamingEngineClient create(
Contributor

I think some comments would be good here

Contributor Author

added some comments about the initial state and what the method does besides create an instance (i.e starts GetWorkerMetadataStream)

}

public static StreamingEngineClient create(
AtomicBoolean running,
Contributor

this seems like an odd parameter, can it just be an isRunning() method on this class?

Contributor Author

sgtm, i think we would not even need it since the harness class (StreamingDataflowWorker), can just not initialize this until it is in a running state.

Can make all of these executors use daemon threads then which will get cleaned up and prevent mem leaks.

StreamingEngineStreamFactory streamingEngineStreamFactory,
WorkItemReceiver workItemReceiver,
WindmillGrpcStubFactory windmillGrpcStubFactory,
GetWorkRequest getWorkRequest,
Contributor

this seems like it overlaps with totalGetWorkBudget, can one of them be removed? could remove budget or change this to JobHeader?

Contributor Author

done.

})
private synchronized void startGetWorkerMetadataStream() {
// We only want to set and start this value once.
getWorkerMetadataStream.compareAndSet(
Contributor

this is just called once during create methods, I think you could remove the atomic and instead just assert it is null before setting it.

Contributor Author

done.


private void requestBudgetRefresh() {
try {
budgetRefreshTrigger.put(true);
Contributor

this is going to cause this thread to block until the refresh thread takes it.

another option would be to use some sort of lock where you set value to true, and the other thread is waiting until condition is met, where then it clears value, releases lock, processes budget and then goes back to waiting. That makes the requestBudgetRefresh entirely non-blocking which if we're going to call from grpc threads etc is likely a property we want

com/google/common/util/concurrent/Monitor.java is perhaps one way to do that, though there may be better options.

It might be nice to structure so that refreshing is always done by a single background thread which either waits for triggers or if we need periodic evaluation can timeout and reevaluate as well. Having a single modifier will make it easier to think through the possible races.
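
One way to picture the flag-plus-condition idea described above is the following sketch. It uses java.util.concurrent.locks rather than Guava's Monitor, and RefreshSignalSketch with its method names is hypothetical. The requesting thread holds the lock only long enough to set a flag, so it never waits for a refresh to finish, and repeated requests collapse into one pending signal.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

final class RefreshSignalSketch {
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition requested = lock.newCondition();
  private boolean pending = false;

  /** Callable from any thread. Multiple calls before the next await collapse into one. */
  void requestRefresh() {
    lock.lock();
    try {
      pending = true;
      requested.signal();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Called by the single refresher thread. Returns true if a refresh was requested,
   * false if the timeout elapsed (or the thread was interrupted) without a request.
   */
  boolean awaitRequest(long timeout, TimeUnit unit) {
    long nanosLeft = unit.toNanos(timeout);
    lock.lock();
    try {
      while (!pending) {
        if (nanosLeft <= 0) {
          return false;
        }
        nanosLeft = requested.awaitNanos(nanosLeft);
      }
      pending = false; // clear the flag before the lock is released
      return true;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      lock.unlock();
    }
  }

  public static void main(String[] args) {
    RefreshSignalSketch signal = new RefreshSignalSketch();
    signal.requestRefresh();
    signal.requestRefresh();
    System.out.println(signal.awaitRequest(10, TimeUnit.MILLISECONDS));
    System.out.println(signal.awaitRequest(10, TimeUnit.MILLISECONDS));
  }
}
```

The refresher would run the budget redistribution after awaitRequest returns, outside the lock, whether it returned because of a request or because of the timeout.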

Contributor Author

updated this

@m-trieu m-trieu force-pushed the mtrieu-process-work branch from c38eeb5 to 4269201 Compare October 24, 2023 04:55

public static WindmillStreamSender create(
CloudWindmillServiceV1Alpha1Stub stub,
GetWorkRequest getWorkRequest,
Contributor

similarly this request overlaps with budget and unclear what is used.
remove the budget and start with request values or perhaps JobHeader can be used.

public void closeAllStreams() {
// Supplier<Stream>.get() starts the stream which is an expensive operation as it initiates the
// streaming RPCs. Do not close the streams unless they have already been started.
if (started.get()) {
Contributor

this is racy if some streams are started and others not closed. An alternative to an atomic would be to have an object you synchronize on here and in startStreams so close blocks if start is still ongoing.

Or you could document that start/close are not thread-safe and should be sequenced
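
A minimal sketch of the first option, with start and close synchronized on a shared lock object. StreamLifecycleSketch and its nested interfaces are hypothetical stand-ins, not WindmillStreamSender itself, and refusing to start after close is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

final class StreamLifecycleSketch {
  /** Hypothetical stand-in for a started gRPC stream. */
  interface Stream {
    void close();
  }

  /** Hypothetical factory that starts a stream when invoked. */
  interface StreamStarter {
    Stream start();
  }

  private final Object lock = new Object();
  private final List<StreamStarter> starters;
  private final List<Stream> started = new ArrayList<>();
  private boolean closed = false;

  StreamLifecycleSketch(List<StreamStarter> starters) {
    this.starters = starters;
  }

  /** Starts every stream at most once, and never after closeAllStreams(). */
  void startStreams() {
    synchronized (lock) {
      if (closed || !started.isEmpty()) {
        return;
      }
      for (StreamStarter starter : starters) {
        started.add(starter.start());
      }
    }
  }

  /** Blocks while a start is in progress, then closes exactly the streams that were started. */
  void closeAllStreams() {
    synchronized (lock) {
      closed = true;
      for (Stream stream : started) {
        stream.close();
      }
      started.clear();
    }
  }

  public static void main(String[] args) {
    List<StreamStarter> starters = new ArrayList<>();
    starters.add(() -> () -> System.out.println("stream closed"));
    StreamLifecycleSketch sender = new StreamLifecycleSketch(starters);
    sender.startStreams();
    sender.closeAllStreams();
  }
}
```

Because both methods take the same lock, close can never observe a half-started set of streams.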

}

public synchronized void adjustBudget(long itemsDelta, long bytesDelta) {
getWorkBudget.set(getWorkBudget.get().add(itemsDelta, bytesDelta));
Contributor

this will create the stream if it was not started

Collection<Windmill.LatencyAttribution> getWorkStreamLatencies);

/** Adjusts the {@link GetWorkBudget} for the stream. */
default void adjustBudget(long itemsDelta, long bytesDelta) {}
Contributor

these don't seem like great defaults to have, can we just keep it abstract to make sure it is implemented? and put no-op in test impls if desired?

Contributor Author

done

@m-trieu m-trieu force-pushed the mtrieu-process-work branch 2 times, most recently from 13da1fc to b8046c6 Compare October 25, 2023 01:10
TimeUnit.MILLISECONDS);
}

private void refreshBudget() {
Contributor Author (@m-trieu, Oct 25, 2023)

how about this @scwhittle
Changed to 1 thread that gets scheduled at a fixed interval with an initial delay

This will poll a BlockingDeque for a triggered budget refresh without blocking; if there was a trigger that happened after the most recent refresh, it will redistribute the budget.

Else if enough time has passed and budget has not been distributed, it will redistribute the budget (this is kind of like the scheduling mechanism)

if lgty, i will go forward and modify the tests :)

Contributor (@scwhittle, Oct 25, 2023)

It seems like if we wanted to trigger budget refresh this would wait to notice until the next poll? Can we instead just have a thread hanging on consuming from the queue with a timeout? Then it would react immediately to triggering or at most redistribute every timeout period.

This still has the issue though that it will cause things adding to the queue to block if the background thread is busy redistributing budget. It would be nice if triggering was non-blocking and that multiple signals to trigger were collapsed. This is what I was wondering if you could do with the Monitor class unless there was some other suitable class. You might be able to use the AdvancingPhaser Beam class. To trigger, call arrive(); the monitoring loop would be something like:

int phase = 0;
while (true) {
  try {
    phase = phaser.awaitAdvanceInterruptibly(phase, 100, TimeUnit.MILLISECONDS);
  } catch (TimeoutException e) {
    // timed out without a trigger; phase is unchanged
  }
  if (phase < 0) break; // phaser shutdown
  // either timed out or was triggered, run rebudgeting
}

Contributor Author

changed to use phaser.
instead of using a while(true), opted to use the ScheduledExecutorService to run each iteration consecutively, one after the other, with no delay between successive tasks.

we wait for the scheduled interval or phaser.arrive and then redistribute the budget.

thanks for the rec to use Phaser!
@scwhittle

Contributor

What is the benefit of the fixedRate versus a loop if it's running continuously? I think we still want some way to exit early on shutdown so we don't have to wait for the SCHEDULED_BUDGET_REFRESH for the method to finish.

Contributor Author

my thought was that if there was an issue/error, the task could just terminate and the executor could schedule a new one.

however if this is not a concern we can just use the loop.

@m-trieu m-trieu force-pushed the mtrieu-process-work branch 2 times, most recently from a6c32bd to 78403b7 Compare October 26, 2023 07:03

package org.apache.beam.runners.dataflow.worker.windmill.util;

import com.google.auto.value.AutoValue;
import org.apache.beam.runners.dataflow.worker.windmill.grpcclient.ThrottleTimer;
Contributor

seems odd that ThrottleTimer is in grpc package but the collection of them is in a different package.

Contributor Author

i will move them around as part of moving a lot of this to a new PR

i will isolate this one to just adding the StreamingEngineClient

public void distributeBudget(
ImmutableCollection<WindmillStreamSender> streams, GetWorkBudget getWorkBudget) {
if (streams.isEmpty()) {
LOG.warn("Cannot distribute budget to no streams.");
Contributor

can we change this to periodically log? if not maybe better just as debug

Contributor Author

made debug log

}

if (getWorkBudget.equals(GetWorkBudget.noBudget())) {
LOG.warn("Cannot distribute 0 budget.");
Contributor

ditto, this one in particular could happen more regularly if we have enough work already on this worker.

Contributor Author

made debug log

return;
}

ImmutableMap<WindmillStreamSender, GetWorkBudget> desiredBudgets =
Contributor

nit: just use Map interface here?

Contributor Author

done

import org.joda.time.Instant;

/**
* Receives and processes {@link
Contributor

put on the method?

Contributor

ping

0, SCHEDULED_BUDGET_REFRESH_MILLIS, TimeUnit.MILLISECONDS);
} catch (InterruptedException | TimeoutException e) {
// If there is an error, we will proceed with refreshing the budget.
LOG.error("Error occurred waiting for budget refresh.", e);
Contributor

timeout should not be considered an error, we don't necessarily expect to be triggered more often than timeout.

Contributor Author

done

private int waitForBudgetRefreshTrigger() {
try {
return budgetRefreshTrigger.awaitAdvanceInterruptibly(
0, SCHEDULED_BUDGET_REFRESH_MILLIS, TimeUnit.MILLISECONDS);
Contributor

you need to remember what the last phase was not just use 0, so that this will only signal if the phase increases (via arrive() triggering) from the last waiting.
This could just be kept on the stack if you change to a while loop.

Or you could create a helper class to make this easier to use and keep the phase internal to itself. If you do, make it a separate file with tests.

class Trigger {
  void trigger();

  class Observer {
    private int phase = 0;

    // Will return if observed Trigger has trigger() called or deadline reached.
    // Guaranteed to be triggered if any trigger() call since last await returned.
    // Multiple triggers may be collapsed into single observation. Observers are independent from each other.
    boolean awaitWithDeadline(deadline) {
    }
  }

  Observer createObserver();
}
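
For illustration, the outline above could be filled in roughly as follows. This is only a sketch under stated assumptions: it uses java.util.concurrent.Phaser directly rather than Beam's AdvancingPhaser, takes a timeout instead of a deadline, and the names TriggerSketch and await are hypothetical.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TriggerSketch {
  // A single registered party, so every arrive() advances the phase without blocking.
  private final Phaser phaser = new Phaser(1);

  /** Non-blocking. Any number of calls between two awaits is observed as one trigger. */
  void trigger() {
    phaser.arrive();
  }

  /** Each observer remembers the last phase it saw, independently of other observers. */
  Observer createObserver() {
    return new Observer(phaser.getPhase());
  }

  final class Observer {
    private int phase;

    private Observer(int phase) {
      this.phase = phase;
    }

    /** Returns true if trigger() was called since the last await, false on timeout or interrupt. */
    boolean await(long timeout, TimeUnit unit) {
      try {
        // Returns immediately if the phase has already advanced past the remembered one.
        phase = phaser.awaitAdvanceInterruptibly(phase, timeout, unit);
        return phase >= 0; // a negative phase means the phaser was terminated
      } catch (TimeoutException e) {
        return false;
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
  }

  public static void main(String[] args) {
    TriggerSketch trigger = new TriggerSketch();
    TriggerSketch.Observer observer = trigger.createObserver();
    trigger.trigger();
    trigger.trigger();
    System.out.println(observer.await(50, TimeUnit.MILLISECONDS));
    System.out.println(observer.await(50, TimeUnit.MILLISECONDS));
  }
}
```

Keeping the phase inside the observer addresses the point about not passing 0 on every wait: each await only fires for triggers that happened after the previous await returned.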


@m-trieu m-trieu force-pushed the mtrieu-process-work branch from 78403b7 to 33ae0c1 Compare October 26, 2023 19:11
@m-trieu
Contributor Author

m-trieu commented Oct 26, 2023

@scwhittle added a lot of the refactoring changes to #29156

I will continue to work on this one while the other one is getting reviewed (addressing other comments and writing tests)

once the other one is merged, i will rebase this one to minimize the changes

@scwhittle
Contributor

Thanks for separating a lot of the refactoring/moving! Let me know when this is rebased and ready for review

@m-trieu m-trieu force-pushed the mtrieu-process-work branch from 33ae0c1 to 4f73391 Compare November 3, 2023 03:08
Contributor

@scwhittle scwhittle left a comment

sorry didn't get a chance to finish everything in detail but here are some initial comments. looks like some of the builds are broken too

@@ -35,7 +35,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-class GetWorkTimingInfosTracker {
+public class GetWorkTimingInfosTracker {
Contributor

I saw that we have the @Internal annotation on other runner classes that are public, to prevent the expectation that they will not be modified across Beam versions. Can you add that here and to the other public classes?

Contributor Author

adding

Contributor Author

added to other public classes, made this one package private

import org.joda.time.Instant;

/**
* Receives and processes {@link
Contributor

ping

@m-trieu m-trieu force-pushed the mtrieu-process-work branch from 4f73391 to 17c53b9 Compare November 7, 2023 07:37
@m-trieu m-trieu force-pushed the mtrieu-process-work branch from de5406d to b86569c Compare November 17, 2023 07:11
Contributor

@scwhittle scwhittle left a comment

didn't finish, will look more next week

});

synchronized (this) {
// Just sent the request extension, reset the nextBudgetAdjustment. This will be set when
Contributor

I think it would be better to take the budget and clear it in the same synchronized block. Otherwise it seems possibly racy, and this could lose budget.

Contributor Author

done


GetWorkBudget newNextBudgetAdjustment = newNextBudgetAdjustmentBuilder.build();

inflightMessages.set(newNextBudgetAdjustment.items());
Contributor

Use an AtomicReference to the budget, since budgets are immutable?

Contributor Author

done

// The entire WorkItem has been received, it is ready to be processed.
if (chunk.getRemainingBytesForWorkItem() == 0) {
workItemBuffer.runAndReset();
// Record the fact that there are now fewer outstanding messages and bytes on the stream.
Contributor

if using AtomicReference, could use getAndUpdate
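
As a rough sketch of the AtomicReference suggestions in this thread: with an immutable budget value, updates can be expressed as atomic swaps instead of separate synchronized steps. The `Budget` and `BudgetHolder` classes below are hypothetical stand-ins written for this example (they are not the PR's GetWorkBudget or stream classes); only the `AtomicReference` calls are real JDK API.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical immutable budget value, standing in for the PR's GetWorkBudget. */
final class Budget {
  static final Budget NONE = new Budget(0, 0);
  final long items;
  final long bytes;

  Budget(long items, long bytes) {
    this.items = items;
    this.bytes = bytes;
  }

  Budget add(Budget other) {
    return new Budget(items + other.items, bytes + other.bytes);
  }

  /** Subtracts, clamping at zero so the budget never goes negative. */
  Budget subtract(Budget other) {
    return new Budget(Math.max(0, items - other.items), Math.max(0, bytes - other.bytes));
  }
}

/** Hypothetical holder showing lock-free updates of an immutable budget. */
final class BudgetHolder {
  private final AtomicReference<Budget> pending = new AtomicReference<>(Budget.NONE);

  void add(Budget extra) {
    pending.getAndUpdate(current -> current.add(extra));
  }

  /** Records consumed budget and returns the value seen just before the update. */
  Budget consume(Budget used) {
    return pending.getAndUpdate(current -> current.subtract(used));
  }

  /** Takes the whole budget and clears it in one atomic step, so no concurrent add is lost. */
  Budget takeAndClear() {
    return pending.getAndSet(Budget.NONE);
  }

  Budget current() {
    return pending.get();
  }
}
```

Because `getAndSet` and `getAndUpdate` each read and write in one atomic operation, there is no window between "take" and "clear" in which another thread's adjustment could be dropped.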

Contributor Author

done

private final GetWorkTimingInfosTracker workTimingInfosTracker;
private String computation;
private ByteString data;
private long bufferedSize;
Contributor

just use data.size()?

Contributor Author

done

* Reference to {@link GetWorkerMetadataStream} that is lazily initialized via double check
* locking, with its initial value being null.
*/
private volatile @Nullable GetWorkerMetadataStream getWorkerMetadataStream;
Contributor

could you make this final and remove locking by having the create method create it and then pass it into the constructor? Actually looks like you would have to change how the stream is started then since you'd want to start it only after this client is constructed.

Another idea, what about using Suppliers.memoize() instead of doing the double-check yourself. It's more obviously correct.
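
To make the memoization idea concrete, here is a small stdlib-only sketch of a memoizing supplier. It is a hand-written stand-in, written so the example is self-contained; in the actual code one would presumably use Guava's `Suppliers.memoize()` (or Beam's vendored copy of it) rather than this hypothetical `MemoizingSupplier` class.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in illustrating what a memoizing supplier does. */
final class MemoizingSupplier<T> implements Supplier<T> {
  private final Supplier<T> delegate;
  // The volatile flag is written after 'value', so readers that see true also see the value.
  private volatile boolean initialized;
  private T value;

  MemoizingSupplier(Supplier<T> delegate) {
    this.delegate = delegate;
  }

  @Override
  public T get() {
    if (!initialized) {
      synchronized (this) {
        if (!initialized) {
          value = delegate.get(); // Runs at most once, even with concurrent callers.
          initialized = true;
        }
      }
    }
    return value;
  }
}
```

The field holding the stream can then be a `final Supplier`, and the double-checked logic lives in one well-tested place instead of in the client class.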

Contributor Author

done

GetWorkBudget remaining = stream.remainingGetWorkBudget();
if (isBelowFiftyPercentOfTarget(remaining, desired)) {
GetWorkBudget adjustment = desired.subtract(remaining);
LOG.info("Adjusting budget for stream={} by {}", stream, adjustment);
Contributor

this will be too spammy

Contributor Author

removed

divide(
totalGetWorkBudget.items() - activeWorkBudget.items(),
streams.size(),
RoundingMode.CEILING))
Contributor

Rounding up here will make the total drift upwards over the lifetime of the stream if the budget is otherwise a closed loop.
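
A quick arithmetic sketch of the concern, with made-up numbers: splitting 10 items across 3 streams with ceiling rounding hands out 4 + 4 + 4 = 12, which is more than the total. One possible alternative (an assumption for illustration, not what the PR does) is to give the remainder to the first few streams so the shares always sum to the total. `EvenSplit` and its methods are hypothetical names.

```java
/** Hypothetical helper contrasting ceiling division with an exact even split. */
final class EvenSplit {
  /** Ceiling division for a non-negative total and a positive number of parts. */
  static long ceilDiv(long total, long parts) {
    return (total + parts - 1) / parts;
  }

  /** Splits total into shares that differ by at most one and sum exactly to total. */
  static long[] splitExactly(long total, int parts) {
    long base = total / parts;
    long remainder = total % parts;
    long[] shares = new long[parts];
    for (int i = 0; i < parts; i++) {
      // The first 'remainder' shares absorb the leftover units.
      shares[i] = base + (i < remainder ? 1 : 0);
    }
    return shares;
  }
}
```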

Contributor Author

added a comment and TODO to fix the non-determinism

@m-trieu
Contributor Author

m-trieu commented Nov 21, 2023

back to you @scwhittle !

@m-trieu m-trieu force-pushed the mtrieu-process-work branch from 6edec07 to 3278ab0 Compare November 21, 2023 19:28
import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints.Endpoint;

@AutoValue
public abstract class WindmillConnection {
Contributor

Add the @Internal annotation here and to all other public classes in the files this PR covers.

Contributor Author

Done

endpoint,
() ->
streamFactory.createGetDataStream(
WindmillConnection.from(endpoint, this::createWindmillStub).stub(),
Contributor

If we already have a connection for this endpoint due to windmill endpoints, use that stub.

Contributor Author

done.


abstract ImmutableMap<WindmillConnection, WindmillStreamSender> windmillStreams();

abstract ImmutableMap<String, Endpoint> globalDataEndpoints();
Contributor

Should we combine globalDataEndpoints and globalDataStreams and have a
map from String -> Supplier?

Please also add a comment on what the string is.

Contributor Author

done

try {
getWorkerMetadataReady.await();
} catch (InterruptedException e) {
throw new StreamingEngineClientException(
Contributor

Thread.currentThread().interrupt();

https://www.baeldung.com/java-interrupted-exception
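
A minimal sketch of the pattern being suggested: when InterruptedException is caught and converted into another exception, restore the thread's interrupt flag first so callers further up can still see the interruption. `MetadataWaiter` is a hypothetical class for this example, and `IllegalStateException` stands in for the PR's StreamingEngineClientException.

```java
import java.util.concurrent.CountDownLatch;

/** Hypothetical waiter showing how to preserve the interrupt flag when rethrowing. */
final class MetadataWaiter {
  private final CountDownLatch ready = new CountDownLatch(1);

  void markReady() {
    ready.countDown();
  }

  void awaitReady() {
    try {
      // Returns immediately if markReady() was already called.
      ready.await();
    } catch (InterruptedException e) {
      // await() cleared the flag when it threw; set it again before wrapping the exception.
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for worker metadata.", e);
    }
  }
}
```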

Contributor Author

noted
removed await so we don't need this.

* {@link GetWorkerMetadataStream}.
*/
public void waitForFirstWorkerMetadata() {
// Do nothing if we have already initialized the initial streams.
Contributor

How about removing this? await() below returns immediately if already ready, and there are fewer paths to worry about.

Contributor Author

done, everything can happen asynchronously then
exposed a method for testing to see if worker metadata has been populated

}

/** Publishes an event to trigger a budget refresh. */
public synchronized void requestBudgetRefresh() {
Contributor

rm synchronized

Contributor Author

done

}

/**
* Waits for a budget refresh trigger event with a timeout. Returns whether budgets should still
Contributor

Update comment about return value.

Contributor Author

done.

return budgetRefreshTrigger.awaitAdvanceInterruptibly(
currentBudgetRefreshPhase, SCHEDULED_BUDGET_REFRESH_MILLIS, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
LOG.info("Budget refresh not triggered, proceeding with scheduled refresh.");
Contributor

this will be spammy

Contributor Author

removed

getWorkBudgetDistributor.distributeBudget(
currentConnectionsState.windmillStreams().values(), totalGetWorkBudget);
lastBudgetRefresh.compareAndSet(Instant.EPOCH, Instant.now());
windmillStreamSenders.forEach(WindmillStreamSender::startStreams);
Contributor

where do streams get started on non-first metadata?

Contributor Author

added starting the stream when it is created in createAndStartWindmillStreamSenderFor(WindmillConnection connection).

LOG.info("Starting initial GetWorkStreams with connections={}", currentConnectionsState);
ImmutableCollection<WindmillStreamSender> windmillStreamSenders =
currentConnectionsState.windmillStreams().values();
getWorkBudgetDistributor.distributeBudget(
Contributor

Maybe it would be cleaner to allow starting the stream without budget and then just trigger getworkbudgetrefresher to distribute and update refresh.

Contributor Author

done

@m-trieu
Contributor Author

m-trieu commented Nov 22, 2023

thanks!
back to you @scwhittle !

@m-trieu m-trieu force-pushed the mtrieu-process-work branch from 7dc8216 to 46b617b Compare November 28, 2023 00:40
Contributor

@scwhittle scwhittle left a comment

Just a few more things but mostly LGTM!

@@ -480,6 +366,10 @@ private StreamingEngineConnectionState waitForWorkerMetadataToBeConsumed(
return connections.get();
}

private void waitForFirstWorkerMetadata() {
while (!Preconditions.checkNotNull(streamingEngineClient).isWorkerMetadataReady()) {}
Contributor

This should have some sleep so it's not busy spinning cpu now that it just polls without waiting.

But see above, perhaps this can just be removed?
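
If the polling helper were kept, a version that sleeps between checks might look roughly like this. It is only a sketch: `Polling.waitUntil`, the poll interval, and the timeout are hypothetical and not part of the PR, which ended up removing the wait entirely.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Hypothetical test helper that polls a condition without busy-spinning. */
final class Polling {
  /** Returns true once the condition holds, or false on timeout or interruption. */
  static boolean waitUntil(BooleanSupplier condition, long pollMillis, long timeoutMillis) {
    long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    while (!condition.getAsBoolean()) {
      if (System.nanoTime() - deadlineNanos >= 0) {
        return false; // Gave up: the condition did not become true in time.
      }
      try {
        Thread.sleep(pollMillis); // Yield the CPU between checks.
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // Preserve the interrupt for the caller.
        return false;
      }
    }
    return true;
  }
}
```

The timeout also keeps a test from hanging forever if the condition never becomes true.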

Contributor Author

Removed

@@ -196,8 +196,7 @@ public void testStreamsStartCorrectly() throws InterruptedException {
String workerToken = "workerToken1";
String workerToken2 = "workerToken2";

-Thread streamingEngineClientThread =
-new Thread(streamingEngineClient::waitForFirstWorkerMetadata);
+Thread streamingEngineClientThread = new Thread(this::waitForFirstWorkerMetadata);
Contributor

Can this thread be removed? You aren't joining it, and now the wait itself doesn't trigger anything, so it seems unnecessary.

Ditto for the others; perhaps waitForFirstWorkerMetadata can just be removed by relying on getWorkerMetadataReady instead.

Contributor Author

removed

@scwhittle
Contributor

@m-trieu Looks like there is an unvendored jar pulled in, judging from the failing dependency analysis test.

Contributor

@scwhittle scwhittle left a comment

Looks good other than use of unvendored guava somewhere causing failing dep test

@m-trieu m-trieu force-pushed the mtrieu-process-work branch from 5cfded3 to f7ea302 Compare November 29, 2023 02:23
@m-trieu
Contributor Author

m-trieu commented Nov 29, 2023

changed the imports to the vendored/java versions!
ready to merge @scwhittle

thank you!

@m-trieu
Contributor Author

m-trieu commented Nov 29, 2023

Run Dataflow Streaming ValidatesRunner

@m-trieu
Contributor Author

m-trieu commented Nov 29, 2023

rerunning the runner but seems to be an unrelated failure due to timeouts @scwhittle

@m-trieu
Contributor Author

m-trieu commented Nov 29, 2023

Run Java PreCommit

@m-trieu m-trieu force-pushed the mtrieu-process-work branch from 203b26e to 16253d0 Compare November 30, 2023 05:23
@m-trieu
Contributor Author

m-trieu commented Nov 30, 2023

rebased to master and rerunning precommit, it was previously timing out and not sure why.

@m-trieu
Contributor Author

m-trieu commented Nov 30, 2023

rebase to master did it!

@scwhittle
Contributor

Thanks!

@scwhittle scwhittle merged commit 93fb204 into apache:master Nov 30, 2023
18 checks passed
@Abacn
Contributor

Abacn commented Dec 12, 2023

Hi, it appears a unit test added in this PR is flaky (it could get stuck indefinitely), see #28957 (comment). @m-trieu would you mind taking a look? CC: @scwhittle
