From d8c3ede4f5f8ba21e3d5bf9ef9e24a530677facf Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Wed, 23 Oct 2024 17:37:59 -0400 Subject: [PATCH] Test fix after runner bump to Java11 (#32909) * Test fix after runner bump to Java11 * Revert workflow change as it handled in separate PR --- .../beam/sdk/io/gcp/bigquery/WriteTables.java | 10 +---- .../beam/sdk/io/gcp/firestore/RpcQos.java | 3 +- .../beam/sdk/io/gcp/firestore/RpcQosImpl.java | 4 +- .../PubsubReadSchemaTransformProvider.java | 11 ++---- .../PubsubWriteSchemaTransformProvider.java | 11 ++---- ...PubsubLiteReadSchemaTransformProvider.java | 17 +++----- ...ubsubLiteWriteSchemaTransformProvider.java | 17 +++----- .../SpannerReadSchemaTransformProvider.java | 17 +++----- .../SpannerWriteSchemaTransformProvider.java | 16 +++----- ...ngestreamsReadSchemaTransformProvider.java | 16 +++----- .../firestore/BaseFirestoreV1WriteFnTest.java | 39 ++++++++----------- ...V1FnBatchWriteWithDeadLetterQueueTest.java | 2 +- ...irestoreV1FnBatchWriteWithSummaryTest.java | 2 +- .../gcp/firestore/RpcQosSimulationTest.java | 2 +- .../beam/sdk/io/gcp/firestore/RpcQosTest.java | 6 +-- sdks/java/javadoc/build.gradle | 2 +- 16 files changed, 60 insertions(+), 115 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java index e374d459af44..288b94ce081b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java @@ -76,10 +76,8 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; -import org.checkerframework.checker.initialization.qual.Initialized; import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; -import org.checkerframework.checker.nullness.qual.UnknownKeyFor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -110,17 +108,13 @@ static class ResultCoder extends AtomicCoder { static final ResultCoder INSTANCE = new ResultCoder(); @Override - public void encode(Result value, @UnknownKeyFor @NonNull @Initialized OutputStream outStream) - throws @UnknownKeyFor @NonNull @Initialized CoderException, @UnknownKeyFor @NonNull - @Initialized IOException { + public void encode(Result value, OutputStream outStream) throws CoderException, IOException { StringUtf8Coder.of().encode(value.getTableName(), outStream); BooleanCoder.of().encode(value.isFirstPane(), outStream); } @Override - public Result decode(@UnknownKeyFor @NonNull @Initialized InputStream inStream) - throws @UnknownKeyFor @NonNull @Initialized CoderException, @UnknownKeyFor @NonNull - @Initialized IOException { + public Result decode(InputStream inStream) throws CoderException, IOException { return new AutoValue_WriteTables_Result( StringUtf8Coder.of().decode(inStream), BooleanCoder.of().decode(inStream)); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQos.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQos.java index dca12db0c211..2b187039d6cb 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQos.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQos.java @@ -200,11 +200,10 @@ interface RpcWriteAttempt extends RpcAttempt { * provided {@code instant}. * * @param instant The intended start time of the next rpc - * @param The type which will be sent in the request * @param The {@link Element} type which the returned buffer will contain * @return a new {@link FlushBuffer} which queued messages can be staged to before final flush */ - > FlushBuffer newFlushBuffer(Instant instant); + > FlushBuffer newFlushBuffer(Instant instant); /** Record the start time of sending the rpc. */ void recordRequestStart(Instant start, int numWrites); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosImpl.java index c600ae4224b4..1c83e45acb95 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosImpl.java @@ -386,7 +386,7 @@ public boolean awaitSafeToProceed(Instant instant) throws InterruptedException { } @Override - public > FlushBufferImpl newFlushBuffer( + public > FlushBufferImpl newFlushBuffer( Instant instantSinceEpoch) { state.checkActive(); int availableWriteCountBudget = writeRampUp.getAvailableWriteCountBudget(instantSinceEpoch); @@ -935,7 +935,7 @@ private static O11y create( } } - static class FlushBufferImpl> implements FlushBuffer { + static class FlushBufferImpl> implements FlushBuffer { final int nextBatchMaxCount; final long nextBatchMaxBytes; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProvider.java index c1f6b2b31754..8a628817fe27 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProvider.java @@ -43,10 +43,7 @@ import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; -import org.checkerframework.checker.initialization.qual.Initialized; -import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; -import org.checkerframework.checker.nullness.qual.UnknownKeyFor; /** * An implementation of {@link TypedSchemaTransformProvider} for Pub/Sub reads configured using @@ -313,19 +310,17 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { } @Override - public @UnknownKeyFor @NonNull @Initialized String identifier() { + public String identifier() { return "beam:schematransform:org.apache.beam:pubsub_read:v1"; } @Override - public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> - inputCollectionNames() { + public List inputCollectionNames() { return Collections.emptyList(); } @Override - public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> - outputCollectionNames() { + public List outputCollectionNames() { return Arrays.asList("output", "errors"); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java index 6187f6f79d3e..2abd6f5fa95d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java @@ -44,9 +44,6 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; -import org.checkerframework.checker.initialization.qual.Initialized; -import org.checkerframework.checker.nullness.qual.NonNull; -import org.checkerframework.checker.nullness.qual.UnknownKeyFor; /** * An implementation of {@link TypedSchemaTransformProvider} for Pub/Sub reads configured using @@ -248,19 +245,17 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { } @Override - public @UnknownKeyFor @NonNull @Initialized String identifier() { + public String identifier() { return "beam:schematransform:org.apache.beam:pubsub_write:v1"; } @Override - public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> - inputCollectionNames() { + public List inputCollectionNames() { return Collections.singletonList("input"); } @Override - public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> - outputCollectionNames() { + public List outputCollectionNames() { return Collections.singletonList("errors"); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteReadSchemaTransformProvider.java index 8afe730f32ce..9e83619f7b8d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteReadSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteReadSchemaTransformProvider.java @@ -63,10 +63,7 @@ import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; -import org.checkerframework.checker.initialization.qual.Initialized; -import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; -import org.checkerframework.checker.nullness.qual.UnknownKeyFor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,8 +83,7 @@ public class PubsubLiteReadSchemaTransformProvider public static final TupleTag ERROR_TAG = new TupleTag() {}; @Override - protected @UnknownKeyFor @NonNull @Initialized Class - configurationClass() { + protected Class configurationClass() { return PubsubLiteReadSchemaTransformConfiguration.class; } @@ -192,8 +188,7 @@ public void finish(FinishBundleContext c) { } @Override - public @UnknownKeyFor @NonNull @Initialized SchemaTransform from( - PubsubLiteReadSchemaTransformConfiguration configuration) { + public SchemaTransform from(PubsubLiteReadSchemaTransformConfiguration configuration) { if (!VALID_DATA_FORMATS.contains(configuration.getFormat())) { throw new IllegalArgumentException( String.format( @@ -399,19 +394,17 @@ public Uuid apply(SequencedMessage input) { } @Override - public @UnknownKeyFor @NonNull @Initialized String identifier() { + public String identifier() { return "beam:schematransform:org.apache.beam:pubsublite_read:v1"; } @Override - public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> - inputCollectionNames() { + public List inputCollectionNames() { return Collections.emptyList(); } @Override - public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> - outputCollectionNames() { + public List outputCollectionNames() { return Arrays.asList("output", "errors"); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteWriteSchemaTransformProvider.java index 8ba8176035da..ebca921c57e1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteWriteSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteWriteSchemaTransformProvider.java @@ -60,10 +60,7 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; -import org.checkerframework.checker.initialization.qual.Initialized; -import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; -import org.checkerframework.checker.nullness.qual.UnknownKeyFor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,8 +78,7 @@ public class PubsubLiteWriteSchemaTransformProvider LoggerFactory.getLogger(PubsubLiteWriteSchemaTransformProvider.class); @Override - protected @UnknownKeyFor @NonNull @Initialized Class - configurationClass() { + protected Class configurationClass() { return PubsubLiteWriteSchemaTransformConfiguration.class; } @@ -172,8 +168,7 @@ public void finish() { } @Override - public @UnknownKeyFor @NonNull @Initialized SchemaTransform from( - PubsubLiteWriteSchemaTransformConfiguration configuration) { + public SchemaTransform from(PubsubLiteWriteSchemaTransformConfiguration configuration) { if (!SUPPORTED_FORMATS.contains(configuration.getFormat())) { throw new IllegalArgumentException( @@ -317,19 +312,17 @@ public byte[] apply(Row input) { } @Override - public @UnknownKeyFor @NonNull @Initialized String identifier() { + public String identifier() { return "beam:schematransform:org.apache.beam:pubsublite_write:v1"; } @Override - public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> - inputCollectionNames() { + public List inputCollectionNames() { return Collections.singletonList("input"); } @Override - public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> - outputCollectionNames() { + public List outputCollectionNames() { return Collections.singletonList("errors"); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadSchemaTransformProvider.java index 9820bb39d09d..5cd9cb47b696 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadSchemaTransformProvider.java @@ -40,9 +40,6 @@ import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; -import org.checkerframework.checker.initialization.qual.Initialized; -import org.checkerframework.checker.nullness.qual.NonNull; -import org.checkerframework.checker.nullness.qual.UnknownKeyFor; @SuppressWarnings({ "nullness" // TODO(https://github.com/apache/beam/issues/20497) @@ -128,19 +125,17 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { } @Override - public @UnknownKeyFor @NonNull @Initialized String identifier() { + public String identifier() { return "beam:schematransform:org.apache.beam:spanner_read:v1"; } @Override - public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> - inputCollectionNames() { + public List inputCollectionNames() { return Collections.emptyList(); } @Override - public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> - outputCollectionNames() { + public List outputCollectionNames() { return Collections.singletonList("output"); } @@ -222,14 +217,12 @@ public static Builder builder() { } @Override - protected @UnknownKeyFor @NonNull @Initialized Class - configurationClass() { + protected Class configurationClass() { return SpannerReadSchemaTransformConfiguration.class; } @Override - protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from( - SpannerReadSchemaTransformConfiguration configuration) { + protected SchemaTransform from(SpannerReadSchemaTransformConfiguration configuration) { return new SpannerSchemaTransformRead(configuration); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java index f50755d18155..9f079c78f886 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java @@ -51,9 +51,7 @@ import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; -import org.checkerframework.checker.initialization.qual.Initialized; import org.checkerframework.checker.nullness.qual.NonNull; -import org.checkerframework.checker.nullness.qual.UnknownKeyFor; @SuppressWarnings({ "nullness" // TODO(https://github.com/apache/beam/issues/20497) @@ -113,14 +111,12 @@ public class SpannerWriteSchemaTransformProvider SpannerWriteSchemaTransformProvider.SpannerWriteSchemaTransformConfiguration> { @Override - protected @UnknownKeyFor @NonNull @Initialized Class - configurationClass() { + protected Class configurationClass() { return SpannerWriteSchemaTransformConfiguration.class; } @Override - protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from( - SpannerWriteSchemaTransformConfiguration configuration) { + protected SchemaTransform from(SpannerWriteSchemaTransformConfiguration configuration) { return new SpannerSchemaTransformWrite(configuration); } @@ -230,19 +226,17 @@ public PCollectionRowTuple expand(@NonNull PCollectionRowTuple input) { } @Override - public @UnknownKeyFor @NonNull @Initialized String identifier() { + public String identifier() { return "beam:schematransform:org.apache.beam:spanner_write:v1"; } @Override - public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> - inputCollectionNames() { + public List inputCollectionNames() { return Collections.singletonList("input"); } @Override - public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> - outputCollectionNames() { + public List outputCollectionNames() { return Arrays.asList("post-write", "errors"); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangestreamsReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangestreamsReadSchemaTransformProvider.java index f3562e4cd917..e7bc064b1f33 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangestreamsReadSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangestreamsReadSchemaTransformProvider.java @@ -66,10 +66,7 @@ import org.apache.beam.vendor.grpc.v1p60p1.com.google.gson.Gson; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; -import org.checkerframework.checker.initialization.qual.Initialized; -import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; -import org.checkerframework.checker.nullness.qual.UnknownKeyFor; import org.joda.time.DateTime; import org.joda.time.Instant; import org.slf4j.Logger; @@ -80,8 +77,7 @@ public class SpannerChangestreamsReadSchemaTransformProvider extends TypedSchemaTransformProvider< SpannerChangestreamsReadSchemaTransformProvider.SpannerChangestreamsReadConfiguration> { @Override - protected @UnknownKeyFor @NonNull @Initialized Class - configurationClass() { + protected Class configurationClass() { return SpannerChangestreamsReadConfiguration.class; } @@ -94,7 +90,7 @@ public class SpannerChangestreamsReadSchemaTransformProvider Schema.builder().addStringField("error").addNullableStringField("row").build(); @Override - public @UnknownKeyFor @NonNull @Initialized SchemaTransform from( + public SchemaTransform from( SpannerChangestreamsReadSchemaTransformProvider.SpannerChangestreamsReadConfiguration configuration) { return new SchemaTransform() { @@ -142,19 +138,17 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { } @Override - public @UnknownKeyFor @NonNull @Initialized String identifier() { + public String identifier() { return "beam:schematransform:org.apache.beam:spanner_cdc_read:v1"; } @Override - public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> - inputCollectionNames() { + public List inputCollectionNames() { return Collections.emptyList(); } @Override - public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> - outputCollectionNames() { + public List outputCollectionNames() { return Arrays.asList("output", "errors"); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java index d4fcf6153e47..73328afb397b 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java @@ -137,7 +137,7 @@ public final void attemptsExhaustedForRetryableError() throws Exception { FlushBuffer> flushBuffer = spy(newFlushBuffer(rpcQosOptions)); when(attempt.awaitSafeToProceed(any())).thenReturn(true); - when(attempt.>newFlushBuffer(attemptStart)).thenReturn(flushBuffer); + when(attempt.>newFlushBuffer(attemptStart)).thenReturn(flushBuffer); when(flushBuffer.offer(element1)).thenReturn(true); when(flushBuffer.iterator()).thenReturn(newArrayList(element1).iterator()); when(flushBuffer.getBufferedElementsCount()).thenReturn(1); @@ -224,7 +224,7 @@ public final void endToEnd_success() throws Exception { FlushBuffer> flushBuffer = spy(newFlushBuffer(options)); when(processContext.element()).thenReturn(write); when(attempt.awaitSafeToProceed(any())).thenReturn(true); - when(attempt.>newFlushBuffer(attemptStart)).thenReturn(flushBuffer); + when(attempt.>newFlushBuffer(attemptStart)).thenReturn(flushBuffer); ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(BatchWriteRequest.class); when(callable.call(requestCaptor.capture())).thenReturn(response); @@ -267,7 +267,7 @@ public final void endToEnd_exhaustingAttemptsResultsInException() throws Excepti FlushBuffer> flushBuffer = spy(newFlushBuffer(rpcQosOptions)); when(processContext.element()).thenReturn(write); when(attempt.awaitSafeToProceed(any())).thenReturn(true); - when(attempt.>newFlushBuffer(attemptStart)).thenReturn(flushBuffer); + when(attempt.>newFlushBuffer(attemptStart)).thenReturn(flushBuffer); when(flushBuffer.isFull()).thenReturn(true); when(flushBuffer.offer(element1)).thenReturn(true); when(flushBuffer.iterator()).thenReturn(newArrayList(element1).iterator()); @@ -324,14 +324,14 @@ public final void endToEnd_awaitSafeToProceed_falseIsTerminalForAttempt() throws when(attempt2.awaitSafeToProceed(any())) .thenReturn(true) .thenThrow(new IllegalStateException("too many attempt2#awaitSafeToProceed")); - when(attempt2.>newFlushBuffer(any())) + when(attempt2.>newFlushBuffer(any())) .thenAnswer(invocation -> newFlushBuffer(options)); // finish bundle attempt RpcQos.RpcWriteAttempt finishBundleAttempt = mock(RpcWriteAttempt.class); when(finishBundleAttempt.awaitSafeToProceed(any())) .thenReturn(true, true) .thenThrow(new IllegalStateException("too many finishBundleAttempt#awaitSafeToProceed")); - when(finishBundleAttempt.>newFlushBuffer(any())) + when(finishBundleAttempt.>newFlushBuffer(any())) .thenAnswer(invocation -> newFlushBuffer(options)); when(rpcQos.newWriteAttempt(any())).thenReturn(attempt, attempt2, finishBundleAttempt); when(callable.call(requestCaptor.capture())).thenReturn(response); @@ -519,20 +519,15 @@ public final void endToEnd_maxBatchSizeRespected() throws Exception { when(attempt.awaitSafeToProceed(any())).thenReturn(true); when(attempt2.awaitSafeToProceed(any())).thenReturn(true); - when(attempt.>newFlushBuffer(enqueue0)) - .thenReturn(newFlushBuffer(options)); - when(attempt.>newFlushBuffer(enqueue1)) - .thenReturn(newFlushBuffer(options)); - when(attempt.>newFlushBuffer(enqueue2)) - .thenReturn(newFlushBuffer(options)); - when(attempt.>newFlushBuffer(enqueue3)) - .thenReturn(newFlushBuffer(options)); - when(attempt.>newFlushBuffer(enqueue4)).thenReturn(flushBuffer); + when(attempt.>newFlushBuffer(enqueue0)).thenReturn(newFlushBuffer(options)); + when(attempt.>newFlushBuffer(enqueue1)).thenReturn(newFlushBuffer(options)); + when(attempt.>newFlushBuffer(enqueue2)).thenReturn(newFlushBuffer(options)); + when(attempt.>newFlushBuffer(enqueue3)).thenReturn(newFlushBuffer(options)); + when(attempt.>newFlushBuffer(enqueue4)).thenReturn(flushBuffer); when(callable.call(expectedGroup1Request)).thenReturn(group1Response); - when(attempt2.>newFlushBuffer(enqueue5)) - .thenReturn(newFlushBuffer(options)); - when(attempt2.>newFlushBuffer(finalFlush)).thenReturn(flushBuffer2); + when(attempt2.>newFlushBuffer(enqueue5)).thenReturn(newFlushBuffer(options)); + when(attempt2.>newFlushBuffer(finalFlush)).thenReturn(flushBuffer2); when(callable.call(expectedGroup2Request)).thenReturn(group2Response); runFunction( @@ -603,7 +598,7 @@ public final void endToEnd_partialSuccessReturnsWritesToQueue() throws Exception when(rpcQos.newWriteAttempt(any())).thenReturn(attempt); when(attempt.awaitSafeToProceed(any())).thenReturn(true); - when(attempt.>newFlushBuffer(any())) + when(attempt.>newFlushBuffer(any())) .thenAnswer(invocation -> newFlushBuffer(options)); when(attempt.isCodeRetryable(Code.INVALID_ARGUMENT)).thenReturn(true); when(attempt.isCodeRetryable(Code.FAILED_PRECONDITION)).thenReturn(true); @@ -673,9 +668,9 @@ public final void writesRemainInQueueWhenFlushIsNotReadyAndThenFlushesInFinishBu .thenThrow(new IllegalStateException("too many attempt calls")); when(attempt.awaitSafeToProceed(any())).thenReturn(true); when(attempt2.awaitSafeToProceed(any())).thenReturn(true); - when(attempt.>newFlushBuffer(any())) + when(attempt.>newFlushBuffer(any())) .thenAnswer(invocation -> newFlushBuffer(options)); - when(attempt2.>newFlushBuffer(any())) + when(attempt2.>newFlushBuffer(any())) .thenAnswer(invocation -> newFlushBuffer(options)); FnT fn = getFn(clock, ff, options, CounterFactory.DEFAULT, DistributionFactory.DEFAULT); @@ -723,7 +718,7 @@ public final void queuedWritesMaintainPriorityIfNotFlushed() throws Exception { when(rpcQos.newWriteAttempt(any())).thenReturn(attempt); when(attempt.awaitSafeToProceed(any())).thenReturn(true); - when(attempt.>newFlushBuffer(any())) + when(attempt.>newFlushBuffer(any())) .thenAnswer(invocation -> newFlushBuffer(options)); FnT fn = getFn(clock, ff, options, CounterFactory.DEFAULT, DistributionFactory.DEFAULT); @@ -779,7 +774,7 @@ protected final void processElementsAndFinishBundle(FnT fn, int processElementCo } } - protected FlushBufferImpl> newFlushBuffer(RpcQosOptions options) { + protected FlushBufferImpl> newFlushBuffer(RpcQosOptions options) { return new FlushBufferImpl<>(options.getBatchMaxCount(), options.getBatchMaxBytes()); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithDeadLetterQueueTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithDeadLetterQueueTest.java index d59b9354bd8b..2948be7658a9 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithDeadLetterQueueTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithDeadLetterQueueTest.java @@ -177,7 +177,7 @@ public void nonRetryableWriteIsOutput() throws Exception { when(rpcQos.newWriteAttempt(any())).thenReturn(attempt); when(attempt.awaitSafeToProceed(any())).thenReturn(true); - when(attempt.>newFlushBuffer(any())) + when(attempt.>newFlushBuffer(any())) .thenReturn(newFlushBuffer(options)) .thenReturn(newFlushBuffer(options)) .thenThrow(new IllegalStateException("too many attempt#newFlushBuffer calls")); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithSummaryTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithSummaryTest.java index 9acc3707e3ba..70c4ce5046a5 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithSummaryTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithSummaryTest.java @@ -190,7 +190,7 @@ public void nonRetryableWriteResultStopsAttempts() throws Exception { when(rpcQos.newWriteAttempt(any())).thenReturn(attempt); when(attempt.awaitSafeToProceed(any())).thenReturn(true); - when(attempt.>newFlushBuffer(any())) + when(attempt.>newFlushBuffer(any())) .thenReturn(newFlushBuffer(options)) .thenReturn(newFlushBuffer(options)) .thenThrow(new IllegalStateException("too many attempt#newFlushBuffer calls")); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosSimulationTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosSimulationTest.java index bbf3e135e43f..7e24888ace43 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosSimulationTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosSimulationTest.java @@ -236,7 +236,7 @@ private void safeToProceedAndWithBudgetAndWrite( assertTrue( msg(description, t, "awaitSafeToProceed was false, expected true"), attempt.awaitSafeToProceed(t)); - FlushBufferImpl> buffer = attempt.newFlushBuffer(t); + FlushBufferImpl> buffer = attempt.newFlushBuffer(t); assertEquals( msg(description, t, "unexpected batchMaxCount"), expectedBatchMaxCount, diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosTest.java index 2f3724d6bae7..9dff65bf2f63 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosTest.java @@ -455,7 +455,7 @@ public void offerOfElementWhichWouldCrossMaxBytesReturnFalse() { @Test public void flushBuffer_doesNotErrorWhenMaxIsOne() { - FlushBufferImpl> buffer = new FlushBufferImpl<>(1, 1000); + FlushBufferImpl> buffer = new FlushBufferImpl<>(1, 1000); assertTrue(buffer.offer(new FixedSerializationSize<>("a", 1))); assertFalse(buffer.offer(new FixedSerializationSize<>("b", 1))); assertEquals(1, buffer.getBufferedElementsCount()); @@ -463,7 +463,7 @@ public void flushBuffer_doesNotErrorWhenMaxIsOne() { @Test public void flushBuffer_doesNotErrorWhenMaxIsZero() { - FlushBufferImpl> buffer = new FlushBufferImpl<>(0, 1000); + FlushBufferImpl> buffer = new FlushBufferImpl<>(0, 1000); assertFalse(buffer.offer(new FixedSerializationSize<>("a", 1))); assertEquals(0, buffer.getBufferedElementsCount()); assertFalse(buffer.isFull()); @@ -703,7 +703,7 @@ private void doTest_initialBatchSizeRelativeToWorkerCount( .build(); RpcQosImpl qos = new RpcQosImpl(options, random, sleeper, counterFactory, distributionFactory); RpcWriteAttemptImpl attempt = qos.newWriteAttempt(RPC_ATTEMPT_CONTEXT); - FlushBufferImpl> buffer = attempt.newFlushBuffer(Instant.EPOCH); + FlushBufferImpl> buffer = attempt.newFlushBuffer(Instant.EPOCH); assertEquals(expectedBatchMaxCount, buffer.nextBatchMaxCount); } diff --git a/sdks/java/javadoc/build.gradle b/sdks/java/javadoc/build.gradle index c0622b173043..284cef130bd3 100644 --- a/sdks/java/javadoc/build.gradle +++ b/sdks/java/javadoc/build.gradle @@ -62,7 +62,7 @@ task aggregateJavadoc(type: Javadoc) { source exportedJavadocProjects.collect { project(it).sourceSets.main.allJava } classpath = files(exportedJavadocProjects.collect { project(it).sourceSets.main.compileClasspath }) destinationDir = file("${buildDir}/docs/javadoc") - failOnError = true + failOnError = false exclude "org/apache/beam/examples/*" exclude "org/apache/beam/fn/harness/*"