diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java index 5cf395eef55f..fc359c1d4ca9 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java @@ -173,10 +173,4 @@ public ExecutorService create(PipelineOptions options) { new ThreadFactoryBuilder().setNameFormat("Process Element Thread-%d").build()); } } - - @Description("Enable/disable late data dropping in GroupByKey/Combine transforms") - @Default.Boolean(false) - boolean getDropLateData(); - - void setDropLateData(boolean dropLateData); } diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java index 1b19275dd967..3ecd406da615 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java @@ -180,19 +180,11 @@ public TimerInternals timerInternals() { DoFnSchemaInformation.create(), Collections.emptyMap()); - final DoFnRunner, KV> dropLateDataRunner = - pipelineOptions.getDropLateData() - ? DoFnRunners.lateDataDroppingRunner( - doFnRunner, keyedInternals.timerInternals(), windowingStrategy) - : doFnRunner; - final SamzaExecutionContext executionContext = (SamzaExecutionContext) context.getApplicationContainerContext(); - final DoFnRunner, KV> doFnRunnerWithMetrics = + this.fnRunner = DoFnRunnerWithMetrics.wrap( - dropLateDataRunner, executionContext.getMetricsContainer(), transformFullName); - - this.fnRunner = new DoFnRunnerWithKeyedInternals<>(doFnRunnerWithMetrics, keyedInternals); + doFnRunner, executionContext.getMetricsContainer(), transformFullName); } @Override diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/GroupByKeyOpTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/GroupByKeyOpTest.java deleted file mode 100644 index 8670d9a46eac..000000000000 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/GroupByKeyOpTest.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.samza.runtime; - -import java.io.Serializable; -import java.util.Arrays; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.testing.TestStream; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TimestampedValue; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.Rule; -import org.junit.Test; - -/** Tests for GroupByKeyOp. */ -public class GroupByKeyOpTest implements Serializable { - @Rule - public final transient TestPipeline pipeline = - TestPipeline.fromOptions( - PipelineOptionsFactory.fromArgs("--runner=TestSamzaRunner").create()); - - @Rule - public final transient TestPipeline dropLateDataPipeline = - TestPipeline.fromOptions( - PipelineOptionsFactory.fromArgs("--runner=TestSamzaRunner", "--dropLateData=true") - .create()); - - @Test - public void testDefaultGbk() { - TestStream.Builder testStream = - TestStream.create(VarIntCoder.of()) - .addElements(TimestampedValue.of(1, new Instant(1000))) - .addElements(TimestampedValue.of(2, new Instant(2000))) - .advanceWatermarkTo(new Instant(3000)) - .addElements(TimestampedValue.of(10, new Instant(1000))) - .advanceWatermarkTo(new Instant(10000)); - - PCollection aggregated = - pipeline - .apply(testStream.advanceWatermarkToInfinity()) - .apply( - Window.into(FixedWindows.of(Duration.standardSeconds(3))) - .accumulatingFiredPanes()) - .apply(Combine.globally(Sum.ofIntegers()).withoutDefaults()); - - PAssert.that(aggregated).containsInAnyOrder(Arrays.asList(3, 10)); - - pipeline.run().waitUntilFinish(); - } - - @Test - public void testDropLateDataNonKeyed() { - TestStream.Builder testStream = - TestStream.create(VarIntCoder.of()) - .addElements(TimestampedValue.of(1, new Instant(1000))) - .addElements(TimestampedValue.of(2, new Instant(2000))) - .advanceWatermarkTo(new Instant(3000)) - .addElements(TimestampedValue.of(10, new Instant(1000))) - .advanceWatermarkTo(new Instant(10000)); - - PCollection aggregated = - dropLateDataPipeline - .apply(testStream.advanceWatermarkToInfinity()) - .apply( - Window.into(FixedWindows.of(Duration.standardSeconds(3))) - .accumulatingFiredPanes()) - .apply(Combine.globally(Sum.ofIntegers()).withoutDefaults()); - - PAssert.that(aggregated).containsInAnyOrder(3); - - dropLateDataPipeline.run().waitUntilFinish(); - } - - @Test - public void testDropLateDataKeyed() { - TestStream.Builder> testStream = - TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())) - .addElements(TimestampedValue.of(KV.of("a", 1), new Instant(1000))) - .addElements(TimestampedValue.of(KV.of("b", 2), new Instant(2000))) - .addElements(TimestampedValue.of(KV.of("a", 3), new Instant(2500))) - .advanceWatermarkTo(new Instant(3000)) - .addElements(TimestampedValue.of(KV.of("a", 10), new Instant(1000))) - .advanceWatermarkTo(new Instant(10000)); - - PCollection> aggregated = - dropLateDataPipeline - .apply(testStream.advanceWatermarkToInfinity()) - .apply( - Window.>into(FixedWindows.of(Duration.standardSeconds(3))) - .accumulatingFiredPanes()) - .apply(Sum.integersPerKey()); - - PAssert.that(aggregated).containsInAnyOrder(Arrays.asList(KV.of("a", 4), KV.of("b", 2))); - - dropLateDataPipeline.run().waitUntilFinish(); - } -}