From 46855a0037e2c82fe24e57b27447a76276248d8c Mon Sep 17 00:00:00 2001 From: Jayaj Poudel Date: Wed, 11 Oct 2023 02:07:32 +0000 Subject: [PATCH 1/4] Implement a handful of perworker bigquery sink metrics --- .../metrics/DelegatingPerWorkerCounter.java | 72 ++++++ .../metrics/DelegatingPerWorkerHistogram.java | 58 +++++ .../apache/beam/sdk/util/FluentBackoff.java | 27 ++- .../io/gcp/bigquery/BigQuerySinkMetrics.java | 220 ++++++++++++++++++ .../sdk/io/gcp/bigquery/RetryManager.java | 24 ++ .../StorageApiFinalizeWritesDoFn.java | 22 +- .../StorageApiFlushAndFinalizeDoFn.java | 50 +++- .../StorageApiWriteUnshardedRecords.java | 56 ++++- .../StorageApiWritesShardedRecords.java | 50 +++- .../sdk/io/gcp/bigquery/TableDestination.java | 6 + .../gcp/bigquery/BigQuerySinkMetricsTest.java | 169 ++++++++++++++ .../sdk/io/gcp/bigquery/RetryManagerTest.java | 21 +- 12 files changed, 752 insertions(+), 23 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingPerWorkerCounter.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingPerWorkerHistogram.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java create mode 100644 sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingPerWorkerCounter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingPerWorkerCounter.java new file mode 100644 index 000000000000..e8f6c7110eb8 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingPerWorkerCounter.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import java.io.Serializable; +import org.apache.beam.sdk.annotations.Internal; + +/** Implementation of {@link Counter} that delegates to the instance for the current context. */ +@Internal +public class DelegatingPerWorkerCounter implements Metric, Counter, Serializable { + private final MetricName name; + private final boolean processWideContainer; + + public DelegatingPerWorkerCounter(MetricName name) { + this(name, false); + } + + public DelegatingPerWorkerCounter(MetricName name, boolean processWideContainer) { + this.name = name; + this.processWideContainer = processWideContainer; + } + + /** Increment the counter. */ + @Override + public void inc() { + inc(1); + } + + /** Increment the counter by the given amount. */ + @Override + public void inc(long n) { + MetricsContainer container = + this.processWideContainer + ? 
MetricsEnvironment.getProcessWideContainer() + : MetricsEnvironment.getCurrentContainer(); + if (container != null) { + container.getPerWorkerCounter(name).inc(n); + } + } + + /* Decrement the counter. */ + @Override + public void dec() { + inc(-1); + } + + /* Decrement the counter by the given amount. */ + @Override + public void dec(long n) { + inc(-1 * n); + } + + @Override + public MetricName getName() { + return name; + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingPerWorkerHistogram.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingPerWorkerHistogram.java new file mode 100644 index 000000000000..5320545b17fc --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingPerWorkerHistogram.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import java.io.Serializable; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.util.HistogramData; + +/** Implementation of {@link Histogram} that delegates to the instance for the current context. */ +@Internal +public class DelegatingPerWorkerHistogram implements Metric, Histogram, Serializable { + private final MetricName name; + private final HistogramData.BucketType bucketType; + private final boolean processWideContainer; + + public DelegatingPerWorkerHistogram( + MetricName name, HistogramData.BucketType bucketType, boolean processWideContainer) { + this.name = name; + this.bucketType = bucketType; + this.processWideContainer = processWideContainer; + } + + public DelegatingPerWorkerHistogram( + MetricName name, HistogramData.BucketType bucketType) { + this(name, bucketType, false); + } + + @Override + public void update(double value) { + MetricsContainer container = + processWideContainer + ? 
MetricsEnvironment.getProcessWideContainer() + : MetricsEnvironment.getCurrentContainer(); + if (container != null) { + container.getPerWorkerHistogram(name, bucketType).update(value); + } + } + + @Override + public MetricName getName() { + return name; + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FluentBackoff.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FluentBackoff.java index 0d45ff0a57fb..93cd50d23e3d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FluentBackoff.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FluentBackoff.java @@ -19,6 +19,8 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; +import org.apache.beam.sdk.metrics.NoOpCounter; +import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.joda.time.Duration; @@ -38,12 +40,14 @@ public final class FluentBackoff { private static final Duration DEFAULT_MAX_BACKOFF = Duration.standardDays(1000); private static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE; private static final Duration DEFAULT_MAX_CUM_BACKOFF = Duration.standardDays(1000); + private static final Counter DEFAULT_THROTTLED_TIME_COUNTER = NoOpCounter.getInstance(); private final double exponent; private final Duration initialBackoff; private final Duration maxBackoff; private final Duration maxCumulativeBackoff; private final int maxRetries; + private final Counter throttledTimeCounter; /** * By default the {@link BackOff} created by this builder will use exponential backoff (base @@ -65,7 +69,8 @@ public final class FluentBackoff { DEFAULT_MIN_BACKOFF, DEFAULT_MAX_BACKOFF, DEFAULT_MAX_CUM_BACKOFF, - DEFAULT_MAX_RETRIES); + DEFAULT_MAX_RETRIES, + DEFAULT_THROTTLED_TIME_COUNTER); /** * Instantiates a {@link BackOff} that will obey the current configuration. 
@@ -87,7 +92,7 @@ public BackOff backoff() { public FluentBackoff withExponent(double exponent) { checkArgument(exponent > 0, "exponent %s must be greater than 0", exponent); return new FluentBackoff( - exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries); + exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries, throttledTimeCounter); } /** @@ -104,7 +109,7 @@ public FluentBackoff withInitialBackoff(Duration initialBackoff) { "initialBackoff %s must be at least 1 millisecond", initialBackoff); return new FluentBackoff( - exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries); + exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries, throttledTimeCounter); } /** @@ -119,7 +124,7 @@ public FluentBackoff withMaxBackoff(Duration maxBackoff) { checkArgument( maxBackoff.getMillis() > 0, "maxBackoff %s must be at least 1 millisecond", maxBackoff); return new FluentBackoff( - exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries); + exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries, throttledTimeCounter); } /** @@ -136,7 +141,7 @@ public FluentBackoff withMaxCumulativeBackoff(Duration maxCumulativeBackoff) { "maxCumulativeBackoff %s must be at least 1 millisecond", maxCumulativeBackoff); return new FluentBackoff( - exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries); + exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries, throttledTimeCounter); } /** @@ -151,7 +156,12 @@ public FluentBackoff withMaxCumulativeBackoff(Duration maxCumulativeBackoff) { public FluentBackoff withMaxRetries(int maxRetries) { checkArgument(maxRetries >= 0, "maxRetries %s cannot be negative", maxRetries); return new FluentBackoff( - exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries); + exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries, throttledTimeCounter); + } + + public FluentBackoff withThrottledTimeCounter(Counter throttledTimeCounter) { + return new FluentBackoff( + exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries, throttledTimeCounter); } @Override @@ -206,6 +216,7 @@ public long nextBackOffMillis() { // Update state and return backoff. currentCumulativeBackoff = currentCumulativeBackoff.plus(Duration.millis(nextBackoffMillis)); currentRetry += 1; + backoffConfig.throttledTimeCounter.inc(nextBackoffMillis); return nextBackoffMillis; } @@ -229,11 +240,13 @@ private FluentBackoff( Duration initialBackoff, Duration maxBackoff, Duration maxCumulativeBackoff, - int maxRetries) { + int maxRetries, + Counter throttledTimeCounter) { this.exponent = exponent; this.initialBackoff = initialBackoff; this.maxBackoff = maxBackoff; this.maxRetries = maxRetries; this.maxCumulativeBackoff = maxCumulativeBackoff; + this.throttledTimeCounter = throttledTimeCounter; } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java new file mode 100644 index 000000000000..35118bdccca8 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import io.grpc.Status; +import java.time.Instant; +import java.util.NavigableMap; +import java.util.TreeMap; +import javax.annotation.Nullable; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.DelegatingPerWorkerCounter; +import org.apache.beam.sdk.metrics.DelegatingPerWorkerHistogram; +import org.apache.beam.sdk.metrics.Histogram; +import org.apache.beam.sdk.metrics.MetricName; +import org.apache.beam.sdk.util.HistogramData; + +/** + * Helper class to create perworker metrics for BigQuery Sink stages. + * + *
+ * <p>In general, metrics will be in the namespace 'BigQuerySink' and have their names formatted as:
+ *
+ * <p>
'{baseName}-{metricLabelKey1}:{metricLabelVal1};...{metricLabelKeyN}:{metricLabelValN};' + */ +public class BigQuerySinkMetrics { + private static Boolean supportMetricsDeletion = false; + + private static final String METRICS_NAMESPACE = "BigQuerySink"; + private static final String UNKNOWN = Status.Code.UNKNOWN.toString(); + + // Base Metric names. + private static final String RPC_REQUESTS = "RpcRequests"; + private static final String RPC_LATENCY = "RpcLatency"; + private static final String APPEND_ROWS_ROW_STATUS = "AppendRowsRowStatus"; + private static final String THROTTLED_TIME = "ThrottledTime"; + + // StorageWriteAPI Method names + enum RpcMethod { + APPEND_ROWS, + FLUSH_ROWS, + FINALIZE_STREAM + } + + // Status of a BigQuery row from the AppendRows RPC call. + enum RowStatus { + SUCCESSFUL, + RETRIED, + FAILED + } + + // Metric labels + private static final String TABLE_ID_LABEL = "TableId"; + private static final String RPC_STATUS_LABEL = "Status"; + private static final String RPC_METHOD = "Method"; + private static final String ROW_STATUS = "RowStatus"; + + // Delimiters + private static final char LABEL_DELIMITER = ';'; + private static final char METRIC_KV_DELIMITER = ':'; + private static final char METRIC_NAME_DELIMITER = '-'; + + /** + * Returns a metric name that merges the baseName with metricLables formatted as: + * + *
+   * <p>
'{baseName}-{metricLabelKey1}:{metricLabelVal1};...{metricLabelKeyN}:{metricLabelValN};' + */ + private static String CreateLabeledMetricName( + String baseName, NavigableMap metricLabels) { + StringBuilder nameBuilder = new StringBuilder(baseName + METRIC_NAME_DELIMITER); + + metricLabels.forEach( + (labelKey, labelVal) -> + nameBuilder.append(labelKey + METRIC_KV_DELIMITER + labelVal + LABEL_DELIMITER)); + return nameBuilder.toString(); + } + + /** + * @param method StorageWriteAPI write method. + * @param rpcStatus RPC return status. + * @param tableId Table pertaining to the write method. Only included in the metric key if + * 'supportsMetricsDeletion' is enabled. + * @return Counter in namespace BigQuerySink and name + * 'RpcRequests-Status:{status};TableId:{tableId}' TableId label is dropped if + * 'supportsMetricsDeletion' is not enabled. + */ + private static Counter CreateRPCRequestCounter( + RpcMethod method, String rpcStatus, String tableId) { + NavigableMap metricLabels = new TreeMap(); + metricLabels.put(RPC_STATUS_LABEL, rpcStatus); + metricLabels.put(RPC_METHOD, method.toString()); + if (BigQuerySinkMetrics.supportMetricsDeletion) { + metricLabels.put(TABLE_ID_LABEL, tableId); + } + + String fullMetricName = CreateLabeledMetricName(RPC_REQUESTS, metricLabels); + MetricName metricName = MetricName.named(METRICS_NAMESPACE, fullMetricName); + return new DelegatingPerWorkerCounter(metricName); + } + + /** Creates a counter for the AppendRows RPC call based on the rpcStatus and table. */ + public static Counter AppendRPCsCounter(String rpcStatus, String tableId) { + return CreateRPCRequestCounter(RpcMethod.APPEND_ROWS, rpcStatus, tableId); + } + + /** + * Creates a counter for the FlushRows RPC call based on the rpcStatus. TableId is not known when + * the stream is flushed so we use the placeholder 'UNKNOWN'. + */ + public static Counter FlushRowsCounter(String rpcStatus) { + return CreateRPCRequestCounter(RpcMethod.FLUSH_ROWS, rpcStatus, BigQuerySinkMetrics.UNKNOWN); + } + + /** + * Creates a counter for the FinalizeRows RPC call based on the rpcStatus. TableId is not known + * when the stream is flushed so we use the placeholder 'UNKNOWN'. + */ + public static Counter FinalizeStreamCounter(String rpcStatus) { + return CreateRPCRequestCounter( + RpcMethod.FINALIZE_STREAM, rpcStatus, BigQuerySinkMetrics.UNKNOWN); + } + + /** + * Creates an Histogram metric to record RPC latency. Metric will have name: + * + *
+   * <p>
'RpcLatency-Method:{method};' + * + * @param method StorageWriteAPI method associated with this metric. + * @return Histogram with exponential buckets with a sqrt(2) growth factor. + */ + private static Histogram CreateRPCLatencyHistogram(RpcMethod method) { + NavigableMap metricLabels = new TreeMap(); + metricLabels.put(RPC_METHOD, method.toString()); + String fullMetricName = CreateLabeledMetricName(RPC_LATENCY, metricLabels); + MetricName metricName = MetricName.named(METRICS_NAMESPACE, fullMetricName); + + HistogramData.BucketType buckets = HistogramData.ExponentialBuckets.of(1, 34); + + return new DelegatingPerWorkerHistogram(metricName, buckets); + } + + /** + * Records the time between operationStartTime and OperationEndTime in a PerWorkerHistogram. + * + * @param operationStartTime If null or in the future, this function is a no-op. + * @param OperationEndTime End time of operation. + * @param method StorageWriteAPI write method. + */ + public static void UpdateRpcLatencyMetric( + @Nullable Instant operationStartTime, Instant operationEndTime, RpcMethod method) { + if (operationStartTime == null || operationEndTime == null) { + return; + } + long timeElapsed = java.time.Duration.between(operationStartTime, operationEndTime).toMillis(); + if (timeElapsed > 0) { + BigQuerySinkMetrics.CreateRPCLatencyHistogram(method).update(timeElapsed); + } + } + + /** + * @param rowStatus Status of these BigQuery rows. + * @param rpcStatus rpcStatus + * @param tableId Table pertaining to the write method. Only included in the metric key if + * 'supportsMetricsDeletion' is enabled. + * @return Metric that tracks the status of BigQuery rows after making an AppendRows RPC call. + */ + public static Counter AppendRowsRowStatusCounter( + RowStatus rowStatus, String rpcStatus, String tableId) { + NavigableMap metricLabels = new TreeMap(); + metricLabels.put(RPC_STATUS_LABEL, rpcStatus); + metricLabels.put(ROW_STATUS, rowStatus.toString()); + if (BigQuerySinkMetrics.supportMetricsDeletion) { + metricLabels.put(TABLE_ID_LABEL, tableId); + } + + String fullMetricName = CreateLabeledMetricName(APPEND_ROWS_ROW_STATUS, metricLabels); + MetricName metricName = MetricName.named(METRICS_NAMESPACE, fullMetricName); + return new DelegatingPerWorkerCounter(metricName); + } + + /** Metric that tracks throttled time due between RPC retries. */ + public static Counter ThrottledTimeCounter(RpcMethod method) { + NavigableMap metricLabels = new TreeMap(); + metricLabels.put(RPC_METHOD, method.toString()); + String fullMetricName = CreateLabeledMetricName(THROTTLED_TIME, metricLabels); + MetricName metricName = MetricName.named(METRICS_NAMESPACE, fullMetricName); + + return new DelegatingPerWorkerCounter(metricName); + } + + /** + * Converts a Throwable to a gRPC Status code. + * + * @param t Throwable. + * @return gRPC status code string or 'UNKNOWN' if 't' is null or does not map to a gRPC error. 
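+   *     <p>For example (illustrative, mirroring the unit tests below): a gRPC exception whose
+   *     status code is NOT_FOUND yields "NOT_FOUND", while a non-gRPC throwable such as an
+   *     IndexOutOfBoundsException yields "UNKNOWN".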
+ */ + public static String ThrowableToGRPCCodeString(@Nullable Throwable t) { + if (t == null) { + return BigQuerySinkMetrics.UNKNOWN; + } + return Status.fromThrowable(t).getCode().toString(); + } + + public static void setSupportMetricsDeletion(Boolean supportMetricsDeletion) { + BigQuerySinkMetrics.supportMetricsDeletion = supportMetricsDeletion; + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RetryManager.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RetryManager.java index 245d05284236..0878f4541176 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RetryManager.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RetryManager.java @@ -22,6 +22,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; +import java.time.Instant; import java.util.Queue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; @@ -33,6 +34,7 @@ import java.util.stream.Collectors; import javax.annotation.Nullable; import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.Operation.Context; +import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.util.BackOff; import org.apache.beam.sdk.util.BackOffUtils; import org.apache.beam.sdk.util.FluentBackoff; @@ -85,10 +87,23 @@ Object getResult() { .backoff(); } + RetryManager( + Duration initialBackoff, Duration maxBackoff, int maxRetries, Counter throttledTimeCounter) { + this.operations = Queues.newArrayDeque(); + backoff = + FluentBackoff.DEFAULT + .withInitialBackoff(initialBackoff) + .withMaxBackoff(maxBackoff) + .withMaxRetries(maxRetries) + .withThrottledTimeCounter(throttledTimeCounter) + .backoff(); + } + static class Operation> { static class Context { private @Nullable Throwable error = null; private @Nullable ResultT result = null; + private @Nullable Instant operationStartTime = null; public void setError(@Nullable Throwable error) { this.error = error; @@ -105,6 +120,14 @@ public void setResult(@Nullable ResultT result) { public @Nullable ResultT getResult() { return result; } + + public void setOperationStartTime(@Nullable Instant operationStartTime) { + this.operationStartTime = operationStartTime; + } + + public @Nullable Instant getOperationStartTime() { + return operationStartTime; + } } private final Function> runOperation; @@ -129,6 +152,7 @@ public Operation( } void run(Executor executor) { + this.context.setOperationStartTime(Instant.now()); this.future = runOperation.apply(context); this.callback = new Callback<>(hasSucceeded); ApiFutures.addCallback(future, callback, executor); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFinalizeWritesDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFinalizeWritesDoFn.java index 6ce58a10ef8b..831a8ff6a94b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFinalizeWritesDoFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFinalizeWritesDoFn.java @@ -22,6 +22,7 @@ import com.google.cloud.bigquery.storage.v1.StorageError; import com.google.cloud.bigquery.storage.v1.StorageError.StorageErrorCode; import java.io.IOException; +import java.time.Instant; import java.util.Collection; 
import java.util.Map; import java.util.Set; @@ -104,7 +105,12 @@ public void process(PipelineOptions pipelineOptions, @Element KV DatasetService datasetService = getDatasetService(pipelineOptions); RetryManager> retryManager = - new RetryManager<>(Duration.standardSeconds(1), Duration.standardMinutes(1), 3); + new RetryManager<>( + Duration.standardSeconds(1), + Duration.standardMinutes(1), + 3, + BigQuerySinkMetrics.ThrottledTimeCounter( + BigQuerySinkMetrics.RpcMethod.FINALIZE_STREAM)); retryManager.addOperation( c -> { finalizeOperationsSent.inc(); @@ -115,6 +121,14 @@ public void process(PipelineOptions pipelineOptions, @Element KV Preconditions.checkArgumentNotNull(Iterables.getFirst(contexts, null)); LOG.error("Finalize of stream " + streamId + " failed with " + firstContext.getError()); finalizeOperationsFailed.inc(); + String errorCode = BigQuerySinkMetrics.ThrowableToGRPCCodeString(firstContext.getError()); + BigQuerySinkMetrics.FinalizeStreamCounter(errorCode).inc(); + + @Nullable Instant operationStartTime = firstContext.getOperationStartTime(); + Instant operationEndTime = Instant.now(); + BigQuerySinkMetrics.UpdateRpcLatencyMetric( + operationStartTime, operationEndTime, BigQuerySinkMetrics.RpcMethod.FINALIZE_STREAM); + return RetryType.RETRY_ALL_OPERATIONS; }, c -> { @@ -126,6 +140,12 @@ public void process(PipelineOptions pipelineOptions, @Element KV rowsFinalized.inc(response.getRowCount()); finalizeOperationsSucceeded.inc(); + BigQuerySinkMetrics.FinalizeStreamCounter("ok").inc(); + @Nullable Instant operationStartTime = c.getOperationStartTime(); + Instant operationEndTime = Instant.now(); + BigQuerySinkMetrics.UpdateRpcLatencyMetric( + operationStartTime, operationEndTime, BigQuerySinkMetrics.RpcMethod.FINALIZE_STREAM); + commitStreams.computeIfAbsent(tableId, d -> Lists.newArrayList()).add(streamId); }, new Context<>()); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java index bb2bfba85fbb..0cde0b7017b4 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java @@ -142,7 +142,11 @@ public void process(PipelineOptions pipelineOptions, @Element KV= 0) { Instant now = Instant.now(); RetryManager> retryManager = - new RetryManager<>(Duration.standardSeconds(1), Duration.standardMinutes(1), 3); + new RetryManager<>( + Duration.standardSeconds(1), + Duration.standardMinutes(1), + 3, + BigQuerySinkMetrics.ThrottledTimeCounter(BigQuerySinkMetrics.RpcMethod.FLUSH_ROWS)); retryManager.addOperation( // runOperation c -> { @@ -155,11 +159,19 @@ public void process(PipelineOptions pipelineOptions, @Element KV { - Throwable error = - Preconditions.checkArgumentNotNull(Iterables.getFirst(contexts, null)).getError(); + Context failedContext = + Preconditions.checkArgumentNotNull(Iterables.getFirst(contexts, null)); + Throwable error = failedContext.getError(); LOG.warn( "Flush of stream " + streamId + " to offset " + offset + " failed with " + error); flushOperationsFailed.inc(); + String errorCode = BigQuerySinkMetrics.ThrowableToGRPCCodeString(error); + BigQuerySinkMetrics.FlushRowsCounter(errorCode).inc(); + @Nullable Instant operationStartTime = 
failedContext.getOperationStartTime(); + Instant operationEndTime = Instant.now(); + BigQuerySinkMetrics.UpdateRpcLatencyMetric( + operationStartTime, operationEndTime, BigQuerySinkMetrics.RpcMethod.FLUSH_ROWS); + if (error instanceof ApiException) { Code statusCode = ((ApiException) error).getStatusCode().getCode(); if (statusCode.equals(Code.ALREADY_EXISTS)) { @@ -181,6 +193,13 @@ public void process(PipelineOptions pipelineOptions, @Element KV { + BigQuerySinkMetrics.FlushRowsCounter("ok").inc(); + if (c != null) { + BigQuerySinkMetrics.UpdateRpcLatencyMetric( + c.getOperationStartTime(), + Instant.now(), + BigQuerySinkMetrics.RpcMethod.FLUSH_ROWS); + } flushOperationsSucceeded.inc(); }, new Context<>()); @@ -198,10 +217,23 @@ public void process(PipelineOptions pipelineOptions, @Element KV> retryManager = - new RetryManager<>(Duration.standardSeconds(1), Duration.standardMinutes(1), 3); + new RetryManager<>( + Duration.standardSeconds(1), + Duration.standardMinutes(1), + 3, + BigQuerySinkMetrics.ThrottledTimeCounter( + BigQuerySinkMetrics.RpcMethod.FINALIZE_STREAM)); retryManager.addOperation( c -> { finalizeOperationsSent.inc(); + if (c != null) { + BigQuerySinkMetrics.UpdateRpcLatencyMetric( + c.getOperationStartTime(), + Instant.now(), + BigQuerySinkMetrics.RpcMethod.FLUSH_ROWS); + } + BigQuerySinkMetrics.FinalizeStreamCounter("ok").inc(); + return datasetService.finalizeWriteStream(streamId); }, contexts -> { @@ -215,6 +247,16 @@ public void process(PipelineOptions pipelineOptions, @Element KV firstContext = Iterables.getFirst(contexts, null); @Nullable Throwable error = firstContext == null ? null : firstContext.getError(); + + String errorCode = BigQuerySinkMetrics.ThrowableToGRPCCodeString(error); + BigQuerySinkMetrics.FinalizeStreamCounter(errorCode).inc(); + if (firstContext != null) { + @Nullable Instant operationStartTime = firstContext.getOperationStartTime(); + Instant operationEndTime = Instant.now(); + BigQuerySinkMetrics.UpdateRpcLatencyMetric( + operationStartTime, operationEndTime, BigQuerySinkMetrics.RpcMethod.FLUSH_ROWS); + } + if (error instanceof ApiException) { Code statusCode = ((ApiException) error).getStatusCode().getCode(); if (statusCode.equals(Code.NOT_FOUND)) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index 21c2a485c279..587e1cd82c0b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -265,6 +265,7 @@ public AppendRowsContext( class DestinationState { private final String tableUrn; + private final String shortTableUrn; private String streamName = ""; private @Nullable AppendClientInfo appendClientInfo = null; private long currentOffset = 0; @@ -295,6 +296,7 @@ class DestinationState { public DestinationState( String tableUrn, + String shortTableUrn, MessageConverter messageConverter, DatasetService datasetService, boolean useDefaultStream, @@ -305,6 +307,7 @@ public DestinationState( boolean includeCdcColumns) throws Exception { this.tableUrn = tableUrn; + this.shortTableUrn = shortTableUrn; this.pendingMessages = Lists.newArrayList(); this.pendingTimestamps = Lists.newArrayList(); this.maybeDatasetService = datasetService; @@ 
-608,7 +611,11 @@ long flush( failedRow, "Row payload too large. Maximum size " + maxRequestSize), timestamp); } - rowsSentToFailedRowsCollection.inc(inserts.getSerializedRowsCount()); + int numRowsFailed = inserts.getSerializedRowsCount(); + BigQuerySinkMetrics.AppendRowsRowStatusCounter( + BigQuerySinkMetrics.RowStatus.FAILED, "PayloadTooLarge", shortTableUrn) + .inc(numRowsFailed); + rowsSentToFailedRowsCollection.inc(numRowsFailed); return 0; } @@ -650,11 +657,20 @@ long flush( contexts -> { AppendRowsContext failedContext = Preconditions.checkStateNotNull(Iterables.getFirst(contexts, null)); + Instant operationEndTime = Instant.now(); + @Nullable Instant operationStartTime = failedContext.getOperationStartTime(); + BigQuerySinkMetrics.UpdateRpcLatencyMetric( + operationStartTime, operationEndTime, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS); + String errorCode = + BigQuerySinkMetrics.ThrowableToGRPCCodeString(failedContext.getError()); + BigQuerySinkMetrics.AppendRPCsCounter(errorCode, shortTableUrn).inc(); + if (failedContext.getError() != null && failedContext.getError() instanceof Exceptions.AppendSerializtionError) { Exceptions.AppendSerializtionError error = Preconditions.checkStateNotNull( (Exceptions.AppendSerializtionError) failedContext.getError()); + Set failedRowIndices = error.getRowIndexToErrorMessage().keySet(); for (int failedIndex : failedRowIndices) { // Convert the message to a TableRow and send it to the failedRows collection. @@ -677,7 +693,11 @@ long flush( LOG.error("Failed to insert row and could not parse the result!", e); } } - rowsSentToFailedRowsCollection.inc(failedRowIndices.size()); + int numRowsFailed = failedRowIndices.size(); + rowsSentToFailedRowsCollection.inc(numRowsFailed); + BigQuerySinkMetrics.AppendRowsRowStatusCounter( + BigQuerySinkMetrics.RowStatus.FAILED, errorCode, shortTableUrn) + .inc(numRowsFailed); // Remove the failed row from the payload, so we retry the batch without the failed // rows. @@ -692,6 +712,10 @@ long flush( } failedContext.protoRows = retryRows.build(); failedContext.timestamps = retryTimestamps; + int numRowsRetried = failedContext.protoRows.getSerializedRowsCount(); + BigQuerySinkMetrics.AppendRowsRowStatusCounter( + BigQuerySinkMetrics.RowStatus.RETRIED, errorCode, shortTableUrn) + .inc(numRowsRetried); // Since we removed rows, we need to update the insert offsets for all remaining // rows. @@ -732,6 +756,8 @@ long flush( // The following errors are known to be persistent, so always fail the work item in // this case. 
+ Throwable error = Preconditions.checkStateNotNull(failedContext.getError()); + Status.Code statusCode = Status.fromThrowable(error).getCode(); if (statusCode.equals(Status.Code.OUT_OF_RANGE) || statusCode.equals(Status.Code.ALREADY_EXISTS)) { throw new RuntimeException( @@ -762,11 +788,27 @@ long flush( throw new RuntimeException(e); } + int numRowsRetried = failedContext.protoRows.getSerializedRowsCount(); + BigQuerySinkMetrics.AppendRowsRowStatusCounter( + BigQuerySinkMetrics.RowStatus.RETRIED, errorCode, shortTableUrn) + .inc(numRowsRetried); + appendFailures.inc(); return RetryType.RETRY_ALL_OPERATIONS; }, c -> { - recordsAppended.inc(c.protoRows.getSerializedRowsCount()); + BigQuerySinkMetrics.AppendRPCsCounter("ok", shortTableUrn).inc(); + int numRecordsAppended = c.protoRows.getSerializedRowsCount(); + recordsAppended.inc(numRecordsAppended); + BigQuerySinkMetrics.AppendRowsRowStatusCounter( + BigQuerySinkMetrics.RowStatus.SUCCESSFUL, "ok", shortTableUrn) + .inc(numRecordsAppended); + + @Nullable Instant operationStartTime = c.getOperationStartTime(); + Instant operationEndTime = Instant.now(); + BigQuerySinkMetrics.UpdateRpcLatencyMetric( + operationStartTime, operationEndTime, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS); + if (successfulRowsReceiver != null) { for (int i = 0; i < c.protoRows.getSerializedRowsCount(); ++i) { ByteString rowBytes = c.protoRows.getSerializedRowsList().get(i); @@ -903,7 +945,12 @@ void flushAll( for (DestinationState destinationState : Preconditions.checkStateNotNull(destinations).values()) { RetryManager retryManager = - new RetryManager<>(Duration.standardSeconds(1), Duration.standardSeconds(10), 1000); + new RetryManager<>( + Duration.standardSeconds(1), + Duration.standardSeconds(10), + 1000, + BigQuerySinkMetrics.ThrottledTimeCounter( + BigQuerySinkMetrics.RpcMethod.APPEND_ROWS)); retryManagers.add(retryManager); numRowsWritten += destinationState.flush(retryManager, failedRowsReceiver, successfulRowsReceiver); @@ -974,6 +1021,7 @@ DestinationState createDestinationState( messageConverter = messageConverters.get(destination, dynamicDestinations, datasetService); return new DestinationState( tableDestination1.getTableUrn(bigQueryOptions), + tableDestination1.getShortTableUrn(), messageConverter, datasetService, useDefaultStream, diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java index f4982396e9d5..6e59e31719d6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java @@ -440,6 +440,7 @@ public void process( return tableDestination1; }); final String tableId = tableDestination.getTableUrn(bigQueryOptions); + final String shortTableId = tableDestination.getShortTableUrn(); final DatasetService datasetService = getDatasetService(pipelineOptions); Coder destinationCoder = dynamicDestinations.getDestinationCoder(); @@ -545,6 +546,9 @@ public void process( new BigQueryStorageApiInsertError(failedRow.getValue(), errorMessage), failedRow.getTimestamp()); rowsSentToFailedRowsCollection.inc(); + BigQuerySinkMetrics.AppendRowsRowStatusCounter( + BigQuerySinkMetrics.RowStatus.FAILED, "PayloadTooLarge", shortTableId) + .inc(1); }, autoUpdateSchema, 
ignoreUnknownValues, @@ -628,6 +632,10 @@ public void process( // The first context is always the one that fails. AppendRowsContext failedContext = Preconditions.checkStateNotNull(Iterables.getFirst(failedContexts, null)); + @Nullable Instant operationStartTime = failedContext.getOperationStartTime(); + Instant operationEndTime = Instant.now(); + BigQuerySinkMetrics.UpdateRpcLatencyMetric( + operationStartTime, operationEndTime, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS); // AppendSerializationError means that BigQuery detected errors on individual rows, e.g. // a row not conforming @@ -639,6 +647,9 @@ public void process( Exceptions.AppendSerializtionError error = Preconditions.checkArgumentNotNull( (Exceptions.AppendSerializtionError) failedContext.getError()); + + String errorCode = BigQuerySinkMetrics.ThrowableToGRPCCodeString(error); + BigQuerySinkMetrics.AppendRPCsCounter(errorCode, shortTableId).inc(); Set failedRowIndices = error.getRowIndexToErrorMessage().keySet(); for (int failedIndex : failedRowIndices) { // Convert the message to a TableRow and send it to the failedRows collection. @@ -651,7 +662,11 @@ public void process( failedRow, error.getRowIndexToErrorMessage().get(failedIndex)), timestamp); } - rowsSentToFailedRowsCollection.inc(failedRowIndices.size()); + int failedRows = failedRowIndices.size(); + rowsSentToFailedRowsCollection.inc(failedRows); + BigQuerySinkMetrics.AppendRowsRowStatusCounter( + BigQuerySinkMetrics.RowStatus.FAILED, errorCode, shortTableId) + .inc(failedRows); // Remove the failed row from the payload, so we retry the batch without the failed // rows. @@ -666,6 +681,10 @@ public void process( } failedContext.protoRows = retryRows.build(); failedContext.timestamps = timestamps; + int retriedRows = failedContext.protoRows.getSerializedRowsCount(); + BigQuerySinkMetrics.AppendRowsRowStatusCounter( + BigQuerySinkMetrics.RowStatus.RETRIED, errorCode, shortTableId) + .inc(retriedRows); // Since we removed rows, we need to update the insert offsets for all remaining rows. long offset = failedContext.offset; @@ -679,6 +698,8 @@ public void process( Throwable error = Preconditions.checkStateNotNull(failedContext.getError()); Status.Code statusCode = Status.fromThrowable(error).getCode(); + String errorCode = statusCode.toString(); + BigQuerySinkMetrics.AppendRPCsCounter(errorCode, shortTableId).inc(); // This means that the offset we have stored does not match the current end of // the stream in the Storage API. 
Usually this happens because a crash or a bundle // failure @@ -711,6 +732,10 @@ public void process( clearClients.accept(failedContexts); } appendFailures.inc(); + int retriedRows = failedContext.protoRows.getSerializedRowsCount(); + BigQuerySinkMetrics.AppendRowsRowStatusCounter( + BigQuerySinkMetrics.RowStatus.RETRIED, errorCode, shortTableId) + .inc(retriedRows); boolean explicitStreamFinalized = failedContext.getError() instanceof StreamFinalizedException; @@ -755,7 +780,16 @@ public void process( new Operation( context.offset + context.protoRows.getSerializedRowsCount() - 1, false))); - flushesScheduled.inc(context.protoRows.getSerializedRowsCount()); + int flushedRows = context.protoRows.getSerializedRowsCount(); + flushesScheduled.inc(flushedRows); + BigQuerySinkMetrics.AppendRowsRowStatusCounter( + BigQuerySinkMetrics.RowStatus.SUCCESSFUL, "ok", shortTableId) + .inc(flushedRows); + BigQuerySinkMetrics.AppendRPCsCounter("ok", shortTableId).inc(); + @Nullable Instant operationStartTime = context.getOperationStartTime(); + Instant operationEndTime = Instant.now(); + BigQuerySinkMetrics.UpdateRpcLatencyMetric( + operationStartTime, operationEndTime, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS); if (successfulRowsTag != null) { for (int i = 0; i < context.protoRows.getSerializedRowsCount(); ++i) { @@ -769,7 +803,11 @@ public void process( Instant now = Instant.now(); List contexts = Lists.newArrayList(); RetryManager retryManager = - new RetryManager<>(Duration.standardSeconds(1), Duration.standardSeconds(10), 1000); + new RetryManager<>( + Duration.standardSeconds(1), + Duration.standardSeconds(10), + 1000, + BigQuerySinkMetrics.ThrottledTimeCounter(BigQuerySinkMetrics.RpcMethod.APPEND_ROWS)); int numAppends = 0; for (SplittingIterable.Value splitValue : messages) { // Handle the case of a row that is too large. @@ -794,7 +832,11 @@ public void process( failedRow, "Row payload too large. Maximum size " + maxRequestSize), timestamp); } - rowsSentToFailedRowsCollection.inc(splitValue.getProtoRows().getSerializedRowsCount()); + int numRowsFailed = splitValue.getProtoRows().getSerializedRowsCount(); + rowsSentToFailedRowsCollection.inc(numRowsFailed); + BigQuerySinkMetrics.AppendRowsRowStatusCounter( + BigQuerySinkMetrics.RowStatus.FAILED, "PayloadTooLarge", shortTableId) + .inc(numRowsFailed); } else { ++numAppends; // RetryManager diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java index 3f9f1b750ccd..df96e1bc2260 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java @@ -136,6 +136,12 @@ public String getTableUrn(BigQueryOptions bigQueryOptions) { table.getProjectId(), table.getDatasetId(), table.getTableId()); } + /** Return shortened tablespec in datasets/[dataset]/tables/[table] format. 
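+   *
+   * <p>For example (illustrative): a table reference with dataset "mydataset" and table
+   * "mytable" yields "datasets/mydataset/tables/mytable"; the project id is intentionally
+   * omitted from the shortened form.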
*/ + public String getShortTableUrn() { + TableReference table = getTableReference(); + return String.format("datasets/%s/tables/%s", table.getDatasetId(), table.getTableId()); + } + public TableReference getTableReference() { return BigQueryHelpers.parseTableSpec(tableSpec); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java new file mode 100644 index 000000000000..e5569c502b71 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; + +import com.google.cloud.bigquery.storage.v1.Exceptions; +import io.grpc.Status; +import java.time.Instant; +import java.util.List; +import org.apache.beam.runners.core.metrics.MetricsContainerImpl; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Histogram; +import org.apache.beam.sdk.metrics.MetricName; +import org.apache.beam.sdk.metrics.MetricsEnvironment; +import org.apache.beam.sdk.util.HistogramData; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link BigQuerySinkMetrics}. 
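+ * Exercises metric-name construction, throwable-to-status-code mapping, and RPC latency
+ * histogram updates.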
*/ +@RunWith(JUnit4.class) +public class BigQuerySinkMetricsTest { + + public static class TestHistogram implements Histogram { + public List values = Lists.newArrayList(); + private MetricName metricName = MetricName.named("namespace", "name"); + + @Override + public void update(double value) { + values.add(value); + } + + @Override + public MetricName getName() { + return metricName; + } + } + + public static class TestMetricsContainer extends MetricsContainerImpl { + + public TestHistogram testHistogram = new TestHistogram(); + + public TestMetricsContainer() { + super("TestStep"); + } + + @Override + public Histogram getPerWorkerHistogram( + MetricName metricName, HistogramData.BucketType bucketType) { + return testHistogram; + } + } + + @Test + public void testAppendRPCsCounter() throws Exception { + BigQuerySinkMetrics.setSupportMetricsDeletion(false); + Counter deletesEnabledCounter = BigQuerySinkMetrics.AppendRPCsCounter("rpcStatus", "tableId"); + assertThat( + deletesEnabledCounter.getName().getName(), + equalTo("RpcRequests-Method:APPEND_ROWS;Status:rpcStatus;")); + + BigQuerySinkMetrics.setSupportMetricsDeletion(true); + Counter deletesDisabledCounter = BigQuerySinkMetrics.AppendRPCsCounter("rpcStatus", "tableId"); + assertThat( + deletesDisabledCounter.getName().getName(), + equalTo("RpcRequests-Method:APPEND_ROWS;Status:rpcStatus;TableId:tableId;")); + } + + @Test + public void testFlushRowsCounter() throws Exception { + BigQuerySinkMetrics.setSupportMetricsDeletion(false); + Counter deletesEnabledCounter = BigQuerySinkMetrics.FlushRowsCounter("rpcStatus"); + assertThat( + deletesEnabledCounter.getName().getName(), + equalTo("RpcRequests-Method:FLUSH_ROWS;Status:rpcStatus;")); + + BigQuerySinkMetrics.setSupportMetricsDeletion(true); + Counter deletesDisabledCounter = BigQuerySinkMetrics.FlushRowsCounter("rpcStatus"); + assertThat( + deletesDisabledCounter.getName().getName(), + equalTo("RpcRequests-Method:FLUSH_ROWS;Status:rpcStatus;TableId:UNKNOWN;")); + } + + @Test + public void testAppendRowsRowStatusCounter() throws Exception { + BigQuerySinkMetrics.setSupportMetricsDeletion(false); + Counter deletesEnabledCounter = + BigQuerySinkMetrics.AppendRowsRowStatusCounter( + BigQuerySinkMetrics.RowStatus.SUCCESSFUL, "rpcStatus", "tableId"); + assertThat( + deletesEnabledCounter.getName().getName(), + equalTo("AppendRowsRowStatus-RowStatus:SUCCESSFUL;Status:rpcStatus;")); + + BigQuerySinkMetrics.setSupportMetricsDeletion(true); + Counter deletesDisabledCounter = + BigQuerySinkMetrics.AppendRowsRowStatusCounter( + BigQuerySinkMetrics.RowStatus.SUCCESSFUL, "rpcStatus", "tableId"); + assertThat( + deletesDisabledCounter.getName().getName(), + equalTo("AppendRowsRowStatus-RowStatus:SUCCESSFUL;Status:rpcStatus;TableId:tableId;")); + } + + @Test + public void testThrowableToGRPCCodeString() throws Exception { + Throwable nullThrowable = null; + assertThat(BigQuerySinkMetrics.ThrowableToGRPCCodeString(nullThrowable), equalTo("UNKNOWN")); + + Throwable nonGrpcError = new IndexOutOfBoundsException("Test Error"); + assertThat(BigQuerySinkMetrics.ThrowableToGRPCCodeString(nonGrpcError), equalTo("UNKNOWN")); + + int not_found_val = Status.Code.NOT_FOUND.value(); + Throwable grpcError = + new Exceptions.AppendSerializtionError(not_found_val, "Test Error", "Stream name", null); + assertThat(BigQuerySinkMetrics.ThrowableToGRPCCodeString(grpcError), equalTo("NOT_FOUND")); + } + + @Test + public void testThrottledTimeCounter() throws Exception { + Counter appendRowsThrottleCounter = + 
BigQuerySinkMetrics.ThrottledTimeCounter(BigQuerySinkMetrics.RpcMethod.APPEND_ROWS); + assertThat( + appendRowsThrottleCounter.getName().getName(), + equalTo("ThrottledTime-Method:APPEND_ROWS;")); + } + + @Test + public void testUpdateRpcLatencyMetric() throws Exception { + TestMetricsContainer testContainer = new TestMetricsContainer(); + MetricsEnvironment.setCurrentContainer(testContainer); + BigQuerySinkMetrics.RpcMethod append = BigQuerySinkMetrics.RpcMethod.APPEND_ROWS; + Instant t1 = Instant.now(); + + // Expect no updates to the histogram when we pass a null instant. + BigQuerySinkMetrics.UpdateRpcLatencyMetric(null, t1, append); + BigQuerySinkMetrics.UpdateRpcLatencyMetric(t1, null, append); + assertThat(testContainer.testHistogram.values.size(), equalTo(0)); + + // Expect no updates when end time is before start time. + BigQuerySinkMetrics.UpdateRpcLatencyMetric(t1, t1.minusMillis(5), append); + assertThat(testContainer.testHistogram.values.size(), equalTo(0)); + + // Expect valid updates to be recorded in the underlying histogram. + BigQuerySinkMetrics.UpdateRpcLatencyMetric(t1.minusMillis(5), t1, append); + BigQuerySinkMetrics.UpdateRpcLatencyMetric(t1.minusMillis(10), t1, append); + BigQuerySinkMetrics.UpdateRpcLatencyMetric(t1.minusMillis(15), t1, append); + assertThat( + testContainer.testHistogram.values, + containsInAnyOrder(Double.valueOf(5.0), Double.valueOf(10.0), Double.valueOf(15.0))); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/RetryManagerTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/RetryManagerTest.java index 958fd356344c..804552fc83c1 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/RetryManagerTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/RetryManagerTest.java @@ -22,9 +22,11 @@ import com.google.api.core.ApiFutures; import java.util.List; import java.util.Map; +import org.apache.beam.runners.core.metrics.CounterCell; import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.Operation; import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.Operation.Context; import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.RetryType; +import org.apache.beam.sdk.metrics.MetricName; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; import org.joda.time.Duration; @@ -44,8 +46,10 @@ static class Context extends Operation.Context { @Test public void testNoFailures() throws Exception { List contexts = Lists.newArrayList(); + MetricName metricName = MetricName.named("TEST_NAMESPACE", "THROTTLED_COUNTER"); + CounterCell throttleTimeCounter = new CounterCell(metricName); RetryManager retryManager = - new RetryManager<>(Duration.millis(1), Duration.millis(1), 5); + new RetryManager<>(Duration.millis(1), Duration.millis(1), 5, throttleTimeCounter); for (int i = 0; i < 5; ++i) { Context context = new Context(); contexts.add(context); @@ -74,6 +78,8 @@ public void testNoFailures() throws Exception { assertEquals(1, c.numSucceeded); assertEquals(0, c.numFailed); }); + + assertEquals(0L, (long) throttleTimeCounter.getCumulative()); } @Test @@ -82,8 +88,11 @@ public void testRetryInOrder() throws Exception { Map expectedStarts = Maps.newHashMap(); Map expectedFailures = Maps.newHashMap(); + MetricName metricName = MetricName.named("TEST_NAMESPACE", 
"THROTTLED_COUNTER"); + CounterCell throttleTimeCounter = new CounterCell(metricName); + RetryManager retryManager = - new RetryManager<>(Duration.millis(1), Duration.millis(1), 50); + new RetryManager<>(Duration.millis(1), Duration.millis(1), 50, throttleTimeCounter); for (int i = 0; i < 5; ++i) { final int index = i; String value = "yes " + i; @@ -129,14 +138,18 @@ public void testRetryInOrder() throws Exception { assertEquals(1, e.getValue().numSucceeded); assertEquals((int) expectedFailures.get(e.getKey()), e.getValue().numFailed); }); + // Each operation backsoff once and each backoff is 1ms. + assertEquals(5L, (long) throttleTimeCounter.getCumulative()); } @Test public void testDontRetry() throws Exception { List contexts = Lists.newArrayList(); + MetricName metricName = MetricName.named("TEST_NAMESPACE", "THROTTLED_COUNTER"); + CounterCell throttleTimeCounter = new CounterCell(metricName); RetryManager retryManager = - new RetryManager<>(Duration.millis(1), Duration.millis(1), 50); + new RetryManager<>(Duration.millis(1), Duration.millis(1), 50, throttleTimeCounter); for (int i = 0; i < 5; ++i) { Context context = new Context(); contexts.add(context); @@ -172,6 +185,8 @@ public void testDontRetry() throws Exception { assertEquals(0, c.numSucceeded); assertEquals(1, c.numFailed); }); + + assertEquals(0L, (long) throttleTimeCounter.getCumulative()); } @Test From 1589d44c92237122f93cac4cabb10bda0b413b3b Mon Sep 17 00:00:00 2001 From: Jayaj Poudel Date: Fri, 20 Oct 2023 22:09:08 +0000 Subject: [PATCH 2/4] Spotless check --- .../metrics/DelegatingPerWorkerHistogram.java | 3 +- .../apache/beam/sdk/util/FluentBackoff.java | 44 ++++++++++++++++--- 2 files changed, 38 insertions(+), 9 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingPerWorkerHistogram.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingPerWorkerHistogram.java index 5320545b17fc..e9e520791c41 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingPerWorkerHistogram.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingPerWorkerHistogram.java @@ -35,8 +35,7 @@ public DelegatingPerWorkerHistogram( this.processWideContainer = processWideContainer; } - public DelegatingPerWorkerHistogram( - MetricName name, HistogramData.BucketType bucketType) { + public DelegatingPerWorkerHistogram(MetricName name, HistogramData.BucketType bucketType) { this(name, bucketType, false); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FluentBackoff.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FluentBackoff.java index 93cd50d23e3d..ce10885b0686 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FluentBackoff.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FluentBackoff.java @@ -19,8 +19,8 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; -import org.apache.beam.sdk.metrics.NoOpCounter; import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.NoOpCounter; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.joda.time.Duration; @@ -92,7 +92,12 @@ public BackOff backoff() { public FluentBackoff withExponent(double exponent) { checkArgument(exponent > 0, "exponent %s must be greater than 0", exponent); return new FluentBackoff( - exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries, throttledTimeCounter); + exponent, + 
initialBackoff, + maxBackoff, + maxCumulativeBackoff, + maxRetries, + throttledTimeCounter); } /** @@ -109,7 +114,12 @@ public FluentBackoff withInitialBackoff(Duration initialBackoff) { "initialBackoff %s must be at least 1 millisecond", initialBackoff); return new FluentBackoff( - exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries, throttledTimeCounter); + exponent, + initialBackoff, + maxBackoff, + maxCumulativeBackoff, + maxRetries, + throttledTimeCounter); } /** @@ -124,7 +134,12 @@ public FluentBackoff withMaxBackoff(Duration maxBackoff) { checkArgument( maxBackoff.getMillis() > 0, "maxBackoff %s must be at least 1 millisecond", maxBackoff); return new FluentBackoff( - exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries, throttledTimeCounter); + exponent, + initialBackoff, + maxBackoff, + maxCumulativeBackoff, + maxRetries, + throttledTimeCounter); } /** @@ -141,7 +156,12 @@ public FluentBackoff withMaxCumulativeBackoff(Duration maxCumulativeBackoff) { "maxCumulativeBackoff %s must be at least 1 millisecond", maxCumulativeBackoff); return new FluentBackoff( - exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries, throttledTimeCounter); + exponent, + initialBackoff, + maxBackoff, + maxCumulativeBackoff, + maxRetries, + throttledTimeCounter); } /** @@ -156,12 +176,22 @@ public FluentBackoff withMaxCumulativeBackoff(Duration maxCumulativeBackoff) { public FluentBackoff withMaxRetries(int maxRetries) { checkArgument(maxRetries >= 0, "maxRetries %s cannot be negative", maxRetries); return new FluentBackoff( - exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries, throttledTimeCounter); + exponent, + initialBackoff, + maxBackoff, + maxCumulativeBackoff, + maxRetries, + throttledTimeCounter); } public FluentBackoff withThrottledTimeCounter(Counter throttledTimeCounter) { return new FluentBackoff( - exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries, throttledTimeCounter); + exponent, + initialBackoff, + maxBackoff, + maxCumulativeBackoff, + maxRetries, + throttledTimeCounter); } @Override From ba0850123b3df04f5d489de27feae1d3c3a7b557 Mon Sep 17 00:00:00 2001 From: Jayaj Poudel Date: Wed, 8 Nov 2023 18:40:09 +0000 Subject: [PATCH 3/4] Address comments and spotless fixes --- .../beam/sdk/metrics/DelegatingCounter.java | 29 +- .../beam/sdk/metrics/DelegatingHistogram.java | 33 ++- .../metrics/DelegatingPerWorkerCounter.java | 72 ----- .../metrics/DelegatingPerWorkerHistogram.java | 57 ---- .../io/gcp/bigquery/BigQuerySinkMetrics.java | 152 +++++++---- .../sdk/io/gcp/bigquery/RetryManager.java | 24 ++ .../StorageApiFinalizeWritesDoFn.java | 19 +- .../StorageApiFlushAndFinalizeDoFn.java | 41 +-- .../StorageApiWriteUnshardedRecords.java | 38 ++- .../StorageApiWritesShardedRecords.java | 45 ++-- .../gcp/bigquery/BigQuerySinkMetricsTest.java | 253 ++++++++++++++---- 11 files changed, 427 insertions(+), 336 deletions(-) delete mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingPerWorkerCounter.java delete mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingPerWorkerHistogram.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingCounter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingCounter.java index 82808d8cb633..ece0c70348c4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingCounter.java +++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingCounter.java @@ -25,14 +25,34 @@ public class DelegatingCounter implements Metric, Counter, Serializable { private final MetricName name; private final boolean processWideContainer; + private final boolean perWorkerCounter; + /** + * Create a {@code DelegatingCounter} with {@code perWorkerCounter} and {@code processWideContainer} set to false. + * @param name Metric name for this metric. + */ public DelegatingCounter(MetricName name) { - this(name, false); + this(name, false, false); } + /** + * Create a {@code DelegatingCounter} with {@code perWorkerCounter} set to false. + * @param name Metric name for this metric. + * @param processWideContainer Whether this Counter is stored in the ProcessWide container or the current thread's container. + */ public DelegatingCounter(MetricName name, boolean processWideContainer) { + this(name, processWideContainer, false); + } + + /** + * @param name Metric name for this metric. + * @param processWideContainer Whether this Counter is stored in the ProcessWide container or the current thread's container. + * @param perWorkerCounter Whether this Counter refers to a perWorker metric or not. + */ + public DelegatingCounter(MetricName name, boolean processWideContainer, boolean perWorkerCounter) { this.name = name; this.processWideContainer = processWideContainer; + this.perWorkerCounter = perWorkerCounter; } /** Increment the counter. */ @@ -48,7 +68,12 @@ public void inc(long n) { this.processWideContainer ? MetricsEnvironment.getProcessWideContainer() : MetricsEnvironment.getCurrentContainer(); - if (container != null) { + if (container == null) { + return; + } + if (perWorkerCounter) { + container.getPerWorkerCounter(name).inc(n); + } else { container.getCounter(name).inc(n); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingHistogram.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingHistogram.java index 74e3cf5719f7..b5c458836444 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingHistogram.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingHistogram.java @@ -27,21 +27,46 @@ public class DelegatingHistogram implements Metric, Histogram, Serializable { private final MetricName name; private final HistogramData.BucketType bucketType; private final boolean processWideContainer; + private final boolean perWorkerHistogram; + /** + * Create a {@code DelegatingHistogram} with {@code perWorkerHistogram} set to false. + * @param name Metric name for this metric. + * @param bucketType Histogram bucketing strategy. + * @param processWideContainer Whether this Counter is stored in the ProcessWide container or the current thread's container. + */ public DelegatingHistogram( MetricName name, HistogramData.BucketType bucketType, boolean processWideContainer) { - this.name = name; - this.bucketType = bucketType; - this.processWideContainer = processWideContainer; + this(name, bucketType, processWideContainer, false); } + + /** + * @param name Metric name for this metric. + * @param bucketType Histogram bucketing strategy. + * @param processWideContainer Whether this Counter is stored in the ProcessWide container or the current thread's container. + * @param perWorkerHistogram Whether this Histogram refers to a perWorker metric or not. 
+ */ + public DelegatingHistogram( + MetricName name, HistogramData.BucketType bucketType, boolean processWideContainer, boolean perWorkerHistogram) { + this.name = name; + this.bucketType = bucketType; + this.processWideContainer = processWideContainer; + this.perWorkerHistogram = perWorkerHistogram; + } + @Override public void update(double value) { MetricsContainer container = processWideContainer ? MetricsEnvironment.getProcessWideContainer() : MetricsEnvironment.getCurrentContainer(); - if (container != null) { + if (container == null) { + return; + } + if (perWorkerHistogram) { + container.getPerWorkerHistogram(name, bucketType).update(value); + } else { container.getHistogram(name, bucketType).update(value); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingPerWorkerCounter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingPerWorkerCounter.java deleted file mode 100644 index e8f6c7110eb8..000000000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingPerWorkerCounter.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.metrics; - -import java.io.Serializable; -import org.apache.beam.sdk.annotations.Internal; - -/** Implementation of {@link Counter} that delegates to the instance for the current context. */ -@Internal -public class DelegatingPerWorkerCounter implements Metric, Counter, Serializable { - private final MetricName name; - private final boolean processWideContainer; - - public DelegatingPerWorkerCounter(MetricName name) { - this(name, false); - } - - public DelegatingPerWorkerCounter(MetricName name, boolean processWideContainer) { - this.name = name; - this.processWideContainer = processWideContainer; - } - - /** Increment the counter. */ - @Override - public void inc() { - inc(1); - } - - /** Increment the counter by the given amount. */ - @Override - public void inc(long n) { - MetricsContainer container = - this.processWideContainer - ? MetricsEnvironment.getProcessWideContainer() - : MetricsEnvironment.getCurrentContainer(); - if (container != null) { - container.getPerWorkerCounter(name).inc(n); - } - } - - /* Decrement the counter. */ - @Override - public void dec() { - inc(-1); - } - - /* Decrement the counter by the given amount. 
*/ - @Override - public void dec(long n) { - inc(-1 * n); - } - - @Override - public MetricName getName() { - return name; - } -} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingPerWorkerHistogram.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingPerWorkerHistogram.java deleted file mode 100644 index e9e520791c41..000000000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingPerWorkerHistogram.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.metrics; - -import java.io.Serializable; -import org.apache.beam.sdk.annotations.Internal; -import org.apache.beam.sdk.util.HistogramData; - -/** Implementation of {@link Histogram} that delegates to the instance for the current context. */ -@Internal -public class DelegatingPerWorkerHistogram implements Metric, Histogram, Serializable { - private final MetricName name; - private final HistogramData.BucketType bucketType; - private final boolean processWideContainer; - - public DelegatingPerWorkerHistogram( - MetricName name, HistogramData.BucketType bucketType, boolean processWideContainer) { - this.name = name; - this.bucketType = bucketType; - this.processWideContainer = processWideContainer; - } - - public DelegatingPerWorkerHistogram(MetricName name, HistogramData.BucketType bucketType) { - this(name, bucketType, false); - } - - @Override - public void update(double value) { - MetricsContainer container = - processWideContainer - ? 
MetricsEnvironment.getProcessWideContainer() - : MetricsEnvironment.getCurrentContainer(); - if (container != null) { - container.getPerWorkerHistogram(name, bucketType).update(value); - } - } - - @Override - public MetricName getName() { - return name; - } -} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java index 35118bdccca8..f8984c62c911 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java @@ -21,10 +21,12 @@ import java.time.Instant; import java.util.NavigableMap; import java.util.TreeMap; +import javax.annotation.Nonnull; import javax.annotation.Nullable; +import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.Operation.Context; import org.apache.beam.sdk.metrics.Counter; -import org.apache.beam.sdk.metrics.DelegatingPerWorkerCounter; -import org.apache.beam.sdk.metrics.DelegatingPerWorkerHistogram; +import org.apache.beam.sdk.metrics.DelegatingCounter; +import org.apache.beam.sdk.metrics.DelegatingHistogram; import org.apache.beam.sdk.metrics.Histogram; import org.apache.beam.sdk.metrics.MetricName; import org.apache.beam.sdk.util.HistogramData; @@ -40,9 +42,13 @@ public class BigQuerySinkMetrics { private static Boolean supportMetricsDeletion = false; private static final String METRICS_NAMESPACE = "BigQuerySink"; + + // Status codes private static final String UNKNOWN = Status.Code.UNKNOWN.toString(); + public static final String OK = Status.Code.OK.toString(); + public static final String PAYLOAD_TOO_LARGE = "PayloadTooLarge"; - // Base Metric names. + // Base Metric names private static final String RPC_REQUESTS = "RpcRequests"; private static final String RPC_LATENCY = "RpcLatency"; private static final String APPEND_ROWS_ROW_STATUS = "AppendRowsRowStatus"; @@ -64,7 +70,7 @@ enum RowStatus { // Metric labels private static final String TABLE_ID_LABEL = "TableId"; - private static final String RPC_STATUS_LABEL = "Status"; + private static final String RPC_STATUS_LABEL = "RpcStatus"; private static final String RPC_METHOD = "Method"; private static final String ROW_STATUS = "RowStatus"; @@ -74,11 +80,11 @@ enum RowStatus { private static final char METRIC_NAME_DELIMITER = '-'; /** - * Returns a metric name that merges the baseName with metricLables formatted as: + * Returns a metric name that merges the baseName with metricLables formatted as.: * *

'{baseName}-{metricLabelKey1}:{metricLabelVal1};...{metricLabelKeyN}:{metricLabelValN};'
    */
-  private static String CreateLabeledMetricName(
+  private static String createLabeledMetricName(
       String baseName, NavigableMap<String, String> metricLabels) {
     StringBuilder nameBuilder = new StringBuilder(baseName + METRIC_NAME_DELIMITER);
@@ -89,15 +95,15 @@ private static String CreateLabeledMetricName(
   }
 
   /**
-   * @param method StorageWriteAPI write method.
+   * @param method StorageWriteAPI method associated with this metric.
    * @param rpcStatus RPC return status.
    * @param tableId Table pertaining to the write method. Only included in the metric key if
    *     'supportsMetricsDeletion' is enabled.
    * @return Counter in namespace BigQuerySink and name
-   *     'RpcRequests-Status:{status};TableId:{tableId}' TableId label is dropped if
-   *     'supportsMetricsDeletion' is not enabled.
+   *     'RpcRequests-Method:{method};RpcStatus:{status};TableId:{tableId}' TableId label is
+   *     dropped if 'supportsMetricsDeletion' is not enabled.
    */
-  private static Counter CreateRPCRequestCounter(
+  private static Counter createRPCRequestCounter(
       RpcMethod method, String rpcStatus, String tableId) {
     NavigableMap<String, String> metricLabels = new TreeMap<String, String>();
     metricLabels.put(RPC_STATUS_LABEL, rpcStatus);
@@ -106,67 +112,45 @@ private static Counter CreateRPCRequestCounter(
       metricLabels.put(TABLE_ID_LABEL, tableId);
     }
 
-    String fullMetricName = CreateLabeledMetricName(RPC_REQUESTS, metricLabels);
+    String fullMetricName = createLabeledMetricName(RPC_REQUESTS, metricLabels);
     MetricName metricName = MetricName.named(METRICS_NAMESPACE, fullMetricName);
 
-    return new DelegatingPerWorkerCounter(metricName);
-  }
-
-  /** Creates a counter for the AppendRows RPC call based on the rpcStatus and table. */
-  public static Counter AppendRPCsCounter(String rpcStatus, String tableId) {
-    return CreateRPCRequestCounter(RpcMethod.APPEND_ROWS, rpcStatus, tableId);
-  }
-
-  /**
-   * Creates a counter for the FlushRows RPC call based on the rpcStatus. TableId is not known when
-   * the stream is flushed so we use the placeholder 'UNKNOWN'.
-   */
-  public static Counter FlushRowsCounter(String rpcStatus) {
-    return CreateRPCRequestCounter(RpcMethod.FLUSH_ROWS, rpcStatus, BigQuerySinkMetrics.UNKNOWN);
-  }
-
-  /**
-   * Creates a counter for the FinalizeRows RPC call based on the rpcStatus. TableId is not known
-   * when the stream is flushed so we use the placeholder 'UNKNOWN'.
-   */
-  public static Counter FinalizeStreamCounter(String rpcStatus) {
-    return CreateRPCRequestCounter(
-        RpcMethod.FINALIZE_STREAM, rpcStatus, BigQuerySinkMetrics.UNKNOWN);
+    return new DelegatingCounter(metricName, false, true);
   }
 
   /**
-   * Creates an Histogram metric to record RPC latency. Metric will have name:
+   * Creates an Histogram metric to record RPC latency. Metric will have name.:
    *
    *

'RpcLatency-Method:{method};' * * @param method StorageWriteAPI method associated with this metric. * @return Histogram with exponential buckets with a sqrt(2) growth factor. */ - private static Histogram CreateRPCLatencyHistogram(RpcMethod method) { + private static Histogram createRPCLatencyHistogram(RpcMethod method) { NavigableMap metricLabels = new TreeMap(); metricLabels.put(RPC_METHOD, method.toString()); - String fullMetricName = CreateLabeledMetricName(RPC_LATENCY, metricLabels); + String fullMetricName = createLabeledMetricName(RPC_LATENCY, metricLabels); MetricName metricName = MetricName.named(METRICS_NAMESPACE, fullMetricName); HistogramData.BucketType buckets = HistogramData.ExponentialBuckets.of(1, 34); - return new DelegatingPerWorkerHistogram(metricName, buckets); + return new DelegatingHistogram(metricName, buckets, false, true); } /** - * Records the time between operationStartTime and OperationEndTime in a PerWorkerHistogram. + * Records an RPC operation's duration in a PerWorkerHistogram. * - * @param operationStartTime If null or in the future, this function is a no-op. - * @param OperationEndTime End time of operation. + * @param c Retry manager context, used to get the operation start and end time. * @param method StorageWriteAPI write method. */ - public static void UpdateRpcLatencyMetric( - @Nullable Instant operationStartTime, Instant operationEndTime, RpcMethod method) { + private static void updateRpcLatencyMetric(@Nonnull Context c, RpcMethod method) { + @Nullable Instant operationStartTime = c.getOperationStartTime(); + @Nullable Instant operationEndTime = c.getOperationEndTime(); if (operationStartTime == null || operationEndTime == null) { return; } long timeElapsed = java.time.Duration.between(operationStartTime, operationEndTime).toMillis(); if (timeElapsed > 0) { - BigQuerySinkMetrics.CreateRPCLatencyHistogram(method).update(timeElapsed); + BigQuerySinkMetrics.createRPCLatencyHistogram(method).update(timeElapsed); } } @@ -177,7 +161,7 @@ public static void UpdateRpcLatencyMetric( * 'supportsMetricsDeletion' is enabled. * @return Metric that tracks the status of BigQuery rows after making an AppendRows RPC call. */ - public static Counter AppendRowsRowStatusCounter( + public static Counter appendRowsRowStatusCounter( RowStatus rowStatus, String rpcStatus, String tableId) { NavigableMap metricLabels = new TreeMap(); metricLabels.put(RPC_STATUS_LABEL, rpcStatus); @@ -186,19 +170,22 @@ public static Counter AppendRowsRowStatusCounter( metricLabels.put(TABLE_ID_LABEL, tableId); } - String fullMetricName = CreateLabeledMetricName(APPEND_ROWS_ROW_STATUS, metricLabels); + String fullMetricName = createLabeledMetricName(APPEND_ROWS_ROW_STATUS, metricLabels); MetricName metricName = MetricName.named(METRICS_NAMESPACE, fullMetricName); - return new DelegatingPerWorkerCounter(metricName); + return new DelegatingCounter(metricName, false, true); } - /** Metric that tracks throttled time due between RPC retries. */ - public static Counter ThrottledTimeCounter(RpcMethod method) { + /** + * @param method StorageWriteAPI write method. + * @return Counter that tracks throttled time due to RPC retries. 
+   */
+  public static Counter throttledTimeCounter(RpcMethod method) {
     NavigableMap<String, String> metricLabels = new TreeMap<String, String>();
     metricLabels.put(RPC_METHOD, method.toString());
 
-    String fullMetricName = CreateLabeledMetricName(THROTTLED_TIME, metricLabels);
+    String fullMetricName = createLabeledMetricName(THROTTLED_TIME, metricLabels);
     MetricName metricName = MetricName.named(METRICS_NAMESPACE, fullMetricName);
-    return new DelegatingPerWorkerCounter(metricName);
+    return new DelegatingCounter(metricName, false, true);
   }
 
   /**
@@ -207,13 +194,74 @@ public static Counter ThrottledTimeCounter(RpcMethod method) {
    * @param t Throwable.
    * @return gRPC status code string or 'UNKNOWN' if 't' is null or does not map to a gRPC error.
    */
-  public static String ThrowableToGRPCCodeString(@Nullable Throwable t) {
+  public static String throwableToGRPCCodeString(@Nullable Throwable t) {
     if (t == null) {
       return BigQuerySinkMetrics.UNKNOWN;
     }
     return Status.fromThrowable(t).getCode().toString();
   }
 
+  /**
+   * Records RpcRequests counter and RpcLatency histogram for this RPC call. If
+   * 'SupportMetricsDeletion' is enabled, RpcRequests counter will have tableId label set to {@code
+   * UNKNOWN}. RpcRequests counter will have RpcStatus label set to {@code OK}.
+   *
+   * @param c Context of successful RPC call.
+   * @param method StorageWriteAPI method associated with this metric.
+   */
+  public static void reportSuccessfulRpcMetrics(@Nullable Context c, RpcMethod method) {
+    reportSuccessfulRpcMetrics(c, method, UNKNOWN);
+  }
+
+  /**
+   * Records RpcRequests counter and RpcLatency histogram for this RPC call. RpcRequests counter
+   * will have RpcStatus label set to {@code OK}.
+   *
+   * @param c Context of successful RPC call.
+   * @param method StorageWriteAPI method associated with this metric.
+   * @param tableId Table pertaining to the write method. Only included in the metric key if
+   *     'supportsMetricsDeletion' is enabled.
+   */
+  public static void reportSuccessfulRpcMetrics(
+      @Nullable Context c, RpcMethod method, String tableId) {
+    if (c == null) {
+      return;
+    }
+    createRPCRequestCounter(method, OK, tableId).inc(1);
+    updateRpcLatencyMetric(c, method);
+  }
+
+  /**
+   * Records RpcRequests counter and RpcLatency histogram for this RPC call. If
+   * 'SupportMetricsDeletion' is enabled, RpcRequests counter will have tableId label set to {@code
+   * UNKNOWN}. RpcRequests counter will have a RpcStatus label set from the gRPC error.
+   *
+   * @param c Context of the failed RPC call.
+   * @param method StorageWriteAPI method associated with this metric.
+   */
+  public static void reportFailedRPCMetrics(@Nullable Context c, RpcMethod method) {
+    reportFailedRPCMetrics(c, method, UNKNOWN);
+  }
+
+  /**
+   * Records RpcRequests counter and RpcLatency histogram for this RPC call. RpcRequests counter
+   * will have a RpcStatus label set from the gRPC error.
+   *
+   * @param c Context of the failed RPC call.
+   * @param method StorageWriteAPI method associated with this metric.
+   * @param tableId Table pertaining to the write method. Only included in the metric key if
+   *     'supportsMetricsDeletion' is enabled.
+   */
+  public static void reportFailedRPCMetrics(
+      @Nullable Context c, RpcMethod method, String tableId) {
+    if (c == null) {
+      return;
+    }
+    String statusCode = throwableToGRPCCodeString(c.getError());
+    createRPCRequestCounter(method, statusCode, tableId).inc(1);
+    updateRpcLatencyMetric(c, method);
+  }
+
   public static void setSupportMetricsDeletion(Boolean supportMetricsDeletion) {
     BigQuerySinkMetrics.supportMetricsDeletion = supportMetricsDeletion;
   }
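The two report helpers above are meant to bracket every Storage API RPC issued through RetryManager. The following condensed sketch mirrors the StorageApiFinalizeWritesDoFn hunk later in this patch; identifiers such as retryManager, datasetService, and streamId are stand-ins borrowed from that DoFn, not new API:

// Sketch: success and failure paths of one finalize RPC, with per-worker
// RpcRequests and RpcLatency metrics reported through the new helpers.
retryManager.addOperation(
    c -> datasetService.finalizeWriteStream(streamId),
    contexts -> {
      // By convention the first context carries the failure that aborted the batch.
      Context<FinalizeWriteStreamResponse> failed =
          Preconditions.checkArgumentNotNull(Iterables.getFirst(contexts, null));
      BigQuerySinkMetrics.reportFailedRPCMetrics(
          failed, BigQuerySinkMetrics.RpcMethod.FINALIZE_STREAM);
      return RetryType.RETRY_ALL_OPERATIONS;
    },
    c ->
        BigQuerySinkMetrics.reportSuccessfulRpcMetrics(
            c, BigQuerySinkMetrics.RpcMethod.FINALIZE_STREAM),
    new Context<>());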
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RetryManager.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RetryManager.java
index 0878f4541176..ae2eafa7a2b7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RetryManager.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RetryManager.java
@@ -104,6 +104,7 @@ static class Context<ResultT> {
     private @Nullable Throwable error = null;
     private @Nullable ResultT result = null;
     private @Nullable Instant operationStartTime = null;
+    private @Nullable Instant operationEndTime = null;
 
     public void setError(@Nullable Throwable error) {
       this.error = error;
@@ -128,6 +129,14 @@ public void setOperationStartTime(@Nullable Instant operationStartTime) {
     public @Nullable Instant getOperationStartTime() {
       return operationStartTime;
     }
+
+    public void setOperationEndTime(@Nullable Instant operationEndTime) {
+      this.operationEndTime = operationEndTime;
+    }
+
+    public @Nullable Instant getOperationEndTime() {
+      return operationEndTime;
+    }
   }
 
   private final Function<ContextT, ApiFuture<ResultT>> runOperation;
@@ -153,6 +162,7 @@ public Operation(
 
   void run(Executor executor) {
     this.context.setOperationStartTime(Instant.now());
+    this.context.setOperationEndTime(null);
     this.future = runOperation.apply(context);
     this.callback = new Callback<>(hasSucceeded);
     ApiFutures.addCallback(future, callback, executor);
@@ -170,6 +180,7 @@ private static class Callback<ResultT> implements ApiFutureCallback<ResultT> {
     private final Function<ResultT, Boolean> hasSucceeded;
     @Nullable private Throwable failure = null;
     boolean failed = false;
+    @Nullable Instant operationEndTime = null;
 
     Callback(Function<ResultT, Boolean> hasSucceeded) {
       this.waiter = new CountDownLatch(1);
@@ -187,6 +198,7 @@ boolean await(long timeoutSec) throws InterruptedException {
     @Override
     public void onFailure(Throwable t) {
       synchronized (this) {
+        operationEndTime = Instant.now();
         failure = t;
         failed = true;
       }
@@ -196,6 +208,7 @@ public void onFailure(Throwable t) {
     @Override
     public void onSuccess(ResultT result) {
       synchronized (this) {
+        operationEndTime = Instant.now();
         if (hasSucceeded.apply(result)) {
           failure = null;
         } else {
@@ -218,6 +231,13 @@ boolean getFailed() {
         return failed;
       }
     }
+
+    @Nullable
+    Instant getOperationEndTime() {
+      synchronized (this) {
+        return operationEndTime;
+      }
+    }
   }
 
   void addOperation(
@@ -280,6 +300,10 @@ void await() throws Exception {
     while (!this.operations.isEmpty()) {
       Operation<ResultT, ContextT> operation = this.operations.element();
       boolean failed = operation.await();
+      @Nullable Callback<ResultT> callback = operation.callback;
+      if (callback != null) {
+        operation.context.setOperationEndTime(callback.getOperationEndTime());
+      }
       if (failed) {
         Throwable failure = Preconditions.checkStateNotNull(operation.callback).getFailure();
         operation.context.setError(failure);
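Net effect of the RetryManager changes above: run() stamps a start instant and clears any stale end instant, the ApiFuture callback stamps the end instant on either outcome, and await() copies it back onto the operation's Context. A self-contained sketch of the latency arithmetic that updateRpcLatencyMetric applies to those two instants (the guards match the patch; the class name is illustrative):

import java.time.Duration;
import java.time.Instant;

class RpcLatencySketch {
  // Mirrors updateRpcLatencyMetric: a sample is recorded only when both
  // instants are present and the elapsed time is strictly positive.
  static long latencyMillis(Instant start, Instant end) {
    if (start == null || end == null) {
      return -1L; // -1 stands in for "drop the sample"
    }
    long elapsed = Duration.between(start, end).toMillis();
    return elapsed > 0 ? elapsed : -1L;
  }

  public static void main(String[] args) throws InterruptedException {
    Instant start = Instant.now(); // run() -> setOperationStartTime(Instant.now())
    Thread.sleep(5);               // stands in for the RPC itself
    Instant end = Instant.now();   // onSuccess()/onFailure() -> operationEndTime = Instant.now()
    System.out.println("RpcLatency sample (ms): " + latencyMillis(start, end));
  }
}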
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFinalizeWritesDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFinalizeWritesDoFn.java
index 831a8ff6a94b..853a63fca8cc 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFinalizeWritesDoFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFinalizeWritesDoFn.java
@@ -22,7 +22,6 @@
 import com.google.cloud.bigquery.storage.v1.StorageError;
 import com.google.cloud.bigquery.storage.v1.StorageError.StorageErrorCode;
 import java.io.IOException;
-import java.time.Instant;
 import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
@@ -109,7 +108,7 @@ public void process(PipelineOptions pipelineOptions, @Element KV
         Duration.standardSeconds(1),
         Duration.standardMinutes(1),
         3,
-        BigQuerySinkMetrics.ThrottledTimeCounter(
+        BigQuerySinkMetrics.throttledTimeCounter(
             BigQuerySinkMetrics.RpcMethod.FINALIZE_STREAM));
     retryManager.addOperation(
         c -> {
@@ -121,13 +120,8 @@ public void process(PipelineOptions pipelineOptions, @Element KV
               Preconditions.checkArgumentNotNull(Iterables.getFirst(contexts, null));
           LOG.error("Finalize of stream " + streamId + " failed with " + firstContext.getError());
           finalizeOperationsFailed.inc();
-          String errorCode = BigQuerySinkMetrics.ThrowableToGRPCCodeString(firstContext.getError());
-          BigQuerySinkMetrics.FinalizeStreamCounter(errorCode).inc();
-
-          @Nullable Instant operationStartTime = firstContext.getOperationStartTime();
-          Instant operationEndTime = Instant.now();
-          BigQuerySinkMetrics.UpdateRpcLatencyMetric(
-              operationStartTime, operationEndTime, BigQuerySinkMetrics.RpcMethod.FINALIZE_STREAM);
+          BigQuerySinkMetrics.reportFailedRPCMetrics(
+              firstContext, BigQuerySinkMetrics.RpcMethod.FINALIZE_STREAM);
 
           return RetryType.RETRY_ALL_OPERATIONS;
         },
@@ -140,11 +134,8 @@ public void process(PipelineOptions pipelineOptions, @Element KV
 
           rowsFinalized.inc(response.getRowCount());
           finalizeOperationsSucceeded.inc();
-          BigQuerySinkMetrics.FinalizeStreamCounter("ok").inc();
-          @Nullable Instant operationStartTime = c.getOperationStartTime();
-          Instant operationEndTime = Instant.now();
-          BigQuerySinkMetrics.UpdateRpcLatencyMetric(
-              operationStartTime, operationEndTime, BigQuerySinkMetrics.RpcMethod.FINALIZE_STREAM);
+          BigQuerySinkMetrics.reportSuccessfulRpcMetrics(
+              c, BigQuerySinkMetrics.RpcMethod.FINALIZE_STREAM);
 
           commitStreams.computeIfAbsent(tableId, d -> Lists.newArrayList()).add(streamId);
         },
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java
index 0cde0b7017b4..333a0c0b36bf 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java
@@ -146,7 +146,7 @@ public void process(PipelineOptions pipelineOptions, @Element KV {
@@ -165,12 +165,8 @@ public void process(PipelineOptions pipelineOptions, @Element KV {
-              BigQuerySinkMetrics.FlushRowsCounter("ok").inc();
-              if (c != null) {
-                BigQuerySinkMetrics.UpdateRpcLatencyMetric(
-                    c.getOperationStartTime(),
-                    Instant.now(),
-                    BigQuerySinkMetrics.RpcMethod.FLUSH_ROWS);
-              }
+              BigQuerySinkMetrics.reportSuccessfulRpcMetrics(
+                  c, BigQuerySinkMetrics.RpcMethod.FLUSH_ROWS);
flushOperationsSucceeded.inc(); }, new Context<>()); @@ -221,18 +212,11 @@ public void process(PipelineOptions pipelineOptions, @Element KV { finalizeOperationsSent.inc(); - if (c != null) { - BigQuerySinkMetrics.UpdateRpcLatencyMetric( - c.getOperationStartTime(), - Instant.now(), - BigQuerySinkMetrics.RpcMethod.FLUSH_ROWS); - } - BigQuerySinkMetrics.FinalizeStreamCounter("ok").inc(); return datasetService.finalizeWriteStream(streamId); }, @@ -246,17 +230,10 @@ public void process(PipelineOptions pipelineOptions, @Element KV firstContext = Iterables.getFirst(contexts, null); + BigQuerySinkMetrics.reportFailedRPCMetrics( + firstContext, BigQuerySinkMetrics.RpcMethod.FINALIZE_STREAM); @Nullable Throwable error = firstContext == null ? null : firstContext.getError(); - String errorCode = BigQuerySinkMetrics.ThrowableToGRPCCodeString(error); - BigQuerySinkMetrics.FinalizeStreamCounter(errorCode).inc(); - if (firstContext != null) { - @Nullable Instant operationStartTime = firstContext.getOperationStartTime(); - Instant operationEndTime = Instant.now(); - BigQuerySinkMetrics.UpdateRpcLatencyMetric( - operationStartTime, operationEndTime, BigQuerySinkMetrics.RpcMethod.FLUSH_ROWS); - } - if (error instanceof ApiException) { Code statusCode = ((ApiException) error).getStatusCode().getCode(); if (statusCode.equals(Code.NOT_FOUND)) { @@ -266,6 +243,8 @@ public void process(PipelineOptions pipelineOptions, @Element KV { + BigQuerySinkMetrics.reportSuccessfulRpcMetrics( + r, BigQuerySinkMetrics.RpcMethod.FINALIZE_STREAM); finalizeOperationsSucceeded.inc(); }, new Context<>()); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index 587e1cd82c0b..444d87f62da6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -612,8 +612,10 @@ long flush( timestamp); } int numRowsFailed = inserts.getSerializedRowsCount(); - BigQuerySinkMetrics.AppendRowsRowStatusCounter( - BigQuerySinkMetrics.RowStatus.FAILED, "PayloadTooLarge", shortTableUrn) + BigQuerySinkMetrics.appendRowsRowStatusCounter( + BigQuerySinkMetrics.RowStatus.FAILED, + BigQuerySinkMetrics.PAYLOAD_TOO_LARGE, + shortTableUrn) .inc(numRowsFailed); rowsSentToFailedRowsCollection.inc(numRowsFailed); return 0; @@ -657,13 +659,10 @@ long flush( contexts -> { AppendRowsContext failedContext = Preconditions.checkStateNotNull(Iterables.getFirst(contexts, null)); - Instant operationEndTime = Instant.now(); - @Nullable Instant operationStartTime = failedContext.getOperationStartTime(); - BigQuerySinkMetrics.UpdateRpcLatencyMetric( - operationStartTime, operationEndTime, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS); + BigQuerySinkMetrics.reportFailedRPCMetrics( + failedContext, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, shortTableUrn); String errorCode = - BigQuerySinkMetrics.ThrowableToGRPCCodeString(failedContext.getError()); - BigQuerySinkMetrics.AppendRPCsCounter(errorCode, shortTableUrn).inc(); + BigQuerySinkMetrics.throwableToGRPCCodeString(failedContext.getError()); if (failedContext.getError() != null && failedContext.getError() instanceof Exceptions.AppendSerializtionError) { @@ -695,7 +694,7 @@ long flush( } int numRowsFailed = 
failedRowIndices.size(); rowsSentToFailedRowsCollection.inc(numRowsFailed); - BigQuerySinkMetrics.AppendRowsRowStatusCounter( + BigQuerySinkMetrics.appendRowsRowStatusCounter( BigQuerySinkMetrics.RowStatus.FAILED, errorCode, shortTableUrn) .inc(numRowsFailed); @@ -713,7 +712,7 @@ long flush( failedContext.protoRows = retryRows.build(); failedContext.timestamps = retryTimestamps; int numRowsRetried = failedContext.protoRows.getSerializedRowsCount(); - BigQuerySinkMetrics.AppendRowsRowStatusCounter( + BigQuerySinkMetrics.appendRowsRowStatusCounter( BigQuerySinkMetrics.RowStatus.RETRIED, errorCode, shortTableUrn) .inc(numRowsRetried); @@ -756,8 +755,6 @@ long flush( // The following errors are known to be persistent, so always fail the work item in // this case. - Throwable error = Preconditions.checkStateNotNull(failedContext.getError()); - Status.Code statusCode = Status.fromThrowable(error).getCode(); if (statusCode.equals(Status.Code.OUT_OF_RANGE) || statusCode.equals(Status.Code.ALREADY_EXISTS)) { throw new RuntimeException( @@ -789,7 +786,7 @@ long flush( } int numRowsRetried = failedContext.protoRows.getSerializedRowsCount(); - BigQuerySinkMetrics.AppendRowsRowStatusCounter( + BigQuerySinkMetrics.appendRowsRowStatusCounter( BigQuerySinkMetrics.RowStatus.RETRIED, errorCode, shortTableUrn) .inc(numRowsRetried); @@ -797,17 +794,16 @@ long flush( return RetryType.RETRY_ALL_OPERATIONS; }, c -> { - BigQuerySinkMetrics.AppendRPCsCounter("ok", shortTableUrn).inc(); int numRecordsAppended = c.protoRows.getSerializedRowsCount(); recordsAppended.inc(numRecordsAppended); - BigQuerySinkMetrics.AppendRowsRowStatusCounter( - BigQuerySinkMetrics.RowStatus.SUCCESSFUL, "ok", shortTableUrn) + BigQuerySinkMetrics.appendRowsRowStatusCounter( + BigQuerySinkMetrics.RowStatus.SUCCESSFUL, + BigQuerySinkMetrics.OK, + shortTableUrn) .inc(numRecordsAppended); - @Nullable Instant operationStartTime = c.getOperationStartTime(); - Instant operationEndTime = Instant.now(); - BigQuerySinkMetrics.UpdateRpcLatencyMetric( - operationStartTime, operationEndTime, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS); + BigQuerySinkMetrics.reportSuccessfulRpcMetrics( + c, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, shortTableUrn); if (successfulRowsReceiver != null) { for (int i = 0; i < c.protoRows.getSerializedRowsCount(); ++i) { @@ -949,7 +945,7 @@ void flushAll( Duration.standardSeconds(1), Duration.standardSeconds(10), 1000, - BigQuerySinkMetrics.ThrottledTimeCounter( + BigQuerySinkMetrics.throttledTimeCounter( BigQuerySinkMetrics.RpcMethod.APPEND_ROWS)); retryManagers.add(retryManager); numRowsWritten += diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java index 6e59e31719d6..d3042984638f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java @@ -546,8 +546,10 @@ public void process( new BigQueryStorageApiInsertError(failedRow.getValue(), errorMessage), failedRow.getTimestamp()); rowsSentToFailedRowsCollection.inc(); - BigQuerySinkMetrics.AppendRowsRowStatusCounter( - BigQuerySinkMetrics.RowStatus.FAILED, "PayloadTooLarge", shortTableId) + BigQuerySinkMetrics.appendRowsRowStatusCounter( + BigQuerySinkMetrics.RowStatus.FAILED, + 
BigQuerySinkMetrics.PAYLOAD_TOO_LARGE, + shortTableId) .inc(1); }, autoUpdateSchema, @@ -632,10 +634,10 @@ public void process( // The first context is always the one that fails. AppendRowsContext failedContext = Preconditions.checkStateNotNull(Iterables.getFirst(failedContexts, null)); - @Nullable Instant operationStartTime = failedContext.getOperationStartTime(); - Instant operationEndTime = Instant.now(); - BigQuerySinkMetrics.UpdateRpcLatencyMetric( - operationStartTime, operationEndTime, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS); + BigQuerySinkMetrics.reportFailedRPCMetrics( + failedContext, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS); + String errorCode = + BigQuerySinkMetrics.throwableToGRPCCodeString(failedContext.getError()); // AppendSerializationError means that BigQuery detected errors on individual rows, e.g. // a row not conforming @@ -648,8 +650,6 @@ public void process( Preconditions.checkArgumentNotNull( (Exceptions.AppendSerializtionError) failedContext.getError()); - String errorCode = BigQuerySinkMetrics.ThrowableToGRPCCodeString(error); - BigQuerySinkMetrics.AppendRPCsCounter(errorCode, shortTableId).inc(); Set failedRowIndices = error.getRowIndexToErrorMessage().keySet(); for (int failedIndex : failedRowIndices) { // Convert the message to a TableRow and send it to the failedRows collection. @@ -664,7 +664,7 @@ public void process( } int failedRows = failedRowIndices.size(); rowsSentToFailedRowsCollection.inc(failedRows); - BigQuerySinkMetrics.AppendRowsRowStatusCounter( + BigQuerySinkMetrics.appendRowsRowStatusCounter( BigQuerySinkMetrics.RowStatus.FAILED, errorCode, shortTableId) .inc(failedRows); @@ -682,7 +682,7 @@ public void process( failedContext.protoRows = retryRows.build(); failedContext.timestamps = timestamps; int retriedRows = failedContext.protoRows.getSerializedRowsCount(); - BigQuerySinkMetrics.AppendRowsRowStatusCounter( + BigQuerySinkMetrics.appendRowsRowStatusCounter( BigQuerySinkMetrics.RowStatus.RETRIED, errorCode, shortTableId) .inc(retriedRows); @@ -698,8 +698,7 @@ public void process( Throwable error = Preconditions.checkStateNotNull(failedContext.getError()); Status.Code statusCode = Status.fromThrowable(error).getCode(); - String errorCode = statusCode.toString(); - BigQuerySinkMetrics.AppendRPCsCounter(errorCode, shortTableId).inc(); + // This means that the offset we have stored does not match the current end of // the stream in the Storage API. 
Usually this happens because a crash or a bundle // failure @@ -733,7 +732,7 @@ public void process( } appendFailures.inc(); int retriedRows = failedContext.protoRows.getSerializedRowsCount(); - BigQuerySinkMetrics.AppendRowsRowStatusCounter( + BigQuerySinkMetrics.appendRowsRowStatusCounter( BigQuerySinkMetrics.RowStatus.RETRIED, errorCode, shortTableId) .inc(retriedRows); @@ -782,14 +781,10 @@ public void process( false))); int flushedRows = context.protoRows.getSerializedRowsCount(); flushesScheduled.inc(flushedRows); - BigQuerySinkMetrics.AppendRowsRowStatusCounter( - BigQuerySinkMetrics.RowStatus.SUCCESSFUL, "ok", shortTableId) - .inc(flushedRows); - BigQuerySinkMetrics.AppendRPCsCounter("ok", shortTableId).inc(); - @Nullable Instant operationStartTime = context.getOperationStartTime(); - Instant operationEndTime = Instant.now(); - BigQuerySinkMetrics.UpdateRpcLatencyMetric( - operationStartTime, operationEndTime, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS); + BigQuerySinkMetrics.reportSuccessfulRpcMetrics( + context, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, shortTableId); + BigQuerySinkMetrics.appendRowsRowStatusCounter( + BigQuerySinkMetrics.RowStatus.SUCCESSFUL, BigQuerySinkMetrics.OK, shortTableId); if (successfulRowsTag != null) { for (int i = 0; i < context.protoRows.getSerializedRowsCount(); ++i) { @@ -807,7 +802,7 @@ public void process( Duration.standardSeconds(1), Duration.standardSeconds(10), 1000, - BigQuerySinkMetrics.ThrottledTimeCounter(BigQuerySinkMetrics.RpcMethod.APPEND_ROWS)); + BigQuerySinkMetrics.throttledTimeCounter(BigQuerySinkMetrics.RpcMethod.APPEND_ROWS)); int numAppends = 0; for (SplittingIterable.Value splitValue : messages) { // Handle the case of a row that is too large. @@ -834,8 +829,10 @@ public void process( } int numRowsFailed = splitValue.getProtoRows().getSerializedRowsCount(); rowsSentToFailedRowsCollection.inc(numRowsFailed); - BigQuerySinkMetrics.AppendRowsRowStatusCounter( - BigQuerySinkMetrics.RowStatus.FAILED, "PayloadTooLarge", shortTableId) + BigQuerySinkMetrics.appendRowsRowStatusCounter( + BigQuerySinkMetrics.RowStatus.FAILED, + BigQuerySinkMetrics.PAYLOAD_TOO_LARGE, + shortTableId) .inc(numRowsFailed); } else { ++numAppends; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java index e5569c502b71..c937c6b299b2 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java @@ -21,17 +21,23 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; +import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; import com.google.cloud.bigquery.storage.v1.Exceptions; import io.grpc.Status; import java.time.Instant; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.beam.runners.core.metrics.CounterCell; import org.apache.beam.runners.core.metrics.MetricsContainerImpl; +import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.Operation.Context; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Histogram; import org.apache.beam.sdk.metrics.MetricName; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.util.HistogramData; +import 
org.apache.beam.sdk.values.KV;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.hamcrest.collection.IsMapContaining;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -57,7 +63,12 @@ public MetricName getName() {
 
   public static class TestMetricsContainer extends MetricsContainerImpl {
 
-    public TestHistogram testHistogram = new TestHistogram();
+    public ConcurrentHashMap<KV<MetricName, HistogramData.BucketType>, TestHistogram>
+        perWorkerHistograms =
+            new ConcurrentHashMap<KV<MetricName, HistogramData.BucketType>, TestHistogram>();
+    public ConcurrentHashMap<MetricName, CounterCell> perWorkerCounters =
+        new ConcurrentHashMap<MetricName, CounterCell>();
 
     public TestMetricsContainer() {
       super("TestStep");
@@ -66,104 +77,228 @@ public TestMetricsContainer() {
     @Override
     public Histogram getPerWorkerHistogram(
         MetricName metricName, HistogramData.BucketType bucketType) {
-      return testHistogram;
+      return perWorkerHistograms.computeIfAbsent(
+          KV.of(metricName, bucketType), kv -> new TestHistogram());
     }
-  }
-
-  @Test
-  public void testAppendRPCsCounter() throws Exception {
-    BigQuerySinkMetrics.setSupportMetricsDeletion(false);
-    Counter deletesEnabledCounter = BigQuerySinkMetrics.AppendRPCsCounter("rpcStatus", "tableId");
-    assertThat(
-        deletesEnabledCounter.getName().getName(),
-        equalTo("RpcRequests-Method:APPEND_ROWS;Status:rpcStatus;"));
-
-    BigQuerySinkMetrics.setSupportMetricsDeletion(true);
-    Counter deletesDisabledCounter = BigQuerySinkMetrics.AppendRPCsCounter("rpcStatus", "tableId");
-    assertThat(
-        deletesDisabledCounter.getName().getName(),
-        equalTo("RpcRequests-Method:APPEND_ROWS;Status:rpcStatus;TableId:tableId;"));
-  }
 
-  @Test
-  public void testFlushRowsCounter() throws Exception {
-    BigQuerySinkMetrics.setSupportMetricsDeletion(false);
-    Counter deletesEnabledCounter = BigQuerySinkMetrics.FlushRowsCounter("rpcStatus");
-    assertThat(
-        deletesEnabledCounter.getName().getName(),
-        equalTo("RpcRequests-Method:FLUSH_ROWS;Status:rpcStatus;"));
+    @Override
+    public Counter getPerWorkerCounter(MetricName metricName) {
+      return perWorkerCounters.computeIfAbsent(metricName, name -> new CounterCell(name));
+    }
 
-    BigQuerySinkMetrics.setSupportMetricsDeletion(true);
-    Counter deletesDisabledCounter = BigQuerySinkMetrics.FlushRowsCounter("rpcStatus");
-    assertThat(
-        deletesDisabledCounter.getName().getName(),
-        equalTo("RpcRequests-Method:FLUSH_ROWS;Status:rpcStatus;TableId:UNKNOWN;"));
+    @Override
+    public void reset() {
+      perWorkerHistograms.clear();
+      perWorkerCounters.clear();
+    }
   }
 
   @Test
   public void testAppendRowsRowStatusCounter() throws Exception {
+    // Setup
+    TestMetricsContainer testContainer = new TestMetricsContainer();
+    MetricsEnvironment.setCurrentContainer(testContainer);
+
     BigQuerySinkMetrics.setSupportMetricsDeletion(false);
-    Counter deletesEnabledCounter =
-        BigQuerySinkMetrics.AppendRowsRowStatusCounter(
+    Counter deletesDisabledCounter =
+        BigQuerySinkMetrics.appendRowsRowStatusCounter(
             BigQuerySinkMetrics.RowStatus.SUCCESSFUL, "rpcStatus", "tableId");
+    deletesDisabledCounter.inc();
+    MetricName deletesDisabledCounterName =
+        MetricName.named(
+            "BigQuerySink", "AppendRowsRowStatus-RowStatus:SUCCESSFUL;RpcStatus:rpcStatus;");
+    assertThat(testContainer.perWorkerCounters, IsMapContaining.hasKey(deletesDisabledCounterName));
     assertThat(
-        deletesEnabledCounter.getName().getName(),
-
equalTo("AppendRowsRowStatus-RowStatus:SUCCESSFUL;Status:rpcStatus;")); + testContainer.perWorkerCounters.get(deletesDisabledCounterName).getCumulative(), + equalTo(1L)); BigQuerySinkMetrics.setSupportMetricsDeletion(true); - Counter deletesDisabledCounter = - BigQuerySinkMetrics.AppendRowsRowStatusCounter( + testContainer.reset(); + Counter deletesEnabledCounter = + BigQuerySinkMetrics.appendRowsRowStatusCounter( BigQuerySinkMetrics.RowStatus.SUCCESSFUL, "rpcStatus", "tableId"); + deletesEnabledCounter.inc(); + MetricName deletesEnabledCounterName = + MetricName.named( + "BigQuerySink", + "AppendRowsRowStatus-RowStatus:SUCCESSFUL;RpcStatus:rpcStatus;TableId:tableId;"); + assertThat(testContainer.perWorkerCounters, IsMapContaining.hasKey(deletesEnabledCounterName)); assertThat( - deletesDisabledCounter.getName().getName(), - equalTo("AppendRowsRowStatus-RowStatus:SUCCESSFUL;Status:rpcStatus;TableId:tableId;")); + testContainer.perWorkerCounters.get(deletesEnabledCounterName).getCumulative(), + equalTo(1L)); } @Test public void testThrowableToGRPCCodeString() throws Exception { Throwable nullThrowable = null; - assertThat(BigQuerySinkMetrics.ThrowableToGRPCCodeString(nullThrowable), equalTo("UNKNOWN")); + assertThat(BigQuerySinkMetrics.throwableToGRPCCodeString(nullThrowable), equalTo("UNKNOWN")); Throwable nonGrpcError = new IndexOutOfBoundsException("Test Error"); - assertThat(BigQuerySinkMetrics.ThrowableToGRPCCodeString(nonGrpcError), equalTo("UNKNOWN")); + assertThat(BigQuerySinkMetrics.throwableToGRPCCodeString(nonGrpcError), equalTo("UNKNOWN")); - int not_found_val = Status.Code.NOT_FOUND.value(); + int notFoundVal = Status.Code.NOT_FOUND.value(); Throwable grpcError = - new Exceptions.AppendSerializtionError(not_found_val, "Test Error", "Stream name", null); - assertThat(BigQuerySinkMetrics.ThrowableToGRPCCodeString(grpcError), equalTo("NOT_FOUND")); + new Exceptions.AppendSerializtionError(notFoundVal, "Test Error", "Stream name", null); + assertThat(BigQuerySinkMetrics.throwableToGRPCCodeString(grpcError), equalTo("NOT_FOUND")); } @Test public void testThrottledTimeCounter() throws Exception { + // Setup + TestMetricsContainer testContainer = new TestMetricsContainer(); + MetricsEnvironment.setCurrentContainer(testContainer); + + // Test throttleCounter metric. 
Counter appendRowsThrottleCounter = - BigQuerySinkMetrics.ThrottledTimeCounter(BigQuerySinkMetrics.RpcMethod.APPEND_ROWS); + BigQuerySinkMetrics.throttledTimeCounter(BigQuerySinkMetrics.RpcMethod.APPEND_ROWS); + appendRowsThrottleCounter.inc(1); assertThat( appendRowsThrottleCounter.getName().getName(), equalTo("ThrottledTime-Method:APPEND_ROWS;")); + MetricName counterName = MetricName.named("BigQuerySink", "ThrottledTime-Method:APPEND_ROWS;"); + assertThat(testContainer.perWorkerCounters, IsMapContaining.hasKey(counterName)); + assertThat(testContainer.perWorkerCounters.get(counterName).getCumulative(), equalTo(1L)); + } + + @Test + public void testReportSuccessfulRpcMetrics() throws Exception { + // Setup + TestMetricsContainer testContainer = new TestMetricsContainer(); + MetricsEnvironment.setCurrentContainer(testContainer); + Context c = new Context(); + Instant t1 = Instant.now(); + c.setOperationStartTime(t1); + c.setOperationEndTime(t1.plusMillis(3)); + + // Test disabled SupportMetricsDeletion + BigQuerySinkMetrics.setSupportMetricsDeletion(false); + BigQuerySinkMetrics.reportSuccessfulRpcMetrics( + c, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, "tableId"); + MetricName counterNameDisabledDeletes = + MetricName.named("BigQuerySink", "RpcRequests-Method:APPEND_ROWS;RpcStatus:OK;"); + MetricName histogramName = MetricName.named("BigQuerySink", "RpcLatency-Method:APPEND_ROWS;"); + HistogramData.BucketType bucketType = HistogramData.ExponentialBuckets.of(1, 34); + assertThat(testContainer.perWorkerCounters, IsMapContaining.hasKey(counterNameDisabledDeletes)); + assertThat( + testContainer.perWorkerCounters.get(counterNameDisabledDeletes).getCumulative(), + equalTo(1L)); + assertThat( + testContainer.perWorkerHistograms.get(KV.of(histogramName, bucketType)).values, + containsInAnyOrder(Double.valueOf(3.0))); + + // Test enable SupportMetricsDeletion. 
+ BigQuerySinkMetrics.setSupportMetricsDeletion(true); + testContainer.reset(); + BigQuerySinkMetrics.reportSuccessfulRpcMetrics( + c, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, "tableId"); + MetricName counterNameEnabledDeletes = + MetricName.named( + "BigQuerySink", "RpcRequests-Method:APPEND_ROWS;RpcStatus:OK;TableId:tableId;"); + assertThat(testContainer.perWorkerCounters, IsMapContaining.hasKey(counterNameEnabledDeletes)); + assertThat( + testContainer.perWorkerCounters.get(counterNameEnabledDeletes).getCumulative(), + equalTo(1L)); + assertThat( + testContainer.perWorkerHistograms.get(KV.of(histogramName, bucketType)).values, + containsInAnyOrder(Double.valueOf(3.0))); } @Test - public void testUpdateRpcLatencyMetric() throws Exception { + public void testReportFailedRPCMetrics_KnownGrpcError() throws Exception { + // Setup TestMetricsContainer testContainer = new TestMetricsContainer(); MetricsEnvironment.setCurrentContainer(testContainer); - BigQuerySinkMetrics.RpcMethod append = BigQuerySinkMetrics.RpcMethod.APPEND_ROWS; + Context c = new Context(); Instant t1 = Instant.now(); + c.setOperationStartTime(t1); + c.setOperationEndTime(t1.plusMillis(5)); + int notFoundVal = Status.Code.NOT_FOUND.value(); + Throwable grpcError = + new Exceptions.AppendSerializtionError(notFoundVal, "Test Error", "Stream name", null); + c.setError(grpcError); + + // Test disabled SupportMetricsDeletion + BigQuerySinkMetrics.setSupportMetricsDeletion(false); + BigQuerySinkMetrics.reportFailedRPCMetrics( + c, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, "tableId"); + MetricName counterNameDisabledDeletes = + MetricName.named("BigQuerySink", "RpcRequests-Method:APPEND_ROWS;RpcStatus:NOT_FOUND;"); + MetricName histogramName = MetricName.named("BigQuerySink", "RpcLatency-Method:APPEND_ROWS;"); + HistogramData.BucketType bucketType = HistogramData.ExponentialBuckets.of(1, 34); + assertThat(testContainer.perWorkerCounters, IsMapContaining.hasKey(counterNameDisabledDeletes)); + assertThat( + testContainer.perWorkerCounters.get(counterNameDisabledDeletes).getCumulative(), + equalTo(1L)); + assertThat( + testContainer.perWorkerHistograms, + IsMapContaining.hasKey(KV.of(histogramName, bucketType))); + assertThat( + testContainer.perWorkerHistograms.get(KV.of(histogramName, bucketType)).values, + containsInAnyOrder(Double.valueOf(5.0))); - // Expect no updates to the histogram when we pass a null instant. 
- BigQuerySinkMetrics.UpdateRpcLatencyMetric(null, t1, append); - BigQuerySinkMetrics.UpdateRpcLatencyMetric(t1, null, append); - assertThat(testContainer.testHistogram.values.size(), equalTo(0)); + // Test enable SupportMetricsDeletion + BigQuerySinkMetrics.setSupportMetricsDeletion(true); + testContainer.reset(); + BigQuerySinkMetrics.reportFailedRPCMetrics( + c, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, "tableId"); + MetricName counterNameEnabledDeletes = + MetricName.named( + "BigQuerySink", "RpcRequests-Method:APPEND_ROWS;RpcStatus:NOT_FOUND;TableId:tableId;"); + assertThat(testContainer.perWorkerCounters, IsMapContaining.hasKey(counterNameEnabledDeletes)); + assertThat( + testContainer.perWorkerCounters.get(counterNameEnabledDeletes).getCumulative(), + equalTo(1L)); + assertThat( + testContainer.perWorkerHistograms.get(KV.of(histogramName, bucketType)).values, + containsInAnyOrder(Double.valueOf(5.0))); + } + + @Test + public void testReportFailedRPCMetrics_UnknownGrpcError() throws Exception { + // Setup + BigQuerySinkMetrics.setSupportMetricsDeletion(false); + TestMetricsContainer testContainer = new TestMetricsContainer(); + MetricsEnvironment.setCurrentContainer(testContainer); + Context c = new Context(); + Instant t1 = Instant.now(); + c.setOperationStartTime(t1); + c.setOperationEndTime(t1.plusMillis(15)); + Throwable nonGrpcError = new IndexOutOfBoundsException("Test Error"); + c.setError(nonGrpcError); - // Expect no updates when end time is before start time. - BigQuerySinkMetrics.UpdateRpcLatencyMetric(t1, t1.minusMillis(5), append); - assertThat(testContainer.testHistogram.values.size(), equalTo(0)); + // Test disabled SupportMetricsDeletion + BigQuerySinkMetrics.setSupportMetricsDeletion(false); + BigQuerySinkMetrics.reportFailedRPCMetrics( + c, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, "tableId"); + MetricName counterNameDisabledDeletes = + MetricName.named("BigQuerySink", "RpcRequests-Method:APPEND_ROWS;RpcStatus:UNKNOWN;"); + MetricName histogramName = MetricName.named("BigQuerySink", "RpcLatency-Method:APPEND_ROWS;"); + HistogramData.BucketType bucketType = HistogramData.ExponentialBuckets.of(1, 34); + assertThat(testContainer.perWorkerCounters, IsMapContaining.hasKey(counterNameDisabledDeletes)); + assertThat( + testContainer.perWorkerCounters.get(counterNameDisabledDeletes).getCumulative(), + equalTo(1L)); + assertThat( + testContainer.perWorkerHistograms.get(KV.of(histogramName, bucketType)).values, + containsInAnyOrder(Double.valueOf(15.0))); - // Expect valid updates to be recorded in the underlying histogram. 
- BigQuerySinkMetrics.UpdateRpcLatencyMetric(t1.minusMillis(5), t1, append); - BigQuerySinkMetrics.UpdateRpcLatencyMetric(t1.minusMillis(10), t1, append); - BigQuerySinkMetrics.UpdateRpcLatencyMetric(t1.minusMillis(15), t1, append); + // Test enable SupportMetricsDeletion + BigQuerySinkMetrics.setSupportMetricsDeletion(true); + testContainer.reset(); + BigQuerySinkMetrics.reportFailedRPCMetrics( + c, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, "tableId"); + MetricName counterNameEnabledDeletes = + MetricName.named( + "BigQuerySink", "RpcRequests-Method:APPEND_ROWS;RpcStatus:UNKNOWN;TableId:tableId;"); + assertThat(testContainer.perWorkerCounters, IsMapContaining.hasKey(counterNameEnabledDeletes)); + assertThat( + testContainer.perWorkerCounters.get(counterNameEnabledDeletes).getCumulative(), + equalTo(1L)); assertThat( - testContainer.testHistogram.values, - containsInAnyOrder(Double.valueOf(5.0), Double.valueOf(10.0), Double.valueOf(15.0))); + testContainer.perWorkerHistograms.get(KV.of(histogramName, bucketType)).values, + containsInAnyOrder(Double.valueOf(15.0))); } } From 08475bc124d5e0670578322e7929525de4d6c977 Mon Sep 17 00:00:00 2001 From: Jayaj Poudel Date: Wed, 8 Nov 2023 19:51:32 +0000 Subject: [PATCH 4/4] Spotless --- .../beam/sdk/metrics/DelegatingCounter.java | 16 +++++++++---- .../beam/sdk/metrics/DelegatingHistogram.java | 23 +++++++++++-------- .../io/gcp/bigquery/BigQuerySinkMetrics.java | 4 ++-- 3 files changed, 27 insertions(+), 16 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingCounter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingCounter.java index ece0c70348c4..a0b2e3b34678 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingCounter.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingCounter.java @@ -28,7 +28,9 @@ public class DelegatingCounter implements Metric, Counter, Serializable { private final boolean perWorkerCounter; /** - * Create a {@code DelegatingCounter} with {@code perWorkerCounter} and {@code processWideContainer} set to false. + * Create a {@code DelegatingCounter} with {@code perWorkerCounter} and {@code + * processWideContainer} set to false. + * * @param name Metric name for this metric. */ public DelegatingCounter(MetricName name) { @@ -37,19 +39,23 @@ public DelegatingCounter(MetricName name) { /** * Create a {@code DelegatingCounter} with {@code perWorkerCounter} set to false. + * * @param name Metric name for this metric. - * @param processWideContainer Whether this Counter is stored in the ProcessWide container or the current thread's container. + * @param processWideContainer Whether this Counter is stored in the ProcessWide container or the + * current thread's container. */ public DelegatingCounter(MetricName name, boolean processWideContainer) { - this(name, processWideContainer, false); + this(name, processWideContainer, false); } /** * @param name Metric name for this metric. - * @param processWideContainer Whether this Counter is stored in the ProcessWide container or the current thread's container. + * @param processWideContainer Whether this Counter is stored in the ProcessWide container or the + * current thread's container. * @param perWorkerCounter Whether this Counter refers to a perWorker metric or not. 
*/ - public DelegatingCounter(MetricName name, boolean processWideContainer, boolean perWorkerCounter) { + public DelegatingCounter( + MetricName name, boolean processWideContainer, boolean perWorkerCounter) { this.name = name; this.processWideContainer = processWideContainer; this.perWorkerCounter = perWorkerCounter; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingHistogram.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingHistogram.java index b5c458836444..b877db69b0bf 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingHistogram.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingHistogram.java @@ -31,29 +31,34 @@ public class DelegatingHistogram implements Metric, Histogram, Serializable { /** * Create a {@code DelegatingHistogram} with {@code perWorkerHistogram} set to false. + * * @param name Metric name for this metric. * @param bucketType Histogram bucketing strategy. - * @param processWideContainer Whether this Counter is stored in the ProcessWide container or the current thread's container. + * @param processWideContainer Whether this Counter is stored in the ProcessWide container or the + * current thread's container. */ public DelegatingHistogram( MetricName name, HistogramData.BucketType bucketType, boolean processWideContainer) { this(name, bucketType, processWideContainer, false); } - /** * @param name Metric name for this metric. * @param bucketType Histogram bucketing strategy. - * @param processWideContainer Whether this Counter is stored in the ProcessWide container or the current thread's container. + * @param processWideContainer Whether this Counter is stored in the ProcessWide container or the + * current thread's container. * @param perWorkerHistogram Whether this Histogram refers to a perWorker metric or not. */ public DelegatingHistogram( - MetricName name, HistogramData.BucketType bucketType, boolean processWideContainer, boolean perWorkerHistogram) { - this.name = name; - this.bucketType = bucketType; - this.processWideContainer = processWideContainer; - this.perWorkerHistogram = perWorkerHistogram; - } + MetricName name, + HistogramData.BucketType bucketType, + boolean processWideContainer, + boolean perWorkerHistogram) { + this.name = name; + this.bucketType = bucketType; + this.processWideContainer = processWideContainer; + this.perWorkerHistogram = perWorkerHistogram; + } @Override public void update(double value) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java index f8984c62c911..24323fce6895 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java @@ -80,7 +80,7 @@ enum RowStatus { private static final char METRIC_NAME_DELIMITER = '-'; /** - * Returns a metric name that merges the baseName with metricLables formatted as.: + * Returns a metric name that merges the baseName with metricLables formatted as. * *

'{baseName}-{metricLabelKey1}:{metricLabelVal1};...{metricLabelKeyN}:{metricLabelValN};' */ @@ -118,7 +118,7 @@ private static Counter createRPCRequestCounter( } /** - * Creates an Histogram metric to record RPC latency. Metric will have name.: + * Creates an Histogram metric to record RPC latency. Metric will have name. * *

'RpcLatency-Method:{method};' *
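End state across the four patches, as a minimal usage sketch: a sink declares its per-worker metrics purely through the extended DelegatingCounter and DelegatingHistogram constructors, and updates are routed to getPerWorkerCounter/getPerWorkerHistogram on the current metrics container. The namespace and metric names below are illustrative placeholders, not names this change introduces:

import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.DelegatingCounter;
import org.apache.beam.sdk.metrics.DelegatingHistogram;
import org.apache.beam.sdk.metrics.Histogram;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.util.HistogramData;

class PerWorkerMetricsSketch {
  // processWideContainer=false, perWorkerCounter=true: inc() resolves the
  // current thread's container and delegates to getPerWorkerCounter(name).
  private static final Counter REQUESTS =
      new DelegatingCounter(MetricName.named("ExampleSink", "ExampleRequests"), false, true);

  // Same exponential bucketing the sink uses for RpcLatency: scale 1, 34 buckets.
  private static final Histogram LATENCY =
      new DelegatingHistogram(
          MetricName.named("ExampleSink", "ExampleLatency"),
          HistogramData.ExponentialBuckets.of(1, 34),
          false,
          true);

  static void recordCall(long latencyMs) {
    REQUESTS.inc();            // no-op if no metrics container is attached
    LATENCY.update(latencyMs); // delegates to getPerWorkerHistogram(name, buckets)
  }
}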