add getworkermetadata streaming rpc
m-trieu committed Sep 11, 2023
1 parent 61f0184 commit 3152fac
Showing 15 changed files with 948 additions and 154 deletions.
runners/google-cloud-dataflow-java/worker/build.gradle (70 additions, 65 deletions)

@@ -67,90 +67,91 @@ def excluded_dependencies = [
library.java.error_prone_annotations, // Provided scope added in worker
library.java.hamcrest, // Test only
library.java.junit, // Test only
- library.java.jsonassert // Test only
+ library.java.jsonassert, // Test only
+ library.java.truth // Test only
]

applyJavaNature(
automaticModuleName: 'org.apache.beam.runners.dataflow.worker',
archivesBaseName: 'beam-runners-google-cloud-dataflow-java-legacy-worker',
classesTriggerCheckerBugs: [
- 'BatchGroupAlsoByWindowAndCombineFn': 'TODO: file a bug report',
- 'AssignWindowsParDoFnFactory': 'TODO: file a bug report',
- 'FetchAndFilterStreamingSideInputsOperation': 'https://github.com/typetools/checker-framework/issues/5436',
+ 'BatchGroupAlsoByWindowAndCombineFn' : 'TODO: file a bug report',
+ 'AssignWindowsParDoFnFactory' : 'TODO: file a bug report',
+ 'FetchAndFilterStreamingSideInputsOperation': 'https://github.com/typetools/checker-framework/issues/5436',
],
exportJavadoc: false,
enableSpotbugs: false /* TODO(BEAM-5658): enable spotbugs */,
shadowJarValidationExcludes: [
"org/apache/beam/runners/dataflow/worker/**",
"org/apache/beam/repackaged/beam_runners_google_cloud_dataflow_java_legacy_worker/**",
// TODO(https://github.com/apache/beam/issues/19114): Move DataflowRunnerHarness class under org.apache.beam.runners.dataflow.worker namespace
"com/google/cloud/dataflow/worker/DataflowRunnerHarness.class",
// Allow slf4j implementation worker for logging during pipeline execution
"org/slf4j/impl/**"
"org/apache/beam/runners/dataflow/worker/**",
"org/apache/beam/repackaged/beam_runners_google_cloud_dataflow_java_legacy_worker/**",
// TODO(https://github.com/apache/beam/issues/19114): Move DataflowRunnerHarness class under org.apache.beam.runners.dataflow.worker namespace
"com/google/cloud/dataflow/worker/DataflowRunnerHarness.class",
// Allow slf4j implementation worker for logging during pipeline execution
"org/slf4j/impl/**"
],
shadowClosure: {
- // Each included dependency must also include all of its necessary transitive dependencies
- // or have them provided by the users pipeline during job submission. Typically a users
- // pipeline includes :runners:google-cloud-dataflow-java and its transitive dependencies
- // so those dependencies don't need to be shaded (bundled and relocated) away. All other
- // dependencies needed to run the worker must be shaded (bundled and relocated) to prevent
- // ClassNotFound and/or MethodNotFound errors during pipeline execution.
- //
- // Each included dependency should have a matching relocation rule below that ensures
- // that the shaded jar is correctly built.
+ // Each included dependency must also include all of its necessary transitive dependencies
+ // or have them provided by the users pipeline during job submission. Typically a users
+ // pipeline includes :runners:google-cloud-dataflow-java and its transitive dependencies
+ // so those dependencies don't need to be shaded (bundled and relocated) away. All other
+ // dependencies needed to run the worker must be shaded (bundled and relocated) to prevent
+ // ClassNotFound and/or MethodNotFound errors during pipeline execution.
+ //
+ // Each included dependency should have a matching relocation rule below that ensures
+ // that the shaded jar is correctly built.

- dependencies {
-   include(dependency(library.java.slf4j_jdk14))
- }
+ dependencies {
+   include(dependency(library.java.slf4j_jdk14))
+ }

- dependencies {
-   include(project(path: ":model:fn-execution", configuration: "shadow"))
- }
- relocate("org.apache.beam.model.fnexecution.v1", getWorkerRelocatedPath("org.apache.beam.model.fnexecution.v1"))
+ dependencies {
+   include(project(path: ":model:fn-execution", configuration: "shadow"))
+ }
+ relocate("org.apache.beam.model.fnexecution.v1", getWorkerRelocatedPath("org.apache.beam.model.fnexecution.v1"))

- dependencies {
-   include(project(":runners:core-construction-java"))
-   include(project(":runners:core-java"))
- }
- relocate("org.apache.beam.runners.core", getWorkerRelocatedPath("org.apache.beam.runners.core"))
- relocate("org.apache.beam.repackaged.beam_runners_core_construction_java", getWorkerRelocatedPath("org.apache.beam.repackaged.beam_runners_core_construction_java"))
- relocate("org.apache.beam.repackaged.beam_runners_core_java", getWorkerRelocatedPath("org.apache.beam.repackaged.beam_runners_core_java"))
+ dependencies {
+   include(project(":runners:core-construction-java"))
+   include(project(":runners:core-java"))
+ }
+ relocate("org.apache.beam.runners.core", getWorkerRelocatedPath("org.apache.beam.runners.core"))
+ relocate("org.apache.beam.repackaged.beam_runners_core_construction_java", getWorkerRelocatedPath("org.apache.beam.repackaged.beam_runners_core_construction_java"))
+ relocate("org.apache.beam.repackaged.beam_runners_core_java", getWorkerRelocatedPath("org.apache.beam.repackaged.beam_runners_core_java"))

- dependencies {
-   include(project(":runners:java-fn-execution"))
- }
- relocate("org.apache.beam.runners.fnexecution", getWorkerRelocatedPath("org.apache.beam.runners.fnexecution"))
- relocate("org.apache.beam.repackaged.beam_runners_java_fn_execution", getWorkerRelocatedPath("org.apache.beam.repackaged.beam_runners_java_fn_execution"))
+ dependencies {
+   include(project(":runners:java-fn-execution"))
+ }
+ relocate("org.apache.beam.runners.fnexecution", getWorkerRelocatedPath("org.apache.beam.runners.fnexecution"))
+ relocate("org.apache.beam.repackaged.beam_runners_java_fn_execution", getWorkerRelocatedPath("org.apache.beam.repackaged.beam_runners_java_fn_execution"))

- dependencies {
-   include(project(":sdks:java:fn-execution"))
- }
- relocate("org.apache.beam.sdk.fn", getWorkerRelocatedPath("org.apache.beam.sdk.fn"))
- relocate("org.apache.beam.repackaged.beam_sdks_java_fn_execution", getWorkerRelocatedPath("org.apache.beam.repackaged.beam_sdks_java_fn_execution"))
+ dependencies {
+   include(project(":sdks:java:fn-execution"))
+ }
+ relocate("org.apache.beam.sdk.fn", getWorkerRelocatedPath("org.apache.beam.sdk.fn"))
+ relocate("org.apache.beam.repackaged.beam_sdks_java_fn_execution", getWorkerRelocatedPath("org.apache.beam.repackaged.beam_sdks_java_fn_execution"))

- dependencies {
-   // We have to include jetty-server/jetty-servlet and all of its transitive dependencies
-   // which includes several org.eclipse.jetty artifacts + servlet-api
-   include(dependency("org.eclipse.jetty:.*:9.2.10.v20150310"))
-   include(dependency("javax.servlet:javax.servlet-api:3.1.0"))
- }
- relocate("org.eclipse.jetty", getWorkerRelocatedPath("org.eclipse.jetty"))
- relocate("javax.servlet", getWorkerRelocatedPath("javax.servlet"))
+ dependencies {
+   // We have to include jetty-server/jetty-servlet and all of its transitive dependencies
+   // which includes several org.eclipse.jetty artifacts + servlet-api
+   include(dependency("org.eclipse.jetty:.*:9.2.10.v20150310"))
+   include(dependency("javax.servlet:javax.servlet-api:3.1.0"))
+ }
+ relocate("org.eclipse.jetty", getWorkerRelocatedPath("org.eclipse.jetty"))
+ relocate("javax.servlet", getWorkerRelocatedPath("javax.servlet"))

- // We don't relocate windmill since it is already underneath the org.apache.beam.runners.dataflow.worker namespace and never
- // expect a user pipeline to include it. There is also a JNI component that windmill server relies on which makes
- // arbitrary relocation more difficult.
- dependencies {
-   include(project(path: ":runners:google-cloud-dataflow-java:worker:windmill", configuration: "shadow"))
- }
+ // We don't relocate windmill since it is already underneath the org.apache.beam.runners.dataflow.worker namespace and never
+ // expect a user pipeline to include it. There is also a JNI component that windmill server relies on which makes
+ // arbitrary relocation more difficult.
+ dependencies {
+   include(project(path: ":runners:google-cloud-dataflow-java:worker:windmill", configuration: "shadow"))
+ }

- // Include original source files extracted under
- // '$buildDir/original_sources_to_package' to jar
- from "$buildDir/original_sources_to_package"
+ // Include original source files extracted under
+ // '$buildDir/original_sources_to_package' to jar
+ from "$buildDir/original_sources_to_package"

exclude "META-INF/LICENSE.txt"
exclude "about.html"
})
exclude "META-INF/LICENSE.txt"
exclude "about.html"
})

/******************************************************************************/
// Configure the worker root project
@@ -219,6 +220,10 @@ dependencies {
// as well and placed within the testImplementation configuration. Otherwise we can place it within
// the shadowTest configuration.
testImplementation project(path: ":runners:core-java", configuration: "testRuntimeMigration")
+ // TODO: excluding Guava until Truth updates it to >32.1.x
+ testImplementation(library.java.truth) {
+   exclude group: 'com.google.guava', module: 'guava'
+ }
shadowTest project(path: ":sdks:java:extensions:google-cloud-platform-core", configuration: "testRuntimeMigration")
shadowTest project(path: ":runners:direct-java", configuration: "shadow")
shadowTest project(path: ":sdks:java:harness", configuration: "shadowTest")
@@ -232,8 +237,8 @@ dependencies {
project.task('validateShadedJarContainsSlf4jJdk14', dependsOn: 'shadowJar') {
ext.outFile = project.file("${project.reportsDir}/${name}.out")
inputs.files(project.configurations.shadow.artifacts.files)
- .withPropertyName("shadowArtifactsFiles")
- .withPathSensitivity(PathSensitivity.RELATIVE)
+ .withPropertyName("shadowArtifactsFiles")
+ .withPathSensitivity(PathSensitivity.RELATIVE)
outputs.files outFile
doLast {
project.configurations.shadow.artifacts.files.each {
AbstractWindmillStream.java

@@ -60,21 +60,16 @@
* synchronizing on this.
*/
public abstract class AbstractWindmillStream<RequestT, ResponseT> implements WindmillStream {
- protected static final long DEFAULT_STREAM_RPC_DEADLINE_SECONDS = 300;
+ public static final long DEFAULT_STREAM_RPC_DEADLINE_SECONDS = 300;
// Default gRPC streams to 2MB chunks, which has shown to be a large enough chunk size to reduce
// per-chunk overhead, and small enough that we can still perform granular flow-control.
protected static final int RPC_STREAM_CHUNK_SIZE = 2 << 20;

private static final Logger LOG = LoggerFactory.getLogger(AbstractWindmillStream.class);

protected final AtomicBoolean clientClosed;

- private final AtomicLong lastSendTimeMs;
private final Executor executor;
private final BackOff backoff;
- // Indicates if the current stream in requestObserver is closed by calling close() method
- private final AtomicBoolean streamClosed;
private final AtomicLong startTimeMs;
+ private final AtomicLong lastSendTimeMs;
private final AtomicLong lastResponseTimeMs;
private final AtomicInteger errorCount;
private final AtomicReference<String> lastError;
@@ -83,6 +78,8 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements WindmillStream
private final Set<AbstractWindmillStream<?, ?>> streamRegistry;
private final int logEveryNStreamFailures;
private final Supplier<StreamObserver<RequestT>> requestObserverSupplier;
+ // Indicates if the current stream in requestObserver is closed by calling close() method
+ private final AtomicBoolean streamClosed;
private @Nullable StreamObserver<RequestT> requestObserver;

protected AbstractWindmillStream(
@@ -132,9 +129,9 @@ private static long debugDuration(long nowMs, long startMs) {
protected abstract boolean hasPendingRequests();

/**
-  * Called when the stream is throttled due to resource exhausted errors. Will be called for each
-  * resource exhausted error not just the first. onResponse() must stop throttling on receipt of
-  * the first good message.
+  * Called when the client side stream is throttled due to resource exhausted errors. Will be
+  * called for each resource exhausted error not just the first. onResponse() must stop throttling
+  * on receipt of the first good message.
*/
protected abstract void startThrottleTimer();

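The throttle contract spelled out in the javadoc above is easy to get wrong: startThrottleTimer() fires on every resource-exhausted error, not just the first, and the first good response must stop the clock. A minimal sketch of one way to satisfy that contract, assuming illustrative names (ThrottleTracker, onResponseReceived) that are not the actual worker API:

import java.util.concurrent.atomic.AtomicLong;

// Sketch only: illustrative names, not the actual AbstractWindmillStream subclass API.
final class ThrottleTracker {
  // 0 means "not currently throttled".
  private final AtomicLong throttleStartMs = new AtomicLong(0);
  private final AtomicLong totalThrottledMs = new AtomicLong(0);

  // Called for each resource-exhausted error; a no-op while already throttled,
  // so repeated errors do not reset the start time.
  void startThrottleTimer() {
    throttleStartMs.compareAndSet(0, System.currentTimeMillis());
  }

  // Called on the first good message; accumulates the throttled interval and
  // clears the timer, satisfying the "onResponse() must stop throttling" rule.
  void onResponseReceived() {
    long start = throttleStartMs.getAndSet(0);
    if (start != 0) {
      totalThrottledMs.addAndGet(System.currentTimeMillis() - start);
    }
  }

  long totalThrottledMillis() {
    return totalThrottledMs.get();
  }
}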
ForwardingClientResponseObserver.java

@@ -27,23 +27,23 @@
* <p>Used to wrap existing {@link StreamObserver}s to be able to install an {@link
* ClientCallStreamObserver#setOnReadyHandler(Runnable) onReadyHandler}.
*
-  * <p>This is as thread-safe as the undering stream observer that is being wrapped.
+  * <p>This is as thread-safe as the underlying stream observer that is being wrapped.
*/
- final class ForwardingClientResponseObserver<ReqT, RespT>
-     implements ClientResponseObserver<RespT, ReqT> {
+ final class ForwardingClientResponseObserver<ResponseT, RequestT>
+     implements ClientResponseObserver<RequestT, ResponseT> {
private final Runnable onReadyHandler;
private final Runnable onDoneHandler;
- private final StreamObserver<ReqT> inboundObserver;
+ private final StreamObserver<ResponseT> inboundObserver;

ForwardingClientResponseObserver(
-     StreamObserver<ReqT> inboundObserver, Runnable onReadyHandler, Runnable onDoneHandler) {
+     StreamObserver<ResponseT> inboundObserver, Runnable onReadyHandler, Runnable onDoneHandler) {
this.inboundObserver = inboundObserver;
this.onReadyHandler = onReadyHandler;
this.onDoneHandler = onDoneHandler;
}

@Override
- public void onNext(ReqT value) {
+ public void onNext(ResponseT value) {
inboundObserver.onNext(value);
}

@@ -60,7 +60,7 @@ public void onCompleted() {
}

@Override
- public void beforeStart(ClientCallStreamObserver<RespT> stream) {
+ public void beforeStart(ClientCallStreamObserver<RequestT> stream) {
stream.setOnReadyHandler(onReadyHandler);
}
}
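The renames above make the data flow easier to follow: the wrapped inboundObserver consumes responses, while beforeStart() hands back the request-side ClientCallStreamObserver. The wrapper exists because gRPC only exposes setOnReadyHandler() through ClientResponseObserver.beforeStart(); a plain StreamObserver has no way to install one. A stripped-down sketch of that pattern (the delegate-based shape here is illustrative, not the class above verbatim):

import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;

// Sketch: wrap a response-side delegate so a readiness callback can be
// installed on the request stream before the call starts.
final class OnReadyInstallingObserver<ResponseT, RequestT>
    implements ClientResponseObserver<RequestT, ResponseT> {
  private final StreamObserver<ResponseT> delegate; // receives inbound responses
  private final Runnable onReadyHandler; // runs when the request side can accept more

  OnReadyInstallingObserver(StreamObserver<ResponseT> delegate, Runnable onReadyHandler) {
    this.delegate = delegate;
    this.onReadyHandler = onReadyHandler;
  }

  @Override
  public void beforeStart(ClientCallStreamObserver<RequestT> requestStream) {
    // The only hook gRPC offers for installing a client-side readiness callback.
    requestStream.setOnReadyHandler(onReadyHandler);
  }

  @Override public void onNext(ResponseT value) { delegate.onNext(value); }
  @Override public void onError(Throwable t) { delegate.onError(t); }
  @Override public void onCompleted() { delegate.onCompleted(); }
}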
StreamObserverFactory.java

@@ -33,9 +33,9 @@ public static StreamObserverFactory direct(
return new Direct(deadlineSeconds, messagesBetweenIsReadyChecks);
}

- public abstract <ReqT, RespT> StreamObserver<RespT> from(
-     Function<StreamObserver<ReqT>, StreamObserver<RespT>> clientFactory,
-     StreamObserver<ReqT> responseObserver);
+ public abstract <ResponseT, RequestT> StreamObserver<RequestT> from(
+     Function<StreamObserver<ResponseT>, StreamObserver<RequestT>> clientFactory,
+     StreamObserver<ResponseT> responseObserver);

private static class Direct extends StreamObserverFactory {
private final long deadlineSeconds;
@@ -47,14 +47,14 @@ private static class Direct extends StreamObserverFactory {
}

@Override
- public <ReqT, RespT> StreamObserver<RespT> from(
-     Function<StreamObserver<ReqT>, StreamObserver<RespT>> clientFactory,
-     StreamObserver<ReqT> inboundObserver) {
+ public <ResponseT, RequestT> StreamObserver<RequestT> from(
+     Function<StreamObserver<ResponseT>, StreamObserver<RequestT>> clientFactory,
+     StreamObserver<ResponseT> inboundObserver) {
AdvancingPhaser phaser = new AdvancingPhaser(1);
-   CallStreamObserver<RespT> outboundObserver =
-       (CallStreamObserver<RespT>)
+   CallStreamObserver<RequestT> outboundObserver =
+       (CallStreamObserver<RequestT>)
clientFactory.apply(
-             new ForwardingClientResponseObserver<ReqT, RespT>(
+             new ForwardingClientResponseObserver<ResponseT, RequestT>(
inboundObserver, phaser::arrive, phaser::forceTermination));
return new DirectStreamObserver<>(
phaser, outboundObserver, deadlineSeconds, messagesBetweenIsReadyChecks);
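Direct.from() wires the inbound observer into a ForwardingClientResponseObserver whose onReady callback arrives at an AdvancingPhaser, then hands the outbound observer to a DirectStreamObserver (its implementation is not shown in this diff) that blocks sends until the transport is ready. The sketch below illustrates the general isReady/awaitAdvance loop such an observer can use; the class and field names are illustrative, not the DirectStreamObserver internals:

import io.grpc.stub.CallStreamObserver;
import java.util.concurrent.Phaser;

// Sketch of phaser-based flow control: check isReady() every N messages and,
// when the transport is not ready, wait for the onReady handler to advance
// the phaser (or for forceTermination() to tear the stream down).
final class FlowControlledSender<T> {
  private final Phaser phaser;
  private final CallStreamObserver<T> outboundObserver;
  private final int messagesBetweenIsReadyChecks;
  private int messagesSinceReadyCheck = 0;

  FlowControlledSender(
      Phaser phaser, CallStreamObserver<T> outboundObserver, int messagesBetweenIsReadyChecks) {
    this.phaser = phaser;
    this.outboundObserver = outboundObserver;
    this.messagesBetweenIsReadyChecks = messagesBetweenIsReadyChecks;
  }

  void send(T message) {
    if (++messagesSinceReadyCheck >= messagesBetweenIsReadyChecks) {
      messagesSinceReadyCheck = 0;
      int phase = phaser.getPhase();
      // A negative phase means the phaser was terminated (stream torn down).
      while (phase >= 0 && !outboundObserver.isReady()) {
        phase = phaser.awaitAdvance(phase);
      }
    }
    outboundObserver.onNext(message);
  }
}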
(diff truncated; 11 more changed files not shown)
