diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java index 78ea34503e54..f0e840807c79 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java @@ -110,6 +110,13 @@ enum LogLevel { void setLogMdc(boolean value); + /** This option controls whether logging will be redirected through the FnApi. */ + @Description("Controls whether logging will be redirected through the FnApi.") + @Default.Boolean(true) + boolean getEnableLogViaFnApi(); + + void setEnableLogViaFnApi(boolean enableLogViaFnApi); + /** * Size (in MB) of each grouping table used to pre-combine elements. Larger values may reduce the * amount of data shuffled. If unset, defaults to 100 MB. diff --git a/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/logging/BeamFnLoggingClientBenchmark.java b/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/logging/BeamFnLoggingClientBenchmark.java index b9e4b20db00b..745e1f078646 100644 --- a/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/logging/BeamFnLoggingClientBenchmark.java +++ b/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/logging/BeamFnLoggingClientBenchmark.java @@ -24,6 +24,8 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.beam.fn.harness.logging.BeamFnLoggingClient; import org.apache.beam.fn.harness.logging.BeamFnLoggingMDC; +import org.apache.beam.fn.harness.logging.LoggingClient; +import org.apache.beam.fn.harness.logging.LoggingClientFactory; import org.apache.beam.fn.harness.logging.QuotaEvent; import org.apache.beam.model.fnexecution.v1.BeamFnApi; import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc; @@ -80,7 +82,7 @@ public void onCompleted() { /** Setup a simple logging service and configure the {@link BeamFnLoggingClient}. */ @State(Scope.Benchmark) public static class ManageLoggingClientAndService { - public final BeamFnLoggingClient loggingClient; + public final LoggingClient loggingClient; public final CallCountLoggingService loggingService; public final Server server; @@ -98,7 +100,7 @@ public ManageLoggingClientAndService() { .build(); server.start(); loggingClient = - BeamFnLoggingClient.createAndStart( + LoggingClientFactory.createAndStart( PipelineOptionsFactory.create(), apiServiceDescriptor, managedChannelFactory::forDescriptor); diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java index 9df9f12bc52b..3c8784d0ee42 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java @@ -39,7 +39,8 @@ import org.apache.beam.fn.harness.control.ProcessBundleHandler; import org.apache.beam.fn.harness.data.BeamFnDataGrpcClient; import org.apache.beam.fn.harness.debug.DataSampler; -import org.apache.beam.fn.harness.logging.BeamFnLoggingClient; +import org.apache.beam.fn.harness.logging.LoggingClient; +import org.apache.beam.fn.harness.logging.LoggingClientFactory; import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache; import org.apache.beam.fn.harness.status.BeamFnStatusClient; import org.apache.beam.fn.harness.stream.HarnessStreamObserverFactories; @@ -62,6 +63,7 @@ import org.apache.beam.sdk.options.ExecutorOptions; import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.SdkHarnessOptions; import org.apache.beam.sdk.util.construction.CoderTranslation; import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation; import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.TextFormat; @@ -283,8 +285,8 @@ public static void main( // The logging client variable is not used per se, but during its lifetime (until close()) it // intercepts logging and sends it to the logging service. - try (BeamFnLoggingClient logging = - BeamFnLoggingClient.createAndStart( + try (LoggingClient logging = + LoggingClientFactory.createAndStart( options, loggingApiServiceDescriptor, channelFactory::forDescriptor)) { LOG.info("Fn Harness started"); // Register standard file systems. @@ -410,7 +412,11 @@ private BeamFnApi.ProcessBundleDescriptor loadDescriptor(String id) { outboundObserverFactory, executorService, handlers); - CompletableFuture.anyOf(control.terminationFuture(), logging.terminationFuture()).get(); + if (options.as(SdkHarnessOptions.class).getEnableLogViaFnApi()) { + CompletableFuture.anyOf(control.terminationFuture(), logging.terminationFuture()).get(); + } else { + control.terminationFuture().get(); + } if (beamFnStatusClient != null) { beamFnStatusClient.close(); } diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java index 7812d8c0bc30..112104e4d251 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java @@ -68,7 +68,7 @@ /** * Configures {@link java.util.logging} to send all {@link LogRecord}s via the Beam Fn Logging API. */ -public class BeamFnLoggingClient implements AutoCloseable { +public class BeamFnLoggingClient implements LoggingClient { private static final String ROOT_LOGGER_NAME = ""; private static final ImmutableMap LOG_LEVEL_MAP = ImmutableMap.builder() @@ -119,7 +119,7 @@ public class BeamFnLoggingClient implements AutoCloseable { */ private @Nullable Thread logEntryHandlerThread = null; - public static BeamFnLoggingClient createAndStart( + static BeamFnLoggingClient createAndStart( PipelineOptions options, Endpoints.ApiServiceDescriptor apiServiceDescriptor, Function channelFactory) { @@ -383,6 +383,7 @@ void flushFinalLogs(@UnderInitialization BeamFnLoggingClient this) { } } + @Override public CompletableFuture terminationFuture() { checkNotNull(bufferedLogConsumer, "BeamFnLoggingClient not fully started"); return bufferedLogConsumer; diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/LoggingClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/LoggingClient.java new file mode 100644 index 000000000000..3c1972ec643d --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/LoggingClient.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.fn.harness.logging; + +import java.util.concurrent.CompletableFuture; + +public interface LoggingClient extends AutoCloseable { + + CompletableFuture terminationFuture(); +} diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/LoggingClientFactory.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/LoggingClientFactory.java new file mode 100644 index 000000000000..f61b8d3c4ba3 --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/LoggingClientFactory.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.fn.harness.logging; + +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import org.apache.beam.model.pipeline.v1.Endpoints; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.SdkHarnessOptions; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel; + +/** + * A factory for {@link LoggingClient}s. Provides {@link BeamFnLoggingClient} if the logging service + * is enabled, otherwise provides a no-op client. + */ +public class LoggingClientFactory { + + private LoggingClientFactory() {} + + /** + * A factory for {@link LoggingClient}s. Provides {@link BeamFnLoggingClient} if the logging + * service is enabled, otherwise provides a no-op client. + */ + public static LoggingClient createAndStart( + PipelineOptions options, + Endpoints.ApiServiceDescriptor apiServiceDescriptor, + Function channelFactory) { + if (options.as(SdkHarnessOptions.class).getEnableLogViaFnApi()) { + return BeamFnLoggingClient.createAndStart(options, apiServiceDescriptor, channelFactory); + } else { + return new NoOpLoggingClient(); + } + } + + static final class NoOpLoggingClient implements LoggingClient { + @Override + public CompletableFuture terminationFuture() { + return CompletableFuture.completedFuture(new Object()); + } + + @Override + public void close() throws Exception {} + } +} diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistryTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistryTest.java index 245c87f3e194..96e68586b3a5 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistryTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistryTest.java @@ -47,8 +47,9 @@ import org.apache.beam.fn.harness.control.ExecutionStateSampler; import org.apache.beam.fn.harness.control.ExecutionStateSampler.ExecutionStateTracker; import org.apache.beam.fn.harness.debug.DataSampler; -import org.apache.beam.fn.harness.logging.BeamFnLoggingClient; import org.apache.beam.fn.harness.logging.BeamFnLoggingMDC; +import org.apache.beam.fn.harness.logging.LoggingClient; +import org.apache.beam.fn.harness.logging.LoggingClientFactory; import org.apache.beam.model.fnexecution.v1.BeamFnApi; import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleDescriptor; import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc; @@ -647,8 +648,8 @@ public StreamObserver logging( // Start the test within the logging context. This reroutes logging through to the boiler-plate // that was set up // earlier. - try (BeamFnLoggingClient ignored = - BeamFnLoggingClient.createAndStart( + try (LoggingClient ignored = + LoggingClientFactory.createAndStart( PipelineOptionsFactory.create(), apiServiceDescriptor, (Endpoints.ApiServiceDescriptor descriptor) -> channel)) {