diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingCounter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingCounter.java
index 82808d8cb633..ece0c70348c4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingCounter.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingCounter.java
@@ -25,14 +25,34 @@
public class DelegatingCounter implements Metric, Counter, Serializable {
private final MetricName name;
private final boolean processWideContainer;
+ private final boolean perWorkerCounter;
+ /**
+ * Create a {@code DelegatingCounter} with {@code perWorkerCounter} and {@code processWideContainer} set to false.
+ * @param name Metric name for this metric.
+ */
public DelegatingCounter(MetricName name) {
- this(name, false);
+ this(name, false, false);
}
+ /**
+ * Create a {@code DelegatingCounter} with {@code perWorkerCounter} set to false.
+ * @param name Metric name for this metric.
+ * @param processWideContainer Whether this Counter is stored in the ProcessWide container or the current thread's container.
+ */
public DelegatingCounter(MetricName name, boolean processWideContainer) {
+ this(name, processWideContainer, false);
+ }
+
+ /**
+ * @param name Metric name for this metric.
+ * @param processWideContainer Whether this Counter is stored in the ProcessWide container or the current thread's container.
+ * @param perWorkerCounter Whether this Counter refers to a perWorker metric or not.
+ */
+ public DelegatingCounter(MetricName name, boolean processWideContainer, boolean perWorkerCounter) {
this.name = name;
this.processWideContainer = processWideContainer;
+ this.perWorkerCounter = perWorkerCounter;
}
/** Increment the counter. */
@@ -48,7 +68,12 @@ public void inc(long n) {
this.processWideContainer
? MetricsEnvironment.getProcessWideContainer()
: MetricsEnvironment.getCurrentContainer();
- if (container != null) {
+ if (container == null) {
+ return;
+ }
+ if (perWorkerCounter) {
+ container.getPerWorkerCounter(name).inc(n);
+ } else {
container.getCounter(name).inc(n);
}
}
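
A minimal usage sketch of the widened constructor (illustrative only, not part of the patch; the namespace and counter name are made up):

import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.DelegatingCounter;
import org.apache.beam.sdk.metrics.MetricName;

public class DelegatingCounterSketch {
  public static void main(String[] args) {
    MetricName name = MetricName.named("MyNamespace", "MyCounter");
    // Two-arg form keeps the old behavior: an ordinary counter in the current container.
    Counter regular = new DelegatingCounter(name, false);
    // Three-arg form opts into per-worker aggregation.
    Counter perWorker = new DelegatingCounter(name, false, true);
    regular.inc();    // delegates to container.getCounter(name)
    perWorker.inc(5); // delegates to container.getPerWorkerCounter(name)
    // Both calls are silently dropped when MetricsEnvironment has no container installed.
  }
}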
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingHistogram.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingHistogram.java
index 74e3cf5719f7..b5c458836444 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingHistogram.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingHistogram.java
@@ -27,21 +27,46 @@ public class DelegatingHistogram implements Metric, Histogram, Serializable {
private final MetricName name;
private final HistogramData.BucketType bucketType;
private final boolean processWideContainer;
+ private final boolean perWorkerHistogram;
+ /**
+ * Create a {@code DelegatingHistogram} with {@code perWorkerHistogram} set to false.
+ * @param name Metric name for this metric.
+ * @param bucketType Histogram bucketing strategy.
+ * @param processWideContainer Whether this Histogram is stored in the ProcessWide container
+ * or the current thread's container.
+ */
public DelegatingHistogram(
MetricName name, HistogramData.BucketType bucketType, boolean processWideContainer) {
- this.name = name;
- this.bucketType = bucketType;
- this.processWideContainer = processWideContainer;
+ this(name, bucketType, processWideContainer, false);
}
+
+ /**
+ * @param name Metric name for this metric.
+ * @param bucketType Histogram bucketing strategy.
+ * @param processWideContainer Whether this Histogram is stored in the ProcessWide container
+ * or the current thread's container.
+ * @param perWorkerHistogram Whether this Histogram refers to a perWorker metric or not.
+ */
+ public DelegatingHistogram(
+ MetricName name,
+ HistogramData.BucketType bucketType,
+ boolean processWideContainer,
+ boolean perWorkerHistogram) {
+ this.name = name;
+ this.bucketType = bucketType;
+ this.processWideContainer = processWideContainer;
+ this.perWorkerHistogram = perWorkerHistogram;
+ }
+
@Override
public void update(double value) {
MetricsContainer container =
processWideContainer
? MetricsEnvironment.getProcessWideContainer()
: MetricsEnvironment.getCurrentContainer();
- if (container != null) {
+ if (container == null) {
+ return;
+ }
+ if (perWorkerHistogram) {
+ container.getPerWorkerHistogram(name, bucketType).update(value);
+ } else {
container.getHistogram(name, bucketType).update(value);
}
}
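
The histogram variant follows the same pattern; a sketch assuming the exponential buckets this patch uses for RPC latency (metric names are made up):

import org.apache.beam.sdk.metrics.DelegatingHistogram;
import org.apache.beam.sdk.metrics.Histogram;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.util.HistogramData;

public class DelegatingHistogramSketch {
  public static void main(String[] args) {
    Histogram latency =
        new DelegatingHistogram(
            MetricName.named("MyNamespace", "MyLatency"),
            HistogramData.ExponentialBuckets.of(1, 34),
            /* processWideContainer= */ false,
            /* perWorkerHistogram= */ true);
    // Delegates to container.getPerWorkerHistogram(name, bucketType); a no-op when no
    // metrics container is installed for the current thread.
    latency.update(3.0);
  }
}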
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingPerWorkerCounter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingPerWorkerCounter.java
deleted file mode 100644
index e8f6c7110eb8..000000000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingPerWorkerCounter.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.metrics;
-
-import java.io.Serializable;
-import org.apache.beam.sdk.annotations.Internal;
-
-/** Implementation of {@link Counter} that delegates to the instance for the current context. */
-@Internal
-public class DelegatingPerWorkerCounter implements Metric, Counter, Serializable {
- private final MetricName name;
- private final boolean processWideContainer;
-
- public DelegatingPerWorkerCounter(MetricName name) {
- this(name, false);
- }
-
- public DelegatingPerWorkerCounter(MetricName name, boolean processWideContainer) {
- this.name = name;
- this.processWideContainer = processWideContainer;
- }
-
- /** Increment the counter. */
- @Override
- public void inc() {
- inc(1);
- }
-
- /** Increment the counter by the given amount. */
- @Override
- public void inc(long n) {
- MetricsContainer container =
- this.processWideContainer
- ? MetricsEnvironment.getProcessWideContainer()
- : MetricsEnvironment.getCurrentContainer();
- if (container != null) {
- container.getPerWorkerCounter(name).inc(n);
- }
- }
-
- /* Decrement the counter. */
- @Override
- public void dec() {
- inc(-1);
- }
-
- /* Decrement the counter by the given amount. */
- @Override
- public void dec(long n) {
- inc(-1 * n);
- }
-
- @Override
- public MetricName getName() {
- return name;
- }
-}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingPerWorkerHistogram.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingPerWorkerHistogram.java
deleted file mode 100644
index e9e520791c41..000000000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingPerWorkerHistogram.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.metrics;
-
-import java.io.Serializable;
-import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.util.HistogramData;
-
-/** Implementation of {@link Histogram} that delegates to the instance for the current context. */
-@Internal
-public class DelegatingPerWorkerHistogram implements Metric, Histogram, Serializable {
- private final MetricName name;
- private final HistogramData.BucketType bucketType;
- private final boolean processWideContainer;
-
- public DelegatingPerWorkerHistogram(
- MetricName name, HistogramData.BucketType bucketType, boolean processWideContainer) {
- this.name = name;
- this.bucketType = bucketType;
- this.processWideContainer = processWideContainer;
- }
-
- public DelegatingPerWorkerHistogram(MetricName name, HistogramData.BucketType bucketType) {
- this(name, bucketType, false);
- }
-
- @Override
- public void update(double value) {
- MetricsContainer container =
- processWideContainer
- ? MetricsEnvironment.getProcessWideContainer()
- : MetricsEnvironment.getCurrentContainer();
- if (container != null) {
- container.getPerWorkerHistogram(name, bucketType).update(value);
- }
- }
-
- @Override
- public MetricName getName() {
- return name;
- }
-}
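
With both per-worker delegating classes deleted, migration for any remaining caller is mechanical; a hypothetical before/after, assuming the caller already holds a MetricName and BucketType:

import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.DelegatingCounter;
import org.apache.beam.sdk.metrics.DelegatingHistogram;
import org.apache.beam.sdk.metrics.Histogram;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.util.HistogramData;

class PerWorkerMigrationSketch {
  static void migrate(MetricName name, HistogramData.BucketType buckets) {
    // Was: new DelegatingPerWorkerCounter(name);
    Counter c = new DelegatingCounter(name, false, true);
    // Was: new DelegatingPerWorkerHistogram(name, buckets);
    Histogram h = new DelegatingHistogram(name, buckets, false, true);
    c.inc();
    h.update(1.0);
  }
}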
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java
index 35118bdccca8..f8984c62c911 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java
@@ -21,10 +21,12 @@
import java.time.Instant;
import java.util.NavigableMap;
import java.util.TreeMap;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.Operation.Context;
import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.DelegatingPerWorkerCounter;
-import org.apache.beam.sdk.metrics.DelegatingPerWorkerHistogram;
+import org.apache.beam.sdk.metrics.DelegatingCounter;
+import org.apache.beam.sdk.metrics.DelegatingHistogram;
import org.apache.beam.sdk.metrics.Histogram;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.util.HistogramData;
@@ -40,9 +42,13 @@ public class BigQuerySinkMetrics {
private static Boolean supportMetricsDeletion = false;
private static final String METRICS_NAMESPACE = "BigQuerySink";
+
+ // Status codes
private static final String UNKNOWN = Status.Code.UNKNOWN.toString();
+ public static final String OK = Status.Code.OK.toString();
+ public static final String PAYLOAD_TOO_LARGE = "PayloadTooLarge";
- // Base Metric names.
+ // Base Metric names
private static final String RPC_REQUESTS = "RpcRequests";
private static final String RPC_LATENCY = "RpcLatency";
private static final String APPEND_ROWS_ROW_STATUS = "AppendRowsRowStatus";
@@ -64,7 +70,7 @@ enum RowStatus {
// Metric labels
private static final String TABLE_ID_LABEL = "TableId";
- private static final String RPC_STATUS_LABEL = "Status";
+ private static final String RPC_STATUS_LABEL = "RpcStatus";
private static final String RPC_METHOD = "Method";
private static final String ROW_STATUS = "RowStatus";
@@ -74,11 +80,11 @@ enum RowStatus {
private static final char METRIC_NAME_DELIMITER = '-';
/**
- * Returns a metric name that merges the baseName with metricLables formatted as:
+ * Returns a metric name that merges the baseName with metricLabels formatted as:
*
*     '{baseName}-{metricLabelKey1}:{metricLabelVal1};...{metricLabelKeyN}:{metricLabelValN};'
*/
- private static String CreateLabeledMetricName(
+ private static String createLabeledMetricName(
String baseName, NavigableMap<String, String> metricLabels) {
StringBuilder nameBuilder = new StringBuilder(baseName + METRIC_NAME_DELIMITER);
@@ -89,15 +95,15 @@ private static String CreateLabeledMetricName(
}
/**
- * @param method StorageWriteAPI write method.
+ * @param method StorageWriteAPI method associated with this metric.
* @param rpcStatus RPC return status.
* @param tableId Table pertaining to the write method. Only included in the metric key if
* 'supportsMetricsDeletion' is enabled.
* @return Counter in namespace BigQuerySink and name
- * 'RpcRequests-Status:{status};TableId:{tableId}' TableId label is dropped if
- * 'supportsMetricsDeletion' is not enabled.
+ * 'RpcRequests-Method:{method};RpcStatus:{status};TableId:{tableId};' TableId label is dropped
+ * if 'supportsMetricsDeletion' is not enabled.
*/
- private static Counter CreateRPCRequestCounter(
+ private static Counter createRPCRequestCounter(
RpcMethod method, String rpcStatus, String tableId) {
NavigableMap<String, String> metricLabels = new TreeMap<String, String>();
metricLabels.put(RPC_STATUS_LABEL, rpcStatus);
@@ -106,67 +112,45 @@ private static Counter CreateRPCRequestCounter(
metricLabels.put(TABLE_ID_LABEL, tableId);
}
- String fullMetricName = CreateLabeledMetricName(RPC_REQUESTS, metricLabels);
+ String fullMetricName = createLabeledMetricName(RPC_REQUESTS, metricLabels);
MetricName metricName = MetricName.named(METRICS_NAMESPACE, fullMetricName);
- return new DelegatingPerWorkerCounter(metricName);
- }
-
- /** Creates a counter for the AppendRows RPC call based on the rpcStatus and table. */
- public static Counter AppendRPCsCounter(String rpcStatus, String tableId) {
- return CreateRPCRequestCounter(RpcMethod.APPEND_ROWS, rpcStatus, tableId);
- }
-
- /**
- * Creates a counter for the FlushRows RPC call based on the rpcStatus. TableId is not known when
- * the stream is flushed so we use the placeholder 'UNKNOWN'.
- */
- public static Counter FlushRowsCounter(String rpcStatus) {
- return CreateRPCRequestCounter(RpcMethod.FLUSH_ROWS, rpcStatus, BigQuerySinkMetrics.UNKNOWN);
- }
-
- /**
- * Creates a counter for the FinalizeRows RPC call based on the rpcStatus. TableId is not known
- * when the stream is flushed so we use the placeholder 'UNKNOWN'.
- */
- public static Counter FinalizeStreamCounter(String rpcStatus) {
- return CreateRPCRequestCounter(
- RpcMethod.FINALIZE_STREAM, rpcStatus, BigQuerySinkMetrics.UNKNOWN);
+ return new DelegatingCounter(metricName, false, true);
}
/**
- * Creates an Histogram metric to record RPC latency. Metric will have name:
+ * Creates a Histogram metric to record RPC latency. Metric will have name:
*
* 'RpcLatency-Method:{method};'
*
* @param method StorageWriteAPI method associated with this metric.
* @return Histogram with exponential buckets with a sqrt(2) growth factor.
*/
- private static Histogram CreateRPCLatencyHistogram(RpcMethod method) {
+ private static Histogram createRPCLatencyHistogram(RpcMethod method) {
NavigableMap<String, String> metricLabels = new TreeMap<String, String>();
metricLabels.put(RPC_METHOD, method.toString());
- String fullMetricName = CreateLabeledMetricName(RPC_LATENCY, metricLabels);
+ String fullMetricName = createLabeledMetricName(RPC_LATENCY, metricLabels);
MetricName metricName = MetricName.named(METRICS_NAMESPACE, fullMetricName);
HistogramData.BucketType buckets = HistogramData.ExponentialBuckets.of(1, 34);
- return new DelegatingPerWorkerHistogram(metricName, buckets);
+ return new DelegatingHistogram(metricName, buckets, false, true);
}
/**
- * Records the time between operationStartTime and OperationEndTime in a PerWorkerHistogram.
+ * Records an RPC operation's duration in a PerWorkerHistogram.
*
- * @param operationStartTime If null or in the future, this function is a no-op.
- * @param OperationEndTime End time of operation.
+ * @param c Retry manager context, used to get the operation start and end time.
* @param method StorageWriteAPI write method.
*/
- public static void UpdateRpcLatencyMetric(
- @Nullable Instant operationStartTime, Instant operationEndTime, RpcMethod method) {
+ private static void updateRpcLatencyMetric(@Nonnull Context<?> c, RpcMethod method) {
+ @Nullable Instant operationStartTime = c.getOperationStartTime();
+ @Nullable Instant operationEndTime = c.getOperationEndTime();
if (operationStartTime == null || operationEndTime == null) {
return;
}
long timeElapsed = java.time.Duration.between(operationStartTime, operationEndTime).toMillis();
if (timeElapsed > 0) {
- BigQuerySinkMetrics.CreateRPCLatencyHistogram(method).update(timeElapsed);
+ BigQuerySinkMetrics.createRPCLatencyHistogram(method).update(timeElapsed);
}
}
@@ -177,7 +161,7 @@ public static void UpdateRpcLatencyMetric(
* 'supportsMetricsDeletion' is enabled.
* @return Metric that tracks the status of BigQuery rows after making an AppendRows RPC call.
*/
- public static Counter AppendRowsRowStatusCounter(
+ public static Counter appendRowsRowStatusCounter(
RowStatus rowStatus, String rpcStatus, String tableId) {
NavigableMap<String, String> metricLabels = new TreeMap<String, String>();
metricLabels.put(RPC_STATUS_LABEL, rpcStatus);
@@ -186,19 +170,22 @@ public static Counter AppendRowsRowStatusCounter(
metricLabels.put(TABLE_ID_LABEL, tableId);
}
- String fullMetricName = CreateLabeledMetricName(APPEND_ROWS_ROW_STATUS, metricLabels);
+ String fullMetricName = createLabeledMetricName(APPEND_ROWS_ROW_STATUS, metricLabels);
MetricName metricName = MetricName.named(METRICS_NAMESPACE, fullMetricName);
- return new DelegatingPerWorkerCounter(metricName);
+ return new DelegatingCounter(metricName, false, true);
}
- /** Metric that tracks throttled time due between RPC retries. */
- public static Counter ThrottledTimeCounter(RpcMethod method) {
+ /**
+ * @param method StorageWriteAPI write method.
+ * @return Counter that tracks throttled time due to RPC retries.
+ */
+ public static Counter throttledTimeCounter(RpcMethod method) {
NavigableMap<String, String> metricLabels = new TreeMap<String, String>();
metricLabels.put(RPC_METHOD, method.toString());
- String fullMetricName = CreateLabeledMetricName(THROTTLED_TIME, metricLabels);
+ String fullMetricName = createLabeledMetricName(THROTTLED_TIME, metricLabels);
MetricName metricName = MetricName.named(METRICS_NAMESPACE, fullMetricName);
- return new DelegatingPerWorkerCounter(metricName);
+ return new DelegatingCounter(metricName, false, true);
}
/**
@@ -207,13 +194,74 @@ public static Counter ThrottledTimeCounter(RpcMethod method) {
* @param t Throwable.
* @return gRPC status code string or 'UNKNOWN' if 't' is null or does not map to a gRPC error.
*/
- public static String ThrowableToGRPCCodeString(@Nullable Throwable t) {
+ public static String throwableToGRPCCodeString(@Nullable Throwable t) {
if (t == null) {
return BigQuerySinkMetrics.UNKNOWN;
}
return Status.fromThrowable(t).getCode().toString();
}
+ /**
+ * Records RpcRequests counter and RpcLatency histogram for this RPC call. If
+ * 'SupportMetricsDeletion' is enabled, RpcRequests counter will have tableId label set to {@code
+ * UNKNOWN}. RpcRequests counter will have RpcStatus label set to {@code OK}.
+ *
+ * @param c Context of successful RPC call.
+ * @param method StorageWriteAPI method associated with this metric.
+ */
+ public static void reportSuccessfulRpcMetrics(@Nullable Context<?> c, RpcMethod method) {
+ reportSuccessfulRpcMetrics(c, method, UNKNOWN);
+ }
+
+ /**
+ * Records RpcRequests counter and RpcLatency histogram for this RPC call. RpcRequests counter will
+ * have RpcStatus label set to {@code OK}.
+ *
+ * @param c Context of successful RPC call.
+ * @param method StorageWriteAPI method associated with this metric.
+ * @param tableId Table pertaining to the write method. Only included in the metric key if
+ * 'supportsMetricsDeletion' is enabled.
+ */
+ public static void reportSuccessfulRpcMetrics(
+ @Nullable Context<?> c, RpcMethod method, String tableId) {
+ if (c == null) {
+ return;
+ }
+ createRPCRequestCounter(method, OK, tableId).inc(1);
+ updateRpcLatencyMetric(c, method);
+ }
+
+ /**
+ * Records RpcRequests counter and RpcLatency histogram for this RPC call. If
+ * 'SupportMetricsDeletion' is enabled, RpcRequests counter will have tableId label set to {@code
+ * UNKNOWN}. RpcRequests counter will have a RpcStatus label set from the gRPC error.
+ *
+ * @param c Context of failed RPC call.
+ * @param method StorageWriteAPI method associated with this metric.
+ */
+ public static void reportFailedRPCMetrics(@Nullable Context<?> c, RpcMethod method) {
+ reportFailedRPCMetrics(c, method, UNKNOWN);
+ }
+
+ /**
+ * Records RpcRequests counter and RpcLatency histogram for this RPC call. RpcRequests counter will
+ * have a RpcStatus label set from the gRPC error.
+ *
+ * @param c Context of failed RPC call.
+ * @param method StorageWriteAPI method associated with this metric.
+ * @param tableId Table pertaining to the write method. Only included in the metric key if
+ * 'supportsMetricsDeletion' is enabled.
+ */
+ public static void reportFailedRPCMetrics(
+ @Nullable Context<?> c, RpcMethod method, String tableId) {
+ if (c == null) {
+ return;
+ }
+ String statusCode = throwableToGRPCCodeString(c.getError());
+ createRPCRequestCounter(method, statusCode, tableId).inc(1);
+ updateRpcLatencyMetric(c, method);
+ }
+
public static void setSupportMetricsDeletion(Boolean supportMetricsDeletion) {
BigQuerySinkMetrics.supportMetricsDeletion = supportMetricsDeletion;
}
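
Taken together, call sites now hand a retry Context to one of the two report methods, which derive both the RpcRequests counter and the RpcLatency histogram from it. A sketch of the flow (same-package access assumed, since Operation.Context is package-private; the table id and latency are fabricated):

package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import java.time.Instant;
import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.Operation.Context;

class RpcMetricsSketch {
  static void reportOneCall() {
    Context<AppendRowsResponse> ctx = new Context<>();
    Instant start = Instant.now();
    ctx.setOperationStartTime(start);
    ctx.setOperationEndTime(start.plusMillis(3));
    // Increments per-worker counter "RpcRequests-Method:APPEND_ROWS;RpcStatus:OK;" (the
    // TableId label is appended only when supportMetricsDeletion is enabled) and records
    // 3.0 in per-worker histogram "RpcLatency-Method:APPEND_ROWS;".
    BigQuerySinkMetrics.reportSuccessfulRpcMetrics(
        ctx, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, "tableId");
  }
}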
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RetryManager.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RetryManager.java
index 0878f4541176..ae2eafa7a2b7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RetryManager.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RetryManager.java
@@ -104,6 +104,7 @@ static class Context<ResultT> {
private @Nullable Throwable error = null;
private @Nullable ResultT result = null;
private @Nullable Instant operationStartTime = null;
+ private @Nullable Instant operationEndTime = null;
public void setError(@Nullable Throwable error) {
this.error = error;
@@ -128,6 +129,14 @@ public void setOperationStartTime(@Nullable Instant operationStartTime) {
public @Nullable Instant getOperationStartTime() {
return operationStartTime;
}
+
+ public void setOperationEndTime(@Nullable Instant operationEndTime) {
+ this.operationEndTime = operationEndTime;
+ }
+
+ public @Nullable Instant getOperationEndTime() {
+ return operationEndTime;
+ }
}
private final Function<ContextT, ApiFuture<ResultT>> runOperation;
@@ -153,6 +162,7 @@ public Operation(
void run(Executor executor) {
this.context.setOperationStartTime(Instant.now());
+ this.context.setOperationEndTime(null);
this.future = runOperation.apply(context);
this.callback = new Callback<>(hasSucceeded);
ApiFutures.addCallback(future, callback, executor);
@@ -170,6 +180,7 @@ private static class Callback<ResultT> implements ApiFutureCallback<ResultT> {
private final Function hasSucceeded;
@Nullable private Throwable failure = null;
boolean failed = false;
+ @Nullable Instant operationEndTime = null;
Callback(Function hasSucceeded) {
this.waiter = new CountDownLatch(1);
@@ -187,6 +198,7 @@ boolean await(long timeoutSec) throws InterruptedException {
@Override
public void onFailure(Throwable t) {
synchronized (this) {
+ operationEndTime = Instant.now();
failure = t;
failed = true;
}
@@ -196,6 +208,7 @@ public void onFailure(Throwable t) {
@Override
public void onSuccess(ResultT result) {
synchronized (this) {
+ operationEndTime = Instant.now();
if (hasSucceeded.apply(result)) {
failure = null;
} else {
@@ -218,6 +231,13 @@ boolean getFailed() {
return failed;
}
}
+
+ @Nullable
+ Instant getOperationEndTime() {
+ synchronized (this) {
+ return operationEndTime;
+ }
+ }
}
void addOperation(
@@ -280,6 +300,10 @@ void await() throws Exception {
while (!this.operations.isEmpty()) {
Operation<ResultT, ContextT> operation = this.operations.element();
boolean failed = operation.await();
+ @Nullable Callback<?> callback = operation.callback;
+ if (callback != null) {
+ operation.context.setOperationEndTime(callback.getOperationEndTime());
+ }
if (failed) {
Throwable failure = Preconditions.checkStateNotNull(operation.callback).getFailure();
operation.context.setError(failure);
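
The end-time plumbing is the subtle part: the timestamp is taken inside the future's callback (where the RPC actually settles), then copied onto the Context in await() so the metrics helpers can read it. An analogous, self-contained sketch of the pattern (plain CompletableFuture, not Beam code):

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class EndTimeSketch {
  public static void main(String[] args) {
    AtomicReference<Instant> endTime = new AtomicReference<>();
    Instant start = Instant.now();
    CompletableFuture<String> op = CompletableFuture.supplyAsync(() -> "ok");
    // Stamp the end time where the operation settles, on success or failure alike.
    CompletableFuture<String> stamped = op.whenComplete((r, t) -> endTime.set(Instant.now()));
    stamped.join(); // the stamp is guaranteed visible once the dependent stage completes
    System.out.println("latency ms: " + Duration.between(start, endTime.get()).toMillis());
  }
}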
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFinalizeWritesDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFinalizeWritesDoFn.java
index 831a8ff6a94b..853a63fca8cc 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFinalizeWritesDoFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFinalizeWritesDoFn.java
@@ -22,7 +22,6 @@
import com.google.cloud.bigquery.storage.v1.StorageError;
import com.google.cloud.bigquery.storage.v1.StorageError.StorageErrorCode;
import java.io.IOException;
-import java.time.Instant;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
@@ -109,7 +108,7 @@ public void process(PipelineOptions pipelineOptions, @Element KV<String, String>
Duration.standardSeconds(1),
Duration.standardMinutes(1),
3,
- BigQuerySinkMetrics.ThrottledTimeCounter(
+ BigQuerySinkMetrics.throttledTimeCounter(
BigQuerySinkMetrics.RpcMethod.FINALIZE_STREAM));
retryManager.addOperation(
c -> {
@@ -121,13 +120,8 @@ public void process(PipelineOptions pipelineOptions, @Element KV<String, String>
Preconditions.checkArgumentNotNull(Iterables.getFirst(contexts, null));
LOG.error("Finalize of stream " + streamId + " failed with " + firstContext.getError());
finalizeOperationsFailed.inc();
- String errorCode = BigQuerySinkMetrics.ThrowableToGRPCCodeString(firstContext.getError());
- BigQuerySinkMetrics.FinalizeStreamCounter(errorCode).inc();
-
- @Nullable Instant operationStartTime = firstContext.getOperationStartTime();
- Instant operationEndTime = Instant.now();
- BigQuerySinkMetrics.UpdateRpcLatencyMetric(
- operationStartTime, operationEndTime, BigQuerySinkMetrics.RpcMethod.FINALIZE_STREAM);
+ BigQuerySinkMetrics.reportFailedRPCMetrics(
+ firstContext, BigQuerySinkMetrics.RpcMethod.FINALIZE_STREAM);
return RetryType.RETRY_ALL_OPERATIONS;
},
@@ -140,11 +134,8 @@ public void process(PipelineOptions pipelineOptions, @Element KV<String, String>
rowsFinalized.inc(response.getRowCount());
finalizeOperationsSucceeded.inc();
- BigQuerySinkMetrics.FinalizeStreamCounter("ok").inc();
- @Nullable Instant operationStartTime = c.getOperationStartTime();
- Instant operationEndTime = Instant.now();
- BigQuerySinkMetrics.UpdateRpcLatencyMetric(
- operationStartTime, operationEndTime, BigQuerySinkMetrics.RpcMethod.FINALIZE_STREAM);
+ BigQuerySinkMetrics.reportSuccessfulRpcMetrics(
+ c, BigQuerySinkMetrics.RpcMethod.FINALIZE_STREAM);
commitStreams.computeIfAbsent(tableId, d -> Lists.newArrayList()).add(streamId);
},
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java
index 0cde0b7017b4..333a0c0b36bf 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java
@@ -146,7 +146,7 @@ public void process(PipelineOptions pipelineOptions, @Element KV<String, Operation>
@@ -165,12 +165,8 @@ public void process(PipelineOptions pipelineOptions, @Element KV<String, Operation>
- BigQuerySinkMetrics.FlushRowsCounter("ok").inc();
- if (c != null) {
- BigQuerySinkMetrics.UpdateRpcLatencyMetric(
- c.getOperationStartTime(),
- Instant.now(),
- BigQuerySinkMetrics.RpcMethod.FLUSH_ROWS);
- }
+ BigQuerySinkMetrics.reportSuccessfulRpcMetrics(
+ c, BigQuerySinkMetrics.RpcMethod.FLUSH_ROWS);
flushOperationsSucceeded.inc();
},
new Context<>());
@@ -221,18 +212,11 @@ public void process(PipelineOptions pipelineOptions, @Element KV<String, Operation>
finalizeOperationsSent.inc();
- if (c != null) {
- BigQuerySinkMetrics.UpdateRpcLatencyMetric(
- c.getOperationStartTime(),
- Instant.now(),
- BigQuerySinkMetrics.RpcMethod.FLUSH_ROWS);
- }
- BigQuerySinkMetrics.FinalizeStreamCounter("ok").inc();
return datasetService.finalizeWriteStream(streamId);
},
@@ -246,17 +230,10 @@ public void process(PipelineOptions pipelineOptions, @Element KV<String, Operation>
@Nullable Context<FinalizeWriteStreamResponse> firstContext = Iterables.getFirst(contexts, null);
+ BigQuerySinkMetrics.reportFailedRPCMetrics(
+ firstContext, BigQuerySinkMetrics.RpcMethod.FINALIZE_STREAM);
@Nullable Throwable error = firstContext == null ? null : firstContext.getError();
- String errorCode = BigQuerySinkMetrics.ThrowableToGRPCCodeString(error);
- BigQuerySinkMetrics.FinalizeStreamCounter(errorCode).inc();
- if (firstContext != null) {
- @Nullable Instant operationStartTime = firstContext.getOperationStartTime();
- Instant operationEndTime = Instant.now();
- BigQuerySinkMetrics.UpdateRpcLatencyMetric(
- operationStartTime, operationEndTime, BigQuerySinkMetrics.RpcMethod.FLUSH_ROWS);
- }
-
if (error instanceof ApiException) {
Code statusCode = ((ApiException) error).getStatusCode().getCode();
if (statusCode.equals(Code.NOT_FOUND)) {
@@ -266,6 +243,8 @@ public void process(PipelineOptions pipelineOptions, @Element KV<String, Operation>
+ BigQuerySinkMetrics.reportSuccessfulRpcMetrics(
+ r, BigQuerySinkMetrics.RpcMethod.FINALIZE_STREAM);
finalizeOperationsSucceeded.inc();
},
new Context<>());
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index 587e1cd82c0b..444d87f62da6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -612,8 +612,10 @@ long flush(
timestamp);
}
int numRowsFailed = inserts.getSerializedRowsCount();
- BigQuerySinkMetrics.AppendRowsRowStatusCounter(
- BigQuerySinkMetrics.RowStatus.FAILED, "PayloadTooLarge", shortTableUrn)
+ BigQuerySinkMetrics.appendRowsRowStatusCounter(
+ BigQuerySinkMetrics.RowStatus.FAILED,
+ BigQuerySinkMetrics.PAYLOAD_TOO_LARGE,
+ shortTableUrn)
.inc(numRowsFailed);
rowsSentToFailedRowsCollection.inc(numRowsFailed);
return 0;
@@ -657,13 +659,10 @@ long flush(
contexts -> {
AppendRowsContext failedContext =
Preconditions.checkStateNotNull(Iterables.getFirst(contexts, null));
- Instant operationEndTime = Instant.now();
- @Nullable Instant operationStartTime = failedContext.getOperationStartTime();
- BigQuerySinkMetrics.UpdateRpcLatencyMetric(
- operationStartTime, operationEndTime, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS);
+ BigQuerySinkMetrics.reportFailedRPCMetrics(
+ failedContext, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, shortTableUrn);
String errorCode =
- BigQuerySinkMetrics.ThrowableToGRPCCodeString(failedContext.getError());
- BigQuerySinkMetrics.AppendRPCsCounter(errorCode, shortTableUrn).inc();
+ BigQuerySinkMetrics.throwableToGRPCCodeString(failedContext.getError());
if (failedContext.getError() != null
&& failedContext.getError() instanceof Exceptions.AppendSerializtionError) {
@@ -695,7 +694,7 @@ long flush(
}
int numRowsFailed = failedRowIndices.size();
rowsSentToFailedRowsCollection.inc(numRowsFailed);
- BigQuerySinkMetrics.AppendRowsRowStatusCounter(
+ BigQuerySinkMetrics.appendRowsRowStatusCounter(
BigQuerySinkMetrics.RowStatus.FAILED, errorCode, shortTableUrn)
.inc(numRowsFailed);
@@ -713,7 +712,7 @@ long flush(
failedContext.protoRows = retryRows.build();
failedContext.timestamps = retryTimestamps;
int numRowsRetried = failedContext.protoRows.getSerializedRowsCount();
- BigQuerySinkMetrics.AppendRowsRowStatusCounter(
+ BigQuerySinkMetrics.appendRowsRowStatusCounter(
BigQuerySinkMetrics.RowStatus.RETRIED, errorCode, shortTableUrn)
.inc(numRowsRetried);
@@ -756,8 +755,6 @@ long flush(
// The following errors are known to be persistent, so always fail the work item in
// this case.
Throwable error = Preconditions.checkStateNotNull(failedContext.getError());
Status.Code statusCode = Status.fromThrowable(error).getCode();
if (statusCode.equals(Status.Code.OUT_OF_RANGE)
|| statusCode.equals(Status.Code.ALREADY_EXISTS)) {
throw new RuntimeException(
@@ -789,7 +786,7 @@ long flush(
}
int numRowsRetried = failedContext.protoRows.getSerializedRowsCount();
- BigQuerySinkMetrics.AppendRowsRowStatusCounter(
+ BigQuerySinkMetrics.appendRowsRowStatusCounter(
BigQuerySinkMetrics.RowStatus.RETRIED, errorCode, shortTableUrn)
.inc(numRowsRetried);
@@ -797,17 +794,16 @@ long flush(
return RetryType.RETRY_ALL_OPERATIONS;
},
c -> {
- BigQuerySinkMetrics.AppendRPCsCounter("ok", shortTableUrn).inc();
int numRecordsAppended = c.protoRows.getSerializedRowsCount();
recordsAppended.inc(numRecordsAppended);
- BigQuerySinkMetrics.AppendRowsRowStatusCounter(
- BigQuerySinkMetrics.RowStatus.SUCCESSFUL, "ok", shortTableUrn)
+ BigQuerySinkMetrics.appendRowsRowStatusCounter(
+ BigQuerySinkMetrics.RowStatus.SUCCESSFUL,
+ BigQuerySinkMetrics.OK,
+ shortTableUrn)
.inc(numRecordsAppended);
- @Nullable Instant operationStartTime = c.getOperationStartTime();
- Instant operationEndTime = Instant.now();
- BigQuerySinkMetrics.UpdateRpcLatencyMetric(
- operationStartTime, operationEndTime, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS);
+ BigQuerySinkMetrics.reportSuccessfulRpcMetrics(
+ c, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, shortTableUrn);
if (successfulRowsReceiver != null) {
for (int i = 0; i < c.protoRows.getSerializedRowsCount(); ++i) {
@@ -949,7 +945,7 @@ void flushAll(
Duration.standardSeconds(1),
Duration.standardSeconds(10),
1000,
- BigQuerySinkMetrics.ThrottledTimeCounter(
+ BigQuerySinkMetrics.throttledTimeCounter(
BigQuerySinkMetrics.RpcMethod.APPEND_ROWS));
retryManagers.add(retryManager);
numRowsWritten +=
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index 6e59e31719d6..d3042984638f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -546,8 +546,10 @@ public void process(
new BigQueryStorageApiInsertError(failedRow.getValue(), errorMessage),
failedRow.getTimestamp());
rowsSentToFailedRowsCollection.inc();
- BigQuerySinkMetrics.AppendRowsRowStatusCounter(
- BigQuerySinkMetrics.RowStatus.FAILED, "PayloadTooLarge", shortTableId)
+ BigQuerySinkMetrics.appendRowsRowStatusCounter(
+ BigQuerySinkMetrics.RowStatus.FAILED,
+ BigQuerySinkMetrics.PAYLOAD_TOO_LARGE,
+ shortTableId)
.inc(1);
},
autoUpdateSchema,
@@ -632,10 +634,10 @@ public void process(
// The first context is always the one that fails.
AppendRowsContext failedContext =
Preconditions.checkStateNotNull(Iterables.getFirst(failedContexts, null));
- @Nullable Instant operationStartTime = failedContext.getOperationStartTime();
- Instant operationEndTime = Instant.now();
- BigQuerySinkMetrics.UpdateRpcLatencyMetric(
- operationStartTime, operationEndTime, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS);
+ BigQuerySinkMetrics.reportFailedRPCMetrics(
+ failedContext, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS);
+ String errorCode =
+ BigQuerySinkMetrics.throwableToGRPCCodeString(failedContext.getError());
// AppendSerializationError means that BigQuery detected errors on individual rows, e.g.
// a row not conforming
@@ -648,8 +650,6 @@ public void process(
Preconditions.checkArgumentNotNull(
(Exceptions.AppendSerializtionError) failedContext.getError());
- String errorCode = BigQuerySinkMetrics.ThrowableToGRPCCodeString(error);
- BigQuerySinkMetrics.AppendRPCsCounter(errorCode, shortTableId).inc();
Set failedRowIndices = error.getRowIndexToErrorMessage().keySet();
for (int failedIndex : failedRowIndices) {
// Convert the message to a TableRow and send it to the failedRows collection.
@@ -664,7 +664,7 @@ public void process(
}
int failedRows = failedRowIndices.size();
rowsSentToFailedRowsCollection.inc(failedRows);
- BigQuerySinkMetrics.AppendRowsRowStatusCounter(
+ BigQuerySinkMetrics.appendRowsRowStatusCounter(
BigQuerySinkMetrics.RowStatus.FAILED, errorCode, shortTableId)
.inc(failedRows);
@@ -682,7 +682,7 @@ public void process(
failedContext.protoRows = retryRows.build();
failedContext.timestamps = timestamps;
int retriedRows = failedContext.protoRows.getSerializedRowsCount();
- BigQuerySinkMetrics.AppendRowsRowStatusCounter(
+ BigQuerySinkMetrics.appendRowsRowStatusCounter(
BigQuerySinkMetrics.RowStatus.RETRIED, errorCode, shortTableId)
.inc(retriedRows);
@@ -698,8 +698,7 @@ public void process(
Throwable error = Preconditions.checkStateNotNull(failedContext.getError());
Status.Code statusCode = Status.fromThrowable(error).getCode();
- String errorCode = statusCode.toString();
- BigQuerySinkMetrics.AppendRPCsCounter(errorCode, shortTableId).inc();
+
// This means that the offset we have stored does not match the current end of
// the stream in the Storage API. Usually this happens because a crash or a bundle
// failure
@@ -733,7 +732,7 @@ public void process(
}
appendFailures.inc();
int retriedRows = failedContext.protoRows.getSerializedRowsCount();
- BigQuerySinkMetrics.AppendRowsRowStatusCounter(
+ BigQuerySinkMetrics.appendRowsRowStatusCounter(
BigQuerySinkMetrics.RowStatus.RETRIED, errorCode, shortTableId)
.inc(retriedRows);
@@ -782,14 +781,10 @@ public void process(
false)));
int flushedRows = context.protoRows.getSerializedRowsCount();
flushesScheduled.inc(flushedRows);
- BigQuerySinkMetrics.AppendRowsRowStatusCounter(
- BigQuerySinkMetrics.RowStatus.SUCCESSFUL, "ok", shortTableId)
- .inc(flushedRows);
- BigQuerySinkMetrics.AppendRPCsCounter("ok", shortTableId).inc();
- @Nullable Instant operationStartTime = context.getOperationStartTime();
- Instant operationEndTime = Instant.now();
- BigQuerySinkMetrics.UpdateRpcLatencyMetric(
- operationStartTime, operationEndTime, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS);
+ BigQuerySinkMetrics.reportSuccessfulRpcMetrics(
+ context, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, shortTableId);
+ BigQuerySinkMetrics.appendRowsRowStatusCounter(
+ BigQuerySinkMetrics.RowStatus.SUCCESSFUL, BigQuerySinkMetrics.OK, shortTableId)
+ .inc(flushedRows);
if (successfulRowsTag != null) {
for (int i = 0; i < context.protoRows.getSerializedRowsCount(); ++i) {
@@ -807,7 +802,7 @@ public void process(
Duration.standardSeconds(1),
Duration.standardSeconds(10),
1000,
- BigQuerySinkMetrics.ThrottledTimeCounter(BigQuerySinkMetrics.RpcMethod.APPEND_ROWS));
+ BigQuerySinkMetrics.throttledTimeCounter(BigQuerySinkMetrics.RpcMethod.APPEND_ROWS));
int numAppends = 0;
for (SplittingIterable.Value splitValue : messages) {
// Handle the case of a row that is too large.
@@ -834,8 +829,10 @@ public void process(
}
int numRowsFailed = splitValue.getProtoRows().getSerializedRowsCount();
rowsSentToFailedRowsCollection.inc(numRowsFailed);
- BigQuerySinkMetrics.AppendRowsRowStatusCounter(
- BigQuerySinkMetrics.RowStatus.FAILED, "PayloadTooLarge", shortTableId)
+ BigQuerySinkMetrics.appendRowsRowStatusCounter(
+ BigQuerySinkMetrics.RowStatus.FAILED,
+ BigQuerySinkMetrics.PAYLOAD_TOO_LARGE,
+ shortTableId)
.inc(numRowsFailed);
} else {
++numAppends;
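
All of the PayloadTooLarge call sites above mint the same counter shape; a sketch of the resulting name (same-package access assumed; the table id is made up):

package org.apache.beam.sdk.io.gcp.bigquery;

import org.apache.beam.sdk.metrics.Counter;

class PayloadTooLargeSketch {
  static Counter tooLargeCounter() {
    // With supportMetricsDeletion disabled, the returned counter is named
    // "AppendRowsRowStatus-RowStatus:FAILED;RpcStatus:PayloadTooLarge;"
    // (labels are emitted in sorted key order by createLabeledMetricName).
    return BigQuerySinkMetrics.appendRowsRowStatusCounter(
        BigQuerySinkMetrics.RowStatus.FAILED,
        BigQuerySinkMetrics.PAYLOAD_TOO_LARGE,
        "myTable");
  }
}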
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java
index e5569c502b71..c937c6b299b2 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java
@@ -21,17 +21,23 @@
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
+import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.Exceptions;
import io.grpc.Status;
import java.time.Instant;
import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.runners.core.metrics.CounterCell;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
+import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.Operation.Context;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Histogram;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.util.HistogramData;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.hamcrest.collection.IsMapContaining;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -57,7 +63,12 @@ public MetricName getName() {
public static class TestMetricsContainer extends MetricsContainerImpl {
- public TestHistogram testHistogram = new TestHistogram();
+ public ConcurrentHashMap<KV<MetricName, HistogramData.BucketType>, TestHistogram>
+ perWorkerHistograms =
+ new ConcurrentHashMap<KV<MetricName, HistogramData.BucketType>, TestHistogram>();
+ public ConcurrentHashMap<MetricName, CounterCell> perWorkerCounters =
+ new ConcurrentHashMap<MetricName, CounterCell>();
public TestMetricsContainer() {
super("TestStep");
@@ -66,104 +77,228 @@ public TestMetricsContainer() {
@Override
public Histogram getPerWorkerHistogram(
MetricName metricName, HistogramData.BucketType bucketType) {
- return testHistogram;
+ return perWorkerHistograms.computeIfAbsent(
+ KV.of(metricName, bucketType), kv -> new TestHistogram());
}
- }
-
- @Test
- public void testAppendRPCsCounter() throws Exception {
- BigQuerySinkMetrics.setSupportMetricsDeletion(false);
- Counter deletesEnabledCounter = BigQuerySinkMetrics.AppendRPCsCounter("rpcStatus", "tableId");
- assertThat(
- deletesEnabledCounter.getName().getName(),
- equalTo("RpcRequests-Method:APPEND_ROWS;Status:rpcStatus;"));
-
- BigQuerySinkMetrics.setSupportMetricsDeletion(true);
- Counter deletesDisabledCounter = BigQuerySinkMetrics.AppendRPCsCounter("rpcStatus", "tableId");
- assertThat(
- deletesDisabledCounter.getName().getName(),
- equalTo("RpcRequests-Method:APPEND_ROWS;Status:rpcStatus;TableId:tableId;"));
- }
- @Test
- public void testFlushRowsCounter() throws Exception {
- BigQuerySinkMetrics.setSupportMetricsDeletion(false);
- Counter deletesEnabledCounter = BigQuerySinkMetrics.FlushRowsCounter("rpcStatus");
- assertThat(
- deletesEnabledCounter.getName().getName(),
- equalTo("RpcRequests-Method:FLUSH_ROWS;Status:rpcStatus;"));
+ @Override
+ public Counter getPerWorkerCounter(MetricName metricName) {
+ return perWorkerCounters.computeIfAbsent(metricName, name -> new CounterCell(name));
+ }
- BigQuerySinkMetrics.setSupportMetricsDeletion(true);
- Counter deletesDisabledCounter = BigQuerySinkMetrics.FlushRowsCounter("rpcStatus");
- assertThat(
- deletesDisabledCounter.getName().getName(),
- equalTo("RpcRequests-Method:FLUSH_ROWS;Status:rpcStatus;TableId:UNKNOWN;"));
+ @Override
+ public void reset() {
+ perWorkerHistograms.clear();
+ perWorkerCounters.clear();
+ }
}
@Test
public void testAppendRowsRowStatusCounter() throws Exception {
+ // Setup
+ TestMetricsContainer testContainer = new TestMetricsContainer();
+ MetricsEnvironment.setCurrentContainer(testContainer);
+
BigQuerySinkMetrics.setSupportMetricsDeletion(false);
- Counter deletesEnabledCounter =
- BigQuerySinkMetrics.AppendRowsRowStatusCounter(
+ Counter deletesDisabledCounter =
+ BigQuerySinkMetrics.appendRowsRowStatusCounter(
BigQuerySinkMetrics.RowStatus.SUCCESSFUL, "rpcStatus", "tableId");
+ deletesDisabledCounter.inc();
+ MetricName deletesDisabledCounterName =
+ MetricName.named(
+ "BigQuerySink", "AppendRowsRowStatus-RowStatus:SUCCESSFUL;RpcStatus:rpcStatus;");
+ assertThat(testContainer.perWorkerCounters, IsMapContaining.hasKey(deletesDisabledCounterName));
assertThat(
- deletesEnabledCounter.getName().getName(),
- equalTo("AppendRowsRowStatus-RowStatus:SUCCESSFUL;Status:rpcStatus;"));
+ testContainer.perWorkerCounters.get(deletesDisabledCounterName).getCumulative(),
+ equalTo(1L));
BigQuerySinkMetrics.setSupportMetricsDeletion(true);
- Counter deletesDisabledCounter =
- BigQuerySinkMetrics.AppendRowsRowStatusCounter(
+ testContainer.reset();
+ Counter deletesEnabledCounter =
+ BigQuerySinkMetrics.appendRowsRowStatusCounter(
BigQuerySinkMetrics.RowStatus.SUCCESSFUL, "rpcStatus", "tableId");
+ deletesEnabledCounter.inc();
+ MetricName deletesEnabledCounterName =
+ MetricName.named(
+ "BigQuerySink",
+ "AppendRowsRowStatus-RowStatus:SUCCESSFUL;RpcStatus:rpcStatus;TableId:tableId;");
+ assertThat(testContainer.perWorkerCounters, IsMapContaining.hasKey(deletesEnabledCounterName));
assertThat(
- deletesDisabledCounter.getName().getName(),
- equalTo("AppendRowsRowStatus-RowStatus:SUCCESSFUL;Status:rpcStatus;TableId:tableId;"));
+ testContainer.perWorkerCounters.get(deletesEnabledCounterName).getCumulative(),
+ equalTo(1L));
}
@Test
public void testThrowableToGRPCCodeString() throws Exception {
Throwable nullThrowable = null;
- assertThat(BigQuerySinkMetrics.ThrowableToGRPCCodeString(nullThrowable), equalTo("UNKNOWN"));
+ assertThat(BigQuerySinkMetrics.throwableToGRPCCodeString(nullThrowable), equalTo("UNKNOWN"));
Throwable nonGrpcError = new IndexOutOfBoundsException("Test Error");
- assertThat(BigQuerySinkMetrics.ThrowableToGRPCCodeString(nonGrpcError), equalTo("UNKNOWN"));
+ assertThat(BigQuerySinkMetrics.throwableToGRPCCodeString(nonGrpcError), equalTo("UNKNOWN"));
- int not_found_val = Status.Code.NOT_FOUND.value();
+ int notFoundVal = Status.Code.NOT_FOUND.value();
Throwable grpcError =
- new Exceptions.AppendSerializtionError(not_found_val, "Test Error", "Stream name", null);
- assertThat(BigQuerySinkMetrics.ThrowableToGRPCCodeString(grpcError), equalTo("NOT_FOUND"));
+ new Exceptions.AppendSerializtionError(notFoundVal, "Test Error", "Stream name", null);
+ assertThat(BigQuerySinkMetrics.throwableToGRPCCodeString(grpcError), equalTo("NOT_FOUND"));
}
@Test
public void testThrottledTimeCounter() throws Exception {
+ // Setup
+ TestMetricsContainer testContainer = new TestMetricsContainer();
+ MetricsEnvironment.setCurrentContainer(testContainer);
+
+ // Test throttleCounter metric.
Counter appendRowsThrottleCounter =
- BigQuerySinkMetrics.ThrottledTimeCounter(BigQuerySinkMetrics.RpcMethod.APPEND_ROWS);
+ BigQuerySinkMetrics.throttledTimeCounter(BigQuerySinkMetrics.RpcMethod.APPEND_ROWS);
+ appendRowsThrottleCounter.inc(1);
assertThat(
appendRowsThrottleCounter.getName().getName(),
equalTo("ThrottledTime-Method:APPEND_ROWS;"));
+ MetricName counterName = MetricName.named("BigQuerySink", "ThrottledTime-Method:APPEND_ROWS;");
+ assertThat(testContainer.perWorkerCounters, IsMapContaining.hasKey(counterName));
+ assertThat(testContainer.perWorkerCounters.get(counterName).getCumulative(), equalTo(1L));
+ }
+
+ @Test
+ public void testReportSuccessfulRpcMetrics() throws Exception {
+ // Setup
+ TestMetricsContainer testContainer = new TestMetricsContainer();
+ MetricsEnvironment.setCurrentContainer(testContainer);
+ Context<AppendRowsResponse> c = new Context<>();
+ Instant t1 = Instant.now();
+ c.setOperationStartTime(t1);
+ c.setOperationEndTime(t1.plusMillis(3));
+
+ // Test disabled SupportMetricsDeletion
+ BigQuerySinkMetrics.setSupportMetricsDeletion(false);
+ BigQuerySinkMetrics.reportSuccessfulRpcMetrics(
+ c, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, "tableId");
+ MetricName counterNameDisabledDeletes =
+ MetricName.named("BigQuerySink", "RpcRequests-Method:APPEND_ROWS;RpcStatus:OK;");
+ MetricName histogramName = MetricName.named("BigQuerySink", "RpcLatency-Method:APPEND_ROWS;");
+ HistogramData.BucketType bucketType = HistogramData.ExponentialBuckets.of(1, 34);
+ assertThat(testContainer.perWorkerCounters, IsMapContaining.hasKey(counterNameDisabledDeletes));
+ assertThat(
+ testContainer.perWorkerCounters.get(counterNameDisabledDeletes).getCumulative(),
+ equalTo(1L));
+ assertThat(
+ testContainer.perWorkerHistograms.get(KV.of(histogramName, bucketType)).values,
+ containsInAnyOrder(Double.valueOf(3.0)));
+
+ // Test enable SupportMetricsDeletion.
+ BigQuerySinkMetrics.setSupportMetricsDeletion(true);
+ testContainer.reset();
+ BigQuerySinkMetrics.reportSuccessfulRpcMetrics(
+ c, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, "tableId");
+ MetricName counterNameEnabledDeletes =
+ MetricName.named(
+ "BigQuerySink", "RpcRequests-Method:APPEND_ROWS;RpcStatus:OK;TableId:tableId;");
+ assertThat(testContainer.perWorkerCounters, IsMapContaining.hasKey(counterNameEnabledDeletes));
+ assertThat(
+ testContainer.perWorkerCounters.get(counterNameEnabledDeletes).getCumulative(),
+ equalTo(1L));
+ assertThat(
+ testContainer.perWorkerHistograms.get(KV.of(histogramName, bucketType)).values,
+ containsInAnyOrder(Double.valueOf(3.0)));
}
@Test
- public void testUpdateRpcLatencyMetric() throws Exception {
+ public void testReportFailedRPCMetrics_KnownGrpcError() throws Exception {
+ // Setup
TestMetricsContainer testContainer = new TestMetricsContainer();
MetricsEnvironment.setCurrentContainer(testContainer);
- BigQuerySinkMetrics.RpcMethod append = BigQuerySinkMetrics.RpcMethod.APPEND_ROWS;
+ Context<AppendRowsResponse> c = new Context<>();
Instant t1 = Instant.now();
+ c.setOperationStartTime(t1);
+ c.setOperationEndTime(t1.plusMillis(5));
+ int notFoundVal = Status.Code.NOT_FOUND.value();
+ Throwable grpcError =
+ new Exceptions.AppendSerializtionError(notFoundVal, "Test Error", "Stream name", null);
+ c.setError(grpcError);
+
+ // Test disabled SupportMetricsDeletion
+ BigQuerySinkMetrics.setSupportMetricsDeletion(false);
+ BigQuerySinkMetrics.reportFailedRPCMetrics(
+ c, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, "tableId");
+ MetricName counterNameDisabledDeletes =
+ MetricName.named("BigQuerySink", "RpcRequests-Method:APPEND_ROWS;RpcStatus:NOT_FOUND;");
+ MetricName histogramName = MetricName.named("BigQuerySink", "RpcLatency-Method:APPEND_ROWS;");
+ HistogramData.BucketType bucketType = HistogramData.ExponentialBuckets.of(1, 34);
+ assertThat(testContainer.perWorkerCounters, IsMapContaining.hasKey(counterNameDisabledDeletes));
+ assertThat(
+ testContainer.perWorkerCounters.get(counterNameDisabledDeletes).getCumulative(),
+ equalTo(1L));
+ assertThat(
+ testContainer.perWorkerHistograms,
+ IsMapContaining.hasKey(KV.of(histogramName, bucketType)));
+ assertThat(
+ testContainer.perWorkerHistograms.get(KV.of(histogramName, bucketType)).values,
+ containsInAnyOrder(Double.valueOf(5.0)));
- // Expect no updates to the histogram when we pass a null instant.
- BigQuerySinkMetrics.UpdateRpcLatencyMetric(null, t1, append);
- BigQuerySinkMetrics.UpdateRpcLatencyMetric(t1, null, append);
- assertThat(testContainer.testHistogram.values.size(), equalTo(0));
+ // Test enable SupportMetricsDeletion
+ BigQuerySinkMetrics.setSupportMetricsDeletion(true);
+ testContainer.reset();
+ BigQuerySinkMetrics.reportFailedRPCMetrics(
+ c, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, "tableId");
+ MetricName counterNameEnabledDeletes =
+ MetricName.named(
+ "BigQuerySink", "RpcRequests-Method:APPEND_ROWS;RpcStatus:NOT_FOUND;TableId:tableId;");
+ assertThat(testContainer.perWorkerCounters, IsMapContaining.hasKey(counterNameEnabledDeletes));
+ assertThat(
+ testContainer.perWorkerCounters.get(counterNameEnabledDeletes).getCumulative(),
+ equalTo(1L));
+ assertThat(
+ testContainer.perWorkerHistograms.get(KV.of(histogramName, bucketType)).values,
+ containsInAnyOrder(Double.valueOf(5.0)));
+ }
+
+ @Test
+ public void testReportFailedRPCMetrics_UnknownGrpcError() throws Exception {
+ // Setup
+ TestMetricsContainer testContainer = new TestMetricsContainer();
+ MetricsEnvironment.setCurrentContainer(testContainer);
+ Context<AppendRowsResponse> c = new Context<>();
+ Instant t1 = Instant.now();
+ c.setOperationStartTime(t1);
+ c.setOperationEndTime(t1.plusMillis(15));
+ Throwable nonGrpcError = new IndexOutOfBoundsException("Test Error");
+ c.setError(nonGrpcError);
- // Expect no updates when end time is before start time.
- BigQuerySinkMetrics.UpdateRpcLatencyMetric(t1, t1.minusMillis(5), append);
- assertThat(testContainer.testHistogram.values.size(), equalTo(0));
+ // Test disabled SupportMetricsDeletion
+ BigQuerySinkMetrics.setSupportMetricsDeletion(false);
+ BigQuerySinkMetrics.reportFailedRPCMetrics(
+ c, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, "tableId");
+ MetricName counterNameDisabledDeletes =
+ MetricName.named("BigQuerySink", "RpcRequests-Method:APPEND_ROWS;RpcStatus:UNKNOWN;");
+ MetricName histogramName = MetricName.named("BigQuerySink", "RpcLatency-Method:APPEND_ROWS;");
+ HistogramData.BucketType bucketType = HistogramData.ExponentialBuckets.of(1, 34);
+ assertThat(testContainer.perWorkerCounters, IsMapContaining.hasKey(counterNameDisabledDeletes));
+ assertThat(
+ testContainer.perWorkerCounters.get(counterNameDisabledDeletes).getCumulative(),
+ equalTo(1L));
+ assertThat(
+ testContainer.perWorkerHistograms.get(KV.of(histogramName, bucketType)).values,
+ containsInAnyOrder(Double.valueOf(15.0)));
- // Expect valid updates to be recorded in the underlying histogram.
- BigQuerySinkMetrics.UpdateRpcLatencyMetric(t1.minusMillis(5), t1, append);
- BigQuerySinkMetrics.UpdateRpcLatencyMetric(t1.minusMillis(10), t1, append);
- BigQuerySinkMetrics.UpdateRpcLatencyMetric(t1.minusMillis(15), t1, append);
+ // Test enable SupportMetricsDeletion
+ BigQuerySinkMetrics.setSupportMetricsDeletion(true);
+ testContainer.reset();
+ BigQuerySinkMetrics.reportFailedRPCMetrics(
+ c, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, "tableId");
+ MetricName counterNameEnabledDeletes =
+ MetricName.named(
+ "BigQuerySink", "RpcRequests-Method:APPEND_ROWS;RpcStatus:UNKNOWN;TableId:tableId;");
+ assertThat(testContainer.perWorkerCounters, IsMapContaining.hasKey(counterNameEnabledDeletes));
+ assertThat(
+ testContainer.perWorkerCounters.get(counterNameEnabledDeletes).getCumulative(),
+ equalTo(1L));
assertThat(
- testContainer.testHistogram.values,
- containsInAnyOrder(Double.valueOf(5.0), Double.valueOf(10.0), Double.valueOf(15.0)));
+ testContainer.perWorkerHistograms.get(KV.of(histogramName, bucketType)).values,
+ containsInAnyOrder(Double.valueOf(15.0)));
}
}