Skip to content

Commit

Permalink
Make default task.watermark.idle.ms to be -1
Browse files Browse the repository at this point in the history
  • Loading branch information
Xinyu Liu committed Aug 13, 2024
1 parent 41cc049 commit a5fd2e9
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public class TaskConfig extends MapConfig {
private static final boolean DEFAULT_TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE = true;

public static final String WATERMARK_IDLE_MS = "task.watermark.idle.ms";
public static final long DEFAULT_TASK_WATERMARK_IDLE_MS = 600000;
public static final long DEFAULT_TASK_WATERMARK_IDLE_MS = -1L;

public TaskConfig(Config config) {
super(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,15 @@ synchronized void update(long timestamp, String taskName) {
} else if (timestamps.size() == expectedTotal) {
// For any intermediate streams, the expectedTotal is the upstream task count.
// Check whether we got all the watermarks, and set the watermark to be the min.
// Exclude the tasks that have been idle in watermark emission.
Optional<Long> min = timestamps.entrySet().stream()
.filter(t -> currentTime - lastUpdateTime.get(t.getKey()) < watermarkIdleTime)
.map(Map.Entry::getValue)
.min(Long::compare);
Optional<Long> min;
if (watermarkIdleTime <= 0) {
min = timestamps.values().stream().min(Long::compare);
} else {
// Exclude the tasks that have been idle in watermark emission.
min = timestamps.entrySet().stream()
.filter(t -> currentTime - lastUpdateTime.get(t.getKey()) < watermarkIdleTime)
.map(Map.Entry::getValue).min(Long::compare);
}
watermarkTime = min.orElse(timestamp);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import static org.apache.samza.operators.impl.WatermarkStates.WATERMARK_NOT_EXIST;

public class TestWatermarkStates {
static final long TASK_WATERMARK_IDLE_MS = 600000;

SystemStream input;
SystemStream intermediate;
Set<SystemStreamPartition> ssps;
Expand All @@ -48,6 +50,8 @@ public class TestWatermarkStates {
SystemStreamPartition intPartition1;
Map<SystemStream, Integer> producerCounts;



@Before
public void setup() {
input = new SystemStream("system", "input");
Expand Down Expand Up @@ -120,7 +124,7 @@ public void testUpdate() {
@Test
public void testIdle() {
WatermarkStates watermarkStates = new WatermarkStates(ssps, producerCounts, new MetricsRegistryMap(),
TaskConfig.DEFAULT_TASK_WATERMARK_IDLE_MS, new MockSystemTime());
TASK_WATERMARK_IDLE_MS, new MockSystemTime());

WatermarkMessage watermarkMessage = new WatermarkMessage(1L, "task 0");
watermarkStates.update(watermarkMessage, intPartition0);
Expand Down Expand Up @@ -153,11 +157,10 @@ public long getAsLong() {
if (firstTime) {
firstTime = false;
// Make the first task idle
return System.currentTimeMillis() - TaskConfig.DEFAULT_TASK_WATERMARK_IDLE_MS;
return System.currentTimeMillis() - TASK_WATERMARK_IDLE_MS;
} else {
return System.currentTimeMillis();
}
}
}

}

0 comments on commit a5fd2e9

Please sign in to comment.