diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java index 78ea34503e54..26d7bab323e1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java @@ -110,6 +110,13 @@ enum LogLevel { void setLogMdc(boolean value); + /** This option controls whether logging will be redirected through the logging service. */ + @Description("Controls whether logging will be redirected through the logging service.") + @Default.Boolean(true) + boolean getEnableLoggingService(); + + void setEnableLoggingService(boolean enableLoggingService); + /** * Size (in MB) of each grouping table used to pre-combine elements. Larger values may reduce the * amount of data shuffled. If unset, defaults to 100 MB. diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java index 9df9f12bc52b..46dc2d0779dc 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java @@ -39,7 +39,8 @@ import org.apache.beam.fn.harness.control.ProcessBundleHandler; import org.apache.beam.fn.harness.data.BeamFnDataGrpcClient; import org.apache.beam.fn.harness.debug.DataSampler; -import org.apache.beam.fn.harness.logging.BeamFnLoggingClient; +import org.apache.beam.fn.harness.logging.LoggingClient; +import org.apache.beam.fn.harness.logging.LoggingClientFactory; import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache; import org.apache.beam.fn.harness.status.BeamFnStatusClient; import org.apache.beam.fn.harness.stream.HarnessStreamObserverFactories; @@ -283,8 +284,8 @@ public static void main( // The logging client variable is not used per se, but during its lifetime (until close()) it // intercepts logging and sends it to the logging service. - try (BeamFnLoggingClient logging = - BeamFnLoggingClient.createAndStart( + try (LoggingClient logging = + LoggingClientFactory.createAndStart( options, loggingApiServiceDescriptor, channelFactory::forDescriptor)) { LOG.info("Fn Harness started"); // Register standard file systems. @@ -410,7 +411,7 @@ private BeamFnApi.ProcessBundleDescriptor loadDescriptor(String id) { outboundObserverFactory, executorService, handlers); - CompletableFuture.anyOf(control.terminationFuture(), logging.terminationFuture()).get(); + CompletableFuture.allOf(control.terminationFuture(), logging.terminationFuture()).get(); if (beamFnStatusClient != null) { beamFnStatusClient.close(); } diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java index 7812d8c0bc30..112104e4d251 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java @@ -68,7 +68,7 @@ /** * Configures {@link java.util.logging} to send all {@link LogRecord}s via the Beam Fn Logging API. */ -public class BeamFnLoggingClient implements AutoCloseable { +public class BeamFnLoggingClient implements LoggingClient { private static final String ROOT_LOGGER_NAME = ""; private static final ImmutableMap LOG_LEVEL_MAP = ImmutableMap.builder() @@ -119,7 +119,7 @@ public class BeamFnLoggingClient implements AutoCloseable { */ private @Nullable Thread logEntryHandlerThread = null; - public static BeamFnLoggingClient createAndStart( + static BeamFnLoggingClient createAndStart( PipelineOptions options, Endpoints.ApiServiceDescriptor apiServiceDescriptor, Function channelFactory) { @@ -383,6 +383,7 @@ void flushFinalLogs(@UnderInitialization BeamFnLoggingClient this) { } } + @Override public CompletableFuture terminationFuture() { checkNotNull(bufferedLogConsumer, "BeamFnLoggingClient not fully started"); return bufferedLogConsumer; diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/LoggingClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/LoggingClient.java new file mode 100644 index 000000000000..9ab401567de1 --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/LoggingClient.java @@ -0,0 +1,8 @@ +package org.apache.beam.fn.harness.logging; + +import java.util.concurrent.CompletableFuture; + +public interface LoggingClient extends AutoCloseable { + + CompletableFuture terminationFuture(); +} diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/LoggingClientFactory.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/LoggingClientFactory.java new file mode 100644 index 000000000000..b129d0988f97 --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/LoggingClientFactory.java @@ -0,0 +1,42 @@ +package org.apache.beam.fn.harness.logging; + +import io.grpc.ManagedChannel; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import org.apache.beam.model.pipeline.v1.Endpoints; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.SdkHarnessOptions; + +/** + * A factory for {@link LoggingClient}s. Provides {@link BeamFnLoggingClient} if the logging service + * is enabled, otherwise provides a no-op client. + */ +public class LoggingClientFactory { + + private LoggingClientFactory() {} + + /** + * A factory for {@link LoggingClient}s. Provides {@link BeamFnLoggingClient} if the logging + * service is enabled, otherwise provides a no-op client. + */ + public static LoggingClient createAndStart( + PipelineOptions options, + Endpoints.ApiServiceDescriptor apiServiceDescriptor, + Function channelFactory) { + if (options.as(SdkHarnessOptions.class).getEnableLoggingService()) { + return BeamFnLoggingClient.createAndStart(options, apiServiceDescriptor, channelFactory); + } else { + return new NoOpLoggingClient(); + } + } + + static final class NoOpLoggingClient implements LoggingClient { + @Override + public CompletableFuture terminationFuture() { + return CompletableFuture.completedFuture(new Object()); + } + + @Override + public void close() throws Exception {} + } +} diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistryTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistryTest.java index 245c87f3e194..5b32614fb2b6 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistryTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistryTest.java @@ -49,6 +49,7 @@ import org.apache.beam.fn.harness.debug.DataSampler; import org.apache.beam.fn.harness.logging.BeamFnLoggingClient; import org.apache.beam.fn.harness.logging.BeamFnLoggingMDC; +import org.apache.beam.fn.harness.logging.LoggingClientFactory; import org.apache.beam.model.fnexecution.v1.BeamFnApi; import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleDescriptor; import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc; @@ -648,7 +649,7 @@ public StreamObserver logging( // that was set up // earlier. try (BeamFnLoggingClient ignored = - BeamFnLoggingClient.createAndStart( + LoggingClientFactory.createAndStart( PipelineOptionsFactory.create(), apiServiceDescriptor, (Endpoints.ApiServiceDescriptor descriptor) -> channel)) {