Skip to content

Commit

Permalink
[server][dvc] Lag arithmetic fix for empty topics (#911)
Browse files Browse the repository at this point in the history
There is an edge case where an empty topic with an end offset of zero
gets mistaken for an error code, which results in returning Long.MAX_VALUE
as the lag instead of zero.

Added a unit test for SIT::measureLagWithCallToPubSub, and refactored that
function by introducing a static overload, which makes it easier to test.

Also added @Test to a recently added unit test in SITTest which was missing
that annotation.
  • Loading branch information
FelixGV authored Mar 21, 2024
1 parent a34662e commit 8d1f6bc
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2412,15 +2412,42 @@ protected long measureLagWithCallToPubSub(
PubSubTopic topic,
int partition,
long currentOffset) {
if (currentOffset < 0) {
return measureLagWithCallToPubSub(pubSubServerName, topic, partition, currentOffset, this::getTopicManager);
}

protected static long measureLagWithCallToPubSub(
String pubSubServerName,
PubSubTopic topic,
int partition,
long currentOffset,
Function<String, TopicManager> topicManagerProvider) {
if (currentOffset < OffsetRecord.LOWEST_OFFSET) {
// -1 is a valid offset, which means that nothing was consumed yet, but anything below that is invalid.
return Long.MAX_VALUE;
}
TopicManager tm = getTopicManager(pubSubServerName);
long endOffset = tm.getLatestOffsetCached(topic, partition) - 1;
TopicManager tm = topicManagerProvider.apply(pubSubServerName);
long endOffset = tm.getLatestOffsetCached(topic, partition);
if (endOffset < 0) {
// A negative value means there was a problem in measuring the end offset, and therefore we return "infinite lag"
return Long.MAX_VALUE;
} else if (endOffset == 0) {
/**
* Topics which were never produced to have an end offset of zero. Such topics are empty and therefore, by
* definition, there cannot be any lag.
*
* Note that the reverse is not true: a topic can be currently empty and have an end offset above zero, if it had
* messages produced to it before, which have since then disappeared (e.g. due to time-based retention).
*/
return 0;
}
return endOffset - currentOffset;

/**
* A topic with an end offset of zero is empty. A topic with a single message in it will have an end offset of 1,
* while that single message will have offset 0. In such single message topic, a consumer which fully scans the
* topic would have a current offset of 0, while the topic has an end offset of 1, and therefore we need to subtract
* 1 from the end offset in order to arrive at the correct lag of 0.
*/
return endOffset - 1 - currentOffset;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@
import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
import com.linkedin.venice.serializer.FastSerializerDeserializerFactory;
import com.linkedin.venice.serializer.RecordSerializer;
import com.linkedin.venice.stats.StatsErrorCode;
import com.linkedin.venice.throttle.EventThrottler;
import com.linkedin.venice.unit.kafka.InMemoryKafkaBroker;
import com.linkedin.venice.unit.kafka.SimplePartitioner;
Expand Down Expand Up @@ -4356,6 +4357,7 @@ public void testGetOffsetToOnlineLagThresholdPerPartition() {
10l);
}

@Test
public void testCheckAndHandleUpstreamOffsetRewind() {
String storeName = "test_store";
int version = 1;
Expand Down Expand Up @@ -4448,6 +4450,73 @@ public void testCheckAndHandleUpstreamOffsetRewind() {
verify(mockStats2).recordPotentiallyLossyLeaderOffsetRewind(storeName, version);
}

@Test
public void testMeasureLagWithCallToPubSub() {
final int PARTITION_UNABLE_TO_GET_END_OFFSET = 0;
final int EMPTY_PARTITION = 1;
final int PARTITION_WITH_SOME_MESSAGES_IN_IT = 2;
final long MESSAGE_COUNT = 10;
final long INVALID_CURRENT_OFFSET = -2;
final long CURRENT_OFFSET_NOTHING_CONSUMED = OffsetRecord.LOWEST_OFFSET;
final long CURRENT_OFFSET_SOME_CONSUMED = 3;
final String PUB_SUB_SERVER_NAME = "blah";
doReturn((long) StatsErrorCode.LAG_MEASUREMENT_FAILURE.code).when(mockTopicManager)
.getLatestOffsetCached(pubSubTopic, PARTITION_UNABLE_TO_GET_END_OFFSET);
doReturn(0L).when(mockTopicManager).getLatestOffsetCached(pubSubTopic, EMPTY_PARTITION);
doReturn(MESSAGE_COUNT).when(mockTopicManager)
.getLatestOffsetCached(pubSubTopic, PARTITION_WITH_SOME_MESSAGES_IN_IT);

assertEquals(
StoreIngestionTask.measureLagWithCallToPubSub(
PUB_SUB_SERVER_NAME,
pubSubTopic,
PARTITION_UNABLE_TO_GET_END_OFFSET,
CURRENT_OFFSET_NOTHING_CONSUMED,
s -> mockTopicManager),
Long.MAX_VALUE,
"If unable to get the end offset, we expect Long.MAX_VALUE (infinite lag).");

assertEquals(
StoreIngestionTask.measureLagWithCallToPubSub(
PUB_SUB_SERVER_NAME,
pubSubTopic,
EMPTY_PARTITION,
INVALID_CURRENT_OFFSET,
s -> mockTopicManager),
Long.MAX_VALUE,
"If the current offset is invalid (less than -1), we expect Long.MAX_VALUE (infinite lag).");

assertEquals(
StoreIngestionTask.measureLagWithCallToPubSub(
PUB_SUB_SERVER_NAME,
pubSubTopic,
EMPTY_PARTITION,
CURRENT_OFFSET_NOTHING_CONSUMED,
s -> mockTopicManager),
0,
"If the partition is empty, we expect no lag.");

assertEquals(
StoreIngestionTask.measureLagWithCallToPubSub(
PUB_SUB_SERVER_NAME,
pubSubTopic,
PARTITION_WITH_SOME_MESSAGES_IN_IT,
CURRENT_OFFSET_NOTHING_CONSUMED,
s -> mockTopicManager),
MESSAGE_COUNT,
"If the partition has messages in it, but we consumed nothing, we expect lag to equal the message count.");

assertEquals(
StoreIngestionTask.measureLagWithCallToPubSub(
PUB_SUB_SERVER_NAME,
pubSubTopic,
PARTITION_WITH_SOME_MESSAGES_IN_IT,
CURRENT_OFFSET_SOME_CONSUMED,
s -> mockTopicManager),
MESSAGE_COUNT - 1 - CURRENT_OFFSET_SOME_CONSUMED,
"If the partition has messages in it, and we consumed some of them, we expect lag to equal the unconsumed message count.");
}

private VeniceStoreVersionConfig getDefaultMockVeniceStoreVersionConfig(
Consumer<VeniceStoreVersionConfig> storeVersionConfigOverride) {
// mock the store config
Expand Down

0 comments on commit 8d1f6bc

Please sign in to comment.