Skip to content

Commit

Permalink
Initial impl to add task watermark idle time
Browse files Browse the repository at this point in the history
  • Loading branch information
Xinyu Liu committed Aug 13, 2024
1 parent 5b4b1b7 commit 41cc049
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ public class TaskConfig extends MapConfig {
"task.transactional.state.retain.existing.state";
private static final boolean DEFAULT_TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE = true;

public static final String WATERMARK_IDLE_MS = "task.watermark.idle.ms";
public static final long DEFAULT_TASK_WATERMARK_IDLE_MS = 600000;

public TaskConfig(Config config) {
super(config);
}
Expand Down Expand Up @@ -402,4 +405,8 @@ public boolean getTransactionalStateRestoreEnabled() {
public boolean getTransactionalStateRetainExistingState() {
return getBoolean(TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, DEFAULT_TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE);
}

public long getWatermarkIdleMs() {
return getLong(WATERMARK_IDLE_MS, DEFAULT_TASK_WATERMARK_IDLE_MS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.collect.Multimap;
import org.apache.samza.config.Config;
import org.apache.samza.config.StreamConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.context.Context;
import org.apache.samza.context.InternalTaskContext;
import org.apache.samza.job.model.JobModel;
Expand Down Expand Up @@ -100,7 +101,8 @@ public class OperatorImplGraph {
*/
public OperatorImplGraph(OperatorSpecGraph specGraph, Context context, Clock clock) {
this.clock = clock;
StreamConfig streamConfig = new StreamConfig(context.getJobContext().getConfig());
Config config = context.getJobContext().getConfig();
StreamConfig streamConfig = new StreamConfig(config);
this.internalTaskContext = new InternalTaskContext(context);
Map<SystemStream, Integer> producerTaskCounts =
hasIntermediateStreams(specGraph)
Expand All @@ -117,9 +119,11 @@ public OperatorImplGraph(OperatorSpecGraph specGraph, Context context, Clock clo
new EndOfStreamStates(internalTaskContext.getSspsExcludingSideInputs(), producerTaskCounts));

// set states for watermark; don't include side inputs (see SAMZA-2303)
TaskConfig taskConfig = new TaskConfig(config);
internalTaskContext.registerObject(WatermarkStates.class.getName(),
new WatermarkStates(internalTaskContext.getSspsExcludingSideInputs(), producerTaskCounts,
context.getContainerContext().getContainerMetricsRegistry()));
context.getContainerContext().getContainerMetricsRegistry(),
taskConfig.getWatermarkIdleMs()));

// set states for drain; don't include side inputs (see SAMZA-2303)
internalTaskContext.registerObject(DrainStates.class.getName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.LongSupplier;

import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.SystemStream;
Expand All @@ -49,20 +50,27 @@ class WatermarkStates {
private final static class WatermarkState {
private final int expectedTotal;
private final Map<String, Long> timestamps = new HashMap<>();
private final Map<String, Long> lastUpdateTime = new HashMap<>();
private final long watermarkIdleTime;
private final LongSupplier systemTimeFunc;
private volatile long watermarkTime = WATERMARK_NOT_EXIST;

WatermarkState(int expectedTotal) {
WatermarkState(int expectedTotal, long watermarkIdleTime, LongSupplier systemTimeFunc) {
this.expectedTotal = expectedTotal;
this.watermarkIdleTime = watermarkIdleTime;
this.systemTimeFunc = systemTimeFunc;
}

synchronized void update(long timestamp, String taskName) {
long currentTime = systemTimeFunc.getAsLong();
if (taskName != null) {
Long ts = timestamps.get(taskName);
if (ts != null && ts > timestamp) {
LOG.warn(String.format("Incoming watermark %s is smaller than existing watermark %s for upstream task %s",
timestamp, ts, taskName));
} else {
timestamps.put(taskName, timestamp);
lastUpdateTime.put(taskName, currentTime);
}
}

Expand All @@ -72,7 +80,11 @@ synchronized void update(long timestamp, String taskName) {
} else if (timestamps.size() == expectedTotal) {
// For any intermediate streams, the expectedTotal is the upstream task count.
// Check whether we got all the watermarks, and set the watermark to be the min.
Optional<Long> min = timestamps.values().stream().min(Long::compare);
// Exclude the tasks that have been idle in watermark emission.
Optional<Long> min = timestamps.entrySet().stream()
.filter(t -> currentTime - lastUpdateTime.get(t.getKey()) < watermarkIdleTime)
.map(Map.Entry::getValue)
.min(Long::compare);
watermarkTime = min.orElse(timestamp);
}
}
Expand All @@ -85,24 +97,37 @@ long getWatermarkTime() {
private final Map<SystemStreamPartition, WatermarkState> watermarkStates;
private final List<SystemStreamPartition> intermediateSsps;
private final WatermarkMetrics watermarkMetrics;
private final LongSupplier systemTimeFunc;

WatermarkStates(
Set<SystemStreamPartition> ssps,
Map<SystemStream, Integer> producerTaskCounts,
MetricsRegistry metricsRegistry,
long watermarkIdleTime) {
this(ssps, producerTaskCounts, metricsRegistry, watermarkIdleTime, System::currentTimeMillis);
}

//Internal: test-only
WatermarkStates(
Set<SystemStreamPartition> ssps,
Map<SystemStream, Integer> producerTaskCounts,
MetricsRegistry metricsRegistry) {
MetricsRegistry metricsRegistry,
long watermarkIdleTime,
LongSupplier systemTimeFunc) {
final Map<SystemStreamPartition, WatermarkState> states = new HashMap<>();
final List<SystemStreamPartition> intSsps = new ArrayList<>();

ssps.forEach(ssp -> {
final int producerCount = producerTaskCounts.getOrDefault(ssp.getSystemStream(), 0);
states.put(ssp, new WatermarkState(producerCount));
states.put(ssp, new WatermarkState(producerCount, watermarkIdleTime, systemTimeFunc));
if (producerCount != 0) {
intSsps.add(ssp);
}
});
this.watermarkStates = Collections.unmodifiableMap(states);
this.watermarkMetrics = new WatermarkMetrics(metricsRegistry);
this.intermediateSsps = Collections.unmodifiableList(intSsps);
this.systemTimeFunc = systemTimeFunc;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,38 +23,54 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.LongSupplier;

import org.apache.samza.Partition;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.WatermarkMessage;
import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

import static org.apache.samza.operators.impl.WatermarkStates.WATERMARK_NOT_EXIST;

public class TestWatermarkStates {
SystemStream input;
SystemStream intermediate;
Set<SystemStreamPartition> ssps;
SystemStreamPartition inputPartition0;
SystemStreamPartition intPartition0;
SystemStreamPartition intPartition1;
Map<SystemStream, Integer> producerCounts;

@Test
public void testUpdate() {
SystemStream input = new SystemStream("system", "input");
SystemStream intermediate = new SystemStream("system", "intermediate");
@Before
public void setup() {
input = new SystemStream("system", "input");
intermediate = new SystemStream("system", "intermediate");

Set<SystemStreamPartition> ssps = new HashSet<>();
SystemStreamPartition inputPartition0 = new SystemStreamPartition(input, new Partition(0));
SystemStreamPartition intPartition0 = new SystemStreamPartition(intermediate, new Partition(0));
SystemStreamPartition intPartition1 = new SystemStreamPartition(intermediate, new Partition(1));
ssps = new HashSet<>();
inputPartition0 = new SystemStreamPartition(input, new Partition(0));
intPartition0 = new SystemStreamPartition(intermediate, new Partition(0));
intPartition1 = new SystemStreamPartition(intermediate, new Partition(1));
ssps.add(inputPartition0);
ssps.add(intPartition0);
ssps.add(intPartition1);

Map<SystemStream, Integer> producerCounts = new HashMap<>();
producerCounts = new HashMap<>();
producerCounts.put(intermediate, 2);
}


@Test
public void testUpdate() {
// advance watermark on input to 5
WatermarkStates watermarkStates = new WatermarkStates(ssps, producerCounts, new MetricsRegistryMap());
WatermarkStates watermarkStates = new WatermarkStates(ssps, producerCounts, new MetricsRegistryMap(),
TaskConfig.DEFAULT_TASK_WATERMARK_IDLE_MS);
IncomingMessageEnvelope envelope = IncomingMessageEnvelope.buildWatermarkEnvelope(inputPartition0, 5L);
watermarkStates.update((WatermarkMessage) envelope.getMessage(),
envelope.getSystemStreamPartition());
Expand Down Expand Up @@ -100,4 +116,48 @@ public void testUpdate() {
// verify we got a watermark 6 (min) for int stream
assertEquals(watermarkStates.getWatermark(intermediate), 6L);
}

@Test
public void testIdle() {
WatermarkStates watermarkStates = new WatermarkStates(ssps, producerCounts, new MetricsRegistryMap(),
TaskConfig.DEFAULT_TASK_WATERMARK_IDLE_MS, new MockSystemTime());

WatermarkMessage watermarkMessage = new WatermarkMessage(1L, "task 0");
watermarkStates.update(watermarkMessage, intPartition0);
assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), WATERMARK_NOT_EXIST);
assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST);

watermarkMessage = new WatermarkMessage(5L, "task 1");
watermarkStates.update(watermarkMessage, intPartition0);
assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), 5L);
assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST);

watermarkMessage = new WatermarkMessage(6L, "task 0");
watermarkStates.update(watermarkMessage, intPartition1);
assertEquals(watermarkStates.getWatermarkPerSSP(intPartition1), WATERMARK_NOT_EXIST);
assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST);

// watermark from task 1 on int p1 to 4
watermarkMessage = new WatermarkMessage(10L, "task 1");
watermarkStates.update(watermarkMessage, intPartition1);
assertEquals(watermarkStates.getWatermarkPerSSP(intPartition1), 6L);
// verify we got a watermark 3 (min) for int stream
assertEquals(watermarkStates.getWatermark(intermediate), 5L);
}

static class MockSystemTime implements LongSupplier {
boolean firstTime = true;

@Override
public long getAsLong() {
if (firstTime) {
firstTime = false;
// Make the first task idle
return System.currentTimeMillis() - TaskConfig.DEFAULT_TASK_WATERMARK_IDLE_MS;
} else {
return System.currentTimeMillis();
}
}
}

}

0 comments on commit 41cc049

Please sign in to comment.