diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingAggregationsTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingAggregationsTranslators.java
index 1579a3d4affa..1683ced890c7 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingAggregationsTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingAggregationsTranslators.java
@@ -17,6 +17,11 @@
  */
 package org.apache.beam.runners.flink;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
@@ -58,14 +63,9 @@
 import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
 import org.apache.flink.util.Collector;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 public class FlinkStreamingAggregationsTranslators {
-  public static class ConcatenateAsIterable<T> extends Combine.CombineFn<T, Iterable<T>, Iterable<T>> {
+  public static class ConcatenateAsIterable<T>
+      extends Combine.CombineFn<T, Iterable<T>, Iterable<T>> {
     @Override
     public Iterable<T> createAccumulator() {
       return new ArrayList<>();
@@ -214,8 +214,7 @@ WindowDoFnOperator<K, InputT, OutputT> getWindowedAggregateDoFnOperato
         WindowedValue.getFullCoder(workItemCoder, windowingStrategy.getWindowFn().windowCoder());
 
     // Key selector
-    WorkItemKeySelector<K, InputT> workItemKeySelector =
-        new WorkItemKeySelector<>(keyCoder);
+    WorkItemKeySelector<K, InputT> workItemKeySelector = new WorkItemKeySelector<>(keyCoder);
 
     return new WindowDoFnOperator<>(
         reduceFn,
@@ -257,29 +256,30 @@ WindowDoFnOperator<K, InputT, OutputT> getWindowedAggregateDoFnOperato
   }
 
   private static class FlattenIterable<K, InputT>
-      implements FlatMapFunction<WindowedValue<KV<K, Iterable<Iterable<InputT>>>>, WindowedValue<KV<K, Iterable<InputT>>>> {
+      implements FlatMapFunction<
+          WindowedValue<KV<K, Iterable<Iterable<InputT>>>>,
+          WindowedValue<KV<K, Iterable<InputT>>>> {
     @Override
     public void flatMap(
         WindowedValue<KV<K, Iterable<Iterable<InputT>>>> w,
-        Collector<WindowedValue<KV<K, Iterable<InputT>>>> collector) throws Exception {
-      WindowedValue<KV<K, Iterable<InputT>>> flattened = w.withValue(
-          KV.of(
-              w.getValue().getKey(),
-              Iterables.concat(w.getValue().getValue())));
+        Collector<WindowedValue<KV<K, Iterable<InputT>>>> collector)
+        throws Exception {
+      WindowedValue<KV<K, Iterable<InputT>>> flattened =
+          w.withValue(KV.of(w.getValue().getKey(), Iterables.concat(w.getValue().getValue())));
       collector.collect(flattened);
     }
   }
 
   public static <K, InputT, AccumT, OutputT>
-      SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> getBatchCombinePerKeyOperator(
-      FlinkStreamingTranslationContext context,
-      PCollection<KV<K, InputT>> input,
-      Map<Integer, PCollectionView<?>> sideInputTagMapping,
-      List<PCollectionView<?>> sideInputs,
-      Coder<WindowedValue<KV<K, AccumT>>> windowedAccumCoder,
-      CombineFnBase.GlobalCombineFn<InputT, AccumT, ?> combineFn,
-      WindowDoFnOperator<K, AccumT, OutputT> finalDoFnOperator,
-      TypeInformation<WindowedValue<KV<K, OutputT>>> outputTypeInfo){
+          SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> getBatchCombinePerKeyOperator(
+              FlinkStreamingTranslationContext context,
+              PCollection<KV<K, InputT>> input,
+              Map<Integer, PCollectionView<?>> sideInputTagMapping,
+              List<PCollectionView<?>> sideInputs,
+              Coder<WindowedValue<KV<K, AccumT>>> windowedAccumCoder,
+              CombineFnBase.GlobalCombineFn<InputT, AccumT, ?> combineFn,
+              WindowDoFnOperator<K, AccumT, OutputT> finalDoFnOperator,
+              TypeInformation<WindowedValue<KV<K, OutputT>>> outputTypeInfo) {
 
     String fullName = FlinkStreamingTransformTranslators.getCurrentTransformName(context);
     DataStream<WindowedValue<KV<K, InputT>>> inputDataStream = context.getInputDataStream(input);
@@ -314,50 +314,55 @@ SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> getBatchCombinePerKeyO
     if (sideInputs.isEmpty()) {
       return inputDataStream
           .transform(partialName, partialTypeInfo, partialDoFnOperator)
-          .uid(partialName).name(partialName)
+          .uid(partialName)
+          .name(partialName)
           .keyBy(accumKeySelector)
           .transform(fullName, outputTypeInfo, finalDoFnOperator)
-          .uid(fullName).name(fullName);
+          .uid(fullName)
+          .name(fullName);
     } else {
       Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformSideInputs =
           FlinkStreamingTransformTranslators.transformSideInputs(sideInputs, context);
 
       TwoInputTransformation<
-          WindowedValue<KV<K, InputT>>, RawUnionValue, WindowedValue<KV<K, AccumT>>> rawPartialFlinkTransform =
-              new TwoInputTransformation<>(
-                  inputDataStream.getTransformation(),
-                  transformSideInputs.f1.broadcast().getTransformation(),
-                  partialName,
-                  partialDoFnOperator,
-                  partialTypeInfo,
-                  inputDataStream.getParallelism());
+              WindowedValue<KV<K, InputT>>, RawUnionValue, WindowedValue<KV<K, AccumT>>>
+          rawPartialFlinkTransform =
+              new TwoInputTransformation<>(
+                  inputDataStream.getTransformation(),
+                  transformSideInputs.f1.broadcast().getTransformation(),
+                  partialName,
+                  partialDoFnOperator,
+                  partialTypeInfo,
+                  inputDataStream.getParallelism());
 
       SingleOutputStreamOperator<WindowedValue<KV<K, AccumT>>> partialyCombinedStream =
           new SingleOutputStreamOperator<WindowedValue<KV<K, AccumT>>>(
               inputDataStream.getExecutionEnvironment(),
               rawPartialFlinkTransform) {}; // we have to cheat around the ctor being protected
 
-      inputDataStream.getExecutionEnvironment().addOperator(rawPartialFlinkTransform);
+      inputDataStream.getExecutionEnvironment().addOperator(rawPartialFlinkTransform);
 
-      return buildTwoInputStream(
-          partialyCombinedStream.keyBy(accumKeySelector),
-          transformSideInputs.f1,
-          fullName,
-          finalDoFnOperator,
-          outputTypeInfo);
+      return buildTwoInputStream(
+          partialyCombinedStream.keyBy(accumKeySelector),
+          transformSideInputs.f1,
+          fullName,
+          finalDoFnOperator,
+          outputTypeInfo);
     }
   }
 
   /**
-   * Creates a two-steps GBK operation. Elements are first aggregated locally to save on serialized size since in batch
-   * it's very likely that all the elements will be within the same window and pane.
-   * The only difference with batchCombinePerKey is the nature of the SystemReduceFn used. It uses SystemReduceFn.buffering()
-   * instead of SystemReduceFn.combining() so that new element can simply be appended without accessing the existing state.
+   * Creates a two-steps GBK operation. Elements are first aggregated locally to save on serialized
+   * size since in batch it's very likely that all the elements will be within the same window and
+   * pane. The only difference with batchCombinePerKey is the nature of the SystemReduceFn used. It
+   * uses SystemReduceFn.buffering() instead of SystemReduceFn.combining() so that new element can
+   * simply be appended without accessing the existing state.
   */
-  public static SingleOutputStreamOperator<WindowedValue<KV<K, Iterable<InputT>>>> batchGroupByKey(
-      FlinkStreamingTranslationContext context,
-      PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>> transform) {
+  public static <K, InputT>
+      SingleOutputStreamOperator<WindowedValue<KV<K, Iterable<InputT>>>> batchGroupByKey(
+          FlinkStreamingTranslationContext context,
+          PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>> transform) {
 
     Map<Integer, PCollectionView<?>> sideInputTagMapping = new HashMap<>();
     List<PCollectionView<?>> sideInputs = Collections.emptyList();
@@ -372,7 +377,8 @@ public static SingleOutputStreamOperator<WindowedValue<KV<K, Iterable<InputT>>>
     Coder<Iterable<InputT>> accumulatorCoder = IterableCoder.of(inputKvCoder.getValueCoder());
 
-    KvCoder<K, Iterable<InputT>> accumKvCoder = KvCoder.of(inputKvCoder.getKeyCoder(), accumulatorCoder);
+    KvCoder<K, Iterable<InputT>> accumKvCoder =
+        KvCoder.of(inputKvCoder.getKeyCoder(), accumulatorCoder);
 
     Coder<WindowedValue<KV<K, Iterable<InputT>>>> windowedAccumCoder =
         WindowedValue.getFullCoder(
@@ -380,50 +386,55 @@ public static SingleOutputStreamOperator<WindowedValue<KV<K, Iterable<InputT>>>
     Coder<WindowedValue<KV<K, Iterable<Iterable<InputT>>>>> outputCoder =
         WindowedValue.getFullCoder(
-            KvCoder.of(inputKvCoder.getKeyCoder(), IterableCoder.of(accumulatorCoder)) , input.getWindowingStrategy().getWindowFn().windowCoder());
+            KvCoder.of(inputKvCoder.getKeyCoder(), IterableCoder.of(accumulatorCoder)),
+            input.getWindowingStrategy().getWindowFn().windowCoder());
 
     TypeInformation<WindowedValue<KV<K, Iterable<Iterable<InputT>>>>> accumulatedTypeInfo =
         new CoderTypeInformation<>(
-            WindowedValue.getFullCoder(
-                KvCoder.of(inputKvCoder.getKeyCoder(), IterableCoder.of(IterableCoder.of(inputKvCoder.getValueCoder()))), input.getWindowingStrategy().getWindowFn().windowCoder()),
+            WindowedValue.getFullCoder(
+                KvCoder.of(
+                    inputKvCoder.getKeyCoder(),
+                    IterableCoder.of(IterableCoder.of(inputKvCoder.getValueCoder()))),
+                input.getWindowingStrategy().getWindowFn().windowCoder()),
             serializablePipelineOptions);
 
     // final aggregation
     WindowDoFnOperator<K, Iterable<InputT>, Iterable<Iterable<InputT>>> finalDoFnOperator =
-        getWindowedAccumulateDoFnOperator(
-            context,
-            transform,
-            accumKvCoder,
-            outputCoder,
-            sideInputTagMapping,
-            sideInputs);
-
-    return
-        getBatchCombinePerKeyOperator(
-            context,
-            input,
-            sideInputTagMapping,
-            sideInputs,
-            windowedAccumCoder,
-            new ConcatenateAsIterable<>(),
-            finalDoFnOperator,
-            accumulatedTypeInfo
-        )
-        .flatMap(new FlattenIterable<>(), outputTypeInfo)
-        .name("concatenate");
+        getWindowedAccumulateDoFnOperator(
+            context, transform, accumKvCoder, outputCoder, sideInputTagMapping, sideInputs);
+
+    return getBatchCombinePerKeyOperator(
+            context,
+            input,
+            sideInputTagMapping,
+            sideInputs,
+            windowedAccumCoder,
+            new ConcatenateAsIterable<>(),
+            finalDoFnOperator,
+            accumulatedTypeInfo)
+        .flatMap(new FlattenIterable<>(), outputTypeInfo)
+        .name("concatenate");
   }
 
-  private static WindowDoFnOperator<K, Iterable<InputT>, Iterable<Iterable<InputT>>> getWindowedAccumulateDoFnOperator(
-      FlinkStreamingTranslationContext context,
-      PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>> transform,
-      KvCoder<K, Iterable<InputT>> accumKvCoder,
-      Coder<WindowedValue<KV<K, Iterable<Iterable<InputT>>>>> outputCoder,
-      Map<Integer, PCollectionView<?>> sideInputTagMapping,
-      List<PCollectionView<?>> sideInputs) {
+  private static <K, InputT>
+      WindowDoFnOperator<K, Iterable<InputT>, Iterable<Iterable<InputT>>>
+          getWindowedAccumulateDoFnOperator(
+              FlinkStreamingTranslationContext context,
+              PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>>
+                  transform,
+              KvCoder<K, Iterable<InputT>> accumKvCoder,
+              Coder<WindowedValue<KV<K, Iterable<Iterable<InputT>>>>> outputCoder,
+              Map<Integer, PCollectionView<?>> sideInputTagMapping,
+              List<PCollectionView<?>> sideInputs) {
 
-    // Combining fn
-    SystemReduceFn<K, Iterable<InputT>, Iterable<Iterable<InputT>>, Iterable<Iterable<InputT>>, BoundedWindow> reduceFn =
-        SystemReduceFn.buffering(accumKvCoder.getValueCoder());
+    // Combining fn
+    SystemReduceFn<
+            K,
+            Iterable<InputT>,
+            Iterable<Iterable<InputT>>,
+            Iterable<Iterable<InputT>>,
+            BoundedWindow>
+        reduceFn = SystemReduceFn.buffering(accumKvCoder.getValueCoder());
 
     return getWindowedAggregateDoFnOperator(
         context, transform, accumKvCoder, outputCoder, reduceFn, sideInputTagMapping, sideInputs);
@@ -482,8 +493,7 @@ SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> batchCombinePerKey(
         windowedAccumCoder,
         combineFn,
         finalDoFnOperator,
-        outputTypeInfo
-    );
+        outputTypeInfo);
   }
 
   @SuppressWarnings({
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
index 8f0e6db26dab..0607838987f1 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
@@ -27,7 +27,6 @@
 import java.util.Objects;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.beam.runners.flink.adapter.FlinkKey;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
index 901ab1c672dc..a74be9f7e9e0 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
@@ -434,8 +434,7 @@ private SingleOutputStreamOperator<WindowedValue<KV<K, Iterable<V>>>> add
         new WorkItemKeySelector<>(inputElementCoder.getKeyCoder());
 
     KeyedStream<WindowedValue<KV<K, V>>, FlinkKey> keyedWorkItemStream =
-        inputDataStream.keyBy(
-            new KvToFlinkKeyKeySelector(inputElementCoder.getKeyCoder()));
+        inputDataStream.keyBy(new KvToFlinkKeyKeySelector(inputElementCoder.getKeyCoder()));
 
     SystemReduceFn<K, V, Iterable<V>, Iterable<V>, BoundedWindow> reduceFn =
         SystemReduceFn.buffering(inputElementCoder.getValueCoder());
@@ -829,8 +828,7 @@ private void translateExecutableStage(
     }
     if (stateful) {
       keyCoder = ((KvCoder) valueCoder).getKeyCoder();
-      keySelector =
-          new KvToFlinkKeyKeySelector(keyCoder);
+      keySelector = new KvToFlinkKeyKeySelector(keyCoder);
     } else {
       // For an SDF, we know that the input element should be
       // KV<KV<element, KV<restriction, watermarkState>>, size>. We are going to use the element
@@ -844,8 +842,7 @@ private void translateExecutableStage(
               valueCoder.getClass().getSimpleName()));
         }
         keyCoder = ((KvCoder) ((KvCoder) valueCoder).getKeyCoder()).getKeyCoder();
-        keySelector =
-            new SdfFlinkKeyKeySelector(keyCoder);
+        keySelector = new SdfFlinkKeyKeySelector(keyCoder);
       }
       inputDataStream = inputDataStream.keyBy(keySelector);
     }
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 35a08eb54115..36cf035a33be 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -105,7 +105,6 @@
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 import org.apache.flink.configuration.Configuration;
@@ -591,8 +590,7 @@ static void translateParDo(
       // Based on the fact that the signature is stateful, DoFnSignatures ensures
       // that it is also keyed
       keyCoder = ((KvCoder) input.getCoder()).getKeyCoder();
-      keySelector =
-          new KvToFlinkKeyKeySelector<>(keyCoder);
+      keySelector = new KvToFlinkKeyKeySelector<>(keyCoder);
 
       final PTransform<?, PCollection<InputT>> producer = context.getProducer(input);
       final String previousUrn =
           producer != null
@@ -609,8 +607,7 @@ static void translateParDo(
     } else if (doFn instanceof SplittableParDoViaKeyedWorkItems.ProcessFn) {
       // we know that it is keyed on byte[]
       keyCoder = ByteArrayCoder.of();
-      keySelector =
-          new WorkItemKeySelector<>(keyCoder);
+      keySelector = new WorkItemKeySelector<>(keyCoder);
       stateful = true;
     }
@@ -962,10 +959,7 @@ public void translateNode(
       SingleOutputStreamOperator<WindowedValue<KV<K, Iterable<InputT>>>> outDataStream;
       // Pre-aggregate before shuffle similar to group combine
      if (!context.isStreaming()) {
-        outDataStream =
-            FlinkStreamingAggregationsTranslators.batchGroupByKey(
-                context,
-                transform);
+        outDataStream = FlinkStreamingAggregationsTranslators.batchGroupByKey(context, transform);
       } else {
         // No pre-aggregation in Streaming mode.
         KvToFlinkKeyKeySelector<K, InputT> keySelector =
@@ -1046,8 +1040,7 @@ public void translateNode(
       List<PCollectionView<?>> sideInputs = ((Combine.PerKey) transform).getSideInputs();
 
       KeyedStream<WindowedValue<KV<K, InputT>>, FlinkKey> keyedStream =
-          inputDataStream.keyBy(
-              new KvToFlinkKeyKeySelector<>(keyCoder));
+          inputDataStream.keyBy(new KvToFlinkKeyKeySelector<>(keyCoder));
 
       if (sideInputs.isEmpty()) {
         SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> outDataStream;
@@ -1147,8 +1140,7 @@ public void translateNode(
               .name("ToKeyedWorkItem");
 
       KeyedStream<WindowedValue<KeyedWorkItem<K, InputT>>, FlinkKey> keyedWorkItemStream =
-          workItemStream.keyBy(
-              new WorkItemKeySelector<>(inputKvCoder.getKeyCoder()));
+          workItemStream.keyBy(new WorkItemKeySelector<>(inputKvCoder.getKeyCoder()));
 
       context.setOutputDataStream(context.getOutput(transform), keyedWorkItemStream);
     }
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/FlinkKey.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/FlinkKey.java
index cf3871f68a1c..6a5e8d0458f3 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/FlinkKey.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/FlinkKey.java
@@ -17,6 +17,11 @@
  */
 package org.apache.beam.runners.flink.adapter;
 
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import javax.annotation.Nullable;
 import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkKeyUtils;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
@@ -26,12 +31,6 @@
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.Value;
 
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
-
 public class FlinkKey implements Value {
 
   private final CoderTypeSerializer<byte[]> serializer;
@@ -77,7 +76,7 @@ public <K> K getKey(Coder<K> coder) {
   @Override
   public int hashCode() {
-//    return underlying.hashCode();
+    // return underlying.hashCode();
     return Hashing.murmur3_128().hashBytes(underlying.array()).asInt();
   }
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index c02d7d9c99ea..53e09f3f818c 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -562,8 +562,7 @@ class SdfFlinkTimerInternalsFactory implements TimerInternalsFactory<InputT> {
     @Override
     public TimerInternals timerInternalsForKey(InputT key) {
       try {
-        FlinkKey encodedKey =
-            (FlinkKey) keySelector.getKey(WindowedValue.valueInGlobalWindow(key));
+        FlinkKey encodedKey = (FlinkKey) keySelector.getKey(WindowedValue.valueInGlobalWindow(key));
         return new SdfFlinkTimerInternals(encodedKey);
       } catch (Exception e) {
         throw new RuntimeException("Couldn't get a timer internals", e);
@@ -659,8 +658,7 @@ class SdfFlinkStateInternalsFactory implements StateInternalsFactory<InputT> {
     @Override
     public StateInternals stateInternalsForKey(InputT key) {
       try {
-        FlinkKey encodedKey =
-            (FlinkKey) keySelector.getKey(WindowedValue.valueInGlobalWindow(key));
+        FlinkKey encodedKey = (FlinkKey) keySelector.getKey(WindowedValue.valueInGlobalWindow(key));
         return new SdfFlinkStateInternals(encodedKey);
       } catch (Exception e) {
         throw new RuntimeException("Couldn't get a state internals", e);
       }
@@ -1259,7 +1257,10 @@ void cleanupState(StateInternals stateInternals, Consumer<FlinkKey> keyContextCo
       cleanupTimer.setCleanupTimer(window);
     } else {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("State cleanup for {} {}", Arrays.toString(kv.getKey().getSerializedKey().array()), window);
+        LOG.debug(
+            "State cleanup for {} {}",
+            Arrays.toString(kv.getKey().getSerializedKey().array()),
+            window);
       }
       // No more timers (finally!). Time to clean up.
       for (String userState : userStateNames) {
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SdfFlinkKeyKeySelector.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SdfFlinkKeyKeySelector.java
index 176a585e993d..b316726e74f8 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SdfFlinkKeyKeySelector.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SdfFlinkKeyKeySelector.java
@@ -17,10 +17,7 @@
  */
 package org.apache.beam.runners.flink.translation.wrappers.streaming;
 
-import java.nio.ByteBuffer;
-import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.flink.adapter.FlinkKey;
-import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
@@ -32,8 +29,8 @@
 /**
  * {@link KeySelector} that retrieves a key from a {@code KV<KV<element, KV<restriction,
  * watermarkState>>, size>}. This will return the element as encoded by the provided {@link Coder}
- * in a {@link FlinkKey}. This ensures that all key comparisons/hashing happen on the encoded
- * form. Note that the reason we don't use the whole {@code KV<KV<element, KV<restriction,
+ * in a {@link FlinkKey}. This ensures that all key comparisons/hashing happen on the encoded form.
+ * Note that the reason we don't use the whole {@code KV<KV<element, KV<restriction,
  * watermarkState>>, Double>} as the key is when checkpoint happens, we will get different
  * restriction/watermarkState/size, which Flink treats as a new key. Using new key to set state and
  * timer may cause defined behavior.
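
[Reviewer note, not part of the patch.] The selectors touched above (KvToFlinkKeyKeySelector, SdfFlinkKeyKeySelector, WorkItemKeySelector) all funnel keys through FlinkKey, so comparison, hashing, and key-group assignment happen on the coder-encoded bytes rather than on Java object identity. Below is a minimal sketch of that contract using only calls that appear in this patch (FlinkKey.of, getKey, the murmur3-based hashCode); the assertions restate what FlinkKeyTest verifies, and it assumes FlinkKey defines equals over the same underlying bytes as its hashCode override, which the patch does not show.

import org.apache.beam.runners.flink.adapter.FlinkKey;
import org.apache.beam.sdk.coders.VarIntCoder;

public class FlinkKeyContractSketch {
  public static void main(String[] args) {
    // Two keys built from the same value and coder encode to the same bytes.
    FlinkKey a = FlinkKey.of(42, VarIntCoder.of());
    FlinkKey b = FlinkKey.of(42, VarIntCoder.of());

    // Same encoded bytes => same murmur3_128-derived hash => same Flink key group,
    // regardless of which worker or JVM produced the key.
    assert a.hashCode() == b.hashCode();
    assert a.equals(b); // assumption: equals is defined on the encoded bytes

    // The original value is recovered by decoding with the same coder.
    Integer roundTripped = a.getKey(VarIntCoder.of());
    assert roundTripped == 42;
  }
}

This is also why the hashCode change in FlinkKey.java above hashes underlying.array() instead of delegating to ByteBuffer.hashCode(): the murmur3 spread is what the checkDistribution test in FlinkKeyTest measures across key groups.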
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/adapter/FlinkKeyTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/adapter/FlinkKeyTest.java
index b0fadc0c07eb..649332c1e48f 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/adapter/FlinkKeyTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/adapter/FlinkKeyTest.java
@@ -17,6 +17,16 @@
  */
 package org.apache.beam.runners.flink.adapter;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.not;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -27,17 +37,6 @@
 import org.hamcrest.core.IsInstanceOf;
 import org.junit.Test;
 
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.not;
-
 public class FlinkKeyTest {
   @Test
   public void testIsRecognizedAsValue() {
@@ -48,7 +47,8 @@ public void testIsRecognizedAsValue() {
 
     assertThat(tpe, IsInstanceOf.instanceOf(ValueTypeInfo.class));
 
-    TypeInformation<Tuple2<FlinkKey, byte[]>> tupleTpe = TypeExtractor.getForObject(Tuple2.of(key, bs));
+    TypeInformation<Tuple2<FlinkKey, byte[]>> tupleTpe =
+        TypeExtractor.getForObject(Tuple2.of(key, bs));
 
     assertThat(tupleTpe, not(IsInstanceOf.instanceOf(GenericTypeInfo.class)));
   }
@@ -67,10 +67,11 @@ public void testIsConsistent() {
 
   private void checkDistribution(int numKeys) {
     int paralellism = 2100;
-    Set<Integer> hashcodes = IntStream.range(0, numKeys)
-        .mapToObj(i -> FlinkKey.of(i, VarIntCoder.of()))
-        .map(k -> k.hashCode())
-        .collect(Collectors.toSet());
+    Set<Integer> hashcodes =
+        IntStream.range(0, numKeys)
+            .mapToObj(i -> FlinkKey.of(i, VarIntCoder.of()))
+            .map(k -> k.hashCode())
+            .collect(Collectors.toSet());
 
     Set<Integer> keyGroups =
         hashcodes.stream()
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
index 95c255159e6f..2324a262acc0 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
@@ -40,7 +40,6 @@
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -212,7 +211,8 @@ public static KeyedStateBackend<FlinkKey> createStateBackend() throws Exception
   private static void changeKey(KeyedStateBackend<FlinkKey> keyedStateBackend)
       throws CoderException {
     keyedStateBackend.setCurrentKey(
-        FlinkKey.of(ByteBuffer.wrap(
-            CoderUtils.encodeToByteArray(StringUtf8Coder.of(), UUID.randomUUID().toString()))));
+        FlinkKey.of(
+            ByteBuffer.wrap(
+                CoderUtils.encodeToByteArray(StringUtf8Coder.of(), UUID.randomUUID().toString()))));
   }
 }
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
index 7db97769d97a..f0d8816bdeab 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
@@ -376,9 +376,7 @@ public void onProcessingTime(OnTimerContext context) {
     OneInputStreamOperatorTestHarness<WindowedValue<Integer>, WindowedValue<Integer>> testHarness =
         new KeyedOneInputStreamOperatorTestHarness<>(
-            doFnOperator,
-            keySelector,
-            ValueTypeInfo.of(FlinkKey.class));
+            doFnOperator, keySelector, ValueTypeInfo.of(FlinkKey.class));
 
     testHarness.setup(
         new CoderTypeSerializer<>(
@@ -544,9 +542,7 @@ void emitWatermarkIfHoldChanged(long currentWatermarkHold) {
             WindowedValue<KV<String, Long>>, WindowedValue<KV<String, Long>>>
         testHarness =
             new KeyedOneInputStreamOperatorTestHarness<>(
-                doFnOperator,
-                keySelector,
-                ValueTypeInfo.of(FlinkKey.class));
+                doFnOperator, keySelector, ValueTypeInfo.of(FlinkKey.class));
 
     testHarness.setup();
@@ -638,9 +634,7 @@ public void processElement(ProcessContext context) {
     OneInputStreamOperatorTestHarness<WindowedValue<Integer>, WindowedValue<Integer>> testHarness =
         new KeyedOneInputStreamOperatorTestHarness<>(
-            doFnOperator,
-            keySelector,
-            ValueTypeInfo.of(FlinkKey.class));
+            doFnOperator, keySelector, ValueTypeInfo.of(FlinkKey.class));
 
     testHarness.open();
@@ -887,9 +881,7 @@ outputTag, coder, new SerializablePipelineOptions(options)),
             Collections.emptyMap());
 
     return new KeyedOneInputStreamOperatorTestHarness<>(
-        doFnOperator,
-        keySelector,
-        ValueTypeInfo.of(FlinkKey.class));
+        doFnOperator, keySelector, ValueTypeInfo.of(FlinkKey.class));
   }
 
   @Test
@@ -943,10 +935,7 @@ outputTag, coder, new SerializablePipelineOptions(FlinkPipelineOptions.defaults(
       // we use a dummy key for the second input since it is considered to be broadcast
       testHarness =
           new KeyedTwoInputStreamOperatorTestHarness<>(
-              doFnOperator,
-              keySelector,
-              null,
-              ValueTypeInfo.of(FlinkKey.class));
+              doFnOperator, keySelector, null, ValueTypeInfo.of(FlinkKey.class));
     }
 
     testHarness.open();
@@ -1035,16 +1024,14 @@ public void processElement(
     TupleTag<KV<String, Long>> outputTag = new TupleTag<>("main-output");
 
     StringUtf8Coder keyCoder = StringUtf8Coder.of();
-    KvToFlinkKeyKeySelector<String, Long> keySelector =
-        new KvToFlinkKeyKeySelector<>(keyCoder);
+    KvToFlinkKeyKeySelector<String, Long> keySelector = new KvToFlinkKeyKeySelector<>(keyCoder);
     KvCoder<String, Long> coder = KvCoder.of(keyCoder, VarLongCoder.of());
 
     FullWindowedValueCoder<KV<String, Long>> kvCoder =
         WindowedValue.getFullCoder(coder, windowingStrategy.getWindowFn().windowCoder());
 
-    TypeInformation<FlinkKey> keyCoderInfo =
-        ValueTypeInfo.of(FlinkKey.class);
+    TypeInformation<FlinkKey> keyCoderInfo = ValueTypeInfo.of(FlinkKey.class);
 
     OneInputStreamOperatorTestHarness<
             WindowedValue<KV<String, Long>>, WindowedValue<KV<String, Long>>>
@@ -1666,8 +1653,7 @@ public void finishBundle(FinishBundleContext context) {
   public void testBundleKeyed() throws Exception {
 
     StringUtf8Coder keyCoder = StringUtf8Coder.of();
-    KvToFlinkKeyKeySelector<String, String> keySelector =
-        new KvToFlinkKeyKeySelector<>(keyCoder);
+    KvToFlinkKeyKeySelector<String, String> keySelector = new KvToFlinkKeyKeySelector<>(keyCoder);
     KvCoder<String, String> kvCoder = KvCoder.of(keyCoder, StringUtf8Coder.of());
     WindowedValue.ValueOnlyWindowedValueCoder<KV<String, String>> windowedValueCoder =
         WindowedValue.getValueOnlyCoder(kvCoder);
@@ -2114,8 +2100,7 @@ public void testExactlyOnceBufferingKeyed() throws Exception {
     TupleTag<KV<String, String>> outputTag = new TupleTag<>("main-output");
 
     StringUtf8Coder keyCoder = StringUtf8Coder.of();
-    KvToFlinkKeyKeySelector<String, String> keySelector =
-        new KvToFlinkKeyKeySelector<>(keyCoder);
+    KvToFlinkKeyKeySelector<String, String> keySelector = new KvToFlinkKeyKeySelector<>(keyCoder);
     KvCoder<String, String> kvCoder = KvCoder.of(keyCoder, StringUtf8Coder.of());
     WindowedValue.ValueOnlyWindowedValueCoder<KV<String, String>> windowedValueCoder =
         WindowedValue.getValueOnlyCoder(kvCoder);
@@ -2246,8 +2231,7 @@ public void testFailOnRequiresStableInputAndDisabledCheckpointing() {
     TupleTag<KV<String, String>> outputTag = new TupleTag<>("main-output");
 
     StringUtf8Coder keyCoder = StringUtf8Coder.of();
-    KvToFlinkKeyKeySelector<String, String> keySelector =
-        new KvToFlinkKeyKeySelector<>(keyCoder);
+    KvToFlinkKeyKeySelector<String, String> keySelector = new KvToFlinkKeyKeySelector<>(keyCoder);
     KvCoder<String, String> kvCoder = KvCoder.of(keyCoder, StringUtf8Coder.of());
     WindowedValue.ValueOnlyWindowedValueCoder<KV<String, String>> windowedValueCoder =
         WindowedValue.getValueOnlyCoder(kvCoder);
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
index 68cffda38e36..a0a955aea1d6 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
@@ -791,9 +791,7 @@ private void testEnsureDeferredStateCleanupTimerFiring(boolean withCheckpointing
             FlinkKey, WindowedValue<KV<String, Integer>>, WindowedValue<Integer>> testHarness =
         new KeyedOneInputStreamOperatorTestHarness(
-            operator,
-            operator.keySelector,
-            ValueTypeInfo.of(FlinkKey.class));
+            operator, operator.keySelector, ValueTypeInfo.of(FlinkKey.class));
 
     testHarness.open();
@@ -941,9 +939,7 @@ public void testEnsureStateCleanupOnFinalWatermark() throws Exception {
             FlinkKey, WindowedValue<KV<String, Integer>>, WindowedValue<Integer>> testHarness =
         new KeyedOneInputStreamOperatorTestHarness(
-            operator,
-            operator.keySelector,
-            ValueTypeInfo.of(FlinkKey.class));
+            operator, operator.keySelector, ValueTypeInfo.of(FlinkKey.class));
 
     RemoteBundle bundle = Mockito.mock(RemoteBundle.class);
     when(bundle.getInputReceivers())
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java
index 221a8c458886..6380108ddb94 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java
@@ -30,7 +30,6 @@
 
 import java.io.ByteArrayOutputStream;
 import java.nio.ByteBuffer;
-
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
@@ -75,7 +74,7 @@ public class WindowDoFnOperatorTest {
   public void testRestore() throws Exception {
     // test harness
     KeyedOneInputStreamOperatorTestHarness<
-        FlinkKey, WindowedValue<KeyedWorkItem<Long, Long>>, WindowedValue<KV<Long, Long>>>
+            FlinkKey, WindowedValue<KeyedWorkItem<Long, Long>>, WindowedValue<KV<Long, Long>>>
         testHarness = createTestHarness(getWindowDoFnOperator(true));
 
     testHarness.open();
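
[Reviewer note, not part of the patch.] The batchGroupByKey path in FlinkStreamingAggregationsTranslators pre-aggregates per key with ConcatenateAsIterable before the shuffle, then flattens the nested iterables with FlattenIterable after the final aggregation. The diff only shows createAccumulator; for orientation, a complete CombineFn of that shape would plausibly look like the sketch below. The three method bodies beyond createAccumulator are assumptions, not the PR's exact code.

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;

// Sketch of a Combine.CombineFn<T, Iterable<T>, Iterable<T>> matching
// ConcatenateAsIterable's declared shape in the diff above.
public class ConcatenateAsIterableSketch<T>
    extends Combine.CombineFn<T, Iterable<T>, Iterable<T>> {

  @Override
  public Iterable<T> createAccumulator() {
    return new ArrayList<>(); // as shown in the patch
  }

  @Override
  public Iterable<T> addInput(Iterable<T> accumulator, T input) {
    // Appending locally is cheap; in batch most elements share a window and
    // pane, which is what makes the local pre-aggregation step worthwhile.
    List<T> out = new ArrayList<>();
    Iterables.addAll(out, accumulator);
    out.add(input);
    return out;
  }

  @Override
  public Iterable<T> mergeAccumulators(Iterable<Iterable<T>> accumulators) {
    // Lazily concatenates the partial iterables produced by each upstream task,
    // mirroring the Iterables.concat call FlattenIterable performs downstream.
    return Iterables.concat(accumulators);
  }

  @Override
  public Iterable<T> extractOutput(Iterable<T> accumulator) {
    return accumulator;
  }
}

The design choice the javadoc describes follows from this shape: because the final step only needs to append whole pre-aggregated iterables, batchGroupByKey can use SystemReduceFn.buffering(), which appends to state blindly, instead of SystemReduceFn.combining(), which would have to read the existing accumulator back on every element.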