diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java index fc359c1d4ca9..5cf395eef55f 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java @@ -173,4 +173,10 @@ public ExecutorService create(PipelineOptions options) { new ThreadFactoryBuilder().setNameFormat("Process Element Thread-%d").build()); } } + + @Description("Enable/disable late data dropping in GroupByKey/Combine transforms") + @Default.Boolean(false) + boolean getDropLateData(); + + void setDropLateData(boolean dropLateData); } diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java index 3ecd406da615..1b19275dd967 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java @@ -180,11 +180,19 @@ public TimerInternals timerInternals() { DoFnSchemaInformation.create(), Collections.emptyMap()); + final DoFnRunner, KV> dropLateDataRunner = + pipelineOptions.getDropLateData() + ? DoFnRunners.lateDataDroppingRunner( + doFnRunner, keyedInternals.timerInternals(), windowingStrategy) + : doFnRunner; + final SamzaExecutionContext executionContext = (SamzaExecutionContext) context.getApplicationContainerContext(); - this.fnRunner = + final DoFnRunner, KV> doFnRunnerWithMetrics = DoFnRunnerWithMetrics.wrap( - doFnRunner, executionContext.getMetricsContainer(), transformFullName); + dropLateDataRunner, executionContext.getMetricsContainer(), transformFullName); + + this.fnRunner = new DoFnRunnerWithKeyedInternals<>(doFnRunnerWithMetrics, keyedInternals); } @Override diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/GroupByKeyOpTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/GroupByKeyOpTest.java new file mode 100644 index 000000000000..8670d9a46eac --- /dev/null +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/GroupByKeyOpTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.samza.runtime; + +import java.io.Serializable; +import java.util.Arrays; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TimestampedValue; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; + +/** Tests for GroupByKeyOp. */ +public class GroupByKeyOpTest implements Serializable { + @Rule + public final transient TestPipeline pipeline = + TestPipeline.fromOptions( + PipelineOptionsFactory.fromArgs("--runner=TestSamzaRunner").create()); + + @Rule + public final transient TestPipeline dropLateDataPipeline = + TestPipeline.fromOptions( + PipelineOptionsFactory.fromArgs("--runner=TestSamzaRunner", "--dropLateData=true") + .create()); + + @Test + public void testDefaultGbk() { + TestStream.Builder testStream = + TestStream.create(VarIntCoder.of()) + .addElements(TimestampedValue.of(1, new Instant(1000))) + .addElements(TimestampedValue.of(2, new Instant(2000))) + .advanceWatermarkTo(new Instant(3000)) + .addElements(TimestampedValue.of(10, new Instant(1000))) + .advanceWatermarkTo(new Instant(10000)); + + PCollection aggregated = + pipeline + .apply(testStream.advanceWatermarkToInfinity()) + .apply( + Window.into(FixedWindows.of(Duration.standardSeconds(3))) + .accumulatingFiredPanes()) + .apply(Combine.globally(Sum.ofIntegers()).withoutDefaults()); + + PAssert.that(aggregated).containsInAnyOrder(Arrays.asList(3, 10)); + + pipeline.run().waitUntilFinish(); + } + + @Test + public void testDropLateDataNonKeyed() { + TestStream.Builder testStream = + TestStream.create(VarIntCoder.of()) + .addElements(TimestampedValue.of(1, new Instant(1000))) + .addElements(TimestampedValue.of(2, new Instant(2000))) + .advanceWatermarkTo(new Instant(3000)) + .addElements(TimestampedValue.of(10, new Instant(1000))) + .advanceWatermarkTo(new Instant(10000)); + + PCollection aggregated = + dropLateDataPipeline + .apply(testStream.advanceWatermarkToInfinity()) + .apply( + Window.into(FixedWindows.of(Duration.standardSeconds(3))) + .accumulatingFiredPanes()) + .apply(Combine.globally(Sum.ofIntegers()).withoutDefaults()); + + PAssert.that(aggregated).containsInAnyOrder(3); + + dropLateDataPipeline.run().waitUntilFinish(); + } + + @Test + public void testDropLateDataKeyed() { + TestStream.Builder> testStream = + TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())) + .addElements(TimestampedValue.of(KV.of("a", 1), new Instant(1000))) + .addElements(TimestampedValue.of(KV.of("b", 2), new Instant(2000))) + .addElements(TimestampedValue.of(KV.of("a", 3), new Instant(2500))) + .advanceWatermarkTo(new Instant(3000)) + .addElements(TimestampedValue.of(KV.of("a", 10), new Instant(1000))) + .advanceWatermarkTo(new Instant(10000)); + + PCollection> aggregated = + dropLateDataPipeline + .apply(testStream.advanceWatermarkToInfinity()) + .apply( + Window.>into(FixedWindows.of(Duration.standardSeconds(3))) + .accumulatingFiredPanes()) + .apply(Sum.integersPerKey()); + + PAssert.that(aggregated).containsInAnyOrder(Arrays.asList(KV.of("a", 4), KV.of("b", 2))); + + dropLateDataPipeline.run().waitUntilFinish(); + } +}