Skip to content

Commit

Permalink
Address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
Xinyu Liu committed Aug 14, 2024
1 parent 97fab20 commit f36e86d
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 57 deletions.
10 changes: 6 additions & 4 deletions samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,10 @@ public class TaskConfig extends MapConfig {
"task.transactional.state.retain.existing.state";
private static final boolean DEFAULT_TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE = true;

public static final String WATERMARK_IDLE_MS = "task.watermark.idle.ms";
public static final long DEFAULT_TASK_WATERMARK_IDLE_MS = -1L;
// This config allows excluding tasks that have been "idle" (i.e. have not emitted a watermark) for the configured
// time, so that watermarks can still be generated from the other, active tasks.
public static final String WATERMARK_IDLE_TIMEOUT_MS = "task.watermark.idle.timeout.ms";
public static final long DEFAULT_TASK_WATERMARK_IDLE_TIMEOUT_MS = -1L;

public TaskConfig(Config config) {
super(config);
Expand Down Expand Up @@ -406,7 +408,7 @@ public boolean getTransactionalStateRetainExistingState() {
return getBoolean(TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, DEFAULT_TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE);
}

public long getWatermarkIdleMs() {
return getLong(WATERMARK_IDLE_MS, DEFAULT_TASK_WATERMARK_IDLE_MS);
public long getWatermarkIdleTimeoutMs() {
return getLong(WATERMARK_IDLE_TIMEOUT_MS, DEFAULT_TASK_WATERMARK_IDLE_TIMEOUT_MS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public OperatorImplGraph(OperatorSpecGraph specGraph, Context context, Clock clo
internalTaskContext.registerObject(WatermarkStates.class.getName(),
new WatermarkStates(internalTaskContext.getSspsExcludingSideInputs(), producerTaskCounts,
context.getContainerContext().getContainerMetricsRegistry(),
taskConfig.getWatermarkIdleMs()));
taskConfig.getWatermarkIdleTimeoutMs()));

// set states for drain; don't include side inputs (see SAMZA-2303)
internalTaskContext.registerObject(DrainStates.class.getName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.LongSupplier;

Expand All @@ -38,6 +37,9 @@
/**
* This class manages the watermarks coming from input/intermediate streams in a task. Internally it keeps track
* of the latest watermark timestamp from each upstream task, and uses the min as the consolidated watermark time.
* Lagging tasks can be excluded from the calculation by configuring "task.watermark.idle.timeout.ms", in which case
* watermarks are computed from the remaining tasks. This helps unblock downstream aggregations, but at the risk of
* advancing the event-time clock too quickly: events arriving later from the lagging tasks become late arrivals.
*
* This class is thread-safe. However, having parallelism within a task may result in out-of-order processing
* and inaccurate watermarks. In this scenario, watermarks might be emitted before the previous messages are fully processed.
Expand All @@ -51,14 +53,14 @@ private final static class WatermarkState {
private final int expectedTotal;
private final Map<String, Long> timestamps = new HashMap<>();
private final Map<String, Long> lastUpdateTime = new HashMap<>();
private final long watermarkIdleTime;
private final long watermarkIdleTimeout;
private final long createTime;
private final LongSupplier systemTimeFunc;
private volatile long watermarkTime = WATERMARK_NOT_EXIST;

WatermarkState(int expectedTotal, long watermarkIdleTime, LongSupplier systemTimeFunc) {
WatermarkState(int expectedTotal, long watermarkIdleTimeout, LongSupplier systemTimeFunc) {
this.expectedTotal = expectedTotal;
this.watermarkIdleTime = watermarkIdleTime;
this.watermarkIdleTimeout = watermarkIdleTimeout;
this.systemTimeFunc = systemTimeFunc;
this.createTime = systemTimeFunc.getAsLong();
}
Expand All @@ -80,18 +82,23 @@ synchronized void update(long timestamp, String taskName) {
// we get watermark either from the source or from the aggregator task
watermarkTime = Math.max(watermarkTime, timestamp);
} else if (canUpdateWatermark(currentTime)) {
Optional<Long> min;
if (watermarkIdleTime <= 0) {
final long minWatermark;
if (watermarkIdleTimeout <= 0) {
// All upstream tasks are required in the computation
min = timestamps.values().stream().min(Long::compare);
minWatermark = timestamps.values().stream().min(Long::compare).orElse(timestamp);
} else {
// Exclude the tasks that have been idle in watermark emission.
min = timestamps.entrySet().stream()
.filter(t -> currentTime - lastUpdateTime.get(t.getKey()) < watermarkIdleTime)
.map(Map.Entry::getValue)
.min(Long::compare);
long min = Long.MAX_VALUE;
long watermarkIdleThreshold = currentTime - watermarkIdleTimeout;
for (Map.Entry<String, Long> entry : timestamps.entrySet()) {
// Only include tasks whose last watermark update falls within the idle timeout window
if (lastUpdateTime.get(entry.getKey()) > watermarkIdleThreshold) {
min = Math.min(min, entry.getValue());
}
}
minWatermark = min == Long.MAX_VALUE ? WATERMARK_NOT_EXIST : min;
}
watermarkTime = Math.max(watermarkTime, min.orElse(timestamp));
watermarkTime = Math.max(watermarkTime, minWatermark);
}
}

Expand All @@ -100,7 +107,8 @@ private boolean canUpdateWatermark(long currentTime) {
// 1. we received watermarks from all upstream tasks, or
// 2. we allow task idle in emitting watermarks and the idle time has passed.
return (timestamps.size() == expectedTotal)
|| (watermarkIdleTime > 0 && currentTime - createTime > watermarkIdleTime);
// Handle the case where we haven't received watermarks from some tasks since startup
|| (watermarkIdleTimeout > 0 && currentTime - createTime > watermarkIdleTimeout);
}

long getWatermarkTime() {
Expand All @@ -111,37 +119,35 @@ long getWatermarkTime() {
private final Map<SystemStreamPartition, WatermarkState> watermarkStates;
private final List<SystemStreamPartition> intermediateSsps;
private final WatermarkMetrics watermarkMetrics;
private final LongSupplier systemTimeFunc;

WatermarkStates(
Set<SystemStreamPartition> ssps,
Map<SystemStream, Integer> producerTaskCounts,
MetricsRegistry metricsRegistry,
long watermarkIdleTime) {
this(ssps, producerTaskCounts, metricsRegistry, watermarkIdleTime, System::currentTimeMillis);
long watermarkIdleTimeout) {
this(ssps, producerTaskCounts, metricsRegistry, watermarkIdleTimeout, System::currentTimeMillis);
}

//Internal: test-only
WatermarkStates(
Set<SystemStreamPartition> ssps,
Map<SystemStream, Integer> producerTaskCounts,
MetricsRegistry metricsRegistry,
long watermarkIdleTime,
long watermarkIdleTimeout,
LongSupplier systemTimeFunc) {
final Map<SystemStreamPartition, WatermarkState> states = new HashMap<>();
final List<SystemStreamPartition> intSsps = new ArrayList<>();

ssps.forEach(ssp -> {
final int producerCount = producerTaskCounts.getOrDefault(ssp.getSystemStream(), 0);
states.put(ssp, new WatermarkState(producerCount, watermarkIdleTime, systemTimeFunc));
states.put(ssp, new WatermarkState(producerCount, watermarkIdleTimeout, systemTimeFunc));
if (producerCount != 0) {
intSsps.add(ssp);
}
});
this.watermarkStates = Collections.unmodifiableMap(states);
this.watermarkMetrics = new WatermarkMetrics(metricsRegistry);
this.intermediateSsps = Collections.unmodifiableList(intSsps);
this.systemTimeFunc = systemTimeFunc;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,15 @@
import static org.apache.samza.operators.impl.WatermarkStates.WATERMARK_NOT_EXIST;

public class TestWatermarkStates {
static final long TASK_WATERMARK_IDLE_MS = 600000;

SystemStream input;
SystemStream intermediate;
Set<SystemStreamPartition> ssps;
SystemStreamPartition inputPartition0;
SystemStreamPartition intPartition0;
SystemStreamPartition intPartition1;
Map<SystemStream, Integer> producerCounts;

static final long TEST_TASK_WATERMARK_IDLE_TIMEOUT_MS = 600000;

private SystemStream input;
private SystemStream intermediate;
private Set<SystemStreamPartition> ssps;
private SystemStreamPartition inputPartition0;
private SystemStreamPartition intPartition0;
private SystemStreamPartition intPartition1;
private Map<SystemStream, Integer> producerCounts;

@Before
public void setup() {
Expand All @@ -69,12 +67,11 @@ public void setup() {
producerCounts.put(intermediate, 2);
}


@Test
public void testUpdate() {
// advance watermark on input to 5
WatermarkStates watermarkStates = new WatermarkStates(ssps, producerCounts, new MetricsRegistryMap(),
TaskConfig.DEFAULT_TASK_WATERMARK_IDLE_MS);
TaskConfig.DEFAULT_TASK_WATERMARK_IDLE_TIMEOUT_MS);
IncomingMessageEnvelope envelope = IncomingMessageEnvelope.buildWatermarkEnvelope(inputPartition0, 5L);
watermarkStates.update((WatermarkMessage) envelope.getMessage(),
envelope.getSystemStreamPartition());
Expand Down Expand Up @@ -123,22 +120,29 @@ public void testUpdate() {

@Test
public void testIdle() {
MockSystemTime systemTime = new MockSystemTime();
WatermarkStates watermarkStates = new WatermarkStates(ssps, producerCounts, new MetricsRegistryMap(),
TASK_WATERMARK_IDLE_MS, new MockSystemTime());
TEST_TASK_WATERMARK_IDLE_TIMEOUT_MS, systemTime);

// First wm is marked before the idle time, so it's not counted
// First watermark
WatermarkMessage watermarkMessage = new WatermarkMessage(1L, "task 0");
watermarkStates.update(watermarkMessage, intPartition0);
assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), WATERMARK_NOT_EXIST);
assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST);

// Watermark is computed based on "task 1" since "task 0" passes the idle time
// Advance currentTime to pass the idle timeout
systemTime.advance(TEST_TASK_WATERMARK_IDLE_TIMEOUT_MS);

// Watermark is computed based on "task 1" alone since "task 0" passes the idle timeout
watermarkMessage = new WatermarkMessage(5L, "task 1");
watermarkStates.update(watermarkMessage, intPartition0);
assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), 5L);
assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST);

// Watermark is computed based on "task 0" since the time already passes the idle threshold
// Advance currentTime without exceeding the timeout
systemTime.advance(1);

// Watermark is computed based on "task 0" since the current time has already passed the idle timeout
watermarkMessage = new WatermarkMessage(6L, "task 0");
watermarkStates.update(watermarkMessage, intPartition1);
assertEquals(watermarkStates.getWatermarkPerSSP(intPartition1), 6L);
Expand All @@ -151,43 +155,44 @@ public void testIdle() {
// verify we got a watermark (min) for int stream
assertEquals(watermarkStates.getWatermark(intermediate), 5L);

// Advance currentTime without exceeding the timeout
systemTime.advance(1);

// Watermark from "task 0" is updated, but less than current watermark
watermarkMessage = new WatermarkMessage(3L, "task 0");
watermarkStates.update(watermarkMessage, intPartition0);
assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), 5L);
assertEquals(watermarkStates.getWatermark(intermediate), 5L);

// Watermark is computed this time due to advance in "task 0"
// Advance currentTime without exceeding the timeout
systemTime.advance(1);

// Watermark is computed this time due to advance in "task 0"
watermarkMessage = new WatermarkMessage(7L, "task 0");
watermarkStates.update(watermarkMessage, intPartition0);
assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), 5L);
assertEquals(watermarkStates.getWatermark(intermediate), 5L);

// Watermark is computed this time due to advance in "task 1"
// Advance currentTime without exceeding the timeout
systemTime.advance(1);

// Watermark is computed this time due to advance in "task 1"
watermarkMessage = new WatermarkMessage(10L, "task 1");
watermarkStates.update(watermarkMessage, intPartition0);
assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), 7L);
assertEquals(watermarkStates.getWatermark(intermediate), 6L);
}

class MockSystemTime implements LongSupplier {
int createTime = 0;
boolean firstWatermark = true;
static class MockSystemTime implements LongSupplier {
long currentTime = System.currentTimeMillis();

void advance(long delta) {
this.currentTime += delta;
}

@Override
public long getAsLong() {
if (createTime < ssps.size()) {
createTime++;
return System.currentTimeMillis() - TASK_WATERMARK_IDLE_MS;
}

if (firstWatermark) {
firstWatermark = false;
// Make the first task idle
return System.currentTimeMillis() - TASK_WATERMARK_IDLE_MS;
} else {
return System.currentTimeMillis();
}
return currentTime;
}
}
}

0 comments on commit f36e86d

Please sign in to comment.