diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java index b1a51e5220..33bc0c3f4a 100644 --- a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java @@ -147,6 +147,9 @@ public class TaskConfig extends MapConfig { "task.transactional.state.retain.existing.state"; private static final boolean DEFAULT_TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE = true; + public static final String WATERMARK_IDLE_MS = "task.watermark.idle.ms"; + public static final long DEFAULT_TASK_WATERMARK_IDLE_MS = 600000; + public TaskConfig(Config config) { super(config); } @@ -402,4 +405,8 @@ public boolean getTransactionalStateRestoreEnabled() { public boolean getTransactionalStateRetainExistingState() { return getBoolean(TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, DEFAULT_TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE); } + + public long getWatermarkIdleMs() { + return getLong(WATERMARK_IDLE_MS, DEFAULT_TASK_WATERMARK_IDLE_MS); + } } diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java index c62b0b2ec9..c778a1b555 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java @@ -23,6 +23,7 @@ import com.google.common.collect.Multimap; import org.apache.samza.config.Config; import org.apache.samza.config.StreamConfig; +import org.apache.samza.config.TaskConfig; import org.apache.samza.context.Context; import org.apache.samza.context.InternalTaskContext; import org.apache.samza.job.model.JobModel; @@ -100,7 +101,8 @@ public class OperatorImplGraph { */ public OperatorImplGraph(OperatorSpecGraph specGraph, Context context, Clock clock) { this.clock = clock; - StreamConfig streamConfig = new StreamConfig(context.getJobContext().getConfig()); + Config config = context.getJobContext().getConfig(); + StreamConfig streamConfig = new StreamConfig(config); this.internalTaskContext = new InternalTaskContext(context); Map producerTaskCounts = hasIntermediateStreams(specGraph) @@ -117,9 +119,11 @@ public OperatorImplGraph(OperatorSpecGraph specGraph, Context context, Clock clo new EndOfStreamStates(internalTaskContext.getSspsExcludingSideInputs(), producerTaskCounts)); // set states for watermark; don't include side inputs (see SAMZA-2303) + TaskConfig taskConfig = new TaskConfig(config); internalTaskContext.registerObject(WatermarkStates.class.getName(), new WatermarkStates(internalTaskContext.getSspsExcludingSideInputs(), producerTaskCounts, - context.getContainerContext().getContainerMetricsRegistry())); + context.getContainerContext().getContainerMetricsRegistry(), + taskConfig.getWatermarkIdleMs())); // set states for drain; don't include side inputs (see SAMZA-2303) internalTaskContext.registerObject(DrainStates.class.getName(), diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java index 84e0687d6d..293dd50ba6 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.function.LongSupplier; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.system.SystemStream; @@ -49,13 +50,19 @@ class WatermarkStates { private final static class WatermarkState { private final int expectedTotal; private final Map timestamps = new HashMap<>(); + private final Map lastUpdateTime = new HashMap<>(); + private final long watermarkIdleTime; + private final LongSupplier systemTimeFunc; private volatile long watermarkTime = WATERMARK_NOT_EXIST; - WatermarkState(int expectedTotal) { + WatermarkState(int expectedTotal, long watermarkIdleTime, LongSupplier systemTimeFunc) { this.expectedTotal = expectedTotal; + this.watermarkIdleTime = watermarkIdleTime; + this.systemTimeFunc = systemTimeFunc; } synchronized void update(long timestamp, String taskName) { + long currentTime = systemTimeFunc.getAsLong(); if (taskName != null) { Long ts = timestamps.get(taskName); if (ts != null && ts > timestamp) { @@ -63,6 +70,7 @@ synchronized void update(long timestamp, String taskName) { timestamp, ts, taskName)); } else { timestamps.put(taskName, timestamp); + lastUpdateTime.put(taskName, currentTime); } } @@ -72,7 +80,11 @@ synchronized void update(long timestamp, String taskName) { } else if (timestamps.size() == expectedTotal) { // For any intermediate streams, the expectedTotal is the upstream task count. // Check whether we got all the watermarks, and set the watermark to be the min. - Optional min = timestamps.values().stream().min(Long::compare); + // Exclude the tasks that have been idle in watermark emission. + Optional min = timestamps.entrySet().stream() + .filter(t -> currentTime - lastUpdateTime.get(t.getKey()) < watermarkIdleTime) + .map(Map.Entry::getValue) + .min(Long::compare); watermarkTime = min.orElse(timestamp); } } @@ -85,17 +97,29 @@ long getWatermarkTime() { private final Map watermarkStates; private final List intermediateSsps; private final WatermarkMetrics watermarkMetrics; + private final LongSupplier systemTimeFunc; + WatermarkStates( + Set ssps, + Map producerTaskCounts, + MetricsRegistry metricsRegistry, + long watermarkIdleTime) { + this(ssps, producerTaskCounts, metricsRegistry, watermarkIdleTime, System::currentTimeMillis); + } + + //Internal: test-only WatermarkStates( Set ssps, Map producerTaskCounts, - MetricsRegistry metricsRegistry) { + MetricsRegistry metricsRegistry, + long watermarkIdleTime, + LongSupplier systemTimeFunc) { final Map states = new HashMap<>(); final List intSsps = new ArrayList<>(); ssps.forEach(ssp -> { final int producerCount = producerTaskCounts.getOrDefault(ssp.getSystemStream(), 0); - states.put(ssp, new WatermarkState(producerCount)); + states.put(ssp, new WatermarkState(producerCount, watermarkIdleTime, systemTimeFunc)); if (producerCount != 0) { intSsps.add(ssp); } @@ -103,6 +127,7 @@ long getWatermarkTime() { this.watermarkStates = Collections.unmodifiableMap(states); this.watermarkMetrics = new WatermarkMetrics(metricsRegistry); this.intermediateSsps = Collections.unmodifiableList(intSsps); + this.systemTimeFunc = systemTimeFunc; } /** diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWatermarkStates.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWatermarkStates.java index 2b41ed2bac..1254e4634b 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWatermarkStates.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWatermarkStates.java @@ -23,12 +23,16 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.function.LongSupplier; + import org.apache.samza.Partition; +import org.apache.samza.config.TaskConfig; import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.SystemStream; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.system.WatermarkMessage; +import org.junit.Before; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -36,25 +40,37 @@ import static org.apache.samza.operators.impl.WatermarkStates.WATERMARK_NOT_EXIST; public class TestWatermarkStates { + SystemStream input; + SystemStream intermediate; + Set ssps; + SystemStreamPartition inputPartition0; + SystemStreamPartition intPartition0; + SystemStreamPartition intPartition1; + Map producerCounts; - @Test - public void testUpdate() { - SystemStream input = new SystemStream("system", "input"); - SystemStream intermediate = new SystemStream("system", "intermediate"); + @Before + public void setup() { + input = new SystemStream("system", "input"); + intermediate = new SystemStream("system", "intermediate"); - Set ssps = new HashSet<>(); - SystemStreamPartition inputPartition0 = new SystemStreamPartition(input, new Partition(0)); - SystemStreamPartition intPartition0 = new SystemStreamPartition(intermediate, new Partition(0)); - SystemStreamPartition intPartition1 = new SystemStreamPartition(intermediate, new Partition(1)); + ssps = new HashSet<>(); + inputPartition0 = new SystemStreamPartition(input, new Partition(0)); + intPartition0 = new SystemStreamPartition(intermediate, new Partition(0)); + intPartition1 = new SystemStreamPartition(intermediate, new Partition(1)); ssps.add(inputPartition0); ssps.add(intPartition0); ssps.add(intPartition1); - Map producerCounts = new HashMap<>(); + producerCounts = new HashMap<>(); producerCounts.put(intermediate, 2); + } + + @Test + public void testUpdate() { // advance watermark on input to 5 - WatermarkStates watermarkStates = new WatermarkStates(ssps, producerCounts, new MetricsRegistryMap()); + WatermarkStates watermarkStates = new WatermarkStates(ssps, producerCounts, new MetricsRegistryMap(), + TaskConfig.DEFAULT_TASK_WATERMARK_IDLE_MS); IncomingMessageEnvelope envelope = IncomingMessageEnvelope.buildWatermarkEnvelope(inputPartition0, 5L); watermarkStates.update((WatermarkMessage) envelope.getMessage(), envelope.getSystemStreamPartition()); @@ -100,4 +116,48 @@ public void testUpdate() { // verify we got a watermark 6 (min) for int stream assertEquals(watermarkStates.getWatermark(intermediate), 6L); } + + @Test + public void testIdle() { + WatermarkStates watermarkStates = new WatermarkStates(ssps, producerCounts, new MetricsRegistryMap(), + TaskConfig.DEFAULT_TASK_WATERMARK_IDLE_MS, new MockSystemTime()); + + WatermarkMessage watermarkMessage = new WatermarkMessage(1L, "task 0"); + watermarkStates.update(watermarkMessage, intPartition0); + assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), WATERMARK_NOT_EXIST); + assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST); + + watermarkMessage = new WatermarkMessage(5L, "task 1"); + watermarkStates.update(watermarkMessage, intPartition0); + assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), 5L); + assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST); + + watermarkMessage = new WatermarkMessage(6L, "task 0"); + watermarkStates.update(watermarkMessage, intPartition1); + assertEquals(watermarkStates.getWatermarkPerSSP(intPartition1), WATERMARK_NOT_EXIST); + assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST); + + // watermark from task 1 on int p1 to 4 + watermarkMessage = new WatermarkMessage(10L, "task 1"); + watermarkStates.update(watermarkMessage, intPartition1); + assertEquals(watermarkStates.getWatermarkPerSSP(intPartition1), 6L); + // verify we got a watermark 3 (min) for int stream + assertEquals(watermarkStates.getWatermark(intermediate), 5L); + } + + static class MockSystemTime implements LongSupplier { + boolean firstTime = true; + + @Override + public long getAsLong() { + if (firstTime) { + firstTime = false; + // Make the first task idle + return System.currentTimeMillis() - TaskConfig.DEFAULT_TASK_WATERMARK_IDLE_MS; + } else { + return System.currentTimeMillis(); + } + } + } + }