Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement a handful of perworkermetrics the bigquery sink (#28903). #29098

Merged
merged 4 commits into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,40 @@
public class DelegatingCounter implements Metric, Counter, Serializable {
private final MetricName name;
private final boolean processWideContainer;
private final boolean perWorkerCounter;

/**
* Create a {@code DelegatingCounter} with {@code perWorkerCounter} and {@code
* processWideContainer} set to false.
*
* @param name Metric name for this metric.
*/
public DelegatingCounter(MetricName name) {
this(name, false);
this(name, false, false);
}

/**
* Create a {@code DelegatingCounter} with {@code perWorkerCounter} set to false.
*
* @param name Metric name for this metric.
* @param processWideContainer Whether this Counter is stored in the ProcessWide container or the
* current thread's container.
*/
public DelegatingCounter(MetricName name, boolean processWideContainer) {
this(name, processWideContainer, false);
}

/**
* @param name Metric name for this metric.
* @param processWideContainer Whether this Counter is stored in the ProcessWide container or the
* current thread's container.
* @param perWorkerCounter Whether this Counter refers to a perWorker metric or not.
*/
public DelegatingCounter(
MetricName name, boolean processWideContainer, boolean perWorkerCounter) {
this.name = name;
this.processWideContainer = processWideContainer;
this.perWorkerCounter = perWorkerCounter;
}

/** Increment the counter. */
Expand All @@ -48,7 +74,12 @@ public void inc(long n) {
this.processWideContainer
? MetricsEnvironment.getProcessWideContainer()
: MetricsEnvironment.getCurrentContainer();
if (container != null) {
if (container == null) {
return;
}
if (perWorkerCounter) {
container.getPerWorkerCounter(name).inc(n);
} else {
container.getCounter(name).inc(n);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,37 @@ public class DelegatingHistogram implements Metric, Histogram, Serializable {
private final MetricName name;
private final HistogramData.BucketType bucketType;
private final boolean processWideContainer;
private final boolean perWorkerHistogram;

/**
* Create a {@code DelegatingHistogram} with {@code perWorkerHistogram} set to false.
*
* @param name Metric name for this metric.
* @param bucketType Histogram bucketing strategy.
* @param processWideContainer Whether this Counter is stored in the ProcessWide container or the
* current thread's container.
*/
public DelegatingHistogram(
MetricName name, HistogramData.BucketType bucketType, boolean processWideContainer) {
this(name, bucketType, processWideContainer, false);
}

/**
* @param name Metric name for this metric.
* @param bucketType Histogram bucketing strategy.
* @param processWideContainer Whether this Counter is stored in the ProcessWide container or the
* current thread's container.
* @param perWorkerHistogram Whether this Histogram refers to a perWorker metric or not.
*/
public DelegatingHistogram(
MetricName name,
HistogramData.BucketType bucketType,
boolean processWideContainer,
boolean perWorkerHistogram) {
this.name = name;
this.bucketType = bucketType;
this.processWideContainer = processWideContainer;
this.perWorkerHistogram = perWorkerHistogram;
}

@Override
Expand All @@ -41,7 +66,12 @@ public void update(double value) {
processWideContainer
? MetricsEnvironment.getProcessWideContainer()
: MetricsEnvironment.getCurrentContainer();
if (container != null) {
if (container == null) {
return;
}
if (perWorkerHistogram) {
container.getPerWorkerHistogram(name, bucketType).update(value);
} else {
container.getHistogram(name, bucketType).update(value);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.NoOpCounter;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.joda.time.Duration;

Expand All @@ -38,12 +40,14 @@ public final class FluentBackoff {
private static final Duration DEFAULT_MAX_BACKOFF = Duration.standardDays(1000);
private static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE;
private static final Duration DEFAULT_MAX_CUM_BACKOFF = Duration.standardDays(1000);
private static final Counter DEFAULT_THROTTLED_TIME_COUNTER = NoOpCounter.getInstance();

private final double exponent;
private final Duration initialBackoff;
private final Duration maxBackoff;
private final Duration maxCumulativeBackoff;
private final int maxRetries;
private final Counter throttledTimeCounter;

/**
* By default the {@link BackOff} created by this builder will use exponential backoff (base
Expand All @@ -65,7 +69,8 @@ public final class FluentBackoff {
DEFAULT_MIN_BACKOFF,
DEFAULT_MAX_BACKOFF,
DEFAULT_MAX_CUM_BACKOFF,
DEFAULT_MAX_RETRIES);
DEFAULT_MAX_RETRIES,
DEFAULT_THROTTLED_TIME_COUNTER);

/**
* Instantiates a {@link BackOff} that will obey the current configuration.
Expand All @@ -87,7 +92,12 @@ public BackOff backoff() {
public FluentBackoff withExponent(double exponent) {
checkArgument(exponent > 0, "exponent %s must be greater than 0", exponent);
return new FluentBackoff(
exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries);
exponent,
initialBackoff,
maxBackoff,
maxCumulativeBackoff,
maxRetries,
throttledTimeCounter);
}

/**
Expand All @@ -104,7 +114,12 @@ public FluentBackoff withInitialBackoff(Duration initialBackoff) {
"initialBackoff %s must be at least 1 millisecond",
initialBackoff);
return new FluentBackoff(
exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries);
exponent,
initialBackoff,
maxBackoff,
maxCumulativeBackoff,
maxRetries,
throttledTimeCounter);
}

/**
Expand All @@ -119,7 +134,12 @@ public FluentBackoff withMaxBackoff(Duration maxBackoff) {
checkArgument(
maxBackoff.getMillis() > 0, "maxBackoff %s must be at least 1 millisecond", maxBackoff);
return new FluentBackoff(
exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries);
exponent,
initialBackoff,
maxBackoff,
maxCumulativeBackoff,
maxRetries,
throttledTimeCounter);
}

/**
Expand All @@ -136,7 +156,12 @@ public FluentBackoff withMaxCumulativeBackoff(Duration maxCumulativeBackoff) {
"maxCumulativeBackoff %s must be at least 1 millisecond",
maxCumulativeBackoff);
return new FluentBackoff(
exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries);
exponent,
initialBackoff,
maxBackoff,
maxCumulativeBackoff,
maxRetries,
throttledTimeCounter);
}

/**
Expand All @@ -151,7 +176,22 @@ public FluentBackoff withMaxCumulativeBackoff(Duration maxCumulativeBackoff) {
public FluentBackoff withMaxRetries(int maxRetries) {
checkArgument(maxRetries >= 0, "maxRetries %s cannot be negative", maxRetries);
return new FluentBackoff(
exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries);
exponent,
initialBackoff,
maxBackoff,
maxCumulativeBackoff,
maxRetries,
throttledTimeCounter);
}

public FluentBackoff withThrottledTimeCounter(Counter throttledTimeCounter) {
return new FluentBackoff(
exponent,
initialBackoff,
maxBackoff,
maxCumulativeBackoff,
maxRetries,
throttledTimeCounter);
}

@Override
Expand Down Expand Up @@ -206,6 +246,7 @@ public long nextBackOffMillis() {
// Update state and return backoff.
currentCumulativeBackoff = currentCumulativeBackoff.plus(Duration.millis(nextBackoffMillis));
currentRetry += 1;
backoffConfig.throttledTimeCounter.inc(nextBackoffMillis);
return nextBackoffMillis;
}

Expand All @@ -229,11 +270,13 @@ private FluentBackoff(
Duration initialBackoff,
Duration maxBackoff,
Duration maxCumulativeBackoff,
int maxRetries) {
int maxRetries,
Counter throttledTimeCounter) {
this.exponent = exponent;
this.initialBackoff = initialBackoff;
this.maxBackoff = maxBackoff;
this.maxRetries = maxRetries;
this.maxCumulativeBackoff = maxCumulativeBackoff;
this.throttledTimeCounter = throttledTimeCounter;
}
}
Loading
Loading