From b34c014888b69028244dab4a2c1b0f292a2563ec Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Thu, 11 Jul 2024 10:24:23 -0400 Subject: [PATCH] Dedup SerializableSupplier (#31829) --- .../sdk/testing/SerializableMatchers.java | 14 ++-------- .../dofn/ReadChangeStreamPartitionDoFn.java | 1 + .../dofn/SerializableSupplier.java | 25 ----------------- .../internal/SubscriptionPartitionLoader.java | 2 +- .../ReadChangeStreamPartitionDoFnTest.java | 1 + .../pubsublite/internal/FakeSerializable.java | 2 +- .../SubscriptionPartitionLoaderTest.java | 2 +- .../org/apache/beam/sdk/io/jms/CommonJms.java | 5 ++-- .../apache/beam/io/requestresponse/Call.java | 1 + .../DefaultSerializableBackoffSupplier.java | 1 + .../io/requestresponse/RequestResponseIO.java | 1 + .../requestresponse/SerializableSupplier.java | 28 ------------------- .../WindowedCallShouldBackoff.java | 1 + .../RequestResponseIOTest.java | 1 + .../WindowedCallShouldBackoffTest.java | 1 + 15 files changed, 15 insertions(+), 71 deletions(-) delete mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/SerializableSupplier.java delete mode 100644 sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/SerializableSupplier.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatchers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatchers.java index 749d95960263..ad3506045995 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatchers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatchers.java @@ -29,6 +29,7 @@ import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.util.SerializableSupplier; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.values.KV; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; @@ -734,17 +735,6 @@ public static SerializableMatcher fromSupplier(SerializableSupplier(supplier); } - /** - * Supplies values of type {@code T}, and is serializable. Thus, even if {@code T} is not - * serializable, the supplier can be serialized and provide a {@code T} wherever it is - * deserialized. - * - * @param the type of value supplied. - */ - public interface SerializableSupplier extends Serializable { - T get(); - } - /** * Since the delegate {@link Matcher} is not generally serializable, instead this takes a nullary * SerializableFunction to return such a matcher. @@ -752,7 +742,7 @@ public interface SerializableSupplier extends Serializable { private static class SerializableMatcherFromSupplier extends BaseMatcher implements SerializableMatcher { - private SerializableSupplier> supplier; + private final SerializableSupplier> supplier; public SerializableMatcherFromSupplier(SerializableSupplier> supplier) { this.supplier = supplier; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFn.java index 826710d9c588..c90ec97bfe35 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFn.java @@ -42,6 +42,7 @@ import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.Manual; +import org.apache.beam.sdk.util.SerializableSupplier; import org.apache.beam.sdk.values.KV; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/SerializableSupplier.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/SerializableSupplier.java deleted file mode 100644 index 2b09adbc75dd..000000000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/SerializableSupplier.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn; - -import java.io.Serializable; -import java.util.function.Supplier; - -/** Union of Supplier and Serializable interfaces to allow serialized supplier for testing. */ -@FunctionalInterface -interface SerializableSupplier extends Supplier, Serializable {} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionLoader.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionLoader.java index 589af5236de4..60ba6e0c65c0 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionLoader.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionLoader.java @@ -24,7 +24,6 @@ import com.google.cloud.pubsublite.PartitionLookupUtils; import com.google.cloud.pubsublite.SubscriptionPath; import com.google.cloud.pubsublite.TopicPath; -import org.apache.beam.sdk.testing.SerializableMatchers.SerializableSupplier; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Impulse; import org.apache.beam.sdk.transforms.PTransform; @@ -34,6 +33,7 @@ import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.splittabledofn.SplitResult; import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators; +import org.apache.beam.sdk.util.SerializableSupplier; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java index d4f9da768088..582d9e709c9a 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java @@ -48,6 +48,7 @@ import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.StreamProgress; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; +import org.apache.beam.sdk.util.SerializableSupplier; import org.apache.beam.sdk.values.KV; import org.joda.time.Duration; import org.joda.time.Instant; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/FakeSerializable.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/FakeSerializable.java index 13d44ddfeebf..ded551bb9b05 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/FakeSerializable.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/FakeSerializable.java @@ -20,7 +20,7 @@ import java.io.Serializable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.beam.sdk.testing.SerializableMatchers.SerializableSupplier; +import org.apache.beam.sdk.util.SerializableSupplier; /** * A FakeSerializable hides a non-serializable object in a static map and returns a handle into the diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionLoaderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionLoaderTest.java index 31b1ad34179c..5d4d99beaeab 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionLoaderTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionLoaderTest.java @@ -25,9 +25,9 @@ import com.google.cloud.pubsublite.SubscriptionPath; import com.google.cloud.pubsublite.TopicPath; import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.SerializableMatchers.SerializableSupplier; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.util.SerializableSupplier; import org.apache.beam.sdk.values.PCollection; import org.joda.time.Duration; import org.junit.Before; diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/CommonJms.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/CommonJms.java index c0f8cf258d21..1d1245e6877d 100644 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/CommonJms.java +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/CommonJms.java @@ -22,6 +22,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; +import java.util.function.Supplier; import javax.jms.BytesMessage; import javax.jms.ConnectionFactory; import javax.jms.Message; @@ -34,8 +35,8 @@ import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.amqp.AmqpTransportFactory; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.util.SerializableSupplier; import org.apache.beam.sdk.util.ThrowingSupplier; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier; /** * A common test fixture to create a broker and connection factories for {@link JmsIOIT} & {@link @@ -47,8 +48,6 @@ public class CommonJms implements Serializable { // convenient typedefs and a helper conversion functions interface ThrowingSerializableSupplier extends ThrowingSupplier, Serializable {} - private interface SerializableSupplier extends Serializable, Supplier {} - private static SerializableSupplier toSerializableSupplier( ThrowingSerializableSupplier throwingSerializableSupplier) { return () -> { diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java index b6941f8fcbb6..f9c1a23e64fe 100644 --- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java @@ -40,6 +40,7 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.BackOff; import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.sdk.util.SerializableSupplier; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.Sleeper; import org.apache.beam.sdk.values.PCollection; diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/DefaultSerializableBackoffSupplier.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/DefaultSerializableBackoffSupplier.java index 89e9400854d7..b92bce53438f 100644 --- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/DefaultSerializableBackoffSupplier.java +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/DefaultSerializableBackoffSupplier.java @@ -19,6 +19,7 @@ import org.apache.beam.sdk.util.BackOff; import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.sdk.util.SerializableSupplier; import org.apache.beam.sdk.util.SerializableUtils; /** diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RequestResponseIO.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RequestResponseIO.java index 9c5c6128c29a..1bac1dd07386 100644 --- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RequestResponseIO.java +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RequestResponseIO.java @@ -35,6 +35,7 @@ import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.util.BackOff; import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.sdk.util.SerializableSupplier; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.Sleeper; import org.apache.beam.sdk.values.KV; diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/SerializableSupplier.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/SerializableSupplier.java deleted file mode 100644 index f9ebaf815605..000000000000 --- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/SerializableSupplier.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.io.requestresponse; - -import java.io.Serializable; -import java.util.function.Supplier; - -/** - * A union of a {@link Supplier} and {@link Serializable}, enabling configuration with {@link T} - * types that are not {@link Serializable}. - */ -@FunctionalInterface -public interface SerializableSupplier extends Supplier, Serializable {} diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/WindowedCallShouldBackoff.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/WindowedCallShouldBackoff.java index fbbafeb906f7..ab078154b8c2 100644 --- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/WindowedCallShouldBackoff.java +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/WindowedCallShouldBackoff.java @@ -19,6 +19,7 @@ import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import org.apache.beam.sdk.util.SerializableSupplier; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.joda.time.Duration; import org.joda.time.Instant; diff --git a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RequestResponseIOTest.java b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RequestResponseIOTest.java index f54d3e595b03..4cbadf237336 100644 --- a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RequestResponseIOTest.java +++ b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RequestResponseIOTest.java @@ -43,6 +43,7 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.util.BackOff; +import org.apache.beam.sdk.util.SerializableSupplier; import org.apache.beam.sdk.util.Sleeper; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TypeDescriptor; diff --git a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/WindowedCallShouldBackoffTest.java b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/WindowedCallShouldBackoffTest.java index 5316f251200a..18d452451838 100644 --- a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/WindowedCallShouldBackoffTest.java +++ b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/WindowedCallShouldBackoffTest.java @@ -20,6 +20,7 @@ import static org.apache.beam.sdk.testing.SerializableMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; +import org.apache.beam.sdk.util.SerializableSupplier; import org.joda.time.Duration; import org.junit.Test;