diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java index e6245ad82879..6f97cbca9a80 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java @@ -56,7 +56,7 @@ /** * Represents the state of an attempt to process a {@link WorkItem} by executing user code. * - * @implNote Not thread safe, should not be executed or accessed by more than 1 thread at a time. + * @implNote Not thread safe, should not be modified by more than 1 thread at a time. */ @NotThreadSafe @Internal @@ -70,7 +70,7 @@ public final class Work implements RefreshableWork { private final Map totalDurationPerState; private final WorkId id; private final String latencyTrackingId; - private TimedState currentState; + private volatile TimedState currentState; private volatile boolean isFailed; private volatile String processingThreadName = ""; @@ -112,7 +112,7 @@ public static ProcessingContext createProcessingContext( Consumer workCommitter, HeartbeatSender heartbeatSender) { return ProcessingContext.create( - computationId, getDataClient, workCommitter, heartbeatSender, ""); + computationId, getDataClient, workCommitter, heartbeatSender, /* backendWorkerToken= */ ""); } public static ProcessingContext createProcessingContext( diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java index a8ca175c9fb5..a4d69b228073 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java @@ -34,6 +34,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import javax.annotation.CheckReturnValue; +import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair; @@ -61,12 +62,10 @@ import org.apache.beam.sdk.util.MoreFutures; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.checkerframework.checker.initialization.qual.UnderInitialization; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,7 +80,8 @@ public final class FanOutStreamingEngineWorkerHarness implements StreamingWorkerHarness { private static final Logger LOG = LoggerFactory.getLogger(FanOutStreamingEngineWorkerHarness.class); - private static final String WORKER_METADATA_CONSUMER_THREAD_NAME = "WorkerMetadataConsumerThread"; + private static final String WORKER_METADATA_CONSUMER_THREAD_NAME = + "WindmillWorkerMetadataConsumerThread"; private static final String STREAM_MANAGER_THREAD_NAME = "WindmillStreamManager-%d"; private final JobHeader jobHeader; @@ -92,7 +92,6 @@ public final class FanOutStreamingEngineWorkerHarness implements StreamingWorker private final GetWorkBudgetDistributor getWorkBudgetDistributor; private final GetWorkBudget totalGetWorkBudget; private final ThrottleTimer getWorkerMetadataThrottleTimer; - private final Supplier getWorkerMetadataStream; private final Function workCommitterFactory; private final ThrottlingGetDataMetricTracker getDataMetricTracker; private final ExecutorService windmillStreamManager; @@ -111,6 +110,10 @@ public final class FanOutStreamingEngineWorkerHarness implements StreamingWorker @GuardedBy("this") private boolean started; + /** Set once when {@link #start()} is called. */ + @GuardedBy("this") + private @Nullable GetWorkerMetadataStream getWorkerMetadataStream = null; + private FanOutStreamingEngineWorkerHarness( JobHeader jobHeader, GetWorkBudget totalGetWorkBudget, @@ -139,7 +142,6 @@ private FanOutStreamingEngineWorkerHarness( this.getWorkBudgetDistributor = getWorkBudgetDistributor; this.totalGetWorkBudget = totalGetWorkBudget; this.activeMetadataVersion = Long.MIN_VALUE; - this.getWorkerMetadataStream = Suppliers.memoize(createGetWorkerMetadataStream()::get); this.workCommitterFactory = workCommitterFactory; this.metadataLock = new Object(); } @@ -201,13 +203,16 @@ static FanOutStreamingEngineWorkerHarness forTesting( @Override public synchronized void start() { Preconditions.checkState(!started, "StreamingEngineClient cannot start twice."); - // Starts the stream, this value is memoized. - getWorkerMetadataStream.get(); + getWorkerMetadataStream = + streamFactory.createGetWorkerMetadataStream( + dispatcherClient.getWindmillMetadataServiceStubBlocking(), + getWorkerMetadataThrottleTimer, + this::consumeWorkerMetadata); started = true; } public ImmutableSet currentWindmillEndpoints() { - return connections.get().windmillConnections().keySet().stream() + return connections.get().windmillStreams().keySet().stream() .map(Endpoint::directEndpoint) .filter(Optional::isPresent) .map(Optional::get) @@ -239,26 +244,11 @@ private GetDataStream getGlobalDataStream(String globalDataKey) { @Override public synchronized void shutdown() { Preconditions.checkState(started, "StreamingEngineClient never started."); - getWorkerMetadataStream.get().halfClose(); + Preconditions.checkNotNull(getWorkerMetadataStream).halfClose(); workerMetadataConsumer.shutdownNow(); channelCachingStubFactory.shutdown(); } - @SuppressWarnings("methodref.receiver.bound") - private Supplier createGetWorkerMetadataStream( - @UnderInitialization FanOutStreamingEngineWorkerHarness this) { - // Checker Framework complains about reference to "this" in the constructor since the instance - // is "UnderInitialization" here, which we pass as a lambda to GetWorkerMetadataStream for - // processing new worker metadata. Supplier.get() is only called in start(), after we have - // constructed the FanOutStreamingEngineWorkerHarness. - return () -> - checkNotNull(streamFactory) - .createGetWorkerMetadataStream( - checkNotNull(dispatcherClient).getWindmillMetadataServiceStubBlocking(), - checkNotNull(getWorkerMetadataThrottleTimer), - this::consumeWorkerMetadata); - } - private void consumeWorkerMetadata(WindmillEndpoints windmillEndpoints) { synchronized (metadataLock) { // Only process versions greater than what we currently have to prevent double processing of @@ -281,37 +271,31 @@ private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWi } long previousMetadataVersion = activeMetadataVersion; - LOG.info("Consuming new windmill endpoints: {}", newWindmillEndpoints); - ImmutableMap newWindmillConnections = - createNewWindmillConnections(newWindmillEndpoints.windmillEndpoints()); - closeStaleStreams(newWindmillConnections.values(), connections.get().windmillStreams()); - ImmutableMap newStreams = - createAndStartNewStreams(newWindmillConnections.values()).join(); + LOG.debug( + "Consuming new endpoints: {}. previous metadata version: {}, current metadata version: {}", + newWindmillEndpoints, + previousMetadataVersion, + activeMetadataVersion); + closeStaleStreams( + newWindmillEndpoints.windmillEndpoints(), connections.get().windmillStreams()); + ImmutableMap newStreams = + createAndStartNewStreams(newWindmillEndpoints.windmillEndpoints()).join(); StreamingEngineConnectionState newConnectionsState = StreamingEngineConnectionState.builder() - .setWindmillConnections(newWindmillConnections) .setWindmillStreams(newStreams) .setGlobalDataStreams( createNewGlobalDataStreams(newWindmillEndpoints.globalDataEndpoints())) .build(); - LOG.info( - "Setting new connections: {}. Previous connections: {}.", - newConnectionsState, - connections.get()); connections.set(newConnectionsState); getWorkBudgetDistributor.distributeBudget(newStreams.values(), totalGetWorkBudget); activeMetadataVersion = newWindmillEndpoints.version(); - LOG.info( - "Consumed new endpoints. previous metadata version: {}, current metadata version: {}", - previousMetadataVersion, - activeMetadataVersion); } /** Close the streams that are no longer valid asynchronously. */ @SuppressWarnings("FutureReturnValueIgnored") private void closeStaleStreams( - Collection newWindmillConnections, - ImmutableMap currentStreams) { + Collection newWindmillConnections, + ImmutableMap currentStreams) { currentStreams.entrySet().stream() .filter( connectionAndStream -> !newWindmillConnections.contains(connectionAndStream.getKey())) @@ -334,27 +318,24 @@ private void closeStaleStreams( windmillStreamManager)); } - private synchronized CompletableFuture> - createAndStartNewStreams(Collection newWindmillConnections) { - ImmutableMap currentStreams = + private synchronized CompletableFuture> + createAndStartNewStreams(Collection newWindmillConnections) { + ImmutableMap currentStreams = connections.get().windmillStreams(); - CompletionStage>> - connectionAndSenderFuture = - MoreFutures.allAsList( - newWindmillConnections.stream() - .map( - connection -> - MoreFutures.supplyAsync( - () -> - Pair.of( - connection, - Optional.ofNullable(currentStreams.get(connection)) - .orElseGet( - () -> - createAndStartWindmillStreamSender( - connection))), - windmillStreamManager)) - .collect(Collectors.toList())); + CompletionStage>> connectionAndSenderFuture = + MoreFutures.allAsList( + newWindmillConnections.stream() + .map( + connection -> + MoreFutures.supplyAsync( + () -> + Pair.of( + connection, + Optional.ofNullable(currentStreams.get(connection)) + .orElseGet( + () -> createAndStartWindmillStreamSender(connection))), + windmillStreamManager)) + .collect(Collectors.toList())); return connectionAndSenderFuture .thenApply( @@ -384,23 +365,6 @@ StreamingEngineConnectionState getCurrentConnections() { return connections.get(); } - private synchronized ImmutableMap createNewWindmillConnections( - List newWindmillEndpoints) { - ImmutableMap currentConnections = - connections.get().windmillConnections(); - return newWindmillEndpoints.stream() - .collect( - toImmutableMap( - Function.identity(), - endpoint -> - // Reuse existing stubs if they exist. Optional.orElseGet only calls the - // supplier if the value is not present, preventing constructing expensive - // objects. - Optional.ofNullable(currentConnections.get(endpoint)) - .orElseGet( - () -> WindmillConnection.from(endpoint, this::createWindmillStub)))); - } - private ImmutableMap> createNewGlobalDataStreams( ImmutableMap newGlobalDataEndpoints) { ImmutableMap> currentGlobalDataStreams = @@ -421,19 +385,13 @@ private Supplier existingOrNewGetDataStreamFor( keyedEndpoint.getKey(), () -> streamFactory.createGetDataStream( - newOrExistingStubFor(keyedEndpoint.getValue()), new ThrottleTimer()))); - } - - private CloudWindmillServiceV1Alpha1Stub newOrExistingStubFor(Endpoint endpoint) { - return Optional.ofNullable(connections.get().windmillConnections().get(endpoint)) - .map(WindmillConnection::stub) - .orElseGet(() -> createWindmillStub(endpoint)); + createWindmillStub(keyedEndpoint.getValue()), new ThrottleTimer()))); } - private WindmillStreamSender createAndStartWindmillStreamSender(WindmillConnection connection) { + private WindmillStreamSender createAndStartWindmillStreamSender(Endpoint connection) { WindmillStreamSender windmillStreamSender = WindmillStreamSender.create( - connection, + WindmillConnection.from(connection, this::createWindmillStub), GetWorkRequest.newBuilder() .setClientId(jobHeader.getClientId()) .setJobId(jobHeader.getJobId()) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingEngineConnectionState.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingEngineConnectionState.java index 3c85ee6abe1f..48d20f4a80c5 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingEngineConnectionState.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingEngineConnectionState.java @@ -19,7 +19,6 @@ import com.google.auto.value.AutoValue; import java.util.function.Supplier; -import org.apache.beam.runners.dataflow.worker.windmill.WindmillConnection; import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints.Endpoint; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; @@ -36,25 +35,18 @@ abstract class StreamingEngineConnectionState { static Builder builder() { return new AutoValue_StreamingEngineConnectionState.Builder() - .setWindmillConnections(ImmutableMap.of()) .setWindmillStreams(ImmutableMap.of()) .setGlobalDataStreams(ImmutableMap.of()); } - abstract ImmutableMap windmillConnections(); - - abstract ImmutableMap windmillStreams(); + abstract ImmutableMap windmillStreams(); /** Mapping of GlobalDataIds and the direct GetDataStreams used fetch them. */ abstract ImmutableMap> globalDataStreams(); @AutoValue.Builder abstract static class Builder { - public abstract Builder setWindmillConnections( - ImmutableMap value); - - public abstract Builder setWindmillStreams( - ImmutableMap value); + public abstract Builder setWindmillStreams(ImmutableMap value); public abstract Builder setGlobalDataStreams( ImmutableMap> value); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java index 8932063e22c0..0ac9566d1921 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java @@ -162,11 +162,6 @@ public void setBudget(long items, long bytes) { } } - @Override - public GetWorkBudget remainingBudget() { - return started.get() ? getWorkStream.get().remainingBudget() : getWorkBudget.get(); - } - long getAndResetThrottleTime() { return streamingEngineThrottleTimers.getAndResetThrottleTime(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillEndpoints.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillEndpoints.java index c38824ae42c9..8c1ceada741a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillEndpoints.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillEndpoints.java @@ -17,8 +17,8 @@ */ package org.apache.beam.runners.dataflow.worker.windmill; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList.toImmutableList; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap.toImmutableMap; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet.toImmutableSet; import com.google.auto.value.AutoValue; import java.net.Inet6Address; @@ -27,8 +27,8 @@ import java.util.Map; import java.util.Optional; import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress.AuthenticatedGcpServiceAddress; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,12 +53,12 @@ public static WindmillEndpoints from( endpoint.getValue(), workerMetadataResponseProto.getExternalEndpoint()))); - ImmutableList windmillServers = + ImmutableSet windmillServers = workerMetadataResponseProto.getWorkEndpointsList().stream() .map( endpointProto -> Endpoint.from(endpointProto, workerMetadataResponseProto.getExternalEndpoint())) - .collect(toImmutableList()); + .collect(toImmutableSet()); return WindmillEndpoints.builder() .setVersion(workerMetadataResponseProto.getMetadataVersion()) @@ -142,7 +142,7 @@ private static Optional tryParseDirectEndpointIntoIpV6Address( * Windmill servers. Returns a list of endpoints used to communicate with the corresponding * Windmill servers. */ - public abstract ImmutableList windmillEndpoints(); + public abstract ImmutableSet windmillEndpoints(); /** * Representation of an endpoint in {@link Windmill.WorkerMetadataResponse.Endpoint} proto with @@ -214,9 +214,9 @@ public abstract Builder setGlobalDataEndpoints( ImmutableMap globalDataServers); public abstract Builder setWindmillEndpoints( - ImmutableList windmillServers); + ImmutableSet windmillServers); - abstract ImmutableList.Builder windmillEndpointsBuilder(); + abstract ImmutableSet.Builder windmillEndpointsBuilder(); public final Builder addWindmillEndpoint(WindmillEndpoints.Endpoint endpoint) { windmillEndpointsBuilder().add(endpoint); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java index fa306ce0d93e..17c5226fc01e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java @@ -185,25 +185,27 @@ private StreamObserver requestObserver() { } /** Send a request to the server. */ - protected final synchronized void send(RequestT request) { - if (isShutdown()) { - return; - } - - if (streamClosed.get()) { - throw new IllegalStateException("Send called on a client closed stream."); - } - - try { - lastSendTimeMs.set(Instant.now().getMillis()); - requestObserver.onNext(request); - } catch (StreamObserverCancelledException e) { + protected final void send(RequestT request) { + synchronized (this) { if (isShutdown()) { - logger.debug("Stream was closed or shutdown during send.", e); return; } - requestObserver.onError(e); + if (streamClosed.get()) { + throw new IllegalStateException("Send called on a client closed stream."); + } + + try { + lastSendTimeMs.set(Instant.now().getMillis()); + requestObserver.onNext(request); + } catch (StreamObserverCancelledException e) { + if (isShutdown()) { + logger.debug("Stream was closed or shutdown during send.", e); + return; + } + + requestObserver.onError(e); + } } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStream.java index 526575c94e32..4a4a3fb4d231 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStream.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStream.java @@ -57,7 +57,7 @@ public interface WindmillStream { @ThreadSafe interface GetWorkStream extends WindmillStream { /** Adjusts the {@link GetWorkBudget} for the stream. */ - void setBudget(long itemsDelta, long bytesDelta); + void setBudget(long newItems, long newBytes); default void setBudget(GetWorkBudget newBudget) { setBudget(newBudget.items(), newBudget.bytes()); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java index 74d8814d20d5..5be13d9cc5a2 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java @@ -286,10 +286,10 @@ protected void startThrottleTimer() { } @Override - public void setBudget(long items, long bytes) { + public void setBudget(long newItems, long newBytes) { GetWorkBudget currentMaxGetWorkBudget = maxGetWorkBudget.updateAndGet( - ignored -> GetWorkBudget.builder().setItems(items).setBytes(bytes).build()); + ignored -> GetWorkBudget.builder().setItems(newItems).setBytes(newBytes).build()); GetWorkBudget extension = budgetTracker.computeBudgetExtension(currentMaxGetWorkBudget); sendRequestExtension(extension); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkStream.java index a5f2685a2e89..1cb77a8a9551 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkStream.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkStream.java @@ -194,7 +194,7 @@ protected void startThrottleTimer() { } @Override - public void setBudget(long itemsDelta, long bytesDelta) { + public void setBudget(long newItems, long newBytes) { // no-op } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkerMetadataStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkerMetadataStream.java index fb6c27c2b8ba..7f61d9b3b886 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkerMetadataStream.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkerMetadataStream.java @@ -97,7 +97,6 @@ public static GrpcGetWorkerMetadataStream create( getWorkerMetadataThrottleTimer, serverMappingUpdater); getWorkerMetadataStream.startStream(); - LOG.info("Started GetWorkerMetadataStream. {}", getWorkerMetadataStream); return getWorkerMetadataStream; } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributor.java index c7f02552cfc3..38419e0fb034 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributor.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributor.java @@ -17,16 +17,11 @@ */ package org.apache.beam.runners.dataflow.worker.windmill.work.budget; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap.toImmutableMap; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.math.DoubleMath.roundToLong; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.math.LongMath.divide; import java.math.RoundingMode; -import java.util.function.Function; -import java.util.function.Supplier; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableCollection; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,20 +29,6 @@ @Internal final class EvenGetWorkBudgetDistributor implements GetWorkBudgetDistributor { private static final Logger LOG = LoggerFactory.getLogger(EvenGetWorkBudgetDistributor.class); - private final Supplier activeWorkBudgetSupplier; - private final boolean shouldConsiderRemainingBudget; - - EvenGetWorkBudgetDistributor( - Supplier activeWorkBudgetSupplier, boolean shouldConsiderRemainingBudget) { - this.activeWorkBudgetSupplier = activeWorkBudgetSupplier; - this.shouldConsiderRemainingBudget = shouldConsiderRemainingBudget; - } - - private static boolean isBelowFiftyPercentOfTarget( - GetWorkBudget remaining, GetWorkBudget target) { - return remaining.items() < roundToLong(target.items() * 0.5, RoundingMode.CEILING) - || remaining.bytes() < roundToLong(target.bytes() * 0.5, RoundingMode.CEILING); - } @Override public void distributeBudget( @@ -62,39 +43,17 @@ public void distributeBudget( return; } - computeDesiredBudgets(budgetSpenders, getWorkBudget) - .forEach( - (getWorkBudgetSpender, desiredBudget) -> { - if (shouldConsiderRemainingBudget) { - GetWorkBudget remaining = getWorkBudgetSpender.remainingBudget(); - if (isBelowFiftyPercentOfTarget(remaining, desiredBudget)) { - GetWorkBudget adjustment = desiredBudget.subtract(remaining); - getWorkBudgetSpender.setBudget(adjustment); - } - } else { - getWorkBudgetSpender.setBudget(desiredBudget); - } - }); + GetWorkBudget budgetPerStream = computeDesiredBudgets(budgetSpenders, getWorkBudget); + budgetSpenders.forEach(getWorkBudgetSpender -> getWorkBudgetSpender.setBudget(budgetPerStream)); } - private ImmutableMap computeDesiredBudgets( + private GetWorkBudget computeDesiredBudgets( ImmutableCollection streams, GetWorkBudget totalGetWorkBudget) { - GetWorkBudget activeWorkBudget = activeWorkBudgetSupplier.get(); // TODO: Fix possibly non-deterministic handing out of budgets. // Rounding up here will drift upwards over the lifetime of the streams. - GetWorkBudget budgetPerStream = - GetWorkBudget.builder() - .setItems( - divide( - totalGetWorkBudget.items() - activeWorkBudget.items(), - streams.size(), - RoundingMode.CEILING)) - .setBytes( - divide( - totalGetWorkBudget.bytes() - activeWorkBudget.bytes(), - streams.size(), - RoundingMode.CEILING)) - .build(); - return streams.stream().collect(toImmutableMap(Function.identity(), unused -> budgetPerStream)); + return GetWorkBudget.builder() + .setItems(divide(totalGetWorkBudget.items(), streams.size(), RoundingMode.CEILING)) + .setBytes(divide(totalGetWorkBudget.bytes(), streams.size(), RoundingMode.CEILING)) + .build(); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetDistributors.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetDistributors.java index 46c91182509f..2013c9ff1cb7 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetDistributors.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetDistributors.java @@ -17,17 +17,11 @@ */ package org.apache.beam.runners.dataflow.worker.windmill.work.budget; -import java.util.function.Supplier; import org.apache.beam.sdk.annotations.Internal; @Internal public final class GetWorkBudgetDistributors { - public static GetWorkBudgetDistributor distributeEvenly( - Supplier activeWorkBudgetSupplier) { - return new EvenGetWorkBudgetDistributor(activeWorkBudgetSupplier, true); - } - public static GetWorkBudgetDistributor distributeEvenly() { - return new EvenGetWorkBudgetDistributor(GetWorkBudget::noBudget, false); + return new EvenGetWorkBudgetDistributor(); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetSpender.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetSpender.java index 9fbf66d0ada2..a2f7774389ca 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetSpender.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetSpender.java @@ -27,6 +27,4 @@ public interface GetWorkBudgetSpender { default void setBudget(GetWorkBudget adjustment) { setBudget(adjustment.items(), adjustment.bytes()); } - - GetWorkBudget remainingBudget(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java index f2edeb2b8c2e..da29528d3151 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java @@ -245,7 +245,7 @@ public void halfClose() { } @Override - public void setBudget(long itemsDelta, long bytesDelta) { + public void setBudget(long newItems, long newBytes) { // no-op. } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java index 5518e6f21455..f345d2e3592b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java @@ -220,11 +220,10 @@ public void testStreamsStartCorrectly() throws InterruptedException { StreamingEngineConnectionState currentConnections = fanOutStreamingEngineWorkProvider.getCurrentConnections(); - assertEquals(2, currentConnections.windmillConnections().size()); assertEquals(2, currentConnections.windmillStreams().size()); Set workerTokens = - currentConnections.windmillConnections().values().stream() - .map(WindmillConnection::backendWorkerToken) + currentConnections.windmillStreams().keySet().stream() + .map(endpoint -> endpoint.workerToken().orElseThrow(IllegalStateException::new)) .collect(Collectors.toSet()); assertTrue(workerTokens.contains(workerToken)); @@ -294,12 +293,11 @@ public void testOnNewWorkerMetadata_correctlyRemovesStaleWindmillServers() waitForWorkerMetadataToBeConsumed(getWorkBudgetDistributor); StreamingEngineConnectionState currentConnections = fanOutStreamingEngineWorkProvider.getCurrentConnections(); - assertEquals(1, currentConnections.windmillConnections().size()); assertEquals(1, currentConnections.windmillStreams().size()); Set workerTokens = - fanOutStreamingEngineWorkProvider.getCurrentConnections().windmillConnections().values() + fanOutStreamingEngineWorkProvider.getCurrentConnections().windmillStreams().keySet() .stream() - .map(WindmillConnection::backendWorkerToken) + .map(endpoint -> endpoint.workerToken().orElseThrow(IllegalStateException::new)) .collect(Collectors.toSet()); assertFalse(workerTokens.contains(workerToken)); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributorTest.java index 95756c6a88b5..fbc76b4b14b5 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributorTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributorTest.java @@ -17,9 +17,7 @@ */ package org.apache.beam.runners.dataflow.worker.windmill.work.budget; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -40,161 +38,38 @@ public class EvenGetWorkBudgetDistributorTest { @Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); @Rule public transient Timeout globalTimeout = Timeout.seconds(600); - private static GetWorkBudgetDistributor createBudgetDistributor(GetWorkBudget activeWorkBudget) { - return GetWorkBudgetDistributors.distributeEvenly(() -> activeWorkBudget); - } - - private static GetWorkBudgetDistributor createBudgetDistributor(long activeWorkItemsAndBytes) { - return createBudgetDistributor( - GetWorkBudget.builder() - .setItems(activeWorkItemsAndBytes) - .setBytes(activeWorkItemsAndBytes) - .build()); - } - - private static GetWorkBudgetSpender createGetWorkBudgetOwnerWithRemainingBudgetOf( - GetWorkBudget getWorkBudget) { + private static GetWorkBudgetSpender createGetWorkBudgetOwner() { + // Lambdas are final and cannot be spied. return spy( new GetWorkBudgetSpender() { @Override public void setBudget(long items, long bytes) {} - - @Override - public GetWorkBudget remainingBudget() { - return getWorkBudget; - } }); } @Test public void testDistributeBudget_doesNothingWhenPassedInStreamsEmpty() { - createBudgetDistributor(1L) + GetWorkBudgetDistributors.distributeEvenly() .distributeBudget( ImmutableList.of(), GetWorkBudget.builder().setItems(10L).setBytes(10L).build()); } @Test public void testDistributeBudget_doesNothingWithNoBudget() { - GetWorkBudgetSpender getWorkBudgetSpender = - spy(createGetWorkBudgetOwnerWithRemainingBudgetOf(GetWorkBudget.noBudget())); - createBudgetDistributor(1L) + GetWorkBudgetSpender getWorkBudgetSpender = spy(createGetWorkBudgetOwner()); + GetWorkBudgetDistributors.distributeEvenly() .distributeBudget(ImmutableList.of(getWorkBudgetSpender), GetWorkBudget.noBudget()); verifyNoInteractions(getWorkBudgetSpender); } - @Test - public void testDistributeBudget_doesNotAdjustStreamBudgetWhenRemainingBudgetHighNoActiveWork() { - GetWorkBudgetSpender getWorkBudgetSpender = - spy( - createGetWorkBudgetOwnerWithRemainingBudgetOf( - GetWorkBudget.builder().setItems(10L).setBytes(10L).build())); - createBudgetDistributor(0L) - .distributeBudget( - ImmutableList.of(getWorkBudgetSpender), - GetWorkBudget.builder().setItems(10L).setBytes(10L).build()); - - verify(getWorkBudgetSpender, never()).setBudget(anyLong(), anyLong()); - } - - @Test - public void - testDistributeBudget_doesNotAdjustStreamBudgetWhenRemainingBudgetHighWithActiveWork() { - GetWorkBudgetSpender getWorkBudgetSpender = - spy( - createGetWorkBudgetOwnerWithRemainingBudgetOf( - GetWorkBudget.builder().setItems(5L).setBytes(5L).build())); - createBudgetDistributor(10L) - .distributeBudget( - ImmutableList.of(getWorkBudgetSpender), - GetWorkBudget.builder().setItems(20L).setBytes(20L).build()); - - verify(getWorkBudgetSpender, never()).setBudget(anyLong(), anyLong()); - } - - @Test - public void - testDistributeBudget_adjustsStreamBudgetWhenRemainingItemBudgetTooLowWithNoActiveWork() { - GetWorkBudget streamRemainingBudget = - GetWorkBudget.builder().setItems(1L).setBytes(10L).build(); - GetWorkBudget totalGetWorkBudget = GetWorkBudget.builder().setItems(10L).setBytes(10L).build(); - GetWorkBudgetSpender getWorkBudgetSpender = - spy(createGetWorkBudgetOwnerWithRemainingBudgetOf(streamRemainingBudget)); - createBudgetDistributor(0L) - .distributeBudget(ImmutableList.of(getWorkBudgetSpender), totalGetWorkBudget); - - verify(getWorkBudgetSpender, times(1)) - .setBudget( - eq(totalGetWorkBudget.items() - streamRemainingBudget.items()), - eq(totalGetWorkBudget.bytes() - streamRemainingBudget.bytes())); - } - - @Test - public void - testDistributeBudget_adjustsStreamBudgetWhenRemainingItemBudgetTooLowWithActiveWork() { - GetWorkBudget streamRemainingBudget = - GetWorkBudget.builder().setItems(1L).setBytes(10L).build(); - GetWorkBudget totalGetWorkBudget = GetWorkBudget.builder().setItems(10L).setBytes(10L).build(); - long activeWorkItemsAndBytes = 2L; - GetWorkBudgetSpender getWorkBudgetSpender = - spy(createGetWorkBudgetOwnerWithRemainingBudgetOf(streamRemainingBudget)); - createBudgetDistributor(activeWorkItemsAndBytes) - .distributeBudget(ImmutableList.of(getWorkBudgetSpender), totalGetWorkBudget); - - verify(getWorkBudgetSpender, times(1)) - .setBudget( - eq( - totalGetWorkBudget.items() - - streamRemainingBudget.items() - - activeWorkItemsAndBytes), - eq(totalGetWorkBudget.bytes() - streamRemainingBudget.bytes())); - } - - @Test - public void testDistributeBudget_adjustsStreamBudgetWhenRemainingByteBudgetTooLowNoActiveWork() { - GetWorkBudget streamRemainingBudget = - GetWorkBudget.builder().setItems(10L).setBytes(1L).build(); - GetWorkBudget totalGetWorkBudget = GetWorkBudget.builder().setItems(10L).setBytes(10L).build(); - GetWorkBudgetSpender getWorkBudgetSpender = - spy(createGetWorkBudgetOwnerWithRemainingBudgetOf(streamRemainingBudget)); - createBudgetDistributor(0L) - .distributeBudget(ImmutableList.of(getWorkBudgetSpender), totalGetWorkBudget); - - verify(getWorkBudgetSpender, times(1)) - .setBudget( - eq(totalGetWorkBudget.items() - streamRemainingBudget.items()), - eq(totalGetWorkBudget.bytes() - streamRemainingBudget.bytes())); - } - - @Test - public void - testDistributeBudget_adjustsStreamBudgetWhenRemainingByteBudgetTooLowWithActiveWork() { - GetWorkBudget streamRemainingBudget = - GetWorkBudget.builder().setItems(10L).setBytes(1L).build(); - GetWorkBudget totalGetWorkBudget = GetWorkBudget.builder().setItems(10L).setBytes(10L).build(); - long activeWorkItemsAndBytes = 2L; - - GetWorkBudgetSpender getWorkBudgetSpender = - spy(createGetWorkBudgetOwnerWithRemainingBudgetOf(streamRemainingBudget)); - createBudgetDistributor(activeWorkItemsAndBytes) - .distributeBudget(ImmutableList.of(getWorkBudgetSpender), totalGetWorkBudget); - - verify(getWorkBudgetSpender, times(1)) - .setBudget( - eq(totalGetWorkBudget.items() - streamRemainingBudget.items()), - eq( - totalGetWorkBudget.bytes() - - streamRemainingBudget.bytes() - - activeWorkItemsAndBytes)); - } - @Test public void testDistributeBudget_distributesBudgetEvenlyIfPossible() { long totalItemsAndBytes = 10L; List streams = new ArrayList<>(); for (int i = 0; i < totalItemsAndBytes; i++) { - streams.add(spy(createGetWorkBudgetOwnerWithRemainingBudgetOf(GetWorkBudget.noBudget()))); + streams.add(spy(createGetWorkBudgetOwner())); } - createBudgetDistributor(0L) + GetWorkBudgetDistributors.distributeEvenly() .distributeBudget( ImmutableList.copyOf(streams), GetWorkBudget.builder() @@ -214,9 +89,9 @@ public void testDistributeBudget_distributesFairlyWhenNotEven() { long totalItemsAndBytes = 10L; List streams = new ArrayList<>(); for (int i = 0; i < 3; i++) { - streams.add(spy(createGetWorkBudgetOwnerWithRemainingBudgetOf(GetWorkBudget.noBudget()))); + streams.add(spy(createGetWorkBudgetOwner())); } - createBudgetDistributor(0L) + GetWorkBudgetDistributors.distributeEvenly() .distributeBudget( ImmutableList.copyOf(streams), GetWorkBudget.builder() @@ -232,18 +107,11 @@ public void testDistributeBudget_distributesFairlyWhenNotEven() { } @Test - public void - testDistributeBudget_adjustBudgetIgnoringRemainingBudget_shouldIgnoreRemainingBudget() { + public void testDistributeBudget_distributesBudgetEvenly() { long totalItemsAndBytes = 10L; List streams = new ArrayList<>(); for (int i = 0; i < totalItemsAndBytes; i++) { - streams.add( - spy( - createGetWorkBudgetOwnerWithRemainingBudgetOf( - GetWorkBudget.builder() - .setItems(totalItemsAndBytes) - .setBytes(totalItemsAndBytes) - .build()))); + streams.add(spy(createGetWorkBudgetOwner())); } GetWorkBudgetDistributors.distributeEvenly()