Integrate direct path with StreamingDataflowWorker code path #32778

Merged
scwhittle merged 6 commits into apache:master from mt-fix-dp-experiment on Nov 26, 2024

Conversation

Contributor

m-trieu commented Oct 14, 2024

Add direct path code branch

R: @scwhittle @arunpandianp


Contributor

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @johnjcasey added as fallback since no labels match configuration

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

m-trieu force-pushed the mt-fix-dp-experiment branch from 45fdb16 to da13655 on October 22, 2024 22:53
Contributor

Reminder, please take a look at this pr: @johnjcasey

Contributor

github-actions bot commented Nov 4, 2024

Assigning new set of reviewers because the PR has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @johnjcasey added as fallback since no labels match configuration

m-trieu force-pushed the mt-fix-dp-experiment branch from 036e3a1 to 395abf1 on November 19, 2024 08:47
: streamingStatusPages;
}

private static ChannelzServlet createChannelZServlet(
Contributor

nit: don't capitalize the Z

Contributor Author

done

@@ -118,6 +130,8 @@ public final class StreamingDataflowWorker {
*/
public static final int MAX_SINK_BYTES = 10_000_000;

public static final String STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL =
Contributor

move back where it was and make private?

Contributor Author

done

@@ -441,6 +520,8 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
WindmillServerStub windmillServer;
ComputationStateCache computationStateCache;
GrpcWindmillStreamFactory windmillStreamFactory;
ConfigFetcherComputationStateCacheAndWindmillClient.Builder builder =
Contributor

It seems inconsistent to set the dispatcher on the builder but keep separate variables for the rest.

How about removing the variables and setting them on the builder? It seems you could remove some nesting and return the build() result early in some cases, such as SE.

Contributor Author

done
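
As an illustration of the suggestion above, here is a minimal sketch of setting shared values directly on a builder and returning `build()` early from one branch. All names in it (`ClientBundle`, `BundleFactory`, the string placeholders) are hypothetical stand-ins, not the Beam classes touched by this PR, and the real `fromOptions` logic is more involved.

```java
import java.util.Objects;

/** Hypothetical value class; stands in for the builder-backed type in the diff. */
final class ClientBundle {
  final String windmillServer;
  final String computationStateCache;
  final String streamFactory;

  private ClientBundle(Builder builder) {
    // Fail fast if a branch forgot to set a required field.
    this.windmillServer = Objects.requireNonNull(builder.windmillServer, "windmillServer");
    this.computationStateCache =
        Objects.requireNonNull(builder.computationStateCache, "computationStateCache");
    this.streamFactory = Objects.requireNonNull(builder.streamFactory, "streamFactory");
  }

  static Builder builder() {
    return new Builder();
  }

  static final class Builder {
    private String windmillServer;
    private String computationStateCache;
    private String streamFactory;

    Builder setWindmillServer(String value) {
      this.windmillServer = value;
      return this;
    }

    Builder setComputationStateCache(String value) {
      this.computationStateCache = value;
      return this;
    }

    Builder setStreamFactory(String value) {
      this.streamFactory = value;
      return this;
    }

    ClientBundle build() {
      return new ClientBundle(this);
    }
  }
}

/** Hypothetical factory showing the control flow only. */
final class BundleFactory {
  static ClientBundle create(boolean streamingEngine) {
    // Values shared by both branches go straight onto the builder,
    // so no separate local variables are needed.
    ClientBundle.Builder builder =
        ClientBundle.builder()
            .setComputationStateCache("cache")
            .setStreamFactory("grpc-stream-factory");

    if (streamingEngine) {
      // Early return: the code after this block needs no else branch.
      return builder.setWindmillServer("grpc-windmill-server").build();
    }
    return builder.setWindmillServer("appliance-windmill-server").build();
  }

  public static void main(String[] args) {
    System.out.println(create(true).windmillServer);
    System.out.println(create(false).windmillServer);
  }
}
```

Because the streaming-engine branch always returns, the remaining path sits at the top nesting level, which is also the point of the later "remove else" comment in this review.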

}
LOG.warn(
"DirectPath is currently only supported with IPv6 networking stack. Defaulting to"
+ " CloudPath.");
Contributor

How about mentioning the service option to use, or linking to docs if there are some?

Contributor Author

done
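
For readers following the fallback being discussed, the sketch below shows one way such a decision could be structured. It is an assumption-laden illustration: `ConnectivityType`, `ConnectivitySelector`, and the boolean `ipv6Available` input are hypothetical, and how the worker actually detects IPv6 support is not shown in this thread. Only the warning text is taken from the diff.

```java
import java.util.logging.Logger;

/** Hypothetical enum naming the two connectivity modes mentioned in the log message. */
enum ConnectivityType {
  DIRECTPATH,
  CLOUDPATH
}

/** Hypothetical selector; the real worker may detect IPv6 support differently. */
final class ConnectivitySelector {
  private static final Logger LOG = Logger.getLogger(ConnectivitySelector.class.getName());

  static ConnectivityType select(boolean directPathRequested, boolean ipv6Available) {
    if (!directPathRequested) {
      return ConnectivityType.CLOUDPATH;
    }
    if (ipv6Available) {
      return ConnectivityType.DIRECTPATH;
    }
    // Requested but unsupported: warn and fall back instead of failing the worker.
    LOG.warning(
        "DirectPath is currently only supported with IPv6 networking stack. Defaulting to"
            + " CloudPath.");
    return ConnectivityType.CLOUDPATH;
  }

  public static void main(String[] args) {
    System.out.println(select(true, true));
    System.out.println(select(true, false));
    System.out.println(select(false, true));
  }
}
```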

StreamingDataflowWorker.class.getSimpleName());
}

private static ChannelCachingStubFactory createStubFactory(
Contributor

name createFanoutStubFactory since just used there and we don't want unconditional isolation (yet) in other case?

Contributor Author

done

(threadName) -> Executors.newSingleThreadScheduledExecutor(),
DEFAULT_HARNESS_REPORTING_PERIOD,
DEFAULT_PER_WORKER_METRICS_PERIOD);
StreamingWorkerStatusReporter.builder()
Contributor

can you make a testReporterBuilder() in this test that sets all these defaults?

Contributor Author

done
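
The `testReporterBuilder()` idea requested above could look roughly like the following. This is a sketch under stated assumptions: `Reporter` is a hypothetical stand-in for the class under test, and the period values are placeholders, since the real defaults are not shown in this thread.

```java
import java.time.Duration;

/** Hypothetical reporter with a builder; stands in for the class under test. */
final class Reporter {
  final boolean publishCounters;
  final Duration harnessReportingPeriod;
  final Duration perWorkerMetricsPeriod;

  private Reporter(Builder builder) {
    this.publishCounters = builder.publishCounters;
    this.harnessReportingPeriod = builder.harnessReportingPeriod;
    this.perWorkerMetricsPeriod = builder.perWorkerMetricsPeriod;
  }

  static Builder builder() {
    return new Builder();
  }

  static final class Builder {
    private boolean publishCounters;
    private Duration harnessReportingPeriod;
    private Duration perWorkerMetricsPeriod;

    Builder setPublishCounters(boolean value) {
      this.publishCounters = value;
      return this;
    }

    Builder setHarnessReportingPeriod(Duration value) {
      this.harnessReportingPeriod = value;
      return this;
    }

    Builder setPerWorkerMetricsPeriod(Duration value) {
      this.perWorkerMetricsPeriod = value;
      return this;
    }

    Reporter build() {
      return new Reporter(this);
    }
  }
}

/** Test-side helper: one place that knows the defaults every test shares. */
final class ReporterTestSupport {
  // Placeholder values chosen for illustration only.
  static final Duration DEFAULT_HARNESS_REPORTING_PERIOD = Duration.ofSeconds(10);
  static final Duration DEFAULT_PER_WORKER_METRICS_PERIOD = Duration.ofSeconds(30);

  static Reporter.Builder testReporterBuilder() {
    return Reporter.builder()
        .setPublishCounters(true)
        .setHarnessReportingPeriod(DEFAULT_HARNESS_REPORTING_PERIOD)
        .setPerWorkerMetricsPeriod(DEFAULT_PER_WORKER_METRICS_PERIOD);
  }

  public static void main(String[] args) {
    // A test overrides only the field it cares about.
    Reporter defaults = testReporterBuilder().build();
    Reporter custom = testReporterBuilder().setPublishCounters(false).build();
    System.out.println(defaults.publishCounters + " " + custom.publishCounters);
  }
}
```

Returning the builder rather than a built object lets each test override a single setting while inheriting the rest.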

Contributor Author

m-trieu commented Nov 19, 2024

back to you @scwhittle thanks!

.setWindmillStreamFactory(windmillStreamFactory)
.setWindmillServer(
GrpcWindmillServer.create(options, windmillStreamFactory, dispatcherClient))
.build();
} else {
Contributor

can remove else to reduce nesting since above always returns

Contributor Author

done

@@ -74,7 +75,8 @@ public final class SingleSourceWorkerHarness implements StreamingWorkerHarness {
StreamingWorkScheduler streamingWorkScheduler,
Runnable waitForResources,
Function<String, Optional<ComputationState>> computationStateFetcher,
- GetWorkSender getWorkSender) {
+ GetWorkSender getWorkSender,
+ Supplier<Long> throttleTimeSupplier) {
Contributor

can we make a functional interface instead of a supplier? It's not clear that the supplier should get+reset, generally Supplier seems like it would just get.

Seems like this could then take supplier to vend but also could implement the same interface

Contributor Author

done
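
To make the reviewer's point concrete, here is a minimal sketch of a dedicated functional interface whose method name states the get-and-reset contract, which a plain `Supplier<Long>` does not. The interface name `ThrottledTimeTracker` is the one that appears in a later diff in this review and may differ from the merged code; `SimpleThrottleAccumulator` and `ThrottleDemo` are hypothetical and are not the Beam `ThrottleTimer`.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Stand-in for the interface discussed in this thread. */
@FunctionalInterface
interface ThrottledTimeTracker {
  /** Returns the throttled time accumulated so far and resets it to zero. */
  long getAndResetThrottleTime();
}

/** Illustrative accumulator; not the Beam implementation. */
final class SimpleThrottleAccumulator implements ThrottledTimeTracker {
  private final AtomicLong throttledMillis = new AtomicLong();

  void recordThrottle(long millis) {
    throttledMillis.addAndGet(millis);
  }

  @Override
  public long getAndResetThrottleTime() {
    // Read and reset in one atomic step so no recorded time is lost or counted twice.
    return throttledMillis.getAndSet(0);
  }
}

final class ThrottleDemo {
  /** The parameter type tells callers that reading also resets. */
  static long report(ThrottledTimeTracker tracker) {
    return tracker.getAndResetThrottleTime();
  }

  public static void main(String[] args) {
    SimpleThrottleAccumulator accumulator = new SimpleThrottleAccumulator();
    accumulator.recordThrottle(40);
    accumulator.recordThrottle(2);
    System.out.println(report(accumulator)); // first read drains the total
    System.out.println(report(accumulator)); // second read sees the reset value

    // A method reference also satisfies the functional interface.
    ThrottledTimeTracker viaReference = accumulator::getAndResetThrottleTime;
    System.out.println(report(viaReference));
  }
}
```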

@@ -25,4 +25,6 @@ public interface StreamingWorkerHarness {
void start();

void shutdown();

+ long getAndResetThrottleTime();
Contributor

ditto could make StreamingWorkerHarness implement the ThrottlingTimerInterface (or whatever you name it).

Contributor Author

done
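
A rough sketch of the "ditto" suggestion: the harness interface extends the throttle-tracking interface, so a harness can both accept a tracker and be passed wherever one is expected. The simplified `StreamingWorkerHarness` below keeps only the methods visible in the diff; `DelegatingHarness` and `HarnessDemo` are hypothetical, and the tracker interface name may differ in the merged code.

```java
/** Stand-in for the throttle-tracking interface discussed in this review. */
@FunctionalInterface
interface ThrottledTimeTracker {
  long getAndResetThrottleTime();
}

/** Simplified harness interface; inherits getAndResetThrottleTime() instead of redeclaring it. */
interface StreamingWorkerHarness extends ThrottledTimeTracker {
  void start();

  void shutdown();
}

/** Hypothetical harness that delegates throttle tracking to a supplied tracker. */
final class DelegatingHarness implements StreamingWorkerHarness {
  private final ThrottledTimeTracker delegate;
  private boolean running;

  DelegatingHarness(ThrottledTimeTracker delegate) {
    this.delegate = delegate;
  }

  @Override
  public void start() {
    running = true;
  }

  @Override
  public void shutdown() {
    running = false;
  }

  boolean isRunning() {
    return running;
  }

  @Override
  public long getAndResetThrottleTime() {
    return delegate.getAndResetThrottleTime();
  }
}

final class HarnessDemo {
  /** Accepts any tracker, including a whole harness. */
  static long collect(ThrottledTimeTracker tracker) {
    return tracker.getAndResetThrottleTime();
  }

  public static void main(String[] args) {
    long[] pending = {150L};
    DelegatingHarness harness =
        new DelegatingHarness(
            () -> {
              long value = pending[0];
              pending[0] = 0L; // reading also resets, per the interface contract
              return value;
            });
    harness.start();
    System.out.println(collect(harness));
    System.out.println(collect(harness));
    harness.shutdown();
  }
}
```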

m-trieu force-pushed the mt-fix-dp-experiment branch from d17e6a6 to a93bae5 on November 25, 2024 18:14
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker.streaming.harness;
Contributor

put in same throttling package as ThrottleTimer.java

Contributor Author

Done

*/
@Internal
@FunctionalInterface
public interface ThrottledTimeTracker {
Contributor

this has a 'd' in Throttled but all the instances are just throttleTimeTracker. Make consistent

Contributor Author

Done

scwhittle merged commit aa21e4a into apache:master on Nov 26, 2024
17 checks passed