From a5fd2e99beb56147f36e9a19482513c03ff488e4 Mon Sep 17 00:00:00 2001 From: Xinyu Liu Date: Tue, 13 Aug 2024 14:32:05 -0700 Subject: [PATCH] Make default task.watermark.idle.ms to be -1 --- .../java/org/apache/samza/config/TaskConfig.java | 2 +- .../samza/operators/impl/WatermarkStates.java | 14 +++++++++----- .../samza/operators/impl/TestWatermarkStates.java | 9 ++++++--- 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java index 33bc0c3f4a..d3a7fe4527 100644 --- a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java @@ -148,7 +148,7 @@ public class TaskConfig extends MapConfig { private static final boolean DEFAULT_TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE = true; public static final String WATERMARK_IDLE_MS = "task.watermark.idle.ms"; - public static final long DEFAULT_TASK_WATERMARK_IDLE_MS = 600000; + public static final long DEFAULT_TASK_WATERMARK_IDLE_MS = -1L; public TaskConfig(Config config) { super(config); diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java index 293dd50ba6..98330c1cca 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java @@ -80,11 +80,15 @@ synchronized void update(long timestamp, String taskName) { } else if (timestamps.size() == expectedTotal) { // For any intermediate streams, the expectedTotal is the upstream task count. // Check whether we got all the watermarks, and set the watermark to be the min. - // Exclude the tasks that have been idle in watermark emission. - Optional min = timestamps.entrySet().stream() - .filter(t -> currentTime - lastUpdateTime.get(t.getKey()) < watermarkIdleTime) - .map(Map.Entry::getValue) - .min(Long::compare); + Optional min; + if (watermarkIdleTime <= 0) { + min = timestamps.values().stream().min(Long::compare); + } else { + // Exclude the tasks that have been idle in watermark emission. + min = timestamps.entrySet().stream() + .filter(t -> currentTime - lastUpdateTime.get(t.getKey()) < watermarkIdleTime) + .map(Map.Entry::getValue).min(Long::compare); + } watermarkTime = min.orElse(timestamp); } } diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWatermarkStates.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWatermarkStates.java index 1254e4634b..d93f7c08f1 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWatermarkStates.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWatermarkStates.java @@ -40,6 +40,8 @@ import static org.apache.samza.operators.impl.WatermarkStates.WATERMARK_NOT_EXIST; public class TestWatermarkStates { + static final long TASK_WATERMARK_IDLE_MS = 600000; + SystemStream input; SystemStream intermediate; Set ssps; @@ -48,6 +50,8 @@ public class TestWatermarkStates { SystemStreamPartition intPartition1; Map producerCounts; + + @Before public void setup() { input = new SystemStream("system", "input"); @@ -120,7 +124,7 @@ public void testUpdate() { @Test public void testIdle() { WatermarkStates watermarkStates = new WatermarkStates(ssps, producerCounts, new MetricsRegistryMap(), - TaskConfig.DEFAULT_TASK_WATERMARK_IDLE_MS, new MockSystemTime()); + TASK_WATERMARK_IDLE_MS, new MockSystemTime()); WatermarkMessage watermarkMessage = new WatermarkMessage(1L, "task 0"); watermarkStates.update(watermarkMessage, intPartition0); @@ -153,11 +157,10 @@ public long getAsLong() { if (firstTime) { firstTime = false; // Make the first task idle - return System.currentTimeMillis() - TaskConfig.DEFAULT_TASK_WATERMARK_IDLE_MS; + return System.currentTimeMillis() - TASK_WATERMARK_IDLE_MS; } else { return System.currentTimeMillis(); } } } - }