Skip to content

Commit

Permalink
[dvc][server]Make recordNearlineLocalBrokerToReadyToServeLatency igno…
Browse files Browse the repository at this point in the history
…re catchup messages after push (#912)

When measuring nearline latency from the local broker to ready-to-serve, messages that are still catching up after a push record much longer latencies in these metrics. This change records lagCaughtUpTimeInMs and considers only the nearline messages produced after that time, so that the older messages still being caught up are ignored.
  • Loading branch information
m-nagarajan authored Mar 24, 2024
1 parent 8d1f6bc commit 08a4d0c
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ public class PartitionConsumptionState {
private boolean deferredWrite;
private boolean errorReported;
private boolean lagCaughtUp;
/**
* Time (in milliseconds) at which the lag was first caught up. Nearline metrics
* are recorded only for messages produced after this time, so that old messages
* still being caught up after a push are ignored.
*/
private long lagCaughtUpTimeInMs;
private boolean completionReported;
private boolean isSubscribed;
private boolean isDataRecoveryCompleted;
Expand Down Expand Up @@ -225,6 +231,7 @@ public PartitionConsumptionState(int partition, int amplificationFactor, OffsetR
this.offsetRecord = offsetRecord;
this.errorReported = false;
this.lagCaughtUp = false;
this.lagCaughtUpTimeInMs = 0;
this.completionReported = false;
this.isSubscribed = true;
this.processedRecordSizeSinceLastSync = 0;
Expand Down Expand Up @@ -313,13 +320,25 @@ public boolean isWaitingForReplicationLag() {
}

/**
 * Marks this partition as having caught up on lag.
 *
 * On the first call only, the flag is set and the current wall-clock time is stored in
 * {@code lagCaughtUpTimeInMs}. Later calls are no-ops, so the originally recorded
 * catch-up time is never overwritten.
 */
public void lagHasCaughtUp() {
  // The flag must not be set before this check; otherwise the guard is always false
  // and the catch-up timestamp would never be recorded.
  if (!this.lagCaughtUp) {
    this.lagCaughtUp = true;
    this.lagCaughtUpTimeInMs = System.currentTimeMillis();
  }
}

/**
 * @return the current value of the {@code lagCaughtUp} flag
 */
public boolean hasLagCaughtUp() {
  return this.lagCaughtUp;
}

/**
 * Decides whether the pubsub-broker-to-ready-to-serve latency should be recorded for a
 * message, so that messages still being caught up from previous pushes are ignored.
 *
 * @param producerTimeStampInMs timestamp of the message
 * @return {@code true} only when the lag has caught up, a positive catch-up time has been
 *         recorded, and the message timestamp is strictly later than that catch-up time
 */
public boolean isNearlineMetricsRecordingValid(long producerTimeStampInMs) {
  // Without a caught-up flag and a recorded catch-up time there is nothing to compare against.
  if (!lagCaughtUp || lagCaughtUpTimeInMs <= 0) {
    return false;
  }
  return producerTimeStampInMs > lagCaughtUpTimeInMs;
}

/**
 * @return the current value of the {@code completionReported} flag
 */
public boolean isCompletionReported() {
  return this.completionReported;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3815,21 +3815,23 @@ private void recordNearlineLocalBrokerToReadyToServerLatency(
KafkaMessageEnvelope kafkaMessageEnvelope,
LeaderProducedRecordContext leaderProducedRecordContext) {
/**
* Record nearline latency only when it's a hybrid store and the lag has been caught up. Sometimes
* the producerTimestamp can be -1 if the leaderProducedRecordContext had an error after callback
* Don't record latency for invalid timestamps
* Record nearline latency only when it's a hybrid store and the lag has been caught up, and ignore
* messages that are still catching up. Sometimes the producerTimestamp can be -1 if the
* leaderProducedRecordContext had an error after callback. Don't record latency for invalid timestamps.
*/
if (!isUserSystemStore() && isHybridMode() && partitionConsumptionState.hasLagCaughtUp()) {
long afterProcessingRecordTimestampMs = System.currentTimeMillis();
long producerTimestamp = (leaderProducedRecordContext == null)
? kafkaMessageEnvelope.producerMetadata.messageTimestamp
: leaderProducedRecordContext.getProducedTimestampMs();
if (producerTimestamp > 0) {
versionedIngestionStats.recordNearlineLocalBrokerToReadyToServeLatency(
storeName,
versionNumber,
afterProcessingRecordTimestampMs - producerTimestamp,
afterProcessingRecordTimestampMs);
if (partitionConsumptionState.isNearlineMetricsRecordingValid(producerTimestamp)) {
long afterProcessingRecordTimestampMs = System.currentTimeMillis();
versionedIngestionStats.recordNearlineLocalBrokerToReadyToServeLatency(
storeName,
versionNumber,
afterProcessingRecordTimestampMs - producerTimestamp,
afterProcessingRecordTimestampMs);
}
} else if (!REDUNDANT_LOGGING_FILTER.isRedundantException(storeName, "IllegalTimestamp")) {
LOGGER.warn(
"Illegal timestamp for storeName: {}, versionNumber: {}, partition: {}, "
Expand Down

0 comments on commit 08a4d0c

Please sign in to comment.